From a94d68aa494ccbf3b6801866f2756a0e83fe84bb Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 19 Apr 2023 11:30:18 +0200 Subject: [PATCH] Pulse id diff in events endpoint via transform chain --- daqbufp2/src/test/api4.rs | 1 + daqbufp2/src/test/api4/eventsjson.rs | 8 +- daqbufp2/src/test/api4/pulseiddiff.rs | 81 ++++++++++++++++++++ items_0/src/items_0.rs | 50 ++++++++----- items_0/src/transform.rs | 27 ++++++- items_2/src/channelevents.rs | 21 +++++- items_2/src/eventsdim0.rs | 18 +++++ items_2/src/eventsdim1.rs | 10 +++ items_2/src/framable.rs | 7 ++ items_2/src/merger.rs | 8 +- items_2/src/streams.rs | 17 +++-- items_2/src/transform.rs | 48 +++++------- nodenet/Cargo.toml | 1 + nodenet/src/conn.rs | 72 +++++++++++------- nodenet/src/conn/generator.rs | 12 ++- query/src/api4/events.rs | 24 ++++++ query/src/transform.rs | 8 ++ streams/Cargo.toml | 1 - streams/src/plaineventsjson.rs | 103 +++++++------------------- streams/src/test/collect.rs | 48 ++++++++++++ streams/src/test/timebin.rs | 3 +- streams/src/timebin.rs | 14 ++-- streams/src/timebinnedjson.rs | 2 - streams/src/transform.rs | 37 ++++----- 24 files changed, 421 insertions(+), 200 deletions(-) create mode 100644 daqbufp2/src/test/api4/pulseiddiff.rs diff --git a/daqbufp2/src/test/api4.rs b/daqbufp2/src/test/api4.rs index f72445b..151dc7f 100644 --- a/daqbufp2/src/test/api4.rs +++ b/daqbufp2/src/test/api4.rs @@ -1,2 +1,3 @@ pub mod binnedjson; pub mod eventsjson; +pub mod pulseiddiff; diff --git a/daqbufp2/src/test/api4/eventsjson.rs b/daqbufp2/src/test/api4/eventsjson.rs index 06fb69d..17d68d2 100644 --- a/daqbufp2/src/test/api4/eventsjson.rs +++ b/daqbufp2/src/test/api4/eventsjson.rs @@ -6,6 +6,7 @@ use err::Error; use http::StatusCode; use hyper::Body; use items_0::WithLen; +use items_2::eventsdim0::EventsDim0CollectorOutput; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::AppendToUrl; @@ -33,7 +34,7 @@ fn events_plain_json_00() -> Result<(), Error> { cluster, ) .await?; - let res: items_2::eventsdim0::EventsDim0CollectorOutput = serde_json::from_value(jsv)?; + let res: EventsDim0CollectorOutput = serde_json::from_value(jsv)?; // inmem was meant just for functional test, ignores the requested time range assert_eq!(res.ts_anchor_sec(), 1204); assert_eq!(res.len(), 66); @@ -63,7 +64,7 @@ fn events_plain_json_01() -> Result<(), Error> { cluster, ) .await?; - let res: items_2::eventsdim0::EventsDim0CollectorOutput = serde_json::from_value(jsv)?; + let res: EventsDim0CollectorOutput = serde_json::from_value(jsv)?; assert_eq!(res.ts_anchor_sec(), 1210); assert_eq!(res.pulse_anchor(), 2420); let exp = [2420., 2421., 2422., 2423., 2424., 2425.]; @@ -91,7 +92,7 @@ fn events_plain_json_02_range_incomplete() -> Result<(), Error> { cluster, ) .await?; - let res: items_2::eventsdim0::EventsDim0CollectorOutput = serde_json::from_value(jsv).unwrap(); + let res: EventsDim0CollectorOutput = serde_json::from_value(jsv).unwrap(); assert_eq!(res.range_complete(), false); assert_eq!(res.timed_out(), false); Ok(()) @@ -131,6 +132,7 @@ async fn events_plain_json( } let buf = hyper::body::to_bytes(res.into_body()).await.ec()?; let s = String::from_utf8_lossy(&buf); + //info!("received from server: {s}"); let res: JsonValue = serde_json::from_str(&s)?; let pretty = serde_json::to_string_pretty(&res)?; info!("{pretty}"); diff --git a/daqbufp2/src/test/api4/pulseiddiff.rs b/daqbufp2/src/test/api4/pulseiddiff.rs new file mode 100644 index 0000000..08c1bb8 --- /dev/null +++ b/daqbufp2/src/test/api4/pulseiddiff.rs @@ -0,0 +1,81 @@ +use crate::err::ErrConv; +use crate::nodes::require_test_hosts_running; +use crate::test::f32_iter_cmp_near; +use chrono::Utc; +use err::Error; +use http::StatusCode; +use hyper::Body; +use items_0::WithLen; +use items_2::eventsdim0::EventsDim0CollectorOutput; +use netpod::log::*; +use netpod::range::evrange::NanoRange; +use netpod::AppendToUrl; +use netpod::Channel; +use netpod::Cluster; +use netpod::HostPort; +use netpod::APP_JSON; +use query::api4::events::PlainEventsQuery; +use serde_json::Value as JsonValue; +use url::Url; + +fn make_query>(name: S, beg_date: &str, end_date: &str) -> Result { + let channel = Channel { + backend: "test-inmem".into(), + name: name.into(), + series: None, + }; + let beg_date = beg_date.parse()?; + let end_date = end_date.parse()?; + let range = NanoRange::from_date_time(beg_date, end_date); + let query = PlainEventsQuery::new(channel, range).for_pulse_id_diff(); + Ok(query) +} + +#[test] +fn events_plain_json_00() -> Result<(), Error> { + let fut = async { + let rh = require_test_hosts_running()?; + let cluster = &rh.cluster; + let query = make_query("inmem-d0-i32", "1970-01-01T00:20:04.000Z", "1970-01-01T00:21:10.000Z")?; + let jsv = events_plain_json(query, cluster).await?; + let res: EventsDim0CollectorOutput = serde_json::from_value(jsv)?; + // inmem was meant just for functional test, ignores the requested time range + assert_eq!(res.ts_anchor_sec(), 1204); + assert_eq!(res.len(), 66); + Ok(()) + }; + taskrun::run(fut) +} + +// TODO improve by a more information-rich return type. +async fn events_plain_json(query: PlainEventsQuery, cluster: &Cluster) -> Result { + let t1 = Utc::now(); + let node0 = &cluster.nodes[0]; + let hp = HostPort::from_node(node0); + let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; + query.append_to_url(&mut url); + let url = url; + info!("http get {}", url); + let req = hyper::Request::builder() + .method(http::Method::GET) + .uri(url.to_string()) + .header(http::header::ACCEPT, APP_JSON) + .body(Body::empty()) + .ec()?; + let client = hyper::Client::new(); + let res = client.request(req).await.ec()?; + if res.status() != StatusCode::OK { + error!("client response {:?}", res); + return Err(Error::with_msg_no_trace(format!("bad result {res:?}"))); + } + let buf = hyper::body::to_bytes(res.into_body()).await.ec()?; + let s = String::from_utf8_lossy(&buf); + let res: JsonValue = serde_json::from_str(&s)?; + let pretty = serde_json::to_string_pretty(&res)?; + info!("{pretty}"); + let t2 = chrono::Utc::now(); + let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; + // TODO add timeout + info!("time {} ms", ms); + Ok(res) +} diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs index d835f81..6677c47 100644 --- a/items_0/src/items_0.rs +++ b/items_0/src/items_0.rs @@ -122,6 +122,8 @@ pub trait Events: fn nty_id(&self) -> u32; fn tss(&self) -> &VecDeque; fn pulses(&self) -> &VecDeque; + fn frame_type_id(&self) -> u32; + fn to_min_max_avg(&mut self) -> Box; } impl WithLen for Box { @@ -150,82 +152,90 @@ impl EventsNonObj for Box { impl Events for Box { fn as_time_binnable_mut(&mut self) -> &mut dyn TimeBinnable { - todo!() + Events::as_time_binnable_mut(self.as_mut()) } fn verify(&self) -> bool { - todo!() + Events::verify(self.as_ref()) } fn output_info(&self) { - todo!() + Events::output_info(self.as_ref()) } fn as_collectable_mut(&mut self) -> &mut dyn Collectable { - todo!() + Events::as_collectable_mut(self.as_mut()) } fn as_collectable_with_default_ref(&self) -> &dyn Collectable { - todo!() + Events::as_collectable_with_default_ref(self.as_ref()) } fn as_collectable_with_default_mut(&mut self) -> &mut dyn Collectable { - todo!() + Events::as_collectable_with_default_mut(self.as_mut()) } fn ts_min(&self) -> Option { - todo!() + Events::ts_min(self.as_ref()) } fn ts_max(&self) -> Option { - todo!() + Events::ts_max(self.as_ref()) } fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box { - todo!() + Events::take_new_events_until_ts(self.as_mut(), ts_end) } fn new_empty_evs(&self) -> Box { - todo!() + Events::new_empty_evs(self.as_ref()) } fn drain_into_evs(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError> { - todo!() + Events::drain_into_evs(self.as_mut(), dst, range) } fn find_lowest_index_gt_evs(&self, ts: u64) -> Option { - todo!() + Events::find_lowest_index_gt_evs(self.as_ref(), ts) } fn find_lowest_index_ge_evs(&self, ts: u64) -> Option { - todo!() + Events::find_lowest_index_ge_evs(self.as_ref(), ts) } fn find_highest_index_lt_evs(&self, ts: u64) -> Option { - todo!() + Events::find_highest_index_lt_evs(self.as_ref(), ts) } fn clone_dyn(&self) -> Box { - todo!() + Events::clone_dyn(self.as_ref()) } fn partial_eq_dyn(&self, other: &dyn Events) -> bool { - todo!() + Events::partial_eq_dyn(self.as_ref(), other) } fn serde_id(&self) -> &'static str { - todo!() + Events::serde_id(self.as_ref()) } fn nty_id(&self) -> u32 { - todo!() + Events::nty_id(self.as_ref()) } fn tss(&self) -> &VecDeque { - todo!() + Events::tss(self.as_ref()) } fn pulses(&self) -> &VecDeque { - todo!() + Events::pulses(self.as_ref()) + } + + fn frame_type_id(&self) -> u32 { + Events::frame_type_id(self.as_ref()) + } + + fn to_min_max_avg(&mut self) -> Box { + Events::to_min_max_avg(self.as_mut()) } } diff --git a/items_0/src/transform.rs b/items_0/src/transform.rs index 07cfb59..428f317 100644 --- a/items_0/src/transform.rs +++ b/items_0/src/transform.rs @@ -9,7 +9,10 @@ use err::Error; use futures_util::stream; use futures_util::Future; use futures_util::Stream; +use futures_util::StreamExt; use std::pin::Pin; +use std::task::Context; +use std::task::Poll; pub trait EventStreamTrait: Stream>> + WithTransformProperties + Send {} @@ -23,6 +26,10 @@ pub trait CollectableStreamTrait: { } +pub struct EventTransformProperties { + pub needs_value: bool, +} + pub struct TransformProperties { pub needs_one_before_range: bool, pub needs_value: bool, @@ -50,7 +57,7 @@ where } } -pub trait EventTransform: WithTransformProperties { +pub trait EventTransform: WithTransformProperties + Send { fn transform(&mut self, src: Box) -> Box; } @@ -130,6 +137,24 @@ where } } +pub struct TimeBinnableStreamBox(pub Pin>); + +impl WithTransformProperties for TimeBinnableStreamBox { + fn query_transform_properties(&self) -> TransformProperties { + self.0.query_transform_properties() + } +} + +impl Stream for TimeBinnableStreamBox { + type Item = ::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.0.poll_next_unpin(cx) + } +} + +impl TimeBinnableStreamTrait for TimeBinnableStreamBox {} + pub struct CollectableStreamBox(pub Pin>); impl WithTransformProperties for stream::Empty { diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index 084e6c6..a64b0a0 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -26,6 +26,7 @@ use serde::Deserialize; use serde::Serialize; use std::any; use std::any::Any; +use std::collections::VecDeque; use std::fmt; use std::time::Duration; use std::time::SystemTime; @@ -666,8 +667,11 @@ impl TimeBinnable for ChannelEvents { } impl EventsNonObj for ChannelEvents { - fn into_tss_pulses(self: Box) -> (std::collections::VecDeque, std::collections::VecDeque) { - todo!() + fn into_tss_pulses(self: Box) -> (VecDeque, VecDeque) { + match *self { + ChannelEvents::Events(k) => k.into_tss_pulses(), + ChannelEvents::Status(k) => (VecDeque::new(), VecDeque::new()), + } } } @@ -751,6 +755,17 @@ impl Events for ChannelEvents { fn pulses(&self) -> &std::collections::VecDeque { todo!() } + + fn frame_type_id(&self) -> u32 { + ::FRAME_TYPE_ID + } + + fn to_min_max_avg(&mut self) -> Box { + match self { + ChannelEvents::Events(item) => Box::new(ChannelEvents::Events(Events::to_min_max_avg(item))), + ChannelEvents::Status(item) => Box::new(ChannelEvents::Status(item.take())), + } + } } impl Collectable for ChannelEvents { @@ -771,6 +786,8 @@ pub struct ChannelEventsTimeBinner { impl fmt::Debug for ChannelEventsTimeBinner { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("ChannelEventsTimeBinner") + .field("binrange", &self.binrange) + .field("do_time_weight", &self.do_time_weight) .field("conn_state", &self.conn_state) .finish() } diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 3c8f62d..c01ce1d 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -1,4 +1,6 @@ use crate::binsdim0::BinsDim0; +use crate::framable::FrameType; +use crate::framable::FrameTypeStatic; use crate::IsoDateTime; use crate::RangeOverlapInfo; use crate::TimeBinnableType; @@ -11,6 +13,7 @@ use items_0::collect_s::CollectorType; use items_0::collect_s::ToJsonBytes; use items_0::collect_s::ToJsonResult; use items_0::container::ByteEstimate; +use items_0::framable::FrameTypeInnerStatic; use items_0::scalar_ops::ScalarOps; use items_0::timebin::TimeBinnable; use items_0::timebin::TimeBinned; @@ -922,6 +925,21 @@ impl Events for EventsDim0 { fn pulses(&self) -> &VecDeque { &self.pulses } + + fn frame_type_id(&self) -> u32 { + error!("TODO frame_type_id should not be called"); + // TODO make more nice + panic!() + } + + fn to_min_max_avg(&mut self) -> Box { + let dst = Self { + tss: mem::replace(&mut self.tss, Default::default()), + pulses: mem::replace(&mut self.pulses, Default::default()), + values: mem::replace(&mut self.values, Default::default()), + }; + Box::new(dst) + } } #[derive(Debug)] diff --git a/items_2/src/eventsdim1.rs b/items_2/src/eventsdim1.rs index ed00b18..32a6440 100644 --- a/items_2/src/eventsdim1.rs +++ b/items_2/src/eventsdim1.rs @@ -1,4 +1,5 @@ use crate::binsdim0::BinsDim0; +use crate::framable::FrameType; use crate::IsoDateTime; use crate::RangeOverlapInfo; use crate::TimeBinnableType; @@ -813,6 +814,15 @@ impl Events for EventsDim1 { fn pulses(&self) -> &VecDeque { &self.pulses } + + fn frame_type_id(&self) -> u32 { + // TODO make more nice + panic!() + } + + fn to_min_max_avg(&mut self) -> Box { + todo!() + } } #[derive(Debug)] diff --git a/items_2/src/framable.rs b/items_2/src/framable.rs index 4721aab..1973056 100644 --- a/items_2/src/framable.rs +++ b/items_2/src/framable.rs @@ -15,6 +15,7 @@ use items_0::streamitem::StreamItem; use items_0::streamitem::ERROR_FRAME_TYPE_ID; use items_0::streamitem::EVENT_QUERY_JSON_STRING_FRAME; use items_0::streamitem::SITEMTY_NONSPEC_FRAME_TYPE_ID; +use items_0::Events; use serde::de::DeserializeOwned; use serde::Deserialize; use serde::Serialize; @@ -50,6 +51,12 @@ where } } +impl FrameType for Box { + fn frame_type_id(&self) -> u32 { + self.as_ref().frame_type_id() + } +} + pub trait Framable { fn make_frame(&self) -> Result; } diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs index 7dc8d1f..ad06ead 100644 --- a/items_2/src/merger.rs +++ b/items_2/src/merger.rs @@ -10,6 +10,7 @@ use items_0::streamitem::StreamItem; use items_0::transform::EventTransform; use items_0::transform::TransformProperties; use items_0::transform::WithTransformProperties; +use items_0::Events; use items_0::MergeError; use items_0::WithLen; use netpod::log::*; @@ -451,8 +452,11 @@ impl WithTransformProperties for Merger { } } -impl EventTransform for Merger { - fn transform(&mut self, src: Box) -> Box { +impl EventTransform for Merger +where + T: Send, +{ + fn transform(&mut self, src: Box) -> Box { todo!() } } diff --git a/items_2/src/streams.rs b/items_2/src/streams.rs index 141c1b4..dcd9579 100644 --- a/items_2/src/streams.rs +++ b/items_2/src/streams.rs @@ -62,9 +62,9 @@ where impl EventTransform for Enumerate2 where - T: WithTransformProperties, + T: WithTransformProperties + Send, { - fn transform(&mut self, src: Box) -> Box { + fn transform(&mut self, src: Box) -> Box { todo!() } } @@ -148,9 +148,11 @@ where impl EventTransform for Then2 where - T: EventTransform, + T: EventTransform + Send, + F: Send, + Fut: Send, { - fn transform(&mut self, src: Box) -> Box { + fn transform(&mut self, src: Box) -> Box { todo!() } } @@ -219,8 +221,11 @@ impl WithTransformProperties for VecStream { } } -impl EventTransform for VecStream { - fn transform(&mut self, src: Box) -> Box { +impl EventTransform for VecStream +where + T: Send, +{ + fn transform(&mut self, src: Box) -> Box { todo!() } } diff --git a/items_2/src/transform.rs b/items_2/src/transform.rs index 24963ca..0235936 100644 --- a/items_2/src/transform.rs +++ b/items_2/src/transform.rs @@ -42,8 +42,8 @@ impl WithTransformProperties for TransformEventMinMaxAvg { } impl EventTransform for TransformEventMinMaxAvg { - fn transform(&mut self, src: Box) -> Box { - todo!() + fn transform(&mut self, mut src: Box) -> Box { + src.to_min_max_avg() } } @@ -51,7 +51,9 @@ pub fn make_transform_min_max_avg() -> TransformEvent { TransformEvent(Box::new(TransformEventMinMaxAvg {})) } -struct TransformEventPulseIdDiff {} +struct TransformEventPulseIdDiff { + pulse_last: Option, +} impl WithTransformProperties for TransformEventPulseIdDiff { fn query_transform_properties(&self) -> TransformProperties { @@ -61,36 +63,22 @@ impl WithTransformProperties for TransformEventPulseIdDiff { impl EventTransform for TransformEventPulseIdDiff { fn transform(&mut self, src: Box) -> Box { - let mut src = src; - if let Some(chevs) = src.as_any_mut().downcast_mut::() { - let chevs2 = chevs; - let chevs = mem::replace(chevs2, ChannelEvents::Status(None)); - let mut pulse_last = None; - match chevs { - ChannelEvents::Events(item) => { - let (tss, pulses) = EventsNonObj::into_tss_pulses(item); - let mut item = EventsDim0::empty(); - for (ts, pulse) in tss.into_iter().zip(pulses) { - let value = if let Some(last) = pulse_last { - pulse as i64 - last as i64 - } else { - 0 - }; - item.push(ts, pulse, value); - pulse_last = Some(pulse); - } - *chevs2 = ChannelEvents::Events(Box::new(item)); - } - ChannelEvents::Status(_) => {} - } - src - } else { - warn!("make_transform_pulse_id_diff item is not ChannelEvents"); - src + let (tss, pulses) = EventsNonObj::into_tss_pulses(src); + let mut item = EventsDim0::empty(); + let pulse_last = &mut self.pulse_last; + for (ts, pulse) in tss.into_iter().zip(pulses) { + let value = if let Some(last) = pulse_last { + pulse as i64 - *last as i64 + } else { + 0 + }; + item.push(ts, pulse, value); + *pulse_last = Some(pulse); } + Box::new(ChannelEvents::Events(Box::new(item))) } } pub fn make_transform_pulse_id_diff() -> TransformEvent { - TransformEvent(Box::new(TransformEventPulseIdDiff {})) + TransformEvent(Box::new(TransformEventPulseIdDiff { pulse_last: None })) } diff --git a/nodenet/Cargo.toml b/nodenet/Cargo.toml index fd76b0d..e40befe 100644 --- a/nodenet/Cargo.toml +++ b/nodenet/Cargo.toml @@ -21,6 +21,7 @@ tracing = "0.1.25" hex = "0.4.3" scylla = "0.7" tokio-postgres = "0.7.7" +wasmer = { version = "3.2.0", default-features = false, features = ["sys", "cranelift"] } err = { path = "../err" } netpod = { path = "../netpod" } query = { path = "../query" } diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index a8f09a5..f778e96 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -1,15 +1,19 @@ pub mod generator; +use crate::scylla::scylla_channel_event_stream; use err::Error; use futures_util::Stream; use futures_util::StreamExt; use items_0::on_sitemty_data; +use items_0::streamitem::sitem_data; use items_0::streamitem::LogItem; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::streamitem::EVENT_QUERY_JSON_STRING_FRAME; +use items_0::Events; use items_2::channelevents::ChannelEvents; +use items_2::empty::empty_events_dyn_ev; use items_2::framable::EventQueryJsonStringFrame; use items_2::framable::Framable; use items_2::frame::decode_frame; @@ -24,14 +28,13 @@ use query::api4::events::PlainEventsQuery; use std::net::SocketAddr; use std::pin::Pin; use streams::frames::inmem::InMemoryFrameAsyncReadStream; +use streams::transform::build_event_transform; use tokio::io::AsyncWriteExt; use tokio::net::tcp::OwnedReadHalf; use tokio::net::tcp::OwnedWriteHalf; use tokio::net::TcpStream; use tracing::Instrument; -use crate::scylla::scylla_channel_event_stream; - #[cfg(test)] mod test; @@ -63,14 +66,14 @@ impl> From<(E, OwnedWriteHalf)> for ConnErr { } } -async fn make_channel_events_stream( +async fn make_channel_events_stream_data( evq: PlainEventsQuery, chconf: ChConf, node_config: &NodeConfigCached, ) -> Result> + Send>>, Error> { info!("nodenet::conn::make_channel_events_stream"); if evq.channel().backend() == "test-inmem" { - warn!("TEST BACKEND DATA"); + warn!("GENERATE INMEM TEST DATA"); let node_count = node_config.node_config.cluster.nodes.len() as u64; let node_ix = node_config.ix as u64; let chn = evq.channel().name(); @@ -116,6 +119,19 @@ async fn make_channel_events_stream( } } +async fn make_channel_events_stream( + evq: PlainEventsQuery, + chconf: ChConf, + node_config: &NodeConfigCached, +) -> Result> + Send>>, Error> { + let empty = empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape)?; + let empty = sitem_data(ChannelEvents::Events(empty)); + let stream = make_channel_events_stream_data(evq, chconf, node_config).await?; + let ret = futures_util::stream::iter([empty]).chain(stream); + let ret = Box::pin(ret); + Ok(ret) +} + async fn events_get_input_frames(netin: OwnedReadHalf) -> Result, Error> { warn!("fix magic inmem_bufcap option"); let perf_opts = PerfOpts::default(); @@ -215,28 +231,32 @@ async fn events_conn_handler_inner_try( } else { match make_channel_events_stream(evq.clone(), chconf, node_config).await { Ok(stream) => { - let stream = stream - .map({ - use items_0::transform::EventTransform; - let mut tf = items_0::transform::IdentityTransform::default(); - move |item| { - if false { - on_sitemty_data!(item, |item4| { - match item4 { - ChannelEvents::Events(item5) => { - let a = tf.transform(item5); - let x = ChannelEvents::Events(a); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) - } - x => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))), - } - }) - } else { - item - } - } - }) - .map(|x| Box::new(x) as _); + if false { + // TODO wasm example + use wasmer::Value; + let wasm = b""; + let mut store = wasmer::Store::default(); + let module = wasmer::Module::new(&store, wasm).unwrap(); + let import_object = wasmer::imports! {}; + let instance = wasmer::Instance::new(&mut store, &module, &import_object).unwrap(); + let add_one = instance.exports.get_function("event_transform").unwrap(); + let result = add_one.call(&mut store, &[Value::I32(42)]).unwrap(); + assert_eq!(result[0], Value::I32(43)); + } + let mut tr = match build_event_transform(evq.transform()) { + Ok(x) => x, + Err(e) => { + return Err((e, netout).into()); + } + }; + let stream = stream.map(move |x| { + let item = on_sitemty_data!(x, |x| { + let x: Box = Box::new(x); + let x = tr.0.transform(x); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) + }); + Box::new(item) as Box + }); Box::pin(stream) } Err(e) => { diff --git a/nodenet/src/conn/generator.rs b/nodenet/src/conn/generator.rs index 45cffec..3e693e4 100644 --- a/nodenet/src/conn/generator.rs +++ b/nodenet/src/conn/generator.rs @@ -1,6 +1,7 @@ use err::Error; use futures_util::Stream; use items_0::container::ByteEstimate; +use items_0::streamitem::sitem_data; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; @@ -18,6 +19,7 @@ pub fn generate_i32( node_count: u64, range: SeriesRange, ) -> Result> + Send>>, Error> { + info!("generate_i32 {node_ix} {node_count}"); type T = i32; let mut items = Vec::new(); match range { @@ -29,21 +31,23 @@ pub fn generate_i32( if ts >= range.end { break; } - let pulse = ts; - item.push(ts, pulse, pulse as T); + let pulse = ts / td; + let value = pulse as T; + item.push(ts, pulse, value); ts += td * node_count as u64; if item.byte_estimate() > 200 { let w = ChannelEvents::Events(Box::new(item) as _); - let w = Ok::<_, Error>(StreamItem::DataItem(RangeCompletableItem::Data(w))); + let w = sitem_data(w); items.push(w); item = items_2::eventsdim0::EventsDim0::empty(); } } if item.len() != 0 { let w = ChannelEvents::Events(Box::new(item) as _); - let w = Ok::<_, Error>(StreamItem::DataItem(RangeCompletableItem::Data(w))); + let w = sitem_data(w); items.push(w); } + items.push(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))) } SeriesRange::PulseRange(_) => { error!("TODO generate test data by pulse id range"); diff --git a/query/src/api4/events.rs b/query/src/api4/events.rs index bb49c03..cb0dbad 100644 --- a/query/src/api4/events.rs +++ b/query/src/api4/events.rs @@ -44,6 +44,8 @@ pub struct PlainEventsQuery { do_test_main_error: bool, #[serde(default, skip_serializing_if = "is_false")] do_test_stream_error: bool, + #[serde(default, skip_serializing_if = "is_false")] + test_do_wasm: bool, } impl PlainEventsQuery { @@ -63,6 +65,7 @@ impl PlainEventsQuery { buf_len_disk_io: None, do_test_main_error: false, do_test_stream_error: false, + test_do_wasm: false, } } @@ -104,6 +107,10 @@ impl PlainEventsQuery { &self.event_delay } + pub fn merger_out_len_max(&self) -> usize { + 1024 + } + pub fn do_test_main_error(&self) -> bool { self.do_test_main_error } @@ -112,6 +119,10 @@ impl PlainEventsQuery { self.do_test_stream_error } + pub fn test_do_wasm(&self) -> bool { + self.test_do_wasm + } + pub fn set_series_id(&mut self, series: u64) { self.channel.series = Some(series); } @@ -134,6 +145,11 @@ impl PlainEventsQuery { self } + pub fn for_pulse_id_diff(mut self) -> Self { + self.transform = TransformQuery::for_pulse_id_diff(); + self + } + pub fn is_event_blobs(&self) -> bool { self.transform.is_event_blobs() } @@ -200,6 +216,11 @@ impl FromUrl for PlainEventsQuery { .map_or("false", |k| k) .parse() .map_err(|e| Error::with_public_msg(format!("can not parse doTestStreamError: {}", e)))?, + test_do_wasm: pairs + .get("testDoWasm") + .map(|x| x.parse::().ok()) + .unwrap_or(None) + .unwrap_or(false), }; Ok(ret) } @@ -241,5 +262,8 @@ impl AppendToUrl for PlainEventsQuery { if self.do_test_stream_error { g.append_pair("doTestStreamError", "true"); } + if self.test_do_wasm { + g.append_pair("testDoWasm", "true"); + } } } diff --git a/query/src/transform.rs b/query/src/transform.rs index 5b02f1b..f872b5e 100644 --- a/query/src/transform.rs +++ b/query/src/transform.rs @@ -86,6 +86,14 @@ impl TransformQuery { } } + pub fn for_pulse_id_diff() -> Self { + Self { + event: EventTransformQuery::PulseIdDiff, + // TODO probably we want unweighted binning here. + time_binning: TimeBinningTransformQuery::TimeWeighted, + } + } + pub fn is_event_blobs(&self) -> bool { match &self.event { EventTransformQuery::EventBlobsVerbatim => true, diff --git a/streams/Cargo.toml b/streams/Cargo.toml index 52e27ab..4c96e55 100644 --- a/streams/Cargo.toml +++ b/streams/Cargo.toml @@ -16,7 +16,6 @@ arrayref = "0.3.6" crc32fast = "1.3.2" byteorder = "1.4.3" chrono = { version = "0.4.19", features = ["serde"] } -wasmer = { version = "3.1.1", default-features = false, features = ["sys", "cranelift"] } err = { path = "../err" } netpod = { path = "../netpod" } query = { path = "../query" } diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs index 6d98778..5891f7e 100644 --- a/streams/src/plaineventsjson.rs +++ b/streams/src/plaineventsjson.rs @@ -1,13 +1,12 @@ use crate::collect::Collect; -use crate::rangefilter2::RangeFilter2; use crate::tcprawclient::open_tcp_streams; +use crate::transform::build_merged_event_transform; use crate::transform::EventsToTimeBinnable; use crate::transform::TimeBinnableToCollectable; use err::Error; -use futures_util::stream; use futures_util::StreamExt; use items_0::on_sitemty_data; -use items_0::streamitem::sitem_data; +use items_0::Events; use items_2::channelevents::ChannelEvents; use items_2::merger::Merger; use items_2::streams::PlainEventStream; @@ -18,96 +17,44 @@ use query::api4::events::PlainEventsQuery; use serde_json::Value as JsonValue; use std::time::Instant; -pub async fn plain_events_json(evq: &PlainEventsQuery, chconf: &ChConf, cluster: &Cluster) -> Result { - if evq.channel().name() == "wasm-test-01" { - use wasmer::Value; - let wasm = evq.channel().name().as_bytes(); - let mut store = wasmer::Store::default(); - let module = wasmer::Module::new(&store, wasm).unwrap(); - let import_object = wasmer::imports! {}; - let instance = wasmer::Instance::new(&mut store, &module, &import_object).unwrap(); - let add_one = instance.exports.get_function("event_transform").unwrap(); - let result = add_one.call(&mut store, &[Value::I32(42)]).unwrap(); - assert_eq!(result[0], Value::I32(43)); - } +pub async fn plain_events_json( + evq: &PlainEventsQuery, + _chconf: &ChConf, + cluster: &Cluster, +) -> Result { + info!("plain_events_json evquery {:?}", evq); // TODO remove magic constant let deadline = Instant::now() + evq.timeout(); - let events_max = evq.events_max(); - let evquery = evq.clone(); - info!("plain_events_json evquery {:?}", evquery); - //let ev_agg_kind = evquery.agg_kind().as_ref().map_or(AggKind::Plain, |x| x.clone()); - //info!("plain_events_json ev_agg_kind {:?}", ev_agg_kind); - warn!("TODO feed through transform chain"); - let empty = if evq.transform().is_pulse_id_diff() { - use items_0::Empty; - Box::new(items_2::eventsdim0::EventsDim0::::empty()) - } else { - items_2::empty::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape)? - }; - info!("plain_events_json with empty item {}", empty.type_name()); - let empty = ChannelEvents::Events(empty); - let empty = sitem_data(empty); - // TODO should be able to ask for data-events only, instead of mixed data and status events. - let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&evquery, cluster).await?; - //let inps = open_tcp_streams::<_, Box>(&query, cluster).await?; - // TODO propagate also the max-buf-len for the first stage event reader: - let stream = Merger::new(inps, 1024); - - // Transforms that keep state between batches of events, usually only useful after merge. - // Example: pulse-id-diff - use futures_util::Stream; - use items_0::streamitem::Sitemty; - use std::pin::Pin; - let stream: Pin> + Send>> = if evq.transform().is_pulse_id_diff() { - let mut pulse_last = None; - Box::pin(stream.map(move |item| { - on_sitemty_data!(item, |item| { - use items_0::streamitem::RangeCompletableItem; - use items_0::streamitem::StreamItem; - use items_0::Appendable; - use items_0::Empty; - let x = match item { - ChannelEvents::Events(item) => { - let (tss, pulses) = items_0::EventsNonObj::into_tss_pulses(item); - let mut item = items_2::eventsdim0::EventsDim0::empty(); - for (ts, pulse) in tss.into_iter().zip(pulses) { - let value = if let Some(last) = pulse_last { - pulse as i64 - last as i64 - } else { - 0 - }; - item.push(ts, pulse, value); - pulse_last = Some(pulse); - } - ChannelEvents::Events(Box::new(item)) - } - ChannelEvents::Status(x) => ChannelEvents::Status(x), - }; - Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) - }) - })) - } else { - Box::pin(stream) - }; - + let mut tr = build_merged_event_transform(evq.transform())?; + // TODO make sure the empty container arrives over the network. + let inps = open_tcp_streams::<_, ChannelEvents>(&evq, cluster).await?; + // TODO propagate also the max-buf-len for the first stage event reader. + // TODO use a mixture of count and byte-size as threshold. + let stream = Merger::new(inps, evq.merger_out_len_max()); #[cfg(DISABLED)] let stream = stream.map(|item| { info!("item after merge: {item:?}"); item }); - //let stream = RangeFilter2::new(stream, evq.range().try_into()?, evquery.one_before_range()); + #[cfg(DISABLED)] + let stream = crate::rangefilter2::RangeFilter2::new(stream, evq.range().try_into()?, evquery.one_before_range()); #[cfg(DISABLED)] let stream = stream.map(|item| { info!("item after rangefilter: {item:?}"); item }); - let stream = stream::iter([empty]).chain(stream); + let stream = stream.map(move |k| { + on_sitemty_data!(k, |k| { + let k: Box = Box::new(k); + info!("-------------------------\ngot len {}", k.len()); + let k = tr.0.transform(k); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) + }) + }); let stream = PlainEventStream::new(stream); let stream = EventsToTimeBinnable::new(stream); let stream = TimeBinnableToCollectable::new(stream); - let collected = Collect::new(stream, deadline, events_max, Some(evq.range().clone()), None).await; - - //let collected = crate::collect::collect(stream, deadline, events_max, Some(evq.range().clone()), None).await?; + let collected = Collect::new(stream, deadline, evq.events_max(), Some(evq.range().clone()), None).await?; let jsval = serde_json::to_value(&collected)?; Ok(jsval) } diff --git a/streams/src/test/collect.rs b/streams/src/test/collect.rs index eb67142..b22fee6 100644 --- a/streams/src/test/collect.rs +++ b/streams/src/test/collect.rs @@ -1,17 +1,26 @@ use crate::collect::Collect; use crate::test::runfut; +use crate::transform::build_event_transform; +use crate::transform::build_time_binning_transform; use crate::transform::EventsToTimeBinnable; use crate::transform::TimeBinnableToCollectable; use err::Error; use futures_util::stream; +use futures_util::StreamExt; +use items_0::on_sitemty_data; use items_0::streamitem::sitem_data; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::StreamItem; +use items_0::transform::EventStreamBox; +use items_0::transform::EventStreamTrait; use items_0::WithLen; use items_2::eventsdim0::EventsDim0CollectorOutput; use items_2::streams::PlainEventStream; use items_2::testgen::make_some_boxed_d0_f32; +use netpod::log::*; use netpod::timeunits::SEC; +use netpod::FromUrl; +use query::transform::TransformQuery; use std::time::Duration; use std::time::Instant; @@ -69,3 +78,42 @@ fn collect_channel_events_01() -> Result<(), Error> { }; runfut(fut) } + +#[test] +fn collect_channel_events_pulse_id_diff() -> Result<(), Error> { + let fut = async { + let trqu = TransformQuery::from_url(&"https://data-api.psi.ch/?binningScheme=pulseIdDiff".parse()?)?; + info!("{trqu:?}"); + let evs0 = make_some_boxed_d0_f32(20, SEC * 10, SEC * 1, 0, 28736487); + let evs1 = make_some_boxed_d0_f32(20, SEC * 30, SEC * 1, 0, 882716583); + let stream = stream::iter(vec![ + sitem_data(evs0), + sitem_data(evs1), + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)), + ]); + let mut tr = build_event_transform(&trqu)?; + let stream = stream.map(move |x| { + on_sitemty_data!(x, |x| { + let x = tr.0.transform(x); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) + }) + }); + let stream = PlainEventStream::new(stream); + let stream = EventsToTimeBinnable::new(stream); + let deadline = Instant::now() + Duration::from_millis(4000); + let events_max = 10000; + let stream = Box::pin(stream); + let stream = build_time_binning_transform(&trqu, stream)?; + let stream = TimeBinnableToCollectable::new(stream); + let res = Collect::new(stream, deadline, events_max, None, None).await?; + if let Some(res) = res.as_any_ref().downcast_ref::>() { + eprintln!("Great, a match"); + eprintln!("{res:?}"); + assert_eq!(res.len(), 40); + } else { + return Err(Error::with_msg(format!("bad type of collected result"))); + } + Ok(()) + }; + runfut(fut) +} diff --git a/streams/src/test/timebin.rs b/streams/src/test/timebin.rs index c9d224e..a8184d0 100644 --- a/streams/src/test/timebin.rs +++ b/streams/src/test/timebin.rs @@ -256,6 +256,7 @@ fn transform_chain_correctness_01() -> Result<(), Error> { type STY = f32; let tq = TransformQuery::default_time_binned(); let empty = EventsDim0::::empty(); - build_event_transform(&tq, empty.into())?; + build_event_transform(&tq)?; + todo!(); Ok(()) } diff --git a/streams/src/timebin.rs b/streams/src/timebin.rs index d189677..aace697 100644 --- a/streams/src/timebin.rs +++ b/streams/src/timebin.rs @@ -50,7 +50,7 @@ where do_time_weight: bool, deadline: Instant, deadline_fut: Pin + Send>>, - range_complete: bool, + range_final: bool, binner: Option<::TimeBinner>, done_data: bool, done: bool, @@ -65,7 +65,7 @@ where fmt.debug_struct(any::type_name::()) .field("range", &self.range) .field("deadline", &self.deadline) - .field("range_complete", &self.range_complete) + .field("range_final", &self.range_final) .field("binner", &self.binner) .finish() } @@ -84,7 +84,7 @@ where do_time_weight, deadline, deadline_fut, - range_complete: false, + range_final: false, binner: None, done_data: false, done: false, @@ -123,7 +123,7 @@ where Ready(None) } else if self.done_data { self.done = true; - if self.range_complete { + if self.range_final { Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) } else { continue; @@ -132,7 +132,7 @@ where match self.deadline_fut.poll_unpin(cx) { Ready(()) => { trace2!("timeout"); - let self_range_complete = self.range_complete; + let self_range_complete = self.range_final; if let Some(binner) = self.binner.as_mut() { trace2!("bins ready count before finish {}", binner.bins_ready_count()); // TODO rework the finish logic @@ -159,7 +159,7 @@ where StreamItem::DataItem(item) => match item { RangeCompletableItem::RangeComplete => { debug!("see RangeComplete"); - self.range_complete = true; + self.range_final = true; continue; } RangeCompletableItem::Data(item) => { @@ -193,7 +193,7 @@ where }, Ready(None) => { trace!("finish up"); - let self_range_complete = self.range_complete; + let self_range_complete = self.range_final; if let Some(binner) = self.binner.as_mut() { trace!("bins ready count before finish {}", binner.bins_ready_count()); // TODO rework the finish logic diff --git a/streams/src/timebinnedjson.rs b/streams/src/timebinnedjson.rs index 145853e..3a7f3cc 100644 --- a/streams/src/timebinnedjson.rs +++ b/streams/src/timebinnedjson.rs @@ -32,8 +32,6 @@ pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Clu //crate::transform::build_event_transform(tr, inp); query.transform(); - crate::transform::build_event_transform; - // TODO let evquery = PlainEventsQuery::new(query.channel().clone(), query.range().clone()).for_time_weighted_scalar(); let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&evquery, cluster).await?; diff --git a/streams/src/transform.rs b/streams/src/transform.rs index 1972bb5..c3c7880 100644 --- a/streams/src/transform.rs +++ b/streams/src/transform.rs @@ -10,6 +10,7 @@ use items_0::transform::CollectableStreamBox; use items_0::transform::CollectableStreamTrait; use items_0::transform::EventStreamBox; use items_0::transform::EventStreamTrait; +use items_0::transform::TimeBinnableStreamBox; use items_0::transform::TimeBinnableStreamTrait; use items_0::transform::TransformEvent; use items_0::transform::TransformProperties; @@ -24,7 +25,7 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; -pub fn build_event_transform(tr: &TransformQuery, inp: EventStreamBox) -> Result { +pub fn build_event_transform(tr: &TransformQuery) -> Result { let trev = tr.get_tr_event(); match trev { EventTransformQuery::ValueFull => Ok(make_transform_identity()), @@ -42,6 +43,14 @@ pub fn build_event_transform(tr: &TransformQuery, inp: EventStreamBox) -> Result } } +pub fn build_merged_event_transform(tr: &TransformQuery) -> Result { + let trev = tr.get_tr_event(); + match trev { + EventTransformQuery::PulseIdDiff => Ok(make_transform_pulse_id_diff()), + _ => Ok(make_transform_identity()), + } +} + pub struct EventsToTimeBinnable { inp: Pin>, } @@ -133,22 +142,16 @@ impl CollectableStreamTrait for TimeBinnableToCollectable {} pub fn build_time_binning_transform( tr: &TransformQuery, inp: Pin>, -) -> Result { - let trev = tr.get_tr_event(); - match trev { - EventTransformQuery::ValueFull => Ok(make_transform_identity()), - EventTransformQuery::MinMaxAvgDev => Ok(make_transform_min_max_avg()), - EventTransformQuery::ArrayPick(..) => Err(Error::with_msg_no_trace(format!( - "build_event_transform don't know what to do {trev:?}" - ))), - EventTransformQuery::PulseIdDiff => Ok(make_transform_pulse_id_diff()), - EventTransformQuery::EventBlobsVerbatim => Err(Error::with_msg_no_trace(format!( - "build_event_transform don't know what to do {trev:?}" - ))), - EventTransformQuery::EventBlobsUncompressed => Err(Error::with_msg_no_trace(format!( - "build_event_transform don't know what to do {trev:?}" - ))), - } +) -> Result { + let trev = tr.get_tr_time_binning(); + let res = match trev { + TimeBinningTransformQuery::None => TimeBinnableStreamBox(inp), + _ => { + // TODO apply the desired transformations. + todo!() + } + }; + Ok(res) } pub fn build_full_transform_collectable(