Fix warnings

This commit is contained in:
Dominik Werder
2024-11-12 16:36:31 +01:00
parent bc1818e496
commit bb2a3adc97
10 changed files with 199 additions and 172 deletions

View File

@@ -1,7 +1,7 @@
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
use crate::timebin::cached::reader::CacheReadProvider;
use crate::timebin::cached::reader::EventsReadProvider;
use crate::timebin::cached::reader::EventsReading;
use crate::timebin::CacheReadProvider;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
@@ -73,7 +73,9 @@ impl EventsReadProvider for SfDatabufferEventReadProvider {
fn read(&self, evq: EventsSubQuery) -> EventsReading {
let range = match evq.range() {
netpod::range::evrange::SeriesRange::TimeRange(x) => x.clone(),
netpod::range::evrange::SeriesRange::PulseRange(_) => panic!("not available for pulse range"),
netpod::range::evrange::SeriesRange::PulseRange(_) => {
panic!("not available for pulse range")
}
};
let ctx = self.ctx.clone();
let open_bytes = self.open_bytes.clone();
@@ -104,19 +106,24 @@ impl DummyCacheReadProvider {
}
}
// TODO impl
impl CacheReadProvider for DummyCacheReadProvider {
fn read(
&self,
series: u64,
bin_len: netpod::DtMs,
msp: u64,
offs: std::ops::Range<u32>,
_series: u64,
_bin_len: netpod::DtMs,
_msp: u64,
_offs: std::ops::Range<u32>,
) -> crate::timebin::cached::reader::CacheReading {
let stream = futures_util::future::ready(Ok(None));
crate::timebin::cached::reader::CacheReading::new(Box::pin(stream))
}
fn write(&self, series: u64, bins: items_0::timebin::BinsBoxed) -> crate::timebin::cached::reader::CacheWriting {
fn write(
&self,
_series: u64,
_bins: items_0::timebin::BinsBoxed,
) -> crate::timebin::cached::reader::CacheWriting {
let fut = futures_util::future::ready(Ok(()));
crate::timebin::cached::reader::CacheWriting::new(Box::pin(fut))
}

View File

@@ -30,3 +30,8 @@ pub mod teststream;
pub mod timebin;
pub mod timebinnedjson;
pub mod transform;
#[allow(unused)]
fn todoval<T>() -> T {
todo!()
}

View File

@@ -1,25 +1,3 @@
use crate::collect::Collect;
use crate::collect::CollectResult;
use crate::test::runfut;
use crate::transform::build_event_transform;
use crate::transform::EventsToTimeBinnable;
use futures_util::stream;
use futures_util::StreamExt;
use items_0::on_sitemty_data;
use items_0::streamitem::sitem_data;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::StreamItem;
use items_0::WithLen;
use items_2::eventsdim0::EventsDim0CollectorOutput;
use items_2::streams::PlainEventStream;
use items_2::testgen::make_some_boxed_d0_f32;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::FromUrl;
use query::transform::TransformQuery;
use std::time::Duration;
use std::time::Instant;
// #[test]
// fn collect_channel_events_00() -> Result<(), Error> {
// let fut = async {

View File

@@ -55,12 +55,22 @@ async fn merged_events_inner() -> Result<(), Error> {
let evq = PlainEventsQuery::new(channel, range);
let open_bytes = StreamOpener::new();
let open_bytes = Arc::pin(open_bytes);
let stream = plain_events_cbor_stream(&evq, ch_conf.clone().into(), &ctx, open_bytes, todo!())
.await
.unwrap();
let timeout_provider = crate::todoval();
let stream = plain_events_cbor_stream(
&evq,
ch_conf.clone().into(),
&ctx,
open_bytes,
timeout_provider,
)
.await
.unwrap();
let stream = lenframed::length_framed(stream);
let stream =
FramedBytesToSitemtyDynEventsStream::new(stream, ch_conf.scalar_type().clone(), ch_conf.shape().clone());
let stream = FramedBytesToSitemtyDynEventsStream::new(
stream,
ch_conf.scalar_type().clone(),
ch_conf.shape().clone(),
);
let stream = only_first_err(stream);
stream
.for_each(|item| {
@@ -84,7 +94,9 @@ impl OpenBoxedBytesStreams for StreamOpener {
&self,
subq: EventsSubQuery,
_ctx: ReqCtx,
) -> Pin<Box<dyn Future<Output = Result<Vec<BoxedBytesStream>, crate::tcprawclient::Error>> + Send>> {
) -> Pin<
Box<dyn Future<Output = Result<Vec<BoxedBytesStream>, crate::tcprawclient::Error>> + Send>,
> {
Box::pin(stream_opener(subq).map_err(|e| crate::tcprawclient::Error::Msg(format!("{e}"))))
}
}

View File

@@ -8,7 +8,11 @@ use netpod::range::evrange::SeriesRange;
use query::api4::events::EventsSubQuery;
use std::pin::Pin;
fn make_stream(chname: &str, range: &SeriesRange) -> Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>> {
fn make_stream(
chname: &str,
range: &SeriesRange,
) -> Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>> {
let _ = &range;
if chname == "unittest;scylla;cont;scalar;f32" {
let e = sitem_err2_from_string(format!("unknown channel {chname}"));
let ret = futures_util::stream::iter([Err(e)]);

View File

@@ -1,13 +1,8 @@
pub mod cached;
pub mod fromevents;
pub mod fromlayers;
pub mod timebin;
mod basic;
pub(super) mod fromlayers;
mod gapfill;
mod grid;
pub(super) use basic::TimeBinnedStream;
pub(super) use fromlayers::TimeBinnedFromLayers;
pub use cached::reader::CacheReadProvider;

View File

@@ -28,7 +28,6 @@ pub enum Error {
MissingBinnerAfterProcessItem,
CreateEmpty,
NoBinnerAfterInputDone,
Stream,
Msg(String),
}
@@ -66,7 +65,8 @@ impl<T> TimeBinnedStream<T>
where
T: TimeBinnableTy,
{
pub fn new(inp: SitemtyStream<T>, range: BinnedRangeEnum, do_time_weight: bool) -> Self {
#[allow(unused)]
fn new(inp: SitemtyStream<T>, range: BinnedRangeEnum, do_time_weight: bool) -> Self {
Self {
inp,
range,
@@ -85,7 +85,8 @@ where
trace2!("process_item {item:?}");
if self.binner.is_none() {
trace!("process_item call time_binner_new");
let binner = item.time_binner_new(self.range.clone(), self.do_time_weight, emit_empty_bins);
let binner =
item.time_binner_new(self.range.clone(), self.do_time_weight, emit_empty_bins);
self.binner = Some(binner);
}
let binner = self.binner.as_mut().unwrap();
@@ -96,7 +97,10 @@ where
fn handle_data_item(
&mut self,
item: T,
) -> Result<ControlFlow<Poll<Sitemty<<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>, Error> {
) -> Result<
ControlFlow<Poll<Sitemty<<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>,
Error,
> {
use ControlFlow::*;
use Poll::*;
trace2!("================= handle_data_item");
@@ -147,7 +151,10 @@ where
fn handle_item(
&mut self,
item: Sitemty<T>,
) -> Result<ControlFlow<Poll<Sitemty<<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>, Error> {
) -> Result<
ControlFlow<Poll<Sitemty<<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>,
Error,
> {
use ControlFlow::*;
use Poll::*;
trace2!("================= handle_item");
@@ -174,19 +181,28 @@ where
fn handle_none(
&mut self,
) -> Result<ControlFlow<Poll<Sitemty<<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>, Error> {
) -> Result<
ControlFlow<Poll<Sitemty<<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>,
Error,
> {
use ControlFlow::*;
use Poll::*;
trace2!("================= handle_none");
let self_range_final = self.range_final;
if let Some(binner) = self.binner.as_mut() {
trace2!("bins ready count before finish {}", binner.bins_ready_count());
trace2!(
"bins ready count before finish {}",
binner.bins_ready_count()
);
// TODO rework the finish logic
if self_range_final {
binner.set_range_complete();
}
binner.push_in_progress(false);
trace2!("bins ready count after finish {}", binner.bins_ready_count());
trace2!(
"bins ready count after finish {}",
binner.bins_ready_count()
);
if let Some(bins) = binner.bins_ready() {
self.done_data = true;
Ok(Break(Ready(sitem_data(bins))))
@@ -216,7 +232,10 @@ where
fn poll_input(
&mut self,
cx: &mut Context,
) -> Result<ControlFlow<Poll<Sitemty<<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>, Error> {
) -> Result<
ControlFlow<Poll<Sitemty<<<T as TimeBinnableTy>::TimeBinner as TimeBinnerTy>::Output>>>,
Error,
> {
use ControlFlow::*;
use Poll::*;
trace2!("================= poll_input");
@@ -250,7 +269,9 @@ where
self.done = true;
if self.range_final {
info!("TimeBinnedStream EMIT RANGE FINAL");
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
Ready(Some(Ok(StreamItem::DataItem(
RangeCompletableItem::RangeComplete,
))))
} else {
continue;
}

View File

@@ -105,11 +105,15 @@ impl GapFill {
debug_init!("new dbgname {}", dbgname);
let inp = if cache_usage.is_cache_read() {
let series = ch_conf.series().expect("series id for cache read");
let stream = super::cached::reader::CachedReader::new(series, range.clone(), cache_read_provider.clone())?
.map(|x| match x {
Ok(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))),
Err(e) => sitem_err_from_string(e),
});
let stream = super::cached::reader::CachedReader::new(
series,
range.clone(),
cache_read_provider.clone(),
)?
.map(|x| match x {
Ok(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))),
Err(e) => sitem_err_from_string(e),
});
Box::pin(stream) as Pin<Box<dyn Stream<Item = Sitemty<BinsBoxed>> + Send>>
} else {
let stream = futures_util::stream::empty();
@@ -164,7 +168,9 @@ impl GapFill {
}
if bins.len() != 0 {
let mut bins2 = bins.clone();
let dst = self.bins_for_cache_write.get_or_insert_with(|| bins.empty());
let dst = self
.bins_for_cache_write
.get_or_insert_with(|| bins.empty());
bins2.drain_into(dst.as_mut(), 0..bins2.len());
}
if self.cache_usage.is_cache_write() {
@@ -196,7 +202,12 @@ impl GapFill {
}
if let Some(last) = self.last_bin_ts2 {
if ts1 != last {
trace_handle!("{} detect a gap BETWEEN last {} ts1 {}", self.dbgname, last, ts1);
trace_handle!(
"{} detect a gap BETWEEN last {} ts1 {}",
self.dbgname,
last,
ts1
);
let mut ret = bins.empty();
let mut bins = bins;
bins.drain_into(ret.as_mut(), 0..i);
@@ -229,14 +240,19 @@ impl GapFill {
Ok(bins)
}
fn setup_inp_finer(mut self: Pin<&mut Self>, range: NanoRange, inp_finer_fills_gap: bool) -> Result<(), Error> {
fn setup_inp_finer(
mut self: Pin<&mut Self>,
range: NanoRange,
inp_finer_fills_gap: bool,
) -> Result<(), Error> {
self.inp_finer_range_final = false;
self.inp_finer_range_final_max += 1;
self.inp_finer_fills_gap = inp_finer_fills_gap;
self.exp_finer_range = range.clone();
if let Some(bin_len_finer) =
super::grid::find_next_finer_bin_len(self.range.bin_len.to_dt_ms(), &self.bin_len_layers)
{
if let Some(bin_len_finer) = super::grid::find_next_finer_bin_len(
self.range.bin_len.to_dt_ms(),
&self.bin_len_layers,
) {
debug_setup!(
"{} setup_inp_finer next finer from bins {} {} from {}",
self.dbgname,
@@ -261,7 +277,10 @@ impl GapFill {
self.events_read_provider.clone(),
)?;
let stream = Box::pin(inp_finer);
let range = BinnedRange::from_nano_range(range_finer.full_range(), self.range.bin_len.to_dt_ms());
let range = BinnedRange::from_nano_range(
range_finer.full_range(),
self.range.bin_len.to_dt_ms(),
);
let stream = if self.do_time_weight {
BinnedBinsTimeweightStream::new(range, stream)
} else {
@@ -269,7 +288,11 @@ impl GapFill {
};
self.inp_finer = Some(Box::pin(stream));
} else {
debug_setup!("{} setup_inp_finer next finer from events {}", self.dbgname, range);
debug_setup!(
"{} setup_inp_finer next finer from events {}",
self.dbgname,
range
);
let series_range = SeriesRange::TimeRange(range.clone());
let one_before_range = true;
let select = EventsSubQuerySelect::new(
@@ -285,19 +308,17 @@ impl GapFill {
self.log_level.clone(),
);
let range = BinnedRange::from_nano_range(range.clone(), self.range.bin_len.to_dt_ms());
let inp = BinnedFromEvents::new(range, evq, self.do_time_weight, self.events_read_provider.clone())?;
let inp = BinnedFromEvents::new(
range,
evq,
self.do_time_weight,
self.events_read_provider.clone(),
)?;
self.inp_finer = Some(Box::pin(inp));
}
Ok(())
}
fn cache_write(mut self: Pin<&mut Self>, bins: BinsBoxed) -> Result<(), Error> {
// TODO emit bins that are ready for cache write into some separate channel
let series = todo!();
self.cache_writing = Some(self.cache_read_provider.write(series, bins));
Ok(())
}
fn cache_write_on_end(mut self: Pin<&mut Self>) -> Result<(), Error> {
if self.inp_finer_fills_gap {
// TODO can consider all incoming bins as final by assumption.
@@ -346,7 +367,9 @@ impl Stream for GapFill {
Ready(Some(Ok(x))) => match x {
StreamItem::DataItem(RangeCompletableItem::Data(x)) => {
match self.as_mut().handle_bins_finer(x) {
Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))),
Ok(x) => Ready(Some(Ok(StreamItem::DataItem(
RangeCompletableItem::Data(x),
)))),
Err(e) => Ready(Some(sitem_err_from_string(e))),
}
}
@@ -373,8 +396,10 @@ impl Stream for GapFill {
self.dbgname,
self.last_bin_ts2
);
let exp_finer_range =
::core::mem::replace(&mut self.exp_finer_range, NanoRange { beg: 0, end: 0 });
let exp_finer_range = ::core::mem::replace(
&mut self.exp_finer_range,
NanoRange { beg: 0, end: 0 },
);
self.inp_finer = None;
if let Some(j) = self.last_bin_ts2 {
if j.ns() != exp_finer_range.end() {
@@ -385,7 +410,9 @@ impl Stream for GapFill {
exp_finer_range
);
if self.inp_finer_fills_gap {
Ready(Some(sitem_err_from_string("finer input didn't deliver to the end")))
Ready(Some(sitem_err_from_string(
"finer input didn't deliver to the end",
)))
} else {
warn!(
"{} inp_finer Ready(None) last_bin_ts2 {:?} not delivered to the end, but maybe in the future",
@@ -416,16 +443,22 @@ impl Stream for GapFill {
}
} else if let Some(x) = self.inp_buf.take() {
match self.as_mut().handle_bins_finer(x) {
Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))),
Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(
x,
))))),
Err(e) => Ready(Some(sitem_err_from_string(e))),
}
} else if let Some(inp) = self.inp.as_mut() {
match inp.poll_next_unpin(cx) {
Ready(Some(Ok(x))) => match x {
StreamItem::DataItem(RangeCompletableItem::Data(x)) => match self.as_mut().handle_bins(x) {
Ok(x) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))),
Err(e) => Ready(Some(sitem_err_from_string(e))),
},
StreamItem::DataItem(RangeCompletableItem::Data(x)) => {
match self.as_mut().handle_bins(x) {
Ok(x) => Ready(Some(Ok(StreamItem::DataItem(
RangeCompletableItem::Data(x),
)))),
Err(e) => Ready(Some(sitem_err_from_string(e))),
}
}
StreamItem::DataItem(RangeCompletableItem::RangeComplete) => {
self.inp_range_final = true;
continue;
@@ -478,7 +511,9 @@ impl Stream for GapFill {
self.done = true;
if self.inp_finer_range_final_cnt == self.inp_finer_range_final_max {
trace_handle!("{} range finale all", self.dbgname);
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
Ready(Some(Ok(StreamItem::DataItem(
RangeCompletableItem::RangeComplete,
))))
} else {
trace_handle!("{} substreams not final", self.dbgname);
continue;

View File

@@ -8,8 +8,8 @@ use crate::streamtimeout::TimeoutableStream;
use crate::tcprawclient::container_stream_from_bytes_stream;
use crate::tcprawclient::make_sub_query;
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
use crate::timebin::cached::reader::CacheReadProvider;
use crate::timebin::cached::reader::EventsReadProvider;
use crate::timebin::CacheReadProvider;
use crate::transform::build_merged_event_transform;
use futures_util::future::BoxFuture;
use futures_util::Stream;
@@ -63,7 +63,9 @@ where
}
#[allow(unused)]
fn assert_stream_send<'u, R>(stream: impl 'u + Send + Stream<Item = R>) -> impl 'u + Send + Stream<Item = R> {
fn assert_stream_send<'u, R>(
stream: impl 'u + Send + Stream<Item = R>,
) -> impl 'u + Send + Stream<Item = R> {
stream
}
@@ -92,7 +94,11 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents(
let bytes_streams = open_bytes.open(subq, ctx.as_ref().clone()).await?;
let mut inps = Vec::new();
for s in bytes_streams {
let s = container_stream_from_bytes_stream::<ChannelEvents>(s, inmem_bufcap.clone(), "TODOdbgdesc".into())?;
let s = container_stream_from_bytes_stream::<ChannelEvents>(
s,
inmem_bufcap.clone(),
"TODOdbgdesc".into(),
)?;
let s = Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>;
inps.push(s);
}
@@ -110,9 +116,9 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents(
// let k: Box<dyn Events> = Box::new(k);
let k = k.to_dim0_f32_for_binning();
let k = tr.0.transform(k);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(ChannelEvents::Events(
k,
))))
Ok(StreamItem::DataItem(RangeCompletableItem::Data(
ChannelEvents::Events(k),
)))
}
_ => k,
}
@@ -136,7 +142,8 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents(
let mut store = wasmer::Store::default();
let module = wasmer::Module::new(&store, wasm).unwrap();
// TODO assert that memory is large enough
let memory = wasmer::Memory::new(&mut store, wasmer::MemoryType::new(10, Some(30), false)).unwrap();
let memory =
wasmer::Memory::new(&mut store, wasmer::MemoryType::new(10, Some(30), false)).unwrap();
let import_object = wasmer::imports! {
"env" => {
"memory" => memory.clone(),
@@ -170,15 +177,24 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents(
.as_any_mut()
.downcast_mut::<Box<items_2::eventsdim0::EventsDim0<f64>>>()
.is_some();
let r5 = evs.as_mut().as_any_mut().downcast_mut::<ChannelEvents>().is_some();
let r6 = evs.as_mut().as_any_mut().downcast_mut::<Box<ChannelEvents>>().is_some();
let r5 = evs
.as_mut()
.as_any_mut()
.downcast_mut::<ChannelEvents>()
.is_some();
let r6 = evs
.as_mut()
.as_any_mut()
.downcast_mut::<Box<ChannelEvents>>()
.is_some();
debug!("wasm castings: {r1} {r2} {r3} {r4} {r5} {r6}");
}
if let Some(evs) = evs.as_any_mut().downcast_mut::<ChannelEvents>() {
match evs {
ChannelEvents::Events(evs) => {
if let Some(evs) =
evs.as_any_mut().downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
if let Some(evs) = evs
.as_any_mut()
.downcast_mut::<items_2::eventsdim0::EventsDim0<f64>>()
{
use items_0::WithLen;
if evs.len() == 0 {
@@ -186,7 +202,8 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents(
} else {
debug!("wasm see EventsDim0<f64> len {}", evs.len());
let max_len_needed = 16000;
let dummy1 = instance.exports.get_function("dummy1").unwrap();
let dummy1 =
instance.exports.get_function("dummy1").unwrap();
let s = evs.values.as_mut_slices();
for sl in [s.0, s.1] {
if sl.len() > max_len_needed as _ {
@@ -196,12 +213,20 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents(
let wmemoff = buffer_ptr as u64;
let view = memory.view(&store);
// TODO is the offset bytes or elements?
let wsl = WasmSlice::<f64>::new(&view, wmemoff, sl.len() as _).unwrap();
let wsl = WasmSlice::<f64>::new(
&view,
wmemoff,
sl.len() as _,
)
.unwrap();
// debug!("wasm pages {:?} data size {:?}", view.size(), view.data_size());
wsl.write_slice(&sl).unwrap();
let ptr = wsl.as_ptr32();
debug!("ptr {:?} offset {}", ptr, ptr.offset());
let params = [Value::I32(ptr.offset() as _), Value::I32(sl.len() as _)];
let params = [
Value::I32(ptr.offset() as _),
Value::I32(sl.len() as _),
];
let res = dummy1.call(&mut store, &params).unwrap();
match res[0] {
Value::I32(x) => {
@@ -216,7 +241,12 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents(
}
// Init the slice again because we need to drop ownership for the function call.
let view = memory.view(&store);
let wsl = WasmSlice::<f64>::new(&view, wmemoff, sl.len() as _).unwrap();
let wsl = WasmSlice::<f64>::new(
&view,
wmemoff,
sl.len() as _,
)
.unwrap();
wsl.read_slice(sl).unwrap();
}
}
@@ -263,7 +293,7 @@ async fn timebinned_stream(
} else {
netpod::time_bin_len_cache_opts().to_vec()
};
let stream = crate::timebin::TimeBinnedFromLayers::new(
let stream = crate::timebin::fromlayers::TimeBinnedFromLayers::new(
ch_conf,
cache_usage,
query.transform().clone(),
@@ -328,17 +358,6 @@ pub async fn timebinned_json(
let collres = collected.await?;
match collres {
CollectResult::Some(collres) => {
let collres = if let Some(_bins) = collres
.as_any_ref()
.downcast_ref::<items_2::binsdim0::BinsDim0CollectedResult<netpod::EnumVariant>>()
{
debug!("unexpected binned enum");
// bins.boxed_collected_with_enum_fix()
collres
} else {
debug!("timebinned_json collected type_name {:?}", collres.type_name());
collres
};
let jsval = collres.to_json_value()?;
Ok(CollectResult::Some(jsval))
}
@@ -346,24 +365,14 @@ pub async fn timebinned_json(
}
}
fn take_collector_result(coll: &mut Box<dyn items_0::collect_s::CollectorDyn>) -> Option<serde_json::Value> {
fn take_collector_result(
coll: &mut Box<dyn items_0::collect_s::CollectorDyn>,
) -> Option<serde_json::Value> {
match coll.result(None, None) {
Ok(collres) => {
let collres = if let Some(_bins) = collres
.as_any_ref()
.downcast_ref::<items_2::binsdim0::BinsDim0CollectedResult<netpod::EnumVariant>>()
{
warn!("unexpected binned enum");
// bins.boxed_collected_with_enum_fix()
collres
} else {
collres
};
match collres.to_json_value() {
Ok(val) => Some(val),
Err(e) => Some(serde_json::Value::String(format!("{e}"))),
}
}
Ok(collres) => match collres.to_json_value() {
Ok(val) => Some(val),
Err(e) => Some(serde_json::Value::String(format!("{e}"))),
},
Err(e) => Some(serde_json::Value::String(format!("{e}"))),
}
}
@@ -400,7 +409,9 @@ pub async fn timebinned_json_framed(
let timeout_content_2 = timeout_content_base * 2 / 3;
let mut coll = None;
let mut last_emit = Instant::now();
let stream = stream.map(|x| Some(x)).chain(futures_util::stream::iter([None]));
let stream = stream
.map(|x| Some(x))
.chain(futures_util::stream::iter([None]));
let stream = TimeoutableStream::new(timeout_content_base, timeout_provider, stream);
let stream = stream.map(move |x| {
match x {
@@ -466,7 +477,9 @@ pub async fn timebinned_json_framed(
// TODO skip the intermediate conversion to js value, go directly to string data
let stream = stream.map(|x| match x {
Ok(x) => Ok(JsonBytes::new(serde_json::to_string(&x).unwrap())),
Err(e) => Err(crate::json_stream::Error::from(crate::json_stream::ErrMsg(e))),
Err(e) => Err(crate::json_stream::Error::from(crate::json_stream::ErrMsg(
e,
))),
});
Ok(Box::pin(stream))
}

View File

@@ -1,11 +1,3 @@
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::collect_s::CollectableDyn;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::transform::CollectableStreamBox;
use items_0::transform::EventStreamBox;
use items_0::transform::EventStreamTrait;
use items_0::transform::TransformEvent;
use items_0::transform::TransformProperties;
@@ -14,7 +6,6 @@ use items_2::transform::make_transform_identity;
use items_2::transform::make_transform_min_max_avg;
use items_2::transform::make_transform_pulse_id_diff;
use query::transform::EventTransformQuery;
use query::transform::TimeBinningTransformQuery;
use query::transform::TransformQuery;
use std::pin::Pin;
@@ -64,37 +55,3 @@ impl WithTransformProperties for EventsToTimeBinnable {
self.inp.query_transform_properties()
}
}
pub fn build_full_transform_collectable(
tr: &TransformQuery,
inp: EventStreamBox,
) -> Result<CollectableStreamBox, Error> {
// TODO this must return a Stream!
//let evs = build_event_transform(tr, inp)?;
let trtb = tr.get_tr_time_binning();
let a: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn CollectableDyn>>> + Send>> =
Box::pin(inp.0.map(|item| match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::Data(item) => {
let item: Box<dyn CollectableDyn> = Box::new(item);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
}
RangeCompletableItem::RangeComplete => {
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
}
},
StreamItem::Log(item) => Ok(StreamItem::Log(item)),
StreamItem::Stats(item) => Ok(StreamItem::Stats(item)),
},
Err(e) => Err(e),
}));
let stream: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn CollectableDyn>>> + Send>> =
Box::pin(futures_util::stream::empty());
let stream = Box::pin(futures_util::stream::empty()) as _;
match trtb {
TimeBinningTransformQuery::None => Ok(CollectableStreamBox(stream)),
TimeBinningTransformQuery::TimeWeighted => todo!(),
TimeBinningTransformQuery::Unweighted => todo!(),
}
}