From 9c1522f9bb84cb50965b36145812f0b9f5ed5e3d Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 27 Feb 2023 16:02:41 +0100 Subject: [PATCH] Improve scylla read --- items_0/src/items_0.rs | 2 + items_2/src/eventsdim0.rs | 8 ++ items_2/src/eventsdim1.rs | 8 ++ nodenet/src/conn.rs | 10 +- scyllaconn/src/events.rs | 196 ++++++++++++++++++++++++-------------- 5 files changed, 147 insertions(+), 77 deletions(-) diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs index 9e81d46..e8a571d 100644 --- a/items_0/src/items_0.rs +++ b/items_0/src/items_0.rs @@ -178,6 +178,8 @@ pub trait Events: fn partial_eq_dyn(&self, other: &dyn Events) -> bool; fn serde_id(&self) -> &'static str; fn nty_id(&self) -> u32; + fn tss(&self) -> &VecDeque; + fn pulses(&self) -> &VecDeque; } pub trait EventsNonObj { diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index e1d534a..e2fba61 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -861,6 +861,14 @@ impl Events for EventsDim0 { fn clone_dyn(&self) -> Box { Box::new(self.clone()) } + + fn tss(&self) -> &VecDeque { + &self.tss + } + + fn pulses(&self) -> &VecDeque { + &self.pulses + } } pub struct EventsDim0TimeBinner { diff --git a/items_2/src/eventsdim1.rs b/items_2/src/eventsdim1.rs index f45b514..483ad89 100644 --- a/items_2/src/eventsdim1.rs +++ b/items_2/src/eventsdim1.rs @@ -815,6 +815,14 @@ impl Events for EventsDim1 { fn clone_dyn(&self) -> Box { Box::new(self.clone()) } + + fn tss(&self) -> &VecDeque { + &self.tss + } + + fn pulses(&self) -> &VecDeque { + &self.pulses + } } pub struct EventsDim1TimeBinner { diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 7e452e9..638f2d6 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -119,6 +119,7 @@ async fn make_channel_events_stream( let stream = stream .map({ let agg_kind = evq.agg_kind_value(); + let mut pulse_last = None; move |item| match item { Ok(item) => { let x = if let AggKind::PulseIdDiff = agg_kind { @@ -126,11 +127,14 @@ async fn make_channel_events_stream( ChannelEvents::Events(item) => { let (tss, pulses) = items_0::EventsNonObj::into_tss_pulses(item); let mut item = items_2::eventsdim0::EventsDim0::empty(); - let mut pulse_last = pulses.front().map_or(0, |&x| x); for (ts, pulse) in tss.into_iter().zip(pulses) { - let value = pulse as i64 - pulse_last as i64; + let value = if let Some(last) = pulse_last { + pulse as i64 - last as i64 + } else { + 0 + }; item.push(ts, pulse, value); - pulse_last = pulse; + pulse_last = Some(pulse); } ChannelEvents::Events(Box::new(item)) } diff --git a/scyllaconn/src/events.rs b/scyllaconn/src/events.rs index 0514f9e..d33246a 100644 --- a/scyllaconn/src/events.rs +++ b/scyllaconn/src/events.rs @@ -15,6 +15,7 @@ use netpod::ScalarType; use netpod::Shape; use scylla::Session as ScySession; use std::collections::VecDeque; +use std::mem; use std::pin::Pin; use std::sync::Arc; use std::task::Context; @@ -29,13 +30,13 @@ async fn find_ts_msp( let mut ret1 = VecDeque::new(); let mut ret2 = VecDeque::new(); // TODO use prepared statements - let cql = "select ts_msp from ts_msp where series = ? and ts_msp <= ? order by ts_msp desc limit 2"; + let cql = "select ts_msp from ts_msp where series = ? and ts_msp < ? order by ts_msp desc limit 2"; let res = scy.query(cql, (series as i64, range.beg as i64)).await.err_conv()?; for row in res.rows_typed_or_empty::<(i64,)>() { let row = row.err_conv()?; ret1.push_front(row.0 as u64); } - let cql = "select ts_msp from ts_msp where series = ? and ts_msp > ? and ts_msp < ?"; + let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? and ts_msp < ?"; let res = scy .query(cql, (series as i64, range.beg as i64, range.end as i64)) .await @@ -56,7 +57,7 @@ async fn find_ts_msp( macro_rules! read_next_scalar_values { ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => { - async fn $fname( + async fn $fname( series: u64, ts_msp: u64, range: NanoRange, @@ -65,12 +66,6 @@ macro_rules! read_next_scalar_values { ) -> Result, Error> { type ST = $st; type SCYTY = $scyty; - if ts_msp >= range.end { - warn!( - "given ts_msp {} >= range.end {} not necessary to read this", - ts_msp, range.end - ); - } if range.end > i64::MAX as u64 { return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); } @@ -107,6 +102,8 @@ macro_rules! read_next_scalar_values { let pulse = row.1 as u64; let value = row.2 as ST; if ts >= range.end { + // TODO count as logic error + error!("ts >= range.end"); } else if ts >= range.beg { ret.push(ts, pulse, value); } else { @@ -144,16 +141,19 @@ macro_rules! read_next_scalar_values { let ts = ts_msp + row.0 as u64; let pulse = row.1 as u64; let value = row.2 as ST; - if ts >= range.end { + if ts >= range.beg { + // TODO count as logic error + error!("ts >= range.beg"); } else if ts < range.beg { ret.push(ts, pulse, value); } else { - if !seen_before { - warn!("encounter event before range in forward read {ts}"); - } seen_before = true; } } + let _ = seen_before; + if ret.len() > 1 { + error!("multiple events in backwards search {}", ret.len()); + } ret }; trace!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp); @@ -164,7 +164,7 @@ macro_rules! read_next_scalar_values { macro_rules! read_next_array_values { ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => { - async fn $fname( + async fn $fname( series: u64, ts_msp: u64, range: NanoRange, @@ -173,12 +173,6 @@ macro_rules! read_next_array_values { ) -> Result, Error> { type ST = $st; type SCYTY = $scyty; - if ts_msp >= range.end { - warn!( - "given ts_msp {} >= range.end {} not necessary to read this", - ts_msp, range.end - ); - } if range.end > i64::MAX as u64 { return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); } @@ -186,8 +180,7 @@ macro_rules! read_next_array_values { let ts_lsp_min = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; let ts_lsp_max = if ts_msp < range.end { range.end - ts_msp } else { 0 }; trace!( - "FWD {} ts_msp {} ts_lsp_min {} ts_lsp_max {} beg {} end {} {}", - stringify!($fname), + "FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} beg {} end {} {}", ts_msp, ts_lsp_min, ts_lsp_max, @@ -216,6 +209,8 @@ macro_rules! read_next_array_values { let pulse = row.1 as u64; let value = row.2.into_iter().map(|x| x as ST).collect(); if ts >= range.end { + // TODO count as logic error + error!("ts >= range.end"); } else if ts >= range.beg { ret.push(ts, pulse, value); } else { @@ -254,16 +249,19 @@ macro_rules! read_next_array_values { let ts = ts_msp + row.0 as u64; let pulse = row.1 as u64; let value = row.2.into_iter().map(|x| x as ST).collect(); - if ts >= range.end { + if ts >= range.beg { + // TODO count as logic error + error!("ts >= range.beg"); } else if ts < range.beg { ret.push(ts, pulse, value); } else { - if !seen_before { - warn!("encounter event before range in forward read {ts}"); - } seen_before = true; } } + let _ = seen_before; + if ret.len() > 1 { + error!("multiple events in backwards search {}", ret.len()); + } ret }; trace!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp); @@ -282,13 +280,23 @@ read_next_scalar_values!(read_next_values_scalar_i32, i32, i32, "events_scalar_i read_next_scalar_values!(read_next_values_scalar_i64, i64, i64, "events_scalar_i64"); read_next_scalar_values!(read_next_values_scalar_f32, f32, f32, "events_scalar_f32"); read_next_scalar_values!(read_next_values_scalar_f64, f64, f64, "events_scalar_f64"); +read_next_scalar_values!(read_next_values_scalar_bool, bool, bool, "events_scalar_bool"); +read_next_array_values!(read_next_values_array_u8, u8, i8, "events_array_u8"); read_next_array_values!(read_next_values_array_u16, u16, i16, "events_array_u16"); +read_next_array_values!(read_next_values_array_u32, u32, i32, "events_array_u32"); +read_next_array_values!(read_next_values_array_u64, u64, i64, "events_array_u64"); +read_next_array_values!(read_next_values_array_i8, i8, i8, "events_array_i8"); +read_next_array_values!(read_next_values_array_i16, i16, i16, "events_array_i16"); +read_next_array_values!(read_next_values_array_i32, i32, i32, "events_array_i32"); +read_next_array_values!(read_next_values_array_i64, i64, i64, "events_array_i64"); +read_next_array_values!(read_next_values_array_f32, f32, f32, "events_array_f32"); +read_next_array_values!(read_next_values_array_f64, f64, f64, "events_array_f64"); read_next_array_values!(read_next_values_array_bool, bool, bool, "events_array_bool"); macro_rules! read_values { - ($fname:ident, $self:expr, $ts_msp:expr) => {{ - let fut = $fname($self.series, $ts_msp, $self.range.clone(), $self.fwd, $self.scy.clone()); + ($fname:ident, $st:ty, $scyty:ty, $self:expr, $ts_msp:expr) => {{ + let fut = $fname::<$st, $scyty>($self.series, $ts_msp, $self.range.clone(), $self.fwd, $self.scy.clone()); let fut = fut.map(|x| match x { Ok(k) => { let self_name = std::any::type_name::(); @@ -356,34 +364,37 @@ impl ReadValues { let fut = match &self.shape { Shape::Scalar => match &self.scalar_type { ScalarType::U8 => { - read_values!(read_next_values_scalar_u8, self, ts_msp) + read_values!(read_next_values_scalar_u8, u8, i8, self, ts_msp) } ScalarType::U16 => { - read_values!(read_next_values_scalar_u16, self, ts_msp) + read_values!(read_next_values_scalar_u16, u16, i16, self, ts_msp) } ScalarType::U32 => { - read_values!(read_next_values_scalar_u32, self, ts_msp) + read_values!(read_next_values_scalar_u32, u32, i32, self, ts_msp) } ScalarType::U64 => { - read_values!(read_next_values_scalar_u64, self, ts_msp) + read_values!(read_next_values_scalar_u64, u64, i64, self, ts_msp) } ScalarType::I8 => { - read_values!(read_next_values_scalar_i8, self, ts_msp) + read_values!(read_next_values_scalar_i8, i8, i8, self, ts_msp) } ScalarType::I16 => { - read_values!(read_next_values_scalar_i16, self, ts_msp) + read_values!(read_next_values_scalar_i16, i16, i16, self, ts_msp) } ScalarType::I32 => { - read_values!(read_next_values_scalar_i32, self, ts_msp) + read_values!(read_next_values_scalar_i32, i32, i32, self, ts_msp) } ScalarType::I64 => { - read_values!(read_next_values_scalar_i64, self, ts_msp) + read_values!(read_next_values_scalar_i64, i64, i64, self, ts_msp) } ScalarType::F32 => { - read_values!(read_next_values_scalar_f32, self, ts_msp) + read_values!(read_next_values_scalar_f32, f32, f32, self, ts_msp) } ScalarType::F64 => { - read_values!(read_next_values_scalar_f64, self, ts_msp) + read_values!(read_next_values_scalar_f64, f64, f64, self, ts_msp) + } + ScalarType::BOOL => { + read_values!(read_next_values_scalar_bool, bool, bool, self, ts_msp) } _ => { error!("TODO ReadValues add more types"); @@ -391,12 +402,39 @@ impl ReadValues { } }, Shape::Wave(_) => match &self.scalar_type { + ScalarType::U8 => { + read_values!(read_next_values_array_u8, u8, i8, self, ts_msp) + } ScalarType::U16 => { - read_values!(read_next_values_array_u16, self, ts_msp) + read_values!(read_next_values_array_u16, u16, i16, self, ts_msp) + } + ScalarType::U32 => { + read_values!(read_next_values_array_u32, u32, i32, self, ts_msp) + } + ScalarType::U64 => { + read_values!(read_next_values_array_u64, u64, i64, self, ts_msp) + } + ScalarType::I8 => { + read_values!(read_next_values_array_i8, i8, i8, self, ts_msp) + } + ScalarType::I16 => { + read_values!(read_next_values_array_i16, i16, i16, self, ts_msp) + } + ScalarType::I32 => { + read_values!(read_next_values_array_i32, i32, i32, self, ts_msp) + } + ScalarType::I64 => { + read_values!(read_next_values_array_i64, i64, i64, self, ts_msp) + } + ScalarType::F32 => { + read_values!(read_next_values_array_f32, f32, f32, self, ts_msp) + } + ScalarType::F64 => { + read_values!(read_next_values_array_f64, f64, f64, self, ts_msp) } ScalarType::BOOL => { info!("attempt to read bool"); - read_values!(read_next_values_array_bool, self, ts_msp) + read_values!(read_next_values_array_bool, bool, bool, self, ts_msp) } _ => { error!("TODO ReadValues add more types"); @@ -418,6 +456,7 @@ enum FrState { ReadBack1(ReadValues), ReadBack2(ReadValues), ReadValues(ReadValues), + DataDone, Done, } @@ -427,13 +466,12 @@ pub struct EventsStreamScylla { scalar_type: ScalarType, shape: Shape, range: NanoRange, - #[allow(unused)] do_one_before_range: bool, - ts_msp_b1: Option, - ts_msp_b2: Option, - ts_msps: VecDeque, + ts_msp_bck: VecDeque, + ts_msp_fwd: VecDeque, scy: Arc, do_test_stream_error: bool, + found_one_after: bool, outqueue: VecDeque>, } @@ -454,28 +492,30 @@ impl EventsStreamScylla { shape, range, do_one_before_range, - ts_msp_b1: None, - ts_msp_b2: None, - ts_msps: VecDeque::new(), + ts_msp_bck: VecDeque::new(), + ts_msp_fwd: VecDeque::new(), scy, do_test_stream_error, + found_one_after: false, outqueue: VecDeque::new(), } } fn ts_msps_found(&mut self, msps1: VecDeque, msps2: VecDeque) { trace!("ts_msps_found msps1 {msps1:?} msps2 {msps2:?}"); - let mut msps1 = msps1; - self.ts_msp_b1 = msps1.pop_back(); - self.ts_msp_b2 = msps1.pop_back(); - self.ts_msps = msps2; - if let Some(x) = self.ts_msp_b1.clone() { - self.ts_msps.push_front(x); + self.ts_msp_bck = msps1; + self.ts_msp_fwd = msps2; + for x in self.ts_msp_bck.iter().rev() { + let x = x.clone(); + if x >= self.range.end { + info!("FOUND one-after because of MSP"); + self.found_one_after = true; + } + self.ts_msp_fwd.push_front(x); } - trace!("ts_msp_b1 {:?}", self.ts_msp_b1); - trace!("ts_msp_b2 {:?}", self.ts_msp_b2); - trace!("ts_msps {:?}", self.ts_msps); - if let Some(msp) = self.ts_msp_b1.clone() { + trace!("ts_msp_bck {:?}", self.ts_msp_bck); + trace!("ts_msp_fwd {:?}", self.ts_msp_fwd); + if let Some(msp) = self.ts_msp_bck.pop_back() { trace!("Try ReadBack1"); let st = ReadValues::new( self.series, @@ -487,20 +527,20 @@ impl EventsStreamScylla { self.scy.clone(), ); self.state = FrState::ReadBack1(st); - } else if self.ts_msps.len() >= 1 { + } else if self.ts_msp_fwd.len() > 0 { trace!("Go straight for forward read"); let st = ReadValues::new( self.series, self.scalar_type.clone(), self.shape.clone(), self.range.clone(), - self.ts_msps.clone(), + mem::replace(&mut self.ts_msp_fwd, VecDeque::new()), true, self.scy.clone(), ); self.state = FrState::ReadValues(st); } else { - self.state = FrState::Done; + self.state = FrState::DataDone; } } @@ -508,22 +548,22 @@ impl EventsStreamScylla { trace!("back_1_done item len {}", item.len()); if item.len() > 0 { self.outqueue.push_back(item); - if self.ts_msps.len() > 0 { + if self.ts_msp_fwd.len() > 0 { let st = ReadValues::new( self.series, self.scalar_type.clone(), self.shape.clone(), self.range.clone(), - self.ts_msps.clone(), + mem::replace(&mut self.ts_msp_fwd, VecDeque::new()), true, self.scy.clone(), ); self.state = FrState::ReadValues(st); } else { - self.state = FrState::Done; + self.state = FrState::DataDone; } } else { - if let Some(msp) = self.ts_msp_b2.clone() { + if let Some(msp) = self.ts_msp_bck.pop_back() { trace!("Try ReadBack2"); let st = ReadValues::new( self.series, @@ -535,20 +575,21 @@ impl EventsStreamScylla { self.scy.clone(), ); self.state = FrState::ReadBack2(st); - } else if self.ts_msps.len() >= 1 { + } else if self.ts_msp_fwd.len() > 0 { trace!("No 2nd back MSP, go for forward read"); let st = ReadValues::new( self.series, self.scalar_type.clone(), self.shape.clone(), self.range.clone(), - self.ts_msps.clone(), + mem::replace(&mut self.ts_msp_fwd, VecDeque::new()), true, self.scy.clone(), ); self.state = FrState::ReadValues(st); } else { - self.state = FrState::Done; + trace!("No 2nd back MSP, but also nothing to go forward"); + self.state = FrState::DataDone; } } } @@ -558,19 +599,19 @@ impl EventsStreamScylla { if item.len() > 0 { self.outqueue.push_back(item); } - if self.ts_msps.len() >= 1 { + if self.ts_msp_fwd.len() > 0 { let st = ReadValues::new( self.series, self.scalar_type.clone(), self.shape.clone(), self.range.clone(), - self.ts_msps.clone(), + mem::replace(&mut self.ts_msp_fwd, VecDeque::new()), true, self.scy.clone(), ); self.state = FrState::ReadValues(st); } else { - self.state = FrState::Done; + self.state = FrState::DataDone; } } } @@ -604,7 +645,7 @@ impl Stream for EventsStreamScylla { continue; } Ready(Err(e)) => { - self.state = FrState::Done; + self.state = FrState::DataDone; Ready(Some(Err(e))) } Pending => Pending, @@ -617,7 +658,7 @@ impl Stream for EventsStreamScylla { } Ready(Err(e)) => { st.fut_done = true; - self.state = FrState::Done; + self.state = FrState::DataDone; Ready(Some(Err(e))) } Pending => Pending, @@ -630,7 +671,7 @@ impl Stream for EventsStreamScylla { } Ready(Err(e)) => { st.fut_done = true; - self.state = FrState::Done; + self.state = FrState::DataDone; Ready(Some(Err(e))) } Pending => Pending, @@ -640,7 +681,7 @@ impl Stream for EventsStreamScylla { st.fut_done = true; if !st.next() { trace!("ReadValues exhausted"); - self.state = FrState::Done; + self.state = FrState::DataDone; } if item.len() > 0 { self.outqueue.push_back(item); @@ -653,6 +694,13 @@ impl Stream for EventsStreamScylla { } Pending => Pending, }, + FrState::DataDone => { + if self.found_one_after { + // TODO emit RangeComplete + } + self.state = FrState::Done; + continue; + } FrState::Done => Ready(None), }; }