Try to defend better against unexpected shapes where possible
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
use chrono::Utc;
|
||||
use err::Error;
|
||||
use netpod::log::*;
|
||||
use netpod::timeunits::SEC;
|
||||
use netpod::AggKind;
|
||||
use netpod::Cluster;
|
||||
use netpod::NodeConfigCached;
|
||||
|
||||
@@ -186,7 +186,7 @@ impl Stream for EventChunkerMultifile {
|
||||
let file = ofs.files.pop().unwrap();
|
||||
let path = file.path;
|
||||
let msg = format!("handle OFS {:?}", ofs);
|
||||
let item = LogItem::quick(Level::DEBUG, msg);
|
||||
let item = LogItem::from_node(self.node_ix, Level::DEBUG, msg);
|
||||
match file.file {
|
||||
Some(file) => {
|
||||
let inp = Box::pin(crate::file_content_stream(
|
||||
@@ -200,6 +200,7 @@ impl Stream for EventChunkerMultifile {
|
||||
self.fetch_info.clone(),
|
||||
self.range.clone(),
|
||||
self.event_chunker_conf.clone(),
|
||||
self.node_ix,
|
||||
path.clone(),
|
||||
self.expand,
|
||||
);
|
||||
@@ -211,12 +212,12 @@ impl Stream for EventChunkerMultifile {
|
||||
Ready(Some(Ok(StreamItem::Log(item))))
|
||||
} else if ofs.files.len() == 0 {
|
||||
let msg = format!("handle OFS {:?} NO FILES", ofs);
|
||||
let item = LogItem::quick(Level::DEBUG, msg);
|
||||
let item = LogItem::from_node(self.node_ix, Level::DEBUG, msg);
|
||||
Ready(Some(Ok(StreamItem::Log(item))))
|
||||
} else {
|
||||
let paths: Vec<_> = ofs.files.iter().map(|x| &x.path).collect();
|
||||
let msg = format!("handle OFS MERGED timebin {} {:?}", ofs.timebin, paths);
|
||||
let item = LogItem::quick(Level::DEBUG, msg);
|
||||
let item = LogItem::from_node(self.node_ix, Level::DEBUG, msg);
|
||||
let mut chunkers = Vec::new();
|
||||
for of in ofs.files {
|
||||
if let Some(file) = of.file {
|
||||
@@ -231,6 +232,7 @@ impl Stream for EventChunkerMultifile {
|
||||
self.fetch_info.clone(),
|
||||
self.range.clone(),
|
||||
self.event_chunker_conf.clone(),
|
||||
self.node_ix,
|
||||
of.path.clone(),
|
||||
self.expand,
|
||||
);
|
||||
@@ -250,7 +252,8 @@ impl Stream for EventChunkerMultifile {
|
||||
},
|
||||
Ready(None) => {
|
||||
self.done = true;
|
||||
let item = LogItem::quick(
|
||||
let item = LogItem::from_node(
|
||||
self.node_ix,
|
||||
Level::DEBUG,
|
||||
format!(
|
||||
"EventChunkerMultifile used {} datafiles beg {} end {} node_ix {}",
|
||||
|
||||
@@ -5,6 +5,7 @@ use err::Error;
|
||||
use err::ThisError;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::streamitem::LogItem;
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
use items_0::streamitem::StatsItem;
|
||||
use items_0::streamitem::StreamItem;
|
||||
@@ -23,6 +24,7 @@ use netpod::Shape;
|
||||
use parse::channelconfig::CompressionMethod;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::collections::VecDeque;
|
||||
use std::io::Cursor;
|
||||
use std::path::PathBuf;
|
||||
use std::pin::Pin;
|
||||
@@ -37,25 +39,15 @@ pub enum DataParseError {
|
||||
DataFrameLengthMismatch,
|
||||
FileHeaderTooShort,
|
||||
BadVersionTag,
|
||||
#[error("HeaderTooLarge")]
|
||||
HeaderTooLarge,
|
||||
#[error("Utf8Error")]
|
||||
Utf8Error,
|
||||
#[error("EventTooShort")]
|
||||
EventTooShort,
|
||||
#[error("EventTooLong")]
|
||||
EventTooLong,
|
||||
#[error("TooManyBeforeRange")]
|
||||
TooManyBeforeRange,
|
||||
#[error("EventWithOptional")]
|
||||
EventWithOptional,
|
||||
#[error("BadTypeIndex")]
|
||||
BadTypeIndex,
|
||||
#[error("WaveShapeWithoutEventArray")]
|
||||
WaveShapeWithoutEventArray,
|
||||
#[error("ShapedWithoutDims")]
|
||||
ShapedWithoutDims,
|
||||
#[error("TooManyDims")]
|
||||
TooManyDims,
|
||||
UnknownCompression,
|
||||
BadCompresionBlockSize,
|
||||
@@ -76,6 +68,7 @@ pub struct EventChunker {
|
||||
data_emit_complete: bool,
|
||||
final_stats_sent: bool,
|
||||
parsed_bytes: u64,
|
||||
node_ix: usize,
|
||||
dbg_path: PathBuf,
|
||||
last_ts: u64,
|
||||
expand: bool,
|
||||
@@ -87,6 +80,7 @@ pub struct EventChunker {
|
||||
repeated_ts_count: usize,
|
||||
config_mismatch_discard: usize,
|
||||
discard_count: usize,
|
||||
log_items: VecDeque<LogItem>,
|
||||
}
|
||||
|
||||
impl Drop for EventChunker {
|
||||
@@ -161,14 +155,15 @@ impl EventChunker {
|
||||
fetch_info: SfChFetchInfo,
|
||||
range: NanoRange,
|
||||
stats_conf: EventChunkerConf,
|
||||
node_ix: usize,
|
||||
dbg_path: PathBuf,
|
||||
expand: bool,
|
||||
) -> Self {
|
||||
debug!("{}::{}", Self::self_name(), "from_start");
|
||||
debug!("{}::{} node {}", Self::self_name(), "from_start", node_ix);
|
||||
let need_min_max = match fetch_info.shape() {
|
||||
Shape::Scalar => 1024 * 8,
|
||||
Shape::Wave(_) => 1024 * 32,
|
||||
Shape::Image(_, _) => 1024 * 1024 * 40,
|
||||
Shape::Image(_, _) => 1024 * 1024 * 80,
|
||||
};
|
||||
let mut inp = NeedMinBuffer::new(inp);
|
||||
inp.set_need_min(6);
|
||||
@@ -188,6 +183,7 @@ impl EventChunker {
|
||||
final_stats_sent: false,
|
||||
parsed_bytes: 0,
|
||||
dbg_path,
|
||||
node_ix,
|
||||
last_ts: 0,
|
||||
expand,
|
||||
decomp_dt_histo: HistoLog2::new(8),
|
||||
@@ -198,6 +194,7 @@ impl EventChunker {
|
||||
repeated_ts_count: 0,
|
||||
config_mismatch_discard: 0,
|
||||
discard_count: 0,
|
||||
log_items: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -207,28 +204,30 @@ impl EventChunker {
|
||||
fetch_info: SfChFetchInfo,
|
||||
range: NanoRange,
|
||||
stats_conf: EventChunkerConf,
|
||||
node_ix: usize,
|
||||
dbg_path: PathBuf,
|
||||
expand: bool,
|
||||
) -> Self {
|
||||
debug!("{}::{}", Self::self_name(), "from_event_boundary");
|
||||
let mut ret = Self::from_start(inp, fetch_info, range, stats_conf, dbg_path, expand);
|
||||
debug!("{}::{} node {}", Self::self_name(), "from_event_boundary", node_ix);
|
||||
let mut ret = Self::from_start(inp, fetch_info, range, stats_conf, node_ix, dbg_path, expand);
|
||||
ret.state = DataFileState::Event;
|
||||
ret.need_min = 4;
|
||||
ret.inp.set_need_min(4);
|
||||
ret
|
||||
}
|
||||
|
||||
fn parse_buf(&mut self, buf: &mut BytesMut) -> Result<ParseResult, Error> {
|
||||
fn parse_buf(&mut self, buf: &mut BytesMut) -> Result<(ParseResult, Vec<LogItem>), Error> {
|
||||
span!(Level::INFO, "EventChunker::parse_buf")
|
||||
.in_scope(|| self.parse_buf_inner(buf))
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))
|
||||
}
|
||||
|
||||
fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result<ParseResult, DataParseError> {
|
||||
fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result<(ParseResult, Vec<LogItem>), DataParseError> {
|
||||
use byteorder::ReadBytesExt;
|
||||
use byteorder::BE;
|
||||
trace!("parse_buf_inner buf len {}", buf.len());
|
||||
let mut ret = EventFull::empty();
|
||||
let mut log_items = Vec::new();
|
||||
let mut parsed_bytes = 0;
|
||||
loop {
|
||||
if (buf.len() as u32) < self.need_min {
|
||||
@@ -274,9 +273,9 @@ impl EventChunker {
|
||||
return Err(DataParseError::EventTooShort);
|
||||
}
|
||||
match self.fetch_info.shape() {
|
||||
Shape::Scalar if len > 512 => return Err(DataParseError::EventTooLong),
|
||||
Shape::Wave(_) if len > 8 * 1024 * 256 => return Err(DataParseError::EventTooLong),
|
||||
Shape::Image(_, _) if len > 1024 * 1024 * 40 => return Err(DataParseError::EventTooLong),
|
||||
Shape::Scalar if len > 1000 => return Err(DataParseError::EventTooLong),
|
||||
Shape::Wave(_) if len > 500000 * 8 => return Err(DataParseError::EventTooLong),
|
||||
Shape::Image(_, _) if len > 3200 * 3200 * 8 => return Err(DataParseError::EventTooLong),
|
||||
_ => {}
|
||||
}
|
||||
let len = len as u32;
|
||||
@@ -405,6 +404,8 @@ impl EventChunker {
|
||||
for i1 in 0..shape_dim {
|
||||
shape_lens[i1 as usize] = sl.read_u32::<BE>().unwrap();
|
||||
}
|
||||
// NOTE the databuffer does not fill in this correctly, the data on disk
|
||||
// contains often just "scalar" even though it is a waveform.
|
||||
let shape_this = {
|
||||
if is_shaped {
|
||||
if shape_dim == 1 {
|
||||
@@ -433,6 +434,32 @@ impl EventChunker {
|
||||
} else {
|
||||
None
|
||||
};
|
||||
if self.fetch_info.scalar_type().ne(&scalar_type) {
|
||||
discard = true;
|
||||
let msg = format!(
|
||||
"scalar_type mismatch {:?} {:?} {:?}",
|
||||
scalar_type,
|
||||
self.fetch_info.scalar_type(),
|
||||
self.dbg_path,
|
||||
);
|
||||
let item = LogItem::from_node(self.node_ix, Level::WARN, msg);
|
||||
log_items.push(item);
|
||||
}
|
||||
if false {
|
||||
// Unfortunately the shape stored by databuffer recording on disk is not reliable
|
||||
// especially for waveforms it will wrongly indicate scalar. So this is unusable.
|
||||
if self.fetch_info.shape().ne(&shape_this) {
|
||||
discard = true;
|
||||
let msg = format!(
|
||||
"shape mismatch {:?} {:?} {:?}",
|
||||
shape_this,
|
||||
self.fetch_info.shape(),
|
||||
self.dbg_path,
|
||||
);
|
||||
let item = LogItem::from_node(self.node_ix, Level::WARN, msg);
|
||||
log_items.push(item);
|
||||
}
|
||||
}
|
||||
let p1 = sl.position();
|
||||
let n1 = p1 - p0;
|
||||
let n2 = len as u64 - n1 - 4;
|
||||
@@ -449,6 +476,35 @@ impl EventChunker {
|
||||
shape_this,
|
||||
comp_this,
|
||||
);
|
||||
match ret.shape_derived(ret.len() - 1, self.fetch_info.shape()) {
|
||||
Ok(sh) => {
|
||||
if sh.ne(self.fetch_info.shape()) {
|
||||
self.discard_count += 1;
|
||||
ret.pop_back();
|
||||
let msg = format!(
|
||||
"shape_derived mismatch {:?} {:?} {:?} {:?}",
|
||||
self.fetch_info.scalar_type(),
|
||||
self.fetch_info.shape(),
|
||||
sh,
|
||||
self.dbg_path,
|
||||
);
|
||||
let item = LogItem::from_node(self.node_ix, Level::WARN, msg);
|
||||
log_items.push(item);
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
self.discard_count += 1;
|
||||
ret.pop_back();
|
||||
let msg = format!(
|
||||
"shape_derived error {:?} {:?} {:?}",
|
||||
self.fetch_info.scalar_type(),
|
||||
self.fetch_info.shape(),
|
||||
self.dbg_path,
|
||||
);
|
||||
let item = LogItem::from_node(self.node_ix, Level::WARN, msg);
|
||||
log_items.push(item);
|
||||
}
|
||||
}
|
||||
}
|
||||
buf.advance(len as usize);
|
||||
parsed_bytes += len as u64;
|
||||
@@ -457,10 +513,11 @@ impl EventChunker {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(ParseResult {
|
||||
let ret = ParseResult {
|
||||
events: ret,
|
||||
parsed_bytes,
|
||||
})
|
||||
};
|
||||
Ok((ret, log_items))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -472,6 +529,8 @@ impl Stream for EventChunker {
|
||||
'outer: loop {
|
||||
break if self.completed {
|
||||
panic!("EventChunker poll_next on completed");
|
||||
} else if let Some(item) = self.log_items.pop_front() {
|
||||
Ready(Some(Ok(StreamItem::Log(item))))
|
||||
} else if self.errored {
|
||||
self.completed = true;
|
||||
Ready(None)
|
||||
@@ -514,9 +573,11 @@ impl Stream for EventChunker {
|
||||
fcr.duration().as_millis()
|
||||
);
|
||||
}
|
||||
let r = self.parse_buf(fcr.buf_mut());
|
||||
match r {
|
||||
Ok(res) => {
|
||||
match self.parse_buf(fcr.buf_mut()) {
|
||||
Ok((res, log_items)) => {
|
||||
for item in log_items {
|
||||
self.log_items.push_back(item);
|
||||
}
|
||||
self.parsed_bytes += res.parsed_bytes;
|
||||
if fcr.buf().len() > 0 {
|
||||
// TODO gather stats about this:
|
||||
|
||||
@@ -7,7 +7,6 @@ use items_2::eventfull::EventFull;
|
||||
use items_2::merger::Merger;
|
||||
use netpod::log::*;
|
||||
use netpod::Cluster;
|
||||
use netpod::SfChFetchInfo;
|
||||
use query::api4::events::EventsSubQuery;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
|
||||
Reference in New Issue
Block a user