WIP TODO propagate the error item

This commit is contained in:
Dominik Werder
2021-04-24 21:26:11 +02:00
parent a2047d292c
commit 832cc1747e
+91 -51
View File
@@ -466,7 +466,10 @@ pub struct EventChunker {
inp: NeedMinBuffer, inp: NeedMinBuffer,
polled: u32, polled: u32,
state: DataFileState, state: DataFileState,
need_min: u32,
channel_config: ChannelConfig, channel_config: ChannelConfig,
errored: bool,
completed: bool,
} }
enum DataFileState { enum DataFileState {
@@ -485,7 +488,10 @@ impl EventChunker {
inp: inp, inp: inp,
polled: 0, polled: 0,
state: DataFileState::FileHeader, state: DataFileState::FileHeader,
need_min: 6,
channel_config, channel_config,
errored: false,
completed: false,
} }
} }
@@ -499,12 +505,10 @@ impl EventChunker {
// what I've consumed from the buffer // what I've consumed from the buffer
// how many bytes I need min to make progress // how many bytes I need min to make progress
let mut ret = EventFull::empty(); let mut ret = EventFull::empty();
let mut need_min = 0 as u32;
use byteorder::{ReadBytesExt, BE}; use byteorder::{ReadBytesExt, BE};
error!(" ???????????????????????? Why should need_min ever be zero?");
info!("parse_buf buf len {} need_min {}", buf.len(), need_min);
loop { loop {
if (buf.len() as u32) < need_min { info!("parse_buf LOOP buf len {} need_min {}", buf.len(), self.need_min);
if (buf.len() as u32) < self.need_min {
break; break;
} }
match self.state { match self.state {
@@ -518,7 +522,7 @@ impl EventChunker {
let totlen = len as usize + 2; let totlen = len as usize + 2;
if buf.len() < totlen { if buf.len() < totlen {
info!("parse_buf not enough A totlen {}", totlen); info!("parse_buf not enough A totlen {}", totlen);
need_min = totlen as u32; self.need_min = totlen as u32;
break; break;
} else { } else {
sl.advance(len as usize - 8); sl.advance(len as usize - 8);
@@ -527,42 +531,49 @@ impl EventChunker {
let s1 = String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec()).unwrap(); let s1 = String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec()).unwrap();
info!("channel name {} len {} len2 {}", s1, len, len2); info!("channel name {} len {} len2 {}", s1, len, len2);
self.state = DataFileState::Event; self.state = DataFileState::Event;
need_min = 4; self.need_min = 4;
buf.advance(totlen); buf.advance(totlen);
} }
} }
DataFileState::Event => { DataFileState::Event => {
let mut sl = std::io::Cursor::new(buf.as_ref()); let mut sl = std::io::Cursor::new(buf.as_ref());
let len = sl.read_i32::<BE>().unwrap(); let len = sl.read_i32::<BE>().unwrap();
//info!("event len {}", len); assert!(len >= 20 && len < 1024 * 1024 * 10);
let len = len as u32;
info!(
"+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+- event len {}",
len,
);
if (buf.len() as u32) < 20 { if (buf.len() as u32) < 20 {
// TODO gather stats about how often we find not enough input // TODO gather stats about how often we find not enough input
//info!("parse_buf not enough B"); //info!("parse_buf not enough B");
need_min = len as u32; self.need_min = len as u32;
break; break;
} else if (buf.len() as u32) < len as u32 { } else if (buf.len() as u32) < len {
// TODO this is just for testing {
let mut sl = std::io::Cursor::new(buf.as_ref()); // TODO this is just for testing
sl.read_i32::<BE>().unwrap(); let mut sl = std::io::Cursor::new(buf.as_ref());
sl.read_i64::<BE>().unwrap(); sl.read_i32::<BE>().unwrap();
let _ts = sl.read_i64::<BE>().unwrap(); sl.read_i64::<BE>().unwrap();
//info!("parse_buf not enough C len {} have {} ts {}", len, buf.len(), ts); let ts = sl.read_i64::<BE>().unwrap();
need_min = len as u32; info!("parse_buf not enough C len {} have {} ts {}", len, buf.len(), ts);
}
self.need_min = len as u32;
break; break;
} else { } else {
let mut sl = std::io::Cursor::new(buf.as_ref()); let mut sl = std::io::Cursor::new(buf.as_ref());
let len1b = sl.read_i32::<BE>().unwrap(); let len1b = sl.read_i32::<BE>().unwrap();
assert!(len == len1b); assert!(len == len1b as u32);
sl.read_i64::<BE>().unwrap(); sl.read_i64::<BE>().unwrap();
let ts = sl.read_i64::<BE>().unwrap() as u64; let ts = sl.read_i64::<BE>().unwrap() as u64;
let pulse = sl.read_i64::<BE>().unwrap() as u64; let pulse = sl.read_i64::<BE>().unwrap() as u64;
sl.read_i64::<BE>().unwrap(); sl.read_i64::<BE>().unwrap();
let _status = sl.read_i8().unwrap(); let status = sl.read_i8().unwrap();
let _severity = sl.read_i8().unwrap(); let severity = sl.read_i8().unwrap();
let _optional = sl.read_i32::<BE>().unwrap(); let optional = sl.read_i32::<BE>().unwrap();
assert!(_status == 0); assert!(status == 0);
assert!(_severity == 0); assert!(severity == 0);
assert!(_optional == -1); assert!(optional == -1);
let type_flags = sl.read_u8().unwrap(); let type_flags = sl.read_u8().unwrap();
let type_index = sl.read_u8().unwrap(); let type_index = sl.read_u8().unwrap();
assert!(type_index <= 13); assert!(type_index <= 13);
@@ -604,7 +615,13 @@ impl EventChunker {
)))?; )))?;
} }
} }
_ => panic!(), Shape::Scalar => {
if is_array {
Err(Error::with_msg(format!(
"ChannelConfig expects Scalar but we find event is_array"
)))?;
}
}
} }
let decomp_bytes = (type_size * ele_count as u32) as usize; let decomp_bytes = (type_size * ele_count as u32) as usize;
let mut decomp = BytesMut::with_capacity(decomp_bytes); let mut decomp = BytesMut::with_capacity(decomp_bytes);
@@ -612,31 +629,36 @@ impl EventChunker {
decomp.set_len(decomp_bytes); decomp.set_len(decomp_bytes);
} }
//debug!("try decompress value_bytes {} ele_size {} ele_count {} type_index {}", value_bytes, ele_size, ele_count, type_index); //debug!("try decompress value_bytes {} ele_size {} ele_count {} type_index {}", value_bytes, ele_size, ele_count, type_index);
let c1 = bitshuffle_decompress( match bitshuffle_decompress(
&buf.as_ref()[p1 as usize..], &buf.as_ref()[p1 as usize..],
&mut decomp, &mut decomp,
ele_count as usize, ele_count as usize,
ele_size as usize, ele_size as usize,
0, 0,
) ) {
.unwrap(); Ok(c1) => {
//debug!("decompress result c1 {} k1 {}", c1, k1); assert!(c1 as u32 == k1);
assert!(c1 as u32 == k1); debug!("decompress result c1 {} k1 {}", c1, k1);
ret.add_event(ts, pulse, Some(decomp), ScalarType::from_dtype_index(type_index)); ret.add_event(ts, pulse, Some(decomp), ScalarType::from_dtype_index(type_index));
}
Err(e) => {
Err(Error::with_msg(format!("decompression failed {:?}", e)))?;
}
};
} else { } else {
todo!() Err(Error::with_msg(format!(
"TODO uncompressed event parsing not yet implemented"
)))?;
} }
info!("advance and reset need_min");
buf.advance(len as usize); buf.advance(len as usize);
need_min = 4; self.need_min = 4;
} }
} }
} }
//i1 += 1;
} }
Ok(ParseResult { info!("AFTER PARSE LOOP len {}", ret.tss.len());
events: ret, Ok(ParseResult { events: ret })
need_min: need_min,
})
} }
} }
@@ -662,7 +684,6 @@ fn type_size(ix: u8) -> u32 {
struct ParseResult { struct ParseResult {
events: EventFull, events: EventFull,
need_min: u32,
} }
impl Stream for EventChunker { impl Stream for EventChunker {
@@ -670,16 +691,25 @@ impl Stream for EventChunker {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use Poll::*; use Poll::*;
if self.completed {
panic!("EventChunker poll_next on completed");
}
if self.errored {
self.completed = true;
return Ready(None);
}
self.polled += 1; self.polled += 1;
if self.polled >= 2000000 { if self.polled >= 20000 {
warn!("EventChunker poll limit reached"); warn!("EventChunker poll limit reached");
return Poll::Ready(None); self.errored = true;
return Poll::Ready(Some(Err(Error::with_msg(format!("EventChunker poll limit reached")))));
} }
let g = &mut self.inp; let g = &mut self.inp;
pin_mut!(g); pin_mut!(g);
info!("EventChunker call input poll_next");
match g.poll_next(cx) { match g.poll_next(cx) {
Ready(Some(Ok(mut buf))) => { Ready(Some(Ok(mut buf))) => {
//info!("EventChunker got buffer len {}", buf.len()); info!("EventChunker got buffer len {}", buf.len());
let r = self.parse_buf(&mut buf); let r = self.parse_buf(&mut buf);
match r { match r {
Ok(res) => { Ok(res) => {
@@ -688,22 +718,32 @@ impl Stream for EventChunker {
//info!("parse_buf returned {} leftover bytes to me", buf.len()); //info!("parse_buf returned {} leftover bytes to me", buf.len());
self.inp.put_back(buf); self.inp.put_back(buf);
} }
if res.need_min > 8000 { if self.need_min > 1024 * 8 {
warn!("spurious EventChunker asks for need_min {}", res.need_min); let msg = format!("spurious EventChunker asks for need_min {}", self.need_min);
Ready(Some(Err(Error::with_msg(format!( warn!("{}", msg);
"spurious EventChunker asks for need_min {}", self.errored = true;
res.need_min Ready(Some(Err(Error::with_msg(msg))))
)))))
} else { } else {
self.inp.set_need_min(res.need_min); let x = self.need_min;
self.inp.set_need_min(x);
Ready(Some(Ok(res.events))) Ready(Some(Ok(res.events)))
} }
} }
Err(e) => Ready(Some(Err(e.into()))), Err(e) => {
error!("EventChunker parse_buf returned error {:?}", e);
self.errored = true;
Ready(Some(Err(e.into())))
}
} }
} }
Ready(Some(Err(e))) => Ready(Some(Err(e))), Ready(Some(Err(e))) => {
Ready(None) => Ready(None), self.errored = true;
Ready(Some(Err(e)))
}
Ready(None) => {
self.completed = true;
Ready(None)
}
Pending => Pending, Pending => Pending,
} }
} }