Adjust caching and disk usage log output

This commit is contained in:
Dominik Werder
2021-05-10 13:56:11 +02:00
parent 2df0be8ed2
commit 8cedf06d9c
15 changed files with 128 additions and 58 deletions

View File

@@ -4,10 +4,9 @@ use bytes::{Buf, BytesMut};
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
#[allow(unused_imports)]
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{ChannelConfig, EventDataReadStats, NanoRange, ScalarType, Shape};
use netpod::{ByteSize, ChannelConfig, EventDataReadStats, NanoRange, ScalarType, Shape};
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -19,12 +18,11 @@ pub struct EventChunker {
errored: bool,
completed: bool,
range: NanoRange,
stats_conf: EventChunkerConf,
seen_beyond_range: bool,
sent_beyond_range: bool,
data_emit_complete: bool,
final_stats_sent: bool,
data_since_last_stats: u32,
stats_emit_interval: u32,
parsed_bytes: u64,
}
@@ -38,11 +36,23 @@ struct ParseResult {
parsed_bytes: u64,
}
#[derive(Clone, Debug)]
pub struct EventChunkerConf {
disk_stats_every: ByteSize,
}
impl EventChunkerConf {
pub fn new(disk_stats_every: ByteSize) -> Self {
Self { disk_stats_every }
}
}
impl EventChunker {
pub fn from_start(
inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
channel_config: ChannelConfig,
range: NanoRange,
stats_conf: EventChunkerConf,
) -> Self {
let mut inp = NeedMinBuffer::new(inp);
inp.set_need_min(6);
@@ -54,12 +64,11 @@ impl EventChunker {
errored: false,
completed: false,
range,
stats_conf,
seen_beyond_range: false,
sent_beyond_range: false,
data_emit_complete: false,
final_stats_sent: false,
data_since_last_stats: 0,
stats_emit_interval: 256,
parsed_bytes: 0,
}
}
@@ -68,8 +77,9 @@ impl EventChunker {
inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
channel_config: ChannelConfig,
range: NanoRange,
stats_conf: EventChunkerConf,
) -> Self {
let mut ret = Self::from_start(inp, channel_config, range);
let mut ret = Self::from_start(inp, channel_config, range, stats_conf);
ret.state = DataFileState::Event;
ret.need_min = 4;
ret.inp.set_need_min(4);
@@ -85,7 +95,6 @@ impl EventChunker {
let mut parsed_bytes = 0;
use byteorder::{ReadBytesExt, BE};
loop {
trace!("parse_buf LOOP buf len {} need_min {}", buf.len(), self.need_min);
if (buf.len() as u32) < self.need_min {
break;
}
@@ -209,10 +218,10 @@ impl EventChunker {
) {
Ok(c1) => {
assert!(c1 as u32 == k1);
trace!("decompress result c1 {} k1 {}", c1, k1);
//trace!("decompress result c1 {} k1 {}", c1, k1);
if ts < self.range.beg {
} else if ts >= self.range.end {
error!("EVENT AFTER RANGE {}", ts / SEC);
Err(Error::with_msg(format!("event after range {}", ts / SEC)))?;
} else {
ret.add_event(
ts,
@@ -287,10 +296,10 @@ impl Stream for EventChunker {
} else if self.errored {
self.completed = true;
Ready(None)
} else if self.data_since_last_stats >= self.stats_emit_interval {
self.data_since_last_stats = 0;
} else if self.parsed_bytes >= self.stats_conf.disk_stats_every.bytes() as u64 {
let item = EventDataReadStats {
parsed_bytes: self.parsed_bytes,
//parsed_bytes: self.parsed_bytes,
parsed_bytes: 1000,
};
self.parsed_bytes = 0;
let ret = EventChunkerItem::EventDataReadStats(item);
@@ -306,7 +315,6 @@ impl Stream for EventChunker {
continue 'outer;
}
} else if self.data_emit_complete {
self.data_since_last_stats = 0;
let item = EventDataReadStats {
parsed_bytes: self.parsed_bytes,
};
@@ -327,19 +335,16 @@ impl Stream for EventChunker {
}
if self.need_min > 1024 * 8 {
let msg = format!("spurious EventChunker asks for need_min {}", self.need_min);
warn!("{}", msg);
self.errored = true;
Ready(Some(Err(Error::with_msg(msg))))
} else {
let x = self.need_min;
self.inp.set_need_min(x);
self.data_since_last_stats += 1;
let ret = EventChunkerItem::Events(res.events);
Ready(Some(Ok(ret)))
}
}
Err(e) => {
error!("EventChunker parse_buf returned error {:?}", e);
self.errored = true;
Ready(Some(Err(e.into())))
}