Allow to fetch ts msps for a series

This commit is contained in:
Dominik Werder
2022-06-08 16:37:55 +02:00
parent 3cd1b7a640
commit 7063842c4c
5 changed files with 477 additions and 29 deletions

View File

@@ -35,3 +35,4 @@ nodenet = { path = "../nodenet" }
commonio = { path = "../commonio" }
taskrun = { path = "../taskrun" }
scylla = "0.4"
md-5 = "0.9"

View File

@@ -15,6 +15,8 @@ use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError;
use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::convert::TryInto;
use std::time::{Duration, Instant};
use url::Url;
pub struct ChConf {
@@ -414,10 +416,203 @@ impl ScyllaChannelsWithType {
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ScyllaChannelEventSeriesIdQuery {
facility: String,
#[serde(rename = "channelName")]
channel_name: String,
#[serde(rename = "scalarType")]
scalar_type: ScalarType,
shape: Shape,
#[serde(rename = "doCreate", skip_serializing_if = "bool_false")]
do_create: bool,
}
fn bool_false(x: &bool) -> bool {
*x == false
}
impl FromUrl for ScyllaChannelEventSeriesIdQuery {
fn from_url(url: &Url) -> Result<Self, err::Error> {
let pairs = get_url_query_pairs(url);
let facility = pairs
.get("facility")
.ok_or_else(|| Error::with_public_msg_no_trace("missing facility"))?
.into();
let channel_name = pairs
.get("channelName")
.ok_or_else(|| Error::with_public_msg_no_trace("missing channelName"))?
.into();
let s = pairs
.get("scalarType")
.ok_or_else(|| Error::with_public_msg_no_trace("missing scalarType"))?;
let scalar_type: ScalarType = serde_json::from_str(&format!("\"{s}\""))?;
let s = pairs
.get("shape")
.ok_or_else(|| Error::with_public_msg_no_trace("missing shape"))?;
let shape = Shape::from_dims_str(s)?;
let do_create = pairs.get("doCreate").map_or("false", |x| x.as_str()) == "true";
Ok(Self {
facility,
channel_name,
scalar_type,
shape,
do_create,
})
}
}
#[derive(Clone, Debug, Serialize)]
pub struct ScyllaChannelEventSeriesIdResponse {
#[serde(rename = "seriesId")]
series: u64,
}
/**
Get the series-id for a channel identified by facility, channel name, scalar type, shape.
*/
pub struct ScyllaChannelEventSeriesId {}
impl ScyllaChannelEventSeriesId {
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path() == "/api/4/scylla/channel/events/seriesId" {
Some(Self {})
} else {
None
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
.headers()
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept == APP_JSON || accept == ACCEPT_ALL {
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let q = ScyllaChannelEventSeriesIdQuery::from_url(&url)?;
match self.get_series_id(&q, node_config).await {
Ok(k) => {
let body = Body::from(serde_json::to_vec(&k)?);
Ok(response(StatusCode::OK).body(body)?)
}
Err(e) => Ok(response(StatusCode::NO_CONTENT).body(Body::from(format!("{:?}", e.public_msg())))?),
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
}
async fn get_series_id(
&self,
q: &ScyllaChannelEventSeriesIdQuery,
node_config: &NodeConfigCached,
) -> Result<ScyllaChannelEventSeriesIdResponse, Error> {
let dbconf = &node_config.node_config.cluster.database;
// TODO unify the database nodes
let uri = format!(
"postgresql://{}:{}@{}:{}/{}",
dbconf.user, dbconf.pass, dbconf.host, dbconf.port, dbconf.name
);
let (pg_client, conn) = tokio_postgres::connect(&uri, tokio_postgres::NoTls).await.err_conv()?;
// TODO monitor connection drop.
let _cjh = tokio::spawn(async move {
if let Err(e) = conn.await {
error!("connection error: {}", e);
}
Ok::<_, Error>(())
});
let res = pg_client
.query(
"select series from series_by_channel where facility = $1 and channel = $2 and scalar_type = $3 and shape_dims = $4 and agg_kind = 0",
&[&q.facility, &q.channel_name, &q.scalar_type.to_scylla_i32(), &q.shape.to_scylla_vec()],
)
.await
.err_conv()?;
if res.len() > 1 {
return Err(Error::with_msg_no_trace(format!("multiple series ids found")));
} else if res.len() == 1 {
let series = res[0].get::<_, i64>(0) as u64;
let ret = ScyllaChannelEventSeriesIdResponse { series };
Ok(ret)
} else if q.do_create == false {
return Err(Error::with_msg_no_trace(format!(
"series id not found for {}",
q.channel_name
)));
} else {
let tsbeg = Instant::now();
use md5::Digest;
let mut h = md5::Md5::new();
h.update(q.facility.as_bytes());
h.update(q.channel_name.as_bytes());
h.update(format!("{:?}", q.scalar_type).as_bytes());
h.update(format!("{:?}", q.shape).as_bytes());
for _ in 0..200 {
h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes());
let f = h.clone().finalize();
let mut series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap());
if series > i64::MAX as u64 {
series &= 0x7fffffffffffffff;
}
if series == 0 {
series = 1;
}
if series <= 0 || series > i64::MAX as u64 {
return Err(Error::with_msg_no_trace(format!(
"attempt to insert bad series id {series}"
)));
}
let res = pg_client
.execute(
concat!(
"insert into series_by_channel",
" (series, facility, channel, scalar_type, shape_dims, agg_kind)",
" values ($1, $2, $3, $4, $5, 0) on conflict do nothing"
),
&[
&(series as i64),
&q.facility,
&q.channel_name,
&q.scalar_type.to_scylla_i32(),
&q.shape.to_scylla_vec(),
],
)
.await
.unwrap();
if res == 1 {
let ret = ScyllaChannelEventSeriesIdResponse { series };
return Ok(ret);
} else {
warn!(
"tried to insert {series:?} for {} {} {:?} {:?} trying again...",
q.facility, q.channel_name, q.scalar_type, q.shape
);
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
error!(
"tried to insert new series id for {} {} {:?} {:?} but failed",
q.facility, q.channel_name, q.scalar_type, q.shape
);
Err(Error::with_msg_no_trace(format!(
"get_series_id can not create and insert series id {:?} {:?} {:?} {:?}",
q.facility, q.channel_name, q.scalar_type, q.shape
)))
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ScyllaChannelsActiveQuery {
tsedge: u64,
#[serde(rename = "shapeKind")]
shape_kind: u32,
#[serde(rename = "scalarType")]
scalar_type: ScalarType,
}
@@ -429,12 +624,12 @@ impl FromUrl for ScyllaChannelsActiveQuery {
.ok_or_else(|| Error::with_public_msg_no_trace("missing tsedge"))?;
let tsedge: u64 = s.parse()?;
let s = pairs
.get("shape_kind")
.ok_or_else(|| Error::with_public_msg_no_trace("missing shape_kind"))?;
.get("shapeKind")
.ok_or_else(|| Error::with_public_msg_no_trace("missing shapeKind"))?;
let shape_kind: u32 = s.parse()?;
let s = pairs
.get("scalar_type")
.ok_or_else(|| Error::with_public_msg_no_trace("missing scalar_type"))?;
.get("scalarType")
.ok_or_else(|| Error::with_public_msg_no_trace("missing scalarType"))?;
let scalar_type: ScalarType = serde_json::from_str(&format!("\"{s}\""))?;
info!("parsed scalar type inp: {s:?} val: {scalar_type:?}");
Ok(Self {
@@ -515,15 +710,6 @@ impl ScyllaChannelsActive {
while let Some(row) = res.next().await {
let row = row.err_conv()?;
let (series,): (i64,) = row.into_typed().err_conv()?;
if series == 1254561075907984640 {
info!(
"FOUND ACTIVE series {} part {} tsedge {} scalar_type {}",
series,
part,
tsedge,
q.scalar_type.to_scylla_i32()
);
}
ret.push(series as u64);
}
}
@@ -531,8 +717,9 @@ impl ScyllaChannelsActive {
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct ChannelFromSeriesQuery {
#[serde(rename = "seriesId")]
series: u64,
}
@@ -540,8 +727,8 @@ impl FromUrl for ChannelFromSeriesQuery {
fn from_url(url: &Url) -> Result<Self, err::Error> {
let pairs = get_url_query_pairs(url);
let s = pairs
.get("series")
.ok_or_else(|| Error::with_public_msg_no_trace("missing series"))?;
.get("seriesId")
.ok_or_else(|| Error::with_public_msg_no_trace("missing seriesId"))?;
let series: u64 = s.parse()?;
Ok(Self { series })
}
@@ -550,13 +737,20 @@ impl FromUrl for ChannelFromSeriesQuery {
#[derive(Clone, Debug, Serialize)]
pub struct ChannelFromSeriesResponse {
facility: String,
#[serde(rename = "channelName")]
channel: String,
#[serde(rename = "scalarType")]
scalar_type: ScalarType,
shape: Shape,
// TODO need a unique representation of the agg kind in the registry.
#[serde(skip_serializing_if = "u32_zero")]
agg_kind: u32,
}
fn u32_zero(x: &u32) -> bool {
*x == 0
}
pub struct ChannelFromSeries {}
impl ChannelFromSeries {
@@ -578,9 +772,14 @@ impl ChannelFromSeries {
if accept == APP_JSON || accept == ACCEPT_ALL {
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let q = ChannelFromSeriesQuery::from_url(&url)?;
let res = self.get_data(&q, node_config).await?;
let body = Body::from(serde_json::to_vec(&res)?);
Ok(response(StatusCode::OK).body(body)?)
match self.get_data(&q, node_config).await {
Ok(k) => {
let body = Body::from(serde_json::to_vec(&k)?);
Ok(response(StatusCode::OK).body(body)?)
}
Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("{:?}", e.public_msg())))?),
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
}
@@ -600,7 +799,7 @@ impl ChannelFromSeries {
// TODO unify the database nodes
let uri = format!(
"postgresql://{}:{}@{}:{}/{}",
dbconf.user, dbconf.pass, dbconf.host, 5432, dbconf.name
dbconf.user, dbconf.pass, dbconf.host, dbconf.port, dbconf.name
);
let (pgclient, conn) = tokio_postgres::connect(&uri, tokio_postgres::NoTls).await.err_conv()?;
// TODO monitor connection drop.
@@ -642,19 +841,19 @@ impl ChannelFromSeries {
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct IocForChannelQuery {
channel_regex: String,
channel: String,
}
impl FromUrl for IocForChannelQuery {
fn from_url(url: &Url) -> Result<Self, err::Error> {
let pairs = get_url_query_pairs(url);
let channel_regex = pairs
.get("channel_regex")
.ok_or_else(|| Error::with_public_msg_no_trace("missing channel_regex"))?
let channel = pairs
.get("channelName")
.ok_or_else(|| Error::with_public_msg_no_trace("missing channelName"))?
.into();
Ok(Self { channel_regex })
Ok(Self { channel })
}
}
@@ -710,3 +909,97 @@ impl IocForChannel {
Ok(ret)
}
}
#[derive(Clone, Debug, Deserialize)]
pub struct ScyllaSeriesTsMspQuery {
#[serde(rename = "seriesId")]
series: u64,
}
impl FromUrl for ScyllaSeriesTsMspQuery {
fn from_url(url: &Url) -> Result<Self, err::Error> {
let pairs = get_url_query_pairs(url);
let s = pairs
.get("seriesId")
.ok_or_else(|| Error::with_public_msg_no_trace("missing seriesId"))?;
let series: u64 = s.parse()?;
Ok(Self { series })
}
}
#[derive(Clone, Debug, Serialize)]
pub struct ScyllaSeriesTsMspResponse {
#[serde(rename = "tsMsps")]
ts_msps: Vec<u64>,
}
pub struct ScyllaSeriesTsMsp {}
impl ScyllaSeriesTsMsp {
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path() == "/api/4/scylla/series/tsMsps" {
Some(Self {})
} else {
None
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
.headers()
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept == APP_JSON || accept == ACCEPT_ALL {
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let q = ScyllaSeriesTsMspQuery::from_url(&url)?;
match self.get_ts_msps(&q, node_config).await {
Ok(k) => {
let body = Body::from(serde_json::to_vec(&k)?);
Ok(response(StatusCode::OK).body(body)?)
}
Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("{:?}", e.public_msg())))?),
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
}
async fn get_ts_msps(
&self,
q: &ScyllaSeriesTsMspQuery,
node_config: &NodeConfigCached,
) -> Result<ScyllaSeriesTsMspResponse, Error> {
let scyco = node_config
.node_config
.cluster
.scylla
.as_ref()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?;
let scy = scylla::SessionBuilder::new()
.known_nodes(&scyco.hosts)
.use_keyspace(&scyco.keyspace, true)
.default_consistency(Consistency::One)
.build()
.await
.err_conv()?;
let mut ts_msps = Vec::new();
let mut res = scy
.query_iter("select ts_msp from ts_msp where series = ?", (q.series as i64,))
.await
.err_conv()?;
use futures_util::StreamExt;
while let Some(row) = res.next().await {
let row = row.err_conv()?;
let (ts_msp,): (i64,) = row.into_typed().err_conv()?;
ts_msps.push(ts_msp as u64);
}
let ret = ScyllaSeriesTsMspResponse { ts_msps };
Ok(ret)
}
}

View File

@@ -225,6 +225,8 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
}
} else if let Some(h) = channelconfig::ChannelConfigHandler::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = channelconfig::ScyllaChannelEventSeriesId::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = channelconfig::ScyllaConfigsHisto::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = channelconfig::ScyllaChannelsWithType::handler(&req) {
@@ -233,6 +235,8 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
h.handle(req, &node_config).await
} else if let Some(h) = channelconfig::ScyllaChannelsActive::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = channelconfig::ScyllaSeriesTsMsp::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = channelconfig::ChannelFromSeries::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = events::EventsHandler::handler(&req) {

View File

@@ -691,12 +691,123 @@ pub struct ChannelConfig {
}
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
pub enum ShapeOld {
Scalar,
Wave(u32),
Image(u32, u32),
}
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub enum Shape {
Scalar,
Wave(u32),
Image(u32, u32),
}
impl Serialize for Shape {
fn serialize<S: serde::Serializer>(&self, ser: S) -> Result<S::Ok, S::Error>
where
S::Error: serde::ser::Error,
{
use Shape::*;
match self {
Scalar => ser.collect_seq([0u32; 0].iter()),
Wave(a) => ser.collect_seq([*a].iter()),
Image(a, b) => ser.collect_seq([*a, *b].iter()),
}
}
}
struct ShapeVis;
impl<'de> serde::de::Visitor<'de> for ShapeVis {
type Value = Shape;
fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.write_str("a string describing the Shape variant")
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
info!("visit_str {v}");
if v == "Scalar" {
Ok(Shape::Scalar)
} else {
Err(E::custom(format!("unexpected value: {v:?}")))
}
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: serde::de::MapAccess<'de>,
{
use serde::de::Error;
while let Some(key) = map.next_key::<String>()? {
info!("See key {key:?}");
return if key == "Wave" {
let n: u32 = map.next_value()?;
Ok(Shape::Wave(n))
} else if key == "Image" {
let a = map.next_value::<[u32; 2]>()?;
Ok(Shape::Image(a[0], a[1]))
} else {
Err(A::Error::custom(format!("unexpected key {key:?}")))
};
}
Err(A::Error::custom(format!("invalid shape format")))
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
info!("visit_seq");
let mut a = vec![];
while let Some(item) = seq.next_element()? {
let n: u32 = item;
a.push(n);
}
if a.len() == 0 {
Ok(Shape::Scalar)
} else if a.len() == 1 {
Ok(Shape::Wave(a[0]))
} else if a.len() == 2 {
Ok(Shape::Image(a[0], a[1]))
} else {
use serde::de::Error;
Err(A::Error::custom(format!("bad shape")))
}
}
}
impl<'de> Deserialize<'de> for Shape {
fn deserialize<D>(de: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
de.deserialize_any(ShapeVis)
/*
// TODO can not clone.. how to try the alternatives?
match de.deserialize_str(ShapeVis) {
Ok(k) => {
info!("De worked first try: {k:?}");
Ok(k)
}
Err(_) => {
let ret = match <ShapeOld as Deserialize<'de>>::deserialize(de)? {
ShapeOld::Scalar => Shape::Scalar,
ShapeOld::Wave(a) => Shape::Wave(a),
ShapeOld::Image(a, b) => Shape::Image(a, b),
};
Ok(ret)
}
}
*/
}
}
impl Shape {
pub fn from_bsread_jsval(v: &JsVal) -> Result<Shape, Error> {
match v {
@@ -814,6 +925,34 @@ impl Shape {
}
}
#[test]
fn test_shape_serde() {
let s = serde_json::to_string(&Shape::Image(42, 43)).unwrap();
assert_eq!(s, r#"[42,43]"#);
let s = serde_json::to_string(&ShapeOld::Scalar).unwrap();
assert_eq!(s, r#""Scalar""#);
let s = serde_json::to_string(&ShapeOld::Wave(8)).unwrap();
assert_eq!(s, r#"{"Wave":8}"#);
let s = serde_json::to_string(&ShapeOld::Image(42, 43)).unwrap();
assert_eq!(s, r#"{"Image":[42,43]}"#);
let s = serde_json::from_str::<ShapeOld>(r#""Scalar""#).unwrap();
assert_eq!(s, ShapeOld::Scalar);
let s = serde_json::from_str::<ShapeOld>(r#"{"Wave": 123}"#).unwrap();
assert_eq!(s, ShapeOld::Wave(123));
let s = serde_json::from_str::<ShapeOld>(r#"{"Image":[77, 78]}"#).unwrap();
assert_eq!(s, ShapeOld::Image(77, 78));
let s = serde_json::from_str::<Shape>(r#"[]"#).unwrap();
assert_eq!(s, Shape::Scalar);
let s = serde_json::from_str::<Shape>(r#"[12]"#).unwrap();
assert_eq!(s, Shape::Wave(12));
let s = serde_json::from_str::<Shape>(r#"[12, 13]"#).unwrap();
assert_eq!(s, Shape::Image(12, 13));
let s = serde_json::from_str::<Shape>(r#""Scalar""#).unwrap();
assert_eq!(s, Shape::Scalar);
let s = serde_json::from_str::<Shape>(r#"{"Wave":55}"#).unwrap();
assert_eq!(s, Shape::Wave(55));
}
pub trait HasShape {
fn shape(&self) -> Shape;
}

View File

@@ -334,12 +334,23 @@ macro_rules! read_next_scalar_values {
type ST = $st;
type SCYTY = $scyty;
info!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp);
let _ts_lsp_max = if range.end <= ts_msp {
// TODO we should not be here...
} else {
};
if range.end > i64::MAX as u64 {
return Err(Error::with_msg_no_trace(format!("range.end overflows i64")));
}
let ts_lsp_max = range.end;
let cql = concat!(
"select ts_lsp, pulse, value from ",
$table_name,
" where series = ? and ts_msp = ?"
" where series = ? and ts_msp = ? and ts_lsp < ?"
);
let res = scy.query(cql, (series, ts_msp as i64)).await.err_conv()?;
let res = scy
.query(cql, (series, ts_msp as i64, ts_lsp_max as i64))
.await
.err_conv()?;
let mut ret = ScalarEvents::<ST>::empty();
let mut discarded = 0;
for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() {
@@ -369,7 +380,7 @@ macro_rules! read_next_array_values {
async fn $fname(
series: i64,
ts_msp: u64,
range: NanoRange,
_range: NanoRange,
scy: Arc<ScySession>,
) -> Result<WaveEvents<$st>, Error> {
type ST = $st;