WIP typechecks
This commit is contained in:
@@ -34,8 +34,11 @@ use nodenet::scylla::ScyllaEventReadProvider;
|
||||
use query::api4::binned::BinnedQuery;
|
||||
use scyllaconn::bincache::ScyllaCacheReadProvider;
|
||||
use scyllaconn::worker::ScyllaQueue;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use streams::collect::CollectResult;
|
||||
use streams::eventsplainreader::DummyCacheReadProvider;
|
||||
use streams::eventsplainreader::SfDatabufferEventReadProvider;
|
||||
use streams::timebin::cached::reader::EventsReadProvider;
|
||||
use streams::timebin::CacheReadProvider;
|
||||
use tracing::Instrument;
|
||||
@@ -136,6 +139,40 @@ async fn binned(
|
||||
}
|
||||
}
|
||||
|
||||
fn make_read_provider(
|
||||
scyqueue: Option<ScyllaQueue>,
|
||||
open_bytes: Pin<Arc<OpenBoxedBytesViaHttp>>,
|
||||
ctx: &ReqCtx,
|
||||
ncc: &NodeConfigCached,
|
||||
) -> (Arc<dyn EventsReadProvider>, Arc<dyn CacheReadProvider>) {
|
||||
let events_read_provider = if ncc.node_config.cluster.scylla_lt().is_some() {
|
||||
scyqueue
|
||||
.clone()
|
||||
.map(|qu| ScyllaEventReadProvider::new(qu))
|
||||
.map(|x| Arc::new(x) as Arc<dyn EventsReadProvider>)
|
||||
.expect("expect scylla queue")
|
||||
} else if ncc.node.sf_databuffer.is_some() {
|
||||
// TODO do not clone the request. Pass an Arc up to here.
|
||||
let x = SfDatabufferEventReadProvider::new(Arc::new(ctx.clone()), open_bytes);
|
||||
Arc::new(x)
|
||||
} else {
|
||||
panic!()
|
||||
};
|
||||
let cache_read_provider = if ncc.node_config.cluster.scylla_lt().is_some() {
|
||||
scyqueue
|
||||
.clone()
|
||||
.map(|qu| ScyllaCacheReadProvider::new(qu))
|
||||
.map(|x| Arc::new(x) as Arc<dyn CacheReadProvider>)
|
||||
.expect("expect scylla queue")
|
||||
} else if ncc.node.sf_databuffer.is_some() {
|
||||
let x = DummyCacheReadProvider::new();
|
||||
Arc::new(x)
|
||||
} else {
|
||||
panic!()
|
||||
};
|
||||
(events_read_provider, cache_read_provider)
|
||||
}
|
||||
|
||||
async fn binned_json_single(
|
||||
url: Url,
|
||||
req: Requ,
|
||||
@@ -169,13 +206,8 @@ async fn binned_json_single(
|
||||
});
|
||||
let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
|
||||
let open_bytes = Arc::pin(open_bytes);
|
||||
let cache_read_provider = scyqueue
|
||||
.clone()
|
||||
.map(|qu| ScyllaCacheReadProvider::new(qu))
|
||||
.map(|x| Arc::new(x) as Arc<dyn CacheReadProvider>);
|
||||
let events_read_provider = scyqueue
|
||||
.map(|qu| ScyllaEventReadProvider::new(qu))
|
||||
.map(|x| Arc::new(x) as Arc<dyn EventsReadProvider>);
|
||||
let open_bytes2 = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
|
||||
let (events_read_provider, cache_read_provider) = make_read_provider(scyqueue, open_bytes2, ctx, ncc);
|
||||
let item = streams::timebinnedjson::timebinned_json(
|
||||
query,
|
||||
ch_conf,
|
||||
@@ -238,13 +270,8 @@ async fn binned_json_framed(
|
||||
});
|
||||
let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
|
||||
let open_bytes = Arc::pin(open_bytes);
|
||||
let cache_read_provider = scyqueue
|
||||
.clone()
|
||||
.map(|qu| ScyllaCacheReadProvider::new(qu))
|
||||
.map(|x| Arc::new(x) as Arc<dyn CacheReadProvider>);
|
||||
let events_read_provider = scyqueue
|
||||
.map(|qu| ScyllaEventReadProvider::new(qu))
|
||||
.map(|x| Arc::new(x) as Arc<dyn EventsReadProvider>);
|
||||
let open_bytes2 = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
|
||||
let (events_read_provider, cache_read_provider) = make_read_provider(scyqueue, open_bytes2, ctx, ncc);
|
||||
let stream = streams::timebinnedjson::timebinned_json_framed(
|
||||
query,
|
||||
ch_conf,
|
||||
|
||||
@@ -74,12 +74,16 @@ pub trait TimeBinnableTy: fmt::Debug + WithLen + Send + Sized {
|
||||
// #[cstm(name = "Binninggg")]
|
||||
pub enum BinningggError {
|
||||
Dyn(Box<dyn std::error::Error>),
|
||||
TypeMismatch { have: String, expect: String },
|
||||
}
|
||||
|
||||
impl fmt::Display for BinningggError {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self {
|
||||
BinningggError::Dyn(e) => write!(fmt, "{e}"),
|
||||
BinningggError::TypeMismatch { have, expect } => {
|
||||
write!(fmt, "TypeMismatch(have: {have}, expect: {expect})")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -94,10 +98,13 @@ where
|
||||
}
|
||||
|
||||
pub trait BinningggContainerEventsDyn: fmt::Debug + Send {
|
||||
fn binned_events_timeweight_traitobj(&self) -> Box<dyn BinnedEventsTimeweightTrait>;
|
||||
fn type_name(&self) -> &'static str;
|
||||
fn binned_events_timeweight_traitobj(&self, range: BinnedRange<TsNano>) -> Box<dyn BinnedEventsTimeweightTrait>;
|
||||
fn to_anybox(&mut self) -> Box<dyn std::any::Any>;
|
||||
}
|
||||
|
||||
pub trait BinningggContainerBinsDyn: fmt::Debug + Send + fmt::Display + WithLen {
|
||||
pub trait BinningggContainerBinsDyn: fmt::Debug + Send + fmt::Display + WithLen + AsAnyMut {
|
||||
fn type_name(&self) -> &'static str;
|
||||
fn empty(&self) -> BinsBoxed;
|
||||
fn clone(&self) -> BinsBoxed;
|
||||
fn edges_iter(
|
||||
@@ -133,10 +140,10 @@ pub trait BinningggBinnerDyn: fmt::Debug + Send {
|
||||
}
|
||||
|
||||
pub trait BinnedEventsTimeweightTrait: fmt::Debug + Send {
|
||||
fn ingest(&mut self, evs_all: Box<dyn BinningggContainerEventsDyn>) -> Result<(), BinningggError>;
|
||||
fn ingest(&mut self, evs_all: EventsBoxed) -> Result<(), BinningggError>;
|
||||
fn input_done_range_final(&mut self) -> Result<(), BinningggError>;
|
||||
fn input_done_range_open(&mut self) -> Result<(), BinningggError>;
|
||||
fn output(&mut self) -> Result<Box<dyn BinningggContainerBinsDyn>, BinningggError>;
|
||||
fn output(&mut self) -> Result<Option<BinsBoxed>, BinningggError>;
|
||||
}
|
||||
|
||||
/// Data in time-binned form.
|
||||
|
||||
@@ -2,16 +2,28 @@ use super::container_events::EventValueType;
|
||||
use core::fmt;
|
||||
use netpod::log::*;
|
||||
use netpod::DtNano;
|
||||
use netpod::EnumVariant;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_event { ($($arg:tt)*) => ( if false { trace!($($arg)*); }) }
|
||||
|
||||
pub trait AggTimeWeightOutputAvg: fmt::Debug + Clone + Send + Serialize + for<'a> Deserialize<'a> {}
|
||||
|
||||
impl AggTimeWeightOutputAvg for u8 {}
|
||||
impl AggTimeWeightOutputAvg for u16 {}
|
||||
impl AggTimeWeightOutputAvg for u32 {}
|
||||
impl AggTimeWeightOutputAvg for u64 {}
|
||||
|
||||
impl AggTimeWeightOutputAvg for i8 {}
|
||||
impl AggTimeWeightOutputAvg for i16 {}
|
||||
impl AggTimeWeightOutputAvg for i32 {}
|
||||
impl AggTimeWeightOutputAvg for i64 {}
|
||||
impl AggTimeWeightOutputAvg for f32 {}
|
||||
|
||||
impl AggTimeWeightOutputAvg for f64 {}
|
||||
impl AggTimeWeightOutputAvg for EnumVariant {}
|
||||
impl AggTimeWeightOutputAvg for String {}
|
||||
impl AggTimeWeightOutputAvg for bool {}
|
||||
|
||||
pub trait AggregatorTimeWeight<EVT>: fmt::Debug + Send
|
||||
where
|
||||
@@ -48,7 +60,7 @@ where
|
||||
|
||||
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EVT) {
|
||||
let f = dt.ns() as f64 / bl.ns() as f64;
|
||||
trace!("INGEST {} {:?}", f, val);
|
||||
trace_event!("INGEST {} {:?}", f, val);
|
||||
self.sum += f * val.as_f64();
|
||||
}
|
||||
|
||||
@@ -71,7 +83,7 @@ impl AggregatorTimeWeight<f32> for AggregatorNumeric {
|
||||
|
||||
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: f32) {
|
||||
let f = dt.ns() as f64 / bl.ns() as f64;
|
||||
trace!("INGEST {} {}", f, val);
|
||||
trace_event!("INGEST {} {}", f, val);
|
||||
self.sum += f * val as f64;
|
||||
}
|
||||
|
||||
@@ -87,6 +99,45 @@ impl AggregatorTimeWeight<f32> for AggregatorNumeric {
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! impl_agg_tw_for_agg_num {
|
||||
($evt:ty) => {
|
||||
impl AggregatorTimeWeight<$evt> for AggregatorNumeric {
|
||||
fn new() -> Self {
|
||||
Self { sum: 0. }
|
||||
}
|
||||
|
||||
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: $evt) {
|
||||
let f = dt.ns() as f64 / bl.ns() as f64;
|
||||
trace!("INGEST {} {}", f, val);
|
||||
self.sum += f * val as f64;
|
||||
}
|
||||
|
||||
fn reset_for_new_bin(&mut self) {
|
||||
self.sum = 0.;
|
||||
}
|
||||
|
||||
fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f64 {
|
||||
let sum = self.sum.clone();
|
||||
trace!(
|
||||
"result_and_reset_for_new_bin sum {} {}",
|
||||
sum,
|
||||
filled_width_fraction
|
||||
);
|
||||
self.sum = 0.;
|
||||
sum / filled_width_fraction as f64
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
impl_agg_tw_for_agg_num!(u8);
|
||||
impl_agg_tw_for_agg_num!(u16);
|
||||
impl_agg_tw_for_agg_num!(u32);
|
||||
impl_agg_tw_for_agg_num!(i8);
|
||||
impl_agg_tw_for_agg_num!(i16);
|
||||
impl_agg_tw_for_agg_num!(i32);
|
||||
impl_agg_tw_for_agg_num!(i64);
|
||||
|
||||
impl AggregatorTimeWeight<u64> for AggregatorNumeric {
|
||||
fn new() -> Self {
|
||||
Self { sum: 0. }
|
||||
|
||||
@@ -5,7 +5,11 @@ use super::___;
|
||||
use core::fmt;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use items_0::timebin::BinningggContainerBinsDyn;
|
||||
use items_0::timebin::BinsBoxed;
|
||||
use items_0::vecpreview::VecPreview;
|
||||
use items_0::AsAnyMut;
|
||||
use items_0::WithLen;
|
||||
use netpod::TsNano;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
@@ -245,6 +249,7 @@ where
|
||||
}
|
||||
|
||||
pub fn pop_front(&mut self) -> Option<BinSingle<EVT>> {
|
||||
todo!("pop_front");
|
||||
let ts1 = if let Some(x) = self.ts1s.pop_front() {
|
||||
x
|
||||
} else {
|
||||
@@ -307,6 +312,88 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<EVT> fmt::Display for ContainerBins<EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt::Debug::fmt(self, fmt)
|
||||
}
|
||||
}
|
||||
|
||||
impl<EVT> AsAnyMut for ContainerBins<EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
fn as_any_mut(&mut self) -> &mut dyn any::Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<EVT> WithLen for ContainerBins<EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
fn len(&self) -> usize {
|
||||
Self::len(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl<EVT> BinningggContainerBinsDyn for ContainerBins<EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
fn type_name(&self) -> &'static str {
|
||||
any::type_name::<Self>()
|
||||
}
|
||||
|
||||
fn empty(&self) -> BinsBoxed {
|
||||
Box::new(Self::new())
|
||||
}
|
||||
|
||||
fn clone(&self) -> BinsBoxed {
|
||||
Box::new(<Self as Clone>::clone(self))
|
||||
}
|
||||
|
||||
fn edges_iter(
|
||||
&self,
|
||||
) -> std::iter::Zip<std::collections::vec_deque::Iter<TsNano>, std::collections::vec_deque::Iter<TsNano>> {
|
||||
self.ts1s.iter().zip(self.ts2s.iter())
|
||||
}
|
||||
|
||||
fn drain_into(&mut self, dst: &mut dyn BinningggContainerBinsDyn, range: std::ops::Range<usize>) {
|
||||
let obj = dst.as_any_mut();
|
||||
if let Some(dst) = obj.downcast_mut::<Self>() {
|
||||
dst.ts1s.extend(self.ts1s.drain(range.clone()));
|
||||
} else {
|
||||
let styn = any::type_name::<EVT>();
|
||||
panic!("unexpected drain EVT {} dst {}", styn, dst.type_name());
|
||||
}
|
||||
}
|
||||
|
||||
fn to_old_time_binned(&self) -> Box<dyn items_0::timebin::TimeBinned> {
|
||||
let a = self as &dyn any::Any;
|
||||
if let Some(src) = a.downcast_ref::<ContainerBins<f64>>() {
|
||||
use items_0::Empty;
|
||||
let mut ret = crate::binsdim0::BinsDim0::<f64>::empty();
|
||||
for ((((((&ts1, &ts2), &cnt), min), max), avg), fnl) in src.zip_iter() {
|
||||
ret.push(ts1.ns(), ts2.ns(), cnt, *min, *max, *avg as f32, 0.);
|
||||
}
|
||||
Box::new(ret)
|
||||
} else if let Some(src) = a.downcast_ref::<ContainerBins<f32>>() {
|
||||
use items_0::Empty;
|
||||
let mut ret = crate::binsdim0::BinsDim0::<f32>::empty();
|
||||
for ((((((&ts1, &ts2), &cnt), min), max), avg), fnl) in src.zip_iter() {
|
||||
ret.push(ts1.ns(), ts2.ns(), cnt, *min, *max, *avg as f32, 0.);
|
||||
}
|
||||
Box::new(ret)
|
||||
} else {
|
||||
let styn = any::type_name::<EVT>();
|
||||
todo!("TODO impl for {styn}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ContainerBinsTakeUpTo<'a, EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use super::aggregator::AggTimeWeightOutputAvg;
|
||||
use super::aggregator::AggregatorNumeric;
|
||||
use super::aggregator::AggregatorTimeWeight;
|
||||
use super::timeweight::timeweight_events_dyn::BinnedEventsTimeweightDynbox;
|
||||
use super::___;
|
||||
use core::fmt;
|
||||
use err::thiserror;
|
||||
@@ -8,6 +9,7 @@ use err::ThisError;
|
||||
use items_0::timebin::BinningggContainerEventsDyn;
|
||||
use items_0::vecpreview::PreviewRange;
|
||||
use items_0::vecpreview::VecPreview;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::TsNano;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
@@ -28,13 +30,13 @@ pub trait Container<EVT>: fmt::Debug + Send + Clone + PreviewRange + Serialize +
|
||||
fn pop_front(&mut self) -> Option<EVT>;
|
||||
}
|
||||
|
||||
pub trait EventValueType: fmt::Debug + Clone + PartialOrd + Send {
|
||||
pub trait EventValueType: fmt::Debug + Clone + PartialOrd + Send + 'static {
|
||||
type Container: Container<Self>;
|
||||
type AggregatorTimeWeight: AggregatorTimeWeight<Self>;
|
||||
type AggTimeWeightOutputAvg: AggTimeWeightOutputAvg;
|
||||
|
||||
fn identity_sum() -> Self;
|
||||
fn add_weighted(&self, add: &Self, f: f32) -> Self;
|
||||
// fn add_weighted(&self, add: &Self, f: f32) -> Self;
|
||||
}
|
||||
|
||||
impl<EVT> Container<EVT> for VecDeque<EVT>
|
||||
@@ -54,6 +56,31 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! impl_event_value_type {
|
||||
($evt:ty, $zero:expr) => {
|
||||
impl EventValueType for $evt {
|
||||
type Container = VecDeque<Self>;
|
||||
type AggregatorTimeWeight = AggregatorNumeric;
|
||||
type AggTimeWeightOutputAvg = f64;
|
||||
|
||||
fn identity_sum() -> Self {
|
||||
$zero
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
impl_event_value_type!(u8, 0);
|
||||
impl_event_value_type!(u16, 0);
|
||||
impl_event_value_type!(u32, 0);
|
||||
impl_event_value_type!(u64, 0);
|
||||
impl_event_value_type!(i8, 0);
|
||||
impl_event_value_type!(i16, 0);
|
||||
impl_event_value_type!(i32, 0);
|
||||
impl_event_value_type!(i64, 0);
|
||||
// impl_event_value_type!(f32, 0.);
|
||||
// impl_event_value_type!(f64, 0.);
|
||||
|
||||
impl EventValueType for f32 {
|
||||
type Container = VecDeque<Self>;
|
||||
type AggregatorTimeWeight = AggregatorNumeric;
|
||||
@@ -62,10 +89,6 @@ impl EventValueType for f32 {
|
||||
fn identity_sum() -> Self {
|
||||
0.
|
||||
}
|
||||
|
||||
fn add_weighted(&self, add: &Self, f: f32) -> Self {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl EventValueType for f64 {
|
||||
@@ -76,24 +99,6 @@ impl EventValueType for f64 {
|
||||
fn identity_sum() -> Self {
|
||||
0.
|
||||
}
|
||||
|
||||
fn add_weighted(&self, add: &Self, f: f32) -> Self {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl EventValueType for u64 {
|
||||
type Container = VecDeque<Self>;
|
||||
type AggregatorTimeWeight = AggregatorNumeric;
|
||||
type AggTimeWeightOutputAvg = f64;
|
||||
|
||||
fn identity_sum() -> Self {
|
||||
0
|
||||
}
|
||||
|
||||
fn add_weighted(&self, add: &Self, f: f32) -> Self {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -243,7 +248,19 @@ impl<EVT> BinningggContainerEventsDyn for ContainerEvents<EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
fn binned_events_timeweight_traitobj(&self) -> Box<dyn items_0::timebin::BinnedEventsTimeweightTrait> {
|
||||
todo!()
|
||||
fn type_name(&self) -> &'static str {
|
||||
std::any::type_name::<Self>()
|
||||
}
|
||||
|
||||
fn binned_events_timeweight_traitobj(
|
||||
&self,
|
||||
range: BinnedRange<TsNano>,
|
||||
) -> Box<dyn items_0::timebin::BinnedEventsTimeweightTrait> {
|
||||
BinnedEventsTimeweightDynbox::<EVT>::new(range)
|
||||
}
|
||||
|
||||
fn to_anybox(&mut self) -> Box<dyn std::any::Any> {
|
||||
let ret = core::mem::replace(self, Self::new());
|
||||
Box::new(ret)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,24 +5,14 @@ use crate::binning::container_bins::ContainerBins;
|
||||
use crate::binning::container_events::ContainerEvents;
|
||||
use crate::binning::container_events::ContainerEventsTakeUpTo;
|
||||
use crate::binning::container_events::EventSingle;
|
||||
use crate::channelevents::ChannelEvents;
|
||||
use core::fmt;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::Stream;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_0::timebin::BinningggBinnerDyn;
|
||||
use items_0::timebin::BinningggBinnerTy;
|
||||
use netpod::log::*;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::DtNano;
|
||||
use netpod::TsNano;
|
||||
use std::collections::VecDeque;
|
||||
use std::marker::PhantomData;
|
||||
use std::mem;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_ { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
|
||||
@@ -34,7 +24,7 @@ macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
|
||||
macro_rules! trace_cycle { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_event_next { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
|
||||
macro_rules! trace_event_next { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_ingest_init_lst { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
|
||||
@@ -43,7 +33,7 @@ macro_rules! trace_ingest_init_lst { ($($arg:tt)*) => ( if true { trace_!($($arg
|
||||
macro_rules! trace_ingest_minmax { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
|
||||
macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_ingest_firsts { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
|
||||
@@ -103,7 +93,8 @@ where
|
||||
{
|
||||
// NOTE that this is also used during bin-cycle.
|
||||
fn ingest_event_with_lst_gt_range_beg_agg(&mut self, ev: EventSingle<EVT>, lst: LstRef<EVT>) {
|
||||
trace_ingest_event!("ingest_event_with_lst_gt_range_beg_agg {:?}", ev);
|
||||
let selfname = "ingest_event_with_lst_gt_range_beg_agg";
|
||||
trace_ingest_event!("{selfname} {:?}", ev);
|
||||
if DEBUG_CHECKS {
|
||||
if ev.ts <= self.active_beg {
|
||||
panic!("should never get here");
|
||||
@@ -113,7 +104,7 @@ where
|
||||
}
|
||||
}
|
||||
let dt = ev.ts.delta(self.filled_until);
|
||||
trace_ingest_event!("ingest_event_with_lst_gt_range_beg_agg dt {:?} ev {:?}", dt, ev);
|
||||
trace_ingest_event!("{selfname} dt {:?} ev {:?}", dt, ev);
|
||||
// TODO can the caller already take the value and replace it afterwards with the current value?
|
||||
// This fn could swap the value in lst and directly use it.
|
||||
// This would require that any call path does not mess with lst.
|
||||
@@ -153,7 +144,8 @@ where
|
||||
lst: LstMut<EVT>,
|
||||
minmax: &mut MinMax<EVT>,
|
||||
) -> Result<(), Error> {
|
||||
trace_ingest_event!("ingest_event_with_lst_eq_range_beg");
|
||||
let selfname = "ingest_event_with_lst_eq_range_beg";
|
||||
trace_ingest_event!("{selfname}");
|
||||
// TODO if the event is exactly on the current bin first edge, then there is no contribution to the avg yet
|
||||
// and I must initialize the min/max with the current event.
|
||||
InnerA::apply_min_max(&ev, minmax);
|
||||
@@ -167,9 +159,10 @@ where
|
||||
lst: LstMut<EVT>,
|
||||
minmax: &mut MinMax<EVT>,
|
||||
) -> Result<(), Error> {
|
||||
trace_ingest_event!("ingest_with_lst_gt_range_beg");
|
||||
let selfname = "ingest_with_lst_gt_range_beg";
|
||||
trace_ingest_event!("{selfname}");
|
||||
while let Some(ev) = evs.pop_front() {
|
||||
trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, "ingest_with_lst_gt_range_beg");
|
||||
trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname);
|
||||
if ev.ts <= self.active_beg {
|
||||
panic!("should never get here");
|
||||
}
|
||||
@@ -188,9 +181,10 @@ where
|
||||
lst: LstMut<EVT>,
|
||||
minmax: &mut MinMax<EVT>,
|
||||
) -> Result<(), Error> {
|
||||
trace_ingest_event!("ingest_with_lst_ge_range_beg");
|
||||
let selfname = "ingest_with_lst_ge_range_beg";
|
||||
trace_ingest_event!("{selfname}");
|
||||
while let Some(ev) = evs.pop_front() {
|
||||
trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, "ingest_with_lst_ge_range_beg");
|
||||
trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname);
|
||||
if ev.ts < self.active_beg {
|
||||
panic!("should never get here");
|
||||
}
|
||||
@@ -203,7 +197,7 @@ where
|
||||
} else {
|
||||
self.ingest_event_with_lst_gt_range_beg(ev.clone(), LstMut(lst.0), minmax)?;
|
||||
self.cnt += 1;
|
||||
trace_ingest_firsts!("ingest_with_lst_ge_range_beg now calling ingest_with_lst_gt_range_beg");
|
||||
trace_ingest_firsts!("{selfname} now calling ingest_with_lst_gt_range_beg");
|
||||
return self.ingest_with_lst_gt_range_beg(evs, LstMut(lst.0), minmax);
|
||||
}
|
||||
}
|
||||
@@ -216,11 +210,12 @@ where
|
||||
lst: LstMut<EVT>,
|
||||
minmax: &mut MinMax<EVT>,
|
||||
) -> Result<(), Error> {
|
||||
trace_ingest_event!("ingest_with_lst_minmax");
|
||||
let selfname = "ingest_with_lst_minmax";
|
||||
trace_ingest_event!("{selfname}");
|
||||
// TODO how to handle the min max? I don't take event data yet out of the container.
|
||||
if let Some(ts0) = evs.ts_first() {
|
||||
trace_ingest_event!("EVENT POP FRONT ingest_with_lst_minmax");
|
||||
trace_ingest_event!("EVENT TIMESTAMP FRONT {:?} ingest_with_lst_minmax", ts0);
|
||||
trace_ingest_event!("EVENT POP FRONT {selfname}");
|
||||
trace_ingest_event!("EVENT TIMESTAMP FRONT {:?} {selfname}", ts0);
|
||||
if ts0 < self.active_beg {
|
||||
panic!("should never get here");
|
||||
} else {
|
||||
@@ -435,9 +430,10 @@ where
|
||||
}
|
||||
|
||||
fn ingest_event_without_lst(&mut self, ev: EventSingle<EVT>) -> Result<(), Error> {
|
||||
let selfname = "ingest_event_without_lst";
|
||||
let b = &self.inner_a.inner_b;
|
||||
if ev.ts >= b.active_end {
|
||||
panic!("should never get here");
|
||||
panic!("{selfname} should never get here");
|
||||
} else {
|
||||
trace_ingest_init_lst!("ingest_event_without_lst set lst {:?}", ev);
|
||||
self.lst = Some(ev.clone());
|
||||
@@ -453,10 +449,11 @@ where
|
||||
}
|
||||
|
||||
fn ingest_without_lst(&mut self, mut evs: ContainerEventsTakeUpTo<EVT>) -> Result<(), Error> {
|
||||
let selfname = "ingest_without_lst";
|
||||
if let Some(ev) = evs.pop_front() {
|
||||
trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, "ingest_without_lst");
|
||||
trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, selfname);
|
||||
if ev.ts >= self.inner_a.inner_b.active_end {
|
||||
panic!("should never get here");
|
||||
panic!("{selfname} should never get here");
|
||||
} else {
|
||||
self.ingest_event_without_lst(ev)?;
|
||||
if let Some(lst) = self.lst.as_mut() {
|
||||
@@ -633,6 +630,10 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn output_len(&self) -> usize {
|
||||
self.out.len()
|
||||
}
|
||||
|
||||
pub fn output(&mut self) -> ContainerBins<EVT> {
|
||||
mem::replace(&mut self.out, ContainerBins::new())
|
||||
}
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
use super::timeweight_events::BinnedEventsTimeweight;
|
||||
use crate::binning::container_bins::ContainerBins;
|
||||
use crate::binning::container_events::ContainerEvents;
|
||||
use crate::binning::container_events::EventValueType;
|
||||
use crate::channelevents::ChannelEvents;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::streamitem::LogItem;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_0::timebin::BinnedEventsTimeweightTrait;
|
||||
use items_0::timebin::BinningggBinnerDyn;
|
||||
@@ -17,6 +19,7 @@ use items_0::timebin::EventsBoxed;
|
||||
use netpod::log::*;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::TsNano;
|
||||
use std::any;
|
||||
use std::arch::x86_64;
|
||||
use std::ops::ControlFlow;
|
||||
use std::pin::Pin;
|
||||
@@ -55,23 +58,40 @@ impl<EVT> BinnedEventsTimeweightTrait for BinnedEventsTimeweightDynbox<EVT>
|
||||
where
|
||||
EVT: EventValueType,
|
||||
{
|
||||
fn ingest(&mut self, evs_all: EventsBoxed) -> Result<(), BinningggError> {
|
||||
todo!()
|
||||
fn ingest(&mut self, mut evs: EventsBoxed) -> Result<(), BinningggError> {
|
||||
// let a = (&evs as &dyn any::Any).downcast_ref::<String>();
|
||||
// evs.downcast::<String>();
|
||||
// evs.as_anybox().downcast::<ContainerEvents<f64>>();
|
||||
match evs.to_anybox().downcast::<ContainerEvents<EVT>>() {
|
||||
Ok(evs) => {
|
||||
let evs = {
|
||||
let a = evs;
|
||||
*a
|
||||
};
|
||||
Ok(self.binner.ingest(evs)?)
|
||||
}
|
||||
Err(_) => Err(BinningggError::TypeMismatch {
|
||||
have: evs.type_name().into(),
|
||||
expect: std::any::type_name::<ContainerEvents<EVT>>().into(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn input_done_range_final(&mut self) -> Result<(), BinningggError> {
|
||||
// self.binner.input_done_range_final()
|
||||
todo!()
|
||||
Ok(self.binner.input_done_range_final()?)
|
||||
}
|
||||
|
||||
fn input_done_range_open(&mut self) -> Result<(), BinningggError> {
|
||||
// self.binner.input_done_range_open()
|
||||
todo!()
|
||||
Ok(self.binner.input_done_range_open()?)
|
||||
}
|
||||
|
||||
fn output(&mut self) -> Result<BinsBoxed, BinningggError> {
|
||||
// self.binner.output()
|
||||
todo!()
|
||||
fn output(&mut self) -> Result<Option<BinsBoxed>, BinningggError> {
|
||||
if self.binner.output_len() == 0 {
|
||||
Ok(None)
|
||||
} else {
|
||||
let c = self.binner.output();
|
||||
Ok(Some(Box::new(c)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -93,36 +113,36 @@ impl BinnedEventsTimeweightLazy {
|
||||
impl BinnedEventsTimeweightTrait for BinnedEventsTimeweightLazy {
|
||||
fn ingest(&mut self, evs_all: EventsBoxed) -> Result<(), BinningggError> {
|
||||
self.binned_events
|
||||
.get_or_insert_with(|| evs_all.binned_events_timeweight_traitobj())
|
||||
.get_or_insert_with(|| evs_all.binned_events_timeweight_traitobj(self.range.clone()))
|
||||
.ingest(evs_all)
|
||||
}
|
||||
|
||||
fn input_done_range_final(&mut self) -> Result<(), BinningggError> {
|
||||
debug!("TODO something to do if we miss the binner here?");
|
||||
self.binned_events
|
||||
.as_mut()
|
||||
.map(|x| x.input_done_range_final())
|
||||
.unwrap_or(Ok(()))
|
||||
.unwrap_or_else(|| {
|
||||
debug!("TODO something to do if we miss the binner here?");
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn input_done_range_open(&mut self) -> Result<(), BinningggError> {
|
||||
debug!("TODO something to do if we miss the binner here?");
|
||||
self.binned_events
|
||||
.as_mut()
|
||||
.map(|x| x.input_done_range_open())
|
||||
.unwrap_or(Ok(()))
|
||||
}
|
||||
|
||||
fn output(&mut self) -> Result<BinsBoxed, BinningggError> {
|
||||
debug!("TODO something to do if we miss the binner here?");
|
||||
// TODO change trait because without binner we can not produce any container here
|
||||
todo!()
|
||||
fn output(&mut self) -> Result<Option<BinsBoxed>, BinningggError> {
|
||||
self.binned_events.as_mut().map(|x| x.output()).unwrap_or(Ok(None))
|
||||
}
|
||||
}
|
||||
|
||||
enum StreamState {
|
||||
Reading,
|
||||
Done,
|
||||
Invalid,
|
||||
}
|
||||
|
||||
pub struct BinnedEventsTimeweightStream {
|
||||
@@ -158,13 +178,14 @@ impl BinnedEventsTimeweightStream {
|
||||
ChannelEvents::Events(evs) => match self.binned_events.ingest(evs.to_container_events()) {
|
||||
Ok(()) => {
|
||||
match self.binned_events.output() {
|
||||
Ok(x) => {
|
||||
Ok(Some(x)) => {
|
||||
if x.len() == 0 {
|
||||
Continue(())
|
||||
} else {
|
||||
Break(Ready(Some(Ok(DataItem(Data(x))))))
|
||||
}
|
||||
}
|
||||
Ok(None) => Continue(()),
|
||||
Err(e) => Break(Ready(Some(Err(::err::Error::from_string(e))))),
|
||||
}
|
||||
// Continue(())
|
||||
@@ -192,19 +213,54 @@ impl BinnedEventsTimeweightStream {
|
||||
}
|
||||
|
||||
fn handle_eos(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<<Self as Stream>::Item>> {
|
||||
debug!("handle_eos");
|
||||
use items_0::streamitem::RangeCompletableItem::*;
|
||||
use items_0::streamitem::StreamItem::*;
|
||||
use Poll::*;
|
||||
self.state = StreamState::Done;
|
||||
if self.range_complete {
|
||||
self.binned_events.input_done_range_final();
|
||||
self.binned_events
|
||||
.input_done_range_final()
|
||||
.map_err(::err::Error::from_string)?;
|
||||
} else {
|
||||
self.binned_events.input_done_range_open();
|
||||
self.binned_events
|
||||
.input_done_range_open()
|
||||
.map_err(::err::Error::from_string)?;
|
||||
}
|
||||
match self.binned_events.output() {
|
||||
Ok(x) => Ready(Some(Ok(DataItem(Data(x))))),
|
||||
Err(e) => Ready(Some(Err(::err::Error::from_string(e)))),
|
||||
match self.binned_events.output().map_err(::err::Error::from_string)? {
|
||||
Some(x) => {
|
||||
debug!("seeing ready bins {:?}", x);
|
||||
Ready(Some(Ok(DataItem(Data(x)))))
|
||||
}
|
||||
None => {
|
||||
let item = LogItem::from_node(888, Level::INFO, format!("no bins ready on eos"));
|
||||
Ready(Some(Ok(Log(item))))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_main(mut self: Pin<&mut Self>, cx: &mut Context) -> ControlFlow<Poll<Option<<Self as Stream>::Item>>> {
|
||||
use ControlFlow::*;
|
||||
use Poll::*;
|
||||
let ret = match &self.state {
|
||||
StreamState::Reading => match self.as_mut().inp.poll_next_unpin(cx) {
|
||||
Ready(Some(x)) => self.as_mut().handle_sitemty(x, cx),
|
||||
Ready(None) => Break(self.as_mut().handle_eos(cx)),
|
||||
Pending => Break(Pending),
|
||||
},
|
||||
StreamState::Done => {
|
||||
self.state = StreamState::Invalid;
|
||||
Break(Ready(None))
|
||||
}
|
||||
StreamState::Invalid => {
|
||||
panic!("StreamState::Invalid")
|
||||
}
|
||||
};
|
||||
if let Break(Ready(Some(Err(_)))) = ret {
|
||||
self.state = StreamState::Done;
|
||||
}
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for BinnedEventsTimeweightStream {
|
||||
@@ -212,15 +268,10 @@ impl Stream for BinnedEventsTimeweightStream {
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use ControlFlow::*;
|
||||
use Poll::*;
|
||||
loop {
|
||||
break match self.as_mut().inp.poll_next_unpin(cx) {
|
||||
Ready(Some(x)) => match self.as_mut().handle_sitemty(x, cx) {
|
||||
Continue(()) => continue,
|
||||
Break(x) => x,
|
||||
},
|
||||
Ready(None) => self.handle_eos(cx),
|
||||
Pending => Pending,
|
||||
break match self.as_mut().handle_main(cx) {
|
||||
Break(x) => x,
|
||||
Continue(()) => continue,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use super::aggregator::AggregatorTimeWeight;
|
||||
use super::binnedvaluetype::BinnedNumericValue;
|
||||
use super::container_events::Container;
|
||||
use super::container_events::EventValueType;
|
||||
use core::fmt;
|
||||
@@ -88,9 +87,4 @@ impl EventValueType for EnumVariant {
|
||||
fn identity_sum() -> Self {
|
||||
todo!()
|
||||
}
|
||||
|
||||
// TODO also remove from trait, push it to a more specialized trait for the plain numeric cases.
|
||||
fn add_weighted(&self, add: &Self, f: f32) -> Self {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -882,6 +882,18 @@ impl<STY: ScalarOps> EventsNonObj for EventsDim0<STY> {
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! try_to_container_events {
|
||||
($sty:ty, $this:expr) => {
|
||||
if let Some(evs) = $this.as_any_ref().downcast_ref::<EventsDim0<$sty>>() {
|
||||
use crate::binning::container_events::ContainerEvents;
|
||||
let tss = $this.tss.iter().map(|&x| TsNano::from_ns(x)).collect();
|
||||
let vals = evs.values.clone();
|
||||
let ret = ContainerEvents::<$sty>::from_constituents(tss, vals);
|
||||
return Box::new(ret);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
impl<STY: ScalarOps> Events for EventsDim0<STY> {
|
||||
fn as_time_binnable_ref(&self) -> &dyn TimeBinnable {
|
||||
self
|
||||
@@ -1105,15 +1117,32 @@ impl<STY: ScalarOps> Events for EventsDim0<STY> {
|
||||
}
|
||||
|
||||
fn to_container_events(&self) -> Box<dyn ::items_0::timebin::BinningggContainerEventsDyn> {
|
||||
use crate::binning::container_events::ContainerEvents;
|
||||
let tss = self.tss.iter().map(|&x| TsNano::from_ns(x)).collect();
|
||||
if let Some(evs) = self.as_any_ref().downcast_ref::<EventsDim0<f64>>() {
|
||||
let vals = evs.values.clone();
|
||||
let ret = ContainerEvents::<f64>::from_constituents(tss, vals);
|
||||
Box::new(ret)
|
||||
} else {
|
||||
todo!()
|
||||
}
|
||||
try_to_container_events!(u8, self);
|
||||
try_to_container_events!(u16, self);
|
||||
try_to_container_events!(u32, self);
|
||||
try_to_container_events!(u64, self);
|
||||
try_to_container_events!(f32, self);
|
||||
try_to_container_events!(f64, self);
|
||||
let styn = any::type_name::<STY>();
|
||||
todo!("TODO for {styn}")
|
||||
}
|
||||
}
|
||||
|
||||
fn try_to_container_events_fn<STY, EVT>(
|
||||
this: &EventsDim0<STY>,
|
||||
) -> Option<Box<dyn ::items_0::timebin::BinningggContainerEventsDyn>>
|
||||
where
|
||||
STY: ScalarOps,
|
||||
EVT: crate::binning::container_events::EventValueType<Container = std::collections::VecDeque<STY>>,
|
||||
{
|
||||
use crate::binning::container_events::ContainerEvents;
|
||||
if let Some(evs) = this.as_any_ref().downcast_ref::<EventsDim0<STY>>() {
|
||||
let tss = this.tss.iter().map(|&x| TsNano::from_ns(x)).collect();
|
||||
let vals = evs.values.clone();
|
||||
let ret = ContainerEvents::<EVT>::from_constituents(tss, vals);
|
||||
Some(Box::new(ret))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -20,7 +20,6 @@ pub mod test;
|
||||
pub mod testgen;
|
||||
pub mod timebin;
|
||||
pub mod transform;
|
||||
pub mod vecpreview;
|
||||
|
||||
use channelevents::ChannelEvents;
|
||||
use futures_util::Stream;
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
|
||||
@@ -193,13 +193,18 @@ impl ScyllaEventReadProvider {
|
||||
}
|
||||
|
||||
impl EventsReadProvider for ScyllaEventReadProvider {
|
||||
fn read(&self, evq: EventsSubQuery, chconf: ChConf) -> streams::timebin::cached::reader::EventsReading {
|
||||
fn read(&self, evq: EventsSubQuery) -> streams::timebin::cached::reader::EventsReading {
|
||||
let scyqueue = self.scyqueue.clone();
|
||||
let fut1 = async move { crate::scylla::scylla_channel_event_stream(evq, chconf, &scyqueue).await };
|
||||
let stream = ScyllaEventsReadStream {
|
||||
fut1: Some(Box::pin(fut1)),
|
||||
stream: None,
|
||||
};
|
||||
streams::timebin::cached::reader::EventsReading::new(Box::pin(stream))
|
||||
match evq.ch_conf().clone() {
|
||||
netpod::ChannelTypeConfigGen::Scylla(ch_conf) => {
|
||||
let fut1 = async move { crate::scylla::scylla_channel_event_stream(evq, ch_conf, &scyqueue).await };
|
||||
let stream = ScyllaEventsReadStream {
|
||||
fut1: Some(Box::pin(fut1)),
|
||||
stream: None,
|
||||
};
|
||||
streams::timebin::cached::reader::EventsReading::new(Box::pin(stream))
|
||||
}
|
||||
netpod::ChannelTypeConfigGen::SfDatabuffer(_) => panic!("not a scylla reader"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -610,6 +610,10 @@ impl EventsSubQuery {
|
||||
pub fn merger_out_len_max(&self) -> Option<u32> {
|
||||
self.settings.merger_out_len_max()
|
||||
}
|
||||
|
||||
pub fn settings(&self) -> &EventsSubQuerySettings {
|
||||
&self.settings
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
|
||||
121
crates/streams/src/eventsplainreader.rs
Normal file
121
crates/streams/src/eventsplainreader.rs
Normal file
@@ -0,0 +1,121 @@
|
||||
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
|
||||
use crate::timebin::cached::reader::EventsReadProvider;
|
||||
use crate::timebin::cached::reader::EventsReading;
|
||||
use crate::timebin::CacheReadProvider;
|
||||
use futures_util::Future;
|
||||
use futures_util::FutureExt;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_2::channelevents::ChannelEvents;
|
||||
use netpod::ReqCtx;
|
||||
use query::api4::events::EventsSubQuery;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
enum StreamState {
|
||||
Opening(
|
||||
Pin<
|
||||
Box<
|
||||
dyn Future<Output = Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, ::err::Error>>
|
||||
+ Send,
|
||||
>,
|
||||
>,
|
||||
),
|
||||
Reading(Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>),
|
||||
}
|
||||
|
||||
struct InnerStream {
|
||||
state: StreamState,
|
||||
}
|
||||
|
||||
impl Stream for InnerStream {
|
||||
type Item = Sitemty<ChannelEvents>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
loop {
|
||||
break match &mut self.state {
|
||||
StreamState::Opening(fut) => match fut.poll_unpin(cx) {
|
||||
Ready(Ok(x)) => {
|
||||
self.state = StreamState::Reading(x);
|
||||
continue;
|
||||
}
|
||||
Ready(Err(e)) => Ready(Some(Err(e))),
|
||||
Pending => Pending,
|
||||
},
|
||||
StreamState::Reading(fut) => match fut.poll_next_unpin(cx) {
|
||||
Ready(Some(x)) => Ready(Some(x)),
|
||||
Ready(None) => Ready(None),
|
||||
Pending => Pending,
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SfDatabufferEventReadProvider {
|
||||
ctx: Arc<ReqCtx>,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
}
|
||||
|
||||
impl SfDatabufferEventReadProvider {
|
||||
pub fn new(ctx: Arc<ReqCtx>, open_bytes: OpenBoxedBytesStreamsBox) -> Self {
|
||||
Self { ctx, open_bytes }
|
||||
}
|
||||
}
|
||||
|
||||
impl EventsReadProvider for SfDatabufferEventReadProvider {
|
||||
fn read(&self, evq: EventsSubQuery) -> EventsReading {
|
||||
let range = match evq.range() {
|
||||
netpod::range::evrange::SeriesRange::TimeRange(x) => x.clone(),
|
||||
netpod::range::evrange::SeriesRange::PulseRange(_) => panic!("not available for pulse range"),
|
||||
};
|
||||
let ctx = self.ctx.clone();
|
||||
let open_bytes = self.open_bytes.clone();
|
||||
let state = StreamState::Opening(Box::pin(async move {
|
||||
let ret = crate::timebinnedjson::timebinnable_stream_sf_databuffer_channelevents(
|
||||
range,
|
||||
evq.need_one_before_range(),
|
||||
evq.ch_conf().clone(),
|
||||
evq.transform().clone(),
|
||||
evq.settings().clone(),
|
||||
evq.log_level().into(),
|
||||
ctx,
|
||||
open_bytes,
|
||||
)
|
||||
.await;
|
||||
ret.map(|x| Box::pin(x) as _)
|
||||
}));
|
||||
let stream = InnerStream { state };
|
||||
EventsReading::new(Box::pin(stream))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DummyCacheReadProvider {}
|
||||
|
||||
impl DummyCacheReadProvider {
|
||||
pub fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
}
|
||||
|
||||
impl CacheReadProvider for DummyCacheReadProvider {
|
||||
fn read(
|
||||
&self,
|
||||
series: u64,
|
||||
bin_len: netpod::DtMs,
|
||||
msp: u64,
|
||||
offs: std::ops::Range<u32>,
|
||||
) -> crate::timebin::cached::reader::CacheReading {
|
||||
let stream = futures_util::future::ready(Ok(None));
|
||||
crate::timebin::cached::reader::CacheReading::new(Box::pin(stream))
|
||||
}
|
||||
|
||||
fn write(&self, series: u64, bins: items_0::timebin::BinsBoxed) -> crate::timebin::cached::reader::CacheWriting {
|
||||
let fut = futures_util::future::ready(Ok(()));
|
||||
crate::timebin::cached::reader::CacheWriting::new(Box::pin(fut))
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,7 @@ pub mod boxed;
|
||||
pub mod cbor_stream;
|
||||
pub mod collect;
|
||||
pub mod dtflags;
|
||||
pub mod eventsplainreader;
|
||||
pub mod filechunkread;
|
||||
pub mod firsterr;
|
||||
pub mod framed_bytes;
|
||||
|
||||
@@ -9,7 +9,6 @@ use items_0::timebin::BinsBoxed;
|
||||
use items_2::channelevents::ChannelEvents;
|
||||
use netpod::log::*;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::ChConf;
|
||||
use netpod::DtMs;
|
||||
use netpod::TsNano;
|
||||
use query::api4::events::EventsSubQuery;
|
||||
@@ -50,23 +49,23 @@ impl Stream for EventsReading {
|
||||
}
|
||||
|
||||
pub trait EventsReadProvider: Send + Sync {
|
||||
fn read(&self, evq: EventsSubQuery, chconf: ChConf) -> EventsReading;
|
||||
fn read(&self, evq: EventsSubQuery) -> EventsReading;
|
||||
}
|
||||
|
||||
pub struct CacheReading {
|
||||
fut: Pin<Box<dyn Future<Output = Result<BinsBoxed, streams::timebin::cached::reader::Error>> + Send>>,
|
||||
fut: Pin<Box<dyn Future<Output = Result<Option<BinsBoxed>, streams::timebin::cached::reader::Error>> + Send>>,
|
||||
}
|
||||
|
||||
impl CacheReading {
|
||||
pub fn new(
|
||||
fut: Pin<Box<dyn Future<Output = Result<BinsBoxed, streams::timebin::cached::reader::Error>> + Send>>,
|
||||
fut: Pin<Box<dyn Future<Output = Result<Option<BinsBoxed>, streams::timebin::cached::reader::Error>> + Send>>,
|
||||
) -> Self {
|
||||
Self { fut }
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for CacheReading {
|
||||
type Output = Result<BinsBoxed, streams::timebin::cached::reader::Error>;
|
||||
type Output = Result<Option<BinsBoxed>, streams::timebin::cached::reader::Error>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
self.fut.poll_unpin(cx)
|
||||
@@ -111,7 +110,7 @@ pub struct CachedReader {
|
||||
ts1next: TsNano,
|
||||
bin_len: DtMs,
|
||||
cache_read_provider: Arc<dyn CacheReadProvider>,
|
||||
reading: Option<Pin<Box<dyn Future<Output = Result<BinsBoxed, Error>> + Send>>>,
|
||||
reading: Option<Pin<Box<dyn Future<Output = Result<Option<BinsBoxed>, Error>> + Send>>>,
|
||||
}
|
||||
|
||||
impl CachedReader {
|
||||
@@ -149,7 +148,7 @@ impl Stream for CachedReader {
|
||||
Ready(x) => {
|
||||
self.reading = None;
|
||||
match x {
|
||||
Ok(bins) => {
|
||||
Ok(Some(bins)) => {
|
||||
trace_emit!(
|
||||
"- - - - - - - - - - - - emit cached bins {} bin_len {}",
|
||||
bins.len(),
|
||||
@@ -157,6 +156,9 @@ impl Stream for CachedReader {
|
||||
);
|
||||
Ready(Some(Ok(bins)))
|
||||
}
|
||||
Ok(None) => {
|
||||
continue;
|
||||
}
|
||||
Err(e) => Ready(Some(Err(e))),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,14 +32,13 @@ impl BinnedFromEvents {
|
||||
pub fn new(
|
||||
range: BinnedRange<TsNano>,
|
||||
evq: EventsSubQuery,
|
||||
chconf: ChConf,
|
||||
do_time_weight: bool,
|
||||
read_provider: Arc<dyn EventsReadProvider>,
|
||||
) -> Result<Self, Error> {
|
||||
if !evq.range().is_time() {
|
||||
panic!();
|
||||
}
|
||||
let stream = read_provider.read(evq, chconf);
|
||||
let stream = read_provider.read(evq);
|
||||
// let stream = stream.map(|x| {
|
||||
// let x = items_0::try_map_sitemty_data!(x, |x| match x {
|
||||
// ChannelEvents::Events(x) => {
|
||||
|
||||
@@ -56,7 +56,6 @@ pub struct TimeBinnedFromLayers {
|
||||
sub: EventsSubQuerySettings,
|
||||
log_level: String,
|
||||
ctx: Arc<ReqCtx>,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
inp: BoxedInput,
|
||||
}
|
||||
|
||||
@@ -72,8 +71,6 @@ impl TimeBinnedFromLayers {
|
||||
sub: EventsSubQuerySettings,
|
||||
log_level: String,
|
||||
ctx: Arc<ReqCtx>,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
series: u64,
|
||||
range: BinnedRange<TsNano>,
|
||||
do_time_weight: bool,
|
||||
bin_len_layers: Vec<DtMs>,
|
||||
@@ -83,7 +80,7 @@ impl TimeBinnedFromLayers {
|
||||
debug!(
|
||||
"{}::new {:?} {:?} {:?}",
|
||||
Self::type_name(),
|
||||
series,
|
||||
ch_conf.series(),
|
||||
range,
|
||||
bin_len_layers
|
||||
);
|
||||
@@ -98,7 +95,6 @@ impl TimeBinnedFromLayers {
|
||||
sub.clone(),
|
||||
log_level.clone(),
|
||||
ctx.clone(),
|
||||
series,
|
||||
range,
|
||||
do_time_weight,
|
||||
bin_len_layers,
|
||||
@@ -112,7 +108,6 @@ impl TimeBinnedFromLayers {
|
||||
sub,
|
||||
log_level,
|
||||
ctx,
|
||||
open_bytes,
|
||||
inp: Box::pin(inp),
|
||||
};
|
||||
Ok(ret)
|
||||
@@ -137,7 +132,6 @@ impl TimeBinnedFromLayers {
|
||||
sub.clone(),
|
||||
log_level.clone(),
|
||||
ctx.clone(),
|
||||
series,
|
||||
range_finer.clone(),
|
||||
do_time_weight,
|
||||
bin_len_layers,
|
||||
@@ -152,7 +146,6 @@ impl TimeBinnedFromLayers {
|
||||
sub,
|
||||
log_level,
|
||||
ctx,
|
||||
open_bytes,
|
||||
inp: Box::pin(inp),
|
||||
};
|
||||
Ok(ret)
|
||||
@@ -168,30 +161,18 @@ impl TimeBinnedFromLayers {
|
||||
transform_query.clone(),
|
||||
);
|
||||
let evq = EventsSubQuery::from_parts(select, sub.clone(), ctx.reqid().into(), log_level.clone());
|
||||
match &ch_conf {
|
||||
ChannelTypeConfigGen::Scylla(chconf) => {
|
||||
let inp = BinnedFromEvents::new(
|
||||
range,
|
||||
evq,
|
||||
chconf.clone(),
|
||||
do_time_weight,
|
||||
events_read_provider,
|
||||
)?;
|
||||
let ret = Self {
|
||||
ch_conf,
|
||||
cache_usage,
|
||||
transform_query,
|
||||
sub,
|
||||
log_level,
|
||||
ctx,
|
||||
open_bytes,
|
||||
inp: Box::pin(inp),
|
||||
};
|
||||
debug!("{}::new setup from events", Self::type_name());
|
||||
Ok(ret)
|
||||
}
|
||||
ChannelTypeConfigGen::SfDatabuffer(_) => return Err(Error::SfDatabufferNotSupported),
|
||||
}
|
||||
let inp = BinnedFromEvents::new(range, evq, do_time_weight, events_read_provider)?;
|
||||
let ret = Self {
|
||||
ch_conf,
|
||||
cache_usage,
|
||||
transform_query,
|
||||
sub,
|
||||
log_level,
|
||||
ctx,
|
||||
inp: Box::pin(inp),
|
||||
};
|
||||
debug!("{}::new setup from events", Self::type_name());
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,7 +66,6 @@ pub struct GapFill {
|
||||
sub: EventsSubQuerySettings,
|
||||
log_level: String,
|
||||
ctx: Arc<ReqCtx>,
|
||||
series: u64,
|
||||
range: BinnedRange<TsNano>,
|
||||
do_time_weight: bool,
|
||||
bin_len_layers: Vec<DtMs>,
|
||||
@@ -97,7 +96,6 @@ impl GapFill {
|
||||
sub: EventsSubQuerySettings,
|
||||
log_level: String,
|
||||
ctx: Arc<ReqCtx>,
|
||||
series: u64,
|
||||
range: BinnedRange<TsNano>,
|
||||
do_time_weight: bool,
|
||||
bin_len_layers: Vec<DtMs>,
|
||||
@@ -107,6 +105,7 @@ impl GapFill {
|
||||
let dbgname = format!("{}--[{}]", dbgname_parent, range);
|
||||
debug_init!("new dbgname {}", dbgname);
|
||||
let inp = if cache_usage.is_cache_read() {
|
||||
let series = ch_conf.series().expect("series id for cache read");
|
||||
let stream = super::cached::reader::CachedReader::new(series, range.clone(), cache_read_provider.clone())?
|
||||
.map(|x| match x {
|
||||
Ok(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))),
|
||||
@@ -125,7 +124,6 @@ impl GapFill {
|
||||
sub,
|
||||
log_level,
|
||||
ctx,
|
||||
series,
|
||||
range,
|
||||
do_time_weight,
|
||||
bin_len_layers,
|
||||
@@ -257,7 +255,6 @@ impl GapFill {
|
||||
self.sub.clone(),
|
||||
self.log_level.clone(),
|
||||
self.ctx.clone(),
|
||||
self.series,
|
||||
range_finer_one_before_bin,
|
||||
self.do_time_weight,
|
||||
self.bin_len_layers.clone(),
|
||||
@@ -288,26 +285,16 @@ impl GapFill {
|
||||
self.ctx.reqid().into(),
|
||||
self.log_level.clone(),
|
||||
);
|
||||
match &self.ch_conf {
|
||||
ChannelTypeConfigGen::Scylla(chconf) => {
|
||||
let range = BinnedRange::from_nano_range(range.clone(), self.range.bin_len.to_dt_ms());
|
||||
let inp = BinnedFromEvents::new(
|
||||
range,
|
||||
evq,
|
||||
chconf.clone(),
|
||||
self.do_time_weight,
|
||||
self.events_read_provider.clone(),
|
||||
)?;
|
||||
self.inp_finer = Some(Box::pin(inp));
|
||||
}
|
||||
ChannelTypeConfigGen::SfDatabuffer(_) => return Err(Error::SfDatabufferNotSupported),
|
||||
}
|
||||
let range = BinnedRange::from_nano_range(range.clone(), self.range.bin_len.to_dt_ms());
|
||||
let inp = BinnedFromEvents::new(range, evq, self.do_time_weight, self.events_read_provider.clone())?;
|
||||
self.inp_finer = Some(Box::pin(inp));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn cache_write(mut self: Pin<&mut Self>, bins: BinsBoxed) -> Result<(), Error> {
|
||||
self.cache_writing = Some(self.cache_read_provider.write(self.series, bins));
|
||||
let series = ::err::todoval();
|
||||
self.cache_writing = Some(self.cache_read_provider.write(series, bins));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -53,7 +53,7 @@ fn assert_stream_send<'u, R>(stream: impl 'u + Send + Stream<Item = R>) -> impl
|
||||
stream
|
||||
}
|
||||
|
||||
pub async fn timebinnable_stream(
|
||||
pub async fn timebinnable_stream_sf_databuffer_box_events(
|
||||
range: NanoRange,
|
||||
one_before_range: bool,
|
||||
ch_conf: ChannelTypeConfigGen,
|
||||
@@ -62,7 +62,7 @@ pub async fn timebinnable_stream(
|
||||
log_level: String,
|
||||
ctx: Arc<ReqCtx>,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
) -> Result<TimeBinnableStreamBox, Error> {
|
||||
) -> Result<impl Stream<Item = Sitemty<Box<dyn Events>>>, Error> {
|
||||
let subq = make_sub_query(
|
||||
ch_conf,
|
||||
range.clone().into(),
|
||||
@@ -222,6 +222,32 @@ pub async fn timebinnable_stream(
|
||||
let stream = stream.map(|x| x);
|
||||
Box::pin(stream)
|
||||
};
|
||||
Ok(stream)
|
||||
}
|
||||
|
||||
async fn timebinnable_stream_sf_databuffer_binnable_box(
|
||||
range: NanoRange,
|
||||
one_before_range: bool,
|
||||
ch_conf: ChannelTypeConfigGen,
|
||||
transform_query: TransformQuery,
|
||||
sub: EventsSubQuerySettings,
|
||||
log_level: String,
|
||||
ctx: Arc<ReqCtx>,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
) -> Result<TimeBinnableStreamBox, Error> {
|
||||
let stream = timebinnable_stream_sf_databuffer_box_events(
|
||||
range,
|
||||
one_before_range,
|
||||
ch_conf,
|
||||
transform_query,
|
||||
sub,
|
||||
log_level,
|
||||
ctx,
|
||||
open_bytes,
|
||||
)
|
||||
.await?;
|
||||
// let stream = stream.map(|x| x);
|
||||
// let stream = stream.map(|x| ChannelEvents::Events(x));
|
||||
|
||||
// let stream = stream.map(move |k| {
|
||||
// on_sitemty_data!(k, |k| {
|
||||
@@ -236,6 +262,39 @@ pub async fn timebinnable_stream(
|
||||
Ok(TimeBinnableStreamBox(stream))
|
||||
}
|
||||
|
||||
pub async fn timebinnable_stream_sf_databuffer_channelevents(
|
||||
range: NanoRange,
|
||||
one_before_range: bool,
|
||||
ch_conf: ChannelTypeConfigGen,
|
||||
transform_query: TransformQuery,
|
||||
sub: EventsSubQuerySettings,
|
||||
log_level: String,
|
||||
ctx: Arc<ReqCtx>,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
) -> Result<impl Stream<Item = Sitemty<ChannelEvents>>, Error> {
|
||||
let stream = timebinnable_stream_sf_databuffer_box_events(
|
||||
range,
|
||||
one_before_range,
|
||||
ch_conf,
|
||||
transform_query,
|
||||
sub,
|
||||
log_level,
|
||||
ctx,
|
||||
open_bytes,
|
||||
)
|
||||
.await?;
|
||||
// let stream = stream.map(|x| x);
|
||||
let stream = stream.map(move |k| {
|
||||
on_sitemty_data!(k, |k| {
|
||||
// let k: Box<dyn Collectable> = Box::new(k);
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events(
|
||||
k,
|
||||
))))
|
||||
})
|
||||
});
|
||||
Ok(stream)
|
||||
}
|
||||
|
||||
pub struct TimeBinnableStream {
|
||||
make_stream_fut: Option<Pin<Box<dyn Future<Output = Result<TimeBinnableStreamBox, Error>> + Send>>>,
|
||||
stream: Option<Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinnable>>> + Send>>>,
|
||||
@@ -253,7 +312,7 @@ impl TimeBinnableStream {
|
||||
ctx: Arc<ReqCtx>,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
) -> Self {
|
||||
let fut = timebinnable_stream(
|
||||
let fut = timebinnable_stream_sf_databuffer_binnable_box(
|
||||
range,
|
||||
one_before_range,
|
||||
ch_conf,
|
||||
@@ -313,23 +372,13 @@ async fn timebinned_stream(
|
||||
ch_conf: ChannelTypeConfigGen,
|
||||
ctx: &ReqCtx,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
cache_read_provider: Option<Arc<dyn CacheReadProvider>>,
|
||||
events_read_provider: Option<Arc<dyn EventsReadProvider>>,
|
||||
cache_read_provider: Arc<dyn CacheReadProvider>,
|
||||
events_read_provider: Arc<dyn EventsReadProvider>,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>>, Error> {
|
||||
use netpod::query::CacheUsage;
|
||||
let cache_usage = query.cache_usage().unwrap_or(CacheUsage::V0NoCache);
|
||||
match (
|
||||
ch_conf.series(),
|
||||
cache_usage.clone(),
|
||||
cache_read_provider,
|
||||
events_read_provider,
|
||||
) {
|
||||
(
|
||||
Some(series),
|
||||
CacheUsage::Use | CacheUsage::Recreate | CacheUsage::Ignore,
|
||||
Some(cache_read_provider),
|
||||
Some(events_read_provider),
|
||||
) => {
|
||||
match cache_usage.clone() {
|
||||
CacheUsage::Use | CacheUsage::Recreate | CacheUsage::Ignore => {
|
||||
debug!(
|
||||
"timebinned_stream caching {:?} subgrids {:?}",
|
||||
query,
|
||||
@@ -351,8 +400,6 @@ async fn timebinned_stream(
|
||||
EventsSubQuerySettings::from(&query),
|
||||
query.log_level().into(),
|
||||
Arc::new(ctx.clone()),
|
||||
open_bytes.clone(),
|
||||
series,
|
||||
binned_range.binned_range_time(),
|
||||
do_time_weight,
|
||||
bin_len_layers,
|
||||
@@ -372,7 +419,7 @@ async fn timebinned_stream(
|
||||
let range = binned_range.binned_range_time().to_nano_range();
|
||||
let do_time_weight = true;
|
||||
let one_before_range = true;
|
||||
let stream = timebinnable_stream(
|
||||
let stream = timebinnable_stream_sf_databuffer_binnable_box(
|
||||
range,
|
||||
one_before_range,
|
||||
ch_conf,
|
||||
@@ -412,8 +459,8 @@ pub async fn timebinned_json(
|
||||
ch_conf: ChannelTypeConfigGen,
|
||||
ctx: &ReqCtx,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
cache_read_provider: Option<Arc<dyn CacheReadProvider>>,
|
||||
events_read_provider: Option<Arc<dyn EventsReadProvider>>,
|
||||
cache_read_provider: Arc<dyn CacheReadProvider>,
|
||||
events_read_provider: Arc<dyn EventsReadProvider>,
|
||||
) -> Result<CollectResult<JsonValue>, Error> {
|
||||
let deadline = Instant::now()
|
||||
+ query
|
||||
@@ -486,8 +533,8 @@ pub async fn timebinned_json_framed(
|
||||
ch_conf: ChannelTypeConfigGen,
|
||||
ctx: &ReqCtx,
|
||||
open_bytes: OpenBoxedBytesStreamsBox,
|
||||
cache_read_provider: Option<Arc<dyn CacheReadProvider>>,
|
||||
events_read_provider: Option<Arc<dyn EventsReadProvider>>,
|
||||
cache_read_provider: Arc<dyn CacheReadProvider>,
|
||||
events_read_provider: Arc<dyn EventsReadProvider>,
|
||||
) -> Result<JsonStream, Error> {
|
||||
trace!("timebinned_json_framed");
|
||||
let binned_range = query.covering_range()?;
|
||||
|
||||
Reference in New Issue
Block a user