diff --git a/archapp/src/archeng/pipe.rs b/archapp/src/archeng/pipe.rs index c3374d8..dbe1cde 100644 --- a/archapp/src/archeng/pipe.rs +++ b/archapp/src/archeng/pipe.rs @@ -19,7 +19,7 @@ pub async fn make_event_pipe( evq: &RawEventsQuery, node: NodeConfigCached, conf: ChannelArchiver, -) -> Result> + Send>>, Error> { +) -> Result> + Send>>, Error> { debug!("make_event_pipe {:?}", evq); let channel_config = { let q = ChannelConfigQuery { diff --git a/archapp/src/events.rs b/archapp/src/events.rs index f4fbcb2..6b69b2c 100644 --- a/archapp/src/events.rs +++ b/archapp/src/events.rs @@ -53,8 +53,9 @@ pub fn parse_data_filename(s: &str) -> Result { Ok(ret) } +// TODO do we need Send here? pub trait FrameMakerTrait: Send { - fn make_frame(&mut self, ei: Sitemty) -> Box; + fn make_frame(&mut self, ei: Sitemty) -> Box; } pub struct FrameMaker { @@ -176,7 +177,7 @@ macro_rules! arm2 { }, Err(e) => Err(e), }; - Box::new(ret) as Box + Box::new(ret) as Box }}; } @@ -302,7 +303,7 @@ macro_rules! arm1 { } impl FrameMakerTrait for FrameMaker { - fn make_frame(&mut self, item: Sitemty) -> Box { + fn make_frame(&mut self, item: Sitemty) -> Box { let scalar_type = &self.scalar_type; let shape = &self.shape; let agg_kind = &self.agg_kind; @@ -323,7 +324,7 @@ impl FrameMakerTrait for FrameMaker { pub async fn make_event_pipe( evq: &RawEventsQuery, aa: &ArchiverAppliance, -) -> Result> + Send>>, Error> { +) -> Result> + Send>>, Error> { let ci = channel_info(&evq.channel, aa).await?; let mut inps = vec![]; let mut names = vec![]; diff --git a/archapp_wrap/src/lib.rs b/archapp_wrap/src/lib.rs index 5e7d090..3ebb1fe 100644 --- a/archapp_wrap/src/lib.rs +++ b/archapp_wrap/src/lib.rs @@ -36,7 +36,7 @@ pub fn scan_files_insert( pub async fn make_event_pipe( evq: &RawEventsQuery, aa: &ArchiverAppliance, -) -> Result> + Send>>, Error> { +) -> Result> + Send>>, Error> { archapp::events::make_event_pipe(evq, aa).await } diff --git a/daqbufp2/src/client.rs b/daqbufp2/src/client.rs index f4654fe..fd050cb 100644 --- a/daqbufp2/src/client.rs +++ b/daqbufp2/src/client.rs @@ -7,7 +7,7 @@ use futures_util::TryStreamExt; use http::StatusCode; use hyper::Body; use items::xbinnedwaveevents::XBinnedWaveEvents; -use items::{FrameType, Sitemty, StreamItem}; +use items::{Sitemty, StreamItem}; use netpod::query::{BinnedQuery, CacheUsage}; use netpod::{log::*, AppendToUrl}; use netpod::{AggKind, ByteSize, Channel, HostPort, NanoRange, PerfOpts, APP_OCTET}; @@ -113,7 +113,9 @@ pub async fn get_binned( // The expected type nowadays depends on the channel and agg-kind. err::todo(); type ExpectedType = Sitemty>; - let type_id_exp = ::FRAME_TYPE_ID; + // TODO the non-data variants of Sitemty no longer carry a frame id. + //let type_id_exp = ::FRAME_TYPE_ID; + let type_id_exp: u32 = err::todoval(); if frame.tyid() != type_id_exp { error!("unexpected type id got {} exp {}", frame.tyid(), type_id_exp); } diff --git a/daqbufp2/src/test/binnedbinary.rs b/daqbufp2/src/test/binnedbinary.rs index 37cf075..7f36f8f 100644 --- a/daqbufp2/src/test/binnedbinary.rs +++ b/daqbufp2/src/test/binnedbinary.rs @@ -8,7 +8,7 @@ use futures_util::{StreamExt, TryStreamExt}; use http::StatusCode; use hyper::Body; use items::binsdim0::MinMaxAvgDim0Bins; -use items::{FrameType, RangeCompletableItem, Sitemty, StatsItem, StreamItem, SubFrId, WithLen}; +use items::{RangeCompletableItem, Sitemty, StatsItem, StreamItem, SubFrId, WithLen}; use netpod::query::{BinnedQuery, CacheUsage}; use netpod::{log::*, AppendToUrl}; use netpod::{AggKind, Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_OCTET}; @@ -195,7 +195,9 @@ where None } StreamItem::DataItem(frame) => { - if frame.tyid() != > as FrameType>::FRAME_TYPE_ID { + // TODO non-data Sitety no longer carry frame id: + //if frame.tyid() != > as FrameType>::FRAME_TYPE_ID { + if frame.tyid() != err::todoval::() { error!("test receives unexpected tyid {:x}", frame.tyid()); } match bincode::deserialize::>>(frame.buf()) { diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs index c5a2ef0..c8d3372 100644 --- a/daqbufp2/src/test/events.rs +++ b/daqbufp2/src/test/events.rs @@ -10,7 +10,7 @@ use http::StatusCode; use hyper::Body; use items::numops::NumOps; use items::scalarevents::ScalarEvents; -use items::{FrameType, RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithLen}; +use items::{RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithLen}; use netpod::log::*; use netpod::{Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_JSON, APP_OCTET}; use serde_json::Value as JsonValue; @@ -150,7 +150,9 @@ where None } StreamItem::DataItem(frame) => { - if frame.tyid() != > as FrameType>::FRAME_TYPE_ID { + // TODO the non-data variants of Sitemty no longer carry frame type id: + //if frame.tyid() != > as FrameType>::FRAME_TYPE_ID { + if frame.tyid() != err::todoval::() { error!("test receives unexpected tyid {:x}", frame.tyid()); None } else { diff --git a/dbconn/src/bincache.rs b/dbconn/src/bincache.rs index 049e45d..e6531c4 100644 --- a/dbconn/src/bincache.rs +++ b/dbconn/src/bincache.rs @@ -1,8 +1,8 @@ -use crate::events_scylla::ScyllaFramableStream; +use crate::events_scylla::EventsStreamScylla; use crate::ErrConv; use err::Error; use futures_util::{Future, Stream, StreamExt}; -use items::TimeBinned; +use items::{TimeBinnableDyn, TimeBinned}; use netpod::log::*; use netpod::query::RawEventsQuery; use netpod::{ @@ -142,13 +142,15 @@ pub async fn fetch_uncached_binned_events( // TODO ask Scylla directly, do not go through HTTP. // Refactor the event fetch stream code such that I can use that easily here. let evq = RawEventsQuery::new(chn.channel.clone(), range.clone(), AggKind::Plain); - let _res = Box::pin(ScyllaFramableStream::new( + let _res = Box::pin(EventsStreamScylla::new( &evq, chn.scalar_type.clone(), chn.shape.clone(), scy, false, )); + // TODO add the time binner. + // TODO return the result of the binning procedure. // TODO ScyllaFramableStream must return a new events trait object designed for trait object use. err::todoval() } diff --git a/dbconn/src/events_scylla.rs b/dbconn/src/events_scylla.rs index 5044606..136af75 100644 --- a/dbconn/src/events_scylla.rs +++ b/dbconn/src/events_scylla.rs @@ -1,14 +1,12 @@ +use crate::ErrConv; use err::Error; -use futures_core::{Future, Stream}; -use futures_util::FutureExt; +use futures_util::{Future, FutureExt, Stream}; use items::scalarevents::ScalarEvents; use items::waveevents::WaveEvents; -use items::{Framable, RangeCompletableItem, StreamItem}; +use items::{EventsDyn, RangeCompletableItem, Sitemty, StreamItem}; use netpod::log::*; use netpod::query::RawEventsQuery; use netpod::{Database, NanoRange, ScalarType, ScyllaConfig, Shape}; -use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError; -use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError}; use scylla::Session as ScySession; use std::collections::VecDeque; use std::pin::Pin; @@ -16,65 +14,20 @@ use std::sync::Arc; use std::task::{Context, Poll}; use tokio_postgres::Client as PgClient; -trait ErrConv { - fn err_conv(self) -> Result; -} - -impl ErrConv for Result { - fn err_conv(self) -> Result { - match self { - Ok(k) => Ok(k), - Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), - } - } -} - -impl ErrConv for Result { - fn err_conv(self) -> Result { - match self { - Ok(k) => Ok(k), - Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), - } - } -} - -impl ErrConv for Result { - fn err_conv(self) -> Result { - match self { - Ok(k) => Ok(k), - Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), - } - } -} - -impl ErrConv for Result { - fn err_conv(self) -> Result { - match self { - Ok(k) => Ok(k), - Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), - } - } -} - macro_rules! impl_read_values_fut { ($fname:ident, $self:expr, $ts_msp:expr) => {{ let fut = $fname($self.series, $ts_msp, $self.range.clone(), $self.scy.clone()); let fut = fut.map(|x| { - let x2 = match x { + match x { Ok(k) => { - // - Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) + // TODO why static needed? + let b = Box::new(k) as Box; + Ok(b) } - Err(e) => { - // - Err(e) - } - }; - //Box::new(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))) as Box}); - let ret = Box::new(x2) as Box; - ret + Err(e) => Err(e), + } }); - let fut = Box::pin(fut) as Pin> + Send>>; + let fut = Box::pin(fut) as Pin, Error>> + Send>>; fut }}; } @@ -85,7 +38,7 @@ struct ReadValues { shape: Shape, range: NanoRange, ts_msp: VecDeque, - fut: Pin> + Send>>, + fut: Pin, Error>> + Send>>, scy: Arc, } @@ -122,7 +75,7 @@ impl ReadValues { &mut self, ts_msp: u64, _has_more_msp: bool, - ) -> Pin> + Send>> { + ) -> Pin, Error>> + Send>> { // TODO this also needs to differentiate on Shape. let fut = match &self.shape { Shape::Scalar => match &self.scalar_type { @@ -156,7 +109,7 @@ enum FrState { Done, } -pub struct ScyllaFramableStream { +pub struct EventsStreamScylla { state: FrState, #[allow(unused)] evq: RawEventsQuery, @@ -168,7 +121,7 @@ pub struct ScyllaFramableStream { do_test_stream_error: bool, } -impl ScyllaFramableStream { +impl EventsStreamScylla { pub fn new( evq: &RawEventsQuery, scalar_type: ScalarType, @@ -189,17 +142,15 @@ impl ScyllaFramableStream { } } -impl Stream for ScyllaFramableStream { - type Item = Box; +impl Stream for EventsStreamScylla { + type Item = Sitemty>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; if self.do_test_stream_error { let e = Error::with_msg(format!("Test PRIVATE STREAM error.")) .add_public_msg(format!("Test PUBLIC STREAM error.")); - return Ready(Some( - Box::new(Err::>>, _>(e)) as _, - )); + return Ready(Some(Err(e))); } loop { break match self.state { @@ -231,21 +182,20 @@ impl Stream for ScyllaFramableStream { } Ready(Err(e)) => { self.state = FrState::Done; - Ready(Some(Box::new( - Err(e) as Result>>, _> - ))) + Ready(Some(Err(e))) } Pending => Pending, }, FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) { - Ready(item) => { + Ready(Ok(item)) => { if st.next() { } else { info!("ReadValues exhausted"); self.state = FrState::Done; } - Ready(Some(item)) + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))) } + Ready(Err(e)) => Ready(Some(Err(e))), Pending => Pending, }, FrState::Done => Ready(None), @@ -408,7 +358,7 @@ pub async fn make_scylla_stream( scyco: &ScyllaConfig, dbconf: Database, do_test_stream_error: bool, -) -> Result> + Send>>, Error> { +) -> Result>> + Send>>, Error> { info!("make_scylla_stream open scylla connection"); // TODO should RawEventsQuery already contain ScalarType and Shape? let (scalar_type, shape) = { @@ -430,7 +380,7 @@ pub async fn make_scylla_stream( .await .err_conv()?; let scy = Arc::new(scy); - let res = Box::pin(ScyllaFramableStream::new( + let res = Box::pin(EventsStreamScylla::new( evq, scalar_type, shape, @@ -439,62 +389,3 @@ pub async fn make_scylla_stream( )) as _; Ok(res) } - -#[allow(unused)] -async fn _make_scylla_stream_2( - evq: &RawEventsQuery, - scyco: &ScyllaConfig, -) -> Result> + Send>>, Error> { - // Find the "series" id. - info!("make_scylla_stream finding series id"); - let scy = scylla::SessionBuilder::new() - .known_nodes(&scyco.hosts) - .use_keyspace(&scyco.keyspace, true) - .build() - .await - .err_conv()?; - let res = { - let cql = - "select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?"; - scy.query(cql, (&evq.channel.backend, evq.channel.name())) - .await - .err_conv()? - }; - let rows: Vec<_> = res.rows_typed_or_empty::<(i64, i32, Vec)>().collect(); - if rows.len() > 1 { - error!("Multiple series found for channel, can not return data for ambiguous series"); - return Err(Error::with_public_msg_no_trace( - "Multiple series found for channel, can not return data for ambiguous series", - )); - } - if rows.len() < 1 { - return Err(Error::with_public_msg_no_trace( - "Multiple series found for channel, can not return data for ambiguous series", - )); - } - let row = rows - .into_iter() - .next() - .ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))? - .err_conv()?; - info!("make_scylla_stream row {row:?}"); - let series = row.0; - info!("make_scylla_stream series {series}"); - let _expand = evq.agg_kind.need_expand(); - let range = &evq.range; - { - let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? and ts_msp < ?"; - let res = scy - .query(cql, (series, range.beg as i64, range.end as i64)) - .await - .err_conv()?; - let mut rc = 0; - for _row in res.rows_or_empty() { - rc += 1; - } - info!("found in total {} rows", rc); - } - error!("TODO scylla fetch continue here"); - let res = Box::pin(futures_util::stream::empty()); - Ok(res) -} diff --git a/disk/src/binned/binnedfrompbv.rs b/disk/src/binned/binnedfrompbv.rs index 19df69c..6b10fb2 100644 --- a/disk/src/binned/binnedfrompbv.rs +++ b/disk/src/binned/binnedfrompbv.rs @@ -7,7 +7,7 @@ use futures_core::Stream; use futures_util::{FutureExt, StreamExt}; use http::{StatusCode, Uri}; use items::frame::decode_frame; -use items::{FrameType, RangeCompletableItem, Sitemty, StreamItem, TimeBinnableType}; +use items::{FrameType, FrameTypeStatic, RangeCompletableItem, Sitemty, StreamItem, TimeBinnableType}; use netpod::log::*; use netpod::query::CacheUsage; use netpod::{ @@ -32,7 +32,10 @@ pub struct FetchedPreBinned { } impl FetchedPreBinned { - pub fn new(query: &PreBinnedQuery, host: String, port: u16) -> Result { + pub fn new(query: &PreBinnedQuery, host: String, port: u16) -> Result + where + TBT: TimeBinnableType, + { // TODO should not assume http: let mut url = Url::parse(&format!("http://{host}:{port}/api/4/prebinned"))?; query.append_to_url(&mut url); @@ -50,7 +53,7 @@ impl FetchedPreBinned { impl Stream for FetchedPreBinned where - TBT: TimeBinnableType, + TBT: FrameTypeStatic + TimeBinnableType, Sitemty: FrameType + DeserializeOwned, { type Item = Sitemty; diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index fe5d4d7..f49ad0e 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -12,7 +12,8 @@ use futures_core::Stream; use futures_util::StreamExt; use items::numops::{BoolNum, NumOps, StringNum}; use items::{ - Appendable, Clearable, EventsNodeProcessor, Framable, FrameType, PushableIndex, Sitemty, TimeBinnableType, + Appendable, Clearable, EventsNodeProcessor, Framable, FrameType, PushableIndex, RangeCompletableItem, Sitemty, + SitemtyFrameType, StreamItem, TimeBinnableType, TimeBinned, }; use netpod::log::*; use netpod::{AggKind, ByteOrder, ChannelTyped, NodeConfigCached, ScalarType, Shape}; @@ -28,7 +29,7 @@ async fn make_num_pipeline_nty_end_evs_enp( _events_node_proc: ENP, query: PreBinnedQuery, node_config: &NodeConfigCached, -) -> Result> + Send>>, Error> +) -> Result>> + Send>>, Error> where NTY: NumOps + NumFromBytes + Serialize + 'static, END: Endianness + 'static, @@ -38,6 +39,7 @@ where Sitemty<::Output>: FrameType + Framable + 'static, Sitemty<<::Output as TimeBinnableType>::Output>: Framable + FrameType + DeserializeOwned, + <::Output as TimeBinnableType>::Output: SitemtyFrameType + TimeBinned, { if let Some(scyconf) = &node_config.node_config.cluster.cache_scylla { info!("~~~~~~~~~~~~~~~ make_num_pipeline_nty_end_evs_enp using scylla as cache"); @@ -50,21 +52,29 @@ where let stream = stream.map(|x| { // match x { - Ok(k) => { - let g = Box::new(k) as Box; - g - } - Err(e) => { - let u: Sitemty> = Err(e); - Box::new(u) as Box - } + Ok(k) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))), + Err(e) => Err(e), } }); - let stream = Box::pin(stream) as Pin> + Send>>; + let stream = Box::pin(stream) as Pin>> + Send>>; Ok(stream) } else { let ret = PreBinnedValueStream::::new(query, agg_kind, node_config); - let ret = StreamExt::map(ret, |item| Box::new(item) as Box); + let ret = StreamExt::map(ret, |item| { + // + match item { + Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) => { + let g = Box::new(k) as Box; + Ok(StreamItem::DataItem(RangeCompletableItem::Data(g))) + } + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => { + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) + } + Ok(StreamItem::Log(k)) => Ok(StreamItem::Log(k)), + Ok(StreamItem::Stats(k)) => Ok(StreamItem::Stats(k)), + Err(e) => Err(e), + } + }); Ok(Box::pin(ret)) } } @@ -75,7 +85,7 @@ async fn make_num_pipeline_nty_end( agg_kind: AggKind, query: PreBinnedQuery, node_config: &NodeConfigCached, -) -> Result> + Send>>, Error> +) -> Result>> + Send>>, Error> where NTY: NumOps + NumFromBytes + Serialize + 'static, END: Endianness + 'static, @@ -188,7 +198,7 @@ async fn make_num_pipeline( agg_kind: AggKind, query: PreBinnedQuery, node_config: &NodeConfigCached, -) -> Result> + Send>>, Error> { +) -> Result>> + Send>>, Error> { match scalar_type { ScalarType::U8 => match_end!(u8, byte_order, scalar_type, shape, agg_kind, query, node_config), ScalarType::U16 => match_end!(u16, byte_order, scalar_type, shape, agg_kind, query, node_config), diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index 03773b1..2cfd3f9 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -243,7 +243,7 @@ impl PlainEvents { } impl ChannelExecFunction for PlainEvents { - type Output = Pin> + Send>>; + type Output = Pin> + Send>>; fn exec( self, @@ -265,7 +265,7 @@ impl ChannelExecFunction for PlainEvents { // TODO let upstream provide DiskIoTune and pass in RawEventsQuery: let evq = RawEventsQuery::new(self.channel, self.range, self.agg_kind); let s = MergedFromRemotes::>::new(evq, perf_opts, self.node_config.node_config.cluster); - let s = s.map(|item| Box::new(item) as Box); + let s = s.map(|item| Box::new(item) as Box); Ok(Box::pin(s)) } diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index d4e2532..646c877 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -4,8 +4,8 @@ use bytes::{Buf, BytesMut}; use err::Error; use futures_util::{Stream, StreamExt}; use items::{ - Appendable, ByteEstimate, Clearable, PushableIndex, RangeCompletableItem, SitemtyFrameType, StatsItem, StreamItem, - WithLen, WithTimestamps, + Appendable, ByteEstimate, Clearable, FrameTypeStatic, PushableIndex, RangeCompletableItem, SitemtyFrameType, + StatsItem, StreamItem, WithLen, WithTimestamps, }; use netpod::histo::HistoLog2; use netpod::log::*; @@ -528,8 +528,19 @@ impl EventFull { } } -impl SitemtyFrameType for EventFull { +impl FrameTypeStatic for EventFull { const FRAME_TYPE_ID: u32 = items::EVENT_FULL_FRAME_TYPE_ID; + + fn from_error(_: err::Error) -> Self { + // TODO remove usage of this + panic!() + } +} + +impl SitemtyFrameType for EventFull { + fn frame_type_id(&self) -> u32 { + ::FRAME_TYPE_ID + } } impl WithLen for EventFull { diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 4db30a2..3f1e9ce 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -12,7 +12,6 @@ use items::{EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, Stream use netpod::log::*; use netpod::query::RawEventsQuery; use netpod::{AggKind, ByteOrder, ByteSize, Channel, DiskIoTune, NanoRange, NodeConfigCached, ScalarType, Shape}; - use parse::channelconfig::{extract_matching_config_entry, read_local_config, ConfigEntry, MatchingConfigEntry}; use std::pin::Pin; @@ -20,7 +19,7 @@ fn make_num_pipeline_stream_evs( event_value_shape: EVS, events_node_proc: ENP, event_blobs: EventChunkerMultifile, -) -> Pin> + Send>> +) -> Pin> + Send>> where NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, @@ -44,7 +43,7 @@ where }, Err(e) => Err(e), }) - .map(|item| Box::new(item) as Box); + .map(|item| Box::new(item) as Box); Box::pin(s2) } @@ -142,7 +141,7 @@ macro_rules! pipe1 { pub async fn make_event_pipe( evq: &RawEventsQuery, node_config: &NodeConfigCached, -) -> Result> + Send>>, Error> { +) -> Result> + Send>>, Error> { if false { match dbconn::channel_exists(&evq.channel, &node_config).await { Ok(_) => (), @@ -309,7 +308,7 @@ pub fn make_remote_event_blobs_stream( pub async fn make_event_blobs_pipe( evq: &RawEventsQuery, node_config: &NodeConfigCached, -) -> Result> + Send>>, Error> { +) -> Result> + Send>>, Error> { if false { match dbconn::channel_exists(&evq.channel, &node_config).await { Ok(_) => (), @@ -331,9 +330,9 @@ pub async fn make_event_blobs_pipe( evq.disk_io_tune.clone(), node_config, )?; - let s = event_blobs.map(|item| Box::new(item) as Box); + let s = event_blobs.map(|item| Box::new(item) as Box); //let s = tracing_futures::Instrumented::instrument(s, tracing::info_span!("make_event_blobs_pipe")); - let pipe: Pin> + Send>>; + let pipe: Pin> + Send>>; pipe = Box::pin(s); pipe } else { @@ -347,9 +346,9 @@ pub async fn make_event_blobs_pipe( evq.disk_io_tune.clone(), node_config, )?; - let s = event_blobs.map(|item| Box::new(item) as Box); + let s = event_blobs.map(|item| Box::new(item) as Box); //let s = tracing_futures::Instrumented::instrument(s, tracing::info_span!("make_event_blobs_pipe")); - let pipe: Pin> + Send>>; + let pipe: Pin> + Send>>; pipe = Box::pin(s); pipe }; diff --git a/disk/src/raw/eventsfromframes.rs b/disk/src/raw/eventsfromframes.rs index 38d98fe..0d3da97 100644 --- a/disk/src/raw/eventsfromframes.rs +++ b/disk/src/raw/eventsfromframes.rs @@ -2,7 +2,7 @@ use crate::frame::inmem::InMemoryFrameAsyncReadStream; use futures_core::Stream; use futures_util::StreamExt; use items::frame::decode_frame; -use items::{FrameType, Sitemty, StreamItem}; +use items::{FrameTypeStatic, Sitemty, StreamItem}; use netpod::log::*; use serde::de::DeserializeOwned; use std::marker::PhantomData; @@ -37,8 +37,7 @@ where impl Stream for EventsFromFrames where T: AsyncRead + Unpin, - I: DeserializeOwned + Unpin, - Sitemty: FrameType, + I: FrameTypeStatic + DeserializeOwned + Unpin, { type Item = Sitemty; diff --git a/err/Cargo.toml b/err/Cargo.toml index 9744a99..b90dc1b 100644 --- a/err/Cargo.toml +++ b/err/Cargo.toml @@ -9,6 +9,7 @@ backtrace = "0.3" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_cbor = "0.11" +erased-serde = "0.3" async-channel = "1.6" chrono = { version = "0.4", features = ["serde"] } url = "2.2" diff --git a/err/src/lib.rs b/err/src/lib.rs index 45efe48..74815a9 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -317,9 +317,7 @@ impl nom::error::ParseError for Error { Self::with_msg(format!("ParseError kind {:?} other {:?}", kind, other)) } } -*/ -/* impl From for Error { fn from(k: JoinError) -> Self { Self::with_msg(format!("JoinError {:?}", k)) @@ -329,7 +327,7 @@ impl From for Error { impl From> for Error { fn from(k: Box) -> Self { - Self::with_msg(format!("bincode::ErrorKind {:?}", k)) + Self::with_msg(k.to_string()) } } @@ -339,6 +337,12 @@ impl From for Error { } } +impl From for Error { + fn from(k: erased_serde::Error) -> Self { + Self::with_msg(k.to_string()) + } +} + impl From for Error { fn from(k: std::fmt::Error) -> Self { Self::with_msg(k.to_string()) diff --git a/items/Cargo.toml b/items/Cargo.toml index f28310f..fda7403 100644 --- a/items/Cargo.toml +++ b/items/Cargo.toml @@ -11,6 +11,7 @@ path = "src/items.rs" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_cbor = "0.11.1" +erased-serde = "0.3" bincode = "1.3.3" bytes = "1.0.1" num-traits = "0.2.14" diff --git a/items/src/binsdim0.rs b/items/src/binsdim0.rs index 996aafe..a4b4aa7 100644 --- a/items/src/binsdim0.rs +++ b/items/src/binsdim0.rs @@ -1,9 +1,9 @@ use crate::numops::NumOps; use crate::streams::{Collectable, Collector, ToJsonBytes, ToJsonResult}; use crate::{ - ts_offs_from_abs, Appendable, FilterFittingInside, Fits, FitsInside, IsoDateTime, RangeOverlapInfo, ReadPbv, - ReadableFromFile, Sitemty, SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, TimeBins, - WithLen, + ts_offs_from_abs, Appendable, FilterFittingInside, Fits, FitsInside, FrameTypeStatic, IsoDateTime, + RangeOverlapInfo, ReadPbv, ReadableFromFile, Sitemty, SitemtyFrameType, SubFrId, TimeBinnableDyn, + TimeBinnableDynAggregator, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinned, TimeBins, WithLen, }; use chrono::{TimeZone, Utc}; use err::Error; @@ -27,11 +27,25 @@ pub struct MinMaxAvgDim0Bins { pub avgs: Vec>, } +impl FrameTypeStatic for MinMaxAvgDim0Bins +where + NTY: SubFrId, +{ + const FRAME_TYPE_ID: u32 = crate::MIN_MAX_AVG_DIM_0_BINS_FRAME_TYPE_ID + NTY::SUB; + + fn from_error(_: err::Error) -> Self { + // TODO remove usage of this + panic!() + } +} + impl SitemtyFrameType for MinMaxAvgDim0Bins where NTY: SubFrId, { - const FRAME_TYPE_ID: u32 = crate::MIN_MAX_AVG_BINS + NTY::SUB; + fn frame_type_id(&self) -> u32 { + ::FRAME_TYPE_ID + } } impl fmt::Debug for MinMaxAvgDim0Bins @@ -41,7 +55,7 @@ where fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!( fmt, - "MinMaxAvgBins count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}", + "MinMaxAvgDim0Bins count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}", self.ts1s.len(), self.ts1s.iter().map(|k| k / SEC).collect::>(), self.ts2s.iter().map(|k| k / SEC).collect::>(), @@ -428,3 +442,25 @@ where ret } } + +impl TimeBinnableDyn for MinMaxAvgDim0Bins { + fn aggregator_new(&self) -> Box { + todo!() + } +} + +impl TimeBinned for MinMaxAvgDim0Bins { + fn as_time_binnable_dyn(&self) -> &dyn TimeBinnableDyn { + self as &dyn TimeBinnableDyn + } + + fn workaround_clone(&self) -> Box { + // TODO remove + panic!() + } + + fn dummy_test_i32(&self) -> i32 { + // TODO remove + panic!() + } +} diff --git a/items/src/binsdim1.rs b/items/src/binsdim1.rs index d4ae154..cbeb92f 100644 --- a/items/src/binsdim1.rs +++ b/items/src/binsdim1.rs @@ -2,9 +2,9 @@ use crate::numops::NumOps; use crate::streams::{Collectable, Collector, ToJsonBytes, ToJsonResult}; use crate::waveevents::WaveEvents; use crate::{ - pulse_offs_from_abs, ts_offs_from_abs, Appendable, FilterFittingInside, Fits, FitsInside, IsoDateTime, - RangeOverlapInfo, ReadPbv, ReadableFromFile, Sitemty, SitemtyFrameType, SubFrId, TimeBinnableType, - TimeBinnableTypeAggregator, TimeBins, WithLen, + pulse_offs_from_abs, ts_offs_from_abs, Appendable, FilterFittingInside, Fits, FitsInside, FrameTypeStatic, + IsoDateTime, RangeOverlapInfo, ReadPbv, ReadableFromFile, Sitemty, SitemtyFrameType, SubFrId, TimeBinnableDyn, + TimeBinnableDynAggregator, TimeBinnableType, TimeBinnableTypeAggregator, TimeBinned, TimeBins, WithLen, }; use chrono::{TimeZone, Utc}; use err::Error; @@ -27,11 +27,25 @@ pub struct MinMaxAvgDim1Bins { pub avgs: Vec>>, } -impl SitemtyFrameType for MinMaxAvgDim1Bins +impl FrameTypeStatic for MinMaxAvgDim1Bins where NTY: SubFrId, { const FRAME_TYPE_ID: u32 = crate::MIN_MAX_AVG_DIM_1_BINS_FRAME_TYPE_ID + NTY::SUB; + + fn from_error(_: err::Error) -> Self { + // TODO remove usage of this + panic!() + } +} + +impl SitemtyFrameType for MinMaxAvgDim1Bins +where + NTY: SubFrId, +{ + fn frame_type_id(&self) -> u32 { + ::FRAME_TYPE_ID + } } impl fmt::Debug for MinMaxAvgDim1Bins @@ -531,3 +545,25 @@ where Self::Collector::new(bin_count_exp) } } + +impl TimeBinnableDyn for MinMaxAvgDim1Bins { + fn aggregator_new(&self) -> Box { + todo!() + } +} + +impl TimeBinned for MinMaxAvgDim1Bins { + fn as_time_binnable_dyn(&self) -> &dyn TimeBinnableDyn { + self as &dyn TimeBinnableDyn + } + + fn workaround_clone(&self) -> Box { + // TODO remove + panic!() + } + + fn dummy_test_i32(&self) -> i32 { + // TODO remove + panic!() + } +} diff --git a/items/src/eventsitem.rs b/items/src/eventsitem.rs index 7212e48..d217d47 100644 --- a/items/src/eventsitem.rs +++ b/items/src/eventsitem.rs @@ -11,7 +11,11 @@ pub enum EventsItem { } impl SitemtyFrameType for EventsItem { - const FRAME_TYPE_ID: u32 = crate::EVENTS_ITEM_FRAME_TYPE_ID; + //const FRAME_TYPE_ID: u32 = crate::EVENTS_ITEM_FRAME_TYPE_ID; + + fn frame_type_id(&self) -> u32 { + crate::EVENTS_ITEM_FRAME_TYPE_ID + } } impl EventsItem { diff --git a/items/src/frame.rs b/items/src/frame.rs index 72ddbd6..b00c3f4 100644 --- a/items/src/frame.rs +++ b/items/src/frame.rs @@ -1,6 +1,7 @@ use crate::inmem::InMemoryFrame; use crate::{ - FrameType, ERROR_FRAME_TYPE_ID, INMEM_FRAME_ENCID, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC, TERM_FRAME_TYPE_ID, + FrameType, FrameTypeStatic, ERROR_FRAME_TYPE_ID, INMEM_FRAME_ENCID, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC, + TERM_FRAME_TYPE_ID, }; use bytes::{BufMut, BytesMut}; use err::Error; @@ -16,17 +17,31 @@ where if item.is_err() { make_error_frame(item.err().unwrap()) } else { - make_frame_2(item, FT::FRAME_TYPE_ID) + make_frame_2( + item, + //FT::FRAME_TYPE_ID + item.frame_type_id(), + ) } } pub fn make_frame_2(item: &FT, fty: u32) -> Result where - FT: Serialize, + FT: erased_serde::Serialize, { //trace!("make_frame_2"); - match bincode::serialize(item) { - Ok(enc) => { + let mut out = vec![]; + let opts = bincode::DefaultOptions::new() + //.with_fixint_encoding() + //.allow_trailing_bytes() + ; + let mut ser = bincode::Serializer::new(&mut out, opts); + //let mut ser = serde_json::Serializer::new(std::io::stdout()); + let mut ser2 = ::erase(&mut ser); + //match bincode::serialize(item) { + match item.erased_serialize(&mut ser2) { + Ok(_) => { + let enc = out; if enc.len() > u32::MAX as usize { return Err(Error::with_msg(format!("too long payload {}", enc.len()))); } @@ -105,7 +120,7 @@ pub fn make_term_frame() -> BytesMut { pub fn decode_frame(frame: &InMemoryFrame) -> Result where - T: FrameType + DeserializeOwned, + T: FrameTypeStatic + DeserializeOwned, { if frame.encid() != INMEM_FRAME_ENCID { return Err(Error::with_msg(format!("unknown encoder id {:?}", frame))); @@ -125,7 +140,7 @@ where }; Ok(T::from_error(k)) } else { - let tyid = ::FRAME_TYPE_ID; + let tyid = T::FRAME_TYPE_ID; if frame.tyid() != tyid { return Err(Error::with_msg(format!( "type id mismatch expect {:x} found {:?}", diff --git a/items/src/items.rs b/items/src/items.rs index e910c12..5b7eaf3 100644 --- a/items/src/items.rs +++ b/items/src/items.rs @@ -39,11 +39,11 @@ pub const TERM_FRAME_TYPE_ID: u32 = 0x01; pub const ERROR_FRAME_TYPE_ID: u32 = 0x02; pub const EVENT_QUERY_JSON_STRING_FRAME: u32 = 0x100; pub const EVENT_VALUES_FRAME_TYPE_ID: u32 = 0x500; -pub const MIN_MAX_AVG_BINS: u32 = 0x700; pub const WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0x800; pub const X_BINNED_SCALAR_EVENTS_FRAME_TYPE_ID: u32 = 0x8800; pub const X_BINNED_WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0x900; pub const MIN_MAX_AVG_WAVE_BINS: u32 = 0xa00; +pub const MIN_MAX_AVG_DIM_0_BINS_FRAME_TYPE_ID: u32 = 0x700; pub const MIN_MAX_AVG_DIM_1_BINS_FRAME_TYPE_ID: u32 = 0xb00; pub const EVENT_FULL_FRAME_TYPE_ID: u32 = 0x2200; pub const EVENTS_ITEM_FRAME_TYPE_ID: u32 = 0x2300; @@ -212,38 +212,68 @@ impl SubFrId for BoolNum { const SUB: u32 = 14; } +// To be implemented by the data containers, i.e. the T's in Sitemty, e.g. ScalarEvents. +// TODO rename this, since it is misleading because it is not meanto to be implemented by Sitemty. pub trait SitemtyFrameType { - const FRAME_TYPE_ID: u32; + //const FRAME_TYPE_ID: u32; + fn frame_type_id(&self) -> u32; } -pub trait FrameType { +pub trait FrameTypeStatic { const FRAME_TYPE_ID: u32; - fn is_err(&self) -> bool; - fn err(&self) -> Option<&::err::Error>; fn from_error(x: ::err::Error) -> Self; } -impl FrameType for EventQueryJsonStringFrame { +// Meant to be implemented by Sitemty. +pub trait FrameType { + fn frame_type_id(&self) -> u32; + fn is_err(&self) -> bool; + fn err(&self) -> Option<&::err::Error>; +} + +impl FrameTypeStatic for EventQueryJsonStringFrame { const FRAME_TYPE_ID: u32 = EVENT_QUERY_JSON_STRING_FRAME; + fn from_error(_x: err::Error) -> Self { + error!("FrameTypeStatic::from_error todo"); + todo!() + } +} + +impl FrameTypeStatic for Sitemty { + const FRAME_TYPE_ID: u32 = ::FRAME_TYPE_ID; + + fn from_error(_: err::Error) -> Self { + // TODO remove this method. + panic!() + } +} + +impl FrameType for Box +where + T: FrameType, +{ + fn frame_type_id(&self) -> u32 { + self.as_ref().frame_type_id() + } + fn is_err(&self) -> bool { - false + self.as_ref().is_err() } fn err(&self) -> Option<&::err::Error> { - None - } - - fn from_error(_x: ::err::Error) -> Self { - panic!() + self.as_ref().err() } } impl FrameType for Sitemty where - T: SitemtyFrameType, + // SitemtyFrameType + T: FrameTypeStatic, { - const FRAME_TYPE_ID: u32 = T::FRAME_TYPE_ID; + fn frame_type_id(&self) -> u32 { + ::FRAME_TYPE_ID + } fn is_err(&self) -> bool { match self { @@ -258,47 +288,74 @@ where Err(e) => Some(e), } } +} - fn from_error(x: ::err::Error) -> Self { - Err(x) +impl FrameType for EventQueryJsonStringFrame { + fn frame_type_id(&self) -> u32 { + ::FRAME_TYPE_ID + } + + fn is_err(&self) -> bool { + false + } + + fn err(&self) -> Option<&::err::Error> { + None } } -pub trait ProvidesFrameType { - fn frame_type_id(&self) -> u32; +impl SitemtyFrameType for Box { + fn frame_type_id(&self) -> u32 { + self.as_time_binnable_dyn().frame_type_id() + } } -pub trait Framable: Send { - fn typeid(&self) -> u32; +// TODO do we need Send here? +pub trait Framable { fn make_frame(&self) -> Result; } +// erased_serde::Serialize +pub trait FramableInner: SitemtyFrameType + Send { + fn _dummy(&self); +} + +// erased_serde::Serialize` +impl FramableInner for T { + fn _dummy(&self) {} +} + +//impl FramableInner for Box {} + // TODO need also Framable for those types defined in other crates. +// TODO not all T have FrameTypeStatic, e.g. Box impl Framable for Sitemty -where - T: SitemtyFrameType + Serialize + Send, +//where +//Self: erased_serde::Serialize, +//T: FramableInner + FrameTypeStatic, +//T: Sized, { - fn typeid(&self) -> u32 { - T::FRAME_TYPE_ID + fn make_frame(&self) -> Result { + todo!() } - fn make_frame(&self) -> Result { + /*fn make_frame(&self) -> Result { //trace!("make_frame"); match self { - Ok(_) => make_frame_2(self, T::FRAME_TYPE_ID), + Ok(_) => make_frame_2( + self, + //T::FRAME_TYPE_ID + self.frame_type_id(), + ), Err(e) => make_error_frame(e), } - } + }*/ } impl Framable for Box where T: Framable + ?Sized, { - fn typeid(&self) -> u32 { - self.as_ref().typeid() - } - fn make_frame(&self) -> Result { self.as_ref().make_frame() } @@ -405,7 +462,7 @@ pub trait TimeBins: Send + Unpin + WithLen + Appendable + FilterFittingInside { } pub trait TimeBinnableType: - Send + Unpin + RangeOverlapInfo + FilterFittingInside + Appendable + Serialize + ReadableFromFile + Send + Unpin + RangeOverlapInfo + FilterFittingInside + Appendable + Serialize + ReadableFromFile + FrameTypeStatic { type Output: TimeBinnableType; type Aggregator: TimeBinnableTypeAggregator + Send + Unpin; @@ -414,20 +471,39 @@ pub trait TimeBinnableType: /// Provides a time-binned representation of the implementing type. /// In contrast to `TimeBinnableType` this is meant for trait objects. -pub trait TimeBinnableDyn {} + +// TODO should not require Sync! +// TODO SitemtyFrameType is already supertrait of FramableInner. +pub trait TimeBinnableDyn: FramableInner + SitemtyFrameType + Sync + Send { + fn aggregator_new(&self) -> Box; +} pub trait TimeBinnableDynAggregator: Send { fn ingest(&mut self, item: &dyn TimeBinnableDyn); fn result(&mut self) -> Box; } -pub trait TimeBinned: Framable + Sync + Send + TimeBinnableDyn { - fn aggregator_new(&self) -> Box; +/// Container of some form of events, for use as trait object. +pub trait EventsDyn: TimeBinnableDyn {} + +/// Data in time-binned form. +pub trait TimeBinned: TimeBinnableDyn { fn as_time_binnable_dyn(&self) -> &dyn TimeBinnableDyn; fn workaround_clone(&self) -> Box; fn dummy_test_i32(&self) -> i32; } +// TODO this impl is already covered by the generic one: +/*impl FramableInner for Box { + fn _dummy(&self) {} +}*/ + +impl TimeBinnableDyn for Box { + fn aggregator_new(&self) -> Box { + self.as_time_binnable_dyn().aggregator_new() + } +} + // TODO should get I/O and tokio dependence out of this crate pub trait ReadableFromFile: Sized { fn read_from_file(file: File) -> Result, Error>; diff --git a/items/src/numops.rs b/items/src/numops.rs index 8123ba3..4a38093 100644 --- a/items/src/numops.rs +++ b/items/src/numops.rs @@ -114,6 +114,7 @@ pub trait NumOps: + Clone + AsPrimF32 + Send + + Sync + Unpin + Debug + Zero diff --git a/items/src/scalarevents.rs b/items/src/scalarevents.rs index b34a7f5..ad03081 100644 --- a/items/src/scalarevents.rs +++ b/items/src/scalarevents.rs @@ -2,9 +2,9 @@ use crate::binsdim0::MinMaxAvgDim0Bins; use crate::numops::NumOps; use crate::streams::{Collectable, Collector}; use crate::{ - pulse_offs_from_abs, ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventAppendable, FilterFittingInside, - Fits, FitsInside, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, TimeBinnableType, - TimeBinnableTypeAggregator, WithLen, WithTimestamps, + pulse_offs_from_abs, ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventAppendable, EventsDyn, + FilterFittingInside, Fits, FitsInside, FrameTypeStatic, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, + SitemtyFrameType, TimeBinnableDyn, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; use err::Error; use netpod::log::*; @@ -49,11 +49,26 @@ impl ScalarEvents { } } -impl SitemtyFrameType for ScalarEvents +impl FrameTypeStatic for ScalarEvents where NTY: NumOps, { const FRAME_TYPE_ID: u32 = crate::EVENT_VALUES_FRAME_TYPE_ID + NTY::SUB; + + fn from_error(_: err::Error) -> Self { + // TODO this method should not be used, remove. + error!("impl FrameTypeStatic for ScalarEvents"); + panic!() + } +} + +impl SitemtyFrameType for ScalarEvents +where + NTY: NumOps, +{ + fn frame_type_id(&self) -> u32 { + ::FRAME_TYPE_ID + } } impl ScalarEvents { @@ -539,3 +554,11 @@ where ret } } + +impl TimeBinnableDyn for ScalarEvents { + fn aggregator_new(&self) -> Box { + todo!() + } +} + +impl EventsDyn for ScalarEvents {} diff --git a/items/src/statsevents.rs b/items/src/statsevents.rs index 31d7fc4..5b91660 100644 --- a/items/src/statsevents.rs +++ b/items/src/statsevents.rs @@ -1,7 +1,7 @@ use crate::streams::{Collectable, Collector}; use crate::{ ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventAppendable, FilterFittingInside, Fits, FitsInside, - PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, TimeBinnableType, + FrameTypeStatic, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; use err::Error; @@ -17,8 +17,19 @@ pub struct StatsEvents { pub pulses: Vec, } -impl SitemtyFrameType for StatsEvents { +impl FrameTypeStatic for StatsEvents { const FRAME_TYPE_ID: u32 = crate::STATS_EVENTS_FRAME_TYPE_ID; + + fn from_error(_: err::Error) -> Self { + // TODO remove usage of this + panic!() + } +} + +impl SitemtyFrameType for StatsEvents { + fn frame_type_id(&self) -> u32 { + ::FRAME_TYPE_ID + } } impl StatsEvents { diff --git a/items/src/waveevents.rs b/items/src/waveevents.rs index ac5e532..3cb91f3 100644 --- a/items/src/waveevents.rs +++ b/items/src/waveevents.rs @@ -3,9 +3,9 @@ use crate::numops::NumOps; use crate::xbinnedscalarevents::XBinnedScalarEvents; use crate::xbinnedwaveevents::XBinnedWaveEvents; use crate::{ - Appendable, ByteEstimate, Clearable, EventAppendable, EventsNodeProcessor, FilterFittingInside, Fits, FitsInside, - PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, - TimeBinnableTypeAggregator, WithLen, WithTimestamps, + Appendable, ByteEstimate, Clearable, EventAppendable, EventsDyn, EventsNodeProcessor, FilterFittingInside, Fits, + FitsInside, FrameTypeStatic, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, SubFrId, + TimeBinnableDyn, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; use err::Error; use netpod::log::*; @@ -40,11 +40,25 @@ impl WaveEvents { } } -impl SitemtyFrameType for WaveEvents +impl FrameTypeStatic for WaveEvents where NTY: SubFrId, { const FRAME_TYPE_ID: u32 = crate::WAVE_EVENTS_FRAME_TYPE_ID + NTY::SUB; + + fn from_error(_: err::Error) -> Self { + // TODO remove this method. + panic!() + } +} + +impl SitemtyFrameType for WaveEvents +where + NTY: SubFrId, +{ + fn frame_type_id(&self) -> u32 { + ::FRAME_TYPE_ID + } } impl WaveEvents { @@ -494,3 +508,11 @@ where } } } + +impl TimeBinnableDyn for WaveEvents { + fn aggregator_new(&self) -> Box { + todo!() + } +} + +impl EventsDyn for WaveEvents {} diff --git a/items/src/xbinnedscalarevents.rs b/items/src/xbinnedscalarevents.rs index da38439..66e8c19 100644 --- a/items/src/xbinnedscalarevents.rs +++ b/items/src/xbinnedscalarevents.rs @@ -4,7 +4,7 @@ use crate::streams::{Collectable, Collector}; use crate::{ ts_offs_from_abs, Appendable, ByteEstimate, Clearable, FilterFittingInside, Fits, FitsInside, PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, - TimeBinnableTypeAggregator, WithLen, WithTimestamps, + TimeBinnableTypeAggregator, WithLen, WithTimestamps, FrameTypeStatic, }; use err::Error; use netpod::log::*; @@ -23,11 +23,24 @@ pub struct XBinnedScalarEvents { pub avgs: Vec, } -impl SitemtyFrameType for XBinnedScalarEvents +impl FrameTypeStatic for XBinnedScalarEvents where NTY: SubFrId, { const FRAME_TYPE_ID: u32 = crate::X_BINNED_SCALAR_EVENTS_FRAME_TYPE_ID + NTY::SUB; + + fn from_error(_: err::Error) -> Self { + panic!() + } +} + +impl SitemtyFrameType for XBinnedScalarEvents +where + NTY: SubFrId, +{ + fn frame_type_id(&self) -> u32 { + ::FRAME_TYPE_ID + } } impl XBinnedScalarEvents { diff --git a/items/src/xbinnedwaveevents.rs b/items/src/xbinnedwaveevents.rs index 6108269..96d6d32 100644 --- a/items/src/xbinnedwaveevents.rs +++ b/items/src/xbinnedwaveevents.rs @@ -2,9 +2,9 @@ use crate::binsdim1::MinMaxAvgDim1Bins; use crate::numops::NumOps; use crate::streams::{Collectable, Collector}; use crate::{ - Appendable, ByteEstimate, Clearable, FilterFittingInside, Fits, FitsInside, PushableIndex, RangeOverlapInfo, - ReadPbv, ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, TimeBinnableTypeAggregator, WithLen, - WithTimestamps, + Appendable, ByteEstimate, Clearable, FilterFittingInside, Fits, FitsInside, FrameTypeStatic, PushableIndex, + RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, SubFrId, TimeBinnableType, + TimeBinnableTypeAggregator, WithLen, WithTimestamps, }; use err::Error; use netpod::log::*; @@ -23,11 +23,25 @@ pub struct XBinnedWaveEvents { pub avgs: Vec>, } -impl SitemtyFrameType for XBinnedWaveEvents +impl FrameTypeStatic for XBinnedWaveEvents where NTY: SubFrId, { const FRAME_TYPE_ID: u32 = crate::X_BINNED_WAVE_EVENTS_FRAME_TYPE_ID + NTY::SUB; + + fn from_error(_: err::Error) -> Self { + panic!() + } +} + +// TODO use a generic impl for this: +impl SitemtyFrameType for XBinnedWaveEvents +where + NTY: SubFrId, +{ + fn frame_type_id(&self) -> u32 { + ::FRAME_TYPE_ID + } } impl XBinnedWaveEvents { diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index d25aab0..5ef31ff 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -4,7 +4,7 @@ use err::Error; use futures_core::Stream; use futures_util::StreamExt; use items::frame::{decode_frame, make_term_frame}; -use items::{Framable, StreamItem}; +use items::{Framable, RangeCompletableItem, StreamItem}; use netpod::histo::HistoLog2; use netpod::log::*; use netpod::query::RawEventsQuery; @@ -129,12 +129,31 @@ async fn events_conn_handler_inner_try( return Err((e, netout).into()); } - let mut p1: Pin> + Send>> = + let mut p1: Pin> + Send>> = if let Some(conf) = &node_config.node_config.cluster.scylla { let scyco = conf; let dbconf = node_config.node_config.cluster.database.clone(); match make_scylla_stream(&evq, scyco, dbconf, evq.do_test_stream_error).await { - Ok(j) => j, + Ok(s) => { + // + let s = s.map(|item| { + // + /*match item { + Ok(StreamItem::Data(RangeCompletableItem::Data(k))) => { + let b = Box::new(b); + Ok(StreamItem::Data(RangeCompletableItem::Data(b))) + } + Ok(StreamItem::Data(RangeCompletableItem::Complete)) => { + Ok(StreamItem::Data(RangeCompletableItem::Complete)) + } + Ok(StreamItem::Log(k)) => Ok(StreamItem::Log(k)), + Ok(StreamItem::Stats(k)) => Ok(StreamItem::Stats(k)), + Err(e) => Err(e), + }*/ + Box::new(item) as Box + }); + Box::pin(s) + } Err(e) => return Err((e, netout))?, } } else if let Some(aa) = &node_config.node.channel_archiver {