diff --git a/apidoc/src/events.md b/apidoc/src/events.md index e45a878..8c5e5ac 100644 --- a/apidoc/src/events.md +++ b/apidoc/src/events.md @@ -11,8 +11,10 @@ Parameters: - `channelName`: the name of the channel. - `begDate`: start of the time range, inclusive. In ISO format e.g. `2024-02-15T12:41:00Z`. - `endDate`: end of the time range, exclusive. -`allowLargeResult=true` indicates that the client is prepared to accept also larger responses compared to - what might be suitable for a typical browser. +- `oneBeforeRange`: if set to `true` the response will in addition also contain the most recent event before the given range. +- `allowLargeResult=true` **DEPRECATED**. Indicates that the client is prepared to accept also larger responses compared to + what might be suitable for a typical browser. Please download large result sets as + framed json or framed cbor streams, see below. By default, events are returned as a json object. diff --git a/crates/dbconn/src/search.rs b/crates/dbconn/src/search.rs index 02e543e..3092b16 100644 --- a/crates/dbconn/src/search.rs +++ b/crates/dbconn/src/search.rs @@ -2,6 +2,7 @@ use crate::create_connection; use crate::worker::PgQueue; use crate::ErrConv; use err::Error; +use netpod::log::*; use netpod::ChannelArchiver; use netpod::ChannelSearchQuery; use netpod::ChannelSearchResult; @@ -128,10 +129,9 @@ pub(super) async fn search_channel_scylla( ), regop ); - let rows = pgc - .query(sql, &[&ch_kind, &query.name_regex, &cb1, &cb2]) - .await - .err_conv()?; + let params: &[&(dyn tokio_postgres::types::ToSql + Sync)] = &[&ch_kind, &query.name_regex, &cb1, &cb2]; + info!("search_channel_scylla {:?}", params); + let rows = pgc.query(sql, params).await.err_conv()?; let mut res = Vec::new(); for row in rows { let series: i64 = row.get(0); diff --git a/crates/httpclient/src/httpclient.rs b/crates/httpclient/src/httpclient.rs index 091a364..8864e29 100644 --- a/crates/httpclient/src/httpclient.rs +++ b/crates/httpclient/src/httpclient.rs 
@@ -310,8 +310,7 @@ pub async fn connect_client(uri: &http::Uri) -> Result, } }); let stream = TcpStream::connect(format!("{host}:{port}")).await?; - #[cfg(DISABLED)] - { + if false { let executor = hyper_util::rt::TokioExecutor::new(); hyper::client::conn::http2::Builder::new(executor); } diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs index e848f51..d59462c 100644 --- a/crates/httpret/src/api1.rs +++ b/crates/httpret/src/api1.rs @@ -765,6 +765,7 @@ impl DataApiPython3DataStream { let select = EventsSubQuerySelect::new( ChannelTypeConfigGen::SfDatabuffer(fetch_info.clone()), self.range.clone().into(), + false, TransformQuery::for_event_blobs(), ); let log_level = String::new(); diff --git a/crates/httpret/src/proxy.rs b/crates/httpret/src/proxy.rs index ff6561a..8982828 100644 --- a/crates/httpret/src/proxy.rs +++ b/crates/httpret/src/proxy.rs @@ -242,20 +242,21 @@ async fn proxy_http_service_inner( } else if let Some(h) = super::api4::docs::DocsHandler::handler(&req) { Ok(h.handle(req, ctx).await?) } else { - use std::fmt::Write; - let mut body = String::new(); - let out = &mut body; - write!(out, "
\n")?;
-        write!(out, "METHOD {:?}
\n", req.method())?; - write!(out, "URI {:?}
\n", req.uri())?; - write!(out, "HOST {:?}
\n", req.uri().host())?; - write!(out, "PORT {:?}
\n", req.uri().port())?; - write!(out, "PATH {:?}
\n", req.uri().path())?; - write!(out, "QUERY {:?}
\n", req.uri().query())?; - for (hn, hv) in req.headers() { - write!(out, "HEADER {hn:?}: {hv:?}
\n")?; - } - write!(out, "
\n")?; + let headers: std::collections::BTreeMap = req + .headers() + .iter() + .map(|(k, v)| (k.to_string(), v.to_str().map_or(String::new(), |x| x.to_string()))) + .collect(); + let res = serde_json::json!({ + "method": req.method().to_string(), + "uri": req.uri().to_string(), + "host": req.uri().host().unwrap_or(""), + "port": req.uri().port().map_or(String::new(), |x| x.to_string()), + "path": req.uri().path(), + "query": req.uri().query(), + "headers": headers, + }); + let body = serde_json::to_string(&res).unwrap_or("{}".into()); Ok(response(StatusCode::NOT_FOUND).body(body_string(body))?) } } @@ -513,18 +514,20 @@ where return Ok(response_err_msg(StatusCode::BAD_REQUEST, msg)?); } }; - debug!("proxy_backend_query {:?} {:?}", query, req.uri()); + trace!("proxy_backend_query {:?} {:?}", query, req.uri()); let timeout = query.timeout(); let timeout_next = timeout.saturating_sub(Duration::from_millis(1000)); - debug!("timeout {timeout:?} timeout_next {timeout_next:?}"); + trace!("timeout {timeout:?} timeout_next {timeout_next:?}"); query.set_timeout(timeout_next); let query = query; let backend = query.backend(); let uri_path = proxy_backend_query_helper_uri_path(req.uri().path(), &url); - debug!("uri_path {uri_path}"); + trace!("uri_path {uri_path}"); let query_host = get_query_host_for_backend(backend, proxy_config)?; let mut url = Url::parse(&format!("{}{}", query_host, uri_path))?; + trace!("query_host {query_host} uri_path {uri_path} query {query:?}"); query.append_to_url(&mut url); + trace!("url {:?}", url); proxy_backend_query_inner(req.headers(), url, timeout, ctx, proxy_config).await } diff --git a/crates/httpret/src/proxy/api4/events.rs b/crates/httpret/src/proxy/api4/events.rs index f3a6b4e..322e988 100644 --- a/crates/httpret/src/proxy/api4/events.rs +++ b/crates/httpret/src/proxy/api4/events.rs @@ -68,7 +68,7 @@ impl EventsHandler { let url = req_uri_to_url(&head.uri)?; let pairs = get_url_query_pairs(&url); let evq = 
PlainEventsQuery::from_pairs(&pairs)?; - debug!("{evq:?}"); + debug!("{:?}", evq); let query_host = get_query_host_for_backend(evq.backend(), proxy_config)?; let url_str = format!( "{}{}", diff --git a/crates/items_0/src/isodate.rs b/crates/items_0/src/isodate.rs index 38fea90..262c7f3 100644 --- a/crates/items_0/src/isodate.rs +++ b/crates/items_0/src/isodate.rs @@ -11,9 +11,7 @@ pub struct IsoDateTime(DateTime); impl IsoDateTime { pub fn from_unix_millis(ms: u64) -> Self { - let datetime = chrono::NaiveDateTime::from_timestamp_millis(ms as i64) - .unwrap() - .and_utc(); + let datetime = chrono::DateTime::from_timestamp_millis(ms as i64).unwrap(); Self(datetime) } } diff --git a/crates/items_0/src/overlap.rs b/crates/items_0/src/overlap.rs index 01b964d..34f0116 100644 --- a/crates/items_0/src/overlap.rs +++ b/crates/items_0/src/overlap.rs @@ -1,4 +1,3 @@ -use netpod::log::*; use netpod::range::evrange::SeriesRange; // TODO rename, no more deque involved diff --git a/crates/items_0/src/timebin.rs b/crates/items_0/src/timebin.rs index fee111f..1d49f1e 100644 --- a/crates/items_0/src/timebin.rs +++ b/crates/items_0/src/timebin.rs @@ -161,6 +161,7 @@ impl WithLen for Box { } } +#[allow(unused)] impl RangeOverlapInfo for Box { fn ends_before(&self, range: &SeriesRange) -> bool { todo!() @@ -190,6 +191,7 @@ impl TimeBinnable for Box { } } +#[allow(unused)] impl RangeOverlapInfo for Box { fn ends_before(&self, range: &SeriesRange) -> bool { todo!() @@ -455,6 +457,7 @@ impl TimeBinnerTy for TimeBinnerDynStruct2 { } } +#[allow(unused)] impl TimeBinner for TimeBinnerDynStruct2 { fn ingest(&mut self, item: &mut dyn TimeBinnable) { todo!() diff --git a/crates/items_0/src/timebin/timebinimpl.rs b/crates/items_0/src/timebin/timebinimpl.rs index dbd314a..e6baf7e 100644 --- a/crates/items_0/src/timebin/timebinimpl.rs +++ b/crates/items_0/src/timebin/timebinimpl.rs @@ -17,7 +17,7 @@ macro_rules! 
trace_ingest { ($($arg:tt)*) => { trace!($($arg)*) }; } -#[cfg(DISABLED)] +#[cfg(target_abi = "x32")] impl TimeBinner for T where T: TimeBinnerIngest, diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index e9a1b6f..b9a50b2 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -74,6 +74,15 @@ macro_rules! trace2 { ($($arg:tt)*) => { trace!($($arg)*); }; } +#[allow(unused)] +macro_rules! trace_binning { + ($($arg:tt)*) => { + if false { + trace!($($arg)*); + } + }; +} + #[derive(Clone, PartialEq, Serialize, Deserialize)] pub struct EventsDim0NoPulse { pub tss: VecDeque, @@ -667,7 +676,7 @@ impl EventsDim0Aggregator { fn reset_values(&mut self, range: SeriesRange) { self.int_ts = range.beg_u64(); - trace!("ON RESET SET int_ts {:10}", self.int_ts); + trace_binning!("ON RESET SET int_ts {:10}", self.int_ts); self.range = range; self.count = 0; self.sum = 0.; @@ -707,9 +716,11 @@ impl EventsDim0Aggregator { fn result_reset_time_weight(&mut self, range: SeriesRange) -> BinsDim0 { // TODO check callsite for correct expand status. 
- debug!( + trace_binning!( "result_reset_time_weight calls apply_event_time_weight range {:?} items_seen {} count {}", - self.range, self.items_seen, self.count + self.range, + self.items_seen, + self.count ); let range_beg = self.range.beg_u64(); let range_end = self.range.end_u64(); @@ -777,7 +788,7 @@ impl TimeBinnableTypeAggregator for EventsDim0Aggregator { } fn result_reset(&mut self, range: SeriesRange) -> Self::Output { - trace!("result_reset {:?}", range); + trace_binning!("result_reset {:?}", range); if self.do_time_weight { self.result_reset_time_weight(range) } else { @@ -1066,10 +1077,10 @@ impl EventsDim0TimeBinner { fn next_bin_range(&mut self) -> Option { self.rix += 1; if let Some(rng) = self.binrange.range_at(self.rix) { - trace!("{} next_bin_range {:?}", Self::type_name(), rng); + trace_binning!("{} next_bin_range {:?}", Self::type_name(), rng); Some(rng) } else { - trace!("{} next_bin_range None", Self::type_name()); + trace_binning!("{} next_bin_range None", Self::type_name()); None } } diff --git a/crates/items_2/src/timebin.rs b/crates/items_2/src/timebin.rs index 010d416..c224541 100644 --- a/crates/items_2/src/timebin.rs +++ b/crates/items_2/src/timebin.rs @@ -22,7 +22,7 @@ macro_rules! trace_ingest { #[allow(unused)] macro_rules! trace_ingest_item { ($($arg:tt)*) => { - if true { + if false { info!($($arg)*); } }; @@ -30,8 +30,11 @@ macro_rules! trace_ingest_item { #[allow(unused)] macro_rules! 
trace2 { - ($($arg:tt)*) => {}; - ($($arg:tt)*) => { trace!($($arg)*); }; + ($($arg:tt)*) => { + if false { + trace!($($arg)*); + } + }; } pub trait TimeBinnerCommonV0Trait { diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index e442061..9efd751 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -3,6 +3,7 @@ pub mod histo; pub mod query; pub mod range; pub mod status; +pub mod stream_impl_tracer; pub mod streamext; pub mod ttl; @@ -116,6 +117,12 @@ pub const DATETIME_FMT_9MS: &str = "%Y-%m-%dT%H:%M:%S.%9fZ"; const TEST_BACKEND: &str = "testbackend-00"; +#[allow(non_upper_case_globals)] +pub const trigger: [&'static str; 1] = [ + // + "S30CB05-VMCP-A010:PRESSURE", +]; + pub struct OnDrop where F: FnOnce() -> (), diff --git a/crates/netpod/src/stream_impl_tracer.rs b/crates/netpod/src/stream_impl_tracer.rs new file mode 100644 index 0000000..e38ce99 --- /dev/null +++ b/crates/netpod/src/stream_impl_tracer.rs @@ -0,0 +1,43 @@ +use crate::log::*; + +pub struct StreamImplTracer { + name: String, + npoll_cnt: usize, + npoll_max: usize, + loop_cnt: usize, + loop_max: usize, +} + +impl StreamImplTracer { + pub fn new(name: String, npoll_max: usize, loop_max: usize) -> Self { + Self { + name, + npoll_cnt: 0, + npoll_max, + loop_cnt: 0, + loop_max, + } + } + + pub fn poll_enter(&mut self) -> bool { + self.npoll_cnt += 1; + if self.npoll_cnt >= self.npoll_max { + trace!("{} poll {} reached limit", self.name, self.npoll_cnt); + true + } else { + trace!("{} poll {}", self.name, self.npoll_cnt); + false + } + } + + pub fn loop_enter(&mut self) -> bool { + self.loop_cnt += 1; + if self.loop_cnt >= self.loop_max { + trace!("{} loop {} reached limit", self.name, self.loop_cnt); + true + } else { + trace!("{} loop {}", self.name, self.loop_cnt); + false + } + } +} diff --git a/crates/nodenet/src/conn/test.rs b/crates/nodenet/src/conn/test.rs index 70b897f..c014b3b 100644 --- a/crates/nodenet/src/conn/test.rs +++ 
b/crates/nodenet/src/conn/test.rs @@ -82,7 +82,8 @@ fn raw_data_00() { ScalarType::I32, Shape::Scalar, ); - let select = EventsSubQuerySelect::new(fetch_info.into(), range.into(), TransformQuery::default_events()); + let select = + EventsSubQuerySelect::new(fetch_info.into(), range.into(), false, TransformQuery::default_events()); let settings = EventsSubQuerySettings::default(); let log_level = String::new(); let qu = EventsSubQuery::from_parts(select, settings, "dummy".into(), log_level); diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index 9afe9f9..7f59859 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -31,13 +31,12 @@ pub async fn scylla_channel_event_stream( debug!("scylla_channel_event_stream {evq:?}"); // TODO depends in general on the query // TODO why both in PlainEventsQuery and as separate parameter? Check other usages. - // let do_one_before_range = evq.need_one_before_range(); - let do_one_before_range = false; - let series = SeriesId::new(chconf.series()); - let scalar_type = chconf.scalar_type(); - let shape = chconf.shape(); - let do_test_stream_error = false; - let readopts = EventReadOpts::new(evq.need_value_data(), evq.transform().enum_as_string().unwrap_or(false)); + let _series = SeriesId::new(chconf.series()); + let readopts = EventReadOpts::new( + evq.need_one_before_range(), + evq.need_value_data(), + evq.transform().enum_as_string().unwrap_or(false), + ); let stream: Pin + Send>> = if let Some(rt) = evq.use_rt() { let x = scyllaconn::events2::events::EventsStreamRt::new( rt, diff --git a/crates/query/src/api4/events.rs b/crates/query/src/api4/events.rs index af442ec..8b78155 100644 --- a/crates/query/src/api4/events.rs +++ b/crates/query/src/api4/events.rs @@ -59,6 +59,7 @@ pub struct PlainEventsQuery { log_level: String, #[serde(default)] use_rt: Option, + querymarker: String, } impl PlainEventsQuery { @@ -85,6 +86,7 @@ impl PlainEventsQuery { create_errors: Vec::new(), log_level: 
String::new(), use_rt: None, + querymarker: String::new(), } } @@ -97,7 +99,7 @@ impl PlainEventsQuery { } pub fn one_before_range(&self) -> bool { - self.transform.need_one_before_range() + self.one_before_range || self.transform.need_one_before_range() } pub fn transform(&self) -> &TransformQuery { @@ -296,6 +298,7 @@ impl FromUrl for PlainEventsQuery { .map(Some) .map_err(|_| Error::with_public_msg_no_trace(format!("can not parse useRt: {}", k))) })?, + querymarker: pairs.get("querymarker").map_or(String::new(), |x| x.to_string()), }; Ok(ret) } @@ -308,12 +311,10 @@ impl AppendToUrl for PlainEventsQuery { SeriesRange::PulseRange(_) => todo!(), } self.channel.append_to_url(url); - { - let mut g = url.query_pairs_mut(); - if self.one_before_range() { - g.append_pair("oneBeforeRange", "true"); - } - } + let mut g = url.query_pairs_mut(); + g.append_pair("oneBeforeRange", &self.one_before_range().to_string()); + g.append_pair("querymarker", &self.querymarker); + drop(g); self.transform.append_to_url(url); let mut g = url.query_pairs_mut(); if let Some(x) = &self.timeout { @@ -365,15 +366,22 @@ impl AppendToUrl for PlainEventsQuery { pub struct EventsSubQuerySelect { ch_conf: ChannelTypeConfigGen, range: SeriesRange, + one_before_range: bool, transform: TransformQuery, wasm1: Option, } impl EventsSubQuerySelect { - pub fn new(ch_info: ChannelTypeConfigGen, range: SeriesRange, transform: TransformQuery) -> Self { + pub fn new( + ch_info: ChannelTypeConfigGen, + range: SeriesRange, + one_before_range: bool, + transform: TransformQuery, + ) -> Self { Self { ch_conf: ch_info, range, + one_before_range, transform, wasm1: None, } @@ -510,6 +518,10 @@ impl EventsSubQuery { &self.select.range } + pub fn need_one_before_range(&self) -> bool { + self.select.one_before_range + } + pub fn transform(&self) -> &TransformQuery { &self.select.transform } diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index df45863..b558f3c 100644 --- 
a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -24,6 +24,15 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; +#[allow(unused)] +macro_rules! trace_fetch { + ($($arg:tt)*) => { + if true { + trace!($($arg)*); + } + }; +} + #[allow(unused)] macro_rules! trace_emit { ($($arg:tt)*) => { @@ -46,11 +55,13 @@ macro_rules! warn_item { pub struct EventReadOpts { pub with_values: bool, pub enum_as_strings: bool, + pub one_before: bool, } impl EventReadOpts { - pub fn new(with_values: bool, enum_as_strings: bool) -> Self { + pub fn new(one_before: bool, with_values: bool, enum_as_strings: bool) -> Self { Self { + one_before, with_values, enum_as_strings, } @@ -84,14 +95,20 @@ enum ReadingState { FetchEvents(FetchEvents), } -struct Reading { +struct ReadingBck { + scyqueue: ScyllaQueue, + reading_state: ReadingState, +} + +struct ReadingFwd { scyqueue: ScyllaQueue, reading_state: ReadingState, } enum State { Begin, - Reading(Reading), + ReadingBck(ReadingBck), + ReadingFwd(ReadingFwd), InputDone, Done, } @@ -105,6 +122,8 @@ pub struct EventsStreamRt { state: State, scyqueue: ScyllaQueue, msp_inp: MspStreamRt, + msp_buf: VecDeque, + msp_buf_bck: VecDeque, out: VecDeque>, ts_seen_max: u64, } @@ -129,29 +148,43 @@ impl EventsStreamRt { state: State::Begin, scyqueue, msp_inp, + msp_buf: VecDeque::new(), + msp_buf_bck: VecDeque::new(), out: VecDeque::new(), ts_seen_max: 0, } } + fn make_msp_read_fut( + msp_inp: &mut MspStreamRt, + ) -> Pin>> + Send>> { + trace_fetch!("make_msp_read_fut"); + let msp_inp = unsafe { + let ptr = msp_inp as *mut MspStreamRt; + &mut *ptr + }; + let fut = Box::pin(msp_inp.next()); + fut + } + fn make_read_events_fut( &mut self, ts_msp: TsMs, + bck: bool, scyqueue: ScyllaQueue, ) -> Pin, Error>> + Send>> { - let fwd = true; let opts = ReadNextValuesOpts::new( self.rt.clone(), self.series.clone(), ts_msp, self.range.clone(), - fwd, + !bck, self.readopts.clone(), scyqueue, ); let scalar_type = 
self.ch_conf.scalar_type().clone(); let shape = self.ch_conf.shape().clone(); - debug!("make_read_events_fut {:?} {:?}", shape, scalar_type); + trace_fetch!("make_read_events_fut bck {} {:?} {:?}", bck, shape, scalar_type); let fut = async move { let ret = match &shape { Shape::Scalar => match &scalar_type { @@ -168,9 +201,10 @@ impl EventsStreamRt { ScalarType::BOOL => read_next_values::(opts).await, ScalarType::STRING => read_next_values::(opts).await, ScalarType::Enum => { - debug!( + trace_fetch!( "make_read_events_fut {:?} {:?} ------------- good", - shape, scalar_type + shape, + scalar_type ); read_next_values::(opts).await } @@ -205,6 +239,60 @@ impl EventsStreamRt { }; Box::pin(fut) } + + fn transition_to_bck_read(&mut self) { + trace_fetch!("transition_to_bck_read"); + for ts in self.msp_buf.iter() { + if ts.ns() < self.range.beg() { + self.msp_buf_bck.push_front(ts.clone()); + } + } + let c = self.msp_buf.iter().take_while(|x| x.ns() < self.range.beg()).count(); + let g = c.max(1) - 1; + for _ in 0..g { + self.msp_buf.pop_front(); + } + self.setup_bck_read(); + } + + fn setup_bck_read(&mut self) { + trace_fetch!("setup_bck_read"); + if let Some(ts) = self.msp_buf_bck.pop_front() { + let scyqueue = self.scyqueue.clone(); + let fut = self.make_read_events_fut(ts, true, scyqueue); + self.state = State::ReadingBck(ReadingBck { + scyqueue: self.scyqueue.clone(), + reading_state: ReadingState::FetchEvents(FetchEvents { fut }), + }); + } else { + self.transition_to_fwd_read(); + } + } + + fn transition_to_fwd_read(&mut self) { + trace_fetch!("transition_to_fwd_read"); + self.msp_buf_bck = VecDeque::new(); + self.setup_fwd_read(); + } + + fn setup_fwd_read(&mut self) { + if let Some(ts) = self.msp_buf.pop_front() { + trace_fetch!("setup_fwd_read {ts}"); + let scyqueue = self.scyqueue.clone(); + let fut = self.make_read_events_fut(ts, false, scyqueue); + self.state = State::ReadingFwd(ReadingFwd { + scyqueue: self.scyqueue.clone(), + reading_state: 
ReadingState::FetchEvents(FetchEvents { fut }), + }); + } else { + trace_fetch!("setup_fwd_read no msp"); + let fut = Self::make_msp_read_fut(&mut self.msp_inp); + self.state = State::ReadingFwd(ReadingFwd { + scyqueue: self.scyqueue.clone(), + reading_state: ReadingState::FetchMsp(FetchMsp { fut }), + }); + } + } } impl Stream for EventsStreamRt { @@ -220,7 +308,7 @@ impl Stream for EventsStreamRt { break Ready(Some(Err(Error::BadBatch))); } if let Some(item_min) = item.ts_min() { - if item_min < self.range.beg().ns() { + if !self.readopts.one_before && item_min < self.range.beg().ns() { warn_item!( "{}out of range error A {} {:?}", "\n\n--------------------------\n", @@ -284,29 +372,84 @@ impl Stream for EventsStreamRt { } break match &mut self.state { State::Begin => { - let msp_inp = unsafe { - let ptr = (&mut self.msp_inp) as *mut MspStreamRt; - &mut *ptr - }; - let fut = Box::pin(msp_inp.next()); - self.state = State::Reading(Reading { - scyqueue: self.scyqueue.clone(), - reading_state: ReadingState::FetchMsp(FetchMsp { fut }), - }); + if self.readopts.one_before { + trace_fetch!("State::Begin Bck"); + let fut = Self::make_msp_read_fut(&mut self.msp_inp); + self.state = State::ReadingBck(ReadingBck { + scyqueue: self.scyqueue.clone(), + reading_state: ReadingState::FetchMsp(FetchMsp { fut }), + }); + } else { + trace_fetch!("State::Begin Fwd"); + let fut = Self::make_msp_read_fut(&mut self.msp_inp); + self.state = State::ReadingFwd(ReadingFwd { + scyqueue: self.scyqueue.clone(), + reading_state: ReadingState::FetchMsp(FetchMsp { fut }), + }); + } continue; } - State::Reading(st) => match &mut st.reading_state { + State::ReadingBck(st) => match &mut st.reading_state { ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) { Ready(Some(Ok(ts))) => { - let scyqueue = st.scyqueue.clone(); - let fut = self.make_read_events_fut(ts, scyqueue); - if let State::Reading(st) = &mut self.state { - st.reading_state = ReadingState::FetchEvents(FetchEvents { fut }); - 
continue; + trace_fetch!("ReadingBck FetchMsp {:?}", ts); + self.msp_buf.push_back(ts); + if ts.ns() >= self.range.beg() { + self.transition_to_bck_read(); } else { - self.state = State::Done; - Ready(Some(Err(Error::Logic))) + let fut = Self::make_msp_read_fut(&mut self.msp_inp); + self.state = State::ReadingBck(ReadingBck { + scyqueue: self.scyqueue.clone(), + reading_state: ReadingState::FetchMsp(FetchMsp { fut }), + }); } + continue; + } + Ready(Some(Err(e))) => Ready(Some(Err(e.into()))), + Ready(None) => { + self.transition_to_bck_read(); + continue; + } + Pending => Pending, + }, + ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) { + Ready(Ok(mut x)) => { + use items_2::merger::Mergeable; + trace_fetch!("ReadingBck FetchEvents got len {:?}", x.len()); + if let Some(ix) = Mergeable::find_highest_index_lt(&x, self.range.beg().ns()) { + trace_fetch!("ReadingBck FetchEvents find_highest_index_lt {:?}", ix); + let mut y = Mergeable::new_empty(&x); + match Mergeable::drain_into(&mut x, &mut y, (ix, 1 + ix)) { + Ok(()) => { + trace_fetch!("ReadingBck FetchEvents drained y len {:?}", y.len()); + self.out.push_back(y); + self.transition_to_fwd_read(); + continue; + } + Err(e) => { + self.state = State::Done; + Ready(Some(Err(e.into()))) + } + } + } else { + trace_fetch!("ReadingBck FetchEvents find_highest_index_lt None"); + self.setup_bck_read(); + continue; + } + } + Ready(Err(e)) => { + self.state = State::Done; + Ready(Some(Err(e.into()))) + } + Pending => Pending, + }, + }, + State::ReadingFwd(st) => match &mut st.reading_state { + ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) { + Ready(Some(Ok(ts))) => { + self.msp_buf.push_back(ts); + self.setup_fwd_read(); + continue; } Ready(Some(Err(e))) => Ready(Some(Err(e.into()))), Ready(None) => { @@ -318,18 +461,8 @@ impl Stream for EventsStreamRt { ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) { Ready(Ok(x)) => { self.out.push_back(x); - let msp_inp = unsafe { - let ptr = 
(&mut self.msp_inp) as *mut MspStreamRt; - &mut *ptr - }; - let fut = Box::pin(msp_inp.next()); - if let State::Reading(st) = &mut self.state { - st.reading_state = ReadingState::FetchMsp(FetchMsp { fut }); - continue; - } else { - self.state = State::Done; - Ready(Some(Err(Error::Logic))) - } + self.setup_fwd_read(); + continue; } Ready(Err(e)) => { self.state = State::Done; diff --git a/crates/scyllaconn/src/events2/firstbefore.rs b/crates/scyllaconn/src/events2/firstbefore.rs index da2d9bf..1fe24d4 100644 --- a/crates/scyllaconn/src/events2/firstbefore.rs +++ b/crates/scyllaconn/src/events2/firstbefore.rs @@ -5,17 +5,54 @@ use futures_util::StreamExt; use items_0::Events; use items_2::merger::Mergeable; use netpod::log::*; +use netpod::stream_impl_tracer::StreamImplTracer; use netpod::TsNano; use std::pin::Pin; use std::task::Context; use std::task::Poll; +#[allow(unused)] +macro_rules! trace_transition { + ($($arg:tt)*) => { + if true { + trace!($($arg)*); + } + }; +} + +#[allow(unused)] +macro_rules! trace_emit { + ($($arg:tt)*) => { + if true { + trace!($($arg)*); + } + }; +} + +macro_rules! tracer_poll_enter { + ($self:expr) => { + if false && $self.tracer.poll_enter() { + return Ready(Some(Err(Error::LimitPoll))); + } + }; +} + +macro_rules! 
tracer_loop_enter { + ($self:expr) => { + if false && $self.tracer.loop_enter() { + return Ready(Some(Err(Error::LimitLoop))); + } + }; +} + #[derive(Debug, ThisError)] #[cstm(name = "EventsFirstBefore")] pub enum Error { Unordered, Logic, Input(Box), + LimitPoll, + LimitLoop, } pub enum Output { @@ -38,6 +75,7 @@ where inp: S, state: State, buf: Option, + tracer: StreamImplTracer, } impl FirstBeforeAndInside @@ -46,11 +84,13 @@ where T: Events + Mergeable + Unpin, { pub fn new(inp: S, ts0: TsNano) -> Self { + trace_transition!("FirstBeforeAndInside::new"); Self { ts0, inp, state: State::Begin, buf: None, + tracer: StreamImplTracer::new("FirstBeforeAndInside".into(), 2000, 100), } } } @@ -65,7 +105,9 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + tracer_poll_enter!(self); loop { + tracer_loop_enter!(self); break match &self.state { State::Begin => match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(mut item))) => { @@ -79,8 +121,13 @@ where // Separate events into before and bulk let tss = item.tss(); let pp = tss.partition_point(|&x| x < self.ts0.ns()); - if pp >= tss.len() { - // all entries are before + trace_transition!("partition_point {pp:?} {n:?}", n = tss.len()); + if pp > item.len() { + error!("bad partition point {} {}", pp, item.len()); + self.state = State::Done; + Ready(Some(Err(Error::Logic))) + } else if pp == item.len() { + // all entries are before, or empty item if self.buf.is_none() { self.buf = Some(item.new_empty()); } @@ -95,16 +142,19 @@ where } } else if pp == 0 { // all entries are bulk - debug!("transition immediately to bulk"); + trace_transition!("transition immediately to bulk"); self.state = State::Bulk; let o1 = core::mem::replace(&mut self.buf, Some(item.new_empty())) .unwrap_or_else(|| item.new_empty()); Ready(Some(Ok(Output::First(o1, item)))) } else { // mixed + if self.buf.is_none() { + self.buf = Some(item.new_empty()); + } match item.drain_into_evs(self.buf.as_mut().unwrap(), (0, pp)) 
{ Ok(()) => { - debug!("transition with mixed to bulk"); + trace_transition!("transition with mixed to bulk"); self.state = State::Bulk; let o1 = core::mem::replace(&mut self.buf, Some(item.new_empty())) .unwrap_or_else(|| item.new_empty()); @@ -124,12 +174,21 @@ where } Ready(None) => { self.state = State::Done; - Ready(None) + if let Some(x) = self.buf.take() { + let empty = x.new_empty(); + Ready(Some(Ok(Output::First(x, empty)))) + } else { + Ready(None) + } } Pending => Pending, }, State::Bulk => { if self.buf.as_ref().map_or(0, |x| x.len()) != 0 { + error!( + "State::Bulk but buf non-empty {}", + self.buf.as_ref().map_or(0, |x| x.len()) + ); self.state = State::Done; Ready(Some(Err(Error::Logic))) } else { @@ -140,7 +199,7 @@ where let e = Error::Unordered; Ready(Some(Err(e))) } else { - debug!("output bulk item len {}", item.len()); + trace_emit!("output bulk item len {}", item.len()); Ready(Some(Ok(Output::Bulk(item)))) } } @@ -149,7 +208,7 @@ where Ready(Some(Err(Error::Input(Box::new(e))))) } Ready(None) => { - debug!("in bulk, input done"); + trace_emit!("in bulk, input done"); self.state = State::Done; Ready(None) } diff --git a/crates/scyllaconn/src/events2/mergert.rs b/crates/scyllaconn/src/events2/mergert.rs index 8244957..8516b31 100644 --- a/crates/scyllaconn/src/events2/mergert.rs +++ b/crates/scyllaconn/src/events2/mergert.rs @@ -16,6 +16,7 @@ use items_2::merger::Mergeable; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::range::evrange::SeriesRange; +use netpod::stream_impl_tracer::StreamImplTracer; use netpod::ttl::RetentionTime; use netpod::ChConf; use std::collections::VecDeque; @@ -23,6 +24,40 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; +#[allow(unused)] +macro_rules! trace_fetch { + ($($arg:tt)*) => { + if true { + trace!($($arg)*); + } + }; +} + +#[allow(unused)] +macro_rules! trace_emit { + ($($arg:tt)*) => { + if true { + trace!($($arg)*); + } + }; +} + +macro_rules! 
tracer_poll_enter { + ($self:expr) => { + if false && $self.tracer.poll_enter() { + return Ready(Some(Err(Error::LimitPoll))); + } + }; +} + +macro_rules! tracer_loop_enter { + ($self:expr) => { + if false && $self.tracer.loop_enter() { + return Ready(Some(Err(Error::LimitLoop))); + } + }; +} + #[derive(Debug, ThisError)] #[cstm(name = "EventsMergeRt")] pub enum Error { @@ -31,6 +66,8 @@ pub enum Error { Logic, OrderMin, OrderMax, + LimitPoll, + LimitLoop, } #[allow(unused)] @@ -116,10 +153,12 @@ pub struct MergeRts { out: VecDeque, buf_before: Option, ts_seen_max: u64, + tracer: StreamImplTracer, } impl MergeRts { pub fn new(ch_conf: ChConf, range: ScyllaSeriesRange, readopts: EventReadOpts, scyqueue: ScyllaQueue) -> Self { + info!("MergeRts readopts {readopts:?}"); Self { ch_conf, range_mt: range.clone(), @@ -137,6 +176,7 @@ impl MergeRts { out: VecDeque::new(), buf_before: None, ts_seen_max: 0, + tracer: StreamImplTracer::new("MergeRts".into(), 2000, 2000), } } @@ -145,7 +185,7 @@ impl MergeRts { let limbuf = &VecDeque::new(); let inpdst = &mut self.inp_st; let range = Self::constrained_range(&self.range, limbuf); - debug!("setup_first_st constrained beg {}", range.beg().ns()); + trace_fetch!("setup_first_st constrained beg {}", range.beg().ns()); let tsbeg = range.beg(); let inp = EventsStreamRt::new( rt, @@ -164,7 +204,7 @@ impl MergeRts { let inpdst = &mut self.inp_mt; let range = Self::constrained_range(&self.range_mt, limbuf); self.range_lt = range.clone(); - debug!("setup_first_mt constrained beg {}", range.beg().ns()); + trace_fetch!("setup_first_mt constrained beg {}", range.beg().ns()); let tsbeg = range.beg(); let inp = EventsStreamRt::new( rt, @@ -182,7 +222,7 @@ impl MergeRts { let limbuf = &self.buf_mt; let inpdst = &mut self.inp_lt; let range = Self::constrained_range(&self.range_lt, limbuf); - debug!("setup_first_lt constrained beg {}", range.beg().ns()); + trace_fetch!("setup_first_lt constrained beg {}", range.beg().ns()); let tsbeg = 
range.beg(); let inp = EventsStreamRt::new( rt, @@ -196,37 +236,35 @@ impl MergeRts { } fn setup_read_st(&mut self) -> ReadEvents { - let stream = unsafe { &mut *(self.inp_st.as_mut().unwrap().as_mut() as *mut TI) }; - let fut = Box::pin(stream.next()); - ReadEvents { fut } + trace_fetch!("setup_read_st"); + Self::setup_read_any(&mut self.inp_st) } fn setup_read_mt(&mut self) -> ReadEvents { - let stream = unsafe { &mut *(self.inp_mt.as_mut().unwrap().as_mut() as *mut TI) }; - let fut = Box::pin(stream.next()); - ReadEvents { fut } + trace_fetch!("setup_read_mt"); + Self::setup_read_any(&mut self.inp_mt) } fn setup_read_lt(&mut self) -> ReadEvents { - let stream = unsafe { &mut *(self.inp_lt.as_mut().unwrap().as_mut() as *mut TI) }; - let fut = Box::pin(stream.next()); - ReadEvents { fut } + trace_fetch!("setup_read_lt"); + Self::setup_read_any(&mut self.inp_lt) } fn setup_read_any(inp: &mut Option>) -> ReadEvents { + trace_fetch!("setup_read_any"); let stream = unsafe { &mut *(inp.as_mut().unwrap().as_mut() as *mut TI) }; let fut = Box::pin(stream.next()); ReadEvents { fut } } fn constrained_range(full: &ScyllaSeriesRange, buf: &VecDeque) -> ScyllaSeriesRange { - debug!("constrained_range {:?} {:?}", full, buf.front()); + trace_fetch!("constrained_range {:?} {:?}", full, buf.front()); if let Some(e) = buf.front() { if let Some(ts) = e.ts_min() { let nrange = NanoRange::from((full.beg().ns(), ts)); ScyllaSeriesRange::from(&SeriesRange::from(nrange)) } else { - debug!("no ts even though should not have empty buffers"); + debug!("constrained_range no ts even though should not have empty buffers"); full.clone() } } else { @@ -235,6 +273,7 @@ impl MergeRts { } fn handle_first_st(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) { + trace_fetch!("handle_first_st"); Self::move_latest_to_before_buf(&mut before, &mut self.buf_before); self.buf_st.push_back(bulk); self.setup_first_mt(); @@ -242,6 +281,7 @@ impl MergeRts { } fn handle_first_mt(&mut self, mut before: 
ChannelEvents, bulk: ChannelEvents) { + trace_fetch!("handle_first_mt"); Self::move_latest_to_before_buf(&mut before, &mut self.buf_before); self.buf_mt.push_back(bulk); self.setup_first_lt(); @@ -249,27 +289,39 @@ impl MergeRts { } fn handle_first_lt(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) { + trace_fetch!("handle_first_lt"); Self::move_latest_to_before_buf(&mut before, &mut self.buf_before); self.buf_lt.push_back(bulk); + self.push_out_one_before(); let buf = core::mem::replace(&mut self.buf_lt, VecDeque::new()); self.state = State::ReadingLt(None, buf, self.inp_lt.take()); } fn move_latest_to_before_buf(before: &mut ChannelEvents, buf: &mut Option) { + trace_fetch!("move_latest_to_before_buf"); if buf.is_none() { *buf = Some(before.new_empty()); } let buf = buf.as_mut().unwrap(); if let Some(tsn) = before.ts_max() { - if let Some(tse) = buf.ts_max() { - if tsn > tse { - let n = before.len(); - buf.clear(); - before.drain_into(buf, (n - 1, n)).unwrap(); - } + if buf.ts_max().map_or(true, |x| tsn > x) { + let n = before.len(); + buf.clear(); + before.drain_into(buf, (n - 1, n)).unwrap(); } } } + + fn push_out_one_before(&mut self) { + if let Some(buf) = self.buf_before.take() { + trace_fetch!("push_out_one_before len {len:?}", len = buf.len()); + if buf.len() != 0 { + self.out.push_back(buf); + } + } else { + trace_fetch!("push_out_one_before no buffer"); + } + } } impl Stream for MergeRts { @@ -277,13 +329,15 @@ impl Stream for MergeRts { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + tracer_poll_enter!(self); let mut out2 = VecDeque::new(); loop { + tracer_loop_enter!(self); while let Some(x) = out2.pop_front() { self.out.push_back(x); } if let Some(item) = self.out.pop_front() { - debug!("emit item {} {:?}", items_0::Events::verify(&item), item); + trace_emit!("emit item {} {:?}", items_0::Events::verify(&item), item); if items_0::Events::verify(&item) != true { debug!("{}bad item {:?}", 
"\n\n--------------------------\n", item); self.state = State::Done; @@ -310,6 +364,9 @@ impl Stream for MergeRts { self.ts_seen_max = item_max; } } + if let Some(ix) = item.find_highest_index_lt(self.range.beg().ns()) { + trace_fetch!("see item before range ix {ix}"); + } break Ready(Some(Ok(item))); } break match &mut self.state { @@ -321,7 +378,7 @@ impl Stream for MergeRts { State::FetchFirstSt(st2) => match st2.fut.poll_unpin(cx) { Ready(Some(Ok(x))) => match x { firstbefore::Output::First(before, bulk) => { - debug!("have first from ST"); + trace_fetch!("have first from ST"); self.handle_first_st(before, bulk); continue; } @@ -336,7 +393,7 @@ impl Stream for MergeRts { Ready(Some(Err(e.into()))) } Ready(None) => { - debug!("no first from ST"); + trace_fetch!("no first from ST"); self.inp_st = None; self.setup_first_mt(); self.state = State::FetchFirstMt(self.setup_read_mt()); @@ -347,7 +404,7 @@ impl Stream for MergeRts { State::FetchFirstMt(st2) => match st2.fut.poll_unpin(cx) { Ready(Some(Ok(x))) => match x { firstbefore::Output::First(before, bulk) => { - debug!("have first from MT"); + trace_fetch!("have first from MT"); self.handle_first_mt(before, bulk); continue; } @@ -362,7 +419,7 @@ impl Stream for MergeRts { Ready(Some(Err(e.into()))) } Ready(None) => { - debug!("no first from MT"); + trace_fetch!("no first from MT"); self.inp_mt = None; self.setup_first_lt(); self.state = State::FetchFirstLt(self.setup_read_lt()); @@ -373,7 +430,7 @@ impl Stream for MergeRts { State::FetchFirstLt(st2) => match st2.fut.poll_unpin(cx) { Ready(Some(Ok(x))) => match x { firstbefore::Output::First(before, bulk) => { - debug!("have first from LT"); + trace_fetch!("have first from LT"); self.handle_first_lt(before, bulk); continue; } @@ -388,8 +445,9 @@ impl Stream for MergeRts { Ready(Some(Err(e.into()))) } Ready(None) => { - debug!("no first from LT"); + trace_fetch!("no first from LT"); self.inp_lt = None; + self.push_out_one_before(); let buf = core::mem::replace(&mut 
self.buf_lt, VecDeque::new()); self.state = State::ReadingLt(None, buf, self.inp_lt.take()); continue; @@ -433,7 +491,7 @@ impl Stream for MergeRts { self.state = State::ReadingLt(Some(Self::setup_read_any(inp)), buf, inp.take()); continue; } else { - debug!("transition ReadingLt to ReadingMt"); + trace_emit!("transition ReadingLt to ReadingMt"); let buf = core::mem::replace(&mut self.buf_mt, VecDeque::new()); self.state = State::ReadingMt(None, buf, self.inp_mt.take()); continue; @@ -476,7 +534,7 @@ impl Stream for MergeRts { self.state = State::ReadingMt(Some(Self::setup_read_any(inp)), buf, inp.take()); continue; } else { - debug!("transition ReadingMt to ReadingSt"); + trace_emit!("transition ReadingMt to ReadingSt"); let buf = core::mem::replace(&mut self.buf_st, VecDeque::new()); self.state = State::ReadingSt(None, buf, self.inp_st.take()); continue; @@ -519,7 +577,7 @@ impl Stream for MergeRts { self.state = State::ReadingSt(Some(Self::setup_read_any(inp)), buf, inp.take()); continue; } else { - debug!("fully done"); + trace_emit!("fully done"); Ready(None) } } diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs index 470a709..64f25ca 100644 --- a/crates/scyllaconn/src/worker.rs +++ b/crates/scyllaconn/src/worker.rs @@ -159,7 +159,6 @@ impl ScyllaWorker { let stmts = Arc::new(stmts); info!("scylla worker PREPARE DONE"); loop { - info!("scylla worker WAIT FOR JOB"); let x = self.rx.recv().await; let job = match x { Ok(x) => x, @@ -169,14 +168,12 @@ impl ScyllaWorker { }; match job { Job::FindTsMsp(rt, series, range, bck, tx) => { - info!("scylla worker Job::FindTsMsp"); let res = crate::events2::msp::find_ts_msp(&rt, series, range, bck, &stmts, &scy).await; if tx.send(res.map_err(Into::into)).await.is_err() { // TODO count for stats } } Job::ReadNextValues(job) => { - info!("scylla worker Job::ReadNextValues"); let fut = (job.futgen)(scy.clone(), stmts.clone()); let res = fut.await; if 
job.tx.send(res.map_err(Into::into)).await.is_err() { @@ -184,7 +181,6 @@ impl ScyllaWorker { } } Job::AccountingReadTs(rt, ts, tx) => { - info!("scylla worker Job::AccountingReadTs"); let ks = match &rt { RetentionTime::Short => &self.scyconf_st.keyspace, RetentionTime::Medium => &self.scyconf_mt.keyspace, diff --git a/crates/streams/src/plaineventsstream.rs b/crates/streams/src/plaineventsstream.rs index d6ee21a..a53abf0 100644 --- a/crates/streams/src/plaineventsstream.rs +++ b/crates/streams/src/plaineventsstream.rs @@ -30,6 +30,7 @@ pub async fn dyn_events_stream( let subq = make_sub_query( ch_conf, evq.range().clone(), + evq.one_before_range(), evq.transform().clone(), evq.test_do_wasm(), evq, diff --git a/crates/streams/src/rangefilter2.rs b/crates/streams/src/rangefilter2.rs index 2b93c45..e4101fa 100644 --- a/crates/streams/src/rangefilter2.rs +++ b/crates/streams/src/rangefilter2.rs @@ -110,6 +110,7 @@ where item } } else { + trace!("discarding events len {:?}", ilge - 1); let mut dummy = item.new_empty(); item.drain_into(&mut dummy, (0, ilge - 1)) .map_err(|e| format!("{e} unexpected MergeError while remove of items"))?; diff --git a/crates/streams/src/tcprawclient.rs b/crates/streams/src/tcprawclient.rs index f4438e4..f863617 100644 --- a/crates/streams/src/tcprawclient.rs +++ b/crates/streams/src/tcprawclient.rs @@ -200,6 +200,7 @@ where pub fn make_sub_query( ch_conf: ChannelTypeConfigGen, range: SeriesRange, + one_before_range: bool, transform: TransformQuery, test_do_wasm: Option<&str>, sub: SUB, @@ -209,7 +210,7 @@ pub fn make_sub_query( where SUB: Into, { - let mut select = EventsSubQuerySelect::new(ch_conf, range, transform); + let mut select = EventsSubQuerySelect::new(ch_conf, range, one_before_range, transform); if let Some(wasm1) = test_do_wasm { select.set_wasm1(wasm1.into()); } diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index 8afa46b..2829272 100644 --- a/crates/streams/src/timebinnedjson.rs +++ 
b/crates/streams/src/timebinnedjson.rs @@ -46,6 +46,7 @@ async fn timebinnable_stream( let subq = make_sub_query( ch_conf, range.clone().into(), + one_before_range, query.transform().clone(), query.test_do_wasm(), &query,