CBOR chunked download
This commit is contained in:
@@ -11,7 +11,7 @@ pin-project = "1.0.12"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
serde_cbor = "0.11.1"
|
||||
erased-serde = "0.3.23"
|
||||
ciborium = "0.2.1"
|
||||
bytes = "1.3"
|
||||
arrayref = "0.3.6"
|
||||
crc32fast = "1.3.2"
|
||||
|
||||
@@ -2,6 +2,7 @@ use err::Error;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::framable::FrameTypeInnerStatic;
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_0::streamitem::StreamItem;
|
||||
use items_2::frame::decode_frame;
|
||||
@@ -64,11 +65,20 @@ where
|
||||
StreamItem::DataItem(frame) => match decode_frame::<Sitemty<O>>(&frame) {
|
||||
Ok(item) => match item {
|
||||
Ok(item) => match item {
|
||||
StreamItem::DataItem(item2) => match item2 {
|
||||
RangeCompletableItem::Data(item3) => {
|
||||
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item3)))))
|
||||
}
|
||||
RangeCompletableItem::RangeComplete => {
|
||||
debug!("EventsFromFrames RangeComplete");
|
||||
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
|
||||
}
|
||||
},
|
||||
StreamItem::Log(k) => {
|
||||
//info!("rcvd log: {} {:?} {}", k.node_ix, k.level, k.msg);
|
||||
Ready(Some(Ok(StreamItem::Log(k))))
|
||||
}
|
||||
item => Ready(Some(Ok(item))),
|
||||
StreamItem::Stats(k) => Ready(Some(Ok(StreamItem::Stats(k)))),
|
||||
},
|
||||
Err(e) => {
|
||||
error!("rcvd err: {}", e);
|
||||
|
||||
@@ -6,6 +6,7 @@ pub mod frames;
|
||||
pub mod generators;
|
||||
pub mod itemclone;
|
||||
pub mod needminbuffer;
|
||||
pub mod plaineventscbor;
|
||||
pub mod plaineventsjson;
|
||||
pub mod print_on_done;
|
||||
pub mod rangefilter2;
|
||||
|
||||
114
crates/streams/src/plaineventscbor.rs
Normal file
114
crates/streams/src/plaineventscbor.rs
Normal file
@@ -0,0 +1,114 @@
|
||||
use crate::plaineventsjson::dyn_events_stream;
|
||||
use bytes::Bytes;
|
||||
use err::Error;
|
||||
use futures_util::future;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::streamitem::LogItem;
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
use items_0::streamitem::StreamItem;
|
||||
use netpod::log::Level;
|
||||
use netpod::log::*;
|
||||
use netpod::ChannelTypeConfigGen;
|
||||
use netpod::NodeConfigCached;
|
||||
use netpod::ReqCtx;
|
||||
use query::api4::events::PlainEventsQuery;
|
||||
use std::pin::Pin;
|
||||
|
||||
pub struct CborBytes(Bytes);
|
||||
|
||||
impl CborBytes {
|
||||
pub fn into_inner(self) -> Bytes {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
pub type CborStream = Pin<Box<dyn Stream<Item = Result<CborBytes, Error>> + Send>>;
|
||||
|
||||
pub async fn plain_events_cbor(
|
||||
evq: &PlainEventsQuery,
|
||||
ch_conf: ChannelTypeConfigGen,
|
||||
ctx: &ReqCtx,
|
||||
ncc: &NodeConfigCached,
|
||||
) -> Result<CborStream, Error> {
|
||||
let stream = dyn_events_stream(evq, ch_conf, ctx, &ncc.node_config.cluster).await?;
|
||||
let stream = stream
|
||||
.map(|x| match x {
|
||||
Ok(x) => match x {
|
||||
StreamItem::DataItem(x) => match x {
|
||||
RangeCompletableItem::Data(evs) => {
|
||||
if false {
|
||||
use items_0::AsAnyRef;
|
||||
// TODO impl generically on EventsDim0 ?
|
||||
if let Some(evs) = evs.as_any_ref().downcast_ref::<items_2::eventsdim0::EventsDim0<f64>>() {
|
||||
let mut buf = Vec::new();
|
||||
ciborium::into_writer(evs, &mut buf)
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e}")))?;
|
||||
let bytes = Bytes::from(buf);
|
||||
let item = CborBytes(bytes);
|
||||
// Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
|
||||
} else {
|
||||
let item = LogItem::from_node(0, Level::DEBUG, format!("cbor stream discarded item"));
|
||||
// Ok(StreamItem::Log(item))
|
||||
};
|
||||
}
|
||||
let buf = evs.to_cbor_vec_u8();
|
||||
let bytes = Bytes::from(buf);
|
||||
let item = CborBytes(bytes);
|
||||
Ok(item)
|
||||
}
|
||||
RangeCompletableItem::RangeComplete => {
|
||||
use ciborium::cbor;
|
||||
let item = cbor!({
|
||||
"rangeFinal" => true,
|
||||
})
|
||||
.map_err(Error::from_string)?;
|
||||
let mut buf = Vec::with_capacity(64);
|
||||
ciborium::into_writer(&item, &mut buf).map_err(Error::from_string)?;
|
||||
let bytes = Bytes::from(buf);
|
||||
let item = CborBytes(bytes);
|
||||
Ok(item)
|
||||
}
|
||||
},
|
||||
StreamItem::Log(item) => {
|
||||
info!("{item:?}");
|
||||
let item = CborBytes(Bytes::new());
|
||||
Ok(item)
|
||||
}
|
||||
StreamItem::Stats(item) => {
|
||||
info!("{item:?}");
|
||||
let item = CborBytes(Bytes::new());
|
||||
Ok(item)
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
use ciborium::cbor;
|
||||
let item = cbor!({
|
||||
"error" => e.to_string(),
|
||||
})
|
||||
.map_err(Error::from_string)?;
|
||||
let mut buf = Vec::with_capacity(64);
|
||||
ciborium::into_writer(&item, &mut buf).map_err(Error::from_string)?;
|
||||
let bytes = Bytes::from(buf);
|
||||
let item = CborBytes(bytes);
|
||||
Ok(item)
|
||||
}
|
||||
})
|
||||
.filter(|x| {
|
||||
future::ready(match x {
|
||||
Ok(x) => x.0.len() > 0,
|
||||
Err(_) => true,
|
||||
})
|
||||
})
|
||||
.take_while({
|
||||
let mut state = true;
|
||||
move |x| {
|
||||
let ret = state;
|
||||
if x.is_err() {
|
||||
state = false;
|
||||
}
|
||||
future::ready(ret)
|
||||
}
|
||||
});
|
||||
Ok(Box::pin(stream))
|
||||
}
|
||||
@@ -4,9 +4,13 @@ use crate::transform::build_merged_event_transform;
|
||||
use crate::transform::EventsToTimeBinnable;
|
||||
use crate::transform::TimeBinnableToCollectable;
|
||||
use err::Error;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::collect_s::Collectable;
|
||||
use items_0::on_sitemty_data;
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_0::streamitem::StreamItem;
|
||||
use items_0::Events;
|
||||
use items_2::channelevents::ChannelEvents;
|
||||
use items_2::merger::Merger;
|
||||
@@ -20,6 +24,7 @@ use query::api4::events::EventsSubQuerySelect;
|
||||
use query::api4::events::EventsSubQuerySettings;
|
||||
use query::api4::events::PlainEventsQuery;
|
||||
use serde_json::Value as JsonValue;
|
||||
use std::pin::Pin;
|
||||
use std::time::Instant;
|
||||
|
||||
pub async fn plain_events_json(
|
||||
@@ -29,14 +34,40 @@ pub async fn plain_events_json(
|
||||
cluster: &Cluster,
|
||||
) -> Result<JsonValue, Error> {
|
||||
info!("plain_events_json evquery {:?}", evq);
|
||||
let deadline = Instant::now() + evq.timeout();
|
||||
|
||||
let stream = dyn_events_stream(evq, ch_conf, ctx, cluster).await?;
|
||||
|
||||
let stream = stream.map(move |k| {
|
||||
on_sitemty_data!(k, |k| {
|
||||
let k: Box<dyn Collectable> = Box::new(k);
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
|
||||
})
|
||||
});
|
||||
|
||||
//let stream = PlainEventStream::new(stream);
|
||||
//let stream = EventsToTimeBinnable::new(stream);
|
||||
//let stream = TimeBinnableToCollectable::new(stream);
|
||||
let stream = Box::pin(stream);
|
||||
let collected = Collect::new(stream, deadline, evq.events_max(), Some(evq.range().clone()), None).await?;
|
||||
let jsval = serde_json::to_value(&collected)?;
|
||||
Ok(jsval)
|
||||
}
|
||||
|
||||
pub type DynEventsStream = Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Events>>> + Send>>;
|
||||
|
||||
pub async fn dyn_events_stream(
|
||||
evq: &PlainEventsQuery,
|
||||
ch_conf: ChannelTypeConfigGen,
|
||||
ctx: &ReqCtx,
|
||||
cluster: &Cluster,
|
||||
) -> Result<DynEventsStream, Error> {
|
||||
let mut select = EventsSubQuerySelect::new(ch_conf, evq.range().clone(), evq.transform().clone());
|
||||
if let Some(x) = evq.test_do_wasm() {
|
||||
select.set_wasm1(x.into());
|
||||
}
|
||||
let settings = EventsSubQuerySettings::from(evq);
|
||||
let subq = EventsSubQuery::from_parts(select, settings, ctx.reqid().into());
|
||||
// TODO remove magic constant
|
||||
let deadline = Instant::now() + evq.timeout();
|
||||
let mut tr = build_merged_event_transform(evq.transform())?;
|
||||
// TODO make sure the empty container arrives over the network.
|
||||
let inps = open_event_data_streams::<ChannelEvents>(subq, ctx, cluster).await?;
|
||||
@@ -65,144 +96,137 @@ pub async fn plain_events_json(
|
||||
})
|
||||
});
|
||||
|
||||
let stream = if let Some(wasmname) = evq.test_do_wasm() {
|
||||
debug!("make wasm transform");
|
||||
use httpclient::url::Url;
|
||||
use wasmer::Value;
|
||||
use wasmer::WasmSlice;
|
||||
let t = httpclient::http_get(
|
||||
Url::parse(&format!("http://data-api.psi.ch/distri/{}", wasmname)).unwrap(),
|
||||
"*/*",
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let wasm = t.body;
|
||||
// let wasm = include_bytes!("dummy.wasm");
|
||||
let mut store = wasmer::Store::default();
|
||||
let module = wasmer::Module::new(&store, wasm).unwrap();
|
||||
// TODO assert that memory is large enough
|
||||
let memory = wasmer::Memory::new(&mut store, wasmer::MemoryType::new(10, Some(30), false)).unwrap();
|
||||
let import_object = wasmer::imports! {
|
||||
"env" => {
|
||||
"memory" => memory.clone(),
|
||||
}
|
||||
};
|
||||
let instance = wasmer::Instance::new(&mut store, &module, &import_object).unwrap();
|
||||
let get_buffer_ptr = instance.exports.get_function("get_buffer_ptr").unwrap();
|
||||
let buffer_ptr = get_buffer_ptr.call(&mut store, &[]).unwrap();
|
||||
let buffer_ptr = buffer_ptr[0].i32().unwrap();
|
||||
let stream = stream.map(move |x| {
|
||||
let memory = memory.clone();
|
||||
let item = on_sitemty_data!(x, |mut evs: Box<dyn Events>| {
|
||||
let x = {
|
||||
use items_0::AsAnyMut;
|
||||
if true {
|
||||
let r1 = evs
|
||||
.as_any_mut()
|
||||
.downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
|
||||
.is_some();
|
||||
let r2 = evs
|
||||
.as_mut()
|
||||
.as_any_mut()
|
||||
.downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
|
||||
.is_some();
|
||||
let r3 = evs
|
||||
.as_any_mut()
|
||||
.downcast_mut::<Box<items_2::eventsdim0::EventsDim0<f64>>>()
|
||||
.is_some();
|
||||
let r4 = evs
|
||||
.as_mut()
|
||||
.as_any_mut()
|
||||
.downcast_mut::<Box<items_2::eventsdim0::EventsDim0<f64>>>()
|
||||
.is_some();
|
||||
let r5 = evs.as_mut().as_any_mut().downcast_mut::<ChannelEvents>().is_some();
|
||||
let r6 = evs.as_mut().as_any_mut().downcast_mut::<Box<ChannelEvents>>().is_some();
|
||||
debug!("wasm castings: {r1} {r2} {r3} {r4} {r5} {r6}");
|
||||
}
|
||||
if let Some(evs) = evs.as_any_mut().downcast_mut::<ChannelEvents>() {
|
||||
match evs {
|
||||
ChannelEvents::Events(evs) => {
|
||||
if let Some(evs) =
|
||||
evs.as_any_mut().downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
|
||||
{
|
||||
use items_0::WithLen;
|
||||
if evs.len() == 0 {
|
||||
debug!("wasm empty EventsDim0<f64>");
|
||||
} else {
|
||||
debug!("wasm see EventsDim0<f64>");
|
||||
let max_len_needed = 16000;
|
||||
let dummy1 = instance.exports.get_function("dummy1").unwrap();
|
||||
let s = evs.values.as_mut_slices();
|
||||
for sl in [s.0, s.1] {
|
||||
if sl.len() > max_len_needed as _ {
|
||||
// TODO cause error
|
||||
panic!();
|
||||
}
|
||||
let wmemoff = buffer_ptr as u64;
|
||||
let view = memory.view(&store);
|
||||
// TODO is the offset bytes or elements?
|
||||
let wsl = WasmSlice::<f64>::new(&view, wmemoff, sl.len() as _).unwrap();
|
||||
// debug!("wasm pages {:?} data size {:?}", view.size(), view.data_size());
|
||||
wsl.write_slice(&sl).unwrap();
|
||||
let ptr = wsl.as_ptr32();
|
||||
debug!("ptr {:?} offset {}", ptr, ptr.offset());
|
||||
let params = [Value::I32(ptr.offset() as _), Value::I32(sl.len() as _)];
|
||||
let res = dummy1.call(&mut store, ¶ms).unwrap();
|
||||
match res[0] {
|
||||
Value::I32(x) => {
|
||||
debug!("wasm dummy1 returned: {x:?}");
|
||||
if x != 1 {
|
||||
error!("unexpected return value {res:?}");
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
error!("unexpected return type {res:?}");
|
||||
}
|
||||
}
|
||||
// Init the slice again because we need to drop ownership for the function call.
|
||||
let view = memory.view(&store);
|
||||
let wsl = WasmSlice::<f64>::new(&view, wmemoff, sl.len() as _).unwrap();
|
||||
wsl.read_slice(sl).unwrap();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
debug!("wasm not EventsDim0<f64>");
|
||||
}
|
||||
}
|
||||
ChannelEvents::Status(_) => {}
|
||||
}
|
||||
} else {
|
||||
debug!("wasm not ChannelEvents");
|
||||
}
|
||||
evs
|
||||
};
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))
|
||||
});
|
||||
// Box::new(item) as Box<dyn Framable + Send>
|
||||
item
|
||||
});
|
||||
use futures_util::Stream;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use std::pin::Pin;
|
||||
Box::pin(stream) as Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Events>>> + Send>>
|
||||
if let Some(wasmname) = evq.test_do_wasm() {
|
||||
let stream = transform_wasm(stream, wasmname, ctx).await?;
|
||||
Ok(Box::pin(stream))
|
||||
} else {
|
||||
let stream = stream.map(|x| x);
|
||||
Box::pin(stream)
|
||||
};
|
||||
|
||||
let stream = stream.map(move |k| {
|
||||
on_sitemty_data!(k, |k| {
|
||||
let k: Box<dyn Collectable> = Box::new(k);
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
|
||||
})
|
||||
});
|
||||
|
||||
//let stream = PlainEventStream::new(stream);
|
||||
//let stream = EventsToTimeBinnable::new(stream);
|
||||
//let stream = TimeBinnableToCollectable::new(stream);
|
||||
let stream = Box::pin(stream);
|
||||
let collected = Collect::new(stream, deadline, evq.events_max(), Some(evq.range().clone()), None).await?;
|
||||
let jsval = serde_json::to_value(&collected)?;
|
||||
Ok(jsval)
|
||||
// let stream = stream.map(|x| x);
|
||||
Ok(Box::pin(stream))
|
||||
}
|
||||
}
|
||||
|
||||
async fn transform_wasm<INP>(
|
||||
stream: INP,
|
||||
wasmname: &str,
|
||||
ctx: &ReqCtx,
|
||||
) -> Result<impl Stream<Item = Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, Error>> + Send, Error>
|
||||
where
|
||||
INP: Stream<Item = Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, Error>> + Send + 'static,
|
||||
{
|
||||
debug!("make wasm transform");
|
||||
use httpclient::url::Url;
|
||||
use wasmer::Value;
|
||||
use wasmer::WasmSlice;
|
||||
let t = httpclient::http_get(
|
||||
Url::parse(&format!("http://data-api.psi.ch/distri/{}", wasmname)).unwrap(),
|
||||
"*/*",
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let wasm = t.body;
|
||||
// let wasm = include_bytes!("dummy.wasm");
|
||||
let mut store = wasmer::Store::default();
|
||||
let module = wasmer::Module::new(&store, wasm).unwrap();
|
||||
// TODO assert that memory is large enough
|
||||
let memory = wasmer::Memory::new(&mut store, wasmer::MemoryType::new(10, Some(30), false)).unwrap();
|
||||
let import_object = wasmer::imports! {
|
||||
"env" => {
|
||||
"memory" => memory.clone(),
|
||||
}
|
||||
};
|
||||
let instance = wasmer::Instance::new(&mut store, &module, &import_object).unwrap();
|
||||
let get_buffer_ptr = instance.exports.get_function("get_buffer_ptr").unwrap();
|
||||
let buffer_ptr = get_buffer_ptr.call(&mut store, &[]).unwrap();
|
||||
let buffer_ptr = buffer_ptr[0].i32().unwrap();
|
||||
let stream = stream.map(move |x| {
|
||||
let memory = memory.clone();
|
||||
let item = on_sitemty_data!(x, |mut evs: Box<dyn Events>| {
|
||||
let x = {
|
||||
use items_0::AsAnyMut;
|
||||
if true {
|
||||
let r1 = evs
|
||||
.as_any_mut()
|
||||
.downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
|
||||
.is_some();
|
||||
let r2 = evs
|
||||
.as_mut()
|
||||
.as_any_mut()
|
||||
.downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
|
||||
.is_some();
|
||||
let r3 = evs
|
||||
.as_any_mut()
|
||||
.downcast_mut::<Box<items_2::eventsdim0::EventsDim0<f64>>>()
|
||||
.is_some();
|
||||
let r4 = evs
|
||||
.as_mut()
|
||||
.as_any_mut()
|
||||
.downcast_mut::<Box<items_2::eventsdim0::EventsDim0<f64>>>()
|
||||
.is_some();
|
||||
let r5 = evs.as_mut().as_any_mut().downcast_mut::<ChannelEvents>().is_some();
|
||||
let r6 = evs.as_mut().as_any_mut().downcast_mut::<Box<ChannelEvents>>().is_some();
|
||||
debug!("wasm castings: {r1} {r2} {r3} {r4} {r5} {r6}");
|
||||
}
|
||||
if let Some(evs) = evs.as_any_mut().downcast_mut::<ChannelEvents>() {
|
||||
match evs {
|
||||
ChannelEvents::Events(evs) => {
|
||||
if let Some(evs) = evs.as_any_mut().downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>() {
|
||||
use items_0::WithLen;
|
||||
if evs.len() == 0 {
|
||||
debug!("wasm empty EventsDim0<f64>");
|
||||
} else {
|
||||
debug!("wasm see EventsDim0<f64>");
|
||||
let max_len_needed = 16000;
|
||||
let dummy1 = instance.exports.get_function("dummy1").unwrap();
|
||||
let s = evs.values.as_mut_slices();
|
||||
for sl in [s.0, s.1] {
|
||||
if sl.len() > max_len_needed as _ {
|
||||
// TODO cause error
|
||||
panic!();
|
||||
}
|
||||
let wmemoff = buffer_ptr as u64;
|
||||
let view = memory.view(&store);
|
||||
// TODO is the offset bytes or elements?
|
||||
let wsl = WasmSlice::<f64>::new(&view, wmemoff, sl.len() as _).unwrap();
|
||||
// debug!("wasm pages {:?} data size {:?}", view.size(), view.data_size());
|
||||
wsl.write_slice(&sl).unwrap();
|
||||
let ptr = wsl.as_ptr32();
|
||||
debug!("ptr {:?} offset {}", ptr, ptr.offset());
|
||||
let params = [Value::I32(ptr.offset() as _), Value::I32(sl.len() as _)];
|
||||
let res = dummy1.call(&mut store, ¶ms).unwrap();
|
||||
match res[0] {
|
||||
Value::I32(x) => {
|
||||
debug!("wasm dummy1 returned: {x:?}");
|
||||
if x != 1 {
|
||||
error!("unexpected return value {res:?}");
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
error!("unexpected return type {res:?}");
|
||||
}
|
||||
}
|
||||
// Init the slice again because we need to drop ownership for the function call.
|
||||
let view = memory.view(&store);
|
||||
let wsl = WasmSlice::<f64>::new(&view, wmemoff, sl.len() as _).unwrap();
|
||||
wsl.read_slice(sl).unwrap();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
debug!("wasm not EventsDim0<f64>");
|
||||
}
|
||||
}
|
||||
ChannelEvents::Status(_) => {}
|
||||
}
|
||||
} else {
|
||||
debug!("wasm not ChannelEvents");
|
||||
}
|
||||
evs
|
||||
};
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))
|
||||
});
|
||||
// Box::new(item) as Box<dyn Framable + Send>
|
||||
item
|
||||
});
|
||||
let ret: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Events>>> + Send>> = Box::pin(stream);
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user