This commit is contained in:
Dominik Werder
2024-11-25 06:51:04 +01:00
parent 7e8c6bd676
commit 38f8b19473
6 changed files with 198 additions and 100 deletions

View File

@@ -6,7 +6,7 @@ use items_0::scalar_ops::ScalarOps;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::Events;
use items_0::timebin::BinningggContainerEventsDyn;
use items_0::WithLen;
use items_2::empty::empty_events_dyn_ev;
use items_2::eventfull::EventFull;
@@ -137,15 +137,36 @@ impl ScalarValueFromBytes<bool> for bool {
}
/// Decodes a single event's raw payload bytes into a value and appends it to
/// the given dynamic events container.
///
/// Implementors pick the concrete value type; the caller supplies the raw
/// `buf` together with the event's timestamp and pulse id.
pub trait ValueFromBytes: Send {
    /// Converts one event from `buf` (interpreted with `endian`) and pushes
    /// the result, tagged with `ts`/`pulse`, into `events`.
    ///
    /// Returns an error when `buf` cannot be decoded or `events` is not a
    /// compatible container (implementations downcast `events`).
    fn convert(
        &self,
        ts: u64,
        pulse: u64,
        buf: &[u8],
        endian: Endian,
        events: &mut dyn BinningggContainerEventsDyn,
    ) -> Result<(), Error>;
}
/// Decodes scalar (dim-0) event payloads into a dynamic events container.
///
/// Mirrors [`ValueFromBytes`] but is specific to scalar-shaped channels;
/// implementations downcast `events` to the scalar container type.
pub trait ValueDim0FromBytes {
    /// Converts one scalar event from `buf` (interpreted with `endian`) and
    /// pushes it, tagged with `ts`/`pulse`, into `events`.
    fn convert(
        &self,
        ts: u64,
        pulse: u64,
        buf: &[u8],
        endian: Endian,
        events: &mut dyn BinningggContainerEventsDyn,
    ) -> Result<(), Error>;
}
/// Decodes waveform (dim-1) event payloads into a dynamic events container.
///
/// Mirrors [`ValueFromBytes`] but is specific to array-shaped channels;
/// implementations downcast `events` to the dim-1 container type and use the
/// channel's shape to determine the element count.
pub trait ValueDim1FromBytes {
    /// Converts one waveform event from `buf` (interpreted with `endian`) and
    /// pushes it, tagged with `ts`/`pulse`, into `events`.
    fn convert(
        &self,
        ts: u64,
        pulse: u64,
        buf: &[u8],
        endian: Endian,
        events: &mut dyn BinningggContainerEventsDyn,
    ) -> Result<(), Error>;
}
pub struct ValueDim0FromBytesImpl<STY>
@@ -170,7 +191,14 @@ impl<STY> ValueDim0FromBytes for ValueDim0FromBytesImpl<STY>
where
STY: ScalarOps + ScalarValueFromBytes<STY>,
{
fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error> {
fn convert(
&self,
ts: u64,
pulse: u64,
buf: &[u8],
endian: Endian,
events: &mut dyn BinningggContainerEventsDyn,
) -> Result<(), Error> {
if let Some(evs) = events.as_any_mut().downcast_mut::<EventsDim0<STY>>() {
let v = <STY as ScalarValueFromBytes<STY>>::convert(buf, endian)?;
evs.values.push_back(v);
@@ -187,7 +215,14 @@ impl<STY> ValueFromBytes for ValueDim0FromBytesImpl<STY>
where
STY: ScalarOps + ScalarValueFromBytes<STY>,
{
fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error> {
fn convert(
&self,
ts: u64,
pulse: u64,
buf: &[u8],
endian: Endian,
events: &mut dyn BinningggContainerEventsDyn,
) -> Result<(), Error> {
ValueDim0FromBytes::convert(self, ts, pulse, buf, endian, events)
}
}
@@ -216,7 +251,14 @@ impl<STY> ValueFromBytes for ValueDim1FromBytesImpl<STY>
where
STY: ScalarOps + ScalarValueFromBytes<STY>,
{
fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error> {
fn convert(
&self,
ts: u64,
pulse: u64,
buf: &[u8],
endian: Endian,
events: &mut dyn BinningggContainerEventsDyn,
) -> Result<(), Error> {
ValueDim1FromBytes::convert(self, ts, pulse, buf, endian, events)
}
}
@@ -225,7 +267,14 @@ impl<STY> ValueDim1FromBytes for ValueDim1FromBytesImpl<STY>
where
STY: ScalarOps + ScalarValueFromBytes<STY>,
{
fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error> {
fn convert(
&self,
ts: u64,
pulse: u64,
buf: &[u8],
endian: Endian,
events: &mut dyn BinningggContainerEventsDyn,
) -> Result<(), Error> {
if let Some(evs) = events.as_any_mut().downcast_mut::<EventsDim1<STY>>() {
let n = if let Shape::Wave(n) = self.shape {
n
@@ -304,7 +353,7 @@ pub struct EventsDynStream {
scalar_type: ScalarType,
shape: Shape,
events_full: Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>,
events_out: Box<dyn Events>,
events_out: Box<dyn BinningggContainerEventsDyn>,
scalar_conv: Box<dyn ValueFromBytes>,
emit_threshold: usize,
done: bool,
@@ -346,7 +395,7 @@ impl EventsDynStream {
Ok(ret)
}
fn replace_events_out(&mut self) -> Result<Box<dyn Events>, Error> {
fn replace_events_out(&mut self) -> Result<Box<dyn BinningggContainerEventsDyn>, Error> {
let st = &self.scalar_type;
let sh = &self.shape;
// error!("TODO replace_events_out feed through transform");
@@ -374,7 +423,7 @@ impl EventsDynStream {
fn handle_stream_item(
&mut self,
item: StreamItem<RangeCompletableItem<EventFull>>,
) -> Result<Option<Sitemty<Box<dyn Events>>>, Error> {
) -> Result<Option<Sitemty<Box<dyn BinningggContainerEventsDyn>>>, Error> {
let ret = match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
@@ -401,7 +450,7 @@ impl EventsDynStream {
}
impl Stream for EventsDynStream {
type Item = Sitemty<Box<dyn Events>>;
type Item = Sitemty<Box<dyn BinningggContainerEventsDyn>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;

View File

@@ -10,12 +10,11 @@ use err::ThisError;
use futures_util::Future;
use futures_util::StreamExt;
use items_0::scalar_ops::ScalarOps;
use items_0::timebin::BinningggContainerEventsDyn;
use items_0::Appendable;
use items_0::Empty;
use items_0::Events;
use items_0::WithLen;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim1::EventsDim1;
use items_2::binning::container_events::ContainerEvents;
use netpod::log::*;
use netpod::ttl::RetentionTime;
use netpod::DtNano;
@@ -57,7 +56,7 @@ impl From<crate::worker::Error> for Error {
pub(super) trait ValTy: Sized + 'static {
type ScaTy: ScalarOps + std::default::Default;
type ScyTy: scylla::cql_to_rust::FromCqlVal<scylla::frame::response::result::CqlValue>;
type Container: Events + Appendable<Self>;
type Container: BinningggContainerEventsDyn + Appendable<Self>;
fn from_scyty(inp: Self::ScyTy) -> Self;
fn from_valueblob(inp: Vec<u8>) -> Self;
fn table_name() -> &'static str;
@@ -69,7 +68,7 @@ pub(super) trait ValTy: Sized + 'static {
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), Error>> + Send>>;
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>>;
fn convert_rows(
rows: Vec<Row>,
range: ScyllaSeriesRange,
@@ -85,7 +84,7 @@ macro_rules! impl_scaty_scalar {
impl ValTy for $st {
type ScaTy = $st;
type ScyTy = $st_scy;
type Container = EventsDim0<Self::ScaTy>;
type Container = ContainerEvents<Self::ScaTy>;
fn from_scyty(inp: Self::ScyTy) -> Self {
inp as Self
@@ -116,7 +115,7 @@ macro_rules! impl_scaty_scalar {
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), Error>> + Send>> {
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>> {
Box::pin(read_next_values_2::<Self>(opts, jobtrace, scy, stmts))
}
@@ -139,7 +138,7 @@ macro_rules! impl_scaty_array {
impl ValTy for $vt {
type ScaTy = $st;
type ScyTy = $st_scy;
type Container = EventsDim1<Self::ScaTy>;
type Container = ContainerEvents<Vec<Self::ScaTy>>;
fn from_scyty(inp: Self::ScyTy) -> Self {
inp.into_iter().map(|x| x as Self::ScaTy).collect()
@@ -183,7 +182,7 @@ macro_rules! impl_scaty_array {
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), Error>> + Send>> {
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>> {
Box::pin(read_next_values_2::<Self>(opts, jobtrace, scy, stmts))
}
@@ -204,7 +203,7 @@ macro_rules! impl_scaty_array {
impl ValTy for EnumVariant {
type ScaTy = EnumVariant;
type ScyTy = i16;
type Container = EventsDim0<EnumVariant>;
type Container = ContainerEvents<EnumVariant>;
fn from_scyty(inp: Self::ScyTy) -> Self {
let _ = inp;
@@ -237,7 +236,7 @@ impl ValTy for EnumVariant {
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), Error>> + Send>> {
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>> {
Box::pin(read_next_values_2::<Self>(opts, jobtrace, scy, stmts))
}
@@ -256,7 +255,7 @@ impl ValTy for EnumVariant {
impl ValTy for Vec<String> {
type ScaTy = String;
type ScyTy = Vec<String>;
type Container = EventsDim1<String>;
type Container = ContainerEvents<Vec<String>>;
fn from_scyty(inp: Self::ScyTy) -> Self {
inp
@@ -289,7 +288,7 @@ impl ValTy for Vec<String> {
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), Error>> + Send>> {
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>> {
let fut = read_next_values_2::<Self>(opts, jobtrace, scy, stmts);
Box::pin(fut)
}
@@ -414,7 +413,9 @@ pub(super) struct ReadNextValuesParams {
pub jobtrace: ReadJobTrace,
}
pub(super) async fn read_next_values<ST>(params: ReadNextValuesParams) -> Result<(Box<dyn Events>, ReadJobTrace), Error>
pub(super) async fn read_next_values<ST>(
params: ReadNextValuesParams,
) -> Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>
where
ST: ValTy,
{
@@ -446,7 +447,13 @@ where
}
};
Box::pin(fut)
as Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), crate::worker::Error>> + Send>>
as Pin<
Box<
dyn Future<
Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), crate::worker::Error>,
> + Send,
>,
>
};
let (res, jobtrace) = scyqueue.read_next_values(futgen, jobtrace).await?;
Ok((res, jobtrace))
@@ -457,7 +464,7 @@ async fn read_next_values_2<ST>(
mut jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Result<(Box<dyn Events>, ReadJobTrace), Error>
) -> Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>
where
ST: ValTy,
{
@@ -662,7 +669,7 @@ fn convert_rows_enum(
bck: bool,
last_before: &mut Option<(TsNano, EnumVariant)>,
) -> Result<<EnumVariant as ValTy>::Container, Error> {
let mut ret = <EnumVariant as ValTy>::Container::empty();
let mut ret = <EnumVariant as ValTy>::Container::new();
trace_fetch!("convert_rows_enum {}", <EnumVariant as ValTy>::st_name());
for row in rows {
let (ts, value) = if with_values {
@@ -688,7 +695,7 @@ fn convert_rows_enum(
// TODO count as logic error
error!("ts >= range.beg");
} else if ts < range.beg() {
ret.push(ts.ns(), 0, value);
ret.push_back(ts, value);
} else {
*last_before = Some((ts, value));
}
@@ -697,7 +704,7 @@ fn convert_rows_enum(
// TODO count as logic error
error!("ts >= range.end");
} else if ts >= range.beg() {
ret.push(ts.ns(), 0, value);
ret.push_back(ts, value);
} else {
if last_before.is_none() {
warn!("encounter event before range in forward read {ts}");

View File

@@ -12,7 +12,8 @@ use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::Events;
use items_0::merge::MergeableDyn;
use items_0::timebin::BinningggContainerEventsDyn;
use items_2::channelevents::ChannelEvents;
use netpod::log::*;
use netpod::ttl::RetentionTime;
@@ -75,7 +76,6 @@ pub enum Error {
ReadQueueEmptyBck,
ReadQueueEmptyFwd,
Logic,
Merge(#[from] items_0::MergeError),
TruncateLogic,
AlreadyTaken,
}
@@ -84,7 +84,7 @@ struct FetchMsp {
fut: Pin<Box<dyn Future<Output = Result<Vec<TsMs>, crate::events2::msp::Error>> + Send>>,
}
type ReadEventsFutOut = Result<(Box<dyn Events>, ReadJobTrace), crate::events2::events::Error>;
type ReadEventsFutOut = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), crate::events2::events::Error>;
type FetchEventsFut = Pin<Box<dyn Future<Output = ReadEventsFutOut> + Send>>;
@@ -181,7 +181,9 @@ struct FetchEvents {
}
impl FetchEvents {
fn from_fut(fut: Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), Error>> + Send>>) -> Self {
fn from_fut(
fut: Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>>,
) -> Self {
Self { fut }
}
}
@@ -255,7 +257,7 @@ pub struct EventsStreamRt {
msp_inp: MspStreamRt,
msp_buf: VecDeque<TsMs>,
msp_buf_bck: VecDeque<TsMs>,
out: VecDeque<Box<dyn Events>>,
out: VecDeque<Box<dyn BinningggContainerEventsDyn>>,
out_cnt: u64,
ts_seen_max: u64,
qucap: usize,
@@ -325,7 +327,7 @@ impl EventsStreamRt {
bck: bool,
mfi: MakeFutInfo,
jobtrace: ReadJobTrace,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), Error>> + Send>> {
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>> {
let opts = ReadNextValuesOpts::new(mfi.rt, mfi.series, ts_msp, mfi.range, !bck, mfi.readopts, mfi.scyqueue);
let scalar_type = mfi.ch_conf.scalar_type().clone();
let shape = mfi.ch_conf.shape().clone();
@@ -491,9 +493,9 @@ impl Stream for EventsStreamRt {
item_min,
self.ts_seen_max
);
let mut r = items_2::merger::Mergeable::new_empty(&item);
match items_2::merger::Mergeable::find_highest_index_lt(&item, self.ts_seen_max) {
Some(ix) => match items_2::merger::Mergeable::drain_into(&mut item, &mut r, (0, 1 + ix)) {
let mut r = MergeableDyn::new_empty(&item);
match MergeableDyn::find_highest_index_lt(&item, self.ts_seen_max) {
Some(ix) => match MergeableDyn::drain_into(&mut item, &mut r, (0, 1 + ix)) {
Ok(()) => {
// TODO count for metrics
}
@@ -584,16 +586,15 @@ impl Stream for EventsStreamRt {
ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) {
Ready(x) => match x {
Ok((mut evs, jobtrace)) => {
use items_2::merger::Mergeable;
trace_fetch!("ReadingBck {jobtrace}");
trace_fetch!("ReadingBck FetchEvents got len {}", evs.len());
for ts in Mergeable::tss(&evs) {
for ts in MergeableDyn::tss_for_testing(&evs) {
trace_every_event!("ReadingBck FetchEvents ts {}", ts.fmt());
}
if let Some(ix) = Mergeable::find_highest_index_lt(&evs, self.range.beg().ns()) {
if let Some(ix) = MergeableDyn::find_highest_index_lt(&evs, self.range.beg().ns()) {
trace_fetch!("ReadingBck FetchEvents find_highest_index_lt {:?}", ix);
let mut y = Mergeable::new_empty(&evs);
match Mergeable::drain_into(&mut evs, &mut y, (ix, 1 + ix)) {
let mut y = MergeableDyn::new_empty(&evs);
match MergeableDyn::drain_into(&mut evs, &mut y, (ix, 1 + ix)) {
Ok(()) => {
trace_fetch!("ReadingBck FetchEvents drained y len {:?}", y.len());
self.out.push_back(y);
@@ -660,9 +661,8 @@ impl Stream for EventsStreamRt {
Ok((evs, mut jobtrace)) => {
jobtrace
.add_event_now(crate::events::ReadEventKind::EventsStreamRtSees(evs.len() as u32));
use items_2::merger::Mergeable;
trace_fetch!("ReadingFwd {jobtrace}");
for ts in Mergeable::tss(&evs) {
for ts in MergeableDyn::tss_for_testing(&evs) {
trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt());
}
self.out.push_back(evs);

View File

@@ -11,9 +11,9 @@ use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::merge::MergeableTy;
use items_0::WithLen;
use items_2::channelevents::ChannelEvents;
use items_2::merger::Mergeable;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::range::evrange::SeriesRange;
@@ -145,7 +145,7 @@ pub struct MergeRtsChained {
buf_lt: VecDeque<ChannelEvents>,
out: VecDeque<ChannelEvents>,
buf_before: Option<ChannelEvents>,
ts_seen_max: u64,
ts_seen_max: TsNano,
tracer: StreamImplTracer,
}
@@ -169,7 +169,7 @@ impl MergeRtsChained {
buf_lt: VecDeque::new(),
out: VecDeque::new(),
buf_before: None,
ts_seen_max: 0,
ts_seen_max: TsNano::from_ns(0),
tracer: StreamImplTracer::new("MergeRtsChained".into(), 2000, 2000),
}
}
@@ -260,7 +260,7 @@ impl MergeRtsChained {
trace_fetch!("constrained_range {:?} {:?}", full, buf.front());
if let Some(e) = buf.front() {
if let Some(ts) = e.ts_min() {
let nrange = NanoRange::from((full.beg().ns(), ts));
let nrange = NanoRange::from((full.beg().ns(), ts.ns()));
ScyllaSeriesRange::from(&SeriesRange::from(nrange))
} else {
full.clone()
@@ -311,7 +311,7 @@ impl MergeRtsChained {
before.new_empty()
});
if let Some(tsn) = before.ts_max() {
let tsn = TsNano::from_ns(tsn);
let tsn = tsn;
if buf.ts_max().map_or(true, |x| tsn.ns() > x) {
trace_fetch!("move_latest_to_before_buf move possible before item {tsn}");
let n = before.len();
@@ -373,7 +373,7 @@ impl Stream for MergeRtsChained {
self.ts_seen_max = item_max;
}
}
if let Some(ix) = item.find_highest_index_lt(self.range.beg().ns()) {
if let Some(ix) = item.find_highest_index_lt(self.range.beg()) {
trace_fetch!("see item before range ix {ix}");
}
break Ready(Some(Ok(item)));

View File

@@ -3,8 +3,10 @@ use err::thiserror;
use err::ThisError;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::merge::DrainIntoDstResult;
use items_0::merge::DrainIntoNewResult;
use items_0::merge::MergeableTy;
use items_0::Events;
use items_2::merger::Mergeable;
use netpod::log::*;
use netpod::stream_impl_tracer::StreamImplTracer;
use netpod::TsNano;
@@ -61,7 +63,7 @@ enum State {
pub struct OneBeforeAndBulk<S, T>
where
S: Stream + Unpin,
T: Mergeable + Unpin,
T: MergeableTy + Unpin,
{
ts0: TsNano,
inp: S,
@@ -78,7 +80,7 @@ where
impl<S, T> OneBeforeAndBulk<S, T>
where
S: Stream + Unpin,
T: Mergeable + Unpin,
T: MergeableTy + Unpin,
{
fn selfname() -> &'static str {
std::any::type_name::<Self>()
@@ -106,9 +108,11 @@ where
debug!("buf set but empty");
None
} else {
let mut ret = buf.new_empty();
buf.drain_into(&mut ret, (buf.len() - 1, buf.len()));
Some(ret)
match buf.drain_into_new(buf.len() - 1..buf.len()) {
DrainIntoNewResult::Done(ret) => Some(ret),
DrainIntoNewResult::Partial(_) => panic!(),
DrainIntoNewResult::NotCompatible => panic!(),
}
}
} else {
None
@@ -119,7 +123,7 @@ where
impl<S, T, E> Stream for OneBeforeAndBulk<S, T>
where
S: Stream<Item = Result<T, E>> + Unpin,
T: Events + Mergeable + Unpin,
T: Events + MergeableTy + Unpin,
E: std::error::Error + Send + 'static,
{
type Item = Result<Output<T>, Error>;
@@ -135,14 +139,14 @@ where
match &self.state {
State::Begin => match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(mut item))) => {
if let Some(tsmin) = Mergeable::ts_min(&item) {
let tsmin = TsNano::from_ns(tsmin);
if let Some(tsmin) = MergeableTy::ts_min(&item) {
let tsmin = tsmin;
if tsmin < self.tslast {
self.state = State::Done;
let e = Error::Unordered;
break Ready(Some(Err(e)));
} else {
self.tslast = TsNano::from_ns(Mergeable::ts_max(&item).unwrap());
self.tslast = MergeableTy::ts_max(&item).unwrap();
}
}
if item.verify() != true {
@@ -176,15 +180,20 @@ where
self.dbgname,
item.len()
);
let buf = self.buf.get_or_insert_with(|| item.new_empty());
match item.drain_into_evs(buf, (0, item.len())) {
Ok(()) => {
continue;
}
Err(e) => {
self.state = State::Done;
Ready(Some(Err(Error::Input(Box::new(e)))))
}
match self.buf.as_mut() {
Some(buf) => match item.drain_into(buf, 0..item.len()) {
DrainIntoDstResult::Done => continue,
DrainIntoDstResult::Partial => panic!(),
DrainIntoDstResult::NotCompatible => panic!(),
},
None => match item.drain_into_new(0..item.len()) {
DrainIntoNewResult::Done(buf) => {
self.buf = Some(buf);
continue;
}
DrainIntoNewResult::Partial(_) => panic!(),
DrainIntoNewResult::NotCompatible => panic!(),
},
}
} else if pp == 0 {
// all entries are bulk
@@ -204,25 +213,56 @@ where
// mixed
trace_transition!("transition with mixed to Bulk");
self.state = State::Bulk;
let buf = self.buf.get_or_insert_with(|| item.new_empty());
match item.drain_into_evs(buf, (0, pp)) {
Ok(()) => {
if let Some(before) = self.consume_buf_get_latest() {
self.out.push_back(item);
let item = Output::Before(before);
trace_emit!("State::Begin Before {} emit {:?}", self.dbgname, item);
Ready(Some(Ok(item)))
} else {
let item = Output::Bulk(item);
trace_emit!("State::Begin Bulk {} emit {:?}", self.dbgname, item);
Ready(Some(Ok(item)))
match self.buf.as_mut() {
Some(buf) => match item.drain_into(buf, 0..pp) {
DrainIntoDstResult::Done => {
if let Some(before) = self.consume_buf_get_latest() {
self.out.push_back(item);
let item = Output::Before(before);
trace_emit!(
"State::Begin Before {} emit {:?}",
self.dbgname,
item
);
Ready(Some(Ok(item)))
} else {
let item = Output::Bulk(item);
trace_emit!(
"State::Begin Bulk {} emit {:?}",
self.dbgname,
item
);
Ready(Some(Ok(item)))
}
}
}
Err(e) => {
self.state = State::Done;
let e = Error::Input(Box::new(e));
Ready(Some(Err(e)))
}
DrainIntoDstResult::Partial => panic!(),
DrainIntoDstResult::NotCompatible => panic!(),
},
None => match item.drain_into_new(0..pp) {
DrainIntoNewResult::Done(buf) => {
self.buf = Some(buf);
if let Some(before) = self.consume_buf_get_latest() {
self.out.push_back(item);
let item = Output::Before(before);
trace_emit!(
"State::Begin Before {} emit {:?}",
self.dbgname,
item
);
Ready(Some(Ok(item)))
} else {
let item = Output::Bulk(item);
trace_emit!(
"State::Begin Bulk {} emit {:?}",
self.dbgname,
item
);
Ready(Some(Ok(item)))
}
}
DrainIntoNewResult::Partial(_) => panic!(),
DrainIntoNewResult::NotCompatible => panic!(),
},
}
}
}
@@ -254,14 +294,14 @@ where
} else {
match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(item))) => {
if let Some(tsmin) = Mergeable::ts_min(&item) {
let tsmin = TsNano::from_ns(tsmin);
if let Some(tsmin) = MergeableTy::ts_min(&item) {
let tsmin = tsmin;
if tsmin < self.tslast {
self.state = State::Done;
let e = Error::Unordered;
break Ready(Some(Err(e)));
} else {
self.tslast = TsNano::from_ns(Mergeable::ts_max(&item).unwrap());
self.tslast = MergeableTy::ts_max(&item).unwrap();
}
}
if item.verify() != true {

View File

@@ -10,7 +10,7 @@ use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::StreamExt;
use items_0::Events;
use items_0::timebin::BinningggContainerEventsDyn;
use items_2::binning::container_bins::ContainerBins;
use netpod::log::*;
use netpod::ttl::RetentionTime;
@@ -78,10 +78,11 @@ struct ReadNextValues {
Arc<Session>,
Arc<StmtsEvents>,
ReadJobTrace,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), Error>> + Send>>
+ Send,
) -> Pin<
Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>,
> + Send,
>,
tx: Sender<Result<(Box<dyn Events>, ReadJobTrace), Error>>,
tx: Sender<Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>>,
jobtrace: ReadJobTrace,
}
@@ -115,14 +116,15 @@ impl ScyllaQueue {
&self,
futgen: F,
jobtrace: ReadJobTrace,
) -> Result<(Box<dyn Events>, ReadJobTrace), Error>
) -> Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>
where
F: FnOnce(
Arc<Session>,
Arc<StmtsEvents>,
ReadJobTrace,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), Error>> + Send>>
+ Send
) -> Pin<
Box<dyn Future<Output = Result<(Box<dyn BinningggContainerEventsDyn>, ReadJobTrace), Error>> + Send>,
> + Send
+ 'static,
{
let (tx, rx) = async_channel::bounded(1);