diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml
index 381f916..412bbba 100644
--- a/crates/daqbuffer/Cargo.toml
+++ b/crates/daqbuffer/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "daqbuffer"
-version = "0.4.4"
+version = "0.4.5-alpha.0"
 authors = ["Dominik Werder "]
 edition = "2021"
diff --git a/crates/disk/src/cache.rs b/crates/disk/src/cache.rs
index d1aeb38..1461c42 100644
--- a/crates/disk/src/cache.rs
+++ b/crates/disk/src/cache.rs
@@ -1,7 +1,6 @@
 use chrono::Utc;
 use err::Error;
 use netpod::log::*;
-use netpod::timeunits::SEC;
 use netpod::AggKind;
 use netpod::Cluster;
 use netpod::NodeConfigCached;
diff --git a/crates/disk/src/eventblobs.rs b/crates/disk/src/eventblobs.rs
index dc3d274..1c071c8 100644
--- a/crates/disk/src/eventblobs.rs
+++ b/crates/disk/src/eventblobs.rs
@@ -186,7 +186,7 @@ impl Stream for EventChunkerMultifile {
                     let file = ofs.files.pop().unwrap();
                     let path = file.path;
                     let msg = format!("handle OFS {:?}", ofs);
-                    let item = LogItem::quick(Level::DEBUG, msg);
+                    let item = LogItem::from_node(self.node_ix, Level::DEBUG, msg);
                     match file.file {
                         Some(file) => {
                             let inp = Box::pin(crate::file_content_stream(
@@ -200,6 +200,7 @@ impl Stream for EventChunkerMultifile {
                                 self.fetch_info.clone(),
                                 self.range.clone(),
                                 self.event_chunker_conf.clone(),
+                                self.node_ix,
                                 path.clone(),
                                 self.expand,
                             );
@@ -211,12 +212,12 @@ impl Stream for EventChunkerMultifile {
                     Ready(Some(Ok(StreamItem::Log(item))))
                 } else if ofs.files.len() == 0 {
                     let msg = format!("handle OFS {:?} NO FILES", ofs);
-                    let item = LogItem::quick(Level::DEBUG, msg);
+                    let item = LogItem::from_node(self.node_ix, Level::DEBUG, msg);
                     Ready(Some(Ok(StreamItem::Log(item))))
                 } else {
                     let paths: Vec<_> = ofs.files.iter().map(|x| &x.path).collect();
                     let msg = format!("handle OFS MERGED timebin {} {:?}", ofs.timebin, paths);
-                    let item = LogItem::quick(Level::DEBUG, msg);
+                    let item = LogItem::from_node(self.node_ix, Level::DEBUG, msg);
                     let mut chunkers = Vec::new();
                     for of in ofs.files {
                         if let Some(file) = of.file {
@@ -231,6 +232,7 @@ impl Stream for EventChunkerMultifile {
                                 self.fetch_info.clone(),
                                 self.range.clone(),
                                 self.event_chunker_conf.clone(),
+                                self.node_ix,
                                 of.path.clone(),
                                 self.expand,
                             );
@@ -250,7 +252,8 @@ impl Stream for EventChunkerMultifile {
                 },
                 Ready(None) => {
                     self.done = true;
-                    let item = LogItem::quick(
+                    let item = LogItem::from_node(
+                        self.node_ix,
                         Level::DEBUG,
                         format!(
                             "EventChunkerMultifile used {} datafiles beg {} end {} node_ix {}",
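The hunks above thread the real node index into every emitted LogItem instead of the old hard-coded placeholder (see the items_0 hunk further down). A minimal self-contained sketch of that pattern, with simplified stand-ins for the real StreamItem/LogItem types:

use std::collections::VecDeque;

#[derive(Debug)]
struct LogItem {
    node_ix: u32,
    msg: String,
}

impl LogItem {
    // Mirrors the patch: the emitting node's index travels with the message
    // instead of the old hard-coded placeholder in LogItem::quick.
    fn from_node(node_ix: usize, msg: String) -> Self {
        Self { node_ix: node_ix as u32, msg }
    }
}

#[derive(Debug)]
enum StreamItem<T> {
    Log(LogItem),
    Data(T),
}

fn main() {
    let mut q: VecDeque<StreamItem<u64>> = VecDeque::new();
    q.push_back(StreamItem::Log(LogItem::from_node(3, "handle OFS".into())));
    q.push_back(StreamItem::Data(42));
    for it in q {
        println!("{it:?}");
    }
}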
#[error("BadTypeIndex")] BadTypeIndex, - #[error("WaveShapeWithoutEventArray")] WaveShapeWithoutEventArray, - #[error("ShapedWithoutDims")] ShapedWithoutDims, - #[error("TooManyDims")] TooManyDims, UnknownCompression, BadCompresionBlockSize, @@ -76,6 +68,7 @@ pub struct EventChunker { data_emit_complete: bool, final_stats_sent: bool, parsed_bytes: u64, + node_ix: usize, dbg_path: PathBuf, last_ts: u64, expand: bool, @@ -87,6 +80,7 @@ pub struct EventChunker { repeated_ts_count: usize, config_mismatch_discard: usize, discard_count: usize, + log_items: VecDeque, } impl Drop for EventChunker { @@ -161,14 +155,15 @@ impl EventChunker { fetch_info: SfChFetchInfo, range: NanoRange, stats_conf: EventChunkerConf, + node_ix: usize, dbg_path: PathBuf, expand: bool, ) -> Self { - debug!("{}::{}", Self::self_name(), "from_start"); + debug!("{}::{} node {}", Self::self_name(), "from_start", node_ix); let need_min_max = match fetch_info.shape() { Shape::Scalar => 1024 * 8, Shape::Wave(_) => 1024 * 32, - Shape::Image(_, _) => 1024 * 1024 * 40, + Shape::Image(_, _) => 1024 * 1024 * 80, }; let mut inp = NeedMinBuffer::new(inp); inp.set_need_min(6); @@ -188,6 +183,7 @@ impl EventChunker { final_stats_sent: false, parsed_bytes: 0, dbg_path, + node_ix, last_ts: 0, expand, decomp_dt_histo: HistoLog2::new(8), @@ -198,6 +194,7 @@ impl EventChunker { repeated_ts_count: 0, config_mismatch_discard: 0, discard_count: 0, + log_items: VecDeque::new(), } } @@ -207,28 +204,30 @@ impl EventChunker { fetch_info: SfChFetchInfo, range: NanoRange, stats_conf: EventChunkerConf, + node_ix: usize, dbg_path: PathBuf, expand: bool, ) -> Self { - debug!("{}::{}", Self::self_name(), "from_event_boundary"); - let mut ret = Self::from_start(inp, fetch_info, range, stats_conf, dbg_path, expand); + debug!("{}::{} node {}", Self::self_name(), "from_event_boundary", node_ix); + let mut ret = Self::from_start(inp, fetch_info, range, stats_conf, node_ix, dbg_path, expand); ret.state = DataFileState::Event; ret.need_min = 4; ret.inp.set_need_min(4); ret } - fn parse_buf(&mut self, buf: &mut BytesMut) -> Result { + fn parse_buf(&mut self, buf: &mut BytesMut) -> Result<(ParseResult, Vec), Error> { span!(Level::INFO, "EventChunker::parse_buf") .in_scope(|| self.parse_buf_inner(buf)) .map_err(|e| Error::with_msg_no_trace(format!("{e:?}"))) } - fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result { + fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result<(ParseResult, Vec), DataParseError> { use byteorder::ReadBytesExt; use byteorder::BE; trace!("parse_buf_inner buf len {}", buf.len()); let mut ret = EventFull::empty(); + let mut log_items = Vec::new(); let mut parsed_bytes = 0; loop { if (buf.len() as u32) < self.need_min { @@ -274,9 +273,9 @@ impl EventChunker { return Err(DataParseError::EventTooShort); } match self.fetch_info.shape() { - Shape::Scalar if len > 512 => return Err(DataParseError::EventTooLong), - Shape::Wave(_) if len > 8 * 1024 * 256 => return Err(DataParseError::EventTooLong), - Shape::Image(_, _) if len > 1024 * 1024 * 40 => return Err(DataParseError::EventTooLong), + Shape::Scalar if len > 1000 => return Err(DataParseError::EventTooLong), + Shape::Wave(_) if len > 500000 * 8 => return Err(DataParseError::EventTooLong), + Shape::Image(_, _) if len > 3200 * 3200 * 8 => return Err(DataParseError::EventTooLong), _ => {} } let len = len as u32; @@ -405,6 +404,8 @@ impl EventChunker { for i1 in 0..shape_dim { shape_lens[i1 as usize] = sl.read_u32::().unwrap(); } + // NOTE the databuffer does not fill in this 
@@ -405,6 +404,8 @@ impl EventChunker {
                     for i1 in 0..shape_dim {
                         shape_lens[i1 as usize] = sl.read_u32::<BE>().unwrap();
                     }
+                    // NOTE databuffer does not fill this in correctly: the data on disk
+                    // often contains just "scalar" even though the event is a waveform.
                     let shape_this = {
                         if is_shaped {
                             if shape_dim == 1 {
@@ -433,6 +434,32 @@ impl EventChunker {
                     } else {
                         None
                     };
+                    if self.fetch_info.scalar_type().ne(&scalar_type) {
+                        discard = true;
+                        let msg = format!(
+                            "scalar_type mismatch {:?} {:?} {:?}",
+                            scalar_type,
+                            self.fetch_info.scalar_type(),
+                            self.dbg_path,
+                        );
+                        let item = LogItem::from_node(self.node_ix, Level::WARN, msg);
+                        log_items.push(item);
+                    }
+                    if false {
+                        // Unfortunately the shape stored by databuffer on disk is not reliable:
+                        // especially for waveforms it will wrongly indicate scalar, so this check is unusable.
+                        if self.fetch_info.shape().ne(&shape_this) {
+                            discard = true;
+                            let msg = format!(
+                                "shape mismatch {:?} {:?} {:?}",
+                                shape_this,
+                                self.fetch_info.shape(),
+                                self.dbg_path,
+                            );
+                            let item = LogItem::from_node(self.node_ix, Level::WARN, msg);
+                            log_items.push(item);
+                        }
+                    }
                     let p1 = sl.position();
                     let n1 = p1 - p0;
                     let n2 = len as u64 - n1 - 4;
@@ -449,6 +476,35 @@ impl EventChunker {
                         shape_this,
                         comp_this,
                     );
+                    match ret.shape_derived(ret.len() - 1, self.fetch_info.shape()) {
+                        Ok(sh) => {
+                            if sh.ne(self.fetch_info.shape()) {
+                                self.discard_count += 1;
+                                ret.pop_back();
+                                let msg = format!(
+                                    "shape_derived mismatch {:?} {:?} {:?} {:?}",
+                                    self.fetch_info.scalar_type(),
+                                    self.fetch_info.shape(),
+                                    sh,
+                                    self.dbg_path,
+                                );
+                                let item = LogItem::from_node(self.node_ix, Level::WARN, msg);
+                                log_items.push(item);
+                            }
+                        }
+                        Err(_) => {
+                            self.discard_count += 1;
+                            ret.pop_back();
+                            let msg = format!(
+                                "shape_derived error {:?} {:?} {:?}",
+                                self.fetch_info.scalar_type(),
+                                self.fetch_info.shape(),
+                                self.dbg_path,
+                            );
+                            let item = LogItem::from_node(self.node_ix, Level::WARN, msg);
+                            log_items.push(item);
+                        }
+                    }
                 }
                 buf.advance(len as usize);
                 parsed_bytes += len as u64;
@@ -457,10 +513,11 @@ impl EventChunker {
             }
         }
     }
-        Ok(ParseResult {
+        let ret = ParseResult {
             events: ret,
             parsed_bytes,
-        })
+        };
+        Ok((ret, log_items))
     }
 }
@@ -472,6 +529,8 @@ impl Stream for EventChunker {
         'outer: loop {
             break if self.completed {
                 panic!("EventChunker poll_next on completed");
+            } else if let Some(item) = self.log_items.pop_front() {
+                Ready(Some(Ok(StreamItem::Log(item))))
             } else if self.errored {
                 self.completed = true;
                 Ready(None)
@@ -514,9 +573,11 @@ impl Stream for EventChunker {
                                 fcr.duration().as_millis()
                             );
                         }
-                        let r = self.parse_buf(fcr.buf_mut());
-                        match r {
-                            Ok(res) => {
+                        match self.parse_buf(fcr.buf_mut()) {
+                            Ok((res, log_items)) => {
+                                for item in log_items {
+                                    self.log_items.push_back(item);
+                                }
                                 self.parsed_bytes += res.parsed_bytes;
                                 if fcr.buf().len() > 0 {
                                     // TODO gather stats about this:
diff --git a/crates/disk/src/merge/mergedblobsfromremotes.rs b/crates/disk/src/merge/mergedblobsfromremotes.rs
index 94f1339..3dbb7c8 100644
--- a/crates/disk/src/merge/mergedblobsfromremotes.rs
+++ b/crates/disk/src/merge/mergedblobsfromremotes.rs
@@ -7,7 +7,6 @@ use items_2::eventfull::EventFull;
 use items_2::merger::Merger;
 use netpod::log::*;
 use netpod::Cluster;
-use netpod::SfChFetchInfo;
 use query::api4::events::EventsSubQuery;
 use std::future::Future;
 use std::pin::Pin;
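The poll_next change above queues warnings produced during parsing and emits them as StreamItem::Log before further data. Reduced to a synchronous iterator, the pattern looks like this (stand-in types, not the real Stream impl):

use std::collections::VecDeque;

struct Chunker {
    log_items: VecDeque<String>,
    data: VecDeque<u64>,
}

impl Iterator for Chunker {
    type Item = String;
    fn next(&mut self) -> Option<String> {
        // Queued log items always drain before the next data item.
        if let Some(m) = self.log_items.pop_front() {
            Some(format!("LOG {m}"))
        } else {
            self.data.pop_front().map(|d| format!("DATA {d}"))
        }
    }
}

fn main() {
    let c = Chunker {
        log_items: VecDeque::from(["shape mismatch, event discarded".to_string()]),
        data: VecDeque::from([1, 2]),
    };
    for it in c {
        println!("{it}");
    }
}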
diff --git a/crates/dq/src/bin/dq.rs b/crates/dq/src/bin/dq.rs
index 415bfb3..80bdcfa 100644
--- a/crates/dq/src/bin/dq.rs
+++ b/crates/dq/src/bin/dq.rs
@@ -94,7 +94,7 @@ pub fn main() -> Result<(), Error> {
     let stats_conf = EventChunkerConf {
         disk_stats_every: ByteSize::from_mb(2),
     };
-    let _chunks = EventChunker::from_start(inp, fetch_info, range, stats_conf, path.clone(), true);
+    let _chunks = EventChunker::from_start(inp, fetch_info, range, stats_conf, 0, path.clone(), true);
     err::todo();
     Ok(())
 }
diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs
index b0398e4..3bc8da2 100644
--- a/crates/httpret/src/api1.rs
+++ b/crates/httpret/src/api1.rs
@@ -638,7 +638,7 @@ impl DataApiPython3DataStream {
                             _ => {
                                 if do_decompress {
                                     let blob = b
-                                        .data_decompressed(i1, fetch_info.scalar_type(), fetch_info.shape())
+                                        .data_decompressed(i1)
                                         .map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
                                     let l1 = 17 + blob.len() as u32;
                                     d.put_u32(l1);
@@ -720,8 +720,8 @@ impl DataApiPython3DataStream {
                             }
                             Ok(BytesMut::new())
                         }
-                        StreamItem::Stats(k) => {
-                            //
+                        StreamItem::Stats(_k) => {
+                            // TODO collect the stats
                             Ok(BytesMut::new())
                         }
                     },
diff --git a/crates/httpret/src/api4/databuffer_tools.rs b/crates/httpret/src/api4/databuffer_tools.rs
index 66d665b..6a22d90 100644
--- a/crates/httpret/src/api4/databuffer_tools.rs
+++ b/crates/httpret/src/api4/databuffer_tools.rs
@@ -86,7 +86,7 @@ impl FindActiveHandler {
             .headers()
             .get(http::header::ACCEPT)
             .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
-        let url = {
+        let _url = {
             let s1 = format!("dummy:{}", req.uri());
             Url::parse(&s1)?
         };
@@ -175,7 +175,7 @@ impl XorShift32 {
 async fn find_active_inner(
     max: usize,
     ks: u32,
-    splits: &[u64],
+    _splits: &[u64],
     node: Node,
     tx: Sender>,
 ) -> Result<(), FindActiveError> {
@@ -240,7 +240,9 @@ async fn find_active_inner(
                             name: chname.into(),
                             totlen: sum,
                         };
-                        tx.send(Ok(x)).await;
+                        if let Err(e) = tx.send(Ok(x)).await {
+                            error!("{e}");
+                        }
                         count += 1;
                         if count >= max {
                             break 'outer;
@@ -275,7 +277,9 @@ async fn find_active(
         match find_active_inner(max, ks, &splits, node, tx).await {
             Ok(x) => x,
             Err(e) => {
-                tx2.send(Err(e)).await;
+                if let Err(e) = tx2.send(Err(e)).await {
+                    error!("{e}");
+                }
                 return;
             }
         }
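The databuffer_tools hunks stop ignoring the Result of channel sends: a send fails exactly when the receiving side has gone away, which deserves at least a log line. A sketch of the failure mode using std::sync::mpsc (the real code uses an async channel, but the behavior is analogous):

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel::<u32>();
    // Simulate the receiver side going away mid-stream.
    drop(rx);
    if let Err(e) = tx.send(1) {
        // The patch logs this instead of silently dropping the Result.
        eprintln!("{e}");
    }
}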
diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs
index 7a556c1..0a9fea6 100644
--- a/crates/httpret/src/api4/events.rs
+++ b/crates/httpret/src/api4/events.rs
@@ -1,4 +1,4 @@
-use crate::channelconfig::chconf_from_events_v1;
+use crate::channelconfig::chconf_from_events_quorum;
 use crate::err::Error;
 use crate::response;
 use crate::response_err;
@@ -73,8 +73,8 @@ async fn plain_events_binary(
 ) -> Result<Response<Body>, Error> {
     debug!("{:?}", req);
     let query = PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?;
-    let ch_conf = chconf_from_events_v1(&query, node_config).await?;
-    info!("plain_events_binary chconf_from_events_v1: {ch_conf:?}");
+    let ch_conf = chconf_from_events_quorum(&query, node_config).await?;
+    info!("plain_events_binary chconf_from_events_quorum: {ch_conf:?}");
     let s = stream::iter([Ok::<_, Error>(String::from("TODO_PREBINNED_BINARY_STREAM"))]);
     let ret = response(StatusCode::OK).body(Body::wrap_stream(s.map_err(Error::from)))?;
     Ok(ret)
@@ -91,11 +91,11 @@ async fn plain_events_json(
     let query = PlainEventsQuery::from_url(&url)?;
     info!("plain_events_json query {query:?}");
     // TODO handle None case better and return 404
-    let ch_conf = chconf_from_events_v1(&query, node_config)
+    let ch_conf = chconf_from_events_quorum(&query, node_config)
         .await
         .map_err(Error::from)?
         .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?;
-    info!("plain_events_json chconf_from_events_v1: {ch_conf:?}");
+    info!("plain_events_json chconf_from_events_quorum: {ch_conf:?}");
     let item =
         streams::plaineventsjson::plain_events_json(&query, ch_conf, reqid, &node_config.node_config.cluster).await;
     let item = match item {
diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs
index 00c4d55..fd8e219 100644
--- a/crates/httpret/src/channelconfig.rs
+++ b/crates/httpret/src/channelconfig.rs
@@ -31,7 +31,7 @@ use serde::Serialize;
 use std::collections::BTreeMap;
 use url::Url;
 
-pub async fn chconf_from_events_v1(
+pub async fn chconf_from_events_quorum(
     q: &PlainEventsQuery,
     ncc: &NodeConfigCached,
 ) -> Result<Option<ChConf>, Error> {
diff --git a/crates/items_0/src/streamitem.rs b/crates/items_0/src/streamitem.rs
index fb086f4..3b099d0 100644
--- a/crates/items_0/src/streamitem.rs
+++ b/crates/items_0/src/streamitem.rs
@@ -61,11 +61,11 @@ pub struct LogItem {
 }
 
 impl LogItem {
-    pub fn quick(level: Level, msg: String) -> Self {
+    pub fn from_node(node_ix: usize, level: Level, msg: String) -> Self {
         Self {
+            node_ix: node_ix as _,
             level,
             msg,
-            node_ix: 42,
         }
     }
 }
diff --git a/crates/items_2/src/channelevents.rs b/crates/items_2/src/channelevents.rs
index 3229978..6ea81ac 100644
--- a/crates/items_2/src/channelevents.rs
+++ b/crates/items_2/src/channelevents.rs
@@ -155,15 +155,23 @@ impl AsAnyMut for ChannelEvents {
 }
 
 mod serde_channel_events {
-    use super::{ChannelEvents, Events};
+    use super::ChannelEvents;
+    use super::Events;
     use crate::channelevents::ConnStatusEvent;
     use crate::eventsdim0::EventsDim0;
     use crate::eventsdim1::EventsDim1;
     use crate::eventsxbindim0::EventsXbinDim0;
     use items_0::subfr::SubFrId;
-    use serde::de::{self, EnumAccess, VariantAccess, Visitor};
+    use netpod::log::*;
+    use serde::de;
+    use serde::de::EnumAccess;
+    use serde::de::VariantAccess;
+    use serde::de::Visitor;
     use serde::ser::SerializeSeq;
-    use serde::{Deserialize, Deserializer, Serialize, Serializer};
+    use serde::Deserialize;
+    use serde::Deserializer;
+    use serde::Serialize;
+    use serde::Serializer;
     use std::fmt;
 
     struct EvRef<'a>(&'a dyn Events);
@@ -202,10 +210,10 @@ mod serde_channel_events {
         where
             A: de::SeqAccess<'de>,
         {
-            let e0: &str = seq.next_element()?.ok_or_else(|| de::Error::missing_field("[0] cty"))?;
-            let e1: u32 = seq.next_element()?.ok_or_else(|| de::Error::missing_field("[1] nty"))?;
-            if e0 == EventsDim0::::serde_id() {
-                match e1 {
+            let cty: &str = seq.next_element()?.ok_or_else(|| de::Error::missing_field("[0] cty"))?;
+            let nty: u32 = seq.next_element()?.ok_or_else(|| de::Error::missing_field("[1] nty"))?;
+            if cty == EventsDim0::::serde_id() {
+                match nty {
                     u8::SUB => {
                         let obj: EventsDim0<u8> =
                             seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
@@ -261,10 +269,18 @@ mod serde_channel_events {
                             seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
                         Ok(EvBox(Box::new(obj)))
                     }
-                    _ => Err(de::Error::custom(&format!("unknown nty {e1}"))),
+                    _ => {
+                        error!("TODO serde cty {cty} nty {nty}");
+                        Err(de::Error::custom(&format!("unknown nty {nty}")))
+                    }
                 }
-            } else if e0 == EventsDim1::::serde_id() {
-                match e1 {
+            } else if cty == EventsDim1::::serde_id() {
+                match nty {
+                    i64::SUB => {
+                        let obj: EventsDim1<i64> =
+                            seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
+                        Ok(EvBox(Box::new(obj)))
+                    }
                     f32::SUB => {
                         let obj: EventsDim1<f32> =
                             seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
@@ -280,10 +296,13 @@ mod serde_channel_events {
                             seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
                         Ok(EvBox(Box::new(obj)))
                     }
-                    _ => Err(de::Error::custom(&format!("unknown nty {e1}"))),
+                    _ => {
+                        error!("TODO serde cty {cty} nty {nty}");
+                        Err(de::Error::custom(&format!("unknown nty {nty}")))
+                    }
                 }
-            } else if e0 == EventsXbinDim0::::serde_id() {
-                match e1 {
+            } else if cty == EventsXbinDim0::::serde_id() {
+                match nty {
                     f32::SUB => {
                         let obj: EventsXbinDim0<f32> =
                             seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
@@ -299,10 +318,14 @@ mod serde_channel_events {
                             seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
                         Ok(EvBox(Box::new(obj)))
                     }
-                    _ => Err(de::Error::custom(&format!("unknown nty {e1}"))),
+                    _ => {
+                        error!("TODO serde cty {cty} nty {nty}");
+                        Err(de::Error::custom(&format!("unknown nty {nty}")))
+                    }
                 }
             } else {
-                Err(de::Error::custom(&format!("unknown cty {e0}")))
+                error!("TODO serde cty {cty} nty {nty}");
+                Err(de::Error::custom(&format!("unknown cty {cty}")))
             }
         }
     }
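ChannelEvents payloads are (de)serialized as a three-element sequence: a string content-type tag (cty), a numeric scalar-type tag (nty), then the payload object. A sketch of that layout using serde_json as a stand-in wire format; the tag values below are made up:

use serde_json::json;

fn main() {
    // Hypothetical tags; the real cty comes from serde_id() and the real nty
    // from the SubFrId constants (u8::SUB, f32::SUB, ...).
    let frame = json!(["EventsDim0", 7, { "tss": [1, 2], "values": [0.5, 0.6] }]);
    let cty = frame[0].as_str().unwrap();
    let nty = frame[1].as_u64().unwrap();
    println!("cty {cty} nty {nty} obj {}", frame[2]);
}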
diff --git a/crates/items_2/src/eventfull.rs b/crates/items_2/src/eventfull.rs
index 68f39af..d2a6173 100644
--- a/crates/items_2/src/eventfull.rs
+++ b/crates/items_2/src/eventfull.rs
@@ -126,6 +126,16 @@ impl EventFull {
             *u = shape.clone();
         }
     }
+
+    pub fn pop_back(&mut self) {
+        self.tss.pop_back();
+        self.pulses.pop_back();
+        self.blobs.pop_back();
+        self.scalar_types.pop_back();
+        self.be.pop_back();
+        self.shapes.pop_back();
+        self.comps.pop_back();
+    }
 }
 
 impl FrameTypeInnerStatic for EventFull {
@@ -230,9 +240,11 @@ pub enum DecompError {
     BadCompresionBlockSize,
     UnusedBytes,
     BitshuffleError,
+    ShapeMakesNoSense,
 }
 
-fn decompress(databuf: &[u8], type_size: u32, ele_count_2: u64, ele_count_exp: u64) -> Result<Vec<u8>, DecompError> {
+fn decompress(databuf: &[u8], type_size: u32) -> Result<Vec<u8>, DecompError> {
+    // TODO collect decompression stats
     let ts1 = Instant::now();
     if databuf.len() < 12 {
         return Err(DecompError::TooLittleInput);
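shape_derived (added in the next hunk) relies on the bitshuffle-lz4 framing: the first 8 bytes of a blob hold the uncompressed byte count, big-endian (which is why decompress insists on at least 12 bytes of input), so the element count is that value divided by the scalar type size. A sketch of the computation on a fabricated buffer:

fn ele_count(blob: &[u8], type_size: u32) -> Option<u32> {
    // First 8 bytes: uncompressed byte count, big-endian.
    let a: [u8; 8] = blob.get(0..8)?.try_into().ok()?;
    let value_bytes = u64::from_be_bytes(a) as u32;
    if value_bytes % type_size != 0 {
        // Not a whole number of elements: the blob can not match the type.
        None
    } else {
        Some(value_bytes / type_size)
    }
}

fn main() {
    let mut blob = vec![0u8; 16];
    blob[..8].copy_from_slice(&(256u64 * 8).to_be_bytes());
    // 256 elements of an 8-byte scalar type: Wave(256) under shape_derived's rules.
    assert_eq!(ele_count(&blob, 8), Some(256));
}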
@@ -276,22 +288,93 @@ fn decompress(databuf: &[u8], type_size: u32, ele_count_2: u64, ele_count_exp: u
 }
 
 impl EventFull {
+    /// Tries to infer the actual shape of the event from what's on disk and what we expect.
+    /// The event data on disk almost always indicates "scalar", even for waveforms.
+    /// If the data is compressed via bslz4 then we can infer the number of elements,
+    /// but we still don't know whether that's an image or a waveform.
+    /// Therefore, the function accepts the expected shape to at least make an assumption
+    /// about whether this is an image or a waveform.
+    pub fn shape_derived(&self, i: usize, shape_exp: &Shape) -> Result<Shape, DecompError> {
+        match shape_exp {
+            Shape::Scalar => match &self.comps[i] {
+                Some(_) => Err(DecompError::ShapeMakesNoSense),
+                None => Ok(Shape::Scalar),
+            },
+            Shape::Wave(_) => match &self.shapes[i] {
+                Shape::Scalar => match &self.comps[i] {
+                    Some(comp) => match comp {
+                        CompressionMethod::BitshuffleLZ4 => {
+                            let type_size = self.scalar_types[i].bytes() as u32;
+                            match self.blobs[i][0..8].try_into() {
+                                Ok(a) => {
+                                    let value_bytes = u64::from_be_bytes(a);
+                                    let value_bytes = value_bytes as u32;
+                                    if value_bytes % type_size != 0 {
+                                        Err(DecompError::ShapeMakesNoSense)
+                                    } else {
+                                        let n = value_bytes / type_size;
+                                        // Here we still can't know whether the disk contains a waveform or an image,
+                                        // so we assume that the user input is correct:
+                                        Ok(Shape::Wave(n))
+                                    }
+                                }
+                                Err(_) => Err(DecompError::ShapeMakesNoSense),
+                            }
+                        }
+                    },
+                    None => Err(DecompError::ShapeMakesNoSense),
+                },
+                Shape::Wave(s) => Ok(Shape::Wave(s.clone())),
+                Shape::Image(_, _) => Err(DecompError::ShapeMakesNoSense),
+            },
+            Shape::Image(a, b) => match &self.shapes[i] {
+                Shape::Scalar => match &self.comps[i] {
+                    Some(comp) => match comp {
+                        CompressionMethod::BitshuffleLZ4 => {
+                            let type_size = self.scalar_types[i].bytes() as u32;
+                            match self.blobs[i][0..8].try_into() {
+                                Ok(vb) => {
+                                    let value_bytes = u64::from_be_bytes(vb);
+                                    let value_bytes = value_bytes as u32;
+                                    if value_bytes % type_size != 0 {
+                                        Err(DecompError::ShapeMakesNoSense)
+                                    } else {
+                                        let n = value_bytes / type_size;
+                                        // Here we still can't know whether the disk contains a waveform or an image,
+                                        // so we assume that the user input is correct.
+                                        // NOTE
+                                        // We only know the number of pixels from the compressed blob, but we can't
+                                        // know the actual shape.
+                                        // Can only rely on user input.
+                                        Ok(Shape::Image(*a, *b))
+                                    }
+                                }
+                                Err(_) => Err(DecompError::ShapeMakesNoSense),
+                            }
+                        }
+                    },
+                    None => Err(DecompError::ShapeMakesNoSense),
+                },
+                Shape::Wave(_) => Err(DecompError::ShapeMakesNoSense),
+                Shape::Image(a, b) => Ok(Shape::Image(*a, *b)),
+            },
+        }
+    }
+
     pub fn data_raw(&self, i: usize) -> &[u8] {
         &self.blobs[i]
     }
 
-    pub fn data_decompressed(
-        &self,
-        i: usize,
-        _scalar_type: &ScalarType,
-        shape: &Shape,
-    ) -> Result<Cow<[u8]>, DecompError> {
+    pub fn data_decompressed(&self, i: usize) -> Result<Cow<[u8]>, DecompError> {
         if let Some(comp) = &self.comps[i] {
             match comp {
                 CompressionMethod::BitshuffleLZ4 => {
+                    // NOTE the event data on databuffer disk seems to contain the correct scalar type,
+                    // but the shape of the event record seems to always be "scalar", even for waveforms,
+                    // so we must derive the shape of the compressed data from the length of the
+                    // uncompressed byte blob and the byte size of the scalar type.
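When the shape derived from the blob disagrees with the configured shape, the eventchunker pops the event it just appended and queues a warning instead (see the disk/src/eventchunker.rs hunks earlier). The discard policy in miniature, with simplified types:

#[derive(Debug, Clone, PartialEq)]
enum Shape {
    Scalar,
    Wave(u32),
}

fn main() {
    let expected = Shape::Wave(256);
    let mut events = Vec::new();
    let mut warnings = Vec::new();
    for derived in [Shape::Wave(256), Shape::Scalar] {
        events.push(derived.clone());
        if derived != expected {
            // Mirror of discard_count += 1 plus EventFull::pop_back above.
            events.pop();
            warnings.push(format!("shape_derived mismatch {derived:?} vs {expected:?}"));
        }
    }
    assert_eq!(events.len(), 1);
    println!("{warnings:?}");
}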
                     let type_size = self.scalar_types[i].bytes() as u32;
-                    let ele_count = self.shapes[i].ele_count();
-                    let data = decompress(&self.blobs[i], type_size, ele_count, shape.ele_count())?;
+                    let data = decompress(&self.blobs[i], type_size)?;
                     Ok(Cow::Owned(data))
                 }
             }
diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs
index 817c2d7..4f0d0be 100644
--- a/crates/items_2/src/eventsdim0.rs
+++ b/crates/items_2/src/eventsdim0.rs
@@ -228,15 +228,19 @@ where
 
 #[derive(Debug)]
 pub struct EventsDim0Collector<NTY> {
-    vals: Option<EventsDim0<NTY>>,
+    vals: EventsDim0<NTY>,
     range_final: bool,
     timed_out: bool,
 }
 
 impl<NTY> EventsDim0Collector<NTY> {
+    pub fn self_name() -> &'static str {
+        any::type_name::<Self>()
+    }
+
     pub fn new() -> Self {
         Self {
-            vals: None,
+            vals: EventsDim0::empty(),
             range_final: false,
             timed_out: false,
         }
     }
 }
@@ -245,7 +249,7 @@ impl EventsDim0Collector {
 
 impl<NTY> WithLen for EventsDim0Collector<NTY> {
     fn len(&self) -> usize {
-        self.vals.as_ref().map_or(0, |x| x.tss.len())
+        self.vals.tss.len()
     }
 }
@@ -367,13 +371,9 @@ impl CollectorType for EventsDim0Collector {
     type Output = EventsDim0CollectorOutput<NTY>;
 
     fn ingest(&mut self, src: &mut Self::Input) {
-        if self.vals.is_none() {
-            self.vals = Some(EventsDim0::empty());
-        }
-        let vals = self.vals.as_mut().unwrap();
-        vals.tss.append(&mut src.tss);
-        vals.pulses.append(&mut src.pulses);
-        vals.values.append(&mut src.values);
+        self.vals.tss.append(&mut src.tss);
+        self.vals.pulses.append(&mut src.pulses);
+        self.vals.values.append(&mut src.values);
     }
 
     fn set_range_complete(&mut self) {
@@ -389,17 +389,12 @@ impl CollectorType for EventsDim0Collector {
         range: Option<SeriesRange>,
         _binrange: Option<BinnedRangeEnum>,
     ) -> Result<Self::Output, Error> {
-        let self_name = any::type_name::<Self>();
         // If we timed out, we want to hint the client from where to continue.
         // This is tricky: currently, client can not request a left-exclusive range.
         // We currently give the timestamp of the last event plus a small delta.
        // The amount of the delta must take into account what kind of timestamp precision the client
         // can parse and handle.
-        let vals = if let Some(x) = &mut self.vals {
-            x
-        } else {
-            return Err(Error::with_msg_no_trace(format!("{self_name} no vals")));
-        };
+        let vals = &mut self.vals;
         let continue_at = if self.timed_out {
             if let Some(ts) = vals.tss.back() {
                 Some(IsoDateTime::from_u64(*ts + MS))
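With `vals` no longer an Option, the dim0 collector's ingest degenerates to appending the parallel per-event columns. A sketch of that shape, using VecDeque like the real containers:

use std::collections::VecDeque;

#[derive(Default)]
struct Collector {
    tss: VecDeque<u64>,
    values: VecDeque<f64>,
}

impl Collector {
    // Draining append, like EventsDim0Collector::ingest above.
    fn ingest(&mut self, tss: &mut VecDeque<u64>, values: &mut VecDeque<f64>) {
        self.tss.append(tss);
        self.values.append(values);
    }
}

fn main() {
    let mut c = Collector::default();
    c.ingest(&mut VecDeque::from([1, 2]), &mut VecDeque::from([0.1, 0.2]));
    assert_eq!(c.tss.len(), c.values.len());
}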
diff --git a/crates/items_2/src/eventsdim1.rs b/crates/items_2/src/eventsdim1.rs
index f0bc690..3a548ec 100644
--- a/crates/items_2/src/eventsdim1.rs
+++ b/crates/items_2/src/eventsdim1.rs
@@ -29,6 +29,7 @@ use items_0::WithLen;
 use netpod::is_false;
 use netpod::log::*;
 use netpod::range::evrange::SeriesRange;
+use netpod::timeunits::MS;
 use netpod::timeunits::SEC;
 use netpod::BinnedRangeEnum;
 use serde::Deserialize;
@@ -184,13 +185,17 @@ where
 }
 
 #[derive(Debug)]
-pub struct EventsDim1Collector<NTY> {
-    vals: EventsDim1<NTY>,
+pub struct EventsDim1Collector<STY> {
+    vals: EventsDim1<STY>,
     range_final: bool,
     timed_out: bool,
 }
 
-impl<NTY> EventsDim1Collector<NTY> {
+impl<STY> EventsDim1Collector<STY> {
+    pub fn self_name() -> &'static str {
+        any::type_name::<Self>()
+    }
+
     pub fn new() -> Self {
         Self {
             vals: EventsDim1::empty(),
@@ -200,14 +205,14 @@ impl EventsDim1Collector {
     }
 }
 
-impl<NTY> WithLen for EventsDim1Collector<NTY> {
+impl<STY> WithLen for EventsDim1Collector<STY> {
     fn len(&self) -> usize {
         self.vals.tss.len()
     }
 }
 
 #[derive(Debug, Serialize, Deserialize)]
-pub struct EventsDim1CollectorOutput<NTY> {
+pub struct EventsDim1CollectorOutput<STY> {
     #[serde(rename = "tsAnchor")]
     ts_anchor_sec: u64,
     #[serde(rename = "tsMs")]
@@ -219,7 +224,7 @@ pub struct EventsDim1CollectorOutput {
     #[serde(rename = "pulseOff")]
     pulse_off: VecDeque<u64>,
     #[serde(rename = "values")]
-    values: VecDeque<Vec<NTY>>,
+    values: VecDeque<Vec<STY>>,
     #[serde(rename = "rangeFinal", default, skip_serializing_if = "is_false")]
     range_final: bool,
     #[serde(rename = "timedOut", default, skip_serializing_if = "is_false")]
@@ -228,7 +233,7 @@ pub struct EventsDim1CollectorOutput {
     continue_at: Option<IsoDateTime>,
 }
 
-impl<NTY> EventsDim1CollectorOutput<NTY> {
+impl<STY> EventsDim1CollectorOutput<STY> {
     pub fn ts_anchor_sec(&self) -> u64 {
         self.ts_anchor_sec
     }
@@ -253,13 +258,40 @@ impl EventsDim1CollectorOutput {
             .collect()
     }
 
-    pub fn range_complete(&self) -> bool {
+    pub fn range_final(&self) -> bool {
         self.range_final
     }
 
     pub fn timed_out(&self) -> bool {
         self.timed_out
     }
+
+    pub fn is_valid(&self) -> bool {
+        if self.ts_off_ms.len() != self.ts_off_ns.len() {
+            false
+        } else if self.ts_off_ms.len() != self.pulse_off.len() {
+            false
+        } else if self.ts_off_ms.len() != self.values.len() {
+            false
+        } else {
+            true
+        }
+    }
+
+    pub fn info_str(&self) -> String {
+        use fmt::Write;
+        let mut out = String::new();
+        write!(
+            out,
+            "ts_off_ms {} ts_off_ns {} pulse_off {} values {}",
+            self.ts_off_ms.len(),
+            self.ts_off_ns.len(),
+            self.pulse_off.len(),
+            self.values.len(),
+        )
+        .unwrap();
+        out
+    }
 }
 
 impl<STY> AsAnyRef for EventsDim1CollectorOutput<STY>
 where
@@ -295,9 +327,9 @@ impl ToJsonResult for EventsDim1CollectorOutput {
 
 impl<STY> Collected for EventsDim1CollectorOutput<STY> {}
 
-impl<NTY: ScalarOps> CollectorType for EventsDim1Collector<NTY> {
-    type Input = EventsDim1<NTY>;
-    type Output = EventsDim1CollectorOutput<NTY>;
+impl<STY: ScalarOps> CollectorType for EventsDim1Collector<STY> {
+    type Input = EventsDim1<STY>;
+    type Output = EventsDim1CollectorOutput<STY>;
 
     fn ingest(&mut self, src: &mut Self::Input) {
         self.vals.tss.append(&mut src.tss);
@@ -313,6 +345,7 @@ impl CollectorType for EventsDim1Collector {
         self.timed_out = true;
     }
 
+    // TODO unify with dim0 case
     fn result(
         &mut self,
         range: Option<SeriesRange>,
         _binrange: Option<BinnedRangeEnum>,
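is_valid/info_str guard the invariant that all per-event columns stay the same length. The check in isolation:

fn is_valid(col_lens: &[usize]) -> bool {
    // All parallel columns (ts offsets, pulse offsets, values) must agree.
    col_lens.windows(2).all(|w| w[0] == w[1])
}

fn main() {
    assert!(is_valid(&[3, 3, 3, 3]));
    assert!(!is_valid(&[3, 2, 3, 3]));
}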
@@ -323,12 +356,19 @@ impl CollectorType for EventsDim1Collector {
         // We currently give the timestamp of the last event plus a small delta.
         // The amount of the delta must take into account what kind of timestamp precision the client
         // can parse and handle.
-        /*let continue_at = if self.timed_out {
-            if let Some(ts) = self.vals.tss.back() {
-                Some(IsoDateTime::from_u64(*ts + netpod::timeunits::MS))
+        let vals = &mut self.vals;
+        let continue_at = if self.timed_out {
+            if let Some(ts) = vals.tss.back() {
+                Some(IsoDateTime::from_u64(*ts + MS))
             } else {
                 if let Some(range) = &range {
-                    Some(IsoDateTime::from_u64(range.beg + netpod::timeunits::SEC))
+                    match range {
+                        SeriesRange::TimeRange(x) => Some(IsoDateTime::from_u64(x.beg + SEC)),
+                        SeriesRange::PulseRange(_x) => {
+                            error!("TODO emit continueAt for pulse range");
+                            None
+                        }
+                    }
                 } else {
                     warn!("can not determine continue-at parameters");
                     None
@@ -337,28 +377,40 @@ impl CollectorType for EventsDim1Collector {
         } else {
             None
         };
-        let tss_sl = self.vals.tss.make_contiguous();
-        let pulses_sl = self.vals.pulses.make_contiguous();
+        let tss_sl = vals.tss.make_contiguous();
+        let pulses_sl = vals.pulses.make_contiguous();
         let (ts_anchor_sec, ts_off_ms, ts_off_ns) = crate::ts_offs_from_abs(tss_sl);
         let (pulse_anchor, pulse_off) = crate::pulse_offs_from_abs(pulses_sl);
+        let values = mem::replace(&mut vals.values, VecDeque::new());
+        if ts_off_ms.len() != ts_off_ns.len() {
+            return Err(Error::with_msg_no_trace("collected len mismatch"));
+        }
+        if ts_off_ms.len() != pulse_off.len() {
+            return Err(Error::with_msg_no_trace("collected len mismatch"));
+        }
+        if ts_off_ms.len() != values.len() {
+            return Err(Error::with_msg_no_trace("collected len mismatch"));
+        }
         let ret = Self::Output {
             ts_anchor_sec,
             ts_off_ms,
             ts_off_ns,
             pulse_anchor,
-            pulse_off: pulse_off,
-            values: mem::replace(&mut self.vals.values, VecDeque::new()),
+            pulse_off,
+            values,
             range_final: self.range_final,
             timed_out: self.timed_out,
             continue_at,
         };
-        Ok(ret)*/
-        todo!()
+        if !ret.is_valid() {
+            error!("invalid:\n{}", ret.info_str());
+        }
+        Ok(ret)
     }
 }
 
-impl<NTY: ScalarOps> CollectableType for EventsDim1<NTY> {
-    type Collector = EventsDim1Collector<NTY>;
+impl<STY: ScalarOps> CollectableType for EventsDim1<STY> {
+    type Collector = EventsDim1Collector<STY>;
 
     fn new_collector() -> Self::Collector {
         Self::Collector::new()
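The continue-at hint computed above is the last event timestamp plus one millisecond, in the nanosecond epoch used throughout (assuming netpod::timeunits::MS = 1_000_000). A sketch:

// Assumption: MS = 1_000_000 nanoseconds, matching netpod::timeunits.
const MS: u64 = 1_000_000;

fn continue_at(last_ts: Option<u64>, range_beg: u64, timed_out: bool) -> Option<u64> {
    if !timed_out {
        return None;
    }
    match last_ts {
        // One millisecond past the last delivered event.
        Some(ts) => Some(ts + MS),
        // No events seen at all: restart one second into the range.
        None => Some(range_beg + 1000 * MS),
    }
}

fn main() {
    assert_eq!(continue_at(Some(5 * MS), 0, true), Some(6 * MS));
    assert_eq!(continue_at(None, 0, false), None);
}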
diff --git a/crates/items_2/src/frame.rs b/crates/items_2/src/frame.rs
index aef00dd..03b52a0 100644
--- a/crates/items_2/src/frame.rs
+++ b/crates/items_2/src/frame.rs
@@ -64,7 +64,7 @@ where
     ser
 }
 
-pub fn bincode_to_vec<S>(item: S) -> Result<Vec<u8>, Error>
+fn bincode_to_vec<S>(item: S) -> Result<Vec<u8>, Error>
 where
     S: Serialize,
 {
@@ -74,7 +74,7 @@ where
     Ok(out)
 }
 
-pub fn bincode_from_slice<T>(buf: &[u8]) -> Result<T, Error>
+fn bincode_from_slice<T>(buf: &[u8]) -> Result<T, Error>
 where
     T: for<'de> serde::Deserialize<'de>,
 {
@@ -87,14 +87,14 @@ where
     <T as serde::Deserialize>::deserialize(&mut de).map_err(|e| format!("{e}").into())
 }
 
-pub fn msgpack_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
+fn msgpack_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
 where
     T: Serialize,
 {
     rmp_serde::to_vec_named(&item).map_err(|e| format!("{e}").into())
 }
 
-pub fn msgpack_erased_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
+fn msgpack_erased_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
 where
     T: erased_serde::Serialize,
 {
@@ -106,21 +106,21 @@ where
     Ok(out)
 }
 
-pub fn msgpack_from_slice<T>(buf: &[u8]) -> Result<T, Error>
+fn msgpack_from_slice<T>(buf: &[u8]) -> Result<T, Error>
 where
     T: for<'de> serde::Deserialize<'de>,
 {
     rmp_serde::from_slice(buf).map_err(|e| format!("{e}").into())
 }
 
-pub fn postcard_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
+fn postcard_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
 where
     T: Serialize,
 {
     postcard::to_stdvec(&item).map_err(|e| format!("{e}").into())
 }
 
-pub fn postcard_erased_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
+fn postcard_erased_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
 where
     T: erased_serde::Serialize,
 {
@@ -146,24 +146,37 @@ pub fn encode_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
 where
     T: Serialize,
 {
-    // msgpack_to_vec(item)
-    postcard_to_vec(item)
+    if false {
+        msgpack_to_vec(item)
+    } else if false {
+        bincode_to_vec(item)
+    } else {
+        postcard_to_vec(item)
+    }
 }
 
 pub fn encode_erased_to_vec<T>(item: T) -> Result<Vec<u8>, Error>
 where
     T: erased_serde::Serialize,
 {
-    // msgpack_erased_to_vec(item)
-    postcard_erased_to_vec(item)
+    if false {
+        msgpack_erased_to_vec(item)
+    } else {
+        postcard_erased_to_vec(item)
+    }
 }
 
 pub fn decode_from_slice<T>(buf: &[u8]) -> Result<T, Error>
 where
     T: for<'de> serde::Deserialize<'de>,
 {
-    // msgpack_from_slice(buf)
-    postcard_from_slice(buf)
+    if false {
+        msgpack_from_slice(buf)
+    } else if false {
+        bincode_from_slice(buf)
+    } else {
+        postcard_from_slice(buf)
+    }
 }
 
 pub fn make_frame_2<T>(item: T, fty: u32) -> Result<BytesMut, Error>
@@ -321,11 +334,7 @@ where
     let k: err::Error = match decode_from_slice(frame.buf()) {
         Ok(item) => item,
         Err(e) => {
-            error!(
-                "ERROR deserialize len {} ERROR_FRAME_TYPE_ID {}",
-                frame.buf().len(),
-                e
-            );
+            error!("deserialize len {} ERROR_FRAME_TYPE_ID {}", frame.buf().len(), e);
             let n = frame.buf().len().min(256);
             let s = String::from_utf8_lossy(&frame.buf()[..n]);
             error!("frame.buf as string: {:?}", s);
@@ -337,7 +346,7 @@ where
     let k: LogItem = match decode_from_slice(frame.buf()) {
         Ok(item) => item,
         Err(e) => {
-            error!("ERROR deserialize len {} LOG_FRAME_TYPE_ID {}", frame.buf().len(), e);
+            error!("deserialize len {} LOG_FRAME_TYPE_ID {}", frame.buf().len(), e);
             let n = frame.buf().len().min(128);
             let s = String::from_utf8_lossy(&frame.buf()[..n]);
             error!("frame.buf as string: {:?}", s);
@@ -349,11 +358,7 @@ where
     let k: StatsItem = match decode_from_slice(frame.buf()) {
         Ok(item) => item,
         Err(e) => {
-            error!(
-                "ERROR deserialize len {} STATS_FRAME_TYPE_ID {}",
-                frame.buf().len(),
-                e
-            );
+            error!("deserialize len {} STATS_FRAME_TYPE_ID {}", frame.buf().len(), e);
             let n = frame.buf().len().min(128);
             let s = String::from_utf8_lossy(&frame.buf()[..n]);
             error!("frame.buf as string: {:?}", s);
@@ -368,7 +373,7 @@ where
     let tyid = T::FRAME_TYPE_ID;
     if frame.tyid() != tyid {
         Err(Error::with_msg(format!(
-            "type id mismatch expect {:x} found {:x} {:?}",
+            "type id mismatch expect {:04x} found {:04x} {:?}",
             tyid,
             frame.tyid(),
             frame
@@ -377,11 +382,16 @@ where
         match decode_from_slice(frame.buf()) {
             Ok(item) => Ok(item),
             Err(e) => {
-                error!("decode_frame T = {}", any::type_name::<T>());
-                error!("ERROR deserialize len {} tyid {:x}", frame.buf().len(), frame.tyid());
+                error!(
+                    "decode_from_slice error len {} tyid {:04x} T {}",
+                    frame.buf().len(),
+                    frame.tyid(),
+                    any::type_name::<T>()
+                );
                 let n = frame.buf().len().min(64);
                 let s = String::from_utf8_lossy(&frame.buf()[..n]);
-                error!("frame.buf as string: {:?}", s);
+                error!("decode_from_slice bad frame.buf as bytes: {:?}", &frame.buf()[..n]);
+                error!("decode_from_slice bad frame.buf as string: {:?}", s);
                 Err(e)?
             }
         }
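encode_to_vec/decode_from_slice keep msgpack and bincode behind `if false` and route everything through postcard. A round-trip sketch of the active branch (assumes the postcard crate with its std feature enabled):

use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Ping {
    seq: u32,
}

fn main() -> Result<(), postcard::Error> {
    let a = Ping { seq: 7 };
    // Same calls the frame codec uses: to_stdvec for encode, from_bytes for decode.
    let buf = postcard::to_stdvec(&a)?;
    let b: Ping = postcard::from_bytes(&buf)?;
    assert_eq!(a, b);
    Ok(())
}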
             }
         }
diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs
index 4f9de14..5e4a823 100644
--- a/crates/nodenet/src/conn.rs
+++ b/crates/nodenet/src/conn.rs
@@ -160,7 +160,11 @@ pub async fn create_response_bytes_stream(
     evq: EventsSubQuery,
     ncc: &NodeConfigCached,
 ) -> Result {
-    debug!("create_response_bytes_stream {:?}", evq.ch_conf().scalar_type());
+    debug!(
+        "create_response_bytes_stream {:?} {:?}",
+        evq.ch_conf().scalar_type(),
+        evq.ch_conf().shape(),
+    );
     debug!("wasm1 {:?}", evq.wasm1());
     let reqctx = netpod::ReqCtx::new(evq.reqid());
     if evq.create_errors_contains("nodenet_parse_query") {
diff --git a/crates/streams/Cargo.toml b/crates/streams/Cargo.toml
index 46f0f06..ddd463f 100644
--- a/crates/streams/Cargo.toml
+++ b/crates/streams/Cargo.toml
@@ -25,7 +25,6 @@ query = { path = "../query" }
 items_0 = { path = "../items_0" }
 items_2 = { path = "../items_2" }
 parse = { path = "../parse" }
-bitshuffle = { path = "../bitshuffle" }
 httpclient = { path = "../httpclient" }
 
 [dev-dependencies]
diff --git a/crates/streams/src/frames/eventsfromframes.rs b/crates/streams/src/frames/eventsfromframes.rs
index 6a4410e..8f1e43c 100644
--- a/crates/streams/src/frames/eventsfromframes.rs
+++ b/crates/streams/src/frames/eventsfromframes.rs
@@ -77,7 +77,12 @@ where
                         }
                     },
                     Err(e) => {
-                        error!("frame payload len {} tyid {} {}", frame.buf().len(), frame.tyid(), e);
+                        error!(
+                            "frame payload len {} tyid {:04x} {}",
+                            frame.buf().len(),
+                            frame.tyid(),
+                            e
+                        );
                         self.errored = true;
                         Ready(Some(Err(e)))
                     }
diff --git a/crates/streams/src/test/timebin.rs b/crates/streams/src/test/timebin.rs
index 010721c..7ff161d 100644
--- a/crates/streams/src/test/timebin.rs
+++ b/crates/streams/src/test/timebin.rs
@@ -11,7 +11,6 @@ use futures_util::StreamExt;
 use items_0::on_sitemty_data;
 use items_0::streamitem::sitem_data;
 use items_0::streamitem::RangeCompletableItem;
-use items_0::streamitem::Sitemty;
 use items_0::streamitem::StreamItem;
 use items_0::timebin::TimeBinnable;
 use items_0::timebin::TimeBinned;
@@ -421,7 +420,8 @@ fn timebin_multi_stage_00() -> Result<(), Error> {
             let stream = q1rx;
             while let Ok(item) = stream.recv().await {
                 //eprintln!("RECV [q1rx] {:?}", item);
-                on_sitemty_data!(item, |mut item: Box<dyn TimeBinned>| {
+                // TODO use the transformed item
+                let _item = on_sitemty_data!(item, |mut item: Box<dyn TimeBinned>| {
                    if let Some(k) = item.as_any_mut().downcast_mut::>() {
                         coll.append_all_from(k);
                     }
@@ -437,7 +437,8 @@ fn timebin_multi_stage_00() -> Result<(), Error> {
             let stream = q2rx;
             while let Ok(item) = stream.recv().await {
                 //eprintln!("RECV [q2rx] {:?}", item);
-                on_sitemty_data!(item, |mut item: Box<dyn TimeBinned>| {
+                // TODO use the transformed item
+                let _item = on_sitemty_data!(item, |mut item: Box<dyn TimeBinned>| {
                    if let Some(k) = item.as_any_mut().downcast_mut::>() {
                         coll.append_all_from(k);
                     }
diff --git a/crates/taskrun/src/taskrun.rs b/crates/taskrun/src/taskrun.rs
index 3b4f967..36bdca9 100644
--- a/crates/taskrun/src/taskrun.rs
+++ b/crates/taskrun/src/taskrun.rs
@@ -51,27 +51,32 @@ fn on_thread_start() {
 }
 
 pub fn get_runtime_opts(nworkers: usize, nblocking: usize) -> Arc<Runtime> {
-    let mut g = RUNTIME.lock().unwrap();
-    match g.as_ref() {
-        None => {
-            let res = tokio::runtime::Builder::new_multi_thread()
-                .worker_threads(nworkers)
-                .max_blocking_threads(nblocking)
-                .enable_all()
-                .on_thread_start(on_thread_start)
-                .build();
-            let res = match res {
-                Ok(x) => x,
-                Err(e) => {
-                    eprintln!("ERROR {e}");
-                    panic!();
-                }
-            };
-            let a = Arc::new(res);
-            *g = Some(a.clone());
-            a
+    match RUNTIME.lock() {
+        Ok(mut g) => match g.as_ref() {
+            None => {
+                let res = tokio::runtime::Builder::new_multi_thread()
+                    .worker_threads(nworkers)
+                    .max_blocking_threads(nblocking)
+                    .enable_all()
+                    .on_thread_start(on_thread_start)
+                    .build();
+                let res = match res {
+                    Ok(x) => x,
+                    Err(e) => {
+                        eprintln!("ERROR {e}");
+                        panic!("can not create runtime {e}");
+                    }
+                };
+                let a = Arc::new(res);
+                *g = Some(a.clone());
+                a
+            }
+            Some(g) => g.clone(),
+        },
+        Err(e) => {
+            eprintln!("can not lock RUNTIME {e}");
+            panic!("can not lock RUNTIME {e}");
         }
-        Some(g) => g.clone(),
     }
 }
@@ -87,7 +92,6 @@ where
             eprintln!("ERROR tracing: can not init");
         }
     }
-    // let res = runtime.block_on(async { fut.await });
     let res = runtime.block_on(fut);
     match res {
         Ok(k) => Ok(k),
@@ -119,19 +123,31 @@ fn tracing_init_inner() -> Result<(), Error> {
         .with_thread_names(true)
         .with_filter(filter);
 
-    let pid = std::process::id();
-    let cspn = format!("/tmp/daqbuffer.tokio.console.pid.{pid}");
-    let console_layer = console_subscriber::ConsoleLayer::builder()
-        .retention(std::time::Duration::from_secs(10))
-        // .server_addr(([127, 0, 0, 1], 2875))
-        .server_addr(std::path::Path::new(&cspn))
-        .spawn();
-    // .build();
+    let reg = tracing_subscriber::registry();
 
-    // eprintln!("spawn console sever");
-    // tokio::spawn(console_server.serve());
+    #[cfg(DISABLED_CONSOLE)]
+    let reg = {
+        let (console_layer, console_server) = console_subscriber::ConsoleLayer::builder().build();
+        tokio::spawn(console_server.serve());
+        reg.with(console_layer)
+    };
+
+    #[cfg(DISABLED_CONSOLE)]
+    let reg = {
+        let pid = std::process::id();
+        // let cspn = format!("/tmp/daqbuffer.tokio.console.pid.{pid}");
+        let console_layer = console_subscriber::ConsoleLayer::builder()
+            // .retention(std::time::Duration::from_secs(10))
+            .server_addr(([127, 0, 0, 1], 14571))
+            // .server_addr(std::path::Path::new(&cspn))
+            .spawn();
+        // .build();
+
+        // eprintln!("spawn console server");
+        // tokio::spawn(console_server.serve());
+        reg.with(console_layer)
+    };
 
-    let reg = tracing_subscriber::registry().with(console_layer);
     let reg = reg.with(fmt_layer);
     reg.try_init().map_err(|e| {
         eprintln!("can not initialize tracing layer: {e}");
@@ -176,23 +192,30 @@ fn tracing_init_inner() -> Result<(), Error> {
 }
 
 pub fn tracing_init() -> Result<(), ()> {
-    let mut initg = INIT_TRACING_ONCE.lock().unwrap();
-    if *initg == 0 {
-        match tracing_init_inner() {
-            Ok(_) => {
-                *initg = 1;
-            }
-            Err(e) => {
-                *initg = 2;
-                eprintln!("tracing_init_inner gave error {e}");
+    match INIT_TRACING_ONCE.lock() {
+        Ok(mut initg) => {
+            if *initg == 0 {
+                match tracing_init_inner() {
+                    Ok(_) => {
+                        *initg = 1;
+                    }
+                    Err(e) => {
+                        *initg = 2;
+                        eprintln!("tracing_init_inner gave error {e}");
+                    }
+                }
+                Ok(())
+            } else if *initg == 1 {
+                Ok(())
+            } else {
+                eprintln!("ERROR unknown tracing state");
+                Err(())
             }
         }
-        Ok(())
-    } else if *initg == 1 {
-        Ok(())
-    } else {
-        eprintln!("ERROR unknown tracing state");
-        Err(())
+        Err(e) => {
+            eprintln!("can not lock tracing init {e}");
+            Err(())
+        }
     }
 }
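Both get_runtime_opts and tracing_init are once-guarded globals behind a Mutex whose lock result is now handled instead of unwrapped. A compact sketch of the same idea using std::sync::OnceLock as a stand-in (not what the patch uses) for the lazily created slot:

use std::sync::{Arc, Mutex, OnceLock};

static SLOT: OnceLock<Mutex<Option<Arc<String>>>> = OnceLock::new();

fn get_or_init() -> Result<Arc<String>, String> {
    let m = SLOT.get_or_init(|| Mutex::new(None));
    match m.lock() {
        // First caller creates the shared value; later callers get a clone.
        Ok(mut g) => Ok(g.get_or_insert_with(|| Arc::new("runtime".into())).clone()),
        // A poisoned lock is reported, not unwrapped.
        Err(e) => Err(format!("can not lock: {e}")),
    }
}

fn main() {
    let a = get_or_init().unwrap();
    let b = get_or_init().unwrap();
    assert!(Arc::ptr_eq(&a, &b));
}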