From 4a250227cd0e7fc615bad9f6ad6eeb1932808021 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 1 Dec 2022 22:45:44 +0100 Subject: [PATCH] WIP --- httpret/src/httpret.rs | 2 ++ items_0/src/collect_c.rs | 36 ++++++++++++++++++++++++++++++++++ nodenet/src/conn.rs | 1 - streams/src/lib.rs | 1 + streams/src/plaineventsjson.rs | 18 +---------------- streams/src/timebin.rs | 7 ++++--- streams/src/timebinnedjson.rs | 32 ++++++++++++++++++++++++++++++ 7 files changed, 76 insertions(+), 21 deletions(-) create mode 100644 streams/src/timebinnedjson.rs diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index af33654..5af163a 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -322,6 +322,8 @@ async fn http_service_inner( h.handle(req, ctx, &node_config).await } else if let Some(h) = channel_status::ChannelConnectionStatusEvents::handler(&req) { h.handle(req, ctx, &node_config).await + } else if let Some(h) = api4::binned::BinnedHandler::handler(&req) { + h.handle(req, &node_config).await } else if path == "/api/4/binned" { if req.method() == Method::GET { Ok(binned(req, ctx, node_config).await?) diff --git a/items_0/src/collect_c.rs b/items_0/src/collect_c.rs index 31e0acf..0bba76a 100644 --- a/items_0/src/collect_c.rs +++ b/items_0/src/collect_c.rs @@ -116,3 +116,39 @@ impl Collectable for Box { EventsCollector::new(coll) } } + +#[derive(Debug)] +pub struct TimeBinnedCollector {} + +impl Collector for TimeBinnedCollector { + type Input = Box; + type Output = Box; + + fn len(&self) -> usize { + todo!() + } + + fn ingest(&mut self, _item: &mut Self::Input) { + todo!() + } + + fn set_range_complete(&mut self) { + todo!() + } + + fn set_timed_out(&mut self) { + todo!() + } + + fn result(&mut self) -> Result { + todo!() + } +} + +impl Collectable for Box { + type Collector = TimeBinnedCollector; + + fn new_collector(&self) -> Self::Collector { + todo!() + } +} diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 02f52c4..f244f59 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -65,7 +65,6 @@ async fn events_conn_handler_inner_try( { match k { Ok(StreamItem::DataItem(item)) => { - info!("GOT FRAME: {:?}", item); frames.push(item); } Ok(item) => { diff --git a/streams/src/lib.rs b/streams/src/lib.rs index 704d06b..44ef8d1 100644 --- a/streams/src/lib.rs +++ b/streams/src/lib.rs @@ -12,3 +12,4 @@ pub mod tcprawclient; #[cfg(test)] pub mod test; pub mod timebin; +pub mod timebinnedjson; diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs index 367ce47..df84742 100644 --- a/streams/src/plaineventsjson.rs +++ b/streams/src/plaineventsjson.rs @@ -1,28 +1,12 @@ use crate::tcprawclient::open_tcp_streams; -use bytes::Bytes; use err::Error; -use futures_util::{Stream, StreamExt}; -use items::Sitemty; #[allow(unused)] use netpod::log::*; use netpod::Cluster; use serde::Serialize; use serde_json::Value as JsonValue; -use std::pin::Pin; -use std::task::{Context, Poll}; use std::time::{Duration, Instant}; -pub struct BytesStream(Pin> + Send>>); - -impl Stream for BytesStream { - type Item = Sitemty; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - StreamExt::poll_next_unpin(&mut self, cx) - } -} - -// TODO remove? pub async fn plain_events_json(query: SER, cluster: &Cluster) -> Result where SER: Serialize, @@ -41,7 +25,7 @@ where stream }; let stream = { items_2::merger::Merger::new(inps, 1) }; - let deadline = Instant::now() + Duration::from_millis(8000); + let deadline = Instant::now() + Duration::from_millis(3500); let events_max = 100; let collected = crate::collect::collect(stream, deadline, events_max).await?; let jsval = serde_json::to_value(&collected)?; diff --git a/streams/src/timebin.rs b/streams/src/timebin.rs index 96bb723..1a4d938 100644 --- a/streams/src/timebin.rs +++ b/streams/src/timebin.rs @@ -1,6 +1,7 @@ use err::Error; use futures_util::{Future, FutureExt, Stream, StreamExt}; use items::{sitem_data, RangeCompletableItem, Sitemty, StreamItem}; +//use items_0::{TimeBinnable, TimeBinner}; use items_2::timebin::{TimeBinnable, TimeBinner}; use netpod::log::*; use std::fmt; @@ -11,19 +12,19 @@ use std::time::Instant; #[allow(unused)] macro_rules! trace2 { (D$($arg:tt)*) => (); - ($($arg:tt)*) => (eprintln!($($arg)*)); + ($($arg:tt)*) => (trace!($($arg)*)); } #[allow(unused)] macro_rules! trace3 { (D$($arg:tt)*) => (); - ($($arg:tt)*) => (eprintln!($($arg)*)); + ($($arg:tt)*) => (trace!($($arg)*)); } #[allow(unused)] macro_rules! trace4 { (D$($arg:tt)*) => (); - ($($arg:tt)*) => (eprintln!($($arg)*)); + ($($arg:tt)*) => (trace!($($arg)*)); } type MergeInp = Pin> + Send>>; diff --git a/streams/src/timebinnedjson.rs b/streams/src/timebinnedjson.rs new file mode 100644 index 0000000..ae366fe --- /dev/null +++ b/streams/src/timebinnedjson.rs @@ -0,0 +1,32 @@ +use crate::tcprawclient::open_tcp_streams; +use err::Error; +use futures_util::StreamExt; +#[allow(unused)] +use netpod::log::*; +use netpod::Cluster; +use serde::Serialize; +use serde_json::Value as JsonValue; +use std::time::{Duration, Instant}; + +pub async fn timebinned_json(query: SER, cluster: &Cluster) -> Result +where + SER: Serialize, +{ + // TODO should be able to ask for data-events only, instead of mixed data and status events. + let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&query, cluster).await?; + // TODO propagate also the max-buf-len for the first stage event reader: + let stream = { items_2::merger::Merger::new(inps, 1) }; + let events_max = 10000; + let do_time_weight = true; + let deadline = Instant::now() + Duration::from_millis(7500); + let stream = Box::pin(stream); + let stream = crate::timebin::TimeBinnedStream::new(stream, Vec::new(), do_time_weight, deadline); + if false { + let mut stream = stream; + let _: Option>> = stream.next().await; + panic!() + } + let collected = crate::collect::collect(stream, deadline, events_max).await?; + let jsval = serde_json::to_value(&collected)?; + Ok(jsval) +}