From b453d612501dda4111a48b1dd00a94aa11f5262c Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 10 Feb 2023 12:37:39 +0100 Subject: [PATCH] WIP --- items_0/src/items_0.rs | 3 +- items_2/src/channelevents.rs | 89 +++++++++++++++++++--------------- items_2/src/eventsdim0.rs | 26 ---------- items_2/src/eventsdim1.rs | 26 ---------- items_2/src/items_2.rs | 10 ---- items_2/src/merger.rs | 18 +++---- items_2/src/test.rs | 64 +++++++++++++++++------- streams/Cargo.toml | 1 + streams/src/plaineventsjson.rs | 11 +++++ streams/src/test/timebin.rs | 4 +- 10 files changed, 120 insertions(+), 132 deletions(-) diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs index 5bdc346..ab8b499 100644 --- a/items_0/src/items_0.rs +++ b/items_0/src/items_0.rs @@ -151,9 +151,8 @@ pub trait Events: fn as_collectable_with_default_mut(&mut self) -> &mut dyn CollectableWithDefault; fn ts_min(&self) -> Option; fn ts_max(&self) -> Option; + // TODO is this used? fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box; - fn move_into_fresh(&mut self, ts_end: u64) -> Box; - fn move_into_existing(&mut self, tgt: &mut Box, ts_end: u64) -> Result<(), ()>; fn new_empty(&self) -> Box; fn drain_into(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), ()>; fn find_lowest_index_gt(&self, ts: u64) -> Option; diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index d426578..1234ad2 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -85,7 +85,7 @@ impl ChannelStatusEvent { #[derive(Debug)] pub enum ChannelEvents { Events(Box), - Status(ConnStatusEvent), + Status(Option), } impl FrameTypeInnerStatic for ChannelEvents { @@ -342,7 +342,7 @@ mod serde_channel_events { Ok(Self::Value::Events(x.0)) } VarId::Status => { - let x: ConnStatusEvent = var.newtype_variant()?; + let x: Option = var.newtype_variant()?; Ok(Self::Value::Status(x)) } } @@ -440,7 +440,7 @@ mod test_channel_events_serde { datetime: SystemTime::UNIX_EPOCH, status: crate::channelevents::ConnStatus::Connect, }; - let item = ChannelEvents::Status(status); + let item = ChannelEvents::Status(Some(status)); let opts = bincode_opts(); let mut out = Vec::new(); let mut ser = bincode::Serializer::new(&mut out, opts); @@ -453,7 +453,11 @@ mod test_channel_events_serde { } else { panic!() }; - assert_eq!(item.ts, 567); + if let Some(item) = item { + assert_eq!(item.ts, 567); + } else { + panic!() + } } } @@ -472,7 +476,10 @@ impl MergeableCev for ChannelEvents { use ChannelEvents::*; match self { Events(k) => k.ts_min(), - Status(k) => Some(k.ts), + Status(k) => match k { + Some(k) => Some(k.ts), + None => None, + }, } } @@ -486,40 +493,29 @@ impl crate::merger::Mergeable for ChannelEvents { fn len(&self) -> usize { match self { ChannelEvents::Events(k) => k.len(), - ChannelEvents::Status(_) => 1, + ChannelEvents::Status(k) => match k { + Some(_) => 1, + None => 0, + }, } } fn ts_min(&self) -> Option { match self { ChannelEvents::Events(k) => k.ts_min(), - ChannelEvents::Status(k) => Some(k.ts), + ChannelEvents::Status(k) => match k { + Some(k) => Some(k.ts), + None => None, + }, } } fn ts_max(&self) -> Option { match self { ChannelEvents::Events(k) => k.ts_max(), - ChannelEvents::Status(k) => Some(k.ts), - } - } - - fn move_into_fresh(&mut self, ts_end: u64) -> Self { - match self { - ChannelEvents::Events(k) => ChannelEvents::Events(k.move_into_fresh(ts_end)), - ChannelEvents::Status(k) => ChannelEvents::Status(k.clone()), - } - } - - fn move_into_existing(&mut self, tgt: &mut Self, ts_end: u64) -> Result<(), merger::MergeError> { - match self { - ChannelEvents::Events(k) => match tgt { - ChannelEvents::Events(tgt) => k.move_into_existing(tgt, ts_end), - ChannelEvents::Status(_) => Err(merger::MergeError::NotCompatible), - }, - ChannelEvents::Status(_) => match tgt { - ChannelEvents::Events(_) => Err(merger::MergeError::NotCompatible), - ChannelEvents::Status(_) => Err(merger::MergeError::Full), + ChannelEvents::Status(k) => match k { + Some(k) => Some(k.ts), + None => None, }, } } @@ -539,11 +535,13 @@ impl crate::merger::Mergeable for ChannelEvents { }, ChannelEvents::Status(k) => match dst { ChannelEvents::Events(_) => Err(merger::MergeError::NotCompatible), - ChannelEvents::Status(j) => { - // TODO must have some empty-value for the status container. - *j = k.clone(); - Ok(()) - } + ChannelEvents::Status(j) => match j { + Some(_) => Err(merger::MergeError::Full), + None => { + *j = k.take(); + Ok(()) + } + }, }, } } @@ -552,8 +550,12 @@ impl crate::merger::Mergeable for ChannelEvents { match self { ChannelEvents::Events(k) => k.find_lowest_index_gt(ts), ChannelEvents::Status(k) => { - if k.ts > ts { - Some(0) + if let Some(k) = k { + if k.ts > ts { + Some(0) + } else { + None + } } else { None } @@ -565,8 +567,12 @@ impl crate::merger::Mergeable for ChannelEvents { match self { ChannelEvents::Events(k) => k.find_lowest_index_ge(ts), ChannelEvents::Status(k) => { - if k.ts >= ts { - Some(0) + if let Some(k) = k { + if k.ts >= ts { + Some(0) + } else { + None + } } else { None } @@ -578,8 +584,12 @@ impl crate::merger::Mergeable for ChannelEvents { match self { ChannelEvents::Events(k) => k.find_highest_index_lt(ts), ChannelEvents::Status(k) => { - if k.ts < ts { - Some(0) + if let Some(k) = k { + if k.ts < ts { + Some(0) + } else { + None + } } else { None } @@ -686,9 +696,10 @@ impl crate::timebin::TimeBinnable for ChannelEvents { type TimeBinner = ChannelEventsTimeBinner; fn time_binner_new(&self, edges: Vec, do_time_weight: bool) -> Self::TimeBinner { + // TODO probably wrong? let (binner, status) = match self { ChannelEvents::Events(_events) => (None, ConnStatus::Connect), - ChannelEvents::Status(status) => (None, status.status.clone()), + ChannelEvents::Status(_status) => (None, ConnStatus::Connect), }; ChannelEventsTimeBinner { edges, diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 2274626..54b5e14 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -778,32 +778,6 @@ impl Events for EventsDim0 { Box::new(ret) } - fn move_into_fresh(&mut self, ts_end: u64) -> Box { - // TODO improve the search - let n1 = self.tss.iter().take_while(|&&x| x <= ts_end).count(); - let tss = self.tss.drain(..n1).collect(); - let pulses = self.pulses.drain(..n1).collect(); - let values = self.values.drain(..n1).collect(); - let ret = Self { tss, pulses, values }; - Box::new(ret) - } - - fn move_into_existing(&mut self, tgt: &mut Box, ts_end: u64) -> Result<(), ()> { - // TODO as_any and as_any_mut are declared on unrealted traits. Simplify. - if let Some(tgt) = tgt.as_mut().as_any_mut().downcast_mut::() { - // TODO improve the search - let n1 = self.tss.iter().take_while(|&&x| x <= ts_end).count(); - // TODO make it harder to forget new members when the struct may get modified in the future - tgt.tss.extend(self.tss.drain(..n1)); - tgt.pulses.extend(self.pulses.drain(..n1)); - tgt.values.extend(self.values.drain(..n1)); - Ok(()) - } else { - eprintln!("downcast to EventsDim0 FAILED"); - Err(()) - } - } - fn new_empty(&self) -> Box { Box::new(Self::empty()) } diff --git a/items_2/src/eventsdim1.rs b/items_2/src/eventsdim1.rs index 21b8dd6..4e55073 100644 --- a/items_2/src/eventsdim1.rs +++ b/items_2/src/eventsdim1.rs @@ -730,32 +730,6 @@ impl Events for EventsDim1 { Box::new(ret) } - fn move_into_fresh(&mut self, ts_end: u64) -> Box { - // TODO improve the search - let n1 = self.tss.iter().take_while(|&&x| x <= ts_end).count(); - let tss = self.tss.drain(..n1).collect(); - let pulses = self.pulses.drain(..n1).collect(); - let values = self.values.drain(..n1).collect(); - let ret = Self { tss, pulses, values }; - Box::new(ret) - } - - fn move_into_existing(&mut self, tgt: &mut Box, ts_end: u64) -> Result<(), ()> { - // TODO as_any and as_any_mut are declared on unrealted traits. Simplify. - if let Some(tgt) = tgt.as_mut().as_any_mut().downcast_mut::() { - // TODO improve the search - let n1 = self.tss.iter().take_while(|&&x| x <= ts_end).count(); - // TODO make it harder to forget new members when the struct may get modified in the future - tgt.tss.extend(self.tss.drain(..n1)); - tgt.pulses.extend(self.pulses.drain(..n1)); - tgt.values.extend(self.values.drain(..n1)); - Ok(()) - } else { - eprintln!("downcast to EventsDim0 FAILED"); - Err(()) - } - } - fn new_empty(&self) -> Box { Box::new(Self::empty()) } diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 6b3bbe5..d16d443 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -182,16 +182,6 @@ impl crate::merger::Mergeable for Box { self.as_ref().ts_max() } - fn move_into_fresh(&mut self, ts_end: u64) -> Self { - self.as_mut().move_into_fresh(ts_end) - } - - fn move_into_existing(&mut self, tgt: &mut Self, ts_end: u64) -> Result<(), merger::MergeError> { - self.as_mut() - .move_into_existing(tgt, ts_end) - .map_err(|()| merger::MergeError::NotCompatible) - } - fn new_empty(&self) -> Self { self.as_ref().new_empty() } diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs index 59376fb..c05e99e 100644 --- a/items_2/src/merger.rs +++ b/items_2/src/merger.rs @@ -1,13 +1,17 @@ use crate::Error; -use futures_util::{Stream, StreamExt}; +use futures_util::Stream; +use futures_util::StreamExt; use items::sitem_data; -use items::{RangeCompletableItem, Sitemty, StreamItem}; +use items::RangeCompletableItem; +use items::Sitemty; +use items::StreamItem; use netpod::log::*; use std::collections::VecDeque; use std::fmt; -use std::ops::{ControlFlow, RangeBounds}; +use std::ops::ControlFlow; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::Context; +use std::task::Poll; #[allow(unused)] macro_rules! trace2 { @@ -43,12 +47,6 @@ pub trait Mergeable: fmt::Debug + Unpin { fn len(&self) -> usize; fn ts_min(&self) -> Option; fn ts_max(&self) -> Option; - - // TODO remove, superseded. - fn move_into_fresh(&mut self, ts_end: u64) -> Rhs; - fn move_into_existing(&mut self, tgt: &mut Rhs, ts_end: u64) -> Result<(), MergeError>; - - // TODO: split the logic into: make fresh container, and a single drain_into method. Or is there any advantage in having both? fn new_empty(&self) -> Self; fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError>; fn find_lowest_index_gt(&self, ts: u64) -> Option; diff --git a/items_2/src/test.rs b/items_2/src/test.rs index 7961b4d..a35dc9b 100644 --- a/items_2/src/test.rs +++ b/items_2/src/test.rs @@ -1,18 +1,33 @@ +use crate::binned_collected; use crate::binsdim0::BinsDim0CollectedResult; -use crate::channelevents::{ConnStatus, ConnStatusEvent}; +use crate::channelevents::ConnStatus; +use crate::channelevents::ConnStatusEvent; use crate::eventsdim0::EventsDim0; -use crate::merger::{Mergeable, Merger}; +use crate::merger::Mergeable; +use crate::merger::Merger; use crate::merger_cev::ChannelEventsMerger; +use crate::runfut; use crate::testgen::make_some_boxed_d0_f32; +use crate::ChannelEvents; use crate::Error; -use crate::{binned_collected, runfut, ChannelEvents, Events, IsoDateTime}; -use chrono::{TimeZone, Utc}; -use futures_util::{stream, StreamExt}; -use items::{sitem_data, RangeCompletableItem, Sitemty, StreamItem}; +use crate::Events; +use crate::IsoDateTime; +use chrono::TimeZone; +use chrono::Utc; +use futures_util::stream; +use futures_util::StreamExt; +use items::sitem_data; +use items::RangeCompletableItem; +use items::Sitemty; +use items::StreamItem; use items_0::Empty; use netpod::log::*; use netpod::timeunits::*; -use netpod::{AggKind, BinnedRange, NanoRange, ScalarType, Shape}; +use netpod::AggKind; +use netpod::BinnedRange; +use netpod::NanoRange; +use netpod::ScalarType; +use netpod::Shape; use std::time::Duration; #[test] @@ -22,10 +37,25 @@ fn items_move_events() { let mut v1 = v0.clone(); eprintln!("{v1:?}"); eprintln!("{}", v1.len()); - let mut v2 = v1.move_into_fresh(4); + let mut v2 = v1.new_empty(); + match v1.find_lowest_index_gt(4) { + Some(ilgt) => { + v1.drain_into(&mut v2, (0, ilgt)).unwrap(); + } + None => { + v1.drain_into(&mut v2, (0, v1.len())).unwrap(); + } + } eprintln!("{}", v1.len()); eprintln!("{}", v2.len()); - v1.move_into_existing(&mut v2, u64::MAX).unwrap(); + match v1.find_lowest_index_gt(u64::MAX) { + Some(ilgt) => { + v1.drain_into(&mut v2, (0, ilgt)).unwrap(); + } + None => { + v1.drain_into(&mut v2, (0, v1.len())).unwrap(); + } + } eprintln!("{}", v1.len()); eprintln!("{}", v2.len()); eprintln!("{v1:?}"); @@ -62,9 +92,9 @@ fn items_merge_01() { let evs1 = make_some_boxed_d0_f32(10, SEC * 2, SEC * 2, 0, 828764893); let v0 = ChannelEvents::Events(evs0); let v1 = ChannelEvents::Events(evs1); - let v2 = ChannelEvents::Status(ConnStatusEvent::new(MS * 100, ConnStatus::Connect)); - let v3 = ChannelEvents::Status(ConnStatusEvent::new(MS * 2300, ConnStatus::Disconnect)); - let v4 = ChannelEvents::Status(ConnStatusEvent::new(MS * 2800, ConnStatus::Connect)); + let v2 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 100, ConnStatus::Connect))); + let v3 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 2300, ConnStatus::Disconnect))); + let v4 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 2800, ConnStatus::Connect))); let stream0 = Box::pin(stream::iter(vec![sitem_data(v0)])); let stream1 = Box::pin(stream::iter(vec![sitem_data(v1)])); let stream2 = Box::pin(stream::iter(vec![sitem_data(v2), sitem_data(v3), sitem_data(v4)])); @@ -97,9 +127,9 @@ fn items_merge_02() { let evs1 = make_some_boxed_d0_f32(100, SEC * 2, SEC * 2, 0, 828764893); let v0 = ChannelEvents::Events(evs0); let v1 = ChannelEvents::Events(evs1); - let v2 = ChannelEvents::Status(ConnStatusEvent::new(MS * 100, ConnStatus::Connect)); - let v3 = ChannelEvents::Status(ConnStatusEvent::new(MS * 2300, ConnStatus::Disconnect)); - let v4 = ChannelEvents::Status(ConnStatusEvent::new(MS * 2800, ConnStatus::Connect)); + let v2 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 100, ConnStatus::Connect))); + let v3 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 2300, ConnStatus::Disconnect))); + let v4 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 2800, ConnStatus::Connect))); let stream0 = Box::pin(stream::iter(vec![sitem_data(v0)])); let stream1 = Box::pin(stream::iter(vec![sitem_data(v1)])); let stream2 = Box::pin(stream::iter(vec![sitem_data(v2), sitem_data(v3), sitem_data(v4)])); @@ -241,7 +271,7 @@ fn merge03() { status: ConnStatus::Disconnect, }; let item: Sitemty = Ok(StreamItem::DataItem(RangeCompletableItem::Data( - ChannelEvents::Status(ev), + ChannelEvents::Status(Some(ev)), ))); vec![item] }; @@ -253,7 +283,7 @@ fn merge03() { status: ConnStatus::Disconnect, }; let item: Sitemty = Ok(StreamItem::DataItem(RangeCompletableItem::Data( - ChannelEvents::Status(ev), + ChannelEvents::Status(Some(ev)), ))); vec![item] }; diff --git a/streams/Cargo.toml b/streams/Cargo.toml index e02d9b3..13fd87f 100644 --- a/streams/Cargo.toml +++ b/streams/Cargo.toml @@ -17,6 +17,7 @@ arrayref = "0.3.6" crc32fast = "1.3.2" byteorder = "1.4.3" chrono = { version = "0.4.19", features = ["serde"] } +wasmer = { version = "3.1.1", default-features = false, features = ["sys", "cranelift"] } err = { path = "../err" } netpod = { path = "../netpod" } items = { path = "../items" } diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs index 4e49c6f..781a4ce 100644 --- a/streams/src/plaineventsjson.rs +++ b/streams/src/plaineventsjson.rs @@ -17,6 +17,17 @@ pub async fn plain_events_json( chconf: &ChConf, cluster: &Cluster, ) -> Result { + if query.channel().name() == "wasm-test-01" { + use wasmer::Value; + let wasm = query.channel().name().as_bytes(); + let mut store = wasmer::Store::default(); + let module = wasmer::Module::new(&store, wasm).unwrap(); + let import_object = wasmer::imports! {}; + let instance = wasmer::Instance::new(&mut store, &module, &import_object).unwrap(); + let add_one = instance.exports.get_function("event_transform").unwrap(); + let result = add_one.call(&mut store, &[Value::I32(42)]).unwrap(); + assert_eq!(result[0], Value::I32(43)); + } // TODO remove magic constant let deadline = Instant::now() + query.timeout() + Duration::from_millis(1000); let events_max = query.events_max(); diff --git a/streams/src/test/timebin.rs b/streams/src/test/timebin.rs index 6dd18ee..3d2e311 100644 --- a/streams/src/test/timebin.rs +++ b/streams/src/test/timebin.rs @@ -16,8 +16,8 @@ fn time_bin_00() { let edges = [0, 1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(|x| SEC * x).collect(); let evs0 = make_some_boxed_d0_f32(10, SEC * 1, MS * 500, 0, 1846713782); let v0 = ChannelEvents::Events(evs0); - let v2 = ChannelEvents::Status(ConnStatusEvent::new(MS * 100, ConnStatus::Connect)); - let v4 = ChannelEvents::Status(ConnStatusEvent::new(MS * 6000, ConnStatus::Disconnect)); + let v2 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 100, ConnStatus::Connect))); + let v4 = ChannelEvents::Status(Some(ConnStatusEvent::new(MS * 6000, ConnStatus::Disconnect))); let stream0 = Box::pin(stream::iter(vec![ // sitem_data(v2),