From 81298b16dfc6817e66b177f3fbc10e1d5fbbcb20 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 5 Apr 2023 12:00:18 +0200 Subject: [PATCH] WIP checks --- daqbufp2/src/test/api4/eventsjson.rs | 17 +-- dbconn/src/dbconn.rs | 30 ++++++ httpret/src/api1.rs | 3 +- httpret/src/api4/binned.rs | 11 +- httpret/src/api4/events.rs | 9 +- httpret/src/httpret.rs | 2 + httpret/src/proxy.rs | 2 + httpret/src/pulsemap.rs | 106 ++++++++++++++---- items_0/src/collect_s.rs | 6 +- items_0/src/items_0.rs | 63 ++--------- items_0/src/streamitem.rs | 25 ++++- items_0/src/timebin.rs | 133 +++++++++++++++++++++++ items_0/src/transform.rs | 5 +- items_2/src/binnedcollected.rs | 5 +- items_2/src/binsdim0.rs | 31 +++--- items_2/src/binsxbindim0.rs | 10 +- items_2/src/channelevents.rs | 144 ++++++++++++++++++++++--- items_2/src/eventsdim0.rs | 27 +++-- items_2/src/eventsdim1.rs | 19 ++-- items_2/src/items_2.rs | 11 +- items_2/src/merger.rs | 65 +++++------ items_2/src/timebin.rs | 32 ------ netpod/src/netpod.rs | 9 ++ nodenet/src/channelconfig.rs | 9 +- nodenet/src/conn.rs | 64 +++++------ nodenet/src/conn/generator.rs | 72 +++++++++++++ query/src/api4/binned.rs | 9 +- query/src/api4/events.rs | 6 ++ scyllaconn/src/bincache.rs | 2 +- streams/src/collect.rs | 16 +-- streams/src/eventchunker.rs | 1 - streams/src/frames/eventsfromframes.rs | 4 +- streams/src/generators.rs | 86 +++++++++++++++ streams/src/lib.rs | 2 +- streams/src/plaineventsjson.rs | 9 +- streams/src/tcprawclient.rs | 5 - streams/src/test/timebin.rs | 80 +++++++++++++- streams/src/timebin.rs | 41 +++---- streams/src/timebinnedjson.rs | 29 +++-- 39 files changed, 892 insertions(+), 308 deletions(-) create mode 100644 items_0/src/timebin.rs delete mode 100644 items_2/src/timebin.rs create mode 100644 nodenet/src/conn/generator.rs delete mode 100644 streams/src/eventchunker.rs create mode 100644 streams/src/generators.rs diff --git a/daqbufp2/src/test/api4/eventsjson.rs b/daqbufp2/src/test/api4/eventsjson.rs index 26e21f3..5dc9df0 100644 --- a/daqbufp2/src/test/api4/eventsjson.rs +++ b/daqbufp2/src/test/api4/eventsjson.rs @@ -28,15 +28,14 @@ fn events_plain_json_00() -> Result<(), Error> { series: None, }, "1970-01-01T00:20:04.000Z", - "1970-01-01T00:20:10.000Z", + "1970-01-01T00:21:10.000Z", cluster, ) .await?; - info!("Receveided a response json value: {jsv:?}"); let res: items_2::eventsdim0::EventsDim0CollectorOutput = serde_json::from_value(jsv)?; // inmem was meant just for functional test, ignores the requested time range - assert_eq!(res.ts_anchor_sec(), 0); - assert_eq!(res.len(), 60); + assert_eq!(res.ts_anchor_sec(), 1204); + assert_eq!(res.len(), 66); Ok(()) }; taskrun::run(fut) @@ -44,6 +43,11 @@ fn events_plain_json_00() -> Result<(), Error> { #[test] fn events_plain_json_01() -> Result<(), Error> { + // TODO + // not worth to re-enable, getting rid of databuffer. 
+ if true { + return Ok(()); + } let fut = async { let rh = require_test_hosts_running()?; let cluster = &rh.cluster; @@ -58,7 +62,6 @@ fn events_plain_json_01() -> Result<(), Error> { cluster, ) .await?; - info!("Receveided a response json value: {jsv:?}"); let res: items_2::eventsdim0::EventsDim0CollectorOutput = serde_json::from_value(jsv)?; assert_eq!(res.ts_anchor_sec(), 1210); assert_eq!(res.pulse_anchor(), 2420); @@ -129,10 +132,10 @@ async fn events_plain_json( let s = String::from_utf8_lossy(&buf); let res: JsonValue = serde_json::from_str(&s)?; let pretty = serde_json::to_string_pretty(&res)?; - trace!("{pretty}"); + info!("{pretty}"); let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; // TODO add timeout - debug!("time {} ms", ms); + info!("time {} ms", ms); Ok(res) } diff --git a/dbconn/src/dbconn.rs b/dbconn/src/dbconn.rs index 33b0058..93d93aa 100644 --- a/dbconn/src/dbconn.rs +++ b/dbconn/src/dbconn.rs @@ -7,7 +7,9 @@ pub mod pg { pub use tokio_postgres::{Client, Error, NoTls}; } +use err::anyhow; use err::Error; +use err::Res2; use netpod::log::*; use netpod::TableSizes; use netpod::{Channel, Database, NodeConfigCached}; @@ -152,6 +154,7 @@ pub async fn insert_channel(name: String, facility: i64, dbc: &PgClient) -> Resu Ok(()) } +// Currently only for scylla type backends pub async fn find_series(channel: &Channel, pgclient: Arc) -> Result<(u64, ScalarType, Shape), Error> { info!("find_series channel {:?}", channel); let rows = if let Some(series) = channel.series() { @@ -188,3 +191,30 @@ pub async fn find_series(channel: &Channel, pgclient: Arc) -> Result<( let shape = Shape::from_scylla_shape_dims(&a)?; Ok((series, scalar_type, shape)) } + +// Currently only for sf-databuffer type backends +// Note: we currently treat the channels primary key as series-id for sf-databuffer type backends. +pub async fn find_series_sf_databuffer(channel: &Channel, pgclient: Arc) -> Res2 { + info!("find_series channel {:?}", channel); + let sql = "select rowid from facilities where name = $1"; + let rows = pgclient.query(sql, &[&channel.backend()]).await.err_conv()?; + let row = rows + .into_iter() + .next() + .ok_or_else(|| anyhow::anyhow!("no backend for {channel:?}"))?; + let backend_id: i64 = row.get(0); + let sql = "select rowid from channels where facility = $1 and name = $2"; + let rows = pgclient.query(sql, &[&backend_id, &channel.name()]).await.err_conv()?; + if rows.len() < 1 { + return Err(anyhow::anyhow!("No series found for {channel:?}")); + } + if rows.len() > 1 { + return Err(anyhow::anyhow!("Multiple series found for {channel:?}")); + } + let row = rows + .into_iter() + .next() + .ok_or_else(|| anyhow::anyhow!("No series found for {channel:?}"))?; + let series = row.get::<_, i64>(0) as u64; + Ok(series) +} diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 9acaf85..8fcbabc 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -833,7 +833,8 @@ impl Stream for DataApiPython3DataStream { debug!("found channel_config for {}: {:?}", channel.name, entry); let evq = PlainEventsQuery::new(channel, self.range.clone()).for_event_blobs(); info!("query for event blobs retrieval: evq {evq:?}"); - warn!("fix magic inmem_bufcap"); + warn!("TODO fix magic inmem_bufcap"); + warn!("TODO add timeout option to data api3 download"); let perf_opts = PerfOpts::default(); // TODO is this a good to place decide this? 
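Note on the new dbconn helper above: the intended call pattern for find_series_sf_databuffer matches the call site added in nodenet/src/channelconfig.rs further down in this patch. A minimal sketch, assuming a NodeConfigCached `ncc` and an sf-databuffer `channel` are in scope:

    let pgclient = dbconn::create_connection(&ncc.node_config.cluster.database).await?;
    let pgclient = std::sync::Arc::new(pgclient);
    // The returned id is the channels table primary key, which this patch
    // treats as the series id for sf-databuffer type backends.
    let series: u64 = dbconn::find_series_sf_databuffer(&channel, pgclient).await?;
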
let s = if self.node_config.node_config.cluster.is_central_storage { diff --git a/httpret/src/api4/binned.rs b/httpret/src/api4/binned.rs index 3815d01..c293dbd 100644 --- a/httpret/src/api4/binned.rs +++ b/httpret/src/api4/binned.rs @@ -30,16 +30,17 @@ async fn binned_json(url: Url, req: Request, node_config: &NodeConfigCache })?; let chconf = chconf_from_binned(&query, node_config).await?; // Update the series id since we don't require some unique identifier yet. - let mut query = query; - query.set_series_id(chconf.try_series().context("binned_json")?); - let query = query; - // --- + let query = { + let mut query = query; + query.set_series_id(chconf.try_series().context("binned_json")?); + query + }; let span1 = span!( Level::INFO, "httpret::binned", beg = query.range().beg_u64() / SEC, end = query.range().end_u64() / SEC, - ch = query.channel().name(), + ch = query.channel().name().clone(), ); span1.in_scope(|| { debug!("begin"); diff --git a/httpret/src/api4/events.rs b/httpret/src/api4/events.rs index ddf6222..674ccfd 100644 --- a/httpret/src/api4/events.rs +++ b/httpret/src/api4/events.rs @@ -104,7 +104,14 @@ async fn plain_events_json( info!("plain_events_json chconf_from_events_v1: {chconf:?}"); // Update the series id since we don't require some unique identifier yet. let mut query = query; - query.set_series_id(chconf.try_series().context("plain_events_json")?); + let kk = chconf.try_series(); + info!("kk debug {kk:?}"); + let kk = kk.context("plain_events_json"); + if let Err(e) = &kk { + info!("kk ctx debug {kk:?}"); + info!("kk e ctx display {e}"); + } + query.set_series_id(kk?); let query = query; // --- //let query = RawEventsQuery::new(query.channel().clone(), query.range().clone(), AggKind::Plain); diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 9a01ac8..5f73f9a 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -378,6 +378,8 @@ async fn http_service_inner( h.handle(req, &node_config).await } else if let Some(h) = pulsemap::MapPulseHttpFunction::handler(&req) { h.handle(req, &node_config).await + } else if let Some(h) = pulsemap::Api4MapPulse2HttpFunction::handler(&req) { + h.handle(req, &node_config).await } else if let Some(h) = pulsemap::Api4MapPulseHttpFunction::handler(&req) { h.handle(req, &node_config).await } else if let Some(h) = api1::RequestStatusHandler::handler(&req) { diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index 1549690..aec7af0 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -149,6 +149,8 @@ async fn proxy_http_service_inner( Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) } else if path == "/api/4/status/channel/events" { Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) + } else if path.starts_with("/api/4/map/pulse-v2/") { + Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) } else if path.starts_with("/api/4/map/pulse/") { Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) 
} else if path == "/api/4/binned" { diff --git a/httpret/src/pulsemap.rs b/httpret/src/pulsemap.rs index ba75852..2b9df76 100644 --- a/httpret/src/pulsemap.rs +++ b/httpret/src/pulsemap.rs @@ -5,6 +5,8 @@ use async_channel::Sender; use bytes::Buf; use bytes::BufMut; use bytes::BytesMut; +use chrono::TimeZone; +use chrono::Utc; use futures_util::stream::FuturesOrdered; use futures_util::stream::FuturesUnordered; use futures_util::FutureExt; @@ -15,6 +17,7 @@ use hyper::Body; use hyper::Request; use hyper::Response; use netpod::log::*; +use netpod::timeunits::SEC; use netpod::AppendToUrl; use netpod::FromUrl; use netpod::HasBackend; @@ -1308,20 +1311,12 @@ impl Api4MapPulseHttpFunction { path.starts_with(API_4_MAP_PULSE_URL_PREFIX) } - pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { - if req.method() != Method::GET { - return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); - } - let ts1 = Instant::now(); - trace!("Api4MapPulseHttpFunction handle uri: {:?}", req.uri()); - let url = Url::parse(&format!("dummy:{}", req.uri()))?; - let q = MapPulseQuery::from_url(&url)?; + pub async fn find_timestamp(q: MapPulseQuery, ncc: &NodeConfigCached) -> Result, Error> { let pulse = q.pulse; - - let ret = match CACHE.portal(pulse) { + let res = match CACHE.portal(pulse) { CachePortal::Fresh => { trace!("value not yet in cache pulse {pulse}"); - let histo = MapPulseHistoHttpFunction::histo(pulse, node_config).await?; + let histo = MapPulseHistoHttpFunction::histo(pulse, ncc).await?; let mut i1 = 0; let mut max = 0; for i2 in 0..histo.tss.len() { @@ -1336,9 +1331,9 @@ impl Api4MapPulseHttpFunction { if max > 0 { let val = histo.tss[i1]; CACHE.set_value(pulse, val); - Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?) + Ok(Some(val)) } else { - Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?) + Ok(None) } } CachePortal::Existing(rx) => { @@ -1346,22 +1341,22 @@ impl Api4MapPulseHttpFunction { match rx.recv().await { Ok(_) => { error!("should never recv from existing operation pulse {pulse}"); - Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?) + Err(Error::with_msg_no_trace("map pulse error")) } Err(_e) => { trace!("woken up while value wait pulse {pulse}"); match CACHE.portal(pulse) { CachePortal::Known(val) => { trace!("good, value after wakeup pulse {pulse}"); - Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?) + Ok(Some(val)) } CachePortal::Fresh => { error!("woken up, but portal fresh pulse {pulse}"); - Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?) + Err(Error::with_msg_no_trace("map pulse error")) } CachePortal::Existing(..) => { error!("woken up, but portal existing pulse {pulse}"); - Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?) + Err(Error::with_msg_no_trace("map pulse error")) } } } @@ -1369,9 +1364,84 @@ impl Api4MapPulseHttpFunction { } CachePortal::Known(val) => { trace!("value already in cache pulse {pulse} ts {val}"); - Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?) 
+ Ok(Some(val)) } }; + res + } + + pub async fn handle(&self, req: Request, ncc: &NodeConfigCached) -> Result, Error> { + if req.method() != Method::GET { + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + } + let ts1 = Instant::now(); + trace!("Api4MapPulseHttpFunction handle uri: {:?}", req.uri()); + let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let q = MapPulseQuery::from_url(&url)?; + let ret = match Self::find_timestamp(q, ncc).await { + Ok(Some(val)) => Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?), + Ok(None) => Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?), + Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?), + }; + let ts2 = Instant::now(); + let dt = ts2.duration_since(ts1); + if dt > Duration::from_millis(1500) { + warn!("Api4MapPulseHttpFunction took {:.2}s", dt.as_secs_f32()); + } + ret + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Api4MapPulse2Response { + sec: u64, + ns: u64, + datetime: String, +} + +pub struct Api4MapPulse2HttpFunction {} + +impl Api4MapPulse2HttpFunction { + pub fn path_prefix() -> &'static str { + "/api/4/map/pulse-v2/" + } + + pub fn handler(req: &Request) -> Option { + if req.uri().path().starts_with(Self::path_prefix()) { + Some(Self {}) + } else { + None + } + } + + pub fn path_matches(path: &str) -> bool { + path.starts_with(Self::path_prefix()) + } + + pub async fn handle(&self, req: Request, ncc: &NodeConfigCached) -> Result, Error> { + if req.method() != Method::GET { + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + } + let ts1 = Instant::now(); + let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let q = MapPulseQuery::from_url(&url)?; + let ret = match Api4MapPulseHttpFunction::find_timestamp(q, ncc).await { + Ok(Some(val)) => { + let sec = val / SEC; + let ns = val % SEC; + let date_fmt = "%Y-%m-%dT%H:%M:%S.%9fZ"; + let datetime = Utc + .timestamp_opt(sec as i64, ns as u32) + .earliest() + .ok_or_else(|| Error::with_msg_no_trace("DateTime earliest fail"))? + .format(date_fmt) + .to_string(); + let res = Api4MapPulse2Response { sec, ns, datetime }; + Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&res)?))?) 
+ } + Ok(None) => Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?), + Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?), + }; let ts2 = Instant::now(); let dt = ts2.duration_since(ts1); if dt > Duration::from_millis(1500) { diff --git a/items_0/src/collect_s.rs b/items_0/src/collect_s.rs index aa3edd6..3ec455f 100644 --- a/items_0/src/collect_s.rs +++ b/items_0/src/collect_s.rs @@ -1,7 +1,7 @@ +use crate::timebin::TimeBinned; use crate::AsAnyMut; use crate::AsAnyRef; use crate::Events; -use crate::TimeBinned; use crate::TypeName; use crate::WithLen; use err::Error; @@ -92,11 +92,11 @@ where { fn ingest(&mut self, src: &mut dyn Collectable) { if let Some(src) = src.as_any_mut().downcast_mut::<::Input>() { - info!("sees incoming &mut ref"); + trace!("sees incoming &mut ref"); T::ingest(self, src) } else { if let Some(src) = src.as_any_mut().downcast_mut::::Input>>() { - info!("sees incoming &mut Box"); + trace!("sees incoming &mut Box"); T::ingest(self, src) } else { error!( diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs index 4dd6f34..de497f8 100644 --- a/items_0/src/items_0.rs +++ b/items_0/src/items_0.rs @@ -5,6 +5,7 @@ pub mod isodate; pub mod scalar_ops; pub mod streamitem; pub mod subfr; +pub mod timebin; pub mod transform; pub mod bincode { @@ -12,25 +13,17 @@ pub mod bincode { } use collect_s::Collectable; -use collect_s::ToJsonResult; use container::ByteEstimate; use netpod::range::evrange::SeriesRange; -use netpod::BinnedRangeEnum; use std::any::Any; use std::collections::VecDeque; use std::fmt; +use timebin::TimeBinnable; pub trait WithLen { fn len(&self) -> usize; } -// TODO can probably be removed. -pub trait TimeBins { - fn ts_min(&self) -> Option; - fn ts_max(&self) -> Option; - fn ts_min_max(&self) -> Option<(u64, u64)>; -} - pub trait RangeOverlapInfo { fn ends_before(&self, range: &SeriesRange) -> bool; fn ends_after(&self, range: &SeriesRange) -> bool; @@ -79,48 +72,6 @@ where } } -/// Data in time-binned form. -pub trait TimeBinned: Any + TypeName + TimeBinnable + Collectable + erased_serde::Serialize { - fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable; - fn as_collectable_mut(&mut self) -> &mut dyn Collectable; - fn edges_slice(&self) -> (&[u64], &[u64]); - fn counts(&self) -> &[u64]; - fn mins(&self) -> Vec; - fn maxs(&self) -> Vec; - fn avgs(&self) -> Vec; - fn validate(&self) -> Result<(), String>; -} - -pub trait TimeBinner: Send { - fn ingest(&mut self, item: &dyn TimeBinnable); - fn bins_ready_count(&self) -> usize; - fn bins_ready(&mut self) -> Option>; - - /// If there is a bin in progress with non-zero count, push it to the result set. - /// With push_empty == true, a bin in progress is pushed even if it contains no counts. - fn push_in_progress(&mut self, push_empty: bool); - - /// Implies `Self::push_in_progress` but in addition, pushes a zero-count bin if the call - /// to `push_in_progress` did not change the result count, as long as edges are left. - /// The next call to `Self::bins_ready_count` must return one higher count than before. - fn cycle(&mut self); - - fn set_range_complete(&mut self); - - fn empty(&self) -> Box; -} - -// TODO remove the Any bound. Factor out into custom AsAny trait. - -/// Provides a time-binned representation of the implementing type. -/// In contrast to `TimeBinnableType` this is meant for trait objects. 
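Note on the reworked on_sitemty_data! macro above: it now forwards RangeComplete, Log and Stats items unchanged and applies the given closure only to the data payload. A usage sketch (illustrative; the closure must itself return a Sitemty, and the concrete types here are only for the example):

    use items_0::on_sitemty_data;
    use items_0::streamitem::{sitem_data, Sitemty};

    let item: Sitemty<u32> = sitem_data(42);
    // Non-data items (RangeComplete, Log, Stats) pass through untouched.
    let item2: Sitemty<u64> = on_sitemty_data!(item, |x: u32| sitem_data(x as u64 + 1));
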
-pub trait TimeBinnable: fmt::Debug + WithLen + RangeOverlapInfo + Any + AsAnyRef + AsAnyMut + Send { - // TODO implementors may fail if edges contain not at least 2 entries. - fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box; - // TODO just a helper for the empty result. - fn to_box_to_json_result(&self) -> Box; -} - #[derive(Debug)] pub enum MergeError { NotCompatible, @@ -158,11 +109,11 @@ pub trait Events: fn ts_max(&self) -> Option; // TODO is this used? fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box; - fn new_empty(&self) -> Box; - fn drain_into(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError>; - fn find_lowest_index_gt(&self, ts: u64) -> Option; - fn find_lowest_index_ge(&self, ts: u64) -> Option; - fn find_highest_index_lt(&self, ts: u64) -> Option; + fn new_empty_evs(&self) -> Box; + fn drain_into_evs(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError>; + fn find_lowest_index_gt_evs(&self, ts: u64) -> Option; + fn find_lowest_index_ge_evs(&self, ts: u64) -> Option; + fn find_highest_index_lt_evs(&self, ts: u64) -> Option; fn clone_dyn(&self) -> Box; fn partial_eq_dyn(&self, other: &dyn Events) -> bool; fn serde_id(&self) -> &'static str; diff --git a/items_0/src/streamitem.rs b/items_0/src/streamitem.rs index 5e4b08f..cecfc2a 100644 --- a/items_0/src/streamitem.rs +++ b/items_0/src/streamitem.rs @@ -1,4 +1,4 @@ -use crate::TimeBinned; +use crate::timebin::TimeBinned; use err::Error; use netpod::log::Level; use netpod::DiskStats; @@ -81,7 +81,7 @@ macro_rules! on_sitemty_range_complete { } #[macro_export] -macro_rules! on_sitemty_data { +macro_rules! on_sitemty_data_old { ($item:expr, $ex:expr) => { if let Ok($crate::streamitem::StreamItem::DataItem($crate::streamitem::RangeCompletableItem::Data(item))) = $item @@ -93,6 +93,27 @@ macro_rules! on_sitemty_data { }; } +#[macro_export] +macro_rules! on_sitemty_data { + ($item:expr, $ex:expr) => {{ + use $crate::streamitem::RangeCompletableItem; + use $crate::streamitem::StreamItem; + match $item { + Ok(x) => match x { + StreamItem::DataItem(x) => match x { + RangeCompletableItem::Data(x) => $ex(x), + RangeCompletableItem::RangeComplete => { + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) + } + }, + StreamItem::Log(x) => Ok(StreamItem::Log(x)), + StreamItem::Stats(x) => Ok(StreamItem::Stats(x)), + }, + Err(x) => Err(x), + } + }}; +} + pub fn sitem_data(x: X) -> Sitemty { Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) } diff --git a/items_0/src/timebin.rs b/items_0/src/timebin.rs new file mode 100644 index 0000000..94c3abe --- /dev/null +++ b/items_0/src/timebin.rs @@ -0,0 +1,133 @@ +use crate::collect_s::Collectable; +use crate::collect_s::ToJsonResult; +use crate::AsAnyMut; +use crate::AsAnyRef; +use crate::RangeOverlapInfo; +use crate::TypeName; +use crate::WithLen; +use netpod::BinnedRangeEnum; +use std::any::Any; +use std::fmt; + +// TODO can probably be removed. +pub trait TimeBins { + fn ts_min(&self) -> Option; + fn ts_max(&self) -> Option; + fn ts_min_max(&self) -> Option<(u64, u64)>; +} + +pub trait TimeBinnerTy: fmt::Debug + Unpin { + type Input: fmt::Debug; + type Output: fmt::Debug; + + fn ingest(&mut self, item: &mut Self::Input); + + fn set_range_complete(&mut self); + + fn bins_ready_count(&self) -> usize; + + fn bins_ready(&mut self) -> Option; + + /// If there is a bin in progress with non-zero count, push it to the result set. 
+ /// With push_empty == true, a bin in progress is pushed even if it contains no counts. + fn push_in_progress(&mut self, push_empty: bool); + + /// Implies `Self::push_in_progress` but in addition, pushes a zero-count bin if the call + /// to `push_in_progress` did not change the result count, as long as edges are left. + /// The next call to `Self::bins_ready_count` must return one higher count than before. + fn cycle(&mut self); + + fn empty(&self) -> Option; +} + +pub trait TimeBinnableTy: fmt::Debug + Sized { + type TimeBinner: TimeBinnerTy; + + fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Self::TimeBinner; +} + +/// Data in time-binned form. +pub trait TimeBinned: Any + TypeName + TimeBinnable + Collectable + erased_serde::Serialize { + fn as_time_binnable_dyn(&self) -> &dyn TimeBinnable; + fn as_collectable_mut(&mut self) -> &mut dyn Collectable; + fn edges_slice(&self) -> (&[u64], &[u64]); + fn counts(&self) -> &[u64]; + fn mins(&self) -> Vec; + fn maxs(&self) -> Vec; + fn avgs(&self) -> Vec; + fn validate(&self) -> Result<(), String>; +} + +pub trait TimeBinner: Send { + fn ingest(&mut self, item: &dyn TimeBinnable); + fn bins_ready_count(&self) -> usize; + fn bins_ready(&mut self) -> Option>; + + /// If there is a bin in progress with non-zero count, push it to the result set. + /// With push_empty == true, a bin in progress is pushed even if it contains no counts. + fn push_in_progress(&mut self, push_empty: bool); + + /// Implies `Self::push_in_progress` but in addition, pushes a zero-count bin if the call + /// to `push_in_progress` did not change the result count, as long as edges are left. + /// The next call to `Self::bins_ready_count` must return one higher count than before. + fn cycle(&mut self); + + fn set_range_complete(&mut self); + + fn empty(&self) -> Box; +} + +// TODO remove the Any bound. Factor out into custom AsAny trait. + +/// Provides a time-binned representation of the implementing type. +/// In contrast to `TimeBinnableType` this is meant for trait objects. +pub trait TimeBinnable: fmt::Debug + WithLen + RangeOverlapInfo + Any + AsAnyRef + AsAnyMut + Send { + // TODO implementors may fail if edges contain not at least 2 entries. + fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box; + // TODO just a helper for the empty result. 
+ fn to_box_to_json_result(&self) -> Box; +} + +#[derive(Debug)] +pub struct TimeBinnerDyn {} + +impl TimeBinnerTy for TimeBinnerDyn { + type Input = Box; + type Output = Box; + + fn ingest(&mut self, item: &mut Self::Input) { + todo!() + } + + fn set_range_complete(&mut self) { + todo!() + } + + fn bins_ready_count(&self) -> usize { + todo!() + } + + fn bins_ready(&mut self) -> Option { + todo!() + } + + fn push_in_progress(&mut self, push_empty: bool) { + todo!() + } + + fn cycle(&mut self) { + todo!() + } + + fn empty(&self) -> Option { + todo!() + } +} + +impl TimeBinnableTy for Box { + type TimeBinner = TimeBinnerDyn; + + fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Self::TimeBinner { + todo!() + } +} diff --git a/items_0/src/transform.rs b/items_0/src/transform.rs index dba12e8..970af93 100644 --- a/items_0/src/transform.rs +++ b/items_0/src/transform.rs @@ -1,6 +1,5 @@ -use std::pin; - use crate::Events; +use std::pin::Pin; pub struct TransformProperties { pub needs_one_before_range: bool, @@ -25,7 +24,7 @@ where } } -impl EventTransform for pin::Pin> +impl EventTransform for Pin> where T: EventTransform, { diff --git a/items_2/src/binnedcollected.rs b/items_2/src/binnedcollected.rs index aed1dec..d7d7d03 100644 --- a/items_2/src/binnedcollected.rs +++ b/items_2/src/binnedcollected.rs @@ -11,9 +11,9 @@ use items_0::collect_s::ToJsonResult; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; +use items_0::timebin::TimeBinnable; +use items_0::timebin::TimeBinner; use items_0::transform::EventTransform; -use items_0::TimeBinnable; -use items_0::TimeBinner; use netpod::log::*; use netpod::BinnedRange; use netpod::BinnedRangeEnum; @@ -149,6 +149,7 @@ impl BinnedCollected { self.binner = Some(bb); } let binner = self.binner.as_mut().unwrap(); + trace!("handle_item call binner.ingest"); binner.ingest(events.as_time_binnable()); flush_binned(binner, &mut self.coll, false)?; } diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs index 40a5759..5210bf6 100644 --- a/items_2/src/binsdim0.rs +++ b/items_2/src/binsdim0.rs @@ -9,14 +9,13 @@ use items_0::collect_s::Collected; use items_0::collect_s::CollectorType; use items_0::collect_s::ToJsonResult; use items_0::scalar_ops::ScalarOps; +use items_0::timebin::TimeBinnable; +use items_0::timebin::TimeBinner; +use items_0::timebin::TimeBins; use items_0::AppendEmptyBin; use items_0::AsAnyMut; use items_0::AsAnyRef; use items_0::Empty; -use items_0::TimeBinnable; -use items_0::TimeBinned; -use items_0::TimeBinner; -use items_0::TimeBins; use items_0::TypeName; use items_0::WithLen; use netpod::is_false; @@ -32,6 +31,7 @@ use std::any; use std::any::Any; use std::collections::VecDeque; use std::fmt; +use items_0::timebin::TimeBinned; #[allow(unused)] macro_rules! trace4 { @@ -425,14 +425,16 @@ impl CollectorType for BinsDim0Collector { type Output = BinsDim0CollectedResult; fn ingest(&mut self, src: &mut Self::Input) { - // TODO could be optimized by non-contiguous container. 
- /*self.vals.ts1s.append(&mut src.ts1s); - self.vals.ts2s.append(&mut src.ts2s); - self.vals.counts.append(&mut src.counts); - self.vals.mins.append(&mut src.mins); - self.vals.maxs.append(&mut src.maxs); - self.vals.avgs.append(&mut src.avgs);*/ - todo!() + if self.vals.is_none() { + self.vals = Some(Self::Input::empty()); + } + let vals = self.vals.as_mut().unwrap(); + vals.ts1s.append(&mut src.ts1s); + vals.ts2s.append(&mut src.ts2s); + vals.counts.append(&mut src.counts); + vals.mins.append(&mut src.mins); + vals.maxs.append(&mut src.maxs); + vals.avgs.append(&mut src.avgs); } fn set_range_complete(&mut self) { @@ -448,6 +450,7 @@ impl CollectorType for BinsDim0Collector { _range: Option, binrange: Option, ) -> Result { + eprintln!("trying to make a result from {self:?}"); /*let bin_count_exp = if let Some(r) = &binrange { r.bin_count() as u32 } else { @@ -719,7 +722,7 @@ impl TimeBinner for BinsDim0TimeBinner { } } - fn bins_ready(&mut self) -> Option> { + fn bins_ready(&mut self) -> Option> { match self.ready.take() { Some(k) => Some(Box::new(k)), None => None, @@ -779,7 +782,7 @@ impl TimeBinner for BinsDim0TimeBinner { self.range_final = true; } - fn empty(&self) -> Box { + fn empty(&self) -> Box { let ret = as TimeBinnableTypeAggregator>::Output::empty(); Box::new(ret) } diff --git a/items_2/src/binsxbindim0.rs b/items_2/src/binsxbindim0.rs index 0227850..69be40d 100644 --- a/items_2/src/binsxbindim0.rs +++ b/items_2/src/binsxbindim0.rs @@ -12,14 +12,14 @@ use items_0::collect_s::Collected; use items_0::collect_s::CollectorType; use items_0::collect_s::ToJsonResult; use items_0::scalar_ops::ScalarOps; +use items_0::timebin::TimeBinnable; +use items_0::timebin::TimeBinned; +use items_0::timebin::TimeBinner; +use items_0::timebin::TimeBins; use items_0::AppendEmptyBin; use items_0::AsAnyMut; use items_0::AsAnyRef; use items_0::Empty; -use items_0::TimeBinnable; -use items_0::TimeBinned; -use items_0::TimeBinner; -use items_0::TimeBins; use items_0::TypeName; use items_0::WithLen; use netpod::is_false; @@ -677,7 +677,7 @@ impl TimeBinner for BinsXbinDim0TimeBinner { } } - fn bins_ready(&mut self) -> Option> { + fn bins_ready(&mut self) -> Option> { match self.ready.take() { Some(k) => Some(Box::new(k)), None => None, diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index bfc177d..16928c7 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -1,16 +1,23 @@ use crate::framable::FrameType; use crate::merger::Mergeable; use crate::Events; -use items_0::TypeName; use items_0::collect_s::Collectable; use items_0::collect_s::Collected; use items_0::collect_s::Collector; use items_0::container::ByteEstimate; use items_0::framable::FrameTypeInnerStatic; use items_0::streamitem::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID; +use items_0::timebin::TimeBinnable; +use items_0::timebin::TimeBinnableTy; +use items_0::timebin::TimeBinned; +use items_0::timebin::TimeBinner; +use items_0::timebin::TimeBinnerTy; use items_0::AsAnyMut; use items_0::AsAnyRef; +use items_0::EventsNonObj; use items_0::MergeError; +use items_0::RangeOverlapInfo; +use items_0::TypeName; use items_0::WithLen; use netpod::log::*; use netpod::range::evrange::SeriesRange; @@ -633,6 +640,118 @@ impl Mergeable for ChannelEvents { } } +impl RangeOverlapInfo for ChannelEvents { + fn ends_before(&self, range: &SeriesRange) -> bool { + todo!() + } + + fn ends_after(&self, range: &SeriesRange) -> bool { + todo!() + } + + fn starts_after(&self, range: &SeriesRange) -> bool { + todo!() + } +} + 
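The todo!() bodies above are WIP stubs. For orientation, a minimal sketch of how the new generic TimeBinnerTy (introduced in items_0/src/timebin.rs above) is meant to be driven, following the flush pattern used with the dyn TimeBinner in items_2/src/binnedcollected.rs; the helper name is illustrative, not part of the patch:

    use items_0::timebin::TimeBinnerTy;

    // Drain finished bins from a binner; at end-of-stream force out the bin
    // in progress (per the trait docs, cycle() makes at least one more bin
    // ready as long as edges are left, pushing a zero-count bin if needed).
    fn flush<B: TimeBinnerTy>(binner: &mut B, out: &mut Vec<B::Output>, end_of_stream: bool) {
        if end_of_stream {
            binner.cycle();
        }
        if binner.bins_ready_count() > 0 {
            if let Some(bins) = binner.bins_ready() {
                out.push(bins);
            }
        }
    }
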
+impl TimeBinnable for ChannelEvents { + fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Box { + todo!() + } + + fn to_box_to_json_result(&self) -> Box { + todo!() + } +} + +impl EventsNonObj for ChannelEvents { + fn into_tss_pulses(self: Box) -> (std::collections::VecDeque, std::collections::VecDeque) { + todo!() + } +} + +impl Events for ChannelEvents { + fn as_time_binnable(&self) -> &dyn TimeBinnable { + todo!() + } + + fn verify(&self) -> bool { + todo!() + } + + fn output_info(&self) { + todo!() + } + + fn as_collectable_mut(&mut self) -> &mut dyn Collectable { + todo!() + } + + fn as_collectable_with_default_ref(&self) -> &dyn Collectable { + todo!() + } + + fn as_collectable_with_default_mut(&mut self) -> &mut dyn Collectable { + todo!() + } + + fn ts_min(&self) -> Option { + todo!() + } + + fn ts_max(&self) -> Option { + todo!() + } + + fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box { + todo!() + } + + fn new_empty_evs(&self) -> Box { + todo!() + } + + fn drain_into_evs(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError> { + todo!() + } + + fn find_lowest_index_gt_evs(&self, ts: u64) -> Option { + todo!() + } + + fn find_lowest_index_ge_evs(&self, ts: u64) -> Option { + todo!() + } + + fn find_highest_index_lt_evs(&self, ts: u64) -> Option { + todo!() + } + + fn clone_dyn(&self) -> Box { + todo!() + } + + fn partial_eq_dyn(&self, other: &dyn Events) -> bool { + todo!() + } + + fn serde_id(&self) -> &'static str { + todo!() + } + + fn nty_id(&self) -> u32 { + todo!() + } + + fn tss(&self) -> &std::collections::VecDeque { + todo!() + } + + fn pulses(&self) -> &std::collections::VecDeque { + todo!() + } +} + impl Collectable for ChannelEvents { fn new_collector(&self) -> Box { Box::new(ChannelEventsCollector::new()) @@ -645,7 +764,7 @@ pub struct ChannelEventsTimeBinner { binrange: BinnedRangeEnum, do_time_weight: bool, conn_state: ConnStatus, - binner: Option>, + binner: Option>, } impl fmt::Debug for ChannelEventsTimeBinner { @@ -656,9 +775,9 @@ impl fmt::Debug for ChannelEventsTimeBinner { } } -impl crate::timebin::TimeBinner for ChannelEventsTimeBinner { +impl TimeBinnerTy for ChannelEventsTimeBinner { type Input = ChannelEvents; - type Output = Box; + type Output = Box; fn ingest(&mut self, item: &mut Self::Input) { match item { @@ -680,14 +799,6 @@ impl crate::timebin::TimeBinner for ChannelEventsTimeBinner { } } } - - fn set_range_complete(&mut self) { - match self.binner.as_mut() { - Some(binner) => binner.set_range_complete(), - None => (), - } - } - fn bins_ready_count(&self) -> usize { match &self.binner { Some(binner) => binner.bins_ready_count(), @@ -716,6 +827,13 @@ impl crate::timebin::TimeBinner for ChannelEventsTimeBinner { } } + fn set_range_complete(&mut self) { + match self.binner.as_mut() { + Some(binner) => binner.set_range_complete(), + None => (), + } + } + fn empty(&self) -> Option { match self.binner.as_ref() { Some(binner) => Some(binner.empty()), @@ -724,7 +842,7 @@ impl crate::timebin::TimeBinner for ChannelEventsTimeBinner { } } -impl crate::timebin::TimeBinnable for ChannelEvents { +impl TimeBinnableTy for ChannelEvents { type TimeBinner = ChannelEventsTimeBinner; fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Self::TimeBinner { diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 6017b7a..7ec8391 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -12,6 +12,9 @@ use items_0::collect_s::ToJsonBytes; use 
items_0::collect_s::ToJsonResult; use items_0::container::ByteEstimate; use items_0::scalar_ops::ScalarOps; +use items_0::timebin::TimeBinnable; +use items_0::timebin::TimeBinned; +use items_0::timebin::TimeBinner; use items_0::Appendable; use items_0::AsAnyMut; use items_0::AsAnyRef; @@ -19,8 +22,6 @@ use items_0::Empty; use items_0::Events; use items_0::EventsNonObj; use items_0::MergeError; -use items_0::TimeBinnable; -use items_0::TimeBinner; use items_0::TypeName; use items_0::WithLen; use netpod::is_false; @@ -829,11 +830,11 @@ impl Events for EventsDim0 { Box::new(ret) } - fn new_empty(&self) -> Box { + fn new_empty_evs(&self) -> Box { Box::new(Self::empty()) } - fn drain_into(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError> { + fn drain_into_evs(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError> { // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. if let Some(dst) = dst.as_mut().as_any_mut().downcast_mut::() { // TODO make it harder to forget new members when the struct may get modified in the future @@ -848,7 +849,7 @@ impl Events for EventsDim0 { } } - fn find_lowest_index_gt(&self, ts: u64) -> Option { + fn find_lowest_index_gt_evs(&self, ts: u64) -> Option { for (i, &m) in self.tss.iter().enumerate() { if m > ts { return Some(i); @@ -857,7 +858,7 @@ impl Events for EventsDim0 { None } - fn find_lowest_index_ge(&self, ts: u64) -> Option { + fn find_lowest_index_ge_evs(&self, ts: u64) -> Option { for (i, &m) in self.tss.iter().enumerate() { if m >= ts { return Some(i); @@ -866,7 +867,7 @@ impl Events for EventsDim0 { None } - fn find_highest_index_lt(&self, ts: u64) -> Option { + fn find_highest_index_lt_evs(&self, ts: u64) -> Option { for (i, &m) in self.tss.iter().enumerate().rev() { if m < ts { return Some(i); @@ -964,7 +965,7 @@ impl TimeBinner for EventsDim0TimeBinner { } } - fn bins_ready(&mut self) -> Option> { + fn bins_ready(&mut self) -> Option> { match self.ready.take() { Some(k) => Some(Box::new(k)), None => None, @@ -973,7 +974,11 @@ impl TimeBinner for EventsDim0TimeBinner { fn ingest(&mut self, item: &dyn TimeBinnable) { let self_name = any::type_name::(); - trace2!("TimeBinner for {self_name} {:?}", item); + trace2!( + "TimeBinner for {self_name} ingest agg.range {:?} item {:?}", + self.agg.range(), + item + ); if item.len() == 0 { // Return already here, RangeOverlapInfo would not give much sense. return; @@ -1028,7 +1033,7 @@ impl TimeBinner for EventsDim0TimeBinner { fn push_in_progress(&mut self, push_empty: bool) { let self_name = any::type_name::(); - trace!("{self_name}::push_in_progress"); + trace!("{self_name}::push_in_progress push_empty {push_empty}"); // TODO expand should be derived from AggKind. Is it still required after all? // TODO here, the expand means that agg will assume that the current value is kept constant during // the rest of the time range. 
@@ -1102,7 +1107,7 @@ impl TimeBinner for EventsDim0TimeBinner { self.range_final = true; } - fn empty(&self) -> Box { + fn empty(&self) -> Box { let ret = as TimeBinnableTypeAggregator>::Output::empty(); Box::new(ret) } diff --git a/items_2/src/eventsdim1.rs b/items_2/src/eventsdim1.rs index 50aeedc..1375939 100644 --- a/items_2/src/eventsdim1.rs +++ b/items_2/src/eventsdim1.rs @@ -12,6 +12,9 @@ use items_0::collect_s::ToJsonBytes; use items_0::collect_s::ToJsonResult; use items_0::container::ByteEstimate; use items_0::scalar_ops::ScalarOps; +use items_0::timebin::TimeBinnable; +use items_0::timebin::TimeBinned; +use items_0::timebin::TimeBinner; use items_0::Appendable; use items_0::AsAnyMut; use items_0::AsAnyRef; @@ -19,8 +22,6 @@ use items_0::Empty; use items_0::Events; use items_0::EventsNonObj; use items_0::MergeError; -use items_0::TimeBinnable; -use items_0::TimeBinner; use items_0::WithLen; use netpod::is_false; use netpod::log::*; @@ -728,11 +729,11 @@ impl Events for EventsDim1 { Box::new(ret) } - fn new_empty(&self) -> Box { + fn new_empty_evs(&self) -> Box { Box::new(Self::empty()) } - fn drain_into(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError> { + fn drain_into_evs(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError> { // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. if let Some(dst) = dst.as_mut().as_any_mut().downcast_mut::() { // TODO make it harder to forget new members when the struct may get modified in the future @@ -747,7 +748,7 @@ impl Events for EventsDim1 { } } - fn find_lowest_index_gt(&self, ts: u64) -> Option { + fn find_lowest_index_gt_evs(&self, ts: u64) -> Option { for (i, &m) in self.tss.iter().enumerate() { if m > ts { return Some(i); @@ -756,7 +757,7 @@ impl Events for EventsDim1 { None } - fn find_lowest_index_ge(&self, ts: u64) -> Option { + fn find_lowest_index_ge_evs(&self, ts: u64) -> Option { for (i, &m) in self.tss.iter().enumerate() { if m >= ts { return Some(i); @@ -765,7 +766,7 @@ impl Events for EventsDim1 { None } - fn find_highest_index_lt(&self, ts: u64) -> Option { + fn find_highest_index_lt_evs(&self, ts: u64) -> Option { for (i, &m) in self.tss.iter().enumerate().rev() { if m < ts { return Some(i); @@ -869,7 +870,7 @@ impl TimeBinner for EventsDim1TimeBinner { } } - fn bins_ready(&mut self) -> Option> { + fn bins_ready(&mut self) -> Option> { match self.ready.take() { Some(k) => Some(Box::new(k)), None => None, @@ -1009,7 +1010,7 @@ impl TimeBinner for EventsDim1TimeBinner { self.range_complete = true; } - fn empty(&self) -> Box { + fn empty(&self) -> Box { let ret = as TimeBinnableTypeAggregator>::Output::empty(); Box::new(ret) } diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 0d7cb1e..8d4dbf7 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -15,7 +15,6 @@ pub mod streams; #[cfg(test)] pub mod test; pub mod testgen; -pub mod timebin; use channelevents::ChannelEvents; use chrono::DateTime; @@ -164,23 +163,23 @@ impl Mergeable for Box { } fn new_empty(&self) -> Self { - self.as_ref().new_empty() + self.as_ref().new_empty_evs() } fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> { - self.as_mut().drain_into(dst, range) + self.as_mut().drain_into_evs(dst, range) } fn find_lowest_index_gt(&self, ts: u64) -> Option { - self.as_ref().find_lowest_index_gt(ts) + self.as_ref().find_lowest_index_gt_evs(ts) } fn find_lowest_index_ge(&self, ts: u64) -> Option { - 
self.as_ref().find_lowest_index_ge(ts) + self.as_ref().find_lowest_index_ge_evs(ts) } fn find_highest_index_lt(&self, ts: u64) -> Option { - self.as_ref().find_highest_index_lt(ts) + self.as_ref().find_highest_index_lt_evs(ts) } } diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs index f3b0c3e..02fb230 100644 --- a/items_2/src/merger.rs +++ b/items_2/src/merger.rs @@ -24,19 +24,19 @@ const DO_DETECT_NON_MONO: bool = true; #[allow(unused)] macro_rules! trace2 { - (__$($arg:tt)*) => (); + ($($arg:tt)*) => (); ($($arg:tt)*) => (trace!($($arg)*)); } #[allow(unused)] macro_rules! trace3 { - (__$($arg:tt)*) => (); + ($($arg:tt)*) => (); ($($arg:tt)*) => (trace!($($arg)*)); } #[allow(unused)] macro_rules! trace4 { - (__$($arg:tt)*) => (); + ($($arg:tt)*) => (); ($($arg:tt)*) => (trace!($($arg)*)); } @@ -337,33 +337,31 @@ where trace3!("ninps {ninps} nitems {nitems} nitemsmissing {nitemsmissing}"); if nitemsmissing != 0 { let e = Error::from(format!("missing but no pending")); - Break(Ready(Some(Err(e)))) - } else if nitems == 0 { - Break(Ready(None)) - } else { + return Break(Ready(Some(Err(e)))); + } + let last_emit = nitems == 0; + if nitems != 0 { match Self::process(Pin::new(&mut self), cx) { - Ok(Break(())) => { - if let Some(o) = self.out.as_ref() { - // A good threshold varies according to scalar type and shape. - // TODO replace this magic number by a bound on the bytes estimate. - if o.len() >= self.out_max_len || o.byte_estimate() >= OUT_MAX_BYTES || self.do_clear_out { - trace3!("decide to output"); - self.do_clear_out = false; - Break(Ready(Some(Ok(self.out.take().unwrap())))) - } else { - trace4!("output not yet"); - Continue(()) - } - } else { - trace3!("no output candidate"); - Continue(()) - } - } - Ok(Continue(())) => { - trace2!("process returned with Continue"); - Continue(()) - } - Err(e) => Break(Ready(Some(Err(e)))), + Ok(Break(())) => {} + Ok(Continue(())) => {} + Err(e) => return Break(Ready(Some(Err(e)))), + } + } + if let Some(o) = self.out.as_ref() { + if o.len() >= self.out_max_len || o.byte_estimate() >= OUT_MAX_BYTES || self.do_clear_out || last_emit { + trace3!("decide to output"); + self.do_clear_out = false; + Break(Ready(Some(Ok(self.out.take().unwrap())))) + } else { + trace4!("output not yet"); + Continue(()) + } + } else { + trace!("no output candidate"); + if last_emit { + Break(Ready(None)) + } else { + Continue(()) } } } @@ -411,20 +409,25 @@ where continue; } } else if self.done_data { + trace!("done_data"); self.done_buffered = true; if let Some(out) = self.out.take() { + trace!("done_data emit buffered len {}", out.len()); Ready(Some(sitem_data(out))) } else { continue; } } else if let Some(item) = self.out_of_band_queue.pop_front() { - trace4!("emit out-of-band"); + trace!("emit out-of-band"); Ready(Some(item)) } else { match Self::poll2(self.as_mut(), cx) { ControlFlow::Continue(()) => continue, ControlFlow::Break(k) => match k { - Ready(Some(Ok(item))) => Ready(Some(sitem_data(item))), + Ready(Some(Ok(out))) => { + trace!("emit buffered len {}", out.len()); + Ready(Some(sitem_data(out))) + } Ready(Some(Err(e))) => { self.done_data = true; Ready(Some(Err(e.into()))) diff --git a/items_2/src/timebin.rs b/items_2/src/timebin.rs deleted file mode 100644 index a38ffb1..0000000 --- a/items_2/src/timebin.rs +++ /dev/null @@ -1,32 +0,0 @@ -use netpod::BinnedRangeEnum; -use std::fmt; - -pub trait TimeBinner: fmt::Debug + Unpin { - type Input: fmt::Debug; - type Output: fmt::Debug; - - fn ingest(&mut self, item: &mut Self::Input); - - fn 
set_range_complete(&mut self); - - fn bins_ready_count(&self) -> usize; - - fn bins_ready(&mut self) -> Option; - - /// If there is a bin in progress with non-zero count, push it to the result set. - /// With push_empty == true, a bin in progress is pushed even if it contains no counts. - fn push_in_progress(&mut self, push_empty: bool); - - /// Implies `Self::push_in_progress` but in addition, pushes a zero-count bin if the call - /// to `push_in_progress` did not change the result count, as long as edges are left. - /// The next call to `Self::bins_ready_count` must return one higher count than before. - fn cycle(&mut self); - - fn empty(&self) -> Option; -} - -pub trait TimeBinnable: fmt::Debug + Sized { - type TimeBinner: TimeBinner; - - fn time_binner_new(&self, binrange: BinnedRangeEnum, do_time_weight: bool) -> Self::TimeBinner; -} diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index c21cca0..e654766 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -1050,6 +1050,15 @@ pub trait Dim0Index: Clone + fmt::Debug + PartialOrd { fn to_binned_range_enum(&self, bin_off: u64, bin_cnt: u64) -> BinnedRangeEnum; } +pub trait Dim0Range: Clone + fmt::Debug + PartialOrd {} + +pub struct Dim0RangeValue +where + T: Dim0Index, +{ + pub ix: [T; 2], +} + #[derive(Clone, Deserialize, PartialEq, PartialOrd)] pub struct TsNano(pub u64); diff --git a/nodenet/src/channelconfig.rs b/nodenet/src/channelconfig.rs index 3a48073..bc7e89d 100644 --- a/nodenet/src/channelconfig.rs +++ b/nodenet/src/channelconfig.rs @@ -72,7 +72,14 @@ pub async fn channel_config(range: NanoRange, channel: Channel, ncc: &NodeConfig info!("channel_config BEFORE {channel:?}"); info!("try to get ChConf for sf-databuffer type backend"); // TODO in the future we should not need this: - let channel = sf_databuffer_fetch_channel_by_series(channel, ncc).await?; + let mut channel = sf_databuffer_fetch_channel_by_series(channel, ncc).await?; + if channel.series.is_none() { + let pgclient = dbconn::create_connection(&ncc.node_config.cluster.database).await?; + let pgclient = std::sync::Arc::new(pgclient); + let series = dbconn::find_series_sf_databuffer(&channel, pgclient).await?; + channel.series = Some(series); + } + let channel = channel; info!("channel_config AFTER {channel:?}"); let c1 = disk::channelconfig::config(range, channel.clone(), ncc).await?; info!("channel_config THEN {c1:?}"); diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 912f430..a8f09a5 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -1,3 +1,5 @@ +pub mod generator; + use err::Error; use futures_util::Stream; use futures_util::StreamExt; @@ -7,8 +9,6 @@ use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::streamitem::EVENT_QUERY_JSON_STRING_FRAME; -use items_0::Appendable; -use items_0::Empty; use items_2::channelevents::ChannelEvents; use items_2::framable::EventQueryJsonStringFrame; use items_2::framable::Framable; @@ -17,7 +17,6 @@ use items_2::frame::make_term_frame; use items_2::inmem::InMemoryFrame; use netpod::histo::HistoLog2; use netpod::log::*; -use netpod::AggKind; use netpod::ChConf; use netpod::NodeConfigCached; use netpod::PerfOpts; @@ -72,36 +71,37 @@ async fn make_channel_events_stream( info!("nodenet::conn::make_channel_events_stream"); if evq.channel().backend() == "test-inmem" { warn!("TEST BACKEND DATA"); - use netpod::timeunits::MS; - let node_count = node_config.node_config.cluster.nodes.len(); - let node_ix = node_config.ix; 
- if evq.channel().name() == "inmem-d0-i32" { - let mut item = items_2::eventsdim0::EventsDim0::::empty(); - let td = MS * 10; - for i in 0..20 { - let ts = MS * 17 + td * node_ix as u64 + td * node_count as u64 * i; - let pulse = 1 + node_ix as u64 + node_count as u64 * i; - item.push(ts, pulse, pulse as _); - } - let item = ChannelEvents::Events(Box::new(item) as _); - let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); - let stream = futures_util::stream::iter([item]); - Ok(Box::pin(stream)) - } else if evq.channel().name() == "inmem-d0-f32" { - let mut item = items_2::eventsdim0::EventsDim0::::empty(); - let td = MS * 10; - for i in 0..20 { - let ts = MS * 17 + td * node_ix as u64 + td * node_count as u64 * i; - let pulse = 1 + node_ix as u64 + node_count as u64 * i; - item.push(ts, pulse, ts as _); - } - let item = ChannelEvents::Events(Box::new(item) as _); - let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); - let stream = futures_util::stream::iter([item]); - Ok(Box::pin(stream)) + let node_count = node_config.node_config.cluster.nodes.len() as u64; + let node_ix = node_config.ix as u64; + let chn = evq.channel().name(); + let na: Vec<_> = chn.split("-").collect(); + if na.len() != 3 { + Err(Error::with_msg_no_trace(format!( + "can not understand test channel name: {chn:?}" + ))) } else { - let stream = futures_util::stream::empty(); - Ok(Box::pin(stream)) + if na[0] != "inmem" { + Err(Error::with_msg_no_trace(format!( + "can not understand test channel name: {chn:?}" + ))) + } else { + let range = evq.range().clone(); + if na[1] == "d0" { + if na[2] == "i32" { + generator::generate_i32(node_ix, node_count, range) + } else if na[2] == "f32" { + generator::generate_f32(node_ix, node_count, range) + } else { + Err(Error::with_msg_no_trace(format!( + "can not understand test channel name: {chn:?}" + ))) + } + } else { + Err(Error::with_msg_no_trace(format!( + "can not understand test channel name: {chn:?}" + ))) + } + } } } else if let Some(scyconf) = &node_config.node_config.cluster.scylla { scylla_channel_event_stream(evq, chconf, scyconf, node_config).await diff --git a/nodenet/src/conn/generator.rs b/nodenet/src/conn/generator.rs new file mode 100644 index 0000000..45cffec --- /dev/null +++ b/nodenet/src/conn/generator.rs @@ -0,0 +1,72 @@ +use err::Error; +use futures_util::Stream; +use items_0::container::ByteEstimate; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; +use items_0::Appendable; +use items_0::Empty; +use items_0::WithLen; +use items_2::channelevents::ChannelEvents; +use netpod::log::*; +use netpod::range::evrange::SeriesRange; +use netpod::timeunits::MS; +use std::pin::Pin; + +pub fn generate_i32( + node_ix: u64, + node_count: u64, + range: SeriesRange, +) -> Result> + Send>>, Error> { + type T = i32; + let mut items = Vec::new(); + match range { + SeriesRange::TimeRange(range) => { + let mut item = items_2::eventsdim0::EventsDim0::empty(); + let td = MS * 1000; + let mut ts = (range.beg / td + node_ix) * td; + loop { + if ts >= range.end { + break; + } + let pulse = ts; + item.push(ts, pulse, pulse as T); + ts += td * node_count as u64; + if item.byte_estimate() > 200 { + let w = ChannelEvents::Events(Box::new(item) as _); + let w = Ok::<_, Error>(StreamItem::DataItem(RangeCompletableItem::Data(w))); + items.push(w); + item = items_2::eventsdim0::EventsDim0::empty(); + } + } + if item.len() != 0 { + let w = ChannelEvents::Events(Box::new(item) as _); + let 
w = Ok::<_, Error>(StreamItem::DataItem(RangeCompletableItem::Data(w))); + items.push(w); + } + } + SeriesRange::PulseRange(_) => { + error!("TODO generate test data by pulse id range"); + } + } + let stream = futures_util::stream::iter(items); + Ok(Box::pin(stream)) +} + +pub fn generate_f32( + node_ix: u64, + node_count: u64, + range: SeriesRange, +) -> Result> + Send>>, Error> { + let mut item = items_2::eventsdim0::EventsDim0::::empty(); + let td = MS * 10; + for i in 0..20 { + let ts = MS * 17 + td * node_ix as u64 + td * node_count as u64 * i; + let pulse = 1 + node_ix as u64 + node_count as u64 * i; + item.push(ts, pulse, ts as _); + } + let item = ChannelEvents::Events(Box::new(item) as _); + let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); + let stream = futures_util::stream::iter([item]); + Ok(Box::pin(stream)) +} diff --git a/query/src/api4/binned.rs b/query/src/api4/binned.rs index 68c81bd..46ecf7a 100644 --- a/query/src/api4/binned.rs +++ b/query/src/api4/binned.rs @@ -96,12 +96,12 @@ impl BinnedQuery { pub fn timeout_value(&self) -> Duration { match &self.timeout { Some(x) => x.clone(), - None => Duration::from_millis(10000), + None => Duration::from_millis(6000), } } pub fn bins_max(&self) -> u32 { - self.bins_max.unwrap_or(1024) + self.bins_max.unwrap_or(2000) } pub fn set_series_id(&mut self, series: u64) { @@ -129,8 +129,9 @@ impl BinnedQuery { } pub fn for_time_weighted_scalar(self) -> Self { - err::todo(); - self + let mut v = self; + v.transform = TransformQuery::for_time_weighted_scalar(); + v } } diff --git a/query/src/api4/events.rs b/query/src/api4/events.rs index 0f1b83e..bb49c03 100644 --- a/query/src/api4/events.rs +++ b/query/src/api4/events.rs @@ -91,6 +91,12 @@ impl PlainEventsQuery { } pub fn events_max(&self) -> u64 { + self.events_max.unwrap_or(1024 * 128) + } + + // A rough indication on how many bytes this request is allowed to return. Otherwise, the result should + // be a partial result. 
+ pub fn bytes_max(&self) -> u64 { self.events_max.unwrap_or(1024 * 512) } diff --git a/scyllaconn/src/bincache.rs b/scyllaconn/src/bincache.rs index 3d46eab..93ff40e 100644 --- a/scyllaconn/src/bincache.rs +++ b/scyllaconn/src/bincache.rs @@ -4,7 +4,7 @@ use err::Error; use futures_util::Future; use futures_util::Stream; use futures_util::StreamExt; -use items_0::TimeBinned; +use items_0::timebin::TimeBinned; use items_2::binsdim0::BinsDim0; use items_2::channelevents::ChannelEvents; use netpod::log::*; diff --git a/streams/src/collect.rs b/streams/src/collect.rs index 5b1115f..060ade0 100644 --- a/streams/src/collect.rs +++ b/streams/src/collect.rs @@ -47,6 +47,7 @@ where S: Stream> + Unpin, T: Collectable + WithLen + fmt::Debug, { + info!("collect events_max {events_max} deadline {deadline:?}"); let mut collector: Option> = None; let mut stream = stream; let deadline = deadline.into(); @@ -58,17 +59,16 @@ where Ok(Some(k)) => k, Ok(None) => break, Err(_e) => { - warn!("collect_in_span time out"); + warn!("collect timeout"); timed_out = true; if let Some(coll) = collector.as_mut() { coll.set_timed_out(); } else { - warn!("Timeout but no collector yet"); + warn!("collect timeout but no collector yet"); } break; } }; - trace!("collect_in_span see item {item:?}"); match item { Ok(item) => match item { StreamItem::DataItem(item) => match item { @@ -77,11 +77,11 @@ where if let Some(coll) = collector.as_mut() { coll.set_range_complete(); } else { - warn!("Received RangeComplete but no collector yet"); + warn!("collect received RangeComplete but no collector yet"); } } RangeCompletableItem::Data(mut item) => { - debug!("collect_in_span sees len {}", item.len()); + info!("collect sees len {}", item.len()); if collector.is_none() { let c = item.new_collector(); collector = Some(c); @@ -95,10 +95,10 @@ where } }, StreamItem::Log(item) => { - trace!("Log {:?}", item); + trace!("collect log {:?}", item); } StreamItem::Stats(item) => { - trace!("Stats {:?}", item); + trace!("collect stats {:?}", item); match item { // TODO factor and simplify the stats collection: StatsItem::EventDataReadStats(_) => {} @@ -131,7 +131,7 @@ where let res = collector .ok_or_else(|| Error::with_msg_no_trace(format!("no result because no collector was created")))? 
.result(range, binrange)?; - debug!("Total duration: {:?}", total_duration); + info!("collect stats total duration: {:?}", total_duration); Ok(res) } diff --git a/streams/src/eventchunker.rs b/streams/src/eventchunker.rs deleted file mode 100644 index 8b13789..0000000 --- a/streams/src/eventchunker.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/streams/src/frames/eventsfromframes.rs b/streams/src/frames/eventsfromframes.rs index 90019da..6a4410e 100644 --- a/streams/src/frames/eventsfromframes.rs +++ b/streams/src/frames/eventsfromframes.rs @@ -57,7 +57,7 @@ where match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(item))) => match item { StreamItem::Log(item) => { - info!("{} {:?} {}", item.node_ix, item.level, item.msg); + //info!("{} {:?} {}", item.node_ix, item.level, item.msg); Ready(Some(Ok(StreamItem::Log(item)))) } StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), @@ -65,7 +65,7 @@ where Ok(item) => match item { Ok(item) => match item { StreamItem::Log(k) => { - info!("rcvd log: {} {:?} {}", k.node_ix, k.level, k.msg); + //info!("rcvd log: {} {:?} {}", k.node_ix, k.level, k.msg); Ready(Some(Ok(StreamItem::Log(k)))) } item => Ready(Some(Ok(item))), diff --git a/streams/src/generators.rs b/streams/src/generators.rs new file mode 100644 index 0000000..1b7366d --- /dev/null +++ b/streams/src/generators.rs @@ -0,0 +1,86 @@ +use err::Error; +use futures_util::Future; +use futures_util::FutureExt; +use futures_util::Stream; +use items_0::container::ByteEstimate; +use items_0::streamitem::sitem_data; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; +use items_0::Appendable; +use items_0::Empty; +use items_2::channelevents::ChannelEvents; +use netpod::log::*; +use netpod::range::evrange::SeriesRange; +use netpod::timeunits::MS; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use std::time::Duration; + +pub struct GenerateI32 { + ts: u64, + dts: u64, + tsend: u64, + timeout: Option + Send>>>, +} + +impl GenerateI32 { + pub fn new(node_ix: u64, node_count: u64, range: SeriesRange) -> Self { + let range = match range { + SeriesRange::TimeRange(k) => k, + SeriesRange::PulseRange(_) => todo!(), + }; + let dts = MS * 1000 * node_count as u64; + let ts = (range.beg / dts + node_ix) * dts; + let tsend = range.end; + Self { + ts, + dts, + tsend, + timeout: None, + } + } + + fn make_batch(&mut self) -> Sitemty { + type T = i32; + let mut item = items_2::eventsdim0::EventsDim0::empty(); + let mut ts = self.ts; + loop { + if self.ts >= self.tsend || item.byte_estimate() > 200 { + break; + } + let pulse = ts; + item.push(ts, pulse, pulse as T); + ts += self.dts; + } + self.ts = ts; + let w = ChannelEvents::Events(Box::new(item) as _); + let w = sitem_data(w); + w + } +} + +impl Stream for GenerateI32 { + type Item = Sitemty; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break if self.ts >= self.tsend { + Ready(None) + } else if let Some(fut) = self.timeout.as_mut() { + match fut.poll_unpin(cx) { + Ready(()) => { + self.timeout = None; + Ready(Some(self.make_batch())) + } + Pending => Pending, + } + } else { + self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_millis(500)))); + continue; + }; + } + } +} diff --git a/streams/src/lib.rs b/streams/src/lib.rs index b61166d..1be5a74 100644 --- a/streams/src/lib.rs +++ b/streams/src/lib.rs @@ -1,8 +1,8 @@ pub mod collect; pub mod dtflags; -pub mod eventchunker; pub mod filechunkread; pub 
diff --git a/streams/src/lib.rs b/streams/src/lib.rs
index b61166d..1be5a74 100644
--- a/streams/src/lib.rs
+++ b/streams/src/lib.rs
@@ -1,8 +1,8 @@
 pub mod collect;
 pub mod dtflags;
-pub mod eventchunker;
 pub mod filechunkread;
 pub mod frames;
+pub mod generators;
 pub mod needminbuffer;
 pub mod plaineventsjson;
 pub mod rangefilter2;
diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs
index b9191b2..163892d 100644
--- a/streams/src/plaineventsjson.rs
+++ b/streams/src/plaineventsjson.rs
@@ -12,7 +12,6 @@ use netpod::ChConf;
 use netpod::Cluster;
 use query::api4::events::PlainEventsQuery;
 use serde_json::Value as JsonValue;
-use std::time::Duration;
 use std::time::Instant;
 
 pub async fn plain_events_json(evq: &PlainEventsQuery, chconf: &ChConf, cluster: &Cluster) -> Result<JsonValue, Error> {
@@ -28,7 +27,7 @@
         assert_eq!(result[0], Value::I32(43));
     }
     // TODO remove magic constant
-    let deadline = Instant::now() + evq.timeout() + Duration::from_millis(1000);
+    let deadline = Instant::now() + evq.timeout();
     let events_max = evq.events_max();
     let evquery = evq.clone();
     info!("plain_events_json evquery {:?}", evquery);
@@ -87,13 +86,15 @@
         Box::pin(stream)
     };
+    #[cfg(DISABLED)]
     let stream = stream.map(|item| {
-        //info!("item after merge: {item:?}");
+        info!("item after merge: {item:?}");
         item
     });
     let stream = RangeFilter2::new(stream, evq.range().try_into()?, evquery.one_before_range());
+    #[cfg(DISABLED)]
     let stream = stream.map(|item| {
-        //info!("item after rangefilter: {item:?}");
+        info!("item after rangefilter: {item:?}");
         item
     });
     let stream = stream::iter([empty]).chain(stream);
diff --git a/streams/src/tcprawclient.rs b/streams/src/tcprawclient.rs
index d2ed06b..2171793 100644
--- a/streams/src/tcprawclient.rs
+++ b/streams/src/tcprawclient.rs
@@ -9,7 +9,6 @@ use crate::frames::eventsfromframes::EventsFromFrames;
 use crate::frames::inmem::InMemoryFrameAsyncReadStream;
 use err::Error;
 use futures_util::Stream;
-use futures_util::StreamExt;
 use items_0::framable::FrameTypeInnerStatic;
 use items_0::streamitem::sitem_data;
 use items_0::streamitem::Sitemty;
@@ -80,10 +79,6 @@ where
         let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap);
         let frames = Box::pin(frames);
         let stream = EventsFromFrames::<T>::new(frames, addr);
-        let stream = stream.map(|x| {
-            info!("tcp stream recv sees item {x:?}");
-            x
-        });
         streams.push(Box::pin(stream) as _);
     }
     Ok(streams)
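The #[cfg(DISABLED)] attributes added above keep the debug taps in the source while compiling them out: since the DISABLED cfg is never set, the whole let stream = stream.map(...) statement vanishes and the previous binding of stream flows through unchanged. A standalone sketch of the same pattern on a plain stream; the cfg name is just what the patch uses, any never-set cfg works, though newer toolchains may emit an unexpected-cfgs warning for it:

use futures_util::stream;
use futures_util::StreamExt;

#[tokio::main]
async fn main() {
    let stream = stream::iter([1, 2, 3]);
    // Compiled out unless built with `--cfg DISABLED`; when absent, the
    // pipeline below still refers to the binding above.
    #[cfg(DISABLED)]
    let stream = stream.map(|item| {
        println!("tap: {item:?}");
        item
    });
    let v: Vec<_> = stream.collect().await;
    assert_eq!(v, vec![1, 2, 3]);
}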
diff --git a/streams/src/test/timebin.rs b/streams/src/test/timebin.rs
index f627a33..eb4f427 100644
--- a/streams/src/test/timebin.rs
+++ b/streams/src/test/timebin.rs
@@ -1,18 +1,27 @@
+use crate::collect::collect;
+use crate::generators::GenerateI32;
 use crate::test::runfut;
+use chrono::DateTime;
+use chrono::Utc;
 use err::Error;
 use futures_util::stream;
 use futures_util::StreamExt;
+use items_0::on_sitemty_data;
 use items_0::streamitem::sitem_data;
 use items_0::streamitem::RangeCompletableItem;
 use items_0::streamitem::StreamItem;
+use items_0::timebin::TimeBinnable;
 use items_0::Empty;
 use items_2::binsdim0::BinsDim0;
 use items_2::channelevents::ChannelEvents;
 use items_2::channelevents::ConnStatus;
 use items_2::channelevents::ConnStatusEvent;
 use items_2::testgen::make_some_boxed_d0_f32;
+use netpod::range::evrange::NanoRange;
+use netpod::range::evrange::SeriesRange;
 use netpod::timeunits::MS;
 use netpod::timeunits::SEC;
+use netpod::BinnedRangeEnum;
 use std::collections::VecDeque;
 use std::time::Duration;
 use std::time::Instant;
@@ -20,7 +29,10 @@
 #[test]
 fn time_bin_00() {
     let fut = async {
-        let edges = [0, 1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(|x| SEC * x).collect();
+        let range = nano_range_from_str("1970-01-01T00:00:00Z", "1970-01-01T00:00:08Z")?;
+        let range = SeriesRange::TimeRange(range);
+        let min_bin_count = 8;
+        let binned_range = BinnedRangeEnum::covering_range(range, min_bin_count)?;
         let evs0 = make_some_boxed_d0_f32(10, SEC * 1, MS * 500, 0, 1846713782);
         let v0 = ChannelEvents::Events(evs0);
         let v2 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 100, ConnStatus::Connect)));
@@ -46,7 +58,7 @@
             d
         };
         let deadline = Instant::now() + Duration::from_millis(2000000);
-        let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, edges, true, deadline);
+        let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, binned_range, true, deadline);
         while let Some(item) = binned_stream.next().await {
             //eprintln!("{item:?}");
             match item {
@@ -78,7 +90,10 @@
 #[test]
 fn time_bin_01() {
     let fut = async {
-        let edges = [0, 1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(|x| SEC * x).collect();
+        let range = nano_range_from_str("1970-01-01T00:00:00Z", "1970-01-01T00:00:08Z")?;
+        let range = SeriesRange::TimeRange(range);
+        let min_bin_count = 8;
+        let binned_range = BinnedRangeEnum::covering_range(range, min_bin_count)?;
         let evs0 = make_some_boxed_d0_f32(10, SEC * 1, MS * 500, 0, 1846713782);
         let evs1 = make_some_boxed_d0_f32(10, SEC * 6, MS * 500, 0, 1846713781);
         let v0 = ChannelEvents::Events(evs0);
@@ -102,7 +117,7 @@
         });
         let stream0 = Box::pin(stream0);
         let deadline = Instant::now() + Duration::from_millis(200);
-        let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, edges, true, deadline);
+        let mut binned_stream = crate::timebin::TimeBinnedStream::new(stream0, binned_range, true, deadline);
         while let Some(item) = binned_stream.next().await {
             if true {
                 eprintln!("{item:?}");
@@ -133,4 +148,61 @@
     runfut(fut).unwrap()
 }
 
+fn nano_range_from_str(beg_date: &str, end_date: &str) -> Result<NanoRange, Error> {
+    let beg_date = beg_date.parse()?;
+    let end_date = end_date.parse()?;
+    let range = NanoRange::from_date_time(beg_date, end_date);
+    Ok(range)
+}
+
+#[test]
+fn time_bin_02() -> Result<(), Error> {
+    let fut = async {
+        let do_time_weight = true;
+        let deadline = Instant::now() + Duration::from_millis(4000);
+        let range = nano_range_from_str("1970-01-01T00:20:04Z", "1970-01-01T00:21:10Z")?;
+        let range = SeriesRange::TimeRange(range);
+        let min_bin_count = 10;
+        let binned_range = BinnedRangeEnum::covering_range(range.clone(), min_bin_count)?;
+        eprintln!("binned_range: {:?}", binned_range);
+        for i in 0.. {
+            if let Some(r) = binned_range.range_at(i) {
+                eprintln!("Series Range to cover: {r:?}");
+            } else {
+                break;
+            }
+        }
+        // TODO the test stream must also be able to generate one-before (on demand) and RangeComplete (by default).
+        let stream = GenerateI32::new(0, 1, range);
+        // TODO first apply some box dyn EventTransform which is later provided by TransformQuery.
+        // The merge then always happens by default for backends where it is needed.
+        // TODO then apply the transform chain for the after-merge stream.
+        let stream = stream.map(|x| {
+            //
+            on_sitemty_data!(x, |x| Ok(StreamItem::DataItem(RangeCompletableItem::Data(
+                Box::new(x) as Box<dyn TimeBinnable>
+            ))))
+        });
+        let stream = Box::pin(stream);
+        let mut binned_stream =
+            crate::timebin::TimeBinnedStream::new(stream, binned_range.clone(), do_time_weight, deadline);
+        // From here on it should no longer be necessary to distinguish whether the stream
+        // still carries events or already time bins.
+        // Then, optionally collect for an output type like json, or stream as batches.
+        // TODO the timebinner should already provide batches to make this efficient.
+        while let Some(e) = binned_stream.next().await {
+            eprintln!("see item {e:?}");
+            let _x = on_sitemty_data!(e, |e| {
+                //
+                Ok(StreamItem::DataItem(RangeCompletableItem::Data(e)))
+            });
+        }
+        /*let res = collect(binned_stream, deadline, 200, None, Some(binned_range)).await?;
+        let d = res.to_json_result()?.to_json_bytes()?;
+        let s = String::from_utf8_lossy(&d);
+        eprintln!("{s}");*/
+        Ok(())
+    };
+    runfut(fut)
+}
+
 // TODO add test case to observe RangeComplete after binning.
diff --git a/streams/src/timebin.rs b/streams/src/timebin.rs
index 0513213..f62b2bb 100644
--- a/streams/src/timebin.rs
+++ b/streams/src/timebin.rs
@@ -7,9 +7,14 @@
 use items_0::streamitem::sitem_data;
 use items_0::streamitem::RangeCompletableItem;
 use items_0::streamitem::Sitemty;
 use items_0::streamitem::StreamItem;
-use items_2::timebin::TimeBinnable;
-use items_2::timebin::TimeBinner;
+use items_0::timebin::TimeBinnableTy;
+use items_0::timebin::TimeBinner;
+use items_0::timebin::TimeBinnerTy;
 use netpod::log::*;
+use netpod::BinnedRange;
+use netpod::BinnedRangeEnum;
+use netpod::Dim0Index;
+use std::any;
 use std::fmt;
 use std::pin::Pin;
 use std::task::Context;
@@ -18,19 +23,19 @@
 use std::time::Instant;
 
 #[allow(unused)]
 macro_rules! trace2 {
-    (D$($arg:tt)*) => ();
+    (__$($arg:tt)*) => ();
     ($($arg:tt)*) => (trace!($($arg)*));
 }
 
 #[allow(unused)]
 macro_rules! trace3 {
-    (D$($arg:tt)*) => ();
+    (__$($arg:tt)*) => ();
     ($($arg:tt)*) => (trace!($($arg)*));
 }
 
 #[allow(unused)]
 macro_rules! trace4 {
-    (D$($arg:tt)*) => ();
+    (__$($arg:tt)*) => ();
     ($($arg:tt)*) => (trace!($($arg)*));
 }
 
@@ -38,15 +43,15 @@
 type MergeInp<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;
 
 pub struct TimeBinnedStream<T>
 where
-    T: TimeBinnable,
+    T: TimeBinnableTy,
 {
     inp: MergeInp<T>,
-    edges: Vec<u64>,
+    range: BinnedRangeEnum,
     do_time_weight: bool,
     deadline: Instant,
     deadline_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
     range_complete: bool,
-    binner: Option<<T as TimeBinnable>::TimeBinner>,
+    binner: Option<<T as TimeBinnableTy>::TimeBinner>,
     done_data: bool,
     done: bool,
     complete: bool,
@@ -54,11 +59,11 @@
 
 impl<T> fmt::Debug for TimeBinnedStream<T>
 where
-    T: TimeBinnable,
+    T: TimeBinnableTy,
 {
     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
-        fmt.debug_struct("TimeBinnedStream")
-            .field("edges", &self.edges)
+        fmt.debug_struct(any::type_name::<Self>())
+            .field("range", &self.range)
             .field("deadline", &self.deadline)
             .field("range_complete", &self.range_complete)
             .field("binner", &self.binner)
@@ -68,14 +73,14 @@
 
 impl<T> TimeBinnedStream<T>
 where
-    T: TimeBinnable,
+    T: TimeBinnableTy,
 {
-    pub fn new(inp: MergeInp<T>, edges: Vec<u64>, do_time_weight: bool, deadline: Instant) -> Self {
+    pub fn new(inp: MergeInp<T>, range: BinnedRangeEnum, do_time_weight: bool, deadline: Instant) -> Self {
         let deadline_fut = tokio::time::sleep_until(deadline.into());
         let deadline_fut = Box::pin(deadline_fut);
         Self {
             inp,
-            edges,
+            range,
             do_time_weight,
             deadline,
             deadline_fut,
@@ -91,7 +96,7 @@
         trace!("process_item {item:?}");
         if self.binner.is_none() {
             trace!("process_item call time_binner_new");
-            let binner = item.time_binner_new(todo!(), self.do_time_weight);
+            let binner = item.time_binner_new(self.range.clone(), self.do_time_weight);
             self.binner = Some(binner);
         }
         let binner = self.binner.as_mut().unwrap();
@@ -102,13 +107,13 @@
 
 impl<T> Stream for TimeBinnedStream<T>
 where
-    T: TimeBinnable + Unpin,
+    T: TimeBinnableTy + Unpin,
 {
-    type Item = Sitemty<<<T as TimeBinnable>::TimeBinner as TimeBinner>::Output>;
+    type Item = Sitemty<<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         use Poll::*;
-        let span = tracing::span!(tracing::Level::TRACE, "poll");
+        let span = span!(Level::INFO, "poll");
         let _spg = span.enter();
         loop {
             break if self.complete {
trace4 { - (D$($arg:tt)*) => (); + (__$($arg:tt)*) => (); ($($arg:tt)*) => (trace!($($arg)*)); } @@ -38,15 +43,15 @@ type MergeInp = Pin> + Send>>; pub struct TimeBinnedStream where - T: TimeBinnable, + T: TimeBinnableTy, { inp: MergeInp, - edges: Vec, + range: BinnedRangeEnum, do_time_weight: bool, deadline: Instant, deadline_fut: Pin + Send>>, range_complete: bool, - binner: Option<::TimeBinner>, + binner: Option<::TimeBinner>, done_data: bool, done: bool, complete: bool, @@ -54,11 +59,11 @@ where impl fmt::Debug for TimeBinnedStream where - T: TimeBinnable, + T: TimeBinnableTy, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("TimeBinnedStream") - .field("edges", &self.edges) + fmt.debug_struct(any::type_name::()) + .field("range", &self.range) .field("deadline", &self.deadline) .field("range_complete", &self.range_complete) .field("binner", &self.binner) @@ -68,14 +73,14 @@ where impl TimeBinnedStream where - T: TimeBinnable, + T: TimeBinnableTy, { - pub fn new(inp: MergeInp, edges: Vec, do_time_weight: bool, deadline: Instant) -> Self { + pub fn new(inp: MergeInp, range: BinnedRangeEnum, do_time_weight: bool, deadline: Instant) -> Self { let deadline_fut = tokio::time::sleep_until(deadline.into()); let deadline_fut = Box::pin(deadline_fut); Self { inp, - edges, + range, do_time_weight, deadline, deadline_fut, @@ -91,7 +96,7 @@ where trace!("process_item {item:?}"); if self.binner.is_none() { trace!("process_item call time_binner_new"); - let binner = item.time_binner_new(todo!(), self.do_time_weight); + let binner = item.time_binner_new(self.range.clone(), self.do_time_weight); self.binner = Some(binner); } let binner = self.binner.as_mut().unwrap(); @@ -102,13 +107,13 @@ where impl Stream for TimeBinnedStream where - T: TimeBinnable + Unpin, + T: TimeBinnableTy + Unpin, { - type Item = Sitemty<<::TimeBinner as TimeBinner>::Output>; + type Item = Sitemty<<::TimeBinner as TimeBinnerTy>::Output>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - let span = tracing::span!(tracing::Level::TRACE, "poll"); + let span = span!(Level::INFO, "poll"); let _spg = span.enter(); loop { break if self.complete { diff --git a/streams/src/timebinnedjson.rs b/streams/src/timebinnedjson.rs index 92eb490..9a001e9 100644 --- a/streams/src/timebinnedjson.rs +++ b/streams/src/timebinnedjson.rs @@ -6,6 +6,7 @@ use futures_util::stream; use futures_util::StreamExt; use items_0::streamitem::sitem_data; use items_0::streamitem::Sitemty; +use items_0::timebin::TimeBinned; use items_2::channelevents::ChannelEvents; use items_2::merger::Merger; use netpod::log::*; @@ -20,29 +21,41 @@ use std::time::Instant; pub async fn timebinned_json(query: &BinnedQuery, chconf: &ChConf, cluster: &Cluster) -> Result { let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?; let bins_max = 10000; - //let do_time_weight = query.agg_kind().do_time_weighted(); + warn!("TODO add with_deadline to PlainEventsQuery"); let deadline = Instant::now() + query.timeout_value(); let empty = items_2::empty::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape)?; - error!("TODO feed through transform chain"); + warn!("TODO feed through transform chain"); let empty = ChannelEvents::Events(empty); let empty = sitem_data(empty); - error!("TODO add with_deadline to PlainEventsQuery"); - todo!(); - let evquery = PlainEventsQuery::new(query.channel().clone(), query.range().clone()); + + // TODO + let evquery = 
+    let evquery = PlainEventsQuery::new(query.channel().clone(), query.range().clone()).for_time_weighted_scalar();
     let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&evquery, cluster).await?;
     // TODO propagate also the max-buf-len for the first stage event reader:
     info!("timebinned_json with empty item {empty:?}");
     let stream = Merger::new(inps, 1024);
     let stream = stream::iter([empty]).chain(stream);
-    let stream = RangeFilter2::new(stream, todo!(), evquery.one_before_range());
+
+    // TODO
+    let do_time_weight = true;
+    let one_before_range = true;
+
+    // TODO RangeFilter2 must accept SeriesRange
+    let range = query.range().try_into()?;
+
+    let stream = RangeFilter2::new(stream, range, one_before_range);
     let stream = Box::pin(stream);
-    let do_time_weight = todo!();
-    let stream = TimeBinnedStream::new(stream, todo!(), do_time_weight, deadline);
+
+    // TimeBinnedStream now takes the BinnedRangeEnum directly.
+    let stream = TimeBinnedStream::new(stream, binned_range.clone(), do_time_weight, deadline);
     if false {
         let mut stream = stream;
-        let _: Option<Sitemty<Box<dyn items_0::TimeBinned>>> = stream.next().await;
+        let _: Option<Sitemty<Box<dyn TimeBinned>>> = stream.next().await;
         panic!()
     }
+
+    // TODO collect should not have to accept two ranges; generalize over it.
     let collected = crate::collect::collect(stream, deadline, bins_max, None, Some(binned_range.clone())).await?;
     let jsval = serde_json::to_value(&collected)?;
     Ok(jsval)
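For reference, the per-node start alignment used by GenerateI32 (see generators.rs above) can be checked by hand: with dts = MS * 1000 * node_count, node node_ix starts at (beg / dts) * dts + node_ix * MS * 1000 and then strides by dts. A tiny sketch with two hypothetical nodes and the 00:20:04 test range; the MS constant mirrors netpod::timeunits:

fn main() {
    const MS: u64 = 1_000_000; // nanoseconds per millisecond, as in netpod::timeunits
    let node_count = 2u64;
    let dts = MS * 1000 * node_count; // per-node stride: node_count seconds
    let beg = MS * 1000 * 1204; // 1970-01-01T00:20:04Z in ns since epoch
    for node_ix in 0..node_count {
        let ts0 = (beg / dts) * dts + node_ix * MS * 1000;
        // node 0 starts at 1204 s, node 1 at 1205 s; 2 s strides interleave
        // into one event per second overall.
        println!("node {node_ix} first ts {} s", ts0 / (MS * 1000));
    }
}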