From 002139bfae89c3db77266e95c49f35f1b7c162cf Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 27 Jul 2021 16:17:35 +0200 Subject: [PATCH] Frame the archapp variants in compatible way --- archapp/src/events.rs | 154 +++++++++++++++++++++++++++++++++++------- archapp/src/lib.rs | 103 ++++++++++++++++++++++++++-- archapp/src/parse.rs | 57 +++++++++------- archapp/src/test.rs | 2 +- 4 files changed, 261 insertions(+), 55 deletions(-) diff --git a/archapp/src/events.rs b/archapp/src/events.rs index 78b85e3..8e058a8 100644 --- a/archapp/src/events.rs +++ b/archapp/src/events.rs @@ -1,5 +1,7 @@ use crate::parse::PbFileReader; -use crate::{EventsItem, PlainEvents, ScalarPlainEvents, WavePlainEvents, XBinnedEvents}; +use crate::{ + EventsItem, MultiBinWaveEvents, PlainEvents, ScalarPlainEvents, SingleBinWaveEvents, WavePlainEvents, XBinnedEvents, +}; use chrono::{TimeZone, Utc}; use err::Error; use futures_core::Stream; @@ -8,8 +10,9 @@ use items::eventvalues::EventValues; use items::waveevents::{WaveEvents, WaveXBinner}; use items::xbinnedscalarevents::XBinnedScalarEvents; use items::xbinnedwaveevents::XBinnedWaveEvents; -use items::RangeCompletableItem::RangeComplete; -use items::{EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, SitemtyFrameType, StreamItem}; +use items::{ + EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, SitemtyFrameType, StreamItem, WithLen, WithTimestamps, +}; use netpod::log::*; use netpod::query::RawEventsQuery; use netpod::timeunits::{DAY, SEC}; @@ -172,14 +175,7 @@ impl FrameMaker { where T: SitemtyFrameType + Serialize + Send + 'static, { - match item { - Ok(_) => err::todoval(), - Err(e) => { - //let t = Ok(StreamItem::DataItem(RangeCompletableItem::Data())) - let t: Sitemty = Err(e); - Box::new(t) - } - } + err::todoval() } } @@ -220,18 +216,104 @@ macro_rules! events_item_to_sitemty { }}; } +macro_rules! arm2 { + ($item:expr, $t1:ident, $t2:ident, $t3:ident, $t4:ident, $t5:ident, $sty1:ident, $sty2:ident) => {{ + type T1 = $t1<$sty1>; + let ret: Sitemty = match $item { + Ok(k) => match k { + StreamItem::DataItem(k) => match k { + RangeCompletableItem::RangeComplete => { + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) + } + RangeCompletableItem::Data(k) => match k { + EventsItem::$t2(k) => match k { + $t3::$t4(k) => match k { + $t5::$sty2(k) => { + // + Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) + } + _ => panic!(), + }, + _ => panic!(), + }, + _ => err::todoval(), + }, + }, + StreamItem::Log(k) => Ok(StreamItem::Log(k)), + StreamItem::Stats(k) => Ok(StreamItem::Stats(k)), + }, + Err(e) => Err(e), + }; + Box::new(ret) as Box + }}; +} + macro_rules! arm1 { - ($item:expr, $sty:ident, $shape:expr, $ak:expr) => {{ + ($item:expr, $sty1:ident, $sty2:ident, $shape:expr, $ak:expr) => {{ match $shape { Shape::Scalar => match $ak { - AggKind::Plain => Self::make_frame_gen::>($item), - AggKind::DimXBins1 => Self::make_frame_gen::>($item), - AggKind::DimXBinsN(_) => Self::make_frame_gen::>($item), + AggKind::Plain => arm2!( + $item, + EventValues, + Plain, + PlainEvents, + Scalar, + ScalarPlainEvents, + $sty1, + $sty2 + ), + AggKind::DimXBins1 => arm2!( + $item, + EventValues, + XBinnedEvents, + XBinnedEvents, + Scalar, + ScalarPlainEvents, + $sty1, + $sty2 + ), + AggKind::DimXBinsN(_) => arm2!( + $item, + EventValues, + XBinnedEvents, + XBinnedEvents, + Scalar, + ScalarPlainEvents, + $sty1, + $sty2 + ), }, Shape::Wave(_) => match $ak { - AggKind::Plain => Self::make_frame_gen::>($item), - AggKind::DimXBins1 => Self::make_frame_gen::>($item), - AggKind::DimXBinsN(_) => Self::make_frame_gen::>($item), + AggKind::Plain => arm2!( + $item, + WaveEvents, + Plain, + PlainEvents, + Wave, + WavePlainEvents, + $sty1, + $sty2 + ), + AggKind::DimXBins1 => arm2!( + $item, + XBinnedScalarEvents, + XBinnedEvents, + XBinnedEvents, + SingleBinWave, + SingleBinWaveEvents, + $sty1, + $sty2 + ), + AggKind::DimXBinsN(_) => arm2!( + $item, + XBinnedWaveEvents, + XBinnedEvents, + XBinnedEvents, + MultiBinWave, + MultiBinWaveEvents, + $sty1, + $sty2 + ), }, } }}; @@ -244,11 +326,11 @@ impl FrameMakerTrait for FrameMaker { // Therefore, I need to decide that based on given parameters. // see also channel_info in this mod. match self.scalar_type { - ScalarType::I8 => arm1!(item, i8, self.shape, self.agg_kind), - ScalarType::I16 => arm1!(item, i16, self.shape, self.agg_kind), - ScalarType::I32 => arm1!(item, i32, self.shape, self.agg_kind), - ScalarType::F32 => arm1!(item, f32, self.shape, self.agg_kind), - ScalarType::F64 => arm1!(item, f64, self.shape, self.agg_kind), + ScalarType::I8 => arm1!(item, i8, Byte, self.shape, self.agg_kind), + ScalarType::I16 => arm1!(item, i16, Short, self.shape, self.agg_kind), + ScalarType::I32 => arm1!(item, i32, Int, self.shape, self.agg_kind), + ScalarType::F32 => arm1!(item, f32, Float, self.shape, self.agg_kind), + ScalarType::F64 => arm1!(item, f64, Double, self.shape, self.agg_kind), _ => err::todoval(), } } @@ -297,7 +379,16 @@ pub async fn make_single_event_pipe( // TODO first collect all matching filenames, then sort, then open files. // TODO if dir does not exist, should notify client but not log as error. - let mut rd = tokio::fs::read_dir(&dir).await?; + let mut rd = match tokio::fs::read_dir(&dir).await { + Ok(k) => k, + Err(e) => match e.kind() { + std::io::ErrorKind::NotFound => { + warn!("does not exist: {:?}", dir); + return Ok(()); + } + _ => return Err(e)?, + }, + }; while let Some(de) = rd.next_entry().await? { let s = de.file_name().to_string_lossy().into_owned(); if s.starts_with(&prefix) && s.ends_with(".pb") { @@ -315,13 +406,24 @@ pub async fn make_single_event_pipe( let mut pbr = PbFileReader::new(f1).await; pbr.read_header().await?; info!("✓ read header {:?}", pbr.payload_type()); - loop { + let mut i1 = 0; + 'evread: loop { match pbr.read_msg().await { Ok(ei) => { - info!("read msg from file"); + let tslast = if ei.len() > 0 { Some(ei.ts(ei.len() - 1)) } else { None }; + i1 += 1; + if i1 % 1000 == 0 { + info!("read msg from file {}", i1); + } let ei2 = ei.x_aggregate(&evq.agg_kind); let g = Ok(StreamItem::DataItem(RangeCompletableItem::Data(ei2))); tx.send(g).await?; + if let Some(t) = tslast { + if t >= evq.range.end { + info!("after requested range, break"); + break 'evread; + } + } } Err(e) => { error!("error while reading msg {:?}", e); diff --git a/archapp/src/lib.rs b/archapp/src/lib.rs index e12dec0..5bc8c05 100644 --- a/archapp/src/lib.rs +++ b/archapp/src/lib.rs @@ -12,6 +12,7 @@ use items::eventvalues::EventValues; use items::numops::NumOps; use items::waveevents::{WaveEvents, WaveXBinner}; use items::xbinnedscalarevents::XBinnedScalarEvents; +use items::xbinnedwaveevents::XBinnedWaveEvents; use items::{EventsNodeProcessor, Framable, SitemtyFrameType, WithLen, WithTimestamps}; use netpod::{AggKind, HasScalarType, HasShape, ScalarType, Shape}; #[cfg(not(feature = "devread"))] @@ -22,8 +23,8 @@ pub mod events; #[cfg(test)] pub mod test; -fn unescape_archapp_msg(inp: &[u8]) -> Result, Error> { - let mut ret = Vec::with_capacity(inp.len() * 5 / 4); +fn unescape_archapp_msg(inp: &[u8], mut ret: Vec) -> Result, Error> { + ret.clear(); let mut esc = false; for &k in inp.iter() { if k == 0x1b { @@ -36,7 +37,7 @@ fn unescape_archapp_msg(inp: &[u8]) -> Result, Error> { } else if k == 0x3 { ret.push(0xd); } else { - return Err(Error::with_msg("malformed escaped archapp message")); + return Err(Error::with_msg_no_trace("malformed escaped archapp message")); } esc = false; } else { @@ -227,6 +228,92 @@ impl HasScalarType for WavePlainEvents { } } +#[derive(Debug)] +pub enum MultiBinWaveEvents { + Byte(XBinnedWaveEvents), + Short(XBinnedWaveEvents), + Int(XBinnedWaveEvents), + Float(XBinnedWaveEvents), + Double(XBinnedWaveEvents), +} + +impl MultiBinWaveEvents { + pub fn variant_name(&self) -> String { + use MultiBinWaveEvents::*; + match self { + Byte(h) => format!("Byte"), + Short(h) => format!("Short"), + Int(h) => format!("Int"), + Float(h) => format!("Float"), + Double(h) => format!("Double"), + } + } + + fn x_aggregate(self, ak: &AggKind) -> EventsItem { + use MultiBinWaveEvents::*; + match self { + Byte(k) => match ak { + AggKind::Plain => EventsItem::XBinnedEvents(XBinnedEvents::MultiBinWave(MultiBinWaveEvents::Byte(k))), + AggKind::DimXBins1 => err::todoval(), + AggKind::DimXBinsN(_) => EventsItem::Plain(PlainEvents::Wave(err::todoval())), + }, + _ => err::todoval(), + } + } +} + +impl WithLen for MultiBinWaveEvents { + fn len(&self) -> usize { + use MultiBinWaveEvents::*; + match self { + Byte(j) => j.len(), + Short(j) => j.len(), + Int(j) => j.len(), + Float(j) => j.len(), + Double(j) => j.len(), + } + } +} + +impl WithTimestamps for MultiBinWaveEvents { + fn ts(&self, ix: usize) -> u64 { + use MultiBinWaveEvents::*; + match self { + Byte(j) => j.ts(ix), + Short(j) => j.ts(ix), + Int(j) => j.ts(ix), + Float(j) => j.ts(ix), + Double(j) => j.ts(ix), + } + } +} + +impl HasShape for MultiBinWaveEvents { + fn shape(&self) -> Shape { + use MultiBinWaveEvents::*; + match self { + Byte(h) => Shape::Scalar, + Short(h) => Shape::Scalar, + Int(h) => Shape::Scalar, + Float(h) => Shape::Scalar, + Double(h) => Shape::Scalar, + } + } +} + +impl HasScalarType for MultiBinWaveEvents { + fn scalar_type(&self) -> ScalarType { + use MultiBinWaveEvents::*; + match self { + Byte(h) => ScalarType::I8, + Short(h) => ScalarType::I16, + Int(h) => ScalarType::I32, + Float(h) => ScalarType::F32, + Double(h) => ScalarType::F64, + } + } +} + #[derive(Debug)] pub enum SingleBinWaveEvents { Byte(XBinnedScalarEvents), @@ -256,7 +343,7 @@ impl SingleBinWaveEvents { AggKind::DimXBins1 => err::todoval(), AggKind::DimXBinsN(_) => EventsItem::Plain(PlainEvents::Wave(err::todoval())), }, - _ => panic!(), + _ => err::todoval(), } } } @@ -317,7 +404,7 @@ impl HasScalarType for SingleBinWaveEvents { pub enum XBinnedEvents { Scalar(ScalarPlainEvents), SingleBinWave(SingleBinWaveEvents), - //MultiBinWave, + MultiBinWave(MultiBinWaveEvents), } impl XBinnedEvents { @@ -326,6 +413,7 @@ impl XBinnedEvents { match self { Scalar(h) => format!("Scalar({})", h.variant_name()), SingleBinWave(h) => format!("SingleBinWave({})", h.variant_name()), + MultiBinWave(h) => format!("MultiBinWave({})", h.variant_name()), } } @@ -334,6 +422,7 @@ impl XBinnedEvents { match self { Scalar(k) => EventsItem::Plain(PlainEvents::Scalar(k)), SingleBinWave(k) => k.x_aggregate(ak), + MultiBinWave(k) => k.x_aggregate(ak), } } } @@ -344,6 +433,7 @@ impl WithLen for XBinnedEvents { match self { Scalar(j) => j.len(), SingleBinWave(j) => j.len(), + MultiBinWave(j) => j.len(), } } } @@ -354,6 +444,7 @@ impl WithTimestamps for XBinnedEvents { match self { Scalar(j) => j.ts(ix), SingleBinWave(j) => j.ts(ix), + MultiBinWave(j) => j.ts(ix), } } } @@ -364,6 +455,7 @@ impl HasShape for XBinnedEvents { match self { Scalar(h) => h.shape(), SingleBinWave(h) => h.shape(), + MultiBinWave(h) => h.shape(), } } } @@ -374,6 +466,7 @@ impl HasScalarType for XBinnedEvents { match self { Scalar(h) => h.scalar_type(), SingleBinWave(h) => h.scalar_type(), + MultiBinWave(h) => h.scalar_type(), } } } diff --git a/archapp/src/parse.rs b/archapp/src/parse.rs index aa736ac..41b7a88 100644 --- a/archapp/src/parse.rs +++ b/archapp/src/parse.rs @@ -14,6 +14,7 @@ use serde::Serialize; use serde_json::Value as JsonValue; use std::collections::{BTreeMap, VecDeque}; use std::fs::FileType; +use std::mem; use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; @@ -23,6 +24,7 @@ use tokio::io::AsyncReadExt; pub struct PbFileReader { file: File, buf: Vec, + escbuf: Vec, wp: usize, rp: usize, channel_name: String, @@ -81,13 +83,14 @@ macro_rules! wave_parse { }}; } -const MIN_BUF_FILL: usize = 1024 * 16; +const MIN_BUF_FILL: usize = 1024 * 64; impl PbFileReader { pub async fn new(file: File) -> Self { Self { file, buf: vec![0; MIN_BUF_FILL * 4], + escbuf: vec![], wp: 0, rp: 0, channel_name: String::new(), @@ -100,8 +103,9 @@ impl PbFileReader { self.fill_buf().await?; let k = self.find_next_nl()?; let buf = &mut self.buf; - let m = unescape_archapp_msg(&buf[self.rp..k])?; - let payload_info = crate::generated::EPICSEvent::PayloadInfo::parse_from_bytes(&m) + let m = unescape_archapp_msg(&buf[self.rp..k], mem::replace(&mut self.escbuf, vec![]))?; + self.escbuf = m; + let payload_info = crate::generated::EPICSEvent::PayloadInfo::parse_from_bytes(&self.escbuf) .map_err(|_| Error::with_msg("can not parse PayloadInfo"))?; self.channel_name = payload_info.get_pvname().into(); self.payload_type = payload_info.get_field_type(); @@ -114,45 +118,51 @@ impl PbFileReader { self.fill_buf().await?; let k = self.find_next_nl()?; let buf = &mut self.buf; - let m = unescape_archapp_msg(&buf[self.rp..k])?; + let m = mem::replace(&mut self.escbuf, vec![]); + let m = unescape_archapp_msg(&buf[self.rp..k], m)?; + self.escbuf = m; + let m = &self.escbuf; use PayloadType::*; let ei = match self.payload_type { - SCALAR_BYTE => parse_scalar_byte(&m, self.year)?, + SCALAR_BYTE => parse_scalar_byte(m, self.year)?, SCALAR_ENUM => { - scalar_parse!(&m, self.year, ScalarEnum, Int, i32) + scalar_parse!(m, self.year, ScalarEnum, Int, i32) } SCALAR_SHORT => { - scalar_parse!(&m, self.year, ScalarShort, Short, i16) + scalar_parse!(m, self.year, ScalarShort, Short, i16) } SCALAR_INT => { - scalar_parse!(&m, self.year, ScalarInt, Int, i32) + scalar_parse!(m, self.year, ScalarInt, Int, i32) } SCALAR_FLOAT => { - scalar_parse!(&m, self.year, ScalarFloat, Float, f32) + scalar_parse!(m, self.year, ScalarFloat, Float, f32) } SCALAR_DOUBLE => { - scalar_parse!(&m, self.year, ScalarDouble, Double, f64) + scalar_parse!(m, self.year, ScalarDouble, Double, f64) } WAVEFORM_BYTE => { - wave_parse!(&m, self.year, VectorChar, Byte, i8) + wave_parse!(m, self.year, VectorChar, Byte, i8) } WAVEFORM_SHORT => { - wave_parse!(&m, self.year, VectorShort, Short, i16) + wave_parse!(m, self.year, VectorShort, Short, i16) } WAVEFORM_ENUM => { - wave_parse!(&m, self.year, VectorEnum, Int, i32) + wave_parse!(m, self.year, VectorEnum, Int, i32) } WAVEFORM_INT => { - wave_parse!(&m, self.year, VectorInt, Int, i32) + wave_parse!(m, self.year, VectorInt, Int, i32) } WAVEFORM_FLOAT => { - wave_parse!(&m, self.year, VectorFloat, Float, f32) + wave_parse!(m, self.year, VectorFloat, Float, f32) } WAVEFORM_DOUBLE => { - wave_parse!(&m, self.year, VectorDouble, Double, f64) + wave_parse!(m, self.year, VectorDouble, Double, f64) } SCALAR_STRING | WAVEFORM_STRING | V4_GENERIC_BYTES => { - return Err(Error::with_msg(format!("not supported: {:?}", self.payload_type))); + return Err(Error::with_msg_no_trace(format!( + "not supported: {:?}", + self.payload_type + ))); } }; self.rp = k + 1; @@ -163,11 +173,12 @@ impl PbFileReader { if self.wp - self.rp >= MIN_BUF_FILL { return Ok(()); } - if self.rp >= self.buf.len() - MIN_BUF_FILL { + if self.rp + MIN_BUF_FILL >= self.buf.len() { let n = self.wp - self.rp; - for i in 0..n { - self.buf[i] = self.buf[self.rp + i]; - } + self.buf.copy_within(self.rp..self.rp + n, 0); + //for i in 0..n { + // self.buf[i] = self.buf[self.rp + i]; + //} self.rp = 0; self.wp = n; } @@ -219,7 +230,7 @@ pub struct EpicsEventPayloadInfo { } // TODO remove in favor of PbFileRead -async fn read_pb_file(mut f1: File) -> Result<(EpicsEventPayloadInfo, File), Error> { +async fn _read_pb_file(mut f1: File) -> Result<(EpicsEventPayloadInfo, File), Error> { let mut buf = vec![0; 1024 * 4]; { let mut i1 = 0; @@ -254,7 +265,7 @@ async fn read_pb_file(mut f1: File) -> Result<(EpicsEventPayloadInfo, File), Err } if i2 != usize::MAX { //info!("got NL {} .. {}", j1, i2); - let m = unescape_archapp_msg(&buf[j1..i2])?; + let m = unescape_archapp_msg(&buf[j1..i2], vec![])?; if j1 == 0 { payload_info = crate::generated::EPICSEvent::PayloadInfo::parse_from_bytes(&m) .map_err(|_| Error::with_msg("can not parse PayloadInfo"))?; diff --git a/archapp/src/test.rs b/archapp/src/test.rs index 69c3b03..023712c 100644 --- a/archapp/src/test.rs +++ b/archapp/src/test.rs @@ -24,7 +24,7 @@ fn read_pb_00() -> Result<(), Error> { } if i2 != usize::MAX { info!("got NL {} .. {}", j1, i2); - let m = unescape_archapp_msg(&f1[j1..i2])?; + let m = unescape_archapp_msg(&f1[j1..i2], vec![])?; if j1 == 0 { let payload_info = crate::generated::EPICSEvent::PayloadInfo::parse_from_bytes(&m).unwrap(); info!("got payload_info: {:?}", payload_info);