From f550d3760279f9379345056fe5fc106e90e70fc1 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 11 Sep 2024 17:23:11 +0200 Subject: [PATCH] WIP --- crates/httpret/src/api4/binned.rs | 21 +- crates/items_0/src/items_0.rs | 6 + crates/items_0/src/streamitem.rs | 24 ++ crates/items_2/src/binsdim0.rs | 169 ++++++++++- crates/items_2/src/channelevents.rs | 8 + crates/items_2/src/eventsdim0.rs | 12 +- crates/items_2/src/eventsdim0enum.rs | 4 + crates/items_2/src/eventsdim1.rs | 4 + crates/items_2/src/eventsxbindim0.rs | 4 + crates/netpod/src/netpod.rs | 43 ++- crates/netpod/src/range/evrange.rs | 15 +- crates/nodenet/src/scylla.rs | 74 +++++ crates/scyllaconn/src/bincache.rs | 86 +++++- crates/scyllaconn/src/events2/mergert.rs | 21 +- crates/scyllaconn/src/events2/msp.rs | 1 - crates/scyllaconn/src/range.rs | 13 +- crates/scyllaconn/src/worker.rs | 31 ++ crates/streams/src/frames/inmem.rs | 10 +- crates/streams/src/timebin.rs | 1 + crates/streams/src/timebin/basic.rs | 34 +-- crates/streams/src/timebin/cached/reader.rs | 69 ++++- crates/streams/src/timebin/fromevents.rs | 80 +++++ crates/streams/src/timebin/fromlayers.rs | 71 ++++- crates/streams/src/timebin/gapfill.rs | 309 ++++++++++++++------ crates/streams/src/timebinnedjson.rs | 9 +- crates/taskrun/src/taskrun.rs | 9 + 26 files changed, 932 insertions(+), 196 deletions(-) create mode 100644 crates/streams/src/timebin/fromevents.rs diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index bfc5be5..f417360 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -22,10 +22,12 @@ use netpod::FromUrl; use netpod::NodeConfigCached; use netpod::ReqCtx; use nodenet::client::OpenBoxedBytesViaHttp; +use nodenet::scylla::ScyllaEventReadProvider; use query::api4::binned::BinnedQuery; use scyllaconn::bincache::ScyllaCacheReadProvider; use scyllaconn::worker::ScyllaQueue; use std::sync::Arc; +use streams::timebin::cached::reader::EventsReadProvider; use streams::timebin::CacheReadProvider; use tracing::Instrument; use url::Url; @@ -156,12 +158,23 @@ async fn binned_json( let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); let open_bytes = Arc::pin(open_bytes); let cache_read_provider = scyqueue + .clone() .map(|qu| ScyllaCacheReadProvider::new(qu)) .map(|x| Arc::new(x) as Arc); - let item = streams::timebinnedjson::timebinned_json(query, ch_conf, ctx, open_bytes, cache_read_provider) - .instrument(span1) - .await - .map_err(|e| Error::BinnedStream(e))?; + let events_read_provider = scyqueue + .map(|qu| ScyllaEventReadProvider::new(qu)) + .map(|x| Arc::new(x) as Arc); + let item = streams::timebinnedjson::timebinned_json( + query, + ch_conf, + ctx, + open_bytes, + cache_read_provider, + events_read_provider, + ) + .instrument(span1) + .await + .map_err(|e| Error::BinnedStream(e))?; let ret = response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?; Ok(ret) } diff --git a/crates/items_0/src/items_0.rs b/crates/items_0/src/items_0.rs index a3e2e3e..aeff1c4 100644 --- a/crates/items_0/src/items_0.rs +++ b/crates/items_0/src/items_0.rs @@ -156,6 +156,8 @@ pub trait Events: fn to_json_vec_u8(&self) -> Vec; fn to_cbor_vec_u8(&self) -> Vec; fn clear(&mut self); + // TODO: can not name EventsDim0 from here, so use trait object for now. Anyway is a workaround. + fn to_dim0_f32_for_binning(&self) -> Box; } impl WithLen for Box { @@ -290,4 +292,8 @@ impl Events for Box { fn clear(&mut self) { Events::clear(self.as_mut()) } + + fn to_dim0_f32_for_binning(&self) -> Box { + Events::to_dim0_f32_for_binning(self.as_ref()) + } } diff --git a/crates/items_0/src/streamitem.rs b/crates/items_0/src/streamitem.rs index 3b099d0..2dec58e 100644 --- a/crates/items_0/src/streamitem.rs +++ b/crates/items_0/src/streamitem.rs @@ -117,6 +117,30 @@ macro_rules! on_sitemty_data { }}; } +#[macro_export] +macro_rules! try_map_sitemty_data { + ($item:expr, $ex:expr) => {{ + use $crate::streamitem::RangeCompletableItem; + use $crate::streamitem::StreamItem; + match $item { + Ok(x) => match x { + StreamItem::DataItem(x) => match x { + RangeCompletableItem::Data(x) => match $ex(x) { + Ok(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))), + Err(e) => Err(e), + }, + RangeCompletableItem::RangeComplete => { + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) + } + }, + StreamItem::Log(x) => Ok(StreamItem::Log(x)), + StreamItem::Stats(x) => Ok(StreamItem::Stats(x)), + }, + Err(x) => Err(x), + } + }}; +} + pub fn sitem_data(x: X) -> Sitemty { Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) } diff --git a/crates/items_2/src/binsdim0.rs b/crates/items_2/src/binsdim0.rs index e09fe3e..20aede6 100644 --- a/crates/items_2/src/binsdim0.rs +++ b/crates/items_2/src/binsdim0.rs @@ -109,6 +109,40 @@ where } } +impl fmt::Display for BinsDim0 +where + NTY: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let self_name = any::type_name::(); + if true { + write!( + fmt, + "{self_name} count {:?} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}", + self.ts1s.len(), + self.ts1s.iter().map(|&k| TsNano::from_ns(k)).collect::>(), + self.ts2s.iter().map(|&k| TsNano::from_ns(k)).collect::>(), + self.counts, + self.mins, + self.maxs, + self.avgs, + ) + } else { + write!( + fmt, + "{self_name} count {:?} edges {:?} .. {:?} counts {:?} .. {:?} avgs {:?} .. {:?}", + self.ts1s.len(), + self.ts1s.front().map(|&k| TsNano::from_ns(k)), + self.ts2s.back().map(|&k| TsNano::from_ns(k)), + self.counts.front(), + self.counts.back(), + self.avgs.front(), + self.avgs.back(), + ) + } + } +} + impl BinsDim0 { pub fn push(&mut self, ts1: u64, ts2: u64, count: u64, min: NTY, max: NTY, avg: f32) { self.ts1s.push_back(ts1); @@ -317,13 +351,16 @@ where STY: ScalarOps, { ts1now: TsNano, + ts2now: TsNano, binrange: BinnedRange, do_time_weight: bool, emit_empty_bins: bool, range_complete: bool, - buf: ::Output, out: ::Output, - bins_ready_count: usize, + cnt: u64, + min: STY, + max: STY, + avg: f64, } impl BinsDim0TimeBinnerTy @@ -333,18 +370,31 @@ where pub fn new(binrange: BinnedRange, do_time_weight: bool, emit_empty_bins: bool) -> Self { // let ts1now = TsNano::from_ns(binrange.bin_off * binrange.bin_len.ns()); // let ts2 = ts1.add_dt_nano(binrange.bin_len.to_dt_nano()); + let ts1now = TsNano::from_ns(binrange.full_range().beg()); + let ts2now = ts1now.add_dt_nano(binrange.bin_len.to_dt_nano()); let buf = ::Output::empty(); Self { - ts1now: TsNano::from_ns(binrange.full_range().beg()), + ts1now, + ts2now, binrange, do_time_weight, emit_empty_bins, range_complete: false, - buf, out: ::Output::empty(), - bins_ready_count: 0, + cnt: 0, + min: STY::zero_b(), + max: STY::zero_b(), + avg: 0., } } + + // used internally for the aggregation + fn reset_agg(&mut self) { + self.cnt = 0; + self.min = STY::zero_b(); + self.max = STY::zero_b(); + self.avg = 0.; + } } impl TimeBinnerTy for BinsDim0TimeBinnerTy @@ -355,8 +405,77 @@ where type Output = BinsDim0; fn ingest(&mut self, item: &mut Self::Input) { - // item.ts1s; - todo!("TimeBinnerTy::ingest") + let mut count_before = 0; + for (((((&ts1, &ts2), &cnt), min), max), &avg) in item + .ts1s + .iter() + .zip(&item.ts2s) + .zip(&item.counts) + .zip(&item.mins) + .zip(&item.maxs) + .zip(&item.avgs) + { + if ts1 < self.ts1now.ns() { + // warn!("encountered bin from time before {} {}", ts1, self.ts1now.ns()); + count_before += 1; + continue; + } else { + if ts2 > self.ts2now.ns() { + if ts2 - ts1 > self.ts2now.ns() - self.ts1now.ns() { + panic!("incoming bin len too large"); + } else if ts1 < self.ts2now.ns() { + panic!("encountered unaligned input bin"); + } else { + let mut i = 0; + while ts1 >= self.ts2now.ns() { + self.cycle(); + i += 1; + if i > 50000 { + panic!("cycle forward too many iterations"); + } + } + } + } else { + // ok, we're still inside the current bin + } + } + if cnt == 0 { + // ignore input bin, it does not contain any valid information. + } else { + if self.cnt == 0 { + self.cnt = cnt; + self.min = min.clone(); + self.max = max.clone(); + if self.do_time_weight { + let f = (ts2 - ts1) as f64 / (self.ts2now.ns() - self.ts1now.ns()) as f64; + self.avg = avg as f64 * f; + } else { + panic!("TODO non-time-weighted binning to be impl"); + } + } else { + self.cnt += cnt; + if *min < self.min { + self.min = min.clone(); + } + if *max > self.max { + self.max = max.clone(); + } + if self.do_time_weight { + let f = (ts2 - ts1) as f64 / (self.ts2now.ns() - self.ts1now.ns()) as f64; + self.avg += avg as f64 * f; + } else { + panic!("TODO non-time-weighted binning to be impl"); + } + } + } + } + if count_before != 0 { + warn!( + "----- seen {} / {} input bins from time before", + count_before, + item.len() + ); + } } fn set_range_complete(&mut self) { @@ -364,27 +483,51 @@ where } fn bins_ready_count(&self) -> usize { - self.bins_ready_count + self.out.len() } fn bins_ready(&mut self) -> Option { - todo!("TimeBinnerTy::bins_ready") + if self.out.len() != 0 { + let ret = core::mem::replace(&mut self.out, BinsDim0::empty()); + Some(ret) + } else { + None + } } fn push_in_progress(&mut self, push_empty: bool) { - todo!("TimeBinnerTy::push_in_progress") + if self.cnt == 0 && !push_empty { + self.reset_agg(); + } else { + self.out.ts1s.push_back(self.ts1now.ns()); + self.out.ts2s.push_back(self.ts2now.ns()); + self.out.counts.push_back(self.cnt); + self.out.mins.push_back(self.min.clone()); + self.out.maxs.push_back(self.max.clone()); + self.out.avgs.push_back(self.avg as f32); + self.reset_agg(); + } } fn cycle(&mut self) { - todo!("TimeBinnerTy::cycle") + self.push_in_progress(true); + self.ts1now = self.ts1now.add_dt_nano(self.binrange.bin_len.to_dt_nano()); + self.ts2now = self.ts2now.add_dt_nano(self.binrange.bin_len.to_dt_nano()); } fn empty(&self) -> Option { - todo!("TimeBinnerTy::empty") + Some(::Output::empty()) } fn append_empty_until_end(&mut self) { - todo!("TimeBinnerTy::append_empty_until_end") + let mut i = 0; + while self.ts2now.ns() < self.binrange.full_range().end() { + self.cycle(); + i += 1; + if i > 100000 { + panic!("append_empty_until_end too many iterations"); + } + } } } diff --git a/crates/items_2/src/channelevents.rs b/crates/items_2/src/channelevents.rs index d1a0746..c563cf1 100644 --- a/crates/items_2/src/channelevents.rs +++ b/crates/items_2/src/channelevents.rs @@ -1012,6 +1012,14 @@ impl Events for ChannelEvents { } } } + + fn to_dim0_f32_for_binning(&self) -> Box { + use ChannelEvents::*; + match self { + Events(x) => x.to_dim0_f32_for_binning(), + Status(x) => panic!("ChannelEvents::to_dim0_f32_for_binning"), + } + } } impl Collectable for ChannelEvents { diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index 09665f3..8d333c9 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -189,8 +189,8 @@ where "{} {{ count {} ts {:?} .. {:?} vals {:?} .. {:?} }}", self.type_name(), self.tss.len(), - self.tss.front().map(|x| x / SEC), - self.tss.back().map(|x| x / SEC), + self.tss.front().map(|&x| TsNano::from_ns(x)), + self.tss.back().map(|&x| TsNano::from_ns(x)), self.values.front(), self.values.back(), ) @@ -1068,6 +1068,14 @@ impl Events for EventsDim0 { self.pulses.clear(); self.values.clear(); } + + fn to_dim0_f32_for_binning(&self) -> Box { + let mut ret = EventsDim0::empty(); + for (&ts, val) in self.tss.iter().zip(self.values.iter()) { + ret.push(ts, 0, val.as_prim_f32_b()); + } + Box::new(ret) + } } #[derive(Debug)] diff --git a/crates/items_2/src/eventsdim0enum.rs b/crates/items_2/src/eventsdim0enum.rs index 2e90deb..837abfa 100644 --- a/crates/items_2/src/eventsdim0enum.rs +++ b/crates/items_2/src/eventsdim0enum.rs @@ -495,4 +495,8 @@ impl Events for EventsDim0Enum { fn clear(&mut self) { todo!() } + + fn to_dim0_f32_for_binning(&self) -> Box { + todo!("{}::to_dim0_f32_for_binning", self.type_name()) + } } diff --git a/crates/items_2/src/eventsdim1.rs b/crates/items_2/src/eventsdim1.rs index c99cec8..8e20753 100644 --- a/crates/items_2/src/eventsdim1.rs +++ b/crates/items_2/src/eventsdim1.rs @@ -989,6 +989,10 @@ impl Events for EventsDim1 { self.pulses.clear(); self.values.clear(); } + + fn to_dim0_f32_for_binning(&self) -> Box { + todo!("{}::to_dim0_f32_for_binning", self.type_name()) + } } #[derive(Debug)] diff --git a/crates/items_2/src/eventsxbindim0.rs b/crates/items_2/src/eventsxbindim0.rs index dcbe670..30ca169 100644 --- a/crates/items_2/src/eventsxbindim0.rs +++ b/crates/items_2/src/eventsxbindim0.rs @@ -377,6 +377,10 @@ impl Events for EventsXbinDim0 { self.maxs.clear(); self.avgs.clear(); } + + fn to_dim0_f32_for_binning(&self) -> Box { + todo!("{}::to_dim0_f32_for_binning", self.type_name()) + } } #[derive(Debug)] diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 43d1084..5944259 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -1897,14 +1897,12 @@ impl TsNano { } impl fmt::Debug for TsNano { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { let ts = Utc .timestamp_opt((self.0 / SEC) as i64, (self.0 % SEC) as u32) .earliest() .unwrap_or(Default::default()); - f.debug_struct("TsNano") - .field("ts", &ts.format(DATETIME_FMT_3MS).to_string()) - .finish() + write!(fmt, "TsNano {{ {} }}", ts.format(DATETIME_FMT_3MS)) } } @@ -2380,22 +2378,31 @@ where pub bin_cnt: u64, } -impl fmt::Debug for BinnedRange -where - T: Dim0Index, -{ +impl fmt::Debug for BinnedRange { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("BinnedRange") - .field("bin_len", &self.bin_len) - .field("bin_off", &self.bin_off) - .field("bin_cnt", &self.bin_cnt) - .finish() + let beg = self.bin_len.times(self.bin_off); + let end = self.bin_len.times(self.bin_off + self.bin_cnt); + write!(fmt, "BinnedRange {{ {}, {}, {} }}", beg, end, self.bin_len.to_dt_ms()) + } +} + +impl fmt::Debug for BinnedRange { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "BinnedRange {{ .. }}") + } +} + +impl fmt::Display for BinnedRange { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(self, fmt) } } impl BinnedRange { pub fn to_nano_range(&self) -> NanoRange { - self.full_range() + let beg = self.bin_len.times(self.bin_off).as_u64(); + let end = self.bin_len.times(self.bin_off + self.bin_cnt).as_u64(); + NanoRange { beg, end } } pub fn from_nano_range(range: NanoRange, bin_len: DtMs) -> Self { @@ -2407,6 +2414,14 @@ impl BinnedRange { bin_cnt: off2 - off1, } } + + pub fn nano_beg(&self) -> TsNano { + self.bin_len.times(self.bin_off) + } + + pub fn nano_end(&self) -> TsNano { + self.bin_len.times(self.bin_off + self.bin_cnt) + } } impl BinnedRange diff --git a/crates/netpod/src/range/evrange.rs b/crates/netpod/src/range/evrange.rs index d2af8cf..1d01230 100644 --- a/crates/netpod/src/range/evrange.rs +++ b/crates/netpod/src/range/evrange.rs @@ -74,6 +74,10 @@ impl NanoRange { } } + pub fn from_ns_u64(beg: u64, end: u64) -> Self { + Self { beg, end } + } + pub fn delta(&self) -> u64 { self.end - self.beg } @@ -113,7 +117,7 @@ pub struct PulseRange { pub end: u64, } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Serialize, Deserialize, PartialEq)] pub enum SeriesRange { TimeRange(NanoRange), PulseRange(PulseRange), @@ -163,6 +167,15 @@ impl SeriesRange { } } +impl fmt::Debug for SeriesRange { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + SeriesRange::TimeRange(range) => write!(fmt, "SeriesRange::TimeRange {{ {} }}", range), + SeriesRange::PulseRange(_) => write!(fmt, "SeriesRange::PulseRange {{ .. }}"), + } + } +} + impl From for SeriesRange { fn from(k: NanoRange) -> Self { Self::TimeRange(k) diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index bf38fd8..a3d1fd2 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -1,11 +1,15 @@ use err::thiserror; use err::ThisError; +use futures_util::Future; +use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; use futures_util::TryStreamExt; +use items_0::on_sitemty_data; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; +use items_0::try_map_sitemty_data; use items_2::channelevents::ChannelEvents; use netpod::log::*; use netpod::ChConf; @@ -16,6 +20,9 @@ use scyllaconn::events2::mergert; use scyllaconn::worker::ScyllaQueue; use scyllaconn::SeriesId; use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use streams::timebin::cached::reader::EventsReadProvider; use taskrun::tokio; #[derive(Debug, ThisError)] @@ -129,3 +136,70 @@ pub async fn scylla_channel_event_stream( }); Ok(Box::pin(stream)) } + +struct ScyllaEventsReadStream { + fut1: Option< + Pin> + Send>>, Error>> + Send>>, + >, + stream: Option> + Send>>>, +} + +impl Stream for ScyllaEventsReadStream { + type Item = Sitemty; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break if let Some(fut) = self.fut1.as_mut() { + match fut.poll_unpin(cx) { + Ready(Ok(x)) => { + self.fut1 = None; + self.stream = Some(x); + continue; + } + Ready(Err(e)) => Ready(Some(Err(::err::Error::from_string(e)))), + Pending => Pending, + } + } else if let Some(fut) = self.stream.as_mut() { + match fut.poll_next_unpin(cx) { + Ready(Some(x)) => { + let x = try_map_sitemty_data!(x, |x| match x { + ChannelEvents::Events(x) => { + let x = x.to_dim0_f32_for_binning(); + Ok(ChannelEvents::Events(x)) + } + ChannelEvents::Status(x) => Ok(ChannelEvents::Status(x)), + }); + Ready(Some(x)) + } + Ready(None) => Ready(None), + Pending => Pending, + } + } else { + Ready(None) + }; + } + } +} + +pub struct ScyllaEventReadProvider { + scyqueue: ScyllaQueue, +} + +impl ScyllaEventReadProvider { + pub fn new(scyqueue: ScyllaQueue) -> Self { + Self { scyqueue } + } +} + +impl EventsReadProvider for ScyllaEventReadProvider { + fn read(&self, evq: EventsSubQuery, chconf: ChConf) -> streams::timebin::cached::reader::EventsReading { + let scyqueue = self.scyqueue.clone(); + let fut1 = async move { crate::scylla::scylla_channel_event_stream(evq, chconf, &scyqueue).await }; + let stream = ScyllaEventsReadStream { + fut1: Some(Box::pin(fut1)), + stream: None, + }; + streams::timebin::cached::reader::EventsReading::new(Box::pin(stream)) + } +} diff --git a/crates/scyllaconn/src/bincache.rs b/crates/scyllaconn/src/bincache.rs index 215e39f..e649348 100644 --- a/crates/scyllaconn/src/bincache.rs +++ b/crates/scyllaconn/src/bincache.rs @@ -7,20 +7,24 @@ use futures_util::Future; use futures_util::Stream; use futures_util::StreamExt; use items_0::timebin::TimeBinned; +use items_0::Empty; use items_2::binsdim0::BinsDim0; use items_2::channelevents::ChannelEvents; use netpod::log::*; use netpod::query::CacheUsage; use netpod::timeunits::*; use netpod::AggKind; +use netpod::BinnedRange; use netpod::ChannelTyped; use netpod::Dim0Kind; +use netpod::DtMs; use netpod::PreBinnedPatchCoord; use netpod::PreBinnedPatchCoordEnum; use netpod::PreBinnedPatchRange; use netpod::PreBinnedPatchRangeEnum; use netpod::ScalarType; use netpod::Shape; +use netpod::TsNano; use query::transform::TransformQuery; use scylla::Session as ScySession; use std::collections::VecDeque; @@ -532,8 +536,88 @@ impl ScyllaCacheReadProvider { } impl streams::timebin::CacheReadProvider for ScyllaCacheReadProvider { - fn read(&self) -> streams::timebin::cached::reader::Reading { + fn read(&self, series: u64, range: BinnedRange) -> streams::timebin::cached::reader::CacheReading { warn!("impl CacheReadProvider for ScyllaCacheReadProvider"); todo!("impl CacheReadProvider for ScyllaCacheReadProvider") } + + fn write(&self, series: u64, bins: BinsDim0) -> streams::timebin::cached::reader::CacheWriting { + let scyqueue = self.scyqueue.clone(); + let fut = async move { scyqueue.write_cache_f32(series, bins).await }; + streams::timebin::cached::reader::CacheWriting::new(Box::pin(fut)) + } +} + +pub async fn worker_write( + series: u64, + bins: BinsDim0, + scy: &ScySession, +) -> Result<(), streams::timebin::cached::reader::Error> { + let mut msp_last = u64::MAX; + for (((((&ts1, &ts2), &cnt), &min), &max), &avg) in bins + .ts1s + .iter() + .zip(bins.ts2s.iter()) + .zip(bins.counts.iter()) + .zip(bins.mins.iter()) + .zip(bins.maxs.iter()) + .zip(bins.avgs.iter()) + { + let bin_len = DtMs::from_ms_u64((ts2 - ts1) / 1000000); + let part_len = DtMs::from_ms_u64(bin_len.ms() * 1000); + let div = part_len.ns(); + let msp = ts1 / div; + let off = (ts1 - msp * div) / bin_len.ns(); + let params = ( + series as i64, + bin_len.ms() as i32, + msp as i64, + off as i32, + cnt as i64, + min, + max, + avg, + ); + eprintln!("cache write {:?}", params); + scy.query( + "insert into sf_st.st_binned_scalar_f32 (series, bin_len_ms, ts_msp, off, count, min, max, avg) values (?, ?, ?, ?, ?, ?, ?, ?)", + params, + ) + .await + .map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?; + } + Ok(()) +} + +pub async fn worker_read( + series: u64, + range: BinnedRange, + scy: &ScySession, +) -> Result, streams::timebin::cached::reader::Error> { + let bin_len: DtMs = todo!(); + let part_len = DtMs::from_ms_u64(bin_len.ms() * 1000); + let div = part_len.ns(); + let msp: u64 = 0; + let offs: core::ops::Range = todo!(); + let cql = "select off, count, min, max, avg from sf_st.st_binned_scalar_f32 where series = ? and bin_len_ms = ? and ts_msp = ? and off >= ? and off < ?"; + let params = ( + series as i64, + bin_len.ms() as i32, + msp as i64, + offs.start as i32, + offs.end as i32, + ); + let res = scy + .query_iter(cql, params) + .await + .map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?; + let it = res.into_typed::<(i32, i64, f32, f32, f32)>(); + let mut bins = BinsDim0::empty(); + while let Some(x) = it.next().await { + let row = x.map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?; + let off = row.0 as u64; + // TODO push bins + todo!("push bins"); + } + Ok(bins) } diff --git a/crates/scyllaconn/src/events2/mergert.rs b/crates/scyllaconn/src/events2/mergert.rs index 8516b31..238ecac 100644 --- a/crates/scyllaconn/src/events2/mergert.rs +++ b/crates/scyllaconn/src/events2/mergert.rs @@ -25,22 +25,13 @@ use std::task::Context; use std::task::Poll; #[allow(unused)] -macro_rules! trace_fetch { - ($($arg:tt)*) => { - if true { - trace!($($arg)*); - } - }; -} +macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } #[allow(unused)] -macro_rules! trace_emit { - ($($arg:tt)*) => { - if true { - trace!($($arg)*); - } - }; -} +macro_rules! trace_fetch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } + +#[allow(unused)] +macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } macro_rules! tracer_poll_enter { ($self:expr) => { @@ -158,7 +149,7 @@ pub struct MergeRts { impl MergeRts { pub fn new(ch_conf: ChConf, range: ScyllaSeriesRange, readopts: EventReadOpts, scyqueue: ScyllaQueue) -> Self { - info!("MergeRts readopts {readopts:?}"); + trace_init!("MergeRts readopts {readopts:?}"); Self { ch_conf, range_mt: range.clone(), diff --git a/crates/scyllaconn/src/events2/msp.rs b/crates/scyllaconn/src/events2/msp.rs index e2dfcb8..73b8dcd 100644 --- a/crates/scyllaconn/src/events2/msp.rs +++ b/crates/scyllaconn/src/events2/msp.rs @@ -118,7 +118,6 @@ impl MspStreamRt { async move { scyqueue.find_ts_msp(rt, series.id(), range, false).await } }; let do_trace_detail = netpod::TRACE_SERIES_ID.contains(&series.id()); - info!("------------------------------------- TEST INFO"); trace_emit!(do_trace_detail, "------------------------------------- TEST TRACE"); Self { rt, diff --git a/crates/scyllaconn/src/range.rs b/crates/scyllaconn/src/range.rs index c7526c3..38f9eed 100644 --- a/crates/scyllaconn/src/range.rs +++ b/crates/scyllaconn/src/range.rs @@ -2,7 +2,7 @@ use core::fmt; use netpod::range::evrange::SeriesRange; use netpod::TsNano; -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct ScyllaSeriesRange { beg: u64, end: u64, @@ -29,6 +29,17 @@ impl ScyllaSeriesRange { } } +impl fmt::Debug for ScyllaSeriesRange { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!( + fmt, + "ScyllaSeriesRange {{ beg: {}, end: {} }}", + TsNano::from_ns(self.beg), + TsNano::from_ns(self.end) + ) + } +} + impl From<&SeriesRange> for ScyllaSeriesRange { fn from(value: &SeriesRange) -> Self { match value { diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs index 64f25ca..7fa5df6 100644 --- a/crates/scyllaconn/src/worker.rs +++ b/crates/scyllaconn/src/worker.rs @@ -7,6 +7,7 @@ use err::thiserror; use err::ThisError; use futures_util::Future; use items_0::Events; +use items_2::binsdim0::BinsDim0; use netpod::log::*; use netpod::ttl::RetentionTime; use netpod::ScyllaConfig; @@ -29,6 +30,7 @@ pub enum Error { Join, Toplist(#[from] crate::accounting::toplist::Error), MissingKeyspaceConfig, + CacheWriteF32(#[from] streams::timebin::cached::reader::Error), } #[derive(Debug)] @@ -47,6 +49,11 @@ enum Job { TsMs, Sender>, ), + WriteCacheF32( + u64, + BinsDim0, + Sender>, + ), } struct ReadNextValues { @@ -117,6 +124,24 @@ impl ScyllaQueue { let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??; Ok(res) } + + pub async fn write_cache_f32( + &self, + series: u64, + bins: BinsDim0, + ) -> Result<(), streams::timebin::cached::reader::Error> { + let (tx, rx) = async_channel::bounded(1); + let job = Job::WriteCacheF32(series, bins, tx); + self.tx + .send(job) + .await + .map_err(|_| streams::timebin::cached::reader::Error::ChannelSend)?; + let res = rx + .recv() + .await + .map_err(|_| streams::timebin::cached::reader::Error::ChannelRecv)??; + Ok(res) + } } #[derive(Debug)] @@ -191,6 +216,12 @@ impl ScyllaWorker { // TODO count for stats } } + Job::WriteCacheF32(series, bins, tx) => { + let res = super::bincache::worker_write(series, bins, &scy).await; + if tx.send(res).await.is_err() { + // TODO count for stats + } + } } } info!("scylla worker finished"); diff --git a/crates/streams/src/frames/inmem.rs b/crates/streams/src/frames/inmem.rs index b8fe29e..bb9e410 100644 --- a/crates/streams/src/frames/inmem.rs +++ b/crates/streams/src/frames/inmem.rs @@ -19,10 +19,7 @@ use tokio::io::AsyncRead; pub type BoxedBytesStream = Pin> + Send>>; #[allow(unused)] -macro_rules! trace2 { - ($($arg:tt)*) => (); - ($($arg:tt)*) => (trace!($($arg)*)); -} +macro_rules! trace2 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ); } impl err::ToErr for crate::slidebuf::Error { fn to_err(self) -> Error { @@ -104,7 +101,7 @@ where } } - fn poll_upstream(&mut self, cx: &mut Context) -> Poll> { + fn poll_upstream(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { trace2!("poll_upstream"); use Poll::*; // use tokio::io::AsyncRead; @@ -112,7 +109,6 @@ where // let mut buf = ReadBuf::new(self.buf.available_writable_area(self.need_min.saturating_sub(self.buf.len()))?); let inp = &mut self.inp; pin_mut!(inp); - trace!("poll_upstream"); match inp.poll_next(cx) { Ready(Some(Ok(x))) => match self.buf.available_writable_area(x.len()) { Ok(dst) => { @@ -254,7 +250,7 @@ where } } } else { - match self.poll_upstream(cx) { + match self.as_mut().poll_upstream(cx) { Ready(Ok(n1)) => { if n1 == 0 { self.done = true; diff --git a/crates/streams/src/timebin.rs b/crates/streams/src/timebin.rs index 551d88c..4884d55 100644 --- a/crates/streams/src/timebin.rs +++ b/crates/streams/src/timebin.rs @@ -1,5 +1,6 @@ mod basic; pub mod cached; +pub mod fromevents; mod fromlayers; mod gapfill; mod grid; diff --git a/crates/streams/src/timebin/basic.rs b/crates/streams/src/timebin/basic.rs index 763ecb9..a8d9a88 100644 --- a/crates/streams/src/timebin/basic.rs +++ b/crates/streams/src/timebin/basic.rs @@ -17,31 +17,16 @@ use std::task::Context; use std::task::Poll; #[allow(unused)] -macro_rules! trace2 { - ($($arg:tt)*) => { - if false { - trace!($($arg)*); - } - }; -} +macro_rules! debug_first { ($($arg:tt)*) => ( if false { debug!($($arg)*); } ) } #[allow(unused)] -macro_rules! trace3 { - ($($arg:tt)*) => { - if false { - trace!($($arg)*); - } - }; -} +macro_rules! trace2 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } #[allow(unused)] -macro_rules! trace4 { - ($($arg:tt)*) => { - if false { - trace!($($arg)*); - } - }; -} +macro_rules! trace3 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } + +#[allow(unused)] +macro_rules! trace4 { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } type SitemtyStream = Pin> + Send>>; @@ -115,7 +100,7 @@ where self.process_item(item); let mut do_emit = false; if self.done_first_input == false { - debug!( + debug_first!( "emit container after the first input len {} binner {}", item_len, self.binner.is_some() @@ -191,13 +176,13 @@ where trace2!("================= handle_none"); let self_range_final = self.range_final; if let Some(binner) = self.binner.as_mut() { - trace!("bins ready count before finish {}", binner.bins_ready_count()); + trace2!("bins ready count before finish {}", binner.bins_ready_count()); // TODO rework the finish logic if self_range_final { binner.set_range_complete(); } binner.push_in_progress(false); - trace!("bins ready count after finish {}", binner.bins_ready_count()); + trace2!("bins ready count after finish {}", binner.bins_ready_count()); if let Some(bins) = binner.bins_ready() { self.done_data = true; Ok(Break(Ready(sitem_data(bins)))) @@ -260,6 +245,7 @@ where } else if self.done_data { self.done = true; if self.range_final { + info!("TimeBinnedStream EMIT RANGE FINAL"); Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) } else { continue; diff --git a/crates/streams/src/timebin/cached/reader.rs b/crates/streams/src/timebin/cached/reader.rs index dbb949f..99f5901 100644 --- a/crates/streams/src/timebin/cached/reader.rs +++ b/crates/streams/src/timebin/cached/reader.rs @@ -1,23 +1,71 @@ +use crate as streams; use err::thiserror; use err::ThisError; use futures_util::FutureExt; use futures_util::Stream; +use futures_util::StreamExt; +use items_0::streamitem::Sitemty; +use items_0::timebin::TimeBinnable; use items_2::binsdim0::BinsDim0; +use items_2::channelevents::ChannelEvents; +use netpod::log::*; use netpod::BinnedRange; +use netpod::ChConf; use netpod::DtMs; use netpod::TsNano; +use query::api4::events::EventsSubQuery; use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::task::Context; use std::task::Poll; -pub struct Reading { - fut: Pin, Box>> + Send>>, +pub struct EventsReading { + stream: Pin> + Send>>, } -impl Future for Reading { - type Output = Result, Box>; +impl EventsReading { + pub fn new(stream: Pin> + Send>>) -> Self { + Self { stream } + } +} + +impl Stream for EventsReading { + type Item = Sitemty; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.stream.poll_next_unpin(cx) + } +} + +pub trait EventsReadProvider: Send + Sync { + fn read(&self, evq: EventsSubQuery, chconf: ChConf) -> EventsReading; +} + +pub struct CacheReading { + fut: Pin, Box>> + Send>>, +} + +impl Future for CacheReading { + type Output = Result, Box>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + self.fut.poll_unpin(cx) + } +} + +pub struct CacheWriting { + fut: Pin> + Send>>, +} + +impl CacheWriting { + pub fn new(fut: Pin> + Send>>) -> Self { + Self { fut } + } +} + +impl Future for CacheWriting { + type Output = Result<(), streams::timebin::cached::reader::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { self.fut.poll_unpin(cx) @@ -25,13 +73,17 @@ impl Future for Reading { } pub trait CacheReadProvider: Send + Sync { - fn read(&self) -> Reading; + fn read(&self, series: u64, range: BinnedRange) -> CacheReading; + fn write(&self, series: u64, bins: BinsDim0) -> CacheWriting; } #[derive(Debug, ThisError)] #[cstm(name = "BinCachedReader")] pub enum Error { TodoImpl, + ChannelSend, + ChannelRecv, + Scylla(String), } pub struct CachedReader { @@ -55,6 +107,13 @@ impl Stream for CachedReader { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + // TODO + // Must split over different msp (because pkey). + // If we choose the partitioning length low enough, no need to issue multiple queries. + // Change the worker interface: + // We should already compute here the msp and off because we must here implement the loop logic. + // Therefore worker interface should not accept BinnedRange, but msp and off range. + error!("TODO CachedReader impl split reads over known ranges"); // Ready(Some(Err(Error::TodoImpl))) Ready(None) } diff --git a/crates/streams/src/timebin/fromevents.rs b/crates/streams/src/timebin/fromevents.rs new file mode 100644 index 0000000..3e22a70 --- /dev/null +++ b/crates/streams/src/timebin/fromevents.rs @@ -0,0 +1,80 @@ +use super::cached::reader::EventsReadProvider; +use super::cached::reader::EventsReading; +use err::thiserror; +use err::ThisError; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; +use items_2::binsdim0::BinsDim0; +use netpod::log::*; +use netpod::BinnedRange; +use netpod::ChConf; +use netpod::TsNano; +use query::api4::events::EventsSubQuery; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; + +#[derive(Debug, ThisError)] +#[cstm(name = "ReadingBinnedFromEvents")] +pub enum Error {} + +pub struct BinnedFromEvents { + stream: Pin>> + Send>>, +} + +impl BinnedFromEvents { + pub fn new( + range: BinnedRange, + evq: EventsSubQuery, + chconf: ChConf, + do_time_weight: bool, + read_provider: Arc, + ) -> Result { + if !evq.range().is_time() { + panic!(); + } + let stream = read_provider.read(evq, chconf); + let stream = Box::pin(stream); + let stream = super::basic::TimeBinnedStream::new(stream, netpod::BinnedRangeEnum::Time(range), do_time_weight); + let stream = stream.map(|item| match item { + Ok(x) => match x { + StreamItem::DataItem(x) => match x { + RangeCompletableItem::Data(mut x) => { + // TODO need a typed time binner + if let Some(x) = x.as_any_mut().downcast_mut::>() { + let y = x.clone(); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(y))) + } else { + Err(::err::Error::with_msg_no_trace( + "GapFill expects incoming BinsDim0", + )) + } + } + RangeCompletableItem::RangeComplete => { + info!("BinnedFromEvents sees range final"); + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) + } + }, + StreamItem::Log(x) => Ok(StreamItem::Log(x)), + StreamItem::Stats(x) => Ok(StreamItem::Stats(x)), + }, + Err(e) => Err(e), + }); + let ret = Self { + stream: Box::pin(stream), + }; + Ok(ret) + } +} + +impl Stream for BinnedFromEvents { + type Item = Sitemty>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.stream.poll_next_unpin(cx) + } +} diff --git a/crates/streams/src/timebin/fromlayers.rs b/crates/streams/src/timebin/fromlayers.rs index a518e46..c17f1ce 100644 --- a/crates/streams/src/timebin/fromlayers.rs +++ b/crates/streams/src/timebin/fromlayers.rs @@ -1,5 +1,7 @@ use super::cached::reader::CacheReadProvider; +use super::cached::reader::EventsReadProvider; use crate::tcprawclient::OpenBoxedBytesStreamsBox; +use crate::timebin::fromevents::BinnedFromEvents; use crate::timebin::grid::find_next_finer_bin_len; use err::thiserror; use err::ThisError; @@ -13,12 +15,17 @@ use items_0::streamitem::StreamItem; use items_0::timebin::TimeBinnableTy; use items_2::binsdim0::BinsDim0; use netpod::log::*; +use netpod::range::evrange::SeriesRange; use netpod::BinnedRange; use netpod::BinnedRangeEnum; +use netpod::ChConf; use netpod::ChannelTypeConfigGen; use netpod::DtMs; use netpod::ReqCtx; +use netpod::SeriesKind; use netpod::TsNano; +use query::api4::events::EventsSubQuery; +use query::api4::events::EventsSubQuerySelect; use query::api4::events::EventsSubQuerySettings; use query::transform::TransformQuery; use std::pin::Pin; @@ -29,8 +36,9 @@ use std::task::Poll; #[derive(Debug, ThisError)] #[cstm(name = "TimeBinnedFromLayers")] pub enum Error { - Logic, GapFill(#[from] super::gapfill::Error), + BinnedFromEvents(#[from] super::fromevents::Error), + SfDatabufferNotSupported, } type BoxedInput = Pin>> + Send>>; @@ -61,7 +69,8 @@ impl TimeBinnedFromLayers { range: BinnedRange, do_time_weight: bool, bin_len_layers: Vec, - cache_read_provider: Arc, + cache_read_provider: Arc, + events_read_provider: Arc, ) -> Result { info!( "{}::new {:?} {:?} {:?}", @@ -72,19 +81,20 @@ impl TimeBinnedFromLayers { ); let bin_len = DtMs::from_ms_u64(range.bin_len.ms()); if bin_len_layers.contains(&bin_len) { - info!("{}::new bin_len in layers", Self::type_name()); + info!("{}::new bin_len in layers {:?}", Self::type_name(), range); let inp = super::gapfill::GapFill::new( + "FromLayers".into(), ch_conf.clone(), transform_query.clone(), sub.clone(), log_level.clone(), ctx.clone(), - open_bytes.clone(), series, range, do_time_weight, bin_len_layers, cache_read_provider, + events_read_provider.clone(), )?; let ret = Self { ch_conf, @@ -99,22 +109,26 @@ impl TimeBinnedFromLayers { } else { match find_next_finer_bin_len(bin_len, &bin_len_layers) { Some(finer) => { - // TODO - // produce from binned sub-stream with additional binner. - let range = BinnedRange::from_nano_range(range.to_nano_range(), finer); - warn!("{}::new next finer {:?} {:?}", Self::type_name(), finer, range); + let range_finer = BinnedRange::from_nano_range(range.to_nano_range(), finer); + warn!( + "{}::new next finer from bins {:?} {:?}", + Self::type_name(), + finer, + range_finer + ); let inp = super::gapfill::GapFill::new( + "FromLayers".into(), ch_conf.clone(), transform_query.clone(), sub.clone(), log_level.clone(), ctx.clone(), - open_bytes.clone(), series, - range.clone(), + range_finer.clone(), do_time_weight, bin_len_layers, cache_read_provider, + events_read_provider.clone(), )?; let inp = super::basic::TimeBinnedStream::new( Box::pin(inp), @@ -133,10 +147,39 @@ impl TimeBinnedFromLayers { Ok(ret) } None => { - warn!("{}::new NO next finer", Self::type_name()); - // TODO - // produce from events - todo!() + warn!("{}::new next finer from events", Self::type_name()); + let series_range = SeriesRange::TimeRange(range.to_nano_range()); + let one_before_range = true; + let select = EventsSubQuerySelect::new( + ch_conf.clone(), + series_range, + one_before_range, + transform_query.clone(), + ); + let evq = EventsSubQuery::from_parts(select, sub.clone(), ctx.reqid().into(), log_level.clone()); + match &ch_conf { + ChannelTypeConfigGen::Scylla(chconf) => { + let inp = BinnedFromEvents::new( + range, + evq, + chconf.clone(), + do_time_weight, + events_read_provider, + )?; + let ret = Self { + ch_conf, + transform_query, + sub, + log_level, + ctx, + open_bytes, + inp: Box::pin(inp), + }; + warn!("{}::new setup from events", Self::type_name()); + Ok(ret) + } + ChannelTypeConfigGen::SfDatabuffer(_) => return Err(Error::SfDatabufferNotSupported), + } } } } diff --git a/crates/streams/src/timebin/gapfill.rs b/crates/streams/src/timebin/gapfill.rs index cb06e0a..ef4a4f5 100644 --- a/crates/streams/src/timebin/gapfill.rs +++ b/crates/streams/src/timebin/gapfill.rs @@ -1,20 +1,27 @@ use super::cached::reader::CacheReadProvider; -use crate::tcprawclient::OpenBoxedBytesStreamsBox; +use super::cached::reader::EventsReadProvider; +use crate::timebin::fromevents::BinnedFromEvents; use err::thiserror; use err::ThisError; +use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; +use items_0::Empty; +use items_0::WithLen; use items_2::binsdim0::BinsDim0; use netpod::log::*; use netpod::range::evrange::NanoRange; +use netpod::range::evrange::SeriesRange; use netpod::BinnedRange; use netpod::ChannelTypeConfigGen; use netpod::DtMs; use netpod::ReqCtx; use netpod::TsNano; +use query::api4::events::EventsSubQuery; +use query::api4::events::EventsSubQuerySelect; use query::api4::events::EventsSubQuerySettings; use query::transform::TransformQuery; use std::pin::Pin; @@ -22,11 +29,27 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; +#[allow(unused)] +macro_rules! debug_init { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) } + +#[allow(unused)] +macro_rules! debug_setup { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) } + +#[allow(unused)] +macro_rules! debug_cache { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) } + +#[allow(unused)] +macro_rules! trace_handle { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } + #[derive(Debug, ThisError)] #[cstm(name = "BinCachedGapFill")] pub enum Error { CacheReader(#[from] super::cached::reader::Error), GapFromFiner, + #[error("InputBeforeRange({0}, {1})")] + InputBeforeRange(NanoRange, BinnedRange), + SfDatabufferNotSupported, + EventsReader(#[from] super::fromevents::Error), } type INP = Pin>> + Send>>; @@ -34,41 +57,51 @@ type INP = Pin>> + Send>>; // Try to read from cache for the given bin len. // For gaps in the stream, construct an alternative input from finer bin len with a binner. pub struct GapFill { + dbgname: String, ch_conf: ChannelTypeConfigGen, transform_query: TransformQuery, sub: EventsSubQuerySettings, log_level: String, ctx: Arc, - open_bytes: OpenBoxedBytesStreamsBox, series: u64, range: BinnedRange, do_time_weight: bool, bin_len_layers: Vec, - inp: INP, + inp: Option, + inp_range_final: bool, inp_buf: Option>, inp_finer: Option, + inp_finer_range_final: bool, + inp_finer_range_final_cnt: u32, + inp_finer_range_final_max: u32, + inp_finer_fills_gap: bool, last_bin_ts2: Option, exp_finer_range: NanoRange, cache_read_provider: Arc, + events_read_provider: Arc, + bins_for_cache_write: BinsDim0, + done: bool, + cache_writing: Option, } impl GapFill { // bin_len of the given range must be a cacheable bin_len. pub fn new( + dbgname_parent: String, ch_conf: ChannelTypeConfigGen, transform_query: TransformQuery, sub: EventsSubQuerySettings, log_level: String, ctx: Arc, - open_bytes: OpenBoxedBytesStreamsBox, - series: u64, range: BinnedRange, do_time_weight: bool, bin_len_layers: Vec, cache_read_provider: Arc, + events_read_provider: Arc, ) -> Result { - // super::fromlayers::TimeBinnedFromLayers::new(series, range, do_time_weight, bin_len_layers)?; + let dbgname = format!("{}--[{}]", dbgname_parent, range); + debug_init!("new dbgname {}", dbgname); let inp = super::cached::reader::CachedReader::new( series, range.bin_len.to_dt_ms(), @@ -80,28 +113,38 @@ impl GapFill { Err(e) => Err(::err::Error::from_string(e)), }); let ret = Self { + dbgname, ch_conf, transform_query, sub, log_level, ctx, - open_bytes, series, range, do_time_weight, bin_len_layers, - inp: Box::pin(inp), + inp: Some(Box::pin(inp)), + inp_range_final: false, inp_buf: None, inp_finer: None, + inp_finer_range_final: false, + inp_finer_range_final_cnt: 0, + inp_finer_range_final_max: 0, + inp_finer_fills_gap: false, last_bin_ts2: None, // TODO just dummy: exp_finer_range: NanoRange { beg: 0, end: 0 }, cache_read_provider, + events_read_provider, + bins_for_cache_write: BinsDim0::empty(), + done: false, + cache_writing: None, }; Ok(ret) } fn handle_bins_finer(mut self: Pin<&mut Self>, bins: BinsDim0) -> Result, Error> { + trace_handle!("{} handle_bins_finer {}", self.dbgname, bins); for (&ts1, &ts2) in bins.ts1s.iter().zip(&bins.ts2s) { if let Some(last) = self.last_bin_ts2 { if ts1 != last.ns() { @@ -110,25 +153,37 @@ impl GapFill { } self.last_bin_ts2 = Some(TsNano::from_ns(ts2)); } - - // TODO keep bins from finer source. - // Only write bins to cache if we receive another - + if bins.len() != 0 { + bins.clone().drain_into(&mut self.bins_for_cache_write, 0..bins.len()); + } + self.cache_write_intermediate()?; // TODO make sure that input does not send "made-up" empty future bins. // On the other hand, if the request is over past range, but the channel was silent ever since? // Then we should in principle know that from is-alive status checking. // So, until then, allow made-up bins? // Maybe, for now, only write those bins before some last non-zero-count bin. The only safe way. - Ok(bins) } fn handle_bins(mut self: Pin<&mut Self>, bins: BinsDim0) -> Result, Error> { + trace_handle!("{} handle_bins {}", self.dbgname, bins); // TODO could use an interface to iterate over opaque bin items that only expose // edge and count information with all remaining values opaque. for (i, (&ts1, &ts2)) in bins.ts1s.iter().zip(&bins.ts2s).enumerate() { + if ts1 < self.range.nano_beg().ns() { + return Err(Error::InputBeforeRange( + NanoRange::from_ns_u64(ts1, ts2), + self.range.clone(), + )); + } if let Some(last) = self.last_bin_ts2 { if ts1 != last.ns() { + trace_handle!( + "{} detect a gap ------------- SETUP SUB STREAM ts1 {} last {}", + self.dbgname, + ts1, + last + ); let mut ret = as items_0::Empty>::empty(); let mut bins = bins; bins.drain_into(&mut ret, 0..i); @@ -137,7 +192,7 @@ impl GapFill { beg: last.ns(), end: ts1, }; - self.setup_inp_finer(range)?; + self.setup_inp_finer(range, true)?; return Ok(ret); } } @@ -146,25 +201,35 @@ impl GapFill { Ok(bins) } - fn setup_inp_finer(mut self: Pin<&mut Self>, range: NanoRange) -> Result<(), Error> { - // Set up range to fill from finer. + fn setup_inp_finer(mut self: Pin<&mut Self>, range: NanoRange, inp_finer_fills_gap: bool) -> Result<(), Error> { + self.inp_finer_range_final = false; + self.inp_finer_range_final_max += 1; + self.inp_finer_fills_gap = inp_finer_fills_gap; self.exp_finer_range = range.clone(); if let Some(bin_len_finer) = super::grid::find_next_finer_bin_len(self.range.bin_len.to_dt_ms(), &self.bin_len_layers) { + debug_setup!( + "{} setup_inp_finer next finer from bins {} {} from {}", + self.dbgname, + range, + bin_len_finer, + self.range.bin_len.to_dt_ms() + ); let range_finer = BinnedRange::from_nano_range(range, bin_len_finer); let inp_finer = GapFill::new( + self.dbgname.clone(), self.ch_conf.clone(), self.transform_query.clone(), self.sub.clone(), self.log_level.clone(), self.ctx.clone(), - self.open_bytes.clone(), self.series, range_finer.clone(), self.do_time_weight, self.bin_len_layers.clone(), self.cache_read_provider.clone(), + self.events_read_provider.clone(), )?; let stream = Box::pin(inp_finer); let do_time_weight = self.do_time_weight; @@ -173,51 +238,77 @@ impl GapFill { super::basic::TimeBinnedStream::new(stream, netpod::BinnedRangeEnum::Time(range), do_time_weight); self.inp_finer = Some(Box::pin(stream)); } else { - let do_time_weight = self.do_time_weight; + debug_setup!("{} setup_inp_finer next finer from events {}", self.dbgname, range); + let series_range = SeriesRange::TimeRange(range.clone()); let one_before_range = true; - let range = BinnedRange::from_nano_range(range, self.range.bin_len.to_dt_ms()); - let stream = crate::timebinnedjson::TimeBinnableStream::new( - range.full_range(), - one_before_range, + let select = EventsSubQuerySelect::new( self.ch_conf.clone(), + series_range, + one_before_range, self.transform_query.clone(), - self.sub.clone(), - self.log_level.clone(), - self.ctx.clone(), - self.open_bytes.clone(), ); - // let stream: Pin> = stream; - let stream = Box::pin(stream); - // TODO rename TimeBinnedStream to make it more clear that it is the component which initiates the time binning. - let stream = - super::basic::TimeBinnedStream::new(stream, netpod::BinnedRangeEnum::Time(range), do_time_weight); - let stream = stream.map(|item| match item { - Ok(x) => match x { - StreamItem::DataItem(x) => match x { - RangeCompletableItem::Data(mut x) => { - // TODO need a typed time binner - if let Some(x) = x.as_any_mut().downcast_mut::>() { - let y = x.clone(); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(y))) - } else { - Err(::err::Error::with_msg_no_trace( - "GapFill expects incoming BinsDim0", - )) - } - } - RangeCompletableItem::RangeComplete => { - Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) - } - }, - StreamItem::Log(x) => Ok(StreamItem::Log(x)), - StreamItem::Stats(x) => Ok(StreamItem::Stats(x)), - }, - Err(e) => Err(e), - }); - // let stream: Pin< - // Box>> + Send>, - // > = Box::pin(stream); - self.inp_finer = Some(Box::pin(stream)); + let evq = EventsSubQuery::from_parts( + select, + self.sub.clone(), + self.ctx.reqid().into(), + self.log_level.clone(), + ); + match &self.ch_conf { + ChannelTypeConfigGen::Scylla(chconf) => { + let range = BinnedRange::from_nano_range(range.clone(), self.range.bin_len.to_dt_ms()); + let inp = BinnedFromEvents::new( + range, + evq, + chconf.clone(), + self.do_time_weight, + self.events_read_provider.clone(), + )?; + self.inp_finer = Some(Box::pin(inp)); + } + ChannelTypeConfigGen::SfDatabuffer(_) => return Err(Error::SfDatabufferNotSupported), + } + } + Ok(()) + } + + fn cache_write(mut self: Pin<&mut Self>, bins: BinsDim0) -> Result<(), Error> { + self.cache_writing = Some(self.cache_read_provider.write(self.series, bins)); + Ok(()) + } + + fn cache_write_on_end(mut self: Pin<&mut Self>) -> Result<(), Error> { + if self.inp_finer_fills_gap { + // TODO can consider all incoming bins as final by assumption. + } + let aa = &self.bins_for_cache_write; + if aa.len() >= 2 { + for (i, (&c1, &c2)) in aa.counts.iter().rev().zip(aa.counts.iter().rev().skip(1)).enumerate() { + if c1 != 0 { + let n = aa.len() - (1 + i); + debug_cache!("{} cache_write_on_end consider {} for write", self.dbgname, n); + let mut bins_write = BinsDim0::empty(); + self.bins_for_cache_write.drain_into(&mut bins_write, 0..n); + self.cache_write(bins_write)?; + break; + } + } + } + Ok(()) + } + + fn cache_write_intermediate(mut self: Pin<&mut Self>) -> Result<(), Error> { + let aa = &self.bins_for_cache_write; + if aa.len() >= 2 { + for (i, (&c1, &c2)) in aa.counts.iter().rev().zip(aa.counts.iter().rev().skip(1)).enumerate() { + if c1 != 0 { + let n = aa.len() - (1 + i); + debug_cache!("{} cache_write_intermediate consider {} for write", self.dbgname, n); + let mut bins_write = BinsDim0::empty(); + self.bins_for_cache_write.drain_into(&mut bins_write, 0..n); + self.cache_write(bins_write)?; + break; + } + } } Ok(()) } @@ -229,7 +320,21 @@ impl Stream for GapFill { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; loop { - break if let Some(inp_finer) = self.inp_finer.as_mut() { + break if self.done { + Ready(None) + } else if let Some(fut) = self.cache_writing.as_mut() { + match fut.poll_unpin(cx) { + Ready(Ok(())) => { + self.cache_writing = None; + continue; + } + Ready(Err(e)) => { + self.cache_writing = None; + Ready(Some(Err(::err::Error::from_string(e)))) + } + Pending => Pending, + } + } else if let Some(inp_finer) = self.inp_finer.as_mut() { // TODO // detect also gaps here: if gap from finer, then error. // on CacheUsage Use or Rereate: @@ -243,26 +348,48 @@ impl Stream for GapFill { } } StreamItem::DataItem(RangeCompletableItem::RangeComplete) => { - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + trace_handle!("{} RECV RANGE FINAL", self.dbgname); + self.inp_finer_range_final = true; + self.inp_finer_range_final_cnt += 1; + match self.as_mut().cache_write_on_end() { + Ok(()) => continue, + Err(e) => Ready(Some(Err(::err::Error::from_string(e)))), + } } StreamItem::Log(x) => Ready(Some(Ok(StreamItem::Log(x)))), StreamItem::Stats(x) => Ready(Some(Ok(StreamItem::Stats(x)))), }, Ready(Some(Err(e))) => Ready(Some(Err(::err::Error::from_string(e)))), Ready(None) => { + trace_handle!( + "{} inp_finer Ready(None) last_bin_ts2 {:?}", + self.dbgname, + self.last_bin_ts2 + ); + self.inp_finer = None; if let Some(j) = self.last_bin_ts2 { if j.ns() != self.exp_finer_range.end() { + trace_handle!( + "{} inp_finer Ready(None) last_bin_ts2 {:?} exp_finer_range {:?}", + self.dbgname, + self.last_bin_ts2, + self.exp_finer_range + ); Ready(Some(Err(::err::Error::from_string( "finer input didn't deliver to the end", )))) } else { - self.last_bin_ts2 = None; self.exp_finer_range = NanoRange { beg: 0, end: 0 }; - self.inp_finer = None; continue; } } else { - Ready(Some(Err(::err::Error::from_string("finer input delivered nothing")))) + error!( + "{} inp_finer Ready(None) last_bin_ts2 {:?}", + self.dbgname, self.last_bin_ts2 + ); + Ready(Some(Err(::err::Error::from_string( + "finer input delivered nothing, received nothing at all so far", + )))) } } Pending => Pending, @@ -272,46 +399,52 @@ impl Stream for GapFill { Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))), Err(e) => Ready(Some(Err(::err::Error::from_string(e)))), } - } else { - match self.inp.poll_next_unpin(cx) { + } else if let Some(inp) = self.inp.as_mut() { + match inp.poll_next_unpin(cx) { Ready(Some(Ok(x))) => match x { StreamItem::DataItem(RangeCompletableItem::Data(x)) => match self.as_mut().handle_bins(x) { Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))), Err(e) => Ready(Some(Err(::err::Error::from_string(e)))), }, StreamItem::DataItem(RangeCompletableItem::RangeComplete) => { - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + self.inp_range_final = true; + continue; } StreamItem::Log(x) => Ready(Some(Ok(StreamItem::Log(x)))), StreamItem::Stats(x) => Ready(Some(Ok(StreamItem::Stats(x)))), }, Ready(Some(Err(e))) => Ready(Some(Err(::err::Error::from_string(e)))), Ready(None) => { + self.inp = None; // TODO assert that we have emitted up to the requested range. // If not, request the remaining range from "finer" input. if let Some(j) = self.last_bin_ts2 { - if j.ns() != self.exp_finer_range.end() { + if j != self.range.nano_end() { let range = NanoRange { beg: j.ns(), end: self.range.full_range().end(), }; - match self.as_mut().setup_inp_finer(range) { + warn!( + "----- RECEIVED SOMETHING, BUT NOT ALL, setup rest from finer {} {} {}", + self.range, j, range + ); + match self.as_mut().setup_inp_finer(range, false) { Ok(()) => { continue; } Err(e) => Ready(Some(Err(::err::Error::from_string(e)))), } } else { - // self.last_bin_ts2 = None; - // self.exp_finer_range = NanoRange { beg: 0, end: 0 }; - // self.inp_finer = None; - // continue; + info!("----- RECEIVED EVERYTHING"); Ready(None) } } else { - warn!("----- NOTHING IN CACHE, SETUP FULL FROM FINER"); - let range = self.range.full_range(); - match self.as_mut().setup_inp_finer(range) { + let range = self.range.to_nano_range(); + warn!( + "----- RECEIVED NOTHING SO FAR AT ALL, setup full range from finer {} {}", + self.range, range + ); + match self.as_mut().setup_inp_finer(range, false) { Ok(()) => { continue; } @@ -321,24 +454,16 @@ impl Stream for GapFill { } Pending => Pending, } + } else { + self.done = true; + if self.inp_finer_range_final_cnt == self.inp_finer_range_final_max { + trace_handle!("{} RANGE FINAL ALL", self.dbgname); + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + } else { + trace_handle!("{} SUBSTREAMS NOT FINAL", self.dbgname); + continue; + } }; } - // When do we detect a gap: - // - when the current item poses a gap to the last. - // - when we see EOS before the requested range is filled. - // Requirements: - // Must always request fully cache-aligned ranges. - // Must remember where the last bin ended. - - // When a gap is detected: - // - buffer the current item, if there is one (can also be EOS). - // - create a new producer of bin: - // - GapFillwith finer range? FromFiner(series, bin_len, range) ? - // - TimeBinnedFromLayers for a bin_len in layers would also go directly into GapFill. - // what does FromFiner bring to the table? - // It does not attempt to read the given bin-len from a cache, because we just did attempt that. - // It still requires that bin-len is cacheable. (NO! it must work with the layering that I passed!) - // Then it finds the next cacheable - // Ready(None) } } diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index 40fd6a8..c0aca15 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -3,6 +3,7 @@ use crate::rangefilter2::RangeFilter2; use crate::tcprawclient::container_stream_from_bytes_stream; use crate::tcprawclient::make_sub_query; use crate::tcprawclient::OpenBoxedBytesStreamsBox; +use crate::timebin::cached::reader::EventsReadProvider; use crate::timebin::CacheReadProvider; use crate::timebin::TimeBinnedStream; use crate::transform::build_merged_event_transform; @@ -310,10 +311,11 @@ async fn timebinned_stream( ctx: &ReqCtx, open_bytes: OpenBoxedBytesStreamsBox, cache_read_provider: Option>, + events_read_provider: Option>, ) -> Result>> + Send>>, Error> { use netpod::query::CacheUsage; - match (query.cache_usage(), cache_read_provider) { - (CacheUsage::Use | CacheUsage::Recreate, Some(cache_read_provider)) => { + match (query.cache_usage(), cache_read_provider, events_read_provider) { + (CacheUsage::Use | CacheUsage::Recreate, Some(cache_read_provider), Some(events_read_provider)) => { let series = if let Some(x) = query.channel().series() { x } else { @@ -351,6 +353,7 @@ async fn timebinned_stream( do_time_weight, bin_len_layers, cache_read_provider, + events_read_provider, ) .map_err(Error::from_string)?; let stream = stream.map(|item| { @@ -408,6 +411,7 @@ pub async fn timebinned_json( ctx: &ReqCtx, open_bytes: OpenBoxedBytesStreamsBox, cache_read_provider: Option>, + events_read_provider: Option>, ) -> Result { let deadline = Instant::now() + query.timeout_content().unwrap_or(Duration::from_millis(5000)); let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?; @@ -421,6 +425,7 @@ pub async fn timebinned_json( ctx, open_bytes, cache_read_provider, + events_read_provider, ) .await?; let stream = timebinned_to_collectable(stream); diff --git a/crates/taskrun/src/taskrun.rs b/crates/taskrun/src/taskrun.rs index a780b2e..8cbfcb3 100644 --- a/crates/taskrun/src/taskrun.rs +++ b/crates/taskrun/src/taskrun.rs @@ -230,6 +230,15 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { break; } } + for e in &tracing_trace { + tmp1.clear(); + tmp1.push_str(e); + tmp1.push_str("::"); + if meta.target() == &tmp1[..tmp1.len() - 2] || meta.target().starts_with(&tmp1) { + target_match = true; + break; + } + } if target_match { let mut sr = ctx.lookup_current(); let mut allow = false;