From cd500620aaf84725c9fa47c08a1dc79b9ab70953 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 29 Sep 2021 10:18:51 +0200 Subject: [PATCH] Clearable --- disk/src/binned.rs | 8 +- disk/src/binned/pbv.rs | 8 +- disk/src/binned/prebinned.rs | 6 +- disk/src/channelexec.rs | 8 +- disk/src/dataopen.rs | 4 + disk/src/eventblobs.rs | 238 ++++++++++--------- disk/src/eventchunker.rs | 23 +- disk/src/merge.rs | 288 +++++++++++++++++++---- disk/src/merge/mergedblobsfromremotes.rs | 10 +- disk/src/merge/mergedfromremotes.rs | 19 +- disk/src/mergeblobs.rs | 162 +------------ items/src/eventvalues.rs | 13 +- items/src/lib.rs | 4 + items/src/waveevents.rs | 9 +- items/src/xbinnedscalarevents.rs | 15 +- items/src/xbinnedwaveevents.rs | 14 +- 16 files changed, 474 insertions(+), 355 deletions(-) diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 0fc35ad..27153c5 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -14,8 +14,8 @@ use items::frame::MakeBytesFrame; use items::numops::NumOps; use items::streams::{Collectable, Collector}; use items::{ - EventsNodeProcessor, FilterFittingInside, Framable, FrameType, PushableIndex, RangeCompletableItem, Sitemty, - StreamItem, TimeBinnableType, WithLen, WithTimestamps, + Clearable, EventsNodeProcessor, FilterFittingInside, Framable, FrameType, PushableIndex, RangeCompletableItem, + Sitemty, StreamItem, TimeBinnableType, WithLen, WithTimestamps, }; use netpod::log::*; use netpod::query::RawEventsQuery; @@ -66,7 +66,7 @@ impl ChannelExecFunction for BinnedBinaryChannelExec { EVS: EventValueShape + EventValueFromBytes + 'static, ENP: EventsNodeProcessor>::Batch> + 'static, // TODO require these things in general? - ::Output: Collectable + PushableIndex, + ::Output: Collectable + PushableIndex + Clearable, <::Output as TimeBinnableType>::Output: Debug + TimeBinnableType::Output as TimeBinnableType>::Output> + Collectable @@ -309,7 +309,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec { EVS: EventValueShape + EventValueFromBytes + 'static, ENP: EventsNodeProcessor>::Batch> + 'static, // TODO require these things in general? - ::Output: Collectable + PushableIndex, + ::Output: Collectable + PushableIndex + Clearable, <::Output as TimeBinnableType>::Output: Debug + TimeBinnableType::Output as TimeBinnableType>::Output> + Collectable diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index d2a246b..53aa00e 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -11,8 +11,8 @@ use futures_core::Stream; use futures_util::{FutureExt, StreamExt}; use items::numops::NumOps; use items::{ - Appendable, EventsNodeProcessor, FrameType, PushableIndex, RangeCompletableItem, ReadableFromFile, Sitemty, - StreamItem, TimeBinnableType, + Appendable, Clearable, EventsNodeProcessor, FrameType, PushableIndex, RangeCompletableItem, ReadableFromFile, + Sitemty, StreamItem, TimeBinnableType, }; use netpod::log::*; use netpod::query::RawEventsQuery; @@ -73,7 +73,7 @@ where END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, ENP: EventsNodeProcessor>::Batch> + 'static, - ::Output: PushableIndex + Appendable, + ::Output: PushableIndex + Appendable + Clearable, // TODO is this needed: Sitemty<::Output>: FrameType, // TODO who exactly needs this DeserializeOwned? 
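The hunks above only thread the new Clearable bound through the generic plumbing of the binned and pre-binned value streams; the trait itself is the one-method addition to items/src/lib.rs further down in this patch. As a standalone sketch of the idea (ToyEvents is an illustrative stand-in, not one of the real items event containers):

// Minimal standalone sketch mirroring the one-method trait added to
// items/src/lib.rs in this patch. ToyEvents is a toy container, not the
// real items::eventvalues::EventValues.
pub trait Clearable {
    fn clear(&mut self);
}

#[derive(Debug, Default)]
struct ToyEvents {
    tss: Vec<u64>,
    values: Vec<i32>,
}

impl Clearable for ToyEvents {
    fn clear(&mut self) {
        // Drop all buffered events but keep the allocations for reuse.
        self.tss.clear();
        self.values.clear();
    }
}

fn main() {
    let mut ev = ToyEvents {
        tss: vec![10, 20],
        values: vec![1, 2],
    };
    ev.clear();
    assert!(ev.tss.is_empty() && ev.values.is_empty());
    println!("cleared: {:?}", ev);
}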
@@ -230,7 +230,7 @@ where END: Endianness + Unpin + 'static, EVS: EventValueShape + EventValueFromBytes + Unpin + 'static, ENP: EventsNodeProcessor>::Batch> + Unpin + 'static, - ::Output: PushableIndex + Appendable, + ::Output: PushableIndex + Appendable + Clearable, // TODO needed? Sitemty<::Output>: FrameType, Sitemty<<::Output as TimeBinnableType>::Output>: FrameType + DeserializeOwned, diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index 6b068a1..0ea051a 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -10,7 +10,9 @@ use err::Error; use futures_core::Stream; use futures_util::StreamExt; use items::numops::{BoolNum, NumOps}; -use items::{Appendable, EventsNodeProcessor, Framable, FrameType, PushableIndex, Sitemty, TimeBinnableType}; +use items::{ + Appendable, Clearable, EventsNodeProcessor, Framable, FrameType, PushableIndex, Sitemty, TimeBinnableType, +}; use netpod::{AggKind, ByteOrder, ChannelConfigQuery, NodeConfigCached, ScalarType, Shape}; use serde::de::DeserializeOwned; use serde::Serialize; @@ -29,7 +31,7 @@ where END: Endianness + 'static, EVS: EventValueShape + EventValueFromBytes + 'static, ENP: EventsNodeProcessor>::Batch> + 'static, - ::Output: PushableIndex + Appendable + 'static, + ::Output: PushableIndex + Appendable + Clearable + 'static, Sitemty<::Output>: FrameType + Framable + 'static, Sitemty<<::Output as TimeBinnableType>::Output>: Framable + FrameType + DeserializeOwned, diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index bd70e49..ab272c0 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -13,7 +13,7 @@ use items::eventvalues::EventValues; use items::numops::{BoolNum, NumOps}; use items::streams::{Collectable, Collector}; use items::{ - EventsNodeProcessor, Framable, FrameType, PushableIndex, RangeCompletableItem, Sitemty, StreamItem, + Clearable, EventsNodeProcessor, Framable, FrameType, PushableIndex, RangeCompletableItem, Sitemty, StreamItem, TimeBinnableType, }; use netpod::log::*; @@ -44,7 +44,7 @@ pub trait ChannelExecFunction { EVS: EventValueShape + EventValueFromBytes + 'static, ENP: EventsNodeProcessor>::Batch> + 'static, // TODO require these things in general? - ::Output: Debug + Collectable + PushableIndex, + ::Output: Debug + Collectable + PushableIndex + Clearable, <::Output as TimeBinnableType>::Output: Debug + TimeBinnableType::Output as TimeBinnableType>::Output> + Collectable @@ -70,7 +70,7 @@ where EVS: EventValueShape + EventValueFromBytes + 'static, ENP: EventsNodeProcessor>::Batch> + 'static, // TODO require these things in general? - ::Output: Debug + Collectable + PushableIndex, + ::Output: Debug + Collectable + PushableIndex + Clearable, <::Output as TimeBinnableType>::Output: Debug + TimeBinnableType::Output as TimeBinnableType>::Output> + Collectable @@ -401,7 +401,7 @@ impl ChannelExecFunction for PlainEventsJson { EVS: EventValueShape + EventValueFromBytes + 'static, ENP: EventsNodeProcessor>::Batch> + 'static, // TODO require these things in general? 
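The same bound is added wherever ::Output is constrained, because the reworked merger in disk/src/merge.rs below keeps a one-event stash (pre_range_buf) that it pushes from and then clears. A rough, self-contained sketch of that push-then-clear pattern; the traits here mirror the items crate signatures but are local stand-ins, not the real crate:

// Toy stand-ins for items::{WithLen, Appendable, PushableIndex, Clearable};
// the method signatures follow the patch, the types are illustrative only.
trait WithLen {
    fn len(&self) -> usize;
}
trait Appendable: WithLen {
    fn empty() -> Self;
}
trait PushableIndex {
    fn push_index(&mut self, src: &Self, ix: usize);
}
trait Clearable {
    fn clear(&mut self);
}

#[derive(Debug, Default)]
struct Batch {
    tss: Vec<u64>,
}

impl WithLen for Batch {
    fn len(&self) -> usize {
        self.tss.len()
    }
}
impl Appendable for Batch {
    fn empty() -> Self {
        Batch::default()
    }
}
impl PushableIndex for Batch {
    fn push_index(&mut self, src: &Self, ix: usize) {
        self.tss.push(src.tss[ix]);
    }
}
impl Clearable for Batch {
    fn clear(&mut self) {
        self.tss.clear();
    }
}

// The merger-style move: take a stashed pre-range event, append it to the
// output batch, then clear the stash so it cannot be emitted twice.
fn flush_stash<T: Appendable + PushableIndex + Clearable>(stash: &mut T, batch: &mut T) {
    if stash.len() == 1 {
        batch.push_index(stash, 0);
        stash.clear();
    }
}

fn main() {
    let mut stash = Batch { tss: vec![999] };
    let mut batch = Batch::empty();
    flush_stash(&mut stash, &mut batch);
    assert_eq!(batch.tss, vec![999u64]);
    assert_eq!(stash.len(), 0);
}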
- ::Output: Debug + Collectable + PushableIndex, + ::Output: Debug + Collectable + PushableIndex + Clearable, <::Output as TimeBinnableType>::Output: Debug + TimeBinnableType::Output as TimeBinnableType>::Output> + Collectable diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index 78b03ee..dbd6cfa 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -30,6 +30,10 @@ async fn position_file( expand_left: bool, expand_right: bool, ) -> Result { + info!( + "position_file called {} {} {:?} {:?}", + expand_left, expand_right, range, path + ); assert_eq!(expand_left && expand_right, false); match OpenOptions::new().read(true).open(&path).await { Ok(file) => { diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 5092ca6..5b860d2 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -1,6 +1,6 @@ use crate::dataopen::{open_expanded_files, open_files, OpenedFileSet}; use crate::eventchunker::{EventChunker, EventChunkerConf, EventFull}; -use crate::mergeblobs::MergedBlobsStream; +use crate::merge::MergedStream; use crate::{file_content_stream, HasSeenBeforeRangeCount}; use err::Error; use futures_core::Stream; @@ -128,7 +128,7 @@ impl Stream for EventChunkerMultifile { Ready(Some(k)) => match k { Ok(ofs) => { self.files_count += ofs.files.len() as u32; - if ofs.files.len() == 1 { + if false && ofs.files.len() == 1 { let mut ofs = ofs; let file = ofs.files.pop().unwrap(); let path = file.path; @@ -156,8 +156,13 @@ impl Stream for EventChunkerMultifile { None => {} } Ready(Some(Ok(StreamItem::Log(item)))) - } else if ofs.files.len() > 1 { - let msg = format!("handle OFS MULTIPLE {:?}", ofs); + } else if ofs.files.len() == 0 { + let msg = format!("handle OFS {:?} NO FILES", ofs); + info!("{}", msg); + let item = LogItem::quick(Level::INFO, msg); + Ready(Some(Ok(StreamItem::Log(item)))) + } else { + let msg = format!("handle OFS MERGED {:?}", ofs); warn!("{}", msg); let item = LogItem::quick(Level::INFO, msg); let mut chunkers = vec![]; @@ -180,14 +185,9 @@ impl Stream for EventChunkerMultifile { chunkers.push(chunker); } } - let merged = MergedBlobsStream::new(chunkers); + let merged = MergedStream::new(chunkers, self.range.clone(), self.expand); self.evs = Some(Box::pin(merged)); Ready(Some(Ok(StreamItem::Log(item)))) - } else { - let msg = format!("handle OFS {:?} NO FILES", ofs); - info!("{}", msg); - let item = LogItem::quick(Level::INFO, msg); - Ready(Some(Ok(StreamItem::Log(item)))) } } Err(e) => { @@ -219,121 +219,125 @@ impl Stream for EventChunkerMultifile { } #[cfg(test)] -fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, usize), Error> { - use netpod::timeunits::*; - use netpod::{ByteSize, Nanos}; - let chn = netpod::Channel { - backend: "testbackend".into(), - name: "scalar-i32-be".into(), - }; - // TODO read config from disk. 
- let channel_config = ChannelConfig { - channel: chn, - keyspace: 2, - time_bin_size: Nanos { ns: DAY }, - scalar_type: netpod::ScalarType::I32, - byte_order: netpod::ByteOrder::big_endian(), - shape: netpod::Shape::Scalar, - array: false, - compression: false, - }; - let cluster = taskrun::test_cluster(); - let node = cluster.nodes[nodeix].clone(); - let buffer_size = 512; - let event_chunker_conf = EventChunkerConf { - disk_stats_every: ByteSize::kb(1024), - }; - let task = async move { - let mut event_count = 0; - let mut events = EventChunkerMultifile::new( - range, - channel_config, - node, - nodeix, - FileIoBufferSize::new(buffer_size), - event_chunker_conf, - true, - true, - ); - while let Some(item) = events.next().await { - match item { - Ok(item) => match item { - StreamItem::DataItem(item) => match item { - RangeCompletableItem::Data(item) => { - info!("item: {:?}", item.tss.iter().map(|x| x / 1000000).collect::>()); - event_count += item.tss.len(); - } +mod test { + use crate::{eventblobs::EventChunkerMultifile, eventchunker::EventChunkerConf}; + use err::Error; + use futures_util::StreamExt; + use items::{RangeCompletableItem, StreamItem}; + use netpod::log::*; + use netpod::timeunits::{DAY, MS}; + use netpod::{ByteSize, ChannelConfig, FileIoBufferSize, Nanos}; + + fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, Vec), Error> { + let chn = netpod::Channel { + backend: "testbackend".into(), + name: "scalar-i32-be".into(), + }; + // TODO read config from disk. + let channel_config = ChannelConfig { + channel: chn, + keyspace: 2, + time_bin_size: Nanos { ns: DAY }, + scalar_type: netpod::ScalarType::I32, + byte_order: netpod::ByteOrder::big_endian(), + shape: netpod::Shape::Scalar, + array: false, + compression: false, + }; + let cluster = taskrun::test_cluster(); + let node = cluster.nodes[nodeix].clone(); + let buffer_size = 512; + let event_chunker_conf = EventChunkerConf { + disk_stats_every: ByteSize::kb(1024), + }; + let task = async move { + let mut event_count = 0; + let mut events = EventChunkerMultifile::new( + range, + channel_config, + node, + nodeix, + FileIoBufferSize::new(buffer_size), + event_chunker_conf, + true, + true, + ); + let mut tss = vec![]; + while let Some(item) = events.next().await { + match item { + Ok(item) => match item { + StreamItem::DataItem(item) => match item { + RangeCompletableItem::Data(item) => { + info!("item: {:?}", item.tss.iter().map(|x| x / MS).collect::>()); + event_count += item.tss.len(); + for ts in item.tss { + tss.push(ts); + } + } + _ => {} + }, _ => {} }, - _ => {} - }, - Err(e) => return Err(e.into()), + Err(e) => return Err(e.into()), + } } + events.close(); + Ok((event_count, tss)) + }; + Ok(taskrun::run(task).unwrap()) + } + + #[test] + fn read_expanded_0() -> Result<(), Error> { + let range = netpod::NanoRange { + beg: DAY + MS * 0, + end: DAY + MS * 100, + }; + let res = read_expanded_for_range(range, 0)?; + info!("got {:?}", res.1); + if res.0 != 3 { + Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?; } - events.close(); - if events.seen_before_range_count() != 1 { - return Err(Error::with_msg(format!( - "seen_before_range_count error: {}", - events.seen_before_range_count(), - ))); + Ok(()) + } + + #[test] + fn read_expanded_1() -> Result<(), Error> { + let range = netpod::NanoRange { + beg: DAY + MS * 0, + end: DAY + MS * 1501, + }; + let res = read_expanded_for_range(range, 0)?; + if res.0 != 3 { + Err(Error::with_msg(format!("unexpected number of events: 
{}", res.0)))?; } - Ok((event_count, events.seen_before_range_count())) - }; - Ok(taskrun::run(task).unwrap()) -} - -#[test] -fn read_expanded_0() -> Result<(), Error> { - use netpod::timeunits::*; - let range = netpod::NanoRange { - beg: DAY + MS * 0, - end: DAY + MS * 1500, - }; - let res = read_expanded_for_range(range, 0)?; - if res.0 != 2 { - Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?; + Ok(()) } - Ok(()) -} -#[test] -fn read_expanded_1() -> Result<(), Error> { - use netpod::timeunits::*; - let range = netpod::NanoRange { - beg: DAY + MS * 0, - end: DAY + MS * 1501, - }; - let res = read_expanded_for_range(range, 0)?; - if res.0 != 3 { - Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?; + #[test] + fn read_expanded_2() -> Result<(), Error> { + let range = netpod::NanoRange { + beg: DAY - MS * 100, + end: DAY + MS * 1501, + }; + let res = read_expanded_for_range(range, 0)?; + if res.0 != 3 { + Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?; + } + Ok(()) } - Ok(()) -} -#[test] -fn read_expanded_2() -> Result<(), Error> { - use netpod::timeunits::*; - let range = netpod::NanoRange { - beg: DAY - MS * 100, - end: DAY + MS * 1501, - }; - let res = read_expanded_for_range(range, 0)?; - if res.0 != 3 { - Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?; + #[test] + fn read_expanded_3() -> Result<(), Error> { + use netpod::timeunits::*; + let range = netpod::NanoRange { + beg: DAY - MS * 1500, + end: DAY + MS * 1501, + }; + let res = read_expanded_for_range(range, 0)?; + if res.0 != 4 { + Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?; + } + Ok(()) } - Ok(()) -} - -#[test] -fn read_expanded_3() -> Result<(), Error> { - use netpod::timeunits::*; - let range = netpod::NanoRange { - beg: DAY - MS * 1500, - end: DAY + MS * 1501, - }; - let res = read_expanded_for_range(range, 0)?; - if res.0 != 4 { - Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?; - } - Ok(()) } diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index 2f39f3b..04f3b8b 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -5,8 +5,8 @@ use err::Error; use futures_core::Stream; use futures_util::StreamExt; use items::{ - Appendable, ByteEstimate, PushableIndex, RangeCompletableItem, SitemtyFrameType, StatsItem, StreamItem, WithLen, - WithTimestamps, + Appendable, ByteEstimate, Clearable, PushableIndex, RangeCompletableItem, SitemtyFrameType, StatsItem, StreamItem, + WithLen, WithTimestamps, }; use netpod::histo::HistoLog2; use netpod::log::*; @@ -544,6 +544,19 @@ impl Appendable for EventFull { } } +impl Clearable for EventFull { + fn clear(&mut self) { + self.tss.clear(); + self.pulses.clear(); + self.blobs.clear(); + self.decomps.clear(); + self.scalar_types.clear(); + self.be.clear(); + self.shapes.clear(); + self.comps.clear(); + } +} + impl WithTimestamps for EventFull { fn ts(&self, ix: usize) -> u64 { self.tss[ix] @@ -696,9 +709,9 @@ impl HasSeenBeforeRangeCount for EventChunker { #[cfg(test)] mod test { - use err::Error; - use netpod::timeunits::*; - use netpod::{ByteSize, Nanos}; + //use err::Error; + //use netpod::timeunits::*; + //use netpod::{ByteSize, Nanos}; /* #[test] diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 9636776..b5f797c 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -1,15 +1,13 @@ +use crate::HasSeenBeforeRangeCount; use err::Error; use futures_core::Stream; use futures_util::StreamExt; -use 
items::ByteEstimate; -use items::{ - Appendable, EventsNodeProcessor, LogItem, PushableIndex, RangeCompletableItem, Sitemty, StatsItem, StreamItem, - WithLen, WithTimestamps, -}; +use items::{Appendable, LogItem, PushableIndex, RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithTimestamps}; +use items::{ByteEstimate, Clearable}; use netpod::histo::HistoLog2; -use netpod::log::*; use netpod::ByteSize; use netpod::EventDataReadStats; +use netpod::{log::*, NanoRange}; use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; @@ -17,23 +15,23 @@ use std::task::{Context, Poll}; pub mod mergedblobsfromremotes; pub mod mergedfromremotes; +const LOG_EMIT_ITEM: bool = false; + enum MergedCurVal { None, Finish, Val(T), } -pub struct MergedStream -where - S: Stream::Output>>, - ENP: EventsNodeProcessor, -{ +pub struct MergedStream { inps: Vec, - current: Vec::Output>>, + current: Vec>, ixs: Vec, errored: bool, completed: bool, - batch: ::Output, + batch: ITY, + range: NanoRange, + expand: bool, ts_last_emit: u64, range_complete_observed: Vec, range_complete_observed_all: bool, @@ -41,16 +39,14 @@ where data_emit_complete: bool, batch_size: ByteSize, batch_len_emit_histo: HistoLog2, + emitted_after_range: usize, + pre_range_buf: ITY, logitems: VecDeque, event_data_read_stats_items: VecDeque, } // TODO get rid, log info explicitly. -impl Drop for MergedStream -where - S: Stream::Output>>, - ENP: EventsNodeProcessor, -{ +impl Drop for MergedStream { fn drop(&mut self) { info!( "MergedStream Drop Stats:\nbatch_len_emit_histo: {:?}", @@ -59,13 +55,12 @@ where } } -impl MergedStream +impl MergedStream where - S: Stream::Output>> + Unpin, - ENP: EventsNodeProcessor, - ::Output: Appendable, + S: Stream> + Unpin, + ITY: Appendable + Unpin, { - pub fn new(inps: Vec) -> Self { + pub fn new(inps: Vec, range: NanoRange, expand: bool) -> Self { let n = inps.len(); let current = (0..n).into_iter().map(|_| MergedCurVal::None).collect(); Self { @@ -74,7 +69,9 @@ where ixs: vec![0; n], errored: false, completed: false, - batch: <::Output as Appendable>::empty(), + batch: ::empty(), + range, + expand, ts_last_emit: 0, range_complete_observed: vec![false; n], range_complete_observed_all: false, @@ -82,6 +79,8 @@ where data_emit_complete: false, batch_size: ByteSize::kb(128), batch_len_emit_histo: HistoLog2::new(0), + emitted_after_range: 0, + pre_range_buf: ITY::empty(), logitems: VecDeque::new(), event_data_read_stats_items: VecDeque::new(), } @@ -152,13 +151,12 @@ where } } -impl Stream for MergedStream +impl Stream for MergedStream where - S: Stream::Output>> + Unpin, - ENP: EventsNodeProcessor, - ::Output: PushableIndex + Appendable + ByteEstimate, + S: Stream> + Unpin, + ITY: PushableIndex + Appendable + Clearable + ByteEstimate + WithTimestamps + Unpin, { - type Item = Sitemty<::Output>; + type Item = Sitemty; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -186,7 +184,6 @@ where Ready(None) } } else { - // Can only run logic if all streams are either finished, errored or have some current value. 
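The selection logic of MergedStream stays the same in spirit: every input holds a current batch plus a read index, and each step picks the input whose next event has the lowest timestamp. A simplified synchronous sketch of that step, merging plain Vec<u64> inputs instead of async batch streams:

// Simplified, synchronous sketch of the lowest-timestamp selection that
// MergedStream performs per step; the real code drives async inputs and
// Appendable/PushableIndex batches, this only merges Vec<u64> inputs.
fn merge_by_ts(inputs: Vec<Vec<u64>>) -> Vec<u64> {
    let mut ixs = vec![0usize; inputs.len()];
    let mut out = Vec::new();
    loop {
        let mut lowest_ix = usize::MAX;
        let mut lowest_ts = u64::MAX;
        for (i, inp) in inputs.iter().enumerate() {
            if let Some(&ts) = inp.get(ixs[i]) {
                if ts < lowest_ts {
                    lowest_ts = ts;
                    lowest_ix = i;
                }
            }
        }
        if lowest_ix == usize::MAX {
            // All inputs exhausted.
            break;
        }
        out.push(lowest_ts);
        ixs[lowest_ix] += 1;
    }
    out
}

fn main() {
    let merged = merge_by_ts(vec![vec![1, 4, 9], vec![2, 3, 10]]);
    assert_eq!(merged, vec![1u64, 2, 3, 4, 9, 10]);
    println!("{:?}", merged);
}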
match self.replenish(cx) { Ready(Ok(_)) => { let mut lowest_ix = usize::MAX; @@ -208,11 +205,30 @@ where } } if lowest_ix == usize::MAX { + if self.pre_range_buf.len() == 1 { + let mut ldst = std::mem::replace(&mut self.batch, ITY::empty()); + let ts4 = self.pre_range_buf.ts(0); + info!("\n\nMERGER enqueue after exhausted from stash {}", ts4); + ldst.push_index(&self.pre_range_buf, 0); + self.pre_range_buf.clear(); + self.ts_last_emit = ts4; + self.batch = ldst; + } else if self.pre_range_buf.len() > 1 { + panic!(); + } else { + }; if self.batch.len() != 0 { - let emp = <::Output>::empty(); + let emp = ITY::empty(); let ret = std::mem::replace(&mut self.batch, emp); self.batch_len_emit_histo.ingest(ret.len() as u32); self.data_emit_complete = true; + if LOG_EMIT_ITEM { + let mut aa = vec![]; + for ii in 0..ret.len() { + aa.push(ret.ts(ii)); + } + info!("MergedBlobsStream A emits {} events tss {:?}", ret.len(), aa); + }; Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) } else { self.data_emit_complete = true; @@ -220,18 +236,69 @@ where } } else { assert!(lowest_ts >= self.ts_last_emit); - let emp = <::Output>::empty(); - let mut local_batch = std::mem::replace(&mut self.batch, emp); - self.ts_last_emit = lowest_ts; - let rix = self.ixs[lowest_ix]; - match &self.current[lowest_ix] { - MergedCurVal::Val(val) => { - local_batch.push_index(val, rix); + let do_emit_event; + let emit_packet_now_2; + if lowest_ts < self.range.beg { + do_emit_event = false; + emit_packet_now_2 = false; + if self.expand { + info!("\n\nMERGER stash {}", lowest_ts); + let mut ldst = std::mem::replace(&mut self.pre_range_buf, ITY::empty()); + ldst.clear(); + let rix = self.ixs[lowest_ix]; + match &self.current[lowest_ix] { + MergedCurVal::Val(val) => ldst.push_index(val, rix), + MergedCurVal::None => panic!(), + MergedCurVal::Finish => panic!(), + } + self.pre_range_buf = ldst; + } else { + }; + } else if lowest_ts >= self.range.end { + if self.expand { + if self.emitted_after_range == 0 { + do_emit_event = true; + emit_packet_now_2 = true; + self.emitted_after_range += 1; + self.range_complete_observed_all = true; + self.data_emit_complete = true; + } else { + do_emit_event = false; + emit_packet_now_2 = false; + }; + } else { + do_emit_event = false; + emit_packet_now_2 = false; + self.data_emit_complete = true; + }; + } else { + do_emit_event = true; + emit_packet_now_2 = false; + }; + if do_emit_event { + let mut ldst = std::mem::replace(&mut self.batch, ITY::empty()); + if self.pre_range_buf.len() == 1 { + let ts4 = self.pre_range_buf.ts(0); + info!("\n\nMERGER enqueue from stash {}", ts4); + ldst.push_index(&self.pre_range_buf, 0); + self.pre_range_buf.clear(); + } else if self.pre_range_buf.len() > 1 { + panic!(); + } else { + info!("\n\nMERGER nothing in stash"); + }; + info!("\n\nMERGER enqueue {}", lowest_ts); + self.ts_last_emit = lowest_ts; + let rix = self.ixs[lowest_ix]; + match &self.current[lowest_ix] { + MergedCurVal::Val(val) => { + ldst.push_index(val, rix); + } + MergedCurVal::None => panic!(), + MergedCurVal::Finish => panic!(), } - MergedCurVal::None => panic!(), - MergedCurVal::Finish => panic!(), + self.batch = ldst; } - self.batch = local_batch; self.ixs[lowest_ix] += 1; let curlen = match &self.current[lowest_ix] { MergedCurVal::Val(val) => val.len(), @@ -242,11 +309,24 @@ where self.ixs[lowest_ix] = 0; self.current[lowest_ix] = MergedCurVal::None; } - if self.batch.byte_estimate() >= self.batch_size.bytes() as u64 { + let emit_packet_now; + if emit_packet_now_2 || 
self.batch.byte_estimate() >= self.batch_size.bytes() as u64 { + emit_packet_now = true; + } else { + emit_packet_now = false; + }; + if emit_packet_now { trace!("emit item because over threshold len {}", self.batch.len()); - let emp = <::Output>::empty(); + let emp = ITY::empty(); let ret = std::mem::replace(&mut self.batch, emp); self.batch_len_emit_histo.ingest(ret.len() as u32); + if LOG_EMIT_ITEM { + let mut aa = vec![]; + for ii in 0..ret.len() { + aa.push(ret.ts(ii)); + } + info!("MergedBlobsStream B emits {} events tss {:?}", ret.len(), aa); + }; Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) } else { continue 'outer; @@ -263,3 +343,125 @@ where } } } + +impl HasSeenBeforeRangeCount for MergedStream +where + S: Stream> + Unpin, + ITY: Unpin, +{ + fn seen_before_range_count(&self) -> usize { + // TODO (only for debug) + 0 + } +} + +#[cfg(test)] +mod test { + use crate::dataopen::position_file_for_test; + use crate::eventchunker::{EventChunker, EventChunkerConf}; + use crate::file_content_stream; + use err::Error; + use futures_util::StreamExt; + use items::{RangeCompletableItem, StreamItem}; + use netpod::log::*; + use netpod::timeunits::{DAY, MS}; + use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, FileIoBufferSize, NanoRange, Nanos, ScalarType, Shape}; + use std::path::PathBuf; + use std::sync::atomic::AtomicU64; + use std::sync::Arc; + + const SCALAR_FILE: &str = + "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data"; + const WAVE_FILE: &str = + "../tmpdata/node00/ks_3/byTime/wave-f64-be-n21/0000000000000000001/0000000000/0000000000086400000_00000_Data"; + + #[derive(Debug)] + struct CollectedEvents { + tss: Vec, + } + + async fn collect_merged_events(paths: Vec, range: NanoRange) -> Result { + let mut files = vec![]; + for path in paths { + let p = position_file_for_test(&path, &range, false, false).await?; + if !p.found { + return Err(Error::with_msg_no_trace("can not position file??")); + } + files.push( + p.file + .file + .ok_or_else(|| Error::with_msg(format!("can not open file {:?}", path)))?, + ); + } + //Merge + let file_io_buffer_size = FileIoBufferSize(1024 * 4); + let inp = file_content_stream(err::todoval(), file_io_buffer_size); + let inp = Box::pin(inp); + let channel_config = ChannelConfig { + channel: Channel { + backend: "testbackend".into(), + name: "scalar-i32-be".into(), + }, + keyspace: 2, + time_bin_size: Nanos { ns: DAY }, + scalar_type: ScalarType::I32, + byte_order: ByteOrder::BE, + array: false, + compression: false, + shape: Shape::Scalar, + }; + let stats_conf = EventChunkerConf { + disk_stats_every: ByteSize::kb(1024), + }; + let max_ts = Arc::new(AtomicU64::new(0)); + let expand = false; + let do_decompress = false; + let dbg_path = err::todoval(); + + // TODO `expand` flag usage + + let mut chunker = EventChunker::from_event_boundary( + inp, + channel_config, + range, + stats_conf, + dbg_path, + max_ts, + expand, + do_decompress, + ); + + let mut cevs = CollectedEvents { tss: vec![] }; + + let mut i1 = 0; + while let Some(item) = chunker.next().await { + if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) = item { + info!("item: {:?}", item); + for ts in item.tss { + cevs.tss.push(ts); + } + i1 += 1; + } + if i1 >= 10 { + break; + } + } + info!("read {} data items", i1); + info!("cevs: {:?}", cevs); + err::todoval() + } + + #[test] + fn single_file_through_merger() -> Result<(), Error> { + let fut = async { + let range = NanoRange { + beg: DAY + MS * 
1501, + end: DAY + MS * 4000, + }; + let path = PathBuf::from(SCALAR_FILE); + collect_merged_events(vec![path], range).await?; + Ok(()) + }; + taskrun::run(fut) + } +} diff --git a/disk/src/merge/mergedblobsfromremotes.rs b/disk/src/merge/mergedblobsfromremotes.rs index 1615846..9d6ed9e 100644 --- a/disk/src/merge/mergedblobsfromremotes.rs +++ b/disk/src/merge/mergedblobsfromremotes.rs @@ -1,12 +1,12 @@ use crate::eventchunker::EventFull; -use crate::mergeblobs::MergedBlobsStream; +use crate::merge::MergedStream; use crate::raw::client::x_processed_event_blobs_stream_from_node; use err::Error; use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; use items::Sitemty; -use netpod::log::*; use netpod::query::RawEventsQuery; +use netpod::{log::*, NanoRange}; use netpod::{Cluster, PerfOpts}; use std::future::Future; use std::pin::Pin; @@ -21,6 +21,8 @@ pub struct MergedBlobsFromRemotes { merged: Option>, completed: bool, errored: bool, + range: NanoRange, + expand: bool, } impl MergedBlobsFromRemotes { @@ -39,6 +41,8 @@ impl MergedBlobsFromRemotes { merged: None, completed: false, errored: false, + range: evq.range.clone(), + expand: evq.agg_kind.need_expand(), } } } @@ -95,7 +99,7 @@ impl Stream for MergedBlobsFromRemotes { } else { if c1 == self.tcp_establish_futs.len() { let inps: Vec<_> = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect(); - let s1 = MergedBlobsStream::new(inps); + let s1 = MergedStream::new(inps, self.range.clone(), self.expand); self.merged = Some(Box::pin(s1)); } continue 'outer; diff --git a/disk/src/merge/mergedfromremotes.rs b/disk/src/merge/mergedfromremotes.rs index 8f891d5..65a030c 100644 --- a/disk/src/merge/mergedfromremotes.rs +++ b/disk/src/merge/mergedfromremotes.rs @@ -3,9 +3,9 @@ use crate::raw::client::x_processed_stream_from_node; use err::Error; use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; -use items::{Appendable, EventsNodeProcessor, FrameType, PushableIndex, Sitemty}; -use netpod::log::*; +use items::{Appendable, Clearable, EventsNodeProcessor, FrameType, PushableIndex, Sitemty}; use netpod::query::RawEventsQuery; +use netpod::{log::*, NanoRange}; use netpod::{Cluster, PerfOpts}; use std::future::Future; use std::pin::Pin; @@ -23,13 +23,14 @@ where merged: Option::Output>>, completed: bool, errored: bool, + range: NanoRange, + expand: bool, } impl MergedFromRemotes where ENP: EventsNodeProcessor + 'static, - ::Output: 'static, - ::Output: Unpin, + ::Output: Unpin + PushableIndex + Appendable + Clearable + 'static, Sitemty<::Output>: FrameType, { pub fn new(evq: RawEventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self { @@ -47,6 +48,8 @@ where merged: None, completed: false, errored: false, + range: evq.range.clone(), + expand: evq.agg_kind.need_expand(), } } } @@ -54,7 +57,7 @@ where impl Stream for MergedFromRemotes where ENP: EventsNodeProcessor + 'static, - ::Output: PushableIndex + Appendable, + ::Output: PushableIndex + Appendable + Clearable, { type Item = Sitemty<::Output>; @@ -107,7 +110,11 @@ where } else { if c1 == self.tcp_establish_futs.len() { let inps: Vec<_> = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect(); - let s1 = MergedStream::<_, ENP>::new(inps); + let s1 = MergedStream::<_, ::Output>::new( + inps, + self.range.clone(), + self.expand, + ); self.merged = Some(Box::pin(s1)); } continue 'outer; diff --git a/disk/src/mergeblobs.rs b/disk/src/mergeblobs.rs index 7b3f8f0..c21e0a5 100644 --- a/disk/src/mergeblobs.rs +++ b/disk/src/mergeblobs.rs @@ -63,6 +63,8 @@ where I: Unpin + 
Appendable + WithTimestamps + PushableIndex + WithLen + ByteEstimate, { pub fn new(inps: Vec) -> Self { + // TODO remove MergedBlobsStream + err::todo(); let n = inps.len(); let current = (0..n).into_iter().map(|_| MergedCurVal::None).collect(); Self { @@ -157,6 +159,8 @@ where type Item = Sitemty; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + // TODO remove MergedBlobsStream + err::todo(); use Poll::*; 'outer: loop { break if self.completed { @@ -284,161 +288,3 @@ where 0 } } - -#[cfg(test)] -mod test { - use crate::dataopen::position_file_for_test; - use crate::eventchunker::{EventChunker, EventChunkerConf}; - use crate::file_content_stream; - use err::Error; - use futures_util::StreamExt; - use items::{RangeCompletableItem, StreamItem}; - use netpod::log::*; - use netpod::timeunits::{DAY, MS}; - use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, FileIoBufferSize, NanoRange, Nanos, ScalarType, Shape}; - use std::path::PathBuf; - use std::sync::atomic::AtomicU64; - use std::sync::Arc; - - const SCALAR_FILE: &str = - "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data"; - const WAVE_FILE: &str = - "../tmpdata/node00/ks_3/byTime/wave-f64-be-n21/0000000000000000001/0000000000/0000000000086400000_00000_Data"; - - struct CollectedEvents { - tss: Vec, - } - // TODO generify the Mergers into one. - - async fn collect_merged_events(paths: Vec, range: NanoRange) -> Result { - let mut files = vec![]; - for path in paths { - let p = position_file_for_test(&path, &range, false, false).await?; - if !p.found { - return Err(Error::with_msg_no_trace("can not position file??")); - } - files.push( - p.file - .file - .ok_or_else(|| Error::with_msg(format!("can not open file {:?}", path)))?, - ); - } - //Merge - let file_io_buffer_size = FileIoBufferSize(1024 * 4); - let inp = file_content_stream(err::todoval(), file_io_buffer_size); - let inp = Box::pin(inp); - let channel_config = ChannelConfig { - channel: Channel { - backend: "testbackend".into(), - name: "scalar-i32-be".into(), - }, - keyspace: 2, - time_bin_size: Nanos { ns: DAY }, - scalar_type: ScalarType::I32, - byte_order: ByteOrder::BE, - array: false, - compression: false, - shape: Shape::Scalar, - }; - let stats_conf = EventChunkerConf { - disk_stats_every: ByteSize::kb(1024), - }; - let max_ts = Arc::new(AtomicU64::new(0)); - let expand = false; - let do_decompress = false; - let dbg_path = err::todoval(); - - // TODO `expand` flag usage - - let mut chunker = EventChunker::from_event_boundary( - inp, - channel_config, - range, - stats_conf, - dbg_path, - max_ts, - expand, - do_decompress, - ); - - let mut i1 = 0; - while let Some(item) = chunker.next().await { - if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) = item { - info!("item: {:?}", item); - i1 += 1; - } - if i1 >= 10 { - break; - } - } - info!("read {} data items", i1); - err::todoval() - } - - #[test] - fn single_file_through_merger() -> Result<(), Error> { - let fut = async { - // TODO open a single file, model after the real opening procedure. 
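Both remote mergers now pass the query range and an expand flag (from evq.agg_kind.need_expand()) into MergedStream. In rough terms the merger treats timestamps as sketched below: events before the range are stashed so the last one can be prepended, events inside the range are emitted, and in expand mode exactly one event at or past the range end is still emitted before the stream finishes. This classification is a compressed reading of the poll loop above, not the real stream code:

// Illustrative only: classify timestamps the way the reworked MergedStream
// in disk/src/merge.rs treats them for a given range and expand flag.
#[derive(Debug, PartialEq)]
enum Action {
    Stash,    // before range.beg, kept as the one-before event (expand mode)
    Drop,     // before range.beg, non-expand mode
    Emit,     // inside [beg, end)
    EmitLast, // first event at/after end in expand mode, then finish
    Finish,   // at/after end otherwise
}

fn classify(ts: u64, beg: u64, end: u64, expand: bool, emitted_after_range: usize) -> Action {
    if ts < beg {
        if expand {
            Action::Stash
        } else {
            Action::Drop
        }
    } else if ts >= end {
        if expand && emitted_after_range == 0 {
            Action::EmitLast
        } else {
            Action::Finish
        }
    } else {
        Action::Emit
    }
}

fn main() {
    assert_eq!(classify(5, 10, 20, true, 0), Action::Stash);
    assert_eq!(classify(5, 10, 20, false, 0), Action::Drop);
    assert_eq!(classify(12, 10, 20, true, 0), Action::Emit);
    assert_eq!(classify(25, 10, 20, true, 0), Action::EmitLast);
    assert_eq!(classify(30, 10, 20, true, 1), Action::Finish);
}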
- //let file = OpenOptions::new().read(true).open(SCALAR_FILE).await?; - let range = NanoRange { - beg: DAY + MS * 1501, - end: DAY + MS * 4000, - }; - let path = PathBuf::from(SCALAR_FILE); - let p = position_file_for_test(&path, &range, false, false).await?; - if !p.found { - return Err(Error::with_msg_no_trace("can not position file??")); - } - let file_io_buffer_size = FileIoBufferSize(1024 * 4); - let inp = file_content_stream(p.file.file.unwrap(), file_io_buffer_size); - let inp = Box::pin(inp); - let channel_config = ChannelConfig { - channel: Channel { - backend: "testbackend".into(), - name: "scalar-i32-be".into(), - }, - keyspace: 2, - time_bin_size: Nanos { ns: DAY }, - scalar_type: ScalarType::I32, - byte_order: ByteOrder::BE, - array: false, - compression: false, - shape: Shape::Scalar, - }; - let stats_conf = EventChunkerConf { - disk_stats_every: ByteSize::kb(1024), - }; - let max_ts = Arc::new(AtomicU64::new(0)); - let expand = false; - let do_decompress = false; - - // TODO `expand` flag usage - - let mut chunker = EventChunker::from_event_boundary( - inp, - channel_config, - range, - stats_conf, - path, - max_ts, - expand, - do_decompress, - ); - - let mut i1 = 0; - while let Some(item) = chunker.next().await { - if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) = item { - info!("item: {:?}", item); - i1 += 1; - } - if i1 >= 10 { - break; - } - } - info!("read {} data items", i1); - Ok(()) - }; - // TODO in general, emit the error message in taskrun::run? - taskrun::run(fut) - } -} diff --git a/items/src/eventvalues.rs b/items/src/eventvalues.rs index a69384d..cdcda47 100644 --- a/items/src/eventvalues.rs +++ b/items/src/eventvalues.rs @@ -2,9 +2,9 @@ use crate::minmaxavgbins::MinMaxAvgBins; use crate::numops::NumOps; use crate::streams::{Collectable, Collector}; use crate::{ - ts_offs_from_abs, Appendable, ByteEstimate, EventAppendable, FilterFittingInside, Fits, FitsInside, PushableIndex, - RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, TimeBinnableType, TimeBinnableTypeAggregator, - WithLen, WithTimestamps, + ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventAppendable, FilterFittingInside, Fits, FitsInside, + PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, TimeBinnableType, + TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; use err::Error; use netpod::timeunits::*; @@ -166,6 +166,13 @@ where } } +impl Clearable for EventValues { + fn clear(&mut self) { + self.tss.clear(); + self.values.clear(); + } +} + impl ReadableFromFile for EventValues where NTY: NumOps, diff --git a/items/src/lib.rs b/items/src/lib.rs index 14b8efe..850c83e 100644 --- a/items/src/lib.rs +++ b/items/src/lib.rs @@ -307,6 +307,10 @@ pub trait Appendable: WithLen { fn append(&mut self, src: &Self); } +pub trait Clearable { + fn clear(&mut self); +} + pub trait EventAppendable { type Value; fn append_event(&mut self, ts: u64, value: Self::Value); diff --git a/items/src/waveevents.rs b/items/src/waveevents.rs index d4ea9cd..22430f6 100644 --- a/items/src/waveevents.rs +++ b/items/src/waveevents.rs @@ -3,7 +3,7 @@ use crate::numops::NumOps; use crate::xbinnedscalarevents::XBinnedScalarEvents; use crate::xbinnedwaveevents::XBinnedWaveEvents; use crate::{ - Appendable, ByteEstimate, EventAppendable, EventsNodeProcessor, FilterFittingInside, Fits, FitsInside, + Appendable, ByteEstimate, Clearable, EventAppendable, EventsNodeProcessor, FilterFittingInside, Fits, FitsInside, PushableIndex, RangeOverlapInfo, ReadPbv, 
ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; @@ -140,6 +140,13 @@ where } } +impl Clearable for WaveEvents { + fn clear(&mut self) { + self.tss.clear(); + self.vals.clear(); + } +} + impl ReadableFromFile for WaveEvents where NTY: NumOps, diff --git a/items/src/xbinnedscalarevents.rs b/items/src/xbinnedscalarevents.rs index 1fdbf79..747f896 100644 --- a/items/src/xbinnedscalarevents.rs +++ b/items/src/xbinnedscalarevents.rs @@ -2,9 +2,9 @@ use crate::minmaxavgbins::MinMaxAvgBins; use crate::numops::NumOps; use crate::streams::{Collectable, Collector}; use crate::{ - ts_offs_from_abs, Appendable, ByteEstimate, FilterFittingInside, Fits, FitsInside, PushableIndex, RangeOverlapInfo, - ReadPbv, ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, - WithTimestamps, + ts_offs_from_abs, Appendable, ByteEstimate, Clearable, FilterFittingInside, Fits, FitsInside, PushableIndex, + RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, + TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; use err::Error; use netpod::timeunits::SEC; @@ -146,6 +146,15 @@ where } } +impl Clearable for XBinnedScalarEvents { + fn clear(&mut self) { + self.tss.clear(); + self.avgs.clear(); + self.mins.clear(); + self.maxs.clear(); + } +} + impl ReadableFromFile for XBinnedScalarEvents where NTY: NumOps, diff --git a/items/src/xbinnedwaveevents.rs b/items/src/xbinnedwaveevents.rs index 28abc42..dddddbd 100644 --- a/items/src/xbinnedwaveevents.rs +++ b/items/src/xbinnedwaveevents.rs @@ -2,8 +2,9 @@ use crate::minmaxavgwavebins::MinMaxAvgWaveBins; use crate::numops::NumOps; use crate::streams::{Collectable, Collector}; use crate::{ - Appendable, ByteEstimate, FilterFittingInside, Fits, FitsInside, PushableIndex, RangeOverlapInfo, ReadPbv, - ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, + Appendable, ByteEstimate, Clearable, FilterFittingInside, Fits, FitsInside, PushableIndex, RangeOverlapInfo, + ReadPbv, ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, + WithTimestamps, }; use err::Error; use netpod::log::*; @@ -147,6 +148,15 @@ where } } +impl Clearable for XBinnedWaveEvents { + fn clear(&mut self) { + self.tss.clear(); + self.mins.clear(); + self.maxs.clear(); + self.avgs.clear(); + } +} + impl ReadableFromFile for XBinnedWaveEvents where NTY: NumOps,
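Each event container in the items crate gets the same mechanical Clearable impl: clear every parallel Vec so the per-event columns stay aligned. A toy check of that invariant, using a stand-in rather than the real XBinnedScalarEvents:

// Toy stand-in for an x-binned events container; the real type lives in
// items/src/xbinnedscalarevents.rs. The point is only that clear() must
// empty every parallel column so the lengths stay consistent.
#[derive(Debug, Default)]
struct ToyXBinned {
    tss: Vec<u64>,
    mins: Vec<f32>,
    maxs: Vec<f32>,
    avgs: Vec<f32>,
}

impl ToyXBinned {
    fn push(&mut self, ts: u64, min: f32, max: f32, avg: f32) {
        self.tss.push(ts);
        self.mins.push(min);
        self.maxs.push(max);
        self.avgs.push(avg);
    }
    fn clear(&mut self) {
        self.tss.clear();
        self.mins.clear();
        self.maxs.clear();
        self.avgs.clear();
    }
    fn is_consistent(&self) -> bool {
        let n = self.tss.len();
        self.mins.len() == n && self.maxs.len() == n && self.avgs.len() == n
    }
}

fn main() {
    let mut x = ToyXBinned::default();
    x.push(1, 0.0, 2.0, 1.0);
    assert!(x.is_consistent());
    x.clear();
    assert!(x.is_consistent() && x.tss.is_empty());
    println!("ok: {:?}", x);
}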