diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index 1f9fd8e..f347371 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqbuffer" -version = "0.5.1-aa.1" +version = "0.5.1-aa.2" authors = ["Dominik Werder "] edition = "2021" diff --git a/crates/daqbuffer/src/bin/daqbuffer.rs b/crates/daqbuffer/src/bin/daqbuffer.rs index b3ce06f..2c846b9 100644 --- a/crates/daqbuffer/src/bin/daqbuffer.rs +++ b/crates/daqbuffer/src/bin/daqbuffer.rs @@ -17,6 +17,7 @@ use netpod::Shape; use taskrun::tokio; use tokio::fs::File; use tokio::io::AsyncReadExt; +use tracing::Instrument; pub fn main() { match taskrun::run(go()) { @@ -55,6 +56,7 @@ fn parse_ts(s: &str) -> Result, Error> { } async fn go() -> Result<(), Error> { + let buildmark = "+0008"; let opts = Opts::parse(); let service_version = ServiceVersion { major: std::env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap_or(0), @@ -64,8 +66,8 @@ async fn go() -> Result<(), Error> { }; match opts.subcmd { SubCmd::Retrieval(subcmd) => { - info!("daqbuffer version {} +0005", clap::crate_version!()); - info!(" service_version {}", service_version); + info!("daqbuffer version {} {} retrieval", clap::crate_version!(), buildmark); + info!(" service_version {} {} retrieval", service_version, buildmark); if false { #[allow(non_snake_case)] let TARGET = std::env!("DAQBUF_TARGET"); @@ -95,7 +97,8 @@ async fn go() -> Result<(), Error> { } } SubCmd::Proxy(subcmd) => { - info!("daqbuffer proxy {}", clap::crate_version!()); + info!("daqbuffer version {} {} proxy", clap::crate_version!(), buildmark); + info!(" service_version {} {} proxy", service_version, buildmark); let mut config_file = File::open(&subcmd.config).await?; let mut buf = Vec::new(); config_file.read_to_end(&mut buf).await?; @@ -143,10 +146,25 @@ async fn go() -> Result<(), Error> { SubCmd::Version => { println!("{}", clap::crate_version!()); } + SubCmd::TestLog => { + test_log().await; + } } Ok(()) } +async fn test_log() { + daqbufp2::test_log().await; + let logspan = tracing::span!(tracing::Level::INFO, "log_span_debug", spanlevel = "info"); + daqbufp2::test_log().instrument(logspan).await; + let logspan = tracing::span!(tracing::Level::INFO, "log_span_debug", spanlevel = "trace"); + daqbufp2::test_log().instrument(logspan).await; + let logspan = tracing::span!(tracing::Level::TRACE, "log_span_trace", spanlevel = "info"); + daqbufp2::test_log().instrument(logspan).await; + let logspan = tracing::span!(tracing::Level::TRACE, "log_span_trace", spanlevel = "trace"); + daqbufp2::test_log().instrument(logspan).await; +} + // TODO test data needs to be generated. // TODO use httpclient for the request: need to add binary POST. //#[test] diff --git a/crates/daqbuffer/src/cli.rs b/crates/daqbuffer/src/cli.rs index c272d5f..53168b9 100644 --- a/crates/daqbuffer/src/cli.rs +++ b/crates/daqbuffer/src/cli.rs @@ -18,6 +18,7 @@ pub enum SubCmd { GenerateTestData, Test, Version, + TestLog, } #[derive(Debug, Parser)] diff --git a/crates/daqbufp2/src/daqbufp2.rs b/crates/daqbufp2/src/daqbufp2.rs index 66d9c23..80b6703 100644 --- a/crates/daqbufp2/src/daqbufp2.rs +++ b/crates/daqbufp2/src/daqbufp2.rs @@ -46,3 +46,13 @@ pub async fn run_proxy(proxy_config: ProxyConfig, service_version: ServiceVersio httpret::proxy::proxy(proxy_config, service_version).await?; Ok(()) } + +pub async fn test_log() { + use netpod::log::*; + error!("------"); + warn!("------"); + info!("------"); + debug!("------"); + trace!("------"); + httpret::test_log().await; +} diff --git a/crates/dbconn/src/dbconn.rs b/crates/dbconn/src/dbconn.rs index 35c4d62..90ac476 100644 --- a/crates/dbconn/src/dbconn.rs +++ b/crates/dbconn/src/dbconn.rs @@ -66,7 +66,7 @@ pub async fn delay_io_medium() { } pub async fn create_connection(db_config: &Database) -> Result<(PgClient, JoinHandle>), Error> { - warn!("create_connection\n\n CREATING CONNECTION\n\n"); + warn!("create_connection\n\n CREATING POSTGRES CONNECTION\n\n"); // TODO use a common already running worker pool for these queries: let d = db_config; let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name); diff --git a/crates/dbconn/src/search.rs b/crates/dbconn/src/search.rs index ec64b3e..184e31e 100644 --- a/crates/dbconn/src/search.rs +++ b/crates/dbconn/src/search.rs @@ -13,6 +13,7 @@ use serde_json::Value as JsVal; pub async fn search_channel_databuffer( query: ChannelSearchQuery, + backend: &str, node_config: &NodeConfigCached, ) -> Result { let empty = if !query.name_regex.is_empty() { @@ -33,12 +34,19 @@ pub async fn search_channel_databuffer( " channel_id, channel_name, source_name,", " dtype, shape, unit, description, channel_backend", " from searchext($1, $2, $3, $4)", + " where channel_backend = $5" ); let (pg, _pgjh) = create_connection(&node_config.node_config.cluster.database).await?; let rows = pg .query( sql, - &[&query.name_regex, &query.source_regex, &query.description_regex, &"asc"], + &[ + &query.name_regex, + &query.source_regex, + &query.description_regex, + &"asc", + &backend, + ], ) .await .err_conv()?; @@ -90,14 +98,19 @@ pub async fn search_channel_databuffer( Ok(ret) } -pub async fn search_channel_scylla(query: ChannelSearchQuery, pgconf: &Database) -> Result { +pub async fn search_channel_scylla( + query: ChannelSearchQuery, + backend: &str, + pgconf: &Database, +) -> Result { let empty = if !query.name_regex.is_empty() { false } else { true }; if empty { let ret = ChannelSearchResult { channels: Vec::new() }; return Ok(ret); } let ch_kind: i16 = if query.channel_status { 1 } else { 2 }; - let (cb1, cb2) = if let Some(x) = &query.backend { + let tmp_backend = Some(backend.to_string()); + let (cb1, cb2) = if let Some(x) = &tmp_backend { (false, x.as_str()) } else { (true, "") @@ -266,19 +279,17 @@ async fn search_channel_archeng( Ok(ret) } -pub async fn search_channel( - query: ChannelSearchQuery, - node_config: &NodeConfigCached, -) -> Result { - let pgconf = &node_config.node_config.cluster.database; - if let Some(_scyconf) = node_config.node_config.cluster.scylla_st() { - search_channel_scylla(query, pgconf).await - } else if let Some(conf) = node_config.node.channel_archiver.as_ref() { - search_channel_archeng(query, node_config.node_config.cluster.backend.clone(), conf, pgconf).await - } else if let Some(_conf) = node_config.node.archiver_appliance.as_ref() { +pub async fn search_channel(query: ChannelSearchQuery, ncc: &NodeConfigCached) -> Result { + let backend = &ncc.node_config.cluster.backend; + let pgconf = &ncc.node_config.cluster.database; + if let Some(_scyconf) = ncc.node_config.cluster.scylla_st() { + search_channel_scylla(query, backend, pgconf).await + } else if let Some(conf) = ncc.node.channel_archiver.as_ref() { + search_channel_archeng(query, backend.clone(), conf, pgconf).await + } else if let Some(_conf) = ncc.node.archiver_appliance.as_ref() { // TODO err::todoval() } else { - search_channel_databuffer(query, node_config).await + search_channel_databuffer(query, backend, ncc).await } } diff --git a/crates/httpret/src/api4/eventdata.rs b/crates/httpret/src/api4/eventdata.rs index ba6fde0..8f6c35c 100644 --- a/crates/httpret/src/api4/eventdata.rs +++ b/crates/httpret/src/api4/eventdata.rs @@ -6,6 +6,7 @@ use err::thiserror; use err::PublicError; use err::ThisError; use err::ToPublicError; +use futures_util::Stream; use http::Method; use http::StatusCode; use httpclient::body_empty; @@ -16,7 +17,7 @@ use httpclient::StreamResponse; use netpod::log::*; use netpod::NodeConfigCached; use std::sync::Arc; -use tracing::Instrument; +use streams::instrument::InstrumentStream; #[derive(Debug, ThisError)] pub enum EventDataError { @@ -85,9 +86,8 @@ impl EventDataHandler { .await .map_err(|_| EventDataError::InternalError)?; let (evsubq,) = nodenet::conn::events_parse_input_query(frames).map_err(|_| EventDataError::QueryParse)?; - let logspan = if false { - tracing::Span::none() - } else if evsubq.log_level() == "trace" { + info!("{:?}", evsubq); + let logspan = if evsubq.log_level() == "trace" { trace!("enable trace for handler"); tracing::span!(tracing::Level::INFO, "log_span_trace") } else if evsubq.log_level() == "debug" { @@ -96,10 +96,12 @@ impl EventDataHandler { } else { tracing::Span::none() }; + use tracing::Instrument; let stream = nodenet::conn::create_response_bytes_stream(evsubq, shared_res.scyqueue.as_ref(), ncc) - .instrument(logspan) + .instrument(logspan.clone()) .await .map_err(|e| EventDataError::Error(Box::new(e)))?; + let stream = InstrumentStream::new(stream, logspan); let ret = response(StatusCode::OK) .body(body_stream(stream)) .map_err(|_| EventDataError::InternalError)?; diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index 46c3c13..7756b33 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -29,6 +29,7 @@ use netpod::NodeConfigCached; use netpod::ReqCtx; use nodenet::client::OpenBoxedBytesViaHttp; use query::api4::events::PlainEventsQuery; +use streams::instrument::InstrumentStream; use tracing::Instrument; pub struct EventsHandler {} @@ -57,9 +58,7 @@ impl EventsHandler { let evq = PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?; debug!("{self_name} evq {evq:?}"); - let logspan = if false { - tracing::Span::none() - } else if evq.log_level() == "trace" { + let logspan = if evq.log_level() == "trace" { trace!("enable trace for handler"); tracing::span!(tracing::Level::INFO, "log_span_trace") } else if evq.log_level() == "debug" { @@ -134,6 +133,16 @@ async fn plain_events_cbor_framed( } }) .filter(|x| if let Ok(x) = x { ready(x.len() > 0) } else { ready(true) }); + let logspan = if evq.log_level() == "trace" { + trace!("enable trace for handler"); + tracing::span!(tracing::Level::INFO, "log_span_trace") + } else if evq.log_level() == "debug" { + debug!("enable debug for handler"); + tracing::span!(tracing::Level::INFO, "log_span_debug") + } else { + tracing::Span::none() + }; + let stream = InstrumentStream::new(stream, logspan); let ret = response(StatusCode::OK).body(body_stream(stream))?; Ok(ret) } diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs index 5e0a973..ee95bef 100644 --- a/crates/httpret/src/channelconfig.rs +++ b/crates/httpret/src/channelconfig.rs @@ -639,7 +639,7 @@ impl ScyllaSeriesTsMsp { let mut st_ts_msp_ms = Vec::new(); let mut msp_stream = - scyllaconn::events2::msp::MspStream::new(RetentionTime::Short, sid, (&q.range).into(), scyqueue.clone()); + scyllaconn::events2::msp::MspStreamRt::new(RetentionTime::Short, sid, (&q.range).into(), scyqueue.clone()); use chrono::TimeZone; while let Some(x) = msp_stream.next().await { let v = x.unwrap().ms(); @@ -650,7 +650,7 @@ impl ScyllaSeriesTsMsp { let mut mt_ts_msp_ms = Vec::new(); let mut msp_stream = - scyllaconn::events2::msp::MspStream::new(RetentionTime::Medium, sid, (&q.range).into(), scyqueue.clone()); + scyllaconn::events2::msp::MspStreamRt::new(RetentionTime::Medium, sid, (&q.range).into(), scyqueue.clone()); while let Some(x) = msp_stream.next().await { let v = x.unwrap().ms(); let st = chrono::Utc.timestamp_millis_opt(v as _).earliest().unwrap(); @@ -660,7 +660,7 @@ impl ScyllaSeriesTsMsp { let mut lt_ts_msp_ms = Vec::new(); let mut msp_stream = - scyllaconn::events2::msp::MspStream::new(RetentionTime::Long, sid, (&q.range).into(), scyqueue.clone()); + scyllaconn::events2::msp::MspStreamRt::new(RetentionTime::Long, sid, (&q.range).into(), scyqueue.clone()); while let Some(x) = msp_stream.next().await { let v = x.unwrap().ms(); let st = chrono::Utc.timestamp_millis_opt(v as _).earliest().unwrap(); diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 78bcb4a..cb69535 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -522,3 +522,12 @@ async fn clear_cache_all( .body(body_string(serde_json::to_string(&res)?))?; Ok(ret) } + +pub async fn test_log() { + error!("------"); + warn!("------"); + info!("------"); + debug!("------"); + trace!("------"); + scyllaconn::test_log().await; +} diff --git a/crates/httpret/src/proxy/api4.rs b/crates/httpret/src/proxy/api4.rs index 8d5244c..297b700 100644 --- a/crates/httpret/src/proxy/api4.rs +++ b/crates/httpret/src/proxy/api4.rs @@ -74,11 +74,12 @@ pub async fn channel_search(req: Requ, ctx: &ReqCtx, proxy_config: &ProxyConfig) let res: ChannelSearchResult = match serde_json::from_slice(&body) { Ok(k) => k, Err(_) => { - let msg = format!("can not parse result: {}", String::from_utf8_lossy(&body)); + let msg = format!("can not parse result tag {} {}", tag, String::from_utf8_lossy(&body)); error!("{}", msg); return Err(Error::with_msg_no_trace(msg)); } }; + info!("from {} len {}", tag, res.channels.len()); let ret = SubRes { tag, status: StatusCode::OK, diff --git a/crates/items_0/src/items_0.rs b/crates/items_0/src/items_0.rs index a10788a..c495a1d 100644 --- a/crates/items_0/src/items_0.rs +++ b/crates/items_0/src/items_0.rs @@ -110,6 +110,8 @@ impl fmt::Display for MergeError { } } +impl std::error::Error for MergeError {} + // TODO can I remove the Any bound? /// Container of some form of events, for use as trait object. @@ -137,7 +139,7 @@ pub trait Events: // TODO is this used? fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box; fn new_empty_evs(&self) -> Box; - fn drain_into_evs(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError>; + fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError>; fn find_lowest_index_gt_evs(&self, ts: u64) -> Option; fn find_lowest_index_ge_evs(&self, ts: u64) -> Option; fn find_highest_index_lt_evs(&self, ts: u64) -> Option; @@ -151,6 +153,7 @@ pub trait Events: fn to_min_max_avg(&mut self) -> Box; fn to_json_vec_u8(&self) -> Vec; fn to_cbor_vec_u8(&self) -> Vec; + fn clear(&mut self); } impl WithLen for Box { @@ -222,7 +225,7 @@ impl Events for Box { Events::new_empty_evs(self.as_ref()) } - fn drain_into_evs(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError> { + fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> { Events::drain_into_evs(self.as_mut(), dst, range) } @@ -277,4 +280,8 @@ impl Events for Box { fn to_cbor_vec_u8(&self) -> Vec { Events::to_cbor_vec_u8(self.as_ref()) } + + fn clear(&mut self) { + Events::clear(self.as_mut()) + } } diff --git a/crates/items_2/src/channelevents.rs b/crates/items_2/src/channelevents.rs index f918917..fe16a6e 100644 --- a/crates/items_2/src/channelevents.rs +++ b/crates/items_2/src/channelevents.rs @@ -154,6 +154,15 @@ pub enum ChannelEvents { Status(Option), } +impl ChannelEvents { + pub fn is_events(&self) -> bool { + match self { + ChannelEvents::Events(_) => true, + ChannelEvents::Status(_) => false, + } + } +} + impl TypeName for ChannelEvents { fn type_name(&self) -> String { any::type_name::().into() @@ -702,6 +711,17 @@ impl Mergeable for ChannelEvents { } } + fn clear(&mut self) { + match self { + ChannelEvents::Events(x) => { + Mergeable::clear(x); + } + ChannelEvents::Status(x) => { + *x = None; + } + } + } + fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> { match self { ChannelEvents::Events(k) => match dst { @@ -829,7 +849,10 @@ impl Events for ChannelEvents { } fn verify(&self) -> bool { - todo!() + match self { + ChannelEvents::Events(x) => Events::verify(x), + ChannelEvents::Status(_) => panic!(), + } } fn output_info(&self) -> String { @@ -861,11 +884,26 @@ impl Events for ChannelEvents { } fn new_empty_evs(&self) -> Box { - todo!() + match self { + ChannelEvents::Events(x) => Events::new_empty_evs(x), + ChannelEvents::Status(_) => panic!(), + } } - fn drain_into_evs(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError> { - todo!() + fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> { + let dst2 = if let Some(x) = dst.as_any_mut().downcast_mut::() { + // debug!("unwrapped dst ChannelEvents as well"); + x + } else { + panic!("dst is not ChannelEvents"); + }; + match self { + ChannelEvents::Events(k) => match dst2 { + ChannelEvents::Events(j) => Events::drain_into_evs(k, j, range), + ChannelEvents::Status(_) => panic!("dst is not events"), + }, + ChannelEvents::Status(_) => panic!("self is not events"), + } } fn find_lowest_index_gt_evs(&self, ts: u64) -> Option { @@ -896,11 +934,14 @@ impl Events for ChannelEvents { todo!() } - fn tss(&self) -> &std::collections::VecDeque { - todo!() + fn tss(&self) -> &VecDeque { + match self { + ChannelEvents::Events(x) => Events::tss(x), + ChannelEvents::Status(_) => panic!(), + } } - fn pulses(&self) -> &std::collections::VecDeque { + fn pulses(&self) -> &VecDeque { todo!() } @@ -934,6 +975,15 @@ impl Events for ChannelEvents { } } } + + fn clear(&mut self) { + match self { + ChannelEvents::Events(x) => Events::clear(x.as_mut()), + ChannelEvents::Status(x) => { + *x = None; + } + } + } } impl Collectable for ChannelEvents { diff --git a/crates/items_2/src/eventfull.rs b/crates/items_2/src/eventfull.rs index 5fce3de..fbcb173 100644 --- a/crates/items_2/src/eventfull.rs +++ b/crates/items_2/src/eventfull.rs @@ -201,6 +201,17 @@ impl Mergeable for EventFull { Empty::empty() } + fn clear(&mut self) { + self.tss.clear(); + self.pulses.clear(); + self.blobs.clear(); + self.scalar_types.clear(); + self.be.clear(); + self.shapes.clear(); + self.comps.clear(); + self.entry_payload_max = 0; + } + fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> { // TODO make it harder to forget new members when the struct may get modified in the future let r = range.0..range.1; diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index dba7ea3..0b95187 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -74,6 +74,23 @@ macro_rules! trace2 { ($($arg:tt)*) => { trace!($($arg)*); }; } +#[derive(Clone, PartialEq, Serialize, Deserialize)] +pub struct EventsDim0NoPulse { + pub tss: VecDeque, + pub values: VecDeque, +} + +impl From> for EventsDim0 { + fn from(value: EventsDim0NoPulse) -> Self { + let pulses = vec![0; value.tss.len()].into(); + Self { + tss: value.tss, + pulses, + values: value.values, + } + } +} + #[derive(Clone, PartialEq, Serialize, Deserialize)] pub struct EventsDim0 { pub tss: VecDeque, @@ -859,9 +876,9 @@ impl Events for EventsDim0 { Box::new(Self::empty()) } - fn drain_into_evs(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError> { + fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> { // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. - if let Some(dst) = dst.as_mut().as_any_mut().downcast_mut::() { + if let Some(dst) = dst.as_any_mut().downcast_mut::() { // TODO make it harder to forget new members when the struct may get modified in the future let r = range.0..range.1; dst.tss.extend(self.tss.drain(r.clone())); @@ -869,7 +886,12 @@ impl Events for EventsDim0 { dst.values.extend(self.values.drain(r.clone())); Ok(()) } else { - error!("downcast to EventsDim0 FAILED"); + error!( + "downcast to EventsDim0 FAILED\n\n{}\n\n{}\n\n", + self.type_name(), + dst.type_name() + ); + panic!(); Err(MergeError::NotCompatible) } } @@ -989,6 +1011,12 @@ impl Events for EventsDim0 { ciborium::into_writer(&ret, &mut buf).unwrap(); buf } + + fn clear(&mut self) { + self.tss.clear(); + self.pulses.clear(); + self.values.clear(); + } } #[derive(Debug)] diff --git a/crates/items_2/src/eventsdim1.rs b/crates/items_2/src/eventsdim1.rs index 33250b2..9e959e7 100644 --- a/crates/items_2/src/eventsdim1.rs +++ b/crates/items_2/src/eventsdim1.rs @@ -47,6 +47,23 @@ macro_rules! trace2 { ($($arg:tt)*) => (trace!($($arg)*)); } +#[derive(Clone, PartialEq, Serialize, Deserialize)] +pub struct EventsDim1NoPulse { + pub tss: VecDeque, + pub values: VecDeque>, +} + +impl From> for EventsDim1 { + fn from(value: EventsDim1NoPulse) -> Self { + let pulses = vec![0; value.tss.len()].into(); + Self { + tss: value.tss, + pulses, + values: value.values, + } + } +} + #[derive(Clone, PartialEq, Serialize, Deserialize)] pub struct EventsDim1 { pub tss: VecDeque, @@ -813,9 +830,9 @@ impl Events for EventsDim1 { Box::new(Self::empty()) } - fn drain_into_evs(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError> { + fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> { // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. - if let Some(dst) = dst.as_mut().as_any_mut().downcast_mut::() { + if let Some(dst) = dst.as_any_mut().downcast_mut::() { // TODO make it harder to forget new members when the struct may get modified in the future let r = range.0..range.1; dst.tss.extend(self.tss.drain(r.clone())); @@ -949,6 +966,12 @@ impl Events for EventsDim1 { ciborium::into_writer(&ret, &mut buf).unwrap(); buf } + + fn clear(&mut self) { + self.tss.clear(); + self.pulses.clear(); + self.values.clear(); + } } #[derive(Debug)] diff --git a/crates/items_2/src/eventsxbindim0.rs b/crates/items_2/src/eventsxbindim0.rs index 62102d9..b8e004d 100644 --- a/crates/items_2/src/eventsxbindim0.rs +++ b/crates/items_2/src/eventsxbindim0.rs @@ -261,9 +261,9 @@ impl Events for EventsXbinDim0 { Box::new(Self::empty()) } - fn drain_into_evs(&mut self, dst: &mut Box, range: (usize, usize)) -> Result<(), MergeError> { + fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> { // TODO as_any and as_any_mut are declared on unrelated traits. Simplify. - if let Some(dst) = dst.as_mut().as_any_mut().downcast_mut::() { + if let Some(dst) = dst.as_any_mut().downcast_mut::() { // TODO make it harder to forget new members when the struct may get modified in the future let r = range.0..range.1; dst.tss.extend(self.tss.drain(r.clone())); @@ -365,6 +365,14 @@ impl Events for EventsXbinDim0 { fn to_cbor_vec_u8(&self) -> Vec { todo!() } + + fn clear(&mut self) { + self.tss.clear(); + self.pulses.clear(); + self.mins.clear(); + self.maxs.clear(); + self.avgs.clear(); + } } #[derive(Debug)] diff --git a/crates/items_2/src/items_2.rs b/crates/items_2/src/items_2.rs index 3c31de9..f379953 100644 --- a/crates/items_2/src/items_2.rs +++ b/crates/items_2/src/items_2.rs @@ -170,6 +170,10 @@ impl Mergeable for Box { self.as_ref().new_empty_evs() } + fn clear(&mut self) { + Events::clear(self.as_mut()) + } + fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> { self.as_mut().drain_into_evs(dst, range) } diff --git a/crates/items_2/src/merger.rs b/crates/items_2/src/merger.rs index e144d12..c9ae57b 100644 --- a/crates/items_2/src/merger.rs +++ b/crates/items_2/src/merger.rs @@ -47,6 +47,7 @@ pub trait Mergeable: fmt::Debug + WithLen + ByteEstimate + Unpin { fn ts_min(&self) -> Option; fn ts_max(&self) -> Option; fn new_empty(&self) -> Self; + fn clear(&mut self); // TODO when MergeError::Full gets returned, any guarantees about what has been modified or kept unchanged? fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError>; fn find_lowest_index_gt(&self, ts: u64) -> Option; diff --git a/crates/netpod/Cargo.toml b/crates/netpod/Cargo.toml index 3a5b286..41117e1 100644 --- a/crates/netpod/Cargo.toml +++ b/crates/netpod/Cargo.toml @@ -11,6 +11,7 @@ path = "src/netpod.rs" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" http = "1.0.0" +humantime = "2.1.0" humantime-serde = "1.1.1" async-channel = "1.8.0" bytes = "1.4.0" diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 9abcfa9..8d01317 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -226,7 +226,7 @@ impl<'de> serde::de::Visitor<'de> for ScalarTypeVis { "bool" => ScalarType::BOOL, "string" => ScalarType::STRING, "enum" => ScalarType::Enum, - "channelstatus" => ScalarType::ChannelStatus, + "ChannelStatus" => ScalarType::ChannelStatus, k => return Err(E::custom(format!("can not understand variant {k:?}"))), }; Ok(ret) diff --git a/crates/netpod/src/query.rs b/crates/netpod/src/query.rs index 09126f5..74695eb 100644 --- a/crates/netpod/src/query.rs +++ b/crates/netpod/src/query.rs @@ -85,6 +85,20 @@ pub struct TimeRangeQuery { range: NanoRange, } +fn parse_time(v: &str) -> Result, Error> { + if let Ok(x) = v.parse() { + Ok(x) + } else { + if v.ends_with("ago") { + let d = humantime::parse_duration(&v[..v.len() - 3]) + .map_err(|_| Error::with_public_msg_no_trace(format!("can not parse {v}")))?; + Ok(Utc::now() - d) + } else { + Err(Error::with_public_msg_no_trace(format!("can not parse {v}"))) + } + } +} + impl FromUrl for TimeRangeQuery { fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); @@ -95,8 +109,8 @@ impl FromUrl for TimeRangeQuery { if let (Some(beg), Some(end)) = (pairs.get("begDate"), pairs.get("endDate")) { let ret = Self { range: NanoRange { - beg: beg.parse::>()?.to_nanos(), - end: end.parse::>()?.to_nanos(), + beg: parse_time(beg)?.to_nanos(), + end: parse_time(end)?.to_nanos(), }, }; Ok(ret) diff --git a/crates/netpod/src/range/evrange.rs b/crates/netpod/src/range/evrange.rs index a6e6b8c..cfab2c3 100644 --- a/crates/netpod/src/range/evrange.rs +++ b/crates/netpod/src/range/evrange.rs @@ -77,6 +77,15 @@ impl NanoRange { } } +impl From<(u64, u64)> for NanoRange { + fn from(value: (u64, u64)) -> Self { + Self { + beg: value.0, + end: value.1, + } + } +} + impl TryFrom<&SeriesRange> for NanoRange { type Error = Error; diff --git a/crates/netpod/src/ttl.rs b/crates/netpod/src/ttl.rs index f07bbbd..f715f72 100644 --- a/crates/netpod/src/ttl.rs +++ b/crates/netpod/src/ttl.rs @@ -1,6 +1,12 @@ +use core::fmt; +use err::thiserror; +use err::ThisError; +use serde::Deserialize; +use serde::Serialize; +use std::str::FromStr; use std::time::Duration; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum RetentionTime { Short, Medium, @@ -59,3 +65,43 @@ impl RetentionTime { self.ttl_events_d0() } } + +impl fmt::Display for RetentionTime { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let s = match self { + RetentionTime::Short => "short", + RetentionTime::Medium => "medium", + RetentionTime::Long => "long", + }; + fmt.write_str(s) + } +} + +#[derive(Debug, ThisError)] +pub enum Error { + Parse, +} + +impl FromStr for RetentionTime { + type Err = Error; + + fn from_str(s: &str) -> Result { + let ret = match s { + "short" => Self::Short, + "medium" => Self::Medium, + "long" => Self::Long, + _ => return Err(Error::Parse), + }; + Ok(ret) + } +} + +// impl ToString for RetentionTime { +// fn to_string(&self) -> String { +// match self { +// RetentionTime::Short => "short".into(), +// RetentionTime::Medium => "medium".into(), +// RetentionTime::Long => "long".into(), +// } +// } +// } diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index eb5c4d4..dda1cc3 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -118,7 +118,7 @@ pub async fn create_response_bytes_stream( scyqueue: Option<&ScyllaQueue>, ncc: &NodeConfigCached, ) -> Result { - debug!( + info!( "create_response_bytes_stream {:?} {:?}", evq.ch_conf().scalar_type(), evq.ch_conf().shape(), @@ -275,6 +275,7 @@ pub fn events_parse_input_query(frames: Vec) -> Result<(EventsSub }, Err(e) => return Err(e), }; + info!("parsing json {:?}", qitem.str()); let frame1: Frame1Parts = serde_json::from_str(&qitem.str()).map_err(|e| { let e = Error::with_msg_no_trace(format!("json parse error: {} inp {}", e, qitem.str())); error!("{e}"); diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index 1044c86..5e0c14a 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -1,15 +1,16 @@ use err::Error; use futures_util::Stream; use futures_util::StreamExt; +use futures_util::TryStreamExt; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_2::channelevents::ChannelEvents; use netpod::log::*; -use netpod::ttl::RetentionTime; use netpod::ChConf; use query::api4::events::EventsSubQuery; use scyllaconn::worker::ScyllaQueue; +use scyllaconn::SeriesId; use std::pin::Pin; use taskrun::tokio; @@ -22,23 +23,34 @@ pub async fn scylla_channel_event_stream( // TODO why both in PlainEventsQuery and as separate parameter? Check other usages. // let do_one_before_range = evq.need_one_before_range(); let do_one_before_range = false; - let series = chconf.series(); + let series = SeriesId::new(chconf.series()); let scalar_type = chconf.scalar_type(); let shape = chconf.shape(); let do_test_stream_error = false; let with_values = evq.need_value_data(); - debug!("\n\nmake EventsStreamScylla {series:?} {scalar_type:?} {shape:?}\n"); - let stream = scyllaconn::events::EventsStreamScylla::new( - RetentionTime::Short, - series, - evq.range().into(), - do_one_before_range, - scalar_type.clone(), - shape.clone(), - with_values, - scyqueue.clone(), - do_test_stream_error, - ); + let stream: Pin + Send>> = if let Some(rt) = evq.use_rt() { + let x = scyllaconn::events2::events::EventsStreamRt::new( + rt, + series, + scalar_type.clone(), + shape.clone(), + evq.range().into(), + with_values, + scyqueue.clone(), + ) + .map_err(|e| scyllaconn::events2::mergert::Error::from(e)); + Box::pin(x) + } else { + let x = scyllaconn::events2::mergert::MergeRts::new( + series, + scalar_type.clone(), + shape.clone(), + evq.range().into(), + with_values, + scyqueue.clone(), + ); + Box::pin(x) + }; let stream = stream .map(move |item| match &item { Ok(k) => match k { @@ -72,7 +84,7 @@ pub async fn scylla_channel_event_stream( item } }, - Err(e) => Err(Error::with_msg_no_trace(format!("scyllaconn eevents error {e}"))), + Err(e) => Err(Error::with_msg_no_trace(format!("scyllaconn events error {e}"))), }; item }); diff --git a/crates/query/Cargo.toml b/crates/query/Cargo.toml index 3dca474..f89a577 100644 --- a/crates/query/Cargo.toml +++ b/crates/query/Cargo.toml @@ -10,6 +10,7 @@ serde_json = "1.0" tracing = "0.1" chrono = { version = "0.4.19", features = ["serde"] } url = "2.2" +humantime = "2.1.0" humantime-serde = "1.1.1" err = { path = "../err" } netpod = { path = "../netpod" } diff --git a/crates/query/src/api4.rs b/crates/query/src/api4.rs index 6f5c6b9..4d00698 100644 --- a/crates/query/src/api4.rs +++ b/crates/query/src/api4.rs @@ -6,6 +6,7 @@ use chrono::TimeZone; use chrono::Utc; use err::Error; use netpod::get_url_query_pairs; +use netpod::log::*; use netpod::range::evrange::SeriesRange; use netpod::AppendToUrl; use netpod::FromUrl; @@ -124,7 +125,13 @@ impl FromUrl for AccountingToplistQuery { let v = pairs .get("tsDate") .ok_or(Error::with_public_msg_no_trace("missing tsDate"))?; - let w = v.parse::>()?; + let mut w = v.parse::>(); + if w.is_err() && v.ends_with("ago") { + let d = humantime::parse_duration(&v[..v.len() - 3]) + .map_err(|_| Error::with_public_msg_no_trace(format!("can not parse {v}")))?; + w = Ok(Utc::now() - d); + } + let w = w?; Ok::<_, Error>(TsNano::from_ns(w.to_nanos())) }; let ret = Self { diff --git a/crates/query/src/api4/events.rs b/crates/query/src/api4/events.rs index c9e5eab..eda148c 100644 --- a/crates/query/src/api4/events.rs +++ b/crates/query/src/api4/events.rs @@ -7,6 +7,7 @@ use netpod::query::api1::Api1Query; use netpod::query::PulseRangeQuery; use netpod::query::TimeRangeQuery; use netpod::range::evrange::SeriesRange; +use netpod::ttl::RetentionTime; use netpod::AppendToUrl; use netpod::ByteSize; use netpod::ChannelTypeConfigGen; @@ -56,6 +57,8 @@ pub struct PlainEventsQuery { create_errors: Vec, #[serde(default)] log_level: String, + #[serde(default)] + use_rt: Option, } impl PlainEventsQuery { @@ -81,6 +84,7 @@ impl PlainEventsQuery { merger_out_len_max: None, create_errors: Vec::new(), log_level: String::new(), + use_rt: None, } } @@ -206,6 +210,10 @@ impl PlainEventsQuery { pub fn log_level(&self) -> &str { &self.log_level } + + pub fn use_rt(&self) -> Option { + self.use_rt.clone() + } } impl HasBackend for PlainEventsQuery { @@ -283,6 +291,11 @@ impl FromUrl for PlainEventsQuery { .map(|x| x.split(",").map(|x| x.to_string()).collect()) .unwrap_or(Vec::new()), log_level: pairs.get("log_level").map_or(String::new(), String::from), + use_rt: pairs.get("useRt").map_or(Ok(None), |k| { + k.parse() + .map(Some) + .map_err(|_| Error::with_public_msg_no_trace(format!("can not parse useRt: {}", k))) + })?, }; Ok(ret) } @@ -342,6 +355,9 @@ impl AppendToUrl for PlainEventsQuery { if self.log_level.len() != 0 { g.append_pair("log_level", &self.log_level); } + if let Some(x) = self.use_rt.as_ref() { + g.append_pair("useRt", &x.to_string()); + } } } @@ -385,6 +401,7 @@ pub struct EventsSubQuerySettings { buf_len_disk_io: Option, queue_len_disk_io: Option, create_errors: Vec, + use_rt: Option, } impl Default for EventsSubQuerySettings { @@ -398,6 +415,7 @@ impl Default for EventsSubQuerySettings { buf_len_disk_io: None, queue_len_disk_io: None, create_errors: Vec::new(), + use_rt: None, } } } @@ -414,6 +432,7 @@ impl From<&PlainEventsQuery> for EventsSubQuerySettings { // TODO add to query queue_len_disk_io: None, create_errors: value.create_errors.clone(), + use_rt: value.use_rt(), } } } @@ -431,6 +450,7 @@ impl From<&BinnedQuery> for EventsSubQuerySettings { // TODO add to query queue_len_disk_io: None, create_errors: Vec::new(), + use_rt: None, } } } @@ -448,6 +468,7 @@ impl From<&Api1Query> for EventsSubQuerySettings { buf_len_disk_io: Some(disk_io_tune.read_buffer_len), queue_len_disk_io: Some(disk_io_tune.read_queue_len), create_errors: Vec::new(), + use_rt: None, } } } @@ -551,6 +572,10 @@ impl EventsSubQuery { pub fn log_level(&self) -> &str { &self.log_level } + + pub fn use_rt(&self) -> Option { + self.settings.use_rt.clone() + } } #[derive(Debug, Serialize, Deserialize)] diff --git a/crates/scyllaconn/src/conn.rs b/crates/scyllaconn/src/conn.rs index e57011f..8e1b31c 100644 --- a/crates/scyllaconn/src/conn.rs +++ b/crates/scyllaconn/src/conn.rs @@ -1,5 +1,6 @@ use crate::errconv::ErrConv; use err::Error; +use netpod::log::*; use netpod::ScyllaConfig; use scylla::execution_profile::ExecutionProfileBuilder; use scylla::statement::Consistency; @@ -16,6 +17,7 @@ pub async fn create_scy_session(scyconf: &ScyllaConfig) -> Result Result { + warn!("create_connection\n\n CREATING SCYLLA CONNECTION\n\n"); let scy = scylla::SessionBuilder::new() .known_nodes(&scyconf.hosts) .default_execution_profile_handle( diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs index 89dc3fb..b6bb760 100644 --- a/crates/scyllaconn/src/events.rs +++ b/crates/scyllaconn/src/events.rs @@ -1,4 +1,4 @@ -use crate::errconv::ErrConv; +use crate::events2::prepare::StmtsEvents; use crate::range::ScyllaSeriesRange; use crate::worker::ScyllaQueue; use err::thiserror; @@ -23,9 +23,8 @@ use netpod::Shape; use netpod::TsMs; use netpod::TsNano; use scylla::frame::response::result::Row; -use scylla::prepared_statement::PreparedStatement; use scylla::Session; -use scylla::Session as ScySession; +use series::SeriesId; use std::collections::VecDeque; use std::mem; use std::pin::Pin; @@ -35,6 +34,7 @@ use std::task::Poll; #[derive(Debug, ThisError)] pub enum Error { + Prepare(#[from] crate::events2::prepare::Error), ScyllaQuery(#[from] scylla::transport::errors::QueryError), ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError), ScyllaTypeConv(#[from] scylla::cql_to_rust::FromRowError), @@ -51,268 +51,7 @@ impl From for Error { } } -#[derive(Debug)] -pub struct StmtsLspShape { - u8: PreparedStatement, - u16: PreparedStatement, - u32: PreparedStatement, - u64: PreparedStatement, - i8: PreparedStatement, - i16: PreparedStatement, - i32: PreparedStatement, - i64: PreparedStatement, - f32: PreparedStatement, - f64: PreparedStatement, - bool: PreparedStatement, - string: PreparedStatement, -} - -impl StmtsLspShape { - fn st(&self, stname: &str) -> Result<&PreparedStatement, Error> { - let ret = match stname { - "u8" => &self.u8, - _ => return Err(Error::MissingQuery(format!("no query for stname {stname}"))), - }; - Ok(ret) - } -} - -#[derive(Debug)] -pub struct StmtsLspDir { - scalar: StmtsLspShape, - array: StmtsLspShape, -} - -impl StmtsLspDir { - fn shape(&self, array: bool) -> &StmtsLspShape { - if array { - &self.array - } else { - &self.scalar - } - } -} - -#[derive(Debug)] -pub struct StmtsEventsRt { - ts_msp_fwd: PreparedStatement, - ts_msp_bck: PreparedStatement, - lsp_fwd_val: StmtsLspDir, - lsp_bck_val: StmtsLspDir, - lsp_fwd_ts: StmtsLspDir, - lsp_bck_ts: StmtsLspDir, -} - -impl StmtsEventsRt { - fn lsp(&self, bck: bool, val: bool) -> &StmtsLspDir { - if bck { - if val { - &self.lsp_bck_val - } else { - &self.lsp_bck_ts - } - } else { - if val { - &self.lsp_fwd_val - } else { - &self.lsp_fwd_ts - } - } - } -} - -#[derive(Debug)] -pub struct StmtsEvents { - st: StmtsEventsRt, - mt: StmtsEventsRt, - lt: StmtsEventsRt, -} - -async fn make_msp_dir(ks: &str, rt: &RetentionTime, bck: bool, scy: &Session) -> Result { - let table_name = "ts_msp"; - let select_cond = if bck { - "ts_msp < ? order by ts_msp desc limit 2" - } else { - "ts_msp >= ? and ts_msp < ?" - }; - let cql = format!( - "select ts_msp from {}.{}{} where series = ? and {}", - ks, - rt.table_prefix(), - table_name, - select_cond - ); - let qu = scy.prepare(cql).await?; - Ok(qu) -} - -async fn make_lsp( - ks: &str, - rt: &RetentionTime, - shapepre: &str, - stname: &str, - values: &str, - bck: bool, - scy: &Session, -) -> Result { - let select_cond = if bck { - "ts_lsp < ? order by ts_lsp desc limit 1" - } else { - "ts_lsp >= ? and ts_lsp < ?" - }; - let cql = format!( - concat!( - "select {} from {}.{}events_{}_{}", - " where series = ? and ts_msp = ? and {}" - ), - values, - ks, - rt.table_prefix(), - shapepre, - stname, - select_cond - ); - let qu = scy.prepare(cql).await?; - Ok(qu) -} - -async fn make_lsp_shape( - ks: &str, - rt: &RetentionTime, - shapepre: &str, - values: &str, - bck: bool, - scy: &Session, -) -> Result { - let values = if shapepre.contains("array") { - values.replace("value", "valueblob") - } else { - values.into() - }; - let values = &values; - let maker = |stname| make_lsp(ks, rt, shapepre, stname, values, bck, scy); - let ret = StmtsLspShape { - u8: maker("u8").await?, - u16: maker("u16").await?, - u32: maker("u32").await?, - u64: maker("u64").await?, - i8: maker("i8").await?, - i16: maker("i16").await?, - i32: maker("i32").await?, - i64: maker("i64").await?, - f32: maker("f32").await?, - f64: maker("f64").await?, - bool: maker("bool").await?, - string: maker("string").await?, - }; - Ok(ret) -} - -async fn make_lsp_dir( - ks: &str, - rt: &RetentionTime, - values: &str, - bck: bool, - scy: &Session, -) -> Result { - let ret = StmtsLspDir { - scalar: make_lsp_shape(ks, rt, "scalar", values, bck, scy).await?, - array: make_lsp_shape(ks, rt, "array", values, bck, scy).await?, - }; - Ok(ret) -} - -async fn make_rt(ks: &str, rt: &RetentionTime, scy: &Session) -> Result { - let ret = StmtsEventsRt { - ts_msp_fwd: make_msp_dir(ks, rt, false, scy).await?, - ts_msp_bck: make_msp_dir(ks, rt, true, scy).await?, - lsp_fwd_val: make_lsp_dir(ks, rt, "ts_lsp, pulse, value", false, scy).await?, - lsp_bck_val: make_lsp_dir(ks, rt, "ts_lsp, pulse, value", true, scy).await?, - lsp_fwd_ts: make_lsp_dir(ks, rt, "ts_lsp, pulse", false, scy).await?, - lsp_bck_ts: make_lsp_dir(ks, rt, "ts_lsp, pulse", true, scy).await?, - }; - Ok(ret) -} - -impl StmtsEvents { - pub(super) async fn new(ks: [&str; 3], scy: &Session) -> Result { - let ret = StmtsEvents { - st: make_rt(ks[0], &RetentionTime::Short, scy).await?, - mt: make_rt(ks[1], &RetentionTime::Medium, scy).await?, - lt: make_rt(ks[2], &RetentionTime::Long, scy).await?, - }; - Ok(ret) - } - - fn rt(&self, rt: &RetentionTime) -> &StmtsEventsRt { - match rt { - RetentionTime::Short => &self.st, - RetentionTime::Medium => &self.mt, - RetentionTime::Long => &&self.lt, - } - } -} - -pub(super) async fn find_ts_msp( - rt: &RetentionTime, - series: u64, - range: ScyllaSeriesRange, - bck: bool, - stmts: &StmtsEvents, - scy: &ScySession, -) -> Result, Error> { - trace!("find_ts_msp series {:?} {:?} {:?} bck {}", rt, series, range, bck); - if bck { - find_ts_msp_bck(rt, series, range, stmts, scy).await - } else { - find_ts_msp_fwd(rt, series, range, stmts, scy).await - } -} - -async fn find_ts_msp_fwd( - rt: &RetentionTime, - series: u64, - range: ScyllaSeriesRange, - stmts: &StmtsEvents, - scy: &ScySession, -) -> Result, Error> { - let mut ret = VecDeque::new(); - // TODO time range truncation can be handled better - let params = (series as i64, range.beg().ms() as i64, 1 + range.end().ms() as i64); - let mut res = scy - .execute_iter(stmts.rt(rt).ts_msp_fwd.clone(), params) - .await? - .into_typed::<(i64,)>(); - while let Some(x) = res.next().await { - let row = x?; - let ts = TsMs::from_ms_u64(row.0 as u64); - ret.push_back(ts); - } - Ok(ret) -} - -async fn find_ts_msp_bck( - rt: &RetentionTime, - series: u64, - range: ScyllaSeriesRange, - stmts: &StmtsEvents, - scy: &ScySession, -) -> Result, Error> { - let mut ret = VecDeque::new(); - let params = (series as i64, range.beg().ms() as i64); - let mut res = scy - .execute_iter(stmts.rt(rt).ts_msp_bck.clone(), params) - .await? - .into_typed::<(i64,)>(); - while let Some(x) = res.next().await { - let row = x?; - let ts = TsMs::from_ms_u64(row.0 as u64); - ret.push_front(ts); - } - Ok(ret) -} - -trait ValTy: Sized + 'static { +pub(super) trait ValTy: Sized + 'static { type ScaTy: ScalarOps + std::default::Default; type ScyTy: scylla::cql_to_rust::FromCqlVal; type Container: Events + Appendable; @@ -449,7 +188,7 @@ impl_scaty_array!(Vec, f32, Vec, "f32", "f32"); impl_scaty_array!(Vec, f64, Vec, "f64", "f64"); impl_scaty_array!(Vec, bool, Vec, "bool", "bool"); -struct ReadNextValuesOpts { +pub(super) struct ReadNextValuesOpts { rt: RetentionTime, series: u64, ts_msp: TsMs, @@ -459,13 +198,35 @@ struct ReadNextValuesOpts { scyqueue: ScyllaQueue, } -async fn read_next_values(opts: ReadNextValuesOpts) -> Result, Error> +impl ReadNextValuesOpts { + pub(super) fn new( + rt: RetentionTime, + series: SeriesId, + ts_msp: TsMs, + range: ScyllaSeriesRange, + fwd: bool, + with_values: bool, + scyqueue: ScyllaQueue, + ) -> Self { + Self { + rt, + series: series.id(), + ts_msp, + range, + fwd, + with_values, + scyqueue, + } + } +} + +pub(super) async fn read_next_values(opts: ReadNextValuesOpts) -> Result, Error> where ST: ValTy, { // TODO could take scyqeue out of opts struct. let scyqueue = opts.scyqueue.clone(); - let futgen = Box::new(|scy: Arc, stmts: Arc| { + let futgen = Box::new(|scy: Arc, stmts: Arc| { let fut = async { read_next_values_2::(opts, scy, stmts) .await @@ -479,7 +240,7 @@ where async fn read_next_values_2( opts: ReadNextValuesOpts, - scy: Arc, + scy: Arc, stmts: Arc, ) -> Result, Error> where @@ -511,20 +272,6 @@ where ts_lsp_max, table_name, ); - let dir = "fwd"; - let qu_name = if opts.with_values { - if ST::is_valueblob() { - format!("array_{}_valueblobs_{}", ST::st_name(), dir) - } else { - format!("scalar_{}_values_{}", ST::st_name(), dir) - } - } else { - if ST::is_valueblob() { - format!("array_{}_timestamps_{}", ST::st_name(), dir) - } else { - format!("scalar_{}_timestamps_{}", ST::st_name(), dir) - } - }; let qu = stmts .rt(&opts.rt) .lsp(!opts.fwd, opts.with_values) @@ -552,20 +299,6 @@ where DtNano::from_ns(0) }; trace!("BCK ts_msp {} ts_lsp_max {} {}", ts_msp, ts_lsp_max, table_name,); - let dir = "bck"; - let qu_name = if opts.with_values { - if ST::is_valueblob() { - format!("array_{}_valueblobs_{}", ST::st_name(), dir) - } else { - format!("scalar_{}_values_{}", ST::st_name(), dir) - } - } else { - if ST::is_valueblob() { - format!("array_{}_timestamps_{}", ST::st_name(), dir) - } else { - format!("scalar_{}_timestamps_{}", ST::st_name(), dir) - } - }; let qu = stmts .rt(&opts.rt) .lsp(!opts.fwd, opts.with_values) @@ -648,7 +381,7 @@ fn convert_rows( Ok(ret) } -struct ReadValues { +pub(super) struct ReadValues { rt: RetentionTime, series: u64, scalar_type: ScalarType, @@ -663,7 +396,7 @@ struct ReadValues { } impl ReadValues { - fn new( + pub(super) fn new( rt: RetentionTime, series: u64, scalar_type: ScalarType, @@ -795,7 +528,7 @@ pub struct EventsStreamScylla { } impl EventsStreamScylla { - pub fn new( + pub fn _new( rt: RetentionTime, series: u64, range: ScyllaSeriesRange, @@ -956,16 +689,6 @@ impl EventsStreamScylla { } } -async fn find_ts_msp_via_queue( - rt: RetentionTime, - series: u64, - range: ScyllaSeriesRange, - bck: bool, - scyqueue: ScyllaQueue, -) -> Result, crate::worker::Error> { - scyqueue.find_ts_msp(rt, series, range, bck).await -} - impl Stream for EventsStreamScylla { type Item = Result; @@ -998,8 +721,9 @@ impl Stream for EventsStreamScylla { let series = self.series.clone(); let range = self.range.clone(); // TODO this no longer works, we miss the backwards part here - let fut = find_ts_msp_via_queue(self.rt.clone(), series, range, false, self.scyqueue.clone()); - let fut = Box::pin(fut); + // let fut = find_ts_msp_via_queue(self.rt.clone(), series, range, false, self.scyqueue.clone()); + // let fut = Box::pin(fut); + let fut = todo!(); self.state = FrState::FindMsp(fut); continue; } diff --git a/crates/scyllaconn/src/events2.rs b/crates/scyllaconn/src/events2.rs index a0554af..ea58344 100644 --- a/crates/scyllaconn/src/events2.rs +++ b/crates/scyllaconn/src/events2.rs @@ -1 +1,6 @@ +pub mod events; +pub mod firstbefore; +pub mod mergert; pub mod msp; +pub mod nonempty; +pub mod prepare; diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs new file mode 100644 index 0000000..6132502 --- /dev/null +++ b/crates/scyllaconn/src/events2/events.rs @@ -0,0 +1,360 @@ +use super::msp::MspStreamRt; +use crate::events::read_next_values; +use crate::events::ReadNextValuesOpts; +use crate::range::ScyllaSeriesRange; +use crate::worker::ScyllaQueue; +use err::thiserror; +use err::ThisError; +use futures_util::Future; +use futures_util::FutureExt; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::Events; +use items_2::channelevents::ChannelEvents; +use netpod::log::*; +use netpod::ttl::RetentionTime; +use netpod::ScalarType; +use netpod::Shape; +use netpod::TsMs; +use series::SeriesId; +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +#[allow(unused)] +macro_rules! trace_emit { + ($($arg:tt)*) => { + if true { + trace!($($arg)*); + } + }; +} + +#[allow(unused)] +macro_rules! warn_item { + ($($arg:tt)*) => { + if true { + debug!($($arg)*); + } + }; +} + +#[derive(Debug, ThisError)] +pub enum Error { + Worker(#[from] crate::worker::Error), + Events(#[from] crate::events::Error), + Msp(#[from] crate::events2::msp::Error), + Unordered, + OutOfRange, + BadBatch, + Logic, + Merge(#[from] items_0::MergeError), + TruncateLogic, +} + +struct FetchMsp { + fut: Pin>> + Send>>, +} + +struct FetchEvents { + fut: Pin, crate::events2::events::Error>> + Send>>, +} + +enum ReadingState { + FetchMsp(FetchMsp), + FetchEvents(FetchEvents), +} + +struct Reading { + scyqueue: ScyllaQueue, + reading_state: ReadingState, +} + +enum State { + Begin, + Reading(Reading), + InputDone, + Done, +} + +pub struct EventsStreamRt { + rt: RetentionTime, + series: SeriesId, + scalar_type: ScalarType, + shape: Shape, + range: ScyllaSeriesRange, + with_values: bool, + state: State, + scyqueue: ScyllaQueue, + msp_inp: MspStreamRt, + out: VecDeque>, + ts_seen_max: u64, +} + +impl EventsStreamRt { + pub fn new( + rt: RetentionTime, + series: SeriesId, + scalar_type: ScalarType, + shape: Shape, + range: ScyllaSeriesRange, + with_values: bool, + scyqueue: ScyllaQueue, + ) -> Self { + debug!("EventsStreamRt::new {series:?} {range:?} {rt:?}"); + let msp_inp = + crate::events2::msp::MspStreamRt::new(rt.clone(), series.clone(), range.clone(), scyqueue.clone()); + Self { + rt, + series, + scalar_type, + shape, + range, + with_values, + state: State::Begin, + scyqueue, + msp_inp, + out: VecDeque::new(), + ts_seen_max: 0, + } + } + + fn __handle_reading(self: Pin<&mut Self>, st: &mut Reading, cx: &mut Context) -> Result<(), Error> { + let _ = st; + let _ = cx; + todo!() + } + + fn make_read_events_fut( + &mut self, + ts_msp: TsMs, + scyqueue: ScyllaQueue, + ) -> Pin, Error>> + Send>> { + let fwd = true; + let opts = ReadNextValuesOpts::new( + self.rt.clone(), + self.series.clone(), + ts_msp, + self.range.clone(), + fwd, + self.with_values, + scyqueue, + ); + let scalar_type = self.scalar_type.clone(); + let shape = self.shape.clone(); + let fut = async move { + let ret = match &shape { + Shape::Scalar => match &scalar_type { + ScalarType::U8 => read_next_values::(opts).await, + ScalarType::U16 => read_next_values::(opts).await, + ScalarType::U32 => read_next_values::(opts).await, + ScalarType::U64 => read_next_values::(opts).await, + ScalarType::I8 => read_next_values::(opts).await, + ScalarType::I16 => read_next_values::(opts).await, + ScalarType::I32 => read_next_values::(opts).await, + ScalarType::I64 => read_next_values::(opts).await, + ScalarType::F32 => read_next_values::(opts).await, + ScalarType::F64 => read_next_values::(opts).await, + ScalarType::BOOL => read_next_values::(opts).await, + ScalarType::STRING => read_next_values::(opts).await, + ScalarType::Enum => read_next_values::(opts).await, + ScalarType::ChannelStatus => { + warn!("read scalar channel status not yet supported"); + err::todoval() + } + }, + Shape::Wave(_) => match &scalar_type { + ScalarType::U8 => read_next_values::>(opts).await, + ScalarType::U16 => read_next_values::>(opts).await, + ScalarType::U32 => read_next_values::>(opts).await, + ScalarType::U64 => read_next_values::>(opts).await, + ScalarType::I8 => read_next_values::>(opts).await, + ScalarType::I16 => read_next_values::>(opts).await, + ScalarType::I32 => read_next_values::>(opts).await, + ScalarType::I64 => read_next_values::>(opts).await, + ScalarType::F32 => read_next_values::>(opts).await, + ScalarType::F64 => read_next_values::>(opts).await, + ScalarType::BOOL => read_next_values::>(opts).await, + ScalarType::STRING => { + warn!("read array string not yet supported"); + err::todoval() + } + ScalarType::Enum => read_next_values::>(opts).await, + ScalarType::ChannelStatus => { + warn!("read array channel status not yet supported"); + err::todoval() + } + }, + _ => { + error!("TODO ReadValues add more types"); + err::todoval() + } + }; + ret.map_err(Error::from) + }; + Box::pin(fut) + } +} + +impl Stream for EventsStreamRt { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + if let Some(mut item) = self.out.pop_front() { + if !item.verify() { + warn_item!("{}bad item {:?}", "\n\n--------------------------\n", item); + self.state = State::Done; + break Ready(Some(Err(Error::BadBatch))); + } + if let Some(item_min) = item.ts_min() { + if item_min < self.range.beg().ns() { + warn_item!( + "{}out of range error A {} {:?}", + "\n\n--------------------------\n", + item_min, + self.range + ); + self.state = State::Done; + break Ready(Some(Err(Error::OutOfRange))); + } + if item_min < self.ts_seen_max { + warn_item!( + "{}ordering error A {} {}", + "\n\n--------------------------\n", + item_min, + self.ts_seen_max + ); + let mut r = items_2::merger::Mergeable::new_empty(&item); + match items_2::merger::Mergeable::find_highest_index_lt(&item, self.ts_seen_max) { + Some(ix) => { + match items_2::merger::Mergeable::drain_into(&mut item, &mut r, (0, ix)) { + Ok(()) => {} + Err(e) => { + self.state = State::Done; + break Ready(Some(Err(e.into()))); + } + } + // self.state = State::Done; + // break Ready(Some(Err(Error::Unordered))); + } + None => { + self.state = State::Done; + break Ready(Some(Err(Error::TruncateLogic))); + } + } + } + } + if let Some(item_max) = item.ts_max() { + if item_max >= self.range.end().ns() { + warn_item!( + "{}out of range error B {} {:?}", + "\n\n--------------------------\n", + item_max, + self.range + ); + self.state = State::Done; + break Ready(Some(Err(Error::OutOfRange))); + } + if item_max < self.ts_seen_max { + warn_item!( + "{}ordering error B {} {}", + "\n\n--------------------------\n", + item_max, + self.ts_seen_max + ); + self.state = State::Done; + break Ready(Some(Err(Error::Unordered))); + } else { + self.ts_seen_max = item_max; + } + } + trace_emit!("deliver item {}", item.output_info()); + break Ready(Some(Ok(ChannelEvents::Events(item)))); + } + break match &mut self.state { + State::Begin => { + let msp_inp = unsafe { + let ptr = (&mut self.msp_inp) as *mut MspStreamRt; + &mut *ptr + }; + let fut = Box::pin(msp_inp.next()); + self.state = State::Reading(Reading { + scyqueue: self.scyqueue.clone(), + reading_state: ReadingState::FetchMsp(FetchMsp { fut }), + }); + continue; + } + State::Reading(st) => match &mut st.reading_state { + ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) { + Ready(Some(Ok(ts))) => { + let scyqueue = st.scyqueue.clone(); + let fut = self.make_read_events_fut(ts, scyqueue); + if let State::Reading(st) = &mut self.state { + st.reading_state = ReadingState::FetchEvents(FetchEvents { fut }); + continue; + } else { + self.state = State::Done; + Ready(Some(Err(Error::Logic))) + } + } + Ready(Some(Err(e))) => Ready(Some(Err(e.into()))), + Ready(None) => { + self.state = State::InputDone; + continue; + } + Pending => Pending, + }, + ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) { + Ready(Ok(x)) => { + self.out.push_back(x); + let msp_inp = unsafe { + let ptr = (&mut self.msp_inp) as *mut MspStreamRt; + &mut *ptr + }; + let fut = Box::pin(msp_inp.next()); + if let State::Reading(st) = &mut self.state { + st.reading_state = ReadingState::FetchMsp(FetchMsp { fut }); + continue; + } else { + self.state = State::Done; + Ready(Some(Err(Error::Logic))) + } + } + Ready(Err(e)) => { + self.state = State::Done; + Ready(Some(Err(e.into()))) + } + Pending => Pending, + }, + }, + State::InputDone => { + if self.out.len() == 0 { + Ready(None) + } else { + continue; + } + } + State::Done => Ready(None), + }; + } + } +} + +fn trait_assert(_: T) +where + T: Stream + Unpin + Send, +{ +} + +#[allow(unused)] +fn trait_assert_try() { + let x: EventsStreamRt = phantomval(); + trait_assert(x); +} + +fn phantomval() -> T { + panic!() +} diff --git a/crates/scyllaconn/src/events2/firstbefore.rs b/crates/scyllaconn/src/events2/firstbefore.rs new file mode 100644 index 0000000..da102eb --- /dev/null +++ b/crates/scyllaconn/src/events2/firstbefore.rs @@ -0,0 +1,179 @@ +use err::thiserror; +use err::ThisError; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::Events; +use items_2::merger::Mergeable; +use netpod::log::*; +use netpod::TsNano; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +#[derive(Debug, ThisError)] +pub enum Error { + Unordered, + Logic, + Input(Box), +} + +pub enum Output { + First(T, T), + Bulk(T), +} + +enum State { + Begin, + Bulk, + Done, +} + +pub struct FirstBeforeAndInside +where + S: Stream + Unpin, + T: Events + Mergeable + Unpin, +{ + ts0: TsNano, + inp: S, + state: State, + buf: Option, +} + +impl FirstBeforeAndInside +where + S: Stream + Unpin, + T: Events + Mergeable + Unpin, +{ + pub fn new(inp: S, ts0: TsNano) -> Self { + Self { + ts0, + inp, + state: State::Begin, + buf: None, + } + } +} + +impl Stream for FirstBeforeAndInside +where + S: Stream> + Unpin, + T: Events + Mergeable + Unpin, + E: std::error::Error + Send + 'static, +{ + type Item = Result, Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break match &self.state { + State::Begin => match self.inp.poll_next_unpin(cx) { + Ready(Some(Ok(mut item))) => { + // It is an invariant that we process ordered streams, but for robustness + // verify the batch item again: + if item.verify() != true { + self.state = State::Done; + let e = Error::Unordered; + Ready(Some(Err(e))) + } else { + // Separate events into before and bulk + let tss = item.tss(); + let pp = tss.partition_point(|&x| x < self.ts0.ns()); + if pp >= tss.len() { + // all entries are before + if self.buf.is_none() { + self.buf = Some(item.new_empty()); + } + match item.drain_into_evs(self.buf.as_mut().unwrap(), (0, item.len())) { + Ok(()) => { + continue; + } + Err(e) => { + self.state = State::Done; + Ready(Some(Err(Error::Input(Box::new(e))))) + } + } + } else if pp == 0 { + // all entries are bulk + debug!("transition immediately to bulk"); + self.state = State::Bulk; + let o1 = core::mem::replace(&mut self.buf, Some(item.new_empty())) + .unwrap_or_else(|| item.new_empty()); + Ready(Some(Ok(Output::First(o1, item)))) + } else { + // mixed + match item.drain_into_evs(self.buf.as_mut().unwrap(), (0, pp)) { + Ok(()) => { + debug!("transition with mixed to bulk"); + self.state = State::Bulk; + let o1 = core::mem::replace(&mut self.buf, Some(item.new_empty())) + .unwrap_or_else(|| item.new_empty()); + Ready(Some(Ok(Output::First(o1, item)))) + } + Err(e) => { + self.state = State::Done; + Ready(Some(Err(Error::Input(Box::new(e))))) + } + } + } + } + } + Ready(Some(Err(e))) => { + self.state = State::Done; + Ready(Some(Err(Error::Input(Box::new(e))))) + } + Ready(None) => { + self.state = State::Done; + Ready(None) + } + Pending => Pending, + }, + State::Bulk => { + if self.buf.as_ref().map_or(0, |x| x.len()) != 0 { + self.state = State::Done; + Ready(Some(Err(Error::Logic))) + } else { + match self.inp.poll_next_unpin(cx) { + Ready(Some(Ok(item))) => { + if item.verify() != true { + self.state = State::Done; + let e = Error::Unordered; + Ready(Some(Err(e))) + } else { + debug!("output bulk item len {}", item.len()); + Ready(Some(Ok(Output::Bulk(item)))) + } + } + Ready(Some(Err(e))) => { + self.state = State::Done; + Ready(Some(Err(Error::Input(Box::new(e))))) + } + Ready(None) => { + debug!("in bulk, input done"); + self.state = State::Done; + Ready(None) + } + Pending => Pending, + } + } + } + State::Done => Ready(None), + }; + } + } +} + +fn trait_assert(_: T) +where + T: Stream + Unpin + Send, +{ +} + +#[allow(unused)] +fn trait_assert_try() { + let x: FirstBeforeAndInside = phantomval(); + trait_assert(x); +} + +fn phantomval() -> T { + panic!() +} diff --git a/crates/scyllaconn/src/events2/mergert.rs b/crates/scyllaconn/src/events2/mergert.rs new file mode 100644 index 0000000..158415a --- /dev/null +++ b/crates/scyllaconn/src/events2/mergert.rs @@ -0,0 +1,559 @@ +use super::events::EventsStreamRt; +use super::firstbefore::FirstBeforeAndInside; +use crate::events2::firstbefore; +use crate::range::ScyllaSeriesRange; +use crate::worker::ScyllaQueue; +use err::thiserror; +use err::ThisError; +use futures_util::Future; +use futures_util::FutureExt; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::WithLen; +use items_2::channelevents::ChannelEvents; +use items_2::merger::Mergeable; +use netpod::log::*; +use netpod::range::evrange::NanoRange; +use netpod::range::evrange::SeriesRange; +use netpod::ttl::RetentionTime; +use netpod::ScalarType; +use netpod::Shape; +use series::SeriesId; +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +#[derive(Debug, ThisError)] +pub enum Error { + Input(#[from] crate::events2::firstbefore::Error), + Events(#[from] crate::events2::events::Error), + Logic, +} + +enum Resolvable +where + F: Future, +{ + Future(F), + Output(::Output), + Taken, +} + +impl Resolvable +where + F: Future, +{ + fn unresolved(&self) -> bool { + match self { + Resolvable::Future(_) => true, + Resolvable::Output(_) => false, + Resolvable::Taken => false, + } + } + + fn take(&mut self) -> Option<::Output> { + let x = std::mem::replace(self, Resolvable::Taken); + match x { + Resolvable::Future(_) => None, + Resolvable::Output(x) => Some(x), + Resolvable::Taken => None, + } + } +} + +impl Future for Resolvable +where + F: Future + Unpin, +{ + type Output = ::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<::Output> { + match unsafe { self.get_unchecked_mut() } { + Resolvable::Future(fut) => fut.poll_unpin(cx), + Resolvable::Output(_) => panic!(), + Resolvable::Taken => panic!(), + } + } +} + +type TI = FirstBeforeAndInside; +type INPI = Result, crate::events2::firstbefore::Error>; + +struct ReadEvents { + fut: Pin> + Send>>, +} + +enum State { + Begin, + FetchFirstSt(ReadEvents), + FetchFirstMt(ReadEvents), + FetchFirstLt(ReadEvents), + ReadingLt(Option, VecDeque, Option>), + ReadingMt(Option, VecDeque, Option>), + ReadingSt(Option, VecDeque, Option>), + Done, +} + +pub struct MergeRts { + series: SeriesId, + scalar_type: ScalarType, + shape: Shape, + range: ScyllaSeriesRange, + range_mt: ScyllaSeriesRange, + range_lt: ScyllaSeriesRange, + with_values: bool, + scyqueue: ScyllaQueue, + inp_st: Option>, + inp_mt: Option>, + inp_lt: Option>, + state: State, + buf_st: VecDeque, + buf_mt: VecDeque, + buf_lt: VecDeque, + out: VecDeque, + buf_before: Option, + ts_seen_max: u64, +} + +impl MergeRts { + pub fn new( + series: SeriesId, + scalar_type: ScalarType, + shape: Shape, + range: ScyllaSeriesRange, + with_values: bool, + scyqueue: ScyllaQueue, + ) -> Self { + Self { + series, + scalar_type, + shape, + range_mt: range.clone(), + range_lt: range.clone(), + range, + with_values, + scyqueue, + inp_st: None, + inp_mt: None, + inp_lt: None, + state: State::Begin, + buf_st: VecDeque::new(), + buf_mt: VecDeque::new(), + buf_lt: VecDeque::new(), + out: VecDeque::new(), + buf_before: None, + ts_seen_max: 0, + } + } + + fn setup_first_st(&mut self) { + let rt = RetentionTime::Short; + let limbuf = &VecDeque::new(); + let inpdst = &mut self.inp_st; + let range = Self::constrained_range(&self.range, limbuf); + debug!("setup_first_st constrained beg {}", range.beg().ns()); + let tsbeg = range.beg(); + let inp = EventsStreamRt::new( + rt, + self.series.clone(), + self.scalar_type.clone(), + self.shape.clone(), + range, + self.with_values, + self.scyqueue.clone(), + ); + let inp = TI::new(inp, tsbeg); + *inpdst = Some(Box::new(inp)); + } + + fn setup_first_mt(&mut self) { + let rt = RetentionTime::Medium; + let limbuf = &self.buf_st; + let inpdst = &mut self.inp_mt; + let range = Self::constrained_range(&self.range_mt, limbuf); + self.range_lt = range.clone(); + debug!("setup_first_mt constrained beg {}", range.beg().ns()); + let tsbeg = range.beg(); + let inp = EventsStreamRt::new( + rt, + self.series.clone(), + self.scalar_type.clone(), + self.shape.clone(), + range, + self.with_values, + self.scyqueue.clone(), + ); + let inp = TI::new(inp, tsbeg); + *inpdst = Some(Box::new(inp)); + } + + fn setup_first_lt(&mut self) { + let rt = RetentionTime::Long; + let limbuf = &self.buf_mt; + let inpdst = &mut self.inp_lt; + let range = Self::constrained_range(&self.range_lt, limbuf); + debug!("setup_first_lt constrained beg {}", range.beg().ns()); + let tsbeg = range.beg(); + let inp = EventsStreamRt::new( + rt, + self.series.clone(), + self.scalar_type.clone(), + self.shape.clone(), + range, + self.with_values, + self.scyqueue.clone(), + ); + let inp = TI::new(inp, tsbeg); + *inpdst = Some(Box::new(inp)); + } + + fn setup_read_st(&mut self) -> ReadEvents { + let stream = unsafe { &mut *(self.inp_st.as_mut().unwrap().as_mut() as *mut TI) }; + let fut = Box::pin(stream.next()); + ReadEvents { fut } + } + + fn setup_read_mt(&mut self) -> ReadEvents { + let stream = unsafe { &mut *(self.inp_mt.as_mut().unwrap().as_mut() as *mut TI) }; + let fut = Box::pin(stream.next()); + ReadEvents { fut } + } + + fn setup_read_lt(&mut self) -> ReadEvents { + let stream = unsafe { &mut *(self.inp_lt.as_mut().unwrap().as_mut() as *mut TI) }; + let fut = Box::pin(stream.next()); + ReadEvents { fut } + } + + fn setup_read_any(inp: &mut Option>) -> ReadEvents { + let stream = unsafe { &mut *(inp.as_mut().unwrap().as_mut() as *mut TI) }; + let fut = Box::pin(stream.next()); + ReadEvents { fut } + } + + fn constrained_range(full: &ScyllaSeriesRange, buf: &VecDeque) -> ScyllaSeriesRange { + debug!("constrained_range {:?} {:?}", full, buf.front()); + if let Some(e) = buf.front() { + if let Some(ts) = e.ts_min() { + let nrange = NanoRange::from((full.beg().ns(), ts)); + ScyllaSeriesRange::from(&SeriesRange::from(nrange)) + } else { + debug!("no ts even though should not have empty buffers"); + full.clone() + } + } else { + full.clone() + } + } + + fn handle_first_st(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) { + Self::move_latest_to_before_buf(&mut before, &mut self.buf_before); + self.buf_st.push_back(bulk); + self.setup_first_mt(); + self.state = State::FetchFirstMt(self.setup_read_mt()); + } + + fn handle_first_mt(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) { + Self::move_latest_to_before_buf(&mut before, &mut self.buf_before); + self.buf_mt.push_back(bulk); + self.setup_first_lt(); + self.state = State::FetchFirstLt(self.setup_read_lt()); + } + + fn handle_first_lt(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) { + Self::move_latest_to_before_buf(&mut before, &mut self.buf_before); + self.buf_lt.push_back(bulk); + let buf = core::mem::replace(&mut self.buf_lt, VecDeque::new()); + self.state = State::ReadingLt(None, buf, self.inp_lt.take()); + } + + fn move_latest_to_before_buf(before: &mut ChannelEvents, buf: &mut Option) { + if buf.is_none() { + *buf = Some(before.new_empty()); + } + let buf = buf.as_mut().unwrap(); + if let Some(tsn) = before.ts_max() { + if let Some(tse) = buf.ts_max() { + if tsn > tse { + let n = before.len(); + buf.clear(); + before.drain_into(buf, (n - 1, n)).unwrap(); + } + } + } + } +} + +impl Stream for MergeRts { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + let mut out2 = VecDeque::new(); + loop { + while let Some(x) = out2.pop_front() { + self.out.push_back(x); + } + if let Some(item) = self.out.pop_front() { + debug!("emit item {} {:?}", items_0::Events::verify(&item), item); + if items_0::Events::verify(&item) != true { + debug!("{}bad item {:?}", "\n\n--------------------------\n", item); + self.state = State::Done; + } + if let Some(item_min) = item.ts_min() { + if item_min < self.ts_seen_max { + debug!( + "{}ordering error A {} {}", + "\n\n--------------------------\n", item_min, self.ts_seen_max + ); + self.state = State::Done; + break Ready(Some(Err(Error::Logic))); + } + } + if let Some(item_max) = item.ts_max() { + if item_max < self.ts_seen_max { + debug!( + "{}ordering error B {} {}", + "\n\n--------------------------\n", item_max, self.ts_seen_max + ); + self.state = State::Done; + break Ready(Some(Err(Error::Logic))); + } else { + self.ts_seen_max = item_max; + } + } + break Ready(Some(Ok(item))); + } + break match &mut self.state { + State::Begin => { + self.setup_first_st(); + self.state = State::FetchFirstSt(self.setup_read_st()); + continue; + } + State::FetchFirstSt(st2) => match st2.fut.poll_unpin(cx) { + Ready(Some(Ok(x))) => match x { + firstbefore::Output::First(before, bulk) => { + debug!("have first from ST"); + self.handle_first_st(before, bulk); + continue; + } + firstbefore::Output::Bulk(_) => { + self.state = State::Done; + let e = Error::Logic; + Ready(Some(Err(e))) + } + }, + Ready(Some(Err(e))) => { + self.state = State::Done; + Ready(Some(Err(e.into()))) + } + Ready(None) => { + debug!("no first from ST"); + self.inp_st = None; + self.setup_first_mt(); + self.state = State::FetchFirstMt(self.setup_read_mt()); + continue; + } + Pending => Pending, + }, + State::FetchFirstMt(st2) => match st2.fut.poll_unpin(cx) { + Ready(Some(Ok(x))) => match x { + firstbefore::Output::First(before, bulk) => { + debug!("have first from MT"); + self.handle_first_mt(before, bulk); + continue; + } + firstbefore::Output::Bulk(_) => { + self.state = State::Done; + let e = Error::Logic; + Ready(Some(Err(e))) + } + }, + Ready(Some(Err(e))) => { + self.state = State::Done; + Ready(Some(Err(e.into()))) + } + Ready(None) => { + debug!("no first from MT"); + self.inp_mt = None; + self.setup_first_lt(); + self.state = State::FetchFirstLt(self.setup_read_lt()); + continue; + } + Pending => Pending, + }, + State::FetchFirstLt(st2) => match st2.fut.poll_unpin(cx) { + Ready(Some(Ok(x))) => match x { + firstbefore::Output::First(before, bulk) => { + debug!("have first from LT"); + self.handle_first_lt(before, bulk); + continue; + } + firstbefore::Output::Bulk(_) => { + self.state = State::Done; + let e = Error::Logic; + Ready(Some(Err(e))) + } + }, + Ready(Some(Err(e))) => { + self.state = State::Done; + Ready(Some(Err(e.into()))) + } + Ready(None) => { + debug!("no first from LT"); + self.inp_lt = None; + let buf = core::mem::replace(&mut self.buf_lt, VecDeque::new()); + self.state = State::ReadingLt(None, buf, self.inp_lt.take()); + continue; + } + Pending => Pending, + }, + State::ReadingLt(fut, buf, inp) => { + if let Some(x) = buf.pop_front() { + out2.push_back(x); + continue; + } else if let Some(fut2) = fut { + match fut2.fut.poll_unpin(cx) { + Ready(Some(Ok(x))) => { + *fut = None; + match x { + firstbefore::Output::Bulk(x) => { + buf.push_back(x); + continue; + } + firstbefore::Output::First(_, _) => { + self.state = State::Done; + let e = Error::Logic; + Ready(Some(Err(e))) + } + } + } + Ready(Some(Err(e))) => { + *fut = None; + self.state = State::Done; + Ready(Some(Err(e.into()))) + } + Ready(None) => { + *fut = None; + *inp = None; + continue; + } + Pending => Pending, + } + } else if inp.is_some() { + let buf = core::mem::replace(buf, VecDeque::new()); + self.state = State::ReadingLt(Some(Self::setup_read_any(inp)), buf, inp.take()); + continue; + } else { + debug!("transition ReadingLt to ReadingMt"); + let buf = core::mem::replace(&mut self.buf_mt, VecDeque::new()); + self.state = State::ReadingMt(None, buf, self.inp_mt.take()); + continue; + } + } + State::ReadingMt(fut, buf, inp) => { + if let Some(x) = buf.pop_front() { + out2.push_back(x); + continue; + } else if let Some(fut2) = fut { + match fut2.fut.poll_unpin(cx) { + Ready(Some(Ok(x))) => { + *fut = None; + match x { + firstbefore::Output::Bulk(x) => { + buf.push_back(x); + continue; + } + firstbefore::Output::First(_, _) => { + self.state = State::Done; + let e = Error::Logic; + Ready(Some(Err(e))) + } + } + } + Ready(Some(Err(e))) => { + *fut = None; + self.state = State::Done; + Ready(Some(Err(e.into()))) + } + Ready(None) => { + *fut = None; + *inp = None; + continue; + } + Pending => Pending, + } + } else if inp.is_some() { + let buf = core::mem::replace(buf, VecDeque::new()); + self.state = State::ReadingMt(Some(Self::setup_read_any(inp)), buf, inp.take()); + continue; + } else { + debug!("transition ReadingMt to ReadingSt"); + let buf = core::mem::replace(&mut self.buf_st, VecDeque::new()); + self.state = State::ReadingSt(None, buf, self.inp_st.take()); + continue; + } + } + State::ReadingSt(fut, buf, inp) => { + if let Some(x) = buf.pop_front() { + out2.push_back(x); + continue; + } else if let Some(fut2) = fut { + match fut2.fut.poll_unpin(cx) { + Ready(Some(Ok(x))) => { + *fut = None; + match x { + firstbefore::Output::Bulk(x) => { + buf.push_back(x); + continue; + } + firstbefore::Output::First(_, _) => { + self.state = State::Done; + let e = Error::Logic; + Ready(Some(Err(e))) + } + } + } + Ready(Some(Err(e))) => { + *fut = None; + self.state = State::Done; + Ready(Some(Err(e.into()))) + } + Ready(None) => { + *fut = None; + *inp = None; + continue; + } + Pending => Pending, + } + } else if inp.is_some() { + let buf = core::mem::replace(buf, VecDeque::new()); + self.state = State::ReadingSt(Some(Self::setup_read_any(inp)), buf, inp.take()); + continue; + } else { + debug!("fully done"); + Ready(None) + } + } + State::Done => Ready(None), + }; + } + } +} + +fn trait_assert(_: T) +where + T: Stream + Unpin + Send, +{ +} + +#[allow(unused)] +fn trait_assert_try() { + let x: MergeRts = phantomval(); + trait_assert(x); +} + +fn phantomval() -> T { + panic!() +} diff --git a/crates/scyllaconn/src/events2/msp.rs b/crates/scyllaconn/src/events2/msp.rs index a9213b3..89b02e2 100644 --- a/crates/scyllaconn/src/events2/msp.rs +++ b/crates/scyllaconn/src/events2/msp.rs @@ -1,3 +1,4 @@ +use super::prepare::StmtsEvents; use crate::range::ScyllaSeriesRange; use crate::worker::ScyllaQueue; use err::thiserror; @@ -5,8 +6,11 @@ use err::ThisError; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; +use futures_util::StreamExt; +use netpod::log::*; use netpod::ttl::RetentionTime; use netpod::TsMs; +use scylla::Session; use series::SeriesId; use std::collections::VecDeque; use std::pin::Pin; @@ -15,8 +19,17 @@ use std::task::Poll; #[derive(Debug, ThisError)] pub enum Error { - Worker(#[from] crate::worker::Error), Logic, + #[error("Worker({0})")] + Worker(Box), + ScyllaQuery(#[from] scylla::transport::errors::QueryError), + ScyllaRow(#[from] scylla::transport::iterator::NextRowError), +} + +impl From for Error { + fn from(value: crate::worker::Error) -> Self { + Self::Worker(Box::new(value)) + } } enum Resolvable @@ -54,7 +67,7 @@ enum State { } #[pin_project::pin_project] -pub struct MspStream { +pub struct MspStreamRt { rt: RetentionTime, series: SeriesId, range: ScyllaSeriesRange, @@ -63,7 +76,7 @@ pub struct MspStream { out: VecDeque, } -impl MspStream { +impl MspStreamRt { pub fn new(rt: RetentionTime, series: SeriesId, range: ScyllaSeriesRange, scyqueue: ScyllaQueue) -> Self { let fut_bck = { let scyqueue = scyqueue.clone(); @@ -93,7 +106,7 @@ impl MspStream { } } -impl Stream for MspStream { +impl Stream for MspStreamRt { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { @@ -112,8 +125,7 @@ impl Stream for MspStream { have_pending = true; } }, - Resolvable::Output(_) => {} - Resolvable::Taken => {} + _ => {} } let rsv = &mut st.fut_fwd; match rsv { @@ -125,37 +137,28 @@ impl Stream for MspStream { have_pending = true; } }, - Resolvable::Output(_) => {} - Resolvable::Taken => {} + _ => {} } if have_pending { Pending } else { let taken_bck = st.fut_bck.take(); let taken_fwd = st.fut_fwd.take(); - if let Some(x) = taken_bck { - match x { - Ok(v) => { - for e in v { - self.out.push_back(e) - } - - if let Some(x) = taken_fwd { - match x { - Ok(v) => { - for e in v { - self.out.push_back(e) - } - - self.state = State::InputDone; - continue; - } - Err(e) => Ready(Some(Err(e.into()))), + self.state = State::InputDone; + if let (Some(taken_bck), Some(taken_fwd)) = (taken_bck, taken_fwd) { + match taken_bck { + Ok(v1) => match taken_fwd { + Ok(v2) => { + for e in v1 { + self.out.push_back(e) } - } else { - Ready(Some(Err(Error::Logic))) + for e in v2 { + self.out.push_back(e) + } + continue; } - } + Err(e) => Ready(Some(Err(e.into()))), + }, Err(e) => Ready(Some(Err(e.into()))), } } else { @@ -183,10 +186,69 @@ where #[allow(unused)] fn trait_assert_try() { - let x: MspStream = todoval(); + let x: MspStreamRt = phantomval(); trait_assert(x); } -fn todoval() -> T { - todo!() +fn phantomval() -> T { + panic!() +} + +pub async fn find_ts_msp( + rt: &RetentionTime, + series: u64, + range: ScyllaSeriesRange, + bck: bool, + stmts: &StmtsEvents, + scy: &Session, +) -> Result, Error> { + trace!("find_ts_msp series {:?} {:?} {:?} bck {}", rt, series, range, bck); + if bck { + find_ts_msp_bck(rt, series, range, stmts, scy).await + } else { + find_ts_msp_fwd(rt, series, range, stmts, scy).await + } +} + +async fn find_ts_msp_fwd( + rt: &RetentionTime, + series: u64, + range: ScyllaSeriesRange, + stmts: &StmtsEvents, + scy: &Session, +) -> Result, Error> { + let mut ret = VecDeque::new(); + // TODO time range truncation can be handled better + let params = (series as i64, range.beg().ms() as i64, 1 + range.end().ms() as i64); + let mut res = scy + .execute_iter(stmts.rt(rt).ts_msp_fwd().clone(), params) + .await? + .into_typed::<(i64,)>(); + while let Some(x) = res.next().await { + let row = x?; + let ts = TsMs::from_ms_u64(row.0 as u64); + ret.push_back(ts); + } + Ok(ret) +} + +async fn find_ts_msp_bck( + rt: &RetentionTime, + series: u64, + range: ScyllaSeriesRange, + stmts: &StmtsEvents, + scy: &Session, +) -> Result, Error> { + let mut ret = VecDeque::new(); + let params = (series as i64, range.beg().ms() as i64); + let mut res = scy + .execute_iter(stmts.rt(rt).ts_msp_bck().clone(), params) + .await? + .into_typed::<(i64,)>(); + while let Some(x) = res.next().await { + let row = x?; + let ts = TsMs::from_ms_u64(row.0 as u64); + ret.push_front(ts); + } + Ok(ret) } diff --git a/crates/scyllaconn/src/events2/nonempty.rs b/crates/scyllaconn/src/events2/nonempty.rs new file mode 100644 index 0000000..565576a --- /dev/null +++ b/crates/scyllaconn/src/events2/nonempty.rs @@ -0,0 +1,42 @@ +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::WithLen; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +pub struct NonEmpty { + inp: S, +} + +impl NonEmpty { + pub fn new(inp: S) -> Self { + Self { inp } + } +} + +impl Stream for NonEmpty +where + S: Stream> + Unpin, + T: WithLen, +{ + type Item = ::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break match self.inp.poll_next_unpin(cx) { + Ready(Some(Ok(x))) => { + if x.len() != 0 { + Ready(Some(Ok(x))) + } else { + continue; + } + } + Ready(Some(Err(e))) => Ready(Some(Err(e))), + Ready(None) => Ready(None), + Pending => Pending, + }; + } + } +} diff --git a/crates/scyllaconn/src/events2/prepare.rs b/crates/scyllaconn/src/events2/prepare.rs new file mode 100644 index 0000000..a631885 --- /dev/null +++ b/crates/scyllaconn/src/events2/prepare.rs @@ -0,0 +1,238 @@ +use err::thiserror; +use err::ThisError; +use netpod::ttl::RetentionTime; +use scylla::prepared_statement::PreparedStatement; +use scylla::Session; + +#[derive(Debug, ThisError)] +pub enum Error { + ScyllaQuery(#[from] scylla::transport::errors::QueryError), + ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError), + ScyllaTypeConv(#[from] scylla::cql_to_rust::FromRowError), + ScyllaWorker(Box), + MissingQuery(String), + RangeEndOverflow, + InvalidFuture, + TestError(String), +} + +#[derive(Debug)] +pub struct StmtsLspShape { + u8: PreparedStatement, + u16: PreparedStatement, + u32: PreparedStatement, + u64: PreparedStatement, + i8: PreparedStatement, + i16: PreparedStatement, + i32: PreparedStatement, + i64: PreparedStatement, + f32: PreparedStatement, + f64: PreparedStatement, + bool: PreparedStatement, + string: PreparedStatement, +} + +impl StmtsLspShape { + pub fn st(&self, stname: &str) -> Result<&PreparedStatement, Error> { + let ret = match stname { + "u8" => &self.u8, + "u16" => &self.u16, + "u32" => &self.u32, + "u64" => &self.u64, + "i8" => &self.i8, + "i16" => &self.i16, + "i32" => &self.i32, + "i64" => &self.i64, + "f32" => &self.f32, + "f64" => &self.f64, + "bool" => &self.bool, + "string" => &self.string, + _ => return Err(Error::MissingQuery(format!("no query for stname {stname}"))), + }; + Ok(ret) + } +} + +#[derive(Debug)] +pub struct StmtsLspDir { + scalar: StmtsLspShape, + array: StmtsLspShape, +} + +impl StmtsLspDir { + pub fn shape(&self, array: bool) -> &StmtsLspShape { + if array { + &self.array + } else { + &self.scalar + } + } +} + +#[derive(Debug)] +pub struct StmtsEventsRt { + ts_msp_fwd: PreparedStatement, + ts_msp_bck: PreparedStatement, + lsp_fwd_val: StmtsLspDir, + lsp_bck_val: StmtsLspDir, + lsp_fwd_ts: StmtsLspDir, + lsp_bck_ts: StmtsLspDir, +} + +impl StmtsEventsRt { + pub fn ts_msp_fwd(&self) -> &PreparedStatement { + &self.ts_msp_fwd + } + + pub fn ts_msp_bck(&self) -> &PreparedStatement { + &self.ts_msp_bck + } + + pub fn lsp(&self, bck: bool, val: bool) -> &StmtsLspDir { + if bck { + if val { + &self.lsp_bck_val + } else { + &self.lsp_bck_ts + } + } else { + if val { + &self.lsp_fwd_val + } else { + &self.lsp_fwd_ts + } + } + } +} + +async fn make_msp_dir(ks: &str, rt: &RetentionTime, bck: bool, scy: &Session) -> Result { + let table_name = "ts_msp"; + let select_cond = if bck { + "ts_msp < ? order by ts_msp desc limit 2" + } else { + "ts_msp >= ? and ts_msp < ?" + }; + let cql = format!( + "select ts_msp from {}.{}{} where series = ? and {}", + ks, + rt.table_prefix(), + table_name, + select_cond + ); + let qu = scy.prepare(cql).await?; + Ok(qu) +} + +async fn make_lsp( + ks: &str, + rt: &RetentionTime, + shapepre: &str, + stname: &str, + values: &str, + bck: bool, + scy: &Session, +) -> Result { + let select_cond = if bck { + "ts_lsp < ? order by ts_lsp desc limit 1" + } else { + "ts_lsp >= ? and ts_lsp < ?" + }; + let cql = format!( + concat!( + "select {} from {}.{}events_{}_{}", + " where series = ? and ts_msp = ? and {}" + ), + values, + ks, + rt.table_prefix(), + shapepre, + stname, + select_cond + ); + let qu = scy.prepare(cql).await?; + Ok(qu) +} + +async fn make_lsp_shape( + ks: &str, + rt: &RetentionTime, + shapepre: &str, + values: &str, + bck: bool, + scy: &Session, +) -> Result { + let values = if shapepre.contains("array") { + values.replace("value", "valueblob") + } else { + values.into() + }; + let values = &values; + let maker = |stname| make_lsp(ks, rt, shapepre, stname, values, bck, scy); + let ret = StmtsLspShape { + u8: maker("u8").await?, + u16: maker("u16").await?, + u32: maker("u32").await?, + u64: maker("u64").await?, + i8: maker("i8").await?, + i16: maker("i16").await?, + i32: maker("i32").await?, + i64: maker("i64").await?, + f32: maker("f32").await?, + f64: maker("f64").await?, + bool: maker("bool").await?, + string: maker("string").await?, + }; + Ok(ret) +} + +async fn make_lsp_dir( + ks: &str, + rt: &RetentionTime, + values: &str, + bck: bool, + scy: &Session, +) -> Result { + let ret = StmtsLspDir { + scalar: make_lsp_shape(ks, rt, "scalar", values, bck, scy).await?, + array: make_lsp_shape(ks, rt, "array", values, bck, scy).await?, + }; + Ok(ret) +} + +async fn make_rt(ks: &str, rt: &RetentionTime, scy: &Session) -> Result { + let ret = StmtsEventsRt { + ts_msp_fwd: make_msp_dir(ks, rt, false, scy).await?, + ts_msp_bck: make_msp_dir(ks, rt, true, scy).await?, + lsp_fwd_val: make_lsp_dir(ks, rt, "ts_lsp, pulse, value", false, scy).await?, + lsp_bck_val: make_lsp_dir(ks, rt, "ts_lsp, pulse, value", true, scy).await?, + lsp_fwd_ts: make_lsp_dir(ks, rt, "ts_lsp, pulse", false, scy).await?, + lsp_bck_ts: make_lsp_dir(ks, rt, "ts_lsp, pulse", true, scy).await?, + }; + Ok(ret) +} + +#[derive(Debug)] +pub struct StmtsEvents { + st: StmtsEventsRt, + mt: StmtsEventsRt, + lt: StmtsEventsRt, +} + +impl StmtsEvents { + pub async fn new(ks: [&str; 3], scy: &Session) -> Result { + let ret = StmtsEvents { + st: make_rt(ks[0], &RetentionTime::Short, scy).await?, + mt: make_rt(ks[1], &RetentionTime::Medium, scy).await?, + lt: make_rt(ks[2], &RetentionTime::Long, scy).await?, + }; + Ok(ret) + } + + pub fn rt(&self, rt: &RetentionTime) -> &StmtsEventsRt { + match rt { + RetentionTime::Short => &self.st, + RetentionTime::Medium => &self.mt, + RetentionTime::Long => &&self.lt, + } + } +} diff --git a/crates/scyllaconn/src/scyllaconn.rs b/crates/scyllaconn/src/scyllaconn.rs index b50e9ec..a5c1763 100644 --- a/crates/scyllaconn/src/scyllaconn.rs +++ b/crates/scyllaconn/src/scyllaconn.rs @@ -10,3 +10,12 @@ pub mod worker; pub use scylla; pub use series::SeriesId; + +pub async fn test_log() { + use netpod::log::*; + error!("------"); + warn!("------"); + info!("------"); + debug!("------"); + trace!("------"); +} diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs index 8c1f1bb..99d236d 100644 --- a/crates/scyllaconn/src/worker.rs +++ b/crates/scyllaconn/src/worker.rs @@ -1,5 +1,5 @@ use crate::conn::create_scy_session_no_ks; -use crate::events::StmtsEvents; +use crate::events2::prepare::StmtsEvents; use crate::range::ScyllaSeriesRange; use async_channel::Receiver; use async_channel::Sender; @@ -19,8 +19,11 @@ use std::sync::Arc; #[derive(Debug, ThisError)] pub enum Error { + #[error("ScyllaConnection({0})")] ScyllaConnection(err::Error), + Prepare(#[from] crate::events2::prepare::Error), EventsQuery(#[from] crate::events::Error), + Msp(#[from] crate::events2::msp::Error), ChannelSend, ChannelRecv, Join, @@ -145,7 +148,7 @@ impl ScyllaWorker { }; match job { Job::FindTsMsp(rt, series, range, bck, tx) => { - let res = crate::events::find_ts_msp(&rt, series, range, bck, &stmts, &scy).await; + let res = crate::events2::msp::find_ts_msp(&rt, series, range, bck, &stmts, &scy).await; if tx.send(res.map_err(Into::into)).await.is_err() { // TODO count for stats } diff --git a/crates/streams/Cargo.toml b/crates/streams/Cargo.toml index 89fa60b..18392b7 100644 --- a/crates/streams/Cargo.toml +++ b/crates/streams/Cargo.toml @@ -14,7 +14,7 @@ serde_json = "1.0" serde_cbor = "0.11.1" typetag = "0.2.14" ciborium = "0.2.1" -bytes = "1.3" +bytes = "1.6" arrayref = "0.3.6" crc32fast = "1.3.2" byteorder = "1.4.3" diff --git a/crates/streams/src/framed_bytes.rs b/crates/streams/src/framed_bytes.rs new file mode 100644 index 0000000..dbe43d5 --- /dev/null +++ b/crates/streams/src/framed_bytes.rs @@ -0,0 +1,136 @@ +use bytes::Buf; +use bytes::BufMut; +use bytes::Bytes; +use bytes::BytesMut; +use err::thiserror; +use err::ThisError; +use futures_util::Stream; +use futures_util::StreamExt; +use netpod::log::*; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +const FRAME_HEAD_LEN: usize = 16; +const FRAME_PAYLOAD_MAX: u32 = 1024 * 1024 * 8; +const BUF_MAX: usize = (FRAME_HEAD_LEN + FRAME_PAYLOAD_MAX as usize) * 2; + +#[allow(unused)] +macro_rules! trace_parse { + ($($arg:tt)*) => { + if false { + trace!($($arg)*); + } + }; +} + +#[derive(Debug, ThisError)] +pub enum Error { + FrameTooLarge, + Logic, +} + +pub type BoxedFramedBytesStream = Pin> + Send>>; + +// TODO move this type decl because it is not specific to cbor +pub type SitemtyFramedBytesStream = Pin> + Send>>; + +pub enum State { + Reading, + Done, +} + +pub struct FramedBytesStream { + inp: S, + buf: BytesMut, + state: State, +} + +impl FramedBytesStream +where + S: Stream> + Unpin, + E: Into, +{ + pub fn new(inp: S) -> Self { + Self { + inp, + buf: BytesMut::with_capacity(1024 * 256), + state: State::Reading, + } + } + + fn try_parse(&mut self) -> Result, Error> { + trace_parse!("try_parse self.buf.len() {}", self.buf.len()); + if self.buf.len() < FRAME_HEAD_LEN { + return Ok(None); + } + let n = u32::from_le_bytes(self.buf[..4].try_into().map_err(|_| Error::Logic)?); + trace_parse!("try_parse n {}", n); + if n > FRAME_PAYLOAD_MAX { + let e = Error::FrameTooLarge; + return Err(e); + } + let frame_len = FRAME_HEAD_LEN + n as usize; + trace_parse!("try_parse frame_len {}", frame_len); + assert!(self.buf.len() <= self.buf.capacity()); + if self.buf.capacity() < frame_len { + let add_max = BUF_MAX - self.buf.capacity().min(BUF_MAX); + let nadd = ((frame_len.min(FRAME_PAYLOAD_MAX as usize) - self.buf.len()) * 2).min(add_max); + self.buf.reserve(nadd); + } + let adv = (frame_len + 7) / 8 * 8; + trace_parse!("try_parse adv {}", adv); + if self.buf.len() < adv { + Ok(None) + } else { + self.buf.advance(FRAME_HEAD_LEN); + let buf = self.buf.split_to(n as usize); + self.buf.advance(adv - frame_len); + Ok(Some(buf.freeze())) + } + } +} + +impl Stream for FramedBytesStream +where + S: Stream> + Unpin, + E: Into, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break match &self.state { + State::Reading => match self.try_parse() { + Ok(Some(x)) => Ready(Some(Ok(x))), + Ok(None) => match self.inp.poll_next_unpin(cx) { + Ready(Some(x)) => match x { + Ok(x) => { + self.buf.put_slice(&x); + continue; + } + Err(e) => { + self.state = State::Done; + Ready(Some(Err(e.into()))) + } + }, + Ready(None) => { + if self.buf.len() > 0 { + warn!("remaining bytes in input buffer, input closed len {}", self.buf.len()); + } + self.state = State::Done; + Ready(None) + } + Pending => Pending, + }, + Err(e) => { + self.state = State::Done; + Ready(Some(Err(e))) + } + }, + State::Done => Ready(None), + }; + } + } +} diff --git a/crates/streams/src/instrument.rs b/crates/streams/src/instrument.rs new file mode 100644 index 0000000..28c7bf6 --- /dev/null +++ b/crates/streams/src/instrument.rs @@ -0,0 +1,33 @@ +use futures_util::Stream; +use futures_util::StreamExt; +use netpod::log::tracing; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +#[pin_project::pin_project] +pub struct InstrumentStream { + #[pin] + inp: S, + #[pin] + span: tracing::Span, +} + +impl InstrumentStream { + pub fn new(inp: S, span: tracing::Span) -> Self { + Self { inp, span } + } +} + +impl Stream for InstrumentStream +where + S: Stream, +{ + type Item = ::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let mut this = self.project(); + let _spg = this.span.enter(); + this.inp.poll_next_unpin(cx) + } +} diff --git a/crates/streams/src/lib.rs b/crates/streams/src/lib.rs index 1449949..5dc067d 100644 --- a/crates/streams/src/lib.rs +++ b/crates/streams/src/lib.rs @@ -4,8 +4,10 @@ pub mod collect; pub mod dtflags; pub mod filechunkread; pub mod firsterr; +pub mod framed_bytes; pub mod frames; pub mod generators; +pub mod instrument; pub mod itemclone; pub mod json_stream; pub mod lenframed; diff --git a/crates/taskrun/src/taskrun.rs b/crates/taskrun/src/taskrun.rs index 4a20afc..aebe991 100644 --- a/crates/taskrun/src/taskrun.rs +++ b/crates/taskrun/src/taskrun.rs @@ -159,7 +159,7 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { console_subscriber::init(); } else { // Logging setup - let filter = tracing_subscriber::EnvFilter::builder() + let filter_1 = tracing_subscriber::EnvFilter::builder() .with_default_directive(tracing::metadata::LevelFilter::INFO.into()) .from_env() .map_err(|e| Error::with_msg_no_trace(format!("can not build tracing env filter {e}")))?; @@ -168,9 +168,9 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { .with_default_directive(tracing::metadata::LevelFilter::INFO.into()) .from_env() .map_err(|e| Error::with_msg_no_trace(format!("can not build tracing env filter {e}")))?; - let filter_3 = tracing_subscriber::filter::dynamic_filter_fn(|meta, ctx| { + /*let filter_3 = tracing_subscriber::filter::dynamic_filter_fn(|meta, ctx| { if true { - return true; + return false; } if *meta.level() <= tracing::Level::TRACE { if ["httpret", "scyllaconn"].contains(&meta.target()) { @@ -207,7 +207,7 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { } else { true } - }); + });*/ let fmt_layer = tracing_subscriber::fmt::Layer::new() .with_writer(io::stderr) .with_timer(timer) @@ -215,13 +215,22 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { .with_ansi(false) .with_thread_names(true) .event_format(formatter::FormatTxt) - .with_filter(filter_3) + // .with_filter(filter_3) .with_filter(filter_2) - .with_filter(filter) - // .and_then(LogFilterLayer::new("lay1".into())) - // .and_then(LogFilterLayer::new("lay2".into())) - ; + .with_filter(filter_1) + // let fmt_layer = fmt_layer.with_filter(filter_3); + // let fmt_layer: Box> = if std::env::var("RUST_LOG_USE_2").is_ok() { + // let a = fmt_layer.with_filter(filter_2); + // Box::new(a) + // } else { + // let a = fmt_layer; + // Box::new(a) + // }; + // let fmt_layer = fmt_layer.with_filter(filter_1); + // .and_then(LogFilterLayer::new("lay1".into())) + // .and_then(LogFilterLayer::new("lay2".into())) // let layer_2 = LogFilterLayer::new("lay1".into(), fmt_layer); + ; let reg = tracing_subscriber::registry();