Write cache
This commit is contained in:
73
disk/src/cache/pbv.rs
vendored
73
disk/src/cache/pbv.rs
vendored
@@ -1,13 +1,14 @@
|
||||
use crate::agg::binnedt::IntoBinnedT;
|
||||
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem;
|
||||
use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem};
|
||||
use crate::cache::pbvfs::{PreBinnedItem, PreBinnedValueFetchedStream};
|
||||
use crate::cache::{node_ix_for_patch, MergedFromRemotes, PreBinnedQuery};
|
||||
use crate::cache::{MergedFromRemotes, PreBinnedQuery};
|
||||
use crate::frame::makeframe::make_frame;
|
||||
use crate::raw::EventsQuery;
|
||||
use crate::streamlog::Streamlog;
|
||||
use bytes::Bytes;
|
||||
use err::Error;
|
||||
use futures_core::Stream;
|
||||
use futures_util::pin_mut;
|
||||
use futures_util::{FutureExt, StreamExt};
|
||||
use netpod::log::*;
|
||||
use netpod::streamext::SCC;
|
||||
@@ -58,12 +59,12 @@ pub struct PreBinnedValueStream {
|
||||
errored: bool,
|
||||
completed: bool,
|
||||
streamlog: Streamlog,
|
||||
values: MinMaxAvgScalarBinBatch,
|
||||
write_fut: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
|
||||
}
|
||||
|
||||
impl PreBinnedValueStream {
|
||||
pub fn new(query: PreBinnedQuery, node_config: &NodeConfigCached) -> Self {
|
||||
// TODO check that we are the correct node.
|
||||
let _node_ix = node_ix_for_patch(&query.patch, &query.channel, &node_config.node_config.cluster);
|
||||
Self {
|
||||
query,
|
||||
node_config: node_config.clone(),
|
||||
@@ -74,7 +75,9 @@ impl PreBinnedValueStream {
|
||||
range_complete_emitted: false,
|
||||
errored: false,
|
||||
completed: false,
|
||||
streamlog: Streamlog::new(),
|
||||
streamlog: Streamlog::new(node_config.ix as u32),
|
||||
values: MinMaxAvgScalarBinBatch::empty(),
|
||||
write_fut: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -182,32 +185,49 @@ impl Stream for PreBinnedValueStream {
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
if self.completed {
|
||||
panic!("PreBinnedValueStream poll_next on completed");
|
||||
}
|
||||
if self.errored {
|
||||
self.completed = true;
|
||||
return Ready(None);
|
||||
}
|
||||
if let Some(item) = self.streamlog.pop() {
|
||||
return Ready(Some(Ok(PreBinnedItem::Log(item))));
|
||||
}
|
||||
'outer: loop {
|
||||
break if self.data_complete {
|
||||
break if self.completed {
|
||||
panic!("PreBinnedValueStream poll_next on completed");
|
||||
} else if self.errored {
|
||||
self.completed = true;
|
||||
Ready(None)
|
||||
} else if let Some(item) = self.streamlog.pop() {
|
||||
Ready(Some(Ok(PreBinnedItem::Log(item))))
|
||||
} else if let Some(fut) = &mut self.write_fut {
|
||||
pin_mut!(fut);
|
||||
match fut.poll(cx) {
|
||||
Ready(Ok(())) => {
|
||||
self.write_fut = None;
|
||||
self.streamlog.append(Level::INFO, format!("cache file written"));
|
||||
continue 'outer;
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
self.errored = true;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
} else if self.data_complete {
|
||||
if self.range_complete_observed {
|
||||
if self.range_complete_emitted {
|
||||
self.completed = true;
|
||||
Ready(None)
|
||||
} else {
|
||||
let msg = format!(
|
||||
"======== STREAMLOG ========= WRITE CACHE FILE\n{:?}\n\n\n",
|
||||
self.query.patch
|
||||
"Write cache file\n{:?}\nN: {}\n\n\n",
|
||||
self.query.patch,
|
||||
self.values.ts1s.len()
|
||||
);
|
||||
self.streamlog.append(Level::INFO, msg);
|
||||
info!(
|
||||
"======================== WRITE CACHE FILE\n{:?}\n\n\n",
|
||||
self.query.patch
|
||||
let values = std::mem::replace(&mut self.values, MinMaxAvgScalarBinBatch::empty());
|
||||
let fut = super::write_pb_cache_min_max_avg_scalar(
|
||||
values,
|
||||
self.query.patch.clone(),
|
||||
self.query.agg_kind.clone(),
|
||||
self.query.channel.clone(),
|
||||
self.node_config.clone(),
|
||||
);
|
||||
self.write_fut = Some(Box::pin(fut));
|
||||
self.range_complete_emitted = true;
|
||||
Ready(Some(Ok(PreBinnedItem::RangeComplete)))
|
||||
}
|
||||
@@ -220,10 +240,17 @@ impl Stream for PreBinnedValueStream {
|
||||
Ready(Some(k)) => match k {
|
||||
Ok(PreBinnedItem::RangeComplete) => {
|
||||
self.range_complete_observed = true;
|
||||
//Ready(Some(Ok(PreBinnedItem::RangeComplete)))
|
||||
continue 'outer;
|
||||
}
|
||||
Ok(PreBinnedItem::Batch(batch)) => Ready(Some(Ok(PreBinnedItem::Batch(batch)))),
|
||||
Ok(PreBinnedItem::Batch(batch)) => {
|
||||
self.values.ts1s.extend(batch.ts1s.iter());
|
||||
self.values.ts2s.extend(batch.ts2s.iter());
|
||||
self.values.counts.extend(batch.counts.iter());
|
||||
self.values.mins.extend(batch.mins.iter());
|
||||
self.values.maxs.extend(batch.maxs.iter());
|
||||
self.values.avgs.extend(batch.avgs.iter());
|
||||
Ready(Some(Ok(PreBinnedItem::Batch(batch))))
|
||||
}
|
||||
Ok(PreBinnedItem::EventDataReadStats(stats)) => {
|
||||
Ready(Some(Ok(PreBinnedItem::EventDataReadStats(stats))))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user