From 6b4fa3f7e12e36e2c2e2c7a895845c47339ea35d Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 19 Jun 2024 11:20:28 +0200 Subject: [PATCH] Reduce db connections, improve merge mt/lt --- crates/daqbuffer/src/bin/daqbuffer.rs | 18 +- crates/daqbuffer/src/cli.rs | 1 + crates/daqbufp2/src/daqbufp2.rs | 10 + crates/dbconn/src/dbconn.rs | 2 +- crates/dbconn/src/search.rs | 39 ++- crates/httpret/src/api4/eventdata.rs | 2 + crates/httpret/src/httpret.rs | 9 + crates/httpret/src/proxy/api4.rs | 3 +- crates/items_0/src/items_0.rs | 11 +- crates/items_2/src/channelevents.rs | 64 +++- crates/items_2/src/eventfull.rs | 11 + crates/items_2/src/eventsdim0.rs | 17 +- crates/items_2/src/eventsdim1.rs | 10 +- crates/items_2/src/eventsxbindim0.rs | 12 +- crates/items_2/src/items_2.rs | 4 + crates/items_2/src/merger.rs | 1 + crates/netpod/Cargo.toml | 1 + crates/netpod/src/query.rs | 18 +- crates/nodenet/src/conn.rs | 2 +- crates/nodenet/src/scylla.rs | 1 - crates/query/Cargo.toml | 1 + crates/query/src/api4.rs | 9 +- crates/scyllaconn/src/conn.rs | 2 + crates/scyllaconn/src/events.rs | 59 +--- crates/scyllaconn/src/events2/events.rs | 45 ++- crates/scyllaconn/src/events2/firstbefore.rs | 179 ++++++++++- crates/scyllaconn/src/events2/mergert.rs | 302 ++++++++++++++----- crates/scyllaconn/src/scyllaconn.rs | 9 + crates/taskrun/src/taskrun.rs | 15 +- 29 files changed, 669 insertions(+), 188 deletions(-) diff --git a/crates/daqbuffer/src/bin/daqbuffer.rs b/crates/daqbuffer/src/bin/daqbuffer.rs index b3ce06f..1fa7a0d 100644 --- a/crates/daqbuffer/src/bin/daqbuffer.rs +++ b/crates/daqbuffer/src/bin/daqbuffer.rs @@ -17,6 +17,7 @@ use netpod::Shape; use taskrun::tokio; use tokio::fs::File; use tokio::io::AsyncReadExt; +use tracing::Instrument; pub fn main() { match taskrun::run(go()) { @@ -64,7 +65,7 @@ async fn go() -> Result<(), Error> { }; match opts.subcmd { SubCmd::Retrieval(subcmd) => { - info!("daqbuffer version {} +0005", clap::crate_version!()); + info!("daqbuffer version {} +0007", clap::crate_version!()); info!(" service_version {}", service_version); if false { #[allow(non_snake_case)] @@ -143,10 +144,25 @@ async fn go() -> Result<(), Error> { SubCmd::Version => { println!("{}", clap::crate_version!()); } + SubCmd::TestLog => { + test_log().await; + } } Ok(()) } +async fn test_log() { + daqbufp2::test_log().await; + let logspan = tracing::span!(tracing::Level::INFO, "log_span_debug", spanlevel = "info"); + daqbufp2::test_log().instrument(logspan).await; + let logspan = tracing::span!(tracing::Level::INFO, "log_span_debug", spanlevel = "trace"); + daqbufp2::test_log().instrument(logspan).await; + let logspan = tracing::span!(tracing::Level::TRACE, "log_span_trace", spanlevel = "info"); + daqbufp2::test_log().instrument(logspan).await; + let logspan = tracing::span!(tracing::Level::TRACE, "log_span_trace", spanlevel = "trace"); + daqbufp2::test_log().instrument(logspan).await; +} + // TODO test data needs to be generated. // TODO use httpclient for the request: need to add binary POST. //#[test] diff --git a/crates/daqbuffer/src/cli.rs b/crates/daqbuffer/src/cli.rs index c272d5f..53168b9 100644 --- a/crates/daqbuffer/src/cli.rs +++ b/crates/daqbuffer/src/cli.rs @@ -18,6 +18,7 @@ pub enum SubCmd { GenerateTestData, Test, Version, + TestLog, } #[derive(Debug, Parser)] diff --git a/crates/daqbufp2/src/daqbufp2.rs b/crates/daqbufp2/src/daqbufp2.rs index 66d9c23..80b6703 100644 --- a/crates/daqbufp2/src/daqbufp2.rs +++ b/crates/daqbufp2/src/daqbufp2.rs @@ -46,3 +46,13 @@ pub async fn run_proxy(proxy_config: ProxyConfig, service_version: ServiceVersio httpret::proxy::proxy(proxy_config, service_version).await?; Ok(()) } + +pub async fn test_log() { + use netpod::log::*; + error!("------"); + warn!("------"); + info!("------"); + debug!("------"); + trace!("------"); + httpret::test_log().await; +} diff --git a/crates/dbconn/src/dbconn.rs b/crates/dbconn/src/dbconn.rs index 35c4d62..90ac476 100644 --- a/crates/dbconn/src/dbconn.rs +++ b/crates/dbconn/src/dbconn.rs @@ -66,7 +66,7 @@ pub async fn delay_io_medium() { } pub async fn create_connection(db_config: &Database) -> Result<(PgClient, JoinHandle>), Error> { - warn!("create_connection\n\n CREATING CONNECTION\n\n"); + warn!("create_connection\n\n CREATING POSTGRES CONNECTION\n\n"); // TODO use a common already running worker pool for these queries: let d = db_config; let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name); diff --git a/crates/dbconn/src/search.rs b/crates/dbconn/src/search.rs index ec64b3e..184e31e 100644 --- a/crates/dbconn/src/search.rs +++ b/crates/dbconn/src/search.rs @@ -13,6 +13,7 @@ use serde_json::Value as JsVal; pub async fn search_channel_databuffer( query: ChannelSearchQuery, + backend: &str, node_config: &NodeConfigCached, ) -> Result { let empty = if !query.name_regex.is_empty() { @@ -33,12 +34,19 @@ pub async fn search_channel_databuffer( " channel_id, channel_name, source_name,", " dtype, shape, unit, description, channel_backend", " from searchext($1, $2, $3, $4)", + " where channel_backend = $5" ); let (pg, _pgjh) = create_connection(&node_config.node_config.cluster.database).await?; let rows = pg .query( sql, - &[&query.name_regex, &query.source_regex, &query.description_regex, &"asc"], + &[ + &query.name_regex, + &query.source_regex, + &query.description_regex, + &"asc", + &backend, + ], ) .await .err_conv()?; @@ -90,14 +98,19 @@ pub async fn search_channel_databuffer( Ok(ret) } -pub async fn search_channel_scylla(query: ChannelSearchQuery, pgconf: &Database) -> Result { +pub async fn search_channel_scylla( + query: ChannelSearchQuery, + backend: &str, + pgconf: &Database, +) -> Result { let empty = if !query.name_regex.is_empty() { false } else { true }; if empty { let ret = ChannelSearchResult { channels: Vec::new() }; return Ok(ret); } let ch_kind: i16 = if query.channel_status { 1 } else { 2 }; - let (cb1, cb2) = if let Some(x) = &query.backend { + let tmp_backend = Some(backend.to_string()); + let (cb1, cb2) = if let Some(x) = &tmp_backend { (false, x.as_str()) } else { (true, "") @@ -266,19 +279,17 @@ async fn search_channel_archeng( Ok(ret) } -pub async fn search_channel( - query: ChannelSearchQuery, - node_config: &NodeConfigCached, -) -> Result { - let pgconf = &node_config.node_config.cluster.database; - if let Some(_scyconf) = node_config.node_config.cluster.scylla_st() { - search_channel_scylla(query, pgconf).await - } else if let Some(conf) = node_config.node.channel_archiver.as_ref() { - search_channel_archeng(query, node_config.node_config.cluster.backend.clone(), conf, pgconf).await - } else if let Some(_conf) = node_config.node.archiver_appliance.as_ref() { +pub async fn search_channel(query: ChannelSearchQuery, ncc: &NodeConfigCached) -> Result { + let backend = &ncc.node_config.cluster.backend; + let pgconf = &ncc.node_config.cluster.database; + if let Some(_scyconf) = ncc.node_config.cluster.scylla_st() { + search_channel_scylla(query, backend, pgconf).await + } else if let Some(conf) = ncc.node.channel_archiver.as_ref() { + search_channel_archeng(query, backend.clone(), conf, pgconf).await + } else if let Some(_conf) = ncc.node.archiver_appliance.as_ref() { // TODO err::todoval() } else { - search_channel_databuffer(query, node_config).await + search_channel_databuffer(query, backend, ncc).await } } diff --git a/crates/httpret/src/api4/eventdata.rs b/crates/httpret/src/api4/eventdata.rs index ba6fde0..f169f1f 100644 --- a/crates/httpret/src/api4/eventdata.rs +++ b/crates/httpret/src/api4/eventdata.rs @@ -84,7 +84,9 @@ impl EventDataHandler { let frames = nodenet::conn::events_get_input_frames(inp) .await .map_err(|_| EventDataError::InternalError)?; + info!("start parse"); let (evsubq,) = nodenet::conn::events_parse_input_query(frames).map_err(|_| EventDataError::QueryParse)?; + info!("done parse"); let logspan = if false { tracing::Span::none() } else if evsubq.log_level() == "trace" { diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 78bcb4a..cb69535 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -522,3 +522,12 @@ async fn clear_cache_all( .body(body_string(serde_json::to_string(&res)?))?; Ok(ret) } + +pub async fn test_log() { + error!("------"); + warn!("------"); + info!("------"); + debug!("------"); + trace!("------"); + scyllaconn::test_log().await; +} diff --git a/crates/httpret/src/proxy/api4.rs b/crates/httpret/src/proxy/api4.rs index 8d5244c..297b700 100644 --- a/crates/httpret/src/proxy/api4.rs +++ b/crates/httpret/src/proxy/api4.rs @@ -74,11 +74,12 @@ pub async fn channel_search(req: Requ, ctx: &ReqCtx, proxy_config: &ProxyConfig) let res: ChannelSearchResult = match serde_json::from_slice(&body) { Ok(k) => k, Err(_) => { - let msg = format!("can not parse result: {}", String::from_utf8_lossy(&body)); + let msg = format!("can not parse result tag {} {}", tag, String::from_utf8_lossy(&body)); error!("{}", msg); return Err(Error::with_msg_no_trace(msg)); } }; + info!("from {} len {}", tag, res.channels.len()); let ret = SubRes { tag, status: StatusCode::OK, diff --git a/crates/items_0/src/items_0.rs b/crates/items_0/src/items_0.rs index a10788a..c495a1d 100644 --- a/crates/items_0/src/items_0.rs +++ b/crates/items_0/src/items_0.rs @@ -110,6 +110,8 @@ impl fmt::Display for MergeError { } } +impl std::error::Error for MergeError {} + // TODO can I remove the Any bound? /// Container of some form of events, for use as trait object. @@ -137,7 +139,7 @@ pub trait Events: // TODO is this used? fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box; fn new_empty_evs(&self) -> Box; - fn drain_into_evs(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError>; + fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError>; fn find_lowest_index_gt_evs(&self, ts: u64) -> Option; fn find_lowest_index_ge_evs(&self, ts: u64) -> Option; fn find_highest_index_lt_evs(&self, ts: u64) -> Option; @@ -151,6 +153,7 @@ pub trait Events: fn to_min_max_avg(&mut self) -> Box; fn to_json_vec_u8(&self) -> Vec; fn to_cbor_vec_u8(&self) -> Vec; + fn clear(&mut self); } impl WithLen for Box { @@ -222,7 +225,7 @@ impl Events for Box { Events::new_empty_evs(self.as_ref()) } - fn drain_into_evs(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError> { + fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> { Events::drain_into_evs(self.as_mut(), dst, range) } @@ -277,4 +280,8 @@ impl Events for Box { fn to_cbor_vec_u8(&self) -> Vec { Events::to_cbor_vec_u8(self.as_ref()) } + + fn clear(&mut self) { + Events::clear(self.as_mut()) + } } diff --git a/crates/items_2/src/channelevents.rs b/crates/items_2/src/channelevents.rs index f918917..fe16a6e 100644 --- a/crates/items_2/src/channelevents.rs +++ b/crates/items_2/src/channelevents.rs @@ -154,6 +154,15 @@ pub enum ChannelEvents { Status(Option), } +impl ChannelEvents { + pub fn is_events(&self) -> bool { + match self { + ChannelEvents::Events(_) => true, + ChannelEvents::Status(_) => false, + } + } +} + impl TypeName for ChannelEvents { fn type_name(&self) -> String { any::type_name::().into() @@ -702,6 +711,17 @@ impl Mergeable for ChannelEvents { } } + fn clear(&mut self) { + match self { + ChannelEvents::Events(x) => { + Mergeable::clear(x); + } + ChannelEvents::Status(x) => { + *x = None; + } + } + } + fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> { match self { ChannelEvents::Events(k) => match dst { @@ -829,7 +849,10 @@ impl Events for ChannelEvents { } fn verify(&self) -> bool { - todo!() + match self { + ChannelEvents::Events(x) => Events::verify(x), + ChannelEvents::Status(_) => panic!(), + } } fn output_info(&self) -> String { @@ -861,11 +884,26 @@ impl Events for ChannelEvents { } fn new_empty_evs(&self) -> Box { - todo!() + match self { + ChannelEvents::Events(x) => Events::new_empty_evs(x), + ChannelEvents::Status(_) => panic!(), + } } - fn drain_into_evs(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError> { - todo!() + fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> { + let dst2 = if let Some(x) = dst.as_any_mut().downcast_mut::() { + // debug!("unwrapped dst ChannelEvents as well"); + x + } else { + panic!("dst is not ChannelEvents"); + }; + match self { + ChannelEvents::Events(k) => match dst2 { + ChannelEvents::Events(j) => Events::drain_into_evs(k, j, range), + ChannelEvents::Status(_) => panic!("dst is not events"), + }, + ChannelEvents::Status(_) => panic!("self is not events"), + } } fn find_lowest_index_gt_evs(&self, ts: u64) -> Option { @@ -896,11 +934,14 @@ impl Events for ChannelEvents { todo!() } - fn tss(&self) -> &std::collections::VecDeque { - todo!() + fn tss(&self) -> &VecDeque { + match self { + ChannelEvents::Events(x) => Events::tss(x), + ChannelEvents::Status(_) => panic!(), + } } - fn pulses(&self) -> &std::collections::VecDeque { + fn pulses(&self) -> &VecDeque { todo!() } @@ -934,6 +975,15 @@ impl Events for ChannelEvents { } } } + + fn clear(&mut self) { + match self { + ChannelEvents::Events(x) => Events::clear(x.as_mut()), + ChannelEvents::Status(x) => { + *x = None; + } + } + } } impl Collectable for ChannelEvents { diff --git a/crates/items_2/src/eventfull.rs b/crates/items_2/src/eventfull.rs index 5fce3de..fbcb173 100644 --- a/crates/items_2/src/eventfull.rs +++ b/crates/items_2/src/eventfull.rs @@ -201,6 +201,17 @@ impl Mergeable for EventFull { Empty::empty() } + fn clear(&mut self) { + self.tss.clear(); + self.pulses.clear(); + self.blobs.clear(); + self.scalar_types.clear(); + self.be.clear(); + self.shapes.clear(); + self.comps.clear(); + self.entry_payload_max = 0; + } + fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> { // TODO make it harder to forget new members when the struct may get modified in the future let r = range.0..range.1; diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index dba7ea3..3a1bbd1 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -859,9 +859,9 @@ impl Events for EventsDim0 { Box::new(Self::empty()) } - fn drain_into_evs(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError> { + fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> { // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. - if let Some(dst) = dst.as_mut().as_any_mut().downcast_mut::() { + if let Some(dst) = dst.as_any_mut().downcast_mut::() { // TODO make it harder to forget new members when the struct may get modified in the future let r = range.0..range.1; dst.tss.extend(self.tss.drain(r.clone())); @@ -869,7 +869,12 @@ impl Events for EventsDim0 { dst.values.extend(self.values.drain(r.clone())); Ok(()) } else { - error!("downcast to EventsDim0 FAILED"); + error!( + "downcast to EventsDim0 FAILED\n\n{}\n\n{}\n\n", + self.type_name(), + dst.type_name() + ); + panic!(); Err(MergeError::NotCompatible) } } @@ -989,6 +994,12 @@ impl Events for EventsDim0 { ciborium::into_writer(&ret, &mut buf).unwrap(); buf } + + fn clear(&mut self) { + self.tss.clear(); + self.pulses.clear(); + self.values.clear(); + } } #[derive(Debug)] diff --git a/crates/items_2/src/eventsdim1.rs b/crates/items_2/src/eventsdim1.rs index 33250b2..802afb3 100644 --- a/crates/items_2/src/eventsdim1.rs +++ b/crates/items_2/src/eventsdim1.rs @@ -813,9 +813,9 @@ impl Events for EventsDim1 { Box::new(Self::empty()) } - fn drain_into_evs(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError> { + fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> { // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. - if let Some(dst) = dst.as_mut().as_any_mut().downcast_mut::() { + if let Some(dst) = dst.as_any_mut().downcast_mut::() { // TODO make it harder to forget new members when the struct may get modified in the future let r = range.0..range.1; dst.tss.extend(self.tss.drain(r.clone())); @@ -949,6 +949,12 @@ impl Events for EventsDim1 { ciborium::into_writer(&ret, &mut buf).unwrap(); buf } + + fn clear(&mut self) { + self.tss.clear(); + self.pulses.clear(); + self.values.clear(); + } } #[derive(Debug)] diff --git a/crates/items_2/src/eventsxbindim0.rs b/crates/items_2/src/eventsxbindim0.rs index 62102d9..b8e004d 100644 --- a/crates/items_2/src/eventsxbindim0.rs +++ b/crates/items_2/src/eventsxbindim0.rs @@ -261,9 +261,9 @@ impl Events for EventsXbinDim0 { Box::new(Self::empty()) } - fn drain_into_evs(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError> { + fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> { // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. - if let Some(dst) = dst.as_mut().as_any_mut().downcast_mut::() { + if let Some(dst) = dst.as_any_mut().downcast_mut::() { // TODO make it harder to forget new members when the struct may get modified in the future let r = range.0..range.1; dst.tss.extend(self.tss.drain(r.clone())); @@ -365,6 +365,14 @@ impl Events for EventsXbinDim0 { fn to_cbor_vec_u8(&self) -> Vec { todo!() } + + fn clear(&mut self) { + self.tss.clear(); + self.pulses.clear(); + self.mins.clear(); + self.maxs.clear(); + self.avgs.clear(); + } } #[derive(Debug)] diff --git a/crates/items_2/src/items_2.rs b/crates/items_2/src/items_2.rs index 3c31de9..f379953 100644 --- a/crates/items_2/src/items_2.rs +++ b/crates/items_2/src/items_2.rs @@ -170,6 +170,10 @@ impl Mergeable for Box { self.as_ref().new_empty_evs() } + fn clear(&mut self) { + Events::clear(self.as_mut()) + } + fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> { self.as_mut().drain_into_evs(dst, range) } diff --git a/crates/items_2/src/merger.rs b/crates/items_2/src/merger.rs index e144d12..c9ae57b 100644 --- a/crates/items_2/src/merger.rs +++ b/crates/items_2/src/merger.rs @@ -47,6 +47,7 @@ pub trait Mergeable: fmt::Debug + WithLen + ByteEstimate + Unpin { fn ts_min(&self) -> Option; fn ts_max(&self) -> Option; fn new_empty(&self) -> Self; + fn clear(&mut self); // TODO when MergeError::Full gets returned, any guarantees about what has been modified or kept unchanged? fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError>; fn find_lowest_index_gt(&self, ts: u64) -> Option; diff --git a/crates/netpod/Cargo.toml b/crates/netpod/Cargo.toml index 3a5b286..41117e1 100644 --- a/crates/netpod/Cargo.toml +++ b/crates/netpod/Cargo.toml @@ -11,6 +11,7 @@ path = "src/netpod.rs" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" http = "1.0.0" +humantime = "2.1.0" humantime-serde = "1.1.1" async-channel = "1.8.0" bytes = "1.4.0" diff --git a/crates/netpod/src/query.rs b/crates/netpod/src/query.rs index 09126f5..74695eb 100644 --- a/crates/netpod/src/query.rs +++ b/crates/netpod/src/query.rs @@ -85,6 +85,20 @@ pub struct TimeRangeQuery { range: NanoRange, } +fn parse_time(v: &str) -> Result, Error> { + if let Ok(x) = v.parse() { + Ok(x) + } else { + if v.ends_with("ago") { + let d = humantime::parse_duration(&v[..v.len() - 3]) + .map_err(|_| Error::with_public_msg_no_trace(format!("can not parse {v}")))?; + Ok(Utc::now() - d) + } else { + Err(Error::with_public_msg_no_trace(format!("can not parse {v}"))) + } + } +} + impl FromUrl for TimeRangeQuery { fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); @@ -95,8 +109,8 @@ impl FromUrl for TimeRangeQuery { if let (Some(beg), Some(end)) = (pairs.get("begDate"), pairs.get("endDate")) { let ret = Self { range: NanoRange { - beg: beg.parse::>()?.to_nanos(), - end: end.parse::>()?.to_nanos(), + beg: parse_time(beg)?.to_nanos(), + end: parse_time(end)?.to_nanos(), }, }; Ok(ret) diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index eb5c4d4..26ea1d3 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -118,7 +118,7 @@ pub async fn create_response_bytes_stream( scyqueue: Option<&ScyllaQueue>, ncc: &NodeConfigCached, ) -> Result { - debug!( + info!( "create_response_bytes_stream {:?} {:?}", evq.ch_conf().scalar_type(), evq.ch_conf().shape(), diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index 74f0824..ca93919 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -29,7 +29,6 @@ pub async fn scylla_channel_event_stream( let shape = chconf.shape(); let do_test_stream_error = false; let with_values = evq.need_value_data(); - debug!("\n\nmake EventsStreamScylla {series:?} {scalar_type:?} {shape:?}\n"); let stream: Pin + Send>> = if evq.use_all_rt() { let x = scyllaconn::events2::mergert::MergeRts::new( SeriesId::new(chconf.series()), diff --git a/crates/query/Cargo.toml b/crates/query/Cargo.toml index 3dca474..f89a577 100644 --- a/crates/query/Cargo.toml +++ b/crates/query/Cargo.toml @@ -10,6 +10,7 @@ serde_json = "1.0" tracing = "0.1" chrono = { version = "0.4.19", features = ["serde"] } url = "2.2" +humantime = "2.1.0" humantime-serde = "1.1.1" err = { path = "../err" } netpod = { path = "../netpod" } diff --git a/crates/query/src/api4.rs b/crates/query/src/api4.rs index 6f5c6b9..4d00698 100644 --- a/crates/query/src/api4.rs +++ b/crates/query/src/api4.rs @@ -6,6 +6,7 @@ use chrono::TimeZone; use chrono::Utc; use err::Error; use netpod::get_url_query_pairs; +use netpod::log::*; use netpod::range::evrange::SeriesRange; use netpod::AppendToUrl; use netpod::FromUrl; @@ -124,7 +125,13 @@ impl FromUrl for AccountingToplistQuery { let v = pairs .get("tsDate") .ok_or(Error::with_public_msg_no_trace("missing tsDate"))?; - let w = v.parse::>()?; + let mut w = v.parse::>(); + if w.is_err() && v.ends_with("ago") { + let d = humantime::parse_duration(&v[..v.len() - 3]) + .map_err(|_| Error::with_public_msg_no_trace(format!("can not parse {v}")))?; + w = Ok(Utc::now() - d); + } + let w = w?; Ok::<_, Error>(TsNano::from_ns(w.to_nanos())) }; let ret = Self { diff --git a/crates/scyllaconn/src/conn.rs b/crates/scyllaconn/src/conn.rs index e57011f..8e1b31c 100644 --- a/crates/scyllaconn/src/conn.rs +++ b/crates/scyllaconn/src/conn.rs @@ -1,5 +1,6 @@ use crate::errconv::ErrConv; use err::Error; +use netpod::log::*; use netpod::ScyllaConfig; use scylla::execution_profile::ExecutionProfileBuilder; use scylla::statement::Consistency; @@ -16,6 +17,7 @@ pub async fn create_scy_session(scyconf: &ScyllaConfig) -> Result Result { + warn!("create_connection\n\n CREATING SCYLLA CONNECTION\n\n"); let scy = scylla::SessionBuilder::new() .known_nodes(&scyconf.hosts) .default_execution_profile_handle( diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs index 475eac2..f9e73b4 100644 --- a/crates/scyllaconn/src/events.rs +++ b/crates/scyllaconn/src/events.rs @@ -1,4 +1,3 @@ -use crate::errconv::ErrConv; use crate::range::ScyllaSeriesRange; use crate::worker::ScyllaQueue; use err::thiserror; @@ -25,7 +24,6 @@ use netpod::TsNano; use scylla::frame::response::result::Row; use scylla::prepared_statement::PreparedStatement; use scylla::Session; -use scylla::Session as ScySession; use series::SeriesId; use std::collections::VecDeque; use std::mem; @@ -271,7 +269,7 @@ pub(super) async fn find_ts_msp( range: ScyllaSeriesRange, bck: bool, stmts: &StmtsEvents, - scy: &ScySession, + scy: &Session, ) -> Result, Error> { trace!("find_ts_msp series {:?} {:?} {:?} bck {}", rt, series, range, bck); if bck { @@ -286,7 +284,7 @@ async fn find_ts_msp_fwd( series: u64, range: ScyllaSeriesRange, stmts: &StmtsEvents, - scy: &ScySession, + scy: &Session, ) -> Result, Error> { let mut ret = VecDeque::new(); // TODO time range truncation can be handled better @@ -308,7 +306,7 @@ async fn find_ts_msp_bck( series: u64, range: ScyllaSeriesRange, stmts: &StmtsEvents, - scy: &ScySession, + scy: &Session, ) -> Result, Error> { let mut ret = VecDeque::new(); let params = (series as i64, range.beg().ms() as i64); @@ -324,7 +322,7 @@ async fn find_ts_msp_bck( Ok(ret) } -trait ValTy: Sized + 'static { +pub(super) trait ValTy: Sized + 'static { type ScaTy: ScalarOps + std::default::Default; type ScyTy: scylla::cql_to_rust::FromCqlVal; type Container: Events + Appendable; @@ -499,7 +497,7 @@ where { // TODO could take scyqeue out of opts struct. let scyqueue = opts.scyqueue.clone(); - let futgen = Box::new(|scy: Arc, stmts: Arc| { + let futgen = Box::new(|scy: Arc, stmts: Arc| { let fut = async { read_next_values_2::(opts, scy, stmts) .await @@ -513,7 +511,7 @@ where async fn read_next_values_2( opts: ReadNextValuesOpts, - scy: Arc, + scy: Arc, stmts: Arc, ) -> Result, Error> where @@ -545,20 +543,6 @@ where ts_lsp_max, table_name, ); - let dir = "fwd"; - let qu_name = if opts.with_values { - if ST::is_valueblob() { - format!("array_{}_valueblobs_{}", ST::st_name(), dir) - } else { - format!("scalar_{}_values_{}", ST::st_name(), dir) - } - } else { - if ST::is_valueblob() { - format!("array_{}_timestamps_{}", ST::st_name(), dir) - } else { - format!("scalar_{}_timestamps_{}", ST::st_name(), dir) - } - }; let qu = stmts .rt(&opts.rt) .lsp(!opts.fwd, opts.with_values) @@ -586,20 +570,6 @@ where DtNano::from_ns(0) }; trace!("BCK ts_msp {} ts_lsp_max {} {}", ts_msp, ts_lsp_max, table_name,); - let dir = "bck"; - let qu_name = if opts.with_values { - if ST::is_valueblob() { - format!("array_{}_valueblobs_{}", ST::st_name(), dir) - } else { - format!("scalar_{}_values_{}", ST::st_name(), dir) - } - } else { - if ST::is_valueblob() { - format!("array_{}_timestamps_{}", ST::st_name(), dir) - } else { - format!("scalar_{}_timestamps_{}", ST::st_name(), dir) - } - }; let qu = stmts .rt(&opts.rt) .lsp(!opts.fwd, opts.with_values) @@ -829,7 +799,7 @@ pub struct EventsStreamScylla { } impl EventsStreamScylla { - pub fn new( + pub fn _new( rt: RetentionTime, series: u64, range: ScyllaSeriesRange, @@ -990,16 +960,6 @@ impl EventsStreamScylla { } } -async fn find_ts_msp_via_queue( - rt: RetentionTime, - series: u64, - range: ScyllaSeriesRange, - bck: bool, - scyqueue: ScyllaQueue, -) -> Result, crate::worker::Error> { - scyqueue.find_ts_msp(rt, series, range, bck).await -} - impl Stream for EventsStreamScylla { type Item = Result; @@ -1032,8 +992,9 @@ impl Stream for EventsStreamScylla { let series = self.series.clone(); let range = self.range.clone(); // TODO this no longer works, we miss the backwards part here - let fut = find_ts_msp_via_queue(self.rt.clone(), series, range, false, self.scyqueue.clone()); - let fut = Box::pin(fut); + // let fut = find_ts_msp_via_queue(self.rt.clone(), series, range, false, self.scyqueue.clone()); + // let fut = Box::pin(fut); + let fut = todo!(); self.state = FrState::FindMsp(fut); continue; } diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index e2fefde..f20077b 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -52,6 +52,7 @@ enum State { Begin, Reading(Reading), InputDone, + Done, } pub struct EventsStreamRt { @@ -180,15 +181,45 @@ impl Stream for EventsStreamRt { use Poll::*; loop { if let Some(item) = self.out.pop_front() { - item.verify(); + if !item.verify() { + debug!("{}bad item {:?}", "\n\n--------------------------\n", item); + self.state = State::Done; + break Ready(Some(Err(Error::Logic))); + } if let Some(item_min) = item.ts_min() { + if item_min < self.range.beg().ns() { + debug!( + "{}out of range error A {} {:?}", + "\n\n--------------------------\n", item_min, self.range + ); + self.state = State::Done; + break Ready(Some(Err(Error::Logic))); + } if item_min < self.ts_seen_max { - debug!("ordering error A {} {}", item_min, self.ts_seen_max); + debug!( + "{}ordering error A {} {}", + "\n\n--------------------------\n", item_min, self.ts_seen_max + ); + self.state = State::Done; + break Ready(Some(Err(Error::Logic))); } } if let Some(item_max) = item.ts_max() { + if item_max >= self.range.end().ns() { + debug!( + "{}out of range error B {} {:?}", + "\n\n--------------------------\n", item_max, self.range + ); + self.state = State::Done; + break Ready(Some(Err(Error::Logic))); + } if item_max < self.ts_seen_max { - debug!("ordering error B {} {}", item_max, self.ts_seen_max); + debug!( + "{}ordering error B {} {}", + "\n\n--------------------------\n", item_max, self.ts_seen_max + ); + self.state = State::Done; + break Ready(Some(Err(Error::Logic))); } else { self.ts_seen_max = item_max; } @@ -218,6 +249,7 @@ impl Stream for EventsStreamRt { st.reading_state = ReadingState::FetchEvents(FetchEvents { fut }); continue; } else { + self.state = State::Done; Ready(Some(Err(Error::Logic))) } } @@ -240,10 +272,14 @@ impl Stream for EventsStreamRt { st.reading_state = ReadingState::FetchMsp(FetchMsp { fut }); continue; } else { + self.state = State::Done; Ready(Some(Err(Error::Logic))) } } - Ready(Err(e)) => Ready(Some(Err(e.into()))), + Ready(Err(e)) => { + self.state = State::Done; + Ready(Some(Err(e.into()))) + } Pending => Pending, }, }, @@ -254,6 +290,7 @@ impl Stream for EventsStreamRt { continue; } } + State::Done => Ready(None), }; } } diff --git a/crates/scyllaconn/src/events2/firstbefore.rs b/crates/scyllaconn/src/events2/firstbefore.rs index 9c0aafa..da102eb 100644 --- a/crates/scyllaconn/src/events2/firstbefore.rs +++ b/crates/scyllaconn/src/events2/firstbefore.rs @@ -1,22 +1,179 @@ +use err::thiserror; +use err::ThisError; use futures_util::Stream; +use futures_util::StreamExt; use items_0::Events; -use items_0::WithLen; +use items_2::merger::Mergeable; +use netpod::log::*; +use netpod::TsNano; use std::pin::Pin; use std::task::Context; use std::task::Poll; -pub struct FirstBefore { - inp: S, +#[derive(Debug, ThisError)] +pub enum Error { + Unordered, + Logic, + Input(Box), } -impl Stream for FirstBefore -where - S: Stream> + Unpin, - T: Events, -{ - type Item = ::Item; +pub enum Output { + First(T, T), + Bulk(T), +} - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - todo!() +enum State { + Begin, + Bulk, + Done, +} + +pub struct FirstBeforeAndInside +where + S: Stream + Unpin, + T: Events + Mergeable + Unpin, +{ + ts0: TsNano, + inp: S, + state: State, + buf: Option, +} + +impl FirstBeforeAndInside +where + S: Stream + Unpin, + T: Events + Mergeable + Unpin, +{ + pub fn new(inp: S, ts0: TsNano) -> Self { + Self { + ts0, + inp, + state: State::Begin, + buf: None, + } } } + +impl Stream for FirstBeforeAndInside +where + S: Stream> + Unpin, + T: Events + Mergeable + Unpin, + E: std::error::Error + Send + 'static, +{ + type Item = Result, Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break match &self.state { + State::Begin => match self.inp.poll_next_unpin(cx) { + Ready(Some(Ok(mut item))) => { + // It is an invariant that we process ordered streams, but for robustness + // verify the batch item again: + if item.verify() != true { + self.state = State::Done; + let e = Error::Unordered; + Ready(Some(Err(e))) + } else { + // Separate events into before and bulk + let tss = item.tss(); + let pp = tss.partition_point(|&x| x < self.ts0.ns()); + if pp >= tss.len() { + // all entries are before + if self.buf.is_none() { + self.buf = Some(item.new_empty()); + } + match item.drain_into_evs(self.buf.as_mut().unwrap(), (0, item.len())) { + Ok(()) => { + continue; + } + Err(e) => { + self.state = State::Done; + Ready(Some(Err(Error::Input(Box::new(e))))) + } + } + } else if pp == 0 { + // all entries are bulk + debug!("transition immediately to bulk"); + self.state = State::Bulk; + let o1 = core::mem::replace(&mut self.buf, Some(item.new_empty())) + .unwrap_or_else(|| item.new_empty()); + Ready(Some(Ok(Output::First(o1, item)))) + } else { + // mixed + match item.drain_into_evs(self.buf.as_mut().unwrap(), (0, pp)) { + Ok(()) => { + debug!("transition with mixed to bulk"); + self.state = State::Bulk; + let o1 = core::mem::replace(&mut self.buf, Some(item.new_empty())) + .unwrap_or_else(|| item.new_empty()); + Ready(Some(Ok(Output::First(o1, item)))) + } + Err(e) => { + self.state = State::Done; + Ready(Some(Err(Error::Input(Box::new(e))))) + } + } + } + } + } + Ready(Some(Err(e))) => { + self.state = State::Done; + Ready(Some(Err(Error::Input(Box::new(e))))) + } + Ready(None) => { + self.state = State::Done; + Ready(None) + } + Pending => Pending, + }, + State::Bulk => { + if self.buf.as_ref().map_or(0, |x| x.len()) != 0 { + self.state = State::Done; + Ready(Some(Err(Error::Logic))) + } else { + match self.inp.poll_next_unpin(cx) { + Ready(Some(Ok(item))) => { + if item.verify() != true { + self.state = State::Done; + let e = Error::Unordered; + Ready(Some(Err(e))) + } else { + debug!("output bulk item len {}", item.len()); + Ready(Some(Ok(Output::Bulk(item)))) + } + } + Ready(Some(Err(e))) => { + self.state = State::Done; + Ready(Some(Err(Error::Input(Box::new(e))))) + } + Ready(None) => { + debug!("in bulk, input done"); + self.state = State::Done; + Ready(None) + } + Pending => Pending, + } + } + } + State::Done => Ready(None), + }; + } + } +} + +fn trait_assert(_: T) +where + T: Stream + Unpin + Send, +{ +} + +#[allow(unused)] +fn trait_assert_try() { + let x: FirstBeforeAndInside = phantomval(); + trait_assert(x); +} + +fn phantomval() -> T { + panic!() +} diff --git a/crates/scyllaconn/src/events2/mergert.rs b/crates/scyllaconn/src/events2/mergert.rs index a2c9498..158415a 100644 --- a/crates/scyllaconn/src/events2/mergert.rs +++ b/crates/scyllaconn/src/events2/mergert.rs @@ -1,5 +1,6 @@ use super::events::EventsStreamRt; -use super::nonempty::NonEmpty; +use super::firstbefore::FirstBeforeAndInside; +use crate::events2::firstbefore; use crate::range::ScyllaSeriesRange; use crate::worker::ScyllaQueue; use err::thiserror; @@ -8,6 +9,7 @@ use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; +use items_0::WithLen; use items_2::channelevents::ChannelEvents; use items_2::merger::Mergeable; use netpod::log::*; @@ -24,6 +26,7 @@ use std::task::Poll; #[derive(Debug, ThisError)] pub enum Error { + Input(#[from] crate::events2::firstbefore::Error), Events(#[from] crate::events2::events::Error), Logic, } @@ -74,8 +77,11 @@ where } } +type TI = FirstBeforeAndInside; +type INPI = Result, crate::events2::firstbefore::Error>; + struct ReadEvents { - fut: Pin>> + Send>>, + fut: Pin> + Send>>, } enum State { @@ -83,22 +89,10 @@ enum State { FetchFirstSt(ReadEvents), FetchFirstMt(ReadEvents), FetchFirstLt(ReadEvents), - ReadingLt( - Option, - VecDeque, - Option>>, - ), - ReadingMt( - Option, - VecDeque, - Option>>, - ), - ReadingSt( - Option, - VecDeque, - Option>>, - ), - Error, + ReadingLt(Option, VecDeque, Option>), + ReadingMt(Option, VecDeque, Option>), + ReadingSt(Option, VecDeque, Option>), + Done, } pub struct MergeRts { @@ -106,16 +100,20 @@ pub struct MergeRts { scalar_type: ScalarType, shape: Shape, range: ScyllaSeriesRange, + range_mt: ScyllaSeriesRange, + range_lt: ScyllaSeriesRange, with_values: bool, scyqueue: ScyllaQueue, - inp_st: Option>>, - inp_mt: Option>>, - inp_lt: Option>>, + inp_st: Option>, + inp_mt: Option>, + inp_lt: Option>, state: State, buf_st: VecDeque, buf_mt: VecDeque, buf_lt: VecDeque, out: VecDeque, + buf_before: Option, + ts_seen_max: u64, } impl MergeRts { @@ -131,6 +129,8 @@ impl MergeRts { series, scalar_type, shape, + range_mt: range.clone(), + range_lt: range.clone(), range, with_values, scyqueue, @@ -142,79 +142,101 @@ impl MergeRts { buf_mt: VecDeque::new(), buf_lt: VecDeque::new(), out: VecDeque::new(), + buf_before: None, + ts_seen_max: 0, } } fn setup_first_st(&mut self) { + let rt = RetentionTime::Short; + let limbuf = &VecDeque::new(); + let inpdst = &mut self.inp_st; + let range = Self::constrained_range(&self.range, limbuf); + debug!("setup_first_st constrained beg {}", range.beg().ns()); + let tsbeg = range.beg(); let inp = EventsStreamRt::new( - RetentionTime::Short, + rt, self.series.clone(), self.scalar_type.clone(), self.shape.clone(), - self.range.clone(), + range, self.with_values, self.scyqueue.clone(), ); - let inp = NonEmpty::new(inp); - self.inp_st = Some(Box::new(inp)); + let inp = TI::new(inp, tsbeg); + *inpdst = Some(Box::new(inp)); } fn setup_first_mt(&mut self) { + let rt = RetentionTime::Medium; + let limbuf = &self.buf_st; + let inpdst = &mut self.inp_mt; + let range = Self::constrained_range(&self.range_mt, limbuf); + self.range_lt = range.clone(); + debug!("setup_first_mt constrained beg {}", range.beg().ns()); + let tsbeg = range.beg(); let inp = EventsStreamRt::new( - RetentionTime::Medium, + rt, self.series.clone(), self.scalar_type.clone(), self.shape.clone(), - Self::constrained_range(&self.range, &self.buf_st), + range, self.with_values, self.scyqueue.clone(), ); - let inp = NonEmpty::new(inp); - self.inp_mt = Some(Box::new(inp)); + let inp = TI::new(inp, tsbeg); + *inpdst = Some(Box::new(inp)); } fn setup_first_lt(&mut self) { + let rt = RetentionTime::Long; + let limbuf = &self.buf_mt; + let inpdst = &mut self.inp_lt; + let range = Self::constrained_range(&self.range_lt, limbuf); + debug!("setup_first_lt constrained beg {}", range.beg().ns()); + let tsbeg = range.beg(); let inp = EventsStreamRt::new( - RetentionTime::Long, + rt, self.series.clone(), self.scalar_type.clone(), self.shape.clone(), - Self::constrained_range(&self.range, &self.buf_mt), + range, self.with_values, self.scyqueue.clone(), ); - let inp = NonEmpty::new(inp); - self.inp_lt = Some(Box::new(inp)); + let inp = TI::new(inp, tsbeg); + *inpdst = Some(Box::new(inp)); } fn setup_read_st(&mut self) -> ReadEvents { - let stream = unsafe { &mut *(self.inp_st.as_mut().unwrap().as_mut() as *mut NonEmpty) }; + let stream = unsafe { &mut *(self.inp_st.as_mut().unwrap().as_mut() as *mut TI) }; let fut = Box::pin(stream.next()); ReadEvents { fut } } fn setup_read_mt(&mut self) -> ReadEvents { - let stream = unsafe { &mut *(self.inp_mt.as_mut().unwrap().as_mut() as *mut NonEmpty) }; + let stream = unsafe { &mut *(self.inp_mt.as_mut().unwrap().as_mut() as *mut TI) }; let fut = Box::pin(stream.next()); ReadEvents { fut } } fn setup_read_lt(&mut self) -> ReadEvents { - let stream = unsafe { &mut *(self.inp_lt.as_mut().unwrap().as_mut() as *mut NonEmpty) }; + let stream = unsafe { &mut *(self.inp_lt.as_mut().unwrap().as_mut() as *mut TI) }; let fut = Box::pin(stream.next()); ReadEvents { fut } } - fn setup_read_any(inp: &mut Option>>) -> ReadEvents { - let stream = unsafe { &mut *(inp.as_mut().unwrap().as_mut() as *mut NonEmpty) }; + fn setup_read_any(inp: &mut Option>) -> ReadEvents { + let stream = unsafe { &mut *(inp.as_mut().unwrap().as_mut() as *mut TI) }; let fut = Box::pin(stream.next()); ReadEvents { fut } } fn constrained_range(full: &ScyllaSeriesRange, buf: &VecDeque) -> ScyllaSeriesRange { + debug!("constrained_range {:?} {:?}", full, buf.front()); if let Some(e) = buf.front() { if let Some(ts) = e.ts_min() { - let nrange = NanoRange::from((ts, 0)); + let nrange = NanoRange::from((full.beg().ns(), ts)); ScyllaSeriesRange::from(&SeriesRange::from(nrange)) } else { debug!("no ts even though should not have empty buffers"); @@ -225,12 +247,40 @@ impl MergeRts { } } - fn dummy(&mut self) -> bool { - if self.inp_lt.is_some() { - // *fut = Some(self.setup_read_lt()); - true - } else { - false + fn handle_first_st(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) { + Self::move_latest_to_before_buf(&mut before, &mut self.buf_before); + self.buf_st.push_back(bulk); + self.setup_first_mt(); + self.state = State::FetchFirstMt(self.setup_read_mt()); + } + + fn handle_first_mt(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) { + Self::move_latest_to_before_buf(&mut before, &mut self.buf_before); + self.buf_mt.push_back(bulk); + self.setup_first_lt(); + self.state = State::FetchFirstLt(self.setup_read_lt()); + } + + fn handle_first_lt(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) { + Self::move_latest_to_before_buf(&mut before, &mut self.buf_before); + self.buf_lt.push_back(bulk); + let buf = core::mem::replace(&mut self.buf_lt, VecDeque::new()); + self.state = State::ReadingLt(None, buf, self.inp_lt.take()); + } + + fn move_latest_to_before_buf(before: &mut ChannelEvents, buf: &mut Option) { + if buf.is_none() { + *buf = Some(before.new_empty()); + } + let buf = buf.as_mut().unwrap(); + if let Some(tsn) = before.ts_max() { + if let Some(tse) = buf.ts_max() { + if tsn > tse { + let n = before.len(); + buf.clear(); + before.drain_into(buf, (n - 1, n)).unwrap(); + } + } } } } @@ -246,6 +296,33 @@ impl Stream for MergeRts { self.out.push_back(x); } if let Some(item) = self.out.pop_front() { + debug!("emit item {} {:?}", items_0::Events::verify(&item), item); + if items_0::Events::verify(&item) != true { + debug!("{}bad item {:?}", "\n\n--------------------------\n", item); + self.state = State::Done; + } + if let Some(item_min) = item.ts_min() { + if item_min < self.ts_seen_max { + debug!( + "{}ordering error A {} {}", + "\n\n--------------------------\n", item_min, self.ts_seen_max + ); + self.state = State::Done; + break Ready(Some(Err(Error::Logic))); + } + } + if let Some(item_max) = item.ts_max() { + if item_max < self.ts_seen_max { + debug!( + "{}ordering error B {} {}", + "\n\n--------------------------\n", item_max, self.ts_seen_max + ); + self.state = State::Done; + break Ready(Some(Err(Error::Logic))); + } else { + self.ts_seen_max = item_max; + } + } break Ready(Some(Ok(item))); } break match &mut self.state { @@ -255,15 +332,20 @@ impl Stream for MergeRts { continue; } State::FetchFirstSt(st2) => match st2.fut.poll_unpin(cx) { - Ready(Some(Ok(x))) => { - debug!("have first from ST"); - self.buf_st.push_back(x); - self.setup_first_mt(); - self.state = State::FetchFirstMt(self.setup_read_mt()); - continue; - } + Ready(Some(Ok(x))) => match x { + firstbefore::Output::First(before, bulk) => { + debug!("have first from ST"); + self.handle_first_st(before, bulk); + continue; + } + firstbefore::Output::Bulk(_) => { + self.state = State::Done; + let e = Error::Logic; + Ready(Some(Err(e))) + } + }, Ready(Some(Err(e))) => { - self.state = State::Error; + self.state = State::Done; Ready(Some(Err(e.into()))) } Ready(None) => { @@ -276,15 +358,20 @@ impl Stream for MergeRts { Pending => Pending, }, State::FetchFirstMt(st2) => match st2.fut.poll_unpin(cx) { - Ready(Some(Ok(x))) => { - debug!("have first from MT"); - self.buf_mt.push_back(x); - self.setup_first_lt(); - self.state = State::FetchFirstLt(self.setup_read_lt()); - continue; - } + Ready(Some(Ok(x))) => match x { + firstbefore::Output::First(before, bulk) => { + debug!("have first from MT"); + self.handle_first_mt(before, bulk); + continue; + } + firstbefore::Output::Bulk(_) => { + self.state = State::Done; + let e = Error::Logic; + Ready(Some(Err(e))) + } + }, Ready(Some(Err(e))) => { - self.state = State::Error; + self.state = State::Done; Ready(Some(Err(e.into()))) } Ready(None) => { @@ -297,15 +384,20 @@ impl Stream for MergeRts { Pending => Pending, }, State::FetchFirstLt(st2) => match st2.fut.poll_unpin(cx) { - Ready(Some(Ok(x))) => { - debug!("have first from LT"); - self.buf_lt.push_back(x); - let buf = core::mem::replace(&mut self.buf_lt, VecDeque::new()); - self.state = State::ReadingLt(None, buf, self.inp_lt.take()); - continue; - } + Ready(Some(Ok(x))) => match x { + firstbefore::Output::First(before, bulk) => { + debug!("have first from LT"); + self.handle_first_lt(before, bulk); + continue; + } + firstbefore::Output::Bulk(_) => { + self.state = State::Done; + let e = Error::Logic; + Ready(Some(Err(e))) + } + }, Ready(Some(Err(e))) => { - self.state = State::Error; + self.state = State::Done; Ready(Some(Err(e.into()))) } Ready(None) => { @@ -325,17 +417,26 @@ impl Stream for MergeRts { match fut2.fut.poll_unpin(cx) { Ready(Some(Ok(x))) => { *fut = None; - buf.push_back(x); - continue; + match x { + firstbefore::Output::Bulk(x) => { + buf.push_back(x); + continue; + } + firstbefore::Output::First(_, _) => { + self.state = State::Done; + let e = Error::Logic; + Ready(Some(Err(e))) + } + } } Ready(Some(Err(e))) => { *fut = None; - self.state = State::Error; + self.state = State::Done; Ready(Some(Err(e.into()))) } Ready(None) => { *fut = None; - self.inp_lt = None; + *inp = None; continue; } Pending => Pending, @@ -343,9 +444,9 @@ impl Stream for MergeRts { } else if inp.is_some() { let buf = core::mem::replace(buf, VecDeque::new()); self.state = State::ReadingLt(Some(Self::setup_read_any(inp)), buf, inp.take()); - // *fut = Some(self.setup_read_lt()); continue; } else { + debug!("transition ReadingLt to ReadingMt"); let buf = core::mem::replace(&mut self.buf_mt, VecDeque::new()); self.state = State::ReadingMt(None, buf, self.inp_mt.take()); continue; @@ -359,17 +460,26 @@ impl Stream for MergeRts { match fut2.fut.poll_unpin(cx) { Ready(Some(Ok(x))) => { *fut = None; - buf.push_back(x); - continue; + match x { + firstbefore::Output::Bulk(x) => { + buf.push_back(x); + continue; + } + firstbefore::Output::First(_, _) => { + self.state = State::Done; + let e = Error::Logic; + Ready(Some(Err(e))) + } + } } Ready(Some(Err(e))) => { *fut = None; - self.state = State::Error; + self.state = State::Done; Ready(Some(Err(e.into()))) } Ready(None) => { *fut = None; - self.inp_mt = None; + *inp = None; continue; } Pending => Pending, @@ -379,6 +489,7 @@ impl Stream for MergeRts { self.state = State::ReadingMt(Some(Self::setup_read_any(inp)), buf, inp.take()); continue; } else { + debug!("transition ReadingMt to ReadingSt"); let buf = core::mem::replace(&mut self.buf_st, VecDeque::new()); self.state = State::ReadingSt(None, buf, self.inp_st.take()); continue; @@ -392,17 +503,26 @@ impl Stream for MergeRts { match fut2.fut.poll_unpin(cx) { Ready(Some(Ok(x))) => { *fut = None; - buf.push_back(x); - continue; + match x { + firstbefore::Output::Bulk(x) => { + buf.push_back(x); + continue; + } + firstbefore::Output::First(_, _) => { + self.state = State::Done; + let e = Error::Logic; + Ready(Some(Err(e))) + } + } } Ready(Some(Err(e))) => { *fut = None; - self.state = State::Error; + self.state = State::Done; Ready(Some(Err(e.into()))) } Ready(None) => { *fut = None; - self.inp_st = None; + *inp = None; continue; } Pending => Pending, @@ -416,8 +536,24 @@ impl Stream for MergeRts { Ready(None) } } - State::Error => Ready(None), + State::Done => Ready(None), }; } } } + +fn trait_assert(_: T) +where + T: Stream + Unpin + Send, +{ +} + +#[allow(unused)] +fn trait_assert_try() { + let x: MergeRts = phantomval(); + trait_assert(x); +} + +fn phantomval() -> T { + panic!() +} diff --git a/crates/scyllaconn/src/scyllaconn.rs b/crates/scyllaconn/src/scyllaconn.rs index b50e9ec..a5c1763 100644 --- a/crates/scyllaconn/src/scyllaconn.rs +++ b/crates/scyllaconn/src/scyllaconn.rs @@ -10,3 +10,12 @@ pub mod worker; pub use scylla; pub use series::SeriesId; + +pub async fn test_log() { + use netpod::log::*; + error!("------"); + warn!("------"); + info!("------"); + debug!("------"); + trace!("------"); +} diff --git a/crates/taskrun/src/taskrun.rs b/crates/taskrun/src/taskrun.rs index 9698d17..aebe991 100644 --- a/crates/taskrun/src/taskrun.rs +++ b/crates/taskrun/src/taskrun.rs @@ -218,10 +218,19 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { // .with_filter(filter_3) .with_filter(filter_2) .with_filter(filter_1) - // .and_then(LogFilterLayer::new("lay1".into())) - // .and_then(LogFilterLayer::new("lay2".into())) - ; + // let fmt_layer = fmt_layer.with_filter(filter_3); + // let fmt_layer: Box> = if std::env::var("RUST_LOG_USE_2").is_ok() { + // let a = fmt_layer.with_filter(filter_2); + // Box::new(a) + // } else { + // let a = fmt_layer; + // Box::new(a) + // }; + // let fmt_layer = fmt_layer.with_filter(filter_1); + // .and_then(LogFilterLayer::new("lay1".into())) + // .and_then(LogFilterLayer::new("lay2".into())) // let layer_2 = LogFilterLayer::new("lay1".into(), fmt_layer); + ; let reg = tracing_subscriber::registry();