Can show pulse id diff from sf-databuffer

This commit is contained in:
Dominik Werder
2023-03-22 14:07:05 +01:00
parent d1c10e1712
commit e53b328f21
24 changed files with 421 additions and 224 deletions

View File

@@ -310,7 +310,7 @@ impl EventsDynStream {
let st = &scalar_type;
let sh = &shape;
let ag = &agg_kind;
error!("TODO feed through transform?");
warn!("TODO EventsDynStream::new feed through transform");
// TODO do we need/want the empty item from here?
let events_out = items_2::empty::empty_events_dyn_ev(st, sh)?;
let scalar_conv = make_scalar_conv(st, sh, ag)?;
@@ -336,7 +336,7 @@ impl EventsDynStream {
fn replace_events_out(&mut self) -> Result<Box<dyn Events>, Error> {
let st = &self.scalar_type;
let sh = &self.shape;
error!("TODO feed through transform?");
error!("TODO replace_events_out feed through transform");
// TODO do we need/want the empty item from here?
let empty = items_2::empty::empty_events_dyn_ev(st, sh)?;
let evs = mem::replace(&mut self.events_out, empty);

View File

@@ -99,7 +99,7 @@ impl Stream for MergedBlobsFromRemotes {
if c1 == self.tcp_establish_futs.len() {
let inps = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect();
// TODO set out_max_len dynamically
let s1 = Merger::new(inps, 1);
let s1 = Merger::new(inps, 128);
self.merged = Some(Box::pin(s1));
}
continue 'outer;

View File

@@ -57,6 +57,7 @@ pub async fn make_event_pipe(
evq: &PlainEventsQuery,
node_config: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
info!("---------- disk::raw::conn::make_event_pipe");
if false {
match dbconn::channel_exists(&evq.channel(), &node_config).await {
Ok(_) => (),
@@ -86,9 +87,9 @@ pub async fn make_event_pipe(
// TODO should not need this for correctness.
// Should limit based on return size and latency.
let out_max_len = if node_config.node_config.cluster.is_central_storage {
1
128
} else {
1
128
};
let event_blobs = EventChunkerMultifile::new(
(&range).try_into()?,
@@ -167,9 +168,9 @@ pub fn make_local_event_blobs_stream(
// TODO should not need this for correctness.
// Should limit based on return size and latency.
let out_max_len = if node_config.node_config.cluster.is_central_storage {
1
128
} else {
1
128
};
let event_blobs = EventChunkerMultifile::new(
range,
@@ -213,9 +214,9 @@ pub fn make_remote_event_blobs_stream(
// TODO should not need this for correctness.
// Should limit based on return size and latency.
let out_max_len = if node_config.node_config.cluster.is_central_storage {
1
128
} else {
1
128
};
let event_blobs = EventChunkerMultifile::new(
range,
@@ -274,11 +275,6 @@ pub async fn make_event_blobs_pipe(
DiskIoTune::default(),
node_config,
)?;
/*let s = event_blobs.map(|item: ItemType| Box::new(item) as Box<dyn Framable + Send>);
//let s = tracing_futures::Instrumented::instrument(s, tracing::info_span!("make_event_blobs_pipe"));
let pipe: Pin<Box<dyn Stream<Item = Box<dyn Framable + Send>> + Send>>;
pipe = Box::pin(s);
pipe*/
Box::pin(event_blobs) as _
} else {
let event_blobs = make_remote_event_blobs_stream(

View File

@@ -1,12 +1,15 @@
use crate::AsAnyMut;
use crate::AsAnyRef;
use crate::Events;
use crate::TimeBinned;
use crate::TypeName;
use crate::WithLen;
use err::Error;
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
use netpod::BinnedRangeEnum;
use serde::Serialize;
use std::any;
use std::any::Any;
use std::fmt;
@@ -88,12 +91,23 @@ where
T: fmt::Debug + CollectorType + 'static,
{
fn ingest(&mut self, src: &mut dyn Collectable) {
let x = src.as_any_mut().downcast_mut();
if x.is_none() {
warn!("TODO handle the case of incoming Box");
if let Some(src) = src.as_any_mut().downcast_mut::<<T as CollectorType>::Input>() {
info!("sees incoming &mut ref");
T::ingest(self, src)
} else {
if let Some(src) = src.as_any_mut().downcast_mut::<Box<<T as CollectorType>::Input>>() {
info!("sees incoming &mut Box");
T::ingest(self, src)
} else {
error!(
"No idea what this is. Expect: {} input {} got: {} {:?}",
any::type_name::<T>(),
any::type_name::<<T as CollectorType>::Input>(),
src.type_name(),
src
);
}
}
let src: &mut <T as CollectorType>::Input = x.expect("can not downcast");
T::ingest(self, src)
}
fn set_range_complete(&mut self) {
@@ -115,15 +129,21 @@ where
}
// TODO rename to `Typed`
pub trait CollectableType: fmt::Debug + AsAnyRef + AsAnyMut {
pub trait CollectableType: fmt::Debug + AsAnyRef + AsAnyMut + TypeName {
type Collector: CollectorType<Input = Self>;
fn new_collector() -> Self::Collector;
}
pub trait Collectable: fmt::Debug + AsAnyRef + AsAnyMut {
pub trait Collectable: fmt::Debug + AsAnyRef + AsAnyMut + TypeName {
fn new_collector(&self) -> Box<dyn Collector>;
}
impl TypeName for Box<dyn Events> {
fn type_name(&self) -> String {
self.as_ref().type_name()
}
}
impl Collectable for Box<dyn Events> {
fn new_collector(&self) -> Box<dyn Collector> {
self.as_ref().new_collector()
@@ -139,6 +159,12 @@ where
}
}
impl TypeName for Box<dyn Collectable> {
fn type_name(&self) -> String {
self.as_ref().type_name()
}
}
// TODO do this with some blanket impl:
impl Collectable for Box<dyn Collectable> {
fn new_collector(&self) -> Box<dyn Collector> {
@@ -146,13 +172,19 @@ impl Collectable for Box<dyn Collectable> {
}
}
impl WithLen for Box<dyn crate::TimeBinned> {
impl WithLen for Box<dyn TimeBinned> {
fn len(&self) -> usize {
self.as_ref().len()
}
}
impl Collectable for Box<dyn crate::TimeBinned> {
impl TypeName for Box<dyn TimeBinned> {
fn type_name(&self) -> String {
self.as_ref().type_name()
}
}
impl Collectable for Box<dyn TimeBinned> {
fn new_collector(&self) -> Box<dyn Collector> {
self.as_ref().new_collector()
}

View File

@@ -5,6 +5,7 @@ pub mod isodate;
pub mod scalar_ops;
pub mod streamitem;
pub mod subfr;
pub mod transform;
pub mod bincode {
pub use bincode::*;
@@ -79,7 +80,7 @@ where
}
/// Data in time-binned form.
pub trait TimeBinned: Any + TimeBinnable + Collectable + erased_serde::Serialize {
pub trait TimeBinned: Any + TypeName + TimeBinnable + Collectable + erased_serde::Serialize {
fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable;
fn as_collectable_mut(&mut self) -> &mut dyn Collectable;
fn edges_slice(&self) -> (&[u64], &[u64]);
@@ -187,30 +188,3 @@ impl PartialEq for Box<dyn Events> {
Events::partial_eq_dyn(self.as_ref(), other.as_ref())
}
}
pub struct TransformProperties {
pub needs_one_before_range: bool,
pub needs_value: bool,
}
pub trait EventTransform {
fn query_transform_properties(&self) -> TransformProperties;
}
impl<T> EventTransform for Box<T>
where
T: EventTransform,
{
fn query_transform_properties(&self) -> TransformProperties {
self.as_ref().query_transform_properties()
}
}
impl<T> EventTransform for std::pin::Pin<Box<T>>
where
T: EventTransform,
{
fn query_transform_properties(&self) -> TransformProperties {
self.as_ref().query_transform_properties()
}
}

View File

@@ -80,6 +80,19 @@ macro_rules! on_sitemty_range_complete {
};
}
/// Apply `$ex` to the payload of a sitemty stream item when, and only when,
/// it is a data item (`Ok(StreamItem::DataItem(RangeCompletableItem::Data(..)))`).
/// Every other shape (errors, log items, stats, range-complete markers) is
/// returned unchanged, so the macro can be dropped into a `Stream::map` closure.
#[macro_export]
macro_rules! on_sitemty_data {
    ($item:expr, $ex:expr) => {
        // Bind only the innermost data payload; `$ex` receives it by value.
        if let Ok($crate::streamitem::StreamItem::DataItem($crate::streamitem::RangeCompletableItem::Data(item))) =
            $item
        {
            $ex(item)
        } else {
            // Not a data item: pass the original item through untouched.
            $item
        }
    };
}
/// Wrap a plain value as a successful sitemty data item
/// (`Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))`).
pub fn sitem_data<X>(x: X) -> Sitemty<X> {
    Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))
}

57
items_0/src/transform.rs Normal file
View File

@@ -0,0 +1,57 @@
use std::pin;
use crate::Events;
/// Requirements a transform reports about the input data it needs.
/// Queried via [`EventTransform::query_transform_properties`] so the
/// fetch layer can decide what to load.
pub struct TransformProperties {
    // Whether one event from before the requested range is required.
    pub needs_one_before_range: bool,
    // Whether event values are required (as opposed to ts/pulse only).
    pub needs_value: bool,
}
/// A (possibly stateful) transformation applied to batches of events
/// as they flow through a stream pipeline.
pub trait EventTransform {
    /// Report what input data this transform requires.
    fn query_transform_properties(&self) -> TransformProperties;
    /// Consume one batch of events and produce the transformed batch.
    fn transform(&mut self, src: Box<dyn Events>) -> Box<dyn Events>;
}
/// Forward the transform through a box, so `Box<T>` is usable wherever
/// an `EventTransform` is expected.
impl<T> EventTransform for Box<T>
where
    T: EventTransform,
{
    fn query_transform_properties(&self) -> TransformProperties {
        self.as_ref().query_transform_properties()
    }

    // Was `todo!()`, which panicked at runtime if ever reached.
    // `&mut Box<T>` yields `&mut T` via `as_mut`, so plain delegation is
    // correct here (unlike the `Pin<Box<T>>` impl, which would need
    // `T: Unpin`).
    fn transform(&mut self, src: Box<dyn Events>) -> Box<dyn Events> {
        self.as_mut().transform(src)
    }
}
/// Forward the transform through a pinned box.
impl<T> EventTransform for pin::Pin<Box<T>>
where
    T: EventTransform,
{
    fn query_transform_properties(&self) -> TransformProperties {
        // Pin::as_ref gives Pin<&T>, which derefs to &T, so the
        // shared-reference method delegates without extra bounds.
        self.as_ref().query_transform_properties()
    }
    fn transform(&mut self, src: Box<dyn Events>) -> Box<dyn Events> {
        // Delegating needs `&mut T`, which for a pinned box requires
        // `T: Unpin` (or unsafe). Adding that bound would narrow the impl,
        // so this is left unimplemented and panics if called.
        // TODO decide on the bounds and implement.
        todo!()
    }
}
/// Transform that passes events through unchanged; a placeholder for
/// pipelines where no event transform was requested.
///
/// Deriving `Default` (instead of the previous inherent `fn default`)
/// satisfies clippy's `should_implement_trait`; existing call sites of
/// `IdentityTransform::default()` keep compiling via the trait.
#[derive(Debug, Default)]
pub struct IdentityTransform {}
impl EventTransform for IdentityTransform {
fn transform(&mut self, src: Box<dyn Events>) -> Box<dyn Events> {
src
}
fn query_transform_properties(&self) -> TransformProperties {
todo!()
}
}

View File

@@ -11,7 +11,7 @@ use items_0::collect_s::ToJsonResult;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::EventTransform;
use items_0::transform::EventTransform;
use items_0::TimeBinnable;
use items_0::TimeBinner;
use netpod::log::*;

View File

@@ -17,6 +17,7 @@ use items_0::TimeBinnable;
use items_0::TimeBinned;
use items_0::TimeBinner;
use items_0::TimeBins;
use items_0::TypeName;
use items_0::WithLen;
use netpod::is_false;
use netpod::log::*;
@@ -50,6 +51,12 @@ pub struct BinsDim0<NTY> {
pub dim0kind: Option<Dim0Kind>,
}
impl<STY> TypeName for BinsDim0<STY> {
fn type_name(&self) -> String {
any::type_name::<Self>().into()
}
}
impl<NTY> fmt::Debug for BinsDim0<NTY>
where
NTY: fmt::Debug,

View File

@@ -20,6 +20,7 @@ use items_0::TimeBinnable;
use items_0::TimeBinned;
use items_0::TimeBinner;
use items_0::TimeBins;
use items_0::TypeName;
use items_0::WithLen;
use netpod::is_false;
use netpod::log::*;
@@ -27,8 +28,8 @@ use netpod::range::evrange::NanoRange;
use netpod::range::evrange::SeriesRange;
use netpod::timeunits::SEC;
use netpod::BinnedRangeEnum;
use netpod::Dim0Kind;
use netpod::CmpZero;
use netpod::Dim0Kind;
use serde::Deserialize;
use serde::Serialize;
use std::any;
@@ -56,6 +57,12 @@ pub struct BinsXbinDim0<NTY> {
dim0kind: Option<Dim0Kind>,
}
impl<STY> TypeName for BinsXbinDim0<STY> {
fn type_name(&self) -> String {
any::type_name::<Self>().into()
}
}
impl<NTY> fmt::Debug for BinsXbinDim0<NTY>
where
NTY: fmt::Debug,

View File

@@ -1,6 +1,7 @@
use crate::framable::FrameType;
use crate::merger::Mergeable;
use crate::Events;
use items_0::TypeName;
use items_0::collect_s::Collectable;
use items_0::collect_s::Collected;
use items_0::collect_s::Collector;
@@ -16,6 +17,7 @@ use netpod::range::evrange::SeriesRange;
use netpod::BinnedRangeEnum;
use serde::Deserialize;
use serde::Serialize;
use std::any;
use std::any::Any;
use std::fmt;
use std::time::Duration;
@@ -106,6 +108,12 @@ pub enum ChannelEvents {
Status(Option<ConnStatusEvent>),
}
impl TypeName for ChannelEvents {
fn type_name(&self) -> String {
any::type_name::<Self>().into()
}
}
impl FrameTypeInnerStatic for ChannelEvents {
const FRAME_TYPE_ID: u32 = ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID;
}

View File

@@ -21,6 +21,7 @@ use items_0::EventsNonObj;
use items_0::MergeError;
use items_0::TimeBinnable;
use items_0::TimeBinner;
use items_0::TypeName;
use items_0::WithLen;
use netpod::is_false;
use netpod::log::*;
@@ -747,10 +748,11 @@ impl<NTY: ScalarOps> TimeBinnable for EventsDim0<NTY> {
}
}
impl<STY> items_0::TypeName for EventsDim0<STY> {
impl<STY> TypeName for EventsDim0<STY> {
fn type_name(&self) -> String {
let sty = std::any::type_name::<STY>();
format!("EventsDim0<{sty}>")
let self_name = any::type_name::<Self>();
let sty = any::type_name::<STY>();
format!("EventsDim0<{sty}> aka {self_name}<{sty}>")
}
}

View File

@@ -1,5 +1,4 @@
use crate::binsxbindim0::BinsXbinDim0;
use items_0::container::ByteEstimate;
use crate::IsoDateTime;
use crate::RangeOverlapInfo;
use crate::TimeBinnableType;
@@ -10,10 +9,12 @@ use items_0::collect_s::Collected;
use items_0::collect_s::CollectorType;
use items_0::collect_s::ToJsonBytes;
use items_0::collect_s::ToJsonResult;
use items_0::container::ByteEstimate;
use items_0::scalar_ops::ScalarOps;
use items_0::AsAnyMut;
use items_0::AsAnyRef;
use items_0::Empty;
use items_0::TypeName;
use items_0::WithLen;
use netpod::is_false;
use netpod::log::*;
@@ -56,6 +57,12 @@ impl<NTY> EventsXbinDim0<NTY> {
}
}
impl<STY> TypeName for EventsXbinDim0<STY> {
fn type_name(&self) -> String {
any::type_name::<Self>().into()
}
}
impl<NTY> fmt::Debug for EventsXbinDim0<NTY>
where
NTY: fmt::Debug,

View File

@@ -1,19 +0,0 @@
use items_0::Events;
pub trait EventTransform {
fn transform(&mut self, src: Box<dyn Events>) -> Box<dyn Events>;
}
pub struct IdentityTransform {}
impl IdentityTransform {
pub fn default() -> Self {
Self {}
}
}
impl EventTransform for IdentityTransform {
fn transform(&mut self, src: Box<dyn Events>) -> Box<dyn Events> {
src
}
}

View File

@@ -7,7 +7,6 @@ pub mod eventfull;
pub mod eventsdim0;
pub mod eventsdim1;
pub mod eventsxbindim0;
pub mod eventtransform;
pub mod framable;
pub mod frame;
pub mod inmem;
@@ -24,6 +23,7 @@ use chrono::TimeZone;
use chrono::Utc;
use futures_util::Stream;
use items_0::streamitem::Sitemty;
use items_0::transform::EventTransform;
use items_0::Empty;
use items_0::Events;
use items_0::MergeError;
@@ -199,9 +199,9 @@ pub trait TimeBinnableTypeAggregator: Send {
fn result_reset(&mut self, range: SeriesRange, expand: bool) -> Self::Output;
}
pub trait ChannelEventsInput: Stream<Item = Sitemty<ChannelEvents>> + items_0::EventTransform + Send {}
pub trait ChannelEventsInput: Stream<Item = Sitemty<ChannelEvents>> + EventTransform + Send {}
impl<T> ChannelEventsInput for T where T: Stream<Item = Sitemty<ChannelEvents>> + items_0::EventTransform + Send {}
impl<T> ChannelEventsInput for T where T: Stream<Item = Sitemty<ChannelEvents>> + EventTransform + Send {}
pub fn runfut<T, F>(fut: F) -> Result<T, err::Error>
where

View File

@@ -7,6 +7,8 @@ use items_0::streamitem::sitem_data;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::transform::EventTransform;
use items_0::transform::TransformProperties;
use items_0::MergeError;
use items_0::WithLen;
use netpod::log::*;
@@ -412,8 +414,12 @@ where
}
}
impl<T> items_0::EventTransform for Merger<T> {
fn query_transform_properties(&self) -> items_0::TransformProperties {
impl<T> EventTransform for Merger<T> {
fn query_transform_properties(&self) -> TransformProperties {
todo!()
}
fn transform(&mut self, src: Box<dyn items_0::Events>) -> Box<dyn items_0::Events> {
todo!()
}
}

View File

@@ -2,8 +2,8 @@ use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::EventTransform;
use items_0::TransformProperties;
use items_0::transform::EventTransform;
use items_0::transform::TransformProperties;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Context;
@@ -47,6 +47,10 @@ impl<T> EventTransform for Enumerate2<T> {
fn query_transform_properties(&self) -> TransformProperties {
todo!()
}
fn transform(&mut self, src: Box<dyn items_0::Events>) -> Box<dyn items_0::Events> {
todo!()
}
}
pub struct Then2<T, F, Fut> {
@@ -118,6 +122,10 @@ impl<T, F, Fut> EventTransform for Then2<T, F, Fut> {
fn query_transform_properties(&self) -> TransformProperties {
todo!()
}
fn transform(&mut self, src: Box<dyn items_0::Events>) -> Box<dyn items_0::Events> {
todo!()
}
}
pub trait TransformerExt {
@@ -182,4 +190,8 @@ impl<T> EventTransform for VecStream<T> {
fn query_transform_properties(&self) -> TransformProperties {
todo!()
}
fn transform(&mut self, src: Box<dyn items_0::Events>) -> Box<dyn items_0::Events> {
todo!()
}
}

View File

@@ -1,6 +1,8 @@
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::on_sitemty_data;
use items_0::streamitem::LogItem;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
@@ -11,7 +13,6 @@ use items_2::channelevents::ChannelEvents;
use items_2::framable::EventQueryJsonStringFrame;
use items_2::framable::Framable;
use items_2::frame::decode_frame;
use items_2::frame::make_error_frame;
use items_2::frame::make_term_frame;
use netpod::histo::HistoLog2;
use netpod::log::*;
@@ -27,6 +28,8 @@ use tokio::net::tcp::OwnedWriteHalf;
use tokio::net::TcpStream;
use tracing::Instrument;
use crate::scylla::scylla_channel_event_stream;
#[cfg(test)]
mod test;
@@ -62,6 +65,7 @@ async fn make_channel_events_stream(
evq: PlainEventsQuery,
node_config: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
info!("nodenet::conn::make_channel_events_stream");
if evq.channel().backend() == "test-inmem" {
warn!("TEST BACKEND DATA");
use netpod::timeunits::MS;
@@ -95,107 +99,8 @@ async fn make_channel_events_stream(
let stream = futures_util::stream::empty();
Ok(Box::pin(stream))
}
} else if let Some(conf) = &node_config.node_config.cluster.scylla {
// TODO depends in general on the query
// TODO why both in PlainEventsQuery and as separate parameter? Check other usages.
let do_one_before_range = false;
// TODO use better builder pattern with shortcuts for production and dev defaults
let f = crate::channelconfig::channel_config(evq.range().try_into()?, evq.channel().clone(), node_config)
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let scyco = conf;
let scy = scyllaconn::create_scy_session(scyco).await?;
let series = f.series;
let scalar_type = f.scalar_type;
let shape = f.shape;
let do_test_stream_error = false;
error!("TODO derive AggKind from Transformed empty [846397]");
let with_values = if let AggKind::PulseIdDiff = AggKind::TimeWeightedScalar {
false
} else {
true
};
debug!("Make EventsStreamScylla for {series} {scalar_type:?} {shape:?}");
let stream = scyllaconn::events::EventsStreamScylla::new(
series,
evq.range().into(),
do_one_before_range,
scalar_type,
shape,
with_values,
scy,
do_test_stream_error,
);
let stream = stream
.map({
let mut pulse_last = None;
move |item| match item {
Ok(item) => {
// TODO support pulseid extract
let x = if false {
let x = match item {
ChannelEvents::Events(item) => {
let (tss, pulses) = items_0::EventsNonObj::into_tss_pulses(item);
let mut item = items_2::eventsdim0::EventsDim0::empty();
for (ts, pulse) in tss.into_iter().zip(pulses) {
let value = if let Some(last) = pulse_last {
pulse as i64 - last as i64
} else {
0
};
item.push(ts, pulse, value);
pulse_last = Some(pulse);
}
ChannelEvents::Events(Box::new(item))
}
ChannelEvents::Status(x) => ChannelEvents::Status(x),
};
x
} else {
item
};
Ok(x)
}
Err(e) => Err(e),
}
})
.map(move |item| match &item {
Ok(k) => match k {
ChannelEvents::Events(k) => {
let n = k.len();
let d = evq.event_delay();
(item, n, d.clone())
}
ChannelEvents::Status(_) => (item, 1, None),
},
Err(_) => (item, 1, None),
})
.then(|(item, n, d)| async move {
if let Some(d) = d {
warn!("sleep {} times {:?}", n, d);
tokio::time::sleep(d).await;
}
item
})
.map(|item| {
let item = match item {
Ok(item) => match item {
ChannelEvents::Events(item) => {
let item = ChannelEvents::Events(item);
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
item
}
ChannelEvents::Status(item) => {
let item = ChannelEvents::Status(item);
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
item
}
},
Err(e) => Err(e),
};
item
});
Ok(Box::pin(stream))
} else if let Some(scyconf) = &node_config.node_config.cluster.scylla {
scylla_channel_event_stream(evq, scyconf, node_config).await
} else if let Some(_) = &node_config.node.channel_archiver {
let e = Error::with_msg_no_trace("archapp not built");
Err(e)
@@ -278,11 +183,7 @@ async fn events_conn_handler_inner_try(
}
let mut stream: Pin<Box<dyn Stream<Item = Box<dyn Framable + Send>> + Send>> = if evq.is_event_blobs() {
if false {
error!("TODO support event blob transform");
let e = Error::with_msg(format!("TODO support event blob transform"));
return Err((e, netout).into());
}
// TODO support event blobs as transform
match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await {
Ok(stream) => {
let stream = stream.map(|x| Box::new(x) as _);
@@ -293,29 +194,27 @@ async fn events_conn_handler_inner_try(
}
}
} else {
match make_channel_events_stream(evq, node_config).await {
match make_channel_events_stream(evq.clone(), node_config).await {
Ok(stream) => {
let stream = stream
.map({
use items_2::eventtransform::EventTransform;
let mut tf = items_2::eventtransform::IdentityTransform::default();
move |item| match item {
Ok(item2) => match item2 {
StreamItem::DataItem(item3) => match item3 {
RangeCompletableItem::Data(item4) => match item4 {
use items_0::transform::EventTransform;
let mut tf = items_0::transform::IdentityTransform::default();
move |item| {
if false {
on_sitemty_data!(item, |item4| {
match item4 {
ChannelEvents::Events(item5) => {
let a = tf.transform(item5);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events(
a,
))))
let x = ChannelEvents::Events(a);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))
}
x => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))),
},
x => Ok(StreamItem::DataItem(x)),
},
x => Ok(x),
},
_ => item,
}
})
} else {
item
}
}
})
.map(|x| Box::new(x) as _);
@@ -327,18 +226,11 @@ async fn events_conn_handler_inner_try(
}
};
let mut buf_len_cnt = 0;
let mut buf_len_sum = 0;
let mut buf_len_histo = HistoLog2::new(5);
while let Some(item) = stream.next().await {
let item = item.make_frame();
match item {
Ok(buf) => {
buf_len_cnt += 1;
buf_len_sum += buf.len();
if buf.len() > 1024 * 128 {
warn!("emit buf len {}", buf.len());
}
buf_len_histo.ingest(buf.len() as u32);
match netout.write_all(&buf).await {
Ok(_) => {}
@@ -351,7 +243,22 @@ async fn events_conn_handler_inner_try(
}
}
}
info!("buf_len_cnt {} buf_len_avg {}", buf_len_cnt, buf_len_sum / buf_len_cnt);
{
let item = LogItem {
node_ix: node_config.ix as _,
level: Level::INFO,
msg: format!("buf_len_histo: {:?}", buf_len_histo),
};
let item: Sitemty<ChannelEvents> = Ok(StreamItem::Log(item));
let buf = match item.make_frame() {
Ok(k) => k,
Err(e) => return Err((e, netout))?,
};
match netout.write_all(&buf).await {
Ok(_) => (),
Err(e) => return Err((e, netout))?,
}
}
let buf = match make_term_frame() {
Ok(k) => k,
Err(e) => return Err((e, netout))?,
@@ -364,7 +271,6 @@ async fn events_conn_handler_inner_try(
Ok(_) => (),
Err(e) => return Err((e, netout))?,
}
debug!("events_conn_handler_inner_try buf_len_histo: {:?}", buf_len_histo);
Ok(())
}

View File

@@ -1,2 +1,3 @@
pub mod channelconfig;
pub mod conn;
pub mod scylla;

115
nodenet/src/scylla.rs Normal file
View File

@@ -0,0 +1,115 @@
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::Appendable;
use items_0::Empty;
use items_2::channelevents::ChannelEvents;
use netpod::log::*;
use netpod::NodeConfigCached;
use netpod::ScyllaConfig;
use query::api4::events::PlainEventsQuery;
use std::pin::Pin;
/// Build the per-node event stream for a Scylla-backed channel.
///
/// Resolves the channel config, opens a Scylla session, and wires up a
/// stream of `Sitemty<ChannelEvents>` with, in order:
/// 1. optional pulse-id-diff rewriting of event batches,
/// 2. an optional artificial per-batch delay (`evq.event_delay()`, for testing),
/// 3. wrapping of each item into the sitemty `DataItem` envelope.
pub async fn scylla_channel_event_stream(
    evq: PlainEventsQuery,
    scyco: &ScyllaConfig,
    node_config: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
    // TODO depends in general on the query
    // TODO why both in PlainEventsQuery and as separate parameter? Check other usages.
    let do_one_before_range = false;
    // TODO use better builder pattern with shortcuts for production and dev defaults
    let f = crate::channelconfig::channel_config(evq.range().try_into()?, evq.channel().clone(), node_config)
        .await
        .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
    let scy = scyllaconn::create_scy_session(scyco).await?;
    let series = f.series;
    let scalar_type = f.scalar_type;
    let shape = f.shape;
    let do_test_stream_error = false;
    // Skip fetching value columns when the transform only needs pulse ids.
    let with_values = evq.need_value_data();
    debug!("Make EventsStreamScylla for {series} {scalar_type:?} {shape:?}");
    let stream = scyllaconn::events::EventsStreamScylla::new(
        series,
        evq.range().into(),
        do_one_before_range,
        scalar_type,
        shape,
        with_values,
        scy,
        do_test_stream_error,
    );
    let stream = stream
        // Stage 1: optionally rewrite each events batch into pulse-id diffs.
        .map({
            let is_pulse_id_diff = evq.transform().is_pulse_id_diff();
            // Captured by the closure so the diff carries across batches.
            let mut pulse_last = None;
            move |item| match item {
                Ok(item) => {
                    let x = if is_pulse_id_diff {
                        let x = match item {
                            ChannelEvents::Events(item) => {
                                // Discard values; keep only (ts, pulse) pairs.
                                let (tss, pulses) = items_0::EventsNonObj::into_tss_pulses(item);
                                let mut item = items_2::eventsdim0::EventsDim0::empty();
                                for (ts, pulse) in tss.into_iter().zip(pulses) {
                                    // First event has no predecessor: diff of 0.
                                    let value = if let Some(last) = pulse_last {
                                        pulse as i64 - last as i64
                                    } else {
                                        0
                                    };
                                    item.push(ts, pulse, value);
                                    pulse_last = Some(pulse);
                                }
                                ChannelEvents::Events(Box::new(item))
                            }
                            // Status items pass through untouched.
                            ChannelEvents::Status(x) => ChannelEvents::Status(x),
                        };
                        x
                    } else {
                        item
                    };
                    Ok(x)
                }
                Err(e) => Err(e),
            }
        })
        // Stage 2a: annotate each item with its event count and the
        // configured artificial delay (None disables stage 2b).
        .map(move |item| match &item {
            Ok(k) => match k {
                ChannelEvents::Events(k) => {
                    let n = k.len();
                    let d = evq.event_delay();
                    (item, n, d.clone())
                }
                ChannelEvents::Status(_) => (item, 1, None),
            },
            Err(_) => (item, 1, None),
        })
        // Stage 2b: sleep `delay * n_events` before yielding (throttling
        // aid for testing slow consumers).
        .then(|(item, n, d)| async move {
            if let Some(d) = d {
                warn!("sleep {} times {:?}", n, d);
                tokio::time::sleep(d.saturating_mul(n as _)).await;
            }
            item
        })
        // Stage 3: wrap every Ok item in the sitemty DataItem envelope.
        .map(|item| {
            let item = match item {
                Ok(item) => match item {
                    ChannelEvents::Events(item) => {
                        let item = ChannelEvents::Events(item);
                        let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
                        item
                    }
                    ChannelEvents::Status(item) => {
                        let item = ChannelEvents::Status(item);
                        let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
                        item
                    }
                },
                Err(e) => Err(e),
            };
            item
        });
    Ok(Box::pin(stream))
}

View File

@@ -131,6 +131,10 @@ impl PlainEventsQuery {
pub fn is_event_blobs(&self) -> bool {
self.transform.is_event_blobs()
}
/// Whether the requested transform needs actual value data from storage
/// (delegates to the transform description).
pub fn need_value_data(&self) -> bool {
    self.transform.need_value_data()
}
}
impl HasBackend for PlainEventsQuery {

View File

@@ -19,6 +19,19 @@ enum EventTransformQuery {
PulseIdDiff,
}
impl EventTransformQuery {
    /// Whether this transform requires the stored event values to be
    /// fetched (as opposed to timestamps and pulse ids only).
    ///
    /// Kept as an exhaustive match (no `_` arm) so adding a variant
    /// forces this decision to be revisited.
    pub fn need_value_data(&self) -> bool {
        match self {
            EventTransformQuery::EventBlobsVerbatim => true,
            EventTransformQuery::EventBlobsUncompressed => true,
            EventTransformQuery::ValueFull => true,
            EventTransformQuery::ArrayPick(_) => true,
            EventTransformQuery::MinMaxAvgDev => true,
            // Pulse-id difference needs only the pulse ids, not the values.
            EventTransformQuery::PulseIdDiff => false,
        }
    }
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
enum TimeBinningTransformQuery {
None,
@@ -84,6 +97,17 @@ impl TransformQuery {
}
}
pub fn need_value_data(&self) -> bool {
self.event.need_value_data()
}
/// True when the requested event transform is the pulse-id-diff transform.
pub fn is_pulse_id_diff(&self) -> bool {
    // `matches!` is the idiomatic form of a boolean pattern test,
    // replacing the hand-written `match … => true / _ => false`.
    matches!(&self.event, EventTransformQuery::PulseIdDiff)
}
pub fn build_event_transform(&self) -> () {}
}
@@ -114,7 +138,7 @@ impl FromUrl for TransformQuery {
}
} else if s == "unweightedScalar" {
TransformQuery {
event: EventTransformQuery::EventBlobsVerbatim,
event: EventTransformQuery::ValueFull,
time_binning: TimeBinningTransformQuery::None,
}
} else if s == "binnedX" {
@@ -143,7 +167,7 @@ impl FromUrl for TransformQuery {
})
.unwrap_or(None);
let ret = TransformQuery {
event: EventTransformQuery::EventBlobsVerbatim,
event: EventTransformQuery::ValueFull,
time_binning: TimeBinningTransformQuery::None,
};
Ok(ret)

View File

@@ -3,8 +3,11 @@ use crate::tcprawclient::open_tcp_streams;
use err::Error;
use futures_util::stream;
use futures_util::StreamExt;
use items_0::on_sitemty_data;
use items_0::streamitem::sitem_data;
use items_0::Empty;
use items_2::channelevents::ChannelEvents;
use items_2::merger::Merger;
use netpod::log::*;
use netpod::ChConf;
use netpod::Cluster;
@@ -36,8 +39,13 @@ pub async fn plain_events_json(
info!("plain_events_json evquery {:?}", evquery);
//let ev_agg_kind = evquery.agg_kind().as_ref().map_or(AggKind::Plain, |x| x.clone());
//info!("plain_events_json ev_agg_kind {:?}", ev_agg_kind);
error!("TODO feed through transform chain");
let empty = items_2::empty::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape)?;
warn!("TODO feed through transform chain");
let empty = if query.transform().is_pulse_id_diff() {
use items_0::Empty;
Box::new(items_2::eventsdim0::EventsDim0::<i64>::empty())
} else {
items_2::empty::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape)?
};
info!("plain_events_json with empty item {}", empty.type_name());
let empty = ChannelEvents::Events(empty);
let empty = sitem_data(empty);
@@ -45,14 +53,51 @@ pub async fn plain_events_json(
let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&evquery, cluster).await?;
//let inps = open_tcp_streams::<_, Box<dyn items_2::Events>>(&query, cluster).await?;
// TODO propagate also the max-buf-len for the first stage event reader:
let stream = items_2::merger::Merger::new(inps, 1024);
let stream = Merger::new(inps, 1024);
// Transforms that keep state between batches of events, usually only useful after merge.
// Example: pulse-id-diff
use futures_util::Stream;
use items_0::streamitem::Sitemty;
use std::pin::Pin;
let stream: Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>> = if query.transform().is_pulse_id_diff() {
Box::pin(stream.map(|item| {
let mut pulse_last = None;
on_sitemty_data!(item, move |item| {
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::StreamItem;
use items_0::Appendable;
let x = match item {
ChannelEvents::Events(item) => {
let (tss, pulses) = items_0::EventsNonObj::into_tss_pulses(item);
let mut item = items_2::eventsdim0::EventsDim0::empty();
for (ts, pulse) in tss.into_iter().zip(pulses) {
let value = if let Some(last) = pulse_last {
pulse as i64 - last as i64
} else {
0
};
item.push(ts, pulse, value);
pulse_last = Some(pulse);
}
ChannelEvents::Events(Box::new(item))
}
ChannelEvents::Status(x) => ChannelEvents::Status(x),
};
Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))
})
}))
} else {
Box::pin(stream)
};
let stream = stream.map(|item| {
info!("item after merge: {item:?}");
//info!("item after merge: {item:?}");
item
});
let stream = RangeFilter2::new(stream, todo!(), evquery.one_before_range());
let stream = RangeFilter2::new(stream, query.range().try_into()?, evquery.one_before_range());
let stream = stream.map(|item| {
info!("item after rangefilter: {item:?}");
//info!("item after rangefilter: {item:?}");
item
});
let stream = stream::iter([empty]).chain(stream);

View File

@@ -32,7 +32,7 @@ pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Clu
let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&evquery, cluster).await?;
// TODO propagate also the max-buf-len for the first stage event reader:
info!("timebinned_json with empty item {empty:?}");
let stream = Merger::new(inps, 128);
let stream = Merger::new(inps, 1024);
let stream = stream::iter([empty]).chain(stream);
let stream = RangeFilter2::new(stream, todo!(), evquery.one_before_range());
let stream = Box::pin(stream);