Add config query for scylla
This commit is contained in:
@@ -33,3 +33,4 @@ archapp_wrap = { path = "../archapp_wrap" }
|
||||
nodenet = { path = "../nodenet" }
|
||||
commonio = { path = "../commonio" }
|
||||
taskrun = { path = "../taskrun" }
|
||||
scylla = "0.4"
|
||||
|
||||
@@ -3,9 +3,11 @@ use crate::{response, ToPublicResponse};
|
||||
use http::{Method, Request, Response, StatusCode};
|
||||
use hyper::Body;
|
||||
use netpod::log::*;
|
||||
use netpod::NodeConfigCached;
|
||||
use netpod::{ChannelConfigQuery, FromUrl};
|
||||
use netpod::{ChannelConfigQuery, FromUrl, ScalarType, Shape};
|
||||
use netpod::{ChannelConfigResponse, NodeConfigCached};
|
||||
use netpod::{ACCEPT_ALL, APP_JSON};
|
||||
use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError;
|
||||
use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError};
|
||||
use url::Url;
|
||||
|
||||
pub struct ChannelConfigHandler {}
|
||||
@@ -43,11 +45,65 @@ impl ChannelConfigHandler {
|
||||
}
|
||||
}
|
||||
|
||||
trait ErrConv<T> {
|
||||
fn err_conv(self) -> Result<T, Error>;
|
||||
}
|
||||
|
||||
impl<T> ErrConv<T> for Result<T, ScyQueryError> {
|
||||
fn err_conv(self) -> Result<T, Error> {
|
||||
match self {
|
||||
Ok(k) => Ok(k),
|
||||
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> ErrConv<T> for Result<T, ScyNewSessionError> {
|
||||
fn err_conv(self) -> Result<T, Error> {
|
||||
match self {
|
||||
Ok(k) => Ok(k),
|
||||
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> ErrConv<T> for Result<T, ScyFromRowError> {
|
||||
fn err_conv(self) -> Result<T, Error> {
|
||||
match self {
|
||||
Ok(k) => Ok(k),
|
||||
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn channel_config(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
info!("channel_config");
|
||||
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
|
||||
//let pairs = get_url_query_pairs(&url);
|
||||
let q = ChannelConfigQuery::from_url(&url)?;
|
||||
let conf = if let Some(conf) = &node_config.node.channel_archiver {
|
||||
info!("channel_config for q {q:?}");
|
||||
let conf = if q.channel.backend == "scylla" {
|
||||
// Find the "series" id.
|
||||
let scy = scylla::SessionBuilder::new()
|
||||
.known_node("sf-daqbuf-34:8340")
|
||||
.build()
|
||||
.await
|
||||
.err_conv()?;
|
||||
let cql = "select dtype, series from series_by_channel where facility = ? and channel_name = ?";
|
||||
let res = scy.query(cql, ()).await.err_conv()?;
|
||||
let rows = res.rows_typed_or_empty::<(i32, i32)>();
|
||||
for r in rows {
|
||||
let r = r.err_conv()?;
|
||||
info!("got row {r:?}");
|
||||
}
|
||||
let res = ChannelConfigResponse {
|
||||
channel: q.channel,
|
||||
scalar_type: ScalarType::F32,
|
||||
byte_order: None,
|
||||
shape: Shape::Scalar,
|
||||
};
|
||||
res
|
||||
} else if let Some(conf) = &node_config.node.channel_archiver {
|
||||
archapp_wrap::archapp::archeng::channel_config_from_db(&q, conf, &node_config.node_config.cluster.database)
|
||||
.await?
|
||||
} else if let Some(conf) = &node_config.node.archiver_appliance {
|
||||
|
||||
@@ -68,9 +68,10 @@ async fn plain_events_binary(req: Request<Body>, node_config: &NodeConfigCached)
|
||||
}
|
||||
|
||||
async fn plain_events_json(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
debug!("httpret plain_events_json req: {:?}", req);
|
||||
info!("httpret plain_events_json req: {:?}", req);
|
||||
let (head, _body) = req.into_parts();
|
||||
let query = PlainEventsJsonQuery::from_request_head(&head)?;
|
||||
|
||||
let op = disk::channelexec::PlainEventsJson::new(
|
||||
query.channel().clone(),
|
||||
query.range().clone(),
|
||||
|
||||
@@ -206,7 +206,8 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
|
||||
let ret = serde_json::json!({
|
||||
"data_api_version": {
|
||||
"major": 4u32,
|
||||
"minor": 0u32,
|
||||
"minor": 1u32,
|
||||
"patch": 0u32,
|
||||
},
|
||||
});
|
||||
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?)
|
||||
@@ -305,6 +306,8 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
|
||||
h.handle(req, &node_config).await
|
||||
} else if let Some(h) = evinfo::EventInfoScan::handler(&req) {
|
||||
h.handle(req, &node_config).await
|
||||
} else if let Some(h) = pulsemap::MapPulseScyllaHandler::handler(&req) {
|
||||
h.handle(req, &node_config).await
|
||||
} else if let Some(h) = pulsemap::IndexFullHttpFunction::handler(&req) {
|
||||
h.handle(req, &node_config).await
|
||||
} else if let Some(h) = pulsemap::MarkClosedHttpFunction::handler(&req) {
|
||||
|
||||
@@ -478,6 +478,84 @@ struct LocalMap {
|
||||
channels: Vec<String>,
|
||||
}
|
||||
|
||||
pub trait ErrConv<T> {
|
||||
fn err_conv(self) -> Result<T, err::Error>;
|
||||
}
|
||||
|
||||
impl<T> ErrConv<T> for Result<T, scylla::transport::errors::NewSessionError> {
|
||||
fn err_conv(self) -> Result<T, err::Error> {
|
||||
self.map_err(|e| err::Error::with_msg_no_trace(format!("{e:?}")))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> ErrConv<T> for Result<T, scylla::transport::errors::QueryError> {
|
||||
fn err_conv(self) -> Result<T, err::Error> {
|
||||
self.map_err(|e| err::Error::with_msg_no_trace(format!("{e:?}")))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> ErrConv<T> for Result<T, scylla::transport::query_result::RowsExpectedError> {
|
||||
fn err_conv(self) -> Result<T, err::Error> {
|
||||
self.map_err(|e| err::Error::with_msg_no_trace(format!("{e:?}")))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MapPulseScyllaHandler {}
|
||||
|
||||
impl MapPulseScyllaHandler {
|
||||
pub fn prefix() -> &'static str {
|
||||
"/api/4/scylla/map/pulse/"
|
||||
}
|
||||
|
||||
pub fn handler(req: &Request<Body>) -> Option<Self> {
|
||||
if req.uri().path().starts_with(Self::prefix()) {
|
||||
Some(Self {})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handle(&self, req: Request<Body>, _node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
if req.method() != Method::GET {
|
||||
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
|
||||
}
|
||||
let urls = format!("dummy://{}", req.uri());
|
||||
let url = url::Url::parse(&urls)?;
|
||||
let query = MapPulseQuery::from_url(&url)?;
|
||||
let pulse = query.pulse;
|
||||
let scy = scylla::SessionBuilder::new()
|
||||
.known_node("sf-daqbuf-34:19042")
|
||||
.default_consistency(scylla::batch::Consistency::One)
|
||||
.use_keyspace("ks1", false)
|
||||
.build()
|
||||
.await
|
||||
.err_conv()?;
|
||||
let pulse_a = (pulse >> 14) as i64;
|
||||
let pulse_b = (pulse & 0x3fff) as i32;
|
||||
let res = scy
|
||||
.query(
|
||||
"select ts_a, ts_b from pulse where pulse_a = ? and pulse_b = ?",
|
||||
(pulse_a, pulse_b),
|
||||
)
|
||||
.await
|
||||
.err_conv()?;
|
||||
let rows = res.rows().err_conv()?;
|
||||
let mut tss = vec![];
|
||||
let mut channels = vec![];
|
||||
use scylla::frame::response::result::CqlValue;
|
||||
let ts_a_def = CqlValue::BigInt(0);
|
||||
let ts_b_def = CqlValue::Int(0);
|
||||
for row in rows {
|
||||
let ts_a = row.columns[0].as_ref().unwrap_or(&ts_a_def).as_bigint().unwrap_or(0) as u64;
|
||||
let ts_b = row.columns[1].as_ref().unwrap_or(&ts_b_def).as_int().unwrap_or(0) as u32 as u64;
|
||||
tss.push(ts_a * netpod::timeunits::SEC + ts_b);
|
||||
channels.push("scylla".into());
|
||||
}
|
||||
let ret = LocalMap { pulse, tss, channels };
|
||||
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MapPulseLocalHttpFunction {}
|
||||
|
||||
impl MapPulseLocalHttpFunction {
|
||||
|
||||
@@ -116,6 +116,8 @@ impl ScalarType {
|
||||
"int64" => I64,
|
||||
"float" => F32,
|
||||
"double" => F64,
|
||||
"float32" => F32,
|
||||
"float64" => F64,
|
||||
"string" => STRING,
|
||||
_ => {
|
||||
return Err(Error::with_msg_no_trace(format!(
|
||||
|
||||
@@ -20,6 +20,7 @@ futures-core = "0.3.14"
|
||||
futures-util = "0.3.14"
|
||||
tracing = "0.1.25"
|
||||
hex = "0.4.3"
|
||||
scylla = "0.4.4"
|
||||
err = { path = "../err" }
|
||||
netpod = { path = "../netpod" }
|
||||
disk = { path = "../disk" }
|
||||
|
||||
@@ -9,6 +9,8 @@ use netpod::histo::HistoLog2;
|
||||
use netpod::query::RawEventsQuery;
|
||||
use netpod::{log::*, AggKind};
|
||||
use netpod::{EventQueryJsonStringFrame, NodeConfigCached, PerfOpts};
|
||||
use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError;
|
||||
use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError};
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::Pin;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
@@ -16,6 +18,37 @@ use tokio::net::tcp::OwnedWriteHalf;
|
||||
use tokio::net::TcpStream;
|
||||
use tracing::Instrument;
|
||||
|
||||
trait ErrConv<T> {
|
||||
fn err_conv(self) -> Result<T, Error>;
|
||||
}
|
||||
|
||||
impl<T> ErrConv<T> for Result<T, ScyQueryError> {
|
||||
fn err_conv(self) -> Result<T, Error> {
|
||||
match self {
|
||||
Ok(k) => Ok(k),
|
||||
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> ErrConv<T> for Result<T, ScyNewSessionError> {
|
||||
fn err_conv(self) -> Result<T, Error> {
|
||||
match self {
|
||||
Ok(k) => Ok(k),
|
||||
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> ErrConv<T> for Result<T, ScyFromRowError> {
|
||||
fn err_conv(self) -> Result<T, Error> {
|
||||
match self {
|
||||
Ok(k) => Ok(k),
|
||||
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn events_service(node_config: NodeConfigCached) -> Result<(), Error> {
|
||||
let addr = format!("{}:{}", node_config.node.listen, node_config.node.port_raw);
|
||||
let lis = tokio::net::TcpListener::bind(addr).await?;
|
||||
@@ -117,6 +150,36 @@ async fn events_conn_handler_inner_try(
|
||||
};
|
||||
debug!("--- got query evq {:?}", evq);
|
||||
|
||||
// TODO make scylla usage configurable in config
|
||||
if evq.channel.backend == "scylla" {
|
||||
// Find the "series" id.
|
||||
let scy = scylla::SessionBuilder::new()
|
||||
.known_node("sf-daqbuf-34:8340")
|
||||
.build()
|
||||
.await
|
||||
.err_conv();
|
||||
let scy = match scy {
|
||||
Ok(k) => k,
|
||||
Err(e) => return Err((e, netout))?,
|
||||
};
|
||||
let cql = "select dtype, series from series_by_channel where facility = ? and channel_name = ?";
|
||||
let res = scy.query(cql, ()).await.err_conv();
|
||||
let res = match res {
|
||||
Ok(k) => k,
|
||||
Err(e) => return Err((e, netout))?,
|
||||
};
|
||||
let rows = res.rows_typed_or_empty::<(i32, i32)>();
|
||||
for r in rows {
|
||||
let r = match r.err_conv() {
|
||||
Ok(k) => k,
|
||||
Err(e) => return Err((e, netout))?,
|
||||
};
|
||||
info!("got row {r:?}");
|
||||
}
|
||||
error!("TODO scylla fetch continue here");
|
||||
err::todo();
|
||||
}
|
||||
|
||||
let mut p1: Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>> =
|
||||
if let Some(aa) = &node_config.node.channel_archiver {
|
||||
match archapp_wrap::archapp::archeng::pipe::make_event_pipe(&evq, node_config.clone(), aa.clone()).await {
|
||||
|
||||
Reference in New Issue
Block a user