Frame the archapp variants in compatible way

This commit is contained in:
Dominik Werder
2021-07-27 16:17:35 +02:00
parent 28db398714
commit 002139bfae
4 changed files with 261 additions and 55 deletions

View File

@@ -1,5 +1,7 @@
use crate::parse::PbFileReader;
use crate::{EventsItem, PlainEvents, ScalarPlainEvents, WavePlainEvents, XBinnedEvents};
use crate::{
EventsItem, MultiBinWaveEvents, PlainEvents, ScalarPlainEvents, SingleBinWaveEvents, WavePlainEvents, XBinnedEvents,
};
use chrono::{TimeZone, Utc};
use err::Error;
use futures_core::Stream;
@@ -8,8 +10,9 @@ use items::eventvalues::EventValues;
use items::waveevents::{WaveEvents, WaveXBinner};
use items::xbinnedscalarevents::XBinnedScalarEvents;
use items::xbinnedwaveevents::XBinnedWaveEvents;
use items::RangeCompletableItem::RangeComplete;
use items::{EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, SitemtyFrameType, StreamItem};
use items::{
EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, SitemtyFrameType, StreamItem, WithLen, WithTimestamps,
};
use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::timeunits::{DAY, SEC};
@@ -172,14 +175,7 @@ impl FrameMaker {
where
T: SitemtyFrameType + Serialize + Send + 'static,
{
match item {
Ok(_) => err::todoval(),
Err(e) => {
//let t = Ok(StreamItem::DataItem(RangeCompletableItem::Data()))
let t: Sitemty<T> = Err(e);
Box::new(t)
}
}
err::todoval()
}
}
@@ -220,18 +216,104 @@ macro_rules! events_item_to_sitemty {
}};
}
macro_rules! arm2 {
($item:expr, $t1:ident, $t2:ident, $t3:ident, $t4:ident, $t5:ident, $sty1:ident, $sty2:ident) => {{
type T1 = $t1<$sty1>;
let ret: Sitemty<T1> = match $item {
Ok(k) => match k {
StreamItem::DataItem(k) => match k {
RangeCompletableItem::RangeComplete => {
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
}
RangeCompletableItem::Data(k) => match k {
EventsItem::$t2(k) => match k {
$t3::$t4(k) => match k {
$t5::$sty2(k) => {
//
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
}
_ => panic!(),
},
_ => panic!(),
},
_ => err::todoval(),
},
},
StreamItem::Log(k) => Ok(StreamItem::Log(k)),
StreamItem::Stats(k) => Ok(StreamItem::Stats(k)),
},
Err(e) => Err(e),
};
Box::new(ret) as Box<dyn Framable>
}};
}
macro_rules! arm1 {
($item:expr, $sty:ident, $shape:expr, $ak:expr) => {{
($item:expr, $sty1:ident, $sty2:ident, $shape:expr, $ak:expr) => {{
match $shape {
Shape::Scalar => match $ak {
AggKind::Plain => Self::make_frame_gen::<EventValues<$sty>>($item),
AggKind::DimXBins1 => Self::make_frame_gen::<EventValues<$sty>>($item),
AggKind::DimXBinsN(_) => Self::make_frame_gen::<EventValues<$sty>>($item),
AggKind::Plain => arm2!(
$item,
EventValues,
Plain,
PlainEvents,
Scalar,
ScalarPlainEvents,
$sty1,
$sty2
),
AggKind::DimXBins1 => arm2!(
$item,
EventValues,
XBinnedEvents,
XBinnedEvents,
Scalar,
ScalarPlainEvents,
$sty1,
$sty2
),
AggKind::DimXBinsN(_) => arm2!(
$item,
EventValues,
XBinnedEvents,
XBinnedEvents,
Scalar,
ScalarPlainEvents,
$sty1,
$sty2
),
},
Shape::Wave(_) => match $ak {
AggKind::Plain => Self::make_frame_gen::<WaveEvents<$sty>>($item),
AggKind::DimXBins1 => Self::make_frame_gen::<XBinnedScalarEvents<$sty>>($item),
AggKind::DimXBinsN(_) => Self::make_frame_gen::<XBinnedWaveEvents<$sty>>($item),
AggKind::Plain => arm2!(
$item,
WaveEvents,
Plain,
PlainEvents,
Wave,
WavePlainEvents,
$sty1,
$sty2
),
AggKind::DimXBins1 => arm2!(
$item,
XBinnedScalarEvents,
XBinnedEvents,
XBinnedEvents,
SingleBinWave,
SingleBinWaveEvents,
$sty1,
$sty2
),
AggKind::DimXBinsN(_) => arm2!(
$item,
XBinnedWaveEvents,
XBinnedEvents,
XBinnedEvents,
MultiBinWave,
MultiBinWaveEvents,
$sty1,
$sty2
),
},
}
}};
@@ -244,11 +326,11 @@ impl FrameMakerTrait for FrameMaker {
// Therefore, I need to decide that based on given parameters.
// see also channel_info in this mod.
match self.scalar_type {
ScalarType::I8 => arm1!(item, i8, self.shape, self.agg_kind),
ScalarType::I16 => arm1!(item, i16, self.shape, self.agg_kind),
ScalarType::I32 => arm1!(item, i32, self.shape, self.agg_kind),
ScalarType::F32 => arm1!(item, f32, self.shape, self.agg_kind),
ScalarType::F64 => arm1!(item, f64, self.shape, self.agg_kind),
ScalarType::I8 => arm1!(item, i8, Byte, self.shape, self.agg_kind),
ScalarType::I16 => arm1!(item, i16, Short, self.shape, self.agg_kind),
ScalarType::I32 => arm1!(item, i32, Int, self.shape, self.agg_kind),
ScalarType::F32 => arm1!(item, f32, Float, self.shape, self.agg_kind),
ScalarType::F64 => arm1!(item, f64, Double, self.shape, self.agg_kind),
_ => err::todoval(),
}
}
@@ -297,7 +379,16 @@ pub async fn make_single_event_pipe(
// TODO first collect all matching filenames, then sort, then open files.
// TODO if dir does not exist, should notify client but not log as error.
let mut rd = tokio::fs::read_dir(&dir).await?;
let mut rd = match tokio::fs::read_dir(&dir).await {
Ok(k) => k,
Err(e) => match e.kind() {
std::io::ErrorKind::NotFound => {
warn!("does not exist: {:?}", dir);
return Ok(());
}
_ => return Err(e)?,
},
};
while let Some(de) = rd.next_entry().await? {
let s = de.file_name().to_string_lossy().into_owned();
if s.starts_with(&prefix) && s.ends_with(".pb") {
@@ -315,13 +406,24 @@ pub async fn make_single_event_pipe(
let mut pbr = PbFileReader::new(f1).await;
pbr.read_header().await?;
info!("✓ read header {:?}", pbr.payload_type());
loop {
let mut i1 = 0;
'evread: loop {
match pbr.read_msg().await {
Ok(ei) => {
info!("read msg from file");
let tslast = if ei.len() > 0 { Some(ei.ts(ei.len() - 1)) } else { None };
i1 += 1;
if i1 % 1000 == 0 {
info!("read msg from file {}", i1);
}
let ei2 = ei.x_aggregate(&evq.agg_kind);
let g = Ok(StreamItem::DataItem(RangeCompletableItem::Data(ei2)));
tx.send(g).await?;
if let Some(t) = tslast {
if t >= evq.range.end {
info!("after requested range, break");
break 'evread;
}
}
}
Err(e) => {
error!("error while reading msg {:?}", e);

View File

@@ -12,6 +12,7 @@ use items::eventvalues::EventValues;
use items::numops::NumOps;
use items::waveevents::{WaveEvents, WaveXBinner};
use items::xbinnedscalarevents::XBinnedScalarEvents;
use items::xbinnedwaveevents::XBinnedWaveEvents;
use items::{EventsNodeProcessor, Framable, SitemtyFrameType, WithLen, WithTimestamps};
use netpod::{AggKind, HasScalarType, HasShape, ScalarType, Shape};
#[cfg(not(feature = "devread"))]
@@ -22,8 +23,8 @@ pub mod events;
#[cfg(test)]
pub mod test;
fn unescape_archapp_msg(inp: &[u8]) -> Result<Vec<u8>, Error> {
let mut ret = Vec::with_capacity(inp.len() * 5 / 4);
fn unescape_archapp_msg(inp: &[u8], mut ret: Vec<u8>) -> Result<Vec<u8>, Error> {
ret.clear();
let mut esc = false;
for &k in inp.iter() {
if k == 0x1b {
@@ -36,7 +37,7 @@ fn unescape_archapp_msg(inp: &[u8]) -> Result<Vec<u8>, Error> {
} else if k == 0x3 {
ret.push(0xd);
} else {
return Err(Error::with_msg("malformed escaped archapp message"));
return Err(Error::with_msg_no_trace("malformed escaped archapp message"));
}
esc = false;
} else {
@@ -227,6 +228,92 @@ impl HasScalarType for WavePlainEvents {
}
}
#[derive(Debug)]
pub enum MultiBinWaveEvents {
Byte(XBinnedWaveEvents<i8>),
Short(XBinnedWaveEvents<i16>),
Int(XBinnedWaveEvents<i32>),
Float(XBinnedWaveEvents<f32>),
Double(XBinnedWaveEvents<f64>),
}
impl MultiBinWaveEvents {
pub fn variant_name(&self) -> String {
use MultiBinWaveEvents::*;
match self {
Byte(h) => format!("Byte"),
Short(h) => format!("Short"),
Int(h) => format!("Int"),
Float(h) => format!("Float"),
Double(h) => format!("Double"),
}
}
fn x_aggregate(self, ak: &AggKind) -> EventsItem {
use MultiBinWaveEvents::*;
match self {
Byte(k) => match ak {
AggKind::Plain => EventsItem::XBinnedEvents(XBinnedEvents::MultiBinWave(MultiBinWaveEvents::Byte(k))),
AggKind::DimXBins1 => err::todoval(),
AggKind::DimXBinsN(_) => EventsItem::Plain(PlainEvents::Wave(err::todoval())),
},
_ => err::todoval(),
}
}
}
impl WithLen for MultiBinWaveEvents {
fn len(&self) -> usize {
use MultiBinWaveEvents::*;
match self {
Byte(j) => j.len(),
Short(j) => j.len(),
Int(j) => j.len(),
Float(j) => j.len(),
Double(j) => j.len(),
}
}
}
impl WithTimestamps for MultiBinWaveEvents {
fn ts(&self, ix: usize) -> u64 {
use MultiBinWaveEvents::*;
match self {
Byte(j) => j.ts(ix),
Short(j) => j.ts(ix),
Int(j) => j.ts(ix),
Float(j) => j.ts(ix),
Double(j) => j.ts(ix),
}
}
}
impl HasShape for MultiBinWaveEvents {
fn shape(&self) -> Shape {
use MultiBinWaveEvents::*;
match self {
Byte(h) => Shape::Scalar,
Short(h) => Shape::Scalar,
Int(h) => Shape::Scalar,
Float(h) => Shape::Scalar,
Double(h) => Shape::Scalar,
}
}
}
impl HasScalarType for MultiBinWaveEvents {
fn scalar_type(&self) -> ScalarType {
use MultiBinWaveEvents::*;
match self {
Byte(h) => ScalarType::I8,
Short(h) => ScalarType::I16,
Int(h) => ScalarType::I32,
Float(h) => ScalarType::F32,
Double(h) => ScalarType::F64,
}
}
}
#[derive(Debug)]
pub enum SingleBinWaveEvents {
Byte(XBinnedScalarEvents<i8>),
@@ -256,7 +343,7 @@ impl SingleBinWaveEvents {
AggKind::DimXBins1 => err::todoval(),
AggKind::DimXBinsN(_) => EventsItem::Plain(PlainEvents::Wave(err::todoval())),
},
_ => panic!(),
_ => err::todoval(),
}
}
}
@@ -317,7 +404,7 @@ impl HasScalarType for SingleBinWaveEvents {
pub enum XBinnedEvents {
Scalar(ScalarPlainEvents),
SingleBinWave(SingleBinWaveEvents),
//MultiBinWave,
MultiBinWave(MultiBinWaveEvents),
}
impl XBinnedEvents {
@@ -326,6 +413,7 @@ impl XBinnedEvents {
match self {
Scalar(h) => format!("Scalar({})", h.variant_name()),
SingleBinWave(h) => format!("SingleBinWave({})", h.variant_name()),
MultiBinWave(h) => format!("MultiBinWave({})", h.variant_name()),
}
}
@@ -334,6 +422,7 @@ impl XBinnedEvents {
match self {
Scalar(k) => EventsItem::Plain(PlainEvents::Scalar(k)),
SingleBinWave(k) => k.x_aggregate(ak),
MultiBinWave(k) => k.x_aggregate(ak),
}
}
}
@@ -344,6 +433,7 @@ impl WithLen for XBinnedEvents {
match self {
Scalar(j) => j.len(),
SingleBinWave(j) => j.len(),
MultiBinWave(j) => j.len(),
}
}
}
@@ -354,6 +444,7 @@ impl WithTimestamps for XBinnedEvents {
match self {
Scalar(j) => j.ts(ix),
SingleBinWave(j) => j.ts(ix),
MultiBinWave(j) => j.ts(ix),
}
}
}
@@ -364,6 +455,7 @@ impl HasShape for XBinnedEvents {
match self {
Scalar(h) => h.shape(),
SingleBinWave(h) => h.shape(),
MultiBinWave(h) => h.shape(),
}
}
}
@@ -374,6 +466,7 @@ impl HasScalarType for XBinnedEvents {
match self {
Scalar(h) => h.scalar_type(),
SingleBinWave(h) => h.scalar_type(),
MultiBinWave(h) => h.scalar_type(),
}
}
}

View File

@@ -14,6 +14,7 @@ use serde::Serialize;
use serde_json::Value as JsonValue;
use std::collections::{BTreeMap, VecDeque};
use std::fs::FileType;
use std::mem;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
@@ -23,6 +24,7 @@ use tokio::io::AsyncReadExt;
pub struct PbFileReader {
file: File,
buf: Vec<u8>,
escbuf: Vec<u8>,
wp: usize,
rp: usize,
channel_name: String,
@@ -81,13 +83,14 @@ macro_rules! wave_parse {
}};
}
const MIN_BUF_FILL: usize = 1024 * 16;
const MIN_BUF_FILL: usize = 1024 * 64;
impl PbFileReader {
pub async fn new(file: File) -> Self {
Self {
file,
buf: vec![0; MIN_BUF_FILL * 4],
escbuf: vec![],
wp: 0,
rp: 0,
channel_name: String::new(),
@@ -100,8 +103,9 @@ impl PbFileReader {
self.fill_buf().await?;
let k = self.find_next_nl()?;
let buf = &mut self.buf;
let m = unescape_archapp_msg(&buf[self.rp..k])?;
let payload_info = crate::generated::EPICSEvent::PayloadInfo::parse_from_bytes(&m)
let m = unescape_archapp_msg(&buf[self.rp..k], mem::replace(&mut self.escbuf, vec![]))?;
self.escbuf = m;
let payload_info = crate::generated::EPICSEvent::PayloadInfo::parse_from_bytes(&self.escbuf)
.map_err(|_| Error::with_msg("can not parse PayloadInfo"))?;
self.channel_name = payload_info.get_pvname().into();
self.payload_type = payload_info.get_field_type();
@@ -114,45 +118,51 @@ impl PbFileReader {
self.fill_buf().await?;
let k = self.find_next_nl()?;
let buf = &mut self.buf;
let m = unescape_archapp_msg(&buf[self.rp..k])?;
let m = mem::replace(&mut self.escbuf, vec![]);
let m = unescape_archapp_msg(&buf[self.rp..k], m)?;
self.escbuf = m;
let m = &self.escbuf;
use PayloadType::*;
let ei = match self.payload_type {
SCALAR_BYTE => parse_scalar_byte(&m, self.year)?,
SCALAR_BYTE => parse_scalar_byte(m, self.year)?,
SCALAR_ENUM => {
scalar_parse!(&m, self.year, ScalarEnum, Int, i32)
scalar_parse!(m, self.year, ScalarEnum, Int, i32)
}
SCALAR_SHORT => {
scalar_parse!(&m, self.year, ScalarShort, Short, i16)
scalar_parse!(m, self.year, ScalarShort, Short, i16)
}
SCALAR_INT => {
scalar_parse!(&m, self.year, ScalarInt, Int, i32)
scalar_parse!(m, self.year, ScalarInt, Int, i32)
}
SCALAR_FLOAT => {
scalar_parse!(&m, self.year, ScalarFloat, Float, f32)
scalar_parse!(m, self.year, ScalarFloat, Float, f32)
}
SCALAR_DOUBLE => {
scalar_parse!(&m, self.year, ScalarDouble, Double, f64)
scalar_parse!(m, self.year, ScalarDouble, Double, f64)
}
WAVEFORM_BYTE => {
wave_parse!(&m, self.year, VectorChar, Byte, i8)
wave_parse!(m, self.year, VectorChar, Byte, i8)
}
WAVEFORM_SHORT => {
wave_parse!(&m, self.year, VectorShort, Short, i16)
wave_parse!(m, self.year, VectorShort, Short, i16)
}
WAVEFORM_ENUM => {
wave_parse!(&m, self.year, VectorEnum, Int, i32)
wave_parse!(m, self.year, VectorEnum, Int, i32)
}
WAVEFORM_INT => {
wave_parse!(&m, self.year, VectorInt, Int, i32)
wave_parse!(m, self.year, VectorInt, Int, i32)
}
WAVEFORM_FLOAT => {
wave_parse!(&m, self.year, VectorFloat, Float, f32)
wave_parse!(m, self.year, VectorFloat, Float, f32)
}
WAVEFORM_DOUBLE => {
wave_parse!(&m, self.year, VectorDouble, Double, f64)
wave_parse!(m, self.year, VectorDouble, Double, f64)
}
SCALAR_STRING | WAVEFORM_STRING | V4_GENERIC_BYTES => {
return Err(Error::with_msg(format!("not supported: {:?}", self.payload_type)));
return Err(Error::with_msg_no_trace(format!(
"not supported: {:?}",
self.payload_type
)));
}
};
self.rp = k + 1;
@@ -163,11 +173,12 @@ impl PbFileReader {
if self.wp - self.rp >= MIN_BUF_FILL {
return Ok(());
}
if self.rp >= self.buf.len() - MIN_BUF_FILL {
if self.rp + MIN_BUF_FILL >= self.buf.len() {
let n = self.wp - self.rp;
for i in 0..n {
self.buf[i] = self.buf[self.rp + i];
}
self.buf.copy_within(self.rp..self.rp + n, 0);
//for i in 0..n {
// self.buf[i] = self.buf[self.rp + i];
//}
self.rp = 0;
self.wp = n;
}
@@ -219,7 +230,7 @@ pub struct EpicsEventPayloadInfo {
}
// TODO remove in favor of PbFileRead
async fn read_pb_file(mut f1: File) -> Result<(EpicsEventPayloadInfo, File), Error> {
async fn _read_pb_file(mut f1: File) -> Result<(EpicsEventPayloadInfo, File), Error> {
let mut buf = vec![0; 1024 * 4];
{
let mut i1 = 0;
@@ -254,7 +265,7 @@ async fn read_pb_file(mut f1: File) -> Result<(EpicsEventPayloadInfo, File), Err
}
if i2 != usize::MAX {
//info!("got NL {} .. {}", j1, i2);
let m = unescape_archapp_msg(&buf[j1..i2])?;
let m = unescape_archapp_msg(&buf[j1..i2], vec![])?;
if j1 == 0 {
payload_info = crate::generated::EPICSEvent::PayloadInfo::parse_from_bytes(&m)
.map_err(|_| Error::with_msg("can not parse PayloadInfo"))?;

View File

@@ -24,7 +24,7 @@ fn read_pb_00() -> Result<(), Error> {
}
if i2 != usize::MAX {
info!("got NL {} .. {}", j1, i2);
let m = unescape_archapp_msg(&f1[j1..i2])?;
let m = unescape_archapp_msg(&f1[j1..i2], vec![])?;
if j1 == 0 {
let payload_info = crate::generated::EPICSEvent::PayloadInfo::parse_from_bytes(&m).unwrap();
info!("got payload_info: {:?}", payload_info);