diff --git a/apidoc/src/search.md b/apidoc/src/search.md index dc28ffe..0c9cb3f 100644 --- a/apidoc/src/search.md +++ b/apidoc/src/search.md @@ -1,14 +1,15 @@ # Search Channels -To search for e.g. DBPM channels in `sf-databuffer` that end in `:Q1` the request -looks like this: +Example search request: ```bash -curl "https://data-api.psi.ch/api/4/search/channel?backend=sf-archiver&nameRegex=S10CB08-KBOC-HP.*PI-OUT" +curl "https://data-api.psi.ch/api/4/search/channel?nameRegex=S10CB08-KBOC-HP.*PI-OUT" ``` Parameters: -- `icase=true` uses case-insensitive search (default: case-sensitive). +- `nameRegex` searches for a channel name by regular expression. +- `backend` restricts search to the given backend (optional). +- `icase=true` uses case-insensitive search (optional, default case-sensitive). Example response: diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index 87b4d1b..8ffe5a8 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqbuffer" -version = "0.5.3-aa.4" +version = "0.5.3-aa.5" authors = ["Dominik Werder "] edition = "2021" diff --git a/crates/disk/src/dataopen.rs b/crates/disk/src/dataopen.rs index fd661a1..4dc3e3c 100644 --- a/crates/disk/src/dataopen.rs +++ b/crates/disk/src/dataopen.rs @@ -51,10 +51,13 @@ async fn position_file( match OpenOptions::new().read(true).open(&index_path).await { Ok(mut index_file) => { let meta = index_file.metadata().await?; - if meta.len() > 1024 * 1024 * 120 { + if meta.len() > 1024 * 1024 * 500 { let msg = format!("too large index file {} bytes for {:?}", meta.len(), index_path); error!("{}", msg); return Err(Error::with_msg(msg)); + } else if meta.len() > 1024 * 1024 * 200 { + let msg = format!("very large index file {} bytes for {:?}", meta.len(), index_path); + warn!("{}", msg); } else if meta.len() > 1024 * 1024 * 80 { let msg = format!("very large index file {} bytes for {:?}", meta.len(), index_path); warn!("{}", msg); @@ -184,12 +187,25 @@ async fn position_file( } pub struct OpenedFile { + pub pos: u64, pub path: PathBuf, pub file: Option, pub positioned: bool, pub index: bool, pub nreads: u32, - pub pos: u64, +} + +impl fmt::Debug for OpenedFile { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("OpenedFile") + .field("pos", &self.pos) + .field("path", &self.path) + .field("file", &self.file.is_some()) + .field("positioned", &self.positioned) + .field("index", &self.index) + .field("nreads", &self.nreads) + .finish() + } } #[derive(Debug)] @@ -198,18 +214,6 @@ pub struct OpenedFileSet { pub files: Vec, } -impl fmt::Debug for OpenedFile { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("OpenedFile") - .field("path", &self.path) - .field("file", &self.file) - .field("positioned", &self.positioned) - .field("index", &self.index) - .field("nreads", &self.nreads) - .finish() - } -} - pub fn open_files( range: &NanoRange, fetch_info: &SfChFetchInfo, @@ -299,7 +303,7 @@ pub fn open_expanded_files( Ok(_) => {} Err(e) => { // To be expected - debug!("open_files channel send error {:?}", e); + debug!("open_expanded_files channel send error {:?}", e); } }, } @@ -345,18 +349,19 @@ async fn open_expanded_files_inner( ) -> Result<(), Error> { let fetch_info = fetch_info.clone(); let timebins = get_timebins(&fetch_info, node.clone()).await?; + debug!("timebins {timebins:?}"); if timebins.len() == 0 { return Ok(()); } let mut p1 = None; - for (i1, tb) in timebins.iter().enumerate().rev() { + for (i, tb) in timebins.iter().enumerate().rev() { let ts_bin = TsNano::from_ns(tb * fetch_info.bs().ns()); if ts_bin.ns() <= range.beg { - p1 = Some(i1); + p1 = Some(i); break; } } - let mut p1 = if let Some(i1) = p1 { i1 } else { 0 }; + let mut p1 = if let Some(i) = p1 { i } else { 0 }; if p1 >= timebins.len() { return Err(Error::with_msg(format!( "logic error p1 {} range {:?} fetch_info {:?}", @@ -370,9 +375,11 @@ async fn open_expanded_files_inner( for path in paths::datapaths_for_timebin(tb, &fetch_info, &node).await? { let w = position_file(&path, range, true, false).await?; if w.found { - debug!("----- open_expanded_files_inner w.found for {:?}", path); + debug!("----- open_expanded_files_inner FOUND tb {:?} path {:?}", tb, path); a.push(w.file); found_pre = true; + } else { + debug!("----- open_expanded_files_inner UNFND tb {:?} path {:?}", tb, path); } } let h = OpenedFileSet { timebin: tb, files: a }; diff --git a/crates/disk/src/eventchunker.rs b/crates/disk/src/eventchunker.rs index 6f3edcf..288e29f 100644 --- a/crates/disk/src/eventchunker.rs +++ b/crates/disk/src/eventchunker.rs @@ -73,7 +73,6 @@ pub struct EventChunker { node_ix: usize, dbg_path: PathBuf, last_ts: u64, - expand: bool, item_len_emit_histo: HistoLog2, seen_before_range_count: usize, seen_after_range_count: usize, @@ -144,7 +143,6 @@ impl EventChunker { stats_conf: EventChunkerConf, node_ix: usize, dbg_path: PathBuf, - expand: bool, ) -> Self { debug!("{}::{} node {}", Self::self_name(), "from_start", node_ix); let need_min_max = match fetch_info.shape() { @@ -172,7 +170,6 @@ impl EventChunker { dbg_path, node_ix, last_ts: 0, - expand, item_len_emit_histo: HistoLog2::new(0), seen_before_range_count: 0, seen_after_range_count: 0, @@ -188,7 +185,6 @@ impl EventChunker { } } - // TODO `expand` flag usage pub fn from_event_boundary( inp: Pin> + Send>>, fetch_info: SfChFetchInfo, @@ -196,10 +192,9 @@ impl EventChunker { stats_conf: EventChunkerConf, node_ix: usize, dbg_path: PathBuf, - expand: bool, ) -> Self { debug!("{}::{} node {}", Self::self_name(), "from_event_boundary", node_ix); - let mut ret = Self::from_start(inp, fetch_info, range, stats_conf, node_ix, dbg_path, expand); + let mut ret = Self::from_start(inp, fetch_info, range, stats_conf, node_ix, dbg_path); ret.state = DataFileState::Event; ret.need_min = 4; ret.inp.set_need_min(4); @@ -324,15 +319,15 @@ impl EventChunker { discard = true; self.discard_count_range += 1; self.seen_after_range_count += 1; - if !self.expand || self.seen_after_range_count >= 2 { - self.seen_beyond_range = true; + self.seen_beyond_range = true; + if self.seen_after_range_count >= 2 { self.data_emit_complete = true; break; } } if ts < self.range.beg { - discard = true; - self.discard_count_range += 1; + // discard = true; + // self.discard_count_range += 1; self.seen_before_range_count += 1; if self.seen_before_range_count < 20 { let msg = format!( @@ -349,6 +344,8 @@ impl EventChunker { self.dbg_path ); warn!("{}", msg); + let item = LogItem::from_node(self.node_ix, Level::INFO, msg); + self.log_items.push_back(item); } if self.seen_before_range_count > 100 { let msg = format!( @@ -474,13 +471,17 @@ impl EventChunker { shape_this, comp_this, ); - match ret.shape_derived(ret.len() - 1, self.fetch_info.shape()) { + match ret.shape_derived( + ret.len() - 1, + self.fetch_info.scalar_type(), + self.fetch_info.shape(), + ) { Ok(sh) => { if sh.ne(self.fetch_info.shape()) { self.discard_count_shape_derived += 1; ret.pop_back(); let msg = format!( - "shape_derived mismatch {:?} {:?} {:?}", + "EventChunker shape_derived mismatch {:?} {:?} {:?}", self.fetch_info.scalar_type(), self.fetch_info.shape(), sh, @@ -493,7 +494,7 @@ impl EventChunker { self.discard_count_shape_derived_err += 1; ret.pop_back(); let msg = format!( - "shape_derived error {} {:?} {:?}", + "EventChunker shape_derived error {} {:?} {:?}", e, self.fetch_info.scalar_type(), self.fetch_info.shape(), diff --git a/crates/disk/src/eventchunkermultifile.rs b/crates/disk/src/eventchunkermultifile.rs index 934a2a0..764c5e3 100644 --- a/crates/disk/src/eventchunkermultifile.rs +++ b/crates/disk/src/eventchunkermultifile.rs @@ -39,7 +39,7 @@ pub struct EventChunkerMultifile { range: NanoRange, files_count: u32, node_ix: usize, - expand: bool, + one_before: bool, max_ts: u64, out_max_len: usize, emit_count: usize, @@ -64,12 +64,12 @@ impl EventChunkerMultifile { node_ix: usize, disk_io_tune: DiskIoTune, event_chunker_conf: EventChunkerConf, - expand: bool, + one_before: bool, out_max_len: usize, reqctx: ReqCtxArc, ) -> Self { - debug!("EventChunkerMultifile expand {expand}"); - let file_chan = if expand { + debug!("EventChunkerMultifile one_before {one_before}"); + let file_chan = if one_before { open_expanded_files(&range, &fetch_info, node) } else { open_files(&range, &fetch_info, reqctx.reqid(), node) @@ -83,7 +83,7 @@ impl EventChunkerMultifile { range, files_count: 0, node_ix, - expand, + one_before, max_ts: 0, out_max_len, emit_count: 0, @@ -129,6 +129,9 @@ impl Stream for EventChunkerMultifile { if h.len() > 0 { let min = h.tss.iter().fold(u64::MAX, |a, &x| a.min(x)); let max = h.tss.iter().fold(u64::MIN, |a, &x| a.max(x)); + if min < self.range.beg() { + debug!("ITEM BEFORE RANGE (how many?)"); + } if min <= self.max_ts { let msg = format!("EventChunkerMultifile repeated or unordered ts {}", min); error!("{}", msg); @@ -180,13 +183,19 @@ impl Stream for EventChunkerMultifile { None => match self.file_chan.poll_next_unpin(cx) { Ready(Some(k)) => match k { Ok(ofs) => { + let msg = format!("received files for timebin {:?}", ofs.timebin); + let item = LogItem::from_node(self.node_ix, Level::INFO, msg); + self.log_queue.push_back(item); + for e in &ofs.files { + let msg = format!("file {:?}", e); + let item = LogItem::from_node(self.node_ix, Level::INFO, msg); + self.log_queue.push_back(item); + } self.files_count += ofs.files.len() as u32; if ofs.files.len() == 1 { let mut ofs = ofs; let file = ofs.files.pop().unwrap(); let path = file.path; - let msg = format!("use opened files {:?}", ofs); - let item = LogItem::from_node(self.node_ix, Level::DEBUG, msg); match file.file { Some(file) => { let inp = Box::pin(crate::file_content_stream( @@ -202,22 +211,19 @@ impl Stream for EventChunkerMultifile { self.event_chunker_conf.clone(), self.node_ix, path.clone(), - self.expand, ); - let filtered = RangeFilter2::new(chunker, self.range.clone(), self.expand); + let filtered = + RangeFilter2::new(chunker, self.range.clone(), self.one_before); self.evs = Some(Box::pin(filtered)); } None => {} } - Ready(Some(Ok(StreamItem::Log(item)))) + continue; } else if ofs.files.len() == 0 { let msg = format!("use opened files {:?} no files", ofs); let item = LogItem::from_node(self.node_ix, Level::DEBUG, msg); Ready(Some(Ok(StreamItem::Log(item)))) } else { - // let paths: Vec<_> = ofs.files.iter().map(|x| &x.path).collect(); - let msg = format!("use opened files {:?} locally merged", ofs); - let item = LogItem::from_node(self.node_ix, Level::DEBUG, msg); let mut chunkers = Vec::new(); for of in ofs.files { if let Some(file) = of.file { @@ -234,14 +240,15 @@ impl Stream for EventChunkerMultifile { self.event_chunker_conf.clone(), self.node_ix, of.path.clone(), - self.expand, ); chunkers.push(Box::pin(chunker) as _); } } let merged = Merger::new(chunkers, Some(self.out_max_len as u32)); - let filtered = RangeFilter2::new(merged, self.range.clone(), self.expand); + let filtered = RangeFilter2::new(merged, self.range.clone(), self.one_before); self.evs = Some(Box::pin(filtered)); + let msg = format!("LOCALLY MERGED"); + let item = LogItem::from_node(self.node_ix, Level::DEBUG, msg); Ready(Some(Ok(StreamItem::Log(item)))) } } diff --git a/crates/disk/src/eventfilter.rs b/crates/disk/src/eventfilter.rs index 7d7b7a3..f043525 100644 --- a/crates/disk/src/eventfilter.rs +++ b/crates/disk/src/eventfilter.rs @@ -6,6 +6,7 @@ use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::WithLen; use items_2::eventfull::EventFull; +use netpod::ScalarType; use netpod::Shape; use std::collections::VecDeque; use std::pin::Pin; @@ -15,6 +16,7 @@ use tracing::Level; pub struct EventFullShapeFilter { inp: INP, + scalar_type_exp: ScalarType, shape_exp: Shape, node_ix: usize, log_items: VecDeque, @@ -25,11 +27,14 @@ impl EventFullShapeFilter { let node_ix = self.node_ix; let p: Vec<_> = (0..item.len()) .map(|i| { - let sh = item.shape_derived(i, &self.shape_exp); + let sh = item.shape_derived(i, &self.scalar_type_exp, &self.shape_exp); match sh { Ok(sh) => { if sh.ne(&self.shape_exp) { - let msg = format!("shape_derived mismatch {:?} {:?}", sh, self.shape_exp); + let msg = format!( + "EventFullShapeFilter shape_derived mismatch {:?} {:?}", + sh, self.shape_exp + ); let item = LogItem::from_node(node_ix, Level::WARN, msg); self.log_items.push_back(item); false @@ -38,7 +43,10 @@ impl EventFullShapeFilter { } } Err(_) => { - let msg = format!("shape_derived mismatch {:?} {:?}", sh, self.shape_exp); + let msg = format!( + "EventFullShapeFilter shape_derived mismatch {:?} {:?}", + sh, self.shape_exp + ); let item = LogItem::from_node(self.node_ix, Level::WARN, msg); self.log_items.push_back(item); false diff --git a/crates/disk/src/raw/conn.rs b/crates/disk/src/raw/conn.rs index 5f3fee0..29832dd 100644 --- a/crates/disk/src/raw/conn.rs +++ b/crates/disk/src/raw/conn.rs @@ -62,11 +62,8 @@ pub async fn make_event_pipe( ) -> Result> + Send>>, Error> { // sf-databuffer type backends identify channels by their (backend, name) only. let range = evq.range().clone(); - let one_before = evq.transform().need_one_before_range(); - info!( - "make_event_pipe need_expand {need_expand} {evq:?}", - need_expand = one_before - ); + let one_before = evq.need_one_before_range(); + info!("make_event_pipe one_before {one_before} {evq:?}"); let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024)); // TODO should not need this for correctness. // Should limit based on return size and latency. @@ -94,7 +91,7 @@ pub async fn make_event_pipe( pub fn make_event_blobs_stream( range: NanoRange, fetch_info: SfChFetchInfo, - expand: bool, + one_before: bool, event_chunker_conf: EventChunkerConf, disk_io_tune: DiskIoTune, reqctx: ReqCtxArc, @@ -115,7 +112,7 @@ pub fn make_event_blobs_stream( ncc.ix, disk_io_tune, event_chunker_conf, - expand, + one_before, out_max_len, reqctx, ); @@ -128,13 +125,13 @@ pub fn make_event_blobs_pipe_real( reqctx: ReqCtxArc, ncc: &NodeConfigCached, ) -> Result> + Send>>, Error> { - let expand = subq.transform().need_one_before_range(); + let one_before = subq.need_one_before_range(); let range = subq.range(); let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024)); let event_blobs = make_event_blobs_stream( range.try_into()?, fetch_info.clone(), - expand, + one_before, event_chunker_conf, subq.disk_io_tune(), reqctx, diff --git a/crates/dq/src/bin/dq.rs b/crates/dq/src/bin/dq.rs index 80bdcfa..a6bf664 100644 --- a/crates/dq/src/bin/dq.rs +++ b/crates/dq/src/bin/dq.rs @@ -94,7 +94,7 @@ pub fn main() -> Result<(), Error> { let stats_conf = EventChunkerConf { disk_stats_every: ByteSize::from_mb(2), }; - let _chunks = EventChunker::from_start(inp, fetch_info, range, stats_conf, 0, path.clone(), true); + let _chunks = EventChunker::from_start(inp, fetch_info, range, stats_conf, 0, path.clone()); err::todo(); Ok(()) } diff --git a/crates/items_0/src/items_0.rs b/crates/items_0/src/items_0.rs index aeff1c4..9161d06 100644 --- a/crates/items_0/src/items_0.rs +++ b/crates/items_0/src/items_0.rs @@ -9,6 +9,7 @@ pub mod subfr; pub mod test; pub mod timebin; pub mod transform; +pub mod vecpreview; pub mod bincode { pub use bincode::*; @@ -21,6 +22,7 @@ use container::ByteEstimate; use std::any::Any; use std::collections::VecDeque; use std::fmt; +use timebin::BinningggContainerEventsDyn; use timebin::TimeBinnable; pub trait WithLen { @@ -158,6 +160,7 @@ pub trait Events: fn clear(&mut self); // TODO: can not name EventsDim0 from here, so use trait object for now. Anyway is a workaround. fn to_dim0_f32_for_binning(&self) -> Box; + fn to_container_events(&self) -> Box; } impl WithLen for Box { @@ -296,4 +299,8 @@ impl Events for Box { fn to_dim0_f32_for_binning(&self) -> Box { Events::to_dim0_f32_for_binning(self.as_ref()) } + + fn to_container_events(&self) -> Box { + Events::to_container_events(self.as_ref()) + } } diff --git a/crates/items_0/src/timebin.rs b/crates/items_0/src/timebin.rs index 0d15004..52ce431 100644 --- a/crates/items_0/src/timebin.rs +++ b/crates/items_0/src/timebin.rs @@ -4,18 +4,24 @@ use crate::collect_s::Collectable; use crate::collect_s::Collector; use crate::collect_s::ToJsonResult; use crate::overlap::RangeOverlapInfo; +use crate::vecpreview::PreviewRange; use crate::AsAnyMut; use crate::AsAnyRef; +use crate::Empty; use crate::Events; use crate::Resettable; use crate::TypeName; use crate::WithLen; +use err::thiserror; use err::Error; +use err::ThisError; use netpod::log::*; use netpod::range::evrange::SeriesRange; use netpod::BinnedRange; use netpod::BinnedRangeEnum; use netpod::TsNano; +use serde::Deserialize; +use serde::Serialize; use std::any::Any; use std::fmt; use std::ops::Range; @@ -64,10 +70,20 @@ pub trait TimeBinnableTy: fmt::Debug + WithLen + Send + Sized { ) -> Self::TimeBinner; } +// #[derive(Debug, ThisError)] +// #[cstm(name = "Binninggg")] pub enum BinningggError { Dyn(Box), } +impl fmt::Display for BinningggError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + BinningggError::Dyn(e) => write!(fmt, "{e}"), + } + } +} + impl From for BinningggError where E: std::error::Error + 'static, @@ -77,11 +93,21 @@ where } } -pub trait BinningggContainerEventsDyn: fmt::Debug { +pub trait BinningggContainerEventsDyn: fmt::Debug + Send { fn binned_events_timeweight_traitobj(&self) -> Box; } -pub trait BinningggContainerBinsDyn: fmt::Debug {} +pub trait BinningggContainerBinsDyn: fmt::Debug + Send + fmt::Display + WithLen { + fn empty(&self) -> BinsBoxed; + fn clone(&self) -> BinsBoxed; + fn edges_iter( + &self, + ) -> std::iter::Zip, std::collections::vec_deque::Iter>; + fn drain_into(&mut self, dst: &mut dyn BinningggContainerBinsDyn, range: Range); + fn to_old_time_binned(&self) -> Box; +} + +pub type BinsBoxed = Box; pub trait BinningggBinnerTy: fmt::Debug + Send { type Input: fmt::Debug; diff --git a/crates/items_0/src/vecpreview.rs b/crates/items_0/src/vecpreview.rs new file mode 100644 index 0000000..74f23ec --- /dev/null +++ b/crates/items_0/src/vecpreview.rs @@ -0,0 +1,53 @@ +use core::fmt; +use std::collections::VecDeque; + +pub struct PreviewCell<'a, T> { + pub a: Option<&'a T>, + pub b: Option<&'a T>, +} + +impl<'a, T> fmt::Debug for PreviewCell<'a, T> +where + T: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match (self.a.as_ref(), self.b.as_ref()) { + (Some(a), Some(b)) => write!(fmt, "{:?} .. {:?}", a, b), + (Some(a), None) => write!(fmt, "{:?}", a), + _ => write!(fmt, "(empty)"), + } + } +} + +pub trait PreviewRange { + fn preview<'a>(&'a self) -> Box; +} + +impl PreviewRange for VecDeque +where + T: fmt::Debug, +{ + fn preview<'a>(&'a self) -> Box { + let ret = PreviewCell { + a: self.front(), + b: if self.len() <= 1 { None } else { self.back() }, + }; + Box::new(ret) + } +} + +pub struct VecPreview<'a> { + c: &'a dyn PreviewRange, +} + +impl<'a> VecPreview<'a> { + pub fn new(c: &'a dyn PreviewRange) -> Self { + Self { c } + } +} + +impl<'a> fmt::Debug for VecPreview<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{:?}", self.c.preview()) + } +} diff --git a/crates/items_2/src/binning/container_bins.rs b/crates/items_2/src/binning/container_bins.rs index 5bc1e12..f1b2e0b 100644 --- a/crates/items_2/src/binning/container_bins.rs +++ b/crates/items_2/src/binning/container_bins.rs @@ -2,11 +2,10 @@ use super::aggregator::AggregatorNumeric; use super::aggregator::AggregatorTimeWeight; use super::container_events::EventValueType; use super::___; -use crate::vecpreview::PreviewRange; -use crate::vecpreview::VecPreview; use core::fmt; use err::thiserror; use err::ThisError; +use items_0::vecpreview::VecPreview; use netpod::TsNano; use serde::Deserialize; use serde::Serialize; @@ -114,6 +113,28 @@ impl ContainerBins where EVT: EventValueType, { + pub fn from_constituents( + ts1s: VecDeque, + ts2s: VecDeque, + cnts: VecDeque, + mins: VecDeque, + maxs: VecDeque, + avgs: VecDeque, + lsts: VecDeque, + fnls: VecDeque, + ) -> Self { + Self { + ts1s, + ts2s, + cnts, + mins, + maxs, + avgs, + lsts, + fnls, + } + } + pub fn type_name() -> &'static str { any::type_name::() } @@ -153,6 +174,14 @@ where self.ts2s.back().map(|&x| x) } + pub fn ts1s_iter(&self) -> std::collections::vec_deque::Iter { + self.ts1s.iter() + } + + pub fn ts2s_iter(&self) -> std::collections::vec_deque::Iter { + self.ts2s.iter() + } + pub fn cnts_iter(&self) -> std::collections::vec_deque::Iter { self.cnts.iter() } @@ -165,6 +194,50 @@ where self.maxs.iter() } + pub fn avgs_iter(&self) -> std::collections::vec_deque::Iter { + self.avgs.iter() + } + + pub fn fnls_iter(&self) -> std::collections::vec_deque::Iter { + self.fnls.iter() + } + + pub fn zip_iter( + &self, + ) -> std::iter::Zip< + std::iter::Zip< + std::iter::Zip< + std::iter::Zip< + std::iter::Zip< + std::iter::Zip< + std::collections::vec_deque::Iter, + std::collections::vec_deque::Iter, + >, + std::collections::vec_deque::Iter, + >, + std::collections::vec_deque::Iter, + >, + std::collections::vec_deque::Iter, + >, + std::collections::vec_deque::Iter, + >, + std::collections::vec_deque::Iter, + > { + self.ts1s_iter() + .zip(self.ts2s_iter()) + .zip(self.cnts_iter()) + .zip(self.mins_iter()) + .zip(self.maxs_iter()) + .zip(self.avgs_iter()) + .zip(self.fnls_iter()) + } + + pub fn edges_iter( + &self, + ) -> std::iter::Zip, std::collections::vec_deque::Iter> { + self.ts1s.iter().zip(self.ts2s.iter()) + } + pub fn len_before(&self, end: TsNano) -> usize { let pp = self.ts2s.partition_point(|&x| x <= end); assert!(pp <= self.len(), "len_before pp {} len {}", pp, self.len()); diff --git a/crates/items_2/src/binning/container_events.rs b/crates/items_2/src/binning/container_events.rs index 3d8fc6c..9763359 100644 --- a/crates/items_2/src/binning/container_events.rs +++ b/crates/items_2/src/binning/container_events.rs @@ -2,11 +2,12 @@ use super::aggregator::AggTimeWeightOutputAvg; use super::aggregator::AggregatorNumeric; use super::aggregator::AggregatorTimeWeight; use super::___; -use crate::vecpreview::PreviewRange; -use crate::vecpreview::VecPreview; use core::fmt; use err::thiserror; use err::ThisError; +use items_0::timebin::BinningggContainerEventsDyn; +use items_0::vecpreview::PreviewRange; +use items_0::vecpreview::VecPreview; use netpod::TsNano; use serde::Deserialize; use serde::Serialize; @@ -20,7 +21,7 @@ macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } #[cstm(name = "ValueContainerError")] pub enum ValueContainerError {} -pub trait Container: fmt::Debug + Clone + PreviewRange + Serialize + for<'a> Deserialize<'a> { +pub trait Container: fmt::Debug + Send + Clone + PreviewRange + Serialize + for<'a> Deserialize<'a> { fn new() -> Self; // fn verify(&self) -> Result<(), ValueContainerError>; fn push_back(&mut self, val: EVT); @@ -120,6 +121,10 @@ impl ContainerEvents where EVT: EventValueType, { + pub fn from_constituents(tss: VecDeque, vals: ::Container) -> Self { + Self { tss, vals } + } + pub fn type_name() -> &'static str { any::type_name::() } @@ -233,3 +238,12 @@ where } } } + +impl BinningggContainerEventsDyn for ContainerEvents +where + EVT: EventValueType, +{ + fn binned_events_timeweight_traitobj(&self) -> Box { + todo!() + } +} diff --git a/crates/items_2/src/binning/timeweight.rs b/crates/items_2/src/binning/timeweight.rs index b9791c9..8015202 100644 --- a/crates/items_2/src/binning/timeweight.rs +++ b/crates/items_2/src/binning/timeweight.rs @@ -1,4 +1,5 @@ pub mod timeweight_bins; +pub mod timeweight_bins_dyn; pub mod timeweight_events; pub mod timeweight_events_dyn; diff --git a/crates/items_2/src/binning/timeweight/timeweight_bins_dyn.rs b/crates/items_2/src/binning/timeweight/timeweight_bins_dyn.rs new file mode 100644 index 0000000..1b65daa --- /dev/null +++ b/crates/items_2/src/binning/timeweight/timeweight_bins_dyn.rs @@ -0,0 +1,27 @@ +use futures_util::Stream; +use items_0::streamitem::Sitemty; +use items_0::timebin::BinningggContainerBinsDyn; +use netpod::BinnedRange; +use netpod::TsNano; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +pub struct BinnedBinsTimeweightStream {} + +impl BinnedBinsTimeweightStream { + pub fn new( + range: BinnedRange, + inp: Pin>> + Send>>, + ) -> Self { + todo!() + } +} + +impl Stream for BinnedBinsTimeweightStream { + type Item = Sitemty>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + todo!() + } +} diff --git a/crates/items_2/src/binning/timeweight/timeweight_events.rs b/crates/items_2/src/binning/timeweight/timeweight_events.rs index a5f8c75..b0601b6 100644 --- a/crates/items_2/src/binning/timeweight/timeweight_events.rs +++ b/crates/items_2/src/binning/timeweight/timeweight_events.rs @@ -19,6 +19,7 @@ use netpod::DtNano; use netpod::TsNano; use std::collections::VecDeque; use std::marker::PhantomData; +use std::mem; use std::pin::Pin; use std::task::Context; use std::task::Poll; @@ -382,6 +383,7 @@ where range: BinnedRange, inner_a: InnerA, out: ContainerBins, + produce_cnt_zero: bool, } impl fmt::Debug for BinnedEventsTimeweight @@ -422,9 +424,16 @@ where }, lst: None, out: ContainerBins::new(), + produce_cnt_zero: true, } } + pub fn disable_cnt_zero(self) -> Self { + let mut ret = self; + ret.produce_cnt_zero = false; + ret + } + fn ingest_event_without_lst(&mut self, ev: EventSingle) -> Result<(), Error> { let b = &self.inner_a.inner_b; if ev.ts >= b.active_end { @@ -485,10 +494,26 @@ where let div = b.active_len.ns(); if let Some(lst) = self.lst.as_ref() { let lst = LstRef(lst); - let mut i = 0; - loop { - i += 1; - assert!(i < 100000, "too many iterations"); + if self.produce_cnt_zero { + let mut i = 0; + loop { + i += 1; + assert!(i < 100000, "too many iterations"); + let b = &self.inner_a.inner_b; + if ts > b.filled_until { + if ts >= b.active_end { + if b.filled_until < b.active_end { + self.inner_a.inner_b.fill_until(b.active_end, lst.clone()); + } + self.inner_a.push_out_and_reset(lst.clone(), true, &mut self.out); + } else { + self.inner_a.inner_b.fill_until(ts, lst.clone()); + } + } else { + break; + } + } + } else { let b = &self.inner_a.inner_b; if ts > b.filled_until { if ts >= b.active_end { @@ -497,13 +522,29 @@ where } self.inner_a.push_out_and_reset(lst.clone(), true, &mut self.out); } else { + // TODO should not hit this case. Prove it, assert it. self.inner_a.inner_b.fill_until(ts, lst.clone()); } } else { - break; + // TODO should never hit this case. Count. } + + // TODO jump to next bin + // TODO merge with the other reset + // Below uses the same code + let ts1 = TsNano::from_ns(ts.ns() / div * div); + let b = &mut self.inner_a.inner_b; + b.active_beg = ts1; + b.active_end = ts1.add_dt_nano(b.active_len); + b.filled_until = ts1; + b.filled_width = DtNano::from_ns(0); + b.cnt = 0; + b.agg.reset_for_new_bin(); + // assert!(self.inner_a.minmax.is_none()); + trace_cycle!("cycled direct to {:?} {:?}", b.active_beg, b.active_end); } } else { + assert!(self.inner_a.minmax.is_none()); // TODO merge with the other reset let ts1 = TsNano::from_ns(ts.ns() / div * div); let b = &mut self.inner_a.inner_b; @@ -513,7 +554,6 @@ where b.filled_width = DtNano::from_ns(0); b.cnt = 0; b.agg.reset_for_new_bin(); - assert!(self.inner_a.minmax.is_none()); trace_cycle!("cycled direct to {:?} {:?}", b.active_beg, b.active_end); } } @@ -594,6 +634,6 @@ where } pub fn output(&mut self) -> ContainerBins { - ::core::mem::replace(&mut self.out, ContainerBins::new()) + mem::replace(&mut self.out, ContainerBins::new()) } } diff --git a/crates/items_2/src/binning/timeweight/timeweight_events_dyn.rs b/crates/items_2/src/binning/timeweight/timeweight_events_dyn.rs index 3d30552..d94f8c3 100644 --- a/crates/items_2/src/binning/timeweight/timeweight_events_dyn.rs +++ b/crates/items_2/src/binning/timeweight/timeweight_events_dyn.rs @@ -1,9 +1,11 @@ use super::timeweight_events::BinnedEventsTimeweight; +use crate::binning::container_bins::ContainerBins; use crate::binning::container_events::EventValueType; use crate::channelevents::ChannelEvents; use err::thiserror; use err::ThisError; use futures_util::Stream; +use futures_util::StreamExt; use items_0::streamitem::Sitemty; use items_0::timebin::BinnedEventsTimeweightTrait; use items_0::timebin::BinningggBinnerDyn; @@ -12,6 +14,8 @@ use items_0::timebin::BinningggContainerEventsDyn; use items_0::timebin::BinningggError; use netpod::BinnedRange; use netpod::TsNano; +use std::arch::x86_64; +use std::ops::ControlFlow; use std::pin::Pin; use std::task::Context; use std::task::Poll; @@ -68,18 +72,6 @@ where } } -pub struct BinnedEventsTimeweightStream { - inp: Pin> + Send>>, -} - -impl Stream for BinnedEventsTimeweightStream { - type Item = (); - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - todo!() - } -} - #[derive(Debug)] pub struct BinnedEventsTimeweightLazy { range: BinnedRange, @@ -114,3 +106,109 @@ impl BinnedEventsTimeweightTrait for BinnedEventsTimeweightLazy { todo!() } } + +enum StreamState { + Reading, + Done, +} + +pub struct BinnedEventsTimeweightStream { + state: StreamState, + inp: Pin> + Send>>, + binned_events: BinnedEventsTimeweightLazy, + range_complete: bool, +} + +impl BinnedEventsTimeweightStream { + pub fn new(range: BinnedRange, inp: Pin> + Send>>) -> Self { + Self { + state: StreamState::Reading, + inp, + binned_events: BinnedEventsTimeweightLazy::new(range), + range_complete: false, + } + } + + fn handle_sitemty( + mut self: Pin<&mut Self>, + item: Sitemty, + cx: &mut Context, + ) -> ControlFlow::Item>>> { + use items_0::streamitem::RangeCompletableItem::*; + use items_0::streamitem::StreamItem::*; + use ControlFlow::*; + use Poll::*; + match item { + Ok(x) => match x { + DataItem(x) => match x { + Data(x) => match x { + ChannelEvents::Events(evs) => match self.binned_events.ingest(evs.to_container_events()) { + Ok(()) => { + match self.binned_events.output() { + Ok(x) => { + if x.len() == 0 { + Continue(()) + } else { + Break(Ready(Some(Ok(DataItem(Data(x)))))) + } + } + Err(e) => Break(Ready(Some(Err(::err::Error::from_string(e))))), + } + // Continue(()) + } + Err(e) => Break(Ready(Some(Err(::err::Error::from_string(e))))), + }, + ChannelEvents::Status(_) => { + // TODO use the status + Continue(()) + } + }, + RangeComplete => { + self.range_complete = true; + Continue(()) + } + }, + Log(x) => Break(Ready(Some(Ok(Log(x))))), + Stats(x) => Break(Ready(Some(Ok(Stats(x))))), + }, + Err(e) => { + self.state = StreamState::Done; + Break(Ready(Some(Err(e)))) + } + } + } + + fn handle_eos(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll::Item>> { + use items_0::streamitem::RangeCompletableItem::*; + use items_0::streamitem::StreamItem::*; + use Poll::*; + if self.range_complete { + self.binned_events.input_done_range_final(); + } else { + self.binned_events.input_done_range_open(); + } + match self.binned_events.output() { + Ok(x) => Ready(Some(Ok(DataItem(Data(x))))), + Err(e) => Ready(Some(Err(::err::Error::from_string(e)))), + } + } +} + +impl Stream for BinnedEventsTimeweightStream { + type Item = Sitemty>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use ControlFlow::*; + use Poll::*; + loop { + break match self.as_mut().inp.poll_next_unpin(cx) { + Ready(Some(x)) => match self.as_mut().handle_sitemty(x, cx) { + Continue(()) => continue, + Break(x) => x, + }, + Ready(None) => self.handle_eos(cx), + Pending => Pending, + }; + } + } +} diff --git a/crates/items_2/src/binning/valuetype.rs b/crates/items_2/src/binning/valuetype.rs index aec2e29..5e8bf91 100644 --- a/crates/items_2/src/binning/valuetype.rs +++ b/crates/items_2/src/binning/valuetype.rs @@ -2,8 +2,8 @@ use super::aggregator::AggregatorTimeWeight; use super::binnedvaluetype::BinnedNumericValue; use super::container_events::Container; use super::container_events::EventValueType; -use crate::vecpreview::PreviewRange; use core::fmt; +use items_0::vecpreview::PreviewRange; use netpod::DtNano; use netpod::EnumVariant; use serde::Deserialize; @@ -18,7 +18,7 @@ pub struct EnumVariantContainer { impl PreviewRange for EnumVariantContainer { fn preview<'a>(&'a self) -> Box { - let ret = crate::vecpreview::PreviewCell { + let ret = items_0::vecpreview::PreviewCell { a: self.ixs.front(), b: self.ixs.back(), }; diff --git a/crates/items_2/src/binsdim0.rs b/crates/items_2/src/binsdim0.rs index fecf27d..46f1a5e 100644 --- a/crates/items_2/src/binsdim0.rs +++ b/crates/items_2/src/binsdim0.rs @@ -2,7 +2,6 @@ use crate::timebin::TimeBinnerCommonV0Func; use crate::timebin::TimeBinnerCommonV0Trait; use crate::ts_offs_from_abs; use crate::ts_offs_from_abs_with_anchor; -use crate::vecpreview::VecPreview; use crate::IsoDateTime; use crate::RangeOverlapInfo; use crate::TimeBinnableType; @@ -25,6 +24,7 @@ use items_0::timebin::TimeBinned; use items_0::timebin::TimeBinner; use items_0::timebin::TimeBinnerTy; use items_0::timebin::TimeBins; +use items_0::vecpreview::VecPreview; use items_0::AppendAllFrom; use items_0::AppendEmptyBin; use items_0::AsAnyMut; @@ -134,6 +134,10 @@ where impl BinsDim0 { pub fn push(&mut self, ts1: u64, ts2: u64, count: u64, min: NTY, max: NTY, avg: f32, lst: NTY) { + if avg < min.as_prim_f32_b() || avg > max.as_prim_f32_b() { + // TODO rounding issues? + debug!("bad avg"); + } self.ts1s.push_back(ts1); self.ts2s.push_back(ts2); self.cnts.push_back(count); @@ -534,12 +538,19 @@ where if self.cnt == 0 && !push_empty { self.reset_agg(); } else { + let min = self.min.clone(); + let max = self.max.clone(); + let avg = self.avg as f32; + if avg < min.as_prim_f32_b() || avg > max.as_prim_f32_b() { + // TODO rounding issues? + debug!("bad avg"); + } self.out.ts1s.push_back(self.ts1now.ns()); self.out.ts2s.push_back(self.ts2now.ns()); self.out.cnts.push_back(self.cnt); - self.out.mins.push_back(self.min.clone()); - self.out.maxs.push_back(self.max.clone()); - self.out.avgs.push_back(self.avg as f32); + self.out.mins.push_back(min); + self.out.maxs.push_back(max); + self.out.avgs.push_back(avg); self.out.lsts.push_back(self.lst.clone()); self.reset_agg(); } diff --git a/crates/items_2/src/channelevents.rs b/crates/items_2/src/channelevents.rs index 8ff50c0..b48b57a 100644 --- a/crates/items_2/src/channelevents.rs +++ b/crates/items_2/src/channelevents.rs @@ -1023,6 +1023,10 @@ impl Events for ChannelEvents { Status(x) => panic!("ChannelEvents::to_dim0_f32_for_binning"), } } + + fn to_container_events(&self) -> Box { + panic!("should not get used") + } } impl Collectable for ChannelEvents { diff --git a/crates/items_2/src/eventfull.rs b/crates/items_2/src/eventfull.rs index af599b5..e9bcbf6 100644 --- a/crates/items_2/src/eventfull.rs +++ b/crates/items_2/src/eventfull.rs @@ -270,6 +270,7 @@ pub enum DecompError { UnusedBytes, BitshuffleError, ShapeMakesNoSense, + UnexpectedCompressedScalarValue, } fn decompress(databuf: &[u8], type_size: u32) -> Result, DecompError> { @@ -325,10 +326,18 @@ impl EventFull { /// but we still don't know whether that's an image or a waveform. /// Therefore, the function accepts the expected shape to at least make an assumption /// about whether this is an image or a waveform. - pub fn shape_derived(&self, i: usize, shape_exp: &Shape) -> Result { + pub fn shape_derived( + &self, + i: usize, + scalar_type_exp: &ScalarType, + shape_exp: &Shape, + ) -> Result { match shape_exp { Shape::Scalar => match &self.comps[i] { - Some(_) => Err(DecompError::ShapeMakesNoSense), + Some(_) => match scalar_type_exp { + ScalarType::STRING => Ok(Shape::Scalar), + _ => Err(DecompError::UnexpectedCompressedScalarValue), + }, None => Ok(Shape::Scalar), }, Shape::Wave(_) => match &self.shapes[i] { diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index cd76288..70d4347 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -760,6 +760,31 @@ impl EventsDim0Aggregator { } else { lst.as_prim_f32_b() }; + let max = if min > max { + // TODO count + debug!("min > max"); + min.clone() + } else { + max + }; + let avg = { + let g = min.as_prim_f32_b(); + if avg < g { + debug!("avg < min"); + g + } else { + avg + } + }; + let avg = { + let g = max.as_prim_f32_b(); + if avg > g { + debug!("avg > max"); + g + } else { + avg + } + }; let ret = if self.range.is_time() { BinsDim0 { ts1s: [range_beg].into(), @@ -1078,6 +1103,14 @@ impl Events for EventsDim0 { } Box::new(ret) } + + fn to_container_events(&self) -> Box { + // let tss = self.tss.iter().map(|&x| TsNano::from_ns(x)).collect(); + // let vals = self.values.clone(); + // let ret = crate::binning::container_events::ContainerEvents::from_constituents(tss, vals); + // Box::new(ret) + todo!() + } } #[derive(Debug)] diff --git a/crates/items_2/src/eventsdim0enum.rs b/crates/items_2/src/eventsdim0enum.rs index 837abfa..f3c59cc 100644 --- a/crates/items_2/src/eventsdim0enum.rs +++ b/crates/items_2/src/eventsdim0enum.rs @@ -499,4 +499,8 @@ impl Events for EventsDim0Enum { fn to_dim0_f32_for_binning(&self) -> Box { todo!("{}::to_dim0_f32_for_binning", self.type_name()) } + + fn to_container_events(&self) -> Box { + todo!("{}::to_container_events", self.type_name()) + } } diff --git a/crates/items_2/src/eventsdim1.rs b/crates/items_2/src/eventsdim1.rs index 8e20753..11ec1fa 100644 --- a/crates/items_2/src/eventsdim1.rs +++ b/crates/items_2/src/eventsdim1.rs @@ -993,6 +993,10 @@ impl Events for EventsDim1 { fn to_dim0_f32_for_binning(&self) -> Box { todo!("{}::to_dim0_f32_for_binning", self.type_name()) } + + fn to_container_events(&self) -> Box { + todo!("{}::to_container_events", self.type_name()) + } } #[derive(Debug)] diff --git a/crates/items_2/src/eventsxbindim0.rs b/crates/items_2/src/eventsxbindim0.rs index 30ca169..accfaea 100644 --- a/crates/items_2/src/eventsxbindim0.rs +++ b/crates/items_2/src/eventsxbindim0.rs @@ -381,6 +381,10 @@ impl Events for EventsXbinDim0 { fn to_dim0_f32_for_binning(&self) -> Box { todo!("{}::to_dim0_f32_for_binning", self.type_name()) } + + fn to_container_events(&self) -> Box { + todo!("{}::to_container_events", self.type_name()) + } } #[derive(Debug)] diff --git a/crates/items_2/src/vecpreview.rs b/crates/items_2/src/vecpreview.rs index 74f23ec..8b13789 100644 --- a/crates/items_2/src/vecpreview.rs +++ b/crates/items_2/src/vecpreview.rs @@ -1,53 +1 @@ -use core::fmt; -use std::collections::VecDeque; -pub struct PreviewCell<'a, T> { - pub a: Option<&'a T>, - pub b: Option<&'a T>, -} - -impl<'a, T> fmt::Debug for PreviewCell<'a, T> -where - T: fmt::Debug, -{ - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - match (self.a.as_ref(), self.b.as_ref()) { - (Some(a), Some(b)) => write!(fmt, "{:?} .. {:?}", a, b), - (Some(a), None) => write!(fmt, "{:?}", a), - _ => write!(fmt, "(empty)"), - } - } -} - -pub trait PreviewRange { - fn preview<'a>(&'a self) -> Box; -} - -impl PreviewRange for VecDeque -where - T: fmt::Debug, -{ - fn preview<'a>(&'a self) -> Box { - let ret = PreviewCell { - a: self.front(), - b: if self.len() <= 1 { None } else { self.back() }, - }; - Box::new(ret) - } -} - -pub struct VecPreview<'a> { - c: &'a dyn PreviewRange, -} - -impl<'a> VecPreview<'a> { - pub fn new(c: &'a dyn PreviewRange) -> Self { - Self { c } - } -} - -impl<'a> fmt::Debug for VecPreview<'a> { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "{:?}", self.c.preview()) - } -} diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index d363e5a..b1a350d 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -279,7 +279,7 @@ pub fn events_parse_input_query(frames: Vec) -> Result<(EventsSub }, Err(e) => return Err(e.into()), }; - info!("parsing json {:?}", qitem.str()); + trace!("parsing json {:?}", qitem.str()); let frame1: Frame1Parts = serde_json::from_str(&qitem.str()).map_err(|_e| { let e = Error::BadQuery; error!("{e}"); diff --git a/crates/query/src/api4/events.rs b/crates/query/src/api4/events.rs index 89560ff..17977cc 100644 --- a/crates/query/src/api4/events.rs +++ b/crates/query/src/api4/events.rs @@ -537,7 +537,7 @@ impl EventsSubQuery { } pub fn need_one_before_range(&self) -> bool { - self.select.one_before_range + self.select.one_before_range | self.transform().need_one_before_range() } pub fn transform(&self) -> &TransformQuery { diff --git a/crates/scyllaconn/src/bincache.rs b/crates/scyllaconn/src/bincache.rs index 78ede96..b7a2120 100644 --- a/crates/scyllaconn/src/bincache.rs +++ b/crates/scyllaconn/src/bincache.rs @@ -7,6 +7,7 @@ use err::Error; use futures_util::Future; use futures_util::Stream; use futures_util::StreamExt; +use items_0::timebin::BinsBoxed; use items_0::timebin::TimeBinned; use items_0::Empty; use items_2::binsdim0::BinsDim0; @@ -227,12 +228,14 @@ impl streams::timebin::CacheReadProvider for ScyllaCacheReadProvider { offs: Range, ) -> streams::timebin::cached::reader::CacheReading { let scyqueue = self.scyqueue.clone(); - let fut = async move { scyqueue.read_cache_f32(series, bin_len, msp, offs).await }; + // let fut = async move { scyqueue.read_cache_f32(series, bin_len, msp, offs).await }; + let fut = async { todo!("TODO impl scylla cache read") }; streams::timebin::cached::reader::CacheReading::new(Box::pin(fut)) } - fn write(&self, series: u64, bins: BinsDim0) -> streams::timebin::cached::reader::CacheWriting { + fn write(&self, series: u64, bins: BinsBoxed) -> streams::timebin::cached::reader::CacheWriting { let scyqueue = self.scyqueue.clone(); + let bins = todo!("TODO impl scylla cache write"); let fut = async move { scyqueue.write_cache_f32(series, bins).await }; streams::timebin::cached::reader::CacheWriting::new(Box::pin(fut)) } diff --git a/crates/streams/src/generators.rs b/crates/streams/src/generators.rs index e9c341e..60392f5 100644 --- a/crates/streams/src/generators.rs +++ b/crates/streams/src/generators.rs @@ -85,7 +85,7 @@ fn make_test_channel_events_stream_data_inner( debug!("use test backend data"); let chn = subq.name(); let range = subq.range().clone(); - let one_before = subq.transform().need_one_before_range(); + let one_before = subq.need_one_before_range(); if chn == "test-gen-i32-dim0-v00" { Ok(Box::pin(GenerateI32V00::new(node_ix, node_count, range, one_before))) } else if chn == "test-gen-i32-dim0-v01" { diff --git a/crates/streams/src/rangefilter2.rs b/crates/streams/src/rangefilter2.rs index beb9620..50f16cd 100644 --- a/crates/streams/src/rangefilter2.rs +++ b/crates/streams/src/rangefilter2.rs @@ -39,7 +39,7 @@ where inp: S, range: NanoRange, range_str: String, - one_before_range: bool, + one_before: bool, stats: RangeFilterStats, slot1: Option, have_range_complete: bool, @@ -59,20 +59,20 @@ where std::any::type_name::() } - pub fn new(inp: S, range: NanoRange, one_before_range: bool) -> Self { + pub fn new(inp: S, range: NanoRange, one_before: bool) -> Self { let trdet = false; trace_emit!( trdet, - "{}::new range: {:?} one_before_range {:?}", + "{}::new range: {:?} one_before {:?}", Self::type_name(), range, - one_before_range + one_before ); Self { inp, range_str: format!("{:?}", range), range, - one_before_range, + one_before, stats: RangeFilterStats::new(), slot1: None, have_range_complete: false, @@ -116,6 +116,11 @@ where } fn handle_item(&mut self, item: ITY) -> Result { + if let Some(ts_min) = item.ts_min() { + if ts_min < self.range.beg() { + debug!("ITEM BEFORE RANGE (how many?)"); + } + } let min = item.ts_min().map(|x| TsNano::from_ns(x).fmt()); let max = item.ts_max().map(|x| TsNano::from_ns(x).fmt()); trace_emit!( @@ -126,7 +131,7 @@ where max ); let mut item = self.prune_high(item, self.range.end)?; - let ret = if self.one_before_range { + let ret = if self.one_before { let lige = item.find_lowest_index_ge(self.range.beg); trace_emit!(self.trdet, "YES one_before_range ilge {:?}", lige); match lige { diff --git a/crates/streams/src/timebin/cached/reader.rs b/crates/streams/src/timebin/cached/reader.rs index af30fe6..c0fb17c 100644 --- a/crates/streams/src/timebin/cached/reader.rs +++ b/crates/streams/src/timebin/cached/reader.rs @@ -5,8 +5,7 @@ use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::Sitemty; -use items_0::timebin::TimeBinnable; -use items_2::binsdim0::BinsDim0; +use items_0::timebin::BinsBoxed; use items_2::channelevents::ChannelEvents; use netpod::log::*; use netpod::BinnedRange; @@ -55,19 +54,19 @@ pub trait EventsReadProvider: Send + Sync { } pub struct CacheReading { - fut: Pin, streams::timebin::cached::reader::Error>> + Send>>, + fut: Pin> + Send>>, } impl CacheReading { pub fn new( - fut: Pin, streams::timebin::cached::reader::Error>> + Send>>, + fut: Pin> + Send>>, ) -> Self { Self { fut } } } impl Future for CacheReading { - type Output = Result, streams::timebin::cached::reader::Error>; + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { self.fut.poll_unpin(cx) @@ -94,7 +93,7 @@ impl Future for CacheWriting { pub trait CacheReadProvider: Send + Sync { fn read(&self, series: u64, bin_len: DtMs, msp: u64, offs: Range) -> CacheReading; - fn write(&self, series: u64, bins: BinsDim0) -> CacheWriting; + fn write(&self, series: u64, bins: BinsBoxed) -> CacheWriting; } #[derive(Debug, ThisError)] @@ -112,7 +111,7 @@ pub struct CachedReader { ts1next: TsNano, bin_len: DtMs, cache_read_provider: Arc, - reading: Option, Error>> + Send>>>, + reading: Option> + Send>>>, } impl CachedReader { @@ -134,7 +133,7 @@ impl CachedReader { } impl Stream for CachedReader { - type Item = Result, Error>; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -151,7 +150,6 @@ impl Stream for CachedReader { self.reading = None; match x { Ok(bins) => { - use items_0::WithLen; trace_emit!( "- - - - - - - - - - - - emit cached bins {} bin_len {}", bins.len(), diff --git a/crates/streams/src/timebin/fromevents.rs b/crates/streams/src/timebin/fromevents.rs index 8a05d75..3dc81c1 100644 --- a/crates/streams/src/timebin/fromevents.rs +++ b/crates/streams/src/timebin/fromevents.rs @@ -6,8 +6,7 @@ use futures_util::StreamExt; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; -use items_2::binsdim0::BinsDim0; -use items_2::channelevents::ChannelEvents; +use items_0::timebin::BinsBoxed; use netpod::log::*; use netpod::BinnedRange; use netpod::ChConf; @@ -26,7 +25,7 @@ macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } pub enum Error {} pub struct BinnedFromEvents { - stream: Pin>> + Send>>, + stream: Pin> + Send>>, } impl BinnedFromEvents { @@ -41,33 +40,28 @@ impl BinnedFromEvents { panic!(); } let stream = read_provider.read(evq, chconf); - let stream = stream.map(|x| { - let x = items_0::try_map_sitemty_data!(x, |x| match x { - ChannelEvents::Events(x) => { - let x = x.to_dim0_f32_for_binning(); - Ok(ChannelEvents::Events(x)) - } - ChannelEvents::Status(x) => Ok(ChannelEvents::Status(x)), - }); - x - }); - let stream = Box::pin(stream); - let stream = super::basic::TimeBinnedStream::new(stream, netpod::BinnedRangeEnum::Time(range), do_time_weight); + // let stream = stream.map(|x| { + // let x = items_0::try_map_sitemty_data!(x, |x| match x { + // ChannelEvents::Events(x) => { + // let x = x.to_dim0_f32_for_binning(); + // Ok(ChannelEvents::Events(x)) + // } + // ChannelEvents::Status(x) => Ok(ChannelEvents::Status(x)), + // }); + // x + // }); + let stream = if do_time_weight { + let stream = Box::pin(stream); + items_2::binning::timeweight::timeweight_events_dyn::BinnedEventsTimeweightStream::new(range, stream) + } else { + panic!("non-weighted TODO") + }; let stream = stream.map(|item| match item { Ok(x) => match x { StreamItem::DataItem(x) => match x { - RangeCompletableItem::Data(mut x) => { - // TODO need a typed time binner - if let Some(x) = x.as_any_mut().downcast_mut::>() { - let y = x.clone(); - use items_0::WithLen; - trace_emit!("=========== ========= emit from events {}", y.len()); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(y))) - } else { - Err(::err::Error::with_msg_no_trace( - "GapFill expects incoming BinsDim0", - )) - } + RangeCompletableItem::Data(x) => { + debug!("see item {:?}", x); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) } RangeCompletableItem::RangeComplete => { info!("BinnedFromEvents sees range final"); @@ -87,7 +81,7 @@ impl BinnedFromEvents { } impl Stream for BinnedFromEvents { - type Item = Sitemty>; + type Item = Sitemty; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { self.stream.poll_next_unpin(cx) diff --git a/crates/streams/src/timebin/fromlayers.rs b/crates/streams/src/timebin/fromlayers.rs index 81d053a..f952787 100644 --- a/crates/streams/src/timebin/fromlayers.rs +++ b/crates/streams/src/timebin/fromlayers.rs @@ -12,7 +12,10 @@ use items_0::on_sitemty_data; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; +use items_0::timebin::BinningggContainerBinsDyn; +use items_0::timebin::BinsBoxed; use items_0::timebin::TimeBinnableTy; +use items_2::binning::timeweight::timeweight_bins_dyn::BinnedBinsTimeweightStream; use items_2::binsdim0::BinsDim0; use netpod::log::*; use netpod::query::CacheUsage; @@ -44,7 +47,7 @@ pub enum Error { FinerGridMismatch(DtMs, DtMs), } -type BoxedInput = Pin>> + Send>>; +type BoxedInput = Pin> + Send>>; pub struct TimeBinnedFromLayers { ch_conf: ChannelTypeConfigGen, @@ -141,11 +144,7 @@ impl TimeBinnedFromLayers { cache_read_provider, events_read_provider.clone(), )?; - let inp = super::basic::TimeBinnedStream::new( - Box::pin(inp), - BinnedRangeEnum::Time(range), - do_time_weight, - ); + let inp = BinnedBinsTimeweightStream::new(range, Box::pin(inp)); let ret = Self { ch_conf, cache_usage, @@ -200,7 +199,7 @@ impl TimeBinnedFromLayers { } impl Stream for TimeBinnedFromLayers { - type Item = Sitemty>; + type Item = Sitemty; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; diff --git a/crates/streams/src/timebin/gapfill.rs b/crates/streams/src/timebin/gapfill.rs index 12d6324..d2c7d32 100644 --- a/crates/streams/src/timebin/gapfill.rs +++ b/crates/streams/src/timebin/gapfill.rs @@ -9,9 +9,7 @@ use futures_util::StreamExt; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; -use items_0::Empty; -use items_0::WithLen; -use items_2::binsdim0::BinsDim0; +use items_0::timebin::BinsBoxed; use netpod::log::*; use netpod::query::CacheUsage; use netpod::range::evrange::NanoRange; @@ -56,7 +54,7 @@ pub enum Error { EventsReader(#[from] super::fromevents::Error), } -type INP = Pin>> + Send>>; +type Input = Pin> + Send>>; // Try to read from cache for the given bin len. // For gaps in the stream, construct an alternative input from finer bin len with a binner. @@ -72,10 +70,10 @@ pub struct GapFill { range: BinnedRange, do_time_weight: bool, bin_len_layers: Vec, - inp: Option, + inp: Option, inp_range_final: bool, - inp_buf: Option>, - inp_finer: Option, + inp_buf: Option, + inp_finer: Option, inp_finer_range_final: bool, inp_finer_range_final_cnt: u32, inp_finer_range_final_max: u32, @@ -84,7 +82,7 @@ pub struct GapFill { exp_finer_range: NanoRange, cache_read_provider: Arc, events_read_provider: Arc, - bins_for_cache_write: BinsDim0, + bins_for_cache_write: Option, done: bool, cache_writing: Option, } @@ -114,7 +112,7 @@ impl GapFill { Ok(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))), Err(e) => Err(::err::Error::from_string(e)), }); - Box::pin(stream) as Pin>> + Send>> + Box::pin(stream) as Pin> + Send>> } else { let stream = futures_util::stream::empty(); Box::pin(stream) @@ -144,36 +142,33 @@ impl GapFill { exp_finer_range: NanoRange { beg: 0, end: 0 }, cache_read_provider, events_read_provider, - bins_for_cache_write: BinsDim0::empty(), + bins_for_cache_write: None, done: false, cache_writing: None, }; Ok(ret) } - fn handle_bins_finer(mut self: Pin<&mut Self>, bins: BinsDim0) -> Result, Error> { + fn handle_bins_finer(mut self: Pin<&mut Self>, bins: BinsBoxed) -> Result { trace_handle!("{} handle_bins_finer {}", self.dbgname, bins); - for (&ts1, &ts2) in bins.ts1s.iter().zip(&bins.ts2s) { + for (&ts1, &ts2) in bins.edges_iter() { if let Some(last) = self.last_bin_ts2 { - if ts1 != last.ns() { - return Err(Error::GapFromFiner( - TsNano::from_ns(ts1), - last, - self.range.bin_len_dt_ms(), - )); + if ts1 != last { + return Err(Error::GapFromFiner(ts1, last, self.range.bin_len_dt_ms())); } - } else if ts1 != self.range.nano_beg().ns() { + } else if ts1 != self.range.nano_beg() { return Err(Error::MissingBegFromFiner( - TsNano::from_ns(ts1), + ts1, self.range.nano_beg(), self.range.bin_len_dt_ms(), )); } - self.last_bin_ts2 = Some(TsNano::from_ns(ts2)); + self.last_bin_ts2 = Some(ts2); } if bins.len() != 0 { let mut bins2 = bins.clone(); - bins2.drain_into(&mut self.bins_for_cache_write, 0..bins2.len()); + let dst = self.bins_for_cache_write.get_or_insert_with(|| bins.empty()); + bins2.drain_into(dst.as_mut(), 0..bins2.len()); } if self.cache_usage.is_cache_write() { self.cache_write_intermediate()?; @@ -191,34 +186,34 @@ impl GapFill { Ok(()) } - fn handle_bins(mut self: Pin<&mut Self>, bins: BinsDim0) -> Result, Error> { + fn handle_bins(mut self: Pin<&mut Self>, bins: BinsBoxed) -> Result { trace_handle!("{} handle_bins {}", self.dbgname, bins); // TODO could use an interface to iterate over opaque bin items that only expose // edge and count information with all remaining values opaque. - for (i, (&ts1, &ts2)) in bins.ts1s.iter().zip(&bins.ts2s).enumerate() { - if ts1 < self.range.nano_beg().ns() { + for (i, (&ts1, &ts2)) in bins.edges_iter().enumerate() { + if ts1 < self.range.nano_beg() { return Err(Error::InputBeforeRange( - NanoRange::from_ns_u64(ts1, ts2), + NanoRange::from_ns_u64(ts1.ns(), ts2.ns()), self.range.clone(), )); } if let Some(last) = self.last_bin_ts2 { - if ts1 != last.ns() { + if ts1 != last { trace_handle!("{} detect a gap BETWEEN last {} ts1 {}", self.dbgname, last, ts1); - let mut ret = as items_0::Empty>::empty(); + let mut ret = bins.empty(); let mut bins = bins; - bins.drain_into(&mut ret, 0..i); + bins.drain_into(ret.as_mut(), 0..i); self.inp_buf = Some(bins); let range = NanoRange { beg: last.ns(), - end: ts1, + end: ts1.ns(), }; self.setup_sub(range)?; return Ok(ret); } else { // nothing to do } - } else if ts1 != self.range.nano_beg().ns() { + } else if ts1 != self.range.nano_beg() { trace_handle!( "{} detect a gap BEGIN beg {} ts1 {}", self.dbgname, @@ -227,12 +222,12 @@ impl GapFill { ); let range = NanoRange { beg: self.range.nano_beg().ns(), - end: ts1, + end: ts1.ns(), }; self.setup_sub(range)?; - return Ok(BinsDim0::empty()); + return Ok(bins.empty()); } - self.last_bin_ts2 = Some(TsNano::from_ns(ts2)); + self.last_bin_ts2 = Some(ts2); } Ok(bins) } @@ -270,10 +265,12 @@ impl GapFill { self.events_read_provider.clone(), )?; let stream = Box::pin(inp_finer); - let do_time_weight = self.do_time_weight; let range = BinnedRange::from_nano_range(range_finer.full_range(), self.range.bin_len.to_dt_ms()); - let stream = - super::basic::TimeBinnedStream::new(stream, netpod::BinnedRangeEnum::Time(range), do_time_weight); + let stream = if self.do_time_weight { + ::items_2::binning::timeweight::timeweight_bins_dyn::BinnedBinsTimeweightStream::new(range, stream) + } else { + panic!("TODO unweighted") + }; self.inp_finer = Some(Box::pin(stream)); } else { debug_setup!("{} setup_inp_finer next finer from events {}", self.dbgname, range); @@ -309,7 +306,7 @@ impl GapFill { Ok(()) } - fn cache_write(mut self: Pin<&mut Self>, bins: BinsDim0) -> Result<(), Error> { + fn cache_write(mut self: Pin<&mut Self>, bins: BinsBoxed) -> Result<(), Error> { self.cache_writing = Some(self.cache_read_provider.write(self.series, bins)); Ok(()) } @@ -318,42 +315,27 @@ impl GapFill { if self.inp_finer_fills_gap { // TODO can consider all incoming bins as final by assumption. } - let aa = &self.bins_for_cache_write; - if aa.len() >= 2 { - for (i, (&c1, &_c2)) in aa.cnts.iter().rev().zip(aa.cnts.iter().rev().skip(1)).enumerate() { - if c1 != 0 { - let n = aa.len() - (1 + i); - debug_cache!("{} cache_write_on_end consider {} for write", self.dbgname, n); - let mut bins_write = BinsDim0::empty(); - self.bins_for_cache_write.drain_into(&mut bins_write, 0..n); - self.cache_write(bins_write)?; - break; - } + if let Some(bins) = &self.bins_for_cache_write { + if bins.len() >= 2 { + // TODO guard behind flag. + // TODO emit to a async user-given channel, if given. + // Therefore, move to poll loop. + // Should only write to cache with non-zero count, therefore, not even emit others? + // TODO afterwards set to None. + self.bins_for_cache_write = None; } } Ok(()) } - fn cache_write_intermediate(mut self: Pin<&mut Self>) -> Result<(), Error> { - let aa = &self.bins_for_cache_write; - if aa.len() >= 2 { - for (i, (&c1, &_c2)) in aa.cnts.iter().rev().zip(aa.cnts.iter().rev().skip(1)).enumerate() { - if c1 != 0 { - let n = aa.len() - (1 + i); - debug_cache!("{} cache_write_intermediate consider {} for write", self.dbgname, n); - let mut bins_write = BinsDim0::empty(); - self.bins_for_cache_write.drain_into(&mut bins_write, 0..n); - self.cache_write(bins_write)?; - break; - } - } - } + fn cache_write_intermediate(self: Pin<&mut Self>) -> Result<(), Error> { + // TODO See cache_write_on_end Ok(()) } } impl Stream for GapFill { - type Item = Sitemty>; + type Item = Sitemty; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index ae3bb45..6ba5478 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -361,9 +361,9 @@ async fn timebinned_stream( ) .map_err(Error::from_string)?; let stream = stream.map(|item| { - on_sitemty_data!(item, |k| Ok(StreamItem::DataItem(RangeCompletableItem::Data( - Box::new(k) as Box - )))) + on_sitemty_data!(item, |k: items_0::timebin::BinsBoxed| Ok(StreamItem::DataItem( + RangeCompletableItem::Data(k.to_old_time_binned()) + ))) }); let stream: Pin>> + Send>> = Box::pin(stream); Ok(stream)