WIP new container

This commit is contained in:
Dominik Werder
2024-11-24 22:32:04 +01:00
parent e17bb885fc
commit 46b3d28db2
20 changed files with 988 additions and 491 deletions

View File

@@ -1,18 +1,23 @@
use crate::Error;
use crate::log::*;
use core::ops::Range;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::container::ByteEstimate;
use items_0::merge::DrainIntoDstResult;
use items_0::merge::DrainIntoNewResult;
use items_0::merge::MergeableTy;
use items_0::on_sitemty_data;
use items_0::streamitem::sitem_data;
use items_0::streamitem::sitem_err2_from_string;
use items_0::streamitem::LogItem;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::SitemErrTy;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::Events;
use items_0::MergeError;
use items_0::WithLen;
use netpod::log::*;
use netpod::TsMs;
use netpod::TsNano;
use std::collections::VecDeque;
use std::fmt;
use std::ops::ControlFlow;
@@ -29,59 +34,14 @@ macro_rules! trace3 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace4 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
/// Operations the merger requires from a batch of time-stamped events.
///
/// NOTE(review): the per-method notes below are derived from the method names and
/// from the visible call sites in `Merger`; confirm against the implementations.
pub trait Mergeable<Rhs = Self>: fmt::Debug + WithLen + ByteEstimate + Unpin {
/// Presumably the smallest timestamp contained in the batch, if any.
fn ts_min(&self) -> Option<u64>;
/// Presumably the largest timestamp contained in the batch, if any.
fn ts_max(&self) -> Option<u64>;
/// Creates another container of type `Self`; by its name the result holds no events.
fn new_empty(&self) -> Self;
/// Presumably removes all events from the batch.
fn clear(&mut self);
// TODO when MergeError::Full gets returned, any guarantees about what has been modified or kept unchanged?
/// Moves events selected by `range` from `self` into `dst`.
///
/// `Merger` calls this with `(0, ilgt)` or `(0, src.len())`, so `range` looks like
/// `(begin, end)` with an exclusive end. TODO confirm.
fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError>;
/// Presumably the lowest index whose timestamp is greater than `ts`.
fn find_lowest_index_gt(&self, ts: u64) -> Option<usize>;
/// Presumably the lowest index whose timestamp is greater than or equal to `ts`.
fn find_lowest_index_ge(&self, ts: u64) -> Option<usize>;
/// Presumably the highest index whose timestamp is less than `ts`.
fn find_highest_index_lt(&self, ts: u64) -> Option<usize>;
// TODO only for testing:
/// Returns timestamps as `TsMs` values.
fn tss(&self) -> Vec<TsMs>;
}
impl Mergeable for Box<dyn Events> {
fn ts_min(&self) -> Option<u64> {
self.as_ref().ts_min()
}
fn ts_max(&self) -> Option<u64> {
self.as_ref().ts_max()
}
fn new_empty(&self) -> Self {
self.as_ref().new_empty_evs()
}
fn clear(&mut self) {
Events::clear(self.as_mut())
}
fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> {
self.as_mut().drain_into_evs(dst, range)
}
fn find_lowest_index_gt(&self, ts: u64) -> Option<usize> {
self.as_ref().find_lowest_index_gt_evs(ts)
}
fn find_lowest_index_ge(&self, ts: u64) -> Option<usize> {
self.as_ref().find_lowest_index_ge_evs(ts)
}
fn find_highest_index_lt(&self, ts: u64) -> Option<usize> {
self.as_ref().find_highest_index_lt_evs(ts)
}
fn tss(&self) -> Vec<netpod::TsMs> {
Events::tss(self)
.iter()
.map(|x| netpod::TsMs::from_ns_u64(*x))
.collect()
}
/// Errors produced by the merger.
#[derive(Debug, thiserror::Error)]
#[cstm(name = "MergerError")]
pub enum Error {
/// Returned from `poll3` when its count of missing items is non-zero.
NoPendingButMissing,
/// An input stream yielded an error item; the payload is that error.
Input(SitemErrTy),
/// The search for the input with the lowest timestamp found nothing.
ShouldFindTsMin,
/// The selected input item did not report a maximum timestamp, which is expected to exist at that point.
ItemShouldHaveTsMax,
/// The partial ("take up to") path left the source item empty, although in that case
/// the whole item should have been taken instead.
PartialPathDrainedAllItems,
}
type MergeInp<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;
@@ -95,8 +55,7 @@ pub struct Merger<T> {
range_complete: Vec<bool>,
out_of_band_queue: VecDeque<Sitemty<T>>,
log_queue: VecDeque<LogItem>,
dim0ix_max: u64,
done_emit_first_empty: bool,
dim0ix_max: TsNano,
done_data: bool,
done_buffered: bool,
done_range_complete: bool,
@@ -106,7 +65,7 @@ pub struct Merger<T> {
impl<T> fmt::Debug for Merger<T>
where
T: Mergeable,
T: MergeableTy,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let inps: Vec<_> = self.inps.iter().map(|x| x.is_some()).collect();
@@ -125,7 +84,7 @@ where
impl<T> Merger<T>
where
T: Mergeable,
T: MergeableTy,
{
pub fn new(inps: Vec<MergeInp<T>>, out_max_len: Option<u32>) -> Self {
let n = inps.len();
@@ -138,8 +97,7 @@ where
range_complete: vec![false; n],
out_of_band_queue: VecDeque::new(),
log_queue: VecDeque::new(),
dim0ix_max: 0,
done_emit_first_empty: false,
dim0ix_max: TsNano::from_ns(0),
done_data: false,
done_buffered: false,
done_range_complete: false,
@@ -148,37 +106,51 @@ where
}
}
fn drain_into_upto(src: &mut T, dst: &mut T, upto: u64) -> Result<(), MergeError> {
fn drain_into_dst_upto(src: &mut T, dst: &mut T, upto: TsNano) -> DrainIntoDstResult {
match src.find_lowest_index_gt(upto) {
Some(ilgt) => {
src.drain_into(dst, (0, ilgt))?;
}
Some(ilgt) => src.drain_into(dst, 0..ilgt),
None => {
// TODO should not be here.
src.drain_into(dst, (0, src.len()))?;
src.drain_into(dst, 0..src.len())
}
}
Ok(())
}
fn take_into_output_all(&mut self, src: &mut T) -> Result<(), MergeError> {
// TODO optimize the case when some large batch should be added to some existing small batch already in out.
// TODO maybe use two output slots?
self.take_into_output_upto(src, u64::MAX)
/// Drains the leading part of `src` into a new container.
///
/// The drained range starts at index 0 and ends (exclusive) at the index reported by
/// `find_lowest_index_gt(upto)`; if no such index is reported, all of `src` is drained.
fn drain_into_new_upto(src: &mut T, upto: TsNano) -> DrainIntoNewResult<T> {
    // TODO should not have to fall back to the full length.
    let end = src.find_lowest_index_gt(upto).unwrap_or_else(|| src.len());
    src.drain_into_new(0..end)
}
fn take_into_output_upto(&mut self, src: &mut T, upto: u64) -> Result<(), MergeError> {
fn take_into_output_upto(&mut self, src: &mut T, upto: TsNano) -> DrainIntoDstResult {
// TODO optimize the case when some large batch should be added to some existing small batch already in out.
// TODO maybe use two output slots?
if let Some(out) = self.out.as_mut() {
Self::drain_into_upto(src, out, upto)?;
Self::drain_into_dst_upto(src, out, upto)
} else {
trace2!("move into fresh");
let mut fresh = src.new_empty();
Self::drain_into_upto(src, &mut fresh, upto)?;
self.out = Some(fresh);
match Self::drain_into_new_upto(src, upto) {
DrainIntoNewResult::Done(x) => {
self.out = Some(x);
DrainIntoDstResult::Done
}
DrainIntoNewResult::Partial(x) => {
self.out = Some(x);
DrainIntoDstResult::Partial
}
DrainIntoNewResult::NotCompatible => DrainIntoDstResult::NotCompatible,
}
}
Ok(())
}
/// Takes from `src` into the output without an effective upper timestamp bound.
///
/// Delegates to `take_into_output_upto` using `u64::MAX` nanoseconds as the bound.
fn take_into_output_all(&mut self, src: &mut T) -> DrainIntoDstResult {
    // TODO optimize the case when some large batch should be added to some existing small batch already in out.
    // TODO maybe use two output slots?
    let no_upper_bound = TsNano::from_ns(u64::MAX);
    self.take_into_output_upto(src, no_upper_bound)
}
fn process(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result<ControlFlow<()>, Error> {
@@ -227,7 +199,7 @@ where
"dim0ix_max {} vs {} diff {}",
self.dim0ix_max,
t1,
self.dim0ix_max - t1
self.dim0ix_max.ns() - t1.ns()
),
};
log_items.push(item);
@@ -246,8 +218,15 @@ where
let mut item = self.items[il0].take().unwrap();
trace3!("Take all from item {item:?}");
match self.take_into_output_all(&mut item) {
Ok(()) => Ok(Break(())),
Err(MergeError::Full) | Err(MergeError::NotCompatible) => {
DrainIntoDstResult::Done => Ok(Break(())),
DrainIntoDstResult::Partial => {
// TODO count for stats
trace3!("Put item back");
self.items[il0] = Some(item);
self.do_clear_out = true;
Ok(Break(()))
}
DrainIntoDstResult::NotCompatible => {
// TODO count for stats
trace3!("Put item back");
self.items[il0] = Some(item);
@@ -259,20 +238,26 @@ where
// Take only up to the lowest ts of the second-lowest input
let mut item = self.items[il0].take().unwrap();
trace3!("Take up to {tl1} from item {item:?}");
let res = self.take_into_output_upto(&mut item, tl1);
match res {
Ok(()) => {
match self.take_into_output_upto(&mut item, tl1) {
DrainIntoDstResult::Done => {
if item.len() == 0 {
// TODO should never be here because we should have taken the whole item
Err(format!("Should have taken the whole item instead").into())
Err(Error::PartialPathDrainedAllItems)
} else {
self.items[il0] = Some(item);
Ok(Break(()))
}
}
Err(MergeError::Full) | Err(MergeError::NotCompatible) => {
DrainIntoDstResult::Partial => {
// TODO count for stats
info!("Put item back because {res:?}");
trace3!("Put item back because Partial");
self.items[il0] = Some(item);
self.do_clear_out = true;
Ok(Break(()))
}
DrainIntoDstResult::NotCompatible => {
// TODO count for stats
trace3!("Put item back because NotCompatible");
self.items[il0] = Some(item);
self.do_clear_out = true;
Ok(Break(()))
@@ -280,16 +265,22 @@ where
}
}
} else {
// TODO should never be here because ts-max should always exist here.
Err(format!("selected input without max ts").into())
Err(Error::ItemShouldHaveTsMax)
}
} else {
// No other input, take the whole item
let mut item = self.items[il0].take().unwrap();
trace3!("Take all from item (no other input) {item:?}");
match self.take_into_output_all(&mut item) {
Ok(()) => Ok(Break(())),
Err(_) => {
DrainIntoDstResult::Done => Ok(Break(())),
DrainIntoDstResult::Partial => {
// TODO count for stats
trace3!("Put item back");
self.items[il0] = Some(item);
self.do_clear_out = true;
Ok(Break(()))
}
DrainIntoDstResult::NotCompatible => {
// TODO count for stats
trace3!("Put item back");
self.items[il0] = Some(item);
@@ -299,7 +290,7 @@ where
}
}
} else {
Err(format!("after low ts search nothing found").into())
Err(Error::ShouldFindTsMin)
}
}
@@ -314,13 +305,6 @@ where
Ready(Some(Ok(k))) => match k {
StreamItem::DataItem(k) => match k {
RangeCompletableItem::Data(k) => {
if self.done_emit_first_empty == false {
trace!("emit first empty marker item");
self.done_emit_first_empty = true;
let item = k.new_empty();
let item = sitem_data(item);
self.out_of_band_queue.push_back(item);
}
self.items[i] = Some(k);
trace4!("refilled {}", i);
}
@@ -344,7 +328,7 @@ where
},
Ready(Some(Err(e))) => {
self.inps[i] = None;
return Err(e.into());
return Err(Error::Input(e));
}
Ready(None) => {
self.inps[i] = None;
@@ -364,10 +348,7 @@ where
}
}
fn poll3(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> ControlFlow<Poll<Option<Result<T, Error>>>> {
fn poll3(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow<Poll<Option<Error>>> {
use ControlFlow::*;
use Poll::*;
trace4!("poll3");
@@ -382,15 +363,15 @@ where
.count();
trace3!("ninps {ninps} nitems {nitems} nitemsmissing {nitemsmissing}");
if nitemsmissing != 0 {
let e = Error::from(format!("missing but no pending"));
return Break(Ready(Some(Err(e))));
let e = Error::NoPendingButMissing;
return Break(Ready(Some(e)));
}
let last_emit = nitems == 0;
if nitems != 0 {
match Self::process(Pin::new(&mut self), cx) {
Ok(Break(())) => {}
Ok(Continue(())) => {}
Err(e) => return Break(Ready(Some(Err(e)))),
Err(e) => return Break(Ready(Some(e))),
}
}
if let Some(o) = self.out.as_ref() {
@@ -426,23 +407,20 @@ where
}
}
fn poll2(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> ControlFlow<Poll<Option<Result<T, Error>>>> {
fn poll2(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow<Poll<Option<Error>>> {
use ControlFlow::*;
use Poll::*;
match Self::refill(Pin::new(&mut self), cx) {
Ok(Ready(())) => Self::poll3(self, cx),
Ok(Pending) => Break(Pending),
Err(e) => Break(Ready(Some(Err(e)))),
Err(e) => Break(Ready(Some(e))),
}
}
}
impl<T> Stream for Merger<T>
where
T: Mergeable,
T: MergeableTy,
{
type Item = Sitemty<T>;
@@ -492,19 +470,9 @@ where
match Self::poll2(self.as_mut(), cx) {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(k) => match k {
Ready(Some(Ok(out))) => {
if true {
error!("THIS BRANCH SHOULD NO LONGER OCCUR, REFACTOR");
self.done_data = true;
let e = Error::from(format!("TODO refactor direct emit in merger"));
return Ready(Some(Err(e.into())));
}
trace!("emit buffered len {}", out.len());
Ready(Some(sitem_data(out)))
}
Ready(Some(Err(e))) => {
Ready(Some(e)) => {
self.done_data = true;
Ready(Some(Err(e.into())))
Ready(Some(Err(sitem_err2_from_string(e))))
}
Ready(None) => {
self.done_data = true;