Before refactor common item traits

This commit is contained in:
Dominik Werder
2022-11-29 14:37:22 +01:00
parent 75d492ff85
commit 075bdd05bd
10 changed files with 66 additions and 18 deletions

View File

@@ -41,5 +41,6 @@ bitshuffle = { path = "../bitshuffle" }
dbconn = { path = "../dbconn" }
parse = { path = "../parse" }
items = { path = "../items" }
items_2 = { path = "../items_2" }
streams = { path = "../streams" }
httpclient = { path = "../httpclient" }

View File

@@ -52,6 +52,16 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) ->
return Ok(ret);
}
}
if channel.backend() == "test-disk-databuffer" {
if channel.name() == "scalar-i32-be" {
let ret = ChConf {
series: 1,
scalar_type: ScalarType::I32,
shape: Shape::Scalar,
};
return Ok(ret);
}
}
// TODO use a common already running worker pool for these queries:
let dbconf = &ncc.node_config.cluster.database;
let dburl = format!(

View File

@@ -418,9 +418,14 @@ impl FrameType for EventQueryJsonStringFrame {
}
}
pub trait EventsNodeProcessorOutput:
Send + Unpin + DeserializeOwned + WithTimestamps + TimeBinnableType + ByteEstimate
{
}
pub trait EventsNodeProcessor: Send + Unpin {
type Input;
type Output: Send + Unpin + DeserializeOwned + WithTimestamps + TimeBinnableType + ByteEstimate;
type Output: EventsNodeProcessorOutput;
fn create(shape: Shape, agg_kind: AggKind) -> Self;
fn process(&self, inp: Self::Input) -> Self::Output;
}

View File

@@ -3,9 +3,9 @@ use crate::numops::NumOps;
use crate::streams::{Collectable, Collector};
use crate::{
pulse_offs_from_abs, ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventAppendable, EventsDyn,
FilterFittingInside, Fits, FitsInside, FrameType, FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo,
ReadPbv, ReadableFromFile, TimeBinnableDyn, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinnerDyn, WithLen,
WithTimestamps,
EventsNodeProcessorOutput, FilterFittingInside, Fits, FitsInside, FrameType, FrameTypeInnerStatic, NewEmpty,
PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, TimeBinnableDyn, TimeBinnableType,
TimeBinnableTypeAggregator, TimeBinnerDyn, WithLen, WithTimestamps,
};
use err::Error;
use netpod::log::*;
@@ -822,3 +822,5 @@ impl<NTY: NumOps + 'static> TimeBinnerDyn for ScalarEventsTimeBinner<NTY> {
}
}
}
impl<NTY> EventsNodeProcessorOutput for ScalarEvents<NTY> where NTY: NumOps {}

View File

@@ -1,8 +1,8 @@
use crate::streams::{Collectable, Collector};
use crate::{
ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventAppendable, FilterFittingInside, Fits, FitsInside,
FrameType, FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile,
TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps,
ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventAppendable, EventsNodeProcessorOutput,
FilterFittingInside, Fits, FitsInside, FrameType, FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo,
ReadPbv, ReadableFromFile, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps,
};
use err::Error;
use netpod::log::*;
@@ -420,3 +420,5 @@ impl EventAppendable for StatsEvents {
ret
}
}
impl EventsNodeProcessorOutput for StatsEvents {}

View File

@@ -3,9 +3,10 @@ use crate::numops::NumOps;
use crate::xbinnedscalarevents::XBinnedScalarEvents;
use crate::xbinnedwaveevents::XBinnedWaveEvents;
use crate::{
Appendable, ByteEstimate, Clearable, EventAppendable, EventsDyn, EventsNodeProcessor, FilterFittingInside, Fits,
FitsInside, FrameType, FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile,
SubFrId, TimeBinnableDyn, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps,
Appendable, ByteEstimate, Clearable, EventAppendable, EventsDyn, EventsNodeProcessor, EventsNodeProcessorOutput,
FilterFittingInside, Fits, FitsInside, FrameType, FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo,
ReadPbv, ReadableFromFile, SubFrId, TimeBinnableDyn, TimeBinnableType, TimeBinnableTypeAggregator, WithLen,
WithTimestamps,
};
use err::Error;
use netpod::log::*;
@@ -533,3 +534,5 @@ impl<NTY: NumOps> EventsDyn for WaveEvents<NTY> {
todo!()
}
}
impl<NTY> EventsNodeProcessorOutput for WaveEvents<NTY> where NTY: NumOps {}

View File

@@ -2,9 +2,9 @@ use crate::binsdim0::MinMaxAvgDim0Bins;
use crate::numops::NumOps;
use crate::streams::{Collectable, Collector};
use crate::{
ts_offs_from_abs, Appendable, ByteEstimate, Clearable, FilterFittingInside, Fits, FitsInside, FrameType,
FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SubFrId,
TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps,
ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventsNodeProcessorOutput, FilterFittingInside, Fits,
FitsInside, FrameType, FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile,
SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps,
};
use err::Error;
use netpod::log::*;
@@ -501,3 +501,5 @@ where
Self::Collector::new(bin_count_exp)
}
}
impl<NTY> EventsNodeProcessorOutput for XBinnedScalarEvents<NTY> where NTY: NumOps {}

View File

@@ -2,9 +2,9 @@ use crate::binsdim1::MinMaxAvgDim1Bins;
use crate::numops::NumOps;
use crate::streams::{Collectable, Collector};
use crate::{
Appendable, ByteEstimate, Clearable, FilterFittingInside, Fits, FitsInside, FrameType, FrameTypeInnerStatic,
NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SubFrId, TimeBinnableType,
TimeBinnableTypeAggregator, WithLen, WithTimestamps,
Appendable, ByteEstimate, Clearable, EventsNodeProcessorOutput, FilterFittingInside, Fits, FitsInside, FrameType,
FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SubFrId,
TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps,
};
use err::Error;
use netpod::log::*;
@@ -532,3 +532,5 @@ where
Self::Collector::new(bin_count_exp)
}
}
impl<NTY> EventsNodeProcessorOutput for XBinnedWaveEvents<NTY> where NTY: NumOps {}

View File

@@ -325,6 +325,26 @@ pub trait TimeBinnable: fmt::Debug + WithLen + RangeOverlapInfo + Any + Send {
fn to_box_to_json_result(&self) -> Box<dyn ToJsonResult>;
}
// Helper trait to bridge between impls of event containers during refactoring.
// TODO get rid when no longer needed.
pub trait IntoEvents {
fn into_events(self) -> Box<dyn Events>;
}
impl<NTY> IntoEvents for items::scalarevents::ScalarEvents<NTY>
where
NTY: ScalarOps,
{
fn into_events(self) -> Box<dyn Events> {
let ret = items_2::eventsdim0::EventsDim0::<NTY> {
tss: self.tss.into(),
pulses: self.pulses.into(),
values: self.values.into(),
};
Box::new(ret)
}
}
// TODO can I remove the Any bound?
/// Container of some form of events, for use as trait object.

View File

@@ -196,7 +196,7 @@ async fn events_conn_handler_inner_try(
let e = Error::with_msg_no_trace("archapp not built");
return Err((e, netout))?;
} else {
match evq.agg_kind {
let stream = match evq.agg_kind {
AggKind::EventBlobs => match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await {
Ok(j) => j,
Err(e) => return Err((e, netout))?,
@@ -205,7 +205,8 @@ async fn events_conn_handler_inner_try(
Ok(j) => j,
Err(e) => return Err((e, netout))?,
},
}
};
stream
};
let mut buf_len_histo = HistoLog2::new(5);
while let Some(item) = p1.next().await {