diff --git a/dbconn/src/events_scylla.rs b/dbconn/src/events_scylla.rs index c21dc9c..2bbd89d 100644 --- a/dbconn/src/events_scylla.rs +++ b/dbconn/src/events_scylla.rs @@ -7,13 +7,12 @@ use items::{EventsDyn, RangeCompletableItem, Sitemty, StreamItem}; use netpod::log::*; use netpod::query::{ChannelStateEventsQuery, RawEventsQuery}; use netpod::timeunits::DAY; -use netpod::{Channel, Database, NanoRange, ScalarType, ScyllaConfig, Shape}; +use netpod::{Database, NanoRange, ScalarType, ScyllaConfig, Shape}; use scylla::Session as ScySession; use std::collections::VecDeque; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use tokio_postgres::Client as PgClient; macro_rules! read_values { ($fname:ident, $self:expr, $ts_msp:expr) => {{ @@ -359,68 +358,9 @@ impl Stream for EventsStreamScylla { } } -async fn find_series(channel: &Channel, pgclient: Arc) -> Result<(u64, ScalarType, Shape), Error> { - info!("find_series channel {:?}", channel); - let rows = if let Some(series) = channel.series() { - let q = "select series, facility, channel, scalar_type, shape_dims from series_by_channel where series = $1"; - pgclient.query(q, &[&(series as i64)]).await.err_conv()? - } else { - let q = "select series, facility, channel, scalar_type, shape_dims from series_by_channel where facility = $1 and channel = $2"; - pgclient - .query(q, &[&channel.backend(), &channel.name()]) - .await - .err_conv()? - }; - if rows.len() < 1 { - return Err(Error::with_public_msg_no_trace(format!( - "No series found for {channel:?}" - ))); - } - if rows.len() > 1 { - error!("Multiple series found for {channel:?}"); - return Err(Error::with_public_msg_no_trace( - "Multiple series found for channel, can not return data for ambiguous series", - )); - } - let row = rows - .into_iter() - .next() - .ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))?; - let series = row.get::<_, i64>(0) as u64; - let _facility: String = row.get(1); - let _channel: String = row.get(2); - let a: i32 = row.get(3); - let scalar_type = ScalarType::from_scylla_i32(a)?; - let a: Vec = row.get(4); - let shape = Shape::from_scylla_shape_dims(&a)?; - Ok((series, scalar_type, shape)) -} - -async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc) -> Result, Error> { - trace!("find_ts_msp series {} {:?}", series, range); - // TODO use prepared statements - let cql = "select ts_msp from ts_msp where series = ? and ts_msp <= ? order by ts_msp desc limit 2"; - let res = scy.query(cql, (series, range.beg as i64)).await.err_conv()?; - let mut before = VecDeque::new(); - for row in res.rows_typed_or_empty::<(i64,)>() { - let row = row.err_conv()?; - before.push_front(row.0 as u64); - } - let cql = "select ts_msp from ts_msp where series = ? and ts_msp > ? and ts_msp < ?"; - let res = scy - .query(cql, (series, range.beg as i64, range.end as i64)) - .await - .err_conv()?; - let mut ret = VecDeque::new(); - for h in before { - ret.push_back(h); - } - for row in res.rows_typed_or_empty::<(i64,)>() { - let row = row.err_conv()?; - ret.push_back(row.0 as u64); - } - trace!("found in total {} rows", ret.len()); - Ok(ret) +async fn find_ts_msp(_series: i64, _range: NanoRange, _scy: Arc) -> Result, Error> { + // TODO remove + panic!() } macro_rules! read_next_scalar_values { @@ -537,42 +477,13 @@ read_next_scalar_values!(read_next_values_scalar_f64, f64, f64, "events_scalar_f read_next_array_values!(read_next_values_array_u16, u16, i16, "events_wave_u16"); pub async fn make_scylla_stream( - evq: &RawEventsQuery, - scyco: &ScyllaConfig, - dbconf: Database, - do_test_stream_error: bool, + _evq: &RawEventsQuery, + _scyco: &ScyllaConfig, + _dbconf: Database, + _do_test_stream_error: bool, ) -> Result>> + Send>>, Error> { - // TODO should RawEventsQuery already contain ScalarType and Shape? - let (series, scalar_type, shape) = { - let u = { - let d = &dbconf; - format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name) - }; - info!("--------------- open postgres connection"); - let (pgclient, pgconn) = tokio_postgres::connect(&u, tokio_postgres::NoTls).await.err_conv()?; - // TODO use common connection/pool: - tokio::spawn(pgconn); - let pgclient = Arc::new(pgclient); - find_series(&evq.channel, pgclient.clone()).await? - }; - // TODO reuse existing connection: - info!("--------------- open scylla connection"); - let scy = scylla::SessionBuilder::new() - .known_nodes(&scyco.hosts) - .use_keyspace(&scyco.keyspace, true) - .build() - .await - .err_conv()?; - let scy = Arc::new(scy); - let res = Box::pin(EventsStreamScylla::new( - series, - evq, - scalar_type, - shape, - scy, - do_test_stream_error, - )) as _; - Ok(res) + error!("forward call to crate scyllaconn"); + err::todoval() } pub async fn channel_state_events( diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs index 8158d1a..6743f0b 100644 --- a/items_2/src/binsdim0.rs +++ b/items_2/src/binsdim0.rs @@ -581,6 +581,8 @@ impl TimeBinner for BinsDim0TimeBinner { } } } + + fn set_range_complete(&mut self) {} } impl TimeBinned for BinsDim0 { diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 6cf16a4..a608f17 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -32,6 +32,13 @@ impl EventsDim0 { self.pulses.push_back(pulse); self.values.push_back(value); } + + #[inline(always)] + pub fn push_front(&mut self, ts: u64, pulse: u64, value: NTY) { + self.tss.push_front(ts); + self.pulses.push_front(pulse); + self.values.push_front(value); + } } impl Empty for EventsDim0 { @@ -751,4 +758,6 @@ impl TimeBinner for EventsDim0TimeBinner { } } } + + fn set_range_complete(&mut self) {} } diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index ce899b0..976d570 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -23,6 +23,8 @@ use std::time::Instant; use streams::Collectable; use streams::ToJsonResult; +use crate::streams::Collector; + pub fn bool_is_false(x: &bool) -> bool { *x == false } @@ -235,6 +237,8 @@ pub trait TimeBinner: Send { /// to `push_in_progress` did not change the result count, as long as edges are left. /// The next call to `Self::bins_ready_count` must return one higher count than before. fn cycle(&mut self); + + fn set_range_complete(&mut self); } /// Provides a time-binned representation of the implementing type. @@ -630,7 +634,7 @@ impl ChannelEventsMerger { Ready(Some(Ok(k))) => { if let ChannelEvents::Events(events) = &k { if events.len() == 0 { - eprintln!("ERROR bad events item {events:?}"); + warn!("empty events item {events:?}"); } else { trace!("\nrefilled with events {}\nREFILLED\n{:?}\n\n", events.len(), events); } @@ -750,6 +754,7 @@ impl Collectable for Box { } } +// TODO handle status information. pub async fn binned_collected( scalar_type: ScalarType, shape: Shape, @@ -759,8 +764,44 @@ pub async fn binned_collected( inp: Pin> + Send>>, ) -> Result, Error> { let deadline = Instant::now() + timeout; + let mut did_timeout = false; let bin_count_exp = edges.len().max(2) as u32 - 1; let do_time_weight = agg_kind.do_time_weighted(); + // TODO maybe TimeBinner should take all ChannelEvents and handle this? + let mut did_range_complete = false; + fn flush_binned( + binner: &mut Box, + coll: &mut Option>, + bin_count_exp: u32, + force: bool, + ) -> Result<(), Error> { + //info!("bins_ready_count: {}", binner.bins_ready_count()); + if force { + if binner.bins_ready_count() == 0 { + warn!("cycle the binner"); + binner.cycle(); + } else { + warn!("binner was some ready, do nothing"); + } + } + if binner.bins_ready_count() > 0 { + let ready = binner.bins_ready(); + match ready { + Some(mut ready) => { + trace!("binned_collected ready {ready:?}"); + if coll.is_none() { + *coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp)); + } + let cl = coll.as_mut().unwrap(); + cl.ingest(ready.as_collectable_mut()); + Ok(()) + } + None => Err(format!("bins_ready_count but no result").into()), + } + } else { + Ok(()) + } + } let mut coll = None; let mut binner = None; let empty_item = empty_events_dyn_2(&scalar_type, &shape, &AggKind::TimeWeightedScalar); @@ -776,6 +817,7 @@ pub async fn binned_collected( } }, _ = tokio::time::sleep_until(deadline.into()).fuse() => { + did_timeout = true; break; } }; @@ -788,51 +830,31 @@ pub async fn binned_collected( } let binner = binner.as_mut().unwrap(); binner.ingest(events.as_time_binnable()); - trace!("bins_ready_count: {}", binner.bins_ready_count()); - if binner.bins_ready_count() > 0 { - let ready = binner.bins_ready(); - match ready { - Some(mut ready) => { - trace!("binned_collected ready {ready:?}"); - if coll.is_none() { - coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp)); - } - let cl = coll.as_mut().unwrap(); - cl.ingest(ready.as_collectable_mut()); - } - None => { - return Err(format!("bins_ready_count but no result").into()); - } - } - } + flush_binned(binner, &mut coll, bin_count_exp, false)?; } ChannelEvents::Status(_) => { - trace!("binned_collected TODO Status"); + warn!("binned_collected TODO Status"); } ChannelEvents::RangeComplete => { - trace!("binned_collected TODO RangeComplete"); + warn!("binned_collected TODO RangeComplete"); + did_range_complete = true; } } } if let Some(mut binner) = binner { - binner.cycle(); - // TODO merge with the same logic above in the loop. - if binner.bins_ready_count() > 0 { - let ready = binner.bins_ready(); - match ready { - Some(mut ready) => { - trace!("binned_collected ready {ready:?}"); - if coll.is_none() { - coll = Some(ready.as_collectable_mut().new_collector(bin_count_exp)); - } - let cl = coll.as_mut().unwrap(); - cl.ingest(ready.as_collectable_mut()); - } - None => { - return Err(format!("binned_collected bins_ready_count but no result").into()); - } - } + if did_range_complete { + binner.set_range_complete(); } + if !did_timeout { + binner.cycle(); + } + flush_binned(&mut binner, &mut coll, bin_count_exp, false)?; + if coll.is_none() { + warn!("force a bin"); + flush_binned(&mut binner, &mut coll, bin_count_exp, true)?; + } + } else { + error!("no binner, should always have one"); } match coll { Some(mut coll) => { diff --git a/scyllaconn/src/events.rs b/scyllaconn/src/events.rs index 8baf423..4100a7c 100644 --- a/scyllaconn/src/events.rs +++ b/scyllaconn/src/events.rs @@ -14,6 +14,172 @@ use std::sync::Arc; use std::task::{Context, Poll}; use tokio_postgres::Client as PgClient; +async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc) -> Result, Error> { + info!("find_ts_msp series {} {:?}", series, range); + let mut ret = VecDeque::new(); + // TODO use prepared statements + let cql = "select ts_msp from ts_msp where series = ? and ts_msp > ? and ts_msp < ?"; + let res = scy + .query(cql, (series, range.beg as i64, range.end as i64)) + .await + .err_conv()?; + for row in res.rows_typed_or_empty::<(i64,)>() { + let row = row.err_conv()?; + ret.push_back(row.0 as u64); + } + let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? limit 1"; + let res = scy.query(cql, (series, range.end as i64)).await.err_conv()?; + for row in res.rows_typed_or_empty::<(i64,)>() { + let row = row.err_conv()?; + ret.push_back(row.0 as u64); + } + let cql = "select ts_msp from ts_msp where series = ? and ts_msp <= ? order by ts_msp desc limit 2"; + let res = scy.query(cql, (series, range.beg as i64)).await.err_conv()?; + for row in res.rows_typed_or_empty::<(i64,)>() { + let row = row.err_conv()?; + ret.push_front(row.0 as u64); + } + trace!("found in total {} rows", ret.len()); + Ok(ret) +} + +macro_rules! read_next_scalar_values { + ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => { + async fn $fname( + series: i64, + ts_msp: u64, + range: NanoRange, + fwd: bool, + do_one_before: bool, + scy: Arc, + ) -> Result, Error> { + type ST = $st; + type SCYTY = $scyty; + if ts_msp >= range.end { + warn!( + "given ts_msp {} >= range.end {} not necessary to read this", + ts_msp, range.end + ); + } + if range.end > i64::MAX as u64 { + return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); + } + let res = if fwd { + let ts_lsp_min = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; + let ts_lsp_max = if ts_msp < range.end { range.end - ts_msp } else { 0 }; + trace!( + "FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} beg {} end {} {}", + ts_msp, + ts_lsp_min, + ts_lsp_max, + range.beg, + range.end, + stringify!($fname) + ); + // TODO use prepared! + let cql = concat!( + "select ts_lsp, pulse, value from ", + $table_name, + " where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?" + ); + scy.query(cql, (series, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64)) + .await + .err_conv()? + } else { + let ts_lsp_max = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; + info!( + "BCK ts_msp {} ts_lsp_max {} beg {} end {} {}", + ts_msp, + ts_lsp_max, + range.beg, + range.end, + stringify!($fname) + ); + // TODO use prepared! + let cql = concat!( + "select ts_lsp, pulse, value from ", + $table_name, + " where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1" + ); + scy.query(cql, (series, ts_msp as i64, ts_lsp_max as i64)) + .await + .err_conv()? + }; + let mut last_before = None; + let mut ret = EventsDim0::::empty(); + for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() { + let row = row.err_conv()?; + let ts = ts_msp + row.0 as u64; + let pulse = row.1 as u64; + let value = row.2 as ST; + if ts >= range.end { + } else if ts >= range.beg { + ret.push(ts, pulse, value); + } else { + last_before = Some((ts, pulse, value)); + } + } + if do_one_before { + if let Some((ts, pulse, value)) = last_before { + info!("PREPENDING THE LAST BEFORE {ts} {value:?}"); + ret.push_front(ts, pulse, value); + } + } + info!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp); + Ok(ret) + } + }; +} + +macro_rules! read_next_array_values { + ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => { + async fn $fname( + series: i64, + ts_msp: u64, + _range: NanoRange, + _fwd: bool, + _do_one_before: bool, + scy: Arc, + ) -> Result, Error> { + // TODO change return type: so far EventsDim1 does not exist. + error!("TODO read_next_array_values"); + err::todo(); + if true { + return Err(Error::with_msg_no_trace("redo based on scalar case")); + } + type ST = $st; + type _SCYTY = $scyty; + info!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp); + let cql = concat!( + "select ts_lsp, pulse, value from ", + $table_name, + " where series = ? and ts_msp = ?" + ); + let _res = scy.query(cql, (series, ts_msp as i64)).await.err_conv()?; + let ret = EventsDim0::::empty(); + /* + for row in res.rows_typed_or_empty::<(i64, i64, Vec)>() { + let row = row.err_conv()?; + let ts = ts_msp + row.0 as u64; + let pulse = row.1 as u64; + let value = row.2.into_iter().map(|x| x as ST).collect(); + ret.push(ts, pulse, value); + } + */ + info!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp); + Ok(ret) + } + }; +} + +read_next_scalar_values!(read_next_values_scalar_i8, i8, i8, "events_scalar_i8"); +read_next_scalar_values!(read_next_values_scalar_i16, i16, i16, "events_scalar_i16"); +read_next_scalar_values!(read_next_values_scalar_i32, i32, i32, "events_scalar_i32"); +read_next_scalar_values!(read_next_values_scalar_f32, f32, f32, "events_scalar_f32"); +read_next_scalar_values!(read_next_values_scalar_f64, f64, f64, "events_scalar_f64"); + +read_next_array_values!(read_next_values_array_u16, u16, i16, "events_wave_u16"); + macro_rules! read_values { ($fname:ident, $self:expr, $ts_msp:expr) => {{ let fut = $fname( @@ -80,7 +246,7 @@ impl ReadValues { fn next(&mut self) -> bool { if let Some(ts_msp) = self.ts_msps.pop_front() { - self.fut = self.make_fut(ts_msp, self.ts_msps.len() > 1); + self.fut = self.make_fut(ts_msp, self.ts_msps.len() > 0); true } else { false @@ -178,8 +344,8 @@ impl EventsStreamScylla { } } - fn ts_msps_found_one_before(&mut self, ts_msps: VecDeque) { - info!("ts_msps_found_one_before ts_msps {ts_msps:?}"); + fn ts_msps_found(&mut self, ts_msps: VecDeque) { + info!("ts_msps_found ts_msps {ts_msps:?}"); self.ts_msps = ts_msps; // Find the largest MSP which can potentially contain some event before the range. let befores: Vec<_> = self @@ -188,7 +354,8 @@ impl EventsStreamScylla { .map(|x| *x) .filter(|x| *x < self.range.beg) .collect(); - if befores.len() >= 1 { + if self.do_one_before_range && befores.len() >= 1 { + info!("Try ReadBack1"); let st = ReadValues::new( self.series as i64, self.scalar_type.clone(), @@ -201,6 +368,7 @@ impl EventsStreamScylla { ); self.state = FrState::ReadBack1(st); } else if self.ts_msps.len() >= 1 { + info!("Go straight for forward read"); let st = ReadValues::new( self.series as i64, self.scalar_type.clone(), @@ -217,13 +385,10 @@ impl EventsStreamScylla { } } - fn ts_msps_found(&mut self, ts_msps: VecDeque) { - self.ts_msps_found_one_before(ts_msps); - } - fn back_1_done(&mut self, item: Box) -> Option> { info!("back_1_done len {}", item.len()); if item.len() == 0 { + info!("ReadBack1 returned empty"); // Find the 2nd largest MSP which can potentially contain some event before the range. let befores: Vec<_> = self .ts_msps @@ -232,6 +397,7 @@ impl EventsStreamScylla { .filter(|x| *x < self.range.beg) .collect(); if befores.len() >= 2 { + info!("Try ReadBack2"); let st = ReadValues::new( self.series as i64, self.scalar_type.clone(), @@ -245,6 +411,7 @@ impl EventsStreamScylla { self.state = FrState::ReadBack2(st); None } else if self.ts_msps.len() >= 1 { + info!("No 2nd back MSP, go for forward read"); let st = ReadValues::new( self.series as i64, self.scalar_type.clone(), @@ -262,6 +429,7 @@ impl EventsStreamScylla { None } } else { + info!("FOUND ONE BEFORE"); if self.ts_msps.len() > 0 { let st = ReadValues::new( self.series as i64, @@ -284,6 +452,9 @@ impl EventsStreamScylla { fn back_2_done(&mut self, item: Box) -> Option> { info!("back_2_done len {}", item.len()); + if item.len() == 0 { + info!("ReadBack2 returned empty"); + } if self.ts_msps.len() >= 1 { let st = ReadValues::new( self.series as i64, @@ -424,156 +595,6 @@ pub async fn find_series(channel: &Channel, pgclient: Arc) -> Result<( Ok((series, scalar_type, shape)) } -async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc) -> Result, Error> { - trace!("find_ts_msp series {} {:?}", series, range); - // TODO use prepared statements - let cql = "select ts_msp from ts_msp where series = ? and ts_msp <= ? order by ts_msp desc limit 2"; - let res = scy.query(cql, (series, range.beg as i64)).await.err_conv()?; - let mut before = VecDeque::new(); - for row in res.rows_typed_or_empty::<(i64,)>() { - let row = row.err_conv()?; - before.push_front(row.0 as u64); - } - let cql = "select ts_msp from ts_msp where series = ? and ts_msp > ? and ts_msp < ?"; - let res = scy - .query(cql, (series, range.beg as i64, range.end as i64)) - .await - .err_conv()?; - let mut ret = VecDeque::new(); - for h in before { - ret.push_back(h); - } - for row in res.rows_typed_or_empty::<(i64,)>() { - let row = row.err_conv()?; - ret.push_back(row.0 as u64); - } - trace!("found in total {} rows", ret.len()); - Ok(ret) -} - -macro_rules! read_next_scalar_values { - ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => { - async fn $fname( - series: i64, - ts_msp: u64, - range: NanoRange, - fwd: bool, - do_one_before: bool, - scy: Arc, - ) -> Result, Error> { - type ST = $st; - type SCYTY = $scyty; - if ts_msp >= range.end { - warn!("given ts_msp {} >= range.end {}", ts_msp, range.end); - } - if range.end > i64::MAX as u64 { - return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); - } - let res = if fwd { - let ts_lsp_min = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; - let ts_lsp_max = if ts_msp < range.end { range.end - ts_msp } else { 0 }; - trace!( - "FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} {}", - ts_msp, - ts_lsp_min, - ts_lsp_max, - stringify!($fname) - ); - // TODO use prepared! - let cql = concat!( - "select ts_lsp, pulse, value from ", - $table_name, - " where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?" - ); - scy.query(cql, (series, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64)) - .await - .err_conv()? - } else { - let ts_lsp_max = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; - info!( - "BCK ts_msp {} ts_lsp_max {} range beg {} end {} {}", - ts_msp, - ts_lsp_max, - range.beg, - range.end, - stringify!($fname) - ); - // TODO use prepared! - let cql = concat!( - "select ts_lsp, pulse, value from ", - $table_name, - " where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1" - ); - scy.query(cql, (series, ts_msp as i64, ts_lsp_max as i64)) - .await - .err_conv()? - }; - let mut ret = EventsDim0::::empty(); - for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() { - let row = row.err_conv()?; - let ts = ts_msp + row.0 as u64; - // TODO this should probably better be done at cql level. - if do_one_before || ts >= range.beg { - let pulse = row.1 as u64; - let value = row.2 as ST; - ret.push(ts, pulse, value); - } - } - trace!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp); - Ok(ret) - } - }; -} - -macro_rules! read_next_array_values { - ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => { - async fn $fname( - series: i64, - ts_msp: u64, - _range: NanoRange, - _fwd: bool, - _do_one_before: bool, - scy: Arc, - ) -> Result, Error> { - // TODO change return type: so far EventsDim1 does not exist. - error!("TODO read_next_array_values"); - err::todo(); - if true { - return Err(Error::with_msg_no_trace("redo based on scalar case")); - } - type ST = $st; - type _SCYTY = $scyty; - info!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp); - let cql = concat!( - "select ts_lsp, pulse, value from ", - $table_name, - " where series = ? and ts_msp = ?" - ); - let _res = scy.query(cql, (series, ts_msp as i64)).await.err_conv()?; - let ret = EventsDim0::::empty(); - /* - for row in res.rows_typed_or_empty::<(i64, i64, Vec)>() { - let row = row.err_conv()?; - let ts = ts_msp + row.0 as u64; - let pulse = row.1 as u64; - let value = row.2.into_iter().map(|x| x as ST).collect(); - ret.push(ts, pulse, value); - } - */ - info!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp); - Ok(ret) - } - }; -} - -read_next_scalar_values!(read_next_values_scalar_i8, i8, i8, "events_scalar_i8"); -read_next_scalar_values!(read_next_values_scalar_i16, i16, i16, "events_scalar_i16"); -read_next_scalar_values!(read_next_values_scalar_i32, i32, i32, "events_scalar_i32"); -read_next_scalar_values!(read_next_values_scalar_f32, f32, f32, "events_scalar_f32"); -read_next_scalar_values!(read_next_values_scalar_f64, f64, f64, "events_scalar_f64"); - -read_next_array_values!(read_next_values_array_u16, u16, i16, "events_wave_u16"); - pub async fn make_scylla_stream( evq: &PlainEventsQuery, do_one_before_range: bool,