WIP on adding stats to the streams
This commit is contained in:
@@ -1,19 +1,18 @@
|
||||
use crate::NeedMinBuffer;
|
||||
use crate::{FileChunkRead, NeedMinBuffer};
|
||||
use bitshuffle::bitshuffle_decompress;
|
||||
use bytes::{Buf, BytesMut};
|
||||
use err::Error;
|
||||
use futures_core::Stream;
|
||||
use futures_util::pin_mut;
|
||||
use futures_util::StreamExt;
|
||||
#[allow(unused_imports)]
|
||||
use netpod::log::*;
|
||||
use netpod::timeunits::SEC;
|
||||
use netpod::{ChannelConfig, NanoRange, ScalarType, Shape};
|
||||
use netpod::{ChannelConfig, EventDataReadStats, NanoRange, ScalarType, Shape};
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
pub struct EventChunker {
|
||||
inp: NeedMinBuffer,
|
||||
polled: u32,
|
||||
state: DataFileState,
|
||||
need_min: u32,
|
||||
channel_config: ChannelConfig,
|
||||
@@ -30,7 +29,7 @@ enum DataFileState {
|
||||
|
||||
impl EventChunker {
|
||||
pub fn from_start(
|
||||
inp: Pin<Box<dyn Stream<Item = Result<BytesMut, Error>> + Send>>,
|
||||
inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
|
||||
channel_config: ChannelConfig,
|
||||
range: NanoRange,
|
||||
) -> Self {
|
||||
@@ -38,7 +37,6 @@ impl EventChunker {
|
||||
inp.set_need_min(6);
|
||||
Self {
|
||||
inp,
|
||||
polled: 0,
|
||||
state: DataFileState::FileHeader,
|
||||
need_min: 6,
|
||||
channel_config,
|
||||
@@ -50,7 +48,7 @@ impl EventChunker {
|
||||
}
|
||||
|
||||
pub fn from_event_boundary(
|
||||
inp: Pin<Box<dyn Stream<Item = Result<BytesMut, Error>> + Send>>,
|
||||
inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
|
||||
channel_config: ChannelConfig,
|
||||
range: NanoRange,
|
||||
) -> Self {
|
||||
@@ -58,7 +56,6 @@ impl EventChunker {
|
||||
inp.set_need_min(4);
|
||||
Self {
|
||||
inp,
|
||||
polled: 0,
|
||||
state: DataFileState::Event,
|
||||
need_min: 4,
|
||||
channel_config,
|
||||
@@ -103,6 +100,7 @@ impl EventChunker {
|
||||
self.state = DataFileState::Event;
|
||||
self.need_min = 4;
|
||||
buf.advance(totlen);
|
||||
ret.event_data_read_stats.parsed_bytes += totlen as u64;
|
||||
}
|
||||
}
|
||||
DataFileState::Event => {
|
||||
@@ -110,24 +108,12 @@ impl EventChunker {
|
||||
let len = sl.read_i32::<BE>().unwrap();
|
||||
assert!(len >= 20 && len < 1024 * 1024 * 10);
|
||||
let len = len as u32;
|
||||
trace!(
|
||||
"+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+- event len {}",
|
||||
len,
|
||||
);
|
||||
if (buf.len() as u32) < 20 {
|
||||
// TODO gather stats about how often we find not enough input
|
||||
//info!("parse_buf not enough B");
|
||||
self.need_min = len as u32;
|
||||
break;
|
||||
} else if (buf.len() as u32) < len {
|
||||
{
|
||||
// TODO this is just for testing
|
||||
let mut sl = std::io::Cursor::new(buf.as_ref());
|
||||
sl.read_i32::<BE>().unwrap();
|
||||
sl.read_i64::<BE>().unwrap();
|
||||
let ts = sl.read_i64::<BE>().unwrap();
|
||||
trace!("parse_buf not enough C len {} have {} ts {}", len, buf.len(), ts);
|
||||
}
|
||||
self.need_min = len as u32;
|
||||
break;
|
||||
} else {
|
||||
@@ -239,6 +225,7 @@ impl EventChunker {
|
||||
}
|
||||
trace!("advance and reset need_min");
|
||||
buf.advance(len as usize);
|
||||
ret.event_data_read_stats.parsed_bytes += len as u64;
|
||||
self.need_min = 4;
|
||||
}
|
||||
}
|
||||
@@ -289,25 +276,15 @@ impl Stream for EventChunker {
|
||||
self.completed = true;
|
||||
return Ready(None);
|
||||
}
|
||||
self.polled += 1;
|
||||
if self.polled >= 20000 {
|
||||
warn!("EventChunker poll limit reached");
|
||||
self.errored = true;
|
||||
return Poll::Ready(Some(Err(Error::with_msg(format!("EventChunker poll limit reached")))));
|
||||
}
|
||||
let g = &mut self.inp;
|
||||
pin_mut!(g);
|
||||
trace!("EventChunker call input poll_next");
|
||||
match g.poll_next(cx) {
|
||||
Ready(Some(Ok(mut buf))) => {
|
||||
trace!("EventChunker got buffer len {}", buf.len());
|
||||
let r = self.parse_buf(&mut buf);
|
||||
match self.inp.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok(mut fcr))) => {
|
||||
trace!("EventChunker got buffer len {}", fcr.buf.len());
|
||||
let r = self.parse_buf(&mut fcr.buf);
|
||||
match r {
|
||||
Ok(res) => {
|
||||
if buf.len() > 0 {
|
||||
if fcr.buf.len() > 0 {
|
||||
// TODO gather stats about this:
|
||||
//info!("parse_buf returned {} leftover bytes to me", buf.len());
|
||||
self.inp.put_back(buf);
|
||||
self.inp.put_back(fcr);
|
||||
}
|
||||
if self.need_min > 1024 * 8 {
|
||||
let msg = format!("spurious EventChunker asks for need_min {}", self.need_min);
|
||||
@@ -345,6 +322,7 @@ pub struct EventFull {
|
||||
pub pulses: Vec<u64>,
|
||||
pub decomps: Vec<Option<BytesMut>>,
|
||||
pub scalar_types: Vec<ScalarType>,
|
||||
pub event_data_read_stats: EventDataReadStats,
|
||||
}
|
||||
|
||||
impl EventFull {
|
||||
@@ -354,6 +332,7 @@ impl EventFull {
|
||||
pulses: vec![],
|
||||
decomps: vec![],
|
||||
scalar_types: vec![],
|
||||
event_data_read_stats: EventDataReadStats::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user