WIP typechecks

Dominik Werder
2024-10-22 16:14:32 +02:00
parent 773da33777
commit f754c5c962
36 changed files with 631 additions and 273 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqbuffer"
version = "0.5.3-aa.4"
version = "0.5.3-aa.5"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -51,10 +51,13 @@ async fn position_file(
match OpenOptions::new().read(true).open(&index_path).await {
Ok(mut index_file) => {
let meta = index_file.metadata().await?;
if meta.len() > 1024 * 1024 * 120 {
if meta.len() > 1024 * 1024 * 500 {
let msg = format!("too large index file {} bytes for {:?}", meta.len(), index_path);
error!("{}", msg);
return Err(Error::with_msg(msg));
} else if meta.len() > 1024 * 1024 * 200 {
let msg = format!("very large index file {} bytes for {:?}", meta.len(), index_path);
warn!("{}", msg);
} else if meta.len() > 1024 * 1024 * 80 {
let msg = format!("very large index file {} bytes for {:?}", meta.len(), index_path);
warn!("{}", msg);
@@ -184,12 +187,25 @@ async fn position_file(
}
pub struct OpenedFile {
pub pos: u64,
pub path: PathBuf,
pub file: Option<File>,
pub positioned: bool,
pub index: bool,
pub nreads: u32,
pub pos: u64,
}
impl fmt::Debug for OpenedFile {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("OpenedFile")
.field("pos", &self.pos)
.field("path", &self.path)
.field("file", &self.file.is_some())
.field("positioned", &self.positioned)
.field("index", &self.index)
.field("nreads", &self.nreads)
.finish()
}
}
#[derive(Debug)]
@@ -198,18 +214,6 @@ pub struct OpenedFileSet {
pub files: Vec<OpenedFile>,
}
impl fmt::Debug for OpenedFile {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("OpenedFile")
.field("path", &self.path)
.field("file", &self.file)
.field("positioned", &self.positioned)
.field("index", &self.index)
.field("nreads", &self.nreads)
.finish()
}
}
pub fn open_files(
range: &NanoRange,
fetch_info: &SfChFetchInfo,
@@ -299,7 +303,7 @@ pub fn open_expanded_files(
Ok(_) => {}
Err(e) => {
// To be expected
debug!("open_files channel send error {:?}", e);
debug!("open_expanded_files channel send error {:?}", e);
}
},
}
@@ -345,18 +349,19 @@ async fn open_expanded_files_inner(
) -> Result<(), Error> {
let fetch_info = fetch_info.clone();
let timebins = get_timebins(&fetch_info, node.clone()).await?;
debug!("timebins {timebins:?}");
if timebins.len() == 0 {
return Ok(());
}
let mut p1 = None;
for (i1, tb) in timebins.iter().enumerate().rev() {
for (i, tb) in timebins.iter().enumerate().rev() {
let ts_bin = TsNano::from_ns(tb * fetch_info.bs().ns());
if ts_bin.ns() <= range.beg {
p1 = Some(i1);
p1 = Some(i);
break;
}
}
let mut p1 = if let Some(i1) = p1 { i1 } else { 0 };
let mut p1 = if let Some(i) = p1 { i } else { 0 };
if p1 >= timebins.len() {
return Err(Error::with_msg(format!(
"logic error p1 {} range {:?} fetch_info {:?}",
@@ -370,9 +375,11 @@ async fn open_expanded_files_inner(
for path in paths::datapaths_for_timebin(tb, &fetch_info, &node).await? {
let w = position_file(&path, range, true, false).await?;
if w.found {
debug!("----- open_expanded_files_inner w.found for {:?}", path);
debug!("----- open_expanded_files_inner FOUND tb {:?} path {:?}", tb, path);
a.push(w.file);
found_pre = true;
} else {
debug!("----- open_expanded_files_inner UNFND tb {:?} path {:?}", tb, path);
}
}
let h = OpenedFileSet { timebin: tb, files: a };

View File

@@ -73,7 +73,6 @@ pub struct EventChunker {
node_ix: usize,
dbg_path: PathBuf,
last_ts: u64,
expand: bool,
item_len_emit_histo: HistoLog2,
seen_before_range_count: usize,
seen_after_range_count: usize,
@@ -144,7 +143,6 @@ impl EventChunker {
stats_conf: EventChunkerConf,
node_ix: usize,
dbg_path: PathBuf,
expand: bool,
) -> Self {
debug!("{}::{} node {}", Self::self_name(), "from_start", node_ix);
let need_min_max = match fetch_info.shape() {
@@ -172,7 +170,6 @@ impl EventChunker {
dbg_path,
node_ix,
last_ts: 0,
expand,
item_len_emit_histo: HistoLog2::new(0),
seen_before_range_count: 0,
seen_after_range_count: 0,
@@ -188,7 +185,6 @@ impl EventChunker {
}
}
// TODO `expand` flag usage
pub fn from_event_boundary(
inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
fetch_info: SfChFetchInfo,
@@ -196,10 +192,9 @@ impl EventChunker {
stats_conf: EventChunkerConf,
node_ix: usize,
dbg_path: PathBuf,
expand: bool,
) -> Self {
debug!("{}::{} node {}", Self::self_name(), "from_event_boundary", node_ix);
let mut ret = Self::from_start(inp, fetch_info, range, stats_conf, node_ix, dbg_path, expand);
let mut ret = Self::from_start(inp, fetch_info, range, stats_conf, node_ix, dbg_path);
ret.state = DataFileState::Event;
ret.need_min = 4;
ret.inp.set_need_min(4);
@@ -324,15 +319,15 @@ impl EventChunker {
discard = true;
self.discard_count_range += 1;
self.seen_after_range_count += 1;
if !self.expand || self.seen_after_range_count >= 2 {
self.seen_beyond_range = true;
self.seen_beyond_range = true;
if self.seen_after_range_count >= 2 {
self.data_emit_complete = true;
break;
}
}
if ts < self.range.beg {
discard = true;
self.discard_count_range += 1;
// discard = true;
// self.discard_count_range += 1;
self.seen_before_range_count += 1;
if self.seen_before_range_count < 20 {
let msg = format!(
@@ -349,6 +344,8 @@ impl EventChunker {
self.dbg_path
);
warn!("{}", msg);
let item = LogItem::from_node(self.node_ix, Level::INFO, msg);
self.log_items.push_back(item);
}
if self.seen_before_range_count > 100 {
let msg = format!(
@@ -474,13 +471,17 @@ impl EventChunker {
shape_this,
comp_this,
);
match ret.shape_derived(ret.len() - 1, self.fetch_info.shape()) {
match ret.shape_derived(
ret.len() - 1,
self.fetch_info.scalar_type(),
self.fetch_info.shape(),
) {
Ok(sh) => {
if sh.ne(self.fetch_info.shape()) {
self.discard_count_shape_derived += 1;
ret.pop_back();
let msg = format!(
"shape_derived mismatch {:?} {:?} {:?}",
"EventChunker shape_derived mismatch {:?} {:?} {:?}",
self.fetch_info.scalar_type(),
self.fetch_info.shape(),
sh,
@@ -493,7 +494,7 @@ impl EventChunker {
self.discard_count_shape_derived_err += 1;
ret.pop_back();
let msg = format!(
"shape_derived error {} {:?} {:?}",
"EventChunker shape_derived error {} {:?} {:?}",
e,
self.fetch_info.scalar_type(),
self.fetch_info.shape(),

View File

@@ -39,7 +39,7 @@ pub struct EventChunkerMultifile {
range: NanoRange,
files_count: u32,
node_ix: usize,
expand: bool,
one_before: bool,
max_ts: u64,
out_max_len: usize,
emit_count: usize,
@@ -64,12 +64,12 @@ impl EventChunkerMultifile {
node_ix: usize,
disk_io_tune: DiskIoTune,
event_chunker_conf: EventChunkerConf,
expand: bool,
one_before: bool,
out_max_len: usize,
reqctx: ReqCtxArc,
) -> Self {
debug!("EventChunkerMultifile expand {expand}");
let file_chan = if expand {
debug!("EventChunkerMultifile one_before {one_before}");
let file_chan = if one_before {
open_expanded_files(&range, &fetch_info, node)
} else {
open_files(&range, &fetch_info, reqctx.reqid(), node)
@@ -83,7 +83,7 @@ impl EventChunkerMultifile {
range,
files_count: 0,
node_ix,
expand,
one_before,
max_ts: 0,
out_max_len,
emit_count: 0,
@@ -129,6 +129,9 @@ impl Stream for EventChunkerMultifile {
if h.len() > 0 {
let min = h.tss.iter().fold(u64::MAX, |a, &x| a.min(x));
let max = h.tss.iter().fold(u64::MIN, |a, &x| a.max(x));
if min < self.range.beg() {
debug!("ITEM BEFORE RANGE (how many?)");
}
if min <= self.max_ts {
let msg = format!("EventChunkerMultifile repeated or unordered ts {}", min);
error!("{}", msg);
@@ -180,13 +183,19 @@ impl Stream for EventChunkerMultifile {
None => match self.file_chan.poll_next_unpin(cx) {
Ready(Some(k)) => match k {
Ok(ofs) => {
let msg = format!("received files for timebin {:?}", ofs.timebin);
let item = LogItem::from_node(self.node_ix, Level::INFO, msg);
self.log_queue.push_back(item);
for e in &ofs.files {
let msg = format!("file {:?}", e);
let item = LogItem::from_node(self.node_ix, Level::INFO, msg);
self.log_queue.push_back(item);
}
self.files_count += ofs.files.len() as u32;
if ofs.files.len() == 1 {
let mut ofs = ofs;
let file = ofs.files.pop().unwrap();
let path = file.path;
let msg = format!("use opened files {:?}", ofs);
let item = LogItem::from_node(self.node_ix, Level::DEBUG, msg);
match file.file {
Some(file) => {
let inp = Box::pin(crate::file_content_stream(
@@ -202,22 +211,19 @@ impl Stream for EventChunkerMultifile {
self.event_chunker_conf.clone(),
self.node_ix,
path.clone(),
self.expand,
);
let filtered = RangeFilter2::new(chunker, self.range.clone(), self.expand);
let filtered =
RangeFilter2::new(chunker, self.range.clone(), self.one_before);
self.evs = Some(Box::pin(filtered));
}
None => {}
}
Ready(Some(Ok(StreamItem::Log(item))))
continue;
} else if ofs.files.len() == 0 {
let msg = format!("use opened files {:?} no files", ofs);
let item = LogItem::from_node(self.node_ix, Level::DEBUG, msg);
Ready(Some(Ok(StreamItem::Log(item))))
} else {
// let paths: Vec<_> = ofs.files.iter().map(|x| &x.path).collect();
let msg = format!("use opened files {:?} locally merged", ofs);
let item = LogItem::from_node(self.node_ix, Level::DEBUG, msg);
let mut chunkers = Vec::new();
for of in ofs.files {
if let Some(file) = of.file {
@@ -234,14 +240,15 @@ impl Stream for EventChunkerMultifile {
self.event_chunker_conf.clone(),
self.node_ix,
of.path.clone(),
self.expand,
);
chunkers.push(Box::pin(chunker) as _);
}
}
let merged = Merger::new(chunkers, Some(self.out_max_len as u32));
let filtered = RangeFilter2::new(merged, self.range.clone(), self.expand);
let filtered = RangeFilter2::new(merged, self.range.clone(), self.one_before);
self.evs = Some(Box::pin(filtered));
let msg = format!("LOCALLY MERGED");
let item = LogItem::from_node(self.node_ix, Level::DEBUG, msg);
Ready(Some(Ok(StreamItem::Log(item))))
}
}

View File

@@ -6,6 +6,7 @@ use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::WithLen;
use items_2::eventfull::EventFull;
use netpod::ScalarType;
use netpod::Shape;
use std::collections::VecDeque;
use std::pin::Pin;
@@ -15,6 +16,7 @@ use tracing::Level;
pub struct EventFullShapeFilter<INP> {
inp: INP,
scalar_type_exp: ScalarType,
shape_exp: Shape,
node_ix: usize,
log_items: VecDeque<LogItem>,
@@ -25,11 +27,14 @@ impl<INP> EventFullShapeFilter<INP> {
let node_ix = self.node_ix;
let p: Vec<_> = (0..item.len())
.map(|i| {
let sh = item.shape_derived(i, &self.shape_exp);
let sh = item.shape_derived(i, &self.scalar_type_exp, &self.shape_exp);
match sh {
Ok(sh) => {
if sh.ne(&self.shape_exp) {
let msg = format!("shape_derived mismatch {:?} {:?}", sh, self.shape_exp);
let msg = format!(
"EventFullShapeFilter shape_derived mismatch {:?} {:?}",
sh, self.shape_exp
);
let item = LogItem::from_node(node_ix, Level::WARN, msg);
self.log_items.push_back(item);
false
@@ -38,7 +43,10 @@ impl<INP> EventFullShapeFilter<INP> {
}
}
Err(_) => {
let msg = format!("shape_derived mismatch {:?} {:?}", sh, self.shape_exp);
let msg = format!(
"EventFullShapeFilter shape_derived mismatch {:?} {:?}",
sh, self.shape_exp
);
let item = LogItem::from_node(self.node_ix, Level::WARN, msg);
self.log_items.push_back(item);
false

View File

@@ -62,11 +62,8 @@ pub async fn make_event_pipe(
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
// sf-databuffer type backends identify channels by their (backend, name) only.
let range = evq.range().clone();
let one_before = evq.transform().need_one_before_range();
info!(
"make_event_pipe need_expand {need_expand} {evq:?}",
need_expand = one_before
);
let one_before = evq.need_one_before_range();
info!("make_event_pipe one_before {one_before} {evq:?}");
let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024));
// TODO should not need this for correctness.
// Should limit based on return size and latency.
@@ -94,7 +91,7 @@ pub async fn make_event_pipe(
pub fn make_event_blobs_stream(
range: NanoRange,
fetch_info: SfChFetchInfo,
expand: bool,
one_before: bool,
event_chunker_conf: EventChunkerConf,
disk_io_tune: DiskIoTune,
reqctx: ReqCtxArc,
@@ -115,7 +112,7 @@ pub fn make_event_blobs_stream(
ncc.ix,
disk_io_tune,
event_chunker_conf,
expand,
one_before,
out_max_len,
reqctx,
);
@@ -128,13 +125,13 @@ pub fn make_event_blobs_pipe_real(
reqctx: ReqCtxArc,
ncc: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
let expand = subq.transform().need_one_before_range();
let one_before = subq.need_one_before_range();
let range = subq.range();
let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024));
let event_blobs = make_event_blobs_stream(
range.try_into()?,
fetch_info.clone(),
expand,
one_before,
event_chunker_conf,
subq.disk_io_tune(),
reqctx,

View File

@@ -94,7 +94,7 @@ pub fn main() -> Result<(), Error> {
let stats_conf = EventChunkerConf {
disk_stats_every: ByteSize::from_mb(2),
};
let _chunks = EventChunker::from_start(inp, fetch_info, range, stats_conf, 0, path.clone(), true);
let _chunks = EventChunker::from_start(inp, fetch_info, range, stats_conf, 0, path.clone());
err::todo();
Ok(())
}

View File

@@ -9,6 +9,7 @@ pub mod subfr;
pub mod test;
pub mod timebin;
pub mod transform;
pub mod vecpreview;
pub mod bincode {
pub use bincode::*;
@@ -21,6 +22,7 @@ use container::ByteEstimate;
use std::any::Any;
use std::collections::VecDeque;
use std::fmt;
use timebin::BinningggContainerEventsDyn;
use timebin::TimeBinnable;
pub trait WithLen {
@@ -158,6 +160,7 @@ pub trait Events:
fn clear(&mut self);
// TODO: can not name EventsDim0 from here, so use trait object for now. Anyway is a workaround.
fn to_dim0_f32_for_binning(&self) -> Box<dyn Events>;
fn to_container_events(&self) -> Box<dyn BinningggContainerEventsDyn>;
}
impl WithLen for Box<dyn Events> {
@@ -296,4 +299,8 @@ impl Events for Box<dyn Events> {
fn to_dim0_f32_for_binning(&self) -> Box<dyn Events> {
Events::to_dim0_f32_for_binning(self.as_ref())
}
fn to_container_events(&self) -> Box<dyn BinningggContainerEventsDyn> {
Events::to_container_events(self.as_ref())
}
}

View File

@@ -4,18 +4,24 @@ use crate::collect_s::Collectable;
use crate::collect_s::Collector;
use crate::collect_s::ToJsonResult;
use crate::overlap::RangeOverlapInfo;
use crate::vecpreview::PreviewRange;
use crate::AsAnyMut;
use crate::AsAnyRef;
use crate::Empty;
use crate::Events;
use crate::Resettable;
use crate::TypeName;
use crate::WithLen;
use err::thiserror;
use err::Error;
use err::ThisError;
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
use netpod::BinnedRange;
use netpod::BinnedRangeEnum;
use netpod::TsNano;
use serde::Deserialize;
use serde::Serialize;
use std::any::Any;
use std::fmt;
use std::ops::Range;
@@ -64,10 +70,20 @@ pub trait TimeBinnableTy: fmt::Debug + WithLen + Send + Sized {
) -> Self::TimeBinner;
}
// #[derive(Debug, ThisError)]
// #[cstm(name = "Binninggg")]
pub enum BinningggError {
Dyn(Box<dyn std::error::Error>),
}
impl fmt::Display for BinningggError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
BinningggError::Dyn(e) => write!(fmt, "{e}"),
}
}
}
impl<E> From<E> for BinningggError
where
E: std::error::Error + 'static,
@@ -77,11 +93,21 @@ where
}
}
pub trait BinningggContainerEventsDyn: fmt::Debug {
pub trait BinningggContainerEventsDyn: fmt::Debug + Send {
fn binned_events_timeweight_traitobj(&self) -> Box<dyn BinnedEventsTimeweightTrait>;
}
pub trait BinningggContainerBinsDyn: fmt::Debug {}
pub trait BinningggContainerBinsDyn: fmt::Debug + Send + fmt::Display + WithLen {
fn empty(&self) -> BinsBoxed;
fn clone(&self) -> BinsBoxed;
fn edges_iter(
&self,
) -> std::iter::Zip<std::collections::vec_deque::Iter<TsNano>, std::collections::vec_deque::Iter<TsNano>>;
fn drain_into(&mut self, dst: &mut dyn BinningggContainerBinsDyn, range: Range<usize>);
fn to_old_time_binned(&self) -> Box<dyn TimeBinned>;
}
pub type BinsBoxed = Box<dyn BinningggContainerBinsDyn>;
pub trait BinningggBinnerTy: fmt::Debug + Send {
type Input: fmt::Debug;

View File

@@ -0,0 +1,53 @@
use core::fmt;
use std::collections::VecDeque;
pub struct PreviewCell<'a, T> {
pub a: Option<&'a T>,
pub b: Option<&'a T>,
}
impl<'a, T> fmt::Debug for PreviewCell<'a, T>
where
T: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match (self.a.as_ref(), self.b.as_ref()) {
(Some(a), Some(b)) => write!(fmt, "{:?} .. {:?}", a, b),
(Some(a), None) => write!(fmt, "{:?}", a),
_ => write!(fmt, "(empty)"),
}
}
}
pub trait PreviewRange {
fn preview<'a>(&'a self) -> Box<dyn fmt::Debug + 'a>;
}
impl<T> PreviewRange for VecDeque<T>
where
T: fmt::Debug,
{
fn preview<'a>(&'a self) -> Box<dyn fmt::Debug + 'a> {
let ret = PreviewCell {
a: self.front(),
b: if self.len() <= 1 { None } else { self.back() },
};
Box::new(ret)
}
}
pub struct VecPreview<'a> {
c: &'a dyn PreviewRange,
}
impl<'a> VecPreview<'a> {
pub fn new(c: &'a dyn PreviewRange) -> Self {
Self { c }
}
}
impl<'a> fmt::Debug for VecPreview<'a> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{:?}", self.c.preview())
}
}
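// Illustrative usage sketch (editorial, not part of this commit): VecPreview
// borrows any PreviewRange (such as the VecDeque impl above) and renders a
// compact first-and-last preview through Debug instead of dumping the whole
// collection.
fn _vecpreview_demo() {
    let dq: VecDeque<u32> = (0..1000).collect();
    // Prints "0 .. 999" instead of all one thousand elements.
    println!("{:?}", VecPreview::new(&dq));
    let one: VecDeque<u32> = VecDeque::from([7]);
    // A single element prints as "7" (no range).
    println!("{:?}", VecPreview::new(&one));
}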

View File

@@ -2,11 +2,10 @@ use super::aggregator::AggregatorNumeric;
use super::aggregator::AggregatorTimeWeight;
use super::container_events::EventValueType;
use super::___;
use crate::vecpreview::PreviewRange;
use crate::vecpreview::VecPreview;
use core::fmt;
use err::thiserror;
use err::ThisError;
use items_0::vecpreview::VecPreview;
use netpod::TsNano;
use serde::Deserialize;
use serde::Serialize;
@@ -114,6 +113,28 @@ impl<EVT> ContainerBins<EVT>
where
EVT: EventValueType,
{
pub fn from_constituents(
ts1s: VecDeque<TsNano>,
ts2s: VecDeque<TsNano>,
cnts: VecDeque<u64>,
mins: VecDeque<EVT>,
maxs: VecDeque<EVT>,
avgs: VecDeque<EVT::AggTimeWeightOutputAvg>,
lsts: VecDeque<EVT>,
fnls: VecDeque<bool>,
) -> Self {
Self {
ts1s,
ts2s,
cnts,
mins,
maxs,
avgs,
lsts,
fnls,
}
}
pub fn type_name() -> &'static str {
any::type_name::<Self>()
}
@@ -153,6 +174,14 @@ where
self.ts2s.back().map(|&x| x)
}
pub fn ts1s_iter(&self) -> std::collections::vec_deque::Iter<TsNano> {
self.ts1s.iter()
}
pub fn ts2s_iter(&self) -> std::collections::vec_deque::Iter<TsNano> {
self.ts2s.iter()
}
pub fn cnts_iter(&self) -> std::collections::vec_deque::Iter<u64> {
self.cnts.iter()
}
@@ -165,6 +194,50 @@ where
self.maxs.iter()
}
pub fn avgs_iter(&self) -> std::collections::vec_deque::Iter<EVT::AggTimeWeightOutputAvg> {
self.avgs.iter()
}
pub fn fnls_iter(&self) -> std::collections::vec_deque::Iter<bool> {
self.fnls.iter()
}
pub fn zip_iter(
&self,
) -> std::iter::Zip<
std::iter::Zip<
std::iter::Zip<
std::iter::Zip<
std::iter::Zip<
std::iter::Zip<
std::collections::vec_deque::Iter<TsNano>,
std::collections::vec_deque::Iter<TsNano>,
>,
std::collections::vec_deque::Iter<u64>,
>,
std::collections::vec_deque::Iter<EVT>,
>,
std::collections::vec_deque::Iter<EVT>,
>,
std::collections::vec_deque::Iter<EVT::AggTimeWeightOutputAvg>,
>,
std::collections::vec_deque::Iter<bool>,
> {
self.ts1s_iter()
.zip(self.ts2s_iter())
.zip(self.cnts_iter())
.zip(self.mins_iter())
.zip(self.maxs_iter())
.zip(self.avgs_iter())
.zip(self.fnls_iter())
}
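// Hedged usage note (editorial, not from this commit): zip_iter's deeply
// nested Zip type is consumed by destructuring tuples nested the same way
// the zips were built, e.g. (hypothetical loop):
//
//     for ((((((ts1, ts2), cnt), min), max), avg), fnl) in bins.zip_iter() {
//         // ts1, ts2: &TsNano; cnt: &u64; min, max: &EVT;
//         // avg: &EVT::AggTimeWeightOutputAvg; fnl: &bool
//     }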
pub fn edges_iter(
&self,
) -> std::iter::Zip<std::collections::vec_deque::Iter<TsNano>, std::collections::vec_deque::Iter<TsNano>> {
self.ts1s.iter().zip(self.ts2s.iter())
}
pub fn len_before(&self, end: TsNano) -> usize {
let pp = self.ts2s.partition_point(|&x| x <= end);
assert!(pp <= self.len(), "len_before pp {} len {}", pp, self.len());

View File

@@ -2,11 +2,12 @@ use super::aggregator::AggTimeWeightOutputAvg;
use super::aggregator::AggregatorNumeric;
use super::aggregator::AggregatorTimeWeight;
use super::___;
use crate::vecpreview::PreviewRange;
use crate::vecpreview::VecPreview;
use core::fmt;
use err::thiserror;
use err::ThisError;
use items_0::timebin::BinningggContainerEventsDyn;
use items_0::vecpreview::PreviewRange;
use items_0::vecpreview::VecPreview;
use netpod::TsNano;
use serde::Deserialize;
use serde::Serialize;
@@ -20,7 +21,7 @@ macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
#[cstm(name = "ValueContainerError")]
pub enum ValueContainerError {}
pub trait Container<EVT>: fmt::Debug + Clone + PreviewRange + Serialize + for<'a> Deserialize<'a> {
pub trait Container<EVT>: fmt::Debug + Send + Clone + PreviewRange + Serialize + for<'a> Deserialize<'a> {
fn new() -> Self;
// fn verify(&self) -> Result<(), ValueContainerError>;
fn push_back(&mut self, val: EVT);
@@ -120,6 +121,10 @@ impl<EVT> ContainerEvents<EVT>
where
EVT: EventValueType,
{
pub fn from_constituents(tss: VecDeque<TsNano>, vals: <EVT as EventValueType>::Container) -> Self {
Self { tss, vals }
}
pub fn type_name() -> &'static str {
any::type_name::<Self>()
}
@@ -233,3 +238,12 @@ where
}
}
}
impl<EVT> BinningggContainerEventsDyn for ContainerEvents<EVT>
where
EVT: EventValueType,
{
fn binned_events_timeweight_traitobj(&self) -> Box<dyn items_0::timebin::BinnedEventsTimeweightTrait> {
todo!()
}
}

View File

@@ -1,4 +1,5 @@
pub mod timeweight_bins;
pub mod timeweight_bins_dyn;
pub mod timeweight_events;
pub mod timeweight_events_dyn;

View File

@@ -0,0 +1,27 @@
use futures_util::Stream;
use items_0::streamitem::Sitemty;
use items_0::timebin::BinningggContainerBinsDyn;
use netpod::BinnedRange;
use netpod::TsNano;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
pub struct BinnedBinsTimeweightStream {}
impl BinnedBinsTimeweightStream {
pub fn new(
range: BinnedRange<TsNano>,
inp: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn BinningggContainerBinsDyn>>> + Send>>,
) -> Self {
todo!()
}
}
impl Stream for BinnedBinsTimeweightStream {
type Item = Sitemty<Box<dyn BinningggContainerBinsDyn>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
todo!()
}
}

View File

@@ -19,6 +19,7 @@ use netpod::DtNano;
use netpod::TsNano;
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::mem;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
@@ -382,6 +383,7 @@ where
range: BinnedRange<TsNano>,
inner_a: InnerA<EVT>,
out: ContainerBins<EVT>,
produce_cnt_zero: bool,
}
impl<EVT> fmt::Debug for BinnedEventsTimeweight<EVT>
@@ -422,9 +424,16 @@ where
},
lst: None,
out: ContainerBins::new(),
produce_cnt_zero: true,
}
}
pub fn disable_cnt_zero(self) -> Self {
let mut ret = self;
ret.produce_cnt_zero = false;
ret
}
fn ingest_event_without_lst(&mut self, ev: EventSingle<EVT>) -> Result<(), Error> {
let b = &self.inner_a.inner_b;
if ev.ts >= b.active_end {
@@ -485,10 +494,26 @@ where
let div = b.active_len.ns();
if let Some(lst) = self.lst.as_ref() {
let lst = LstRef(lst);
let mut i = 0;
loop {
i += 1;
assert!(i < 100000, "too many iterations");
if self.produce_cnt_zero {
let mut i = 0;
loop {
i += 1;
assert!(i < 100000, "too many iterations");
let b = &self.inner_a.inner_b;
if ts > b.filled_until {
if ts >= b.active_end {
if b.filled_until < b.active_end {
self.inner_a.inner_b.fill_until(b.active_end, lst.clone());
}
self.inner_a.push_out_and_reset(lst.clone(), true, &mut self.out);
} else {
self.inner_a.inner_b.fill_until(ts, lst.clone());
}
} else {
break;
}
}
} else {
let b = &self.inner_a.inner_b;
if ts > b.filled_until {
if ts >= b.active_end {
@@ -497,13 +522,29 @@ where
}
self.inner_a.push_out_and_reset(lst.clone(), true, &mut self.out);
} else {
// TODO should not hit this case. Prove it, assert it.
self.inner_a.inner_b.fill_until(ts, lst.clone());
}
} else {
break;
// TODO should never hit this case. Count.
}
// TODO jump to next bin
// TODO merge with the other reset
// Below uses the same code
let ts1 = TsNano::from_ns(ts.ns() / div * div);
let b = &mut self.inner_a.inner_b;
b.active_beg = ts1;
b.active_end = ts1.add_dt_nano(b.active_len);
b.filled_until = ts1;
b.filled_width = DtNano::from_ns(0);
b.cnt = 0;
b.agg.reset_for_new_bin();
// assert!(self.inner_a.minmax.is_none());
trace_cycle!("cycled direct to {:?} {:?}", b.active_beg, b.active_end);
}
} else {
assert!(self.inner_a.minmax.is_none());
// TODO merge with the other reset
let ts1 = TsNano::from_ns(ts.ns() / div * div);
let b = &mut self.inner_a.inner_b;
@@ -513,7 +554,6 @@ where
b.filled_width = DtNano::from_ns(0);
b.cnt = 0;
b.agg.reset_for_new_bin();
assert!(self.inner_a.minmax.is_none());
trace_cycle!("cycled direct to {:?} {:?}", b.active_beg, b.active_end);
}
}
@@ -594,6 +634,6 @@ where
}
pub fn output(&mut self) -> ContainerBins<EVT> {
::core::mem::replace(&mut self.out, ContainerBins::new())
mem::replace(&mut self.out, ContainerBins::new())
}
}
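// Worked example (assumed semantics, editorial, not from this commit) of the
// time-weighted average this binner accumulates: each value is weighted by
// how long it remained the current value within the bin,
//
//     avg = sum(v_i * dt_i) / bin_len
//
// e.g. for a 10 s bin where the value is 1.0 for the first 8 s and 5.0 for
// the last 2 s: avg = (1.0 * 8 + 5.0 * 2) / 10 = 1.8, while a plain
// per-event average of the two samples would give 3.0. The fill_until calls
// above extend the last seen value (lst) across bin boundaries so empty
// stretches are weighted correctly.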

View File

@@ -1,9 +1,11 @@
use super::timeweight_events::BinnedEventsTimeweight;
use crate::binning::container_bins::ContainerBins;
use crate::binning::container_events::EventValueType;
use crate::channelevents::ChannelEvents;
use err::thiserror;
use err::ThisError;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::Sitemty;
use items_0::timebin::BinnedEventsTimeweightTrait;
use items_0::timebin::BinningggBinnerDyn;
@@ -12,6 +14,8 @@ use items_0::timebin::BinningggContainerEventsDyn;
use items_0::timebin::BinningggError;
use netpod::BinnedRange;
use netpod::TsNano;
use std::ops::ControlFlow;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
@@ -68,18 +72,6 @@ where
}
}
pub struct BinnedEventsTimeweightStream {
inp: Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>,
}
impl Stream for BinnedEventsTimeweightStream {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
todo!()
}
}
#[derive(Debug)]
pub struct BinnedEventsTimeweightLazy {
range: BinnedRange<TsNano>,
@@ -114,3 +106,109 @@ impl BinnedEventsTimeweightTrait for BinnedEventsTimeweightLazy {
todo!()
}
}
enum StreamState {
Reading,
Done,
}
pub struct BinnedEventsTimeweightStream {
state: StreamState,
inp: Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>,
binned_events: BinnedEventsTimeweightLazy,
range_complete: bool,
}
impl BinnedEventsTimeweightStream {
pub fn new(range: BinnedRange<TsNano>, inp: Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>) -> Self {
Self {
state: StreamState::Reading,
inp,
binned_events: BinnedEventsTimeweightLazy::new(range),
range_complete: false,
}
}
fn handle_sitemty(
mut self: Pin<&mut Self>,
item: Sitemty<ChannelEvents>,
cx: &mut Context,
) -> ControlFlow<Poll<Option<<Self as Stream>::Item>>> {
use items_0::streamitem::RangeCompletableItem::*;
use items_0::streamitem::StreamItem::*;
use ControlFlow::*;
use Poll::*;
match item {
Ok(x) => match x {
DataItem(x) => match x {
Data(x) => match x {
ChannelEvents::Events(evs) => match self.binned_events.ingest(evs.to_container_events()) {
Ok(()) => {
match self.binned_events.output() {
Ok(x) => {
if x.len() == 0 {
Continue(())
} else {
Break(Ready(Some(Ok(DataItem(Data(x))))))
}
}
Err(e) => Break(Ready(Some(Err(::err::Error::from_string(e))))),
}
// Continue(())
}
Err(e) => Break(Ready(Some(Err(::err::Error::from_string(e))))),
},
ChannelEvents::Status(_) => {
// TODO use the status
Continue(())
}
},
RangeComplete => {
self.range_complete = true;
Continue(())
}
},
Log(x) => Break(Ready(Some(Ok(Log(x))))),
Stats(x) => Break(Ready(Some(Ok(Stats(x))))),
},
Err(e) => {
self.state = StreamState::Done;
Break(Ready(Some(Err(e))))
}
}
}
fn handle_eos(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<<Self as Stream>::Item>> {
use items_0::streamitem::RangeCompletableItem::*;
use items_0::streamitem::StreamItem::*;
use Poll::*;
if self.range_complete {
self.binned_events.input_done_range_final();
} else {
self.binned_events.input_done_range_open();
}
match self.binned_events.output() {
Ok(x) => Ready(Some(Ok(DataItem(Data(x))))),
Err(e) => Ready(Some(Err(::err::Error::from_string(e)))),
}
}
}
impl Stream for BinnedEventsTimeweightStream {
type Item = Sitemty<Box<dyn BinningggContainerBinsDyn>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use ControlFlow::*;
use Poll::*;
loop {
break match self.as_mut().inp.poll_next_unpin(cx) {
Ready(Some(x)) => match self.as_mut().handle_sitemty(x, cx) {
Continue(()) => continue,
Break(x) => x,
},
Ready(None) => self.handle_eos(cx),
Pending => Pending,
};
}
}
}
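// Minimal sketch (editorial, not part of this commit) of the control-flow
// pattern poll_next uses above: the per-item handler returns
// ControlFlow::Continue(()) to keep draining the input within the same poll,
// or ControlFlow::Break(poll_result) to yield immediately; the
// `loop { break match ... }` shape turns `continue` into "poll the input
// again" and makes everything else the loop's result.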

View File

@@ -2,8 +2,8 @@ use super::aggregator::AggregatorTimeWeight;
use super::binnedvaluetype::BinnedNumericValue;
use super::container_events::Container;
use super::container_events::EventValueType;
use crate::vecpreview::PreviewRange;
use core::fmt;
use items_0::vecpreview::PreviewRange;
use netpod::DtNano;
use netpod::EnumVariant;
use serde::Deserialize;
@@ -18,7 +18,7 @@ pub struct EnumVariantContainer {
impl PreviewRange for EnumVariantContainer {
fn preview<'a>(&'a self) -> Box<dyn fmt::Debug + 'a> {
let ret = crate::vecpreview::PreviewCell {
let ret = items_0::vecpreview::PreviewCell {
a: self.ixs.front(),
b: self.ixs.back(),
};

View File

@@ -2,7 +2,6 @@ use crate::timebin::TimeBinnerCommonV0Func;
use crate::timebin::TimeBinnerCommonV0Trait;
use crate::ts_offs_from_abs;
use crate::ts_offs_from_abs_with_anchor;
use crate::vecpreview::VecPreview;
use crate::IsoDateTime;
use crate::RangeOverlapInfo;
use crate::TimeBinnableType;
@@ -25,6 +24,7 @@ use items_0::timebin::TimeBinned;
use items_0::timebin::TimeBinner;
use items_0::timebin::TimeBinnerTy;
use items_0::timebin::TimeBins;
use items_0::vecpreview::VecPreview;
use items_0::AppendAllFrom;
use items_0::AppendEmptyBin;
use items_0::AsAnyMut;
@@ -134,6 +134,10 @@ where
impl<NTY: ScalarOps> BinsDim0<NTY> {
pub fn push(&mut self, ts1: u64, ts2: u64, count: u64, min: NTY, max: NTY, avg: f32, lst: NTY) {
if avg < min.as_prim_f32_b() || avg > max.as_prim_f32_b() {
// TODO rounding issues?
debug!("bad avg");
}
self.ts1s.push_back(ts1);
self.ts2s.push_back(ts2);
self.cnts.push_back(count);
@@ -534,12 +538,19 @@ where
if self.cnt == 0 && !push_empty {
self.reset_agg();
} else {
let min = self.min.clone();
let max = self.max.clone();
let avg = self.avg as f32;
if avg < min.as_prim_f32_b() || avg > max.as_prim_f32_b() {
// TODO rounding issues?
debug!("bad avg");
}
self.out.ts1s.push_back(self.ts1now.ns());
self.out.ts2s.push_back(self.ts2now.ns());
self.out.cnts.push_back(self.cnt);
self.out.mins.push_back(self.min.clone());
self.out.maxs.push_back(self.max.clone());
self.out.avgs.push_back(self.avg as f32);
self.out.mins.push_back(min);
self.out.maxs.push_back(max);
self.out.avgs.push_back(avg);
self.out.lsts.push_back(self.lst.clone());
self.reset_agg();
}

View File

@@ -1023,6 +1023,10 @@ impl Events for ChannelEvents {
Status(x) => panic!("ChannelEvents::to_dim0_f32_for_binning"),
}
}
fn to_container_events(&self) -> Box<dyn ::items_0::timebin::BinningggContainerEventsDyn> {
panic!("should not get used")
}
}
impl Collectable for ChannelEvents {

View File

@@ -270,6 +270,7 @@ pub enum DecompError {
UnusedBytes,
BitshuffleError,
ShapeMakesNoSense,
UnexpectedCompressedScalarValue,
}
fn decompress(databuf: &[u8], type_size: u32) -> Result<Vec<u8>, DecompError> {
@@ -325,10 +326,18 @@ impl EventFull {
/// but we still don't know whether that's an image or a waveform.
/// Therefore, the function accepts the expected shape to at least make an assumption
/// about whether this is an image or a waveform.
pub fn shape_derived(&self, i: usize, shape_exp: &Shape) -> Result<Shape, DecompError> {
pub fn shape_derived(
&self,
i: usize,
scalar_type_exp: &ScalarType,
shape_exp: &Shape,
) -> Result<Shape, DecompError> {
match shape_exp {
Shape::Scalar => match &self.comps[i] {
Some(_) => Err(DecompError::ShapeMakesNoSense),
Some(_) => match scalar_type_exp {
ScalarType::STRING => Ok(Shape::Scalar),
_ => Err(DecompError::UnexpectedCompressedScalarValue),
},
None => Ok(Shape::Scalar),
},
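// Hedged note (editorial, not from this commit): the STRING arm above covers
// an event that is logically scalar but still carries a compression header;
// presumably variable-length string payloads are the one scalar type written
// through the compressed path, so any other compressed scalar is reported as
// UnexpectedCompressedScalarValue.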
Shape::Wave(_) => match &self.shapes[i] {

View File

@@ -760,6 +760,31 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
} else {
lst.as_prim_f32_b()
};
let max = if min > max {
// TODO count
debug!("min > max");
min.clone()
} else {
max
};
let avg = {
let g = min.as_prim_f32_b();
if avg < g {
debug!("avg < min");
g
} else {
avg
}
};
let avg = {
let g = max.as_prim_f32_b();
if avg > g {
debug!("avg > max");
g
} else {
avg
}
};
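// Editorial note with a concrete instance (not from this commit): f32
// accumulation can push the average just outside [min, max]. For example,
// summing 0.1f32 ten times yields 1.0000001, so avg = 0.10000001 while
// max = 0.1; the clamps above restore the min <= avg <= max invariant
// instead of emitting an inconsistent bin.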
let ret = if self.range.is_time() {
BinsDim0 {
ts1s: [range_beg].into(),
@@ -1078,6 +1103,14 @@ impl<STY: ScalarOps> Events for EventsDim0<STY> {
}
Box::new(ret)
}
fn to_container_events(&self) -> Box<dyn ::items_0::timebin::BinningggContainerEventsDyn> {
// let tss = self.tss.iter().map(|&x| TsNano::from_ns(x)).collect();
// let vals = self.values.clone();
// let ret = crate::binning::container_events::ContainerEvents::from_constituents(tss, vals);
// Box::new(ret)
todo!()
}
}
#[derive(Debug)]

View File

@@ -499,4 +499,8 @@ impl Events for EventsDim0Enum {
fn to_dim0_f32_for_binning(&self) -> Box<dyn Events> {
todo!("{}::to_dim0_f32_for_binning", self.type_name())
}
fn to_container_events(&self) -> Box<dyn ::items_0::timebin::BinningggContainerEventsDyn> {
todo!("{}::to_container_events", self.type_name())
}
}

View File

@@ -993,6 +993,10 @@ impl<STY: ScalarOps> Events for EventsDim1<STY> {
fn to_dim0_f32_for_binning(&self) -> Box<dyn Events> {
todo!("{}::to_dim0_f32_for_binning", self.type_name())
}
fn to_container_events(&self) -> Box<dyn ::items_0::timebin::BinningggContainerEventsDyn> {
todo!("{}::to_container_events", self.type_name())
}
}
#[derive(Debug)]

View File

@@ -381,6 +381,10 @@ impl<STY: ScalarOps> Events for EventsXbinDim0<STY> {
fn to_dim0_f32_for_binning(&self) -> Box<dyn Events> {
todo!("{}::to_dim0_f32_for_binning", self.type_name())
}
fn to_container_events(&self) -> Box<dyn ::items_0::timebin::BinningggContainerEventsDyn> {
todo!("{}::to_container_events", self.type_name())
}
}
#[derive(Debug)]

View File

@@ -1,53 +1 @@
use core::fmt;
use std::collections::VecDeque;
pub struct PreviewCell<'a, T> {
pub a: Option<&'a T>,
pub b: Option<&'a T>,
}
impl<'a, T> fmt::Debug for PreviewCell<'a, T>
where
T: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match (self.a.as_ref(), self.b.as_ref()) {
(Some(a), Some(b)) => write!(fmt, "{:?} .. {:?}", a, b),
(Some(a), None) => write!(fmt, "{:?}", a),
_ => write!(fmt, "(empty)"),
}
}
}
pub trait PreviewRange {
fn preview<'a>(&'a self) -> Box<dyn fmt::Debug + 'a>;
}
impl<T> PreviewRange for VecDeque<T>
where
T: fmt::Debug,
{
fn preview<'a>(&'a self) -> Box<dyn fmt::Debug + 'a> {
let ret = PreviewCell {
a: self.front(),
b: if self.len() <= 1 { None } else { self.back() },
};
Box::new(ret)
}
}
pub struct VecPreview<'a> {
c: &'a dyn PreviewRange,
}
impl<'a> VecPreview<'a> {
pub fn new(c: &'a dyn PreviewRange) -> Self {
Self { c }
}
}
impl<'a> fmt::Debug for VecPreview<'a> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{:?}", self.c.preview())
}
}

View File

@@ -279,7 +279,7 @@ pub fn events_parse_input_query(frames: Vec<InMemoryFrame>) -> Result<(EventsSub
},
Err(e) => return Err(e.into()),
};
info!("parsing json {:?}", qitem.str());
trace!("parsing json {:?}", qitem.str());
let frame1: Frame1Parts = serde_json::from_str(&qitem.str()).map_err(|_e| {
let e = Error::BadQuery;
error!("{e}");

View File

@@ -537,7 +537,7 @@ impl EventsSubQuery {
}
pub fn need_one_before_range(&self) -> bool {
self.select.one_before_range
self.select.one_before_range || self.transform().need_one_before_range()
}
pub fn transform(&self) -> &TransformQuery {

View File

@@ -7,6 +7,7 @@ use err::Error;
use futures_util::Future;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::timebin::BinsBoxed;
use items_0::timebin::TimeBinned;
use items_0::Empty;
use items_2::binsdim0::BinsDim0;
@@ -227,12 +228,14 @@ impl streams::timebin::CacheReadProvider for ScyllaCacheReadProvider {
offs: Range<u32>,
) -> streams::timebin::cached::reader::CacheReading {
let scyqueue = self.scyqueue.clone();
let fut = async move { scyqueue.read_cache_f32(series, bin_len, msp, offs).await };
// let fut = async move { scyqueue.read_cache_f32(series, bin_len, msp, offs).await };
let fut = async { todo!("TODO impl scylla cache read") };
streams::timebin::cached::reader::CacheReading::new(Box::pin(fut))
}
fn write(&self, series: u64, bins: BinsDim0<f32>) -> streams::timebin::cached::reader::CacheWriting {
fn write(&self, series: u64, bins: BinsBoxed) -> streams::timebin::cached::reader::CacheWriting {
let scyqueue = self.scyqueue.clone();
let bins = todo!("TODO impl scylla cache write");
let fut = async move { scyqueue.write_cache_f32(series, bins).await };
streams::timebin::cached::reader::CacheWriting::new(Box::pin(fut))
}

View File

@@ -85,7 +85,7 @@ fn make_test_channel_events_stream_data_inner(
debug!("use test backend data");
let chn = subq.name();
let range = subq.range().clone();
let one_before = subq.transform().need_one_before_range();
let one_before = subq.need_one_before_range();
if chn == "test-gen-i32-dim0-v00" {
Ok(Box::pin(GenerateI32V00::new(node_ix, node_count, range, one_before)))
} else if chn == "test-gen-i32-dim0-v01" {

View File

@@ -39,7 +39,7 @@ where
inp: S,
range: NanoRange,
range_str: String,
one_before_range: bool,
one_before: bool,
stats: RangeFilterStats,
slot1: Option<ITY>,
have_range_complete: bool,
@@ -59,20 +59,20 @@ where
std::any::type_name::<Self>()
}
pub fn new(inp: S, range: NanoRange, one_before_range: bool) -> Self {
pub fn new(inp: S, range: NanoRange, one_before: bool) -> Self {
let trdet = false;
trace_emit!(
trdet,
"{}::new range: {:?} one_before_range {:?}",
"{}::new range: {:?} one_before {:?}",
Self::type_name(),
range,
one_before_range
one_before
);
Self {
inp,
range_str: format!("{:?}", range),
range,
one_before_range,
one_before,
stats: RangeFilterStats::new(),
slot1: None,
have_range_complete: false,
@@ -116,6 +116,11 @@ where
}
fn handle_item(&mut self, item: ITY) -> Result<ITY, Error> {
if let Some(ts_min) = item.ts_min() {
if ts_min < self.range.beg() {
debug!("ITEM BEFORE RANGE (how many?)");
}
}
let min = item.ts_min().map(|x| TsNano::from_ns(x).fmt());
let max = item.ts_max().map(|x| TsNano::from_ns(x).fmt());
trace_emit!(
@@ -126,7 +131,7 @@ where
max
);
let mut item = self.prune_high(item, self.range.end)?;
let ret = if self.one_before_range {
let ret = if self.one_before {
let lige = item.find_lowest_index_ge(self.range.beg);
trace_emit!(self.trdet, "YES one_before_range ilge {:?}", lige);
match lige {
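// Hedged illustration (editorial, not from this commit): with one_before =
// true and range [100, 200) over events at t = 40, 90, 150, lige points at
// 150 (the lowest index >= range.beg) and the event just before it, 90, is
// kept as well; with one_before = false only 150 survives. This pairs with
// the EventChunker change earlier in this commit that stops discarding
// before-range events: the filter, not the chunker, now decides which single
// leading event to keep, giving time-weighted binning its starting value at
// the range begin.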

View File

@@ -5,8 +5,7 @@ use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::Sitemty;
use items_0::timebin::TimeBinnable;
use items_2::binsdim0::BinsDim0;
use items_0::timebin::BinsBoxed;
use items_2::channelevents::ChannelEvents;
use netpod::log::*;
use netpod::BinnedRange;
@@ -55,19 +54,19 @@ pub trait EventsReadProvider: Send + Sync {
}
pub struct CacheReading {
fut: Pin<Box<dyn Future<Output = Result<BinsDim0<f32>, streams::timebin::cached::reader::Error>> + Send>>,
fut: Pin<Box<dyn Future<Output = Result<BinsBoxed, streams::timebin::cached::reader::Error>> + Send>>,
}
impl CacheReading {
pub fn new(
fut: Pin<Box<dyn Future<Output = Result<BinsDim0<f32>, streams::timebin::cached::reader::Error>> + Send>>,
fut: Pin<Box<dyn Future<Output = Result<BinsBoxed, streams::timebin::cached::reader::Error>> + Send>>,
) -> Self {
Self { fut }
}
}
impl Future for CacheReading {
type Output = Result<BinsDim0<f32>, streams::timebin::cached::reader::Error>;
type Output = Result<BinsBoxed, streams::timebin::cached::reader::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.fut.poll_unpin(cx)
@@ -94,7 +93,7 @@ impl Future for CacheWriting {
pub trait CacheReadProvider: Send + Sync {
fn read(&self, series: u64, bin_len: DtMs, msp: u64, offs: Range<u32>) -> CacheReading;
fn write(&self, series: u64, bins: BinsDim0<f32>) -> CacheWriting;
fn write(&self, series: u64, bins: BinsBoxed) -> CacheWriting;
}
#[derive(Debug, ThisError)]
@@ -112,7 +111,7 @@ pub struct CachedReader {
ts1next: TsNano,
bin_len: DtMs,
cache_read_provider: Arc<dyn CacheReadProvider>,
reading: Option<Pin<Box<dyn Future<Output = Result<BinsDim0<f32>, Error>> + Send>>>,
reading: Option<Pin<Box<dyn Future<Output = Result<BinsBoxed, Error>> + Send>>>,
}
impl CachedReader {
@@ -134,7 +133,7 @@ impl CachedReader {
}
impl Stream for CachedReader {
type Item = Result<BinsDim0<f32>, Error>;
type Item = Result<BinsBoxed, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
@@ -151,7 +150,6 @@ impl Stream for CachedReader {
self.reading = None;
match x {
Ok(bins) => {
use items_0::WithLen;
trace_emit!(
"- - - - - - - - - - - - emit cached bins {} bin_len {}",
bins.len(),

View File

@@ -6,8 +6,7 @@ use futures_util::StreamExt;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_2::binsdim0::BinsDim0;
use items_2::channelevents::ChannelEvents;
use items_0::timebin::BinsBoxed;
use netpod::log::*;
use netpod::BinnedRange;
use netpod::ChConf;
@@ -26,7 +25,7 @@ macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
pub enum Error {}
pub struct BinnedFromEvents {
stream: Pin<Box<dyn Stream<Item = Sitemty<BinsDim0<f32>>> + Send>>,
stream: Pin<Box<dyn Stream<Item = Sitemty<BinsBoxed>> + Send>>,
}
impl BinnedFromEvents {
@@ -41,33 +40,28 @@ impl BinnedFromEvents {
panic!();
}
let stream = read_provider.read(evq, chconf);
let stream = stream.map(|x| {
let x = items_0::try_map_sitemty_data!(x, |x| match x {
ChannelEvents::Events(x) => {
let x = x.to_dim0_f32_for_binning();
Ok(ChannelEvents::Events(x))
}
ChannelEvents::Status(x) => Ok(ChannelEvents::Status(x)),
});
x
});
let stream = Box::pin(stream);
let stream = super::basic::TimeBinnedStream::new(stream, netpod::BinnedRangeEnum::Time(range), do_time_weight);
// let stream = stream.map(|x| {
// let x = items_0::try_map_sitemty_data!(x, |x| match x {
// ChannelEvents::Events(x) => {
// let x = x.to_dim0_f32_for_binning();
// Ok(ChannelEvents::Events(x))
// }
// ChannelEvents::Status(x) => Ok(ChannelEvents::Status(x)),
// });
// x
// });
let stream = if do_time_weight {
let stream = Box::pin(stream);
items_2::binning::timeweight::timeweight_events_dyn::BinnedEventsTimeweightStream::new(range, stream)
} else {
panic!("non-weighted TODO")
};
let stream = stream.map(|item| match item {
Ok(x) => match x {
StreamItem::DataItem(x) => match x {
RangeCompletableItem::Data(mut x) => {
// TODO need a typed time binner
if let Some(x) = x.as_any_mut().downcast_mut::<BinsDim0<f32>>() {
let y = x.clone();
use items_0::WithLen;
trace_emit!("=========== ========= emit from events {}", y.len());
Ok(StreamItem::DataItem(RangeCompletableItem::Data(y)))
} else {
Err(::err::Error::with_msg_no_trace(
"GapFill expects incoming BinsDim0<f32>",
))
}
RangeCompletableItem::Data(x) => {
debug!("see item {:?}", x);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))
}
RangeCompletableItem::RangeComplete => {
info!("BinnedFromEvents sees range final");
@@ -87,7 +81,7 @@ impl BinnedFromEvents {
}
impl Stream for BinnedFromEvents {
type Item = Sitemty<BinsDim0<f32>>;
type Item = Sitemty<BinsBoxed>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.stream.poll_next_unpin(cx)

View File

@@ -12,7 +12,10 @@ use items_0::on_sitemty_data;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::timebin::BinningggContainerBinsDyn;
use items_0::timebin::BinsBoxed;
use items_0::timebin::TimeBinnableTy;
use items_2::binning::timeweight::timeweight_bins_dyn::BinnedBinsTimeweightStream;
use items_2::binsdim0::BinsDim0;
use netpod::log::*;
use netpod::query::CacheUsage;
@@ -44,7 +47,7 @@ pub enum Error {
FinerGridMismatch(DtMs, DtMs),
}
type BoxedInput = Pin<Box<dyn Stream<Item = Sitemty<BinsDim0<f32>>> + Send>>;
type BoxedInput = Pin<Box<dyn Stream<Item = Sitemty<BinsBoxed>> + Send>>;
pub struct TimeBinnedFromLayers {
ch_conf: ChannelTypeConfigGen,
@@ -141,11 +144,7 @@ impl TimeBinnedFromLayers {
cache_read_provider,
events_read_provider.clone(),
)?;
let inp = super::basic::TimeBinnedStream::new(
Box::pin(inp),
BinnedRangeEnum::Time(range),
do_time_weight,
);
let inp = BinnedBinsTimeweightStream::new(range, Box::pin(inp));
let ret = Self {
ch_conf,
cache_usage,
@@ -200,7 +199,7 @@ impl TimeBinnedFromLayers {
}
impl Stream for TimeBinnedFromLayers {
type Item = Sitemty<BinsDim0<f32>>;
type Item = Sitemty<BinsBoxed>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;

View File

@@ -9,9 +9,7 @@ use futures_util::StreamExt;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::Empty;
use items_0::WithLen;
use items_2::binsdim0::BinsDim0;
use items_0::timebin::BinsBoxed;
use netpod::log::*;
use netpod::query::CacheUsage;
use netpod::range::evrange::NanoRange;
@@ -56,7 +54,7 @@ pub enum Error {
EventsReader(#[from] super::fromevents::Error),
}
type INP = Pin<Box<dyn Stream<Item = Sitemty<BinsDim0<f32>>> + Send>>;
type Input = Pin<Box<dyn Stream<Item = Sitemty<BinsBoxed>> + Send>>;
// Try to read from cache for the given bin len.
// For gaps in the stream, construct an alternative input from finer bin len with a binner.
@@ -72,10 +70,10 @@ pub struct GapFill {
range: BinnedRange<TsNano>,
do_time_weight: bool,
bin_len_layers: Vec<DtMs>,
inp: Option<INP>,
inp: Option<Input>,
inp_range_final: bool,
inp_buf: Option<BinsDim0<f32>>,
inp_finer: Option<INP>,
inp_buf: Option<BinsBoxed>,
inp_finer: Option<Input>,
inp_finer_range_final: bool,
inp_finer_range_final_cnt: u32,
inp_finer_range_final_max: u32,
@@ -84,7 +82,7 @@ pub struct GapFill {
exp_finer_range: NanoRange,
cache_read_provider: Arc<dyn CacheReadProvider>,
events_read_provider: Arc<dyn EventsReadProvider>,
bins_for_cache_write: BinsDim0<f32>,
bins_for_cache_write: Option<BinsBoxed>,
done: bool,
cache_writing: Option<super::cached::reader::CacheWriting>,
}
@@ -114,7 +112,7 @@ impl GapFill {
Ok(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))),
Err(e) => Err(::err::Error::from_string(e)),
});
Box::pin(stream) as Pin<Box<dyn Stream<Item = Sitemty<BinsDim0<f32>>> + Send>>
Box::pin(stream) as Pin<Box<dyn Stream<Item = Sitemty<BinsBoxed>> + Send>>
} else {
let stream = futures_util::stream::empty();
Box::pin(stream)
@@ -144,36 +142,33 @@ impl GapFill {
exp_finer_range: NanoRange { beg: 0, end: 0 },
cache_read_provider,
events_read_provider,
bins_for_cache_write: BinsDim0::empty(),
bins_for_cache_write: None,
done: false,
cache_writing: None,
};
Ok(ret)
}
fn handle_bins_finer(mut self: Pin<&mut Self>, bins: BinsDim0<f32>) -> Result<BinsDim0<f32>, Error> {
fn handle_bins_finer(mut self: Pin<&mut Self>, bins: BinsBoxed) -> Result<BinsBoxed, Error> {
trace_handle!("{} handle_bins_finer {}", self.dbgname, bins);
for (&ts1, &ts2) in bins.ts1s.iter().zip(&bins.ts2s) {
for (&ts1, &ts2) in bins.edges_iter() {
if let Some(last) = self.last_bin_ts2 {
if ts1 != last.ns() {
return Err(Error::GapFromFiner(
TsNano::from_ns(ts1),
last,
self.range.bin_len_dt_ms(),
));
if ts1 != last {
return Err(Error::GapFromFiner(ts1, last, self.range.bin_len_dt_ms()));
}
} else if ts1 != self.range.nano_beg().ns() {
} else if ts1 != self.range.nano_beg() {
return Err(Error::MissingBegFromFiner(
TsNano::from_ns(ts1),
ts1,
self.range.nano_beg(),
self.range.bin_len_dt_ms(),
));
}
self.last_bin_ts2 = Some(TsNano::from_ns(ts2));
self.last_bin_ts2 = Some(ts2);
}
if bins.len() != 0 {
let mut bins2 = bins.clone();
bins2.drain_into(&mut self.bins_for_cache_write, 0..bins2.len());
let dst = self.bins_for_cache_write.get_or_insert_with(|| bins.empty());
bins2.drain_into(dst.as_mut(), 0..bins2.len());
}
if self.cache_usage.is_cache_write() {
self.cache_write_intermediate()?;
@@ -191,34 +186,34 @@ impl GapFill {
Ok(())
}
fn handle_bins(mut self: Pin<&mut Self>, bins: BinsDim0<f32>) -> Result<BinsDim0<f32>, Error> {
fn handle_bins(mut self: Pin<&mut Self>, bins: BinsBoxed) -> Result<BinsBoxed, Error> {
trace_handle!("{} handle_bins {}", self.dbgname, bins);
// TODO could use an interface to iterate over opaque bin items that only expose
// edge and count information with all remaining values opaque.
for (i, (&ts1, &ts2)) in bins.ts1s.iter().zip(&bins.ts2s).enumerate() {
if ts1 < self.range.nano_beg().ns() {
for (i, (&ts1, &ts2)) in bins.edges_iter().enumerate() {
if ts1 < self.range.nano_beg() {
return Err(Error::InputBeforeRange(
NanoRange::from_ns_u64(ts1, ts2),
NanoRange::from_ns_u64(ts1.ns(), ts2.ns()),
self.range.clone(),
));
}
if let Some(last) = self.last_bin_ts2 {
if ts1 != last.ns() {
if ts1 != last {
trace_handle!("{} detect a gap BETWEEN last {} ts1 {}", self.dbgname, last, ts1);
let mut ret = <BinsDim0<f32> as items_0::Empty>::empty();
let mut ret = bins.empty();
let mut bins = bins;
bins.drain_into(&mut ret, 0..i);
bins.drain_into(ret.as_mut(), 0..i);
self.inp_buf = Some(bins);
let range = NanoRange {
beg: last.ns(),
end: ts1,
end: ts1.ns(),
};
self.setup_sub(range)?;
return Ok(ret);
} else {
// nothing to do
}
} else if ts1 != self.range.nano_beg().ns() {
} else if ts1 != self.range.nano_beg() {
trace_handle!(
"{} detect a gap BEGIN beg {} ts1 {}",
self.dbgname,
@@ -227,12 +222,12 @@ impl GapFill {
);
let range = NanoRange {
beg: self.range.nano_beg().ns(),
end: ts1,
end: ts1.ns(),
};
self.setup_sub(range)?;
return Ok(BinsDim0::empty());
return Ok(bins.empty());
}
self.last_bin_ts2 = Some(TsNano::from_ns(ts2));
self.last_bin_ts2 = Some(ts2);
}
Ok(bins)
}
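// Hedged walk-through (editorial, not from this commit) of the gap detection
// above: suppose the range begins at t=0 with bin_len 10 and the cached
// input yields bins [0,10) [10,20) [40,50) in one item. The first two pass
// validation and last_bin_ts2 advances to 20; at [40,50) ts1 != last, so the
// two bins before the gap are drained into the returned batch, [40,50) is
// parked in inp_buf, and setup_sub builds a finer-binned sub-stream covering
// the gap [20, 40) before the parked input resumes.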
@@ -270,10 +265,12 @@ impl GapFill {
self.events_read_provider.clone(),
)?;
let stream = Box::pin(inp_finer);
let do_time_weight = self.do_time_weight;
let range = BinnedRange::from_nano_range(range_finer.full_range(), self.range.bin_len.to_dt_ms());
let stream =
super::basic::TimeBinnedStream::new(stream, netpod::BinnedRangeEnum::Time(range), do_time_weight);
let stream = if self.do_time_weight {
::items_2::binning::timeweight::timeweight_bins_dyn::BinnedBinsTimeweightStream::new(range, stream)
} else {
panic!("TODO unweighted")
};
self.inp_finer = Some(Box::pin(stream));
} else {
debug_setup!("{} setup_inp_finer next finer from events {}", self.dbgname, range);
@@ -309,7 +306,7 @@ impl GapFill {
Ok(())
}
fn cache_write(mut self: Pin<&mut Self>, bins: BinsDim0<f32>) -> Result<(), Error> {
fn cache_write(mut self: Pin<&mut Self>, bins: BinsBoxed) -> Result<(), Error> {
self.cache_writing = Some(self.cache_read_provider.write(self.series, bins));
Ok(())
}
@@ -318,42 +315,27 @@ impl GapFill {
if self.inp_finer_fills_gap {
// TODO can consider all incoming bins as final by assumption.
}
let aa = &self.bins_for_cache_write;
if aa.len() >= 2 {
for (i, (&c1, &_c2)) in aa.cnts.iter().rev().zip(aa.cnts.iter().rev().skip(1)).enumerate() {
if c1 != 0 {
let n = aa.len() - (1 + i);
debug_cache!("{} cache_write_on_end consider {} for write", self.dbgname, n);
let mut bins_write = BinsDim0::empty();
self.bins_for_cache_write.drain_into(&mut bins_write, 0..n);
self.cache_write(bins_write)?;
break;
}
if let Some(bins) = &self.bins_for_cache_write {
if bins.len() >= 2 {
// TODO guard behind flag.
// TODO emit to an async user-given channel, if given.
// Therefore, move to poll loop.
// Should only write to cache with non-zero count, therefore, not even emit others?
// TODO afterwards set to None.
self.bins_for_cache_write = None;
}
}
Ok(())
}
fn cache_write_intermediate(mut self: Pin<&mut Self>) -> Result<(), Error> {
let aa = &self.bins_for_cache_write;
if aa.len() >= 2 {
for (i, (&c1, &_c2)) in aa.cnts.iter().rev().zip(aa.cnts.iter().rev().skip(1)).enumerate() {
if c1 != 0 {
let n = aa.len() - (1 + i);
debug_cache!("{} cache_write_intermediate consider {} for write", self.dbgname, n);
let mut bins_write = BinsDim0::empty();
self.bins_for_cache_write.drain_into(&mut bins_write, 0..n);
self.cache_write(bins_write)?;
break;
}
}
}
fn cache_write_intermediate(self: Pin<&mut Self>) -> Result<(), Error> {
// TODO See cache_write_on_end
Ok(())
}
}
impl Stream for GapFill {
type Item = Sitemty<BinsDim0<f32>>;
type Item = Sitemty<BinsBoxed>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;

View File

@@ -361,9 +361,9 @@ async fn timebinned_stream(
)
.map_err(Error::from_string)?;
let stream = stream.map(|item| {
on_sitemty_data!(item, |k| Ok(StreamItem::DataItem(RangeCompletableItem::Data(
Box::new(k) as Box<dyn TimeBinned>
))))
on_sitemty_data!(item, |k: items_0::timebin::BinsBoxed| Ok(StreamItem::DataItem(
RangeCompletableItem::Data(k.to_old_time_binned())
)))
});
let stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>> = Box::pin(stream);
Ok(stream)