From 6c5ada63e72b67e42d77557ff7ea664cdbc190b2 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 16 Dec 2022 06:27:35 +0100 Subject: [PATCH] Read channel status from disk --- httpret/src/channel_status.rs | 16 ++- httpret/src/channelconfig.rs | 2 +- httpret/src/proxy.rs | 6 +- items_2/src/binsdim0.rs | 36 ++++-- items_2/src/channelevents.rs | 39 ++++++ items_2/src/eventsdim0.rs | 2 +- netpod/src/netpod.rs | 24 +++- nodenet/src/conn.rs | 14 +- scyllaconn/Cargo.toml | 1 + scyllaconn/src/events.rs | 35 +---- scyllaconn/src/scyllaconn.rs | 1 + scyllaconn/src/status.rs | 237 ++++++++++++++++++++++++++++++++++ streams/src/frames/inmem.rs | 2 +- streams/src/slidebuf.rs | 10 ++ streams/src/tcprawclient.rs | 2 +- 15 files changed, 364 insertions(+), 63 deletions(-) create mode 100644 scyllaconn/src/status.rs diff --git a/httpret/src/channel_status.rs b/httpret/src/channel_status.rs index 1a73d78..12103f7 100644 --- a/httpret/src/channel_status.rs +++ b/httpret/src/channel_status.rs @@ -13,7 +13,7 @@ pub struct ConnectionStatusEvents {} impl ConnectionStatusEvents { pub fn handler(req: &Request) -> Option { - if req.uri().path() == "/api/4/scylla/connection/status/events" { + if req.uri().path() == "/api/4/status/connection/events" { Some(Self {}) } else { None @@ -63,7 +63,11 @@ impl ConnectionStatusEvents { .as_ref() .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?; let scy = scyllaconn::create_scy_session(scyco).await?; - let mut stream = scyllaconn::events::channel_state_events(q, scy).await?; + let chconf = dbconn::channelconfig::chconf_from_database(q.channel(), node_config).await?; + let series = chconf.series; + let do_one_before_range = true; + let mut stream = + scyllaconn::status::StatusStreamScylla::new(series, q.range().clone(), do_one_before_range, scy); let mut ret = Vec::new(); while let Some(item) = stream.next().await { let item = item?; @@ -77,7 +81,7 @@ pub struct ChannelStatusEvents {} impl ChannelStatusEvents { pub fn handler(req: &Request) -> Option { - if req.uri().path() == "/api/4/scylla/channel/status/events" { + if req.uri().path() == "/api/4/status/channel/events" { Some(Self {}) } else { None @@ -127,7 +131,11 @@ impl ChannelStatusEvents { .as_ref() .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?; let scy = scyllaconn::create_scy_session(scyco).await?; - let mut stream = scyllaconn::events::channel_state_events(q, scy).await?; + let chconf = dbconn::channelconfig::chconf_from_database(q.channel(), node_config).await?; + let series = chconf.series; + let do_one_before_range = true; + let mut stream = + scyllaconn::status::StatusStreamScylla::new(series, q.range().clone(), do_one_before_range, scy); let mut ret = Vec::new(); while let Some(item) = stream.next().await { let item = item?; diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index d0bd543..7b3836b 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -439,7 +439,7 @@ pub struct ScyllaChannelsActive {} impl ScyllaChannelsActive { pub fn handler(req: &Request) -> Option { - if req.uri().path() == "/api/4/scylla/channels/active" { + if req.uri().path() == "/api/4/channels/active" { Some(Self {}) } else { None diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index c2ff873..e5c74a1 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -11,7 +11,7 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response, Server}; use itertools::Itertools; use netpod::log::*; -use netpod::query::{BinnedQuery, PlainEventsQuery}; +use netpod::query::{BinnedQuery, ChannelStateEventsQuery, PlainEventsQuery}; use netpod::{AppendToUrl, ChannelConfigQuery, FromUrl, HasBackend, HasTimeout, ProxyConfig}; use netpod::{ChannelSearchQuery, ChannelSearchResult, ChannelSearchSingleResult}; use netpod::{ACCEPT_ALL, APP_JSON}; @@ -121,6 +121,10 @@ async fn proxy_http_service_inner( Ok(api4::channel_search(req, proxy_config).await?) } else if path == "/api/4/events" { Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) + } else if path == "/api/4/status/connection/events" { + Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) + } else if path == "/api/4/status/channel/events" { + Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) } else if path.starts_with("/api/4/map/pulse/") { Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) } else if path == "/api/4/binned" { diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs index bef42da..c801166 100644 --- a/items_2/src/binsdim0.rs +++ b/items_2/src/binsdim0.rs @@ -42,17 +42,31 @@ where { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { let self_name = std::any::type_name::(); - write!( - fmt, - "{self_name} count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}", - self.ts1s.len(), - self.ts1s.iter().map(|k| k / SEC).collect::>(), - self.ts2s.iter().map(|k| k / SEC).collect::>(), - self.counts, - self.mins, - self.maxs, - self.avgs, - ) + if true { + write!( + fmt, + "{self_name} count {} ts1s {:?} ts2s {:?} counts {:?} mins {:?} maxs {:?} avgs {:?}", + self.ts1s.len(), + self.ts1s.iter().map(|k| k / SEC).collect::>(), + self.ts2s.iter().map(|k| k / SEC).collect::>(), + self.counts, + self.mins, + self.maxs, + self.avgs, + ) + } else { + write!( + fmt, + "{self_name} count {} edges {:?} .. {:?} counts {:?} .. {:?} avgs {:?} .. {:?}", + self.ts1s.len(), + self.ts1s.front().map(|k| k / SEC), + self.ts2s.back().map(|k| k / SEC), + self.counts.front(), + self.counts.back(), + self.avgs.front(), + self.avgs.back(), + ) + } } } diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index 522445d..65c94a4 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -19,6 +19,17 @@ pub enum ConnStatus { Disconnect, } +impl ConnStatus { + pub fn from_ca_ingest_status_kind(k: u32) -> Self { + match k { + 1 => Self::Connect, + 2 => Self::Disconnect, + 3 => Self::Disconnect, + _ => Self::Disconnect, + } + } +} + #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct ConnStatusEvent { pub ts: u64, @@ -121,10 +132,38 @@ mod serde_channel_events { let e1: u32 = seq.next_element()?.ok_or(de::Error::missing_field("[1] nty"))?; if e0 == EventsDim0::::serde_id() { match e1 { + u8::SUB => { + let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + Ok(EvBox(Box::new(obj))) + } + u16::SUB => { + let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + Ok(EvBox(Box::new(obj))) + } + u32::SUB => { + let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + Ok(EvBox(Box::new(obj))) + } + u64::SUB => { + let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + Ok(EvBox(Box::new(obj))) + } + i8::SUB => { + let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + Ok(EvBox(Box::new(obj))) + } + i16::SUB => { + let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + Ok(EvBox(Box::new(obj))) + } i32::SUB => { let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } + i64::SUB => { + let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + Ok(EvBox(Box::new(obj))) + } f32::SUB => { let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index fa54dc6..f86c1fb 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -77,7 +77,7 @@ where NTY: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - if true { + if false { write!( fmt, "EventsDim0 {{ count {} ts {:?} vals {:?} }}", diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index d9b9cdd..cb20da4 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -28,6 +28,10 @@ pub const APP_JSON_LINES: &'static str = "application/jsonlines"; pub const APP_OCTET: &'static str = "application/octet-stream"; pub const ACCEPT_ALL: &'static str = "*/*"; +pub const CONNECTION_STATUS_DIV: u64 = timeunits::DAY; +pub const TS_MSP_GRID_UNIT: u64 = timeunits::SEC * 10; +pub const TS_MSP_GRID_SPACING: u64 = 6 * 2; + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AggQuerySingleChannel { pub channel_config: ChannelConfig, @@ -685,12 +689,20 @@ pub struct NanoRange { impl fmt::Debug for NanoRange { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let beg = chrono::Utc.timestamp_opt((self.beg / SEC) as i64, (self.beg % SEC) as u32); - let end = chrono::Utc.timestamp_opt((self.end / SEC) as i64, (self.end % SEC) as u32); - f.debug_struct("NanoRange") - .field("beg", &beg) - .field("end", &end) - .finish() + let beg = chrono::Utc + .timestamp_opt((self.beg / SEC) as i64, (self.beg % SEC) as u32) + .earliest(); + let end = chrono::Utc + .timestamp_opt((self.end / SEC) as i64, (self.end % SEC) as u32) + .earliest(); + if let (Some(a), Some(b)) = (beg, end) { + f.debug_struct("NanoRange").field("beg", &a).field("end", &b).finish() + } else { + f.debug_struct("NanoRange") + .field("beg", &beg) + .field("end", &end) + .finish() + } } } diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 991d300..eff78ff 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -174,20 +174,16 @@ async fn events_conn_handler_inner_try( let scalar_type = f.scalar_type; let shape = f.shape; let do_test_stream_error = false; - let stream = match scyllaconn::events::make_scylla_stream( - &evq, - do_one_before_range, + debug!("Make EventsStreamScylla for {series} {scalar_type:?} {shape:?}"); + let stream = scyllaconn::events::EventsStreamScylla::new( series, + evq.range().clone(), + do_one_before_range, scalar_type, shape, scy, do_test_stream_error, - ) - .await - { - Ok(k) => k, - Err(e) => return Err((e, netout))?, - }; + ); let stream = stream.map(|item| { let item = match item { Ok(item) => match item { diff --git a/scyllaconn/Cargo.toml b/scyllaconn/Cargo.toml index 7842738..08c7351 100644 --- a/scyllaconn/Cargo.toml +++ b/scyllaconn/Cargo.toml @@ -13,6 +13,7 @@ serde_json = "1.0" serde_cbor = "0.11.2" erased-serde = "0.3" tokio = { version = "1.23.0", default-features = false, features = ["time", "sync"] } +tracing = "0.1.37" byteorder = "1.4.3" bytes = "1.2.1" num-traits = "0.2.15" diff --git a/scyllaconn/src/events.rs b/scyllaconn/src/events.rs index 955bd98..cf39829 100644 --- a/scyllaconn/src/events.rs +++ b/scyllaconn/src/events.rs @@ -5,7 +5,7 @@ use items_0::{Empty, Events, WithLen}; use items_2::channelevents::{ChannelEvents, ConnStatus, ConnStatusEvent}; use items_2::eventsdim0::EventsDim0; use netpod::log::*; -use netpod::query::{ChannelStateEventsQuery, PlainEventsQuery}; +use netpod::query::ChannelStateEventsQuery; use netpod::timeunits::*; use netpod::{NanoRange, ScalarType, Shape}; use scylla::Session as ScySession; @@ -14,13 +14,13 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc) -> Result, Error> { +async fn find_ts_msp(series: u64, range: NanoRange, scy: Arc) -> Result, Error> { info!("find_ts_msp series {} {:?}", series, range); let mut ret = VecDeque::new(); // TODO use prepared statements let cql = "select ts_msp from ts_msp where series = ? and ts_msp > ? and ts_msp < ?"; let res = scy - .query(cql, (series, range.beg as i64, range.end as i64)) + .query(cql, (series as i64, range.beg as i64, range.end as i64)) .await .err_conv()?; for row in res.rows_typed_or_empty::<(i64,)>() { @@ -28,13 +28,13 @@ async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc) -> Res ret.push_back(row.0 as u64); } let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? limit 1"; - let res = scy.query(cql, (series, range.end as i64)).await.err_conv()?; + let res = scy.query(cql, (series as i64, range.end as i64)).await.err_conv()?; for row in res.rows_typed_or_empty::<(i64,)>() { let row = row.err_conv()?; ret.push_back(row.0 as u64); } let cql = "select ts_msp from ts_msp where series = ? and ts_msp <= ? order by ts_msp desc limit 2"; - let res = scy.query(cql, (series, range.beg as i64)).await.err_conv()?; + let res = scy.query(cql, (series as i64, range.beg as i64)).await.err_conv()?; for row in res.rows_typed_or_empty::<(i64,)>() { let row = row.err_conv()?; ret.push_front(row.0 as u64); @@ -491,7 +491,7 @@ impl Stream for EventsStreamScylla { loop { break match self.state { FrState::New => { - let fut = find_ts_msp(self.series as i64, self.range.clone(), self.scy.clone()); + let fut = find_ts_msp(self.series, self.range.clone(), self.scy.clone()); let fut = Box::pin(fut); self.state = FrState::FindMsp(fut); continue; @@ -558,28 +558,7 @@ impl Stream for EventsStreamScylla { } } -pub async fn make_scylla_stream( - evq: &PlainEventsQuery, - do_one_before_range: bool, - series: u64, - scalar_type: ScalarType, - shape: Shape, - scy: Arc, - do_test_stream_error: bool, -) -> Result { - let res = EventsStreamScylla::new( - series, - evq.range().clone(), - do_one_before_range, - scalar_type, - shape, - scy, - do_test_stream_error, - ); - Ok(res) -} - -pub async fn channel_state_events( +async fn _channel_state_events( evq: &ChannelStateEventsQuery, scy: Arc, ) -> Result> + Send>>, Error> { diff --git a/scyllaconn/src/scyllaconn.rs b/scyllaconn/src/scyllaconn.rs index 1c915d9..0094b98 100644 --- a/scyllaconn/src/scyllaconn.rs +++ b/scyllaconn/src/scyllaconn.rs @@ -2,6 +2,7 @@ pub mod bincache; pub mod config; pub mod errconv; pub mod events; +pub mod status; use err::Error; use errconv::ErrConv; diff --git a/scyllaconn/src/status.rs b/scyllaconn/src/status.rs new file mode 100644 index 0000000..f2fc4f6 --- /dev/null +++ b/scyllaconn/src/status.rs @@ -0,0 +1,237 @@ +use crate::errconv::ErrConv; +use err::Error; +use futures_util::{Future, FutureExt, Stream}; +use items_2::channelevents::{ConnStatus, ConnStatusEvent}; +use netpod::log::*; +use netpod::NanoRange; +use netpod::CONNECTION_STATUS_DIV; +use scylla::Session as ScySession; +use std::collections::VecDeque; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +async fn read_next_status_events( + series: u64, + ts_msp: u64, + range: NanoRange, + fwd: bool, + do_one_before: bool, + scy: Arc, +) -> Result, Error> { + if ts_msp >= range.end { + warn!( + "given ts_msp {} >= range.end {} not necessary to read this", + ts_msp, range.end + ); + } + if range.end > i64::MAX as u64 { + return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); + } + let res = if fwd { + let ts_lsp_min = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; + let ts_lsp_max = if ts_msp < range.end { range.end - ts_msp } else { 0 }; + trace!( + "FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} beg {} end {}", + ts_msp, + ts_lsp_min, + ts_lsp_max, + range.beg, + range.end + ); + // TODO use prepared! + let cql = concat!( + "select ts_lsp, pulse, kind from channel_status where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?" + ); + scy.query( + cql, + (series as i64, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64), + ) + .await + .err_conv()? + } else { + let ts_lsp_max = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; + debug!( + "BCK ts_msp {} ts_lsp_max {} beg {} end {} {}", + ts_msp, + ts_lsp_max, + range.beg, + range.end, + stringify!($fname) + ); + // TODO use prepared! + let cql = concat!( + "select ts_lsp, pulse, kind from channel_status where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1" + ); + scy.query(cql, (series as i64, ts_msp as i64, ts_lsp_max as i64)) + .await + .err_conv()? + }; + let mut last_before = None; + let mut ret = VecDeque::new(); + for row in res.rows_typed_or_empty::<(i64, i64, i32)>() { + let row = row.err_conv()?; + let ts = ts_msp + row.0 as u64; + let _pulse = row.1 as u64; + let kind = row.2 as u32; + // from netfetch::store::ChannelStatus + let ev = ConnStatusEvent { + ts, + status: ConnStatus::from_ca_ingest_status_kind(kind), + }; + if ts >= range.end { + } else if ts >= range.beg { + ret.push_back(ev); + } else { + last_before = Some(ev); + } + } + if do_one_before { + if let Some(ev) = last_before { + debug!("PREPENDING THE LAST BEFORE {ev:?}"); + ret.push_front(ev); + } + } + trace!("found in total {} events ts_msp {}", ret.len(), ts_msp); + Ok(ret) +} + +struct ReadValues { + series: u64, + range: NanoRange, + ts_msps: VecDeque, + fwd: bool, + do_one_before_range: bool, + fut: Pin, Error>> + Send>>, + scy: Arc, +} + +impl ReadValues { + fn new( + series: u64, + range: NanoRange, + ts_msps: VecDeque, + fwd: bool, + do_one_before_range: bool, + scy: Arc, + ) -> Self { + let mut ret = Self { + series, + range, + ts_msps, + fwd, + do_one_before_range, + fut: Box::pin(futures_util::future::ready(Err(Error::with_msg_no_trace( + "future not initialized", + )))), + scy, + }; + ret.next(); + ret + } + + fn next(&mut self) -> bool { + if let Some(ts_msp) = self.ts_msps.pop_front() { + self.fut = self.make_fut(ts_msp, self.ts_msps.len() > 0); + true + } else { + false + } + } + + fn make_fut( + &mut self, + ts_msp: u64, + _has_more_msp: bool, + ) -> Pin, Error>> + Send>> { + let fut = read_next_status_events( + self.series, + ts_msp, + self.range.clone(), + self.fwd, + self.do_one_before_range, + self.scy.clone(), + ); + Box::pin(fut) + } +} + +enum FrState { + New, + ReadValues(ReadValues), + Done, +} + +pub struct StatusStreamScylla { + state: FrState, + series: u64, + range: NanoRange, + do_one_before_range: bool, + scy: Arc, + ts_msps: VecDeque, + outbuf: VecDeque, +} + +impl StatusStreamScylla { + pub fn new(series: u64, range: NanoRange, do_one_before_range: bool, scy: Arc) -> Self { + Self { + state: FrState::New, + series, + range, + do_one_before_range, + scy, + ts_msps: VecDeque::new(), + outbuf: VecDeque::new(), + } + } +} + +impl Stream for StatusStreamScylla { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + let span = tracing::span!(tracing::Level::TRACE, "poll_next"); + let _spg = span.enter(); + loop { + if let Some(x) = self.outbuf.pop_front() { + break Ready(Some(Ok(x))); + } + break match self.state { + FrState::New => { + let mut ts_msps = VecDeque::new(); + let mut ts = self.range.beg / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV; + while ts < self.range.end { + ts_msps.push_back(ts); + ts += CONNECTION_STATUS_DIV; + } + let st = ReadValues::new( + self.series, + self.range.clone(), + self.ts_msps.clone(), + true, + self.do_one_before_range, + self.scy.clone(), + ); + self.state = FrState::ReadValues(st); + continue; + } + FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) { + Ready(Ok(item)) => { + if !st.next() { + debug!("ReadValues exhausted"); + self.state = FrState::Done; + } + for x in item { + self.outbuf.push_back(x); + } + continue; + } + Ready(Err(e)) => Ready(Some(Err(e))), + Pending => Pending, + }, + FrState::Done => Ready(None), + }; + } + } +} diff --git a/streams/src/frames/inmem.rs b/streams/src/frames/inmem.rs index e07d214..5b8bf46 100644 --- a/streams/src/frames/inmem.rs +++ b/streams/src/frames/inmem.rs @@ -194,7 +194,7 @@ where } } Ready(Err(e)) => { - error!("poll_upstream {e:?}"); + error!("poll_upstream need_min {} buf {:?} {:?}", self.need_min, self.buf, e); self.done = true; Ready(Some(Err(e))) } diff --git a/streams/src/slidebuf.rs b/streams/src/slidebuf.rs index 135d6ce..7b2e14d 100644 --- a/streams/src/slidebuf.rs +++ b/streams/src/slidebuf.rs @@ -399,3 +399,13 @@ impl SlideBuf { } } } + +impl fmt::Debug for SlideBuf { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("SlideBuf") + .field("cap", &self.cap()) + .field("wp", &self.wp) + .field("rp", &self.rp) + .finish() + } +} diff --git a/streams/src/tcprawclient.rs b/streams/src/tcprawclient.rs index ff4fc4a..bb541e7 100644 --- a/streams/src/tcprawclient.rs +++ b/streams/src/tcprawclient.rs @@ -71,7 +71,7 @@ where netout.flush().await?; netout.forget(); // TODO for images, we need larger buffer capacity - let frames = InMemoryFrameAsyncReadStream::new(netin, 1024 * 128); + let frames = InMemoryFrameAsyncReadStream::new(netin, 1024 * 256); let stream = EventsFromFrames::<_, T>::new(frames); streams.push(Box::pin(stream) as _); }