Move workspace crates into subfolder

This commit is contained in:
Dominik Werder
2023-07-10 14:45:25 +02:00
parent 8938e55f86
commit 30c7fcb1e5
212 changed files with 246 additions and 41 deletions

View File

@@ -0,0 +1,59 @@
use futures_util::stream::StreamExt;
use futures_util::Stream;
use items_0::on_sitemty_data;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::transform::EventStreamBox;
use items_0::transform::TransformProperties;
use items_0::transform::WithTransformProperties;
use items_0::Events;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
/// Adapter that erases the concrete event container type of an input stream:
/// items come in as `Sitemty<T>` and leave as `Sitemty<Box<dyn Events>>`.
pub struct IntoBoxedEventStream<INP, T>
where
    T: Events,
    INP: Stream<Item = Sitemty<T>> + WithTransformProperties,
{
    //inp: Pin<Box<dyn Stream<Item = Sitemty<T>>>>,
    // Boxed and pinned so `poll_next_unpin` works without requiring `INP: Unpin`.
    inp: Pin<Box<INP>>,
}
impl<INP, T> Stream for IntoBoxedEventStream<INP, T>
where
    T: Events,
    INP: Stream<Item = Sitemty<T>> + WithTransformProperties,
{
    type Item = Sitemty<Box<dyn Events>>;

    /// Forwards items from the inner stream, boxing the payload of data items
    /// as `Box<dyn Events>`; log, stats, range-complete and error items pass
    /// through unchanged.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        match self.inp.poll_next_unpin(cx) {
            Poll::Ready(Some(item)) => {
                let mapped = item.map(|si| match si {
                    StreamItem::DataItem(RangeCompletableItem::Data(ev)) => {
                        StreamItem::DataItem(RangeCompletableItem::Data(Box::new(ev) as Box<dyn Events>))
                    }
                    StreamItem::DataItem(RangeCompletableItem::RangeComplete) => {
                        StreamItem::DataItem(RangeCompletableItem::RangeComplete)
                    }
                    StreamItem::Log(x) => StreamItem::Log(x),
                    StreamItem::Stats(x) => StreamItem::Stats(x),
                });
                Poll::Ready(Some(mapped))
            }
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}
impl<INP, T> WithTransformProperties for IntoBoxedEventStream<INP, T>
where
    T: Events,
    INP: Stream<Item = Sitemty<T>> + WithTransformProperties,
{
    /// Delegates the transform-property query to the wrapped input stream.
    fn query_transform_properties(&self) -> TransformProperties {
        self.inp.query_transform_properties()
    }
}

View File

@@ -0,0 +1,317 @@
use err::Error;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::collect_s::Collectable;
use items_0::collect_s::Collected;
use items_0::collect_s::Collector;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StatsItem;
use items_0::streamitem::StreamItem;
use items_0::transform::CollectableStreamBox;
use items_0::transform::CollectableStreamTrait;
use items_0::transform::EventStreamTrait;
use items_0::WithLen;
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
use netpod::BinnedRangeEnum;
use netpod::DiskStats;
use std::fmt;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
use tracing::Instrument;
// Local tracing toggles: an invocation written with a leading `D` (e.g.
// `trace2!(D"...")`) matches the first arm and expands to nothing, i.e. that
// call site is disabled; plain invocations print via `eprintln!`.
#[allow(unused)]
macro_rules! trace2 {
    (D$($arg:tt)*) => ();
    ($($arg:tt)*) => (eprintln!($($arg)*));
}
#[allow(unused)]
macro_rules! trace3 {
    (D$($arg:tt)*) => ();
    ($($arg:tt)*) => (eprintln!($($arg)*));
}
#[allow(unused)]
macro_rules! trace4 {
    (D$($arg:tt)*) => ();
    ($($arg:tt)*) => (eprintln!($($arg)*));
}
/// Future that drains a stream of collectable items into a single
/// [`Collected`] result, bounded by a deadline and an event-count limit.
pub struct Collect {
    inp: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Collectable>>> + Send>>,
    // Soft limit on collected events; currently only warned about when reached.
    events_max: u64,
    // Passed to the collector when producing the final result.
    range: Option<SeriesRange>,
    binrange: Option<BinnedRangeEnum>,
    // Created lazily from the first data item.
    collector: Option<Box<dyn Collector>>,
    // Set when the input reported RangeComplete.
    range_final: bool,
    // Set when the deadline timer fired before the input ended.
    timeout: bool,
    // Deadline timer; polled with priority over the input.
    timer: Pin<Box<dyn Future<Output = ()> + Send>>,
    done_input: bool,
}
impl Collect {
    /// Creates a collect future over `inp`.
    ///
    /// `deadline` bounds the total wall-clock time, `events_max` bounds the
    /// number of collected events, and `range`/`binrange` are handed to the
    /// collector when the final result is produced.
    pub fn new(
        inp: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Collectable>>> + Send>>,
        deadline: Instant,
        events_max: u64,
        range: Option<SeriesRange>,
        binrange: Option<BinnedRangeEnum>,
    ) -> Self {
        let timer = tokio::time::sleep_until(deadline.into());
        Self {
            inp,
            events_max,
            range,
            binrange,
            collector: None,
            range_final: false,
            timeout: false,
            timer: Box::pin(timer),
            done_input: false,
        }
    }

    /// Feeds one stream item into the collector state.
    ///
    /// The collector is created lazily from the first data item. Log and stats
    /// items are only traced; stream errors are returned to the caller.
    fn handle_item(&mut self, item: Sitemty<Box<dyn Collectable>>) -> Result<(), Error> {
        match item {
            Ok(item) => match item {
                StreamItem::DataItem(item) => match item {
                    RangeCompletableItem::RangeComplete => {
                        self.range_final = true;
                        if let Some(coll) = self.collector.as_mut() {
                            coll.set_range_complete();
                        } else {
                            warn!("collect received RangeComplete but no collector yet");
                        }
                        Ok(())
                    }
                    RangeCompletableItem::Data(mut item) => {
                        trace!("collect sees len {}", item.len());
                        let coll = self.collector.get_or_insert_with(|| item.new_collector());
                        coll.ingest(&mut item);
                        if coll.len() as u64 >= self.events_max {
                            // NOTE(review): unlike `collect_in_span`, ingestion does
                            // not stop here once events_max is reached.
                            warn!(
                                "TODO compute continue-at reached events_max {} abort",
                                self.events_max
                            );
                        }
                        Ok(())
                    }
                },
                StreamItem::Log(item) => {
                    trace!("collect log {:?}", item);
                    Ok(())
                }
                StreamItem::Stats(item) => {
                    trace!("collect stats {:?}", item);
                    match item {
                        // TODO factor and simplify the stats collection:
                        StatsItem::EventDataReadStats(_) => {}
                        StatsItem::RangeFilterStats(_) => {}
                        // Bindings renamed to `_k` to silence unused-variable
                        // warnings while keeping the commented accounting around.
                        StatsItem::DiskStats(item) => match item {
                            DiskStats::OpenStats(_k) => {
                                //total_duration += k.duration;
                            }
                            DiskStats::SeekStats(_k) => {
                                //total_duration += k.duration;
                            }
                            DiskStats::ReadStats(_k) => {
                                //total_duration += k.duration;
                            }
                            DiskStats::ReadExactStats(_k) => {
                                //total_duration += k.duration;
                            }
                        },
                    }
                    Ok(())
                }
            },
            Err(e) => {
                // TODO Need to use some flags to get good enough error message for remote user.
                Err(e)
            }
        }
    }
}
impl Future for Collect {
type Output = Result<Box<dyn Collected>, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
let span = tracing::span!(Level::INFO, "Collect");
let _spg = span.enter();
loop {
break if self.done_input {
if self.timeout {
if let Some(coll) = self.collector.as_mut() {
coll.set_timed_out();
} else {
warn!("collect timeout but no collector yet");
}
}
// TODO use range_final and timeout in result.
match self.collector.take() {
Some(mut coll) => match coll.result(self.range.clone(), self.binrange.clone()) {
Ok(res) => {
//info!("collect stats total duration: {:?}", total_duration);
Ready(Ok(res))
}
Err(e) => Ready(Err(e)),
},
None => {
let e = Error::with_msg_no_trace(format!("no result because no collector was created"));
error!("{e}");
Ready(Err(e))
}
}
} else {
match self.timer.poll_unpin(cx) {
Ready(()) => {
self.timeout = true;
self.done_input = true;
continue;
}
Pending => match self.inp.poll_next_unpin(cx) {
Ready(Some(item)) => match self.handle_item(item) {
Ok(()) => {
continue;
}
Err(e) => {
error!("{e}");
Ready(Err(e))
}
},
Ready(None) => {
self.done_input = true;
continue;
}
Pending => Pending,
},
}
};
}
}
}
async fn collect_in_span<T, S>(
stream: S,
deadline: Instant,
events_max: u64,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn Collected>, Error>
where
S: Stream<Item = Sitemty<T>> + Unpin,
T: Collectable,
{
info!("collect events_max {events_max} deadline {deadline:?}");
let mut collector: Option<Box<dyn Collector>> = None;
let mut stream = stream;
let deadline = deadline.into();
let mut range_complete = false;
let mut timed_out = false;
let mut total_duration = Duration::ZERO;
loop {
let item = match tokio::time::timeout_at(deadline, stream.next()).await {
Ok(Some(k)) => k,
Ok(None) => break,
Err(_e) => {
warn!("collect timeout");
timed_out = true;
if let Some(coll) = collector.as_mut() {
coll.set_timed_out();
} else {
warn!("collect timeout but no collector yet");
}
break;
}
};
match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
range_complete = true;
if let Some(coll) = collector.as_mut() {
coll.set_range_complete();
} else {
warn!("collect received RangeComplete but no collector yet");
}
}
RangeCompletableItem::Data(mut item) => {
trace!("collect sees len {}", item.len());
if collector.is_none() {
let c = item.new_collector();
collector = Some(c);
}
let coll = collector.as_mut().unwrap();
coll.ingest(&mut item);
if coll.len() as u64 >= events_max {
warn!("TODO compute continue-at reached events_max {} abort", events_max);
break;
}
}
},
StreamItem::Log(item) => {
trace!("collect log {:?}", item);
}
StreamItem::Stats(item) => {
trace!("collect stats {:?}", item);
match item {
// TODO factor and simplify the stats collection:
StatsItem::EventDataReadStats(_) => {}
StatsItem::RangeFilterStats(_) => {}
StatsItem::DiskStats(item) => match item {
DiskStats::OpenStats(k) => {
total_duration += k.duration;
}
DiskStats::SeekStats(k) => {
total_duration += k.duration;
}
DiskStats::ReadStats(k) => {
total_duration += k.duration;
}
DiskStats::ReadExactStats(k) => {
total_duration += k.duration;
}
},
}
}
},
Err(e) => {
// TODO Need to use some flags to get good enough error message for remote user.
return Err(e);
}
}
}
let _ = range_complete;
let _ = timed_out;
let res = collector
.ok_or_else(|| Error::with_msg_no_trace(format!("no result because no collector was created")))?
.result(range, binrange)?;
info!("collect stats total duration: {:?}", total_duration);
Ok(res)
}
/// Public entry point: runs [`collect_in_span`] inside a dedicated tracing
/// span so all of its log output is tagged with `collect`.
pub async fn collect<T, S>(
    stream: S,
    deadline: Instant,
    events_max: u64,
    range: Option<SeriesRange>,
    binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn Collected>, Error>
where
    S: Stream<Item = Sitemty<T>> + Unpin,
    T: Collectable + WithLen + fmt::Debug,
{
    let fut = collect_in_span(stream, deadline, events_max, range, binrange);
    fut.instrument(span!(Level::INFO, "collect")).await
}

View File

@@ -0,0 +1,4 @@
// Single-bit flags occupying the high nibble of a u8; combined by bitwise OR.
// NOTE(review): presumably these describe the on-disk event encoding (the
// "dtflags" byte) — confirm against the writer/parser side.
pub const COMPRESSION: u8 = 0x80;
pub const ARRAY: u8 = 0x40;
pub const BIG_ENDIAN: u8 = 0x20;
pub const SHAPE: u8 = 0x10;

View File

@@ -0,0 +1,55 @@
use bytes::BytesMut;
use std::fmt;
use std::time::Duration;
pub struct FileChunkRead {
buf: BytesMut,
duration: Duration,
}
impl FileChunkRead {
pub fn with_buf(buf: BytesMut) -> Self {
Self {
buf,
duration: Duration::from_millis(0),
}
}
pub fn with_buf_dur(buf: BytesMut, duration: Duration) -> Self {
Self { buf, duration }
}
pub fn into_buf(self) -> BytesMut {
self.buf
}
pub fn buf(&self) -> &BytesMut {
&self.buf
}
pub fn buf_mut(&mut self) -> &mut BytesMut {
&mut self.buf
}
pub fn buf_take(&mut self) -> BytesMut {
core::mem::replace(&mut self.buf, BytesMut::new())
}
pub fn duration(&self) -> &Duration {
&self.duration
}
pub fn duration_mut(&mut self) -> &mut Duration {
&mut self.duration
}
}
impl fmt::Debug for FileChunkRead {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("FileChunkRead")
.field("buf.len", &self.buf.len())
.field("buf.cap", &self.buf.capacity())
.field("duration", &self.duration)
.finish()
}
}

View File

@@ -0,0 +1,2 @@
//! Length-delimited frame transport: parsing a byte stream into in-memory
//! frames (`inmem`) and decoding frame streams back into typed event items
//! (`eventsfromframes`).
pub mod eventsfromframes;
pub mod inmem;

View File

@@ -0,0 +1,99 @@
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::framable::FrameTypeInnerStatic;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_2::frame::decode_frame;
use items_2::inmem::InMemoryFrame;
use netpod::log::*;
use serde::de::DeserializeOwned;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
/// Decodes a stream of [`InMemoryFrame`]s into typed stream items.
///
/// `O` is the type each data frame's payload is deserialized into.
pub struct EventsFromFrames<O> {
    inp: Pin<Box<dyn Stream<Item = Result<StreamItem<InMemoryFrame>, Error>> + Send>>,
    // Human-readable tag recorded on the tracing span for log correlation.
    dbgdesc: String,
    // Set after an error was emitted; the stream then terminates.
    errored: bool,
    // Set after the final `None`; polling again afterwards is a caller bug.
    completed: bool,
    _m1: PhantomData<O>,
}
impl<O> EventsFromFrames<O> {
    /// Creates a decoder over `inp`; `dbgdesc` tags the tracing span for log output.
    pub fn new(
        inp: Pin<Box<dyn Stream<Item = Result<StreamItem<InMemoryFrame>, Error>> + Send>>,
        dbgdesc: String,
    ) -> Self {
        Self {
            errored: false,
            completed: false,
            _m1: PhantomData,
            inp,
            dbgdesc,
        }
    }
}
impl<O> Stream for EventsFromFrames<O>
where
    O: FrameTypeInnerStatic + DeserializeOwned + Unpin,
{
    type Item = Sitemty<O>;

    /// Decodes each incoming data frame into a `Sitemty<O>`; log and stats
    /// items from the transport layer are forwarded as-is. After the first
    /// error the stream fuses (emits the error, then `None`); polling after
    /// completion panics.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        let span = span!(Level::INFO, "EvFrFr", id = tracing::field::Empty);
        span.record("id", &self.dbgdesc);
        let _spg = span.enter();
        loop {
            // `break <value>` leaves the loop with the poll result.
            break if self.completed {
                panic!("poll_next on completed");
            } else if self.errored {
                self.completed = true;
                Ready(None)
            } else {
                match self.inp.poll_next_unpin(cx) {
                    Ready(Some(Ok(item))) => match item {
                        StreamItem::Log(item) => {
                            //info!("{} {:?} {}", item.node_ix, item.level, item.msg);
                            Ready(Some(Ok(StreamItem::Log(item))))
                        }
                        StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
                        // A data frame carries a serialized `Sitemty<O>`; decoding can
                        // fail either at the frame level or inside the decoded item.
                        StreamItem::DataItem(frame) => match decode_frame::<Sitemty<O>>(&frame) {
                            Ok(item) => match item {
                                Ok(item) => match item {
                                    StreamItem::Log(k) => {
                                        //info!("rcvd log: {} {:?} {}", k.node_ix, k.level, k.msg);
                                        Ready(Some(Ok(StreamItem::Log(k))))
                                    }
                                    item => Ready(Some(Ok(item))),
                                },
                                // The decoded item itself carries an error from the remote side.
                                Err(e) => {
                                    error!("rcvd err: {}", e);
                                    self.errored = true;
                                    Ready(Some(Err(e)))
                                }
                            },
                            // The frame payload could not be decoded at all.
                            Err(e) => {
                                error!("frame payload len {} tyid {} {}", frame.buf().len(), frame.tyid(), e);
                                self.errored = true;
                                Ready(Some(Err(e)))
                            }
                        },
                    },
                    Ready(Some(Err(e))) => {
                        self.errored = true;
                        Ready(Some(Err(e)))
                    }
                    Ready(None) => {
                        self.completed = true;
                        Ready(None)
                    }
                    Pending => Pending,
                }
            };
        }
    }
}

View File

@@ -0,0 +1,219 @@
use crate::slidebuf::SlideBuf;
use bytes::Bytes;
use err::Error;
use futures_util::pin_mut;
use futures_util::Stream;
use items_0::streamitem::StreamItem;
use items_0::streamitem::TERM_FRAME_TYPE_ID;
use items_2::framable::INMEM_FRAME_FOOT;
use items_2::framable::INMEM_FRAME_HEAD;
use items_2::framable::INMEM_FRAME_MAGIC;
use items_2::inmem::InMemoryFrame;
use netpod::log::*;
use netpod::ByteSize;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncRead;
use tokio::io::ReadBuf;
// Compile-time tracing toggle. Both arms have the same pattern, so the first
// arm always wins and every invocation expands to nothing (tracing disabled).
// Delete the first arm to route invocations to `trace!` instead.
#[allow(unused)]
macro_rules! trace2 {
    ($($arg:tt)*) => ();
    ($($arg:tt)*) => (trace!($($arg)*));
}
impl err::ToErr for crate::slidebuf::Error {
fn to_err(self) -> Error {
Error::with_msg_no_trace(format!("{self}"))
}
}
/// Interprets a byte stream as length-delimited frames.
///
/// Emits each frame as a single item. Therefore, each item must fit easily into memory.
pub struct InMemoryFrameAsyncReadStream<T>
where
    T: AsyncRead + Unpin,
{
    inp: T,
    // Sliding window over bytes read so far but not yet parsed into frames.
    buf: SlideBuf,
    // Buffered byte count required before the next parse attempt: the header
    // size initially, then the full frame length once a header has been seen.
    need_min: usize,
    done: bool,
    complete: bool,
    // Total bytes consumed from `inp`, used in diagnostics.
    inp_bytes_consumed: u64,
}
impl<T> InMemoryFrameAsyncReadStream<T>
where
    T: AsyncRead + Unpin,
{
    /// Full type name, used in panic and log messages.
    pub fn type_name() -> &'static str {
        std::any::type_name::<Self>()
    }

    /// Creates a parser over `inp` with a slide buffer of `bufcap` bytes.
    pub fn new(inp: T, bufcap: ByteSize) -> Self {
        Self {
            inp,
            buf: SlideBuf::new(bufcap.bytes() as usize),
            need_min: INMEM_FRAME_HEAD,
            done: false,
            complete: false,
            inp_bytes_consumed: 0,
        }
    }

    /// Reads more bytes from the underlying reader into the slide buffer.
    ///
    /// Requests at most the bytes still missing to reach `need_min`. Resolves
    /// to the number of bytes read; `Ok(0)` indicates EOF.
    fn poll_upstream(&mut self, cx: &mut Context) -> Poll<Result<usize, Error>> {
        trace2!("poll_upstream");
        use Poll::*;
        let mut buf = ReadBuf::new(self.buf.available_writable_area(self.need_min - self.buf.len())?);
        let inp = &mut self.inp;
        pin_mut!(inp);
        match AsyncRead::poll_read(inp, cx, &mut buf) {
            Ready(Ok(())) => {
                let n = buf.filled().len();
                // Advance the buffer's write cursor past the bytes just read.
                self.buf.wadv(n)?;
                trace2!("recv bytes {}", n);
                Ready(Ok(n))
            }
            Ready(Err(e)) => Ready(Err(e.into())),
            Pending => Pending,
        }
    }

    // Try to consume bytes to parse a frame.
    // Update the need_min to the most current state.
    // Must only be called when at least `need_min` bytes are available.
    //
    // The header (INMEM_FRAME_HEAD bytes) begins with five little-endian u32
    // fields: magic, encid, tyid, payload length, payload crc32. The payload
    // follows, then a footer starting with a crc32 over header + payload.
    fn parse(&mut self) -> Result<Option<InMemoryFrame>, Error> {
        let buf = self.buf.data();
        if buf.len() < self.need_min {
            return Err(Error::with_msg_no_trace("expect at least need_min"));
        }
        if buf.len() < INMEM_FRAME_HEAD {
            return Err(Error::with_msg_no_trace("expect at least enough bytes for the header"));
        }
        let magic = u32::from_le_bytes(buf[0..4].try_into()?);
        let encid = u32::from_le_bytes(buf[4..8].try_into()?);
        let tyid = u32::from_le_bytes(buf[8..12].try_into()?);
        let len = u32::from_le_bytes(buf[12..16].try_into()?);
        let payload_crc_exp = u32::from_le_bytes(buf[16..20].try_into()?);
        if magic != INMEM_FRAME_MAGIC {
            // Include a prefix of the buffer as utf8 to help identify what was sent.
            let n = buf.len().min(64);
            let u = String::from_utf8_lossy(&buf[0..n]);
            let msg = format!(
                "InMemoryFrameAsyncReadStream tryparse incorrect magic: {} buf as utf8: {:?}",
                magic, u
            );
            error!("{msg}");
            return Err(Error::with_msg(msg));
        }
        // Sanity bound (50 MB) on the declared payload length to catch corrupt streams.
        if len > 1024 * 1024 * 50 {
            let msg = format!(
                "InMemoryFrameAsyncReadStream tryparse huge buffer len {} self.inp_bytes_consumed {}",
                len, self.inp_bytes_consumed
            );
            error!("{msg}");
            return Err(Error::with_msg(msg));
        }
        let lentot = INMEM_FRAME_HEAD + INMEM_FRAME_FOOT + len as usize;
        if buf.len() < lentot {
            // Not a complete frame yet: require the whole frame before retrying.
            // TODO count cases in production
            self.need_min = lentot;
            return Ok(None);
        }
        let p1 = INMEM_FRAME_HEAD + len as usize;
        // Verify both the payload crc (declared in the header) and the
        // whole-frame crc (stored at the start of the footer).
        let mut h = crc32fast::Hasher::new();
        h.update(&buf[..p1]);
        let frame_crc = h.finalize();
        let mut h = crc32fast::Hasher::new();
        h.update(&buf[INMEM_FRAME_HEAD..p1]);
        let payload_crc = h.finalize();
        let frame_crc_ind = u32::from_le_bytes(buf[p1..p1 + 4].try_into()?);
        let payload_crc_match = payload_crc_exp == payload_crc;
        let frame_crc_match = frame_crc_ind == frame_crc;
        if !frame_crc_match || !payload_crc_match {
            let _ss = String::from_utf8_lossy(&buf[..buf.len().min(256)]);
            let msg = format!(
                "InMemoryFrameAsyncReadStream tryparse crc mismatch A {} {}",
                payload_crc_match, frame_crc_match,
            );
            error!("{msg}");
            let e = Error::with_msg_no_trace(msg);
            return Err(e);
        }
        self.inp_bytes_consumed += lentot as u64;
        // TODO metrics
        //trace!("parsed frame well len {}", len);
        let ret = InMemoryFrame {
            len,
            tyid,
            encid,
            buf: Bytes::from(buf[INMEM_FRAME_HEAD..p1].to_vec()),
        };
        // Drop the consumed frame from the buffer and reset for the next header.
        self.buf.adv(lentot)?;
        self.need_min = INMEM_FRAME_HEAD;
        Ok(Some(ret))
    }
}
impl<T> Stream for InMemoryFrameAsyncReadStream<T>
where
    T: AsyncRead + Unpin,
{
    type Item = Result<StreamItem<InMemoryFrame>, Error>;

    /// Emits one parsed frame per item. A terminal frame (`TERM_FRAME_TYPE_ID`)
    /// or upstream EOF ends the stream; after an error the stream fuses, and
    /// polling a completed stream panics.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        let span = span!(Level::INFO, "InMemRd");
        let _spanguard = span.enter();
        loop {
            break if self.complete {
                panic!("{} poll_next on complete", Self::type_name())
            } else if self.done {
                self.complete = true;
                Ready(None)
            } else if self.buf.len() >= self.need_min {
                match self.parse() {
                    Ok(None) => {
                        // `parse` raised `need_min`; guard against a parser that
                        // makes no progress even though enough bytes are buffered.
                        if self.buf.len() >= self.need_min {
                            self.done = true;
                            let e = Error::with_msg_no_trace("enough bytes but nothing parsed");
                            Ready(Some(Err(e)))
                        } else {
                            continue;
                        }
                    }
                    Ok(Some(item)) => {
                        if item.tyid() == TERM_FRAME_TYPE_ID {
                            // Terminal frame: consume it silently and finish.
                            self.done = true;
                            continue;
                        } else {
                            Ready(Some(Ok(StreamItem::DataItem(item))))
                        }
                    }
                    Err(e) => {
                        self.done = true;
                        Ready(Some(Err(e)))
                    }
                }
            } else {
                // Not enough buffered bytes: try to read more from upstream.
                match self.poll_upstream(cx) {
                    Ready(Ok(n1)) => {
                        if n1 == 0 {
                            // Zero-byte read means EOF on the underlying reader.
                            self.done = true;
                            continue;
                        } else {
                            continue;
                        }
                    }
                    Ready(Err(e)) => {
                        error!("poll_upstream need_min {} buf {:?} {:?}", self.need_min, self.buf, e);
                        self.done = true;
                        Ready(Some(Err(e)))
                    }
                    Pending => Pending,
                }
            };
        }
    }
}

View File

@@ -0,0 +1,317 @@
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use items_0::container::ByteEstimate;
use items_0::streamitem::sitem_data;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::Appendable;
use items_0::Empty;
use items_0::WithLen;
use items_2::channelevents::ChannelEvents;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim1::EventsDim1;
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
use netpod::timeunits::DAY;
use netpod::timeunits::MS;
use std::f64::consts::PI;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
/// Test-data generator: i32 events on a fixed `MS * 1000` interval grid,
/// distributed across nodes (each node emits every `node_count`-th grid slot).
pub struct GenerateI32V00 {
    // Timestamp of the next event to emit (ns).
    ts: u64,
    // Step between consecutive events of THIS node: interval * node_count (ns).
    dts: u64,
    // End of the requested range; reaching it finishes the stream.
    tsend: u64,
    #[allow(unused)]
    c1: u64,
    // Sleep future used when `do_throttle` is enabled.
    timeout: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
    do_throttle: bool,
    done: bool,
    done_range_final: bool,
}
impl GenerateI32V00 {
    /// Creates a generator for `node_ix` of `node_count` nodes over `range`.
    ///
    /// Events sit on a global grid of `ivl = MS * 1000`; each node starts at
    /// its own offset and advances by `ivl * node_count`. With
    /// `one_before_range` the start is shifted one slot back so one event
    /// before the range begin is emitted.
    ///
    /// Panics (`todo!`) for pulse ranges, which are not supported yet.
    pub fn new(node_ix: u64, node_count: u64, range: SeriesRange, one_before_range: bool) -> Self {
        let range = match range {
            SeriesRange::TimeRange(k) => k,
            SeriesRange::PulseRange(_) => todo!(),
        };
        let ivl = MS * 1000;
        let dts = ivl * node_count as u64;
        let ts = (range.beg / ivl + node_ix - if one_before_range { 1 } else { 0 }) * ivl;
        let tsend = range.end;
        Self {
            ts,
            dts,
            tsend,
            c1: 0,
            timeout: None,
            do_throttle: false,
            done: false,
            done_range_final: false,
        }
    }

    /// Produces the next batch of events, bounded by the range end and a rough
    /// byte-size threshold.
    fn make_batch(&mut self) -> Sitemty<ChannelEvents> {
        type T = i32;
        let mut item = EventsDim0::empty();
        let mut ts = self.ts;
        loop {
            // BUGFIX: compare the advancing local `ts` (not the stale `self.ts`,
            // which is only written back after the loop); otherwise a batch
            // could contain events past the end of the requested range.
            if ts >= self.tsend || item.byte_estimate() > 100 {
                break;
            }
            let pulse = ts;
            let value = (ts / (MS * 100) % 1000) as T;
            item.push(ts, pulse, value);
            ts += self.dts;
        }
        self.ts = ts;
        let w = ChannelEvents::Events(Box::new(item) as _);
        sitem_data(w)
    }
}
impl Stream for GenerateI32V00 {
    type Item = Sitemty<ChannelEvents>;

    /// Emits event batches until the range end, then a final `RangeComplete`,
    /// then `None`. With `do_throttle` set (off by default) a 2 ms sleep
    /// separates batches; otherwise batches are produced eagerly.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        loop {
            break if self.done {
                Ready(None)
            } else if self.ts >= self.tsend {
                self.done = true;
                self.done_range_final = true;
                Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
            } else if !self.do_throttle {
                // To use the generator without throttling, use this scope
                Ready(Some(self.make_batch()))
            } else if let Some(fut) = self.timeout.as_mut() {
                match fut.poll_unpin(cx) {
                    Ready(()) => {
                        self.timeout = None;
                        Ready(Some(self.make_batch()))
                    }
                    Pending => Pending,
                }
            } else {
                // Throttled but no sleep armed yet: arm it and loop to poll it.
                self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_millis(2))));
                continue;
            };
        }
    }
}
/// Test-data generator: i32 events on a `MS * 500` grid, capped at one `DAY`,
/// distributed across nodes like [`GenerateI32V00`].
pub struct GenerateI32V01 {
    // Base event interval of the global grid (ns).
    ivl: u64,
    // Timestamp of the next event to emit (ns).
    ts: u64,
    // Step between consecutive events of this node: ivl * node_count (ns).
    dts: u64,
    // End of event generation: requested range end, capped at DAY.
    tsend: u64,
    #[allow(unused)]
    c1: u64,
    node_ix: u64,
    timeout: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
    do_throttle: bool,
    // Whether a RangeComplete is emitted at the end (only when the requested
    // end lies before the DAY cutoff).
    have_range_final: bool,
    done: bool,
    done_range_final: bool,
}
impl GenerateI32V01 {
    /// Creates a generator for `node_ix` of `node_count` nodes over `range`.
    ///
    /// Events sit on a global `ivl = MS * 500` grid; generation is capped at
    /// one `DAY`, and a final RangeComplete is only produced when the requested
    /// end lies before that cutoff (`have_range_final`).
    ///
    /// Panics (`todo!`) for pulse ranges, which are not supported yet.
    pub fn new(node_ix: u64, node_count: u64, range: SeriesRange, one_before_range: bool) -> Self {
        let range = match range {
            SeriesRange::TimeRange(k) => k,
            SeriesRange::PulseRange(_) => todo!(),
        };
        let ivl = MS * 500;
        let dts = ivl * node_count as u64;
        let ts = (range.beg / ivl + node_ix - if one_before_range { 1 } else { 0 }) * ivl;
        let tsend = range.end.min(DAY);
        let have_range_final = range.end < (DAY - ivl);
        debug!(
            "GenerateI32V01::new ivl {} dts {} ts {} one_before_range {}",
            ivl, dts, ts, one_before_range
        );
        Self {
            ivl,
            ts,
            dts,
            tsend,
            c1: 0,
            node_ix,
            timeout: None,
            do_throttle: false,
            have_range_final,
            done: false,
            done_range_final: false,
        }
    }

    /// Produces the next batch of events, bounded by the range end and a rough
    /// byte-size threshold.
    fn make_batch(&mut self) -> Sitemty<ChannelEvents> {
        type T = i32;
        let mut item = EventsDim0::empty();
        let mut ts = self.ts;
        loop {
            // BUGFIX: compare the advancing local `ts` (not the stale `self.ts`,
            // which is only written back after the loop); otherwise a batch
            // could contain events past the end of the requested range.
            if ts >= self.tsend || item.byte_estimate() > 100 {
                break;
            }
            let pulse = ts;
            let value = (ts / self.ivl) as T;
            if false {
                info!(
                    "v01 node {} made event ts {} pulse {} value {}",
                    self.node_ix, ts, pulse, value
                );
            }
            item.push(ts, pulse, value);
            ts += self.dts;
        }
        self.ts = ts;
        let w = ChannelEvents::Events(Box::new(item) as _);
        sitem_data(w)
    }
}
impl Stream for GenerateI32V01 {
    type Item = Sitemty<ChannelEvents>;

    /// Emits event batches until the range end; a final `RangeComplete` is only
    /// produced when `have_range_final` is set (requested end before the DAY
    /// cutoff). With `do_throttle` set (off by default) a 2 ms sleep separates
    /// batches.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        loop {
            break if self.done {
                Ready(None)
            } else if self.ts >= self.tsend {
                self.done = true;
                self.done_range_final = true;
                if self.have_range_final {
                    Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
                } else {
                    // No RangeComplete: loop back into the `done` arm for `None`.
                    continue;
                }
            } else if !self.do_throttle {
                // To use the generator without throttling, use this scope
                Ready(Some(self.make_batch()))
            } else if let Some(fut) = self.timeout.as_mut() {
                match fut.poll_unpin(cx) {
                    Ready(()) => {
                        self.timeout = None;
                        Ready(Some(self.make_batch()))
                    }
                    Pending => Pending,
                }
            } else {
                // Throttled but no sleep armed yet: arm it and loop to poll it.
                self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_millis(2))));
                continue;
            };
        }
    }
}
/// Test-data generator: f64 waveform events (21 samples each) on a `MS * 100`
/// grid, distributed across nodes like [`GenerateI32V00`].
pub struct GenerateF64V00 {
    // Base event interval of the global grid (ns).
    ivl: u64,
    // Timestamp of the next event to emit (ns).
    ts: u64,
    // Step between consecutive events of this node: ivl * node_count (ns).
    dts: u64,
    // End of the requested range; reaching it finishes the stream.
    tsend: u64,
    node_ix: u64,
    timeout: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
    do_throttle: bool,
    done: bool,
    done_range_final: bool,
}
impl GenerateF64V00 {
    /// Creates a generator for `node_ix` of `node_count` nodes over `range`.
    ///
    /// Events sit on a global `ivl = MS * 100` grid. With `one_before_range`
    /// the start is shifted one slot back so one event before the range begin
    /// is emitted.
    ///
    /// Panics (`todo!`) for pulse ranges, which are not supported yet.
    pub fn new(node_ix: u64, node_count: u64, range: SeriesRange, one_before_range: bool) -> Self {
        let range = match range {
            SeriesRange::TimeRange(k) => k,
            SeriesRange::PulseRange(_) => todo!(),
        };
        let ivl = MS * 100;
        let dts = ivl * node_count as u64;
        let ts = (range.beg / ivl + node_ix - if one_before_range { 1 } else { 0 }) * ivl;
        let tsend = range.end;
        debug!(
            "GenerateF64V00::new ivl {} dts {} ts {} one_before_range {}",
            ivl, dts, ts, one_before_range
        );
        Self {
            ivl,
            ts,
            dts,
            tsend,
            node_ix,
            timeout: None,
            do_throttle: false,
            done: false,
            done_range_final: false,
        }
    }

    /// Produces the next batch of waveform events, bounded by the range end
    /// and a rough byte-size threshold.
    fn make_batch(&mut self) -> Sitemty<ChannelEvents> {
        type T = f64;
        let mut item = EventsDim1::empty();
        let mut ts = self.ts;
        loop {
            // BUGFIX: compare the advancing local `ts` (not the stale `self.ts`,
            // which is only written back after the loop); otherwise a batch
            // could contain events past the end of the requested range.
            if ts >= self.tsend || item.byte_estimate() > 400 {
                break;
            }
            let pulse = ts;
            let ampl = ((ts / self.ivl) as T).sin() + 2.;
            // One event is a 21-sample waveform: a shifted cosine over [-pi, pi]
            // scaled by the time-dependent amplitude.
            let value: Vec<T> = (0..21)
                .map(|i| ((-PI + (2. * PI / 20.) * i as f64).cos() + 1.1) * ampl)
                .collect();
            if false {
                info!(
                    "v01 node {} made event ts {} pulse {} value {:?}",
                    self.node_ix, ts, pulse, value
                );
            }
            item.push(ts, pulse, value);
            ts += self.dts;
        }
        self.ts = ts;
        trace!("generated len {}", item.len());
        let w = ChannelEvents::Events(Box::new(item) as _);
        sitem_data(w)
    }
}
impl Stream for GenerateF64V00 {
    type Item = Sitemty<ChannelEvents>;

    /// Emits event batches until the range end, then a final `RangeComplete`,
    /// then `None`. With `do_throttle` set (off by default) a 2 ms sleep
    /// separates batches; otherwise batches are produced eagerly.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        loop {
            break if self.done {
                Ready(None)
            } else if self.ts >= self.tsend {
                self.done = true;
                self.done_range_final = true;
                Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
            } else if !self.do_throttle {
                // To use the generator without throttling, use this scope
                Ready(Some(self.make_batch()))
            } else if let Some(fut) = self.timeout.as_mut() {
                match fut.poll_unpin(cx) {
                    Ready(()) => {
                        self.timeout = None;
                        Ready(Some(self.make_batch()))
                    }
                    Pending => Pending,
                }
            } else {
                // Throttled but no sleep armed yet: arm it and loop to poll it.
                self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_millis(2))));
                continue;
            };
        }
    }
}

View File

@@ -0,0 +1,89 @@
use async_channel::Send;
use async_channel::Sender;
use err::Error;
use futures_util::pin_mut;
use futures_util::Future;
use futures_util::Stream;
use futures_util::StreamExt;
use std::pin::Pin;
use std::ptr::NonNull;
use std::task::Context;
use std::task::Poll;
/// Stream adapter that forwards each item of `inp` and simultaneously sends a
/// clone of it into an `async_channel::Sender`.
///
/// NOTE(review): `send_fut` borrows from `sender` via a pointer cast in
/// `poll_fresh`, making this effectively self-referential; `sender` is boxed
/// and pinned so its address stays stable. Treat with care when modifying.
pub struct Itemclone<'a, T, INP>
where
    T: 'static,
{
    sender: Pin<Box<Sender<T>>>,
    inp: INP,
    // In-flight send of the clone of the most recently yielded item, if any.
    send_fut: Option<Send<'a, T>>,
}
impl<'a, T, INP> Itemclone<'a, T, INP> {
    /// Creates the adapter; clones of all items from `inp` are pushed into `sender`.
    pub fn new(inp: INP, sender: Sender<T>) -> Self
    where
        INP: Stream<Item = T> + Unpin,
        T: Clone + Unpin,
    {
        Self {
            inp,
            // Box and pin the sender so its address stays stable for the
            // send future stored alongside it.
            sender: Box::pin(sender),
            send_fut: None,
        }
    }
}
impl<'a, T, INP> Itemclone<'a, T, INP>
where
    INP: Stream<Item = T> + Unpin,
    T: Clone + Unpin,
{
    /// Pulls the next item from `inp`, queues a clone of it for sending, and
    /// yields the original item. Closes the channel when `inp` finishes.
    fn poll_fresh(&mut self, cx: &mut Context) -> Poll<Option<Result<T, Error>>> {
        use Poll::*;
        match self.inp.poll_next_unpin(cx) {
            Ready(Some(item)) => {
                let sender = self.sender.as_mut().get_mut();
                let mut ptr1 = NonNull::from(sender);
                // SAFETY-NOTE(review): this raw-pointer roundtrip launders the
                // borrow lifetime so the `Send` future (which borrows the sender)
                // can be stored in the same struct. It relies on the sender staying
                // boxed/pinned at a stable address and on `send_fut` never
                // outliving `sender` — worth confirming; an owned send future or
                // `try_send` would avoid the unsafe entirely.
                let sender = unsafe { ptr1.as_mut() };
                self.send_fut = Some(sender.send(item.clone()));
                Ready(Some(Ok(item)))
            }
            Ready(None) => {
                // Upstream finished: close the channel so receivers see the end.
                self.sender.close();
                Ready(None)
            }
            Pending => Pending,
        }
    }

    /// Polls the pending clone-send future once.
    fn send_copy(fut: &mut Send<T>, cx: &mut Context) -> Poll<Result<(), Error>> {
        use Poll::*;
        pin_mut!(fut);
        match fut.poll(cx) {
            Ready(Ok(())) => Ready(Ok(())),
            Ready(Err(e)) => Ready(Err(e.into())),
            Pending => Pending,
        }
    }
}
impl<'a, T, INP> Stream for Itemclone<'a, T, INP>
where
    INP: Stream<Item = T> + Unpin,
    T: Clone + Unpin,
{
    type Item = Result<T, Error>;

    /// First completes any pending clone-send, then pulls the next item from
    /// the inner stream (which also queues the next clone-send).
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        match self.send_fut.as_mut() {
            Some(fut) => match Self::send_copy(fut, cx) {
                Ready(Ok(())) => {
                    // BUGFIX: drop the completed send future before continuing.
                    // Previously it was left in place, so if `poll_fresh` returned
                    // Pending the already-completed future would be polled again
                    // on the next call, which futures must not be subjected to.
                    self.send_fut = None;
                    self.poll_fresh(cx)
                }
                Ready(Err(e)) => {
                    self.send_fut = None;
                    Ready(Some(Err(e)))
                }
                Pending => Pending,
            },
            None => self.poll_fresh(cx),
        }
    }
}

17
crates/streams/src/lib.rs Normal file
View File

@@ -0,0 +1,17 @@
//! Stream building blocks for the data pipeline, roughly in pipeline order:
//! raw bytes -> frames -> events -> transformed/collected output.
pub mod boxed; // type-erase event streams into `Box<dyn Events>` items
pub mod collect; // drain an event stream into a collected result (with deadline)
pub mod dtflags; // bit flags describing stored event encoding
pub mod filechunkread; // byte chunk + read-duration pair
pub mod frames; // length-delimited frame parsing and decoding
pub mod generators; // synthetic test-data event generators
pub mod itemclone; // tee a stream's items into an async channel
pub mod needminbuffer; // coalesce chunks until a minimum size is reached
pub mod plaineventsjson; // plain (unbinned) events collected as JSON
pub mod rangefilter2; // prune events outside the requested range
pub mod slidebuf; // sliding byte buffer used by the frame parser -- TODO confirm
pub mod tcprawclient; // open per-node event streams over TCP -- TODO confirm
#[cfg(test)]
pub mod test;
pub mod timebin; // time-binning of event streams -- TODO confirm
pub mod timebinnedjson; // time-binned results as JSON -- TODO confirm
pub mod transform; // event/bin transform plumbing -- TODO confirm

View File

@@ -0,0 +1,104 @@
use crate::filechunkread::FileChunkRead;
use err::Error;
use futures_util::{Stream, StreamExt};
use netpod::histo::HistoLog2;
use netpod::log::*;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Coalesces incoming [`FileChunkRead`] chunks until at least `need_min` bytes
/// are available, then emits the combined chunk.
pub struct NeedMinBuffer {
    inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
    // Minimum number of bytes an emitted chunk must contain.
    need_min: u32,
    // Leftover chunk below `need_min`; prepended to the next input chunk.
    left: Option<FileChunkRead>,
    // Histogram (log2 buckets) of observed input chunk sizes.
    buf_len_histo: HistoLog2,
    errored: bool,
    completed: bool,
}
impl NeedMinBuffer {
    /// Wraps `inp`; initially any non-empty chunk passes through (`need_min == 1`).
    pub fn new(inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>) -> Self {
        Self {
            // Was the redundant `inp: inp` (clippy: redundant_field_names).
            inp,
            need_min: 1,
            left: None,
            buf_len_histo: HistoLog2::new(8),
            errored: false,
            completed: false,
        }
    }

    /// Returns an unconsumed chunk so it gets prepended to the next output.
    ///
    /// Panics if a leftover chunk is already stored.
    pub fn put_back(&mut self, buf: FileChunkRead) {
        assert!(self.left.is_none());
        self.left = Some(buf);
    }

    /// Sets the minimum number of bytes an emitted chunk must contain.
    pub fn set_need_min(&mut self, need_min: u32) {
        self.need_min = need_min;
    }
}
// TODO collect somewhere else
impl Drop for NeedMinBuffer {
    // Log the chunk-size histogram on drop (ad-hoc stats sink for now).
    fn drop(&mut self) {
        debug!("NeedMinBuffer Drop Stats:\nbuf_len_histo: {:?}", self.buf_len_histo);
    }
}
impl Stream for NeedMinBuffer {
    type Item = Result<FileChunkRead, Error>;

    /// Emits chunks of at least `need_min` bytes by concatenating smaller reads.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        loop {
            break if self.completed {
                panic!("NeedMinBuffer poll_next on completed");
            } else if self.errored {
                self.completed = true;
                // Was an early `return`; now a plain branch value, consistent
                // with the other arms of this `break if` chain.
                Ready(None)
            } else {
                match self.inp.poll_next_unpin(cx) {
                    Ready(Some(Ok(mut fcr))) => {
                        self.buf_len_histo.ingest(fcr.buf().len() as u32);
                        //info!("NeedMinBuffer got buf len {}", fcr.buf.len());
                        match self.left.take() {
                            // Append the fresh chunk to a previous too-small leftover.
                            Some(mut lfcr) => {
                                // TODO measure:
                                lfcr.buf_mut().unsplit(fcr.buf_take());
                                *lfcr.duration_mut() += *fcr.duration();
                                let fcr = lfcr;
                                if fcr.buf().len() as u32 >= self.need_min {
                                    Ready(Some(Ok(fcr)))
                                } else {
                                    // Still below the threshold: stash and read more.
                                    self.left.replace(fcr);
                                    continue;
                                }
                            }
                            None => {
                                if fcr.buf().len() as u32 >= self.need_min {
                                    Ready(Some(Ok(fcr)))
                                } else {
                                    // Below the threshold: stash and read more.
                                    self.left.replace(fcr);
                                    continue;
                                }
                            }
                        }
                    }
                    Ready(Some(Err(e))) => {
                        self.errored = true;
                        // `e` is already our `Error` type; the former `e.into()`
                        // was a needless identity conversion.
                        Ready(Some(Err(e)))
                    }
                    Ready(None) => {
                        // TODO collect somewhere
                        debug!("NeedMinBuffer histo: {:?}", self.buf_len_histo);
                        Ready(None)
                    }
                    Pending => Pending,
                }
            };
        }
    }
}

View File

@@ -0,0 +1,69 @@
use crate::collect::Collect;
use crate::tcprawclient::open_tcp_streams;
use crate::transform::build_merged_event_transform;
use crate::transform::EventsToTimeBinnable;
use crate::transform::TimeBinnableToCollectable;
use err::Error;
use futures_util::StreamExt;
use items_0::collect_s::Collectable;
use items_0::on_sitemty_data;
use items_0::Events;
use items_2::channelevents::ChannelEvents;
use items_2::merger::Merger;
use items_2::streams::PlainEventStream;
use netpod::log::*;
use netpod::ChannelTypeConfigGen;
use netpod::Cluster;
use query::api4::events::EventsSubQuery;
use query::api4::events::EventsSubQuerySelect;
use query::api4::events::EventsSubQuerySettings;
use query::api4::events::PlainEventsQuery;
use serde_json::Value as JsonValue;
use std::time::Instant;
/// Executes a plain (unbinned) events query against the cluster and returns
/// the collected result serialized as JSON.
///
/// Pipeline: per-node TCP event streams -> merge -> range filter -> per-event
/// transform -> collect (bounded by the query timeout and `events_max`).
pub async fn plain_events_json(
    evq: &PlainEventsQuery,
    ch_conf: ChannelTypeConfigGen,
    cluster: &Cluster,
) -> Result<JsonValue, Error> {
    info!("plain_events_json evquery {:?}", evq);
    let select = EventsSubQuerySelect::new(ch_conf, evq.range().clone(), evq.transform().clone());
    let settings = EventsSubQuerySettings::from(evq);
    let subq = EventsSubQuery::from_parts(select, settings);
    // TODO remove magic constant
    let deadline = Instant::now() + evq.timeout();
    let mut tr = build_merged_event_transform(evq.transform())?;
    // TODO make sure the empty container arrives over the network.
    let inps = open_tcp_streams::<ChannelEvents>(subq, cluster).await?;
    // TODO propagate also the max-buf-len for the first stage event reader.
    // TODO use a mixture of count and byte-size as threshold.
    let stream = Merger::new(inps, evq.merger_out_len_max());
    // `cfg(DISABLED)` is never set, so this debug tap compiles out.
    #[cfg(DISABLED)]
    let stream = stream.map(|item| {
        info!("item after merge: {item:?}");
        item
    });
    //#[cfg(DISABLED)]
    let stream = crate::rangefilter2::RangeFilter2::new(stream, evq.range().try_into()?, evq.one_before_range());
    // Debug tap after the range filter; also compiled out.
    #[cfg(DISABLED)]
    let stream = stream.map(|item| {
        info!("item after rangefilter: {item:?}");
        item
    });
    // Apply the event transform and erase the item type to `Box<dyn Collectable>`.
    let stream = stream.map(move |k| {
        on_sitemty_data!(k, |k| {
            let k: Box<dyn Events> = Box::new(k);
            trace!("got len {}", k.len());
            let k = tr.0.transform(k);
            let k: Box<dyn Collectable> = Box::new(k);
            Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
        })
    });
    //let stream = PlainEventStream::new(stream);
    //let stream = EventsToTimeBinnable::new(stream);
    //let stream = TimeBinnableToCollectable::new(stream);
    let stream = Box::pin(stream);
    let collected = Collect::new(stream, deadline, evq.events_max(), Some(evq.range().clone()), None).await?;
    let jsval = serde_json::to_value(&collected)?;
    Ok(jsval)
}

View File

@@ -0,0 +1,238 @@
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StatsItem;
use items_0::streamitem::StreamItem;
use items_0::MergeError;
use items_2::merger::Mergeable;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::RangeFilterStats;
use std::fmt;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
/// Stream adapter that restricts an ordered event stream to `range`.
///
/// Events at or past the range end are pruned. With `one_before_range` set,
/// the last event before the range start is additionally emitted (needed for
/// time-weighted binning). The accumulated `RangeFilterStats` are emitted as
/// a final stats item before the stream ends.
pub struct RangeFilter2<S, ITY>
where
    S: Stream<Item = Sitemty<ITY>> + Unpin,
    ITY: Mergeable,
{
    // Upstream, timestamp-ordered item stream.
    inp: S,
    range: NanoRange,
    // Pre-rendered range used for the tracing span (avoids per-poll formatting).
    range_str: String,
    one_before_range: bool,
    stats: RangeFilterStats,
    // Candidate "one before range" item, held until we know whether a later
    // item already starts inside the range.
    slot1: Option<ITY>,
    // Item parked for the next poll, after the one-before item was emitted first.
    slot2: Option<ITY>,
    // Upstream signalled RangeComplete; deferred until all data was emitted.
    have_range_complete: bool,
    // Shutdown state machine flags, in order: data_done -> raco_done -> done -> complete.
    data_done: bool,
    raco_done: bool,
    done: bool,
    complete: bool,
}
impl<S, ITY> RangeFilter2<S, ITY>
where
    S: Stream<Item = Sitemty<ITY>> + Unpin,
    ITY: Mergeable,
{
    /// Full type name of this instantiation, used in logs and error messages.
    pub fn type_name() -> &'static str {
        std::any::type_name::<Self>()
    }
    /// Wrap `inp` so that only events within `range` pass through; with
    /// `one_before_range` also the last event before the range start.
    pub fn new(inp: S, range: NanoRange, one_before_range: bool) -> Self {
        trace!(
            "{}::new range: {:?} one_before_range: {:?}",
            Self::type_name(),
            range,
            one_before_range
        );
        Self {
            inp,
            range_str: format!("{:?}", range),
            range,
            one_before_range,
            stats: RangeFilterStats::new(),
            slot1: None,
            slot2: None,
            have_range_complete: false,
            data_done: false,
            raco_done: false,
            done: false,
            complete: false,
        }
    }
    /// Remove all events with timestamp >= `ts` (the exclusive range end)
    /// from `item`, updating the pruning stats.
    fn prune_high(&mut self, mut item: ITY, ts: u64) -> Result<ITY, Error> {
        let ret = match item.find_highest_index_lt(ts) {
            Some(ihlt) => {
                let n = item.len();
                if ihlt + 1 == n {
                    // Nothing to prune: all events lie below the bound.
                    // TODO gather stats, this should be the most common case.
                    self.stats.items_no_prune_high += 1;
                    item
                } else {
                    // Drop the tail events above index ihlt.
                    // NOTE(review): the drain range looks half-open (start, end) — confirm Mergeable contract.
                    self.stats.items_part_prune_high += 1;
                    let mut dummy = item.new_empty();
                    match item.drain_into(&mut dummy, (ihlt + 1, n)) {
                        Ok(_) => {}
                        Err(e) => match e {
                            MergeError::NotCompatible => {
                                error!("logic error")
                            }
                            MergeError::Full => error!("full, logic error"),
                        },
                    }
                    item
                }
            }
            None => {
                // No event below the bound: the whole item is pruned.
                self.stats.items_all_prune_high += 1;
                item.new_empty()
            }
        };
        Ok(ret)
    }
    /// Prune `item` to the configured range and maintain the one-before slots.
    fn handle_item(&mut self, item: ITY) -> Result<ITY, Error> {
        // First cut away everything at or past the range end.
        let mut item = self.prune_high(item, self.range.end)?;
        let ret = if self.one_before_range {
            match item.find_lowest_index_ge(self.range.beg) {
                Some(ilge) => {
                    if ilge == 0 {
                        // Item starts inside the range: emit the saved one-before
                        // candidate first and park this item in slot2.
                        if let Some(sl1) = self.slot1.take() {
                            self.slot2 = Some(item);
                            sl1
                        } else {
                            item
                        }
                    } else {
                        // Keep the event just before the range start (index ilge - 1)
                        // plus the in-range rest; drop all earlier events.
                        let mut dummy = item.new_empty();
                        item.drain_into(&mut dummy, (0, ilge - 1))
                            .map_err(|e| format!("{e:?} unexpected MergeError while remove of items"))?;
                        // This item carries its own one-before event, so any
                        // previously saved candidate is obsolete.
                        self.slot1 = None;
                        item
                    }
                }
                None => {
                    // Entirely before the range: remember only the last event as
                    // one-before candidate and emit nothing.
                    let n = item.len();
                    let mut keep = item.new_empty();
                    item.drain_into(&mut keep, (n.max(1) - 1, n))
                        .map_err(|e| format!("{e:?} unexpected MergeError while remove of items"))?;
                    self.slot1 = Some(keep);
                    item.new_empty()
                }
            }
        } else {
            match item.find_lowest_index_ge(self.range.beg) {
                Some(ilge) => {
                    // Drop everything before the first in-range event.
                    let mut dummy = item.new_empty();
                    item.drain_into(&mut dummy, (0, ilge))?;
                    item
                }
                None => {
                    // TODO count case for stats
                    item.new_empty()
                }
            }
        };
        Ok(ret)
    }
}
impl<S, ITY> RangeFilter2<S, ITY>
where
    S: Stream<Item = Sitemty<ITY>> + Unpin,
    ITY: Mergeable,
{
    /// Inner poll driving the shutdown state machine:
    /// data -> (optional deferred RangeComplete) -> stats item -> end of stream.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<<Self as Stream>::Item>> {
        use Poll::*;
        loop {
            break if self.complete {
                // Being polled again after returning None is a caller bug.
                error!("{} poll_next on complete", Self::type_name());
                Ready(Some(Err(Error::with_msg_no_trace(format!(
                    "{} poll_next on complete",
                    Self::type_name()
                )))))
            } else if self.done {
                self.complete = true;
                Ready(None)
            } else if self.raco_done {
                // Emit the accumulated stats as the final item.
                self.done = true;
                let k = std::mem::replace(&mut self.stats, RangeFilterStats::new());
                let k = StatsItem::RangeFilterStats(k);
                Ready(Some(Ok(StreamItem::Stats(k))))
            } else if self.data_done {
                self.raco_done = true;
                if self.have_range_complete {
                    // Forward the RangeComplete marker that was deferred during data flow.
                    Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
                } else {
                    continue;
                }
            } else if let Some(sl2) = self.slot2.take() {
                // Item parked while the one-before event was emitted first.
                Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(sl2)))))
            } else {
                match self.inp.poll_next_unpin(cx) {
                    Ready(Some(item)) => match item {
                        Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => match self.handle_item(item) {
                            Ok(item) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))),
                            Err(e) => {
                                // Stop accepting data after an error; stats still follow.
                                error!("sees: {e}");
                                self.data_done = true;
                                Ready(Some(Err(e)))
                            }
                        },
                        Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => {
                            // Defer RangeComplete until all data has been delivered.
                            self.have_range_complete = true;
                            continue;
                        }
                        // Logs, stats and errors pass through unchanged.
                        k => Ready(Some(k)),
                    },
                    Ready(None) => {
                        self.data_done = true;
                        continue;
                    }
                    Pending => Pending,
                }
            };
        }
    }
}
impl<S, ITY> Stream for RangeFilter2<S, ITY>
where
    S: Stream<Item = Sitemty<ITY>> + Unpin,
    ITY: Mergeable,
{
    type Item = Sitemty<ITY>;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // Run the inner poll inside a tracing span carrying the target range
        // (pre-formatted in `range_str` to avoid per-poll formatting cost).
        let span1 = span!(Level::INFO, "RangeFilter2", range = tracing::field::Empty);
        span1.record("range", &self.range_str.as_str());
        let _spg = span1.enter();
        Self::poll_next(self, cx)
    }
}
impl<S, ITY> fmt::Debug for RangeFilter2<S, ITY>
where
    S: Stream<Item = Sitemty<ITY>> + Unpin,
    ITY: Mergeable,
{
    /// Opaque debug form: only the (generic) type name, no fields.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{}", Self::type_name())
    }
}
impl<S, ITY> Drop for RangeFilter2<S, ITY>
where
    S: Stream<Item = Sitemty<ITY>> + Unpin,
    ITY: Mergeable,
{
    /// Emit a debug log on teardown to help trace stream shutdown order.
    fn drop(&mut self) {
        let tyname = Self::type_name();
        debug!("drop {} {:?}", tyname, self);
    }
}

View File

@@ -0,0 +1,441 @@
use std::fmt;
/// Error type for `SlideBuf` operations.
#[derive(Debug)]
pub enum Error {
    /// A read required more buffered bytes than are currently available.
    NotEnoughBytes,
    /// A write required more free capacity than the buffer can provide.
    NotEnoughSpace,
    /// A slice-to-array conversion failed (should not occur for in-range reads).
    TryFromSliceError,
}
impl fmt::Display for Error {
    /// `Display` mirrors the derived `Debug` representation.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt::Debug::fmt(self, fmt)
    }
}
impl std::error::Error for Error {
    /// These errors carry no underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        None
    }
}
impl From<std::array::TryFromSliceError> for Error {
    /// Map a failed fixed-size slice conversion onto the buffer error type,
    /// so `?` works on `try_into()` inside the read helpers.
    fn from(_: std::array::TryFromSliceError) -> Self {
        Self::TryFromSliceError
    }
}
/// Fixed-capacity byte buffer with a sliding read window.
///
/// The readable data lives in `buf[rp..wp]`; reads advance `rp`, writes
/// advance `wp`. When free tail space runs out, pending data is shifted
/// ("rewound") to the front instead of reallocating.
pub struct SlideBuf {
    // Backing storage; its length is the fixed capacity.
    buf: Vec<u8>,
    // Write position: the next written byte goes to buf[wp].
    wp: usize,
    // Read position: the next read byte comes from buf[rp].
    // Invariant: rp <= wp <= buf.len().
    rp: usize,
}
// Debug hook: expands to nothing in normal builds. Uncomment the call below
// to run `check_invariants` on every buffer operation.
macro_rules! check_invariants {
    ($self:expr) => {
        //$self.check_invariants()
    };
}
impl SlideBuf {
pub fn new(cap: usize) -> Self {
Self {
buf: vec![0; cap],
wp: 0,
rp: 0,
}
}
pub fn state(&self) -> (usize, usize) {
(self.rp, self.wp)
}
pub fn len(&self) -> usize {
check_invariants!(self);
self.wp - self.rp
}
#[inline(always)]
pub fn cap(&self) -> usize {
check_invariants!(self);
self.buf.len()
}
pub fn wcap(&self) -> usize {
check_invariants!(self);
self.buf.len() - self.wp
}
pub fn data(&self) -> &[u8] {
check_invariants!(self);
&self.buf[self.rp..self.wp]
}
pub fn data_mut(&mut self) -> &mut [u8] {
check_invariants!(self);
&mut self.buf[self.rp..self.wp]
}
pub fn reset(&mut self) {
self.rp = 0;
self.wp = 0;
}
pub fn adv(&mut self, x: usize) -> Result<(), Error> {
check_invariants!(self);
if self.len() < x {
return Err(Error::NotEnoughBytes);
} else {
self.rp += x;
Ok(())
}
}
pub fn wadv(&mut self, x: usize) -> Result<(), Error> {
check_invariants!(self);
if self.wcap() < x {
self.rewind();
}
if self.wcap() < x {
return Err(Error::NotEnoughSpace);
} else {
self.wp += x;
Ok(())
}
}
pub fn rp(&self) -> usize {
self.rp
}
pub fn set_rp(&mut self, rp: usize) -> Result<(), Error> {
check_invariants!(self);
if rp > self.wp {
Err(Error::NotEnoughBytes)
} else {
self.rp = rp;
Ok(())
}
}
pub fn rewind_rp(&mut self, n: usize) -> Result<(), Error> {
check_invariants!(self);
if self.rp < n {
Err(Error::NotEnoughBytes)
} else {
self.rp -= n;
Ok(())
}
}
pub fn read_u8(&mut self) -> Result<u8, Error> {
check_invariants!(self);
type T = u8;
const TS: usize = std::mem::size_of::<T>();
if self.len() < TS {
return Err(Error::NotEnoughBytes);
} else {
let val = self.buf[self.rp];
self.rp += TS;
Ok(val)
}
}
pub fn read_u16_be(&mut self) -> Result<u16, Error> {
check_invariants!(self);
type T = u16;
const TS: usize = std::mem::size_of::<T>();
if self.len() < TS {
return Err(Error::NotEnoughBytes);
} else {
let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?);
self.rp += TS;
Ok(val)
}
}
pub fn read_u32_be(&mut self) -> Result<u32, Error> {
check_invariants!(self);
type T = u32;
const TS: usize = std::mem::size_of::<T>();
if self.len() < TS {
return Err(Error::NotEnoughBytes);
} else {
let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?);
self.rp += TS;
Ok(val)
}
}
pub fn read_u64_be(&mut self) -> Result<u64, Error> {
check_invariants!(self);
type T = u64;
const TS: usize = std::mem::size_of::<T>();
if self.len() < TS {
return Err(Error::NotEnoughBytes);
} else {
let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?);
self.rp += TS;
Ok(val)
}
}
pub fn read_i32_be(&mut self) -> Result<i32, Error> {
check_invariants!(self);
type T = i32;
const TS: usize = std::mem::size_of::<T>();
if self.len() < TS {
return Err(Error::NotEnoughBytes);
} else {
let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?);
self.rp += TS;
Ok(val)
}
}
pub fn read_i64_be(&mut self) -> Result<i64, Error> {
check_invariants!(self);
type T = i64;
const TS: usize = std::mem::size_of::<T>();
if self.len() < TS {
return Err(Error::NotEnoughBytes);
} else {
let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?);
self.rp += TS;
Ok(val)
}
}
pub fn read_f32_be(&mut self) -> Result<f32, Error> {
check_invariants!(self);
type T = f32;
const TS: usize = std::mem::size_of::<T>();
if self.len() < TS {
return Err(Error::NotEnoughBytes);
} else {
let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?);
self.rp += TS;
Ok(val)
}
}
pub fn read_f64_be(&mut self) -> Result<f64, Error> {
check_invariants!(self);
type T = f64;
const TS: usize = std::mem::size_of::<T>();
if self.len() < TS {
return Err(Error::NotEnoughBytes);
} else {
let val = T::from_be_bytes(self.buf[self.rp..self.rp + TS].try_into()?);
self.rp += TS;
Ok(val)
}
}
pub fn read_bytes(&mut self, n: usize) -> Result<&[u8], Error> {
check_invariants!(self);
if self.len() < n {
return Err(Error::NotEnoughBytes);
} else {
let val = self.buf[self.rp..self.rp + n].as_ref();
self.rp += n;
Ok(val)
}
}
/*pub fn read_buf_for_fill(&mut self, need_min: usize) -> ReadBuf {
check_invariants!(self);
self.rewind_if_needed(need_min);
let read_buf = ReadBuf::new(&mut self.buf[self.wp..]);
read_buf
}*/
// TODO issue is that this return exactly the size that was asked for,
// but most of time, we want to first get some scratch space, and later
// advance the write pointer.
pub fn ___write_buf___(&mut self, n: usize) -> Result<&mut [u8], Error> {
check_invariants!(self);
self.rewind_if_needed(n);
if self.wcap() < n {
self.rewind();
}
if self.wcap() < n {
Err(Error::NotEnoughSpace)
} else {
let ret = &mut self.buf[self.wp..self.wp + n];
self.wp += n;
Ok(ret)
}
}
#[inline(always)]
pub fn rewind(&mut self) {
self.buf.copy_within(self.rp..self.wp, 0);
self.wp -= self.rp;
self.rp = 0;
}
#[inline(always)]
pub fn rewind_if_needed(&mut self, need_min: usize) {
check_invariants!(self);
if self.rp != 0 && self.rp == self.wp {
self.rp = 0;
self.wp = 0;
} else if self.cap() < self.rp + need_min {
self.rewind();
}
}
pub fn available_writable_area(&mut self, need_min: usize) -> Result<&mut [u8], Error> {
check_invariants!(self);
self.rewind_if_needed(need_min);
if self.wcap() < need_min {
self.rewind();
}
if self.wcap() < need_min {
Err(Error::NotEnoughSpace)
} else {
let ret = &mut self.buf[self.wp..];
Ok(ret)
}
}
pub fn put_slice(&mut self, buf: &[u8]) -> Result<(), Error> {
check_invariants!(self);
self.rewind_if_needed(buf.len());
if self.wcap() < buf.len() {
self.rewind();
}
if self.wcap() < buf.len() {
return Err(Error::NotEnoughSpace);
} else {
self.buf[self.wp..self.wp + buf.len()].copy_from_slice(buf);
self.wp += buf.len();
Ok(())
}
}
pub fn put_u8(&mut self, v: u8) -> Result<(), Error> {
check_invariants!(self);
type T = u8;
const TS: usize = std::mem::size_of::<T>();
self.rewind_if_needed(TS);
if self.wcap() < TS {
self.rewind();
}
if self.wcap() < TS {
return Err(Error::NotEnoughSpace);
} else {
self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes());
self.wp += TS;
Ok(())
}
}
pub fn put_u16_be(&mut self, v: u16) -> Result<(), Error> {
check_invariants!(self);
type T = u16;
const TS: usize = std::mem::size_of::<T>();
self.rewind_if_needed(TS);
if self.wcap() < TS {
self.rewind();
}
if self.wcap() < TS {
return Err(Error::NotEnoughSpace);
} else {
self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes());
self.wp += TS;
Ok(())
}
}
pub fn put_u32_be(&mut self, v: u32) -> Result<(), Error> {
check_invariants!(self);
type T = u32;
const TS: usize = std::mem::size_of::<T>();
self.rewind_if_needed(TS);
if self.wcap() < TS {
self.rewind();
}
if self.wcap() < TS {
return Err(Error::NotEnoughSpace);
} else {
self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes());
self.wp += TS;
Ok(())
}
}
pub fn put_u64_be(&mut self, v: u64) -> Result<(), Error> {
check_invariants!(self);
type T = u64;
const TS: usize = std::mem::size_of::<T>();
self.rewind_if_needed(TS);
if self.wcap() < TS {
self.rewind();
}
if self.wcap() < TS {
return Err(Error::NotEnoughSpace);
} else {
self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes());
self.wp += TS;
Ok(())
}
}
pub fn put_f32_be(&mut self, v: f32) -> Result<(), Error> {
check_invariants!(self);
type T = f32;
const TS: usize = std::mem::size_of::<T>();
self.rewind_if_needed(TS);
if self.wcap() < TS {
self.rewind();
}
if self.wcap() < TS {
return Err(Error::NotEnoughSpace);
} else {
self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes());
self.wp += TS;
Ok(())
}
}
pub fn put_f64_be(&mut self, v: f64) -> Result<(), Error> {
check_invariants!(self);
type T = f64;
const TS: usize = std::mem::size_of::<T>();
self.rewind_if_needed(TS);
if self.wcap() < TS {
self.rewind();
}
if self.wcap() < TS {
return Err(Error::NotEnoughSpace);
} else {
self.buf[self.wp..self.wp + TS].copy_from_slice(&v.to_be_bytes());
self.wp += TS;
Ok(())
}
}
#[allow(unused)]
fn check_invariants(&self) {
if self.wp > self.buf.len() {
eprintln!("ERROR netbuf wp {} rp {}", self.wp, self.rp);
std::process::exit(87);
}
if self.rp > self.wp {
eprintln!("ERROR netbuf wp {} rp {}", self.wp, self.rp);
std::process::exit(87);
}
}
}
impl fmt::Debug for SlideBuf {
    /// Show capacity and both positions; the byte content itself is omitted.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let mut ds = fmt.debug_struct("SlideBuf");
        ds.field("cap", &self.cap());
        ds.field("wp", &self.wp);
        ds.field("rp", &self.rp);
        ds.finish()
    }
}

View File

@@ -0,0 +1,87 @@
/*!
Delivers event data.
Delivers event data (not yet time-binned) from local storage and provides client functions
to request such data from nodes.
*/
use crate::frames::eventsfromframes::EventsFromFrames;
use crate::frames::inmem::InMemoryFrameAsyncReadStream;
use err::Error;
use futures_util::Stream;
use items_0::framable::FrameTypeInnerStatic;
use items_0::streamitem::sitem_data;
use items_0::streamitem::Sitemty;
use items_2::eventfull::EventFull;
use items_2::framable::EventQueryJsonStringFrame;
use items_2::framable::Framable;
use items_2::frame::make_term_frame;
use netpod::log::*;
use netpod::Cluster;
use netpod::Node;
use query::api4::events::EventsSubQuery;
use query::api4::events::Frame1Parts;
use serde::de::DeserializeOwned;
use std::fmt;
use std::pin::Pin;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
/// Serialize the sub-query into the JSON command frame that is sent to a node.
pub fn make_node_command_frame(query: EventsSubQuery) -> Result<EventQueryJsonStringFrame, Error> {
    let parts = Frame1Parts::new(query);
    let json = serde_json::to_string(&parts)?;
    Ok(EventQueryJsonStringFrame(json))
}
/// Connect to `node`, send the serialized sub-query followed by a terminating
/// frame, and return the resulting stream of `EventFull` items decoded from
/// the node's response frames.
pub async fn x_processed_event_blobs_stream_from_node(
    subq: EventsSubQuery,
    node: Node,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
    let addr = format!("{}:{}", node.host, node.port_raw);
    debug!("x_processed_event_blobs_stream_from_node to: {addr}",);
    let frame1 = make_node_command_frame(subq.clone())?;
    let net = TcpStream::connect(addr.clone()).await?;
    let (netin, mut netout) = net.into_split();
    let item = sitem_data(frame1);
    let buf = item.make_frame()?;
    netout.write_all(&buf).await?;
    // The term frame marks the end of the command phase.
    let buf = make_term_frame()?;
    netout.write_all(&buf).await?;
    netout.flush().await?;
    // Forget the write half so dropping it does not shut down the write side
    // while the read half is still being consumed.
    netout.forget();
    let frames = InMemoryFrameAsyncReadStream::new(netin, subq.inmem_bufcap());
    let frames = Box::pin(frames);
    let items = EventsFromFrames::new(frames, addr);
    Ok(Box::pin(items))
}
/// Pinned, sendable stream of `Sitemty<T>` items, as returned per node.
pub type BoxedStream<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;
/// Connect to every node of the cluster, send the serialized sub-query plus a
/// terminating frame, and return one frame-decoding stream per node.
pub async fn open_tcp_streams<T>(subq: EventsSubQuery, cluster: &Cluster) -> Result<Vec<BoxedStream<T>>, Error>
where
    // Group bounds in new trait
    T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static,
{
    // TODO when unit tests established, change to async connect:
    let frame1 = make_node_command_frame(subq.clone())?;
    let mut streams = Vec::new();
    // Nodes are contacted sequentially; each gets the same command frame.
    for node in &cluster.nodes {
        let addr = format!("{}:{}", node.host, node.port_raw);
        debug!("open_tcp_streams to: {addr}");
        let net = TcpStream::connect(addr.clone()).await?;
        let (netin, mut netout) = net.into_split();
        let item = sitem_data(frame1.clone());
        let buf = item.make_frame()?;
        netout.write_all(&buf).await?;
        // The term frame marks the end of the command phase.
        let buf = make_term_frame()?;
        netout.write_all(&buf).await?;
        netout.flush().await?;
        // Forget the write half so dropping it does not shut down the write
        // side while the read half is still being consumed.
        netout.forget();
        // TODO for images, we need larger buffer capacity
        let frames = InMemoryFrameAsyncReadStream::new(netin, subq.inmem_bufcap());
        let frames = Box::pin(frames);
        let stream = EventsFromFrames::<T>::new(frames, addr);
        streams.push(Box::pin(stream) as _);
    }
    Ok(streams)
}

View File

@@ -0,0 +1,59 @@
#[cfg(test)]
mod collect;
#[cfg(test)]
mod timebin;
use err::Error;
use futures_util::stream;
use futures_util::Stream;
use items_0::streamitem::sitem_data;
use items_0::streamitem::Sitemty;
use items_0::Appendable;
use items_0::Empty;
use items_2::channelevents::ChannelEvents;
use items_2::eventsdim0::EventsDim0;
use netpod::timeunits::SEC;
use std::pin::Pin;
/// Pinned, sendable stream of `ChannelEvents` stream items used by these tests.
type BoxedEventStream = Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>;
// TODO use some xorshift generator.
/// Test input: two i32 events at t = 1 s and t = 4 s, as a one-item boxed stream.
fn inmem_test_events_d0_i32_00() -> BoxedEventStream {
    let mut events = EventsDim0::empty();
    events.push(SEC * 1, 1, 10001);
    events.push(SEC * 4, 4, 10004);
    let item = sitem_data(ChannelEvents::Events(Box::new(events)));
    Box::pin(stream::iter(vec![item]))
}
/// Test input: a single i32 event at t = 2 s, as a one-item boxed stream.
fn inmem_test_events_d0_i32_01() -> BoxedEventStream {
    let mut events = EventsDim0::empty();
    events.push(SEC * 2, 2, 10002);
    let item = sitem_data(ChannelEvents::Events(Box::new(events)));
    Box::pin(stream::iter(vec![item]))
}
#[test]
fn merge_mergeable_00() -> Result<(), Error> {
    // Construction-only smoke test: building a Merger over two event streams
    // must succeed without panicking.
    runfut(async {
        let inputs = vec![inmem_test_events_d0_i32_00(), inmem_test_events_d0_i32_01()];
        let _merger = items_2::merger::Merger::new(inputs, 4);
        Ok(())
    })
}
/// Drive a test future to completion on the shared test runtime, converting
/// the error type on the way out.
fn runfut<T, F>(fut: F) -> Result<T, err::Error>
where
    F: std::future::Future<Output = Result<T, Error>>,
{
    use futures_util::TryFutureExt;
    taskrun::run(fut.map_err(|e| e.into()))
}

View File

@@ -0,0 +1,121 @@
use crate::collect::Collect;
use crate::test::runfut;
use crate::transform::build_event_transform;
use crate::transform::build_time_binning_transform;
use crate::transform::EventsToTimeBinnable;
use crate::transform::TimeBinnableToCollectable;
use err::Error;
use futures_util::stream;
use futures_util::StreamExt;
use items_0::on_sitemty_data;
use items_0::streamitem::sitem_data;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::StreamItem;
use items_0::transform::EventStreamBox;
use items_0::transform::EventStreamTrait;
use items_0::WithLen;
use items_2::eventsdim0::EventsDim0CollectorOutput;
use items_2::streams::PlainEventStream;
use items_2::testgen::make_some_boxed_d0_f32;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::FromUrl;
use query::transform::TransformQuery;
use std::time::Duration;
use std::time::Instant;
/// Collect two event batches (plus RangeComplete) without any transform and
/// check the collector output type and total event count.
#[test]
fn collect_channel_events_00() -> Result<(), Error> {
    let fut = async {
        // Two batches of 20 events each, 1 s apart, different random seeds.
        let evs0 = make_some_boxed_d0_f32(20, SEC * 10, SEC * 1, 0, 28736487);
        let evs1 = make_some_boxed_d0_f32(20, SEC * 30, SEC * 1, 0, 882716583);
        let stream = stream::iter(vec![
            sitem_data(evs0),
            sitem_data(evs1),
            Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)),
        ]);
        let deadline = Instant::now() + Duration::from_millis(4000);
        let events_max = 10000;
        let res = crate::collect::collect(stream, deadline, events_max, None, None).await?;
        //eprintln!("collected result: {res:?}");
        // The collected output must downcast to the concrete f32 collector
        // output and contain all 40 events.
        if let Some(res) = res.as_any_ref().downcast_ref::<EventsDim0CollectorOutput<f32>>() {
            eprintln!("Great, a match");
            eprintln!("{res:?}");
            assert_eq!(res.len(), 40);
        } else {
            return Err(Error::with_msg(format!("bad type of collected result")));
        }
        Ok(())
    };
    runfut(fut)
}
/// Same input as `collect_channel_events_00`, but routed through the
/// PlainEventStream -> EventsToTimeBinnable -> TimeBinnableToCollectable
/// adapter chain and the `Collect` future.
#[test]
fn collect_channel_events_01() -> Result<(), Error> {
    let fut = async {
        let evs0 = make_some_boxed_d0_f32(20, SEC * 10, SEC * 1, 0, 28736487);
        let evs1 = make_some_boxed_d0_f32(20, SEC * 30, SEC * 1, 0, 882716583);
        let stream = stream::iter(vec![
            sitem_data(evs0),
            sitem_data(evs1),
            Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)),
        ]);
        // TODO build like in request code
        let deadline = Instant::now() + Duration::from_millis(4000);
        let events_max = 10000;
        let stream = PlainEventStream::new(stream);
        let stream = EventsToTimeBinnable::new(stream);
        let stream = TimeBinnableToCollectable::new(stream);
        let stream = Box::pin(stream);
        let res = Collect::new(stream, deadline, events_max, None, None).await?;
        // The adapters must not change the collected output type or count.
        if let Some(res) = res.as_any_ref().downcast_ref::<EventsDim0CollectorOutput<f32>>() {
            eprintln!("Great, a match");
            eprintln!("{res:?}");
            assert_eq!(res.len(), 40);
        } else {
            return Err(Error::with_msg(format!("bad type of collected result")));
        }
        Ok(())
    };
    runfut(fut)
}
/// Apply the `pulseIdDiff` transform (parsed from a URL, as in real requests)
/// and collect; the output is expected to downcast to the i64 collector type.
#[test]
fn collect_channel_events_pulse_id_diff() -> Result<(), Error> {
    let fut = async {
        let trqu = TransformQuery::from_url(&"https://data-api.psi.ch/?binningScheme=pulseIdDiff".parse()?)?;
        info!("{trqu:?}");
        let evs0 = make_some_boxed_d0_f32(20, SEC * 10, SEC * 1, 0, 28736487);
        let evs1 = make_some_boxed_d0_f32(20, SEC * 30, SEC * 1, 0, 882716583);
        let stream = stream::iter(vec![
            sitem_data(evs0),
            sitem_data(evs1),
            Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)),
        ]);
        // Event transform built from the query, applied per data item.
        let mut tr = build_event_transform(&trqu)?;
        let stream = stream.map(move |x| {
            on_sitemty_data!(x, |x| {
                let x = tr.0.transform(x);
                Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))
            })
        });
        let stream = PlainEventStream::new(stream);
        let stream = EventsToTimeBinnable::new(stream);
        let deadline = Instant::now() + Duration::from_millis(4000);
        let events_max = 10000;
        let stream = Box::pin(stream);
        // Time-binning transform stage configured by the same query.
        let stream = build_time_binning_transform(&trqu, stream)?;
        let stream = TimeBinnableToCollectable::new(stream);
        let stream = Box::pin(stream);
        let res = Collect::new(stream, deadline, events_max, None, None).await?;
        // pulseIdDiff yields integer diffs, hence the i64 collector output.
        if let Some(res) = res.as_any_ref().downcast_ref::<EventsDim0CollectorOutput<i64>>() {
            eprintln!("Great, a match");
            eprintln!("{res:?}");
            assert_eq!(res.len(), 40);
        } else {
            return Err(Error::with_msg(format!("bad type of collected result")));
        }
        Ok(())
    };
    runfut(fut)
}

View File

@@ -0,0 +1,453 @@
use crate::collect::collect;
use crate::generators::GenerateI32V00;
use crate::generators::GenerateI32V01;
use crate::itemclone::Itemclone;
use crate::test::runfut;
use crate::timebin::TimeBinnedStream;
use crate::transform::build_event_transform;
use err::Error;
use futures_util::stream;
use futures_util::StreamExt;
use items_0::on_sitemty_data;
use items_0::streamitem::sitem_data;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::timebin::TimeBinnable;
use items_0::timebin::TimeBinned;
use items_0::AppendAllFrom;
use items_0::Empty;
use items_2::binsdim0::BinsDim0;
use items_2::channelevents::ChannelEvents;
use items_2::channelevents::ConnStatus;
use items_2::channelevents::ConnStatusEvent;
use items_2::eventsdim0::EventsDim0;
use items_2::testgen::make_some_boxed_d0_f32;
use netpod::range::evrange::NanoRange;
use netpod::range::evrange::SeriesRange;
use netpod::timeunits::MS;
use netpod::timeunits::SEC;
use netpod::BinnedRangeEnum;
use query::transform::TransformQuery;
use serde_json::Value as JsValue;
use std::collections::VecDeque;
use std::time::Duration;
use std::time::Instant;
/// Build a `NanoRange` from two date strings by parsing both endpoints.
fn nano_range_from_str(beg_date: &str, end_date: &str) -> Result<NanoRange, Error> {
    let beg = beg_date.parse()?;
    let end = end_date.parse()?;
    Ok(NanoRange::from_date_time(beg, end))
}
/// Bin a fixed event batch into 1 s bins and compare every emitted bin batch
/// against precomputed expected contents (with slack for float comparison).
#[test]
fn time_bin_00() -> Result<(), Error> {
    let fut = async {
        let range = nano_range_from_str("1970-01-01T00:00:00Z", "1970-01-01T00:00:08Z")?;
        let range = SeriesRange::TimeRange(range);
        let min_bin_count = 8;
        let binned_range = BinnedRangeEnum::covering_range(range, min_bin_count)?;
        // 10 events starting at t = 1 s, spaced 500 ms, fixed seed.
        let evs0 = make_some_boxed_d0_f32(10, SEC * 1, MS * 500, 0, 1846713782);
        let v00 = ChannelEvents::Events(Box::new(EventsDim0::<f32>::empty()));
        let v01 = ChannelEvents::Events(evs0);
        let v02 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 100, ConnStatus::Connect)));
        let v03 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 6000, ConnStatus::Disconnect)));
        let stream0 = Box::pin(stream::iter(vec![
            //
            sitem_data(v00),
            sitem_data(v02),
            sitem_data(v01),
            sitem_data(v03),
        ]));
        // Expected bin batches, in emission order: an empty batch, then the
        // first five bins, then the final bin.
        let mut exps = {
            let mut d = VecDeque::new();
            let bins = BinsDim0::empty();
            d.push_back(bins);
            let mut bins = BinsDim0::empty();
            bins.push(SEC * 0, SEC * 1, 0, 0.0, 0.0, 0.0);
            bins.push(SEC * 1, SEC * 2, 2, 0.0535830, 100.0589, 50.05624);
            bins.push(SEC * 2, SEC * 3, 2, 200.06143, 300.07645, 250.06894);
            bins.push(SEC * 3, SEC * 4, 2, 400.08554, 500.05222, 450.06888);
            bins.push(SEC * 4, SEC * 5, 2, 600.0025, 700.09094, 650.04675);
            d.push_back(bins);
            let mut bins = BinsDim0::empty();
            bins.push(SEC * 5, SEC * 6, 2, 800.0619, 900.02844, 850.04517);
            d.push_back(bins);
            d
        };
        let mut binned_stream = TimeBinnedStream::new(stream0, binned_range, true);
        while let Some(item) = binned_stream.next().await {
            eprintln!("{item:?}");
            match item {
                Ok(item) => match item {
                    StreamItem::DataItem(item) => match item {
                        RangeCompletableItem::Data(item) => {
                            // Each data item must be BinsDim0<f32> and match
                            // the next expected batch.
                            if let Some(item) = item.as_any_ref().downcast_ref::<BinsDim0<f32>>() {
                                let exp = exps.pop_front().unwrap();
                                if !item.equal_slack(&exp) {
                                    eprintln!("-----------------------");
                                    eprintln!("item {:?}", item);
                                    eprintln!("-----------------------");
                                    eprintln!("exp {:?}", exp);
                                    eprintln!("-----------------------");
                                    return Err(Error::with_msg_no_trace(format!("bad, content not equal")));
                                }
                            } else {
                                return Err(Error::with_msg_no_trace(format!("bad, got item with unexpected type")));
                            }
                        }
                        RangeCompletableItem::RangeComplete => {}
                    },
                    StreamItem::Log(_) => {}
                    StreamItem::Stats(_) => {}
                },
                Err(e) => Err(e).unwrap(),
            }
        }
        Ok(())
    };
    runfut(fut)
}
/// Bin two event batches where the second arrives after an artificial delay;
/// checks only that all emitted data items are BinsDim0<f32>.
#[test]
fn time_bin_01() -> Result<(), Error> {
    let fut = async {
        let range = nano_range_from_str("1970-01-01T00:00:00Z", "1970-01-01T00:00:08Z")?;
        let range = SeriesRange::TimeRange(range);
        let min_bin_count = 8;
        let binned_range = BinnedRangeEnum::covering_range(range, min_bin_count)?;
        let v00 = ChannelEvents::Events(Box::new(EventsDim0::<f32>::empty()));
        let evs0 = make_some_boxed_d0_f32(10, SEC * 1, MS * 500, 0, 1846713782);
        let evs1 = make_some_boxed_d0_f32(10, SEC * 6, MS * 500, 0, 1846713781);
        let v01 = ChannelEvents::Events(evs0);
        let v02 = ChannelEvents::Events(evs1);
        let stream0 = stream::iter(vec![
            //
            sitem_data(v00),
            sitem_data(v01),
            sitem_data(v02),
        ]);
        // Delay only the second item by 2 s to exercise the binner's
        // behavior with slow upstreams.
        let stream0 = stream0.then({
            let mut i = 0;
            move |x| {
                let delay = if i == 1 { 2000 } else { 0 };
                i += 1;
                let dur = Duration::from_millis(delay);
                async move {
                    tokio::time::sleep(dur).await;
                    x
                }
            }
        });
        let stream0 = Box::pin(stream0);
        let mut binned_stream = TimeBinnedStream::new(stream0, binned_range, true);
        while let Some(item) = binned_stream.next().await {
            if true {
                eprintln!("{item:?}");
            }
            match item {
                Ok(item) => match item {
                    StreamItem::DataItem(item) => match item {
                        RangeCompletableItem::Data(item) => {
                            if let Some(_) = item.as_any_ref().downcast_ref::<BinsDim0<f32>>() {
                            } else {
                                return Err(Error::with_msg_no_trace(format!("bad, got item with unexpected type")));
                            }
                        }
                        RangeCompletableItem::RangeComplete => {}
                    },
                    StreamItem::Log(_) => {}
                    StreamItem::Stats(_) => {}
                },
                Err(e) => Err(e).unwrap(),
            }
        }
        // TODO assert that we get the bins which are sure to be ready.
        // TODO assert correct numbers.
        // TODO assert that we don't get bins which may be still changing.
        // TODO add similar test case with a RangeComplete event at different places before the timeout.
        Ok(())
    };
    runfut(fut)
}
/// End-to-end: generate synthetic i32 events, time-bin them, collect to JSON
/// and verify anchor, counts, mins, maxs and the rangeFinal flag.
#[test]
fn time_bin_02() -> Result<(), Error> {
    let fut = async {
        let do_time_weight = true;
        let deadline = Instant::now() + Duration::from_millis(4000);
        let range = nano_range_from_str("1970-01-01T00:20:04Z", "1970-01-01T00:22:10Z")?;
        let range = SeriesRange::TimeRange(range);
        // TODO add test: 26 bins should result in next higher resolution.
        let min_bin_count = 25;
        let expected_bin_count = 26;
        let binned_range = BinnedRangeEnum::covering_range(range.clone(), min_bin_count)?;
        eprintln!("binned_range: {:?}", binned_range);
        // Print each covered sub-range until range_at returns None.
        for i in 0.. {
            if let Some(r) = binned_range.range_at(i) {
                eprintln!("Series Range to cover: {r:?}");
            } else {
                break;
            }
        }
        let event_range = binned_range.binned_range_time().full_range();
        let series_range = SeriesRange::TimeRange(event_range);
        // TODO the test stream must be able to generate also one-before (on demand) and RangeComplete (by default).
        let stream = GenerateI32V00::new(0, 1, series_range, true);
        // TODO apply first some box dyn EventTransform which later is provided by TransformQuery.
        // Then the Merge will happen always by default for backends where this is needed.
        // TODO then apply the transform chain for the after-merged-stream.
        // Box each data item as dyn TimeBinnable for the binning stage.
        let stream = stream.map(|x| {
            let x = on_sitemty_data!(x, |x| Ok(StreamItem::DataItem(RangeCompletableItem::Data(
                Box::new(x) as Box<dyn TimeBinnable>
            ))));
            x
        });
        let stream = Box::pin(stream);
        let mut binned_stream = TimeBinnedStream::new(stream, binned_range.clone(), do_time_weight);
        // From there on it should no longer be neccessary to distinguish whether its still events or time bins.
        // Then, optionally collect for output type like json, or stream as batches.
        // TODO the timebinner should already provide batches to make this efficient.
        if false {
            // Debug path: just drain and print the binned stream.
            while let Some(e) = binned_stream.next().await {
                eprintln!("see item {e:?}");
                let _x = on_sitemty_data!(e, |e| {
                    //
                    Ok(StreamItem::DataItem(RangeCompletableItem::Data(e)))
                });
            }
        } else {
            let res = collect(binned_stream, deadline, 200, None, Some(binned_range)).await?;
            assert_eq!(res.len(), expected_bin_count);
            let d = res.to_json_result()?.to_json_bytes()?;
            let s = String::from_utf8_lossy(&d);
            eprintln!("{s}");
            let jsval: JsValue = serde_json::from_slice(&d)?;
            {
                let ts_anchor = jsval.get("tsAnchor").unwrap().as_u64().unwrap();
                assert_eq!(ts_anchor, 1200);
            }
            {
                // Every 5 s bin must contain exactly 5 generated events.
                let counts = jsval.get("counts").unwrap().as_array().unwrap();
                assert_eq!(counts.len(), expected_bin_count);
                for v in counts {
                    assert_eq!(v.as_u64().unwrap(), 5);
                }
            }
            {
                // The generator makes values derivable from the bin start time.
                let ts1ms = jsval.get("ts1Ms").unwrap().as_array().unwrap();
                let mins = jsval.get("mins").unwrap().as_array().unwrap();
                assert_eq!(mins.len(), expected_bin_count);
                for (ts1ms, min) in ts1ms.iter().zip(mins) {
                    assert_eq!((ts1ms.as_u64().unwrap() / 100) % 1000, min.as_u64().unwrap());
                }
            }
            {
                let ts1ms = jsval.get("ts1Ms").unwrap().as_array().unwrap();
                let maxs = jsval.get("maxs").unwrap().as_array().unwrap();
                assert_eq!(maxs.len(), expected_bin_count);
                for (ts1ms, max) in ts1ms.iter().zip(maxs) {
                    assert_eq!((40 + ts1ms.as_u64().unwrap() / 100) % 1000, max.as_u64().unwrap());
                }
            }
            {
                let range_final = jsval.get("rangeFinal").unwrap().as_bool().unwrap();
                assert_eq!(range_final, true);
            }
        }
        Ok(())
    };
    runfut(fut)
}
// Should fail because of missing empty item.
// But should have some option to suppress the error log for this test case.
/// Currently disabled (early return): binning a stream that never delivers an
/// initial empty events container must produce the "can not even create empty"
/// error and nothing else.
#[test]
fn time_bin_03() -> Result<(), Error> {
    // TODO re-enable with error log suppressed.
    if true {
        return Ok(());
    }
    let fut = async {
        let range = nano_range_from_str("1970-01-01T00:00:00Z", "1970-01-01T00:00:08Z")?;
        let range = SeriesRange::TimeRange(range);
        let min_bin_count = 8;
        let binned_range = BinnedRangeEnum::covering_range(range, min_bin_count)?;
        let evs0 = make_some_boxed_d0_f32(10, SEC * 1, MS * 500, 0, 1846713782);
        //let v00 = ChannelEvents::Events(Box::new(EventsDim0::<f32>::empty()));
        let v01 = ChannelEvents::Events(evs0);
        let v02 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 100, ConnStatus::Connect)));
        let v03 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 6000, ConnStatus::Disconnect)));
        let stream0 = Box::pin(stream::iter(vec![
            //
            //sitem_data(v00),
            sitem_data(v02),
            sitem_data(v01),
            sitem_data(v03),
        ]));
        let mut binned_stream = TimeBinnedStream::new(stream0, binned_range, true);
        while let Some(item) = binned_stream.next().await {
            eprintln!("{item:?}");
            match item {
                Err(e) => {
                    // Only the specific "missing empty" error counts as success.
                    if e.to_string().contains("must emit but can not even create empty A") {
                        return Ok(());
                    } else {
                        return Err(Error::with_msg_no_trace("should not succeed"));
                    }
                }
                _ => {
                    return Err(Error::with_msg_no_trace("should not succeed"));
                }
            }
        }
        return Err(Error::with_msg_no_trace("should not succeed"));
    };
    runfut(fut)
}
// TODO add test case to observe RangeComplete after binning.
#[test]
fn transform_chain_correctness_00() -> Result<(), Error> {
    // Smoke test: building the event transform for the default time-binned
    // query must succeed.
    // TODO extend to verify the constructed transform chain itself.
    build_event_transform(&TransformQuery::default_time_binned()).map(|_| ())
}
#[test]
fn timebin_multi_stage_00() -> Result<(), Error> {
    // TODO chain two timebin stages with different binning grid.
    let fut = async {
        let do_time_weight = true;
        let one_before_range = do_time_weight;
        // Coarse grid (binned_range_0, >= 22 bins) over the original range, fine
        // grid (binned_range_1, >= 48 bins) over the coarse grid's covering range.
        let range = nano_range_from_str("1970-01-01T00:00:10Z", "1970-01-01T00:01:03Z")?;
        let range = SeriesRange::TimeRange(range);
        let binned_range_0 = BinnedRangeEnum::covering_range(range.clone(), 22)?;
        dbg!(&binned_range_0);
        let range: SeriesRange = binned_range_0.binned_range_time().to_nano_range().into();
        let binned_range_1 = BinnedRangeEnum::covering_range(range.clone(), 48)?;
        dbg!(&binned_range_1);
        let stream_evs = GenerateI32V01::new(0, 1, range.clone(), one_before_range);
        // Expected output of the first (fine) binning stage: 54 one-second bins.
        let exp1 = {
            let mut bins = BinsDim0::<i32>::empty();
            for i in 0..54 {
                bins.push(
                    SEC * (10 + i),
                    SEC * (11 + i),
                    2,
                    20 + 2 * i as i32,
                    21 + 2 * i as i32,
                    20.5 + 2. * i as f32,
                );
            }
            bins
        };
        // Expected output of the second (coarse) binning stage: 27 two-second bins.
        let exp2 = {
            let mut bins = BinsDim0::<i32>::empty();
            for i in 0..27 {
                bins.push(
                    SEC * (10 + 2 * i),
                    SEC * (12 + 2 * i),
                    4,
                    20 + 4 * i as i32,
                    23 + 4 * i as i32,
                    21.5 + 4. * i as f32,
                );
            }
            bins
        };
        // NOTE:
        // can store all bins in cache for which there is some non-empty bin following, or if the container has range-final.
        // Side channels to observe the items emitted by each binning stage.
        let (q1tx, q1rx) = async_channel::bounded(128);
        let (q2tx, q2rx) = async_channel::bounded(128);
        let stream_evs = Box::pin(stream_evs);
        // Stage 1: bin the generated events on the fine grid.
        let binned_stream = {
            TimeBinnedStream::new(stream_evs, binned_range_1, do_time_weight).map(|x| {
                //eprintln!("STAGE 1 -- {:?}", x);
                x
            })
        };
        // Itemclone presumably forwards a copy of each item into q1tx — confirm.
        let binned_stream = Itemclone::new(binned_stream, q1tx).map(|x| match x {
            Ok(x) => x,
            Err(e) => Err(e),
        });
        // Stage 2: re-bin the stage-1 bins on the coarse grid.
        let binned_stream = {
            TimeBinnedStream::new(Box::pin(binned_stream), binned_range_0, do_time_weight).map(|x| {
                eprintln!("STAGE -- 2 {:?}", x);
                x
            })
        };
        let binned_stream = Itemclone::new(binned_stream, q2tx).map(|x| match x {
            Ok(x) => x,
            Err(e) => Err(e),
        });
        let mut have_range_final = false;
        let mut binned_stream = binned_stream;
        // Drain the final stream; only item types and the range-final marker are
        // checked here, the bin contents are verified via the side channels below.
        while let Some(item) = binned_stream.next().await {
            //eprintln!("{item:?}");
            match item {
                Ok(item) => match item {
                    StreamItem::DataItem(item) => match item {
                        RangeCompletableItem::Data(item) => {
                            if let Some(item) = item.as_any_ref().downcast_ref::<BinsDim0<i32>>() {
                                if false {
                                    eprintln!("-----------------------");
                                    eprintln!("item {:?}", item);
                                    eprintln!("-----------------------");
                                }
                            } else {
                                return Err(Error::with_msg_no_trace(format!("bad, got item with unexpected type")));
                            }
                        }
                        RangeCompletableItem::RangeComplete => {
                            have_range_final = true;
                        }
                    },
                    StreamItem::Log(_) => {}
                    StreamItem::Stats(_) => {}
                },
                Err(e) => Err(e).unwrap(),
            }
        }
        assert!(have_range_final);
        {
            eprintln!("---------------------------------------------------------------------");
            // Collect everything stage 1 emitted and compare against exp1.
            let mut coll = BinsDim0::empty();
            let stream = q1rx;
            while let Ok(item) = stream.recv().await {
                //eprintln!("RECV [q1rx] {:?}", item);
                on_sitemty_data!(item, |mut item: Box<dyn TimeBinned>| {
                    if let Some(k) = item.as_any_mut().downcast_mut::<BinsDim0<i32>>() {
                        coll.append_all_from(k);
                    }
                    sitem_data(item)
                });
            }
            eprintln!("collected 1: {:?}", coll);
            assert_eq!(coll, exp1);
        }
        {
            eprintln!("---------------------------------------------------------------------");
            // Collect everything stage 2 emitted and compare against exp2.
            let mut coll = BinsDim0::empty();
            let stream = q2rx;
            while let Ok(item) = stream.recv().await {
                //eprintln!("RECV [q2rx] {:?}", item);
                on_sitemty_data!(item, |mut item: Box<dyn TimeBinned>| {
                    if let Some(k) = item.as_any_mut().downcast_mut::<BinsDim0<i32>>() {
                        coll.append_all_from(k);
                    }
                    sitem_data(item)
                });
            }
            eprintln!("collected 1: {:?}", coll);
            assert_eq!(coll, exp2);
        }
        Ok(())
    };
    runfut(fut)
}

View File

@@ -0,0 +1,277 @@
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::sitem_data;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::timebin::TimeBinnableTy;
use items_0::timebin::TimeBinnerTy;
use netpod::log::*;
use netpod::BinnedRangeEnum;
use std::any;
use std::fmt;
use std::ops::ControlFlow;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
#[allow(unused)]
macro_rules! trace2 {
    // Compile-time toggle: the first arm matches everything and expands to
    // nothing, so this trace level is disabled. The second arm (forwarding to
    // `trace!`) is unreachable; remove the first arm to re-enable.
    ($($arg:tt)*) => {};
    ($($arg:tt)*) => { trace!($($arg)*) };
}
#[allow(unused)]
macro_rules! trace3 {
    // Compile-time toggle, see trace2!: first arm disables, second is unreachable.
    ($($arg:tt)*) => {};
    ($($arg:tt)*) => { trace!($($arg)*) };
}
#[allow(unused)]
macro_rules! trace4 {
    // Compile-time toggle, see trace2!: first arm disables, second is unreachable.
    ($($arg:tt)*) => {};
    ($($arg:tt)*) => { trace!($($arg)*) };
}
/// Boxed, pinned input stream of stream-items carrying payload `T`.
type MergeInp<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;
/// Stream adapter which time-bins the data items of an input stream onto the
/// grid described by a `BinnedRangeEnum`.
pub struct TimeBinnedStream<T>
where
    T: TimeBinnableTy,
{
    // Upstream items to be binned.
    inp: MergeInp<T>,
    // Target binning grid.
    range: BinnedRangeEnum,
    // Whether bin aggregation is time-weighted.
    do_time_weight: bool,
    // Set when upstream signaled RangeComplete; re-emitted after the data.
    range_final: bool,
    // Created lazily from the first data item (the item knows its binner type).
    binner: Option<<T as TimeBinnableTy>::TimeBinner>,
    // True once the first data item was seen; the first item triggers an emit.
    done_first_input: bool,
    // Poll state machine flags: done_data -> done -> complete.
    done_data: bool,
    done: bool,
    complete: bool,
}
impl<T> fmt::Debug for TimeBinnedStream<T>
where
    T: TimeBinnableTy,
{
    // Manual impl: the boxed input stream is not `Debug`, so only the
    // inspectable state is printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut d = f.debug_struct(any::type_name::<Self>());
        d.field("range", &self.range);
        d.field("range_final", &self.range_final);
        d.field("binner", &self.binner);
        d.finish()
    }
}
impl<T> TimeBinnedStream<T>
where
    T: TimeBinnableTy,
{
    /// Creates a binning stream over `inp`, producing bins on the grid
    /// described by `range`, optionally time-weighted.
    pub fn new(inp: MergeInp<T>, range: BinnedRangeEnum, do_time_weight: bool) -> Self {
        Self {
            inp,
            range,
            do_time_weight,
            range_final: false,
            binner: None,
            done_first_input: false,
            done_data: false,
            done: false,
            complete: false,
        }
    }
    /// Feeds one data item into the binner, lazily creating the binner from
    /// the first item.
    fn process_item(&mut self, mut item: T) -> () {
        trace2!("process_item {item:?}");
        if self.binner.is_none() {
            trace!("process_item call time_binner_new");
            let binner = item.time_binner_new(self.range.clone(), self.do_time_weight);
            self.binner = Some(binner);
        }
        let binner = self.binner.as_mut().unwrap();
        trace2!("process_item call binner ingest");
        binner.ingest(&mut item);
    }
    /// Ingests one data item and decides whether to emit output now.
    ///
    /// Emits (Break) when this was the very first data item, or when the
    /// binner has finished bins ready; otherwise returns Continue so the
    /// caller keeps polling the input.
    fn handle_data_item(
        &mut self,
        item: T,
    ) -> Result<ControlFlow<Poll<Sitemty<<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>, Error> {
        use ControlFlow::*;
        use Poll::*;
        trace2!("================= handle_data_item");
        let item_len = item.len();
        self.process_item(item);
        let mut do_emit = false;
        if self.done_first_input == false {
            debug!(
                "emit container after the first input len {} binner {}",
                item_len,
                self.binner.is_some()
            );
            if self.binner.is_none() {
                // process_item should have created the binner; treat as fatal.
                let e = Error::with_msg_no_trace("must emit on first input but no binner");
                self.done = true;
                return Err(e);
            }
            do_emit = true;
            self.done_first_input = true;
        }
        if let Some(binner) = self.binner.as_mut() {
            trace3!("bins ready count {}", binner.bins_ready_count());
            if binner.bins_ready_count() > 0 {
                do_emit = true
            }
            if do_emit {
                if let Some(bins) = binner.bins_ready() {
                    Ok(Break(Ready(sitem_data(bins))))
                } else {
                    // No finished bins yet: emit an empty container instead.
                    if let Some(bins) = binner.empty() {
                        Ok(Break(Ready(sitem_data(bins))))
                    } else {
                        let e = Error::with_msg_no_trace("must emit but can not even create empty A");
                        error!("{e}");
                        Err(e)
                    }
                }
            } else {
                trace3!("not emit");
                Ok(ControlFlow::Continue(()))
            }
        } else {
            warn!("processed item, but no binner yet");
            Ok(ControlFlow::Continue(()))
        }
    }
    /// Dispatches one upstream stream item.
    ///
    /// Data goes to `handle_data_item`. RangeComplete is only recorded here
    /// (the marker is re-emitted after all data, see `poll_next`). Log and
    /// Stats pass through unchanged. An upstream error ends the stream.
    fn handle_item(
        &mut self,
        item: Result<StreamItem<RangeCompletableItem<T>>, Error>,
    ) -> Result<ControlFlow<Poll<Sitemty<<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>, Error> {
        use ControlFlow::*;
        use Poll::*;
        trace2!("================= handle_item");
        match item {
            Ok(item) => match item {
                StreamItem::DataItem(item) => match item {
                    RangeCompletableItem::RangeComplete => {
                        debug!("see RangeComplete");
                        self.range_final = true;
                        Ok(Continue(()))
                    }
                    RangeCompletableItem::Data(item) => self.handle_data_item(item),
                },
                StreamItem::Log(item) => Ok(Break(Ready(Ok(StreamItem::Log(item))))),
                StreamItem::Stats(item) => Ok(Break(Ready(Ok(StreamItem::Stats(item))))),
            },
            Err(e) => {
                error!("received error item: {e}");
                self.done = true;
                Err(e)
            }
        }
    }
    /// Handles end of input: flushes the bin in progress and emits the final
    /// bins (or an empty container), setting `done_data`.
    fn handle_none(
        &mut self,
    ) -> Result<ControlFlow<Poll<Sitemty<<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>, Error> {
        use ControlFlow::*;
        use Poll::*;
        trace2!("================= handle_none");
        let self_range_final = self.range_final;
        if let Some(binner) = self.binner.as_mut() {
            trace!("bins ready count before finish {}", binner.bins_ready_count());
            // TODO rework the finish logic
            if self_range_final {
                binner.set_range_complete();
            }
            binner.push_in_progress(false);
            trace!("bins ready count after finish {}", binner.bins_ready_count());
            if let Some(bins) = binner.bins_ready() {
                self.done_data = true;
                Ok(Break(Ready(sitem_data(bins))))
            } else {
                if let Some(bins) = binner.empty() {
                    self.done_data = true;
                    Ok(Break(Ready(sitem_data(bins))))
                } else {
                    let e = Error::with_msg_no_trace("must emit but can not even create empty B");
                    error!("{e}");
                    self.done_data = true;
                    Err(e)
                }
            }
        } else {
            // No data item ever arrived, so no binner was ever created.
            warn!("input stream finished, still no binner");
            self.done_data = true;
            let e = Error::with_msg_no_trace(format!("input stream finished, still no binner"));
            Err(e)
        }
    }
    // TODO
    // Original block inside the poll loop was able to:
    // continue
    // break with Poll<Option<Item>>
    /// Polls the input once and translates the result into control flow for
    /// `poll_next`: Continue to keep looping, Break to yield or return Pending.
    fn poll_input(
        &mut self,
        cx: &mut Context,
    ) -> Result<ControlFlow<Poll<Sitemty<<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>, Error> {
        use ControlFlow::*;
        use Poll::*;
        trace2!("================= poll_input");
        match self.inp.poll_next_unpin(cx) {
            Ready(Some(item)) => self.handle_item(item),
            Ready(None) => self.handle_none(),
            Pending => Ok(Break(Pending)),
        }
    }
}
impl<T> Stream for TimeBinnedStream<T>
where
    T: TimeBinnableTy + Unpin,
{
    type Item = Sitemty<<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>;
    /// State machine: while active, poll input; once the data is exhausted
    /// (`done_data`), emit RangeComplete if the range was final; then end the
    /// stream (`done` -> Ready(None) -> `complete`).
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        let span = span!(Level::INFO, "TimeBinner");
        let _spg = span.enter();
        trace2!("================= POLL");
        loop {
            break if self.complete {
                // Polling again after Ready(None) is a caller bug.
                panic!("TimeBinnedStream poll on complete")
            } else if self.done {
                self.complete = true;
                Ready(None)
            } else if self.done_data {
                self.done = true;
                if self.range_final {
                    // Announce that the requested range was fully covered.
                    Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
                } else {
                    continue;
                }
            } else {
                match self.poll_input(cx) {
                    Ok(item) => match item {
                        ControlFlow::Continue(()) => continue,
                        ControlFlow::Break(item) => match item {
                            Ready(item) => break Ready(Some(item)),
                            Pending => break Pending,
                        },
                    },
                    Err(e) => {
                        self.done = true;
                        break Ready(Some(Err(e)));
                    }
                }
            };
        }
    }
}
//impl<T> WithTransformProperties for TimeBinnedStream<T> where T: TimeBinnableTy {}
//impl<T> TimeBinnableStreamTrait for TimeBinnedStream<T> where T: TimeBinnableTy {}

View File

@@ -0,0 +1,120 @@
use crate::collect::Collect;
use crate::rangefilter2::RangeFilter2;
use crate::tcprawclient::open_tcp_streams;
use crate::timebin::TimeBinnedStream;
use crate::transform::build_merged_event_transform;
use crate::transform::EventsToTimeBinnable;
use err::Error;
use futures_util::future::BoxFuture;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::collect_s::Collectable;
use items_0::on_sitemty_data;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::timebin::TimeBinned;
use items_0::transform::TimeBinnableStreamBox;
use items_0::transform::TimeBinnableStreamTrait;
use items_0::Events;
use items_2::channelevents::ChannelEvents;
use items_2::merger::Merger;
use items_2::streams::PlainEventStream;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::BinnedRangeEnum;
use netpod::ChannelTypeConfigGen;
use netpod::Cluster;
use query::api4::binned::BinnedQuery;
use query::api4::events::EventsSubQuery;
use query::api4::events::EventsSubQuerySelect;
use query::api4::events::EventsSubQuerySettings;
use serde_json::Value as JsonValue;
use std::pin::Pin;
use std::time::Instant;
/// Compile-time helper: asserts that `stream` is `Send` (fails to compile
/// otherwise) while returning it unchanged.
#[allow(unused)]
fn assert_stream_send<'u, R>(stream: impl 'u + Send + Stream<Item = R>) -> impl 'u + Send + Stream<Item = R> {
    stream
}
/// Builds the merged, range-filtered and transformed event stream that feeds
/// the time-binning stage.
///
/// Opens the per-node TCP event streams, merges them, filters to `range`
/// (optionally keeping one event before the range start, needed for
/// time-weighted binning) and applies the merged-event transform.
async fn timebinnable_stream(
    query: BinnedQuery,
    range: NanoRange,
    one_before_range: bool,
    ch_conf: ChannelTypeConfigGen,
    cluster: Cluster,
) -> Result<TimeBinnableStreamBox, Error> {
    let select = EventsSubQuerySelect::new(ch_conf, range.clone().into(), query.transform().clone());
    let settings = EventsSubQuerySettings::from(&query);
    let subq = EventsSubQuery::from_parts(select, settings);
    let mut tr = build_merged_event_transform(subq.transform())?;
    let inps = open_tcp_streams::<ChannelEvents>(subq, &cluster).await?;
    // TODO propagate also the max-buf-len for the first stage event reader.
    // TODO use a mixture of count and byte-size as threshold.
    let stream = Merger::new(inps, query.merger_out_len_max());
    let stream = RangeFilter2::new(stream, range, one_before_range);
    // Apply the merged-event transform to each data item; `tr` moves into the closure.
    let stream = stream.map(move |k| {
        on_sitemty_data!(k, |k| {
            let k: Box<dyn Events> = Box::new(k);
            trace!("got len {}", k.len());
            let k = tr.0.transform(k);
            Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
        })
    });
    let stream = PlainEventStream::new(stream);
    let stream = EventsToTimeBinnable::new(stream);
    let stream = Box::pin(stream);
    Ok(TimeBinnableStreamBox(stream))
}
/// Wraps the time-binnable event stream into the actual time-binning stage
/// and returns the resulting stream of boxed time-binned items.
async fn timebinned_stream(
    query: BinnedQuery,
    binned_range: BinnedRangeEnum,
    ch_conf: ChannelTypeConfigGen,
    cluster: Cluster,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>>, Error> {
    let range = binned_range.binned_range_time().to_nano_range();
    let do_time_weight = true;
    // Time-weighted binning needs the last event before the range start.
    let one_before_range = true;
    let stream = timebinnable_stream(query.clone(), range, one_before_range, ch_conf, cluster).await?;
    let stream: Pin<Box<dyn TimeBinnableStreamTrait>> = stream.0;
    let stream = Box::pin(stream);
    // TODO rename TimeBinnedStream to make it more clear that it is the component which initiates the time binning.
    let stream = TimeBinnedStream::new(stream, binned_range, do_time_weight);
    let stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>> = Box::pin(stream);
    Ok(stream)
}
/// Adapts a stream of boxed time-binned items into a stream of boxed
/// collectables; non-data items (log, stats, range-complete, errors) pass
/// through unchanged.
fn timebinned_to_collectable(
    stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>>,
) -> Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Collectable>>> + Send>> {
    let mapped = stream.map(|item| {
        on_sitemty_data!(item, |binned| {
            let collectable: Box<dyn Collectable> = Box::new(binned);
            trace!("got len {}", collectable.len());
            Ok(StreamItem::DataItem(RangeCompletableItem::Data(collectable)))
        })
    });
    Box::pin(mapped)
}
/// Runs the full time-binned query pipeline and returns the collected result
/// as a JSON value.
///
/// Derives the covering binned range from the query, streams and bins the
/// events, then collects up to `collect_max` items until `deadline`.
pub async fn timebinned_json(
    query: BinnedQuery,
    ch_conf: ChannelTypeConfigGen,
    cluster: Cluster,
) -> Result<JsonValue, Error> {
    let deadline = Instant::now().checked_add(query.timeout_value()).unwrap();
    let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?;
    let collect_max = 10000;
    let binned = timebinned_stream(query.clone(), binned_range.clone(), ch_conf, cluster).await?;
    let collectable = timebinned_to_collectable(binned);
    let fut = Collect::new(collectable, deadline, collect_max, None, Some(binned_range));
    // Box the collect future; behavior is unchanged, only the type is erased.
    let fut: BoxFuture<_> = Box::pin(fut);
    let collected = fut.await?;
    let jsval = serde_json::to_value(&collected)?;
    Ok(jsval)
}

View File

@@ -0,0 +1,194 @@
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::collect_s::Collectable;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::timebin::TimeBinnable;
use items_0::transform::CollectableStreamBox;
use items_0::transform::CollectableStreamTrait;
use items_0::transform::EventStreamBox;
use items_0::transform::EventStreamTrait;
use items_0::transform::TimeBinnableStreamBox;
use items_0::transform::TimeBinnableStreamTrait;
use items_0::transform::TransformEvent;
use items_0::transform::TransformProperties;
use items_0::transform::WithTransformProperties;
use items_2::transform::make_transform_identity;
use items_2::transform::make_transform_min_max_avg;
use items_2::transform::make_transform_pulse_id_diff;
use query::transform::EventTransformQuery;
use query::transform::TimeBinningTransformQuery;
use query::transform::TransformQuery;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
pub fn build_event_transform(tr: &TransformQuery) -> Result<TransformEvent, Error> {
let trev = tr.get_tr_event();
match trev {
EventTransformQuery::ValueFull => Ok(make_transform_identity()),
EventTransformQuery::MinMaxAvgDev => Ok(make_transform_min_max_avg()),
EventTransformQuery::ArrayPick(..) => Err(Error::with_msg_no_trace(format!(
"build_event_transform don't know what to do {trev:?}"
))),
EventTransformQuery::PulseIdDiff => Ok(make_transform_pulse_id_diff()),
EventTransformQuery::EventBlobsVerbatim => Err(Error::with_msg_no_trace(format!(
"build_event_transform don't know what to do {trev:?}"
))),
EventTransformQuery::EventBlobsUncompressed => Err(Error::with_msg_no_trace(format!(
"build_event_transform don't know what to do {trev:?}"
))),
}
}
/// Builds the transform applied after merging the per-node event streams.
///
/// Only pulse-id-diff operates on the merged stream; every other transform
/// query yields the identity transform here.
pub fn build_merged_event_transform(tr: &TransformQuery) -> Result<TransformEvent, Error> {
    if let EventTransformQuery::PulseIdDiff = tr.get_tr_event() {
        Ok(make_transform_pulse_id_diff())
    } else {
        Ok(make_transform_identity())
    }
}
/// Adapts a boxed event stream so that its data payloads are re-boxed as
/// `Box<dyn TimeBinnable>`, making the stream usable as time-binning input.
pub struct EventsToTimeBinnable {
    // Type-erased upstream event stream.
    inp: Pin<Box<dyn EventStreamTrait>>,
}
impl EventsToTimeBinnable {
    /// Boxes and pins the given event stream.
    pub fn new<INP>(inp: INP) -> Self
    where
        INP: EventStreamTrait + 'static,
    {
        Self { inp: Box::pin(inp) }
    }
}
impl Stream for EventsToTimeBinnable {
    type Item = Sitemty<Box<dyn TimeBinnable>>;
    // Forwards the inner stream, re-boxing each data payload as
    // `Box<dyn TimeBinnable>`; all other item kinds pass through unchanged.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        self.inp.poll_next_unpin(cx).map(|next| {
            next.map(|sitem| {
                sitem.map(|item| match item {
                    StreamItem::DataItem(RangeCompletableItem::RangeComplete) => {
                        StreamItem::DataItem(RangeCompletableItem::RangeComplete)
                    }
                    StreamItem::DataItem(RangeCompletableItem::Data(evs)) => {
                        StreamItem::DataItem(RangeCompletableItem::Data(Box::new(evs) as Box<dyn TimeBinnable>))
                    }
                    StreamItem::Log(x) => StreamItem::Log(x),
                    StreamItem::Stats(x) => StreamItem::Stats(x),
                })
            })
        })
    }
}
impl WithTransformProperties for EventsToTimeBinnable {
    // Delegates to the wrapped stream.
    fn query_transform_properties(&self) -> TransformProperties {
        self.inp.query_transform_properties()
    }
}
// Marker impl: the adapter's items can feed time binning.
impl TimeBinnableStreamTrait for EventsToTimeBinnable {}
/// Adapts a boxed time-binnable stream so that its data payloads are re-boxed
/// as `Box<dyn Collectable>`, making the stream usable for result collection.
pub struct TimeBinnableToCollectable {
    // Type-erased upstream time-binnable stream.
    inp: Pin<Box<dyn TimeBinnableStreamTrait>>,
}
impl TimeBinnableToCollectable {
    /// Boxes and pins the given time-binnable stream.
    pub fn new<INP>(inp: INP) -> Self
    where
        INP: TimeBinnableStreamTrait + 'static,
    {
        Self { inp: Box::pin(inp) }
    }
}
impl Stream for TimeBinnableToCollectable {
    type Item = Sitemty<Box<dyn Collectable>>;
    // Forwards the inner stream, re-boxing each data payload as
    // `Box<dyn Collectable>`; all other item kinds pass through unchanged.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        self.inp.poll_next_unpin(cx).map(|next| {
            next.map(|sitem| {
                sitem.map(|item| match item {
                    StreamItem::DataItem(RangeCompletableItem::RangeComplete) => {
                        StreamItem::DataItem(RangeCompletableItem::RangeComplete)
                    }
                    StreamItem::DataItem(RangeCompletableItem::Data(bins)) => {
                        StreamItem::DataItem(RangeCompletableItem::Data(Box::new(bins) as Box<dyn Collectable>))
                    }
                    StreamItem::Log(x) => StreamItem::Log(x),
                    StreamItem::Stats(x) => StreamItem::Stats(x),
                })
            })
        })
    }
}
impl WithTransformProperties for TimeBinnableToCollectable {
    // Delegates to the wrapped stream.
    fn query_transform_properties(&self) -> TransformProperties {
        self.inp.query_transform_properties()
    }
}
// Marker impl: the adapter's items can be collected.
impl CollectableStreamTrait for TimeBinnableToCollectable {}
//impl CollectableStreamTrait for Pin<Box<TimeBinnableToCollectable>> {}
/// Wraps `inp` according to the time-binning part of the transform query.
///
/// Only the pass-through case is implemented so far; any other time-binning
/// transform panics via `todo!`.
pub fn build_time_binning_transform(
    tr: &TransformQuery,
    inp: Pin<Box<dyn TimeBinnableStreamTrait>>,
) -> Result<TimeBinnableStreamBox, Error> {
    match tr.get_tr_time_binning() {
        TimeBinningTransformQuery::None => Ok(TimeBinnableStreamBox(inp)),
        // TODO apply the desired transformations.
        _ => todo!(),
    }
}
/// Builds the full event-to-collectable transform chain.
///
/// NOTE(review): work in progress — as written, this always returns an EMPTY
/// stream and discards the input (see notes below); confirm before use.
pub fn build_full_transform_collectable(
    tr: &TransformQuery,
    inp: EventStreamBox,
) -> Result<CollectableStreamBox, Error> {
    // TODO this must return a Stream!
    //let evs = build_event_transform(tr, inp)?;
    let trtb = tr.get_tr_time_binning();
    // NOTE(review): these local `use` statements duplicate the top-of-file imports.
    use futures_util::Stream;
    use items_0::collect_s::Collectable;
    use items_0::streamitem::RangeCompletableItem;
    use items_0::streamitem::Sitemty;
    use items_0::streamitem::StreamItem;
    use std::pin::Pin;
    // NOTE(review): `a` maps the input into boxed collectables but is never used,
    // so all input data is dropped — presumably the None branch below should
    // return it once the types line up.
    let a: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Collectable>>> + Send>> = Box::pin(inp.0.map(|item| match item {
        Ok(item) => match item {
            StreamItem::DataItem(item) => match item {
                RangeCompletableItem::Data(item) => {
                    let item: Box<dyn Collectable> = Box::new(item);
                    Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
                }
                RangeCompletableItem::RangeComplete => Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)),
            },
            StreamItem::Log(item) => Ok(StreamItem::Log(item)),
            StreamItem::Stats(item) => Ok(StreamItem::Stats(item)),
        },
        Err(e) => Err(e),
    }));
    // NOTE(review): this first `stream` binding is immediately shadowed and unused.
    let stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Collectable>>> + Send>> =
        Box::pin(futures_util::stream::empty());
    let stream = Box::pin(futures_util::stream::empty()) as _;
    match trtb {
        TimeBinningTransformQuery::None => Ok(CollectableStreamBox(stream)),
        TimeBinningTransformQuery::TimeWeighted => todo!(),
        TimeBinningTransformQuery::Unweighted => todo!(),
    }
}