diff --git a/Cargo.toml b/Cargo.toml index 402ae5b..4379c8c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,8 @@ [workspace] -members = ["daqbuffer", "h5out", "archapp", "archapp_xc", "archapp_wrap", "items", "nodenet", "httpclient", "fsio", "dq"] +members = ["daqbuffer", "h5out", "archapp", "archapp_xc", "archapp_wrap", "items", "items_proc", "nodenet", "httpclient", "fsio", "dq"] [profile.release] -opt-level = 3 +opt-level = 2 debug = 0 overflow-checks = false debug-assertions = false diff --git a/archapp/Cargo.toml b/archapp/Cargo.toml index 283c077..570bc89 100644 --- a/archapp/Cargo.toml +++ b/archapp/Cargo.toml @@ -27,6 +27,7 @@ taskrun = { path = "../taskrun" } netpod = { path = "../netpod" } dbconn = { path = "../dbconn" } items = { path = "../items" } +items_proc = { path = "../items_proc" } streams = { path = "../streams" } commonio = { path = "../commonio" } diff --git a/archapp/src/archeng/pipe.rs b/archapp/src/archeng/pipe.rs index a3c4c39..26ddf7d 100644 --- a/archapp/src/archeng/pipe.rs +++ b/archapp/src/archeng/pipe.rs @@ -104,7 +104,16 @@ pub async fn make_event_pipe( } PlainEvents::Wave(j) => { trace!("EventsItem::Plain Wave for {:?} {:?}", cfgshape, q_agg_kind); - match j { + items_proc::tycases1!(j, WavePlainEvents, (j), { + let binner = + WaveXBinner::<$ty>::create(cfgshape.clone(), q_agg_kind.clone()); + let out = binner.process(j); + let item = SingleBinWaveEvents::$id(out); + let item = XBinnedEvents::SingleBinWave(item); + let item = EventsItem::XBinnedEvents(item); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) + }) + /*match j { WavePlainEvents::I8(j) => { let binner = WaveXBinner::::create(cfgshape.clone(), q_agg_kind.clone()); @@ -150,7 +159,7 @@ pub async fn make_event_pipe( let item = EventsItem::XBinnedEvents(item); Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) } - } + }*/ } }, EventsItem::XBinnedEvents(j) => match j { @@ -192,7 +201,16 @@ pub async fn make_event_pipe( } PlainEvents::Wave(j) => { trace!("EventsItem::Plain Wave for {:?} {:?}", cfgshape, q_agg_kind); - match j { + items_proc::tycases1!(j, WavePlainEvents, (j), { + let binner = + WaveNBinner::<$ty>::create(cfgshape.clone(), q_agg_kind.clone()); + let out = binner.process(j); + let item = MultiBinWaveEvents::$id(out); + let item = XBinnedEvents::MultiBinWave(item); + let item = EventsItem::XBinnedEvents(item); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) + }) + /*match j { WavePlainEvents::I8(j) => { let binner = WaveNBinner::::create(cfgshape.clone(), q_agg_kind.clone()); @@ -238,7 +256,7 @@ pub async fn make_event_pipe( let item = EventsItem::XBinnedEvents(item); Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) } - } + }*/ } }, EventsItem::XBinnedEvents(j) => match j { diff --git a/dq/src/bin/daq-convert.rs b/dq/src/bin/daq-convert.rs index 777427f..3d18bd9 100644 --- a/dq/src/bin/daq-convert.rs +++ b/dq/src/bin/daq-convert.rs @@ -1,6 +1,7 @@ use clap::{crate_version, Parser}; use err::Error; -use std::path::PathBuf; +use netpod::{timeunits::*, Nanos}; +use std::{path::PathBuf, str::FromStr}; #[derive(Debug, Parser)] #[clap(name="DAQ buffer tools", version=crate_version!())] @@ -21,22 +22,52 @@ pub enum SubCmd { pub struct ConvertArchiverApplianceChannel { #[clap( long, - about = "Prefix for keyspaces, e.g. specify `daq` to get scalar keyspace directory `daq_2`" + about = "Prefix for keyspaces, e.g. specify `daq` to get scalar keyspace directory `daq_2`." )] keyspace_prefix: String, - #[clap(long, about = "Name of the channel to convert")] + #[clap(long, about = "Name of the channel to convert.")] channel_name: String, - #[clap(long, about = "Look for archiver appliance data at given path")] + #[clap(long, about = "Look for archiver appliance data at given path.")] input_dir: PathBuf, - #[clap(long, about = "Generate Databuffer format at given path")] + #[clap(long, about = "Generate Databuffer format at given path.")] output_dir: PathBuf, + #[clap( + default_value = "1d", + long, + about = "Size of the time-bins in the generated Databuffer format.\nUnit-suffixes: `h` (hours), `d` (days)" + )] + time_bin_size: TimeBinSize, +} + +#[derive(Clone, Debug)] +pub struct TimeBinSize { + nanos: u64, +} + +impl FromStr for TimeBinSize { + type Err = Error; + + fn from_str(s: &str) -> Result { + if s.is_empty() { + return Err(Error::with_msg_no_trace("Malformed time-bin size")); + } + let suff = s.chars().last().unwrap(); + if suff.is_numeric() { + Err(Error::with_msg_no_trace("Malformed time-bin size")) + } else if suff == 'h' { + let bs: u64 = s[..s.len() - 1].parse()?; + Ok(Self { nanos: bs * HOUR }) + } else if suff == 'd' { + let bs: u64 = s[..s.len() - 1].parse()?; + Ok(Self { nanos: bs * DAY }) + } else { + Err(Error::with_msg_no_trace("Malformed time-bin size")) + } + } } pub fn main() -> Result<(), Error> { taskrun::run(async { - if false { - return Err(Error::with_msg_no_trace(format!("unknown command"))); - } let opts = Opts::parse(); match opts.subcmd { SubCmd::ConvertArchiverApplianceChannel(sub) => { @@ -45,6 +76,7 @@ pub fn main() -> Result<(), Error> { channel_name: sub.channel_name, input_dir: sub.input_dir, output_dir: sub.output_dir, + time_bin_size: Nanos::from_ns(sub.time_bin_size.nanos), }; dq::convert(params).await } diff --git a/dq/src/bin/dq.rs b/dq/src/bin/dq.rs index 92fa454..caeb441 100644 --- a/dq/src/bin/dq.rs +++ b/dq/src/bin/dq.rs @@ -24,27 +24,10 @@ pub struct Opts { #[derive(Debug, Parser)] pub enum SubCmd { - #[clap(about = "Convert a channel from the Archiver Appliance into Databuffer format.")] - ConvertArchiverApplianceChannel(ConvertArchiverApplianceChannel), ReadDatabufferConfigfile(ReadDatabufferConfigfile), ReadDatabufferDatafile(ReadDatabufferDatafile), } -#[derive(Debug, Parser)] -pub struct ConvertArchiverApplianceChannel { - #[clap( - long, - about = "Prefix for keyspaces, e.g. specify `daq` to get scalar keyspace directory `daq_2`" - )] - keyspace_prefix: String, - #[clap(long, about = "Name of the channel to convert")] - channel_name: String, - #[clap(long, about = "Look for archiver appliance data at given path")] - input_dir: PathBuf, - #[clap(long, about = "Generate Databuffer format at given path")] - output_dir: PathBuf, -} - #[derive(Debug, Parser)] pub struct ReadDatabufferConfigfile { #[clap(long)] @@ -66,15 +49,6 @@ pub fn main() -> Result<(), Error> { } let opts = Opts::parse(); match opts.subcmd { - SubCmd::ConvertArchiverApplianceChannel(sub) => { - let params = dq::ConvertParams { - keyspace_prefix: sub.keyspace_prefix, - channel_name: sub.channel_name, - input_dir: sub.input_dir, - output_dir: sub.output_dir, - }; - dq::convert(params).await - } SubCmd::ReadDatabufferConfigfile(sub) => { let mut file = File::open(&sub.configfile).await?; let meta = file.metadata().await?; diff --git a/dq/src/dq.rs b/dq/src/dq.rs index 85093f0..16e9281 100644 --- a/dq/src/dq.rs +++ b/dq/src/dq.rs @@ -111,6 +111,7 @@ impl DataWriter { self.write_events(2, ScalarType::F64, &events.tss, &events.values) .await?; } + _ => todo!(), }, PlainEvents::Wave(item) => match item { WavePlainEvents::F64(_events) => { @@ -319,6 +320,7 @@ pub struct ConvertParams { pub channel_name: String, pub input_dir: PathBuf, pub output_dir: PathBuf, + pub time_bin_size: Nanos, } pub async fn convert(convert_params: ConvertParams) -> Result<(), Error> { @@ -330,7 +332,7 @@ pub async fn convert(convert_params: ConvertParams) -> Result<(), Error> { convert_params.output_dir ))); } - let bs = Nanos::from_ns(DAY); + let bs = convert_params.time_bin_size.clone(); let mut channel_config: Option = None; let channel = Channel { backend: String::new(), @@ -424,30 +426,6 @@ pub async fn convert(convert_params: ConvertParams) -> Result<(), Error> { } if channel_config.is_none() { let ks = if ei.is_wave() { 3 } else { 2 }; - let scalar_type_2 = match &ei { - items::eventsitem::EventsItem::Plain(k) => match k { - PlainEvents::Scalar(k) => match k { - ScalarPlainEvents::U32(_) => ScalarType::U32, - ScalarPlainEvents::I8(_) => ScalarType::I8, - ScalarPlainEvents::I16(_) => ScalarType::I16, - ScalarPlainEvents::I32(_) => ScalarType::I32, - ScalarPlainEvents::F32(_) => ScalarType::F32, - ScalarPlainEvents::F64(_) => ScalarType::F64, - }, - PlainEvents::Wave(k) => match k { - WavePlainEvents::I8(_) => ScalarType::I8, - WavePlainEvents::I16(_) => ScalarType::I16, - WavePlainEvents::I32(_) => ScalarType::I32, - WavePlainEvents::F32(_) => ScalarType::F32, - WavePlainEvents::F64(_) => ScalarType::F64, - }, - }, - items::eventsitem::EventsItem::XBinnedEvents(_) => panic!(), - }; - if scalar_type_2 != scalar_type { - let msg = format!("unexpected type: {:?} vs {:?}", scalar_type_2, scalar_type); - return Err(Error::with_msg_no_trace(msg)); - } let e = parse::channelconfig::ConfigEntry { ts: 0, pulse: 0, diff --git a/items/Cargo.toml b/items/Cargo.toml index a6be0fd..1c9225b 100644 --- a/items/Cargo.toml +++ b/items/Cargo.toml @@ -15,4 +15,5 @@ tokio = { version = "1.7.1", features = ["fs"] } chrono = { version = "0.4.19", features = ["serde"] } crc32fast = "1.2.1" err = { path = "../err" } +items_proc = { path = "../items_proc" } netpod = { path = "../netpod" } diff --git a/items/src/binnedevents.rs b/items/src/binnedevents.rs index 1c4d977..b523f4f 100644 --- a/items/src/binnedevents.rs +++ b/items/src/binnedevents.rs @@ -1,339 +1,184 @@ +use crate::eventsitem::EventsItem; +use crate::plainevents::{PlainEvents, ScalarPlainEvents}; use crate::xbinnedscalarevents::XBinnedScalarEvents; use crate::xbinnedwaveevents::XBinnedWaveEvents; use crate::{Appendable, Clearable, PushableIndex, WithLen, WithTimestamps}; use netpod::{AggKind, HasScalarType, HasShape, ScalarType, Shape}; use serde::{Deserialize, Serialize}; -use crate::{ - eventsitem::EventsItem, - plainevents::{PlainEvents, ScalarPlainEvents}, -}; - #[derive(Debug, Serialize, Deserialize)] pub enum SingleBinWaveEvents { + U8(XBinnedScalarEvents), + U16(XBinnedScalarEvents), + U32(XBinnedScalarEvents), + U64(XBinnedScalarEvents), I8(XBinnedScalarEvents), I16(XBinnedScalarEvents), I32(XBinnedScalarEvents), + I64(XBinnedScalarEvents), F32(XBinnedScalarEvents), F64(XBinnedScalarEvents), } impl SingleBinWaveEvents { pub fn variant_name(&self) -> String { - use SingleBinWaveEvents::*; - match self { - I8(_) => format!("I8"), - I16(_) => format!("I16"), - I32(_) => format!("I32"), - F32(_) => format!("F32"), - F64(_) => format!("F64"), - } + items_proc::tycases1!(self, Self, (k), { "$id".into() }) } fn x_aggregate(self, ak: &AggKind) -> EventsItem { - use SingleBinWaveEvents::*; - match self { - I8(k) => match ak { + items_proc::tycases1!(self, Self, (k), { + match ak { AggKind::EventBlobs => panic!(), - AggKind::Plain => EventsItem::XBinnedEvents(XBinnedEvents::SingleBinWave(SingleBinWaveEvents::I8(k))), + AggKind::Plain => EventsItem::XBinnedEvents(XBinnedEvents::SingleBinWave(SingleBinWaveEvents::$id(k))), AggKind::TimeWeightedScalar => err::todoval(), AggKind::DimXBins1 => err::todoval(), AggKind::DimXBinsN(_) => EventsItem::Plain(PlainEvents::Wave(err::todoval())), - }, - _ => err::todoval(), - } + } + }) } } impl Clearable for SingleBinWaveEvents { fn clear(&mut self) { - match self { - SingleBinWaveEvents::I8(k) => k.clear(), - SingleBinWaveEvents::I16(k) => k.clear(), - SingleBinWaveEvents::I32(k) => k.clear(), - SingleBinWaveEvents::F32(k) => k.clear(), - SingleBinWaveEvents::F64(k) => k.clear(), - } + items_proc::tycases1!(self, Self, (k), { k.clear() }) } } impl Appendable for SingleBinWaveEvents { fn empty_like_self(&self) -> Self { - match self { - Self::I8(k) => Self::I8(k.empty_like_self()), - Self::I16(k) => Self::I16(k.empty_like_self()), - Self::I32(k) => Self::I32(k.empty_like_self()), - Self::F32(k) => Self::F32(k.empty_like_self()), - Self::F64(k) => Self::F64(k.empty_like_self()), - } + items_proc::tycases1!(self, Self, (k), { Self::$id(k.empty_like_self()) }) } fn append(&mut self, src: &Self) { - match self { - Self::I8(k) => match src { - Self::I8(j) => k.append(j), + items_proc::tycases1!(self, Self, (k), { + match src { + Self::$id(j) => k.append(j), _ => panic!(), - }, - Self::I16(k) => match src { - Self::I16(j) => k.append(j), - _ => panic!(), - }, - Self::I32(k) => match src { - Self::I32(j) => k.append(j), - _ => panic!(), - }, - Self::F32(k) => match src { - Self::F32(j) => k.append(j), - _ => panic!(), - }, - Self::F64(k) => match src { - Self::F64(j) => k.append(j), - _ => panic!(), - }, - } + } + }) } } impl PushableIndex for SingleBinWaveEvents { fn push_index(&mut self, src: &Self, ix: usize) { - match self { - Self::I8(k) => match src { - Self::I8(j) => k.push_index(j, ix), + items_proc::tycases1!(self, Self, (k), { + match src { + Self::$id(j) => k.push_index(j, ix), _ => panic!(), - }, - Self::I16(k) => match src { - Self::I16(j) => k.push_index(j, ix), - _ => panic!(), - }, - Self::I32(k) => match src { - Self::I32(j) => k.push_index(j, ix), - _ => panic!(), - }, - Self::F32(k) => match src { - Self::F32(j) => k.push_index(j, ix), - _ => panic!(), - }, - Self::F64(k) => match src { - Self::F64(j) => k.push_index(j, ix), - _ => panic!(), - }, - } + } + }) } } impl WithLen for SingleBinWaveEvents { fn len(&self) -> usize { - use SingleBinWaveEvents::*; - match self { - I8(j) => j.len(), - I16(j) => j.len(), - I32(j) => j.len(), - F32(j) => j.len(), - F64(j) => j.len(), - } + items_proc::tycases1!(self, Self, (k), { k.len() }) } } impl WithTimestamps for SingleBinWaveEvents { fn ts(&self, ix: usize) -> u64 { - use SingleBinWaveEvents::*; - match self { - I8(j) => j.ts(ix), - I16(j) => j.ts(ix), - I32(j) => j.ts(ix), - F32(j) => j.ts(ix), - F64(j) => j.ts(ix), - } + items_proc::tycases1!(self, Self, (k), { k.ts(ix) }) } } impl HasShape for SingleBinWaveEvents { fn shape(&self) -> Shape { - use SingleBinWaveEvents::*; - match self { - I8(_) => Shape::Scalar, - I16(_) => Shape::Scalar, - I32(_) => Shape::Scalar, - F32(_) => Shape::Scalar, - F64(_) => Shape::Scalar, - } + Shape::Scalar } } impl HasScalarType for SingleBinWaveEvents { fn scalar_type(&self) -> ScalarType { - use SingleBinWaveEvents::*; - match self { - I8(_) => ScalarType::I8, - I16(_) => ScalarType::I16, - I32(_) => ScalarType::I32, - F32(_) => ScalarType::F32, - F64(_) => ScalarType::F64, - } + items_proc::tycases1!(self, Self, (k), { ScalarType::$id }) } } #[derive(Debug, Serialize, Deserialize)] pub enum MultiBinWaveEvents { + U8(XBinnedWaveEvents), + U16(XBinnedWaveEvents), + U32(XBinnedWaveEvents), + U64(XBinnedWaveEvents), I8(XBinnedWaveEvents), I16(XBinnedWaveEvents), I32(XBinnedWaveEvents), + I64(XBinnedWaveEvents), F32(XBinnedWaveEvents), F64(XBinnedWaveEvents), } impl MultiBinWaveEvents { pub fn variant_name(&self) -> String { - use MultiBinWaveEvents::*; - match self { - I8(_) => format!("I8"), - I16(_) => format!("I16"), - I32(_) => format!("I32"), - F32(_) => format!("F32"), - F64(_) => format!("F64"), - } + items_proc::tycases1!(self, Self, (k), { "$id".into() }) } fn x_aggregate(self, ak: &AggKind) -> EventsItem { - use MultiBinWaveEvents::*; - match self { - I8(k) => match ak { + items_proc::tycases1!(self, Self, (k), { + match ak { AggKind::EventBlobs => panic!(), - AggKind::Plain => EventsItem::XBinnedEvents(XBinnedEvents::MultiBinWave(MultiBinWaveEvents::I8(k))), + AggKind::Plain => EventsItem::XBinnedEvents(XBinnedEvents::MultiBinWave(MultiBinWaveEvents::$id(k))), AggKind::TimeWeightedScalar => err::todoval(), AggKind::DimXBins1 => err::todoval(), AggKind::DimXBinsN(_) => EventsItem::Plain(PlainEvents::Wave(err::todoval())), - }, - _ => err::todoval(), - } + } + }) } } impl Clearable for MultiBinWaveEvents { fn clear(&mut self) { - match self { - MultiBinWaveEvents::I8(k) => k.clear(), - MultiBinWaveEvents::I16(k) => k.clear(), - MultiBinWaveEvents::I32(k) => k.clear(), - MultiBinWaveEvents::F32(k) => k.clear(), - MultiBinWaveEvents::F64(k) => k.clear(), - } + items_proc::tycases1!(self, Self, (k), { k.clear() }) } } impl Appendable for MultiBinWaveEvents { fn empty_like_self(&self) -> Self { - match self { - Self::I8(k) => Self::I8(k.empty_like_self()), - Self::I16(k) => Self::I16(k.empty_like_self()), - Self::I32(k) => Self::I32(k.empty_like_self()), - Self::F32(k) => Self::F32(k.empty_like_self()), - Self::F64(k) => Self::F64(k.empty_like_self()), - } + items_proc::tycases1!(self, Self, (k), { Self::$id(k.empty_like_self()) }) } fn append(&mut self, src: &Self) { - match self { - Self::I8(k) => match src { - Self::I8(j) => k.append(j), + items_proc::tycases1!(self, Self, (k), { + match src { + Self::$id(j) => k.append(j), _ => panic!(), - }, - Self::I16(k) => match src { - Self::I16(j) => k.append(j), - _ => panic!(), - }, - Self::I32(k) => match src { - Self::I32(j) => k.append(j), - _ => panic!(), - }, - Self::F32(k) => match src { - Self::F32(j) => k.append(j), - _ => panic!(), - }, - Self::F64(k) => match src { - Self::F64(j) => k.append(j), - _ => panic!(), - }, - } + } + }) } } impl PushableIndex for MultiBinWaveEvents { fn push_index(&mut self, src: &Self, ix: usize) { - match self { - Self::I8(k) => match src { - Self::I8(j) => k.push_index(j, ix), + items_proc::tycases1!(self, Self, (k), { + match src { + Self::$id(j) => k.push_index(j, ix), _ => panic!(), - }, - Self::I16(k) => match src { - Self::I16(j) => k.push_index(j, ix), - _ => panic!(), - }, - Self::I32(k) => match src { - Self::I32(j) => k.push_index(j, ix), - _ => panic!(), - }, - Self::F32(k) => match src { - Self::F32(j) => k.push_index(j, ix), - _ => panic!(), - }, - Self::F64(k) => match src { - Self::F64(j) => k.push_index(j, ix), - _ => panic!(), - }, - } + } + }) } } impl WithLen for MultiBinWaveEvents { fn len(&self) -> usize { - use MultiBinWaveEvents::*; - match self { - I8(j) => j.len(), - I16(j) => j.len(), - I32(j) => j.len(), - F32(j) => j.len(), - F64(j) => j.len(), - } + items_proc::tycases1!(self, Self, (k), { k.len() }) } } impl WithTimestamps for MultiBinWaveEvents { fn ts(&self, ix: usize) -> u64 { - use MultiBinWaveEvents::*; - match self { - I8(j) => j.ts(ix), - I16(j) => j.ts(ix), - I32(j) => j.ts(ix), - F32(j) => j.ts(ix), - F64(j) => j.ts(ix), - } + items_proc::tycases1!(self, Self, (k), { k.ts(ix) }) } } impl HasShape for MultiBinWaveEvents { fn shape(&self) -> Shape { - use MultiBinWaveEvents::*; - match self { - I8(_) => Shape::Scalar, - I16(_) => Shape::Scalar, - I32(_) => Shape::Scalar, - F32(_) => Shape::Scalar, - F64(_) => Shape::Scalar, - } + Shape::Scalar } } impl HasScalarType for MultiBinWaveEvents { fn scalar_type(&self) -> ScalarType { - use MultiBinWaveEvents::*; - match self { - I8(_) => ScalarType::I8, - I16(_) => ScalarType::I16, - I32(_) => ScalarType::I32, - F32(_) => ScalarType::F32, - F64(_) => ScalarType::F64, - } + items_proc::tycases1!(self, Self, (k), { ScalarType::$id }) } } diff --git a/items/src/eventsitem.rs b/items/src/eventsitem.rs index 10c9225..7212e48 100644 --- a/items/src/eventsitem.rs +++ b/items/src/eventsitem.rs @@ -1,5 +1,5 @@ use crate::binnedevents::XBinnedEvents; -use crate::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents}; +use crate::plainevents::PlainEvents; use crate::{Appendable, Clearable, PushableIndex, SitemtyFrameType, WithLen, WithTimestamps}; use netpod::{AggKind, HasScalarType, HasShape, ScalarType, Shape}; use serde::{Deserialize, Serialize}; @@ -46,29 +46,7 @@ impl EventsItem { } pub fn type_info(&self) -> (ScalarType, Shape) { - match self { - EventsItem::Plain(k) => match k { - PlainEvents::Scalar(k) => match k { - ScalarPlainEvents::U32(_) => (ScalarType::U32, Shape::Scalar), - ScalarPlainEvents::I8(_) => (ScalarType::I8, Shape::Scalar), - ScalarPlainEvents::I16(_) => (ScalarType::I16, Shape::Scalar), - ScalarPlainEvents::I32(_) => (ScalarType::I32, Shape::Scalar), - ScalarPlainEvents::F32(_) => (ScalarType::F32, Shape::Scalar), - ScalarPlainEvents::F64(_) => (ScalarType::F64, Shape::Scalar), - }, - PlainEvents::Wave(k) => match k { - // TODO - // Inherent issue for the non-static-type backends: - // there is a chance that we can't determine the shape here. - WavePlainEvents::I8(k) => (ScalarType::I8, k.shape().unwrap()), - WavePlainEvents::I16(k) => (ScalarType::I16, k.shape().unwrap()), - WavePlainEvents::I32(k) => (ScalarType::I32, k.shape().unwrap()), - WavePlainEvents::F32(k) => (ScalarType::F32, k.shape().unwrap()), - WavePlainEvents::F64(k) => (ScalarType::F64, k.shape().unwrap()), - }, - }, - EventsItem::XBinnedEvents(_k) => panic!(), - } + (self.scalar_type(), self.shape()) } } diff --git a/items/src/plainevents.rs b/items/src/plainevents.rs index 8a67322..cfe8b68 100644 --- a/items/src/plainevents.rs +++ b/items/src/plainevents.rs @@ -2,363 +2,190 @@ use crate::binnedevents::{SingleBinWaveEvents, XBinnedEvents}; use crate::eventsitem::EventsItem; use crate::eventvalues::EventValues; use crate::waveevents::{WaveEvents, WaveXBinner}; +use crate::xbinnedscalarevents::XBinnedScalarEvents; use crate::{Appendable, Clearable, EventsNodeProcessor, PushableIndex, WithLen, WithTimestamps}; use err::Error; use netpod::{AggKind, HasScalarType, HasShape, ScalarType, Shape}; use serde::{Deserialize, Serialize}; +//items_proc::enumvars!(ScalarPlainEvents, EventValues); + #[derive(Debug, Serialize, Deserialize)] pub enum ScalarPlainEvents { + U8(EventValues), + U16(EventValues), U32(EventValues), + U64(EventValues), I8(EventValues), I16(EventValues), I32(EventValues), + I64(EventValues), F32(EventValues), F64(EventValues), } impl ScalarPlainEvents { pub fn variant_name(&self) -> String { - use ScalarPlainEvents::*; - match self { - U32(_) => format!("U32"), - I8(_) => format!("I8"), - I16(_) => format!("I16"), - I32(_) => format!("I32"), - F32(_) => format!("F32"), - F64(_) => format!("F64"), - } + items_proc::tycases1!(self, Self, (k), { "$id".into() }) } } impl Clearable for ScalarPlainEvents { fn clear(&mut self) { - match self { - ScalarPlainEvents::U32(k) => k.clear(), - ScalarPlainEvents::I8(k) => k.clear(), - ScalarPlainEvents::I16(k) => k.clear(), - ScalarPlainEvents::I32(k) => k.clear(), - ScalarPlainEvents::F32(k) => k.clear(), - ScalarPlainEvents::F64(k) => k.clear(), - } + items_proc::tycases1!(self, Self, (k), { k.clear() }) } } impl Appendable for ScalarPlainEvents { fn empty_like_self(&self) -> Self { - match self { - Self::U32(k) => Self::U32(k.empty_like_self()), - Self::I8(k) => Self::I8(k.empty_like_self()), - Self::I16(k) => Self::I16(k.empty_like_self()), - Self::I32(k) => Self::I32(k.empty_like_self()), - Self::F32(k) => Self::F32(k.empty_like_self()), - Self::F64(k) => Self::F64(k.empty_like_self()), - } + items_proc::tycases1!(self, Self, (k), { Self::$id(k.empty_like_self()) }) } fn append(&mut self, src: &Self) { - match self { - Self::U32(k) => match src { - Self::U32(j) => k.append(j), + items_proc::tycases1!(self, Self, (k), { + match src { + Self::$id(j) => k.append(j), _ => panic!(), - }, - Self::I8(k) => match src { - Self::I8(j) => k.append(j), - _ => panic!(), - }, - Self::I16(k) => match src { - Self::I16(j) => k.append(j), - _ => panic!(), - }, - Self::I32(k) => match src { - Self::I32(j) => k.append(j), - _ => panic!(), - }, - Self::F32(k) => match src { - Self::F32(j) => k.append(j), - _ => panic!(), - }, - Self::F64(k) => match src { - Self::F64(j) => k.append(j), - _ => panic!(), - }, - } + } + }) } } impl PushableIndex for ScalarPlainEvents { fn push_index(&mut self, src: &Self, ix: usize) { - match self { - Self::U32(k) => match src { - Self::U32(j) => k.push_index(j, ix), + items_proc::tycases1!(self, Self, (k), { + match src { + Self::$id(j) => k.push_index(j, ix), _ => panic!(), - }, - Self::I8(k) => match src { - Self::I8(j) => k.push_index(j, ix), - _ => panic!(), - }, - Self::I16(k) => match src { - Self::I16(j) => k.push_index(j, ix), - _ => panic!(), - }, - Self::I32(k) => match src { - Self::I32(j) => k.push_index(j, ix), - _ => panic!(), - }, - Self::F32(k) => match src { - Self::F32(j) => k.push_index(j, ix), - _ => panic!(), - }, - Self::F64(k) => match src { - Self::F64(j) => k.push_index(j, ix), - _ => panic!(), - }, - } + } + }) } } impl WithLen for ScalarPlainEvents { fn len(&self) -> usize { - use ScalarPlainEvents::*; - match self { - U32(j) => j.len(), - I8(j) => j.len(), - I16(j) => j.len(), - I32(j) => j.len(), - F32(j) => j.len(), - F64(j) => j.len(), - } + items_proc::tycases1!(self, Self, (k), { k.len() }) } } impl WithTimestamps for ScalarPlainEvents { fn ts(&self, ix: usize) -> u64 { - use ScalarPlainEvents::*; - match self { - U32(j) => j.ts(ix), - I8(j) => j.ts(ix), - I16(j) => j.ts(ix), - I32(j) => j.ts(ix), - F32(j) => j.ts(ix), - F64(j) => j.ts(ix), - } + items_proc::tycases1!(self, Self, (k), { k.ts(ix) }) } } impl HasShape for ScalarPlainEvents { fn shape(&self) -> Shape { - match self { - _ => Shape::Scalar, - } + Shape::Scalar } } impl HasScalarType for ScalarPlainEvents { fn scalar_type(&self) -> ScalarType { - use ScalarPlainEvents::*; - match self { - U32(_) => ScalarType::U32, - I8(_) => ScalarType::I8, - I16(_) => ScalarType::I16, - I32(_) => ScalarType::I32, - F32(_) => ScalarType::F32, - F64(_) => ScalarType::F64, - } + items_proc::tycases1!(self, Self, (k), { ScalarType::$id }) } } +//items_proc::enumvars!(WavePlainEvents, WaveEvents); + #[derive(Debug, Serialize, Deserialize)] pub enum WavePlainEvents { + U8(WaveEvents), + U16(WaveEvents), + U32(WaveEvents), + U64(WaveEvents), I8(WaveEvents), I16(WaveEvents), I32(WaveEvents), + I64(WaveEvents), F32(WaveEvents), F64(WaveEvents), } impl WavePlainEvents { pub fn shape(&self) -> Result { - match self { - WavePlainEvents::I8(k) => k.shape(), - WavePlainEvents::I16(k) => k.shape(), - WavePlainEvents::I32(k) => k.shape(), - WavePlainEvents::F32(k) => k.shape(), - WavePlainEvents::F64(k) => k.shape(), - } + items_proc::tycases1!(self, Self, (k), { k.shape() }) } } -macro_rules! wagg1 { - ($k:expr, $ak:expr, $shape:expr, $sty:ident) => { - match $ak { - AggKind::EventBlobs => panic!(), - AggKind::Plain => EventsItem::Plain(PlainEvents::Wave(WavePlainEvents::$sty($k))), - AggKind::TimeWeightedScalar => { - let p = WaveXBinner::create($shape, $ak.clone()); - let j = p.process($k); - EventsItem::XBinnedEvents(XBinnedEvents::SingleBinWave(SingleBinWaveEvents::$sty(j))) - } - AggKind::DimXBins1 => { - let p = WaveXBinner::create($shape, $ak.clone()); - let j = p.process($k); - EventsItem::XBinnedEvents(XBinnedEvents::SingleBinWave(SingleBinWaveEvents::$sty(j))) - } - AggKind::DimXBinsN(_) => EventsItem::Plain(PlainEvents::Wave(err::todoval())), - } - }; -} - impl WavePlainEvents { pub fn variant_name(&self) -> String { - use WavePlainEvents::*; - match self { - I8(h) => format!("I8({})", h.vals.first().map_or(0, |j| j.len())), - I16(h) => format!("I16({})", h.vals.first().map_or(0, |j| j.len())), - I32(h) => format!("I32({})", h.vals.first().map_or(0, |j| j.len())), - F32(h) => format!("F32({})", h.vals.first().map_or(0, |j| j.len())), - F64(h) => format!("F64({})", h.vals.first().map_or(0, |j| j.len())), - } + items_proc::tycases1!(self, Self, (k), { + format!("$id({})", k.vals.first().map_or(0, |j| j.len())) + }) } fn x_aggregate(self, ak: &AggKind) -> EventsItem { - use WavePlainEvents::*; let shape = self.shape().unwrap(); - match self { - I8(k) => wagg1!(k, ak, shape, I8), - I16(k) => wagg1!(k, ak, shape, I16), - I32(k) => wagg1!(k, ak, shape, I32), - F32(k) => wagg1!(k, ak, shape, F32), - F64(k) => wagg1!(k, ak, shape, F64), - } + items_proc::tycases1!(self, Self, (k), { + match ak { + AggKind::EventBlobs => panic!(), + AggKind::Plain => EventsItem::Plain(PlainEvents::Wave(WavePlainEvents::$id(k))), + AggKind::TimeWeightedScalar => { + let p = WaveXBinner::<$ty>::create(shape, ak.clone()); + let j: XBinnedScalarEvents<$ty> = p.process(k); + EventsItem::XBinnedEvents(XBinnedEvents::SingleBinWave(SingleBinWaveEvents::$id(j))) + } + AggKind::DimXBins1 => { + let p = WaveXBinner::<$ty>::create(shape, ak.clone()); + let j: XBinnedScalarEvents<$ty> = p.process(k); + EventsItem::XBinnedEvents(XBinnedEvents::SingleBinWave(SingleBinWaveEvents::$id(j))) + } + AggKind::DimXBinsN(_) => EventsItem::Plain(PlainEvents::Wave(err::todoval())), + } + }) } } impl Clearable for WavePlainEvents { fn clear(&mut self) { - match self { - WavePlainEvents::I8(k) => k.clear(), - WavePlainEvents::I16(k) => k.clear(), - WavePlainEvents::I32(k) => k.clear(), - WavePlainEvents::F32(k) => k.clear(), - WavePlainEvents::F64(k) => k.clear(), - } + items_proc::tycases1!(self, Self, (k), { k.clear() }) } } impl Appendable for WavePlainEvents { fn empty_like_self(&self) -> Self { - match self { - Self::I8(k) => Self::I8(k.empty_like_self()), - Self::I16(k) => Self::I16(k.empty_like_self()), - Self::I32(k) => Self::I32(k.empty_like_self()), - Self::F32(k) => Self::F32(k.empty_like_self()), - Self::F64(k) => Self::F64(k.empty_like_self()), - } + items_proc::tycases1!(self, Self, (k), { Self::$id(k.empty_like_self()) }) } fn append(&mut self, src: &Self) { - match self { - Self::I8(k) => match src { - Self::I8(j) => k.append(j), - _ => panic!(), - }, - Self::I16(k) => match src { - Self::I16(j) => k.append(j), - _ => panic!(), - }, - Self::I32(k) => match src { - Self::I32(j) => k.append(j), - _ => panic!(), - }, - Self::F32(k) => match src { - Self::F32(j) => k.append(j), - _ => panic!(), - }, - Self::F64(k) => match src { - Self::F64(j) => k.append(j), - _ => panic!(), - }, - } + items_proc::tycases1!(self, Self, (k), { match src { + Self::$id(j) => k.append(j), + _ => panic!(), + } }) } } impl PushableIndex for WavePlainEvents { fn push_index(&mut self, src: &Self, ix: usize) { - match self { - Self::I8(k) => match src { - Self::I8(j) => k.push_index(j, ix), - _ => panic!(), - }, - Self::I16(k) => match src { - Self::I16(j) => k.push_index(j, ix), - _ => panic!(), - }, - Self::I32(k) => match src { - Self::I32(j) => k.push_index(j, ix), - _ => panic!(), - }, - Self::F32(k) => match src { - Self::F32(j) => k.push_index(j, ix), - _ => panic!(), - }, - Self::F64(k) => match src { - Self::F64(j) => k.push_index(j, ix), - _ => panic!(), - }, - } + items_proc::tycases1!(self, Self, (k), { match src { + Self::$id(j) => k.push_index(j, ix), + _ => panic!(), + } }) } } impl WithLen for WavePlainEvents { fn len(&self) -> usize { - use WavePlainEvents::*; - match self { - I8(j) => j.len(), - I16(j) => j.len(), - I32(j) => j.len(), - F32(j) => j.len(), - F64(j) => j.len(), - } + items_proc::tycases1!(self, Self, (k), { k.len() }) } } impl WithTimestamps for WavePlainEvents { fn ts(&self, ix: usize) -> u64 { - use WavePlainEvents::*; - match self { - I8(j) => j.ts(ix), - I16(j) => j.ts(ix), - I32(j) => j.ts(ix), - F32(j) => j.ts(ix), - F64(j) => j.ts(ix), - } + items_proc::tycases1!(self, Self, (k), { k.ts(ix) }) } } impl HasShape for WavePlainEvents { fn shape(&self) -> Shape { - /*use WavePlainEvents::*; - match self { - Byte(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)), - I16(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)), - I32(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)), - Float(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)), - Double(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)), - }*/ self.shape().unwrap() } } impl HasScalarType for WavePlainEvents { fn scalar_type(&self) -> ScalarType { - use WavePlainEvents::*; - match self { - I8(_) => ScalarType::I8, - I16(_) => ScalarType::I16, - I32(_) => ScalarType::I32, - F32(_) => ScalarType::F32, - F64(_) => ScalarType::F64, - } + items_proc::tycases1!(self, Self, (k), { ScalarType::$id }) } } diff --git a/items_proc/Cargo.toml b/items_proc/Cargo.toml new file mode 100644 index 0000000..1bfb180 --- /dev/null +++ b/items_proc/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "items_proc" +version = "0.0.1" +authors = ["Dominik Werder "] +edition = "2021" + +[lib] +path = "src/items_proc.rs" +proc-macro = true diff --git a/items_proc/src/items_proc.rs b/items_proc/src/items_proc.rs new file mode 100644 index 0000000..5d57ea3 --- /dev/null +++ b/items_proc/src/items_proc.rs @@ -0,0 +1,81 @@ +use proc_macro::{TokenStream, TokenTree}; + +const TYS: [&str; 10] = ["u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64"]; +const IDS: [&str; 10] = ["U8", "U16", "U32", "U64", "I8", "I16", "I32", "I64", "F32", "F64"]; + +#[proc_macro] +pub fn make_answer(_item: TokenStream) -> TokenStream { + "fn answer() -> u32 { 42 }".parse().unwrap() +} + +#[proc_macro] +pub fn tycases1(ts: TokenStream) -> TokenStream { + for tt in ts.clone() { + match tt { + TokenTree::Group(..) => (), + TokenTree::Ident(..) => (), + TokenTree::Punct(..) => (), + TokenTree::Literal(..) => (), + } + } + let tokens: Vec<_> = ts.clone().into_iter().collect(); + let match_val = if let TokenTree::Ident(x) = tokens[0].clone() { + //panic!("GOT {}", x.to_string()); + x.to_string() + } else { + panic!("match_val") + }; + let enum_1_pre = if let TokenTree::Ident(x) = tokens[2].clone() { + //panic!("GOT {}", x.to_string()); + x.to_string() + } else { + panic!("enum_1_pre") + }; + let enum_1_suff = tokens[4].to_string(); + let rhs = if let TokenTree::Group(x) = tokens[6].clone() { + //panic!("GOT {}", x.to_string()); + x.to_string() + } else { + panic!("RHS mismatch {:?}", tokens[6]) + }; + //panic!("{:?}", tokens[0]); + let tys = ["u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64"]; + let ids = ["U8", "U16", "U32", "U64", "I8", "I16", "I32", "I64", "F32", "F64"]; + let mut arms = vec![]; + for (id, ty) in ids.iter().zip(&tys) { + let rhs = rhs.replace("$id", id); + let rhs = rhs.replace("$ty", ty); + let s = format!("{}::{}{} => {},", enum_1_pre, id, enum_1_suff, rhs); + arms.push(s); + } + let gen = format!("match {} {{\n{}\n}}", match_val, arms.join("\n")); + //panic!("GENERATED: {}", gen); + gen.parse().unwrap() +} + +#[proc_macro] +pub fn enumvars(ts: TokenStream) -> TokenStream { + let tokens: Vec<_> = ts.clone().into_iter().collect(); + let name = if let TokenTree::Ident(x) = tokens[0].clone() { + x.to_string() + } else { + panic!("name") + }; + let rhe = if let TokenTree::Ident(x) = tokens[2].clone() { + x.to_string() + } else { + panic!("rhe") + }; + let mut cases = vec![]; + for (id, ty) in IDS.iter().zip(&TYS) { + let s = format!("{}({}<{}>),", id, rhe, ty); + cases.push(s); + } + let gen = format!( + "#[derive(Debug, Serialize, Deserialize)]\npub enum {} {{\n{}\n}}\n", + name, + cases.join("\n") + ); + //panic!("GENERATED: {}", gen); + gen.parse().unwrap() +}