diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs
index 7dcde8f..9a37137 100644
--- a/disk/src/agg/binnedt.rs
+++ b/disk/src/agg/binnedt.rs
@@ -1,4 +1,5 @@
 use crate::agg::AggregatableXdim1Bin;
+use crate::streamlog::LogItem;
 use err::Error;
 use futures_core::Stream;
 use futures_util::StreamExt;
@@ -24,6 +25,9 @@ pub trait AggregatableTdim: Sized {
     fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator;
     fn is_range_complete(&self) -> bool;
     fn make_range_complete_item() -> Option<Self>;
+    fn is_log_item(&self) -> bool;
+    fn log_item(self) -> Option<LogItem>;
+    fn make_log_item(item: LogItem) -> Option<Self>;
 }
 
 pub trait IntoBinnedT {
@@ -140,6 +144,19 @@ where
                     if k.is_range_complete() {
                         self.range_complete = true;
                         continue 'outer;
+                    } else if k.is_log_item() {
+                        if let Some(item) = k.log_item() {
+                            if let Some(item) =
+                                <I::Aggregator as AggregatorTdim>::OutputValue::make_log_item(item.clone())
+                            {
+                                Ready(Some(Ok(item)))
+                            } else {
+                                warn!("IntoBinnedTDefaultStream can not create log item");
+                                continue 'outer;
+                            }
+                        } else {
+                            panic!()
+                        }
                     } else {
                         let ag = self.aggtor.as_mut().unwrap();
                         if ag.ends_before(&k) {
diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs
index 5c0dfe9..7ddfdc2 100644
--- a/disk/src/agg/eventbatch.rs
+++ b/disk/src/agg/eventbatch.rs
@@ -1,6 +1,7 @@
 use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
 use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem};
 use crate::agg::AggregatableXdim1Bin;
+use crate::streamlog::LogItem;
 use bytes::{BufMut, Bytes, BytesMut};
 use netpod::log::*;
 use netpod::timeunits::SEC;
@@ -120,6 +121,18 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatch {
     fn make_range_complete_item() -> Option<Self> {
         None
     }
+
+    fn is_log_item(&self) -> bool {
+        false
+    }
+
+    fn log_item(self) -> Option<LogItem> {
+        None
+    }
+
+    fn make_log_item(_item: LogItem) -> Option<Self> {
+        None
+    }
 }
 
 impl MinMaxAvgScalarEventBatch {
@@ -266,6 +279,7 @@ pub enum MinMaxAvgScalarEventBatchStreamItem {
     Values(MinMaxAvgScalarEventBatch),
     RangeComplete,
     EventDataReadStats(EventDataReadStats),
+    Log(LogItem),
 }
 
 impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatchStreamItem {
@@ -296,6 +310,26 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatchStreamItem {
     fn make_range_complete_item() -> Option<Self> {
         Some(MinMaxAvgScalarEventBatchStreamItem::RangeComplete)
     }
+
+    fn is_log_item(&self) -> bool {
+        if let MinMaxAvgScalarEventBatchStreamItem::Log(_) = self {
+            true
+        } else {
+            false
+        }
+    }
+
+    fn log_item(self) -> Option<LogItem> {
+        if let MinMaxAvgScalarEventBatchStreamItem::Log(item) = self {
+            Some(item)
+        } else {
+            None
+        }
+    }
+
+    fn make_log_item(item: LogItem) -> Option<Self> {
+        Some(MinMaxAvgScalarEventBatchStreamItem::Log(item))
+    }
 }
 
 pub struct MinMaxAvgScalarEventBatchStreamItemAggregator {
@@ -343,6 +377,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchStreamItemAggregator {
             MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ingest(vals),
             MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats) => self.event_data_read_stats.trans(stats),
             MinMaxAvgScalarEventBatchStreamItem::RangeComplete => (),
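+            // A Log item carries no sample data, so there is nothing to fold
+            // into the aggregate; the stream layers forward it to the client.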
+            MinMaxAvgScalarEventBatchStreamItem::Log(_) => (),
         }
     }
 
diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs
index ef4b504..51669b7 100644
--- a/disk/src/agg/scalarbinbatch.rs
+++ b/disk/src/agg/scalarbinbatch.rs
@@ -1,5 +1,6 @@
 use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
 use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside};
+use crate::streamlog::LogItem;
 use bytes::{BufMut, Bytes, BytesMut};
 use netpod::log::*;
 use netpod::timeunits::SEC;
@@ -201,6 +202,18 @@ impl AggregatableTdim for MinMaxAvgScalarBinBatch {
     fn make_range_complete_item() -> Option<Self> {
         None
     }
+
+    fn is_log_item(&self) -> bool {
+        false
+    }
+
+    fn log_item(self) -> Option<LogItem> {
+        None
+    }
+
+    fn make_log_item(_item: LogItem) -> Option<Self> {
+        None
+    }
 }
 
 pub struct MinMaxAvgScalarBinBatchAggregator {
@@ -295,6 +308,7 @@ pub enum MinMaxAvgScalarBinBatchStreamItem {
     Values(MinMaxAvgScalarBinBatch),
     RangeComplete,
     EventDataReadStats(EventDataReadStats),
+    Log(LogItem),
 }
 
 impl AggregatableTdim for MinMaxAvgScalarBinBatchStreamItem {
@@ -316,6 +330,26 @@ impl AggregatableTdim for MinMaxAvgScalarBinBatchStreamItem {
     fn make_range_complete_item() -> Option<Self> {
         Some(MinMaxAvgScalarBinBatchStreamItem::RangeComplete)
     }
+
+    fn is_log_item(&self) -> bool {
+        if let MinMaxAvgScalarBinBatchStreamItem::Log(_) = self {
+            true
+        } else {
+            false
+        }
+    }
+
+    fn log_item(self) -> Option<LogItem> {
+        if let MinMaxAvgScalarBinBatchStreamItem::Log(item) = self {
+            Some(item)
+        } else {
+            None
+        }
+    }
+
+    fn make_log_item(item: LogItem) -> Option<Self> {
+        Some(MinMaxAvgScalarBinBatchStreamItem::Log(item))
+    }
 }
 
 impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatchStreamItem {
@@ -371,6 +405,7 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchStreamItemAggregator {
             MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ingest(vals),
             MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats) => self.event_data_read_stats.trans(stats),
             MinMaxAvgScalarBinBatchStreamItem::RangeComplete => (),
+            MinMaxAvgScalarBinBatchStreamItem::Log(_) => (),
         }
     }
 
diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs
index 0a81ed4..a006862 100644
--- a/disk/src/binnedstream.rs
+++ b/disk/src/binnedstream.rs
@@ -68,6 +68,7 @@ impl BinnedStream {
                 Ok(PreBinnedItem::EventDataReadStats(stats)) => {
                     Some(Ok(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats)))
                 }
+                Ok(PreBinnedItem::Log(item)) => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::Log(item))),
                 Err(e) => {
                     error!("observe error in stream {:?}", e);
                     Some(Err(e))
diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs
index bb81f33..d9cab9d 100644
--- a/disk/src/cache/pbv.rs
+++ b/disk/src/cache/pbv.rs
@@ -4,6 +4,7 @@ use crate::cache::pbvfs::{PreBinnedItem, PreBinnedValueFetchedStream};
 use crate::cache::{node_ix_for_patch, MergedFromRemotes, PreBinnedQuery};
 use crate::frame::makeframe::make_frame;
 use crate::raw::EventsQuery;
+use crate::streamlog::Streamlog;
 use bytes::Bytes;
 use err::Error;
 use futures_core::Stream;
@@ -51,8 +52,12 @@ pub struct PreBinnedValueStream {
     node_config: NodeConfigCached,
     open_check_local_file: Option<Pin<Box<dyn Future<Output = Result<tokio::fs::File, std::io::Error>> + Send>>>,
     fut2: Option<Pin<Box<dyn Stream<Item = Result<PreBinnedItem, Error>> + Send>>>,
+    data_complete: bool,
+    range_complete_observed: bool,
+    range_complete_emitted: bool,
     errored: bool,
     completed: bool,
+    streamlog: Streamlog,
 }
 
 impl PreBinnedValueStream {
@@ -64,92 +69,108 @@ impl PreBinnedValueStream {
             node_config: node_config.clone(),
             open_check_local_file: None,
             fut2: None,
+            data_complete: false,
+            range_complete_observed: false,
+            range_complete_emitted: false,
             errored: false,
             completed: false,
+            streamlog: Streamlog::new(),
         }
     }
 
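+    // Fallback path: merge raw events from every cluster node and T-bin them
+    // locally when no finer pre-binned resolution is available.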
inconsistency {} {}", + self.query.patch.patch_t_len(), + self.query.patch.bin_t_len() + ); + return; + } + // TODO do I need to set up more transformations or binning to deliver the requested data? + let count = self.query.patch.patch_t_len() / self.query.patch.bin_t_len(); + let range = BinnedRange::covering_range(evq.range.clone(), count).unwrap(); + let s1 = MergedFromRemotes::new(evq, self.node_config.node_config.cluster.clone()); + let s2 = s1.into_binned_t(range); + let s2 = s2.map(|k| { + use MinMaxAvgScalarBinBatchStreamItem::*; + match k { + Ok(Values(k)) => Ok(PreBinnedItem::Batch(k)), + Ok(RangeComplete) => Ok(PreBinnedItem::RangeComplete), + Ok(EventDataReadStats(stats)) => Ok(PreBinnedItem::EventDataReadStats(stats)), + Ok(Log(item)) => Ok(PreBinnedItem::Log(item)), + Err(e) => Err(e), + } + }); + self.fut2 = Some(Box::pin(s2)); + } + + fn setup_from_higher_res_prebinned(&mut self, range: PreBinnedPatchRange) { + let g = self.query.patch.bin_t_len(); + let h = range.grid_spec.bin_t_len(); + info!( + "try_setup_fetch_prebinned_higher_res found g {} h {} ratio {} mod {} {:?}", + g, + h, + g / h, + g % h, + range, + ); + if g / h <= 1 { + error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h); + return; + } + if g / h > 200 { + error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h); + return; + } + if g % h != 0 { + error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h); + return; + } + let node_config = self.node_config.clone(); + let patch_it = PreBinnedPatchIterator::from_range(range); + let s = futures_util::stream::iter(patch_it) + .map({ + let q2 = self.query.clone(); + move |patch| { + let query = PreBinnedQuery { + patch, + channel: q2.channel.clone(), + agg_kind: q2.agg_kind.clone(), + cache_usage: q2.cache_usage.clone(), + }; + PreBinnedValueFetchedStream::new(&query, &node_config) + } + }) + .filter_map(|k| match k { + Ok(k) => ready(Some(k)), + Err(e) => { + // TODO Reconsider error handling here: + error!("{:?}", e); + ready(None) + } + }) + .flatten(); + self.fut2 = Some(Box::pin(s)); + } + fn try_setup_fetch_prebinned_higher_res(&mut self) { info!("try_setup_fetch_prebinned_higher_res for {:?}", self.query.patch); - let g = self.query.patch.bin_t_len(); let range = self.query.patch.patch_range(); match PreBinnedPatchRange::covering_range(range, self.query.patch.bin_count() + 1) { Some(range) => { - let h = range.grid_spec.bin_t_len(); - info!( - "try_setup_fetch_prebinned_higher_res found g {} h {} ratio {} mod {} {:?}", - g, - h, - g / h, - g % h, - range, - ); - if g / h <= 1 { - error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h); - return; - } - if g / h > 200 { - error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h); - return; - } - if g % h != 0 { - error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h); - return; - } - let node_config = self.node_config.clone(); - let patch_it = PreBinnedPatchIterator::from_range(range); - let s = futures_util::stream::iter(patch_it) - .map({ - let q2 = self.query.clone(); - move |patch| { - let query = PreBinnedQuery { - patch, - channel: q2.channel.clone(), - agg_kind: q2.agg_kind.clone(), - cache_usage: q2.cache_usage.clone(), - }; - PreBinnedValueFetchedStream::new(&query, &node_config) - } - }) - .filter_map(|k| match k { - Ok(k) => ready(Some(k)), - Err(e) => { - // TODO Reconsider error handling here: - error!("{:?}", e); - ready(None) - } - }) - .flatten(); - self.fut2 = Some(Box::pin(s)); + self.setup_from_higher_res_prebinned(range); } None => { - warn!("no 
better resolution found for g {}", g); - let evq = EventsQuery { - channel: self.query.channel.clone(), - range: self.query.patch.patch_range(), - agg_kind: self.query.agg_kind.clone(), - }; - if self.query.patch.patch_t_len() % self.query.patch.bin_t_len() != 0 { - error!( - "Patch length inconsistency {} {}", - self.query.patch.patch_t_len(), - self.query.patch.bin_t_len() - ); - return; - } - error!("try_setup_fetch_prebinned_higher_res apply all requested transformations and T-binning"); - let count = self.query.patch.patch_t_len() / self.query.patch.bin_t_len(); - let range = BinnedRange::covering_range(evq.range.clone(), count).unwrap(); - let s1 = MergedFromRemotes::new(evq, self.node_config.node_config.cluster.clone()); - let s2 = s1.into_binned_t(range).map(|k| match k { - Ok(MinMaxAvgScalarBinBatchStreamItem::Values(k)) => Ok(PreBinnedItem::Batch(k)), - Ok(MinMaxAvgScalarBinBatchStreamItem::RangeComplete) => Ok(PreBinnedItem::RangeComplete), - Ok(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats)) => { - Ok(PreBinnedItem::EventDataReadStats(stats)) - } - Err(e) => Err(e), - }); - self.fut2 = Some(Box::pin(s2)); + self.setup_merged_from_remotes(); } } } @@ -168,17 +189,54 @@ impl Stream for PreBinnedValueStream { self.completed = true; return Ready(None); } + if let Some(item) = self.streamlog.pop() { + return Ready(Some(Ok(PreBinnedItem::Log(item)))); + } 'outer: loop { - break if let Some(fut) = self.fut2.as_mut() { + break if self.data_complete { + if self.range_complete_observed { + if self.range_complete_emitted { + self.completed = true; + Ready(None) + } else { + let msg = format!( + "======== STREAMLOG ========= WRITE CACHE FILE\n{:?}\n\n\n", + self.query.patch + ); + self.streamlog.append(Level::INFO, msg); + info!( + "======================== WRITE CACHE FILE\n{:?}\n\n\n", + self.query.patch + ); + self.range_complete_emitted = true; + Ready(Some(Ok(PreBinnedItem::RangeComplete))) + } + } else { + self.completed = true; + Ready(None) + } + } else if let Some(fut) = self.fut2.as_mut() { match fut.poll_next_unpin(cx) { Ready(Some(k)) => match k { - Ok(k) => Ready(Some(Ok(k))), + Ok(PreBinnedItem::RangeComplete) => { + self.range_complete_observed = true; + //Ready(Some(Ok(PreBinnedItem::RangeComplete))) + continue 'outer; + } + Ok(PreBinnedItem::Batch(batch)) => Ready(Some(Ok(PreBinnedItem::Batch(batch)))), + Ok(PreBinnedItem::EventDataReadStats(stats)) => { + Ready(Some(Ok(PreBinnedItem::EventDataReadStats(stats)))) + } + Ok(PreBinnedItem::Log(item)) => Ready(Some(Ok(PreBinnedItem::Log(item)))), Err(e) => { self.errored = true; Ready(Some(Err(e))) } }, - Ready(None) => Ready(None), + Ready(None) => { + self.data_complete = true; + continue 'outer; + } Pending => Pending, } } else if let Some(fut) = self.open_check_local_file.as_mut() { diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs index 1cd91bc..bf08d8d 100644 --- a/disk/src/cache/pbvfs.rs +++ b/disk/src/cache/pbvfs.rs @@ -2,6 +2,7 @@ use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead, PreBinnedQuery}; use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::makeframe::decode_frame; +use crate::streamlog::LogItem; use err::Error; use futures_core::Stream; use futures_util::{pin_mut, FutureExt}; @@ -50,6 +51,7 @@ pub enum PreBinnedItem { RangeComplete, EventDataReadStats(EventDataReadStats), //ValuesExtractStats(ValuesExtractStats), + Log(LogItem), } impl Stream for PreBinnedValueFetchedStream { diff --git 
+    Log(LogItem),
 }
 
 impl Stream for PreBinnedValueFetchedStream {
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index 96a9e2e..46d930b 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -31,6 +31,7 @@ pub mod index;
 pub mod merge;
 pub mod paths;
 pub mod raw;
+pub mod streamlog;
 
 pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: Node) -> Result<netpod::BodyStream, Error> {
     let path = paths::datapath(query.timebin as u64, &query.channel_config, &node);
diff --git a/disk/src/merge.rs b/disk/src/merge.rs
index 02e3056..ef4ac5f 100644
--- a/disk/src/merge.rs
+++ b/disk/src/merge.rs
@@ -1,9 +1,11 @@
 use crate::agg::eventbatch::{MinMaxAvgScalarEventBatch, MinMaxAvgScalarEventBatchStreamItem};
 use crate::agg::{Dim1F32Stream, Dim1F32StreamItem, ValuesDim1};
 use crate::eventchunker::EventChunkerItem;
+use crate::streamlog::LogItem;
 use err::Error;
 use futures_core::Stream;
 use futures_util::StreamExt;
+use std::collections::VecDeque;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 #[allow(unused_imports)]
@@ -163,6 +165,7 @@ where
     range_complete_observed_all_emitted: bool,
     data_emit_complete: bool,
     batch_size: usize,
+    logitems: VecDeque<LogItem>,
 }
 
 impl<S> MergedMinMaxAvgScalarStream<S>
@@ -188,6 +191,7 @@ where
             range_complete_observed_all_emitted: false,
             data_emit_complete: false,
             batch_size: 64,
+            logitems: VecDeque::new(),
         }
     }
 
@@ -219,6 +223,10 @@ where
                         }
                         continue 'l1;
                     }
+                    MinMaxAvgScalarEventBatchStreamItem::Log(item) => {
+                        self.logitems.push_back(item);
+                        continue 'l1;
+                    }
                     MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(_stats) => {
                         // TODO merge also the stats: either just sum, or sum up by input index.
                         todo!();
@@ -265,6 +273,9 @@ where
             self.completed = true;
             return Ready(None);
         }
+        if let Some(item) = self.logitems.pop_front() {
+            return Ready(Some(Ok(MinMaxAvgScalarEventBatchStreamItem::Log(item))));
+        }
         'outer: loop {
             break if self.data_emit_complete {
                 if self.range_complete_observed_all {
diff --git a/disk/src/streamlog.rs b/disk/src/streamlog.rs
new file mode 100644
index 0000000..d890474
--- /dev/null
+++ b/disk/src/streamlog.rs
@@ -0,0 +1,93 @@
+use netpod::log::*;
+use serde::de::{Error, Visitor};
+use serde::{Deserialize, Serialize};
+use std::collections::VecDeque;
+use std::fmt::Formatter;
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct LogItem {
+    #[serde(with = "levelserde")]
+    level: Level,
+    msg: String,
+}
+
+struct VisitLevel;
+
+impl<'de> Visitor<'de> for VisitLevel {
+    type Value = u32;
+
+    fn expecting(&self, fmt: &mut Formatter) -> std::fmt::Result {
+        write!(fmt, "a numeric log level code")
+    }
+
+    fn visit_u32<E>(self, v: u32) -> Result<Self::Value, E>
+    where
+        E: Error,
+    {
+        Ok(v)
+    }
+}
+
+mod levelserde {
+    use super::Level;
+    use crate::streamlog::VisitLevel;
+    use serde::{Deserializer, Serializer};
+
+    pub fn serialize<S>(t: &Level, s: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let g = match *t {
+            Level::ERROR => 1,
+            Level::WARN => 2,
+            Level::INFO => 3,
+            Level::DEBUG => 4,
+            Level::TRACE => 5,
+        };
+        s.serialize_u32(g)
+    }
+
+    pub fn deserialize<'de, D>(d: D) -> Result<Level, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        match d.deserialize_u32(VisitLevel) {
+            Ok(level) => {
+                let g = if level == 1 {
+                    Level::ERROR
+                } else if level == 2 {
+                    Level::WARN
+                } else if level == 3 {
+                    Level::INFO
+                } else if level == 4 {
+                    Level::DEBUG
+                } else if level == 5 {
+                    Level::TRACE
+                } else {
+                    Level::TRACE
+                };
+                Ok(g)
+            }
+            Err(e) => Err(e),
+        }
+    }
+}
+
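+// Collects log messages produced while a stream is being served; the owning
+// stream drains it and emits each entry as an ordinary Log stream item.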
+pub struct Streamlog {
+    items: VecDeque<LogItem>,
+}
+
+impl Streamlog {
+    pub fn new() -> Self {
+        Self { items: VecDeque::new() }
+    }
+
+    pub fn append(&mut self, level: Level, msg: String) {
+        let item = LogItem { level, msg };
+        self.items.push_back(item);
+    }
+
+    pub fn pop(&mut self) -> Option<LogItem> {
+        // Pop from the front so items come out in insertion order (FIFO).
+        self.items.pop_front()
+    }
+}
diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs
index f59ad5d..995f42f 100644
--- a/retrieval/src/test.rs
+++ b/retrieval/src/test.rs
@@ -52,9 +52,6 @@ async fn get_binned_0_inner() -> Result<(), Error> {
         &cluster,
     )
     .await?;
-    if true {
-        return Ok(());
-    }
     get_binned_channel(
         "wave-u16-le-n77",
         "1970-01-01T01:11:00.000Z",
@@ -71,6 +68,9 @@
         &cluster,
     )
     .await?;
+    if true {
+        return Ok(());
+    }
     Ok(())
 }