Improve container output

Dominik Werder
2024-12-04 16:01:02 +01:00
parent 71f5c3c763
commit 855f1796f1
6 changed files with 21 additions and 107 deletions

View File

@@ -75,7 +75,7 @@ where
StreamItem::DataItem(x) => match x {
RangeCompletableItem::Data(evs) => {
let val = evs.into_user_facing_api_type();
let val = val.into_serializable();
let val = val.into_serializable_normal();
let mut buf = Vec::with_capacity(64);
ciborium::into_writer(&val, &mut buf).map_err(|e| Error::Msg(e.to_string()))?;
let bytes = Bytes::from(buf);
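
Note: the CBOR path above writes the serializable form straight into a byte buffer with ciborium. A minimal self-contained sketch of that pattern, with a placeholder wire type standing in for the repo's value returned by into_serializable_normal():

    use bytes::Bytes;
    use serde::Serialize;

    // Placeholder wire type, not a type from the repo.
    #[derive(Serialize)]
    struct Wire {
        ts: u64,
        value: f64,
    }

    fn to_cbor_bytes(val: &Wire) -> Result<Bytes, String> {
        let mut buf = Vec::with_capacity(64);
        // ciborium::into_writer serializes any serde::Serialize value into a byte sink.
        ciborium::into_writer(val, &mut buf).map_err(|e| e.to_string())?;
        Ok(Bytes::from(buf))
    }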

View File

@@ -3,10 +3,7 @@ use crate::tcprawclient::container_stream_from_bytes_stream;
use crate::tcprawclient::make_sub_query;
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::on_sitemty_data;
use items_0::streamitem::Sitemty;
use items_0::Events;
use items_2::channelevents::ChannelEvents;
use items_2::merger::Merger;
use netpod::log::*;
@@ -58,39 +55,7 @@ pub async fn dyn_events_stream(
// TODO propagate also the max-buf-len for the first stage event reader.
// TODO use a mixture of count and byte-size as threshold.
let stream = Merger::new(inps, evq.merger_out_len_max());
let stream = stream.inspect(|x| {
if true {
use items_0::streamitem::RangeCompletableItem::*;
use items_0::streamitem::StreamItem::*;
use items_0::WithLen;
use items_2::channelevents::ChannelEvents;
match x {
Ok(DataItem(Data(ChannelEvents::Events(x)))) => {
trace!("after MERGE yields item len {}", x.len());
}
_ => {
trace!("after MERGE yields item {:?}", x);
}
}
}
});
let stream = RangeFilter2::new(stream, evq.range().try_into()?, evq.one_before_range());
let stream = stream.inspect(|x| {
if true {
use items_0::streamitem::RangeCompletableItem::*;
use items_0::streamitem::StreamItem::*;
use items_0::WithLen;
use items_2::channelevents::ChannelEvents;
match x {
Ok(DataItem(Data(ChannelEvents::Events(x)))) => {
trace!("after merge and filter yields item len {}", x.len());
}
_ => {
trace!("after merge and filter yields item {:?}", x);
}
}
}
});
if let Some(wasmname) = evq.test_do_wasm() {
let stream =
transform_wasm::<_, items_0::streamitem::SitemErrTy>(stream, wasmname, ctx).await?;
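
Note: the deleted blocks above were per-item debug taps on the merged stream. The underlying pattern is futures_util's inspect(), shown here as a minimal sketch assuming a tokio runtime and a made-up integer stream:

    use futures_util::stream;
    use futures_util::StreamExt;

    #[tokio::main]
    async fn main() {
        let merged = stream::iter(vec![3usize, 1, 4]);
        // inspect() observes each item as it passes without consuming or changing the stream.
        let merged = merged.inspect(|x| eprintln!("after MERGE yields item len {}", x));
        let out: Vec<_> = merged.collect().await;
        assert_eq!(out, vec![3, 1, 4]);
    }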

View File

@@ -45,8 +45,6 @@ pub enum Error {
Framable(#[from] items_2::framable::Error),
Json(#[from] serde_json::Error),
Http(#[from] http::Error),
// HttpClient(#[from] httpclient::Error),
// Hyper(#[from] httpclient::hyper::Error),
#[error("ServerError({0:?}, {1})")]
ServerError(http::response::Parts, String),
HttpBody(Box<dyn std::error::Error + Send>),
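
Note: the variants above use thiserror's #[from] to generate conversions for the ? operator. Several repo variants carry no per-variant #[error] attribute, which suggests a project-specific derive extension; the sketch below uses plain thiserror only, with made-up messages:

    use thiserror::Error;

    #[derive(Debug, Error)]
    pub enum Error {
        #[error("json: {0}")]
        Json(#[from] serde_json::Error),
        #[error("http: {0}")]
        Http(#[from] http::Error),
        #[error("server error: {0}")]
        ServerError(String),
    }

    fn parse(s: &str) -> Result<serde_json::Value, Error> {
        // The ? operator goes through the From impl generated by #[from].
        let v: serde_json::Value = serde_json::from_str(s)?;
        Ok(v)
    }
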
@@ -187,26 +185,7 @@ where
{
let frames = InMemoryFrameStream::new(inp, bufcap);
let frames = frames.map_err(sitem_err2_from_string);
let frames = frames.inspect(|x| {
if true {
trace!("container_stream_from_bytes_stream see frame {:?}", x);
}
});
let stream = EventsFromFrames::<T, _>::new(frames, dbgdesc);
let stream = stream.inspect(|x| {
if true {
use items_0::streamitem::RangeCompletableItem::*;
use items_0::streamitem::StreamItem::*;
match x {
Ok(DataItem(Data(x))) => {
trace!("EventsFromFrames yields item len {}", x.len());
}
_ => {
trace!("EventsFromFrames yields item {:?}", x);
}
}
}
});
Ok(stream)
}

View File

@@ -1,13 +1,11 @@
use super::cached::reader::EventsReadProvider;
use crate::events::convertforbinning::ConvertForBinning;
use crate::log::*;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::timebin::BinsBoxed;
use items_2::binning::timeweight::timeweight_events_dyn::BinnedEventsTimeweightStream;
use netpod::log::*;
use netpod::BinnedRange;
use netpod::TsNano;
use query::api4::events::EventsSubQuery;
@@ -16,7 +14,7 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
#[derive(Debug, thiserror::Error)]
#[cstm(name = "ReadingBinnedFromEvents")]
@@ -47,23 +45,6 @@ impl BinnedFromEvents {
} else {
return Err(Error::ExpectTimeweighted);
};
let stream = stream.map(|item| match item {
Ok(x) => match x {
StreamItem::DataItem(x) => match x {
RangeCompletableItem::Data(x) => {
trace_emit!("see item {:?}", x);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))
}
RangeCompletableItem::RangeComplete => {
debug!("BinnedFromEvents sees range final");
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
}
},
StreamItem::Log(x) => Ok(StreamItem::Log(x)),
StreamItem::Stats(x) => Ok(StreamItem::Stats(x)),
},
Err(e) => Err(e),
});
let ret = Self {
stream: Box::pin(stream),
};
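
Note: the removed map above rebuilt every variant unchanged and only logged on Data items, so it was effectively an identity pass-through. A condensed, self-contained illustration of that shape (these enums are simplified stand-ins, not the items_0 definitions):

    #[derive(Debug)]
    enum RangeCompletableItem<T> {
        RangeComplete,
        Data(T),
    }

    #[derive(Debug)]
    enum StreamItem<T> {
        DataItem(T),
        Log(String),
        Stats(String),
    }

    type Sitemty<T> = Result<StreamItem<RangeCompletableItem<T>>, String>;

    // Identity pass-through that only adds a side effect on the Data variant;
    // every other variant is forwarded unchanged.
    fn tap_data<T: std::fmt::Debug>(item: Sitemty<T>) -> Sitemty<T> {
        match item {
            Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) => {
                eprintln!("see item {:?}", x);
                Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))
            }
            other => other,
        }
    }

    fn main() {
        let item: Sitemty<Vec<f64>> =
            Ok(StreamItem::DataItem(RangeCompletableItem::Data(vec![1.0, 2.0])));
        let _ = tap_data(item);
    }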

View File

@@ -25,6 +25,8 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
#[derive(Debug, thiserror::Error)]
#[cstm(name = "TimeBinnedFromLayers")]
pub enum Error {
@@ -58,7 +60,7 @@ impl TimeBinnedFromLayers {
cache_read_provider: Arc<dyn CacheReadProvider>,
events_read_provider: Arc<dyn EventsReadProvider>,
) -> Result<Self, Error> {
debug!(
trace_init!(
"{}::new {:?} {:?} {:?}",
Self::type_name(),
ch_conf.series(),
@@ -67,7 +69,7 @@ impl TimeBinnedFromLayers {
);
let bin_len = DtMs::from_ms_u64(range.bin_len.ms());
if bin_len_layers.contains(&bin_len) {
debug!("{}::new bin_len in layers {:?}", Self::type_name(), range);
trace_init!("{}::new bin_len in layers {:?}", Self::type_name(), range);
let inp = GapFill::new(
"FromLayers-ongrid".into(),
ch_conf.clone(),
@@ -85,7 +87,7 @@ impl TimeBinnedFromLayers {
let ret = Self { inp: Box::pin(inp) };
Ok(ret)
} else {
debug!(
trace_init!(
"{}::new bin_len off layers {:?}",
Self::type_name(),
range
@@ -96,7 +98,7 @@ impl TimeBinnedFromLayers {
return Err(Error::FinerGridMismatch(bin_len, finer));
}
let range_finer = BinnedRange::from_nano_range(range.to_nano_range(), finer);
debug!(
trace_init!(
"{}::new next finer from bins {:?} {:?}",
Self::type_name(),
finer,
@@ -121,7 +123,7 @@ impl TimeBinnedFromLayers {
Ok(ret)
}
None => {
debug!("{}::new next finer from events", Self::type_name());
trace_init!("{}::new next finer from events", Self::type_name());
let series_range = SeriesRange::TimeRange(range.to_nano_range());
let one_before_range = true;
let select = EventsSubQuerySelect::new(
@@ -139,7 +141,7 @@ impl TimeBinnedFromLayers {
let inp =
BinnedFromEvents::new(range, evq, do_time_weight, events_read_provider)?;
let ret = Self { inp: Box::pin(inp) };
debug!("{}::new setup from events", Self::type_name());
trace_init!("{}::new setup from events", Self::type_name());
Ok(ret)
}
}
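
Note: the branches above choose between an on-grid layer, a finer layer, and binning directly from events. A hypothetical helper sketching that selection; the divisibility criterion and the millisecond units are assumptions for illustration, not taken from the diff:

    #[derive(Debug, PartialEq)]
    enum BinSource {
        // Requested bin length is itself a precomputed layer.
        Layer(u64),
        // A finer layer exists whose bins can be merged into the requested length.
        FinerLayer(u64),
        // No usable layer: bin directly from events.
        Events,
    }

    fn choose_bin_source(bin_len_ms: u64, layers_ms: &[u64]) -> BinSource {
        if layers_ms.contains(&bin_len_ms) {
            BinSource::Layer(bin_len_ms)
        } else if let Some(&finer) = layers_ms
            .iter()
            .filter(|&&l| l < bin_len_ms && bin_len_ms % l == 0)
            .max()
        {
            BinSource::FinerLayer(finer)
        } else {
            BinSource::Events
        }
    }

    fn main() {
        let layers = [1_000, 10_000, 60_000];
        assert_eq!(choose_bin_source(10_000, &layers), BinSource::Layer(10_000));
        assert_eq!(choose_bin_source(120_000, &layers), BinSource::FinerLayer(60_000));
        assert_eq!(choose_bin_source(7, &layers), BinSource::Events);
    }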

View File

@@ -157,40 +157,29 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents(
if true {
let r1 = evs
.as_any_mut()
.downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
.downcast_mut::<ContainerEvents<f64>>()
.is_some();
let r2 = evs
.as_mut()
.as_any_mut()
.downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
.downcast_mut::<Box<ContainerEvents<f64>>>()
.is_some();
let r3 = evs
.as_any_mut()
.downcast_mut::<Box<items_2::eventsdim0::EventsDim0<f64>>>()
.is_some();
let r4 = evs
.as_mut()
.as_any_mut()
.downcast_mut::<Box<items_2::eventsdim0::EventsDim0<f64>>>()
.is_some();
let r5 = evs
.as_mut()
.as_any_mut()
.downcast_mut::<ChannelEvents>()
.is_some();
let r6 = evs
let r4 = evs
.as_mut()
.as_any_mut()
.downcast_mut::<Box<ChannelEvents>>()
.is_some();
debug!("wasm castings: {r1} {r2} {r3} {r4} {r5} {r6}");
debug!("wasm castings: {r1} {r2} {r3} {r4}");
}
if let Some(evs) = evs.as_any_mut().downcast_mut::<ChannelEvents>() {
match evs {
ChannelEvents::Events(evs) => {
if let Some(evs) = evs
.as_any_mut()
.downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
if let Some(evs) =
evs.as_any_mut().downcast_mut::<ContainerEvents<f64>>()
{
use items_0::WithLen;
if evs.len() == 0 {
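
Note: the castings above probe a type-erased events container via Any. A self-contained sketch of that downcast pattern; the trait and the ContainerEvents definition here are illustrative stand-ins, not the items_0/items_2 types:

    use std::any::Any;

    trait Events {
        fn as_any_mut(&mut self) -> &mut dyn Any;
        fn len(&self) -> usize;
    }

    struct ContainerEvents<T> {
        values: Vec<T>,
    }

    impl<T: 'static> Events for ContainerEvents<T> {
        fn as_any_mut(&mut self) -> &mut dyn Any {
            self
        }
        fn len(&self) -> usize {
            self.values.len()
        }
    }

    fn main() {
        let mut evs: Box<dyn Events> = Box::new(ContainerEvents { values: vec![1.0f64, 2.0] });
        // Downcast succeeds only for the concrete type behind the trait object.
        let hit = evs.as_any_mut().downcast_mut::<ContainerEvents<f64>>().is_some();
        let miss = evs.as_any_mut().downcast_mut::<ContainerEvents<u32>>().is_some();
        println!("len {} castings: {hit} {miss}", evs.len());
    }
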
@@ -304,9 +293,8 @@ async fn timebinned_stream(
)?;
let stream = stream.map(|item| {
use items_0::timebin::BinningggContainerBinsDyn;
on_sitemty_data!(item, |mut x: Box<dyn BinningggContainerBinsDyn>| {
x.fix_numerics();
let ret = Box::new(x) as Box<dyn CollectableDyn>;
on_sitemty_data!(item, |x: Box<dyn BinningggContainerBinsDyn>| {
let ret = x.boxed_into_collectable_box();
Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))
})
});
@@ -355,7 +343,7 @@ pub async fn timebinned_json(
match collres {
CollectResult::Some(collres) => {
let x = collres.into_user_facing_api_type_box();
let val = x.into_serializable();
let val = x.into_serializable_json();
let jsval = serde_json::to_string(&val)?;
Ok(CollectResult::Some(JsonBytes::new(jsval)))
}
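
Note: together with the CBOR change in the first file, the single into_serializable() is split into format-specific forms. A made-up sketch of that pattern; the types and the string encoding of non-finite floats are illustrative assumptions, not the repo's implementation:

    use serde::Serialize;

    // Made-up bins container with one wire form per output format.
    struct Bins {
        avgs: Vec<f64>,
    }

    #[derive(Serialize)]
    struct BinsNormal {
        avgs: Vec<f64>,
    }

    #[derive(Serialize)]
    struct BinsJson {
        // JSON has no NaN/Infinity, so this JSON form stores values as strings.
        avgs: Vec<String>,
    }

    impl Bins {
        fn into_serializable_normal(self) -> BinsNormal {
            BinsNormal { avgs: self.avgs }
        }
        fn into_serializable_json(self) -> BinsJson {
            BinsJson {
                avgs: self.avgs.iter().map(|x| x.to_string()).collect(),
            }
        }
    }

    fn main() {
        let jsval = serde_json::to_string(
            &Bins { avgs: vec![1.5, f64::NAN] }.into_serializable_json(),
        )
        .unwrap();
        println!("{jsval}");
        let _normal = Bins { avgs: vec![2.5] }.into_serializable_normal();
    }
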
@@ -369,7 +357,7 @@ fn take_collector_result(
match coll.result() {
Ok(collres) => {
let x = collres.into_user_facing_api_type_box();
let val = x.into_serializable();
let val = x.into_serializable_json();
match serde_json::to_string(&val) {
Ok(jsval) => Some(JsonBytes::new(jsval)),
Err(e) => Some(JsonBytes::new("{\"ERROR\":true}")),
@@ -387,7 +375,6 @@ pub async fn timebinned_json_framed(
events_read_provider: Arc<dyn EventsReadProvider>,
timeout_provider: Box<dyn StreamTimeout2>,
) -> Result<JsonStream, Error> {
trace!("timebinned_json_framed");
let binned_range = query.covering_range()?;
// TODO derive better values, from query
let stream = timebinned_stream(