Switch to dyn events processing

This commit is contained in:
Dominik Werder
2023-02-08 07:14:22 +01:00
parent 0da895ef50
commit 326fe793ce
27 changed files with 867 additions and 519 deletions

View File

@@ -22,7 +22,6 @@ bytes = "1.0.1"
crc32fast = "1.3.2"
arrayref = "0.3.6"
byteorder = "1.4.3"
futures-core = "0.3.14"
futures-util = "0.3.14"
async-stream = "0.3.0"
tracing = "0.1.25"

View File

@@ -1,5 +1,5 @@
use err::Error;
use futures_core::Stream;
use futures_util::Stream;
use futures_util::StreamExt;
use std::pin::Pin;
use std::task::{Context, Poll};

40
disk/src/channelconfig.rs Normal file
View File

@@ -0,0 +1,40 @@
use err::Error;
use netpod::Channel;
use netpod::ChannelConfig;
use netpod::NanoRange;
use netpod::NodeConfigCached;
use parse::channelconfig::extract_matching_config_entry;
use parse::channelconfig::read_local_config;
use parse::channelconfig::MatchingConfigEntry;
/// Load the channel configuration entry matching the given time range.
///
/// Reads the on-disk channel config for `channel` on the local node, selects
/// the single config entry overlapping `range`, and converts it into a
/// `ChannelConfig`.
///
/// Errors when no entry (or more than one entry) matches the range, or when
/// the entry's shape can not be determined.
pub async fn config(
    range: NanoRange,
    channel: Channel,
    node_config: &NodeConfigCached,
) -> Result<ChannelConfig, Error> {
    let channel_config = read_local_config(channel.clone(), node_config.node.clone()).await?;
    // `?` performs the same `From` conversion the former `match { Err(e) => return Err(e)? }` did.
    let entry_res = extract_matching_config_entry(&range, &channel_config)?;
    let entry = match entry_res {
        MatchingConfigEntry::None => return Err(Error::with_public_msg("no config entry found"))?,
        MatchingConfigEntry::Multiple => return Err(Error::with_public_msg("multiple config entries found"))?,
        MatchingConfigEntry::Entry(entry) => entry,
    };
    let shape = entry.to_shape()?;
    let channel_config = ChannelConfig {
        // Last use of `channel`; no clone needed.
        channel,
        keyspace: entry.ks as u8,
        time_bin_size: entry.bs,
        shape,
        scalar_type: entry.scalar_type.clone(),
        byte_order: entry.byte_order.clone(),
        array: entry.is_array,
        compression: entry.is_compressed,
    };
    Ok(channel_config)
}

View File

@@ -1,20 +1,41 @@
use crate::agg::enp::Identity;
use crate::eventblobs::EventChunkerMultifile;
use err::Error;
use futures_core::Stream;
use futures_util::Stream;
use futures_util::StreamExt;
use items::eventfull::EventFull;
use items::eventsitem::EventsItem;
use items::numops::{BoolNum, NumOps, StringNum};
use items::plainevents::{PlainEvents, ScalarPlainEvents};
use items::numops::BoolNum;
use items::numops::NumOps;
use items::numops::StringNum;
use items::plainevents::PlainEvents;
use items::plainevents::ScalarPlainEvents;
use items::scalarevents::ScalarEvents;
use items::waveevents::{WaveEvents, WaveNBinner, WavePlainProc, WaveXBinner};
use items::{Appendable, EventAppendable, EventsNodeProcessor, RangeCompletableItem, Sitemty, StreamItem};
use netpod::{ScalarType, Shape};
use items::waveevents::WaveEvents;
use items::waveevents::WaveNBinner;
use items::waveevents::WavePlainProc;
use items::waveevents::WaveXBinner;
use items::Appendable;
use items::EventAppendable;
use items::EventsNodeProcessor;
use items::RangeCompletableItem;
use items::Sitemty;
use items::StreamItem;
use items_0::scalar_ops::ScalarOps;
use items_0::Events;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim1::EventsDim1;
#[allow(unused)]
use netpod::log::*;
use netpod::AggKind;
use netpod::ScalarType;
use netpod::Shape;
use std::marker::PhantomData;
use std::mem;
use std::mem::size_of;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::Context;
use std::task::Poll;
pub trait Endianness: Send + Unpin {
fn is_big() -> bool;
@@ -32,6 +53,11 @@ impl Endianness for BigEndian {
}
}
/// Byte order of a single stored event payload.
///
/// Per the decoder notes below, sf-databuffer can flag endianness for each
/// event individually, so decoding can not rely solely on the channel config.
pub enum Endian {
    Little,
    Big,
}
pub trait NumFromBytes<NTY, END> {
fn convert(buf: &[u8], big_endian: bool) -> NTY;
}
@@ -113,6 +139,247 @@ impl_num_from_bytes!(i64, 8);
impl_num_from_bytes!(f32, 4);
impl_num_from_bytes!(f64, 8);
/// Decode raw on-disk event payload bytes into values of scalar type `STY`.
pub trait ScalarValueFromBytes<STY> {
    /// Decode a single scalar from the start of `buf`.
    fn convert(buf: &[u8], endian: Endian) -> Result<STY, Error>;
    /// Decode a 1-d array event from `buf`; `n` is the element count
    /// declared by the channel shape.
    fn convert_dim1(buf: &[u8], endian: Endian, n: usize) -> Result<Vec<STY>, Error>;
}
/// Implement `ScalarValueFromBytes` for a fixed-width numeric type.
/// `$nty` is the scalar type, `$nl` its width in bytes.
macro_rules! impl_scalar_value_from_bytes {
    ($nty:ident, $nl:expr) => {
        impl ScalarValueFromBytes<$nty> for $nty {
            // Error in data on disk:
            // Can not rely on byte order as stated in the channel config.
            // Endianness in sf-databuffer can be specified for each event.

            /// Decode a single scalar from the first `$nl` bytes of `buf`.
            fn convert(buf: &[u8], endian: Endian) -> Result<$nty, Error> {
                use Endian::*;
                let ret = match endian {
                    Little => $nty::from_le_bytes(buf[..$nl].try_into()?),
                    Big => $nty::from_be_bytes(buf[..$nl].try_into()?),
                };
                Ok(ret)
            }

            /// Decode up to `n` scalars of `$nl` bytes each from `buf`.
            fn convert_dim1(buf: &[u8], endian: Endian, n: usize) -> Result<Vec<$nty>, Error> {
                // BUGFIX: the chunk size must be the element width `$nl`.
                // The previous `chunks_exact(n.min($nl))` produced chunks
                // shorter than `$nl` whenever `n < $nl`, making `b2[..$nl]`
                // panic, and it never limited the output to `n` elements.
                let ret = buf
                    .chunks_exact($nl)
                    .take(n)
                    .map(|b2| {
                        use Endian::*;
                        match endian {
                            // Chunk length is exactly `$nl`, so the array
                            // conversion can not fail.
                            Little => $nty::from_le_bytes(b2.try_into().unwrap()),
                            Big => $nty::from_be_bytes(b2.try_into().unwrap()),
                        }
                    })
                    .collect();
                Ok(ret)
            }
        }
    };
}
// Implement byte decoding for every fixed-width numeric scalar type,
// passing each type together with its width in bytes.
impl_scalar_value_from_bytes!(u8, 1);
impl_scalar_value_from_bytes!(u16, 2);
impl_scalar_value_from_bytes!(u32, 4);
impl_scalar_value_from_bytes!(u64, 8);
impl_scalar_value_from_bytes!(i8, 1);
impl_scalar_value_from_bytes!(i16, 2);
impl_scalar_value_from_bytes!(i32, 4);
impl_scalar_value_from_bytes!(i64, 8);
impl_scalar_value_from_bytes!(f32, 4);
impl_scalar_value_from_bytes!(f64, 8);
impl ScalarValueFromBytes<String> for String {
    /// Interpret the raw payload as UTF-8 text (lossily), truncated to at
    /// most 255 bytes. Endianness is irrelevant for string payloads.
    fn convert(buf: &[u8], _endian: Endian) -> Result<String, Error> {
        let end = buf.len().min(255);
        Ok(String::from_utf8_lossy(&buf[..end]).into())
    }

    /// Same truncated text decode as `convert`, wrapped in a one-element
    /// vector; the declared element count is ignored.
    fn convert_dim1(buf: &[u8], _endian: Endian, _n: usize) -> Result<Vec<String>, Error> {
        let end = buf.len().min(255);
        Ok(vec![String::from_utf8_lossy(&buf[..end]).into()])
    }
}
impl ScalarValueFromBytes<bool> for bool {
    /// A scalar bool event is true iff the first payload byte is non-zero;
    /// an empty payload decodes as false.
    fn convert(buf: &[u8], _endian: Endian) -> Result<bool, Error> {
        Ok(buf.first().map_or(false, |&b| b != 0))
    }

    /// Decode at most `n` bools (and never more than the payload holds),
    /// one per byte, non-zero meaning true.
    fn convert_dim1(buf: &[u8], _endian: Endian, n: usize) -> Result<Vec<bool>, Error> {
        let count = buf.len().min(n);
        Ok(buf.iter().take(count).map(|&b| b != 0).collect())
    }
}
/// Decode one raw event (timestamp, pulse id, payload bytes) and append it
/// to the dynamically typed `events` container.
pub trait ValueFromBytes: Send {
    fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error>;
}

/// Scalar (dim-0) variant of [`ValueFromBytes`].
pub trait ValueDim0FromBytes {
    fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error>;
}

/// Waveform (dim-1) variant of [`ValueFromBytes`].
pub trait ValueDim1FromBytes {
    fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error>;
}
/// Scalar decoder: appends decoded values into an `EventsDim0<STY>`.
pub struct ValueDim0FromBytesImpl<STY>
where
    STY: ScalarOps,
{
    // Zero-sized marker; the scalar type is only used at compile time.
    _m1: PhantomData<STY>,
}

impl<STY> ValueDim0FromBytesImpl<STY>
where
    STY: ScalarOps + ScalarValueFromBytes<STY>,
{
    /// Construct as a boxed trait object for dynamic dispatch.
    fn boxed() -> Box<dyn ValueFromBytes> {
        Box::new(Self {
            _m1: Default::default(),
        })
    }
}
impl<STY> ValueDim0FromBytes for ValueDim0FromBytesImpl<STY>
where
STY: ScalarOps + ScalarValueFromBytes<STY>,
{
fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error> {
if let Some(evs) = events.as_any_mut().downcast_mut::<EventsDim0<STY>>() {
let v = <STY as ScalarValueFromBytes<STY>>::convert(buf, endian)?;
evs.values.push_back(v);
evs.tss.push_back(ts);
evs.pulses.push_back(pulse);
Ok(())
} else {
Err(Error::with_msg_no_trace("unexpected container"))
}
}
}
impl<STY> ValueFromBytes for ValueDim0FromBytesImpl<STY>
where
    STY: ScalarOps + ScalarValueFromBytes<STY>,
{
    // Forward to the dim-0 specific implementation.
    fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error> {
        ValueDim0FromBytes::convert(self, ts, pulse, buf, endian, events)
    }
}
/// Waveform decoder: appends decoded arrays into an `EventsDim1<STY>`.
pub struct ValueDim1FromBytesImpl<STY>
where
    STY: ScalarOps,
{
    // Expected to be Shape::Wave(n); checked at conversion time.
    shape: Shape,
    // Zero-sized marker; the scalar type is only used at compile time.
    _m1: PhantomData<STY>,
}

impl<STY> ValueDim1FromBytesImpl<STY>
where
    STY: ScalarOps + ScalarValueFromBytes<STY>,
{
    /// Construct as a boxed trait object for dynamic dispatch.
    fn boxed(shape: Shape) -> Box<dyn ValueFromBytes> {
        Box::new(Self {
            shape,
            _m1: Default::default(),
        })
    }
}
impl<STY> ValueFromBytes for ValueDim1FromBytesImpl<STY>
where
    STY: ScalarOps + ScalarValueFromBytes<STY>,
{
    // Forward to the dim-1 specific implementation.
    fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error> {
        ValueDim1FromBytes::convert(self, ts, pulse, buf, endian, events)
    }
}
impl<STY> ValueDim1FromBytes for ValueDim1FromBytesImpl<STY>
where
STY: ScalarOps + ScalarValueFromBytes<STY>,
{
fn convert(&self, ts: u64, pulse: u64, buf: &[u8], endian: Endian, events: &mut dyn Events) -> Result<(), Error> {
if let Some(evs) = events.as_any_mut().downcast_mut::<EventsDim1<STY>>() {
let n = if let Shape::Wave(n) = self.shape {
n
} else {
return Err(Error::with_msg_no_trace("ValueDim1FromBytesImpl bad shape"));
};
let v = <STY as ScalarValueFromBytes<STY>>::convert_dim1(buf, endian, n as _)?;
evs.values.push_back(v);
evs.tss.push_back(ts);
evs.pulses.push_back(pulse);
Ok(())
} else {
Err(Error::with_msg_no_trace("unexpected container"))
}
}
}
/// Select the byte decoder matching the channel's scalar type and shape.
///
/// Returns a boxed `ValueFromBytes`: a dim-0 decoder for `Shape::Scalar`, a
/// dim-1 decoder for `Shape::Wave`. `AggKind::EventBlobs` and `Shape::Image`
/// are not implemented here.
fn make_scalar_conv(
    scalar_type: &ScalarType,
    shape: &Shape,
    agg_kind: &AggKind,
) -> Result<Box<dyn ValueFromBytes>, Error> {
    let ret = match agg_kind {
        AggKind::EventBlobs => todo!("make_scalar_conv EventBlobs"),
        // All remaining agg kinds decode to plain typed events; binning
        // happens downstream, so they share the same decoders.
        AggKind::Plain | AggKind::DimXBinsN(_) | AggKind::DimXBins1 | AggKind::TimeWeightedScalar => match shape {
            Shape::Scalar => match scalar_type {
                ScalarType::U8 => ValueDim0FromBytesImpl::<u8>::boxed(),
                ScalarType::U16 => ValueDim0FromBytesImpl::<u16>::boxed(),
                ScalarType::U32 => ValueDim0FromBytesImpl::<u32>::boxed(),
                ScalarType::U64 => ValueDim0FromBytesImpl::<u64>::boxed(),
                ScalarType::I8 => ValueDim0FromBytesImpl::<i8>::boxed(),
                ScalarType::I16 => ValueDim0FromBytesImpl::<i16>::boxed(),
                ScalarType::I32 => ValueDim0FromBytesImpl::<i32>::boxed(),
                ScalarType::I64 => ValueDim0FromBytesImpl::<i64>::boxed(),
                ScalarType::F32 => ValueDim0FromBytesImpl::<f32>::boxed(),
                ScalarType::F64 => ValueDim0FromBytesImpl::<f64>::boxed(),
                ScalarType::BOOL => ValueDim0FromBytesImpl::<bool>::boxed(),
                ScalarType::STRING => ValueDim0FromBytesImpl::<String>::boxed(),
            },
            Shape::Wave(_) => {
                // The dim-1 decoder keeps the shape to know the element count.
                let shape = shape.clone();
                match scalar_type {
                    ScalarType::U8 => ValueDim1FromBytesImpl::<u8>::boxed(shape),
                    ScalarType::U16 => ValueDim1FromBytesImpl::<u16>::boxed(shape),
                    ScalarType::U32 => ValueDim1FromBytesImpl::<u32>::boxed(shape),
                    ScalarType::U64 => ValueDim1FromBytesImpl::<u64>::boxed(shape),
                    ScalarType::I8 => ValueDim1FromBytesImpl::<i8>::boxed(shape),
                    ScalarType::I16 => ValueDim1FromBytesImpl::<i16>::boxed(shape),
                    ScalarType::I32 => ValueDim1FromBytesImpl::<i32>::boxed(shape),
                    ScalarType::I64 => ValueDim1FromBytesImpl::<i64>::boxed(shape),
                    ScalarType::F32 => ValueDim1FromBytesImpl::<f32>::boxed(shape),
                    ScalarType::F64 => ValueDim1FromBytesImpl::<f64>::boxed(shape),
                    ScalarType::BOOL => ValueDim1FromBytesImpl::<bool>::boxed(shape),
                    ScalarType::STRING => ValueDim1FromBytesImpl::<String>::boxed(shape),
                }
            }
            Shape::Image(_, _) => todo!("make_scalar_conv Image"),
        },
    };
    Ok(ret)
}
pub trait EventValueFromBytes<NTY, END>
where
NTY: NumFromBytes<NTY, END>,
@@ -339,18 +606,102 @@ where
}
/// Stream adapter that decodes full event blobs into typed dyn `Events`
/// containers, batching events until a threshold before emitting.
pub struct EventsDynStream {
    scalar_type: ScalarType,
    shape: Shape,
    agg_kind: AggKind,
    // Upstream source of undecoded event blobs.
    events_full: EventChunkerMultifile,
    // Batch currently being filled; swapped for an empty one when emitted.
    events_out: Box<dyn Events>,
    // Decodes one raw event payload into `events_out`.
    scalar_conv: Box<dyn ValueFromBytes>,
    // Emit a batch once this many events are buffered.
    emit_threshold: usize,
    done: bool,
    complete: bool,
}
impl EventsDynStream {
pub fn new(events_full: EventChunkerMultifile) -> Self {
Self {
pub fn new(
scalar_type: ScalarType,
shape: Shape,
agg_kind: AggKind,
events_full: EventChunkerMultifile,
) -> Result<Self, Error> {
let st = &scalar_type;
let sh = &shape;
let ag = &agg_kind;
let events_out = items_2::empty_events_dyn_ev(st, sh, ag)?;
let scalar_conv = make_scalar_conv(st, sh, ag)?;
let emit_threshold = match &shape {
Shape::Scalar => 2048,
Shape::Wave(_) => 64,
Shape::Image(_, _) => 1,
};
let ret = Self {
scalar_type,
shape,
agg_kind,
events_full,
events_out,
scalar_conv,
emit_threshold,
done: false,
complete: false,
};
Ok(ret)
}
fn replace_events_out(&mut self) -> Result<Box<dyn Events>, Error> {
let st = &self.scalar_type;
let sh = &self.shape;
let ag = &self.agg_kind;
let empty = items_2::empty_events_dyn_ev(st, sh, ag)?;
let evs = mem::replace(&mut self.events_out, empty);
Ok(evs)
}
fn handle_event_full(&mut self, item: EventFull) -> Result<(), Error> {
use items::WithLen;
if item.len() >= self.emit_threshold {
info!("handle_event_full item len {}", item.len());
}
for (((buf, &be), &ts), &pulse) in item
.blobs
.iter()
.zip(item.be.iter())
.zip(item.tss.iter())
.zip(item.pulses.iter())
{
let endian = if be { Endian::Big } else { Endian::Little };
self.scalar_conv
.convert(ts, pulse, buf, endian, self.events_out.as_mut())?;
}
Ok(())
}
fn handle_stream_item(
&mut self,
item: StreamItem<RangeCompletableItem<EventFull>>,
) -> Result<Option<Sitemty<Box<dyn items_0::Events>>>, Error> {
let ret = match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))
}
RangeCompletableItem::Data(item) => match self.handle_event_full(item) {
Ok(()) => {
// TODO collect stats.
if self.events_out.len() >= self.emit_threshold {
let evs = self.replace_events_out()?;
Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(evs))))
} else {
None
}
}
Err(e) => Some(Err(e)),
},
},
StreamItem::Log(item) => Some(Ok(StreamItem::Log(item))),
StreamItem::Stats(item) => Some(Ok(StreamItem::Stats(item))),
};
Ok(ret)
}
}
@@ -367,16 +718,30 @@ impl Stream for EventsDynStream {
Ready(None)
} else {
match self.events_full.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
todo!()
}
Ready(Some(Ok(item))) => match self.handle_stream_item(item) {
Ok(Some(item)) => Ready(Some(item)),
Ok(None) => continue,
Err(e) => {
self.done = true;
Ready(Some(Err(e)))
}
},
Ready(Some(Err(e))) => {
self.done = true;
Ready(Some(Err(e)))
}
Ready(None) => {
self.done = true;
continue;
// Produce a last one even if it is empty.
match self.replace_events_out() {
Ok(item) => {
self.done = true;
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))
}
Err(e) => {
self.done = true;
Ready(Some(Err(e)))
}
}
}
Pending => Pending,
}

View File

@@ -3,6 +3,7 @@ pub mod agg;
pub mod aggtest;
pub mod binnedstream;
pub mod cache;
pub mod channelconfig;
pub mod dataopen;
pub mod decode;
pub mod eventblobs;
@@ -20,12 +21,17 @@ pub mod streamlog;
use bytes::Bytes;
use bytes::BytesMut;
use err::Error;
use futures_core::Stream;
use futures_util::future::FusedFuture;
use futures_util::{FutureExt, StreamExt, TryFutureExt};
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use futures_util::TryFutureExt;
use netpod::log::*;
use netpod::ChannelConfig;
use netpod::DiskIoTune;
use netpod::Node;
use netpod::ReadSys;
use netpod::{ChannelConfig, DiskIoTune, Node, Shape};
use netpod::Shape;
use std::collections::VecDeque;
use std::future::Future;
use std::io::SeekFrom;
@@ -37,12 +43,17 @@ use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
use streams::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE};
use streams::dtflags::ARRAY;
use streams::dtflags::BIG_ENDIAN;
use streams::dtflags::COMPRESSION;
use streams::dtflags::SHAPE;
use streams::filechunkread::FileChunkRead;
use tokio::fs::File;
use tokio::fs::OpenOptions;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;
use tokio::io::ReadBuf;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt};
use tokio::sync::mpsc;
// TODO transform this into a self-test or remove.
@@ -257,14 +268,14 @@ fn start_read5(
}
};
let mut pos = pos_beg;
info!("start_read5 BEGIN {disk_io_tune:?}");
info!("read5 begin {disk_io_tune:?}");
loop {
let mut buf = BytesMut::new();
buf.resize(1024 * 256, 0);
buf.resize(disk_io_tune.read_buffer_len, 0);
match tokio::time::timeout(Duration::from_millis(8000), file.read(&mut buf)).await {
Ok(Ok(n)) => {
if n == 0 {
info!("read5 EOF pos_beg {pos_beg} pos {pos} path {path:?}");
//info!("read5 EOF pos_beg {pos_beg} pos {pos} path {path:?}");
break;
}
pos += n as u64;
@@ -273,7 +284,7 @@ fn start_read5(
match tx.send(Ok(item)).await {
Ok(()) => {}
Err(_) => {
error!("broken channel");
//error!("broken channel");
break;
}
}
@@ -283,7 +294,7 @@ fn start_read5(
break;
}
Err(_) => {
error!("broken channel");
//error!("broken channel");
break;
}
},
@@ -294,7 +305,7 @@ fn start_read5(
match tx.send(Err(e)).await {
Ok(()) => {}
Err(_e) => {
error!("broken channel");
//error!("broken channel");
break;
}
}
@@ -302,6 +313,8 @@ fn start_read5(
}
}
}
let n = pos - pos_beg;
info!("read5 done {n}");
};
tokio::task::spawn(fut);
Ok(())

View File

@@ -1,16 +1,26 @@
use crate::dataopen::{open_expanded_files, open_files, OpenedFileSet};
use crate::dataopen::open_expanded_files;
use crate::dataopen::open_files;
use crate::dataopen::OpenedFileSet;
use crate::merge::MergedStream;
use err::Error;
use futures_core::Stream;
use futures_util::Stream;
use futures_util::StreamExt;
use items::eventfull::EventFull;
use items::{LogItem, RangeCompletableItem, Sitemty, StreamItem};
use items::LogItem;
use items::RangeCompletableItem;
use items::Sitemty;
use items::StreamItem;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{ChannelConfig, DiskIoTune, NanoRange, Node};
use netpod::ChannelConfig;
use netpod::DiskIoTune;
use netpod::NanoRange;
use netpod::Node;
use std::pin::Pin;
use std::task::{Context, Poll};
use streams::eventchunker::{EventChunker, EventChunkerConf};
use std::task::Context;
use std::task::Poll;
use streams::eventchunker::EventChunker;
use streams::eventchunker::EventChunkerConf;
use streams::rangefilter::RangeFilter;
/// Marker trait for input streams that yield full (undecoded) events.
pub trait InputTraits: Stream<Item = Sitemty<EventFull>> {}
@@ -97,7 +107,7 @@ impl Stream for EventChunkerMultifile {
Some(evs) => match evs.poll_next_unpin(cx) {
Ready(Some(k)) => {
if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(h))) = &k {
if let Some(&g) = h.tss.last() {
if let Some(&g) = h.tss.back() {
if g == self.max_ts {
let msg = format!("EventChunkerMultifile repeated ts {}", g);
error!("{}", msg);

View File

@@ -1,16 +1,24 @@
pub mod mergedblobsfromremotes;
use err::Error;
use futures_core::Stream;
use futures_util::Stream;
use futures_util::StreamExt;
use items::Appendable;
use items::ByteEstimate;
use items::{Appendable, LogItem, PushableIndex, RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithTimestamps};
use items::LogItem;
use items::PushableIndex;
use items::RangeCompletableItem;
use items::Sitemty;
use items::StatsItem;
use items::StreamItem;
use items::WithTimestamps;
use netpod::histo::HistoLog2;
use netpod::log::*;
use netpod::ByteSize;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::Context;
use std::task::Poll;
const LOG_EMIT_ITEM: bool = false;

View File

@@ -1,15 +1,18 @@
use crate::merge::MergedStream;
use err::Error;
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
use futures_util::pin_mut;
use futures_util::Stream;
use futures_util::StreamExt;
use items::eventfull::EventFull;
use items::Sitemty;
use netpod::log::*;
use netpod::query::PlainEventsQuery;
use netpod::{Cluster, PerfOpts};
use netpod::Cluster;
use netpod::PerfOpts;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::Context;
use std::task::Poll;
use streams::tcprawclient::x_processed_event_blobs_stream_from_node;
type T001<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;

View File

@@ -1,31 +1,16 @@
use crate::decode::BigEndian;
use crate::decode::Endianness;
use crate::decode::EventValueFromBytes;
use crate::decode::EventValueShape;
use crate::decode::EventValuesDim0Case;
use crate::decode::EventValuesDim1Case;
use crate::decode::EventsDecodedStream;
use crate::decode::LittleEndian;
use crate::decode::NumFromBytes;
use crate::eventblobs::EventChunkerMultifile;
use err::Error;
use futures_util::stream;
use futures_util::Stream;
use futures_util::StreamExt;
use items::eventfull::EventFull;
use items::numops::BoolNum;
use items::numops::NumOps;
use items::numops::StringNum;
use items::EventsNodeProcessor;
use items::Framable;
use items::RangeCompletableItem;
use items::Sitemty;
use items::StreamItem;
use items_2::channelevents::ChannelEvents;
use items_2::eventsdim0::EventsDim0;
use netpod::log::*;
use netpod::query::PlainEventsQuery;
use netpod::AggKind;
use netpod::ByteOrder;
use netpod::ByteSize;
use netpod::Channel;
use netpod::DiskIoTune;
@@ -37,167 +22,35 @@ use parse::channelconfig::extract_matching_config_entry;
use parse::channelconfig::read_local_config;
use parse::channelconfig::ConfigEntry;
use parse::channelconfig::MatchingConfigEntry;
use std::collections::VecDeque;
use std::pin::Pin;
use streams::eventchunker::EventChunkerConf;
fn make_num_pipeline_stream_evs<NTY, END, EVS, ENP>(
event_value_shape: EVS,
events_node_proc: ENP,
fn make_num_pipeline_stream_evs(
scalar_type: ScalarType,
shape: Shape,
agg_kind: AggKind,
event_blobs: EventChunkerMultifile,
) -> Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>
where
NTY: NumOps + NumFromBytes<NTY, END> + 'static,
END: Endianness + 'static,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Batch> + 'static,
Sitemty<<ENP as EventsNodeProcessor>::Output>: Framable + 'static,
<ENP as EventsNodeProcessor>::Output: 'static,
{
let decs = EventsDecodedStream::<NTY, END, EVS>::new(event_value_shape, event_blobs);
let s2 = StreamExt::map(decs, move |item| match item {
) -> Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>> {
let event_stream = match crate::decode::EventsDynStream::new(scalar_type, shape, agg_kind, event_blobs) {
Ok(k) => k,
Err(e) => {
return Box::pin(stream::iter([Err(e)]));
}
};
let stream = event_stream.map(|item| match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::Data(item) => {
// TODO fix super ugly slow glue code
use items::EventsNodeProcessorOutput;
let mut item = events_node_proc.process(item);
if let Some(item) = item
.as_any_mut()
.downcast_mut::<items::scalarevents::ScalarEvents<NTY>>()
{
trace!("ScalarEvents");
let tss: VecDeque<u64> = item.tss.iter().map(|x| *x).collect();
let pulses: VecDeque<u64> = item.pulses.iter().map(|x| *x).collect();
let values: VecDeque<NTY> = item.values.iter().map(|x| x.clone()).collect();
let item = EventsDim0 { tss, pulses, values };
let item = ChannelEvents::Events(Box::new(item));
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
} else if let Some(item) = item.as_any_mut().downcast_mut::<items::waveevents::WaveEvents<NTY>>() {
trace!("WaveEvents");
let _tss: VecDeque<u64> = item.tss.iter().map(|x| *x).collect();
let _pulses: VecDeque<u64> = item.pulses.iter().map(|x| *x).collect();
let _values: VecDeque<Vec<NTY>> = item.vals.iter().map(|x| x.clone()).collect();
//let item = EventsDim1 { tss, pulses, values };
//let item = ChannelEvents::Events(Box::new(item));
//Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
} else if let Some(item) = item
.as_any_mut()
.downcast_mut::<items::xbinnedscalarevents::XBinnedScalarEvents<NTY>>()
{
trace!("XBinnedScalarEvents");
let tss: VecDeque<u64> = item.tss.iter().map(|x| *x).collect();
let pulses: VecDeque<u64> = (0..tss.len()).map(|_| 0).collect();
let _avgs: VecDeque<f32> = item.avgs.iter().map(|x| x.clone()).collect();
let mins: VecDeque<NTY> = item.mins.iter().map(|x| x.clone()).collect();
let _maxs: VecDeque<NTY> = item.maxs.iter().map(|x| x.clone()).collect();
let item = EventsDim0 {
tss,
pulses,
values: mins,
};
let item = ChannelEvents::Events(Box::new(item));
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
} else {
error!("TODO bad, no idea what this item is\n\n{:?}\n\n", item);
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
}
}
RangeCompletableItem::RangeComplete => Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)),
RangeCompletableItem::Data(item) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(
ChannelEvents::Events(item),
))),
},
StreamItem::Log(item) => Ok(StreamItem::Log(item)),
StreamItem::Stats(item) => Ok(StreamItem::Stats(item)),
StreamItem::Log(k) => Ok(StreamItem::Log(k)),
StreamItem::Stats(k) => Ok(StreamItem::Stats(k)),
},
Err(e) => Err(e),
});
Box::pin(s2)
}
macro_rules! pipe4 {
($nty:ident, $end:ident, $shape:expr, $evs:ident, $evsv:expr, $agg_kind:expr, $event_blobs:expr) => {
match $agg_kind {
AggKind::EventBlobs => panic!(),
AggKind::TimeWeightedScalar | AggKind::DimXBins1 => {
make_num_pipeline_stream_evs::<$nty, $end, $evs<$nty>, _>(
$evsv,
<$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToSingleBin::create($shape, $agg_kind),
$event_blobs,
)
}
AggKind::DimXBinsN(_) => make_num_pipeline_stream_evs::<$nty, $end, $evs<$nty>, _>(
$evsv,
<$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToNBins::create($shape, $agg_kind),
$event_blobs,
),
AggKind::Plain => make_num_pipeline_stream_evs::<$nty, $end, $evs<$nty>, _>(
$evsv,
<$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggPlain::create($shape, $agg_kind),
$event_blobs,
),
}
};
}
macro_rules! pipe3 {
($nty:ident, $end:ident, $shape:expr, $agg_kind:expr, $event_blobs:expr) => {
match $shape {
Shape::Scalar => {
pipe4!(
$nty,
$end,
$shape,
EventValuesDim0Case,
EventValuesDim0Case::<$nty>::new(),
$agg_kind,
$event_blobs
)
}
Shape::Wave(n) => {
pipe4!(
$nty,
$end,
$shape,
EventValuesDim1Case,
EventValuesDim1Case::<$nty>::new(n),
$agg_kind,
$event_blobs
)
}
Shape::Image(_, _) => {
// TODO not needed for python data api v3 protocol, but later for api4.
err::todoval()
}
}
};
}
macro_rules! pipe2 {
($nty:ident, $end:expr, $shape:expr, $agg_kind:expr, $event_blobs:expr) => {
match $end {
ByteOrder::Little => pipe3!($nty, LittleEndian, $shape, $agg_kind, $event_blobs),
ByteOrder::Big => pipe3!($nty, BigEndian, $shape, $agg_kind, $event_blobs),
}
};
}
macro_rules! pipe1 {
($nty:expr, $end:expr, $shape:expr, $agg_kind:expr, $event_blobs:expr) => {
match $nty {
ScalarType::U8 => pipe2!(u8, $end, $shape, $agg_kind, $event_blobs),
ScalarType::U16 => pipe2!(u16, $end, $shape, $agg_kind, $event_blobs),
ScalarType::U32 => pipe2!(u32, $end, $shape, $agg_kind, $event_blobs),
ScalarType::U64 => pipe2!(u64, $end, $shape, $agg_kind, $event_blobs),
ScalarType::I8 => pipe2!(i8, $end, $shape, $agg_kind, $event_blobs),
ScalarType::I16 => pipe2!(i16, $end, $shape, $agg_kind, $event_blobs),
ScalarType::I32 => pipe2!(i32, $end, $shape, $agg_kind, $event_blobs),
ScalarType::I64 => pipe2!(i64, $end, $shape, $agg_kind, $event_blobs),
ScalarType::F32 => pipe2!(f32, $end, $shape, $agg_kind, $event_blobs),
ScalarType::F64 => pipe2!(f64, $end, $shape, $agg_kind, $event_blobs),
ScalarType::BOOL => pipe2!(BoolNum, $end, $shape, $agg_kind, $event_blobs),
ScalarType::STRING => pipe2!(StringNum, $end, $shape, $agg_kind, $event_blobs),
}
};
Box::pin(stream)
}
pub async fn make_event_pipe(
@@ -262,12 +115,11 @@ pub async fn make_event_pipe(
true,
);
let shape = entry.to_shape()?;
let pipe = pipe1!(
entry.scalar_type,
entry.byte_order,
shape,
let pipe = make_num_pipeline_stream_evs(
entry.scalar_type.clone(),
shape.clone(),
evq.agg_kind().clone(),
event_blobs
event_blobs,
);
Ok(pipe)
}