From 8082271c2ac81656e2f19886290b844d1a968d14 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 30 Nov 2022 16:31:31 +0100 Subject: [PATCH] Refactor remote stream decode --- daqbufp2/Cargo.toml | 1 + daqbufp2/src/test.rs | 44 ++- daqbufp2/src/test/api4.rs | 1 + daqbufp2/src/test/{ => api4}/eventsjson.rs | 57 +++ items/src/frame.rs | 58 +-- items/src/items.rs | 88 ++--- items_2/src/eventsdim0.rs | 36 +- items_2/src/merger.rs | 10 +- netpod/src/api4.rs | 1 + netpod/src/api4/events.rs | 1 + netpod/src/netpod.rs | 1 + nodenet/src/conn.rs | 9 +- streams/Cargo.toml | 2 +- streams/src/frames/inmem.rs | 368 ++++++++----------- streams/src/lib.rs | 1 + streams/src/merge.rs | 4 + streams/src/slidebuf.rs | 401 +++++++++++++++++++++ 17 files changed, 780 insertions(+), 303 deletions(-) rename daqbufp2/src/test/{ => api4}/eventsjson.rs (58%) create mode 100644 netpod/src/api4.rs create mode 100644 netpod/src/api4/events.rs create mode 100644 streams/src/slidebuf.rs diff --git a/daqbufp2/Cargo.toml b/daqbufp2/Cargo.toml index 93d0bf8..916d8aa 100644 --- a/daqbufp2/Cargo.toml +++ b/daqbufp2/Cargo.toml @@ -29,6 +29,7 @@ httpret = { path = "../httpret" } httpclient = { path = "../httpclient" } disk = { path = "../disk" } items_0 = { path = "../items_0" } +items_2 = { path = "../items_2" } items = { path = "../items" } streams = { path = "../streams" } diff --git a/daqbufp2/src/test.rs b/daqbufp2/src/test.rs index 0a07c9f..99dabb1 100644 --- a/daqbufp2/src/test.rs +++ b/daqbufp2/src/test.rs @@ -8,14 +8,54 @@ pub mod binnedjson; #[cfg(test)] mod events; #[cfg(test)] -mod eventsjson; -#[cfg(test)] mod timeweightedjson; use bytes::BytesMut; use err::Error; use std::future::Future; +fn f32_iter_cmp_near(a: A, b: B) -> bool +where + A: IntoIterator, + B: IntoIterator, +{ + let mut a = a.into_iter(); + let mut b = b.into_iter(); + loop { + let x = a.next(); + let y = b.next(); + if let (Some(x), Some(y)) = (x, y) { + let x = { + let mut a = x.to_ne_bytes(); + a[0] &= 0xf0; + 
f32::from_ne_bytes(a) + }; + let y = { + let mut a = y.to_ne_bytes(); + a[0] &= 0xf0; + f32::from_ne_bytes(a) + }; + if x != y { + return false; + } + } else if x.is_some() || y.is_some() { + return false; + } else { + return true; + } + } +} + +#[test] +fn test_f32_iter_cmp_near() { + let a = [-127.553e17]; + let b = [-127.554e17]; + assert_eq!(f32_iter_cmp_near(a, b), false); + let a = [-127.55300e17]; + let b = [-127.55301e17]; + assert_eq!(f32_iter_cmp_near(a, b), true); +} + fn run_test(f: F) -> Result<(), Error> where F: Future> + Send, diff --git a/daqbufp2/src/test/api4.rs b/daqbufp2/src/test/api4.rs index e69de29..c8a9d81 100644 --- a/daqbufp2/src/test/api4.rs +++ b/daqbufp2/src/test/api4.rs @@ -0,0 +1 @@ +pub mod eventsjson; diff --git a/daqbufp2/src/test/eventsjson.rs b/daqbufp2/src/test/api4/eventsjson.rs similarity index 58% rename from daqbufp2/src/test/eventsjson.rs rename to daqbufp2/src/test/api4/eventsjson.rs index 205aa0f..f28c71f 100644 --- a/daqbufp2/src/test/eventsjson.rs +++ b/daqbufp2/src/test/api4/eventsjson.rs @@ -1,5 +1,6 @@ use crate::err::ErrConv; use crate::nodes::require_test_hosts_running; +use crate::test::f32_iter_cmp_near; use chrono::{DateTime, Utc}; use err::Error; use http::StatusCode; @@ -34,6 +35,62 @@ fn events_plain_json_00() -> Result<(), Error> { taskrun::run(fut) } +#[test] +fn events_plain_json_01() -> Result<(), Error> { + let fut = async { + let rh = require_test_hosts_running()?; + let cluster = &rh.cluster; + let jsv = events_plain_json( + Channel { + backend: "test-disk-databuffer".into(), + name: "scalar-i32-be".into(), + series: None, + }, + "1970-01-01T00:20:10.000Z", + "1970-01-01T00:20:13.000Z", + cluster, + true, + 4, + ) + .await?; + let res: items_2::eventsdim0::EventsDim0CollectorOutput = serde_json::from_value(jsv).unwrap(); + assert_eq!(res.ts_anchor_sec(), 1210); + assert_eq!(res.pulse_anchor(), 2420); + let exp = [2420., 2421., 2422., 2423., 2424., 2425.]; + 
assert_eq!(f32_iter_cmp_near(res.values_to_f32(), exp), true); + assert_eq!(res.range_complete(), true); + assert_eq!(res.timed_out(), false); + Ok(()) + }; + taskrun::run(fut) +} + +#[test] +fn events_plain_json_02_range_incomplete() -> Result<(), Error> { + let fut = async { + let rh = require_test_hosts_running()?; + let cluster = &rh.cluster; + let jsv = events_plain_json( + Channel { + backend: "test-disk-databuffer".into(), + name: "scalar-i32-be".into(), + series: None, + }, + "1970-01-03T23:59:55.000Z", + "1970-01-04T00:00:01.000Z", + cluster, + true, + 4, + ) + .await?; + let res: items_2::eventsdim0::EventsDim0CollectorOutput = serde_json::from_value(jsv).unwrap(); + assert_eq!(res.range_complete(), false); + assert_eq!(res.timed_out(), false); + Ok(()) + }; + taskrun::run(fut) +} + // TODO improve by a more information-rich return type. async fn events_plain_json( channel: Channel, diff --git a/items/src/frame.rs b/items/src/frame.rs index f820f64..2a456fe 100644 --- a/items/src/frame.rs +++ b/items/src/frame.rs @@ -2,9 +2,8 @@ use crate::inmem::InMemoryFrame; use crate::{ContainsError, FrameDecodable, FrameType, LogItem, StatsItem}; use crate::{ERROR_FRAME_TYPE_ID, INMEM_FRAME_ENCID, INMEM_FRAME_HEAD, INMEM_FRAME_MAGIC}; use crate::{LOG_FRAME_TYPE_ID, RANGE_COMPLETE_FRAME_TYPE_ID, STATS_FRAME_TYPE_ID, TERM_FRAME_TYPE_ID}; -use bincode::config::{ - FixintEncoding, LittleEndian, RejectTrailing, WithOtherEndian, WithOtherIntEncoding, WithOtherTrailing, -}; +use bincode::config::{FixintEncoding, LittleEndian, RejectTrailing}; +use bincode::config::{WithOtherEndian, WithOtherIntEncoding, WithOtherTrailing}; use bincode::DefaultOptions; use bytes::{BufMut, BytesMut}; use err::Error; @@ -83,15 +82,35 @@ where ::deserialize(&mut de).map_err(|e| format!("{e}").into()) } +pub fn encode_to_vec(item: S) -> Result, Error> +where + S: Serialize, +{ + serde_json::to_vec(&item).map_err(|e| e.into()) +} + +pub fn decode_from_slice(buf: &[u8]) -> Result +where + T: 
for<'de> serde::Deserialize<'de>, +{ + serde_json::from_slice(buf).map_err(|e| e.into()) +} + pub fn make_frame_2(item: &T, fty: u32) -> Result where T: erased_serde::Serialize, { trace!("make_frame_2 fty {:x}", fty); let mut out = Vec::new(); - let mut ser = bincode_ser(&mut out); //let mut ser = rmp_serde::Serializer::new(&mut out).with_struct_map(); //let writer = ciborium::ser::into_writer(&item, &mut out).unwrap(); + #[cfg(DIS)] + let ser2 = { + let mut ser = bincode_ser(&mut out); + let mut ser2 = ::erase(&mut ser); + let _ = ser2; + }; + let mut ser = serde_json::Serializer::new(&mut out); let mut ser2 = ::erase(&mut ser); match item.erased_serialize(&mut ser2) { Ok(_) => { @@ -126,7 +145,7 @@ where // TODO remove duplication for these similar `make_*_frame` functions: pub fn make_error_frame(error: &::err::Error) -> Result { - match bincode_to_vec(error) { + match encode_to_vec(error) { Ok(enc) => { let mut h = crc32fast::Hasher::new(); h.update(&enc); @@ -155,7 +174,7 @@ pub fn make_error_frame(error: &::err::Error) -> Result { // TODO can I remove this usage? 
pub fn make_log_frame(item: &LogItem) -> Result { warn!("make_log_frame {item:?}"); - match bincode_to_vec(item) { + match encode_to_vec(item) { Ok(enc) => { let mut h = crc32fast::Hasher::new(); h.update(&enc); @@ -180,7 +199,7 @@ pub fn make_log_frame(item: &LogItem) -> Result { } pub fn make_stats_frame(item: &StatsItem) -> Result { - match bincode_to_vec(item) { + match encode_to_vec(item) { Ok(enc) => { let mut h = crc32fast::Hasher::new(); h.update(&enc); @@ -204,6 +223,7 @@ pub fn make_stats_frame(item: &StatsItem) -> Result { } pub fn make_range_complete_frame() -> Result { + warn!("make_range_complete_frame"); let enc = []; let mut h = crc32fast::Hasher::new(); h.update(&enc); @@ -259,7 +279,7 @@ where ))); } if frame.tyid() == ERROR_FRAME_TYPE_ID { - let k: ::err::Error = match bincode_from_slice(frame.buf()) { + let k: ::err::Error = match decode_from_slice(frame.buf()) { Ok(item) => item, Err(e) => { error!("ERROR deserialize len {} ERROR_FRAME_TYPE_ID", frame.buf().len()); @@ -271,7 +291,7 @@ where }; Ok(T::from_error(k)) } else if frame.tyid() == LOG_FRAME_TYPE_ID { - let k: LogItem = match bincode_from_slice(frame.buf()) { + let k: LogItem = match decode_from_slice(frame.buf()) { Ok(item) => item, Err(e) => { error!("ERROR deserialize len {} LOG_FRAME_TYPE_ID", frame.buf().len()); @@ -282,23 +302,8 @@ where } }; Ok(T::from_log(k)) - } else if frame.tyid() == LOG_FRAME_TYPE_ID { - let _: crate::Sitemty<()> = match bincode_from_slice(frame.buf()) { - Ok(item) => { - error!("GOOD DECODE OF A FULL LOG FRAME SITEMTY {item:?}"); - item - } - Err(e) => { - error!("ERROR deserialize len {} LOG_FRAME_TYPE_ID", frame.buf().len()); - let n = frame.buf().len().min(128); - let s = String::from_utf8_lossy(&frame.buf()[..n]); - error!("frame.buf as string: {:?}", s); - Err(e)? 
- } - }; - Err(Error::with_msg_no_trace("BAD")) } else if frame.tyid() == STATS_FRAME_TYPE_ID { - let k: StatsItem = match bincode_from_slice(frame.buf()) { + let k: StatsItem = match decode_from_slice(frame.buf()) { Ok(item) => item, Err(e) => { error!("ERROR deserialize len {} STATS_FRAME_TYPE_ID", frame.buf().len()); @@ -310,6 +315,7 @@ where }; Ok(T::from_stats(k)) } else if frame.tyid() == RANGE_COMPLETE_FRAME_TYPE_ID { + warn!("decode_frame SEE RANGE COMPLETE FRAME TYPE"); // There is currently no content in this variant. Ok(T::from_range_complete()) } else { @@ -322,7 +328,7 @@ where frame ))) } else { - match bincode_from_slice(frame.buf()) { + match decode_from_slice(frame.buf()) { Ok(item) => Ok(item), Err(e) => { error!("ERROR deserialize len {} tyid {:x}", frame.buf().len(), frame.tyid()); diff --git a/items/src/items.rs b/items/src/items.rs index c4c05a1..2c652c473 100644 --- a/items/src/items.rs +++ b/items/src/items.rs @@ -24,7 +24,7 @@ use netpod::log::*; use netpod::timeunits::{MS, SEC}; use netpod::{log::Level, AggKind, EventDataReadStats, NanoRange, Shape}; use netpod::{DiskStats, RangeFilterStats, ScalarType}; -use serde::de::{self, DeserializeOwned, Visitor}; +use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize, Serializer}; use std::any::Any; use std::collections::VecDeque; @@ -99,6 +99,15 @@ impl LogItem { pub type Sitemty = Result>, Error>; +#[macro_export] +macro_rules! 
on_sitemty_range_complete { + ($item:expr, $ex:expr) => { + if let Ok($crate::StreamItem::DataItem($crate::RangeCompletableItem::RangeComplete)) = $item { + $ex + } + }; +} + impl FrameType for Sitemty where T: FrameType, @@ -122,29 +131,13 @@ pub fn sitem_data(x: X) -> Sitemty { Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))) } -struct VisitLevel; - -impl<'de> Visitor<'de> for VisitLevel { - type Value = u32; - - fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "expect u32 Level code") - } - - fn visit_u32(self, v: u32) -> Result - where - E: de::Error, - { - Ok(v) - } -} - mod levelserde { use super::Level; - use super::VisitLevel; + use serde::de::{self, Visitor}; use serde::{Deserializer, Serializer}; + use std::fmt; - pub fn serialize(t: &Level, s: S) -> Result + pub fn serialize(t: &Level, se: S) -> Result where S: Serializer, { @@ -155,32 +148,44 @@ mod levelserde { Level::DEBUG => 4, Level::TRACE => 5, }; - s.serialize_u32(g) + se.serialize_u32(g) } - pub fn deserialize<'de, D>(d: D) -> Result + struct VisitLevel; + + impl VisitLevel { + fn from_u32(x: u32) -> Level { + match x { + 1 => Level::ERROR, + 2 => Level::WARN, + 3 => Level::INFO, + 4 => Level::DEBUG, + 5 => Level::TRACE, + _ => Level::TRACE, + } + } + } + + impl<'de> Visitor<'de> for VisitLevel { + type Value = Level; + + fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "expect Level code") + } + + fn visit_u64(self, val: u64) -> Result + where + E: de::Error, + { + Ok(VisitLevel::from_u32(val as _)) + } + } + + pub fn deserialize<'de, D>(de: D) -> Result where D: Deserializer<'de>, { - match d.deserialize_u32(VisitLevel) { - Ok(level) => { - let g = if level == 1 { - Level::ERROR - } else if level == 2 { - Level::WARN - } else if level == 3 { - Level::INFO - } else if level == 4 { - Level::DEBUG - } else if level == 5 { - Level::TRACE - } else { - Level::TRACE - }; - Ok(g) - } - Err(e) => Err(e), - } + de.deserialize_u32(VisitLevel) 
} } @@ -302,6 +307,7 @@ where T: Sized + serde::Serialize + FrameType, { fn make_frame(&self) -> Result { + info!("-------- make_frame for Sitemty"); match self { Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) => { let frame_type_id = k.frame_type_id(); diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index 06acb91..76afccb 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -145,7 +145,7 @@ impl WithLen for EventsDim0Collector { } } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct EventsDim0CollectorOutput { #[serde(rename = "tsAnchor")] ts_anchor_sec: u64, @@ -157,10 +157,11 @@ pub struct EventsDim0CollectorOutput { pulse_anchor: u64, #[serde(rename = "pulseOff")] pulse_off: VecDeque, + #[serde(rename = "values")] values: VecDeque, - #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")] + #[serde(rename = "finalisedRange", default, skip_serializing_if = "crate::bool_is_false")] range_complete: bool, - #[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")] + #[serde(rename = "timedOut", default, skip_serializing_if = "crate::bool_is_false")] timed_out: bool, } @@ -168,6 +169,35 @@ impl EventsDim0CollectorOutput { pub fn len(&self) -> usize { self.values.len() } + + pub fn ts_anchor_sec(&self) -> u64 { + self.ts_anchor_sec + } + + pub fn ts_off_ms(&self) -> &VecDeque { + &self.ts_off_ms + } + + pub fn pulse_anchor(&self) -> u64 { + self.pulse_anchor + } + + pub fn pulse_off(&self) -> &VecDeque { + &self.pulse_off + } + + /// Note: only used for unit tests. 
+ pub fn values_to_f32(&self) -> VecDeque { + self.values.iter().map(|x| x.as_prim_f32_b()).collect() + } + + pub fn range_complete(&self) -> bool { + self.range_complete + } + + pub fn timed_out(&self) -> bool { + self.timed_out + } } impl items_0::AsAnyRef for EventsDim0CollectorOutput { diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs index 1106300..0f26efd 100644 --- a/items_2/src/merger.rs +++ b/items_2/src/merger.rs @@ -10,19 +10,19 @@ use std::task::{Context, Poll}; #[allow(unused)] macro_rules! trace2 { - ($($arg:tt)*) => (); + (D$($arg:tt)*) => (); ($($arg:tt)*) => (eprintln!($($arg)*)); } #[allow(unused)] macro_rules! trace3 { - ($($arg:tt)*) => (); + (D$($arg:tt)*) => (); ($($arg:tt)*) => (eprintln!($($arg)*)); } #[allow(unused)] macro_rules! trace4 { - ($($arg:tt)*) => (); + (D$($arg:tt)*) => (); ($($arg:tt)*) => (eprintln!($($arg)*)); } @@ -332,9 +332,9 @@ where use ControlFlow::*; use Poll::*; match Self::refill(Pin::new(&mut self), cx) { - Break(Ready(e)) => Break(Ready(Some(Err(e)))), - Break(Pending) => Self::poll3(self, cx, true), Continue(()) => Self::poll3(self, cx, false), + Break(Pending) => Self::poll3(self, cx, true), + Break(Ready(e)) => Break(Ready(Some(Err(e)))), } } } diff --git a/netpod/src/api4.rs b/netpod/src/api4.rs new file mode 100644 index 0000000..a9970c2 --- /dev/null +++ b/netpod/src/api4.rs @@ -0,0 +1 @@ +pub mod events; diff --git a/netpod/src/api4/events.rs b/netpod/src/api4/events.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/netpod/src/api4/events.rs @@ -0,0 +1 @@ + diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index e351880..9ad22b2 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -2,6 +2,7 @@ pub mod histo; pub mod query; pub mod status; pub mod streamext; +pub mod api4; use crate::log::*; use bytes::Bytes; diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index b61ef3b..bcc16ff 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -107,8 +107,7 @@ 
async fn events_conn_handler_inner_try( return Err((e, netout).into()); } - let mut p1: Pin> + Send>> = if evq.channel().backend() == "test-inmem" - { + let p1: Pin> + Send>> = if evq.channel().backend() == "test-inmem" { warn!("TEST BACKEND DATA"); use netpod::timeunits::MS; let node_count = node_config.node_config.cluster.nodes.len(); @@ -215,6 +214,12 @@ async fn events_conn_handler_inner_try( }; stream }; + + let p1 = p1.inspect(|x| { + items::on_sitemty_range_complete!(x, warn!("GOOD ----------- SEE RangeComplete in conn.rs")); + }); + + let mut p1 = p1; let mut buf_len_histo = HistoLog2::new(5); while let Some(item) = p1.next().await { let item = item.make_frame(); diff --git a/streams/Cargo.toml b/streams/Cargo.toml index 5a6979f..437710c 100644 --- a/streams/Cargo.toml +++ b/streams/Cargo.toml @@ -12,7 +12,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_cbor = "0.11.1" erased-serde = "0.3.23" -bytes = "1.0.1" +bytes = "1.3" arrayref = "0.3.6" crc32fast = "1.3.2" byteorder = "1.4.3" diff --git a/streams/src/frames/inmem.rs b/streams/src/frames/inmem.rs index 18a6b5b..96d667d 100644 --- a/streams/src/frames/inmem.rs +++ b/streams/src/frames/inmem.rs @@ -1,4 +1,5 @@ -use bytes::{BufMut, BytesMut}; +use crate::slidebuf::SlideBuf; +use bytes::Bytes; use err::Error; use futures_util::{pin_mut, Stream}; use items::inmem::InMemoryFrame; @@ -9,23 +10,26 @@ use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, ReadBuf}; -/** -Interprets a byte stream as length-delimited frames. +impl err::ToErr for crate::slidebuf::Error { + fn to_err(self) -> Error { + Error::with_msg_no_trace(format!("{self}")) + } +} -Emits each frame as a single item. Therefore, each item must fit easily into memory. -*/ +/// Interprets a byte stream as length-delimited frames. +/// +/// Emits each frame as a single item. Therefore, each item must fit easily into memory. 
pub struct InMemoryFrameAsyncReadStream where T: AsyncRead + Unpin, { inp: T, - buf: BytesMut, - bufcap: usize, - wp: usize, - tryparse: bool, - errored: bool, - completed: bool, + buf: SlideBuf, + need_min: usize, + done: bool, + complete: bool, inp_bytes_consumed: u64, + npoll: u64, } impl InMemoryFrameAsyncReadStream @@ -33,182 +37,110 @@ where T: AsyncRead + Unpin, { pub fn new(inp: T, bufcap: usize) -> Self { - let mut t = Self { + Self { inp, - buf: BytesMut::new(), - bufcap, - wp: 0, - tryparse: false, - errored: false, - completed: false, + buf: SlideBuf::new(bufcap), + need_min: INMEM_FRAME_HEAD, + done: false, + complete: false, inp_bytes_consumed: 0, - }; - t.buf = t.empty_buf(); - t - } - - fn empty_buf(&self) -> BytesMut { - let mut buf = BytesMut::with_capacity(self.bufcap); - buf.resize(buf.capacity(), 0); - buf + npoll: 0, + } } fn poll_upstream(&mut self, cx: &mut Context) -> Poll> { - if true || self.wp > 0 { - let mut bnew = self.empty_buf(); - assert!(self.buf.len() >= self.wp); - assert!(bnew.capacity() >= self.wp); - trace!( - "InMemoryFrameAsyncReadStream re-use {} bytes from previous i/o", - self.wp, - ); - bnew[..].as_mut().put_slice(&self.buf[..self.wp]); - self.buf = bnew; - } - trace!("prepare read from wp {} self.buf.len() {}", self.wp, self.buf.len(),); - let gg = self.buf.len() - self.wp; - let mut buf2 = ReadBuf::new(&mut self.buf[self.wp..]); - if gg < 1 || gg > 1024 * 1024 * 40 { - use bytes::Buf; - panic!( - "have gg {} len {} cap {} rem {} rem mut {} self.wp {}", - gg, - self.buf.len(), - self.buf.capacity(), - self.buf.remaining(), - self.buf.remaining_mut(), - self.wp, - ); - } - assert!(buf2.remaining() == gg); - assert!(buf2.capacity() == gg); - assert!(buf2.filled().len() == 0); - let j = &mut self.inp; - pin_mut!(j); + trace!("poll_upstream"); use Poll::*; - match AsyncRead::poll_read(j, cx, &mut buf2) { - Ready(Ok(_)) => { - let n1 = buf2.filled().len(); - trace!("InMemoryFrameAsyncReadStream READ {} FROM UPSTREAM", n1); - 
Ready(Ok(n1)) + let mut buf = ReadBuf::new(self.buf.available_writable_area(self.need_min - self.buf.len())?); + let inp = &mut self.inp; + pin_mut!(inp); + match AsyncRead::poll_read(inp, cx, &mut buf) { + Ready(Ok(())) => { + let n = buf.filled().len(); + self.buf.wadv(n)?; + trace!("InMemoryFrameAsyncReadStream READ {} FROM UPSTREAM", n); + Ready(Ok(n)) } Ready(Err(e)) => Ready(Err(e.into())), Pending => Pending, } } - fn tryparse( - &mut self, - buf: BytesMut, - wp: usize, - ) -> (Option>>, BytesMut, usize) { - let nb = wp; - if nb >= INMEM_FRAME_HEAD { - let magic = u32::from_le_bytes(*arrayref::array_ref![buf, 0, 4]); - let encid = u32::from_le_bytes(*arrayref::array_ref![buf, 4, 4]); - let tyid = u32::from_le_bytes(*arrayref::array_ref![buf, 8, 4]); - let len = u32::from_le_bytes(*arrayref::array_ref![buf, 12, 4]); - if magic != INMEM_FRAME_MAGIC { - let z = nb.min(256); - let u = String::from_utf8_lossy(&buf[0..z]); - let e = Error::with_msg("INCORRECT MAGIC"); - error!("incorrect magic buf as utf8: {:?} error: {:?}", u, e); - let msg = format!( - "InMemoryFrameAsyncReadStream tryparse incorrect magic: {} buf as utf8: {:?}", - magic, u - ); - error!("{}", msg); - return (Some(Some(Err(Error::with_msg(format!("{}", msg))))), buf, wp); - } - if len == 0 { - if nb != INMEM_FRAME_HEAD + INMEM_FRAME_FOOT { - warn!("stop-frame with nb {}", nb); - } - (Some(None), buf, wp) - } else { - if len > 1024 * 1024 * 50 { - let msg = format!( - "InMemoryFrameAsyncReadStream tryparse huge buffer len {} self.inp_bytes_consumed {}", - len, self.inp_bytes_consumed - ); - error!("{}", msg); - return (Some(Some(Err(Error::with_msg(msg)))), buf, wp); - } else if len > 1024 * 1024 * 1 { - // TODO - //warn!("InMemoryFrameAsyncReadStream big len received {}", len); - } - let nl = len as usize + INMEM_FRAME_HEAD + INMEM_FRAME_FOOT; - if self.bufcap < nl { - // TODO count cases in production - let n = 2 * nl; - debug!("Adjust bufcap old {} new {}", self.bufcap, n); - self.bufcap = n; 
- } - if nb < nl { - (None, buf, wp) - } else { - use bytes::Buf; - let mut h = crc32fast::Hasher::new(); - h.update(&buf[..(nl - INMEM_FRAME_FOOT)]); - let frame_crc = h.finalize(); - let mut h = crc32fast::Hasher::new(); - h.update(&buf[INMEM_FRAME_HEAD..(nl - INMEM_FRAME_FOOT)]); - let payload_crc = h.finalize(); - let frame_crc_ind = - u32::from_le_bytes(*arrayref::array_ref![buf, INMEM_FRAME_HEAD + len as usize, 4]); - let payload_crc_ind = u32::from_le_bytes(*arrayref::array_ref![buf, 16, 4]); - //info!("len {}", len); - //info!("payload_crc_ind {}", payload_crc_ind); - //info!("frame_crc_ind {}", frame_crc_ind); - let payload_crc_match = payload_crc_ind == payload_crc; - let frame_crc_match = frame_crc_ind == frame_crc; - if !frame_crc_match || !payload_crc_match { - let ss = String::from_utf8_lossy(&buf[..buf.len().min(256)]); - warn!("CRC mismatch A frame_crc_match {frame_crc_match} payload_crc_match {payload_crc_match}\n{ss:?}"); - return ( - Some(Some(Err(Error::with_msg(format!( - "InMemoryFrameAsyncReadStream tryparse crc mismatch A {} {}", - payload_crc_match, frame_crc_match, - ))))), - buf, - wp, - ); - } - let mut buf = buf; - let mut buf3 = buf.split_to(nl); - let buf = buf; - buf3.advance(INMEM_FRAME_HEAD); - buf3.truncate(len as usize); - let mut h = crc32fast::Hasher::new(); - h.update(&buf3); - let payload_crc_2 = h.finalize(); - let payload_crc_2_match = payload_crc_2 == payload_crc_ind; - if !payload_crc_2_match { - let sa = String::from_utf8_lossy(&buf[..buf.len().min(256)]); - let sb = String::from_utf8_lossy(&buf3[..buf3.len().min(256)]); - warn!("CRC mismatch B\n{sa:?}\n{sb:?}"); - return ( - Some(Some(Err(Error::with_msg(format!( - "InMemoryFrameAsyncReadStream tryparse crc mismatch B {} {} {}", - payload_crc_match, frame_crc_match, payload_crc_2_match, - ))))), - buf, - wp, - ); - } - self.inp_bytes_consumed += nl as u64; - let ret = InMemoryFrame { - len, - tyid, - encid, - buf: buf3.freeze(), - }; - (Some(Some(Ok(ret))), buf, wp - 
nl) - } - } - } else { - (None, buf, wp) + // Try to parse a frame. + // Update the need_min to the most current state. + // If successful, return item and number of bytes consumed. + // Must only be called when at least `need_min` bytes are available. + fn parse(&mut self) -> Result, Error> { + trace!("parse"); + let buf = self.buf.data(); + if buf.len() < self.need_min { + return Err(Error::with_msg_no_trace("expect at least need_min")); } + if buf.len() < INMEM_FRAME_HEAD { + return Err(Error::with_msg_no_trace("expect at least enough bytes for the header")); + } + let magic = u32::from_le_bytes(buf[0..4].try_into()?); + let encid = u32::from_le_bytes(buf[4..8].try_into()?); + let tyid = u32::from_le_bytes(buf[8..12].try_into()?); + let len = u32::from_le_bytes(buf[12..16].try_into()?); + let payload_crc_exp = u32::from_le_bytes(buf[16..20].try_into()?); + if magic != INMEM_FRAME_MAGIC { + let n = buf.len().min(64); + let u = String::from_utf8_lossy(&buf[0..n]); + let msg = format!( + "InMemoryFrameAsyncReadStream tryparse incorrect magic: {} buf as utf8: {:?}", + magic, u + ); + error!("{msg}"); + return Err(Error::with_msg(msg)); + } + if len > 1024 * 1024 * 50 { + let msg = format!( + "InMemoryFrameAsyncReadStream tryparse huge buffer len {} self.inp_bytes_consumed {}", + len, self.inp_bytes_consumed + ); + error!("{msg}"); + return Err(Error::with_msg(msg)); + } + let lentot = INMEM_FRAME_HEAD + INMEM_FRAME_FOOT + len as usize; + if buf.len() < lentot { + // TODO count cases in production + self.need_min = lentot; + return Ok(None); + } + let p1 = INMEM_FRAME_HEAD + len as usize; + let mut h = crc32fast::Hasher::new(); + h.update(&buf[..p1]); + let frame_crc = h.finalize(); + let mut h = crc32fast::Hasher::new(); + h.update(&buf[INMEM_FRAME_HEAD..p1]); + let payload_crc = h.finalize(); + let frame_crc_ind = u32::from_le_bytes(buf[p1..p1 + 4].try_into()?); + //info!("len {}", len); + //info!("payload_crc_ind {}", payload_crc_ind); + //info!("frame_crc_ind 
{}", frame_crc_ind); + let payload_crc_match = payload_crc_exp == payload_crc; + let frame_crc_match = frame_crc_ind == frame_crc; + if !frame_crc_match || !payload_crc_match { + let _ss = String::from_utf8_lossy(&buf[..buf.len().min(256)]); + let msg = format!( + "InMemoryFrameAsyncReadStream tryparse crc mismatch A {} {}", + payload_crc_match, frame_crc_match, + ); + error!("{msg}"); + let e = Error::with_msg_no_trace(msg); + return Err(e); + } + self.inp_bytes_consumed += lentot as u64; + let ret = InMemoryFrame { + len, + tyid, + encid, + buf: Bytes::from(buf[INMEM_FRAME_HEAD..p1].to_vec()), + }; + self.buf.adv(lentot)?; + self.need_min = INMEM_FRAME_HEAD; + Ok(Some(ret)) } } @@ -220,67 +152,57 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - assert!(!self.completed); - if self.errored { - self.completed = true; - return Ready(None); + trace!("poll"); + self.npoll += 1; + if self.npoll > 2000 { + panic!() } - 'outer: loop { - if self.tryparse { - let r = { - let buf = std::mem::replace(&mut self.buf, BytesMut::new()); - let wp = self.wp; - let (r, buf, wp) = self.tryparse(buf, wp); - self.buf = buf; - self.wp = wp; - r - }; - break match r { - None => { - self.tryparse = false; - continue 'outer; + loop { + break if self.complete { + panic!("poll_next on complete") + } else if self.done { + self.complete = true; + Ready(None) + } else if self.buf.len() >= self.need_min { + match self.parse() { + Ok(None) => { + if self.buf.len() >= self.need_min { + self.done = true; + let e = Error::with_msg_no_trace("enough bytes but nothing parsed"); + Ready(Some(Err(e))) + } else { + debug!("not enouh for parse, need to wait for more"); + continue; + } } - Some(None) => { - self.tryparse = false; - self.completed = true; - Ready(None) - } - Some(Some(Ok(item))) => Ready(Some(Ok(StreamItem::DataItem(item)))), - Some(Some(Err(e))) => { - self.tryparse = false; - self.errored = true; + Ok(Some(item)) => 
Ready(Some(Ok(StreamItem::DataItem(item)))), + Err(e) => { + self.done = true; Ready(Some(Err(e))) } - }; + } } else { - let r = self.poll_upstream(cx); - break match r { + match self.poll_upstream(cx) { Ready(Ok(n1)) => { - self.wp += n1; + debug!("read {n1}"); if n1 == 0 { - let n2 = self.buf.len(); - if n2 != 0 { - // TODO anything more to handle here? - debug!( - "InMemoryFrameAsyncReadStream n2 != 0 n2 {} consumed {}", - n2, self.inp_bytes_consumed - ); - } - self.completed = true; - Ready(None) + self.done = true; + continue; } else { - self.tryparse = true; - continue 'outer; + continue; } } Ready(Err(e)) => { - trace!("poll_upstream GIVES Error"); - self.errored = true; - Ready(Some(Err(e.into()))) + error!("poll_upstream {e:?}"); + self.done = true; + Ready(Some(Err(e))) } - Pending => Pending, - }; - } + Pending => { + debug!("PENDING"); + Pending + } + } + }; } } } diff --git a/streams/src/lib.rs b/streams/src/lib.rs index 3203392..704d06b 100644 --- a/streams/src/lib.rs +++ b/streams/src/lib.rs @@ -7,6 +7,7 @@ pub mod merge; pub mod needminbuffer; pub mod plaineventsjson; pub mod rangefilter; +pub mod slidebuf; pub mod tcprawclient; #[cfg(test)] pub mod test; diff --git a/streams/src/merge.rs b/streams/src/merge.rs index 2a79b45..dae4cbe 100644 --- a/streams/src/merge.rs +++ b/streams/src/merge.rs @@ -4,6 +4,7 @@ use crate::frames::eventsfromframes::EventsFromFrames; use crate::frames::inmem::InMemoryFrameAsyncReadStream; use err::Error; use futures_util::Stream; +use futures_util::StreamExt; use items::frame::make_frame; use items::frame::make_term_frame; use items::sitem_data; @@ -41,6 +42,9 @@ where // TODO for images, we need larger buffer capacity let frames = InMemoryFrameAsyncReadStream::new(netin, 1024 * 128); let stream = EventsFromFrames::<_, T>::new(frames); + let stream = stream.inspect(|x| { + items::on_sitemty_range_complete!(x, warn!("RangeComplete SEEN IN RECEIVED TCP STREAM")); + }); streams.push(Box::pin(stream) as _); } Ok(streams) 
// streams/src/slidebuf.rs
// (patch metadata: new file mode 100644, index 0000000..135d6ce)

use std::fmt;

/// Errors reported by [`SlideBuf`] operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Error {
    /// Fewer readable bytes are buffered than the operation requires.
    NotEnoughBytes,
    /// Less writable space is available than the operation requires.
    NotEnoughSpace,
    /// Produced by the `From<std::array::TryFromSliceError>` conversion.
    TryFromSliceError,
}

impl fmt::Display for Error {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The display form is intentionally the same as the debug form.
        write!(fmt, "{:?}", self)
    }
}

impl std::error::Error for Error {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        None
    }
}

impl From<std::array::TryFromSliceError> for Error {
    fn from(_: std::array::TryFromSliceError) -> Self {
        Self::TryFromSliceError
    }
}

/// Fixed-capacity byte buffer with a read pointer (`rp`) and a write pointer (`wp`).
///
/// The readable data is `buf[rp..wp]`, the writable space is `buf[wp..]`.
/// Reading advances `rp`, writing advances `wp`. The backing storage never
/// grows; instead, write operations move the unread bytes back to the start
/// of the storage when that is needed to make room.
///
/// Invariant: `rp <= wp <= buf.len()`.
pub struct SlideBuf {
    buf: Vec<u8>,
    wp: usize,
    rp: usize,
}

impl SlideBuf {
    /// Creates an empty buffer with `cap` bytes of zero-initialized storage.
    pub fn new(cap: usize) -> Self {
        Self {
            buf: vec![0; cap],
            wp: 0,
            rp: 0,
        }
    }

    /// Returns the current `(read pointer, write pointer)` pair.
    pub fn state(&self) -> (usize, usize) {
        (self.rp, self.wp)
    }

    /// Number of bytes that are currently readable.
    pub fn len(&self) -> usize {
        self.check_invariants();
        self.wp - self.rp
    }

    /// Returns `true` when no readable bytes are buffered.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Total size of the backing storage in bytes.
    #[inline]
    pub fn cap(&self) -> usize {
        self.check_invariants();
        self.buf.len()
    }

    /// Number of bytes that can be written without moving existing data.
    pub fn wcap(&self) -> usize {
        self.check_invariants();
        self.buf.len() - self.wp
    }

    /// The readable bytes.
    pub fn data(&self) -> &[u8] {
        self.check_invariants();
        &self.buf[self.rp..self.wp]
    }

    /// The readable bytes, mutably.
    pub fn data_mut(&mut self) -> &mut [u8] {
        self.check_invariants();
        &mut self.buf[self.rp..self.wp]
    }

    /// Discards all buffered data by moving both pointers to the start.
    pub fn reset(&mut self) {
        self.rp = 0;
        self.wp = 0;
    }

    /// Advances the read pointer by `x` bytes.
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughBytes`] if fewer than `x` bytes are readable.
    pub fn adv(&mut self, x: usize) -> Result<(), Error> {
        if self.len() < x {
            return Err(Error::NotEnoughBytes);
        }
        self.rp += x;
        Ok(())
    }

    /// Advances the write pointer by `x` bytes, making them readable.
    ///
    /// Meant to be used after filling the slice obtained from
    /// [`SlideBuf::available_writable_area`].
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughSpace`] if fewer than `x` bytes are writable.
    pub fn wadv(&mut self, x: usize) -> Result<(), Error> {
        if self.wcap() < x {
            return Err(Error::NotEnoughSpace);
        }
        self.wp += x;
        Ok(())
    }

    /// Current read pointer.
    pub fn rp(&self) -> usize {
        self.rp
    }

    /// Sets the read pointer to the absolute position `rp`.
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughBytes`] if `rp` lies beyond the write pointer.
    pub fn set_rp(&mut self, rp: usize) -> Result<(), Error> {
        self.check_invariants();
        if rp > self.wp {
            return Err(Error::NotEnoughBytes);
        }
        self.rp = rp;
        Ok(())
    }

    /// Moves the read pointer back by `n` bytes.
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughBytes`] if the read pointer is less than `n`.
    pub fn rewind_rp(&mut self, n: usize) -> Result<(), Error> {
        self.check_invariants();
        if self.rp < n {
            return Err(Error::NotEnoughBytes);
        }
        self.rp -= n;
        Ok(())
    }

    /// Consumes exactly `N` readable bytes and returns them as an array.
    ///
    /// Shared implementation of all fixed-width `read_*` methods.
    fn read_array<const N: usize>(&mut self) -> Result<[u8; N], Error> {
        if self.len() < N {
            return Err(Error::NotEnoughBytes);
        }
        let mut bytes = [0u8; N];
        bytes.copy_from_slice(&self.buf[self.rp..self.rp + N]);
        self.rp += N;
        Ok(bytes)
    }

    /// Reads one byte.
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughBytes`] if the buffer is empty.
    pub fn read_u8(&mut self) -> Result<u8, Error> {
        Ok(u8::from_be_bytes(self.read_array()?))
    }

    /// Reads a big-endian `u16`.
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughBytes`] if fewer than 2 bytes are readable.
    pub fn read_u16_be(&mut self) -> Result<u16, Error> {
        Ok(u16::from_be_bytes(self.read_array()?))
    }

    /// Reads a big-endian `u32`.
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughBytes`] if fewer than 4 bytes are readable.
    pub fn read_u32_be(&mut self) -> Result<u32, Error> {
        Ok(u32::from_be_bytes(self.read_array()?))
    }

    /// Reads a big-endian `u64`.
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughBytes`] if fewer than 8 bytes are readable.
    pub fn read_u64_be(&mut self) -> Result<u64, Error> {
        Ok(u64::from_be_bytes(self.read_array()?))
    }

    /// Reads a big-endian `i32`.
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughBytes`] if fewer than 4 bytes are readable.
    pub fn read_i32_be(&mut self) -> Result<i32, Error> {
        Ok(i32::from_be_bytes(self.read_array()?))
    }

    /// Reads a big-endian `i64`.
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughBytes`] if fewer than 8 bytes are readable.
    pub fn read_i64_be(&mut self) -> Result<i64, Error> {
        Ok(i64::from_be_bytes(self.read_array()?))
    }

    /// Reads a big-endian `f32`.
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughBytes`] if fewer than 4 bytes are readable.
    pub fn read_f32_be(&mut self) -> Result<f32, Error> {
        Ok(f32::from_be_bytes(self.read_array()?))
    }

    /// Reads a big-endian `f64`.
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughBytes`] if fewer than 8 bytes are readable.
    pub fn read_f64_be(&mut self) -> Result<f64, Error> {
        Ok(f64::from_be_bytes(self.read_array()?))
    }

    /// Consumes `n` readable bytes and returns them as a borrowed slice.
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughBytes`] if fewer than `n` bytes are readable.
    pub fn read_bytes(&mut self, n: usize) -> Result<&[u8], Error> {
        if self.len() < n {
            return Err(Error::NotEnoughBytes);
        }
        let start = self.rp;
        self.rp += n;
        Ok(&self.buf[start..start + n])
    }

    /// Reserves exactly `n` bytes, advances the write pointer past them and
    /// returns them for the caller to fill.
    ///
    /// TODO: this returns exactly the size that was asked for, but most of
    /// the time callers want to get some scratch space first and advance the
    /// write pointer later. See [`SlideBuf::available_writable_area`] together
    /// with [`SlideBuf::wadv`] for that pattern.
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughSpace`] if `n` bytes cannot be made available.
    pub fn ___write_buf___(&mut self, n: usize) -> Result<&mut [u8], Error> {
        self.rewind_if_needed(n);
        if self.wcap() < n {
            return Err(Error::NotEnoughSpace);
        }
        let start = self.wp;
        self.wp += n;
        Ok(&mut self.buf[start..start + n])
    }

    /// Moves the unread bytes to the start of the storage.
    ///
    /// Afterwards the read pointer is `0` and the number of readable bytes is
    /// unchanged.
    #[inline]
    pub fn rewind(&mut self) {
        self.check_invariants();
        if self.rp == 0 {
            // Data already starts at the front, nothing to move.
            return;
        }
        self.buf.copy_within(self.rp..self.wp, 0);
        self.wp -= self.rp;
        self.rp = 0;
    }

    /// Reclaims already-consumed space if that is required to write
    /// `need_min` more bytes.
    ///
    /// If the buffer is empty both pointers are simply reset. Otherwise the
    /// unread bytes are moved to the front whenever the space behind the
    /// write pointer is smaller than `need_min`. This does not guarantee that
    /// `need_min` bytes fit afterwards; callers still check [`SlideBuf::wcap`].
    #[inline]
    pub fn rewind_if_needed(&mut self, need_min: usize) {
        self.check_invariants();
        if self.rp == 0 {
            // No consumed prefix, so there is nothing to reclaim.
            return;
        }
        if self.rp == self.wp {
            self.rp = 0;
            self.wp = 0;
        } else if self.wcap() < need_min {
            // The decision is based on the space behind the write pointer,
            // because that is where new bytes are placed.
            self.rewind();
        }
    }

    /// Returns the whole writable area, guaranteed to hold at least
    /// `need_min` bytes.
    ///
    /// The write pointer is not advanced; call [`SlideBuf::wadv`] with the
    /// number of bytes actually written.
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughSpace`] if `need_min` bytes cannot be made available.
    pub fn available_writable_area(&mut self, need_min: usize) -> Result<&mut [u8], Error> {
        self.rewind_if_needed(need_min);
        if self.wcap() < need_min {
            return Err(Error::NotEnoughSpace);
        }
        let start = self.wp;
        Ok(&mut self.buf[start..])
    }

    /// Appends all bytes of `buf`.
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughSpace`] if `buf` does not fit; nothing is written then.
    pub fn put_slice(&mut self, buf: &[u8]) -> Result<(), Error> {
        let n = buf.len();
        self.rewind_if_needed(n);
        if self.wcap() < n {
            return Err(Error::NotEnoughSpace);
        }
        self.buf[self.wp..self.wp + n].copy_from_slice(buf);
        self.wp += n;
        Ok(())
    }

    /// Appends one byte.
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughSpace`] if the buffer is full.
    pub fn put_u8(&mut self, v: u8) -> Result<(), Error> {
        self.put_slice(&v.to_be_bytes())
    }

    /// Appends `v` in big-endian byte order.
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughSpace`] if 2 bytes do not fit.
    pub fn put_u16_be(&mut self, v: u16) -> Result<(), Error> {
        self.put_slice(&v.to_be_bytes())
    }

    /// Appends `v` in big-endian byte order.
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughSpace`] if 4 bytes do not fit.
    pub fn put_u32_be(&mut self, v: u32) -> Result<(), Error> {
        self.put_slice(&v.to_be_bytes())
    }

    /// Appends `v` in big-endian byte order.
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughSpace`] if 8 bytes do not fit.
    pub fn put_u64_be(&mut self, v: u64) -> Result<(), Error> {
        self.put_slice(&v.to_be_bytes())
    }

    /// Appends `v` in big-endian byte order.
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughSpace`] if 4 bytes do not fit.
    pub fn put_f32_be(&mut self, v: f32) -> Result<(), Error> {
        self.put_slice(&v.to_be_bytes())
    }

    /// Appends `v` in big-endian byte order.
    ///
    /// # Errors
    ///
    /// [`Error::NotEnoughSpace`] if 8 bytes do not fit.
    pub fn put_f64_be(&mut self, v: f64) -> Result<(), Error> {
        self.put_slice(&v.to_be_bytes())
    }

    /// Checks `rp <= wp <= buf.len()` in debug builds; compiles to nothing in
    /// release builds.
    #[inline]
    fn check_invariants(&self) {
        debug_assert!(
            self.wp <= self.buf.len(),
            "ERROR netbuf wp {} rp {}",
            self.wp,
            self.rp
        );
        debug_assert!(
            self.rp <= self.wp,
            "ERROR netbuf wp {} rp {}",
            self.wp,
            self.rp
        );
    }
}