WIP new container

This commit is contained in:
Dominik Werder
2024-11-24 22:32:42 +01:00
parent a7cd1977dc
commit 760e0abed4
22 changed files with 392 additions and 221 deletions

View File

@@ -33,7 +33,9 @@ where
Ok(item) => Ok(match item {
StreamItem::DataItem(item) => StreamItem::DataItem(match item {
RangeCompletableItem::RangeComplete => RangeCompletableItem::RangeComplete,
RangeCompletableItem::Data(item) => RangeCompletableItem::Data(Box::new(item)),
RangeCompletableItem::Data(item) => {
RangeCompletableItem::Data(Box::new(item))
}
}),
StreamItem::Log(item) => StreamItem::Log(item),
StreamItem::Stats(item) => StreamItem::Stats(item),

View File

@@ -12,12 +12,17 @@ use items_0::streamitem::LogItem;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::AsAnyMut;
use items_0::AsAnyRef;
use items_0::Events;
use items_0::WithLen;
use items_2::binning::container_events::ContainerEvents;
use items_2::channelevents::ChannelEvents;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim1::EventsDim1;
use netpod::log::Level;
use netpod::log::*;
use netpod::EnumVariant;
use netpod::ScalarType;
use netpod::Shape;
use std::io::Cursor;
@@ -102,41 +107,46 @@ fn map_events(x: Sitemty<Box<dyn Events>>) -> Result<CborBytes, Error> {
StreamItem::DataItem(x) => match x {
RangeCompletableItem::Data(evs) => {
if false {
use items_0::AsAnyRef;
// TODO impl generically on EventsDim0 ?
if let Some(evs) = evs.as_any_ref().downcast_ref::<items_2::eventsdim0::EventsDim0<f64>>() {
if let Some(evs) = evs.as_any_ref().downcast_ref::<ContainerEvents<f64>>() {
let mut buf = Vec::new();
ciborium::into_writer(evs, &mut buf).map_err(|e| Error::Msg(e.to_string()))?;
ciborium::into_writer(evs, &mut buf)
.map_err(|e| Error::Msg(e.to_string()))?;
let bytes = Bytes::from(buf);
let _item = CborBytes(bytes);
// Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))
} else {
let _item = LogItem::from_node(0, Level::DEBUG, format!("cbor stream discarded item"));
let _item = LogItem::from_node(
0,
Level::DEBUG,
format!("cbor stream discarded item"),
);
// Ok(StreamItem::Log(item))
};
}
let mut k = evs;
let evs = if let Some(j) = k.as_any_mut().downcast_mut::<items_2::channelevents::ChannelEvents>() {
use items_0::AsAnyMut;
let evs = if let Some(j) = k.as_any_mut().downcast_mut::<ChannelEvents>() {
match j {
items_2::channelevents::ChannelEvents::Events(m) => {
ChannelEvents::Events(m) => {
if let Some(g) = m
.as_any_mut()
.downcast_mut::<items_2::eventsdim0::EventsDim0<netpod::EnumVariant>>()
.downcast_mut::<ContainerEvents<EnumVariant>>()
{
trace!("consider container EnumVariant");
let mut out = items_2::eventsdim0enum::EventsDim0Enum::new();
for (&ts, val) in g.tss.iter().zip(g.values.iter()) {
out.push_back(ts, val.ix(), val.name_string());
}
Box::new(items_2::channelevents::ChannelEvents::Events(Box::new(out)))
k
} else {
trace!("consider container channel events other events {}", k.type_name());
trace!(
"consider container channel events other events {}",
k.type_name()
);
k
}
}
items_2::channelevents::ChannelEvents::Status(_) => {
trace!("consider container channel events status {}", k.type_name());
ChannelEvents::Status(_) => {
trace!(
"consider container channel events status {}",
k.type_name()
);
k
}
}
@@ -156,7 +166,8 @@ fn map_events(x: Sitemty<Box<dyn Events>>) -> Result<CborBytes, Error> {
})
.map_err(|e| Error::Msg(e.to_string()))?;
let mut buf = Vec::with_capacity(64);
ciborium::into_writer(&item, &mut buf).map_err(|e| Error::Msg(e.to_string()))?;
ciborium::into_writer(&item, &mut buf)
.map_err(|e| Error::Msg(e.to_string()))?;
let bytes = Bytes::from(buf);
let item = CborBytes(bytes);
Ok(item)
@@ -239,10 +250,14 @@ impl<S> FramedBytesToSitemtyDynEventsStream<S> {
return Ok(None);
}
let buf = &self.buf[FRAME_HEAD_LEN..frame_len];
let val: ciborium::Value = ciborium::from_reader(std::io::Cursor::new(buf)).map_err(ErrMsg)?;
let val: ciborium::Value =
ciborium::from_reader(std::io::Cursor::new(buf)).map_err(ErrMsg)?;
// debug!("decoded ciborium value {val:?}");
let item = if let Some(map) = val.as_map() {
let keys: Vec<&str> = map.iter().map(|k| k.0.as_text().unwrap_or("(none)")).collect();
let keys: Vec<&str> = map
.iter()
.map(|k| k.0.as_text().unwrap_or("(none)"))
.collect();
debug!("keys {keys:?}");
if let Some(x) = map.get(0) {
if let Some(y) = x.0.as_text() {
@@ -309,7 +324,10 @@ where
},
Ready(None) => {
if self.buf.len() > 0 {
warn!("remaining bytes in input buffer, input closed len {}", self.buf.len());
warn!(
"remaining bytes in input buffer, input closed len {}",
self.buf.len()
);
}
Ready(None)
}
@@ -339,7 +357,11 @@ macro_rules! cbor_wave {
}};
}
fn decode_cbor_to_box_events(buf: &[u8], scalar_type: &ScalarType, shape: &Shape) -> Result<Box<dyn Events>, Error> {
fn decode_cbor_to_box_events(
buf: &[u8],
scalar_type: &ScalarType,
shape: &Shape,
) -> Result<Box<dyn Events>, Error> {
let item: Box<dyn Events> = match shape {
Shape::Scalar => match scalar_type {
ScalarType::U8 => cbor_scalar!(u8, buf),
@@ -352,13 +374,25 @@ fn decode_cbor_to_box_events(buf: &[u8], scalar_type: &ScalarType, shape: &Shape
ScalarType::I64 => cbor_scalar!(i64, buf),
ScalarType::F32 => cbor_scalar!(f32, buf),
ScalarType::F64 => cbor_scalar!(f64, buf),
_ => return Err(ErrMsg(format!("decode_cbor_to_box_events {:?} {:?}", scalar_type, shape)).into()),
_ => {
return Err(ErrMsg(format!(
"decode_cbor_to_box_events {:?} {:?}",
scalar_type, shape
))
.into())
}
},
Shape::Wave(_) => match scalar_type {
ScalarType::U8 => cbor_wave!(u8, buf),
ScalarType::U16 => cbor_wave!(u16, buf),
ScalarType::I64 => cbor_wave!(i64, buf),
_ => return Err(ErrMsg(format!("decode_cbor_to_box_events {:?} {:?}", scalar_type, shape)).into()),
_ => {
return Err(ErrMsg(format!(
"decode_cbor_to_box_events {:?} {:?}",
scalar_type, shape
))
.into())
}
},
Shape::Image(_, _) => todo!(),
};

View File

@@ -77,7 +77,8 @@ impl Collect {
collector: None,
range_final: false,
timeout: false,
timer: timeout_provider.timeout_intervals(deadline.saturating_duration_since(Instant::now())),
timer: timeout_provider
.timeout_intervals(deadline.saturating_duration_since(Instant::now())),
done_input: false,
}
}
@@ -105,7 +106,11 @@ impl Collect {
self.done_input = true;
}
if coll.byte_estimate() >= self.bytes_max {
info!("reached bytes_max {} / {}", coll.byte_estimate(), self.events_max);
info!(
"reached bytes_max {} / {}",
coll.byte_estimate(),
self.events_max
);
coll.set_continue_at_here();
self.done_input = true;
}
@@ -178,13 +183,15 @@ impl Future for Collect {
}
// TODO use range_final and timeout in result.
match self.collector.take() {
Some(mut coll) => match coll.result(self.range.clone(), self.binrange.clone()) {
Ok(res) => {
//info!("collect stats total duration: {:?}", total_duration);
Ready(Ok(CollectResult::Some(res)))
Some(mut coll) => {
match coll.result(self.range.clone(), self.binrange.clone()) {
Ok(res) => {
//info!("collect stats total duration: {:?}", total_duration);
Ready(Ok(CollectResult::Some(res)))
}
Err(e) => Ready(Err(ErrMsg(e).into())),
}
Err(e) => Ready(Err(ErrMsg(e).into())),
},
}
None => {
debug!("no result because no collector was created");
Ready(Ok(CollectResult::Timeout))

View File

@@ -102,7 +102,10 @@ where
.ok_or_else(|| Error::NoResultNoCollector)?
.result(range, binrange)
.map_err(ErrMsg)?;
info!("collect_in_span stats total duration: {:?}", total_duration);
info!(
"collect_in_span stats total duration: {:?}",
total_duration
);
Ok(res)
}

View File

@@ -3,9 +3,8 @@ use futures_util::StreamExt;
use items_0::streamitem::RangeCompletableItem::*;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem::*;
use items_0::Empty;
use items_2::binning::container_events::ContainerEvents;
use items_2::channelevents::ChannelEvents;
use items_2::eventsdim0::EventsDim0;
use netpod::EnumVariant;
use std::pin::Pin;
use std::task::Context;
@@ -30,39 +29,31 @@ impl Stream for ConvertForBinning {
Ready(Some(item)) => match &item {
Ok(DataItem(Data(cevs))) => match cevs {
ChannelEvents::Events(evs) => {
if let Some(evs) = evs.as_any_ref().downcast_ref::<EventsDim0<EnumVariant>>() {
let mut dst = EventsDim0::<u16>::empty();
for ((&ts, &pulse), val) in evs
.tss()
.iter()
.zip(evs.pulses.iter())
.zip(evs.private_values_ref().iter())
{
dst.push_back(ts, pulse, val.ix());
if let Some(evs) = evs
.as_any_ref()
.downcast_ref::<ContainerEvents<EnumVariant>>()
{
let mut dst = ContainerEvents::new();
for (&ts, val) in evs.iter_zip() {
dst.push_back(ts, val.ix);
}
let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
Ready(Some(item))
} else if let Some(evs) = evs.as_any_ref().downcast_ref::<EventsDim0<bool>>() {
let mut dst = EventsDim0::<u8>::empty();
for ((&ts, &pulse), &val) in evs
.tss()
.iter()
.zip(evs.pulses.iter())
.zip(evs.private_values_ref().iter())
{
dst.push_back(ts, pulse, val as u8);
} else if let Some(evs) =
evs.as_any_ref().downcast_ref::<ContainerEvents<bool>>()
{
let mut dst = ContainerEvents::new();
for (&ts, val) in evs.iter_zip() {
dst.push_back(ts, val as u8);
}
let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
Ready(Some(item))
} else if let Some(evs) = evs.as_any_ref().downcast_ref::<EventsDim0<String>>() {
let mut dst = EventsDim0::<u64>::empty();
for ((&ts, &pulse), _) in evs
.tss()
.iter()
.zip(evs.pulses.iter())
.zip(evs.private_values_ref().iter())
{
dst.push_back(ts, pulse, 1);
} else if let Some(evs) =
evs.as_any_ref().downcast_ref::<ContainerEvents<String>>()
{
let mut dst = ContainerEvents::new();
for (&ts, _) in evs.iter_zip() {
dst.push_back(ts, 1);
}
let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
Ready(Some(item))
@@ -95,56 +86,60 @@ impl Stream for ConvertForTesting {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
type Cont<T> = ContainerEvents<T>;
match self.inp.poll_next_unpin(cx) {
Ready(Some(item)) => match &item {
Ok(DataItem(Data(cevs))) => match cevs {
ChannelEvents::Events(evs) => {
if let Some(evs) = evs.as_any_ref().downcast_ref::<EventsDim0<f64>>() {
if let Some(evs) = evs.as_any_ref().downcast_ref::<Cont<f64>>() {
let buf = std::fs::read("evmod").unwrap_or(Vec::new());
let s = String::from_utf8_lossy(&buf);
if s.contains("u8") {
use items_0::Empty;
let mut dst = EventsDim0::<u8>::empty();
for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) {
let mut dst = Cont::new();
for (&ts, val) in evs.iter_zip() {
let v = (val * 1e6) as u8;
dst.push_back(*ts, 0, v);
dst.push_back(ts, v);
}
let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
Ready(Some(item))
} else if s.contains("i16") {
use items_0::Empty;
let mut dst = EventsDim0::<i16>::empty();
for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) {
let mut dst = Cont::new();
for (&ts, val) in evs.iter_zip() {
let v = (val * 1e6) as i16 - 50;
dst.push_back(*ts, 0, v);
dst.push_back(ts, v);
}
let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
Ready(Some(item))
} else if s.contains("bool") {
use items_0::Empty;
let mut dst = EventsDim0::<bool>::empty();
for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) {
let mut dst = Cont::new();
for (&ts, val) in evs.iter_zip() {
let g = u64::from_ne_bytes(val.to_ne_bytes());
let val = g % 2 == 0;
dst.push_back(*ts, 0, val);
dst.push_back(ts, val);
}
let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
Ready(Some(item))
} else if s.contains("enum") {
use items_0::Empty;
let mut dst = EventsDim0::<EnumVariant>::empty();
for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) {
let mut dst = Cont::new();
for (&ts, val) in evs.iter_zip() {
let buf = val.to_ne_bytes();
let h = buf[0] ^ buf[1] ^ buf[2] ^ buf[3] ^ buf[4] ^ buf[5] ^ buf[6] ^ buf[7];
dst.push_back(*ts, 0, EnumVariant::new(h as u16, h.to_string()));
let h = buf[0]
^ buf[1]
^ buf[2]
^ buf[3]
^ buf[4]
^ buf[5]
^ buf[6]
^ buf[7];
let val = EnumVariant::new(h as u16, h.to_string());
dst.push_back(ts, val);
}
let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
Ready(Some(item))
} else if s.contains("string") {
use items_0::Empty;
let mut dst = EventsDim0::<String>::empty();
for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) {
dst.push_back(*ts, 0, val.to_string());
let mut dst = Cont::new();
for (&ts, val) in evs.iter_zip() {
dst.push_back(ts, val.to_string());
}
let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst)))));
Ready(Some(item))

View File

@@ -74,7 +74,8 @@ where
assert!(self.buf.len() <= self.buf.capacity());
if self.buf.capacity() < frame_len {
let add_max = BUF_MAX - self.buf.capacity().min(BUF_MAX);
let nadd = ((frame_len.min(FRAME_PAYLOAD_MAX as usize) - self.buf.len()) * 2).min(add_max);
let nadd =
((frame_len.min(FRAME_PAYLOAD_MAX as usize) - self.buf.len()) * 2).min(add_max);
self.buf.reserve(nadd);
}
let adv = (frame_len + 7) / 8 * 8;
@@ -116,7 +117,10 @@ where
},
Ready(None) => {
if self.buf.len() > 0 {
warn!("remaining bytes in input buffer, input closed len {}", self.buf.len());
warn!(
"remaining bytes in input buffer, input closed len {}",
self.buf.len()
);
}
self.state = State::Done;
Ready(None)

View File

@@ -71,12 +71,14 @@ where
Ok(item) => match item {
Ok(item) => match item {
StreamItem::DataItem(item2) => match item2 {
RangeCompletableItem::Data(item3) => {
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item3)))))
}
RangeCompletableItem::Data(item3) => Ready(Some(Ok(
StreamItem::DataItem(RangeCompletableItem::Data(item3)),
))),
RangeCompletableItem::RangeComplete => {
debug!("EventsFromFrames RangeComplete");
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
Ready(Some(Ok(StreamItem::DataItem(
RangeCompletableItem::RangeComplete,
))))
}
},
StreamItem::Log(k) => {

View File

@@ -230,7 +230,10 @@ where
}
}
Ready(Err(e)) => {
error!("poll_upstream need_min {} buf {:?} {:?}", self.need_min, self.buf, e);
error!(
"poll_upstream need_min {} buf {:?} {:?}",
self.need_min, self.buf, e
);
self.done = true;
Ready(Some(Err(e)))
}

View File

@@ -11,18 +11,15 @@ use items_0::streamitem::sitem_err2_from_string;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::Appendable;
use items_0::Empty;
use items_0::WithLen;
use items_2::binning::container_events::ContainerEvents;
use items_2::channelevents::ChannelEvents;
use items_2::empty::empty_events_dyn_ev;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim1::EventsDim1;
use items_2::framable::Framable;
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
use netpod::timeunits::DAY;
use netpod::timeunits::MS;
use netpod::TsNano;
use query::api4::events::EventsSubQuery;
use std::f64::consts::PI;
use std::pin::Pin;
@@ -173,15 +170,14 @@ impl GenerateI32V00 {
fn make_batch(&mut self) -> Sitemty<ChannelEvents> {
type T = i32;
let mut item = EventsDim0::empty();
let mut item = ContainerEvents::new();
let mut ts = self.ts;
loop {
if self.ts >= self.tsend || item.byte_estimate() > 100 {
break;
}
let pulse = ts;
let value = (ts / (MS * 100) % 1000) as T;
item.push(ts, pulse, value);
item.push_back(TsNano::from_ns(ts), value);
ts += self.dts;
}
self.ts = ts;
@@ -271,21 +267,20 @@ impl GenerateI32V01 {
fn make_batch(&mut self) -> Sitemty<ChannelEvents> {
type T = i32;
let mut item = EventsDim0::empty();
let mut item = ContainerEvents::new();
let mut ts = self.ts;
loop {
if self.ts >= self.tsend || item.byte_estimate() > 100 {
break;
}
let pulse = ts;
let value = (ts / self.ivl) as T;
if false {
info!(
"v01 node {} made event ts {} pulse {} value {}",
self.node_ix, ts, pulse, value
"v01 node {} made event ts {} value {}",
self.node_ix, ts, value
);
}
item.push(ts, pulse, value);
item.push_back(TsNano::from_ns(ts), value);
ts += self.dts;
}
self.ts = ts;
@@ -373,13 +368,12 @@ impl GenerateF64V00 {
fn make_batch(&mut self) -> Sitemty<ChannelEvents> {
type T = f64;
let mut item = EventsDim1::empty();
let mut item = ContainerEvents::new();
let mut ts = self.ts;
loop {
if self.ts >= self.tsend || item.byte_estimate() > 400 {
break;
}
let pulse = ts;
let ampl = ((ts / self.ivl) as T).sin() + 2.;
let mut value = Vec::new();
let pi = PI;
@@ -389,11 +383,11 @@ impl GenerateF64V00 {
}
if false {
info!(
"v01 node {} made event ts {} pulse {} value {:?}",
self.node_ix, ts, pulse, value
"v01 node {} made event ts {} value {:?}",
self.node_ix, ts, value
);
}
item.push(ts, pulse, value);
item.push_back(TsNano::from_ns(ts), value);
ts += self.dts;
}
self.ts = ts;
@@ -486,13 +480,12 @@ impl GenerateWaveI16V00 {
fn make_batch(&mut self) -> Sitemty<ChannelEvents> {
type T = i16;
let mut item = EventsDim1::empty();
let mut item = ContainerEvents::new();
let mut ts = self.ts;
loop {
if self.ts >= self.tsend || item.byte_estimate() > 1024 * 20 {
break;
}
let pulse = ts;
let ampl = ((ts / self.ivl) as f32).sin() + 2.;
let mut value = Vec::new();
let pi = std::f32::consts::PI;
@@ -502,11 +495,11 @@ impl GenerateWaveI16V00 {
}
if false {
info!(
"v01 node {} made event ts {} pulse {} value {:?}",
self.node_ix, ts, pulse, value
"v01 node {} made event ts {} value {:?}",
self.node_ix, ts, value
);
}
item.push(ts, pulse, value);
item.push_back(TsNano::from_ns(ts), value);
ts += self.dts;
}
self.ts = ts;

View File

@@ -8,7 +8,10 @@ use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::Events;
use items_0::WithLen;
use items_2::binning::container_events::ContainerEvents;
use items_2::channelevents::ChannelEvents;
use netpod::log::*;
use netpod::EnumVariant;
use std::pin::Pin;
use std::time::Duration;
@@ -85,27 +88,32 @@ fn map_events(x: Sitemty<Box<dyn Events>>) -> Result<JsonBytes, Error> {
StreamItem::DataItem(x) => match x {
RangeCompletableItem::Data(evs) => {
let mut k = evs;
let evs = if let Some(j) = k.as_any_mut().downcast_mut::<items_2::channelevents::ChannelEvents>() {
use items_0::AsAnyMut;
let evs = if let Some(j) = k.as_any_mut().downcast_mut::<ChannelEvents>() {
match j {
items_2::channelevents::ChannelEvents::Events(m) => {
ChannelEvents::Events(m) => {
if let Some(g) = m
.as_any_mut()
.downcast_mut::<items_2::eventsdim0::EventsDim0<netpod::EnumVariant>>()
.downcast_mut::<ContainerEvents<EnumVariant>>()
{
trace!("consider container EnumVariant");
let mut out = items_2::eventsdim0enum::EventsDim0Enum::new();
for (&ts, val) in g.tss.iter().zip(g.values.iter()) {
out.push_back(ts, val.ix(), val.name_string());
let mut out = ContainerEvents::new();
for (&ts, val) in g.iter_zip() {
out.push_back(ts, val.name.to_string());
}
Box::new(items_2::channelevents::ChannelEvents::Events(Box::new(out)))
Box::new(ChannelEvents::Events(Box::new(out)))
} else {
trace!("consider container channel events other events {}", k.type_name());
trace!(
"consider container channel events other events {}",
k.type_name()
);
k
}
}
items_2::channelevents::ChannelEvents::Status(_) => {
trace!("consider container channel events status {}", k.type_name());
ChannelEvents::Status(_) => {
trace!(
"consider container channel events status {}",
k.type_name()
);
k
}
}

View File

@@ -22,7 +22,9 @@ pub struct NeedMinBuffer {
impl NeedMinBuffer {
pub fn new(
inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, items_0::streamitem::SitemErrTy>> + Send>>,
inp: Pin<
Box<dyn Stream<Item = Result<FileChunkRead, items_0::streamitem::SitemErrTy>> + Send>,
>,
) -> Self {
Self {
inp,
@@ -47,7 +49,10 @@ impl NeedMinBuffer {
// TODO collect somewhere else
impl Drop for NeedMinBuffer {
fn drop(&mut self) {
debug!("NeedMinBuffer-drop {{ buf_len_histo: {:?} }}", self.buf_len_histo);
debug!(
"NeedMinBuffer-drop {{ buf_len_histo: {:?} }}",
self.buf_len_histo
);
}
}

View File

@@ -43,7 +43,10 @@ pub async fn plain_events_json(
let stream = stream.map(move |k| {
on_sitemty_data!(k, |mut k: Box<dyn items_0::Events>| {
if let Some(j) = k.as_any_mut().downcast_mut::<items_2::channelevents::ChannelEvents>() {
if let Some(j) = k
.as_any_mut()
.downcast_mut::<items_2::channelevents::ChannelEvents>()
{
use items_0::AsAnyMut;
match j {
items_2::channelevents::ChannelEvents::Events(m) => {
@@ -59,13 +62,19 @@ pub async fn plain_events_json(
let k: Box<dyn CollectableDyn> = Box::new(out);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
} else {
trace!("consider container channel events other events {}", k.type_name());
trace!(
"consider container channel events other events {}",
k.type_name()
);
let k: Box<dyn CollectableDyn> = Box::new(k);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
}
}
items_2::channelevents::ChannelEvents::Status(_) => {
trace!("consider container channel events status {}", k.type_name());
trace!(
"consider container channel events status {}",
k.type_name()
);
let k: Box<dyn CollectableDyn> = Box::new(k);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
}

View File

@@ -3,13 +3,13 @@ mod test;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::merge::DrainIntoNewResult;
use items_0::merge::MergeableTy;
use items_0::streamitem::sitem_err_from_string;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StatsItem;
use items_0::streamitem::StreamItem;
use items_0::MergeError;
use items_2::merger::Mergeable;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::RangeFilterStats;
@@ -25,13 +25,13 @@ macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if false && $det { trace
#[derive(Debug, thiserror::Error)]
#[cstm(name = "Rangefilter")]
pub enum Error {
Merge(#[from] MergeError),
DrainUnclean,
}
pub struct RangeFilter2<S, ITY>
where
S: Stream<Item = Sitemty<ITY>> + Unpin,
ITY: Mergeable,
ITY: MergeableTy,
{
inp: S,
range: NanoRange,
@@ -50,7 +50,7 @@ where
impl<S, ITY> RangeFilter2<S, ITY>
where
S: Stream<Item = Sitemty<ITY>> + Unpin,
ITY: Mergeable,
ITY: MergeableTy,
{
pub fn type_name() -> &'static str {
std::any::type_name::<Self>()
@@ -81,45 +81,54 @@ where
}
}
fn prune_high(&mut self, mut item: ITY, ts: u64) -> Result<ITY, Error> {
fn prune_high(&mut self, mut item: ITY, ts: TsNano) -> Result<ITY, Error> {
let n = item.len();
let ret = match item.find_highest_index_lt(ts) {
Some(ihlt) => {
let n = item.len();
if ihlt + 1 == n {
// TODO gather stats, this should be the most common case.
self.stats.items_no_prune_high += 1;
item
} else {
self.stats.items_part_prune_high += 1;
let mut dummy = item.new_empty();
match item.drain_into(&mut dummy, (ihlt + 1, n)) {
Ok(_) => {}
Err(e) => match e {
MergeError::NotCompatible => {
error!("logic error")
}
MergeError::Full => error!("full, logic error"),
},
match item.drain_into_new(ihlt + 1..n) {
DrainIntoNewResult::Done(_) => {}
DrainIntoNewResult::Partial(_) => {
error!("full, logic error");
}
DrainIntoNewResult::NotCompatible => {
error!("logic error");
}
}
item
}
}
None => {
// TODO should not happen often, observe.
self.stats.items_all_prune_high += 1;
item.new_empty()
match item.drain_into_new(0..n) {
DrainIntoNewResult::Done(_) => {}
DrainIntoNewResult::Partial(_) => {
error!("full, logic error");
}
DrainIntoNewResult::NotCompatible => {
error!("logic error");
}
}
item
}
};
Ok(ret)
}
fn handle_item(&mut self, item: ITY) -> Result<ITY, Error> {
fn handle_item(&mut self, item: ITY) -> Result<Option<ITY>, Error> {
if let Some(ts_min) = item.ts_min() {
if ts_min < self.range.beg() {
if ts_min.ns() < self.range.beg() {
debug!("ITEM BEFORE RANGE (how many?)");
}
}
let min = item.ts_min().map(|x| TsNano::from_ns(x).fmt());
let max = item.ts_max().map(|x| TsNano::from_ns(x).fmt());
let min = item.ts_min();
let max = item.ts_max();
trace_emit!(
self.trdet,
"see event len {} min {:?} max {:?}",
@@ -127,62 +136,74 @@ where
min,
max
);
let mut item = self.prune_high(item, self.range.end)?;
let ret = if self.one_before {
let lige = item.find_lowest_index_ge(self.range.beg);
let mut item = self.prune_high(item, TsNano::from_ns(self.range.end))?;
if self.one_before {
let lige = item.find_lowest_index_ge(TsNano::from_ns(self.range.beg));
trace_emit!(self.trdet, "YES one_before_range ilge {:?}", lige);
match lige {
Some(lige) => {
if lige == 0 {
if let Some(sl1) = self.slot1.take() {
self.slot1 = Some(item);
sl1
Ok(Some(sl1))
} else {
item
Ok(Some(item))
}
} else {
trace_emit!(self.trdet, "discarding events len {:?}", lige - 1);
let mut dummy = item.new_empty();
item.drain_into(&mut dummy, (0, lige - 1))?;
match item.drain_into_new(0..lige - 1) {
DrainIntoNewResult::Done(_) => {}
DrainIntoNewResult::Partial(_) => return Err(Error::DrainUnclean),
DrainIntoNewResult::NotCompatible => return Err(Error::DrainUnclean),
}
self.slot1 = None;
item
Ok(Some(item))
}
}
None => {
// TODO keep stats about this case
trace_emit!(self.trdet, "drain into to keep one before");
let n = item.len();
let mut keep = item.new_empty();
item.drain_into(&mut keep, (n.max(1) - 1, n))?;
self.slot1 = Some(keep);
item.new_empty()
match item.drain_into_new(n.max(1) - 1..n) {
DrainIntoNewResult::Done(keep) => {
self.slot1 = Some(keep);
}
DrainIntoNewResult::Partial(_) => return Err(Error::DrainUnclean),
DrainIntoNewResult::NotCompatible => return Err(Error::DrainUnclean),
}
Ok(None)
}
}
} else {
let lige = item.find_lowest_index_ge(self.range.beg);
let lige = item.find_lowest_index_ge(TsNano::from_ns(self.range.beg));
trace_emit!(self.trdet, "NOT one_before_range ilge {:?}", lige);
match lige {
Some(lige) => {
let mut dummy = item.new_empty();
item.drain_into(&mut dummy, (0, lige))?;
item
match item.drain_into_new(0..lige) {
DrainIntoNewResult::Done(_) => {}
DrainIntoNewResult::Partial(_) => return Err(Error::DrainUnclean),
DrainIntoNewResult::NotCompatible => return Err(Error::DrainUnclean),
}
Ok(Some(item))
}
None => {
// TODO count case for stats
item.new_empty()
Ok(None)
}
}
};
Ok(ret)
}
}
}
impl<S, ITY> RangeFilter2<S, ITY>
where
S: Stream<Item = Sitemty<ITY>> + Unpin,
ITY: Mergeable,
ITY: MergeableTy,
{
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<<Self as Stream>::Item>> {
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<<Self as Stream>::Item>> {
use Poll::*;
loop {
break if self.complete {
@@ -199,25 +220,35 @@ where
} else if self.inp_done {
self.raco_done = true;
if self.have_range_complete {
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
Ready(Some(Ok(StreamItem::DataItem(
RangeCompletableItem::RangeComplete,
))))
} else {
continue;
}
} else {
match self.inp.poll_next_unpin(cx) {
Ready(Some(item)) => match item {
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => match self.handle_item(item) {
Ok(item) => {
trace_emit!(self.trdet, "emit {}", TsMsVecFmt(Mergeable::tss(&item).iter()));
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
Ready(Some(item))
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => {
match self.handle_item(item) {
Ok(Some(item)) => {
trace_emit!(
self.trdet,
"emit {}",
TsMsVecFmt(MergeableTy::tss_for_testing(&item).iter())
);
let item =
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
Ready(Some(item))
}
Ok(None) => continue,
Err(e) => {
error!("sees: {e}");
self.inp_done = true;
Ready(Some(sitem_err_from_string(e)))
}
}
Err(e) => {
error!("sees: {e}");
self.inp_done = true;
Ready(Some(sitem_err_from_string(e)))
}
},
}
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => {
self.have_range_complete = true;
continue;
@@ -227,7 +258,9 @@ where
Ready(None) => {
self.inp_done = true;
if let Some(sl1) = self.slot1.take() {
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(sl1)))))
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(
sl1,
)))))
} else {
continue;
}
@@ -242,7 +275,7 @@ where
impl<S, ITY> Stream for RangeFilter2<S, ITY>
where
S: Stream<Item = Sitemty<ITY>> + Unpin,
ITY: Mergeable,
ITY: MergeableTy,
{
type Item = Sitemty<ITY>;
@@ -257,20 +290,11 @@ where
impl<S, ITY> fmt::Debug for RangeFilter2<S, ITY>
where
S: Stream<Item = Sitemty<ITY>> + Unpin,
ITY: Mergeable,
ITY: MergeableTy,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("RangeFilter2").field("stats", &self.stats).finish()
}
}
impl<S, ITY> Drop for RangeFilter2<S, ITY>
where
S: Stream<Item = Sitemty<ITY>> + Unpin,
ITY: Mergeable,
{
fn drop(&mut self) {
// Self::type_name()
debug!("drop {:?}", self);
f.debug_struct("RangeFilter2")
.field("stats", &self.stats)
.finish()
}
}

View File

@@ -41,7 +41,10 @@ where
self.timeout_fut = self.timeout_provider.timeout_intervals(ivl)
}
fn handle_timeout(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<<Self as Stream>::Item>> {
fn handle_timeout(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<<Self as Stream>::Item>> {
use Poll::*;
let tsnow = Instant::now();
if self.last_seen + self.ivl < tsnow {
@@ -56,7 +59,10 @@ where
}
}
fn handle_inp_pending(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<<Self as Stream>::Item>> {
fn handle_inp_pending(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<<Self as Stream>::Item>> {
use Poll::*;
match self.timeout_fut.poll_unpin(cx) {
Ready(()) => self.handle_timeout(cx),

View File

@@ -98,7 +98,13 @@ pub trait HttpSimplePost: Send {
&self,
req: http::Request<http_body_util::Full<Bytes>>,
) -> Pin<
Box<dyn Future<Output = http::Response<http_body_util::combinators::UnsyncBoxBody<Bytes, ErrorBody>>> + Send>,
Box<
dyn Future<
Output = http::Response<
http_body_util::combinators::UnsyncBoxBody<Bytes, ErrorBody>,
>,
> + Send,
>,
>;
}
@@ -132,7 +138,10 @@ pub async fn x_processed_event_blobs_stream_from_node_http(
let frame1 = make_node_command_frame(subq.clone())?;
let item = sitem_data(frame1.clone());
let buf = item.make_frame_dyn()?.freeze();
let url = node.baseurl().join("/api/4/private/eventdata/frames").unwrap();
let url = node
.baseurl()
.join("/api/4/private/eventdata/frames")
.unwrap();
debug!("open_event_data_streams_http post {url}");
let uri: Uri = url.as_str().parse().unwrap();
let body = http_body_util::Full::new(buf);

View File

@@ -7,11 +7,10 @@ use futures_util::stream;
use futures_util::Stream;
use items_0::streamitem::sitem_data;
use items_0::streamitem::Sitemty;
use items_0::Appendable;
use items_0::Empty;
use items_2::binning::container_events::ContainerEvents;
use items_2::channelevents::ChannelEvents;
use items_2::eventsdim0::EventsDim0;
use netpod::timeunits::SEC;
use netpod::TsNano;
use std::pin::Pin;
#[derive(Debug, thiserror::Error)]
@@ -23,9 +22,9 @@ type BoxedEventStream = Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send
// TODO use some xorshift generator.
fn inmem_test_events_d0_i32_00() -> BoxedEventStream {
let mut evs = EventsDim0::empty();
evs.push(SEC * 1, 1, 10001);
evs.push(SEC * 4, 4, 10004);
let mut evs = ContainerEvents::new();
evs.push_back(TsNano::from_ns(SEC * 1), 10001);
evs.push_back(TsNano::from_ns(SEC * 4), 10004);
let cev = ChannelEvents::Events(Box::new(evs));
let item = sitem_data(cev);
let stream = stream::iter([item]);
@@ -33,8 +32,8 @@ fn inmem_test_events_d0_i32_00() -> BoxedEventStream {
}
fn inmem_test_events_d0_i32_01() -> BoxedEventStream {
let mut evs = EventsDim0::empty();
evs.push(SEC * 2, 2, 10002);
let mut evs = ContainerEvents::new();
evs.push_back(TsNano::from_ns(SEC * 2), 10002);
let cev = ChannelEvents::Events(Box::new(evs));
let item = sitem_data(cev);
let stream = stream::iter([item]);

View File

@@ -1,6 +1,5 @@
use crate::timebin::cached::reader::EventsReadProvider;
use crate::timebin::cached::reader::EventsReading;
use futures_util::StreamExt;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
@@ -24,8 +23,8 @@ impl TestEventsReader {
impl EventsReadProvider for TestEventsReader {
fn read(&self, evq: EventsSubQuery) -> EventsReading {
let stream = items_2::testgen::events_gen::old_events_gen_dim0_f32_v00(self.range.clone());
let stream = stream
let iter = items_2::testgen::events_gen::new_events_gen_dim1_f32_v00(self.range.clone());
let iter = iter
.map(|x| {
let x = Box::new(x);
let x = ChannelEvents::Events(x);
@@ -36,9 +35,9 @@ impl EventsReadProvider for TestEventsReader {
use RangeCompletableItem::*;
use StreamItem::*;
let item1 = Ok(DataItem(RangeComplete));
futures_util::stream::iter([item1])
[item1].into_iter()
});
let stream = Box::pin(stream);
let stream = Box::pin(futures_util::stream::iter(iter));
let ret = EventsReading::new(stream);
ret
}

View File

@@ -88,3 +88,65 @@ fn timebin_from_layers() {
.unwrap();
rt.block_on(timebin_from_layers_inner()).unwrap()
}
async fn timebin_from_layers_1layer_inner() -> Result<(), Error> {
let ctx = Arc::new(ReqCtx::for_test());
let ch_conf = ChannelTypeConfigGen::Scylla(ChConf::new(
"testing",
123,
SeriesKind::ChannelData,
ScalarType::F32,
Shape::Scalar,
"basictest-f32",
));
let cache_usage = CacheUsage::Ignore;
let transform_query = TransformQuery::default_time_binned();
let nano_range = NanoRange {
beg: 1000 * 1000 * 1000 * 1,
end: 1000 * 1000 * 1000 * 2,
};
let cache_read_provider = Arc::new(DummyCacheReadProvider::new());
let events_read_provider = Arc::new(TestEventsReader::new(nano_range.clone()));
// let one_before_range = true;
// let series_range = SeriesRange::TimeRange(nano_range.clone());
// let select = EventsSubQuerySelect::new(
// ch_conf.clone(),
// series_range,
// one_before_range,
// transform_query.clone(),
// );
let settings = EventsSubQuerySettings::default();
// let reqid = ctx.reqid().into();
let log_level = "INFO";
// let query = EventsSubQuery::from_parts(select, settings.clone(), reqid, log_level.into());
let bin_len_layers = [20, 100].into_iter().map(DtMs::from_ms_u64).collect();
let do_time_weight = true;
let bin_len = DtMs::from_ms_u64(200);
let range = BinnedRange::from_nano_range(nano_range, bin_len);
let mut stream = TimeBinnedFromLayers::new(
ch_conf,
cache_usage,
transform_query,
settings,
log_level.into(),
ctx,
range,
do_time_weight,
bin_len_layers,
cache_read_provider,
events_read_provider,
)?;
while let Some(x) = stream.next().await {
let item = x.map_err(|e| Error::Msg(e.to_string()))?;
trace!("item {:?}", item);
}
Ok(())
}
#[test]
fn timebin_from_layers_1layer() {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
rt.block_on(timebin_from_layers_1layer_inner()).unwrap()
}

View File

@@ -1,13 +1,14 @@
use super::cached::reader::CacheReadProvider;
use super::cached::reader::EventsReadProvider;
use crate::log::*;
use crate::timebin::fromevents::BinnedFromEvents;
use crate::timebin::gapfill::GapFill;
use crate::timebin::grid::find_next_finer_bin_len;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::Sitemty;
use items_0::timebin::BinsBoxed;
use items_2::binning::timeweight::timeweight_bins_stream::BinnedBinsTimeweightStream;
use netpod::log::*;
use netpod::query::CacheUsage;
use netpod::range::evrange::SeriesRange;
use netpod::BinnedRange;
@@ -67,7 +68,7 @@ impl TimeBinnedFromLayers {
let bin_len = DtMs::from_ms_u64(range.bin_len.ms());
if bin_len_layers.contains(&bin_len) {
debug!("{}::new bin_len in layers {:?}", Self::type_name(), range);
let inp = super::gapfill::GapFill::new(
let inp = GapFill::new(
"FromLayers-ongrid".into(),
ch_conf.clone(),
cache_usage.clone(),
@@ -84,6 +85,11 @@ impl TimeBinnedFromLayers {
let ret = Self { inp: Box::pin(inp) };
Ok(ret)
} else {
debug!(
"{}::new bin_len off layers {:?}",
Self::type_name(),
range
);
match find_next_finer_bin_len(bin_len, &bin_len_layers) {
Some(finer) => {
if bin_len.ms() % finer.ms() != 0 {
@@ -96,7 +102,7 @@ impl TimeBinnedFromLayers {
finer,
range_finer
);
let inp = super::gapfill::GapFill::new(
let inp = GapFill::new(
"FromLayers-finergrid".into(),
ch_conf.clone(),
cache_usage.clone(),

View File

@@ -1,5 +1,6 @@
use super::cached::reader::CacheReadProvider;
use super::cached::reader::EventsReadProvider;
use crate::log::*;
use crate::timebin::fromevents::BinnedFromEvents;
use futures_util::FutureExt;
use futures_util::Stream;
@@ -10,7 +11,6 @@ use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::timebin::BinsBoxed;
use items_2::binning::timeweight::timeweight_bins_stream::BinnedBinsTimeweightStream;
use netpod::log::*;
use netpod::query::CacheUsage;
use netpod::range::evrange::NanoRange;
use netpod::range::evrange::SeriesRange;

View File

@@ -0,0 +1 @@

View File

@@ -110,7 +110,7 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents(
use StreamItem::*;
match k {
Ok(DataItem(Data(ChannelEvents::Events(k)))) => {
let k = k.to_dim0_f32_for_binning();
// let k = k.to_dim0_f32_for_binning();
Ok(StreamItem::DataItem(RangeCompletableItem::Data(
ChannelEvents::Events(k),
)))