diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs index 985deb5..dcc6768 100644 --- a/items_0/src/items_0.rs +++ b/items_0/src/items_0.rs @@ -59,6 +59,10 @@ pub trait Empty { fn empty() -> Self; } +pub trait Appendable: Empty + WithLen { + fn push(&mut self, ts: u64, pulse: u64, value: STY); +} + pub trait TypeName { fn type_name(&self) -> String; } @@ -196,3 +200,11 @@ impl PartialEq for Box { Events::partial_eq_dyn(self.as_ref(), other.as_ref()) } } + +pub struct TransformProperties { + pub needs_value: bool, +} + +pub trait TransformStage { + fn query_transform_properties(&self) -> TransformProperties; +} diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 15b9acf..9ed69f7 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -6,6 +6,7 @@ use crate::TimeBinnableType; use crate::TimeBinnableTypeAggregator; use crate::TimeBinner; use err::Error; +use items_0::Appendable; use items_0::scalar_ops::ScalarOps; use items_0::AsAnyMut; use items_0::AsAnyRef; @@ -1153,6 +1154,15 @@ impl items_0::collect_c::Collectable for EventsDim0 { } } +impl Appendable for EventsDim0 +where + STY: ScalarOps, +{ + fn push(&mut self, ts: u64, pulse: u64, value: STY) { + Self::push(self, ts, pulse, value) + } +} + #[cfg(test)] mod test_frame { use super::*; diff --git a/items_2/src/eventsdim1.rs b/items_2/src/eventsdim1.rs index 483ad89..b29c289 100644 --- a/items_2/src/eventsdim1.rs +++ b/items_2/src/eventsdim1.rs @@ -7,6 +7,7 @@ use crate::TimeBinnableTypeAggregator; use crate::TimeBinner; use err::Error; use items_0::scalar_ops::ScalarOps; +use items_0::Appendable; use items_0::AsAnyMut; use items_0::AsAnyRef; use items_0::Empty; @@ -1106,3 +1107,12 @@ impl items_0::collect_c::Collectable for EventsDim1 { Box::new(EventsDim1Collector::::new()) } } + +impl Appendable> for EventsDim1 +where + STY: ScalarOps, +{ + fn push(&mut self, ts: u64, pulse: u64, value: Vec) { + Self::push(self, ts, pulse, value) + } +} diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 3b62388..1dd5986 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -1410,7 +1410,7 @@ impl Iterator for PreBinnedPatchIterator { } } -#[derive(Clone)] +#[derive(Clone, Serialize, Deserialize)] pub struct BinnedGridSpec { bin_t_len: u64, } @@ -1447,7 +1447,7 @@ impl fmt::Debug for BinnedGridSpec { } } -#[derive(Clone, Debug)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct BinnedRange { grid_spec: BinnedGridSpec, offset: u64, diff --git a/netpod/src/transform.rs b/netpod/src/transform.rs index a56ef19..b8c6bf1 100644 --- a/netpod/src/transform.rs +++ b/netpod/src/transform.rs @@ -1,12 +1,36 @@ use crate::get_url_query_pairs; +use crate::log::*; use crate::AppendToUrl; +use crate::BinnedRange; use crate::FromUrl; +use crate::NanoRange; +use err::Error; use serde::Deserialize; use serde::Serialize; +use std::collections::BTreeMap; +use url::Url; -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum EventTransform { + EventBlobsVerbatim, + EventBlobsUncompressed, + ValueFull, + ArrayPick(usize), + MinMaxAvgDev, + PulseIdDiff, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum TimeBinningTransform { + None, + TimeWeighted(BinnedRange), + Unweighted(BinnedRange), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct Transform { - array_pick: Option, + event: EventTransform, + time_binning: TimeBinningTransform, } impl Transform { @@ -16,31 +40,81 @@ impl Transform { } impl FromUrl for Transform { - fn from_url(url: &url::Url) -> Result { + fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); Self::from_pairs(&pairs) } - fn from_pairs(pairs: &std::collections::BTreeMap) -> Result { + fn from_pairs(pairs: &BTreeMap) -> Result { let upre = Self::url_prefix(); - let ret = Self { - array_pick: pairs + let key = "binningScheme"; + if let Some(s) = pairs.get(key) { + let ret = if s == "eventBlobs" { + Transform { + event: EventTransform::EventBlobsVerbatim, + time_binning: TimeBinningTransform::None, + } + } else if s == "fullValue" { + Transform { + event: EventTransform::ValueFull, + time_binning: TimeBinningTransform::None, + } + } else if s == "timeWeightedScalar" { + Transform { + event: EventTransform::MinMaxAvgDev, + time_binning: TimeBinningTransform::TimeWeighted(BinnedRange::covering_range( + NanoRange { + beg: 20000000000, + end: 30000000000, + }, + 20, + )?), + } + } else if s == "unweightedScalar" { + Transform { + event: EventTransform::EventBlobsVerbatim, + time_binning: TimeBinningTransform::None, + } + } else if s == "binnedX" { + let _u: usize = pairs.get("binnedXcount").map_or("1", |k| k).parse()?; + warn!("TODO binnedXcount"); + Transform { + event: EventTransform::MinMaxAvgDev, + time_binning: TimeBinningTransform::None, + } + } else if s == "pulseIdDiff" { + Transform { + event: EventTransform::PulseIdDiff, + time_binning: TimeBinningTransform::None, + } + } else { + return Err(Error::with_msg("can not extract binningScheme")); + }; + Ok(ret) + } else { + // TODO add option to pick from array. + let _pick = pairs .get(&format!("{}ArrayPick", upre)) .map(|x| match x.parse::() { Ok(n) => Some(n), Err(_) => None, }) - .unwrap_or(None), - }; - Ok(ret) + .unwrap_or(None); + let ret = Transform { + event: EventTransform::EventBlobsVerbatim, + time_binning: TimeBinningTransform::None, + }; + Ok(ret) + } } } impl AppendToUrl for Transform { - fn append_to_url(&self, url: &mut url::Url) { + fn append_to_url(&self, url: &mut Url) { + warn!("TODO AppendToUrl for Transform"); let upre = Self::url_prefix(); let mut g = url.query_pairs_mut(); - if let Some(x) = &self.array_pick { + if let Some(x) = &Some(123) { g.append_pair(&format!("{}ArrayPick", upre), &format!("{}", x)); } } diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index ea8b05e..487cb0a 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -108,6 +108,11 @@ async fn make_channel_events_stream( let scalar_type = f.scalar_type; let shape = f.shape; let do_test_stream_error = false; + let with_values = if let AggKind::PulseIdDiff = evq.agg_kind_value() { + false + } else { + true + }; debug!("Make EventsStreamScylla for {series} {scalar_type:?} {shape:?}"); let stream = scyllaconn::events::EventsStreamScylla::new( series, @@ -115,6 +120,7 @@ async fn make_channel_events_stream( do_one_before_range, scalar_type, shape, + with_values, scy, do_test_stream_error, ); diff --git a/scyllaconn/src/bincache.rs b/scyllaconn/src/bincache.rs index 5d00c32..248d995 100644 --- a/scyllaconn/src/bincache.rs +++ b/scyllaconn/src/bincache.rs @@ -391,6 +391,7 @@ pub async fn fetch_uncached_binned_events( do_one_before_range, chn.scalar_type.clone(), chn.shape.clone(), + true, scy, false, ); diff --git a/scyllaconn/src/errconv.rs b/scyllaconn/src/errconv.rs index 887f554..9841060 100644 --- a/scyllaconn/src/errconv.rs +++ b/scyllaconn/src/errconv.rs @@ -1,6 +1,8 @@ use err::Error; use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError; -use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError}; +use scylla::transport::errors::NewSessionError as ScyNewSessionError; +use scylla::transport::errors::QueryError as ScyQueryError; +use scylla::transport::query_result::RowsExpectedError; pub trait ErrConv { fn err_conv(self) -> Result; @@ -49,3 +51,12 @@ impl ErrConv for Result { } } } + +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} diff --git a/scyllaconn/src/events.rs b/scyllaconn/src/events.rs index d33246a..b6a88d5 100644 --- a/scyllaconn/src/events.rs +++ b/scyllaconn/src/events.rs @@ -3,6 +3,8 @@ use err::Error; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; +use items_0::scalar_ops::ScalarOps; +use items_0::Appendable; use items_0::Empty; use items_0::Events; use items_0::WithLen; @@ -55,260 +57,220 @@ async fn find_ts_msp( Ok((ret1, ret2)) } -macro_rules! read_next_scalar_values { - ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => { - async fn $fname( - series: u64, - ts_msp: u64, - range: NanoRange, - fwd: bool, - scy: Arc, - ) -> Result, Error> { - type ST = $st; - type SCYTY = $scyty; - if range.end > i64::MAX as u64 { - return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); +trait ValTy: Sized { + type ScaTy: ScalarOps + std::default::Default; + type ScyTy: scylla::cql_to_rust::FromCqlVal; + type Container: Events + Appendable; + fn from_scyty(inp: Self::ScyTy) -> Self; + fn table_name() -> &'static str; + fn default() -> Self; +} + +macro_rules! impl_scaty_scalar { + ($st:ty, $st_scy:ty, $table_name:expr) => { + impl ValTy for $st { + type ScaTy = $st; + type ScyTy = $st_scy; + type Container = EventsDim0; + fn from_scyty(inp: Self::ScyTy) -> Self { + inp as Self + } + fn table_name() -> &'static str { + $table_name + } + fn default() -> Self { + ::default() } - let ret = if fwd { - let ts_lsp_min = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; - let ts_lsp_max = if ts_msp < range.end { range.end - ts_msp } else { 0 }; - trace!( - "FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} beg {} end {} {}", - ts_msp, - ts_lsp_min, - ts_lsp_max, - range.beg, - range.end, - stringify!($table_name) - ); - // TODO use prepared! - let cql = concat!( - "select ts_lsp, pulse, value from ", - $table_name, - " where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?" - ); - let res = scy - .query( - cql, - (series as i64, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64), - ) - .await - .err_conv()?; - let mut last_before = None; - let mut ret = EventsDim0::::empty(); - for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() { - let row = row.err_conv()?; - let ts = ts_msp + row.0 as u64; - let pulse = row.1 as u64; - let value = row.2 as ST; - if ts >= range.end { - // TODO count as logic error - error!("ts >= range.end"); - } else if ts >= range.beg { - ret.push(ts, pulse, value); - } else { - if last_before.is_none() { - warn!("encounter event before range in forward read {ts}"); - } - last_before = Some((ts, pulse, value)); - } - } - ret - } else { - let ts_lsp_max = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; - trace!( - "BCK ts_msp {} ts_lsp_max {} beg {} end {} {}", - ts_msp, - ts_lsp_max, - range.beg, - range.end, - stringify!($table_name) - ); - // TODO use prepared! - let cql = concat!( - "select ts_lsp, pulse, value from ", - $table_name, - " where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1" - ); - let res = scy - .query(cql, (series as i64, ts_msp as i64, ts_lsp_max as i64)) - .await - .err_conv()?; - let mut seen_before = false; - let mut ret = EventsDim0::::empty(); - for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() { - let row = row.err_conv()?; - let ts = ts_msp + row.0 as u64; - let pulse = row.1 as u64; - let value = row.2 as ST; - if ts >= range.beg { - // TODO count as logic error - error!("ts >= range.beg"); - } else if ts < range.beg { - ret.push(ts, pulse, value); - } else { - seen_before = true; - } - } - let _ = seen_before; - if ret.len() > 1 { - error!("multiple events in backwards search {}", ret.len()); - } - ret - }; - trace!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp); - Ok(ret) } }; } -macro_rules! read_next_array_values { - ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => { - async fn $fname( - series: u64, - ts_msp: u64, - range: NanoRange, - fwd: bool, - scy: Arc, - ) -> Result, Error> { - type ST = $st; - type SCYTY = $scyty; - if range.end > i64::MAX as u64 { - return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); +macro_rules! impl_scaty_array { + ($vt:ty, $st:ty, $st_scy:ty, $table_name:expr) => { + impl ValTy for $vt { + type ScaTy = $st; + type ScyTy = $st_scy; + type Container = EventsDim1; + fn from_scyty(inp: Self::ScyTy) -> Self { + inp.into_iter().map(|x| x as Self::ScaTy).collect() + } + fn table_name() -> &'static str { + $table_name + } + fn default() -> Self { + Vec::new() } - let ret = if fwd { - let ts_lsp_min = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; - let ts_lsp_max = if ts_msp < range.end { range.end - ts_msp } else { 0 }; - trace!( - "FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} beg {} end {} {}", - ts_msp, - ts_lsp_min, - ts_lsp_max, - range.beg, - range.end, - stringify!($table_name) - ); - // TODO use prepared! - let cql = concat!( - "select ts_lsp, pulse, value from ", - $table_name, - " where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?" - ); - let res = scy - .query( - cql, - (series as i64, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64), - ) - .await - .err_conv()?; - let mut last_before = None; - let mut ret = EventsDim1::::empty(); - for row in res.rows_typed_or_empty::<(i64, i64, Vec)>() { - let row = row.err_conv()?; - let ts = ts_msp + row.0 as u64; - let pulse = row.1 as u64; - let value = row.2.into_iter().map(|x| x as ST).collect(); - if ts >= range.end { - // TODO count as logic error - error!("ts >= range.end"); - } else if ts >= range.beg { - ret.push(ts, pulse, value); - } else { - if last_before.is_none() { - warn!("encounter event before range in forward read {ts}"); - } - last_before = Some((ts, pulse, value)); - } - } - ret - } else { - let ts_lsp_max = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; - trace!( - "BCK {} ts_msp {} ts_lsp_max {} beg {} end {} {}", - stringify!($fname), - ts_msp, - ts_lsp_max, - range.beg, - range.end, - stringify!($table_name) - ); - // TODO use prepared! - let cql = concat!( - "select ts_lsp, pulse, value from ", - $table_name, - " where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1" - ); - let res = scy - .query(cql, (series as i64, ts_msp as i64, ts_lsp_max as i64)) - .await - .err_conv()?; - let mut seen_before = false; - let mut ret = EventsDim1::::empty(); - for row in res.rows_typed_or_empty::<(i64, i64, Vec)>() { - let row = row.err_conv()?; - let ts = ts_msp + row.0 as u64; - let pulse = row.1 as u64; - let value = row.2.into_iter().map(|x| x as ST).collect(); - if ts >= range.beg { - // TODO count as logic error - error!("ts >= range.beg"); - } else if ts < range.beg { - ret.push(ts, pulse, value); - } else { - seen_before = true; - } - } - let _ = seen_before; - if ret.len() > 1 { - error!("multiple events in backwards search {}", ret.len()); - } - ret - }; - trace!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp); - Ok(ret) } }; } -read_next_scalar_values!(read_next_values_scalar_u8, u8, i8, "events_scalar_u8"); -read_next_scalar_values!(read_next_values_scalar_u16, u16, i16, "events_scalar_u16"); -read_next_scalar_values!(read_next_values_scalar_u32, u32, i32, "events_scalar_u32"); -read_next_scalar_values!(read_next_values_scalar_u64, u64, i64, "events_scalar_u64"); -read_next_scalar_values!(read_next_values_scalar_i8, i8, i8, "events_scalar_i8"); -read_next_scalar_values!(read_next_values_scalar_i16, i16, i16, "events_scalar_i16"); -read_next_scalar_values!(read_next_values_scalar_i32, i32, i32, "events_scalar_i32"); -read_next_scalar_values!(read_next_values_scalar_i64, i64, i64, "events_scalar_i64"); -read_next_scalar_values!(read_next_values_scalar_f32, f32, f32, "events_scalar_f32"); -read_next_scalar_values!(read_next_values_scalar_f64, f64, f64, "events_scalar_f64"); -read_next_scalar_values!(read_next_values_scalar_bool, bool, bool, "events_scalar_bool"); +impl_scaty_scalar!(u8, i8, "events_scalar_u8"); +impl_scaty_scalar!(u16, i16, "events_scalar_u16"); +impl_scaty_scalar!(u32, i32, "events_scalar_u32"); +impl_scaty_scalar!(u64, i64, "events_scalar_u64"); +impl_scaty_scalar!(i8, i8, "events_scalar_i8"); +impl_scaty_scalar!(i16, i16, "events_scalar_i16"); +impl_scaty_scalar!(i32, i32, "events_scalar_i32"); +impl_scaty_scalar!(i64, i64, "events_scalar_i64"); +impl_scaty_scalar!(f32, f32, "events_scalar_f32"); +impl_scaty_scalar!(f64, f64, "events_scalar_f64"); +impl_scaty_scalar!(bool, bool, "events_scalar_bool"); -read_next_array_values!(read_next_values_array_u8, u8, i8, "events_array_u8"); -read_next_array_values!(read_next_values_array_u16, u16, i16, "events_array_u16"); -read_next_array_values!(read_next_values_array_u32, u32, i32, "events_array_u32"); -read_next_array_values!(read_next_values_array_u64, u64, i64, "events_array_u64"); -read_next_array_values!(read_next_values_array_i8, i8, i8, "events_array_i8"); -read_next_array_values!(read_next_values_array_i16, i16, i16, "events_array_i16"); -read_next_array_values!(read_next_values_array_i32, i32, i32, "events_array_i32"); -read_next_array_values!(read_next_values_array_i64, i64, i64, "events_array_i64"); -read_next_array_values!(read_next_values_array_f32, f32, f32, "events_array_f32"); -read_next_array_values!(read_next_values_array_f64, f64, f64, "events_array_f64"); -read_next_array_values!(read_next_values_array_bool, bool, bool, "events_array_bool"); +impl_scaty_array!(Vec, u8, Vec, "events_array_u8"); +impl_scaty_array!(Vec, u16, Vec, "events_array_u16"); +impl_scaty_array!(Vec, u32, Vec, "events_array_u32"); +impl_scaty_array!(Vec, u64, Vec, "events_array_u64"); +impl_scaty_array!(Vec, i8, Vec, "events_array_i8"); +impl_scaty_array!(Vec, i16, Vec, "events_array_i16"); +impl_scaty_array!(Vec, i32, Vec, "events_array_i32"); +impl_scaty_array!(Vec, i64, Vec, "events_array_i64"); +impl_scaty_array!(Vec, f32, Vec, "events_array_f32"); +impl_scaty_array!(Vec, f64, Vec, "events_array_f64"); +impl_scaty_array!(Vec, bool, Vec, "events_array_bool"); -macro_rules! read_values { - ($fname:ident, $st:ty, $scyty:ty, $self:expr, $ts_msp:expr) => {{ - let fut = $fname::<$st, $scyty>($self.series, $ts_msp, $self.range.clone(), $self.fwd, $self.scy.clone()); - let fut = fut.map(|x| match x { - Ok(k) => { - let self_name = std::any::type_name::(); - trace!("{self_name} read values len {}", k.len()); - let b = Box::new(k) as Box; - Ok(b) +struct ReadNextValuesOpts { + series: u64, + ts_msp: u64, + range: NanoRange, + fwd: bool, + with_values: bool, + scy: Arc, +} + +async fn read_next_values(opts: ReadNextValuesOpts) -> Result, Error> +where + ST: ValTy, +{ + let series = opts.series; + let ts_msp = opts.ts_msp; + let range = opts.range; + let fwd = opts.fwd; + let scy = opts.scy; + let table_name = ST::table_name(); + if range.end > i64::MAX as u64 { + return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); + } + let cql_fields = if opts.with_values { + "ts_lsp, pulse, value" + } else { + "ts_lsp, pulse" + }; + let ret = if fwd { + let ts_lsp_min = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; + let ts_lsp_max = if ts_msp < range.end { range.end - ts_msp } else { 0 }; + trace!( + "FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} beg {} end {} {}", + ts_msp, + ts_lsp_min, + ts_lsp_max, + range.beg, + range.end, + table_name, + ); + // TODO use prepared! + let cql = format!( + concat!( + "select {} from {}", + " where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?" + ), + cql_fields, table_name, + ); + let res = scy + .query( + cql, + (series as i64, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64), + ) + .await + .err_conv()?; + let mut last_before = None; + let mut ret = ST::Container::empty(); + for row in res.rows().err_conv()? { + let (ts, pulse, value) = if opts.with_values { + let row: (i64, i64, ST::ScyTy) = row.into_typed().err_conv()?; + let ts = ts_msp + row.0 as u64; + let pulse = row.1 as u64; + let value = ValTy::from_scyty(row.2); + (ts, pulse, value) + } else { + let row: (i64, i64) = row.into_typed().err_conv()?; + let ts = ts_msp + row.0 as u64; + let pulse = row.1 as u64; + let value = ValTy::default(); + (ts, pulse, value) + }; + if ts >= range.end { + // TODO count as logic error + error!("ts >= range.end"); + } else if ts >= range.beg { + if pulse % 27 != 3618 { + ret.push(ts, pulse, value); + } + } else { + if last_before.is_none() { + warn!("encounter event before range in forward read {ts}"); + } + last_before = Some((ts, pulse, value)); } - Err(e) => Err(e), - }); - let fut = Box::pin(fut) as Pin, Error>> + Send>>; - fut - }}; + } + ret + } else { + let ts_lsp_max = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; + trace!( + "BCK ts_msp {} ts_lsp_max {} beg {} end {} {}", + ts_msp, + ts_lsp_max, + range.beg, + range.end, + table_name, + ); + // TODO use prepared! + let cql = format!( + concat!( + "select {} from {}", + " where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1" + ), + cql_fields, table_name, + ); + let res = scy + .query(cql, (series as i64, ts_msp as i64, ts_lsp_max as i64)) + .await + .err_conv()?; + let mut seen_before = false; + let mut ret = ST::Container::empty(); + for row in res.rows().err_conv()? { + let (ts, pulse, value) = if opts.with_values { + let row: (i64, i64, ST::ScyTy) = row.into_typed().err_conv()?; + let ts = ts_msp + row.0 as u64; + let pulse = row.1 as u64; + let value = ValTy::from_scyty(row.2); + (ts, pulse, value) + } else { + let row: (i64, i64) = row.into_typed().err_conv()?; + let ts = ts_msp + row.0 as u64; + let pulse = row.1 as u64; + let value = ValTy::default(); + (ts, pulse, value) + }; + if ts >= range.beg { + // TODO count as logic error + error!("ts >= range.beg"); + } else if ts < range.beg { + if pulse % 27 != 3618 { + ret.push(ts, pulse, value); + } + } else { + seen_before = true; + } + } + let _ = seen_before; + if ret.len() > 1 { + error!("multiple events in backwards search {}", ret.len()); + } + ret + }; + trace!("read ts_msp {} len {}", ts_msp, ret.len()); + let ret = Box::new(ret); + Ok(ret) } struct ReadValues { @@ -318,6 +280,7 @@ struct ReadValues { range: NanoRange, ts_msps: VecDeque, fwd: bool, + with_values: bool, fut: Pin, Error>> + Send>>, fut_done: bool, scy: Arc, @@ -331,6 +294,7 @@ impl ReadValues { range: NanoRange, ts_msps: VecDeque, fwd: bool, + with_values: bool, scy: Arc, ) -> Self { let mut ret = Self { @@ -340,6 +304,7 @@ impl ReadValues { range, ts_msps, fwd, + with_values, fut: Box::pin(futures_util::future::ready(Err(Error::with_msg_no_trace( "future not initialized", )))), @@ -361,92 +326,59 @@ impl ReadValues { } fn make_fut(&mut self, ts_msp: u64) -> Pin, Error>> + Send>> { - let fut = match &self.shape { - Shape::Scalar => match &self.scalar_type { - ScalarType::U8 => { - read_values!(read_next_values_scalar_u8, u8, i8, self, ts_msp) - } - ScalarType::U16 => { - read_values!(read_next_values_scalar_u16, u16, i16, self, ts_msp) - } - ScalarType::U32 => { - read_values!(read_next_values_scalar_u32, u32, i32, self, ts_msp) - } - ScalarType::U64 => { - read_values!(read_next_values_scalar_u64, u64, i64, self, ts_msp) - } - ScalarType::I8 => { - read_values!(read_next_values_scalar_i8, i8, i8, self, ts_msp) - } - ScalarType::I16 => { - read_values!(read_next_values_scalar_i16, i16, i16, self, ts_msp) - } - ScalarType::I32 => { - read_values!(read_next_values_scalar_i32, i32, i32, self, ts_msp) - } - ScalarType::I64 => { - read_values!(read_next_values_scalar_i64, i64, i64, self, ts_msp) - } - ScalarType::F32 => { - read_values!(read_next_values_scalar_f32, f32, f32, self, ts_msp) - } - ScalarType::F64 => { - read_values!(read_next_values_scalar_f64, f64, f64, self, ts_msp) - } - ScalarType::BOOL => { - read_values!(read_next_values_scalar_bool, bool, bool, self, ts_msp) - } + let opts = ReadNextValuesOpts { + series: self.series.clone(), + ts_msp, + range: self.range.clone(), + fwd: self.fwd, + with_values: self.with_values, + scy: self.scy.clone(), + }; + let scalar_type = self.scalar_type.clone(); + let shape = self.shape.clone(); + let fut = async move { + match &shape { + Shape::Scalar => match &scalar_type { + ScalarType::U8 => read_next_values::(opts).await, + ScalarType::U16 => read_next_values::(opts).await, + ScalarType::U32 => read_next_values::(opts).await, + ScalarType::U64 => read_next_values::(opts).await, + ScalarType::I8 => read_next_values::(opts).await, + ScalarType::I16 => read_next_values::(opts).await, + ScalarType::I32 => read_next_values::(opts).await, + ScalarType::I64 => read_next_values::(opts).await, + ScalarType::F32 => read_next_values::(opts).await, + ScalarType::F64 => read_next_values::(opts).await, + ScalarType::BOOL => read_next_values::(opts).await, + _ => { + error!("TODO ReadValues add more types"); + err::todoval() + } + }, + Shape::Wave(_) => match &scalar_type { + ScalarType::U8 => read_next_values::>(opts).await, + ScalarType::U16 => read_next_values::>(opts).await, + ScalarType::U32 => read_next_values::>(opts).await, + ScalarType::U64 => read_next_values::>(opts).await, + ScalarType::I8 => read_next_values::>(opts).await, + ScalarType::I16 => read_next_values::>(opts).await, + ScalarType::I32 => read_next_values::>(opts).await, + ScalarType::I64 => read_next_values::>(opts).await, + ScalarType::F32 => read_next_values::>(opts).await, + ScalarType::F64 => read_next_values::>(opts).await, + ScalarType::BOOL => read_next_values::>(opts).await, + _ => { + error!("TODO ReadValues add more types"); + err::todoval() + } + }, _ => { error!("TODO ReadValues add more types"); err::todoval() } - }, - Shape::Wave(_) => match &self.scalar_type { - ScalarType::U8 => { - read_values!(read_next_values_array_u8, u8, i8, self, ts_msp) - } - ScalarType::U16 => { - read_values!(read_next_values_array_u16, u16, i16, self, ts_msp) - } - ScalarType::U32 => { - read_values!(read_next_values_array_u32, u32, i32, self, ts_msp) - } - ScalarType::U64 => { - read_values!(read_next_values_array_u64, u64, i64, self, ts_msp) - } - ScalarType::I8 => { - read_values!(read_next_values_array_i8, i8, i8, self, ts_msp) - } - ScalarType::I16 => { - read_values!(read_next_values_array_i16, i16, i16, self, ts_msp) - } - ScalarType::I32 => { - read_values!(read_next_values_array_i32, i32, i32, self, ts_msp) - } - ScalarType::I64 => { - read_values!(read_next_values_array_i64, i64, i64, self, ts_msp) - } - ScalarType::F32 => { - read_values!(read_next_values_array_f32, f32, f32, self, ts_msp) - } - ScalarType::F64 => { - read_values!(read_next_values_array_f64, f64, f64, self, ts_msp) - } - ScalarType::BOOL => { - info!("attempt to read bool"); - read_values!(read_next_values_array_bool, bool, bool, self, ts_msp) - } - _ => { - error!("TODO ReadValues add more types"); - err::todoval() - } - }, - _ => { - error!("TODO ReadValues add more types"); - err::todoval() } }; - fut + Box::pin(fut) } } @@ -472,6 +404,7 @@ pub struct EventsStreamScylla { scy: Arc, do_test_stream_error: bool, found_one_after: bool, + with_values: bool, outqueue: VecDeque>, } @@ -482,6 +415,7 @@ impl EventsStreamScylla { do_one_before_range: bool, scalar_type: ScalarType, shape: Shape, + with_values: bool, scy: Arc, do_test_stream_error: bool, ) -> Self { @@ -497,6 +431,7 @@ impl EventsStreamScylla { scy, do_test_stream_error, found_one_after: false, + with_values, outqueue: VecDeque::new(), } } @@ -524,6 +459,7 @@ impl EventsStreamScylla { self.range.clone(), [msp].into(), false, + self.with_values, self.scy.clone(), ); self.state = FrState::ReadBack1(st); @@ -536,6 +472,7 @@ impl EventsStreamScylla { self.range.clone(), mem::replace(&mut self.ts_msp_fwd, VecDeque::new()), true, + self.with_values, self.scy.clone(), ); self.state = FrState::ReadValues(st); @@ -556,6 +493,7 @@ impl EventsStreamScylla { self.range.clone(), mem::replace(&mut self.ts_msp_fwd, VecDeque::new()), true, + self.with_values, self.scy.clone(), ); self.state = FrState::ReadValues(st); @@ -572,6 +510,7 @@ impl EventsStreamScylla { self.range.clone(), [msp].into(), false, + self.with_values, self.scy.clone(), ); self.state = FrState::ReadBack2(st); @@ -584,6 +523,7 @@ impl EventsStreamScylla { self.range.clone(), mem::replace(&mut self.ts_msp_fwd, VecDeque::new()), true, + self.with_values, self.scy.clone(), ); self.state = FrState::ReadValues(st); @@ -607,6 +547,7 @@ impl EventsStreamScylla { self.range.clone(), mem::replace(&mut self.ts_msp_fwd, VecDeque::new()), true, + self.with_values, self.scy.clone(), ); self.state = FrState::ReadValues(st);