From cde2e4c1a25cf830c37d2b72853743142867b5f8 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 30 Oct 2024 17:57:32 +0100 Subject: [PATCH] Improve event read --- crates/nodenet/src/scylla.rs | 2 +- crates/scyllaconn/src/events2/events.rs | 185 ++++++++++++++++-------- 2 files changed, 123 insertions(+), 64 deletions(-) diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index e2988fb..43bfd98 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -42,7 +42,7 @@ pub async fn scylla_channel_event_stream( evq.need_one_before_range(), evq.need_value_data(), evq.transform().enum_as_string().unwrap_or(false), - evq.settings().scylla_read_queue_len().unwrap_or(1), + evq.settings().scylla_read_queue_len(), ); let stream: Pin + Send>> = if let Some(rt) = evq.use_rt() { let x = scyllaconn::events2::events::EventsStreamRt::new( diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index a15367c..24691df 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -33,20 +33,17 @@ macro_rules! trace_fetch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } #[allow(unused)] macro_rules! trace_msp_fetch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +#[allow(unused)] +macro_rules! trace_redo_fwd_read { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } + #[allow(unused)] macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } #[allow(unused)] -macro_rules! warn_item { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) } +macro_rules! trace_every_event { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } #[allow(unused)] -macro_rules! trace_every_event { - ($($arg:tt)*) => { - if false { - trace!($($arg)*); - } - }; -} +macro_rules! warn_item { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) } #[derive(Debug, Clone)] pub struct EventReadOpts { @@ -57,12 +54,12 @@ pub struct EventReadOpts { } impl EventReadOpts { - pub fn new(one_before: bool, with_values: bool, enum_as_strings: bool, qucap: u32) -> Self { + pub fn new(one_before: bool, with_values: bool, enum_as_strings: bool, qucap: Option) -> Self { Self { one_before, with_values, enum_as_strings, - qucap, + qucap: qucap.unwrap_or(2), } } @@ -209,14 +206,39 @@ struct ReadingFwd { msp_done: bool, msp_fut: Option, qu: ReadQueue, + make_fut_info: MakeFutInfo, } impl ReadingFwd { - fn new(qucap: usize) -> Self { + fn new(k: &EventsStreamRt) -> Self { Self { msp_done: false, msp_fut: None, - qu: ReadQueue::new(qucap), + qu: ReadQueue::new(k.qucap), + make_fut_info: MakeFutInfo::new(k), + } + } +} + +#[derive(Clone)] +struct MakeFutInfo { + scyqueue: ScyllaQueue, + rt: RetentionTime, + series: SeriesId, + range: ScyllaSeriesRange, + readopts: EventReadOpts, + ch_conf: ChConf, +} + +impl MakeFutInfo { + fn new(k: &EventsStreamRt) -> Self { + Self { + scyqueue: k.scyqueue.clone(), + rt: k.rt.clone(), + series: k.series.clone(), + range: k.range.clone(), + readopts: k.readopts.clone(), + ch_conf: k.ch_conf.clone(), } } } @@ -308,25 +330,12 @@ impl EventsStreamRt { fn make_read_events_fut( ts_msp: TsMs, bck: bool, - rt: RetentionTime, - series: SeriesId, - range: ScyllaSeriesRange, - readopts: EventReadOpts, - ch_conf: ChConf, - scyqueue: ScyllaQueue, + mfi: MakeFutInfo, jobtrace: ReadJobTrace, ) -> Pin, ReadJobTrace), Error>> + Send>> { - let opts = ReadNextValuesOpts::new( - rt.clone(), - series.clone(), - ts_msp, - range.clone(), - !bck, - readopts.clone(), - scyqueue, - ); - let scalar_type = ch_conf.scalar_type().clone(); - let shape = ch_conf.shape().clone(); + let opts = ReadNextValuesOpts::new(mfi.rt, mfi.series, ts_msp, mfi.range, !bck, mfi.readopts, mfi.scyqueue); + let scalar_type = mfi.ch_conf.scalar_type().clone(); + let shape = mfi.ch_conf.shape().clone(); trace_fetch!( "make_read_events_fut bck {} msp {:?} {} {:?} {:?}", bck, @@ -411,13 +420,8 @@ impl EventsStreamRt { if let Some(ts) = self.msp_buf_bck.pop_back() { trace_fetch!("setup_bck_read {}", ts.fmt()); let jobtrace = ReadJobTrace::new(); - let scyqueue = self.scyqueue.clone(); - let rt = self.rt.clone(); - let series = self.series.clone(); - let range = self.range.clone(); - let readopts = self.readopts.clone(); - let ch_conf = self.ch_conf.clone(); - let fut = Self::make_read_events_fut(ts, true, rt, series, range, readopts, ch_conf, scyqueue, jobtrace); + let mfi = MakeFutInfo::new(self); + let fut = Self::make_read_events_fut(ts, true, mfi, jobtrace); let mut qu = ReadQueue::new(self.qucap); qu.push(fut); self.state = State::ReadingBck(ReadingBck { @@ -437,29 +441,27 @@ impl EventsStreamRt { } fn setup_fwd_read(&mut self) { - self.state = State::ReadingFwd(ReadingFwd::new(self.qucap)); + let selfname = "setup_fwd_read"; + trace_fetch!("{selfname}"); + self.state = State::ReadingFwd(ReadingFwd::new(self)); } - fn redo_fwd_read(&mut self) { - // trace_fetch!("setup_fwd_read {} {} BEFORE", self.msp_buf.len(), qu.len()); - // while qu.has_space() { - // if let Some(ts) = self.msp_buf.pop_front() { - // trace_fetch!("setup_fwd_read {} FILL A SLOT", ts.fmt()); - // let jobtrace = ReadJobTrace::new(); - // let scyqueue = self.scyqueue.clone(); - // let rt = self.rt.clone(); - // let series = self.series.clone(); - // let range = self.range.clone(); - // let readopts = self.readopts.clone(); - // let ch_conf = self.ch_conf.clone(); - // let fut = - // Self::make_read_events_fut(ts, false, rt, series, range, readopts, ch_conf, scyqueue, jobtrace); - // qu.push(fut); - // } else { - // break; - // } - // } - // trace_fetch!("setup_fwd_read {} {} AFTER", self.msp_buf.len(), qu.len()); + fn redo_fwd_read(st: &mut ReadingFwd, msp_buf: &mut VecDeque) { + let selfname = "redo_fwd_read"; + let qu = &mut st.qu; + trace_redo_fwd_read!("{selfname} {} {} BEFORE", msp_buf.len(), qu.len()); + while qu.has_space() { + if let Some(ts) = msp_buf.pop_front() { + trace_fetch!("{selfname} {} FILL A SLOT", ts.fmt()); + let jobtrace = ReadJobTrace::new(); + let mfi = st.make_fut_info.clone(); + let fut = Self::make_read_events_fut(ts, false, mfi, jobtrace); + qu.push(fut); + } else { + break; + } + } + trace_redo_fwd_read!("{selfname} {} {} AFTER", msp_buf.len(), qu.len()); } } @@ -468,8 +470,12 @@ impl Stream for EventsStreamRt { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - // return Ready(Some(Err(Error::Logic))); + let mut i = 0; loop { + i += 1; + if i > 5000 { + panic!("too many iterations") + } if let Some(mut item) = self.out.pop_front() { if !item.verify() { warn_item!("{}bad item {:?}", "\n\n--------------------------\n", item); @@ -632,8 +638,23 @@ impl Stream for EventsStreamRt { let mut have_pending = false; if let Some(fut) = st.msp_fut.as_mut() { match fut.fut.poll_unpin(cx) { - Ready(_) => { + Ready(a) => { st.msp_fut = None; + match a { + Ok(a) => { + if a.len() == 0 { + trace_fetch!("MSP INPUT DONE --------------------"); + st.msp_done = true; + } + for x in a { + msp_buf.push_back(x); + } + } + Err(e) => { + self.state = State::Done; + return Ready(Some(Err(e.into()))); + } + } } Pending => { have_pending = true; @@ -642,10 +663,48 @@ impl Stream for EventsStreamRt { } else if st.msp_done == false && msp_buf.len() < 100 { trace_msp_fetch!("create msp read fut"); let fut = Self::make_msp_read_fut(&mut self2.msp_inp); - // let fut = err::todoval(); - st.msp_fut = Some(FetchMsp { fut: fut }); - }; - panic!() + st.msp_fut = Some(FetchMsp { fut }); + } else { + // trace_fetch!("nothing to do for msp fetch"); + } + if st.qu.has_space() { + Self::redo_fwd_read(st, msp_buf); + } + match st.qu.poll_next_unpin(cx) { + Ready(Some(x)) => match x { + Ok((evs, mut jobtrace)) => { + jobtrace + .add_event_now(crate::events::ReadEventKind::EventsStreamRtSees(evs.len() as u32)); + use items_2::merger::Mergeable; + trace_fetch!("ReadingFwd {jobtrace}"); + for ts in Mergeable::tss(&evs) { + trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt()); + } + self.out.push_back(evs); + continue; + } + Err(e) => { + self.state = State::Done; + return Ready(Some(Err(e.into()))); + } + }, + Ready(None) => {} + Pending => { + have_pending = true; + } + } + if have_pending { + Pending + } else { + if msp_buf.len() == 0 && st.msp_done && st.qu.len() == 0 { + self.state = State::InputDone; + continue; + } else if self.out.len() != 0 { + continue; + } else { + panic!("not pending, nothing to output") + } + } } // State::ReadingFwd(st) => match &mut st.reading_state { // ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) {