commit a8932dba0d (parent 1ae5c3dc80)
Author: Dominik Werder
Date: 2021-05-05 22:05:24 +02:00

    Not bad, I get Streamlog LogItem in the test

10 changed files with 333 additions and 80 deletions


@@ -1,4 +1,5 @@
use crate::agg::AggregatableXdim1Bin;
use crate::streamlog::LogItem;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;

@@ -24,6 +25,9 @@ pub trait AggregatableTdim: Sized {
    fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator;
    fn is_range_complete(&self) -> bool;
    fn make_range_complete_item() -> Option<Self>;
    fn is_log_item(&self) -> bool;
    fn log_item(self) -> Option<LogItem>;
    fn make_log_item(item: LogItem) -> Option<Self>;
}
pub trait IntoBinnedT {

@@ -140,6 +144,19 @@ where
                if k.is_range_complete() {
                    self.range_complete = true;
                    continue 'outer;
                } else if k.is_log_item() {
                    if let Some(item) = k.log_item() {
                        if let Some(item) =
                            <I::Aggregator as AggregatorTdim>::OutputValue::make_log_item(item.clone())
                        {
                            Ready(Some(Ok(item)))
                        } else {
                            warn!("IntoBinnedTDefaultStream can not create log item");
                            continue 'outer;
                        }
                    } else {
                        panic!()
                    }
                } else {
                    let ag = self.aggtor.as_mut().unwrap();
                    if ag.ends_before(&k) {
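
The three new trait methods let a log entry ride through the T-binning machinery untouched: the poll loop above recognizes a log item, converts it into the aggregator's output type via make_log_item, and yields it without feeding it to the aggregator. A minimal stand-alone sketch of that contract (the trait and type names here are illustrative stand-ins, not the repo's, and LogItem is reduced to a String so the sketch compiles on its own):

    // Stand-in for the new AggregatableTdim methods.
    type LogItem = String;

    trait LogPassThrough: Sized {
        fn is_log_item(&self) -> bool;
        fn log_item(self) -> Option<LogItem>;
        fn make_log_item(item: LogItem) -> Option<Self>;
    }

    #[derive(Debug)]
    enum OutItem {
        Value(f32),
        Log(LogItem),
    }

    impl LogPassThrough for OutItem {
        fn is_log_item(&self) -> bool {
            matches!(self, OutItem::Log(_))
        }
        fn log_item(self) -> Option<LogItem> {
            match self {
                OutItem::Log(item) => Some(item),
                _ => None,
            }
        }
        fn make_log_item(item: LogItem) -> Option<Self> {
            Some(OutItem::Log(item))
        }
    }

    fn main() {
        // The binning loop's pass-through: extract the log payload from the
        // input type and re-wrap it in the output type, bypassing aggregation.
        let input = OutItem::Log("reading patch 42".to_string());
        if input.is_log_item() {
            let forwarded = input.log_item().and_then(OutItem::make_log_item);
            println!("{:?}", forwarded);
        }
    }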


@@ -1,6 +1,7 @@
use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem};
use crate::agg::AggregatableXdim1Bin;
use crate::streamlog::LogItem;
use bytes::{BufMut, Bytes, BytesMut};
use netpod::log::*;
use netpod::timeunits::SEC;

@@ -120,6 +121,18 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatch {
    fn make_range_complete_item() -> Option<Self> {
        None
    }
    fn is_log_item(&self) -> bool {
        false
    }
    fn log_item(self) -> Option<LogItem> {
        None
    }
    fn make_log_item(_item: LogItem) -> Option<Self> {
        None
    }
}
impl MinMaxAvgScalarEventBatch {

@@ -266,6 +279,7 @@ pub enum MinMaxAvgScalarEventBatchStreamItem {
    Values(MinMaxAvgScalarEventBatch),
    RangeComplete,
    EventDataReadStats(EventDataReadStats),
    Log(LogItem),
}
impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatchStreamItem {

@@ -296,6 +310,26 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatchStreamItem {
    fn make_range_complete_item() -> Option<Self> {
        Some(MinMaxAvgScalarEventBatchStreamItem::RangeComplete)
    }
    fn is_log_item(&self) -> bool {
        if let MinMaxAvgScalarEventBatchStreamItem::Log(_) = self {
            true
        } else {
            false
        }
    }
    fn log_item(self) -> Option<LogItem> {
        if let MinMaxAvgScalarEventBatchStreamItem::Log(item) = self {
            Some(item)
        } else {
            None
        }
    }
    fn make_log_item(item: LogItem) -> Option<Self> {
        Some(MinMaxAvgScalarEventBatchStreamItem::Log(item))
    }
}
pub struct MinMaxAvgScalarEventBatchStreamItemAggregator {

@@ -343,6 +377,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchStreamItemAggregator {
            MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ingest(vals),
            MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats) => self.event_data_read_stats.trans(stats),
            MinMaxAvgScalarEventBatchStreamItem::RangeComplete => (),
            MinMaxAvgScalarEventBatchStreamItem::Log(_) => (),
        }
    }
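
An aside on the is_log_item implementations here and in the bin-batch file below: the if-let-true-else-false shape can be written more compactly with the matches! macro (stable since Rust 1.42). A hedged alternative, not what the commit does:

    fn is_log_item(&self) -> bool {
        // Equivalent to the if-let form above.
        matches!(self, MinMaxAvgScalarEventBatchStreamItem::Log(_))
    }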


@@ -1,5 +1,6 @@
use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside};
use crate::streamlog::LogItem;
use bytes::{BufMut, Bytes, BytesMut};
use netpod::log::*;
use netpod::timeunits::SEC;

@@ -201,6 +202,18 @@ impl AggregatableTdim for MinMaxAvgScalarBinBatch {
    fn make_range_complete_item() -> Option<Self> {
        None
    }
    fn is_log_item(&self) -> bool {
        false
    }
    fn log_item(self) -> Option<LogItem> {
        None
    }
    fn make_log_item(_item: LogItem) -> Option<Self> {
        None
    }
}
pub struct MinMaxAvgScalarBinBatchAggregator {

@@ -295,6 +308,7 @@ pub enum MinMaxAvgScalarBinBatchStreamItem {
    Values(MinMaxAvgScalarBinBatch),
    RangeComplete,
    EventDataReadStats(EventDataReadStats),
    Log(LogItem),
}
impl AggregatableTdim for MinMaxAvgScalarBinBatchStreamItem {

@@ -316,6 +330,26 @@ impl AggregatableTdim for MinMaxAvgScalarBinBatchStreamItem {
    fn make_range_complete_item() -> Option<Self> {
        Some(MinMaxAvgScalarBinBatchStreamItem::RangeComplete)
    }
    fn is_log_item(&self) -> bool {
        if let MinMaxAvgScalarBinBatchStreamItem::Log(_) = self {
            true
        } else {
            false
        }
    }
    fn log_item(self) -> Option<LogItem> {
        if let MinMaxAvgScalarBinBatchStreamItem::Log(item) = self {
            Some(item)
        } else {
            None
        }
    }
    fn make_log_item(item: LogItem) -> Option<Self> {
        Some(MinMaxAvgScalarBinBatchStreamItem::Log(item))
    }
}
impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatchStreamItem {

@@ -371,6 +405,7 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchStreamItemAggregator {
            MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ingest(vals),
            MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats) => self.event_data_read_stats.trans(stats),
            MinMaxAvgScalarBinBatchStreamItem::RangeComplete => (),
            MinMaxAvgScalarBinBatchStreamItem::Log(_) => (),
        }
    }
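
MinMaxAvgScalarBinBatchStreamItem and MinMaxAvgScalarEventBatchStreamItem now carry byte-for-byte identical log plumbing modulo the enum name. If the duplication grows, a small macro could stamp out the three methods; a hypothetical sketch, nothing like it exists in the repo:

    // Hypothetical helper showing the shared shape; `$t` is a stream-item
    // enum with a `Log(LogItem)` variant.
    macro_rules! impl_log_plumbing {
        ($t:ident) => {
            fn is_log_item(&self) -> bool {
                matches!(self, $t::Log(_))
            }
            fn log_item(self) -> Option<LogItem> {
                match self {
                    $t::Log(item) => Some(item),
                    _ => None,
                }
            }
            fn make_log_item(item: LogItem) -> Option<Self> {
                Some($t::Log(item))
            }
        };
    }

Inside each impl AggregatableTdim block, impl_log_plumbing!(MinMaxAvgScalarBinBatchStreamItem); would then replace the twenty hand-written lines.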


@@ -68,6 +68,7 @@ impl BinnedStream {
                Ok(PreBinnedItem::EventDataReadStats(stats)) => {
                    Some(Ok(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats)))
                }
                Ok(PreBinnedItem::Log(item)) => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::Log(item))),
                Err(e) => {
                    error!("observe error in stream {:?}", e);
                    Some(Err(e))

disk/src/cache/pbv.rs

@@ -4,6 +4,7 @@ use crate::cache::pbvfs::{PreBinnedItem, PreBinnedValueFetchedStream};
use crate::cache::{node_ix_for_patch, MergedFromRemotes, PreBinnedQuery};
use crate::frame::makeframe::make_frame;
use crate::raw::EventsQuery;
use crate::streamlog::Streamlog;
use bytes::Bytes;
use err::Error;
use futures_core::Stream;

@@ -51,8 +52,12 @@ pub struct PreBinnedValueStream {
    node_config: NodeConfigCached,
    open_check_local_file: Option<Pin<Box<dyn Future<Output = Result<tokio::fs::File, std::io::Error>> + Send>>>,
    fut2: Option<Pin<Box<dyn Stream<Item = Result<PreBinnedItem, Error>> + Send>>>,
    data_complete: bool,
    range_complete_observed: bool,
    range_complete_emitted: bool,
    errored: bool,
    completed: bool,
    streamlog: Streamlog,
}
impl PreBinnedValueStream {

@@ -64,92 +69,108 @@ impl PreBinnedValueStream {
            node_config: node_config.clone(),
            open_check_local_file: None,
            fut2: None,
            data_complete: false,
            range_complete_observed: false,
            range_complete_emitted: false,
            errored: false,
            completed: false,
            streamlog: Streamlog::new(),
        }
    }
    fn setup_merged_from_remotes(&mut self) {
        let g = self.query.patch.bin_t_len();
        warn!("no better resolution found for g {}", g);
        let evq = EventsQuery {
            channel: self.query.channel.clone(),
            range: self.query.patch.patch_range(),
            agg_kind: self.query.agg_kind.clone(),
        };
        if self.query.patch.patch_t_len() % self.query.patch.bin_t_len() != 0 {
            error!(
                "Patch length inconsistency {} {}",
                self.query.patch.patch_t_len(),
                self.query.patch.bin_t_len()
            );
            return;
        }
        // TODO do I need to set up more transformations or binning to deliver the requested data?
        let count = self.query.patch.patch_t_len() / self.query.patch.bin_t_len();
        let range = BinnedRange::covering_range(evq.range.clone(), count).unwrap();
        let s1 = MergedFromRemotes::new(evq, self.node_config.node_config.cluster.clone());
        let s2 = s1.into_binned_t(range);
        let s2 = s2.map(|k| {
            use MinMaxAvgScalarBinBatchStreamItem::*;
            match k {
                Ok(Values(k)) => Ok(PreBinnedItem::Batch(k)),
                Ok(RangeComplete) => Ok(PreBinnedItem::RangeComplete),
                Ok(EventDataReadStats(stats)) => Ok(PreBinnedItem::EventDataReadStats(stats)),
                Ok(Log(item)) => Ok(PreBinnedItem::Log(item)),
                Err(e) => Err(e),
            }
        });
        self.fut2 = Some(Box::pin(s2));
    }
    fn setup_from_higher_res_prebinned(&mut self, range: PreBinnedPatchRange) {
        let g = self.query.patch.bin_t_len();
        let h = range.grid_spec.bin_t_len();
        info!(
            "try_setup_fetch_prebinned_higher_res found g {} h {} ratio {} mod {} {:?}",
            g,
            h,
            g / h,
            g % h,
            range,
        );
        if g / h <= 1 {
            error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h);
            return;
        }
        if g / h > 200 {
            error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h);
            return;
        }
        if g % h != 0 {
            error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h);
            return;
        }
        let node_config = self.node_config.clone();
        let patch_it = PreBinnedPatchIterator::from_range(range);
        let s = futures_util::stream::iter(patch_it)
            .map({
                let q2 = self.query.clone();
                move |patch| {
                    let query = PreBinnedQuery {
                        patch,
                        channel: q2.channel.clone(),
                        agg_kind: q2.agg_kind.clone(),
                        cache_usage: q2.cache_usage.clone(),
                    };
                    PreBinnedValueFetchedStream::new(&query, &node_config)
                }
            })
            .filter_map(|k| match k {
                Ok(k) => ready(Some(k)),
                Err(e) => {
                    // TODO Reconsider error handling here:
                    error!("{:?}", e);
                    ready(None)
                }
            })
            .flatten();
        self.fut2 = Some(Box::pin(s));
    }
    fn try_setup_fetch_prebinned_higher_res(&mut self) {
        info!("try_setup_fetch_prebinned_higher_res for {:?}", self.query.patch);
        let g = self.query.patch.bin_t_len();
        let range = self.query.patch.patch_range();
        match PreBinnedPatchRange::covering_range(range, self.query.patch.bin_count() + 1) {
            Some(range) => {
                let h = range.grid_spec.bin_t_len();
                info!(
                    "try_setup_fetch_prebinned_higher_res found g {} h {} ratio {} mod {} {:?}",
                    g,
                    h,
                    g / h,
                    g % h,
                    range,
                );
                if g / h <= 1 {
                    error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h);
                    return;
                }
                if g / h > 200 {
                    error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h);
                    return;
                }
                if g % h != 0 {
                    error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h);
                    return;
                }
                let node_config = self.node_config.clone();
                let patch_it = PreBinnedPatchIterator::from_range(range);
                let s = futures_util::stream::iter(patch_it)
                    .map({
                        let q2 = self.query.clone();
                        move |patch| {
                            let query = PreBinnedQuery {
                                patch,
                                channel: q2.channel.clone(),
                                agg_kind: q2.agg_kind.clone(),
                                cache_usage: q2.cache_usage.clone(),
                            };
                            PreBinnedValueFetchedStream::new(&query, &node_config)
                        }
                    })
                    .filter_map(|k| match k {
                        Ok(k) => ready(Some(k)),
                        Err(e) => {
                            // TODO Reconsider error handling here:
                            error!("{:?}", e);
                            ready(None)
                        }
                    })
                    .flatten();
                self.fut2 = Some(Box::pin(s));
                self.setup_from_higher_res_prebinned(range);
            }
            None => {
                warn!("no better resolution found for g {}", g);
                let evq = EventsQuery {
                    channel: self.query.channel.clone(),
                    range: self.query.patch.patch_range(),
                    agg_kind: self.query.agg_kind.clone(),
                };
                if self.query.patch.patch_t_len() % self.query.patch.bin_t_len() != 0 {
                    error!(
                        "Patch length inconsistency {} {}",
                        self.query.patch.patch_t_len(),
                        self.query.patch.bin_t_len()
                    );
                    return;
                }
                error!("try_setup_fetch_prebinned_higher_res apply all requested transformations and T-binning");
                let count = self.query.patch.patch_t_len() / self.query.patch.bin_t_len();
                let range = BinnedRange::covering_range(evq.range.clone(), count).unwrap();
                let s1 = MergedFromRemotes::new(evq, self.node_config.node_config.cluster.clone());
                let s2 = s1.into_binned_t(range).map(|k| match k {
                    Ok(MinMaxAvgScalarBinBatchStreamItem::Values(k)) => Ok(PreBinnedItem::Batch(k)),
                    Ok(MinMaxAvgScalarBinBatchStreamItem::RangeComplete) => Ok(PreBinnedItem::RangeComplete),
                    Ok(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats)) => {
                        Ok(PreBinnedItem::EventDataReadStats(stats))
                    }
                    Err(e) => Err(e),
                });
                self.fut2 = Some(Box::pin(s2));
                self.setup_merged_from_remotes();
            }
        }
    }

@@ -168,17 +189,54 @@ impl Stream for PreBinnedValueStream {
            self.completed = true;
            return Ready(None);
        }
        if let Some(item) = self.streamlog.pop() {
            return Ready(Some(Ok(PreBinnedItem::Log(item))));
        }
        'outer: loop {
            break if let Some(fut) = self.fut2.as_mut() {
            break if self.data_complete {
                if self.range_complete_observed {
                    if self.range_complete_emitted {
                        self.completed = true;
                        Ready(None)
                    } else {
                        let msg = format!(
                            "======== STREAMLOG ========= WRITE CACHE FILE\n{:?}\n\n\n",
                            self.query.patch
                        );
                        self.streamlog.append(Level::INFO, msg);
                        info!(
                            "======================== WRITE CACHE FILE\n{:?}\n\n\n",
                            self.query.patch
                        );
                        self.range_complete_emitted = true;
                        Ready(Some(Ok(PreBinnedItem::RangeComplete)))
                    }
                } else {
                    self.completed = true;
                    Ready(None)
                }
            } else if let Some(fut) = self.fut2.as_mut() {
                match fut.poll_next_unpin(cx) {
                    Ready(Some(k)) => match k {
                        Ok(k) => Ready(Some(Ok(k))),
                        Ok(PreBinnedItem::RangeComplete) => {
                            self.range_complete_observed = true;
                            //Ready(Some(Ok(PreBinnedItem::RangeComplete)))
                            continue 'outer;
                        }
                        Ok(PreBinnedItem::Batch(batch)) => Ready(Some(Ok(PreBinnedItem::Batch(batch)))),
                        Ok(PreBinnedItem::EventDataReadStats(stats)) => {
                            Ready(Some(Ok(PreBinnedItem::EventDataReadStats(stats))))
                        }
                        Ok(PreBinnedItem::Log(item)) => Ready(Some(Ok(PreBinnedItem::Log(item)))),
                        Err(e) => {
                            self.errored = true;
                            Ready(Some(Err(e)))
                        }
                    },
                    Ready(None) => Ready(None),
                    Ready(None) => {
                        self.data_complete = true;
                        continue 'outer;
                    }
                    Pending => Pending,
                }
            } else if let Some(fut) = self.open_check_local_file.as_mut() {
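
The rewritten poll_next is easier to follow as a small state machine: buffered log items drain first; while fut2 yields items, RangeComplete is only recorded; once the inner stream ends, data_complete flips and exactly one RangeComplete is emitted (if observed) before the stream terminates. A stand-alone model of that ordering, with the inner stream reduced to a VecDeque and all cache-file and error handling elided (field names mirror the diff, everything else is a toy, not the repo's code):

    use std::collections::VecDeque;

    #[derive(Debug)]
    enum Item {
        Log(String),
        Batch(u32),
        RangeComplete,
    }

    struct Model {
        // (The real Streamlog::pop drains from the back; FIFO here for simplicity.)
        streamlog: VecDeque<String>,
        inner: VecDeque<Item>,
        data_complete: bool,
        range_complete_observed: bool,
        range_complete_emitted: bool,
    }

    impl Model {
        fn next(&mut self) -> Option<Item> {
            // 1) Buffered log items always go out first.
            if let Some(msg) = self.streamlog.pop_front() {
                return Some(Item::Log(msg));
            }
            loop {
                if self.data_complete {
                    // 3) After the inner stream ends: one RangeComplete, then done.
                    return if self.range_complete_observed && !self.range_complete_emitted {
                        self.range_complete_emitted = true;
                        Some(Item::RangeComplete)
                    } else {
                        None
                    };
                }
                // 2) Inner items pass through; RangeComplete is only recorded.
                match self.inner.pop_front() {
                    Some(Item::RangeComplete) => self.range_complete_observed = true,
                    Some(item) => return Some(item),
                    None => self.data_complete = true,
                }
            }
        }
    }

    fn main() {
        let mut m = Model {
            streamlog: VecDeque::from(vec!["opened patch".to_string()]),
            inner: VecDeque::from(vec![Item::Batch(1), Item::RangeComplete, Item::Batch(2)]),
            data_complete: false,
            range_complete_observed: false,
            range_complete_emitted: false,
        };
        while let Some(item) = m.next() {
            // Prints: Log("opened patch"), Batch(1), Batch(2), RangeComplete.
            println!("{:?}", item);
        }
    }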


@@ -2,6 +2,7 @@ use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead, PreBinnedQuery};
use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use crate::frame::makeframe::decode_frame;
use crate::streamlog::LogItem;
use err::Error;
use futures_core::Stream;
use futures_util::{pin_mut, FutureExt};

@@ -50,6 +51,7 @@ pub enum PreBinnedItem {
    RangeComplete,
    EventDataReadStats(EventDataReadStats),
    //ValuesExtractStats(ValuesExtractStats),
    Log(LogItem),
}
impl Stream for PreBinnedValueFetchedStream {


@@ -31,6 +31,7 @@ pub mod index;
pub mod merge;
pub mod paths;
pub mod raw;
pub mod streamlog;
pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: Node) -> Result<netpod::BodyStream, Error> {
    let path = paths::datapath(query.timebin as u64, &query.channel_config, &node);


@@ -1,9 +1,11 @@
use crate::agg::eventbatch::{MinMaxAvgScalarEventBatch, MinMaxAvgScalarEventBatchStreamItem};
use crate::agg::{Dim1F32Stream, Dim1F32StreamItem, ValuesDim1};
use crate::eventchunker::EventChunkerItem;
use crate::streamlog::LogItem;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
#[allow(unused_imports)]

@@ -163,6 +165,7 @@ where
    range_complete_observed_all_emitted: bool,
    data_emit_complete: bool,
    batch_size: usize,
    logitems: VecDeque<LogItem>,
}
impl<S> MergedMinMaxAvgScalarStream<S>

@@ -188,6 +191,7 @@ where
            range_complete_observed_all_emitted: false,
            data_emit_complete: false,
            batch_size: 64,
            logitems: VecDeque::new(),
        }
    }

@@ -219,6 +223,10 @@ where
                        }
                        continue 'l1;
                    }
                    MinMaxAvgScalarEventBatchStreamItem::Log(item) => {
                        self.logitems.push_back(item);
                        continue 'l1;
                    }
                    MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(_stats) => {
                        // TODO merge also the stats: either just sum, or sum up by input index.
                        todo!();

@@ -265,6 +273,9 @@ where
            self.completed = true;
            return Ready(None);
        }
        if let Some(item) = self.logitems.pop_front() {
            return Ready(Some(Ok(MinMaxAvgScalarEventBatchStreamItem::Log(item))));
        }
        'outer: loop {
            break if self.data_emit_complete {
                if self.range_complete_observed_all {
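
A side note when reading this against the new streamlog.rs below: the merged stream drains its buffer with pop_front (oldest first), while Streamlog::pop uses pop_back (newest first), so the two paths emit buffered log items in opposite order. In miniature:

    fn main() {
        let mut q = std::collections::VecDeque::from(vec![1, 2, 3]);
        assert_eq!(q.pop_front(), Some(1)); // merge.rs order: oldest first
        assert_eq!(q.pop_back(), Some(3)); // streamlog.rs order: newest first
    }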

disk/src/streamlog.rs (new file)

@@ -0,0 +1,93 @@
use netpod::log::*;
use serde::de::{Error, Visitor};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::fmt::Formatter;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogItem {
    #[serde(with = "levelserde")]
    level: Level,
    msg: String,
}

struct VisitLevel;

impl<'de> Visitor<'de> for VisitLevel {
    type Value = u32;
    fn expecting(&self, fmt: &mut Formatter) -> std::fmt::Result {
        write!(fmt, "")
    }
    fn visit_u32<E>(self, v: u32) -> Result<Self::Value, E>
    where
        E: Error,
    {
        Ok(v)
    }
}

mod levelserde {
    use super::Level;
    use crate::streamlog::VisitLevel;
    use serde::{Deserializer, Serializer};

    pub fn serialize<S>(t: &Level, s: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let g = match *t {
            Level::ERROR => 1,
            Level::WARN => 2,
            Level::INFO => 3,
            Level::DEBUG => 4,
            Level::TRACE => 5,
        };
        s.serialize_u32(g)
    }

    pub fn deserialize<'de, D>(d: D) -> Result<Level, D::Error>
    where
        D: Deserializer<'de>,
    {
        match d.deserialize_u32(VisitLevel) {
            Ok(level) => {
                let g = if level == 1 {
                    Level::ERROR
                } else if level == 2 {
                    Level::WARN
                } else if level == 3 {
                    Level::INFO
                } else if level == 4 {
                    Level::DEBUG
                } else if level == 5 {
                    Level::TRACE
                } else {
                    Level::TRACE
                };
                Ok(g)
            }
            Err(e) => Err(e),
        }
    }
}

pub struct Streamlog {
    items: VecDeque<LogItem>,
}

impl Streamlog {
    pub fn new() -> Self {
        Self { items: VecDeque::new() }
    }
    pub fn append(&mut self, level: Level, msg: String) {
        let item = LogItem { level, msg };
        self.items.push_back(item);
    }
    pub fn pop(&mut self) -> Option<LogItem> {
        self.items.pop_back()
    }
}
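
To close, a stand-alone sketch of the new buffer's discipline; Level is a local stand-in for the netpod::log re-export, everything else mirrors the file above. Worth noting while reading levelserde: VisitLevel only implements visit_u32, so the codec round-trips with serialization formats whose deserialize_u32 drives visit_u32 (bincode-style); a self-describing format like serde_json routes integers through visit_u64 and would likely need that method implemented as well.

    use std::collections::VecDeque;

    // Local stand-ins so the sketch compiles on its own.
    #[derive(Clone, Debug)]
    enum Level {
        Warn,
        Info,
    }

    #[derive(Clone, Debug)]
    struct LogItem {
        level: Level,
        msg: String,
    }

    struct Streamlog {
        items: VecDeque<LogItem>,
    }

    impl Streamlog {
        fn new() -> Self {
            Self { items: VecDeque::new() }
        }
        fn append(&mut self, level: Level, msg: String) {
            self.items.push_back(LogItem { level, msg });
        }
        // As in the file above: pop_back, i.e. the most recent item comes out first.
        fn pop(&mut self) -> Option<LogItem> {
            self.items.pop_back()
        }
    }

    fn main() {
        // The producer side appends while polling ...
        let mut sl = Streamlog::new();
        sl.append(Level::Info, "write cache file".to_string());
        sl.append(Level::Warn, "no better resolution found".to_string());
        // ... and the owning stream drains the buffer before yielding data,
        // wrapping each entry as PreBinnedItem::Log(item) on the wire.
        while let Some(item) = sl.pop() {
            println!("{:?} {}", item.level, item.msg);
        }
    }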
}