diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index 3d4f20a..2f259a0 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -674,14 +674,12 @@ dependencies = [ name = "dbconn" version = "0.0.2" dependencies = [ - "arrayref", "async-channel", "byteorder", "bytes", "chrono", "crc32fast", "err", - "futures-core", "futures-util", "items", "netpod", @@ -1239,7 +1237,6 @@ dependencies = [ "streams", "taskrun", "tokio", - "tokio-postgres", "tracing", "tracing-futures", "url", diff --git a/dbconn/Cargo.toml b/dbconn/Cargo.toml index 3b33acf..800e2b1 100644 --- a/dbconn/Cargo.toml +++ b/dbconn/Cargo.toml @@ -10,22 +10,20 @@ path = "src/dbconn.rs" [dependencies] serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -tokio = { version = "1.20.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "1.23.0", features = ["time"] } tokio-postgres = { version = "0.7.7", features = ["with-chrono-0_4", "with-serde_json-1"] } -tracing = "0.1.25" -crc32fast = "1.2.1" -arrayref = "0.3.6" +tracing = "0.1.37" +crc32fast = "1.3.2" byteorder = "1.4" -futures-core = "0.3.24" -futures-util = "0.3.24" -bytes = "1.2" +futures-util = "0.3.25" +bytes = "1.3" pin-project = "1" #async-channel = "1" #dashmap = "3" scylla = "0.6.1" async-channel = "1.6" chrono = "0.4" -regex = "1.5.4" +regex = "1.7.0" err = { path = "../err" } netpod = { path = "../netpod" } parse = { path = "../parse" } diff --git a/dbconn/src/channelconfig.rs b/dbconn/src/channelconfig.rs index 744dfd9..5d5c13d 100644 --- a/dbconn/src/channelconfig.rs +++ b/dbconn/src/channelconfig.rs @@ -72,14 +72,7 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> } // TODO use a common already running worker pool for these queries: let dbconf = &ncc.node_config.cluster.database; - let dburl = format!( - "postgresql://{}:{}@{}:{}/{}", - dbconf.user, dbconf.pass, dbconf.host, dbconf.port, dbconf.name - ); - let (pgclient, pgconn) = tokio_postgres::connect(&dburl, tokio_postgres::NoTls) - .await - .err_conv()?; - tokio::spawn(pgconn); + let pgclient = crate::create_connection(dbconf).await?; if let Some(series) = channel.series() { let res = pgclient .query( diff --git a/dbconn/src/dbconn.rs b/dbconn/src/dbconn.rs index 869253e..02cfde4 100644 --- a/dbconn/src/dbconn.rs +++ b/dbconn/src/dbconn.rs @@ -49,6 +49,7 @@ pub async fn delay_io_medium() { } pub async fn create_connection(db_config: &Database) -> Result { + // TODO use a common already running worker pool for these queries: let d = db_config; let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name); let (cl, conn) = tokio_postgres::connect(&uri, NoTls) diff --git a/dbconn/src/scan.rs b/dbconn/src/scan.rs index 26f6364..bf01cce 100644 --- a/dbconn/src/scan.rs +++ b/dbconn/src/scan.rs @@ -2,8 +2,7 @@ use crate::{create_connection, delay_io_medium, delay_io_short, ErrConv}; use async_channel::{bounded, Receiver}; use chrono::{DateTime, Utc}; use err::Error; -use futures_core::Stream; -use futures_util::FutureExt; +use futures_util::{FutureExt, Stream}; use netpod::log::*; use netpod::{Database, NodeConfigCached}; use parse::channelconfig::NErr; diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index 64ed8b2..613566b 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -14,7 +14,6 @@ http = "0.2" url = "2.2" tokio = { version = "1.23.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] } -tokio-postgres = { version = "0.7.6", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] } bytes = "1.3.0" futures-util = "0.3.14" tracing = "0.1" diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index f39a4fc..d0bd543 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -11,17 +11,16 @@ use netpod::log::*; use netpod::query::prebinned::PreBinnedQuery; use netpod::query::{BinnedQuery, PlainEventsQuery}; use netpod::timeunits::*; -use netpod::{Channel, ChannelConfigQuery, Database, FromUrl, ScalarType, ScyllaConfig, Shape}; +use netpod::{Channel, ChannelConfigQuery, FromUrl, ScalarType, Shape}; use netpod::{ChannelConfigResponse, NodeConfigCached}; use netpod::{ACCEPT_ALL, APP_JSON}; use scylla::batch::Consistency; use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError; -use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError}; +use scylla::transport::errors::NewSessionError as ScyNewSessionError; +use scylla::transport::errors::QueryError as ScyQueryError; use scylla::transport::iterator::NextRowError; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; -use std::convert::TryInto; -use std::time::{Duration, Instant}; use url::Url; pub async fn chconf_from_events_binary(q: &PlainEventsQuery, ncc: &NodeConfigCached) -> Result { @@ -123,73 +122,23 @@ impl ErrConv for Result { } } -impl ErrConv for Result { - fn err_conv(self) -> Result { - match self { - Ok(k) => Ok(k), - Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), - } - } -} - -async fn config_from_scylla( - chq: ChannelConfigQuery, - _pgconf: Database, - scyconf: ScyllaConfig, - _node_config: &NodeConfigCached, -) -> Result { - // Find the "series" id. - let scy = scylla::SessionBuilder::new() - .known_nodes(&scyconf.hosts) - .use_keyspace(&scyconf.keyspace, true) - .default_consistency(Consistency::One) - .build() - .await - .err_conv()?; - let cql = "select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?"; - let res = scy - .query(cql, (&chq.channel.backend, chq.channel.name())) - .await - .err_conv()?; - let rows: Vec<_> = res.rows_typed_or_empty::<(i64, i32, Vec)>().collect(); - if rows.len() == 0 { - return Err(Error::with_public_msg_no_trace(format!( - "can not find series for channel {}", - chq.channel.name() - ))); - } else { - for r in &rows { - if let Err(e) = r { - return Err(Error::with_msg_no_trace(format!("error {e:?}"))); - } - info!("got row {r:?}"); - } - let row = rows[0].as_ref().unwrap(); - let scalar_type = ScalarType::from_scylla_i32(row.1)?; - let shape = Shape::from_scylla_shape_dims(&row.2)?; - let res = ChannelConfigResponse { - channel: chq.channel, - scalar_type, - byte_order: None, - shape, - }; - info!("MADE: {res:?}"); - Ok(res) - } -} - -pub async fn channel_config(req: Request, node_config: &NodeConfigCached) -> Result, Error> { +async fn channel_config(req: Request, node_config: &NodeConfigCached) -> Result, Error> { info!("channel_config"); let url = Url::parse(&format!("dummy:{}", req.uri()))?; let q = ChannelConfigQuery::from_url(&url)?; info!("channel_config for q {q:?}"); - let conf = if let Some(scyco) = &node_config.node_config.cluster.scylla { - let pgconf = node_config.node_config.cluster.database.clone(); - config_from_scylla(q, pgconf, scyco.clone(), node_config).await? + let conf = if let Some(_scyco) = &node_config.node_config.cluster.scylla { + let c = dbconn::channelconfig::chconf_from_database(&q.channel, node_config).await?; + ChannelConfigResponse { + channel: q.channel, + scalar_type: c.scalar_type, + byte_order: None, + shape: c.shape, + } } else if let Some(_) = &node_config.node.channel_archiver { - return Err(Error::with_msg_no_trace("archapp not built")); + return Err(Error::with_msg_no_trace("no archiver")); } else if let Some(_) = &node_config.node.archiver_appliance { - return Err(Error::with_msg_no_trace("archapp not built")); + return Err(Error::with_msg_no_trace("no archapp")); } else { parse::channelconfig::channel_config(&q, &node_config.node).await? }; @@ -449,142 +398,6 @@ pub struct ScyllaChannelEventSeriesIdResponse { series: u64, } -/** -Get the series-id for a channel identified by backend, channel name, scalar type, shape. -*/ -pub struct ScyllaChannelEventSeriesId {} - -impl ScyllaChannelEventSeriesId { - pub fn handler(req: &Request) -> Option { - if req.uri().path() == "/api/4/scylla/channel/events/seriesId" { - Some(Self {}) - } else { - None - } - } - - pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { - if req.method() == Method::GET { - let accept_def = APP_JSON; - let accept = req - .headers() - .get(http::header::ACCEPT) - .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); - if accept == APP_JSON || accept == ACCEPT_ALL { - let url = Url::parse(&format!("dummy:{}", req.uri()))?; - let q = ScyllaChannelEventSeriesIdQuery::from_url(&url)?; - match self.get_series_id(&q, node_config).await { - Ok(k) => { - let body = Body::from(serde_json::to_vec(&k)?); - Ok(response(StatusCode::OK).body(body)?) - } - Err(e) => Ok(response(StatusCode::NO_CONTENT).body(Body::from(format!("{:?}", e.public_msg())))?), - } - } else { - Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) - } - } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) - } - } - - async fn get_series_id( - &self, - q: &ScyllaChannelEventSeriesIdQuery, - node_config: &NodeConfigCached, - ) -> Result { - let dbconf = &node_config.node_config.cluster.database; - // TODO unify the database nodes - let uri = format!( - "postgresql://{}:{}@{}:{}/{}", - dbconf.user, dbconf.pass, dbconf.host, dbconf.port, dbconf.name - ); - let (pg_client, conn) = tokio_postgres::connect(&uri, tokio_postgres::NoTls).await.err_conv()?; - // TODO monitor connection drop. - let _cjh = tokio::spawn(async move { - if let Err(e) = conn.await { - error!("connection error: {}", e); - } - Ok::<_, Error>(()) - }); - let res = pg_client - .query( - "select series from series_by_channel where facility = $1 and channel = $2 and scalar_type = $3 and shape_dims = $4 and agg_kind = 0", - &[&q.backend, &q.name, &q.scalar_type.to_scylla_i32(), &q.shape.to_scylla_vec()], - ) - .await - .err_conv()?; - if res.len() > 1 { - return Err(Error::with_msg_no_trace(format!("multiple series ids found"))); - } else if res.len() == 1 { - let series = res[0].get::<_, i64>(0) as u64; - let ret = ScyllaChannelEventSeriesIdResponse { series }; - Ok(ret) - } else if q.do_create == false { - return Err(Error::with_msg_no_trace(format!("series id not found for {}", q.name))); - } else { - let tsbeg = Instant::now(); - use md5::Digest; - let mut h = md5::Md5::new(); - h.update(q.backend.as_bytes()); - h.update(q.name.as_bytes()); - h.update(format!("{:?}", q.scalar_type).as_bytes()); - h.update(format!("{:?}", q.shape).as_bytes()); - for _ in 0..200 { - h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes()); - let f = h.clone().finalize(); - let mut series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); - if series > i64::MAX as u64 { - series &= 0x7fffffffffffffff; - } - if series == 0 { - series = 1; - } - if series <= 0 || series > i64::MAX as u64 { - return Err(Error::with_msg_no_trace(format!( - "attempt to insert bad series id {series}" - ))); - } - let res = pg_client - .execute( - concat!( - "insert into series_by_channel", - " (series, facility, channel, scalar_type, shape_dims, agg_kind)", - " values ($1, $2, $3, $4, $5, 0) on conflict do nothing" - ), - &[ - &(series as i64), - &q.backend, - &q.name, - &q.scalar_type.to_scylla_i32(), - &q.shape.to_scylla_vec(), - ], - ) - .await - .unwrap(); - if res == 1 { - let ret = ScyllaChannelEventSeriesIdResponse { series }; - return Ok(ret); - } else { - warn!( - "tried to insert {series:?} for {} {} {:?} {:?} trying again...", - q.backend, q.name, q.scalar_type, q.shape - ); - } - tokio::time::sleep(Duration::from_millis(20)).await; - } - error!( - "tried to insert new series id for {} {} {:?} {:?} but failed", - q.backend, q.name, q.scalar_type, q.shape - ); - Err(Error::with_msg_no_trace(format!( - "get_series_id can not create and insert series id {:?} {:?} {:?} {:?}", - q.backend, q.name, q.scalar_type, q.shape - ))) - } - } -} - #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ScyllaChannelsActiveQuery { tsedge: u64, @@ -698,136 +511,9 @@ impl ScyllaChannelsActive { } } -#[derive(Clone, Debug, Deserialize)] -pub struct ChannelFromSeriesQuery { - #[serde(rename = "seriesId")] - series: u64, -} - -impl FromUrl for ChannelFromSeriesQuery { - fn from_url(url: &Url) -> Result { - let pairs = get_url_query_pairs(url); - Self::from_pairs(&pairs) - } - - fn from_pairs(pairs: &BTreeMap) -> Result { - let s = pairs - .get("seriesId") - .ok_or_else(|| Error::with_public_msg_no_trace("missing seriesId"))?; - let series: u64 = s.parse()?; - Ok(Self { series }) - } -} - -#[derive(Clone, Debug, Serialize)] -pub struct ChannelFromSeriesResponse { - backend: String, - #[serde(rename = "channelName")] - channel: String, - #[serde(rename = "scalarType")] - scalar_type: ScalarType, - shape: Shape, - // TODO need a unique representation of the agg kind in the registry. - #[serde(skip_serializing_if = "u32_zero")] - agg_kind: u32, -} - -fn u32_zero(x: &u32) -> bool { - *x == 0 -} - -pub struct ChannelFromSeries {} - -impl ChannelFromSeries { - pub fn handler(req: &Request) -> Option { - if req.uri().path() == "/api/4/channel/series" { - Some(Self {}) - } else { - None - } - } - - pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { - if req.method() == Method::GET { - let accept_def = APP_JSON; - let accept = req - .headers() - .get(http::header::ACCEPT) - .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); - if accept == APP_JSON || accept == ACCEPT_ALL { - let url = Url::parse(&format!("dummy:{}", req.uri()))?; - let q = ChannelFromSeriesQuery::from_url(&url)?; - match self.get_data(&q, node_config).await { - Ok(k) => { - let body = Body::from(serde_json::to_vec(&k)?); - Ok(response(StatusCode::OK).body(body)?) - } - Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR) - .body(Body::from(format!("{:?}", e.public_msg())))?), - } - } else { - Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) - } - } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) - } - } - - async fn get_data( - &self, - q: &ChannelFromSeriesQuery, - node_config: &NodeConfigCached, - ) -> Result { - let series = q.series as i64; - //let pgconn = create_connection(&node_config.node_config.cluster.database).await?; - let dbconf = &node_config.node_config.cluster.database; - // TODO unify the database nodes - let uri = format!( - "postgresql://{}:{}@{}:{}/{}", - dbconf.user, dbconf.pass, dbconf.host, dbconf.port, dbconf.name - ); - let (pgclient, conn) = tokio_postgres::connect(&uri, tokio_postgres::NoTls).await.err_conv()?; - // TODO monitor connection drop. - let _cjh = tokio::spawn(async move { - if let Err(e) = conn.await { - error!("connection error: {}", e); - } - Ok::<_, Error>(()) - }); - let res = pgclient - .query( - "select facility, channel, scalar_type, shape_dims, agg_kind from series_by_channel where series = $1", - &[&series], - ) - .await?; - let res = if let Some(row) = res.first() { - row - } else { - // TODO return code 204 - return Err(Error::with_msg_no_trace("can not find series")); - }; - let backend: String = res.get(0); - let channel: String = res.get(1); - let scalar_type: i32 = res.get(2); - // TODO check and document the format in the storage: - let scalar_type = ScalarType::from_dtype_index(scalar_type as u8)?; - let shape: Vec = res.get(3); - let shape = Shape::from_scylla_shape_dims(&shape)?; - let agg_kind: i32 = res.get(4); - // TODO method is called from_scylla_shape_dims but document that postgres uses the same format. - let ret = ChannelFromSeriesResponse { - backend, - channel, - scalar_type, - shape, - agg_kind: agg_kind as u32, - }; - Ok(ret) - } -} - #[derive(Clone, Debug, Deserialize)] pub struct IocForChannelQuery { + #[serde(rename = "channelBackend")] backend: String, #[serde(rename = "channelName")] name: String, diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index be79ac6..9816a70 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -292,10 +292,6 @@ async fn http_service_inner( } } else if let Some(h) = channelconfig::ChannelConfigHandler::handler(&req) { h.handle(req, &node_config).await - } else if let Some(h) = channelconfig::ScyllaChannelEventSeriesId::handler(&req) { - h.handle(req, &node_config).await - } else if let Some(h) = channelconfig::ScyllaConfigsHisto::handler(&req) { - h.handle(req, &node_config).await } else if let Some(h) = channelconfig::ScyllaChannelsWithType::handler(&req) { h.handle(req, &node_config).await } else if let Some(h) = channelconfig::IocForChannel::handler(&req) { @@ -304,12 +300,8 @@ async fn http_service_inner( h.handle(req, &node_config).await } else if let Some(h) = channelconfig::ScyllaSeriesTsMsp::handler(&req) { h.handle(req, &node_config).await - } else if let Some(h) = channelconfig::ChannelFromSeries::handler(&req) { - h.handle(req, &node_config).await } else if let Some(h) = channelconfig::AmbigiousChannelNames::handler(&req) { h.handle(req, &node_config).await - } else if let Some(h) = channelconfig::GenerateScyllaTestData::handler(&req) { - h.handle(req, &node_config).await } else if let Some(h) = events::EventsHandler::handler(&req) { h.handle(req, &node_config).await } else if let Some(h) = channel_status::ConnectionStatusEvents::handler(&req) { diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index d8223fd..d9b9cdd 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -2056,11 +2056,13 @@ impl AppendToUrl for ChannelConfigQuery { #[derive(Debug, Serialize, Deserialize)] pub struct ChannelConfigResponse { + #[serde(rename = "channel")] pub channel: Channel, #[serde(rename = "scalarType")] pub scalar_type: ScalarType, #[serde(rename = "byteOrder")] pub byte_order: Option, + #[serde(rename = "shape")] pub shape: Shape, } diff --git a/scyllaconn/src/config.rs b/scyllaconn/src/config.rs new file mode 100644 index 0000000..e8c439d --- /dev/null +++ b/scyllaconn/src/config.rs @@ -0,0 +1,50 @@ +use crate::errconv::ErrConv; +use err::Error; +use futures_util::StreamExt; +use netpod::{log::*, Channel, ScalarType, Shape}; +use netpod::{ChannelConfigQuery, ChannelConfigResponse}; +use scylla::Session as ScySession; +use std::sync::Arc; + +// TODO unused, table in postgres. +pub async fn config_from_scylla(chq: ChannelConfigQuery, scy: Arc) -> Result { + let cql = "select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?"; + let mut it = scy + .query_iter(cql, (chq.channel.backend(), chq.channel.name())) + .await + .err_conv()?; + let mut rows = Vec::new(); + while let Some(row) = it.next().await { + let row = row.err_conv()?; + let cols = row.into_typed::<(i64, i32, Vec)>().err_conv()?; + let scalar_type = ScalarType::from_scylla_i32(cols.1)?; + let shape = Shape::from_scylla_shape_dims(&cols.2)?; + let channel = Channel { + series: Some(cols.0 as _), + backend: chq.channel.backend().into(), + name: chq.channel.name().into(), + }; + let res = ChannelConfigResponse { + channel, + scalar_type, + byte_order: None, + shape, + }; + info!("config_from_scylla: {res:?}"); + rows.push(res); + } + if rows.is_empty() { + return Err(Error::with_public_msg_no_trace(format!( + "can not find config for channel {:?}", + chq.channel + ))); + } else { + if rows.len() > 1 { + error!( + "Found multiple configurations for channel {:?} {:?}", + chq.channel, rows + ); + } + Ok(rows.pop().unwrap()) + } +} diff --git a/scyllaconn/src/scyllaconn.rs b/scyllaconn/src/scyllaconn.rs index 9299e46..1c915d9 100644 --- a/scyllaconn/src/scyllaconn.rs +++ b/scyllaconn/src/scyllaconn.rs @@ -1,4 +1,5 @@ pub mod bincache; +pub mod config; pub mod errconv; pub mod events;