From aeefe566b7b2f785ad7efb1af40d4c899939f51a Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Sat, 30 Nov 2024 11:44:49 +0100 Subject: [PATCH] WIP change container type --- crates/commonio/Cargo.toml | 2 +- crates/disk/src/decode.rs | 49 ++++++++++++++++------------- crates/nodenet/src/conn.rs | 3 +- crates/nodenet/src/conn/test.rs | 9 ++++-- crates/streamio/src/tcprawclient.rs | 3 +- 5 files changed, 38 insertions(+), 28 deletions(-) diff --git a/crates/commonio/Cargo.toml b/crates/commonio/Cargo.toml index 2362618..ebed590 100644 --- a/crates/commonio/Cargo.toml +++ b/crates/commonio/Cargo.toml @@ -14,7 +14,7 @@ bytes = "1" serde = { version = "1", features = ["derive"] } serde_json = "1" chrono = "0.4" -async-channel = "1.9.0" +async-channel = "1.9" parking_lot = "0.12" crc32fast = "1.2" daqbuf-err = { path = "../../../daqbuf-err" } diff --git a/crates/disk/src/decode.rs b/crates/disk/src/decode.rs index 16e32b0..5649c98 100644 --- a/crates/disk/src/decode.rs +++ b/crates/disk/src/decode.rs @@ -8,6 +8,8 @@ use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::timebin::BinningggContainerEventsDyn; use items_0::WithLen; +use items_2::binning::container_events::ContainerEvents; +use items_2::binning::container_events::EventValueType; use items_2::empty::empty_events_dyn_ev; use items_2::eventfull::EventFull; use items_2::eventsdim0::EventsDim0; @@ -16,6 +18,7 @@ use netpod::log::*; use netpod::AggKind; use netpod::ScalarType; use netpod::Shape; +use netpod::TsNano; use std::marker::PhantomData; use std::mem; use std::pin::Pin; @@ -176,9 +179,9 @@ where _m1: PhantomData, } -impl ValueDim0FromBytesImpl +impl ValueDim0FromBytesImpl where - STY: ScalarOps + ScalarValueFromBytes, + EVT: EventValueType + ScalarOps + ScalarValueFromBytes, { fn boxed() -> Box { Box::new(Self { @@ -187,9 +190,9 @@ where } } -impl ValueDim0FromBytes for ValueDim0FromBytesImpl +impl ValueDim0FromBytes for ValueDim0FromBytesImpl where - STY: ScalarOps + ScalarValueFromBytes, + EVT: EventValueType + ScalarOps + ScalarValueFromBytes, { fn convert( &self, @@ -199,11 +202,9 @@ where endian: Endian, events: &mut dyn BinningggContainerEventsDyn, ) -> Result<(), Error> { - if let Some(evs) = events.as_any_mut().downcast_mut::>() { - let v = >::convert(buf, endian)?; - evs.values.push_back(v); - evs.tss.push_back(ts); - evs.pulses.push_back(pulse); + if let Some(evs) = events.as_any_mut().downcast_mut::>() { + let v = >::convert(buf, endian)?; + evs.push_back(TsNano::from_ns(ts), v); Ok(()) } else { Err(Error::with_msg_no_trace("unexpected container")) @@ -211,9 +212,9 @@ where } } -impl ValueFromBytes for ValueDim0FromBytesImpl +impl ValueFromBytes for ValueDim0FromBytesImpl where - STY: ScalarOps + ScalarValueFromBytes, + EVT: EventValueType + ScalarOps + ScalarValueFromBytes, { fn convert( &self, @@ -235,9 +236,10 @@ where _m1: PhantomData, } -impl ValueDim1FromBytesImpl +impl ValueDim1FromBytesImpl where - STY: ScalarOps + ScalarValueFromBytes, + EVT: EventValueType + ScalarOps + ScalarValueFromBytes, + Vec: EventValueType, { fn boxed(shape: Shape) -> Box { Box::new(Self { @@ -247,9 +249,10 @@ where } } -impl ValueFromBytes for ValueDim1FromBytesImpl +impl ValueFromBytes for ValueDim1FromBytesImpl where - STY: ScalarOps + ScalarValueFromBytes, + EVT: EventValueType + ScalarOps + ScalarValueFromBytes, + Vec: EventValueType, { fn convert( &self, @@ -263,9 +266,10 @@ where } } -impl ValueDim1FromBytes for ValueDim1FromBytesImpl +impl ValueDim1FromBytes for ValueDim1FromBytesImpl where - STY: ScalarOps + ScalarValueFromBytes, + EVT: EventValueType + ScalarOps + ScalarValueFromBytes, + Vec: EventValueType, { fn convert( &self, @@ -275,16 +279,17 @@ where endian: Endian, events: &mut dyn BinningggContainerEventsDyn, ) -> Result<(), Error> { - if let Some(evs) = events.as_any_mut().downcast_mut::>() { + if let Some(evs) = events.as_any_mut().downcast_mut::>>() { let n = if let Shape::Wave(n) = self.shape { n } else { return Err(Error::with_msg_no_trace("ValueDim1FromBytesImpl bad shape")); }; - let v = >::convert_dim1(buf, endian, n as _)?; - evs.values.push_back(v); - evs.tss.push_back(ts); - evs.pulses.push_back(pulse); + let v = >::convert_dim1(buf, endian, n as _)?; + evs.push_back(TsNano::from_ns(ts), v); + // evs.values.push_back(v); + // evs.tss.push_back(ts); + // evs.pulses.push_back(pulse); Ok(()) } else { Err(Error::with_msg_no_trace("unexpected container")) diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index 4b37b25..efc72e2 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -94,6 +94,7 @@ async fn make_channel_events_stream_data( scyqueue: Option<&ScyllaQueue>, ncc: &NodeConfigCached, ) -> Result> + Send>>, Error> { + // ) -> Result>, Error> { if subq.backend() == TEST_BACKEND { let node_count = ncc.node_config.cluster.nodes.len() as u64; let node_ix = ncc.ix as u64; @@ -219,7 +220,7 @@ where .await { match k { - Ok(StreamItem::DataItem(item)) => { + Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => { frames.push(item); } Ok(item) => { diff --git a/crates/nodenet/src/conn/test.rs b/crates/nodenet/src/conn/test.rs index a04bc2b..9fb47f0 100644 --- a/crates/nodenet/src/conn/test.rs +++ b/crates/nodenet/src/conn/test.rs @@ -4,6 +4,7 @@ use daqbuf_err as err; use err::Error; use futures_util::StreamExt; use items_0::streamitem::sitem_data; +use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::streamitem::ERROR_FRAME_TYPE_ID; @@ -102,8 +103,8 @@ fn raw_data_00() { let mut frames = InMemoryFrameStream::new(TcpReadAsBytes::new(netin), qu.inmem_bufcap()); while let Some(frame) = frames.next().await { match frame { - Ok(frame) => match frame { - StreamItem::DataItem(k) => { + Ok(x) => match x { + StreamItem::DataItem(RangeCompletableItem::Data(k)) => { eprintln!("{k:?}"); if k.tyid() == ITEMS_2_CHANNEL_EVENTS_FRAME_TYPE_ID { } else if k.tyid() == ERROR_FRAME_TYPE_ID { @@ -115,6 +116,10 @@ fn raw_data_00() { let item: Sitemty = decode_frame(&k).unwrap(); eprintln!("decoded: {:?}", item); } + StreamItem::DataItem(RangeCompletableItem::RangeComplete) => { + eprintln!("decoded: RangeComplete"); + todo!() + } StreamItem::Log(_) => todo!(), StreamItem::Stats(_) => todo!(), }, diff --git a/crates/streamio/src/tcprawclient.rs b/crates/streamio/src/tcprawclient.rs index 8452ba2..d61a3a8 100644 --- a/crates/streamio/src/tcprawclient.rs +++ b/crates/streamio/src/tcprawclient.rs @@ -87,8 +87,7 @@ where let inp = Box::pin(inp) as BoxedBytesStream; let frames = InMemoryFrameStream::new(inp, subq.inmem_bufcap()); let frames = frames.map_err(sitem_err2_from_string); - let frames = Box::pin(frames); - let stream = EventsFromFrames::::new(frames, addr); + let stream = EventsFromFrames::::new(frames, addr); streams.push(Box::pin(stream) as _); } Ok(streams)