From b8a8ecd53748e01946bec1bcb49990046ee6e328 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 22 Aug 2023 15:56:10 +0200 Subject: [PATCH] Can apply wasm to merged events, also for binned queries --- crates/daqbuffer/src/bin/daqbuffer.rs | 6 +- crates/httpret/src/httpret.rs | 8 +- crates/netpod/Cargo.toml | 5 +- crates/netpod/src/hex.rs | 10 ++ crates/netpod/src/netpod.rs | 1 + crates/nodenet/Cargo.toml | 1 - crates/nodenet/src/conn.rs | 43 ++++---- crates/query/src/api4/binned.rs | 14 +++ crates/query/src/api4/events.rs | 54 ++++++---- crates/streams/Cargo.toml | 1 + crates/streams/src/plaineventsjson.rs | 140 ++++++++++++++++++++++++- crates/streams/src/tcprawclient.rs | 2 + crates/streams/src/timebinnedjson.rs | 142 +++++++++++++++++++++++++- 13 files changed, 364 insertions(+), 63 deletions(-) create mode 100644 crates/netpod/src/hex.rs diff --git a/crates/daqbuffer/src/bin/daqbuffer.rs b/crates/daqbuffer/src/bin/daqbuffer.rs index 373d01e..edadc74 100644 --- a/crates/daqbuffer/src/bin/daqbuffer.rs +++ b/crates/daqbuffer/src/bin/daqbuffer.rs @@ -65,9 +65,9 @@ async fn go() -> Result<(), Error> { }; match opts.subcmd { SubCmd::Retrieval(subcmd) => { - info!("daqbuffer version {} 0000", clap::crate_version!()); - info!("{:?}", service_version); - { + info!("daqbuffer version {}", clap::crate_version!()); + info!(" service_version {}", service_version); + if false { #[allow(non_snake_case)] let TARGET = std::env!("DAQBUF_TARGET"); #[allow(non_snake_case)] diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 6858b0e..4f58fc4 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -15,7 +15,6 @@ use self::bodystream::ToPublicResponse; use crate::bodystream::response; use crate::err::Error; use crate::gather::gather_get_json; -use crate::pulsemap::UpdateTask; use ::err::thiserror; use ::err::ThisError; use futures_util::Future; @@ -31,16 +30,13 @@ use hyper::Body; use hyper::Request; use hyper::Response; use net::SocketAddr; -use netpod::is_false; use netpod::log::*; use netpod::query::prebinned::PreBinnedQuery; -use netpod::CmpZero; use netpod::NodeConfigCached; use netpod::ProxyConfig; use netpod::ServiceVersion; use netpod::APP_JSON; use netpod::APP_JSON_LINES; -use nodenet::conn::events_service; use panic::AssertUnwindSafe; use panic::UnwindSafe; use pin::Pin; @@ -132,7 +128,7 @@ pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion if let Some(bind) = node_config.node.prometheus_api_bind { tokio::spawn(prometheus::host(bind)); } - // let rawjh = taskrun::spawn(events_service(node_config.clone())); + // let rawjh = taskrun::spawn(nodenet::conn::events_service(node_config.clone())); use std::str::FromStr; let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen(), node_config.node.port))?; let make_service = make_service_fn({ @@ -883,7 +879,7 @@ impl StatusBoard { Some(e) => e.into(), None => { error!("can not find status id {}", status_id); - let e = ::err::Error::with_public_msg_no_trace(format!("Request status ID unknown {status_id}")); + let _e = ::err::Error::with_public_msg_no_trace(format!("Request status ID unknown {status_id}")); StatusBoardEntryUser { error_count: 1, warn_count: 0, diff --git a/crates/netpod/Cargo.toml b/crates/netpod/Cargo.toml index 4cdc8c9..b2afb5d 100644 --- a/crates/netpod/Cargo.toml +++ b/crates/netpod/Cargo.toml @@ -12,10 +12,11 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" humantime-serde = "1.1.1" async-channel = "1.8.0" -bytes = "1.3" +bytes = "1.4.0" chrono = { version = "0.4.19", features = ["serde"] } futures-util = "0.3.14" tracing = "0.1.37" url = "2.2" -num-traits = "0.2" +num-traits = "0.2.16" +hex = "0.4.3" err = { path = "../err" } diff --git a/crates/netpod/src/hex.rs b/crates/netpod/src/hex.rs new file mode 100644 index 0000000..604a107 --- /dev/null +++ b/crates/netpod/src/hex.rs @@ -0,0 +1,10 @@ +/// Input may also contain whitespace. +pub fn decode_hex>(inp: INP) -> Result, ()> { + let a: Vec<_> = inp + .as_ref() + .bytes() + .filter(|&x| (x >= b'0' && x <= b'9') || (x >= b'a' && x <= b'f')) + .collect(); + let ret = hex::decode(a).map_err(|_| ())?; + Ok(ret) +} diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index a56b7c5..4565fb0 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -1,3 +1,4 @@ +pub mod hex; pub mod histo; pub mod query; pub mod range; diff --git a/crates/nodenet/Cargo.toml b/crates/nodenet/Cargo.toml index f292daa..e1853a5 100644 --- a/crates/nodenet/Cargo.toml +++ b/crates/nodenet/Cargo.toml @@ -21,7 +21,6 @@ tracing = "0.1.25" hex = "0.4.3" scylla = "0.8.1" tokio-postgres = "0.7.7" -wasmer = { version = "3.2.0", default-features = false, features = ["sys", "cranelift"] } err = { path = "../err" } netpod = { path = "../netpod" } query = { path = "../query" } diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index ffd4051..305a6da 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -12,7 +12,6 @@ use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::streamitem::EVENT_QUERY_JSON_STRING_FRAME; -use items_0::Events; use items_2::channelevents::ChannelEvents; use items_2::empty::empty_events_dyn_ev; use items_2::framable::EventQueryJsonStringFrame; @@ -160,6 +159,8 @@ pub async fn create_response_bytes_stream( evq: EventsSubQuery, ncc: &NodeConfigCached, ) -> Result { + debug!("create_response_bytes_stream {:?}", evq.ch_conf().scalar_type()); + debug!("wasm1 {:?}", evq.wasm1()); let reqctx = netpod::ReqCtx::new(evq.reqid()); if evq.create_errors_contains("nodenet_parse_query") { let e = Error::with_msg_no_trace("produced error on request nodenet_parse_query"); @@ -175,32 +176,24 @@ pub async fn create_response_bytes_stream( Ok(ret) } else { let stream = make_channel_events_stream(evq.clone(), reqctx, ncc).await?; - if false { - // TODO wasm example - use wasmer::Value; - let wasm = b""; - let mut store = wasmer::Store::default(); - let module = wasmer::Module::new(&store, wasm).unwrap(); - let import_object = wasmer::imports! {}; - let instance = wasmer::Instance::new(&mut store, &module, &import_object).unwrap(); - let add_one = instance.exports.get_function("event_transform").unwrap(); - let result = add_one.call(&mut store, &[Value::I32(42)]).unwrap(); - assert_eq!(result[0], Value::I32(43)); - } - let mut tr = match build_event_transform(evq.transform()) { - Ok(x) => x, - Err(e) => { - return Err(e); - } - }; + let mut tr = build_event_transform(evq.transform())?; + let stream = stream.map(move |x| { - let item = on_sitemty_data!(x, |x| { - let x: Box = Box::new(x); - let x = tr.0.transform(x); - Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) - }); - Box::new(item) as Box + on_sitemty_data!(x, |x: ChannelEvents| { + match x { + ChannelEvents::Events(evs) => { + let evs = tr.0.transform(evs); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events( + evs, + )))) + } + ChannelEvents::Status(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data( + ChannelEvents::Status(x), + ))), + } + }) }); + // let stream = stream.map(move |x| Box::new(x) as Box); let stream = stream.map(|x| x.make_frame().map(|x| x.freeze())); let ret = Box::pin(stream); Ok(ret) diff --git a/crates/query/src/api4/binned.rs b/crates/query/src/api4/binned.rs index aa3d10a..55609d4 100644 --- a/crates/query/src/api4/binned.rs +++ b/crates/query/src/api4/binned.rs @@ -40,6 +40,8 @@ pub struct BinnedQuery { disk_stats_every: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub merger_out_len_max: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + test_do_wasm: Option, } impl BinnedQuery { @@ -55,6 +57,7 @@ impl BinnedQuery { disk_stats_every: None, timeout: None, merger_out_len_max: None, + test_do_wasm: None, } } @@ -140,6 +143,13 @@ impl BinnedQuery { v.transform = TransformQuery::for_time_weighted_scalar(); v } + + pub fn test_do_wasm(&self) -> Option<&str> { + match &self.test_do_wasm { + Some(x) => Some(&x), + None => None, + } + } } impl HasBackend for BinnedQuery { @@ -199,6 +209,7 @@ impl FromUrl for BinnedQuery { merger_out_len_max: pairs .get("mergerOutLenMax") .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + test_do_wasm: pairs.get("testDoWasm").map(|x| String::from(x)), }; debug!("BinnedQuery::from_url {:?}", ret); Ok(ret) @@ -236,5 +247,8 @@ impl AppendToUrl for BinnedQuery { if let Some(x) = self.merger_out_len_max.as_ref() { g.append_pair("mergerOutLenMax", &format!("{}", x)); } + if let Some(x) = &self.test_do_wasm { + g.append_pair("testDoWasm", &x); + } } } diff --git a/crates/query/src/api4/events.rs b/crates/query/src/api4/events.rs index 7ab7604..c996005 100644 --- a/crates/query/src/api4/events.rs +++ b/crates/query/src/api4/events.rs @@ -46,8 +46,8 @@ pub struct PlainEventsQuery { do_test_main_error: bool, #[serde(default, skip_serializing_if = "is_false")] do_test_stream_error: bool, - #[serde(default, skip_serializing_if = "is_false")] - test_do_wasm: bool, + #[serde(default, skip_serializing_if = "Option::is_none")] + test_do_wasm: Option, #[serde(default, skip_serializing_if = "Option::is_none")] merger_out_len_max: Option, #[serde(default, skip_serializing_if = "Vec::is_empty")] @@ -71,7 +71,7 @@ impl PlainEventsQuery { buf_len_disk_io: None, do_test_main_error: false, do_test_stream_error: false, - test_do_wasm: false, + test_do_wasm: None, merger_out_len_max: None, create_errors: Vec::new(), } @@ -127,8 +127,11 @@ impl PlainEventsQuery { self.do_test_stream_error } - pub fn test_do_wasm(&self) -> bool { - self.test_do_wasm + pub fn test_do_wasm(&self) -> Option<&str> { + match &self.test_do_wasm { + Some(x) => Some(&x), + None => None, + } } pub fn set_series_id(&mut self, series: u64) { @@ -228,11 +231,12 @@ impl FromUrl for PlainEventsQuery { .map_or("false", |k| k) .parse() .map_err(|e| Error::with_public_msg_no_trace(format!("can not parse doTestStreamError: {}", e)))?, - test_do_wasm: pairs - .get("testDoWasm") - .map(|x| x.parse::().ok()) - .unwrap_or(None) - .unwrap_or(false), + // test_do_wasm: pairs + // .get("testDoWasm") + // .map(|x| x.parse::().ok()) + // .unwrap_or(None) + // .unwrap_or(false), + test_do_wasm: pairs.get("testDoWasm").map(|x| String::from(x)), merger_out_len_max: pairs .get("mergerOutLenMax") .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, @@ -281,8 +285,8 @@ impl AppendToUrl for PlainEventsQuery { if self.do_test_stream_error { g.append_pair("doTestStreamError", "true"); } - if self.test_do_wasm { - g.append_pair("testDoWasm", "true"); + if let Some(x) = &self.test_do_wasm { + g.append_pair("testDoWasm", &x); } if let Some(x) = self.merger_out_len_max.as_ref() { g.append_pair("mergerOutLenMax", &format!("{}", x)); @@ -298,6 +302,7 @@ pub struct EventsSubQuerySelect { ch_conf: ChannelTypeConfigGen, range: SeriesRange, transform: TransformQuery, + wasm1: Option, } impl EventsSubQuerySelect { @@ -306,8 +311,20 @@ impl EventsSubQuerySelect { ch_conf: ch_info, range, transform, + wasm1: None, } } + + pub fn wasm1(&self) -> Option<&str> { + match &self.wasm1 { + Some(x) => Some(&x), + None => None, + } + } + + pub fn set_wasm1(&mut self, x: String) { + self.wasm1 = Some(x); + } } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -318,7 +335,6 @@ pub struct EventsSubQuerySettings { stream_batch_len: Option, buf_len_disk_io: Option, queue_len_disk_io: Option, - test_do_wasm: bool, create_errors: Vec, } @@ -331,7 +347,6 @@ impl Default for EventsSubQuerySettings { stream_batch_len: None, buf_len_disk_io: None, queue_len_disk_io: None, - test_do_wasm: false, create_errors: Vec::new(), } } @@ -347,7 +362,6 @@ impl From<&PlainEventsQuery> for EventsSubQuerySettings { buf_len_disk_io: value.buf_len_disk_io, // TODO add to query queue_len_disk_io: None, - test_do_wasm: value.test_do_wasm, create_errors: value.create_errors.clone(), } } @@ -364,7 +378,6 @@ impl From<&BinnedQuery> for EventsSubQuerySettings { buf_len_disk_io: None, // TODO add to query queue_len_disk_io: None, - test_do_wasm: false, create_errors: Vec::new(), } } @@ -381,7 +394,6 @@ impl From<&Api1Query> for EventsSubQuerySettings { stream_batch_len: None, buf_len_disk_io: Some(disk_io_tune.read_buffer_len), queue_len_disk_io: Some(disk_io_tune.read_queue_len), - test_do_wasm: false, create_errors: Vec::new(), } } @@ -456,10 +468,6 @@ impl EventsSubQuery { self.settings.events_max.unwrap_or(1024 * 512) } - pub fn test_do_wasm(&self) -> bool { - self.settings.test_do_wasm - } - pub fn is_event_blobs(&self) -> bool { self.select.transform.is_event_blobs() } @@ -475,6 +483,10 @@ impl EventsSubQuery { pub fn reqid(&self) -> &str { &self.reqid } + + pub fn wasm1(&self) -> Option<&str> { + self.select.wasm1() + } } #[derive(Debug, Serialize, Deserialize)] diff --git a/crates/streams/Cargo.toml b/crates/streams/Cargo.toml index b800b67..46f0f06 100644 --- a/crates/streams/Cargo.toml +++ b/crates/streams/Cargo.toml @@ -18,6 +18,7 @@ crc32fast = "1.3.2" byteorder = "1.4.3" async-channel = "1.8.0" chrono = { version = "0.4.19", features = ["serde"] } +wasmer = { version = "4.1.0", default-features = false, features = ["sys", "cranelift"] } err = { path = "../err" } netpod = { path = "../netpod" } query = { path = "../query" } diff --git a/crates/streams/src/plaineventsjson.rs b/crates/streams/src/plaineventsjson.rs index acfa000..ab84fb6 100644 --- a/crates/streams/src/plaineventsjson.rs +++ b/crates/streams/src/plaineventsjson.rs @@ -28,7 +28,10 @@ pub async fn plain_events_json( cluster: &Cluster, ) -> Result { info!("plain_events_json evquery {:?}", evq); - let select = EventsSubQuerySelect::new(ch_conf, evq.range().clone(), evq.transform().clone()); + let mut select = EventsSubQuerySelect::new(ch_conf, evq.range().clone(), evq.transform().clone()); + if let Some(x) = evq.test_do_wasm() { + select.set_wasm1(x.into()); + } let settings = EventsSubQuerySettings::from(evq); let subq = EventsSubQuery::from_parts(select, settings, reqid); // TODO remove magic constant @@ -51,15 +54,148 @@ pub async fn plain_events_json( info!("item after rangefilter: {item:?}"); item }); + let stream = stream.map(move |k| { on_sitemty_data!(k, |k| { let k: Box = Box::new(k); - trace!("got len {}", k.len()); + // trace!("got len {}", k.len()); let k = tr.0.transform(k); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) + }) + }); + + let stream = if let Some(wasmname) = evq.test_do_wasm() { + debug!("make wasm transform"); + use httpclient::url::Url; + use wasmer::Value; + use wasmer::WasmSlice; + let t = httpclient::http_get( + Url::parse(&format!("http://data-api.psi.ch/distri/{}", wasmname)).unwrap(), + "*/*", + ) + .await + .unwrap(); + let wasm = t.body; + // let wasm = include_bytes!("dummy.wasm"); + let mut store = wasmer::Store::default(); + let module = wasmer::Module::new(&store, wasm).unwrap(); + // TODO assert that memory is large enough + let memory = wasmer::Memory::new(&mut store, wasmer::MemoryType::new(10, Some(30), false)).unwrap(); + let import_object = wasmer::imports! { + "env" => { + "memory" => memory.clone(), + } + }; + let instance = wasmer::Instance::new(&mut store, &module, &import_object).unwrap(); + let get_buffer_ptr = instance.exports.get_function("get_buffer_ptr").unwrap(); + let buffer_ptr = get_buffer_ptr.call(&mut store, &[]).unwrap(); + let buffer_ptr = buffer_ptr[0].i32().unwrap(); + let stream = stream.map(move |x| { + let memory = memory.clone(); + let item = on_sitemty_data!(x, |mut evs: Box| { + let x = { + use items_0::AsAnyMut; + if true { + let r1 = evs + .as_any_mut() + .downcast_mut::>() + .is_some(); + let r2 = evs + .as_mut() + .as_any_mut() + .downcast_mut::>() + .is_some(); + let r3 = evs + .as_any_mut() + .downcast_mut::>>() + .is_some(); + let r4 = evs + .as_mut() + .as_any_mut() + .downcast_mut::>>() + .is_some(); + let r5 = evs.as_mut().as_any_mut().downcast_mut::().is_some(); + let r6 = evs.as_mut().as_any_mut().downcast_mut::>().is_some(); + debug!("wasm castings: {r1} {r2} {r3} {r4} {r5} {r6}"); + } + if let Some(evs) = evs.as_any_mut().downcast_mut::() { + match evs { + ChannelEvents::Events(evs) => { + if let Some(evs) = + evs.as_any_mut().downcast_mut::>() + { + use items_0::WithLen; + if evs.len() == 0 { + debug!("wasm empty EventsDim0"); + } else { + debug!("wasm see EventsDim0"); + let max_len_needed = 16000; + let dummy1 = instance.exports.get_function("dummy1").unwrap(); + let s = evs.values.as_mut_slices(); + for sl in [s.0, s.1] { + if sl.len() > max_len_needed as _ { + // TODO cause error + panic!(); + } + let wmemoff = buffer_ptr as u64; + let view = memory.view(&store); + // TODO is the offset bytes or elements? + let wsl = WasmSlice::::new(&view, wmemoff, sl.len() as _).unwrap(); + // debug!("wasm pages {:?} data size {:?}", view.size(), view.data_size()); + wsl.write_slice(&sl).unwrap(); + let ptr = wsl.as_ptr32(); + debug!("ptr {:?} offset {}", ptr, ptr.offset()); + let params = [Value::I32(ptr.offset() as _), Value::I32(sl.len() as _)]; + let res = dummy1.call(&mut store, ¶ms).unwrap(); + match res[0] { + Value::I32(x) => { + debug!("wasm dummy1 returned: {x:?}"); + if x != 1 { + error!("unexpected return value {res:?}"); + } + } + _ => { + error!("unexpected return type {res:?}"); + } + } + // Init the slice again because we need to drop ownership for the function call. + let view = memory.view(&store); + let wsl = WasmSlice::::new(&view, wmemoff, sl.len() as _).unwrap(); + wsl.read_slice(sl).unwrap(); + } + } + } else { + debug!("wasm not EventsDim0"); + } + } + ChannelEvents::Status(_) => {} + } + } else { + debug!("wasm not ChannelEvents"); + } + evs + }; + Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) + }); + // Box::new(item) as Box + item + }); + use futures_util::Stream; + use items_0::streamitem::Sitemty; + use std::pin::Pin; + Box::pin(stream) as Pin>> + Send>> + } else { + let stream = stream.map(|x| x); + Box::pin(stream) + }; + + let stream = stream.map(move |k| { + on_sitemty_data!(k, |k| { let k: Box = Box::new(k); Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) }) }); + //let stream = PlainEventStream::new(stream); //let stream = EventsToTimeBinnable::new(stream); //let stream = TimeBinnableToCollectable::new(stream); diff --git a/crates/streams/src/tcprawclient.rs b/crates/streams/src/tcprawclient.rs index c04effe..794bc1a 100644 --- a/crates/streams/src/tcprawclient.rs +++ b/crates/streams/src/tcprawclient.rs @@ -108,6 +108,8 @@ pub async fn x_processed_event_blobs_stream_from_node_http( Ok(Box::pin(stream)) } +// Currently used only for the python data api3 protocol endpoint. +// TODO merge with main method. pub async fn x_processed_event_blobs_stream_from_node( subq: EventsSubQuery, node: Node, diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index eed40c1..b715bda 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -44,9 +44,12 @@ async fn timebinnable_stream( reqid: String, cluster: Cluster, ) -> Result { - let select = EventsSubQuerySelect::new(ch_conf, range.clone().into(), query.transform().clone()); + let mut select = EventsSubQuerySelect::new(ch_conf, range.clone().into(), query.transform().clone()); + if let Some(wasm1) = query.test_do_wasm() { + select.set_wasm1(wasm1.into()); + } let settings = EventsSubQuerySettings::from(&query); - let subq = EventsSubQuery::from_parts(select, settings, reqid); + let subq = EventsSubQuery::from_parts(select.clone(), settings, reqid); let mut tr = build_merged_event_transform(subq.transform())?; let inps = open_event_data_streams::(subq, &cluster).await?; // TODO propagate also the max-buf-len for the first stage event reader. @@ -58,11 +61,144 @@ async fn timebinnable_stream( let stream = stream.map(move |k| { on_sitemty_data!(k, |k| { let k: Box = Box::new(k); - trace!("got len {}", k.len()); + // trace!("got len {}", k.len()); let k = tr.0.transform(k); Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) }) }); + + let stream = if let Some(wasmname) = select.wasm1() { + debug!("make wasm transform"); + use httpclient::url::Url; + use wasmer::Value; + use wasmer::WasmSlice; + let t = httpclient::http_get( + Url::parse(&format!("http://data-api.psi.ch/distri/{}", wasmname)).unwrap(), + "*/*", + ) + .await + .unwrap(); + let wasm = t.body; + // let wasm = include_bytes!("dummy.wasm"); + let mut store = wasmer::Store::default(); + let module = wasmer::Module::new(&store, wasm).unwrap(); + // TODO assert that memory is large enough + let memory = wasmer::Memory::new(&mut store, wasmer::MemoryType::new(10, Some(30), false)).unwrap(); + let import_object = wasmer::imports! { + "env" => { + "memory" => memory.clone(), + } + }; + let instance = wasmer::Instance::new(&mut store, &module, &import_object).unwrap(); + let get_buffer_ptr = instance.exports.get_function("get_buffer_ptr").unwrap(); + let buffer_ptr = get_buffer_ptr.call(&mut store, &[]).unwrap(); + let buffer_ptr = buffer_ptr[0].i32().unwrap(); + let stream = stream.map(move |x| { + let memory = memory.clone(); + let item = on_sitemty_data!(x, |mut evs: Box| { + let x = { + use items_0::AsAnyMut; + if true { + let r1 = evs + .as_any_mut() + .downcast_mut::>() + .is_some(); + let r2 = evs + .as_mut() + .as_any_mut() + .downcast_mut::>() + .is_some(); + let r3 = evs + .as_any_mut() + .downcast_mut::>>() + .is_some(); + let r4 = evs + .as_mut() + .as_any_mut() + .downcast_mut::>>() + .is_some(); + let r5 = evs.as_mut().as_any_mut().downcast_mut::().is_some(); + let r6 = evs.as_mut().as_any_mut().downcast_mut::>().is_some(); + debug!("wasm castings: {r1} {r2} {r3} {r4} {r5} {r6}"); + } + if let Some(evs) = evs.as_any_mut().downcast_mut::() { + match evs { + ChannelEvents::Events(evs) => { + if let Some(evs) = + evs.as_any_mut().downcast_mut::>() + { + use items_0::WithLen; + if evs.len() == 0 { + debug!("wasm empty EventsDim0"); + } else { + debug!("wasm see EventsDim0 len {}", evs.len()); + let max_len_needed = 16000; + let dummy1 = instance.exports.get_function("dummy1").unwrap(); + let s = evs.values.as_mut_slices(); + for sl in [s.0, s.1] { + if sl.len() > max_len_needed as _ { + // TODO cause error + panic!(); + } + let wmemoff = buffer_ptr as u64; + let view = memory.view(&store); + // TODO is the offset bytes or elements? + let wsl = WasmSlice::::new(&view, wmemoff, sl.len() as _).unwrap(); + // debug!("wasm pages {:?} data size {:?}", view.size(), view.data_size()); + wsl.write_slice(&sl).unwrap(); + let ptr = wsl.as_ptr32(); + debug!("ptr {:?} offset {}", ptr, ptr.offset()); + let params = [Value::I32(ptr.offset() as _), Value::I32(sl.len() as _)]; + let res = dummy1.call(&mut store, ¶ms).unwrap(); + match res[0] { + Value::I32(x) => { + debug!("wasm dummy1 returned: {x:?}"); + if x != 1 { + error!("unexpected return value {res:?}"); + } + } + _ => { + error!("unexpected return type {res:?}"); + } + } + // Init the slice again because we need to drop ownership for the function call. + let view = memory.view(&store); + let wsl = WasmSlice::::new(&view, wmemoff, sl.len() as _).unwrap(); + wsl.read_slice(sl).unwrap(); + } + } + } else { + debug!("wasm not EventsDim0"); + } + } + ChannelEvents::Status(_) => {} + } + } else { + debug!("wasm not ChannelEvents"); + } + evs + }; + Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) + }); + // Box::new(item) as Box + item + }); + use futures_util::Stream; + use items_0::streamitem::Sitemty; + use std::pin::Pin; + Box::pin(stream) as Pin>> + Send>> + } else { + let stream = stream.map(|x| x); + Box::pin(stream) + }; + + // let stream = stream.map(move |k| { + // on_sitemty_data!(k, |k| { + // let k: Box = Box::new(k); + // Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) + // }) + // }); + let stream = PlainEventStream::new(stream); let stream = EventsToTimeBinnable::new(stream); let stream = Box::pin(stream);