From f5909ea03cbc6b4fbb103d4c7393642a608a293d Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 6 Sep 2024 19:08:54 +0200 Subject: [PATCH] WIP --- crates/daqbuffer/Cargo.toml | 2 +- crates/httpret/src/api4/binned.rs | 1 - crates/items_0/src/collect_s.rs | 8 +- crates/items_0/src/scalar_ops.rs | 2 +- crates/items_2/src/binsdim0.rs | 121 ++++++++- crates/items_2/src/binsxbindim0.rs | 6 + crates/items_2/src/channelevents.rs | 8 + crates/items_2/src/eventsdim0.rs | 11 +- crates/items_2/src/eventsdim0enum.rs | 7 + crates/items_2/src/eventsdim1.rs | 11 +- crates/items_2/src/eventsxbindim0.rs | 6 + crates/nodenet/src/channelconfig.rs | 6 + crates/query/src/api4/binned.rs | 63 ++++- crates/query/src/api4/events.rs | 9 +- crates/scyllaconn/src/events.rs | 2 +- crates/streams/src/timebin.rs | 284 +------------------ crates/streams/src/timebin/basic.rs | 287 ++++++++++++++++++++ crates/streams/src/timebin/cached.rs | 4 + crates/streams/src/timebin/cached/reader.rs | 30 ++ crates/streams/src/timebin/fromlayers.rs | 101 +++++++ crates/streams/src/timebin/gapfill.rs | 49 ++++ crates/streams/src/timebin/grid.rs | 7 + crates/streams/src/timebinnedjson.rs | 94 ++++++- 23 files changed, 818 insertions(+), 301 deletions(-) create mode 100644 crates/streams/src/timebin/basic.rs create mode 100644 crates/streams/src/timebin/cached.rs create mode 100644 crates/streams/src/timebin/cached/reader.rs create mode 100644 crates/streams/src/timebin/fromlayers.rs create mode 100644 crates/streams/src/timebin/gapfill.rs create mode 100644 crates/streams/src/timebin/grid.rs diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index fb6d22c..9eabc57 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqbuffer" -version = "0.5.3-aa.1" +version = "0.5.3-aa.2" authors = ["Dominik Werder "] edition = "2021" diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index afbaab0..9600330 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -149,6 +149,5 @@ async fn binned_json( .await .map_err(|e| Error::BinnedStream(e))?; let ret = response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?; - // let ret = error_response(e.public_message(), ctx.reqid()); Ok(ret) } diff --git a/crates/items_0/src/collect_s.rs b/crates/items_0/src/collect_s.rs index d5a8380..72ab41e 100644 --- a/crates/items_0/src/collect_s.rs +++ b/crates/items_0/src/collect_s.rs @@ -50,7 +50,7 @@ impl ToJsonBytes for serde_json::Value { } } -pub trait Collected: fmt::Debug + Send + AsAnyRef + WithLen + ToJsonResult {} +pub trait Collected: fmt::Debug + TypeName + Send + AsAnyRef + WithLen + ToJsonResult {} erased_serde::serialize_trait_object!(Collected); @@ -66,6 +66,12 @@ impl WithLen for Box { } } +impl TypeName for Box { + fn type_name(&self) -> String { + self.as_ref().type_name() + } +} + impl Collected for Box {} // TODO rename to `Typed` diff --git a/crates/items_0/src/scalar_ops.rs b/crates/items_0/src/scalar_ops.rs index ac7bbef..95a92af 100644 --- a/crates/items_0/src/scalar_ops.rs +++ b/crates/items_0/src/scalar_ops.rs @@ -258,7 +258,7 @@ impl ByteEstimate for EnumVariant { impl AsPrimF32 for EnumVariant { fn as_prim_f32_b(&self) -> f32 { - 0. + self.ix() as f32 } } diff --git a/crates/items_2/src/binsdim0.rs b/crates/items_2/src/binsdim0.rs index 058cba2..b6d6227 100644 --- a/crates/items_2/src/binsdim0.rs +++ b/crates/items_2/src/binsdim0.rs @@ -19,8 +19,10 @@ use items_0::overlap::HasTimestampDeque; use items_0::scalar_ops::AsPrimF32; use items_0::scalar_ops::ScalarOps; use items_0::timebin::TimeBinnable; +use items_0::timebin::TimeBinnableTy; use items_0::timebin::TimeBinned; use items_0::timebin::TimeBinner; +use items_0::timebin::TimeBinnerTy; use items_0::timebin::TimeBins; use items_0::AppendAllFrom; use items_0::AppendEmptyBin; @@ -298,6 +300,64 @@ impl TimeBinnableType for BinsDim0 { } } +#[derive(Debug)] +pub struct BinsDim0TimeBinnerTy { + _t1: std::marker::PhantomData, +} + +impl TimeBinnerTy for BinsDim0TimeBinnerTy +where + STY: ScalarOps, +{ + type Input = BinsDim0; + type Output = BinsDim0; + + fn ingest(&mut self, item: &mut Self::Input) { + todo!() + } + + fn set_range_complete(&mut self) { + todo!() + } + + fn bins_ready_count(&self) -> usize { + todo!() + } + + fn bins_ready(&mut self) -> Option { + todo!() + } + + fn push_in_progress(&mut self, push_empty: bool) { + todo!() + } + + fn cycle(&mut self) { + todo!() + } + + fn empty(&self) -> Option { + todo!() + } + + fn append_empty_until_end(&mut self) { + todo!() + } +} + +impl TimeBinnableTy for BinsDim0 { + type TimeBinner = BinsDim0TimeBinnerTy; + + fn time_binner_new( + &self, + binrange: BinnedRangeEnum, + do_time_weight: bool, + emit_empty_bins: bool, + ) -> Self::TimeBinner { + todo!() + } +} + // TODO rename to BinsDim0CollectorOutput #[derive(Debug, Serialize, Deserialize)] pub struct BinsDim0CollectedResult { @@ -331,9 +391,60 @@ pub struct BinsDim0CollectedResult { finished_at: Option, } +// TODO temporary fix for the enum output +impl BinsDim0CollectedResult +where + STY: ScalarOps, +{ + pub fn boxed_collected_with_enum_fix(&self) -> Box { + if let Some(bins) = self + .as_any_ref() + .downcast_ref::>() + { + let mins = self.mins.iter().map(|x| 0).collect(); + let maxs = self.mins.iter().map(|x| 0).collect(); + let bins = BinsDim0CollectedResult:: { + ts_anchor_sec: self.ts_anchor_sec.clone(), + ts1_off_ms: self.ts1_off_ms.clone(), + ts2_off_ms: self.ts2_off_ms.clone(), + ts1_off_ns: self.ts1_off_ns.clone(), + ts2_off_ns: self.ts2_off_ns.clone(), + counts: self.counts.clone(), + mins, + maxs, + avgs: self.avgs.clone(), + range_final: self.range_final.clone(), + timed_out: self.timed_out.clone(), + missing_bins: self.missing_bins.clone(), + continue_at: self.continue_at.clone(), + finished_at: self.finished_at.clone(), + }; + Box::new(bins) + } else { + let bins = Self { + ts_anchor_sec: self.ts_anchor_sec.clone(), + ts1_off_ms: self.ts1_off_ms.clone(), + ts2_off_ms: self.ts2_off_ms.clone(), + ts1_off_ns: self.ts1_off_ns.clone(), + ts2_off_ns: self.ts2_off_ns.clone(), + counts: self.counts.clone(), + mins: self.mins.clone(), + maxs: self.maxs.clone(), + avgs: self.avgs.clone(), + range_final: self.range_final.clone(), + timed_out: self.timed_out.clone(), + missing_bins: self.missing_bins.clone(), + continue_at: self.continue_at.clone(), + finished_at: self.finished_at.clone(), + }; + Box::new(bins) + } + } +} + impl AsAnyRef for BinsDim0CollectedResult where - NTY: ScalarOps, + NTY: 'static, { fn as_any_ref(&self) -> &dyn Any { self @@ -342,13 +453,19 @@ where impl AsAnyMut for BinsDim0CollectedResult where - NTY: ScalarOps, + NTY: 'static, { fn as_any_mut(&mut self) -> &mut dyn Any { self } } +impl TypeName for BinsDim0CollectedResult { + fn type_name(&self) -> String { + any::type_name::().into() + } +} + impl WithLen for BinsDim0CollectedResult { fn len(&self) -> usize { self.mins.len() diff --git a/crates/items_2/src/binsxbindim0.rs b/crates/items_2/src/binsxbindim0.rs index 970c910..b36e28b 100644 --- a/crates/items_2/src/binsxbindim0.rs +++ b/crates/items_2/src/binsxbindim0.rs @@ -344,6 +344,12 @@ where } } +impl TypeName for BinsXbinDim0CollectedResult { + fn type_name(&self) -> String { + any::type_name::().into() + } +} + impl WithLen for BinsXbinDim0CollectedResult { fn len(&self) -> usize { self.mins.len() diff --git a/crates/items_2/src/channelevents.rs b/crates/items_2/src/channelevents.rs index dddeab7..d1a0746 100644 --- a/crates/items_2/src/channelevents.rs +++ b/crates/items_2/src/channelevents.rs @@ -1193,6 +1193,7 @@ impl TimeBinnableTy for ChannelEvents { } } +// TODO remove type #[derive(Debug, Serialize, Deserialize)] pub struct ChannelEventsCollectorOutput {} @@ -1208,6 +1209,13 @@ impl AsAnyMut for ChannelEventsCollectorOutput { } } +impl TypeName for ChannelEventsCollectorOutput { + fn type_name(&self) -> String { + // TODO should not be here + any::type_name::().into() + } +} + impl WithLen for ChannelEventsCollectorOutput { fn len(&self) -> usize { todo!() diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index 8611c0b..09665f3 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -267,6 +267,7 @@ where type Aggregator = EventsDim0Aggregator; fn aggregator(range: SeriesRange, x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { + panic!("TODO remove, should no longer be used"); let self_name = any::type_name::(); debug!( "TimeBinnableType for {self_name} aggregator() range {:?} x_bin_count {} do_time_weight {}", @@ -404,7 +405,7 @@ impl EventsDim0CollectorOutput { impl AsAnyRef for EventsDim0CollectorOutput where - STY: ScalarOps, + STY: 'static, { fn as_any_ref(&self) -> &dyn Any { self @@ -413,13 +414,19 @@ where impl AsAnyMut for EventsDim0CollectorOutput where - STY: ScalarOps, + STY: 'static, { fn as_any_mut(&mut self) -> &mut dyn Any { self } } +impl TypeName for EventsDim0CollectorOutput { + fn type_name(&self) -> String { + any::type_name::().into() + } +} + impl WithLen for EventsDim0CollectorOutput { fn len(&self) -> usize { self.values.len() diff --git a/crates/items_2/src/eventsdim0enum.rs b/crates/items_2/src/eventsdim0enum.rs index 1442afa..2e90deb 100644 --- a/crates/items_2/src/eventsdim0enum.rs +++ b/crates/items_2/src/eventsdim0enum.rs @@ -25,6 +25,7 @@ use netpod::timeunits::SEC; use netpod::BinnedRangeEnum; use serde::Deserialize; use serde::Serialize; +use std::any; use std::any::Any; use std::collections::VecDeque; use std::mem; @@ -114,6 +115,12 @@ impl AsAnyMut for EventsDim0EnumCollectorOutput { } } +impl TypeName for EventsDim0EnumCollectorOutput { + fn type_name(&self) -> String { + any::type_name::().into() + } +} + impl ToJsonResult for EventsDim0EnumCollectorOutput { fn to_json_result(&self) -> Result, Error> { todo!() diff --git a/crates/items_2/src/eventsdim1.rs b/crates/items_2/src/eventsdim1.rs index 86cf0cc..c99cec8 100644 --- a/crates/items_2/src/eventsdim1.rs +++ b/crates/items_2/src/eventsdim1.rs @@ -26,6 +26,7 @@ use items_0::Empty; use items_0::Events; use items_0::EventsNonObj; use items_0::MergeError; +use items_0::TypeName; use items_0::WithLen; use netpod::is_false; use netpod::log::*; @@ -332,7 +333,7 @@ impl EventsDim1CollectorOutput { impl AsAnyRef for EventsDim1CollectorOutput where - STY: ScalarOps, + STY: 'static, { fn as_any_ref(&self) -> &dyn Any { self @@ -341,13 +342,19 @@ where impl AsAnyMut for EventsDim1CollectorOutput where - STY: ScalarOps, + STY: 'static, { fn as_any_mut(&mut self) -> &mut dyn Any { self } } +impl TypeName for EventsDim1CollectorOutput { + fn type_name(&self) -> String { + any::type_name::().into() + } +} + impl WithLen for EventsDim1CollectorOutput { fn len(&self) -> usize { self.values.len() diff --git a/crates/items_2/src/eventsxbindim0.rs b/crates/items_2/src/eventsxbindim0.rs index 3746ade..dcbe670 100644 --- a/crates/items_2/src/eventsxbindim0.rs +++ b/crates/items_2/src/eventsxbindim0.rs @@ -925,6 +925,12 @@ where } } +impl TypeName for EventsXbinDim0CollectorOutput { + fn type_name(&self) -> String { + any::type_name::().into() + } +} + impl WithLen for EventsXbinDim0CollectorOutput { fn len(&self) -> usize { self.mins.len() diff --git a/crates/nodenet/src/channelconfig.rs b/crates/nodenet/src/channelconfig.rs index 862fcbf..e4a6d68 100644 --- a/crates/nodenet/src/channelconfig.rs +++ b/crates/nodenet/src/channelconfig.rs @@ -60,6 +60,12 @@ impl From for Error { use dbconn::channelconfig::Error::*; match value { NotFound(chn, _) => Self::NotFoundChannel(chn), + SeriesNotFound(backend, series) => Self::NotFoundChannel(SfDbChannel::from_full( + backend, + Some(series), + "", + netpod::SeriesKind::ChannelData, + )), _ => Self::ChannelConfig(value), } } diff --git a/crates/query/src/api4/binned.rs b/crates/query/src/api4/binned.rs index e5fce77..061a940 100644 --- a/crates/query/src/api4/binned.rs +++ b/crates/query/src/api4/binned.rs @@ -17,7 +17,43 @@ use std::collections::BTreeMap; use std::time::Duration; use url::Url; -#[derive(Clone, Debug, Serialize, Deserialize)] +mod serde_option_vec_duration { + use serde::Deserialize; + use serde::Deserializer; + use serde::Serialize; + use serde::Serializer; + use std::time::Duration; + + #[derive(Debug, Clone, Serialize, Deserialize)] + struct HumantimeDuration { + #[serde(with = "humantime_serde")] + inner: Duration, + } + + pub fn serialize(val: &Option>, ser: S) -> Result + where + S: Serializer, + { + match val { + Some(vec) => { + // humantime_serde::serialize(&t, ser) + let t: Vec<_> = vec.iter().map(|&x| HumantimeDuration { inner: x }).collect(); + serde::Serialize::serialize(&t, ser) + } + None => ser.serialize_none(), + } + } + + pub fn deserialize<'a, D>(de: D) -> Result>, D::Error> + where + D: Deserializer<'a>, + { + let t: Option> = serde::Deserialize::deserialize(de)?; + Ok(t.map(|v| v.iter().map(|x| x.inner).collect())) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct BinnedQuery { channel: SfDbChannel, range: SeriesRange, @@ -33,6 +69,8 @@ pub struct BinnedQuery { cache_usage: Option, #[serde(default, skip_serializing_if = "Option::is_none")] bins_max: Option, + #[serde(default, skip_serializing_if = "Option::is_none", with = "serde_option_vec_duration")] + subgrids: Option>, #[serde( default, skip_serializing_if = "Option::is_none", @@ -64,6 +102,7 @@ impl BinnedQuery { transform: TransformQuery::default_time_binned(), cache_usage: None, bins_max: None, + subgrids: None, buf_len_disk_io: None, disk_stats_every: None, timeout_content: None, @@ -91,7 +130,7 @@ impl BinnedQuery { } pub fn cache_usage(&self) -> CacheUsage { - self.cache_usage.as_ref().map_or(CacheUsage::Use, |x| x.clone()) + self.cache_usage.as_ref().map_or(CacheUsage::Ignore, |x| x.clone()) } pub fn disk_stats_every(&self) -> ByteSize { @@ -116,6 +155,10 @@ impl BinnedQuery { self.bins_max.unwrap_or(200000) } + pub fn subgrids(&self) -> Option<&[Duration]> { + self.subgrids.as_ref().map(|x| x.as_slice()) + } + pub fn merger_out_len_max(&self) -> usize { self.merger_out_len_max.unwrap_or(1024) } @@ -210,6 +253,9 @@ impl FromUrl for BinnedQuery { .get("contentTimeout") .and_then(|x| humantime::parse_duration(x).ok()), bins_max: pairs.get("binsMax").map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + subgrids: pairs + .get("subgrids") + .map(|x| x.split(",").filter_map(|x| humantime::parse_duration(x).ok()).collect()), merger_out_len_max: pairs .get("mergerOutLenMax") .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, @@ -258,6 +304,19 @@ impl AppendToUrl for BinnedQuery { if let Some(x) = self.bins_max { g.append_pair("binsMax", &format!("{}", x)); } + if let Some(x) = &self.subgrids { + let s: String = + x.iter() + .map(|&x| humantime::format_duration(x).to_string()) + .fold(String::new(), |mut a, x| { + if a.len() != 0 { + a.push_str(","); + } + a.push_str(&x); + a + }); + g.append_pair("subgrids", &s); + } if let Some(x) = self.buf_len_disk_io { g.append_pair("bufLenDiskIo", &format!("{}", x)); } diff --git a/crates/query/src/api4/events.rs b/crates/query/src/api4/events.rs index efeb5f5..ed867a6 100644 --- a/crates/query/src/api4/events.rs +++ b/crates/query/src/api4/events.rs @@ -28,6 +28,8 @@ pub struct PlainEventsQuery { range: SeriesRange, #[serde(default, skip_serializing_if = "is_false", rename = "oneBeforeRange")] one_before_range: bool, + #[serde(default, skip_serializing_if = "is_false", rename = "begExcl")] + beg_excl: bool, #[serde(default = "TransformQuery::default_events")] #[serde(skip_serializing_if = "TransformQuery::is_default_events")] transform: TransformQuery, @@ -75,6 +77,7 @@ impl PlainEventsQuery { Self { channel, range: range.into(), + beg_excl: false, one_before_range: false, transform: TransformQuery::default_events(), timeout_content: None, @@ -252,7 +255,8 @@ impl FromUrl for PlainEventsQuery { let ret = Self { channel: SfDbChannel::from_pairs(pairs)?, range, - one_before_range: pairs.get("oneBeforeRange").map_or("false", |x| x.as_ref()) == "true", + one_before_range: pairs.get("oneBeforeRange").map_or(false, |x| x == "true"), + beg_excl: pairs.get("begExcl").map_or(false, |x| x == "true"), transform: TransformQuery::from_pairs(pairs)?, timeout_content: pairs .get("contentTimeout") @@ -313,6 +317,9 @@ impl AppendToUrl for PlainEventsQuery { self.channel.append_to_url(url); let mut g = url.query_pairs_mut(); g.append_pair("oneBeforeRange", &self.one_before_range().to_string()); + if self.beg_excl { + g.append_pair("begExcl", "true"); + } g.append_pair("querymarker", &self.querymarker); drop(g); self.transform.append_to_url(url); diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs index 098cafb..e2190be 100644 --- a/crates/scyllaconn/src/events.rs +++ b/crates/scyllaconn/src/events.rs @@ -555,7 +555,7 @@ fn convert_rows_enum( let val = row.1 as u16; let valstr = row.2; let value = EnumVariant::new(val, valstr); - info!("read enum variant {:?} {:?}", value, value.name_string()); + // trace_fetch!("read enum variant {:?} {:?}", value, value.name_string()); (ts, value) } } else { diff --git a/crates/streams/src/timebin.rs b/crates/streams/src/timebin.rs index ee0cb76..f5c6fca 100644 --- a/crates/streams/src/timebin.rs +++ b/crates/streams/src/timebin.rs @@ -1,278 +1,8 @@ -use err::Error; -use futures_util::Stream; -use futures_util::StreamExt; -use items_0::streamitem::sitem_data; -use items_0::streamitem::RangeCompletableItem; -use items_0::streamitem::Sitemty; -use items_0::streamitem::StreamItem; -use items_0::timebin::TimeBinnableTy; -use items_0::timebin::TimeBinnerTy; -use netpod::log::*; -use netpod::BinnedRangeEnum; -use std::any; -use std::fmt; -use std::ops::ControlFlow; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; +mod basic; +mod cached; +mod fromlayers; +mod gapfill; +mod grid; -#[allow(unused)] -macro_rules! trace2 { - ($($arg:tt)*) => {}; - ($($arg:tt)*) => { trace!($($arg)*) }; -} - -#[allow(unused)] -macro_rules! trace3 { - ($($arg:tt)*) => {}; - ($($arg:tt)*) => { trace!($($arg)*) }; -} - -#[allow(unused)] -macro_rules! trace4 { - ($($arg:tt)*) => {}; - ($($arg:tt)*) => { trace!($($arg)*) }; -} - -type MergeInp = Pin> + Send>>; - -pub struct TimeBinnedStream -where - T: TimeBinnableTy, -{ - inp: MergeInp, - range: BinnedRangeEnum, - do_time_weight: bool, - range_final: bool, - binner: Option<::TimeBinner>, - done_first_input: bool, - done_data: bool, - done: bool, - complete: bool, -} - -impl fmt::Debug for TimeBinnedStream -where - T: TimeBinnableTy, -{ - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct(any::type_name::()) - .field("range", &self.range) - .field("range_final", &self.range_final) - .field("binner", &self.binner) - .finish() - } -} - -impl TimeBinnedStream -where - T: TimeBinnableTy, -{ - pub fn new(inp: MergeInp, range: BinnedRangeEnum, do_time_weight: bool) -> Self { - Self { - inp, - range, - do_time_weight, - range_final: false, - binner: None, - done_first_input: false, - done_data: false, - done: false, - complete: false, - } - } - - fn process_item(&mut self, mut item: T) -> () { - let emit_empty_bins = true; - trace2!("process_item {item:?}"); - if self.binner.is_none() { - trace!("process_item call time_binner_new"); - let binner = item.time_binner_new(self.range.clone(), self.do_time_weight, emit_empty_bins); - self.binner = Some(binner); - } - let binner = self.binner.as_mut().unwrap(); - trace2!("process_item call binner ingest"); - binner.ingest(&mut item); - } - - fn handle_data_item( - &mut self, - item: T, - ) -> Result::TimeBinner as TimeBinnerTy>::Output>>>, Error> { - use ControlFlow::*; - use Poll::*; - trace2!("================= handle_data_item"); - let item_len = item.len(); - self.process_item(item); - let mut do_emit = false; - if self.done_first_input == false { - debug!( - "emit container after the first input len {} binner {}", - item_len, - self.binner.is_some() - ); - if self.binner.is_none() { - let e = Error::with_msg_no_trace("must emit on first input but no binner"); - self.done = true; - return Err(e); - } - do_emit = true; - self.done_first_input = true; - } - if let Some(binner) = self.binner.as_mut() { - trace3!("bins ready count {}", binner.bins_ready_count()); - if binner.bins_ready_count() > 0 { - do_emit = true - } - if do_emit { - if let Some(bins) = binner.bins_ready() { - Ok(Break(Ready(sitem_data(bins)))) - } else { - if let Some(bins) = binner.empty() { - Ok(Break(Ready(sitem_data(bins)))) - } else { - let e = Error::with_msg_no_trace("must emit but can not even create empty A"); - error!("{e}"); - Err(e) - } - } - } else { - trace3!("not emit"); - Ok(ControlFlow::Continue(())) - } - } else { - warn!("processed item, but no binner yet"); - Ok(ControlFlow::Continue(())) - } - } - - fn handle_item( - &mut self, - item: Result>, Error>, - ) -> Result::TimeBinner as TimeBinnerTy>::Output>>>, Error> { - use ControlFlow::*; - use Poll::*; - trace2!("================= handle_item"); - match item { - Ok(item) => match item { - StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => { - debug!("see RangeComplete"); - self.range_final = true; - Ok(Continue(())) - } - RangeCompletableItem::Data(item) => self.handle_data_item(item), - }, - StreamItem::Log(item) => Ok(Break(Ready(Ok(StreamItem::Log(item))))), - StreamItem::Stats(item) => Ok(Break(Ready(Ok(StreamItem::Stats(item))))), - }, - Err(e) => { - error!("received error item: {e}"); - self.done = true; - Err(e) - } - } - } - - fn handle_none( - &mut self, - ) -> Result::TimeBinner as TimeBinnerTy>::Output>>>, Error> { - use ControlFlow::*; - use Poll::*; - trace2!("================= handle_none"); - let self_range_final = self.range_final; - if let Some(binner) = self.binner.as_mut() { - trace!("bins ready count before finish {}", binner.bins_ready_count()); - // TODO rework the finish logic - if self_range_final { - binner.set_range_complete(); - } - binner.push_in_progress(false); - trace!("bins ready count after finish {}", binner.bins_ready_count()); - if let Some(bins) = binner.bins_ready() { - self.done_data = true; - Ok(Break(Ready(sitem_data(bins)))) - } else { - if let Some(bins) = binner.empty() { - self.done_data = true; - Ok(Break(Ready(sitem_data(bins)))) - } else { - let e = Error::with_msg_no_trace("must emit but can not even create empty B"); - error!("{e}"); - self.done_data = true; - Err(e) - } - } - } else { - warn!("input stream finished, still no binner"); - self.done_data = true; - let e = Error::with_msg_no_trace(format!("input stream finished, still no binner")); - Err(e) - } - } - - // TODO - // Original block inside the poll loop was able to: - // continue - // break with Poll> - fn poll_input( - &mut self, - cx: &mut Context, - ) -> Result::TimeBinner as TimeBinnerTy>::Output>>>, Error> { - use ControlFlow::*; - use Poll::*; - trace2!("================= poll_input"); - match self.inp.poll_next_unpin(cx) { - Ready(Some(item)) => self.handle_item(item), - Ready(None) => self.handle_none(), - Pending => Ok(Break(Pending)), - } - } -} - -impl Stream for TimeBinnedStream -where - T: TimeBinnableTy + Unpin, -{ - type Item = Sitemty<<::TimeBinner as TimeBinnerTy>::Output>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - let span = span!(Level::INFO, "TimeBinner"); - let _spg = span.enter(); - trace2!("================= POLL"); - loop { - break if self.complete { - panic!("TimeBinnedStream poll on complete") - } else if self.done { - self.complete = true; - Ready(None) - } else if self.done_data { - self.done = true; - if self.range_final { - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) - } else { - continue; - } - } else { - match self.poll_input(cx) { - Ok(item) => match item { - ControlFlow::Continue(()) => continue, - ControlFlow::Break(item) => match item { - Ready(item) => break Ready(Some(item)), - Pending => break Pending, - }, - }, - Err(e) => { - self.done = true; - break Ready(Some(Err(e))); - } - } - }; - } - } -} - -//impl WithTransformProperties for TimeBinnedStream where T: TimeBinnableTy {} - -//impl TimeBinnableStreamTrait for TimeBinnedStream where T: TimeBinnableTy {} +pub(super) use basic::TimeBinnedStream; +pub(super) use fromlayers::TimeBinnedFromLayers; diff --git a/crates/streams/src/timebin/basic.rs b/crates/streams/src/timebin/basic.rs new file mode 100644 index 0000000..4fa1ec6 --- /dev/null +++ b/crates/streams/src/timebin/basic.rs @@ -0,0 +1,287 @@ +use err::Error; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::streamitem::sitem_data; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; +use items_0::timebin::TimeBinnableTy; +use items_0::timebin::TimeBinnerTy; +use netpod::log::*; +use netpod::BinnedRangeEnum; +use std::any; +use std::fmt; +use std::ops::ControlFlow; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +#[allow(unused)] +macro_rules! trace2 { + ($($arg:tt)*) => { + if false { + trace!($($arg)*); + } + }; +} + +#[allow(unused)] +macro_rules! trace3 { + ($($arg:tt)*) => { + if false { + trace!($($arg)*); + } + }; +} + +#[allow(unused)] +macro_rules! trace4 { + ($($arg:tt)*) => { + if false { + trace!($($arg)*); + } + }; +} + +type SitemtyStream = Pin> + Send>>; + +pub struct TimeBinnedStream +where + T: TimeBinnableTy, +{ + inp: SitemtyStream, + range: BinnedRangeEnum, + do_time_weight: bool, + range_final: bool, + binner: Option<::TimeBinner>, + done_first_input: bool, + done_data: bool, + done: bool, + complete: bool, +} + +impl fmt::Debug for TimeBinnedStream +where + T: TimeBinnableTy, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct(any::type_name::()) + .field("range", &self.range) + .field("range_final", &self.range_final) + .field("binner", &self.binner) + .finish() + } +} + +impl TimeBinnedStream +where + T: TimeBinnableTy, +{ + pub fn new(inp: SitemtyStream, range: BinnedRangeEnum, do_time_weight: bool) -> Self { + Self { + inp, + range, + do_time_weight, + range_final: false, + binner: None, + done_first_input: false, + done_data: false, + done: false, + complete: false, + } + } + + fn process_item(&mut self, mut item: T) -> () { + let emit_empty_bins = true; + trace2!("process_item {item:?}"); + if self.binner.is_none() { + trace!("process_item call time_binner_new"); + let binner = item.time_binner_new(self.range.clone(), self.do_time_weight, emit_empty_bins); + self.binner = Some(binner); + } + let binner = self.binner.as_mut().unwrap(); + trace2!("process_item call binner ingest"); + binner.ingest(&mut item); + } + + fn handle_data_item( + &mut self, + item: T, + ) -> Result::TimeBinner as TimeBinnerTy>::Output>>>, Error> { + use ControlFlow::*; + use Poll::*; + trace2!("================= handle_data_item"); + let item_len = item.len(); + self.process_item(item); + let mut do_emit = false; + if self.done_first_input == false { + debug!( + "emit container after the first input len {} binner {}", + item_len, + self.binner.is_some() + ); + if self.binner.is_none() { + let e = Error::with_msg_no_trace("must emit on first input but no binner"); + self.done = true; + return Err(e); + } + do_emit = true; + self.done_first_input = true; + } + if let Some(binner) = self.binner.as_mut() { + trace3!("bins ready count {}", binner.bins_ready_count()); + if binner.bins_ready_count() > 0 { + do_emit = true + } + if do_emit { + if let Some(bins) = binner.bins_ready() { + Ok(Break(Ready(sitem_data(bins)))) + } else { + if let Some(bins) = binner.empty() { + Ok(Break(Ready(sitem_data(bins)))) + } else { + let e = Error::with_msg_no_trace("must emit but can not even create empty A"); + error!("{e}"); + Err(e) + } + } + } else { + trace3!("not emit"); + Ok(ControlFlow::Continue(())) + } + } else { + warn!("processed item, but no binner yet"); + Ok(ControlFlow::Continue(())) + } + } + + fn handle_item( + &mut self, + item: Result>, Error>, + ) -> Result::TimeBinner as TimeBinnerTy>::Output>>>, Error> { + use ControlFlow::*; + use Poll::*; + trace2!("================= handle_item"); + match item { + Ok(item) => match item { + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + debug!("see RangeComplete"); + self.range_final = true; + Ok(Continue(())) + } + RangeCompletableItem::Data(item) => self.handle_data_item(item), + }, + StreamItem::Log(item) => Ok(Break(Ready(Ok(StreamItem::Log(item))))), + StreamItem::Stats(item) => Ok(Break(Ready(Ok(StreamItem::Stats(item))))), + }, + Err(e) => { + error!("received error item: {e}"); + self.done = true; + Err(e) + } + } + } + + fn handle_none( + &mut self, + ) -> Result::TimeBinner as TimeBinnerTy>::Output>>>, Error> { + use ControlFlow::*; + use Poll::*; + trace2!("================= handle_none"); + let self_range_final = self.range_final; + if let Some(binner) = self.binner.as_mut() { + trace!("bins ready count before finish {}", binner.bins_ready_count()); + // TODO rework the finish logic + if self_range_final { + binner.set_range_complete(); + } + binner.push_in_progress(false); + trace!("bins ready count after finish {}", binner.bins_ready_count()); + if let Some(bins) = binner.bins_ready() { + self.done_data = true; + Ok(Break(Ready(sitem_data(bins)))) + } else { + if let Some(bins) = binner.empty() { + self.done_data = true; + Ok(Break(Ready(sitem_data(bins)))) + } else { + let e = Error::with_msg_no_trace("must emit but can not even create empty B"); + error!("{e}"); + self.done_data = true; + Err(e) + } + } + } else { + warn!("input stream finished, still no binner"); + self.done_data = true; + let e = Error::with_msg_no_trace(format!("input stream finished, still no binner")); + Err(e) + } + } + + // TODO + // Original block inside the poll loop was able to: + // continue + // break with Poll> + fn poll_input( + &mut self, + cx: &mut Context, + ) -> Result::TimeBinner as TimeBinnerTy>::Output>>>, Error> { + use ControlFlow::*; + use Poll::*; + trace2!("================= poll_input"); + match self.inp.poll_next_unpin(cx) { + Ready(Some(item)) => self.handle_item(item), + Ready(None) => self.handle_none(), + Pending => Ok(Break(Pending)), + } + } +} + +impl Stream for TimeBinnedStream +where + T: TimeBinnableTy + Unpin, +{ + type Item = Sitemty<<::TimeBinner as TimeBinnerTy>::Output>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + let span = span!(Level::INFO, "TimeBinner"); + let _spg = span.enter(); + trace2!("================= POLL"); + loop { + break if self.complete { + panic!("TimeBinnedStream poll on complete") + } else if self.done { + self.complete = true; + Ready(None) + } else if self.done_data { + self.done = true; + if self.range_final { + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + } else { + continue; + } + } else { + match self.poll_input(cx) { + Ok(item) => match item { + ControlFlow::Continue(()) => continue, + ControlFlow::Break(item) => match item { + Ready(item) => break Ready(Some(item)), + Pending => break Pending, + }, + }, + Err(e) => { + self.done = true; + break Ready(Some(Err(e))); + } + } + }; + } + } +} + +//impl WithTransformProperties for TimeBinnedStream where T: TimeBinnableTy {} + +//impl TimeBinnableStreamTrait for TimeBinnedStream where T: TimeBinnableTy {} diff --git a/crates/streams/src/timebin/cached.rs b/crates/streams/src/timebin/cached.rs new file mode 100644 index 0000000..749e400 --- /dev/null +++ b/crates/streams/src/timebin/cached.rs @@ -0,0 +1,4 @@ +// mods for: +// time-binned at any resolution. + +pub mod reader; diff --git a/crates/streams/src/timebin/cached/reader.rs b/crates/streams/src/timebin/cached/reader.rs new file mode 100644 index 0000000..66b7a6e --- /dev/null +++ b/crates/streams/src/timebin/cached/reader.rs @@ -0,0 +1,30 @@ +use err::thiserror; +use err::ThisError; +use futures_util::Stream; +use items_2::binsdim0::BinsDim0; +use netpod::BinnedRange; +use netpod::DtMs; +use netpod::TsNano; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +#[derive(Debug, ThisError)] +#[cstm(name = "BinCachedReader")] +pub enum Error {} + +pub struct CachedReader {} + +impl CachedReader { + pub fn new(series: u64, bin_len: DtMs, range: BinnedRange) -> Self { + todo!() + } +} + +impl Stream for CachedReader { + type Item = Result, Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + todo!() + } +} diff --git a/crates/streams/src/timebin/fromlayers.rs b/crates/streams/src/timebin/fromlayers.rs new file mode 100644 index 0000000..2fd75cb --- /dev/null +++ b/crates/streams/src/timebin/fromlayers.rs @@ -0,0 +1,101 @@ +use crate::timebin::grid::find_next_finer_bin_len; +use err::thiserror; +use err::ThisError; +use futures_util::Stream; +use futures_util::StreamExt; +use futures_util::TryStreamExt; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; +use items_0::timebin::TimeBinnableTy; +use items_2::binsdim0::BinsDim0; +use netpod::log::*; +use netpod::BinnedRange; +use netpod::BinnedRangeEnum; +use netpod::DtMs; +use netpod::TsNano; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +#[derive(Debug, ThisError)] +#[cstm(name = "TimeBinnedFromLayers")] +pub enum Error { + Logic, + GapFill(#[from] super::gapfill::Error), +} + +type BoxedInput = Pin>> + Send>>; + +pub struct TimeBinnedFromLayers { + inp: BoxedInput, +} + +impl TimeBinnedFromLayers { + pub fn type_name() -> &'static str { + core::any::type_name::() + } + + pub fn new( + series: u64, + range: BinnedRange, + do_time_weight: bool, + bin_len_layers: Vec, + ) -> Result { + info!( + "{}::new {:?} {:?} {:?}", + Self::type_name(), + series, + range, + bin_len_layers + ); + // cases: + // if this bin_len is a cachable bin_len: + // - have to attempt to read from cache. + // expect to read bins in a stream (randomize to small max len for testing). + // also, if this bin_len is a cachable bin_len: + // must produce bins missing in cache from separate stream. + let bin_len = DtMs::from_ms_u64(range.bin_len.ms()); + if bin_len_layers.contains(&bin_len) { + let inp = super::gapfill::GapFill::new(series, bin_len, range)?; + let ret = Self { inp: Box::pin(inp) }; + Ok(ret) + } else { + match find_next_finer_bin_len(bin_len, &bin_len_layers) { + Some(finer) => { + // TODO + // produce from binned sub-stream with additional binner. + let inp = super::gapfill::GapFill::new(series, bin_len, range.clone())? + // .map(|item| { + // let ret = match item { + // Ok(k) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))), + // Err(e) => Err(::err::Error::from_string(e)), + // }; + // ret + // }) + ; + let inp = super::basic::TimeBinnedStream::new( + Box::pin(inp), + BinnedRangeEnum::Time(range), + do_time_weight, + ); + let ret = Self { inp: Box::pin(inp) }; + Ok(ret) + } + None => { + // TODO + // produce from events + todo!() + } + } + } + } +} + +impl Stream for TimeBinnedFromLayers { + type Item = Sitemty>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + todo!() + } +} diff --git a/crates/streams/src/timebin/gapfill.rs b/crates/streams/src/timebin/gapfill.rs new file mode 100644 index 0000000..27f62de --- /dev/null +++ b/crates/streams/src/timebin/gapfill.rs @@ -0,0 +1,49 @@ +use err::thiserror; +use err::ThisError; +use futures_util::Stream; +use items_0::streamitem::Sitemty; +use items_2::binsdim0::BinsDim0; +use netpod::BinnedRange; +use netpod::DtMs; +use netpod::TsNano; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +#[derive(Debug, ThisError)] +#[cstm(name = "BinCachedGapFill")] +pub enum Error {} + +// Try to read from cache for the given bin len. +// For gaps in the stream, construct an alternative input from finer bin len with a binner. +pub struct GapFill {} + +impl GapFill { + pub fn new(series: u64, bin_len: DtMs, range: BinnedRange) -> Result { + // TODO assert that the requested bin_len is a cacheable length. + todo!() + } +} + +impl Stream for GapFill { + type Item = Sitemty>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + // When do we detect a gap: + // - when the current item poses a gap to the last. + // - when we see EOS before the requested range is filled. + // Requirements: + // Must always request fully cache-aligned ranges. + // Must remember where the last bin ended. + + // When a gap is detected: + // - buffer the current item, if there is one (can also be EOS). + // - create a new producer of bin: + // - FromFiner(series, bin_len, range) + // what does FromFiner bring to the table? + // It does not attempt to read the given bin-len from a cache, because we just did attempt that. + // It still requires that bin-len is cacheable. (NO! it must work with the layering that I passed!) + // Then it finds the next cacheable + todo!() + } +} diff --git a/crates/streams/src/timebin/grid.rs b/crates/streams/src/timebin/grid.rs new file mode 100644 index 0000000..ad2cc13 --- /dev/null +++ b/crates/streams/src/timebin/grid.rs @@ -0,0 +1,7 @@ +use netpod::DtMs; + +// Find the next finer bin len from the passed list. +// The list is assumed to be sorted ascending, meaning finer bin len first. +pub fn find_next_finer_bin_len(bin_len: DtMs, layers: &[DtMs]) -> Option { + todo!("find_next_finer_bin_len") +} diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index 1ba9e20..87eb985 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -12,7 +12,9 @@ use futures_util::Stream; use futures_util::StreamExt; use items_0::collect_s::Collectable; use items_0::on_sitemty_data; +use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; use items_0::timebin::TimeBinned; use items_0::transform::TimeBinnableStreamBox; use items_0::transform::TimeBinnableStreamTrait; @@ -21,9 +23,11 @@ use items_2::channelevents::ChannelEvents; use items_2::merger::Merger; use items_2::streams::PlainEventStream; use netpod::log::*; +use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::BinnedRangeEnum; use netpod::ChannelTypeConfigGen; +use netpod::DtMs; use netpod::ReqCtx; use query::api4::binned::BinnedQuery; use serde_json::Value as JsonValue; @@ -224,18 +228,78 @@ async fn timebinned_stream( ctx: &ReqCtx, open_bytes: OpenBoxedBytesStreamsBox, ) -> Result>> + Send>>, Error> { - let range = binned_range.binned_range_time().to_nano_range(); + use netpod::query::CacheUsage; + match query.cache_usage() { + CacheUsage::Use | CacheUsage::Recreate => { + let series = if let Some(x) = query.channel().series() { + x + } else { + return Err(Error::with_msg_no_trace( + "cached time binned only available given a series id", + )); + }; + info!("--- CACHING PATH ---"); + info!("{query:?}"); + info!("subgrids {:?}", query.subgrids()); + let range = binned_range.binned_range_time().to_nano_range(); + let do_time_weight = true; + let bin_len_layers = if let Some(subgrids) = query.subgrids() { + subgrids + .iter() + .map(|&x| DtMs::from_ms_u64(1000 * x.as_secs())) + .collect() + } else { + vec![ + DtMs::from_ms_u64(1000 * 60), + // DtMs::from_ms_u64(1000 * 60 * 60), + // DtMs::from_ms_u64(1000 * 60 * 60 * 12), + // DtMs::from_ms_u64(1000 * 10), + ] + }; + let stream = crate::timebin::TimeBinnedFromLayers::new( + series, + binned_range.binned_range_time(), + do_time_weight, + bin_len_layers, + ) + .map_err(Error::from_string)?; + // Possible to simplify these kind of seemingly simple type conversions? + let stream = stream.map(|item| match item { + Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) => Ok(StreamItem::DataItem( + RangeCompletableItem::Data(Box::new(k) as Box), + )), + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => { + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) + } + Ok(StreamItem::Log(k)) => Ok(StreamItem::Log(k)), + Ok(StreamItem::Stats(k)) => Ok(StreamItem::Stats(k)), + Err(e) => Err(e), + }); + // let stream = stream.map(|item| match item { + // Ok(k) => { + // let k = Box::new(k) as Box; + // Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) + // } + // Err(e) => Err(::err::Error::from_string(e)), + // }); + let stream: Pin>> + Send>> = Box::pin(stream); + Ok(stream) + } + CacheUsage::Ignore => { + let range = binned_range.binned_range_time().to_nano_range(); - let do_time_weight = true; - let one_before_range = true; + let do_time_weight = true; + let one_before_range = true; - let stream = timebinnable_stream(query.clone(), range, one_before_range, ch_conf, ctx, open_bytes).await?; - let stream: Pin> = stream.0; - let stream = Box::pin(stream); - // TODO rename TimeBinnedStream to make it more clear that it is the component which initiates the time binning. - let stream = TimeBinnedStream::new(stream, binned_range, do_time_weight); - let stream: Pin>> + Send>> = Box::pin(stream); - Ok(stream) + let stream = timebinnable_stream(query.clone(), range, one_before_range, ch_conf, ctx, open_bytes).await?; + let stream: Pin> = stream.0; + let stream = Box::pin(stream); + // TODO rename TimeBinnedStream to make it more clear that it is the component which initiates the time binning. + let stream = TimeBinnedStream::new(stream, binned_range, do_time_weight); + let stream: Pin>> + Send>> = Box::pin(stream); + Ok(stream) + } + } } fn timebinned_to_collectable( @@ -268,6 +332,16 @@ pub async fn timebinned_json( let collected = Collect::new(stream, deadline, collect_max, bytes_max, None, Some(binned_range)); let collected: BoxFuture<_> = Box::pin(collected); let collected = collected.await?; + info!("timebinned_json collected type_name {:?}", collected.type_name()); + let collected = if let Some(bins) = collected + .as_any_ref() + .downcast_ref::>() + { + info!("MATCHED"); + bins.boxed_collected_with_enum_fix() + } else { + collected + }; let jsval = serde_json::to_value(&collected)?; Ok(jsval) }