diff --git a/src/events/convertforbinning.rs b/src/events/convertforbinning.rs index d8fa9cf..44aa7c2 100644 --- a/src/events/convertforbinning.rs +++ b/src/events/convertforbinning.rs @@ -1,5 +1,6 @@ use futures_util::Stream; use futures_util::StreamExt; +use items_0::streamitem::sitem_data; use items_0::streamitem::RangeCompletableItem::*; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem::*; @@ -10,17 +11,20 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; -pub struct ConvertForBinning { - inp: Pin> + Send>>, +pub struct ConvertForBinning { + inp: INP, } -impl ConvertForBinning { - pub fn new(inp: Pin> + Send>>) -> Self { +impl ConvertForBinning { + pub fn new(inp: INP) -> Self { Self { inp } } } -impl Stream for ConvertForBinning { +impl Stream for ConvertForBinning +where + INP: Stream> + Unpin, +{ type Item = Sitemty; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { @@ -29,38 +33,44 @@ impl Stream for ConvertForBinning { Ready(Some(item)) => match &item { Ok(DataItem(Data(cevs))) => match cevs { ChannelEvents::Events(evs) => { - if let Some(evs) = evs - .as_any_ref() - .downcast_ref::>() - { - let mut dst = ContainerEvents::new(); - for (ts, val) in evs.iter_zip() { - dst.push_back(ts, val.ix); - } - let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); - Ready(Some(item)) - } else if let Some(evs) = - evs.as_any_ref().downcast_ref::>() - { - let mut dst = ContainerEvents::new(); - for (ts, val) in evs.iter_zip() { - dst.push_back(ts, val as u8); - } - let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); - Ready(Some(item)) - } else if let Some(evs) = - evs.as_any_ref().downcast_ref::>() - { - let mut dst = ContainerEvents::new(); - for (ts, _) in evs.iter_zip() { - dst.push_back(ts, 1); - } - let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); - Ready(Some(item)) - } else { - Ready(Some(item)) - } + let evs = evs.to_f32_for_binning_v01(); + let item = ChannelEvents::Events(evs); + let item = sitem_data(item); + Ready(Some(item)) } + // ChannelEvents::Events(evs) => { + // if let Some(evs) = evs + // .as_any_ref() + // .downcast_ref::>() + // { + // let mut dst = ContainerEvents::new(); + // for (ts, val) in evs.iter_zip() { + // dst.push_back(ts, val.ix); + // } + // let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); + // Ready(Some(item)) + // } else if let Some(evs) = + // evs.as_any_ref().downcast_ref::>() + // { + // let mut dst = ContainerEvents::new(); + // for (ts, val) in evs.iter_zip() { + // dst.push_back(ts, val as u8); + // } + // let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); + // Ready(Some(item)) + // } else if let Some(evs) = + // evs.as_any_ref().downcast_ref::>() + // { + // let mut dst = ContainerEvents::new(); + // for (ts, _) in evs.iter_zip() { + // dst.push_back(ts, 1); + // } + // let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); + // Ready(Some(item)) + // } else { + // Ready(Some(item)) + // } + // } ChannelEvents::Status(_) => Ready(Some(item)), }, _ => Ready(Some(item)), diff --git a/src/plaineventscbor.rs b/src/plaineventscbor.rs index 75bced1..3784d43 100644 --- a/src/plaineventscbor.rs +++ b/src/plaineventscbor.rs @@ -9,11 +9,12 @@ use netpod::ChannelTypeConfigGen; use netpod::ReqCtx; use query::api4::events::PlainEventsQuery; -#[derive(Debug, thiserror::Error)] -#[cstm(name = "PlainEventsCbor")] -pub enum Error { - Stream(#[from] crate::plaineventsstream::Error), -} +autoerr::create_error_v1!( + name(Error, "PlainEventsCbor"), + enum variants { + Stream(#[from] crate::plaineventsstream::Error), + }, +); pub async fn plain_events_cbor_stream( evq: &PlainEventsQuery, diff --git a/src/test/events_reader.rs b/src/test/events_reader.rs index c4132c7..c7f5a6d 100644 --- a/src/test/events_reader.rs +++ b/src/test/events_reader.rs @@ -3,27 +3,32 @@ use crate::timebin::cached::reader::EventsReading; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; +use items_2::binning::container_events::ContainerEvents; +use items_2::binning::container_events::EventValueType; use items_2::channelevents::ChannelEvents; use netpod::range::evrange::NanoRange; use query::api4::events::EventsSubQuery; -#[derive(Debug, thiserror::Error)] -#[cstm(name = "TestEventsReader")] -pub enum Error {} - -pub struct TestEventsReader { +pub struct TestEventsReader { range: NanoRange, + gen: GEN, } -impl TestEventsReader { - pub fn new(range: NanoRange) -> Self { - Self { range } +impl TestEventsReader { + pub fn new(range: NanoRange, gen: GEN) -> Self { + Self { range, gen } } } -impl EventsReadProvider for TestEventsReader { +impl EventsReadProvider for TestEventsReader +where + GEN: Fn(NanoRange) -> IT + Send + Sync, + IT: Iterator> + Send + 'static, + TY: EventValueType, +{ fn read(&self, evq: EventsSubQuery) -> EventsReading { - let iter = items_2::testgen::events_gen::new_events_gen_dim1_f32_v00(self.range.clone()); + // let iter = items_2::testgen::events_gen::new_events_gen_dim1_f32_v00(self.range.clone()); + let iter = (self.gen)(self.range.clone()); let iter = iter .map(|x| { let x = Box::new(x); diff --git a/src/test/timebin/fromlayers.rs b/src/test/timebin/fromlayers.rs index 00d8791..88099c3 100644 --- a/src/test/timebin/fromlayers.rs +++ b/src/test/timebin/fromlayers.rs @@ -46,7 +46,10 @@ async fn timebin_from_layers_inner() -> Result<(), Error> { end: 1000 * 1000 * 1000 * 2, }; let cache_read_provider = Arc::new(DummyCacheReadProvider::new()); - let events_read_provider = Arc::new(TestEventsReader::new(nano_range.clone())); + let events_read_provider = Arc::new(TestEventsReader::new( + nano_range.clone(), + items_2::testgen::events_gen::new_events_gen_dim1_f32_v00, + )); // let one_before_range = true; // let series_range = SeriesRange::TimeRange(nano_range.clone()); // let select = EventsSubQuerySelect::new( @@ -109,7 +112,10 @@ async fn timebin_from_layers_1layer_inner() -> Result<(), Error> { end: 1000 * 1000 * 1000 * 2, }; let cache_read_provider = Arc::new(DummyCacheReadProvider::new()); - let events_read_provider = Arc::new(TestEventsReader::new(nano_range.clone())); + let events_read_provider = Arc::new(TestEventsReader::new( + nano_range.clone(), + items_2::testgen::events_gen::new_events_gen_dim1_f32_v00, + )); // let one_before_range = true; // let series_range = SeriesRange::TimeRange(nano_range.clone()); // let select = EventsSubQuerySelect::new( diff --git a/src/timebin/fromevents.rs b/src/timebin/fromevents.rs index 2cd0499..d506345 100644 --- a/src/timebin/fromevents.rs +++ b/src/timebin/fromevents.rs @@ -16,12 +16,13 @@ use std::task::Poll; macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } -#[derive(Debug, thiserror::Error)] -#[cstm(name = "ReadingBinnedFromEvents")] -pub enum Error { - ExpectTimerange, - ExpectTimeweighted, -} +autoerr::create_error_v1!( + name(Error, "ReadingBinnedFromEvents"), + enum variants { + ExpectTimerange, + ExpectTimeweighted, + }, +); pub struct BinnedFromEvents { stream: Pin> + Send>>, @@ -38,7 +39,7 @@ impl BinnedFromEvents { return Err(Error::ExpectTimerange); } let stream = read_provider.read(evq); - let stream = ConvertForBinning::new(Box::pin(stream)); + let stream = ConvertForBinning::new(stream); let stream = if do_time_weight { let stream = Box::pin(stream); BinnedEventsTimeweightStream::new(range, stream) diff --git a/src/timebin/gapfill.rs b/src/timebin/gapfill.rs index cd41f0e..a2a8cc8 100644 --- a/src/timebin/gapfill.rs +++ b/src/timebin/gapfill.rs @@ -40,11 +40,8 @@ autoerr::create_error_v1!( name(Error, "BinCachedGapFill"), enum variants { CacheReader(#[from] super::cached::reader::Error), - // #[error("GapFromFiner({0}, {1}, {2})")] GapFromFiner(TsNano, TsNano, DtMs), - // #[error("MissingBegFromFiner({0}, {1}, {2})")] MissingBegFromFiner(TsNano, TsNano, DtMs), - // #[error("InputBeforeRange({0}, {1})")] InputBeforeRange(NanoRange, BinnedRange), EventsReader(#[from] super::fromevents::Error), }, diff --git a/src/timebinnedjson.rs b/src/timebinnedjson.rs index 4388792..95d2cac 100644 --- a/src/timebinnedjson.rs +++ b/src/timebinnedjson.rs @@ -1,3 +1,4 @@ +use crate::cbor_stream::CborStream; use crate::collect::Collect; use crate::collect::CollectResult; use crate::json_stream::JsonStream; @@ -9,6 +10,7 @@ use crate::tcprawclient::make_sub_query; use crate::tcprawclient::OpenBoxedBytesStreamsBox; use crate::timebin::cached::reader::CacheReadProvider; use crate::timebin::cached::reader::EventsReadProvider; +use bytes::Bytes; use futures_util::future::BoxFuture; use futures_util::Stream; use futures_util::StreamExt; @@ -19,6 +21,7 @@ use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_2::channelevents::ChannelEvents; +use items_2::jsonbytes::CborBytes; use items_2::jsonbytes::JsonBytes; use items_2::merger::Merger; use netpod::log::*; @@ -35,16 +38,17 @@ use std::sync::Arc; use std::time::Duration; use std::time::Instant; -#[derive(Debug, thiserror::Error)] -#[cstm(name = "TimebinnedJson")] -pub enum Error { - Query(#[from] query::api4::binned::Error), - FromLayers(#[from] super::timebin::fromlayers::Error), - TcpRawClient(#[from] crate::tcprawclient::Error), - Collect(#[from] crate::collect::Error), - Json(#[from] serde_json::Error), - Msg(String), -} +autoerr::create_error_v1!( + name(Error, "TimebinnedJson"), + enum variants { + Query(#[from] query::api4::binned::Error), + FromLayers(#[from] super::timebin::fromlayers::Error), + TcpRawClient(#[from] crate::tcprawclient::Error), + Collect(#[from] crate::collect::Error), + Json(#[from] serde_json::Error), + Msg(String), + }, +); struct ErrMsg(E) where @@ -287,8 +291,10 @@ async fn timebinned_stream( events_read_provider, )?; let stream = stream.map(|item| { - use items_0::timebin::BinningggContainerBinsDyn; - on_sitemty_data!(item, |x: Box| { + // use items_0::timebin::BinningggContainerBinsDyn; + use items_0::timebin::BinsBoxed; + on_sitemty_data!(item, |mut x: BinsBoxed| { + x.fix_numerics(); let ret = x.boxed_into_collectable_box(); Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))) }) @@ -346,7 +352,7 @@ pub async fn timebinned_json( } } -fn take_collector_result( +fn take_collector_result_json( coll: &mut Box, ) -> Option { match coll.result() { @@ -355,10 +361,45 @@ fn take_collector_result( let val = x.into_serializable_json(); match serde_json::to_string(&val) { Ok(jsval) => Some(JsonBytes::new(jsval)), - Err(e) => Some(JsonBytes::new("{\"ERROR\":true}")), + Err(e) => { + error!("{}", e); + Some(JsonBytes::new("{\"ERROR\":true}")) + } } } - Err(e) => Some(JsonBytes::new("{\"ERROR\":true}")), + Err(e) => { + error!("{}", e); + Some(JsonBytes::new("{\"ERROR\":true}")) + } + } +} + +fn take_collector_result_cbor( + coll: &mut Box, +) -> Option { + match coll.result() { + Ok(collres) => { + let x = collres.into_user_facing_api_type_box(); + let val = x.into_serializable_normal(); + let mut buf = Vec::with_capacity(64); + ciborium::into_writer(&val, &mut buf).expect("cbor serialize"); + let bytes = Bytes::from(buf); + let item = CborBytes::new(bytes); + Some(item) + } + Err(e) => { + error!("{}", e); + use ciborium::cbor; + let val = cbor!({ + "ERROR" => true, + }) + .unwrap(); + let mut buf = Vec::with_capacity(64); + ciborium::into_writer(&val, &mut buf).expect("cbor serialize"); + let bytes = Bytes::from(buf); + let item = CborBytes::new(bytes); + Some(item) + } } } @@ -408,7 +449,7 @@ pub async fn timebinned_json_framed( coll.ingest(&mut item); if coll.len() >= 128 || last_emit.elapsed() >= timeout_content_2 { last_emit = Instant::now(); - take_collector_result(coll).map(|x| Ok(x)) + take_collector_result_json(coll).map(|x| Ok(x)) } else { // Some(serde_json::Value::String(format!("coll len {}", coll.len()))) None @@ -432,7 +473,7 @@ pub async fn timebinned_json_framed( None => { if let Some(coll) = coll.as_mut() { last_emit = Instant::now(); - take_collector_result(coll).map(|x| Ok(x)) + take_collector_result_json(coll).map(|x| Ok(x)) } else { // Some(serde_json::Value::String(format!( // "end of input but no collector to take something from" @@ -445,7 +486,7 @@ pub async fn timebinned_json_framed( if let Some(coll) = coll.as_mut() { if coll.len() != 0 { last_emit = Instant::now(); - take_collector_result(coll).map(|x| Ok(x)) + take_collector_result_json(coll).map(|x| Ok(x)) } else { // Some(serde_json::Value::String(format!("timeout but nothing to do"))) None @@ -458,13 +499,101 @@ pub async fn timebinned_json_framed( } }); let stream = stream.filter_map(|x| futures_util::future::ready(x)); - // TODO skip the intermediate conversion to js value, go directly to string data - // let stream = stream.map(|x| match x { - // Ok(x) => Ok(JsonBytes::new(serde_json::to_string(&x).unwrap())), - // Err(e) => Err(crate::json_stream::Error::from(crate::json_stream::ErrMsg( - // e, - // ))), - // }); let stream = stream.map_err(|e| crate::json_stream::Error::Msg(e.to_string())); Ok(Box::pin(stream)) } + +pub async fn timebinned_cbor_framed( + query: BinnedQuery, + ch_conf: ChannelTypeConfigGen, + ctx: &ReqCtx, + cache_read_provider: Arc, + events_read_provider: Arc, + timeout_provider: Box, +) -> Result { + let binned_range = query.covering_range()?; + // TODO derive better values, from query + let stream = timebinned_stream( + query.clone(), + binned_range.clone(), + ch_conf, + ctx, + cache_read_provider, + events_read_provider, + ) + .await?; + let timeout_content_base = query + .timeout_content() + .unwrap_or(Duration::from_millis(1000)) + .min(Duration::from_millis(5000)) + .max(Duration::from_millis(100)); + let timeout_content_2 = timeout_content_base * 2 / 3; + let mut coll = None; + let mut last_emit = Instant::now(); + let stream = stream + .map(|x| Some(x)) + .chain(futures_util::stream::iter([None])); + let stream = TimeoutableStream::new(timeout_content_base, timeout_provider, stream); + let stream = stream.map(move |x| { + match x { + Some(x) => match x { + Some(x) => match x { + Ok(x) => match x { + StreamItem::DataItem(x) => match x { + RangeCompletableItem::Data(mut item) => { + let coll = coll.get_or_insert_with(|| item.new_collector()); + coll.ingest(&mut item); + if coll.len() >= 128 || last_emit.elapsed() >= timeout_content_2 { + last_emit = Instant::now(); + take_collector_result_cbor(coll).map(|x| Ok(x)) + } else { + None + } + } + RangeCompletableItem::RangeComplete => None, + }, + StreamItem::Log(x) => { + debug!("{x:?}"); + // Some(serde_json::Value::String(format!("{x:?}"))) + None + } + StreamItem::Stats(x) => { + debug!("{x:?}"); + // Some(serde_json::Value::String(format!("{x:?}"))) + None + } + }, + Err(e) => Some(Err(e)), + }, + None => { + if let Some(coll) = coll.as_mut() { + last_emit = Instant::now(); + take_collector_result_cbor(coll).map(|x| Ok(x)) + } else { + // Some(serde_json::Value::String(format!( + // "end of input but no collector to take something from" + // ))) + None + } + } + }, + None => { + if let Some(coll) = coll.as_mut() { + if coll.len() != 0 { + last_emit = Instant::now(); + take_collector_result_cbor(coll).map(|x| Ok(x)) + } else { + // Some(serde_json::Value::String(format!("timeout but nothing to do"))) + None + } + } else { + // Some(serde_json::Value::String(format!("timeout but no collector"))) + None + } + } + } + }); + let stream = stream.filter_map(|x| futures_util::future::ready(x)); + let stream = stream.map_err(|e| crate::cbor_stream::Error::Msg(e.to_string())); + Ok(Box::pin(stream)) +}