Remove unused transform types

This commit is contained in:
Dominik Werder
2024-11-14 12:13:05 +01:00
parent a6653e2eb5
commit 1f08e60d54
4 changed files with 70 additions and 78 deletions

View File

@@ -1,5 +1,4 @@
use crate::frames::inmem::BoxedBytesStream;
use crate::transform::build_event_transform;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
@@ -53,26 +52,24 @@ pub fn make_test_channel_events_bytes_stream(
error!("{e}");
Err(e)
} else {
let mut tr = build_event_transform(subq.transform())?;
let stream = make_test_channel_events_stream_data(subq, node_count, node_ix)?;
let stream = stream.map(move |x| {
on_sitemty_data!(x, |x: ChannelEvents| {
match x {
ChannelEvents::Events(evs) => {
let evs = tr.0.transform(evs);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events(
evs,
))))
}
ChannelEvents::Status(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(
ChannelEvents::Status(x),
))),
ChannelEvents::Events(evs) => Ok(StreamItem::DataItem(
RangeCompletableItem::Data(ChannelEvents::Events(evs)),
)),
ChannelEvents::Status(x) => Ok(StreamItem::DataItem(
RangeCompletableItem::Data(ChannelEvents::Status(x)),
)),
}
})
});
let stream = stream
.map_err(sitem_err2_from_string)
.map(|x| x.make_frame_dyn().map(|x| x.freeze()).map_err(sitem_err2_from_string));
let stream = stream.map_err(sitem_err2_from_string).map(|x| {
x.make_frame_dyn()
.map(|x| x.freeze())
.map_err(sitem_err2_from_string)
});
let ret = Box::pin(stream);
Ok(ret)
}
@@ -102,11 +99,17 @@ fn make_test_channel_events_stream_data_inner(
let range = subq.range().clone();
let one_before = subq.need_one_before_range();
if chn == "test-gen-i32-dim0-v00" {
Ok(Box::pin(GenerateI32V00::new(node_ix, node_count, range, one_before)))
Ok(Box::pin(GenerateI32V00::new(
node_ix, node_count, range, one_before,
)))
} else if chn == "test-gen-i32-dim0-v01" {
Ok(Box::pin(GenerateI32V01::new(node_ix, node_count, range, one_before)))
Ok(Box::pin(GenerateI32V01::new(
node_ix, node_count, range, one_before,
)))
} else if chn == "test-gen-f64-dim1-v00" {
Ok(Box::pin(GenerateF64V00::new(node_ix, node_count, range, one_before)))
Ok(Box::pin(GenerateF64V00::new(
node_ix, node_count, range, one_before,
)))
} else {
let na: Vec<_> = chn.split("-").collect();
if na.len() != 3 {
@@ -199,7 +202,9 @@ impl Stream for GenerateI32V00 {
} else if self.ts >= self.tsend {
self.done = true;
self.done_range_final = true;
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
Ready(Some(Ok(StreamItem::DataItem(
RangeCompletableItem::RangeComplete,
))))
} else if !self.do_throttle {
// To use the generator without throttling, use this scope
Ready(Some(self.make_batch()))
@@ -302,7 +307,9 @@ impl Stream for GenerateI32V01 {
self.done = true;
self.done_range_final = true;
if self.have_range_final {
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
Ready(Some(Ok(StreamItem::DataItem(
RangeCompletableItem::RangeComplete,
))))
} else {
continue;
}
@@ -408,7 +415,9 @@ impl Stream for GenerateF64V00 {
} else if self.ts >= self.tsend {
self.done = true;
self.done_range_final = true;
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
Ready(Some(Ok(StreamItem::DataItem(
RangeCompletableItem::RangeComplete,
))))
} else if !self.do_throttle {
// To use the generator without throttling, use this scope
Ready(Some(self.make_batch()))
@@ -519,7 +528,9 @@ impl Stream for GenerateWaveI16V00 {
} else if self.ts >= self.tsend {
self.done = true;
self.done_range_final = true;
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
Ready(Some(Ok(StreamItem::DataItem(
RangeCompletableItem::RangeComplete,
))))
} else if !self.do_throttle {
// To use the generator without throttling, use this scope
Ready(Some(self.make_batch()))

View File

@@ -1,7 +1,6 @@
use crate::tcprawclient::container_stream_from_bytes_stream;
use crate::tcprawclient::make_sub_query;
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
use crate::transform::build_merged_event_transform;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::on_sitemty_data;
@@ -43,11 +42,14 @@ pub async fn dyn_events_stream(
ctx,
);
let inmem_bufcap = subq.inmem_bufcap();
let mut tr = build_merged_event_transform(evq.transform())?;
let bytes_streams = open_bytes.open(subq, ctx.clone()).await?;
let mut inps = Vec::new();
for s in bytes_streams {
let s = container_stream_from_bytes_stream::<ChannelEvents>(s, inmem_bufcap.clone(), "TODOdbgdesc".into())?;
let s = container_stream_from_bytes_stream::<ChannelEvents>(
s,
inmem_bufcap.clone(),
"TODOdbgdesc".into(),
)?;
let s = Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>;
inps.push(s);
}
@@ -55,30 +57,20 @@ pub async fn dyn_events_stream(
// TODO propagate also the max-buf-len for the first stage event reader.
// TODO use a mixture of count and byte-size as threshold.
let stream = Merger::new(inps, evq.merger_out_len_max());
// let stream = stream.map(|item| {
// info!("item after merge: {item:?}");
// item
// });
let stream = crate::rangefilter2::RangeFilter2::new(stream, evq.range().try_into()?, evq.one_before_range());
// let stream = stream.map(|item| {
// info!("item after rangefilter: {item:?}");
// item
// });
let stream = crate::rangefilter2::RangeFilter2::new(
stream,
evq.range().try_into()?,
evq.one_before_range(),
);
let stream = stream.map(move |k| {
on_sitemty_data!(k, |k| {
let k: Box<dyn Events> = Box::new(k);
// trace!("got len {}", k.len());
let k = tr.0.transform(k);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
})
});
if let Some(wasmname) = evq.test_do_wasm() {
let stream = transform_wasm::<_, items_0::streamitem::SitemErrTy>(stream, wasmname, ctx).await?;
let stream =
transform_wasm::<_, items_0::streamitem::SitemErrTy>(stream, wasmname, ctx).await?;
Ok(Box::pin(stream))
} else {
Ok(Box::pin(stream))
@@ -123,7 +115,8 @@ where
let mut store = wasmer::Store::default();
let module = wasmer::Module::new(&store, wasm).unwrap();
// TODO assert that memory is large enough
let memory = wasmer::Memory::new(&mut store, wasmer::MemoryType::new(10, Some(30), false)).unwrap();
let memory =
wasmer::Memory::new(&mut store, wasmer::MemoryType::new(10, Some(30), false)).unwrap();
let import_object = wasmer::imports! {
"env" => {
"memory" => memory.clone(),
@@ -157,14 +150,25 @@ where
.as_any_mut()
.downcast_mut::<Box<items_2::eventsdim0::EventsDim0<f64>>>()
.is_some();
let r5 = evs.as_mut().as_any_mut().downcast_mut::<ChannelEvents>().is_some();
let r6 = evs.as_mut().as_any_mut().downcast_mut::<Box<ChannelEvents>>().is_some();
let r5 = evs
.as_mut()
.as_any_mut()
.downcast_mut::<ChannelEvents>()
.is_some();
let r6 = evs
.as_mut()
.as_any_mut()
.downcast_mut::<Box<ChannelEvents>>()
.is_some();
debug!("wasm castings: {r1} {r2} {r3} {r4} {r5} {r6}");
}
if let Some(evs) = evs.as_any_mut().downcast_mut::<ChannelEvents>() {
match evs {
ChannelEvents::Events(evs) => {
if let Some(evs) = evs.as_any_mut().downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>() {
if let Some(evs) = evs
.as_any_mut()
.downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
{
use items_0::WithLen;
if evs.len() == 0 {
debug!("wasm empty EventsDim0<f64>");
@@ -181,12 +185,17 @@ where
let wmemoff = buffer_ptr as u64;
let view = memory.view(&store);
// TODO is the offset bytes or elements?
let wsl = WasmSlice::<f64>::new(&view, wmemoff, sl.len() as _).unwrap();
let wsl =
WasmSlice::<f64>::new(&view, wmemoff, sl.len() as _)
.unwrap();
// debug!("wasm pages {:?} data size {:?}", view.size(), view.data_size());
wsl.write_slice(&sl).unwrap();
let ptr = wsl.as_ptr32();
debug!("ptr {:?} offset {}", ptr, ptr.offset());
let params = [Value::I32(ptr.offset() as _), Value::I32(sl.len() as _)];
let params = [
Value::I32(ptr.offset() as _),
Value::I32(sl.len() as _),
];
let res = dummy1.call(&mut store, &params).unwrap();
match res[0] {
Value::I32(x) => {
@@ -201,7 +210,9 @@ where
}
// Init the slice again because we need to drop ownership for the function call.
let view = memory.view(&store);
let wsl = WasmSlice::<f64>::new(&view, wmemoff, sl.len() as _).unwrap();
let wsl =
WasmSlice::<f64>::new(&view, wmemoff, sl.len() as _)
.unwrap();
wsl.read_slice(sl).unwrap();
}
}

View File

@@ -10,7 +10,6 @@ use crate::tcprawclient::make_sub_query;
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
use crate::timebin::cached::reader::CacheReadProvider;
use crate::timebin::cached::reader::EventsReadProvider;
use crate::transform::build_merged_event_transform;
use futures_util::future::BoxFuture;
use futures_util::Stream;
use futures_util::StreamExt;
@@ -90,7 +89,6 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents(
);
let inmem_bufcap = subq.inmem_bufcap();
let _wasm1 = subq.wasm1().map(ToString::to_string);
let mut tr = build_merged_event_transform(subq.transform())?;
let bytes_streams = open_bytes.open(subq, ctx.as_ref().clone()).await?;
let mut inps = Vec::new();
for s in bytes_streams {
@@ -112,10 +110,7 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents(
use StreamItem::*;
match k {
Ok(DataItem(Data(ChannelEvents::Events(k)))) => {
// let k = k;
// let k: Box<dyn Events> = Box::new(k);
let k = k.to_dim0_f32_for_binning();
let k = tr.0.transform(k);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(
ChannelEvents::Events(k),
)))

View File

@@ -1,12 +1,7 @@
use items_0::transform::EventStreamTrait;
use items_0::transform::TransformEvent;
use items_0::transform::TransformProperties;
use items_0::transform::WithTransformProperties;
use items_2::transform::make_transform_identity;
use items_2::transform::make_transform_min_max_avg;
use items_2::transform::make_transform_pulse_id_diff;
use query::transform::EventTransformQuery;
use query::transform::TransformQuery;
use std::pin::Pin;
#[derive(Debug, thiserror::Error)]
@@ -16,26 +11,6 @@ pub enum Error {
UnhandledQuery(EventTransformQuery),
}
pub fn build_event_transform(tr: &TransformQuery) -> Result<TransformEvent, Error> {
let trev = tr.get_tr_event();
match trev {
EventTransformQuery::ValueFull => Ok(make_transform_identity()),
EventTransformQuery::MinMaxAvgDev => Ok(make_transform_min_max_avg()),
EventTransformQuery::ArrayPick(..) => Err(Error::UnhandledQuery(trev.clone())),
EventTransformQuery::PulseIdDiff => Ok(make_transform_pulse_id_diff()),
EventTransformQuery::EventBlobsVerbatim => Err(Error::UnhandledQuery(trev.clone())),
EventTransformQuery::EventBlobsUncompressed => Err(Error::UnhandledQuery(trev.clone())),
}
}
pub fn build_merged_event_transform(tr: &TransformQuery) -> Result<TransformEvent, Error> {
let trev = tr.get_tr_event();
match trev {
EventTransformQuery::PulseIdDiff => Ok(make_transform_pulse_id_diff()),
_ => Ok(make_transform_identity()),
}
}
// TODO remove, in its current usage it reboxes
pub struct EventsToTimeBinnable {
inp: Pin<Box<dyn EventStreamTrait>>,