diff --git a/crates/disk/src/decode.rs b/crates/disk/src/decode.rs index ebaa208..16e32b0 100644 --- a/crates/disk/src/decode.rs +++ b/crates/disk/src/decode.rs @@ -6,7 +6,7 @@ use items_0::scalar_ops::ScalarOps; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; -use items_0::Events; +use items_0::timebin::BinningggContainerEventsDyn; use items_0::WithLen; use items_2::empty::empty_events_dyn_ev; use items_2::eventfull::EventFull; @@ -137,15 +137,36 @@ impl ScalarValueFromBytes for bool { } pub trait ValueFromBytes: Send { - fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error>; + fn convert( + &self, + ts: u64, + pulse: u64, + buf: &[u8], + endian: Endian, + events: &mut dyn BinningggContainerEventsDyn, + ) -> Result<(), Error>; } pub trait ValueDim0FromBytes { - fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error>; + fn convert( + &self, + ts: u64, + pulse: u64, + buf: &[u8], + endian: Endian, + events: &mut dyn BinningggContainerEventsDyn, + ) -> Result<(), Error>; } pub trait ValueDim1FromBytes { - fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error>; + fn convert( + &self, + ts: u64, + pulse: u64, + buf: &[u8], + endian: Endian, + events: &mut dyn BinningggContainerEventsDyn, + ) -> Result<(), Error>; } pub struct ValueDim0FromBytesImpl @@ -170,7 +191,14 @@ impl ValueDim0FromBytes for ValueDim0FromBytesImpl where STY: ScalarOps + ScalarValueFromBytes, { - fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error> { + fn convert( + &self, + ts: u64, + pulse: u64, + buf: &[u8], + endian: Endian, + events: &mut dyn BinningggContainerEventsDyn, + ) -> Result<(), Error> { if let Some(evs) = events.as_any_mut().downcast_mut::>() { let v = >::convert(buf, endian)?; evs.values.push_back(v); @@ -187,7 +215,14 @@ impl ValueFromBytes for ValueDim0FromBytesImpl where STY: ScalarOps + ScalarValueFromBytes, { - fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error> { + fn convert( + &self, + ts: u64, + pulse: u64, + buf: &[u8], + endian: Endian, + events: &mut dyn BinningggContainerEventsDyn, + ) -> Result<(), Error> { ValueDim0FromBytes::convert(self, ts, pulse, buf, endian, events) } } @@ -216,7 +251,14 @@ impl ValueFromBytes for ValueDim1FromBytesImpl where STY: ScalarOps + ScalarValueFromBytes, { - fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error> { + fn convert( + &self, + ts: u64, + pulse: u64, + buf: &[u8], + endian: Endian, + events: &mut dyn BinningggContainerEventsDyn, + ) -> Result<(), Error> { ValueDim1FromBytes::convert(self, ts, pulse, buf, endian, events) } } @@ -225,7 +267,14 @@ impl ValueDim1FromBytes for ValueDim1FromBytesImpl where STY: ScalarOps + ScalarValueFromBytes, { - fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error> { + fn convert( + &self, + ts: u64, + pulse: u64, + buf: &[u8], + endian: Endian, + events: &mut dyn BinningggContainerEventsDyn, + ) -> Result<(), Error> { if let Some(evs) = events.as_any_mut().downcast_mut::>() { let n = if let Shape::Wave(n) = self.shape { n @@ -304,7 +353,7 @@ pub struct EventsDynStream { scalar_type: ScalarType, shape: Shape, events_full: Pin> + Send>>, - events_out: Box, + events_out: Box, scalar_conv: Box, emit_threshold: usize, done: bool, @@ -346,7 +395,7 @@ impl EventsDynStream { Ok(ret) } - fn replace_events_out(&mut self) -> Result, Error> { + fn replace_events_out(&mut self) -> Result, Error> { let st = &self.scalar_type; let sh = &self.shape; // error!("TODO replace_events_out feed through transform"); @@ -374,7 +423,7 @@ impl EventsDynStream { fn handle_stream_item( &mut self, item: StreamItem>, - ) -> Result>>, Error> { + ) -> Result>>, Error> { let ret = match item { StreamItem::DataItem(item) => match item { RangeCompletableItem::RangeComplete => { @@ -401,7 +450,7 @@ impl EventsDynStream { } impl Stream for EventsDynStream { - type Item = Sitemty>; + type Item = Sitemty>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs index 3270194..c253403 100644 --- a/crates/scyllaconn/src/events.rs +++ b/crates/scyllaconn/src/events.rs @@ -10,12 +10,11 @@ use err::ThisError; use futures_util::Future; use futures_util::StreamExt; use items_0::scalar_ops::ScalarOps; +use items_0::timebin::BinningggContainerEventsDyn; use items_0::Appendable; use items_0::Empty; -use items_0::Events; use items_0::WithLen; -use items_2::eventsdim0::EventsDim0; -use items_2::eventsdim1::EventsDim1; +use items_2::binning::container_events::ContainerEvents; use netpod::log::*; use netpod::ttl::RetentionTime; use netpod::DtNano; @@ -57,7 +56,7 @@ impl From for Error { pub(super) trait ValTy: Sized + 'static { type ScaTy: ScalarOps + std::default::Default; type ScyTy: scylla::cql_to_rust::FromCqlVal; - type Container: Events + Appendable; + type Container: BinningggContainerEventsDyn + Appendable; fn from_scyty(inp: Self::ScyTy) -> Self; fn from_valueblob(inp: Vec) -> Self; fn table_name() -> &'static str; @@ -69,7 +68,7 @@ pub(super) trait ValTy: Sized + 'static { jobtrace: ReadJobTrace, scy: Arc, stmts: Arc, - ) -> Pin, ReadJobTrace), Error>> + Send>>; + ) -> Pin, ReadJobTrace), Error>> + Send>>; fn convert_rows( rows: Vec, range: ScyllaSeriesRange, @@ -85,7 +84,7 @@ macro_rules! impl_scaty_scalar { impl ValTy for $st { type ScaTy = $st; type ScyTy = $st_scy; - type Container = EventsDim0; + type Container = ContainerEvents; fn from_scyty(inp: Self::ScyTy) -> Self { inp as Self @@ -116,7 +115,7 @@ macro_rules! impl_scaty_scalar { jobtrace: ReadJobTrace, scy: Arc, stmts: Arc, - ) -> Pin, ReadJobTrace), Error>> + Send>> { + ) -> Pin, ReadJobTrace), Error>> + Send>> { Box::pin(read_next_values_2::(opts, jobtrace, scy, stmts)) } @@ -139,7 +138,7 @@ macro_rules! impl_scaty_array { impl ValTy for $vt { type ScaTy = $st; type ScyTy = $st_scy; - type Container = EventsDim1; + type Container = ContainerEvents>; fn from_scyty(inp: Self::ScyTy) -> Self { inp.into_iter().map(|x| x as Self::ScaTy).collect() @@ -183,7 +182,7 @@ macro_rules! impl_scaty_array { jobtrace: ReadJobTrace, scy: Arc, stmts: Arc, - ) -> Pin, ReadJobTrace), Error>> + Send>> { + ) -> Pin, ReadJobTrace), Error>> + Send>> { Box::pin(read_next_values_2::(opts, jobtrace, scy, stmts)) } @@ -204,7 +203,7 @@ macro_rules! impl_scaty_array { impl ValTy for EnumVariant { type ScaTy = EnumVariant; type ScyTy = i16; - type Container = EventsDim0; + type Container = ContainerEvents; fn from_scyty(inp: Self::ScyTy) -> Self { let _ = inp; @@ -237,7 +236,7 @@ impl ValTy for EnumVariant { jobtrace: ReadJobTrace, scy: Arc, stmts: Arc, - ) -> Pin, ReadJobTrace), Error>> + Send>> { + ) -> Pin, ReadJobTrace), Error>> + Send>> { Box::pin(read_next_values_2::(opts, jobtrace, scy, stmts)) } @@ -256,7 +255,7 @@ impl ValTy for EnumVariant { impl ValTy for Vec { type ScaTy = String; type ScyTy = Vec; - type Container = EventsDim1; + type Container = ContainerEvents>; fn from_scyty(inp: Self::ScyTy) -> Self { inp @@ -289,7 +288,7 @@ impl ValTy for Vec { jobtrace: ReadJobTrace, scy: Arc, stmts: Arc, - ) -> Pin, ReadJobTrace), Error>> + Send>> { + ) -> Pin, ReadJobTrace), Error>> + Send>> { let fut = read_next_values_2::(opts, jobtrace, scy, stmts); Box::pin(fut) } @@ -414,7 +413,9 @@ pub(super) struct ReadNextValuesParams { pub jobtrace: ReadJobTrace, } -pub(super) async fn read_next_values(params: ReadNextValuesParams) -> Result<(Box, ReadJobTrace), Error> +pub(super) async fn read_next_values( + params: ReadNextValuesParams, +) -> Result<(Box, ReadJobTrace), Error> where ST: ValTy, { @@ -446,7 +447,13 @@ where } }; Box::pin(fut) - as Pin, ReadJobTrace), crate::worker::Error>> + Send>> + as Pin< + Box< + dyn Future< + Output = Result<(Box, ReadJobTrace), crate::worker::Error>, + > + Send, + >, + > }; let (res, jobtrace) = scyqueue.read_next_values(futgen, jobtrace).await?; Ok((res, jobtrace)) @@ -457,7 +464,7 @@ async fn read_next_values_2( mut jobtrace: ReadJobTrace, scy: Arc, stmts: Arc, -) -> Result<(Box, ReadJobTrace), Error> +) -> Result<(Box, ReadJobTrace), Error> where ST: ValTy, { @@ -662,7 +669,7 @@ fn convert_rows_enum( bck: bool, last_before: &mut Option<(TsNano, EnumVariant)>, ) -> Result<::Container, Error> { - let mut ret = ::Container::empty(); + let mut ret = ::Container::new(); trace_fetch!("convert_rows_enum {}", ::st_name()); for row in rows { let (ts, value) = if with_values { @@ -688,7 +695,7 @@ fn convert_rows_enum( // TODO count as logic error error!("ts >= range.beg"); } else if ts < range.beg() { - ret.push(ts.ns(), 0, value); + ret.push_back(ts, value); } else { *last_before = Some((ts, value)); } @@ -697,7 +704,7 @@ fn convert_rows_enum( // TODO count as logic error error!("ts >= range.end"); } else if ts >= range.beg() { - ret.push(ts.ns(), 0, value); + ret.push_back(ts, value); } else { if last_before.is_none() { warn!("encounter event before range in forward read {ts}"); diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index 95b313f..c15465a 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -12,7 +12,8 @@ use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; -use items_0::Events; +use items_0::merge::MergeableDyn; +use items_0::timebin::BinningggContainerEventsDyn; use items_2::channelevents::ChannelEvents; use netpod::log::*; use netpod::ttl::RetentionTime; @@ -75,7 +76,6 @@ pub enum Error { ReadQueueEmptyBck, ReadQueueEmptyFwd, Logic, - Merge(#[from] items_0::MergeError), TruncateLogic, AlreadyTaken, } @@ -84,7 +84,7 @@ struct FetchMsp { fut: Pin, crate::events2::msp::Error>> + Send>>, } -type ReadEventsFutOut = Result<(Box, ReadJobTrace), crate::events2::events::Error>; +type ReadEventsFutOut = Result<(Box, ReadJobTrace), crate::events2::events::Error>; type FetchEventsFut = Pin + Send>>; @@ -181,7 +181,9 @@ struct FetchEvents { } impl FetchEvents { - fn from_fut(fut: Pin, ReadJobTrace), Error>> + Send>>) -> Self { + fn from_fut( + fut: Pin, ReadJobTrace), Error>> + Send>>, + ) -> Self { Self { fut } } } @@ -255,7 +257,7 @@ pub struct EventsStreamRt { msp_inp: MspStreamRt, msp_buf: VecDeque, msp_buf_bck: VecDeque, - out: VecDeque>, + out: VecDeque>, out_cnt: u64, ts_seen_max: u64, qucap: usize, @@ -325,7 +327,7 @@ impl EventsStreamRt { bck: bool, mfi: MakeFutInfo, jobtrace: ReadJobTrace, - ) -> Pin, ReadJobTrace), Error>> + Send>> { + ) -> Pin, ReadJobTrace), Error>> + Send>> { let opts = ReadNextValuesOpts::new(mfi.rt, mfi.series, ts_msp, mfi.range, !bck, mfi.readopts, mfi.scyqueue); let scalar_type = mfi.ch_conf.scalar_type().clone(); let shape = mfi.ch_conf.shape().clone(); @@ -491,9 +493,9 @@ impl Stream for EventsStreamRt { item_min, self.ts_seen_max ); - let mut r = items_2::merger::Mergeable::new_empty(&item); - match items_2::merger::Mergeable::find_highest_index_lt(&item, self.ts_seen_max) { - Some(ix) => match items_2::merger::Mergeable::drain_into(&mut item, &mut r, (0, 1 + ix)) { + let mut r = MergeableDyn::new_empty(&item); + match MergeableDyn::find_highest_index_lt(&item, self.ts_seen_max) { + Some(ix) => match MergeableDyn::drain_into(&mut item, &mut r, (0, 1 + ix)) { Ok(()) => { // TODO count for metrics } @@ -584,16 +586,15 @@ impl Stream for EventsStreamRt { ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) { Ready(x) => match x { Ok((mut evs, jobtrace)) => { - use items_2::merger::Mergeable; trace_fetch!("ReadingBck {jobtrace}"); trace_fetch!("ReadingBck FetchEvents got len {}", evs.len()); - for ts in Mergeable::tss(&evs) { + for ts in MergeableDyn::tss_for_testing(&evs) { trace_every_event!("ReadingBck FetchEvents ts {}", ts.fmt()); } - if let Some(ix) = Mergeable::find_highest_index_lt(&evs, self.range.beg().ns()) { + if let Some(ix) = MergeableDyn::find_highest_index_lt(&evs, self.range.beg().ns()) { trace_fetch!("ReadingBck FetchEvents find_highest_index_lt {:?}", ix); - let mut y = Mergeable::new_empty(&evs); - match Mergeable::drain_into(&mut evs, &mut y, (ix, 1 + ix)) { + let mut y = MergeableDyn::new_empty(&evs); + match MergeableDyn::drain_into(&mut evs, &mut y, (ix, 1 + ix)) { Ok(()) => { trace_fetch!("ReadingBck FetchEvents drained y len {:?}", y.len()); self.out.push_back(y); @@ -660,9 +661,8 @@ impl Stream for EventsStreamRt { Ok((evs, mut jobtrace)) => { jobtrace .add_event_now(crate::events::ReadEventKind::EventsStreamRtSees(evs.len() as u32)); - use items_2::merger::Mergeable; trace_fetch!("ReadingFwd {jobtrace}"); - for ts in Mergeable::tss(&evs) { + for ts in MergeableDyn::tss_for_testing(&evs) { trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt()); } self.out.push_back(evs); diff --git a/crates/scyllaconn/src/events2/mergertchained.rs b/crates/scyllaconn/src/events2/mergertchained.rs index 49ae1a3..b5ac33a 100644 --- a/crates/scyllaconn/src/events2/mergertchained.rs +++ b/crates/scyllaconn/src/events2/mergertchained.rs @@ -11,9 +11,9 @@ use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; +use items_0::merge::MergeableTy; use items_0::WithLen; use items_2::channelevents::ChannelEvents; -use items_2::merger::Mergeable; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::range::evrange::SeriesRange; @@ -145,7 +145,7 @@ pub struct MergeRtsChained { buf_lt: VecDeque, out: VecDeque, buf_before: Option, - ts_seen_max: u64, + ts_seen_max: TsNano, tracer: StreamImplTracer, } @@ -169,7 +169,7 @@ impl MergeRtsChained { buf_lt: VecDeque::new(), out: VecDeque::new(), buf_before: None, - ts_seen_max: 0, + ts_seen_max: TsNano::from_ns(0), tracer: StreamImplTracer::new("MergeRtsChained".into(), 2000, 2000), } } @@ -260,7 +260,7 @@ impl MergeRtsChained { trace_fetch!("constrained_range {:?} {:?}", full, buf.front()); if let Some(e) = buf.front() { if let Some(ts) = e.ts_min() { - let nrange = NanoRange::from((full.beg().ns(), ts)); + let nrange = NanoRange::from((full.beg().ns(), ts.ns())); ScyllaSeriesRange::from(&SeriesRange::from(nrange)) } else { full.clone() @@ -311,7 +311,7 @@ impl MergeRtsChained { before.new_empty() }); if let Some(tsn) = before.ts_max() { - let tsn = TsNano::from_ns(tsn); + let tsn = tsn; if buf.ts_max().map_or(true, |x| tsn.ns() > x) { trace_fetch!("move_latest_to_before_buf move possible before item {tsn}"); let n = before.len(); @@ -373,7 +373,7 @@ impl Stream for MergeRtsChained { self.ts_seen_max = item_max; } } - if let Some(ix) = item.find_highest_index_lt(self.range.beg().ns()) { + if let Some(ix) = item.find_highest_index_lt(self.range.beg()) { trace_fetch!("see item before range ix {ix}"); } break Ready(Some(Ok(item))); diff --git a/crates/scyllaconn/src/events2/onebeforeandbulk.rs b/crates/scyllaconn/src/events2/onebeforeandbulk.rs index b3f8105..27fe4f4 100644 --- a/crates/scyllaconn/src/events2/onebeforeandbulk.rs +++ b/crates/scyllaconn/src/events2/onebeforeandbulk.rs @@ -3,8 +3,10 @@ use err::thiserror; use err::ThisError; use futures_util::Stream; use futures_util::StreamExt; +use items_0::merge::DrainIntoDstResult; +use items_0::merge::DrainIntoNewResult; +use items_0::merge::MergeableTy; use items_0::Events; -use items_2::merger::Mergeable; use netpod::log::*; use netpod::stream_impl_tracer::StreamImplTracer; use netpod::TsNano; @@ -61,7 +63,7 @@ enum State { pub struct OneBeforeAndBulk where S: Stream + Unpin, - T: Mergeable + Unpin, + T: MergeableTy + Unpin, { ts0: TsNano, inp: S, @@ -78,7 +80,7 @@ where impl OneBeforeAndBulk where S: Stream + Unpin, - T: Mergeable + Unpin, + T: MergeableTy + Unpin, { fn selfname() -> &'static str { std::any::type_name::() @@ -106,9 +108,11 @@ where debug!("buf set but empty"); None } else { - let mut ret = buf.new_empty(); - buf.drain_into(&mut ret, (buf.len() - 1, buf.len())); - Some(ret) + match buf.drain_into_new(buf.len() - 1..buf.len()) { + DrainIntoNewResult::Done(ret) => Some(ret), + DrainIntoNewResult::Partial(_) => panic!(), + DrainIntoNewResult::NotCompatible => panic!(), + } } } else { None @@ -119,7 +123,7 @@ where impl Stream for OneBeforeAndBulk where S: Stream> + Unpin, - T: Events + Mergeable + Unpin, + T: Events + MergeableTy + Unpin, E: std::error::Error + Send + 'static, { type Item = Result, Error>; @@ -135,14 +139,14 @@ where match &self.state { State::Begin => match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(mut item))) => { - if let Some(tsmin) = Mergeable::ts_min(&item) { - let tsmin = TsNano::from_ns(tsmin); + if let Some(tsmin) = MergeableTy::ts_min(&item) { + let tsmin = tsmin; if tsmin < self.tslast { self.state = State::Done; let e = Error::Unordered; break Ready(Some(Err(e))); } else { - self.tslast = TsNano::from_ns(Mergeable::ts_max(&item).unwrap()); + self.tslast = MergeableTy::ts_max(&item).unwrap(); } } if item.verify() != true { @@ -176,15 +180,20 @@ where self.dbgname, item.len() ); - let buf = self.buf.get_or_insert_with(|| item.new_empty()); - match item.drain_into_evs(buf, (0, item.len())) { - Ok(()) => { - continue; - } - Err(e) => { - self.state = State::Done; - Ready(Some(Err(Error::Input(Box::new(e))))) - } + match self.buf.as_mut() { + Some(buf) => match item.drain_into(buf, 0..item.len()) { + DrainIntoDstResult::Done => continue, + DrainIntoDstResult::Partial => panic!(), + DrainIntoDstResult::NotCompatible => panic!(), + }, + None => match item.drain_into_new(0..item.len()) { + DrainIntoNewResult::Done(buf) => { + self.buf = Some(buf); + continue; + } + DrainIntoNewResult::Partial(_) => panic!(), + DrainIntoNewResult::NotCompatible => panic!(), + }, } } else if pp == 0 { // all entries are bulk @@ -204,25 +213,56 @@ where // mixed trace_transition!("transition with mixed to Bulk"); self.state = State::Bulk; - let buf = self.buf.get_or_insert_with(|| item.new_empty()); - match item.drain_into_evs(buf, (0, pp)) { - Ok(()) => { - if let Some(before) = self.consume_buf_get_latest() { - self.out.push_back(item); - let item = Output::Before(before); - trace_emit!("State::Begin Before {} emit {:?}", self.dbgname, item); - Ready(Some(Ok(item))) - } else { - let item = Output::Bulk(item); - trace_emit!("State::Begin Bulk {} emit {:?}", self.dbgname, item); - Ready(Some(Ok(item))) + match self.buf.as_mut() { + Some(buf) => match item.drain_into(buf, 0..pp) { + DrainIntoDstResult::Done => { + if let Some(before) = self.consume_buf_get_latest() { + self.out.push_back(item); + let item = Output::Before(before); + trace_emit!( + "State::Begin Before {} emit {:?}", + self.dbgname, + item + ); + Ready(Some(Ok(item))) + } else { + let item = Output::Bulk(item); + trace_emit!( + "State::Begin Bulk {} emit {:?}", + self.dbgname, + item + ); + Ready(Some(Ok(item))) + } } - } - Err(e) => { - self.state = State::Done; - let e = Error::Input(Box::new(e)); - Ready(Some(Err(e))) - } + DrainIntoDstResult::Partial => panic!(), + DrainIntoDstResult::NotCompatible => panic!(), + }, + None => match item.drain_into_new(0..pp) { + DrainIntoNewResult::Done(buf) => { + self.buf = Some(buf); + if let Some(before) = self.consume_buf_get_latest() { + self.out.push_back(item); + let item = Output::Before(before); + trace_emit!( + "State::Begin Before {} emit {:?}", + self.dbgname, + item + ); + Ready(Some(Ok(item))) + } else { + let item = Output::Bulk(item); + trace_emit!( + "State::Begin Bulk {} emit {:?}", + self.dbgname, + item + ); + Ready(Some(Ok(item))) + } + } + DrainIntoNewResult::Partial(_) => panic!(), + DrainIntoNewResult::NotCompatible => panic!(), + }, } } } @@ -254,14 +294,14 @@ where } else { match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(item))) => { - if let Some(tsmin) = Mergeable::ts_min(&item) { - let tsmin = TsNano::from_ns(tsmin); + if let Some(tsmin) = MergeableTy::ts_min(&item) { + let tsmin = tsmin; if tsmin < self.tslast { self.state = State::Done; let e = Error::Unordered; break Ready(Some(Err(e))); } else { - self.tslast = TsNano::from_ns(Mergeable::ts_max(&item).unwrap()); + self.tslast = MergeableTy::ts_max(&item).unwrap(); } } if item.verify() != true { diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs index 0a05732..474f3f6 100644 --- a/crates/scyllaconn/src/worker.rs +++ b/crates/scyllaconn/src/worker.rs @@ -10,7 +10,7 @@ use err::thiserror; use err::ThisError; use futures_util::Future; use futures_util::StreamExt; -use items_0::Events; +use items_0::timebin::BinningggContainerEventsDyn; use items_2::binning::container_bins::ContainerBins; use netpod::log::*; use netpod::ttl::RetentionTime; @@ -78,10 +78,11 @@ struct ReadNextValues { Arc, Arc, ReadJobTrace, - ) -> Pin, ReadJobTrace), Error>> + Send>> - + Send, + ) -> Pin< + Box, ReadJobTrace), Error>> + Send>, + > + Send, >, - tx: Sender, ReadJobTrace), Error>>, + tx: Sender, ReadJobTrace), Error>>, jobtrace: ReadJobTrace, } @@ -115,14 +116,15 @@ impl ScyllaQueue { &self, futgen: F, jobtrace: ReadJobTrace, - ) -> Result<(Box, ReadJobTrace), Error> + ) -> Result<(Box, ReadJobTrace), Error> where F: FnOnce( Arc, Arc, ReadJobTrace, - ) -> Pin, ReadJobTrace), Error>> + Send>> - + Send + ) -> Pin< + Box, ReadJobTrace), Error>> + Send>, + > + Send + 'static, { let (tx, rx) = async_channel::bounded(1);