From bc3a123f1343688515bf19907252e91bd2220cf6 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 30 Jan 2024 16:02:41 +0100 Subject: [PATCH] Add handler for accounting and refactor --- crates/httpclient/src/httpclient.rs | 11 +- crates/httpclient/src/lib.rs | 1 + crates/httpret/src/api4.rs | 1 + crates/httpret/src/api4/accounting.rs | 86 ++++++++ crates/httpret/src/api4/binned.rs | 6 +- crates/httpret/src/api4/databuffer_tools.rs | 5 +- crates/httpret/src/api4/eventdata.rs | 4 +- crates/httpret/src/api4/events.rs | 21 +- crates/httpret/src/bodystream.rs | 9 + crates/httpret/src/channel_status.rs | 4 +- crates/httpret/src/channelconfig.rs | 10 +- crates/httpret/src/gather.rs | 1 + crates/httpret/src/httpret.rs | 19 +- crates/httpret/src/proxy.rs | 224 +++++++++----------- crates/httpret/src/proxy/api4/events.rs | 2 +- crates/httpret/src/pulsemap.rs | 2 +- crates/httpret/src/requests.rs | 21 ++ crates/items_2/src/accounting.rs | 37 ++++ crates/items_2/src/items_2.rs | 1 + crates/netpod/src/netpod.rs | 8 +- crates/nodenet/src/scylla.rs | 2 +- crates/query/src/api4.rs | 51 +++++ crates/scyllaconn/src/accounting.rs | 197 +++++++++++++++++ crates/scyllaconn/src/conn.rs | 24 +++ crates/scyllaconn/src/events.rs | 34 +-- crates/scyllaconn/src/range.rs | 26 +++ crates/scyllaconn/src/scyllaconn.rs | 44 +--- crates/scyllaconn/src/status.rs | 4 +- 28 files changed, 611 insertions(+), 244 deletions(-) create mode 100644 crates/httpret/src/api4/accounting.rs create mode 100644 crates/httpret/src/requests.rs create mode 100644 crates/items_2/src/accounting.rs create mode 100644 crates/scyllaconn/src/accounting.rs create mode 100644 crates/scyllaconn/src/conn.rs create mode 100644 crates/scyllaconn/src/range.rs diff --git a/crates/httpclient/src/httpclient.rs b/crates/httpclient/src/httpclient.rs index 675b6de..aa569b8 100644 --- a/crates/httpclient/src/httpclient.rs +++ b/crates/httpclient/src/httpclient.rs @@ -7,7 +7,6 @@ use bytes::Bytes; use bytes::BytesMut; use futures_util::Stream; use futures_util::StreamExt; -use futures_util::TryStreamExt; use http::header; use http::Request; use http::Response; @@ -18,14 +17,10 @@ use http_body_util::BodyExt; use hyper::body::Body; use hyper::body::Incoming; use hyper::client::conn::http1::SendRequest; -use hyper::Method; use netpod::log::*; -use netpod::AppendToUrl; -use netpod::ChannelConfigQuery; -use netpod::ChannelConfigResponse; -use netpod::NodeConfigCached; use netpod::ReqCtx; use netpod::APP_JSON; +use netpod::X_DAQBUF_REQID; use serde::Serialize; use std::fmt; use std::pin::Pin; @@ -240,7 +235,7 @@ pub async fn http_get(url: Url, accept: &str, ctx: &ReqCtx) -> Result Re .header(header::HOST, url.host_str().ok_or_else(|| Error::NoHostInUrl)?) .header(header::CONTENT_TYPE, APP_JSON) .header(header::ACCEPT, accept) - .header("daqbuf-reqid", ctx.reqid()) + .header(X_DAQBUF_REQID, ctx.reqid()) .body(body_string(body))?; let mut send_req = connect_client(req.uri()).await?; let res = send_req.send_request(req).await?; diff --git a/crates/httpclient/src/lib.rs b/crates/httpclient/src/lib.rs index 25060ac..50aaf80 100644 --- a/crates/httpclient/src/lib.rs +++ b/crates/httpclient/src/lib.rs @@ -2,5 +2,6 @@ pub mod httpclient; pub use crate::httpclient::*; pub use http; +pub use http_body; pub use hyper; pub use url; diff --git a/crates/httpret/src/api4.rs b/crates/httpret/src/api4.rs index 65683ac..ac8d0ca 100644 --- a/crates/httpret/src/api4.rs +++ b/crates/httpret/src/api4.rs @@ -1,3 +1,4 @@ +pub mod accounting; pub mod binned; pub mod databuffer_tools; pub mod docs; diff --git a/crates/httpret/src/api4/accounting.rs b/crates/httpret/src/api4/accounting.rs new file mode 100644 index 0000000..a7b6b42 --- /dev/null +++ b/crates/httpret/src/api4/accounting.rs @@ -0,0 +1,86 @@ +use crate::bodystream::response; +use crate::err::Error; +use crate::requests::accepts_json_or_all; +use crate::ReqCtx; +use futures_util::StreamExt; +use http::Method; +use http::StatusCode; +use httpclient::body_empty; +use httpclient::body_string; +use httpclient::IntoBody; +use httpclient::Requ; +use httpclient::StreamResponse; +use httpclient::ToJsonBody; +use items_0::Empty; +use items_0::Extendable; +use items_2::accounting::AccountingEvents; +use items_2::channelevents::ChannelStatusEvents; +use netpod::log::*; +use netpod::query::ChannelStateEventsQuery; +use netpod::req_uri_to_url; +use netpod::FromUrl; +use netpod::NodeConfigCached; + +pub struct AccountingIngestedBytes {} + +impl AccountingIngestedBytes { + pub fn handler(req: &Requ) -> Option { + if req.uri().path().starts_with("/api/4/status/accounting/ingested/bytes/") { + Some(Self {}) + } else { + None + } + } + + pub async fn handle( + &self, + req: Requ, + _ctx: &ReqCtx, + node_config: &NodeConfigCached, + ) -> Result { + if req.method() == Method::GET { + if accepts_json_or_all(req.headers()) { + let url = req_uri_to_url(req.uri())?; + let q = ChannelStateEventsQuery::from_url(&url)?; + match self.fetch_data(&q, node_config).await { + Ok(k) => { + let body = ToJsonBody::from(&k).into_body(); + Ok(response(StatusCode::OK).body(body)?) + } + Err(e) => { + error!("{e}"); + Ok(response(StatusCode::INTERNAL_SERVER_ERROR) + .body(body_string(format!("{:?}", e.public_msg())))?) + } + } + } else { + Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?) + } + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) + } + } + + async fn fetch_data( + &self, + q: &ChannelStateEventsQuery, + node_config: &NodeConfigCached, + ) -> Result { + let scyco = node_config + .node_config + .cluster + .scylla + .as_ref() + .ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?; + let scy = scyllaconn::conn::create_scy_session(scyco).await?; + // TODO so far, we sum over everything + let series_id = 0; + let mut stream = scyllaconn::accounting::AccountingStreamScylla::new(series_id, q.range().clone(), scy); + let mut ret = AccountingEvents::empty(); + while let Some(item) = stream.next().await { + let mut item = item?; + ret.extend_from(&mut item); + } + Ok(ret) + } +} diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index b00ab9d..4e4580c 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -1,8 +1,8 @@ use crate::bodystream::response; +use crate::bodystream::response_err_msg; use crate::bodystream::ToPublicResponse; use crate::channelconfig::ch_conf_from_binned; use crate::err::Error; -use crate::response_err; use http::Method; use http::StatusCode; use httpclient::body_empty; @@ -68,12 +68,12 @@ async fn binned(req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Resu if crate::accepts_json(&req.headers()) { Ok(binned_json(url, req, ctx, node_config).await?) } else if crate::accepts_octets(&req.headers()) { - Ok(response_err( + Ok(response_err_msg( StatusCode::NOT_ACCEPTABLE, format!("binary binned data not yet available"), )?) } else { - let ret = response_err( + let ret = response_err_msg( StatusCode::NOT_ACCEPTABLE, format!("Unsupported Accept: {:?}", req.headers()), )?; diff --git a/crates/httpret/src/api4/databuffer_tools.rs b/crates/httpret/src/api4/databuffer_tools.rs index 0b08513..0c6430d 100644 --- a/crates/httpret/src/api4/databuffer_tools.rs +++ b/crates/httpret/src/api4/databuffer_tools.rs @@ -1,5 +1,5 @@ use crate::bodystream::response; -use crate::response_err; +use crate::bodystream::response_err_msg; use async_channel::Receiver; use async_channel::Sender; use bytes::Bytes; @@ -27,7 +27,6 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; use taskrun::tokio; -use url::Url; #[derive(Debug, ThisError)] pub enum FindActiveError { @@ -75,7 +74,7 @@ impl FindActiveHandler { Ok(ret) => Ok(ret), Err(e) => { error!("{e}"); - let res = response_err(StatusCode::NOT_ACCEPTABLE, e.to_public_error()) + let res = response_err_msg(StatusCode::NOT_ACCEPTABLE, e.to_public_error()) .map_err(|_| FindActiveError::InternalError)?; Ok(res) } diff --git a/crates/httpret/src/api4/eventdata.rs b/crates/httpret/src/api4/eventdata.rs index 7694384..ab007eb 100644 --- a/crates/httpret/src/api4/eventdata.rs +++ b/crates/httpret/src/api4/eventdata.rs @@ -1,5 +1,5 @@ +use crate::bodystream::response_err_msg; use crate::response; -use crate::response_err; use crate::ReqCtx; use err::thiserror; use err::ThisError; @@ -60,7 +60,7 @@ impl EventDataHandler { Ok(ret) => Ok(ret), Err(e) => { error!("{e}"); - let res = response_err(StatusCode::NOT_ACCEPTABLE, e.to_public_error()) + let res = response_err_msg(StatusCode::NOT_ACCEPTABLE, e.to_public_error()) .map_err(|_| EventDataError::InternalError)?; Ok(res) } diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index 35cc6c2..beaecef 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -1,14 +1,15 @@ +use crate::bodystream::response_err_msg; use crate::channelconfig::chconf_from_events_quorum; use crate::err::Error; +use crate::requests::accepts_cbor_frames; +use crate::requests::accepts_json_or_all; use crate::response; -use crate::response_err; use crate::ToPublicResponse; use bytes::Bytes; use bytes::BytesMut; use futures_util::future; use futures_util::stream; use futures_util::StreamExt; -use http::header; use http::Method; use http::StatusCode; use httpclient::body_empty; @@ -22,9 +23,6 @@ use netpod::req_uri_to_url; use netpod::FromUrl; use netpod::NodeConfigCached; use netpod::ReqCtx; -use netpod::ACCEPT_ALL; -use netpod::APP_CBOR; -use netpod::APP_JSON; use nodenet::client::OpenBoxedBytesViaHttp; use query::api4::events::PlainEventsQuery; use url::Url; @@ -60,18 +58,13 @@ impl EventsHandler { } async fn plain_events(req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result { - let accept_def = APP_JSON; - let accept = req - .headers() - .get(header::ACCEPT) - .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); let url = req_uri_to_url(req.uri())?; - if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { - Ok(plain_events_json(url, req, ctx, node_config).await?) - } else if accept == APP_CBOR { + if accepts_cbor_frames(req.headers()) { Ok(plain_events_cbor(url, req, ctx, node_config).await?) + } else if accepts_json_or_all(req.headers()) { + Ok(plain_events_json(url, req, ctx, node_config).await?) } else { - let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("unsupported accept: {}", accept))?; + let ret = response_err_msg(StatusCode::NOT_ACCEPTABLE, format!("unsupported accept {:?}", req))?; Ok(ret) } } diff --git a/crates/httpret/src/bodystream.rs b/crates/httpret/src/bodystream.rs index c1e5386..84ce003 100644 --- a/crates/httpret/src/bodystream.rs +++ b/crates/httpret/src/bodystream.rs @@ -1,4 +1,5 @@ use crate::err::Error; +use crate::RetrievalError; use http::Response; use http::StatusCode; use httpclient::body_empty; @@ -15,6 +16,14 @@ where Response::builder().status(status) } +pub fn response_err_msg(status: StatusCode, msg: T) -> Result +where + T: ToString, +{ + let ret = response(status).body(body_string(msg))?; + Ok(ret) +} + pub trait ToPublicResponse { fn to_public_response(&self) -> StreamResponse; } diff --git a/crates/httpret/src/channel_status.rs b/crates/httpret/src/channel_status.rs index 18f6993..b67d9f1 100644 --- a/crates/httpret/src/channel_status.rs +++ b/crates/httpret/src/channel_status.rs @@ -78,7 +78,7 @@ impl ConnectionStatusEvents { .scylla .as_ref() .ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?; - let _scy = scyllaconn::create_scy_session(scyco).await?; + let _scy = scyllaconn::conn::create_scy_session(scyco).await?; let _chconf = nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), node_config).await?; let _do_one_before_range = true; @@ -152,7 +152,7 @@ impl ChannelStatusEventsHandler { .scylla .as_ref() .ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?; - let scy = scyllaconn::create_scy_session(scyco).await?; + let scy = scyllaconn::conn::create_scy_session(scyco).await?; let do_one_before_range = true; if false { let chconf = nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), node_config) diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs index e209143..26973cf 100644 --- a/crates/httpret/src/channelconfig.rs +++ b/crates/httpret/src/channelconfig.rs @@ -267,7 +267,7 @@ impl ScyllaConfigsHisto { .scylla .as_ref() .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?; - let scy = scyllaconn::create_scy_session(scyco).await?; + let scy = scyllaconn::conn::create_scy_session(scyco).await?; let res = scy .query( "select scalar_type, shape_dims, series from series_by_channel where facility = ? allow filtering", @@ -382,7 +382,7 @@ impl ScyllaChannelsWithType { .scylla .as_ref() .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?; - let scy = scyllaconn::create_scy_session(scyco).await?; + let scy = scyllaconn::conn::create_scy_session(scyco).await?; let res = scy .query( "select channel_name, series from series_by_channel where facility = ? and scalar_type = ? and shape_dims = ? allow filtering", @@ -537,7 +537,7 @@ impl ScyllaChannelsActive { .scylla .as_ref() .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?; - let scy = scyllaconn::create_scy_session(scyco).await?; + let scy = scyllaconn::conn::create_scy_session(scyco).await?; // Database stores tsedge/ts_msp in units of (10 sec), and we additionally map to the grid. let tsedge = q.tsedge / 10 / (6 * 2) * (6 * 2); info!( @@ -734,7 +734,7 @@ impl ScyllaSeriesTsMsp { .scylla .as_ref() .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?; - let scy = scyllaconn::create_scy_session(scyco).await?; + let scy = scyllaconn::conn::create_scy_session(scyco).await?; let mut ts_msps = Vec::new(); let mut res = scy .query_iter("select ts_msp from ts_msp where series = ?", (q.series as i64,)) @@ -898,7 +898,7 @@ impl GenerateScyllaTestData { let dbconf = &node_config.node_config.cluster.database; let _pg_client = create_connection(dbconf).await?; let scyconf = node_config.node_config.cluster.scylla.as_ref().unwrap(); - let scy = scyllaconn::create_scy_session(scyconf).await?; + let scy = scyllaconn::conn::create_scy_session(scyconf).await?; let series: u64 = 42001; // TODO query `ts_msp` for all MSP values und use that to delete from event table first. // Only later delete also from the `ts_msp` table. diff --git a/crates/httpret/src/gather.rs b/crates/httpret/src/gather.rs index 4d4d79f..99e38e3 100644 --- a/crates/httpret/src/gather.rs +++ b/crates/httpret/src/gather.rs @@ -8,6 +8,7 @@ use http::Method; use http::StatusCode; use http::Uri; use httpclient::connect_client; +use httpclient::http; use hyper::body::Incoming; use hyper::Request; use hyper::Response; diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index dd886dc..e20c321 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -11,6 +11,7 @@ pub mod gather; pub mod prometheus; pub mod proxy; pub mod pulsemap; +pub mod requests; pub mod settings; use self::bodystream::ToPublicResponse; @@ -222,24 +223,6 @@ where impl UnwindSafe for Cont {} -// TODO remove because I want error bodies to be json. -pub fn response_err(status: StatusCode, msg: T) -> Result -where - T: AsRef, -{ - let msg = format!( - concat!( - "Error:\n{}\n", - "\nDocumentation pages API 1 and 4:", - "\nhttps://data-api.psi.ch/api/1/documentation/", - "\nhttps://data-api.psi.ch/api/4/documentation/", - ), - msg.as_ref() - ); - let ret = response(status).body(body_string(msg))?; - Ok(ret) -} - async fn http_service_try( req: Requ, ctx: ReqCtx, diff --git a/crates/httpret/src/proxy.rs b/crates/httpret/src/proxy.rs index 7a8e066..b39fe45 100644 --- a/crates/httpret/src/proxy.rs +++ b/crates/httpret/src/proxy.rs @@ -4,24 +4,29 @@ pub mod api4; use crate::api1::channel_search_configs_v1; use crate::api1::channel_search_list_v1; use crate::api1::gather_json_2_v1; +use crate::bodystream::response_err_msg; use crate::err::Error; use crate::gather::gather_get_json_generic; use crate::gather::SubRes; use crate::pulsemap::MapPulseQuery; use crate::response; -use crate::response_err; use crate::status_board_init; use crate::Cont; use futures_util::pin_mut; use futures_util::Stream; +use futures_util::StreamExt; use http::Method; +use http::Request; use http::StatusCode; use httpclient::body_empty; use httpclient::body_stream; use httpclient::body_string; +use httpclient::http; +use httpclient::http::header; use httpclient::read_body_bytes; use httpclient::IntoBody; use httpclient::Requ; +use httpclient::StreamIncoming; use httpclient::StreamResponse; use httpclient::ToJsonBody; use hyper::service::service_fn; @@ -42,13 +47,13 @@ use netpod::HasTimeout; use netpod::ProxyConfig; use netpod::ReqCtx; use netpod::ServiceVersion; -use netpod::ACCEPT_ALL; use netpod::APP_JSON; use netpod::PSI_DAQBUFFER_SERVICE_MARK; +use netpod::X_DAQBUF_REQID; use query::api4::binned::BinnedQuery; use serde::Deserialize; use serde::Serialize; -use serde_json::Value as JsonValue; +use std::fmt; use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; @@ -190,17 +195,19 @@ async fn proxy_http_service_inner( } else if let Some(h) = api4::events::EventsHandler::handler(&req) { h.handle(req, ctx, &proxy_config).await } else if path == "/api/4/status/connection/events" { - Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) + Ok(proxy_backend_query::(req, ctx, proxy_config).await?) } else if path == "/api/4/status/channel/events" { - Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) + Ok(proxy_backend_query::(req, ctx, proxy_config).await?) } else if path.starts_with("/api/4/map/pulse-v2/") { - Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) + Ok(proxy_backend_query::(req, ctx, proxy_config).await?) } else if path.starts_with("/api/4/map/pulse/") { - Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) + Ok(proxy_backend_query::(req, ctx, proxy_config).await?) } else if path == "/api/4/binned" { - Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) + Ok(proxy_backend_query::(req, ctx, proxy_config).await?) + } else if let Some(h) = crate::api4::accounting::AccountingIngestedBytes::handler(&req) { + Ok(proxy_backend_query::(req, ctx, proxy_config).await?) } else if path == "/api/4/channel/config" { - Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) + Ok(proxy_backend_query::(req, ctx, proxy_config).await?) } else if path.starts_with("/api/4/test/http/204") { Ok(response(StatusCode::NO_CONTENT).body(body_string("No Content"))?) } else if path.starts_with("/api/4/test/http/400") { @@ -484,129 +491,100 @@ pub async fn channel_search(req: Requ, ctx: &ReqCtx, proxy_config: &ProxyConfig) } } -pub async fn proxy_single_backend_query( +pub async fn proxy_backend_query( req: Requ, ctx: &ReqCtx, proxy_config: &ProxyConfig, ) -> Result where - QT: FromUrl + AppendToUrl + HasBackend + HasTimeout, + QT: fmt::Debug + FromUrl + AppendToUrl + HasBackend + HasTimeout, { let (head, _body) = req.into_parts(); - info!("proxy_single_backend_query {}", head.uri); - match head.headers.get(http::header::ACCEPT) { - Some(v) => { - if v == APP_JSON || v == ACCEPT_ALL { - let url = req_uri_to_url(&head.uri)?; - let query = match QT::from_url(&url) { - Ok(k) => k, - Err(_) => { - let msg = format!("Malformed request or missing parameters"); - return Ok(response_err(StatusCode::BAD_REQUEST, msg)?); - } - }; - let sh = get_query_host_for_backend(&query.backend(), proxy_config)?; - // TODO remove this special case - // SPECIAL CASE: - // Since the inner proxy is not yet handling map-pulse requests without backend, - // we can not simply copy the original url here. - // Instead, url needs to get parsed and formatted. - // In general, the caller of this function should be able to provide a url, or maybe - // better a closure so that the url can even depend on backend. - let uri_path: String = if url.as_str().contains("/map/pulse/") { - match MapPulseQuery::from_url(&url) { - Ok(qu) => { - info!("qu {qu:?}"); - format!("/api/4/map/pulse/{}/{}", qu.backend, qu.pulse) - } - Err(e) => { - error!("{e:?}"); - String::from("/BAD") - } - } - } else { - head.uri.path().into() - }; - info!("uri_path {uri_path}"); - let urls = [sh] - .iter() - .map(|sh| match Url::parse(&format!("{}{}", sh, uri_path)) { - Ok(mut url) => { - query.append_to_url(&mut url); - Ok(url) - } - Err(e) => Err(Error::with_msg(format!("parse error for: {:?} {:?}", sh, e))), - }) - .fold_ok(Vec::new(), |mut a, x| { - a.push(x); - a - })?; - let tags: Vec<_> = urls.iter().map(|k| k.to_string()).collect(); - let nt = |tag: String, res: Response| { - let fut = async { - let (head, body) = res.into_parts(); - if head.status == StatusCode::OK { - let body = read_body_bytes(body).await?; - match serde_json::from_slice::(&body) { - Ok(val) => { - let ret = SubRes { - tag, - status: head.status, - val, - }; - Ok(ret) - } - Err(e) => { - warn!("can not parse response: {e:?}"); - Err(e.into()) - } - } - } else { - let body = read_body_bytes(body).await?; - let b = String::from_utf8_lossy(&body); - let ret = SubRes { - tag, - status: head.status, - // TODO would like to pass arbitrary type of body in these cases: - val: serde_json::Value::String(format!("{}", b)), - }; - Ok(ret) - } - }; - Box::pin(fut) as Pin, Error>> + Send>> - }; - let ft = |mut all: Vec<(crate::gather::Tag, Result, Error>)>| { - if all.len() > 0 { - all.truncate(1); - let (_tag, z) = all.pop().unwrap(); - match z { - Ok(z) => { - let res = z.val; - // TODO want to pass arbitrary body type: - let res = response(z.status) - .header(http::header::CONTENT_TYPE, APP_JSON) - .body(ToJsonBody::from(&res).into_body())?; - return Ok(res); - } - Err(e) => { - warn!("FT sees: {e}"); - let res = crate::bodystream::ToPublicResponse::to_public_response(&e); - return Ok(res); - } - } - } else { - return Err(Error::with_msg("no response from upstream")); - } - }; - let bodies = (0..urls.len()).into_iter().map(|_| None).collect(); - let ret = gather_get_json_generic(http::Method::GET, urls, bodies, tags, nt, ft, query.timeout(), ctx) - .await?; - Ok(ret) - } else { - Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?) + // TODO will we need some mechanism to modify the necessary url? + let url = req_uri_to_url(&head.uri)?; + let query = match QT::from_url(&url) { + Ok(k) => k, + Err(_) => { + let msg = format!("malformed request or missing parameters {head:?}"); + warn!("{msg}"); + return Ok(response_err_msg(StatusCode::BAD_REQUEST, msg)?); + } + }; + info!("proxy_backend_query {query:?} {head:?}"); + let query_host = get_query_host_for_backend(&query.backend(), proxy_config)?; + // TODO remove this special case + // SPECIAL CASE: + // Since the inner proxy is not yet handling map-pulse requests without backend, + // we can not simply copy the original url here. + // Instead, url needs to get parsed and formatted. + // In general, the caller of this function should be able to provide a url, or maybe + // better a closure so that the url can even depend on backend. + let uri_path: String = if url.as_str().contains("/map/pulse/") { + match MapPulseQuery::from_url(&url) { + Ok(qu) => { + info!("qu {qu:?}"); + format!("/api/4/map/pulse/{}/{}", qu.backend, qu.pulse) + } + Err(e) => { + error!("{e:?}"); + String::from("/BAD") } } - None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?), + } else { + head.uri.path().into() + }; + info!("uri_path {uri_path}"); + let mut url = Url::parse(&format!("{}{}", query_host, uri_path))?; + query.append_to_url(&mut url); + + let mut req = Request::builder() + .method(http::Method::GET) + .uri(url.to_string()) + .header( + header::HOST, + url.host_str() + .ok_or_else(|| Error::with_msg_no_trace("no host in url"))?, + ) + .header(X_DAQBUF_REQID, ctx.reqid()); + { + let hs = req + .headers_mut() + .ok_or_else(|| Error::with_msg_no_trace("can not set headers"))?; + for (hn, hv) in &head.headers { + if hn == header::HOST { + } else if hn == X_DAQBUF_REQID { + } else { + hs.append(hn, hv.clone()); + } + } + } + let req = req.body(body_empty())?; + + let fut = async move { + let mut send_req = httpclient::httpclient::connect_client(req.uri()).await?; + let res = send_req.send_request(req).await?; + Ok::<_, Error>(res) + }; + + let res = tokio::time::timeout(Duration::from_millis(5000), fut) + .await + .map_err(|_| { + let e = Error::with_msg_no_trace(format!("timeout trying to make sub request")); + warn!("{e}"); + e + })??; + + { + use bytes::Bytes; + use httpclient::http_body::Frame; + use httpclient::BodyError; + let (head, body) = res.into_parts(); + let body = StreamIncoming::new(body); + let body = body.map(|x| x.map(Frame::data)); + let body: Pin, BodyError>> + Send>> = Box::pin(body); + let body = http_body_util::StreamBody::new(body); + let ret = Response::from_parts(head, body); + Ok(ret) } } diff --git a/crates/httpret/src/proxy/api4/events.rs b/crates/httpret/src/proxy/api4/events.rs index 801d3cf..0f5bc0f 100644 --- a/crates/httpret/src/proxy/api4/events.rs +++ b/crates/httpret/src/proxy/api4/events.rs @@ -56,7 +56,7 @@ impl EventsHandler { if accept.contains(APP_CBOR) { self.handle_cbor(req, ctx, proxy_config).await } else if accept.contains(APP_JSON) { - return Ok(crate::proxy::proxy_single_backend_query::(req, ctx, proxy_config).await?); + return Ok(crate::proxy::proxy_backend_query::(req, ctx, proxy_config).await?); } else if accept.contains(ACCEPT_ALL) { todo!() } else { diff --git a/crates/httpret/src/pulsemap.rs b/crates/httpret/src/pulsemap.rs index 7ab89eb..74926a3 100644 --- a/crates/httpret/src/pulsemap.rs +++ b/crates/httpret/src/pulsemap.rs @@ -936,7 +936,7 @@ impl MapPulseScyllaHandler { } else { return Err(Error::with_public_msg_no_trace("no scylla configured")); }; - let scy = scyllaconn::create_scy_session(&scyconf).await?; + let scy = scyllaconn::conn::create_scy_session(&scyconf).await?; let pulse_a = (pulse >> 14) as i64; let pulse_b = (pulse & 0x3fff) as i32; let res = scy diff --git a/crates/httpret/src/requests.rs b/crates/httpret/src/requests.rs new file mode 100644 index 0000000..a9da4c6 --- /dev/null +++ b/crates/httpret/src/requests.rs @@ -0,0 +1,21 @@ +use httpclient::http::header; +use httpclient::http::header::HeaderMap; +use netpod::ACCEPT_ALL; +use netpod::APP_CBOR_FRAMES; +use netpod::APP_JSON; + +pub fn accepts_json_or_all(headers: &HeaderMap) -> bool { + let accept_def = APP_JSON; + let accept = headers + .get(header::ACCEPT) + .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); + accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) +} + +pub fn accepts_cbor_frames(headers: &HeaderMap) -> bool { + let accept_def = ""; + let accept = headers + .get(header::ACCEPT) + .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); + accept.contains(APP_CBOR_FRAMES) +} diff --git a/crates/items_2/src/accounting.rs b/crates/items_2/src/accounting.rs new file mode 100644 index 0000000..f8ca0a1 --- /dev/null +++ b/crates/items_2/src/accounting.rs @@ -0,0 +1,37 @@ +use items_0::Empty; +use items_0::Extendable; +use items_0::WithLen; +use serde::Deserialize; +use serde::Serialize; +use std::collections::VecDeque; + +#[derive(Debug, Serialize, Deserialize)] +pub struct AccountingEvents { + pub tss: VecDeque, + pub bytes: VecDeque, +} + +impl Empty for AccountingEvents { + fn empty() -> Self { + Self { + tss: VecDeque::new(), + bytes: VecDeque::new(), + } + } +} + +impl WithLen for AccountingEvents { + fn len(&self) -> usize { + self.tss.len() + } +} + +impl Extendable for AccountingEvents { + fn extend_from(&mut self, src: &mut Self) { + use core::mem::replace; + let v = replace(&mut src.tss, VecDeque::new()); + self.tss.extend(v.into_iter()); + let v = replace(&mut src.bytes, VecDeque::new()); + self.bytes.extend(v.into_iter()); + } +} diff --git a/crates/items_2/src/items_2.rs b/crates/items_2/src/items_2.rs index 2065dac..3c31de9 100644 --- a/crates/items_2/src/items_2.rs +++ b/crates/items_2/src/items_2.rs @@ -1,3 +1,4 @@ +pub mod accounting; pub mod binnedcollected; pub mod binsdim0; pub mod binsxbindim0; diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index ecdb68a..811f135 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -58,6 +58,8 @@ pub const CONNECTION_STATUS_DIV: u64 = timeunits::DAY; pub const TS_MSP_GRID_UNIT: u64 = timeunits::SEC * 10; pub const TS_MSP_GRID_SPACING: u64 = 6 * 2; +pub const EMIT_ACCOUNTING_SNAP: u64 = 60 * 10; + pub const DATETIME_FMT_0MS: &str = "%Y-%m-%dT%H:%M:%SZ"; pub const DATETIME_FMT_3MS: &str = "%Y-%m-%dT%H:%M:%S.%3fZ"; pub const DATETIME_FMT_6MS: &str = "%Y-%m-%dT%H:%M:%S.%6fZ"; @@ -3210,7 +3212,7 @@ pub struct ReqCtx { impl ReqCtx { pub fn new_with_node(req: &Request, nc: &NodeConfigCached) -> Self { let reqid_this = status_board().unwrap().new_status_id(); - let reqid = if let Some(reqid_parent) = req.headers().get("daqbuf-reqid") { + let reqid = if let Some(reqid_parent) = req.headers().get(X_DAQBUF_REQID) { let parent = reqid_parent.to_str().unwrap_or("badid"); format!("{}-{}", parent, reqid_this) } else { @@ -3234,7 +3236,7 @@ impl ReqCtx { pub fn new_with_proxy(req: &Request, proxy: &ProxyConfig) -> Self { let reqid_this = status_board().unwrap().new_status_id(); - let reqid = if let Some(reqid_parent) = req.headers().get("daqbuf-reqid") { + let reqid = if let Some(reqid_parent) = req.headers().get(X_DAQBUF_REQID) { let parent = reqid_parent.to_str().unwrap_or("badid"); format!("{}-{}", parent, reqid_this) } else { @@ -3297,7 +3299,7 @@ impl ReqCtx { } pub fn header_name(&self) -> &'static str { - "daqbuf-reqid" + X_DAQBUF_REQID } pub fn header_value(&self) -> &str { diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index e35abd2..2eaa6cd 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -23,7 +23,7 @@ pub async fn scylla_channel_event_stream( // TODO why both in PlainEventsQuery and as separate parameter? Check other usages. let do_one_before_range = false; // TODO use better builder pattern with shortcuts for production and dev defaults - let scy = scyllaconn::create_scy_session(scyco).await?; + let scy = scyllaconn::conn::create_scy_session(scyco).await?; let series = chconf.series(); let scalar_type = chconf.scalar_type(); let shape = chconf.shape(); diff --git a/crates/query/src/api4.rs b/crates/query/src/api4.rs index b11d861..f6a084b 100644 --- a/crates/query/src/api4.rs +++ b/crates/query/src/api4.rs @@ -1,2 +1,53 @@ pub mod binned; pub mod events; + +use err::Error; +use netpod::get_url_query_pairs; +use netpod::AppendToUrl; +use netpod::FromUrl; +use netpod::HasBackend; +use netpod::HasTimeout; +use serde::Deserialize; +use serde::Serialize; +use std::time::Duration; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AccountingIngestedBytesQuery { + backend: String, +} + +impl HasBackend for AccountingIngestedBytesQuery { + fn backend(&self) -> &str { + &self.backend + } +} + +impl HasTimeout for AccountingIngestedBytesQuery { + fn timeout(&self) -> Duration { + Duration::from_millis(5000) + } +} + +impl FromUrl for AccountingIngestedBytesQuery { + fn from_url(url: &url::Url) -> Result { + let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &std::collections::BTreeMap) -> Result { + let ret = Self { + backend: pairs + .get("backend") + .ok_or_else(|| Error::with_msg_no_trace("missing backend"))? + .to_string(), + }; + Ok(ret) + } +} + +impl AppendToUrl for AccountingIngestedBytesQuery { + fn append_to_url(&self, url: &mut url::Url) { + let mut g = url.query_pairs_mut(); + g.append_pair("backend", &self.backend); + } +} diff --git a/crates/scyllaconn/src/accounting.rs b/crates/scyllaconn/src/accounting.rs new file mode 100644 index 0000000..5be013e --- /dev/null +++ b/crates/scyllaconn/src/accounting.rs @@ -0,0 +1,197 @@ +use crate::errconv::ErrConv; +use err::Error; +use futures_util::Future; +use futures_util::FutureExt; +use futures_util::Stream; +use items_0::Empty; +use items_0::Extendable; +use items_0::WithLen; +use items_2::accounting::AccountingEvents; +use netpod::log::*; +use netpod::range::evrange::NanoRange; +use netpod::timeunits; +use netpod::EMIT_ACCOUNTING_SNAP; +use scylla::Session as ScySession; +use std::collections::VecDeque; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; + +async fn read_next(ts_msp: u64, range: NanoRange, fwd: bool, scy: Arc) -> Result { + if ts_msp >= range.end { + warn!( + "given ts_msp {} >= range.end {} not necessary to read this", + ts_msp, range.end + ); + } + if range.end > i64::MAX as u64 { + return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); + } + let mut ret = AccountingEvents::empty(); + let mut tot_bytes = 0; + for part in 0..255_u32 { + let res = if fwd { + let ts_lsp_min = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; + let ts_lsp_max = if ts_msp < range.end { range.end - ts_msp } else { 0 }; + trace!( + "FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} beg {} end {}", + ts_msp, + ts_lsp_min, + ts_lsp_max, + range.beg, + range.end + ); + // TODO use prepared! + let cql = concat!("select series, count, bytes from account_00 where part = ? and ts = ?"); + scy.query(cql, (part as i32, ts_msp as i64)).await.err_conv()? + } else { + return Err(Error::with_msg_no_trace("no backward support")); + }; + type RowType = (i64, i64, i64); + for row in res.rows_typed_or_empty::() { + let row = row.err_conv()?; + let ts = ts_msp; + let series = row.0 as u64; + let count = row.1 as u64; + let bytes = row.1 as u64; + tot_bytes += bytes; + } + } + ret.tss.push_back(ts_msp); + ret.bytes.push_back(tot_bytes); + trace!("found in total {} events ts_msp {}", ret.len(), ts_msp); + Ok(ret) +} + +struct ReadValues { + series: u64, + range: NanoRange, + ts_msps: VecDeque, + fwd: bool, + do_one_before_range: bool, + fut: Pin> + Send>>, + scy: Arc, +} + +impl ReadValues { + fn new( + series: u64, + range: NanoRange, + ts_msps: VecDeque, + fwd: bool, + do_one_before_range: bool, + scy: Arc, + ) -> Self { + let mut ret = Self { + series, + range, + ts_msps, + fwd, + do_one_before_range, + fut: Box::pin(futures_util::future::ready(Err(Error::with_msg_no_trace( + "future not initialized", + )))), + scy, + }; + ret.next(); + ret + } + + fn next(&mut self) -> bool { + if let Some(ts_msp) = self.ts_msps.pop_front() { + self.fut = self.make_fut(ts_msp); + true + } else { + debug!("no more msp"); + false + } + } + + fn make_fut(&mut self, ts_msp: u64) -> Pin> + Send>> { + debug!("make fut for {ts_msp}"); + let fut = read_next(ts_msp, self.range.clone(), self.fwd, self.scy.clone()); + Box::pin(fut) + } +} + +enum FrState { + New, + ReadValues(ReadValues), + Done, +} + +pub struct AccountingStreamScylla { + state: FrState, + series: u64, + range: NanoRange, + scy: Arc, + outbuf: AccountingEvents, +} + +impl AccountingStreamScylla { + pub fn new(series: u64, range: NanoRange, scy: Arc) -> Self { + Self { + state: FrState::New, + series, + range, + scy, + outbuf: AccountingEvents::empty(), + } + } +} + +impl Stream for AccountingStreamScylla { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + let span = tracing::span!(tracing::Level::TRACE, "poll_next"); + let _spg = span.enter(); + loop { + if self.outbuf.len() > 0 { + let item = std::mem::replace(&mut self.outbuf, AccountingEvents::empty()); + break Ready(Some(Ok(item))); + } + break match self.state { + FrState::New => { + let mut ts_msps = VecDeque::new(); + let mut ts = self.range.beg / timeunits::SEC / EMIT_ACCOUNTING_SNAP * EMIT_ACCOUNTING_SNAP; + while ts < self.range.end { + debug!("use ts {ts}"); + ts_msps.push_back(ts); + ts += EMIT_ACCOUNTING_SNAP; + } + let fwd = true; + let do_one_before_range = false; + let st = ReadValues::new( + self.series, + self.range.clone(), + ts_msps, + fwd, + do_one_before_range, + self.scy.clone(), + ); + self.state = FrState::ReadValues(st); + continue; + } + FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) { + Ready(Ok(mut item)) => { + if !st.next() { + debug!("ReadValues exhausted"); + self.state = FrState::Done; + } + self.outbuf.extend_from(&mut item); + continue; + } + Ready(Err(e)) => { + error!("{e}"); + Ready(Some(Err(e))) + } + Pending => Pending, + }, + FrState::Done => Ready(None), + }; + } + } +} diff --git a/crates/scyllaconn/src/conn.rs b/crates/scyllaconn/src/conn.rs new file mode 100644 index 0000000..5eaf8cb --- /dev/null +++ b/crates/scyllaconn/src/conn.rs @@ -0,0 +1,24 @@ +use crate::errconv::ErrConv; +use err::Error; +use netpod::ScyllaConfig; +use scylla::execution_profile::ExecutionProfileBuilder; +use scylla::statement::Consistency; +use scylla::Session as ScySession; +use std::sync::Arc; + +pub async fn create_scy_session(scyconf: &ScyllaConfig) -> Result, Error> { + let scy = scylla::SessionBuilder::new() + .known_nodes(&scyconf.hosts) + .use_keyspace(&scyconf.keyspace, true) + .default_execution_profile_handle( + ExecutionProfileBuilder::default() + .consistency(Consistency::LocalOne) + .build() + .into_handle(), + ) + .build() + .await + .err_conv()?; + let ret = Arc::new(scy); + Ok(ret) +} diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs index fd17482..a7e3ef8 100644 --- a/crates/scyllaconn/src/events.rs +++ b/crates/scyllaconn/src/events.rs @@ -1,5 +1,5 @@ use crate::errconv::ErrConv; -use crate::ScyllaSeriesRange; +use crate::range::ScyllaSeriesRange; use err::Error; use futures_util::Future; use futures_util::FutureExt; @@ -33,14 +33,14 @@ async fn find_ts_msp( let mut ret2 = VecDeque::new(); // TODO use prepared statements let cql = "select ts_msp from ts_msp where series = ? and ts_msp < ? order by ts_msp desc limit 2"; - let res = scy.query(cql, (series as i64, range.beg as i64)).await.err_conv()?; + let res = scy.query(cql, (series as i64, range.beg() as i64)).await.err_conv()?; for row in res.rows_typed_or_empty::<(i64,)>() { let row = row.err_conv()?; ret1.push_front(row.0 as u64); } let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? and ts_msp < ?"; let res = scy - .query(cql, (series as i64, range.beg as i64, range.end as i64)) + .query(cql, (series as i64, range.beg() as i64, range.end() as i64)) .await .err_conv()?; for row in res.rows_typed_or_empty::<(i64,)>() { @@ -48,7 +48,7 @@ async fn find_ts_msp( ret2.push_back(row.0 as u64); } let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? limit 1"; - let res = scy.query(cql, (series as i64, range.end as i64)).await.err_conv()?; + let res = scy.query(cql, (series as i64, range.end() as i64)).await.err_conv()?; for row in res.rows_typed_or_empty::<(i64,)>() { let row = row.err_conv()?; ret2.push_back(row.0 as u64); @@ -149,7 +149,7 @@ where let fwd = opts.fwd; let scy = opts.scy; let table_name = ST::table_name(); - if range.end > i64::MAX as u64 { + if range.end() > i64::MAX as u64 { return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); } let cql_fields = if opts.with_values { @@ -158,15 +158,15 @@ where "ts_lsp, pulse" }; let ret = if fwd { - let ts_lsp_min = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; - let ts_lsp_max = if ts_msp < range.end { range.end - ts_msp } else { 0 }; + let ts_lsp_min = if ts_msp < range.beg() { range.beg() - ts_msp } else { 0 }; + let ts_lsp_max = if ts_msp < range.end() { range.end() - ts_msp } else { 0 }; trace!( "FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} beg {} end {} {}", ts_msp, ts_lsp_min, ts_lsp_max, - range.beg, - range.end, + range.beg(), + range.end(), table_name, ); // TODO use prepared! @@ -200,10 +200,10 @@ where let value = ValTy::default(); (ts, pulse, value) }; - if ts >= range.end { + if ts >= range.end() { // TODO count as logic error error!("ts >= range.end"); - } else if ts >= range.beg { + } else if ts >= range.beg() { if pulse % 27 != 3618 { ret.push(ts, pulse, value); } @@ -216,13 +216,13 @@ where } ret } else { - let ts_lsp_max = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; + let ts_lsp_max = if ts_msp < range.beg() { range.beg() - ts_msp } else { 0 }; trace!( "BCK ts_msp {} ts_lsp_max {} beg {} end {} {}", ts_msp, ts_lsp_max, - range.beg, - range.end, + range.beg(), + range.end(), table_name, ); // TODO use prepared! @@ -253,10 +253,10 @@ where let value = ValTy::default(); (ts, pulse, value) }; - if ts >= range.beg { + if ts >= range.beg() { // TODO count as logic error error!("ts >= range.beg"); - } else if ts < range.beg { + } else if ts < range.beg() { if pulse % 27 != 3618 { ret.push(ts, pulse, value); } @@ -445,7 +445,7 @@ impl EventsStreamScylla { self.ts_msp_fwd = msps2; for x in self.ts_msp_bck.iter().rev() { let x = x.clone(); - if x >= self.range.end { + if x >= self.range.end() { info!("FOUND one-after because of MSP"); self.found_one_after = true; } diff --git a/crates/scyllaconn/src/range.rs b/crates/scyllaconn/src/range.rs new file mode 100644 index 0000000..ac0fe66 --- /dev/null +++ b/crates/scyllaconn/src/range.rs @@ -0,0 +1,26 @@ +use netpod::range::evrange::SeriesRange; + +#[derive(Debug, Clone)] +pub struct ScyllaSeriesRange { + beg: u64, + end: u64, +} + +impl ScyllaSeriesRange { + pub fn beg(&self) -> u64 { + self.beg + } + + pub fn end(&self) -> u64 { + self.end + } +} + +impl From<&SeriesRange> for ScyllaSeriesRange { + fn from(value: &SeriesRange) -> Self { + match value { + SeriesRange::TimeRange(k) => Self { beg: k.beg, end: k.end }, + SeriesRange::PulseRange(k) => Self { beg: k.beg, end: k.end }, + } + } +} diff --git a/crates/scyllaconn/src/scyllaconn.rs b/crates/scyllaconn/src/scyllaconn.rs index 29d0403..08400bf 100644 --- a/crates/scyllaconn/src/scyllaconn.rs +++ b/crates/scyllaconn/src/scyllaconn.rs @@ -1,47 +1,9 @@ +pub mod accounting; pub mod bincache; +pub mod conn; pub mod errconv; pub mod events; +pub mod range; pub mod status; pub use scylla; - -use err::Error; -use errconv::ErrConv; -use netpod::range::evrange::SeriesRange; -use netpod::ScyllaConfig; -use scylla::execution_profile::ExecutionProfileBuilder; -use scylla::statement::Consistency; -use scylla::Session as ScySession; -use std::sync::Arc; - -#[derive(Debug, Clone)] -pub struct ScyllaSeriesRange { - beg: u64, - end: u64, -} - -impl From<&SeriesRange> for ScyllaSeriesRange { - fn from(value: &SeriesRange) -> Self { - match value { - SeriesRange::TimeRange(k) => Self { beg: k.beg, end: k.end }, - SeriesRange::PulseRange(k) => Self { beg: k.beg, end: k.end }, - } - } -} - -pub async fn create_scy_session(scyconf: &ScyllaConfig) -> Result, Error> { - let scy = scylla::SessionBuilder::new() - .known_nodes(&scyconf.hosts) - .use_keyspace(&scyconf.keyspace, true) - .default_execution_profile_handle( - ExecutionProfileBuilder::default() - .consistency(Consistency::LocalOne) - .build() - .into_handle(), - ) - .build() - .await - .err_conv()?; - let ret = Arc::new(scy); - Ok(ret) -} diff --git a/crates/scyllaconn/src/status.rs b/crates/scyllaconn/src/status.rs index 03362d6..87de0a8 100644 --- a/crates/scyllaconn/src/status.rs +++ b/crates/scyllaconn/src/status.rs @@ -143,13 +143,13 @@ impl ReadValues { self.fut = self.make_fut(ts_msp); true } else { - info!("no more msp"); + debug!("no more msp"); false } } fn make_fut(&mut self, ts_msp: u64) -> Pin> + Send>> { - info!("make fut for {ts_msp}"); + debug!("make fut for {ts_msp}"); let fut = read_next_status_events( self.series, ts_msp,