diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 947f754..3f64cb2 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -7,6 +7,8 @@ use futures_util::{Stream, StreamExt}; use items::eventfull::EventFull; use items::numops::{BoolNum, NumOps, StringNum}; use items::{EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, StreamItem}; +use items_2::channelevents::ChannelEvents; +use items_2::eventsdim0::EventsDim0; use netpod::log::*; use netpod::query::RawEventsQuery; use netpod::{AggKind, ByteOrder, ByteSize, Channel, DiskIoTune, NanoRange, NodeConfigCached, ScalarType, Shape}; @@ -19,7 +21,7 @@ fn make_num_pipeline_stream_evs( event_value_shape: EVS, events_node_proc: ENP, event_blobs: EventChunkerMultifile, -) -> Pin> + Send>> +) -> Pin> + Send>> where NTY: NumOps + NumFromBytes + 'static, END: Endianness + 'static, @@ -33,17 +35,35 @@ where Ok(item) => match item { StreamItem::DataItem(item) => match item { RangeCompletableItem::Data(item) => { - let item = events_node_proc.process(item); + // TODO fix super ugly slow glue code use items::EventsNodeProcessorOutput; - let parts = item.into_parts::(); - let item = items_2::eventsdim0::EventsDim0 { - tss: parts.1, - pulses: VecDeque::new(), - values: parts.0, - }; - let item = Box::new(item) as Box; - //Ok(StreamItem::DataItem(RangeCompletableItem::Data(todo!()))) - Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) + let mut item = events_node_proc.process(item); + if let Some(item) = item + .as_any_mut() + .downcast_mut::>() + { + warn!("ScalarEvents"); + let tss: VecDeque = item.tss.iter().map(|x| *x).collect(); + let pulses: VecDeque = item.pulses.iter().map(|x| *x).collect(); + let values: VecDeque = item.values.iter().map(|x| x.clone()).collect(); + let item = EventsDim0 { tss, pulses, values }; + let item = ChannelEvents::Events(Box::new(item)); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) + } else { + if let Some(item) = item.as_any_mut().downcast_mut::>() { + warn!("WaveEvents"); + let _tss: VecDeque = item.tss.iter().map(|x| *x).collect(); + let _pulses: VecDeque = item.pulses.iter().map(|x| *x).collect(); + let _values: VecDeque> = item.vals.iter().map(|x| x.clone()).collect(); + //let item = EventsDim1 { tss, pulses, values }; + //let item = ChannelEvents::Events(Box::new(item)); + //Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) + } else { + error!("TODO bad, no idea what this item is"); + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) + } + } } RangeCompletableItem::RangeComplete => Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)), }, @@ -51,8 +71,7 @@ where StreamItem::Stats(item) => Ok(StreamItem::Stats(item)), }, Err(e) => Err(e), - }) - .map(|item| Box::new(item) as Box); + }); Box::pin(s2) } @@ -150,7 +169,7 @@ macro_rules! pipe1 { pub async fn make_event_pipe( evq: &RawEventsQuery, node_config: &NodeConfigCached, -) -> Result> + Send>>, Error> { +) -> Result> + Send>>, Error> { if false { match dbconn::channel_exists(&evq.channel, &node_config).await { Ok(_) => (), diff --git a/items/src/items.rs b/items/src/items.rs index 06158a1..c4c05a1 100644 --- a/items/src/items.rs +++ b/items/src/items.rs @@ -368,7 +368,8 @@ impl FrameType for EventQueryJsonStringFrame { pub trait EventsNodeProcessorOutput: Send + Unpin + DeserializeOwned + WithTimestamps + TimeBinnableType + ByteEstimate { - fn into_parts(self) -> (VecDeque, VecDeque); + fn as_any_mut(&mut self) -> &mut dyn Any; + fn into_parts(self) -> (Box, VecDeque, VecDeque); } pub trait EventsNodeProcessor: Send + Unpin { diff --git a/items/src/scalarevents.rs b/items/src/scalarevents.rs index 6bb3c83..2e26270 100644 --- a/items/src/scalarevents.rs +++ b/items/src/scalarevents.rs @@ -827,7 +827,15 @@ impl EventsNodeProcessorOutput for ScalarEvents where NTY: NumOps, { - fn into_parts(self) -> (VecDeque, VecDeque) { - todo!() + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + fn into_parts(self) -> (Box, VecDeque, VecDeque) { + ( + Box::new(VecDeque::from(self.values)), + self.tss.into(), + self.pulses.into(), + ) } } diff --git a/items/src/statsevents.rs b/items/src/statsevents.rs index a6844d9..bd0045f 100644 --- a/items/src/statsevents.rs +++ b/items/src/statsevents.rs @@ -8,6 +8,7 @@ use err::Error; use netpod::log::*; use netpod::{NanoRange, Shape}; use serde::{Deserialize, Serialize}; +use std::any::Any; use std::collections::VecDeque; use std::fmt; use tokio::fs::File; @@ -423,7 +424,11 @@ impl EventAppendable for StatsEvents { } impl EventsNodeProcessorOutput for StatsEvents { - fn into_parts(self) -> (VecDeque, VecDeque) { + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + fn into_parts(self) -> (Box, VecDeque, VecDeque) { todo!() } } diff --git a/items/src/waveevents.rs b/items/src/waveevents.rs index 884910d..fe4ee51 100644 --- a/items/src/waveevents.rs +++ b/items/src/waveevents.rs @@ -12,6 +12,7 @@ use items_0::subfr::SubFrId; use netpod::log::*; use netpod::{x_bin_count, AggKind, NanoRange, Shape}; use serde::{Deserialize, Serialize}; +use std::any::Any; use std::collections::VecDeque; use std::marker::PhantomData; use tokio::fs::File; @@ -540,7 +541,11 @@ impl EventsNodeProcessorOutput for WaveEvents where NTY: NumOps, { - fn into_parts(self) -> (VecDeque, VecDeque) { + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + fn into_parts(self) -> (Box, VecDeque, VecDeque) { todo!() } } diff --git a/items/src/xbinnedscalarevents.rs b/items/src/xbinnedscalarevents.rs index ed9f131..7e68845 100644 --- a/items/src/xbinnedscalarevents.rs +++ b/items/src/xbinnedscalarevents.rs @@ -1,3 +1,4 @@ +use std::any::Any; use std::collections::VecDeque; use crate::binsdim0::MinMaxAvgDim0Bins; @@ -509,7 +510,11 @@ impl EventsNodeProcessorOutput for XBinnedScalarEvents where NTY: NumOps, { - fn into_parts(self) -> (VecDeque, VecDeque) { + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + fn into_parts(self) -> (Box, VecDeque, VecDeque) { todo!() } } diff --git a/items/src/xbinnedwaveevents.rs b/items/src/xbinnedwaveevents.rs index c67f144..af06bff 100644 --- a/items/src/xbinnedwaveevents.rs +++ b/items/src/xbinnedwaveevents.rs @@ -12,6 +12,7 @@ use netpod::log::*; use netpod::timeunits::*; use netpod::{NanoRange, Shape}; use serde::{Deserialize, Serialize}; +use std::any::Any; use std::collections::VecDeque; use std::mem; use tokio::fs::File; @@ -539,7 +540,11 @@ impl EventsNodeProcessorOutput for XBinnedWaveEvents where NTY: NumOps, { - fn into_parts(self) -> (VecDeque, VecDeque) { + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + fn into_parts(self) -> (Box, VecDeque, VecDeque) { todo!() } } diff --git a/nodenet/Cargo.toml b/nodenet/Cargo.toml index 56b7ba0..6483beb 100644 --- a/nodenet/Cargo.toml +++ b/nodenet/Cargo.toml @@ -19,7 +19,6 @@ bytes = "1.0.1" crc32fast = "1.2.1" arrayref = "0.3.6" byteorder = "1.4.3" -futures-core = "0.3.14" futures-util = "0.3.14" tracing = "0.1.25" hex = "0.4.3" diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 0aba3e0..b61ef3b 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -1,8 +1,8 @@ use err::Error; -use futures_core::Stream; -use futures_util::StreamExt; +use futures_util::{stream, Stream, StreamExt}; use items::frame::{decode_frame, make_term_frame}; use items::{EventQueryJsonStringFrame, Framable, RangeCompletableItem, Sitemty, StreamItem}; +use items_0::Empty; use items_2::channelevents::ChannelEvents; use netpod::histo::HistoLog2; use netpod::log::*; @@ -107,107 +107,114 @@ async fn events_conn_handler_inner_try( return Err((e, netout).into()); } - let mut p1: Pin> + Send>> = - if evq.channel().backend() == "test-inmem" { - warn!("TEST BACKEND DATA"); - use items_0::Empty; - use netpod::timeunits::MS; - let node_count = node_config.node_config.cluster.nodes.len(); - let node_ix = node_config.ix; - if evq.channel().name() == "inmem-d0-i32" { - let mut item = items_2::eventsdim0::EventsDim0::::empty(); - let td = MS * 10; - for i in 0..20 { - let ts = MS * 17 + td * node_ix as u64 + td * node_count as u64 * i; - let pulse = 1 + node_ix as u64 + node_count as u64 * i; - item.push(ts, pulse, pulse as _); - } - let item = ChannelEvents::Events(Box::new(item) as _); - let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); - let item = Box::new(item) as _; - let stream = futures_util::stream::iter([item]); - Box::pin(stream) - } else if evq.channel().name() == "inmem-d0-f32" { - let mut item = items_2::eventsdim0::EventsDim0::::empty(); - let td = MS * 10; - for i in 0..20 { - let ts = MS * 17 + td * node_ix as u64 + td * node_count as u64 * i; - let pulse = 1 + node_ix as u64 + node_count as u64 * i; - item.push(ts, pulse, ts as _); - } - let item = ChannelEvents::Events(Box::new(item) as _); - let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); - let item = Box::new(item) as _; - let stream = futures_util::stream::iter([item]); - Box::pin(stream) - } else { - let stream = futures_util::stream::empty(); - Box::pin(stream) + let mut p1: Pin> + Send>> = if evq.channel().backend() == "test-inmem" + { + warn!("TEST BACKEND DATA"); + use netpod::timeunits::MS; + let node_count = node_config.node_config.cluster.nodes.len(); + let node_ix = node_config.ix; + if evq.channel().name() == "inmem-d0-i32" { + let mut item = items_2::eventsdim0::EventsDim0::::empty(); + let td = MS * 10; + for i in 0..20 { + let ts = MS * 17 + td * node_ix as u64 + td * node_count as u64 * i; + let pulse = 1 + node_ix as u64 + node_count as u64 * i; + item.push(ts, pulse, pulse as _); } - } else if let Some(conf) = &node_config.node_config.cluster.scylla { - // TODO depends in general on the query - // TODO why both in PlainEventsQuery and as separate parameter? Check other usages. - let do_one_before_range = false; - // TODO use better builder pattern with shortcuts for production and dev defaults - let qu = PlainEventsQuery::new(evq.channel, evq.range, 1024 * 8, None, true); - let scyco = conf; - let _dbconf = node_config.node_config.cluster.database.clone(); - let scy = match scyllaconn::create_scy_session(scyco).await { - Ok(k) => k, - Err(e) => return Err((e, netout))?, - }; - let series = err::todoval(); - let scalar_type = err::todoval(); - let shape = err::todoval(); - let do_test_stream_error = false; - let stream = match scyllaconn::events::make_scylla_stream( - &qu, - do_one_before_range, - series, - scalar_type, - shape, - scy, - do_test_stream_error, - ) - .await - { - Ok(k) => k, - Err(e) => return Err((e, netout))?, - }; - let s = stream.map(|item| { - let item = match item { - Ok(item) => match item { - ChannelEvents::Events(_item) => { - // TODO - let item = items::scalarevents::ScalarEvents::::empty(); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) - } - ChannelEvents::Status(_item) => todo!(), - }, - Err(e) => Err(e), - }; - Box::new(item) as Box - }); - Box::pin(s) - } else if let Some(_) = &node_config.node.channel_archiver { - let e = Error::with_msg_no_trace("archapp not built"); - return Err((e, netout))?; - } else if let Some(_) = &node_config.node.archiver_appliance { - let e = Error::with_msg_no_trace("archapp not built"); - return Err((e, netout))?; + let item = ChannelEvents::Events(Box::new(item) as _); + let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); + let stream = futures_util::stream::iter([item]); + Box::pin(stream) + } else if evq.channel().name() == "inmem-d0-f32" { + let mut item = items_2::eventsdim0::EventsDim0::::empty(); + let td = MS * 10; + for i in 0..20 { + let ts = MS * 17 + td * node_ix as u64 + td * node_count as u64 * i; + let pulse = 1 + node_ix as u64 + node_count as u64 * i; + item.push(ts, pulse, ts as _); + } + let item = ChannelEvents::Events(Box::new(item) as _); + let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); + let stream = futures_util::stream::iter([item]); + Box::pin(stream) } else { - let stream = match evq.agg_kind { - AggKind::EventBlobs => match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await { - Ok(j) => j, - Err(e) => return Err((e, netout))?, - }, - _ => match disk::raw::conn::make_event_pipe(&evq, node_config).await { - Ok(j) => j, - Err(e) => return Err((e, netout))?, - }, - }; - stream + let stream = futures_util::stream::empty(); + Box::pin(stream) + } + } else if let Some(conf) = &node_config.node_config.cluster.scylla { + // TODO depends in general on the query + // TODO why both in PlainEventsQuery and as separate parameter? Check other usages. + let do_one_before_range = false; + // TODO use better builder pattern with shortcuts for production and dev defaults + let qu = PlainEventsQuery::new(evq.channel, evq.range, 1024 * 8, None, true); + let scyco = conf; + let _dbconf = node_config.node_config.cluster.database.clone(); + let scy = match scyllaconn::create_scy_session(scyco).await { + Ok(k) => k, + Err(e) => return Err((e, netout))?, }; + let series = err::todoval(); + let scalar_type = err::todoval(); + let shape = err::todoval(); + let do_test_stream_error = false; + let stream = match scyllaconn::events::make_scylla_stream( + &qu, + do_one_before_range, + series, + scalar_type, + shape, + scy, + do_test_stream_error, + ) + .await + { + Ok(k) => k, + Err(e) => return Err((e, netout))?, + }; + let e = Error::with_msg_no_trace("TODO scylla events"); + if true { + return Err((e, netout))?; + } + let _s = stream.map(|item| { + let item = match item { + Ok(item) => match item { + ChannelEvents::Events(_item) => { + // TODO + let item = items_2::eventsdim0::EventsDim0::::empty(); + let item = ChannelEvents::Events(Box::new(item)); + let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); + item + } + ChannelEvents::Status(_item) => todo!(), + }, + Err(e) => Err(e), + }; + Box::new(item) as Box + }); + let s = stream::empty(); + Box::pin(s) + } else if let Some(_) = &node_config.node.channel_archiver { + let e = Error::with_msg_no_trace("archapp not built"); + return Err((e, netout))?; + } else if let Some(_) = &node_config.node.archiver_appliance { + let e = Error::with_msg_no_trace("archapp not built"); + return Err((e, netout))?; + } else { + let stream = match evq.agg_kind { + AggKind::EventBlobs => match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await { + Ok(_stream) => { + let e = Error::with_msg_no_trace("TODO make_event_blobs_pipe"); + return Err((e, netout))?; + } + Err(e) => return Err((e, netout))?, + }, + _ => match disk::raw::conn::make_event_pipe(&evq, node_config).await { + Ok(j) => j, + Err(e) => return Err((e, netout))?, + }, + }; + stream + }; let mut buf_len_histo = HistoLog2::new(5); while let Some(item) = p1.next().await { let item = item.make_frame();