From e7613d68643bb862c8e2400fe6d18cf77f7860e1 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 4 Dec 2024 12:15:51 +0100 Subject: [PATCH] Support container output format --- crates/disk/src/decode.rs | 33 ++++++++++++++++------------- crates/httpclient/src/httpclient.rs | 6 ++++++ crates/httpret/src/api4/binned.rs | 2 +- crates/httpret/src/api4/events.rs | 2 +- crates/nodenet/src/scylla.rs | 2 +- crates/scyllaconn/src/bincache.rs | 2 +- 6 files changed, 28 insertions(+), 19 deletions(-) diff --git a/crates/disk/src/decode.rs b/crates/disk/src/decode.rs index 5649c98..24f68ef 100644 --- a/crates/disk/src/decode.rs +++ b/crates/disk/src/decode.rs @@ -6,14 +6,14 @@ use items_0::scalar_ops::ScalarOps; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; +use items_0::subfr::SubFrId; use items_0::timebin::BinningggContainerEventsDyn; use items_0::WithLen; use items_2::binning::container_events::ContainerEvents; use items_2::binning::container_events::EventValueType; -use items_2::empty::empty_events_dyn_ev; +use items_2::binning::container_events::PulsedVal; +use items_2::empty::empty_events_pulsed_dyn_ev; use items_2::eventfull::EventFull; -use items_2::eventsdim0::EventsDim0; -use items_2::eventsdim1::EventsDim1; use netpod::log::*; use netpod::AggKind; use netpod::ScalarType; @@ -181,7 +181,7 @@ where impl ValueDim0FromBytesImpl where - EVT: EventValueType + ScalarOps + ScalarValueFromBytes, + EVT: EventValueType + SubFrId + ScalarOps + ScalarValueFromBytes, { fn boxed() -> Box { Box::new(Self { @@ -192,7 +192,7 @@ where impl ValueDim0FromBytes for ValueDim0FromBytesImpl where - EVT: EventValueType + ScalarOps + ScalarValueFromBytes, + EVT: EventValueType + SubFrId + ScalarOps + ScalarValueFromBytes, { fn convert( &self, @@ -202,9 +202,9 @@ where endian: Endian, events: &mut dyn BinningggContainerEventsDyn, ) -> Result<(), Error> { - if let Some(evs) = events.as_any_mut().downcast_mut::>() { + if let Some(evs) = events.as_any_mut().downcast_mut::>>() { let v = >::convert(buf, endian)?; - evs.push_back(TsNano::from_ns(ts), v); + evs.push_back(TsNano::from_ns(ts), PulsedVal(pulse, v)); Ok(()) } else { Err(Error::with_msg_no_trace("unexpected container")) @@ -214,7 +214,7 @@ where impl ValueFromBytes for ValueDim0FromBytesImpl where - EVT: EventValueType + ScalarOps + ScalarValueFromBytes, + EVT: EventValueType + SubFrId + ScalarOps + ScalarValueFromBytes, { fn convert( &self, @@ -239,7 +239,7 @@ where impl ValueDim1FromBytesImpl where EVT: EventValueType + ScalarOps + ScalarValueFromBytes, - Vec: EventValueType, + Vec: EventValueType + SubFrId, { fn boxed(shape: Shape) -> Box { Box::new(Self { @@ -252,7 +252,7 @@ where impl ValueFromBytes for ValueDim1FromBytesImpl where EVT: EventValueType + ScalarOps + ScalarValueFromBytes, - Vec: EventValueType, + Vec: EventValueType + SubFrId, { fn convert( &self, @@ -269,7 +269,7 @@ where impl ValueDim1FromBytes for ValueDim1FromBytesImpl where EVT: EventValueType + ScalarOps + ScalarValueFromBytes, - Vec: EventValueType, + Vec: EventValueType + SubFrId, { fn convert( &self, @@ -279,14 +279,17 @@ where endian: Endian, events: &mut dyn BinningggContainerEventsDyn, ) -> Result<(), Error> { - if let Some(evs) = events.as_any_mut().downcast_mut::>>() { + if let Some(evs) = events + .as_any_mut() + .downcast_mut::>>>() + { let n = if let Shape::Wave(n) = self.shape { n } else { return Err(Error::with_msg_no_trace("ValueDim1FromBytesImpl bad shape")); }; let v = >::convert_dim1(buf, endian, n as _)?; - evs.push_back(TsNano::from_ns(ts), v); + evs.push_back(TsNano::from_ns(ts), PulsedVal(pulse, v)); // evs.values.push_back(v); // evs.tss.push_back(ts); // evs.pulses.push_back(pulse); @@ -380,7 +383,7 @@ impl EventsDynStream { let sh = &shape; warn!("TODO EventsDynStream::new feed through transform"); // TODO do we need/want the empty item from here? - let events_out = empty_events_dyn_ev(st, sh)?; + let events_out = empty_events_pulsed_dyn_ev(st, sh)?; let scalar_conv = make_scalar_conv(st, sh, &agg_kind)?; let emit_threshold = match &shape { Shape::Scalar => 2048, @@ -405,7 +408,7 @@ impl EventsDynStream { let sh = &self.shape; // error!("TODO replace_events_out feed through transform"); // TODO do we need/want the empty item from here? - let empty = empty_events_dyn_ev(st, sh)?; + let empty = empty_events_pulsed_dyn_ev(st, sh)?; let evs = mem::replace(&mut self.events_out, empty); Ok(evs) } diff --git a/crates/httpclient/src/httpclient.rs b/crates/httpclient/src/httpclient.rs index 421ba85..2511395 100644 --- a/crates/httpclient/src/httpclient.rs +++ b/crates/httpclient/src/httpclient.rs @@ -181,6 +181,12 @@ pub struct ToJsonBody { body: Vec, } +impl From> for ToJsonBody { + fn from(value: Vec) -> Self { + Self { body: value } + } +} + impl From<&S> for ToJsonBody { fn from(value: &S) -> Self { Self { diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index 1bafa6d..5850328 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -228,7 +228,7 @@ async fn binned_json_single( let ret = response(StatusCode::OK) .header(CONTENT_TYPE, APP_JSON) .header(HEADER_NAME_REQUEST_ID, ctx.reqid()) - .body(ToJsonBody::from(&item).into_body())?; + .body(ToJsonBody::from(item.into_bytes()).into_body())?; Ok(ret) } CollectResult::Timeout => { diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index f731a8b..d12912b 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -253,7 +253,7 @@ async fn plain_events_json( let ret = response(StatusCode::OK) .header(CONTENT_TYPE, APP_JSON) .header(HEADER_NAME_REQUEST_ID, ctx.reqid()) - .body(ToJsonBody::from(&item).into_body())?; + .body(ToJsonBody::from(item.into_bytes()).into_body())?; debug!("{self_name} response created"); Ok(ret) } diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index f35458a..7818958 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -71,7 +71,7 @@ pub async fn scylla_channel_event_stream( type C2 = ContainerEvents; if let Some(j) = k.as_any_mut().downcast_mut::() { let mut g = C2::new(); - for (&ts, val) in j.iter_zip() { + for (ts, val) in j.iter_zip() { use netpod::channelstatus as cs2; let val = match cs2::ChannelStatus::from_kind(val as _) { Ok(x) => x.to_user_variant_string(), diff --git a/crates/scyllaconn/src/bincache.rs b/crates/scyllaconn/src/bincache.rs index 67cc150..953d820 100644 --- a/crates/scyllaconn/src/bincache.rs +++ b/crates/scyllaconn/src/bincache.rs @@ -46,7 +46,7 @@ pub async fn worker_write( stmts_cache: &StmtsCache, scy: &ScySession, ) -> Result<(), streams::timebin::cached::reader::Error> { - for (((((((&ts1, &ts2), &cnt), &min), &max), &avg), &lst), &fnl) in bins.zip_iter() { + for (((((((&ts1, &ts2), &cnt), min), max), &avg), lst), &fnl) in bins.zip_iter() { let bin_len = DtMs::from_ms_u64((ts2.ns() - ts1.ns()) / 1000000); let div = streams::timebin::cached::reader::part_len(bin_len).ns(); let msp = ts1.ns() / div;