From 1efef86ae0d9fbdd010df1d60209daa86a7759ff Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 21 Jun 2024 16:25:02 +0200 Subject: [PATCH] Move code --- crates/daqbuffer/src/bin/daqbuffer.rs | 8 +- crates/httpret/src/api4/eventdata.rs | 14 +- crates/httpret/src/api4/events.rs | 15 +- crates/items_2/src/eventsdim0.rs | 17 ++ crates/items_2/src/eventsdim1.rs | 17 ++ crates/netpod/src/netpod.rs | 2 +- crates/netpod/src/ttl.rs | 48 +++- crates/nodenet/src/conn.rs | 1 + crates/nodenet/src/scylla.rs | 42 ++-- crates/query/src/api4/events.rs | 37 +-- crates/scyllaconn/src/events.rs | 275 +---------------------- crates/scyllaconn/src/events2.rs | 1 + crates/scyllaconn/src/events2/events.rs | 81 +++++-- crates/scyllaconn/src/events2/msp.rs | 74 +++++- crates/scyllaconn/src/events2/prepare.rs | 238 ++++++++++++++++++++ crates/scyllaconn/src/worker.rs | 7 +- crates/streams/Cargo.toml | 2 +- crates/streams/src/framed_bytes.rs | 136 +++++++++++ crates/streams/src/instrument.rs | 33 +++ crates/streams/src/lib.rs | 2 + 20 files changed, 696 insertions(+), 354 deletions(-) create mode 100644 crates/scyllaconn/src/events2/prepare.rs create mode 100644 crates/streams/src/framed_bytes.rs create mode 100644 crates/streams/src/instrument.rs diff --git a/crates/daqbuffer/src/bin/daqbuffer.rs b/crates/daqbuffer/src/bin/daqbuffer.rs index 1fa7a0d..2c846b9 100644 --- a/crates/daqbuffer/src/bin/daqbuffer.rs +++ b/crates/daqbuffer/src/bin/daqbuffer.rs @@ -56,6 +56,7 @@ fn parse_ts(s: &str) -> Result, Error> { } async fn go() -> Result<(), Error> { + let buildmark = "+0008"; let opts = Opts::parse(); let service_version = ServiceVersion { major: std::env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap_or(0), @@ -65,8 +66,8 @@ async fn go() -> Result<(), Error> { }; match opts.subcmd { SubCmd::Retrieval(subcmd) => { - info!("daqbuffer version {} +0007", clap::crate_version!()); - info!(" service_version {}", service_version); + info!("daqbuffer version {} {} retrieval", clap::crate_version!(), buildmark); + info!(" service_version {} {} retrieval", service_version, buildmark); if false { #[allow(non_snake_case)] let TARGET = std::env!("DAQBUF_TARGET"); @@ -96,7 +97,8 @@ async fn go() -> Result<(), Error> { } } SubCmd::Proxy(subcmd) => { - info!("daqbuffer proxy {}", clap::crate_version!()); + info!("daqbuffer version {} {} proxy", clap::crate_version!(), buildmark); + info!(" service_version {} {} proxy", service_version, buildmark); let mut config_file = File::open(&subcmd.config).await?; let mut buf = Vec::new(); config_file.read_to_end(&mut buf).await?; diff --git a/crates/httpret/src/api4/eventdata.rs b/crates/httpret/src/api4/eventdata.rs index f169f1f..8f6c35c 100644 --- a/crates/httpret/src/api4/eventdata.rs +++ b/crates/httpret/src/api4/eventdata.rs @@ -6,6 +6,7 @@ use err::thiserror; use err::PublicError; use err::ThisError; use err::ToPublicError; +use futures_util::Stream; use http::Method; use http::StatusCode; use httpclient::body_empty; @@ -16,7 +17,7 @@ use httpclient::StreamResponse; use netpod::log::*; use netpod::NodeConfigCached; use std::sync::Arc; -use tracing::Instrument; +use streams::instrument::InstrumentStream; #[derive(Debug, ThisError)] pub enum EventDataError { @@ -84,12 +85,9 @@ impl EventDataHandler { let frames = nodenet::conn::events_get_input_frames(inp) .await .map_err(|_| EventDataError::InternalError)?; - info!("start parse"); let (evsubq,) = nodenet::conn::events_parse_input_query(frames).map_err(|_| EventDataError::QueryParse)?; - info!("done parse"); - let logspan = if false { - tracing::Span::none() - } else if evsubq.log_level() == "trace" { + info!("{:?}", evsubq); + let logspan = if evsubq.log_level() == "trace" { trace!("enable trace for handler"); tracing::span!(tracing::Level::INFO, "log_span_trace") } else if evsubq.log_level() == "debug" { @@ -98,10 +96,12 @@ impl EventDataHandler { } else { tracing::Span::none() }; + use tracing::Instrument; let stream = nodenet::conn::create_response_bytes_stream(evsubq, shared_res.scyqueue.as_ref(), ncc) - .instrument(logspan) + .instrument(logspan.clone()) .await .map_err(|e| EventDataError::Error(Box::new(e)))?; + let stream = InstrumentStream::new(stream, logspan); let ret = response(StatusCode::OK) .body(body_stream(stream)) .map_err(|_| EventDataError::InternalError)?; diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index 46c3c13..7756b33 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -29,6 +29,7 @@ use netpod::NodeConfigCached; use netpod::ReqCtx; use nodenet::client::OpenBoxedBytesViaHttp; use query::api4::events::PlainEventsQuery; +use streams::instrument::InstrumentStream; use tracing::Instrument; pub struct EventsHandler {} @@ -57,9 +58,7 @@ impl EventsHandler { let evq = PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?; debug!("{self_name} evq {evq:?}"); - let logspan = if false { - tracing::Span::none() - } else if evq.log_level() == "trace" { + let logspan = if evq.log_level() == "trace" { trace!("enable trace for handler"); tracing::span!(tracing::Level::INFO, "log_span_trace") } else if evq.log_level() == "debug" { @@ -134,6 +133,16 @@ async fn plain_events_cbor_framed( } }) .filter(|x| if let Ok(x) = x { ready(x.len() > 0) } else { ready(true) }); + let logspan = if evq.log_level() == "trace" { + trace!("enable trace for handler"); + tracing::span!(tracing::Level::INFO, "log_span_trace") + } else if evq.log_level() == "debug" { + debug!("enable debug for handler"); + tracing::span!(tracing::Level::INFO, "log_span_debug") + } else { + tracing::Span::none() + }; + let stream = InstrumentStream::new(stream, logspan); let ret = response(StatusCode::OK).body(body_stream(stream))?; Ok(ret) } diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index 3a1bbd1..0b95187 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -74,6 +74,23 @@ macro_rules! trace2 { ($($arg:tt)*) => { trace!($($arg)*); }; } +#[derive(Clone, PartialEq, Serialize, Deserialize)] +pub struct EventsDim0NoPulse { + pub tss: VecDeque, + pub values: VecDeque, +} + +impl From> for EventsDim0 { + fn from(value: EventsDim0NoPulse) -> Self { + let pulses = vec![0; value.tss.len()].into(); + Self { + tss: value.tss, + pulses, + values: value.values, + } + } +} + #[derive(Clone, PartialEq, Serialize, Deserialize)] pub struct EventsDim0 { pub tss: VecDeque, diff --git a/crates/items_2/src/eventsdim1.rs b/crates/items_2/src/eventsdim1.rs index 802afb3..9e959e7 100644 --- a/crates/items_2/src/eventsdim1.rs +++ b/crates/items_2/src/eventsdim1.rs @@ -47,6 +47,23 @@ macro_rules! trace2 { ($($arg:tt)*) => (trace!($($arg)*)); } +#[derive(Clone, PartialEq, Serialize, Deserialize)] +pub struct EventsDim1NoPulse { + pub tss: VecDeque, + pub values: VecDeque>, +} + +impl From> for EventsDim1 { + fn from(value: EventsDim1NoPulse) -> Self { + let pulses = vec![0; value.tss.len()].into(); + Self { + tss: value.tss, + pulses, + values: value.values, + } + } +} + #[derive(Clone, PartialEq, Serialize, Deserialize)] pub struct EventsDim1 { pub tss: VecDeque, diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 9abcfa9..8d01317 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -226,7 +226,7 @@ impl<'de> serde::de::Visitor<'de> for ScalarTypeVis { "bool" => ScalarType::BOOL, "string" => ScalarType::STRING, "enum" => ScalarType::Enum, - "channelstatus" => ScalarType::ChannelStatus, + "ChannelStatus" => ScalarType::ChannelStatus, k => return Err(E::custom(format!("can not understand variant {k:?}"))), }; Ok(ret) diff --git a/crates/netpod/src/ttl.rs b/crates/netpod/src/ttl.rs index f07bbbd..f715f72 100644 --- a/crates/netpod/src/ttl.rs +++ b/crates/netpod/src/ttl.rs @@ -1,6 +1,12 @@ +use core::fmt; +use err::thiserror; +use err::ThisError; +use serde::Deserialize; +use serde::Serialize; +use std::str::FromStr; use std::time::Duration; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum RetentionTime { Short, Medium, @@ -59,3 +65,43 @@ impl RetentionTime { self.ttl_events_d0() } } + +impl fmt::Display for RetentionTime { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let s = match self { + RetentionTime::Short => "short", + RetentionTime::Medium => "medium", + RetentionTime::Long => "long", + }; + fmt.write_str(s) + } +} + +#[derive(Debug, ThisError)] +pub enum Error { + Parse, +} + +impl FromStr for RetentionTime { + type Err = Error; + + fn from_str(s: &str) -> Result { + let ret = match s { + "short" => Self::Short, + "medium" => Self::Medium, + "long" => Self::Long, + _ => return Err(Error::Parse), + }; + Ok(ret) + } +} + +// impl ToString for RetentionTime { +// fn to_string(&self) -> String { +// match self { +// RetentionTime::Short => "short".into(), +// RetentionTime::Medium => "medium".into(), +// RetentionTime::Long => "long".into(), +// } +// } +// } diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index 26ea1d3..dda1cc3 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -275,6 +275,7 @@ pub fn events_parse_input_query(frames: Vec) -> Result<(EventsSub }, Err(e) => return Err(e), }; + info!("parsing json {:?}", qitem.str()); let frame1: Frame1Parts = serde_json::from_str(&qitem.str()).map_err(|e| { let e = Error::with_msg_no_trace(format!("json parse error: {} inp {}", e, qitem.str())); error!("{e}"); diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index ca93919..5e0c14a 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -7,7 +7,6 @@ use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_2::channelevents::ChannelEvents; use netpod::log::*; -use netpod::ttl::RetentionTime; use netpod::ChConf; use query::api4::events::EventsSubQuery; use scyllaconn::worker::ScyllaQueue; @@ -24,25 +23,15 @@ pub async fn scylla_channel_event_stream( // TODO why both in PlainEventsQuery and as separate parameter? Check other usages. // let do_one_before_range = evq.need_one_before_range(); let do_one_before_range = false; - let series = chconf.series(); + let series = SeriesId::new(chconf.series()); let scalar_type = chconf.scalar_type(); let shape = chconf.shape(); let do_test_stream_error = false; let with_values = evq.need_value_data(); - let stream: Pin + Send>> = if evq.use_all_rt() { - let x = scyllaconn::events2::mergert::MergeRts::new( - SeriesId::new(chconf.series()), - scalar_type.clone(), - shape.clone(), - evq.range().into(), - with_values, - scyqueue.clone(), - ); - Box::pin(x) - } else { + let stream: Pin + Send>> = if let Some(rt) = evq.use_rt() { let x = scyllaconn::events2::events::EventsStreamRt::new( - RetentionTime::Short, - SeriesId::new(chconf.series()), + rt, + series, scalar_type.clone(), shape.clone(), evq.range().into(), @@ -51,18 +40,17 @@ pub async fn scylla_channel_event_stream( ) .map_err(|e| scyllaconn::events2::mergert::Error::from(e)); Box::pin(x) + } else { + let x = scyllaconn::events2::mergert::MergeRts::new( + series, + scalar_type.clone(), + shape.clone(), + evq.range().into(), + with_values, + scyqueue.clone(), + ); + Box::pin(x) }; - /*let stream = scyllaconn::events::EventsStreamScylla::new( - RetentionTime::Short, - series, - evq.range().into(), - do_one_before_range, - scalar_type.clone(), - shape.clone(), - with_values, - scyqueue.clone(), - do_test_stream_error, - );*/ let stream = stream .map(move |item| match &item { Ok(k) => match k { @@ -96,7 +84,7 @@ pub async fn scylla_channel_event_stream( item } }, - Err(e) => Err(Error::with_msg_no_trace(format!("scyllaconn eevents error {e}"))), + Err(e) => Err(Error::with_msg_no_trace(format!("scyllaconn events error {e}"))), }; item }); diff --git a/crates/query/src/api4/events.rs b/crates/query/src/api4/events.rs index 25a8900..eda148c 100644 --- a/crates/query/src/api4/events.rs +++ b/crates/query/src/api4/events.rs @@ -7,6 +7,7 @@ use netpod::query::api1::Api1Query; use netpod::query::PulseRangeQuery; use netpod::query::TimeRangeQuery; use netpod::range::evrange::SeriesRange; +use netpod::ttl::RetentionTime; use netpod::AppendToUrl; use netpod::ByteSize; use netpod::ChannelTypeConfigGen; @@ -57,7 +58,7 @@ pub struct PlainEventsQuery { #[serde(default)] log_level: String, #[serde(default)] - use_all_rt: bool, + use_rt: Option, } impl PlainEventsQuery { @@ -83,7 +84,7 @@ impl PlainEventsQuery { merger_out_len_max: None, create_errors: Vec::new(), log_level: String::new(), - use_all_rt: false, + use_rt: None, } } @@ -210,8 +211,8 @@ impl PlainEventsQuery { &self.log_level } - pub fn use_all_rt(&self) -> bool { - self.use_all_rt + pub fn use_rt(&self) -> Option { + self.use_rt.clone() } } @@ -290,11 +291,11 @@ impl FromUrl for PlainEventsQuery { .map(|x| x.split(",").map(|x| x.to_string()).collect()) .unwrap_or(Vec::new()), log_level: pairs.get("log_level").map_or(String::new(), String::from), - use_all_rt: pairs - .get("useAllRt") - .map_or("false", |k| k) - .parse() - .map_err(|e| Error::with_public_msg_no_trace(format!("can not parse useAllRt: {}", e)))?, + use_rt: pairs.get("useRt").map_or(Ok(None), |k| { + k.parse() + .map(Some) + .map_err(|_| Error::with_public_msg_no_trace(format!("can not parse useRt: {}", k))) + })?, }; Ok(ret) } @@ -354,8 +355,8 @@ impl AppendToUrl for PlainEventsQuery { if self.log_level.len() != 0 { g.append_pair("log_level", &self.log_level); } - if self.use_all_rt { - g.append_pair("useAllRt", "true"); + if let Some(x) = self.use_rt.as_ref() { + g.append_pair("useRt", &x.to_string()); } } } @@ -400,7 +401,7 @@ pub struct EventsSubQuerySettings { buf_len_disk_io: Option, queue_len_disk_io: Option, create_errors: Vec, - use_all_rt: bool, + use_rt: Option, } impl Default for EventsSubQuerySettings { @@ -414,7 +415,7 @@ impl Default for EventsSubQuerySettings { buf_len_disk_io: None, queue_len_disk_io: None, create_errors: Vec::new(), - use_all_rt: true, + use_rt: None, } } } @@ -431,7 +432,7 @@ impl From<&PlainEventsQuery> for EventsSubQuerySettings { // TODO add to query queue_len_disk_io: None, create_errors: value.create_errors.clone(), - use_all_rt: value.use_all_rt(), + use_rt: value.use_rt(), } } } @@ -449,7 +450,7 @@ impl From<&BinnedQuery> for EventsSubQuerySettings { // TODO add to query queue_len_disk_io: None, create_errors: Vec::new(), - use_all_rt: true, + use_rt: None, } } } @@ -467,7 +468,7 @@ impl From<&Api1Query> for EventsSubQuerySettings { buf_len_disk_io: Some(disk_io_tune.read_buffer_len), queue_len_disk_io: Some(disk_io_tune.read_queue_len), create_errors: Vec::new(), - use_all_rt: false, + use_rt: None, } } } @@ -572,8 +573,8 @@ impl EventsSubQuery { &self.log_level } - pub fn use_all_rt(&self) -> bool { - self.settings.use_all_rt + pub fn use_rt(&self) -> Option { + self.settings.use_rt.clone() } } diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs index f9e73b4..b6bb760 100644 --- a/crates/scyllaconn/src/events.rs +++ b/crates/scyllaconn/src/events.rs @@ -1,3 +1,4 @@ +use crate::events2::prepare::StmtsEvents; use crate::range::ScyllaSeriesRange; use crate::worker::ScyllaQueue; use err::thiserror; @@ -22,7 +23,6 @@ use netpod::Shape; use netpod::TsMs; use netpod::TsNano; use scylla::frame::response::result::Row; -use scylla::prepared_statement::PreparedStatement; use scylla::Session; use series::SeriesId; use std::collections::VecDeque; @@ -34,6 +34,7 @@ use std::task::Poll; #[derive(Debug, ThisError)] pub enum Error { + Prepare(#[from] crate::events2::prepare::Error), ScyllaQuery(#[from] scylla::transport::errors::QueryError), ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError), ScyllaTypeConv(#[from] scylla::cql_to_rust::FromRowError), @@ -50,278 +51,6 @@ impl From for Error { } } -#[derive(Debug)] -pub struct StmtsLspShape { - u8: PreparedStatement, - u16: PreparedStatement, - u32: PreparedStatement, - u64: PreparedStatement, - i8: PreparedStatement, - i16: PreparedStatement, - i32: PreparedStatement, - i64: PreparedStatement, - f32: PreparedStatement, - f64: PreparedStatement, - bool: PreparedStatement, - string: PreparedStatement, -} - -impl StmtsLspShape { - fn st(&self, stname: &str) -> Result<&PreparedStatement, Error> { - let ret = match stname { - "u8" => &self.u8, - "u16" => &self.u16, - "u32" => &self.u32, - "u64" => &self.u64, - "i8" => &self.i8, - "i16" => &self.i16, - "i32" => &self.i32, - "i64" => &self.i64, - "f32" => &self.f32, - "f64" => &self.f64, - "bool" => &self.bool, - "string" => &self.string, - _ => return Err(Error::MissingQuery(format!("no query for stname {stname}"))), - }; - Ok(ret) - } -} - -#[derive(Debug)] -pub struct StmtsLspDir { - scalar: StmtsLspShape, - array: StmtsLspShape, -} - -impl StmtsLspDir { - fn shape(&self, array: bool) -> &StmtsLspShape { - if array { - &self.array - } else { - &self.scalar - } - } -} - -#[derive(Debug)] -pub struct StmtsEventsRt { - ts_msp_fwd: PreparedStatement, - ts_msp_bck: PreparedStatement, - lsp_fwd_val: StmtsLspDir, - lsp_bck_val: StmtsLspDir, - lsp_fwd_ts: StmtsLspDir, - lsp_bck_ts: StmtsLspDir, -} - -impl StmtsEventsRt { - fn lsp(&self, bck: bool, val: bool) -> &StmtsLspDir { - if bck { - if val { - &self.lsp_bck_val - } else { - &self.lsp_bck_ts - } - } else { - if val { - &self.lsp_fwd_val - } else { - &self.lsp_fwd_ts - } - } - } -} - -#[derive(Debug)] -pub struct StmtsEvents { - st: StmtsEventsRt, - mt: StmtsEventsRt, - lt: StmtsEventsRt, -} - -async fn make_msp_dir(ks: &str, rt: &RetentionTime, bck: bool, scy: &Session) -> Result { - let table_name = "ts_msp"; - let select_cond = if bck { - "ts_msp < ? order by ts_msp desc limit 2" - } else { - "ts_msp >= ? and ts_msp < ?" - }; - let cql = format!( - "select ts_msp from {}.{}{} where series = ? and {}", - ks, - rt.table_prefix(), - table_name, - select_cond - ); - let qu = scy.prepare(cql).await?; - Ok(qu) -} - -async fn make_lsp( - ks: &str, - rt: &RetentionTime, - shapepre: &str, - stname: &str, - values: &str, - bck: bool, - scy: &Session, -) -> Result { - let select_cond = if bck { - "ts_lsp < ? order by ts_lsp desc limit 1" - } else { - "ts_lsp >= ? and ts_lsp < ?" - }; - let cql = format!( - concat!( - "select {} from {}.{}events_{}_{}", - " where series = ? and ts_msp = ? and {}" - ), - values, - ks, - rt.table_prefix(), - shapepre, - stname, - select_cond - ); - let qu = scy.prepare(cql).await?; - Ok(qu) -} - -async fn make_lsp_shape( - ks: &str, - rt: &RetentionTime, - shapepre: &str, - values: &str, - bck: bool, - scy: &Session, -) -> Result { - let values = if shapepre.contains("array") { - values.replace("value", "valueblob") - } else { - values.into() - }; - let values = &values; - let maker = |stname| make_lsp(ks, rt, shapepre, stname, values, bck, scy); - let ret = StmtsLspShape { - u8: maker("u8").await?, - u16: maker("u16").await?, - u32: maker("u32").await?, - u64: maker("u64").await?, - i8: maker("i8").await?, - i16: maker("i16").await?, - i32: maker("i32").await?, - i64: maker("i64").await?, - f32: maker("f32").await?, - f64: maker("f64").await?, - bool: maker("bool").await?, - string: maker("string").await?, - }; - Ok(ret) -} - -async fn make_lsp_dir( - ks: &str, - rt: &RetentionTime, - values: &str, - bck: bool, - scy: &Session, -) -> Result { - let ret = StmtsLspDir { - scalar: make_lsp_shape(ks, rt, "scalar", values, bck, scy).await?, - array: make_lsp_shape(ks, rt, "array", values, bck, scy).await?, - }; - Ok(ret) -} - -async fn make_rt(ks: &str, rt: &RetentionTime, scy: &Session) -> Result { - let ret = StmtsEventsRt { - ts_msp_fwd: make_msp_dir(ks, rt, false, scy).await?, - ts_msp_bck: make_msp_dir(ks, rt, true, scy).await?, - lsp_fwd_val: make_lsp_dir(ks, rt, "ts_lsp, pulse, value", false, scy).await?, - lsp_bck_val: make_lsp_dir(ks, rt, "ts_lsp, pulse, value", true, scy).await?, - lsp_fwd_ts: make_lsp_dir(ks, rt, "ts_lsp, pulse", false, scy).await?, - lsp_bck_ts: make_lsp_dir(ks, rt, "ts_lsp, pulse", true, scy).await?, - }; - Ok(ret) -} - -impl StmtsEvents { - pub(super) async fn new(ks: [&str; 3], scy: &Session) -> Result { - let ret = StmtsEvents { - st: make_rt(ks[0], &RetentionTime::Short, scy).await?, - mt: make_rt(ks[1], &RetentionTime::Medium, scy).await?, - lt: make_rt(ks[2], &RetentionTime::Long, scy).await?, - }; - Ok(ret) - } - - fn rt(&self, rt: &RetentionTime) -> &StmtsEventsRt { - match rt { - RetentionTime::Short => &self.st, - RetentionTime::Medium => &self.mt, - RetentionTime::Long => &&self.lt, - } - } -} - -pub(super) async fn find_ts_msp( - rt: &RetentionTime, - series: u64, - range: ScyllaSeriesRange, - bck: bool, - stmts: &StmtsEvents, - scy: &Session, -) -> Result, Error> { - trace!("find_ts_msp series {:?} {:?} {:?} bck {}", rt, series, range, bck); - if bck { - find_ts_msp_bck(rt, series, range, stmts, scy).await - } else { - find_ts_msp_fwd(rt, series, range, stmts, scy).await - } -} - -async fn find_ts_msp_fwd( - rt: &RetentionTime, - series: u64, - range: ScyllaSeriesRange, - stmts: &StmtsEvents, - scy: &Session, -) -> Result, Error> { - let mut ret = VecDeque::new(); - // TODO time range truncation can be handled better - let params = (series as i64, range.beg().ms() as i64, 1 + range.end().ms() as i64); - let mut res = scy - .execute_iter(stmts.rt(rt).ts_msp_fwd.clone(), params) - .await? - .into_typed::<(i64,)>(); - while let Some(x) = res.next().await { - let row = x?; - let ts = TsMs::from_ms_u64(row.0 as u64); - ret.push_back(ts); - } - Ok(ret) -} - -async fn find_ts_msp_bck( - rt: &RetentionTime, - series: u64, - range: ScyllaSeriesRange, - stmts: &StmtsEvents, - scy: &Session, -) -> Result, Error> { - let mut ret = VecDeque::new(); - let params = (series as i64, range.beg().ms() as i64); - let mut res = scy - .execute_iter(stmts.rt(rt).ts_msp_bck.clone(), params) - .await? - .into_typed::<(i64,)>(); - while let Some(x) = res.next().await { - let row = x?; - let ts = TsMs::from_ms_u64(row.0 as u64); - ret.push_front(ts); - } - Ok(ret) -} - pub(super) trait ValTy: Sized + 'static { type ScaTy: ScalarOps + std::default::Default; type ScyTy: scylla::cql_to_rust::FromCqlVal; diff --git a/crates/scyllaconn/src/events2.rs b/crates/scyllaconn/src/events2.rs index be623b6..ea58344 100644 --- a/crates/scyllaconn/src/events2.rs +++ b/crates/scyllaconn/src/events2.rs @@ -3,3 +3,4 @@ pub mod firstbefore; pub mod mergert; pub mod msp; pub mod nonempty; +pub mod prepare; diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index f20077b..6132502 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -22,12 +22,35 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; +#[allow(unused)] +macro_rules! trace_emit { + ($($arg:tt)*) => { + if true { + trace!($($arg)*); + } + }; +} + +#[allow(unused)] +macro_rules! warn_item { + ($($arg:tt)*) => { + if true { + debug!($($arg)*); + } + }; +} + #[derive(Debug, ThisError)] pub enum Error { Worker(#[from] crate::worker::Error), Events(#[from] crate::events::Error), Msp(#[from] crate::events2::msp::Error), + Unordered, + OutOfRange, + BadBatch, Logic, + Merge(#[from] items_0::MergeError), + TruncateLogic, } struct FetchMsp { @@ -180,51 +203,75 @@ impl Stream for EventsStreamRt { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; loop { - if let Some(item) = self.out.pop_front() { + if let Some(mut item) = self.out.pop_front() { if !item.verify() { - debug!("{}bad item {:?}", "\n\n--------------------------\n", item); + warn_item!("{}bad item {:?}", "\n\n--------------------------\n", item); self.state = State::Done; - break Ready(Some(Err(Error::Logic))); + break Ready(Some(Err(Error::BadBatch))); } if let Some(item_min) = item.ts_min() { if item_min < self.range.beg().ns() { - debug!( + warn_item!( "{}out of range error A {} {:?}", - "\n\n--------------------------\n", item_min, self.range + "\n\n--------------------------\n", + item_min, + self.range ); self.state = State::Done; - break Ready(Some(Err(Error::Logic))); + break Ready(Some(Err(Error::OutOfRange))); } if item_min < self.ts_seen_max { - debug!( + warn_item!( "{}ordering error A {} {}", - "\n\n--------------------------\n", item_min, self.ts_seen_max + "\n\n--------------------------\n", + item_min, + self.ts_seen_max ); - self.state = State::Done; - break Ready(Some(Err(Error::Logic))); + let mut r = items_2::merger::Mergeable::new_empty(&item); + match items_2::merger::Mergeable::find_highest_index_lt(&item, self.ts_seen_max) { + Some(ix) => { + match items_2::merger::Mergeable::drain_into(&mut item, &mut r, (0, ix)) { + Ok(()) => {} + Err(e) => { + self.state = State::Done; + break Ready(Some(Err(e.into()))); + } + } + // self.state = State::Done; + // break Ready(Some(Err(Error::Unordered))); + } + None => { + self.state = State::Done; + break Ready(Some(Err(Error::TruncateLogic))); + } + } } } if let Some(item_max) = item.ts_max() { if item_max >= self.range.end().ns() { - debug!( + warn_item!( "{}out of range error B {} {:?}", - "\n\n--------------------------\n", item_max, self.range + "\n\n--------------------------\n", + item_max, + self.range ); self.state = State::Done; - break Ready(Some(Err(Error::Logic))); + break Ready(Some(Err(Error::OutOfRange))); } if item_max < self.ts_seen_max { - debug!( + warn_item!( "{}ordering error B {} {}", - "\n\n--------------------------\n", item_max, self.ts_seen_max + "\n\n--------------------------\n", + item_max, + self.ts_seen_max ); self.state = State::Done; - break Ready(Some(Err(Error::Logic))); + break Ready(Some(Err(Error::Unordered))); } else { self.ts_seen_max = item_max; } } - debug!("deliver item {}", item.output_info()); + trace_emit!("deliver item {}", item.output_info()); break Ready(Some(Ok(ChannelEvents::Events(item)))); } break match &mut self.state { diff --git a/crates/scyllaconn/src/events2/msp.rs b/crates/scyllaconn/src/events2/msp.rs index 0da7866..89b02e2 100644 --- a/crates/scyllaconn/src/events2/msp.rs +++ b/crates/scyllaconn/src/events2/msp.rs @@ -1,3 +1,4 @@ +use super::prepare::StmtsEvents; use crate::range::ScyllaSeriesRange; use crate::worker::ScyllaQueue; use err::thiserror; @@ -5,8 +6,11 @@ use err::ThisError; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; +use futures_util::StreamExt; +use netpod::log::*; use netpod::ttl::RetentionTime; use netpod::TsMs; +use scylla::Session; use series::SeriesId; use std::collections::VecDeque; use std::pin::Pin; @@ -15,8 +19,17 @@ use std::task::Poll; #[derive(Debug, ThisError)] pub enum Error { - Worker(#[from] crate::worker::Error), Logic, + #[error("Worker({0})")] + Worker(Box), + ScyllaQuery(#[from] scylla::transport::errors::QueryError), + ScyllaRow(#[from] scylla::transport::iterator::NextRowError), +} + +impl From for Error { + fn from(value: crate::worker::Error) -> Self { + Self::Worker(Box::new(value)) + } } enum Resolvable @@ -180,3 +193,62 @@ fn trait_assert_try() { fn phantomval() -> T { panic!() } + +pub async fn find_ts_msp( + rt: &RetentionTime, + series: u64, + range: ScyllaSeriesRange, + bck: bool, + stmts: &StmtsEvents, + scy: &Session, +) -> Result, Error> { + trace!("find_ts_msp series {:?} {:?} {:?} bck {}", rt, series, range, bck); + if bck { + find_ts_msp_bck(rt, series, range, stmts, scy).await + } else { + find_ts_msp_fwd(rt, series, range, stmts, scy).await + } +} + +async fn find_ts_msp_fwd( + rt: &RetentionTime, + series: u64, + range: ScyllaSeriesRange, + stmts: &StmtsEvents, + scy: &Session, +) -> Result, Error> { + let mut ret = VecDeque::new(); + // TODO time range truncation can be handled better + let params = (series as i64, range.beg().ms() as i64, 1 + range.end().ms() as i64); + let mut res = scy + .execute_iter(stmts.rt(rt).ts_msp_fwd().clone(), params) + .await? + .into_typed::<(i64,)>(); + while let Some(x) = res.next().await { + let row = x?; + let ts = TsMs::from_ms_u64(row.0 as u64); + ret.push_back(ts); + } + Ok(ret) +} + +async fn find_ts_msp_bck( + rt: &RetentionTime, + series: u64, + range: ScyllaSeriesRange, + stmts: &StmtsEvents, + scy: &Session, +) -> Result, Error> { + let mut ret = VecDeque::new(); + let params = (series as i64, range.beg().ms() as i64); + let mut res = scy + .execute_iter(stmts.rt(rt).ts_msp_bck().clone(), params) + .await? + .into_typed::<(i64,)>(); + while let Some(x) = res.next().await { + let row = x?; + let ts = TsMs::from_ms_u64(row.0 as u64); + ret.push_front(ts); + } + Ok(ret) +} diff --git a/crates/scyllaconn/src/events2/prepare.rs b/crates/scyllaconn/src/events2/prepare.rs new file mode 100644 index 0000000..a631885 --- /dev/null +++ b/crates/scyllaconn/src/events2/prepare.rs @@ -0,0 +1,238 @@ +use err::thiserror; +use err::ThisError; +use netpod::ttl::RetentionTime; +use scylla::prepared_statement::PreparedStatement; +use scylla::Session; + +#[derive(Debug, ThisError)] +pub enum Error { + ScyllaQuery(#[from] scylla::transport::errors::QueryError), + ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError), + ScyllaTypeConv(#[from] scylla::cql_to_rust::FromRowError), + ScyllaWorker(Box), + MissingQuery(String), + RangeEndOverflow, + InvalidFuture, + TestError(String), +} + +#[derive(Debug)] +pub struct StmtsLspShape { + u8: PreparedStatement, + u16: PreparedStatement, + u32: PreparedStatement, + u64: PreparedStatement, + i8: PreparedStatement, + i16: PreparedStatement, + i32: PreparedStatement, + i64: PreparedStatement, + f32: PreparedStatement, + f64: PreparedStatement, + bool: PreparedStatement, + string: PreparedStatement, +} + +impl StmtsLspShape { + pub fn st(&self, stname: &str) -> Result<&PreparedStatement, Error> { + let ret = match stname { + "u8" => &self.u8, + "u16" => &self.u16, + "u32" => &self.u32, + "u64" => &self.u64, + "i8" => &self.i8, + "i16" => &self.i16, + "i32" => &self.i32, + "i64" => &self.i64, + "f32" => &self.f32, + "f64" => &self.f64, + "bool" => &self.bool, + "string" => &self.string, + _ => return Err(Error::MissingQuery(format!("no query for stname {stname}"))), + }; + Ok(ret) + } +} + +#[derive(Debug)] +pub struct StmtsLspDir { + scalar: StmtsLspShape, + array: StmtsLspShape, +} + +impl StmtsLspDir { + pub fn shape(&self, array: bool) -> &StmtsLspShape { + if array { + &self.array + } else { + &self.scalar + } + } +} + +#[derive(Debug)] +pub struct StmtsEventsRt { + ts_msp_fwd: PreparedStatement, + ts_msp_bck: PreparedStatement, + lsp_fwd_val: StmtsLspDir, + lsp_bck_val: StmtsLspDir, + lsp_fwd_ts: StmtsLspDir, + lsp_bck_ts: StmtsLspDir, +} + +impl StmtsEventsRt { + pub fn ts_msp_fwd(&self) -> &PreparedStatement { + &self.ts_msp_fwd + } + + pub fn ts_msp_bck(&self) -> &PreparedStatement { + &self.ts_msp_bck + } + + pub fn lsp(&self, bck: bool, val: bool) -> &StmtsLspDir { + if bck { + if val { + &self.lsp_bck_val + } else { + &self.lsp_bck_ts + } + } else { + if val { + &self.lsp_fwd_val + } else { + &self.lsp_fwd_ts + } + } + } +} + +async fn make_msp_dir(ks: &str, rt: &RetentionTime, bck: bool, scy: &Session) -> Result { + let table_name = "ts_msp"; + let select_cond = if bck { + "ts_msp < ? order by ts_msp desc limit 2" + } else { + "ts_msp >= ? and ts_msp < ?" + }; + let cql = format!( + "select ts_msp from {}.{}{} where series = ? and {}", + ks, + rt.table_prefix(), + table_name, + select_cond + ); + let qu = scy.prepare(cql).await?; + Ok(qu) +} + +async fn make_lsp( + ks: &str, + rt: &RetentionTime, + shapepre: &str, + stname: &str, + values: &str, + bck: bool, + scy: &Session, +) -> Result { + let select_cond = if bck { + "ts_lsp < ? order by ts_lsp desc limit 1" + } else { + "ts_lsp >= ? and ts_lsp < ?" + }; + let cql = format!( + concat!( + "select {} from {}.{}events_{}_{}", + " where series = ? and ts_msp = ? and {}" + ), + values, + ks, + rt.table_prefix(), + shapepre, + stname, + select_cond + ); + let qu = scy.prepare(cql).await?; + Ok(qu) +} + +async fn make_lsp_shape( + ks: &str, + rt: &RetentionTime, + shapepre: &str, + values: &str, + bck: bool, + scy: &Session, +) -> Result { + let values = if shapepre.contains("array") { + values.replace("value", "valueblob") + } else { + values.into() + }; + let values = &values; + let maker = |stname| make_lsp(ks, rt, shapepre, stname, values, bck, scy); + let ret = StmtsLspShape { + u8: maker("u8").await?, + u16: maker("u16").await?, + u32: maker("u32").await?, + u64: maker("u64").await?, + i8: maker("i8").await?, + i16: maker("i16").await?, + i32: maker("i32").await?, + i64: maker("i64").await?, + f32: maker("f32").await?, + f64: maker("f64").await?, + bool: maker("bool").await?, + string: maker("string").await?, + }; + Ok(ret) +} + +async fn make_lsp_dir( + ks: &str, + rt: &RetentionTime, + values: &str, + bck: bool, + scy: &Session, +) -> Result { + let ret = StmtsLspDir { + scalar: make_lsp_shape(ks, rt, "scalar", values, bck, scy).await?, + array: make_lsp_shape(ks, rt, "array", values, bck, scy).await?, + }; + Ok(ret) +} + +async fn make_rt(ks: &str, rt: &RetentionTime, scy: &Session) -> Result { + let ret = StmtsEventsRt { + ts_msp_fwd: make_msp_dir(ks, rt, false, scy).await?, + ts_msp_bck: make_msp_dir(ks, rt, true, scy).await?, + lsp_fwd_val: make_lsp_dir(ks, rt, "ts_lsp, pulse, value", false, scy).await?, + lsp_bck_val: make_lsp_dir(ks, rt, "ts_lsp, pulse, value", true, scy).await?, + lsp_fwd_ts: make_lsp_dir(ks, rt, "ts_lsp, pulse", false, scy).await?, + lsp_bck_ts: make_lsp_dir(ks, rt, "ts_lsp, pulse", true, scy).await?, + }; + Ok(ret) +} + +#[derive(Debug)] +pub struct StmtsEvents { + st: StmtsEventsRt, + mt: StmtsEventsRt, + lt: StmtsEventsRt, +} + +impl StmtsEvents { + pub async fn new(ks: [&str; 3], scy: &Session) -> Result { + let ret = StmtsEvents { + st: make_rt(ks[0], &RetentionTime::Short, scy).await?, + mt: make_rt(ks[1], &RetentionTime::Medium, scy).await?, + lt: make_rt(ks[2], &RetentionTime::Long, scy).await?, + }; + Ok(ret) + } + + pub fn rt(&self, rt: &RetentionTime) -> &StmtsEventsRt { + match rt { + RetentionTime::Short => &self.st, + RetentionTime::Medium => &self.mt, + RetentionTime::Long => &&self.lt, + } + } +} diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs index 8c1f1bb..99d236d 100644 --- a/crates/scyllaconn/src/worker.rs +++ b/crates/scyllaconn/src/worker.rs @@ -1,5 +1,5 @@ use crate::conn::create_scy_session_no_ks; -use crate::events::StmtsEvents; +use crate::events2::prepare::StmtsEvents; use crate::range::ScyllaSeriesRange; use async_channel::Receiver; use async_channel::Sender; @@ -19,8 +19,11 @@ use std::sync::Arc; #[derive(Debug, ThisError)] pub enum Error { + #[error("ScyllaConnection({0})")] ScyllaConnection(err::Error), + Prepare(#[from] crate::events2::prepare::Error), EventsQuery(#[from] crate::events::Error), + Msp(#[from] crate::events2::msp::Error), ChannelSend, ChannelRecv, Join, @@ -145,7 +148,7 @@ impl ScyllaWorker { }; match job { Job::FindTsMsp(rt, series, range, bck, tx) => { - let res = crate::events::find_ts_msp(&rt, series, range, bck, &stmts, &scy).await; + let res = crate::events2::msp::find_ts_msp(&rt, series, range, bck, &stmts, &scy).await; if tx.send(res.map_err(Into::into)).await.is_err() { // TODO count for stats } diff --git a/crates/streams/Cargo.toml b/crates/streams/Cargo.toml index 89fa60b..18392b7 100644 --- a/crates/streams/Cargo.toml +++ b/crates/streams/Cargo.toml @@ -14,7 +14,7 @@ serde_json = "1.0" serde_cbor = "0.11.1" typetag = "0.2.14" ciborium = "0.2.1" -bytes = "1.3" +bytes = "1.6" arrayref = "0.3.6" crc32fast = "1.3.2" byteorder = "1.4.3" diff --git a/crates/streams/src/framed_bytes.rs b/crates/streams/src/framed_bytes.rs new file mode 100644 index 0000000..dbe43d5 --- /dev/null +++ b/crates/streams/src/framed_bytes.rs @@ -0,0 +1,136 @@ +use bytes::Buf; +use bytes::BufMut; +use bytes::Bytes; +use bytes::BytesMut; +use err::thiserror; +use err::ThisError; +use futures_util::Stream; +use futures_util::StreamExt; +use netpod::log::*; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +const FRAME_HEAD_LEN: usize = 16; +const FRAME_PAYLOAD_MAX: u32 = 1024 * 1024 * 8; +const BUF_MAX: usize = (FRAME_HEAD_LEN + FRAME_PAYLOAD_MAX as usize) * 2; + +#[allow(unused)] +macro_rules! trace_parse { + ($($arg:tt)*) => { + if false { + trace!($($arg)*); + } + }; +} + +#[derive(Debug, ThisError)] +pub enum Error { + FrameTooLarge, + Logic, +} + +pub type BoxedFramedBytesStream = Pin> + Send>>; + +// TODO move this type decl because it is not specific to cbor +pub type SitemtyFramedBytesStream = Pin> + Send>>; + +pub enum State { + Reading, + Done, +} + +pub struct FramedBytesStream { + inp: S, + buf: BytesMut, + state: State, +} + +impl FramedBytesStream +where + S: Stream> + Unpin, + E: Into, +{ + pub fn new(inp: S) -> Self { + Self { + inp, + buf: BytesMut::with_capacity(1024 * 256), + state: State::Reading, + } + } + + fn try_parse(&mut self) -> Result, Error> { + trace_parse!("try_parse self.buf.len() {}", self.buf.len()); + if self.buf.len() < FRAME_HEAD_LEN { + return Ok(None); + } + let n = u32::from_le_bytes(self.buf[..4].try_into().map_err(|_| Error::Logic)?); + trace_parse!("try_parse n {}", n); + if n > FRAME_PAYLOAD_MAX { + let e = Error::FrameTooLarge; + return Err(e); + } + let frame_len = FRAME_HEAD_LEN + n as usize; + trace_parse!("try_parse frame_len {}", frame_len); + assert!(self.buf.len() <= self.buf.capacity()); + if self.buf.capacity() < frame_len { + let add_max = BUF_MAX - self.buf.capacity().min(BUF_MAX); + let nadd = ((frame_len.min(FRAME_PAYLOAD_MAX as usize) - self.buf.len()) * 2).min(add_max); + self.buf.reserve(nadd); + } + let adv = (frame_len + 7) / 8 * 8; + trace_parse!("try_parse adv {}", adv); + if self.buf.len() < adv { + Ok(None) + } else { + self.buf.advance(FRAME_HEAD_LEN); + let buf = self.buf.split_to(n as usize); + self.buf.advance(adv - frame_len); + Ok(Some(buf.freeze())) + } + } +} + +impl Stream for FramedBytesStream +where + S: Stream> + Unpin, + E: Into, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break match &self.state { + State::Reading => match self.try_parse() { + Ok(Some(x)) => Ready(Some(Ok(x))), + Ok(None) => match self.inp.poll_next_unpin(cx) { + Ready(Some(x)) => match x { + Ok(x) => { + self.buf.put_slice(&x); + continue; + } + Err(e) => { + self.state = State::Done; + Ready(Some(Err(e.into()))) + } + }, + Ready(None) => { + if self.buf.len() > 0 { + warn!("remaining bytes in input buffer, input closed len {}", self.buf.len()); + } + self.state = State::Done; + Ready(None) + } + Pending => Pending, + }, + Err(e) => { + self.state = State::Done; + Ready(Some(Err(e))) + } + }, + State::Done => Ready(None), + }; + } + } +} diff --git a/crates/streams/src/instrument.rs b/crates/streams/src/instrument.rs new file mode 100644 index 0000000..28c7bf6 --- /dev/null +++ b/crates/streams/src/instrument.rs @@ -0,0 +1,33 @@ +use futures_util::Stream; +use futures_util::StreamExt; +use netpod::log::tracing; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +#[pin_project::pin_project] +pub struct InstrumentStream { + #[pin] + inp: S, + #[pin] + span: tracing::Span, +} + +impl InstrumentStream { + pub fn new(inp: S, span: tracing::Span) -> Self { + Self { inp, span } + } +} + +impl Stream for InstrumentStream +where + S: Stream, +{ + type Item = ::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let mut this = self.project(); + let _spg = this.span.enter(); + this.inp.poll_next_unpin(cx) + } +} diff --git a/crates/streams/src/lib.rs b/crates/streams/src/lib.rs index 1449949..5dc067d 100644 --- a/crates/streams/src/lib.rs +++ b/crates/streams/src/lib.rs @@ -4,8 +4,10 @@ pub mod collect; pub mod dtflags; pub mod filechunkread; pub mod firsterr; +pub mod framed_bytes; pub mod frames; pub mod generators; +pub mod instrument; pub mod itemclone; pub mod json_stream; pub mod lenframed;