diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index 8ffe5a8..25ec841 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqbuffer" -version = "0.5.3-aa.5" +version = "0.5.3-aa.6" authors = ["Dominik Werder "] edition = "2021" diff --git a/crates/disk/src/eventchunker.rs b/crates/disk/src/eventchunker.rs index 288e29f..b21dec3 100644 --- a/crates/disk/src/eventchunker.rs +++ b/crates/disk/src/eventchunker.rs @@ -34,6 +34,9 @@ use streams::dtflags::*; use streams::filechunkread::FileChunkRead; use streams::needminbuffer::NeedMinBuffer; +#[allow(unused)] +macro_rules! trace_parse_buf { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } + #[derive(Debug, ThisError, Serialize, Deserialize)] #[cstm(name = "DatabufferDataParse")] pub enum DataParseError { @@ -210,7 +213,7 @@ impl EventChunker { fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result<(ParseResult, Vec), DataParseError> { use byteorder::ReadBytesExt; use byteorder::BE; - trace!("parse_buf_inner buf len {}", buf.len()); + trace_parse_buf!("parse_buf_inner buf len {}", buf.len()); let mut ret = EventFull::empty(); let mut log_items = Vec::new(); let mut parsed_bytes = 0; diff --git a/crates/items_2/src/binning/aggregator.rs b/crates/items_2/src/binning/aggregator.rs index c6cca0c..10194e8 100644 --- a/crates/items_2/src/binning/aggregator.rs +++ b/crates/items_2/src/binning/aggregator.rs @@ -9,6 +9,9 @@ use serde::Serialize; #[allow(unused)] macro_rules! trace_event { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } +#[allow(unused)] +macro_rules! trace_result { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) } + pub trait AggTimeWeightOutputAvg: fmt::Debug + Clone + Send + Serialize + for<'a> Deserialize<'a> {} impl AggTimeWeightOutputAvg for u8 {} @@ -93,7 +96,7 @@ impl AggregatorTimeWeight for AggregatorNumeric { fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f32 { let sum = self.sum.clone() as f32; - trace!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction); + trace_result!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction); self.sum = 0.; sum / filled_width_fraction } @@ -108,7 +111,7 @@ macro_rules! impl_agg_tw_for_agg_num { fn ingest(&mut self, dt: DtNano, bl: DtNano, val: $evt) { let f = dt.ns() as f64 / bl.ns() as f64; - trace!("INGEST {} {}", f, val); + trace_event!("INGEST {} {}", f, val); self.sum += f * val as f64; } @@ -118,7 +121,7 @@ macro_rules! impl_agg_tw_for_agg_num { fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f64 { let sum = self.sum.clone(); - trace!( + trace_result!( "result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction @@ -145,7 +148,7 @@ impl AggregatorTimeWeight for AggregatorNumeric { fn ingest(&mut self, dt: DtNano, bl: DtNano, val: u64) { let f = dt.ns() as f64 / bl.ns() as f64; - trace!("INGEST {} {}", f, val); + trace_event!("INGEST {} {}", f, val); self.sum += f * val as f64; } @@ -155,7 +158,7 @@ impl AggregatorTimeWeight for AggregatorNumeric { fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f64 { let sum = self.sum.clone(); - trace!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction); + trace_result!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction); self.sum = 0.; sum / filled_width_fraction as f64 } @@ -168,7 +171,7 @@ impl AggregatorTimeWeight for AggregatorNumeric { fn ingest(&mut self, dt: DtNano, bl: DtNano, val: bool) { let f = dt.ns() as f64 / bl.ns() as f64; - trace!("INGEST {} {}", f, val); + trace_event!("INGEST {} {}", f, val); self.sum += f * val as u8 as f64; } @@ -178,7 +181,7 @@ impl AggregatorTimeWeight for AggregatorNumeric { fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f64 { let sum = self.sum.clone(); - trace!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction); + trace_result!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction); self.sum = 0.; sum / filled_width_fraction as f64 } @@ -191,7 +194,7 @@ impl AggregatorTimeWeight for AggregatorNumeric { fn ingest(&mut self, dt: DtNano, bl: DtNano, val: String) { let f = dt.ns() as f64 / bl.ns() as f64; - trace!("INGEST {} {}", f, val); + trace_event!("INGEST {} {}", f, val); self.sum += f * val.len() as f64; } @@ -201,7 +204,7 @@ impl AggregatorTimeWeight for AggregatorNumeric { fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f64 { let sum = self.sum.clone(); - trace!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction); + trace_result!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction); self.sum = 0.; sum / filled_width_fraction as f64 } diff --git a/crates/items_2/src/binning/timeweight/timeweight_events.rs b/crates/items_2/src/binning/timeweight/timeweight_events.rs index 4f2d757..9597ffa 100644 --- a/crates/items_2/src/binning/timeweight/timeweight_events.rs +++ b/crates/items_2/src/binning/timeweight/timeweight_events.rs @@ -42,7 +42,13 @@ macro_rules! trace_ingest_firsts { ($($arg:tt)*) => ( if true { trace_!($($arg)* macro_rules! trace_ingest_finish_bin { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } #[allow(unused)] -macro_rules! trace_ingest_container { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } +macro_rules! trace_ingest_container { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) } + +#[allow(unused)] +macro_rules! trace_ingest_container_2 { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) } + +#[allow(unused)] +macro_rules! trace_fill_until { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) } #[cold] #[inline] @@ -197,7 +203,7 @@ where } else { self.ingest_event_with_lst_gt_range_beg(ev.clone(), LstMut(lst.0), minmax)?; self.cnt += 1; - trace_ingest_firsts!("{selfname} now calling ingest_with_lst_gt_range_beg"); + trace_ingest_event!("{selfname} now calling ingest_with_lst_gt_range_beg"); return self.ingest_with_lst_gt_range_beg(evs, LstMut(lst.0), minmax); } } @@ -232,7 +238,7 @@ where assert!(b.filled_until < ts); assert!(ts <= b.active_end); let dt = ts.delta(b.filled_until); - trace_cycle!("fill_until ts {:?} dt {:?} lst {:?}", ts, dt, lst.0); + trace_fill_until!("fill_until ts {:?} dt {:?} lst {:?}", ts, dt, lst.0); assert!(b.filled_until < ts); assert!(ts <= b.active_end); b.agg.ingest(dt, b.active_len, lst.0.val.clone()); @@ -280,7 +286,7 @@ where fn ingest_with_lst(&mut self, mut evs: ContainerEventsTakeUpTo, lst: LstMut) -> Result<(), Error> { let selfname = "ingest_with_lst"; - trace_ingest_container!("{selfname}"); + trace_ingest_container!("{selfname} evs len {}", evs.len()); let b = &mut self.inner_b; if let Some(minmax) = self.minmax.as_mut() { b.ingest_with_lst_minmax(evs, lst, minmax) @@ -587,7 +593,6 @@ where return Err(Error::EventAfterRange); } if ts >= b.active_end { - trace_cycle!("bin edge boundary {:?}", b.active_end); assert!(b.filled_until < b.active_end, "{} < {}", b.filled_until, b.active_end); self.cycle_01(ts); } @@ -603,7 +608,7 @@ where } else { self.ingest_ordered(evs)? }; - trace_ingest_container!("ingest after still left len evs {}", evs_all.len()); + trace_ingest_container_2!("ingest after still left len evs {}", evs_all.len()); let n2 = evs_all.len(); if n2 != 0 { if n2 == n1 { diff --git a/crates/streams/src/events.rs b/crates/streams/src/events.rs new file mode 100644 index 0000000..7d0f1ac --- /dev/null +++ b/crates/streams/src/events.rs @@ -0,0 +1 @@ +pub mod convertforbinning; diff --git a/crates/streams/src/events/convertforbinning.rs b/crates/streams/src/events/convertforbinning.rs new file mode 100644 index 0000000..f4fae64 --- /dev/null +++ b/crates/streams/src/events/convertforbinning.rs @@ -0,0 +1,166 @@ +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::streamitem::RangeCompletableItem::*; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem::*; +use items_0::Empty; +use items_2::channelevents::ChannelEvents; +use items_2::eventsdim0::EventsDim0; +use netpod::EnumVariant; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +pub struct ConvertForBinning { + inp: Pin> + Send>>, +} + +impl ConvertForBinning { + pub fn new(inp: Pin> + Send>>) -> Self { + Self { inp } + } +} + +impl Stream for ConvertForBinning { + type Item = Sitemty; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + match self.inp.poll_next_unpin(cx) { + Ready(Some(item)) => match &item { + Ok(DataItem(Data(cevs))) => match cevs { + ChannelEvents::Events(evs) => { + if let Some(evs) = evs.as_any_ref().downcast_ref::>() { + let mut dst = EventsDim0::::empty(); + for ((&ts, &pulse), val) in evs + .tss() + .iter() + .zip(evs.pulses.iter()) + .zip(evs.private_values_ref().iter()) + { + dst.push_back(ts, pulse, val.ix()); + } + let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); + Ready(Some(item)) + } else if let Some(evs) = evs.as_any_ref().downcast_ref::>() { + let mut dst = EventsDim0::::empty(); + for ((&ts, &pulse), &val) in evs + .tss() + .iter() + .zip(evs.pulses.iter()) + .zip(evs.private_values_ref().iter()) + { + dst.push_back(ts, pulse, val as u8); + } + let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); + Ready(Some(item)) + } else if let Some(evs) = evs.as_any_ref().downcast_ref::>() { + let mut dst = EventsDim0::::empty(); + for ((&ts, &pulse), _) in evs + .tss() + .iter() + .zip(evs.pulses.iter()) + .zip(evs.private_values_ref().iter()) + { + dst.push_back(ts, pulse, 1); + } + let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); + Ready(Some(item)) + } else { + Ready(Some(item)) + } + } + ChannelEvents::Status(_) => Ready(Some(item)), + }, + _ => Ready(Some(item)), + }, + Ready(None) => Ready(None), + Pending => Pending, + } + } +} + +pub struct ConvertForTesting { + inp: Pin> + Send>>, +} + +impl ConvertForTesting { + pub fn new(inp: Pin> + Send>>) -> Self { + Self { inp } + } +} + +impl Stream for ConvertForTesting { + type Item = Sitemty; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + match self.inp.poll_next_unpin(cx) { + Ready(Some(item)) => match &item { + Ok(DataItem(Data(cevs))) => match cevs { + ChannelEvents::Events(evs) => { + if let Some(evs) = evs.as_any_ref().downcast_ref::>() { + let buf = std::fs::read("evmod").unwrap_or(Vec::new()); + let s = String::from_utf8_lossy(&buf); + if s.contains("u8") { + use items_0::Empty; + let mut dst = EventsDim0::::empty(); + for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) { + let v = (val * 1e6) as u8; + dst.push_back(*ts, 0, v); + } + let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); + Ready(Some(item)) + } else if s.contains("i16") { + use items_0::Empty; + let mut dst = EventsDim0::::empty(); + for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) { + let v = (val * 1e6) as i16 - 50; + dst.push_back(*ts, 0, v); + } + let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); + Ready(Some(item)) + } else if s.contains("bool") { + use items_0::Empty; + let mut dst = EventsDim0::::empty(); + for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) { + let g = u64::from_ne_bytes(val.to_ne_bytes()); + let val = g % 2 == 0; + dst.push_back(*ts, 0, val); + } + let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); + Ready(Some(item)) + } else if s.contains("enum") { + use items_0::Empty; + let mut dst = EventsDim0::::empty(); + for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) { + let buf = val.to_ne_bytes(); + let h = buf[0] ^ buf[1] ^ buf[2] ^ buf[3] ^ buf[4] ^ buf[5] ^ buf[6] ^ buf[7]; + dst.push_back(*ts, 0, EnumVariant::new(h as u16, h.to_string())); + } + let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); + Ready(Some(item)) + } else if s.contains("string") { + use items_0::Empty; + let mut dst = EventsDim0::::empty(); + for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) { + dst.push_back(*ts, 0, val.to_string()); + } + let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); + Ready(Some(item)) + } else { + Ready(Some(item)) + } + } else { + Ready(Some(item)) + } + } + ChannelEvents::Status(_) => Ready(Some(item)), + }, + _ => Ready(Some(item)), + }, + Ready(None) => Ready(None), + Pending => Pending, + } + } +} diff --git a/crates/streams/src/lib.rs b/crates/streams/src/lib.rs index 605ce96..e026435 100644 --- a/crates/streams/src/lib.rs +++ b/crates/streams/src/lib.rs @@ -2,6 +2,7 @@ pub mod boxed; pub mod cbor_stream; pub mod collect; pub mod dtflags; +pub mod events; pub mod eventsplainreader; pub mod filechunkread; pub mod firsterr; diff --git a/crates/streams/src/timebin.rs b/crates/streams/src/timebin.rs index 4884d55..29d7b5f 100644 --- a/crates/streams/src/timebin.rs +++ b/crates/streams/src/timebin.rs @@ -1,6 +1,8 @@ -mod basic; pub mod cached; pub mod fromevents; +pub mod timebin; + +mod basic; mod fromlayers; mod gapfill; mod grid; diff --git a/crates/streams/src/timebin/cached/reader.rs b/crates/streams/src/timebin/cached/reader.rs index c25cfc3..aea708c 100644 --- a/crates/streams/src/timebin/cached/reader.rs +++ b/crates/streams/src/timebin/cached/reader.rs @@ -7,11 +7,9 @@ use futures_util::StreamExt; use items_0::streamitem::Sitemty; use items_0::timebin::BinsBoxed; use items_2::channelevents::ChannelEvents; -use items_2::eventsdim0::EventsDim0; use netpod::log::*; use netpod::BinnedRange; use netpod::DtMs; -use netpod::EnumVariant; use netpod::TsNano; use query::api4::events::EventsSubQuery; use std::future::Future; @@ -53,103 +51,8 @@ impl Stream for EventsReading { use items_0::streamitem::StreamItem::*; match &item { Ok(DataItem(Data(cevs))) => match cevs { - ChannelEvents::Events(evs) => { - if let Some(evs) = evs.as_any_ref().downcast_ref::>() { - use items_0::Empty; - let mut dst = EventsDim0::::empty(); - for ((&ts, &pulse), val) in evs - .tss() - .iter() - .zip(evs.pulses.iter()) - .zip(evs.private_values_ref().iter()) - { - dst.push_back(ts, pulse, val.ix()); - } - let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); - Ready(Some(item)) - } else if let Some(evs) = evs.as_any_ref().downcast_ref::>() { - use items_0::Empty; - let mut dst = EventsDim0::::empty(); - for ((&ts, &pulse), &val) in evs - .tss() - .iter() - .zip(evs.pulses.iter()) - .zip(evs.private_values_ref().iter()) - { - dst.push_back(ts, pulse, val as u8); - } - let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); - Ready(Some(item)) - } else if let Some(evs) = evs.as_any_ref().downcast_ref::>() { - use items_0::Empty; - let mut dst = EventsDim0::::empty(); - for ((&ts, &pulse), _) in evs - .tss() - .iter() - .zip(evs.pulses.iter()) - .zip(evs.private_values_ref().iter()) - { - dst.push_back(ts, pulse, 1); - } - let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); - Ready(Some(item)) - } else if let Some(evs) = evs.as_any_ref().downcast_ref::>() { - let buf = std::fs::read("evmod").unwrap_or(Vec::new()); - let s = String::from_utf8_lossy(&buf); - if s.contains("u8") { - use items_0::Empty; - let mut dst = EventsDim0::::empty(); - for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) { - let v = (val * 1e6) as u8; - dst.push_back(*ts, 0, v); - } - let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); - Ready(Some(item)) - } else if s.contains("i16") { - use items_0::Empty; - let mut dst = EventsDim0::::empty(); - for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) { - let v = (val * 1e6) as i16 - 50; - dst.push_back(*ts, 0, v); - } - let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); - Ready(Some(item)) - } else if s.contains("bool") { - use items_0::Empty; - let mut dst = EventsDim0::::empty(); - for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) { - let g = u64::from_ne_bytes(val.to_ne_bytes()); - let val = g % 2 == 0; - dst.push_back(*ts, 0, val); - } - let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); - Ready(Some(item)) - } else if s.contains("enum") { - use items_0::Empty; - let mut dst = EventsDim0::::empty(); - for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) { - let buf = val.to_ne_bytes(); - let h = buf[0] ^ buf[1] ^ buf[2] ^ buf[3] ^ buf[4] ^ buf[5] ^ buf[6] ^ buf[7]; - dst.push_back(*ts, 0, EnumVariant::new(h as u16, h.to_string())); - } - let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); - Ready(Some(item)) - } else if s.contains("string") { - use items_0::Empty; - let mut dst = EventsDim0::::empty(); - for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) { - dst.push_back(*ts, 0, val.to_string()); - } - let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); - Ready(Some(item)) - } else { - Ready(Some(item)) - } - } else { - Ready(Some(item)) - } - } - ChannelEvents::Status(conn_status_event) => Ready(Some(item)), + ChannelEvents::Events(_) => Ready(Some(item)), + ChannelEvents::Status(_) => Ready(Some(item)), }, _ => Ready(Some(item)), } @@ -248,12 +151,6 @@ impl Stream for CachedReader { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - // TODO - // Must split over different msp (because pkey). - // If we choose the partitioning length low enough, no need to issue multiple queries. - // Change the worker interface: - // We should already compute here the msp and off because we must here implement the loop logic. - // Therefore worker interface should not accept BinnedRange, but msp and off range. loop { break if let Some(fut) = self.reading.as_mut() { match fut.poll_unpin(cx) { diff --git a/crates/streams/src/timebin/fromevents.rs b/crates/streams/src/timebin/fromevents.rs index a8a5961..3b805c7 100644 --- a/crates/streams/src/timebin/fromevents.rs +++ b/crates/streams/src/timebin/fromevents.rs @@ -1,4 +1,5 @@ use super::cached::reader::EventsReadProvider; +use crate::events::convertforbinning::ConvertForBinning; use err::thiserror; use err::ThisError; use futures_util::Stream; @@ -7,9 +8,9 @@ use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::timebin::BinsBoxed; +use items_2::binning::timeweight::timeweight_events_dyn::BinnedEventsTimeweightStream; use netpod::log::*; use netpod::BinnedRange; -use netpod::ChConf; use netpod::TsNano; use query::api4::events::EventsSubQuery; use std::pin::Pin; @@ -39,9 +40,10 @@ impl BinnedFromEvents { panic!(); } let stream = read_provider.read(evq); + let stream = ConvertForBinning::new(Box::pin(stream)); let stream = if do_time_weight { let stream = Box::pin(stream); - items_2::binning::timeweight::timeweight_events_dyn::BinnedEventsTimeweightStream::new(range, stream) + BinnedEventsTimeweightStream::new(range, stream) } else { panic!("non-weighted TODO") }; diff --git a/crates/streams/src/timebin/fromlayers.rs b/crates/streams/src/timebin/fromlayers.rs index 96c3d68..b9c1174 100644 --- a/crates/streams/src/timebin/fromlayers.rs +++ b/crates/streams/src/timebin/fromlayers.rs @@ -1,32 +1,21 @@ use super::cached::reader::CacheReadProvider; use super::cached::reader::EventsReadProvider; -use crate::tcprawclient::OpenBoxedBytesStreamsBox; use crate::timebin::fromevents::BinnedFromEvents; use crate::timebin::grid::find_next_finer_bin_len; use err::thiserror; use err::ThisError; use futures_util::Stream; use futures_util::StreamExt; -use futures_util::TryStreamExt; -use items_0::on_sitemty_data; -use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; -use items_0::streamitem::StreamItem; -use items_0::timebin::BinningggContainerBinsDyn; use items_0::timebin::BinsBoxed; -use items_0::timebin::TimeBinnableTy; use items_2::binning::timeweight::timeweight_bins_dyn::BinnedBinsTimeweightStream; -use items_2::binsdim0::BinsDim0; use netpod::log::*; use netpod::query::CacheUsage; use netpod::range::evrange::SeriesRange; use netpod::BinnedRange; -use netpod::BinnedRangeEnum; -use netpod::ChConf; use netpod::ChannelTypeConfigGen; use netpod::DtMs; use netpod::ReqCtx; -use netpod::SeriesKind; use netpod::TsNano; use query::api4::events::EventsSubQuery; use query::api4::events::EventsSubQuerySelect; @@ -42,7 +31,6 @@ use std::task::Poll; pub enum Error { GapFill(#[from] super::gapfill::Error), BinnedFromEvents(#[from] super::fromevents::Error), - SfDatabufferNotSupported, #[error("FinerGridMismatch({0}, {1})")] FinerGridMismatch(DtMs, DtMs), } @@ -50,12 +38,6 @@ pub enum Error { type BoxedInput = Pin> + Send>>; pub struct TimeBinnedFromLayers { - ch_conf: ChannelTypeConfigGen, - cache_usage: CacheUsage, - transform_query: TransformQuery, - sub: EventsSubQuerySettings, - log_level: String, - ctx: Arc, inp: BoxedInput, } @@ -101,15 +83,7 @@ impl TimeBinnedFromLayers { cache_read_provider, events_read_provider.clone(), )?; - let ret = Self { - ch_conf, - cache_usage, - transform_query, - sub, - log_level, - ctx, - inp: Box::pin(inp), - }; + let ret = Self { inp: Box::pin(inp) }; Ok(ret) } else { match find_next_finer_bin_len(bin_len, &bin_len_layers) { @@ -139,15 +113,7 @@ impl TimeBinnedFromLayers { events_read_provider.clone(), )?; let inp = BinnedBinsTimeweightStream::new(range, Box::pin(inp)); - let ret = Self { - ch_conf, - cache_usage, - transform_query, - sub, - log_level, - ctx, - inp: Box::pin(inp), - }; + let ret = Self { inp: Box::pin(inp) }; Ok(ret) } None => { @@ -162,15 +128,7 @@ impl TimeBinnedFromLayers { ); let evq = EventsSubQuery::from_parts(select, sub.clone(), ctx.reqid().into(), log_level.clone()); let inp = BinnedFromEvents::new(range, evq, do_time_weight, events_read_provider)?; - let ret = Self { - ch_conf, - cache_usage, - transform_query, - sub, - log_level, - ctx, - inp: Box::pin(inp), - }; + let ret = Self { inp: Box::pin(inp) }; debug!("{}::new setup from events", Self::type_name()); Ok(ret) } diff --git a/crates/streams/src/timebin/gapfill.rs b/crates/streams/src/timebin/gapfill.rs index 53334d5..3ed7399 100644 --- a/crates/streams/src/timebin/gapfill.rs +++ b/crates/streams/src/timebin/gapfill.rs @@ -10,6 +10,7 @@ use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::timebin::BinsBoxed; +use items_2::binning::timeweight::timeweight_bins_dyn::BinnedBinsTimeweightStream; use netpod::log::*; use netpod::query::CacheUsage; use netpod::range::evrange::NanoRange; @@ -50,7 +51,6 @@ pub enum Error { MissingBegFromFiner(TsNano, TsNano, DtMs), #[error("InputBeforeRange({0}, {1})")] InputBeforeRange(NanoRange, BinnedRange), - SfDatabufferNotSupported, EventsReader(#[from] super::fromevents::Error), } @@ -264,7 +264,7 @@ impl GapFill { let stream = Box::pin(inp_finer); let range = BinnedRange::from_nano_range(range_finer.full_range(), self.range.bin_len.to_dt_ms()); let stream = if self.do_time_weight { - ::items_2::binning::timeweight::timeweight_bins_dyn::BinnedBinsTimeweightStream::new(range, stream) + BinnedBinsTimeweightStream::new(range, stream) } else { panic!("TODO unweighted") }; @@ -293,6 +293,7 @@ impl GapFill { } fn cache_write(mut self: Pin<&mut Self>, bins: BinsBoxed) -> Result<(), Error> { + // TODO emit bins that are ready for cache write into some separate channel let series = ::err::todoval(); self.cache_writing = Some(self.cache_read_provider.write(series, bins)); Ok(()) diff --git a/crates/streams/src/timebin/timebin.rs b/crates/streams/src/timebin/timebin.rs new file mode 100644 index 0000000..e69de29