diff --git a/src/eventsplainreader.rs b/src/eventsplainreader.rs
index 8805865..bb08bda 100644
--- a/src/eventsplainreader.rs
+++ b/src/eventsplainreader.rs
@@ -1,7 +1,7 @@
 use crate::tcprawclient::OpenBoxedBytesStreamsBox;
+use crate::timebin::cached::reader::CacheReadProvider;
 use crate::timebin::cached::reader::EventsReadProvider;
 use crate::timebin::cached::reader::EventsReading;
-use crate::timebin::CacheReadProvider;
 use futures_util::Future;
 use futures_util::FutureExt;
 use futures_util::Stream;
@@ -73,7 +73,9 @@ impl EventsReadProvider for SfDatabufferEventReadProvider {
     fn read(&self, evq: EventsSubQuery) -> EventsReading {
         let range = match evq.range() {
             netpod::range::evrange::SeriesRange::TimeRange(x) => x.clone(),
-            netpod::range::evrange::SeriesRange::PulseRange(_) => panic!("not available for pulse range"),
+            netpod::range::evrange::SeriesRange::PulseRange(_) => {
+                panic!("not available for pulse range")
+            }
         };
         let ctx = self.ctx.clone();
         let open_bytes = self.open_bytes.clone();
@@ -104,19 +106,24 @@ impl DummyCacheReadProvider {
     }
 }
 
+// TODO impl
 impl CacheReadProvider for DummyCacheReadProvider {
     fn read(
         &self,
-        series: u64,
-        bin_len: netpod::DtMs,
-        msp: u64,
-        offs: std::ops::Range,
+        _series: u64,
+        _bin_len: netpod::DtMs,
+        _msp: u64,
+        _offs: std::ops::Range,
     ) -> crate::timebin::cached::reader::CacheReading {
         let stream = futures_util::future::ready(Ok(None));
         crate::timebin::cached::reader::CacheReading::new(Box::pin(stream))
     }
 
-    fn write(&self, series: u64, bins: items_0::timebin::BinsBoxed) -> crate::timebin::cached::reader::CacheWriting {
+    fn write(
+        &self,
+        _series: u64,
+        _bins: items_0::timebin::BinsBoxed,
+    ) -> crate::timebin::cached::reader::CacheWriting {
         let fut = futures_util::future::ready(Ok(()));
         crate::timebin::cached::reader::CacheWriting::new(Box::pin(fut))
     }
diff --git a/src/lib.rs b/src/lib.rs
index 2ce9cb4..a42da1c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -30,3 +30,8 @@ pub mod teststream;
 pub mod timebin;
 pub mod timebinnedjson;
 pub mod transform;
+
+#[allow(unused)]
+fn todoval<T>() -> T {
+    todo!()
+}
diff --git a/src/test/collect.rs b/src/test/collect.rs
index 862aabb..6da199e 100644
--- a/src/test/collect.rs
+++ b/src/test/collect.rs
@@ -1,25 +1,3 @@
-use crate::collect::Collect;
-use crate::collect::CollectResult;
-use crate::test::runfut;
-use crate::transform::build_event_transform;
-use crate::transform::EventsToTimeBinnable;
-use futures_util::stream;
-use futures_util::StreamExt;
-use items_0::on_sitemty_data;
-use items_0::streamitem::sitem_data;
-use items_0::streamitem::RangeCompletableItem;
-use items_0::streamitem::StreamItem;
-use items_0::WithLen;
-use items_2::eventsdim0::EventsDim0CollectorOutput;
-use items_2::streams::PlainEventStream;
-use items_2::testgen::make_some_boxed_d0_f32;
-use netpod::log::*;
-use netpod::timeunits::SEC;
-use netpod::FromUrl;
-use query::transform::TransformQuery;
-use std::time::Duration;
-use std::time::Instant;
-
 // #[test]
 // fn collect_channel_events_00() -> Result<(), Error> {
 //     let fut = async {
diff --git a/src/test/events.rs b/src/test/events.rs
index c594887..f1dbca6 100644
--- a/src/test/events.rs
+++ b/src/test/events.rs
@@ -55,12 +55,22 @@ async fn merged_events_inner() -> Result<(), Error> {
     let evq = PlainEventsQuery::new(channel, range);
     let open_bytes = StreamOpener::new();
     let open_bytes = Arc::pin(open_bytes);
-    let stream = plain_events_cbor_stream(&evq, ch_conf.clone().into(), &ctx, open_bytes, todo!())
-        .await
-        .unwrap();
+    let timeout_provider = crate::todoval();
+    let stream = plain_events_cbor_stream(
+        &evq,
+        ch_conf.clone().into(),
+        &ctx,
+        open_bytes,
+        timeout_provider,
+    )
+    .await
+    .unwrap();
     let stream = lenframed::length_framed(stream);
-    let stream =
-        FramedBytesToSitemtyDynEventsStream::new(stream, ch_conf.scalar_type().clone(), ch_conf.shape().clone());
+    let stream = FramedBytesToSitemtyDynEventsStream::new(
+        stream,
+        ch_conf.scalar_type().clone(),
+        ch_conf.shape().clone(),
+    );
     let stream = only_first_err(stream);
     stream
         .for_each(|item| {
@@ -84,7 +94,9 @@ impl OpenBoxedBytesStreams for StreamOpener {
         &self,
         subq: EventsSubQuery,
         _ctx: ReqCtx,
-    ) -> Pin, crate::tcprawclient::Error>> + Send>> {
+    ) -> Pin<
+        Box, crate::tcprawclient::Error>> + Send>,
+    > {
         Box::pin(stream_opener(subq).map_err(|e| crate::tcprawclient::Error::Msg(format!("{e}"))))
     }
 }
diff --git a/src/teststream.rs b/src/teststream.rs
index 3460a9c..18f77ec 100644
--- a/src/teststream.rs
+++ b/src/teststream.rs
@@ -8,7 +8,11 @@ use netpod::range::evrange::SeriesRange;
 use query::api4::events::EventsSubQuery;
 use std::pin::Pin;
 
-fn make_stream(chname: &str, range: &SeriesRange) -> Pin> + Send>> {
+fn make_stream(
+    chname: &str,
+    range: &SeriesRange,
+) -> Pin> + Send>> {
+    let _ = &range;
     if chname == "unittest;scylla;cont;scalar;f32" {
         let e = sitem_err2_from_string(format!("unknown channel {chname}"));
         let ret = futures_util::stream::iter([Err(e)]);
diff --git a/src/timebin.rs b/src/timebin.rs
index d4afb5b..b8ed999 100644
--- a/src/timebin.rs
+++ b/src/timebin.rs
@@ -1,13 +1,8 @@
 pub mod cached;
 pub mod fromevents;
+pub mod fromlayers;
 pub mod timebin;
 
 mod basic;
-pub(super) mod fromlayers;
 mod gapfill;
 mod grid;
-
-pub(super) use basic::TimeBinnedStream;
-pub(super) use fromlayers::TimeBinnedFromLayers;
-
-pub use cached::reader::CacheReadProvider;
diff --git a/src/timebin/basic.rs b/src/timebin/basic.rs
index 1452495..620dead 100644
--- a/src/timebin/basic.rs
+++ b/src/timebin/basic.rs
@@ -28,7 +28,6 @@ pub enum Error {
     MissingBinnerAfterProcessItem,
     CreateEmpty,
     NoBinnerAfterInputDone,
-    Stream,
     Msg(String),
 }
 
@@ -66,7 +65,8 @@ impl TimeBinnedStream
 where
     T: TimeBinnableTy,
 {
-    pub fn new(inp: SitemtyStream, range: BinnedRangeEnum, do_time_weight: bool) -> Self {
+    #[allow(unused)]
+    fn new(inp: SitemtyStream, range: BinnedRangeEnum, do_time_weight: bool) -> Self {
         Self {
             inp,
             range,
@@ -85,7 +85,8 @@ where
         trace2!("process_item {item:?}");
         if self.binner.is_none() {
             trace!("process_item call time_binner_new");
-            let binner = item.time_binner_new(self.range.clone(), self.do_time_weight, emit_empty_bins);
+            let binner =
+                item.time_binner_new(self.range.clone(), self.do_time_weight, emit_empty_bins);
             self.binner = Some(binner);
         }
         let binner = self.binner.as_mut().unwrap();
@@ -96,7 +97,10 @@ where
     fn handle_data_item(
         &mut self,
         item: T,
-    ) -> Result<ControlFlow<Poll<Sitemty<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>, Error> {
+    ) -> Result<
+        ControlFlow<Poll<Sitemty<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>,
+        Error,
+    > {
         use ControlFlow::*;
         use Poll::*;
         trace2!("================= handle_data_item");
@@ -147,7 +151,10 @@ where
     fn handle_item(
         &mut self,
         item: Sitemty<T>,
-    ) -> Result<ControlFlow<Poll<Sitemty<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>, Error> {
+    ) -> Result<
+        ControlFlow<Poll<Sitemty<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>,
+        Error,
+    > {
         use ControlFlow::*;
         use Poll::*;
         trace2!("================= handle_item");
@@ -174,19 +181,28 @@ where
     fn handle_none(
         &mut self,
-    ) -> Result<ControlFlow<Poll<Sitemty<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>, Error> {
+    ) -> Result<
+        ControlFlow<Poll<Sitemty<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>,
+        Error,
+    > {
         use ControlFlow::*;
         use Poll::*;
         trace2!("================= handle_none");
         let self_range_final = self.range_final;
         if let Some(binner) = self.binner.as_mut() {
-            trace2!("bins ready count before finish {}", binner.bins_ready_count());
+            trace2!(
+                "bins ready count before finish {}",
+                binner.bins_ready_count()
+            );
             // TODO rework the finish logic
             if self_range_final {
                 binner.set_range_complete();
             }
             binner.push_in_progress(false);
-            trace2!("bins ready count after finish {}", binner.bins_ready_count());
+            trace2!(
+                "bins ready count after finish {}",
+                binner.bins_ready_count()
+            );
             if let Some(bins) = binner.bins_ready() {
                 self.done_data = true;
                 Ok(Break(Ready(sitem_data(bins))))
             } else {
@@ -216,7 +232,10 @@ where
     fn poll_input(
         &mut self,
         cx: &mut Context,
-    ) -> Result<ControlFlow<Poll<Sitemty<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>, Error> {
+    ) -> Result<
+        ControlFlow<Poll<Sitemty<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>,
+        Error,
+    > {
         use ControlFlow::*;
         use Poll::*;
         trace2!("================= poll_input");
@@ -250,7 +269,9 @@ where
                         self.done = true;
                         if self.range_final {
                             info!("TimeBinnedStream EMIT RANGE FINAL");
-                            Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
+                            Ready(Some(Ok(StreamItem::DataItem(
+                                RangeCompletableItem::RangeComplete,
+                            ))))
                         } else {
                             continue;
                         }
diff --git a/src/timebin/gapfill.rs b/src/timebin/gapfill.rs
index 7e1b4b3..58e7473 100644
--- a/src/timebin/gapfill.rs
+++ b/src/timebin/gapfill.rs
@@ -105,11 +105,15 @@ impl GapFill {
         debug_init!("new dbgname {}", dbgname);
         let inp = if cache_usage.is_cache_read() {
             let series = ch_conf.series().expect("series id for cache read");
-            let stream = super::cached::reader::CachedReader::new(series, range.clone(), cache_read_provider.clone())?
-                .map(|x| match x {
-                    Ok(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))),
-                    Err(e) => sitem_err_from_string(e),
-                });
+            let stream = super::cached::reader::CachedReader::new(
+                series,
+                range.clone(),
+                cache_read_provider.clone(),
+            )?
+            .map(|x| match x {
+                Ok(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))),
+                Err(e) => sitem_err_from_string(e),
+            });
             Box::pin(stream) as Pin> + Send>>
         } else {
             let stream = futures_util::stream::empty();
@@ -164,7 +168,9 @@ impl GapFill {
         }
         if bins.len() != 0 {
             let mut bins2 = bins.clone();
-            let dst = self.bins_for_cache_write.get_or_insert_with(|| bins.empty());
+            let dst = self
+                .bins_for_cache_write
+                .get_or_insert_with(|| bins.empty());
             bins2.drain_into(dst.as_mut(), 0..bins2.len());
         }
         if self.cache_usage.is_cache_write() {
@@ -196,7 +202,12 @@ impl GapFill {
         }
         if let Some(last) = self.last_bin_ts2 {
             if ts1 != last {
-                trace_handle!("{} detect a gap BETWEEN last {} ts1 {}", self.dbgname, last, ts1);
+                trace_handle!(
+                    "{} detect a gap BETWEEN last {} ts1 {}",
+                    self.dbgname,
+                    last,
+                    ts1
+                );
                 let mut ret = bins.empty();
                 let mut bins = bins;
                 bins.drain_into(ret.as_mut(), 0..i);
@@ -229,14 +240,19 @@ impl GapFill {
         Ok(bins)
     }
 
-    fn setup_inp_finer(mut self: Pin<&mut Self>, range: NanoRange, inp_finer_fills_gap: bool) -> Result<(), Error> {
+    fn setup_inp_finer(
+        mut self: Pin<&mut Self>,
+        range: NanoRange,
+        inp_finer_fills_gap: bool,
+    ) -> Result<(), Error> {
         self.inp_finer_range_final = false;
         self.inp_finer_range_final_max += 1;
         self.inp_finer_fills_gap = inp_finer_fills_gap;
         self.exp_finer_range = range.clone();
-        if let Some(bin_len_finer) =
-            super::grid::find_next_finer_bin_len(self.range.bin_len.to_dt_ms(), &self.bin_len_layers)
-        {
+        if let Some(bin_len_finer) = super::grid::find_next_finer_bin_len(
+            self.range.bin_len.to_dt_ms(),
+            &self.bin_len_layers,
+        ) {
             debug_setup!(
                 "{} setup_inp_finer next finer from bins {} {} from {}",
                 self.dbgname,
@@ -261,7 +277,10 @@ impl GapFill {
                 self.events_read_provider.clone(),
             )?;
             let stream = Box::pin(inp_finer);
-            let range = BinnedRange::from_nano_range(range_finer.full_range(), self.range.bin_len.to_dt_ms());
+            let range = BinnedRange::from_nano_range(
+                range_finer.full_range(),
+                self.range.bin_len.to_dt_ms(),
+            );
             let stream = if self.do_time_weight {
                 BinnedBinsTimeweightStream::new(range, stream)
             } else {
@@ -269,7 +288,11 @@ impl GapFill {
             };
             self.inp_finer = Some(Box::pin(stream));
         } else {
-            debug_setup!("{} setup_inp_finer next finer from events {}", self.dbgname, range);
+            debug_setup!(
+                "{} setup_inp_finer next finer from events {}",
+                self.dbgname,
+                range
+            );
             let series_range = SeriesRange::TimeRange(range.clone());
             let one_before_range = true;
             let select = EventsSubQuerySelect::new(
@@ -285,19 +308,17 @@ impl GapFill {
                 self.log_level.clone(),
             );
             let range = BinnedRange::from_nano_range(range.clone(), self.range.bin_len.to_dt_ms());
-            let inp = BinnedFromEvents::new(range, evq, self.do_time_weight, self.events_read_provider.clone())?;
+            let inp = BinnedFromEvents::new(
+                range,
+                evq,
+                self.do_time_weight,
+                self.events_read_provider.clone(),
+            )?;
             self.inp_finer = Some(Box::pin(inp));
         }
         Ok(())
     }
 
-    fn cache_write(mut self: Pin<&mut Self>, bins: BinsBoxed) -> Result<(), Error> {
-        // TODO emit bins that are ready for cache write into some separate channel
-        let series = todo!();
-        self.cache_writing = Some(self.cache_read_provider.write(series, bins));
-        Ok(())
-    }
-
     fn cache_write_on_end(mut self: Pin<&mut Self>) -> Result<(), Error> {
         if self.inp_finer_fills_gap {
             // TODO can consider all incoming bins as final by assumption.
@@ -346,7 +367,9 @@ impl Stream for GapFill {
                 Ready(Some(Ok(x))) => match x {
                     StreamItem::DataItem(RangeCompletableItem::Data(x)) => {
                         match self.as_mut().handle_bins_finer(x) {
-                            Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))),
+                            Ok(x) => Ready(Some(Ok(StreamItem::DataItem(
+                                RangeCompletableItem::Data(x),
+                            )))),
                             Err(e) => Ready(Some(sitem_err_from_string(e))),
                         }
                     }
@@ -373,8 +396,10 @@ impl Stream for GapFill {
                         self.dbgname,
                         self.last_bin_ts2
                     );
-                    let exp_finer_range =
-                        ::core::mem::replace(&mut self.exp_finer_range, NanoRange { beg: 0, end: 0 });
+                    let exp_finer_range = ::core::mem::replace(
+                        &mut self.exp_finer_range,
+                        NanoRange { beg: 0, end: 0 },
+                    );
                     self.inp_finer = None;
                     if let Some(j) = self.last_bin_ts2 {
                         if j.ns() != exp_finer_range.end() {
@@ -385,7 +410,9 @@ impl Stream for GapFill {
                                 exp_finer_range
                             );
                             if self.inp_finer_fills_gap {
-                                Ready(Some(sitem_err_from_string("finer input didn't deliver to the end")))
+                                Ready(Some(sitem_err_from_string(
+                                    "finer input didn't deliver to the end",
+                                )))
                             } else {
                                 warn!(
                                     "{} inp_finer Ready(None) last_bin_ts2 {:?} not delivered to the end, but maybe in the future",
@@ -416,16 +443,22 @@ impl Stream for GapFill {
                     }
                 } else if let Some(x) = self.inp_buf.take() {
                     match self.as_mut().handle_bins_finer(x) {
-                        Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))),
+                        Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(
+                            x,
+                        ))))),
                        Err(e) => Ready(Some(sitem_err_from_string(e))),
                     }
                 } else if let Some(inp) = self.inp.as_mut() {
                     match inp.poll_next_unpin(cx) {
                         Ready(Some(Ok(x))) => match x {
-                            StreamItem::DataItem(RangeCompletableItem::Data(x)) => match self.as_mut().handle_bins(x) {
-                                Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))),
-                                Err(e) => Ready(Some(sitem_err_from_string(e))),
-                            },
+                            StreamItem::DataItem(RangeCompletableItem::Data(x)) => {
+                                match self.as_mut().handle_bins(x) {
+                                    Ok(x) => Ready(Some(Ok(StreamItem::DataItem(
+                                        RangeCompletableItem::Data(x),
+                                    )))),
+                                    Err(e) => Ready(Some(sitem_err_from_string(e))),
+                                }
+                            }
                             StreamItem::DataItem(RangeCompletableItem::RangeComplete) => {
                                 self.inp_range_final = true;
                                 continue;
@@ -478,7 +511,9 @@ impl Stream for GapFill {
                         self.done = true;
                         if self.inp_finer_range_final_cnt == self.inp_finer_range_final_max {
                             trace_handle!("{} range finale all", self.dbgname);
-                            Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
+                            Ready(Some(Ok(StreamItem::DataItem(
+                                RangeCompletableItem::RangeComplete,
+                            ))))
                         } else {
                             trace_handle!("{} substreams not final", self.dbgname);
                             continue;
diff --git a/src/timebinnedjson.rs b/src/timebinnedjson.rs
index 764f8f4..1502233 100644
--- a/src/timebinnedjson.rs
+++ b/src/timebinnedjson.rs
@@ -8,8 +8,8 @@ use crate::streamtimeout::TimeoutableStream;
 use crate::tcprawclient::container_stream_from_bytes_stream;
 use crate::tcprawclient::make_sub_query;
 use crate::tcprawclient::OpenBoxedBytesStreamsBox;
+use crate::timebin::cached::reader::CacheReadProvider;
 use crate::timebin::cached::reader::EventsReadProvider;
-use crate::timebin::CacheReadProvider;
 use crate::transform::build_merged_event_transform;
 use futures_util::future::BoxFuture;
 use futures_util::Stream;
@@ -63,7 +63,9 @@ where
 }
 
 #[allow(unused)]
-fn assert_stream_send<'u, R>(stream: impl 'u + Send + Stream<Item = R>) -> impl 'u + Send + Stream<Item = R> {
+fn assert_stream_send<'u, R>(
+    stream: impl 'u + Send + Stream<Item = R>,
+) -> impl 'u + Send + Stream<Item = R> {
     stream
 }
 
@@ -92,7 +94,11 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents(
     let bytes_streams = open_bytes.open(subq, ctx.as_ref().clone()).await?;
     let mut inps = Vec::new();
     for s in bytes_streams {
-        let s = container_stream_from_bytes_stream::(s, inmem_bufcap.clone(), "TODOdbgdesc".into())?;
+        let s = container_stream_from_bytes_stream::(
+            s,
+            inmem_bufcap.clone(),
+            "TODOdbgdesc".into(),
+        )?;
         let s = Box::pin(s) as Pin> + Send>>;
         inps.push(s);
     }
@@ -110,9 +116,9 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents(
                 // let k: Box = Box::new(k);
                 let k = k.to_dim0_f32_for_binning();
                 let k = tr.0.transform(k);
-                Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events(
-                    k,
-                ))))
+                Ok(StreamItem::DataItem(RangeCompletableItem::Data(
+                    ChannelEvents::Events(k),
+                )))
             }
             _ => k,
         }
@@ -136,7 +142,8 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents(
         let mut store = wasmer::Store::default();
         let module = wasmer::Module::new(&store, wasm).unwrap();
         // TODO assert that memory is large enough
-        let memory = wasmer::Memory::new(&mut store, wasmer::MemoryType::new(10, Some(30), false)).unwrap();
+        let memory =
+            wasmer::Memory::new(&mut store, wasmer::MemoryType::new(10, Some(30), false)).unwrap();
         let import_object = wasmer::imports! {
             "env" => {
                 "memory" => memory.clone(),
@@ -170,15 +177,24 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents(
                            .as_any_mut()
                            .downcast_mut::>>()
                            .is_some();
-                        let r5 = evs.as_mut().as_any_mut().downcast_mut::().is_some();
-                        let r6 = evs.as_mut().as_any_mut().downcast_mut::>().is_some();
+                        let r5 = evs
+                            .as_mut()
+                            .as_any_mut()
+                            .downcast_mut::()
+                            .is_some();
+                        let r6 = evs
+                            .as_mut()
+                            .as_any_mut()
+                            .downcast_mut::>()
+                            .is_some();
                        debug!("wasm castings: {r1} {r2} {r3} {r4} {r5} {r6}");
                    }
                    if let Some(evs) = evs.as_any_mut().downcast_mut::() {
                        match evs {
                            ChannelEvents::Events(evs) => {
-                                if let Some(evs) =
-                                    evs.as_any_mut().downcast_mut::>()
+                                if let Some(evs) = evs
+                                    .as_any_mut()
+                                    .downcast_mut::>()
                                {
                                    use items_0::WithLen;
                                    if evs.len() == 0 {
@@ -186,7 +202,8 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents(
                                    } else {
                                        debug!("wasm see EventsDim0 len {}", evs.len());
                                        let max_len_needed = 16000;
-                                        let dummy1 = instance.exports.get_function("dummy1").unwrap();
+                                        let dummy1 =
+                                            instance.exports.get_function("dummy1").unwrap();
                                        let s = evs.values.as_mut_slices();
                                        for sl in [s.0, s.1] {
                                            if sl.len() > max_len_needed as _ {
@@ -196,12 +213,20 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents(
                                            let wmemoff = buffer_ptr as u64;
                                            let view = memory.view(&store);
                                            // TODO is the offset bytes or elements?
-                                            let wsl = WasmSlice::::new(&view, wmemoff, sl.len() as _).unwrap();
+                                            let wsl = WasmSlice::::new(
+                                                &view,
+                                                wmemoff,
+                                                sl.len() as _,
+                                            )
+                                            .unwrap();
                                            // debug!("wasm pages {:?} data size {:?}", view.size(), view.data_size());
                                            wsl.write_slice(&sl).unwrap();
                                            let ptr = wsl.as_ptr32();
                                            debug!("ptr {:?} offset {}", ptr, ptr.offset());
-                                            let params = [Value::I32(ptr.offset() as _), Value::I32(sl.len() as _)];
+                                            let params = [
+                                                Value::I32(ptr.offset() as _),
+                                                Value::I32(sl.len() as _),
+                                            ];
                                            let res = dummy1.call(&mut store, &params).unwrap();
                                            match res[0] {
                                                Value::I32(x) => {
@@ -216,7 +241,12 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents(
                                            }
                                            // Init the slice again because we need to drop ownership for the function call.
                                            let view = memory.view(&store);
-                                            let wsl = WasmSlice::::new(&view, wmemoff, sl.len() as _).unwrap();
+                                            let wsl = WasmSlice::::new(
+                                                &view,
+                                                wmemoff,
+                                                sl.len() as _,
+                                            )
+                                            .unwrap();
                                            wsl.read_slice(sl).unwrap();
                                        }
                                    }
@@ -263,7 +293,7 @@ async fn timebinned_stream(
     } else {
         netpod::time_bin_len_cache_opts().to_vec()
     };
-    let stream = crate::timebin::TimeBinnedFromLayers::new(
+    let stream = crate::timebin::fromlayers::TimeBinnedFromLayers::new(
         ch_conf,
         cache_usage,
         query.transform().clone(),
@@ -328,17 +358,6 @@ pub async fn timebinned_json(
     let collres = collected.await?;
     match collres {
         CollectResult::Some(collres) => {
-            let collres = if let Some(_bins) = collres
-                .as_any_ref()
-                .downcast_ref::>()
-            {
-                debug!("unexpected binned enum");
-                // bins.boxed_collected_with_enum_fix()
-                collres
-            } else {
-                debug!("timebinned_json collected type_name {:?}", collres.type_name());
-                collres
-            };
             let jsval = collres.to_json_value()?;
             Ok(CollectResult::Some(jsval))
         }
@@ -346,24 +365,14 @@ pub async fn timebinned_json(
     }
 }
 
-fn take_collector_result(coll: &mut Box) -> Option {
+fn take_collector_result(
+    coll: &mut Box,
+) -> Option {
     match coll.result(None, None) {
-        Ok(collres) => {
-            let collres = if let Some(_bins) = collres
-                .as_any_ref()
-                .downcast_ref::>()
-            {
-                warn!("unexpected binned enum");
-                // bins.boxed_collected_with_enum_fix()
-                collres
-            } else {
-                collres
-            };
-            match collres.to_json_value() {
-                Ok(val) => Some(val),
-                Err(e) => Some(serde_json::Value::String(format!("{e}"))),
-            }
-        }
+        Ok(collres) => match collres.to_json_value() {
+            Ok(val) => Some(val),
+            Err(e) => Some(serde_json::Value::String(format!("{e}"))),
+        },
         Err(e) => Some(serde_json::Value::String(format!("{e}"))),
     }
 }
@@ -400,7 +409,9 @@ pub async fn timebinned_json_framed(
     let timeout_content_2 = timeout_content_base * 2 / 3;
     let mut coll = None;
     let mut last_emit = Instant::now();
-    let stream = stream.map(|x| Some(x)).chain(futures_util::stream::iter([None]));
+    let stream = stream
+        .map(|x| Some(x))
+        .chain(futures_util::stream::iter([None]));
     let stream = TimeoutableStream::new(timeout_content_base, timeout_provider, stream);
     let stream = stream.map(move |x| {
         match x {
@@ -466,7 +477,9 @@ pub async fn timebinned_json_framed(
     // TODO skip the intermediate conversion to js value, go directly to string data
     let stream = stream.map(|x| match x {
         Ok(x) => Ok(JsonBytes::new(serde_json::to_string(&x).unwrap())),
-        Err(e) => Err(crate::json_stream::Error::from(crate::json_stream::ErrMsg(e))),
+        Err(e) => Err(crate::json_stream::Error::from(crate::json_stream::ErrMsg(
+            e,
+        ))),
     });
     Ok(Box::pin(stream))
 }
diff --git a/src/transform.rs b/src/transform.rs
index 0d5bf3d..0660fe8 100644
--- a/src/transform.rs
+++ b/src/transform.rs
@@ -1,11 +1,3 @@
-use futures_util::Stream;
-use futures_util::StreamExt;
-use items_0::collect_s::CollectableDyn;
-use items_0::streamitem::RangeCompletableItem;
-use items_0::streamitem::Sitemty;
-use items_0::streamitem::StreamItem;
-use items_0::transform::CollectableStreamBox;
-use items_0::transform::EventStreamBox;
 use items_0::transform::EventStreamTrait;
 use items_0::transform::TransformEvent;
 use items_0::transform::TransformProperties;
@@ -14,7 +6,6 @@
 use items_2::transform::make_transform_identity;
 use items_2::transform::make_transform_min_max_avg;
 use items_2::transform::make_transform_pulse_id_diff;
 use query::transform::EventTransformQuery;
-use query::transform::TimeBinningTransformQuery;
 use query::transform::TransformQuery;
 use std::pin::Pin;
@@ -64,37 +55,3 @@ impl WithTransformProperties for EventsToTimeBinnable {
         self.inp.query_transform_properties()
     }
 }
-
-pub fn build_full_transform_collectable(
-    tr: &TransformQuery,
-    inp: EventStreamBox,
-) -> Result {
-    // TODO this must return a Stream!
-    //let evs = build_event_transform(tr, inp)?;
-    let trtb = tr.get_tr_time_binning();
-    let a: Pin>> + Send>> =
-        Box::pin(inp.0.map(|item| match item {
-            Ok(item) => match item {
-                StreamItem::DataItem(item) => match item {
-                    RangeCompletableItem::Data(item) => {
-                        let item: Box = Box::new(item);
-                        Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
-                    }
-                    RangeCompletableItem::RangeComplete => {
-                        Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
-                    }
-                },
-                StreamItem::Log(item) => Ok(StreamItem::Log(item)),
-                StreamItem::Stats(item) => Ok(StreamItem::Stats(item)),
-            },
-            Err(e) => Err(e),
-        }));
-    let stream: Pin>> + Send>> =
-        Box::pin(futures_util::stream::empty());
-    let stream = Box::pin(futures_util::stream::empty()) as _;
-    match trtb {
-        TimeBinningTransformQuery::None => Ok(CollectableStreamBox(stream)),
-        TimeBinningTransformQuery::TimeWeighted => todo!(),
-        TimeBinningTransformQuery::Unweighted => todo!(),
-    }
-}