From 068d6ab71f42260c98cd25e073a7ce6637129206 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 12 Nov 2024 13:40:01 +0100 Subject: [PATCH] Retrieve from all RT streams merged --- crates/nodenet/src/conn.rs | 14 +- crates/nodenet/src/scylla.rs | 6 +- crates/scyllaconn/src/bincache.rs | 2 +- crates/scyllaconn/src/events2.rs | 3 +- crates/scyllaconn/src/events2/firstbefore.rs | 240 ------- crates/scyllaconn/src/events2/mergert.rs | 635 +++--------------- .../scyllaconn/src/events2/mergertchained.rs | 613 +++++++++++++++++ .../src/events2/onebeforeandbulk.rs | 315 +++++++++ 8 files changed, 1020 insertions(+), 808 deletions(-) delete mode 100644 crates/scyllaconn/src/events2/firstbefore.rs create mode 100644 crates/scyllaconn/src/events2/mergertchained.rs create mode 100644 crates/scyllaconn/src/events2/onebeforeandbulk.rs diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index be22ad4..c0d8299 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -115,17 +115,6 @@ async fn make_channel_events_stream_data( } } -async fn make_channel_events_stream( - subq: EventsSubQuery, - reqctx: ReqCtxArc, - scyqueue: Option<&ScyllaQueue>, - ncc: &NodeConfigCached, -) -> Result> + Send>>, Error> { - let stream = make_channel_events_stream_data(subq, reqctx, scyqueue, ncc).await?; - let ret = Box::pin(stream); - Ok(ret) -} - pub async fn create_response_bytes_stream( evq: EventsSubQuery, scyqueue: Option<&ScyllaQueue>, @@ -151,7 +140,7 @@ pub async fn create_response_bytes_stream( Ok(ret) } else { let mut tr = build_event_transform(evq.transform())?; - let stream = make_channel_events_stream(evq, reqctx, scyqueue, ncc).await?; + let stream = make_channel_events_stream_data(evq, reqctx, scyqueue, ncc).await?; let stream = stream.map(move |x| { on_sitemty_data!(x, |x: ChannelEvents| { match x { @@ -167,7 +156,6 @@ pub async fn create_response_bytes_stream( } }) }); - // let stream = stream.map(move |x| Box::new(x) as Box); let stream = stream.map(|x| { x.make_frame_dyn() .map(bytes::BytesMut::freeze) diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index 6a72588..3873fb9 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -45,7 +45,7 @@ pub async fn scylla_channel_event_stream( evq.settings().scylla_read_queue_len(), ); let stream: Pin + Send>> = if let Some(rt) = evq.use_rt() { - info!("========= SOLO {rt:?} ====================="); + trace!("========= SOLO {rt:?} ====================="); let x = scyllaconn::events2::events::EventsStreamRt::new( rt, chconf.clone(), @@ -53,10 +53,10 @@ pub async fn scylla_channel_event_stream( readopts, scyqueue.clone(), ) - .map_err(|e| scyllaconn::events2::mergert::Error::from(e)); + .map_err(|e| scyllaconn::events2::mergert::Error::Msg(e.to_string())); Box::pin(x) } else { - info!("========= MERGED ====================="); + trace!("========= MERGED ====================="); let x = scyllaconn::events2::mergert::MergeRts::new(chconf.clone(), evq.range().into(), readopts, scyqueue.clone()); Box::pin(x) diff --git a/crates/scyllaconn/src/bincache.rs b/crates/scyllaconn/src/bincache.rs index 729b49e..c5d7b12 100644 --- a/crates/scyllaconn/src/bincache.rs +++ b/crates/scyllaconn/src/bincache.rs @@ -46,7 +46,7 @@ pub async fn worker_write( stmts_cache: &StmtsCache, scy: &ScySession, ) -> Result<(), streams::timebin::cached::reader::Error> { - for ((((((&ts1, &ts2), &cnt), &min), &max), &avg), &lst) in bins.zip_iter() { + for (((((((&ts1, &ts2), &cnt), &min), &max), &avg), &lst), &fnl) in bins.zip_iter() { let bin_len = DtMs::from_ms_u64((ts2.ns() - ts1.ns()) / 1000000); let div = streams::timebin::cached::reader::part_len(bin_len).ns(); let msp = ts1.ns() / div; diff --git a/crates/scyllaconn/src/events2.rs b/crates/scyllaconn/src/events2.rs index ea58344..6b6effd 100644 --- a/crates/scyllaconn/src/events2.rs +++ b/crates/scyllaconn/src/events2.rs @@ -1,6 +1,7 @@ pub mod events; -pub mod firstbefore; pub mod mergert; +pub mod mergertchained; pub mod msp; pub mod nonempty; +pub mod onebeforeandbulk; pub mod prepare; diff --git a/crates/scyllaconn/src/events2/firstbefore.rs b/crates/scyllaconn/src/events2/firstbefore.rs deleted file mode 100644 index 7589a71..0000000 --- a/crates/scyllaconn/src/events2/firstbefore.rs +++ /dev/null @@ -1,240 +0,0 @@ -use daqbuf_err as err; -use err::thiserror; -use err::ThisError; -use futures_util::Stream; -use futures_util::StreamExt; -use items_0::Events; -use items_2::merger::Mergeable; -use netpod::log::*; -use netpod::stream_impl_tracer::StreamImplTracer; -use netpod::TsNano; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; - -#[allow(unused)] -macro_rules! trace_transition { - ($($arg:tt)*) => { - if true { - trace!($($arg)*); - } - }; -} - -#[allow(unused)] -macro_rules! trace_emit { - ($($arg:tt)*) => { - if true { - trace!($($arg)*); - } - }; -} - -macro_rules! tracer_poll_enter { - ($self:expr) => { - if false && $self.tracer.poll_enter() { - return Ready(Some(Err(Error::LimitPoll))); - } - }; -} - -macro_rules! tracer_loop_enter { - ($self:expr) => { - if false && $self.tracer.loop_enter() { - return Ready(Some(Err(Error::LimitLoop))); - } - }; -} - -#[derive(Debug, ThisError)] -#[cstm(name = "EventsFirstBefore")] -pub enum Error { - Unordered, - Logic, - Input(Box), - LimitPoll, - LimitLoop, -} - -pub enum Output { - First(T, T), - Bulk(T), -} - -enum State { - Begin, - Bulk, - Done, -} - -pub struct FirstBeforeAndInside -where - S: Stream + Unpin, - T: Events + Mergeable + Unpin, -{ - ts0: TsNano, - inp: S, - state: State, - buf: Option, - tracer: StreamImplTracer, -} - -impl FirstBeforeAndInside -where - S: Stream + Unpin, - T: Events + Mergeable + Unpin, -{ - pub fn new(inp: S, ts0: TsNano) -> Self { - trace_transition!("FirstBeforeAndInside::new"); - Self { - ts0, - inp, - state: State::Begin, - buf: None, - tracer: StreamImplTracer::new("FirstBeforeAndInside".into(), 2000, 100), - } - } -} - -impl Stream for FirstBeforeAndInside -where - S: Stream> + Unpin, - T: Events + Mergeable + Unpin, - E: std::error::Error + Send + 'static, -{ - type Item = Result, Error>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - tracer_poll_enter!(self); - loop { - tracer_loop_enter!(self); - break match &self.state { - State::Begin => match self.inp.poll_next_unpin(cx) { - Ready(Some(Ok(mut item))) => { - // It is an invariant that we process ordered streams, but for robustness - // verify the batch item again: - if item.verify() != true { - self.state = State::Done; - let e = Error::Unordered; - Ready(Some(Err(e))) - } else { - // Separate events into before and bulk - let tss = Events::tss(&item); - let pp = tss.partition_point(|&x| x < self.ts0.ns()); - trace_transition!("partition_point {pp:?} {n:?}", n = tss.len()); - if pp > item.len() { - error!("bad partition point {} {}", pp, item.len()); - self.state = State::Done; - Ready(Some(Err(Error::Logic))) - } else if pp == item.len() { - // all entries are before, or empty item - if self.buf.is_none() { - self.buf = Some(item.new_empty()); - } - match item.drain_into_evs(self.buf.as_mut().unwrap(), (0, item.len())) { - Ok(()) => { - continue; - } - Err(e) => { - self.state = State::Done; - Ready(Some(Err(Error::Input(Box::new(e))))) - } - } - } else if pp == 0 { - // all entries are bulk - trace_transition!("transition immediately to bulk"); - self.state = State::Bulk; - let o1 = core::mem::replace(&mut self.buf, Some(item.new_empty())) - .unwrap_or_else(|| item.new_empty()); - Ready(Some(Ok(Output::First(o1, item)))) - } else { - // mixed - if self.buf.is_none() { - self.buf = Some(item.new_empty()); - } - match item.drain_into_evs(self.buf.as_mut().unwrap(), (0, pp)) { - Ok(()) => { - trace_transition!("transition with mixed to bulk"); - self.state = State::Bulk; - let o1 = core::mem::replace(&mut self.buf, Some(item.new_empty())) - .unwrap_or_else(|| item.new_empty()); - Ready(Some(Ok(Output::First(o1, item)))) - } - Err(e) => { - self.state = State::Done; - Ready(Some(Err(Error::Input(Box::new(e))))) - } - } - } - } - } - Ready(Some(Err(e))) => { - self.state = State::Done; - Ready(Some(Err(Error::Input(Box::new(e))))) - } - Ready(None) => { - self.state = State::Done; - if let Some(x) = self.buf.take() { - let empty = x.new_empty(); - Ready(Some(Ok(Output::First(x, empty)))) - } else { - Ready(None) - } - } - Pending => Pending, - }, - State::Bulk => { - if self.buf.as_ref().map_or(0, |x| x.len()) != 0 { - error!( - "State::Bulk but buf non-empty {}", - self.buf.as_ref().map_or(0, |x| x.len()) - ); - self.state = State::Done; - Ready(Some(Err(Error::Logic))) - } else { - match self.inp.poll_next_unpin(cx) { - Ready(Some(Ok(item))) => { - if item.verify() != true { - self.state = State::Done; - let e = Error::Unordered; - Ready(Some(Err(e))) - } else { - trace_emit!("output bulk item len {}", item.len()); - Ready(Some(Ok(Output::Bulk(item)))) - } - } - Ready(Some(Err(e))) => { - self.state = State::Done; - Ready(Some(Err(Error::Input(Box::new(e))))) - } - Ready(None) => { - trace_emit!("in bulk, input done"); - self.state = State::Done; - Ready(None) - } - Pending => Pending, - } - } - } - State::Done => Ready(None), - }; - } - } -} - -fn trait_assert(_: T) -where - T: Stream + Unpin + Send, -{ -} - -#[allow(unused)] -fn trait_assert_try() { - let x: FirstBeforeAndInside = phantomval(); - trait_assert(x); -} - -fn phantomval() -> T { - panic!() -} diff --git a/crates/scyllaconn/src/events2/mergert.rs b/crates/scyllaconn/src/events2/mergert.rs index ffb8fcb..fd83990 100644 --- a/crates/scyllaconn/src/events2/mergert.rs +++ b/crates/scyllaconn/src/events2/mergert.rs @@ -1,318 +1,114 @@ use super::events::EventReadOpts; use super::events::EventsStreamRt; -use super::firstbefore::FirstBeforeAndInside; -use crate::events2::firstbefore; +use crate::events2::onebeforeandbulk::OneBeforeAndBulk; use crate::range::ScyllaSeriesRange; use crate::worker::ScyllaQueue; use daqbuf_err as err; use err::thiserror; use err::ThisError; -use futures_util::Future; -use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; -use items_0::WithLen; +use items_0::streamitem::sitem_err2_from_string; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::SitemErrTy; +use items_0::streamitem::StreamItem; use items_2::channelevents::ChannelEvents; -use items_2::merger::Mergeable; +use items_2::merger::Merger; use netpod::log::*; -use netpod::range::evrange::NanoRange; -use netpod::range::evrange::SeriesRange; -use netpod::stream_impl_tracer::StreamImplTracer; use netpod::ttl::RetentionTime; use netpod::ChConf; -use std::collections::VecDeque; use std::pin::Pin; use std::task::Context; use std::task::Poll; -#[allow(unused)] -macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } - -#[allow(unused)] -macro_rules! trace_fetch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } - -#[allow(unused)] -macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } - -macro_rules! tracer_poll_enter { - ($self:expr) => { - if false && $self.tracer.poll_enter() { - return Ready(Some(Err(Error::LimitPoll))); - } - }; -} - -macro_rules! tracer_loop_enter { - ($self:expr) => { - if false && $self.tracer.loop_enter() { - return Ready(Some(Err(Error::LimitLoop))); - } - }; -} +macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } #[derive(Debug, ThisError)] #[cstm(name = "EventsMergeRt")] pub enum Error { - Input(#[from] crate::events2::firstbefore::Error), - Events(#[from] crate::events2::events::Error), - Logic, - OrderMin, - OrderMax, - LimitPoll, - LimitLoop, -} - -#[allow(unused)] -enum Resolvable -where - F: Future, -{ - Future(F), - Output(::Output), - Taken, -} - -#[allow(unused)] -impl Resolvable -where - F: Future, -{ - fn unresolved(&self) -> bool { - match self { - Resolvable::Future(_) => true, - Resolvable::Output(_) => false, - Resolvable::Taken => false, - } - } - - fn take(&mut self) -> Option<::Output> { - let x = std::mem::replace(self, Resolvable::Taken); - match x { - Resolvable::Future(_) => None, - Resolvable::Output(x) => Some(x), - Resolvable::Taken => None, - } - } -} - -impl Future for Resolvable -where - F: Future + Unpin, -{ - type Output = ::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<::Output> { - match unsafe { self.get_unchecked_mut() } { - Resolvable::Future(fut) => fut.poll_unpin(cx), - Resolvable::Output(_) => panic!(), - Resolvable::Taken => panic!(), - } - } -} - -type TI = FirstBeforeAndInside; -type INPI = Result, crate::events2::firstbefore::Error>; - -struct ReadEvents { - fut: Pin> + Send>>, -} - -enum State { - Begin, - FetchFirstSt(ReadEvents), - FetchFirstMt(ReadEvents), - FetchFirstLt(ReadEvents), - ReadingLt(Option, VecDeque, Option>), - ReadingMt(Option, VecDeque, Option>), - ReadingSt(Option, VecDeque, Option>), - Done, + Msg(String), } pub struct MergeRts { - ch_conf: ChConf, - range: ScyllaSeriesRange, - range_mt: ScyllaSeriesRange, - range_lt: ScyllaSeriesRange, - readopts: EventReadOpts, - scyqueue: ScyllaQueue, - inp_st: Option>, - inp_mt: Option>, - inp_lt: Option>, - state: State, - buf_st: VecDeque, - buf_mt: VecDeque, - buf_lt: VecDeque, - out: VecDeque, - buf_before: Option, - ts_seen_max: u64, - tracer: StreamImplTracer, + inp: Pin> + Send>>, } impl MergeRts { pub fn new(ch_conf: ChConf, range: ScyllaSeriesRange, readopts: EventReadOpts, scyqueue: ScyllaQueue) -> Self { trace_init!("MergeRts readopts {readopts:?}"); - Self { - ch_conf, - range_mt: range.clone(), - range_lt: range.clone(), - range, - readopts, - scyqueue, - inp_st: None, - inp_mt: None, - inp_lt: None, - state: State::Begin, - buf_st: VecDeque::new(), - buf_mt: VecDeque::new(), - buf_lt: VecDeque::new(), - out: VecDeque::new(), - buf_before: None, - ts_seen_max: 0, - tracer: StreamImplTracer::new("MergeRts".into(), 2000, 2000), - } - } - - fn setup_first_st(&mut self) { - let rt = RetentionTime::Short; - let limbuf = &VecDeque::new(); - let inpdst = &mut self.inp_st; - let range = Self::constrained_range(&self.range, limbuf); - trace_fetch!("setup_first_st constrained beg {}", range.beg().ns()); - let tsbeg = range.beg(); - let inp = EventsStreamRt::new( - rt, - self.ch_conf.clone(), - range, - self.readopts.clone(), - self.scyqueue.clone(), - ); - let inp = TI::new(inp, tsbeg); - *inpdst = Some(Box::new(inp)); - } - - fn setup_first_mt(&mut self) { - let rt = RetentionTime::Medium; - let limbuf = &self.buf_st; - let inpdst = &mut self.inp_mt; - let range = Self::constrained_range(&self.range_mt, limbuf); - self.range_lt = range.clone(); - trace_fetch!("setup_first_mt constrained beg {}", range.beg().ns()); - let tsbeg = range.beg(); - let inp = EventsStreamRt::new( - rt, - self.ch_conf.clone(), - range, - self.readopts.clone(), - self.scyqueue.clone(), - ); - let inp = TI::new(inp, tsbeg); - *inpdst = Some(Box::new(inp)); - } - - fn setup_first_lt(&mut self) { - let rt = RetentionTime::Long; - let limbuf = &self.buf_mt; - let inpdst = &mut self.inp_lt; - let range = Self::constrained_range(&self.range_lt, limbuf); - trace_fetch!("setup_first_lt constrained beg {}", range.beg().ns()); - let tsbeg = range.beg(); - let inp = EventsStreamRt::new( - rt, - self.ch_conf.clone(), - range, - self.readopts.clone(), - self.scyqueue.clone(), - ); - let inp = TI::new(inp, tsbeg); - *inpdst = Some(Box::new(inp)); - } - - fn setup_read_st(&mut self) -> ReadEvents { - trace_fetch!("setup_read_st"); - Self::setup_read_any(&mut self.inp_st) - } - - fn setup_read_mt(&mut self) -> ReadEvents { - trace_fetch!("setup_read_mt"); - Self::setup_read_any(&mut self.inp_mt) - } - - fn setup_read_lt(&mut self) -> ReadEvents { - trace_fetch!("setup_read_lt"); - Self::setup_read_any(&mut self.inp_lt) - } - - fn setup_read_any(inp: &mut Option>) -> ReadEvents { - trace_fetch!("setup_read_any"); - let stream = unsafe { &mut *(inp.as_mut().unwrap().as_mut() as *mut TI) }; - let fut = Box::pin(stream.next()); - ReadEvents { fut } - } - - fn constrained_range(full: &ScyllaSeriesRange, buf: &VecDeque) -> ScyllaSeriesRange { - trace_fetch!("constrained_range {:?} {:?}", full, buf.front()); - if let Some(e) = buf.front() { - if let Some(ts) = e.ts_min() { - let nrange = NanoRange::from((full.beg().ns(), ts)); - ScyllaSeriesRange::from(&SeriesRange::from(nrange)) - } else { - debug!("constrained_range no ts even though should not have empty buffers"); - full.clone() + let inp_st = EventsStreamRt::new( + RetentionTime::Short, + ch_conf.clone(), + range.clone(), + readopts.clone(), + scyqueue.clone(), + ) + .map(|x| { + use RangeCompletableItem::*; + use StreamItem::*; + match x { + Ok(x) => Ok(DataItem(Data(x))), + Err(e) => Err(daqbuf_err::Error::from_string(e)), } - } else { - full.clone() - } - } - - fn handle_first_st(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) { - trace_fetch!("handle_first_st"); - Self::move_latest_to_before_buf(&mut before, &mut self.buf_before); - self.buf_st.push_back(bulk); - self.setup_first_mt(); - self.state = State::FetchFirstMt(self.setup_read_mt()); - } - - fn handle_first_mt(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) { - trace_fetch!("handle_first_mt"); - Self::move_latest_to_before_buf(&mut before, &mut self.buf_before); - self.buf_mt.push_back(bulk); - self.setup_first_lt(); - self.state = State::FetchFirstLt(self.setup_read_lt()); - } - - fn handle_first_lt(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) { - trace_fetch!("handle_first_lt"); - Self::move_latest_to_before_buf(&mut before, &mut self.buf_before); - self.buf_lt.push_back(bulk); - self.push_out_one_before(); - let buf = core::mem::replace(&mut self.buf_lt, VecDeque::new()); - self.state = State::ReadingLt(None, buf, self.inp_lt.take()); - } - - fn move_latest_to_before_buf(before: &mut ChannelEvents, buf: &mut Option) { - trace_fetch!("move_latest_to_before_buf"); - if buf.is_none() { - *buf = Some(before.new_empty()); - } - let buf = buf.as_mut().unwrap(); - if let Some(tsn) = before.ts_max() { - if buf.ts_max().map_or(true, |x| tsn > x) { - let n = before.len(); - buf.clear(); - before.drain_into(buf, (n - 1, n)).unwrap(); + }); + let inp_mt = EventsStreamRt::new( + RetentionTime::Medium, + ch_conf.clone(), + range.clone(), + readopts.clone(), + scyqueue.clone(), + ) + .map(|x| { + use RangeCompletableItem::*; + use StreamItem::*; + match x { + Ok(x) => Ok(DataItem(Data(x))), + Err(e) => Err(daqbuf_err::Error::from_string(e)), } - } - } - - fn push_out_one_before(&mut self) { - if let Some(buf) = self.buf_before.take() { - trace_fetch!("push_out_one_before len {len:?}", len = buf.len()); - if buf.len() != 0 { - self.out.push_back(buf); + }); + let inp_lt = EventsStreamRt::new( + RetentionTime::Long, + ch_conf.clone(), + range.clone(), + readopts.clone(), + scyqueue.clone(), + ) + .map(|x| { + use RangeCompletableItem::*; + use StreamItem::*; + match x { + Ok(x) => Ok(DataItem(Data(x))), + Err(e) => Err(daqbuf_err::Error::from_string(e)), } - } else { - trace_fetch!("push_out_one_before no buffer"); - } + }); + let merger: Merger = + Merger::new(vec![Box::pin(inp_st), Box::pin(inp_mt), Box::pin(inp_lt)], None); + let stream = merger.filter_map(|x| { + // TODO all stream adapters must support Sitemty, otherwise range-final item gets dropped. + use RangeCompletableItem::*; + use StreamItem::*; + let x = match x { + Ok(x) => match x { + DataItem(x) => match x { + Data(x) => Some(Ok(x)), + _ => None, + }, + _ => None, + }, + Err(e) => Some(Err(e)), + }; + futures_util::future::ready(x) + }); + let stream = OneBeforeAndBulk::<_, ChannelEvents>::new(stream, range.beg(), "after-rt-merged".into()); + let stream = stream.map(|x| match x { + Ok(x) => match x { + crate::events2::onebeforeandbulk::Output::Before(x) => Ok(x), + crate::events2::onebeforeandbulk::Output::Bulk(x) => Ok(x), + }, + Err(e) => Err(sitem_err2_from_string(e)), + }); + let inp = Box::pin(stream); + Self { inp } } } @@ -321,276 +117,15 @@ impl Stream for MergeRts { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - tracer_poll_enter!(self); - let mut out2 = VecDeque::new(); loop { - tracer_loop_enter!(self); - while let Some(x) = out2.pop_front() { - self.out.push_back(x); - } - if let Some(item) = self.out.pop_front() { - trace_emit!("emit item {} {:?}", items_0::Events::verify(&item), item); - if items_0::Events::verify(&item) != true { - debug!("{}bad item {:?}", "\n\n--------------------------\n", item); - self.state = State::Done; - } - if let Some(item_min) = item.ts_min() { - if item_min < self.ts_seen_max { - debug!( - "{}ordering error A {} {}", - "\n\n--------------------------\n", item_min, self.ts_seen_max - ); - self.state = State::Done; - break Ready(Some(Err(Error::OrderMin))); - } - } - if let Some(item_max) = item.ts_max() { - if item_max < self.ts_seen_max { - debug!( - "{}ordering error B {} {}", - "\n\n--------------------------\n", item_max, self.ts_seen_max - ); - self.state = State::Done; - break Ready(Some(Err(Error::OrderMax))); - } else { - self.ts_seen_max = item_max; - } - } - if let Some(ix) = item.find_highest_index_lt(self.range.beg().ns()) { - trace_fetch!("see item before range ix {ix}"); - } - break Ready(Some(Ok(item))); - } - break match &mut self.state { - State::Begin => { - self.setup_first_st(); - self.state = State::FetchFirstSt(self.setup_read_st()); - continue; - } - State::FetchFirstSt(st2) => match st2.fut.poll_unpin(cx) { - Ready(Some(Ok(x))) => match x { - firstbefore::Output::First(before, bulk) => { - trace_fetch!("have first from ST"); - self.handle_first_st(before, bulk); - continue; - } - firstbefore::Output::Bulk(_) => { - self.state = State::Done; - let e = Error::Logic; - Ready(Some(Err(e))) - } - }, - Ready(Some(Err(e))) => { - self.state = State::Done; - Ready(Some(Err(e.into()))) - } - Ready(None) => { - trace_fetch!("no first from ST"); - self.inp_st = None; - self.setup_first_mt(); - self.state = State::FetchFirstMt(self.setup_read_mt()); - continue; - } - Pending => Pending, + break match self.inp.poll_next_unpin(cx) { + Ready(Some(x)) => match x { + Ok(x) => Ready(Some(Ok(x))), + Err(e) => Ready(Some(Err(Error::Msg(e.to_string())))), }, - State::FetchFirstMt(st2) => match st2.fut.poll_unpin(cx) { - Ready(Some(Ok(x))) => match x { - firstbefore::Output::First(before, bulk) => { - trace_fetch!("have first from MT"); - self.handle_first_mt(before, bulk); - continue; - } - firstbefore::Output::Bulk(_) => { - self.state = State::Done; - let e = Error::Logic; - Ready(Some(Err(e))) - } - }, - Ready(Some(Err(e))) => { - self.state = State::Done; - Ready(Some(Err(e.into()))) - } - Ready(None) => { - trace_fetch!("no first from MT"); - self.inp_mt = None; - self.setup_first_lt(); - self.state = State::FetchFirstLt(self.setup_read_lt()); - continue; - } - Pending => Pending, - }, - State::FetchFirstLt(st2) => match st2.fut.poll_unpin(cx) { - Ready(Some(Ok(x))) => match x { - firstbefore::Output::First(before, bulk) => { - trace_fetch!("have first from LT"); - self.handle_first_lt(before, bulk); - continue; - } - firstbefore::Output::Bulk(_) => { - self.state = State::Done; - let e = Error::Logic; - Ready(Some(Err(e))) - } - }, - Ready(Some(Err(e))) => { - self.state = State::Done; - Ready(Some(Err(e.into()))) - } - Ready(None) => { - trace_fetch!("no first from LT"); - self.inp_lt = None; - self.push_out_one_before(); - let buf = core::mem::replace(&mut self.buf_lt, VecDeque::new()); - self.state = State::ReadingLt(None, buf, self.inp_lt.take()); - continue; - } - Pending => Pending, - }, - State::ReadingLt(fut, buf, inp) => { - if let Some(x) = buf.pop_front() { - out2.push_back(x); - continue; - } else if let Some(fut2) = fut { - match fut2.fut.poll_unpin(cx) { - Ready(Some(Ok(x))) => { - *fut = None; - match x { - firstbefore::Output::Bulk(x) => { - buf.push_back(x); - continue; - } - firstbefore::Output::First(_, _) => { - self.state = State::Done; - let e = Error::Logic; - Ready(Some(Err(e))) - } - } - } - Ready(Some(Err(e))) => { - *fut = None; - self.state = State::Done; - Ready(Some(Err(e.into()))) - } - Ready(None) => { - *fut = None; - *inp = None; - continue; - } - Pending => Pending, - } - } else if inp.is_some() { - let buf = core::mem::replace(buf, VecDeque::new()); - self.state = State::ReadingLt(Some(Self::setup_read_any(inp)), buf, inp.take()); - continue; - } else { - trace_emit!("transition ReadingLt to ReadingMt"); - let buf = core::mem::replace(&mut self.buf_mt, VecDeque::new()); - self.state = State::ReadingMt(None, buf, self.inp_mt.take()); - continue; - } - } - State::ReadingMt(fut, buf, inp) => { - if let Some(x) = buf.pop_front() { - out2.push_back(x); - continue; - } else if let Some(fut2) = fut { - match fut2.fut.poll_unpin(cx) { - Ready(Some(Ok(x))) => { - *fut = None; - match x { - firstbefore::Output::Bulk(x) => { - buf.push_back(x); - continue; - } - firstbefore::Output::First(_, _) => { - self.state = State::Done; - let e = Error::Logic; - Ready(Some(Err(e))) - } - } - } - Ready(Some(Err(e))) => { - *fut = None; - self.state = State::Done; - Ready(Some(Err(e.into()))) - } - Ready(None) => { - *fut = None; - *inp = None; - continue; - } - Pending => Pending, - } - } else if inp.is_some() { - let buf = core::mem::replace(buf, VecDeque::new()); - self.state = State::ReadingMt(Some(Self::setup_read_any(inp)), buf, inp.take()); - continue; - } else { - trace_emit!("transition ReadingMt to ReadingSt"); - let buf = core::mem::replace(&mut self.buf_st, VecDeque::new()); - self.state = State::ReadingSt(None, buf, self.inp_st.take()); - continue; - } - } - State::ReadingSt(fut, buf, inp) => { - if let Some(x) = buf.pop_front() { - out2.push_back(x); - continue; - } else if let Some(fut2) = fut { - match fut2.fut.poll_unpin(cx) { - Ready(Some(Ok(x))) => { - *fut = None; - match x { - firstbefore::Output::Bulk(x) => { - buf.push_back(x); - continue; - } - firstbefore::Output::First(_, _) => { - self.state = State::Done; - let e = Error::Logic; - Ready(Some(Err(e))) - } - } - } - Ready(Some(Err(e))) => { - *fut = None; - self.state = State::Done; - Ready(Some(Err(e.into()))) - } - Ready(None) => { - *fut = None; - *inp = None; - continue; - } - Pending => Pending, - } - } else if inp.is_some() { - let buf = core::mem::replace(buf, VecDeque::new()); - self.state = State::ReadingSt(Some(Self::setup_read_any(inp)), buf, inp.take()); - continue; - } else { - trace_emit!("fully done"); - Ready(None) - } - } - State::Done => Ready(None), + Ready(None) => Ready(None), + Pending => Pending, }; } } } - -fn trait_assert(_: T) -where - T: Stream + Unpin + Send, -{ -} - -#[allow(unused)] -fn trait_assert_try() { - let x: MergeRts = phantomval(); - trait_assert(x); -} - -fn phantomval() -> T { - panic!() -} diff --git a/crates/scyllaconn/src/events2/mergertchained.rs b/crates/scyllaconn/src/events2/mergertchained.rs new file mode 100644 index 0000000..49ae1a3 --- /dev/null +++ b/crates/scyllaconn/src/events2/mergertchained.rs @@ -0,0 +1,613 @@ +use super::events::EventReadOpts; +use super::events::EventsStreamRt; +use super::onebeforeandbulk::OneBeforeAndBulk; +use crate::events2::onebeforeandbulk; +use crate::range::ScyllaSeriesRange; +use crate::worker::ScyllaQueue; +use daqbuf_err as err; +use err::thiserror; +use err::ThisError; +use futures_util::Future; +use futures_util::FutureExt; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::WithLen; +use items_2::channelevents::ChannelEvents; +use items_2::merger::Mergeable; +use netpod::log::*; +use netpod::range::evrange::NanoRange; +use netpod::range::evrange::SeriesRange; +use netpod::stream_impl_tracer::StreamImplTracer; +use netpod::ttl::RetentionTime; +use netpod::ChConf; +use netpod::TsNano; +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } + +macro_rules! trace_fetch { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } + +macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } + +macro_rules! trace_switch { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } + +macro_rules! tracer_poll_enter { + ($self:expr) => { + if false && $self.tracer.poll_enter() { + return Ready(Some(Err(Error::LimitPoll))); + } + }; +} + +macro_rules! tracer_loop_enter { + ($self:expr) => { + if false && $self.tracer.loop_enter() { + return Ready(Some(Err(Error::LimitLoop))); + } + }; +} + +#[derive(Debug, ThisError)] +#[cstm(name = "EventsMergeRt")] +pub enum Error { + Input(#[from] crate::events2::onebeforeandbulk::Error), + Events(#[from] crate::events2::events::Error), + Logic, + OrderMin, + OrderMax, + LimitPoll, + LimitLoop, +} + +#[allow(unused)] +enum Resolvable +where + F: Future, +{ + Future(F), + Output(::Output), + Taken, +} + +#[allow(unused)] +impl Resolvable +where + F: Future, +{ + fn unresolved(&self) -> bool { + match self { + Resolvable::Future(_) => true, + Resolvable::Output(_) => false, + Resolvable::Taken => false, + } + } + + fn take(&mut self) -> Option<::Output> { + let x = std::mem::replace(self, Resolvable::Taken); + match x { + Resolvable::Future(_) => None, + Resolvable::Output(x) => Some(x), + Resolvable::Taken => None, + } + } +} + +impl Future for Resolvable +where + F: Future + Unpin, +{ + type Output = ::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<::Output> { + match unsafe { self.get_unchecked_mut() } { + Resolvable::Future(fut) => fut.poll_unpin(cx), + Resolvable::Output(_) => panic!(), + Resolvable::Taken => panic!(), + } + } +} + +type TI = OneBeforeAndBulk; +type INPI = Result, crate::events2::onebeforeandbulk::Error>; + +struct ReadEvents { + fut: Pin> + Send>>, +} + +enum State { + Begin, + FetchFirstSt(ReadEvents), + FetchFirstMt(ReadEvents), + FetchFirstLt(ReadEvents), + ReadingLt(Option, VecDeque, Option>), + ReadingMt(Option, VecDeque, Option>), + ReadingSt(Option, VecDeque, Option>), + Done, +} + +pub struct MergeRtsChained { + ch_conf: ChConf, + range: ScyllaSeriesRange, + range_st: ScyllaSeriesRange, + range_mt: ScyllaSeriesRange, + range_lt: ScyllaSeriesRange, + readopts: EventReadOpts, + scyqueue: ScyllaQueue, + inp_st: Option>, + inp_mt: Option>, + inp_lt: Option>, + state: State, + buf_st: VecDeque, + buf_mt: VecDeque, + buf_lt: VecDeque, + out: VecDeque, + buf_before: Option, + ts_seen_max: u64, + tracer: StreamImplTracer, +} + +impl MergeRtsChained { + pub fn new(ch_conf: ChConf, range: ScyllaSeriesRange, readopts: EventReadOpts, scyqueue: ScyllaQueue) -> Self { + trace_init!("MergeRtsChained readopts {readopts:?}"); + Self { + ch_conf, + range_st: range.clone(), + range_mt: range.clone(), + range_lt: range.clone(), + range, + readopts, + scyqueue, + inp_st: None, + inp_mt: None, + inp_lt: None, + state: State::Begin, + buf_st: VecDeque::new(), + buf_mt: VecDeque::new(), + buf_lt: VecDeque::new(), + out: VecDeque::new(), + buf_before: None, + ts_seen_max: 0, + tracer: StreamImplTracer::new("MergeRtsChained".into(), 2000, 2000), + } + } + + fn setup_first_st(&mut self) { + let rt = RetentionTime::Short; + let limbuf = &VecDeque::new(); + let inpdst = &mut self.inp_st; + let range = Self::constrained_range(&self.range_st, limbuf); + self.range_st = range.clone(); + self.range_mt = range.clone(); + self.range_lt = range.clone(); + trace_fetch!("setup_first_st constrained beg {}", range.beg().ns()); + let tsbeg = range.beg(); + let inp = EventsStreamRt::new( + rt, + self.ch_conf.clone(), + range, + self.readopts.clone(), + self.scyqueue.clone(), + ); + let inp = TI::new(inp, tsbeg, "ST".into()); + *inpdst = Some(Box::new(inp)); + } + + fn setup_first_mt(&mut self) { + let rt = RetentionTime::Medium; + let limbuf = &self.buf_st; + let inpdst = &mut self.inp_mt; + let range = Self::constrained_range(&self.range_mt, limbuf); + self.range_mt = range.clone(); + self.range_lt = range.clone(); + trace_fetch!("setup_first_mt constrained beg {}", range.beg().ns()); + let tsbeg = range.beg(); + let inp = EventsStreamRt::new( + rt, + self.ch_conf.clone(), + range, + self.readopts.clone(), + self.scyqueue.clone(), + ); + let inp = TI::new(inp, tsbeg, "MT".into()); + *inpdst = Some(Box::new(inp)); + } + + fn setup_first_lt(&mut self) { + let rt = RetentionTime::Long; + let limbuf = &self.buf_mt; + let inpdst = &mut self.inp_lt; + let range = Self::constrained_range(&self.range_lt, limbuf); + self.range_lt = range.clone(); + trace_fetch!("setup_first_lt constrained beg {}", range.beg().ns()); + let tsbeg = range.beg(); + let inp = EventsStreamRt::new( + rt, + self.ch_conf.clone(), + range, + self.readopts.clone(), + self.scyqueue.clone(), + ); + let inp = TI::new(inp, tsbeg, "LT".into()); + *inpdst = Some(Box::new(inp)); + } + + fn setup_read_st(&mut self) -> ReadEvents { + trace_fetch!("setup_read_st"); + Self::setup_read_any(&mut self.inp_st) + } + + fn setup_read_mt(&mut self) -> ReadEvents { + trace_fetch!("setup_read_mt"); + Self::setup_read_any(&mut self.inp_mt) + } + + fn setup_read_lt(&mut self) -> ReadEvents { + trace_fetch!("setup_read_lt"); + Self::setup_read_any(&mut self.inp_lt) + } + + fn setup_read_any(inp: &mut Option>) -> ReadEvents { + trace_fetch!("setup_read_any"); + let stream = unsafe { &mut *(inp.as_mut().unwrap().as_mut() as *mut TI) }; + let fut = Box::pin(stream.next()); + ReadEvents { fut } + } + + fn constrained_range(full: &ScyllaSeriesRange, buf: &VecDeque) -> ScyllaSeriesRange { + trace_fetch!("constrained_range {:?} {:?}", full, buf.front()); + if let Some(e) = buf.front() { + if let Some(ts) = e.ts_min() { + let nrange = NanoRange::from((full.beg().ns(), ts)); + ScyllaSeriesRange::from(&SeriesRange::from(nrange)) + } else { + full.clone() + } + } else { + full.clone() + } + } + + fn handle_first_st(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) { + trace_fetch!("handle_first_st"); + Self::move_latest_to_before_buf(&mut before, &mut self.buf_before); + self.buf_st.push_back(bulk); + self.setup_first_mt(); + self.state = State::FetchFirstMt(self.setup_read_mt()); + } + + fn handle_first_mt(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) { + trace_fetch!("handle_first_mt"); + Self::move_latest_to_before_buf(&mut before, &mut self.buf_before); + self.buf_mt.push_back(bulk); + self.setup_first_lt(); + self.state = State::FetchFirstLt(self.setup_read_lt()); + } + + fn handle_first_lt(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) { + trace_fetch!("handle_first_lt"); + Self::move_latest_to_before_buf(&mut before, &mut self.buf_before); + self.buf_lt.push_back(bulk); + } + + fn handle_all_firsts_done(&mut self) { + trace_switch!( + "CONSIDERED RANGES:\nFULL {:?}\nST {:?}\nMT {:?}\nLT {:?}\n", + self.range, + self.range_st, + self.range_mt, + self.range_lt + ); + self.push_out_one_before(); + let buf = core::mem::replace(&mut self.buf_lt, VecDeque::new()); + self.state = State::ReadingLt(None, buf, self.inp_lt.take()); + } + + fn move_latest_to_before_buf(before: &mut ChannelEvents, buf: &mut Option) { + let buf = buf.get_or_insert_with(|| { + trace_fetch!("move_latest_to_before_buf init before buf"); + before.new_empty() + }); + if let Some(tsn) = before.ts_max() { + let tsn = TsNano::from_ns(tsn); + if buf.ts_max().map_or(true, |x| tsn.ns() > x) { + trace_fetch!("move_latest_to_before_buf move possible before item {tsn}"); + let n = before.len(); + buf.clear(); + before.drain_into(buf, (n - 1, n)).unwrap(); + } + } + } + + fn push_out_one_before(&mut self) { + if let Some(buf) = self.buf_before.take() { + trace_fetch!("push_out_one_before len {len:?}", len = buf.len()); + if buf.len() != 0 { + self.out.push_back(buf); + } + } else { + trace_fetch!("push_out_one_before no buffer"); + } + } +} + +impl Stream for MergeRtsChained { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + tracer_poll_enter!(self); + let mut out2 = VecDeque::new(); + loop { + tracer_loop_enter!(self); + while let Some(x) = out2.pop_front() { + self.out.push_back(x); + } + if let Some(item) = self.out.pop_front() { + trace_emit!("emit item {} {:?}", items_0::Events::verify(&item), item); + if items_0::Events::verify(&item) != true { + debug!("{}bad item {:?}", "\n\n--------------------------\n", item); + self.state = State::Done; + } + if let Some(item_min) = item.ts_min() { + if item_min < self.ts_seen_max { + debug!( + "{}ordering error A {} {}", + "\n\n--------------------------\n", item_min, self.ts_seen_max + ); + self.state = State::Done; + break Ready(Some(Err(Error::OrderMin))); + } + } + if let Some(item_max) = item.ts_max() { + if item_max < self.ts_seen_max { + debug!( + "{}ordering error B {} {}", + "\n\n--------------------------\n", item_max, self.ts_seen_max + ); + self.state = State::Done; + break Ready(Some(Err(Error::OrderMax))); + } else { + self.ts_seen_max = item_max; + } + } + if let Some(ix) = item.find_highest_index_lt(self.range.beg().ns()) { + trace_fetch!("see item before range ix {ix}"); + } + break Ready(Some(Ok(item))); + } + break match &mut self.state { + State::Begin => { + self.setup_first_st(); + self.state = State::FetchFirstSt(self.setup_read_st()); + continue; + } + State::FetchFirstSt(st2) => match st2.fut.poll_unpin(cx) { + Ready(Some(Ok(x))) => match x { + onebeforeandbulk::Output::Before(before) => { + trace_fetch!("have first from ST"); + let empty = before.new_empty(); + self.handle_first_st(before, empty); + continue; + } + onebeforeandbulk::Output::Bulk(item) => { + self.handle_first_st(item.new_empty(), item); + continue; + } + }, + Ready(Some(Err(e))) => { + self.state = State::Done; + Ready(Some(Err(e.into()))) + } + Ready(None) => { + trace_fetch!("no first from ST"); + self.inp_st = None; + self.setup_first_mt(); + self.state = State::FetchFirstMt(self.setup_read_mt()); + continue; + } + Pending => Pending, + }, + State::FetchFirstMt(st2) => match st2.fut.poll_unpin(cx) { + Ready(Some(Ok(x))) => match x { + onebeforeandbulk::Output::Before(before) => { + trace_fetch!("have first from MT"); + let empty = before.new_empty(); + self.handle_first_mt(before, empty); + continue; + } + onebeforeandbulk::Output::Bulk(item) => { + self.handle_first_mt(item.new_empty(), item); + continue; + } + }, + Ready(Some(Err(e))) => { + self.state = State::Done; + Ready(Some(Err(e.into()))) + } + Ready(None) => { + trace_fetch!("no first from MT"); + self.inp_mt = None; + self.setup_first_lt(); + self.state = State::FetchFirstLt(self.setup_read_lt()); + continue; + } + Pending => Pending, + }, + State::FetchFirstLt(st2) => match st2.fut.poll_unpin(cx) { + Ready(Some(Ok(x))) => match x { + onebeforeandbulk::Output::Before(before) => { + trace_fetch!("have first from LT"); + let empty = before.new_empty(); + self.handle_first_lt(before, empty); + self.handle_all_firsts_done(); + continue; + } + onebeforeandbulk::Output::Bulk(item) => { + self.handle_first_lt(item.new_empty(), item); + self.handle_all_firsts_done(); + continue; + } + }, + Ready(Some(Err(e))) => { + self.state = State::Done; + Ready(Some(Err(e.into()))) + } + Ready(None) => { + trace_fetch!("no first from LT"); + self.inp_lt = None; + self.handle_all_firsts_done(); + continue; + } + Pending => Pending, + }, + State::ReadingLt(fut, buf, inp) => { + if let Some(x) = buf.pop_front() { + out2.push_back(x); + continue; + } else if let Some(fut2) = fut { + match fut2.fut.poll_unpin(cx) { + Ready(Some(Ok(x))) => { + *fut = None; + match x { + onebeforeandbulk::Output::Bulk(x) => { + buf.push_back(x); + continue; + } + onebeforeandbulk::Output::Before(_) => { + self.state = State::Done; + let e = Error::Logic; + Ready(Some(Err(e))) + } + } + } + Ready(Some(Err(e))) => { + *fut = None; + self.state = State::Done; + Ready(Some(Err(e.into()))) + } + Ready(None) => { + *fut = None; + *inp = None; + continue; + } + Pending => Pending, + } + } else if inp.is_some() { + let buf = core::mem::replace(buf, VecDeque::new()); + self.state = State::ReadingLt(Some(Self::setup_read_any(inp)), buf, inp.take()); + continue; + } else { + trace_emit!("transition ReadingLt to ReadingMt"); + let buf = core::mem::replace(&mut self.buf_mt, VecDeque::new()); + self.state = State::ReadingMt(None, buf, self.inp_mt.take()); + continue; + } + } + State::ReadingMt(fut, buf, inp) => { + if let Some(x) = buf.pop_front() { + out2.push_back(x); + continue; + } else if let Some(fut2) = fut { + match fut2.fut.poll_unpin(cx) { + Ready(Some(Ok(x))) => { + *fut = None; + match x { + onebeforeandbulk::Output::Bulk(x) => { + buf.push_back(x); + continue; + } + onebeforeandbulk::Output::Before(_) => { + self.state = State::Done; + let e = Error::Logic; + Ready(Some(Err(e))) + } + } + } + Ready(Some(Err(e))) => { + *fut = None; + self.state = State::Done; + Ready(Some(Err(e.into()))) + } + Ready(None) => { + *fut = None; + *inp = None; + continue; + } + Pending => Pending, + } + } else if inp.is_some() { + let buf = core::mem::replace(buf, VecDeque::new()); + self.state = State::ReadingMt(Some(Self::setup_read_any(inp)), buf, inp.take()); + continue; + } else { + trace_emit!("transition ReadingMt to ReadingSt"); + let buf = core::mem::replace(&mut self.buf_st, VecDeque::new()); + self.state = State::ReadingSt(None, buf, self.inp_st.take()); + continue; + } + } + State::ReadingSt(fut, buf, inp) => { + if let Some(x) = buf.pop_front() { + out2.push_back(x); + continue; + } else if let Some(fut2) = fut { + match fut2.fut.poll_unpin(cx) { + Ready(Some(Ok(x))) => { + *fut = None; + match x { + onebeforeandbulk::Output::Bulk(x) => { + buf.push_back(x); + continue; + } + onebeforeandbulk::Output::Before(_) => { + self.state = State::Done; + let e = Error::Logic; + Ready(Some(Err(e))) + } + } + } + Ready(Some(Err(e))) => { + *fut = None; + self.state = State::Done; + Ready(Some(Err(e.into()))) + } + Ready(None) => { + *fut = None; + *inp = None; + continue; + } + Pending => Pending, + } + } else if inp.is_some() { + let buf = core::mem::replace(buf, VecDeque::new()); + self.state = State::ReadingSt(Some(Self::setup_read_any(inp)), buf, inp.take()); + continue; + } else { + trace_emit!("fully done"); + Ready(None) + } + } + State::Done => Ready(None), + }; + } + } +} + +fn trait_assert(_: T) +where + T: Stream + Unpin + Send, +{ +} + +#[allow(unused)] +fn trait_assert_try() { + let x: MergeRtsChained = phantomval(); + trait_assert(x); +} + +fn phantomval() -> T { + panic!() +} diff --git a/crates/scyllaconn/src/events2/onebeforeandbulk.rs b/crates/scyllaconn/src/events2/onebeforeandbulk.rs new file mode 100644 index 0000000..b3f8105 --- /dev/null +++ b/crates/scyllaconn/src/events2/onebeforeandbulk.rs @@ -0,0 +1,315 @@ +use daqbuf_err as err; +use err::thiserror; +use err::ThisError; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::Events; +use items_2::merger::Mergeable; +use netpod::log::*; +use netpod::stream_impl_tracer::StreamImplTracer; +use netpod::TsNano; +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +macro_rules! trace_transition { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } + +macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } + +macro_rules! tracer_poll_enter { + ($self:expr) => { + if false && $self.tracer.poll_enter() { + return Ready(Some(Err(Error::LimitPoll))); + } + }; +} + +macro_rules! tracer_loop_enter { + ($self:expr) => { + if false && $self.tracer.loop_enter() { + return Ready(Some(Err(Error::LimitLoop))); + } + }; +} + +#[allow(unused)] +macro_rules! debug_fetch { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) } + +#[derive(Debug, ThisError)] +#[cstm(name = "EventsOneBeforeAndBulk")] +pub enum Error { + Unordered, + Logic, + Input(Box), + LimitPoll, + LimitLoop, +} + +#[derive(Debug)] +pub enum Output { + Before(T), + Bulk(T), +} + +enum State { + Begin, + Bulk, + Done, +} + +pub struct OneBeforeAndBulk +where + S: Stream + Unpin, + T: Mergeable + Unpin, +{ + ts0: TsNano, + inp: S, + state: State, + buf: Option, + out: VecDeque, + tracer: StreamImplTracer, + seen_empty_during_begin: bool, + seen_empty_during_bulk: bool, + dbgname: String, + tslast: TsNano, +} + +impl OneBeforeAndBulk +where + S: Stream + Unpin, + T: Mergeable + Unpin, +{ + fn selfname() -> &'static str { + std::any::type_name::() + } + + pub fn new(inp: S, ts0: TsNano, dbgname: String) -> Self { + trace_transition!("{}::new", Self::selfname()); + Self { + ts0, + inp, + state: State::Begin, + buf: None, + out: VecDeque::new(), + tracer: StreamImplTracer::new(Self::selfname().into(), 2000, 100), + seen_empty_during_begin: false, + seen_empty_during_bulk: false, + dbgname, + tslast: TsNano::from_ns(0), + } + } + + fn consume_buf_get_latest(&mut self) -> Option { + if let Some(mut buf) = self.buf.take() { + if buf.len() == 0 { + debug!("buf set but empty"); + None + } else { + let mut ret = buf.new_empty(); + buf.drain_into(&mut ret, (buf.len() - 1, buf.len())); + Some(ret) + } + } else { + None + } + } +} + +impl Stream for OneBeforeAndBulk +where + S: Stream> + Unpin, + T: Events + Mergeable + Unpin, + E: std::error::Error + Send + 'static, +{ + type Item = Result, Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + tracer_poll_enter!(self); + loop { + tracer_loop_enter!(self); + break if let Some(item) = self.out.pop_front() { + Ready(Some(Ok(Output::Bulk(item)))) + } else { + match &self.state { + State::Begin => match self.inp.poll_next_unpin(cx) { + Ready(Some(Ok(mut item))) => { + if let Some(tsmin) = Mergeable::ts_min(&item) { + let tsmin = TsNano::from_ns(tsmin); + if tsmin < self.tslast { + self.state = State::Done; + let e = Error::Unordered; + break Ready(Some(Err(e))); + } else { + self.tslast = TsNano::from_ns(Mergeable::ts_max(&item).unwrap()); + } + } + if item.verify() != true { + self.state = State::Done; + let e = Error::Unordered; + Ready(Some(Err(e))) + } else { + if item.len() == 0 { + self.seen_empty_during_begin = true; + } else { + if self.seen_empty_during_begin { + debug_fetch!( + "still in Begin current event len {} but seen empty before", + item.len() + ); + } + } + // Separate events into before and bulk + let tss = Events::tss(&item); + let pp = tss.partition_point(|&x| x < self.ts0.ns()); + trace_transition!("partition_point {pp:?} {n:?}", n = tss.len()); + if pp > item.len() { + error!("bad partition point {} {}", pp, item.len()); + self.state = State::Done; + Ready(Some(Err(Error::Logic))) + } else if pp == item.len() { + // all entries are before, or empty item + trace_transition!("stay in Begin"); + trace_emit!( + "State::Begin Before {} all content still before len {}", + self.dbgname, + item.len() + ); + let buf = self.buf.get_or_insert_with(|| item.new_empty()); + match item.drain_into_evs(buf, (0, item.len())) { + Ok(()) => { + continue; + } + Err(e) => { + self.state = State::Done; + Ready(Some(Err(Error::Input(Box::new(e))))) + } + } + } else if pp == 0 { + // all entries are bulk + trace_transition!("transition with bulk to Bulk"); + self.state = State::Bulk; + if let Some(before) = self.consume_buf_get_latest() { + self.out.push_back(item); + let item = Output::Before(before); + trace_emit!("State::Begin Before {} emit {:?}", self.dbgname, item); + Ready(Some(Ok(item))) + } else { + let item = Output::Bulk(item); + trace_emit!("State::Begin Bulk {} emit {:?}", self.dbgname, item); + Ready(Some(Ok(item))) + } + } else { + // mixed + trace_transition!("transition with mixed to Bulk"); + self.state = State::Bulk; + let buf = self.buf.get_or_insert_with(|| item.new_empty()); + match item.drain_into_evs(buf, (0, pp)) { + Ok(()) => { + if let Some(before) = self.consume_buf_get_latest() { + self.out.push_back(item); + let item = Output::Before(before); + trace_emit!("State::Begin Before {} emit {:?}", self.dbgname, item); + Ready(Some(Ok(item))) + } else { + let item = Output::Bulk(item); + trace_emit!("State::Begin Bulk {} emit {:?}", self.dbgname, item); + Ready(Some(Ok(item))) + } + } + Err(e) => { + self.state = State::Done; + let e = Error::Input(Box::new(e)); + Ready(Some(Err(e))) + } + } + } + } + } + Ready(Some(Err(e))) => { + self.state = State::Done; + Ready(Some(Err(Error::Input(Box::new(e))))) + } + Ready(None) => { + self.state = State::Done; + trace_transition!("transition from Begin to end of stream"); + if let Some(before) = self.consume_buf_get_latest() { + let item = Output::Before(before); + trace_emit!("State::Begin EOS {} emit {:?}", self.dbgname, item); + Ready(Some(Ok(item))) + } else { + trace_emit!("State::Begin EOS {} emit None", self.dbgname); + Ready(None) + } + } + Pending => Pending, + }, + State::Bulk => { + if self.buf.is_some() { + let n = self.buf.as_ref().map_or(0, |x| x.len()); + error!("State::Bulk but buf non-empty {}", n); + self.state = State::Done; + Ready(Some(Err(Error::Logic))) + } else { + match self.inp.poll_next_unpin(cx) { + Ready(Some(Ok(item))) => { + if let Some(tsmin) = Mergeable::ts_min(&item) { + let tsmin = TsNano::from_ns(tsmin); + if tsmin < self.tslast { + self.state = State::Done; + let e = Error::Unordered; + break Ready(Some(Err(e))); + } else { + self.tslast = TsNano::from_ns(Mergeable::ts_max(&item).unwrap()); + } + } + if item.verify() != true { + self.state = State::Done; + let e = Error::Unordered; + Ready(Some(Err(e))) + } else { + if item.len() == 0 { + self.seen_empty_during_bulk = true; + } + let item = Output::Bulk(item); + trace_emit!("State::Bulk data {} emit {:?}", self.dbgname, item); + Ready(Some(Ok(item))) + } + } + Ready(Some(Err(e))) => { + self.state = State::Done; + Ready(Some(Err(Error::Input(Box::new(e))))) + } + Ready(None) => { + trace_emit!("in bulk, input done"); + self.state = State::Done; + trace_emit!("State::Bulk EOS {} emit None", self.dbgname); + Ready(None) + } + Pending => Pending, + } + } + } + State::Done => Ready(None), + } + }; + } + } +} + +fn trait_assert(_: T) +where + T: Stream + Unpin + Send, +{ +} + +#[allow(unused)] +fn trait_assert_try() { + let x: OneBeforeAndBulk = phantomval(); + trait_assert(x); +} + +fn phantomval() -> T { + panic!() +}