diff --git a/disk/Cargo.toml b/disk/Cargo.toml index eaaa6ba..deeea27 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -41,5 +41,6 @@ bitshuffle = { path = "../bitshuffle" } dbconn = { path = "../dbconn" } parse = { path = "../parse" } items = { path = "../items" } +items_2 = { path = "../items_2" } streams = { path = "../streams" } httpclient = { path = "../httpclient" } diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index 038f80d..8d4a5eb 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -52,6 +52,16 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> return Ok(ret); } } + if channel.backend() == "test-disk-databuffer" { + if channel.name() == "scalar-i32-be" { + let ret = ChConf { + series: 1, + scalar_type: ScalarType::I32, + shape: Shape::Scalar, + }; + return Ok(ret); + } + } // TODO use a common already running worker pool for these queries: let dbconf = &ncc.node_config.cluster.database; let dburl = format!( diff --git a/items/src/items.rs b/items/src/items.rs index be9c105..e93c1cd 100644 --- a/items/src/items.rs +++ b/items/src/items.rs @@ -418,9 +418,14 @@ impl FrameType for EventQueryJsonStringFrame { } } +pub trait EventsNodeProcessorOutput: + Send + Unpin + DeserializeOwned + WithTimestamps + TimeBinnableType + ByteEstimate +{ +} + pub trait EventsNodeProcessor: Send + Unpin { type Input; - type Output: Send + Unpin + DeserializeOwned + WithTimestamps + TimeBinnableType + ByteEstimate; + type Output: EventsNodeProcessorOutput; fn create(shape: Shape, agg_kind: AggKind) -> Self; fn process(&self, inp: Self::Input) -> Self::Output; } diff --git a/items/src/scalarevents.rs b/items/src/scalarevents.rs index ba612d4..499caee 100644 --- a/items/src/scalarevents.rs +++ b/items/src/scalarevents.rs @@ -3,9 +3,9 @@ use crate::numops::NumOps; use crate::streams::{Collectable, Collector}; use crate::{ pulse_offs_from_abs, ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventAppendable, EventsDyn, - FilterFittingInside, Fits, FitsInside, FrameType, FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo, - ReadPbv, ReadableFromFile, TimeBinnableDyn, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinnerDyn, WithLen, - WithTimestamps, + EventsNodeProcessorOutput, FilterFittingInside, Fits, FitsInside, FrameType, FrameTypeInnerStatic, NewEmpty, + PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, TimeBinnableDyn, TimeBinnableType, + TimeBinnableTypeAggregator, TimeBinnerDyn, WithLen, WithTimestamps, }; use err::Error; use netpod::log::*; @@ -822,3 +822,5 @@ impl TimeBinnerDyn for ScalarEventsTimeBinner { } } } + +impl EventsNodeProcessorOutput for ScalarEvents where NTY: NumOps {} diff --git a/items/src/statsevents.rs b/items/src/statsevents.rs index 5bf30e4..6431d10 100644 --- a/items/src/statsevents.rs +++ b/items/src/statsevents.rs @@ -1,8 +1,8 @@ use crate::streams::{Collectable, Collector}; use crate::{ - ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventAppendable, FilterFittingInside, Fits, FitsInside, - FrameType, FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, - TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, + ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventAppendable, EventsNodeProcessorOutput, + FilterFittingInside, Fits, FitsInside, FrameType, FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo, + ReadPbv, ReadableFromFile, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; use err::Error; use netpod::log::*; @@ -420,3 +420,5 @@ impl EventAppendable for StatsEvents { ret } } + +impl EventsNodeProcessorOutput for StatsEvents {} diff --git a/items/src/waveevents.rs b/items/src/waveevents.rs index 9cf9b57..d22ca72 100644 --- a/items/src/waveevents.rs +++ b/items/src/waveevents.rs @@ -3,9 +3,10 @@ use crate::numops::NumOps; use crate::xbinnedscalarevents::XBinnedScalarEvents; use crate::xbinnedwaveevents::XBinnedWaveEvents; use crate::{ - Appendable, ByteEstimate, Clearable, EventAppendable, EventsDyn, EventsNodeProcessor, FilterFittingInside, Fits, - FitsInside, FrameType, FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, - SubFrId, TimeBinnableDyn, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, + Appendable, ByteEstimate, Clearable, EventAppendable, EventsDyn, EventsNodeProcessor, EventsNodeProcessorOutput, + FilterFittingInside, Fits, FitsInside, FrameType, FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo, + ReadPbv, ReadableFromFile, SubFrId, TimeBinnableDyn, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, + WithTimestamps, }; use err::Error; use netpod::log::*; @@ -533,3 +534,5 @@ impl EventsDyn for WaveEvents { todo!() } } + +impl EventsNodeProcessorOutput for WaveEvents where NTY: NumOps {} diff --git a/items/src/xbinnedscalarevents.rs b/items/src/xbinnedscalarevents.rs index 6f616d5..0b9182c 100644 --- a/items/src/xbinnedscalarevents.rs +++ b/items/src/xbinnedscalarevents.rs @@ -2,9 +2,9 @@ use crate::binsdim0::MinMaxAvgDim0Bins; use crate::numops::NumOps; use crate::streams::{Collectable, Collector}; use crate::{ - ts_offs_from_abs, Appendable, ByteEstimate, Clearable, FilterFittingInside, Fits, FitsInside, FrameType, - FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SubFrId, - TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, + ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventsNodeProcessorOutput, FilterFittingInside, Fits, + FitsInside, FrameType, FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, + SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; use err::Error; use netpod::log::*; @@ -501,3 +501,5 @@ where Self::Collector::new(bin_count_exp) } } + +impl EventsNodeProcessorOutput for XBinnedScalarEvents where NTY: NumOps {} diff --git a/items/src/xbinnedwaveevents.rs b/items/src/xbinnedwaveevents.rs index a2550fb..5020aec 100644 --- a/items/src/xbinnedwaveevents.rs +++ b/items/src/xbinnedwaveevents.rs @@ -2,9 +2,9 @@ use crate::binsdim1::MinMaxAvgDim1Bins; use crate::numops::NumOps; use crate::streams::{Collectable, Collector}; use crate::{ - Appendable, ByteEstimate, Clearable, FilterFittingInside, Fits, FitsInside, FrameType, FrameTypeInnerStatic, - NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SubFrId, TimeBinnableType, - TimeBinnableTypeAggregator, WithLen, WithTimestamps, + Appendable, ByteEstimate, Clearable, EventsNodeProcessorOutput, FilterFittingInside, Fits, FitsInside, FrameType, + FrameTypeInnerStatic, NewEmpty, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SubFrId, + TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; use err::Error; use netpod::log::*; @@ -532,3 +532,5 @@ where Self::Collector::new(bin_count_exp) } } + +impl EventsNodeProcessorOutput for XBinnedWaveEvents where NTY: NumOps {} diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 456361c..05db162 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -325,6 +325,26 @@ pub trait TimeBinnable: fmt::Debug + WithLen + RangeOverlapInfo + Any + Send { fn to_box_to_json_result(&self) -> Box; } +// Helper trait to bridge between impls of event containers during refactoring. +// TODO get rid when no longer needed. +pub trait IntoEvents { + fn into_events(self) -> Box; +} + +impl IntoEvents for items::scalarevents::ScalarEvents +where + NTY: ScalarOps, +{ + fn into_events(self) -> Box { + let ret = items_2::eventsdim0::EventsDim0:: { + tss: self.tss.into(), + pulses: self.pulses.into(), + values: self.values.into(), + }; + Box::new(ret) + } +} + // TODO can I remove the Any bound? /// Container of some form of events, for use as trait object. diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index bc74fed..6d0a55d 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -196,7 +196,7 @@ async fn events_conn_handler_inner_try( let e = Error::with_msg_no_trace("archapp not built"); return Err((e, netout))?; } else { - match evq.agg_kind { + let stream = match evq.agg_kind { AggKind::EventBlobs => match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await { Ok(j) => j, Err(e) => return Err((e, netout))?, @@ -205,7 +205,8 @@ async fn events_conn_handler_inner_try( Ok(j) => j, Err(e) => return Err((e, netout))?, }, - } + }; + stream }; let mut buf_len_histo = HistoLog2::new(5); while let Some(item) = p1.next().await {