From 4694f987583038a6364b98fbfe35540eab0f302d Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 8 Feb 2023 16:53:18 +0100 Subject: [PATCH] WIP query options --- disk/src/dataopen.rs | 18 +- disk/src/decode.rs | 5 + disk/src/eventblobs.rs | 331 +++++++++++++------------ disk/src/merge.rs | 3 +- dq/src/bin/dq.rs | 12 +- httpret/src/api1.rs | 6 +- httpret/src/api4/events.rs | 8 +- items/src/eventfull.rs | 38 ++- items/src/items.rs | 20 -- items_0/src/items_0.rs | 14 +- items_2/src/eventsdim0.rs | 70 +++++- items_2/src/eventsdim1.rs | 21 +- items_2/src/eventsxbindim0.rs | 23 +- items_2/src/items_2.rs | 24 +- items_2/src/merger.rs | 2 +- netpod/src/query.rs | 15 +- streams/src/collect.rs | 2 +- streams/src/eventchunker.rs | 35 ++- streams/src/frames/eventsfromframes.rs | 10 +- streams/src/lib.rs | 1 + streams/src/plaineventsjson.rs | 20 +- streams/src/rangefilter2.rs | 258 +++++++++++++++++++ 22 files changed, 664 insertions(+), 272 deletions(-) create mode 100644 streams/src/rangefilter2.rs diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index f599545..b6c6f1e 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -215,7 +215,8 @@ pub fn open_files( match chtx.send(Err(e.into())).await { Ok(_) => {} Err(e) => { - error!("open_files channel send error {:?}", e); + // This case is fine. + debug!("open_files channel send error {:?}", e); } } } @@ -245,7 +246,7 @@ async fn open_files_inner( if ts_bin.ns + channel_config.time_bin_size.ns <= range.beg { continue; } - let mut a = vec![]; + let mut a = Vec::new(); for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? { let w = position_file(&path, range, false, false).await?; if w.found { @@ -281,7 +282,8 @@ pub fn open_expanded_files( Err(e) => match chtx.send(Err(e.into())).await { Ok(_) => {} Err(e) => { - error!("open_files channel send error {:?}", e); + // To be expected + debug!("open_files channel send error {:?}", e); } }, } @@ -290,7 +292,7 @@ pub fn open_expanded_files( } async fn get_timebins(channel_config: &ChannelConfig, node: Node) -> Result, Error> { - let mut timebins = vec![]; + let mut timebins = Vec::new(); let p0 = paths::channel_timebins_dir_path(&channel_config, &node)?; match tokio::fs::read_dir(&p0).await { Ok(rd) => { @@ -317,7 +319,7 @@ async fn get_timebins(channel_config: &ChannelConfig, node: Node) -> Result, + range_final: bool, + done: bool, + done_emit_range_final: bool, + complete: bool, } impl EventChunkerMultifile { @@ -57,7 +58,7 @@ impl EventChunkerMultifile { expand: bool, do_decompress: bool, ) -> Self { - info!("EventChunkerMultifile do_decompress {do_decompress}"); + info!("EventChunkerMultifile expand {expand} do_decompress {do_decompress}"); let file_chan = if expand { open_expanded_files(&range, &channel_config, node) } else { @@ -70,9 +71,6 @@ impl EventChunkerMultifile { event_chunker_conf, channel_config, range, - data_completed: false, - errored: false, - completed: false, files_count: 0, node_ix, expand, @@ -80,6 +78,10 @@ impl EventChunkerMultifile { max_ts: 0, emit_count: 0, do_emit_err_after: None, + range_final: false, + done: false, + done_emit_range_final: false, + complete: false, } } } @@ -91,170 +93,171 @@ impl Stream for EventChunkerMultifile { //tracing::field::DebugValue; let span1 = span!(Level::INFO, "EventChunkerMultifile", node_ix = self.node_ix); //span1.record("node_ix", &self.node_ix); - span1.in_scope(|| { - use Poll::*; - 'outer: loop { - break if self.completed { - panic!("EventChunkerMultifile poll_next on completed"); - } else if 
self.errored { - self.completed = true; - return Ready(None); - } else if self.data_completed { - self.completed = true; - return Ready(None); + let _spg = span1.enter(); + use Poll::*; + 'outer: loop { + break if self.complete { + panic!("EventChunkerMultifile poll_next on complete"); + } else if self.done_emit_range_final { + self.complete = true; + Ready(None) + } else if self.done { + self.done_emit_range_final = true; + if self.range_final { + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) } else { - match &mut self.evs { - Some(evs) => match evs.poll_next_unpin(cx) { - Ready(Some(k)) => { - if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(h))) = &k { - if let Some(&g) = h.tss.back() { - if g == self.max_ts { - let msg = format!("EventChunkerMultifile repeated ts {}", g); - error!("{}", msg); - let e = Error::with_msg(msg); - self.errored = true; - Ready(Some(Err(e))) - } else if g < self.max_ts { - let msg = format!("EventChunkerMultifile unordered ts {}", g); - error!("{}", msg); - let e = Error::with_msg(msg); - self.errored = true; - Ready(Some(Err(e))) - } else { - self.max_ts = g; - if let Some(after) = self.do_emit_err_after { - if self.emit_count < after { - debug!( - "EventChunkerMultifile emit {}/{} events {}", - self.emit_count, - after, - h.tss.len() - ); - self.emit_count += 1; - Ready(Some(k)) - } else { - warn!("GENERATE ERROR FOR TESTING PURPOSE"); - let e = Error::with_msg(format!("Private-error-message")); - let e = e.add_public_msg(format!("Public-error-message")); - Ready(Some(Err(e))) - } - } else { - Ready(Some(k)) - } - } - } else { - Ready(Some(k)) + continue; + } + } else { + match &mut self.evs { + Some(evs) => match evs.poll_next_unpin(cx) { + Ready(Some(Ok(k))) => { + let k = if let StreamItem::DataItem(RangeCompletableItem::Data(h)) = k { + let mut h: EventFull = h; + if h.tss.len() > 0 { + let min = h.tss.iter().fold(u64::MAX, |a, &x| a.min(x)); + let max = h.tss.iter().fold(u64::MIN, |a, &x| a.max(x)); + if min <= self.max_ts { + let msg = format!("EventChunkerMultifile repeated or unordered ts {}", min); + error!("{}", msg); } - } else if let Err(_) = &k { - self.errored = true; - Ready(Some(k)) + self.max_ts = max; + if let Some(after) = self.do_emit_err_after { + if self.emit_count < after { + debug!( + "EventChunkerMultifile emit {}/{} events {}", + self.emit_count, + after, + h.tss.len() + ); + self.emit_count += 1; + } + } + if max >= self.range.end { + info!("REACHED RANGE END, TRUNCATE"); + info!("{:20} ... 
{:20}", self.range.beg, self.range.end); + self.range_final = true; + h.truncate_ts(self.range.end); + self.evs = None; + let (tx, rx) = async_channel::bounded(1); + drop(tx); + self.file_chan = rx; + } + } + StreamItem::DataItem(RangeCompletableItem::Data(h)) + } else { + k + }; + Ready(Some(Ok(k))) + } + Ready(Some(Err(e))) => { + error!("{e}"); + self.done = true; + Ready(Some(Err(e))) + } + Ready(None) => { + self.evs = None; + continue 'outer; + } + Pending => Pending, + }, + None => match self.file_chan.poll_next_unpin(cx) { + Ready(Some(k)) => match k { + Ok(ofs) => { + self.files_count += ofs.files.len() as u32; + if ofs.files.len() == 1 { + let mut ofs = ofs; + let file = ofs.files.pop().unwrap(); + let path = file.path; + let msg = format!("handle OFS {:?}", ofs); + debug!("{}", msg); + let item = LogItem::quick(Level::INFO, msg); + match file.file { + Some(file) => { + let inp = Box::pin(crate::file_content_stream( + path.clone(), + file, + self.disk_io_tune.clone(), + )); + let chunker = EventChunker::from_event_boundary( + inp, + self.channel_config.clone(), + self.range.clone(), + self.event_chunker_conf.clone(), + path.clone(), + self.expand, + self.do_decompress, + format!("{:?}", path), + ); + let filtered = RangeFilter::new(chunker, self.range.clone(), self.expand); + self.evs = Some(Box::pin(filtered)); + } + None => {} + } + Ready(Some(Ok(StreamItem::Log(item)))) + } else if ofs.files.len() == 0 { + let msg = format!("handle OFS {:?} NO FILES", ofs); + debug!("{}", msg); + let item = LogItem::quick(Level::INFO, msg); + Ready(Some(Ok(StreamItem::Log(item)))) } else { - Ready(Some(k)) - } - } - Ready(None) => { - self.evs = None; - continue 'outer; - } - Pending => Pending, - }, - None => match self.file_chan.poll_next_unpin(cx) { - Ready(Some(k)) => match k { - Ok(ofs) => { - self.files_count += ofs.files.len() as u32; - if ofs.files.len() == 1 { - let mut ofs = ofs; - let file = ofs.files.pop().unwrap(); - let path = file.path; - let msg = format!("handle OFS {:?}", ofs); - debug!("{}", msg); - let item = LogItem::quick(Level::INFO, msg); - match file.file { - Some(file) => { - let inp = Box::pin(crate::file_content_stream( - path.clone(), - file, - self.disk_io_tune.clone(), - )); - let chunker = EventChunker::from_event_boundary( - inp, - self.channel_config.clone(), - self.range.clone(), - self.event_chunker_conf.clone(), - path, - self.expand, - self.do_decompress, - ); - let filtered = - RangeFilter::new(chunker, self.range.clone(), self.expand); - self.evs = Some(Box::pin(filtered)); - } - None => {} - } - Ready(Some(Ok(StreamItem::Log(item)))) - } else if ofs.files.len() == 0 { - let msg = format!("handle OFS {:?} NO FILES", ofs); - debug!("{}", msg); - let item = LogItem::quick(Level::INFO, msg); - Ready(Some(Ok(StreamItem::Log(item)))) - } else { - let msg = format!("handle OFS MERGED timebin {}", ofs.timebin); - info!("{}", msg); - for x in &ofs.files { - info!(" path {:?}", x.path); - } - let item = LogItem::quick(Level::INFO, msg); - let mut chunkers = Vec::new(); - for of in ofs.files { - if let Some(file) = of.file { - let inp = crate::file_content_stream( - of.path.clone(), - file, - self.disk_io_tune.clone(), - ); - let chunker = EventChunker::from_event_boundary( - inp, - self.channel_config.clone(), - self.range.clone(), - self.event_chunker_conf.clone(), - of.path, - self.expand, - self.do_decompress, - ); - chunkers.push(chunker); - } - } - let merged = MergedStream::new(chunkers); - let filtered = RangeFilter::new(merged, self.range.clone(), 
self.expand); - self.evs = Some(Box::pin(filtered)); - Ready(Some(Ok(StreamItem::Log(item)))) + let msg = format!("handle OFS MERGED timebin {}", ofs.timebin); + info!("{}", msg); + for x in &ofs.files { + info!(" path {:?}", x.path); } + let item = LogItem::quick(Level::INFO, msg); + let mut chunkers = Vec::new(); + for of in ofs.files { + if let Some(file) = of.file { + let inp = crate::file_content_stream( + of.path.clone(), + file, + self.disk_io_tune.clone(), + ); + let chunker = EventChunker::from_event_boundary( + inp, + self.channel_config.clone(), + self.range.clone(), + self.event_chunker_conf.clone(), + of.path.clone(), + self.expand, + self.do_decompress, + format!("{:?}", of.path), + ); + chunkers.push(chunker); + } + } + let merged = MergedStream::new(chunkers); + let filtered = RangeFilter::new(merged, self.range.clone(), self.expand); + self.evs = Some(Box::pin(filtered)); + Ready(Some(Ok(StreamItem::Log(item)))) } - Err(e) => { - self.errored = true; - Ready(Some(Err(e))) - } - }, - Ready(None) => { - self.data_completed = true; - let item = LogItem::quick( - Level::INFO, - format!( - "EventChunkerMultifile used {} datafiles beg {} end {} node_ix {}", - self.files_count, - self.range.beg / SEC, - self.range.end / SEC, - self.node_ix - ), - ); - Ready(Some(Ok(StreamItem::Log(item)))) } - Pending => Pending, + Err(e) => { + self.done = true; + Ready(Some(Err(e))) + } }, - } - }; - } - }) + Ready(None) => { + self.done = true; + let item = LogItem::quick( + Level::INFO, + format!( + "EventChunkerMultifile used {} datafiles beg {} end {} node_ix {}", + self.files_count, + self.range.beg / SEC, + self.range.end / SEC, + self.node_ix + ), + ); + Ready(Some(Ok(StreamItem::Log(item)))) + } + Pending => Pending, + }, + } + }; + } } } diff --git a/disk/src/merge.rs b/disk/src/merge.rs index d8521cb..97885ad 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -387,9 +387,10 @@ mod test { channel_config, range.clone(), stats_conf, - dbg_path, + dbg_path.clone(), expand, do_decompress, + format!("{:?}", dbg_path), ); chunker }) diff --git a/dq/src/bin/dq.rs b/dq/src/bin/dq.rs index 89e9b98..6519600 100644 --- a/dq/src/bin/dq.rs +++ b/dq/src/bin/dq.rs @@ -101,8 +101,16 @@ pub fn main() -> Result<(), Error> { let stats_conf = EventChunkerConf { disk_stats_every: ByteSize::mb(2), }; - let _chunks = - EventChunker::from_start(inp, channel_config.clone(), range, stats_conf, path, false, true); + let _chunks = EventChunker::from_start( + inp, + channel_config.clone(), + range, + stats_conf, + path.clone(), + false, + true, + format!("{:?}", path), + ); err::todo(); Ok(()) } diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 75191a0..1025d93 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -706,12 +706,14 @@ impl DataApiPython3DataStream { } match &b.shapes[i1] { _ => { - let l1 = 17 + b.blobs[i1].len() as u32; + let empty_blob=Vec::new(); + let blob = b.blobs[i1].as_ref().unwrap_or(&empty_blob); + let l1 = 17 + blob.len() as u32; d.put_u32(l1); d.put_u8(1); d.put_u64(b.tss[i1]); d.put_u64(b.pulses[i1]); - d.put_slice(&b.blobs[i1]); + d.put_slice(&blob); d.put_u32(l1); } } diff --git a/httpret/src/api4/events.rs b/httpret/src/api4/events.rs index 663a568..75b798b 100644 --- a/httpret/src/api4/events.rs +++ b/httpret/src/api4/events.rs @@ -72,9 +72,10 @@ async fn plain_events_binary( req: Request, node_config: &NodeConfigCached, ) -> Result, Error> { - debug!("httpret plain_events_binary req: {:?}", req); + debug!("plain_events_binary req: {:?}", req); let query = 
PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?;
     let chconf = chconf_from_events_v1(&query, node_config).await?;
+    info!("plain_events_binary chconf_from_events_v1: {chconf:?}");
     // Update the series id since we don't require some unique identifier yet.
     let mut query = query;
     query.set_series_id(chconf.series);
@@ -94,11 +95,12 @@ async fn plain_events_json(
     req: Request<Body>,
     node_config: &NodeConfigCached,
 ) -> Result<Response<Body>, Error> {
-    info!("httpret plain_events_json req: {:?}", req);
+    info!("plain_events_json req: {:?}", req);
     let (_head, _body) = req.into_parts();
     let query = PlainEventsQuery::from_url(&url)?;
-    info!("plain_events_json query {query:?}");
+    info!("plain_events_json query {query:?}");
     let chconf = chconf_from_events_v1(&query, node_config).await.map_err(Error::from)?;
+    info!("plain_events_json chconf_from_events_v1: {chconf:?}");
     // Update the series id since we don't require some unique identifier yet.
     let mut query = query;
     query.set_series_id(chconf.series);
diff --git a/items/src/eventfull.rs b/items/src/eventfull.rs
index f1ff9b8..19bdd37 100644
--- a/items/src/eventfull.rs
+++ b/items/src/eventfull.rs
@@ -20,16 +20,17 @@ use std::collections::VecDeque;
 pub struct EventFull {
     pub tss: VecDeque<u64>,
     pub pulses: VecDeque<u64>,
-    pub blobs: VecDeque<Vec<u8>>,
-    #[serde(with = "decomps_serde")]
+    pub blobs: VecDeque<Option<Vec<u8>>>,
+    //#[serde(with = "decomps_serde")]
     // TODO allow access to `decomps` via method which checks first if `blobs` is already the decomp.
-    pub decomps: VecDeque<Option<BytesMut>>,
+    pub decomps: VecDeque<Option<Vec<u8>>>,
     pub scalar_types: VecDeque<ScalarType>,
     pub be: VecDeque<bool>,
     pub shapes: VecDeque<Shape>,
     pub comps: VecDeque<Option<CompressionMethod>>,
 }
 
+#[allow(unused)]
 mod decomps_serde {
     use super::*;
 
@@ -85,8 +86,8 @@ impl EventFull {
         &mut self,
         ts: u64,
         pulse: u64,
-        blob: Vec<u8>,
-        decomp: Option<BytesMut>,
+        blob: Option<Vec<u8>>,
+        decomp: Option<Vec<u8>>,
         scalar_type: ScalarType,
         be: bool,
         shape: Shape,
@@ -102,11 +103,26 @@ impl EventFull {
         self.comps.push_back(comp);
     }
 
-    pub fn decomp(&self, i: usize) -> &[u8] {
-        match &self.decomps[i] {
-            Some(decomp) => &decomp,
-            None => &self.blobs[i],
+    pub fn truncate_ts(&mut self, end: u64) {
+        let mut nkeep = usize::MAX;
+        for (i, &ts) in self.tss.iter().enumerate() {
+            if ts >= end {
+                for (i, &ts) in self.tss.iter().enumerate() {
+                    eprintln!("{i:5} {ts:20}");
+                }
+                eprintln!("truncate to i {i} ts {ts}");
+                nkeep = i;
+                break;
+            }
         }
+        self.tss.truncate(nkeep);
+        self.pulses.truncate(nkeep);
+        self.blobs.truncate(nkeep);
+        self.decomps.truncate(nkeep);
+        self.scalar_types.truncate(nkeep);
+        self.be.truncate(nkeep);
+        self.shapes.truncate(nkeep);
+        self.comps.truncate(nkeep);
     }
 }
 
@@ -170,13 +186,13 @@ impl WithTimestamps for EventFull {
 
 impl ByteEstimate for EventFull {
     fn byte_estimate(&self) -> u64 {
-        if self.tss.len() == 0 {
+        if self.len() == 0 {
             0
         } else {
             // TODO that is clumsy... it assumes homogenous types.
// TODO improve via a const fn on NTY let decomp_len = self.decomps[0].as_ref().map_or(0, |h| h.len()); - self.tss.len() as u64 * (40 + self.blobs[0].len() as u64 + decomp_len as u64) + self.tss.len() as u64 * (40 + self.blobs[0].as_ref().map_or(0, |x| x.len()) as u64 + decomp_len as u64) } } } diff --git a/items/src/items.rs b/items/src/items.rs index 5223d09..6e91b85 100644 --- a/items/src/items.rs +++ b/items/src/items.rs @@ -16,8 +16,6 @@ use items_0::AsAnyRef; use netpod::log::Level; #[allow(unused)] use netpod::log::*; -use netpod::timeunits::MS; -use netpod::timeunits::SEC; use netpod::DiskStats; use netpod::EventDataReadStats; use netpod::NanoRange; @@ -657,24 +655,6 @@ where } } -pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, Vec, Vec) { - let ts_anchor_sec = tss.first().map_or(0, |&k| k) / SEC; - let ts_anchor_ns = ts_anchor_sec * SEC; - let ts_off_ms: Vec<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect(); - let ts_off_ns = tss - .iter() - .zip(ts_off_ms.iter().map(|&k| k * MS)) - .map(|(&j, k)| (j - ts_anchor_ns - k)) - .collect(); - (ts_anchor_sec, ts_off_ms, ts_off_ns) -} - -pub fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, Vec) { - let pulse_anchor = pulse.first().map_or(0, |k| *k); - let pulse_off: Vec<_> = pulse.iter().map(|k| *k - pulse_anchor).collect(); - (pulse_anchor, pulse_off) -} - pub trait TimeBinnableTypeAggregator: Send { type Input: TimeBinnableType; type Output: TimeBinnableType; diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs index d27d5bd..8f42a64 100644 --- a/items_0/src/items_0.rs +++ b/items_0/src/items_0.rs @@ -53,6 +53,10 @@ pub trait Empty { fn empty() -> Self; } +pub trait TypeName { + fn type_name(&self) -> String; +} + pub trait AppendEmptyBin { fn append_empty_bin(&mut self, ts1: u64, ts2: u64); } @@ -129,7 +133,15 @@ pub trait TimeBinnable: fmt::Debug + WithLen + RangeOverlapInfo + Any + AsAnyRef /// Container of some form of events, for use as trait object. pub trait Events: - fmt::Debug + Any + Collectable + CollectableWithDefault + TimeBinnable + WithLen + Send + erased_serde::Serialize + fmt::Debug + + TypeName + + Any + + Collectable + + CollectableWithDefault + + TimeBinnable + + WithLen + + Send + + erased_serde::Serialize { fn as_time_binnable(&self) -> &dyn TimeBinnable; fn verify(&self) -> bool; diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index dff7598..5da5fba 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -1,17 +1,27 @@ use crate::binsdim0::BinsDim0; -use crate::{pulse_offs_from_abs, ts_offs_from_abs}; -use crate::{IsoDateTime, RangeOverlapInfo}; -use crate::{TimeBinnable, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinner}; +use crate::IsoDateTime; +use crate::RangeOverlapInfo; +use crate::TimeBinnable; +use crate::TimeBinnableType; +use crate::TimeBinnableTypeAggregator; +use crate::TimeBinner; use err::Error; use items_0::scalar_ops::ScalarOps; -use items_0::{AsAnyMut, AsAnyRef, Empty, Events, WithLen}; +use items_0::AsAnyMut; +use items_0::AsAnyRef; +use items_0::Empty; +use items_0::Events; +use items_0::WithLen; +use netpod::log::*; use netpod::timeunits::SEC; +use netpod::BinnedRange; use netpod::NanoRange; -use netpod::{log::*, BinnedRange}; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; +use serde::Serialize; use std::any::Any; use std::collections::VecDeque; -use std::{fmt, mem}; +use std::fmt; +use std::mem; #[allow(unused)] macro_rules! 
trace2 { @@ -197,6 +207,7 @@ pub struct EventsDim0CollectorOutput { timed_out: bool, #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] continue_at: Option, + dummy_marker: u32, } impl EventsDim0CollectorOutput { @@ -232,6 +243,33 @@ impl EventsDim0CollectorOutput { pub fn timed_out(&self) -> bool { self.timed_out } + + pub fn is_valid(&self) -> bool { + if self.ts_off_ms.len() != self.ts_off_ns.len() { + false + } else if self.ts_off_ms.len() != self.pulse_off.len() { + false + } else if self.ts_off_ms.len() != self.values.len() { + false + } else { + true + } + } + + pub fn info_str(&self) -> String { + use fmt::Write; + let mut out = String::new(); + write!( + out, + "ts_off_ms {} ts_off_ns {} pulse_off {} values {}", + self.ts_off_ms.len(), + self.ts_off_ns.len(), + self.pulse_off.len(), + self.values.len(), + ) + .unwrap(); + out + } } impl AsAnyRef for EventsDim0CollectorOutput @@ -281,6 +319,8 @@ impl items_0::collect_s::CollectorType for EventsDim0Collector items_0::collect_s::CollectorType for EventsDim0Collector items_0::collect_s::CollectorType for EventsDim0Collector TimeBinnable for EventsDim0 { } } +impl items_0::TypeName for EventsDim0 { + fn type_name(&self) -> String { + let sty = std::any::type_name::(); + format!("EventsDim0<{sty}>") + } +} + impl Events for EventsDim0 { fn as_time_binnable(&self) -> &dyn TimeBinnable { self as &dyn TimeBinnable diff --git a/items_2/src/eventsdim1.rs b/items_2/src/eventsdim1.rs index ebf6aa6..b485b48 100644 --- a/items_2/src/eventsdim1.rs +++ b/items_2/src/eventsdim1.rs @@ -1,6 +1,4 @@ use crate::binsdim0::BinsDim0; -use crate::pulse_offs_from_abs; -use crate::ts_offs_from_abs; use crate::IsoDateTime; use crate::RangeOverlapInfo; use crate::TimeBinnable; @@ -10,12 +8,16 @@ use crate::TimeBinner; use err::Error; use items_0::scalar_ops::ScalarOps; use items_0::AsAnyMut; -use items_0::{AsAnyRef, Empty, Events, WithLen}; +use items_0::AsAnyRef; +use items_0::Empty; +use items_0::Events; +use items_0::WithLen; use netpod::log::*; use netpod::timeunits::SEC; use netpod::BinnedRange; use netpod::NanoRange; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; +use serde::Serialize; use std::any::Any; use std::collections::VecDeque; use std::fmt; @@ -313,8 +315,8 @@ impl items_0::collect_s::CollectorType for EventsDim1Collector TimeBinnable for EventsDim1 { } } +impl items_0::TypeName for EventsDim1 { + fn type_name(&self) -> String { + let sty = std::any::type_name::(); + format!("EventsDim1<{sty}>") + } +} + impl Events for EventsDim1 { fn as_time_binnable(&self) -> &dyn TimeBinnable { self as &dyn TimeBinnable diff --git a/items_2/src/eventsxbindim0.rs b/items_2/src/eventsxbindim0.rs index c4215cd..cb95027 100644 --- a/items_2/src/eventsxbindim0.rs +++ b/items_2/src/eventsxbindim0.rs @@ -1,14 +1,19 @@ use crate::binsxbindim0::BinsXbinDim0; -use crate::{pulse_offs_from_abs, ts_offs_from_abs}; -use crate::{IsoDateTime, RangeOverlapInfo}; -use crate::{TimeBinnableType, TimeBinnableTypeAggregator}; +use crate::IsoDateTime; +use crate::RangeOverlapInfo; +use crate::TimeBinnableType; +use crate::TimeBinnableTypeAggregator; use err::Error; use items_0::scalar_ops::ScalarOps; -use items_0::{AsAnyMut, WithLen}; -use items_0::{AsAnyRef, Empty}; +use items_0::AsAnyMut; +use items_0::AsAnyRef; +use items_0::Empty; +use items_0::WithLen; +use netpod::log::*; +use netpod::BinnedRange; use netpod::NanoRange; -use netpod::{log::*, BinnedRange}; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; +use 
serde::Serialize; use std::any::Any; use std::collections::VecDeque; use std::fmt; @@ -463,8 +468,8 @@ where let avgs = replace(&mut self.vals.avgs, VecDeque::new()); self.vals.tss.make_contiguous(); self.vals.pulses.make_contiguous(); - let tst = ts_offs_from_abs(self.vals.tss.as_slices().0); - let (pulse_anchor, pulse_off) = pulse_offs_from_abs(&self.vals.pulses.as_slices().0); + let tst = crate::ts_offs_from_abs(self.vals.tss.as_slices().0); + let (pulse_anchor, pulse_off) = crate::pulse_offs_from_abs(&self.vals.pulses.as_slices().0); let ret = Self::Output { ts_anchor_sec: tst.0, ts_off_ms: tst.1, diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 3174549..982f8c4 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -14,7 +14,9 @@ pub mod testgen; pub mod timebin; use channelevents::ChannelEvents; -use chrono::{DateTime, TimeZone, Utc}; +use chrono::DateTime; +use chrono::TimeZone; +use chrono::Utc; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; @@ -26,11 +28,17 @@ use items_0::collect_s::ToJsonResult; use items_0::Empty; use items_0::Events; use items_0::RangeOverlapInfo; -use items_0::{TimeBinnable, TimeBinner}; +use items_0::TimeBinnable; +use items_0::TimeBinner; use netpod::log::*; use netpod::timeunits::*; -use netpod::{AggKind, NanoRange, ScalarType, Shape}; -use serde::{Deserialize, Serialize, Serializer}; +use netpod::AggKind; +use netpod::NanoRange; +use netpod::ScalarType; +use netpod::Shape; +use serde::Deserialize; +use serde::Serialize; +use serde::Serializer; use std::collections::VecDeque; use std::fmt; use std::pin::Pin; @@ -41,9 +49,9 @@ pub fn bool_is_false(x: &bool) -> bool { *x == false } -// TODO take iterator instead of slice, because a VecDeque can't produce a slice in general. pub fn ts_offs_from_abs(tss: &[u64]) -> (u64, VecDeque, VecDeque) { let ts_anchor_sec = tss.first().map_or(0, |&k| k) / SEC; + info!("ts_offs_from_abs ts_anchor_sec {ts_anchor_sec}"); let ts_anchor_ns = ts_anchor_sec * SEC; let ts_off_ms: VecDeque<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect(); let ts_off_ns = tss @@ -65,9 +73,13 @@ pub fn ts_offs_from_abs_with_anchor(ts_anchor_sec: u64, tss: &[u64]) -> (VecDequ (ts_off_ms, ts_off_ns) } -// TODO take iterator instead of slice, because a VecDeque can't produce a slice in general. pub fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, VecDeque) { + error!("pulse_offs_from_abs {} DATA", pulse.len()); + for x in pulse { + error!("{x}"); + } let pulse_anchor = pulse.first().map_or(0, |&k| k) / 10000 * 10000; + info!("pulse_offs_from_abs pulse_anchor {pulse_anchor}"); let pulse_off = pulse.iter().map(|&k| k - pulse_anchor).collect(); (pulse_anchor, pulse_off) } diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs index 08cea99..4031fe2 100644 --- a/items_2/src/merger.rs +++ b/items_2/src/merger.rs @@ -159,7 +159,7 @@ where } } } - trace4!("tslows {tslows:?}"); + info!("tslows {tslows:?}"); if let Some((il0, _tl0)) = tslows[0] { if let Some((_il1, tl1)) = tslows[1] { // There is a second input, take only up to the second highest timestamp diff --git a/netpod/src/query.rs b/netpod/src/query.rs index b842ea5..116be68 100644 --- a/netpod/src/query.rs +++ b/netpod/src/query.rs @@ -182,6 +182,19 @@ impl PlainEventsQuery { pub fn set_do_test_stream_error(&mut self, k: bool) { self.do_test_stream_error = k; } + + // TODO remove again. 
+ pub fn adjust_for_events_query(&mut self) { + match &self.agg_kind { + AggKind::EventBlobs => {} + AggKind::DimXBins1 => {} + AggKind::DimXBinsN(_) => {} + AggKind::Plain => {} + AggKind::TimeWeightedScalar => { + self.agg_kind = AggKind::Plain; + } + } + } } impl HasBackend for PlainEventsQuery { @@ -211,7 +224,7 @@ impl FromUrl for PlainEventsQuery { beg: beg_date.parse::>()?.to_nanos(), end: end_date.parse::>()?.to_nanos(), }, - agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::TimeWeightedScalar), + agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::Plain), timeout: pairs .get("timeout") .map_or("10000", |k| k) diff --git a/streams/src/collect.rs b/streams/src/collect.rs index 6a3820b..bd7883a 100644 --- a/streams/src/collect.rs +++ b/streams/src/collect.rs @@ -78,7 +78,7 @@ where let coll = collector.as_mut().unwrap(); coll.ingest(&mut item); if coll.len() as u64 >= events_max { - warn!("Reached events_max {} abort", events_max); + warn!("TODO compute continue-at reached events_max {} abort", events_max); break; } } diff --git a/streams/src/eventchunker.rs b/streams/src/eventchunker.rs index 7ff7302..ef052f0 100644 --- a/streams/src/eventchunker.rs +++ b/streams/src/eventchunker.rs @@ -40,6 +40,7 @@ pub struct EventChunker { seen_after_range_count: usize, unordered_warn_count: usize, repeated_ts_warn_count: usize, + dbgdesc: String, } impl Drop for EventChunker { @@ -83,6 +84,7 @@ impl EventChunker { dbg_path: PathBuf, expand: bool, do_decompress: bool, + dbgdesc: String, ) -> Self { trace!("EventChunker::from_start"); let mut inp = NeedMinBuffer::new(inp); @@ -111,6 +113,7 @@ impl EventChunker { seen_after_range_count: 0, unordered_warn_count: 0, repeated_ts_warn_count: 0, + dbgdesc, } } @@ -123,8 +126,18 @@ impl EventChunker { dbg_path: PathBuf, expand: bool, do_decompress: bool, + dbgdesc: String, ) -> Self { - let mut ret = Self::from_start(inp, channel_config, range, stats_conf, dbg_path, expand, do_decompress); + let mut ret = Self::from_start( + inp, + channel_config, + range, + stats_conf, + dbg_path, + expand, + do_decompress, + dbgdesc, + ); ret.state = DataFileState::Event; ret.need_min = 4; ret.inp.set_need_min(4); @@ -192,6 +205,7 @@ impl EventChunker { let _ttl = sl.read_i64::().unwrap(); let ts = sl.read_i64::().unwrap() as u64; let pulse = sl.read_i64::().unwrap() as u64; + info!("SEE {ts:20} {pulse:20} {0}", self.dbgdesc); if ts == self.max_ts { if self.repeated_ts_warn_count < 20 { let msg = format!( @@ -362,17 +376,16 @@ impl EventChunker { } } } + let data = &buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)]; let decomp = { if self.do_decompress { + assert!(data.len() > 12); let ts1 = Instant::now(); let decomp_bytes = (type_size * ele_count as u32) as usize; - let mut decomp = BytesMut::with_capacity(decomp_bytes); - unsafe { - decomp.set_len(decomp_bytes); - } + let mut decomp = vec![0; decomp_bytes]; // TODO limit the buf slice range match bitshuffle_decompress( - &buf.as_ref()[(p1 as usize + 12)..(p1 as usize + k1 as usize)], + &data[12..], &mut decomp, ele_count as usize, ele_size as usize, @@ -382,6 +395,7 @@ impl EventChunker { assert!(c1 as u64 + 12 == k1); let ts2 = Instant::now(); let dt = ts2.duration_since(ts1); + // TODO analyze the histo self.decomp_dt_histo.ingest(dt.as_secs() as u32 + dt.subsec_micros()); Some(decomp) } @@ -396,7 +410,7 @@ impl EventChunker { ret.add_event( ts, pulse, - buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)].to_vec(), + Some(data.to_vec()), decomp, 
ScalarType::from_dtype_index(type_index)?, is_big_endian, @@ -409,13 +423,12 @@ impl EventChunker { Err(Error::with_msg(msg))?; } let vlen = len - p1 as u32 - 4; - // TODO in this case, decomp and comp is the same and not needed. - let decomp = BytesMut::from(&buf[p1 as usize..(p1 as u32 + vlen) as usize]); + let data = &buf[p1 as usize..(p1 as u32 + vlen) as usize]; ret.add_event( ts, pulse, - buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)].to_vec(), - Some(decomp), + Some(data.to_vec()), + Some(data.to_vec()), ScalarType::from_dtype_index(type_index)?, is_big_endian, shape_this, diff --git a/streams/src/frames/eventsfromframes.rs b/streams/src/frames/eventsfromframes.rs index ad7c823..ff6237f 100644 --- a/streams/src/frames/eventsfromframes.rs +++ b/streams/src/frames/eventsfromframes.rs @@ -1,12 +1,16 @@ use super::inmem::InMemoryFrameAsyncReadStream; -use futures_util::{Stream, StreamExt}; +use futures_util::Stream; +use futures_util::StreamExt; use items::frame::decode_frame; -use items::{FrameTypeInnerStatic, Sitemty, StreamItem}; +use items::FrameTypeInnerStatic; +use items::Sitemty; +use items::StreamItem; use netpod::log::*; use serde::de::DeserializeOwned; use std::marker::PhantomData; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::Context; +use std::task::Poll; use tokio::io::AsyncRead; pub struct EventsFromFrames diff --git a/streams/src/lib.rs b/streams/src/lib.rs index 44ef8d1..fb12678 100644 --- a/streams/src/lib.rs +++ b/streams/src/lib.rs @@ -7,6 +7,7 @@ pub mod merge; pub mod needminbuffer; pub mod plaineventsjson; pub mod rangefilter; +pub mod rangefilter2; pub mod slidebuf; pub mod tcprawclient; #[cfg(test)] diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs index b6fb73d..b1f212d 100644 --- a/streams/src/plaineventsjson.rs +++ b/streams/src/plaineventsjson.rs @@ -3,6 +3,7 @@ use err::Error; use futures_util::stream; use futures_util::StreamExt; use items_2::channelevents::ChannelEvents; +use netpod::log::*; use netpod::query::PlainEventsQuery; use netpod::ChConf; use netpod::Cluster; @@ -18,24 +19,17 @@ pub async fn plain_events_json( // TODO remove magic constant let deadline = Instant::now() + query.timeout() + Duration::from_millis(1000); let events_max = query.events_max(); - let empty = items_2::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape, query.agg_kind())?; + let mut evquery = query.clone(); + evquery.adjust_for_events_query(); + let empty = items_2::empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape, evquery.agg_kind())?; + info!("plain_events_json with empty item {}", empty.type_name()); let empty = ChannelEvents::Events(empty); let empty = items::sitem_data(empty); // TODO should be able to ask for data-events only, instead of mixed data and status events. 
-    let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&query, cluster).await?;
+    let inps = open_tcp_streams::<_, items_2::channelevents::ChannelEvents>(&evquery, cluster).await?;
     //let inps = open_tcp_streams::<_, Box<dyn Events>>(&query, cluster).await?;
     // TODO propagate also the max-buf-len for the first stage event reader:
-    #[cfg(NOTHING)]
-    let stream = {
-        let mut it = inps.into_iter();
-        let inp0 = it.next().unwrap();
-        let inp1 = it.next().unwrap();
-        let inp2 = it.next().unwrap();
-        let stream = inp0.chain(inp1).chain(inp2);
-        stream
-    };
-    netpod::log::info!("plain_events_json with empty item {empty:?}");
-    let stream = { items_2::merger::Merger::new(inps, 1) };
+    let stream = items_2::merger::Merger::new(inps, 1024);
     let stream = stream::iter([empty]).chain(stream);
     let collected = crate::collect::collect(stream, deadline, events_max, Some(query.range().clone()), None).await?;
     let jsval = serde_json::to_value(&collected)?;
diff --git a/streams/src/rangefilter2.rs b/streams/src/rangefilter2.rs
new file mode 100644
index 0000000..6fe4223
--- /dev/null
+++ b/streams/src/rangefilter2.rs
@@ -0,0 +1,258 @@
+use err::Error;
+use futures_util::Stream;
+use futures_util::StreamExt;
+use items::Appendable;
+use items::Clearable;
+use items::PushableIndex;
+use items::RangeCompletableItem;
+use items::Sitemty;
+use items::StatsItem;
+use items::StreamItem;
+use items::WithTimestamps;
+use netpod::log::*;
+use netpod::NanoRange;
+use netpod::Nanos;
+use netpod::RangeFilterStats;
+use std::fmt;
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
+
+pub struct RangeFilter2<S, ITY>
+where
+    S: Stream<Item = Sitemty<ITY>> + Unpin,
+    ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
+{
+    inp: S,
+    range: NanoRange,
+    range_str: String,
+    expand: bool,
+    ts_max: u64,
+    stats: RangeFilterStats,
+    prerange: Option<ITY>,
+    have_pre: bool,
+    have_range_complete: bool,
+    emitted_post: bool,
+    data_done: bool,
+    raco_done: bool,
+    done: bool,
+    complete: bool,
+    items_with_pre: usize,
+    items_with_post: usize,
+    items_with_unordered: usize,
+}
+
+impl<S, ITY> RangeFilter2<S, ITY>
+where
+    S: Stream<Item = Sitemty<ITY>> + Unpin,
+    ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
+{
+    fn selfname() -> &'static str {
+        "RangeFilter2"
+    }
+
+    pub fn new(inp: S, range: NanoRange, expand: bool) -> Self {
+        trace!("{}::new range: {:?} expand: {:?}", Self::selfname(), range, expand);
+        Self {
+            inp,
+            range_str: format!("{:?}", range),
+            range,
+            expand,
+            ts_max: 0,
+            stats: RangeFilterStats::new(),
+            prerange: None,
+            have_pre: false,
+            have_range_complete: false,
+            emitted_post: false,
+            data_done: false,
+            raco_done: false,
+            done: false,
+            complete: false,
+            items_with_pre: 0,
+            items_with_post: 0,
+            items_with_unordered: 0,
+        }
+    }
+}
+
+impl<S, ITY> RangeFilter2<S, ITY>
+where
+    S: Stream<Item = Sitemty<ITY>> + Unpin,
+    ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
+{
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<<Self as Stream>::Item>> {
+        use Poll::*;
+        loop {
+            break if self.complete {
+                panic!("poll_next on complete");
+            } else if self.done {
+                self.complete = true;
+                Ready(None)
+            } else if self.raco_done {
+                self.done = true;
+                let k = std::mem::replace(&mut self.stats, RangeFilterStats::new());
+                let k = StatsItem::RangeFilterStats(k);
+                Ready(Some(Ok(StreamItem::Stats(k))))
+            } else if self.data_done {
+                self.raco_done = true;
+                if self.have_range_complete {
+                    Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
+                } else {
+                    continue;
+                }
+            } else {
+                match self.inp.poll_next_unpin(cx) {
+                    Ready(Some(item)) => match item {
+                        Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => {
+                            let mut contains_pre = false;
+                            let mut contains_post = false;
+                            let mut contains_unordered = false;
+                            let mut ret = item.empty_like_self();
+                            for i1 in 0..item.len() {
+                                let ts = item.ts(i1);
+                                if ts < self.ts_max {
+                                    contains_unordered = true;
+                                    if false {
+                                        self.done = true;
+                                        let msg = format!(
+                                            "unordered event i1 {} / {} ts {:?} ts_max {:?}",
+                                            i1,
+                                            item.len(),
+                                            Nanos::from_ns(ts),
+                                            Nanos::from_ns(self.ts_max)
+                                        );
+                                        error!("{}", msg);
+                                        return Ready(Some(Err(Error::with_msg(msg))));
+                                    }
+                                } else {
+                                    self.ts_max = ts;
+                                    if ts < self.range.beg {
+                                        contains_pre = true;
+                                        if self.expand {
+                                            let mut prerange = if let Some(prerange) = self.prerange.take() {
+                                                prerange
+                                            } else {
+                                                item.empty_like_self()
+                                            };
+                                            prerange.clear();
+                                            prerange.push_index(&item, i1);
+                                            self.prerange = Some(prerange);
+                                            self.have_pre = true;
+                                        }
+                                    } else if ts >= self.range.end {
+                                        contains_post = true;
+                                        self.have_range_complete = true;
+                                        if self.expand {
+                                            if self.have_pre {
+                                                let prerange = if let Some(prerange) = &mut self.prerange {
+                                                    prerange
+                                                } else {
+                                                    panic!()
+                                                };
+                                                ret.push_index(prerange, 0);
+                                                prerange.clear();
+                                                self.have_pre = false;
+                                            }
+                                            if !self.emitted_post {
+                                                self.emitted_post = true;
+                                                ret.push_index(&item, i1);
+                                                //self.data_done = true;
+                                            }
+                                        } else {
+                                            //self.data_done = true;
+                                        }
+                                    } else {
+                                        if self.expand {
+                                            if self.have_pre {
+                                                let prerange = if let Some(prerange) = &mut self.prerange {
+                                                    prerange
+                                                } else {
+                                                    panic!()
+                                                };
+                                                ret.push_index(prerange, 0);
+                                                prerange.clear();
+                                                self.have_pre = false;
+                                            }
+                                        }
+                                        ret.push_index(&item, i1);
+                                    }
+                                }
+                            }
+                            if contains_pre {
+                                self.items_with_pre += 1;
+                            }
+                            if contains_post {
+                                self.items_with_post += 1;
+                            }
+                            if contains_unordered {
+                                self.items_with_unordered += 1;
+                            }
+                            Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
+                        }
+                        Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => {
+                            self.have_range_complete = true;
+                            continue;
+                        }
+                        k => Ready(Some(k)),
+                    },
+                    Ready(None) => {
+                        self.data_done = true;
+                        if self.have_pre {
+                            let prerange = if let Some(prerange) = &mut self.prerange {
+                                prerange
+                            } else {
+                                panic!()
+                            };
+                            let mut ret = prerange.empty_like_self();
+                            ret.push_index(&prerange, 0);
+                            prerange.clear();
+                            self.have_pre = false;
+                            Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
+                        } else {
+                            continue;
+                        }
+                    }
+                    Pending => Pending,
+                }
+            };
+        }
+    }
+}
+
+impl<S, ITY> Stream for RangeFilter2<S, ITY>
+where
+    S: Stream<Item = Sitemty<ITY>> + Unpin,
+    ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
+{
+    type Item = Sitemty<ITY>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let span1 = span!(Level::INFO, "RangeFilter", range = tracing::field::Empty);
+        span1.record("range", &self.range_str.as_str());
+        span1.in_scope(|| Self::poll_next(self, cx))
+    }
+}
+
+impl<S, ITY> fmt::Debug for RangeFilter2<S, ITY>
+where
+    S: Stream<Item = Sitemty<ITY>> + Unpin,
+    ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
+{
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct(Self::selfname())
+            .field("items_with_pre", &self.items_with_pre)
+            .field("items_with_post", &self.items_with_post)
+            .field("items_with_unordered", &self.items_with_unordered)
+            .finish()
+    }
+}
+
+impl<S, ITY> Drop for RangeFilter2<S, ITY>
+where
+    S: Stream<Item = Sitemty<ITY>> + Unpin,
+    ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin,
+{
+    fn drop(&mut self) {
+        debug!("drop {} {:?}", Self::selfname(), self);
+    }
+}
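
A note on the anchor/offset encoding that the collector output above relies on: ts_offs_from_abs splits absolute nanosecond timestamps into a seconds anchor plus per-event millisecond offsets and nanosecond remainders, and pulse_offs_from_abs, as changed in items_2/src/items_2.rs above, now rounds the pulse anchor down to a multiple of 10000 before taking offsets. Below is a minimal standalone sketch of that encoding, mirroring the patched functions minus the debug logging they currently carry; the MS/SEC constants correspond to netpod::timeunits, and the values in main are made up for illustration:

    use std::collections::VecDeque;

    const MS: u64 = 1_000_000;
    const SEC: u64 = 1_000_000_000;

    // Split absolute ns timestamps into (anchor seconds, ms offsets, ns remainders),
    // mirroring items_2::ts_offs_from_abs.
    fn ts_offs_from_abs(tss: &[u64]) -> (u64, VecDeque<u64>, VecDeque<u64>) {
        let ts_anchor_sec = tss.first().map_or(0, |&k| k) / SEC;
        let ts_anchor_ns = ts_anchor_sec * SEC;
        let ts_off_ms: VecDeque<_> = tss.iter().map(|&k| (k - ts_anchor_ns) / MS).collect();
        let ts_off_ns = tss
            .iter()
            .zip(ts_off_ms.iter().map(|&k| k * MS))
            .map(|(&j, k)| j - ts_anchor_ns - k)
            .collect();
        (ts_anchor_sec, ts_off_ms, ts_off_ns)
    }

    // Pulse anchor rounded down to a multiple of 10000, as in the patched version.
    fn pulse_offs_from_abs(pulse: &[u64]) -> (u64, VecDeque<u64>) {
        let pulse_anchor = pulse.first().map_or(0, |&k| k) / 10000 * 10000;
        let pulse_off = pulse.iter().map(|&k| k - pulse_anchor).collect();
        (pulse_anchor, pulse_off)
    }

    fn main() {
        // Hypothetical values, not taken from real channel data.
        let tss = [12 * SEC + 3 * MS + 7, 12 * SEC + 5 * MS];
        let (anchor, ms, ns) = ts_offs_from_abs(&tss);
        assert_eq!(anchor, 12);
        assert_eq!(ms, [3, 5]);
        assert_eq!(ns, [7, 0]);
        let (pa, po) = pulse_offs_from_abs(&[20003, 20007]);
        assert_eq!(pa, 20000);
        assert_eq!(po, [3, 7]);
    }

The anchor/offset split presumably keeps the serialized collector output compact while still allowing exact reconstruction of the absolute timestamps and pulse ids on the client side.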