From 21259e6591f15d1b6b95cb28cceac53fcc259cbd Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Thu, 27 Jun 2024 11:03:57 +0200
Subject: [PATCH] Better accounting data retrieval

---
 crates/dbconn/src/channelinfo.rs            |  12 +-
 crates/dbconn/src/worker.rs                 |  20 +
 crates/httpret/src/api4/accounting.rs       | 381 ++++++++++++--------
 crates/httpret/src/httpret.rs               |   6 +-
 crates/httpret/src/proxy.rs                 |   4 +-
 crates/httpret/src/proxy/api4/backend.rs    |  24 +-
 crates/netpod/src/netpod.rs                 |   9 +-
 crates/query/src/api4.rs                    |  19 +
 crates/query/src/api4/binned.rs             |  26 +-
 crates/query/src/api4/events.rs             |   2 +-
 crates/scyllaconn/src/accounting/toplist.rs | 124 +++++--
 crates/scyllaconn/src/events2/events.rs     |  18 +-
 crates/scyllaconn/src/worker.rs             |  29 ++
 crates/taskrun/Cargo.toml                   |   2 +-
 14 files changed, 456 insertions(+), 220 deletions(-)

diff --git a/crates/dbconn/src/channelinfo.rs b/crates/dbconn/src/channelinfo.rs
index 2433ed4..a33dacc 100644
--- a/crates/dbconn/src/channelinfo.rs
+++ b/crates/dbconn/src/channelinfo.rs
@@ -19,13 +19,13 @@ pub struct ChannelInfo {
     pub kind: u16,
 }
 
-pub async fn info_for_series_ids(series_ids: &[u64], pg: &Client) -> Result<Vec<(u64, Option<ChannelInfo>)>, Error> {
+pub async fn info_for_series_ids(series_ids: &[u64], pg: &Client) -> Result<Vec<Option<ChannelInfo>>, Error> {
     let (ord, seriess) = series_ids
         .iter()
         .enumerate()
-        .fold((Vec::new(), Vec::new()), |mut a, x| {
-            a.0.push(x.0 as i32);
-            a.1.push(*x.1 as i64);
+        .fold((Vec::new(), Vec::new()), |mut a, (i, &series)| {
+            a.0.push(i as i32);
+            a.1.push(series as i64);
             a
         });
     let sql = concat!(
@@ -62,9 +62,9 @@ pub async fn info_for_series_ids(series_ids: &[u64], pg: &Client) -> Result<Vec<(u64, Option<ChannelInfo>)>, Error> {
diff --git a/crates/dbconn/src/worker.rs b/crates/dbconn/src/worker.rs
@@ enum Job {
     ChConfForSeries(String, u64, Sender>),
+    InfoForSeriesIds(
+        Vec<u64>,
+        Sender<Result<Vec<Option<ChannelInfo>>, crate::channelinfo::Error>>,
+    ),
 }
 
 #[derive(Debug, Clone)]
@@ -59,6 +63,16 @@ impl PgQueue {
         self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
         Ok(rx)
     }
+
+    pub async fn info_for_series_ids(
+        &self,
+        series_ids: Vec<u64>,
+    ) -> Result<Receiver<Result<Vec<Option<ChannelInfo>>, crate::channelinfo::Error>>, Error> {
+        let (tx, rx) = async_channel::bounded(1);
+        let job = Job::InfoForSeriesIds(series_ids, tx);
+        self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
+        Ok(rx)
+    }
 }
 
 #[derive(Debug)]
@@ -106,6 +120,12 @@ impl PgWorker {
                     // TODO count for stats
                 }
             }
+            Job::InfoForSeriesIds(ids, tx) => {
+                let res = crate::channelinfo::info_for_series_ids(&ids, &self.pg).await;
+                if tx.send(res.map_err(Into::into)).await.is_err() {
+                    // TODO count for stats
+                }
+            }
         }
     }
 }

diff --git a/crates/httpret/src/api4/accounting.rs b/crates/httpret/src/api4/accounting.rs
index 0c12850..5c7d311 100644
--- a/crates/httpret/src/api4/accounting.rs
+++ b/crates/httpret/src/api4/accounting.rs
@@ -2,6 +2,8 @@ use crate::bodystream::response;
 use crate::err::Error;
 use crate::requests::accepts_json_or_all;
 use crate::ReqCtx;
+use crate::ServiceSharedResources;
+use dbconn::worker::PgQueue;
 use err::ToPublicError;
 use futures_util::StreamExt;
 use http::Method;
@@ -17,30 +19,122 @@ use items_0::Extendable;
 use items_2::accounting::AccountingEvents;
 use netpod::log::*;
 use netpod::req_uri_to_url;
+use netpod::ttl::RetentionTime;
 use netpod::FromUrl;
 use netpod::NodeConfigCached;
 use netpod::Shape;
+use netpod::TsMs;
 use query::api4::AccountingIngestedBytesQuery;
 use query::api4::AccountingToplistQuery;
+use scyllaconn::accounting::toplist::UsageData;
 use serde::Deserialize;
 use serde::Serialize;
-use std::collections::BTreeMap;
 
-pub struct AccountingIngestedBytes {}
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Toplist {
+    dim0: AccountedIngested,
+    dim1: AccountedIngested,
+    infos_count_total: usize,
+    infos_missing_count: usize,
+    top1_usage_len: usize,
+    scalar_count: usize,
+    wave_count: usize,
+    found: usize,
+    mismatch_count: usize,
+}
 
-impl AccountingIngestedBytes {
+impl Toplist {
+    fn new() -> Self {
+        Self {
+            dim0: AccountedIngested::new(),
+            dim1: AccountedIngested::new(),
+            infos_count_total: 0,
+            infos_missing_count: 0,
+            top1_usage_len: 0,
+            scalar_count: 0,
+            wave_count: 0,
+            found: 0,
+            mismatch_count: 0,
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct AccountedIngested {
+    names: Vec<String>,
+    counts: Vec<u64>,
+    bytes: Vec<u64>,
+}
+
+impl AccountedIngested {
+    fn new() -> Self {
+        Self {
+            names: Vec::new(),
+            counts: Vec::new(),
+            bytes: Vec::new(),
+        }
+    }
+
+    fn push(&mut self, name: String, counts: u64, bytes: u64) {
+        self.names.push(name);
+        self.counts.push(counts);
+        self.bytes.push(bytes);
+    }
+
+    fn sort_by_counts(&mut self) {
+        let mut tmp: Vec<_> = self
+            .counts
+            .iter()
+            .map(|&x| x)
+            .enumerate()
+            .map(|(i, x)| (x, i))
+            .collect();
+        tmp.sort_unstable();
+        let tmp: Vec<_> = tmp.into_iter().rev().map(|x| x.1).collect();
+        self.reorder_by_index_list(&tmp);
+    }
+
+    fn sort_by_bytes(&mut self) {
+        let mut tmp: Vec<_> = self.bytes.iter().map(|&x| x).enumerate().map(|(i, x)| (x, i)).collect();
+        tmp.sort_unstable();
+        let tmp: Vec<_> = tmp.into_iter().rev().map(|x| x.1).collect();
+        self.reorder_by_index_list(&tmp);
+    }
+
+    fn reorder_by_index_list(&mut self, tmp: &[usize]) {
+        self.names = tmp.iter().map(|&x| self.names[x].clone()).collect();
+        self.counts = tmp.iter().map(|&x| self.counts[x]).collect();
+        self.bytes = tmp.iter().map(|&x| self.bytes[x]).collect();
+    }
+
+    fn truncate(&mut self, len: usize) {
+        self.names.truncate(len);
+        self.counts.truncate(len);
+        self.bytes.truncate(len);
+    }
+}
+
+pub struct AccountingIngested {}
+
+impl AccountingIngested {
     pub fn handler(req: &Requ) -> Option<Self> {
-        if req.uri().path().starts_with("/api/4/accounting/ingested/bytes") {
+        if req.uri().path().starts_with("/api/4/accounting/ingested") {
             Some(Self {})
         } else {
             None
         }
     }
 
-    pub async fn handle(&self, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
+    pub async fn handle(
+        &self,
+        req: Requ,
+        ctx: &ReqCtx,
+        shared_res: &ServiceSharedResources,
+        ncc: &NodeConfigCached,
+    ) -> Result<StreamResponse, Error> {
         if req.method() == Method::GET {
             if accepts_json_or_all(req.headers()) {
-                match self.handle_get(req, ctx, ncc).await {
+                match self.handle_get(req, ctx, shared_res, ncc).await {
                     Ok(x) => Ok(x),
                     Err(e) => {
                         error!("{e}");
@@ -57,64 +151,44 @@ impl AccountingIngestedBytes {
         }
     }
 
-    async fn handle_get(&self, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
-        let url = req_uri_to_url(req.uri())?;
-        let q = AccountingIngestedBytesQuery::from_url(&url)?;
-        let res = self.fetch_data(q, ctx, ncc).await?;
-        let body = ToJsonBody::from(&res).into_body();
-        Ok(response(StatusCode::OK).body(body)?)
-    }
-
-    async fn fetch_data(
+    async fn handle_get(
         &self,
-        q: AccountingIngestedBytesQuery,
-        _ctx: &ReqCtx,
+        req: Requ,
+        ctx: &ReqCtx,
+        shared_res: &ServiceSharedResources,
         ncc: &NodeConfigCached,
-    ) -> Result<AccountingEvents, Error> {
-        let scyco = ncc
-            .node_config
-            .cluster
-            .scylla_st()
-            .ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?;
-        let scy = scyllaconn::conn::create_scy_session(scyco).await?;
-        let mut stream = scyllaconn::accounting::totals::AccountingStreamScylla::new(q.range().try_into()?, scy);
-        let mut ret = AccountingEvents::empty();
-        while let Some(item) = stream.next().await {
-            let mut item = item?;
-            ret.extend_from(&mut item);
+    ) -> Result<StreamResponse, Error> {
+        let url = req_uri_to_url(req.uri())?;
+        let qu = AccountingToplistQuery::from_url(&url)?;
+        let res = fetch_data(qu.rt(), qu.ts().to_ts_ms(), ctx, shared_res, ncc).await?;
+        let mut ret = AccountedIngested::new();
+        for e in res.dim0.names {
+            ret.names.push(e)
         }
-        Ok(ret)
-    }
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct Toplist {
-    dim0: Vec<(String, u64, u64)>,
-    dim1: Vec<(String, u64, u64)>,
-    infos_count_total: usize,
-    infos_missing_count: usize,
-    top1_usage_len: usize,
-    scalar_count: usize,
-    wave_count: usize,
-    found: usize,
-    incomplete_count: usize,
-    mismatch_count: usize,
-}
-
-impl Toplist {
-    fn new() -> Self {
-        Self {
-            dim0: Vec::new(),
-            dim1: Vec::new(),
-            infos_count_total: 0,
-            infos_missing_count: 0,
-            top1_usage_len: 0,
-            scalar_count: 0,
-            wave_count: 0,
-            found: 0,
-            incomplete_count: 0,
-            mismatch_count: 0,
+        for e in res.dim0.counts {
+            ret.counts.push(e)
         }
+        for e in res.dim0.bytes {
+            ret.bytes.push(e)
+        }
+        for e in res.dim1.names {
+            ret.names.push(e)
+        }
+        for e in res.dim1.counts {
+            ret.counts.push(e)
+        }
+        for e in res.dim1.bytes {
+            ret.bytes.push(e)
+        }
+        if let Some(sort) = qu.sort() {
+            if sort == "counts" {
+                // ret.sort_by_counts();
+            } else if sort == "bytes" {
+                // ret.sort_by_bytes();
+            }
+        }
+        let body = ToJsonBody::from(&ret).into_body();
+        Ok(response(StatusCode::OK).body(body)?)
     }
 }
 
@@ -129,10 +203,16 @@ impl AccountingToplistCounts {
         }
     }
 
-    pub async fn handle(&self, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
+    pub async fn handle(
+        &self,
+        req: Requ,
+        ctx: &ReqCtx,
+        shared_res: &ServiceSharedResources,
+        ncc: &NodeConfigCached,
+    ) -> Result<StreamResponse, Error> {
         if req.method() == Method::GET {
             if accepts_json_or_all(req.headers()) {
-                match self.handle_get(req, ctx, ncc).await {
+                match self.handle_get(req, ctx, shared_res, ncc).await {
                     Ok(x) => Ok(x),
                     Err(e) => {
                         error!("{e}");
@@ -149,99 +229,102 @@ impl AccountingToplistCounts {
         }
     }
 
-    async fn handle_get(&self, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
+    async fn handle_get(
+        &self,
+        req: Requ,
+        ctx: &ReqCtx,
+        shared_res: &ServiceSharedResources,
+        ncc: &NodeConfigCached,
+    ) -> Result<StreamResponse, Error> {
         let url = req_uri_to_url(req.uri())?;
         let qu = AccountingToplistQuery::from_url(&url)?;
-        let res = self.fetch_data(qu, ctx, ncc).await?;
+        let res = fetch_data(qu.rt(), qu.ts().to_ts_ms(), ctx, shared_res, ncc).await?;
         let body = ToJsonBody::from(&res).into_body();
         Ok(response(StatusCode::OK).body(body)?)
} +} - async fn fetch_data( - &self, - qu: AccountingToplistQuery, - _ctx: &ReqCtx, - ncc: &NodeConfigCached, - ) -> Result { - let list_len_max = qu.limit() as usize; - // TODO assumes that accounting data is in the LT keyspace - let scyco = ncc - .node_config - .cluster - .scylla_lt() - .ok_or_else(|| Error::with_public_msg_no_trace(format!("no lt scylla configured")))?; - let scy = scyllaconn::conn::create_scy_session(scyco).await?; - let pgconf = &ncc.node_config.cluster.database; - let (pg, pgjh) = dbconn::create_connection(&pgconf).await?; - let mut top1 = scyllaconn::accounting::toplist::read_ts(qu.ts().ns(), scy).await?; - top1.sort_by_counts(); - let mut ret = Toplist::new(); - let top1_usage = top1.usage(); - ret.top1_usage_len = top1_usage.len(); - let usage_map_0: BTreeMap = top1_usage.iter().map(|x| (x.0, (x.1, x.2))).collect(); - let mut usage_it = usage_map_0.iter(); - loop { - let mut series_ids = Vec::new(); - let mut usages = Vec::new(); - while let Some(u) = usage_it.next() { - series_ids.push(*u.0); - usages.push(u.1.clone()); - if series_ids.len() >= 200 { - break; - } - } - if series_ids.len() == 0 { - break; - } - let infos = dbconn::channelinfo::info_for_series_ids(&series_ids, &pg) - .await - .map_err(Error::from_to_string)?; - for (_series, info_res) in &infos { - if let Some(info) = info_res { - match &info.shape { - Shape::Scalar => { - ret.scalar_count += 1; - } - Shape::Wave(_) => { - ret.wave_count += 1; - } - _ => {} - } - } - } - if usages.len() > infos.len() { - ret.incomplete_count += usages.len() - infos.len(); - } - if infos.len() > usages.len() { - ret.incomplete_count += infos.len() - usages.len(); - } - for ((series2, info_res), usage) in infos.into_iter().zip(usages.into_iter()) { - if let Some(info) = info_res { - if series2 != info.series { - ret.mismatch_count += 1; - } - ret.infos_count_total += 1; - // if info.name == "SINSB04-RMOD:PULSE-I-WF" { - // ret.found += 1; - // } - match &info.shape { - Shape::Scalar => { - ret.dim0.push((info.name, usage.0, usage.1)); - } - Shape::Wave(_) => { - ret.dim1.push((info.name, usage.0, usage.1)); - } - Shape::Image(_, _) => {} - } - } else { - ret.infos_missing_count += 1; - } - } - } - ret.dim0.sort_by_cached_key(|x| u64::MAX - x.1); - ret.dim1.sort_by_cached_key(|x| u64::MAX - x.1); - ret.dim0.truncate(list_len_max); - ret.dim1.truncate(list_len_max); +async fn fetch_data( + rt: RetentionTime, + ts: TsMs, + _ctx: &ReqCtx, + shared_res: &ServiceSharedResources, + _ncc: &NodeConfigCached, +) -> Result { + let list_len_max = 10000000; + if let Some(scyqu) = &shared_res.scyqueue { + let x = scyqu + .accounting_read_ts(rt, ts) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let mut ret = resolve_usages(x, &shared_res.pgqueue).await?; + // ret.dim0.sort_by_bytes(); + // ret.dim1.sort_by_bytes(); + // ret.dim0.truncate(list_len_max); + // ret.dim1.truncate(list_len_max); Ok(ret) + } else { + Err(Error::with_public_msg_no_trace("not a scylla backend")) } } + +async fn resolve_usages(usage: UsageData, pgqu: &PgQueue) -> Result { + let mut ret = Toplist::new(); + let mut series_id_it = usage.series().iter().map(|&x| x); + let mut usage_skip = 0; + loop { + let mut series_ids = Vec::new(); + while let Some(u) = series_id_it.next() { + series_ids.push(u); + if series_ids.len() >= 1000 { + break; + } + } + if series_ids.len() == 0 { + break; + } + let infos = pgqu + .info_for_series_ids(series_ids.clone()) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))? 
+            .recv()
+            .await
+            .map_err(|e| Error::with_msg_no_trace(e.to_string()))?
+            .map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
+        if infos.len() != series_ids.len() {
+            return Err(Error::with_msg_no_trace("database result len mismatch"));
+        }
+        let nn = series_ids.len();
+        for ((series, info_res), (counts, bytes)) in series_ids.into_iter().zip(infos.into_iter()).zip(
+            usage
+                .counts()
+                .iter()
+                .skip(usage_skip)
+                .map(|&x| x)
+                .zip(usage.bytes().iter().skip(usage_skip).map(|&x| x)),
+        ) {
+            if let Some(info) = info_res {
+                if series != info.series {
+                    return Err(Error::with_msg_no_trace("lookup mismatch"));
+                }
+                ret.infos_count_total += 1;
+                match &info.shape {
+                    Shape::Scalar => {
+                        ret.scalar_count += 1;
+                        ret.dim0.push(info.name, counts, bytes);
+                    }
+                    Shape::Wave(_) => {
+                        ret.wave_count += 1;
+                        ret.dim1.push(info.name, counts, bytes);
+                    }
+                    Shape::Image(_, _) => {}
+                }
+            } else {
+                ret.infos_missing_count += 1;
+                ret.dim0.push("UNRESOLVEDSERIES".into(), counts, bytes);
+            }
+        }
+        usage_skip += nn;
+    }
+    Ok(ret)
+}

diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs
index cb69535..a672ddf 100644
--- a/crates/httpret/src/httpret.rs
+++ b/crates/httpret/src/httpret.rs
@@ -376,10 +376,10 @@ async fn http_service_inner(
             Ok(h.handle(req, &shared_res, &node_config).await?)
         } else if let Some(h) = channelconfig::AmbigiousChannelNames::handler(&req) {
             Ok(h.handle(req, &node_config).await?)
+        } else if let Some(h) = api4::accounting::AccountingIngested::handler(&req) {
+            Ok(h.handle(req, ctx, &shared_res, &node_config).await?)
         } else if let Some(h) = api4::accounting::AccountingToplistCounts::handler(&req) {
-            Ok(h.handle(req, ctx, &node_config).await?)
-        } else if let Some(h) = api4::accounting::AccountingIngestedBytes::handler(&req) {
-            Ok(h.handle(req, ctx, &node_config).await?)
+            Ok(h.handle(req, ctx, &shared_res, &node_config).await?)
         } else if path == "/api/4/prebinned" {
             if req.method() == Method::GET {
                 Ok(prebinned(req, ctx, &node_config).await?)

diff --git a/crates/httpret/src/proxy.rs b/crates/httpret/src/proxy.rs
index c95f3f3..ff6561a 100644
--- a/crates/httpret/src/proxy.rs
+++ b/crates/httpret/src/proxy.rs
@@ -197,6 +197,8 @@ async fn proxy_http_service_inner(
         h.handle(req, ctx, &proxy_config).await
     } else if let Some(h) = api4::events::EventsHandler::handler(&req) {
         h.handle(req, ctx, &proxy_config).await
+    } else if path == "/api/4/accounting/ingested" {
+        Ok(proxy_backend_query::<AccountingToplistQuery>(req, ctx, proxy_config).await?)
    } else if path == "/api/4/accounting/toplist/counts" {
         Ok(proxy_backend_query::<AccountingToplistQuery>(req, ctx, proxy_config).await?)
     } else if path == "/api/4/status/connection/events" {
@@ -209,8 +211,6 @@ async fn proxy_http_service_inner(
         Ok(proxy_backend_query::(req, ctx, proxy_config).await?)
     } else if path == "/api/4/binned" {
         Ok(proxy_backend_query::<BinnedQuery>(req, ctx, proxy_config).await?)
-    } else if let Some(_) = crate::api4::accounting::AccountingIngestedBytes::handler(&req) {
-        Ok(proxy_backend_query::<AccountingIngestedBytesQuery>(req, ctx, proxy_config).await?)
     } else if path == "/api/4/channel/config" {
         Ok(proxy_backend_query::(req, ctx, proxy_config).await?)
     } else if path.starts_with("/api/4/test/http/204") {

diff --git a/crates/httpret/src/proxy/api4/backend.rs b/crates/httpret/src/proxy/api4/backend.rs
index 9ef2101..97f7853 100644
--- a/crates/httpret/src/proxy/api4/backend.rs
+++ b/crates/httpret/src/proxy/api4/backend.rs
@@ -9,6 +9,8 @@ use httpclient::Requ;
 use httpclient::StreamResponse;
 use netpod::ProxyConfig;
 use netpod::ReqCtx;
+use netpod::ServiceVersion;
+use std::collections::BTreeMap;
 
 pub struct BackendListHandler {}
 
@@ -21,21 +23,19 @@ impl BackendListHandler {
         }
     }
 
-    pub async fn handle(&self, req: Requ, _ctx: &ReqCtx, _node_config: &ProxyConfig) -> Result<StreamResponse, Error> {
+    pub async fn handle(&self, req: Requ, _ctx: &ReqCtx, cfg: &ProxyConfig) -> Result<StreamResponse, Error> {
         if req.method() == Method::GET {
             if accepts_json_or_all(req.headers()) {
+                let mut list = Vec::new();
+                if let Some(g) = &cfg.announce_backends {
+                    for j in g {
+                        let mut map = BTreeMap::new();
+                        map.insert("name", j.clone());
+                        list.push(map);
+                    }
+                }
                 let res = serde_json::json!({
-                    "backends_available": [
-                        {
-                            "name": "sf-databuffer",
-                        },
-                        {
-                            "name": "sf-imagebuffer",
-                        },
-                        {
-                            "name": "sf-archiver",
-                        },
-                    ]
+                    "backends_available": list,
                 });
                 let body = serde_json::to_string(&res)?;
                 Ok(response(StatusCode::OK).body(body_string(body))?)

diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs
index 8d01317..0a28ad1 100644
--- a/crates/netpod/src/netpod.rs
+++ b/crates/netpod/src/netpod.rs
@@ -287,7 +287,7 @@ impl ScalarType {
             BOOL => "bool",
             STRING => "string",
             Enum => "enum",
-            ChannelStatus => "ChannelStatus",
+            ChannelStatus => "channelstatus",
         }
     }
 
@@ -307,7 +307,7 @@ impl ScalarType {
             "bool" => BOOL,
             "string" => STRING,
             "enum" => Enum,
-            "ChannelStatus" => ChannelStatus,
+            "channelstatus" => ChannelStatus,
             _ => {
                 return Err(Error::with_msg_no_trace(format!(
                     "from_bsread_str can not understand bsread {:?}",
@@ -469,6 +469,10 @@ impl ScalarType {
         }
     }
 
+    pub fn to_scylla_table_name_id(&self) -> &'static str {
+        self.to_variant_str()
+    }
+
     pub fn to_scylla_i32(&self) -> i32 {
         self.index() as i32
     }
@@ -2942,6 +2946,7 @@ pub struct ProxyConfig {
     pub port: u16,
     pub backends: Vec,
     pub status_subs: Vec,
+    pub announce_backends: Option<Vec<String>>,
 }
 
 pub trait HasBackend {

diff --git a/crates/query/src/api4.rs b/crates/query/src/api4.rs
index 4d00698..ed67ec1 100644
--- a/crates/query/src/api4.rs
+++ b/crates/query/src/api4.rs
@@ -8,6 +8,7 @@ use err::Error;
 use netpod::get_url_query_pairs;
 use netpod::log::*;
 use netpod::range::evrange::SeriesRange;
+use netpod::ttl::RetentionTime;
 use netpod::AppendToUrl;
 use netpod::FromUrl;
 use netpod::HasBackend;
@@ -82,12 +83,18 @@ impl AppendToUrl for AccountingIngestedBytesQuery {
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct AccountingToplistQuery {
+    rt: RetentionTime,
     backend: String,
     ts: TsNano,
     limit: u32,
+    sort: Option<String>,
 }
 
 impl AccountingToplistQuery {
+    pub fn rt(&self) -> RetentionTime {
+        self.rt.clone()
+    }
+
     pub fn ts(&self) -> TsNano {
         self.ts.clone()
     }
@@ -95,6 +102,10 @@ impl AccountingToplistQuery {
     pub fn limit(&self) -> u32 {
         self.limit
     }
+
+    pub fn sort(&self) -> Option<&str> {
+        self.sort.as_ref().map(|x| x.as_str())
+    }
 }
 
 impl HasBackend for AccountingToplistQuery {
@@ -135,12 +146,20 @@ impl FromUrl for AccountingToplistQuery {
             Ok::<_, Error>(TsNano::from_ns(w.to_nanos()))
         };
         let ret = Self {
+            rt: pairs
+                .get("retentionTime")
+                .ok_or_else(|| Error::with_public_msg_no_trace("missing retentionTime"))
+                .and_then(|x| {
+                    x.parse()
+                        .map_err(|_| Error::with_public_msg_no_trace("missing retentionTime"))
+                })?,
             backend: pairs
                 .get("backend")
                 .ok_or_else(|| Error::with_public_msg_no_trace("missing backend"))?
                 .to_string(),
             ts: fn1(pairs)?,
             limit: pairs.get("limit").map_or(None, |x| x.parse().ok()).unwrap_or(20),
+            sort: pairs.get("sort").map(ToString::to_string),
         };
         Ok(ret)
     }

diff --git a/crates/query/src/api4/binned.rs b/crates/query/src/api4/binned.rs
index cc2550a..db7e6d3 100644
--- a/crates/query/src/api4/binned.rs
+++ b/crates/query/src/api4/binned.rs
@@ -4,6 +4,7 @@ use netpod::get_url_query_pairs;
 use netpod::log::*;
 use netpod::query::CacheUsage;
 use netpod::range::evrange::SeriesRange;
+use netpod::ttl::RetentionTime;
 use netpod::AppendToUrl;
 use netpod::ByteSize;
 use netpod::FromUrl;
@@ -40,6 +41,10 @@ pub struct BinnedQuery {
     pub merger_out_len_max: Option,
     #[serde(default, skip_serializing_if = "Option::is_none")]
     test_do_wasm: Option<String>,
+    #[serde(default)]
+    log_level: String,
+    #[serde(default)]
+    use_rt: Option<RetentionTime>,
 }
 
 impl BinnedQuery {
@@ -56,6 +61,8 @@ impl BinnedQuery {
             timeout: None,
             merger_out_len_max: None,
             test_do_wasm: None,
+            log_level: String::new(),
+            use_rt: None,
         }
     }
 
@@ -150,8 +157,11 @@ impl BinnedQuery {
     }
 
     pub fn log_level(&self) -> &str {
-        // TODO take from query
-        ""
+        &self.log_level
+    }
+
+    pub fn use_rt(&self) -> Option<RetentionTime> {
+        self.use_rt.clone()
     }
 }
 
@@ -211,6 +221,12 @@ impl FromUrl for BinnedQuery {
                 .get("mergerOutLenMax")
                 .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?,
             test_do_wasm: pairs.get("testDoWasm").map(|x| String::from(x)),
+            log_level: pairs.get("log_level").map_or(String::new(), String::from),
+            use_rt: pairs.get("useRt").map_or(Ok(None), |k| {
+                k.parse()
+                    .map(Some)
+                    .map_err(|_| Error::with_public_msg_no_trace(format!("can not parse useRt: {}", k)))
+            })?,
         };
         debug!("BinnedQuery::from_url {:?}", ret);
         Ok(ret)
@@ -248,5 +264,11 @@ impl AppendToUrl for BinnedQuery {
         if let Some(x) = &self.test_do_wasm {
             g.append_pair("testDoWasm", &x);
         }
+        if self.log_level.len() != 0 {
+            g.append_pair("log_level", &self.log_level);
+        }
+        if let Some(x) = self.use_rt.as_ref() {
+            g.append_pair("useRt", &x.to_string());
+        }
     }
 }

diff --git a/crates/query/src/api4/events.rs b/crates/query/src/api4/events.rs
index eda148c..3da6465 100644
--- a/crates/query/src/api4/events.rs
+++ b/crates/query/src/api4/events.rs
@@ -450,7 +450,7 @@ impl From<&BinnedQuery> for EventsSubQuerySettings {
             // TODO add to query
             queue_len_disk_io: None,
             create_errors: Vec::new(),
-            use_rt: None,
+            use_rt: value.use_rt(),
         }
     }
 }

diff --git a/crates/scyllaconn/src/accounting/toplist.rs b/crates/scyllaconn/src/accounting/toplist.rs
index de1d94b..f7e9669 100644
--- a/crates/scyllaconn/src/accounting/toplist.rs
+++ b/crates/scyllaconn/src/accounting/toplist.rs
@@ -1,76 +1,136 @@
-use crate::errconv::ErrConv;
-use err::Error;
+use err::thiserror;
+use err::ThisError;
 use futures_util::StreamExt;
 use netpod::log::*;
-use netpod::timeunits;
+use netpod::ttl::RetentionTime;
+use netpod::TsMs;
 use netpod::EMIT_ACCOUNTING_SNAP;
 use scylla::prepared_statement::PreparedStatement;
 use scylla::Session as ScySession;
-use std::sync::Arc;
+
+#[derive(Debug, ThisError)]
+pub enum Error {
+    ScyllaQuery(#[from] scylla::transport::errors::QueryError),
+    ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),
+    UsageDataMalformed,
+}
 
 #[derive(Debug)]
 pub struct UsageData {
-    ts: u64,
-    // (series, count, bytes)
-    usage: Vec<(u64, u64, u64)>,
+    ts: TsMs,
+    series: Vec<u64>,
+    counts: Vec<u64>,
+    bytes: Vec<u64>,
 }
 
 impl UsageData {
-    pub fn new(ts: u64) -> Self {
-        Self { ts, usage: Vec::new() }
+    pub fn new(ts: TsMs) -> Self {
+        Self {
+            ts,
+            series: Vec::new(),
+            counts: Vec::new(),
+            bytes: Vec::new(),
+        }
     }
 
+    pub fn len(&self) -> usize {
+        self.series.len()
+    }
+
-    pub fn ts(&self) -> u64 {
+    pub fn ts(&self) -> TsMs {
         self.ts
     }
 
-    pub fn usage(&self) -> &[(u64, u64, u64)] {
-        &self.usage
+    pub fn series(&self) -> &[u64] {
+        &self.series
+    }
+
+    pub fn counts(&self) -> &[u64] {
+        &self.counts
+    }
+
+    pub fn bytes(&self) -> &[u64] {
+        &self.bytes
     }
 
     pub fn sort_by_counts(&mut self) {
+        let mut tmp: Vec<_> = self
+            .counts
+            .iter()
+            .map(|&x| x)
+            .enumerate()
+            .map(|(i, x)| (x, i))
+            .collect();
+        tmp.sort_unstable();
+        let tmp: Vec<_> = tmp.into_iter().rev().map(|x| x.1).collect();
+        self.reorder_by_index_list(&tmp);
     }
 
     pub fn sort_by_bytes(&mut self) {
+        let mut tmp: Vec<_> = self.bytes.iter().map(|&x| x).enumerate().map(|(i, x)| (x, i)).collect();
+        tmp.sort_unstable();
+        let tmp: Vec<_> = tmp.into_iter().rev().map(|x| x.1).collect();
+        self.reorder_by_index_list(&tmp);
+    }
+
+    fn reorder_by_index_list(&mut self, tmp: &[usize]) {
+        self.series = tmp.iter().map(|&x| self.series[x]).collect();
+        self.counts = tmp.iter().map(|&x| self.counts[x]).collect();
+        self.bytes = tmp.iter().map(|&x| self.bytes[x]).collect();
+    }
+
+    fn verify(&self) -> Result<(), Error> {
+        if self.counts.len() != self.series.len() {
+            Err(Error::UsageDataMalformed)
+        } else if self.bytes.len() != self.series.len() {
+            Err(Error::UsageDataMalformed)
+        } else {
+            Ok(())
+        }
     }
 }
 
+pub async fn read_ts(ks: &str, rt: RetentionTime, ts: TsMs, scy: &ScySession) -> Result<UsageData, Error> {
     // TODO toplist::read_ts refactor
-    info!("TODO toplist::read_ts refactor");
-    let snap = EMIT_ACCOUNTING_SNAP.ms() / 1000;
-    info!("ts {ts} snap {snap:?}");
-    let ts = ts / timeunits::SEC / snap * snap;
-    let ret = read_ts_inner(ts, scy).await?;
+    let snap = EMIT_ACCOUNTING_SNAP.ms();
+    let ts = TsMs::from_ms_u64(ts.ms() / snap * snap);
+    let ret = read_ts_inner(ks, rt, ts, scy).await?;
     Ok(ret)
 }
 
+async fn read_ts_inner(ks: &str, rt: RetentionTime, ts: TsMs, scy: &ScySession) -> Result<UsageData, Error> {
     type RowType = (i64, i64, i64);
-    let cql = concat!("select series, count, bytes from lt_account_00 where part = ? and ts = ?");
-    let qu = prep(cql, scy.clone()).await?;
+    let cql = format!(
+        concat!(
+            "select series, count, bytes",
+            " from {}.{}account_00",
+            " where part = ? and ts = ?"
+        ),
+        ks,
+        rt.table_prefix()
+    );
+    let qu = prep(&cql, scy).await?;
+    let ts_sec = ts.ms() as i64 / 1000;
     let mut ret = UsageData::new(ts);
     for part in 0..255_u32 {
         let mut res = scy
-            .execute_iter(qu.clone(), (part as i32, ts as i64))
-            .await
-            .err_conv()?
+            .execute_iter(qu.clone(), (part as i32, ts_sec))
+            .await?
             .into_typed::<RowType>();
         while let Some(row) = res.next().await {
-            let row = row.map_err(Error::from_string)?;
+            let row = row?;
             let series = row.0 as u64;
             let count = row.1 as u64;
             let bytes = row.2 as u64;
-            ret.usage.push((series, count, bytes));
+            ret.series.push(series);
+            ret.counts.push(count);
+            ret.bytes.push(bytes);
         }
     }
+    ret.verify()?;
     Ok(ret)
 }
 
-async fn prep(cql: &str, scy: Arc<ScySession>) -> Result<PreparedStatement, Error> {
-    scy.prepare(cql)
-        .await
-        .map_err(|e| Error::with_msg_no_trace(format!("cql error {e}")))
+async fn prep(cql: &str, scy: &ScySession) -> Result<PreparedStatement, Error> {
+    Ok(scy.prepare(cql).await?)
 }

diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs
index 6132502..ca1c697 100644
--- a/crates/scyllaconn/src/events2/events.rs
+++ b/crates/scyllaconn/src/events2/events.rs
@@ -229,17 +229,15 @@ impl Stream for EventsStreamRt {
                         );
                         let mut r = items_2::merger::Mergeable::new_empty(&item);
                         match items_2::merger::Mergeable::find_highest_index_lt(&item, self.ts_seen_max) {
-                            Some(ix) => {
-                                match items_2::merger::Mergeable::drain_into(&mut item, &mut r, (0, ix)) {
-                                    Ok(()) => {}
-                                    Err(e) => {
-                                        self.state = State::Done;
-                                        break Ready(Some(Err(e.into())));
-                                    }
+                            Some(ix) => match items_2::merger::Mergeable::drain_into(&mut item, &mut r, (0, 1 + ix)) {
+                                Ok(()) => {
+                                    // TODO count for metrics
                                 }
-                                // self.state = State::Done;
-                                // break Ready(Some(Err(Error::Unordered)));
-                            }
+                                Err(e) => {
+                                    self.state = State::Done;
+                                    break Ready(Some(Err(e.into())));
+                                }
+                            },
                             None => {
                                 self.state = State::Done;
                                 break Ready(Some(Err(Error::TruncateLogic)));

diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs
index 99d236d..75d5452 100644
--- a/crates/scyllaconn/src/worker.rs
+++ b/crates/scyllaconn/src/worker.rs
@@ -27,6 +27,7 @@ pub enum Error {
     ChannelSend,
     ChannelRecv,
     Join,
+    Toplist(#[from] crate::accounting::toplist::Error),
 }
 
 #[derive(Debug)]
@@ -40,6 +41,11 @@ enum Job {
         Sender, Error>>,
     ),
     ReadNextValues(ReadNextValues),
+    AccountingReadTs(
+        RetentionTime,
+        TsMs,
+        Sender<Result<UsageData, Error>>,
+    ),
 }
 
 struct ReadNextValues {
@@ -98,6 +104,18 @@ impl ScyllaQueue {
         let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??;
         Ok(res)
     }
+
+    pub async fn accounting_read_ts(
+        &self,
+        rt: RetentionTime,
+        ts: TsMs,
+    ) -> Result<UsageData, Error> {
+        let (tx, rx) = async_channel::bounded(1);
+        let job = Job::AccountingReadTs(rt, ts, tx);
+        self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
+        let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??;
+        Ok(res)
+    }
 }
 
 #[derive(Debug)]
@@ -160,6 +178,17 @@ impl ScyllaWorker {
                 // TODO count for stats
             }
         }
+            Job::AccountingReadTs(rt, ts, tx) => {
+                let ks = match &rt {
+                    RetentionTime::Short => &self.scyconf_st.keyspace,
+                    RetentionTime::Medium => &self.scyconf_mt.keyspace,
+                    RetentionTime::Long => &self.scyconf_lt.keyspace,
+                };
+                let res = crate::accounting::toplist::read_ts(&ks, rt, ts, &scy).await;
+                if tx.send(res.map_err(Into::into)).await.is_err() {
+                    // TODO count for stats
+                }
+            }
         }
     }
 }

diff --git a/crates/taskrun/Cargo.toml b/crates/taskrun/Cargo.toml
index 4bf9336..890647e 100644
--- a/crates/taskrun/Cargo.toml
+++ b/crates/taskrun/Cargo.toml
@@ -8,7 +8,7 @@ edition = "2021"
 path = "src/taskrun.rs"
 
 [dependencies]
-tokio = { version = "1.37.0", features = ["full", "tracing", "time"] }
+tokio = { version = "1.38.0", features = ["full", "tracing", "time"] }
 futures-util = "0.3.28"
 tracing = "0.1.40"
 tracing-log = "0.2.0"
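
Note on the sort helpers in this patch: AccountedIngested and UsageData both keep parallel columns (names/series, counts, bytes), so sort_by_counts and sort_by_bytes compute an index permutation once and then apply it to every column via reorder_by_index_list, keeping rows aligned. A minimal standalone sketch of that argsort pattern, using hypothetical values that are not part of the patch:

    // Pair each count with its index, sort ascending, then reverse the index
    // list to get a descending order; finally reorder every parallel column
    // by the same index list so that rows stay aligned across columns.
    fn main() {
        let mut series: Vec<u64> = vec![10, 11, 12];
        let mut counts: Vec<u64> = vec![5, 9, 1];
        let mut bytes: Vec<u64> = vec![50, 90, 10];
        let mut tmp: Vec<(u64, usize)> = counts.iter().copied().enumerate().map(|(i, x)| (x, i)).collect();
        tmp.sort_unstable();
        let order: Vec<usize> = tmp.into_iter().rev().map(|(_, i)| i).collect();
        series = order.iter().map(|&i| series[i]).collect();
        counts = order.iter().map(|&i| counts[i]).collect();
        bytes = order.iter().map(|&i| bytes[i]).collect();
        assert_eq!(series, vec![11, 10, 12]);
        assert_eq!(counts, vec![9, 5, 1]);
        assert_eq!(bytes, vec![90, 50, 10]);
    }

Sorting (value, index) pairs and reversing keeps the implementation simple; sort_unstable_by_key with std::cmp::Reverse would be an equivalent alternative that avoids the extra reversal pass.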