diff --git a/crates/daqbuffer/src/bin/daqbuffer.rs b/crates/daqbuffer/src/bin/daqbuffer.rs index 08d5920..132cd59 100644 --- a/crates/daqbuffer/src/bin/daqbuffer.rs +++ b/crates/daqbuffer/src/bin/daqbuffer.rs @@ -64,7 +64,7 @@ async fn go() -> Result<(), Error> { }; match opts.subcmd { SubCmd::Retrieval(subcmd) => { - info!("daqbuffer version {} +0002", clap::crate_version!()); + info!("daqbuffer version {} +0003", clap::crate_version!()); info!(" service_version {}", service_version); if false { #[allow(non_snake_case)] diff --git a/crates/httpret/src/api4/eventdata.rs b/crates/httpret/src/api4/eventdata.rs index 5ed07f7..f5d6bf1 100644 --- a/crates/httpret/src/api4/eventdata.rs +++ b/crates/httpret/src/api4/eventdata.rs @@ -15,7 +15,6 @@ use httpclient::Requ; use httpclient::StreamResponse; use netpod::log::*; use netpod::NodeConfigCached; -use netpod::ServiceVersion; use std::sync::Arc; #[derive(Debug, ThisError)] @@ -85,7 +84,7 @@ impl EventDataHandler { .await .map_err(|_| EventDataError::InternalError)?; let (evsubq,) = nodenet::conn::events_parse_input_query(frames).map_err(|_| EventDataError::QueryParse)?; - let stream = nodenet::conn::create_response_bytes_stream(evsubq, ncc) + let stream = nodenet::conn::create_response_bytes_stream(evsubq, shared_res.scyqueue.as_ref(), ncc) .await .map_err(|e| EventDataError::Error(Box::new(e)))?; let ret = response(StatusCode::OK) diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index f777007..2ed4581 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -47,6 +47,8 @@ use netpod::APP_JSON; use panic::AssertUnwindSafe; use panic::UnwindSafe; use pin::Pin; +use scyllaconn::worker::ScyllaQueue; +use scyllaconn::worker::ScyllaWorker; use serde::Deserialize; use serde::Serialize; use std::net; @@ -102,11 +104,12 @@ impl ::err::ToErr for RetrievalError { pub struct ServiceSharedResources { pgqueue: PgQueue, + scyqueue: Option, } impl ServiceSharedResources { - pub fn new(pgqueue: PgQueue) -> Self { - Self { pgqueue } + pub fn new(pgqueue: PgQueue, scyqueue: Option) -> Self { + Self { pgqueue, scyqueue } } } @@ -119,7 +122,21 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res // let rawjh = taskrun::spawn(nodenet::conn::events_service(node_config.clone())); let (pgqueue, pgworker) = PgWorker::new(&ncc.node_config.cluster.database).await?; let pgworker_jh = taskrun::spawn(pgworker.work()); - let shared_res = ServiceSharedResources::new(pgqueue); + let scyqueue = if let (Some(st), Some(mt), Some(lt)) = ( + ncc.node_config.cluster.scylla_st(), + ncc.node_config.cluster.scylla_mt(), + ncc.node_config.cluster.scylla_lt(), + ) { + let (scyqueue, scylla_worker) = ScyllaWorker::new(st, mt, lt).await.map_err(|e| { + error!("{e}"); + RetrievalError::TextError(e.to_string()) + })?; + let scylla_worker_jh = taskrun::spawn(scylla_worker.work()); + Some(scyqueue) + } else { + None + }; + let shared_res = ServiceSharedResources::new(pgqueue, scyqueue); let shared_res = Arc::new(shared_res); use std::str::FromStr; let bind_addr = SocketAddr::from_str(&format!("{}:{}", ncc.node.listen(), ncc.node.port))?; @@ -136,7 +153,6 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res let service_version = service_version.clone(); let io = TokioIo::new(stream); let shared_res = shared_res.clone(); - // let shared_res = &shared_res; tokio::task::spawn(async move { let res = hyper::server::conn::http1::Builder::new() .serve_connection( diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index d0863ef..eb5c4d4 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -17,7 +17,6 @@ use items_2::empty::empty_events_dyn_ev; use items_2::framable::EventQueryJsonStringFrame; use items_2::framable::Framable; use items_2::frame::decode_frame; -use items_2::frame::make_error_frame; use items_2::frame::make_term_frame; use items_2::inmem::InMemoryFrame; use netpod::histo::HistoLog2; @@ -26,6 +25,7 @@ use netpod::NodeConfigCached; use netpod::ReqCtxArc; use query::api4::events::EventsSubQuery; use query::api4::events::Frame1Parts; +use scyllaconn::worker::ScyllaQueue; use std::net::SocketAddr; use std::pin::Pin; use streams::frames::inmem::BoxedBytesStream; @@ -45,13 +45,14 @@ mod test; #[derive(Debug, ThisError)] pub enum NodeNetError {} -pub async fn events_service(node_config: NodeConfigCached) -> Result<(), Error> { - let addr = format!("{}:{}", node_config.node.listen(), node_config.node.port_raw); +pub async fn events_service(ncc: NodeConfigCached) -> Result<(), Error> { + let scyqueue = err::todoval(); + let addr = format!("{}:{}", ncc.node.listen(), ncc.node.port_raw); let lis = tokio::net::TcpListener::bind(addr).await?; loop { match lis.accept().await { Ok((stream, addr)) => { - taskrun::spawn(events_conn_handler(stream, addr, node_config.clone())); + taskrun::spawn(events_conn_handler(stream, addr, scyqueue, ncc.clone())); } Err(e) => Err(e)?, } @@ -76,15 +77,16 @@ impl> From<(E, OwnedWriteHalf)> for ConnErr { async fn make_channel_events_stream_data( subq: EventsSubQuery, reqctx: ReqCtxArc, + scyqueue: Option<&ScyllaQueue>, ncc: &NodeConfigCached, ) -> Result> + Send>>, Error> { if subq.backend() == TEST_BACKEND { let node_count = ncc.node_config.cluster.nodes.len() as u64; let node_ix = ncc.ix as u64; streams::generators::make_test_channel_events_stream_data(subq, node_count, node_ix) - } else if let Some(scyconf) = &ncc.node_config.cluster.scylla_st() { + } else if let Some(scyqueue) = scyqueue { let cfg = subq.ch_conf().to_scylla()?; - scylla_channel_event_stream(subq, cfg, scyconf, ncc).await + scylla_channel_event_stream(subq, cfg, scyqueue).await } else if let Some(_) = &ncc.node.channel_archiver { let e = Error::with_msg_no_trace("archapp not built"); Err(e) @@ -100,11 +102,12 @@ async fn make_channel_events_stream_data( async fn make_channel_events_stream( subq: EventsSubQuery, reqctx: ReqCtxArc, + scyqueue: Option<&ScyllaQueue>, ncc: &NodeConfigCached, ) -> Result> + Send>>, Error> { let empty = empty_events_dyn_ev(subq.ch_conf().scalar_type(), subq.ch_conf().shape())?; let empty = sitem_data(ChannelEvents::Events(empty)); - let stream = make_channel_events_stream_data(subq, reqctx, ncc).await?; + let stream = make_channel_events_stream_data(subq, reqctx, scyqueue, ncc).await?; let ret = futures_util::stream::iter([empty]).chain(stream); let ret = Box::pin(ret); Ok(ret) @@ -112,6 +115,7 @@ async fn make_channel_events_stream( pub async fn create_response_bytes_stream( evq: EventsSubQuery, + scyqueue: Option<&ScyllaQueue>, ncc: &NodeConfigCached, ) -> Result { debug!( @@ -136,7 +140,7 @@ pub async fn create_response_bytes_stream( Ok(ret) } else { let mut tr = build_event_transform(evq.transform())?; - let stream = make_channel_events_stream(evq, reqctx, ncc).await?; + let stream = make_channel_events_stream(evq, reqctx, scyqueue, ncc).await?; let stream = stream.map(move |x| { on_sitemty_data!(x, |x: ChannelEvents| { match x { @@ -162,9 +166,10 @@ pub async fn create_response_bytes_stream( async fn events_conn_handler_with_reqid( mut netout: OwnedWriteHalf, evq: EventsSubQuery, + scyqueue: Option<&ScyllaQueue>, ncc: &NodeConfigCached, ) -> Result<(), ConnErr> { - let mut stream = match create_response_bytes_stream(evq, ncc).await { + let mut stream = match create_response_bytes_stream(evq, scyqueue, ncc).await { Ok(x) => x, Err(e) => return Err((e, netout))?, }; @@ -283,6 +288,7 @@ async fn events_conn_handler_inner_try( netin: INP, netout: OwnedWriteHalf, addr: SocketAddr, + scyqueue: Option<&ScyllaQueue>, ncc: &NodeConfigCached, ) -> Result<(), ConnErr> where @@ -300,19 +306,22 @@ where debug!("events_conn_handler sees: {evq:?}"); let reqid = evq.reqid(); let span = tracing::info_span!("subreq", reqid = reqid); - events_conn_handler_with_reqid(netout, evq, ncc).instrument(span).await + events_conn_handler_with_reqid(netout, evq, scyqueue, ncc) + .instrument(span) + .await } async fn events_conn_handler_inner( netin: INP, netout: OwnedWriteHalf, addr: SocketAddr, - node_config: &NodeConfigCached, + scyqueue: Option<&ScyllaQueue>, + ncc: &NodeConfigCached, ) -> Result<(), Error> where INP: Stream> + Unpin, { - match events_conn_handler_inner_try(netin, netout, addr, node_config).await { + match events_conn_handler_inner_try(netin, netout, addr, scyqueue, ncc).await { Ok(_) => (), Err(ce) => { let mut out = ce.netout; @@ -324,11 +333,16 @@ where Ok(()) } -async fn events_conn_handler(stream: TcpStream, addr: SocketAddr, node_config: NodeConfigCached) -> Result<(), Error> { +async fn events_conn_handler( + stream: TcpStream, + addr: SocketAddr, + scyqueue: Option<&ScyllaQueue>, + ncc: NodeConfigCached, +) -> Result<(), Error> { let (netin, netout) = stream.into_split(); let inp = Box::new(TcpReadAsBytes::new(netin)); let span1 = span!(Level::INFO, "events_conn_handler"); - let r = events_conn_handler_inner(inp, netout, addr, &node_config) + let r = events_conn_handler_inner(inp, netout, addr, scyqueue, &ncc) .instrument(span1) .await; match r { diff --git a/crates/nodenet/src/conn/test.rs b/crates/nodenet/src/conn/test.rs index 4a03254..a0e39c9 100644 --- a/crates/nodenet/src/conn/test.rs +++ b/crates/nodenet/src/conn/test.rs @@ -88,7 +88,8 @@ fn raw_data_00() { let frame1 = Frame1Parts::new(qu.clone()); let query = EventQueryJsonStringFrame(serde_json::to_string(&frame1).unwrap()); let frame = sitem_data(query).make_frame()?; - let jh = taskrun::spawn(events_conn_handler(client, addr, cfg)); + let scyqueue = err::todoval(); + let jh = taskrun::spawn(events_conn_handler(client, addr, scyqueue, cfg)); con.write_all(&frame).await.unwrap(); eprintln!("written"); con.shutdown().await.unwrap(); diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index 9c32fa8..5132ba0 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -7,23 +7,20 @@ use items_0::streamitem::StreamItem; use items_2::channelevents::ChannelEvents; use netpod::log::*; use netpod::ChConf; -use netpod::NodeConfigCached; -use netpod::ScyllaConfig; use query::api4::events::EventsSubQuery; +use scyllaconn::worker::ScyllaQueue; use std::pin::Pin; use taskrun::tokio; pub async fn scylla_channel_event_stream( evq: EventsSubQuery, chconf: ChConf, - scyco: &ScyllaConfig, - _ncc: &NodeConfigCached, + scyqueue: &ScyllaQueue, ) -> Result> + Send>>, Error> { // TODO depends in general on the query // TODO why both in PlainEventsQuery and as separate parameter? Check other usages. + // let do_one_before_range = evq.need_one_before_range(); let do_one_before_range = false; - // TODO use better builder pattern with shortcuts for production and dev defaults - let scy = scyllaconn::conn::create_scy_session(scyco).await?; let series = chconf.series(); let scalar_type = chconf.scalar_type(); let shape = chconf.shape(); @@ -37,7 +34,7 @@ pub async fn scylla_channel_event_stream( scalar_type.clone(), shape.clone(), with_values, - scy, + scyqueue.clone(), do_test_stream_error, ); let stream = stream diff --git a/crates/scyllaconn/src/bincache.rs b/crates/scyllaconn/src/bincache.rs index 93ff40e..06e41ae 100644 --- a/crates/scyllaconn/src/bincache.rs +++ b/crates/scyllaconn/src/bincache.rs @@ -1,5 +1,4 @@ use crate::errconv::ErrConv; -use crate::events::EventsStreamScylla; use err::Error; use futures_util::Future; use futures_util::Stream; diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs index 6900ff3..d49b2cc 100644 --- a/crates/scyllaconn/src/events.rs +++ b/crates/scyllaconn/src/events.rs @@ -1,5 +1,6 @@ use crate::errconv::ErrConv; use crate::range::ScyllaSeriesRange; +use crate::worker::ScyllaQueue; use err::Error; use futures_util::Future; use futures_util::FutureExt; @@ -20,7 +21,10 @@ use netpod::Shape; use netpod::TsMs; use netpod::TsNano; use scylla::frame::response::result::Row; +use scylla::prepared_statement::PreparedStatement; +use scylla::Session; use scylla::Session as ScySession; +use std::collections::BTreeMap; use std::collections::VecDeque; use std::mem; use std::pin::Pin; @@ -28,61 +32,131 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; -async fn find_ts_msp( +#[derive(Debug)] +pub struct StmtsEventsRt { + ts_msp_bck: PreparedStatement, + ts_msp_fwd: PreparedStatement, + read_value_queries: BTreeMap, +} + +impl StmtsEventsRt { + pub(super) async fn new(rtpre: &str, scy: &Session) -> Result { + let cql = format!( + "select ts_msp from {}{} where series = ? and ts_msp < ? order by ts_msp desc limit 2", + rtpre, "ts_msp" + ); + let ts_msp_bck = scy.prepare(cql).await.err_conv()?; + let cql = format!( + "select ts_msp from {}{} where series = ? and ts_msp >= ? and ts_msp < ?", + rtpre, "ts_msp" + ); + let ts_msp_fwd = scy.prepare(cql).await.err_conv()?; + let mut read_value_queries = BTreeMap::new(); + for sct in [ + "u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64", "bool", "string", + ] { + let combinations = [ + ("timestamps", "scalar", "ts_lsp, pulse"), + ("timestamps", "array", "ts_lsp, pulse"), + ("values", "scalar", "ts_lsp, pulse, value"), + ("valueblobs", "array", "ts_lsp, pulse, valueblob"), + ]; + for com in combinations { + let query_name = format!("{}_{}_{}_fwd", com.1, sct, com.0); + let cql = format!( + concat!( + "select {} from {}events_{}_{}", + " where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?" + ), + com.2, rtpre, com.1, sct, + ); + let qu = scy.prepare(cql).await.err_conv()?; + read_value_queries.insert(query_name, qu); + + let query_name = format!("{}_{}_{}_bck", com.1, sct, com.0); + let cql = format!( + concat!( + "select {} from {}events_{}_{}", + " where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1" + ), + com.2, rtpre, com.1, sct, + ); + let qu = scy.prepare(cql).await.err_conv()?; + read_value_queries.insert(query_name, qu); + } + } + let ret = Self { + ts_msp_bck, + ts_msp_fwd, + read_value_queries, + }; + Ok(ret) + } +} + +pub(super) async fn find_ts_msp_worker( series: u64, range: ScyllaSeriesRange, - scy: Arc, + stmts: &StmtsEventsRt, + scy: &ScySession, ) -> Result<(VecDeque, VecDeque), Error> { trace!("find_ts_msp series {:?} {:?}", series, range); let mut ret1 = VecDeque::new(); let mut ret2 = VecDeque::new(); - // TODO use prepared statements - let cql = "select ts_msp from st_ts_msp where series = ? and ts_msp < ? order by ts_msp desc limit 2"; let params = (series as i64, range.beg().ms() as i64); trace!("find_ts_msp query 1 params {:?}", params); - let res = scy.query(cql, params).await.err_conv()?; - for row in res.rows_typed_or_empty::<(i64,)>() { - let row = row.err_conv()?; + let mut res = scy + .execute_iter(stmts.ts_msp_bck.clone(), params) + .await + .err_conv()? + .into_typed::<(i64,)>(); + while let Some(x) = res.next().await { + let row = x.map_err(|e| Error::with_msg_no_trace(e.to_string()))?; let ts = TsMs::from_ms_u64(row.0 as u64); trace!("query 1 ts_msp {}", ts); ret1.push_front(ts); } - let cql = "select ts_msp from st_ts_msp where series = ? and ts_msp >= ? and ts_msp < ?"; let params = (series as i64, range.beg().ms() as i64, 1 + range.end().ms() as i64); trace!("find_ts_msp query 2 params {:?}", params); - let res = scy.query(cql, params).await.err_conv()?; - for row in res.rows_typed_or_empty::<(i64,)>() { - let row = row.err_conv()?; + let mut res = scy + .execute_iter(stmts.ts_msp_fwd.clone(), params) + .await + .err_conv()? + .into_typed::<(i64,)>(); + while let Some(x) = res.next().await { + let row = x.map_err(|e| Error::with_msg_no_trace(e.to_string()))?; let ts = TsMs::from_ms_u64(row.0 as u64); trace!("query 2 ts_msp {}", ts); - ret2.push_back(ts); - } - let cql = "select ts_msp from st_ts_msp where series = ? and ts_msp >= ? limit 1"; - let params = (series as i64, range.end().ms() as i64); - trace!("find_ts_msp query 3 params {:?}", params); - let res = scy.query(cql, params).await.err_conv()?; - for row in res.rows_typed_or_empty::<(i64,)>() { - let row = row.err_conv()?; - let ts = TsMs::from_ms_u64(row.0 as u64); - trace!("query 3 ts_msp {}", ts); - ret2.push_back(ts); + ret2.push_front(ts); } + // let cql = "select ts_msp from st_ts_msp where series = ? and ts_msp >= ? limit 1"; + // let params = (series as i64, range.end().ms() as i64); + // trace!("find_ts_msp query 3 params {:?}", params); + // let res = scy.query(cql, params).await.err_conv()?; + // for row in res.rows_typed_or_empty::<(i64,)>() { + // let row = row.err_conv()?; + // let ts = TsMs::from_ms_u64(row.0 as u64); + // trace!("query 3 ts_msp {}", ts); + // ret2.push_back(ts); + // } trace!("find_ts_msp n1 {:?} n2 {:?}", ret1.len(), ret2.len()); Ok((ret1, ret2)) } -trait ValTy: Sized { +trait ValTy: Sized + 'static { type ScaTy: ScalarOps + std::default::Default; type ScyTy: scylla::cql_to_rust::FromCqlVal; type Container: Events + Appendable; fn from_scyty(inp: Self::ScyTy) -> Self; + fn from_valueblob(inp: Vec) -> Self; fn table_name() -> &'static str; fn default() -> Self; fn is_valueblob() -> bool; + fn st_name() -> &'static str; } macro_rules! impl_scaty_scalar { - ($st:ty, $st_scy:ty, $table_name:expr) => { + ($st:ty, $st_scy:ty, $st_name:expr, $table_name:expr) => { impl ValTy for $st { type ScaTy = $st; type ScyTy = $st_scy; @@ -90,6 +164,9 @@ macro_rules! impl_scaty_scalar { fn from_scyty(inp: Self::ScyTy) -> Self { inp as Self } + fn from_valueblob(_inp: Vec) -> Self { + ::default() + } fn table_name() -> &'static str { $table_name } @@ -99,12 +176,15 @@ macro_rules! impl_scaty_scalar { fn is_valueblob() -> bool { false } + fn st_name() -> &'static str { + $st_name + } } }; } macro_rules! impl_scaty_array { - ($vt:ty, $st:ty, $st_scy:ty, $table_name:expr) => { + ($vt:ty, $st:ty, $st_scy:ty, $st_name:expr, $table_name:expr) => { impl ValTy for $vt { type ScaTy = $st; type ScyTy = $st_scy; @@ -112,6 +192,22 @@ macro_rules! impl_scaty_array { fn from_scyty(inp: Self::ScyTy) -> Self { inp.into_iter().map(|x| x as Self::ScaTy).collect() } + fn from_valueblob(inp: Vec) -> Self { + if inp.len() < 32 { + ::default() + } else { + let en = std::mem::size_of::(); + let n = (inp.len().max(32) - 32) / en; + let mut c = Vec::with_capacity(n); + for i in 0..n { + let r1 = &inp[32 + en * (0 + i)..32 + en * (1 + i)]; + let p1 = r1 as *const _ as *const $st; + let v1 = unsafe { p1.read_unaligned() }; + c.push(v1); + } + c + } + } fn table_name() -> &'static str { $table_name } @@ -121,35 +217,38 @@ macro_rules! impl_scaty_array { fn is_valueblob() -> bool { true } + fn st_name() -> &'static str { + $st_name + } } }; } -impl_scaty_scalar!(u8, i8, "st_events_scalar_u8"); -impl_scaty_scalar!(u16, i16, "st_events_scalar_u16"); -impl_scaty_scalar!(u32, i32, "st_events_scalar_u32"); -impl_scaty_scalar!(u64, i64, "st_events_scalar_u64"); -impl_scaty_scalar!(i8, i8, "st_events_scalar_i8"); -impl_scaty_scalar!(i16, i16, "st_events_scalar_i16"); -impl_scaty_scalar!(i32, i32, "st_events_scalar_i32"); -impl_scaty_scalar!(i64, i64, "st_events_scalar_i64"); -impl_scaty_scalar!(f32, f32, "st_events_scalar_f32"); -impl_scaty_scalar!(f64, f64, "st_events_scalar_f64"); -impl_scaty_scalar!(bool, bool, "st_events_scalar_bool"); -impl_scaty_scalar!(String, String, "st_events_scalar_string"); +impl_scaty_scalar!(u8, i8, "u8", "st_events_scalar_u8"); +impl_scaty_scalar!(u16, i16, "u16", "st_events_scalar_u16"); +impl_scaty_scalar!(u32, i32, "u32", "st_events_scalar_u32"); +impl_scaty_scalar!(u64, i64, "u64", "st_events_scalar_u64"); +impl_scaty_scalar!(i8, i8, "i8", "st_events_scalar_i8"); +impl_scaty_scalar!(i16, i16, "i16", "st_events_scalar_i16"); +impl_scaty_scalar!(i32, i32, "i32", "st_events_scalar_i32"); +impl_scaty_scalar!(i64, i64, "i64", "st_events_scalar_i64"); +impl_scaty_scalar!(f32, f32, "f32", "st_events_scalar_f32"); +impl_scaty_scalar!(f64, f64, "f64", "st_events_scalar_f64"); +impl_scaty_scalar!(bool, bool, "bool", "st_events_scalar_bool"); +impl_scaty_scalar!(String, String, "string", "st_events_scalar_string"); -impl_scaty_array!(Vec, u8, Vec, "st_events_array_u8"); -impl_scaty_array!(Vec, u16, Vec, "st_events_array_u16"); -impl_scaty_array!(Vec, u32, Vec, "st_events_array_u32"); -impl_scaty_array!(Vec, u64, Vec, "st_events_array_u64"); -impl_scaty_array!(Vec, i8, Vec, "st_events_array_i8"); -impl_scaty_array!(Vec, i16, Vec, "st_events_array_i16"); -impl_scaty_array!(Vec, i32, Vec, "st_events_array_i32"); -impl_scaty_array!(Vec, i64, Vec, "st_events_array_i64"); -impl_scaty_array!(Vec, f32, Vec, "st_events_array_f32"); -impl_scaty_array!(Vec, f64, Vec, "st_events_array_f64"); -impl_scaty_array!(Vec, bool, Vec, "st_events_array_bool"); -impl_scaty_array!(Vec, String, Vec, "st_events_array_string"); +impl_scaty_array!(Vec, u8, Vec, "u8", "st_events_array_u8"); +impl_scaty_array!(Vec, u16, Vec, "u16", "st_events_array_u16"); +impl_scaty_array!(Vec, u32, Vec, "u32", "st_events_array_u32"); +impl_scaty_array!(Vec, u64, Vec, "u64", "st_events_array_u64"); +impl_scaty_array!(Vec, i8, Vec, "i8", "st_events_array_i8"); +impl_scaty_array!(Vec, i16, Vec, "i16", "st_events_array_i16"); +impl_scaty_array!(Vec, i32, Vec, "i32", "st_events_array_i32"); +impl_scaty_array!(Vec, i64, Vec, "i64", "st_events_array_i64"); +impl_scaty_array!(Vec, f32, Vec, "f32", "st_events_array_f32"); +impl_scaty_array!(Vec, f64, Vec, "f64", "st_events_array_f64"); +impl_scaty_array!(Vec, bool, Vec, "bool", "st_events_array_bool"); +// impl_scaty_array!(Vec, String, Vec, "string", "st_events_array_string"); struct ReadNextValuesOpts { series: u64, @@ -157,33 +256,40 @@ struct ReadNextValuesOpts { range: ScyllaSeriesRange, fwd: bool, with_values: bool, - scy: Arc, + scyqueue: ScyllaQueue, } async fn read_next_values(opts: ReadNextValuesOpts) -> Result, Error> where ST: ValTy, { - debug!("read_next_values {} {}", opts.series, opts.ts_msp); + // TODO could take scyqeue out of opts struct. + let scyqueue = opts.scyqueue.clone(); + let futgen = Box::new(|scy: Arc, stmts: Arc| { + let fut = read_next_values_worker::(opts, scy, stmts); + Box::pin(fut) as Pin, err::Error>> + Send>> + }); + let res = scyqueue.read_next_values(futgen).await?; + Ok(res) +} + +async fn read_next_values_worker( + opts: ReadNextValuesOpts, + scy: Arc, + stmts: Arc, +) -> Result, Error> +where + ST: ValTy, +{ + trace!("read_next_values_worker {} {}", opts.series, opts.ts_msp); let series = opts.series; let ts_msp = opts.ts_msp; let range = opts.range; - let fwd = opts.fwd; - let scy = opts.scy; let table_name = ST::table_name(); if range.end() > TsNano::from_ns(i64::MAX as u64) { return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); } - let cql_fields = if opts.with_values { - if ST::is_valueblob() { - "ts_lsp, pulse, valueblob" - } else { - "ts_lsp, pulse, value" - } - } else { - "ts_lsp, pulse" - }; - let ret = if fwd { + let ret = if opts.fwd { let ts_lsp_min = if range.beg() > ts_msp.ns() { range.beg().delta(ts_msp.ns()) } else { @@ -201,14 +307,20 @@ where ts_lsp_max, table_name, ); - // TODO use prepared! - let cql = format!( - concat!( - "select {} from {}", - " where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?" - ), - cql_fields, table_name, - ); + let qu_name = if opts.with_values { + if ST::is_valueblob() { + format!("array_{}_valueblobs_fwd", ST::st_name()) + } else { + format!("array_{}_values_fwd", ST::st_name()) + } + } else { + format!("array_{}_timestamps_fwd", ST::st_name()) + }; + let qu = stmts.read_value_queries.get(&qu_name).ok_or_else(|| { + let e = Error::with_msg_no_trace(format!("can not find query name {}", qu_name)); + error!("{e}"); + e + })?; let params = ( series as i64, ts_msp.ms() as i64, @@ -216,13 +328,13 @@ where ts_lsp_max.ns() as i64, ); trace!("FWD event search params {:?}", params); - let mut res = scy.query_iter(cql, params).await.err_conv()?; + let mut res = scy.execute_iter(qu.clone(), params).await.err_conv()?; let mut rows = Vec::new(); while let Some(x) = res.next().await { rows.push(x.err_conv()?); } let mut last_before = None; - let ret = convert_rows::(rows, range, ts_msp, opts.with_values, !fwd, &mut last_before)?; + let ret = convert_rows::(rows, range, ts_msp, opts.with_values, !opts.fwd, &mut last_before)?; ret } else { let ts_lsp_max = if ts_msp.ns() < range.beg() { @@ -231,23 +343,29 @@ where DtNano::from_ns(0) }; trace!("BCK ts_msp {} ts_lsp_max {} {}", ts_msp, ts_lsp_max, table_name,); - // TODO use prepared! - let cql = format!( - concat!( - "select {} from {}", - " where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1" - ), - cql_fields, table_name, - ); + let qu_name = if opts.with_values { + if ST::is_valueblob() { + format!("array_{}_valueblobs_bck", ST::st_name()) + } else { + format!("array_{}_values_bck", ST::st_name()) + } + } else { + format!("array_{}_timestamps_bck", ST::st_name()) + }; + let qu = stmts.read_value_queries.get(&qu_name).ok_or_else(|| { + let e = Error::with_msg_no_trace(format!("can not find query name {}", qu_name)); + error!("{e}"); + e + })?; let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64); trace!("BCK event search params {:?}", params); - let mut res = scy.query_iter(cql, params).await.err_conv()?; + let mut res = scy.execute_iter(qu.clone(), params).await.err_conv()?; let mut rows = Vec::new(); while let Some(x) = res.next().await { rows.push(x.err_conv()?); } let mut _last_before = None; - let ret = convert_rows::(rows, range, ts_msp, opts.with_values, !fwd, &mut _last_before)?; + let ret = convert_rows::(rows, range, ts_msp, opts.with_values, !opts.fwd, &mut _last_before)?; if ret.len() > 1 { error!("multiple events in backwards search {}", ret.len()); } @@ -274,7 +392,7 @@ fn convert_rows( trace!("read a value blob len {}", row.2.len()); let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); let pulse = row.1 as u64; - let value = ValTy::default(); + let value = ValTy::from_valueblob(row.2); (ts, pulse, value) } else { let row: (i64, i64, ST::ScyTy) = row.into_typed().err_conv()?; @@ -326,7 +444,7 @@ struct ReadValues { with_values: bool, fut: Pin, Error>> + Send>>, fut_done: bool, - scy: Arc, + scyqueue: ScyllaQueue, } impl ReadValues { @@ -338,7 +456,7 @@ impl ReadValues { ts_msps: VecDeque, fwd: bool, with_values: bool, - scy: Arc, + scyqueue: ScyllaQueue, ) -> Self { let mut ret = Self { series, @@ -352,7 +470,7 @@ impl ReadValues { "future not initialized", )))), fut_done: false, - scy, + scyqueue, }; ret.next(); ret @@ -375,7 +493,7 @@ impl ReadValues { range: self.range.clone(), fwd: self.fwd, with_values: self.with_values, - scy: self.scy.clone(), + scyqueue: self.scyqueue.clone(), }; let scalar_type = self.scalar_type.clone(); let shape = self.shape.clone(); @@ -394,8 +512,8 @@ impl ReadValues { ScalarType::F64 => read_next_values::(opts).await, ScalarType::BOOL => read_next_values::(opts).await, ScalarType::STRING => read_next_values::(opts).await, - _ => { - error!("TODO ReadValues add more types"); + ScalarType::ChannelStatus => { + warn!("read scalar channel status not yet supported"); err::todoval() } }, @@ -411,8 +529,12 @@ impl ReadValues { ScalarType::F32 => read_next_values::>(opts).await, ScalarType::F64 => read_next_values::>(opts).await, ScalarType::BOOL => read_next_values::>(opts).await, - _ => { - error!("TODO ReadValues add more types"); + ScalarType::STRING => { + warn!("read array string not yet supported"); + err::todoval() + } + ScalarType::ChannelStatus => { + warn!("read array channel status not yet supported"); err::todoval() } }, @@ -428,7 +550,7 @@ impl ReadValues { enum FrState { New, - FindMsp(Pin, VecDeque), Error>> + Send>>), + FindMsp(Pin, VecDeque), crate::worker::Error>> + Send>>), ReadBack1(ReadValues), ReadBack2(ReadValues), ReadValues(ReadValues), @@ -445,7 +567,7 @@ pub struct EventsStreamScylla { do_one_before_range: bool, ts_msp_bck: VecDeque, ts_msp_fwd: VecDeque, - scy: Arc, + scyqueue: ScyllaQueue, do_test_stream_error: bool, found_one_after: bool, with_values: bool, @@ -460,7 +582,7 @@ impl EventsStreamScylla { scalar_type: ScalarType, shape: Shape, with_values: bool, - scy: Arc, + scyqueue: ScyllaQueue, do_test_stream_error: bool, ) -> Self { debug!("EventsStreamScylla::new"); @@ -473,7 +595,7 @@ impl EventsStreamScylla { do_one_before_range, ts_msp_bck: VecDeque::new(), ts_msp_fwd: VecDeque::new(), - scy, + scyqueue, do_test_stream_error, found_one_after: false, with_values, @@ -505,7 +627,7 @@ impl EventsStreamScylla { [msp].into(), false, self.with_values, - self.scy.clone(), + self.scyqueue.clone(), ); self.state = FrState::ReadBack1(st); } else if self.ts_msp_fwd.len() > 0 { @@ -518,7 +640,7 @@ impl EventsStreamScylla { mem::replace(&mut self.ts_msp_fwd, VecDeque::new()), true, self.with_values, - self.scy.clone(), + self.scyqueue.clone(), ); self.state = FrState::ReadValues(st); } else { @@ -539,7 +661,7 @@ impl EventsStreamScylla { mem::replace(&mut self.ts_msp_fwd, VecDeque::new()), true, self.with_values, - self.scy.clone(), + self.scyqueue.clone(), ); self.state = FrState::ReadValues(st); } else { @@ -556,7 +678,7 @@ impl EventsStreamScylla { [msp].into(), false, self.with_values, - self.scy.clone(), + self.scyqueue.clone(), ); self.state = FrState::ReadBack2(st); } else if self.ts_msp_fwd.len() > 0 { @@ -569,7 +691,7 @@ impl EventsStreamScylla { mem::replace(&mut self.ts_msp_fwd, VecDeque::new()), true, self.with_values, - self.scy.clone(), + self.scyqueue.clone(), ); self.state = FrState::ReadValues(st); } else { @@ -593,7 +715,7 @@ impl EventsStreamScylla { mem::replace(&mut self.ts_msp_fwd, VecDeque::new()), true, self.with_values, - self.scy.clone(), + self.scyqueue.clone(), ); self.state = FrState::ReadValues(st); } else { @@ -602,6 +724,14 @@ impl EventsStreamScylla { } } +async fn find_ts_msp_via_queue( + series: u64, + range: ScyllaSeriesRange, + scyqueue: ScyllaQueue, +) -> Result<(VecDeque, VecDeque), crate::worker::Error> { + scyqueue.find_ts_msp(series, range).await +} + impl Stream for EventsStreamScylla { type Item = Result; @@ -620,7 +750,9 @@ impl Stream for EventsStreamScylla { } break match self.state { FrState::New => { - let fut = find_ts_msp(self.series, self.range.clone(), self.scy.clone()); + let series = self.series.clone(); + let range = self.range.clone(); + let fut = find_ts_msp_via_queue(series, range, self.scyqueue.clone()); let fut = Box::pin(fut); self.state = FrState::FindMsp(fut); continue; @@ -633,7 +765,7 @@ impl Stream for EventsStreamScylla { Ready(Err(e)) => { error!("EventsStreamScylla FindMsp {e}"); self.state = FrState::DataDone; - Ready(Some(Err(e))) + Ready(Some(Err(e.into()))) } Pending => Pending, }, diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs index 3f13e76..52fe372 100644 --- a/crates/scyllaconn/src/worker.rs +++ b/crates/scyllaconn/src/worker.rs @@ -1,12 +1,20 @@ use crate::conn::create_scy_session_no_ks; +use crate::events::StmtsEventsRt; +use crate::range::ScyllaSeriesRange; use async_channel::Receiver; use async_channel::Sender; use err::thiserror; use err::ThisError; +use futures_util::Future; +use items_0::Events; use netpod::log::*; -use netpod::range::evrange::NanoRange; use netpod::ScyllaConfig; +use netpod::TsMs; use scylla::Session; +use std::collections::VecDeque; +use std::fmt; +use std::pin::Pin; +use std::sync::Arc; #[derive(Debug, ThisError)] pub enum Error { @@ -24,7 +32,31 @@ impl err::ToErr for Error { #[derive(Debug)] enum Job { - JobA(String, Sender>), + FindTsMsp( + // series-id + u64, + ScyllaSeriesRange, + Sender, VecDeque), Error>>, + ), + ReadNextValues(ReadNextValues), +} + +struct ReadNextValues { + futgen: Box< + dyn FnOnce( + Arc, + Arc, + ) -> Pin, err::Error>> + Send>> + + Send, + >, + // fut: Pin, Error>> + Send>>, + tx: Sender, Error>>, +} + +impl fmt::Debug for ReadNextValues { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "ReadNextValues {{ .. }}") + } } #[derive(Debug, Clone)] @@ -33,31 +65,59 @@ pub struct ScyllaQueue { } impl ScyllaQueue { - pub async fn job_a(&self, backend: &str) -> Result>, Error> { + pub async fn find_ts_msp( + &self, + series: u64, + range: ScyllaSeriesRange, + ) -> Result<(VecDeque, VecDeque), Error> { let (tx, rx) = async_channel::bounded(1); - let job = Job::JobA(backend.into(), tx); + let job = Job::FindTsMsp(series, range, tx); self.tx.send(job).await.map_err(|_| Error::ChannelSend)?; - Ok(rx) + let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??; + Ok(res) + } + + pub async fn read_next_values(&self, futgen: F) -> Result, Error> + where + F: FnOnce( + Arc, + Arc, + ) -> Pin, err::Error>> + Send>> + + Send + + 'static, + { + let (tx, rx) = async_channel::bounded(1); + let job = Job::ReadNextValues(ReadNextValues { + futgen: Box::new(futgen), + tx, + }); + self.tx.send(job).await.map_err(|_| Error::ChannelSend)?; + let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??; + Ok(res) } } #[derive(Debug)] pub struct ScyllaWorker { rx: Receiver, - scy: Session, - // pgjh: Option>>, + scy: Arc, + stmts_st: Arc, } impl ScyllaWorker { - pub async fn new(scyconf: &ScyllaConfig) -> Result<(ScyllaQueue, Self), Error> { + pub async fn new( + scyconf_st: &ScyllaConfig, + scyconf_mt: &ScyllaConfig, + scyconf_lt: &ScyllaConfig, + ) -> Result<(ScyllaQueue, Self), Error> { let (tx, rx) = async_channel::bounded(64); - let scy = create_scy_session_no_ks(scyconf).await?; + let scy = create_scy_session_no_ks(scyconf_st).await?; + let scy = Arc::new(scy); + let rtpre = format!("{}.st_", scyconf_st.keyspace); + let stmts_st = StmtsEventsRt::new(&rtpre, &scy).await?; + let stmts_st = Arc::new(stmts_st); let queue = ScyllaQueue { tx }; - let worker = Self { - rx, - scy, - // pgjh: Some(pgjh), - }; + let worker = Self { rx, scy, stmts_st }; Ok((queue, worker)) } @@ -72,12 +132,19 @@ impl ScyllaWorker { } }; match job { - Job::JobA(backend, tx) => { - let res = Ok::<_, Error>(backend); + Job::FindTsMsp(series, range, tx) => { + let res = crate::events::find_ts_msp_worker(series, range, &self.stmts_st, &self.scy).await; if tx.send(res.map_err(Into::into)).await.is_err() { // TODO count for stats } } + Job::ReadNextValues(job) => { + let fut = (job.futgen)(self.scy.clone(), self.stmts_st.clone()); + let res = fut.await; + if job.tx.send(res.map_err(Into::into)).await.is_err() { + // TODO count for stats + } + } } } }