Remove unused
This commit is contained in:
+17
-16
@@ -390,26 +390,27 @@ impl<NTY: ScalarOps> CollectorType for BinsDim0Collector<NTY> {
|
|||||||
let bin_count_exp = if let Some(r) = &binrange {
|
let bin_count_exp = if let Some(r) = &binrange {
|
||||||
r.bin_count() as u32
|
r.bin_count() as u32
|
||||||
} else {
|
} else {
|
||||||
|
eprintln!("no binrange given");
|
||||||
0
|
0
|
||||||
};
|
};
|
||||||
let bin_count = self.vals.ts1s.len() as u32;
|
let bin_count = self.vals.ts1s.len() as u32;
|
||||||
let (missing_bins, continue_at, finished_at) = if self.range_final {
|
eprintln!(
|
||||||
if bin_count < bin_count_exp {
|
"-------------- MAKE MISSING BINS bin_count_exp {} bin_count {}",
|
||||||
match self.vals.ts2s.back() {
|
bin_count_exp, bin_count
|
||||||
Some(&k) => {
|
);
|
||||||
let missing_bins = bin_count_exp - bin_count;
|
let (missing_bins, continue_at, finished_at) = if bin_count < bin_count_exp {
|
||||||
let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64));
|
match self.vals.ts2s.back() {
|
||||||
let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64;
|
Some(&k) => {
|
||||||
let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64));
|
let missing_bins = bin_count_exp - bin_count;
|
||||||
(missing_bins, Some(continue_at), Some(finished_at))
|
let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64));
|
||||||
}
|
let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64;
|
||||||
None => {
|
let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64));
|
||||||
warn!("can not determine continue-at parameters");
|
(missing_bins, Some(continue_at), Some(finished_at))
|
||||||
(0, None, None)
|
}
|
||||||
}
|
None => {
|
||||||
|
warn!("can not determine continue-at parameters");
|
||||||
|
(0, None, None)
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
(0, None, None)
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
(0, None, None)
|
(0, None, None)
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ use items_0::TimeBinner;
|
|||||||
use netpod::log::*;
|
use netpod::log::*;
|
||||||
use netpod::timeunits::*;
|
use netpod::timeunits::*;
|
||||||
use netpod::AggKind;
|
use netpod::AggKind;
|
||||||
|
use netpod::BinnedRange;
|
||||||
use netpod::NanoRange;
|
use netpod::NanoRange;
|
||||||
use netpod::ScalarType;
|
use netpod::ScalarType;
|
||||||
use netpod::Shape;
|
use netpod::Shape;
|
||||||
@@ -375,11 +376,12 @@ pub async fn binned_collected(
|
|||||||
scalar_type: ScalarType,
|
scalar_type: ScalarType,
|
||||||
shape: Shape,
|
shape: Shape,
|
||||||
agg_kind: AggKind,
|
agg_kind: AggKind,
|
||||||
edges: Vec<u64>,
|
binrange: BinnedRange,
|
||||||
timeout: Duration,
|
timeout: Duration,
|
||||||
inp: Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>,
|
inp: Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>,
|
||||||
) -> Result<Box<dyn ToJsonResult>, Error> {
|
) -> Result<Box<dyn ToJsonResult>, Error> {
|
||||||
event!(Level::TRACE, "binned_collected");
|
event!(Level::TRACE, "binned_collected");
|
||||||
|
let edges = binrange.edges();
|
||||||
if edges.len() < 2 {
|
if edges.len() < 2 {
|
||||||
return Err(format!("binned_collected but edges.len() {}", edges.len()).into());
|
return Err(format!("binned_collected but edges.len() {}", edges.len()).into());
|
||||||
}
|
}
|
||||||
@@ -476,7 +478,7 @@ pub async fn binned_collected(
|
|||||||
}
|
}
|
||||||
match coll {
|
match coll {
|
||||||
Some(mut coll) => {
|
Some(mut coll) => {
|
||||||
let res = coll.result(None, None).map_err(|e| format!("{e}"))?;
|
let res = coll.result(None, Some(binrange)).map_err(|e| format!("{e}"))?;
|
||||||
tokio::time::sleep(Duration::from_millis(2000)).await;
|
tokio::time::sleep(Duration::from_millis(2000)).await;
|
||||||
Ok(res)
|
Ok(res)
|
||||||
}
|
}
|
||||||
|
|||||||
+18
-13
@@ -444,22 +444,19 @@ fn bin02() {
|
|||||||
let inp1 = Box::pin(inp1);
|
let inp1 = Box::pin(inp1);
|
||||||
let inp2 = Box::pin(futures_util::stream::empty()) as _;
|
let inp2 = Box::pin(futures_util::stream::empty()) as _;
|
||||||
let stream = ChannelEventsMerger::new(vec![inp1, inp2]);
|
let stream = ChannelEventsMerger::new(vec![inp1, inp2]);
|
||||||
if false {
|
// covering_range result is subject to adjustments, instead, manually choose bin edges
|
||||||
// covering_range result is subject to adjustments, instead, manually choose bin edges
|
let range = NanoRange {
|
||||||
let range = NanoRange {
|
beg: TSBASE + SEC * 1,
|
||||||
beg: TSBASE + SEC * 1,
|
end: TSBASE + SEC * 10,
|
||||||
end: TSBASE + SEC * 10,
|
};
|
||||||
};
|
let binrange = BinnedRange::covering_range(range, 9).map_err(|e| format!("{e}"))?;
|
||||||
let covering = BinnedRange::covering_range(range, 3).map_err(|e| format!("{e}"))?;
|
assert_eq!(binrange.edges().len(), 10);
|
||||||
assert_eq!(covering.edges().len(), 6);
|
|
||||||
}
|
|
||||||
let edges = (0..10).into_iter().map(|x| TSBASE + SEC * 1 + SEC * x).collect();
|
|
||||||
let stream = Box::pin(stream);
|
let stream = Box::pin(stream);
|
||||||
let collected = binned_collected(
|
let collected = binned_collected(
|
||||||
ScalarType::F32,
|
ScalarType::F32,
|
||||||
Shape::Scalar,
|
Shape::Scalar,
|
||||||
AggKind::TimeWeightedScalar,
|
AggKind::TimeWeightedScalar,
|
||||||
edges,
|
binrange,
|
||||||
Duration::from_millis(2000),
|
Duration::from_millis(2000),
|
||||||
stream,
|
stream,
|
||||||
)
|
)
|
||||||
@@ -499,19 +496,27 @@ fn binned_timeout_01() {
|
|||||||
}
|
}
|
||||||
k
|
k
|
||||||
});
|
});
|
||||||
let edges = (0..10).into_iter().map(|x| TSBASE + SEC * (1 + x)).collect();
|
let edges: Vec<_> = (0..10).into_iter().map(|x| TSBASE + SEC * (1 + x)).collect();
|
||||||
|
let range = NanoRange {
|
||||||
|
beg: TSBASE + SEC * 1,
|
||||||
|
end: TSBASE + SEC * 10,
|
||||||
|
};
|
||||||
|
let binrange = BinnedRange::covering_range(range, 9)?;
|
||||||
|
eprintln!("edges1: {:?}", edges);
|
||||||
|
eprintln!("edges2: {:?}", binrange.edges());
|
||||||
let inp1 = Box::pin(inp1) as _;
|
let inp1 = Box::pin(inp1) as _;
|
||||||
let timeout = Duration::from_millis(400);
|
let timeout = Duration::from_millis(400);
|
||||||
let res = binned_collected(
|
let res = binned_collected(
|
||||||
ScalarType::F32,
|
ScalarType::F32,
|
||||||
Shape::Scalar,
|
Shape::Scalar,
|
||||||
AggKind::TimeWeightedScalar,
|
AggKind::TimeWeightedScalar,
|
||||||
edges,
|
binrange,
|
||||||
timeout,
|
timeout,
|
||||||
inp1,
|
inp1,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
let r2: &BinsDim0CollectedResult<f32> = res.as_any_ref().downcast_ref().expect("res seems wrong type");
|
let r2: &BinsDim0CollectedResult<f32> = res.as_any_ref().downcast_ref().expect("res seems wrong type");
|
||||||
|
eprintln!("rs: {r2:?}");
|
||||||
assert_eq!(SEC * r2.ts_anchor_sec(), TSBASE + SEC);
|
assert_eq!(SEC * r2.ts_anchor_sec(), TSBASE + SEC);
|
||||||
assert_eq!(r2.counts(), &[10, 10, 10]);
|
assert_eq!(r2.counts(), &[10, 10, 10]);
|
||||||
assert_eq!(r2.mins(), &[3.0, 2.0, 3.0]);
|
assert_eq!(r2.mins(), &[3.0, 2.0, 3.0]);
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
pub mod mergedstream;
|
|
||||||
|
|||||||
@@ -1,309 +0,0 @@
|
|||||||
use err::Error;
|
|
||||||
use futures_util::{Stream, StreamExt};
|
|
||||||
use items::ByteEstimate;
|
|
||||||
use items::{Appendable, LogItem, PushableIndex, RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithTimestamps};
|
|
||||||
use netpod::histo::HistoLog2;
|
|
||||||
use netpod::log::*;
|
|
||||||
use netpod::ByteSize;
|
|
||||||
use std::collections::VecDeque;
|
|
||||||
use std::pin::Pin;
|
|
||||||
use std::task::{Context, Poll};
|
|
||||||
|
|
||||||
// TODO compare with items_2::merger::*
|
|
||||||
|
|
||||||
const LOG_EMIT_ITEM: bool = false;
|
|
||||||
|
|
||||||
enum MergedCurVal<T> {
|
|
||||||
None,
|
|
||||||
Finish,
|
|
||||||
Val(T),
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct MergedStream<S, ITY> {
|
|
||||||
inps: Vec<S>,
|
|
||||||
current: Vec<MergedCurVal<ITY>>,
|
|
||||||
ixs: Vec<usize>,
|
|
||||||
errored: bool,
|
|
||||||
completed: bool,
|
|
||||||
batch: Option<ITY>,
|
|
||||||
ts_last_emit: u64,
|
|
||||||
range_complete_observed: Vec<bool>,
|
|
||||||
range_complete_observed_all: bool,
|
|
||||||
range_complete_observed_all_emitted: bool,
|
|
||||||
data_emit_complete: bool,
|
|
||||||
batch_size: ByteSize,
|
|
||||||
batch_len_emit_histo: HistoLog2,
|
|
||||||
logitems: VecDeque<LogItem>,
|
|
||||||
stats_items: VecDeque<StatsItem>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S, ITY> Drop for MergedStream<S, ITY> {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
// TODO collect somewhere
|
|
||||||
debug!(
|
|
||||||
"MergedStream Drop Stats:\nbatch_len_emit_histo: {:?}",
|
|
||||||
self.batch_len_emit_histo
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S, ITY> MergedStream<S, ITY>
|
|
||||||
where
|
|
||||||
S: Stream<Item = Sitemty<ITY>> + Unpin,
|
|
||||||
ITY: Appendable + Unpin,
|
|
||||||
{
|
|
||||||
pub fn new(inps: Vec<S>) -> Self {
|
|
||||||
trace!("MergedStream::new");
|
|
||||||
let n = inps.len();
|
|
||||||
let current = (0..n).into_iter().map(|_| MergedCurVal::None).collect();
|
|
||||||
Self {
|
|
||||||
inps,
|
|
||||||
current: current,
|
|
||||||
ixs: vec![0; n],
|
|
||||||
errored: false,
|
|
||||||
completed: false,
|
|
||||||
batch: None,
|
|
||||||
ts_last_emit: 0,
|
|
||||||
range_complete_observed: vec![false; n],
|
|
||||||
range_complete_observed_all: false,
|
|
||||||
range_complete_observed_all_emitted: false,
|
|
||||||
data_emit_complete: false,
|
|
||||||
batch_size: ByteSize::kb(128),
|
|
||||||
batch_len_emit_histo: HistoLog2::new(0),
|
|
||||||
logitems: VecDeque::new(),
|
|
||||||
stats_items: VecDeque::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn replenish(self: &mut Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
|
|
||||||
use Poll::*;
|
|
||||||
let mut pending = 0;
|
|
||||||
for i1 in 0..self.inps.len() {
|
|
||||||
match self.current[i1] {
|
|
||||||
MergedCurVal::None => {
|
|
||||||
'l1: loop {
|
|
||||||
break match self.inps[i1].poll_next_unpin(cx) {
|
|
||||||
Ready(Some(Ok(k))) => match k {
|
|
||||||
StreamItem::Log(item) => {
|
|
||||||
self.logitems.push_back(item);
|
|
||||||
continue 'l1;
|
|
||||||
}
|
|
||||||
StreamItem::Stats(item) => {
|
|
||||||
self.stats_items.push_back(item);
|
|
||||||
continue 'l1;
|
|
||||||
}
|
|
||||||
StreamItem::DataItem(item) => match item {
|
|
||||||
RangeCompletableItem::RangeComplete => {
|
|
||||||
self.range_complete_observed[i1] = true;
|
|
||||||
let d = self.range_complete_observed.iter().filter(|&&k| k).count();
|
|
||||||
if d == self.range_complete_observed.len() {
|
|
||||||
self.range_complete_observed_all = true;
|
|
||||||
debug!("MergedStream range_complete d {} COMPLETE", d);
|
|
||||||
} else {
|
|
||||||
trace!("MergedStream range_complete d {}", d);
|
|
||||||
}
|
|
||||||
continue 'l1;
|
|
||||||
}
|
|
||||||
RangeCompletableItem::Data(item) => {
|
|
||||||
self.ixs[i1] = 0;
|
|
||||||
self.current[i1] = MergedCurVal::Val(item);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Ready(Some(Err(e))) => {
|
|
||||||
// TODO emit this error, consider this stream as done, anything more to do here?
|
|
||||||
//self.current[i1] = CurVal::Err(e);
|
|
||||||
self.errored = true;
|
|
||||||
return Ready(Err(e));
|
|
||||||
}
|
|
||||||
Ready(None) => {
|
|
||||||
self.current[i1] = MergedCurVal::Finish;
|
|
||||||
}
|
|
||||||
Pending => {
|
|
||||||
pending += 1;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => (),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if pending > 0 {
|
|
||||||
Pending
|
|
||||||
} else {
|
|
||||||
Ready(Ok(()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S, ITY> Stream for MergedStream<S, ITY>
|
|
||||||
where
|
|
||||||
S: Stream<Item = Sitemty<ITY>> + Unpin,
|
|
||||||
ITY: PushableIndex + Appendable + ByteEstimate + WithTimestamps + Unpin,
|
|
||||||
{
|
|
||||||
type Item = Sitemty<ITY>;
|
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
|
||||||
use Poll::*;
|
|
||||||
'outer: loop {
|
|
||||||
break if self.completed {
|
|
||||||
panic!("poll_next on completed");
|
|
||||||
} else if self.errored {
|
|
||||||
self.completed = true;
|
|
||||||
Ready(None)
|
|
||||||
} else if let Some(item) = self.logitems.pop_front() {
|
|
||||||
Ready(Some(Ok(StreamItem::Log(item))))
|
|
||||||
} else if let Some(item) = self.stats_items.pop_front() {
|
|
||||||
Ready(Some(Ok(StreamItem::Stats(item))))
|
|
||||||
} else if self.range_complete_observed_all_emitted {
|
|
||||||
self.completed = true;
|
|
||||||
Ready(None)
|
|
||||||
} else if self.data_emit_complete {
|
|
||||||
if self.range_complete_observed_all {
|
|
||||||
if self.range_complete_observed_all_emitted {
|
|
||||||
self.completed = true;
|
|
||||||
Ready(None)
|
|
||||||
} else {
|
|
||||||
self.range_complete_observed_all_emitted = true;
|
|
||||||
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
self.completed = true;
|
|
||||||
Ready(None)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
match self.replenish(cx) {
|
|
||||||
Ready(Ok(_)) => {
|
|
||||||
let mut lowest_ix = usize::MAX;
|
|
||||||
let mut lowest_ts = u64::MAX;
|
|
||||||
for i1 in 0..self.inps.len() {
|
|
||||||
if let MergedCurVal::Val(val) = &self.current[i1] {
|
|
||||||
let u = self.ixs[i1];
|
|
||||||
if u >= val.len() {
|
|
||||||
self.ixs[i1] = 0;
|
|
||||||
self.current[i1] = MergedCurVal::None;
|
|
||||||
continue 'outer;
|
|
||||||
} else {
|
|
||||||
let ts = val.ts(u);
|
|
||||||
if ts < lowest_ts {
|
|
||||||
lowest_ix = i1;
|
|
||||||
lowest_ts = ts;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if lowest_ix == usize::MAX {
|
|
||||||
if let Some(batch) = self.batch.take() {
|
|
||||||
if batch.len() != 0 {
|
|
||||||
self.batch_len_emit_histo.ingest(batch.len() as u32);
|
|
||||||
self.data_emit_complete = true;
|
|
||||||
if LOG_EMIT_ITEM {
|
|
||||||
let mut aa = vec![];
|
|
||||||
for ii in 0..batch.len() {
|
|
||||||
aa.push(batch.ts(ii));
|
|
||||||
}
|
|
||||||
debug!("MergedBlobsStream A emits {} events tss {:?}", batch.len(), aa);
|
|
||||||
};
|
|
||||||
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(batch)))))
|
|
||||||
} else {
|
|
||||||
self.data_emit_complete = true;
|
|
||||||
continue 'outer;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
self.data_emit_complete = true;
|
|
||||||
continue 'outer;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// TODO unordered cases
|
|
||||||
if lowest_ts < self.ts_last_emit {
|
|
||||||
self.errored = true;
|
|
||||||
let msg = format!(
|
|
||||||
"unordered event at lowest_ts {} ts_last_emit {}",
|
|
||||||
lowest_ts, self.ts_last_emit
|
|
||||||
);
|
|
||||||
return Ready(Some(Err(Error::with_public_msg(msg))));
|
|
||||||
} else {
|
|
||||||
self.ts_last_emit = self.ts_last_emit.max(lowest_ts);
|
|
||||||
}
|
|
||||||
{
|
|
||||||
let batch = self.batch.take();
|
|
||||||
let rix = self.ixs[lowest_ix];
|
|
||||||
match &self.current[lowest_ix] {
|
|
||||||
MergedCurVal::Val(val) => {
|
|
||||||
let mut ldst = batch.unwrap_or_else(|| val.empty_like_self());
|
|
||||||
if false {
|
|
||||||
info!(
|
|
||||||
"Push event rix {} lowest_ix {} lowest_ts {}",
|
|
||||||
rix, lowest_ix, lowest_ts
|
|
||||||
);
|
|
||||||
}
|
|
||||||
ldst.push_index(val, rix);
|
|
||||||
self.batch = Some(ldst);
|
|
||||||
}
|
|
||||||
MergedCurVal::None => panic!(),
|
|
||||||
MergedCurVal::Finish => panic!(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
self.ixs[lowest_ix] += 1;
|
|
||||||
let curlen = match &self.current[lowest_ix] {
|
|
||||||
MergedCurVal::Val(val) => val.len(),
|
|
||||||
MergedCurVal::None => panic!(),
|
|
||||||
MergedCurVal::Finish => panic!(),
|
|
||||||
};
|
|
||||||
if self.ixs[lowest_ix] >= curlen {
|
|
||||||
self.ixs[lowest_ix] = 0;
|
|
||||||
self.current[lowest_ix] = MergedCurVal::None;
|
|
||||||
}
|
|
||||||
let emit_packet_now = if let Some(batch) = &self.batch {
|
|
||||||
if batch.byte_estimate() >= self.batch_size.bytes() as u64 {
|
|
||||||
true
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
};
|
|
||||||
if emit_packet_now {
|
|
||||||
if let Some(batch) = self.batch.take() {
|
|
||||||
trace!("emit item because over threshold len {}", batch.len());
|
|
||||||
self.batch_len_emit_histo.ingest(batch.len() as u32);
|
|
||||||
if LOG_EMIT_ITEM {
|
|
||||||
let mut aa = vec![];
|
|
||||||
for ii in 0..batch.len() {
|
|
||||||
aa.push(batch.ts(ii));
|
|
||||||
}
|
|
||||||
debug!("MergedBlobsStream B emits {} events tss {:?}", batch.len(), aa);
|
|
||||||
};
|
|
||||||
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(batch)))))
|
|
||||||
} else {
|
|
||||||
continue 'outer;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
continue 'outer;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ready(Err(e)) => {
|
|
||||||
self.errored = true;
|
|
||||||
Ready(Some(Err(e)))
|
|
||||||
}
|
|
||||||
Pending => Pending,
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod test {
|
|
||||||
use items_0::Empty;
|
|
||||||
use items_2::channelevents::ChannelEvents;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn merge_channel_events() {
|
|
||||||
let mut evs = items_2::eventsdim0::EventsDim0::empty();
|
|
||||||
evs.push(1, 100, 17u8);
|
|
||||||
evs.push(3, 300, 16);
|
|
||||||
let _cevs = ChannelEvents::Events(Box::new(evs));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -5,6 +5,7 @@ use items::RangeCompletableItem;
|
|||||||
use items::Sitemty;
|
use items::Sitemty;
|
||||||
use items::StatsItem;
|
use items::StatsItem;
|
||||||
use items::StreamItem;
|
use items::StreamItem;
|
||||||
|
use items_2::merger::MergeError;
|
||||||
use items_2::merger::Mergeable;
|
use items_2::merger::Mergeable;
|
||||||
use netpod::log::*;
|
use netpod::log::*;
|
||||||
use netpod::NanoRange;
|
use netpod::NanoRange;
|
||||||
@@ -76,7 +77,15 @@ where
|
|||||||
} else {
|
} else {
|
||||||
self.stats.items_part_prune_high += 1;
|
self.stats.items_part_prune_high += 1;
|
||||||
let mut dummy = item.new_empty();
|
let mut dummy = item.new_empty();
|
||||||
item.drain_into(&mut dummy, (ihlt + 1, n))?;
|
match item.drain_into(&mut dummy, (ihlt + 1, n)) {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(e) => match e {
|
||||||
|
MergeError::NotCompatible => {
|
||||||
|
error!("logic error")
|
||||||
|
}
|
||||||
|
MergeError::Full => error!("full, logic error"),
|
||||||
|
},
|
||||||
|
}
|
||||||
item
|
item
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -102,7 +111,8 @@ where
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
let mut dummy = item.new_empty();
|
let mut dummy = item.new_empty();
|
||||||
item.drain_into(&mut dummy, (0, ilge - 1))?;
|
item.drain_into(&mut dummy, (0, ilge - 1))
|
||||||
|
.map_err(|e| format!("{e:?} unexpected MergeError while remove of items"))?;
|
||||||
self.slot1 = None;
|
self.slot1 = None;
|
||||||
item
|
item
|
||||||
}
|
}
|
||||||
@@ -110,7 +120,8 @@ where
|
|||||||
None => {
|
None => {
|
||||||
let n = item.len();
|
let n = item.len();
|
||||||
let mut keep = item.new_empty();
|
let mut keep = item.new_empty();
|
||||||
item.drain_into(&mut keep, (n.max(1) - 1, n))?;
|
item.drain_into(&mut keep, (n.max(1) - 1, n))
|
||||||
|
.map_err(|e| format!("{e:?} unexpected MergeError while remove of items"))?;
|
||||||
self.slot1 = Some(keep);
|
self.slot1 = Some(keep);
|
||||||
item.new_empty()
|
item.new_empty()
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user