Refactor; add a helper to inspect the bin write index
@@ -1,8 +1,8 @@
 [package]
 name = "daqbuffer"
-version = "0.5.5-aa.10"
+version = "0.5.5-aa.11"
 authors = ["Dominik Werder <dominik.werder@gmail.com>"]
-edition = "2021"
+edition = "2024"

 [dependencies]
 futures-util = "0.3.31"
@@ -6,14 +6,14 @@ use daqbuf_err::Error;
 use daqbuffer::cli::ClientType;
 use daqbuffer::cli::Opts;
 use daqbuffer::cli::SubCmd;
-use netpod::log::*;
-use netpod::query::CacheUsage;
 use netpod::NodeConfig;
 use netpod::NodeConfigCached;
 use netpod::ProxyConfig;
 use netpod::ScalarType;
 use netpod::ServiceVersion;
 use netpod::Shape;
+use netpod::log::*;
+use netpod::query::CacheUsage;
 use taskrun::tokio;
 use tokio::fs::File;
 use tokio::io::AsyncReadExt;
@@ -140,7 +140,7 @@ async fn go() -> Result<(), Error> {
             }
         },
         SubCmd::GenerateTestData => {
-            disk::gen::gen_test_data().await?;
+            disk::datagen::gen_test_data().await?;
         }
         SubCmd::Test => (),
         SubCmd::Version => {
@@ -172,11 +172,11 @@ async fn test_log() {
 #[cfg(feature = "DISABLED")]
 fn simple_fetch() {
-    use daqbuffer::err::ErrConv;
-    use netpod::timeunits::*;
     use netpod::ByteOrder;
     use netpod::ScalarType;
     use netpod::SfDbChannel;
     use netpod::Shape;
+    use netpod::timeunits::*;
     taskrun::run(async {
         let _rh = daqbufp2::nodes::require_test_hosts_running()?;
         let t1 = chrono::Utc::now();
@@ -2,7 +2,7 @@
 name = "dbconn"
 version = "0.0.2"
 authors = ["Dominik Werder <dominik.werder@gmail.com>"]
-edition = "2021"
+edition = "2024"

 [lib]
 path = "src/dbconn.rs"
@@ -1,9 +1,6 @@
+use crate::ErrConv;
 use crate::create_connection;
 use crate::worker::PgQueue;
-use crate::ErrConv;
-use daqbuf_err as err;
-use err::Error;
-use netpod::log::*;
 use netpod::ChannelArchiver;
 use netpod::ChannelSearchQuery;
 use netpod::ChannelSearchResult;
@@ -12,9 +9,25 @@ use netpod::Database;
 use netpod::NodeConfigCached;
 use netpod::ScalarType;
 use netpod::Shape;
+use netpod::log;
 use serde_json::Value as JsVal;
 use tokio_postgres::Client as PgClient;

+macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*) } ); }
+
+macro_rules! trace { ($($arg:expr),*) => ( if true { log::trace!($($arg),*) } ); }
+
+autoerr::create_error_v1!(
+    name(Error, "DbconnSearch"),
+    enum variants {
+        BadShapeFromDb(String),
+        BadArchEngValue(String),
+        Pg(#[from] tokio_postgres::Error),
+        Other(#[from] daqbuf_err::Error),
+        Worker(#[from] crate::worker::Error),
+    },
+);
+
 pub async fn search_channel_databuffer(
     query: ChannelSearchQuery,
     backend: &str,
@@ -52,8 +65,7 @@ pub async fn search_channel_databuffer(
                 &backend,
             ],
         )
-        .await
-        .err_conv()?;
+        .await?;
     let mut res = Vec::new();
     for row in rows {
         let shapedb: Option<serde_json::Value> = row.get(4);
@@ -68,14 +80,14 @@ pub async fn search_channel_databuffer(
                         Some(n) => {
                             a.push(n as u32);
                         }
-                        None => return Err(Error::with_msg(format!("can not understand shape {:?}", shapedb))),
+                        None => return Err(Error::BadShapeFromDb(format!("{:?}", shapedb))),
                     },
-                    _ => return Err(Error::with_msg(format!("can not understand shape {:?}", shapedb))),
+                    _ => return Err(Error::BadShapeFromDb(format!("{:?}", shapedb))),
                 }
             }
             a
         }
-        _ => return Err(Error::with_msg(format!("can not understand shape {:?}", shapedb))),
+        _ => return Err(Error::BadShapeFromDb(format!("{:?}", shapedb))),
     },
     None => Vec::new(),
 };
@@ -131,7 +143,7 @@ pub(super) async fn search_channel_scylla(
         regop
     );
     let params: &[&(dyn tokio_postgres::types::ToSql + Sync)] = &[&ch_kind, &query.name_regex, &cb1, &cb2];
-    info!("search_channel_scylla {:?}", params);
+    trace!("search_channel_scylla {:?}", params);
     let rows = pgc.query(sql, params).await.err_conv()?;
     let mut res = Vec::new();
     for row in rows {
@@ -227,10 +239,7 @@ async fn search_channel_archeng(
             if k == "Scalar" {
                 Vec::new()
             } else {
-                return Err(Error::with_msg_no_trace(format!(
-                    "search_channel_archeng can not understand {:?}",
-                    config
-                )));
+                return Err(Error::BadArchEngValue(format!("{:?}", config)));
             }
         }
         JsVal::Object(k) => match k.get("Wave") {
@@ -239,24 +248,15 @@ async fn search_channel_archeng(
                     vec![k.as_i64().unwrap_or(u32::MAX as i64) as u32]
                 }
                 _ => {
-                    return Err(Error::with_msg_no_trace(format!(
-                        "search_channel_archeng can not understand {:?}",
-                        config
-                    )));
+                    return Err(Error::BadArchEngValue(format!("{:?}", config)));
                 }
             },
             None => {
-                return Err(Error::with_msg_no_trace(format!(
-                    "search_channel_archeng can not understand {:?}",
-                    config
-                )));
+                return Err(Error::BadArchEngValue(format!("{:?}", config)));
             }
         },
         _ => {
-            return Err(Error::with_msg_no_trace(format!(
-                "search_channel_archeng can not understand {:?}",
-                config
-            )));
+            return Err(Error::BadArchEngValue(format!("{:?}", config)));
         }
     },
     None => Vec::new(),
@@ -290,16 +290,12 @@ pub async fn search_channel(
     let mut query = query;
     query.backend = Some(backend.into());
     if let Some(_scyconf) = ncc.node_config.cluster.scylla_st() {
-        pgqueue
-            .search_channel_scylla(query)
-            .await
-            .map_err(|e| Error::with_msg_no_trace(format!("db worker error {e}")))?
+        pgqueue.search_channel_scylla(query).await?
         // search_channel_scylla(query, backend, pgconf).await
     } else if let Some(conf) = ncc.node.channel_archiver.as_ref() {
         search_channel_archeng(query, backend.clone(), conf, pgconf).await
     } else if let Some(_conf) = ncc.node.archiver_appliance.as_ref() {
         // TODO
-        err::todoval()
+        todo!()
     } else {
         search_channel_databuffer(query, backend, ncc).await
     }
@@ -3,29 +3,28 @@ use async_channel::Receiver;
 use async_channel::RecvError;
 use async_channel::Sender;
 use daqbuf_err as err;
-use err::thiserror;
-use err::ThisError;
-use netpod::log::*;
-use netpod::range::evrange::NanoRange;
 use netpod::ChConf;
 use netpod::ChannelSearchQuery;
 use netpod::ChannelSearchResult;
 use netpod::Database;
 use netpod::SeriesKind;
 use netpod::SfDbChannel;
+use netpod::log::*;
+use netpod::range::evrange::NanoRange;
 use taskrun::tokio;
 use tokio::task::JoinHandle;
 use tokio_postgres::Client;

-#[derive(Debug, ThisError)]
-#[cstm(name = "PgWorker")]
-pub enum Error {
-    Error(#[from] err::Error),
-    ChannelSend,
-    ChannelRecv,
-    Join,
-    ChannelConfig(#[from] crate::channelconfig::Error),
-}
+autoerr::create_error_v1!(
+    name(Error, "PgWorker"),
+    enum variants {
+        Error(#[from] err::Error),
+        ChannelSend,
+        ChannelRecv,
+        Join,
+        ChannelConfig(#[from] crate::channelconfig::Error),
+    },
+);

 impl From<RecvError> for Error {
     fn from(_value: RecvError) -> Self {
@@ -51,7 +50,10 @@ enum Job {
         Vec<u64>,
         Sender<Result<Vec<Option<crate::channelinfo::ChannelInfo>>, crate::channelinfo::Error>>,
     ),
-    SearchChannel(ChannelSearchQuery, Sender<Result<ChannelSearchResult, err::Error>>),
+    SearchChannel(
+        ChannelSearchQuery,
+        Sender<Result<ChannelSearchResult, crate::search::Error>>,
+    ),
     SfChannelBySeries(
         netpod::SfDbChannel,
         Sender<Result<netpod::SfDbChannel, crate::FindChannelError>>,
@@ -101,7 +103,7 @@ impl PgQueue {
     pub async fn search_channel_scylla(
         &self,
         query: ChannelSearchQuery,
-    ) -> Result<Result<ChannelSearchResult, err::Error>, Error> {
+    ) -> Result<Result<ChannelSearchResult, crate::search::Error>, Error> {
         let (tx, rx) = async_channel::bounded(1);
         let job = Job::SearchChannel(query, tx);
         self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
@@ -2,13 +2,13 @@
 pub mod aggtest;
 pub mod binnedstream;
 pub mod channelconfig;
+pub mod datagen;
 pub mod dataopen;
 pub mod decode;
 pub mod eventchunker;
 pub mod eventchunkermultifile;
 pub mod eventfilter;
 pub mod frame;
-pub mod gen;
 pub mod index;
 pub mod merge;
 pub mod paths;
@@ -8,20 +8,20 @@ use bytes::BytesMut;
 use daqbuf_err as err;
 use futures_util::Stream;
 use futures_util::StreamExt;
-use http::header;
 use http::Request;
 use http::Response;
 use http::StatusCode;
+use http::header;
 use http_body::Frame;
-use http_body_util::combinators::BoxBody;
 use http_body_util::BodyExt;
+use http_body_util::combinators::BoxBody;
 use hyper::body::Body;
 use hyper::body::Incoming;
 use hyper::client::conn::http1::SendRequest;
-use netpod::log::*;
-use netpod::ReqCtx;
 use netpod::APP_JSON;
+use netpod::ReqCtx;
 use netpod::X_DAQBUF_REQID;
+use netpod::log::*;
 use serde::Serialize;
 use std::fmt;
 use std::pin::Pin;
@@ -123,6 +123,10 @@ pub fn internal_error() -> http::Response<StreamBody> {
     res
 }

+pub fn bad_request_response(msg: String, reqid: impl AsRef<str>) -> http::Response<StreamBody> {
+    error_status_response(StatusCode::BAD_REQUEST, msg, reqid)
+}
+
 pub fn error_response(msg: String, reqid: impl AsRef<str>) -> http::Response<StreamBody> {
     error_status_response(StatusCode::INTERNAL_SERVER_ERROR, msg, reqid)
 }
@@ -361,11 +365,7 @@ where
     let host = uri.host().ok_or_else(|| Error::NoHostInUrl)?;
     let port = uri.port_u16().unwrap_or_else(|| {
         // NOTE known issue: url::Url will "forget" the port if it was the default port.
-        if scheme == "https" {
-            443
-        } else {
-            80
-        }
+        if scheme == "https" { 443 } else { 80 }
     });
     let stream = TcpStream::connect(format!("{host}:{port}")).await?;
     if false {
@@ -43,6 +43,7 @@ dbconn = { path = "../dbconn" }
 disk = { path = "../disk" }
 items_0 = { path = "../../../daqbuf-items-0", package = "daqbuf-items-0" }
 items_2 = { path = "../../../daqbuf-items-2", package = "daqbuf-items-2" }
+series = { path = "../../../daqbuf-series", package = "daqbuf-series" }
 parse = { path = "../../../daqbuf-parse", package = "daqbuf-parse" }
 streams = { path = "../../../daqbuf-streams", package = "daqbuf-streams" }
 streamio = { path = "../streamio" }
@@ -54,7 +55,7 @@ daqbuf-redis = { path = "../daqbuf-redis" }
 httpclient = { path = "../httpclient" }

 [features]
-#default = []
-default = ["http3"]
+default = []
+# default = ["http3"]
 prometheus_endpoint = []
 http3 = ["h3", "h3-quinn", "quinn", "rustls", "rustls-pki-types"]
@@ -1,6 +1,7 @@
 pub mod accounting;
 pub mod backend;
 pub mod binned;
+pub mod binwriteindex;
 pub mod databuffer_tools;
 pub mod docs;
 pub mod eventdata;
@@ -11,6 +11,7 @@ use http::header::CONTENT_TYPE;
 use http::request::Parts;
 use http::Method;
 use http::StatusCode;
+use httpclient::bad_request_response;
 use httpclient::body_empty;
 use httpclient::body_stream;
 use httpclient::error_response;
@@ -101,11 +102,11 @@ impl BinnedHandler {
                 Ok(res)
             }
             Error::BadQuery(msg) => {
-                let res = error_response(format!("bad query: {msg}"), ctx.reqid());
+                let res = bad_request_response(msg, ctx.reqid());
                 Ok(res)
             }
             _ => {
-                error!("EventsHandler sees: {e}");
+                error!("EventsHandler sees: {}", e);
                 Ok(error_response(e.to_string(), ctx.reqid()))
             }
         },
@@ -131,7 +132,7 @@ async fn binned(
     let reqid = ctx.reqid();
     let (head, _body) = req.into_parts();
     let query = BinnedQuery::from_url(&url).map_err(|e| {
-        error!("binned_cbor_framed: {e:?}");
+        error!("binned_cbor_framed: {}", e);
         Error::BadQuery(e.to_string())
     })?;
     let logspan = if query.log_level() == "trace" {
crates/httpret/src/api4/binwriteindex.rs (new file, 315 lines)
@@ -0,0 +1,315 @@
+use crate::bodystream::response;
+use crate::channelconfig::ch_conf_from_binned;
+use crate::requests::accepts_cbor_framed;
+use crate::requests::accepts_json_framed;
+use crate::requests::accepts_json_or_all;
+use crate::requests::accepts_octets;
+use crate::ServiceSharedResources;
+use daqbuf_err as err;
+use dbconn::worker::PgQueue;
+use futures_util::StreamExt;
+use futures_util::TryStreamExt;
+use http::header::CONTENT_TYPE;
+use http::request::Parts;
+use http::Method;
+use http::StatusCode;
+use httpclient::bad_request_response;
+use httpclient::body_empty;
+use httpclient::body_stream;
+use httpclient::error_response;
+use httpclient::error_status_response;
+use httpclient::not_found_response;
+use httpclient::IntoBody;
+use httpclient::Requ;
+use httpclient::StreamResponse;
+use httpclient::ToJsonBody;
+use netpod::log;
+use netpod::req_uri_to_url;
+use netpod::timeunits::SEC;
+use netpod::ttl::RetentionTime;
+use netpod::ChannelTypeConfigGen;
+use netpod::FromUrl;
+use netpod::NodeConfigCached;
+use netpod::ReqCtx;
+use netpod::APP_CBOR_FRAMED;
+use netpod::APP_JSON;
+use netpod::APP_JSON_FRAMED;
+use netpod::HEADER_NAME_REQUEST_ID;
+use nodenet::client::OpenBoxedBytesViaHttp;
+use nodenet::scylla::ScyllaEventReadProvider;
+use query::api4::binned::BinWriteIndexQuery;
+use query::api4::binned::BinnedQuery;
+use scyllaconn::worker::ScyllaQueue;
+use series::msp::PrebinnedPartitioning;
+use series::SeriesId;
+use std::pin::Pin;
+use std::sync::Arc;
+use streams::eventsplainreader::DummyCacheReadProvider;
+use streams::eventsplainreader::SfDatabufferEventReadProvider;
+use streams::streamtimeout::StreamTimeout2;
+use streams::timebin::cached::reader::EventsReadProvider;
+use streams::timebin::CacheReadProvider;
+use tracing::Instrument;
+use tracing::Span;
+use url::Url;
+
+macro_rules! error { ($($arg:expr),*) => ( if true { log::error!($($arg),*); } ); }
+macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); }
+macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); }
+macro_rules! trace { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ); }
+
+autoerr::create_error_v1!(
+    name(Error, "Api4BinWriteIndex"),
+    enum variants {
+        ChannelNotFound,
+        BadQuery(String),
+        HttpLib(#[from] http::Error),
+        ChannelConfig(crate::channelconfig::Error),
+        Retrieval(#[from] crate::RetrievalError),
+        EventsCbor(#[from] streams::plaineventscbor::Error),
+        EventsJson(#[from] streams::plaineventsjson::Error),
+        ServerError,
+        BinnedStream(err::Error),
+        TimebinnedJson(#[from] streams::timebinnedjson::Error),
+    },
+);
+
+impl From<crate::channelconfig::Error> for Error {
+    fn from(value: crate::channelconfig::Error) -> Self {
+        use crate::channelconfig::Error::*;
+        match value {
+            NotFound(_) => Self::ChannelNotFound,
+            _ => Self::ChannelConfig(value),
+        }
+    }
+}
+
+impl From<Error> for crate::RetrievalError {
+    fn from(value: Error) -> Self {
+        crate::RetrievalError::TextError(value.to_string())
+    }
+}
+
+pub struct BinWriteIndexHandler {}
+
+impl BinWriteIndexHandler {
+    pub fn handler(req: &Requ) -> Option<Self> {
+        if req.uri().path() == "/api/4/private/binwriteindex" {
+            Some(Self {})
+        } else {
+            None
+        }
+    }
+
+    pub async fn handle(
+        &self,
+        req: Requ,
+        ctx: &ReqCtx,
+        shared_res: &ServiceSharedResources,
+        ncc: &NodeConfigCached,
+    ) -> Result<StreamResponse, Error> {
+        if req.method() != Method::GET {
+            return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?);
+        }
+        match handle_request(req, ctx, &shared_res.pgqueue, shared_res.scyqueue.clone(), ncc).await {
+            Ok(ret) => Ok(ret),
+            Err(e) => match e {
+                Error::ChannelNotFound => {
+                    let res = not_found_response("channel not found".into(), ctx.reqid());
+                    Ok(res)
+                }
+                Error::BadQuery(msg) => {
+                    let res = bad_request_response(msg, ctx.reqid());
+                    Ok(res)
+                }
+                _ => {
+                    error!("EventsHandler sees: {}", e);
+                    Ok(error_response(e.to_string(), ctx.reqid()))
+                }
+            },
+        }
+    }
+}
+
+async fn handle_request(
+    req: Requ,
+    ctx: &ReqCtx,
+    pgqueue: &PgQueue,
+    scyqueue: Option<ScyllaQueue>,
+    ncc: &NodeConfigCached,
+) -> Result<StreamResponse, Error> {
+    let url = req_uri_to_url(req.uri()).map_err(|e| Error::BadQuery(e.to_string()))?;
+    if req
+        .uri()
+        .path_and_query()
+        .map_or(false, |x| x.as_str().contains("DOERR"))
+    {
+        Err(Error::ServerError)?;
+    }
+    let reqid = ctx.reqid();
+    let (head, _body) = req.into_parts();
+    let query = BinWriteIndexQuery::from_url(&url).map_err(|e| {
+        error!("handle_request: {}", e);
+        Error::BadQuery(e.to_string())
+    })?;
+    info!("{:?}", query);
+    let logspan = if query.log_level() == "trace" {
+        trace!("enable trace for handler");
+        tracing::span!(tracing::Level::INFO, "log_span_trace")
+    } else if query.log_level() == "debug" {
+        debug!("enable debug for handler");
+        tracing::span!(tracing::Level::INFO, "log_span_debug")
+    } else {
+        tracing::Span::none()
+    };
+    let span1 = tracing::span!(
+        tracing::Level::INFO,
+        "binwriteindex",
+        reqid,
+        beg = query.range().beg_u64() / SEC,
+        end = query.range().end_u64() / SEC,
+        ch = query.channel().name(),
+    );
+    span1.in_scope(|| {
+        debug!("binned begin {:?}", query);
+    });
+    binned_instrumented(head, ctx, url, query, pgqueue, scyqueue, ncc, logspan.clone())
+        .instrument(logspan)
+        .instrument(span1)
+        .await
+}
+
+async fn binned_instrumented(
+    head: Parts,
+    ctx: &ReqCtx,
+    url: Url,
+    query: BinWriteIndexQuery,
+    pgqueue: &PgQueue,
+    scyqueue: Option<ScyllaQueue>,
+    ncc: &NodeConfigCached,
+    logspan: Span,
+) -> Result<StreamResponse, Error> {
+    let res2 = HandleRes2::new(ctx, logspan, url, query.clone(), pgqueue, scyqueue, ncc).await?;
+    if accepts_json_or_all(&head.headers) {
+        Ok(binned_json_single(res2, ctx, ncc).await?)
+    } else {
+        let ret = error_response(format!("Unsupported Accept: {:?}", &head.headers), ctx.reqid());
+        Ok(ret)
+    }
+}
+
+fn make_read_provider(
+    chname: &str,
+    scyqueue: Option<ScyllaQueue>,
+    open_bytes: Pin<Arc<OpenBoxedBytesViaHttp>>,
+    ctx: &ReqCtx,
+    ncc: &NodeConfigCached,
+) -> (Arc<dyn EventsReadProvider>, Arc<dyn CacheReadProvider>) {
+    let events_read_provider = if chname.starts_with("unittest") {
+        let x = streams::teststream::UnitTestStream::new();
+        Arc::new(x)
+    } else if ncc.node_config.cluster.scylla_lt().is_some() {
+        scyqueue
+            .clone()
+            .map(|qu| ScyllaEventReadProvider::new(qu))
+            .map(|x| Arc::new(x) as Arc<dyn EventsReadProvider>)
+            .expect("scylla queue")
+    } else if ncc.node.sf_databuffer.is_some() {
+        // TODO do not clone the request. Pass an Arc up to here.
+        let x = SfDatabufferEventReadProvider::new(Arc::new(ctx.clone()), open_bytes);
+        Arc::new(x)
+    } else {
+        panic!("unexpected backend")
+    };
+    let cache_read_provider = if ncc.node_config.cluster.scylla_lt().is_some() {
+        scyqueue
+            .clone()
+            .map(|qu| scyllaconn::bincache::ScyllaPrebinnedReadProvider::new(qu))
+            .map(|x| Arc::new(x) as Arc<dyn CacheReadProvider>)
+            .expect("scylla queue")
+    } else if ncc.node.sf_databuffer.is_some() {
+        let x = DummyCacheReadProvider::new();
+        Arc::new(x)
+    } else {
+        panic!("unexpected backend")
+    };
+    (events_read_provider, cache_read_provider)
+}
+
+async fn binned_json_single(
+    res2: HandleRes2<'_>,
+    ctx: &ReqCtx,
+    _ncc: &NodeConfigCached,
+) -> Result<StreamResponse, Error> {
+    // TODO unify with binned_json_framed
+    debug!("binned_json_single");
+    let rt1 = res2.query.retention_time_1();
+    let rt2 = res2.query.retention_time_2();
+    let pbp = res2.query.prebinned_partitioning();
+    // let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long];
+    // for rt in rts {
+    let mut strings = Vec::new();
+    {
+        let mut stream = scyllaconn::binwriteindex::BinWriteIndexRtStream::new(
+            rt1,
+            rt2,
+            SeriesId::new(res2.ch_conf.series().unwrap()),
+            pbp.clone(),
+            res2.query.range().to_time().unwrap(),
+            res2.scyqueue.clone().unwrap(),
+        );
+        while let Some(x) = stream.next().await {
+            strings.push(format!("{:?}", x));
+        }
+    }
+    let ret = response(StatusCode::OK)
+        .header(CONTENT_TYPE, APP_JSON)
+        .header(HEADER_NAME_REQUEST_ID, ctx.reqid())
+        .body(ToJsonBody::from(&strings).into_body())?;
+    Ok(ret)
+}
+
+struct HandleRes2<'a> {
+    logspan: Span,
+    url: Url,
+    query: BinWriteIndexQuery,
+    ch_conf: ChannelTypeConfigGen,
+    events_read_provider: Arc<dyn EventsReadProvider>,
+    cache_read_provider: Arc<dyn CacheReadProvider>,
+    timeout_provider: Box<dyn StreamTimeout2>,
+    pgqueue: &'a PgQueue,
+    scyqueue: Option<ScyllaQueue>,
+}
+
+impl<'a> HandleRes2<'a> {
+    async fn new(
+        ctx: &ReqCtx,
+        logspan: Span,
+        url: Url,
+        query: BinWriteIndexQuery,
+        pgqueue: &'a PgQueue,
+        scyqueue: Option<ScyllaQueue>,
+        ncc: &NodeConfigCached,
+    ) -> Result<Self, Error> {
+        let q2 = BinnedQuery::new(query.channel().clone(), query.range().clone(), 100);
+        let ch_conf = ch_conf_from_binned(&q2, ctx, pgqueue, ncc)
+            .await?
+            .ok_or_else(|| Error::ChannelNotFound)?;
+        let open_bytes = Arc::pin(OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()));
+        let (events_read_provider, cache_read_provider) =
+            make_read_provider(ch_conf.name(), scyqueue.clone(), open_bytes, ctx, ncc);
+        let timeout_provider = streamio::streamtimeout::StreamTimeout::boxed();
+        let ret = Self {
+            logspan,
+            url,
+            query,
+            ch_conf,
+            events_read_provider,
+            cache_read_provider,
+            timeout_provider,
+            pgqueue,
+            scyqueue,
+        };
+        Ok(ret)
+    }
+}
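[Editor's note, not part of the commit] binned_json_single above renders each stream item with format!("{:?}", x) and serializes the Vec<String> via ToJsonBody, so the endpoint's response body is a JSON array of Debug-formatted strings rather than a structured schema. A minimal consumer-side sketch (resp_text is a placeholder for the received body):

    // Sketch: the entries are opaque Debug strings; parse the array and log each one.
    let strings: Vec<String> = serde_json::from_str(&resp_text)?;
    for s in strings {
        println!("{}", s);
    }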
@@ -67,6 +67,7 @@ async fn channel_search(req: Requ, pgqueue: &PgQueue, ncc: &NodeConfigCached) ->
     };
     let res = dbconn::search::search_channel(query, pgqueue, ncc)
         .instrument(logspan)
-        .await?;
+        .await
+        .map_err(|e| Error::from_to_string(e))?;
     Ok(res)
 }
@@ -15,7 +15,7 @@ autoerr::create_error_v1!(
 pub struct Http3Support {}

 impl Http3Support {
-    pub async fn new(bind_addr: SocketAddr) -> Result<Self, Error> {
+    pub async fn new(_bind_addr: SocketAddr) -> Result<Self, Error> {
         let ret = Self {};
         Ok(ret)
     }
@@ -28,7 +28,7 @@ impl Http3Support {
 impl Future for Http3Support {
     type Output = Result<(), Error>;

-    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+    fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
         todo!()
     }
 }
@@ -129,7 +129,7 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res
         match x {
             Ok(()) => {}
             Err(e) => {
-                error!("received error from PgWorker: {e}");
+                error!("received error from PgWorker: {}", e);
             }
         }
     });
@@ -150,7 +150,7 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res
         match x {
             Ok(()) => {}
             Err(e) => {
-                error!("received error from ScyllaWorker: {e}");
+                error!("received error from ScyllaWorker: {}", e);
             }
         }
     });
@@ -162,6 +162,7 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res
     let shared_res = Arc::new(shared_res);
     use std::str::FromStr;
     let bind_addr = SocketAddr::from_str(&format!("{}:{}", ncc.node.listen(), ncc.node.port))?;
+    #[cfg(feature = "http3")]
     let http3 = http3::Http3Support::new_or_dummy(bind_addr.clone()).await?;
     // tokio::net::TcpSocket::new_v4()?.listen(200)?
     let listener = TcpListener::bind(bind_addr).await?;
@@ -171,7 +172,7 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res
         } else {
             break;
         };
-        debug!("new connection from {addr}");
+        debug!("new connection from {}", addr);
         let node_config = ncc.clone();
         let service_version = service_version.clone();
         let io = TokioIo::new(stream);
@@ -194,13 +195,26 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res
             match res {
                 Ok(()) => {}
                 Err(e) => {
-                    error!("error from serve_connection: {e}");
+                    if e.is_body_write_aborted() {
+                        info!("http conn body write abort: {}", e)
+                    } else if e.is_closed() {
+                        info!("http conn close: {}", e)
+                    } else if e.is_canceled() {
+                        info!("http conn cancel: {}", e)
+                    } else if e.is_timeout() {
+                        info!("http conn timeout: {}", e)
+                    } else {
+                        warn!("error from serve_connection: {}", e);
+                    }
                 }
             }
         });
     }
     info!("http service done");
-    let _x: () = http3.wait_idle().await;
+    #[cfg(feature = "http3")]
+    {
+        let _x: () = http3.wait_idle().await;
+    }
     info!("http host done");
     // rawjh.await??;
     Ok(())
@@ -365,6 +379,8 @@ async fn http_service_inner(
         } else {
             Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
         }
+    } else if let Some(h) = api4::binwriteindex::BinWriteIndexHandler::handler(&req) {
+        Ok(h.handle(req, ctx, &shared_res, &node_config).await?)
     } else if let Some(h) = api4::eventdata::EventDataHandler::handler(&req) {
         Ok(h.handle(req, ctx, &node_config, shared_res)
             .await
@@ -71,8 +71,6 @@ use url::Url;

 #[cfg(feature = "http3")]
 use crate::http3;
-#[cfg(not(feature = "http3"))]
-use crate::http3_dummy as http3;

 const DISTRI_PRE: &str = "/distri/";
@@ -80,6 +78,7 @@ pub async fn proxy(proxy_config: ProxyConfig, service_version: ServiceVersion) -
     status_board_init();
     use std::str::FromStr;
     let bind_addr = SocketAddr::from_str(&format!("{}:{}", proxy_config.listen, proxy_config.port))?;
+    #[cfg(feature = "http3")]
     let http3 = http3::Http3Support::new_or_dummy(bind_addr.clone()).await?;
     let listener = TcpListener::bind(bind_addr).await?;
     loop {
@@ -110,7 +109,10 @@ pub async fn proxy(proxy_config: ProxyConfig, service_version: ServiceVersion) -
         });
     }
     info!("http service done");
-    let _x: () = http3.wait_idle().await;
+    #[cfg(feature = "http3")]
+    {
+        let _x: () = http3.wait_idle().await;
+    }
     info!("http host done");
     Ok(())
 }
crates/scyllaconn/src/binwriteindex.rs (new file, 174 lines)
@@ -0,0 +1,174 @@
+pub mod bwxcmb;
+
+use crate::worker::ScyllaQueue;
+use daqbuf_series::msp::MspU32;
+use daqbuf_series::msp::PrebinnedPartitioning;
+use daqbuf_series::SeriesId;
+use futures_util::Future;
+use futures_util::FutureExt;
+use futures_util::Stream;
+use netpod::log;
+use netpod::range::evrange::NanoRange;
+use netpod::ttl::RetentionTime;
+use std::collections::VecDeque;
+use std::fmt;
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
+
+macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); }
+macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); }
+
+autoerr::create_error_v1!(
+    name(Error, "BinWriteIndexRtStream"),
+    enum variants {
+        Worker(#[from] crate::worker::Error),
+    },
+);
+
+struct Fut1(Fut2);
+
+impl fmt::Debug for Fut1 {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        fmt.debug_tuple("Fut1").finish()
+    }
+}
+
+type Fut2 =
+    Pin<Box<dyn Future<Output = Result<(u32, u32, u32, VecDeque<BinWriteIndexEntry>), crate::worker::Error>> + Send>>;
+
+#[derive(Debug)]
+pub struct BinWriteIndexEntry {
+    pub rt: u16,
+    pub lsp: u32,
+    pub binlen: u32,
+}
+
+#[derive(Debug)]
+pub struct BinWriteIndexSet {
+    pub rt: RetentionTime,
+    pub msp: MspU32,
+    pub entries: VecDeque<BinWriteIndexEntry>,
+}
+
+#[derive(Debug)]
+pub struct BinWriteIndexRtStream {
+    rt1: RetentionTime,
+    rt2: RetentionTime,
+    series: SeriesId,
+    scyqueue: ScyllaQueue,
+    pbp: PrebinnedPartitioning,
+    msp: u32,
+    lsp_min: u32,
+    msp_end: u32,
+    lsp_end: u32,
+    fut1: Option<Fut1>,
+}
+
+impl BinWriteIndexRtStream {
+    pub fn type_name() -> &'static str {
+        std::any::type_name::<Self>()
+    }
+
+    pub fn new(
+        rt1: RetentionTime,
+        rt2: RetentionTime,
+        series: SeriesId,
+        pbp: PrebinnedPartitioning,
+        range: NanoRange,
+        scyqueue: ScyllaQueue,
+    ) -> Self {
+        info!("{}::new INFO/DEBUG test", Self::type_name());
+        debug!("{}::new", Self::type_name());
+        let (msp_beg, lsp_beg) = pbp.msp_lsp(range.beg_ts().to_ts_ms());
+        let (msp_end, lsp_end) = pbp.msp_lsp(range.end_ts().add_dt_nano(pbp.bin_len().dt_ns()).to_ts_ms());
+        BinWriteIndexRtStream {
+            rt1,
+            rt2,
+            series,
+            scyqueue,
+            pbp,
+            msp: msp_beg,
+            lsp_min: lsp_beg,
+            msp_end,
+            lsp_end,
+            fut1: None,
+        }
+    }
+
+    async fn next_query_fut(
+        scyqueue: &ScyllaQueue,
+        rt1: RetentionTime,
+        rt2: RetentionTime,
+        series: SeriesId,
+        pbp: PrebinnedPartitioning,
+        msp: u32,
+        lsp_min: u32,
+        lsp_max: u32,
+    ) -> Result<(u32, u32, u32, VecDeque<BinWriteIndexEntry>), crate::worker::Error> {
+        debug!("make_next_query_fut msp {} lsp {} {}", msp, lsp_min, lsp_max);
+        let res = scyqueue
+            .bin_write_index_read(rt1, rt2, series, pbp, MspU32(msp), lsp_min, lsp_max)
+            .await?;
+        Ok((msp, lsp_min, lsp_max, res))
+    }
+
+    fn make_next_query_fut(mut self: Pin<&mut Self>, _cx: &mut Context) -> Option<Fut1> {
+        if self.msp <= self.msp_end {
+            let msp = self.msp;
+            let lsp_min = self.lsp_min;
+            self.msp += 1;
+            self.lsp_min = 0;
+            let lsp_max = if self.msp == self.msp_end {
+                self.lsp_end
+            } else {
+                self.pbp.patch_len()
+            };
+            let scyqueue = unsafe { netpod::extltref(&self.scyqueue) };
+            let fut = Self::next_query_fut(
+                scyqueue,
+                self.rt1.clone(),
+                self.rt2.clone(),
+                self.series.clone(),
+                self.pbp.clone(),
+                msp,
+                lsp_min,
+                lsp_max,
+            );
+            Some(Fut1(Box::pin(fut)))
+        } else {
+            debug!("make_next_query_fut done");
+            None
+        }
+    }
+}
+
+impl Stream for BinWriteIndexRtStream {
+    type Item = Result<BinWriteIndexSet, Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        use Poll::*;
+        loop {
+            break if let Some(fut) = self.fut1.as_mut() {
+                match fut.0.poll_unpin(cx) {
+                    Ready(Ok(x)) => {
+                        self.fut1 = None;
+                        let item = BinWriteIndexSet {
+                            rt: self.rt1.clone(),
+                            msp: MspU32(x.0),
+                            entries: x.3,
+                        };
+                        Ready(Some(Ok(item)))
+                    }
+                    Ready(Err(e)) => Ready(Some(Err(e.into()))),
+                    Pending => Pending,
+                }
+            } else if let Some(fut) = self.as_mut().make_next_query_fut(cx) {
+                self.fut1 = Some(fut);
+                continue;
+            } else {
+                Ready(None)
+            };
+        }
+    }
+}
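[Editor's note, not part of the commit] A minimal sketch of driving BinWriteIndexRtStream to completion, using only names imported in the new module above and assuming a ScyllaQueue handle, a series id, and a nanosecond range are already at hand; it mirrors the loop in binned_json_single in crates/httpret/src/api4/binwriteindex.rs:

    async fn dump_bin_write_index(series: SeriesId, range: NanoRange, scyqueue: ScyllaQueue) {
        use futures_util::StreamExt;
        let mut stream = BinWriteIndexRtStream::new(
            RetentionTime::Short,
            RetentionTime::Short,
            series,
            PrebinnedPartitioning::Day1,
            range,
            scyqueue,
        );
        // Each Ok item is a BinWriteIndexSet covering one msp partition, in ascending msp order.
        while let Some(item) = stream.next().await {
            println!("{:?}", item);
        }
    }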
crates/scyllaconn/src/binwriteindex/bwxcmb.rs (new file, 126 lines)
@@ -0,0 +1,126 @@
+use super::BinWriteIndexRtStream;
+use crate::worker::ScyllaQueue;
+use daqbuf_series::msp::PrebinnedPartitioning;
+use daqbuf_series::SeriesId;
+use futures_util::Future;
+use futures_util::Stream;
+use futures_util::StreamExt;
+use netpod::log;
+use netpod::range::evrange::NanoRange;
+use netpod::ttl::RetentionTime;
+use std::collections::VecDeque;
+use std::fmt;
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
+
+macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); }
+
+autoerr::create_error_v1!(
+    name(Error, "BinWriteIndexStream"),
+    enum variants {
+        A,
+    },
+);
+
+type Fut1Res = <BinWriteIndexRtStream as Stream>::Item;
+
+struct Fut1a(Pin<Box<dyn Future<Output = Fut1Res>>>);
+
+impl fmt::Debug for Fut1a {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        fmt.debug_tuple("Fut1a").finish()
+    }
+}
+
+#[derive(Debug)]
+enum InpSt {
+    Polling(BinWriteIndexRtStream),
+    Ready(BinWriteIndexRtStream, Fut1Res),
+    Done,
+}
+
+#[derive(Debug)]
+pub struct BinWriteIndexStream {
+    rtss: VecDeque<InpSt>,
+}
+
+impl BinWriteIndexStream {
+    pub fn type_name() -> &'static str {
+        std::any::type_name::<Self>()
+    }
+
+    pub fn new(series: SeriesId, range: NanoRange, scyqueue: ScyllaQueue) -> Self {
+        debug!("{}::new", Self::type_name());
+        let mut rtss = VecDeque::new();
+        let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long];
+        for rt in rts {
+            let s = BinWriteIndexRtStream::new(
+                rt.clone(),
+                rt.clone(),
+                series.clone(),
+                PrebinnedPartitioning::Day1,
+                range.clone(),
+                scyqueue.clone(),
+            );
+            rtss.push_back(InpSt::Polling(s));
+        }
+        BinWriteIndexStream { rtss }
+    }
+
+    fn abort(&mut self) {
+        for inp in self.rtss.iter_mut() {
+            *inp = InpSt::Done;
+        }
+    }
+}
+
+impl Stream for BinWriteIndexStream {
+    type Item = Result<(), Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        // TODO poll the streams in `self.rtss`. When any of them returns an item, store it
+        // in the variant InpSt::Ready(..) together with the remaining stream.
+        // When all input streams have an item ready, store all items in a new variable
+        // in the type Self and transition all inputs to Polling again.
+        // If any of the input streams returns None, we return None as well.
+        use Poll::*;
+        loop {
+            let mut ready_cnt: u16 = 0;
+            let mut do_abort = false;
+            let mut have_pending = false;
+            for inp in self.rtss.iter_mut() {
+                match inp {
+                    InpSt::Polling(fut) => match fut.poll_next_unpin(cx) {
+                        Ready(Some(item)) => {
+                            let fut = todo!("bwxcmb keep stream in state");
+                            *inp = InpSt::Ready(fut, item);
+                            ready_cnt += 1;
+                        }
+                        Ready(None) => {
+                            do_abort = true;
+                        }
+                        Pending => {
+                            have_pending = true;
+                        }
+                    },
+                    InpSt::Ready(..) => {
+                        ready_cnt += 1;
+                    }
+                    InpSt::Done => {}
+                }
+            }
+            if do_abort {
+                self.abort();
+            }
+            break if ready_cnt == self.rtss.len() as u16 {
+                todo!("TODO bwxcmb")
+            } else if have_pending {
+                Pending
+            } else {
+                self.abort();
+                Ready(None)
+            };
+        }
+    }
+}
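[Editor's note, not part of the commit] The todo!("bwxcmb keep stream in state") above exists because the match arm only holds a mutable borrow of the InpSt slot, so the exhausted-for-this-item stream cannot be moved into InpSt::Ready directly. One possible shape for that transition, swapping the slot through InpSt::Done to take ownership first (a sketch against the enum defined above, not the author's implementation):

    fn store_ready(inp: &mut InpSt, item: Fut1Res) {
        match std::mem::replace(inp, InpSt::Done) {
            InpSt::Polling(stream) => *inp = InpSt::Ready(stream, item),
            // Ready or Done: put the previous state back unchanged.
            other => *inp = other,
        }
    }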
@@ -3,9 +3,6 @@ use super::events::EventsStreamRt;
 use crate::events2::onebeforeandbulk::OneBeforeAndBulk;
 use crate::range::ScyllaSeriesRange;
 use crate::worker::ScyllaQueue;
-use daqbuf_err as err;
-use err::thiserror;
-use err::ThisError;
 use futures_util::Stream;
 use futures_util::StreamExt;
 use items_0::streamitem::sitem_err2_from_string;
@@ -14,20 +11,21 @@ use items_0::streamitem::SitemErrTy;
 use items_0::streamitem::StreamItem;
 use items_2::channelevents::ChannelEvents;
 use items_2::merger::Merger;
-use netpod::log::*;
+use netpod::log;
 use netpod::ttl::RetentionTime;
 use netpod::ChConf;
 use std::pin::Pin;
 use std::task::Context;
 use std::task::Poll;

-macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
+macro_rules! trace_init { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) }

-#[derive(Debug, ThisError)]
-#[cstm(name = "EventsMergeRt")]
-pub enum Error {
-    Msg(String),
-}
+autoerr::create_error_v1!(
+    name(Error, "EventsMergeRt"),
+    enum variants {
+        Msg(String),
+    },
+);

 pub struct MergeRts {
     inp: Pin<Box<dyn Stream<Item = Result<ChannelEvents, SitemErrTy>> + Send>>,
@@ -1,15 +1,12 @@
-use super::prepare::StmtsEvents;
+use crate::events2::prepare::StmtsEvents;
 use crate::range::ScyllaSeriesRange;
 use crate::worker::ScyllaQueue;
-use daqbuf_err as err;
 use daqbuf_series::SeriesId;
-use err::thiserror;
-use err::ThisError;
 use futures_util::Future;
 use futures_util::FutureExt;
 use futures_util::Stream;
 use futures_util::TryStreamExt;
-use netpod::log::*;
+use netpod::log;
 use netpod::ttl::RetentionTime;
 use netpod::TsMs;
 use netpod::TsMsVecFmt;
@@ -19,17 +16,20 @@ use std::pin::Pin;
 use std::task::Context;
 use std::task::Poll;

-macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if $det { trace!($($arg)*); } ) }
+macro_rules! trace_emit { ($det:expr, $($arg:expr),*) => ( if $det { log::trace!($($arg),*); } ) }

-#[derive(Debug, ThisError)]
-#[cstm(name = "EventsMsp")]
-pub enum Error {
-    Logic,
-    Worker(Box<crate::worker::Error>),
-    ScyllaQuery(#[from] scylla::transport::errors::QueryError),
-    ScyllaRow(#[from] scylla::transport::iterator::NextRowError),
-    ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError),
-}
+macro_rules! trace_msp { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) }
+
+autoerr::create_error_v1!(
+    name(Error, "EventsMsp"),
+    enum variants {
+        Logic,
+        Worker(Box<crate::worker::Error>),
+        ScyllaQuery(#[from] scylla::transport::errors::QueryError),
+        ScyllaRow(#[from] scylla::transport::iterator::NextRowError),
+        ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError),
+    },
+);

 impl From<crate::worker::Error> for Error {
     fn from(value: crate::worker::Error) -> Self {
@@ -237,7 +237,7 @@ impl Stream for MspStreamRt {
                 Ready(Err(e)) => {
                     trace_emit!(trdet, "bulk fwd error {e}");
                     *rsv = Resolvable::Taken;
-                    error!("{e}");
+                    log::error!("{e}");
                 }
                 Pending => {
                     trace_emit!(trdet, "bulk fwd Pending");
@@ -305,7 +305,7 @@ pub async fn find_ts_msp(
     stmts: &StmtsEvents,
     scy: &Session,
 ) -> Result<VecDeque<TsMs>, Error> {
-    trace!(
+    trace_msp!(
         "find_ts_msp series {:?} {:?} {} bck {}",
         rt,
         series,
@@ -79,6 +79,7 @@ pub struct StmtsEventsRt {
     lsp_fwd_ts: StmtsLspDir,
     lsp_bck_ts: StmtsLspDir,
     prebinned_f32: PreparedStatement,
+    bin_write_index_read: PreparedStatement,
 }

 impl StmtsEventsRt {
@@ -109,6 +110,10 @@ impl StmtsEventsRt {
     pub fn prebinned_f32(&self) -> &PreparedStatement {
         &self.prebinned_f32
     }
+
+    pub fn bin_write_index_read(&self) -> &PreparedStatement {
+        &self.bin_write_index_read
+    }
 }

 async fn make_msp_dir(ks: &str, rt: &RetentionTime, bck: bool, scy: &Session) -> Result<PreparedStatement, Error> {
@@ -228,6 +233,21 @@ async fn make_prebinned_f32(ks: &str, rt: &RetentionTime, scy: &Session) -> Resu
     Ok(qu)
 }

+async fn make_bin_write_index_read(ks: &str, rt: &RetentionTime, scy: &Session) -> Result<PreparedStatement, Error> {
+    let cql = format!(
+        concat!(
+            "select rt, lsp, binlen",
+            " from {}.{}bin_write_index_v03",
+            " where series = ? and pbp = ? and msp = ? and rt = ?",
+            " and lsp >= ? and lsp < ?",
+        ),
+        ks,
+        rt.table_prefix()
+    );
+    let qu = scy.prepare(cql).await?;
+    Ok(qu)
+}
+
 async fn make_rt(ks: &str, rt: &RetentionTime, scy: &Session) -> Result<StmtsEventsRt, Error> {
     let ret = StmtsEventsRt {
         ts_msp_fwd: make_msp_dir(ks, rt, false, scy).await?,
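[Editor's note, not part of the commit] For concreteness, the statement text that make_bin_write_index_read builds above, assuming a hypothetical keyspace "daq" and a retention-time table prefix "st_":

    let cql = format!(
        concat!(
            "select rt, lsp, binlen",
            " from {}.{}bin_write_index_v03",
            " where series = ? and pbp = ? and msp = ? and rt = ?",
            " and lsp >= ? and lsp < ?",
        ),
        "daq", "st_"
    );
    assert_eq!(
        cql,
        "select rt, lsp, binlen from daq.st_bin_write_index_v03 where series = ? and pbp = ? and msp = ? and rt = ? and lsp >= ? and lsp < ?"
    );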
@@ -237,6 +257,7 @@ async fn make_rt(ks: &str, rt: &RetentionTime, scy: &Session) -> Result<StmtsEve
         lsp_fwd_ts: make_lsp_dir(ks, rt, "ts_lsp", false, scy).await?,
         lsp_bck_ts: make_lsp_dir(ks, rt, "ts_lsp", true, scy).await?,
         prebinned_f32: make_prebinned_f32(ks, rt, scy).await?,
+        bin_write_index_read: make_bin_write_index_read(ks, rt, scy).await?,
     };
     Ok(ret)
 }
@@ -1,24 +0,0 @@
-use daqbuf_err as err;
-use err::thiserror;
-use err::ThisError;
-use netpod::ttl::RetentionTime;
-use netpod::ScyllaConfig;
-use scylla::Session as ScySession;
-
-#[derive(Debug, ThisError)]
-#[cstm(name = "ScyllaSchema")]
-pub enum Error {
-    Scylla(#[from] scylla::transport::errors::QueryError),
-}
-
-pub async fn schema(rt: RetentionTime, scyco: &ScyllaConfig, scy: &ScySession) -> Result<(), Error> {
-    let table = "binned_scalar_f32";
-    let cql = format!(
-        concat!("alter table {}.{}{}", " add lst float"),
-        &scyco.keyspace,
-        rt.table_prefix(),
-        table
-    );
-    let _ = scy.query_unpaged(cql, ()).await;
-    Ok(())
-}
@@ -1,10 +1,10 @@
 pub mod accounting;
 pub mod bincache;
+pub mod binwriteindex;
 pub mod conn;
 pub mod errconv;
 pub mod events2;
 pub mod range;
-pub mod schema;
 pub mod status;
 pub mod worker;

@@ -1,3 +1,4 @@
+use crate::binwriteindex::BinWriteIndexEntry;
 use crate::conn::create_scy_session_no_ks;
 use crate::events2::events::ReadJobTrace;
 use crate::events2::prepare::StmtsEvents;
@@ -5,11 +6,15 @@ use crate::range::ScyllaSeriesRange;
 use async_channel::Receiver;
 use async_channel::Sender;
 use daqbuf_err as err;
+use daqbuf_series::msp::MspU32;
+use daqbuf_series::msp::PrebinnedPartitioning;
 use daqbuf_series::SeriesId;
 use futures_util::Future;
 use futures_util::StreamExt;
+use futures_util::TryStreamExt;
 use items_0::timebin::BinningggContainerEventsDyn;
 use items_2::binning::container_bins::ContainerBins;
-use netpod::log::*;
+use netpod::log;
 use netpod::ttl::RetentionTime;
 use netpod::DtMs;
 use netpod::ScyllaConfig;
@@ -20,6 +25,9 @@ use std::fmt;
 use std::pin::Pin;
 use std::sync::Arc;

+macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); }
+macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); }
+
 const CONCURRENT_QUERIES_PER_WORKER: usize = 80;
 const SCYLLA_WORKER_QUEUE_LEN: usize = 200;
@@ -36,10 +44,19 @@ autoerr::create_error_v1!(
         Toplist(#[from] crate::accounting::toplist::Error),
         MissingKeyspaceConfig,
         CacheWriteF32(#[from] streams::timebin::cached::reader::Error),
-        Schema(#[from] crate::schema::Error),
+        ScyllaQuery(#[from] scylla::transport::errors::QueryError),
+        ScyllaType(#[from] scylla::deserialize::TypeCheckError),
     },
 );

+impl<T> From<async_channel::SendError<T>> for Error {
+    fn from(_: async_channel::SendError<T>) -> Self {
+        Self::ChannelSend
+    }
+}
+
+type ScySessTy = scylla::transport::session::GenericSession<scylla::transport::session::CurrentDeserializationApi>;
+
 #[derive(Debug)]
 struct ReadPrebinnedF32 {
     rt: RetentionTime,
@@ -50,6 +67,60 @@ struct ReadPrebinnedF32 {
     tx: Sender<Result<ContainerBins<f32, f32>, streams::timebin::cached::reader::Error>>,
 }

+#[derive(Debug)]
+struct BinWriteIndexRead {
+    rt1: RetentionTime,
+    rt2: RetentionTime,
+    series: SeriesId,
+    pbp: PrebinnedPartitioning,
+    msp: MspU32,
+    lsp_min: u32,
+    lsp_max: u32,
+    tx: Sender<Result<VecDeque<BinWriteIndexEntry>, Error>>,
+}
+
+impl BinWriteIndexRead {
+    async fn execute(self, stmts: &StmtsEvents, scy: &ScySessTy) {
+        // TODO avoid the extra clone
+        let tx = self.tx.clone();
+        match self.execute_inner(stmts, scy).await {
+            Ok(()) => {}
+            Err(e) => {
+                if tx.send(Err(e)).await.is_err() {
+                    // TODO count for stats
+                }
+            }
+        }
+    }
+
+    async fn execute_inner(self, stmts: &StmtsEvents, scy: &ScySessTy) -> Result<(), Error> {
+        let params = (
+            self.series.id() as i64,
+            self.pbp.db_ix() as i16,
+            self.msp.0 as i32,
+            self.rt2.to_index_db_i32() as i16,
+            self.lsp_min as i32,
+            self.lsp_max as i32,
+        );
+        log::info!("execute {:?}", params);
+        let res = scy
+            .execute_iter(stmts.rt(&self.rt1).bin_write_index_read().clone(), params)
+            .await?;
+        let mut it = res.rows_stream::<(i16, i32, i32)>()?;
+        let mut all = VecDeque::new();
+        while let Some((rt, lsp, binlen)) = it.try_next().await? {
+            let v = BinWriteIndexEntry {
+                rt: rt as u16,
+                lsp: lsp as u32,
+                binlen: binlen as u32,
+            };
+            all.push_back(v);
+        }
+        self.tx.send(Ok(all)).await?;
+        Ok(())
+    }
+}
+
 #[derive(Debug)]
 enum Job {
     FindTsMsp(
@@ -72,6 +143,7 @@ enum Job {
         Sender<Result<(), streams::timebin::cached::reader::Error>>,
     ),
     ReadPrebinnedF32(ReadPrebinnedF32),
+    BinWriteIndexRead(BinWriteIndexRead),
 }

 struct ReadNextValues {
@@ -197,6 +269,39 @@ impl ScyllaQueue {
             .map_err(|_| streams::timebin::cached::reader::Error::ChannelRecv)??;
         Ok(res)
     }
+
+    pub async fn bin_write_index_read(
+        &self,
+        rt1: RetentionTime,
+        rt2: RetentionTime,
+        series: SeriesId,
+        pbp: PrebinnedPartitioning,
+        msp: MspU32,
+        lsp_min: u32,
+        lsp_max: u32,
+    ) -> Result<VecDeque<BinWriteIndexEntry>, Error> {
+        let (tx, rx) = async_channel::bounded(1);
+        let job = BinWriteIndexRead {
+            rt1,
+            rt2,
+            series,
+            pbp,
+            msp,
+            lsp_min,
+            lsp_max,
+            tx,
+        };
+        let job = Job::BinWriteIndexRead(job);
+        self.tx
+            .send(job)
+            .await
+            .map_err(|_| streams::timebin::cached::reader::Error::ChannelSend)?;
+        let res = rx
+            .recv()
+            .await
+            .map_err(|_| streams::timebin::cached::reader::Error::ChannelRecv)??;
+        Ok(res)
+    }
 }

 #[derive(Debug)]
@@ -229,9 +334,6 @@ impl ScyllaWorker {
             .await
             .map_err(Error::ScyllaConnection)?;
         let scy = Arc::new(scy);
-        crate::schema::schema(RetentionTime::Short, &self.scyconf_st, &scy).await?;
-        crate::schema::schema(RetentionTime::Medium, &self.scyconf_mt, &scy).await?;
-        crate::schema::schema(RetentionTime::Long, &self.scyconf_lt, &scy).await?;
         let kss = [
             self.scyconf_st.keyspace.as_str(),
             self.scyconf_mt.keyspace.as_str(),
@@ -294,6 +396,7 @@ impl ScyllaWorker {
                     // TODO count for stats
                 }
             }
+            Job::BinWriteIndexRead(job) => job.execute(&stmts, &scy).await,
         }
     })
     .buffer_unordered(CONCURRENT_QUERIES_PER_WORKER)