From e89904244c2ca9d58a99be62bde701b6b51f02fc Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 7 Nov 2024 15:13:45 +0100 Subject: [PATCH] Timeout provider as resource --- crates/httpret/src/api4/binned.rs | 5 +- crates/httpret/src/api4/events.rs | 4 +- crates/streamio/src/streamtimeout.rs | 21 ++--- crates/streamio/src/streamtimeout/test.rs | 48 +++++++++++ crates/streams/src/cbor_stream.rs | 16 ++-- crates/streams/src/plaineventscbor.rs | 4 +- crates/streams/src/streamtimeout.rs | 83 +++++++++++++++---- crates/streams/src/test/events.rs | 2 +- crates/streams/src/timebinnedjson.rs | 97 ++++++++++++----------- 9 files changed, 195 insertions(+), 85 deletions(-) create mode 100644 crates/streamio/src/streamtimeout/test.rs diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index 81b6d72..5237ef1 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -266,15 +266,14 @@ async fn binned_json_framed( let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); let (events_read_provider, cache_read_provider) = make_read_provider(ch_conf.name(), scyqueue, open_bytes, ctx, ncc); - let stream_timeout = streamio::streamtimeout::StreamTimeout::new(); - let stream_timeout = Box::new(stream_timeout); + let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed(); let stream = streams::timebinnedjson::timebinned_json_framed( query, ch_conf, ctx, cache_read_provider, events_read_provider, - stream_timeout, + timeout_provider, ) .instrument(span1) .await?; diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index a3c2f47..750ac6e 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -177,7 +177,9 @@ async fn plain_events_cbor_framed( debug!("plain_events_cbor_framed {ch_conf:?} {req:?}"); let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); let open_bytes = Arc::pin(open_bytes); - let stream = streams::plaineventscbor::plain_events_cbor_stream(&evq, ch_conf, ctx, open_bytes).await?; + let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed(); + let stream = + streams::plaineventscbor::plain_events_cbor_stream(&evq, ch_conf, ctx, open_bytes, timeout_provider).await?; let stream = bytes_chunks_to_framed(stream); let logspan = if evq.log_level() == "trace" { trace!("enable trace for handler"); diff --git a/crates/streamio/src/streamtimeout.rs b/crates/streamio/src/streamtimeout.rs index a195191..66891b8 100644 --- a/crates/streamio/src/streamtimeout.rs +++ b/crates/streamio/src/streamtimeout.rs @@ -1,6 +1,9 @@ -use futures_util::Stream; -use std::pin::Pin; -use streams::streamtimeout::TimeoutableStream; +#[cfg(test)] +mod test; + +use std::time::Duration; +use streams::streamtimeout::BoxedTimeoutFuture; +use streams::streamtimeout::StreamTimeout2; pub struct StreamTimeout {} @@ -8,16 +11,14 @@ impl StreamTimeout { pub fn new() -> Self { Self {} } -} -impl streams::streamtimeout::StreamTimeout for StreamTimeout { - fn timeout_intervals(&self, inp: Pin + Send>>) -> Pin + Send>> { - todo!() + pub fn boxed() -> Box { + Box::new(Self::new()) } } -impl streams::streamtimeout::StreamTimeout2 for StreamTimeout { - fn timeout_intervals(&self, inp: S) -> TimeoutableStream { - todo!() +impl StreamTimeout2 for StreamTimeout { + fn timeout_intervals(&self, ivl: Duration) -> BoxedTimeoutFuture { + Box::pin(tokio::time::sleep(ivl)) } } diff --git a/crates/streamio/src/streamtimeout/test.rs b/crates/streamio/src/streamtimeout/test.rs new file mode 100644 index 0000000..896ea0d --- /dev/null +++ b/crates/streamio/src/streamtimeout/test.rs @@ -0,0 +1,48 @@ +use super::StreamTimeout; +use futures_util::StreamExt; +use std::time::Duration; +use std::time::Instant; +use streams::streamtimeout::TimeoutableStream; + +async fn stream_timeout_inner() -> Result<(), u8> { + let stream = futures_util::stream::iter((0..500).collect::>()); + let stream = stream.then({ + let mut i = 0; + move |x| { + i += 1; + let dur = if i % 5 == 0 { 500 } else { 0 }; + async move { + tokio::time::sleep(Duration::from_millis(dur)).await; + x + } + } + }); + let stream = stream.inspect(|_| { + // eprintln!("A see {x:?}"); + }); + let timeout_provider = StreamTimeout::new(); + let timeout_provider = Box::new(timeout_provider); + let ivl = Duration::from_millis(200); + let stream = TimeoutableStream::new(ivl, timeout_provider, stream); + let stream = stream.inspect({ + let mut tsl = Instant::now(); + move |x| match x { + Some(x) => { + let tsnow = Instant::now(); + let dt = tsnow.saturating_duration_since(tsl).as_secs_f32() * 1e3; + eprintln!("B see {x:?} {dt:7.2}"); + tsl = tsnow; + } + None => { + eprintln!("B see None"); + } + } + }); + stream.count().await; + Ok(()) +} + +#[test] +fn stream_timeout() { + taskrun::run(stream_timeout_inner()).unwrap() +} diff --git a/crates/streams/src/cbor_stream.rs b/crates/streams/src/cbor_stream.rs index 78feab8..0a1bf10 100644 --- a/crates/streams/src/cbor_stream.rs +++ b/crates/streams/src/cbor_stream.rs @@ -1,3 +1,5 @@ +use crate::streamtimeout::StreamTimeout2; +use crate::streamtimeout::TimeoutableStream; use bytes::Buf; use bytes::BufMut; use bytes::Bytes; @@ -77,11 +79,15 @@ pub type CborStream = Pin> + Send // TODO move this type decl because it is not specific to cbor pub type SitemtyDynEventsStream = Pin>> + Send>>; -pub fn events_stream_to_cbor_stream(stream: SitemtyDynEventsStream) -> impl Stream> { - let interval = tokio::time::interval(Duration::from_millis(4000)); - let stream = tokio_stream::StreamExt::timeout_repeating(stream, interval).map(|x| match x { - Ok(x) => map_events(x), - Err(_) => make_keepalive(), +pub fn events_stream_to_cbor_stream( + stream: SitemtyDynEventsStream, + timeout_provider: Box, +) -> impl Stream> { + let ivl = Duration::from_millis(4000); + let stream = TimeoutableStream::new(ivl, timeout_provider, stream); + let stream = stream.map(|x| match x { + Some(x) => map_events(x), + None => make_keepalive(), }); let prepend = { let item = make_keepalive(); diff --git a/crates/streams/src/plaineventscbor.rs b/crates/streams/src/plaineventscbor.rs index c3470ea..599d528 100644 --- a/crates/streams/src/plaineventscbor.rs +++ b/crates/streams/src/plaineventscbor.rs @@ -3,6 +3,7 @@ use crate::cbor_stream::CborStream; use crate::firsterr::non_empty; use crate::firsterr::only_first_err; use crate::plaineventsstream::dyn_events_stream; +use crate::streamtimeout::StreamTimeout2; use crate::tcprawclient::OpenBoxedBytesStreamsBox; use netpod::ChannelTypeConfigGen; use netpod::ReqCtx; @@ -19,9 +20,10 @@ pub async fn plain_events_cbor_stream( ch_conf: ChannelTypeConfigGen, ctx: &ReqCtx, open_bytes: OpenBoxedBytesStreamsBox, + timeout_provider: Box, ) -> Result { let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?; - let stream = events_stream_to_cbor_stream(stream); + let stream = events_stream_to_cbor_stream(stream, timeout_provider); let stream = non_empty(stream); let stream = only_first_err(stream); Ok(Box::pin(stream)) diff --git a/crates/streams/src/streamtimeout.rs b/crates/streams/src/streamtimeout.rs index c7daac4..a23cb26 100644 --- a/crates/streams/src/streamtimeout.rs +++ b/crates/streams/src/streamtimeout.rs @@ -1,16 +1,67 @@ +use futures_util::FutureExt; use futures_util::Stream; -use std::marker::PhantomData; +use futures_util::StreamExt; +use std::future::Future; use std::pin::Pin; use std::task::Context; use std::task::Poll; +use std::time::Duration; +use std::time::Instant; -pub struct TimeoutableStream { - _t1: PhantomData, +pub type BoxedTimeoutFuture = Pin + Send>>; + +pub trait StreamTimeout2: Send { + fn timeout_intervals(&self, ivl: Duration) -> BoxedTimeoutFuture; } -impl TimeoutableStream { - fn new() -> Self { - Self { _t1: PhantomData } +pub struct TimeoutableStream { + ivl: Duration, + timeout_provider: Box, + inp: Pin>, + timeout_fut: BoxedTimeoutFuture, + last_seen: Instant, +} + +impl TimeoutableStream +where + S: Stream, +{ + pub fn new(ivl: Duration, timeout_provider: Box, inp: S) -> Self { + let timeout_fut = timeout_provider.timeout_intervals(ivl); + Self { + ivl, + timeout_provider, + inp: Box::pin(inp), + timeout_fut, + last_seen: Instant::now(), + } + } + + fn resetup(mut self: Pin<&mut Self>, ivl: Duration) -> () { + self.timeout_fut = self.timeout_provider.timeout_intervals(ivl) + } + + fn handle_timeout(self: Pin<&mut Self>, cx: &mut Context) -> Poll::Item>> { + use Poll::*; + let tsnow = Instant::now(); + if self.last_seen + self.ivl < tsnow { + let ivl2 = self.ivl; + self.resetup(ivl2); + Ready(Some(None)) + } else { + let ivl2 = (self.last_seen + self.ivl) - tsnow + Duration::from_millis(1); + self.resetup(ivl2); + cx.waker().wake_by_ref(); + Pending + } + } + + fn handle_inp_pending(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll::Item>> { + use Poll::*; + match self.timeout_fut.poll_unpin(cx) { + Ready(()) => self.handle_timeout(cx), + Pending => Pending, + } } } @@ -20,15 +71,15 @@ where { type Item = Option<::Item>; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - todo!() + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + match self.inp.poll_next_unpin(cx) { + Ready(Some(x)) => { + self.last_seen = Instant::now(); + Ready(Some(Some(x))) + } + Ready(None) => Ready(None), + Pending => self.handle_inp_pending(cx), + } } } - -pub trait StreamTimeout: Send { - fn timeout_intervals(&self, inp: Pin + Send>>) -> Pin + Send>>; -} - -pub trait StreamTimeout2: Send { - fn timeout_intervals(&self, inp: S) -> TimeoutableStream; -} diff --git a/crates/streams/src/test/events.rs b/crates/streams/src/test/events.rs index c3d1948..c594887 100644 --- a/crates/streams/src/test/events.rs +++ b/crates/streams/src/test/events.rs @@ -55,7 +55,7 @@ async fn merged_events_inner() -> Result<(), Error> { let evq = PlainEventsQuery::new(channel, range); let open_bytes = StreamOpener::new(); let open_bytes = Arc::pin(open_bytes); - let stream = plain_events_cbor_stream(&evq, ch_conf.clone().into(), &ctx, open_bytes) + let stream = plain_events_cbor_stream(&evq, ch_conf.clone().into(), &ctx, open_bytes, todo!()) .await .unwrap(); let stream = lenframed::length_framed(stream); diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index 3e42e8b..010f08a 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -3,8 +3,8 @@ use crate::collect::CollectResult; use crate::json_stream::JsonBytes; use crate::json_stream::JsonStream; use crate::rangefilter2::RangeFilter2; -use crate::streamtimeout::StreamTimeout; use crate::streamtimeout::StreamTimeout2; +use crate::streamtimeout::TimeoutableStream; use crate::tcprawclient::container_stream_from_bytes_stream; use crate::tcprawclient::make_sub_query; use crate::tcprawclient::OpenBoxedBytesStreamsBox; @@ -16,7 +16,6 @@ use futures_util::Stream; use futures_util::StreamExt; use items_0::collect_s::CollectableDyn; use items_0::on_sitemty_data; -use items_0::streamitem::sitem_err2_from_string; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; @@ -366,7 +365,7 @@ pub async fn timebinned_json_framed( ctx: &ReqCtx, cache_read_provider: Arc, events_read_provider: Arc, - stream_timeout_provider: Box>>, + timeout_provider: Box, ) -> Result { trace!("timebinned_json_framed"); let binned_range = query.covering_range()?; @@ -391,65 +390,67 @@ pub async fn timebinned_json_framed( .max(Duration::from_millis(100)); let timeout_content_2 = timeout_content_base * 2 / 3; let mut coll = None; - let interval = tokio::time::interval(Duration::from(timeout_content_base)); let mut last_emit = Instant::now(); let stream = stream.map(|x| Some(x)).chain(futures_util::stream::iter([None])); - let stream = tokio_stream::StreamExt::timeout_repeating(stream, interval).map(move |x| match x { - Ok(item) => match item { + let stream = TimeoutableStream::new(timeout_content_base, timeout_provider, stream); + let stream = stream.map(move |x| { + match x { Some(x) => match x { - Ok(x) => match x { - StreamItem::DataItem(x) => match x { - RangeCompletableItem::Data(mut item) => { - let coll = coll.get_or_insert_with(|| item.new_collector()); - coll.ingest(&mut item); - if coll.len() >= 128 || last_emit.elapsed() >= timeout_content_2 { - last_emit = Instant::now(); - take_collector_result(coll).map(|x| Ok(x)) - } else { - // Some(serde_json::Value::String(format!("coll len {}", coll.len()))) - None + Some(x) => match x { + Ok(x) => match x { + StreamItem::DataItem(x) => match x { + RangeCompletableItem::Data(mut item) => { + let coll = coll.get_or_insert_with(|| item.new_collector()); + coll.ingest(&mut item); + if coll.len() >= 128 || last_emit.elapsed() >= timeout_content_2 { + last_emit = Instant::now(); + take_collector_result(coll).map(|x| Ok(x)) + } else { + // Some(serde_json::Value::String(format!("coll len {}", coll.len()))) + None + } } + RangeCompletableItem::RangeComplete => None, + }, + StreamItem::Log(x) => { + debug!("{x:?}"); + // Some(serde_json::Value::String(format!("{x:?}"))) + None + } + StreamItem::Stats(x) => { + debug!("{x:?}"); + // Some(serde_json::Value::String(format!("{x:?}"))) + None } - RangeCompletableItem::RangeComplete => None, }, - StreamItem::Log(x) => { - debug!("{x:?}"); - // Some(serde_json::Value::String(format!("{x:?}"))) - None - } - StreamItem::Stats(x) => { - debug!("{x:?}"); - // Some(serde_json::Value::String(format!("{x:?}"))) - None - } + Err(e) => Some(Err(e)), }, - Err(e) => Some(Err(e)), + None => { + if let Some(coll) = coll.as_mut() { + last_emit = Instant::now(); + take_collector_result(coll).map(|x| Ok(x)) + } else { + // Some(serde_json::Value::String(format!( + // "end of input but no collector to take something from" + // ))) + None + } + } }, None => { if let Some(coll) = coll.as_mut() { - last_emit = Instant::now(); - take_collector_result(coll).map(|x| Ok(x)) + if coll.len() != 0 { + last_emit = Instant::now(); + take_collector_result(coll).map(|x| Ok(x)) + } else { + // Some(serde_json::Value::String(format!("timeout but nothing to do"))) + None + } } else { - // Some(serde_json::Value::String(format!( - // "end of input but no collector to take something from" - // ))) + // Some(serde_json::Value::String(format!("timeout but no collector"))) None } } - }, - Err(_) => { - if let Some(coll) = coll.as_mut() { - if coll.len() != 0 { - last_emit = Instant::now(); - take_collector_result(coll).map(|x| Ok(x)) - } else { - // Some(serde_json::Value::String(format!("timeout but nothing to do"))) - None - } - } else { - // Some(serde_json::Value::String(format!("timeout but no collector"))) - None - } } }); let stream = stream.filter_map(|x| futures_util::future::ready(x));