diff --git a/crates/items_2/src/channelevents.rs b/crates/items_2/src/channelevents.rs index 8ac5a03..dddeab7 100644 --- a/crates/items_2/src/channelevents.rs +++ b/crates/items_2/src/channelevents.rs @@ -809,6 +809,13 @@ impl Mergeable for ChannelEvents { } } } + + fn tss(&self) -> Vec { + Events::tss(self) + .iter() + .map(|x| netpod::TsMs::from_ns_u64(*x)) + .collect() + } } impl RangeOverlapInfo for ChannelEvents { diff --git a/crates/items_2/src/eventfull.rs b/crates/items_2/src/eventfull.rs index c285313..af599b5 100644 --- a/crates/items_2/src/eventfull.rs +++ b/crates/items_2/src/eventfull.rs @@ -256,6 +256,10 @@ impl Mergeable for EventFull { } None } + + fn tss(&self) -> Vec { + self.tss.iter().map(|x| netpod::TsMs::from_ns_u64(*x)).collect() + } } #[derive(Debug, ThisError, Serialize, Deserialize)] diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index b9a50b2..0bcd03b 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -112,6 +112,12 @@ impl EventsDim0 { std::any::type_name::() } + pub fn push_back(&mut self, ts: u64, pulse: u64, value: STY) { + self.tss.push_back(ts); + self.pulses.push_back(pulse); + self.values.push_back(value); + } + pub fn push_front(&mut self, ts: u64, pulse: u64, value: STY) { self.tss.push_front(ts); self.pulses.push_front(pulse); @@ -858,11 +864,21 @@ impl Events for EventsDim0 { fn output_info(&self) -> String { let n2 = self.tss.len().max(1) - 1; + let min = if let Some(ts) = self.tss.get(0) { + TsNano::from_ns(*ts).fmt().to_string() + } else { + String::from("None") + }; + let max = if let Some(ts) = self.tss.get(n2) { + TsNano::from_ns(*ts).fmt().to_string() + } else { + String::from("None") + }; format!( "EventsDim0OutputInfo {{ len {}, ts_min {}, ts_max {} }}", self.tss.len(), - self.tss.get(0).map_or(-1i64, |&x| x as i64), - self.tss.get(n2).map_or(-1i64, |&x| x as i64), + min, + max, ) } diff --git a/crates/items_2/src/items_2.rs b/crates/items_2/src/items_2.rs index f379953..5157262 100644 --- a/crates/items_2/src/items_2.rs +++ b/crates/items_2/src/items_2.rs @@ -189,6 +189,13 @@ impl Mergeable for Box { fn find_highest_index_lt(&self, ts: u64) -> Option { self.as_ref().find_highest_index_lt_evs(ts) } + + fn tss(&self) -> Vec { + Events::tss(self) + .iter() + .map(|x| netpod::TsMs::from_ns_u64(*x)) + .collect() + } } // TODO rename to `Typed` diff --git a/crates/items_2/src/merger.rs b/crates/items_2/src/merger.rs index c9ae57b..ff21117 100644 --- a/crates/items_2/src/merger.rs +++ b/crates/items_2/src/merger.rs @@ -15,6 +15,7 @@ use items_0::Events; use items_0::MergeError; use items_0::WithLen; use netpod::log::*; +use netpod::TsMs; use std::collections::VecDeque; use std::fmt; use std::ops::ControlFlow; @@ -53,6 +54,8 @@ pub trait Mergeable: fmt::Debug + WithLen + ByteEstimate + Unpin { fn find_lowest_index_gt(&self, ts: u64) -> Option; fn find_lowest_index_ge(&self, ts: u64) -> Option; fn find_highest_index_lt(&self, ts: u64) -> Option; + // TODO only for testing: + fn tss(&self) -> Vec; } type MergeInp = Pin> + Send>>; diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 9efd751..dd21331 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -118,9 +118,15 @@ pub const DATETIME_FMT_9MS: &str = "%Y-%m-%dT%H:%M:%S.%9fZ"; const TEST_BACKEND: &str = "testbackend-00"; #[allow(non_upper_case_globals)] -pub const trigger: [&'static str; 1] = [ +pub const trigger: [&'static str; 2] = [ // "S30CB05-VMCP-A010:PRESSURE", + "ATSRF-CAV:TUN-DETUNING-REL-ACT", +]; + +pub const TRACE_SERIES_ID: [u64; 1] = [ + // + 4985969403507503043, ]; pub struct OnDrop @@ -1784,8 +1790,8 @@ impl TsNano { Self(ns) } - pub const fn from_ms(ns: u64) -> Self { - Self(1000000 * ns) + pub const fn from_ms(ms: u64) -> Self { + Self(1000000 * ms) } pub const fn ns(&self) -> u64 { @@ -1829,6 +1835,10 @@ impl TsNano { let x = tsunix.as_secs() * 1000000000 + tsunix.subsec_nanos() as u64; Self::from_ns(x) } + + pub fn fmt(&self) -> TsNanoFmt { + TsNanoFmt { ts: self.clone() } + } } impl fmt::Debug for TsNano { @@ -1853,6 +1863,25 @@ impl fmt::Display for TsNano { } } +pub struct TsNanoFmt { + ts: TsNano, +} + +impl fmt::Display for TsNanoFmt { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + chrono::DateTime::from_timestamp_millis(self.ts.ms() as i64) + .unwrap() + .format(DATETIME_FMT_3MS) + .fmt(fmt) + } +} + +impl fmt::Debug for TsNanoFmt { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(self, fmt) + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd)] pub struct PulseId(u64); @@ -2677,6 +2706,20 @@ impl TsMs { let lsp = DtMs(self.0 - msp.0); (msp, lsp) } + + pub fn bump_epsilon(&self) -> TsMs { + Self(self.0 + 1) + } + + pub fn fmt(&self) -> TsMsFmt { + TsMsFmt { ts: self.clone() } + } +} + +impl AsRef for TsMs { + fn as_ref(&self) -> &TsMs { + &self + } } impl fmt::Display for TsMs { @@ -2693,6 +2736,45 @@ impl core::ops::Sub for TsMs { } } +pub struct TsMsFmt { + ts: TsMs, +} + +impl fmt::Debug for TsMsFmt { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + chrono::DateTime::from_timestamp_millis(self.ts.ms() as i64) + .unwrap() + .format(DATETIME_FMT_3MS) + .fmt(fmt) + } +} + +impl fmt::Display for TsMsFmt { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + chrono::DateTime::from_timestamp_millis(self.ts.ms() as i64) + .unwrap() + .format(DATETIME_FMT_3MS) + .fmt(fmt) + } +} + +pub struct TsMsVecFmt(pub I); + +impl fmt::Display for TsMsVecFmt +where + I: Clone + IntoIterator, + T: AsRef, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "[")?; + for ts in self.0.clone().into_iter() { + write!(fmt, " {}", ts.as_ref().fmt())?; + } + write!(fmt, " ]")?; + Ok(()) + } +} + pub trait RetStreamExt: Stream { fn only_first_error(self) -> OnlyFirstError where diff --git a/crates/netpod/src/range/evrange.rs b/crates/netpod/src/range/evrange.rs index cfab2c3..05ff755 100644 --- a/crates/netpod/src/range/evrange.rs +++ b/crates/netpod/src/range/evrange.rs @@ -29,11 +29,15 @@ pub struct NanoRange { } impl fmt::Debug for NanoRange { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { if true { let beg = TsNano(self.beg); let end = TsNano(self.end); - f.debug_struct("NanoRange") + write!(fmt, "NanoRange {{ beg: {}, end: {} }}", beg.fmt(), end.fmt()) + } else if false { + let beg = TsNano(self.beg); + let end = TsNano(self.end); + fmt.debug_struct("NanoRange") .field("beg", &beg) .field("end", &end) .finish() @@ -45,9 +49,9 @@ impl fmt::Debug for NanoRange { .timestamp_opt((self.end / SEC) as i64, (self.end % SEC) as u32) .earliest(); if let (Some(a), Some(b)) = (beg, end) { - f.debug_struct("NanoRange").field("beg", &a).field("end", &b).finish() + fmt.debug_struct("NanoRange").field("beg", &a).field("end", &b).finish() } else { - f.debug_struct("NanoRange") + fmt.debug_struct("NanoRange") .field("beg", &beg) .field("end", &end) .finish() diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs index e0f89bb..7647f28 100644 --- a/crates/scyllaconn/src/events.rs +++ b/crates/scyllaconn/src/events.rs @@ -410,7 +410,7 @@ where }; trace!( "FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} {}", - ts_msp, + ts_msp.fmt(), ts_lsp_min, ts_lsp_max, table_name, @@ -441,7 +441,12 @@ where } else { DtNano::from_ns(0) }; - trace!("BCK ts_msp {} ts_lsp_max {} {}", ts_msp, ts_lsp_max, table_name,); + trace!( + "BCK ts_msp {} ts_lsp_max {} {}", + ts_msp.fmt(), + ts_lsp_max, + table_name, + ); let qu = stmts .rt(&opts.rt) .lsp(!opts.fwd, with_values) @@ -461,7 +466,7 @@ where } ret }; - trace!("read ts_msp {:?} len {}", ts_msp, ret.len()); + trace!("read ts_msp {} len {}", ts_msp.fmt(), ret.len()); let ret = Box::new(ret); Ok(ret) } diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index b558f3c..2c0b87f 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -18,6 +18,8 @@ use netpod::EnumVariant; use netpod::ScalarType; use netpod::Shape; use netpod::TsMs; +use netpod::TsMsVecFmt; +use netpod::TsNano; use series::SeriesId; use std::collections::VecDeque; use std::pin::Pin; @@ -184,7 +186,14 @@ impl EventsStreamRt { ); let scalar_type = self.ch_conf.scalar_type().clone(); let shape = self.ch_conf.shape().clone(); - trace_fetch!("make_read_events_fut bck {} {:?} {:?}", bck, shape, scalar_type); + trace_fetch!( + "make_read_events_fut bck {} msp {:?} {} {:?} {:?}", + bck, + ts_msp, + ts_msp.fmt(), + shape, + scalar_type + ); let fut = async move { let ret = match &shape { Shape::Scalar => match &scalar_type { @@ -241,10 +250,10 @@ impl EventsStreamRt { } fn transition_to_bck_read(&mut self) { - trace_fetch!("transition_to_bck_read"); + trace_fetch!("transition_to_bck_read A {}", TsMsVecFmt(&self.msp_buf)); for ts in self.msp_buf.iter() { if ts.ns() < self.range.beg() { - self.msp_buf_bck.push_front(ts.clone()); + self.msp_buf_bck.push_back(ts.clone()); } } let c = self.msp_buf.iter().take_while(|x| x.ns() < self.range.beg()).count(); @@ -252,12 +261,17 @@ impl EventsStreamRt { for _ in 0..g { self.msp_buf.pop_front(); } + trace_fetch!( + "transition_to_bck_read B {} {}", + TsMsVecFmt(&self.msp_buf_bck), + TsMsVecFmt(&self.msp_buf) + ); self.setup_bck_read(); } fn setup_bck_read(&mut self) { - trace_fetch!("setup_bck_read"); - if let Some(ts) = self.msp_buf_bck.pop_front() { + if let Some(ts) = self.msp_buf_bck.pop_back() { + trace_fetch!("setup_bck_read {}", ts.fmt()); let scyqueue = self.scyqueue.clone(); let fut = self.make_read_events_fut(ts, true, scyqueue); self.state = State::ReadingBck(ReadingBck { @@ -265,6 +279,7 @@ impl EventsStreamRt { reading_state: ReadingState::FetchEvents(FetchEvents { fut }), }); } else { + trace_fetch!("setup_bck_read no msp"); self.transition_to_fwd_read(); } } @@ -272,12 +287,13 @@ impl EventsStreamRt { fn transition_to_fwd_read(&mut self) { trace_fetch!("transition_to_fwd_read"); self.msp_buf_bck = VecDeque::new(); + trace_fetch!("transition_to_fwd_read {}", TsMsVecFmt(&self.msp_buf)); self.setup_fwd_read(); } fn setup_fwd_read(&mut self) { if let Some(ts) = self.msp_buf.pop_front() { - trace_fetch!("setup_fwd_read {ts}"); + trace_fetch!("setup_fwd_read {}", ts.fmt()); let scyqueue = self.scyqueue.clone(); let fut = self.make_read_events_fut(ts, false, scyqueue); self.state = State::ReadingFwd(ReadingFwd { @@ -392,7 +408,7 @@ impl Stream for EventsStreamRt { State::ReadingBck(st) => match &mut st.reading_state { ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) { Ready(Some(Ok(ts))) => { - trace_fetch!("ReadingBck FetchMsp {:?}", ts); + trace_fetch!("ReadingBck FetchMsp {}", ts.fmt()); self.msp_buf.push_back(ts); if ts.ns() >= self.range.beg() { self.transition_to_bck_read(); @@ -415,7 +431,10 @@ impl Stream for EventsStreamRt { ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) { Ready(Ok(mut x)) => { use items_2::merger::Mergeable; - trace_fetch!("ReadingBck FetchEvents got len {:?}", x.len()); + trace_fetch!("ReadingBck FetchEvents got len {}", x.len()); + for ts in Mergeable::tss(&x) { + trace_fetch!("ReadingBck FetchEvents ts {}", ts.fmt()); + } if let Some(ix) = Mergeable::find_highest_index_lt(&x, self.range.beg().ns()) { trace_fetch!("ReadingBck FetchEvents find_highest_index_lt {:?}", ix); let mut y = Mergeable::new_empty(&x); @@ -447,6 +466,7 @@ impl Stream for EventsStreamRt { State::ReadingFwd(st) => match &mut st.reading_state { ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) { Ready(Some(Ok(ts))) => { + trace_fetch!("ReadingFwd FetchMsp {}", ts.fmt()); self.msp_buf.push_back(ts); self.setup_fwd_read(); continue; @@ -460,6 +480,11 @@ impl Stream for EventsStreamRt { }, ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) { Ready(Ok(x)) => { + trace_fetch!("ReadingFwd FetchEvents got len {:?}", x.len()); + for ts_ns in x.tss() { + let ts = TsNano::from_ns(*ts_ns).to_ts_ms(); + trace_fetch!("ReadingFwd FetchEvents ts {}", ts.fmt()); + } self.out.push_back(x); self.setup_fwd_read(); continue; diff --git a/crates/scyllaconn/src/events2/firstbefore.rs b/crates/scyllaconn/src/events2/firstbefore.rs index 1fe24d4..6a18b2e 100644 --- a/crates/scyllaconn/src/events2/firstbefore.rs +++ b/crates/scyllaconn/src/events2/firstbefore.rs @@ -119,7 +119,7 @@ where Ready(Some(Err(e))) } else { // Separate events into before and bulk - let tss = item.tss(); + let tss = Events::tss(&item); let pp = tss.partition_point(|&x| x < self.ts0.ns()); trace_transition!("partition_point {pp:?} {n:?}", n = tss.len()); if pp > item.len() { diff --git a/crates/scyllaconn/src/events2/msp.rs b/crates/scyllaconn/src/events2/msp.rs index df913c0..e2dfcb8 100644 --- a/crates/scyllaconn/src/events2/msp.rs +++ b/crates/scyllaconn/src/events2/msp.rs @@ -1,6 +1,7 @@ use super::prepare::StmtsEvents; use crate::range::ScyllaSeriesRange; use crate::worker::ScyllaQueue; +use core::fmt; use err::thiserror; use err::ThisError; use futures_util::Future; @@ -10,6 +11,7 @@ use futures_util::StreamExt; use netpod::log::*; use netpod::ttl::RetentionTime; use netpod::TsMs; +use netpod::TsMsVecFmt; use scylla::Session; use series::SeriesId; use std::collections::VecDeque; @@ -17,6 +19,15 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; +#[allow(unused)] +macro_rules! trace_emit { + ($det:expr, $($arg:tt)*) => { + if $det { + trace!($($arg)*); + } + }; +} + #[derive(Debug, ThisError)] #[cstm(name = "EventsMsp")] pub enum Error { @@ -53,6 +64,14 @@ where Resolvable::Taken => None, } } + + fn is_taken(&self) -> bool { + if let Self::Taken = self { + true + } else { + false + } + } } struct BckAndFirstFwd { @@ -60,9 +79,13 @@ struct BckAndFirstFwd { fut_fwd: Resolvable, crate::worker::Error>> + Send>>>, } +struct Fwd { + fut_fwd: Resolvable, crate::worker::Error>> + Send>>>, +} + enum State { BckAndFirstFwd(BckAndFirstFwd), - InputDone, + Fwd(Fwd), } #[pin_project::pin_project] @@ -70,9 +93,12 @@ pub struct MspStreamRt { rt: RetentionTime, series: SeriesId, range: ScyllaSeriesRange, + no_more: bool, #[pin] state: State, out: VecDeque, + scyqueue: ScyllaQueue, + do_trace_detail: bool, } impl MspStreamRt { @@ -91,17 +117,45 @@ impl MspStreamRt { let range = range.clone(); async move { scyqueue.find_ts_msp(rt, series.id(), range, false).await } }; + let do_trace_detail = netpod::TRACE_SERIES_ID.contains(&series.id()); + info!("------------------------------------- TEST INFO"); + trace_emit!(do_trace_detail, "------------------------------------- TEST TRACE"); Self { rt, series, range, + no_more: false, state: State::BckAndFirstFwd(BckAndFirstFwd { fut_bck: Resolvable::Future(Box::pin(fut_bck)), fut_fwd: Resolvable::Future(Box::pin(fut_fwd)), }), out: VecDeque::new(), + scyqueue, + do_trace_detail, } } + + fn next_fwd_fut( + &mut self, + ) -> Resolvable, crate::worker::Error>> + Send>>> { + let range = if let Some(msp) = self.out.back() { + let x = ScyllaSeriesRange::new(msp.bump_epsilon().ns(), self.range.end()); + trace_emit!(self.do_trace_detail, "next_fwd_fut {}", x.fmt()); + x + } else { + // should not get here + let x = ScyllaSeriesRange::new(self.range.end(), self.range.end()); + trace_emit!(self.do_trace_detail, "next_fwd_fut NOTHING IN BUFFER {}", x.fmt()); + x + }; + let fut_fwd = { + let scyqueue = self.scyqueue.clone(); + let rt = self.rt.clone(); + let series = self.series.clone(); + async move { scyqueue.find_ts_msp(rt, series.id(), range, false).await } + }; + Resolvable::Future(Box::pin(fut_fwd)) + } } impl Stream for MspStreamRt { @@ -109,6 +163,7 @@ impl Stream for MspStreamRt { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + let trdet = self.do_trace_detail; loop { break match &mut self.state { State::BckAndFirstFwd(st) => { @@ -117,6 +172,7 @@ impl Stream for MspStreamRt { match rsv { Resolvable::Future(fut) => match fut.poll_unpin(cx) { Ready(x) => { + trace_emit!(trdet, "bck resolved {x:?}"); *rsv = Resolvable::Output(x); } Pending => { @@ -129,6 +185,7 @@ impl Stream for MspStreamRt { match rsv { Resolvable::Future(fut) => match fut.poll_unpin(cx) { Ready(x) => { + trace_emit!(trdet, "fwd resolved {x:?}"); *rsv = Resolvable::Output(x); } Pending => { @@ -142,7 +199,6 @@ impl Stream for MspStreamRt { } else { let taken_bck = st.fut_bck.take(); let taken_fwd = st.fut_fwd.take(); - self.state = State::InputDone; if let (Some(taken_bck), Some(taken_fwd)) = (taken_bck, taken_fwd) { match taken_bck { Ok(v1) => match taken_fwd { @@ -153,6 +209,10 @@ impl Stream for MspStreamRt { for e in v2 { self.out.push_back(e) } + trace_emit!(trdet, "ready out {}", TsMsVecFmt(self.out.iter())); + self.state = State::Fwd(Fwd { + fut_fwd: self.next_fwd_fut(), + }); continue; } Err(e) => Ready(Some(Err(e.into()))), @@ -164,10 +224,62 @@ impl Stream for MspStreamRt { } } } - State::InputDone => { - if let Some(x) = self.out.pop_front() { - Ready(Some(Ok(x))) + State::Fwd(st) => { + // TODO check if more input is coming + let mut have_pending = false; + let mut have_progress = false; + let rsv = &mut st.fut_fwd; + match rsv { + Resolvable::Future(fut) => match fut.poll_unpin(cx) { + Ready(Ok(x)) => { + *rsv = Resolvable::Taken; + trace_emit!(trdet, "bulk fwd resolved {x:?}"); + if x.len() == 0 { + self.no_more = true; + } + for e in x { + self.out.push_back(e) + } + } + Ready(Err(e)) => { + trace_emit!(trdet, "bulk fwd error {e}"); + *rsv = Resolvable::Taken; + error!("{e}"); + } + Pending => { + trace_emit!(trdet, "bulk fwd Pending"); + have_pending = true; + } + }, + Resolvable::Output(..) => { + trace_emit!(trdet, "bulk fwd Output"); + } + Resolvable::Taken => { + trace_emit!(trdet, "bulk fwd Taken"); + } + } + let is_taken = if let State::Fwd(st2) = &self.state { + st2.fut_fwd.is_taken() } else { + panic!("logic"); + }; + if self.out.len() < 3 && self.no_more == false && is_taken { + trace_emit!(trdet, "bulk fwd low in out make next fut ++++++++++++++++++++"); + self.state = State::Fwd(Fwd { + fut_fwd: self.next_fwd_fut(), + }); + have_progress = true; + } + if let Some(x) = self.out.pop_front() { + trace_emit!(trdet, "State::Fwd => Some emit {}", x.fmt()); + Ready(Some(Ok(x))) + } else if have_progress { + continue; + } else if have_pending { + trace_emit!(trdet, "State::Fwd => Pending !!!!!!!!!!!!!!!!!!!!!!!"); + Pending + } else { + trace_emit!(trdet, "State::Fwd => None !!!!!!!!!!!!!!!!!!!!!!"); Ready(None) } } @@ -200,7 +312,13 @@ pub async fn find_ts_msp( stmts: &StmtsEvents, scy: &Session, ) -> Result, Error> { - trace!("find_ts_msp series {:?} {:?} {:?} bck {}", rt, series, range, bck); + trace!( + "find_ts_msp series {:?} {:?} {} bck {}", + rt, + series, + range.fmt(), + bck + ); if bck { find_ts_msp_bck(rt, series, range, stmts, scy).await } else { diff --git a/crates/scyllaconn/src/events2/prepare.rs b/crates/scyllaconn/src/events2/prepare.rs index 661ec6d..6686124 100644 --- a/crates/scyllaconn/src/events2/prepare.rs +++ b/crates/scyllaconn/src/events2/prepare.rs @@ -113,7 +113,7 @@ async fn make_msp_dir(ks: &str, rt: &RetentionTime, bck: bool, scy: &Session) -> let select_cond = if bck { "ts_msp < ? order by ts_msp desc limit 2" } else { - "ts_msp >= ? and ts_msp < ?" + "ts_msp >= ? and ts_msp < ? limit 20" }; let cql = format!( "select ts_msp from {}.{}{} where series = ? and {}", diff --git a/crates/scyllaconn/src/range.rs b/crates/scyllaconn/src/range.rs index 40859fb..c7526c3 100644 --- a/crates/scyllaconn/src/range.rs +++ b/crates/scyllaconn/src/range.rs @@ -1,3 +1,4 @@ +use core::fmt; use netpod::range::evrange::SeriesRange; use netpod::TsNano; @@ -8,6 +9,13 @@ pub struct ScyllaSeriesRange { } impl ScyllaSeriesRange { + pub fn new(beg: TsNano, end: TsNano) -> Self { + Self { + beg: beg.ns(), + end: end.ns(), + } + } + pub fn beg(&self) -> TsNano { TsNano::from_ns(self.beg) } @@ -15,6 +23,10 @@ impl ScyllaSeriesRange { pub fn end(&self) -> TsNano { TsNano::from_ns(self.end) } + + pub fn fmt(&self) -> ScyllaSeriesRangeFmt { + ScyllaSeriesRangeFmt { val: self.clone() } + } } impl From<&SeriesRange> for ScyllaSeriesRange { @@ -25,3 +37,18 @@ impl From<&SeriesRange> for ScyllaSeriesRange { } } } + +pub struct ScyllaSeriesRangeFmt { + val: ScyllaSeriesRange, +} + +impl fmt::Display for ScyllaSeriesRangeFmt { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!( + fmt, + "ScyllaSeriesRange {{ beg: {}, end: {} }}", + self.val.beg().fmt(), + self.val.end().fmt() + ) + } +} diff --git a/crates/streams/src/rangefilter2.rs b/crates/streams/src/rangefilter2.rs index e4101fa..beb9620 100644 --- a/crates/streams/src/rangefilter2.rs +++ b/crates/streams/src/rangefilter2.rs @@ -10,11 +10,27 @@ use items_2::merger::Mergeable; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::RangeFilterStats; +use netpod::TsMsVecFmt; +use netpod::TsNano; use std::fmt; use std::pin::Pin; use std::task::Context; use std::task::Poll; +#[cfg(test)] +use items_0::Events; +#[cfg(test)] +use std::collections::VecDeque; + +#[allow(unused)] +macro_rules! trace_emit { + ($det:expr, $($arg:tt)*) => { + if $det { + eprintln!($($arg)*); + } + }; +} + pub struct RangeFilter2 where S: Stream> + Unpin, @@ -26,12 +42,12 @@ where one_before_range: bool, stats: RangeFilterStats, slot1: Option, - slot2: Option, have_range_complete: bool, - data_done: bool, + inp_done: bool, raco_done: bool, done: bool, complete: bool, + trdet: bool, } impl RangeFilter2 @@ -44,7 +60,9 @@ where } pub fn new(inp: S, range: NanoRange, one_before_range: bool) -> Self { - trace!( + let trdet = false; + trace_emit!( + trdet, "{}::new range: {:?} one_before_range {:?}", Self::type_name(), range, @@ -57,12 +75,12 @@ where one_before_range, stats: RangeFilterStats::new(), slot1: None, - slot2: None, have_range_complete: false, - data_done: false, + inp_done: false, raco_done: false, done: false, complete: false, + trdet, } } @@ -98,21 +116,32 @@ where } fn handle_item(&mut self, item: ITY) -> Result { + let min = item.ts_min().map(|x| TsNano::from_ns(x).fmt()); + let max = item.ts_max().map(|x| TsNano::from_ns(x).fmt()); + trace_emit!( + self.trdet, + "see event len {} min {:?} max {:?}", + item.len(), + min, + max + ); let mut item = self.prune_high(item, self.range.end)?; let ret = if self.one_before_range { - match item.find_lowest_index_ge(self.range.beg) { - Some(ilge) => { - if ilge == 0 { + let lige = item.find_lowest_index_ge(self.range.beg); + trace_emit!(self.trdet, "YES one_before_range ilge {:?}", lige); + match lige { + Some(lige) => { + if lige == 0 { if let Some(sl1) = self.slot1.take() { - self.slot2 = Some(item); + self.slot1 = Some(item); sl1 } else { item } } else { - trace!("discarding events len {:?}", ilge - 1); + trace_emit!(self.trdet, "discarding events len {:?}", lige - 1); let mut dummy = item.new_empty(); - item.drain_into(&mut dummy, (0, ilge - 1)) + item.drain_into(&mut dummy, (0, lige - 1)) .map_err(|e| format!("{e} unexpected MergeError while remove of items"))?; self.slot1 = None; item @@ -120,7 +149,7 @@ where } None => { // TODO keep stats about this case - debug!("drain into to keep one before"); + trace_emit!(self.trdet, "drain into to keep one before"); let n = item.len(); let mut keep = item.new_empty(); item.drain_into(&mut keep, (n.max(1) - 1, n)) @@ -130,10 +159,12 @@ where } } } else { - match item.find_lowest_index_ge(self.range.beg) { - Some(ilge) => { + let lige = item.find_lowest_index_ge(self.range.beg); + trace_emit!(self.trdet, "NOT one_before_range ilge {:?}", lige); + match lige { + Some(lige) => { let mut dummy = item.new_empty(); - item.drain_into(&mut dummy, (0, ilge))?; + item.drain_into(&mut dummy, (0, lige))?; item } None => { @@ -168,23 +199,25 @@ where let k = std::mem::replace(&mut self.stats, RangeFilterStats::new()); let k = StatsItem::RangeFilterStats(k); Ready(Some(Ok(StreamItem::Stats(k)))) - } else if self.data_done { + } else if self.inp_done { self.raco_done = true; if self.have_range_complete { Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) } else { continue; } - } else if let Some(sl2) = self.slot2.take() { - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(sl2))))) } else { match self.inp.poll_next_unpin(cx) { Ready(Some(item)) => match item { Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => match self.handle_item(item) { - Ok(item) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))), + Ok(item) => { + trace_emit!(self.trdet, "emit {}", TsMsVecFmt(Mergeable::tss(&item).iter())); + let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); + Ready(Some(item)) + } Err(e) => { error!("sees: {e}"); - self.data_done = true; + self.inp_done = true; Ready(Some(Err(e))) } }, @@ -195,8 +228,12 @@ where k => Ready(Some(k)), }, Ready(None) => { - self.data_done = true; - continue; + self.inp_done = true; + if let Some(sl1) = self.slot1.take() { + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(sl1))))) + } else { + continue; + } } Pending => Pending, } @@ -216,7 +253,7 @@ where let span1 = span!(Level::INFO, "RangeFilter2", range = tracing::field::Empty); span1.record("range", &self.range_str.as_str()); let _spg = span1.enter(); - Self::poll_next(self, cx) + RangeFilter2::poll_next(self, cx) } } @@ -240,3 +277,260 @@ where debug!("drop {:?}", self); } } + +#[test] +fn test_00() { + use items_0::Empty; + use items_2::eventsdim0::EventsDim0; + let ms = 1_000_000; + let beg = TsNano::from_ms(1000 * 10); + let end = TsNano::from_ms(1000 * 20); + let mut item1 = EventsDim0::::empty(); + item1.push_back(beg.ns() + 0 * ms, 0, 3.); + item1.push_back(beg.ns() + 1 * ms, 0, 3.1); + item1.push_back(beg.ns() + 2 * ms, 0, 3.2); + item1.push_back(beg.ns() + 3 * ms, 0, 3.3); + item1.push_back(beg.ns() + 4 * ms, 0, 3.4); + item1.push_back(end.ns() - 1, 0, 4.0); + item1.push_back(end.ns() + 0, 0, 4.1); + item1.push_back(end.ns() + 1, 0, 4.1); + let w1: Box = Box::new(item1.clone()); + let e1 = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w1))); + let inp = futures_util::stream::iter([e1]); + let one_before_range = false; + let range = NanoRange::from((beg.ns(), end.ns())); + let stream = RangeFilter2::new(inp, range, one_before_range); + let fut = async move { + let tss_items = fetch_into_tss_items(stream).await; + let exp: &[&[u64]] = &[&[ + beg.ns() + 0 * ms, + beg.ns() + 1 * ms, + beg.ns() + 2 * ms, + beg.ns() + 3 * ms, + beg.ns() + 4 * ms, + end.ns() - 1, + ]]; + assert_eq!(&tss_items, &exp); + Ok::<_, Error>(()) + }; + taskrun::run(fut).unwrap(); +} + +#[test] +fn test_cut_before_00() { + use items_0::Empty; + use items_2::eventsdim0::EventsDim0; + let ms = 1_000_000; + let beg = TsNano::from_ms(1000 * 10); + let end = TsNano::from_ms(1000 * 20); + let mut items = Vec::new(); + { + let mut item = EventsDim0::::empty(); + item.push_back(beg.ns() - 1, 0, 2.9); + let w: Box = Box::new(item.clone()); + let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); + items.push(e); + } + { + let mut item = EventsDim0::::empty(); + item.push_back(beg.ns() + 0 * ms, 0, 3.); + item.push_back(beg.ns() + 1 * ms, 0, 3.1); + item.push_back(beg.ns() + 2 * ms, 0, 3.2); + item.push_back(beg.ns() + 3 * ms, 0, 3.3); + item.push_back(beg.ns() + 4 * ms, 0, 3.4); + item.push_back(end.ns() - 1, 0, 4.0); + item.push_back(end.ns() + 0, 0, 4.1); + item.push_back(end.ns() + 1, 0, 4.1); + let w: Box = Box::new(item.clone()); + let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); + items.push(e); + } + let inp = futures_util::stream::iter(items); + let one_before_range = false; + let range = NanoRange::from((beg.ns(), end.ns())); + let stream = RangeFilter2::new(inp, range, one_before_range); + let fut = async move { + let tss_items = fetch_into_tss_items(stream).await; + let exp: &[&[u64]] = &[ + // TODO in the future this empty may be discarded + &[], + &[ + beg.ns() + 0 * ms, + beg.ns() + 1 * ms, + beg.ns() + 2 * ms, + beg.ns() + 3 * ms, + beg.ns() + 4 * ms, + end.ns() - 1, + ], + ]; + assert_eq!(&tss_items, &exp); + Ok::<_, Error>(()) + }; + taskrun::run(fut).unwrap(); +} + +#[test] +fn test_one_before_00() { + use items_0::Empty; + use items_2::eventsdim0::EventsDim0; + let ms = 1_000_000; + let beg = TsNano::from_ms(1000 * 10); + let end = TsNano::from_ms(1000 * 20); + let mut items = Vec::new(); + { + let mut item = EventsDim0::::empty(); + item.push_back(beg.ns() - 1, 0, 2.9); + let w: Box = Box::new(item.clone()); + let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); + items.push(e); + } + { + let mut item = EventsDim0::::empty(); + item.push_back(beg.ns() + 0 * ms, 0, 3.); + item.push_back(beg.ns() + 1 * ms, 0, 3.1); + item.push_back(beg.ns() + 2 * ms, 0, 3.2); + item.push_back(beg.ns() + 3 * ms, 0, 3.3); + item.push_back(beg.ns() + 4 * ms, 0, 3.4); + item.push_back(end.ns() - 1, 0, 4.0); + item.push_back(end.ns() + 0, 0, 4.1); + item.push_back(end.ns() + 1, 0, 4.1); + let w: Box = Box::new(item.clone()); + let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); + items.push(e); + } + let inp = futures_util::stream::iter(items); + let one_before_range = true; + let range = NanoRange::from((beg.ns(), end.ns())); + let stream = RangeFilter2::new(inp, range, one_before_range); + let fut = async move { + let tss_items = fetch_into_tss_items(stream).await; + let exp: &[&[u64]] = &[ + // TODO in the future this empty may be discarded + &[], + &[ + // + beg.ns() - 1, + ], + &[ + beg.ns() + 0 * ms, + beg.ns() + 1 * ms, + beg.ns() + 2 * ms, + beg.ns() + 3 * ms, + beg.ns() + 4 * ms, + end.ns() - 1, + ], + ]; + assert_eq!(&tss_items, &exp); + Ok::<_, Error>(()) + }; + taskrun::run(fut).unwrap(); +} + +#[test] +fn test_one_before_01() { + use items_0::Empty; + use items_2::eventsdim0::EventsDim0; + let ms = 1_000_000; + let beg = TsNano::from_ms(1000 * 10); + let end = TsNano::from_ms(1000 * 20); + let mut items = Vec::new(); + { + let mut item = EventsDim0::::empty(); + item.push_back(beg.ns() - 1, 0, 2.9); + item.push_back(beg.ns() + 0 * ms, 0, 3.); + let w: Box = Box::new(item.clone()); + let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); + items.push(e); + } + { + let mut item = EventsDim0::::empty(); + item.push_back(beg.ns() + 1 * ms, 0, 3.1); + item.push_back(beg.ns() + 2 * ms, 0, 3.2); + item.push_back(beg.ns() + 3 * ms, 0, 3.3); + item.push_back(beg.ns() + 4 * ms, 0, 3.4); + item.push_back(end.ns() - 1, 0, 4.0); + item.push_back(end.ns() + 0, 0, 4.1); + item.push_back(end.ns() + 1, 0, 4.1); + let w: Box = Box::new(item.clone()); + let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); + items.push(e); + } + let inp = futures_util::stream::iter(items); + let one_before_range = true; + let range = NanoRange::from((beg.ns(), end.ns())); + let stream = RangeFilter2::new(inp, range, one_before_range); + let fut = async move { + let tss_items = fetch_into_tss_items(stream).await; + let exp: &[&[u64]] = &[ + // TODO in the future this empty may be discarded + // &[], + &[ + // + beg.ns() - 1, + beg.ns() + 0 * ms, + ], + &[ + beg.ns() + 1 * ms, + beg.ns() + 2 * ms, + beg.ns() + 3 * ms, + beg.ns() + 4 * ms, + end.ns() - 1, + ], + ]; + assert_eq!(&tss_items, &exp); + Ok::<_, Error>(()) + }; + taskrun::run(fut).unwrap(); +} + +#[test] +fn test_one_before_only() { + use items_0::Empty; + use items_2::eventsdim0::EventsDim0; + let _ms = 1_000_000; + let beg = TsNano::from_ms(1000 * 10); + let end = TsNano::from_ms(1000 * 20); + let mut items = Vec::new(); + { + let mut item = EventsDim0::::empty(); + item.push_back(beg.ns() - 1, 0, 2.9); + let w: Box = Box::new(item.clone()); + let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); + items.push(e); + } + let inp = futures_util::stream::iter(items); + let one_before_range = true; + let range = NanoRange::from((beg.ns(), end.ns())); + let stream = RangeFilter2::new(inp, range, one_before_range); + let fut = async move { + let tss_items = fetch_into_tss_items(stream).await; + let exp: &[&[u64]] = &[ + // TODO in the future this empty may be discarded + &[], + &[ + // + beg.ns() - 1, + ], + ]; + assert_eq!(&tss_items, &exp); + Ok::<_, Error>(()) + }; + taskrun::run(fut).unwrap(); +} + +#[cfg(test)] +async fn fetch_into_tss_items(mut inp: INP) -> VecDeque> +where + INP: Stream>> + Unpin, +{ + let mut tss_items = VecDeque::new(); + while let Some(e) = inp.next().await { + if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(evs))) = e { + eprintln!("{:?}", evs); + tss_items.push_back(Events::tss(&evs).clone()); + } else { + eprintln!("other item ----------: {:?}", e); + } + } + tss_items +}