diff --git a/crates/daqbufp2/src/test.rs b/crates/daqbufp2/src/test.rs index 4acd40f..f115f8c 100644 --- a/crates/daqbufp2/src/test.rs +++ b/crates/daqbufp2/src/test.rs @@ -8,23 +8,11 @@ pub mod binnedjson; mod timeweightedjson; use bytes::BytesMut; -use err::Error; -use std::future::Future; - -fn run_test(f: F) -> Result<(), Error> -where - F: Future> + Send, -{ - let runtime = taskrun::get_runtime(); - let _g = runtime.enter(); - runtime.block_on(f) - //let jh = tokio::spawn(f); - //jh.await; -} #[test] fn bufs() { - use bytes::{Buf, BufMut}; + use bytes::Buf; + use bytes::BufMut; let mut buf = BytesMut::with_capacity(1024); assert!(buf.as_mut().len() == 0); buf.put_u32_le(123); diff --git a/crates/disk/src/dataopen.rs b/crates/disk/src/dataopen.rs index da3b0cf..c9e12c3 100644 --- a/crates/disk/src/dataopen.rs +++ b/crates/disk/src/dataopen.rs @@ -1,5 +1,4 @@ use super::paths; -use crate::SfDbChConf; use bytes::BytesMut; use err::ErrStr; use err::Error; @@ -827,9 +826,9 @@ mod test { beg: DAY + HOUR * 5, end: DAY + HOUR * 8, }; - let chn = netpod::SfDbChannel::from_name(BACKEND, "scalar-i32-be"); + let _chn = netpod::SfDbChannel::from_name(BACKEND, "scalar-i32-be"); // TODO read config from disk? Or expose the config from data generator? - let fetch_info = todo!(); + let fetch_info = err::todoval(); // let fetch_info = SfChFetchInfo { // channel: chn, // keyspace: 2, diff --git a/crates/disk/src/decode.rs b/crates/disk/src/decode.rs index 6726e79..3d1c116 100644 --- a/crates/disk/src/decode.rs +++ b/crates/disk/src/decode.rs @@ -1,4 +1,3 @@ -use crate::eventchunkermultifile::EventChunkerMultifile; use err::Error; use futures_util::Stream; use futures_util::StreamExt; @@ -300,8 +299,7 @@ fn make_scalar_conv( pub struct EventsDynStream { scalar_type: ScalarType, shape: Shape, - agg_kind: AggKind, - events_full: EventChunkerMultifile, + events_full: Pin> + Send>>, events_out: Box, scalar_conv: Box, emit_threshold: usize, @@ -318,15 +316,14 @@ impl EventsDynStream { scalar_type: ScalarType, shape: Shape, agg_kind: AggKind, - events_full: EventChunkerMultifile, + events_full: Pin> + Send>>, ) -> Result { let st = &scalar_type; let sh = &shape; - let ag = &agg_kind; warn!("TODO EventsDynStream::new feed through transform"); // TODO do we need/want the empty item from here? let events_out = items_2::empty::empty_events_dyn_ev(st, sh)?; - let scalar_conv = make_scalar_conv(st, sh, ag)?; + let scalar_conv = make_scalar_conv(st, sh, &agg_kind)?; let emit_threshold = match &shape { Shape::Scalar => 2048, Shape::Wave(_) => 64, @@ -335,7 +332,6 @@ impl EventsDynStream { let ret = Self { scalar_type, shape, - agg_kind, events_full, events_out, scalar_conv, @@ -360,16 +356,13 @@ impl EventsDynStream { if item.len() >= self.emit_threshold { info!("handle_event_full item len {}", item.len()); } - for (((buf, &be), &ts), &pulse) in item - .blobs - .iter() - .zip(item.be.iter()) - .zip(item.tss.iter()) - .zip(item.pulses.iter()) - { + for (i, ((&be, &ts), &pulse)) in item.be.iter().zip(item.tss.iter()).zip(item.pulses.iter()).enumerate() { + let buf = item + .data_decompressed(i) + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; let endian = if be { Endian::Big } else { Endian::Little }; self.scalar_conv - .convert(ts, pulse, buf, endian, self.events_out.as_mut())?; + .convert(ts, pulse, &buf, endian, self.events_out.as_mut())?; } Ok(()) } diff --git a/crates/disk/src/disk.rs b/crates/disk/src/disk.rs index 0125486..6b669dc 100644 --- a/crates/disk/src/disk.rs +++ b/crates/disk/src/disk.rs @@ -7,6 +7,7 @@ pub mod dataopen; pub mod decode; pub mod eventchunker; pub mod eventchunkermultifile; +pub mod eventfilter; pub mod frame; pub mod gen; pub mod index; @@ -836,7 +837,7 @@ impl Stream for BlockingTaskIntoChannel { } fn blocking_task_into_channel( - path: PathBuf, + _path: PathBuf, file: File, disk_io_tune: DiskIoTune, reqid: String, diff --git a/crates/disk/src/eventchunker.rs b/crates/disk/src/eventchunker.rs index 7544d8d..5c2ad4d 100644 --- a/crates/disk/src/eventchunker.rs +++ b/crates/disk/src/eventchunker.rs @@ -72,26 +72,38 @@ pub struct EventChunker { dbg_path: PathBuf, last_ts: u64, expand: bool, - decomp_dt_histo: HistoLog2, item_len_emit_histo: HistoLog2, seen_before_range_count: usize, seen_after_range_count: usize, + seen_events: usize, unordered_count: usize, repeated_ts_count: usize, - config_mismatch_discard: usize, - discard_count: usize, + discard_count_range: usize, + discard_count_scalar_type: usize, + discard_count_shape: usize, + discard_count_shape_derived: usize, + discard_count_shape_derived_err: usize, log_items: VecDeque, } impl Drop for EventChunker { fn drop(&mut self) { // TODO collect somewhere - if self.config_mismatch_discard != 0 { - warn!("config_mismatch_discard {}", self.config_mismatch_discard); - } debug!( - "EventChunker-stats {{ decomp_dt_histo: {:?}, item_len_emit_histo: {:?} }}", - self.decomp_dt_histo, self.item_len_emit_histo + concat!( + "EventChunker-stats {{ node_ix: {}, seen_events: {}, discard_count_range: {},", + " discard_count_scalar_type: {}, discard_count_shape: {},", + " discard_count_shape_derived: {}, discard_count_shape_derived_err: {},", + " item_len_emit_histo: {:?} }}", + ), + self.node_ix, + self.seen_events, + self.discard_count_range, + self.discard_count_scalar_type, + self.discard_count_shape, + self.discard_count_shape_derived, + self.discard_count_shape_derived_err, + self.item_len_emit_histo ); } } @@ -117,33 +129,6 @@ impl EventChunkerConf { } } -fn is_config_match(is_array: &bool, ele_count: &u64, fetch_info: &SfChFetchInfo) -> bool { - match fetch_info.shape() { - Shape::Scalar => { - if *is_array { - false - } else { - true - } - } - Shape::Wave(dim1count) => { - if (*dim1count as u64) != *ele_count { - false - } else { - true - } - } - Shape::Image(n1, n2) => { - let nt = (*n1 as u64) * (*n2 as u64); - if nt != *ele_count { - false - } else { - true - } - } - } -} - impl EventChunker { pub fn self_name() -> &'static str { std::any::type_name::() @@ -186,14 +171,17 @@ impl EventChunker { node_ix, last_ts: 0, expand, - decomp_dt_histo: HistoLog2::new(8), item_len_emit_histo: HistoLog2::new(0), seen_before_range_count: 0, seen_after_range_count: 0, + seen_events: 0, unordered_count: 0, repeated_ts_count: 0, - config_mismatch_discard: 0, - discard_count: 0, + discard_count_range: 0, + discard_count_scalar_type: 0, + discard_count_shape: 0, + discard_count_shape_derived: 0, + discard_count_shape_derived_err: 0, log_items: VecDeque::new(), } } @@ -283,6 +271,7 @@ impl EventChunker { self.need_min = len as u32; break; } else { + self.seen_events += 1; let mut discard = false; let _ttl = sl.read_i64::().unwrap(); let ts = sl.read_i64::().unwrap() as u64; @@ -305,6 +294,7 @@ impl EventChunker { } if ts < self.last_ts { discard = true; + self.discard_count_range += 1; self.unordered_count += 1; if self.unordered_count < 20 { let msg = format!( @@ -323,6 +313,7 @@ impl EventChunker { self.last_ts = ts; if ts >= self.range.end { discard = true; + self.discard_count_range += 1; self.seen_after_range_count += 1; if !self.expand || self.seen_after_range_count >= 2 { self.seen_beyond_range = true; @@ -332,6 +323,7 @@ impl EventChunker { } if ts < self.range.beg { discard = true; + self.discard_count_range += 1; self.seen_before_range_count += 1; if self.seen_before_range_count < 20 { let msg = format!( @@ -414,10 +406,12 @@ impl EventChunker { Shape::Image(shape_lens[0], shape_lens[1]) } else if shape_dim == 0 { discard = true; + self.discard_count_shape += 1; // return Err(DataParseError::ShapedWithoutDims); Shape::Scalar } else { discard = true; + self.discard_count_shape += 1; // return Err(DataParseError::TooManyDims); Shape::Scalar } @@ -436,11 +430,11 @@ impl EventChunker { }; if self.fetch_info.scalar_type().ne(&scalar_type) { discard = true; + self.discard_count_scalar_type += 1; let msg = format!( - "scalar_type mismatch {:?} {:?} {:?}", + "scalar_type mismatch {:?} {:?}", scalar_type, self.fetch_info.scalar_type(), - self.dbg_path, ); let item = LogItem::from_node(self.node_ix, Level::WARN, msg); log_items.push(item); @@ -450,12 +444,8 @@ impl EventChunker { // especially for waveforms it will wrongly indicate scalar. So this is unusable. if self.fetch_info.shape().ne(&shape_this) { discard = true; - let msg = format!( - "shape mismatch {:?} {:?} {:?}", - shape_this, - self.fetch_info.shape(), - self.dbg_path, - ); + self.discard_count_shape += 1; + let msg = format!("shape mismatch {:?} {:?}", shape_this, self.fetch_info.shape(),); let item = LogItem::from_node(self.node_ix, Level::WARN, msg); log_items.push(item); } @@ -465,7 +455,6 @@ impl EventChunker { let n2 = len as u64 - n1 - 4; let databuf = buf[p1 as usize..(p1 as usize + n2 as usize)].as_ref(); if discard { - self.discard_count += 1; } else { ret.push( ts, @@ -479,27 +468,25 @@ impl EventChunker { match ret.shape_derived(ret.len() - 1, self.fetch_info.shape()) { Ok(sh) => { if sh.ne(self.fetch_info.shape()) { - self.discard_count += 1; + self.discard_count_shape_derived += 1; ret.pop_back(); let msg = format!( - "shape_derived mismatch {:?} {:?} {:?} {:?}", + "shape_derived mismatch {:?} {:?} {:?}", self.fetch_info.scalar_type(), self.fetch_info.shape(), sh, - self.dbg_path, ); let item = LogItem::from_node(self.node_ix, Level::WARN, msg); log_items.push(item); } } Err(_) => { - self.discard_count += 1; + self.discard_count_shape_derived_err += 1; ret.pop_back(); let msg = format!( - "shape_derived error {:?} {:?} {:?}", + "shape_derived error {:?} {:?}", self.fetch_info.scalar_type(), self.fetch_info.shape(), - self.dbg_path, ); let item = LogItem::from_node(self.node_ix, Level::WARN, msg); log_items.push(item); @@ -526,7 +513,7 @@ impl Stream for EventChunker { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - 'outer: loop { + loop { break if self.completed { panic!("EventChunker poll_next on completed"); } else if let Some(item) = self.log_items.pop_front() { @@ -552,7 +539,7 @@ impl Stream for EventChunker { Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) } else { trace!("sent_beyond_range non-complete"); - continue 'outer; + continue; } } else if self.data_emit_complete { let item = EventDataReadStats { @@ -616,7 +603,7 @@ impl Stream for EventChunker { } Ready(None) => { self.data_emit_complete = true; - continue 'outer; + continue; } Pending => Pending, } diff --git a/crates/disk/src/eventfilter.rs b/crates/disk/src/eventfilter.rs new file mode 100644 index 0000000..7d7b7a3 --- /dev/null +++ b/crates/disk/src/eventfilter.rs @@ -0,0 +1,77 @@ +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::streamitem::LogItem; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; +use items_0::WithLen; +use items_2::eventfull::EventFull; +use netpod::Shape; +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use tracing::Level; + +pub struct EventFullShapeFilter { + inp: INP, + shape_exp: Shape, + node_ix: usize, + log_items: VecDeque, +} + +impl EventFullShapeFilter { + fn filter_item(&mut self, item: &mut EventFull) { + let node_ix = self.node_ix; + let p: Vec<_> = (0..item.len()) + .map(|i| { + let sh = item.shape_derived(i, &self.shape_exp); + match sh { + Ok(sh) => { + if sh.ne(&self.shape_exp) { + let msg = format!("shape_derived mismatch {:?} {:?}", sh, self.shape_exp); + let item = LogItem::from_node(node_ix, Level::WARN, msg); + self.log_items.push_back(item); + false + } else { + true + } + } + Err(_) => { + let msg = format!("shape_derived mismatch {:?} {:?}", sh, self.shape_exp); + let item = LogItem::from_node(self.node_ix, Level::WARN, msg); + self.log_items.push_back(item); + false + } + } + }) + .collect(); + item.keep_ixs(&p); + } +} + +impl Stream for EventFullShapeFilter +where + INP: Stream> + Unpin, +{ + type Item = Sitemty; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + if let Some(item) = self.log_items.pop_front() { + Ready(Some(Ok(StreamItem::Log(item)))) + } else { + match self.inp.poll_next_unpin(cx) { + Ready(Some(item)) => match item { + Ok(StreamItem::DataItem(RangeCompletableItem::Data(mut item))) => { + self.filter_item(&mut item); + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))) + } + x => Ready(Some(x)), + }, + Ready(None) => Ready(None), + Pending => Pending, + } + } + } +} diff --git a/crates/disk/src/raw/conn.rs b/crates/disk/src/raw/conn.rs index f03ebdb..5b34c82 100644 --- a/crates/disk/src/raw/conn.rs +++ b/crates/disk/src/raw/conn.rs @@ -31,6 +31,7 @@ fn make_num_pipeline_stream_evs( ) -> Pin> + Send>> { let scalar_type = fetch_info.scalar_type().clone(); let shape = fetch_info.shape().clone(); + let event_blobs = Box::pin(event_blobs); let event_stream = match crate::decode::EventsDynStream::new(scalar_type, shape, agg_kind, event_blobs) { Ok(k) => k, Err(e) => { diff --git a/crates/err/Cargo.toml b/crates/err/Cargo.toml index 4e0531e..2a1e99e 100644 --- a/crates/err/Cargo.toml +++ b/crates/err/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "err" -version = "0.0.4" +version = "0.0.5" authors = ["Dominik Werder "] edition = "2021" @@ -18,5 +18,8 @@ chrono = { version = "0.4.26", features = ["serde"] } url = "2.4.0" regex = "1.9.1" http = "0.2.9" -thiserror = { path = "../../../thiserror" } +thiserror = "=0.0.1" anyhow = "1.0" + +[patch.crates-io] +thiserror = { git = "https://github.com/dominikwerder/thiserror.git" } diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs index a59c368..122432b 100644 --- a/crates/httpret/src/api1.rs +++ b/crates/httpret/src/api1.rs @@ -5,9 +5,7 @@ use crate::response; use crate::ReqCtx; use bytes::BufMut; use bytes::BytesMut; -use disk::eventchunker::EventChunkerConf; use disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes; -use disk::raw::conn::make_event_blobs_stream; use futures_util::Stream; use futures_util::StreamExt; use http::Method; @@ -27,7 +25,6 @@ use netpod::query::api1::Api1Query; use netpod::range::evrange::NanoRange; use netpod::timeunits::SEC; use netpod::Api1WarningStats; -use netpod::ByteSize; use netpod::ChannelSearchQuery; use netpod::ChannelSearchResult; use netpod::ChannelTypeConfigGen; @@ -523,6 +520,7 @@ pub struct DataApiPython3DataStream { current_fetch_info: Option, node_config: NodeConfigCached, chan_stream: Option> + Send>>>, + #[allow(unused)] disk_io_tune: DiskIoTune, do_decompress: bool, event_count: usize, diff --git a/crates/httpret/src/pulsemap.rs b/crates/httpret/src/pulsemap.rs index 8802855..c06a66a 100644 --- a/crates/httpret/src/pulsemap.rs +++ b/crates/httpret/src/pulsemap.rs @@ -1388,7 +1388,10 @@ impl Api4MapPulseHttpFunction { let ret = match Self::find_timestamp(q, ncc).await { Ok(Some(val)) => Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?), Ok(None) => Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?), - Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?), + Err(e) => { + error!("find_timestamp {e}"); + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?) + } }; let ts2 = Instant::now(); let dt = ts2.duration_since(ts1); @@ -1446,7 +1449,10 @@ impl Api4MapPulse2HttpFunction { Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&res)?))?) } Ok(None) => Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?), - Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?), + Err(e) => { + error!("find_timestamp {e}"); + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?) + } }; let ts2 = Instant::now(); let dt = ts2.duration_since(ts1); diff --git a/crates/items_0/src/items_0.rs b/crates/items_0/src/items_0.rs index 3b33fd5..8706361 100644 --- a/crates/items_0/src/items_0.rs +++ b/crates/items_0/src/items_0.rs @@ -18,7 +18,6 @@ pub use futures_util; use collect_s::Collectable; use container::ByteEstimate; -use netpod::range::evrange::SeriesRange; use std::any::Any; use std::collections::VecDeque; use std::fmt; diff --git a/crates/items_2/src/eventfull.rs b/crates/items_2/src/eventfull.rs index 331ed1b..fe1dbd0 100644 --- a/crates/items_2/src/eventfull.rs +++ b/crates/items_2/src/eventfull.rs @@ -136,6 +136,20 @@ impl EventFull { self.shapes.pop_back(); self.comps.pop_back(); } + + pub fn keep_ixs(&mut self, ixs: &[bool]) { + fn inner(v: &mut VecDeque, ixs: &[bool]) { + let mut it = ixs.iter(); + v.retain_mut(move |_| it.next().map(Clone::clone).unwrap_or(false)); + } + inner(&mut self.tss, ixs); + inner(&mut self.pulses, ixs); + inner(&mut self.blobs, ixs); + inner(&mut self.scalar_types, ixs); + inner(&mut self.be, ixs); + inner(&mut self.shapes, ixs); + inner(&mut self.comps, ixs); + } } impl FrameTypeInnerStatic for EventFull { diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 4565fb0..7041a40 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -1437,6 +1437,7 @@ impl Dim0Index for TsNano { } fn prebin_patch_len_for(i: usize) -> Self { + let _ = i; todo!() } @@ -1507,6 +1508,7 @@ impl Dim0Index for PulseId { } fn prebin_patch_len_for(i: usize) -> Self { + let _ = i; todo!() } @@ -1543,6 +1545,7 @@ const PREBIN_TIME_BIN_LEN_VAR0: [u64; 3] = [MIN * 1, HOUR * 1, DAY]; const PREBIN_PULSE_BIN_LEN_VAR0: [u64; 4] = [100, 10000, 1000000, 100000000]; +#[allow(unused)] const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 3] = [ // //MIN * 60, @@ -1551,6 +1554,7 @@ const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 3] = [ DAY * 64, ]; +#[allow(unused)] const PATCH_T_LEN_OPTIONS_WAVE: [u64; 3] = [ // //MIN * 10, @@ -1606,10 +1610,12 @@ const PULSE_BIN_THRESHOLDS: [u64; 25] = [ 800000, 1000000, 2000000, 4000000, 8000000, 10000000, ]; +#[allow(unused)] const fn time_bin_threshold_at(i: usize) -> TsNano { TsNano(TIME_BIN_THRESHOLDS[i]) } +#[allow(unused)] const fn pulse_bin_threshold_at(i: usize) -> PulseId { PulseId(PULSE_BIN_THRESHOLDS[i]) } @@ -1726,17 +1732,17 @@ impl PreBinnedPatchCoordEnum { } impl FromUrl for PreBinnedPatchCoordEnum { - fn from_url(url: &Url) -> Result { + fn from_url(_url: &Url) -> Result { todo!() } - fn from_pairs(pairs: &BTreeMap) -> Result { + fn from_pairs(_pairs: &BTreeMap) -> Result { todo!() } } impl AppendToUrl for PreBinnedPatchCoordEnum { - fn append_to_url(&self, url: &mut Url) { + fn append_to_url(&self, _url: &mut Url) { todo!() } } @@ -1755,8 +1761,8 @@ where T: Dim0Index, { pub fn edges(&self) -> Vec { - let mut ret = Vec::new(); err::todo(); + let ret = Vec::new(); ret } @@ -1855,6 +1861,7 @@ where } pub fn get_range(&self, ix: u32) -> NanoRange { + let _ = ix; /*NanoRange { beg: (self.offset + ix as u64) * self.grid_spec.bin_t_len, end: (self.offset + ix as u64 + 1) * self.grid_spec.bin_t_len, diff --git a/crates/netpod/src/query.rs b/crates/netpod/src/query.rs index 5d664a3..03724a9 100644 --- a/crates/netpod/src/query.rs +++ b/crates/netpod/src/query.rs @@ -3,17 +3,14 @@ pub mod datetime; pub mod prebinned; use crate::get_url_query_pairs; -use crate::is_false; use crate::log::*; use crate::AggKind; use crate::AppendToUrl; -use crate::ByteSize; use crate::FromUrl; use crate::HasBackend; use crate::HasTimeout; use crate::NanoRange; use crate::PulseRange; -use crate::SeriesRange; use crate::SfDbChannel; use crate::ToNanos; use crate::DATETIME_FMT_6MS; diff --git a/crates/netpod/src/query/api1.rs b/crates/netpod/src/query/api1.rs index c645002..143f538 100644 --- a/crates/netpod/src/query/api1.rs +++ b/crates/netpod/src/query/api1.rs @@ -5,14 +5,6 @@ use serde::{Deserialize, Serialize}; use std::fmt; use std::time::Duration; -fn bool_true() -> bool { - true -} - -fn bool_is_true(x: &bool) -> bool { - *x -} - #[derive(Debug, PartialEq, Serialize, Deserialize)] pub struct Api1Range { #[serde(rename = "type", default, skip_serializing_if = "String::is_empty")] diff --git a/crates/streams/src/collect.rs b/crates/streams/src/collect.rs index 2a32b45..abb7b95 100644 --- a/crates/streams/src/collect.rs +++ b/crates/streams/src/collect.rs @@ -105,7 +105,17 @@ impl Collect { } }, StreamItem::Log(item) => { - trace!("collect log {:?}", item); + if item.level == Level::ERROR { + error!("node {} msg {}", item.node_ix, item.msg); + } else if item.level == Level::WARN { + warn!("node {} msg {}", item.node_ix, item.msg); + } else if item.level == Level::INFO { + info!("node {} msg {}", item.node_ix, item.msg); + } else if item.level == Level::DEBUG { + debug!("node {} msg {}", item.node_ix, item.msg); + } else if item.level == Level::TRACE { + trace!("node {} msg {}", item.node_ix, item.msg); + } Ok(()) } StreamItem::Stats(item) => { diff --git a/crates/streams/src/rangefilter2.rs b/crates/streams/src/rangefilter2.rs index 69d78a9..2b93c45 100644 --- a/crates/streams/src/rangefilter2.rs +++ b/crates/streams/src/rangefilter2.rs @@ -225,7 +225,7 @@ where ITY: Mergeable, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct(Self::type_name()).finish() + f.debug_struct("RangeFilter2").field("stats", &self.stats).finish() } } @@ -235,6 +235,7 @@ where ITY: Mergeable, { fn drop(&mut self) { - debug!("drop {} {:?}", Self::type_name(), self); + // Self::type_name() + debug!("drop {:?}", self); } }