From 5ee1779feeab40f00674ccdb9c8c69c48e673f5b Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 12 Sep 2024 17:13:50 +0200 Subject: [PATCH] WIP --- crates/httpret/src/api4/binned.rs | 77 +++- crates/httpret/src/api4/events.rs | 3 +- crates/items_2/src/binsdim0.rs | 1 + crates/netpod/src/query.rs | 16 + crates/scyllaconn/src/bincache.rs | 372 ++------------------ crates/scyllaconn/src/events2/prepare.rs | 50 +++ crates/scyllaconn/src/worker.rs | 50 ++- crates/streams/src/plaineventsjson.rs | 2 +- crates/streams/src/timebin/cached/reader.rs | 81 ++++- crates/streams/src/timebin/fromevents.rs | 6 +- crates/streams/src/timebin/fromlayers.rs | 8 + crates/streams/src/timebin/gapfill.rs | 118 ++++--- crates/streams/src/timebinnedjson.rs | 118 ++++++- 13 files changed, 502 insertions(+), 400 deletions(-) diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index f417360..107a6cd 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -1,14 +1,18 @@ +use crate::api4::events::bytes_chunks_to_len_framed_str; use crate::bodystream::response; use crate::channelconfig::ch_conf_from_binned; +use crate::requests::accepts_json_framed; use crate::requests::accepts_json_or_all; use crate::requests::accepts_octets; use crate::ServiceSharedResources; use dbconn::worker::PgQueue; use err::thiserror; use err::ThisError; +use http::header::CONTENT_TYPE; use http::Method; use http::StatusCode; use httpclient::body_empty; +use httpclient::body_stream; use httpclient::error_response; use httpclient::not_found_response; use httpclient::IntoBody; @@ -21,6 +25,8 @@ use netpod::timeunits::SEC; use netpod::FromUrl; use netpod::NodeConfigCached; use netpod::ReqCtx; +use netpod::APP_JSON_FRAMED; +use netpod::HEADER_NAME_REQUEST_ID; use nodenet::client::OpenBoxedBytesViaHttp; use nodenet::scylla::ScyllaEventReadProvider; use query::api4::binned::BinnedQuery; @@ -112,9 +118,11 @@ async fn binned( { Err(Error::ServerError)?; } - if accepts_json_or_all(&req.headers()) { - Ok(binned_json(url, req, ctx, pgqueue, scyqueue, ncc).await?) - } else if accepts_octets(&req.headers()) { + if accepts_json_framed(req.headers()) { + Ok(binned_json_framed(url, req, ctx, pgqueue, scyqueue, ncc).await?) + } else if accepts_json_or_all(req.headers()) { + Ok(binned_json_single(url, req, ctx, pgqueue, scyqueue, ncc).await?) + } else if accepts_octets(req.headers()) { Ok(error_response( format!("binary binned data not yet available"), ctx.reqid(), @@ -125,7 +133,7 @@ async fn binned( } } -async fn binned_json( +async fn binned_json_single( url: Url, req: Requ, ctx: &ReqCtx, @@ -133,7 +141,8 @@ async fn binned_json( scyqueue: Option, ncc: &NodeConfigCached, ) -> Result { - debug!("{:?}", req); + // TODO unify with binned_json_framed + debug!("binned_json_single {:?}", req); let reqid = crate::status_board().map_err(|_e| Error::ServerError)?.new_status_id(); let (_head, _body) = req.into_parts(); let query = BinnedQuery::from_url(&url).map_err(|e| { @@ -178,3 +187,61 @@ async fn binned_json( let ret = response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?; Ok(ret) } + +async fn binned_json_framed( + url: Url, + req: Requ, + ctx: &ReqCtx, + pgqueue: &PgQueue, + scyqueue: Option, + ncc: &NodeConfigCached, +) -> Result { + debug!("binned_json_framed {:?}", req); + let reqid = crate::status_board().map_err(|_e| Error::ServerError)?.new_status_id(); + let (_head, _body) = req.into_parts(); + let query = BinnedQuery::from_url(&url).map_err(|e| { + error!("binned_json: {e:?}"); + Error::BadQuery(e.to_string()) + })?; + // TODO handle None case better and return 404 + let ch_conf = ch_conf_from_binned(&query, ctx, pgqueue, ncc) + .await? + .ok_or_else(|| Error::ChannelNotFound)?; + let span1 = span!( + Level::INFO, + "httpret::binned", + reqid, + beg = query.range().beg_u64() / SEC, + end = query.range().end_u64() / SEC, + ch = query.channel().name(), + ); + span1.in_scope(|| { + debug!("begin"); + }); + let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); + let open_bytes = Arc::pin(open_bytes); + let cache_read_provider = scyqueue + .clone() + .map(|qu| ScyllaCacheReadProvider::new(qu)) + .map(|x| Arc::new(x) as Arc); + let events_read_provider = scyqueue + .map(|qu| ScyllaEventReadProvider::new(qu)) + .map(|x| Arc::new(x) as Arc); + let stream = streams::timebinnedjson::timebinned_json_framed( + query, + ch_conf, + ctx, + open_bytes, + cache_read_provider, + events_read_provider, + ) + .instrument(span1) + .await + .map_err(|e| Error::BinnedStream(e))?; + let stream = bytes_chunks_to_len_framed_str(stream); + let ret = response(StatusCode::OK) + .header(CONTENT_TYPE, APP_JSON_FRAMED) + .header(HEADER_NAME_REQUEST_ID, ctx.reqid()) + .body(body_stream(stream))?; + Ok(ret) +} diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index 20b4d94..9a552ac 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -275,7 +275,8 @@ where .filter(|x| if let Ok(x) = x { ready(x.len() > 0) } else { ready(true) }) } -fn bytes_chunks_to_len_framed_str(stream: S) -> impl Stream> +// TODO move this, it's also used by binned. +pub fn bytes_chunks_to_len_framed_str(stream: S) -> impl Stream> where S: Stream>, T: Into, diff --git a/crates/items_2/src/binsdim0.rs b/crates/items_2/src/binsdim0.rs index 20aede6..38ec0cc 100644 --- a/crates/items_2/src/binsdim0.rs +++ b/crates/items_2/src/binsdim0.rs @@ -848,6 +848,7 @@ impl CollectorType for BinsDim0Collector { continue_at, finished_at, }; + *self = Self::new(); Ok(ret) } } diff --git a/crates/netpod/src/query.rs b/crates/netpod/src/query.rs index d629ac5..5f9913e 100644 --- a/crates/netpod/src/query.rs +++ b/crates/netpod/src/query.rs @@ -72,6 +72,22 @@ impl CacheUsage { }; Ok(ret) } + + pub fn is_cache_write(&self) -> bool { + match self { + CacheUsage::Use => true, + CacheUsage::Ignore => false, + CacheUsage::Recreate => true, + } + } + + pub fn is_cache_read(&self) -> bool { + match self { + CacheUsage::Use => true, + CacheUsage::Ignore => false, + CacheUsage::Recreate => false, + } + } } impl fmt::Display for CacheUsage { diff --git a/crates/scyllaconn/src/bincache.rs b/crates/scyllaconn/src/bincache.rs index e649348..b68456f 100644 --- a/crates/scyllaconn/src/bincache.rs +++ b/crates/scyllaconn/src/bincache.rs @@ -1,6 +1,7 @@ #![allow(unused)] use crate::errconv::ErrConv; +use crate::events2::prepare::StmtsCache; use crate::worker::ScyllaQueue; use err::Error; use futures_util::Future; @@ -28,6 +29,7 @@ use netpod::TsNano; use query::transform::TransformQuery; use scylla::Session as ScySession; use std::collections::VecDeque; +use std::ops::Range; use std::pin::Pin; use std::sync::Arc; use std::task::Context; @@ -35,100 +37,6 @@ use std::task::Poll; use std::time::Duration; use std::time::Instant; -pub async fn read_cached_scylla( - series: u64, - chn: &ChannelTyped, - coord: &PreBinnedPatchCoordEnum, - scy: &ScySession, -) -> Result>, Error> { - /*let vals = ( - series as i64, - (coord.bin_t_len() / SEC) as i32, - (coord.patch_t_len() / SEC) as i32, - coord.ix() as i64, - );*/ - todo!(); - let vals: (i64, i32, i32, i64) = todo!(); - let res = scy - .query_iter( - "select counts, avgs, mins, maxs from binned_scalar_f32 where series = ? and bin_len_sec = ? and patch_len_sec = ? and agg_kind = 'dummy-agg-kind' and offset = ?", - vals, - ) - .await; - let mut res = res.err_conv().map_err(|e| { - error!("can not read from cache"); - e - })?; - while let Some(item) = res.next().await { - let row = item.err_conv()?; - // let edges = coord.edges(); - let edges: Vec = todo!(); - let (counts, avgs, mins, maxs): (Vec, Vec, Vec, Vec) = row.into_typed().err_conv()?; - let mut counts_mismatch = false; - if edges.len() != counts.len() + 1 { - counts_mismatch = true; - } - if counts.len() != avgs.len() { - counts_mismatch = true; - } - let ts1s: VecDeque<_> = edges[..(edges.len() - 1).min(edges.len())].iter().map(|&x| x).collect(); - let ts2s: VecDeque<_> = edges[1.min(edges.len())..].iter().map(|&x| x).collect(); - if ts1s.len() != ts2s.len() { - error!("ts1s vs ts2s mismatch"); - counts_mismatch = true; - } - if ts1s.len() != counts.len() { - counts_mismatch = true; - } - let avgs: VecDeque<_> = avgs.into_iter().map(|x| x).collect(); - let mins: VecDeque<_> = mins.into_iter().map(|x| x as _).collect(); - let maxs: VecDeque<_> = maxs.into_iter().map(|x| x as _).collect(); - if counts_mismatch { - error!( - "mismatch: edges {} ts1s {} ts2s {} counts {} avgs {} mins {} maxs {}", - edges.len(), - ts1s.len(), - ts2s.len(), - counts.len(), - avgs.len(), - mins.len(), - maxs.len(), - ); - } - let counts: VecDeque<_> = counts.into_iter().map(|x| x as u64).collect(); - // TODO construct a dyn TimeBinned using the scalar type and shape information. - // TODO place the values with little copying into the TimeBinned. - use ScalarType::*; - use Shape::*; - match &chn.shape { - Scalar => match &chn.scalar_type { - F64 => { - let ret = BinsDim0:: { - ts1s, - ts2s, - counts, - avgs, - mins, - maxs, - // TODO: - dim0kind: Some(Dim0Kind::Time), - }; - return Ok(Some(Box::new(ret))); - } - _ => { - error!("TODO can not yet restore {:?} {:?}", chn.scalar_type, chn.shape); - err::todoval() - } - }, - _ => { - error!("TODO can not yet restore {:?} {:?}", chn.scalar_type, chn.shape); - err::todoval() - } - } - } - Ok(None) -} - #[allow(unused)] struct WriteFut<'a> { chn: &'a ChannelTyped, @@ -300,231 +208,6 @@ pub fn fetch_uncached_data_box( )) } -pub async fn fetch_uncached_higher_res_prebinned( - series: u64, - chn: &ChannelTyped, - coord: PreBinnedPatchCoordEnum, - range: PreBinnedPatchRangeEnum, - one_before_range: bool, - transform: TransformQuery, - cache_usage: CacheUsage, - scy: Arc, -) -> Result<(Box, bool), Error> { - /*let edges = coord.edges(); - // TODO refine the AggKind scheme or introduce a new BinningOpts type and get time-weight from there. - let do_time_weight = true; - // We must produce some result with correct types even if upstream delivers nothing at all. - //let bin0 = empty_binned_dyn_tb(&chn.scalar_type, &chn.shape, &transform); - let bin0 = err::todoval(); - let mut time_binner = bin0.time_binner_new(edges.clone(), do_time_weight); - let mut complete = true; - //let patch_it = PreBinnedPatchIterator::from_range(range.clone()); - let patches_dummy: Vec = Vec::new(); - let mut patch_it = patches_dummy.into_iter(); - for patch_coord in patch_it { - // We request data here for a Coord, meaning that we expect to receive multiple bins. - // The expectation is that we receive a single TimeBinned which contains all bins of that PatchCoord. - //let patch_coord = PreBinnedPatchCoord::new(patch.bin_t_len(), patch.patch_t_len(), patch.ix()); - let (bin, comp) = pre_binned_value_stream_with_scy( - series, - chn, - &patch_coord, - one_before_range, - transform.clone(), - cache_usage.clone(), - scy.clone(), - ) - .await?; - if let Err(msg) = bin.validate() { - error!( - "pre-binned intermediate issue {} coord {:?} patch_coord {:?}", - msg, coord, patch_coord - ); - } - complete = complete && comp; - time_binner.ingest(bin.as_time_binnable_dyn()); - } - // Fixed limit to defend against a malformed implementation: - let mut i = 0; - while i < 80000 && time_binner.bins_ready_count() < coord.bin_count() as usize { - let n1 = time_binner.bins_ready_count(); - if false { - trace!( - "pre-binned extra cycle {} {} {}", - i, - time_binner.bins_ready_count(), - coord.bin_count() - ); - } - time_binner.cycle(); - i += 1; - if time_binner.bins_ready_count() <= n1 { - warn!("pre-binned cycle did not add another bin, break"); - break; - } - } - if time_binner.bins_ready_count() < coord.bin_count() as usize { - return Err(Error::with_msg_no_trace(format!( - "pre-binned unable to produce all bins for the patch bins_ready {} coord.bin_count {} edges.len {}", - time_binner.bins_ready_count(), - coord.bin_count(), - edges.len(), - ))); - } - let ready = time_binner - .bins_ready() - .ok_or_else(|| Error::with_msg_no_trace(format!("unable to produce any bins for the patch range")))?; - if let Err(msg) = ready.validate() { - error!("pre-binned final issue {} coord {:?}", msg, coord); - } - Ok((ready, complete))*/ - todo!() -} - -pub async fn fetch_uncached_binned_events( - series: u64, - chn: &ChannelTyped, - coord: PreBinnedPatchCoordEnum, - one_before_range: bool, - transform: TransformQuery, - scy: Arc, -) -> Result<(Box, bool), Error> { - /*let edges = coord.edges(); - // TODO refine the AggKind scheme or introduce a new BinningOpts type and get time-weight from there. - let do_time_weight = true; - // We must produce some result with correct types even if upstream delivers nothing at all. - //let bin0 = empty_events_dyn_tb(&chn.scalar_type, &chn.shape, &agg_kind); - //let mut time_binner = bin0.time_binner_new(edges.clone(), do_time_weight); - let mut time_binner = items_2::empty::empty_events_dyn_ev(&chn.scalar_type, &chn.shape)? - .as_time_binnable() - .time_binner_new(edges.clone(), do_time_weight); - // TODO handle deadline better - let deadline = Instant::now(); - // TODO take timeout from query - let deadline = deadline - .checked_add(Duration::from_millis(6000)) - .ok_or_else(|| Error::with_msg_no_trace(format!("deadline overflow")))?; - let evq = PlainEventsQuery::new(chn.channel.clone(), coord.patch_range()); - let mut events_dyn = EventsStreamScylla::new( - series, - evq.range().clone(), - one_before_range, - chn.scalar_type.clone(), - chn.shape.clone(), - true, - scy, - false, - ); - let mut complete = false; - loop { - let item = tokio::time::timeout_at(deadline.into(), events_dyn.next()).await; - let item = match item { - Ok(Some(k)) => k, - Ok(None) => break, - Err(_) => { - error!("fetch_uncached_binned_events timeout"); - return Err(Error::with_msg_no_trace(format!( - "TODO handle fetch_uncached_binned_events timeout" - ))); - } - }; - if false { - // TODO as soon we encounter RangeComplete we just: - complete = true; - } - match item { - Ok(ChannelEvents::Events(item)) => { - time_binner.ingest(item.as_time_binnable()); - // TODO could also ask the binner here whether we are "complete" to stop sending useless data. - } - Ok(ChannelEvents::Status(_)) => { - // TODO flag, should not happen. - return Err(Error::with_msg_no_trace(format!( - "unexpected read of channel status events" - ))); - } - Err(e) => return Err(e), - } - } - // Fixed limit to defend against a malformed implementation: - let mut i = 0; - while i < 80000 && time_binner.bins_ready_count() < coord.bin_count() as usize { - let n1 = time_binner.bins_ready_count(); - if false { - trace!( - "events extra cycle {} {} {}", - i, - time_binner.bins_ready_count(), - coord.bin_count() - ); - } - time_binner.cycle(); - i += 1; - if time_binner.bins_ready_count() <= n1 { - warn!("events cycle did not add another bin, break"); - break; - } - } - if time_binner.bins_ready_count() < coord.bin_count() as usize { - return Err(Error::with_msg_no_trace(format!( - "events unable to produce all bins for the patch bins_ready {} coord.bin_count {} edges.len {}", - time_binner.bins_ready_count(), - coord.bin_count(), - edges.len(), - ))); - } - let ready = time_binner - .bins_ready() - .ok_or_else(|| Error::with_msg_no_trace(format!("unable to produce any bins for the patch")))?; - if let Err(msg) = ready.validate() { - error!("time binned invalid {} coord {:?}", msg, coord); - } - Ok((ready, complete))*/ - todo!() -} - -pub async fn pre_binned_value_stream_with_scy( - series: u64, - chn: &ChannelTyped, - coord: &PreBinnedPatchCoordEnum, - one_before_range: bool, - transform: TransformQuery, - cache_usage: CacheUsage, - scy: Arc, -) -> Result<(Box, bool), Error> { - trace!("pre_binned_value_stream_with_scy {chn:?} {coord:?}"); - if let (Some(item), CacheUsage::Use) = (read_cached_scylla(series, chn, coord, &scy).await?, &cache_usage) { - info!("+++++++++++++ GOOD READ"); - Ok((item, true)) - } else { - if let CacheUsage::Use = &cache_usage { - warn!("--+--+--+--+--+--+ NOT YET CACHED"); - } - let res = fetch_uncached_data_box(series, chn, coord, one_before_range, transform, cache_usage, scy).await?; - let (bin, complete) = - res.ok_or_else(|| Error::with_msg_no_trace(format!("pre_binned_value_stream_with_scy got None bin")))?; - Ok((bin, complete)) - } -} - -pub async fn pre_binned_value_stream( - series: u64, - chn: &ChannelTyped, - coord: &PreBinnedPatchCoordEnum, - one_before_range: bool, - transform: TransformQuery, - agg_kind: AggKind, - cache_usage: CacheUsage, - scy: Arc, -) -> Result, Error>> + Send>>, Error> { - trace!("pre_binned_value_stream series {series} {chn:?} {coord:?}"); - let res = - pre_binned_value_stream_with_scy(series, chn, coord, one_before_range, transform, cache_usage, scy).await?; - error!("TODO pre_binned_value_stream"); - err::todo(); - Ok(Box::pin(futures_util::stream::iter([Ok(res.0)]))) -} - pub struct ScyllaCacheReadProvider { scyqueue: ScyllaQueue, } @@ -536,9 +219,16 @@ impl ScyllaCacheReadProvider { } impl streams::timebin::CacheReadProvider for ScyllaCacheReadProvider { - fn read(&self, series: u64, range: BinnedRange) -> streams::timebin::cached::reader::CacheReading { - warn!("impl CacheReadProvider for ScyllaCacheReadProvider"); - todo!("impl CacheReadProvider for ScyllaCacheReadProvider") + fn read( + &self, + series: u64, + bin_len: DtMs, + msp: u64, + offs: Range, + ) -> streams::timebin::cached::reader::CacheReading { + let scyqueue = self.scyqueue.clone(); + let fut = async move { scyqueue.read_cache_f32(series, bin_len, msp, offs).await }; + streams::timebin::cached::reader::CacheReading::new(Box::pin(fut)) } fn write(&self, series: u64, bins: BinsDim0) -> streams::timebin::cached::reader::CacheWriting { @@ -551,6 +241,7 @@ impl streams::timebin::CacheReadProvider for ScyllaCacheReadProvider { pub async fn worker_write( series: u64, bins: BinsDim0, + stmts_cache: &StmtsCache, scy: &ScySession, ) -> Result<(), streams::timebin::cached::reader::Error> { let mut msp_last = u64::MAX; @@ -564,8 +255,7 @@ pub async fn worker_write( .zip(bins.avgs.iter()) { let bin_len = DtMs::from_ms_u64((ts2 - ts1) / 1000000); - let part_len = DtMs::from_ms_u64(bin_len.ms() * 1000); - let div = part_len.ns(); + let div = streams::timebin::cached::reader::part_len(bin_len).ns(); let msp = ts1 / div; let off = (ts1 - msp * div) / bin_len.ns(); let params = ( @@ -579,27 +269,22 @@ pub async fn worker_write( avg, ); eprintln!("cache write {:?}", params); - scy.query( - "insert into sf_st.st_binned_scalar_f32 (series, bin_len_ms, ts_msp, off, count, min, max, avg) values (?, ?, ?, ?, ?, ?, ?, ?)", - params, - ) - .await - .map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?; + scy.execute(stmts_cache.st_write_f32(), params) + .await + .map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?; } Ok(()) } pub async fn worker_read( series: u64, - range: BinnedRange, + bin_len: DtMs, + msp: u64, + offs: core::ops::Range, + stmts_cache: &StmtsCache, scy: &ScySession, ) -> Result, streams::timebin::cached::reader::Error> { - let bin_len: DtMs = todo!(); - let part_len = DtMs::from_ms_u64(bin_len.ms() * 1000); - let div = part_len.ns(); - let msp: u64 = 0; - let offs: core::ops::Range = todo!(); - let cql = "select off, count, min, max, avg from sf_st.st_binned_scalar_f32 where series = ? and bin_len_ms = ? and ts_msp = ? and off >= ? and off < ?"; + let div = streams::timebin::cached::reader::part_len(bin_len).ns(); let params = ( series as i64, bin_len.ms() as i32, @@ -608,16 +293,21 @@ pub async fn worker_read( offs.end as i32, ); let res = scy - .query_iter(cql, params) + .execute_iter(stmts_cache.st_read_f32().clone(), params) .await .map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?; - let it = res.into_typed::<(i32, i64, f32, f32, f32)>(); + let mut it = res.into_typed::<(i32, i64, f32, f32, f32)>(); let mut bins = BinsDim0::empty(); while let Some(x) = it.next().await { let row = x.map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?; let off = row.0 as u64; - // TODO push bins - todo!("push bins"); + let cnt = row.1 as u64; + let min = row.2; + let max = row.3; + let avg = row.4; + let ts1 = bin_len.ns() * off + div * msp; + let ts2 = ts1 + bin_len.ns(); + bins.push(ts1, ts2, cnt, min, max, avg); } Ok(bins) } diff --git a/crates/scyllaconn/src/events2/prepare.rs b/crates/scyllaconn/src/events2/prepare.rs index 6686124..4e27d93 100644 --- a/crates/scyllaconn/src/events2/prepare.rs +++ b/crates/scyllaconn/src/events2/prepare.rs @@ -248,3 +248,53 @@ impl StmtsEvents { } } } + +#[derive(Debug)] +pub struct StmtsCache { + st_write_f32: PreparedStatement, + st_read_f32: PreparedStatement, +} + +impl StmtsCache { + pub async fn new(ks: &str, scy: &Session) -> Result { + let rt = RetentionTime::Short; + let st_write_f32 = scy + .prepare(format!( + concat!( + "insert into {}.{}binned_scalar_f32", + " (series, bin_len_ms, ts_msp, off, count, min, max, avg)", + " values (?, ?, ?, ?, ?, ?, ?, ?)" + ), + ks, + rt.table_prefix() + )) + .await?; + let st_read_f32 = scy + .prepare(format!( + concat!( + "select off, count, min, max, avg", + " from {}.{}binned_scalar_f32", + " where series = ?", + " and bin_len_ms = ?", + " and ts_msp = ?", + " and off >= ? and off < ?" + ), + ks, + rt.table_prefix() + )) + .await?; + let ret = Self { + st_write_f32, + st_read_f32, + }; + Ok(ret) + } + + pub fn st_write_f32(&self) -> &PreparedStatement { + &self.st_write_f32 + } + + pub fn st_read_f32(&self) -> &PreparedStatement { + &self.st_read_f32 + } +} diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs index 7fa5df6..2534f97 100644 --- a/crates/scyllaconn/src/worker.rs +++ b/crates/scyllaconn/src/worker.rs @@ -1,4 +1,5 @@ use crate::conn::create_scy_session_no_ks; +use crate::events2::prepare::StmtsCache; use crate::events2::prepare::StmtsEvents; use crate::range::ScyllaSeriesRange; use async_channel::Receiver; @@ -10,6 +11,7 @@ use items_0::Events; use items_2::binsdim0::BinsDim0; use netpod::log::*; use netpod::ttl::RetentionTime; +use netpod::DtMs; use netpod::ScyllaConfig; use netpod::TsMs; use scylla::Session; @@ -33,6 +35,15 @@ pub enum Error { CacheWriteF32(#[from] streams::timebin::cached::reader::Error), } +#[derive(Debug)] +struct ReadCacheF32 { + series: u64, + bin_len: DtMs, + msp: u64, + offs: core::ops::Range, + tx: Sender, streams::timebin::cached::reader::Error>>, +} + #[derive(Debug)] enum Job { FindTsMsp( @@ -54,6 +65,7 @@ enum Job { BinsDim0, Sender>, ), + ReadCacheF32(ReadCacheF32), } struct ReadNextValues { @@ -142,6 +154,32 @@ impl ScyllaQueue { .map_err(|_| streams::timebin::cached::reader::Error::ChannelRecv)??; Ok(res) } + + pub async fn read_cache_f32( + &self, + series: u64, + bin_len: DtMs, + msp: u64, + offs: core::ops::Range, + ) -> Result, streams::timebin::cached::reader::Error> { + let (tx, rx) = async_channel::bounded(1); + let job = Job::ReadCacheF32(ReadCacheF32 { + series, + bin_len, + msp, + offs, + tx, + }); + self.tx + .send(job) + .await + .map_err(|_| streams::timebin::cached::reader::Error::ChannelSend)?; + let res = rx + .recv() + .await + .map_err(|_| streams::timebin::cached::reader::Error::ChannelRecv)??; + Ok(res) + } } #[derive(Debug)] @@ -182,6 +220,8 @@ impl ScyllaWorker { info!("scylla worker PREPARE START"); let stmts = StmtsEvents::new(kss.try_into().map_err(|_| Error::MissingKeyspaceConfig)?, &scy).await?; let stmts = Arc::new(stmts); + let stmts_cache = StmtsCache::new(kss[0], &scy).await?; + let stmts_cache = Arc::new(stmts_cache); info!("scylla worker PREPARE DONE"); loop { let x = self.rx.recv().await; @@ -217,11 +257,19 @@ impl ScyllaWorker { } } Job::WriteCacheF32(series, bins, tx) => { - let res = super::bincache::worker_write(series, bins, &scy).await; + let res = super::bincache::worker_write(series, bins, &stmts_cache, &scy).await; if tx.send(res).await.is_err() { // TODO count for stats } } + Job::ReadCacheF32(job) => { + let res = + super::bincache::worker_read(job.series, job.bin_len, job.msp, job.offs, &stmts_cache, &scy) + .await; + if job.tx.send(res).await.is_err() { + // TODO count for stats + } + } } } info!("scylla worker finished"); diff --git a/crates/streams/src/plaineventsjson.rs b/crates/streams/src/plaineventsjson.rs index 910033f..f2d2a07 100644 --- a/crates/streams/src/plaineventsjson.rs +++ b/crates/streams/src/plaineventsjson.rs @@ -104,7 +104,7 @@ pub async fn plain_events_json_stream( ctx: &ReqCtx, open_bytes: OpenBoxedBytesStreamsBox, ) -> Result { - trace!("build stream"); + trace!("plain_events_json_stream"); let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?; let stream = events_stream_to_json_stream(stream); let stream = non_empty(stream); diff --git a/crates/streams/src/timebin/cached/reader.rs b/crates/streams/src/timebin/cached/reader.rs index 99f5901..af30fe6 100644 --- a/crates/streams/src/timebin/cached/reader.rs +++ b/crates/streams/src/timebin/cached/reader.rs @@ -15,11 +15,23 @@ use netpod::DtMs; use netpod::TsNano; use query::api4::events::EventsSubQuery; use std::future::Future; +use std::ops::Range; use std::pin::Pin; use std::sync::Arc; use std::task::Context; use std::task::Poll; +#[allow(unused)] +macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } + +pub fn off_max() -> u64 { + 1000 +} + +pub fn part_len(bin_len: DtMs) -> DtMs { + DtMs::from_ms_u64(bin_len.ms() * off_max()) +} + pub struct EventsReading { stream: Pin> + Send>>, } @@ -43,11 +55,19 @@ pub trait EventsReadProvider: Send + Sync { } pub struct CacheReading { - fut: Pin, Box>> + Send>>, + fut: Pin, streams::timebin::cached::reader::Error>> + Send>>, +} + +impl CacheReading { + pub fn new( + fut: Pin, streams::timebin::cached::reader::Error>> + Send>>, + ) -> Self { + Self { fut } + } } impl Future for CacheReading { - type Output = Result, Box>; + type Output = Result, streams::timebin::cached::reader::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { self.fut.poll_unpin(cx) @@ -73,7 +93,7 @@ impl Future for CacheWriting { } pub trait CacheReadProvider: Send + Sync { - fn read(&self, series: u64, range: BinnedRange) -> CacheReading; + fn read(&self, series: u64, bin_len: DtMs, msp: u64, offs: Range) -> CacheReading; fn write(&self, series: u64, bins: BinsDim0) -> CacheWriting; } @@ -87,17 +107,28 @@ pub enum Error { } pub struct CachedReader { + series: u64, + range: BinnedRange, + ts1next: TsNano, + bin_len: DtMs, cache_read_provider: Arc, + reading: Option, Error>> + Send>>>, } impl CachedReader { pub fn new( series: u64, - bin_len: DtMs, range: BinnedRange, cache_read_provider: Arc, ) -> Result { - let ret = Self { cache_read_provider }; + let ret = Self { + series, + ts1next: range.nano_beg(), + bin_len: range.bin_len.to_dt_ms(), + range, + cache_read_provider, + reading: None, + }; Ok(ret) } } @@ -113,8 +144,42 @@ impl Stream for CachedReader { // Change the worker interface: // We should already compute here the msp and off because we must here implement the loop logic. // Therefore worker interface should not accept BinnedRange, but msp and off range. - error!("TODO CachedReader impl split reads over known ranges"); - // Ready(Some(Err(Error::TodoImpl))) - Ready(None) + loop { + break if let Some(fut) = self.reading.as_mut() { + match fut.poll_unpin(cx) { + Ready(x) => { + self.reading = None; + match x { + Ok(bins) => { + use items_0::WithLen; + trace_emit!( + "- - - - - - - - - - - - emit cached bins {} bin_len {}", + bins.len(), + self.bin_len + ); + Ready(Some(Ok(bins))) + } + Err(e) => Ready(Some(Err(e))), + } + } + Pending => Pending, + } + } else { + if self.ts1next < self.range.nano_end() { + let div = part_len(self.bin_len).ns(); + let msp = self.ts1next.ns() / div; + let off = (self.ts1next.ns() - div * msp) / self.bin_len.ns(); + let off2 = (self.range.nano_end().ns() - div * msp) / self.bin_len.ns(); + let off2 = off2.min(off_max()); + self.ts1next = TsNano::from_ns(self.bin_len.ns() * off2 + div * msp); + let offs = off as u32..off2 as u32; + let fut = self.cache_read_provider.read(self.series, self.bin_len, msp, offs); + self.reading = Some(Box::pin(fut)); + continue; + } else { + Ready(None) + } + }; + } } } diff --git a/crates/streams/src/timebin/fromevents.rs b/crates/streams/src/timebin/fromevents.rs index 3e22a70..71d8d2a 100644 --- a/crates/streams/src/timebin/fromevents.rs +++ b/crates/streams/src/timebin/fromevents.rs @@ -1,5 +1,4 @@ use super::cached::reader::EventsReadProvider; -use super::cached::reader::EventsReading; use err::thiserror; use err::ThisError; use futures_util::Stream; @@ -18,6 +17,9 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; +#[allow(unused)] +macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } + #[derive(Debug, ThisError)] #[cstm(name = "ReadingBinnedFromEvents")] pub enum Error {} @@ -47,6 +49,8 @@ impl BinnedFromEvents { // TODO need a typed time binner if let Some(x) = x.as_any_mut().downcast_mut::>() { let y = x.clone(); + use items_0::WithLen; + trace_emit!("=========== ========= emit from events {}", y.len()); Ok(StreamItem::DataItem(RangeCompletableItem::Data(y))) } else { Err(::err::Error::with_msg_no_trace( diff --git a/crates/streams/src/timebin/fromlayers.rs b/crates/streams/src/timebin/fromlayers.rs index c17f1ce..4bb7ced 100644 --- a/crates/streams/src/timebin/fromlayers.rs +++ b/crates/streams/src/timebin/fromlayers.rs @@ -15,6 +15,7 @@ use items_0::streamitem::StreamItem; use items_0::timebin::TimeBinnableTy; use items_2::binsdim0::BinsDim0; use netpod::log::*; +use netpod::query::CacheUsage; use netpod::range::evrange::SeriesRange; use netpod::BinnedRange; use netpod::BinnedRangeEnum; @@ -45,6 +46,7 @@ type BoxedInput = Pin>> + Send>>; pub struct TimeBinnedFromLayers { ch_conf: ChannelTypeConfigGen, + cache_usage: CacheUsage, transform_query: TransformQuery, sub: EventsSubQuerySettings, log_level: String, @@ -60,6 +62,7 @@ impl TimeBinnedFromLayers { pub fn new( ch_conf: ChannelTypeConfigGen, + cache_usage: CacheUsage, transform_query: TransformQuery, sub: EventsSubQuerySettings, log_level: String, @@ -85,6 +88,7 @@ impl TimeBinnedFromLayers { let inp = super::gapfill::GapFill::new( "FromLayers".into(), ch_conf.clone(), + cache_usage.clone(), transform_query.clone(), sub.clone(), log_level.clone(), @@ -98,6 +102,7 @@ impl TimeBinnedFromLayers { )?; let ret = Self { ch_conf, + cache_usage, transform_query, sub, log_level, @@ -119,6 +124,7 @@ impl TimeBinnedFromLayers { let inp = super::gapfill::GapFill::new( "FromLayers".into(), ch_conf.clone(), + cache_usage.clone(), transform_query.clone(), sub.clone(), log_level.clone(), @@ -137,6 +143,7 @@ impl TimeBinnedFromLayers { ); let ret = Self { ch_conf, + cache_usage, transform_query, sub, log_level, @@ -168,6 +175,7 @@ impl TimeBinnedFromLayers { )?; let ret = Self { ch_conf, + cache_usage, transform_query, sub, log_level, diff --git a/crates/streams/src/timebin/gapfill.rs b/crates/streams/src/timebin/gapfill.rs index ef4a4f5..4c80fb8 100644 --- a/crates/streams/src/timebin/gapfill.rs +++ b/crates/streams/src/timebin/gapfill.rs @@ -13,6 +13,7 @@ use items_0::Empty; use items_0::WithLen; use items_2::binsdim0::BinsDim0; use netpod::log::*; +use netpod::query::CacheUsage; use netpod::range::evrange::NanoRange; use netpod::range::evrange::SeriesRange; use netpod::BinnedRange; @@ -59,6 +60,7 @@ type INP = Pin>> + Send>>; pub struct GapFill { dbgname: String, ch_conf: ChannelTypeConfigGen, + cache_usage: CacheUsage, transform_query: TransformQuery, sub: EventsSubQuerySettings, log_level: String, @@ -89,6 +91,7 @@ impl GapFill { pub fn new( dbgname_parent: String, ch_conf: ChannelTypeConfigGen, + cache_usage: CacheUsage, transform_query: TransformQuery, sub: EventsSubQuerySettings, log_level: String, @@ -102,19 +105,21 @@ impl GapFill { ) -> Result { let dbgname = format!("{}--[{}]", dbgname_parent, range); debug_init!("new dbgname {}", dbgname); - let inp = super::cached::reader::CachedReader::new( - series, - range.bin_len.to_dt_ms(), - range.clone(), - cache_read_provider.clone(), - )? - .map(|x| match x { - Ok(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))), - Err(e) => Err(::err::Error::from_string(e)), - }); + let inp = if cache_usage.is_cache_read() { + let stream = super::cached::reader::CachedReader::new(series, range.clone(), cache_read_provider.clone())? + .map(|x| match x { + Ok(x) => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))), + Err(e) => Err(::err::Error::from_string(e)), + }); + Box::pin(stream) as Pin>> + Send>> + } else { + let stream = futures_util::stream::empty(); + Box::pin(stream) + }; let ret = Self { dbgname, ch_conf, + cache_usage, transform_query, sub, log_level, @@ -123,7 +128,7 @@ impl GapFill { range, do_time_weight, bin_len_layers, - inp: Some(Box::pin(inp)), + inp: Some(inp), inp_range_final: false, inp_buf: None, inp_finer: None, @@ -156,15 +161,22 @@ impl GapFill { if bins.len() != 0 { bins.clone().drain_into(&mut self.bins_for_cache_write, 0..bins.len()); } - self.cache_write_intermediate()?; - // TODO make sure that input does not send "made-up" empty future bins. - // On the other hand, if the request is over past range, but the channel was silent ever since? - // Then we should in principle know that from is-alive status checking. - // So, until then, allow made-up bins? - // Maybe, for now, only write those bins before some last non-zero-count bin. The only safe way. + if self.cache_usage.is_cache_write() { + self.cache_write_intermediate()?; + } // TODO make sure that input does not send "made-up" empty future bins. + // On the other hand, if the request is over past range, but the channel was silent ever since? + // Then we should in principle know that from is-alive status checking. + // So, until then, allow made-up bins? + // Maybe, for now, only write those bins before some last non-zero-count bin. The only safe way. Ok(bins) } + fn setup_sub(self: Pin<&mut Self>, range: NanoRange) -> Result<(), Error> { + trace_handle!("{} SETUP SUB STREAM {}", self.dbgname, range); + self.setup_inp_finer(range, true)?; + Ok(()) + } + fn handle_bins(mut self: Pin<&mut Self>, bins: BinsDim0) -> Result, Error> { trace_handle!("{} handle_bins {}", self.dbgname, bins); // TODO could use an interface to iterate over opaque bin items that only expose @@ -178,12 +190,7 @@ impl GapFill { } if let Some(last) = self.last_bin_ts2 { if ts1 != last.ns() { - trace_handle!( - "{} detect a gap ------------- SETUP SUB STREAM ts1 {} last {}", - self.dbgname, - ts1, - last - ); + trace_handle!("{} detect a gap BETWEEN last {} ts1 {}", self.dbgname, last, ts1); let mut ret = as items_0::Empty>::empty(); let mut bins = bins; bins.drain_into(&mut ret, 0..i); @@ -192,9 +199,24 @@ impl GapFill { beg: last.ns(), end: ts1, }; - self.setup_inp_finer(range, true)?; + self.setup_sub(range)?; return Ok(ret); + } else { + // nothing to do } + } else if ts1 != self.range.nano_beg().ns() { + trace_handle!( + "{} detect a gap BEGIN beg {} ts1 {}", + self.dbgname, + self.range.nano_beg(), + ts1 + ); + let range = NanoRange { + beg: self.range.nano_beg().ns(), + end: ts1, + }; + self.setup_sub(range)?; + return Ok(BinsDim0::empty()); } self.last_bin_ts2 = Some(TsNano::from_ns(ts2)); } @@ -220,6 +242,7 @@ impl GapFill { let inp_finer = GapFill::new( self.dbgname.clone(), self.ch_conf.clone(), + self.cache_usage.clone(), self.transform_query.clone(), self.sub.clone(), self.log_level.clone(), @@ -282,7 +305,7 @@ impl GapFill { } let aa = &self.bins_for_cache_write; if aa.len() >= 2 { - for (i, (&c1, &c2)) in aa.counts.iter().rev().zip(aa.counts.iter().rev().skip(1)).enumerate() { + for (i, (&c1, &_c2)) in aa.counts.iter().rev().zip(aa.counts.iter().rev().skip(1)).enumerate() { if c1 != 0 { let n = aa.len() - (1 + i); debug_cache!("{} cache_write_on_end consider {} for write", self.dbgname, n); @@ -299,7 +322,7 @@ impl GapFill { fn cache_write_intermediate(mut self: Pin<&mut Self>) -> Result<(), Error> { let aa = &self.bins_for_cache_write; if aa.len() >= 2 { - for (i, (&c1, &c2)) in aa.counts.iter().rev().zip(aa.counts.iter().rev().skip(1)).enumerate() { + for (i, (&c1, &_c2)) in aa.counts.iter().rev().zip(aa.counts.iter().rev().skip(1)).enumerate() { if c1 != 0 { let n = aa.len() - (1 + i); debug_cache!("{} cache_write_intermediate consider {} for write", self.dbgname, n); @@ -335,10 +358,6 @@ impl Stream for GapFill { Pending => Pending, } } else if let Some(inp_finer) = self.inp_finer.as_mut() { - // TODO - // detect also gaps here: if gap from finer, then error. - // on CacheUsage Use or Rereate: - // write these bins to cache because we did not find them in cache before. match inp_finer.poll_next_unpin(cx) { Ready(Some(Ok(x))) => match x { StreamItem::DataItem(RangeCompletableItem::Data(x)) => { @@ -351,9 +370,13 @@ impl Stream for GapFill { trace_handle!("{} RECV RANGE FINAL", self.dbgname); self.inp_finer_range_final = true; self.inp_finer_range_final_cnt += 1; - match self.as_mut().cache_write_on_end() { - Ok(()) => continue, - Err(e) => Ready(Some(Err(::err::Error::from_string(e)))), + if self.cache_usage.is_cache_write() { + match self.as_mut().cache_write_on_end() { + Ok(()) => continue, + Err(e) => Ready(Some(Err(::err::Error::from_string(e)))), + } + } else { + continue; } } StreamItem::Log(x) => Ready(Some(Ok(StreamItem::Log(x)))), @@ -366,23 +389,32 @@ impl Stream for GapFill { self.dbgname, self.last_bin_ts2 ); + let exp_finer_range = + ::core::mem::replace(&mut self.exp_finer_range, NanoRange { beg: 0, end: 0 }); self.inp_finer = None; if let Some(j) = self.last_bin_ts2 { - if j.ns() != self.exp_finer_range.end() { + if j.ns() != exp_finer_range.end() { trace_handle!( "{} inp_finer Ready(None) last_bin_ts2 {:?} exp_finer_range {:?}", self.dbgname, self.last_bin_ts2, - self.exp_finer_range + exp_finer_range ); - Ready(Some(Err(::err::Error::from_string( - "finer input didn't deliver to the end", - )))) + if self.inp_finer_fills_gap { + Ready(Some(Err(::err::Error::from_string( + "finer input didn't deliver to the end", + )))) + } else { + warn!( + "{} inp_finer Ready(None) last_bin_ts2 {:?} not delivered to the end, but maybe in the future", + self.dbgname, self.last_bin_ts2 + ); + continue; + } } else { - self.exp_finer_range = NanoRange { beg: 0, end: 0 }; continue; } - } else { + } else if self.inp_finer_fills_gap { error!( "{} inp_finer Ready(None) last_bin_ts2 {:?}", self.dbgname, self.last_bin_ts2 @@ -390,6 +422,12 @@ impl Stream for GapFill { Ready(Some(Err(::err::Error::from_string( "finer input delivered nothing, received nothing at all so far", )))) + } else { + warn!( + "{} inp_finer Ready(None) last_bin_ts2 {:?}", + self.dbgname, self.last_bin_ts2 + ); + continue; } } Pending => Pending, diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index c0aca15..265fbbf 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -1,4 +1,6 @@ use crate::collect::Collect; +use crate::json_stream::JsonBytes; +use crate::json_stream::JsonStream; use crate::rangefilter2::RangeFilter2; use crate::tcprawclient::container_stream_from_bytes_stream; use crate::tcprawclient::make_sub_query; @@ -335,14 +337,15 @@ async fn timebinned_stream( .collect() } else { vec![ - DtMs::from_ms_u64(1000 * 60), - // DtMs::from_ms_u64(1000 * 60 * 60), + DtMs::from_ms_u64(1000 * 10), + DtMs::from_ms_u64(1000 * 60 * 60), // DtMs::from_ms_u64(1000 * 60 * 60 * 12), // DtMs::from_ms_u64(1000 * 10), ] }; let stream = crate::timebin::TimeBinnedFromLayers::new( ch_conf, + query.cache_usage(), query.transform().clone(), EventsSubQuerySettings::from(&query), query.log_level().into(), @@ -445,3 +448,114 @@ pub async fn timebinned_json( let jsval = serde_json::to_value(&collected)?; Ok(jsval) } + +fn take_collector_result(coll: &mut Box) -> Option { + match coll.result(None, None) { + Ok(collres) => { + let collres = if let Some(bins) = collres + .as_any_ref() + .downcast_ref::>() + { + info!("MATCHED ENUM"); + bins.boxed_collected_with_enum_fix() + } else { + collres + }; + match serde_json::to_value(&collres) { + Ok(val) => Some(val), + Err(e) => Some(serde_json::Value::String(format!("{e}"))), + } + } + Err(e) => Some(serde_json::Value::String(format!("{e}"))), + } +} + +pub async fn timebinned_json_framed( + query: BinnedQuery, + ch_conf: ChannelTypeConfigGen, + ctx: &ReqCtx, + open_bytes: OpenBoxedBytesStreamsBox, + cache_read_provider: Option>, + events_read_provider: Option>, +) -> Result { + trace!("timebinned_json_framed"); + let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?; + // TODO derive better values, from query + let stream = timebinned_stream( + query.clone(), + binned_range.clone(), + ch_conf, + ctx, + open_bytes, + cache_read_provider, + events_read_provider, + ) + .await?; + let stream = timebinned_to_collectable(stream); + + let mut coll = None; + let interval = tokio::time::interval(Duration::from( + query.timeout_content().unwrap_or(Duration::from_millis(1000)), + )); + let stream = stream.map(|x| Some(x)).chain(futures_util::stream::iter([None])); + let stream = tokio_stream::StreamExt::timeout_repeating(stream, interval).map(move |x| match x { + Ok(item) => match item { + Some(x) => match x { + Ok(x) => match x { + StreamItem::DataItem(x) => match x { + RangeCompletableItem::Data(mut item) => { + let coll = coll.get_or_insert_with(|| item.new_collector()); + coll.ingest(&mut item); + if coll.len() >= 128 { + take_collector_result(coll) + } else { + None + } + } + RangeCompletableItem::RangeComplete => None, + }, + StreamItem::Log(x) => { + info!("{x:?}"); + None + } + StreamItem::Stats(x) => { + info!("{x:?}"); + None + } + }, + Err(e) => Some(serde_json::Value::String(format!("{e}"))), + }, + None => { + if let Some(coll) = coll.as_mut() { + take_collector_result(coll) + } else { + None + } + } + }, + Err(_) => { + if let Some(coll) = coll.as_mut() { + if coll.len() != 0 { + take_collector_result(coll) + } else { + None + } + } else { + None + } + } + }); + // TODO skip the intermediate conversion to js value, go directly to string data + let stream = stream.map(|x| match x { + Some(x) => Some(JsonBytes::new(serde_json::to_string(&x).unwrap())), + None => None, + }); + let stream = stream.filter_map(|x| futures_util::future::ready(x)); + let stream = stream.map(|x| Ok(x)); + + // let stream = dyn_events_stream(evq, ch_conf, ctx, open_bytes).await?; + // let stream = events_stream_to_json_stream(stream); + // let stream = non_empty(stream); + // let stream = only_first_err(stream); + Ok(Box::pin(stream)) +}