Retrieve from all RT streams merged

This commit is contained in:
Dominik Werder
2024-11-12 13:40:01 +01:00
parent 2a4e6a5169
commit 068d6ab71f
8 changed files with 1020 additions and 808 deletions

View File

@@ -115,17 +115,6 @@ async fn make_channel_events_stream_data(
}
}
async fn make_channel_events_stream(
subq: EventsSubQuery,
reqctx: ReqCtxArc,
scyqueue: Option<&ScyllaQueue>,
ncc: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
let stream = make_channel_events_stream_data(subq, reqctx, scyqueue, ncc).await?;
let ret = Box::pin(stream);
Ok(ret)
}
pub async fn create_response_bytes_stream(
evq: EventsSubQuery,
scyqueue: Option<&ScyllaQueue>,
@@ -151,7 +140,7 @@ pub async fn create_response_bytes_stream(
Ok(ret)
} else {
let mut tr = build_event_transform(evq.transform())?;
let stream = make_channel_events_stream(evq, reqctx, scyqueue, ncc).await?;
let stream = make_channel_events_stream_data(evq, reqctx, scyqueue, ncc).await?;
let stream = stream.map(move |x| {
on_sitemty_data!(x, |x: ChannelEvents| {
match x {
@@ -167,7 +156,6 @@ pub async fn create_response_bytes_stream(
}
})
});
// let stream = stream.map(move |x| Box::new(x) as Box<dyn Framable + Send>);
let stream = stream.map(|x| {
x.make_frame_dyn()
.map(bytes::BytesMut::freeze)

View File

@@ -45,7 +45,7 @@ pub async fn scylla_channel_event_stream(
evq.settings().scylla_read_queue_len(),
);
let stream: Pin<Box<dyn Stream<Item = _> + Send>> = if let Some(rt) = evq.use_rt() {
info!("========= SOLO {rt:?} =====================");
trace!("========= SOLO {rt:?} =====================");
let x = scyllaconn::events2::events::EventsStreamRt::new(
rt,
chconf.clone(),
@@ -53,10 +53,10 @@ pub async fn scylla_channel_event_stream(
readopts,
scyqueue.clone(),
)
.map_err(|e| scyllaconn::events2::mergert::Error::from(e));
.map_err(|e| scyllaconn::events2::mergert::Error::Msg(e.to_string()));
Box::pin(x)
} else {
info!("========= MERGED =====================");
trace!("========= MERGED =====================");
let x =
scyllaconn::events2::mergert::MergeRts::new(chconf.clone(), evq.range().into(), readopts, scyqueue.clone());
Box::pin(x)

View File

@@ -46,7 +46,7 @@ pub async fn worker_write(
stmts_cache: &StmtsCache,
scy: &ScySession,
) -> Result<(), streams::timebin::cached::reader::Error> {
for ((((((&ts1, &ts2), &cnt), &min), &max), &avg), &lst) in bins.zip_iter() {
for (((((((&ts1, &ts2), &cnt), &min), &max), &avg), &lst), &fnl) in bins.zip_iter() {
let bin_len = DtMs::from_ms_u64((ts2.ns() - ts1.ns()) / 1000000);
let div = streams::timebin::cached::reader::part_len(bin_len).ns();
let msp = ts1.ns() / div;

View File

@@ -1,6 +1,7 @@
pub mod events;
pub mod firstbefore;
pub mod mergert;
pub mod mergertchained;
pub mod msp;
pub mod nonempty;
pub mod onebeforeandbulk;
pub mod prepare;

View File

@@ -1,240 +0,0 @@
use daqbuf_err as err;
use err::thiserror;
use err::ThisError;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::Events;
use items_2::merger::Mergeable;
use netpod::log::*;
use netpod::stream_impl_tracer::StreamImplTracer;
use netpod::TsNano;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
#[allow(unused)]
macro_rules! trace_transition {
($($arg:tt)*) => {
if true {
trace!($($arg)*);
}
};
}
#[allow(unused)]
macro_rules! trace_emit {
($($arg:tt)*) => {
if true {
trace!($($arg)*);
}
};
}
macro_rules! tracer_poll_enter {
($self:expr) => {
if false && $self.tracer.poll_enter() {
return Ready(Some(Err(Error::LimitPoll)));
}
};
}
macro_rules! tracer_loop_enter {
($self:expr) => {
if false && $self.tracer.loop_enter() {
return Ready(Some(Err(Error::LimitLoop)));
}
};
}
#[derive(Debug, ThisError)]
#[cstm(name = "EventsFirstBefore")]
pub enum Error {
Unordered,
Logic,
Input(Box<dyn std::error::Error + Send>),
LimitPoll,
LimitLoop,
}
pub enum Output<T> {
First(T, T),
Bulk(T),
}
enum State {
Begin,
Bulk,
Done,
}
pub struct FirstBeforeAndInside<S, T>
where
S: Stream + Unpin,
T: Events + Mergeable + Unpin,
{
ts0: TsNano,
inp: S,
state: State,
buf: Option<T>,
tracer: StreamImplTracer,
}
impl<S, T> FirstBeforeAndInside<S, T>
where
S: Stream + Unpin,
T: Events + Mergeable + Unpin,
{
pub fn new(inp: S, ts0: TsNano) -> Self {
trace_transition!("FirstBeforeAndInside::new");
Self {
ts0,
inp,
state: State::Begin,
buf: None,
tracer: StreamImplTracer::new("FirstBeforeAndInside".into(), 2000, 100),
}
}
}
impl<S, T, E> Stream for FirstBeforeAndInside<S, T>
where
S: Stream<Item = Result<T, E>> + Unpin,
T: Events + Mergeable + Unpin,
E: std::error::Error + Send + 'static,
{
type Item = Result<Output<T>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
tracer_poll_enter!(self);
loop {
tracer_loop_enter!(self);
break match &self.state {
State::Begin => match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(mut item))) => {
// It is an invariant that we process ordered streams, but for robustness
// verify the batch item again:
if item.verify() != true {
self.state = State::Done;
let e = Error::Unordered;
Ready(Some(Err(e)))
} else {
// Separate events into before and bulk
let tss = Events::tss(&item);
let pp = tss.partition_point(|&x| x < self.ts0.ns());
trace_transition!("partition_point {pp:?} {n:?}", n = tss.len());
if pp > item.len() {
error!("bad partition point {} {}", pp, item.len());
self.state = State::Done;
Ready(Some(Err(Error::Logic)))
} else if pp == item.len() {
// all entries are before, or empty item
if self.buf.is_none() {
self.buf = Some(item.new_empty());
}
match item.drain_into_evs(self.buf.as_mut().unwrap(), (0, item.len())) {
Ok(()) => {
continue;
}
Err(e) => {
self.state = State::Done;
Ready(Some(Err(Error::Input(Box::new(e)))))
}
}
} else if pp == 0 {
// all entries are bulk
trace_transition!("transition immediately to bulk");
self.state = State::Bulk;
let o1 = core::mem::replace(&mut self.buf, Some(item.new_empty()))
.unwrap_or_else(|| item.new_empty());
Ready(Some(Ok(Output::First(o1, item))))
} else {
// mixed
if self.buf.is_none() {
self.buf = Some(item.new_empty());
}
match item.drain_into_evs(self.buf.as_mut().unwrap(), (0, pp)) {
Ok(()) => {
trace_transition!("transition with mixed to bulk");
self.state = State::Bulk;
let o1 = core::mem::replace(&mut self.buf, Some(item.new_empty()))
.unwrap_or_else(|| item.new_empty());
Ready(Some(Ok(Output::First(o1, item))))
}
Err(e) => {
self.state = State::Done;
Ready(Some(Err(Error::Input(Box::new(e)))))
}
}
}
}
}
Ready(Some(Err(e))) => {
self.state = State::Done;
Ready(Some(Err(Error::Input(Box::new(e)))))
}
Ready(None) => {
self.state = State::Done;
if let Some(x) = self.buf.take() {
let empty = x.new_empty();
Ready(Some(Ok(Output::First(x, empty))))
} else {
Ready(None)
}
}
Pending => Pending,
},
State::Bulk => {
if self.buf.as_ref().map_or(0, |x| x.len()) != 0 {
error!(
"State::Bulk but buf non-empty {}",
self.buf.as_ref().map_or(0, |x| x.len())
);
self.state = State::Done;
Ready(Some(Err(Error::Logic)))
} else {
match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(item))) => {
if item.verify() != true {
self.state = State::Done;
let e = Error::Unordered;
Ready(Some(Err(e)))
} else {
trace_emit!("output bulk item len {}", item.len());
Ready(Some(Ok(Output::Bulk(item))))
}
}
Ready(Some(Err(e))) => {
self.state = State::Done;
Ready(Some(Err(Error::Input(Box::new(e)))))
}
Ready(None) => {
trace_emit!("in bulk, input done");
self.state = State::Done;
Ready(None)
}
Pending => Pending,
}
}
}
State::Done => Ready(None),
};
}
}
}
fn trait_assert<T>(_: T)
where
T: Stream + Unpin + Send,
{
}
#[allow(unused)]
fn trait_assert_try() {
let x: FirstBeforeAndInside<super::events::EventsStreamRt, items_2::channelevents::ChannelEvents> = phantomval();
trait_assert(x);
}
fn phantomval<T>() -> T {
panic!()
}

View File

@@ -1,318 +1,114 @@
use super::events::EventReadOpts;
use super::events::EventsStreamRt;
use super::firstbefore::FirstBeforeAndInside;
use crate::events2::firstbefore;
use crate::events2::onebeforeandbulk::OneBeforeAndBulk;
use crate::range::ScyllaSeriesRange;
use crate::worker::ScyllaQueue;
use daqbuf_err as err;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::WithLen;
use items_0::streamitem::sitem_err2_from_string;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::SitemErrTy;
use items_0::streamitem::StreamItem;
use items_2::channelevents::ChannelEvents;
use items_2::merger::Mergeable;
use items_2::merger::Merger;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::range::evrange::SeriesRange;
use netpod::stream_impl_tracer::StreamImplTracer;
use netpod::ttl::RetentionTime;
use netpod::ChConf;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
#[allow(unused)]
macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
#[allow(unused)]
macro_rules! trace_fetch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
#[allow(unused)]
macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! tracer_poll_enter {
($self:expr) => {
if false && $self.tracer.poll_enter() {
return Ready(Some(Err(Error::LimitPoll)));
}
};
}
macro_rules! tracer_loop_enter {
($self:expr) => {
if false && $self.tracer.loop_enter() {
return Ready(Some(Err(Error::LimitLoop)));
}
};
}
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
#[derive(Debug, ThisError)]
#[cstm(name = "EventsMergeRt")]
pub enum Error {
Input(#[from] crate::events2::firstbefore::Error),
Events(#[from] crate::events2::events::Error),
Logic,
OrderMin,
OrderMax,
LimitPoll,
LimitLoop,
}
#[allow(unused)]
enum Resolvable<F>
where
F: Future,
{
Future(F),
Output(<F as Future>::Output),
Taken,
}
#[allow(unused)]
impl<F> Resolvable<F>
where
F: Future,
{
fn unresolved(&self) -> bool {
match self {
Resolvable::Future(_) => true,
Resolvable::Output(_) => false,
Resolvable::Taken => false,
}
}
fn take(&mut self) -> Option<<F as Future>::Output> {
let x = std::mem::replace(self, Resolvable::Taken);
match x {
Resolvable::Future(_) => None,
Resolvable::Output(x) => Some(x),
Resolvable::Taken => None,
}
}
}
impl<F> Future for Resolvable<F>
where
F: Future + Unpin,
{
type Output = <F as Future>::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<<F as Future>::Output> {
match unsafe { self.get_unchecked_mut() } {
Resolvable::Future(fut) => fut.poll_unpin(cx),
Resolvable::Output(_) => panic!(),
Resolvable::Taken => panic!(),
}
}
}
type TI = FirstBeforeAndInside<EventsStreamRt, ChannelEvents>;
type INPI = Result<crate::events2::firstbefore::Output<ChannelEvents>, crate::events2::firstbefore::Error>;
struct ReadEvents {
fut: Pin<Box<dyn Future<Output = Option<INPI>> + Send>>,
}
enum State {
Begin,
FetchFirstSt(ReadEvents),
FetchFirstMt(ReadEvents),
FetchFirstLt(ReadEvents),
ReadingLt(Option<ReadEvents>, VecDeque<ChannelEvents>, Option<Box<TI>>),
ReadingMt(Option<ReadEvents>, VecDeque<ChannelEvents>, Option<Box<TI>>),
ReadingSt(Option<ReadEvents>, VecDeque<ChannelEvents>, Option<Box<TI>>),
Done,
Msg(String),
}
pub struct MergeRts {
ch_conf: ChConf,
range: ScyllaSeriesRange,
range_mt: ScyllaSeriesRange,
range_lt: ScyllaSeriesRange,
readopts: EventReadOpts,
scyqueue: ScyllaQueue,
inp_st: Option<Box<TI>>,
inp_mt: Option<Box<TI>>,
inp_lt: Option<Box<TI>>,
state: State,
buf_st: VecDeque<ChannelEvents>,
buf_mt: VecDeque<ChannelEvents>,
buf_lt: VecDeque<ChannelEvents>,
out: VecDeque<ChannelEvents>,
buf_before: Option<ChannelEvents>,
ts_seen_max: u64,
tracer: StreamImplTracer,
inp: Pin<Box<dyn Stream<Item = Result<ChannelEvents, SitemErrTy>> + Send>>,
}
impl MergeRts {
pub fn new(ch_conf: ChConf, range: ScyllaSeriesRange, readopts: EventReadOpts, scyqueue: ScyllaQueue) -> Self {
trace_init!("MergeRts readopts {readopts:?}");
Self {
ch_conf,
range_mt: range.clone(),
range_lt: range.clone(),
range,
readopts,
scyqueue,
inp_st: None,
inp_mt: None,
inp_lt: None,
state: State::Begin,
buf_st: VecDeque::new(),
buf_mt: VecDeque::new(),
buf_lt: VecDeque::new(),
out: VecDeque::new(),
buf_before: None,
ts_seen_max: 0,
tracer: StreamImplTracer::new("MergeRts".into(), 2000, 2000),
}
}
fn setup_first_st(&mut self) {
let rt = RetentionTime::Short;
let limbuf = &VecDeque::new();
let inpdst = &mut self.inp_st;
let range = Self::constrained_range(&self.range, limbuf);
trace_fetch!("setup_first_st constrained beg {}", range.beg().ns());
let tsbeg = range.beg();
let inp = EventsStreamRt::new(
rt,
self.ch_conf.clone(),
range,
self.readopts.clone(),
self.scyqueue.clone(),
);
let inp = TI::new(inp, tsbeg);
*inpdst = Some(Box::new(inp));
}
fn setup_first_mt(&mut self) {
let rt = RetentionTime::Medium;
let limbuf = &self.buf_st;
let inpdst = &mut self.inp_mt;
let range = Self::constrained_range(&self.range_mt, limbuf);
self.range_lt = range.clone();
trace_fetch!("setup_first_mt constrained beg {}", range.beg().ns());
let tsbeg = range.beg();
let inp = EventsStreamRt::new(
rt,
self.ch_conf.clone(),
range,
self.readopts.clone(),
self.scyqueue.clone(),
);
let inp = TI::new(inp, tsbeg);
*inpdst = Some(Box::new(inp));
}
fn setup_first_lt(&mut self) {
let rt = RetentionTime::Long;
let limbuf = &self.buf_mt;
let inpdst = &mut self.inp_lt;
let range = Self::constrained_range(&self.range_lt, limbuf);
trace_fetch!("setup_first_lt constrained beg {}", range.beg().ns());
let tsbeg = range.beg();
let inp = EventsStreamRt::new(
rt,
self.ch_conf.clone(),
range,
self.readopts.clone(),
self.scyqueue.clone(),
);
let inp = TI::new(inp, tsbeg);
*inpdst = Some(Box::new(inp));
}
fn setup_read_st(&mut self) -> ReadEvents {
trace_fetch!("setup_read_st");
Self::setup_read_any(&mut self.inp_st)
}
fn setup_read_mt(&mut self) -> ReadEvents {
trace_fetch!("setup_read_mt");
Self::setup_read_any(&mut self.inp_mt)
}
fn setup_read_lt(&mut self) -> ReadEvents {
trace_fetch!("setup_read_lt");
Self::setup_read_any(&mut self.inp_lt)
}
fn setup_read_any(inp: &mut Option<Box<TI>>) -> ReadEvents {
trace_fetch!("setup_read_any");
let stream = unsafe { &mut *(inp.as_mut().unwrap().as_mut() as *mut TI) };
let fut = Box::pin(stream.next());
ReadEvents { fut }
}
fn constrained_range(full: &ScyllaSeriesRange, buf: &VecDeque<ChannelEvents>) -> ScyllaSeriesRange {
trace_fetch!("constrained_range {:?} {:?}", full, buf.front());
if let Some(e) = buf.front() {
if let Some(ts) = e.ts_min() {
let nrange = NanoRange::from((full.beg().ns(), ts));
ScyllaSeriesRange::from(&SeriesRange::from(nrange))
} else {
debug!("constrained_range no ts even though should not have empty buffers");
full.clone()
let inp_st = EventsStreamRt::new(
RetentionTime::Short,
ch_conf.clone(),
range.clone(),
readopts.clone(),
scyqueue.clone(),
)
.map(|x| {
use RangeCompletableItem::*;
use StreamItem::*;
match x {
Ok(x) => Ok(DataItem(Data(x))),
Err(e) => Err(daqbuf_err::Error::from_string(e)),
}
} else {
full.clone()
}
}
fn handle_first_st(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) {
trace_fetch!("handle_first_st");
Self::move_latest_to_before_buf(&mut before, &mut self.buf_before);
self.buf_st.push_back(bulk);
self.setup_first_mt();
self.state = State::FetchFirstMt(self.setup_read_mt());
}
fn handle_first_mt(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) {
trace_fetch!("handle_first_mt");
Self::move_latest_to_before_buf(&mut before, &mut self.buf_before);
self.buf_mt.push_back(bulk);
self.setup_first_lt();
self.state = State::FetchFirstLt(self.setup_read_lt());
}
fn handle_first_lt(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) {
trace_fetch!("handle_first_lt");
Self::move_latest_to_before_buf(&mut before, &mut self.buf_before);
self.buf_lt.push_back(bulk);
self.push_out_one_before();
let buf = core::mem::replace(&mut self.buf_lt, VecDeque::new());
self.state = State::ReadingLt(None, buf, self.inp_lt.take());
}
fn move_latest_to_before_buf(before: &mut ChannelEvents, buf: &mut Option<ChannelEvents>) {
trace_fetch!("move_latest_to_before_buf");
if buf.is_none() {
*buf = Some(before.new_empty());
}
let buf = buf.as_mut().unwrap();
if let Some(tsn) = before.ts_max() {
if buf.ts_max().map_or(true, |x| tsn > x) {
let n = before.len();
buf.clear();
before.drain_into(buf, (n - 1, n)).unwrap();
});
let inp_mt = EventsStreamRt::new(
RetentionTime::Medium,
ch_conf.clone(),
range.clone(),
readopts.clone(),
scyqueue.clone(),
)
.map(|x| {
use RangeCompletableItem::*;
use StreamItem::*;
match x {
Ok(x) => Ok(DataItem(Data(x))),
Err(e) => Err(daqbuf_err::Error::from_string(e)),
}
}
}
fn push_out_one_before(&mut self) {
if let Some(buf) = self.buf_before.take() {
trace_fetch!("push_out_one_before len {len:?}", len = buf.len());
if buf.len() != 0 {
self.out.push_back(buf);
});
let inp_lt = EventsStreamRt::new(
RetentionTime::Long,
ch_conf.clone(),
range.clone(),
readopts.clone(),
scyqueue.clone(),
)
.map(|x| {
use RangeCompletableItem::*;
use StreamItem::*;
match x {
Ok(x) => Ok(DataItem(Data(x))),
Err(e) => Err(daqbuf_err::Error::from_string(e)),
}
} else {
trace_fetch!("push_out_one_before no buffer");
}
});
let merger: Merger<ChannelEvents> =
Merger::new(vec![Box::pin(inp_st), Box::pin(inp_mt), Box::pin(inp_lt)], None);
let stream = merger.filter_map(|x| {
// TODO all stream adapters must support Sitemty, otherwise range-final item gets dropped.
use RangeCompletableItem::*;
use StreamItem::*;
let x = match x {
Ok(x) => match x {
DataItem(x) => match x {
Data(x) => Some(Ok(x)),
_ => None,
},
_ => None,
},
Err(e) => Some(Err(e)),
};
futures_util::future::ready(x)
});
let stream = OneBeforeAndBulk::<_, ChannelEvents>::new(stream, range.beg(), "after-rt-merged".into());
let stream = stream.map(|x| match x {
Ok(x) => match x {
crate::events2::onebeforeandbulk::Output::Before(x) => Ok(x),
crate::events2::onebeforeandbulk::Output::Bulk(x) => Ok(x),
},
Err(e) => Err(sitem_err2_from_string(e)),
});
let inp = Box::pin(stream);
Self { inp }
}
}
@@ -321,276 +117,15 @@ impl Stream for MergeRts {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
tracer_poll_enter!(self);
let mut out2 = VecDeque::new();
loop {
tracer_loop_enter!(self);
while let Some(x) = out2.pop_front() {
self.out.push_back(x);
}
if let Some(item) = self.out.pop_front() {
trace_emit!("emit item {} {:?}", items_0::Events::verify(&item), item);
if items_0::Events::verify(&item) != true {
debug!("{}bad item {:?}", "\n\n--------------------------\n", item);
self.state = State::Done;
}
if let Some(item_min) = item.ts_min() {
if item_min < self.ts_seen_max {
debug!(
"{}ordering error A {} {}",
"\n\n--------------------------\n", item_min, self.ts_seen_max
);
self.state = State::Done;
break Ready(Some(Err(Error::OrderMin)));
}
}
if let Some(item_max) = item.ts_max() {
if item_max < self.ts_seen_max {
debug!(
"{}ordering error B {} {}",
"\n\n--------------------------\n", item_max, self.ts_seen_max
);
self.state = State::Done;
break Ready(Some(Err(Error::OrderMax)));
} else {
self.ts_seen_max = item_max;
}
}
if let Some(ix) = item.find_highest_index_lt(self.range.beg().ns()) {
trace_fetch!("see item before range ix {ix}");
}
break Ready(Some(Ok(item)));
}
break match &mut self.state {
State::Begin => {
self.setup_first_st();
self.state = State::FetchFirstSt(self.setup_read_st());
continue;
}
State::FetchFirstSt(st2) => match st2.fut.poll_unpin(cx) {
Ready(Some(Ok(x))) => match x {
firstbefore::Output::First(before, bulk) => {
trace_fetch!("have first from ST");
self.handle_first_st(before, bulk);
continue;
}
firstbefore::Output::Bulk(_) => {
self.state = State::Done;
let e = Error::Logic;
Ready(Some(Err(e)))
}
},
Ready(Some(Err(e))) => {
self.state = State::Done;
Ready(Some(Err(e.into())))
}
Ready(None) => {
trace_fetch!("no first from ST");
self.inp_st = None;
self.setup_first_mt();
self.state = State::FetchFirstMt(self.setup_read_mt());
continue;
}
Pending => Pending,
break match self.inp.poll_next_unpin(cx) {
Ready(Some(x)) => match x {
Ok(x) => Ready(Some(Ok(x))),
Err(e) => Ready(Some(Err(Error::Msg(e.to_string())))),
},
State::FetchFirstMt(st2) => match st2.fut.poll_unpin(cx) {
Ready(Some(Ok(x))) => match x {
firstbefore::Output::First(before, bulk) => {
trace_fetch!("have first from MT");
self.handle_first_mt(before, bulk);
continue;
}
firstbefore::Output::Bulk(_) => {
self.state = State::Done;
let e = Error::Logic;
Ready(Some(Err(e)))
}
},
Ready(Some(Err(e))) => {
self.state = State::Done;
Ready(Some(Err(e.into())))
}
Ready(None) => {
trace_fetch!("no first from MT");
self.inp_mt = None;
self.setup_first_lt();
self.state = State::FetchFirstLt(self.setup_read_lt());
continue;
}
Pending => Pending,
},
State::FetchFirstLt(st2) => match st2.fut.poll_unpin(cx) {
Ready(Some(Ok(x))) => match x {
firstbefore::Output::First(before, bulk) => {
trace_fetch!("have first from LT");
self.handle_first_lt(before, bulk);
continue;
}
firstbefore::Output::Bulk(_) => {
self.state = State::Done;
let e = Error::Logic;
Ready(Some(Err(e)))
}
},
Ready(Some(Err(e))) => {
self.state = State::Done;
Ready(Some(Err(e.into())))
}
Ready(None) => {
trace_fetch!("no first from LT");
self.inp_lt = None;
self.push_out_one_before();
let buf = core::mem::replace(&mut self.buf_lt, VecDeque::new());
self.state = State::ReadingLt(None, buf, self.inp_lt.take());
continue;
}
Pending => Pending,
},
State::ReadingLt(fut, buf, inp) => {
if let Some(x) = buf.pop_front() {
out2.push_back(x);
continue;
} else if let Some(fut2) = fut {
match fut2.fut.poll_unpin(cx) {
Ready(Some(Ok(x))) => {
*fut = None;
match x {
firstbefore::Output::Bulk(x) => {
buf.push_back(x);
continue;
}
firstbefore::Output::First(_, _) => {
self.state = State::Done;
let e = Error::Logic;
Ready(Some(Err(e)))
}
}
}
Ready(Some(Err(e))) => {
*fut = None;
self.state = State::Done;
Ready(Some(Err(e.into())))
}
Ready(None) => {
*fut = None;
*inp = None;
continue;
}
Pending => Pending,
}
} else if inp.is_some() {
let buf = core::mem::replace(buf, VecDeque::new());
self.state = State::ReadingLt(Some(Self::setup_read_any(inp)), buf, inp.take());
continue;
} else {
trace_emit!("transition ReadingLt to ReadingMt");
let buf = core::mem::replace(&mut self.buf_mt, VecDeque::new());
self.state = State::ReadingMt(None, buf, self.inp_mt.take());
continue;
}
}
State::ReadingMt(fut, buf, inp) => {
if let Some(x) = buf.pop_front() {
out2.push_back(x);
continue;
} else if let Some(fut2) = fut {
match fut2.fut.poll_unpin(cx) {
Ready(Some(Ok(x))) => {
*fut = None;
match x {
firstbefore::Output::Bulk(x) => {
buf.push_back(x);
continue;
}
firstbefore::Output::First(_, _) => {
self.state = State::Done;
let e = Error::Logic;
Ready(Some(Err(e)))
}
}
}
Ready(Some(Err(e))) => {
*fut = None;
self.state = State::Done;
Ready(Some(Err(e.into())))
}
Ready(None) => {
*fut = None;
*inp = None;
continue;
}
Pending => Pending,
}
} else if inp.is_some() {
let buf = core::mem::replace(buf, VecDeque::new());
self.state = State::ReadingMt(Some(Self::setup_read_any(inp)), buf, inp.take());
continue;
} else {
trace_emit!("transition ReadingMt to ReadingSt");
let buf = core::mem::replace(&mut self.buf_st, VecDeque::new());
self.state = State::ReadingSt(None, buf, self.inp_st.take());
continue;
}
}
State::ReadingSt(fut, buf, inp) => {
if let Some(x) = buf.pop_front() {
out2.push_back(x);
continue;
} else if let Some(fut2) = fut {
match fut2.fut.poll_unpin(cx) {
Ready(Some(Ok(x))) => {
*fut = None;
match x {
firstbefore::Output::Bulk(x) => {
buf.push_back(x);
continue;
}
firstbefore::Output::First(_, _) => {
self.state = State::Done;
let e = Error::Logic;
Ready(Some(Err(e)))
}
}
}
Ready(Some(Err(e))) => {
*fut = None;
self.state = State::Done;
Ready(Some(Err(e.into())))
}
Ready(None) => {
*fut = None;
*inp = None;
continue;
}
Pending => Pending,
}
} else if inp.is_some() {
let buf = core::mem::replace(buf, VecDeque::new());
self.state = State::ReadingSt(Some(Self::setup_read_any(inp)), buf, inp.take());
continue;
} else {
trace_emit!("fully done");
Ready(None)
}
}
State::Done => Ready(None),
Ready(None) => Ready(None),
Pending => Pending,
};
}
}
}
fn trait_assert<T>(_: T)
where
T: Stream + Unpin + Send,
{
}
#[allow(unused)]
fn trait_assert_try() {
let x: MergeRts = phantomval();
trait_assert(x);
}
fn phantomval<T>() -> T {
panic!()
}

View File

@@ -0,0 +1,613 @@
use super::events::EventReadOpts;
use super::events::EventsStreamRt;
use super::onebeforeandbulk::OneBeforeAndBulk;
use crate::events2::onebeforeandbulk;
use crate::range::ScyllaSeriesRange;
use crate::worker::ScyllaQueue;
use daqbuf_err as err;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::WithLen;
use items_2::channelevents::ChannelEvents;
use items_2::merger::Mergeable;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::range::evrange::SeriesRange;
use netpod::stream_impl_tracer::StreamImplTracer;
use netpod::ttl::RetentionTime;
use netpod::ChConf;
use netpod::TsNano;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
macro_rules! trace_fetch { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
macro_rules! trace_switch { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
macro_rules! tracer_poll_enter {
($self:expr) => {
if false && $self.tracer.poll_enter() {
return Ready(Some(Err(Error::LimitPoll)));
}
};
}
macro_rules! tracer_loop_enter {
($self:expr) => {
if false && $self.tracer.loop_enter() {
return Ready(Some(Err(Error::LimitLoop)));
}
};
}
#[derive(Debug, ThisError)]
#[cstm(name = "EventsMergeRt")]
pub enum Error {
Input(#[from] crate::events2::onebeforeandbulk::Error),
Events(#[from] crate::events2::events::Error),
Logic,
OrderMin,
OrderMax,
LimitPoll,
LimitLoop,
}
#[allow(unused)]
enum Resolvable<F>
where
F: Future,
{
Future(F),
Output(<F as Future>::Output),
Taken,
}
#[allow(unused)]
impl<F> Resolvable<F>
where
F: Future,
{
fn unresolved(&self) -> bool {
match self {
Resolvable::Future(_) => true,
Resolvable::Output(_) => false,
Resolvable::Taken => false,
}
}
fn take(&mut self) -> Option<<F as Future>::Output> {
let x = std::mem::replace(self, Resolvable::Taken);
match x {
Resolvable::Future(_) => None,
Resolvable::Output(x) => Some(x),
Resolvable::Taken => None,
}
}
}
impl<F> Future for Resolvable<F>
where
F: Future + Unpin,
{
type Output = <F as Future>::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<<F as Future>::Output> {
match unsafe { self.get_unchecked_mut() } {
Resolvable::Future(fut) => fut.poll_unpin(cx),
Resolvable::Output(_) => panic!(),
Resolvable::Taken => panic!(),
}
}
}
type TI = OneBeforeAndBulk<EventsStreamRt, ChannelEvents>;
type INPI = Result<crate::events2::onebeforeandbulk::Output<ChannelEvents>, crate::events2::onebeforeandbulk::Error>;
struct ReadEvents {
fut: Pin<Box<dyn Future<Output = Option<INPI>> + Send>>,
}
enum State {
Begin,
FetchFirstSt(ReadEvents),
FetchFirstMt(ReadEvents),
FetchFirstLt(ReadEvents),
ReadingLt(Option<ReadEvents>, VecDeque<ChannelEvents>, Option<Box<TI>>),
ReadingMt(Option<ReadEvents>, VecDeque<ChannelEvents>, Option<Box<TI>>),
ReadingSt(Option<ReadEvents>, VecDeque<ChannelEvents>, Option<Box<TI>>),
Done,
}
pub struct MergeRtsChained {
ch_conf: ChConf,
range: ScyllaSeriesRange,
range_st: ScyllaSeriesRange,
range_mt: ScyllaSeriesRange,
range_lt: ScyllaSeriesRange,
readopts: EventReadOpts,
scyqueue: ScyllaQueue,
inp_st: Option<Box<TI>>,
inp_mt: Option<Box<TI>>,
inp_lt: Option<Box<TI>>,
state: State,
buf_st: VecDeque<ChannelEvents>,
buf_mt: VecDeque<ChannelEvents>,
buf_lt: VecDeque<ChannelEvents>,
out: VecDeque<ChannelEvents>,
buf_before: Option<ChannelEvents>,
ts_seen_max: u64,
tracer: StreamImplTracer,
}
impl MergeRtsChained {
pub fn new(ch_conf: ChConf, range: ScyllaSeriesRange, readopts: EventReadOpts, scyqueue: ScyllaQueue) -> Self {
trace_init!("MergeRtsChained readopts {readopts:?}");
Self {
ch_conf,
range_st: range.clone(),
range_mt: range.clone(),
range_lt: range.clone(),
range,
readopts,
scyqueue,
inp_st: None,
inp_mt: None,
inp_lt: None,
state: State::Begin,
buf_st: VecDeque::new(),
buf_mt: VecDeque::new(),
buf_lt: VecDeque::new(),
out: VecDeque::new(),
buf_before: None,
ts_seen_max: 0,
tracer: StreamImplTracer::new("MergeRtsChained".into(), 2000, 2000),
}
}
fn setup_first_st(&mut self) {
let rt = RetentionTime::Short;
let limbuf = &VecDeque::new();
let inpdst = &mut self.inp_st;
let range = Self::constrained_range(&self.range_st, limbuf);
self.range_st = range.clone();
self.range_mt = range.clone();
self.range_lt = range.clone();
trace_fetch!("setup_first_st constrained beg {}", range.beg().ns());
let tsbeg = range.beg();
let inp = EventsStreamRt::new(
rt,
self.ch_conf.clone(),
range,
self.readopts.clone(),
self.scyqueue.clone(),
);
let inp = TI::new(inp, tsbeg, "ST".into());
*inpdst = Some(Box::new(inp));
}
fn setup_first_mt(&mut self) {
let rt = RetentionTime::Medium;
let limbuf = &self.buf_st;
let inpdst = &mut self.inp_mt;
let range = Self::constrained_range(&self.range_mt, limbuf);
self.range_mt = range.clone();
self.range_lt = range.clone();
trace_fetch!("setup_first_mt constrained beg {}", range.beg().ns());
let tsbeg = range.beg();
let inp = EventsStreamRt::new(
rt,
self.ch_conf.clone(),
range,
self.readopts.clone(),
self.scyqueue.clone(),
);
let inp = TI::new(inp, tsbeg, "MT".into());
*inpdst = Some(Box::new(inp));
}
fn setup_first_lt(&mut self) {
let rt = RetentionTime::Long;
let limbuf = &self.buf_mt;
let inpdst = &mut self.inp_lt;
let range = Self::constrained_range(&self.range_lt, limbuf);
self.range_lt = range.clone();
trace_fetch!("setup_first_lt constrained beg {}", range.beg().ns());
let tsbeg = range.beg();
let inp = EventsStreamRt::new(
rt,
self.ch_conf.clone(),
range,
self.readopts.clone(),
self.scyqueue.clone(),
);
let inp = TI::new(inp, tsbeg, "LT".into());
*inpdst = Some(Box::new(inp));
}
fn setup_read_st(&mut self) -> ReadEvents {
trace_fetch!("setup_read_st");
Self::setup_read_any(&mut self.inp_st)
}
fn setup_read_mt(&mut self) -> ReadEvents {
trace_fetch!("setup_read_mt");
Self::setup_read_any(&mut self.inp_mt)
}
fn setup_read_lt(&mut self) -> ReadEvents {
trace_fetch!("setup_read_lt");
Self::setup_read_any(&mut self.inp_lt)
}
fn setup_read_any(inp: &mut Option<Box<TI>>) -> ReadEvents {
trace_fetch!("setup_read_any");
let stream = unsafe { &mut *(inp.as_mut().unwrap().as_mut() as *mut TI) };
let fut = Box::pin(stream.next());
ReadEvents { fut }
}
fn constrained_range(full: &ScyllaSeriesRange, buf: &VecDeque<ChannelEvents>) -> ScyllaSeriesRange {
trace_fetch!("constrained_range {:?} {:?}", full, buf.front());
if let Some(e) = buf.front() {
if let Some(ts) = e.ts_min() {
let nrange = NanoRange::from((full.beg().ns(), ts));
ScyllaSeriesRange::from(&SeriesRange::from(nrange))
} else {
full.clone()
}
} else {
full.clone()
}
}
fn handle_first_st(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) {
trace_fetch!("handle_first_st");
Self::move_latest_to_before_buf(&mut before, &mut self.buf_before);
self.buf_st.push_back(bulk);
self.setup_first_mt();
self.state = State::FetchFirstMt(self.setup_read_mt());
}
fn handle_first_mt(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) {
trace_fetch!("handle_first_mt");
Self::move_latest_to_before_buf(&mut before, &mut self.buf_before);
self.buf_mt.push_back(bulk);
self.setup_first_lt();
self.state = State::FetchFirstLt(self.setup_read_lt());
}
fn handle_first_lt(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) {
trace_fetch!("handle_first_lt");
Self::move_latest_to_before_buf(&mut before, &mut self.buf_before);
self.buf_lt.push_back(bulk);
}
fn handle_all_firsts_done(&mut self) {
trace_switch!(
"CONSIDERED RANGES:\nFULL {:?}\nST {:?}\nMT {:?}\nLT {:?}\n",
self.range,
self.range_st,
self.range_mt,
self.range_lt
);
self.push_out_one_before();
let buf = core::mem::replace(&mut self.buf_lt, VecDeque::new());
self.state = State::ReadingLt(None, buf, self.inp_lt.take());
}
fn move_latest_to_before_buf(before: &mut ChannelEvents, buf: &mut Option<ChannelEvents>) {
let buf = buf.get_or_insert_with(|| {
trace_fetch!("move_latest_to_before_buf init before buf");
before.new_empty()
});
if let Some(tsn) = before.ts_max() {
let tsn = TsNano::from_ns(tsn);
if buf.ts_max().map_or(true, |x| tsn.ns() > x) {
trace_fetch!("move_latest_to_before_buf move possible before item {tsn}");
let n = before.len();
buf.clear();
before.drain_into(buf, (n - 1, n)).unwrap();
}
}
}
fn push_out_one_before(&mut self) {
if let Some(buf) = self.buf_before.take() {
trace_fetch!("push_out_one_before len {len:?}", len = buf.len());
if buf.len() != 0 {
self.out.push_back(buf);
}
} else {
trace_fetch!("push_out_one_before no buffer");
}
}
}
impl Stream for MergeRtsChained {
type Item = Result<ChannelEvents, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
tracer_poll_enter!(self);
let mut out2 = VecDeque::new();
loop {
tracer_loop_enter!(self);
while let Some(x) = out2.pop_front() {
self.out.push_back(x);
}
if let Some(item) = self.out.pop_front() {
trace_emit!("emit item {} {:?}", items_0::Events::verify(&item), item);
if items_0::Events::verify(&item) != true {
debug!("{}bad item {:?}", "\n\n--------------------------\n", item);
self.state = State::Done;
}
if let Some(item_min) = item.ts_min() {
if item_min < self.ts_seen_max {
debug!(
"{}ordering error A {} {}",
"\n\n--------------------------\n", item_min, self.ts_seen_max
);
self.state = State::Done;
break Ready(Some(Err(Error::OrderMin)));
}
}
if let Some(item_max) = item.ts_max() {
if item_max < self.ts_seen_max {
debug!(
"{}ordering error B {} {}",
"\n\n--------------------------\n", item_max, self.ts_seen_max
);
self.state = State::Done;
break Ready(Some(Err(Error::OrderMax)));
} else {
self.ts_seen_max = item_max;
}
}
if let Some(ix) = item.find_highest_index_lt(self.range.beg().ns()) {
trace_fetch!("see item before range ix {ix}");
}
break Ready(Some(Ok(item)));
}
break match &mut self.state {
State::Begin => {
self.setup_first_st();
self.state = State::FetchFirstSt(self.setup_read_st());
continue;
}
State::FetchFirstSt(st2) => match st2.fut.poll_unpin(cx) {
Ready(Some(Ok(x))) => match x {
onebeforeandbulk::Output::Before(before) => {
trace_fetch!("have first from ST");
let empty = before.new_empty();
self.handle_first_st(before, empty);
continue;
}
onebeforeandbulk::Output::Bulk(item) => {
self.handle_first_st(item.new_empty(), item);
continue;
}
},
Ready(Some(Err(e))) => {
self.state = State::Done;
Ready(Some(Err(e.into())))
}
Ready(None) => {
trace_fetch!("no first from ST");
self.inp_st = None;
self.setup_first_mt();
self.state = State::FetchFirstMt(self.setup_read_mt());
continue;
}
Pending => Pending,
},
State::FetchFirstMt(st2) => match st2.fut.poll_unpin(cx) {
Ready(Some(Ok(x))) => match x {
onebeforeandbulk::Output::Before(before) => {
trace_fetch!("have first from MT");
let empty = before.new_empty();
self.handle_first_mt(before, empty);
continue;
}
onebeforeandbulk::Output::Bulk(item) => {
self.handle_first_mt(item.new_empty(), item);
continue;
}
},
Ready(Some(Err(e))) => {
self.state = State::Done;
Ready(Some(Err(e.into())))
}
Ready(None) => {
trace_fetch!("no first from MT");
self.inp_mt = None;
self.setup_first_lt();
self.state = State::FetchFirstLt(self.setup_read_lt());
continue;
}
Pending => Pending,
},
State::FetchFirstLt(st2) => match st2.fut.poll_unpin(cx) {
Ready(Some(Ok(x))) => match x {
onebeforeandbulk::Output::Before(before) => {
trace_fetch!("have first from LT");
let empty = before.new_empty();
self.handle_first_lt(before, empty);
self.handle_all_firsts_done();
continue;
}
onebeforeandbulk::Output::Bulk(item) => {
self.handle_first_lt(item.new_empty(), item);
self.handle_all_firsts_done();
continue;
}
},
Ready(Some(Err(e))) => {
self.state = State::Done;
Ready(Some(Err(e.into())))
}
Ready(None) => {
trace_fetch!("no first from LT");
self.inp_lt = None;
self.handle_all_firsts_done();
continue;
}
Pending => Pending,
},
State::ReadingLt(fut, buf, inp) => {
if let Some(x) = buf.pop_front() {
out2.push_back(x);
continue;
} else if let Some(fut2) = fut {
match fut2.fut.poll_unpin(cx) {
Ready(Some(Ok(x))) => {
*fut = None;
match x {
onebeforeandbulk::Output::Bulk(x) => {
buf.push_back(x);
continue;
}
onebeforeandbulk::Output::Before(_) => {
self.state = State::Done;
let e = Error::Logic;
Ready(Some(Err(e)))
}
}
}
Ready(Some(Err(e))) => {
*fut = None;
self.state = State::Done;
Ready(Some(Err(e.into())))
}
Ready(None) => {
*fut = None;
*inp = None;
continue;
}
Pending => Pending,
}
} else if inp.is_some() {
let buf = core::mem::replace(buf, VecDeque::new());
self.state = State::ReadingLt(Some(Self::setup_read_any(inp)), buf, inp.take());
continue;
} else {
trace_emit!("transition ReadingLt to ReadingMt");
let buf = core::mem::replace(&mut self.buf_mt, VecDeque::new());
self.state = State::ReadingMt(None, buf, self.inp_mt.take());
continue;
}
}
State::ReadingMt(fut, buf, inp) => {
if let Some(x) = buf.pop_front() {
out2.push_back(x);
continue;
} else if let Some(fut2) = fut {
match fut2.fut.poll_unpin(cx) {
Ready(Some(Ok(x))) => {
*fut = None;
match x {
onebeforeandbulk::Output::Bulk(x) => {
buf.push_back(x);
continue;
}
onebeforeandbulk::Output::Before(_) => {
self.state = State::Done;
let e = Error::Logic;
Ready(Some(Err(e)))
}
}
}
Ready(Some(Err(e))) => {
*fut = None;
self.state = State::Done;
Ready(Some(Err(e.into())))
}
Ready(None) => {
*fut = None;
*inp = None;
continue;
}
Pending => Pending,
}
} else if inp.is_some() {
let buf = core::mem::replace(buf, VecDeque::new());
self.state = State::ReadingMt(Some(Self::setup_read_any(inp)), buf, inp.take());
continue;
} else {
trace_emit!("transition ReadingMt to ReadingSt");
let buf = core::mem::replace(&mut self.buf_st, VecDeque::new());
self.state = State::ReadingSt(None, buf, self.inp_st.take());
continue;
}
}
State::ReadingSt(fut, buf, inp) => {
if let Some(x) = buf.pop_front() {
out2.push_back(x);
continue;
} else if let Some(fut2) = fut {
match fut2.fut.poll_unpin(cx) {
Ready(Some(Ok(x))) => {
*fut = None;
match x {
onebeforeandbulk::Output::Bulk(x) => {
buf.push_back(x);
continue;
}
onebeforeandbulk::Output::Before(_) => {
self.state = State::Done;
let e = Error::Logic;
Ready(Some(Err(e)))
}
}
}
Ready(Some(Err(e))) => {
*fut = None;
self.state = State::Done;
Ready(Some(Err(e.into())))
}
Ready(None) => {
*fut = None;
*inp = None;
continue;
}
Pending => Pending,
}
} else if inp.is_some() {
let buf = core::mem::replace(buf, VecDeque::new());
self.state = State::ReadingSt(Some(Self::setup_read_any(inp)), buf, inp.take());
continue;
} else {
trace_emit!("fully done");
Ready(None)
}
}
State::Done => Ready(None),
};
}
}
}
fn trait_assert<T>(_: T)
where
T: Stream + Unpin + Send,
{
}
#[allow(unused)]
fn trait_assert_try() {
let x: MergeRtsChained = phantomval();
trait_assert(x);
}
fn phantomval<T>() -> T {
panic!()
}

View File

@@ -0,0 +1,315 @@
use daqbuf_err as err;
use err::thiserror;
use err::ThisError;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::Events;
use items_2::merger::Mergeable;
use netpod::log::*;
use netpod::stream_impl_tracer::StreamImplTracer;
use netpod::TsNano;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
macro_rules! trace_transition { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
macro_rules! tracer_poll_enter {
($self:expr) => {
if false && $self.tracer.poll_enter() {
return Ready(Some(Err(Error::LimitPoll)));
}
};
}
macro_rules! tracer_loop_enter {
($self:expr) => {
if false && $self.tracer.loop_enter() {
return Ready(Some(Err(Error::LimitLoop)));
}
};
}
#[allow(unused)]
macro_rules! debug_fetch { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) }
#[derive(Debug, ThisError)]
#[cstm(name = "EventsOneBeforeAndBulk")]
pub enum Error {
Unordered,
Logic,
Input(Box<dyn std::error::Error + Send>),
LimitPoll,
LimitLoop,
}
#[derive(Debug)]
pub enum Output<T> {
Before(T),
Bulk(T),
}
enum State {
Begin,
Bulk,
Done,
}
pub struct OneBeforeAndBulk<S, T>
where
S: Stream + Unpin,
T: Mergeable + Unpin,
{
ts0: TsNano,
inp: S,
state: State,
buf: Option<T>,
out: VecDeque<T>,
tracer: StreamImplTracer,
seen_empty_during_begin: bool,
seen_empty_during_bulk: bool,
dbgname: String,
tslast: TsNano,
}
impl<S, T> OneBeforeAndBulk<S, T>
where
S: Stream + Unpin,
T: Mergeable + Unpin,
{
fn selfname() -> &'static str {
std::any::type_name::<Self>()
}
pub fn new(inp: S, ts0: TsNano, dbgname: String) -> Self {
trace_transition!("{}::new", Self::selfname());
Self {
ts0,
inp,
state: State::Begin,
buf: None,
out: VecDeque::new(),
tracer: StreamImplTracer::new(Self::selfname().into(), 2000, 100),
seen_empty_during_begin: false,
seen_empty_during_bulk: false,
dbgname,
tslast: TsNano::from_ns(0),
}
}
fn consume_buf_get_latest(&mut self) -> Option<T> {
if let Some(mut buf) = self.buf.take() {
if buf.len() == 0 {
debug!("buf set but empty");
None
} else {
let mut ret = buf.new_empty();
buf.drain_into(&mut ret, (buf.len() - 1, buf.len()));
Some(ret)
}
} else {
None
}
}
}
impl<S, T, E> Stream for OneBeforeAndBulk<S, T>
where
S: Stream<Item = Result<T, E>> + Unpin,
T: Events + Mergeable + Unpin,
E: std::error::Error + Send + 'static,
{
type Item = Result<Output<T>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
tracer_poll_enter!(self);
loop {
tracer_loop_enter!(self);
break if let Some(item) = self.out.pop_front() {
Ready(Some(Ok(Output::Bulk(item))))
} else {
match &self.state {
State::Begin => match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(mut item))) => {
if let Some(tsmin) = Mergeable::ts_min(&item) {
let tsmin = TsNano::from_ns(tsmin);
if tsmin < self.tslast {
self.state = State::Done;
let e = Error::Unordered;
break Ready(Some(Err(e)));
} else {
self.tslast = TsNano::from_ns(Mergeable::ts_max(&item).unwrap());
}
}
if item.verify() != true {
self.state = State::Done;
let e = Error::Unordered;
Ready(Some(Err(e)))
} else {
if item.len() == 0 {
self.seen_empty_during_begin = true;
} else {
if self.seen_empty_during_begin {
debug_fetch!(
"still in Begin current event len {} but seen empty before",
item.len()
);
}
}
// Separate events into before and bulk
let tss = Events::tss(&item);
let pp = tss.partition_point(|&x| x < self.ts0.ns());
trace_transition!("partition_point {pp:?} {n:?}", n = tss.len());
if pp > item.len() {
error!("bad partition point {} {}", pp, item.len());
self.state = State::Done;
Ready(Some(Err(Error::Logic)))
} else if pp == item.len() {
// all entries are before, or empty item
trace_transition!("stay in Begin");
trace_emit!(
"State::Begin Before {} all content still before len {}",
self.dbgname,
item.len()
);
let buf = self.buf.get_or_insert_with(|| item.new_empty());
match item.drain_into_evs(buf, (0, item.len())) {
Ok(()) => {
continue;
}
Err(e) => {
self.state = State::Done;
Ready(Some(Err(Error::Input(Box::new(e)))))
}
}
} else if pp == 0 {
// all entries are bulk
trace_transition!("transition with bulk to Bulk");
self.state = State::Bulk;
if let Some(before) = self.consume_buf_get_latest() {
self.out.push_back(item);
let item = Output::Before(before);
trace_emit!("State::Begin Before {} emit {:?}", self.dbgname, item);
Ready(Some(Ok(item)))
} else {
let item = Output::Bulk(item);
trace_emit!("State::Begin Bulk {} emit {:?}", self.dbgname, item);
Ready(Some(Ok(item)))
}
} else {
// mixed
trace_transition!("transition with mixed to Bulk");
self.state = State::Bulk;
let buf = self.buf.get_or_insert_with(|| item.new_empty());
match item.drain_into_evs(buf, (0, pp)) {
Ok(()) => {
if let Some(before) = self.consume_buf_get_latest() {
self.out.push_back(item);
let item = Output::Before(before);
trace_emit!("State::Begin Before {} emit {:?}", self.dbgname, item);
Ready(Some(Ok(item)))
} else {
let item = Output::Bulk(item);
trace_emit!("State::Begin Bulk {} emit {:?}", self.dbgname, item);
Ready(Some(Ok(item)))
}
}
Err(e) => {
self.state = State::Done;
let e = Error::Input(Box::new(e));
Ready(Some(Err(e)))
}
}
}
}
}
Ready(Some(Err(e))) => {
self.state = State::Done;
Ready(Some(Err(Error::Input(Box::new(e)))))
}
Ready(None) => {
self.state = State::Done;
trace_transition!("transition from Begin to end of stream");
if let Some(before) = self.consume_buf_get_latest() {
let item = Output::Before(before);
trace_emit!("State::Begin EOS {} emit {:?}", self.dbgname, item);
Ready(Some(Ok(item)))
} else {
trace_emit!("State::Begin EOS {} emit None", self.dbgname);
Ready(None)
}
}
Pending => Pending,
},
State::Bulk => {
if self.buf.is_some() {
let n = self.buf.as_ref().map_or(0, |x| x.len());
error!("State::Bulk but buf non-empty {}", n);
self.state = State::Done;
Ready(Some(Err(Error::Logic)))
} else {
match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(item))) => {
if let Some(tsmin) = Mergeable::ts_min(&item) {
let tsmin = TsNano::from_ns(tsmin);
if tsmin < self.tslast {
self.state = State::Done;
let e = Error::Unordered;
break Ready(Some(Err(e)));
} else {
self.tslast = TsNano::from_ns(Mergeable::ts_max(&item).unwrap());
}
}
if item.verify() != true {
self.state = State::Done;
let e = Error::Unordered;
Ready(Some(Err(e)))
} else {
if item.len() == 0 {
self.seen_empty_during_bulk = true;
}
let item = Output::Bulk(item);
trace_emit!("State::Bulk data {} emit {:?}", self.dbgname, item);
Ready(Some(Ok(item)))
}
}
Ready(Some(Err(e))) => {
self.state = State::Done;
Ready(Some(Err(Error::Input(Box::new(e)))))
}
Ready(None) => {
trace_emit!("in bulk, input done");
self.state = State::Done;
trace_emit!("State::Bulk EOS {} emit None", self.dbgname);
Ready(None)
}
Pending => Pending,
}
}
}
State::Done => Ready(None),
}
};
}
}
}
fn trait_assert<T>(_: T)
where
T: Stream + Unpin + Send,
{
}
#[allow(unused)]
fn trait_assert_try() {
let x: OneBeforeAndBulk<super::events::EventsStreamRt, items_2::channelevents::ChannelEvents> = phantomval();
trait_assert(x);
}
fn phantomval<T>() -> T {
panic!()
}