Introduce EventAppendable

This commit is contained in:
Dominik Werder
2021-07-21 18:48:54 +02:00
parent d1401bffd5
commit 2502f7a574
14 changed files with 642 additions and 203 deletions

View File

@@ -1,15 +1,20 @@
use crate::parse::PbFileReader;
use crate::EventsItem;
use crate::{EventsItem, PlainEvents, ScalarPlainEvents, WavePlainEvents, XBinnedEvents};
use chrono::{TimeZone, Utc};
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::eventvalues::EventValues;
use items::{Framable, RangeCompletableItem, Sitemty, StreamItem};
use items::waveevents::{WaveEvents, WaveXBinner};
use items::xbinnedscalarevents::XBinnedScalarEvents;
use items::xbinnedwaveevents::XBinnedWaveEvents;
use items::RangeCompletableItem::RangeComplete;
use items::{EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, SitemtyFrameType, StreamItem};
use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::timeunits::{DAY, SEC};
use netpod::{ArchiverAppliance, Channel, ChannelInfo, NanoRange, ScalarType, Shape};
use netpod::{AggKind, ArchiverAppliance, Channel, ChannelInfo, HasScalarType, HasShape, NanoRange, ScalarType, Shape};
use serde::Serialize;
use serde_json::Value as JsonValue;
use std::path::PathBuf;
use std::pin::Pin;
@@ -159,18 +164,47 @@ trait FrameMakerTrait: Send {
struct FrameMaker {
scalar_type: ScalarType,
shape: Shape,
agg_kind: AggKind,
}
impl FrameMaker {
fn make_frame_gen<T>(item: Sitemty<EventsItem>) -> Box<dyn Framable>
where
T: SitemtyFrameType + Serialize + Send + 'static,
{
match item {
Ok(_) => err::todoval(),
Err(e) => {
//let t = Ok(StreamItem::DataItem(RangeCompletableItem::Data()))
let t: Sitemty<T> = Err(e);
Box::new(t)
}
}
}
}
macro_rules! events_item_to_sitemty {
($ei:expr, $var:ident) => {{
let d = match $ei {
Ok(j) => match j {
StreamItem::DataItem(j) => match j {
RangeCompletableItem::Data(j) => {
if let EventsItem::$var(j) = j {
Ok(StreamItem::DataItem(RangeCompletableItem::Data(j)))
} else {
Err(Error::with_msg_no_trace("unexpected variant"))
($ei:expr, $t1:ident, $t2:ident, $t3:ident) => {{
let ret = match $ei {
Ok(k) => match k {
StreamItem::DataItem(k) => match k {
RangeCompletableItem::Data(k) => {
//
match k {
EventsItem::Plain(h) => {
//
match h {
PlainEvents::$t1(h) => {
//
match h {
$t2::$t3(h) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(h))),
_ => panic!(),
}
}
_ => panic!(),
}
}
_ => panic!(),
}
}
RangeCompletableItem::RangeComplete => {
@@ -182,30 +216,40 @@ macro_rules! events_item_to_sitemty {
},
Err(e) => Err(e),
};
Box::new(d)
Box::new(ret)
}};
}
macro_rules! arm1 {
($item:expr, $sty:ident, $shape:expr, $ak:expr) => {{
match $shape {
Shape::Scalar => match $ak {
AggKind::Plain => Self::make_frame_gen::<EventValues<$sty>>($item),
AggKind::DimXBins1 => Self::make_frame_gen::<EventValues<$sty>>($item),
AggKind::DimXBinsN(_) => Self::make_frame_gen::<EventValues<$sty>>($item),
},
Shape::Wave(_) => match $ak {
AggKind::Plain => Self::make_frame_gen::<WaveEvents<$sty>>($item),
AggKind::DimXBins1 => Self::make_frame_gen::<XBinnedScalarEvents<$sty>>($item),
AggKind::DimXBinsN(_) => Self::make_frame_gen::<XBinnedWaveEvents<$sty>>($item),
},
}
}};
}
impl FrameMakerTrait for FrameMaker {
fn make_frame(&self, ei: Sitemty<EventsItem>) -> Box<dyn Framable> {
fn make_frame(&self, item: Sitemty<EventsItem>) -> Box<dyn Framable> {
// Take from `self` the expected inner type.
// If `ei` is not some data, then I can't dynamically determine the expected T of Sitemty.
// Therefore, I need to decide that based on given parameters.
// see also channel_info in this mod.
match self.shape {
Shape::Scalar => match self.scalar_type {
ScalarType::I8 => events_item_to_sitemty!(ei, ScalarByte),
ScalarType::I16 => events_item_to_sitemty!(ei, ScalarShort),
ScalarType::I32 => events_item_to_sitemty!(ei, ScalarInt),
ScalarType::F32 => events_item_to_sitemty!(ei, ScalarFloat),
ScalarType::F64 => events_item_to_sitemty!(ei, ScalarDouble),
_ => panic!(),
},
Shape::Wave(_) => match self.scalar_type {
ScalarType::I8 => events_item_to_sitemty!(ei, WaveByte),
ScalarType::I16 => events_item_to_sitemty!(ei, WaveShort),
ScalarType::I32 => events_item_to_sitemty!(ei, WaveInt),
ScalarType::F32 => events_item_to_sitemty!(ei, WaveFloat),
ScalarType::F64 => events_item_to_sitemty!(ei, WaveDouble),
_ => panic!(),
},
match self.scalar_type {
ScalarType::I8 => arm1!(item, i8, self.shape, self.agg_kind),
ScalarType::I16 => arm1!(item, i16, self.shape, self.agg_kind),
ScalarType::I32 => arm1!(item, i32, self.shape, self.agg_kind),
ScalarType::F32 => arm1!(item, f32, self.shape, self.agg_kind),
ScalarType::F64 => arm1!(item, f64, self.shape, self.agg_kind),
_ => err::todoval(),
}
}
}
@@ -230,6 +274,7 @@ pub async fn make_event_pipe(
let frame_maker = Box::new(FrameMaker {
scalar_type: ci.scalar_type.clone(),
shape: ci.shape.clone(),
agg_kind: evq.agg_kind.clone(),
}) as Box<dyn FrameMakerTrait>;
let ret = sm.map(move |j| frame_maker.make_frame(j));
Ok(Box::pin(ret))
@@ -239,6 +284,8 @@ pub async fn make_single_event_pipe(
evq: &RawEventsQuery,
base_path: PathBuf,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventsItem>> + Send>>, Error> {
// TODO must apply the proper x-binning depending on the requested AggKind.
info!("make_event_pipe {:?}", evq);
let evq = evq.clone();
let DirAndPrefix { dir, prefix } = directory_for_channel_files(&evq.channel, base_path)?;
@@ -249,6 +296,7 @@ pub async fn make_single_event_pipe(
info!("start read of {:?}", dir);
// TODO first collect all matching filenames, then sort, then open files.
// TODO if dir does not exist, should notify client but not log as error.
let mut rd = tokio::fs::read_dir(&dir).await?;
while let Some(de) = rd.next_entry().await? {
let s = de.file_name().to_string_lossy().into_owned();
@@ -271,7 +319,8 @@ pub async fn make_single_event_pipe(
match pbr.read_msg().await {
Ok(ei) => {
info!("read msg from file");
let g = Ok(StreamItem::DataItem(RangeCompletableItem::Data(ei)));
let ei2 = ei.x_aggregate(&evq.agg_kind);
let g = Ok(StreamItem::DataItem(RangeCompletableItem::Data(ei2)));
tx.send(g).await?;
}
Err(e) => {
@@ -307,7 +356,7 @@ pub async fn make_single_event_pipe(
#[allow(unused)]
fn events_item_to_framable(ei: EventsItem) -> Result<Box<dyn Framable + Send>, Error> {
match ei {
EventsItem::ScalarDouble(h) => {
EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::Int(h))) => {
let range: NanoRange = err::todoval();
let (x, y) = h
.tss
@@ -378,57 +427,9 @@ pub async fn channel_info(channel: &Channel, aa: &ArchiverAppliance) -> Result<C
match ev {
Ok(item) => {
msgs.push(format!("got event {:?}", item));
shape = Some(match &item {
EventsItem::ScalarByte(_) => Shape::Scalar,
EventsItem::ScalarShort(_) => Shape::Scalar,
EventsItem::ScalarInt(_) => Shape::Scalar,
EventsItem::ScalarFloat(_) => Shape::Scalar,
EventsItem::ScalarDouble(_) => Shape::Scalar,
// TODO use macro:
EventsItem::WaveByte(item) => Shape::Wave(
item.vals
.first()
.ok_or_else(|| Error::with_msg_no_trace("empty event batch"))?
.len() as u32,
),
EventsItem::WaveShort(item) => Shape::Wave(
item.vals
.first()
.ok_or_else(|| Error::with_msg_no_trace("empty event batch"))?
.len() as u32,
),
EventsItem::WaveInt(item) => Shape::Wave(
item.vals
.first()
.ok_or_else(|| Error::with_msg_no_trace("empty event batch"))?
.len() as u32,
),
EventsItem::WaveFloat(item) => Shape::Wave(
item.vals
.first()
.ok_or_else(|| Error::with_msg_no_trace("empty event batch"))?
.len() as u32,
),
EventsItem::WaveDouble(item) => Shape::Wave(
item.vals
.first()
.ok_or_else(|| Error::with_msg_no_trace("empty event batch"))?
.len() as u32,
),
});
shape = Some(item.shape());
// These type mappings are defined by the protobuffer schema.
scalar_type = Some(match item {
EventsItem::ScalarByte(_) => ScalarType::I8,
EventsItem::ScalarShort(_) => ScalarType::I16,
EventsItem::ScalarInt(_) => ScalarType::I32,
EventsItem::ScalarFloat(_) => ScalarType::F32,
EventsItem::ScalarDouble(_) => ScalarType::F64,
EventsItem::WaveByte(_) => ScalarType::I8,
EventsItem::WaveShort(_) => ScalarType::I16,
EventsItem::WaveInt(_) => ScalarType::I32,
EventsItem::WaveFloat(_) => ScalarType::F32,
EventsItem::WaveDouble(_) => ScalarType::F64,
});
scalar_type = Some(item.scalar_type());
break;
}
Err(e) => {

View File

@@ -9,8 +9,11 @@ pub mod parse;
#[cfg(not(feature = "devread"))]
pub mod parsestub;
use items::eventvalues::EventValues;
use items::waveevents::WaveEvents;
use items::{WithLen, WithTimestamps};
use items::numops::NumOps;
use items::waveevents::{WaveEvents, WaveXBinner};
use items::xbinnedscalarevents::XBinnedScalarEvents;
use items::{EventsNodeProcessor, Framable, SitemtyFrameType, WithLen, WithTimestamps};
use netpod::{AggKind, HasScalarType, HasShape, ScalarType, Shape};
#[cfg(not(feature = "devread"))]
pub use parsestub as parse;
@@ -43,46 +46,435 @@ fn unescape_archapp_msg(inp: &[u8]) -> Result<Vec<u8>, Error> {
Ok(ret)
}
#[derive(Debug)]
pub enum ScalarPlainEvents {
Byte(EventValues<i8>),
Short(EventValues<i16>),
Int(EventValues<i32>),
Float(EventValues<f32>),
Double(EventValues<f64>),
}
impl ScalarPlainEvents {
pub fn variant_name(&self) -> String {
use ScalarPlainEvents::*;
match self {
Byte(h) => format!("Byte"),
Short(h) => format!("Short"),
Int(h) => format!("Int"),
Float(h) => format!("Float"),
Double(h) => format!("Double"),
}
}
}
impl WithLen for ScalarPlainEvents {
fn len(&self) -> usize {
use ScalarPlainEvents::*;
match self {
Byte(j) => j.len(),
Short(j) => j.len(),
Int(j) => j.len(),
Float(j) => j.len(),
Double(j) => j.len(),
}
}
}
impl WithTimestamps for ScalarPlainEvents {
fn ts(&self, ix: usize) -> u64 {
use ScalarPlainEvents::*;
match self {
Byte(j) => j.ts(ix),
Short(j) => j.ts(ix),
Int(j) => j.ts(ix),
Float(j) => j.ts(ix),
Double(j) => j.ts(ix),
}
}
}
impl HasShape for ScalarPlainEvents {
fn shape(&self) -> Shape {
use ScalarPlainEvents::*;
match self {
_ => Shape::Scalar,
}
}
}
impl HasScalarType for ScalarPlainEvents {
fn scalar_type(&self) -> ScalarType {
use ScalarPlainEvents::*;
match self {
Byte(h) => ScalarType::I8,
Short(h) => ScalarType::I16,
Int(h) => ScalarType::I32,
Float(h) => ScalarType::F32,
Double(h) => ScalarType::F64,
}
}
}
#[derive(Debug)]
pub enum WavePlainEvents {
Byte(WaveEvents<i8>),
Short(WaveEvents<i16>),
Int(WaveEvents<i32>),
Float(WaveEvents<f32>),
Double(WaveEvents<f64>),
}
fn tmp1() {
let ev = EventValues::<u8> {
tss: vec![],
values: vec![],
};
<u8 as NumOps>::is_nan(err::todoval());
<EventValues<u8> as SitemtyFrameType>::FRAME_TYPE_ID;
//<Vec<u8> as NumOps>::is_nan(err::todoval());
//<EventValues<Vec<u8>> as SitemtyFrameType>::FRAME_TYPE_ID;
}
impl WavePlainEvents {
pub fn variant_name(&self) -> String {
use WavePlainEvents::*;
match self {
Byte(h) => format!("Byte({})", h.vals.first().map_or(0, |j| j.len())),
Short(h) => format!("Short({})", h.vals.first().map_or(0, |j| j.len())),
Int(h) => format!("Int({})", h.vals.first().map_or(0, |j| j.len())),
Float(h) => format!("Float({})", h.vals.first().map_or(0, |j| j.len())),
Double(h) => format!("Double({})", h.vals.first().map_or(0, |j| j.len())),
}
}
fn x_aggregate(self, ak: &AggKind) -> EventsItem {
use WavePlainEvents::*;
let shape = self.shape();
match self {
Byte(k) => match ak {
AggKind::Plain => EventsItem::Plain(PlainEvents::Wave(WavePlainEvents::Byte(k))),
AggKind::DimXBins1 => {
let p = WaveXBinner::create(shape, ak.clone());
let j = p.process(k);
EventsItem::XBinnedEvents(XBinnedEvents::SingleBinWave(SingleBinWaveEvents::Byte(j)))
}
AggKind::DimXBinsN(_) => EventsItem::Plain(PlainEvents::Wave(err::todoval())),
},
_ => panic!(),
}
}
}
impl WithLen for WavePlainEvents {
fn len(&self) -> usize {
use WavePlainEvents::*;
match self {
Byte(j) => j.len(),
Short(j) => j.len(),
Int(j) => j.len(),
Float(j) => j.len(),
Double(j) => j.len(),
}
}
}
impl WithTimestamps for WavePlainEvents {
fn ts(&self, ix: usize) -> u64 {
use WavePlainEvents::*;
match self {
Byte(j) => j.ts(ix),
Short(j) => j.ts(ix),
Int(j) => j.ts(ix),
Float(j) => j.ts(ix),
Double(j) => j.ts(ix),
}
}
}
impl HasShape for WavePlainEvents {
fn shape(&self) -> Shape {
use WavePlainEvents::*;
match self {
Byte(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)),
Short(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)),
Int(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)),
Float(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)),
Double(h) => Shape::Wave(h.vals.first().map_or(0, |x| x.len() as u32)),
}
}
}
impl HasScalarType for WavePlainEvents {
fn scalar_type(&self) -> ScalarType {
use WavePlainEvents::*;
match self {
Byte(h) => ScalarType::I8,
Short(h) => ScalarType::I16,
Int(h) => ScalarType::I32,
Float(h) => ScalarType::F32,
Double(h) => ScalarType::F64,
}
}
}
#[derive(Debug)]
pub enum SingleBinWaveEvents {
Byte(XBinnedScalarEvents<i8>),
Short(XBinnedScalarEvents<i16>),
Int(XBinnedScalarEvents<i32>),
Float(XBinnedScalarEvents<f32>),
Double(XBinnedScalarEvents<f64>),
}
impl SingleBinWaveEvents {
pub fn variant_name(&self) -> String {
use SingleBinWaveEvents::*;
match self {
Byte(h) => format!("Byte"),
Short(h) => format!("Short"),
Int(h) => format!("Int"),
Float(h) => format!("Float"),
Double(h) => format!("Double"),
}
}
fn x_aggregate(self, ak: &AggKind) -> EventsItem {
use SingleBinWaveEvents::*;
match self {
Byte(k) => match ak {
AggKind::Plain => EventsItem::XBinnedEvents(XBinnedEvents::SingleBinWave(SingleBinWaveEvents::Byte(k))),
AggKind::DimXBins1 => err::todoval(),
AggKind::DimXBinsN(_) => EventsItem::Plain(PlainEvents::Wave(err::todoval())),
},
_ => panic!(),
}
}
}
impl WithLen for SingleBinWaveEvents {
fn len(&self) -> usize {
use SingleBinWaveEvents::*;
match self {
Byte(j) => j.len(),
Short(j) => j.len(),
Int(j) => j.len(),
Float(j) => j.len(),
Double(j) => j.len(),
}
}
}
impl WithTimestamps for SingleBinWaveEvents {
fn ts(&self, ix: usize) -> u64 {
use SingleBinWaveEvents::*;
match self {
Byte(j) => j.ts(ix),
Short(j) => j.ts(ix),
Int(j) => j.ts(ix),
Float(j) => j.ts(ix),
Double(j) => j.ts(ix),
}
}
}
impl HasShape for SingleBinWaveEvents {
fn shape(&self) -> Shape {
use SingleBinWaveEvents::*;
match self {
Byte(h) => Shape::Scalar,
Short(h) => Shape::Scalar,
Int(h) => Shape::Scalar,
Float(h) => Shape::Scalar,
Double(h) => Shape::Scalar,
}
}
}
impl HasScalarType for SingleBinWaveEvents {
fn scalar_type(&self) -> ScalarType {
use SingleBinWaveEvents::*;
match self {
Byte(h) => ScalarType::I8,
Short(h) => ScalarType::I16,
Int(h) => ScalarType::I32,
Float(h) => ScalarType::F32,
Double(h) => ScalarType::F64,
}
}
}
#[derive(Debug)]
pub enum XBinnedEvents {
Scalar(ScalarPlainEvents),
SingleBinWave(SingleBinWaveEvents),
//MultiBinWave,
}
impl XBinnedEvents {
pub fn variant_name(&self) -> String {
use XBinnedEvents::*;
match self {
Scalar(h) => format!("Scalar({})", h.variant_name()),
SingleBinWave(h) => format!("SingleBinWave({})", h.variant_name()),
}
}
pub fn x_aggregate(self, ak: &AggKind) -> EventsItem {
use XBinnedEvents::*;
match self {
Scalar(k) => EventsItem::Plain(PlainEvents::Scalar(k)),
SingleBinWave(k) => k.x_aggregate(ak),
}
}
}
impl WithLen for XBinnedEvents {
fn len(&self) -> usize {
use XBinnedEvents::*;
match self {
Scalar(j) => j.len(),
SingleBinWave(j) => j.len(),
}
}
}
impl WithTimestamps for XBinnedEvents {
fn ts(&self, ix: usize) -> u64 {
use XBinnedEvents::*;
match self {
Scalar(j) => j.ts(ix),
SingleBinWave(j) => j.ts(ix),
}
}
}
impl HasShape for XBinnedEvents {
fn shape(&self) -> Shape {
use XBinnedEvents::*;
match self {
Scalar(h) => h.shape(),
SingleBinWave(h) => h.shape(),
}
}
}
impl HasScalarType for XBinnedEvents {
fn scalar_type(&self) -> ScalarType {
use XBinnedEvents::*;
match self {
Scalar(h) => h.scalar_type(),
SingleBinWave(h) => h.scalar_type(),
}
}
}
#[derive(Debug)]
pub enum PlainEvents {
Scalar(ScalarPlainEvents),
Wave(WavePlainEvents),
}
impl PlainEvents {
pub fn is_wave(&self) -> bool {
use PlainEvents::*;
match self {
Scalar(_) => false,
Wave(_) => true,
}
}
pub fn variant_name(&self) -> String {
use PlainEvents::*;
match self {
Scalar(h) => format!("Scalar({})", h.variant_name()),
Wave(h) => format!("Scalar({})", h.variant_name()),
}
}
pub fn x_aggregate(self, ak: &AggKind) -> EventsItem {
use PlainEvents::*;
match self {
Scalar(k) => EventsItem::Plain(PlainEvents::Scalar(k)),
Wave(k) => k.x_aggregate(ak),
}
}
}
impl WithLen for PlainEvents {
fn len(&self) -> usize {
use PlainEvents::*;
match self {
Scalar(j) => j.len(),
Wave(j) => j.len(),
}
}
}
impl WithTimestamps for PlainEvents {
fn ts(&self, ix: usize) -> u64 {
use PlainEvents::*;
match self {
Scalar(j) => j.ts(ix),
Wave(j) => j.ts(ix),
}
}
}
impl HasShape for PlainEvents {
fn shape(&self) -> Shape {
use PlainEvents::*;
match self {
Scalar(h) => h.shape(),
Wave(h) => h.shape(),
}
}
}
impl HasScalarType for PlainEvents {
fn scalar_type(&self) -> ScalarType {
use PlainEvents::*;
match self {
Scalar(h) => h.scalar_type(),
Wave(h) => h.scalar_type(),
}
}
}
#[derive(Debug)]
pub enum EventsItem {
ScalarByte(EventValues<i8>),
ScalarShort(EventValues<i16>),
ScalarInt(EventValues<i32>),
ScalarFloat(EventValues<f32>),
ScalarDouble(EventValues<f64>),
WaveByte(WaveEvents<i8>),
WaveShort(WaveEvents<i16>),
WaveInt(WaveEvents<i32>),
WaveFloat(WaveEvents<f32>),
WaveDouble(WaveEvents<f64>),
Plain(PlainEvents),
XBinnedEvents(XBinnedEvents),
}
impl EventsItem {
pub fn is_wave(&self) -> bool {
use EventsItem::*;
match self {
WaveByte(_) => true,
WaveShort(_) => true,
WaveInt(_) => true,
WaveFloat(_) => true,
WaveDouble(_) => true,
_ => false,
Plain(h) => h.is_wave(),
XBinnedEvents(h) => {
if let Shape::Wave(_) = h.shape() {
true
} else {
false
}
}
}
}
pub fn variant_name(&self) -> String {
use EventsItem::*;
match self {
ScalarByte(item) => format!("ScalarByte"),
ScalarShort(item) => format!("ScalarShort"),
ScalarInt(item) => format!("ScalarInt"),
ScalarFloat(item) => format!("ScalarFloat"),
ScalarDouble(item) => format!("ScalarDouble"),
WaveByte(item) => format!("WaveByte({})", item.len()),
WaveShort(item) => format!("WaveShort({})", item.len()),
WaveInt(item) => format!("WaveInt({})", item.len()),
WaveFloat(item) => format!("WaveFloat({})", item.len()),
WaveDouble(item) => format!("WaveDouble({})", item.len()),
Plain(h) => format!("Plain({})", h.variant_name()),
XBinnedEvents(h) => format!("Plain({})", h.variant_name()),
}
}
pub fn x_aggregate(self, ak: &AggKind) -> EventsItem {
use EventsItem::*;
match self {
Plain(k) => k.x_aggregate(ak),
XBinnedEvents(k) => k.x_aggregate(ak),
}
}
}
@@ -91,16 +483,8 @@ impl WithLen for EventsItem {
fn len(&self) -> usize {
use EventsItem::*;
match self {
ScalarByte(j) => j.len(),
ScalarShort(j) => j.len(),
ScalarInt(j) => j.len(),
ScalarFloat(j) => j.len(),
ScalarDouble(j) => j.len(),
WaveByte(j) => j.len(),
WaveShort(j) => j.len(),
WaveInt(j) => j.len(),
WaveFloat(j) => j.len(),
WaveDouble(j) => j.len(),
Plain(j) => j.len(),
XBinnedEvents(j) => j.len(),
}
}
}
@@ -109,16 +493,28 @@ impl WithTimestamps for EventsItem {
fn ts(&self, ix: usize) -> u64 {
use EventsItem::*;
match self {
ScalarByte(j) => j.ts(ix),
ScalarShort(j) => j.ts(ix),
ScalarInt(j) => j.ts(ix),
ScalarFloat(j) => j.ts(ix),
ScalarDouble(j) => j.ts(ix),
WaveByte(j) => j.ts(ix),
WaveShort(j) => j.ts(ix),
WaveInt(j) => j.ts(ix),
WaveFloat(j) => j.ts(ix),
WaveDouble(j) => j.ts(ix),
Plain(j) => j.ts(ix),
XBinnedEvents(j) => j.ts(ix),
}
}
}
impl HasShape for EventsItem {
fn shape(&self) -> Shape {
use EventsItem::*;
match self {
Plain(h) => h.shape(),
XBinnedEvents(h) => h.shape(),
}
}
}
impl HasScalarType for EventsItem {
fn scalar_type(&self) -> ScalarType {
use EventsItem::*;
match self {
Plain(h) => h.scalar_type(),
XBinnedEvents(h) => h.scalar_type(),
}
}
}

View File

@@ -1,6 +1,6 @@
use crate::events::parse_data_filename;
use crate::generated::EPICSEvent::PayloadType;
use crate::{unescape_archapp_msg, EventsItem};
use crate::{unescape_archapp_msg, EventsItem, PlainEvents, ScalarPlainEvents, WavePlainEvents};
use archapp_xc::*;
use async_channel::{bounded, Receiver};
use chrono::{TimeZone, Utc};
@@ -33,16 +33,16 @@ pub struct PbFileReader {
fn parse_scalar_byte(m: &[u8], year: u32) -> Result<EventsItem, Error> {
let msg = crate::generated::EPICSEvent::ScalarByte::parse_from_bytes(m)
.map_err(|_| Error::with_msg(format!("can not parse pb-type {}", "ScalarByte")))?;
let mut t = EventValues::<i32> {
let mut t = EventValues::<i8> {
tss: vec![],
values: vec![],
};
let yd = Utc.ymd(year as i32, 1, 1).and_hms(0, 0, 0);
let ts = yd.timestamp() as u64 * 1000000000 + msg.get_secondsintoyear() as u64 * 1000000000 + msg.get_nano() as u64;
let v = msg.get_val().first().map_or(0, |k| *k as i32);
let v = msg.get_val().first().map_or(0, |k| *k as i8);
t.tss.push(ts);
t.values.push(v);
Ok(EventsItem::ScalarByte(t))
Ok(EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::Byte(t))))
}
macro_rules! scalar_parse {
@@ -58,8 +58,8 @@ macro_rules! scalar_parse {
yd.timestamp() as u64 * 1000000000 + msg.get_secondsintoyear() as u64 * 1000000000 + msg.get_nano() as u64;
let v = msg.get_val();
t.tss.push(ts);
t.values.push(v);
EventsItem::$eit(t)
t.values.push(v as $evty);
EventsItem::Plain(PlainEvents::Scalar(ScalarPlainEvents::$eit(t)))
}};
}
@@ -76,8 +76,8 @@ macro_rules! wave_parse {
yd.timestamp() as u64 * 1000000000 + msg.get_secondsintoyear() as u64 * 1000000000 + msg.get_nano() as u64;
let v = msg.get_val();
t.tss.push(ts);
t.vals.push(v.to_vec());
EventsItem::$eit(t)
t.vals.push(v.into_iter().map(|&x| x as $evty).collect());
EventsItem::Plain(PlainEvents::Wave(WavePlainEvents::$eit(t)))
}};
}
@@ -119,37 +119,37 @@ impl PbFileReader {
let ei = match self.payload_type {
SCALAR_BYTE => parse_scalar_byte(&m, self.year)?,
SCALAR_ENUM => {
scalar_parse!(&m, self.year, ScalarEnum, ScalarInt, i32)
scalar_parse!(&m, self.year, ScalarEnum, Int, i32)
}
SCALAR_SHORT => {
scalar_parse!(&m, self.year, ScalarShort, ScalarShort, i32)
scalar_parse!(&m, self.year, ScalarShort, Short, i16)
}
SCALAR_INT => {
scalar_parse!(&m, self.year, ScalarInt, ScalarInt, i32)
scalar_parse!(&m, self.year, ScalarInt, Int, i32)
}
SCALAR_FLOAT => {
scalar_parse!(&m, self.year, ScalarFloat, ScalarFloat, f32)
scalar_parse!(&m, self.year, ScalarFloat, Float, f32)
}
SCALAR_DOUBLE => {
scalar_parse!(&m, self.year, ScalarDouble, ScalarDouble, f64)
scalar_parse!(&m, self.year, ScalarDouble, Double, f64)
}
WAVEFORM_BYTE => {
wave_parse!(&m, self.year, VectorChar, WaveByte, u8)
wave_parse!(&m, self.year, VectorChar, Byte, i8)
}
WAVEFORM_SHORT => {
wave_parse!(&m, self.year, VectorShort, WaveShort, i32)
wave_parse!(&m, self.year, VectorShort, Short, i16)
}
WAVEFORM_ENUM => {
wave_parse!(&m, self.year, VectorEnum, WaveInt, i32)
wave_parse!(&m, self.year, VectorEnum, Int, i32)
}
WAVEFORM_INT => {
wave_parse!(&m, self.year, VectorInt, WaveInt, i32)
wave_parse!(&m, self.year, VectorInt, Int, i32)
}
WAVEFORM_FLOAT => {
wave_parse!(&m, self.year, VectorFloat, WaveFloat, f32)
wave_parse!(&m, self.year, VectorFloat, Float, f32)
}
WAVEFORM_DOUBLE => {
wave_parse!(&m, self.year, VectorDouble, WaveDouble, f64)
wave_parse!(&m, self.year, VectorDouble, Double, f64)
}
SCALAR_STRING | WAVEFORM_STRING | V4_GENERIC_BYTES => {
return Err(Error::with_msg(format!("not supported: {:?}", self.payload_type)));
@@ -409,7 +409,7 @@ pub async fn scan_files_inner(
//tx.send(Ok(Box::new(path.clone()) as RT1)).await?;
let fns = pe.path.to_str().ok_or_else(|| Error::with_msg("invalid path string"))?;
if let Ok(fnp) = parse_data_filename(&fns) {
tx.send(Ok(Box::new(serde_json::to_value(fns)?) as ItemSerBox)).await?;
//tx.send(Ok(Box::new(serde_json::to_value(fns)?) as ItemSerBox)).await?;
let channel_path = &fns[proots.len() + 1..fns.len() - 11];
if !lru.query(channel_path) {
let mut pbr = PbFileReader::new(tokio::fs::File::open(&pe.path).await?).await;
@@ -432,22 +432,21 @@ pub async fn scan_files_inner(
dbconn::insert_channel(channel_path.into(), ndi.facility, &dbc).await?;
}
if let Ok(msg) = pbr.read_msg().await {
if msg.is_wave() {
lru.insert(channel_path);
{
tx.send(Ok(Box::new(serde_json::to_value(format!(
"found {} {}",
msg.variant_name(),
channel_path
"channel {} type {}",
pbr.channel_name(),
msg.variant_name()
))?) as ItemSerBox))
.await?;
waves_found += 1;
/*waves_found += 1;
if waves_found >= 20 {
break;
}
}*/
}
} else {
}
}
lru.insert(channel_path);
}
}
}