diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index 1999977..617686b 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "daqbuffer" -version = "0.5.5-aa.10" +version = "0.5.5-aa.11" authors = ["Dominik Werder "] -edition = "2021" +edition = "2024" [dependencies] futures-util = "0.3.31" diff --git a/crates/daqbuffer/src/bin/daqbuffer.rs b/crates/daqbuffer/src/bin/daqbuffer.rs index a0715fd..277060c 100644 --- a/crates/daqbuffer/src/bin/daqbuffer.rs +++ b/crates/daqbuffer/src/bin/daqbuffer.rs @@ -6,14 +6,14 @@ use daqbuf_err::Error; use daqbuffer::cli::ClientType; use daqbuffer::cli::Opts; use daqbuffer::cli::SubCmd; -use netpod::log::*; -use netpod::query::CacheUsage; use netpod::NodeConfig; use netpod::NodeConfigCached; use netpod::ProxyConfig; use netpod::ScalarType; use netpod::ServiceVersion; use netpod::Shape; +use netpod::log::*; +use netpod::query::CacheUsage; use taskrun::tokio; use tokio::fs::File; use tokio::io::AsyncReadExt; @@ -140,7 +140,7 @@ async fn go() -> Result<(), Error> { } }, SubCmd::GenerateTestData => { - disk::gen::gen_test_data().await?; + disk::datagen::gen_test_data().await?; } SubCmd::Test => (), SubCmd::Version => { @@ -172,11 +172,11 @@ async fn test_log() { #[cfg(feature = "DISABLED")] fn simple_fetch() { use daqbuffer::err::ErrConv; - use netpod::timeunits::*; use netpod::ByteOrder; use netpod::ScalarType; use netpod::SfDbChannel; use netpod::Shape; + use netpod::timeunits::*; taskrun::run(async { let _rh = daqbufp2::nodes::require_test_hosts_running()?; let t1 = chrono::Utc::now(); diff --git a/crates/dbconn/Cargo.toml b/crates/dbconn/Cargo.toml index 08b8e2f..927aae9 100644 --- a/crates/dbconn/Cargo.toml +++ b/crates/dbconn/Cargo.toml @@ -2,7 +2,7 @@ name = "dbconn" version = "0.0.2" authors = ["Dominik Werder "] -edition = "2021" +edition = "2024" [lib] path = "src/dbconn.rs" diff --git a/crates/dbconn/src/search.rs b/crates/dbconn/src/search.rs index d61b1f7..e92e3f3 100644 --- a/crates/dbconn/src/search.rs +++ b/crates/dbconn/src/search.rs @@ -1,9 +1,6 @@ +use crate::ErrConv; use crate::create_connection; use crate::worker::PgQueue; -use crate::ErrConv; -use daqbuf_err as err; -use err::Error; -use netpod::log::*; use netpod::ChannelArchiver; use netpod::ChannelSearchQuery; use netpod::ChannelSearchResult; @@ -12,9 +9,25 @@ use netpod::Database; use netpod::NodeConfigCached; use netpod::ScalarType; use netpod::Shape; +use netpod::log; use serde_json::Value as JsVal; use tokio_postgres::Client as PgClient; +macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*) } ); } + +macro_rules! 
trace { ($($arg:expr),*) => ( if true { log::trace!($($arg),*) } ); } + +autoerr::create_error_v1!( + name(Error, "DbconnSearch"), + enum variants { + BadShapeFromDb(String), + BadArchEngValue(String), + Pg(#[from] tokio_postgres::Error), + Other(#[from] daqbuf_err::Error), + Worker(#[from] crate::worker::Error), + }, +); + pub async fn search_channel_databuffer( query: ChannelSearchQuery, backend: &str, @@ -52,8 +65,7 @@ pub async fn search_channel_databuffer( &backend, ], ) - .await - .err_conv()?; + .await?; let mut res = Vec::new(); for row in rows { let shapedb: Option = row.get(4); @@ -68,14 +80,14 @@ pub async fn search_channel_databuffer( Some(n) => { a.push(n as u32); } - None => return Err(Error::with_msg(format!("can not understand shape {:?}", shapedb))), + None => return Err(Error::BadShapeFromDb(format!("{:?}", shapedb))), }, - _ => return Err(Error::with_msg(format!("can not understand shape {:?}", shapedb))), + _ => return Err(Error::BadShapeFromDb(format!("{:?}", shapedb))), } } a } - _ => return Err(Error::with_msg(format!("can not understand shape {:?}", shapedb))), + _ => return Err(Error::BadShapeFromDb(format!("{:?}", shapedb))), }, None => Vec::new(), }; @@ -131,7 +143,7 @@ pub(super) async fn search_channel_scylla( regop ); let params: &[&(dyn tokio_postgres::types::ToSql + Sync)] = &[&ch_kind, &query.name_regex, &cb1, &cb2]; - info!("search_channel_scylla {:?}", params); + trace!("search_channel_scylla {:?}", params); let rows = pgc.query(sql, params).await.err_conv()?; let mut res = Vec::new(); for row in rows { @@ -227,10 +239,7 @@ async fn search_channel_archeng( if k == "Scalar" { Vec::new() } else { - return Err(Error::with_msg_no_trace(format!( - "search_channel_archeng can not understand {:?}", - config - ))); + return Err(Error::BadArchEngValue(format!("{:?}", config))); } } JsVal::Object(k) => match k.get("Wave") { @@ -239,24 +248,15 @@ async fn search_channel_archeng( vec![k.as_i64().unwrap_or(u32::MAX as i64) as u32] } _ => { - return Err(Error::with_msg_no_trace(format!( - "search_channel_archeng can not understand {:?}", - config - ))); + return Err(Error::BadArchEngValue(format!("{:?}", config))); } }, None => { - return Err(Error::with_msg_no_trace(format!( - "search_channel_archeng can not understand {:?}", - config - ))); + return Err(Error::BadArchEngValue(format!("{:?}", config))); } }, _ => { - return Err(Error::with_msg_no_trace(format!( - "search_channel_archeng can not understand {:?}", - config - ))); + return Err(Error::BadArchEngValue(format!("{:?}", config))); } }, None => Vec::new(), @@ -290,16 +290,12 @@ pub async fn search_channel( let mut query = query; query.backend = Some(backend.into()); if let Some(_scyconf) = ncc.node_config.cluster.scylla_st() { - pgqueue - .search_channel_scylla(query) - .await - .map_err(|e| Error::with_msg_no_trace(format!("db worker error {e}")))? + pgqueue.search_channel_scylla(query).await? 
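// The bare `?` above relies on the `Worker(#[from] crate::worker::Error)` variant
// declared in the create_error_v1! block at the top of this file. That macro is
// project-internal; assuming it expands thiserror-style, the equivalent
// hand-written enum would be roughly (sketch, not the actual expansion):
//
//     #[derive(Debug, thiserror::Error)]
//     pub enum Error {
//         #[error("BadShapeFromDb({0})")]
//         BadShapeFromDb(String),
//         #[error("worker: {0}")]
//         Worker(#[from] crate::worker::Error),
//         // ... remaining variants as listed above
//     }
//
// which is what lets `?` replace the old manual `map_err` wrapping here.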
// search_channel_scylla(query, backend, pgconf).await } else if let Some(conf) = ncc.node.channel_archiver.as_ref() { search_channel_archeng(query, backend.clone(), conf, pgconf).await } else if let Some(_conf) = ncc.node.archiver_appliance.as_ref() { - // TODO - err::todoval() + todo!() } else { search_channel_databuffer(query, backend, ncc).await } diff --git a/crates/dbconn/src/worker.rs b/crates/dbconn/src/worker.rs index 043398f..09c8e6c 100644 --- a/crates/dbconn/src/worker.rs +++ b/crates/dbconn/src/worker.rs @@ -3,29 +3,28 @@ use async_channel::Receiver; use async_channel::RecvError; use async_channel::Sender; use daqbuf_err as err; -use err::thiserror; -use err::ThisError; -use netpod::log::*; -use netpod::range::evrange::NanoRange; use netpod::ChConf; use netpod::ChannelSearchQuery; use netpod::ChannelSearchResult; use netpod::Database; use netpod::SeriesKind; use netpod::SfDbChannel; +use netpod::log::*; +use netpod::range::evrange::NanoRange; use taskrun::tokio; use tokio::task::JoinHandle; use tokio_postgres::Client; -#[derive(Debug, ThisError)] -#[cstm(name = "PgWorker")] -pub enum Error { - Error(#[from] err::Error), - ChannelSend, - ChannelRecv, - Join, - ChannelConfig(#[from] crate::channelconfig::Error), -} +autoerr::create_error_v1!( + name(Error, "PgWorker"), + enum variants { + Error(#[from] err::Error), + ChannelSend, + ChannelRecv, + Join, + ChannelConfig(#[from] crate::channelconfig::Error), + }, +); impl From for Error { fn from(_value: RecvError) -> Self { @@ -51,7 +50,10 @@ enum Job { Vec, Sender>, crate::channelinfo::Error>>, ), - SearchChannel(ChannelSearchQuery, Sender>), + SearchChannel( + ChannelSearchQuery, + Sender>, + ), SfChannelBySeries( netpod::SfDbChannel, Sender>, @@ -101,7 +103,7 @@ impl PgQueue { pub async fn search_channel_scylla( &self, query: ChannelSearchQuery, - ) -> Result, Error> { + ) -> Result, Error> { let (tx, rx) = async_channel::bounded(1); let job = Job::SearchChannel(query, tx); self.tx.send(job).await.map_err(|_| Error::ChannelSend)?; diff --git a/crates/disk/src/gen.rs b/crates/disk/src/datagen.rs similarity index 100% rename from crates/disk/src/gen.rs rename to crates/disk/src/datagen.rs diff --git a/crates/disk/src/disk.rs b/crates/disk/src/disk.rs index e412685..f5b7a21 100644 --- a/crates/disk/src/disk.rs +++ b/crates/disk/src/disk.rs @@ -2,13 +2,13 @@ pub mod aggtest; pub mod binnedstream; pub mod channelconfig; +pub mod datagen; pub mod dataopen; pub mod decode; pub mod eventchunker; pub mod eventchunkermultifile; pub mod eventfilter; pub mod frame; -pub mod gen; pub mod index; pub mod merge; pub mod paths; diff --git a/crates/httpclient/src/httpclient.rs b/crates/httpclient/src/httpclient.rs index 2511395..128e464 100644 --- a/crates/httpclient/src/httpclient.rs +++ b/crates/httpclient/src/httpclient.rs @@ -8,20 +8,20 @@ use bytes::BytesMut; use daqbuf_err as err; use futures_util::Stream; use futures_util::StreamExt; -use http::header; use http::Request; use http::Response; use http::StatusCode; +use http::header; use http_body::Frame; -use http_body_util::combinators::BoxBody; use http_body_util::BodyExt; +use http_body_util::combinators::BoxBody; use hyper::body::Body; use hyper::body::Incoming; use hyper::client::conn::http1::SendRequest; -use netpod::log::*; -use netpod::ReqCtx; use netpod::APP_JSON; +use netpod::ReqCtx; use netpod::X_DAQBUF_REQID; +use netpod::log::*; use serde::Serialize; use std::fmt; use std::pin::Pin; @@ -123,6 +123,10 @@ pub fn internal_error() -> http::Response { res } +pub fn 
bad_request_response(msg: String, reqid: impl AsRef) -> http::Response { + error_status_response(StatusCode::BAD_REQUEST, msg, reqid) +} + pub fn error_response(msg: String, reqid: impl AsRef) -> http::Response { error_status_response(StatusCode::INTERNAL_SERVER_ERROR, msg, reqid) } @@ -361,11 +365,7 @@ where let host = uri.host().ok_or_else(|| Error::NoHostInUrl)?; let port = uri.port_u16().unwrap_or_else(|| { // NOTE known issue: url::Url will "forget" the port if it was the default port. - if scheme == "https" { - 443 - } else { - 80 - } + if scheme == "https" { 443 } else { 80 } }); let stream = TcpStream::connect(format!("{host}:{port}")).await?; if false { diff --git a/crates/httpret/Cargo.toml b/crates/httpret/Cargo.toml index affe900..df9136c 100644 --- a/crates/httpret/Cargo.toml +++ b/crates/httpret/Cargo.toml @@ -43,6 +43,7 @@ dbconn = { path = "../dbconn" } disk = { path = "../disk" } items_0 = { path = "../../../daqbuf-items-0", package = "daqbuf-items-0" } items_2 = { path = "../../../daqbuf-items-2", package = "daqbuf-items-2" } +series = { path = "../../../daqbuf-series", package = "daqbuf-series" } parse = { path = "../../../daqbuf-parse", package = "daqbuf-parse" } streams = { path = "../../../daqbuf-streams", package = "daqbuf-streams" } streamio = { path = "../streamio" } @@ -54,7 +55,7 @@ daqbuf-redis = { path = "../daqbuf-redis" } httpclient = { path = "../httpclient" } [features] -#default = [] -default = ["http3"] +default = [] +# default = ["http3"] prometheus_endpoint = [] http3 = ["h3", "h3-quinn", "quinn", "rustls", "rustls-pki-types"] diff --git a/crates/httpret/src/api4.rs b/crates/httpret/src/api4.rs index 523da12..7e63603 100644 --- a/crates/httpret/src/api4.rs +++ b/crates/httpret/src/api4.rs @@ -1,6 +1,7 @@ pub mod accounting; pub mod backend; pub mod binned; +pub mod binwriteindex; pub mod databuffer_tools; pub mod docs; pub mod eventdata; diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index c860cef..b0fa73a 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -11,6 +11,7 @@ use http::header::CONTENT_TYPE; use http::request::Parts; use http::Method; use http::StatusCode; +use httpclient::bad_request_response; use httpclient::body_empty; use httpclient::body_stream; use httpclient::error_response; @@ -101,11 +102,11 @@ impl BinnedHandler { Ok(res) } Error::BadQuery(msg) => { - let res = error_response(format!("bad query: {msg}"), ctx.reqid()); + let res = bad_request_response(msg, ctx.reqid()); Ok(res) } _ => { - error!("EventsHandler sees: {e}"); + error!("EventsHandler sees: {}", e); Ok(error_response(e.to_string(), ctx.reqid())) } }, @@ -131,7 +132,7 @@ async fn binned( let reqid = ctx.reqid(); let (head, _body) = req.into_parts(); let query = BinnedQuery::from_url(&url).map_err(|e| { - error!("binned_cbor_framed: {e:?}"); + error!("binned_cbor_framed: {}", e); Error::BadQuery(e.to_string()) })?; let logspan = if query.log_level() == "trace" { diff --git a/crates/httpret/src/api4/binwriteindex.rs b/crates/httpret/src/api4/binwriteindex.rs new file mode 100644 index 0000000..cc7d0b7 --- /dev/null +++ b/crates/httpret/src/api4/binwriteindex.rs @@ -0,0 +1,315 @@ +use crate::bodystream::response; +use crate::channelconfig::ch_conf_from_binned; +use crate::requests::accepts_cbor_framed; +use crate::requests::accepts_json_framed; +use crate::requests::accepts_json_or_all; +use crate::requests::accepts_octets; +use crate::ServiceSharedResources; +use daqbuf_err as err; +use 
dbconn::worker::PgQueue; +use futures_util::StreamExt; +use futures_util::TryStreamExt; +use http::header::CONTENT_TYPE; +use http::request::Parts; +use http::Method; +use http::StatusCode; +use httpclient::bad_request_response; +use httpclient::body_empty; +use httpclient::body_stream; +use httpclient::error_response; +use httpclient::error_status_response; +use httpclient::not_found_response; +use httpclient::IntoBody; +use httpclient::Requ; +use httpclient::StreamResponse; +use httpclient::ToJsonBody; +use netpod::log; +use netpod::req_uri_to_url; +use netpod::timeunits::SEC; +use netpod::ttl::RetentionTime; +use netpod::ChannelTypeConfigGen; +use netpod::FromUrl; +use netpod::NodeConfigCached; +use netpod::ReqCtx; +use netpod::APP_CBOR_FRAMED; +use netpod::APP_JSON; +use netpod::APP_JSON_FRAMED; +use netpod::HEADER_NAME_REQUEST_ID; +use nodenet::client::OpenBoxedBytesViaHttp; +use nodenet::scylla::ScyllaEventReadProvider; +use query::api4::binned::BinWriteIndexQuery; +use query::api4::binned::BinnedQuery; +use scyllaconn::worker::ScyllaQueue; +use series::msp::PrebinnedPartitioning; +use series::SeriesId; +use std::pin::Pin; +use std::sync::Arc; +use streams::eventsplainreader::DummyCacheReadProvider; +use streams::eventsplainreader::SfDatabufferEventReadProvider; +use streams::streamtimeout::StreamTimeout2; +use streams::timebin::cached::reader::EventsReadProvider; +use streams::timebin::CacheReadProvider; +use tracing::Instrument; +use tracing::Span; +use url::Url; + +macro_rules! error { ($($arg:expr),*) => ( if true { log::error!($($arg),*); } ); } +macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); } +macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); } +macro_rules! trace { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ); } + +autoerr::create_error_v1!( + name(Error, "Api4BinWriteIndex"), + enum variants { + ChannelNotFound, + BadQuery(String), + HttpLib(#[from] http::Error), + ChannelConfig(crate::channelconfig::Error), + Retrieval(#[from] crate::RetrievalError), + EventsCbor(#[from] streams::plaineventscbor::Error), + EventsJson(#[from] streams::plaineventsjson::Error), + ServerError, + BinnedStream(err::Error), + TimebinnedJson(#[from] streams::timebinnedjson::Error), + }, +); + +impl From<crate::channelconfig::Error> for Error { + fn from(value: crate::channelconfig::Error) -> Self { + use crate::channelconfig::Error::*; + match value { + NotFound(_) => Self::ChannelNotFound, + _ => Self::ChannelConfig(value), + } + } +} + +impl From<Error> for crate::RetrievalError { + fn from(value: Error) -> Self { + crate::RetrievalError::TextError(value.to_string()) + } +} + +pub struct BinWriteIndexHandler {} + +impl BinWriteIndexHandler { + pub fn handler(req: &Requ) -> Option<Self> { + if req.uri().path() == "/api/4/private/binwriteindex" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle( + &self, + req: Requ, + ctx: &ReqCtx, + shared_res: &ServiceSharedResources, + ncc: &NodeConfigCached, + ) -> Result<StreamResponse, Error> { + if req.method() != Method::GET { + return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?); + } + match handle_request(req, ctx, &shared_res.pgqueue, shared_res.scyqueue.clone(), ncc).await { + Ok(ret) => Ok(ret), + Err(e) => match e { + Error::ChannelNotFound => { + let res = not_found_response("channel not found".into(), ctx.reqid()); + Ok(res) + } + Error::BadQuery(msg) => { + let res = bad_request_response(msg, ctx.reqid()); + Ok(res) + } + _ => { + error!("BinWriteIndexHandler sees: {}", e);
Ok(error_response(e.to_string(), ctx.reqid())) + } + }, + } + } +} + +async fn handle_request( + req: Requ, + ctx: &ReqCtx, + pgqueue: &PgQueue, + scyqueue: Option, + ncc: &NodeConfigCached, +) -> Result { + let url = req_uri_to_url(req.uri()).map_err(|e| Error::BadQuery(e.to_string()))?; + if req + .uri() + .path_and_query() + .map_or(false, |x| x.as_str().contains("DOERR")) + { + Err(Error::ServerError)?; + } + let reqid = ctx.reqid(); + let (head, _body) = req.into_parts(); + let query = BinWriteIndexQuery::from_url(&url).map_err(|e| { + error!("handle_request: {}", e); + Error::BadQuery(e.to_string()) + })?; + info!("{:?}", query); + let logspan = if query.log_level() == "trace" { + trace!("enable trace for handler"); + tracing::span!(tracing::Level::INFO, "log_span_trace") + } else if query.log_level() == "debug" { + debug!("enable debug for handler"); + tracing::span!(tracing::Level::INFO, "log_span_debug") + } else { + tracing::Span::none() + }; + let span1 = tracing::span!( + tracing::Level::INFO, + "binwriteindex", + reqid, + beg = query.range().beg_u64() / SEC, + end = query.range().end_u64() / SEC, + ch = query.channel().name(), + ); + span1.in_scope(|| { + debug!("binned begin {:?}", query); + }); + binned_instrumented(head, ctx, url, query, pgqueue, scyqueue, ncc, logspan.clone()) + .instrument(logspan) + .instrument(span1) + .await +} + +async fn binned_instrumented( + head: Parts, + ctx: &ReqCtx, + url: Url, + query: BinWriteIndexQuery, + pgqueue: &PgQueue, + scyqueue: Option, + ncc: &NodeConfigCached, + logspan: Span, +) -> Result { + let res2 = HandleRes2::new(ctx, logspan, url, query.clone(), pgqueue, scyqueue, ncc).await?; + if accepts_json_or_all(&head.headers) { + Ok(binned_json_single(res2, ctx, ncc).await?) + } else { + let ret = error_response(format!("Unsupported Accept: {:?}", &head.headers), ctx.reqid()); + Ok(ret) + } +} + +fn make_read_provider( + chname: &str, + scyqueue: Option, + open_bytes: Pin>, + ctx: &ReqCtx, + ncc: &NodeConfigCached, +) -> (Arc, Arc) { + let events_read_provider = if chname.starts_with("unittest") { + let x = streams::teststream::UnitTestStream::new(); + Arc::new(x) + } else if ncc.node_config.cluster.scylla_lt().is_some() { + scyqueue + .clone() + .map(|qu| ScyllaEventReadProvider::new(qu)) + .map(|x| Arc::new(x) as Arc) + .expect("scylla queue") + } else if ncc.node.sf_databuffer.is_some() { + // TODO do not clone the request. Pass an Arc up to here. 
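// (sketch) One way to discharge the TODO above, assuming the call sites can be
// changed: accept the context as `ctx: Arc<ReqCtx>` in a (hypothetical) adjusted
// signature
//
//     fn make_read_provider(chname: &str, scyqueue: Option<ScyllaQueue>,
//         open_bytes: Pin<Arc<OpenBoxedBytesViaHttp>>, ctx: Arc<ReqCtx>,
//         ncc: &NodeConfigCached) -> ...
//
// so this arm becomes `SfDatabufferEventReadProvider::new(ctx, open_bytes)` and
// the per-request clone below disappears.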
+ let x = SfDatabufferEventReadProvider::new(Arc::new(ctx.clone()), open_bytes); + Arc::new(x) + } else { + panic!("unexpected backend") + }; + let cache_read_provider = if ncc.node_config.cluster.scylla_lt().is_some() { + scyqueue + .clone() + .map(|qu| scyllaconn::bincache::ScyllaPrebinnedReadProvider::new(qu)) + .map(|x| Arc::new(x) as Arc) + .expect("scylla queue") + } else if ncc.node.sf_databuffer.is_some() { + let x = DummyCacheReadProvider::new(); + Arc::new(x) + } else { + panic!("unexpected backend") + }; + (events_read_provider, cache_read_provider) +} + +async fn binned_json_single( + res2: HandleRes2<'_>, + ctx: &ReqCtx, + _ncc: &NodeConfigCached, +) -> Result { + // TODO unify with binned_json_framed + debug!("binned_json_single"); + let rt1 = res2.query.retention_time_1(); + let rt2 = res2.query.retention_time_2(); + let pbp = res2.query.prebinned_partitioning(); + // let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long]; + // for rt in rts { + let mut strings = Vec::new(); + { + let mut stream = scyllaconn::binwriteindex::BinWriteIndexRtStream::new( + rt1, + rt2, + SeriesId::new(res2.ch_conf.series().unwrap()), + pbp.clone(), + res2.query.range().to_time().unwrap(), + res2.scyqueue.clone().unwrap(), + ); + while let Some(x) = stream.next().await { + strings.push(format!("{:?}", x)); + } + } + let ret = response(StatusCode::OK) + .header(CONTENT_TYPE, APP_JSON) + .header(HEADER_NAME_REQUEST_ID, ctx.reqid()) + .body(ToJsonBody::from(&strings).into_body())?; + Ok(ret) +} + +struct HandleRes2<'a> { + logspan: Span, + url: Url, + query: BinWriteIndexQuery, + ch_conf: ChannelTypeConfigGen, + events_read_provider: Arc, + cache_read_provider: Arc, + timeout_provider: Box, + pgqueue: &'a PgQueue, + scyqueue: Option, +} + +impl<'a> HandleRes2<'a> { + async fn new( + ctx: &ReqCtx, + logspan: Span, + url: Url, + query: BinWriteIndexQuery, + pgqueue: &'a PgQueue, + scyqueue: Option, + ncc: &NodeConfigCached, + ) -> Result { + let q2 = BinnedQuery::new(query.channel().clone(), query.range().clone(), 100); + let ch_conf = ch_conf_from_binned(&q2, ctx, pgqueue, ncc) + .await? 
+ .ok_or_else(|| Error::ChannelNotFound)?; + let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone())); + let (events_read_provider, cache_read_provider) = + make_read_provider(ch_conf.name(), scyqueue.clone(), open_bytes, ctx, ncc); + let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed(); + let ret = Self { + logspan, + url, + query, + ch_conf, + events_read_provider, + cache_read_provider, + timeout_provider, + pgqueue, + scyqueue, + }; + Ok(ret) + } +} diff --git a/crates/httpret/src/api4/search.rs b/crates/httpret/src/api4/search.rs index 1a9a66e..74480e6 100644 --- a/crates/httpret/src/api4/search.rs +++ b/crates/httpret/src/api4/search.rs @@ -67,6 +67,7 @@ async fn channel_search(req: Requ, pgqueue: &PgQueue, ncc: &NodeConfigCached) -> }; let res = dbconn::search::search_channel(query, pgqueue, ncc) .instrument(logspan) - .await?; + .await + .map_err(|e| Error::from_to_string(e))?; Ok(res) } diff --git a/crates/httpret/src/http3_dummy.rs b/crates/httpret/src/http3_dummy.rs index 31e24fb..a4bbe30 100644 --- a/crates/httpret/src/http3_dummy.rs +++ b/crates/httpret/src/http3_dummy.rs @@ -15,7 +15,7 @@ autoerr::create_error_v1!( pub struct Http3Support {} impl Http3Support { - pub async fn new(bind_addr: SocketAddr) -> Result { + pub async fn new(_bind_addr: SocketAddr) -> Result { let ret = Self {}; Ok(ret) } @@ -28,7 +28,7 @@ impl Http3Support { impl Future for Http3Support { type Output = Result<(), Error>; - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll { todo!() } } diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index b7fe833..0073dc5 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -129,7 +129,7 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res match x { Ok(()) => {} Err(e) => { - error!("received error from PgWorker: {e}"); + error!("received error from PgWorker: {}", e); } } }); @@ -150,7 +150,7 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res match x { Ok(()) => {} Err(e) => { - error!("received error from ScyllaWorker: {e}"); + error!("received error from ScyllaWorker: {}", e); } } }); @@ -162,6 +162,7 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res let shared_res = Arc::new(shared_res); use std::str::FromStr; let bind_addr = SocketAddr::from_str(&format!("{}:{}", ncc.node.listen(), ncc.node.port))?; + #[cfg(feature = "http3")] let http3 = http3::Http3Support::new_or_dummy(bind_addr.clone()).await?; // tokio::net::TcpSocket::new_v4()?.listen(200)? 
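// (sketch) If the backlog from the commented-out variant above is ever needed,
// the explicit-socket route in Tokio would look roughly like this (assumes an
// IPv4 bind address):
//
//     let sock = tokio::net::TcpSocket::new_v4()?;
//     sock.bind(bind_addr)?;
//     let listener = sock.listen(200)?;
//
// `TcpListener::bind` below keeps a default backlog instead.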
let listener = TcpListener::bind(bind_addr).await?; @@ -171,7 +172,7 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res } else { break; }; - debug!("new connection from {addr}"); + debug!("new connection from {}", addr); let node_config = ncc.clone(); let service_version = service_version.clone(); let io = TokioIo::new(stream); @@ -194,13 +195,26 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res match res { Ok(()) => {} Err(e) => { - error!("error from serve_connection: {e}"); + if e.is_body_write_aborted() { + info!("http conn body write abort: {}", e) + } else if e.is_closed() { + info!("http conn close: {}", e) + } else if e.is_canceled() { + info!("http conn cancel: {}", e) + } else if e.is_timeout() { + info!("http conn timeout: {}", e) + } else { + warn!("error from serve_connection: {}", e); + } } } }); } info!("http service done"); - let _x: () = http3.wait_idle().await; + #[cfg(feature = "http3")] + { + let _x: () = http3.wait_idle().await; + } info!("http host done"); // rawjh.await??; Ok(()) @@ -365,6 +379,8 @@ async fn http_service_inner( } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) } + } else if let Some(h) = api4::binwriteindex::BinWriteIndexHandler::handler(&req) { + Ok(h.handle(req, ctx, &shared_res, &node_config).await?) } else if let Some(h) = api4::eventdata::EventDataHandler::handler(&req) { Ok(h.handle(req, ctx, &node_config, shared_res) .await diff --git a/crates/httpret/src/proxy.rs b/crates/httpret/src/proxy.rs index a98a7c2..5aa358f 100644 --- a/crates/httpret/src/proxy.rs +++ b/crates/httpret/src/proxy.rs @@ -71,8 +71,6 @@ use url::Url; #[cfg(feature = "http3")] use crate::http3; -#[cfg(not(feature = "http3"))] -use crate::http3_dummy as http3; const DISTRI_PRE: &str = "/distri/"; @@ -80,6 +78,7 @@ pub async fn proxy(proxy_config: ProxyConfig, service_version: ServiceVersion) - status_board_init(); use std::str::FromStr; let bind_addr = SocketAddr::from_str(&format!("{}:{}", proxy_config.listen, proxy_config.port))?; + #[cfg(feature = "http3")] let http3 = http3::Http3Support::new_or_dummy(bind_addr.clone()).await?; let listener = TcpListener::bind(bind_addr).await?; loop { @@ -110,7 +109,10 @@ pub async fn proxy(proxy_config: ProxyConfig, service_version: ServiceVersion) - }); } info!("http service done"); - let _x: () = http3.wait_idle().await; + #[cfg(feature = "http3")] + { + let _x: () = http3.wait_idle().await; + } info!("http host done"); Ok(()) } diff --git a/crates/scyllaconn/src/binwriteindex.rs b/crates/scyllaconn/src/binwriteindex.rs new file mode 100644 index 0000000..84b20ec --- /dev/null +++ b/crates/scyllaconn/src/binwriteindex.rs @@ -0,0 +1,174 @@ +pub mod bwxcmb; + +use crate::worker::ScyllaQueue; +use daqbuf_series::msp::MspU32; +use daqbuf_series::msp::PrebinnedPartitioning; +use daqbuf_series::SeriesId; +use futures_util::Future; +use futures_util::FutureExt; +use futures_util::Stream; +use netpod::log; +use netpod::range::evrange::NanoRange; +use netpod::ttl::RetentionTime; +use std::collections::VecDeque; +use std::fmt; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); } +macro_rules! 
debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); } + +autoerr::create_error_v1!( + name(Error, "BinWriteIndexRtStream"), + enum variants { + Worker(#[from] crate::worker::Error), + }, +); + +struct Fut1(Fut2); + +impl fmt::Debug for Fut1 { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_tuple("Fut1").finish() + } +} + +type Fut2 = + Pin<Box<dyn Future<Output = Result<(u32, u32, u32, VecDeque<BinWriteIndexEntry>), crate::worker::Error>> + Send>>; + +#[derive(Debug)] +pub struct BinWriteIndexEntry { + pub rt: u16, + pub lsp: u32, + pub binlen: u32, +} + +#[derive(Debug)] +pub struct BinWriteIndexSet { + pub rt: RetentionTime, + pub msp: MspU32, + pub entries: VecDeque<BinWriteIndexEntry>, +} + +#[derive(Debug)] +pub struct BinWriteIndexRtStream { + rt1: RetentionTime, + rt2: RetentionTime, + series: SeriesId, + scyqueue: ScyllaQueue, + pbp: PrebinnedPartitioning, + msp: u32, + lsp_min: u32, + msp_end: u32, + lsp_end: u32, + fut1: Option<Fut1>, +} + +impl BinWriteIndexRtStream { + pub fn type_name() -> &'static str { + std::any::type_name::<Self>() + } + + pub fn new( + rt1: RetentionTime, + rt2: RetentionTime, + series: SeriesId, + pbp: PrebinnedPartitioning, + range: NanoRange, + scyqueue: ScyllaQueue, + ) -> Self { + info!("{}::new INFO/DEBUG test", Self::type_name()); + debug!("{}::new", Self::type_name()); + let (msp_beg, lsp_beg) = pbp.msp_lsp(range.beg_ts().to_ts_ms()); + let (msp_end, lsp_end) = pbp.msp_lsp(range.end_ts().add_dt_nano(pbp.bin_len().dt_ns()).to_ts_ms()); + BinWriteIndexRtStream { + rt1, + rt2, + series, + scyqueue, + pbp, + msp: msp_beg, + lsp_min: lsp_beg, + msp_end, + lsp_end, + fut1: None, + } + } + + async fn next_query_fut( + scyqueue: &ScyllaQueue, + rt1: RetentionTime, + rt2: RetentionTime, + series: SeriesId, + pbp: PrebinnedPartitioning, + msp: u32, + lsp_min: u32, + lsp_max: u32, + ) -> Result<(u32, u32, u32, VecDeque<BinWriteIndexEntry>), crate::worker::Error> { + debug!("make_next_query_fut msp {} lsp {} {}", msp, lsp_min, lsp_max); + let res = scyqueue + .bin_write_index_read(rt1, rt2, series, pbp, MspU32(msp), lsp_min, lsp_max) + .await?; + Ok((msp, lsp_min, lsp_max, res)) + } + + fn make_next_query_fut(mut self: Pin<&mut Self>, _cx: &mut Context) -> Option<Fut1> { + if self.msp <= self.msp_end { + let msp = self.msp; + let lsp_min = self.lsp_min; + self.msp += 1; + self.lsp_min = 0; + let lsp_max = if self.msp == self.msp_end { + self.lsp_end + } else { + self.pbp.patch_len() + }; + let scyqueue = unsafe { netpod::extltref(&self.scyqueue) }; + let fut = Self::next_query_fut( + scyqueue, + self.rt1.clone(), + self.rt2.clone(), + self.series.clone(), + self.pbp.clone(), + msp, + lsp_min, + lsp_max, + ); + Some(Fut1(Box::pin(fut))) + } else { + debug!("make_next_query_fut done"); + None + } + } +} + +impl Stream for BinWriteIndexRtStream { + type Item = Result<BinWriteIndexSet, Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { + use Poll::*; + loop { + break if let Some(fut) = self.fut1.as_mut() { + match fut.0.poll_unpin(cx) { + Ready(Ok(x)) => { + self.fut1 = None; + let item = BinWriteIndexSet { + rt: self.rt1.clone(), + msp: MspU32(x.0), + entries: x.3, + }; + Ready(Some(Ok(item))) + } + Ready(Err(e)) => Ready(Some(Err(e.into()))), + Pending => Pending, + } + } else if let Some(fut) = self.as_mut().make_next_query_fut(cx) { + self.fut1 = Some(fut); + continue; + } else { + Ready(None) + }; + } + } +} diff --git a/crates/scyllaconn/src/binwriteindex/bwxcmb.rs b/crates/scyllaconn/src/binwriteindex/bwxcmb.rs new file mode 100644 index 0000000..cbd8c03 --- /dev/null +++ b/crates/scyllaconn/src/binwriteindex/bwxcmb.rs @@ -0,0 +1,126 @@ +use
super::BinWriteIndexRtStream; +use crate::worker::ScyllaQueue; +use daqbuf_series::msp::PrebinnedPartitioning; +use daqbuf_series::SeriesId; +use futures_util::Future; +use futures_util::Stream; +use futures_util::StreamExt; +use netpod::log; +use netpod::range::evrange::NanoRange; +use netpod::ttl::RetentionTime; +use std::collections::VecDeque; +use std::fmt; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); } + +autoerr::create_error_v1!( + name(Error, "BinWriteIndexStream"), + enum variants { + A, + }, +); + +type Fut1Res = ::Item; + +struct Fut1a(Pin>>); + +impl fmt::Debug for Fut1a { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_tuple("Fut1a").finish() + } +} + +#[derive(Debug)] +enum InpSt { + Polling(BinWriteIndexRtStream), + Ready(BinWriteIndexRtStream, Fut1Res), + Done, +} + +#[derive(Debug)] +pub struct BinWriteIndexStream { + rtss: VecDeque, +} + +impl BinWriteIndexStream { + pub fn type_name() -> &'static str { + std::any::type_name::() + } + + pub fn new(series: SeriesId, range: NanoRange, scyqueue: ScyllaQueue) -> Self { + debug!("{}::new", Self::type_name()); + let mut rtss = VecDeque::new(); + let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long]; + for rt in rts { + let s = BinWriteIndexRtStream::new( + rt.clone(), + rt.clone(), + series.clone(), + PrebinnedPartitioning::Day1, + range.clone(), + scyqueue.clone(), + ); + rtss.push_back(InpSt::Polling(s)); + } + BinWriteIndexStream { rtss } + } + + fn abort(&mut self) { + for inp in self.rtss.iter_mut() { + *inp = InpSt::Done; + } + } +} + +impl Stream for BinWriteIndexStream { + type Item = Result<(), Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + // TODO poll the streams in `self.rtss`. When any of them returns an item, store it + // in the variant InpSt::Ready(..) together with the remaining stream. + // When all input streams have an item ready, store all items in a new variable + // in the type Self and transition all inputs to Polling again. + // If any of the input streams returns None, we return None as well. + use Poll::*; + loop { + let mut ready_cnt: u16 = 0; + let mut do_abort = false; + let mut have_pending = false; + for inp in self.rtss.iter_mut() { + match inp { + InpSt::Polling(fut) => match fut.poll_next_unpin(cx) { + Ready(Some(item)) => { + let fut = todo!("bwxcmb keep stream in state"); + *inp = InpSt::Ready(fut, item); + ready_cnt += 1; + } + Ready(None) => { + do_abort = true; + } + Pending => { + have_pending = true; + } + }, + InpSt::Ready(..) 
=> { + ready_cnt += 1; + } + InpSt::Done => {} + } + } + if do_abort { + self.abort(); + } + break if ready_cnt == self.rtss.len() as u16 { + todo!("TODO bwxcmb") + } else if have_pending { + Pending + } else { + self.abort(); + Ready(None) + }; + } + } +} diff --git a/crates/scyllaconn/src/events2/mergert.rs b/crates/scyllaconn/src/events2/mergert.rs index fd83990..9212190 100644 --- a/crates/scyllaconn/src/events2/mergert.rs +++ b/crates/scyllaconn/src/events2/mergert.rs @@ -3,9 +3,6 @@ use super::events::EventsStreamRt; use crate::events2::onebeforeandbulk::OneBeforeAndBulk; use crate::range::ScyllaSeriesRange; use crate::worker::ScyllaQueue; -use daqbuf_err as err; -use err::thiserror; -use err::ThisError; use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::sitem_err2_from_string; @@ -14,20 +11,21 @@ use items_0::streamitem::SitemErrTy; use items_0::streamitem::StreamItem; use items_2::channelevents::ChannelEvents; use items_2::merger::Merger; -use netpod::log::*; +use netpod::log; use netpod::ttl::RetentionTime; use netpod::ChConf; use std::pin::Pin; use std::task::Context; use std::task::Poll; -macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } +macro_rules! trace_init { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) } -#[derive(Debug, ThisError)] -#[cstm(name = "EventsMergeRt")] -pub enum Error { - Msg(String), -} +autoerr::create_error_v1!( + name(Error, "EventsMergeRt"), + enum variants { + Msg(String), + }, +); pub struct MergeRts { inp: Pin> + Send>>, diff --git a/crates/scyllaconn/src/events2/msp.rs b/crates/scyllaconn/src/events2/msp.rs index 5521845..fb2eb64 100644 --- a/crates/scyllaconn/src/events2/msp.rs +++ b/crates/scyllaconn/src/events2/msp.rs @@ -1,15 +1,12 @@ -use super::prepare::StmtsEvents; +use crate::events2::prepare::StmtsEvents; use crate::range::ScyllaSeriesRange; use crate::worker::ScyllaQueue; -use daqbuf_err as err; use daqbuf_series::SeriesId; -use err::thiserror; -use err::ThisError; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; use futures_util::TryStreamExt; -use netpod::log::*; +use netpod::log; use netpod::ttl::RetentionTime; use netpod::TsMs; use netpod::TsMsVecFmt; @@ -19,17 +16,20 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; -macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if $det { trace!($($arg)*); } ) } +macro_rules! trace_emit { ($det:expr, $($arg:expr),*) => ( if $det { log::trace!($($arg),*); } ) } -#[derive(Debug, ThisError)] -#[cstm(name = "EventsMsp")] -pub enum Error { - Logic, - Worker(Box), - ScyllaQuery(#[from] scylla::transport::errors::QueryError), - ScyllaRow(#[from] scylla::transport::iterator::NextRowError), - ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError), -} +macro_rules! 
trace_msp { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) } + +autoerr::create_error_v1!( + name(Error, "EventsMsp"), + enum variants { + Logic, + Worker(Box<crate::worker::Error>), + ScyllaQuery(#[from] scylla::transport::errors::QueryError), + ScyllaRow(#[from] scylla::transport::iterator::NextRowError), + ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError), + }, +); impl From<crate::worker::Error> for Error { fn from(value: crate::worker::Error) -> Self { @@ -237,7 +237,7 @@ impl Stream for MspStreamRt { Ready(Err(e)) => { trace_emit!(trdet, "bulk fwd error {e}"); *rsv = Resolvable::Taken; - error!("{e}"); + log::error!("{e}"); } Pending => { trace_emit!(trdet, "bulk fwd Pending"); @@ -305,7 +305,7 @@ pub async fn find_ts_msp( stmts: &StmtsEvents, scy: &Session, ) -> Result<VecDeque<TsMs>, Error> { - trace!( + trace_msp!( "find_ts_msp series {:?} {:?} {} bck {}", rt, series, diff --git a/crates/scyllaconn/src/events2/prepare.rs b/crates/scyllaconn/src/events2/prepare.rs index 4af3f7a..639a0f2 100644 --- a/crates/scyllaconn/src/events2/prepare.rs +++ b/crates/scyllaconn/src/events2/prepare.rs @@ -79,6 +79,7 @@ pub struct StmtsEventsRt { lsp_fwd_ts: StmtsLspDir, lsp_bck_ts: StmtsLspDir, prebinned_f32: PreparedStatement, + bin_write_index_read: PreparedStatement, } impl StmtsEventsRt { @@ -109,6 +110,10 @@ impl StmtsEventsRt { pub fn prebinned_f32(&self) -> &PreparedStatement { &self.prebinned_f32 } + + pub fn bin_write_index_read(&self) -> &PreparedStatement { + &self.bin_write_index_read + } } async fn make_msp_dir(ks: &str, rt: &RetentionTime, bck: bool, scy: &Session) -> Result<PreparedStatement, Error> { @@ -228,6 +233,21 @@ async fn make_prebinned_f32(ks: &str, rt: &RetentionTime, scy: &Session) -> Resu Ok(qu) } +async fn make_bin_write_index_read(ks: &str, rt: &RetentionTime, scy: &Session) -> Result<PreparedStatement, Error> { + let cql = format!( + concat!( + "select rt, lsp, binlen", + " from {}.{}bin_write_index_v03", + " where series = ? and pbp = ? and msp = ? and rt = ?", + " and lsp >= ?
and lsp < ?", + ), + ks, + rt.table_prefix() + ); + let qu = scy.prepare(cql).await?; + Ok(qu) +} + async fn make_rt(ks: &str, rt: &RetentionTime, scy: &Session) -> Result { let ret = StmtsEventsRt { ts_msp_fwd: make_msp_dir(ks, rt, false, scy).await?, @@ -237,6 +257,7 @@ async fn make_rt(ks: &str, rt: &RetentionTime, scy: &Session) -> Result Result<(), Error> { - let table = "binned_scalar_f32"; - let cql = format!( - concat!("alter table {}.{}{}", " add lst float"), - &scyco.keyspace, - rt.table_prefix(), - table - ); - let _ = scy.query_unpaged(cql, ()).await; - Ok(()) -} diff --git a/crates/scyllaconn/src/scyllaconn.rs b/crates/scyllaconn/src/scyllaconn.rs index 958db83..a159f23 100644 --- a/crates/scyllaconn/src/scyllaconn.rs +++ b/crates/scyllaconn/src/scyllaconn.rs @@ -1,10 +1,10 @@ pub mod accounting; pub mod bincache; +pub mod binwriteindex; pub mod conn; pub mod errconv; pub mod events2; pub mod range; -pub mod schema; pub mod status; pub mod worker; diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs index 813ed87..cd79194 100644 --- a/crates/scyllaconn/src/worker.rs +++ b/crates/scyllaconn/src/worker.rs @@ -1,3 +1,4 @@ +use crate::binwriteindex::BinWriteIndexEntry; use crate::conn::create_scy_session_no_ks; use crate::events2::events::ReadJobTrace; use crate::events2::prepare::StmtsEvents; @@ -5,11 +6,15 @@ use crate::range::ScyllaSeriesRange; use async_channel::Receiver; use async_channel::Sender; use daqbuf_err as err; +use daqbuf_series::msp::MspU32; +use daqbuf_series::msp::PrebinnedPartitioning; +use daqbuf_series::SeriesId; use futures_util::Future; use futures_util::StreamExt; +use futures_util::TryStreamExt; use items_0::timebin::BinningggContainerEventsDyn; use items_2::binning::container_bins::ContainerBins; -use netpod::log::*; +use netpod::log; use netpod::ttl::RetentionTime; use netpod::DtMs; use netpod::ScyllaConfig; @@ -20,6 +25,9 @@ use std::fmt; use std::pin::Pin; use std::sync::Arc; +macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); } +macro_rules! 
debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); } + const CONCURRENT_QUERIES_PER_WORKER: usize = 80; const SCYLLA_WORKER_QUEUE_LEN: usize = 200; @@ -36,10 +44,19 @@ autoerr::create_error_v1!( Toplist(#[from] crate::accounting::toplist::Error), MissingKeyspaceConfig, CacheWriteF32(#[from] streams::timebin::cached::reader::Error), - Schema(#[from] crate::schema::Error), + ScyllaQuery(#[from] scylla::transport::errors::QueryError), + ScyllaType(#[from] scylla::deserialize::TypeCheckError), }, ); +impl From> for Error { + fn from(_: async_channel::SendError) -> Self { + Self::ChannelSend + } +} + +type ScySessTy = scylla::transport::session::GenericSession; + #[derive(Debug)] struct ReadPrebinnedF32 { rt: RetentionTime, @@ -50,6 +67,60 @@ struct ReadPrebinnedF32 { tx: Sender, streams::timebin::cached::reader::Error>>, } +#[derive(Debug)] +struct BinWriteIndexRead { + rt1: RetentionTime, + rt2: RetentionTime, + series: SeriesId, + pbp: PrebinnedPartitioning, + msp: MspU32, + lsp_min: u32, + lsp_max: u32, + tx: Sender, Error>>, +} + +impl BinWriteIndexRead { + async fn execute(self, stmts: &StmtsEvents, scy: &ScySessTy) { + // TODO avoid the extra clone + let tx = self.tx.clone(); + match self.execute_inner(stmts, scy).await { + Ok(()) => {} + Err(e) => { + if tx.send(Err(e)).await.is_err() { + // TODO count for stats + } + } + } + } + + async fn execute_inner(self, stmts: &StmtsEvents, scy: &ScySessTy) -> Result<(), Error> { + let params = ( + self.series.id() as i64, + self.pbp.db_ix() as i16, + self.msp.0 as i32, + self.rt2.to_index_db_i32() as i16, + self.lsp_min as i32, + self.lsp_max as i32, + ); + log::info!("execute {:?}", params); + let res = scy + .execute_iter(stmts.rt(&self.rt1).bin_write_index_read().clone(), params) + .await?; + let mut it = res.rows_stream::<(i16, i32, i32)>()?; + let mut all = VecDeque::new(); + while let Some((rt, lsp, binlen)) = it.try_next().await? 
{ + let v = BinWriteIndexEntry { + rt: rt as u16, + lsp: lsp as u32, + binlen: binlen as u32, + }; + all.push_back(v); + } + self.tx.send(Ok(all)).await?; + Ok(()) + } +} + #[derive(Debug)] enum Job { FindTsMsp( @@ -72,6 +143,7 @@ enum Job { Sender>, ), ReadPrebinnedF32(ReadPrebinnedF32), + BinWriteIndexRead(BinWriteIndexRead), } struct ReadNextValues { @@ -197,6 +269,39 @@ impl ScyllaQueue { .map_err(|_| streams::timebin::cached::reader::Error::ChannelRecv)??; Ok(res) } + + pub async fn bin_write_index_read( + &self, + rt1: RetentionTime, + rt2: RetentionTime, + series: SeriesId, + pbp: PrebinnedPartitioning, + msp: MspU32, + lsp_min: u32, + lsp_max: u32, + ) -> Result, Error> { + let (tx, rx) = async_channel::bounded(1); + let job = BinWriteIndexRead { + rt1, + rt2, + series, + pbp, + msp, + lsp_min, + lsp_max, + tx, + }; + let job = Job::BinWriteIndexRead(job); + self.tx + .send(job) + .await + .map_err(|_| streams::timebin::cached::reader::Error::ChannelSend)?; + let res = rx + .recv() + .await + .map_err(|_| streams::timebin::cached::reader::Error::ChannelRecv)??; + Ok(res) + } } #[derive(Debug)] @@ -229,9 +334,6 @@ impl ScyllaWorker { .await .map_err(Error::ScyllaConnection)?; let scy = Arc::new(scy); - crate::schema::schema(RetentionTime::Short, &self.scyconf_st, &scy).await?; - crate::schema::schema(RetentionTime::Medium, &self.scyconf_mt, &scy).await?; - crate::schema::schema(RetentionTime::Long, &self.scyconf_lt, &scy).await?; let kss = [ self.scyconf_st.keyspace.as_str(), self.scyconf_mt.keyspace.as_str(), @@ -294,6 +396,7 @@ impl ScyllaWorker { // TODO count for stats } } + Job::BinWriteIndexRead(job) => job.execute(&stmts, &scy).await, } }) .buffer_unordered(CONCURRENT_QUERIES_PER_WORKER)
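
The `bin_write_index_read` plumbing added above follows the worker's existing request/reply convention: the caller packs a capacity-1 reply channel into the job, and the worker loop executes jobs with bounded concurrency and answers on that channel. A minimal self-contained sketch of that pattern (illustrative names only, not the crate's real types beyond the shape shown above):

    use async_channel::{bounded, Receiver, Sender};
    use futures_util::StreamExt;

    enum Job {
        // reply sender travels inside the job, like Job::BinWriteIndexRead above
        Ping(Sender<Result<u64, String>>),
    }

    #[derive(Clone)]
    struct Queue {
        tx: Sender<Job>,
    }

    impl Queue {
        async fn ping(&self) -> Result<u64, String> {
            // capacity 1: exactly one reply is expected per job
            let (tx, rx) = bounded(1);
            self.tx.send(Job::Ping(tx)).await.map_err(|_| "send".to_string())?;
            rx.recv().await.map_err(|_| "recv".to_string())?
        }
    }

    async fn worker(rx: Receiver<Job>) {
        // mirror of the `.buffer_unordered(CONCURRENT_QUERIES_PER_WORKER)` loop above
        rx.map(|job| async move {
            match job {
                Job::Ping(tx) => {
                    let _ = tx.send(Ok(42)).await;
                }
            }
        })
        .buffer_unordered(80)
        .for_each(|_| async {})
        .await;
    }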