diff --git a/crates/items_2/src/binning/aggregator.rs b/crates/items_2/src/binning/aggregator.rs
index 10194e8..2351718 100644
--- a/crates/items_2/src/binning/aggregator.rs
+++ b/crates/items_2/src/binning/aggregator.rs
@@ -73,7 +73,7 @@ where
     fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> EVT::AggTimeWeightOutputAvg {
         let sum = self.sum.clone();
-        trace!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction);
+        trace_result!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction);
         self.sum = 0.;
         sum / filled_width_fraction as f64
     }
diff --git a/crates/items_2/src/binning/container_bins.rs b/crates/items_2/src/binning/container_bins.rs
index 87f900e..886d484 100644
--- a/crates/items_2/src/binning/container_bins.rs
+++ b/crates/items_2/src/binning/container_bins.rs
@@ -472,7 +472,6 @@ where
             avgs,
         };
         let ret = serde_json::to_value(&val).map_err(err::Error::from_string);
-        info!("VALUE: {:?}", ret);
         ret
     }
 }
@@ -540,10 +539,7 @@ where
         range: Option,
         binrange: Option,
     ) -> Result, err::Error> {
-        info!(
-            "----------- ContainerBinsCollector result called len {}",
-            self.len()
-        );
+        // TODO do we need to set timeout, continueAt or anything?
         let bins = mem::replace(&mut self.bins, ContainerBins::new());
         let ret = ContainerBinsCollectorOutput { bins };
         Ok(Box::new(ret))
diff --git a/crates/items_2/src/binning/timeweight/timeweight_events.rs b/crates/items_2/src/binning/timeweight/timeweight_events.rs
index 9597ffa..ffcfdc8 100644
--- a/crates/items_2/src/binning/timeweight/timeweight_events.rs
+++ b/crates/items_2/src/binning/timeweight/timeweight_events.rs
@@ -18,28 +18,28 @@ use std::mem;
 macro_rules! trace_ { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
 #[allow(unused)]
-macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
+macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
 #[allow(unused)]
-macro_rules! trace_cycle { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
+macro_rules! trace_cycle { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
 #[allow(unused)]
 macro_rules! trace_event_next { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
 #[allow(unused)]
-macro_rules! trace_ingest_init_lst { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
+macro_rules! trace_ingest_init_lst { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
 #[allow(unused)]
-macro_rules! trace_ingest_minmax { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
+macro_rules! trace_ingest_minmax { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
 #[allow(unused)]
 macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
 #[allow(unused)]
-macro_rules! trace_ingest_firsts { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
+macro_rules! trace_ingest_firsts { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
 #[allow(unused)]
-macro_rules! trace_ingest_finish_bin { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
+macro_rules! trace_ingest_finish_bin { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
 #[allow(unused)]
 macro_rules!
trace_ingest_container { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) } diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index df4a50d..cfd5e12 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -5,11 +5,9 @@ use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; use futures_util::TryStreamExt; -use items_0::on_sitemty_data; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; -use items_0::try_map_sitemty_data; use items_2::channelevents::ChannelEvents; use netpod::log::*; use netpod::ChConf; diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs index e2190be..c15e640 100644 --- a/crates/scyllaconn/src/events.rs +++ b/crates/scyllaconn/src/events.rs @@ -2,6 +2,7 @@ use crate::events2::events::EventReadOpts; use crate::events2::prepare::StmtsEvents; use crate::range::ScyllaSeriesRange; use crate::worker::ScyllaQueue; +use core::fmt; use err::thiserror; use err::ThisError; use futures_util::Future; @@ -24,6 +25,7 @@ use scylla::Session; use series::SeriesId; use std::pin::Pin; use std::sync::Arc; +use std::time::Instant; use tracing::Instrument; #[allow(unused)] @@ -44,10 +46,12 @@ pub enum Error { ScyllaTypeConv(#[from] scylla::cql_to_rust::FromRowError), ScyllaWorker(Box), MissingQuery(String), + NotTokenAware, RangeEndOverflow, InvalidFuture, TestError(String), Logic, + TodoUnsupported, } impl From for Error { @@ -66,11 +70,12 @@ pub(super) trait ValTy: Sized + 'static { fn default() -> Self; fn is_valueblob() -> bool; fn st_name() -> &'static str; - fn read_next_values( + fn read_next_values_trait( opts: ReadNextValuesOpts, + jobtrace: ReadJobTrace, scy: Arc, stmts: Arc, - ) -> Pin, Error>> + Send>>; + ) -> Pin, ReadJobTrace), Error>> + Send>>; fn convert_rows( rows: Vec, range: ScyllaSeriesRange, @@ -112,12 +117,13 @@ macro_rules! impl_scaty_scalar { $st_name } - fn read_next_values( + fn read_next_values_trait( opts: ReadNextValuesOpts, + jobtrace: ReadJobTrace, scy: Arc, stmts: Arc, - ) -> Pin, Error>> + Send>> { - Box::pin(read_next_values_2::(opts, scy, stmts)) + ) -> Pin, ReadJobTrace), Error>> + Send>> { + Box::pin(read_next_values_2::(opts, jobtrace, scy, stmts)) } fn convert_rows( @@ -178,12 +184,13 @@ macro_rules! 
impl_scaty_array { $st_name } - fn read_next_values( + fn read_next_values_trait( opts: ReadNextValuesOpts, + jobtrace: ReadJobTrace, scy: Arc, stmts: Arc, - ) -> Pin, Error>> + Send>> { - Box::pin(read_next_values_2::(opts, scy, stmts)) + ) -> Pin, ReadJobTrace), Error>> + Send>> { + Box::pin(read_next_values_2::(opts, jobtrace, scy, stmts)) } fn convert_rows( @@ -231,13 +238,13 @@ impl ValTy for EnumVariant { "enum" } - fn read_next_values( + fn read_next_values_trait( opts: ReadNextValuesOpts, + jobtrace: ReadJobTrace, scy: Arc, stmts: Arc, - ) -> Pin, Error>> + Send>> { - let fut = read_next_values_2::(opts, scy, stmts); - Box::pin(fut) + ) -> Pin, ReadJobTrace), Error>> + Send>> { + Box::pin(read_next_values_2::(opts, jobtrace, scy, stmts)) } fn convert_rows( @@ -283,12 +290,13 @@ impl ValTy for Vec { "string" } - fn read_next_values( + fn read_next_values_trait( opts: ReadNextValuesOpts, + jobtrace: ReadJobTrace, scy: Arc, stmts: Arc, - ) -> Pin, Error>> + Send>> { - let fut = read_next_values_2::(opts, scy, stmts); + ) -> Pin, ReadJobTrace), Error>> + Send>> { + let fut = read_next_values_2::(opts, jobtrace, scy, stmts); Box::pin(fut) } @@ -329,6 +337,51 @@ impl_scaty_array!(Vec, f32, Vec, "f32", "f32"); impl_scaty_array!(Vec, f64, Vec, "f64", "f64"); impl_scaty_array!(Vec, bool, Vec, "bool", "bool"); +#[derive(Debug)] +pub enum ReadEventKind { + Create, + FutgenCallingReadNextValues, + FutgenFutureCreated, + CallExecuteIter, + ScyllaReadRow(u32), + ScyllaReadRowDone(u32), + ReadNextValuesFutureDone, + EventsStreamRtSees(u32), +} + +#[derive(Debug)] +pub struct ReadJobTrace { + jobid: u64, + ts0: Instant, + events: Vec<(Instant, ReadEventKind)>, +} + +impl ReadJobTrace { + pub fn new() -> Self { + static JOBID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); + Self { + jobid: JOBID.fetch_add(1, std::sync::atomic::Ordering::AcqRel), + ts0: Instant::now(), + events: Vec::with_capacity(128), + } + } + + pub fn add_event_now(&mut self, kind: ReadEventKind) { + self.events.push((Instant::now(), kind)) + } +} + +impl fmt::Display for ReadJobTrace { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "ReadJobTrace jobid {jid}", jid = self.jobid)?; + for (ts, kind) in &self.events { + let dt = 1e3 * ts.saturating_duration_since(self.ts0).as_secs_f32(); + write!(fmt, "\njobid {jid:4} {dt:7.2} {kind:?}", jid = self.jobid)?; + } + Ok(()) + } +} + #[derive(Debug)] pub(super) struct ReadNextValuesOpts { rt: RetentionTime, @@ -362,15 +415,25 @@ impl ReadNextValuesOpts { } } -pub(super) async fn read_next_values(opts: ReadNextValuesOpts) -> Result, Error> +pub(super) struct ReadNextValuesParams { + pub opts: ReadNextValuesOpts, + pub jobtrace: ReadJobTrace, +} + +pub(super) async fn read_next_values(params: ReadNextValuesParams) -> Result<(Box, ReadJobTrace), Error> where ST: ValTy, { + let opts = params.opts; + let jobtrace = params.jobtrace; // TODO could take scyqeue out of opts struct. 
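// Illustrative sketch, standalone and not part of the patch: ReadJobTrace above is a
// per-job event log, a jobid plus Instant-stamped ReadEventKind entries, which is moved
// through the read pipeline and printed once the job completes. The same idea reduced
// to std-only code; JobTrace and Step are invented names for illustration.

use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

#[derive(Debug)]
enum Step {
    Created,
    QueryIssued,
    RowsRead(u32),
    Done,
}

struct JobTrace {
    jobid: u64,
    ts0: Instant,
    events: Vec<(Instant, Step)>,
}

impl JobTrace {
    fn new() -> Self {
        // Monotonic job ids, so interleaved log lines can be attributed to one job.
        static JOBID: AtomicU64 = AtomicU64::new(0);
        Self {
            jobid: JOBID.fetch_add(1, Ordering::AcqRel),
            ts0: Instant::now(),
            events: Vec::new(),
        }
    }

    fn add(&mut self, step: Step) {
        self.events.push((Instant::now(), step));
    }
}

impl fmt::Display for JobTrace {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "JobTrace jobid {}", self.jobid)?;
        for (ts, step) in &self.events {
            // Milliseconds relative to job creation, like the Display impl above.
            let dt_ms = 1e3 * ts.saturating_duration_since(self.ts0).as_secs_f32();
            write!(f, "\n  {:7.2} ms  {:?}", dt_ms, step)?;
        }
        Ok(())
    }
}

fn main() {
    // The trace is created by the caller, moved into the job, mutated along the way,
    // and handed back together with the result, like the (result, jobtrace) tuples
    // introduced in this patch.
    let mut trace = JobTrace::new();
    trace.add(Step::Created);
    trace.add(Step::QueryIssued);
    trace.add(Step::RowsRead(2000));
    trace.add(Step::Done);
    println!("{}", trace);
}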
let scyqueue = opts.scyqueue.clone(); let level = taskrun::query_log_level(); - let futgen = Box::new(move |scy: Arc, stmts: Arc| { + let futgen = move |scy: Arc, stmts: Arc, mut jobtrace: ReadJobTrace| { + // TODO avoid this + // opts.jobtrace = jobtrace; let fut = async move { + // let jobtrace = &mut opts.jobtrace; let logspan = if level == Level::DEBUG { tracing::span!(Level::INFO, "log_span_debug") } else if level == Level::TRACE { @@ -378,25 +441,34 @@ where } else { tracing::Span::none() }; - ST::read_next_values(opts, scy, stmts) - .instrument(logspan) - .await - .map_err(crate::worker::Error::from) + jobtrace.add_event_now(ReadEventKind::FutgenCallingReadNextValues); + let fut = ST::read_next_values_trait(opts, jobtrace, scy, stmts).instrument(logspan); + match fut.await.map_err(crate::worker::Error::from) { + Ok((ret, mut jobtrace)) => { + jobtrace.add_event_now(ReadEventKind::ReadNextValuesFutureDone); + Ok((ret, jobtrace)) + } + Err(e) => Err(e), + } }; - Box::pin(fut) as Pin, crate::worker::Error>> + Send>> - }); - let res = scyqueue.read_next_values(futgen).await?; - Ok(res) + Box::pin(fut) + as Pin, ReadJobTrace), crate::worker::Error>> + Send>> + }; + let (res, jobtrace) = scyqueue.read_next_values(futgen, jobtrace).await?; + Ok((res, jobtrace)) } async fn read_next_values_2( opts: ReadNextValuesOpts, + mut jobtrace: ReadJobTrace, scy: Arc, stmts: Arc, -) -> Result, Error> +) -> Result<(Box, ReadJobTrace), Error> where ST: ValTy, { + let use_method_2 = true; + trace!("read_next_values_2 {:?} st_name {}", opts, ST::st_name()); let series = opts.series; let ts_msp = opts.ts_msp; @@ -429,6 +501,15 @@ where .lsp(!opts.fwd, with_values) .shape(ST::is_valueblob()) .st(ST::st_name())?; + let qu = { + let mut qu = qu.clone(); + if qu.is_token_aware() == false { + return Err(Error::NotTokenAware); + } + qu.set_page_size(10000); + // qu.disable_paging(); + qu + }; let params = ( series as i64, ts_msp.ms() as i64, @@ -436,14 +517,58 @@ where ts_lsp_max.ns() as i64, ); trace!("FWD event search params {:?}", params); + jobtrace.add_event_now(ReadEventKind::CallExecuteIter); let mut res = scy.execute_iter(qu.clone(), params).await?; - let mut rows = Vec::new(); - while let Some(x) = res.next().await { - rows.push(x?); + if use_method_2 == false { + let mut rows = Vec::new(); + while let Some(x) = res.next().await { + rows.push(x?); + } + let mut last_before = None; + let ret = ::convert_rows(rows, range, ts_msp, with_values, !opts.fwd, &mut last_before)?; + ret + } else { + let mut ret = ::Container::empty(); + // TODO must branch already here depending on what input columns we expect + if with_values { + if ::is_valueblob() { + let mut it = res.into_typed::<(i64, Vec)>(); + while let Some(x) = it.next().await { + let row = x?; + let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + let value = ::from_valueblob(row.1); + ret.push(ts.ns(), 0, value); + } + ret + } else { + let mut i = 0; + let mut it = res.into_typed::<(i64, ST::ScyTy)>(); + while let Some(x) = it.next().await { + let row = x?; + let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + let value = ::from_scyty(row.1); + ret.push(ts.ns(), 0, value); + i += 1; + if i % 2000 == 0 { + jobtrace.add_event_now(ReadEventKind::ScyllaReadRow(i)); + } + } + { + jobtrace.add_event_now(ReadEventKind::ScyllaReadRowDone(i)); + } + ret + } + } else { + let mut it = res.into_typed::<(i64,)>(); + while let Some(x) = it.next().await { + let row = x?; + let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + let value = 
::default(); + ret.push(ts.ns(), 0, value); + } + ret + } } - let mut last_before = None; - let ret = ST::convert_rows(rows, range, ts_msp, with_values, !opts.fwd, &mut last_before)?; - ret } else { let ts_lsp_max = if ts_msp.ns() < range.beg() { range.beg().delta(ts_msp.ns()) @@ -477,7 +602,7 @@ where }; trace!("read ts_msp {} len {}", ts_msp.fmt(), ret.len()); let ret = Box::new(ret); - Ok(ret) + Ok((ret, jobtrace)) } fn convert_rows_0( diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index e59fc8c..86de4d6 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -1,5 +1,6 @@ use super::msp::MspStreamRt; use crate::events::read_next_values; +use crate::events::ReadJobTrace; use crate::events::ReadNextValuesOpts; use crate::range::ScyllaSeriesRange; use crate::worker::ScyllaQueue; @@ -27,12 +28,7 @@ use std::task::Poll; #[allow(unused)] macro_rules! trace_fetch { - ($($arg:tt)*) => { - if true { - trace!($($arg)*); - } - }; -} + ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } #[allow(unused)] macro_rules! trace_emit { @@ -90,14 +86,75 @@ pub enum Error { Logic, Merge(#[from] items_0::MergeError), TruncateLogic, + AlreadyTaken, } struct FetchMsp { fut: Pin>> + Send>>, } +type ReadEventsFutOut = Result<(Box, ReadJobTrace), crate::events2::events::Error>; + +type FetchEventsFut = Pin + Send>>; + +enum Fst +where + F: Future + Unpin, + ::Output: Unpin, +{ + Ongoing(F), + Ready(::Output), + Taken, +} + +impl Fst +where + F: Future + Unpin, + ::Output: Unpin, +{ + fn take_if_ready(&mut self) -> Poll::Output>> { + use Poll::*; + match self { + Fst::Ongoing(_) => Pending, + Fst::Ready(_) => match core::mem::replace(self, Fst::Taken) { + Fst::Ready(x) => Ready(Some(x)), + _ => panic!(), + }, + Fst::Taken => Ready(None), + } + } +} + +impl Future for Fst +where + F: Future + Unpin, + ::Output: Unpin, +{ + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + use Poll::*; + match self.as_mut().get_mut() { + Fst::Ongoing(fut) => match fut.poll_unpin(cx) { + Ready(x) => { + *self = Fst::Ready(x); + Ready(()) + } + Pending => Pending, + }, + Fst::Ready(_) => Ready(()), + Fst::Taken => Ready(()), + } + } +} + +struct FetchEvents2 { + fut: Fst, +} + struct FetchEvents { - fut: Pin, crate::events2::events::Error>> + Send>>, + a: FetchEvents2, + b: Option, } enum ReadingState { @@ -182,7 +239,9 @@ impl EventsStreamRt { ts_msp: TsMs, bck: bool, scyqueue: ScyllaQueue, - ) -> Pin, Error>> + Send>> { + jobtrace: ReadJobTrace, + ) -> Pin, ReadJobTrace), Error>> + Send>> { + trace!("make_read_events_fut --- {} ---", ts_msp); let opts = ReadNextValuesOpts::new( self.rt.clone(), self.series.clone(), @@ -203,34 +262,35 @@ impl EventsStreamRt { scalar_type ); let fut = async move { + let params = crate::events::ReadNextValuesParams { opts, jobtrace }; let ret = match &shape { Shape::Scalar => match &scalar_type { - ScalarType::U8 => read_next_values::(opts).await, - ScalarType::U16 => read_next_values::(opts).await, - ScalarType::U32 => read_next_values::(opts).await, - ScalarType::U64 => read_next_values::(opts).await, - ScalarType::I8 => read_next_values::(opts).await, - ScalarType::I16 => read_next_values::(opts).await, - ScalarType::I32 => read_next_values::(opts).await, - ScalarType::I64 => read_next_values::(opts).await, - ScalarType::F32 => read_next_values::(opts).await, - ScalarType::F64 => read_next_values::(opts).await, - ScalarType::BOOL => read_next_values::(opts).await, - 
ScalarType::STRING => read_next_values::(opts).await, - ScalarType::Enum => read_next_values::(opts).await, + ScalarType::U8 => read_next_values::(params).await, + ScalarType::U16 => read_next_values::(params).await, + ScalarType::U32 => read_next_values::(params).await, + ScalarType::U64 => read_next_values::(params).await, + ScalarType::I8 => read_next_values::(params).await, + ScalarType::I16 => read_next_values::(params).await, + ScalarType::I32 => read_next_values::(params).await, + ScalarType::I64 => read_next_values::(params).await, + ScalarType::F32 => read_next_values::(params).await, + ScalarType::F64 => read_next_values::(params).await, + ScalarType::BOOL => read_next_values::(params).await, + ScalarType::STRING => read_next_values::(params).await, + ScalarType::Enum => read_next_values::(params).await, }, Shape::Wave(_) => match &scalar_type { - ScalarType::U8 => read_next_values::>(opts).await, - ScalarType::U16 => read_next_values::>(opts).await, - ScalarType::U32 => read_next_values::>(opts).await, - ScalarType::U64 => read_next_values::>(opts).await, - ScalarType::I8 => read_next_values::>(opts).await, - ScalarType::I16 => read_next_values::>(opts).await, - ScalarType::I32 => read_next_values::>(opts).await, - ScalarType::I64 => read_next_values::>(opts).await, - ScalarType::F32 => read_next_values::>(opts).await, - ScalarType::F64 => read_next_values::>(opts).await, - ScalarType::BOOL => read_next_values::>(opts).await, + ScalarType::U8 => read_next_values::>(params).await, + ScalarType::U16 => read_next_values::>(params).await, + ScalarType::U32 => read_next_values::>(params).await, + ScalarType::U64 => read_next_values::>(params).await, + ScalarType::I8 => read_next_values::>(params).await, + ScalarType::I16 => read_next_values::>(params).await, + ScalarType::I32 => read_next_values::>(params).await, + ScalarType::I64 => read_next_values::>(params).await, + ScalarType::F32 => read_next_values::>(params).await, + ScalarType::F64 => read_next_values::>(params).await, + ScalarType::BOOL => read_next_values::>(params).await, ScalarType::STRING => { warn!("read not yet supported {:?} {:?}", shape, scalar_type); err::todoval() @@ -273,10 +333,14 @@ impl EventsStreamRt { fn setup_bck_read(&mut self) { if let Some(ts) = self.msp_buf_bck.pop_back() { trace_fetch!("setup_bck_read {}", ts.fmt()); + let jobtrace = ReadJobTrace::new(); let scyqueue = self.scyqueue.clone(); - let fut = self.make_read_events_fut(ts, true, scyqueue); + let fut = self.make_read_events_fut(ts, true, scyqueue, jobtrace); self.state = State::ReadingBck(ReadingBck { - reading_state: ReadingState::FetchEvents(FetchEvents { fut }), + reading_state: ReadingState::FetchEvents(FetchEvents { + a: FetchEvents2 { fut: Fst::Ongoing(fut) }, + b: None, + }), }); } else { trace_fetch!("setup_bck_read no msp"); @@ -292,13 +356,66 @@ impl EventsStreamRt { } fn setup_fwd_read(&mut self) { + // TODO always try to setup all available slots. if let Some(ts) = self.msp_buf.pop_front() { trace_fetch!("setup_fwd_read {}", ts.fmt()); + let jobtrace = ReadJobTrace::new(); let scyqueue = self.scyqueue.clone(); - let fut = self.make_read_events_fut(ts, false, scyqueue); - self.state = State::ReadingFwd(ReadingFwd { - reading_state: ReadingState::FetchEvents(FetchEvents { fut }), - }); + let fut = self.make_read_events_fut(ts, false, scyqueue, jobtrace); + + // Assert that this fn is only called when there is at least one slot available. + // At the moment with 2 slots, this means that the 2nd is always empty. 
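// Illustrative sketch, standalone and not part of the patch, assuming only the futures
// crate: the Fst wrapper used above parks a future's output so it can be polled in
// place and taken exactly once (Ongoing -> Ready -> Taken). Slot and poll_drive are
// invented names for the same state machine.

use futures::future::BoxFuture;
use futures::task::noop_waker_ref;
use futures::FutureExt;
use std::task::{Context, Poll};

enum Slot<T> {
    Ongoing(BoxFuture<'static, T>),
    Ready(T),
    Taken,
}

impl<T> Slot<T> {
    /// Drive the inner future; once it completes, park the output in Ready.
    fn poll_drive(&mut self, cx: &mut Context) {
        if let Slot::Ongoing(fut) = self {
            if let Poll::Ready(v) = fut.poll_unpin(cx) {
                *self = Slot::Ready(v);
            }
        }
    }

    /// Hand out the parked output at most once; afterwards the slot stays Taken.
    fn take_if_ready(&mut self) -> Option<T> {
        match std::mem::replace(self, Slot::Taken) {
            Slot::Ready(v) => Some(v),
            other => {
                // Not ready (or already taken): put the previous state back.
                *self = other;
                None
            }
        }
    }
}

fn main() {
    let mut slot = Slot::Ongoing(async { 42u32 }.boxed());
    let mut cx = Context::from_waker(noop_waker_ref());
    slot.poll_drive(&mut cx);
    assert_eq!(slot.take_if_ready(), Some(42));
    assert_eq!(slot.take_if_ready(), None);
}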
+ // TODO careful in general, MUST NOT overwrite the secondary slot with None, there could be something running. + + if let State::ReadingFwd(st2) = &self.state { + if let ReadingState::FetchEvents(st3) = &st2.reading_state { + if st3.b.is_some() { + panic!() + } else { + } + } else { + self.state = State::ReadingFwd(ReadingFwd { + reading_state: ReadingState::FetchEvents(FetchEvents { + a: FetchEvents2 { fut: Fst::Ongoing(fut) }, + b: None, + }), + }); + } + } else { + self.state = State::ReadingFwd(ReadingFwd { + reading_state: ReadingState::FetchEvents(FetchEvents { + a: FetchEvents2 { fut: Fst::Ongoing(fut) }, + b: None, + }), + }); + } + + if let State::ReadingFwd(st2) = &self.state { + if let ReadingState::FetchEvents(st3) = &st2.reading_state { + if st3.b.is_some() { + panic!() + } else { + // Try the same with the 2nd slot + if let Some(ts) = self.msp_buf.pop_front() { + trace_fetch!("setup_fwd_read {} SECONDARY SLOT", ts.fmt()); + let jobtrace = ReadJobTrace::new(); + let scyqueue = self.scyqueue.clone(); + let fut = self.make_read_events_fut(ts, false, scyqueue, jobtrace); + if let State::ReadingFwd(st2) = &mut self.state { + if let ReadingState::FetchEvents(st3) = &mut st2.reading_state { + if st3.b.is_some() { + panic!() + } else { + st3.b = Some(FetchEvents2 { fut: Fst::Ongoing(fut) }); + } + } + } + } else { + // nothing to do + } + } + } + } } else { trace_fetch!("setup_fwd_read no msp"); let fut = Self::make_msp_read_fut(&mut self.msp_inp); @@ -424,38 +541,51 @@ impl Stream for EventsStreamRt { } Pending => Pending, }, - ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) { - Ready(Ok(mut x)) => { - use items_2::merger::Mergeable; - trace_fetch!("ReadingBck FetchEvents got len {}", x.len()); - for ts in Mergeable::tss(&x) { - trace_every_event!("ReadingBck FetchEvents ts {}", ts.fmt()); - } - if let Some(ix) = Mergeable::find_highest_index_lt(&x, self.range.beg().ns()) { - trace_fetch!("ReadingBck FetchEvents find_highest_index_lt {:?}", ix); - let mut y = Mergeable::new_empty(&x); - match Mergeable::drain_into(&mut x, &mut y, (ix, 1 + ix)) { - Ok(()) => { - trace_fetch!("ReadingBck FetchEvents drained y len {:?}", y.len()); - self.out.push_back(y); - self.transition_to_fwd_read(); + ReadingState::FetchEvents(st2) => match st2.a.fut.poll_unpin(cx) { + Ready(()) => match st2.a.fut.take_if_ready() { + Ready(Some(x)) => match x { + Ok((mut evs, jobtrace)) => { + use items_2::merger::Mergeable; + trace!("ReadingBck {jobtrace}"); + trace_fetch!("ReadingBck FetchEvents got len {}", evs.len()); + for ts in Mergeable::tss(&evs) { + trace_every_event!("ReadingBck FetchEvents ts {}", ts.fmt()); + } + if let Some(ix) = Mergeable::find_highest_index_lt(&evs, self.range.beg().ns()) { + trace_fetch!("ReadingBck FetchEvents find_highest_index_lt {:?}", ix); + let mut y = Mergeable::new_empty(&evs); + match Mergeable::drain_into(&mut evs, &mut y, (ix, 1 + ix)) { + Ok(()) => { + trace_fetch!("ReadingBck FetchEvents drained y len {:?}", y.len()); + self.out.push_back(y); + self.transition_to_fwd_read(); + continue; + } + Err(e) => { + self.state = State::Done; + Ready(Some(Err(e.into()))) + } + } + } else { + trace_fetch!("ReadingBck FetchEvents find_highest_index_lt None"); + self.setup_bck_read(); continue; } - Err(e) => { - self.state = State::Done; - Ready(Some(Err(e.into()))) - } } - } else { - trace_fetch!("ReadingBck FetchEvents find_highest_index_lt None"); - self.setup_bck_read(); - continue; + Err(e) => { + self.state = State::Done; + Ready(Some(Err(e))) + } + }, + 
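// Illustrative sketch, standalone and not part of the patch: setup_fwd_read above keeps
// up to two reads in flight (slot `a` plus an optional secondary slot `b`) and must
// never overwrite a slot that still holds a running read. The slot-filling invariant in
// isolation, with plain data standing in for the read futures; all names are invented.

use std::collections::VecDeque;

#[derive(Debug)]
struct InFlight(u64); // stands in for a running read future for one ts_msp

#[derive(Debug, Default)]
struct Slots {
    a: Option<InFlight>,
    b: Option<InFlight>,
}

impl Slots {
    /// Start reads for as many pending ts_msp values as there are free slots,
    /// never clobbering a slot that is already occupied.
    fn fill(&mut self, pending: &mut VecDeque<u64>) {
        if self.a.is_none() {
            if let Some(ts) = pending.pop_front() {
                self.a = Some(InFlight(ts));
            }
        }
        if self.a.is_some() && self.b.is_none() {
            if let Some(ts) = pending.pop_front() {
                self.b = Some(InFlight(ts));
            }
        }
    }

    /// When the primary read finishes, promote the secondary so `a` is always the
    /// next read to complete.
    fn complete_primary(&mut self) -> Option<InFlight> {
        let done = self.a.take();
        self.a = self.b.take();
        done
    }
}

fn main() {
    let mut pending: VecDeque<u64> = VecDeque::from([10, 20, 30]);
    let mut slots = Slots::default();
    slots.fill(&mut pending);
    assert!(slots.a.is_some() && slots.b.is_some());
    let _done = slots.complete_primary();
    slots.fill(&mut pending);
    println!("{:?} pending {:?}", slots, pending);
}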
Ready(None) => { + self.state = State::Done; + Ready(Some(Err(Error::AlreadyTaken))) } - } - Ready(Err(e)) => { - self.state = State::Done; - Ready(Some(Err(e.into()))) - } + Pending => { + self.state = State::Done; + Ready(Some(Err(Error::Logic))) + } + }, Pending => Pending, }, }, @@ -474,23 +604,43 @@ impl Stream for EventsStreamRt { } Pending => Pending, }, - ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) { - Ready(Ok(x)) => { - use items_2::merger::Mergeable; - trace_fetch!("ReadingFwd FetchEvents got len {:?}", x.len()); - for ts in Mergeable::tss(&x) { - trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt()); + ReadingState::FetchEvents(st2) => { + let _ = st2.a.fut.poll_unpin(cx); + if let Some(st3) = st2.b.as_mut() { + let _ = st3.fut.poll_unpin(cx); + } + match st2.a.fut.take_if_ready() { + Ready(Some(x)) => { + if let Some(b) = st2.b.take() { + st2.a = b; + } + match x { + Ok((evs, mut jobtrace)) => { + jobtrace.add_event_now(crate::events::ReadEventKind::EventsStreamRtSees( + evs.len() as u32, + )); + use items_2::merger::Mergeable; + trace!("ReadingFwd {jobtrace}"); + for ts in Mergeable::tss(&evs) { + trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt()); + } + self.out.push_back(evs); + self.setup_fwd_read(); + continue; + } + Err(e) => { + self.state = State::Done; + Ready(Some(Err(e.into()))) + } + } } - self.out.push_back(x); - self.setup_fwd_read(); - continue; + Ready(None) => { + self.state = State::Done; + Ready(Some(Err(Error::Logic))) + } + Pending => Pending, } - Ready(Err(e)) => { - self.state = State::Done; - Ready(Some(Err(e.into()))) - } - Pending => Pending, - }, + } }, State::InputDone => { if self.out.len() == 0 { diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs index 83c242d..bf518fd 100644 --- a/crates/scyllaconn/src/worker.rs +++ b/crates/scyllaconn/src/worker.rs @@ -1,4 +1,5 @@ use crate::conn::create_scy_session_no_ks; +use crate::events::ReadJobTrace; use crate::events2::prepare::StmtsCache; use crate::events2::prepare::StmtsEvents; use crate::range::ScyllaSeriesRange; @@ -74,11 +75,12 @@ struct ReadNextValues { dyn FnOnce( Arc, Arc, - ) -> Pin, Error>> + Send>> + ReadJobTrace, + ) -> Pin, ReadJobTrace), Error>> + Send>> + Send, >, - // fut: Pin, Error>> + Send>>, - tx: Sender, Error>>, + tx: Sender, ReadJobTrace), Error>>, + jobtrace: ReadJobTrace, } impl fmt::Debug for ReadNextValues { @@ -107,12 +109,17 @@ impl ScyllaQueue { Ok(res) } - pub async fn read_next_values(&self, futgen: F) -> Result, Error> + pub async fn read_next_values( + &self, + futgen: F, + jobtrace: ReadJobTrace, + ) -> Result<(Box, ReadJobTrace), Error> where F: FnOnce( Arc, Arc, - ) -> Pin, Error>> + Send>> + ReadJobTrace, + ) -> Pin, ReadJobTrace), Error>> + Send>> + Send + 'static, { @@ -120,6 +127,7 @@ impl ScyllaQueue { let job = Job::ReadNextValues(ReadNextValues { futgen: Box::new(futgen), tx, + jobtrace, }); self.tx.send(job).await.map_err(|_| Error::ChannelSend)?; let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??; @@ -243,7 +251,7 @@ impl ScyllaWorker { } } Job::ReadNextValues(job) => { - let fut = (job.futgen)(scy.clone(), stmts.clone()); + let fut = (job.futgen)(scy.clone(), stmts.clone(), job.jobtrace); let res = fut.await; if job.tx.send(res.map_err(Into::into)).await.is_err() { // TODO count for stats diff --git a/crates/streams/src/timebin/fromevents.rs b/crates/streams/src/timebin/fromevents.rs index 3b805c7..316e7cf 100644 --- a/crates/streams/src/timebin/fromevents.rs +++ 
b/crates/streams/src/timebin/fromevents.rs @@ -23,7 +23,10 @@ macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } #[derive(Debug, ThisError)] #[cstm(name = "ReadingBinnedFromEvents")] -pub enum Error {} +pub enum Error { + ExpectTimerange, + ExpectTimeweighted, +} pub struct BinnedFromEvents { stream: Pin> + Send>>, @@ -37,7 +40,7 @@ impl BinnedFromEvents { read_provider: Arc, ) -> Result { if !evq.range().is_time() { - panic!(); + return Err(Error::ExpectTimerange); } let stream = read_provider.read(evq); let stream = ConvertForBinning::new(Box::pin(stream)); @@ -45,17 +48,17 @@ impl BinnedFromEvents { let stream = Box::pin(stream); BinnedEventsTimeweightStream::new(range, stream) } else { - panic!("non-weighted TODO") + return Err(Error::ExpectTimeweighted); }; let stream = stream.map(|item| match item { Ok(x) => match x { StreamItem::DataItem(x) => match x { RangeCompletableItem::Data(x) => { - debug!("see item {:?}", x); + trace_emit!("see item {:?}", x); Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) } RangeCompletableItem::RangeComplete => { - info!("BinnedFromEvents sees range final"); + debug!("BinnedFromEvents sees range final"); Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) } },
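// Illustrative sketch, standalone and not part of the patch, using only the futures
// crate; Session, JobTrace and the u64 payload are stand-ins, not the real scylla or
// codebase types: the worker.rs change above threads the ReadJobTrace through the
// futgen closure that ScyllaQueue hands to its worker. The general job shape is a boxed
// FnOnce that builds the read future from shared resources, with the trace moved in and
// handed back alongside the result.

use futures::channel::{mpsc, oneshot};
use futures::executor::block_on;
use futures::future::BoxFuture;
use futures::{join, FutureExt, SinkExt, StreamExt};
use std::sync::Arc;

struct Session; // stands in for the shared Arc<Session> owned by the worker
struct JobTrace(Vec<&'static str>);

type FutGen = Box<dyn FnOnce(Arc<Session>, JobTrace) -> BoxFuture<'static, (u64, JobTrace)> + Send>;

struct Job {
    futgen: FutGen,
    jobtrace: JobTrace,
    tx: oneshot::Sender<(u64, JobTrace)>,
}

async fn worker(mut rx: mpsc::Receiver<Job>, scy: Arc<Session>) {
    while let Some(job) = rx.next().await {
        // The worker owns the shared resources; the closure builds the actual read future.
        let fut = (job.futgen)(scy.clone(), job.jobtrace);
        let res = fut.await;
        let _ = job.tx.send(res);
    }
}

fn main() {
    let (mut jobs_tx, jobs_rx) = mpsc::channel(4);
    let client = async move {
        let (tx, rx) = oneshot::channel();
        let futgen: FutGen = Box::new(|_scy, mut trace| {
            async move {
                trace.0.push("read issued");
                (123u64, trace)
            }
            .boxed()
        });
        let job = Job { futgen, jobtrace: JobTrace(vec!["created"]), tx };
        jobs_tx.send(job).await.unwrap();
        drop(jobs_tx); // closing the queue lets the worker loop end
        let (value, trace) = rx.await.unwrap();
        println!("value {} trace {:?}", value, trace.0);
    };
    block_on(async {
        join!(worker(jobs_rx, Arc::new(Session)), client);
    });
}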
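// Illustrative sketch, standalone and not part of the patch, with placeholder types:
// fromevents.rs above now returns Error::ExpectTimerange / Error::ExpectTimeweighted
// from the constructor instead of panicking. The constructor shape in isolation.

#[derive(Debug)]
enum Error {
    ExpectTimerange,
    ExpectTimeweighted,
}

struct Query {
    is_time: bool,
    time_weighted: bool,
}

struct BinnedFromEvents;

impl BinnedFromEvents {
    fn new(q: &Query) -> Result<Self, Error> {
        // Reject unsupported queries up front instead of panicking mid-setup.
        if !q.is_time {
            return Err(Error::ExpectTimerange);
        }
        if !q.time_weighted {
            return Err(Error::ExpectTimeweighted);
        }
        Ok(BinnedFromEvents)
    }
}

fn main() {
    let q = Query { is_time: false, time_weighted: true };
    assert!(matches!(BinnedFromEvents::new(&q), Err(Error::ExpectTimerange)));
}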