Default to f32 binned

This commit is contained in:
Dominik Werder
2024-12-11 17:02:01 +01:00
parent 402c5e41ee
commit 08d1ba574b
7 changed files with 237 additions and 88 deletions

View File

@@ -1,5 +1,6 @@
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::sitem_data;
use items_0::streamitem::RangeCompletableItem::*;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem::*;
@@ -10,17 +11,20 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
pub struct ConvertForBinning {
inp: Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>,
pub struct ConvertForBinning<INP> {
inp: INP,
}
impl ConvertForBinning {
pub fn new(inp: Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>) -> Self {
impl<INP> ConvertForBinning<INP> {
pub fn new(inp: INP) -> Self {
Self { inp }
}
}
impl Stream for ConvertForBinning {
impl<INP> Stream for ConvertForBinning<INP>
where
INP: Stream<Item = Sitemty<ChannelEvents>> + Unpin,
{
type Item = Sitemty<ChannelEvents>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
@@ -29,38 +33,44 @@ impl Stream for ConvertForBinning {
Ready(Some(item)) => match &item {
Ok(DataItem(Data(cevs))) => match cevs {
ChannelEvents::Events(evs) => {
if let Some(evs) = evs
.as_any_ref()
.downcast_ref::<ContainerEvents<EnumVariant>>()
{
let mut dst = ContainerEvents::new();
for (ts, val) in evs.iter_zip() {
dst.push_back(ts, val.ix);
}
let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
Ready(Some(item))
} else if let Some(evs) =
evs.as_any_ref().downcast_ref::<ContainerEvents<bool>>()
{
let mut dst = ContainerEvents::new();
for (ts, val) in evs.iter_zip() {
dst.push_back(ts, val as u8);
}
let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
Ready(Some(item))
} else if let Some(evs) =
evs.as_any_ref().downcast_ref::<ContainerEvents<String>>()
{
let mut dst = ContainerEvents::new();
for (ts, _) in evs.iter_zip() {
dst.push_back(ts, 1);
}
let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
Ready(Some(item))
} else {
Ready(Some(item))
}
let evs = evs.to_f32_for_binning_v01();
let item = ChannelEvents::Events(evs);
let item = sitem_data(item);
Ready(Some(item))
}
// ChannelEvents::Events(evs) => {
// if let Some(evs) = evs
// .as_any_ref()
// .downcast_ref::<ContainerEvents<EnumVariant>>()
// {
// let mut dst = ContainerEvents::new();
// for (ts, val) in evs.iter_zip() {
// dst.push_back(ts, val.ix);
// }
// let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
// Ready(Some(item))
// } else if let Some(evs) =
// evs.as_any_ref().downcast_ref::<ContainerEvents<bool>>()
// {
// let mut dst = ContainerEvents::new();
// for (ts, val) in evs.iter_zip() {
// dst.push_back(ts, val as u8);
// }
// let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
// Ready(Some(item))
// } else if let Some(evs) =
// evs.as_any_ref().downcast_ref::<ContainerEvents<String>>()
// {
// let mut dst = ContainerEvents::new();
// for (ts, _) in evs.iter_zip() {
// dst.push_back(ts, 1);
// }
// let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
// Ready(Some(item))
// } else {
// Ready(Some(item))
// }
// }
ChannelEvents::Status(_) => Ready(Some(item)),
},
_ => Ready(Some(item)),

View File

@@ -9,11 +9,12 @@ use netpod::ChannelTypeConfigGen;
use netpod::ReqCtx;
use query::api4::events::PlainEventsQuery;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "PlainEventsCbor")]
pub enum Error {
Stream(#[from] crate::plaineventsstream::Error),
}
autoerr::create_error_v1!(
name(Error, "PlainEventsCbor"),
enum variants {
Stream(#[from] crate::plaineventsstream::Error),
},
);
pub async fn plain_events_cbor_stream(
evq: &PlainEventsQuery,

View File

@@ -3,27 +3,32 @@ use crate::timebin::cached::reader::EventsReading;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_2::binning::container_events::ContainerEvents;
use items_2::binning::container_events::EventValueType;
use items_2::channelevents::ChannelEvents;
use netpod::range::evrange::NanoRange;
use query::api4::events::EventsSubQuery;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "TestEventsReader")]
pub enum Error {}
pub struct TestEventsReader {
pub struct TestEventsReader<GEN> {
range: NanoRange,
gen: GEN,
}
impl TestEventsReader {
pub fn new(range: NanoRange) -> Self {
Self { range }
impl<GEN> TestEventsReader<GEN> {
pub fn new(range: NanoRange, gen: GEN) -> Self {
Self { range, gen }
}
}
impl EventsReadProvider for TestEventsReader {
impl<GEN, IT, TY> EventsReadProvider for TestEventsReader<GEN>
where
GEN: Fn(NanoRange) -> IT + Send + Sync,
IT: Iterator<Item = ContainerEvents<TY>> + Send + 'static,
TY: EventValueType,
{
fn read(&self, evq: EventsSubQuery) -> EventsReading {
let iter = items_2::testgen::events_gen::new_events_gen_dim1_f32_v00(self.range.clone());
// let iter = items_2::testgen::events_gen::new_events_gen_dim1_f32_v00(self.range.clone());
let iter = (self.gen)(self.range.clone());
let iter = iter
.map(|x| {
let x = Box::new(x);

View File

@@ -46,7 +46,10 @@ async fn timebin_from_layers_inner() -> Result<(), Error> {
end: 1000 * 1000 * 1000 * 2,
};
let cache_read_provider = Arc::new(DummyCacheReadProvider::new());
let events_read_provider = Arc::new(TestEventsReader::new(nano_range.clone()));
let events_read_provider = Arc::new(TestEventsReader::new(
nano_range.clone(),
items_2::testgen::events_gen::new_events_gen_dim1_f32_v00,
));
// let one_before_range = true;
// let series_range = SeriesRange::TimeRange(nano_range.clone());
// let select = EventsSubQuerySelect::new(
@@ -109,7 +112,10 @@ async fn timebin_from_layers_1layer_inner() -> Result<(), Error> {
end: 1000 * 1000 * 1000 * 2,
};
let cache_read_provider = Arc::new(DummyCacheReadProvider::new());
let events_read_provider = Arc::new(TestEventsReader::new(nano_range.clone()));
let events_read_provider = Arc::new(TestEventsReader::new(
nano_range.clone(),
items_2::testgen::events_gen::new_events_gen_dim1_f32_v00,
));
// let one_before_range = true;
// let series_range = SeriesRange::TimeRange(nano_range.clone());
// let select = EventsSubQuerySelect::new(

View File

@@ -16,12 +16,13 @@ use std::task::Poll;
macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
#[derive(Debug, thiserror::Error)]
#[cstm(name = "ReadingBinnedFromEvents")]
pub enum Error {
ExpectTimerange,
ExpectTimeweighted,
}
autoerr::create_error_v1!(
name(Error, "ReadingBinnedFromEvents"),
enum variants {
ExpectTimerange,
ExpectTimeweighted,
},
);
pub struct BinnedFromEvents {
stream: Pin<Box<dyn Stream<Item = Sitemty<BinsBoxed>> + Send>>,
@@ -38,7 +39,7 @@ impl BinnedFromEvents {
return Err(Error::ExpectTimerange);
}
let stream = read_provider.read(evq);
let stream = ConvertForBinning::new(Box::pin(stream));
let stream = ConvertForBinning::new(stream);
let stream = if do_time_weight {
let stream = Box::pin(stream);
BinnedEventsTimeweightStream::new(range, stream)

View File

@@ -40,11 +40,8 @@ autoerr::create_error_v1!(
name(Error, "BinCachedGapFill"),
enum variants {
CacheReader(#[from] super::cached::reader::Error),
// #[error("GapFromFiner({0}, {1}, {2})")]
GapFromFiner(TsNano, TsNano, DtMs),
// #[error("MissingBegFromFiner({0}, {1}, {2})")]
MissingBegFromFiner(TsNano, TsNano, DtMs),
// #[error("InputBeforeRange({0}, {1})")]
InputBeforeRange(NanoRange, BinnedRange<TsNano>),
EventsReader(#[from] super::fromevents::Error),
},

View File

@@ -1,3 +1,4 @@
use crate::cbor_stream::CborStream;
use crate::collect::Collect;
use crate::collect::CollectResult;
use crate::json_stream::JsonStream;
@@ -9,6 +10,7 @@ use crate::tcprawclient::make_sub_query;
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
use crate::timebin::cached::reader::CacheReadProvider;
use crate::timebin::cached::reader::EventsReadProvider;
use bytes::Bytes;
use futures_util::future::BoxFuture;
use futures_util::Stream;
use futures_util::StreamExt;
@@ -19,6 +21,7 @@ use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_2::channelevents::ChannelEvents;
use items_2::jsonbytes::CborBytes;
use items_2::jsonbytes::JsonBytes;
use items_2::merger::Merger;
use netpod::log::*;
@@ -35,16 +38,17 @@ use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "TimebinnedJson")]
pub enum Error {
Query(#[from] query::api4::binned::Error),
FromLayers(#[from] super::timebin::fromlayers::Error),
TcpRawClient(#[from] crate::tcprawclient::Error),
Collect(#[from] crate::collect::Error),
Json(#[from] serde_json::Error),
Msg(String),
}
autoerr::create_error_v1!(
name(Error, "TimebinnedJson"),
enum variants {
Query(#[from] query::api4::binned::Error),
FromLayers(#[from] super::timebin::fromlayers::Error),
TcpRawClient(#[from] crate::tcprawclient::Error),
Collect(#[from] crate::collect::Error),
Json(#[from] serde_json::Error),
Msg(String),
},
);
struct ErrMsg<E>(E)
where
@@ -287,8 +291,10 @@ async fn timebinned_stream(
events_read_provider,
)?;
let stream = stream.map(|item| {
use items_0::timebin::BinningggContainerBinsDyn;
on_sitemty_data!(item, |x: Box<dyn BinningggContainerBinsDyn>| {
// use items_0::timebin::BinningggContainerBinsDyn;
use items_0::timebin::BinsBoxed;
on_sitemty_data!(item, |mut x: BinsBoxed| {
x.fix_numerics();
let ret = x.boxed_into_collectable_box();
Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))
})
@@ -346,7 +352,7 @@ pub async fn timebinned_json(
}
}
fn take_collector_result(
fn take_collector_result_json(
coll: &mut Box<dyn items_0::collect_s::CollectorDyn>,
) -> Option<JsonBytes> {
match coll.result() {
@@ -355,10 +361,45 @@ fn take_collector_result(
let val = x.into_serializable_json();
match serde_json::to_string(&val) {
Ok(jsval) => Some(JsonBytes::new(jsval)),
Err(e) => Some(JsonBytes::new("{\"ERROR\":true}")),
Err(e) => {
error!("{}", e);
Some(JsonBytes::new("{\"ERROR\":true}"))
}
}
}
Err(e) => Some(JsonBytes::new("{\"ERROR\":true}")),
Err(e) => {
error!("{}", e);
Some(JsonBytes::new("{\"ERROR\":true}"))
}
}
}
fn take_collector_result_cbor(
coll: &mut Box<dyn items_0::collect_s::CollectorDyn>,
) -> Option<CborBytes> {
match coll.result() {
Ok(collres) => {
let x = collres.into_user_facing_api_type_box();
let val = x.into_serializable_normal();
let mut buf = Vec::with_capacity(64);
ciborium::into_writer(&val, &mut buf).expect("cbor serialize");
let bytes = Bytes::from(buf);
let item = CborBytes::new(bytes);
Some(item)
}
Err(e) => {
error!("{}", e);
use ciborium::cbor;
let val = cbor!({
"ERROR" => true,
})
.unwrap();
let mut buf = Vec::with_capacity(64);
ciborium::into_writer(&val, &mut buf).expect("cbor serialize");
let bytes = Bytes::from(buf);
let item = CborBytes::new(bytes);
Some(item)
}
}
}
@@ -408,7 +449,7 @@ pub async fn timebinned_json_framed(
coll.ingest(&mut item);
if coll.len() >= 128 || last_emit.elapsed() >= timeout_content_2 {
last_emit = Instant::now();
take_collector_result(coll).map(|x| Ok(x))
take_collector_result_json(coll).map(|x| Ok(x))
} else {
// Some(serde_json::Value::String(format!("coll len {}", coll.len())))
None
@@ -432,7 +473,7 @@ pub async fn timebinned_json_framed(
None => {
if let Some(coll) = coll.as_mut() {
last_emit = Instant::now();
take_collector_result(coll).map(|x| Ok(x))
take_collector_result_json(coll).map(|x| Ok(x))
} else {
// Some(serde_json::Value::String(format!(
// "end of input but no collector to take something from"
@@ -445,7 +486,7 @@ pub async fn timebinned_json_framed(
if let Some(coll) = coll.as_mut() {
if coll.len() != 0 {
last_emit = Instant::now();
take_collector_result(coll).map(|x| Ok(x))
take_collector_result_json(coll).map(|x| Ok(x))
} else {
// Some(serde_json::Value::String(format!("timeout but nothing to do")))
None
@@ -458,13 +499,101 @@ pub async fn timebinned_json_framed(
}
});
let stream = stream.filter_map(|x| futures_util::future::ready(x));
// TODO skip the intermediate conversion to js value, go directly to string data
// let stream = stream.map(|x| match x {
// Ok(x) => Ok(JsonBytes::new(serde_json::to_string(&x).unwrap())),
// Err(e) => Err(crate::json_stream::Error::from(crate::json_stream::ErrMsg(
// e,
// ))),
// });
let stream = stream.map_err(|e| crate::json_stream::Error::Msg(e.to_string()));
Ok(Box::pin(stream))
}
pub async fn timebinned_cbor_framed(
query: BinnedQuery,
ch_conf: ChannelTypeConfigGen,
ctx: &ReqCtx,
cache_read_provider: Arc<dyn CacheReadProvider>,
events_read_provider: Arc<dyn EventsReadProvider>,
timeout_provider: Box<dyn StreamTimeout2>,
) -> Result<CborStream, Error> {
let binned_range = query.covering_range()?;
// TODO derive better values, from query
let stream = timebinned_stream(
query.clone(),
binned_range.clone(),
ch_conf,
ctx,
cache_read_provider,
events_read_provider,
)
.await?;
let timeout_content_base = query
.timeout_content()
.unwrap_or(Duration::from_millis(1000))
.min(Duration::from_millis(5000))
.max(Duration::from_millis(100));
let timeout_content_2 = timeout_content_base * 2 / 3;
let mut coll = None;
let mut last_emit = Instant::now();
let stream = stream
.map(|x| Some(x))
.chain(futures_util::stream::iter([None]));
let stream = TimeoutableStream::new(timeout_content_base, timeout_provider, stream);
let stream = stream.map(move |x| {
match x {
Some(x) => match x {
Some(x) => match x {
Ok(x) => match x {
StreamItem::DataItem(x) => match x {
RangeCompletableItem::Data(mut item) => {
let coll = coll.get_or_insert_with(|| item.new_collector());
coll.ingest(&mut item);
if coll.len() >= 128 || last_emit.elapsed() >= timeout_content_2 {
last_emit = Instant::now();
take_collector_result_cbor(coll).map(|x| Ok(x))
} else {
None
}
}
RangeCompletableItem::RangeComplete => None,
},
StreamItem::Log(x) => {
debug!("{x:?}");
// Some(serde_json::Value::String(format!("{x:?}")))
None
}
StreamItem::Stats(x) => {
debug!("{x:?}");
// Some(serde_json::Value::String(format!("{x:?}")))
None
}
},
Err(e) => Some(Err(e)),
},
None => {
if let Some(coll) = coll.as_mut() {
last_emit = Instant::now();
take_collector_result_cbor(coll).map(|x| Ok(x))
} else {
// Some(serde_json::Value::String(format!(
// "end of input but no collector to take something from"
// )))
None
}
}
},
None => {
if let Some(coll) = coll.as_mut() {
if coll.len() != 0 {
last_emit = Instant::now();
take_collector_result_cbor(coll).map(|x| Ok(x))
} else {
// Some(serde_json::Value::String(format!("timeout but nothing to do")))
None
}
} else {
// Some(serde_json::Value::String(format!("timeout but no collector")))
None
}
}
}
});
let stream = stream.filter_map(|x| futures_util::future::ready(x));
let stream = stream.map_err(|e| crate::cbor_stream::Error::Msg(e.to_string()));
Ok(Box::pin(stream))
}