From 53dc4b7361cb1290e304ca06516d4d5f1237e078 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 11 Dec 2024 16:57:20 +0100 Subject: [PATCH] Rework support for vec string --- crates/dbconn/Cargo.toml | 1 + crates/dbconn/src/channelinfo.rs | 16 +- crates/httpret/src/api4/accounting.rs | 145 +++++++----- crates/httpret/src/api4/binned.rs | 63 +++++- crates/scyllaconn/src/events.rs | 306 +++++--------------------- 5 files changed, 213 insertions(+), 318 deletions(-) diff --git a/crates/dbconn/Cargo.toml b/crates/dbconn/Cargo.toml index b74d00a..7d668f8 100644 --- a/crates/dbconn/Cargo.toml +++ b/crates/dbconn/Cargo.toml @@ -20,6 +20,7 @@ pin-project = "1" async-channel = "1.9.0" chrono = "0.4.38" regex = "1.10.4" +autoerr = "0.0.3" daqbuf-err = { path = "../../../daqbuf-err" } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } parse = { path = "../../../daqbuf-parse", package = "daqbuf-parse" } diff --git a/crates/dbconn/src/channelinfo.rs b/crates/dbconn/src/channelinfo.rs index 43dfe8a..7e30814 100644 --- a/crates/dbconn/src/channelinfo.rs +++ b/crates/dbconn/src/channelinfo.rs @@ -1,16 +1,14 @@ -use daqbuf_err as err; -use err::thiserror; -use err::ThisError; use netpod::ScalarType; use netpod::Shape; use tokio_postgres::Client; -#[derive(Debug, ThisError)] -#[cstm(name = "ChannelInfo")] -pub enum Error { - Pg(#[from] crate::pg::Error), - BadValue, -} +autoerr::create_error_v1!( + name(Error, "ChannelInfo"), + enum variants { + Pg(#[from] crate::pg::Error), + BadValue, + }, +); pub struct ChannelInfo { pub series: u64, diff --git a/crates/httpret/src/api4/accounting.rs b/crates/httpret/src/api4/accounting.rs index 2ded292..e7b5dd2 100644 --- a/crates/httpret/src/api4/accounting.rs +++ b/crates/httpret/src/api4/accounting.rs @@ -20,6 +20,7 @@ use netpod::req_uri_to_url; use netpod::ttl::RetentionTime; use netpod::FromUrl; use netpod::NodeConfigCached; +use netpod::ScalarType; use netpod::Shape; use netpod::TsMs; use query::api4::AccountingIngestedBytesQuery; @@ -28,6 +29,71 @@ use scyllaconn::accounting::toplist::UsageData; use serde::Deserialize; use serde::Serialize; +#[derive(Debug, Serialize, Deserialize)] +pub struct AccountedIngested { + names: Vec, + counts: Vec, + bytes: Vec, + scalar_types: Vec, + shapes: Vec, +} + +impl AccountedIngested { + fn new() -> Self { + Self { + names: Vec::new(), + counts: Vec::new(), + bytes: Vec::new(), + scalar_types: Vec::new(), + shapes: Vec::new(), + } + } + + fn push(&mut self, name: String, counts: u64, bytes: u64, scalar_type: ScalarType, shape: Shape) { + self.names.push(name); + self.counts.push(counts); + self.bytes.push(bytes); + self.scalar_types.push(scalar_type); + self.shapes.push(shape); + } + + fn sort_by_counts(&mut self) { + let mut tmp: Vec<_> = self + .counts + .iter() + .map(|&x| x) + .enumerate() + .map(|(i, x)| (x, i)) + .collect(); + tmp.sort_unstable(); + let tmp: Vec<_> = tmp.into_iter().rev().map(|x| x.1).collect(); + self.reorder_by_index_list(&tmp); + } + + fn sort_by_bytes(&mut self) { + let mut tmp: Vec<_> = self.bytes.iter().map(|&x| x).enumerate().map(|(i, x)| (x, i)).collect(); + tmp.sort_unstable(); + let tmp: Vec<_> = tmp.into_iter().rev().map(|x| x.1).collect(); + self.reorder_by_index_list(&tmp); + } + + fn reorder_by_index_list(&mut self, tmp: &[usize]) { + self.names = tmp.iter().map(|&x| self.names[x].clone()).collect(); + self.counts = tmp.iter().map(|&x| self.counts[x]).collect(); + self.bytes = tmp.iter().map(|&x| self.bytes[x]).collect(); + self.scalar_types = tmp.iter().map(|&x| self.scalar_types[x].clone()).collect(); + self.shapes = tmp.iter().map(|&x| self.shapes[x].clone()).collect(); + } + + fn truncate(&mut self, len: usize) { + self.names.truncate(len); + self.counts.truncate(len); + self.bytes.truncate(len); + self.scalar_types.truncate(len); + self.shapes.truncate(len); + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct Toplist { dim0: AccountedIngested, @@ -57,61 +123,6 @@ impl Toplist { } } -#[derive(Debug, Serialize, Deserialize)] -pub struct AccountedIngested { - names: Vec, - counts: Vec, - bytes: Vec, -} - -impl AccountedIngested { - fn new() -> Self { - Self { - names: Vec::new(), - counts: Vec::new(), - bytes: Vec::new(), - } - } - - fn push(&mut self, name: String, counts: u64, bytes: u64) { - self.names.push(name); - self.counts.push(counts); - self.bytes.push(bytes); - } - - fn sort_by_counts(&mut self) { - let mut tmp: Vec<_> = self - .counts - .iter() - .map(|&x| x) - .enumerate() - .map(|(i, x)| (x, i)) - .collect(); - tmp.sort_unstable(); - let tmp: Vec<_> = tmp.into_iter().rev().map(|x| x.1).collect(); - self.reorder_by_index_list(&tmp); - } - - fn sort_by_bytes(&mut self) { - let mut tmp: Vec<_> = self.bytes.iter().map(|&x| x).enumerate().map(|(i, x)| (x, i)).collect(); - tmp.sort_unstable(); - let tmp: Vec<_> = tmp.into_iter().rev().map(|x| x.1).collect(); - self.reorder_by_index_list(&tmp); - } - - fn reorder_by_index_list(&mut self, tmp: &[usize]) { - self.names = tmp.iter().map(|&x| self.names[x].clone()).collect(); - self.counts = tmp.iter().map(|&x| self.counts[x]).collect(); - self.bytes = tmp.iter().map(|&x| self.bytes[x]).collect(); - } - - fn truncate(&mut self, len: usize) { - self.names.truncate(len); - self.counts.truncate(len); - self.bytes.truncate(len); - } -} - pub struct AccountingIngested {} impl AccountingIngested { @@ -169,6 +180,12 @@ impl AccountingIngested { for e in res.dim0.bytes { ret.bytes.push(e) } + for e in res.dim0.scalar_types { + ret.scalar_types.push(e) + } + for e in res.dim0.shapes { + ret.shapes.push(e) + } for e in res.dim1.names { ret.names.push(e) } @@ -178,6 +195,12 @@ impl AccountingIngested { for e in res.dim1.bytes { ret.bytes.push(e) } + for e in res.dim1.scalar_types { + ret.scalar_types.push(e) + } + for e in res.dim1.shapes { + ret.shapes.push(e) + } if let Some(sort) = qu.sort() { if sort == "counts" { // ret.sort_by_counts(); @@ -309,17 +332,23 @@ async fn resolve_usages(usage: UsageData, pgqu: &PgQueue) -> Result { ret.scalar_count += 1; - ret.dim0.push(info.name, counts, bytes); + ret.dim0.push(info.name, counts, bytes, info.scalar_type, info.shape); } Shape::Wave(_) => { ret.wave_count += 1; - ret.dim1.push(info.name, counts, bytes); + ret.dim1.push(info.name, counts, bytes, info.scalar_type, info.shape); } Shape::Image(_, _) => {} } } else { ret.infos_missing_count += 1; - ret.dim0.push("UNRESOLVEDSERIES".into(), counts, bytes); + ret.dim0.push( + "UNRESOLVEDSERIES".into(), + counts, + bytes, + ScalarType::BOOL, + Shape::Scalar, + ); } } usage_skip += nn; diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index 45b4cdf..b6c7611 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -1,5 +1,6 @@ use crate::bodystream::response; use crate::channelconfig::ch_conf_from_binned; +use crate::requests::accepts_cbor_framed; use crate::requests::accepts_json_framed; use crate::requests::accepts_json_or_all; use crate::requests::accepts_octets; @@ -26,6 +27,7 @@ use netpod::timeunits::SEC; use netpod::FromUrl; use netpod::NodeConfigCached; use netpod::ReqCtx; +use netpod::APP_CBOR_FRAMED; use netpod::APP_JSON; use netpod::APP_JSON_FRAMED; use netpod::HEADER_NAME_REQUEST_ID; @@ -38,7 +40,6 @@ use std::sync::Arc; use streams::collect::CollectResult; use streams::eventsplainreader::DummyCacheReadProvider; use streams::eventsplainreader::SfDatabufferEventReadProvider; -use streams::lenframe::bytes_chunks_to_len_framed_str; use streams::timebin::cached::reader::EventsReadProvider; use streams::timebin::CacheReadProvider; use tracing::Instrument; @@ -125,7 +126,9 @@ async fn binned( { Err(Error::ServerError)?; } - if accepts_json_framed(req.headers()) { + if accepts_cbor_framed(req.headers()) { + Ok(binned_cbor_framed(url, req, ctx, pgqueue, scyqueue, ncc).await?) + } else if accepts_json_framed(req.headers()) { Ok(binned_json_framed(url, req, ctx, pgqueue, scyqueue, ncc).await?) } else if accepts_json_or_all(req.headers()) { Ok(binned_json_single(url, req, ctx, pgqueue, scyqueue, ncc).await?) @@ -253,7 +256,7 @@ async fn binned_json_framed( let reqid = crate::status_board().map_err(|_e| Error::ServerError)?.new_status_id(); let (_head, _body) = req.into_parts(); let query = BinnedQuery::from_url(&url).map_err(|e| { - error!("binned_json: {e:?}"); + error!("binned_json_framed: {e:?}"); Error::BadQuery(e.to_string()) })?; // TODO handle None case better and return 404 @@ -285,10 +288,62 @@ async fn binned_json_framed( ) .instrument(span1) .await?; - let stream = bytes_chunks_to_len_framed_str(stream); + let stream = streams::lenframe::bytes_chunks_to_len_framed_str(stream); let ret = response(StatusCode::OK) .header(CONTENT_TYPE, APP_JSON_FRAMED) .header(HEADER_NAME_REQUEST_ID, ctx.reqid()) .body(body_stream(stream))?; Ok(ret) } + +async fn binned_cbor_framed( + url: Url, + req: Requ, + ctx: &ReqCtx, + pgqueue: &PgQueue, + scyqueue: Option, + ncc: &NodeConfigCached, +) -> Result { + debug!("binned_cbor_framed {:?}", req); + let reqid = crate::status_board().map_err(|_e| Error::ServerError)?.new_status_id(); + let (_head, _body) = req.into_parts(); + let query = BinnedQuery::from_url(&url).map_err(|e| { + error!("binned_cbor_framed: {e:?}"); + Error::BadQuery(e.to_string()) + })?; + // TODO handle None case better and return 404 + let ch_conf = ch_conf_from_binned(&query, ctx, pgqueue, ncc) + .await? + .ok_or_else(|| Error::ChannelNotFound)?; + let span1 = span!( + Level::INFO, + "httpret::binned_cbor_framed", + reqid, + beg = query.range().beg_u64() / SEC, + end = query.range().end_u64() / SEC, + ch = query.channel().name(), + ); + span1.in_scope(|| { + debug!("begin"); + }); + let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); + let (events_read_provider, cache_read_provider) = + make_read_provider(ch_conf.name(), scyqueue, open_bytes, ctx, ncc); + let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed(); + let stream = streams::timebinnedjson::timebinned_cbor_framed( + query, + ch_conf, + ctx, + cache_read_provider, + events_read_provider, + timeout_provider, + ) + .instrument(span1) + .await?; + let stream = streams::lenframe::bytes_chunks_to_framed(stream); + let ret = response(StatusCode::OK) + .header(CONTENT_TYPE, APP_CBOR_FRAMED) + .header(HEADER_NAME_REQUEST_ID, ctx.reqid()) + .body(body_stream(stream))?; + Ok(ret) +} diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs index 75e9419..f7fc34f 100644 --- a/crates/scyllaconn/src/events.rs +++ b/crates/scyllaconn/src/events.rs @@ -1,14 +1,11 @@ use crate::events2::events::EventReadOpts; use crate::events2::prepare::StmtsEvents; +use crate::log::*; use crate::range::ScyllaSeriesRange; use crate::worker::ScyllaQueue; use core::fmt; -use daqbuf_err as err; use daqbuf_series::SeriesId; -use err::thiserror; -use err::ThisError; use futures_util::Future; -use futures_util::StreamExt; use futures_util::TryStreamExt; use items_0::scalar_ops::ScalarOps; use items_0::timebin::BinningggContainerEventsDyn; @@ -16,13 +13,11 @@ use items_0::Appendable; use items_0::Empty; use items_0::WithLen; use items_2::binning::container_events::ContainerEvents; -use netpod::log::*; use netpod::ttl::RetentionTime; use netpod::DtNano; use netpod::EnumVariant; use netpod::TsMs; use netpod::TsNano; -use scylla::frame::response::result::Row; use scylla::Session; use std::pin::Pin; use std::sync::Arc; @@ -31,23 +26,24 @@ use tracing::Instrument; macro_rules! trace_fetch { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } -#[derive(Debug, ThisError)] -#[cstm(name = "ScyllaReadEvents")] -pub enum Error { - Prepare(#[from] crate::events2::prepare::Error), - ScyllaQuery(#[from] scylla::transport::errors::QueryError), - ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError), - ScyllaTypeConv(#[from] scylla::cql_to_rust::FromRowError), - ScyllaWorker(Box), - ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError), - MissingQuery(String), - NotTokenAware, - RangeEndOverflow, - InvalidFuture, - TestError(String), - Logic, - TodoUnsupported, -} +autoerr::create_error_v1!( + name(Error, "ScyllaReadEvents"), + enum variants { + Prepare(#[from] crate::events2::prepare::Error), + ScyllaQuery(#[from] scylla::transport::errors::QueryError), + ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError), + ScyllaTypeConv(#[from] scylla::cql_to_rust::FromRowError), + ScyllaWorker(Box), + ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError), + MissingQuery(String), + NotTokenAware, + RangeEndOverflow, + InvalidFuture, + TestError(String), + Logic, + TodoUnsupported, + }, +); impl From for Error { fn from(value: crate::worker::Error) -> Self { @@ -58,8 +54,8 @@ impl From for Error { pub(super) trait ValTy: Sized + 'static { type ScaTy: ScalarOps + std::default::Default; type ScyTy: for<'a, 'b> scylla::deserialize::DeserializeValue<'a, 'b>; + type ScyRowTy: for<'a, 'b> scylla::deserialize::DeserializeRow<'a, 'b>; type Container: BinningggContainerEventsDyn + Empty + Appendable; - fn from_scyty(inp: Self::ScyTy) -> Self; fn from_valueblob(inp: Vec) -> Self; fn table_name() -> &'static str; fn default() -> Self; @@ -71,14 +67,7 @@ pub(super) trait ValTy: Sized + 'static { scy: Arc, stmts: Arc, ) -> Pin, ReadJobTrace), Error>> + Send>>; - fn convert_rows( - rows: Vec, - range: ScyllaSeriesRange, - ts_msp: TsMs, - with_values: bool, - bck: bool, - last_before: &mut Option<(TsNano, Self)>, - ) -> Result; + fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self); } macro_rules! impl_scaty_scalar { @@ -86,14 +75,11 @@ macro_rules! impl_scaty_scalar { impl ValTy for $st { type ScaTy = $st; type ScyTy = $st_scy; + type ScyRowTy = (i64, $st_scy); type Container = ContainerEvents; - fn from_scyty(inp: Self::ScyTy) -> Self { - inp as Self - } - fn from_valueblob(_inp: Vec) -> Self { - ::default() + panic!("unused") } fn table_name() -> &'static str { @@ -121,15 +107,9 @@ macro_rules! impl_scaty_scalar { Box::pin(read_next_values_2::(opts, jobtrace, scy, stmts)) } - fn convert_rows( - rows: Vec, - range: ScyllaSeriesRange, - ts_msp: TsMs, - with_values: bool, - bck: bool, - last_before: &mut Option<(TsNano, Self)>, - ) -> Result { - convert_rows_0::(rows, range, ts_msp, with_values, bck, last_before) + fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) { + let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64); + (ts, inp.1 as Self::ScaTy) } } }; @@ -140,12 +120,9 @@ macro_rules! impl_scaty_array { impl ValTy for $vt { type ScaTy = $st; type ScyTy = $st_scy; + type ScyRowTy = (i64, $st_scy); type Container = ContainerEvents>; - fn from_scyty(inp: Self::ScyTy) -> Self { - inp.into_iter().map(|x| x as Self::ScaTy).collect() - } - fn from_valueblob(inp: Vec) -> Self { if inp.len() < 32 { ::default() @@ -188,15 +165,9 @@ macro_rules! impl_scaty_array { Box::pin(read_next_values_2::(opts, jobtrace, scy, stmts)) } - fn convert_rows( - rows: Vec, - range: ScyllaSeriesRange, - ts_msp: TsMs, - with_values: bool, - bck: bool, - last_before: &mut Option<(TsNano, Self)>, - ) -> Result { - convert_rows_0::(rows, range, ts_msp, with_values, bck, last_before) + fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) { + let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64); + (ts, inp.1 .into_iter().map(|x| x as _).collect()) } } }; @@ -205,16 +176,11 @@ macro_rules! impl_scaty_array { impl ValTy for EnumVariant { type ScaTy = EnumVariant; type ScyTy = i16; + type ScyRowTy = (i64, i16, String); type Container = ContainerEvents; - fn from_scyty(inp: Self::ScyTy) -> Self { - let _ = inp; - panic!("uses more specialized impl") - } - - fn from_valueblob(inp: Vec) -> Self { - let _ = inp; - panic!("uses more specialized impl") + fn from_valueblob(_inp: Vec) -> Self { + panic!("unused") } fn table_name() -> &'static str { @@ -242,31 +208,20 @@ impl ValTy for EnumVariant { Box::pin(read_next_values_2::(opts, jobtrace, scy, stmts)) } - fn convert_rows( - rows: Vec, - range: ScyllaSeriesRange, - ts_msp: TsMs, - with_values: bool, - bck: bool, - last_before: &mut Option<(TsNano, Self)>, - ) -> Result { - convert_rows_enum(rows, range, ts_msp, with_values, bck, last_before) + fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) { + let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64); + (ts, EnumVariant::new(inp.1 as u16, inp.2)) } } impl ValTy for Vec { type ScaTy = String; type ScyTy = Vec; + type ScyRowTy = (i64, Vec); type Container = ContainerEvents>; - fn from_scyty(inp: Self::ScyTy) -> Self { - inp - } - - fn from_valueblob(inp: Vec) -> Self { - let _ = inp; - warn!("ValTy::from_valueblob for Vec"); - Vec::new() + fn from_valueblob(_inp: Vec) -> Self { + panic!("unused") } fn table_name() -> &'static str { @@ -295,15 +250,9 @@ impl ValTy for Vec { Box::pin(fut) } - fn convert_rows( - rows: Vec, - range: ScyllaSeriesRange, - ts_msp: TsMs, - with_values: bool, - bck: bool, - last_before: &mut Option<(TsNano, Self)>, - ) -> Result { - convert_rows_0::(rows, range, ts_msp, with_values, bck, last_before) + fn scy_row_to_ts_val(msp: TsMs, inp: Self::ScyRowTy) -> (TsNano, Self) { + let ts = TsNano::from_ns(msp.ns_u64() + inp.0 as u64); + (ts, inp.1) } } @@ -470,9 +419,7 @@ async fn read_next_values_2( where ST: ValTy, { - let use_method_2 = true; - - trace!("read_next_values_2 {:?} st_name {}", opts, ST::st_name()); + trace_fetch!("read_next_values_2 {:?} st_name {}", opts, ST::st_name()); let series = opts.series; let ts_msp = opts.ts_msp; let range = opts.range; @@ -492,7 +439,7 @@ where } else { DtNano::from_ns(0) }; - trace!( + trace_fetch!( "FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} {}", ts_msp.fmt(), ts_lsp_min, @@ -519,19 +466,10 @@ where ts_lsp_min.ns() as i64, ts_lsp_max.ns() as i64, ); - trace!("FWD event search params {:?}", params); + trace_fetch!("FWD event search params {:?}", params); jobtrace.add_event_now(ReadEventKind::CallExecuteIter); - let mut res = scy.execute_iter(qu.clone(), params).await?; - if use_method_2 == false { - // let mut rows = Vec::new(); - // while let Some(x) = res.next().await { - // rows.push(x?); - // } - // let mut last_before = None; - // let ret = ::convert_rows(rows, range, ts_msp, with_values, !opts.fwd, &mut last_before)?; - // ret - todo!() - } else { + let res = scy.execute_iter(qu.clone(), params).await?; + { let mut ret = ::Container::empty(); // TODO must branch already here depending on what input columns we expect if with_values { @@ -545,10 +483,11 @@ where ret } else { let mut i = 0; - let mut it = res.rows_stream::<(i64, ST::ScyTy)>()?; + let mut it = res.rows_stream::<::ScyRowTy>()?; while let Some(row) = it.try_next().await? { - let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - let value = ::from_scyty(row.1); + let (ts, value) = ::scy_row_to_ts_val(ts_msp, row); + // let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + // let value = ::from_scyty(row.1); ret.push(ts, value); i += 1; if i % 2000 == 0 { @@ -576,7 +515,7 @@ where } else { DtNano::from_ns(0) }; - trace!( + trace_fetch!( "BCK ts_msp {} ts_lsp_max {} {}", ts_msp.fmt(), ts_lsp_max, @@ -588,8 +527,8 @@ where .shape(ST::is_valueblob()) .st(ST::st_name())?; let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64); - trace!("BCK event search params {:?}", params); - let mut res = scy.execute_iter(qu.clone(), params).await?; + trace_fetch!("BCK event search params {:?}", params); + let res = scy.execute_iter(qu.clone(), params).await?; { let mut ret = ::Container::empty(); // TODO must branch already here depending on what input columns we expect @@ -604,10 +543,11 @@ where ret } else { let mut i = 0; - let mut it = res.rows_stream::<(i64, ST::ScyTy)>()?; + let mut it = res.rows_stream::<::ScyRowTy>()?; while let Some(row) = it.try_next().await? { - let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - let value = ::from_scyty(row.1); + let (ts, value) = ::scy_row_to_ts_val(ts_msp, row); + // let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + // let value = ::from_scyty(row.1); ret.push(ts, value); i += 1; if i % 2000 == 0 { @@ -629,136 +569,8 @@ where ret } } - // let mut _last_before = None; - // let ret = ST::convert_rows(rows, range, ts_msp, with_values, !opts.fwd, &mut _last_before)?; - // if ret.len() > 1 { - // error!("multiple events in backwards search {}", ret.len()); - // } - // ret }; - trace!("read ts_msp {} len {}", ts_msp.fmt(), ret.len()); + trace_fetch!("read ts_msp {} len {}", ts_msp.fmt(), ret.len()); let ret = Box::new(ret); Ok((ret, jobtrace)) } - -fn convert_rows_0( - rows: Vec, - range: ScyllaSeriesRange, - ts_msp: TsMs, - with_values: bool, - bck: bool, - last_before: &mut Option<(TsNano, ST)>, -) -> Result<::Container, Error> { - todo!() -} - -// fn convert_rows_0( -// rows: Vec, -// range: ScyllaSeriesRange, -// ts_msp: TsMs, -// with_values: bool, -// bck: bool, -// last_before: &mut Option<(TsNano, ST)>, -// ) -> Result<::Container, Error> { -// let mut ret = ::Container::empty(); -// for row in rows { -// let (ts, value) = if with_values { -// if ST::is_valueblob() { -// let row: (i64, Vec) = row.into_typed()?; -// // trace!("read a value blob len {}", row.1.len()); -// let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); -// let value = ValTy::from_valueblob(row.1); -// (ts, value) -// } else { -// let row: (i64, ST::ScyTy) = row.into_typed()?; -// let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); -// let value = ValTy::from_scyty(row.1); -// (ts, value) -// } -// } else { -// let row: (i64,) = row.into_typed()?; -// let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); -// let value = ValTy::default(); -// (ts, value) -// }; -// if bck { -// if ts >= range.beg() { -// // TODO count as logic error -// error!("ts >= range.beg"); -// } else if ts < range.beg() { -// ret.push(ts, value); -// } else { -// *last_before = Some((ts, value)); -// } -// } else { -// if ts >= range.end() { -// // TODO count as logic error -// error!("ts >= range.end"); -// } else if ts >= range.beg() { -// ret.push(ts, value); -// } else { -// if last_before.is_none() { -// warn!("encounter event before range in forward read {ts}"); -// } -// *last_before = Some((ts, value)); -// } -// } -// } -// Ok(ret) -// } - -fn convert_rows_enum( - rows: Vec, - range: ScyllaSeriesRange, - ts_msp: TsMs, - with_values: bool, - bck: bool, - last_before: &mut Option<(TsNano, EnumVariant)>, -) -> Result<::Container, Error> { - let mut ret = ::Container::new(); - trace_fetch!("convert_rows_enum {}", ::st_name()); - for row in rows { - let (ts, value) = if with_values { - if EnumVariant::is_valueblob() { - return Err(Error::Logic); - } else { - let row: (i64, i16, String) = row.into_typed()?; - let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - let val = row.1 as u16; - let valstr = row.2; - let value = EnumVariant::new(val, valstr); - // trace_fetch!("read enum variant {:?} {:?}", value, value.name_string()); - (ts, value) - } - } else { - let row: (i64,) = row.into_typed()?; - let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - let value = ValTy::default(); - (ts, value) - }; - if bck { - if ts >= range.beg() { - // TODO count as logic error - error!("ts >= range.beg"); - } else if ts < range.beg() { - ret.push_back(ts, value); - } else { - *last_before = Some((ts, value)); - } - } else { - if ts >= range.end() { - // TODO count as logic error - error!("ts >= range.end"); - } else if ts >= range.beg() { - ret.push_back(ts, value); - } else { - if last_before.is_none() { - warn!("encounter event before range in forward read {ts}"); - } - *last_before = Some((ts, value)); - } - } - } - trace_fetch!("convert_rows_enum return {:?}", ret); - Ok(ret) -}