From 22b43fe012f8cc7d7565e01c132cb905889d99b5 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 11 Apr 2022 17:25:23 +0200 Subject: [PATCH] Add config query for scylla --- httpret/Cargo.toml | 1 + httpret/src/channelconfig.rs | 62 ++++++++++++++++++++++++++-- httpret/src/events.rs | 3 +- httpret/src/httpret.rs | 5 ++- httpret/src/pulsemap.rs | 78 ++++++++++++++++++++++++++++++++++++ netpod/src/netpod.rs | 2 + nodenet/Cargo.toml | 1 + nodenet/src/conn.rs | 63 +++++++++++++++++++++++++++++ 8 files changed, 210 insertions(+), 5 deletions(-) diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index 8ecbd78..6a6a468 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -33,3 +33,4 @@ archapp_wrap = { path = "../archapp_wrap" } nodenet = { path = "../nodenet" } commonio = { path = "../commonio" } taskrun = { path = "../taskrun" } +scylla = "0.4" diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index 65aeeeb..bb404b7 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -3,9 +3,11 @@ use crate::{response, ToPublicResponse}; use http::{Method, Request, Response, StatusCode}; use hyper::Body; use netpod::log::*; -use netpod::NodeConfigCached; -use netpod::{ChannelConfigQuery, FromUrl}; +use netpod::{ChannelConfigQuery, FromUrl, ScalarType, Shape}; +use netpod::{ChannelConfigResponse, NodeConfigCached}; use netpod::{ACCEPT_ALL, APP_JSON}; +use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError; +use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError}; use url::Url; pub struct ChannelConfigHandler {} @@ -43,11 +45,65 @@ impl ChannelConfigHandler { } } +trait ErrConv { + fn err_conv(self) -> Result; +} + +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} + +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} + +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} + pub async fn channel_config(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + info!("channel_config"); let url = Url::parse(&format!("dummy:{}", req.uri()))?; //let pairs = get_url_query_pairs(&url); let q = ChannelConfigQuery::from_url(&url)?; - let conf = if let Some(conf) = &node_config.node.channel_archiver { + info!("channel_config for q {q:?}"); + let conf = if q.channel.backend == "scylla" { + // Find the "series" id. + let scy = scylla::SessionBuilder::new() + .known_node("sf-daqbuf-34:8340") + .build() + .await + .err_conv()?; + let cql = "select dtype, series from series_by_channel where facility = ? and channel_name = ?"; + let res = scy.query(cql, ()).await.err_conv()?; + let rows = res.rows_typed_or_empty::<(i32, i32)>(); + for r in rows { + let r = r.err_conv()?; + info!("got row {r:?}"); + } + let res = ChannelConfigResponse { + channel: q.channel, + scalar_type: ScalarType::F32, + byte_order: None, + shape: Shape::Scalar, + }; + res + } else if let Some(conf) = &node_config.node.channel_archiver { archapp_wrap::archapp::archeng::channel_config_from_db(&q, conf, &node_config.node_config.cluster.database) .await? } else if let Some(conf) = &node_config.node.archiver_appliance { diff --git a/httpret/src/events.rs b/httpret/src/events.rs index cb6917e..eab86fd 100644 --- a/httpret/src/events.rs +++ b/httpret/src/events.rs @@ -68,9 +68,10 @@ async fn plain_events_binary(req: Request, node_config: &NodeConfigCached) } async fn plain_events_json(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - debug!("httpret plain_events_json req: {:?}", req); + info!("httpret plain_events_json req: {:?}", req); let (head, _body) = req.into_parts(); let query = PlainEventsJsonQuery::from_request_head(&head)?; + let op = disk::channelexec::PlainEventsJson::new( query.channel().clone(), query.range().clone(), diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 72b8424..1ac3d88 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -206,7 +206,8 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> let ret = serde_json::json!({ "data_api_version": { "major": 4u32, - "minor": 0u32, + "minor": 1u32, + "patch": 0u32, }, }); Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?) @@ -305,6 +306,8 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> h.handle(req, &node_config).await } else if let Some(h) = evinfo::EventInfoScan::handler(&req) { h.handle(req, &node_config).await + } else if let Some(h) = pulsemap::MapPulseScyllaHandler::handler(&req) { + h.handle(req, &node_config).await } else if let Some(h) = pulsemap::IndexFullHttpFunction::handler(&req) { h.handle(req, &node_config).await } else if let Some(h) = pulsemap::MarkClosedHttpFunction::handler(&req) { diff --git a/httpret/src/pulsemap.rs b/httpret/src/pulsemap.rs index 58480b5..8c2ac23 100644 --- a/httpret/src/pulsemap.rs +++ b/httpret/src/pulsemap.rs @@ -478,6 +478,84 @@ struct LocalMap { channels: Vec, } +pub trait ErrConv { + fn err_conv(self) -> Result; +} + +impl ErrConv for Result { + fn err_conv(self) -> Result { + self.map_err(|e| err::Error::with_msg_no_trace(format!("{e:?}"))) + } +} + +impl ErrConv for Result { + fn err_conv(self) -> Result { + self.map_err(|e| err::Error::with_msg_no_trace(format!("{e:?}"))) + } +} + +impl ErrConv for Result { + fn err_conv(self) -> Result { + self.map_err(|e| err::Error::with_msg_no_trace(format!("{e:?}"))) + } +} + +pub struct MapPulseScyllaHandler {} + +impl MapPulseScyllaHandler { + pub fn prefix() -> &'static str { + "/api/4/scylla/map/pulse/" + } + + pub fn handler(req: &Request) -> Option { + if req.uri().path().starts_with(Self::prefix()) { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request, _node_config: &NodeConfigCached) -> Result, Error> { + if req.method() != Method::GET { + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + } + let urls = format!("dummy://{}", req.uri()); + let url = url::Url::parse(&urls)?; + let query = MapPulseQuery::from_url(&url)?; + let pulse = query.pulse; + let scy = scylla::SessionBuilder::new() + .known_node("sf-daqbuf-34:19042") + .default_consistency(scylla::batch::Consistency::One) + .use_keyspace("ks1", false) + .build() + .await + .err_conv()?; + let pulse_a = (pulse >> 14) as i64; + let pulse_b = (pulse & 0x3fff) as i32; + let res = scy + .query( + "select ts_a, ts_b from pulse where pulse_a = ? and pulse_b = ?", + (pulse_a, pulse_b), + ) + .await + .err_conv()?; + let rows = res.rows().err_conv()?; + let mut tss = vec![]; + let mut channels = vec![]; + use scylla::frame::response::result::CqlValue; + let ts_a_def = CqlValue::BigInt(0); + let ts_b_def = CqlValue::Int(0); + for row in rows { + let ts_a = row.columns[0].as_ref().unwrap_or(&ts_a_def).as_bigint().unwrap_or(0) as u64; + let ts_b = row.columns[1].as_ref().unwrap_or(&ts_b_def).as_int().unwrap_or(0) as u32 as u64; + tss.push(ts_a * netpod::timeunits::SEC + ts_b); + channels.push("scylla".into()); + } + let ret = LocalMap { pulse, tss, channels }; + Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?) + } +} + pub struct MapPulseLocalHttpFunction {} impl MapPulseLocalHttpFunction { diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 6600abc..1ca5fa4 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -116,6 +116,8 @@ impl ScalarType { "int64" => I64, "float" => F32, "double" => F64, + "float32" => F32, + "float64" => F64, "string" => STRING, _ => { return Err(Error::with_msg_no_trace(format!( diff --git a/nodenet/Cargo.toml b/nodenet/Cargo.toml index 9f3888c..2ca8ca4 100644 --- a/nodenet/Cargo.toml +++ b/nodenet/Cargo.toml @@ -20,6 +20,7 @@ futures-core = "0.3.14" futures-util = "0.3.14" tracing = "0.1.25" hex = "0.4.3" +scylla = "0.4.4" err = { path = "../err" } netpod = { path = "../netpod" } disk = { path = "../disk" } diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index a44dc6e..5caee2f 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -9,6 +9,8 @@ use netpod::histo::HistoLog2; use netpod::query::RawEventsQuery; use netpod::{log::*, AggKind}; use netpod::{EventQueryJsonStringFrame, NodeConfigCached, PerfOpts}; +use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError; +use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError}; use std::net::SocketAddr; use std::pin::Pin; use tokio::io::AsyncWriteExt; @@ -16,6 +18,37 @@ use tokio::net::tcp::OwnedWriteHalf; use tokio::net::TcpStream; use tracing::Instrument; +trait ErrConv { + fn err_conv(self) -> Result; +} + +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} + +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} + +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} + pub async fn events_service(node_config: NodeConfigCached) -> Result<(), Error> { let addr = format!("{}:{}", node_config.node.listen, node_config.node.port_raw); let lis = tokio::net::TcpListener::bind(addr).await?; @@ -117,6 +150,36 @@ async fn events_conn_handler_inner_try( }; debug!("--- got query evq {:?}", evq); + // TODO make scylla usage configurable in config + if evq.channel.backend == "scylla" { + // Find the "series" id. + let scy = scylla::SessionBuilder::new() + .known_node("sf-daqbuf-34:8340") + .build() + .await + .err_conv(); + let scy = match scy { + Ok(k) => k, + Err(e) => return Err((e, netout))?, + }; + let cql = "select dtype, series from series_by_channel where facility = ? and channel_name = ?"; + let res = scy.query(cql, ()).await.err_conv(); + let res = match res { + Ok(k) => k, + Err(e) => return Err((e, netout))?, + }; + let rows = res.rows_typed_or_empty::<(i32, i32)>(); + for r in rows { + let r = match r.err_conv() { + Ok(k) => k, + Err(e) => return Err((e, netout))?, + }; + info!("got row {r:?}"); + } + error!("TODO scylla fetch continue here"); + err::todo(); + } + let mut p1: Pin> + Send>> = if let Some(aa) = &node_config.node.channel_archiver { match archapp_wrap::archapp::archeng::pipe::make_event_pipe(&evq, node_config.clone(), aa.clone()).await {