Clearable

This commit is contained in:
Dominik Werder
2021-09-29 10:18:51 +02:00
parent 10e0fd887d
commit cd500620aa
16 changed files with 474 additions and 355 deletions

View File

@@ -14,8 +14,8 @@ use items::frame::MakeBytesFrame;
use items::numops::NumOps;
use items::streams::{Collectable, Collector};
use items::{
EventsNodeProcessor, FilterFittingInside, Framable, FrameType, PushableIndex, RangeCompletableItem, Sitemty,
StreamItem, TimeBinnableType, WithLen, WithTimestamps,
Clearable, EventsNodeProcessor, FilterFittingInside, Framable, FrameType, PushableIndex, RangeCompletableItem,
Sitemty, StreamItem, TimeBinnableType, WithLen, WithTimestamps,
};
use netpod::log::*;
use netpod::query::RawEventsQuery;
@@ -66,7 +66,7 @@ impl ChannelExecFunction for BinnedBinaryChannelExec {
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Batch> + 'static,
// TODO require these things in general?
<ENP as EventsNodeProcessor>::Output: Collectable + PushableIndex,
<ENP as EventsNodeProcessor>::Output: Collectable + PushableIndex + Clearable,
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output: Debug
+ TimeBinnableType<Output = <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>
+ Collectable
@@ -309,7 +309,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Batch> + 'static,
// TODO require these things in general?
<ENP as EventsNodeProcessor>::Output: Collectable + PushableIndex,
<ENP as EventsNodeProcessor>::Output: Collectable + PushableIndex + Clearable,
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output: Debug
+ TimeBinnableType<Output = <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>
+ Collectable

View File

@@ -11,8 +11,8 @@ use futures_core::Stream;
use futures_util::{FutureExt, StreamExt};
use items::numops::NumOps;
use items::{
Appendable, EventsNodeProcessor, FrameType, PushableIndex, RangeCompletableItem, ReadableFromFile, Sitemty,
StreamItem, TimeBinnableType,
Appendable, Clearable, EventsNodeProcessor, FrameType, PushableIndex, RangeCompletableItem, ReadableFromFile,
Sitemty, StreamItem, TimeBinnableType,
};
use netpod::log::*;
use netpod::query::RawEventsQuery;
@@ -73,7 +73,7 @@ where
END: Endianness + 'static,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Batch> + 'static,
<ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable,
<ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable + Clearable,
// TODO is this needed:
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType,
// TODO who exactly needs this DeserializeOwned?
@@ -230,7 +230,7 @@ where
END: Endianness + Unpin + 'static,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + Unpin + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Batch> + Unpin + 'static,
<ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable,
<ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable + Clearable,
// TODO needed?
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType,
Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>: FrameType + DeserializeOwned,

View File

@@ -10,7 +10,9 @@ use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::numops::{BoolNum, NumOps};
use items::{Appendable, EventsNodeProcessor, Framable, FrameType, PushableIndex, Sitemty, TimeBinnableType};
use items::{
Appendable, Clearable, EventsNodeProcessor, Framable, FrameType, PushableIndex, Sitemty, TimeBinnableType,
};
use netpod::{AggKind, ByteOrder, ChannelConfigQuery, NodeConfigCached, ScalarType, Shape};
use serde::de::DeserializeOwned;
use serde::Serialize;
@@ -29,7 +31,7 @@ where
END: Endianness + 'static,
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Batch> + 'static,
<ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable + 'static,
<ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable + Clearable + 'static,
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType + Framable + 'static,
Sitemty<<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>:
Framable + FrameType + DeserializeOwned,

View File

@@ -13,7 +13,7 @@ use items::eventvalues::EventValues;
use items::numops::{BoolNum, NumOps};
use items::streams::{Collectable, Collector};
use items::{
EventsNodeProcessor, Framable, FrameType, PushableIndex, RangeCompletableItem, Sitemty, StreamItem,
Clearable, EventsNodeProcessor, Framable, FrameType, PushableIndex, RangeCompletableItem, Sitemty, StreamItem,
TimeBinnableType,
};
use netpod::log::*;
@@ -44,7 +44,7 @@ pub trait ChannelExecFunction {
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Batch> + 'static,
// TODO require these things in general?
<ENP as EventsNodeProcessor>::Output: Debug + Collectable + PushableIndex,
<ENP as EventsNodeProcessor>::Output: Debug + Collectable + PushableIndex + Clearable,
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output: Debug
+ TimeBinnableType<Output = <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>
+ Collectable
@@ -70,7 +70,7 @@ where
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Batch> + 'static,
// TODO require these things in general?
<ENP as EventsNodeProcessor>::Output: Debug + Collectable + PushableIndex,
<ENP as EventsNodeProcessor>::Output: Debug + Collectable + PushableIndex + Clearable,
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output: Debug
+ TimeBinnableType<Output = <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>
+ Collectable
@@ -401,7 +401,7 @@ impl ChannelExecFunction for PlainEventsJson {
EVS: EventValueShape<NTY, END> + EventValueFromBytes<NTY, END> + 'static,
ENP: EventsNodeProcessor<Input = <EVS as EventValueFromBytes<NTY, END>>::Batch> + 'static,
// TODO require these things in general?
<ENP as EventsNodeProcessor>::Output: Debug + Collectable + PushableIndex,
<ENP as EventsNodeProcessor>::Output: Debug + Collectable + PushableIndex + Clearable,
<<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output: Debug
+ TimeBinnableType<Output = <<ENP as EventsNodeProcessor>::Output as TimeBinnableType>::Output>
+ Collectable

View File

@@ -30,6 +30,10 @@ async fn position_file(
expand_left: bool,
expand_right: bool,
) -> Result<Positioned, Error> {
info!(
"position_file called {} {} {:?} {:?}",
expand_left, expand_right, range, path
);
assert_eq!(expand_left && expand_right, false);
match OpenOptions::new().read(true).open(&path).await {
Ok(file) => {

View File

@@ -1,6 +1,6 @@
use crate::dataopen::{open_expanded_files, open_files, OpenedFileSet};
use crate::eventchunker::{EventChunker, EventChunkerConf, EventFull};
use crate::mergeblobs::MergedBlobsStream;
use crate::merge::MergedStream;
use crate::{file_content_stream, HasSeenBeforeRangeCount};
use err::Error;
use futures_core::Stream;
@@ -128,7 +128,7 @@ impl Stream for EventChunkerMultifile {
Ready(Some(k)) => match k {
Ok(ofs) => {
self.files_count += ofs.files.len() as u32;
if ofs.files.len() == 1 {
if false && ofs.files.len() == 1 {
let mut ofs = ofs;
let file = ofs.files.pop().unwrap();
let path = file.path;
@@ -156,8 +156,13 @@ impl Stream for EventChunkerMultifile {
None => {}
}
Ready(Some(Ok(StreamItem::Log(item))))
} else if ofs.files.len() > 1 {
let msg = format!("handle OFS MULTIPLE {:?}", ofs);
} else if ofs.files.len() == 0 {
let msg = format!("handle OFS {:?} NO FILES", ofs);
info!("{}", msg);
let item = LogItem::quick(Level::INFO, msg);
Ready(Some(Ok(StreamItem::Log(item))))
} else {
let msg = format!("handle OFS MERGED {:?}", ofs);
warn!("{}", msg);
let item = LogItem::quick(Level::INFO, msg);
let mut chunkers = vec![];
@@ -180,14 +185,9 @@ impl Stream for EventChunkerMultifile {
chunkers.push(chunker);
}
}
let merged = MergedBlobsStream::new(chunkers);
let merged = MergedStream::new(chunkers, self.range.clone(), self.expand);
self.evs = Some(Box::pin(merged));
Ready(Some(Ok(StreamItem::Log(item))))
} else {
let msg = format!("handle OFS {:?} NO FILES", ofs);
info!("{}", msg);
let item = LogItem::quick(Level::INFO, msg);
Ready(Some(Ok(StreamItem::Log(item))))
}
}
Err(e) => {
@@ -219,121 +219,125 @@ impl Stream for EventChunkerMultifile {
}
#[cfg(test)]
fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, usize), Error> {
use netpod::timeunits::*;
use netpod::{ByteSize, Nanos};
let chn = netpod::Channel {
backend: "testbackend".into(),
name: "scalar-i32-be".into(),
};
// TODO read config from disk.
let channel_config = ChannelConfig {
channel: chn,
keyspace: 2,
time_bin_size: Nanos { ns: DAY },
scalar_type: netpod::ScalarType::I32,
byte_order: netpod::ByteOrder::big_endian(),
shape: netpod::Shape::Scalar,
array: false,
compression: false,
};
let cluster = taskrun::test_cluster();
let node = cluster.nodes[nodeix].clone();
let buffer_size = 512;
let event_chunker_conf = EventChunkerConf {
disk_stats_every: ByteSize::kb(1024),
};
let task = async move {
let mut event_count = 0;
let mut events = EventChunkerMultifile::new(
range,
channel_config,
node,
nodeix,
FileIoBufferSize::new(buffer_size),
event_chunker_conf,
true,
true,
);
while let Some(item) = events.next().await {
match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::Data(item) => {
info!("item: {:?}", item.tss.iter().map(|x| x / 1000000).collect::<Vec<_>>());
event_count += item.tss.len();
}
mod test {
use crate::{eventblobs::EventChunkerMultifile, eventchunker::EventChunkerConf};
use err::Error;
use futures_util::StreamExt;
use items::{RangeCompletableItem, StreamItem};
use netpod::log::*;
use netpod::timeunits::{DAY, MS};
use netpod::{ByteSize, ChannelConfig, FileIoBufferSize, Nanos};
fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, Vec<u64>), Error> {
let chn = netpod::Channel {
backend: "testbackend".into(),
name: "scalar-i32-be".into(),
};
// TODO read config from disk.
let channel_config = ChannelConfig {
channel: chn,
keyspace: 2,
time_bin_size: Nanos { ns: DAY },
scalar_type: netpod::ScalarType::I32,
byte_order: netpod::ByteOrder::big_endian(),
shape: netpod::Shape::Scalar,
array: false,
compression: false,
};
let cluster = taskrun::test_cluster();
let node = cluster.nodes[nodeix].clone();
let buffer_size = 512;
let event_chunker_conf = EventChunkerConf {
disk_stats_every: ByteSize::kb(1024),
};
let task = async move {
let mut event_count = 0;
let mut events = EventChunkerMultifile::new(
range,
channel_config,
node,
nodeix,
FileIoBufferSize::new(buffer_size),
event_chunker_conf,
true,
true,
);
let mut tss = vec![];
while let Some(item) = events.next().await {
match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::Data(item) => {
info!("item: {:?}", item.tss.iter().map(|x| x / MS).collect::<Vec<_>>());
event_count += item.tss.len();
for ts in item.tss {
tss.push(ts);
}
}
_ => {}
},
_ => {}
},
_ => {}
},
Err(e) => return Err(e.into()),
Err(e) => return Err(e.into()),
}
}
events.close();
Ok((event_count, tss))
};
Ok(taskrun::run(task).unwrap())
}
#[test]
fn read_expanded_0() -> Result<(), Error> {
let range = netpod::NanoRange {
beg: DAY + MS * 0,
end: DAY + MS * 100,
};
let res = read_expanded_for_range(range, 0)?;
info!("got {:?}", res.1);
if res.0 != 3 {
Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
}
events.close();
if events.seen_before_range_count() != 1 {
return Err(Error::with_msg(format!(
"seen_before_range_count error: {}",
events.seen_before_range_count(),
)));
Ok(())
}
#[test]
fn read_expanded_1() -> Result<(), Error> {
let range = netpod::NanoRange {
beg: DAY + MS * 0,
end: DAY + MS * 1501,
};
let res = read_expanded_for_range(range, 0)?;
if res.0 != 3 {
Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
}
Ok((event_count, events.seen_before_range_count()))
};
Ok(taskrun::run(task).unwrap())
}
#[test]
fn read_expanded_0() -> Result<(), Error> {
use netpod::timeunits::*;
let range = netpod::NanoRange {
beg: DAY + MS * 0,
end: DAY + MS * 1500,
};
let res = read_expanded_for_range(range, 0)?;
if res.0 != 2 {
Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
Ok(())
}
Ok(())
}
#[test]
fn read_expanded_1() -> Result<(), Error> {
use netpod::timeunits::*;
let range = netpod::NanoRange {
beg: DAY + MS * 0,
end: DAY + MS * 1501,
};
let res = read_expanded_for_range(range, 0)?;
if res.0 != 3 {
Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
#[test]
fn read_expanded_2() -> Result<(), Error> {
let range = netpod::NanoRange {
beg: DAY - MS * 100,
end: DAY + MS * 1501,
};
let res = read_expanded_for_range(range, 0)?;
if res.0 != 3 {
Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
}
Ok(())
}
Ok(())
}
#[test]
fn read_expanded_2() -> Result<(), Error> {
use netpod::timeunits::*;
let range = netpod::NanoRange {
beg: DAY - MS * 100,
end: DAY + MS * 1501,
};
let res = read_expanded_for_range(range, 0)?;
if res.0 != 3 {
Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
#[test]
fn read_expanded_3() -> Result<(), Error> {
use netpod::timeunits::*;
let range = netpod::NanoRange {
beg: DAY - MS * 1500,
end: DAY + MS * 1501,
};
let res = read_expanded_for_range(range, 0)?;
if res.0 != 4 {
Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
}
Ok(())
}
Ok(())
}
#[test]
fn read_expanded_3() -> Result<(), Error> {
use netpod::timeunits::*;
let range = netpod::NanoRange {
beg: DAY - MS * 1500,
end: DAY + MS * 1501,
};
let res = read_expanded_for_range(range, 0)?;
if res.0 != 4 {
Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
}
Ok(())
}

View File

@@ -5,8 +5,8 @@ use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::{
Appendable, ByteEstimate, PushableIndex, RangeCompletableItem, SitemtyFrameType, StatsItem, StreamItem, WithLen,
WithTimestamps,
Appendable, ByteEstimate, Clearable, PushableIndex, RangeCompletableItem, SitemtyFrameType, StatsItem, StreamItem,
WithLen, WithTimestamps,
};
use netpod::histo::HistoLog2;
use netpod::log::*;
@@ -544,6 +544,19 @@ impl Appendable for EventFull {
}
}
impl Clearable for EventFull {
fn clear(&mut self) {
self.tss.clear();
self.pulses.clear();
self.blobs.clear();
self.decomps.clear();
self.scalar_types.clear();
self.be.clear();
self.shapes.clear();
self.comps.clear();
}
}
impl WithTimestamps for EventFull {
fn ts(&self, ix: usize) -> u64 {
self.tss[ix]
@@ -696,9 +709,9 @@ impl HasSeenBeforeRangeCount for EventChunker {
#[cfg(test)]
mod test {
use err::Error;
use netpod::timeunits::*;
use netpod::{ByteSize, Nanos};
//use err::Error;
//use netpod::timeunits::*;
//use netpod::{ByteSize, Nanos};
/*
#[test]

View File

@@ -1,15 +1,13 @@
use crate::HasSeenBeforeRangeCount;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::ByteEstimate;
use items::{
Appendable, EventsNodeProcessor, LogItem, PushableIndex, RangeCompletableItem, Sitemty, StatsItem, StreamItem,
WithLen, WithTimestamps,
};
use items::{Appendable, LogItem, PushableIndex, RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithTimestamps};
use items::{ByteEstimate, Clearable};
use netpod::histo::HistoLog2;
use netpod::log::*;
use netpod::ByteSize;
use netpod::EventDataReadStats;
use netpod::{log::*, NanoRange};
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -17,23 +15,23 @@ use std::task::{Context, Poll};
pub mod mergedblobsfromremotes;
pub mod mergedfromremotes;
const LOG_EMIT_ITEM: bool = false;
enum MergedCurVal<T> {
None,
Finish,
Val(T),
}
pub struct MergedStream<S, ENP>
where
S: Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>>,
ENP: EventsNodeProcessor,
{
pub struct MergedStream<S, ITY> {
inps: Vec<S>,
current: Vec<MergedCurVal<<ENP as EventsNodeProcessor>::Output>>,
current: Vec<MergedCurVal<ITY>>,
ixs: Vec<usize>,
errored: bool,
completed: bool,
batch: <ENP as EventsNodeProcessor>::Output,
batch: ITY,
range: NanoRange,
expand: bool,
ts_last_emit: u64,
range_complete_observed: Vec<bool>,
range_complete_observed_all: bool,
@@ -41,16 +39,14 @@ where
data_emit_complete: bool,
batch_size: ByteSize,
batch_len_emit_histo: HistoLog2,
emitted_after_range: usize,
pre_range_buf: ITY,
logitems: VecDeque<LogItem>,
event_data_read_stats_items: VecDeque<EventDataReadStats>,
}
// TODO get rid, log info explicitly.
impl<S, ENP> Drop for MergedStream<S, ENP>
where
S: Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>>,
ENP: EventsNodeProcessor,
{
impl<S, ITY> Drop for MergedStream<S, ITY> {
fn drop(&mut self) {
info!(
"MergedStream Drop Stats:\nbatch_len_emit_histo: {:?}",
@@ -59,13 +55,12 @@ where
}
}
impl<S, ENP> MergedStream<S, ENP>
impl<S, ITY> MergedStream<S, ITY>
where
S: Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>> + Unpin,
ENP: EventsNodeProcessor,
<ENP as EventsNodeProcessor>::Output: Appendable,
S: Stream<Item = Sitemty<ITY>> + Unpin,
ITY: Appendable + Unpin,
{
pub fn new(inps: Vec<S>) -> Self {
pub fn new(inps: Vec<S>, range: NanoRange, expand: bool) -> Self {
let n = inps.len();
let current = (0..n).into_iter().map(|_| MergedCurVal::None).collect();
Self {
@@ -74,7 +69,9 @@ where
ixs: vec![0; n],
errored: false,
completed: false,
batch: <<ENP as EventsNodeProcessor>::Output as Appendable>::empty(),
batch: <ITY as Appendable>::empty(),
range,
expand,
ts_last_emit: 0,
range_complete_observed: vec![false; n],
range_complete_observed_all: false,
@@ -82,6 +79,8 @@ where
data_emit_complete: false,
batch_size: ByteSize::kb(128),
batch_len_emit_histo: HistoLog2::new(0),
emitted_after_range: 0,
pre_range_buf: ITY::empty(),
logitems: VecDeque::new(),
event_data_read_stats_items: VecDeque::new(),
}
@@ -152,13 +151,12 @@ where
}
}
impl<S, ENP> Stream for MergedStream<S, ENP>
impl<S, ITY> Stream for MergedStream<S, ITY>
where
S: Stream<Item = Sitemty<<ENP as EventsNodeProcessor>::Output>> + Unpin,
ENP: EventsNodeProcessor,
<ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable + ByteEstimate,
S: Stream<Item = Sitemty<ITY>> + Unpin,
ITY: PushableIndex + Appendable + Clearable + ByteEstimate + WithTimestamps + Unpin,
{
type Item = Sitemty<<ENP as EventsNodeProcessor>::Output>;
type Item = Sitemty<ITY>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
@@ -186,7 +184,6 @@ where
Ready(None)
}
} else {
// Can only run logic if all streams are either finished, errored or have some current value.
match self.replenish(cx) {
Ready(Ok(_)) => {
let mut lowest_ix = usize::MAX;
@@ -208,11 +205,30 @@ where
}
}
if lowest_ix == usize::MAX {
if self.pre_range_buf.len() == 1 {
let mut ldst = std::mem::replace(&mut self.batch, ITY::empty());
let ts4 = self.pre_range_buf.ts(0);
info!("\n\nMERGER enqueue after exhausted from stash {}", ts4);
ldst.push_index(&self.pre_range_buf, 0);
self.pre_range_buf.clear();
self.ts_last_emit = ts4;
self.batch = ldst;
} else if self.pre_range_buf.len() > 1 {
panic!();
} else {
};
if self.batch.len() != 0 {
let emp = <<ENP as EventsNodeProcessor>::Output>::empty();
let emp = ITY::empty();
let ret = std::mem::replace(&mut self.batch, emp);
self.batch_len_emit_histo.ingest(ret.len() as u32);
self.data_emit_complete = true;
if LOG_EMIT_ITEM {
let mut aa = vec![];
for ii in 0..ret.len() {
aa.push(ret.ts(ii));
}
info!("MergedBlobsStream A emits {} events tss {:?}", ret.len(), aa);
};
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
} else {
self.data_emit_complete = true;
@@ -220,18 +236,69 @@ where
}
} else {
assert!(lowest_ts >= self.ts_last_emit);
let emp = <<ENP as EventsNodeProcessor>::Output>::empty();
let mut local_batch = std::mem::replace(&mut self.batch, emp);
self.ts_last_emit = lowest_ts;
let rix = self.ixs[lowest_ix];
match &self.current[lowest_ix] {
MergedCurVal::Val(val) => {
local_batch.push_index(val, rix);
let do_emit_event;
let emit_packet_now_2;
if lowest_ts < self.range.beg {
do_emit_event = false;
emit_packet_now_2 = false;
if self.expand {
info!("\n\nMERGER stash {}", lowest_ts);
let mut ldst = std::mem::replace(&mut self.pre_range_buf, ITY::empty());
ldst.clear();
let rix = self.ixs[lowest_ix];
match &self.current[lowest_ix] {
MergedCurVal::Val(val) => ldst.push_index(val, rix),
MergedCurVal::None => panic!(),
MergedCurVal::Finish => panic!(),
}
self.pre_range_buf = ldst;
} else {
};
} else if lowest_ts >= self.range.end {
if self.expand {
if self.emitted_after_range == 0 {
do_emit_event = true;
emit_packet_now_2 = true;
self.emitted_after_range += 1;
self.range_complete_observed_all = true;
self.data_emit_complete = true;
} else {
do_emit_event = false;
emit_packet_now_2 = false;
};
} else {
do_emit_event = false;
emit_packet_now_2 = false;
self.data_emit_complete = true;
};
} else {
do_emit_event = true;
emit_packet_now_2 = false;
};
if do_emit_event {
let mut ldst = std::mem::replace(&mut self.batch, ITY::empty());
if self.pre_range_buf.len() == 1 {
let ts4 = self.pre_range_buf.ts(0);
info!("\n\nMERGER enqueue from stash {}", ts4);
ldst.push_index(&self.pre_range_buf, 0);
self.pre_range_buf.clear();
} else if self.pre_range_buf.len() > 1 {
panic!();
} else {
info!("\n\nMERGER nothing in stash");
};
info!("\n\nMERGER enqueue {}", lowest_ts);
self.ts_last_emit = lowest_ts;
let rix = self.ixs[lowest_ix];
match &self.current[lowest_ix] {
MergedCurVal::Val(val) => {
ldst.push_index(val, rix);
}
MergedCurVal::None => panic!(),
MergedCurVal::Finish => panic!(),
}
MergedCurVal::None => panic!(),
MergedCurVal::Finish => panic!(),
self.batch = ldst;
}
self.batch = local_batch;
self.ixs[lowest_ix] += 1;
let curlen = match &self.current[lowest_ix] {
MergedCurVal::Val(val) => val.len(),
@@ -242,11 +309,24 @@ where
self.ixs[lowest_ix] = 0;
self.current[lowest_ix] = MergedCurVal::None;
}
if self.batch.byte_estimate() >= self.batch_size.bytes() as u64 {
let emit_packet_now;
if emit_packet_now_2 || self.batch.byte_estimate() >= self.batch_size.bytes() as u64 {
emit_packet_now = true;
} else {
emit_packet_now = false;
};
if emit_packet_now {
trace!("emit item because over threshold len {}", self.batch.len());
let emp = <<ENP as EventsNodeProcessor>::Output>::empty();
let emp = ITY::empty();
let ret = std::mem::replace(&mut self.batch, emp);
self.batch_len_emit_histo.ingest(ret.len() as u32);
if LOG_EMIT_ITEM {
let mut aa = vec![];
for ii in 0..ret.len() {
aa.push(ret.ts(ii));
}
info!("MergedBlobsStream B emits {} events tss {:?}", ret.len(), aa);
};
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))))
} else {
continue 'outer;
@@ -263,3 +343,125 @@ where
}
}
}
impl<S, ITY> HasSeenBeforeRangeCount for MergedStream<S, ITY>
where
S: Stream<Item = Sitemty<ITY>> + Unpin,
ITY: Unpin,
{
fn seen_before_range_count(&self) -> usize {
// TODO (only for debug)
0
}
}
#[cfg(test)]
mod test {
use crate::dataopen::position_file_for_test;
use crate::eventchunker::{EventChunker, EventChunkerConf};
use crate::file_content_stream;
use err::Error;
use futures_util::StreamExt;
use items::{RangeCompletableItem, StreamItem};
use netpod::log::*;
use netpod::timeunits::{DAY, MS};
use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, FileIoBufferSize, NanoRange, Nanos, ScalarType, Shape};
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
const SCALAR_FILE: &str =
"../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data";
const WAVE_FILE: &str =
"../tmpdata/node00/ks_3/byTime/wave-f64-be-n21/0000000000000000001/0000000000/0000000000086400000_00000_Data";
#[derive(Debug)]
struct CollectedEvents {
tss: Vec<u64>,
}
async fn collect_merged_events(paths: Vec<PathBuf>, range: NanoRange) -> Result<CollectedEvents, Error> {
let mut files = vec![];
for path in paths {
let p = position_file_for_test(&path, &range, false, false).await?;
if !p.found {
return Err(Error::with_msg_no_trace("can not position file??"));
}
files.push(
p.file
.file
.ok_or_else(|| Error::with_msg(format!("can not open file {:?}", path)))?,
);
}
//Merge
let file_io_buffer_size = FileIoBufferSize(1024 * 4);
let inp = file_content_stream(err::todoval(), file_io_buffer_size);
let inp = Box::pin(inp);
let channel_config = ChannelConfig {
channel: Channel {
backend: "testbackend".into(),
name: "scalar-i32-be".into(),
},
keyspace: 2,
time_bin_size: Nanos { ns: DAY },
scalar_type: ScalarType::I32,
byte_order: ByteOrder::BE,
array: false,
compression: false,
shape: Shape::Scalar,
};
let stats_conf = EventChunkerConf {
disk_stats_every: ByteSize::kb(1024),
};
let max_ts = Arc::new(AtomicU64::new(0));
let expand = false;
let do_decompress = false;
let dbg_path = err::todoval();
// TODO `expand` flag usage
let mut chunker = EventChunker::from_event_boundary(
inp,
channel_config,
range,
stats_conf,
dbg_path,
max_ts,
expand,
do_decompress,
);
let mut cevs = CollectedEvents { tss: vec![] };
let mut i1 = 0;
while let Some(item) = chunker.next().await {
if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) = item {
info!("item: {:?}", item);
for ts in item.tss {
cevs.tss.push(ts);
}
i1 += 1;
}
if i1 >= 10 {
break;
}
}
info!("read {} data items", i1);
info!("cevs: {:?}", cevs);
err::todoval()
}
#[test]
fn single_file_through_merger() -> Result<(), Error> {
let fut = async {
let range = NanoRange {
beg: DAY + MS * 1501,
end: DAY + MS * 4000,
};
let path = PathBuf::from(SCALAR_FILE);
collect_merged_events(vec![path], range).await?;
Ok(())
};
taskrun::run(fut)
}
}

View File

@@ -1,12 +1,12 @@
use crate::eventchunker::EventFull;
use crate::mergeblobs::MergedBlobsStream;
use crate::merge::MergedStream;
use crate::raw::client::x_processed_event_blobs_stream_from_node;
use err::Error;
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
use items::Sitemty;
use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::{log::*, NanoRange};
use netpod::{Cluster, PerfOpts};
use std::future::Future;
use std::pin::Pin;
@@ -21,6 +21,8 @@ pub struct MergedBlobsFromRemotes {
merged: Option<T001<EventFull>>,
completed: bool,
errored: bool,
range: NanoRange,
expand: bool,
}
impl MergedBlobsFromRemotes {
@@ -39,6 +41,8 @@ impl MergedBlobsFromRemotes {
merged: None,
completed: false,
errored: false,
range: evq.range.clone(),
expand: evq.agg_kind.need_expand(),
}
}
}
@@ -95,7 +99,7 @@ impl Stream for MergedBlobsFromRemotes {
} else {
if c1 == self.tcp_establish_futs.len() {
let inps: Vec<_> = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect();
let s1 = MergedBlobsStream::new(inps);
let s1 = MergedStream::new(inps, self.range.clone(), self.expand);
self.merged = Some(Box::pin(s1));
}
continue 'outer;

View File

@@ -3,9 +3,9 @@ use crate::raw::client::x_processed_stream_from_node;
use err::Error;
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
use items::{Appendable, EventsNodeProcessor, FrameType, PushableIndex, Sitemty};
use netpod::log::*;
use items::{Appendable, Clearable, EventsNodeProcessor, FrameType, PushableIndex, Sitemty};
use netpod::query::RawEventsQuery;
use netpod::{log::*, NanoRange};
use netpod::{Cluster, PerfOpts};
use std::future::Future;
use std::pin::Pin;
@@ -23,13 +23,14 @@ where
merged: Option<T001<<ENP as EventsNodeProcessor>::Output>>,
completed: bool,
errored: bool,
range: NanoRange,
expand: bool,
}
impl<ENP> MergedFromRemotes<ENP>
where
ENP: EventsNodeProcessor + 'static,
<ENP as EventsNodeProcessor>::Output: 'static,
<ENP as EventsNodeProcessor>::Output: Unpin,
<ENP as EventsNodeProcessor>::Output: Unpin + PushableIndex + Appendable + Clearable + 'static,
Sitemty<<ENP as EventsNodeProcessor>::Output>: FrameType,
{
pub fn new(evq: RawEventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self {
@@ -47,6 +48,8 @@ where
merged: None,
completed: false,
errored: false,
range: evq.range.clone(),
expand: evq.agg_kind.need_expand(),
}
}
}
@@ -54,7 +57,7 @@ where
impl<ENP> Stream for MergedFromRemotes<ENP>
where
ENP: EventsNodeProcessor + 'static,
<ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable,
<ENP as EventsNodeProcessor>::Output: PushableIndex + Appendable + Clearable,
{
type Item = Sitemty<<ENP as EventsNodeProcessor>::Output>;
@@ -107,7 +110,11 @@ where
} else {
if c1 == self.tcp_establish_futs.len() {
let inps: Vec<_> = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect();
let s1 = MergedStream::<_, ENP>::new(inps);
let s1 = MergedStream::<_, <ENP as EventsNodeProcessor>::Output>::new(
inps,
self.range.clone(),
self.expand,
);
self.merged = Some(Box::pin(s1));
}
continue 'outer;

View File

@@ -63,6 +63,8 @@ where
I: Unpin + Appendable + WithTimestamps + PushableIndex + WithLen + ByteEstimate,
{
pub fn new(inps: Vec<S>) -> Self {
// TODO remove MergedBlobsStream
err::todo();
let n = inps.len();
let current = (0..n).into_iter().map(|_| MergedCurVal::None).collect();
Self {
@@ -157,6 +159,8 @@ where
type Item = Sitemty<I>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// TODO remove MergedBlobsStream
err::todo();
use Poll::*;
'outer: loop {
break if self.completed {
@@ -284,161 +288,3 @@ where
0
}
}
#[cfg(test)]
mod test {
use crate::dataopen::position_file_for_test;
use crate::eventchunker::{EventChunker, EventChunkerConf};
use crate::file_content_stream;
use err::Error;
use futures_util::StreamExt;
use items::{RangeCompletableItem, StreamItem};
use netpod::log::*;
use netpod::timeunits::{DAY, MS};
use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, FileIoBufferSize, NanoRange, Nanos, ScalarType, Shape};
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
const SCALAR_FILE: &str =
"../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data";
const WAVE_FILE: &str =
"../tmpdata/node00/ks_3/byTime/wave-f64-be-n21/0000000000000000001/0000000000/0000000000086400000_00000_Data";
struct CollectedEvents {
tss: Vec<u64>,
}
// TODO generify the Mergers into one.
async fn collect_merged_events(paths: Vec<PathBuf>, range: NanoRange) -> Result<CollectedEvents, Error> {
let mut files = vec![];
for path in paths {
let p = position_file_for_test(&path, &range, false, false).await?;
if !p.found {
return Err(Error::with_msg_no_trace("can not position file??"));
}
files.push(
p.file
.file
.ok_or_else(|| Error::with_msg(format!("can not open file {:?}", path)))?,
);
}
//Merge
let file_io_buffer_size = FileIoBufferSize(1024 * 4);
let inp = file_content_stream(err::todoval(), file_io_buffer_size);
let inp = Box::pin(inp);
let channel_config = ChannelConfig {
channel: Channel {
backend: "testbackend".into(),
name: "scalar-i32-be".into(),
},
keyspace: 2,
time_bin_size: Nanos { ns: DAY },
scalar_type: ScalarType::I32,
byte_order: ByteOrder::BE,
array: false,
compression: false,
shape: Shape::Scalar,
};
let stats_conf = EventChunkerConf {
disk_stats_every: ByteSize::kb(1024),
};
let max_ts = Arc::new(AtomicU64::new(0));
let expand = false;
let do_decompress = false;
let dbg_path = err::todoval();
// TODO `expand` flag usage
let mut chunker = EventChunker::from_event_boundary(
inp,
channel_config,
range,
stats_conf,
dbg_path,
max_ts,
expand,
do_decompress,
);
let mut i1 = 0;
while let Some(item) = chunker.next().await {
if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) = item {
info!("item: {:?}", item);
i1 += 1;
}
if i1 >= 10 {
break;
}
}
info!("read {} data items", i1);
err::todoval()
}
#[test]
fn single_file_through_merger() -> Result<(), Error> {
let fut = async {
// TODO open a single file, model after the real opening procedure.
//let file = OpenOptions::new().read(true).open(SCALAR_FILE).await?;
let range = NanoRange {
beg: DAY + MS * 1501,
end: DAY + MS * 4000,
};
let path = PathBuf::from(SCALAR_FILE);
let p = position_file_for_test(&path, &range, false, false).await?;
if !p.found {
return Err(Error::with_msg_no_trace("can not position file??"));
}
let file_io_buffer_size = FileIoBufferSize(1024 * 4);
let inp = file_content_stream(p.file.file.unwrap(), file_io_buffer_size);
let inp = Box::pin(inp);
let channel_config = ChannelConfig {
channel: Channel {
backend: "testbackend".into(),
name: "scalar-i32-be".into(),
},
keyspace: 2,
time_bin_size: Nanos { ns: DAY },
scalar_type: ScalarType::I32,
byte_order: ByteOrder::BE,
array: false,
compression: false,
shape: Shape::Scalar,
};
let stats_conf = EventChunkerConf {
disk_stats_every: ByteSize::kb(1024),
};
let max_ts = Arc::new(AtomicU64::new(0));
let expand = false;
let do_decompress = false;
// TODO `expand` flag usage
let mut chunker = EventChunker::from_event_boundary(
inp,
channel_config,
range,
stats_conf,
path,
max_ts,
expand,
do_decompress,
);
let mut i1 = 0;
while let Some(item) = chunker.next().await {
if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) = item {
info!("item: {:?}", item);
i1 += 1;
}
if i1 >= 10 {
break;
}
}
info!("read {} data items", i1);
Ok(())
};
// TODO in general, emit the error message in taskrun::run?
taskrun::run(fut)
}
}

View File

@@ -2,9 +2,9 @@ use crate::minmaxavgbins::MinMaxAvgBins;
use crate::numops::NumOps;
use crate::streams::{Collectable, Collector};
use crate::{
ts_offs_from_abs, Appendable, ByteEstimate, EventAppendable, FilterFittingInside, Fits, FitsInside, PushableIndex,
RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, TimeBinnableType, TimeBinnableTypeAggregator,
WithLen, WithTimestamps,
ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventAppendable, FilterFittingInside, Fits, FitsInside,
PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, TimeBinnableType,
TimeBinnableTypeAggregator, WithLen, WithTimestamps,
};
use err::Error;
use netpod::timeunits::*;
@@ -166,6 +166,13 @@ where
}
}
impl<NTY> Clearable for EventValues<NTY> {
fn clear(&mut self) {
self.tss.clear();
self.values.clear();
}
}
impl<NTY> ReadableFromFile for EventValues<NTY>
where
NTY: NumOps,

View File

@@ -307,6 +307,10 @@ pub trait Appendable: WithLen {
fn append(&mut self, src: &Self);
}
pub trait Clearable {
fn clear(&mut self);
}
pub trait EventAppendable {
type Value;
fn append_event(&mut self, ts: u64, value: Self::Value);

View File

@@ -3,7 +3,7 @@ use crate::numops::NumOps;
use crate::xbinnedscalarevents::XBinnedScalarEvents;
use crate::xbinnedwaveevents::XBinnedWaveEvents;
use crate::{
Appendable, ByteEstimate, EventAppendable, EventsNodeProcessor, FilterFittingInside, Fits, FitsInside,
Appendable, ByteEstimate, Clearable, EventAppendable, EventsNodeProcessor, FilterFittingInside, Fits, FitsInside,
PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType,
TimeBinnableTypeAggregator, WithLen, WithTimestamps,
};
@@ -140,6 +140,13 @@ where
}
}
impl<NTY> Clearable for WaveEvents<NTY> {
fn clear(&mut self) {
self.tss.clear();
self.vals.clear();
}
}
impl<NTY> ReadableFromFile for WaveEvents<NTY>
where
NTY: NumOps,

View File

@@ -2,9 +2,9 @@ use crate::minmaxavgbins::MinMaxAvgBins;
use crate::numops::NumOps;
use crate::streams::{Collectable, Collector};
use crate::{
ts_offs_from_abs, Appendable, ByteEstimate, FilterFittingInside, Fits, FitsInside, PushableIndex, RangeOverlapInfo,
ReadPbv, ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, WithLen,
WithTimestamps,
ts_offs_from_abs, Appendable, ByteEstimate, Clearable, FilterFittingInside, Fits, FitsInside, PushableIndex,
RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType,
TimeBinnableTypeAggregator, WithLen, WithTimestamps,
};
use err::Error;
use netpod::timeunits::SEC;
@@ -146,6 +146,15 @@ where
}
}
impl<NTY> Clearable for XBinnedScalarEvents<NTY> {
fn clear(&mut self) {
self.tss.clear();
self.avgs.clear();
self.mins.clear();
self.maxs.clear();
}
}
impl<NTY> ReadableFromFile for XBinnedScalarEvents<NTY>
where
NTY: NumOps,

View File

@@ -2,8 +2,9 @@ use crate::minmaxavgwavebins::MinMaxAvgWaveBins;
use crate::numops::NumOps;
use crate::streams::{Collectable, Collector};
use crate::{
Appendable, ByteEstimate, FilterFittingInside, Fits, FitsInside, PushableIndex, RangeOverlapInfo, ReadPbv,
ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps,
Appendable, ByteEstimate, Clearable, FilterFittingInside, Fits, FitsInside, PushableIndex, RangeOverlapInfo,
ReadPbv, ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, WithLen,
WithTimestamps,
};
use err::Error;
use netpod::log::*;
@@ -147,6 +148,15 @@ where
}
}
impl<NTY> Clearable for XBinnedWaveEvents<NTY> {
fn clear(&mut self) {
self.tss.clear();
self.mins.clear();
self.maxs.clear();
self.avgs.clear();
}
}
impl<NTY> ReadableFromFile for XBinnedWaveEvents<NTY>
where
NTY: NumOps,