Add serde compat for ChannelEvents

This commit is contained in:
Dominik Werder
2022-11-11 20:03:55 +01:00
parent 8ab96e565e
commit 88fa03fb4a
6 changed files with 207 additions and 30 deletions

View File

@@ -50,6 +50,7 @@ pub const RANGE_COMPLETE_FRAME_TYPE_ID: u32 = 0xe00;
pub const EVENT_FULL_FRAME_TYPE_ID: u32 = 0x2200;
pub const EVENTS_ITEM_FRAME_TYPE_ID: u32 = 0x2300;
pub const STATS_EVENTS_FRAME_TYPE_ID: u32 = 0x2400;
pub const ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID: u32 = 0x2500;
pub const X_BINNED_SCALAR_EVENTS_FRAME_TYPE_ID: u32 = 0x8800;
pub const X_BINNED_WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0x8900;

View File

@@ -32,6 +32,10 @@ impl<NTY> EventsDim0<NTY> {
self.pulses.push_front(pulse);
self.values.push_front(value);
}
pub fn serde_id() -> &'static str {
"EventsDim0"
}
}
impl<NTY> Empty for EventsDim0<NTY> {
@@ -561,6 +565,14 @@ impl<NTY: ScalarOps> Events for EventsDim0<NTY> {
false
}
}
fn serde_id(&self) -> &'static str {
Self::serde_id()
}
fn nty_id(&self) -> u32 {
NTY::SUB
}
}
pub struct EventsDim0TimeBinner<NTY: ScalarOps> {

View File

@@ -8,6 +8,8 @@ use chrono::{DateTime, TimeZone, Utc};
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items::FrameTypeInnerStatic;
use items::SubFrId;
use netpod::log::*;
use netpod::timeunits::*;
use netpod::{AggKind, NanoRange, ScalarType, Shape};
@@ -100,7 +102,9 @@ impl_as_prim_f32!(i64);
impl_as_prim_f32!(f32);
impl_as_prim_f32!(f64);
pub trait ScalarOps: fmt::Debug + Clone + PartialOrd + AsPrimF32 + Serialize + Unpin + Send + 'static {
pub trait ScalarOps:
fmt::Debug + Clone + PartialOrd + SubFrId + AsPrimF32 + Serialize + Unpin + Send + 'static
{
fn zero() -> Self;
}
@@ -163,6 +167,17 @@ impl From<String> for Error {
}
}
impl std::error::Error for Error {}
impl serde::de::Error for Error {
fn custom<T>(msg: T) -> Self
where
T: fmt::Display,
{
format!("{msg}").into()
}
}
pub trait WithLen {
fn len(&self) -> usize;
}
@@ -262,6 +277,8 @@ pub trait Events: fmt::Debug + Any + Collectable + TimeBinnable + Send + erased_
fn ts_max(&self) -> Option<u64>;
fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box<dyn Events>;
fn partial_eq_dyn(&self, other: &dyn Events) -> bool;
fn serde_id(&self) -> &'static str;
fn nty_id(&self) -> u32;
}
erased_serde::serialize_trait_object!(Events);
@@ -460,13 +477,140 @@ impl<T: MergableEvents> MergableEvents for Box<T> {
}
}
#[derive(Debug, Serialize)]
/// Events on a channel consist not only of e.g. timestamped values, but can be also
/// connection status changes.
#[derive(Debug)]
pub enum ChannelEvents {
Events(Box<dyn Events>),
Status(ConnStatusEvent),
// TODO the RangeComplete event would probably fit better on some outer layer:
RangeComplete,
}
mod serde_channel_events {
use super::{ChannelEvents, Events};
use crate::eventsdim0::EventsDim0;
use serde::de::{self, EnumAccess, VariantAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt;
impl Serialize for ChannelEvents {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let name = "ChannelEvents";
match self {
ChannelEvents::Events(obj) => {
use serde::ser::SerializeTupleVariant;
let mut ser = serializer.serialize_tuple_variant(name, 0, "Events", 3)?;
ser.serialize_field(obj.serde_id())?;
ser.serialize_field(&obj.nty_id())?;
ser.serialize_field(obj)?;
ser.end()
}
ChannelEvents::Status(val) => serializer.serialize_newtype_variant(name, 1, "Status", val),
ChannelEvents::RangeComplete => serializer.serialize_unit_variant(name, 2, "RangeComplete"),
}
}
}
struct EventsBoxVisitor;
impl<'de> Visitor<'de> for EventsBoxVisitor {
type Value = Box<dyn Events>;
fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "Events object")
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: de::SeqAccess<'de>,
{
use items::SubFrId;
let e0: &str = seq.next_element()?.ok_or(de::Error::missing_field("ty .0"))?;
let e1: u32 = seq.next_element()?.ok_or(de::Error::missing_field("nty .1"))?;
if e0 == EventsDim0::<u8>::serde_id() {
match e1 {
f32::SUB => {
let obj: EventsDim0<f32> = seq.next_element()?.ok_or(de::Error::missing_field("obj .2"))?;
Ok(Box::new(obj))
}
_ => Err(de::Error::custom(&format!("unknown nty {e1}"))),
}
} else {
Err(de::Error::custom(&format!("unknown ty {e0}")))
}
}
}
pub struct ChannelEventsVisitor;
impl ChannelEventsVisitor {
fn name() -> &'static str {
"ChannelEvents"
}
fn allowed_variants() -> &'static [&'static str] {
&["Events", "Status", "RangeComplete"]
}
}
impl<'de> Visitor<'de> for ChannelEventsVisitor {
type Value = ChannelEvents;
fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "ChannelEvents")
}
fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
where
A: EnumAccess<'de>,
{
let (id, var) = data.variant()?;
match id {
"Events" => {
let c = var.tuple_variant(3, EventsBoxVisitor)?;
Ok(Self::Value::Events(c))
}
_ => return Err(de::Error::unknown_variant(id, Self::allowed_variants())),
}
}
}
impl<'de> Deserialize<'de> for ChannelEvents {
fn deserialize<D>(de: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
de.deserialize_enum(
ChannelEventsVisitor::name(),
ChannelEventsVisitor::allowed_variants(),
ChannelEventsVisitor,
)
}
}
}
#[cfg(test)]
mod test_channel_events_serde {
use super::ChannelEvents;
use crate::{eventsdim0::EventsDim0, Empty};
#[test]
fn channel_events() {
let mut evs = EventsDim0::empty();
evs.push(8, 2, 3.0f32);
evs.push(12, 3, 3.2f32);
let item = ChannelEvents::Events(Box::new(evs));
let s = serde_json::to_string_pretty(&item).unwrap();
eprintln!("{s}");
let w: ChannelEvents = serde_json::from_str(&s).unwrap();
eprintln!("{w:?}");
}
}
impl PartialEq for ChannelEvents {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
@@ -493,6 +637,10 @@ impl MergableEvents for ChannelEvents {
}
}
impl FrameTypeInnerStatic for ChannelEvents {
const FRAME_TYPE_ID: u32 = items::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID;
}
type MergeInp = Pin<Box<dyn Stream<Item = Result<ChannelEvents, Error>> + Send>>;
pub struct ChannelEventsMerger {

View File

@@ -62,9 +62,8 @@ impl fmt::Display for CacheUsage {
}
}
/**
Query parameters to request (optionally) X-processed, but not T-processed events.
*/
/// Query parameters to request (optionally) X-processed, but not T-processed events.
// TODO maybe merge with PlainEventsQuery?
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RawEventsQuery {
pub channel: Channel,
@@ -92,6 +91,10 @@ impl RawEventsQuery {
do_test_stream_error: false,
}
}
pub fn channel(&self) -> &Channel {
&self.channel
}
}
#[derive(Clone, Debug)]

View File

@@ -107,7 +107,31 @@ async fn events_conn_handler_inner_try(
}
let mut p1: Pin<Box<dyn Stream<Item = Box<dyn Framable + Send>> + Send>> =
if let Some(conf) = &node_config.node_config.cluster.scylla {
if evq.channel().backend() == "test-adhoc-dyn" {
use items_2::ChannelEvents;
use items_2::Empty;
use netpod::timeunits::MS;
let node_ix = node_config.ix;
if evq.channel().name() == "scalar-i32" {
let mut item = items_2::eventsdim0::EventsDim0::<f32>::empty();
let td = MS * 10;
let mut ts = MS * 17 + MS * td * node_ix as u64;
let mut pulse = 1 + node_ix as u64;
for _ in 0..20 {
item.push(ts, pulse, ts as _);
ts += 3 * td;
pulse += 3;
}
let item = ChannelEvents::Events(Box::new(item) as _);
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
let item = Box::new(item) as _;
let stream = futures_util::stream::iter([item]);
Box::pin(stream)
} else {
let stream = futures_util::stream::empty();
Box::pin(stream)
}
} else if let Some(conf) = &node_config.node_config.cluster.scylla {
// TODO depends in general on the query
// TODO why both in PlainEventsQuery and as separate parameter? Check other usages.
let do_one_before_range = false;

View File

@@ -1,14 +1,13 @@
use super::*;
use disk::eventchunker::EventFull;
use items::frame::make_frame;
use items::Sitemty;
use items_2::ChannelEvents;
use netpod::timeunits::SEC;
use netpod::{Channel, Cluster, Database, DiskIoTune, FileIoBufferSize, NanoRange, Node, NodeConfig, SfDatabuffer};
use netpod::{Channel, Cluster, Database, FileIoBufferSize, NanoRange, Node, NodeConfig, SfDatabuffer};
use tokio::net::TcpListener;
#[test]
fn raw_data_00() {
//taskrun::run(disk::gen::gen_test_data()).unwrap();
let fut = async {
let lis = TcpListener::bind("127.0.0.1:0").await.unwrap();
let mut con = TcpStream::connect(lis.local_addr().unwrap()).await.unwrap();
@@ -50,26 +49,16 @@ fn raw_data_00() {
},
ix: 0,
};
let qu = RawEventsQuery {
channel: Channel {
series: None,
backend: "testbackend".into(),
name: "scalar-i32-be".into(),
},
range: NanoRange {
beg: SEC,
end: SEC * 10,
},
agg_kind: AggKind::Plain,
disk_io_tune: DiskIoTune {
read_sys: netpod::ReadSys::TokioAsyncRead,
read_buffer_len: 1024 * 4,
read_queue_len: 1,
},
do_decompress: true,
do_test_main_error: false,
do_test_stream_error: false,
let channel = Channel {
series: None,
backend: "test-adhoc-dyn".into(),
name: "scalar-i32".into(),
};
let range = NanoRange {
beg: SEC,
end: SEC * 10,
};
let qu = RawEventsQuery::new(channel, range, AggKind::Plain);
let query = EventQueryJsonStringFrame(serde_json::to_string(&qu).unwrap());
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(query)));
let frame = make_frame(&item).unwrap();
@@ -85,14 +74,14 @@ fn raw_data_00() {
Ok(frame) => match frame {
StreamItem::DataItem(k) => {
eprintln!("{k:?}");
if k.tyid() == items::EVENT_FULL_FRAME_TYPE_ID {
if k.tyid() == items::ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID {
} else if k.tyid() == items::ERROR_FRAME_TYPE_ID {
} else if k.tyid() == items::LOG_FRAME_TYPE_ID {
} else if k.tyid() == items::STATS_FRAME_TYPE_ID {
} else {
panic!("unexpected frame type id {:x}", k.tyid());
}
let item: Result<Sitemty<EventFull>, Error> = decode_frame(&k);
let item: Sitemty<ChannelEvents> = decode_frame(&k).unwrap();
eprintln!("decoded: {:?}", item);
}
StreamItem::Log(_) => todo!(),