Seems to work on remote
This commit is contained in:
+24
-23
@@ -53,6 +53,7 @@ where
|
|||||||
aggtor: Option<I::Aggregator>,
|
aggtor: Option<I::Aggregator>,
|
||||||
spec: BinnedRange,
|
spec: BinnedRange,
|
||||||
curbin: u32,
|
curbin: u32,
|
||||||
|
data_completed: bool,
|
||||||
range_complete: bool,
|
range_complete: bool,
|
||||||
range_complete_emitted: bool,
|
range_complete_emitted: bool,
|
||||||
left: Option<Poll<Option<Result<I, Error>>>>,
|
left: Option<Poll<Option<Result<I, Error>>>>,
|
||||||
@@ -74,6 +75,7 @@ where
|
|||||||
aggtor: Some(I::aggregator_new_static(range.beg, range.end)),
|
aggtor: Some(I::aggregator_new_static(range.beg, range.end)),
|
||||||
spec,
|
spec,
|
||||||
curbin: 0,
|
curbin: 0,
|
||||||
|
data_completed: false,
|
||||||
range_complete: false,
|
range_complete: false,
|
||||||
range_complete_emitted: false,
|
range_complete_emitted: false,
|
||||||
left: None,
|
left: None,
|
||||||
@@ -105,30 +107,30 @@ where
|
|||||||
'outer: loop {
|
'outer: loop {
|
||||||
if let Some(item) = self.tmp_agg_results.pop_front() {
|
if let Some(item) = self.tmp_agg_results.pop_front() {
|
||||||
return Ready(Some(Ok(item)));
|
return Ready(Some(Ok(item)));
|
||||||
|
} else if self.data_completed {
|
||||||
|
if self.range_complete {
|
||||||
|
if self.range_complete_emitted {
|
||||||
|
self.completed = true;
|
||||||
|
return Ready(None);
|
||||||
|
} else {
|
||||||
|
self.range_complete_emitted = true;
|
||||||
|
warn!("IntoBinnedTDefaultStream /////////////////////////////////// emit now RangeComplete");
|
||||||
|
// TODO why can't I declare that type?
|
||||||
|
//type TT = <I::Aggregator as AggregatorTdim>::OutputValue;
|
||||||
|
if let Some(item) = <I::Aggregator as AggregatorTdim>::OutputValue::make_range_complete_item() {
|
||||||
|
return Ready(Some(Ok(item)));
|
||||||
|
} else {
|
||||||
|
warn!("IntoBinnedTDefaultStream should emit RangeComplete but it doesn't have one");
|
||||||
|
self.completed = true;
|
||||||
|
return Ready(None);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
let cur = if let Some(k) = self.left.take() {
|
let cur = if let Some(k) = self.left.take() {
|
||||||
k
|
k
|
||||||
} else if self.inp_completed {
|
} else if self.inp_completed {
|
||||||
if self.range_complete {
|
Ready(None)
|
||||||
if self.range_complete_emitted {
|
|
||||||
self.completed = true;
|
|
||||||
Ready(None)
|
|
||||||
} else {
|
|
||||||
self.range_complete_emitted = true;
|
|
||||||
// TODO why can't I declare that type?
|
|
||||||
//type TT = <I::Aggregator as AggregatorTdim>::OutputValue;
|
|
||||||
if let Some(k) = <I::Aggregator as AggregatorTdim>::OutputValue::make_range_complete_item() {
|
|
||||||
return Ready(Some(Ok(k)));
|
|
||||||
} else {
|
|
||||||
warn!("IntoBinnedTDefaultStream should emit RangeComplete but I doesn't have one");
|
|
||||||
self.completed = true;
|
|
||||||
Ready(None)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
self.completed = true;
|
|
||||||
Ready(None)
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll");
|
let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll");
|
||||||
inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx))
|
inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx))
|
||||||
@@ -190,15 +192,14 @@ where
|
|||||||
self.inp_completed = true;
|
self.inp_completed = true;
|
||||||
if self.curbin as u64 >= self.spec.count {
|
if self.curbin as u64 >= self.spec.count {
|
||||||
warn!("IntoBinnedTDefaultStream curbin out of spec, END");
|
warn!("IntoBinnedTDefaultStream curbin out of spec, END");
|
||||||
self.completed = true;
|
self.data_completed = true;
|
||||||
Ready(None)
|
continue 'outer;
|
||||||
} else {
|
} else {
|
||||||
self.curbin += 1;
|
self.curbin += 1;
|
||||||
let range = self.spec.get_range(self.curbin);
|
let range = self.spec.get_range(self.curbin);
|
||||||
match self.aggtor.replace(I::aggregator_new_static(range.beg, range.end)) {
|
match self.aggtor.replace(I::aggregator_new_static(range.beg, range.end)) {
|
||||||
Some(ag) => {
|
Some(ag) => {
|
||||||
let ret = ag.result();
|
let ret = ag.result();
|
||||||
//Ready(Some(Ok(ag.result())))
|
|
||||||
self.tmp_agg_results = ret.into();
|
self.tmp_agg_results = ret.into();
|
||||||
continue 'outer;
|
continue 'outer;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -61,7 +61,10 @@ impl BinnedStream {
|
|||||||
_ => None,
|
_ => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(PreBinnedItem::RangeComplete) => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::RangeComplete)),
|
Ok(PreBinnedItem::RangeComplete) => {
|
||||||
|
info!("=================== BINNED STREAM OBSERVES RangeComplete ====================");
|
||||||
|
Some(Ok(MinMaxAvgScalarBinBatchStreamItem::RangeComplete))
|
||||||
|
}
|
||||||
Ok(PreBinnedItem::EventDataReadStats(stats)) => {
|
Ok(PreBinnedItem::EventDataReadStats(stats)) => {
|
||||||
Some(Ok(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats)))
|
Some(Ok(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats)))
|
||||||
}
|
}
|
||||||
@@ -73,7 +76,6 @@ impl BinnedStream {
|
|||||||
ready(g)
|
ready(g)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
//.map(|k| k)
|
|
||||||
.into_binned_t(range);
|
.into_binned_t(range);
|
||||||
Self { inp: Box::pin(inp) }
|
Self { inp: Box::pin(inp) }
|
||||||
}
|
}
|
||||||
|
|||||||
Vendored
+2
-5
@@ -161,7 +161,6 @@ impl Stream for PreBinnedValueStream {
|
|||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
use Poll::*;
|
use Poll::*;
|
||||||
info!("PreBinnedValueStream poll_next ENTER");
|
|
||||||
if self.completed {
|
if self.completed {
|
||||||
panic!("PreBinnedValueStream poll_next on completed");
|
panic!("PreBinnedValueStream poll_next on completed");
|
||||||
}
|
}
|
||||||
@@ -169,7 +168,7 @@ impl Stream for PreBinnedValueStream {
|
|||||||
self.completed = true;
|
self.completed = true;
|
||||||
return Ready(None);
|
return Ready(None);
|
||||||
}
|
}
|
||||||
let u = 'outer: loop {
|
'outer: loop {
|
||||||
break if let Some(fut) = self.fut2.as_mut() {
|
break if let Some(fut) = self.fut2.as_mut() {
|
||||||
match fut.poll_next_unpin(cx) {
|
match fut.poll_next_unpin(cx) {
|
||||||
Ready(Some(k)) => match k {
|
Ready(Some(k)) => match k {
|
||||||
@@ -218,8 +217,6 @@ impl Stream for PreBinnedValueStream {
|
|||||||
self.open_check_local_file = Some(Box::pin(fut));
|
self.open_check_local_file = Some(Box::pin(fut));
|
||||||
continue 'outer;
|
continue 'outer;
|
||||||
};
|
};
|
||||||
};
|
}
|
||||||
info!("PBV EXIT WITH {:?}", u);
|
|
||||||
u
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -127,8 +127,6 @@ impl EventChunker {
|
|||||||
let pulse = sl.read_i64::<BE>().unwrap() as u64;
|
let pulse = sl.read_i64::<BE>().unwrap() as u64;
|
||||||
if ts >= self.range.end {
|
if ts >= self.range.end {
|
||||||
self.seen_beyond_range = true;
|
self.seen_beyond_range = true;
|
||||||
// TODO ret.end_of_range_observed = true;
|
|
||||||
info!("END OF RANGE OBSERVED");
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if ts < self.range.beg {
|
if ts < self.range.beg {
|
||||||
|
|||||||
+15
-31
@@ -213,9 +213,9 @@ where
|
|||||||
let d = self.range_complete_observed.iter().filter(|&&k| k).count();
|
let d = self.range_complete_observed.iter().filter(|&&k| k).count();
|
||||||
if d == self.range_complete_observed.len() {
|
if d == self.range_complete_observed.len() {
|
||||||
self.range_complete_observed_all = true;
|
self.range_complete_observed_all = true;
|
||||||
info!("\n\n:::::: range_complete d {} COMPLETE", d);
|
info!("MergedMinMaxAvgScalarStream range_complete d {} COMPLETE", d);
|
||||||
} else {
|
} else {
|
||||||
info!("\n\n:::::: range_complete d {}", d);
|
trace!("MergedMinMaxAvgScalarStream range_complete d {}", d);
|
||||||
}
|
}
|
||||||
continue 'l1;
|
continue 'l1;
|
||||||
}
|
}
|
||||||
@@ -258,7 +258,6 @@ where
|
|||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
use Poll::*;
|
use Poll::*;
|
||||||
info!("Merger poll ENTER");
|
|
||||||
if self.completed {
|
if self.completed {
|
||||||
panic!("MergedMinMaxAvgScalarStream poll_next on completed");
|
panic!("MergedMinMaxAvgScalarStream poll_next on completed");
|
||||||
}
|
}
|
||||||
@@ -266,20 +265,14 @@ where
|
|||||||
self.completed = true;
|
self.completed = true;
|
||||||
return Ready(None);
|
return Ready(None);
|
||||||
}
|
}
|
||||||
let u = 'outer: loop {
|
'outer: loop {
|
||||||
break if self.data_emit_complete {
|
break if self.data_emit_complete {
|
||||||
error!("MERGER NOTE data_emit_complete");
|
|
||||||
if self.range_complete_observed_all {
|
if self.range_complete_observed_all {
|
||||||
error!("MERGER NOTE range_complete_observed_all");
|
|
||||||
if self.range_complete_observed_all_emitted {
|
if self.range_complete_observed_all_emitted {
|
||||||
error!("MERGER NOTE range_complete_observed_all_emitted");
|
|
||||||
// NOTE everything else (data and stats) must be emitted before data_emit_complete gets set.
|
|
||||||
self.completed = true;
|
self.completed = true;
|
||||||
Ready(None)
|
Ready(None)
|
||||||
} else {
|
} else {
|
||||||
error!("MERGER NOTE range_complete_observed_all EMIT NOW");
|
|
||||||
self.range_complete_observed_all_emitted = true;
|
self.range_complete_observed_all_emitted = true;
|
||||||
// NOTE this is supposed to return
|
|
||||||
Ready(Some(Ok(MinMaxAvgScalarEventBatchStreamItem::RangeComplete)))
|
Ready(Some(Ok(MinMaxAvgScalarEventBatchStreamItem::RangeComplete)))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@@ -293,39 +286,32 @@ where
|
|||||||
let mut lowest_ix = usize::MAX;
|
let mut lowest_ix = usize::MAX;
|
||||||
let mut lowest_ts = u64::MAX;
|
let mut lowest_ts = u64::MAX;
|
||||||
for i1 in 0..self.inps.len() {
|
for i1 in 0..self.inps.len() {
|
||||||
match &self.current[i1] {
|
if let MergedMinMaxAvgScalarStreamCurVal::Val(val) = &self.current[i1] {
|
||||||
MergedMinMaxAvgScalarStreamCurVal::Finish => {}
|
let u = self.ixs[i1];
|
||||||
MergedMinMaxAvgScalarStreamCurVal::Val(val) => {
|
if u >= val.tss.len() {
|
||||||
let u = self.ixs[i1];
|
self.ixs[i1] = 0;
|
||||||
if u >= val.tss.len() {
|
self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::None;
|
||||||
self.ixs[i1] = 0;
|
continue 'outer;
|
||||||
self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::None;
|
} else {
|
||||||
continue 'outer;
|
let ts = val.tss[u];
|
||||||
} else {
|
if ts < lowest_ts {
|
||||||
let ts = val.tss[u];
|
lowest_ix = i1;
|
||||||
if ts < lowest_ts {
|
lowest_ts = ts;
|
||||||
lowest_ix = i1;
|
|
||||||
lowest_ts = ts;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ => panic!(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if lowest_ix == usize::MAX {
|
if lowest_ix == usize::MAX {
|
||||||
if self.batch.tss.len() != 0 {
|
if self.batch.tss.len() != 0 {
|
||||||
let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
|
let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
|
||||||
info!("MergedMinMaxAvgScalarStream no more lowest emit Ready(Some( current batch ))");
|
|
||||||
let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k);
|
let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k);
|
||||||
self.data_emit_complete = true;
|
self.data_emit_complete = true;
|
||||||
Ready(Some(Ok(ret)))
|
Ready(Some(Ok(ret)))
|
||||||
} else {
|
} else {
|
||||||
info!("MergedMinMaxAvgScalarStream no more lowest emit Ready(None)");
|
|
||||||
self.data_emit_complete = true;
|
self.data_emit_complete = true;
|
||||||
continue 'outer;
|
continue 'outer;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
//trace!("decided on next lowest ts {} ix {}", lowest_ts, lowest_ix);
|
|
||||||
assert!(lowest_ts >= self.ts_last_emit);
|
assert!(lowest_ts >= self.ts_last_emit);
|
||||||
self.ts_last_emit = lowest_ts;
|
self.ts_last_emit = lowest_ts;
|
||||||
self.batch.tss.push(lowest_ts);
|
self.batch.tss.push(lowest_ts);
|
||||||
@@ -360,9 +346,7 @@ where
|
|||||||
Pending => Pending,
|
Pending => Pending,
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
};
|
}
|
||||||
info!("Merger poll DONE");
|
|
||||||
u
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user