Dominik Werder
2024-11-07 15:37:54 +01:00
parent e89904244c
commit 09463302ee
10 changed files with 190 additions and 146 deletions

View File

@@ -211,9 +211,17 @@ async fn binned_json_single(
let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
let (events_read_provider, cache_read_provider) =
make_read_provider(ch_conf.name(), scyqueue, open_bytes, ctx, ncc);
-let item = streams::timebinnedjson::timebinned_json(query, ch_conf, ctx, cache_read_provider, events_read_provider)
-.instrument(span1)
-.await?;
+let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
+let item = streams::timebinnedjson::timebinned_json(
+query,
+ch_conf,
+ctx,
+cache_read_provider,
+events_read_provider,
+timeout_provider,
+)
+.instrument(span1)
+.await?;
match item {
CollectResult::Some(item) => {
let ret = response(StatusCode::OK)

View File

@@ -208,7 +208,9 @@ async fn plain_events_json_framed(
debug!("plain_events_json_framed {ch_conf:?} {req:?}");
let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
let open_bytes = Arc::pin(open_bytes);
-let stream = streams::plaineventsjson::plain_events_json_stream(&evq, ch_conf, ctx, open_bytes).await?;
+let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
+let stream =
+streams::plaineventsjson::plain_events_json_stream(&evq, ch_conf, ctx, open_bytes, timeout_provider).await?;
let stream = bytes_chunks_to_len_framed_str(stream);
let ret = response(StatusCode::OK)
.header(CONTENT_TYPE, APP_JSON_FRAMED)
@@ -231,8 +233,16 @@ async fn plain_events_json(
debug!("{self_name} chconf_from_events_quorum: {ch_conf:?}");
let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
let open_bytes = Arc::pin(open_bytes);
-let item =
-streams::plaineventsjson::plain_events_json(&evq, ch_conf, ctx, &ncc.node_config.cluster, open_bytes).await;
+let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
+let item = streams::plaineventsjson::plain_events_json(
+&evq,
+ch_conf,
+ctx,
+&ncc.node_config.cluster,
+open_bytes,
+timeout_provider,
+)
+.await;
debug!("{self_name} returned {}", item.is_ok());
let item = match item {
Ok(item) => item,

View File

@@ -5,8 +5,8 @@ authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[dependencies]
-tokio = { version = "1.41", features = ["io-util", "net", "time", "sync", "fs"] }
-tokio-stream = "0.1.16"
+#tokio = { version = "1.41", features = ["io-util", "net", "time", "sync", "fs"] }
+#tokio-stream = "0.1.16"
futures-util = "0.3.15"
pin-project = "1.0.12"
serde = { version = "1.0", features = ["derive"] }
@@ -37,6 +37,7 @@ taskrun = { path = "../taskrun" }
[features]
wasm_transform = ["wasmer"]
+indev = []
[patch.crates-io]
thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" }

View File

@@ -1,3 +1,4 @@
+use crate::streamtimeout::StreamTimeout2;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
@@ -14,13 +15,10 @@ use netpod::log::*;
use netpod::range::evrange::SeriesRange;
use netpod::BinnedRangeEnum;
use netpod::DiskStats;
-use std::fmt;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
-use std::time::Duration;
use std::time::Instant;
-use tracing::Instrument;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "CollectDyn")]
@@ -68,8 +66,8 @@ impl Collect {
bytes_max: u64,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
+timeout_provider: Box<dyn StreamTimeout2>,
) -> Self {
-let timer = tokio::time::sleep_until(deadline.into());
Self {
inp,
events_max,
@@ -79,7 +77,7 @@ impl Collect {
collector: None,
range_final: false,
timeout: false,
-timer: Box::pin(timer),
+timer: timeout_provider.timeout_intervals(deadline.saturating_duration_since(Instant::now())),
done_input: false,
}
}
@@ -220,124 +218,3 @@ impl Future for Collect {
}
}
}
async fn collect_in_span<T, S>(
stream: S,
deadline: Instant,
events_max: u64,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn CollectedDyn>, Error>
where
S: Stream<Item = Sitemty<T>> + Unpin,
T: CollectableDyn,
{
info!("collect events_max {events_max} deadline {deadline:?}");
let mut collector: Option<Box<dyn CollectorDyn>> = None;
let mut stream = stream;
let deadline = deadline.into();
let mut range_complete = false;
let mut timed_out = false;
let mut total_duration = Duration::ZERO;
loop {
let item = match tokio::time::timeout_at(deadline, stream.next()).await {
Ok(Some(k)) => k,
Ok(None) => break,
Err(_e) => {
warn!("collect timeout");
timed_out = true;
if let Some(coll) = collector.as_mut() {
info!("collect_in_span call set_timed_out");
coll.set_timed_out();
} else {
warn!("collect_in_span collect timeout but no collector yet");
}
break;
}
};
match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
range_complete = true;
if let Some(coll) = collector.as_mut() {
coll.set_range_complete();
} else {
warn!("collect_in_span received RangeComplete but no collector yet");
}
}
RangeCompletableItem::Data(mut item) => {
trace!("collect_in_span sees len {}", item.len());
if collector.is_none() {
let c = item.new_collector();
collector = Some(c);
}
let coll = collector.as_mut().unwrap();
coll.ingest(&mut item);
if coll.len() as u64 >= events_max {
warn!("span reached events_max {}", events_max);
info!("collect_in_span call set_continue_at_here");
coll.set_continue_at_here();
break;
}
}
},
StreamItem::Log(item) => {
trace!("collect_in_span log {:?}", item);
}
StreamItem::Stats(item) => {
trace!("collect_in_span stats {:?}", item);
match item {
// TODO factor and simplify the stats collection:
StatsItem::EventDataReadStats(_) => {}
StatsItem::RangeFilterStats(_) => {}
StatsItem::DiskStats(item) => match item {
DiskStats::OpenStats(k) => {
total_duration += k.duration;
}
DiskStats::SeekStats(k) => {
total_duration += k.duration;
}
DiskStats::ReadStats(k) => {
total_duration += k.duration;
}
DiskStats::ReadExactStats(k) => {
total_duration += k.duration;
}
},
_ => {}
}
}
},
Err(e) => {
// TODO Need to use some flags to get good enough error message for remote user.
return Err(ErrMsg(e).into());
}
}
}
let _ = range_complete;
let _ = timed_out;
let res = collector
.ok_or_else(|| Error::NoResultNoCollector)?
.result(range, binrange)
.map_err(ErrMsg)?;
info!("collect_in_span stats total duration: {:?}", total_duration);
Ok(res)
}
pub async fn collect<T, S>(
stream: S,
deadline: Instant,
events_max: u64,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn CollectedDyn>, Error>
where
S: Stream<Item = Sitemty<T>> + Unpin,
T: CollectableDyn + WithLen + fmt::Debug,
{
let span = span!(Level::INFO, "collect");
collect_in_span(stream, deadline, events_max, range, binrange)
.instrument(span)
.await
}
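Note: the hunks above replace the fixed tokio sleep in Collect::new with a timer obtained from an injected Box<dyn StreamTimeout2>, and move the old collect/collect_in_span helpers into the feature-gated collect_adapter module shown in the next file. The streamtimeout module itself is not part of this commit, so the Rust sketch below is only a guess at a provider shape that would satisfy these call sites: the trait name StreamTimeout2, the timeout_intervals method, and a tokio-backed StreamTimeout with a boxed() constructor (reached by the HTTP handlers as streamio::streamtimeout::StreamTimeout::boxed()) are taken from the diff; the tick-stream return type and the zero-period guard are assumptions.

// Hypothetical sketch only; the streamtimeout module is not shown in this commit.
use futures_util::Stream;
use std::pin::Pin;
use std::time::Duration;

// Assumed: the timer is a stream of ticks rather than a single sleep future.
pub type TickStream = Pin<Box<dyn Stream<Item = ()> + Send>>;

pub trait StreamTimeout2: Send {
    // First tick after `delay`, then repeating, so one-shot deadlines and
    // keep-alive intervals can both be driven from the same provider.
    fn timeout_intervals(&self, delay: Duration) -> TickStream;
}

// A tokio-backed provider like the one the handlers construct via boxed().
pub struct StreamTimeout;

impl StreamTimeout {
    pub fn boxed() -> Box<dyn StreamTimeout2> {
        Box::new(StreamTimeout)
    }
}

impl StreamTimeout2 for StreamTimeout {
    fn timeout_intervals(&self, delay: Duration) -> TickStream {
        // Guard against a zero period, which tokio's interval rejects.
        let period = delay.max(Duration::from_millis(1));
        let start = tokio::time::Instant::now() + delay;
        let ivl = tokio::time::interval_at(start, period);
        Box::pin(futures_util::stream::unfold(ivl, |mut ivl| async move {
            ivl.tick().await;
            Some(((), ivl))
        }))
    }
}

Indirection of this kind would let tests or a non-tokio build install a different timer, which fits the Cargo.toml hunk above that comments out the crate's direct tokio and tokio-stream dependencies.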

View File

@@ -0,0 +1,124 @@
use std::fmt;
use std::time::Duration;
use tracing::Instrument;
async fn collect_in_span<T, S>(
stream: S,
deadline: Instant,
events_max: u64,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn CollectedDyn>, Error>
where
S: Stream<Item = Sitemty<T>> + Unpin,
T: CollectableDyn,
{
info!("collect events_max {events_max} deadline {deadline:?}");
let mut collector: Option<Box<dyn CollectorDyn>> = None;
let mut stream = stream;
let deadline = deadline.into();
let mut range_complete = false;
let mut timed_out = false;
let mut total_duration = Duration::ZERO;
loop {
let item = match tokio::time::timeout_at(deadline, stream.next()).await {
Ok(Some(k)) => k,
Ok(None) => break,
Err(_e) => {
warn!("collect timeout");
timed_out = true;
if let Some(coll) = collector.as_mut() {
info!("collect_in_span call set_timed_out");
coll.set_timed_out();
} else {
warn!("collect_in_span collect timeout but no collector yet");
}
break;
}
};
match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
range_complete = true;
if let Some(coll) = collector.as_mut() {
coll.set_range_complete();
} else {
warn!("collect_in_span received RangeComplete but no collector yet");
}
}
RangeCompletableItem::Data(mut item) => {
trace!("collect_in_span sees len {}", item.len());
if collector.is_none() {
let c = item.new_collector();
collector = Some(c);
}
let coll = collector.as_mut().unwrap();
coll.ingest(&mut item);
if coll.len() as u64 >= events_max {
warn!("span reached events_max {}", events_max);
info!("collect_in_span call set_continue_at_here");
coll.set_continue_at_here();
break;
}
}
},
StreamItem::Log(item) => {
trace!("collect_in_span log {:?}", item);
}
StreamItem::Stats(item) => {
trace!("collect_in_span stats {:?}", item);
match item {
// TODO factor and simplify the stats collection:
StatsItem::EventDataReadStats(_) => {}
StatsItem::RangeFilterStats(_) => {}
StatsItem::DiskStats(item) => match item {
DiskStats::OpenStats(k) => {
total_duration += k.duration;
}
DiskStats::SeekStats(k) => {
total_duration += k.duration;
}
DiskStats::ReadStats(k) => {
total_duration += k.duration;
}
DiskStats::ReadExactStats(k) => {
total_duration += k.duration;
}
},
_ => {}
}
}
},
Err(e) => {
// TODO Need to use some flags to get good enough error message for remote user.
return Err(ErrMsg(e).into());
}
}
}
let _ = range_complete;
let _ = timed_out;
let res = collector
.ok_or_else(|| Error::NoResultNoCollector)?
.result(range, binrange)
.map_err(ErrMsg)?;
info!("collect_in_span stats total duration: {:?}", total_duration);
Ok(res)
}
async fn collect<T, S>(
stream: S,
deadline: Instant,
events_max: u64,
range: Option<SeriesRange>,
binrange: Option<BinnedRangeEnum>,
) -> Result<Box<dyn CollectedDyn>, Error>
where
S: Stream<Item = Sitemty<T>> + Unpin,
T: CollectableDyn + WithLen + fmt::Debug,
{
let span = span!(Level::INFO, "collect");
collect_in_span(stream, deadline, events_max, range, binrange)
.instrument(span)
.await
}

View File

@@ -29,7 +29,6 @@ use std::f64::consts::PI;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
-use std::time::Duration;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "Generator")]
@@ -40,6 +39,10 @@ pub enum Error {
BadChannelName,
}
+fn make_sleep_fut() -> Pin<Box<dyn Future<Output = ()> + Send>> {
+todo!()
+}
pub fn make_test_channel_events_bytes_stream(
subq: EventsSubQuery,
node_count: u64,
@@ -209,7 +212,7 @@ impl Stream for GenerateI32V00 {
Pending => Pending,
}
} else {
-self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_millis(2))));
+self.timeout = Some(make_sleep_fut());
continue;
};
}
@@ -315,7 +318,7 @@ impl Stream for GenerateI32V01 {
Pending => Pending,
}
} else {
-self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_millis(2))));
+self.timeout = Some(make_sleep_fut());
continue;
};
}
@@ -418,7 +421,7 @@ impl Stream for GenerateF64V00 {
Pending => Pending,
}
} else {
-self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_millis(2))));
+self.timeout = Some(make_sleep_fut());
continue;
};
}
@@ -529,7 +532,7 @@ impl Stream for GenerateWaveI16V00 {
Pending => Pending,
}
} else {
-self.timeout = Some(Box::pin(tokio::time::sleep(Duration::from_millis(2))));
+self.timeout = Some(make_sleep_fut());
continue;
};
}

View File

@@ -1,4 +1,6 @@
use crate::cbor_stream::SitemtyDynEventsStream;
+use crate::streamtimeout::StreamTimeout2;
+use crate::streamtimeout::TimeoutableStream;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::RangeCompletableItem;
@@ -60,11 +62,15 @@ impl From<JsonBytes> for String {
pub type JsonStream = Pin<Box<dyn Stream<Item = Result<JsonBytes, Error>> + Send>>;
-pub fn events_stream_to_json_stream(stream: SitemtyDynEventsStream) -> impl Stream<Item = Result<JsonBytes, Error>> {
-let interval = tokio::time::interval(Duration::from_millis(4000));
-let stream = tokio_stream::StreamExt::timeout_repeating(stream, interval).map(|x| match x {
-Ok(x) => map_events(x),
-Err(_) => make_keepalive(),
+pub fn events_stream_to_json_stream(
+stream: SitemtyDynEventsStream,
+timeout_provider: Box<dyn StreamTimeout2>,
+) -> impl Stream<Item = Result<JsonBytes, Error>> {
+let ivl = Duration::from_millis(4000);
+let stream = TimeoutableStream::new(ivl, timeout_provider, stream);
+let stream = stream.map(|x| match x {
+Some(x) => map_events(x),
+None => make_keepalive(),
});
let prepend = {
let item = make_keepalive();
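In the hunk above, tokio_stream's timeout_repeating is replaced by a TimeoutableStream that wraps the event stream and yields None whenever ivl elapses without an item, which the map turns into a keep-alive. TimeoutableStream is also not included in this diff; a minimal sketch of a wrapper with those assumed semantics (timer re-armed after each item, and the same hypothetical StreamTimeout2/TickStream shapes as in the earlier sketch) might look like:

use futures_util::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

// Same assumed shapes as in the earlier sketch, duplicated to keep this self-contained.
pub type TickStream = Pin<Box<dyn Stream<Item = ()> + Send>>;
pub trait StreamTimeout2: Send {
    fn timeout_intervals(&self, delay: Duration) -> TickStream;
}

pub struct TimeoutableStream<S> {
    inner: S,
    ivl: Duration,
    provider: Box<dyn StreamTimeout2>,
    // Lazily re-armed whenever the inner stream is pending.
    timer: Option<TickStream>,
}

impl<S: Stream + Unpin> TimeoutableStream<S> {
    pub fn new(ivl: Duration, provider: Box<dyn StreamTimeout2>, inner: S) -> Self {
        Self {
            inner,
            ivl,
            provider,
            timer: None,
        }
    }
}

impl<S: Stream + Unpin> Stream for TimeoutableStream<S> {
    // Some(item) is a real inner item, None means the interval elapsed (keep-alive tick).
    type Item = Option<S::Item>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        match Pin::new(&mut this.inner).poll_next(cx) {
            Poll::Ready(Some(item)) => {
                // Progress was made: drop the timer so the next pending poll re-arms it.
                this.timer = None;
                Poll::Ready(Some(Some(item)))
            }
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => {
                if this.timer.is_none() {
                    this.timer = Some(this.provider.timeout_intervals(this.ivl));
                }
                let timer = this.timer.as_mut().expect("timer just armed");
                match timer.as_mut().poll_next(cx) {
                    Poll::Ready(Some(())) => Poll::Ready(Some(None)),
                    Poll::Ready(None) | Poll::Pending => Poll::Pending,
                }
            }
        }
    }
}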

View File

@@ -1,6 +1,8 @@
pub mod boxed;
pub mod cbor_stream;
pub mod collect;
+#[cfg(feature = "indev")]
+pub mod collect_adapter;
pub mod dtflags;
pub mod events;
pub mod eventsplainreader;

View File

@@ -5,6 +5,7 @@ use crate::firsterr::only_first_err;
use crate::json_stream::events_stream_to_json_stream;
use crate::json_stream::JsonStream;
use crate::plaineventsstream::dyn_events_stream;
+use crate::streamtimeout::StreamTimeout2;
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
use futures_util::StreamExt;
use items_0::collect_s::CollectableDyn;
@@ -33,6 +34,7 @@ pub async fn plain_events_json(
ctx: &ReqCtx,
_cluster: &Cluster,
open_bytes: OpenBoxedBytesStreamsBox,
+timeout_provider: Box<dyn StreamTimeout2>,
) -> Result<CollectResult<JsonValue>, Error> {
debug!("plain_events_json evquery {:?}", evq);
let deadline = Instant::now() + evq.timeout().unwrap_or(Duration::from_millis(4000));
@@ -88,6 +90,7 @@ pub async fn plain_events_json(
evq.bytes_max(),
Some(evq.range().clone()),
None,
+timeout_provider,
)
.await?;
debug!("plain_events_json collected");
@@ -106,10 +109,11 @@ pub async fn plain_events_json_stream(
ch_conf: ChannelTypeConfigGen,
ctx: &ReqCtx,
open_bytes: OpenBoxedBytesStreamsBox,
+timeout_provider: Box<dyn StreamTimeout2>,
) -> Result<JsonStream, Error> {
trace!("plain_events_json_stream");
let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?;
-let stream = events_stream_to_json_stream(stream);
+let stream = events_stream_to_json_stream(stream, timeout_provider);
let stream = non_empty(stream);
let stream = only_first_err(stream);
Ok(Box::pin(stream))

View File

@@ -294,6 +294,7 @@ pub async fn timebinned_json(
ctx: &ReqCtx,
cache_read_provider: Arc<dyn CacheReadProvider>,
events_read_provider: Arc<dyn EventsReadProvider>,
+timeout_provider: Box<dyn StreamTimeout2>,
) -> Result<CollectResult<JsonValue>, Error> {
let deadline = Instant::now()
+ query
@@ -314,7 +315,15 @@ pub async fn timebinned_json(
events_read_provider,
)
.await?;
-let collected = Collect::new(stream, deadline, collect_max, bytes_max, None, Some(binned_range));
+let collected = Collect::new(
+stream,
+deadline,
+collect_max,
+bytes_max,
+None,
+Some(binned_range),
+timeout_provider,
+);
let collected: BoxFuture<_> = Box::pin(collected);
let collres = collected.await?;
match collres {