From 2502f7a5741b1d258ba92db8bdaeee270fff4d04 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 21 Jul 2021 18:48:54 +0200 Subject: [PATCH] Introduce EventAppendable --- archapp/src/events.rs | 167 ++++++------ archapp/src/lib.rs | 492 +++++++++++++++++++++++++++++++---- archapp/src/parse.rs | 55 ++-- disk/src/agg/enp.rs | 4 +- disk/src/binned.rs | 4 +- disk/src/binned/pbv.rs | 6 +- disk/src/binned/prebinned.rs | 2 +- disk/src/channelexec.rs | 8 +- disk/src/decode.rs | 33 +-- disk/src/raw/conn.rs | 2 +- items/src/eventvalues.rs | 18 +- items/src/lib.rs | 7 +- items/src/waveevents.rs | 39 ++- netpod/src/lib.rs | 8 + 14 files changed, 642 insertions(+), 203 deletions(-) diff --git a/archapp/src/events.rs b/archapp/src/events.rs index 01618fc..c0b1b48 100644 --- a/archapp/src/events.rs +++ b/archapp/src/events.rs @@ -1,15 +1,20 @@ use crate::parse::PbFileReader; -use crate::EventsItem; +use crate::{EventsItem, PlainEvents, ScalarPlainEvents, WavePlainEvents, XBinnedEvents}; use chrono::{TimeZone, Utc}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; use items::eventvalues::EventValues; -use items::{Framable, RangeCompletableItem, Sitemty, StreamItem}; +use items::waveevents::{WaveEvents, WaveXBinner}; +use items::xbinnedscalarevents::XBinnedScalarEvents; +use items::xbinnedwaveevents::XBinnedWaveEvents; +use items::RangeCompletableItem::RangeComplete; +use items::{EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, SitemtyFrameType, StreamItem}; use netpod::log::*; use netpod::query::RawEventsQuery; use netpod::timeunits::{DAY, SEC}; -use netpod::{ArchiverAppliance, Channel, ChannelInfo, NanoRange, ScalarType, Shape}; +use netpod::{AggKind, ArchiverAppliance, Channel, ChannelInfo, HasScalarType, HasShape, NanoRange, ScalarType, Shape}; +use serde::Serialize; use serde_json::Value as JsonValue; use std::path::PathBuf; use std::pin::Pin; @@ -159,18 +164,47 @@ trait FrameMakerTrait: Send { struct FrameMaker { scalar_type: ScalarType, shape: Shape, + agg_kind: AggKind, +} + +impl FrameMaker { + fn make_frame_gen(item: Sitemty) -> Box + where + T: SitemtyFrameType + Serialize + Send + 'static, + { + match item { + Ok(_) => err::todoval(), + Err(e) => { + //let t = Ok(StreamItem::DataItem(RangeCompletableItem::Data())) + let t: Sitemty = Err(e); + Box::new(t) + } + } + } } macro_rules! events_item_to_sitemty { - ($ei:expr, $var:ident) => {{ - let d = match $ei { - Ok(j) => match j { - StreamItem::DataItem(j) => match j { - RangeCompletableItem::Data(j) => { - if let EventsItem::$var(j) = j { - Ok(StreamItem::DataItem(RangeCompletableItem::Data(j))) - } else { - Err(Error::with_msg_no_trace("unexpected variant")) + ($ei:expr, $t1:ident, $t2:ident, $t3:ident) => {{ + let ret = match $ei { + Ok(k) => match k { + StreamItem::DataItem(k) => match k { + RangeCompletableItem::Data(k) => { + // + match k { + EventsItem::Plain(h) => { + // + match h { + PlainEvents::$t1(h) => { + // + match h { + $t2::$t3(h) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(h))), + _ => panic!(), + } + } + _ => panic!(), + } + } + _ => panic!(), } } RangeCompletableItem::RangeComplete => { @@ -182,30 +216,40 @@ macro_rules! events_item_to_sitemty { }, Err(e) => Err(e), }; - Box::new(d) + Box::new(ret) + }}; +} + +macro_rules! 
arm1 { + ($item:expr, $sty:ident, $shape:expr, $ak:expr) => {{ + match $shape { + Shape::Scalar => match $ak { + AggKind::Plain => Self::make_frame_gen::>($item), + AggKind::DimXBins1 => Self::make_frame_gen::>($item), + AggKind::DimXBinsN(_) => Self::make_frame_gen::>($item), + }, + Shape::Wave(_) => match $ak { + AggKind::Plain => Self::make_frame_gen::>($item), + AggKind::DimXBins1 => Self::make_frame_gen::>($item), + AggKind::DimXBinsN(_) => Self::make_frame_gen::>($item), + }, + } }}; } impl FrameMakerTrait for FrameMaker { - fn make_frame(&self, ei: Sitemty) -> Box { + fn make_frame(&self, item: Sitemty) -> Box { + // Take from `self` the expected inner type. + // If `ei` is not some data, then I can't dynamically determine the expected T of Sitemty. + // Therefore, I need to decide that based on given parameters. // see also channel_info in this mod. - match self.shape { - Shape::Scalar => match self.scalar_type { - ScalarType::I8 => events_item_to_sitemty!(ei, ScalarByte), - ScalarType::I16 => events_item_to_sitemty!(ei, ScalarShort), - ScalarType::I32 => events_item_to_sitemty!(ei, ScalarInt), - ScalarType::F32 => events_item_to_sitemty!(ei, ScalarFloat), - ScalarType::F64 => events_item_to_sitemty!(ei, ScalarDouble), - _ => panic!(), - }, - Shape::Wave(_) => match self.scalar_type { - ScalarType::I8 => events_item_to_sitemty!(ei, WaveByte), - ScalarType::I16 => events_item_to_sitemty!(ei, WaveShort), - ScalarType::I32 => events_item_to_sitemty!(ei, WaveInt), - ScalarType::F32 => events_item_to_sitemty!(ei, WaveFloat), - ScalarType::F64 => events_item_to_sitemty!(ei, WaveDouble), - _ => panic!(), - }, + match self.scalar_type { + ScalarType::I8 => arm1!(item, i8, self.shape, self.agg_kind), + ScalarType::I16 => arm1!(item, i16, self.shape, self.agg_kind), + ScalarType::I32 => arm1!(item, i32, self.shape, self.agg_kind), + ScalarType::F32 => arm1!(item, f32, self.shape, self.agg_kind), + ScalarType::F64 => arm1!(item, f64, self.shape, self.agg_kind), + _ => err::todoval(), } } } @@ -230,6 +274,7 @@ pub async fn make_event_pipe( let frame_maker = Box::new(FrameMaker { scalar_type: ci.scalar_type.clone(), shape: ci.shape.clone(), + agg_kind: evq.agg_kind.clone(), }) as Box; let ret = sm.map(move |j| frame_maker.make_frame(j)); Ok(Box::pin(ret)) @@ -239,6 +284,8 @@ pub async fn make_single_event_pipe( evq: &RawEventsQuery, base_path: PathBuf, ) -> Result> + Send>>, Error> { + // TODO must apply the proper x-binning depending on the requested AggKind. + info!("make_event_pipe {:?}", evq); let evq = evq.clone(); let DirAndPrefix { dir, prefix } = directory_for_channel_files(&evq.channel, base_path)?; @@ -249,6 +296,7 @@ pub async fn make_single_event_pipe( info!("start read of {:?}", dir); // TODO first collect all matching filenames, then sort, then open files. + // TODO if dir does not exist, should notify client but not log as error. let mut rd = tokio::fs::read_dir(&dir).await?; while let Some(de) = rd.next_entry().await? 
{ let s = de.file_name().to_string_lossy().into_owned(); @@ -271,7 +319,8 @@ pub async fn make_single_event_pipe( match pbr.read_msg().await { Ok(ei) => { info!("read msg from file"); - let g = Ok(StreamItem::DataItem(RangeCompletableItem::Data(ei))); + let ei2 = ei.x_aggregate(&evq.agg_kind); + let g = Ok(StreamItem::DataItem(RangeCompletableItem::Data(ei2))); tx.send(g).await?; } Err(e) => { @@ -307,7 +356,7 @@ pub async fn make_single_event_pipe( #[allow(unused)] fn events_item_to_framable(ei: EventsItem) -> Result, Error> { match ei { - EventsItem::ScalarDouble(h) => { + EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::Int(h))) => { let range: NanoRange = err::todoval(); let (x, y) = h .tss @@ -378,57 +427,9 @@ pub async fn channel_info(channel: &Channel, aa: &ArchiverAppliance) -> Result { msgs.push(format!("got event {:?}", item)); - shape = Some(match &item { - EventsItem::ScalarByte(_) => Shape::Scalar, - EventsItem::ScalarShort(_) => Shape::Scalar, - EventsItem::ScalarInt(_) => Shape::Scalar, - EventsItem::ScalarFloat(_) => Shape::Scalar, - EventsItem::ScalarDouble(_) => Shape::Scalar, - // TODO use macro: - EventsItem::WaveByte(item) => Shape::Wave( - item.vals - .first() - .ok_or_else(|| Error::with_msg_no_trace("empty event batch"))? - .len() as u32, - ), - EventsItem::WaveShort(item) => Shape::Wave( - item.vals - .first() - .ok_or_else(|| Error::with_msg_no_trace("empty event batch"))? - .len() as u32, - ), - EventsItem::WaveInt(item) => Shape::Wave( - item.vals - .first() - .ok_or_else(|| Error::with_msg_no_trace("empty event batch"))? - .len() as u32, - ), - EventsItem::WaveFloat(item) => Shape::Wave( - item.vals - .first() - .ok_or_else(|| Error::with_msg_no_trace("empty event batch"))? - .len() as u32, - ), - EventsItem::WaveDouble(item) => Shape::Wave( - item.vals - .first() - .ok_or_else(|| Error::with_msg_no_trace("empty event batch"))? - .len() as u32, - ), - }); + shape = Some(item.shape()); // These type mappings are defined by the protobuffer schema. 
- scalar_type = Some(match item { - EventsItem::ScalarByte(_) => ScalarType::I8, - EventsItem::ScalarShort(_) => ScalarType::I16, - EventsItem::ScalarInt(_) => ScalarType::I32, - EventsItem::ScalarFloat(_) => ScalarType::F32, - EventsItem::ScalarDouble(_) => ScalarType::F64, - EventsItem::WaveByte(_) => ScalarType::I8, - EventsItem::WaveShort(_) => ScalarType::I16, - EventsItem::WaveInt(_) => ScalarType::I32, - EventsItem::WaveFloat(_) => ScalarType::F32, - EventsItem::WaveDouble(_) => ScalarType::F64, - }); + scalar_type = Some(item.scalar_type()); break; } Err(e) => { diff --git a/archapp/src/lib.rs b/archapp/src/lib.rs index a72221c..4863318 100644 --- a/archapp/src/lib.rs +++ b/archapp/src/lib.rs @@ -9,8 +9,11 @@ pub mod parse; #[cfg(not(feature = "devread"))] pub mod parsestub; use items::eventvalues::EventValues; -use items::waveevents::WaveEvents; -use items::{WithLen, WithTimestamps}; +use items::numops::NumOps; +use items::waveevents::{WaveEvents, WaveXBinner}; +use items::xbinnedscalarevents::XBinnedScalarEvents; +use items::{EventsNodeProcessor, Framable, SitemtyFrameType, WithLen, WithTimestamps}; +use netpod::{AggKind, HasScalarType, HasShape, ScalarType, Shape}; #[cfg(not(feature = "devread"))] pub use parsestub as parse; @@ -43,46 +46,435 @@ fn unescape_archapp_msg(inp: &[u8]) -> Result, Error> { Ok(ret) } +#[derive(Debug)] +pub enum ScalarPlainEvents { + Byte(EventValues), + Short(EventValues), + Int(EventValues), + Float(EventValues), + Double(EventValues), +} + +impl ScalarPlainEvents { + pub fn variant_name(&self) -> String { + use ScalarPlainEvents::*; + match self { + Byte(h) => format!("Byte"), + Short(h) => format!("Short"), + Int(h) => format!("Int"), + Float(h) => format!("Float"), + Double(h) => format!("Double"), + } + } +} + +impl WithLen for ScalarPlainEvents { + fn len(&self) -> usize { + use ScalarPlainEvents::*; + match self { + Byte(j) => j.len(), + Short(j) => j.len(), + Int(j) => j.len(), + Float(j) => j.len(), + Double(j) => j.len(), + } + } +} + +impl WithTimestamps for ScalarPlainEvents { + fn ts(&self, ix: usize) -> u64 { + use ScalarPlainEvents::*; + match self { + Byte(j) => j.ts(ix), + Short(j) => j.ts(ix), + Int(j) => j.ts(ix), + Float(j) => j.ts(ix), + Double(j) => j.ts(ix), + } + } +} + +impl HasShape for ScalarPlainEvents { + fn shape(&self) -> Shape { + use ScalarPlainEvents::*; + match self { + _ => Shape::Scalar, + } + } +} + +impl HasScalarType for ScalarPlainEvents { + fn scalar_type(&self) -> ScalarType { + use ScalarPlainEvents::*; + match self { + Byte(h) => ScalarType::I8, + Short(h) => ScalarType::I16, + Int(h) => ScalarType::I32, + Float(h) => ScalarType::F32, + Double(h) => ScalarType::F64, + } + } +} + +#[derive(Debug)] +pub enum WavePlainEvents { + Byte(WaveEvents), + Short(WaveEvents), + Int(WaveEvents), + Float(WaveEvents), + Double(WaveEvents), +} + +fn tmp1() { + let ev = EventValues:: { + tss: vec![], + values: vec![], + }; + ::is_nan(err::todoval()); + as SitemtyFrameType>::FRAME_TYPE_ID; + // as NumOps>::is_nan(err::todoval()); + //> as SitemtyFrameType>::FRAME_TYPE_ID; +} + +impl WavePlainEvents { + pub fn variant_name(&self) -> String { + use WavePlainEvents::*; + match self { + Byte(h) => format!("Byte({})", h.vals.first().map_or(0, |j| j.len())), + Short(h) => format!("Short({})", h.vals.first().map_or(0, |j| j.len())), + Int(h) => format!("Int({})", h.vals.first().map_or(0, |j| j.len())), + Float(h) => format!("Float({})", h.vals.first().map_or(0, |j| j.len())), + Double(h) => format!("Double({})", 
h.vals.first().map_or(0, |j| j.len())), + } + } + + fn x_aggregate(self, ak: &AggKind) -> EventsItem { + use WavePlainEvents::*; + let shape = self.shape(); + match self { + Byte(k) => match ak { + AggKind::Plain => EventsItem::Plain(PlainEvents::Wave(WavePlainEvents::Byte(k))), + AggKind::DimXBins1 => { + let p = WaveXBinner::create(shape, ak.clone()); + let j = p.process(k); + EventsItem::XBinnedEvents(XBinnedEvents::SingleBinWave(SingleBinWaveEvents::Byte(j))) + } + AggKind::DimXBinsN(_) => EventsItem::Plain(PlainEvents::Wave(err::todoval())), + }, + _ => panic!(), + } + } +} + +impl WithLen for WavePlainEvents { + fn len(&self) -> usize { + use WavePlainEvents::*; + match self { + Byte(j) => j.len(), + Short(j) => j.len(), + Int(j) => j.len(), + Float(j) => j.len(), + Double(j) => j.len(), + } + } +} + +impl WithTimestamps for WavePlainEvents { + fn ts(&self, ix: usize) -> u64 { + use WavePlainEvents::*; + match self { + Byte(j) => j.ts(ix), + Short(j) => j.ts(ix), + Int(j) => j.ts(ix), + Float(j) => j.ts(ix), + Double(j) => j.ts(ix), + } + } +} + +impl HasShape for WavePlainEvents { + fn shape(&self) -> Shape { + use WavePlainEvents::*; + match self { + Byte(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)), + Short(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)), + Int(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)), + Float(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)), + Double(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)), + } + } +} + +impl HasScalarType for WavePlainEvents { + fn scalar_type(&self) -> ScalarType { + use WavePlainEvents::*; + match self { + Byte(h) => ScalarType::I8, + Short(h) => ScalarType::I16, + Int(h) => ScalarType::I32, + Float(h) => ScalarType::F32, + Double(h) => ScalarType::F64, + } + } +} + +#[derive(Debug)] +pub enum SingleBinWaveEvents { + Byte(XBinnedScalarEvents), + Short(XBinnedScalarEvents), + Int(XBinnedScalarEvents), + Float(XBinnedScalarEvents), + Double(XBinnedScalarEvents), +} + +impl SingleBinWaveEvents { + pub fn variant_name(&self) -> String { + use SingleBinWaveEvents::*; + match self { + Byte(h) => format!("Byte"), + Short(h) => format!("Short"), + Int(h) => format!("Int"), + Float(h) => format!("Float"), + Double(h) => format!("Double"), + } + } + + fn x_aggregate(self, ak: &AggKind) -> EventsItem { + use SingleBinWaveEvents::*; + match self { + Byte(k) => match ak { + AggKind::Plain => EventsItem::XBinnedEvents(XBinnedEvents::SingleBinWave(SingleBinWaveEvents::Byte(k))), + AggKind::DimXBins1 => err::todoval(), + AggKind::DimXBinsN(_) => EventsItem::Plain(PlainEvents::Wave(err::todoval())), + }, + _ => panic!(), + } + } +} + +impl WithLen for SingleBinWaveEvents { + fn len(&self) -> usize { + use SingleBinWaveEvents::*; + match self { + Byte(j) => j.len(), + Short(j) => j.len(), + Int(j) => j.len(), + Float(j) => j.len(), + Double(j) => j.len(), + } + } +} + +impl WithTimestamps for SingleBinWaveEvents { + fn ts(&self, ix: usize) -> u64 { + use SingleBinWaveEvents::*; + match self { + Byte(j) => j.ts(ix), + Short(j) => j.ts(ix), + Int(j) => j.ts(ix), + Float(j) => j.ts(ix), + Double(j) => j.ts(ix), + } + } +} + +impl HasShape for SingleBinWaveEvents { + fn shape(&self) -> Shape { + use SingleBinWaveEvents::*; + match self { + Byte(h) => Shape::Scalar, + Short(h) => Shape::Scalar, + Int(h) => Shape::Scalar, + Float(h) => Shape::Scalar, + Double(h) => Shape::Scalar, + } + } +} + +impl HasScalarType for SingleBinWaveEvents { + fn scalar_type(&self) -> 
ScalarType { + use SingleBinWaveEvents::*; + match self { + Byte(h) => ScalarType::I8, + Short(h) => ScalarType::I16, + Int(h) => ScalarType::I32, + Float(h) => ScalarType::F32, + Double(h) => ScalarType::F64, + } + } +} + +#[derive(Debug)] +pub enum XBinnedEvents { + Scalar(ScalarPlainEvents), + SingleBinWave(SingleBinWaveEvents), + //MultiBinWave, +} + +impl XBinnedEvents { + pub fn variant_name(&self) -> String { + use XBinnedEvents::*; + match self { + Scalar(h) => format!("Scalar({})", h.variant_name()), + SingleBinWave(h) => format!("SingleBinWave({})", h.variant_name()), + } + } + + pub fn x_aggregate(self, ak: &AggKind) -> EventsItem { + use XBinnedEvents::*; + match self { + Scalar(k) => EventsItem::Plain(PlainEvents::Scalar(k)), + SingleBinWave(k) => k.x_aggregate(ak), + } + } +} + +impl WithLen for XBinnedEvents { + fn len(&self) -> usize { + use XBinnedEvents::*; + match self { + Scalar(j) => j.len(), + SingleBinWave(j) => j.len(), + } + } +} + +impl WithTimestamps for XBinnedEvents { + fn ts(&self, ix: usize) -> u64 { + use XBinnedEvents::*; + match self { + Scalar(j) => j.ts(ix), + SingleBinWave(j) => j.ts(ix), + } + } +} + +impl HasShape for XBinnedEvents { + fn shape(&self) -> Shape { + use XBinnedEvents::*; + match self { + Scalar(h) => h.shape(), + SingleBinWave(h) => h.shape(), + } + } +} + +impl HasScalarType for XBinnedEvents { + fn scalar_type(&self) -> ScalarType { + use XBinnedEvents::*; + match self { + Scalar(h) => h.scalar_type(), + SingleBinWave(h) => h.scalar_type(), + } + } +} + +#[derive(Debug)] +pub enum PlainEvents { + Scalar(ScalarPlainEvents), + Wave(WavePlainEvents), +} + +impl PlainEvents { + pub fn is_wave(&self) -> bool { + use PlainEvents::*; + match self { + Scalar(_) => false, + Wave(_) => true, + } + } + + pub fn variant_name(&self) -> String { + use PlainEvents::*; + match self { + Scalar(h) => format!("Scalar({})", h.variant_name()), + Wave(h) => format!("Scalar({})", h.variant_name()), + } + } + + pub fn x_aggregate(self, ak: &AggKind) -> EventsItem { + use PlainEvents::*; + match self { + Scalar(k) => EventsItem::Plain(PlainEvents::Scalar(k)), + Wave(k) => k.x_aggregate(ak), + } + } +} + +impl WithLen for PlainEvents { + fn len(&self) -> usize { + use PlainEvents::*; + match self { + Scalar(j) => j.len(), + Wave(j) => j.len(), + } + } +} + +impl WithTimestamps for PlainEvents { + fn ts(&self, ix: usize) -> u64 { + use PlainEvents::*; + match self { + Scalar(j) => j.ts(ix), + Wave(j) => j.ts(ix), + } + } +} + +impl HasShape for PlainEvents { + fn shape(&self) -> Shape { + use PlainEvents::*; + match self { + Scalar(h) => h.shape(), + Wave(h) => h.shape(), + } + } +} + +impl HasScalarType for PlainEvents { + fn scalar_type(&self) -> ScalarType { + use PlainEvents::*; + match self { + Scalar(h) => h.scalar_type(), + Wave(h) => h.scalar_type(), + } + } +} + #[derive(Debug)] pub enum EventsItem { - ScalarByte(EventValues), - ScalarShort(EventValues), - ScalarInt(EventValues), - ScalarFloat(EventValues), - ScalarDouble(EventValues), - WaveByte(WaveEvents), - WaveShort(WaveEvents), - WaveInt(WaveEvents), - WaveFloat(WaveEvents), - WaveDouble(WaveEvents), + Plain(PlainEvents), + XBinnedEvents(XBinnedEvents), } impl EventsItem { pub fn is_wave(&self) -> bool { use EventsItem::*; match self { - WaveByte(_) => true, - WaveShort(_) => true, - WaveInt(_) => true, - WaveFloat(_) => true, - WaveDouble(_) => true, - _ => false, + Plain(h) => h.is_wave(), + XBinnedEvents(h) => { + if let Shape::Wave(_) = h.shape() { + true + } else { + false + } + } } } pub fn 
variant_name(&self) -> String { use EventsItem::*; match self { - ScalarByte(item) => format!("ScalarByte"), - ScalarShort(item) => format!("ScalarShort"), - ScalarInt(item) => format!("ScalarInt"), - ScalarFloat(item) => format!("ScalarFloat"), - ScalarDouble(item) => format!("ScalarDouble"), - WaveByte(item) => format!("WaveByte({})", item.len()), - WaveShort(item) => format!("WaveShort({})", item.len()), - WaveInt(item) => format!("WaveInt({})", item.len()), - WaveFloat(item) => format!("WaveFloat({})", item.len()), - WaveDouble(item) => format!("WaveDouble({})", item.len()), + Plain(h) => format!("Plain({})", h.variant_name()), + XBinnedEvents(h) => format!("Plain({})", h.variant_name()), + } + } + + pub fn x_aggregate(self, ak: &AggKind) -> EventsItem { + use EventsItem::*; + match self { + Plain(k) => k.x_aggregate(ak), + XBinnedEvents(k) => k.x_aggregate(ak), } } } @@ -91,16 +483,8 @@ impl WithLen for EventsItem { fn len(&self) -> usize { use EventsItem::*; match self { - ScalarByte(j) => j.len(), - ScalarShort(j) => j.len(), - ScalarInt(j) => j.len(), - ScalarFloat(j) => j.len(), - ScalarDouble(j) => j.len(), - WaveByte(j) => j.len(), - WaveShort(j) => j.len(), - WaveInt(j) => j.len(), - WaveFloat(j) => j.len(), - WaveDouble(j) => j.len(), + Plain(j) => j.len(), + XBinnedEvents(j) => j.len(), } } } @@ -109,16 +493,28 @@ impl WithTimestamps for EventsItem { fn ts(&self, ix: usize) -> u64 { use EventsItem::*; match self { - ScalarByte(j) => j.ts(ix), - ScalarShort(j) => j.ts(ix), - ScalarInt(j) => j.ts(ix), - ScalarFloat(j) => j.ts(ix), - ScalarDouble(j) => j.ts(ix), - WaveByte(j) => j.ts(ix), - WaveShort(j) => j.ts(ix), - WaveInt(j) => j.ts(ix), - WaveFloat(j) => j.ts(ix), - WaveDouble(j) => j.ts(ix), + Plain(j) => j.ts(ix), + XBinnedEvents(j) => j.ts(ix), + } + } +} + +impl HasShape for EventsItem { + fn shape(&self) -> Shape { + use EventsItem::*; + match self { + Plain(h) => h.shape(), + XBinnedEvents(h) => h.shape(), + } + } +} + +impl HasScalarType for EventsItem { + fn scalar_type(&self) -> ScalarType { + use EventsItem::*; + match self { + Plain(h) => h.scalar_type(), + XBinnedEvents(h) => h.scalar_type(), } } } diff --git a/archapp/src/parse.rs b/archapp/src/parse.rs index 12fcc04..aa736ac 100644 --- a/archapp/src/parse.rs +++ b/archapp/src/parse.rs @@ -1,6 +1,6 @@ use crate::events::parse_data_filename; use crate::generated::EPICSEvent::PayloadType; -use crate::{unescape_archapp_msg, EventsItem}; +use crate::{unescape_archapp_msg, EventsItem, PlainEvents, ScalarPlainEvents, WavePlainEvents}; use archapp_xc::*; use async_channel::{bounded, Receiver}; use chrono::{TimeZone, Utc}; @@ -33,16 +33,16 @@ pub struct PbFileReader { fn parse_scalar_byte(m: &[u8], year: u32) -> Result { let msg = crate::generated::EPICSEvent::ScalarByte::parse_from_bytes(m) .map_err(|_| Error::with_msg(format!("can not parse pb-type {}", "ScalarByte")))?; - let mut t = EventValues:: { + let mut t = EventValues:: { tss: vec![], values: vec![], }; let yd = Utc.ymd(year as i32, 1, 1).and_hms(0, 0, 0); let ts = yd.timestamp() as u64 * 1000000000 + msg.get_secondsintoyear() as u64 * 1000000000 + msg.get_nano() as u64; - let v = msg.get_val().first().map_or(0, |k| *k as i32); + let v = msg.get_val().first().map_or(0, |k| *k as i8); t.tss.push(ts); t.values.push(v); - Ok(EventsItem::ScalarByte(t)) + Ok(EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::Byte(t)))) } macro_rules! scalar_parse { @@ -58,8 +58,8 @@ macro_rules! 
scalar_parse { yd.timestamp() as u64 * 1000000000 + msg.get_secondsintoyear() as u64 * 1000000000 + msg.get_nano() as u64; let v = msg.get_val(); t.tss.push(ts); - t.values.push(v); - EventsItem::$eit(t) + t.values.push(v as $evty); + EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::$eit(t))) }}; } @@ -76,8 +76,8 @@ macro_rules! wave_parse { yd.timestamp() as u64 * 1000000000 + msg.get_secondsintoyear() as u64 * 1000000000 + msg.get_nano() as u64; let v = msg.get_val(); t.tss.push(ts); - t.vals.push(v.to_vec()); - EventsItem::$eit(t) + t.vals.push(v.into_iter().map(|&x| x as $evty).collect()); + EventsItem::Plain(PlainEvents::Wave(WavePlainEvents::$eit(t))) }}; } @@ -119,37 +119,37 @@ impl PbFileReader { let ei = match self.payload_type { SCALAR_BYTE => parse_scalar_byte(&m, self.year)?, SCALAR_ENUM => { - scalar_parse!(&m, self.year, ScalarEnum, ScalarInt, i32) + scalar_parse!(&m, self.year, ScalarEnum, Int, i32) } SCALAR_SHORT => { - scalar_parse!(&m, self.year, ScalarShort, ScalarShort, i32) + scalar_parse!(&m, self.year, ScalarShort, Short, i16) } SCALAR_INT => { - scalar_parse!(&m, self.year, ScalarInt, ScalarInt, i32) + scalar_parse!(&m, self.year, ScalarInt, Int, i32) } SCALAR_FLOAT => { - scalar_parse!(&m, self.year, ScalarFloat, ScalarFloat, f32) + scalar_parse!(&m, self.year, ScalarFloat, Float, f32) } SCALAR_DOUBLE => { - scalar_parse!(&m, self.year, ScalarDouble, ScalarDouble, f64) + scalar_parse!(&m, self.year, ScalarDouble, Double, f64) } WAVEFORM_BYTE => { - wave_parse!(&m, self.year, VectorChar, WaveByte, u8) + wave_parse!(&m, self.year, VectorChar, Byte, i8) } WAVEFORM_SHORT => { - wave_parse!(&m, self.year, VectorShort, WaveShort, i32) + wave_parse!(&m, self.year, VectorShort, Short, i16) } WAVEFORM_ENUM => { - wave_parse!(&m, self.year, VectorEnum, WaveInt, i32) + wave_parse!(&m, self.year, VectorEnum, Int, i32) } WAVEFORM_INT => { - wave_parse!(&m, self.year, VectorInt, WaveInt, i32) + wave_parse!(&m, self.year, VectorInt, Int, i32) } WAVEFORM_FLOAT => { - wave_parse!(&m, self.year, VectorFloat, WaveFloat, f32) + wave_parse!(&m, self.year, VectorFloat, Float, f32) } WAVEFORM_DOUBLE => { - wave_parse!(&m, self.year, VectorDouble, WaveDouble, f64) + wave_parse!(&m, self.year, VectorDouble, Double, f64) } SCALAR_STRING | WAVEFORM_STRING | V4_GENERIC_BYTES => { return Err(Error::with_msg(format!("not supported: {:?}", self.payload_type))); @@ -409,7 +409,7 @@ pub async fn scan_files_inner( //tx.send(Ok(Box::new(path.clone()) as RT1)).await?; let fns = pe.path.to_str().ok_or_else(|| Error::with_msg("invalid path string"))?; if let Ok(fnp) = parse_data_filename(&fns) { - tx.send(Ok(Box::new(serde_json::to_value(fns)?) as ItemSerBox)).await?; + //tx.send(Ok(Box::new(serde_json::to_value(fns)?) as ItemSerBox)).await?; let channel_path = &fns[proots.len() + 1..fns.len() - 11]; if !lru.query(channel_path) { let mut pbr = PbFileReader::new(tokio::fs::File::open(&pe.path).await?).await; @@ -432,22 +432,21 @@ pub async fn scan_files_inner( dbconn::insert_channel(channel_path.into(), ndi.facility, &dbc).await?; } if let Ok(msg) = pbr.read_msg().await { - if msg.is_wave() { + lru.insert(channel_path); + { tx.send(Ok(Box::new(serde_json::to_value(format!( - "found {} {}", - msg.variant_name(), - channel_path + "channel {} type {}", + pbr.channel_name(), + msg.variant_name() ))?) 
as ItemSerBox)) .await?; - waves_found += 1; + /*waves_found += 1; if waves_found >= 20 { break; - } + }*/ } - } else { } } - lru.insert(channel_path); } } } diff --git a/disk/src/agg/enp.rs b/disk/src/agg/enp.rs index 2234d00..d481f0f 100644 --- a/disk/src/agg/enp.rs +++ b/disk/src/agg/enp.rs @@ -12,14 +12,14 @@ impl EventsNodeProcessor for Identity where NTY: NumOps, { - type Input = NTY; + type Input = EventValues; type Output = EventValues; fn create(_shape: Shape, _agg_kind: AggKind) -> Self { Self { _m1: PhantomData } } - fn process(&self, inp: EventValues) -> Self::Output { + fn process(&self, inp: Self::Input) -> Self::Output { inp } } diff --git a/disk/src/binned.rs b/disk/src/binned.rs index f6a5046..8bef4b3 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -64,7 +64,7 @@ impl ChannelExecFunction for BinnedBinaryChannelExec { NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Output> + 'static, + ENP: EventsNodeProcessor>::Batch> + 'static, // TODO require these things in general? ::Output: Collectable + PushableIndex, <::Output as TimeBinnableType>::Output: Debug @@ -309,7 +309,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec { NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Output> + 'static, + ENP: EventsNodeProcessor>::Batch> + 'static, // TODO require these things in general? ::Output: Collectable + PushableIndex, <::Output as TimeBinnableType>::Output: Debug diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index 1d29e0d..cd3477c 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -34,7 +34,7 @@ where NTY: NumOps + NumFromBytes + Serialize + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Output>, + ENP: EventsNodeProcessor>::Batch>, { query: PreBinnedQuery, shape: Shape, @@ -72,7 +72,7 @@ where NTY: NumOps + NumFromBytes + Serialize + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Output> + 'static, + ENP: EventsNodeProcessor>::Batch> + 'static, ::Output: PushableIndex + Appendable, // TODO is this needed: Sitemty<::Output>: FrameType, @@ -231,7 +231,7 @@ where NTY: NumOps + NumFromBytes + Serialize + Unpin + 'static, END: Endianness + Unpin + 'static, EVS: EventValueShape + EventValueFromBytes + Unpin + 'static, - ENP: EventsNodeProcessor>::Output> + Unpin + 'static, + ENP: EventsNodeProcessor>::Batch> + Unpin + 'static, ::Output: PushableIndex + Appendable, // TODO needed? 
Sitemty<::Output>: FrameType, diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index 51154f8..946e564 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -28,7 +28,7 @@ where NTY: NumOps + NumFromBytes + Serialize + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Output> + 'static, + ENP: EventsNodeProcessor>::Batch> + 'static, ::Output: PushableIndex + Appendable + 'static, Sitemty<::Output>: FrameType + Framable + 'static, Sitemty<<::Output as TimeBinnableType>::Output>: diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index 5bdcd2a..ab9b0d5 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -42,7 +42,7 @@ pub trait ChannelExecFunction { NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Output> + 'static, + ENP: EventsNodeProcessor>::Batch> + 'static, // TODO require these things in general? ::Output: Debug + Collectable + PushableIndex, <::Output as TimeBinnableType>::Output: Debug @@ -68,7 +68,7 @@ where NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Output> + 'static, + ENP: EventsNodeProcessor>::Batch> + 'static, // TODO require these things in general? ::Output: Debug + Collectable + PushableIndex, <::Output as TimeBinnableType>::Output: Debug @@ -237,7 +237,7 @@ impl ChannelExecFunction for PlainEvents { NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Output> + 'static, + ENP: EventsNodeProcessor>::Batch> + 'static, { let _ = byte_order; let _ = event_value_shape; @@ -382,7 +382,7 @@ impl ChannelExecFunction for PlainEventsJson { NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Output> + 'static, + ENP: EventsNodeProcessor>::Batch> + 'static, // TODO require these things in general? ::Output: Debug + Collectable + PushableIndex, <::Output as TimeBinnableType>::Output: Debug diff --git a/disk/src/decode.rs b/disk/src/decode.rs index b058ce4..c1315c5 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -6,8 +6,8 @@ use futures_core::Stream; use futures_util::StreamExt; use items::eventvalues::EventValues; use items::numops::{BoolNum, NumOps}; -use items::waveevents::{WaveNBinner, WavePlainProc, WaveXBinner}; -use items::{EventsNodeProcessor, RangeCompletableItem, StreamItem}; +use items::waveevents::{WaveEvents, WaveNBinner, WavePlainProc, WaveXBinner}; +use items::{Appendable, EventAppendable, EventsNodeProcessor, RangeCompletableItem, StreamItem}; use std::marker::PhantomData; use std::mem::size_of; use std::pin::Pin; @@ -85,6 +85,7 @@ where NTY: NumFromBytes, { type Output; + type Batch: Appendable + EventAppendable; // The written data on disk has errors: // The endian as stated in the channel config does not match written events. // Therefore, can not rely on that but have to check for each single event... 
@@ -93,9 +94,10 @@ where impl EventValueFromBytes for EventValuesDim0Case where - NTY: NumFromBytes, + NTY: NumOps + NumFromBytes, { type Output = NTY; + type Batch = EventValues; fn convert(&self, buf: &[u8], big_endian: bool) -> Result { Ok(NTY::convert(buf, big_endian)) @@ -104,9 +106,10 @@ where impl EventValueFromBytes for EventValuesDim1Case where - NTY: NumFromBytes, + NTY: NumOps + NumFromBytes, { type Output = Vec; + type Batch = WaveEvents; fn convert(&self, buf: &[u8], big_endian: bool) -> Result { let es = size_of::(); @@ -131,9 +134,9 @@ pub trait EventValueShape: EventValueFromBytes + Send + Unpi where NTY: NumFromBytes, { - type NumXAggToSingleBin: EventsNodeProcessor>::Output>; - type NumXAggToNBins: EventsNodeProcessor>::Output>; - type NumXAggPlain: EventsNodeProcessor>::Output>; + type NumXAggToSingleBin: EventsNodeProcessor>::Batch>; + type NumXAggToNBins: EventsNodeProcessor>::Batch>; + type NumXAggPlain: EventsNodeProcessor>::Batch>; } pub struct EventValuesDim0Case { @@ -209,10 +212,10 @@ where } } - fn decode(&mut self, ev: &EventFull) -> Result>::Output>, Error> { - let mut ret = EventValues::empty(); - ret.tss.reserve(ev.tss.len()); - ret.values.reserve(ev.tss.len()); + fn decode(&mut self, ev: &EventFull) -> Result<>::Batch, Error> { + let mut ret = <>::Batch as Appendable>::empty(); + //ret.tss.reserve(ev.tss.len()); + //ret.values.reserve(ev.tss.len()); for i1 in 0..ev.tss.len() { // TODO check that dtype, event endianness and event shape match our static // expectation about the data in this channel. @@ -228,8 +231,9 @@ where } let decomp = ev.decomps[i1].as_ref().unwrap().as_ref(); let val = self.evs.convert(decomp, be)?; - ret.tss.push(ev.tss[i1]); - ret.values.push(val); + <>::Batch as EventAppendable>::append_event(&mut ret, ev.tss[i1], val); + //ret.tss.push(ev.tss[i1]); + //ret.values.push(val); } Ok(ret) } @@ -241,8 +245,7 @@ where END: Endianness, EVS: EventValueShape + EventValueFromBytes, { - type Item = - Result>::Output>>>, Error>; + type Item = Result>::Batch>>, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 10de015..d37f665 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -23,7 +23,7 @@ where NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, - ENP: EventsNodeProcessor>::Output> + 'static, + ENP: EventsNodeProcessor>::Batch> + 'static, Sitemty<::Output>: Framable + 'static, ::Output: 'static, { diff --git a/items/src/eventvalues.rs b/items/src/eventvalues.rs index 2c86582..1558386 100644 --- a/items/src/eventvalues.rs +++ b/items/src/eventvalues.rs @@ -2,8 +2,9 @@ use crate::minmaxavgbins::MinMaxAvgBins; use crate::numops::NumOps; use crate::streams::{Collectable, Collector}; use crate::{ - ts_offs_from_abs, Appendable, FilterFittingInside, Fits, FitsInside, PushableIndex, RangeOverlapInfo, ReadPbv, - ReadableFromFile, SitemtyFrameType, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, + ts_offs_from_abs, Appendable, EventAppendable, FilterFittingInside, Fits, FitsInside, PushableIndex, + RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, TimeBinnableType, TimeBinnableTypeAggregator, + WithLen, WithTimestamps, }; use err::Error; use netpod::NanoRange; @@ -250,6 +251,7 @@ where Self::Collector::new() } } + pub struct EventValuesAggregator { range: NanoRange, count: u64, @@ -339,3 +341,15 @@ where } } } + +impl EventAppendable for 
EventValues +where + NTY: NumOps, +{ + type Value = NTY; + + fn append_event(&mut self, ts: u64, value: Self::Value) { + self.tss.push(ts); + self.values.push(value); + } +} diff --git a/items/src/lib.rs b/items/src/lib.rs index 2187e0b..65eeb17 100644 --- a/items/src/lib.rs +++ b/items/src/lib.rs @@ -231,7 +231,7 @@ pub trait EventsNodeProcessor: Send + Unpin { type Input; type Output: Send + Unpin + DeserializeOwned + WithTimestamps + TimeBinnableType; fn create(shape: Shape, agg_kind: AggKind) -> Self; - fn process(&self, inp: EventValues) -> Self::Output; + fn process(&self, inp: Self::Input) -> Self::Output; } #[derive(Clone, Debug, Deserialize)] @@ -294,6 +294,11 @@ pub trait Appendable: WithLen { fn append(&mut self, src: &Self); } +pub trait EventAppendable { + type Value; + fn append_event(&mut self, ts: u64, value: Self::Value); +} + pub trait TimeBins: Send + Unpin + WithLen + Appendable + FilterFittingInside { fn ts1s(&self) -> &Vec; fn ts2s(&self) -> &Vec; diff --git a/items/src/waveevents.rs b/items/src/waveevents.rs index b62e430..504aa3e 100644 --- a/items/src/waveevents.rs +++ b/items/src/waveevents.rs @@ -4,8 +4,9 @@ use crate::numops::NumOps; use crate::xbinnedscalarevents::XBinnedScalarEvents; use crate::xbinnedwaveevents::XBinnedWaveEvents; use crate::{ - Appendable, EventsNodeProcessor, FilterFittingInside, Fits, FitsInside, PushableIndex, RangeOverlapInfo, ReadPbv, - ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, + Appendable, EventAppendable, EventsNodeProcessor, FilterFittingInside, Fits, FitsInside, PushableIndex, + RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, + TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; use err::Error; use netpod::log::*; @@ -267,6 +268,18 @@ where } } +impl EventAppendable for WaveEvents +where + NTY: NumOps, +{ + type Value = Vec; + + fn append_event(&mut self, ts: u64, value: Self::Value) { + self.tss.push(ts); + self.vals.push(value); + } +} + pub struct WaveXBinner { _m1: PhantomData, } @@ -275,14 +288,14 @@ impl EventsNodeProcessor for WaveXBinner where NTY: NumOps, { - type Input = Vec; + type Input = WaveEvents; type Output = XBinnedScalarEvents; fn create(_shape: Shape, _agg_kind: AggKind) -> Self { Self { _m1: PhantomData } } - fn process(&self, inp: EventValues) -> Self::Output { + fn process(&self, inp: Self::Input) -> Self::Output { let nev = inp.tss.len(); let mut ret = Self::Output { tss: inp.tss, @@ -295,7 +308,7 @@ where let mut max = NTY::min_or_nan(); let mut sum = 0f32; let mut sumc = 0; - let vals = &inp.values[i1]; + let vals = &inp.vals[i1]; for &v in vals { if v < min || min.is_nan() { min = v; @@ -332,7 +345,7 @@ impl EventsNodeProcessor for WaveNBinner where NTY: NumOps, { - type Input = Vec; + type Input = WaveEvents; type Output = XBinnedWaveEvents; fn create(shape: Shape, agg_kind: AggKind) -> Self { @@ -348,7 +361,7 @@ where } } - fn process(&self, inp: EventValues) -> Self::Output { + fn process(&self, inp: Self::Input) -> Self::Output { let nev = inp.tss.len(); let mut ret = Self::Output { // TODO get rid of this clone: @@ -362,7 +375,7 @@ where let mut max = vec![NTY::min_or_nan(); self.x_bin_count]; let mut sum = vec![0f32; self.x_bin_count]; let mut sumc = vec![0u64; self.x_bin_count]; - for (i2, &v) in inp.values[i1].iter().enumerate() { + for (i2, &v) in inp.vals[i1].iter().enumerate() { let i3 = i2 * self.x_bin_count / self.shape_bin_count; if v < min[i3] || min[i3].is_nan() { min[i3] 
= v; @@ -401,25 +414,25 @@ impl EventsNodeProcessor for WavePlainProc where NTY: NumOps, { - type Input = Vec; + type Input = WaveEvents; type Output = WaveEvents; fn create(_shape: Shape, _agg_kind: AggKind) -> Self { Self { _m1: PhantomData } } - fn process(&self, inp: EventValues) -> Self::Output { + fn process(&self, inp: Self::Input) -> Self::Output { if false { - let n = if inp.values.len() > 0 { inp.values[0].len() } else { 0 }; + let n = if inp.vals.len() > 0 { inp.vals[0].len() } else { 0 }; let n = if n > 5 { 5 } else { n }; WaveEvents { tss: inp.tss, - vals: inp.values.iter().map(|k| k[..n].to_vec()).collect(), + vals: inp.vals.iter().map(|k| k[..n].to_vec()).collect(), } } else { WaveEvents { tss: inp.tss, - vals: inp.values, + vals: inp.vals, } } } diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index c203c33..945448c 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -54,6 +54,10 @@ pub enum ScalarType { BOOL, } +pub trait HasScalarType { + fn scalar_type(&self) -> ScalarType; +} + impl ScalarType { pub fn from_dtype_index(ix: u8) -> Result { use ScalarType::*; @@ -331,6 +335,10 @@ pub enum Shape { Wave(u32), } +pub trait HasShape { + fn shape(&self) -> Shape; +} + pub mod timeunits { pub const MU: u64 = 1000; pub const MS: u64 = MU * 1000;
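
Review notes follow the diff. The angle-bracketed generic arguments were stripped when this patch was captured (for example the ScalarPlainEvents variants read `Byte(EventValues)` where the `as i8` cast in parse.rs implies `Byte(EventValues<i8>)`, and `make_frame` presumably returns `Box<dyn Framable>` rather than a bare `Box`), so the sketches below restate the main pieces in self-contained, compilable form. The type names follow the patch, but the struct definitions and trait bounds are simplified stand-ins, not the real project items. First, the trait the commit is named after: EventAppendable lets a batch container accept one decoded event at a time, whether the value is a scalar or a whole waveform.

// Minimal stand-ins for the project types; only the fields used here.
pub struct EventValues<NTY> {
    pub tss: Vec<u64>,
    pub values: Vec<NTY>,
}

pub struct WaveEvents<NTY> {
    pub tss: Vec<u64>,
    pub vals: Vec<Vec<NTY>>,
}

// The new trait from items/src/lib.rs: append one event (timestamp + value)
// to a batch, independent of whether the value is a scalar or a waveform.
pub trait EventAppendable {
    type Value;
    fn append_event(&mut self, ts: u64, value: Self::Value);
}

// items/src/eventvalues.rs: scalar batches take one number per event.
impl<NTY> EventAppendable for EventValues<NTY> {
    type Value = NTY;
    fn append_event(&mut self, ts: u64, value: Self::Value) {
        self.tss.push(ts);
        self.values.push(value);
    }
}

// items/src/waveevents.rs: waveform batches take a whole vector per event.
impl<NTY> EventAppendable for WaveEvents<NTY> {
    type Value = Vec<NTY>;
    fn append_event(&mut self, ts: u64, value: Self::Value) {
        self.tss.push(ts);
        self.vals.push(value);
    }
}

// A caller generic over the batch type, in the spirit of the decode loop.
fn push_all<B: EventAppendable>(batch: &mut B, events: Vec<(u64, B::Value)>) {
    for (ts, v) in events {
        batch.append_event(ts, v);
    }
}

fn main() {
    let mut scalars = EventValues::<f64> { tss: vec![], values: vec![] };
    push_all(&mut scalars, vec![(10, 0.5), (20, 0.7)]);

    let mut waves = WaveEvents::<f32> { tss: vec![], vals: vec![] };
    push_all(&mut waves, vec![(10, vec![0.1, 0.2, 0.3])]);

    assert_eq!(scalars.values.len(), 2);
    assert_eq!(waves.vals[0].len(), 3);
}

With this in place, the decode path in disk/src/decode.rs can fill either container through the same call; the plumbing for that is sketched next.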
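
In disk/src/decode.rs, EventValueFromBytes gains a Batch associated type so EventsDecodedStream yields EventValues<NTY> for scalar channels and WaveEvents<NTY> for waveform channels. The bound appears garbled as `type Batch: Appendable + EventAppendable;`; for the append_event call in decode() to type-check it presumably reads `Appendable + EventAppendable<Value = Self::Output>`. A reduced sketch of that plumbing under that assumption, with convert() cut down to a little-endian f64 case and error handling omitted:

use std::marker::PhantomData;

// Stand-in; see the previous sketch for the full EventAppendable picture.
pub struct EventValues<NTY> { pub tss: Vec<u64>, pub values: Vec<NTY> }

pub trait EventAppendable {
    type Value;
    fn append_event(&mut self, ts: u64, value: Self::Value);
}

impl<NTY> EventAppendable for EventValues<NTY> {
    type Value = NTY;
    fn append_event(&mut self, ts: u64, value: NTY) {
        self.tss.push(ts);
        self.values.push(value);
    }
}

// Reduced Appendable: the real trait also requires WithLen and append().
pub trait Appendable {
    fn empty() -> Self;
}

impl<NTY> Appendable for EventValues<NTY> {
    fn empty() -> Self {
        Self { tss: vec![], values: vec![] }
    }
}

// Reduced EventValueFromBytes: Output is one decoded value, Batch is the
// container the decode stream accumulates into. The real convert() also
// takes the per-event endianness flag and returns a Result.
pub trait EventValueFromBytes {
    type Output;
    type Batch: Appendable + EventAppendable<Value = Self::Output>;
    fn convert(&self, buf: &[u8]) -> Self::Output;
}

pub struct Dim0Case<NTY>(PhantomData<NTY>);

impl EventValueFromBytes for Dim0Case<f64> {
    type Output = f64;
    type Batch = EventValues<f64>;
    fn convert(&self, buf: &[u8]) -> f64 {
        f64::from_le_bytes(buf[..8].try_into().unwrap())
    }
}

// Shape of EventsDecodedStream::decode after the patch: the batch is built
// through Appendable::empty + EventAppendable::append_event, so the same
// loop serves scalar and waveform channels.
fn decode<EVS: EventValueFromBytes>(evs: &EVS, events: &[(u64, Vec<u8>)]) -> EVS::Batch {
    let mut ret = <EVS::Batch as Appendable>::empty();
    for (ts, raw) in events {
        let val = evs.convert(raw);
        ret.append_event(*ts, val);
    }
    ret
}

fn main() {
    let evs = Dim0Case::<f64>(PhantomData);
    let batch = decode(&evs, &[(1, 1.5f64.to_le_bytes().to_vec())]);
    assert_eq!(batch.values, vec![1.5]);
}

The per-event endianness check kept in the trait comment still applies: the channel config cannot be trusted, so the real convert() keeps its big_endian flag.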
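
The flat ten-variant EventsItem enum becomes a hierarchy: EventsItem::{Plain, XBinnedEvents}, PlainEvents::{Scalar, Wave}, and per-scalar-type leaves, with the new HasShape and HasScalarType traits (netpod/src/lib.rs) replacing the long per-variant matches in channel_info. A two-type sketch of that dispatch with stand-in structs; note one behavioral change it encodes: shape() on an empty waveform batch yields Wave(0), where the old channel_info code returned an "empty event batch" error.

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Shape { Scalar, Wave(u32) }

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ScalarType { I32, F64 }

pub trait HasShape { fn shape(&self) -> Shape; }
pub trait HasScalarType { fn scalar_type(&self) -> ScalarType; }

pub struct EventValues<NTY> { pub tss: Vec<u64>, pub values: Vec<NTY> }
pub struct WaveEvents<NTY> { pub tss: Vec<u64>, pub vals: Vec<Vec<NTY>> }

pub enum ScalarPlainEvents { Int(EventValues<i32>), Double(EventValues<f64>) }
pub enum WavePlainEvents { Int(WaveEvents<i32>), Double(WaveEvents<f64>) }
pub enum PlainEvents { Scalar(ScalarPlainEvents), Wave(WavePlainEvents) }
pub enum EventsItem { Plain(PlainEvents) } // XBinnedEvents omitted in this sketch

impl HasShape for EventsItem {
    fn shape(&self) -> Shape {
        match self {
            EventsItem::Plain(PlainEvents::Scalar(_)) => Shape::Scalar,
            EventsItem::Plain(PlainEvents::Wave(w)) => {
                // Waveform length is taken from the first event; empty batch => 0.
                let n = match w {
                    WavePlainEvents::Int(h) => h.vals.first().map_or(0, |x| x.len() as u32),
                    WavePlainEvents::Double(h) => h.vals.first().map_or(0, |x| x.len() as u32),
                };
                Shape::Wave(n)
            }
        }
    }
}

impl HasScalarType for EventsItem {
    fn scalar_type(&self) -> ScalarType {
        match self {
            EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::Int(_)))
            | EventsItem::Plain(PlainEvents::Wave(WavePlainEvents::Int(_))) => ScalarType::I32,
            _ => ScalarType::F64,
        }
    }
}

fn main() {
    // channel_info now only calls item.shape() and item.scalar_type().
    let item = EventsItem::Plain(PlainEvents::Wave(WavePlainEvents::Double(WaveEvents {
        tss: vec![0],
        vals: vec![vec![1.0, 2.0, 3.0]],
    })));
    assert_eq!(item.shape(), Shape::Wave(3));
    assert_eq!(item.scalar_type(), ScalarType::F64);
}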
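
make_single_event_pipe now pushes each parsed item through x_aggregate(&evq.agg_kind) before sending it, so waveform channels requested with AggKind::DimXBins1 are x-binned by WaveXBinner and rewrapped as XBinnedEvents::SingleBinWave. The diff only fills in the Byte arm (the other scalar types still hit panic!() or todoval()), so the following is a sketch of the intended flow for one numeric type, with a stand-in binner that reduces each waveform to min/max/avg the way WaveXBinner::process does:

#[derive(Clone, Debug)]
pub enum AggKind { Plain, DimXBins1 }

pub struct WaveEvents { pub tss: Vec<u64>, pub vals: Vec<Vec<f64>> }

// Stand-in for XBinnedScalarEvents<f64>: one (min, max, avg) per event.
pub struct XBinnedScalarEvents {
    pub tss: Vec<u64>,
    pub mins: Vec<f64>,
    pub maxs: Vec<f64>,
    pub avgs: Vec<f32>,
}

pub enum EventsItem {
    PlainWave(WaveEvents),
    SingleBinWave(XBinnedScalarEvents),
}

// Stand-in for WaveXBinner::process: collapse each waveform to one x-bin.
fn wave_x_bin(inp: WaveEvents) -> XBinnedScalarEvents {
    let mut out = XBinnedScalarEvents { tss: inp.tss, mins: vec![], maxs: vec![], avgs: vec![] };
    for wave in &inp.vals {
        let mut min = f64::NAN;
        let mut max = f64::NAN;
        let mut sum = 0f32;
        let mut count = 0u32;
        for &v in wave {
            if v < min || min.is_nan() { min = v; }
            if v > max || max.is_nan() { max = v; }
            sum += v as f32;
            count += 1;
        }
        out.mins.push(min);
        out.maxs.push(max);
        out.avgs.push(if count == 0 { f32::NAN } else { sum / count as f32 });
    }
    out
}

impl EventsItem {
    // Shape of WavePlainEvents::x_aggregate in the patch: Plain passes the
    // data through, DimXBins1 x-bins it before it goes onto the wire.
    pub fn x_aggregate(self, ak: &AggKind) -> EventsItem {
        match (self, ak) {
            (EventsItem::PlainWave(w), AggKind::DimXBins1) => EventsItem::SingleBinWave(wave_x_bin(w)),
            (item, _) => item,
        }
    }
}

fn main() {
    let item = EventsItem::PlainWave(WaveEvents { tss: vec![1], vals: vec![vec![1.0, 4.0, 2.0]] });
    match item.x_aggregate(&AggKind::DimXBins1) {
        EventsItem::SingleBinWave(x) => {
            assert_eq!(x.mins, vec![1.0]);
            assert_eq!(x.maxs, vec![4.0]);
        }
        _ => unreachable!(),
    }
}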
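
Finally, FrameMaker previously picked the frame type from scalar type and shape alone; it now also needs the requested AggKind, because the same channel can be served as plain values, single-bin waveforms, or N-bin waveforms. The arm1! arms lost their ::<...> type arguments to the same angle-bracket stripping; the per-arm target types below are an assumption inferred from the imports (EventValues, WaveEvents, XBinnedScalarEvents, XBinnedWaveEvents), and make_frame_gen is reduced to reporting the selected type instead of boxing a Framable:

use std::marker::PhantomData;

pub enum Shape { Scalar, Wave(u32) }
pub enum AggKind { Plain, DimXBins1, DimXBinsN(u32) }
pub enum ScalarType { I8, I16, I32, F32, F64 }

// Marker types standing in for the real containers.
pub struct EventValues<T>(PhantomData<T>);
pub struct WaveEvents<T>(PhantomData<T>);
pub struct XBinnedScalarEvents<T>(PhantomData<T>);
pub struct XBinnedWaveEvents<T>(PhantomData<T>);

// Stand-in for FrameMaker::make_frame_gen::<T>(item): only report which
// concrete type the dispatch selected.
fn make_frame_gen<T>() -> String {
    std::any::type_name::<T>().to_string()
}

// Same structure as the patch's arm1!: for one scalar type, choose the
// frame type from Shape x AggKind. The wave arms follow the processors
// imported in events.rs (WaveXBinner -> XBinnedScalarEvents, N bins ->
// XBinnedWaveEvents); this mapping is an inference, not taken verbatim.
macro_rules! arm1 {
    ($sty:ty, $shape:expr, $ak:expr) => {{
        match $shape {
            Shape::Scalar => match $ak {
                AggKind::Plain => make_frame_gen::<EventValues<$sty>>(),
                AggKind::DimXBins1 => make_frame_gen::<EventValues<$sty>>(),
                AggKind::DimXBinsN(_) => make_frame_gen::<EventValues<$sty>>(),
            },
            Shape::Wave(_) => match $ak {
                AggKind::Plain => make_frame_gen::<WaveEvents<$sty>>(),
                AggKind::DimXBins1 => make_frame_gen::<XBinnedScalarEvents<$sty>>(),
                AggKind::DimXBinsN(_) => make_frame_gen::<XBinnedWaveEvents<$sty>>(),
            },
        }
    }};
}

fn select_frame_type(scalar_type: ScalarType, shape: Shape, agg_kind: AggKind) -> String {
    match scalar_type {
        ScalarType::I8 => arm1!(i8, shape, agg_kind),
        ScalarType::I16 => arm1!(i16, shape, agg_kind),
        ScalarType::I32 => arm1!(i32, shape, agg_kind),
        ScalarType::F32 => arm1!(f32, shape, agg_kind),
        ScalarType::F64 => arm1!(f64, shape, agg_kind),
    }
}

fn main() {
    println!("{}", select_frame_type(ScalarType::F64, Shape::Wave(128), AggKind::DimXBins1));
}

The macro only keeps the ScalarType x Shape x AggKind cross product readable; each arm still monomorphizes its own make_frame_gen::<T>, just as the hand-written match on ScalarType did before the patch.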