This commit is contained in:
Dominik Werder
2021-05-06 17:53:45 +02:00
parent f391eca970
commit af9c11bdd8
16 changed files with 246 additions and 270 deletions

View File

@@ -21,6 +21,11 @@ pub struct EventChunker {
range: NanoRange,
seen_beyond_range: bool,
sent_beyond_range: bool,
data_emit_complete: bool,
final_stats_sent: bool,
data_since_last_stats: u32,
stats_emit_interval: u32,
parsed_bytes: u64,
}
enum DataFileState {
@@ -30,6 +35,7 @@ enum DataFileState {
struct ParseResult {
events: EventFull,
parsed_bytes: u64,
}
impl EventChunker {
@@ -50,6 +56,11 @@ impl EventChunker {
range,
seen_beyond_range: false,
sent_beyond_range: false,
data_emit_complete: false,
final_stats_sent: false,
data_since_last_stats: 0,
stats_emit_interval: 1,
parsed_bytes: 0,
}
}
@@ -70,6 +81,11 @@ impl EventChunker {
range,
seen_beyond_range: false,
sent_beyond_range: false,
data_emit_complete: false,
final_stats_sent: false,
data_since_last_stats: 0,
stats_emit_interval: 1,
parsed_bytes: 0,
}
}
@@ -79,6 +95,7 @@ impl EventChunker {
fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result<ParseResult, Error> {
let mut ret = EventFull::empty();
let mut parsed_bytes = 0;
use byteorder::{ReadBytesExt, BE};
loop {
trace!("parse_buf LOOP buf len {} need_min {}", buf.len(), self.need_min);
@@ -107,7 +124,7 @@ impl EventChunker {
self.state = DataFileState::Event;
self.need_min = 4;
buf.advance(totlen);
// TODO ret.event_data_read_stats.parsed_bytes += totlen as u64;
parsed_bytes += totlen as u64;
}
}
DataFileState::Event => {
@@ -127,6 +144,7 @@ impl EventChunker {
let pulse = sl.read_i64::<BE>().unwrap() as u64;
if ts >= self.range.end {
self.seen_beyond_range = true;
self.data_emit_complete = true;
break;
}
if ts < self.range.beg {
@@ -226,16 +244,17 @@ impl EventChunker {
"TODO uncompressed event parsing not yet implemented"
)))?;
}
trace!("advance and reset need_min");
buf.advance(len as usize);
// TODO ret.event_data_read_stats.parsed_bytes += len as u64;
parsed_bytes += len as u64;
self.need_min = 4;
}
}
}
}
trace!("AFTER PARSE LOOP len {}", ret.tss.len());
Ok(ParseResult { events: ret })
Ok(ParseResult {
events: ret,
parsed_bytes,
})
}
}
@@ -275,60 +294,83 @@ impl Stream for EventChunker {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
if self.completed {
panic!("EventChunker poll_next on completed");
}
if self.errored {
self.completed = true;
return Ready(None);
}
if self.seen_beyond_range {
if self.sent_beyond_range {
self.completed = true;
return Ready(None);
} else {
self.sent_beyond_range = true;
return Ready(Some(Ok(EventChunkerItem::RangeComplete)));
}
}
match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(mut fcr))) => {
let r = self.parse_buf(&mut fcr.buf);
match r {
Ok(res) => {
if fcr.buf.len() > 0 {
// TODO gather stats about this:
self.inp.put_back(fcr);
}
if self.need_min > 1024 * 8 {
let msg = format!("spurious EventChunker asks for need_min {}", self.need_min);
warn!("{}", msg);
self.errored = true;
Ready(Some(Err(Error::with_msg(msg))))
} else {
let x = self.need_min;
self.inp.set_need_min(x);
let ret = EventChunkerItem::Events(res.events);
let ret = Ok(ret);
Ready(Some(ret))
}
}
Err(e) => {
error!("EventChunker parse_buf returned error {:?}", e);
self.errored = true;
Ready(Some(Err(e.into())))
}
}
}
Ready(Some(Err(e))) => {
self.errored = true;
Ready(Some(Err(e)))
}
Ready(None) => {
'outer: loop {
break if self.completed {
panic!("EventChunker poll_next on completed");
} else if self.errored {
self.completed = true;
Ready(None)
}
Pending => Pending,
} else if self.data_since_last_stats >= self.stats_emit_interval {
self.data_since_last_stats = 0;
let item = EventDataReadStats {
parsed_bytes: self.parsed_bytes,
};
self.parsed_bytes = 0;
let ret = EventChunkerItem::EventDataReadStats(item);
Ready(Some(Ok(ret)))
} else if self.sent_beyond_range {
self.completed = true;
Ready(None)
} else if self.final_stats_sent {
self.sent_beyond_range = true;
if self.seen_beyond_range {
Ready(Some(Ok(EventChunkerItem::RangeComplete)))
} else {
continue 'outer;
}
} else if self.data_emit_complete {
self.data_since_last_stats = 0;
let item = EventDataReadStats {
parsed_bytes: self.parsed_bytes,
};
self.parsed_bytes = 0;
warn!("EMIT FINAL STATS {:?}", item);
let ret = EventChunkerItem::EventDataReadStats(item);
self.final_stats_sent = true;
Ready(Some(Ok(ret)))
} else {
match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(mut fcr))) => {
let r = self.parse_buf(&mut fcr.buf);
match r {
Ok(res) => {
self.parsed_bytes += res.parsed_bytes;
if fcr.buf.len() > 0 {
// TODO gather stats about this:
self.inp.put_back(fcr);
}
if self.need_min > 1024 * 8 {
let msg = format!("spurious EventChunker asks for need_min {}", self.need_min);
warn!("{}", msg);
self.errored = true;
Ready(Some(Err(Error::with_msg(msg))))
} else {
let x = self.need_min;
self.inp.set_need_min(x);
self.data_since_last_stats += 1;
let ret = EventChunkerItem::Events(res.events);
let ret = Ok(ret);
Ready(Some(ret))
}
}
Err(e) => {
error!("EventChunker parse_buf returned error {:?}", e);
self.errored = true;
Ready(Some(Err(e.into())))
}
}
}
Ready(Some(Err(e))) => {
self.errored = true;
Ready(Some(Err(e)))
}
Ready(None) => {
self.data_emit_complete = true;
continue 'outer;
}
Pending => Pending,
}
};
}
}
}