diff --git a/src/cbor_stream.rs b/src/cbor_stream.rs index 502ad95..0b500cf 100644 --- a/src/cbor_stream.rs +++ b/src/cbor_stream.rs @@ -75,7 +75,7 @@ where StreamItem::DataItem(x) => match x { RangeCompletableItem::Data(evs) => { let val = evs.into_user_facing_api_type(); - let val = val.into_serializable(); + let val = val.into_serializable_normal(); let mut buf = Vec::with_capacity(64); ciborium::into_writer(&val, &mut buf).map_err(|e| Error::Msg(e.to_string()))?; let bytes = Bytes::from(buf); diff --git a/src/plaineventsstream.rs b/src/plaineventsstream.rs index f010b5a..0306bce 100644 --- a/src/plaineventsstream.rs +++ b/src/plaineventsstream.rs @@ -3,10 +3,7 @@ use crate::tcprawclient::container_stream_from_bytes_stream; use crate::tcprawclient::make_sub_query; use crate::tcprawclient::OpenBoxedBytesStreamsBox; use futures_util::Stream; -use futures_util::StreamExt; -use items_0::on_sitemty_data; use items_0::streamitem::Sitemty; -use items_0::Events; use items_2::channelevents::ChannelEvents; use items_2::merger::Merger; use netpod::log::*; @@ -58,39 +55,7 @@ pub async fn dyn_events_stream( // TODO propagate also the max-buf-len for the first stage event reader. // TODO use a mixture of count and byte-size as threshold. let stream = Merger::new(inps, evq.merger_out_len_max()); - let stream = stream.inspect(|x| { - if true { - use items_0::streamitem::RangeCompletableItem::*; - use items_0::streamitem::StreamItem::*; - use items_0::WithLen; - use items_2::channelevents::ChannelEvents; - match x { - Ok(DataItem(Data(ChannelEvents::Events(x)))) => { - trace!("after MERGE yields item len {}", x.len()); - } - _ => { - trace!("after MERGE yields item {:?}", x); - } - } - } - }); let stream = RangeFilter2::new(stream, evq.range().try_into()?, evq.one_before_range()); - let stream = stream.inspect(|x| { - if true { - use items_0::streamitem::RangeCompletableItem::*; - use items_0::streamitem::StreamItem::*; - use items_0::WithLen; - use items_2::channelevents::ChannelEvents; - match x { - Ok(DataItem(Data(ChannelEvents::Events(x)))) => { - trace!("after merge and filter yields item len {}", x.len()); - } - _ => { - trace!("after merge and filter yields item {:?}", x); - } - } - } - }); if let Some(wasmname) = evq.test_do_wasm() { let stream = transform_wasm::<_, items_0::streamitem::SitemErrTy>(stream, wasmname, ctx).await?; diff --git a/src/tcprawclient.rs b/src/tcprawclient.rs index 5f18e46..eea9a56 100644 --- a/src/tcprawclient.rs +++ b/src/tcprawclient.rs @@ -45,8 +45,6 @@ pub enum Error { Framable(#[from] items_2::framable::Error), Json(#[from] serde_json::Error), Http(#[from] http::Error), - // HttpClient(#[from] httpclient::Error), - // Hyper(#[from] httpclient::hyper::Error), #[error("ServerError({0:?}, {1})")] ServerError(http::response::Parts, String), HttpBody(Box), @@ -187,26 +185,7 @@ where { let frames = InMemoryFrameStream::new(inp, bufcap); let frames = frames.map_err(sitem_err2_from_string); - let frames = frames.inspect(|x| { - if true { - trace!("container_stream_from_bytes_stream see frame {:?}", x); - } - }); let stream = EventsFromFrames::::new(frames, dbgdesc); - let stream = stream.inspect(|x| { - if true { - use items_0::streamitem::RangeCompletableItem::*; - use items_0::streamitem::StreamItem::*; - match x { - Ok(DataItem(Data(x))) => { - trace!("EventsFromFrames yields item len {}", x.len()); - } - _ => { - trace!("EventsFromFrames yields item {:?}", x); - } - } - } - }); Ok(stream) } diff --git a/src/timebin/fromevents.rs b/src/timebin/fromevents.rs index 3e3cddf..2cd0499 100644 --- a/src/timebin/fromevents.rs +++ b/src/timebin/fromevents.rs @@ -1,13 +1,11 @@ use super::cached::reader::EventsReadProvider; use crate::events::convertforbinning::ConvertForBinning; +use crate::log::*; use futures_util::Stream; use futures_util::StreamExt; -use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; -use items_0::streamitem::StreamItem; use items_0::timebin::BinsBoxed; use items_2::binning::timeweight::timeweight_events_dyn::BinnedEventsTimeweightStream; -use netpod::log::*; use netpod::BinnedRange; use netpod::TsNano; use query::api4::events::EventsSubQuery; @@ -16,7 +14,7 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; -macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } +macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } #[derive(Debug, thiserror::Error)] #[cstm(name = "ReadingBinnedFromEvents")] @@ -47,23 +45,6 @@ impl BinnedFromEvents { } else { return Err(Error::ExpectTimeweighted); }; - let stream = stream.map(|item| match item { - Ok(x) => match x { - StreamItem::DataItem(x) => match x { - RangeCompletableItem::Data(x) => { - trace_emit!("see item {:?}", x); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) - } - RangeCompletableItem::RangeComplete => { - debug!("BinnedFromEvents sees range final"); - Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) - } - }, - StreamItem::Log(x) => Ok(StreamItem::Log(x)), - StreamItem::Stats(x) => Ok(StreamItem::Stats(x)), - }, - Err(e) => Err(e), - }); let ret = Self { stream: Box::pin(stream), }; diff --git a/src/timebin/fromlayers.rs b/src/timebin/fromlayers.rs index ea4b06b..22ca79f 100644 --- a/src/timebin/fromlayers.rs +++ b/src/timebin/fromlayers.rs @@ -25,6 +25,8 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; +macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } + #[derive(Debug, thiserror::Error)] #[cstm(name = "TimeBinnedFromLayers")] pub enum Error { @@ -58,7 +60,7 @@ impl TimeBinnedFromLayers { cache_read_provider: Arc, events_read_provider: Arc, ) -> Result { - debug!( + trace_init!( "{}::new {:?} {:?} {:?}", Self::type_name(), ch_conf.series(), @@ -67,7 +69,7 @@ impl TimeBinnedFromLayers { ); let bin_len = DtMs::from_ms_u64(range.bin_len.ms()); if bin_len_layers.contains(&bin_len) { - debug!("{}::new bin_len in layers {:?}", Self::type_name(), range); + trace_init!("{}::new bin_len in layers {:?}", Self::type_name(), range); let inp = GapFill::new( "FromLayers-ongrid".into(), ch_conf.clone(), @@ -85,7 +87,7 @@ impl TimeBinnedFromLayers { let ret = Self { inp: Box::pin(inp) }; Ok(ret) } else { - debug!( + trace_init!( "{}::new bin_len off layers {:?}", Self::type_name(), range @@ -96,7 +98,7 @@ impl TimeBinnedFromLayers { return Err(Error::FinerGridMismatch(bin_len, finer)); } let range_finer = BinnedRange::from_nano_range(range.to_nano_range(), finer); - debug!( + trace_init!( "{}::new next finer from bins {:?} {:?}", Self::type_name(), finer, @@ -121,7 +123,7 @@ impl TimeBinnedFromLayers { Ok(ret) } None => { - debug!("{}::new next finer from events", Self::type_name()); + trace_init!("{}::new next finer from events", Self::type_name()); let series_range = SeriesRange::TimeRange(range.to_nano_range()); let one_before_range = true; let select = EventsSubQuerySelect::new( @@ -139,7 +141,7 @@ impl TimeBinnedFromLayers { let inp = BinnedFromEvents::new(range, evq, do_time_weight, events_read_provider)?; let ret = Self { inp: Box::pin(inp) }; - debug!("{}::new setup from events", Self::type_name()); + trace_init!("{}::new setup from events", Self::type_name()); Ok(ret) } } diff --git a/src/timebinnedjson.rs b/src/timebinnedjson.rs index 5bda731..4d96e7f 100644 --- a/src/timebinnedjson.rs +++ b/src/timebinnedjson.rs @@ -157,40 +157,29 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents( if true { let r1 = evs .as_any_mut() - .downcast_mut::>() + .downcast_mut::>() .is_some(); let r2 = evs - .as_mut() .as_any_mut() - .downcast_mut::>() + .downcast_mut::>>() .is_some(); let r3 = evs - .as_any_mut() - .downcast_mut::>>() - .is_some(); - let r4 = evs - .as_mut() - .as_any_mut() - .downcast_mut::>>() - .is_some(); - let r5 = evs .as_mut() .as_any_mut() .downcast_mut::() .is_some(); - let r6 = evs + let r4 = evs .as_mut() .as_any_mut() .downcast_mut::>() .is_some(); - debug!("wasm castings: {r1} {r2} {r3} {r4} {r5} {r6}"); + debug!("wasm castings: {r1} {r2} {r3} {r4}"); } if let Some(evs) = evs.as_any_mut().downcast_mut::() { match evs { ChannelEvents::Events(evs) => { - if let Some(evs) = evs - .as_any_mut() - .downcast_mut::>() + if let Some(evs) = + evs.as_any_mut().downcast_mut::>() { use items_0::WithLen; if evs.len() == 0 { @@ -304,9 +293,8 @@ async fn timebinned_stream( )?; let stream = stream.map(|item| { use items_0::timebin::BinningggContainerBinsDyn; - on_sitemty_data!(item, |mut x: Box| { - x.fix_numerics(); - let ret = Box::new(x) as Box; + on_sitemty_data!(item, |x: Box| { + let ret = x.boxed_into_collectable_box(); Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))) }) }); @@ -355,7 +343,7 @@ pub async fn timebinned_json( match collres { CollectResult::Some(collres) => { let x = collres.into_user_facing_api_type_box(); - let val = x.into_serializable(); + let val = x.into_serializable_json(); let jsval = serde_json::to_string(&val)?; Ok(CollectResult::Some(JsonBytes::new(jsval))) } @@ -369,7 +357,7 @@ fn take_collector_result( match coll.result() { Ok(collres) => { let x = collres.into_user_facing_api_type_box(); - let val = x.into_serializable(); + let val = x.into_serializable_json(); match serde_json::to_string(&val) { Ok(jsval) => Some(JsonBytes::new(jsval)), Err(e) => Some(JsonBytes::new("{\"ERROR\":true}")), @@ -387,7 +375,6 @@ pub async fn timebinned_json_framed( events_read_provider: Arc, timeout_provider: Box, ) -> Result { - trace!("timebinned_json_framed"); let binned_range = query.covering_range()?; // TODO derive better values, from query let stream = timebinned_stream(