Can apply wasm to merged events, also for binned queries
This commit is contained in:
@@ -65,9 +65,9 @@ async fn go() -> Result<(), Error> {
|
|||||||
};
|
};
|
||||||
match opts.subcmd {
|
match opts.subcmd {
|
||||||
SubCmd::Retrieval(subcmd) => {
|
SubCmd::Retrieval(subcmd) => {
|
||||||
info!("daqbuffer version {} 0000", clap::crate_version!());
|
info!("daqbuffer version {}", clap::crate_version!());
|
||||||
info!("{:?}", service_version);
|
info!(" service_version {}", service_version);
|
||||||
{
|
if false {
|
||||||
#[allow(non_snake_case)]
|
#[allow(non_snake_case)]
|
||||||
let TARGET = std::env!("DAQBUF_TARGET");
|
let TARGET = std::env!("DAQBUF_TARGET");
|
||||||
#[allow(non_snake_case)]
|
#[allow(non_snake_case)]
|
||||||
|
|||||||
@@ -15,7 +15,6 @@ use self::bodystream::ToPublicResponse;
|
|||||||
use crate::bodystream::response;
|
use crate::bodystream::response;
|
||||||
use crate::err::Error;
|
use crate::err::Error;
|
||||||
use crate::gather::gather_get_json;
|
use crate::gather::gather_get_json;
|
||||||
use crate::pulsemap::UpdateTask;
|
|
||||||
use ::err::thiserror;
|
use ::err::thiserror;
|
||||||
use ::err::ThisError;
|
use ::err::ThisError;
|
||||||
use futures_util::Future;
|
use futures_util::Future;
|
||||||
@@ -31,16 +30,13 @@ use hyper::Body;
|
|||||||
use hyper::Request;
|
use hyper::Request;
|
||||||
use hyper::Response;
|
use hyper::Response;
|
||||||
use net::SocketAddr;
|
use net::SocketAddr;
|
||||||
use netpod::is_false;
|
|
||||||
use netpod::log::*;
|
use netpod::log::*;
|
||||||
use netpod::query::prebinned::PreBinnedQuery;
|
use netpod::query::prebinned::PreBinnedQuery;
|
||||||
use netpod::CmpZero;
|
|
||||||
use netpod::NodeConfigCached;
|
use netpod::NodeConfigCached;
|
||||||
use netpod::ProxyConfig;
|
use netpod::ProxyConfig;
|
||||||
use netpod::ServiceVersion;
|
use netpod::ServiceVersion;
|
||||||
use netpod::APP_JSON;
|
use netpod::APP_JSON;
|
||||||
use netpod::APP_JSON_LINES;
|
use netpod::APP_JSON_LINES;
|
||||||
use nodenet::conn::events_service;
|
|
||||||
use panic::AssertUnwindSafe;
|
use panic::AssertUnwindSafe;
|
||||||
use panic::UnwindSafe;
|
use panic::UnwindSafe;
|
||||||
use pin::Pin;
|
use pin::Pin;
|
||||||
@@ -132,7 +128,7 @@ pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion
|
|||||||
if let Some(bind) = node_config.node.prometheus_api_bind {
|
if let Some(bind) = node_config.node.prometheus_api_bind {
|
||||||
tokio::spawn(prometheus::host(bind));
|
tokio::spawn(prometheus::host(bind));
|
||||||
}
|
}
|
||||||
// let rawjh = taskrun::spawn(events_service(node_config.clone()));
|
// let rawjh = taskrun::spawn(nodenet::conn::events_service(node_config.clone()));
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen(), node_config.node.port))?;
|
let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen(), node_config.node.port))?;
|
||||||
let make_service = make_service_fn({
|
let make_service = make_service_fn({
|
||||||
@@ -883,7 +879,7 @@ impl StatusBoard {
|
|||||||
Some(e) => e.into(),
|
Some(e) => e.into(),
|
||||||
None => {
|
None => {
|
||||||
error!("can not find status id {}", status_id);
|
error!("can not find status id {}", status_id);
|
||||||
let e = ::err::Error::with_public_msg_no_trace(format!("Request status ID unknown {status_id}"));
|
let _e = ::err::Error::with_public_msg_no_trace(format!("Request status ID unknown {status_id}"));
|
||||||
StatusBoardEntryUser {
|
StatusBoardEntryUser {
|
||||||
error_count: 1,
|
error_count: 1,
|
||||||
warn_count: 0,
|
warn_count: 0,
|
||||||
|
|||||||
@@ -12,10 +12,11 @@ serde = { version = "1.0", features = ["derive"] }
|
|||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
humantime-serde = "1.1.1"
|
humantime-serde = "1.1.1"
|
||||||
async-channel = "1.8.0"
|
async-channel = "1.8.0"
|
||||||
bytes = "1.3"
|
bytes = "1.4.0"
|
||||||
chrono = { version = "0.4.19", features = ["serde"] }
|
chrono = { version = "0.4.19", features = ["serde"] }
|
||||||
futures-util = "0.3.14"
|
futures-util = "0.3.14"
|
||||||
tracing = "0.1.37"
|
tracing = "0.1.37"
|
||||||
url = "2.2"
|
url = "2.2"
|
||||||
num-traits = "0.2"
|
num-traits = "0.2.16"
|
||||||
|
hex = "0.4.3"
|
||||||
err = { path = "../err" }
|
err = { path = "../err" }
|
||||||
|
|||||||
@@ -0,0 +1,10 @@
|
|||||||
|
/// Input may also contain whitespace.
|
||||||
|
pub fn decode_hex<INP: AsRef<str>>(inp: INP) -> Result<Vec<u8>, ()> {
|
||||||
|
let a: Vec<_> = inp
|
||||||
|
.as_ref()
|
||||||
|
.bytes()
|
||||||
|
.filter(|&x| (x >= b'0' && x <= b'9') || (x >= b'a' && x <= b'f'))
|
||||||
|
.collect();
|
||||||
|
let ret = hex::decode(a).map_err(|_| ())?;
|
||||||
|
Ok(ret)
|
||||||
|
}
|
||||||
@@ -1,3 +1,4 @@
|
|||||||
|
pub mod hex;
|
||||||
pub mod histo;
|
pub mod histo;
|
||||||
pub mod query;
|
pub mod query;
|
||||||
pub mod range;
|
pub mod range;
|
||||||
|
|||||||
@@ -21,7 +21,6 @@ tracing = "0.1.25"
|
|||||||
hex = "0.4.3"
|
hex = "0.4.3"
|
||||||
scylla = "0.8.1"
|
scylla = "0.8.1"
|
||||||
tokio-postgres = "0.7.7"
|
tokio-postgres = "0.7.7"
|
||||||
wasmer = { version = "3.2.0", default-features = false, features = ["sys", "cranelift"] }
|
|
||||||
err = { path = "../err" }
|
err = { path = "../err" }
|
||||||
netpod = { path = "../netpod" }
|
netpod = { path = "../netpod" }
|
||||||
query = { path = "../query" }
|
query = { path = "../query" }
|
||||||
|
|||||||
+18
-25
@@ -12,7 +12,6 @@ use items_0::streamitem::RangeCompletableItem;
|
|||||||
use items_0::streamitem::Sitemty;
|
use items_0::streamitem::Sitemty;
|
||||||
use items_0::streamitem::StreamItem;
|
use items_0::streamitem::StreamItem;
|
||||||
use items_0::streamitem::EVENT_QUERY_JSON_STRING_FRAME;
|
use items_0::streamitem::EVENT_QUERY_JSON_STRING_FRAME;
|
||||||
use items_0::Events;
|
|
||||||
use items_2::channelevents::ChannelEvents;
|
use items_2::channelevents::ChannelEvents;
|
||||||
use items_2::empty::empty_events_dyn_ev;
|
use items_2::empty::empty_events_dyn_ev;
|
||||||
use items_2::framable::EventQueryJsonStringFrame;
|
use items_2::framable::EventQueryJsonStringFrame;
|
||||||
@@ -160,6 +159,8 @@ pub async fn create_response_bytes_stream(
|
|||||||
evq: EventsSubQuery,
|
evq: EventsSubQuery,
|
||||||
ncc: &NodeConfigCached,
|
ncc: &NodeConfigCached,
|
||||||
) -> Result<BytesStreamBox, Error> {
|
) -> Result<BytesStreamBox, Error> {
|
||||||
|
debug!("create_response_bytes_stream {:?}", evq.ch_conf().scalar_type());
|
||||||
|
debug!("wasm1 {:?}", evq.wasm1());
|
||||||
let reqctx = netpod::ReqCtx::new(evq.reqid());
|
let reqctx = netpod::ReqCtx::new(evq.reqid());
|
||||||
if evq.create_errors_contains("nodenet_parse_query") {
|
if evq.create_errors_contains("nodenet_parse_query") {
|
||||||
let e = Error::with_msg_no_trace("produced error on request nodenet_parse_query");
|
let e = Error::with_msg_no_trace("produced error on request nodenet_parse_query");
|
||||||
@@ -175,32 +176,24 @@ pub async fn create_response_bytes_stream(
|
|||||||
Ok(ret)
|
Ok(ret)
|
||||||
} else {
|
} else {
|
||||||
let stream = make_channel_events_stream(evq.clone(), reqctx, ncc).await?;
|
let stream = make_channel_events_stream(evq.clone(), reqctx, ncc).await?;
|
||||||
if false {
|
let mut tr = build_event_transform(evq.transform())?;
|
||||||
// TODO wasm example
|
|
||||||
use wasmer::Value;
|
|
||||||
let wasm = b"";
|
|
||||||
let mut store = wasmer::Store::default();
|
|
||||||
let module = wasmer::Module::new(&store, wasm).unwrap();
|
|
||||||
let import_object = wasmer::imports! {};
|
|
||||||
let instance = wasmer::Instance::new(&mut store, &module, &import_object).unwrap();
|
|
||||||
let add_one = instance.exports.get_function("event_transform").unwrap();
|
|
||||||
let result = add_one.call(&mut store, &[Value::I32(42)]).unwrap();
|
|
||||||
assert_eq!(result[0], Value::I32(43));
|
|
||||||
}
|
|
||||||
let mut tr = match build_event_transform(evq.transform()) {
|
|
||||||
Ok(x) => x,
|
|
||||||
Err(e) => {
|
|
||||||
return Err(e);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let stream = stream.map(move |x| {
|
let stream = stream.map(move |x| {
|
||||||
let item = on_sitemty_data!(x, |x| {
|
on_sitemty_data!(x, |x: ChannelEvents| {
|
||||||
let x: Box<dyn Events> = Box::new(x);
|
match x {
|
||||||
let x = tr.0.transform(x);
|
ChannelEvents::Events(evs) => {
|
||||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))
|
let evs = tr.0.transform(evs);
|
||||||
});
|
Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events(
|
||||||
Box::new(item) as Box<dyn Framable + Send>
|
evs,
|
||||||
|
))))
|
||||||
|
}
|
||||||
|
ChannelEvents::Status(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(
|
||||||
|
ChannelEvents::Status(x),
|
||||||
|
))),
|
||||||
|
}
|
||||||
|
})
|
||||||
});
|
});
|
||||||
|
// let stream = stream.map(move |x| Box::new(x) as Box<dyn Framable + Send>);
|
||||||
let stream = stream.map(|x| x.make_frame().map(|x| x.freeze()));
|
let stream = stream.map(|x| x.make_frame().map(|x| x.freeze()));
|
||||||
let ret = Box::pin(stream);
|
let ret = Box::pin(stream);
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
|
|||||||
@@ -40,6 +40,8 @@ pub struct BinnedQuery {
|
|||||||
disk_stats_every: Option<ByteSize>,
|
disk_stats_every: Option<ByteSize>,
|
||||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
pub merger_out_len_max: Option<usize>,
|
pub merger_out_len_max: Option<usize>,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
test_do_wasm: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BinnedQuery {
|
impl BinnedQuery {
|
||||||
@@ -55,6 +57,7 @@ impl BinnedQuery {
|
|||||||
disk_stats_every: None,
|
disk_stats_every: None,
|
||||||
timeout: None,
|
timeout: None,
|
||||||
merger_out_len_max: None,
|
merger_out_len_max: None,
|
||||||
|
test_do_wasm: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -140,6 +143,13 @@ impl BinnedQuery {
|
|||||||
v.transform = TransformQuery::for_time_weighted_scalar();
|
v.transform = TransformQuery::for_time_weighted_scalar();
|
||||||
v
|
v
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn test_do_wasm(&self) -> Option<&str> {
|
||||||
|
match &self.test_do_wasm {
|
||||||
|
Some(x) => Some(&x),
|
||||||
|
None => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HasBackend for BinnedQuery {
|
impl HasBackend for BinnedQuery {
|
||||||
@@ -199,6 +209,7 @@ impl FromUrl for BinnedQuery {
|
|||||||
merger_out_len_max: pairs
|
merger_out_len_max: pairs
|
||||||
.get("mergerOutLenMax")
|
.get("mergerOutLenMax")
|
||||||
.map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?,
|
.map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?,
|
||||||
|
test_do_wasm: pairs.get("testDoWasm").map(|x| String::from(x)),
|
||||||
};
|
};
|
||||||
debug!("BinnedQuery::from_url {:?}", ret);
|
debug!("BinnedQuery::from_url {:?}", ret);
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
@@ -236,5 +247,8 @@ impl AppendToUrl for BinnedQuery {
|
|||||||
if let Some(x) = self.merger_out_len_max.as_ref() {
|
if let Some(x) = self.merger_out_len_max.as_ref() {
|
||||||
g.append_pair("mergerOutLenMax", &format!("{}", x));
|
g.append_pair("mergerOutLenMax", &format!("{}", x));
|
||||||
}
|
}
|
||||||
|
if let Some(x) = &self.test_do_wasm {
|
||||||
|
g.append_pair("testDoWasm", &x);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -46,8 +46,8 @@ pub struct PlainEventsQuery {
|
|||||||
do_test_main_error: bool,
|
do_test_main_error: bool,
|
||||||
#[serde(default, skip_serializing_if = "is_false")]
|
#[serde(default, skip_serializing_if = "is_false")]
|
||||||
do_test_stream_error: bool,
|
do_test_stream_error: bool,
|
||||||
#[serde(default, skip_serializing_if = "is_false")]
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
test_do_wasm: bool,
|
test_do_wasm: Option<String>,
|
||||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
merger_out_len_max: Option<usize>,
|
merger_out_len_max: Option<usize>,
|
||||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||||
@@ -71,7 +71,7 @@ impl PlainEventsQuery {
|
|||||||
buf_len_disk_io: None,
|
buf_len_disk_io: None,
|
||||||
do_test_main_error: false,
|
do_test_main_error: false,
|
||||||
do_test_stream_error: false,
|
do_test_stream_error: false,
|
||||||
test_do_wasm: false,
|
test_do_wasm: None,
|
||||||
merger_out_len_max: None,
|
merger_out_len_max: None,
|
||||||
create_errors: Vec::new(),
|
create_errors: Vec::new(),
|
||||||
}
|
}
|
||||||
@@ -127,8 +127,11 @@ impl PlainEventsQuery {
|
|||||||
self.do_test_stream_error
|
self.do_test_stream_error
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn test_do_wasm(&self) -> bool {
|
pub fn test_do_wasm(&self) -> Option<&str> {
|
||||||
self.test_do_wasm
|
match &self.test_do_wasm {
|
||||||
|
Some(x) => Some(&x),
|
||||||
|
None => None,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_series_id(&mut self, series: u64) {
|
pub fn set_series_id(&mut self, series: u64) {
|
||||||
@@ -228,11 +231,12 @@ impl FromUrl for PlainEventsQuery {
|
|||||||
.map_or("false", |k| k)
|
.map_or("false", |k| k)
|
||||||
.parse()
|
.parse()
|
||||||
.map_err(|e| Error::with_public_msg_no_trace(format!("can not parse doTestStreamError: {}", e)))?,
|
.map_err(|e| Error::with_public_msg_no_trace(format!("can not parse doTestStreamError: {}", e)))?,
|
||||||
test_do_wasm: pairs
|
// test_do_wasm: pairs
|
||||||
.get("testDoWasm")
|
// .get("testDoWasm")
|
||||||
.map(|x| x.parse::<bool>().ok())
|
// .map(|x| x.parse::<bool>().ok())
|
||||||
.unwrap_or(None)
|
// .unwrap_or(None)
|
||||||
.unwrap_or(false),
|
// .unwrap_or(false),
|
||||||
|
test_do_wasm: pairs.get("testDoWasm").map(|x| String::from(x)),
|
||||||
merger_out_len_max: pairs
|
merger_out_len_max: pairs
|
||||||
.get("mergerOutLenMax")
|
.get("mergerOutLenMax")
|
||||||
.map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?,
|
.map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?,
|
||||||
@@ -281,8 +285,8 @@ impl AppendToUrl for PlainEventsQuery {
|
|||||||
if self.do_test_stream_error {
|
if self.do_test_stream_error {
|
||||||
g.append_pair("doTestStreamError", "true");
|
g.append_pair("doTestStreamError", "true");
|
||||||
}
|
}
|
||||||
if self.test_do_wasm {
|
if let Some(x) = &self.test_do_wasm {
|
||||||
g.append_pair("testDoWasm", "true");
|
g.append_pair("testDoWasm", &x);
|
||||||
}
|
}
|
||||||
if let Some(x) = self.merger_out_len_max.as_ref() {
|
if let Some(x) = self.merger_out_len_max.as_ref() {
|
||||||
g.append_pair("mergerOutLenMax", &format!("{}", x));
|
g.append_pair("mergerOutLenMax", &format!("{}", x));
|
||||||
@@ -298,6 +302,7 @@ pub struct EventsSubQuerySelect {
|
|||||||
ch_conf: ChannelTypeConfigGen,
|
ch_conf: ChannelTypeConfigGen,
|
||||||
range: SeriesRange,
|
range: SeriesRange,
|
||||||
transform: TransformQuery,
|
transform: TransformQuery,
|
||||||
|
wasm1: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EventsSubQuerySelect {
|
impl EventsSubQuerySelect {
|
||||||
@@ -306,8 +311,20 @@ impl EventsSubQuerySelect {
|
|||||||
ch_conf: ch_info,
|
ch_conf: ch_info,
|
||||||
range,
|
range,
|
||||||
transform,
|
transform,
|
||||||
|
wasm1: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn wasm1(&self) -> Option<&str> {
|
||||||
|
match &self.wasm1 {
|
||||||
|
Some(x) => Some(&x),
|
||||||
|
None => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_wasm1(&mut self, x: String) {
|
||||||
|
self.wasm1 = Some(x);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
@@ -318,7 +335,6 @@ pub struct EventsSubQuerySettings {
|
|||||||
stream_batch_len: Option<usize>,
|
stream_batch_len: Option<usize>,
|
||||||
buf_len_disk_io: Option<usize>,
|
buf_len_disk_io: Option<usize>,
|
||||||
queue_len_disk_io: Option<usize>,
|
queue_len_disk_io: Option<usize>,
|
||||||
test_do_wasm: bool,
|
|
||||||
create_errors: Vec<String>,
|
create_errors: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -331,7 +347,6 @@ impl Default for EventsSubQuerySettings {
|
|||||||
stream_batch_len: None,
|
stream_batch_len: None,
|
||||||
buf_len_disk_io: None,
|
buf_len_disk_io: None,
|
||||||
queue_len_disk_io: None,
|
queue_len_disk_io: None,
|
||||||
test_do_wasm: false,
|
|
||||||
create_errors: Vec::new(),
|
create_errors: Vec::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -347,7 +362,6 @@ impl From<&PlainEventsQuery> for EventsSubQuerySettings {
|
|||||||
buf_len_disk_io: value.buf_len_disk_io,
|
buf_len_disk_io: value.buf_len_disk_io,
|
||||||
// TODO add to query
|
// TODO add to query
|
||||||
queue_len_disk_io: None,
|
queue_len_disk_io: None,
|
||||||
test_do_wasm: value.test_do_wasm,
|
|
||||||
create_errors: value.create_errors.clone(),
|
create_errors: value.create_errors.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -364,7 +378,6 @@ impl From<&BinnedQuery> for EventsSubQuerySettings {
|
|||||||
buf_len_disk_io: None,
|
buf_len_disk_io: None,
|
||||||
// TODO add to query
|
// TODO add to query
|
||||||
queue_len_disk_io: None,
|
queue_len_disk_io: None,
|
||||||
test_do_wasm: false,
|
|
||||||
create_errors: Vec::new(),
|
create_errors: Vec::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -381,7 +394,6 @@ impl From<&Api1Query> for EventsSubQuerySettings {
|
|||||||
stream_batch_len: None,
|
stream_batch_len: None,
|
||||||
buf_len_disk_io: Some(disk_io_tune.read_buffer_len),
|
buf_len_disk_io: Some(disk_io_tune.read_buffer_len),
|
||||||
queue_len_disk_io: Some(disk_io_tune.read_queue_len),
|
queue_len_disk_io: Some(disk_io_tune.read_queue_len),
|
||||||
test_do_wasm: false,
|
|
||||||
create_errors: Vec::new(),
|
create_errors: Vec::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -456,10 +468,6 @@ impl EventsSubQuery {
|
|||||||
self.settings.events_max.unwrap_or(1024 * 512)
|
self.settings.events_max.unwrap_or(1024 * 512)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn test_do_wasm(&self) -> bool {
|
|
||||||
self.settings.test_do_wasm
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn is_event_blobs(&self) -> bool {
|
pub fn is_event_blobs(&self) -> bool {
|
||||||
self.select.transform.is_event_blobs()
|
self.select.transform.is_event_blobs()
|
||||||
}
|
}
|
||||||
@@ -475,6 +483,10 @@ impl EventsSubQuery {
|
|||||||
pub fn reqid(&self) -> &str {
|
pub fn reqid(&self) -> &str {
|
||||||
&self.reqid
|
&self.reqid
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn wasm1(&self) -> Option<&str> {
|
||||||
|
self.select.wasm1()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ crc32fast = "1.3.2"
|
|||||||
byteorder = "1.4.3"
|
byteorder = "1.4.3"
|
||||||
async-channel = "1.8.0"
|
async-channel = "1.8.0"
|
||||||
chrono = { version = "0.4.19", features = ["serde"] }
|
chrono = { version = "0.4.19", features = ["serde"] }
|
||||||
|
wasmer = { version = "4.1.0", default-features = false, features = ["sys", "cranelift"] }
|
||||||
err = { path = "../err" }
|
err = { path = "../err" }
|
||||||
netpod = { path = "../netpod" }
|
netpod = { path = "../netpod" }
|
||||||
query = { path = "../query" }
|
query = { path = "../query" }
|
||||||
|
|||||||
@@ -28,7 +28,10 @@ pub async fn plain_events_json(
|
|||||||
cluster: &Cluster,
|
cluster: &Cluster,
|
||||||
) -> Result<JsonValue, Error> {
|
) -> Result<JsonValue, Error> {
|
||||||
info!("plain_events_json evquery {:?}", evq);
|
info!("plain_events_json evquery {:?}", evq);
|
||||||
let select = EventsSubQuerySelect::new(ch_conf, evq.range().clone(), evq.transform().clone());
|
let mut select = EventsSubQuerySelect::new(ch_conf, evq.range().clone(), evq.transform().clone());
|
||||||
|
if let Some(x) = evq.test_do_wasm() {
|
||||||
|
select.set_wasm1(x.into());
|
||||||
|
}
|
||||||
let settings = EventsSubQuerySettings::from(evq);
|
let settings = EventsSubQuerySettings::from(evq);
|
||||||
let subq = EventsSubQuery::from_parts(select, settings, reqid);
|
let subq = EventsSubQuery::from_parts(select, settings, reqid);
|
||||||
// TODO remove magic constant
|
// TODO remove magic constant
|
||||||
@@ -51,15 +54,148 @@ pub async fn plain_events_json(
|
|||||||
info!("item after rangefilter: {item:?}");
|
info!("item after rangefilter: {item:?}");
|
||||||
item
|
item
|
||||||
});
|
});
|
||||||
|
|
||||||
let stream = stream.map(move |k| {
|
let stream = stream.map(move |k| {
|
||||||
on_sitemty_data!(k, |k| {
|
on_sitemty_data!(k, |k| {
|
||||||
let k: Box<dyn Events> = Box::new(k);
|
let k: Box<dyn Events> = Box::new(k);
|
||||||
trace!("got len {}", k.len());
|
// trace!("got len {}", k.len());
|
||||||
let k = tr.0.transform(k);
|
let k = tr.0.transform(k);
|
||||||
|
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
let stream = if let Some(wasmname) = evq.test_do_wasm() {
|
||||||
|
debug!("make wasm transform");
|
||||||
|
use httpclient::url::Url;
|
||||||
|
use wasmer::Value;
|
||||||
|
use wasmer::WasmSlice;
|
||||||
|
let t = httpclient::http_get(
|
||||||
|
Url::parse(&format!("http://data-api.psi.ch/distri/{}", wasmname)).unwrap(),
|
||||||
|
"*/*",
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let wasm = t.body;
|
||||||
|
// let wasm = include_bytes!("dummy.wasm");
|
||||||
|
let mut store = wasmer::Store::default();
|
||||||
|
let module = wasmer::Module::new(&store, wasm).unwrap();
|
||||||
|
// TODO assert that memory is large enough
|
||||||
|
let memory = wasmer::Memory::new(&mut store, wasmer::MemoryType::new(10, Some(30), false)).unwrap();
|
||||||
|
let import_object = wasmer::imports! {
|
||||||
|
"env" => {
|
||||||
|
"memory" => memory.clone(),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let instance = wasmer::Instance::new(&mut store, &module, &import_object).unwrap();
|
||||||
|
let get_buffer_ptr = instance.exports.get_function("get_buffer_ptr").unwrap();
|
||||||
|
let buffer_ptr = get_buffer_ptr.call(&mut store, &[]).unwrap();
|
||||||
|
let buffer_ptr = buffer_ptr[0].i32().unwrap();
|
||||||
|
let stream = stream.map(move |x| {
|
||||||
|
let memory = memory.clone();
|
||||||
|
let item = on_sitemty_data!(x, |mut evs: Box<dyn Events>| {
|
||||||
|
let x = {
|
||||||
|
use items_0::AsAnyMut;
|
||||||
|
if true {
|
||||||
|
let r1 = evs
|
||||||
|
.as_any_mut()
|
||||||
|
.downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
|
||||||
|
.is_some();
|
||||||
|
let r2 = evs
|
||||||
|
.as_mut()
|
||||||
|
.as_any_mut()
|
||||||
|
.downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
|
||||||
|
.is_some();
|
||||||
|
let r3 = evs
|
||||||
|
.as_any_mut()
|
||||||
|
.downcast_mut::<Box<items_2::eventsdim0::EventsDim0<f64>>>()
|
||||||
|
.is_some();
|
||||||
|
let r4 = evs
|
||||||
|
.as_mut()
|
||||||
|
.as_any_mut()
|
||||||
|
.downcast_mut::<Box<items_2::eventsdim0::EventsDim0<f64>>>()
|
||||||
|
.is_some();
|
||||||
|
let r5 = evs.as_mut().as_any_mut().downcast_mut::<ChannelEvents>().is_some();
|
||||||
|
let r6 = evs.as_mut().as_any_mut().downcast_mut::<Box<ChannelEvents>>().is_some();
|
||||||
|
debug!("wasm castings: {r1} {r2} {r3} {r4} {r5} {r6}");
|
||||||
|
}
|
||||||
|
if let Some(evs) = evs.as_any_mut().downcast_mut::<ChannelEvents>() {
|
||||||
|
match evs {
|
||||||
|
ChannelEvents::Events(evs) => {
|
||||||
|
if let Some(evs) =
|
||||||
|
evs.as_any_mut().downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
|
||||||
|
{
|
||||||
|
use items_0::WithLen;
|
||||||
|
if evs.len() == 0 {
|
||||||
|
debug!("wasm empty EventsDim0<f64>");
|
||||||
|
} else {
|
||||||
|
debug!("wasm see EventsDim0<f64>");
|
||||||
|
let max_len_needed = 16000;
|
||||||
|
let dummy1 = instance.exports.get_function("dummy1").unwrap();
|
||||||
|
let s = evs.values.as_mut_slices();
|
||||||
|
for sl in [s.0, s.1] {
|
||||||
|
if sl.len() > max_len_needed as _ {
|
||||||
|
// TODO cause error
|
||||||
|
panic!();
|
||||||
|
}
|
||||||
|
let wmemoff = buffer_ptr as u64;
|
||||||
|
let view = memory.view(&store);
|
||||||
|
// TODO is the offset bytes or elements?
|
||||||
|
let wsl = WasmSlice::<f64>::new(&view, wmemoff, sl.len() as _).unwrap();
|
||||||
|
// debug!("wasm pages {:?} data size {:?}", view.size(), view.data_size());
|
||||||
|
wsl.write_slice(&sl).unwrap();
|
||||||
|
let ptr = wsl.as_ptr32();
|
||||||
|
debug!("ptr {:?} offset {}", ptr, ptr.offset());
|
||||||
|
let params = [Value::I32(ptr.offset() as _), Value::I32(sl.len() as _)];
|
||||||
|
let res = dummy1.call(&mut store, ¶ms).unwrap();
|
||||||
|
match res[0] {
|
||||||
|
Value::I32(x) => {
|
||||||
|
debug!("wasm dummy1 returned: {x:?}");
|
||||||
|
if x != 1 {
|
||||||
|
error!("unexpected return value {res:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
error!("unexpected return type {res:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Init the slice again because we need to drop ownership for the function call.
|
||||||
|
let view = memory.view(&store);
|
||||||
|
let wsl = WasmSlice::<f64>::new(&view, wmemoff, sl.len() as _).unwrap();
|
||||||
|
wsl.read_slice(sl).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
debug!("wasm not EventsDim0<f64>");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ChannelEvents::Status(_) => {}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
debug!("wasm not ChannelEvents");
|
||||||
|
}
|
||||||
|
evs
|
||||||
|
};
|
||||||
|
Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))
|
||||||
|
});
|
||||||
|
// Box::new(item) as Box<dyn Framable + Send>
|
||||||
|
item
|
||||||
|
});
|
||||||
|
use futures_util::Stream;
|
||||||
|
use items_0::streamitem::Sitemty;
|
||||||
|
use std::pin::Pin;
|
||||||
|
Box::pin(stream) as Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Events>>> + Send>>
|
||||||
|
} else {
|
||||||
|
let stream = stream.map(|x| x);
|
||||||
|
Box::pin(stream)
|
||||||
|
};
|
||||||
|
|
||||||
|
let stream = stream.map(move |k| {
|
||||||
|
on_sitemty_data!(k, |k| {
|
||||||
let k: Box<dyn Collectable> = Box::new(k);
|
let k: Box<dyn Collectable> = Box::new(k);
|
||||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
|
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
//let stream = PlainEventStream::new(stream);
|
//let stream = PlainEventStream::new(stream);
|
||||||
//let stream = EventsToTimeBinnable::new(stream);
|
//let stream = EventsToTimeBinnable::new(stream);
|
||||||
//let stream = TimeBinnableToCollectable::new(stream);
|
//let stream = TimeBinnableToCollectable::new(stream);
|
||||||
|
|||||||
@@ -108,6 +108,8 @@ pub async fn x_processed_event_blobs_stream_from_node_http(
|
|||||||
Ok(Box::pin(stream))
|
Ok(Box::pin(stream))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Currently used only for the python data api3 protocol endpoint.
|
||||||
|
// TODO merge with main method.
|
||||||
pub async fn x_processed_event_blobs_stream_from_node(
|
pub async fn x_processed_event_blobs_stream_from_node(
|
||||||
subq: EventsSubQuery,
|
subq: EventsSubQuery,
|
||||||
node: Node,
|
node: Node,
|
||||||
|
|||||||
@@ -44,9 +44,12 @@ async fn timebinnable_stream(
|
|||||||
reqid: String,
|
reqid: String,
|
||||||
cluster: Cluster,
|
cluster: Cluster,
|
||||||
) -> Result<TimeBinnableStreamBox, Error> {
|
) -> Result<TimeBinnableStreamBox, Error> {
|
||||||
let select = EventsSubQuerySelect::new(ch_conf, range.clone().into(), query.transform().clone());
|
let mut select = EventsSubQuerySelect::new(ch_conf, range.clone().into(), query.transform().clone());
|
||||||
|
if let Some(wasm1) = query.test_do_wasm() {
|
||||||
|
select.set_wasm1(wasm1.into());
|
||||||
|
}
|
||||||
let settings = EventsSubQuerySettings::from(&query);
|
let settings = EventsSubQuerySettings::from(&query);
|
||||||
let subq = EventsSubQuery::from_parts(select, settings, reqid);
|
let subq = EventsSubQuery::from_parts(select.clone(), settings, reqid);
|
||||||
let mut tr = build_merged_event_transform(subq.transform())?;
|
let mut tr = build_merged_event_transform(subq.transform())?;
|
||||||
let inps = open_event_data_streams::<ChannelEvents>(subq, &cluster).await?;
|
let inps = open_event_data_streams::<ChannelEvents>(subq, &cluster).await?;
|
||||||
// TODO propagate also the max-buf-len for the first stage event reader.
|
// TODO propagate also the max-buf-len for the first stage event reader.
|
||||||
@@ -58,11 +61,144 @@ async fn timebinnable_stream(
|
|||||||
let stream = stream.map(move |k| {
|
let stream = stream.map(move |k| {
|
||||||
on_sitemty_data!(k, |k| {
|
on_sitemty_data!(k, |k| {
|
||||||
let k: Box<dyn Events> = Box::new(k);
|
let k: Box<dyn Events> = Box::new(k);
|
||||||
trace!("got len {}", k.len());
|
// trace!("got len {}", k.len());
|
||||||
let k = tr.0.transform(k);
|
let k = tr.0.transform(k);
|
||||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
|
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let stream = if let Some(wasmname) = select.wasm1() {
|
||||||
|
debug!("make wasm transform");
|
||||||
|
use httpclient::url::Url;
|
||||||
|
use wasmer::Value;
|
||||||
|
use wasmer::WasmSlice;
|
||||||
|
let t = httpclient::http_get(
|
||||||
|
Url::parse(&format!("http://data-api.psi.ch/distri/{}", wasmname)).unwrap(),
|
||||||
|
"*/*",
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let wasm = t.body;
|
||||||
|
// let wasm = include_bytes!("dummy.wasm");
|
||||||
|
let mut store = wasmer::Store::default();
|
||||||
|
let module = wasmer::Module::new(&store, wasm).unwrap();
|
||||||
|
// TODO assert that memory is large enough
|
||||||
|
let memory = wasmer::Memory::new(&mut store, wasmer::MemoryType::new(10, Some(30), false)).unwrap();
|
||||||
|
let import_object = wasmer::imports! {
|
||||||
|
"env" => {
|
||||||
|
"memory" => memory.clone(),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let instance = wasmer::Instance::new(&mut store, &module, &import_object).unwrap();
|
||||||
|
let get_buffer_ptr = instance.exports.get_function("get_buffer_ptr").unwrap();
|
||||||
|
let buffer_ptr = get_buffer_ptr.call(&mut store, &[]).unwrap();
|
||||||
|
let buffer_ptr = buffer_ptr[0].i32().unwrap();
|
||||||
|
let stream = stream.map(move |x| {
|
||||||
|
let memory = memory.clone();
|
||||||
|
let item = on_sitemty_data!(x, |mut evs: Box<dyn Events>| {
|
||||||
|
let x = {
|
||||||
|
use items_0::AsAnyMut;
|
||||||
|
if true {
|
||||||
|
let r1 = evs
|
||||||
|
.as_any_mut()
|
||||||
|
.downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
|
||||||
|
.is_some();
|
||||||
|
let r2 = evs
|
||||||
|
.as_mut()
|
||||||
|
.as_any_mut()
|
||||||
|
.downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
|
||||||
|
.is_some();
|
||||||
|
let r3 = evs
|
||||||
|
.as_any_mut()
|
||||||
|
.downcast_mut::<Box<items_2::eventsdim0::EventsDim0<f64>>>()
|
||||||
|
.is_some();
|
||||||
|
let r4 = evs
|
||||||
|
.as_mut()
|
||||||
|
.as_any_mut()
|
||||||
|
.downcast_mut::<Box<items_2::eventsdim0::EventsDim0<f64>>>()
|
||||||
|
.is_some();
|
||||||
|
let r5 = evs.as_mut().as_any_mut().downcast_mut::<ChannelEvents>().is_some();
|
||||||
|
let r6 = evs.as_mut().as_any_mut().downcast_mut::<Box<ChannelEvents>>().is_some();
|
||||||
|
debug!("wasm castings: {r1} {r2} {r3} {r4} {r5} {r6}");
|
||||||
|
}
|
||||||
|
if let Some(evs) = evs.as_any_mut().downcast_mut::<ChannelEvents>() {
|
||||||
|
match evs {
|
||||||
|
ChannelEvents::Events(evs) => {
|
||||||
|
if let Some(evs) =
|
||||||
|
evs.as_any_mut().downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
|
||||||
|
{
|
||||||
|
use items_0::WithLen;
|
||||||
|
if evs.len() == 0 {
|
||||||
|
debug!("wasm empty EventsDim0<f64>");
|
||||||
|
} else {
|
||||||
|
debug!("wasm see EventsDim0<f64> len {}", evs.len());
|
||||||
|
let max_len_needed = 16000;
|
||||||
|
let dummy1 = instance.exports.get_function("dummy1").unwrap();
|
||||||
|
let s = evs.values.as_mut_slices();
|
||||||
|
for sl in [s.0, s.1] {
|
||||||
|
if sl.len() > max_len_needed as _ {
|
||||||
|
// TODO cause error
|
||||||
|
panic!();
|
||||||
|
}
|
||||||
|
let wmemoff = buffer_ptr as u64;
|
||||||
|
let view = memory.view(&store);
|
||||||
|
// TODO is the offset bytes or elements?
|
||||||
|
let wsl = WasmSlice::<f64>::new(&view, wmemoff, sl.len() as _).unwrap();
|
||||||
|
// debug!("wasm pages {:?} data size {:?}", view.size(), view.data_size());
|
||||||
|
wsl.write_slice(&sl).unwrap();
|
||||||
|
let ptr = wsl.as_ptr32();
|
||||||
|
debug!("ptr {:?} offset {}", ptr, ptr.offset());
|
||||||
|
let params = [Value::I32(ptr.offset() as _), Value::I32(sl.len() as _)];
|
||||||
|
let res = dummy1.call(&mut store, ¶ms).unwrap();
|
||||||
|
match res[0] {
|
||||||
|
Value::I32(x) => {
|
||||||
|
debug!("wasm dummy1 returned: {x:?}");
|
||||||
|
if x != 1 {
|
||||||
|
error!("unexpected return value {res:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
error!("unexpected return type {res:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Init the slice again because we need to drop ownership for the function call.
|
||||||
|
let view = memory.view(&store);
|
||||||
|
let wsl = WasmSlice::<f64>::new(&view, wmemoff, sl.len() as _).unwrap();
|
||||||
|
wsl.read_slice(sl).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
debug!("wasm not EventsDim0<f64>");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ChannelEvents::Status(_) => {}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
debug!("wasm not ChannelEvents");
|
||||||
|
}
|
||||||
|
evs
|
||||||
|
};
|
||||||
|
Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))
|
||||||
|
});
|
||||||
|
// Box::new(item) as Box<dyn Framable + Send>
|
||||||
|
item
|
||||||
|
});
|
||||||
|
use futures_util::Stream;
|
||||||
|
use items_0::streamitem::Sitemty;
|
||||||
|
use std::pin::Pin;
|
||||||
|
Box::pin(stream) as Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Events>>> + Send>>
|
||||||
|
} else {
|
||||||
|
let stream = stream.map(|x| x);
|
||||||
|
Box::pin(stream)
|
||||||
|
};
|
||||||
|
|
||||||
|
// let stream = stream.map(move |k| {
|
||||||
|
// on_sitemty_data!(k, |k| {
|
||||||
|
// let k: Box<dyn Collectable> = Box::new(k);
|
||||||
|
// Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
|
||||||
|
// })
|
||||||
|
// });
|
||||||
|
|
||||||
let stream = PlainEventStream::new(stream);
|
let stream = PlainEventStream::new(stream);
|
||||||
let stream = EventsToTimeBinnable::new(stream);
|
let stream = EventsToTimeBinnable::new(stream);
|
||||||
let stream = Box::pin(stream);
|
let stream = Box::pin(stream);
|
||||||
|
|||||||
Reference in New Issue
Block a user