diff --git a/crates/disk/src/binned/prebinned.rs b/crates/disk/src/binned/prebinned.rs index 40c28d8..448560f 100644 --- a/crates/disk/src/binned/prebinned.rs +++ b/crates/disk/src/binned/prebinned.rs @@ -251,7 +251,7 @@ pub async fn pre_binned_bytes_for_http( node_config, ) .await? - .map(|item| match item.make_frame() { + .map(|item| match item.make_frame________() { Ok(item) => Ok(item.freeze()), Err(e) => Err(e), }); diff --git a/crates/items_2/src/binning/aggregator.rs b/crates/items_2/src/binning/aggregator.rs index 6f7640e..c6cca0c 100644 --- a/crates/items_2/src/binning/aggregator.rs +++ b/crates/items_2/src/binning/aggregator.rs @@ -161,4 +161,48 @@ impl AggregatorTimeWeight for AggregatorNumeric { } } -// TODO do enum right from begin, using a SOA enum container. +impl AggregatorTimeWeight for AggregatorNumeric { + fn new() -> Self { + Self { sum: 0. } + } + + fn ingest(&mut self, dt: DtNano, bl: DtNano, val: bool) { + let f = dt.ns() as f64 / bl.ns() as f64; + trace!("INGEST {} {}", f, val); + self.sum += f * val as u8 as f64; + } + + fn reset_for_new_bin(&mut self) { + self.sum = 0.; + } + + fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f64 { + let sum = self.sum.clone(); + trace!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction); + self.sum = 0.; + sum / filled_width_fraction as f64 + } +} + +impl AggregatorTimeWeight for AggregatorNumeric { + fn new() -> Self { + Self { sum: 0. } + } + + fn ingest(&mut self, dt: DtNano, bl: DtNano, val: String) { + let f = dt.ns() as f64 / bl.ns() as f64; + trace!("INGEST {} {}", f, val); + self.sum += f * val.len() as f64; + } + + fn reset_for_new_bin(&mut self) { + self.sum = 0.; + } + + fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f64 { + let sum = self.sum.clone(); + trace!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction); + self.sum = 0.; + sum / filled_width_fraction as f64 + } +} diff --git a/crates/items_2/src/binning/container_bins.rs b/crates/items_2/src/binning/container_bins.rs index 2629210..d60a60d 100644 --- a/crates/items_2/src/binning/container_bins.rs +++ b/crates/items_2/src/binning/container_bins.rs @@ -10,6 +10,7 @@ use items_0::timebin::BinsBoxed; use items_0::vecpreview::VecPreview; use items_0::AsAnyMut; use items_0::WithLen; +use netpod::EnumVariant; use netpod::TsNano; use serde::Deserialize; use serde::Serialize; @@ -339,6 +340,21 @@ where } } +macro_rules! try_to_old_time_binned { + ($sty:ty, $this:expr, $lst:expr) => { + let this = $this; + let a = this as &dyn any::Any; + if let Some(src) = a.downcast_ref::>() { + use items_0::Empty; + let mut ret = crate::binsdim0::BinsDim0::<$sty>::empty(); + for ((((((&ts1, &ts2), &cnt), min), max), avg), fnl) in src.zip_iter() { + ret.push(ts1.ns(), ts2.ns(), cnt, *min, *max, *avg as f32, $lst); + } + return Box::new(ret); + } + }; +} + impl BinningggContainerBinsDyn for ContainerBins where EVT: EventValueType, @@ -372,25 +388,56 @@ where } fn to_old_time_binned(&self) -> Box { + try_to_old_time_binned!(u8, self, 0); + try_to_old_time_binned!(u16, self, 0); + try_to_old_time_binned!(u32, self, 0); + try_to_old_time_binned!(u64, self, 0); + try_to_old_time_binned!(i8, self, 0); + try_to_old_time_binned!(i16, self, 0); + try_to_old_time_binned!(i32, self, 0); + try_to_old_time_binned!(i64, self, 0); + try_to_old_time_binned!(f32, self, 0.); + try_to_old_time_binned!(f64, self, 0.); + try_to_old_time_binned!(bool, self, false); + // try_to_old_time_binned!(String, self, String::new()); let a = self as &dyn any::Any; - if let Some(src) = a.downcast_ref::>() { + if let Some(src) = a.downcast_ref::>() { use items_0::Empty; - let mut ret = crate::binsdim0::BinsDim0::::empty(); - for ((((((&ts1, &ts2), &cnt), min), max), avg), fnl) in src.zip_iter() { - ret.push(ts1.ns(), ts2.ns(), cnt, *min, *max, *avg as f32, 0.); + let mut ret = crate::binsdim0::BinsDim0::::empty(); + for ((((((&ts1, &ts2), &cnt), min), max), avg), _fnl) in src.zip_iter() { + ret.push( + ts1.ns(), + ts2.ns(), + cnt, + min.clone(), + max.clone(), + *avg as f32, + EnumVariant::new(0, ""), + ); } - Box::new(ret) - } else if let Some(src) = a.downcast_ref::>() { - use items_0::Empty; - let mut ret = crate::binsdim0::BinsDim0::::empty(); - for ((((((&ts1, &ts2), &cnt), min), max), avg), fnl) in src.zip_iter() { - ret.push(ts1.ns(), ts2.ns(), cnt, *min, *max, *avg as f32, 0.); - } - Box::new(ret) - } else { - let styn = any::type_name::(); - todo!("TODO impl for {styn}") + return Box::new(ret); } + let styn = any::type_name::(); + todo!("TODO impl for {styn}"); + // let a = self as &dyn any::Any; + // if let Some(src) = a.downcast_ref::>() { + // use items_0::Empty; + // let mut ret = crate::binsdim0::BinsDim0::::empty(); + // for ((((((&ts1, &ts2), &cnt), min), max), avg), fnl) in src.zip_iter() { + // ret.push(ts1.ns(), ts2.ns(), cnt, *min, *max, *avg as f32, 0.); + // } + // Box::new(ret) + // } else if let Some(src) = a.downcast_ref::>() { + // use items_0::Empty; + // let mut ret = crate::binsdim0::BinsDim0::::empty(); + // for ((((((&ts1, &ts2), &cnt), min), max), avg), fnl) in src.zip_iter() { + // ret.push(ts1.ns(), ts2.ns(), cnt, *min, *max, *avg as f32, 0.); + // } + // Box::new(ret) + // } else { + // let styn = any::type_name::(); + // todo!("TODO impl for {styn}") + // } } } diff --git a/crates/items_2/src/binning/container_events.rs b/crates/items_2/src/binning/container_events.rs index ac44f85..97f3164 100644 --- a/crates/items_2/src/binning/container_events.rs +++ b/crates/items_2/src/binning/container_events.rs @@ -9,6 +9,7 @@ use err::ThisError; use items_0::timebin::BinningggContainerEventsDyn; use items_0::vecpreview::PreviewRange; use items_0::vecpreview::VecPreview; +use items_0::AsAnyRef; use netpod::BinnedRange; use netpod::TsNano; use serde::Deserialize; @@ -35,7 +36,7 @@ pub trait EventValueType: fmt::Debug + Clone + PartialOrd + Send + 'static { type AggregatorTimeWeight: AggregatorTimeWeight; type AggTimeWeightOutputAvg: AggTimeWeightOutputAvg; - fn identity_sum() -> Self; + // fn identity_sum() -> Self; // fn add_weighted(&self, add: &Self, f: f32) -> Self; } @@ -57,48 +58,48 @@ where } macro_rules! impl_event_value_type { - ($evt:ty, $zero:expr) => { + ($evt:ty) => { impl EventValueType for $evt { type Container = VecDeque; type AggregatorTimeWeight = AggregatorNumeric; type AggTimeWeightOutputAvg = f64; - - fn identity_sum() -> Self { - $zero - } } }; } -impl_event_value_type!(u8, 0); -impl_event_value_type!(u16, 0); -impl_event_value_type!(u32, 0); -impl_event_value_type!(u64, 0); -impl_event_value_type!(i8, 0); -impl_event_value_type!(i16, 0); -impl_event_value_type!(i32, 0); -impl_event_value_type!(i64, 0); -// impl_event_value_type!(f32, 0.); -// impl_event_value_type!(f64, 0.); +impl_event_value_type!(u8); +impl_event_value_type!(u16); +impl_event_value_type!(u32); +impl_event_value_type!(u64); +impl_event_value_type!(i8); +impl_event_value_type!(i16); +impl_event_value_type!(i32); +impl_event_value_type!(i64); +// impl_event_value_type!(f32); +// impl_event_value_type!(f64); impl EventValueType for f32 { type Container = VecDeque; type AggregatorTimeWeight = AggregatorNumeric; type AggTimeWeightOutputAvg = f32; - - fn identity_sum() -> Self { - 0. - } } impl EventValueType for f64 { type Container = VecDeque; type AggregatorTimeWeight = AggregatorNumeric; type AggTimeWeightOutputAvg = f64; +} - fn identity_sum() -> Self { - 0. - } +impl EventValueType for bool { + type Container = VecDeque; + type AggregatorTimeWeight = AggregatorNumeric; + type AggTimeWeightOutputAvg = f64; +} + +impl EventValueType for String { + type Container = VecDeque; + type AggregatorTimeWeight = AggregatorNumeric; + type AggTimeWeightOutputAvg = f64; } #[derive(Debug, Clone)] @@ -122,6 +123,20 @@ where vals: ::Container, } +macro_rules! try_to_events_dim0 { + ($sty:ty, $this:expr) => { + let this = $this; + if let Some(evs) = this.as_any_ref().downcast_ref::>() { + use crate::eventsdim0::EventsDim0; + let tss: VecDeque<_> = this.tss.iter().map(|x| x.ns()).collect(); + let pulses = tss.iter().map(|_| 0).collect(); + let values = evs.vals.clone(); + let ret = EventsDim0::<$sty> { tss, pulses, values }; + return Box::new(ret); + } + }; +} + impl ContainerEvents where EVT: EventValueType, @@ -178,6 +193,12 @@ where self.tss.push_back(ts); self.vals.push_back(val); } + + pub fn to_events_dim0(&self) -> Box { + try_to_events_dim0!(f64, self); + let styn = any::type_name::(); + todo!("TODO to_container_events for {styn}") + } } impl fmt::Debug for ContainerEvents @@ -196,6 +217,15 @@ where } } +impl AsAnyRef for ContainerEvents +where + EVT: EventValueType, +{ + fn as_any_ref(&self) -> &dyn any::Any { + self + } +} + pub struct ContainerEventsTakeUpTo<'a, EVT> where EVT: EventValueType, diff --git a/crates/items_2/src/binning/valuetype.rs b/crates/items_2/src/binning/valuetype.rs index 1493b55..d1b373f 100644 --- a/crates/items_2/src/binning/valuetype.rs +++ b/crates/items_2/src/binning/valuetype.rs @@ -65,7 +65,7 @@ impl AggregatorTimeWeight for EnumVariantAggregatorTimeWeight { } fn reset_for_new_bin(&mut self) { - self.sum = f32::identity_sum(); + self.sum = 0.; } fn result_and_reset_for_new_bin( @@ -73,7 +73,7 @@ impl AggregatorTimeWeight for EnumVariantAggregatorTimeWeight { filled_width_fraction: f32, ) -> ::AggTimeWeightOutputAvg { let ret = self.sum.clone(); - self.sum = f32::identity_sum(); + self.sum = 0.; ret / filled_width_fraction } } @@ -82,9 +82,4 @@ impl EventValueType for EnumVariant { type Container = EnumVariantContainer; type AggregatorTimeWeight = EnumVariantAggregatorTimeWeight; type AggTimeWeightOutputAvg = f32; - - // TODO remove this from trait, only needed for common numeric cases but not in general. - fn identity_sum() -> Self { - todo!() - } } diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index 7d44dcd..80894ac 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -1128,8 +1128,8 @@ impl Events for EventsDim0 { try_to_container_events!(i64, self); try_to_container_events!(f32, self); try_to_container_events!(f64, self); - // try_to_container_events!(bool, self); - // try_to_container_events!(String, self); + try_to_container_events!(bool, self); + try_to_container_events!(String, self); let this = self; if let Some(evs) = self.as_any_ref().downcast_ref::>() { use crate::binning::container_events::ContainerEvents; @@ -1143,25 +1143,7 @@ impl Events for EventsDim0 { return Box::new(ret); } let styn = any::type_name::(); - todo!("TODO for {styn}") - } -} - -fn try_to_container_events_fn( - this: &EventsDim0, -) -> Option> -where - STY: ScalarOps, - EVT: crate::binning::container_events::EventValueType>, -{ - use crate::binning::container_events::ContainerEvents; - if let Some(evs) = this.as_any_ref().downcast_ref::>() { - let tss = this.tss.iter().map(|&x| TsNano::from_ns(x)).collect(); - let vals = evs.values.clone(); - let ret = ContainerEvents::::from_constituents(tss, vals); - Some(Box::new(ret)) - } else { - None + todo!("TODO to_container_events for {styn}") } } @@ -1352,7 +1334,7 @@ mod test_frame { let events: Box = Box::new(events); let item = ChannelEvents::Events(events); let item = Ok::<_, Error>(StreamItem::DataItem(RangeCompletableItem::Data(item))); - let mut buf = item.make_frame().unwrap(); + let mut buf = item.make_frame_dyn().unwrap(); let s = String::from_utf8_lossy(&buf[20..buf.len() - 4]); eprintln!("[[{s}]]"); let buflen = buf.len(); diff --git a/crates/items_2/src/framable.rs b/crates/items_2/src/framable.rs index 576ea32..f7466da 100644 --- a/crates/items_2/src/framable.rs +++ b/crates/items_2/src/framable.rs @@ -59,7 +59,7 @@ impl FrameType for Box { } pub trait Framable { - fn make_frame(&self) -> Result; + fn make_frame_dyn(&self) -> Result; } pub trait FramableInner: erased_serde::Serialize + FrameTypeInnerDyn + Send { @@ -74,7 +74,7 @@ impl Framable for Sitemty where T: Sized + serde::Serialize + FrameType, { - fn make_frame(&self) -> Result { + fn make_frame_dyn(&self) -> Result { match self { Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) => { let frame_type_id = k.frame_type_id(); @@ -95,8 +95,8 @@ impl Framable for Box where T: Framable + ?Sized, { - fn make_frame(&self) -> Result { - self.as_ref().make_frame() + fn make_frame_dyn(&self) -> Result { + self.as_ref().make_frame_dyn() } } @@ -177,7 +177,7 @@ fn test_frame_log() { msg: format!("test-log-message"), }; let item: Sitemty = Ok(StreamItem::Log(item)); - let buf = Framable::make_frame(&item).unwrap(); + let buf = Framable::make_frame_dyn(&item).unwrap(); let len = u32::from_le_bytes(buf[12..16].try_into().unwrap()); let item2: LogItem = decode_from_slice(&buf[20..20 + len as usize]).unwrap(); } @@ -187,7 +187,7 @@ fn test_frame_error() { use crate::channelevents::ChannelEvents; use crate::frame::json_from_slice; let item: Sitemty = Err(Error::with_msg_no_trace(format!("dummy-error-message"))); - let buf = Framable::make_frame(&item).unwrap(); + let buf = Framable::make_frame_dyn(&item).unwrap(); let len = u32::from_le_bytes(buf[12..16].try_into().unwrap()); let tyid = u32::from_le_bytes(buf[8..12].try_into().unwrap()); if tyid != ERROR_FRAME_TYPE_ID { diff --git a/crates/items_2/src/frame.rs b/crates/items_2/src/frame.rs index d40ae96..1ca32fa 100644 --- a/crates/items_2/src/frame.rs +++ b/crates/items_2/src/frame.rs @@ -27,22 +27,6 @@ use serde::Serialize; use std::any; use std::io; -trait EC { - fn ec(self) -> err::Error; -} - -impl EC for rmp_serde::encode::Error { - fn ec(self) -> err::Error { - err::Error::with_msg_no_trace(format!("{self:?}")) - } -} - -impl EC for rmp_serde::decode::Error { - fn ec(self) -> err::Error { - err::Error::with_msg_no_trace(format!("{self:?}")) - } -} - pub fn bincode_ser( w: W, ) -> bincode::Serializer< diff --git a/crates/nodenet/src/client.rs b/crates/nodenet/src/client.rs index 8d4ac52..450e718 100644 --- a/crates/nodenet/src/client.rs +++ b/crates/nodenet/src/client.rs @@ -28,7 +28,7 @@ async fn open_bytes_data_streams_http( let mut streams = Vec::new(); for node in &cluster.nodes { let item = sitem_data(frame1.clone()); - let buf = item.make_frame()?; + let buf = item.make_frame_dyn()?; let url = node.baseurl().join("/api/4/private/eventdata/frames").unwrap(); debug!("open_event_data_streams_http post {url}"); diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index b1a350d..25821e2 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -139,7 +139,7 @@ pub async fn create_response_bytes_stream( let fetch_info = evq.ch_conf().to_sf_databuffer()?; let stream = disk::raw::conn::make_event_blobs_pipe(&evq, &fetch_info, reqctx, ncc)?; // let stream = stream.map(|x| Box::new(x) as _); - let stream = stream.map(|x| x.make_frame().map(|x| x.freeze())); + let stream = stream.map(|x| x.make_frame_dyn().map(|x| x.freeze())); let ret = Box::pin(stream); Ok(ret) } else { @@ -161,7 +161,7 @@ pub async fn create_response_bytes_stream( }) }); // let stream = stream.map(move |x| Box::new(x) as Box); - let stream = stream.map(|x| x.make_frame().map(bytes::BytesMut::freeze)); + let stream = stream.map(|x| x.make_frame_dyn().map(bytes::BytesMut::freeze)); let ret = Box::pin(stream); Ok(ret) } @@ -204,7 +204,7 @@ async fn events_conn_handler_with_reqid( msg: format!("buf_len_histo: {:?}", buf_len_histo), }; let item: Sitemty = Ok(StreamItem::Log(item)); - let buf = match item.make_frame() { + let buf = match item.make_frame_dyn() { Ok(k) => k, Err(e) => return Err((e, netout))?, }; @@ -331,7 +331,7 @@ where Err(ce) => { let mut out = ce.netout; let item: Sitemty = Err(err::Error::from_string(ce.err)); - let buf = Framable::make_frame(&item)?; + let buf = Framable::make_frame_dyn(&item)?; out.write_all(&buf).await?; } } diff --git a/crates/nodenet/src/conn/test.rs b/crates/nodenet/src/conn/test.rs index c014b3b..33e53b2 100644 --- a/crates/nodenet/src/conn/test.rs +++ b/crates/nodenet/src/conn/test.rs @@ -89,7 +89,7 @@ fn raw_data_00() { let qu = EventsSubQuery::from_parts(select, settings, "dummy".into(), log_level); let frame1 = Frame1Parts::new(qu.clone()); let query = EventQueryJsonStringFrame(serde_json::to_string(&frame1).unwrap()); - let frame = sitem_data(query).make_frame()?; + let frame = sitem_data(query).make_frame_dyn()?; let scyqueue = err::todoval(); let jh = taskrun::spawn(events_conn_handler(client, addr, scyqueue, cfg)); con.write_all(&frame).await.unwrap(); diff --git a/crates/streams/src/generators.rs b/crates/streams/src/generators.rs index 60392f5..5b8abae 100644 --- a/crates/streams/src/generators.rs +++ b/crates/streams/src/generators.rs @@ -57,7 +57,7 @@ pub fn make_test_channel_events_bytes_stream( } }) }); - let stream = stream.map(|x| x.make_frame().map(|x| x.freeze())); + let stream = stream.map(|x| x.make_frame_dyn().map(|x| x.freeze())); let ret = Box::pin(stream); Ok(ret) } diff --git a/crates/streams/src/tcprawclient.rs b/crates/streams/src/tcprawclient.rs index a7d2838..b365c50 100644 --- a/crates/streams/src/tcprawclient.rs +++ b/crates/streams/src/tcprawclient.rs @@ -69,7 +69,7 @@ pub async fn x_processed_event_blobs_stream_from_node_tcp( let net = TcpStream::connect(addr.clone()).await?; let (netin, mut netout) = net.into_split(); let item = sitem_data(frame1); - let buf = item.make_frame()?; + let buf = item.make_frame_dyn()?; netout.write_all(&buf).await?; let buf = make_term_frame()?; netout.write_all(&buf).await?; @@ -95,7 +95,7 @@ pub async fn x_processed_event_blobs_stream_from_node_http( let frame1 = make_node_command_frame(subq.clone())?; let item = sitem_data(frame1.clone()); - let buf = item.make_frame()?; + let buf = item.make_frame_dyn()?; let url = node.baseurl().join("/api/4/private/eventdata/frames").unwrap(); debug!("open_event_data_streams_http post {url}"); @@ -168,7 +168,7 @@ where let net = TcpStream::connect(addr.clone()).await?; let (netin, mut netout) = net.into_split(); let item = sitem_data(frame1.clone()); - let buf = item.make_frame()?; + let buf = item.make_frame_dyn()?; netout.write_all(&buf).await?; let buf = make_term_frame()?; netout.write_all(&buf).await?;