// daqbuffer/crates/streams/src/timebinnedjson.rs
use crate::collect::Collect;
use crate::collect::CollectResult;
use crate::json_stream::JsonBytes;
use crate::json_stream::JsonStream;
use crate::rangefilter2::RangeFilter2;
use crate::tcprawclient::container_stream_from_bytes_stream;
use crate::tcprawclient::make_sub_query;
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
use crate::timebin::cached::reader::EventsReadProvider;
use crate::timebin::CacheReadProvider;
use crate::transform::build_merged_event_transform;
use futures_util::future::BoxFuture;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::collect_s::CollectableDyn;
use items_0::on_sitemty_data;
use items_0::streamitem::sitem_err2_from_string;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::Events;
use items_2::channelevents::ChannelEvents;
use items_2::merger::Merger;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::BinnedRangeEnum;
use netpod::ChannelTypeConfigGen;
use netpod::DtMs;
use netpod::ReqCtx;
use query::api4::binned::BinnedQuery;
use query::api4::events::EventsSubQuerySettings;
use query::transform::TransformQuery;
use serde_json::Value as JsonValue;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
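/// Errors from the time-binned JSON endpoints: wraps failures of the query,
/// the layered time-binning, the event transform, the TCP raw client, the
/// collection stage, and JSON serialization.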
#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error(transparent)]
    Query(#[from] query::api4::binned::Error),
    #[error(transparent)]
    FromLayers(#[from] super::timebin::fromlayers::Error),
    #[error(transparent)]
    Transform(#[from] super::transform::Error),
    #[error(transparent)]
    TcpRawClient(#[from] crate::tcprawclient::Error),
    #[error(transparent)]
    Collect(#[from] crate::collect::Error),
    #[error(transparent)]
    Json(#[from] serde_json::Error),
    #[error("{0}")]
    Msg(String),
}
struct ErrMsg<E>(E)
where
    E: ToString;

impl<E> From<ErrMsg<E>> for Error
where
    E: ToString,
{
    fn from(value: ErrMsg<E>) -> Self {
        Self::Msg(value.0.to_string())
    }
}
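/// Compile-time check that a stream is `Send`; returns it unchanged.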
#[allow(unused)]
fn assert_stream_send<'u, R>(stream: impl 'u + Send + Stream<Item = R>) -> impl 'u + Send + Stream<Item = R> {
    stream
}
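/// Builds the merged, range-filtered, transformed event stream for a channel:
/// opens one byte stream per data node, decodes each into `ChannelEvents`
/// containers, merges them in time order, restricts to the requested range,
/// and applies the merged event transform.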
pub async fn timebinnable_stream_sf_databuffer_channelevents(
    range: NanoRange,
    one_before_range: bool,
    ch_conf: ChannelTypeConfigGen,
    transform_query: TransformQuery,
    sub: EventsSubQuerySettings,
    log_level: String,
    ctx: Arc<ReqCtx>,
    open_bytes: OpenBoxedBytesStreamsBox,
) -> Result<impl Stream<Item = Sitemty<ChannelEvents>>, Error> {
    let subq = make_sub_query(
        ch_conf,
        range.clone().into(),
        one_before_range,
        transform_query,
        sub.clone(),
        log_level.clone(),
        &ctx,
    );
    let inmem_bufcap = subq.inmem_bufcap();
    // Only used when the `wasm_transform` feature is enabled.
    #[allow(unused_variables)]
    let wasm1 = subq.wasm1().map(ToString::to_string);
    let mut tr = build_merged_event_transform(subq.transform())?;
    let bytes_streams = open_bytes.open(subq, ctx.as_ref().clone()).await?;
    let mut inps = Vec::new();
    for s in bytes_streams {
        let s = container_stream_from_bytes_stream::<ChannelEvents>(s, inmem_bufcap.clone(), "TODOdbgdesc".into())?;
        let s = Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>;
        inps.push(s);
    }
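    // Merge the per-node streams in time order, then filter to the requested
    // range (optionally keeping one event before the range begins).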
    // TODO propagate also the max-buf-len for the first stage event reader.
    // TODO use a mixture of count and byte-size as threshold.
    let stream = Merger::new(inps, sub.merger_out_len_max());
    let stream = RangeFilter2::new(stream, range, one_before_range);
    let stream = stream.map(move |k: Sitemty<ChannelEvents>| {
        use RangeCompletableItem::*;
        use StreamItem::*;
        match k {
            Ok(DataItem(Data(ChannelEvents::Events(k)))) => {
                // Normalize to dim0 f32 for binning, then apply the merged event transform.
                let k = k.to_dim0_f32_for_binning();
                let k = tr.0.transform(k);
                Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events(
                    k,
                ))))
            }
            _ => k,
        }
    });
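    // Feature-gated WIP: optionally route each f64 event batch through a wasm
    // module fetched by name, calling its exported `dummy1` function in place.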
    #[cfg(feature = "wasm_transform")]
    let stream = if let Some(wasmname) = wasm1 {
        debug!("make wasm transform");
        use httpclient::url::Url;
        use wasmer::Value;
        use wasmer::WasmSlice;
        // Fetch the wasm module body over HTTP by its name.
        let t = httpclient::http_get(
            Url::parse(&format!("http://data-api.psi.ch/distri/{}", wasmname)).unwrap(),
            "*/*",
            ctx,
        )
        .await
        .unwrap();
        let wasm = t.body;
        let mut store = wasmer::Store::default();
        let module = wasmer::Module::new(&store, wasm).unwrap();
        // TODO assert that memory is large enough
        let memory = wasmer::Memory::new(&mut store, wasmer::MemoryType::new(10, Some(30), false)).unwrap();
        let import_object = wasmer::imports! {
            "env" => {
                "memory" => memory.clone(),
            }
        };
        let instance = wasmer::Instance::new(&mut store, &module, &import_object).unwrap();
        let get_buffer_ptr = instance.exports.get_function("get_buffer_ptr").unwrap();
        let buffer_ptr = get_buffer_ptr.call(&mut store, &[]).unwrap();
        let buffer_ptr = buffer_ptr[0].i32().unwrap();
        let stream = stream.map(move |x| {
            let memory = memory.clone();
            let item = on_sitemty_data!(x, |mut evs: Box<dyn Events>| {
                let x = {
                    use items_0::AsAnyMut;
                    if true {
                        // Diagnostics: log which concrete types this boxed value downcasts to.
                        let r1 = evs
                            .as_any_mut()
                            .downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
                            .is_some();
                        let r2 = evs
                            .as_mut()
                            .as_any_mut()
                            .downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
                            .is_some();
                        let r3 = evs
                            .as_any_mut()
                            .downcast_mut::<Box<items_2::eventsdim0::EventsDim0<f64>>>()
                            .is_some();
                        let r4 = evs
                            .as_mut()
                            .as_any_mut()
                            .downcast_mut::<Box<items_2::eventsdim0::EventsDim0<f64>>>()
                            .is_some();
                        let r5 = evs.as_mut().as_any_mut().downcast_mut::<ChannelEvents>().is_some();
                        let r6 = evs.as_mut().as_any_mut().downcast_mut::<Box<ChannelEvents>>().is_some();
                        debug!("wasm castings: {r1} {r2} {r3} {r4} {r5} {r6}");
                    }
                    if let Some(evs) = evs.as_any_mut().downcast_mut::<ChannelEvents>() {
                        match evs {
                            ChannelEvents::Events(evs) => {
                                if let Some(evs) =
                                    evs.as_any_mut().downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
                                {
                                    use items_0::WithLen;
                                    if evs.len() == 0 {
                                        debug!("wasm empty EventsDim0<f64>");
                                    } else {
                                        debug!("wasm see EventsDim0<f64> len {}", evs.len());
                                        let max_len_needed = 16000;
                                        let dummy1 = instance.exports.get_function("dummy1").unwrap();
                                        let s = evs.values.as_mut_slices();
                                        for sl in [s.0, s.1] {
                                            if sl.len() > max_len_needed as _ {
                                                // TODO propagate as error instead of panicking.
                                                panic!("wasm input slice len {} exceeds buffer capacity {}", sl.len(), max_len_needed);
                                            }
                                            let wmemoff = buffer_ptr as u64;
                                            let view = memory.view(&store);
                                            // TODO is the offset bytes or elements?
                                            let wsl = WasmSlice::<f64>::new(&view, wmemoff, sl.len() as _).unwrap();
                                            // Copy the values into wasm memory and call the transform.
                                            wsl.write_slice(&sl).unwrap();
                                            let ptr = wsl.as_ptr32();
                                            debug!("ptr {:?} offset {}", ptr, ptr.offset());
                                            let params = [Value::I32(ptr.offset() as _), Value::I32(sl.len() as _)];
                                            let res = dummy1.call(&mut store, &params).unwrap();
                                            match res[0] {
                                                Value::I32(x) => {
                                                    debug!("wasm dummy1 returned: {x:?}");
                                                    if x != 1 {
                                                        error!("unexpected return value {res:?}");
                                                    }
                                                }
                                                _ => {
                                                    error!("unexpected return type {res:?}");
                                                }
                                            }
                                            // Re-create the slice view: the previous one had to be
                                            // released because the call above needs `&mut store`.
                                            let view = memory.view(&store);
                                            let wsl = WasmSlice::<f64>::new(&view, wmemoff, sl.len() as _).unwrap();
                                            wsl.read_slice(sl).unwrap();
                                        }
                                    }
                                } else {
                                    debug!("wasm not EventsDim0<f64>");
                                }
                            }
                            ChannelEvents::Status(_) => {}
                        }
                    } else {
                        debug!("wasm not ChannelEvents");
                    }
                    evs
                };
                Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))
            });
            item
        });
        Box::pin(stream) as Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Events>>> + Send>>
    } else {
        let stream = stream.map(|x| x);
        Box::pin(stream)
    };
    Ok(stream)
}
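/// Produces the time-binned stream for a binned query: picks the bin-length
/// layers (explicit `subgrids` if given, otherwise the cacheable defaults),
/// runs `TimeBinnedFromLayers`, and maps each bin container to a collectable.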
async fn timebinned_stream(
    query: BinnedQuery,
    binned_range: BinnedRangeEnum,
    ch_conf: ChannelTypeConfigGen,
    ctx: &ReqCtx,
    cache_read_provider: Arc<dyn CacheReadProvider>,
    events_read_provider: Arc<dyn EventsReadProvider>,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<Box<dyn CollectableDyn>>> + Send>>, Error> {
    use netpod::query::CacheUsage;
    let cache_usage = query.cache_usage().unwrap_or(CacheUsage::Ignore);
    let do_time_weight = true;
    // Bin-length layers: either the explicit subgrids from the query,
    // or the default cacheable bin lengths.
    let bin_len_layers = if let Some(subgrids) = query.subgrids() {
        subgrids
            .iter()
            .map(|&x| DtMs::from_ms_u64(1000 * x.as_secs()))
            .collect()
    } else {
        netpod::time_bin_len_cache_opts().to_vec()
    };
    let stream = crate::timebin::TimeBinnedFromLayers::new(
        ch_conf,
        cache_usage,
        query.transform().clone(),
        EventsSubQuerySettings::from(&query),
        query.log_level().into(),
        Arc::new(ctx.clone()),
        binned_range.binned_range_time(),
        do_time_weight,
        bin_len_layers,
        cache_read_provider,
        events_read_provider,
    )?;
    let stream = stream.map(|item| {
        use items_0::timebin::BinningggContainerBinsDyn;
        on_sitemty_data!(item, |mut x: Box<dyn BinningggContainerBinsDyn>| {
            x.fix_numerics();
            let ret = Box::new(x) as Box<dyn CollectableDyn>;
            Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))
        })
    });
    let stream = Box::pin(stream);
    Ok(stream)
}
}
pub async fn timebinned_json(
    query: BinnedQuery,
    ch_conf: ChannelTypeConfigGen,
    ctx: &ReqCtx,
    cache_read_provider: Arc<dyn CacheReadProvider>,
    events_read_provider: Arc<dyn EventsReadProvider>,
) -> Result<CollectResult<JsonValue>, Error> {
    // Clamp the content timeout into [200 ms, 5 s], defaulting to 3 s.
    let deadline = Instant::now()
        + query
            .timeout_content()
            .unwrap_or(Duration::from_millis(3000))
            .min(Duration::from_millis(5000))
            .max(Duration::from_millis(200));
    let binned_range = query.covering_range()?;
    // TODO derive better values from the query
    let collect_max = 10000;
    let bytes_max = 100 * collect_max;
    let stream = timebinned_stream(
        query.clone(),
        binned_range.clone(),
        ch_conf,
        ctx,
        cache_read_provider,
        events_read_provider,
    )
    .await?;
    let collected = Collect::new(stream, deadline, collect_max, bytes_max, None, Some(binned_range));
    let collected: BoxFuture<_> = Box::pin(collected);
    let collres = collected.await?;
    match collres {
        CollectResult::Some(collres) => {
            let collres = if let Some(_bins) = collres
                .as_any_ref()
                .downcast_ref::<items_2::binsdim0::BinsDim0CollectedResult<netpod::EnumVariant>>()
            {
                debug!("unexpected binned enum");
                // bins.boxed_collected_with_enum_fix()
                collres
            } else {
                debug!("timebinned_json collected type_name {:?}", collres.type_name());
                collres
            };
            let jsval = collres.to_json_value()?;
            Ok(CollectResult::Some(jsval))
        }
        CollectResult::Timeout => Ok(CollectResult::Timeout),
    }
}
}
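/// Drains the collector into a JSON value. Errors are returned in-band as
/// JSON strings so that the framed stream can still deliver them.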
fn take_collector_result(coll: &mut Box<dyn items_0::collect_s::CollectorDyn>) -> Option<serde_json::Value> {
    match coll.result(None, None) {
        Ok(collres) => {
            let collres = if let Some(_bins) = collres
                .as_any_ref()
                .downcast_ref::<items_2::binsdim0::BinsDim0CollectedResult<netpod::EnumVariant>>()
            {
                warn!("unexpected binned enum");
                // bins.boxed_collected_with_enum_fix()
                collres
            } else {
                collres
            };
            match collres.to_json_value() {
                Ok(val) => Some(val),
                Err(e) => Some(serde_json::Value::String(format!("{e}"))),
            }
        }
        Err(e) => Some(serde_json::Value::String(format!("{e}"))),
    }
}
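/// Streams time-binned results as framed JSON. Items are coalesced in a
/// collector and flushed when enough have accumulated, when a soft timeout
/// since the last emit has elapsed, or at the end of the input.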
pub async fn timebinned_json_framed(
    query: BinnedQuery,
    ch_conf: ChannelTypeConfigGen,
    ctx: &ReqCtx,
    cache_read_provider: Arc<dyn CacheReadProvider>,
    events_read_provider: Arc<dyn EventsReadProvider>,
) -> Result<JsonStream, Error> {
trace!("timebinned_json_framed");
let binned_range = query.covering_range()?;
// TODO derive better values, from query
let stream = timebinned_stream(
query.clone(),
binned_range.clone(),
ch_conf,
ctx,
cache_read_provider,
events_read_provider,
)
.await?;
// let stream = timebinned_to_collectable(stream);
// TODO create a custom Stream adapter.
// Want to timeout only on data items: the user wants to wait for bins only a maximum time.
// But also, I want to coalesce.
let timeout_content_base = query
.timeout_content()
.unwrap_or(Duration::from_millis(1000))
.min(Duration::from_millis(5000))
.max(Duration::from_millis(100));
let timeout_content_2 = timeout_content_base * 2 / 3;
let mut coll = None;
let interval = tokio::time::interval(Duration::from(timeout_content_base));
let mut last_emit = Instant::now();
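    // Append a None sentinel so the adapter below can flush the collector
    // when the inner stream ends.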
    let stream = stream.map(Some).chain(futures_util::stream::iter([None]));
    let stream = tokio_stream::StreamExt::timeout_repeating(stream, interval).map(move |x| match x {
        Ok(item) => match item {
            Some(x) => match x {
                Ok(x) => match x {
                    StreamItem::DataItem(x) => match x {
                        RangeCompletableItem::Data(mut item) => {
                            // Coalesce items in the collector; flush when enough
                            // have accumulated or the soft timeout has elapsed.
                            let coll = coll.get_or_insert_with(|| item.new_collector());
                            coll.ingest(&mut item);
                            if coll.len() >= 128 || last_emit.elapsed() >= timeout_content_2 {
                                last_emit = Instant::now();
                                take_collector_result(coll).map(Ok)
                            } else {
                                None
                            }
                        }
                        RangeCompletableItem::RangeComplete => None,
                    },
                    StreamItem::Log(x) => {
                        debug!("{x:?}");
                        None
                    }
                    StreamItem::Stats(x) => {
                        debug!("{x:?}");
                        None
                    }
                },
                Err(e) => Some(Err(e)),
            },
            None => {
                // End of input: flush whatever the collector still holds.
                if let Some(coll) = coll.as_mut() {
                    last_emit = Instant::now();
                    take_collector_result(coll).map(Ok)
                } else {
                    None
                }
            }
        },
        Err(_) => {
            // Repeating timeout fired: flush if the collector has content.
            if let Some(coll) = coll.as_mut() {
                if coll.len() != 0 {
                    last_emit = Instant::now();
                    take_collector_result(coll).map(Ok)
                } else {
                    None
                }
            } else {
                None
            }
        }
    });
    let stream = stream.filter_map(futures_util::future::ready);
    // TODO skip the intermediate conversion to a JSON value, go directly to string data.
    let stream = stream.map(|x| match x {
        Ok(x) => Ok(JsonBytes::new(serde_json::to_string(&x).unwrap())),
        Err(e) => Err(crate::json_stream::Error::from(crate::json_stream::ErrMsg(e))),
    });
    Ok(Box::pin(stream))
}