diff --git a/disk/src/decode.rs b/disk/src/decode.rs index 60f4ad7..2f09e79 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -245,7 +245,11 @@ fn make_scalar_conv( ) -> Result, Error> { let ret = match agg_kind { AggKind::EventBlobs => todo!("make_scalar_conv EventBlobs"), - AggKind::Plain | AggKind::DimXBinsN(_) | AggKind::DimXBins1 | AggKind::TimeWeightedScalar => match shape { + AggKind::Plain + | AggKind::DimXBinsN(_) + | AggKind::DimXBins1 + | AggKind::TimeWeightedScalar + | AggKind::PulseIdDiff => match shape { Shape::Scalar => match scalar_type { ScalarType::U8 => ValueDim0FromBytesImpl::::boxed(), ScalarType::U16 => ValueDim0FromBytesImpl::::boxed(), diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs index ab8b499..9e81d46 100644 --- a/items_0/src/items_0.rs +++ b/items_0/src/items_0.rs @@ -10,8 +10,11 @@ pub mod bincode { use collect_c::CollectableWithDefault; use collect_s::Collectable; use collect_s::ToJsonResult; -use netpod::{NanoRange, ScalarType, Shape}; +use netpod::NanoRange; +use netpod::ScalarType; +use netpod::Shape; use std::any::Any; +use std::collections::VecDeque; use std::fmt; pub trait WithLen { @@ -129,6 +132,18 @@ pub trait TimeBinnable: fmt::Debug + WithLen + RangeOverlapInfo + Any + AsAnyRef fn to_box_to_json_result(&self) -> Box; } +#[derive(Debug)] +pub enum MergeError { + NotCompatible, + Full, +} + +impl From for err::Error { + fn from(e: MergeError) -> Self { + format!("{e:?}").into() + } +} + // TODO can I remove the Any bound? /// Container of some form of events, for use as trait object. @@ -142,6 +157,7 @@ pub trait Events: + WithLen + Send + erased_serde::Serialize + + EventsNonObj { fn as_time_binnable(&self) -> &dyn TimeBinnable; fn verify(&self) -> bool; @@ -154,7 +170,7 @@ pub trait Events: // TODO is this used? fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box; fn new_empty(&self) -> Box; - fn drain_into(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), ()>; + fn drain_into(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError>; fn find_lowest_index_gt(&self, ts: u64) -> Option; fn find_lowest_index_ge(&self, ts: u64) -> Option; fn find_highest_index_lt(&self, ts: u64) -> Option; @@ -164,6 +180,10 @@ pub trait Events: fn nty_id(&self) -> u32; } +pub trait EventsNonObj { + fn into_tss_pulses(self: Box) -> (VecDeque, VecDeque); +} + erased_serde::serialize_trait_object!(Events); impl PartialEq for Box { diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 261d67e..e1d534a 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -11,6 +11,7 @@ use items_0::AsAnyMut; use items_0::AsAnyRef; use items_0::Empty; use items_0::Events; +use items_0::EventsNonObj; use items_0::WithLen; use netpod::log::*; use netpod::timeunits::SEC; @@ -713,7 +714,18 @@ impl items_0::TypeName for EventsDim0 { } } -impl Events for EventsDim0 { +impl EventsNonObj for EventsDim0 { + fn into_tss_pulses(self: Box) -> (VecDeque, VecDeque) { + info!( + "EventsDim0::into_tss_pulses len {} len {}", + self.tss.len(), + self.pulses.len() + ); + (self.tss, self.pulses) + } +} + +impl Events for EventsDim0 { fn as_time_binnable(&self) -> &dyn TimeBinnable { self as &dyn TimeBinnable } @@ -780,7 +792,7 @@ impl Events for EventsDim0 { Box::new(Self::empty()) } - fn drain_into(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), ()> { + fn drain_into(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), items_0::MergeError> { // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. if let Some(dst) = dst.as_mut().as_any_mut().downcast_mut::() { // TODO make it harder to forget new members when the struct may get modified in the future @@ -791,7 +803,7 @@ impl Events for EventsDim0 { Ok(()) } else { error!("downcast to EventsDim0 FAILED"); - Err(()) + Err(items_0::MergeError::NotCompatible) } } @@ -843,7 +855,7 @@ impl Events for EventsDim0 { } fn nty_id(&self) -> u32 { - NTY::SUB + STY::SUB } fn clone_dyn(&self) -> Box { diff --git a/items_2/src/eventsdim1.rs b/items_2/src/eventsdim1.rs index 4e55073..f45b514 100644 --- a/items_2/src/eventsdim1.rs +++ b/items_2/src/eventsdim1.rs @@ -11,6 +11,7 @@ use items_0::AsAnyMut; use items_0::AsAnyRef; use items_0::Empty; use items_0::Events; +use items_0::EventsNonObj; use items_0::WithLen; use netpod::log::*; use netpod::timeunits::SEC; @@ -667,7 +668,18 @@ impl items_0::TypeName for EventsDim1 { } } -impl Events for EventsDim1 { +impl EventsNonObj for EventsDim1 { + fn into_tss_pulses(self: Box) -> (VecDeque, VecDeque) { + info!( + "EventsDim1::into_tss_pulses len {} len {}", + self.tss.len(), + self.pulses.len() + ); + (self.tss, self.pulses) + } +} + +impl Events for EventsDim1 { fn as_time_binnable(&self) -> &dyn TimeBinnable { self as &dyn TimeBinnable } @@ -734,7 +746,7 @@ impl Events for EventsDim1 { Box::new(Self::empty()) } - fn drain_into(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), ()> { + fn drain_into(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), items_0::MergeError> { // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. if let Some(dst) = dst.as_mut().as_any_mut().downcast_mut::() { // TODO make it harder to forget new members when the struct may get modified in the future @@ -745,7 +757,7 @@ impl Events for EventsDim1 { Ok(()) } else { error!("downcast to EventsDim0 FAILED"); - Err(()) + Err(items_0::MergeError::NotCompatible) } } @@ -797,7 +809,7 @@ impl Events for EventsDim1 { } fn nty_id(&self) -> u32 { - NTY::SUB + STY::SUB } fn clone_dyn(&self) -> Box { diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index c19ae2a..d5b4be0 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -188,9 +188,7 @@ impl crate::merger::Mergeable for Box { } fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), merger::MergeError> { - self.as_mut() - .drain_into(dst, range) - .map_err(|()| merger::MergeError::NotCompatible) + self.as_mut().drain_into(dst, range) } fn find_lowest_index_gt(&self, ts: u64) -> Option { @@ -246,8 +244,9 @@ pub fn empty_events_dyn_ev( STRING => Box::new(K::::empty()), } } - _ => { - error!("TODO empty_events_dyn_2 {scalar_type:?} {shape:?} {agg_kind:?}"); + AggKind::PulseIdDiff => Box::new(eventsdim0::EventsDim0::::empty()), + AggKind::DimXBins1 | AggKind::DimXBinsN(..) | AggKind::EventBlobs => { + error!("TODO empty_events_dyn_ev {agg_kind:?} {scalar_type:?} {shape:?}"); err::todoval() } }, @@ -270,13 +269,14 @@ pub fn empty_events_dyn_ev( STRING => Box::new(K::::empty()), } } - _ => { - error!("TODO empty_events_dyn_2 {scalar_type:?} {shape:?} {agg_kind:?}"); + AggKind::PulseIdDiff => Box::new(eventsdim0::EventsDim0::::empty()), + AggKind::DimXBins1 | AggKind::DimXBinsN(..) | AggKind::EventBlobs => { + error!("TODO empty_events_dyn_ev {agg_kind:?} {scalar_type:?} {shape:?}"); err::todoval() } }, Shape::Image(..) => { - error!("TODO empty_events_dyn_2 {scalar_type:?} {shape:?} {agg_kind:?}"); + error!("TODO empty_events_dyn_ev {agg_kind:?} {scalar_type:?} {shape:?}"); err::todoval() } }; @@ -300,14 +300,18 @@ pub fn empty_binned_dyn_tb(scalar_type: &ScalarType, shape: &Shape, agg_kind: &A I64 => Box::new(K::::empty()), F32 => Box::new(K::::empty()), F64 => Box::new(K::::empty()), - _ => { - error!("TODO empty_binned_dyn"); + BOOL | STRING => { + error!("TODO empty_binned_dyn_tb {agg_kind:?} {scalar_type:?} {shape:?}"); err::todoval() } } } - _ => { - error!("TODO empty_binned_dyn"); + AggKind::Plain + | AggKind::DimXBins1 + | AggKind::DimXBinsN(..) + | AggKind::EventBlobs + | AggKind::PulseIdDiff => { + error!("TODO empty_binned_dyn_tb {agg_kind:?} {scalar_type:?} {shape:?}"); err::todoval() } }, @@ -317,21 +321,32 @@ pub fn empty_binned_dyn_tb(scalar_type: &ScalarType, shape: &Shape, agg_kind: &A type K = binsdim0::BinsDim0; match scalar_type { U8 => Box::new(K::::empty()), + U16 => Box::new(K::::empty()), + U32 => Box::new(K::::empty()), + U64 => Box::new(K::::empty()), + I8 => Box::new(K::::empty()), + I16 => Box::new(K::::empty()), + I32 => Box::new(K::::empty()), + I64 => Box::new(K::::empty()), F32 => Box::new(K::::empty()), F64 => Box::new(K::::empty()), - _ => { - error!("TODO empty_binned_dyn"); + BOOL | STRING => { + error!("TODO empty_binned_dyn_tb {agg_kind:?} {scalar_type:?} {shape:?}"); err::todoval() } } } - _ => { - error!("TODO empty_binned_dyn"); + AggKind::EventBlobs + | AggKind::DimXBinsN(..) + | AggKind::Plain + | AggKind::TimeWeightedScalar + | AggKind::PulseIdDiff => { + error!("TODO empty_binned_dyn_tb {agg_kind:?} {scalar_type:?} {shape:?}"); err::todoval() } }, Shape::Image(..) => { - error!("TODO empty_binned_dyn"); + error!("TODO empty_binned_dyn_tb {agg_kind:?} {scalar_type:?} {shape:?}"); err::todoval() } } diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs index e372165..8a6c7da 100644 --- a/items_2/src/merger.rs +++ b/items_2/src/merger.rs @@ -31,17 +31,8 @@ macro_rules! trace4 { ($($arg:tt)*) => (trace!($($arg)*)); } -#[derive(Debug)] -pub enum MergeError { - NotCompatible, - Full, -} - -impl From for err::Error { - fn from(e: MergeError) -> Self { - format!("{e:?}").into() - } -} +// TODO +pub use items_0::MergeError; pub trait Mergeable: fmt::Debug + Unpin { fn len(&self) -> usize; @@ -204,7 +195,8 @@ where // Take only up to the lowest ts of the second-lowest input let mut item = self.items[il0].take().unwrap(); trace3!("Take up to {tl1} from item {item:?}"); - match self.take_into_output_upto(&mut item, tl1) { + let res = self.take_into_output_upto(&mut item, tl1); + match res { Ok(()) => { if item.len() == 0 { // TODO should never be here because we should have taken the whole item @@ -216,7 +208,7 @@ where } Err(MergeError::Full) | Err(MergeError::NotCompatible) => { // TODO count for stats - trace3!("Put item back"); + info!("Put item back because {res:?}"); self.items[il0] = Some(item); self.do_clear_out = true; Ok(Break(())) diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index b2bb813..3b62388 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -1565,6 +1565,7 @@ pub enum AggKind { DimXBinsN(u32), Plain, TimeWeightedScalar, + PulseIdDiff, } impl AggKind { @@ -1575,6 +1576,7 @@ impl AggKind { Self::DimXBins1 => false, Self::DimXBinsN(_) => false, Self::Plain => false, + Self::PulseIdDiff => false, } } @@ -1585,6 +1587,7 @@ impl AggKind { Self::DimXBins1 => false, Self::DimXBinsN(_) => false, Self::Plain => false, + Self::PulseIdDiff => false, } } } @@ -1610,6 +1613,7 @@ pub fn x_bin_count(shape: &Shape, agg_kind: &AggKind) -> usize { Shape::Wave(n) => *n as usize, Shape::Image(j, k) => *j as usize * *k as usize, }, + AggKind::PulseIdDiff => 0, } } @@ -1631,6 +1635,9 @@ impl fmt::Display for AggKind { Self::TimeWeightedScalar => { write!(fmt, "TimeWeightedScalar") } + Self::PulseIdDiff => { + write!(fmt, "PulseIdDiff") + } } } } @@ -1655,6 +1662,8 @@ impl FromStr for AggKind { } else if s.starts_with(nmark) { let nbins: u32 = s[nmark.len()..].parse()?; Ok(AggKind::DimXBinsN(nbins)) + } else if s == "PulseIdDiff" { + Ok(AggKind::PulseIdDiff) } else { Err(Error::with_msg(format!("can not parse {} as AggKind", s))) } diff --git a/netpod/src/query.rs b/netpod/src/query.rs index a02a73e..88fe9c8 100644 --- a/netpod/src/query.rs +++ b/netpod/src/query.rs @@ -519,6 +519,9 @@ pub fn binning_scheme_append_to_url(agg_kind: &AggKind, url: &mut Url) { g.append_pair("binningScheme", "binnedX"); g.append_pair("binnedXcount", &format!("{}", n)); } + AggKind::PulseIdDiff => { + g.append_pair("binningScheme", "pulseIdDiff"); + } } } @@ -537,6 +540,8 @@ pub fn agg_kind_from_binning_scheme(pairs: &BTreeMap) -> Result< } else if s == "binnedX" { let u = pairs.get("binnedXcount").map_or("1", |k| k).parse()?; AggKind::DimXBinsN(u) + } else if s == "pulseIdDiff" { + AggKind::PulseIdDiff } else { return Err(Error::with_msg("can not extract binningScheme")); }; diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 19cc1b2..7e452e9 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -117,6 +117,34 @@ async fn make_channel_events_stream( do_test_stream_error, ); let stream = stream + .map({ + let agg_kind = evq.agg_kind_value(); + move |item| match item { + Ok(item) => { + let x = if let AggKind::PulseIdDiff = agg_kind { + let x = match item { + ChannelEvents::Events(item) => { + let (tss, pulses) = items_0::EventsNonObj::into_tss_pulses(item); + let mut item = items_2::eventsdim0::EventsDim0::empty(); + let mut pulse_last = pulses.front().map_or(0, |&x| x); + for (ts, pulse) in tss.into_iter().zip(pulses) { + let value = pulse as i64 - pulse_last as i64; + item.push(ts, pulse, value); + pulse_last = pulse; + } + ChannelEvents::Events(Box::new(item)) + } + ChannelEvents::Status(x) => ChannelEvents::Status(x), + }; + x + } else { + item + }; + Ok(x) + } + Err(e) => Err(e), + } + }) .map(move |item| match &item { Ok(k) => match k { ChannelEvents::Events(k) => { diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs index 781a4ce..73eef1c 100644 --- a/streams/src/plaineventsjson.rs +++ b/streams/src/plaineventsjson.rs @@ -44,7 +44,15 @@ pub async fn plain_events_json( //let inps = open_tcp_streams::<_, Box>(&query, cluster).await?; // TODO propagate also the max-buf-len for the first stage event reader: let stream = items_2::merger::Merger::new(inps, 1024); + let stream = stream.map(|item| { + info!("item after merge: {item:?}"); + item + }); let stream = crate::rangefilter2::RangeFilter2::new(stream, query.range().clone(), evquery.one_before_range()); + let stream = stream.map(|item| { + info!("item after rangefilter: {item:?}"); + item + }); let stream = stream::iter([empty]).chain(stream); let collected = crate::collect::collect(stream, deadline, events_max, Some(query.range().clone()), None).await?; let jsval = serde_json::to_value(&collected)?; diff --git a/streams/src/rangefilter2.rs b/streams/src/rangefilter2.rs index a3fab51..7fec24a 100644 --- a/streams/src/rangefilter2.rs +++ b/streams/src/rangefilter2.rs @@ -176,6 +176,7 @@ where Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => match self.handle_item(item) { Ok(item) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))), Err(e) => { + error!("sees: {e}"); self.data_done = true; Ready(Some(Err(e))) } diff --git a/streams/src/tcprawclient.rs b/streams/src/tcprawclient.rs index 98ee1a9..9f6b869 100644 --- a/streams/src/tcprawclient.rs +++ b/streams/src/tcprawclient.rs @@ -9,18 +9,24 @@ use crate::frames::eventsfromframes::EventsFromFrames; use crate::frames::inmem::InMemoryFrameAsyncReadStream; use err::Error; use futures_util::Stream; +use futures_util::StreamExt; use items::eventfull::EventFull; -use items::frame::{make_frame, make_term_frame}; +use items::frame::make_frame; +use items::frame::make_term_frame; use items::sitem_data; -use items::{EventQueryJsonStringFrame, RangeCompletableItem, Sitemty, StreamItem}; +use items::EventQueryJsonStringFrame; +use items::RangeCompletableItem; +use items::Sitemty; +use items::StreamItem; use netpod::log::*; use netpod::query::PlainEventsQuery; use netpod::Cluster; -use netpod::{Node, PerfOpts}; +use netpod::Node; +use netpod::PerfOpts; +use std::fmt; use std::pin::Pin; use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; -use tracing::Instrument; pub async fn x_processed_event_blobs_stream_from_node( query: PlainEventsQuery, @@ -55,7 +61,7 @@ pub async fn open_tcp_streams(query: Q, cluster: &Cluster) -> Result::new(frames); + let stream = stream.map(|x| { + info!("tcp stream recv sees item {x:?}"); + x + }); streams.push(Box::pin(stream) as _); } Ok(streams)