diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index 5237ef1..021a0c9 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -211,9 +211,17 @@ async fn binned_json_single( let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); let (events_read_provider, cache_read_provider) = make_read_provider(ch_conf.name(), scyqueue, open_bytes, ctx, ncc); - let item = streams::timebinnedjson::timebinned_json(query, ch_conf, ctx, cache_read_provider, events_read_provider) - .instrument(span1) - .await?; + let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed(); + let item = streams::timebinnedjson::timebinned_json( + query, + ch_conf, + ctx, + cache_read_provider, + events_read_provider, + timeout_provider, + ) + .instrument(span1) + .await?; match item { CollectResult::Some(item) => { let ret = response(StatusCode::OK) diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index 750ac6e..c98761c 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -208,7 +208,9 @@ async fn plain_events_json_framed( debug!("plain_events_json_framed {ch_conf:?} {req:?}"); let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); let open_bytes = Arc::pin(open_bytes); - let stream = streams::plaineventsjson::plain_events_json_stream(&evq, ch_conf, ctx, open_bytes).await?; + let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed(); + let stream = + streams::plaineventsjson::plain_events_json_stream(&evq, ch_conf, ctx, open_bytes, timeout_provider).await?; let stream = bytes_chunks_to_len_framed_str(stream); let ret = response(StatusCode::OK) .header(CONTENT_TYPE, APP_JSON_FRAMED) @@ -231,8 +233,16 @@ async fn plain_events_json( debug!("{self_name} chconf_from_events_quorum: {ch_conf:?}"); let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); let open_bytes = Arc::pin(open_bytes); - let item = - streams::plaineventsjson::plain_events_json(&evq, ch_conf, ctx, &ncc.node_config.cluster, open_bytes).await; + let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed(); + let item = streams::plaineventsjson::plain_events_json( + &evq, + ch_conf, + ctx, + &ncc.node_config.cluster, + open_bytes, + timeout_provider, + ) + .await; debug!("{self_name} returned {}", item.is_ok()); let item = match item { Ok(item) => item, diff --git a/crates/streams/Cargo.toml b/crates/streams/Cargo.toml index 9a2613a..f67f92e 100644 --- a/crates/streams/Cargo.toml +++ b/crates/streams/Cargo.toml @@ -5,8 +5,8 @@ authors = ["Dominik Werder "] edition = "2021" [dependencies] -tokio = { version = "1.41", features = ["io-util", "net", "time", "sync", "fs"] } -tokio-stream = "0.1.16" +#tokio = { version = "1.41", features = ["io-util", "net", "time", "sync", "fs"] } +#tokio-stream = "0.1.16" futures-util = "0.3.15" pin-project = "1.0.12" serde = { version = "1.0", features = ["derive"] } @@ -37,6 +37,7 @@ taskrun = { path = "../taskrun" } [features] wasm_transform = ["wasmer"] +indev = [] [patch.crates-io] thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" } diff --git a/crates/streams/src/collect.rs b/crates/streams/src/collect.rs index 298f3ce..4f9e99e 100644 --- a/crates/streams/src/collect.rs +++ b/crates/streams/src/collect.rs @@ -1,3 +1,4 @@ +use crate::streamtimeout::StreamTimeout2; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; @@ -14,13 +15,10 @@ use netpod::log::*; use netpod::range::evrange::SeriesRange; use netpod::BinnedRangeEnum; use netpod::DiskStats; -use std::fmt; use std::pin::Pin; use std::task::Context; use std::task::Poll; -use std::time::Duration; use std::time::Instant; -use tracing::Instrument; #[derive(Debug, thiserror::Error)] #[cstm(name = "CollectDyn")] @@ -68,8 +66,8 @@ impl Collect { bytes_max: u64, range: Option, binrange: Option, + timeout_provider: Box, ) -> Self { - let timer = tokio::time::sleep_until(deadline.into()); Self { inp, events_max, @@ -79,7 +77,7 @@ impl Collect { collector: None, range_final: false, timeout: false, - timer: Box::pin(timer), + timer: timeout_provider.timeout_intervals(deadline.saturating_duration_since(Instant::now())), done_input: false, } } @@ -220,124 +218,3 @@ impl Future for Collect { } } } - -async fn collect_in_span( - stream: S, - deadline: Instant, - events_max: u64, - range: Option, - binrange: Option, -) -> Result, Error> -where - S: Stream> + Unpin, - T: CollectableDyn, -{ - info!("collect events_max {events_max} deadline {deadline:?}"); - let mut collector: Option> = None; - let mut stream = stream; - let deadline = deadline.into(); - let mut range_complete = false; - let mut timed_out = false; - let mut total_duration = Duration::ZERO; - loop { - let item = match tokio::time::timeout_at(deadline, stream.next()).await { - Ok(Some(k)) => k, - Ok(None) => break, - Err(_e) => { - warn!("collect timeout"); - timed_out = true; - if let Some(coll) = collector.as_mut() { - info!("collect_in_span call set_timed_out"); - coll.set_timed_out(); - } else { - warn!("collect_in_span collect timeout but no collector yet"); - } - break; - } - }; - match item { - Ok(item) => match item { - StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => { - range_complete = true; - if let Some(coll) = collector.as_mut() { - coll.set_range_complete(); - } else { - warn!("collect_in_span received RangeComplete but no collector yet"); - } - } - RangeCompletableItem::Data(mut item) => { - trace!("collect_in_span sees len {}", item.len()); - if collector.is_none() { - let c = item.new_collector(); - collector = Some(c); - } - let coll = collector.as_mut().unwrap(); - coll.ingest(&mut item); - if coll.len() as u64 >= events_max { - warn!("span reached events_max {}", events_max); - info!("collect_in_span call set_continue_at_here"); - coll.set_continue_at_here(); - break; - } - } - }, - StreamItem::Log(item) => { - trace!("collect_in_span log {:?}", item); - } - StreamItem::Stats(item) => { - trace!("collect_in_span stats {:?}", item); - match item { - // TODO factor and simplify the stats collection: - StatsItem::EventDataReadStats(_) => {} - StatsItem::RangeFilterStats(_) => {} - StatsItem::DiskStats(item) => match item { - DiskStats::OpenStats(k) => { - total_duration += k.duration; - } - DiskStats::SeekStats(k) => { - total_duration += k.duration; - } - DiskStats::ReadStats(k) => { - total_duration += k.duration; - } - DiskStats::ReadExactStats(k) => { - total_duration += k.duration; - } - }, - _ => {} - } - } - }, - Err(e) => { - // TODO Need to use some flags to get good enough error message for remote user. - return Err(ErrMsg(e).into()); - } - } - } - let _ = range_complete; - let _ = timed_out; - let res = collector - .ok_or_else(|| Error::NoResultNoCollector)? - .result(range, binrange) - .map_err(ErrMsg)?; - info!("collect_in_span stats total duration: {:?}", total_duration); - Ok(res) -} - -pub async fn collect( - stream: S, - deadline: Instant, - events_max: u64, - range: Option, - binrange: Option, -) -> Result, Error> -where - S: Stream> + Unpin, - T: CollectableDyn + WithLen + fmt::Debug, -{ - let span = span!(Level::INFO, "collect"); - collect_in_span(stream, deadline, events_max, range, binrange) - .instrument(span) - .await -} diff --git a/crates/streams/src/collect_adapter.rs b/crates/streams/src/collect_adapter.rs new file mode 100644 index 0000000..064c3ac --- /dev/null +++ b/crates/streams/src/collect_adapter.rs @@ -0,0 +1,124 @@ +use std::fmt; +use std::time::Duration; +use tracing::Instrument; + +async fn collect_in_span( + stream: S, + deadline: Instant, + events_max: u64, + range: Option, + binrange: Option, +) -> Result, Error> +where + S: Stream> + Unpin, + T: CollectableDyn, +{ + info!("collect events_max {events_max} deadline {deadline:?}"); + let mut collector: Option> = None; + let mut stream = stream; + let deadline = deadline.into(); + let mut range_complete = false; + let mut timed_out = false; + let mut total_duration = Duration::ZERO; + loop { + let item = match tokio::time::timeout_at(deadline, stream.next()).await { + Ok(Some(k)) => k, + Ok(None) => break, + Err(_e) => { + warn!("collect timeout"); + timed_out = true; + if let Some(coll) = collector.as_mut() { + info!("collect_in_span call set_timed_out"); + coll.set_timed_out(); + } else { + warn!("collect_in_span collect timeout but no collector yet"); + } + break; + } + }; + match item { + Ok(item) => match item { + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + range_complete = true; + if let Some(coll) = collector.as_mut() { + coll.set_range_complete(); + } else { + warn!("collect_in_span received RangeComplete but no collector yet"); + } + } + RangeCompletableItem::Data(mut item) => { + trace!("collect_in_span sees len {}", item.len()); + if collector.is_none() { + let c = item.new_collector(); + collector = Some(c); + } + let coll = collector.as_mut().unwrap(); + coll.ingest(&mut item); + if coll.len() as u64 >= events_max { + warn!("span reached events_max {}", events_max); + info!("collect_in_span call set_continue_at_here"); + coll.set_continue_at_here(); + break; + } + } + }, + StreamItem::Log(item) => { + trace!("collect_in_span log {:?}", item); + } + StreamItem::Stats(item) => { + trace!("collect_in_span stats {:?}", item); + match item { + // TODO factor and simplify the stats collection: + StatsItem::EventDataReadStats(_) => {} + StatsItem::RangeFilterStats(_) => {} + StatsItem::DiskStats(item) => match item { + DiskStats::OpenStats(k) => { + total_duration += k.duration; + } + DiskStats::SeekStats(k) => { + total_duration += k.duration; + } + DiskStats::ReadStats(k) => { + total_duration += k.duration; + } + DiskStats::ReadExactStats(k) => { + total_duration += k.duration; + } + }, + _ => {} + } + } + }, + Err(e) => { + // TODO Need to use some flags to get good enough error message for remote user. + return Err(ErrMsg(e).into()); + } + } + } + let _ = range_complete; + let _ = timed_out; + let res = collector + .ok_or_else(|| Error::NoResultNoCollector)? + .result(range, binrange) + .map_err(ErrMsg)?; + info!("collect_in_span stats total duration: {:?}", total_duration); + Ok(res) +} + +async fn collect( + stream: S, + deadline: Instant, + events_max: u64, + range: Option, + binrange: Option, +) -> Result, Error> +where + S: Stream> + Unpin, + T: CollectableDyn + WithLen + fmt::Debug, +{ + let span = span!(Level::INFO, "collect"); + collect_in_span(stream, deadline, events_max, range, binrange) + .instrument(span) + .await +} diff --git a/crates/streams/src/generators.rs b/crates/streams/src/generators.rs index dcdc5ff..286bcad 100644 --- a/crates/streams/src/generators.rs +++ b/crates/streams/src/generators.rs @@ -29,7 +29,6 @@ use std::f64::consts::PI; use std::pin::Pin; use std::task::Context; use std::task::Poll; -use std::time::Duration; #[derive(Debug, thiserror::Error)] #[cstm(name = "Generator")] @@ -40,6 +39,10 @@ pub enum Error { BadChannelName, } +fn make_sleep_fut() -> Pin + Send>> { + todo!() +} + pub fn make_test_channel_events_bytes_stream( subq: EventsSubQuery, node_count: u64, @@ -209,7 +212,7 @@ impl Stream for GenerateI32V00 { Pending => Pending, } } else { - self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_millis(2)))); + self.timeout = Some(make_sleep_fut()); continue; }; } @@ -315,7 +318,7 @@ impl Stream for GenerateI32V01 { Pending => Pending, } } else { - self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_millis(2)))); + self.timeout = Some(make_sleep_fut()); continue; }; } @@ -418,7 +421,7 @@ impl Stream for GenerateF64V00 { Pending => Pending, } } else { - self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_millis(2)))); + self.timeout = Some(make_sleep_fut()); continue; }; } @@ -529,7 +532,7 @@ impl Stream for GenerateWaveI16V00 { Pending => Pending, } } else { - self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_millis(2)))); + self.timeout = Some(make_sleep_fut()); continue; }; } diff --git a/crates/streams/src/json_stream.rs b/crates/streams/src/json_stream.rs index c2dc3e1..0f922c1 100644 --- a/crates/streams/src/json_stream.rs +++ b/crates/streams/src/json_stream.rs @@ -1,4 +1,6 @@ use crate::cbor_stream::SitemtyDynEventsStream; +use crate::streamtimeout::StreamTimeout2; +use crate::streamtimeout::TimeoutableStream; use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::RangeCompletableItem; @@ -60,11 +62,15 @@ impl From for String { pub type JsonStream = Pin> + Send>>; -pub fn events_stream_to_json_stream(stream: SitemtyDynEventsStream) -> impl Stream> { - let interval = tokio::time::interval(Duration::from_millis(4000)); - let stream = tokio_stream::StreamExt::timeout_repeating(stream, interval).map(|x| match x { - Ok(x) => map_events(x), - Err(_) => make_keepalive(), +pub fn events_stream_to_json_stream( + stream: SitemtyDynEventsStream, + timeout_provider: Box, +) -> impl Stream> { + let ivl = Duration::from_millis(4000); + let stream = TimeoutableStream::new(ivl, timeout_provider, stream); + let stream = stream.map(|x| match x { + Some(x) => map_events(x), + None => make_keepalive(), }); let prepend = { let item = make_keepalive(); diff --git a/crates/streams/src/lib.rs b/crates/streams/src/lib.rs index 9cacafd..2ce9cb4 100644 --- a/crates/streams/src/lib.rs +++ b/crates/streams/src/lib.rs @@ -1,6 +1,8 @@ pub mod boxed; pub mod cbor_stream; pub mod collect; +#[cfg(feature = "indev")] +pub mod collect_adapter; pub mod dtflags; pub mod events; pub mod eventsplainreader; diff --git a/crates/streams/src/plaineventsjson.rs b/crates/streams/src/plaineventsjson.rs index 8a986de..53a587d 100644 --- a/crates/streams/src/plaineventsjson.rs +++ b/crates/streams/src/plaineventsjson.rs @@ -5,6 +5,7 @@ use crate::firsterr::only_first_err; use crate::json_stream::events_stream_to_json_stream; use crate::json_stream::JsonStream; use crate::plaineventsstream::dyn_events_stream; +use crate::streamtimeout::StreamTimeout2; use crate::tcprawclient::OpenBoxedBytesStreamsBox; use futures_util::StreamExt; use items_0::collect_s::CollectableDyn; @@ -33,6 +34,7 @@ pub async fn plain_events_json( ctx: &ReqCtx, _cluster: &Cluster, open_bytes: OpenBoxedBytesStreamsBox, + timeout_provider: Box, ) -> Result, Error> { debug!("plain_events_json evquery {:?}", evq); let deadline = Instant::now() + evq.timeout().unwrap_or(Duration::from_millis(4000)); @@ -88,6 +90,7 @@ pub async fn plain_events_json( evq.bytes_max(), Some(evq.range().clone()), None, + timeout_provider, ) .await?; debug!("plain_events_json collected"); @@ -106,10 +109,11 @@ pub async fn plain_events_json_stream( ch_conf: ChannelTypeConfigGen, ctx: &ReqCtx, open_bytes: OpenBoxedBytesStreamsBox, + timeout_provider: Box, ) -> Result { trace!("plain_events_json_stream"); let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?; - let stream = events_stream_to_json_stream(stream); + let stream = events_stream_to_json_stream(stream, timeout_provider); let stream = non_empty(stream); let stream = only_first_err(stream); Ok(Box::pin(stream)) diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index 010f08a..764f8f4 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -294,6 +294,7 @@ pub async fn timebinned_json( ctx: &ReqCtx, cache_read_provider: Arc, events_read_provider: Arc, + timeout_provider: Box, ) -> Result, Error> { let deadline = Instant::now() + query @@ -314,7 +315,15 @@ pub async fn timebinned_json( events_read_provider, ) .await?; - let collected = Collect::new(stream, deadline, collect_max, bytes_max, None, Some(binned_range)); + let collected = Collect::new( + stream, + deadline, + collect_max, + bytes_max, + None, + Some(binned_range), + timeout_provider, + ); let collected: BoxFuture<_> = Box::pin(collected); let collres = collected.await?; match collres {