From dafe0a6e3b4b26a8c381a233c3a123c7a216859c Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Mon, 18 Oct 2021 21:41:35 +0200
Subject: [PATCH] Adapt empty generator

---
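Notes on the central change (the trait shape below is quoted from
items/src/lib.rs; the `ingest` helper is only an illustrative sketch, not
code from this patch): `Appendable::empty()` becomes `empty_like_self(&self)`.
For containers whose concrete variant is only known at runtime (EventsItem,
PlainEvents, EventFull, ...) an empty buffer can only be derived from an
existing instance, so merge, binning and filter code now keeps its batch as
an `Option<T>` and creates the buffer lazily from the first item:

    trait Appendable {
        // Supertrait `WithLen` elided for brevity.
        fn empty_like_self(&self) -> Self;
        fn append(&mut self, src: &Self);
    }

    fn ingest<T: Appendable>(batch: &mut Option<T>, item: &T) {
        match batch {
            Some(b) => b.append(item),
            None => {
                // Derive an empty buffer of the matching runtime variant
                // from the first item, then fill it.
                let mut b = item.empty_like_self();
                b.append(item);
                *batch = Some(b);
            }
        }
    }

`EventAppendable::append_event` follows the same builder style and now takes
an `Option<Self>` and returns `Self` instead of mutating `&mut self`.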
 archapp/Cargo.toml                     |   2 +
 archapp/src/archeng.rs                 | 138 +++++---
 archapp/src/archeng/datablockstream.rs | 310 +++++++++++++++++
 archapp/src/archeng/datastream.rs      |  15 +
 archapp/src/events.rs                  |   1 +
 archapp/src/lib.rs                     | 456 ++++++++++++++++++++++++-
 daqbufp2/Cargo.toml                    |   3 +-
 daqbufp2/src/test/binnedbinary.rs      |   1 +
 daqbufp2/src/test/events.rs            |   1 +
 dbconn/Cargo.toml                      |   1 -
 disk/Cargo.toml                        |   2 +-
 disk/src/agg.rs                        |   1 -
 disk/src/agg/eventbatch.rs             | 169 ---------
 disk/src/binned.rs                     |  22 --
 disk/src/binned/pbv.rs                 |  67 ++--
 disk/src/decode.rs                     |  23 +-
 disk/src/eventblobs.rs                 |   4 +-
 disk/src/eventchunker.rs               |   2 +-
 disk/src/lib.rs                        |   2 -
 disk/src/merge.rs                      |  86 ++---
 disk/src/mergeblobs.rs                 | 278 ---------------
 items/src/eventvalues.rs               |  10 +-
 items/src/lib.rs                       |  23 +-
 items/src/minmaxavgbins.rs             |   2 +-
 items/src/minmaxavgdim1bins.rs         |   2 +-
 items/src/minmaxavgwavebins.rs         |   2 +-
 items/src/waveevents.rs                |  10 +-
 items/src/xbinnedscalarevents.rs       |   2 +-
 items/src/xbinnedwaveevents.rs         |   2 +-
 netpod/src/lib.rs                      |  51 ++-
 streams/Cargo.toml                     |  19 ++
 streams/src/lib.rs                     |   1 +
 {disk => streams}/src/rangefilter.rs   |  58 +++-
 taskrun/src/lib.rs                     |   2 +-
 34 files changed, 1129 insertions(+), 639 deletions(-)
 create mode 100644 archapp/src/archeng/datablockstream.rs
 create mode 100644 archapp/src/archeng/datastream.rs
 delete mode 100644 disk/src/agg/eventbatch.rs
 delete mode 100644 disk/src/mergeblobs.rs
 create mode 100644 streams/Cargo.toml
 create mode 100644 streams/src/lib.rs
 rename {disk => streams}/src/rangefilter.rs (67%)

diff --git a/archapp/Cargo.toml b/archapp/Cargo.toml
index e28b6b9..596d8ba 100644
--- a/archapp/Cargo.toml
+++ b/archapp/Cargo.toml
@@ -13,6 +13,7 @@ bytes = "1.0.1"
 serde = "1.0.126"
 serde_derive = "1.0.126"
 serde_json = "1.0.64"
+bincode = "1.3.3"
 chrono = "0.4.19"
 protobuf = "2.24.1"
 async-channel = "1.6"
@@ -22,6 +23,7 @@ taskrun = { path = "../taskrun" }
 netpod = { path = "../netpod" }
 dbconn = { path = "../dbconn" }
 items = { path = "../items" }
+streams = { path = "../streams" }
 
 [features]
 default = ["devread"]
diff --git a/archapp/src/archeng.rs b/archapp/src/archeng.rs
index d6c1a75..96ee144 100644
--- a/archapp/src/archeng.rs
+++ b/archapp/src/archeng.rs
@@ -1,3 +1,6 @@
+pub mod datablockstream;
+pub mod datastream;
+
 use crate::{EventsItem, PlainEvents, ScalarPlainEvents};
 use async_channel::{Receiver, Sender};
 use err::Error;
@@ -5,7 +8,7 @@ use futures_core::Future;
 use futures_util::StreamExt;
 use items::eventvalues::EventValues;
 use netpod::timeunits::SEC;
-use netpod::{log::*, ChannelArchiver, FilePos, Nanos};
+use netpod::{log::*, ChannelArchiver, DataHeaderPos, FilePos, Nanos};
 use std::convert::TryInto;
 use std::io::{self, SeekFrom};
 use std::path::PathBuf;
@@ -224,6 +227,21 @@ fn readf64(buf: &[u8], pos: usize) -> f64 {
     f64::from_be_bytes(buf.as_ref()[pos..pos + 8].try_into().unwrap())
 }
 
+fn read_string(buf: &[u8]) -> Result<String, Error> {
+    let imax = buf
+        .iter()
+        .map(|k| *k)
+        .enumerate()
+        .take_while(|&(i, k)| k != 0)
+        .last()
+        .map(|(i, _)| i);
+    let ret = match imax {
+        Some(imax) => String::from_utf8(buf[..imax + 1].to_vec())?,
+        None => String::new(),
+    };
+    Ok(ret)
+}
+
 pub async fn read_file_basics(file: &mut File) -> Result {
     let mut buf = vec![0; 128];
     read_exact(file, &mut buf[0..4]).await?;
@@ -331,8 +349,8 @@ pub async fn read_file_basics(file: &mut File) -> Result
 
 #[derive(Debug)]
 pub struct RTreeNodeRecord {
-    ts1: u64,
-    ts2: u64,
+    ts1: Nanos,
+    ts2: Nanos,
     // TODO should probably be better name `child or offset` and be made enum.
     child_or_id: Offset,
 }
@@ -351,6 +369,12 @@ pub struct RTreeNodeAtRecord {
     rix: usize,
 }
 
+impl RTreeNodeAtRecord {
+    pub fn rec(&self) -> &RTreeNodeRecord {
+        &self.node.records[self.rix]
+    }
+}
+
 // TODO refactor as struct, rtree_m is a property of the tree.
 pub async fn read_rtree_node(file: &mut File, pos: FilePos, rtree_m: usize) -> Result {
     const OFF1: usize = 9;
@@ -388,7 +412,11 @@ pub async fn read_rtree_node(file: &mut File, pos: FilePos, rtree_m: usize) -> R
         let child_or_id = readu64(b, off2 + 16);
         //info!("NODE {} {} {} {} {}", ts1a, ts1b, ts2a, ts2b, child_or_id);
         if child_or_id != 0 {
-            let rec = RTreeNodeRecord { ts1, ts2, child_or_id };
+            let rec = RTreeNodeRecord {
+                ts1: Nanos { ns: ts1 },
+                ts2: Nanos { ns: ts2 },
+                child_or_id,
+            };
             Some(rec)
         } else {
             None
@@ -451,7 +479,7 @@ pub async fn search_record(
     let nr = node.records.len();
     for (i, rec) in node.records.iter().enumerate() {
         //info!("looking at record i {}", i);
-        if rec.ts2 > beg.ns {
+        if rec.ts2.ns > beg.ns {
             if node.is_leaf {
                 info!("found leaf match at {} / {}", i, nr);
                 let ret = RTreeNodeAtRecord { node, rix: i };
@@ -676,12 +704,22 @@ pub fn datarange_stream(channel_name: &str) -> Result, Error
 #[derive(Debug)]
 pub struct Datablock {
     next: Offset,
-    data: Offset,
+    data_header_pos: Offset,
     fname: String,
 }
 
+impl Datablock {
+    fn file_name(&self) -> &str {
+        &self.fname
+    }
+
+    fn data_header_pos(&self) -> DataHeaderPos {
+        DataHeaderPos(self.data_header_pos)
+    }
+}
+
 async fn read_index_datablockref(file: &mut File, pos: FilePos) -> Result {
-    seek(file, SeekFrom::Start(pos.into())).await?;
+    seek(file, SeekFrom::Start(pos.pos)).await?;
     let mut rb = RingBuf::new();
     rb.fill_min(file, 18).await?;
     let buf = rb.data();
@@ -691,7 +729,11 @@ async fn read_index_datablockref(file: &mut File, pos: FilePos) -> Result
-async fn read_datafile_header(file: &mut File, pos: FilePos) -> Result {
-    seek(file, SeekFrom::Start(pos.into())).await?;
+async fn read_datafile_header(file: &mut File, pos: DataHeaderPos) -> Result {
+    seek(file, SeekFrom::Start(pos.0)).await?;
     let mut rb = RingBuf::new();
-    rb.fill_min(file, 88).await?;
+    rb.fill_min(file, DATA_HEADER_LEN_ON_DISK).await?;
     let buf = rb.data();
     let dir_offset = readu32(buf, 0);
     let next_offset = readu32(buf, 4);
@@ -765,7 +813,7 @@ async fn read_datafile_header(file: &mut File, pos: FilePos) -> Result
-async fn read_data_1(file: &mut File, pos: FilePos) -> Result {
-    let datafile_header = read_datafile_header(file, pos).await?;
-    //info!("datafile_header {:?}", datafile_header);
-    seek(file, SeekFrom::Start(u64::from(pos) + DATA_HEADER_LEN_ON_DISK as u64)).await?;
+async fn read_data_1(file: &mut File, datafile_header: &DatafileHeader) -> Result {
+    let dhpos = datafile_header.pos.0 + DATA_HEADER_LEN_ON_DISK as u64;
+    seek(file, SeekFrom::Start(dhpos)).await?;
     let res = match &datafile_header.dbr_type {
         DbrType::DbrTimeDouble => {
             if datafile_header.dbr_count == 1 {
@@ -946,8 +1008,8 @@ mod test {
     //use disk::{eventblobs::EventChunkerMultifile, eventchunker::EventChunkerConf};
     use super::search_record;
-    use crate::archeng::EPICS_EPOCH_OFFSET;
     use crate::archeng::{open_read, read_channel, read_data_1, read_file_basics, read_index_datablockref};
+    use crate::archeng::{read_datafile_header, EPICS_EPOCH_OFFSET};
     use err::Error;
     use netpod::log::*;
     use netpod::timeunits::*;
@@ -1019,12 +1081,12 @@ mod test {
         assert_eq!(res.node.pos.pos, 8216);
         assert_eq!(res.rix, 17);
         let rec = &res.node.records[res.rix];
-        assert_eq!(rec.ts1, 970351499684884156 + EPICS_EPOCH_OFFSET);
-        assert_eq!(rec.ts2, 970417919634086480 + EPICS_EPOCH_OFFSET);
+        assert_eq!(rec.ts1.ns, 970351499684884156 + EPICS_EPOCH_OFFSET);
+        assert_eq!(rec.ts2.ns, 970417919634086480 + EPICS_EPOCH_OFFSET);
         assert_eq!(rec.child_or_id, 185074);
         let pos = FilePos { pos: rec.child_or_id };
         let datablock = read_index_datablockref(&mut index_file, pos).await?;
-        assert_eq!(datablock.data, 52787);
+        assert_eq!(datablock.data_header_pos, 52787);
         assert_eq!(datablock.fname, "20201001/20201001");
         // The actual datafile for that time was not retained any longer.
         // But the index still points to that.
@@ -1050,17 +1112,17 @@ mod test {
         assert_eq!(res.node.pos.pos, 1861178);
         assert_eq!(res.rix, 41);
         let rec = &res.node.records[res.rix];
-        assert_eq!(rec.ts1, 1001993759871202919 + EPICS_EPOCH_OFFSET);
-        assert_eq!(rec.ts2, 1002009299596362122 + EPICS_EPOCH_OFFSET);
+        assert_eq!(rec.ts1.ns, 1001993759871202919 + EPICS_EPOCH_OFFSET);
+        assert_eq!(rec.ts2.ns, 1002009299596362122 + EPICS_EPOCH_OFFSET);
         assert_eq!(rec.child_or_id, 2501903);
         let pos = FilePos { pos: rec.child_or_id };
         let datablock = read_index_datablockref(&mut index_file, pos).await?;
-        assert_eq!(datablock.data, 9311367);
-        assert_eq!(datablock.fname, "20211001/20211001");
-        let data_path = index_path.parent().unwrap().join(datablock.fname);
+        assert_eq!(datablock.data_header_pos().0, 9311367);
+        assert_eq!(datablock.file_name(), "20211001/20211001");
+        let data_path = index_path.parent().unwrap().join(datablock.file_name());
         let mut data_file = open_read(data_path).await?;
-        let data_pos = FilePos { pos: datablock.data };
-        let events = read_data_1(&mut data_file, data_pos).await?;
+        let datafile_header = read_datafile_header(&mut data_file, datablock.data_header_pos()).await?;
+        let events = read_data_1(&mut data_file, &datafile_header).await?;
         info!("read events: {:?}", events);
         Ok(())
     };
diff --git a/archapp/src/archeng/datablockstream.rs b/archapp/src/archeng/datablockstream.rs
new file mode 100644
index 0000000..204dde8
--- /dev/null
+++ b/archapp/src/archeng/datablockstream.rs
@@ -0,0 +1,310 @@
+use crate::archeng::{
+    open_read, read_channel, read_data_1, read_datafile_header, read_index_datablockref, search_record,
+};
+use crate::EventsItem;
+use async_channel::{Receiver, Sender};
+use err::Error;
+use futures_core::{Future, Stream};
+use futures_util::{FutureExt, StreamExt};
+use items::{RangeCompletableItem, Sitemty, StreamItem, WithLen};
+use netpod::{log::*, DataHeaderPos, FilePos, Nanos};
+use netpod::{Channel, NanoRange};
+use std::collections::VecDeque;
+use std::path::PathBuf;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+type FR = (Option<Sitemty<EventsItem>>, Box<dyn FretCb>);
+
+trait FretCb {
+    fn call(&mut self, stream: &mut Pin<&mut DatablockStream>);
+}
+
+async fn datablock_stream(
+    range: NanoRange,
+    channel: Channel,
+    base_dirs: VecDeque<PathBuf>,
+    expand: bool,
+    tx: Sender<Sitemty<EventsItem>>,
+) {
+    match datablock_stream_inner(range, channel, base_dirs, expand, tx.clone()).await {
+        Ok(_) => {}
+        Err(e) => match tx.send(Err(e)).await {
+            Ok(_) => {}
+            Err(e) => {
+                error!("can not forward error: {:?}", e);
+            }
+        },
+    }
+}
+
+async fn datablock_stream_inner(
+    range: NanoRange,
+    channel: Channel,
+    base_dirs: VecDeque<PathBuf>,
+    expand: bool,
+    tx: Sender<Sitemty<EventsItem>>,
+) -> Result<(), Error> {
+    let basename = channel
+        .name()
+        .split("-")
+        .next()
+        .ok_or(Error::with_msg_no_trace("can not find base for channel"))?;
+    for base in base_dirs {
+        debug!(
+            "search for {:?} with basename: {} in path {:?}",
+            channel, basename, base
+        );
+        let index_path = base.join(format!("archive_{}_SH", basename)).join("index");
+        let res = open_read(index_path.clone()).await;
+        debug!("tried to open index file: {:?}", res);
+        if let Ok(mut index_file) = res {
+            if let Some(basics) = read_channel(&mut index_file, channel.name()).await? {
+                let beg = Nanos { ns: range.beg };
+                let mut search_ts = beg.clone();
+                let mut last_data_file_path = PathBuf::new();
+                let mut last_data_file_pos = DataHeaderPos(0);
+                loop {
+                    // TODO for expand mode, this needs another search function.
+                    let (res, _stats) =
+                        search_record(&mut index_file, basics.rtree_m, basics.rtree_start_pos, search_ts).await?;
+                    if let Some(nrec) = res {
+                        let rec = nrec.rec();
+                        info!("found record: {:?}", rec);
+                        let pos = FilePos { pos: rec.child_or_id };
+                        // TODO rename Datablock? → IndexNodeDatablock
+                        info!("\n\nREAD Datablock FROM {:?}\n", pos);
+                        let datablock = read_index_datablockref(&mut index_file, pos).await?;
+                        info!("\nDatablock: {:?}\n", datablock);
+                        let data_path = index_path.parent().unwrap().join(datablock.file_name());
+                        if data_path == last_data_file_path && datablock.data_header_pos() == last_data_file_pos {
+                            warn!("SKIP BECAUSE ITS THE SAME BLOCK");
+                        } else {
+                            info!("try to open data_path: {:?}", data_path);
+                            let mut data_file = open_read(data_path.clone()).await?;
+                            let datafile_header =
+                                read_datafile_header(&mut data_file, datablock.data_header_pos()).await?;
+                            info!("datafile_header -------------- HEADER\n{:?}", datafile_header);
+                            let events = read_data_1(&mut data_file, &datafile_header).await?;
+                            info!("Was able to read data: {} events", events.len());
+                            let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(events)));
+                            tx.send(item).await?;
+                        }
+                        if datablock.next != 0 {
+                            error!("datablock.next != 0: {:?}", datablock);
+                        }
+                        last_data_file_path = data_path;
+                        last_data_file_pos = datablock.data_header_pos();
+                        if expand {
+                            err::todo()
+                        } else {
+                            search_ts.ns = rec.ts2.ns;
+                        };
+                    } else {
+                        warn!("nothing found, break");
+                        break;
+                    }
+                }
+            }
+        } else {
+            info!("can not find index file at {:?}", index_path);
+        }
+    }
+    Ok(())
+}
+
+pub struct DatablockStream {
+    range: NanoRange,
+    channel: Channel,
+    base_dirs: VecDeque<PathBuf>,
+    expand: bool,
+    fut: Pin<Box<dyn Future<Output = FR>>>,
+    rx: Receiver<Sitemty<EventsItem>>,
+    done: bool,
+    complete: bool,
+}
+
+impl DatablockStream {
+    pub fn for_channel_range(range: NanoRange, channel: Channel, base_dirs: VecDeque<PathBuf>, expand: bool) -> Self {
+        let (tx, rx) = async_channel::bounded(1);
+        taskrun::spawn(datablock_stream(
+            range.clone(),
+            channel.clone(),
+            base_dirs.clone(),
+            expand.clone(),
+            tx,
+        ));
+        let ret = Self {
+            range,
+            channel,
+            base_dirs: VecDeque::new(),
+            expand,
+            fut: Box::pin(Self::start()),
+            rx,
+            done: false,
+            complete: false,
+        };
+        ret
+    }
+
+    async fn start() -> FR {
+        struct Cb {}
+        impl FretCb for Cb {
+            fn call(&mut self, stream: &mut Pin<&mut DatablockStream>) {
+                if let Some(path) = stream.base_dirs.pop_front() {
+                    stream.fut = Box::pin(DatablockStream::start_with_base_dir(path));
+                } else {
+                    // TODO: done?
+                    err::todo();
+                }
+            }
+        }
+        (None, Box::new(Cb {}))
+    }
+
+    async fn start_with_base_dir(path: PathBuf) -> FR {
+        warn!("start_with_base_dir");
+        struct Cb {}
+        impl FretCb for Cb {
+            fn call(&mut self, stream: &mut Pin<&mut DatablockStream>) {
+                let _ = stream;
+            }
+        }
+        (None, Box::new(Cb {}))
+    }
+}
+
+/*
+Loop through all configured data directories.
+Locate the index file.
+Search for the correct Datablock to start with.
+Iterate from there.
+*/
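+// The item flow above is channel based: `datablock_stream_inner` runs as a
+// spawned task and sends its `Sitemty<EventsItem>` values through `tx`, and
+// `poll_next` below simply forwards from `rx`. The `fut` plus `FretCb`
+// callback machinery is an alternative pull-based driver; it is currently
+// disabled by the `else if true` branch in `poll_next`.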
+
+impl Stream for DatablockStream {
+    type Item = Sitemty<EventsItem>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        use Poll::*;
+        loop {
+            break if self.complete {
+                panic!("poll on complete")
+            } else if self.done {
+                self.complete = true;
+                Ready(None)
+            } else if true {
+                self.rx.poll_next_unpin(cx)
+            } else {
+                match self.fut.poll_unpin(cx) {
+                    Ready((k, mut fr)) => {
+                        fr.call(&mut self);
+                        match k {
+                            Some(item) => Ready(Some(item)),
+                            None => continue,
+                        }
+                    }
+                    Pending => Pending,
+                }
+            };
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use crate::EventsItem;
+
+    use super::DatablockStream;
+    use chrono::{DateTime, Utc};
+    use err::Error;
+    use futures_util::StreamExt;
+    use items::{LogItem, Sitemty, StatsItem, StreamItem};
+    use netpod::timeunits::SEC;
+    use netpod::{log::*, RangeFilterStats};
+    use netpod::{Channel, NanoRange};
+    use serde::Serialize;
+    use std::collections::VecDeque;
+    use std::path::PathBuf;
+    use streams::rangefilter::RangeFilter;
+
+    #[test]
+    fn read_file_basic_info() -> Result<(), Error> {
+        let fut = async {
+            // file begin archive_X05DA_SH/20211001/20211001: 1633039259
+            // 1633145759
+            // October 1st CEST: 1633039200
+            // archive_X05DA_SH/20210901/20210920 (has no next-links)
+            // maybe there is no linking across files?
+            // now in this case, there is a `next`. Does the rtree also contain an entry for that?
+            let beg = "2021-10-01T00:00:00Z".parse::<DateTime<Utc>>().unwrap();
+            let end = "2021-10-01T02:00:00Z".parse::<DateTime<Utc>>().unwrap();
+            let range = NanoRange {
+                beg: beg.timestamp() as u64 * SEC,
+                end: end.timestamp() as u64 * SEC,
+            };
+            let channel = Channel {
+                backend: "test-archapp".into(),
+                name: "X05DA-FE-WI1:TC1".into(),
+            };
+            let base_dirs: VecDeque<_> = ["/data/daqbuffer-testdata/sls/gfa03/bl_arch"]
+                .iter()
+                .map(PathBuf::from)
+                .collect();
+            let expand = false;
+            let datablocks = DatablockStream::for_channel_range(range.clone(), channel, base_dirs, expand);
+            let filtered = RangeFilter::<_, EventsItem>::new(datablocks, range, expand);
+            let mut stream = filtered;
+            while let Some(block) = stream.next().await {
+                info!("DatablockStream yields: {:?}", block);
+            }
+            Ok(())
+        };
+        Ok(taskrun::run(fut).unwrap())
+    }
+
+    #[test]
+    fn test_bincode_rep_stats() {
+        fn make_stats<T>() -> Vec<u8>
+        where
+            T: Serialize,
+        {
+            let stats = RangeFilterStats {
+                events_pre: 626262,
+                events_post: 929292,
+                events_unordered: 131313,
+            };
+            let item = StreamItem::Stats(StatsItem::RangeFilterStats(stats));
+            let item: Sitemty<T> = Ok(item);
+            bincode::serialize(&item).unwrap()
+        }
+        let v1 = make_stats::();
+        let v2 = make_stats::();
+        let v3 = make_stats::>();
+        let v4 = make_stats::>();
+        assert_eq!(v1, v2);
+        assert_eq!(v1, v3);
+        assert_eq!(v1, v4);
+    }
+
+    #[test]
+    fn test_bincode_rep_log() {
+        fn make_log<T>() -> Vec<u8>
+        where
+            T: Serialize,
+        {
+            let item = StreamItem::Log(LogItem::quick(
+                Level::DEBUG,
+                format!("Some easy log message for testing purpose here."),
+            ));
+            let item: Sitemty<T> = Ok(item);
+            bincode::serialize(&item).unwrap()
+        }
+        let v1 = make_log::();
+        let v2 = make_log::();
+        let v3 = make_log::>();
+        let v4 = make_log::>();
+        assert_eq!(v1, v2);
+        assert_eq!(v1, v3);
+        assert_eq!(v1, v4);
+    }
+}
diff --git a/archapp/src/archeng/datastream.rs b/archapp/src/archeng/datastream.rs
new file mode 100644
index 0000000..29b8e88
--- /dev/null
+++ b/archapp/src/archeng/datastream.rs
@@ -0,0 +1,15 @@
+use crate::EventsItem;
+use futures_core::Stream;
+use items::Sitemty;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pub struct DataStream {}
+
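+// `DataStream` is a stub at this point: only the type and its `Stream`
+// signature are fixed, and `poll_next` below is `todo!()`, so polling it
+// panics.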
+impl Stream for DataStream {
+    type Item = Sitemty<EventsItem>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        todo!()
+    }
+}
diff --git a/archapp/src/events.rs b/archapp/src/events.rs
index bb33dbf..291c8d4 100644
--- a/archapp/src/events.rs
+++ b/archapp/src/events.rs
@@ -239,6 +239,7 @@ macro_rules! arm2 {
                 }
                 _ => panic!(),
             },
+            _ => panic!(),
         },
         _ => err::todoval(),
diff --git a/archapp/src/lib.rs b/archapp/src/lib.rs
index 156fb65..d3578c0 100644
--- a/archapp/src/lib.rs
+++ b/archapp/src/lib.rs
@@ -14,7 +14,7 @@ use items::numops::NumOps;
 use items::waveevents::{WaveEvents, WaveXBinner};
 use items::xbinnedscalarevents::XBinnedScalarEvents;
 use items::xbinnedwaveevents::XBinnedWaveEvents;
-use items::{EventsNodeProcessor, SitemtyFrameType, WithLen, WithTimestamps};
+use items::{Appendable, Clearable, EventsNodeProcessor, PushableIndex, SitemtyFrameType, WithLen, WithTimestamps};
 use netpod::{AggKind, HasScalarType, HasShape, ScalarType, Shape};
 #[cfg(not(feature = "devread"))]
 pub use parsestub as parse;
@@ -70,6 +70,82 @@ impl ScalarPlainEvents {
     }
 }
 
+impl Clearable for ScalarPlainEvents {
+    fn clear(&mut self) {
+        match self {
+            ScalarPlainEvents::Byte(k) => k.clear(),
+            ScalarPlainEvents::Short(k) => k.clear(),
+            ScalarPlainEvents::Int(k) => k.clear(),
+            ScalarPlainEvents::Float(k) => k.clear(),
+            ScalarPlainEvents::Double(k) => k.clear(),
+        }
+    }
+}
+
+impl Appendable for ScalarPlainEvents {
+    fn empty_like_self(&self) -> Self {
+        match self {
+            Self::Byte(k) => Self::Byte(k.empty_like_self()),
+            Self::Short(k) => Self::Short(k.empty_like_self()),
+            Self::Int(k) => Self::Int(k.empty_like_self()),
+            Self::Float(k) => Self::Float(k.empty_like_self()),
+            Self::Double(k) => Self::Double(k.empty_like_self()),
+        }
+    }
+
+    fn append(&mut self, src: &Self) {
+        match self {
+            Self::Byte(k) => match src {
+                Self::Byte(j) => k.append(j),
+                _ => panic!(),
+            },
+            Self::Short(k) => match src {
+                Self::Short(j) => k.append(j),
+                _ => panic!(),
+            },
+            Self::Int(k) => match src {
+                Self::Int(j) => k.append(j),
+                _ => panic!(),
+            },
+            Self::Float(k) => match src {
+                Self::Float(j) => k.append(j),
+                _ => panic!(),
+            },
+            Self::Double(k) => match src {
+                Self::Double(j) => k.append(j),
+                _ => panic!(),
+            },
+        }
+    }
+}
+
+impl PushableIndex for ScalarPlainEvents {
+    fn push_index(&mut self, src: &Self, ix: usize) {
+        match self {
+            Self::Byte(k) => match src {
+                Self::Byte(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+            Self::Short(k) => match src {
+                Self::Short(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+            Self::Int(k) => match src {
+                Self::Int(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+            Self::Float(k) => match src {
+                Self::Float(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+            Self::Double(k) => match src {
+                Self::Double(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+        }
+    }
+}
+
 impl WithLen for ScalarPlainEvents {
     fn len(&self) -> usize {
         use ScalarPlainEvents::*;
@@ -185,6 +261,82 @@ impl WavePlainEvents {
     }
 }
 
+impl Clearable for WavePlainEvents {
+    fn clear(&mut self) {
+        match self {
+            WavePlainEvents::Byte(k) => k.clear(),
+            WavePlainEvents::Short(k) => k.clear(),
+            WavePlainEvents::Int(k) => k.clear(),
+            WavePlainEvents::Float(k) => k.clear(),
+            WavePlainEvents::Double(k) => k.clear(),
+        }
+    }
+}
+
+impl Appendable for WavePlainEvents {
+    fn empty_like_self(&self) -> Self {
+        match self {
+            Self::Byte(k) => Self::Byte(k.empty_like_self()),
+            Self::Short(k) => Self::Short(k.empty_like_self()),
+            Self::Int(k) => Self::Int(k.empty_like_self()),
+            Self::Float(k) => Self::Float(k.empty_like_self()),
+            Self::Double(k) => Self::Double(k.empty_like_self()),
+        }
+    }
+
+    fn append(&mut self, src: &Self) {
+        match self {
+            Self::Byte(k) => match src {
+                Self::Byte(j) => k.append(j),
+                _ => panic!(),
+            },
+            Self::Short(k) => match src {
+                Self::Short(j) => k.append(j),
+                _ => panic!(),
+            },
+            Self::Int(k) => match src {
+                Self::Int(j) => k.append(j),
+                _ => panic!(),
+            },
+            Self::Float(k) => match src {
+                Self::Float(j) => k.append(j),
+                _ => panic!(),
+            },
+            Self::Double(k) => match src {
+                Self::Double(j) => k.append(j),
+                _ => panic!(),
+            },
+        }
+    }
+}
+
+impl PushableIndex for WavePlainEvents {
+    fn push_index(&mut self, src: &Self, ix: usize) {
+        match self {
+            Self::Byte(k) => match src {
+                Self::Byte(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+            Self::Short(k) => match src {
+                Self::Short(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+            Self::Int(k) => match src {
+                Self::Int(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+            Self::Float(k) => match src {
+                Self::Float(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+            Self::Double(k) => match src {
+                Self::Double(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+        }
+    }
+}
+
 impl WithLen for WavePlainEvents {
     fn len(&self) -> usize {
         use WavePlainEvents::*;
@@ -273,6 +425,82 @@ impl MultiBinWaveEvents {
     }
 }
 
+impl Clearable for MultiBinWaveEvents {
+    fn clear(&mut self) {
+        match self {
+            MultiBinWaveEvents::Byte(k) => k.clear(),
+            MultiBinWaveEvents::Short(k) => k.clear(),
+            MultiBinWaveEvents::Int(k) => k.clear(),
+            MultiBinWaveEvents::Float(k) => k.clear(),
+            MultiBinWaveEvents::Double(k) => k.clear(),
+        }
+    }
+}
+
+impl Appendable for MultiBinWaveEvents {
+    fn empty_like_self(&self) -> Self {
+        match self {
+            Self::Byte(k) => Self::Byte(k.empty_like_self()),
+            Self::Short(k) => Self::Short(k.empty_like_self()),
+            Self::Int(k) => Self::Int(k.empty_like_self()),
+            Self::Float(k) => Self::Float(k.empty_like_self()),
+            Self::Double(k) => Self::Double(k.empty_like_self()),
+        }
+    }
+
+    fn append(&mut self, src: &Self) {
+        match self {
+            Self::Byte(k) => match src {
+                Self::Byte(j) => k.append(j),
+                _ => panic!(),
+            },
+            Self::Short(k) => match src {
+                Self::Short(j) => k.append(j),
+                _ => panic!(),
+            },
+            Self::Int(k) => match src {
+                Self::Int(j) => k.append(j),
+                _ => panic!(),
+            },
+            Self::Float(k) => match src {
+                Self::Float(j) => k.append(j),
+                _ => panic!(),
+            },
+            Self::Double(k) => match src {
+                Self::Double(j) => k.append(j),
+                _ => panic!(),
+            },
+        }
+    }
+}
+
+impl PushableIndex for MultiBinWaveEvents {
+    fn push_index(&mut self, src: &Self, ix: usize) {
+        match self {
+            Self::Byte(k) => match src {
+                Self::Byte(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+            Self::Short(k) => match src {
+                Self::Short(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+            Self::Int(k) => match src {
+                Self::Int(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+            Self::Float(k) => match src {
+                Self::Float(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+            Self::Double(k) => match src {
+                Self::Double(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+        }
+    }
+}
+
 impl WithLen for MultiBinWaveEvents {
     fn len(&self) -> usize {
         use MultiBinWaveEvents::*;
@@ -361,6 +589,82 @@ impl SingleBinWaveEvents {
     }
 }
 
+impl Clearable for SingleBinWaveEvents {
+    fn clear(&mut self) {
+        match self {
+            SingleBinWaveEvents::Byte(k) => k.clear(),
+            SingleBinWaveEvents::Short(k) => k.clear(),
+            SingleBinWaveEvents::Int(k) => k.clear(),
+            SingleBinWaveEvents::Float(k) => k.clear(),
+            SingleBinWaveEvents::Double(k) => k.clear(),
+        }
+    }
+}
+
+impl Appendable for SingleBinWaveEvents {
+    fn empty_like_self(&self) -> Self {
+        match self {
+            Self::Byte(k) => Self::Byte(k.empty_like_self()),
+            Self::Short(k) => Self::Short(k.empty_like_self()),
+            Self::Int(k) => Self::Int(k.empty_like_self()),
+            Self::Float(k) => Self::Float(k.empty_like_self()),
+            Self::Double(k) => Self::Double(k.empty_like_self()),
+        }
+    }
+
+    fn append(&mut self, src: &Self) {
+        match self {
+            Self::Byte(k) => match src {
+                Self::Byte(j) => k.append(j),
+                _ => panic!(),
+            },
+            Self::Short(k) => match src {
+                Self::Short(j) => k.append(j),
+                _ => panic!(),
+            },
+            Self::Int(k) => match src {
+                Self::Int(j) => k.append(j),
+                _ => panic!(),
+            },
+            Self::Float(k) => match src {
+                Self::Float(j) => k.append(j),
+                _ => panic!(),
+            },
+            Self::Double(k) => match src {
+                Self::Double(j) => k.append(j),
+                _ => panic!(),
+            },
+        }
+    }
+}
+
+impl PushableIndex for SingleBinWaveEvents {
+    fn push_index(&mut self, src: &Self, ix: usize) {
+        match self {
+            Self::Byte(k) => match src {
+                Self::Byte(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+            Self::Short(k) => match src {
+                Self::Short(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+            Self::Int(k) => match src {
+                Self::Int(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+            Self::Float(k) => match src {
+                Self::Float(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+            Self::Double(k) => match src {
+                Self::Double(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+        }
+    }
+}
+
 impl WithLen for SingleBinWaveEvents {
     fn len(&self) -> usize {
         use SingleBinWaveEvents::*;
@@ -440,6 +744,62 @@ impl XBinnedEvents {
     }
 }
 
+impl Clearable for XBinnedEvents {
+    fn clear(&mut self) {
+        match self {
+            XBinnedEvents::Scalar(k) => k.clear(),
+            XBinnedEvents::SingleBinWave(k) => k.clear(),
+            XBinnedEvents::MultiBinWave(k) => k.clear(),
+        }
+    }
+}
+
+impl Appendable for XBinnedEvents {
+    fn empty_like_self(&self) -> Self {
+        match self {
+            Self::Scalar(k) => Self::Scalar(k.empty_like_self()),
+            Self::SingleBinWave(k) => Self::SingleBinWave(k.empty_like_self()),
+            Self::MultiBinWave(k) => Self::MultiBinWave(k.empty_like_self()),
+        }
+    }
+
+    fn append(&mut self, src: &Self) {
+        match self {
+            Self::Scalar(k) => match src {
+                Self::Scalar(j) => k.append(j),
+                _ => panic!(),
+            },
+            Self::SingleBinWave(k) => match src {
+                Self::SingleBinWave(j) => k.append(j),
+                _ => panic!(),
+            },
+            Self::MultiBinWave(k) => match src {
+                Self::MultiBinWave(j) => k.append(j),
+                _ => panic!(),
+            },
+        }
+    }
+}
+
+impl PushableIndex for XBinnedEvents {
+    fn push_index(&mut self, src: &Self, ix: usize) {
+        match self {
+            Self::Scalar(k) => match src {
+                Self::Scalar(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+            Self::SingleBinWave(k) => match src {
+                Self::SingleBinWave(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+            Self::MultiBinWave(k) => match src {
+                Self::MultiBinWave(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+        }
+    }
+}
+
 impl WithLen for XBinnedEvents {
     fn len(&self) -> usize {
         use XBinnedEvents::*;
@@ -516,6 +876,52 @@ impl PlainEvents {
     }
 }
 
+impl Clearable for PlainEvents {
+    fn clear(&mut self) {
+        match self {
+            PlainEvents::Scalar(k) => k.clear(),
+            PlainEvents::Wave(k) => k.clear(),
+        }
+    }
+}
+
+impl Appendable for PlainEvents {
+    fn empty_like_self(&self) -> Self {
+        match self {
+            Self::Scalar(k) => Self::Scalar(k.empty_like_self()),
+            Self::Wave(k) => Self::Wave(k.empty_like_self()),
+        }
+    }
+
+    fn append(&mut self, src: &Self) {
+        match self {
+            PlainEvents::Scalar(k) => match src {
+                Self::Scalar(j) => k.append(j),
+                _ => panic!(),
+            },
+            PlainEvents::Wave(k) => match src {
+                Self::Wave(j) => k.append(j),
+                _ => panic!(),
+            },
+        }
+    }
+}
+
+impl PushableIndex for PlainEvents {
+    fn push_index(&mut self, src: &Self, ix: usize) {
+        match self {
+            Self::Scalar(k) => match src {
+                Self::Scalar(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+            Self::Wave(k) => match src {
+                Self::Wave(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+        }
+    }
+}
+
 impl WithLen for PlainEvents {
     fn len(&self) -> usize {
         use PlainEvents::*;
@@ -585,7 +991,7 @@ impl EventsItem {
         }
     }
 
-    pub fn x_aggregate(self, ak: &AggKind) -> EventsItem {
+    pub fn x_aggregate(self, ak: &AggKind) -> Self {
         use EventsItem::*;
         match self {
             Plain(k) => k.x_aggregate(ak),
@@ -614,6 +1020,52 @@ impl WithTimestamps for EventsItem {
     }
 }
 
+impl Appendable for EventsItem {
+    fn empty_like_self(&self) -> Self {
+        match self {
+            EventsItem::Plain(k) => EventsItem::Plain(k.empty_like_self()),
+            EventsItem::XBinnedEvents(k) => EventsItem::XBinnedEvents(k.empty_like_self()),
+        }
+    }
+
+    fn append(&mut self, src: &Self) {
+        match self {
+            Self::Plain(k) => match src {
+                Self::Plain(j) => k.append(j),
+                _ => panic!(),
+            },
+            Self::XBinnedEvents(k) => match src {
+                Self::XBinnedEvents(j) => k.append(j),
+                _ => panic!(),
+            },
+        }
+    }
+}
+
+impl PushableIndex for EventsItem {
+    fn push_index(&mut self, src: &Self, ix: usize) {
+        match self {
+            Self::Plain(k) => match src {
+                Self::Plain(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+            Self::XBinnedEvents(k) => match src {
+                Self::XBinnedEvents(j) => k.push_index(j, ix),
+                _ => panic!(),
+            },
+        }
+    }
+}
+
+impl Clearable for EventsItem {
+    fn clear(&mut self) {
+        match self {
+            EventsItem::Plain(k) => k.clear(),
+            EventsItem::XBinnedEvents(k) => k.clear(),
+        }
+    }
+}
+
 impl HasShape for EventsItem {
     fn shape(&self) -> Shape {
         use EventsItem::*;
diff --git a/daqbufp2/Cargo.toml b/daqbufp2/Cargo.toml
index 5266784..625d8fb 100644
--- a/daqbufp2/Cargo.toml
+++ b/daqbufp2/Cargo.toml
@@ -13,12 +13,13 @@ tracing-subscriber = "0.2.17"
 futures-core = "0.3.14"
 futures-util = "0.3.14"
 bytes = "1.0.1"
-bincode = "1.3.3"
 #async-channel = "1"
 #dashmap = "3"
 serde = "1.0"
 serde_derive = "1.0"
 serde_json = "1.0"
+bincode = "1.3.3"
+#ciborium = "0.1.0"
 chrono = "0.4"
 url = "2.2.2"
 lazy_static = "1.4.0"
diff --git a/daqbufp2/src/test/binnedbinary.rs b/daqbufp2/src/test/binnedbinary.rs
index 78c9690..e97632e 100644
--- a/daqbufp2/src/test/binnedbinary.rs
+++ b/daqbufp2/src/test/binnedbinary.rs
@@ -232,6 +232,7 @@ where
                     a.bytes_read += item.parsed_bytes;
                     a
                 }
+                _ => a,
             },
             Ok(StreamItem::DataItem(item)) => match item {
                 RangeCompletableItem::RangeComplete => {
diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs
index 8987da8..2e65219 100644
--- a/daqbufp2/src/test/events.rs
+++ b/daqbufp2/src/test/events.rs
@@ -187,6 +187,7 @@ where
                     a.bytes_read += item.parsed_bytes;
                     a
                 }
+                _ => a,
             },
             Ok(StreamItem::DataItem(item)) => match item {
                 RangeCompletableItem::RangeComplete => {
diff --git a/dbconn/Cargo.toml b/dbconn/Cargo.toml
index b8a45cc..9f8c9dd 100644
--- a/dbconn/Cargo.toml
+++ b/dbconn/Cargo.toml
@@ -15,7 +15,6 @@ byteorder = "1.4.3"
 futures-core = "0.3.14"
 futures-util = "0.3.14"
 bytes = "1.0.1"
-bincode = "1.3.3"
 pin-project = "1.0.7"
 #async-channel = "1"
 #dashmap = "3"
diff --git a/disk/Cargo.toml b/disk/Cargo.toml
index 7f477ee..3573ac9 100644
--- a/disk/Cargo.toml
+++ b/disk/Cargo.toml
@@ -15,7 +15,6 @@ tokio-stream = {version = "0.1.5", features = ["fs"]}
version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] } async-channel = "1.6" bytes = "1.0.1" -bincode = "1.3.3" crc32fast = "1.2.1" arrayref = "0.3.6" byteorder = "1.4.3" @@ -39,4 +38,5 @@ bitshuffle = { path = "../bitshuffle" } dbconn = { path = "../dbconn" } parse = { path = "../parse" } items = { path = "../items" } +streams = { path = "../streams" } httpclient = { path = "../httpclient" } diff --git a/disk/src/agg.rs b/disk/src/agg.rs index f94af22..223ee3c 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -8,7 +8,6 @@ use std::time::Duration; pub mod binnedt; pub mod enp; -pub mod eventbatch; pub mod scalarbinbatch; pub mod streams; diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs deleted file mode 100644 index 12cd7a5..0000000 --- a/disk/src/agg/eventbatch.rs +++ /dev/null @@ -1,169 +0,0 @@ -use bytes::{BufMut, Bytes, BytesMut}; -use items::{Appendable, RangeOverlapInfo, SitemtyFrameType}; -use netpod::log::*; -use netpod::NanoRange; -use serde::{Deserialize, Serialize}; -use std::mem::size_of; - -#[derive(Serialize, Deserialize)] -pub struct MinMaxAvgScalarEventBatch { - pub tss: Vec, - pub mins: Vec, - pub maxs: Vec, - pub avgs: Vec, -} - -impl SitemtyFrameType for MinMaxAvgScalarEventBatch { - const FRAME_TYPE_ID: u32 = 0x300; -} - -impl MinMaxAvgScalarEventBatch { - pub fn empty() -> Self { - Self { - tss: vec![], - mins: vec![], - maxs: vec![], - avgs: vec![], - } - } - - #[allow(dead_code)] - pub fn old_from_full_frame(buf: &Bytes) -> Self { - info!("construct MinMaxAvgScalarEventBatch from full frame len {}", buf.len()); - assert!(buf.len() >= 4); - let mut g = MinMaxAvgScalarEventBatch::empty(); - let n1; - unsafe { - let ptr = (&buf[0] as *const u8) as *const [u8; 4]; - n1 = u32::from_le_bytes(*ptr); - trace!("--- +++ --- +++ --- +++ n1: {}", n1); - } - if n1 == 0 { - g - } else { - let n2 = n1 as usize; - g.tss.reserve(n2); - g.mins.reserve(n2); - g.maxs.reserve(n2); - g.avgs.reserve(n2); - unsafe { - // TODO Can I unsafely create ptrs and just assign them? - // TODO What are cases where I really need transmute? 
-                g.tss.set_len(n2);
-                g.mins.set_len(n2);
-                g.maxs.set_len(n2);
-                g.avgs.set_len(n2);
-                let ptr0 = &buf[4] as *const u8;
-                {
-                    let ptr1 = ptr0 as *const u64;
-                    for i1 in 0..n2 {
-                        g.tss[i1] = *ptr1.add(i1);
-                    }
-                }
-                {
-                    let ptr1 = ptr0.add((8) * n2) as *const f32;
-                    for i1 in 0..n2 {
-                        g.mins[i1] = *ptr1.add(i1);
-                    }
-                }
-                {
-                    let ptr1 = ptr0.add((8 + 4) * n2) as *const f32;
-                    for i1 in 0..n2 {
-                        g.maxs[i1] = *ptr1;
-                    }
-                }
-                {
-                    let ptr1 = ptr0.add((8 + 4 + 4) * n2) as *const f32;
-                    for i1 in 0..n2 {
-                        g.avgs[i1] = *ptr1;
-                    }
-                }
-            }
-            info!("CONTENT {:?}", g);
-            g
-        }
-    }
-}
-
-impl std::fmt::Debug for MinMaxAvgScalarEventBatch {
-    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
-        write!(
-            fmt,
-            "MinMaxAvgScalarEventBatch count {} tss {:?} mins {:?} maxs {:?} avgs {:?}",
-            self.tss.len(),
-            self.tss,
-            self.mins,
-            self.maxs,
-            self.avgs,
-        )
-    }
-}
-
-impl MinMaxAvgScalarEventBatch {
-    #[allow(dead_code)]
-    fn old_serialized(&self) -> Bytes {
-        let n1 = self.tss.len();
-        let mut g = BytesMut::with_capacity(4 + n1 * (8 + 3 * 4));
-        g.put_u32_le(n1 as u32);
-        if n1 > 0 {
-            let ptr = &self.tss[0] as *const u64 as *const u8;
-            let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<u64>() * n1) };
-            g.put(a);
-            let ptr = &self.mins[0] as *const f32 as *const u8;
-            let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<f32>() * n1) };
-            g.put(a);
-            let ptr = &self.maxs[0] as *const f32 as *const u8;
-            let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<f32>() * n1) };
-            g.put(a);
-            let ptr = &self.avgs[0] as *const f32 as *const u8;
-            let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<f32>() * n1) };
-            g.put(a);
-        }
-        info!("impl Frameable for MinMaxAvgScalarEventBatch g.len() {}", g.len());
-        g.freeze()
-    }
-}
-
-/*
-TODO remove?
-impl MakeBytesFrame for Result>, Error> {
-    fn make_bytes_frame(&self) -> Result {
-        Ok(make_frame(self)?.freeze())
-    }
-}*/
-
-impl RangeOverlapInfo for MinMaxAvgScalarEventBatch {
-    fn ends_before(&self, range: NanoRange) -> bool {
-        match self.tss.last() {
-            Some(&ts) => ts < range.beg,
-            None => true,
-        }
-    }
-
-    fn ends_after(&self, range: NanoRange) -> bool {
-        match self.tss.last() {
-            Some(&ts) => ts >= range.end,
-            None => panic!(),
-        }
-    }
-
-    fn starts_after(&self, range: NanoRange) -> bool {
-        match self.tss.first() {
-            Some(&ts) => ts >= range.end,
-            None => panic!(),
-        }
-    }
-}
-
-impl Appendable for MinMaxAvgScalarEventBatch {
-    fn empty() -> Self {
-        Self::empty()
-    }
-
-    fn append(&mut self, src: &Self) {
-        self.tss.extend_from_slice(&src.tss);
-        self.mins.extend_from_slice(&src.mins);
-        self.maxs.extend_from_slice(&src.maxs);
-        self.avgs.extend_from_slice(&src.avgs);
-    }
-}
diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index 27153c5..e63167a 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -1,5 +1,4 @@
 use crate::agg::binnedt::TBinnerStream;
-use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
 use crate::binned::binnedfrompbv::BinnedFromPreBinned;
 use crate::binned::query::BinnedQuery;
 use crate::binnedstream::BoxedStream;
@@ -406,27 +405,6 @@ pub async fn binned_json(
     Ok(Box::pin(ret))
 }
 
-impl WithLen for MinMaxAvgScalarEventBatch {
-    fn len(&self) -> usize {
-        self.tss.len()
-    }
-}
-
-impl WithTimestamps for MinMaxAvgScalarEventBatch {
-    fn ts(&self, ix: usize) -> u64 {
-        self.tss[ix]
-    }
-}
-
-impl PushableIndex for MinMaxAvgScalarEventBatch {
-    fn push_index(&mut self, src: &Self, ix: usize) {
-        self.tss.push(src.tss[ix]);
-        self.mins.push(src.mins[ix]);
-        self.maxs.push(src.maxs[ix]);
-        self.avgs.push(src.avgs[ix]);
-    }
-}
-
 pub trait EventsDecoder {
     type Output;
     fn ingest(&mut self, event: &[u8]);
diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs
index 53aa00e..1d76f8b 100644
--- a/disk/src/binned/pbv.rs
+++ b/disk/src/binned/pbv.rs
@@ -11,8 +11,8 @@ use futures_core::Stream;
 use futures_util::{FutureExt, StreamExt};
 use items::numops::NumOps;
 use items::{
-    Appendable, Clearable, EventsNodeProcessor, FrameType, PushableIndex, RangeCompletableItem, ReadableFromFile,
-    Sitemty, StreamItem, TimeBinnableType,
+    Appendable, Clearable, EventsNodeProcessor, EventsTypeAliases, FrameType, PushableIndex, RangeCompletableItem,
+    ReadableFromFile, Sitemty, StreamItem, TimeBinnableType,
 };
 use netpod::log::*;
 use netpod::query::RawEventsQuery;
@@ -41,9 +41,7 @@ where
     agg_kind: AggKind,
     node_config: NodeConfigCached,
     open_check_local_file: Option> + Send>>>,
-    fut2: Option<
-        Pin<Box<dyn Future<Output = Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>> + Send>>,
-    >,
+    fut2: Option<Pin<Box<dyn Future<Output = Sitemty<<ENP as EventsTypeAliases>::TimeBinOutput>> + Send>>>,
     read_from_cache: bool,
     cache_written: bool,
     data_complete: bool,
@@ -52,15 +50,9 @@ where
     errored: bool,
     completed: bool,
     streamlog: Streamlog,
-    values: <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output,
+    values: Option<<ENP as EventsTypeAliases>::TimeBinOutput>,
     write_fut: Option> + Send>>>,
-    read_cache_fut: Option<
-        Pin<
-            Box<
-                dyn Future<Output = Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>> + Send,
-            >,
-        >,
-    >,
+    read_cache_fut: Option<Pin<Box<dyn Future<Output = Sitemty<<ENP as EventsTypeAliases>::TimeBinOutput>> + Send>>>,
     _m1: PhantomData,
     _m2: PhantomData,
    _m3: PhantomData,
@@ -96,7 +88,8 @@ where
             completed: false,
             streamlog: Streamlog::new(node_config.ix as u32),
             // TODO use alias via some trait associated type:
-            values: <<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output as Appendable>::empty(),
+            //values: <<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output as Appendable>::empty(),
+            values: None,
             write_fut: None,
             read_cache_fut: None,
             _m1: PhantomData,
@@ -309,24 +302,26 @@ where
                 } else {
                     match self.query.cache_usage() {
                         CacheUsage::Use | CacheUsage::Recreate => {
-                            let msg = format!(
-                                "write cache file query: {:?} bin count: {}",
-                                self.query.patch(),
-                                self.values.len(),
-                            );
-                            self.streamlog.append(Level::INFO, msg);
-                            // TODO use alias vias trait:
-                            let emp = <<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output as Appendable>::empty();
-                            let values = std::mem::replace(&mut self.values, emp);
-                            let fut = write_pb_cache_min_max_avg_scalar(
-                                values,
-                                self.query.patch().clone(),
-                                self.query.agg_kind().clone(),
-                                self.query.channel().clone(),
-                                self.node_config.clone(),
-                            );
-                            self.write_fut = Some(Box::pin(fut));
-                            continue 'outer;
+                            if let Some(values) = self.values.take() {
+                                let msg = format!(
+                                    "write cache file query: {:?} bin count: {}",
+                                    self.query.patch(),
+                                    values.len(),
+                                );
+                                self.streamlog.append(Level::INFO, msg);
+                                let fut = write_pb_cache_min_max_avg_scalar(
+                                    values,
+                                    self.query.patch().clone(),
+                                    self.query.agg_kind().clone(),
+                                    self.query.channel().clone(),
+                                    self.node_config.clone(),
+                                );
+                                self.write_fut = Some(Box::pin(fut));
+                                continue 'outer;
+                            } else {
+                                warn!("no values to write to cache");
+                                continue 'outer;
+                            }
                         }
                         _ => {
                             self.cache_written = true;
@@ -346,7 +341,13 @@ where
                             continue 'outer;
                         }
                         RangeCompletableItem::Data(item) => {
-                            self.values.append(&item);
+                            if let Some(values) = &mut self.values {
+                                values.append(&item);
+                            } else {
+                                let mut values = item.empty_like_self();
+                                values.append(&item);
+                                self.values = Some(values);
+                            }
                             Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))
                         }
                     },
diff --git a/disk/src/decode.rs b/disk/src/decode.rs
index c1315c5..4158793 100644
--- a/disk/src/decode.rs
+++ b/disk/src/decode.rs
@@ -212,8 +212,10 @@ where
         }
     }
 
-    fn decode(&mut self, ev: &EventFull) -> Result<>::Batch, Error> {
-        let mut ret = <>::Batch as Appendable>::empty();
+    fn decode(&mut self, ev: &EventFull) -> Result>::Batch>, Error> {
+        //let mut ret = <>::Batch as Appendable>::empty();
+        //let mut ret = EventValues::<>::Output>::empty();
+        let mut ret = None;
         //ret.tss.reserve(ev.tss.len());
         //ret.values.reserve(ev.tss.len());
         for i1 in 0..ev.tss.len() {
@@ -231,9 +233,9 @@ where
             }
             let decomp = ev.decomps[i1].as_ref().unwrap().as_ref();
             let val = self.evs.convert(decomp, be)?;
-            <>::Batch as EventAppendable>::append_event(&mut ret, ev.tss[i1], val);
-            //ret.tss.push(ev.tss[i1]);
-            //ret.values.push(val);
+            let k =
+                <>::Batch as EventAppendable>::append_event(ret, ev.tss[i1], val);
+            ret = Some(k);
         }
         Ok(ret)
     }
@@ -265,9 +267,14 @@ where
                                 Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
                             }
                             RangeCompletableItem::Data(item) => match self.decode(&item) {
-                                Ok(res) => {
-                                    Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(res)))))
-                                }
+                                Ok(res) => match res {
+                                    Some(res) => {
+                                        Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(res)))))
+                                    }
+                                    None => {
+                                        continue;
+                                    }
+                                },
                                 Err(e) => {
                                     self.errored = true;
                                     Ready(Some(Err(e)))
diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs
index e6c5e03..239c6c8 100644
--- a/disk/src/eventblobs.rs
+++ b/disk/src/eventblobs.rs
@@ -2,7 +2,6 @@ use crate::dataopen::{open_expanded_files, open_files, OpenedFileSet};
 use crate::eventchunker::{EventChunker, EventChunkerConf, EventFull};
 use crate::file_content_stream;
 use crate::merge::MergedStream;
-use crate::rangefilter::RangeFilter;
 use err::Error;
 use futures_core::Stream;
 use futures_util::StreamExt;
@@ -14,6 +13,7 @@ use std::pin::Pin;
 use std::sync::atomic::AtomicU64;
 use std::sync::Arc;
 use std::task::{Context, Poll};
+use streams::rangefilter::RangeFilter;
 
 pub trait InputTraits: Stream<Item = Sitemty<EventFull>> {}
 
@@ -204,7 +204,6 @@ impl Stream for EventChunkerMultifile {
 
 #[cfg(test)]
 mod test {
-    use crate::rangefilter::RangeFilter;
     use crate::{eventblobs::EventChunkerMultifile, eventchunker::EventChunkerConf};
     use err::Error;
     use futures_util::StreamExt;
@@ -212,6 +211,7 @@ mod test {
     use netpod::log::*;
     use netpod::timeunits::{DAY, MS};
     use netpod::{ByteSize, ChannelConfig, FileIoBufferSize, Nanos};
+    use streams::rangefilter::RangeFilter;
 
     fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, Vec<u64>), Error> {
         let chn = netpod::Channel {
diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs
index 1233876..9abdf9f 100644
--- a/disk/src/eventchunker.rs
+++ b/disk/src/eventchunker.rs
@@ -524,7 +524,7 @@ impl WithLen for EventFull {
 }
 
 impl Appendable for EventFull {
-    fn empty() -> Self {
+    fn empty_like_self(&self) -> Self {
         Self::empty()
     }
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index e8bda3c..b750c1c 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -32,9 +32,7 @@ pub mod frame;
 pub mod gen;
 pub mod index;
 pub mod merge;
-pub mod mergeblobs;
 pub mod paths;
-pub mod rangefilter;
 pub mod raw;
 pub mod streamlog;
diff --git a/disk/src/merge.rs b/disk/src/merge.rs
index cfe4b32..5676120 100644
--- a/disk/src/merge.rs
+++ b/disk/src/merge.rs
@@ -6,7 +6,6 @@ use items::{Appendable, LogItem, PushableIndex, RangeCompletableItem, Sitemty, S
 use netpod::histo::HistoLog2;
 use netpod::log::*;
 use netpod::ByteSize;
-use netpod::EventDataReadStats;
 use std::collections::VecDeque;
 use std::pin::Pin;
 use std::task::{Context, Poll};
@@ -28,7 +27,7 @@ pub struct MergedStream {
     ixs: Vec<usize>,
     errored: bool,
     completed: bool,
-    batch: ITY,
+    batch: Option<ITY>,
     ts_last_emit: u64,
     range_complete_observed: Vec<bool>,
     range_complete_observed_all: bool,
@@ -37,7 +36,7 @@ pub struct MergedStream {
     batch_size: ByteSize,
     batch_len_emit_histo: HistoLog2,
     logitems: VecDeque<LogItem>,
-    event_data_read_stats_items: VecDeque<EventDataReadStats>,
+    stats_items: VecDeque<StatsItem>,
 }
 
 // TODO get rid, log info explicitly.
@@ -65,7 +64,7 @@ where
             ixs: vec![0; n],
             errored: false,
             completed: false,
-            batch: <ITY as Appendable>::empty(),
+            batch: None,
             ts_last_emit: 0,
             range_complete_observed: vec![false; n],
             range_complete_observed_all: false,
@@ -74,7 +73,7 @@ where
             batch_size: ByteSize::kb(128),
             batch_len_emit_histo: HistoLog2::new(0),
             logitems: VecDeque::new(),
-            event_data_read_stats_items: VecDeque::new(),
+            stats_items: VecDeque::new(),
         }
     }
@@ -92,11 +91,7 @@ where
                             continue 'l1;
                         }
                         StreamItem::Stats(item) => {
-                            match item {
-                                StatsItem::EventDataReadStats(item) => {
-                                    self.event_data_read_stats_items.push_back(item);
-                                }
-                            }
+                            self.stats_items.push_back(item);
                             continue 'l1;
                         }
                         StreamItem::DataItem(item) => match item {
@@ -160,8 +155,8 @@ where
             Ready(None)
         } else if let Some(item) = self.logitems.pop_front() {
             Ready(Some(Ok(StreamItem::Log(item))))
-        } else if let Some(item) = self.event_data_read_stats_items.pop_front() {
-            Ready(Some(Ok(StreamItem::Stats(StatsItem::EventDataReadStats(item)))))
+        } else if let Some(item) = self.stats_items.pop_front() {
+            Ready(Some(Ok(StreamItem::Stats(item))))
         } else if self.data_emit_complete {
             if self.range_complete_observed_all {
                 if self.range_complete_observed_all_emitted {
@@ -197,18 +192,22 @@ where
                     }
                 }
                 if lowest_ix == usize::MAX {
-                    if self.batch.len() != 0 {
-                        let ret = std::mem::replace(&mut self.batch, ITY::empty());
-                        self.batch_len_emit_histo.ingest(ret.len() as u32);
-                        self.data_emit_complete = true;
-                        if LOG_EMIT_ITEM {
-                            let mut aa = vec![];
-                            for ii in 0..ret.len() {
-                                aa.push(ret.ts(ii));
-                            }
-                            info!("MergedBlobsStream A emits {} events tss {:?}", ret.len(), aa);
-                        };
-                        Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
+                    if let Some(batch) = self.batch.take() {
+                        if batch.len() != 0 {
+                            self.batch_len_emit_histo.ingest(batch.len() as u32);
+                            self.data_emit_complete = true;
+                            if LOG_EMIT_ITEM {
+                                let mut aa = vec![];
+                                for ii in 0..batch.len() {
+                                    aa.push(batch.ts(ii));
+                                }
+                                info!("MergedBlobsStream A emits {} events tss {:?}", batch.len(), aa);
+                            };
+                            Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(batch)))))
+                        } else {
+                            self.data_emit_complete = true;
+                            continue 'outer;
+                        }
                     } else {
                         self.data_emit_complete = true;
                         continue 'outer;
@@ -216,17 +215,18 @@ where
                 } else {
                     assert!(lowest_ts >= self.ts_last_emit);
                     {
-                        let mut ldst = std::mem::replace(&mut self.batch, ITY::empty());
+                        let batch = self.batch.take();
                         self.ts_last_emit = lowest_ts;
                         let rix = self.ixs[lowest_ix];
                         match &self.current[lowest_ix] {
                             MergedCurVal::Val(val) => {
+                                let mut ldst = batch.unwrap_or_else(|| val.empty_like_self());
                                 ldst.push_index(val, rix);
+                                self.batch = Some(ldst);
                             }
                             MergedCurVal::None => panic!(),
                             MergedCurVal::Finish => panic!(),
                         }
-                        self.batch = ldst;
                     }
                     self.ixs[lowest_ix] += 1;
                     let curlen = match &self.current[lowest_ix] {
@@ -238,24 +238,30 @@ where
                         self.ixs[lowest_ix] = 0;
                        self.current[lowest_ix] = MergedCurVal::None;
                     }
-                    let emit_packet_now = if self.batch.byte_estimate() >= self.batch_size.bytes() as u64 {
-                        true
+                    let emit_packet_now = if let Some(batch) = &self.batch {
+                        if batch.byte_estimate() >= self.batch_size.bytes() as u64 {
+                            true
+                        } else {
+                            false
+                        }
                     } else {
                         false
                     };
                     if emit_packet_now {
-                        trace!("emit item because over threshold len {}", self.batch.len());
-                        let emp = ITY::empty();
-                        let ret = std::mem::replace(&mut self.batch, emp);
-                        self.batch_len_emit_histo.ingest(ret.len() as u32);
-                        if LOG_EMIT_ITEM {
-                            let mut aa = vec![];
-                            for ii in 0..ret.len() {
-                                aa.push(ret.ts(ii));
-                            }
{:?}", ret.len(), aa); - }; - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) + if let Some(batch) = self.batch.take() { + trace!("emit item because over threshold len {}", batch.len()); + self.batch_len_emit_histo.ingest(batch.len() as u32); + if LOG_EMIT_ITEM { + let mut aa = vec![]; + for ii in 0..batch.len() { + aa.push(batch.ts(ii)); + } + info!("MergedBlobsStream B emits {} events tss {:?}", batch.len(), aa); + }; + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(batch))))) + } else { + continue 'outer; + } } else { continue 'outer; } diff --git a/disk/src/mergeblobs.rs b/disk/src/mergeblobs.rs deleted file mode 100644 index 22b8b93..0000000 --- a/disk/src/mergeblobs.rs +++ /dev/null @@ -1,278 +0,0 @@ -use err::Error; -use futures_core::Stream; -use futures_util::StreamExt; -use items::ByteEstimate; -use items::{ - Appendable, LogItem, PushableIndex, RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithLen, WithTimestamps, -}; -use netpod::histo::HistoLog2; -use netpod::EventDataReadStats; -use netpod::{log::*, ByteSize}; -use std::collections::VecDeque; -use std::pin::Pin; -use std::task::{Context, Poll}; - -const LOG_EMIT_ITEM: bool = false; - -enum MergedCurVal { - None, - Finish, - Val(T), -} - -pub struct MergedBlobsStream -where - S: Stream> + Unpin, - I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen + ByteEstimate, -{ - inps: Vec, - current: Vec>, - ixs: Vec, - errored: bool, - completed: bool, - batch: I, - ts_last_emit: u64, - range_complete_observed: Vec, - range_complete_observed_all: bool, - range_complete_observed_all_emitted: bool, - data_emit_complete: bool, - batch_size: ByteSize, - batch_len_emit_histo: HistoLog2, - logitems: VecDeque, - event_data_read_stats_items: VecDeque, -} - -// TODO get rid, log info explicitly. 
-impl<S, I> Drop for MergedBlobsStream<S, I>
-where
-    S: Stream<Item = Sitemty<I>> + Unpin,
-    I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen + ByteEstimate,
-{
-    fn drop(&mut self) {
-        info!(
-            "MergedBlobsStream Drop Stats:\nbatch_len_emit_histo: {:?}",
-            self.batch_len_emit_histo
-        );
-    }
-}
-
-impl<S, I> MergedBlobsStream<S, I>
-where
-    S: Stream<Item = Sitemty<I>> + Unpin,
-    I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen + ByteEstimate,
-{
-    pub fn new(inps: Vec<S>) -> Self {
-        // TODO remove MergedBlobsStream
-        err::todo();
-        let n = inps.len();
-        let current = (0..n).into_iter().map(|_| MergedCurVal::None).collect();
-        Self {
-            inps,
-            current: current,
-            ixs: vec![0; n],
-            errored: false,
-            completed: false,
-            batch: I::empty(),
-            ts_last_emit: 0,
-            range_complete_observed: vec![false; n],
-            range_complete_observed_all: false,
-            range_complete_observed_all_emitted: false,
-            data_emit_complete: false,
-            batch_size: ByteSize::kb(128),
-            batch_len_emit_histo: HistoLog2::new(0),
-            logitems: VecDeque::new(),
-            event_data_read_stats_items: VecDeque::new(),
-        }
-    }
-
-    fn replenish(self: &mut Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
-        use Poll::*;
-        let mut pending = 0;
-        for i1 in 0..self.inps.len() {
-            match self.current[i1] {
-                MergedCurVal::None => {
-                    'l1: loop {
-                        break match self.inps[i1].poll_next_unpin(cx) {
-                            Ready(Some(Ok(k))) => match k {
-                                StreamItem::Log(item) => {
-                                    self.logitems.push_back(item);
-                                    continue 'l1;
-                                }
-                                StreamItem::Stats(item) => {
-                                    match item {
-                                        StatsItem::EventDataReadStats(item) => {
-                                            self.event_data_read_stats_items.push_back(item);
-                                        }
-                                    }
-                                    continue 'l1;
-                                }
-                                StreamItem::DataItem(item) => match item {
-                                    RangeCompletableItem::RangeComplete => {
-                                        self.range_complete_observed[i1] = true;
-                                        let d = self.range_complete_observed.iter().filter(|&&k| k).count();
-                                        if d == self.range_complete_observed.len() {
-                                            self.range_complete_observed_all = true;
-                                            debug!("MergedStream range_complete d {} COMPLETE", d);
-                                        } else {
-                                            trace!("MergedStream range_complete d {}", d);
-                                        }
-                                        continue 'l1;
-                                    }
-                                    RangeCompletableItem::Data(item) => {
-                                        self.ixs[i1] = 0;
-                                        self.current[i1] = MergedCurVal::Val(item);
-                                    }
-                                },
-                            },
-                            Ready(Some(Err(e))) => {
-                                // TODO emit this error, consider this stream as done, anything more to do here?
-                                //self.current[i1] = CurVal::Err(e);
-                                self.errored = true;
-                                return Ready(Err(e));
-                            }
-                            Ready(None) => {
-                                self.current[i1] = MergedCurVal::Finish;
-                            }
-                            Pending => {
-                                pending += 1;
-                            }
-                        };
-                    }
-                }
-                _ => (),
-            }
-        }
-        if pending > 0 {
-            Pending
-        } else {
-            Ready(Ok(()))
-        }
-    }
-}
-
-impl<S, I> Stream for MergedBlobsStream<S, I>
-where
-    S: Stream<Item = Sitemty<I>> + Unpin,
-    I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen + ByteEstimate,
-{
-    type Item = Sitemty<I>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
-        // TODO remove MergedBlobsStream
-        err::todo();
-        use Poll::*;
-        'outer: loop {
-            break if self.completed {
-                panic!("poll_next on completed");
-            } else if self.errored {
-                self.completed = true;
-                Ready(None)
-            } else if let Some(item) = self.logitems.pop_front() {
-                Ready(Some(Ok(StreamItem::Log(item))))
-            } else if let Some(item) = self.event_data_read_stats_items.pop_front() {
-                Ready(Some(Ok(StreamItem::Stats(StatsItem::EventDataReadStats(item)))))
-            } else if self.data_emit_complete {
-                if self.range_complete_observed_all {
-                    if self.range_complete_observed_all_emitted {
-                        self.completed = true;
-                        Ready(None)
-                    } else {
-                        self.range_complete_observed_all_emitted = true;
-                        Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
-                    }
-                } else {
-                    self.completed = true;
-                    Ready(None)
-                }
-            } else {
-                // Can only run logic if all streams are either finished, errored or have some current value.
-                match self.replenish(cx) {
-                    Ready(Ok(_)) => {
-                        let mut lowest_ix = usize::MAX;
-                        let mut lowest_ts = u64::MAX;
-                        for i1 in 0..self.inps.len() {
-                            if let MergedCurVal::Val(val) = &self.current[i1] {
-                                let u = self.ixs[i1];
-                                if u >= val.len() {
-                                    self.ixs[i1] = 0;
-                                    self.current[i1] = MergedCurVal::None;
-                                    continue 'outer;
-                                } else {
-                                    let ts = val.ts(u);
-                                    if ts < lowest_ts {
-                                        lowest_ix = i1;
-                                        lowest_ts = ts;
-                                    }
-                                }
-                            }
-                        }
-                        if lowest_ix == usize::MAX {
-                            if self.batch.len() != 0 {
-                                let emp = I::empty();
-                                let ret = std::mem::replace(&mut self.batch, emp);
-                                self.batch_len_emit_histo.ingest(ret.len() as u32);
-                                self.data_emit_complete = true;
-                                if LOG_EMIT_ITEM {
-                                    let mut aa = vec![];
-                                    for ii in 0..ret.len() {
-                                        aa.push(ret.ts(ii));
-                                    }
-                                    info!("MergedBlobsStream A emits {} events tss {:?}", ret.len(), aa);
-                                };
-                                Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
-                            } else {
-                                self.data_emit_complete = true;
-                                continue 'outer;
-                            }
-                        } else {
-                            assert!(lowest_ts >= self.ts_last_emit);
-                            let emp = I::empty();
-                            let mut local_batch = std::mem::replace(&mut self.batch, emp);
-                            self.ts_last_emit = lowest_ts;
-                            let rix = self.ixs[lowest_ix];
-                            match &self.current[lowest_ix] {
-                                MergedCurVal::Val(val) => {
-                                    local_batch.push_index(val, rix);
-                                }
-                                MergedCurVal::None => panic!(),
-                                MergedCurVal::Finish => panic!(),
-                            }
-                            self.batch = local_batch;
-                            self.ixs[lowest_ix] += 1;
-                            let curlen = match &self.current[lowest_ix] {
-                                MergedCurVal::Val(val) => val.len(),
-                                MergedCurVal::None => panic!(),
-                                MergedCurVal::Finish => panic!(),
-                            };
-                            if self.ixs[lowest_ix] >= curlen {
-                                self.ixs[lowest_ix] = 0;
-                                self.current[lowest_ix] = MergedCurVal::None;
-                            }
-                            if self.batch.byte_estimate() >= self.batch_size.bytes() as u64 {
-                                trace!("emit item because over threshold len {}", self.batch.len());
-                                let emp = I::empty();
-                                let ret = std::mem::replace(&mut self.batch, emp);
-                                self.batch_len_emit_histo.ingest(ret.len() as u32);
-                                if LOG_EMIT_ITEM {
-                                    let mut aa = vec![];
-                                    for ii in 0..ret.len() {
-                                        aa.push(ret.ts(ii));
-                                    }
info!("MergedBlobsStream B emits {} events tss {:?}", ret.len(), aa); - }; - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) - } else { - continue 'outer; - } - } - } - Ready(Err(e)) => { - self.errored = true; - Ready(Some(Err(e))) - } - Pending => Pending, - } - }; - } - } -} diff --git a/items/src/eventvalues.rs b/items/src/eventvalues.rs index cdcda47..5fdcaf4 100644 --- a/items/src/eventvalues.rs +++ b/items/src/eventvalues.rs @@ -156,7 +156,7 @@ impl Appendable for EventValues where NTY: NumOps, { - fn empty() -> Self { + fn empty_like_self(&self) -> Self { Self::empty() } @@ -482,8 +482,10 @@ where { type Value = NTY; - fn append_event(&mut self, ts: u64, value: Self::Value) { - self.tss.push(ts); - self.values.push(value); + fn append_event(ret: Option, ts: u64, value: Self::Value) -> Self { + let mut ret = if let Some(ret) = ret { ret } else { Self::empty() }; + ret.tss.push(ts); + ret.values.push(value); + ret } } diff --git a/items/src/lib.rs b/items/src/lib.rs index 850c83e..2df1ed8 100644 --- a/items/src/lib.rs +++ b/items/src/lib.rs @@ -4,6 +4,7 @@ use bytes::BytesMut; use chrono::{TimeZone, Utc}; use err::Error; use netpod::timeunits::{MS, SEC}; +use netpod::RangeFilterStats; use netpod::{log::Level, AggKind, EventDataReadStats, EventQueryJsonStringFrame, NanoRange, Shape}; use serde::de::{self, DeserializeOwned, Visitor}; use serde::{Deserialize, Serialize, Serializer}; @@ -50,6 +51,7 @@ pub enum RangeCompletableItem { #[derive(Debug, Serialize, Deserialize)] pub enum StatsItem { EventDataReadStats(EventDataReadStats), + RangeFilterStats(RangeFilterStats), } #[derive(Debug, Serialize, Deserialize)] @@ -243,6 +245,18 @@ pub trait EventsNodeProcessor: Send + Unpin { fn process(&self, inp: Self::Input) -> Self::Output; } +pub trait EventsTypeAliases { + type TimeBinOutput; +} + +impl EventsTypeAliases for ENP +where + ENP: EventsNodeProcessor, + ::Output: TimeBinnableType, +{ + type TimeBinOutput = <::Output as TimeBinnableType>::Output; +} + #[derive(Clone, Debug, Deserialize)] pub struct IsoDateTime(chrono::DateTime); @@ -303,7 +317,7 @@ pub trait PushableIndex { } pub trait Appendable: WithLen { - fn empty() -> Self; + fn empty_like_self(&self) -> Self; fn append(&mut self, src: &Self); } @@ -311,9 +325,12 @@ pub trait Clearable { fn clear(&mut self); } -pub trait EventAppendable { +pub trait EventAppendable +where + Self: Sized, +{ type Value; - fn append_event(&mut self, ts: u64, value: Self::Value); + fn append_event(ret: Option, ts: u64, value: Self::Value) -> Self; } pub trait TimeBins: Send + Unpin + WithLen + Appendable + FilterFittingInside { diff --git a/items/src/minmaxavgbins.rs b/items/src/minmaxavgbins.rs index 96c3363..2c6d963 100644 --- a/items/src/minmaxavgbins.rs +++ b/items/src/minmaxavgbins.rs @@ -144,7 +144,7 @@ impl Appendable for MinMaxAvgBins where NTY: NumOps, { - fn empty() -> Self { + fn empty_like_self(&self) -> Self { Self::empty() } diff --git a/items/src/minmaxavgdim1bins.rs b/items/src/minmaxavgdim1bins.rs index bfd2459..72824e4 100644 --- a/items/src/minmaxavgdim1bins.rs +++ b/items/src/minmaxavgdim1bins.rs @@ -145,7 +145,7 @@ impl Appendable for MinMaxAvgDim1Bins where NTY: NumOps, { - fn empty() -> Self { + fn empty_like_self(&self) -> Self { Self::empty() } diff --git a/items/src/minmaxavgwavebins.rs b/items/src/minmaxavgwavebins.rs index b76adeb..08f1159 100644 --- a/items/src/minmaxavgwavebins.rs +++ b/items/src/minmaxavgwavebins.rs @@ -143,7 +143,7 @@ impl Appendable for MinMaxAvgWaveBins where NTY: NumOps, { - fn 
diff --git a/items/src/waveevents.rs b/items/src/waveevents.rs
index 22430f6..f9eb38a 100644
--- a/items/src/waveevents.rs
+++ b/items/src/waveevents.rs
@@ -130,7 +130,7 @@ impl<NTY> Appendable for WaveEvents<NTY>
 where
     NTY: NumOps,
 {
-    fn empty() -> Self {
+    fn empty_like_self(&self) -> Self {
         Self::empty()
     }
 
@@ -304,9 +304,11 @@ where
 {
     type Value = Vec<NTY>;
 
-    fn append_event(&mut self, ts: u64, value: Self::Value) {
-        self.tss.push(ts);
-        self.vals.push(value);
+    fn append_event(ret: Option<Self>, ts: u64, value: Self::Value) -> Self {
+        let mut ret = if let Some(ret) = ret { ret } else { Self::empty() };
+        ret.tss.push(ts);
+        ret.vals.push(value);
+        ret
     }
 }
diff --git a/items/src/xbinnedscalarevents.rs b/items/src/xbinnedscalarevents.rs
index 747f896..63bda4c 100644
--- a/items/src/xbinnedscalarevents.rs
+++ b/items/src/xbinnedscalarevents.rs
@@ -134,7 +134,7 @@ impl<NTY> Appendable for XBinnedScalarEvents<NTY>
 where
     NTY: NumOps,
 {
-    fn empty() -> Self {
+    fn empty_like_self(&self) -> Self {
         Self::empty()
     }
 
diff --git a/items/src/xbinnedwaveevents.rs b/items/src/xbinnedwaveevents.rs
index dddddbd..eab1ad2 100644
--- a/items/src/xbinnedwaveevents.rs
+++ b/items/src/xbinnedwaveevents.rs
@@ -136,7 +136,7 @@ impl<NTY> Appendable for XBinnedWaveEvents<NTY>
 where
     NTY: NumOps,
 {
-    fn empty() -> Self {
+    fn empty_like_self(&self) -> Self {
         Self::empty()
     }
 
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index 6e2e4fc..c08a099 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -277,6 +277,15 @@ impl From<FilePos> for u64 {
     }
 }
 
+#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
+pub struct DataHeaderPos(pub u64);
+
+impl PartialEq for DataHeaderPos {
+    fn eq(&self, other: &Self) -> bool {
+        self.0 == other.0
+    }
+}
+
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub enum TimeRange {
     Time { beg: DateTime<Utc>, end: DateTime<Utc> },
@@ -284,11 +293,18 @@ pub enum TimeRange {
     Nano { beg: u64, end: u64 },
 }
 
-#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
+#[derive(Clone, Copy, Serialize, Deserialize)]
 pub struct Nanos {
     pub ns: u64,
 }
 
+impl fmt::Debug for Nanos {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        let ts = chrono::Utc.timestamp((self.ns / SEC) as i64, (self.ns % SEC) as u32);
+        f.debug_struct("Nanos").field("ns", &ts).finish()
+    }
+}
+
 #[derive(Clone, Serialize, Deserialize)]
 pub struct NanoRange {
     pub beg: u64,
@@ -296,15 +312,13 @@ pub struct NanoRange {
 }
 
 impl fmt::Debug for NanoRange {
-    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
-        write!(
-            fmt,
-            "NanoRange {{ beg: {}.{:03} s, end: {}.{:03} s }}",
-            self.beg / SEC,
-            (self.beg % SEC) / MS,
-            self.end / SEC,
-            (self.end % SEC) / MS,
-        )
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        let beg = chrono::Utc.timestamp((self.beg / SEC) as i64, (self.beg % SEC) as u32);
+        let end = chrono::Utc.timestamp((self.end / SEC) as i64, (self.end % SEC) as u32);
+        f.debug_struct("NanoRange")
+            .field("beg", &beg)
+            .field("end", &end)
+            .finish()
     }
 }
 
@@ -939,6 +953,23 @@ impl EventDataReadStats {
     }
 }
 
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct RangeFilterStats {
+    pub events_pre: u64,
+    pub events_post: u64,
+    pub events_unordered: u64,
+}
+
+impl RangeFilterStats {
+    pub fn new() -> Self {
+        Self {
+            events_pre: 0,
+            events_post: 0,
+            events_unordered: 0,
+        }
+    }
+}
+
 #[derive(Clone, Debug)]
 pub struct PerfOpts {
     pub inmem_bufcap: usize,
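The netpod hunk above swaps the hand-rolled seconds-based formatting for chrono-rendered UTC datetimes in the `Debug` impls of `Nanos` and `NanoRange`. A self-contained sketch of the same pattern, assuming `SEC = 1_000_000_000` as in `netpod::timeunits` and chrono 0.4's `TimeZone::timestamp`:

```rust
use chrono::TimeZone; // chrono = "0.4"
use std::fmt;

const SEC: u64 = 1_000_000_000;

pub struct NanoRange {
    pub beg: u64,
    pub end: u64,
}

impl fmt::Debug for NanoRange {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Split the u64 nanosecond timestamps into (seconds, subsecond nanos)
        // and let chrono render them as UTC datetimes.
        let beg = chrono::Utc.timestamp((self.beg / SEC) as i64, (self.beg % SEC) as u32);
        let end = chrono::Utc.timestamp((self.end / SEC) as i64, (self.end % SEC) as u32);
        f.debug_struct("NanoRange").field("beg", &beg).field("end", &end).finish()
    }
}

fn main() {
    let r = NanoRange {
        beg: 1_600_000_000 * SEC,
        end: 1_600_000_000 * SEC + 500 * 1_000_000,
    };
    // Debug output now shows datetimes instead of raw second counts, e.g.
    // NanoRange { beg: 2020-09-13T12:26:40Z, end: 2020-09-13T12:26:40.500Z }
    println!("{:?}", r);
}
```

Since `RangeFilter` stores the range's `Debug` rendering in `range_str`, this change also makes the filter's log lines human-readable.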
diff --git a/streams/Cargo.toml b/streams/Cargo.toml
new file mode 100644
index 0000000..fc86250
--- /dev/null
+++ b/streams/Cargo.toml
@@ -0,0 +1,19 @@
+[package]
+name = "streams"
+version = "0.0.1-a.dev.4"
+authors = ["Dominik Werder "]
+edition = "2018"
+
+[dependencies]
+tracing = "0.1.26"
+futures-core = "0.3.15"
+futures-util = "0.3.15"
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+serde_cbor = "0.11.1"
+bincode = "1.3.3"
+bytes = "1.0.1"
+chrono = { version = "0.4.19", features = ["serde"] }
+err = { path = "../err" }
+netpod = { path = "../netpod" }
+items = { path = "../items" }
diff --git a/streams/src/lib.rs b/streams/src/lib.rs
new file mode 100644
index 0000000..8896be0
--- /dev/null
+++ b/streams/src/lib.rs
@@ -0,0 +1 @@
+pub mod rangefilter;
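The rename below moves `RangeFilter` out of `disk` into the new `streams` crate. Besides making `prerange` lazily allocated (`Option<ITY>`, since an empty container can no longer be conjured without an instance), it adds a `raco_done` stage so the stream ends with an optional `RangeComplete` marker followed by a `RangeFilterStats` item. A reduced model of that three-stage shutdown, with the `Stream`/`poll_next` machinery replaced by a plain `Iterator` for brevity (item and field names are simplified stand-ins):

```rust
// Reduced model of RangeFilter's end-of-stream sequencing:
// data_done -> raco_done (emit RangeComplete if seen) -> done (emit stats).
#[derive(Debug)]
enum Item {
    RangeComplete,
    Stats { events_pre: u64, events_post: u64 },
}

struct Shutdown {
    data_done: bool,
    raco_done: bool,
    done: bool,
    have_range_complete: bool,
    events_pre: u64,
    events_post: u64,
}

impl Iterator for Shutdown {
    type Item = Item;
    fn next(&mut self) -> Option<Item> {
        if self.done {
            None
        } else if self.raco_done {
            // The stats are the very last item, mirroring the
            // StreamItem::Stats emitted after the RangeComplete marker.
            self.done = true;
            Some(Item::Stats {
                events_pre: self.events_pre,
                events_post: self.events_post,
            })
        } else if self.data_done {
            self.raco_done = true;
            if self.have_range_complete {
                Some(Item::RangeComplete)
            } else {
                self.next()
            }
        } else {
            // The real filter polls the inner stream here; this model jumps
            // straight to the teardown.
            self.data_done = true;
            self.next()
        }
    }
}

fn main() {
    let s = Shutdown {
        data_done: false,
        raco_done: false,
        done: false,
        have_range_complete: true,
        events_pre: 1,
        events_post: 3,
    };
    for item in s {
        println!("{:?}", item); // RangeComplete, then Stats { .. }
    }
}
```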
diff --git a/disk/src/rangefilter.rs b/streams/src/rangefilter.rs
similarity index 67%
rename from disk/src/rangefilter.rs
rename to streams/src/rangefilter.rs
index 88f9796..37d317e 100644
--- a/disk/src/rangefilter.rs
+++ b/streams/src/rangefilter.rs
@@ -1,8 +1,9 @@
 use futures_core::Stream;
 use futures_util::StreamExt;
+use items::StatsItem;
 use items::{Appendable, Clearable, PushableIndex, RangeCompletableItem, Sitemty, StreamItem, WithTimestamps};
-use netpod::log::*;
 use netpod::NanoRange;
+use netpod::{log::*, RangeFilterStats};
 use std::pin::Pin;
 use std::task::{Context, Poll};
@@ -11,11 +12,13 @@ pub struct RangeFilter<S, ITY> {
     range: NanoRange,
     range_str: String,
     expand: bool,
-    prerange: ITY,
+    stats: RangeFilterStats,
+    prerange: Option<ITY>,
     have_pre: bool,
     have_range_complete: bool,
     emitted_post: bool,
     data_done: bool,
+    raco_done: bool,
     done: bool,
     complete: bool,
 }
@@ -31,11 +34,13 @@ where
             range_str: format!("{:?}", range),
             range,
             expand,
-            prerange: ITY::empty(),
+            stats: RangeFilterStats::new(),
+            prerange: None,
             have_pre: false,
             have_range_complete: false,
             emitted_post: false,
             data_done: false,
+            raco_done: false,
             done: false,
             complete: false,
         }
@@ -55,8 +60,13 @@ where
         } else if self.done {
             self.complete = true;
             Ready(None)
-        } else if self.data_done {
+        } else if self.raco_done {
             self.done = true;
+            let k = std::mem::replace(&mut self.stats, RangeFilterStats::new());
+            let k = StatsItem::RangeFilterStats(k);
+            Ready(Some(Ok(StreamItem::Stats(k))))
+        } else if self.data_done {
+            self.raco_done = true;
             if self.have_range_complete {
                 Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
             } else {
@@ -66,21 +76,32 @@ where
             match self.inp.poll_next_unpin(cx) {
                 Ready(Some(item)) => match item {
                     Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => {
-                        let mut ret = ITY::empty();
+                        let mut ret = item.empty_like_self();
                         for i1 in 0..item.len() {
                             let ts = item.ts(i1);
                             if ts < self.range.beg {
                                 if self.expand {
-                                    self.prerange.clear();
-                                    self.prerange.push_index(&item, i1);
+                                    let mut prerange = if let Some(prerange) = self.prerange.take() {
+                                        prerange
+                                    } else {
+                                        item.empty_like_self()
+                                    };
+                                    prerange.clear();
+                                    prerange.push_index(&item, i1);
+                                    self.prerange = Some(prerange);
                                     self.have_pre = true;
                                 }
                             } else if ts >= self.range.end {
                                 self.have_range_complete = true;
                                 if self.expand {
                                     if self.have_pre {
-                                        ret.push_index(&self.prerange, 0);
-                                        self.prerange.clear();
+                                        let prerange = if let Some(prerange) = &mut self.prerange {
+                                            prerange
+                                        } else {
+                                            panic!()
+                                        };
+                                        ret.push_index(prerange, 0);
+                                        prerange.clear();
                                         self.have_pre = false;
                                     }
                                     if !self.emitted_post {
@@ -94,8 +115,13 @@ where
                             } else {
                                 if self.expand {
                                     if self.have_pre {
-                                        ret.push_index(&self.prerange, 0);
-                                        self.prerange.clear();
+                                        let prerange = if let Some(prerange) = &mut self.prerange {
+                                            prerange
+                                        } else {
+                                            panic!()
+                                        };
+                                        ret.push_index(prerange, 0);
+                                        prerange.clear();
                                         self.have_pre = false;
                                     }
                                 }
@@ -113,8 +139,14 @@ where
                 Ready(None) => {
                     self.data_done = true;
                     if self.have_pre {
-                        let mut ret = ITY::empty();
-                        ret.push_index(&self.prerange, 0);
+                        let prerange = if let Some(prerange) = &mut self.prerange {
+                            prerange
+                        } else {
+                            panic!()
+                        };
+                        let mut ret = prerange.empty_like_self();
+                        ret.push_index(&prerange, 0);
+                        prerange.clear();
                         self.have_pre = false;
                         Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
                     } else {
diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs
index a8c8d3a..db497cd 100644
--- a/taskrun/src/lib.rs
+++ b/taskrun/src/lib.rs
@@ -90,7 +90,7 @@ pub fn tracing_init() {
         .with_thread_names(true)
         //.with_max_level(tracing::Level::INFO)
         .with_env_filter(tracing_subscriber::EnvFilter::new(
-            ["info", "daqbuffer::test=trace"].join(","),
+            ["info", "archapp::archeng=trace", "daqbuffer::test=trace"].join(","),
         ))
         .init();
     *g = 1;
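For reference, the filtering semantics `RangeFilter` implements in expand mode: the latest event before `range.beg` is retained and emitted once the range is entered, the first event at or past `range.end` is emitted exactly once, and a still-unconsumed pre-range event is flushed when the input ends (the `Ready(None)` arm above). A simplified scalar model of that behavior; the real filter operates on batched containers via `PushableIndex` and also reports `RangeFilterStats`:

```rust
struct NanoRange {
    beg: u64,
    end: u64,
}

// Simplified scalar model of RangeFilter's expand mode.
fn filter_expand(tss: &[u64], range: &NanoRange) -> Vec<u64> {
    let mut out = Vec::new();
    let mut pre: Option<u64> = None;
    for &ts in tss {
        if ts < range.beg {
            // Keep only the most recent pre-range event.
            pre = Some(ts);
        } else if ts >= range.end {
            if let Some(p) = pre.take() {
                out.push(p);
            }
            // Emit the first post-range event once, then stop.
            out.push(ts);
            break;
        } else {
            if let Some(p) = pre.take() {
                out.push(p);
            }
            out.push(ts);
        }
    }
    // End-of-input flush: a retained pre-range event is still emitted,
    // matching the have_pre handling in the Ready(None) arm.
    if let Some(p) = pre.take() {
        out.push(p);
    }
    out
}

fn main() {
    let r = NanoRange { beg: 10, end: 20 };
    assert_eq!(filter_expand(&[3, 7, 12, 15, 25], &r), vec![7, 12, 15, 25]);
    assert_eq!(filter_expand(&[3, 7], &r), vec![7]);
    println!("ok");
}
```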