diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index fc5eb36..fe18de3 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -35,3 +35,4 @@ nodenet = { path = "../nodenet" } commonio = { path = "../commonio" } taskrun = { path = "../taskrun" } scylla = "0.4" +md-5 = "0.9" diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index 25f3822..9abc71f 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -15,6 +15,8 @@ use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError; use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError}; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; +use std::convert::TryInto; +use std::time::{Duration, Instant}; use url::Url; pub struct ChConf { @@ -414,10 +416,203 @@ impl ScyllaChannelsWithType { } } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ScyllaChannelEventSeriesIdQuery { + facility: String, + #[serde(rename = "channelName")] + channel_name: String, + #[serde(rename = "scalarType")] + scalar_type: ScalarType, + shape: Shape, + #[serde(rename = "doCreate", skip_serializing_if = "bool_false")] + do_create: bool, +} + +fn bool_false(x: &bool) -> bool { + *x == false +} + +impl FromUrl for ScyllaChannelEventSeriesIdQuery { + fn from_url(url: &Url) -> Result { + let pairs = get_url_query_pairs(url); + let facility = pairs + .get("facility") + .ok_or_else(|| Error::with_public_msg_no_trace("missing facility"))? + .into(); + let channel_name = pairs + .get("channelName") + .ok_or_else(|| Error::with_public_msg_no_trace("missing channelName"))? + .into(); + let s = pairs + .get("scalarType") + .ok_or_else(|| Error::with_public_msg_no_trace("missing scalarType"))?; + let scalar_type: ScalarType = serde_json::from_str(&format!("\"{s}\""))?; + let s = pairs + .get("shape") + .ok_or_else(|| Error::with_public_msg_no_trace("missing shape"))?; + let shape = Shape::from_dims_str(s)?; + let do_create = pairs.get("doCreate").map_or("false", |x| x.as_str()) == "true"; + Ok(Self { + facility, + channel_name, + scalar_type, + shape, + do_create, + }) + } +} + +#[derive(Clone, Debug, Serialize)] +pub struct ScyllaChannelEventSeriesIdResponse { + #[serde(rename = "seriesId")] + series: u64, +} + +/** +Get the series-id for a channel identified by facility, channel name, scalar type, shape. +*/ +pub struct ScyllaChannelEventSeriesId {} + +impl ScyllaChannelEventSeriesId { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/4/scylla/channel/events/seriesId" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + if req.method() == Method::GET { + let accept_def = APP_JSON; + let accept = req + .headers() + .get(http::header::ACCEPT) + .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); + if accept == APP_JSON || accept == ACCEPT_ALL { + let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let q = ScyllaChannelEventSeriesIdQuery::from_url(&url)?; + match self.get_series_id(&q, node_config).await { + Ok(k) => { + let body = Body::from(serde_json::to_vec(&k)?); + Ok(response(StatusCode::OK).body(body)?) + } + Err(e) => Ok(response(StatusCode::NO_CONTENT).body(Body::from(format!("{:?}", e.public_msg())))?), + } + } else { + Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) + } + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } + } + + async fn get_series_id( + &self, + q: &ScyllaChannelEventSeriesIdQuery, + node_config: &NodeConfigCached, + ) -> Result { + let dbconf = &node_config.node_config.cluster.database; + // TODO unify the database nodes + let uri = format!( + "postgresql://{}:{}@{}:{}/{}", + dbconf.user, dbconf.pass, dbconf.host, dbconf.port, dbconf.name + ); + let (pg_client, conn) = tokio_postgres::connect(&uri, tokio_postgres::NoTls).await.err_conv()?; + // TODO monitor connection drop. + let _cjh = tokio::spawn(async move { + if let Err(e) = conn.await { + error!("connection error: {}", e); + } + Ok::<_, Error>(()) + }); + let res = pg_client + .query( + "select series from series_by_channel where facility = $1 and channel = $2 and scalar_type = $3 and shape_dims = $4 and agg_kind = 0", + &[&q.facility, &q.channel_name, &q.scalar_type.to_scylla_i32(), &q.shape.to_scylla_vec()], + ) + .await + .err_conv()?; + if res.len() > 1 { + return Err(Error::with_msg_no_trace(format!("multiple series ids found"))); + } else if res.len() == 1 { + let series = res[0].get::<_, i64>(0) as u64; + let ret = ScyllaChannelEventSeriesIdResponse { series }; + Ok(ret) + } else if q.do_create == false { + return Err(Error::with_msg_no_trace(format!( + "series id not found for {}", + q.channel_name + ))); + } else { + let tsbeg = Instant::now(); + use md5::Digest; + let mut h = md5::Md5::new(); + h.update(q.facility.as_bytes()); + h.update(q.channel_name.as_bytes()); + h.update(format!("{:?}", q.scalar_type).as_bytes()); + h.update(format!("{:?}", q.shape).as_bytes()); + for _ in 0..200 { + h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes()); + let f = h.clone().finalize(); + let mut series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); + if series > i64::MAX as u64 { + series &= 0x7fffffffffffffff; + } + if series == 0 { + series = 1; + } + if series <= 0 || series > i64::MAX as u64 { + return Err(Error::with_msg_no_trace(format!( + "attempt to insert bad series id {series}" + ))); + } + let res = pg_client + .execute( + concat!( + "insert into series_by_channel", + " (series, facility, channel, scalar_type, shape_dims, agg_kind)", + " values ($1, $2, $3, $4, $5, 0) on conflict do nothing" + ), + &[ + &(series as i64), + &q.facility, + &q.channel_name, + &q.scalar_type.to_scylla_i32(), + &q.shape.to_scylla_vec(), + ], + ) + .await + .unwrap(); + if res == 1 { + let ret = ScyllaChannelEventSeriesIdResponse { series }; + return Ok(ret); + } else { + warn!( + "tried to insert {series:?} for {} {} {:?} {:?} trying again...", + q.facility, q.channel_name, q.scalar_type, q.shape + ); + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + error!( + "tried to insert new series id for {} {} {:?} {:?} but failed", + q.facility, q.channel_name, q.scalar_type, q.shape + ); + Err(Error::with_msg_no_trace(format!( + "get_series_id can not create and insert series id {:?} {:?} {:?} {:?}", + q.facility, q.channel_name, q.scalar_type, q.shape + ))) + } + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ScyllaChannelsActiveQuery { tsedge: u64, + #[serde(rename = "shapeKind")] shape_kind: u32, + #[serde(rename = "scalarType")] scalar_type: ScalarType, } @@ -429,12 +624,12 @@ impl FromUrl for ScyllaChannelsActiveQuery { .ok_or_else(|| Error::with_public_msg_no_trace("missing tsedge"))?; let tsedge: u64 = s.parse()?; let s = pairs - .get("shape_kind") - .ok_or_else(|| Error::with_public_msg_no_trace("missing shape_kind"))?; + .get("shapeKind") + .ok_or_else(|| Error::with_public_msg_no_trace("missing shapeKind"))?; let shape_kind: u32 = s.parse()?; let s = pairs - .get("scalar_type") - .ok_or_else(|| Error::with_public_msg_no_trace("missing scalar_type"))?; + .get("scalarType") + .ok_or_else(|| Error::with_public_msg_no_trace("missing scalarType"))?; let scalar_type: ScalarType = serde_json::from_str(&format!("\"{s}\""))?; info!("parsed scalar type inp: {s:?} val: {scalar_type:?}"); Ok(Self { @@ -515,15 +710,6 @@ impl ScyllaChannelsActive { while let Some(row) = res.next().await { let row = row.err_conv()?; let (series,): (i64,) = row.into_typed().err_conv()?; - if series == 1254561075907984640 { - info!( - "FOUND ACTIVE series {} part {} tsedge {} scalar_type {}", - series, - part, - tsedge, - q.scalar_type.to_scylla_i32() - ); - } ret.push(series as u64); } } @@ -531,8 +717,9 @@ impl ScyllaChannelsActive { } } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct ChannelFromSeriesQuery { + #[serde(rename = "seriesId")] series: u64, } @@ -540,8 +727,8 @@ impl FromUrl for ChannelFromSeriesQuery { fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); let s = pairs - .get("series") - .ok_or_else(|| Error::with_public_msg_no_trace("missing series"))?; + .get("seriesId") + .ok_or_else(|| Error::with_public_msg_no_trace("missing seriesId"))?; let series: u64 = s.parse()?; Ok(Self { series }) } @@ -550,13 +737,20 @@ impl FromUrl for ChannelFromSeriesQuery { #[derive(Clone, Debug, Serialize)] pub struct ChannelFromSeriesResponse { facility: String, + #[serde(rename = "channelName")] channel: String, + #[serde(rename = "scalarType")] scalar_type: ScalarType, shape: Shape, // TODO need a unique representation of the agg kind in the registry. + #[serde(skip_serializing_if = "u32_zero")] agg_kind: u32, } +fn u32_zero(x: &u32) -> bool { + *x == 0 +} + pub struct ChannelFromSeries {} impl ChannelFromSeries { @@ -578,9 +772,14 @@ impl ChannelFromSeries { if accept == APP_JSON || accept == ACCEPT_ALL { let url = Url::parse(&format!("dummy:{}", req.uri()))?; let q = ChannelFromSeriesQuery::from_url(&url)?; - let res = self.get_data(&q, node_config).await?; - let body = Body::from(serde_json::to_vec(&res)?); - Ok(response(StatusCode::OK).body(body)?) + match self.get_data(&q, node_config).await { + Ok(k) => { + let body = Body::from(serde_json::to_vec(&k)?); + Ok(response(StatusCode::OK).body(body)?) + } + Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(format!("{:?}", e.public_msg())))?), + } } else { Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) } @@ -600,7 +799,7 @@ impl ChannelFromSeries { // TODO unify the database nodes let uri = format!( "postgresql://{}:{}@{}:{}/{}", - dbconf.user, dbconf.pass, dbconf.host, 5432, dbconf.name + dbconf.user, dbconf.pass, dbconf.host, dbconf.port, dbconf.name ); let (pgclient, conn) = tokio_postgres::connect(&uri, tokio_postgres::NoTls).await.err_conv()?; // TODO monitor connection drop. @@ -642,19 +841,19 @@ impl ChannelFromSeries { } } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct IocForChannelQuery { - channel_regex: String, + channel: String, } impl FromUrl for IocForChannelQuery { fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); - let channel_regex = pairs - .get("channel_regex") - .ok_or_else(|| Error::with_public_msg_no_trace("missing channel_regex"))? + let channel = pairs + .get("channelName") + .ok_or_else(|| Error::with_public_msg_no_trace("missing channelName"))? .into(); - Ok(Self { channel_regex }) + Ok(Self { channel }) } } @@ -710,3 +909,97 @@ impl IocForChannel { Ok(ret) } } + +#[derive(Clone, Debug, Deserialize)] +pub struct ScyllaSeriesTsMspQuery { + #[serde(rename = "seriesId")] + series: u64, +} + +impl FromUrl for ScyllaSeriesTsMspQuery { + fn from_url(url: &Url) -> Result { + let pairs = get_url_query_pairs(url); + let s = pairs + .get("seriesId") + .ok_or_else(|| Error::with_public_msg_no_trace("missing seriesId"))?; + let series: u64 = s.parse()?; + Ok(Self { series }) + } +} + +#[derive(Clone, Debug, Serialize)] +pub struct ScyllaSeriesTsMspResponse { + #[serde(rename = "tsMsps")] + ts_msps: Vec, +} + +pub struct ScyllaSeriesTsMsp {} + +impl ScyllaSeriesTsMsp { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/4/scylla/series/tsMsps" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + if req.method() == Method::GET { + let accept_def = APP_JSON; + let accept = req + .headers() + .get(http::header::ACCEPT) + .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); + if accept == APP_JSON || accept == ACCEPT_ALL { + let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let q = ScyllaSeriesTsMspQuery::from_url(&url)?; + match self.get_ts_msps(&q, node_config).await { + Ok(k) => { + let body = Body::from(serde_json::to_vec(&k)?); + Ok(response(StatusCode::OK).body(body)?) + } + Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(format!("{:?}", e.public_msg())))?), + } + } else { + Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) + } + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } + } + + async fn get_ts_msps( + &self, + q: &ScyllaSeriesTsMspQuery, + node_config: &NodeConfigCached, + ) -> Result { + let scyco = node_config + .node_config + .cluster + .scylla + .as_ref() + .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?; + let scy = scylla::SessionBuilder::new() + .known_nodes(&scyco.hosts) + .use_keyspace(&scyco.keyspace, true) + .default_consistency(Consistency::One) + .build() + .await + .err_conv()?; + let mut ts_msps = Vec::new(); + let mut res = scy + .query_iter("select ts_msp from ts_msp where series = ?", (q.series as i64,)) + .await + .err_conv()?; + use futures_util::StreamExt; + while let Some(row) = res.next().await { + let row = row.err_conv()?; + let (ts_msp,): (i64,) = row.into_typed().err_conv()?; + ts_msps.push(ts_msp as u64); + } + let ret = ScyllaSeriesTsMspResponse { ts_msps }; + Ok(ret) + } +} diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 3c658de..e2806f2 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -225,6 +225,8 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } } else if let Some(h) = channelconfig::ChannelConfigHandler::handler(&req) { h.handle(req, &node_config).await + } else if let Some(h) = channelconfig::ScyllaChannelEventSeriesId::handler(&req) { + h.handle(req, &node_config).await } else if let Some(h) = channelconfig::ScyllaConfigsHisto::handler(&req) { h.handle(req, &node_config).await } else if let Some(h) = channelconfig::ScyllaChannelsWithType::handler(&req) { @@ -233,6 +235,8 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> h.handle(req, &node_config).await } else if let Some(h) = channelconfig::ScyllaChannelsActive::handler(&req) { h.handle(req, &node_config).await + } else if let Some(h) = channelconfig::ScyllaSeriesTsMsp::handler(&req) { + h.handle(req, &node_config).await } else if let Some(h) = channelconfig::ChannelFromSeries::handler(&req) { h.handle(req, &node_config).await } else if let Some(h) = events::EventsHandler::handler(&req) { diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 8064fad..676d53d 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -691,12 +691,123 @@ pub struct ChannelConfig { } #[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)] +pub enum ShapeOld { + Scalar, + Wave(u32), + Image(u32, u32), +} + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] pub enum Shape { Scalar, Wave(u32), Image(u32, u32), } +impl Serialize for Shape { + fn serialize(&self, ser: S) -> Result + where + S::Error: serde::ser::Error, + { + use Shape::*; + match self { + Scalar => ser.collect_seq([0u32; 0].iter()), + Wave(a) => ser.collect_seq([*a].iter()), + Image(a, b) => ser.collect_seq([*a, *b].iter()), + } + } +} + +struct ShapeVis; + +impl<'de> serde::de::Visitor<'de> for ShapeVis { + type Value = Shape; + + fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.write_str("a string describing the Shape variant") + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + info!("visit_str {v}"); + if v == "Scalar" { + Ok(Shape::Scalar) + } else { + Err(E::custom(format!("unexpected value: {v:?}"))) + } + } + + fn visit_map(self, mut map: A) -> Result + where + A: serde::de::MapAccess<'de>, + { + use serde::de::Error; + while let Some(key) = map.next_key::()? { + info!("See key {key:?}"); + return if key == "Wave" { + let n: u32 = map.next_value()?; + Ok(Shape::Wave(n)) + } else if key == "Image" { + let a = map.next_value::<[u32; 2]>()?; + Ok(Shape::Image(a[0], a[1])) + } else { + Err(A::Error::custom(format!("unexpected key {key:?}"))) + }; + } + Err(A::Error::custom(format!("invalid shape format"))) + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: serde::de::SeqAccess<'de>, + { + info!("visit_seq"); + let mut a = vec![]; + while let Some(item) = seq.next_element()? { + let n: u32 = item; + a.push(n); + } + if a.len() == 0 { + Ok(Shape::Scalar) + } else if a.len() == 1 { + Ok(Shape::Wave(a[0])) + } else if a.len() == 2 { + Ok(Shape::Image(a[0], a[1])) + } else { + use serde::de::Error; + Err(A::Error::custom(format!("bad shape"))) + } + } +} + +impl<'de> Deserialize<'de> for Shape { + fn deserialize(de: D) -> Result + where + D: serde::Deserializer<'de>, + { + de.deserialize_any(ShapeVis) + /* + // TODO can not clone.. how to try the alternatives? + match de.deserialize_str(ShapeVis) { + Ok(k) => { + info!("De worked first try: {k:?}"); + Ok(k) + } + Err(_) => { + let ret = match >::deserialize(de)? { + ShapeOld::Scalar => Shape::Scalar, + ShapeOld::Wave(a) => Shape::Wave(a), + ShapeOld::Image(a, b) => Shape::Image(a, b), + }; + Ok(ret) + } + } + */ + } +} + impl Shape { pub fn from_bsread_jsval(v: &JsVal) -> Result { match v { @@ -814,6 +925,34 @@ impl Shape { } } +#[test] +fn test_shape_serde() { + let s = serde_json::to_string(&Shape::Image(42, 43)).unwrap(); + assert_eq!(s, r#"[42,43]"#); + let s = serde_json::to_string(&ShapeOld::Scalar).unwrap(); + assert_eq!(s, r#""Scalar""#); + let s = serde_json::to_string(&ShapeOld::Wave(8)).unwrap(); + assert_eq!(s, r#"{"Wave":8}"#); + let s = serde_json::to_string(&ShapeOld::Image(42, 43)).unwrap(); + assert_eq!(s, r#"{"Image":[42,43]}"#); + let s = serde_json::from_str::(r#""Scalar""#).unwrap(); + assert_eq!(s, ShapeOld::Scalar); + let s = serde_json::from_str::(r#"{"Wave": 123}"#).unwrap(); + assert_eq!(s, ShapeOld::Wave(123)); + let s = serde_json::from_str::(r#"{"Image":[77, 78]}"#).unwrap(); + assert_eq!(s, ShapeOld::Image(77, 78)); + let s = serde_json::from_str::(r#"[]"#).unwrap(); + assert_eq!(s, Shape::Scalar); + let s = serde_json::from_str::(r#"[12]"#).unwrap(); + assert_eq!(s, Shape::Wave(12)); + let s = serde_json::from_str::(r#"[12, 13]"#).unwrap(); + assert_eq!(s, Shape::Image(12, 13)); + let s = serde_json::from_str::(r#""Scalar""#).unwrap(); + assert_eq!(s, Shape::Scalar); + let s = serde_json::from_str::(r#"{"Wave":55}"#).unwrap(); + assert_eq!(s, Shape::Wave(55)); +} + pub trait HasShape { fn shape(&self) -> Shape; } diff --git a/nodenet/src/scylla.rs b/nodenet/src/scylla.rs index 65895f8..17f8c31 100644 --- a/nodenet/src/scylla.rs +++ b/nodenet/src/scylla.rs @@ -334,12 +334,23 @@ macro_rules! read_next_scalar_values { type ST = $st; type SCYTY = $scyty; info!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp); + let _ts_lsp_max = if range.end <= ts_msp { + // TODO we should not be here... + } else { + }; + if range.end > i64::MAX as u64 { + return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); + } + let ts_lsp_max = range.end; let cql = concat!( "select ts_lsp, pulse, value from ", $table_name, - " where series = ? and ts_msp = ?" + " where series = ? and ts_msp = ? and ts_lsp < ?" ); - let res = scy.query(cql, (series, ts_msp as i64)).await.err_conv()?; + let res = scy + .query(cql, (series, ts_msp as i64, ts_lsp_max as i64)) + .await + .err_conv()?; let mut ret = ScalarEvents::::empty(); let mut discarded = 0; for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() { @@ -369,7 +380,7 @@ macro_rules! read_next_array_values { async fn $fname( series: i64, ts_msp: u64, - range: NanoRange, + _range: NanoRange, scy: Arc, ) -> Result, Error> { type ST = $st;