From b913fcc8c640f8694d62c4bf233307f2614a20eb Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 7 Nov 2024 20:55:38 +0100 Subject: [PATCH] factor out items-0 --- crates/commonio/Cargo.toml | 2 +- crates/daqbufp2/Cargo.toml | 2 +- crates/disk/Cargo.toml | 2 +- crates/httpclient/Cargo.toml | 2 +- crates/httpret/Cargo.toml | 2 +- crates/items_0/Cargo.toml | 20 -- crates/items_0/src/collect_s.rs | 255 ------------------------- crates/items_0/src/container.rs | 11 -- crates/items_0/src/framable.rs | 19 -- crates/items_0/src/isodate.rs | 41 ---- crates/items_0/src/items_0.rs | 287 ---------------------------- crates/items_0/src/overlap.rs | 7 - crates/items_0/src/scalar_ops.rs | 309 ------------------------------- crates/items_0/src/streamitem.rs | 219 ---------------------- crates/items_0/src/subfr.rs | 57 ------ crates/items_0/src/test.rs | 87 --------- crates/items_0/src/timebin.rs | 129 ------------- crates/items_0/src/transform.rs | 166 ----------------- crates/items_0/src/vecpreview.rs | 53 ------ crates/items_2/Cargo.toml | 2 +- crates/nodenet/Cargo.toml | 3 +- crates/query/Cargo.toml | 2 +- crates/scyllaconn/Cargo.toml | 2 +- crates/streamio/Cargo.toml | 2 +- crates/streams/Cargo.toml | 2 +- 25 files changed, 11 insertions(+), 1672 deletions(-) delete mode 100644 crates/items_0/Cargo.toml delete mode 100644 crates/items_0/src/collect_s.rs delete mode 100644 crates/items_0/src/container.rs delete mode 100644 crates/items_0/src/framable.rs delete mode 100644 crates/items_0/src/isodate.rs delete mode 100644 crates/items_0/src/items_0.rs delete mode 100644 crates/items_0/src/overlap.rs delete mode 100644 crates/items_0/src/scalar_ops.rs delete mode 100644 crates/items_0/src/streamitem.rs delete mode 100644 crates/items_0/src/subfr.rs delete mode 100644 crates/items_0/src/test.rs delete mode 100644 crates/items_0/src/timebin.rs delete mode 100644 crates/items_0/src/transform.rs delete mode 100644 crates/items_0/src/vecpreview.rs diff --git a/crates/commonio/Cargo.toml b/crates/commonio/Cargo.toml index 0aa1f75..a2431d3 100644 --- a/crates/commonio/Cargo.toml +++ b/crates/commonio/Cargo.toml @@ -20,5 +20,5 @@ crc32fast = "1.2" daqbuf-err = { path = "../../../daqbuf-err" } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } taskrun = { path = "../taskrun" } -items_0 = { path = "../items_0" } +items_0 = { path = "../../../daqbuf-items-0", package = "daqbuf-items-0" } items_proc = { path = "../items_proc" } diff --git a/crates/daqbufp2/Cargo.toml b/crates/daqbufp2/Cargo.toml index b298d61..c0cdda3 100644 --- a/crates/daqbufp2/Cargo.toml +++ b/crates/daqbufp2/Cargo.toml @@ -28,7 +28,7 @@ query = { path = "../query" } httpret = { path = "../httpret" } httpclient = { path = "../httpclient" } disk = { path = "../disk" } -items_0 = { path = "../items_0" } +items_0 = { path = "../../../daqbuf-items-0", package = "daqbuf-items-0" } items_2 = { path = "../items_2" } streams = { path = "../streams" } parse = { path = "../parse" } diff --git a/crates/disk/Cargo.toml b/crates/disk/Cargo.toml index dddfe2b..f660bb2 100644 --- a/crates/disk/Cargo.toml +++ b/crates/disk/Cargo.toml @@ -36,7 +36,7 @@ netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } query = { path = "../query" } dbconn = { path = "../dbconn" } parse = { path = "../parse" } -items_0 = { path = "../items_0" } +items_0 = { path = "../../../daqbuf-items-0", package = "daqbuf-items-0" } items_2 = { path = "../items_2" } streams = { path = "../streams" } streamio = { path = "../streamio" } diff --git a/crates/httpclient/Cargo.toml b/crates/httpclient/Cargo.toml index df507e5..a5d94d8 100644 --- a/crates/httpclient/Cargo.toml +++ b/crates/httpclient/Cargo.toml @@ -23,7 +23,7 @@ daqbuf-err = { path = "../../../daqbuf-err" } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } parse = { path = "../parse" } streams = { path = "../streams" } -thiserror = "0.0.1" +thiserror = "=0.0.1" [patch.crates-io] thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" } diff --git a/crates/httpret/Cargo.toml b/crates/httpret/Cargo.toml index db52481..ce13b9a 100644 --- a/crates/httpret/Cargo.toml +++ b/crates/httpret/Cargo.toml @@ -33,7 +33,7 @@ netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } query = { path = "../query" } dbconn = { path = "../dbconn" } disk = { path = "../disk" } -items_0 = { path = "../items_0" } +items_0 = { path = "../../../daqbuf-items-0", package = "daqbuf-items-0" } items_2 = { path = "../items_2" } parse = { path = "../parse" } streams = { path = "../streams" } diff --git a/crates/items_0/Cargo.toml b/crates/items_0/Cargo.toml deleted file mode 100644 index c3a5022..0000000 --- a/crates/items_0/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "items_0" -version = "0.0.2" -authors = ["Dominik Werder "] -edition = "2021" - -[lib] -path = "src/items_0.rs" - -[dependencies] -serde = { version = "1.0", features = ["derive"] } -erased-serde = "0.4" -typetag = "0.2.14" -serde_json = "1.0" -bincode = "1.3.3" -bytes = "1.2.1" -futures-util = "0.3.24" -chrono = { version = "0.4.19", features = ["serde"] } -netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } -daqbuf-err = { path = "../../../daqbuf-err" } diff --git a/crates/items_0/src/collect_s.rs b/crates/items_0/src/collect_s.rs deleted file mode 100644 index f57cee6..0000000 --- a/crates/items_0/src/collect_s.rs +++ /dev/null @@ -1,255 +0,0 @@ -use crate::container::ByteEstimate; -use crate::timebin::BinningggContainerBinsDyn; -use crate::AsAnyMut; -use crate::AsAnyRef; -use crate::Events; -use crate::TypeName; -use crate::WithLen; -use daqbuf_err as err; -use err::Error; -use netpod::log::*; -use netpod::range::evrange::SeriesRange; -use netpod::BinnedRangeEnum; -use serde::Serialize; -use std::any; -use std::any::Any; -use std::fmt; - -// TODO check usage of this trait -pub trait ToJsonBytes { - fn to_json_bytes(&self) -> Result, Error>; -} - -pub trait ToJsonResult: fmt::Debug + AsAnyRef + AsAnyMut + Send { - fn to_json_value(&self) -> Result; -} - -impl AsAnyRef for serde_json::Value { - fn as_any_ref(&self) -> &dyn Any { - self - } -} - -impl AsAnyMut for serde_json::Value { - fn as_any_mut(&mut self) -> &mut dyn Any { - self - } -} - -impl ToJsonResult for serde_json::Value { - fn to_json_value(&self) -> Result { - Ok(self.clone()) - } -} - -impl ToJsonBytes for serde_json::Value { - fn to_json_bytes(&self) -> Result, Error> { - Ok(serde_json::to_vec(self)?) - } -} - -pub trait CollectedDyn: fmt::Debug + TypeName + Send + AsAnyRef + WithLen + ToJsonResult {} - -impl ToJsonResult for Box { - fn to_json_value(&self) -> Result { - ToJsonResult::to_json_value(self.as_ref()) - } -} - -impl WithLen for Box { - fn len(&self) -> usize { - self.as_ref().len() - } -} - -impl TypeName for Box { - fn type_name(&self) -> String { - self.as_ref().type_name() - } -} - -impl CollectedDyn for Box {} - -pub trait CollectorTy: fmt::Debug + Send + Unpin + WithLen + ByteEstimate { - type Input: CollectableDyn; - type Output: CollectedDyn + ToJsonResult + Serialize; - - fn ingest(&mut self, src: &mut Self::Input); - fn set_range_complete(&mut self); - fn set_timed_out(&mut self); - fn set_continue_at_here(&mut self); - - // TODO use this crate's Error instead: - fn result(&mut self, range: Option, binrange: Option) -> Result; -} - -pub trait CollectorDyn: fmt::Debug + Send + WithLen + ByteEstimate { - fn ingest(&mut self, src: &mut dyn CollectableDyn); - fn set_range_complete(&mut self); - fn set_timed_out(&mut self); - fn set_continue_at_here(&mut self); - // TODO factor the required parameters into new struct? Generic over events or binned? - fn result( - &mut self, - range: Option, - binrange: Option, - ) -> Result, Error>; -} - -impl CollectorDyn for T -where - T: fmt::Debug + CollectorTy + 'static, -{ - fn ingest(&mut self, src: &mut dyn CollectableDyn) { - if let Some(src) = src.as_any_mut().downcast_mut::<::Input>() { - trace!("sees incoming &mut ref"); - T::ingest(self, src) - } else { - if let Some(src) = src.as_any_mut().downcast_mut::::Input>>() { - trace!("sees incoming &mut Box"); - T::ingest(self, src) - } else { - error!( - "No idea what this is. Expect: {} input {} got: {} {:?}", - any::type_name::(), - any::type_name::<::Input>(), - src.type_name(), - src - ); - } - } - } - - fn set_range_complete(&mut self) { - T::set_range_complete(self) - } - - fn set_timed_out(&mut self) { - T::set_timed_out(self) - } - - fn set_continue_at_here(&mut self) { - T::set_continue_at_here(self) - } - - fn result( - &mut self, - range: Option, - binrange: Option, - ) -> Result, Error> { - let ret = T::result(self, range, binrange)?; - Ok(Box::new(ret)) - } -} - -// TODO rename to `Typed` -pub trait CollectableType: fmt::Debug + WithLen + AsAnyRef + AsAnyMut + TypeName + Send { - type Collector: CollectorTy; - fn new_collector() -> Self::Collector; -} - -#[derive(Debug)] -pub struct CollectorForDyn { - inner: Box, -} - -impl WithLen for CollectorForDyn { - fn len(&self) -> usize { - todo!() - } -} - -impl ByteEstimate for CollectorForDyn { - fn byte_estimate(&self) -> u64 { - todo!() - } -} - -impl CollectorDyn for CollectorForDyn { - fn ingest(&mut self, src: &mut dyn CollectableDyn) { - todo!() - } - - fn set_range_complete(&mut self) { - todo!() - } - - fn set_timed_out(&mut self) { - todo!() - } - - fn set_continue_at_here(&mut self) { - todo!() - } - - fn result( - &mut self, - range: Option, - binrange: Option, - ) -> Result, Error> { - todo!() - } -} - -pub trait CollectableDyn: fmt::Debug + WithLen + AsAnyRef + AsAnyMut + TypeName + Send { - fn new_collector(&self) -> Box; -} - -impl TypeName for Box { - fn type_name(&self) -> String { - BinningggContainerBinsDyn::type_name(self.as_ref()).into() - } -} - -impl WithLen for Box { - fn len(&self) -> usize { - WithLen::len(self.as_ref()) - } -} - -impl CollectableDyn for Box { - fn new_collector(&self) -> Box { - self.as_ref().new_collector() - } -} - -impl TypeName for Box { - fn type_name(&self) -> String { - self.as_ref().type_name() - } -} - -impl CollectableDyn for Box { - fn new_collector(&self) -> Box { - self.as_ref().new_collector() - } -} - -impl CollectableDyn for T -where - T: CollectableType + 'static, -{ - fn new_collector(&self) -> Box { - Box::new(T::new_collector()) - } -} - -impl TypeName for Box { - fn type_name(&self) -> String { - self.as_ref().type_name() - } -} - -// TODO do this with some blanket impl: -impl WithLen for Box { - fn len(&self) -> usize { - WithLen::len(self.as_ref()) - } -} - -// TODO do this with some blanket impl: -impl CollectableDyn for Box { - fn new_collector(&self) -> Box { - CollectableDyn::new_collector(self.as_ref()) - } -} diff --git a/crates/items_0/src/container.rs b/crates/items_0/src/container.rs deleted file mode 100644 index e26c1f7..0000000 --- a/crates/items_0/src/container.rs +++ /dev/null @@ -1,11 +0,0 @@ -use crate::Events; - -pub trait ByteEstimate { - fn byte_estimate(&self) -> u64; -} - -impl ByteEstimate for Box { - fn byte_estimate(&self) -> u64 { - self.as_ref().byte_estimate() - } -} diff --git a/crates/items_0/src/framable.rs b/crates/items_0/src/framable.rs deleted file mode 100644 index 5fd10cf..0000000 --- a/crates/items_0/src/framable.rs +++ /dev/null @@ -1,19 +0,0 @@ -// Required for any inner type of Sitemty. -pub trait FrameTypeInnerStatic { - const FRAME_TYPE_ID: u32; -} - -// To be implemented by the T of Sitemty, e.g. ScalarEvents. -pub trait FrameTypeInnerDyn { - // TODO check actual usage of this - fn frame_type_id(&self) -> u32; -} - -impl FrameTypeInnerDyn for T -where - T: FrameTypeInnerStatic, -{ - fn frame_type_id(&self) -> u32 { - ::FRAME_TYPE_ID - } -} diff --git a/crates/items_0/src/isodate.rs b/crates/items_0/src/isodate.rs deleted file mode 100644 index 18abefd..0000000 --- a/crates/items_0/src/isodate.rs +++ /dev/null @@ -1,41 +0,0 @@ -use chrono::DateTime; -use chrono::TimeZone; -use chrono::Utc; -use netpod::DATETIME_FMT_3MS; -use serde::Deserialize; -use serde::Serialize; -use serde::Serializer; - -#[derive(Debug, Clone, PartialEq, Deserialize)] -pub struct IsoDateTime(DateTime); - -impl IsoDateTime { - pub fn from_unix_millis(ms: u64) -> Self { - // let datetime = chrono::DateTime::from_timestamp_millis(ms as i64).unwrap(); - // Self(datetime) - IsoDateTime( - Utc.timestamp_millis_opt(ms as i64) - .earliest() - .unwrap_or(Utc.timestamp_nanos(0)), - ) - } - - pub fn from_ns_u64(ts: u64) -> Self { - IsoDateTime(Utc.timestamp_nanos(ts as i64)) - } -} - -impl Serialize for IsoDateTime { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - serializer.serialize_str(&self.0.format(DATETIME_FMT_3MS).to_string()) - } -} - -pub fn make_iso_ts(tss: &[u64]) -> Vec { - tss.iter() - .map(|&k| IsoDateTime(Utc.timestamp_nanos(k as i64))) - .collect() -} diff --git a/crates/items_0/src/items_0.rs b/crates/items_0/src/items_0.rs deleted file mode 100644 index 10877a7..0000000 --- a/crates/items_0/src/items_0.rs +++ /dev/null @@ -1,287 +0,0 @@ -pub mod collect_s; -pub mod container; -pub mod framable; -pub mod isodate; -pub mod overlap; -pub mod scalar_ops; -pub mod streamitem; -pub mod subfr; -pub mod test; -pub mod timebin; -pub mod transform; -pub mod vecpreview; - -pub mod bincode { - pub use bincode::*; -} - -pub use futures_util; - -use collect_s::CollectableDyn; -use container::ByteEstimate; -use daqbuf_err as err; -use std::any::Any; -use std::collections::VecDeque; -use std::fmt; -use timebin::BinningggContainerEventsDyn; - -pub trait WithLen { - fn len(&self) -> usize; -} - -impl WithLen for bytes::Bytes { - fn len(&self) -> usize { - self.len() - } -} - -pub trait Empty { - fn empty() -> Self; -} - -pub trait Resettable { - fn reset(&mut self); -} - -pub trait Appendable: Empty + WithLen { - fn push(&mut self, ts: u64, pulse: u64, value: STY); -} - -pub trait Extendable: Empty + WithLen { - fn extend_from(&mut self, src: &mut Self); -} - -pub trait TypeName { - fn type_name(&self) -> String; -} - -pub trait AppendEmptyBin { - fn append_empty_bin(&mut self, ts1: u64, ts2: u64); -} - -// TODO rename to make it clear that this moves. Better use drain-into or something similar. -pub trait AppendAllFrom { - fn append_all_from(&mut self, src: &mut Self); -} - -// TODO check usage, probably only for legacy -pub trait HasNonemptyFirstBin { - fn has_nonempty_first_bin(&self) -> bool; -} - -pub trait AsAnyRef { - fn as_any_ref(&self) -> &dyn Any; -} - -pub trait AsAnyMut { - fn as_any_mut(&mut self) -> &mut dyn Any; -} - -impl AsAnyRef for Box -where - T: AsAnyRef + ?Sized, -{ - fn as_any_ref(&self) -> &dyn Any { - self.as_ref().as_any_ref() - } -} - -impl AsAnyMut for Box -where - T: AsAnyMut + ?Sized, -{ - fn as_any_mut(&mut self) -> &mut dyn Any { - self.as_mut().as_any_mut() - } -} - -#[derive(Debug)] -pub enum MergeError { - NotCompatible, - Full, -} - -impl From for err::Error { - fn from(e: MergeError) -> Self { - e.to_string().into() - } -} - -impl fmt::Display for MergeError { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "{:?}", self) - } -} - -impl std::error::Error for MergeError {} - -// TODO can I remove the Any bound? - -/// Container of some form of events, for use as trait object. -pub trait Events: - fmt::Debug + TypeName + Any + CollectableDyn + WithLen + ByteEstimate + Send + erased_serde::Serialize + EventsNonObj -{ - fn verify(&self) -> bool; - fn output_info(&self) -> String; - fn as_collectable_mut(&mut self) -> &mut dyn CollectableDyn; - fn as_collectable_with_default_ref(&self) -> &dyn CollectableDyn; - fn as_collectable_with_default_mut(&mut self) -> &mut dyn CollectableDyn; - fn ts_min(&self) -> Option; - fn ts_max(&self) -> Option; - // TODO is this used? - fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box; - fn new_empty_evs(&self) -> Box; - fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError>; - fn find_lowest_index_gt_evs(&self, ts: u64) -> Option; - fn find_lowest_index_ge_evs(&self, ts: u64) -> Option; - fn find_highest_index_lt_evs(&self, ts: u64) -> Option; - fn clone_dyn(&self) -> Box; - fn partial_eq_dyn(&self, other: &dyn Events) -> bool; - fn serde_id(&self) -> &'static str; - fn nty_id(&self) -> u32; - fn tss(&self) -> &VecDeque; - fn pulses(&self) -> &VecDeque; - fn frame_type_id(&self) -> u32; - fn to_min_max_avg(&mut self) -> Box; - fn to_json_string(&self) -> String; - fn to_json_vec_u8(&self) -> Vec; - fn to_cbor_vec_u8(&self) -> Vec; - fn clear(&mut self); - // TODO: can not name EventsDim0 from here, so use trait object for now. Anyway is a workaround. - fn to_dim0_f32_for_binning(&self) -> Box; - fn to_container_events(&self) -> Box; -} - -impl WithLen for Box { - fn len(&self) -> usize { - self.as_ref().len() - } -} - -pub trait EventsNonObj { - fn into_tss_pulses(self: Box) -> (VecDeque, VecDeque); -} - -erased_serde::serialize_trait_object!(Events); - -impl PartialEq for Box { - fn eq(&self, other: &Self) -> bool { - Events::partial_eq_dyn(self.as_ref(), other.as_ref()) - } -} - -impl EventsNonObj for Box { - fn into_tss_pulses(self: Box) -> (VecDeque, VecDeque) { - todo!() - } -} - -impl Events for Box { - fn verify(&self) -> bool { - Events::verify(self.as_ref()) - } - - fn output_info(&self) -> String { - Events::output_info(self.as_ref()) - } - - fn as_collectable_mut(&mut self) -> &mut dyn CollectableDyn { - Events::as_collectable_mut(self.as_mut()) - } - - fn as_collectable_with_default_ref(&self) -> &dyn CollectableDyn { - Events::as_collectable_with_default_ref(self.as_ref()) - } - - fn as_collectable_with_default_mut(&mut self) -> &mut dyn CollectableDyn { - Events::as_collectable_with_default_mut(self.as_mut()) - } - - fn ts_min(&self) -> Option { - Events::ts_min(self.as_ref()) - } - - fn ts_max(&self) -> Option { - Events::ts_max(self.as_ref()) - } - - fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box { - Events::take_new_events_until_ts(self.as_mut(), ts_end) - } - - fn new_empty_evs(&self) -> Box { - Events::new_empty_evs(self.as_ref()) - } - - fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> { - Events::drain_into_evs(self.as_mut(), dst, range) - } - - fn find_lowest_index_gt_evs(&self, ts: u64) -> Option { - Events::find_lowest_index_gt_evs(self.as_ref(), ts) - } - - fn find_lowest_index_ge_evs(&self, ts: u64) -> Option { - Events::find_lowest_index_ge_evs(self.as_ref(), ts) - } - - fn find_highest_index_lt_evs(&self, ts: u64) -> Option { - Events::find_highest_index_lt_evs(self.as_ref(), ts) - } - - fn clone_dyn(&self) -> Box { - Events::clone_dyn(self.as_ref()) - } - - fn partial_eq_dyn(&self, other: &dyn Events) -> bool { - Events::partial_eq_dyn(self.as_ref(), other) - } - - fn serde_id(&self) -> &'static str { - Events::serde_id(self.as_ref()) - } - - fn nty_id(&self) -> u32 { - Events::nty_id(self.as_ref()) - } - - fn tss(&self) -> &VecDeque { - Events::tss(self.as_ref()) - } - - fn pulses(&self) -> &VecDeque { - Events::pulses(self.as_ref()) - } - - fn frame_type_id(&self) -> u32 { - Events::frame_type_id(self.as_ref()) - } - - fn to_min_max_avg(&mut self) -> Box { - Events::to_min_max_avg(self.as_mut()) - } - - fn to_json_string(&self) -> String { - Events::to_json_string(self.as_ref()) - } - - fn to_json_vec_u8(&self) -> Vec { - Events::to_json_vec_u8(self.as_ref()) - } - - fn to_cbor_vec_u8(&self) -> Vec { - Events::to_cbor_vec_u8(self.as_ref()) - } - - fn clear(&mut self) { - Events::clear(self.as_mut()) - } - - fn to_dim0_f32_for_binning(&self) -> Box { - Events::to_dim0_f32_for_binning(self.as_ref()) - } - - fn to_container_events(&self) -> Box { - Events::to_container_events(self.as_ref()) - } -} diff --git a/crates/items_0/src/overlap.rs b/crates/items_0/src/overlap.rs deleted file mode 100644 index 2943395..0000000 --- a/crates/items_0/src/overlap.rs +++ /dev/null @@ -1,7 +0,0 @@ -// TODO rename, no more deque involved -pub trait HasTimestampDeque { - fn timestamp_min(&self) -> Option; - fn timestamp_max(&self) -> Option; - fn pulse_min(&self) -> Option; - fn pulse_max(&self) -> Option; -} diff --git a/crates/items_0/src/scalar_ops.rs b/crates/items_0/src/scalar_ops.rs deleted file mode 100644 index 2e739fa..0000000 --- a/crates/items_0/src/scalar_ops.rs +++ /dev/null @@ -1,309 +0,0 @@ -use crate::container::ByteEstimate; -use crate::subfr::SubFrId; -use daqbuf_err as err; -use netpod::EnumVariant; -use netpod::StringFix; -use serde::Serialize; -use std::fmt; -use std::ops; - -#[allow(unused)] -const fn is_nan_int(_x: &T) -> bool { - false -} - -#[allow(unused)] -fn is_nan_f32(x: f32) -> bool { - x.is_nan() -} - -#[allow(unused)] -fn is_nan_f64(x: f64) -> bool { - x.is_nan() -} - -pub trait AsPrimF32 { - fn as_prim_f32_b(&self) -> f32; -} - -macro_rules! impl_as_prim_f32 { - ($ty:ident) => { - impl AsPrimF32 for $ty { - fn as_prim_f32_b(&self) -> f32 { - *self as f32 - } - } - }; -} - -impl_as_prim_f32!(u8); -impl_as_prim_f32!(u16); -impl_as_prim_f32!(u32); -impl_as_prim_f32!(u64); -impl_as_prim_f32!(i8); -impl_as_prim_f32!(i16); -impl_as_prim_f32!(i32); -impl_as_prim_f32!(i64); -impl_as_prim_f32!(f32); -impl_as_prim_f32!(f64); - -impl AsPrimF32 for bool { - fn as_prim_f32_b(&self) -> f32 { - if *self { - 1. - } else { - 0. - } - } -} - -impl AsPrimF32 for String { - fn as_prim_f32_b(&self) -> f32 { - // Well, at least some impl. - self.len() as f32 - } -} - -pub trait ScalarOps: - fmt::Debug - + fmt::Display - + Clone - + PartialOrd - + PartialEq - + SubFrId - + AsPrimF32 - + ByteEstimate - + Serialize - + Unpin - + Send - + 'static -{ - fn scalar_type_name() -> &'static str; - fn zero_b() -> Self; - fn equal_slack(&self, rhs: &Self) -> bool; - fn add(&mut self, rhs: &Self); - fn div(&mut self, n: usize); - fn find_vec_min(a: &Vec) -> Option; - fn find_vec_max(a: &Vec) -> Option; - fn avg_vec(a: &Vec) -> Option; -} - -macro_rules! impl_scalar_ops { - ($ty:ident, $zero:expr, $equal_slack:ident, $mac_add:ident, $mac_div:ident, $sty_name:expr, $byte_estimate:expr) => { - impl ByteEstimate for $ty { - fn byte_estimate(&self) -> u64 { - $byte_estimate - } - } - - impl ScalarOps for $ty { - fn scalar_type_name() -> &'static str { - $sty_name - } - - fn zero_b() -> Self { - $zero - } - - fn equal_slack(&self, rhs: &Self) -> bool { - $equal_slack(self, rhs) - } - - fn add(&mut self, rhs: &Self) { - $mac_add!(self, rhs); - } - - fn div(&mut self, n: usize) { - $mac_div!(self, n); - } - - fn find_vec_min(a: &Vec) -> Option { - if a.len() == 0 { - None - } else { - let mut k = &a[0]; - for (i, v) in a.iter().enumerate() { - if *v < *k { - k = &a[i]; - } - } - Some(k.clone()) - } - } - - fn find_vec_max(a: &Vec) -> Option { - if a.len() == 0 { - None - } else { - let mut k = &a[0]; - for (i, v) in a.iter().enumerate() { - if *v > *k { - k = &a[i]; - } - } - Some(k.clone()) - } - } - - fn avg_vec(a: &Vec) -> Option { - if a.len() == 0 { - None - } else { - let mut sum = Self::zero_b(); - let mut c = 0; - for v in a.iter() { - sum.add(v); - c += 1; - } - ScalarOps::div(&mut sum, c); - Some(sum) - } - } - } - }; -} - -fn equal_int(a: T, b: T) -> bool { - a == b -} - -fn equal_f32(&a: &f32, &b: &f32) -> bool { - (a - b).abs() < 1e-4 || (a / b > 0.999 && a / b < 1.001) -} - -fn equal_f64(&a: &f64, &b: &f64) -> bool { - (a - b).abs() < 1e-6 || (a / b > 0.99999 && a / b < 1.00001) -} - -fn equal_bool(&a: &bool, &b: &bool) -> bool { - a == b -} - -fn equal_string(a: &String, b: &String) -> bool { - a == b -} - -fn _add_int(a: &mut T, b: &T) { - let _ = b; - ops::AddAssign::add_assign(a, err::todoval()); -} - -macro_rules! add_int { - ($a:expr, $b:expr) => { - *$a += $b; - }; -} - -macro_rules! add_bool { - ($a:expr, $b:expr) => { - *$a |= $b; - }; -} - -macro_rules! add_string { - ($a:expr, $b:expr) => { - $a.push_str($b); - }; -} - -macro_rules! div_int { - ($a:expr, $b:expr) => { - // TODO what is this used for? - - // TODO for average calculation, the accumulator must be large enough! - // Use u64 for all ints, and f32 for all floats. - // Therefore, the name "add" is too general. - //*$a /= $b; - { - let _ = $a; - let _ = $b; - } - }; -} - -macro_rules! div_bool { - ($a:expr, $b:expr) => { - // TODO what is this used for? - { - let _ = $a; - let _ = $b; - } - }; -} - -macro_rules! div_string { - ($a:expr, $b:expr) => { - // TODO what is this used for? - { - let _ = $a; - let _ = $b; - } - }; -} - -impl_scalar_ops!(u8, 0, equal_int, add_int, div_int, "u8", 1); -impl_scalar_ops!(u16, 0, equal_int, add_int, div_int, "u16", 2); -impl_scalar_ops!(u32, 0, equal_int, add_int, div_int, "u32", 4); -impl_scalar_ops!(u64, 0, equal_int, add_int, div_int, "u64", 8); -impl_scalar_ops!(i8, 0, equal_int, add_int, div_int, "i8", 1); -impl_scalar_ops!(i16, 0, equal_int, add_int, div_int, "i16", 2); -impl_scalar_ops!(i32, 0, equal_int, add_int, div_int, "i32", 4); -impl_scalar_ops!(i64, 0, equal_int, add_int, div_int, "i64", 8); -impl_scalar_ops!(f32, 0., equal_f32, add_int, div_int, "f32", 4); -impl_scalar_ops!(f64, 0., equal_f64, add_int, div_int, "f64", 8); -impl_scalar_ops!(bool, false, equal_bool, add_bool, div_bool, "bool", 1); -impl_scalar_ops!( - String, - String::new(), - equal_string, - add_string, - div_string, - "string", - 16 -); - -impl ByteEstimate for EnumVariant { - fn byte_estimate(&self) -> u64 { - 12 - } -} - -impl AsPrimF32 for EnumVariant { - fn as_prim_f32_b(&self) -> f32 { - self.ix() as f32 - } -} - -impl ScalarOps for EnumVariant { - fn scalar_type_name() -> &'static str { - "enumvariant" - } - - fn zero_b() -> Self { - EnumVariant::default() - } - - fn equal_slack(&self, rhs: &Self) -> bool { - self == rhs - } - - fn add(&mut self, _rhs: &Self) { - // undefined so far - } - - fn div(&mut self, _n: usize) { - // undefined so far - } - - fn find_vec_min(a: &Vec) -> Option { - todo!() - } - - fn find_vec_max(a: &Vec) -> Option { - todo!() - } - - fn avg_vec(a: &Vec) -> Option { - todo!() - } -} diff --git a/crates/items_0/src/streamitem.rs b/crates/items_0/src/streamitem.rs deleted file mode 100644 index 00b28de..0000000 --- a/crates/items_0/src/streamitem.rs +++ /dev/null @@ -1,219 +0,0 @@ -use daqbuf_err as err; -use netpod::log::Level; -use netpod::DiskStats; -use netpod::EventDataReadStats; -use netpod::RangeFilterStats; -use serde::Deserialize; -use serde::Serialize; - -pub const TERM_FRAME_TYPE_ID: u32 = 0xaa01; -pub const ERROR_FRAME_TYPE_ID: u32 = 0xaa02; -pub const SITEMTY_NONSPEC_FRAME_TYPE_ID: u32 = 0xaa04; -pub const EVENT_QUERY_JSON_STRING_FRAME: u32 = 0x100; -pub const EVENTS_0D_FRAME_TYPE_ID: u32 = 0x500; -pub const MIN_MAX_AVG_DIM_0_BINS_FRAME_TYPE_ID: u32 = 0x700; -pub const MIN_MAX_AVG_DIM_1_BINS_FRAME_TYPE_ID: u32 = 0x800; -pub const MIN_MAX_AVG_WAVE_BINS: u32 = 0xa00; -pub const WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0xb00; -pub const LOG_FRAME_TYPE_ID: u32 = 0xc00; -pub const STATS_FRAME_TYPE_ID: u32 = 0xd00; -pub const RANGE_COMPLETE_FRAME_TYPE_ID: u32 = 0xe00; -pub const EVENT_FULL_FRAME_TYPE_ID: u32 = 0x2200; -pub const EVENTS_ITEM_FRAME_TYPE_ID: u32 = 0x2300; -pub const STATS_EVENTS_FRAME_TYPE_ID: u32 = 0x2400; -pub const ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID: u32 = 0x2500; -pub const X_BINNED_SCALAR_EVENTS_FRAME_TYPE_ID: u32 = 0x8800; -pub const X_BINNED_WAVE_EVENTS_FRAME_TYPE_ID: u32 = 0x8900; -pub const DATABUFFER_EVENT_BLOB_FRAME_TYPE_ID: u32 = 0x8a00; - -pub fn bool_is_false(j: &bool) -> bool { - *j == false -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub enum RangeCompletableItem { - RangeComplete, - Data(T), -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub enum StatsItem { - EventDataReadStats(EventDataReadStats), - RangeFilterStats(RangeFilterStats), - DiskStats(DiskStats), - Warnings(), -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub enum StreamItem { - DataItem(T), - Log(LogItem), - Stats(StatsItem), -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct LogItem { - pub node_ix: u32, - #[serde(with = "levelserde")] - pub level: Level, - pub msg: String, -} - -impl LogItem { - pub fn from_node(node_ix: usize, level: Level, msg: String) -> Self { - Self { - node_ix: node_ix as _, - level, - msg, - } - } -} - -pub type SitemErrTy = err::Error; - -pub type Sitemty = Result>, SitemErrTy>; - -pub type Sitemty2 = Result>, E>; - -#[macro_export] -macro_rules! on_sitemty_range_complete { - ($item:expr, $ex:expr) => { - if let Ok($crate::StreamItem::DataItem($crate::RangeCompletableItem::RangeComplete)) = $item { - $ex - } - }; -} - -#[macro_export] -macro_rules! on_sitemty_data_old { - ($item:expr, $ex:expr) => { - if let Ok($crate::streamitem::StreamItem::DataItem($crate::streamitem::RangeCompletableItem::Data(item))) = - $item - { - $ex(item) - } else { - $item - } - }; -} - -#[macro_export] -macro_rules! on_sitemty_data { - ($item:expr, $ex:expr) => {{ - use $crate::streamitem::RangeCompletableItem; - use $crate::streamitem::StreamItem; - match $item { - Ok(x) => match x { - StreamItem::DataItem(x) => match x { - RangeCompletableItem::Data(x) => $ex(x), - RangeCompletableItem::RangeComplete => { - Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) - } - }, - StreamItem::Log(x) => Ok(StreamItem::Log(x)), - StreamItem::Stats(x) => Ok(StreamItem::Stats(x)), - }, - Err(x) => Err(x), - } - }}; -} - -#[macro_export] -macro_rules! try_map_sitemty_data { - ($item:expr, $ex:expr) => {{ - use $crate::streamitem::RangeCompletableItem; - use $crate::streamitem::StreamItem; - match $item { - Ok(x) => match x { - StreamItem::DataItem(x) => match x { - RangeCompletableItem::Data(x) => match $ex(x) { - Ok(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))), - Err(e) => Err(e), - }, - RangeCompletableItem::RangeComplete => { - Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) - } - }, - StreamItem::Log(x) => Ok(StreamItem::Log(x)), - StreamItem::Stats(x) => Ok(StreamItem::Stats(x)), - }, - Err(x) => Err(x), - } - }}; -} - -pub fn sitem_data(x: X) -> Sitemty { - Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) -} - -pub fn sitem_err_from_string(x: T) -> Sitemty -where - T: ToString, -{ - Err(err::Error::from_string(x)) -} - -pub fn sitem_err2_from_string(x: T) -> err::Error -where - T: ToString, -{ - err::Error::from_string(x) -} - -mod levelserde { - use super::Level; - use serde::de::{self, Visitor}; - use serde::{Deserializer, Serializer}; - use std::fmt; - - pub fn serialize(t: &Level, se: S) -> Result - where - S: Serializer, - { - let g = match *t { - Level::ERROR => 1, - Level::WARN => 2, - Level::INFO => 3, - Level::DEBUG => 4, - Level::TRACE => 5, - }; - se.serialize_u32(g) - } - - struct VisitLevel; - - impl VisitLevel { - fn from_u32(x: u32) -> Level { - match x { - 1 => Level::ERROR, - 2 => Level::WARN, - 3 => Level::INFO, - 4 => Level::DEBUG, - 5 => Level::TRACE, - _ => Level::TRACE, - } - } - } - - impl<'de> Visitor<'de> for VisitLevel { - type Value = Level; - - fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "expect Level code") - } - - fn visit_u64(self, val: u64) -> Result - where - E: de::Error, - { - Ok(VisitLevel::from_u32(val as _)) - } - } - - pub fn deserialize<'de, D>(de: D) -> Result - where - D: Deserializer<'de>, - { - de.deserialize_u32(VisitLevel) - } -} diff --git a/crates/items_0/src/subfr.rs b/crates/items_0/src/subfr.rs deleted file mode 100644 index 25ac5b9..0000000 --- a/crates/items_0/src/subfr.rs +++ /dev/null @@ -1,57 +0,0 @@ -use netpod::EnumVariant; - -pub trait SubFrId { - const SUB: u32; -} - -impl SubFrId for u8 { - const SUB: u32 = 0x03; -} - -impl SubFrId for u16 { - const SUB: u32 = 0x05; -} - -impl SubFrId for u32 { - const SUB: u32 = 0x08; -} - -impl SubFrId for u64 { - const SUB: u32 = 0x0a; -} - -impl SubFrId for i8 { - const SUB: u32 = 0x02; -} - -impl SubFrId for i16 { - const SUB: u32 = 0x04; -} - -impl SubFrId for i32 { - const SUB: u32 = 0x07; -} - -impl SubFrId for i64 { - const SUB: u32 = 0x09; -} - -impl SubFrId for f32 { - const SUB: u32 = 0x0b; -} - -impl SubFrId for f64 { - const SUB: u32 = 0x0c; -} - -impl SubFrId for bool { - const SUB: u32 = 0x0d; -} - -impl SubFrId for String { - const SUB: u32 = 0x0e; -} - -impl SubFrId for EnumVariant { - const SUB: u32 = 0x0f; -} diff --git a/crates/items_0/src/test.rs b/crates/items_0/src/test.rs deleted file mode 100644 index 3e60a90..0000000 --- a/crates/items_0/src/test.rs +++ /dev/null @@ -1,87 +0,0 @@ -pub fn f32_cmp_near(x: f32, y: f32, abs: f32, rel: f32) -> bool { - /*let x = { - let mut a = x.to_le_bytes(); - a[0] &= 0xf0; - f32::from_ne_bytes(a) - }; - let y = { - let mut a = y.to_le_bytes(); - a[0] &= 0xf0; - f32::from_ne_bytes(a) - }; - x == y*/ - let ad = (x - y).abs(); - ad <= abs || (ad / y).abs() <= rel -} - -pub fn f64_cmp_near(x: f64, y: f64, abs: f64, rel: f64) -> bool { - /*let x = { - let mut a = x.to_le_bytes(); - a[0] &= 0x00; - a[1] &= 0x00; - f64::from_ne_bytes(a) - }; - let y = { - let mut a = y.to_le_bytes(); - a[0] &= 0x00; - a[1] &= 0x00; - f64::from_ne_bytes(a) - }; - x == y*/ - let ad = (x - y).abs(); - ad <= abs || (ad / y).abs() <= rel -} - -pub fn f32_iter_cmp_near(a: A, b: B, abs: f32, rel: f32) -> bool -where - A: IntoIterator, - B: IntoIterator, -{ - let mut a = a.into_iter(); - let mut b = b.into_iter(); - loop { - let x = a.next(); - let y = b.next(); - if let (Some(x), Some(y)) = (x, y) { - if !f32_cmp_near(x, y, abs, rel) { - return false; - } - } else if x.is_some() || y.is_some() { - return false; - } else { - return true; - } - } -} - -pub fn f64_iter_cmp_near(a: A, b: B, abs: f64, rel: f64) -> bool -where - A: IntoIterator, - B: IntoIterator, -{ - let mut a = a.into_iter(); - let mut b = b.into_iter(); - loop { - let x = a.next(); - let y = b.next(); - if let (Some(x), Some(y)) = (x, y) { - if !f64_cmp_near(x, y, abs, rel) { - return false; - } - } else if x.is_some() || y.is_some() { - return false; - } else { - return true; - } - } -} - -#[test] -fn test_f32_iter_cmp_near() { - let a = [-127.553e17]; - let b = [-127.554e17]; - assert_eq!(f32_iter_cmp_near(a, b, 0.000001, 0.000001), false); - let a = [-127.55300e17]; - let b = [-127.55301e17]; - assert_eq!(f32_iter_cmp_near(a, b, 0.000001, 0.000001), true); -} diff --git a/crates/items_0/src/timebin.rs b/crates/items_0/src/timebin.rs deleted file mode 100644 index d93ddd7..0000000 --- a/crates/items_0/src/timebin.rs +++ /dev/null @@ -1,129 +0,0 @@ -use crate::collect_s::CollectableDyn; -use crate::AsAnyMut; -use crate::WithLen; -use netpod::BinnedRange; -use netpod::BinnedRangeEnum; -use netpod::TsNano; -use std::fmt; -use std::ops::Range; - -// TODO can probably be removed. -pub trait TimeBins { - fn ts_min(&self) -> Option; - fn ts_max(&self) -> Option; - fn ts_min_max(&self) -> Option<(u64, u64)>; -} - -// TODO remove -pub trait TimeBinnerTy: fmt::Debug + Send + Unpin { - type Input: fmt::Debug; - type Output: fmt::Debug; - - fn ingest(&mut self, item: &mut Self::Input); - - fn set_range_complete(&mut self); - - fn bins_ready_count(&self) -> usize; - - fn bins_ready(&mut self) -> Option; - - /// If there is a bin in progress with non-zero count, push it to the result set. - /// With push_empty == true, a bin in progress is pushed even if it contains no counts. - fn push_in_progress(&mut self, push_empty: bool); - - /// Implies `Self::push_in_progress` but in addition, pushes a zero-count bin if the call - /// to `push_in_progress` did not change the result count, as long as edges are left. - /// The next call to `Self::bins_ready_count` must return one higher count than before. - fn cycle(&mut self); - - fn empty(&self) -> Option; - - fn append_empty_until_end(&mut self); -} - -pub trait TimeBinnableTy: fmt::Debug + WithLen + Send + Sized { - type TimeBinner: TimeBinnerTy; - - fn time_binner_new( - &self, - binrange: BinnedRangeEnum, - do_time_weight: bool, - emit_empty_bins: bool, - ) -> Self::TimeBinner; -} - -// #[derive(Debug, ThisError)] -// #[cstm(name = "Binninggg")] -pub enum BinningggError { - Dyn(Box), - TypeMismatch { have: String, expect: String }, -} - -impl fmt::Display for BinningggError { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - match self { - BinningggError::Dyn(e) => write!(fmt, "{e}"), - BinningggError::TypeMismatch { have, expect } => { - write!(fmt, "TypeMismatch(have: {have}, expect: {expect})") - } - } - } -} - -impl From for BinningggError -where - E: std::error::Error + 'static, -{ - fn from(value: E) -> Self { - Self::Dyn(Box::new(value)) - } -} - -pub trait BinningggContainerEventsDyn: fmt::Debug + Send { - fn type_name(&self) -> &'static str; - fn binned_events_timeweight_traitobj(&self, range: BinnedRange) -> Box; - fn to_anybox(&mut self) -> Box; -} - -pub trait BinningggContainerBinsDyn: fmt::Debug + Send + fmt::Display + WithLen + AsAnyMut + CollectableDyn { - fn type_name(&self) -> &'static str; - fn empty(&self) -> BinsBoxed; - fn clone(&self) -> BinsBoxed; - fn edges_iter( - &self, - ) -> std::iter::Zip, std::collections::vec_deque::Iter>; - fn drain_into(&mut self, dst: &mut dyn BinningggContainerBinsDyn, range: Range); - fn fix_numerics(&mut self); -} - -pub type BinsBoxed = Box; - -pub type EventsBoxed = Box; - -pub trait BinningggBinnerTy: fmt::Debug + Send { - type Input: fmt::Debug; - type Output: fmt::Debug; - - fn ingest(&mut self, item: &mut Self::Input); - fn range_final(&mut self); - fn bins_ready_count(&self) -> usize; - fn bins_ready(&mut self) -> Option; -} - -pub trait BinningggBinnableTy: fmt::Debug + WithLen + Send { - type Binner: BinningggBinnerTy; - - fn binner_new(range: BinnedRange) -> Self::Binner; -} - -pub trait BinningggBinnerDyn: fmt::Debug + Send { - fn input_done_range_final(&mut self) -> Result<(), BinningggError>; - fn input_done_range_open(&mut self) -> Result<(), BinningggError>; -} - -pub trait BinnedEventsTimeweightTrait: fmt::Debug + Send { - fn ingest(&mut self, evs_all: EventsBoxed) -> Result<(), BinningggError>; - fn input_done_range_final(&mut self) -> Result<(), BinningggError>; - fn input_done_range_open(&mut self) -> Result<(), BinningggError>; - fn output(&mut self) -> Result, BinningggError>; -} diff --git a/crates/items_0/src/transform.rs b/crates/items_0/src/transform.rs deleted file mode 100644 index 1908ea2..0000000 --- a/crates/items_0/src/transform.rs +++ /dev/null @@ -1,166 +0,0 @@ -use crate::collect_s::CollectableDyn; -use crate::collect_s::CollectedDyn; -use crate::streamitem::RangeCompletableItem; -use crate::streamitem::Sitemty; -use crate::streamitem::StreamItem; -use crate::Events; -use daqbuf_err as err; -use err::Error; -use futures_util::stream; -use futures_util::Future; -use futures_util::Stream; -use futures_util::StreamExt; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; - -pub trait EventStreamTrait: Stream>> + WithTransformProperties + Send {} - -pub trait CollectableStreamTrait: - Stream>> + WithTransformProperties + Send -{ -} - -pub struct EventTransformProperties { - pub needs_value: bool, -} - -pub struct TransformProperties { - pub needs_one_before_range: bool, - pub needs_value: bool, -} - -pub trait WithTransformProperties { - fn query_transform_properties(&self) -> TransformProperties; -} - -impl WithTransformProperties for Box -where - T: WithTransformProperties, -{ - fn query_transform_properties(&self) -> TransformProperties { - self.as_ref().query_transform_properties() - } -} - -impl WithTransformProperties for Pin> -where - T: WithTransformProperties, -{ - fn query_transform_properties(&self) -> TransformProperties { - self.as_ref().query_transform_properties() - } -} - -pub trait EventTransform: WithTransformProperties + Send { - fn transform(&mut self, src: Box) -> Box; -} - -impl EventTransform for Box -where - T: EventTransform, -{ - fn transform(&mut self, src: Box) -> Box { - self.as_mut().transform(src) - } -} - -impl EventTransform for Pin> -where - T: EventTransform, -{ - fn transform(&mut self, src: Box) -> Box { - todo!() - } -} - -pub struct IdentityTransform {} - -impl IdentityTransform { - pub fn default() -> Self { - Self {} - } -} - -impl WithTransformProperties for IdentityTransform { - fn query_transform_properties(&self) -> TransformProperties { - todo!() - } -} - -impl EventTransform for IdentityTransform { - fn transform(&mut self, src: Box) -> Box { - src - } -} - -pub struct TransformEvent(pub Box); - -impl WithTransformProperties for TransformEvent { - fn query_transform_properties(&self) -> TransformProperties { - self.0.query_transform_properties() - } -} - -impl EventTransform for TransformEvent { - fn transform(&mut self, src: Box) -> Box { - self.0.transform(src) - } -} - -impl WithTransformProperties for stream::Iter { - fn query_transform_properties(&self) -> TransformProperties { - todo!() - } -} - -impl EventStreamTrait for stream::Iter where - T: core::iter::Iterator>> + Send -{ -} - -pub struct EventStreamBox(pub Pin>); - -impl From for EventStreamBox -where - T: Events, -{ - fn from(value: T) -> Self { - let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(Box::new(value) as _))); - let x = stream::iter(vec![item]); - Self(Box::pin(x)) - } -} - -pub struct CollectableStreamBox(pub Pin>); - -impl Stream for CollectableStreamBox { - type Item = Sitemty>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.0.poll_next_unpin(cx) - } -} - -impl WithTransformProperties for CollectableStreamBox { - fn query_transform_properties(&self) -> TransformProperties { - todo!() - } -} - -impl CollectableStreamTrait for CollectableStreamBox {} - -impl WithTransformProperties for stream::Empty { - fn query_transform_properties(&self) -> TransformProperties { - todo!() - } -} - -impl CollectableStreamTrait for stream::Empty -where - T: Send, - stream::Empty: Stream>>, -{ -} - -impl CollectableStreamTrait for Pin> where T: CollectableStreamTrait {} diff --git a/crates/items_0/src/vecpreview.rs b/crates/items_0/src/vecpreview.rs deleted file mode 100644 index 74f23ec..0000000 --- a/crates/items_0/src/vecpreview.rs +++ /dev/null @@ -1,53 +0,0 @@ -use core::fmt; -use std::collections::VecDeque; - -pub struct PreviewCell<'a, T> { - pub a: Option<&'a T>, - pub b: Option<&'a T>, -} - -impl<'a, T> fmt::Debug for PreviewCell<'a, T> -where - T: fmt::Debug, -{ - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - match (self.a.as_ref(), self.b.as_ref()) { - (Some(a), Some(b)) => write!(fmt, "{:?} .. {:?}", a, b), - (Some(a), None) => write!(fmt, "{:?}", a), - _ => write!(fmt, "(empty)"), - } - } -} - -pub trait PreviewRange { - fn preview<'a>(&'a self) -> Box; -} - -impl PreviewRange for VecDeque -where - T: fmt::Debug, -{ - fn preview<'a>(&'a self) -> Box { - let ret = PreviewCell { - a: self.front(), - b: if self.len() <= 1 { None } else { self.back() }, - }; - Box::new(ret) - } -} - -pub struct VecPreview<'a> { - c: &'a dyn PreviewRange, -} - -impl<'a> VecPreview<'a> { - pub fn new(c: &'a dyn PreviewRange) -> Self { - Self { c } - } -} - -impl<'a> fmt::Debug for VecPreview<'a> { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "{:?}", self.c.preview()) - } -} diff --git a/crates/items_2/Cargo.toml b/crates/items_2/Cargo.toml index 1e80da4..ae1fd15 100644 --- a/crates/items_2/Cargo.toml +++ b/crates/items_2/Cargo.toml @@ -24,7 +24,7 @@ futures-util = "0.3.24" humantime-serde = "1.1.1" thiserror = "0.0.1" daqbuf-err = { path = "../../../daqbuf-err" } -items_0 = { path = "../items_0" } +items_0 = { path = "../../../daqbuf-items-0", package = "daqbuf-items-0" } items_proc = { path = "../items_proc" } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } parse = { path = "../parse" } diff --git a/crates/nodenet/Cargo.toml b/crates/nodenet/Cargo.toml index 8ccc17c..a211099 100644 --- a/crates/nodenet/Cargo.toml +++ b/crates/nodenet/Cargo.toml @@ -21,8 +21,7 @@ daqbuf-err = { path = "../../../daqbuf-err" } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } query = { path = "../query" } disk = { path = "../disk" } -#parse = { path = "../parse" } -items_0 = { path = "../items_0" } +items_0 = { path = "../../../daqbuf-items-0", package = "daqbuf-items-0" } items_2 = { path = "../items_2" } dbconn = { path = "../dbconn" } scyllaconn = { path = "../scyllaconn" } diff --git a/crates/query/Cargo.toml b/crates/query/Cargo.toml index 513cbf9..ad629d2 100644 --- a/crates/query/Cargo.toml +++ b/crates/query/Cargo.toml @@ -14,7 +14,7 @@ humantime = "2.1.0" humantime-serde = "1.1.1" thiserror = "0.0.1" netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } -items_0 = { path = "../items_0" } +items_0 = { path = "../../../daqbuf-items-0", package = "daqbuf-items-0" } items_2 = { path = "../items_2" } [patch.crates-io] diff --git a/crates/scyllaconn/Cargo.toml b/crates/scyllaconn/Cargo.toml index d14105e..920e673 100644 --- a/crates/scyllaconn/Cargo.toml +++ b/crates/scyllaconn/Cargo.toml @@ -15,7 +15,7 @@ scylla = "0.13.0" daqbuf-err = { path = "../../../daqbuf-err" } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } query = { path = "../query" } -items_0 = { path = "../items_0" } +items_0 = { path = "../../../daqbuf-items-0", package = "daqbuf-items-0" } items_2 = { path = "../items_2" } streams = { path = "../streams" } daqbuf-series = { path = "../../../daqbuf-series" } diff --git a/crates/streamio/Cargo.toml b/crates/streamio/Cargo.toml index 4ef1314..5c3f477 100644 --- a/crates/streamio/Cargo.toml +++ b/crates/streamio/Cargo.toml @@ -25,7 +25,7 @@ chrono = { version = "0.4.19", features = ["serde"] } wasmer = { version = "4.1.0", default-features = false, features = ["sys", "cranelift"], optional = true } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } query = { path = "../query" } -items_0 = { path = "../items_0" } +items_0 = { path = "../../../daqbuf-items-0", package = "daqbuf-items-0" } items_2 = { path = "../items_2" } parse = { path = "../parse" } streams = { path = "../streams" } diff --git a/crates/streams/Cargo.toml b/crates/streams/Cargo.toml index 4a0bbfd..cdba91d 100644 --- a/crates/streams/Cargo.toml +++ b/crates/streams/Cargo.toml @@ -23,7 +23,7 @@ chrono = { version = "0.4.19", features = ["serde"] } wasmer = { version = "4.1.0", default-features = false, features = ["sys", "cranelift"], optional = true } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } query = { path = "../query" } -items_0 = { path = "../items_0" } +items_0 = { path = "../../../daqbuf-items-0", package = "daqbuf-items-0" } items_2 = { path = "../items_2" } parse = { path = "../parse" } http = "1"