From 760e0abed44cfaef84452c599592bc55bc131116 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Sun, 24 Nov 2024 22:32:42 +0100 Subject: [PATCH] WIP new container --- src/boxed.rs | 4 +- src/cbor_stream.rs | 80 +++++++++++----- src/collect.rs | 23 +++-- src/collect_adapter.rs | 5 +- src/events/convertforbinning.rs | 97 +++++++++---------- src/framed_bytes.rs | 8 +- src/frames/eventsfromframes.rs | 10 +- src/frames/inmem.rs | 5 +- src/generators.rs | 39 ++++---- src/json_stream.rs | 30 +++--- src/needminbuffer.rs | 9 +- src/plaineventsjson.rs | 15 ++- src/rangefilter2.rs | 162 ++++++++++++++++++-------------- src/streamtimeout.rs | 10 +- src/tcprawclient.rs | 13 ++- src/test.rs | 15 ++- src/test/events_reader.rs | 9 +- src/test/timebin/fromlayers.rs | 62 ++++++++++++ src/timebin/fromlayers.rs | 12 ++- src/timebin/gapfill.rs | 2 +- src/timebin/timebin.rs | 1 + src/timebinnedjson.rs | 2 +- 22 files changed, 392 insertions(+), 221 deletions(-) diff --git a/src/boxed.rs b/src/boxed.rs index 2e6f178..5ddc5ab 100644 --- a/src/boxed.rs +++ b/src/boxed.rs @@ -33,7 +33,9 @@ where Ok(item) => Ok(match item { StreamItem::DataItem(item) => StreamItem::DataItem(match item { RangeCompletableItem::RangeComplete => RangeCompletableItem::RangeComplete, - RangeCompletableItem::Data(item) => RangeCompletableItem::Data(Box::new(item)), + RangeCompletableItem::Data(item) => { + RangeCompletableItem::Data(Box::new(item)) + } }), StreamItem::Log(item) => StreamItem::Log(item), StreamItem::Stats(item) => StreamItem::Stats(item), diff --git a/src/cbor_stream.rs b/src/cbor_stream.rs index 0a1bf10..594ab37 100644 --- a/src/cbor_stream.rs +++ b/src/cbor_stream.rs @@ -12,12 +12,17 @@ use items_0::streamitem::LogItem; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; +use items_0::AsAnyMut; +use items_0::AsAnyRef; use items_0::Events; use items_0::WithLen; +use items_2::binning::container_events::ContainerEvents; +use items_2::channelevents::ChannelEvents; use items_2::eventsdim0::EventsDim0; use items_2::eventsdim1::EventsDim1; use netpod::log::Level; use netpod::log::*; +use netpod::EnumVariant; use netpod::ScalarType; use netpod::Shape; use std::io::Cursor; @@ -102,41 +107,46 @@ fn map_events(x: Sitemty>) -> Result { StreamItem::DataItem(x) => match x { RangeCompletableItem::Data(evs) => { if false { - use items_0::AsAnyRef; // TODO impl generically on EventsDim0 ? 
- if let Some(evs) = evs.as_any_ref().downcast_ref::>() { + if let Some(evs) = evs.as_any_ref().downcast_ref::>() { let mut buf = Vec::new(); - ciborium::into_writer(evs, &mut buf).map_err(|e| Error::Msg(e.to_string()))?; + ciborium::into_writer(evs, &mut buf) + .map_err(|e| Error::Msg(e.to_string()))?; let bytes = Bytes::from(buf); let _item = CborBytes(bytes); // Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) } else { - let _item = LogItem::from_node(0, Level::DEBUG, format!("cbor stream discarded item")); + let _item = LogItem::from_node( + 0, + Level::DEBUG, + format!("cbor stream discarded item"), + ); // Ok(StreamItem::Log(item)) }; } let mut k = evs; - let evs = if let Some(j) = k.as_any_mut().downcast_mut::() { - use items_0::AsAnyMut; + let evs = if let Some(j) = k.as_any_mut().downcast_mut::() { match j { - items_2::channelevents::ChannelEvents::Events(m) => { + ChannelEvents::Events(m) => { if let Some(g) = m .as_any_mut() - .downcast_mut::>() + .downcast_mut::>() { trace!("consider container EnumVariant"); - let mut out = items_2::eventsdim0enum::EventsDim0Enum::new(); - for (&ts, val) in g.tss.iter().zip(g.values.iter()) { - out.push_back(ts, val.ix(), val.name_string()); - } - Box::new(items_2::channelevents::ChannelEvents::Events(Box::new(out))) + k } else { - trace!("consider container channel events other events {}", k.type_name()); + trace!( + "consider container channel events other events {}", + k.type_name() + ); k } } - items_2::channelevents::ChannelEvents::Status(_) => { - trace!("consider container channel events status {}", k.type_name()); + ChannelEvents::Status(_) => { + trace!( + "consider container channel events status {}", + k.type_name() + ); k } } @@ -156,7 +166,8 @@ fn map_events(x: Sitemty>) -> Result { }) .map_err(|e| Error::Msg(e.to_string()))?; let mut buf = Vec::with_capacity(64); - ciborium::into_writer(&item, &mut buf).map_err(|e| Error::Msg(e.to_string()))?; + ciborium::into_writer(&item, &mut buf) + .map_err(|e| Error::Msg(e.to_string()))?; let bytes = Bytes::from(buf); let item = CborBytes(bytes); Ok(item) @@ -239,10 +250,14 @@ impl FramedBytesToSitemtyDynEventsStream { return Ok(None); } let buf = &self.buf[FRAME_HEAD_LEN..frame_len]; - let val: ciborium::Value = ciborium::from_reader(std::io::Cursor::new(buf)).map_err(ErrMsg)?; + let val: ciborium::Value = + ciborium::from_reader(std::io::Cursor::new(buf)).map_err(ErrMsg)?; // debug!("decoded ciborium value {val:?}"); let item = if let Some(map) = val.as_map() { - let keys: Vec<&str> = map.iter().map(|k| k.0.as_text().unwrap_or("(none)")).collect(); + let keys: Vec<&str> = map + .iter() + .map(|k| k.0.as_text().unwrap_or("(none)")) + .collect(); debug!("keys {keys:?}"); if let Some(x) = map.get(0) { if let Some(y) = x.0.as_text() { @@ -309,7 +324,10 @@ where }, Ready(None) => { if self.buf.len() > 0 { - warn!("remaining bytes in input buffer, input closed len {}", self.buf.len()); + warn!( + "remaining bytes in input buffer, input closed len {}", + self.buf.len() + ); } Ready(None) } @@ -339,7 +357,11 @@ macro_rules! 
cbor_wave { }}; } -fn decode_cbor_to_box_events(buf: &[u8], scalar_type: &ScalarType, shape: &Shape) -> Result, Error> { +fn decode_cbor_to_box_events( + buf: &[u8], + scalar_type: &ScalarType, + shape: &Shape, +) -> Result, Error> { let item: Box = match shape { Shape::Scalar => match scalar_type { ScalarType::U8 => cbor_scalar!(u8, buf), @@ -352,13 +374,25 @@ fn decode_cbor_to_box_events(buf: &[u8], scalar_type: &ScalarType, shape: &Shape ScalarType::I64 => cbor_scalar!(i64, buf), ScalarType::F32 => cbor_scalar!(f32, buf), ScalarType::F64 => cbor_scalar!(f64, buf), - _ => return Err(ErrMsg(format!("decode_cbor_to_box_events {:?} {:?}", scalar_type, shape)).into()), + _ => { + return Err(ErrMsg(format!( + "decode_cbor_to_box_events {:?} {:?}", + scalar_type, shape + )) + .into()) + } }, Shape::Wave(_) => match scalar_type { ScalarType::U8 => cbor_wave!(u8, buf), ScalarType::U16 => cbor_wave!(u16, buf), ScalarType::I64 => cbor_wave!(i64, buf), - _ => return Err(ErrMsg(format!("decode_cbor_to_box_events {:?} {:?}", scalar_type, shape)).into()), + _ => { + return Err(ErrMsg(format!( + "decode_cbor_to_box_events {:?} {:?}", + scalar_type, shape + )) + .into()) + } }, Shape::Image(_, _) => todo!(), }; diff --git a/src/collect.rs b/src/collect.rs index 4f9e99e..d0818b9 100644 --- a/src/collect.rs +++ b/src/collect.rs @@ -77,7 +77,8 @@ impl Collect { collector: None, range_final: false, timeout: false, - timer: timeout_provider.timeout_intervals(deadline.saturating_duration_since(Instant::now())), + timer: timeout_provider + .timeout_intervals(deadline.saturating_duration_since(Instant::now())), done_input: false, } } @@ -105,7 +106,11 @@ impl Collect { self.done_input = true; } if coll.byte_estimate() >= self.bytes_max { - info!("reached bytes_max {} / {}", coll.byte_estimate(), self.events_max); + info!( + "reached bytes_max {} / {}", + coll.byte_estimate(), + self.events_max + ); coll.set_continue_at_here(); self.done_input = true; } @@ -178,13 +183,15 @@ impl Future for Collect { } // TODO use range_final and timeout in result. match self.collector.take() { - Some(mut coll) => match coll.result(self.range.clone(), self.binrange.clone()) { - Ok(res) => { - //info!("collect stats total duration: {:?}", total_duration); - Ready(Ok(CollectResult::Some(res))) + Some(mut coll) => { + match coll.result(self.range.clone(), self.binrange.clone()) { + Ok(res) => { + //info!("collect stats total duration: {:?}", total_duration); + Ready(Ok(CollectResult::Some(res))) + } + Err(e) => Ready(Err(ErrMsg(e).into())), } - Err(e) => Ready(Err(ErrMsg(e).into())), - }, + } None => { debug!("no result because no collector was created"); Ready(Ok(CollectResult::Timeout)) diff --git a/src/collect_adapter.rs b/src/collect_adapter.rs index 064c3ac..e5b17c1 100644 --- a/src/collect_adapter.rs +++ b/src/collect_adapter.rs @@ -102,7 +102,10 @@ where .ok_or_else(|| Error::NoResultNoCollector)? 
.result(range, binrange) .map_err(ErrMsg)?; - info!("collect_in_span stats total duration: {:?}", total_duration); + info!( + "collect_in_span stats total duration: {:?}", + total_duration + ); Ok(res) } diff --git a/src/events/convertforbinning.rs b/src/events/convertforbinning.rs index f4fae64..71963bd 100644 --- a/src/events/convertforbinning.rs +++ b/src/events/convertforbinning.rs @@ -3,9 +3,8 @@ use futures_util::StreamExt; use items_0::streamitem::RangeCompletableItem::*; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem::*; -use items_0::Empty; +use items_2::binning::container_events::ContainerEvents; use items_2::channelevents::ChannelEvents; -use items_2::eventsdim0::EventsDim0; use netpod::EnumVariant; use std::pin::Pin; use std::task::Context; @@ -30,39 +29,31 @@ impl Stream for ConvertForBinning { Ready(Some(item)) => match &item { Ok(DataItem(Data(cevs))) => match cevs { ChannelEvents::Events(evs) => { - if let Some(evs) = evs.as_any_ref().downcast_ref::>() { - let mut dst = EventsDim0::::empty(); - for ((&ts, &pulse), val) in evs - .tss() - .iter() - .zip(evs.pulses.iter()) - .zip(evs.private_values_ref().iter()) - { - dst.push_back(ts, pulse, val.ix()); + if let Some(evs) = evs + .as_any_ref() + .downcast_ref::>() + { + let mut dst = ContainerEvents::new(); + for (&ts, val) in evs.iter_zip() { + dst.push_back(ts, val.ix); } let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); Ready(Some(item)) - } else if let Some(evs) = evs.as_any_ref().downcast_ref::>() { - let mut dst = EventsDim0::::empty(); - for ((&ts, &pulse), &val) in evs - .tss() - .iter() - .zip(evs.pulses.iter()) - .zip(evs.private_values_ref().iter()) - { - dst.push_back(ts, pulse, val as u8); + } else if let Some(evs) = + evs.as_any_ref().downcast_ref::>() + { + let mut dst = ContainerEvents::new(); + for (&ts, val) in evs.iter_zip() { + dst.push_back(ts, val as u8); } let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); Ready(Some(item)) - } else if let Some(evs) = evs.as_any_ref().downcast_ref::>() { - let mut dst = EventsDim0::::empty(); - for ((&ts, &pulse), _) in evs - .tss() - .iter() - .zip(evs.pulses.iter()) - .zip(evs.private_values_ref().iter()) - { - dst.push_back(ts, pulse, 1); + } else if let Some(evs) = + evs.as_any_ref().downcast_ref::>() + { + let mut dst = ContainerEvents::new(); + for (&ts, _) in evs.iter_zip() { + dst.push_back(ts, 1); } let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); Ready(Some(item)) @@ -95,56 +86,60 @@ impl Stream for ConvertForTesting { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + type Cont = ContainerEvents; match self.inp.poll_next_unpin(cx) { Ready(Some(item)) => match &item { Ok(DataItem(Data(cevs))) => match cevs { ChannelEvents::Events(evs) => { - if let Some(evs) = evs.as_any_ref().downcast_ref::>() { + if let Some(evs) = evs.as_any_ref().downcast_ref::>() { let buf = std::fs::read("evmod").unwrap_or(Vec::new()); let s = String::from_utf8_lossy(&buf); if s.contains("u8") { - use items_0::Empty; - let mut dst = EventsDim0::::empty(); - for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) { + let mut dst = Cont::new(); + for (&ts, val) in evs.iter_zip() { let v = (val * 1e6) as u8; - dst.push_back(*ts, 0, v); + dst.push_back(ts, v); } let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); Ready(Some(item)) } else if s.contains("i16") { - use items_0::Empty; - let mut dst = EventsDim0::::empty(); - for (ts, val) in 
evs.tss().iter().zip(evs.private_values_ref().iter()) { + let mut dst = Cont::new(); + for (&ts, val) in evs.iter_zip() { let v = (val * 1e6) as i16 - 50; - dst.push_back(*ts, 0, v); + dst.push_back(ts, v); } let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); Ready(Some(item)) } else if s.contains("bool") { - use items_0::Empty; - let mut dst = EventsDim0::::empty(); - for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) { + let mut dst = Cont::new(); + for (&ts, val) in evs.iter_zip() { let g = u64::from_ne_bytes(val.to_ne_bytes()); let val = g % 2 == 0; - dst.push_back(*ts, 0, val); + dst.push_back(ts, val); } let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); Ready(Some(item)) } else if s.contains("enum") { - use items_0::Empty; - let mut dst = EventsDim0::::empty(); - for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) { + let mut dst = Cont::new(); + for (&ts, val) in evs.iter_zip() { let buf = val.to_ne_bytes(); - let h = buf[0] ^ buf[1] ^ buf[2] ^ buf[3] ^ buf[4] ^ buf[5] ^ buf[6] ^ buf[7]; - dst.push_back(*ts, 0, EnumVariant::new(h as u16, h.to_string())); + let h = buf[0] + ^ buf[1] + ^ buf[2] + ^ buf[3] + ^ buf[4] + ^ buf[5] + ^ buf[6] + ^ buf[7]; + let val = EnumVariant::new(h as u16, h.to_string()); + dst.push_back(ts, val); } let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); Ready(Some(item)) } else if s.contains("string") { - use items_0::Empty; - let mut dst = EventsDim0::::empty(); - for (ts, val) in evs.tss().iter().zip(evs.private_values_ref().iter()) { - dst.push_back(*ts, 0, val.to_string()); + let mut dst = Cont::new(); + for (&ts, val) in evs.iter_zip() { + dst.push_back(ts, val.to_string()); } let item = Ok(DataItem(Data(ChannelEvents::Events(Box::new(dst))))); Ready(Some(item)) diff --git a/src/framed_bytes.rs b/src/framed_bytes.rs index b4fe883..9731de0 100644 --- a/src/framed_bytes.rs +++ b/src/framed_bytes.rs @@ -74,7 +74,8 @@ where assert!(self.buf.len() <= self.buf.capacity()); if self.buf.capacity() < frame_len { let add_max = BUF_MAX - self.buf.capacity().min(BUF_MAX); - let nadd = ((frame_len.min(FRAME_PAYLOAD_MAX as usize) - self.buf.len()) * 2).min(add_max); + let nadd = + ((frame_len.min(FRAME_PAYLOAD_MAX as usize) - self.buf.len()) * 2).min(add_max); self.buf.reserve(nadd); } let adv = (frame_len + 7) / 8 * 8; @@ -116,7 +117,10 @@ where }, Ready(None) => { if self.buf.len() > 0 { - warn!("remaining bytes in input buffer, input closed len {}", self.buf.len()); + warn!( + "remaining bytes in input buffer, input closed len {}", + self.buf.len() + ); } self.state = State::Done; Ready(None) diff --git a/src/frames/eventsfromframes.rs b/src/frames/eventsfromframes.rs index d87e189..ba036e6 100644 --- a/src/frames/eventsfromframes.rs +++ b/src/frames/eventsfromframes.rs @@ -71,12 +71,14 @@ where Ok(item) => match item { Ok(item) => match item { StreamItem::DataItem(item2) => match item2 { - RangeCompletableItem::Data(item3) => { - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item3))))) - } + RangeCompletableItem::Data(item3) => Ready(Some(Ok( + StreamItem::DataItem(RangeCompletableItem::Data(item3)), + ))), RangeCompletableItem::RangeComplete => { debug!("EventsFromFrames RangeComplete"); - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + Ready(Some(Ok(StreamItem::DataItem( + RangeCompletableItem::RangeComplete, + )))) } }, StreamItem::Log(k) => { diff --git a/src/frames/inmem.rs b/src/frames/inmem.rs index 5cfda2e..e5bb1a6 
100644 --- a/src/frames/inmem.rs +++ b/src/frames/inmem.rs @@ -230,7 +230,10 @@ where } } Ready(Err(e)) => { - error!("poll_upstream need_min {} buf {:?} {:?}", self.need_min, self.buf, e); + error!( + "poll_upstream need_min {} buf {:?} {:?}", + self.need_min, self.buf, e + ); self.done = true; Ready(Some(Err(e))) } diff --git a/src/generators.rs b/src/generators.rs index 6b15e92..d3386ec 100644 --- a/src/generators.rs +++ b/src/generators.rs @@ -11,18 +11,15 @@ use items_0::streamitem::sitem_err2_from_string; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; -use items_0::Appendable; -use items_0::Empty; -use items_0::WithLen; +use items_2::binning::container_events::ContainerEvents; use items_2::channelevents::ChannelEvents; use items_2::empty::empty_events_dyn_ev; -use items_2::eventsdim0::EventsDim0; -use items_2::eventsdim1::EventsDim1; use items_2::framable::Framable; use netpod::log::*; use netpod::range::evrange::SeriesRange; use netpod::timeunits::DAY; use netpod::timeunits::MS; +use netpod::TsNano; use query::api4::events::EventsSubQuery; use std::f64::consts::PI; use std::pin::Pin; @@ -173,15 +170,14 @@ impl GenerateI32V00 { fn make_batch(&mut self) -> Sitemty { type T = i32; - let mut item = EventsDim0::empty(); + let mut item = ContainerEvents::new(); let mut ts = self.ts; loop { if self.ts >= self.tsend || item.byte_estimate() > 100 { break; } - let pulse = ts; let value = (ts / (MS * 100) % 1000) as T; - item.push(ts, pulse, value); + item.push_back(TsNano::from_ns(ts), value); ts += self.dts; } self.ts = ts; @@ -271,21 +267,20 @@ impl GenerateI32V01 { fn make_batch(&mut self) -> Sitemty { type T = i32; - let mut item = EventsDim0::empty(); + let mut item = ContainerEvents::new(); let mut ts = self.ts; loop { if self.ts >= self.tsend || item.byte_estimate() > 100 { break; } - let pulse = ts; let value = (ts / self.ivl) as T; if false { info!( - "v01 node {} made event ts {} pulse {} value {}", - self.node_ix, ts, pulse, value + "v01 node {} made event ts {} value {}", + self.node_ix, ts, value ); } - item.push(ts, pulse, value); + item.push_back(TsNano::from_ns(ts), value); ts += self.dts; } self.ts = ts; @@ -373,13 +368,12 @@ impl GenerateF64V00 { fn make_batch(&mut self) -> Sitemty { type T = f64; - let mut item = EventsDim1::empty(); + let mut item = ContainerEvents::new(); let mut ts = self.ts; loop { if self.ts >= self.tsend || item.byte_estimate() > 400 { break; } - let pulse = ts; let ampl = ((ts / self.ivl) as T).sin() + 2.; let mut value = Vec::new(); let pi = PI; @@ -389,11 +383,11 @@ impl GenerateF64V00 { } if false { info!( - "v01 node {} made event ts {} pulse {} value {:?}", - self.node_ix, ts, pulse, value + "v01 node {} made event ts {} value {:?}", + self.node_ix, ts, value ); } - item.push(ts, pulse, value); + item.push_back(TsNano::from_ns(ts), value); ts += self.dts; } self.ts = ts; @@ -486,13 +480,12 @@ impl GenerateWaveI16V00 { fn make_batch(&mut self) -> Sitemty { type T = i16; - let mut item = EventsDim1::empty(); + let mut item = ContainerEvents::new(); let mut ts = self.ts; loop { if self.ts >= self.tsend || item.byte_estimate() > 1024 * 20 { break; } - let pulse = ts; let ampl = ((ts / self.ivl) as f32).sin() + 2.; let mut value = Vec::new(); let pi = std::f32::consts::PI; @@ -502,11 +495,11 @@ impl GenerateWaveI16V00 { } if false { info!( - "v01 node {} made event ts {} pulse {} value {:?}", - self.node_ix, ts, pulse, value + "v01 node {} made event ts {} value {:?}", + 
self.node_ix, ts, value ); } - item.push(ts, pulse, value); + item.push_back(TsNano::from_ns(ts), value); ts += self.dts; } self.ts = ts; diff --git a/src/json_stream.rs b/src/json_stream.rs index 0f922c1..68f1728 100644 --- a/src/json_stream.rs +++ b/src/json_stream.rs @@ -8,7 +8,10 @@ use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::Events; use items_0::WithLen; +use items_2::binning::container_events::ContainerEvents; +use items_2::channelevents::ChannelEvents; use netpod::log::*; +use netpod::EnumVariant; use std::pin::Pin; use std::time::Duration; @@ -85,27 +88,32 @@ fn map_events(x: Sitemty>) -> Result { StreamItem::DataItem(x) => match x { RangeCompletableItem::Data(evs) => { let mut k = evs; - let evs = if let Some(j) = k.as_any_mut().downcast_mut::() { - use items_0::AsAnyMut; + let evs = if let Some(j) = k.as_any_mut().downcast_mut::() { match j { - items_2::channelevents::ChannelEvents::Events(m) => { + ChannelEvents::Events(m) => { if let Some(g) = m .as_any_mut() - .downcast_mut::>() + .downcast_mut::>() { trace!("consider container EnumVariant"); - let mut out = items_2::eventsdim0enum::EventsDim0Enum::new(); - for (&ts, val) in g.tss.iter().zip(g.values.iter()) { - out.push_back(ts, val.ix(), val.name_string()); + let mut out = ContainerEvents::new(); + for (&ts, val) in g.iter_zip() { + out.push_back(ts, val.name.to_string()); } - Box::new(items_2::channelevents::ChannelEvents::Events(Box::new(out))) + Box::new(ChannelEvents::Events(Box::new(out))) } else { - trace!("consider container channel events other events {}", k.type_name()); + trace!( + "consider container channel events other events {}", + k.type_name() + ); k } } - items_2::channelevents::ChannelEvents::Status(_) => { - trace!("consider container channel events status {}", k.type_name()); + ChannelEvents::Status(_) => { + trace!( + "consider container channel events status {}", + k.type_name() + ); k } } diff --git a/src/needminbuffer.rs b/src/needminbuffer.rs index 37689b6..330ada7 100644 --- a/src/needminbuffer.rs +++ b/src/needminbuffer.rs @@ -22,7 +22,9 @@ pub struct NeedMinBuffer { impl NeedMinBuffer { pub fn new( - inp: Pin> + Send>>, + inp: Pin< + Box> + Send>, + >, ) -> Self { Self { inp, @@ -47,7 +49,10 @@ impl NeedMinBuffer { // TODO collect somewhere else impl Drop for NeedMinBuffer { fn drop(&mut self) { - debug!("NeedMinBuffer-drop {{ buf_len_histo: {:?} }}", self.buf_len_histo); + debug!( + "NeedMinBuffer-drop {{ buf_len_histo: {:?} }}", + self.buf_len_histo + ); } } diff --git a/src/plaineventsjson.rs b/src/plaineventsjson.rs index 53a587d..5c5adc6 100644 --- a/src/plaineventsjson.rs +++ b/src/plaineventsjson.rs @@ -43,7 +43,10 @@ pub async fn plain_events_json( let stream = stream.map(move |k| { on_sitemty_data!(k, |mut k: Box| { - if let Some(j) = k.as_any_mut().downcast_mut::() { + if let Some(j) = k + .as_any_mut() + .downcast_mut::() + { use items_0::AsAnyMut; match j { items_2::channelevents::ChannelEvents::Events(m) => { @@ -59,13 +62,19 @@ pub async fn plain_events_json( let k: Box = Box::new(out); Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) } else { - trace!("consider container channel events other events {}", k.type_name()); + trace!( + "consider container channel events other events {}", + k.type_name() + ); let k: Box = Box::new(k); Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) } } items_2::channelevents::ChannelEvents::Status(_) => { - trace!("consider container channel events status {}", k.type_name()); + trace!( + "consider 
container channel events status {}", + k.type_name() + ); let k: Box = Box::new(k); Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) } diff --git a/src/rangefilter2.rs b/src/rangefilter2.rs index e3f2ec9..5d67ab7 100644 --- a/src/rangefilter2.rs +++ b/src/rangefilter2.rs @@ -3,13 +3,13 @@ mod test; use futures_util::Stream; use futures_util::StreamExt; +use items_0::merge::DrainIntoNewResult; +use items_0::merge::MergeableTy; use items_0::streamitem::sitem_err_from_string; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StatsItem; use items_0::streamitem::StreamItem; -use items_0::MergeError; -use items_2::merger::Mergeable; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::RangeFilterStats; @@ -25,13 +25,13 @@ macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if false && $det { trace #[derive(Debug, thiserror::Error)] #[cstm(name = "Rangefilter")] pub enum Error { - Merge(#[from] MergeError), + DrainUnclean, } pub struct RangeFilter2 where S: Stream> + Unpin, - ITY: Mergeable, + ITY: MergeableTy, { inp: S, range: NanoRange, @@ -50,7 +50,7 @@ where impl RangeFilter2 where S: Stream> + Unpin, - ITY: Mergeable, + ITY: MergeableTy, { pub fn type_name() -> &'static str { std::any::type_name::() @@ -81,45 +81,54 @@ where } } - fn prune_high(&mut self, mut item: ITY, ts: u64) -> Result { + fn prune_high(&mut self, mut item: ITY, ts: TsNano) -> Result { + let n = item.len(); let ret = match item.find_highest_index_lt(ts) { Some(ihlt) => { - let n = item.len(); if ihlt + 1 == n { // TODO gather stats, this should be the most common case. self.stats.items_no_prune_high += 1; item } else { self.stats.items_part_prune_high += 1; - let mut dummy = item.new_empty(); - match item.drain_into(&mut dummy, (ihlt + 1, n)) { - Ok(_) => {} - Err(e) => match e { - MergeError::NotCompatible => { - error!("logic error") - } - MergeError::Full => error!("full, logic error"), - }, + match item.drain_into_new(ihlt + 1..n) { + DrainIntoNewResult::Done(_) => {} + DrainIntoNewResult::Partial(_) => { + error!("full, logic error"); + } + DrainIntoNewResult::NotCompatible => { + error!("logic error"); + } } item } } None => { + // TODO should not happen often, observe. 
self.stats.items_all_prune_high += 1; - item.new_empty() + match item.drain_into_new(0..n) { + DrainIntoNewResult::Done(_) => {} + DrainIntoNewResult::Partial(_) => { + error!("full, logic error"); + } + DrainIntoNewResult::NotCompatible => { + error!("logic error"); + } + } + item } }; Ok(ret) } - fn handle_item(&mut self, item: ITY) -> Result { + fn handle_item(&mut self, item: ITY) -> Result, Error> { if let Some(ts_min) = item.ts_min() { - if ts_min < self.range.beg() { + if ts_min.ns() < self.range.beg() { debug!("ITEM BEFORE RANGE (how many?)"); } } - let min = item.ts_min().map(|x| TsNano::from_ns(x).fmt()); - let max = item.ts_max().map(|x| TsNano::from_ns(x).fmt()); + let min = item.ts_min(); + let max = item.ts_max(); trace_emit!( self.trdet, "see event len {} min {:?} max {:?}", @@ -127,62 +136,74 @@ where min, max ); - let mut item = self.prune_high(item, self.range.end)?; - let ret = if self.one_before { - let lige = item.find_lowest_index_ge(self.range.beg); + let mut item = self.prune_high(item, TsNano::from_ns(self.range.end))?; + if self.one_before { + let lige = item.find_lowest_index_ge(TsNano::from_ns(self.range.beg)); trace_emit!(self.trdet, "YES one_before_range ilge {:?}", lige); match lige { Some(lige) => { if lige == 0 { if let Some(sl1) = self.slot1.take() { self.slot1 = Some(item); - sl1 + Ok(Some(sl1)) } else { - item + Ok(Some(item)) } } else { trace_emit!(self.trdet, "discarding events len {:?}", lige - 1); - let mut dummy = item.new_empty(); - item.drain_into(&mut dummy, (0, lige - 1))?; + match item.drain_into_new(0..lige - 1) { + DrainIntoNewResult::Done(_) => {} + DrainIntoNewResult::Partial(_) => return Err(Error::DrainUnclean), + DrainIntoNewResult::NotCompatible => return Err(Error::DrainUnclean), + } self.slot1 = None; - item + Ok(Some(item)) } } None => { // TODO keep stats about this case trace_emit!(self.trdet, "drain into to keep one before"); let n = item.len(); - let mut keep = item.new_empty(); - item.drain_into(&mut keep, (n.max(1) - 1, n))?; - self.slot1 = Some(keep); - item.new_empty() + match item.drain_into_new(n.max(1) - 1..n) { + DrainIntoNewResult::Done(keep) => { + self.slot1 = Some(keep); + } + DrainIntoNewResult::Partial(_) => return Err(Error::DrainUnclean), + DrainIntoNewResult::NotCompatible => return Err(Error::DrainUnclean), + } + Ok(None) } } } else { - let lige = item.find_lowest_index_ge(self.range.beg); + let lige = item.find_lowest_index_ge(TsNano::from_ns(self.range.beg)); trace_emit!(self.trdet, "NOT one_before_range ilge {:?}", lige); match lige { Some(lige) => { - let mut dummy = item.new_empty(); - item.drain_into(&mut dummy, (0, lige))?; - item + match item.drain_into_new(0..lige) { + DrainIntoNewResult::Done(_) => {} + DrainIntoNewResult::Partial(_) => return Err(Error::DrainUnclean), + DrainIntoNewResult::NotCompatible => return Err(Error::DrainUnclean), + } + Ok(Some(item)) } None => { // TODO count case for stats - item.new_empty() + Ok(None) } } - }; - Ok(ret) + } } } impl RangeFilter2 where S: Stream> + Unpin, - ITY: Mergeable, + ITY: MergeableTy, { - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll::Item>> { + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll::Item>> { use Poll::*; loop { break if self.complete { @@ -199,25 +220,35 @@ where } else if self.inp_done { self.raco_done = true; if self.have_range_complete { - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + Ready(Some(Ok(StreamItem::DataItem( + RangeCompletableItem::RangeComplete, + 
)))) } else { continue; } } else { match self.inp.poll_next_unpin(cx) { Ready(Some(item)) => match item { - Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => match self.handle_item(item) { - Ok(item) => { - trace_emit!(self.trdet, "emit {}", TsMsVecFmt(Mergeable::tss(&item).iter())); - let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); - Ready(Some(item)) + Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => { + match self.handle_item(item) { + Ok(Some(item)) => { + trace_emit!( + self.trdet, + "emit {}", + TsMsVecFmt(MergeableTy::tss_for_testing(&item).iter()) + ); + let item = + Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))); + Ready(Some(item)) + } + Ok(None) => continue, + Err(e) => { + error!("sees: {e}"); + self.inp_done = true; + Ready(Some(sitem_err_from_string(e))) + } } - Err(e) => { - error!("sees: {e}"); - self.inp_done = true; - Ready(Some(sitem_err_from_string(e))) - } - }, + } Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => { self.have_range_complete = true; continue; @@ -227,7 +258,9 @@ where Ready(None) => { self.inp_done = true; if let Some(sl1) = self.slot1.take() { - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(sl1))))) + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data( + sl1, + ))))) } else { continue; } @@ -242,7 +275,7 @@ where impl Stream for RangeFilter2 where S: Stream> + Unpin, - ITY: Mergeable, + ITY: MergeableTy, { type Item = Sitemty; @@ -257,20 +290,11 @@ where impl fmt::Debug for RangeFilter2 where S: Stream> + Unpin, - ITY: Mergeable, + ITY: MergeableTy, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("RangeFilter2").field("stats", &self.stats).finish() - } -} - -impl Drop for RangeFilter2 -where - S: Stream> + Unpin, - ITY: Mergeable, -{ - fn drop(&mut self) { - // Self::type_name() - debug!("drop {:?}", self); + f.debug_struct("RangeFilter2") + .field("stats", &self.stats) + .finish() } } diff --git a/src/streamtimeout.rs b/src/streamtimeout.rs index a23cb26..bca8071 100644 --- a/src/streamtimeout.rs +++ b/src/streamtimeout.rs @@ -41,7 +41,10 @@ where self.timeout_fut = self.timeout_provider.timeout_intervals(ivl) } - fn handle_timeout(self: Pin<&mut Self>, cx: &mut Context) -> Poll::Item>> { + fn handle_timeout( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll::Item>> { use Poll::*; let tsnow = Instant::now(); if self.last_seen + self.ivl < tsnow { @@ -56,7 +59,10 @@ where } } - fn handle_inp_pending(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll::Item>> { + fn handle_inp_pending( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll::Item>> { use Poll::*; match self.timeout_fut.poll_unpin(cx) { Ready(()) => self.handle_timeout(cx), diff --git a/src/tcprawclient.rs b/src/tcprawclient.rs index c17b9f3..ce63b79 100644 --- a/src/tcprawclient.rs +++ b/src/tcprawclient.rs @@ -98,7 +98,13 @@ pub trait HttpSimplePost: Send { &self, req: http::Request>, ) -> Pin< - Box>> + Send>, + Box< + dyn Future< + Output = http::Response< + http_body_util::combinators::UnsyncBoxBody, + >, + > + Send, + >, >; } @@ -132,7 +138,10 @@ pub async fn x_processed_event_blobs_stream_from_node_http( let frame1 = make_node_command_frame(subq.clone())?; let item = sitem_data(frame1.clone()); let buf = item.make_frame_dyn()?.freeze(); - let url = node.baseurl().join("/api/4/private/eventdata/frames").unwrap(); + let url = node + .baseurl() + .join("/api/4/private/eventdata/frames") + .unwrap(); debug!("open_event_data_streams_http post 
{url}"); let uri: Uri = url.as_str().parse().unwrap(); let body = http_body_util::Full::new(buf); diff --git a/src/test.rs b/src/test.rs index 6a44f14..a7b2594 100644 --- a/src/test.rs +++ b/src/test.rs @@ -7,11 +7,10 @@ use futures_util::stream; use futures_util::Stream; use items_0::streamitem::sitem_data; use items_0::streamitem::Sitemty; -use items_0::Appendable; -use items_0::Empty; +use items_2::binning::container_events::ContainerEvents; use items_2::channelevents::ChannelEvents; -use items_2::eventsdim0::EventsDim0; use netpod::timeunits::SEC; +use netpod::TsNano; use std::pin::Pin; #[derive(Debug, thiserror::Error)] @@ -23,9 +22,9 @@ type BoxedEventStream = Pin> + Send // TODO use some xorshift generator. fn inmem_test_events_d0_i32_00() -> BoxedEventStream { - let mut evs = EventsDim0::empty(); - evs.push(SEC * 1, 1, 10001); - evs.push(SEC * 4, 4, 10004); + let mut evs = ContainerEvents::new(); + evs.push_back(TsNano::from_ns(SEC * 1), 10001); + evs.push_back(TsNano::from_ns(SEC * 4), 10004); let cev = ChannelEvents::Events(Box::new(evs)); let item = sitem_data(cev); let stream = stream::iter([item]); @@ -33,8 +32,8 @@ fn inmem_test_events_d0_i32_00() -> BoxedEventStream { } fn inmem_test_events_d0_i32_01() -> BoxedEventStream { - let mut evs = EventsDim0::empty(); - evs.push(SEC * 2, 2, 10002); + let mut evs = ContainerEvents::new(); + evs.push_back(TsNano::from_ns(SEC * 2), 10002); let cev = ChannelEvents::Events(Box::new(evs)); let item = sitem_data(cev); let stream = stream::iter([item]); diff --git a/src/test/events_reader.rs b/src/test/events_reader.rs index fca0527..c4132c7 100644 --- a/src/test/events_reader.rs +++ b/src/test/events_reader.rs @@ -1,6 +1,5 @@ use crate::timebin::cached::reader::EventsReadProvider; use crate::timebin::cached::reader::EventsReading; -use futures_util::StreamExt; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; @@ -24,8 +23,8 @@ impl TestEventsReader { impl EventsReadProvider for TestEventsReader { fn read(&self, evq: EventsSubQuery) -> EventsReading { - let stream = items_2::testgen::events_gen::old_events_gen_dim0_f32_v00(self.range.clone()); - let stream = stream + let iter = items_2::testgen::events_gen::new_events_gen_dim1_f32_v00(self.range.clone()); + let iter = iter .map(|x| { let x = Box::new(x); let x = ChannelEvents::Events(x); @@ -36,9 +35,9 @@ impl EventsReadProvider for TestEventsReader { use RangeCompletableItem::*; use StreamItem::*; let item1 = Ok(DataItem(RangeComplete)); - futures_util::stream::iter([item1]) + [item1].into_iter() }); - let stream = Box::pin(stream); + let stream = Box::pin(futures_util::stream::iter(iter)); let ret = EventsReading::new(stream); ret } diff --git a/src/test/timebin/fromlayers.rs b/src/test/timebin/fromlayers.rs index 0285920..c4ce2cb 100644 --- a/src/test/timebin/fromlayers.rs +++ b/src/test/timebin/fromlayers.rs @@ -88,3 +88,65 @@ fn timebin_from_layers() { .unwrap(); rt.block_on(timebin_from_layers_inner()).unwrap() } + +async fn timebin_from_layers_1layer_inner() -> Result<(), Error> { + let ctx = Arc::new(ReqCtx::for_test()); + let ch_conf = ChannelTypeConfigGen::Scylla(ChConf::new( + "testing", + 123, + SeriesKind::ChannelData, + ScalarType::F32, + Shape::Scalar, + "basictest-f32", + )); + let cache_usage = CacheUsage::Ignore; + let transform_query = TransformQuery::default_time_binned(); + let nano_range = NanoRange { + beg: 1000 * 1000 * 1000 * 1, + end: 1000 * 1000 * 1000 * 2, + }; + let cache_read_provider = 
Arc::new(DummyCacheReadProvider::new()); + let events_read_provider = Arc::new(TestEventsReader::new(nano_range.clone())); + // let one_before_range = true; + // let series_range = SeriesRange::TimeRange(nano_range.clone()); + // let select = EventsSubQuerySelect::new( + // ch_conf.clone(), + // series_range, + // one_before_range, + // transform_query.clone(), + // ); + let settings = EventsSubQuerySettings::default(); + // let reqid = ctx.reqid().into(); + let log_level = "INFO"; + // let query = EventsSubQuery::from_parts(select, settings.clone(), reqid, log_level.into()); + let bin_len_layers = [20, 100].into_iter().map(DtMs::from_ms_u64).collect(); + let do_time_weight = true; + let bin_len = DtMs::from_ms_u64(200); + let range = BinnedRange::from_nano_range(nano_range, bin_len); + let mut stream = TimeBinnedFromLayers::new( + ch_conf, + cache_usage, + transform_query, + settings, + log_level.into(), + ctx, + range, + do_time_weight, + bin_len_layers, + cache_read_provider, + events_read_provider, + )?; + while let Some(x) = stream.next().await { + let item = x.map_err(|e| Error::Msg(e.to_string()))?; + trace!("item {:?}", item); + } + Ok(()) +} + +#[test] +fn timebin_from_layers_1layer() { + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + rt.block_on(timebin_from_layers_1layer_inner()).unwrap() +} diff --git a/src/timebin/fromlayers.rs b/src/timebin/fromlayers.rs index 05bc90a..ea4b06b 100644 --- a/src/timebin/fromlayers.rs +++ b/src/timebin/fromlayers.rs @@ -1,13 +1,14 @@ use super::cached::reader::CacheReadProvider; use super::cached::reader::EventsReadProvider; +use crate::log::*; use crate::timebin::fromevents::BinnedFromEvents; +use crate::timebin::gapfill::GapFill; use crate::timebin::grid::find_next_finer_bin_len; use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::Sitemty; use items_0::timebin::BinsBoxed; use items_2::binning::timeweight::timeweight_bins_stream::BinnedBinsTimeweightStream; -use netpod::log::*; use netpod::query::CacheUsage; use netpod::range::evrange::SeriesRange; use netpod::BinnedRange; @@ -67,7 +68,7 @@ impl TimeBinnedFromLayers { let bin_len = DtMs::from_ms_u64(range.bin_len.ms()); if bin_len_layers.contains(&bin_len) { debug!("{}::new bin_len in layers {:?}", Self::type_name(), range); - let inp = super::gapfill::GapFill::new( + let inp = GapFill::new( "FromLayers-ongrid".into(), ch_conf.clone(), cache_usage.clone(), @@ -84,6 +85,11 @@ impl TimeBinnedFromLayers { let ret = Self { inp: Box::pin(inp) }; Ok(ret) } else { + debug!( + "{}::new bin_len off layers {:?}", + Self::type_name(), + range + ); match find_next_finer_bin_len(bin_len, &bin_len_layers) { Some(finer) => { if bin_len.ms() % finer.ms() != 0 { @@ -96,7 +102,7 @@ impl TimeBinnedFromLayers { finer, range_finer ); - let inp = super::gapfill::GapFill::new( + let inp = GapFill::new( "FromLayers-finergrid".into(), ch_conf.clone(), cache_usage.clone(), diff --git a/src/timebin/gapfill.rs b/src/timebin/gapfill.rs index 1a87fc3..b95b5f9 100644 --- a/src/timebin/gapfill.rs +++ b/src/timebin/gapfill.rs @@ -1,5 +1,6 @@ use super::cached::reader::CacheReadProvider; use super::cached::reader::EventsReadProvider; +use crate::log::*; use crate::timebin::fromevents::BinnedFromEvents; use futures_util::FutureExt; use futures_util::Stream; @@ -10,7 +11,6 @@ use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::timebin::BinsBoxed; use items_2::binning::timeweight::timeweight_bins_stream::BinnedBinsTimeweightStream; -use 
netpod::log::*; use netpod::query::CacheUsage; use netpod::range::evrange::NanoRange; use netpod::range::evrange::SeriesRange; diff --git a/src/timebin/timebin.rs b/src/timebin/timebin.rs index e69de29..8b13789 100644 --- a/src/timebin/timebin.rs +++ b/src/timebin/timebin.rs @@ -0,0 +1 @@ + diff --git a/src/timebinnedjson.rs b/src/timebinnedjson.rs index 9b7c794..30545a8 100644 --- a/src/timebinnedjson.rs +++ b/src/timebinnedjson.rs @@ -110,7 +110,7 @@ pub async fn timebinnable_stream_sf_databuffer_channelevents( use StreamItem::*; match k { Ok(DataItem(Data(ChannelEvents::Events(k)))) => { - let k = k.to_dim0_f32_for_binning(); + // let k = k.to_dim0_f32_for_binning(); Ok(StreamItem::DataItem(RangeCompletableItem::Data( ChannelEvents::Events(k), )))
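
A minimal usage sketch of the new container, modeled directly on the call sites in src/test.rs and src/generators.rs above. The element type of ContainerEvents is not visible in this patch (it is inferred at the call sites), so the exact generic signature below is an assumption rather than the definitive API.

// Sketch only: mirrors src/test.rs from this patch. Assumes ContainerEvents
// implements Events and infers its element type from the pushed values.
use items_0::streamitem::sitem_data;
use items_0::streamitem::Sitemty;
use items_2::binning::container_events::ContainerEvents;
use items_2::channelevents::ChannelEvents;
use netpod::timeunits::SEC;
use netpod::TsNano;

fn example_batch() -> Sitemty<ChannelEvents> {
    // Old container: EventsDim0::empty() + push(ts, pulse, value).
    // New container: no pulse column, timestamps are passed as TsNano.
    let mut evs = ContainerEvents::new();
    evs.push_back(TsNano::from_ns(SEC * 1), 10001);
    evs.push_back(TsNano::from_ns(SEC * 4), 10004);
    sitem_data(ChannelEvents::Events(Box::new(evs)))
}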
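
The range filter now drains with items_0::merge::MergeableTy and DrainIntoNewResult instead of Mergeable::drain_into and MergeError. A hedged sketch of that pattern follows; it assumes drain_into_new takes a std Range and that Done carries the drained events as a new container of the same type, as the slot1 handling in rangefilter2.rs suggests. The real trait signature may differ.

// Sketch, not the definitive trait: assumes MergeableTy (or a supertrait)
// provides len(), and that drain_into_new(&mut self, Range<usize>) returns
// the drained elements inside DrainIntoNewResult::Done.
use items_0::merge::DrainIntoNewResult;
use items_0::merge::MergeableTy;

fn split_off_last<T: MergeableTy>(item: &mut T) -> Option<T> {
    let n = item.len();
    // Drain the last event out of `item` and return it; earlier events stay
    // in `item` (mirrors the one_before / slot1 handling above).
    match item.drain_into_new(n.max(1) - 1..n) {
        DrainIntoNewResult::Done(keep) => Some(keep),
        // The patch treats these as logic errors; here they are just dropped.
        DrainIntoNewResult::Partial(_) | DrainIntoNewResult::NotCompatible => None,
    }
}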