From 7053af83b7a9f36ed3ceee7dbfc1fd8aa86aeebe Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 15 Dec 2021 22:54:42 +0100 Subject: [PATCH] Refactor for easier channel conversion tool --- archapp/src/archeng/datablock.rs | 16 +- archapp/src/archeng/pipe.rs | 40 +-- archapp/src/events.rs | 12 +- archapp/src/parse.rs | 25 +- disk/src/decode.rs | 208 ++++++++++++- disk/src/eventchunker.rs | 8 + dq/src/bin/daq-convert.rs | 53 ++++ dq/src/bin/dq.rs | 481 ++-------------------------- dq/src/dq.rs | 519 +++++++++++++++++++++++++++++++ items/src/binnedevents.rs | 248 +++++++-------- items/src/eventsitem.rs | 21 +- items/src/plainevents.rs | 259 +++++++-------- 12 files changed, 1126 insertions(+), 764 deletions(-) create mode 100644 dq/src/bin/daq-convert.rs diff --git a/archapp/src/archeng/datablock.rs b/archapp/src/archeng/datablock.rs index c13ab43..b1aa6bc 100644 --- a/archapp/src/archeng/datablock.rs +++ b/archapp/src/archeng/datablock.rs @@ -451,21 +451,21 @@ pub async fn read_data2( return Err(Error::with_msg_no_trace(format!("unexpected dbrcount {}", dbrcount))); } let res = match &dbrt { - DbrType::DbrTimeChar => read_msg!(i8, ex_s_i8, ex_v_i8, Byte, rb, msg_len, numsamples, dbrt, dbrcount), - DbrType::DbrTimeShort => read_msg!(i16, ex_s_i16, ex_v_i16, Short, rb, msg_len, numsamples, dbrt, dbrcount), - DbrType::DbrTimeLong => read_msg!(i32, ex_s_i32, ex_v_i32, Int, rb, msg_len, numsamples, dbrt, dbrcount), - DbrType::DbrTimeFloat => read_msg!(f32, ex_s_f32, ex_v_f32, Float, rb, msg_len, numsamples, dbrt, dbrcount), - DbrType::DbrTimeDouble => read_msg!(f64, ex_s_f64, ex_v_f64, Double, rb, msg_len, numsamples, dbrt, dbrcount), + DbrType::DbrTimeChar => read_msg!(i8, ex_s_i8, ex_v_i8, I8, rb, msg_len, numsamples, dbrt, dbrcount), + DbrType::DbrTimeShort => read_msg!(i16, ex_s_i16, ex_v_i16, I16, rb, msg_len, numsamples, dbrt, dbrcount), + DbrType::DbrTimeLong => read_msg!(i32, ex_s_i32, ex_v_i32, I32, rb, msg_len, numsamples, dbrt, dbrcount), + DbrType::DbrTimeFloat => read_msg!(f32, ex_s_f32, ex_v_f32, F32, rb, msg_len, numsamples, dbrt, dbrcount), + DbrType::DbrTimeDouble => read_msg!(f64, ex_s_f64, ex_v_f64, F64, rb, msg_len, numsamples, dbrt, dbrcount), DbrType::DbrTimeString => { if dbrcount == 1 { // TODO - let evs = ScalarPlainEvents::Byte(EventValues::empty()); + let evs = ScalarPlainEvents::I8(EventValues::empty()); let plain = PlainEvents::Scalar(evs); let item = EventsItem::Plain(plain); item } else { // TODO - let evs = WavePlainEvents::Double(WaveEvents::empty()); + let evs = WavePlainEvents::F64(WaveEvents::empty()); let plain = PlainEvents::Wave(evs); let item = EventsItem::Plain(plain); item @@ -526,7 +526,7 @@ pub async fn read_data_1( } } debug!("parsed block with {} / {} events", ntot, evs.tss.len()); - let evs = ScalarPlainEvents::Double(evs); + let evs = ScalarPlainEvents::F64(evs); let plain = PlainEvents::Scalar(evs); let item = EventsItem::Plain(plain); item diff --git a/archapp/src/archeng/pipe.rs b/archapp/src/archeng/pipe.rs index b41cca9..a3c4c39 100644 --- a/archapp/src/archeng/pipe.rs +++ b/archapp/src/archeng/pipe.rs @@ -105,47 +105,47 @@ pub async fn make_event_pipe( PlainEvents::Wave(j) => { trace!("EventsItem::Plain Wave for {:?} {:?}", cfgshape, q_agg_kind); match j { - WavePlainEvents::Byte(j) => { + WavePlainEvents::I8(j) => { let binner = WaveXBinner::::create(cfgshape.clone(), q_agg_kind.clone()); let out = binner.process(j); - let item = SingleBinWaveEvents::Byte(out); + let item = SingleBinWaveEvents::I8(out); let item = XBinnedEvents::SingleBinWave(item); let item = EventsItem::XBinnedEvents(item); Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) } - WavePlainEvents::Short(j) => { + WavePlainEvents::I16(j) => { let binner = WaveXBinner::::create(cfgshape.clone(), q_agg_kind.clone()); let out = binner.process(j); - let item = SingleBinWaveEvents::Short(out); + let item = SingleBinWaveEvents::I16(out); let item = XBinnedEvents::SingleBinWave(item); let item = EventsItem::XBinnedEvents(item); Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) } - WavePlainEvents::Int(j) => { + WavePlainEvents::I32(j) => { let binner = WaveXBinner::::create(cfgshape.clone(), q_agg_kind.clone()); let out = binner.process(j); - let item = SingleBinWaveEvents::Int(out); + let item = SingleBinWaveEvents::I32(out); let item = XBinnedEvents::SingleBinWave(item); let item = EventsItem::XBinnedEvents(item); Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) } - WavePlainEvents::Float(j) => { + WavePlainEvents::F32(j) => { let binner = WaveXBinner::::create(cfgshape.clone(), q_agg_kind.clone()); let out = binner.process(j); - let item = SingleBinWaveEvents::Float(out); + let item = SingleBinWaveEvents::F32(out); let item = XBinnedEvents::SingleBinWave(item); let item = EventsItem::XBinnedEvents(item); Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) } - WavePlainEvents::Double(j) => { + WavePlainEvents::F64(j) => { let binner = WaveXBinner::::create(cfgshape.clone(), q_agg_kind.clone()); let out = binner.process(j); - let item = SingleBinWaveEvents::Double(out); + let item = SingleBinWaveEvents::F64(out); let item = XBinnedEvents::SingleBinWave(item); let item = EventsItem::XBinnedEvents(item); Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) @@ -193,47 +193,47 @@ pub async fn make_event_pipe( PlainEvents::Wave(j) => { trace!("EventsItem::Plain Wave for {:?} {:?}", cfgshape, q_agg_kind); match j { - WavePlainEvents::Byte(j) => { + WavePlainEvents::I8(j) => { let binner = WaveNBinner::::create(cfgshape.clone(), q_agg_kind.clone()); let out = binner.process(j); - let item = MultiBinWaveEvents::Byte(out); + let item = MultiBinWaveEvents::I8(out); let item = XBinnedEvents::MultiBinWave(item); let item = EventsItem::XBinnedEvents(item); Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) } - WavePlainEvents::Short(j) => { + WavePlainEvents::I16(j) => { let binner = WaveNBinner::::create(cfgshape.clone(), q_agg_kind.clone()); let out = binner.process(j); - let item = MultiBinWaveEvents::Short(out); + let item = MultiBinWaveEvents::I16(out); let item = XBinnedEvents::MultiBinWave(item); let item = EventsItem::XBinnedEvents(item); Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) } - WavePlainEvents::Int(j) => { + WavePlainEvents::I32(j) => { let binner = WaveNBinner::::create(cfgshape.clone(), q_agg_kind.clone()); let out = binner.process(j); - let item = MultiBinWaveEvents::Int(out); + let item = MultiBinWaveEvents::I32(out); let item = XBinnedEvents::MultiBinWave(item); let item = EventsItem::XBinnedEvents(item); Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) } - WavePlainEvents::Float(j) => { + WavePlainEvents::F32(j) => { let binner = WaveNBinner::::create(cfgshape.clone(), q_agg_kind.clone()); let out = binner.process(j); - let item = MultiBinWaveEvents::Float(out); + let item = MultiBinWaveEvents::F32(out); let item = XBinnedEvents::MultiBinWave(item); let item = EventsItem::XBinnedEvents(item); Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) } - WavePlainEvents::Double(j) => { + WavePlainEvents::F64(j) => { let binner = WaveNBinner::::create(cfgshape.clone(), q_agg_kind.clone()); let out = binner.process(j); - let item = MultiBinWaveEvents::Double(out); + let item = MultiBinWaveEvents::F64(out); let item = XBinnedEvents::MultiBinWave(item); let item = EventsItem::XBinnedEvents(item); Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) diff --git a/archapp/src/events.rs b/archapp/src/events.rs index 1bb519c..7eed341 100644 --- a/archapp/src/events.rs +++ b/archapp/src/events.rs @@ -290,11 +290,11 @@ impl FrameMakerTrait for FrameMaker { let shape = &self.shape; let agg_kind = &self.agg_kind; match scalar_type { - ScalarType::I8 => arm1!(item, i8, Byte, shape, agg_kind), - ScalarType::I16 => arm1!(item, i16, Short, shape, agg_kind), - ScalarType::I32 => arm1!(item, i32, Int, shape, agg_kind), - ScalarType::F32 => arm1!(item, f32, Float, shape, agg_kind), - ScalarType::F64 => arm1!(item, f64, Double, shape, agg_kind), + ScalarType::I8 => arm1!(item, i8, I8, shape, agg_kind), + ScalarType::I16 => arm1!(item, i16, I16, shape, agg_kind), + ScalarType::I32 => arm1!(item, i32, I32, shape, agg_kind), + ScalarType::F32 => arm1!(item, f32, F32, shape, agg_kind), + ScalarType::F64 => arm1!(item, f64, F64, shape, agg_kind), _ => { warn!("TODO for scalar_type {:?}", scalar_type); err::todoval() @@ -625,7 +625,7 @@ async fn linear_search_2( #[allow(unused)] fn events_item_to_framable(ei: EventsItem) -> Result, Error> { match ei { - EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::Int(h))) => { + EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::I32(h))) => { let range: NanoRange = err::todoval(); let (x, y) = h .tss diff --git a/archapp/src/parse.rs b/archapp/src/parse.rs index 3d19d12..bdccc3e 100644 --- a/archapp/src/parse.rs +++ b/archapp/src/parse.rs @@ -50,7 +50,7 @@ fn parse_scalar_byte(m: &[u8], year: u32) -> Result { let v = msg.get_val().first().map_or(0, |k| *k as i8); t.tss.push(ts); t.values.push(v); - Ok(EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::Byte(t)))) + Ok(EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::I8(t)))) } macro_rules! scalar_parse { @@ -65,6 +65,7 @@ macro_rules! scalar_parse { let ts = yd.timestamp() as u64 * 1000000000 + msg.get_secondsintoyear() as u64 * 1000000000 + msg.get_nano() as u64; let v = msg.get_val(); + //eprintln!("ts {} val {}", ts, v); t.tss.push(ts); t.values.push(v as $evty); EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::$eit(t))) @@ -175,37 +176,37 @@ impl PbFileReader { let ei = match payload_type { SCALAR_BYTE => parse_scalar_byte(m, year)?, SCALAR_ENUM => { - scalar_parse!(m, year, ScalarEnum, Int, i32) + scalar_parse!(m, year, ScalarEnum, I32, i32) } SCALAR_SHORT => { - scalar_parse!(m, year, ScalarShort, Short, i16) + scalar_parse!(m, year, ScalarShort, I16, i16) } SCALAR_INT => { - scalar_parse!(m, year, ScalarInt, Int, i32) + scalar_parse!(m, year, ScalarInt, I32, i32) } SCALAR_FLOAT => { - scalar_parse!(m, year, ScalarFloat, Float, f32) + scalar_parse!(m, year, ScalarFloat, F32, f32) } SCALAR_DOUBLE => { - scalar_parse!(m, year, ScalarDouble, Double, f64) + scalar_parse!(m, year, ScalarDouble, F64, f64) } WAVEFORM_BYTE => { - wave_parse!(m, year, VectorChar, Byte, i8) + wave_parse!(m, year, VectorChar, I8, i8) } WAVEFORM_SHORT => { - wave_parse!(m, year, VectorShort, Short, i16) + wave_parse!(m, year, VectorShort, I16, i16) } WAVEFORM_ENUM => { - wave_parse!(m, year, VectorEnum, Int, i32) + wave_parse!(m, year, VectorEnum, I32, i32) } WAVEFORM_INT => { - wave_parse!(m, year, VectorInt, Int, i32) + wave_parse!(m, year, VectorInt, I32, i32) } WAVEFORM_FLOAT => { - wave_parse!(m, year, VectorFloat, Float, f32) + wave_parse!(m, year, VectorFloat, F32, f32) } WAVEFORM_DOUBLE => { - wave_parse!(m, year, VectorDouble, Double, f64) + wave_parse!(m, year, VectorDouble, F64, f64) } SCALAR_STRING | WAVEFORM_STRING | V4_GENERIC_BYTES => { return Err(Error::with_msg_no_trace(format!("not supported: {:?}", payload_type))); diff --git a/disk/src/decode.rs b/disk/src/decode.rs index 4158793..597a436 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -4,10 +4,13 @@ use crate::eventchunker::EventFull; use err::Error; use futures_core::Stream; use futures_util::StreamExt; +use items::eventsitem::EventsItem; use items::eventvalues::EventValues; use items::numops::{BoolNum, NumOps}; +use items::plainevents::{PlainEvents, ScalarPlainEvents}; use items::waveevents::{WaveEvents, WaveNBinner, WavePlainProc, WaveXBinner}; -use items::{Appendable, EventAppendable, EventsNodeProcessor, RangeCompletableItem, StreamItem}; +use items::{Appendable, EventAppendable, EventsNodeProcessor, RangeCompletableItem, Sitemty, StreamItem}; +use netpod::{ScalarType, Shape}; use std::marker::PhantomData; use std::mem::size_of; use std::pin::Pin; @@ -300,3 +303,206 @@ where } } } + +pub struct EventsItemStream { + inp: Pin>>>, + done: bool, + complete: bool, +} + +impl EventsItemStream { + pub fn new(inp: Pin>>>) -> Self { + Self { + inp, + done: false, + complete: false, + } + } + + // TODO need some default expectation about the content type, because real world data does not + // always contain that information per event, or even contains wrong information. + fn decode(&mut self, ev: &EventFull) -> Result, Error> { + // TODO define expected endian from parameters: + let big_endian = false; + // TODO: + let mut tyi = None; + let mut ret = None; + for i1 in 0..ev.tss.len() { + let ts = ev.tss[i1]; + let pulse = ev.pulses[i1]; + // TODO check that dtype, event endianness and event shape match our static + // expectation about the data in this channel. + let _ty = &ev.scalar_types[i1]; + let be = ev.be[i1]; + if be != big_endian { + return Err(Error::with_msg(format!("big endian mismatch {} vs {}", be, big_endian))); + } + // TODO bad, data on disk is inconsistent, can not rely on endian as stated in channel config. + let decomp = ev.decomp(i1); + // If not done yet, infer the actual type from the (undocumented) combinations of channel + // config parameters and values in the event data. + // TODO + match &tyi { + Some(_) => {} + None => { + //let cont = EventValues::::empty(); + tyi = Some((ev.scalar_types[i1].clone(), ev.shapes[i1].clone())); + match &tyi.as_ref().unwrap().1 { + Shape::Scalar => match &tyi.as_ref().unwrap().0 { + ScalarType::U8 => { + // TODO + let cont = EventValues::::empty(); + ret = Some(EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::I8(cont)))); + } + ScalarType::U16 => { + // TODO + let cont = EventValues::::empty(); + ret = Some(EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::I16(cont)))); + } + ScalarType::U32 => { + // TODO + let cont = EventValues::::empty(); + ret = Some(EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::I32(cont)))); + } + ScalarType::U64 => { + // TODO + let cont = EventValues::::empty(); + ret = Some(EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::I32(cont)))); + } + ScalarType::I8 => { + let cont = EventValues::::empty(); + ret = Some(EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::I8(cont)))); + } + ScalarType::I16 => { + let cont = EventValues::::empty(); + ret = Some(EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::I16(cont)))); + } + ScalarType::I32 => { + let cont = EventValues::::empty(); + ret = Some(EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::I32(cont)))); + } + ScalarType::I64 => { + // TODO + let cont = EventValues::::empty(); + ret = Some(EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::I32(cont)))); + } + ScalarType::F32 => { + let cont = EventValues::::empty(); + ret = Some(EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::F32(cont)))); + } + ScalarType::F64 => { + let cont = EventValues::::empty(); + ret = Some(EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::F64(cont)))); + } + ScalarType::BOOL => { + // TODO + let cont = EventValues::::empty(); + ret = Some(EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::I8(cont)))); + } + }, + Shape::Wave(_) => todo!(), + Shape::Image(..) => todo!(), + } + } + }; + // TODO here, I expect that we found the type. + let tyi = tyi.as_ref().unwrap(); + match &tyi.1 { + Shape::Scalar => match &tyi.0 { + ScalarType::U8 => todo!(), + ScalarType::U16 => todo!(), + ScalarType::U32 => todo!(), + ScalarType::U64 => todo!(), + ScalarType::I8 => todo!(), + ScalarType::I16 => todo!(), + ScalarType::I32 => todo!(), + ScalarType::I64 => todo!(), + ScalarType::F32 => todo!(), + ScalarType::F64 => { + let conv = EventValuesDim0Case::::new(); + let val = EventValueFromBytes::<_, LittleEndian>::convert(&conv, decomp, big_endian)?; + match &mut ret { + Some(ret) => match ret { + EventsItem::Plain(ret) => match ret { + PlainEvents::Scalar(ret) => match ret { + ScalarPlainEvents::F64(ret) => { + ret.tss.push(ts); + // TODO + let _ = pulse; + ret.values.push(val); + } + _ => panic!(), + }, + PlainEvents::Wave(_) => panic!(), + }, + EventsItem::XBinnedEvents(_) => todo!(), + }, + None => panic!(), + } + } + ScalarType::BOOL => todo!(), + }, + Shape::Wave(_) => todo!(), + Shape::Image(_, _) => todo!(), + } + //let val = self.evs.convert(decomp, be)?; + //let k = <>::Batch as EventAppendable>::append_event(ret, ev.tss[i1], val); + } + Ok(ret) + } +} + +impl Stream for EventsItemStream { + type Item = Sitemty; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break if self.complete { + panic!("poll_next on complete") + } else if self.done { + self.complete = true; + Ready(None) + } else { + match self.inp.poll_next_unpin(cx) { + Ready(item) => match item { + Some(item) => match item { + Ok(item) => match item { + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + } + RangeCompletableItem::Data(item) => match self.decode(&item) { + Ok(res) => match res { + Some(res) => { + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(res))))) + } + None => { + continue; + } + }, + Err(e) => { + self.done = true; + Ready(Some(Err(e))) + } + }, + }, + StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), + StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), + }, + Err(e) => { + self.done = true; + Ready(Some(Err(e))) + } + }, + None => { + self.done = true; + Ready(None) + } + }, + Pending => Pending, + } + }; + } + } +} diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index a4fe8f6..d7730cc 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -436,6 +436,7 @@ pub struct EventFull { pub pulses: Vec, pub blobs: Vec>, #[serde(serialize_with = "decomps_ser", deserialize_with = "decomps_de")] + // TODO allow access to `decomps` via method which checks first if `blobs` is already the decomp. pub decomps: Vec>, pub scalar_types: Vec, pub be: Vec, @@ -510,6 +511,13 @@ impl EventFull { self.shapes.push(shape); self.comps.push(comp); } + + pub fn decomp(&self, i: usize) -> &[u8] { + match &self.decomps[i] { + Some(decomp) => &decomp, + None => &self.blobs[i], + } + } } impl SitemtyFrameType for EventFull { diff --git a/dq/src/bin/daq-convert.rs b/dq/src/bin/daq-convert.rs new file mode 100644 index 0000000..777427f --- /dev/null +++ b/dq/src/bin/daq-convert.rs @@ -0,0 +1,53 @@ +use clap::{crate_version, Parser}; +use err::Error; +use std::path::PathBuf; + +#[derive(Debug, Parser)] +#[clap(name="DAQ buffer tools", version=crate_version!())] +pub struct Opts { + #[clap(short, long, parse(from_occurrences))] + pub verbose: u32, + #[clap(subcommand)] + pub subcmd: SubCmd, +} + +#[derive(Debug, Parser)] +pub enum SubCmd { + #[clap(about = "Convert a channel from the Archiver Appliance into Databuffer format.")] + ConvertArchiverApplianceChannel(ConvertArchiverApplianceChannel), +} + +#[derive(Debug, Parser)] +pub struct ConvertArchiverApplianceChannel { + #[clap( + long, + about = "Prefix for keyspaces, e.g. specify `daq` to get scalar keyspace directory `daq_2`" + )] + keyspace_prefix: String, + #[clap(long, about = "Name of the channel to convert")] + channel_name: String, + #[clap(long, about = "Look for archiver appliance data at given path")] + input_dir: PathBuf, + #[clap(long, about = "Generate Databuffer format at given path")] + output_dir: PathBuf, +} + +pub fn main() -> Result<(), Error> { + taskrun::run(async { + if false { + return Err(Error::with_msg_no_trace(format!("unknown command"))); + } + let opts = Opts::parse(); + match opts.subcmd { + SubCmd::ConvertArchiverApplianceChannel(sub) => { + let params = dq::ConvertParams { + keyspace_prefix: sub.keyspace_prefix, + channel_name: sub.channel_name, + input_dir: sub.input_dir, + output_dir: sub.output_dir, + }; + dq::convert(params).await + } + } + }) +} diff --git a/dq/src/bin/dq.rs b/dq/src/bin/dq.rs index cd89677..92fa454 100644 --- a/dq/src/bin/dq.rs +++ b/dq/src/bin/dq.rs @@ -1,23 +1,17 @@ -use archapp::events::PositionState; -use archapp::parse::PbFileReader; -use bytes::BufMut; -use chrono::{TimeZone, Utc}; use clap::{crate_version, Parser}; +//use disk::decode::EventValueShape; +//use disk::decode::EventValuesDim0Case; use disk::eventchunker::EventChunkerConf; use err::Error; -use items::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents}; -use netpod::query::RawEventsQuery; -use netpod::{log::*, ByteOrder, ByteSize, ChannelConfig, HasScalarType, HasShape}; -use netpod::{timeunits::*, Shape}; -use netpod::{AggKind, Channel, NanoRange, Nanos, ScalarType}; -use parse::channelconfig::Config; -use std::io::SeekFrom; -use std::mem::take; +use netpod::log::*; +#[allow(unused)] +use netpod::timeunits::*; +use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Shape}; use std::path::PathBuf; use std::sync::atomic::AtomicU64; use std::sync::Arc; -use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; +use tokio::fs::File; +use tokio::io::AsyncReadExt; #[derive(Debug, Parser)] #[clap(name="DAQ buffer tools", version=crate_version!())] @@ -65,264 +59,6 @@ pub struct ReadDatabufferDatafile { datafile: PathBuf, } -trait WritableValue { - fn put_value(&self, buf: &mut Vec); -} - -impl WritableValue for f32 { - fn put_value(&self, buf: &mut Vec) { - buf.put_f32(*self); - } -} - -impl WritableValue for f64 { - fn put_value(&self, buf: &mut Vec) { - buf.put_f64(*self); - } -} - -struct DataWriter { - output_dir: PathBuf, - kspre: String, - channel: Channel, - bs: Nanos, - tb: u64, - datafile: Option, - indexfile: Option, - wpos: u64, - buf1: Vec, -} - -impl DataWriter { - async fn new(output_dir: PathBuf, kspre: String, channel: Channel, bs: Nanos) -> Result { - let ret = Self { - output_dir, - kspre, - channel, - bs, - tb: u64::MAX, - datafile: None, - indexfile: None, - wpos: 0, - buf1: vec![0; 1024 * 1024], - }; - Ok(ret) - } - - async fn write_item(&mut self, item: &PlainEvents) -> Result<(), Error> { - match item { - PlainEvents::Scalar(item) => match item { - ScalarPlainEvents::Float(events) => { - self.write_events(2, ScalarType::F32, &events.tss, &events.values) - .await?; - } - ScalarPlainEvents::Double(events) => { - self.write_events(2, ScalarType::F64, &events.tss, &events.values) - .await?; - } - _ => todo!(), - }, - PlainEvents::Wave(item) => match item { - WavePlainEvents::Double(_events) => { - todo!() - } - _ => todo!(), - }, - } - Ok(()) - } - - async fn write_events( - &mut self, - ks: u32, - scalar_type: ScalarType, - tss: &Vec, - vals: &Vec, - ) -> Result<(), Error> { - let split = 0; - assert_eq!(tss.len(), vals.len()); - for i in 0..tss.len() { - let ts = tss[i]; - let tb = ts / self.bs.ns; - if tb != self.tb { - let tbdate = chrono::Utc.timestamp((tb * (self.bs.ns / SEC)) as i64, 0); - eprintln!("Create directory for timebin {}", tbdate); - let p1 = self.output_dir.join(format!("{}_{}", self.kspre, ks)); - let p2 = p1.join(self.channel.name()); - let p3 = p2.join(format!("{:019}", tb)); - let p4 = p3.join(format!("{:010}", split)); - let p5 = p4.join(format!("{:019}_00000_Data", self.bs.ns / MS)); - let p6 = p4.join(format!("{:019}_00000_Data_Index", self.bs.ns / MS)); - tokio::fs::create_dir_all(&p4).await.map_err(|e| { - error!("Can not create {:?}", p4); - e - })?; - let mut file = OpenOptions::new() - .create(true) - .truncate(true) - .write(true) - .open(&p5) - .await - .map_err(|e| { - error!("can not create new file {:?}", p5); - e - })?; - file.write_all(&0u16.to_be_bytes()).await?; - let chs = self.channel.name().as_bytes(); - let len1 = (chs.len() + 8) as u32; - file.write_all(&len1.to_be_bytes()).await?; - file.write_all(chs).await?; - file.write_all(&len1.to_be_bytes()).await?; - self.wpos = 10 + chs.len() as u64; - self.datafile = Some(file); - if ks == 3 { - let mut file = OpenOptions::new() - .create(true) - .truncate(true) - .write(true) - .open(&p6) - .await - .map_err(|e| { - error!("can not create new file {:?}", p6); - e - })?; - file.write_all(&0u16.to_be_bytes()).await?; - self.indexfile = Some(file); - } - self.tb = tb; - } - let file = self.datafile.as_mut().unwrap(); - let mut buf = take(&mut self.buf1); - buf.clear(); - buf.put_i32(0); - buf.put_u64(0); - buf.put_u64(ts); - buf.put_u64(0); - buf.put_u64(0); - // Status, Severity - buf.put_u8(0); - buf.put_u8(0); - buf.put_i32(-1); - let flags = 0; - buf.put_u8(flags); - buf.put_u8(scalar_type.index()); - vals[i].put_value(&mut buf); - buf.put_i32(0); - let len1 = buf.len(); - buf[0..4].as_mut().put_u32(len1 as u32); - buf[len1 - 4..len1].as_mut().put_u32(len1 as u32); - file.write_all(&buf).await?; - self.buf1 = buf; - if ks == 3 { - let file = self.indexfile.as_mut().unwrap(); - let mut buf = take(&mut self.buf1); - buf.clear(); - buf.put_u64(ts); - buf.put_u64(self.wpos); - file.write_all(&buf).await?; - self.buf1 = buf; - } - self.wpos += len1 as u64; - } - Ok(()) - } - - async fn write_config(&mut self, config: &Config) -> Result<(), Error> { - eprintln!("Create directory for channel config"); - let p1 = self.output_dir.join("config").join(self.channel.name()).join("latest"); - tokio::fs::create_dir_all(&p1).await.map_err(|e| { - error!("Can not create {:?}", p1); - e - })?; - let mut file = OpenOptions::new() - .create(true) - .truncate(true) - .write(true) - .open(p1.join("00000_Config")) - .await - .map_err(|e| { - error!("can not create config file in {:?}", p1); - e - })?; - let mut buf = take(&mut self.buf1); - { - buf.clear(); - buf.put_u16(0); - file.write_all(&buf).await?; - } - { - buf.clear(); - let chs = self.channel.name().as_bytes(); - let len1 = (chs.len() + 8) as u32; - buf.put_u32(len1); - buf.put_slice(chs); - buf.put_u32(len1); - //let len1 = buf.len(); - //buf[0..4].as_mut().put_u32(len1 as u32); - //buf[len1 - 4..len1].as_mut().put_u32(len1 as u32); - file.write_all(&buf).await?; - } - { - let e = &config.entries[0]; - buf.clear(); - buf.put_u32(0); - buf.put_u64(0); - buf.put_u64(0); - buf.put_i32(e.ks); - buf.put_u64(e.bs.ns / MS); - buf.put_i32(e.split_count); - buf.put_i32(e.status); - buf.put_i8(e.bb); - buf.put_i32(e.modulo); - buf.put_i32(e.offset); - buf.put_i16(e.precision); - let dtlen = 0; - buf.put_i32(dtlen); - let flags = 0; - buf.put_u8(flags); - buf.put_u8(e.scalar_type.index()); - if false { - // is shaped? - buf.put_u8(1); - buf.put_u32(16); - } - buf.put_i32(-1); - buf.put_i32(-1); - buf.put_i32(-1); - buf.put_i32(-1); - buf.put_i32(-1); - buf.put_u32(0); - let len1 = buf.len(); - buf[0..4].as_mut().put_u32(len1 as u32); - buf[len1 - 4..len1].as_mut().put_u32(len1 as u32); - file.write_all(&buf).await?; - } - self.buf1 = buf; - Ok(()) - } -} - -impl Drop for DataWriter { - fn drop(&mut self) { - let indexfile = self.indexfile.take(); - let datafile = self.datafile.take(); - tokio::task::spawn(async move { - match indexfile { - Some(mut file) => { - let _ = file.flush().await; - } - None => {} - } - match datafile { - Some(mut file) => { - let _ = file.flush().await; - } - None => {} - } - }); - } -} - pub fn main() -> Result<(), Error> { taskrun::run(async { if false { @@ -331,189 +67,13 @@ pub fn main() -> Result<(), Error> { let opts = Opts::parse(); match opts.subcmd { SubCmd::ConvertArchiverApplianceChannel(sub) => { - let _ = tokio::fs::create_dir(&sub.output_dir).await; - let meta = tokio::fs::metadata(&sub.output_dir).await?; - if !meta.is_dir() { - return Err(Error::from_string(format!( - "Given output path is not a directory: {:?}", - sub.output_dir - ))); - } - let bs = Nanos::from_ns(DAY); - let mut channel_config: Option = None; - let channel = Channel { - backend: String::new(), - name: sub.channel_name.into(), + let params = dq::ConvertParams { + keyspace_prefix: sub.keyspace_prefix, + channel_name: sub.channel_name, + input_dir: sub.input_dir, + output_dir: sub.output_dir, }; - let mut data_writer = - DataWriter::new(sub.output_dir, sub.keyspace_prefix.into(), channel.clone(), bs.clone()).await?; - let chandir = archapp::events::directory_for_channel_files(&channel, &sub.input_dir)?; - eprintln!("Looking for files in: {:?}", chandir); - let files = archapp::events::find_files_for_channel(&sub.input_dir, &channel).await?; - let mut evstot = 0; - for file in files { - eprintln!("Try to open {:?}", file); - let fni = archapp::events::parse_data_filename(file.to_str().unwrap())?; - debug!("fni: {:?}", fni); - let ts0 = Utc.ymd(fni.year as i32, fni.month, 1).and_hms(0, 0, 0); - let ts1 = ts0.timestamp() as u64 * SEC + ts0.timestamp_subsec_nanos() as u64; - let _ = ts1; - let mut f1 = File::open(&file).await?; - let _flen = f1.seek(SeekFrom::End(0)).await?; - f1.seek(SeekFrom::Start(0)).await?; - let pbr = PbFileReader::new(f1).await?; - debug!("channel name in pbr file: {:?}", pbr.channel_name()); - debug!("data type in file: {:?}", pbr.payload_type()); - let evq = RawEventsQuery { - channel: channel.clone(), - range: NanoRange { - beg: u64::MIN, - end: u64::MAX, - }, - agg_kind: AggKind::Plain, - disk_io_buffer_size: 1024 * 4, - do_decompress: true, - }; - let f1 = pbr.into_file(); - // TODO can the positioning-logic maybe re-use the pbr? - let z = archapp::events::position_file_for_evq(f1, evq.clone(), fni.year).await?; - if let PositionState::Positioned(pos) = z.state { - let mut pbr = z.pbr; - assert_eq!(pos, pbr.abspos()); - let mut i1 = 0; - let mut repnext = u64::MAX; - loop { - match pbr.read_msg().await { - Ok(Some(ei)) => { - use items::{WithLen, WithTimestamps}; - let ei = ei.item; - if ei.is_wave() { - eprintln!("ERROR wave channels are not yet fully supported"); - return Ok(()); - } - if ei.len() > 0 { - let scalar_type = ei.scalar_type(); - let shape = match &ei { - items::eventsitem::EventsItem::Plain(k) => match k.shape() { - Shape::Scalar => None, - Shape::Wave(n) => Some(vec![n]), - Shape::Image(..) => panic!(), - }, - items::eventsitem::EventsItem::XBinnedEvents(_) => panic!(), - }; - if let Some(conf) = &channel_config { - if scalar_type != conf.entries[0].scalar_type { - let msg = format!( - "unexpected type: {:?} vs {:?}", - scalar_type, conf.entries[0].scalar_type - ); - return Err(Error::with_msg_no_trace(msg)); - } - if shape != conf.entries[0].shape { - let msg = format!( - "unexpected shape: {:?} vs {:?}", - shape, conf.entries[0].shape - ); - return Err(Error::with_msg_no_trace(msg)); - } - } - if channel_config.is_none() { - let ks = if ei.is_wave() { 3 } else { 2 }; - let scalar_type_2 = match &ei { - items::eventsitem::EventsItem::Plain(k) => match k { - PlainEvents::Scalar(k) => match k { - ScalarPlainEvents::Byte(_) => ScalarType::I8, - ScalarPlainEvents::Short(_) => ScalarType::I16, - ScalarPlainEvents::Int(_) => ScalarType::I32, - ScalarPlainEvents::Float(_) => ScalarType::F32, - ScalarPlainEvents::Double(_) => ScalarType::F64, - }, - PlainEvents::Wave(k) => match k { - WavePlainEvents::Byte(_) => ScalarType::I8, - WavePlainEvents::Short(_) => ScalarType::I16, - WavePlainEvents::Int(_) => ScalarType::I32, - WavePlainEvents::Float(_) => ScalarType::F32, - WavePlainEvents::Double(_) => ScalarType::F64, - }, - }, - items::eventsitem::EventsItem::XBinnedEvents(_) => panic!(), - }; - if scalar_type_2 != scalar_type { - let msg = format!( - "unexpected type: {:?} vs {:?}", - scalar_type_2, scalar_type - ); - return Err(Error::with_msg_no_trace(msg)); - } - let e = parse::channelconfig::ConfigEntry { - ts: 0, - pulse: 0, - ks, - bs: bs.clone(), - split_count: 1, - status: 0, - bb: 0, - modulo: 0, - offset: 0, - precision: 0, - scalar_type: scalar_type, - is_compressed: false, - is_shaped: false, - is_array: false, - byte_order: netpod::ByteOrder::LE, - compression_method: None, - shape, - source_name: None, - unit: None, - description: None, - optional_fields: None, - value_converter: None, - }; - let k = parse::channelconfig::Config { - format_version: 0, - channel_name: channel.name().into(), - entries: vec![e], - }; - channel_config = Some(k); - } - match &ei { - items::eventsitem::EventsItem::Plain(item) => { - data_writer.write_item(item).await?; - } - items::eventsitem::EventsItem::XBinnedEvents(_) => { - panic!() - } - } - } - let tslast = if ei.len() > 0 { Some(ei.ts(ei.len() - 1)) } else { None }; - if i1 == repnext { - debug!("read msg from file {} len {} tslast {:?}", i1, ei.len(), tslast); - repnext = 1 + 4 * repnext / 3; - } - i1 += 1; - if false { - ei.x_aggregate(&evq.agg_kind); - } - } - Ok(None) => { - debug!("reached end of file"); - break; - } - Err(e) => { - error!("error while reading msg {:?}", e); - break; - } - } - } - debug!("read total {} events from the last file", i1); - evstot += i1; - } else { - error!("Position fail."); - } - } - eprintln!("Total number of events converted: {}", evstot); - data_writer.write_config(channel_config.as_ref().unwrap()).await?; - Ok(()) + dq::convert(params).await } SubCmd::ReadDatabufferConfigfile(sub) => { let mut file = File::open(&sub.configfile).await?; @@ -526,7 +86,6 @@ pub fn main() -> Result<(), Error> { Err(e) => return Err(Error::with_msg_no_trace(format!("can not parse: {:?}", e))), }; eprintln!("Read config: {:?}", config); - eprintln!("Config bs: {}", config.entries[0].bs.ns / MS); Ok(()) } SubCmd::ReadDatabufferDatafile(sub) => { @@ -539,6 +98,7 @@ pub fn main() -> Result<(), Error> { Ok(k) => k.1, Err(e) => return Err(Error::with_msg_no_trace(format!("can not parse: {:?}", e))), }; + eprintln!("Read config: {:?}", config); let file = File::open(&sub.datafile).await?; let inp = Box::pin(disk::file_content_stream( file, @@ -566,7 +126,7 @@ pub fn main() -> Result<(), Error> { disk_stats_every: ByteSize::mb(2), }; let max_ts = Arc::new(AtomicU64::new(0)); - let mut chunks = disk::eventchunker::EventChunker::from_start( + let chunks = disk::eventchunker::EventChunker::from_start( inp, channel_config.clone(), range, @@ -576,11 +136,11 @@ pub fn main() -> Result<(), Error> { false, true, ); - use futures_util::stream::StreamExt; use items::WithLen; - - while let Some(item) = chunks.next().await { + //let evs = EventValuesDim0Case::::new(); + let mut stream = disk::decode::EventsItemStream::new(Box::pin(chunks)); + while let Some(item) = stream.next().await { let item = item?; match item { items::StreamItem::DataItem(item) => { @@ -589,8 +149,7 @@ pub fn main() -> Result<(), Error> { warn!("RangeComplete"); } items::RangeCompletableItem::Data(item) => { - info!("Data len {}", item.len()); - info!("{:?}", item); + info!("{:?} ({} events)", item, item.len()); } }; } diff --git a/dq/src/dq.rs b/dq/src/dq.rs index 8b13789..85093f0 100644 --- a/dq/src/dq.rs +++ b/dq/src/dq.rs @@ -1 +1,520 @@ +use archapp::events::PositionState; +use archapp::parse::PbFileReader; +use bytes::BufMut; +use chrono::{TimeZone, Utc}; +use err::Error; +use items::plainevents::{PlainEvents, ScalarPlainEvents, WavePlainEvents}; +use netpod::log::*; +use netpod::query::RawEventsQuery; +use netpod::timeunits::*; +use netpod::{AggKind, Channel, HasScalarType, HasShape, NanoRange, Nanos, ScalarType, Shape}; +use parse::channelconfig::Config; +use std::fmt; +use std::io::SeekFrom; +use std::mem::take; +use std::path::PathBuf; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncSeekExt, AsyncWriteExt}; +trait WritableValue: fmt::Debug { + fn put_value(&self, buf: &mut Vec); +} + +impl WritableValue for u32 { + fn put_value(&self, buf: &mut Vec) { + buf.put_u32_le(*self); + } +} + +impl WritableValue for i8 { + fn put_value(&self, buf: &mut Vec) { + buf.put_i8(*self); + } +} + +impl WritableValue for i16 { + fn put_value(&self, buf: &mut Vec) { + buf.put_i16_le(*self); + } +} + +impl WritableValue for i32 { + fn put_value(&self, buf: &mut Vec) { + buf.put_i32_le(*self); + } +} + +impl WritableValue for f32 { + fn put_value(&self, buf: &mut Vec) { + buf.put_f32_le(*self); + } +} + +impl WritableValue for f64 { + fn put_value(&self, buf: &mut Vec) { + buf.put_f64_le(*self); + } +} + +struct DataWriter { + output_dir: PathBuf, + kspre: String, + channel: Channel, + bs: Nanos, + tb: u64, + datafile: Option, + indexfile: Option, + wpos: u64, + buf1: Vec, +} + +impl DataWriter { + async fn new(output_dir: PathBuf, kspre: String, channel: Channel, bs: Nanos) -> Result { + let ret = Self { + output_dir, + kspre, + channel, + bs, + tb: u64::MAX, + datafile: None, + indexfile: None, + wpos: 0, + buf1: vec![0; 1024 * 1024], + }; + Ok(ret) + } + + async fn write_item(&mut self, item: &PlainEvents) -> Result<(), Error> { + match item { + PlainEvents::Scalar(item) => match item { + ScalarPlainEvents::U32(events) => { + self.write_events(2, ScalarType::U32, &events.tss, &events.values) + .await?; + } + ScalarPlainEvents::I8(events) => { + self.write_events(2, ScalarType::I8, &events.tss, &events.values) + .await?; + } + ScalarPlainEvents::I16(events) => { + self.write_events(2, ScalarType::I16, &events.tss, &events.values) + .await?; + } + ScalarPlainEvents::I32(events) => { + self.write_events(2, ScalarType::I32, &events.tss, &events.values) + .await?; + } + ScalarPlainEvents::F32(events) => { + self.write_events(2, ScalarType::F32, &events.tss, &events.values) + .await?; + } + ScalarPlainEvents::F64(events) => { + self.write_events(2, ScalarType::F64, &events.tss, &events.values) + .await?; + } + }, + PlainEvents::Wave(item) => match item { + WavePlainEvents::F64(_events) => { + todo!() + } + _ => todo!(), + }, + } + Ok(()) + } + + async fn write_events( + &mut self, + ks: u32, + scalar_type: ScalarType, + tss: &Vec, + vals: &Vec, + ) -> Result<(), Error> { + let split = 0; + assert_eq!(tss.len(), vals.len()); + for i in 0..tss.len() { + let ts = tss[i]; + let tb = ts / self.bs.ns; + if tb != self.tb { + let tbdate = chrono::Utc.timestamp((tb * (self.bs.ns / SEC)) as i64, 0); + eprintln!("Create directory for timebin {}", tbdate); + let p1 = self.output_dir.join(format!("{}_{}", self.kspre, ks)); + let p2 = p1.join(self.channel.name()); + let p3 = p2.join(format!("{:019}", tb)); + let p4 = p3.join(format!("{:010}", split)); + let p5 = p4.join(format!("{:019}_00000_Data", self.bs.ns / MS)); + let p6 = p4.join(format!("{:019}_00000_Data_Index", self.bs.ns / MS)); + tokio::fs::create_dir_all(&p4).await.map_err(|e| { + error!("Can not create {:?}", p4); + e + })?; + let mut file = OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(&p5) + .await + .map_err(|e| { + error!("can not create new file {:?}", p5); + e + })?; + file.write_all(&0u16.to_be_bytes()).await?; + let chs = self.channel.name().as_bytes(); + let len1 = (chs.len() + 8) as u32; + file.write_all(&len1.to_be_bytes()).await?; + file.write_all(chs).await?; + file.write_all(&len1.to_be_bytes()).await?; + self.wpos = 10 + chs.len() as u64; + self.datafile = Some(file); + if ks == 3 { + let mut file = OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(&p6) + .await + .map_err(|e| { + error!("can not create new file {:?}", p6); + e + })?; + file.write_all(&0u16.to_be_bytes()).await?; + self.indexfile = Some(file); + } + self.tb = tb; + } + let file = self.datafile.as_mut().unwrap(); + let mut buf = take(&mut self.buf1); + buf.clear(); + buf.put_i32(0); + buf.put_u64(0); + buf.put_u64(ts); + buf.put_u64(0); + buf.put_u64(0); + // Status, Severity + buf.put_u8(0); + buf.put_u8(0); + buf.put_i32(-1); + let flags = 0; + buf.put_u8(flags); + buf.put_u8(scalar_type.index()); + vals[i].put_value(&mut buf); + buf.put_i32(0); + let len1 = buf.len(); + buf[0..4].as_mut().put_u32(len1 as u32); + buf[len1 - 4..len1].as_mut().put_u32(len1 as u32); + file.write_all(&buf).await?; + self.buf1 = buf; + if ks == 3 { + let file = self.indexfile.as_mut().unwrap(); + let mut buf = take(&mut self.buf1); + buf.clear(); + buf.put_u64(ts); + buf.put_u64(self.wpos); + file.write_all(&buf).await?; + self.buf1 = buf; + } + self.wpos += len1 as u64; + } + Ok(()) + } + + async fn write_config(&mut self, config: &Config) -> Result<(), Error> { + eprintln!("Create directory for channel config"); + let p1 = self.output_dir.join("config").join(self.channel.name()).join("latest"); + tokio::fs::create_dir_all(&p1).await.map_err(|e| { + error!("Can not create {:?}", p1); + e + })?; + let mut file = OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(p1.join("00000_Config")) + .await + .map_err(|e| { + error!("can not create config file in {:?}", p1); + e + })?; + let mut buf = take(&mut self.buf1); + { + buf.clear(); + buf.put_u16(0); + file.write_all(&buf).await?; + } + { + buf.clear(); + let chs = self.channel.name().as_bytes(); + let len1 = (chs.len() + 8) as u32; + buf.put_u32(len1); + buf.put_slice(chs); + buf.put_u32(len1); + //let len1 = buf.len(); + //buf[0..4].as_mut().put_u32(len1 as u32); + //buf[len1 - 4..len1].as_mut().put_u32(len1 as u32); + file.write_all(&buf).await?; + } + { + let e = &config.entries[0]; + buf.clear(); + buf.put_u32(0); + buf.put_u64(0); + buf.put_u64(0); + buf.put_i32(e.ks); + buf.put_u64(e.bs.ns / MS); + buf.put_i32(e.split_count); + buf.put_i32(e.status); + buf.put_i8(e.bb); + buf.put_i32(e.modulo); + buf.put_i32(e.offset); + buf.put_i16(e.precision); + let dtlen = 0; + buf.put_i32(dtlen); + let flags = 0; + buf.put_u8(flags); + buf.put_u8(e.scalar_type.index()); + if false { + // is shaped? + buf.put_u8(1); + buf.put_u32(16); + } + buf.put_i32(-1); + buf.put_i32(-1); + buf.put_i32(-1); + buf.put_i32(-1); + buf.put_i32(-1); + buf.put_u32(0); + let len1 = buf.len(); + buf[0..4].as_mut().put_u32(len1 as u32); + buf[len1 - 4..len1].as_mut().put_u32(len1 as u32); + file.write_all(&buf).await?; + } + self.buf1 = buf; + Ok(()) + } +} + +impl Drop for DataWriter { + fn drop(&mut self) { + let indexfile = self.indexfile.take(); + let datafile = self.datafile.take(); + tokio::task::spawn(async move { + match indexfile { + Some(mut file) => { + let _ = file.flush().await; + } + None => {} + } + match datafile { + Some(mut file) => { + let _ = file.flush().await; + } + None => {} + } + }); + } +} + +#[derive(Clone, Debug)] +pub struct ConvertParams { + pub keyspace_prefix: String, + pub channel_name: String, + pub input_dir: PathBuf, + pub output_dir: PathBuf, +} + +pub async fn convert(convert_params: ConvertParams) -> Result<(), Error> { + let _ = tokio::fs::create_dir(&convert_params.output_dir).await; + let meta = tokio::fs::metadata(&convert_params.output_dir).await?; + if !meta.is_dir() { + return Err(Error::from_string(format!( + "Given output path is not a directory: {:?}", + convert_params.output_dir + ))); + } + let bs = Nanos::from_ns(DAY); + let mut channel_config: Option = None; + let channel = Channel { + backend: String::new(), + name: convert_params.channel_name.into(), + }; + let mut data_writer = DataWriter::new( + convert_params.output_dir, + convert_params.keyspace_prefix.into(), + channel.clone(), + bs.clone(), + ) + .await?; + let chandir = archapp::events::directory_for_channel_files(&channel, &convert_params.input_dir)?; + eprintln!("Looking for files in: {:?}", chandir); + let files = archapp::events::find_files_for_channel(&convert_params.input_dir, &channel).await?; + let mut evstot = 0; + for file in files { + eprintln!("Try to open {:?}", file); + let fni = archapp::events::parse_data_filename(file.to_str().unwrap())?; + debug!("fni: {:?}", fni); + let ts0 = Utc.ymd(fni.year as i32, fni.month, 1).and_hms(0, 0, 0); + let ts1 = ts0.timestamp() as u64 * SEC + ts0.timestamp_subsec_nanos() as u64; + let _ = ts1; + let mut f1 = File::open(&file).await?; + let _flen = f1.seek(SeekFrom::End(0)).await?; + f1.seek(SeekFrom::Start(0)).await?; + let pbr = PbFileReader::new(f1).await?; + eprintln!( + "PBR file header channel name: {:?} data type: {:?}", + pbr.channel_name(), + pbr.payload_type() + ); + debug!("channel name in pbr file: {:?}", pbr.channel_name()); + debug!("data type in file: {:?}", pbr.payload_type()); + if pbr.channel_name() != channel.name() { + return Err(Error::with_msg(format!( + "channel name mismatch: {:?} vs {:?}", + pbr.channel_name(), + channel.name() + ))); + } + let evq = RawEventsQuery { + channel: channel.clone(), + range: NanoRange { + beg: u64::MIN, + end: u64::MAX, + }, + agg_kind: AggKind::Plain, + disk_io_buffer_size: 1024 * 4, + do_decompress: true, + }; + let f1 = pbr.into_file(); + // TODO can the positioning-logic maybe re-use the pbr? + let z = archapp::events::position_file_for_evq(f1, evq.clone(), fni.year).await?; + if let PositionState::Positioned(pos) = z.state { + let mut pbr = z.pbr; + assert_eq!(pos, pbr.abspos()); + let mut i1 = 0; + let mut repnext = u64::MAX; + loop { + match pbr.read_msg().await { + Ok(Some(ei)) => { + use items::{WithLen, WithTimestamps}; + let ei = ei.item; + if ei.is_wave() { + eprintln!("ERROR wave channels are not yet fully supported"); + return Ok(()); + } + if ei.len() > 0 { + let scalar_type = ei.scalar_type(); + let shape = match &ei { + items::eventsitem::EventsItem::Plain(k) => match k.shape() { + Shape::Scalar => None, + Shape::Wave(n) => Some(vec![n]), + Shape::Image(..) => panic!(), + }, + items::eventsitem::EventsItem::XBinnedEvents(_) => panic!(), + }; + if let Some(conf) = &channel_config { + if scalar_type != conf.entries[0].scalar_type { + let msg = format!( + "unexpected type: {:?} vs {:?}", + scalar_type, conf.entries[0].scalar_type + ); + return Err(Error::with_msg_no_trace(msg)); + } + if shape != conf.entries[0].shape { + let msg = format!("unexpected shape: {:?} vs {:?}", shape, conf.entries[0].shape); + return Err(Error::with_msg_no_trace(msg)); + } + } + if channel_config.is_none() { + let ks = if ei.is_wave() { 3 } else { 2 }; + let scalar_type_2 = match &ei { + items::eventsitem::EventsItem::Plain(k) => match k { + PlainEvents::Scalar(k) => match k { + ScalarPlainEvents::U32(_) => ScalarType::U32, + ScalarPlainEvents::I8(_) => ScalarType::I8, + ScalarPlainEvents::I16(_) => ScalarType::I16, + ScalarPlainEvents::I32(_) => ScalarType::I32, + ScalarPlainEvents::F32(_) => ScalarType::F32, + ScalarPlainEvents::F64(_) => ScalarType::F64, + }, + PlainEvents::Wave(k) => match k { + WavePlainEvents::I8(_) => ScalarType::I8, + WavePlainEvents::I16(_) => ScalarType::I16, + WavePlainEvents::I32(_) => ScalarType::I32, + WavePlainEvents::F32(_) => ScalarType::F32, + WavePlainEvents::F64(_) => ScalarType::F64, + }, + }, + items::eventsitem::EventsItem::XBinnedEvents(_) => panic!(), + }; + if scalar_type_2 != scalar_type { + let msg = format!("unexpected type: {:?} vs {:?}", scalar_type_2, scalar_type); + return Err(Error::with_msg_no_trace(msg)); + } + let e = parse::channelconfig::ConfigEntry { + ts: 0, + pulse: 0, + ks, + bs: bs.clone(), + split_count: 1, + status: 0, + bb: 0, + modulo: 0, + offset: 0, + precision: 0, + scalar_type: scalar_type, + is_compressed: false, + is_shaped: false, + is_array: false, + byte_order: netpod::ByteOrder::LE, + compression_method: None, + shape, + source_name: None, + unit: None, + description: None, + optional_fields: None, + value_converter: None, + }; + let k = parse::channelconfig::Config { + format_version: 0, + channel_name: channel.name().into(), + entries: vec![e], + }; + channel_config = Some(k); + } + match &ei { + items::eventsitem::EventsItem::Plain(item) => { + data_writer.write_item(item).await?; + } + items::eventsitem::EventsItem::XBinnedEvents(_) => { + panic!() + } + } + } + let tslast = if ei.len() > 0 { Some(ei.ts(ei.len() - 1)) } else { None }; + if i1 == repnext { + debug!("read msg from file {} len {} tslast {:?}", i1, ei.len(), tslast); + repnext = 1 + 4 * repnext / 3; + } + i1 += 1; + if false { + ei.x_aggregate(&evq.agg_kind); + } + } + Ok(None) => { + debug!("reached end of file"); + break; + } + Err(e) => { + error!("error while reading msg {:?}", e); + break; + } + } + } + debug!("read total {} events from the last file", i1); + evstot += i1; + } else { + error!("Position fail."); + } + } + eprintln!("Total number of events converted: {}", evstot); + data_writer.write_config(channel_config.as_ref().unwrap()).await?; + Ok(()) +} diff --git a/items/src/binnedevents.rs b/items/src/binnedevents.rs index bc0d157..1c4d977 100644 --- a/items/src/binnedevents.rs +++ b/items/src/binnedevents.rs @@ -11,31 +11,31 @@ use crate::{ #[derive(Debug, Serialize, Deserialize)] pub enum SingleBinWaveEvents { - Byte(XBinnedScalarEvents), - Short(XBinnedScalarEvents), - Int(XBinnedScalarEvents), - Float(XBinnedScalarEvents), - Double(XBinnedScalarEvents), + I8(XBinnedScalarEvents), + I16(XBinnedScalarEvents), + I32(XBinnedScalarEvents), + F32(XBinnedScalarEvents), + F64(XBinnedScalarEvents), } impl SingleBinWaveEvents { pub fn variant_name(&self) -> String { use SingleBinWaveEvents::*; match self { - Byte(_) => format!("Byte"), - Short(_) => format!("Short"), - Int(_) => format!("Int"), - Float(_) => format!("Float"), - Double(_) => format!("Double"), + I8(_) => format!("I8"), + I16(_) => format!("I16"), + I32(_) => format!("I32"), + F32(_) => format!("F32"), + F64(_) => format!("F64"), } } fn x_aggregate(self, ak: &AggKind) -> EventsItem { use SingleBinWaveEvents::*; match self { - Byte(k) => match ak { + I8(k) => match ak { AggKind::EventBlobs => panic!(), - AggKind::Plain => EventsItem::XBinnedEvents(XBinnedEvents::SingleBinWave(SingleBinWaveEvents::Byte(k))), + AggKind::Plain => EventsItem::XBinnedEvents(XBinnedEvents::SingleBinWave(SingleBinWaveEvents::I8(k))), AggKind::TimeWeightedScalar => err::todoval(), AggKind::DimXBins1 => err::todoval(), AggKind::DimXBinsN(_) => EventsItem::Plain(PlainEvents::Wave(err::todoval())), @@ -48,11 +48,11 @@ impl SingleBinWaveEvents { impl Clearable for SingleBinWaveEvents { fn clear(&mut self) { match self { - SingleBinWaveEvents::Byte(k) => k.clear(), - SingleBinWaveEvents::Short(k) => k.clear(), - SingleBinWaveEvents::Int(k) => k.clear(), - SingleBinWaveEvents::Float(k) => k.clear(), - SingleBinWaveEvents::Double(k) => k.clear(), + SingleBinWaveEvents::I8(k) => k.clear(), + SingleBinWaveEvents::I16(k) => k.clear(), + SingleBinWaveEvents::I32(k) => k.clear(), + SingleBinWaveEvents::F32(k) => k.clear(), + SingleBinWaveEvents::F64(k) => k.clear(), } } } @@ -60,34 +60,34 @@ impl Clearable for SingleBinWaveEvents { impl Appendable for SingleBinWaveEvents { fn empty_like_self(&self) -> Self { match self { - Self::Byte(k) => Self::Byte(k.empty_like_self()), - Self::Short(k) => Self::Short(k.empty_like_self()), - Self::Int(k) => Self::Int(k.empty_like_self()), - Self::Float(k) => Self::Float(k.empty_like_self()), - Self::Double(k) => Self::Double(k.empty_like_self()), + Self::I8(k) => Self::I8(k.empty_like_self()), + Self::I16(k) => Self::I16(k.empty_like_self()), + Self::I32(k) => Self::I32(k.empty_like_self()), + Self::F32(k) => Self::F32(k.empty_like_self()), + Self::F64(k) => Self::F64(k.empty_like_self()), } } fn append(&mut self, src: &Self) { match self { - Self::Byte(k) => match src { - Self::Byte(j) => k.append(j), + Self::I8(k) => match src { + Self::I8(j) => k.append(j), _ => panic!(), }, - Self::Short(k) => match src { - Self::Short(j) => k.append(j), + Self::I16(k) => match src { + Self::I16(j) => k.append(j), _ => panic!(), }, - Self::Int(k) => match src { - Self::Int(j) => k.append(j), + Self::I32(k) => match src { + Self::I32(j) => k.append(j), _ => panic!(), }, - Self::Float(k) => match src { - Self::Float(j) => k.append(j), + Self::F32(k) => match src { + Self::F32(j) => k.append(j), _ => panic!(), }, - Self::Double(k) => match src { - Self::Double(j) => k.append(j), + Self::F64(k) => match src { + Self::F64(j) => k.append(j), _ => panic!(), }, } @@ -97,24 +97,24 @@ impl Appendable for SingleBinWaveEvents { impl PushableIndex for SingleBinWaveEvents { fn push_index(&mut self, src: &Self, ix: usize) { match self { - Self::Byte(k) => match src { - Self::Byte(j) => k.push_index(j, ix), + Self::I8(k) => match src { + Self::I8(j) => k.push_index(j, ix), _ => panic!(), }, - Self::Short(k) => match src { - Self::Short(j) => k.push_index(j, ix), + Self::I16(k) => match src { + Self::I16(j) => k.push_index(j, ix), _ => panic!(), }, - Self::Int(k) => match src { - Self::Int(j) => k.push_index(j, ix), + Self::I32(k) => match src { + Self::I32(j) => k.push_index(j, ix), _ => panic!(), }, - Self::Float(k) => match src { - Self::Float(j) => k.push_index(j, ix), + Self::F32(k) => match src { + Self::F32(j) => k.push_index(j, ix), _ => panic!(), }, - Self::Double(k) => match src { - Self::Double(j) => k.push_index(j, ix), + Self::F64(k) => match src { + Self::F64(j) => k.push_index(j, ix), _ => panic!(), }, } @@ -125,11 +125,11 @@ impl WithLen for SingleBinWaveEvents { fn len(&self) -> usize { use SingleBinWaveEvents::*; match self { - Byte(j) => j.len(), - Short(j) => j.len(), - Int(j) => j.len(), - Float(j) => j.len(), - Double(j) => j.len(), + I8(j) => j.len(), + I16(j) => j.len(), + I32(j) => j.len(), + F32(j) => j.len(), + F64(j) => j.len(), } } } @@ -138,11 +138,11 @@ impl WithTimestamps for SingleBinWaveEvents { fn ts(&self, ix: usize) -> u64 { use SingleBinWaveEvents::*; match self { - Byte(j) => j.ts(ix), - Short(j) => j.ts(ix), - Int(j) => j.ts(ix), - Float(j) => j.ts(ix), - Double(j) => j.ts(ix), + I8(j) => j.ts(ix), + I16(j) => j.ts(ix), + I32(j) => j.ts(ix), + F32(j) => j.ts(ix), + F64(j) => j.ts(ix), } } } @@ -151,11 +151,11 @@ impl HasShape for SingleBinWaveEvents { fn shape(&self) -> Shape { use SingleBinWaveEvents::*; match self { - Byte(_) => Shape::Scalar, - Short(_) => Shape::Scalar, - Int(_) => Shape::Scalar, - Float(_) => Shape::Scalar, - Double(_) => Shape::Scalar, + I8(_) => Shape::Scalar, + I16(_) => Shape::Scalar, + I32(_) => Shape::Scalar, + F32(_) => Shape::Scalar, + F64(_) => Shape::Scalar, } } } @@ -164,42 +164,42 @@ impl HasScalarType for SingleBinWaveEvents { fn scalar_type(&self) -> ScalarType { use SingleBinWaveEvents::*; match self { - Byte(_) => ScalarType::I8, - Short(_) => ScalarType::I16, - Int(_) => ScalarType::I32, - Float(_) => ScalarType::F32, - Double(_) => ScalarType::F64, + I8(_) => ScalarType::I8, + I16(_) => ScalarType::I16, + I32(_) => ScalarType::I32, + F32(_) => ScalarType::F32, + F64(_) => ScalarType::F64, } } } #[derive(Debug, Serialize, Deserialize)] pub enum MultiBinWaveEvents { - Byte(XBinnedWaveEvents), - Short(XBinnedWaveEvents), - Int(XBinnedWaveEvents), - Float(XBinnedWaveEvents), - Double(XBinnedWaveEvents), + I8(XBinnedWaveEvents), + I16(XBinnedWaveEvents), + I32(XBinnedWaveEvents), + F32(XBinnedWaveEvents), + F64(XBinnedWaveEvents), } impl MultiBinWaveEvents { pub fn variant_name(&self) -> String { use MultiBinWaveEvents::*; match self { - Byte(_) => format!("Byte"), - Short(_) => format!("Short"), - Int(_) => format!("Int"), - Float(_) => format!("Float"), - Double(_) => format!("Double"), + I8(_) => format!("I8"), + I16(_) => format!("I16"), + I32(_) => format!("I32"), + F32(_) => format!("F32"), + F64(_) => format!("F64"), } } fn x_aggregate(self, ak: &AggKind) -> EventsItem { use MultiBinWaveEvents::*; match self { - Byte(k) => match ak { + I8(k) => match ak { AggKind::EventBlobs => panic!(), - AggKind::Plain => EventsItem::XBinnedEvents(XBinnedEvents::MultiBinWave(MultiBinWaveEvents::Byte(k))), + AggKind::Plain => EventsItem::XBinnedEvents(XBinnedEvents::MultiBinWave(MultiBinWaveEvents::I8(k))), AggKind::TimeWeightedScalar => err::todoval(), AggKind::DimXBins1 => err::todoval(), AggKind::DimXBinsN(_) => EventsItem::Plain(PlainEvents::Wave(err::todoval())), @@ -212,11 +212,11 @@ impl MultiBinWaveEvents { impl Clearable for MultiBinWaveEvents { fn clear(&mut self) { match self { - MultiBinWaveEvents::Byte(k) => k.clear(), - MultiBinWaveEvents::Short(k) => k.clear(), - MultiBinWaveEvents::Int(k) => k.clear(), - MultiBinWaveEvents::Float(k) => k.clear(), - MultiBinWaveEvents::Double(k) => k.clear(), + MultiBinWaveEvents::I8(k) => k.clear(), + MultiBinWaveEvents::I16(k) => k.clear(), + MultiBinWaveEvents::I32(k) => k.clear(), + MultiBinWaveEvents::F32(k) => k.clear(), + MultiBinWaveEvents::F64(k) => k.clear(), } } } @@ -224,34 +224,34 @@ impl Clearable for MultiBinWaveEvents { impl Appendable for MultiBinWaveEvents { fn empty_like_self(&self) -> Self { match self { - Self::Byte(k) => Self::Byte(k.empty_like_self()), - Self::Short(k) => Self::Short(k.empty_like_self()), - Self::Int(k) => Self::Int(k.empty_like_self()), - Self::Float(k) => Self::Float(k.empty_like_self()), - Self::Double(k) => Self::Double(k.empty_like_self()), + Self::I8(k) => Self::I8(k.empty_like_self()), + Self::I16(k) => Self::I16(k.empty_like_self()), + Self::I32(k) => Self::I32(k.empty_like_self()), + Self::F32(k) => Self::F32(k.empty_like_self()), + Self::F64(k) => Self::F64(k.empty_like_self()), } } fn append(&mut self, src: &Self) { match self { - Self::Byte(k) => match src { - Self::Byte(j) => k.append(j), + Self::I8(k) => match src { + Self::I8(j) => k.append(j), _ => panic!(), }, - Self::Short(k) => match src { - Self::Short(j) => k.append(j), + Self::I16(k) => match src { + Self::I16(j) => k.append(j), _ => panic!(), }, - Self::Int(k) => match src { - Self::Int(j) => k.append(j), + Self::I32(k) => match src { + Self::I32(j) => k.append(j), _ => panic!(), }, - Self::Float(k) => match src { - Self::Float(j) => k.append(j), + Self::F32(k) => match src { + Self::F32(j) => k.append(j), _ => panic!(), }, - Self::Double(k) => match src { - Self::Double(j) => k.append(j), + Self::F64(k) => match src { + Self::F64(j) => k.append(j), _ => panic!(), }, } @@ -261,24 +261,24 @@ impl Appendable for MultiBinWaveEvents { impl PushableIndex for MultiBinWaveEvents { fn push_index(&mut self, src: &Self, ix: usize) { match self { - Self::Byte(k) => match src { - Self::Byte(j) => k.push_index(j, ix), + Self::I8(k) => match src { + Self::I8(j) => k.push_index(j, ix), _ => panic!(), }, - Self::Short(k) => match src { - Self::Short(j) => k.push_index(j, ix), + Self::I16(k) => match src { + Self::I16(j) => k.push_index(j, ix), _ => panic!(), }, - Self::Int(k) => match src { - Self::Int(j) => k.push_index(j, ix), + Self::I32(k) => match src { + Self::I32(j) => k.push_index(j, ix), _ => panic!(), }, - Self::Float(k) => match src { - Self::Float(j) => k.push_index(j, ix), + Self::F32(k) => match src { + Self::F32(j) => k.push_index(j, ix), _ => panic!(), }, - Self::Double(k) => match src { - Self::Double(j) => k.push_index(j, ix), + Self::F64(k) => match src { + Self::F64(j) => k.push_index(j, ix), _ => panic!(), }, } @@ -289,11 +289,11 @@ impl WithLen for MultiBinWaveEvents { fn len(&self) -> usize { use MultiBinWaveEvents::*; match self { - Byte(j) => j.len(), - Short(j) => j.len(), - Int(j) => j.len(), - Float(j) => j.len(), - Double(j) => j.len(), + I8(j) => j.len(), + I16(j) => j.len(), + I32(j) => j.len(), + F32(j) => j.len(), + F64(j) => j.len(), } } } @@ -302,11 +302,11 @@ impl WithTimestamps for MultiBinWaveEvents { fn ts(&self, ix: usize) -> u64 { use MultiBinWaveEvents::*; match self { - Byte(j) => j.ts(ix), - Short(j) => j.ts(ix), - Int(j) => j.ts(ix), - Float(j) => j.ts(ix), - Double(j) => j.ts(ix), + I8(j) => j.ts(ix), + I16(j) => j.ts(ix), + I32(j) => j.ts(ix), + F32(j) => j.ts(ix), + F64(j) => j.ts(ix), } } } @@ -315,11 +315,11 @@ impl HasShape for MultiBinWaveEvents { fn shape(&self) -> Shape { use MultiBinWaveEvents::*; match self { - Byte(_) => Shape::Scalar, - Short(_) => Shape::Scalar, - Int(_) => Shape::Scalar, - Float(_) => Shape::Scalar, - Double(_) => Shape::Scalar, + I8(_) => Shape::Scalar, + I16(_) => Shape::Scalar, + I32(_) => Shape::Scalar, + F32(_) => Shape::Scalar, + F64(_) => Shape::Scalar, } } } @@ -328,11 +328,11 @@ impl HasScalarType for MultiBinWaveEvents { fn scalar_type(&self) -> ScalarType { use MultiBinWaveEvents::*; match self { - Byte(_) => ScalarType::I8, - Short(_) => ScalarType::I16, - Int(_) => ScalarType::I32, - Float(_) => ScalarType::F32, - Double(_) => ScalarType::F64, + I8(_) => ScalarType::I8, + I16(_) => ScalarType::I16, + I32(_) => ScalarType::I32, + F32(_) => ScalarType::F32, + F64(_) => ScalarType::F64, } } } diff --git a/items/src/eventsitem.rs b/items/src/eventsitem.rs index dcc9ce1..10c9225 100644 --- a/items/src/eventsitem.rs +++ b/items/src/eventsitem.rs @@ -49,21 +49,22 @@ impl EventsItem { match self { EventsItem::Plain(k) => match k { PlainEvents::Scalar(k) => match k { - ScalarPlainEvents::Byte(_) => (ScalarType::I8, Shape::Scalar), - ScalarPlainEvents::Short(_) => (ScalarType::I16, Shape::Scalar), - ScalarPlainEvents::Int(_) => (ScalarType::I32, Shape::Scalar), - ScalarPlainEvents::Float(_) => (ScalarType::F32, Shape::Scalar), - ScalarPlainEvents::Double(_) => (ScalarType::F64, Shape::Scalar), + ScalarPlainEvents::U32(_) => (ScalarType::U32, Shape::Scalar), + ScalarPlainEvents::I8(_) => (ScalarType::I8, Shape::Scalar), + ScalarPlainEvents::I16(_) => (ScalarType::I16, Shape::Scalar), + ScalarPlainEvents::I32(_) => (ScalarType::I32, Shape::Scalar), + ScalarPlainEvents::F32(_) => (ScalarType::F32, Shape::Scalar), + ScalarPlainEvents::F64(_) => (ScalarType::F64, Shape::Scalar), }, PlainEvents::Wave(k) => match k { // TODO // Inherent issue for the non-static-type backends: // there is a chance that we can't determine the shape here. - WavePlainEvents::Byte(k) => (ScalarType::I8, k.shape().unwrap()), - WavePlainEvents::Short(k) => (ScalarType::I16, k.shape().unwrap()), - WavePlainEvents::Int(k) => (ScalarType::I32, k.shape().unwrap()), - WavePlainEvents::Float(k) => (ScalarType::F32, k.shape().unwrap()), - WavePlainEvents::Double(k) => (ScalarType::F64, k.shape().unwrap()), + WavePlainEvents::I8(k) => (ScalarType::I8, k.shape().unwrap()), + WavePlainEvents::I16(k) => (ScalarType::I16, k.shape().unwrap()), + WavePlainEvents::I32(k) => (ScalarType::I32, k.shape().unwrap()), + WavePlainEvents::F32(k) => (ScalarType::F32, k.shape().unwrap()), + WavePlainEvents::F64(k) => (ScalarType::F64, k.shape().unwrap()), }, }, EventsItem::XBinnedEvents(_k) => panic!(), diff --git a/items/src/plainevents.rs b/items/src/plainevents.rs index 9d68c06..8a67322 100644 --- a/items/src/plainevents.rs +++ b/items/src/plainevents.rs @@ -9,22 +9,24 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] pub enum ScalarPlainEvents { - Byte(EventValues), - Short(EventValues), - Int(EventValues), - Float(EventValues), - Double(EventValues), + U32(EventValues), + I8(EventValues), + I16(EventValues), + I32(EventValues), + F32(EventValues), + F64(EventValues), } impl ScalarPlainEvents { pub fn variant_name(&self) -> String { use ScalarPlainEvents::*; match self { - Byte(_) => format!("Byte"), - Short(_) => format!("Short"), - Int(_) => format!("Int"), - Float(_) => format!("Float"), - Double(_) => format!("Double"), + U32(_) => format!("U32"), + I8(_) => format!("I8"), + I16(_) => format!("I16"), + I32(_) => format!("I32"), + F32(_) => format!("F32"), + F64(_) => format!("F64"), } } } @@ -32,11 +34,12 @@ impl ScalarPlainEvents { impl Clearable for ScalarPlainEvents { fn clear(&mut self) { match self { - ScalarPlainEvents::Byte(k) => k.clear(), - ScalarPlainEvents::Short(k) => k.clear(), - ScalarPlainEvents::Int(k) => k.clear(), - ScalarPlainEvents::Float(k) => k.clear(), - ScalarPlainEvents::Double(k) => k.clear(), + ScalarPlainEvents::U32(k) => k.clear(), + ScalarPlainEvents::I8(k) => k.clear(), + ScalarPlainEvents::I16(k) => k.clear(), + ScalarPlainEvents::I32(k) => k.clear(), + ScalarPlainEvents::F32(k) => k.clear(), + ScalarPlainEvents::F64(k) => k.clear(), } } } @@ -44,34 +47,39 @@ impl Clearable for ScalarPlainEvents { impl Appendable for ScalarPlainEvents { fn empty_like_self(&self) -> Self { match self { - Self::Byte(k) => Self::Byte(k.empty_like_self()), - Self::Short(k) => Self::Short(k.empty_like_self()), - Self::Int(k) => Self::Int(k.empty_like_self()), - Self::Float(k) => Self::Float(k.empty_like_self()), - Self::Double(k) => Self::Double(k.empty_like_self()), + Self::U32(k) => Self::U32(k.empty_like_self()), + Self::I8(k) => Self::I8(k.empty_like_self()), + Self::I16(k) => Self::I16(k.empty_like_self()), + Self::I32(k) => Self::I32(k.empty_like_self()), + Self::F32(k) => Self::F32(k.empty_like_self()), + Self::F64(k) => Self::F64(k.empty_like_self()), } } fn append(&mut self, src: &Self) { match self { - Self::Byte(k) => match src { - Self::Byte(j) => k.append(j), + Self::U32(k) => match src { + Self::U32(j) => k.append(j), _ => panic!(), }, - Self::Short(k) => match src { - Self::Short(j) => k.append(j), + Self::I8(k) => match src { + Self::I8(j) => k.append(j), _ => panic!(), }, - Self::Int(k) => match src { - Self::Int(j) => k.append(j), + Self::I16(k) => match src { + Self::I16(j) => k.append(j), _ => panic!(), }, - Self::Float(k) => match src { - Self::Float(j) => k.append(j), + Self::I32(k) => match src { + Self::I32(j) => k.append(j), _ => panic!(), }, - Self::Double(k) => match src { - Self::Double(j) => k.append(j), + Self::F32(k) => match src { + Self::F32(j) => k.append(j), + _ => panic!(), + }, + Self::F64(k) => match src { + Self::F64(j) => k.append(j), _ => panic!(), }, } @@ -81,24 +89,28 @@ impl Appendable for ScalarPlainEvents { impl PushableIndex for ScalarPlainEvents { fn push_index(&mut self, src: &Self, ix: usize) { match self { - Self::Byte(k) => match src { - Self::Byte(j) => k.push_index(j, ix), + Self::U32(k) => match src { + Self::U32(j) => k.push_index(j, ix), _ => panic!(), }, - Self::Short(k) => match src { - Self::Short(j) => k.push_index(j, ix), + Self::I8(k) => match src { + Self::I8(j) => k.push_index(j, ix), _ => panic!(), }, - Self::Int(k) => match src { - Self::Int(j) => k.push_index(j, ix), + Self::I16(k) => match src { + Self::I16(j) => k.push_index(j, ix), _ => panic!(), }, - Self::Float(k) => match src { - Self::Float(j) => k.push_index(j, ix), + Self::I32(k) => match src { + Self::I32(j) => k.push_index(j, ix), _ => panic!(), }, - Self::Double(k) => match src { - Self::Double(j) => k.push_index(j, ix), + Self::F32(k) => match src { + Self::F32(j) => k.push_index(j, ix), + _ => panic!(), + }, + Self::F64(k) => match src { + Self::F64(j) => k.push_index(j, ix), _ => panic!(), }, } @@ -109,11 +121,12 @@ impl WithLen for ScalarPlainEvents { fn len(&self) -> usize { use ScalarPlainEvents::*; match self { - Byte(j) => j.len(), - Short(j) => j.len(), - Int(j) => j.len(), - Float(j) => j.len(), - Double(j) => j.len(), + U32(j) => j.len(), + I8(j) => j.len(), + I16(j) => j.len(), + I32(j) => j.len(), + F32(j) => j.len(), + F64(j) => j.len(), } } } @@ -122,11 +135,12 @@ impl WithTimestamps for ScalarPlainEvents { fn ts(&self, ix: usize) -> u64 { use ScalarPlainEvents::*; match self { - Byte(j) => j.ts(ix), - Short(j) => j.ts(ix), - Int(j) => j.ts(ix), - Float(j) => j.ts(ix), - Double(j) => j.ts(ix), + U32(j) => j.ts(ix), + I8(j) => j.ts(ix), + I16(j) => j.ts(ix), + I32(j) => j.ts(ix), + F32(j) => j.ts(ix), + F64(j) => j.ts(ix), } } } @@ -143,32 +157,33 @@ impl HasScalarType for ScalarPlainEvents { fn scalar_type(&self) -> ScalarType { use ScalarPlainEvents::*; match self { - Byte(_) => ScalarType::I8, - Short(_) => ScalarType::I16, - Int(_) => ScalarType::I32, - Float(_) => ScalarType::F32, - Double(_) => ScalarType::F64, + U32(_) => ScalarType::U32, + I8(_) => ScalarType::I8, + I16(_) => ScalarType::I16, + I32(_) => ScalarType::I32, + F32(_) => ScalarType::F32, + F64(_) => ScalarType::F64, } } } #[derive(Debug, Serialize, Deserialize)] pub enum WavePlainEvents { - Byte(WaveEvents), - Short(WaveEvents), - Int(WaveEvents), - Float(WaveEvents), - Double(WaveEvents), + I8(WaveEvents), + I16(WaveEvents), + I32(WaveEvents), + F32(WaveEvents), + F64(WaveEvents), } impl WavePlainEvents { pub fn shape(&self) -> Result { match self { - WavePlainEvents::Byte(k) => k.shape(), - WavePlainEvents::Short(k) => k.shape(), - WavePlainEvents::Int(k) => k.shape(), - WavePlainEvents::Float(k) => k.shape(), - WavePlainEvents::Double(k) => k.shape(), + WavePlainEvents::I8(k) => k.shape(), + WavePlainEvents::I16(k) => k.shape(), + WavePlainEvents::I32(k) => k.shape(), + WavePlainEvents::F32(k) => k.shape(), + WavePlainEvents::F64(k) => k.shape(), } } } @@ -197,11 +212,11 @@ impl WavePlainEvents { pub fn variant_name(&self) -> String { use WavePlainEvents::*; match self { - Byte(h) => format!("Byte({})", h.vals.first().map_or(0, |j| j.len())), - Short(h) => format!("Short({})", h.vals.first().map_or(0, |j| j.len())), - Int(h) => format!("Int({})", h.vals.first().map_or(0, |j| j.len())), - Float(h) => format!("Float({})", h.vals.first().map_or(0, |j| j.len())), - Double(h) => format!("Double({})", h.vals.first().map_or(0, |j| j.len())), + I8(h) => format!("I8({})", h.vals.first().map_or(0, |j| j.len())), + I16(h) => format!("I16({})", h.vals.first().map_or(0, |j| j.len())), + I32(h) => format!("I32({})", h.vals.first().map_or(0, |j| j.len())), + F32(h) => format!("F32({})", h.vals.first().map_or(0, |j| j.len())), + F64(h) => format!("F64({})", h.vals.first().map_or(0, |j| j.len())), } } @@ -209,11 +224,11 @@ impl WavePlainEvents { use WavePlainEvents::*; let shape = self.shape().unwrap(); match self { - Byte(k) => wagg1!(k, ak, shape, Byte), - Short(k) => wagg1!(k, ak, shape, Short), - Int(k) => wagg1!(k, ak, shape, Int), - Float(k) => wagg1!(k, ak, shape, Float), - Double(k) => wagg1!(k, ak, shape, Double), + I8(k) => wagg1!(k, ak, shape, I8), + I16(k) => wagg1!(k, ak, shape, I16), + I32(k) => wagg1!(k, ak, shape, I32), + F32(k) => wagg1!(k, ak, shape, F32), + F64(k) => wagg1!(k, ak, shape, F64), } } } @@ -221,11 +236,11 @@ impl WavePlainEvents { impl Clearable for WavePlainEvents { fn clear(&mut self) { match self { - WavePlainEvents::Byte(k) => k.clear(), - WavePlainEvents::Short(k) => k.clear(), - WavePlainEvents::Int(k) => k.clear(), - WavePlainEvents::Float(k) => k.clear(), - WavePlainEvents::Double(k) => k.clear(), + WavePlainEvents::I8(k) => k.clear(), + WavePlainEvents::I16(k) => k.clear(), + WavePlainEvents::I32(k) => k.clear(), + WavePlainEvents::F32(k) => k.clear(), + WavePlainEvents::F64(k) => k.clear(), } } } @@ -233,34 +248,34 @@ impl Clearable for WavePlainEvents { impl Appendable for WavePlainEvents { fn empty_like_self(&self) -> Self { match self { - Self::Byte(k) => Self::Byte(k.empty_like_self()), - Self::Short(k) => Self::Short(k.empty_like_self()), - Self::Int(k) => Self::Int(k.empty_like_self()), - Self::Float(k) => Self::Float(k.empty_like_self()), - Self::Double(k) => Self::Double(k.empty_like_self()), + Self::I8(k) => Self::I8(k.empty_like_self()), + Self::I16(k) => Self::I16(k.empty_like_self()), + Self::I32(k) => Self::I32(k.empty_like_self()), + Self::F32(k) => Self::F32(k.empty_like_self()), + Self::F64(k) => Self::F64(k.empty_like_self()), } } fn append(&mut self, src: &Self) { match self { - Self::Byte(k) => match src { - Self::Byte(j) => k.append(j), + Self::I8(k) => match src { + Self::I8(j) => k.append(j), _ => panic!(), }, - Self::Short(k) => match src { - Self::Short(j) => k.append(j), + Self::I16(k) => match src { + Self::I16(j) => k.append(j), _ => panic!(), }, - Self::Int(k) => match src { - Self::Int(j) => k.append(j), + Self::I32(k) => match src { + Self::I32(j) => k.append(j), _ => panic!(), }, - Self::Float(k) => match src { - Self::Float(j) => k.append(j), + Self::F32(k) => match src { + Self::F32(j) => k.append(j), _ => panic!(), }, - Self::Double(k) => match src { - Self::Double(j) => k.append(j), + Self::F64(k) => match src { + Self::F64(j) => k.append(j), _ => panic!(), }, } @@ -270,24 +285,24 @@ impl Appendable for WavePlainEvents { impl PushableIndex for WavePlainEvents { fn push_index(&mut self, src: &Self, ix: usize) { match self { - Self::Byte(k) => match src { - Self::Byte(j) => k.push_index(j, ix), + Self::I8(k) => match src { + Self::I8(j) => k.push_index(j, ix), _ => panic!(), }, - Self::Short(k) => match src { - Self::Short(j) => k.push_index(j, ix), + Self::I16(k) => match src { + Self::I16(j) => k.push_index(j, ix), _ => panic!(), }, - Self::Int(k) => match src { - Self::Int(j) => k.push_index(j, ix), + Self::I32(k) => match src { + Self::I32(j) => k.push_index(j, ix), _ => panic!(), }, - Self::Float(k) => match src { - Self::Float(j) => k.push_index(j, ix), + Self::F32(k) => match src { + Self::F32(j) => k.push_index(j, ix), _ => panic!(), }, - Self::Double(k) => match src { - Self::Double(j) => k.push_index(j, ix), + Self::F64(k) => match src { + Self::F64(j) => k.push_index(j, ix), _ => panic!(), }, } @@ -298,11 +313,11 @@ impl WithLen for WavePlainEvents { fn len(&self) -> usize { use WavePlainEvents::*; match self { - Byte(j) => j.len(), - Short(j) => j.len(), - Int(j) => j.len(), - Float(j) => j.len(), - Double(j) => j.len(), + I8(j) => j.len(), + I16(j) => j.len(), + I32(j) => j.len(), + F32(j) => j.len(), + F64(j) => j.len(), } } } @@ -311,11 +326,11 @@ impl WithTimestamps for WavePlainEvents { fn ts(&self, ix: usize) -> u64 { use WavePlainEvents::*; match self { - Byte(j) => j.ts(ix), - Short(j) => j.ts(ix), - Int(j) => j.ts(ix), - Float(j) => j.ts(ix), - Double(j) => j.ts(ix), + I8(j) => j.ts(ix), + I16(j) => j.ts(ix), + I32(j) => j.ts(ix), + F32(j) => j.ts(ix), + F64(j) => j.ts(ix), } } } @@ -325,8 +340,8 @@ impl HasShape for WavePlainEvents { /*use WavePlainEvents::*; match self { Byte(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)), - Short(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)), - Int(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)), + I16(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)), + I32(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)), Float(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)), Double(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)), }*/ @@ -338,11 +353,11 @@ impl HasScalarType for WavePlainEvents { fn scalar_type(&self) -> ScalarType { use WavePlainEvents::*; match self { - Byte(_) => ScalarType::I8, - Short(_) => ScalarType::I16, - Int(_) => ScalarType::I32, - Float(_) => ScalarType::F32, - Double(_) => ScalarType::F64, + I8(_) => ScalarType::I8, + I16(_) => ScalarType::I16, + I32(_) => ScalarType::I32, + F32(_) => ScalarType::F32, + F64(_) => ScalarType::F64, } } }