From 1f08e60d54bc602051497f6bbbf8a70656fb24f2 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 14 Nov 2024 12:13:05 +0100 Subject: [PATCH] Remove unused transform types --- src/generators.rs | 53 +++++++++++++++++++------------- src/plaineventsstream.rs | 65 +++++++++++++++++++++++----------------- src/timebinnedjson.rs | 5 ---- src/transform.rs | 25 ---------------- 4 files changed, 70 insertions(+), 78 deletions(-) diff --git a/src/generators.rs b/src/generators.rs index 286bcad..6b15e92 100644 --- a/src/generators.rs +++ b/src/generators.rs @@ -1,5 +1,4 @@ use crate::frames::inmem::BoxedBytesStream; -use crate::transform::build_event_transform; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; @@ -53,26 +52,24 @@ pub fn make_test_channel_events_bytes_stream( error!("{e}"); Err(e) } else { - let mut tr = build_event_transform(subq.transform())?; let stream = make_test_channel_events_stream_data(subq, node_count, node_ix)?; let stream = stream.map(move |x| { on_sitemty_data!(x, |x: ChannelEvents| { match x { - ChannelEvents::Events(evs) => { - let evs = tr.0.transform(evs); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events( - evs, - )))) - } - ChannelEvents::Status(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data( - ChannelEvents::Status(x), - ))), + ChannelEvents::Events(evs) => Ok(StreamItem::DataItem( + RangeCompletableItem::Data(ChannelEvents::Events(evs)), + )), + ChannelEvents::Status(x) => Ok(StreamItem::DataItem( + RangeCompletableItem::Data(ChannelEvents::Status(x)), + )), } }) }); - let stream = stream - .map_err(sitem_err2_from_string) - .map(|x| x.make_frame_dyn().map(|x| x.freeze()).map_err(sitem_err2_from_string)); + let stream = stream.map_err(sitem_err2_from_string).map(|x| { + x.make_frame_dyn() + .map(|x| x.freeze()) + .map_err(sitem_err2_from_string) + }); let ret = Box::pin(stream); Ok(ret) } @@ -102,11 +99,17 @@ fn make_test_channel_events_stream_data_inner( let range = subq.range().clone(); let one_before = subq.need_one_before_range(); if chn == "test-gen-i32-dim0-v00" { - Ok(Box::pin(GenerateI32V00::new(node_ix, node_count, range, one_before))) + Ok(Box::pin(GenerateI32V00::new( + node_ix, node_count, range, one_before, + ))) } else if chn == "test-gen-i32-dim0-v01" { - Ok(Box::pin(GenerateI32V01::new(node_ix, node_count, range, one_before))) + Ok(Box::pin(GenerateI32V01::new( + node_ix, node_count, range, one_before, + ))) } else if chn == "test-gen-f64-dim1-v00" { - Ok(Box::pin(GenerateF64V00::new(node_ix, node_count, range, one_before))) + Ok(Box::pin(GenerateF64V00::new( + node_ix, node_count, range, one_before, + ))) } else { let na: Vec<_> = chn.split("-").collect(); if na.len() != 3 { @@ -199,7 +202,9 @@ impl Stream for GenerateI32V00 { } else if self.ts >= self.tsend { self.done = true; self.done_range_final = true; - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + Ready(Some(Ok(StreamItem::DataItem( + RangeCompletableItem::RangeComplete, + )))) } else if !self.do_throttle { // To use the generator without throttling, use this scope Ready(Some(self.make_batch())) @@ -302,7 +307,9 @@ impl Stream for GenerateI32V01 { self.done = true; self.done_range_final = true; if self.have_range_final { - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + Ready(Some(Ok(StreamItem::DataItem( + RangeCompletableItem::RangeComplete, + )))) } else { continue; } @@ -408,7 +415,9 @@ impl Stream for GenerateF64V00 { } else if self.ts >= self.tsend { self.done = true; self.done_range_final = true; - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + Ready(Some(Ok(StreamItem::DataItem( + RangeCompletableItem::RangeComplete, + )))) } else if !self.do_throttle { // To use the generator without throttling, use this scope Ready(Some(self.make_batch())) @@ -519,7 +528,9 @@ impl Stream for GenerateWaveI16V00 { } else if self.ts >= self.tsend { self.done = true; self.done_range_final = true; - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + Ready(Some(Ok(StreamItem::DataItem( + RangeCompletableItem::RangeComplete, + )))) } else if !self.do_throttle { // To use the generator without throttling, use this scope Ready(Some(self.make_batch())) diff --git a/src/plaineventsstream.rs b/src/plaineventsstream.rs index c4e3a04..5824744 100644 --- a/src/plaineventsstream.rs +++ b/src/plaineventsstream.rs @@ -1,7 +1,6 @@ use crate::tcprawclient::container_stream_from_bytes_stream; use crate::tcprawclient::make_sub_query; use crate::tcprawclient::OpenBoxedBytesStreamsBox; -use crate::transform::build_merged_event_transform; use futures_util::Stream; use futures_util::StreamExt; use items_0::on_sitemty_data; @@ -43,11 +42,14 @@ pub async fn dyn_events_stream( ctx, ); let inmem_bufcap = subq.inmem_bufcap(); - let mut tr = build_merged_event_transform(evq.transform())?; let bytes_streams = open_bytes.open(subq, ctx.clone()).await?; let mut inps = Vec::new(); for s in bytes_streams { - let s = container_stream_from_bytes_stream::(s, inmem_bufcap.clone(), "TODOdbgdesc".into())?; + let s = container_stream_from_bytes_stream::( + s, + inmem_bufcap.clone(), + "TODOdbgdesc".into(), + )?; let s = Box::pin(s) as Pin> + Send>>; inps.push(s); } @@ -55,30 +57,20 @@ pub async fn dyn_events_stream( // TODO propagate also the max-buf-len for the first stage event reader. // TODO use a mixture of count and byte-size as threshold. let stream = Merger::new(inps, evq.merger_out_len_max()); - - // let stream = stream.map(|item| { - // info!("item after merge: {item:?}"); - // item - // }); - - let stream = crate::rangefilter2::RangeFilter2::new(stream, evq.range().try_into()?, evq.one_before_range()); - - // let stream = stream.map(|item| { - // info!("item after rangefilter: {item:?}"); - // item - // }); - + let stream = crate::rangefilter2::RangeFilter2::new( + stream, + evq.range().try_into()?, + evq.one_before_range(), + ); let stream = stream.map(move |k| { on_sitemty_data!(k, |k| { let k: Box = Box::new(k); - // trace!("got len {}", k.len()); - let k = tr.0.transform(k); Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) }) }); - if let Some(wasmname) = evq.test_do_wasm() { - let stream = transform_wasm::<_, items_0::streamitem::SitemErrTy>(stream, wasmname, ctx).await?; + let stream = + transform_wasm::<_, items_0::streamitem::SitemErrTy>(stream, wasmname, ctx).await?; Ok(Box::pin(stream)) } else { Ok(Box::pin(stream)) @@ -123,7 +115,8 @@ where let mut store = wasmer::Store::default(); let module = wasmer::Module::new(&store, wasm).unwrap(); // TODO assert that memory is large enough - let memory = wasmer::Memory::new(&mut store, wasmer::MemoryType::new(10, Some(30), false)).unwrap(); + let memory = + wasmer::Memory::new(&mut store, wasmer::MemoryType::new(10, Some(30), false)).unwrap(); let import_object = wasmer::imports! { "env" => { "memory" => memory.clone(), @@ -157,14 +150,25 @@ where .as_any_mut() .downcast_mut::>>() .is_some(); - let r5 = evs.as_mut().as_any_mut().downcast_mut::().is_some(); - let r6 = evs.as_mut().as_any_mut().downcast_mut::>().is_some(); + let r5 = evs + .as_mut() + .as_any_mut() + .downcast_mut::() + .is_some(); + let r6 = evs + .as_mut() + .as_any_mut() + .downcast_mut::>() + .is_some(); debug!("wasm castings: {r1} {r2} {r3} {r4} {r5} {r6}"); } if let Some(evs) = evs.as_any_mut().downcast_mut::() { match evs { ChannelEvents::Events(evs) => { - if let Some(evs) = evs.as_any_mut().downcast_mut::>() { + if let Some(evs) = evs + .as_any_mut() + .downcast_mut::>() + { use items_0::WithLen; if evs.len() == 0 { debug!("wasm empty EventsDim0"); @@ -181,12 +185,17 @@ where let wmemoff = buffer_ptr as u64; let view = memory.view(&store); // TODO is the offset bytes or elements? - let wsl = WasmSlice::::new(&view, wmemoff, sl.len() as _).unwrap(); + let wsl = + WasmSlice::::new(&view, wmemoff, sl.len() as _) + .unwrap(); // debug!("wasm pages {:?} data size {:?}", view.size(), view.data_size()); wsl.write_slice(&sl).unwrap(); let ptr = wsl.as_ptr32(); debug!("ptr {:?} offset {}", ptr, ptr.offset()); - let params = [Value::I32(ptr.offset() as _), Value::I32(sl.len() as _)]; + let params = [ + Value::I32(ptr.offset() as _), + Value::I32(sl.len() as _), + ]; let res = dummy1.call(&mut store, ¶ms).unwrap(); match res[0] { Value::I32(x) => { @@ -201,7 +210,9 @@ where } // Init the slice again because we need to drop ownership for the function call. let view = memory.view(&store); - let wsl = WasmSlice::::new(&view, wmemoff, sl.len() as _).unwrap(); + let wsl = + WasmSlice::::new(&view, wmemoff, sl.len() as _) + .unwrap(); wsl.read_slice(sl).unwrap(); } } diff --git a/src/timebinnedjson.rs b/src/timebinnedjson.rs index 1502233..9b7c794 100644 --- a/src/timebinnedjson.rs +++ b/src/timebinnedjson.rs @@ -10,7 +10,6 @@ use crate::tcprawclient::make_sub_query; use crate::tcprawclient::OpenBoxedBytesStreamsBox; use crate::timebin::cached::reader::CacheReadProvider; use crate::timebin::cached::reader::EventsReadProvider; -use crate::transform::build_merged_event_transform; use futures_util::future::BoxFuture; use futures_util::Stream; use futures_util::StreamExt; @@ -90,7 +89,6 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents( ); let inmem_bufcap = subq.inmem_bufcap(); let _wasm1 = subq.wasm1().map(ToString::to_string); - let mut tr = build_merged_event_transform(subq.transform())?; let bytes_streams = open_bytes.open(subq, ctx.as_ref().clone()).await?; let mut inps = Vec::new(); for s in bytes_streams { @@ -112,10 +110,7 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents( use StreamItem::*; match k { Ok(DataItem(Data(ChannelEvents::Events(k)))) => { - // let k = k; - // let k: Box = Box::new(k); let k = k.to_dim0_f32_for_binning(); - let k = tr.0.transform(k); Ok(StreamItem::DataItem(RangeCompletableItem::Data( ChannelEvents::Events(k), ))) diff --git a/src/transform.rs b/src/transform.rs index 0660fe8..0618e00 100644 --- a/src/transform.rs +++ b/src/transform.rs @@ -1,12 +1,7 @@ use items_0::transform::EventStreamTrait; -use items_0::transform::TransformEvent; use items_0::transform::TransformProperties; use items_0::transform::WithTransformProperties; -use items_2::transform::make_transform_identity; -use items_2::transform::make_transform_min_max_avg; -use items_2::transform::make_transform_pulse_id_diff; use query::transform::EventTransformQuery; -use query::transform::TransformQuery; use std::pin::Pin; #[derive(Debug, thiserror::Error)] @@ -16,26 +11,6 @@ pub enum Error { UnhandledQuery(EventTransformQuery), } -pub fn build_event_transform(tr: &TransformQuery) -> Result { - let trev = tr.get_tr_event(); - match trev { - EventTransformQuery::ValueFull => Ok(make_transform_identity()), - EventTransformQuery::MinMaxAvgDev => Ok(make_transform_min_max_avg()), - EventTransformQuery::ArrayPick(..) => Err(Error::UnhandledQuery(trev.clone())), - EventTransformQuery::PulseIdDiff => Ok(make_transform_pulse_id_diff()), - EventTransformQuery::EventBlobsVerbatim => Err(Error::UnhandledQuery(trev.clone())), - EventTransformQuery::EventBlobsUncompressed => Err(Error::UnhandledQuery(trev.clone())), - } -} - -pub fn build_merged_event_transform(tr: &TransformQuery) -> Result { - let trev = tr.get_tr_event(); - match trev { - EventTransformQuery::PulseIdDiff => Ok(make_transform_pulse_id_diff()), - _ => Ok(make_transform_identity()), - } -} - // TODO remove, in its current usage it reboxes pub struct EventsToTimeBinnable { inp: Pin>,