Support container output format

This commit is contained in:
Dominik Werder
2024-12-04 12:15:51 +01:00
parent aeefe566b7
commit e7613d6864
6 changed files with 28 additions and 19 deletions

View File

@@ -6,14 +6,14 @@ use items_0::scalar_ops::ScalarOps;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::subfr::SubFrId;
use items_0::timebin::BinningggContainerEventsDyn;
use items_0::WithLen;
use items_2::binning::container_events::ContainerEvents;
use items_2::binning::container_events::EventValueType;
use items_2::empty::empty_events_dyn_ev;
use items_2::binning::container_events::PulsedVal;
use items_2::empty::empty_events_pulsed_dyn_ev;
use items_2::eventfull::EventFull;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim1::EventsDim1;
use netpod::log::*;
use netpod::AggKind;
use netpod::ScalarType;
@@ -181,7 +181,7 @@ where
impl<EVT> ValueDim0FromBytesImpl<EVT>
where
EVT: EventValueType + ScalarOps + ScalarValueFromBytes<EVT>,
EVT: EventValueType + SubFrId + ScalarOps + ScalarValueFromBytes<EVT>,
{
fn boxed() -> Box<dyn ValueFromBytes> {
Box::new(Self {
@@ -192,7 +192,7 @@ where
impl<EVT> ValueDim0FromBytes for ValueDim0FromBytesImpl<EVT>
where
EVT: EventValueType + ScalarOps + ScalarValueFromBytes<EVT>,
EVT: EventValueType + SubFrId + ScalarOps + ScalarValueFromBytes<EVT>,
{
fn convert(
&self,
@@ -202,9 +202,9 @@ where
endian: Endian,
events: &mut dyn BinningggContainerEventsDyn,
) -> Result<(), Error> {
if let Some(evs) = events.as_any_mut().downcast_mut::<ContainerEvents<EVT>>() {
if let Some(evs) = events.as_any_mut().downcast_mut::<ContainerEvents<PulsedVal<EVT>>>() {
let v = <EVT as ScalarValueFromBytes<EVT>>::convert(buf, endian)?;
evs.push_back(TsNano::from_ns(ts), v);
evs.push_back(TsNano::from_ns(ts), PulsedVal(pulse, v));
Ok(())
} else {
Err(Error::with_msg_no_trace("unexpected container"))
@@ -214,7 +214,7 @@ where
impl<EVT> ValueFromBytes for ValueDim0FromBytesImpl<EVT>
where
EVT: EventValueType + ScalarOps + ScalarValueFromBytes<EVT>,
EVT: EventValueType + SubFrId + ScalarOps + ScalarValueFromBytes<EVT>,
{
fn convert(
&self,
@@ -239,7 +239,7 @@ where
impl<EVT> ValueDim1FromBytesImpl<EVT>
where
EVT: EventValueType + ScalarOps + ScalarValueFromBytes<EVT>,
Vec<EVT>: EventValueType,
Vec<EVT>: EventValueType + SubFrId,
{
fn boxed(shape: Shape) -> Box<dyn ValueFromBytes> {
Box::new(Self {
@@ -252,7 +252,7 @@ where
impl<EVT> ValueFromBytes for ValueDim1FromBytesImpl<EVT>
where
EVT: EventValueType + ScalarOps + ScalarValueFromBytes<EVT>,
Vec<EVT>: EventValueType,
Vec<EVT>: EventValueType + SubFrId,
{
fn convert(
&self,
@@ -269,7 +269,7 @@ where
impl<EVT> ValueDim1FromBytes for ValueDim1FromBytesImpl<EVT>
where
EVT: EventValueType + ScalarOps + ScalarValueFromBytes<EVT>,
Vec<EVT>: EventValueType,
Vec<EVT>: EventValueType + SubFrId,
{
fn convert(
&self,
@@ -279,14 +279,17 @@ where
endian: Endian,
events: &mut dyn BinningggContainerEventsDyn,
) -> Result<(), Error> {
if let Some(evs) = events.as_any_mut().downcast_mut::<ContainerEvents<Vec<EVT>>>() {
if let Some(evs) = events
.as_any_mut()
.downcast_mut::<ContainerEvents<PulsedVal<Vec<EVT>>>>()
{
let n = if let Shape::Wave(n) = self.shape {
n
} else {
return Err(Error::with_msg_no_trace("ValueDim1FromBytesImpl bad shape"));
};
let v = <EVT as ScalarValueFromBytes<EVT>>::convert_dim1(buf, endian, n as _)?;
evs.push_back(TsNano::from_ns(ts), v);
evs.push_back(TsNano::from_ns(ts), PulsedVal(pulse, v));
// evs.values.push_back(v);
// evs.tss.push_back(ts);
// evs.pulses.push_back(pulse);
@@ -380,7 +383,7 @@ impl EventsDynStream {
let sh = &shape;
warn!("TODO EventsDynStream::new feed through transform");
// TODO do we need/want the empty item from here?
let events_out = empty_events_dyn_ev(st, sh)?;
let events_out = empty_events_pulsed_dyn_ev(st, sh)?;
let scalar_conv = make_scalar_conv(st, sh, &agg_kind)?;
let emit_threshold = match &shape {
Shape::Scalar => 2048,
@@ -405,7 +408,7 @@ impl EventsDynStream {
let sh = &self.shape;
// error!("TODO replace_events_out feed through transform");
// TODO do we need/want the empty item from here?
let empty = empty_events_dyn_ev(st, sh)?;
let empty = empty_events_pulsed_dyn_ev(st, sh)?;
let evs = mem::replace(&mut self.events_out, empty);
Ok(evs)
}

View File

@@ -181,6 +181,12 @@ pub struct ToJsonBody {
body: Vec<u8>,
}
impl From<Vec<u8>> for ToJsonBody {
fn from(value: Vec<u8>) -> Self {
Self { body: value }
}
}
impl<S: Serialize> From<&S> for ToJsonBody {
fn from(value: &S) -> Self {
Self {

View File

@@ -228,7 +228,7 @@ async fn binned_json_single(
let ret = response(StatusCode::OK)
.header(CONTENT_TYPE, APP_JSON)
.header(HEADER_NAME_REQUEST_ID, ctx.reqid())
.body(ToJsonBody::from(&item).into_body())?;
.body(ToJsonBody::from(item.into_bytes()).into_body())?;
Ok(ret)
}
CollectResult::Timeout => {

View File

@@ -253,7 +253,7 @@ async fn plain_events_json(
let ret = response(StatusCode::OK)
.header(CONTENT_TYPE, APP_JSON)
.header(HEADER_NAME_REQUEST_ID, ctx.reqid())
.body(ToJsonBody::from(&item).into_body())?;
.body(ToJsonBody::from(item.into_bytes()).into_body())?;
debug!("{self_name} response created");
Ok(ret)
}

View File

@@ -71,7 +71,7 @@ pub async fn scylla_channel_event_stream(
type C2 = ContainerEvents<String>;
if let Some(j) = k.as_any_mut().downcast_mut::<C1>() {
let mut g = C2::new();
for (&ts, val) in j.iter_zip() {
for (ts, val) in j.iter_zip() {
use netpod::channelstatus as cs2;
let val = match cs2::ChannelStatus::from_kind(val as _) {
Ok(x) => x.to_user_variant_string(),

View File

@@ -46,7 +46,7 @@ pub async fn worker_write(
stmts_cache: &StmtsCache,
scy: &ScySession,
) -> Result<(), streams::timebin::cached::reader::Error> {
for (((((((&ts1, &ts2), &cnt), &min), &max), &avg), &lst), &fnl) in bins.zip_iter() {
for (((((((&ts1, &ts2), &cnt), min), max), &avg), lst), &fnl) in bins.zip_iter() {
let bin_len = DtMs::from_ms_u64((ts2.ns() - ts1.ns()) / 1000000);
let div = streams::timebin::cached::reader::part_len(bin_len).ns();
let msp = ts1.ns() / div;