WIP pulse diff

This commit is contained in:
Dominik Werder
2023-02-24 16:32:47 +01:00
parent 2e286eb28e
commit 16719b384c
12 changed files with 163 additions and 49 deletions

View File

@@ -245,7 +245,11 @@ fn make_scalar_conv(
) -> Result<Box<dyn ValueFromBytes>, Error> {
let ret = match agg_kind {
AggKind::EventBlobs => todo!("make_scalar_conv EventBlobs"),
AggKind::Plain | AggKind::DimXBinsN(_) | AggKind::DimXBins1 | AggKind::TimeWeightedScalar => match shape {
AggKind::Plain
| AggKind::DimXBinsN(_)
| AggKind::DimXBins1
| AggKind::TimeWeightedScalar
| AggKind::PulseIdDiff => match shape {
Shape::Scalar => match scalar_type {
ScalarType::U8 => ValueDim0FromBytesImpl::<u8>::boxed(),
ScalarType::U16 => ValueDim0FromBytesImpl::<u16>::boxed(),

View File

@@ -10,8 +10,11 @@ pub mod bincode {
use collect_c::CollectableWithDefault;
use collect_s::Collectable;
use collect_s::ToJsonResult;
use netpod::{NanoRange, ScalarType, Shape};
use netpod::NanoRange;
use netpod::ScalarType;
use netpod::Shape;
use std::any::Any;
use std::collections::VecDeque;
use std::fmt;
pub trait WithLen {
@@ -129,6 +132,18 @@ pub trait TimeBinnable: fmt::Debug + WithLen + RangeOverlapInfo + Any + AsAnyRef
fn to_box_to_json_result(&self) -> Box<dyn ToJsonResult>;
}
/// Error returned by `Events::drain_into` when one event container
/// cannot be drained into another.
#[derive(Debug)]
pub enum MergeError {
/// The destination's concrete type differs from the source
/// (the downcast inside `drain_into` failed).
NotCompatible,
/// The destination can not accept more events right now; the caller
/// is expected to put the item back and retry later.
Full,
}
// Bridge into the crate-wide error type by Debug-formatting the variant,
// so `?` works in functions returning `err::Error`.
impl From<MergeError> for err::Error {
fn from(e: MergeError) -> Self {
format!("{e:?}").into()
}
}
// TODO can I remove the Any bound?
/// Container of some form of events, for use as trait object.
@@ -142,6 +157,7 @@ pub trait Events:
+ WithLen
+ Send
+ erased_serde::Serialize
+ EventsNonObj
{
fn as_time_binnable(&self) -> &dyn TimeBinnable;
fn verify(&self) -> bool;
@@ -154,7 +170,7 @@ pub trait Events:
// TODO is this used?
fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box<dyn Events>;
fn new_empty(&self) -> Box<dyn Events>;
fn drain_into(&mut self, dst: &mut Box<dyn Events>, range: (usize, usize)) -> Result<(), ()>;
fn drain_into(&mut self, dst: &mut Box<dyn Events>, range: (usize, usize)) -> Result<(), MergeError>;
fn find_lowest_index_gt(&self, ts: u64) -> Option<usize>;
fn find_lowest_index_ge(&self, ts: u64) -> Option<usize>;
fn find_highest_index_lt(&self, ts: u64) -> Option<usize>;
@@ -164,6 +180,10 @@ pub trait Events:
fn nty_id(&self) -> u32;
}
/// Companion trait to `Events` (note the `Events: … + EventsNonObj` bound)
/// that consumes a container and yields its raw per-event columns.
pub trait EventsNonObj {
/// Decompose the boxed container into its timestamp and pulse-id queues,
/// dropping any value payload.
fn into_tss_pulses(self: Box<Self>) -> (VecDeque<u64>, VecDeque<u64>);
}
erased_serde::serialize_trait_object!(Events);
impl PartialEq for Box<dyn Events> {

View File

@@ -11,6 +11,7 @@ use items_0::AsAnyMut;
use items_0::AsAnyRef;
use items_0::Empty;
use items_0::Events;
use items_0::EventsNonObj;
use items_0::WithLen;
use netpod::log::*;
use netpod::timeunits::SEC;
@@ -713,7 +714,18 @@ impl<STY> items_0::TypeName for EventsDim0<STY> {
}
}
impl<NTY: ScalarOps> Events for EventsDim0<NTY> {
impl<STY: ScalarOps> EventsNonObj for EventsDim0<STY> {
// Consume the container; the `values` column is dropped, only the
// timestamp and pulse-id queues are returned.
fn into_tss_pulses(self: Box<Self>) -> (VecDeque<u64>, VecDeque<u64>) {
// NOTE(review): info-level log looks like WIP debugging; consider
// lowering to trace/debug before release.
info!(
"EventsDim0::into_tss_pulses len {} len {}",
self.tss.len(),
self.pulses.len()
);
(self.tss, self.pulses)
}
}
impl<STY: ScalarOps> Events for EventsDim0<STY> {
fn as_time_binnable(&self) -> &dyn TimeBinnable {
self as &dyn TimeBinnable
}
@@ -780,7 +792,7 @@ impl<NTY: ScalarOps> Events for EventsDim0<NTY> {
Box::new(Self::empty())
}
fn drain_into(&mut self, dst: &mut Box<dyn Events>, range: (usize, usize)) -> Result<(), ()> {
fn drain_into(&mut self, dst: &mut Box<dyn Events>, range: (usize, usize)) -> Result<(), items_0::MergeError> {
// TODO as_any and as_any_mut are declared on unrelated traits. Simplify.
if let Some(dst) = dst.as_mut().as_any_mut().downcast_mut::<Self>() {
// TODO make it harder to forget new members when the struct may get modified in the future
@@ -791,7 +803,7 @@ impl<NTY: ScalarOps> Events for EventsDim0<NTY> {
Ok(())
} else {
error!("downcast to EventsDim0 FAILED");
Err(())
Err(items_0::MergeError::NotCompatible)
}
}
@@ -843,7 +855,7 @@ impl<NTY: ScalarOps> Events for EventsDim0<NTY> {
}
fn nty_id(&self) -> u32 {
NTY::SUB
STY::SUB
}
fn clone_dyn(&self) -> Box<dyn Events> {

View File

@@ -11,6 +11,7 @@ use items_0::AsAnyMut;
use items_0::AsAnyRef;
use items_0::Empty;
use items_0::Events;
use items_0::EventsNonObj;
use items_0::WithLen;
use netpod::log::*;
use netpod::timeunits::SEC;
@@ -667,7 +668,18 @@ impl<STY> items_0::TypeName for EventsDim1<STY> {
}
}
impl<NTY: ScalarOps> Events for EventsDim1<NTY> {
impl<STY: ScalarOps> EventsNonObj for EventsDim1<STY> {
// Consume the container; the per-event value arrays are dropped, only
// the timestamp and pulse-id queues are returned.
fn into_tss_pulses(self: Box<Self>) -> (VecDeque<u64>, VecDeque<u64>) {
// NOTE(review): info-level log looks like WIP debugging; consider
// lowering to trace/debug before release.
info!(
"EventsDim1::into_tss_pulses len {} len {}",
self.tss.len(),
self.pulses.len()
);
(self.tss, self.pulses)
}
}
impl<STY: ScalarOps> Events for EventsDim1<STY> {
fn as_time_binnable(&self) -> &dyn TimeBinnable {
self as &dyn TimeBinnable
}
@@ -734,7 +746,7 @@ impl<NTY: ScalarOps> Events for EventsDim1<NTY> {
Box::new(Self::empty())
}
fn drain_into(&mut self, dst: &mut Box<dyn Events>, range: (usize, usize)) -> Result<(), ()> {
fn drain_into(&mut self, dst: &mut Box<dyn Events>, range: (usize, usize)) -> Result<(), items_0::MergeError> {
// TODO as_any and as_any_mut are declared on unrelated traits. Simplify.
if let Some(dst) = dst.as_mut().as_any_mut().downcast_mut::<Self>() {
// TODO make it harder to forget new members when the struct may get modified in the future
@@ -745,7 +757,7 @@ impl<NTY: ScalarOps> Events for EventsDim1<NTY> {
Ok(())
} else {
error!("downcast to EventsDim1 FAILED");
Err(())
Err(items_0::MergeError::NotCompatible)
}
}
@@ -797,7 +809,7 @@ impl<NTY: ScalarOps> Events for EventsDim1<NTY> {
}
fn nty_id(&self) -> u32 {
NTY::SUB
STY::SUB
}
fn clone_dyn(&self) -> Box<dyn Events> {

View File

@@ -188,9 +188,7 @@ impl crate::merger::Mergeable for Box<dyn Events> {
}
fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), merger::MergeError> {
self.as_mut()
.drain_into(dst, range)
.map_err(|()| merger::MergeError::NotCompatible)
self.as_mut().drain_into(dst, range)
}
fn find_lowest_index_gt(&self, ts: u64) -> Option<usize> {
@@ -246,8 +244,9 @@ pub fn empty_events_dyn_ev(
STRING => Box::new(K::<String>::empty()),
}
}
_ => {
error!("TODO empty_events_dyn_2 {scalar_type:?} {shape:?} {agg_kind:?}");
AggKind::PulseIdDiff => Box::new(eventsdim0::EventsDim0::<i64>::empty()),
AggKind::DimXBins1 | AggKind::DimXBinsN(..) | AggKind::EventBlobs => {
error!("TODO empty_events_dyn_ev {agg_kind:?} {scalar_type:?} {shape:?}");
err::todoval()
}
},
@@ -270,13 +269,14 @@ pub fn empty_events_dyn_ev(
STRING => Box::new(K::<String>::empty()),
}
}
_ => {
error!("TODO empty_events_dyn_2 {scalar_type:?} {shape:?} {agg_kind:?}");
AggKind::PulseIdDiff => Box::new(eventsdim0::EventsDim0::<i64>::empty()),
AggKind::DimXBins1 | AggKind::DimXBinsN(..) | AggKind::EventBlobs => {
error!("TODO empty_events_dyn_ev {agg_kind:?} {scalar_type:?} {shape:?}");
err::todoval()
}
},
Shape::Image(..) => {
error!("TODO empty_events_dyn_2 {scalar_type:?} {shape:?} {agg_kind:?}");
error!("TODO empty_events_dyn_ev {agg_kind:?} {scalar_type:?} {shape:?}");
err::todoval()
}
};
@@ -300,14 +300,18 @@ pub fn empty_binned_dyn_tb(scalar_type: &ScalarType, shape: &Shape, agg_kind: &A
I64 => Box::new(K::<i64>::empty()),
F32 => Box::new(K::<f32>::empty()),
F64 => Box::new(K::<f64>::empty()),
_ => {
error!("TODO empty_binned_dyn");
BOOL | STRING => {
error!("TODO empty_binned_dyn_tb {agg_kind:?} {scalar_type:?} {shape:?}");
err::todoval()
}
}
}
_ => {
error!("TODO empty_binned_dyn");
AggKind::Plain
| AggKind::DimXBins1
| AggKind::DimXBinsN(..)
| AggKind::EventBlobs
| AggKind::PulseIdDiff => {
error!("TODO empty_binned_dyn_tb {agg_kind:?} {scalar_type:?} {shape:?}");
err::todoval()
}
},
@@ -317,21 +321,32 @@ pub fn empty_binned_dyn_tb(scalar_type: &ScalarType, shape: &Shape, agg_kind: &A
type K<T> = binsdim0::BinsDim0<T>;
match scalar_type {
U8 => Box::new(K::<u8>::empty()),
U16 => Box::new(K::<u16>::empty()),
U32 => Box::new(K::<u32>::empty()),
U64 => Box::new(K::<u64>::empty()),
I8 => Box::new(K::<i8>::empty()),
I16 => Box::new(K::<i16>::empty()),
I32 => Box::new(K::<i32>::empty()),
I64 => Box::new(K::<i64>::empty()),
F32 => Box::new(K::<f32>::empty()),
F64 => Box::new(K::<f64>::empty()),
_ => {
error!("TODO empty_binned_dyn");
BOOL | STRING => {
error!("TODO empty_binned_dyn_tb {agg_kind:?} {scalar_type:?} {shape:?}");
err::todoval()
}
}
}
_ => {
error!("TODO empty_binned_dyn");
AggKind::EventBlobs
| AggKind::DimXBinsN(..)
| AggKind::Plain
| AggKind::TimeWeightedScalar
| AggKind::PulseIdDiff => {
error!("TODO empty_binned_dyn_tb {agg_kind:?} {scalar_type:?} {shape:?}");
err::todoval()
}
},
Shape::Image(..) => {
error!("TODO empty_binned_dyn");
error!("TODO empty_binned_dyn_tb {agg_kind:?} {scalar_type:?} {shape:?}");
err::todoval()
}
}

View File

@@ -31,17 +31,8 @@ macro_rules! trace4 {
($($arg:tt)*) => (trace!($($arg)*));
}
#[derive(Debug)]
pub enum MergeError {
NotCompatible,
Full,
}
impl From<MergeError> for err::Error {
fn from(e: MergeError) -> Self {
format!("{e:?}").into()
}
}
// TODO
pub use items_0::MergeError;
pub trait Mergeable<Rhs = Self>: fmt::Debug + Unpin {
fn len(&self) -> usize;
@@ -204,7 +195,8 @@ where
// Take only up to the lowest ts of the second-lowest input
let mut item = self.items[il0].take().unwrap();
trace3!("Take up to {tl1} from item {item:?}");
match self.take_into_output_upto(&mut item, tl1) {
let res = self.take_into_output_upto(&mut item, tl1);
match res {
Ok(()) => {
if item.len() == 0 {
// TODO should never be here because we should have taken the whole item
@@ -216,7 +208,7 @@ where
}
Err(MergeError::Full) | Err(MergeError::NotCompatible) => {
// TODO count for stats
trace3!("Put item back");
info!("Put item back because {res:?}");
self.items[il0] = Some(item);
self.do_clear_out = true;
Ok(Break(()))

View File

@@ -1565,6 +1565,7 @@ pub enum AggKind {
DimXBinsN(u32),
Plain,
TimeWeightedScalar,
PulseIdDiff,
}
impl AggKind {
@@ -1575,6 +1576,7 @@ impl AggKind {
Self::DimXBins1 => false,
Self::DimXBinsN(_) => false,
Self::Plain => false,
Self::PulseIdDiff => false,
}
}
@@ -1585,6 +1587,7 @@ impl AggKind {
Self::DimXBins1 => false,
Self::DimXBinsN(_) => false,
Self::Plain => false,
Self::PulseIdDiff => false,
}
}
}
@@ -1610,6 +1613,7 @@ pub fn x_bin_count(shape: &Shape, agg_kind: &AggKind) -> usize {
Shape::Wave(n) => *n as usize,
Shape::Image(j, k) => *j as usize * *k as usize,
},
AggKind::PulseIdDiff => 0,
}
}
@@ -1631,6 +1635,9 @@ impl fmt::Display for AggKind {
Self::TimeWeightedScalar => {
write!(fmt, "TimeWeightedScalar")
}
Self::PulseIdDiff => {
write!(fmt, "PulseIdDiff")
}
}
}
}
@@ -1655,6 +1662,8 @@ impl FromStr for AggKind {
} else if s.starts_with(nmark) {
let nbins: u32 = s[nmark.len()..].parse()?;
Ok(AggKind::DimXBinsN(nbins))
} else if s == "PulseIdDiff" {
Ok(AggKind::PulseIdDiff)
} else {
Err(Error::with_msg(format!("can not parse {} as AggKind", s)))
}

View File

@@ -519,6 +519,9 @@ pub fn binning_scheme_append_to_url(agg_kind: &AggKind, url: &mut Url) {
g.append_pair("binningScheme", "binnedX");
g.append_pair("binnedXcount", &format!("{}", n));
}
AggKind::PulseIdDiff => {
g.append_pair("binningScheme", "pulseIdDiff");
}
}
}
@@ -537,6 +540,8 @@ pub fn agg_kind_from_binning_scheme(pairs: &BTreeMap<String, String>) -> Result<
} else if s == "binnedX" {
let u = pairs.get("binnedXcount").map_or("1", |k| k).parse()?;
AggKind::DimXBinsN(u)
} else if s == "pulseIdDiff" {
AggKind::PulseIdDiff
} else {
return Err(Error::with_msg("can not extract binningScheme"));
};

View File

@@ -117,6 +117,34 @@ async fn make_channel_events_stream(
do_test_stream_error,
);
let stream = stream
.map({
let agg_kind = evq.agg_kind_value();
move |item| match item {
Ok(item) => {
let x = if let AggKind::PulseIdDiff = agg_kind {
let x = match item {
ChannelEvents::Events(item) => {
let (tss, pulses) = items_0::EventsNonObj::into_tss_pulses(item);
let mut item = items_2::eventsdim0::EventsDim0::empty();
let mut pulse_last = pulses.front().map_or(0, |&x| x);
for (ts, pulse) in tss.into_iter().zip(pulses) {
let value = pulse as i64 - pulse_last as i64;
item.push(ts, pulse, value);
pulse_last = pulse;
}
ChannelEvents::Events(Box::new(item))
}
ChannelEvents::Status(x) => ChannelEvents::Status(x),
};
x
} else {
item
};
Ok(x)
}
Err(e) => Err(e),
}
})
.map(move |item| match &item {
Ok(k) => match k {
ChannelEvents::Events(k) => {

View File

@@ -44,7 +44,15 @@ pub async fn plain_events_json(
//let inps = open_tcp_streams::<_, Box<dyn items_2::Events>>(&query, cluster).await?;
// TODO propagate also the max-buf-len for the first stage event reader:
let stream = items_2::merger::Merger::new(inps, 1024);
let stream = stream.map(|item| {
info!("item after merge: {item:?}");
item
});
let stream = crate::rangefilter2::RangeFilter2::new(stream, query.range().clone(), evquery.one_before_range());
let stream = stream.map(|item| {
info!("item after rangefilter: {item:?}");
item
});
let stream = stream::iter([empty]).chain(stream);
let collected = crate::collect::collect(stream, deadline, events_max, Some(query.range().clone()), None).await?;
let jsval = serde_json::to_value(&collected)?;

View File

@@ -176,6 +176,7 @@ where
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => match self.handle_item(item) {
Ok(item) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))),
Err(e) => {
error!("sees: {e}");
self.data_done = true;
Ready(Some(Err(e)))
}

View File

@@ -9,18 +9,24 @@ use crate::frames::eventsfromframes::EventsFromFrames;
use crate::frames::inmem::InMemoryFrameAsyncReadStream;
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
use items::eventfull::EventFull;
use items::frame::{make_frame, make_term_frame};
use items::frame::make_frame;
use items::frame::make_term_frame;
use items::sitem_data;
use items::{EventQueryJsonStringFrame, RangeCompletableItem, Sitemty, StreamItem};
use items::EventQueryJsonStringFrame;
use items::RangeCompletableItem;
use items::Sitemty;
use items::StreamItem;
use netpod::log::*;
use netpod::query::PlainEventsQuery;
use netpod::Cluster;
use netpod::{Node, PerfOpts};
use netpod::Node;
use netpod::PerfOpts;
use std::fmt;
use std::pin::Pin;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tracing::Instrument;
pub async fn x_processed_event_blobs_stream_from_node(
query: PlainEventsQuery,
@@ -55,7 +61,7 @@ pub async fn open_tcp_streams<Q, T>(query: Q, cluster: &Cluster) -> Result<Vec<B
where
Q: serde::Serialize,
// Group bounds in new trait
T: items::FrameTypeInnerStatic + serde::de::DeserializeOwned + Send + Unpin + 'static,
T: items::FrameTypeInnerStatic + serde::de::DeserializeOwned + Send + Unpin + fmt::Debug + 'static,
{
// TODO when unit tests established, change to async connect:
let mut streams = Vec::new();
@@ -73,11 +79,13 @@ where
netout.flush().await?;
netout.forget();
// TODO for images, we need larger buffer capacity
let frames = InMemoryFrameAsyncReadStream::new(netin, 1024 * 1024 * 2)
//.instrument(netpod::log::span!(netpod::log::Level::TRACE, "InMemRd"))
;
let frames = InMemoryFrameAsyncReadStream::new(netin, 1024 * 1024 * 2);
let frames = Box::pin(frames) as _;
let stream = EventsFromFrames::<T>::new(frames);
let stream = stream.map(|x| {
info!("tcp stream recv sees item {x:?}");
x
});
streams.push(Box::pin(stream) as _);
}
Ok(streams)