diff --git a/items/src/items.rs b/items/src/items.rs index feda325..c4a17e1 100644 --- a/items/src/items.rs +++ b/items/src/items.rs @@ -50,6 +50,7 @@ pub const RANGE_COMPLETE_FRAME_TYPE_ID: u32 = 0xe00; pub const EVENT_FULL_FRAME_TYPE_ID: u32 = 0x2200; pub const EVENTS_ITEM_FRAME_TYPE_ID: u32 = 0x2300; pub const STATS_EVENTS_FRAME_TYPE_ID: u32 = 0x2400; +pub const ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID: u32 = 0x2500; pub const X_BINNED_SCALAR_EVENTS_FRAME_TYPE_ID: u32 = 0x8800; pub const X_BINNED_WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0x8900; diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index b455f3f..a93b26f 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -32,6 +32,10 @@ impl EventsDim0 { self.pulses.push_front(pulse); self.values.push_front(value); } + + pub fn serde_id() -> &'static str { + "EventsDim0" + } } impl Empty for EventsDim0 { @@ -561,6 +565,14 @@ impl Events for EventsDim0 { false } } + + fn serde_id(&self) -> &'static str { + Self::serde_id() + } + + fn nty_id(&self) -> u32 { + NTY::SUB + } } pub struct EventsDim0TimeBinner { diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index aec6c7c..6b538be 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -8,6 +8,8 @@ use chrono::{DateTime, TimeZone, Utc}; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; +use items::FrameTypeInnerStatic; +use items::SubFrId; use netpod::log::*; use netpod::timeunits::*; use netpod::{AggKind, NanoRange, ScalarType, Shape}; @@ -100,7 +102,9 @@ impl_as_prim_f32!(i64); impl_as_prim_f32!(f32); impl_as_prim_f32!(f64); -pub trait ScalarOps: fmt::Debug + Clone + PartialOrd + AsPrimF32 + Serialize + Unpin + Send + 'static { +pub trait ScalarOps: + fmt::Debug + Clone + PartialOrd + SubFrId + AsPrimF32 + Serialize + Unpin + Send + 'static +{ fn zero() -> Self; } @@ -163,6 +167,17 @@ impl From for Error { } } +impl std::error::Error for Error {} + +impl serde::de::Error for Error { + fn custom(msg: T) -> Self + where + T: fmt::Display, + { + format!("{msg}").into() + } +} + pub trait WithLen { fn len(&self) -> usize; } @@ -262,6 +277,8 @@ pub trait Events: fmt::Debug + Any + Collectable + TimeBinnable + Send + erased_ fn ts_max(&self) -> Option; fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box; fn partial_eq_dyn(&self, other: &dyn Events) -> bool; + fn serde_id(&self) -> &'static str; + fn nty_id(&self) -> u32; } erased_serde::serialize_trait_object!(Events); @@ -460,13 +477,140 @@ impl MergableEvents for Box { } } -#[derive(Debug, Serialize)] +/// Events on a channel consist not only of e.g. timestamped values, but can be also +/// connection status changes. +#[derive(Debug)] pub enum ChannelEvents { Events(Box), Status(ConnStatusEvent), + // TODO the RangeComplete event would probably fit better on some outer layer: RangeComplete, } +mod serde_channel_events { + use super::{ChannelEvents, Events}; + use crate::eventsdim0::EventsDim0; + use serde::de::{self, EnumAccess, VariantAccess, Visitor}; + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + use std::fmt; + + impl Serialize for ChannelEvents { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let name = "ChannelEvents"; + match self { + ChannelEvents::Events(obj) => { + use serde::ser::SerializeTupleVariant; + let mut ser = serializer.serialize_tuple_variant(name, 0, "Events", 3)?; + ser.serialize_field(obj.serde_id())?; + ser.serialize_field(&obj.nty_id())?; + ser.serialize_field(obj)?; + ser.end() + } + ChannelEvents::Status(val) => serializer.serialize_newtype_variant(name, 1, "Status", val), + ChannelEvents::RangeComplete => serializer.serialize_unit_variant(name, 2, "RangeComplete"), + } + } + } + + struct EventsBoxVisitor; + + impl<'de> Visitor<'de> for EventsBoxVisitor { + type Value = Box; + + fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "Events object") + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: de::SeqAccess<'de>, + { + use items::SubFrId; + let e0: &str = seq.next_element()?.ok_or(de::Error::missing_field("ty .0"))?; + let e1: u32 = seq.next_element()?.ok_or(de::Error::missing_field("nty .1"))?; + if e0 == EventsDim0::::serde_id() { + match e1 { + f32::SUB => { + let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("obj .2"))?; + Ok(Box::new(obj)) + } + _ => Err(de::Error::custom(&format!("unknown nty {e1}"))), + } + } else { + Err(de::Error::custom(&format!("unknown ty {e0}"))) + } + } + } + + pub struct ChannelEventsVisitor; + + impl ChannelEventsVisitor { + fn name() -> &'static str { + "ChannelEvents" + } + + fn allowed_variants() -> &'static [&'static str] { + &["Events", "Status", "RangeComplete"] + } + } + + impl<'de> Visitor<'de> for ChannelEventsVisitor { + type Value = ChannelEvents; + + fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "ChannelEvents") + } + + fn visit_enum(self, data: A) -> Result + where + A: EnumAccess<'de>, + { + let (id, var) = data.variant()?; + match id { + "Events" => { + let c = var.tuple_variant(3, EventsBoxVisitor)?; + Ok(Self::Value::Events(c)) + } + _ => return Err(de::Error::unknown_variant(id, Self::allowed_variants())), + } + } + } + + impl<'de> Deserialize<'de> for ChannelEvents { + fn deserialize(de: D) -> Result + where + D: Deserializer<'de>, + { + de.deserialize_enum( + ChannelEventsVisitor::name(), + ChannelEventsVisitor::allowed_variants(), + ChannelEventsVisitor, + ) + } + } +} + +#[cfg(test)] +mod test_channel_events_serde { + use super::ChannelEvents; + use crate::{eventsdim0::EventsDim0, Empty}; + + #[test] + fn channel_events() { + let mut evs = EventsDim0::empty(); + evs.push(8, 2, 3.0f32); + evs.push(12, 3, 3.2f32); + let item = ChannelEvents::Events(Box::new(evs)); + let s = serde_json::to_string_pretty(&item).unwrap(); + eprintln!("{s}"); + let w: ChannelEvents = serde_json::from_str(&s).unwrap(); + eprintln!("{w:?}"); + } +} + impl PartialEq for ChannelEvents { fn eq(&self, other: &Self) -> bool { match (self, other) { @@ -493,6 +637,10 @@ impl MergableEvents for ChannelEvents { } } +impl FrameTypeInnerStatic for ChannelEvents { + const FRAME_TYPE_ID: u32 = items::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID; +} + type MergeInp = Pin> + Send>>; pub struct ChannelEventsMerger { diff --git a/netpod/src/query.rs b/netpod/src/query.rs index 905eb49..6837d89 100644 --- a/netpod/src/query.rs +++ b/netpod/src/query.rs @@ -62,9 +62,8 @@ impl fmt::Display for CacheUsage { } } -/** -Query parameters to request (optionally) X-processed, but not T-processed events. -*/ +/// Query parameters to request (optionally) X-processed, but not T-processed events. +// TODO maybe merge with PlainEventsQuery? #[derive(Clone, Debug, Serialize, Deserialize)] pub struct RawEventsQuery { pub channel: Channel, @@ -92,6 +91,10 @@ impl RawEventsQuery { do_test_stream_error: false, } } + + pub fn channel(&self) -> &Channel { + &self.channel + } } #[derive(Clone, Debug)] diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 870aa85..65c8753 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -107,7 +107,31 @@ async fn events_conn_handler_inner_try( } let mut p1: Pin> + Send>> = - if let Some(conf) = &node_config.node_config.cluster.scylla { + if evq.channel().backend() == "test-adhoc-dyn" { + use items_2::ChannelEvents; + use items_2::Empty; + use netpod::timeunits::MS; + let node_ix = node_config.ix; + if evq.channel().name() == "scalar-i32" { + let mut item = items_2::eventsdim0::EventsDim0::::empty(); + let td = MS * 10; + let mut ts = MS * 17 + MS * td * node_ix as u64; + let mut pulse = 1 + node_ix as u64; + for _ in 0..20 { + item.push(ts, pulse, ts as _); + ts += 3 * td; + pulse += 3; + } + let item = ChannelEvents::Events(Box::new(item) as _); + let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); + let item = Box::new(item) as _; + let stream = futures_util::stream::iter([item]); + Box::pin(stream) + } else { + let stream = futures_util::stream::empty(); + Box::pin(stream) + } + } else if let Some(conf) = &node_config.node_config.cluster.scylla { // TODO depends in general on the query // TODO why both in PlainEventsQuery and as separate parameter? Check other usages. let do_one_before_range = false; diff --git a/nodenet/src/conn/test.rs b/nodenet/src/conn/test.rs index 1cd2ac5..b421388 100644 --- a/nodenet/src/conn/test.rs +++ b/nodenet/src/conn/test.rs @@ -1,14 +1,13 @@ use super::*; -use disk::eventchunker::EventFull; use items::frame::make_frame; use items::Sitemty; +use items_2::ChannelEvents; use netpod::timeunits::SEC; -use netpod::{Channel, Cluster, Database, DiskIoTune, FileIoBufferSize, NanoRange, Node, NodeConfig, SfDatabuffer}; +use netpod::{Channel, Cluster, Database, FileIoBufferSize, NanoRange, Node, NodeConfig, SfDatabuffer}; use tokio::net::TcpListener; #[test] fn raw_data_00() { - //taskrun::run(disk::gen::gen_test_data()).unwrap(); let fut = async { let lis = TcpListener::bind("127.0.0.1:0").await.unwrap(); let mut con = TcpStream::connect(lis.local_addr().unwrap()).await.unwrap(); @@ -50,26 +49,16 @@ fn raw_data_00() { }, ix: 0, }; - let qu = RawEventsQuery { - channel: Channel { - series: None, - backend: "testbackend".into(), - name: "scalar-i32-be".into(), - }, - range: NanoRange { - beg: SEC, - end: SEC * 10, - }, - agg_kind: AggKind::Plain, - disk_io_tune: DiskIoTune { - read_sys: netpod::ReadSys::TokioAsyncRead, - read_buffer_len: 1024 * 4, - read_queue_len: 1, - }, - do_decompress: true, - do_test_main_error: false, - do_test_stream_error: false, + let channel = Channel { + series: None, + backend: "test-adhoc-dyn".into(), + name: "scalar-i32".into(), }; + let range = NanoRange { + beg: SEC, + end: SEC * 10, + }; + let qu = RawEventsQuery::new(channel, range, AggKind::Plain); let query = EventQueryJsonStringFrame(serde_json::to_string(&qu).unwrap()); let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(query))); let frame = make_frame(&item).unwrap(); @@ -85,14 +74,14 @@ fn raw_data_00() { Ok(frame) => match frame { StreamItem::DataItem(k) => { eprintln!("{k:?}"); - if k.tyid() == items::EVENT_FULL_FRAME_TYPE_ID { + if k.tyid() == items::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID { } else if k.tyid() == items::ERROR_FRAME_TYPE_ID { } else if k.tyid() == items::LOG_FRAME_TYPE_ID { } else if k.tyid() == items::STATS_FRAME_TYPE_ID { } else { panic!("unexpected frame type id {:x}", k.tyid()); } - let item: Result, Error> = decode_frame(&k); + let item: Sitemty = decode_frame(&k).unwrap(); eprintln!("decoded: {:?}", item); } StreamItem::Log(_) => todo!(),