From 3cd1b7a64003399a9eb8b543c28623b30892081a Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Fri, 3 Jun 2022 16:57:59 +0200
Subject: [PATCH] Find active channels and deliver values

---
 Cargo.toml | 2 +-
 archapp/src/archeng/blockrefstream.rs | 2 +
 archapp/src/archeng/blockstream.rs | 2 +
 archapp/src/archeng/configs.rs | 1 +
 commonio/Cargo.toml | 2 +-
 daqbuffer/Cargo.toml | 2 +-
 daqbuffer/src/bin/daqbuffer.rs | 1 +
 daqbufp2/src/client.rs | 1 +
 daqbufp2/src/test/binnedbinary.rs | 1 +
 daqbufp2/src/test/binnedjson.rs | 4 +
 .../src/test/binnedjson/channelarchiver.rs | 4 +
 daqbufp2/src/test/events.rs | 2 +
 daqbufp2/src/test/timeweightedjson.rs | 1 +
 dbconn/Cargo.toml | 3 +-
 dbconn/src/lib.rs | 47 +-
 dbconn/src/scan.rs | 74 ++--
 dbconn/src/search.rs | 76 +++-
 disk/src/aggtest.rs | 3 +-
 disk/src/binned.rs | 14 +-
 disk/src/binned/binnedfrompbv.rs | 6 +-
 disk/src/binned/pbv.rs | 10 +-
 disk/src/binned/prebinned.rs | 21 +-
 disk/src/binned/query.rs | 44 +-
 disk/src/channelexec.rs | 21 +-
 disk/src/dataopen.rs | 1 +
 disk/src/eventblobs.rs | 1 +
 disk/src/events.rs | 23 +-
 disk/src/frame/inmem.rs | 12 +-
 disk/src/gen.rs | 6 +-
 disk/src/merge.rs | 1 +
 dq/Cargo.toml | 2 +-
 dq/src/bin/dq.rs | 1 +
 dq/src/dq.rs | 1 +
 httpret/Cargo.toml | 1 +
 httpret/src/api1.rs | 1 +
 httpret/src/channelarchiver.rs | 2 +
 httpret/src/channelconfig.rs | 403 +++++++++++++++++-
 httpret/src/events.rs | 25 +-
 httpret/src/evinfo.rs | 4 +
 httpret/src/httpret.rs | 30 +-
 httpret/src/proxy.rs | 2 +
 netpod/src/netpod.rs | 149 ++++++-
 netpod/src/query.rs | 7 +-
 nodenet/Cargo.toml | 1 +
 nodenet/src/conn.rs | 56 ++-
 nodenet/src/scylla.rs | 165 +++---
 parse/Cargo.toml | 2 +-
 taskrun/Cargo.toml | 8 +-
 taskrun/src/taskrun.rs | 24 +-
 49 files changed, 1002 insertions(+), 270 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 2701596..04210ff 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,4 +21,4 @@ codegen-units = 32
 incremental = true
 
 [patch.crates-io]
-tokio = { git = "https://github.com/dominikwerder/tokio", rev = "995221d8" }
+#tokio = { git = "https://github.com/dominikwerder/tokio", rev = "995221d8" }
diff --git a/archapp/src/archeng/blockrefstream.rs b/archapp/src/archeng/blockrefstream.rs
index cbc75c2..57c698c 100644
--- a/archapp/src/archeng/blockrefstream.rs
+++ b/archapp/src/archeng/blockrefstream.rs
@@ -198,6 +198,7 @@ mod test {
         let channel = Channel {
             backend: "sls-archive".into(),
             name: "X05DA-FE-WI1:TC1".into(),
+            series: None,
         };
         use chrono::{DateTime, Utc};
         let dtbeg: DateTime = "2021-10-01T00:00:00Z".parse()?;
@@ -211,6 +212,7 @@ mod test {
         };
         let dbconf = Database {
             host: "localhost".into(),
+            port: 5432,
             name: "testingdaq".into(),
             user: "testingdaq".into(),
             pass: "testingdaq".into(),
diff --git a/archapp/src/archeng/blockstream.rs b/archapp/src/archeng/blockstream.rs
index dadeb87..6c1c4c8 100644
--- a/archapp/src/archeng/blockstream.rs
+++ b/archapp/src/archeng/blockstream.rs
@@ -485,9 +485,11 @@ mod test {
             backend: "sls-archive".into(),
             //name: "X05DA-FE-WI1:TC1".into(),
             name: "ARIDI-PCT:CURRENT".into(),
+            series: None,
         };
         let dbconf = Database {
             host: "localhost".into(),
+            port: 5432,
             name: "testingdaq".into(),
             user: "testingdaq".into(),
             pass: "testingdaq".into(),
diff --git a/archapp/src/archeng/configs.rs b/archapp/src/archeng/configs.rs
index 9e88717..014a0c6 100644
--- a/archapp/src/archeng/configs.rs
+++ b/archapp/src/archeng/configs.rs
@@ -227,6 +227,7 @@ impl Stream for ConfigStream {
                 let channel = Channel {
                     name: item,
                     backend: "".into(),
+                    series: None,
                 };
                 let now = SystemTime::now()
.duration_since(std::time::UNIX_EPOCH) diff --git a/commonio/Cargo.toml b/commonio/Cargo.toml index 8d573b8..6822989 100644 --- a/commonio/Cargo.toml +++ b/commonio/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" path = "src/commonio.rs" [dependencies] -tokio = { version = "=1.16.1", features = ["io-util", "net", "time", "sync", "fs", "parking_lot"] } +tokio = { version = "1.18.1", features = ["io-util", "net", "time", "fs"] } tracing = "0.1" futures-core = "0.3.15" futures-util = "0.3.15" diff --git a/daqbuffer/Cargo.toml b/daqbuffer/Cargo.toml index c388a87..8665f77 100644 --- a/daqbuffer/Cargo.toml +++ b/daqbuffer/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Dominik Werder "] edition = "2021" [dependencies] -tokio = { version = "=1.16.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "1.18.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } hyper = "0.14" http = "0.2" tracing = "0.1.25" diff --git a/daqbuffer/src/bin/daqbuffer.rs b/daqbuffer/src/bin/daqbuffer.rs index 116d0f3..d1fed44 100644 --- a/daqbuffer/src/bin/daqbuffer.rs +++ b/daqbuffer/src/bin/daqbuffer.rs @@ -116,6 +116,7 @@ fn simple_fetch() { channel: Channel { backend: "sf-databuffer".into(), name: "S10BC01-DBAM070:BAM_CH1_NORM".into(), + series: None, }, keyspace: 3, time_bin_size: Nanos { ns: DAY }, diff --git a/daqbufp2/src/client.rs b/daqbufp2/src/client.rs index 109527c..f4654fe 100644 --- a/daqbufp2/src/client.rs +++ b/daqbufp2/src/client.rs @@ -56,6 +56,7 @@ pub async fn get_binned( let channel = Channel { backend: channel_backend.clone(), name: channel_name.into(), + series: None, }; let agg_kind = AggKind::DimXBins1; let range = NanoRange::from_date_time(beg_date, end_date); diff --git a/daqbufp2/src/test/binnedbinary.rs b/daqbufp2/src/test/binnedbinary.rs index 3abe68a..aea86b2 100644 --- a/daqbufp2/src/test/binnedbinary.rs +++ b/daqbufp2/src/test/binnedbinary.rs @@ -102,6 +102,7 @@ where let channel = Channel { backend: channel_backend.into(), name: channel_name.into(), + series:None, }; let range = NanoRange::from_date_time(beg_date, end_date); let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind); diff --git a/daqbufp2/src/test/binnedjson.rs b/daqbufp2/src/test/binnedjson.rs index 867b226..2785636 100644 --- a/daqbufp2/src/test/binnedjson.rs +++ b/daqbufp2/src/test/binnedjson.rs @@ -119,6 +119,7 @@ fn get_sls_archive_1() -> Result<(), Error> { let channel = Channel { backend: "sls-archive".into(), name: "ABOMA-CH-6G:U-DCLINK".into(), + series: None, }; let begstr = "2021-11-10T01:00:00Z"; let endstr = "2021-11-10T01:01:00Z"; @@ -140,6 +141,7 @@ fn get_sls_archive_3() -> Result<(), Error> { let channel = Channel { backend: "sls-archive".into(), name: "ARIDI-PCT:CURRENT".into(), + series: None, }; let begstr = "2021-11-09T00:00:00Z"; let endstr = "2021-11-11T00:10:00Z"; @@ -161,6 +163,7 @@ fn get_sls_archive_wave_2() -> Result<(), Error> { let channel = Channel { backend: "sls-archive".into(), name: "ARIDI-MBF-X:CBM-IN".into(), + series: None, }; let begstr = "2021-11-09T10:00:00Z"; let endstr = "2021-11-10T06:00:00Z"; @@ -192,6 +195,7 @@ async fn get_binned_json_common( let channel = Channel { backend: channel_backend.into(), name: channel_name.into(), + series: None, }; let range = NanoRange::from_date_time(beg_date, end_date); let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind); diff --git a/daqbufp2/src/test/binnedjson/channelarchiver.rs b/daqbufp2/src/test/binnedjson/channelarchiver.rs index b1e8b49..140e872 
100644 --- a/daqbufp2/src/test/binnedjson/channelarchiver.rs +++ b/daqbufp2/src/test/binnedjson/channelarchiver.rs @@ -8,6 +8,7 @@ fn get_scalar_2_events() -> Result<(), Error> { let channel = Channel { backend: "sls-archive".into(), name: "ARIDI-PCT:CURRENT".into(), + series: None, }; let begstr = "2021-11-10T00:00:00Z"; let endstr = "2021-11-10T00:10:00Z"; @@ -50,6 +51,7 @@ fn get_scalar_2_binned() -> Result<(), Error> { let channel = Channel { backend: "sls-archive".into(), name: "ARIDI-PCT:CURRENT".into(), + series: None, }; let begstr = "2021-11-10T00:00:00Z"; let endstr = "2021-11-10T00:10:00Z"; @@ -71,6 +73,7 @@ fn get_wave_1_events() -> Result<(), Error> { let channel = Channel { backend: "sls-archive".into(), name: "ARIDI-MBF-X:CBM-IN".into(), + series: None, }; let begstr = "2021-11-09T00:00:00Z"; let endstr = "2021-11-09T00:10:00Z"; @@ -111,6 +114,7 @@ fn get_wave_1_binned() -> Result<(), Error> { let channel = Channel { backend: "sls-archive".into(), name: "ARIDI-MBF-X:CBM-IN".into(), + series: None, }; let begstr = "2021-11-09T00:00:00Z"; let endstr = "2021-11-11T00:10:00Z"; diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs index 999a305..3d9e7ed 100644 --- a/daqbufp2/src/test/events.rs +++ b/daqbufp2/src/test/events.rs @@ -62,6 +62,7 @@ where let channel = Channel { backend: channel_backend.into(), name: channel_name.into(), + series: None, }; let range = NanoRange::from_date_time(beg_date, end_date); let query = PlainEventsBinaryQuery::new(channel, range, 1024 * 4); @@ -272,6 +273,7 @@ pub async fn get_plain_events_json( let channel = Channel { backend: channel_backend.into(), name: channel_name.into(), + series: None, }; let range = NanoRange::from_date_time(beg_date, end_date); let query = PlainEventsJsonQuery::new(channel, range, 1024 * 4, None, false); diff --git a/daqbufp2/src/test/timeweightedjson.rs b/daqbufp2/src/test/timeweightedjson.rs index 1aad737..22d6767 100644 --- a/daqbufp2/src/test/timeweightedjson.rs +++ b/daqbufp2/src/test/timeweightedjson.rs @@ -172,6 +172,7 @@ async fn get_json_common( let channel = Channel { backend: channel_backend.into(), name: channel_name.into(), + series: None, }; let range = NanoRange::from_date_time(beg_date, end_date); let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind); diff --git a/dbconn/Cargo.toml b/dbconn/Cargo.toml index c378a71..d93b2e4 100644 --- a/dbconn/Cargo.toml +++ b/dbconn/Cargo.toml @@ -18,7 +18,8 @@ bytes = "1.0.1" pin-project = "1.0.7" #async-channel = "1" #dashmap = "3" -tokio-postgres = { version = "0.7.4", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] } +tokio-postgres = { version = "0.7.6", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] } +scylla = "0.4.4" async-channel = "1.6" chrono = "0.4" regex = "1.5.4" diff --git a/dbconn/src/lib.rs b/dbconn/src/lib.rs index 639f84e..b0a6c32 100644 --- a/dbconn/src/lib.rs +++ b/dbconn/src/lib.rs @@ -1,6 +1,5 @@ pub mod scan; pub mod search; - pub mod pg { pub use tokio_postgres::{Client, Error}; } @@ -8,15 +7,17 @@ pub mod pg { use err::Error; use netpod::log::*; use netpod::{Channel, Database, NodeConfigCached}; +use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError; +use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError}; use std::time::Duration; use tokio_postgres::{Client, NoTls}; trait ErrConv { - fn errconv(self) -> Result; + fn err_conv(self) -> Result; } impl ErrConv for Result { - fn errconv(self) -> Result { + fn 
err_conv(self) -> Result { match self { Ok(k) => Ok(k), Err(e) => Err(Error::with_msg(e.to_string())), @@ -25,13 +26,39 @@ impl ErrConv for Result { } impl ErrConv for Result> { - fn errconv(self) -> Result { + fn err_conv(self) -> Result { match self { Ok(k) => Ok(k), Err(e) => Err(Error::with_msg(e.to_string())), } } } +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} + +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} + +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} pub async fn delay_us(mu: u64) { tokio::time::sleep(Duration::from_micros(mu)).await; @@ -68,7 +95,7 @@ pub async fn channel_exists(channel: &Channel, node_config: &NodeConfigCached) - let rows = cl .query("select rowid from channels where name = $1::text", &[&channel.name]) .await - .errconv()?; + .err_conv()?; debug!("channel_exists {} rows", rows.len()); for row in rows { debug!( @@ -89,7 +116,7 @@ pub async fn database_size(node_config: &NodeConfigCached) -> Result &[&node_config.node_config.cluster.database.name], ) .await - .errconv()?; + .err_conv()?; if rows.len() == 0 { Err(Error::with_msg("could not get database size"))?; } @@ -115,7 +142,7 @@ pub async fn table_sizes(node_config: &NodeConfigCached) -> Result Result Result { let sql = "select name from channels order by rowid limit 1 offset (random() * (select count(rowid) from channels))::bigint"; let cl = create_connection(&node_config.node_config.cluster.database).await?; - let rows = cl.query(sql, &[]).await.errconv()?; + let rows = cl.query(sql, &[]).await.err_conv()?; if rows.len() == 0 { Err(Error::with_msg("can not get random channel"))?; } @@ -141,11 +168,11 @@ pub async fn insert_channel(name: String, facility: i64, dbc: &Client) -> Result &[&facility, &name], ) .await - .errconv()?; + .err_conv()?; if rows[0].get::<_, i64>(0) == 0 { let sql = concat!("insert into channels (facility, name) values ($1, $2) on conflict (facility, name) do nothing"); - dbc.query(sql, &[&facility, &name]).await.errconv()?; + dbc.query(sql, &[&facility, &name]).await.err_conv()?; } Ok(()) } diff --git a/dbconn/src/scan.rs b/dbconn/src/scan.rs index 2d36398..6ec0036 100644 --- a/dbconn/src/scan.rs +++ b/dbconn/src/scan.rs @@ -49,7 +49,7 @@ pub async fn get_node_disk_ident(node_config: &NodeConfigCached, dbc: &Client) - let rows = dbc .query(sql, &[&node_config.node_config.cluster.backend, &node_config.node.host]) .await - .errconv()?; + .err_conv()?; if rows.len() != 1 { return Err(Error::with_msg(format!( "get_node can't find unique entry for {} {}", @@ -73,7 +73,7 @@ pub async fn get_node_disk_ident_2( let rows = dbc .query(sql, &[&node_config.node_config.cluster.backend, &node_config.node.host]) .await - .errconv()?; + .err_conv()?; if rows.len() != 1 { return Err(Error::with_msg(format!( "get_node can't find unique entry for {} {}", @@ -327,16 +327,16 @@ impl Stream for UpdatedDbWithChannelNamesStream { async fn update_db_with_channel_name_list(list: Vec, backend: i64, dbc: &Client) -> Result<(), Error> { crate::delay_io_short().await; - dbc.query("begin", &[]).await.errconv()?; + dbc.query("begin", &[]).await.err_conv()?; for ch in list { dbc.query( "insert into channels (facility, name) values ($1, $2) on conflict do nothing", 
&[&backend, &ch], ) .await - .errconv()?; + .err_conv()?; } - dbc.query("commit", &[]).await.errconv()?; + dbc.query("commit", &[]).await.err_conv()?; Ok(()) } @@ -351,7 +351,7 @@ pub async fn update_db_with_channel_names( let dbc = crate::create_connection(&db_config).await?; let node_disk_ident = get_node_disk_ident(&node_config, &dbc).await?; let c1 = Arc::new(RwLock::new(0u32)); - dbc.query("begin", &[]).await.errconv()?; + dbc.query("begin", &[]).await.err_conv()?; let dbc = Arc::new(dbc); let tx = Arc::new(tx); let base_path = &node_config @@ -373,33 +373,33 @@ pub async fn update_db_with_channel_names( &[&fac, &ch], ) .await - .errconv()?; + .err_conv()?; let c2 = { let mut g = c1.write()?; *g += 1; *g }; if c2 % 200 == 0 { - dbc.query("commit", &[]).await.errconv()?; + dbc.query("commit", &[]).await.err_conv()?; let ret = UpdatedDbWithChannelNames { msg: format!("current {}", ch), count: c2, }; - tx.send(Ok(ret)).await.errconv()?; + tx.send(Ok(ret)).await.err_conv()?; crate::delay_io_medium().await; - dbc.query("begin", &[]).await.errconv()?; + dbc.query("begin", &[]).await.err_conv()?; } Ok(()) } }) .await?; - dbc.query("commit", &[]).await.errconv()?; + dbc.query("commit", &[]).await.err_conv()?; let c2 = *c1.read()?; let ret = UpdatedDbWithChannelNames { msg: format!("all done"), count: c2, }; - tx.send(Ok(ret)).await.errconv()?; + tx.send(Ok(ret)).await.err_conv()?; Ok::<_, Error>(()) }; let block2 = async move { @@ -468,15 +468,15 @@ pub async fn update_db_with_all_channel_configs( &[&node_disk_ident.facility], ) .await - .errconv()?; + .err_conv()?; let mut c1 = 0; - dbc.query("begin", &[]).await.errconv()?; + dbc.query("begin", &[]).await.err_conv()?; let mut count_inserted = 0; let mut count_updated = 0; for row in rows { - let rowid: i64 = row.try_get(0).errconv()?; - let _facility: i64 = row.try_get(1).errconv()?; - let channel: String = row.try_get(2).errconv()?; + let rowid: i64 = row.try_get(0).err_conv()?; + let _facility: i64 = row.try_get(1).err_conv()?; + let channel: String = row.try_get(2).err_conv()?; match update_db_with_channel_config( node_config, node_disk_ident, @@ -499,26 +499,26 @@ pub async fn update_db_with_all_channel_configs( Ok(UpdateChannelConfigResult::Done) => { c1 += 1; if c1 % 200 == 0 { - dbc.query("commit", &[]).await.errconv()?; + dbc.query("commit", &[]).await.err_conv()?; let msg = format!( "channel no {:6} inserted {:6} updated {:6}", c1, count_inserted, count_updated ); let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 }; - tx.send(Ok(ret)).await.errconv()?; - dbc.query("begin", &[]).await.errconv()?; + tx.send(Ok(ret)).await.err_conv()?; + dbc.query("begin", &[]).await.err_conv()?; } crate::delay_io_short().await; } } } - dbc.query("commit", &[]).await.errconv()?; + dbc.query("commit", &[]).await.err_conv()?; let msg = format!( "ALL DONE channel no {:6} inserted {:6} updated {:6}", c1, count_inserted, count_updated ); let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 }; - tx.send(Ok(ret)).await.errconv()?; + tx.send(Ok(ret)).await.err_conv()?; Ok::<_, Error>(()) } .then({ @@ -528,7 +528,7 @@ pub async fn update_db_with_all_channel_configs( Err(e) => { let msg = format!("Seeing error: {:?}", e); let ret = UpdatedDbWithAllChannelConfigs { msg, count: 0 }; - tx2.send(Ok(ret)).await.errconv()?; + tx2.send(Ok(ret)).await.err_conv()?; } } Ok::<_, Error>(()) @@ -551,7 +551,7 @@ pub async fn update_db_with_all_channel_configs( pub async fn update_search_cache(node_config: &NodeConfigCached) -> Result<(), Error> { let dbc = 
crate::create_connection(&node_config.node_config.cluster.database).await?; - dbc.query("select update_cache()", &[]).await.errconv()?; + dbc.query("select update_cache()", &[]).await.err_conv()?; Ok(()) } @@ -597,7 +597,7 @@ pub async fn update_db_with_channel_config( &[&node_disk_ident.rowid(), &channel_id], ) .await - .errconv()?; + .err_conv()?; if rows.len() > 1 { return Err(Error::with_msg("more than one row")); } @@ -612,7 +612,7 @@ pub async fn update_db_with_channel_config( "insert into configs_history (rowid_original, node, channel, fileSize, parsedUntil, config, tsinsert) ", "select rowid as rowid_original, node, channel, fileSize, parsedUntil, config, now() from configs where rowid = $1" ); - dbc.query(sql, &[&rowid]).await.errconv()?; + dbc.query(sql, &[&rowid]).await.err_conv()?; } //ensure!(meta.len() >= parsed_until as u64, ConfigFileOnDiskShrunk{path}); (Some(rowid), true) @@ -635,14 +635,14 @@ pub async fn update_db_with_channel_config( ], ) .await - .errconv()?; + .err_conv()?; *count_inserted += 1; } Some(_config_id_2) => { dbc.query( "insert into configs (node, channel, fileSize, parsedUntil, config) values ($1, $2, $3, $4, $5) on conflict (node, channel) do update set fileSize = $3, parsedUntil = $4, config = $5", &[&node_disk_ident.rowid(), &channel_id, &(meta.len() as i64), &(buf.len() as i64), &serde_json::to_value(config)?], - ).await.errconv()?; + ).await.err_conv()?; *count_updated += 1; } } @@ -662,25 +662,25 @@ pub async fn update_db_with_all_channel_datafiles( &[&node_disk_ident.facility()], ) .await - .errconv()?; + .err_conv()?; let mut c1 = 0; - dbc.query("begin", &[]).await.errconv()?; + dbc.query("begin", &[]).await.err_conv()?; for row in rows { - let rowid: i64 = row.try_get(0).errconv()?; - let _facility: i64 = row.try_get(1).errconv()?; - let channel: String = row.try_get(2).errconv()?; + let rowid: i64 = row.try_get(0).err_conv()?; + let _facility: i64 = row.try_get(1).err_conv()?; + let channel: String = row.try_get(2).err_conv()?; update_db_with_channel_datafiles(node_config, node_disk_ident, ks_prefix, rowid, &channel, dbc.clone()).await?; c1 += 1; if c1 % 40 == 0 { trace!("import datafiles {} {}", c1, channel); - dbc.query("commit", &[]).await.errconv()?; - dbc.query("begin", &[]).await.errconv()?; + dbc.query("commit", &[]).await.err_conv()?; + dbc.query("begin", &[]).await.err_conv()?; } if false && c1 >= 30 { break; } } - dbc.query("commit", &[]).await.errconv()?; + dbc.query("commit", &[]).await.err_conv()?; Ok(()) } @@ -744,7 +744,7 @@ impl ChannelDatafileDescSink for DatafileDbWriter { &((k.timebin() + 1) as i64 * k.binsize() as i64), &serde_json::to_value(k)?, ] - ).await.errconv()?; + ).await.err_conv()?; *c1.write()? 
+= 1; Ok(()) }) diff --git a/dbconn/src/search.rs b/dbconn/src/search.rs index 45c1b12..9997af4 100644 --- a/dbconn/src/search.rs +++ b/dbconn/src/search.rs @@ -2,8 +2,10 @@ use crate::{create_connection, ErrConv}; use err::Error; use netpod::{ ChannelArchiver, ChannelSearchQuery, ChannelSearchResult, ChannelSearchSingleResult, Database, NodeConfigCached, + ScalarType, ScyllaConfig, Shape, }; use serde_json::Value as JsVal; +use std::sync::Arc; pub async fn search_channel_databuffer( query: ChannelSearchQuery, @@ -34,7 +36,7 @@ pub async fn search_channel_databuffer( &[&query.name_regex, &query.source_regex, &query.description_regex, &"asc"], ) .await - .errconv()?; + .err_conv()?; let mut res = vec![]; for row in rows { let shapedb: Option = row.get(4); @@ -64,6 +66,7 @@ pub async fn search_channel_databuffer( let k = ChannelSearchSingleResult { backend: row.get(7), name: row.get(1), + series: row.get::<_, i64>(0) as u64, source: row.get(2), ty, shape: shape, @@ -77,6 +80,65 @@ pub async fn search_channel_databuffer( Ok(ret) } +pub async fn search_channel_scylla( + query: ChannelSearchQuery, + _scyconf: &ScyllaConfig, + pgconf: &Database, +) -> Result { + let empty = if !query.name_regex.is_empty() { + false + } else if !query.source_regex.is_empty() { + false + } else if !query.description_regex.is_empty() { + false + } else { + true + }; + if empty { + let ret = ChannelSearchResult { channels: vec![] }; + return Ok(ret); + } + let sql = format!(concat!( + "select", + " series, facility, channel, scalar_type, shape_dims", + " from series_by_channel", + " where channel like $1", + )); + let u = { + let d = &pgconf; + format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name) + }; + let (pgclient, pgconn) = tokio_postgres::connect(&u, tokio_postgres::NoTls).await.err_conv()?; + // TODO use common connection/pool: + tokio::spawn(pgconn); + let pgclient = Arc::new(pgclient); + let rows = pgclient.query(sql.as_str(), &[&query.name_regex]).await.err_conv()?; + let mut res = vec![]; + for row in rows { + let series = row.get::<_, i64>(0) as u64; + let facility: String = row.get(1); + let channel: String = row.get(2); + let a: i32 = row.get(3); + let scalar_type = ScalarType::from_scylla_i32(a)?; + let a: Vec = row.get(4); + let shape = Shape::from_scylla_shape_dims(&a)?; + let k = ChannelSearchSingleResult { + backend: facility, + name: channel, + series, + source: "".into(), + ty: scalar_type.to_variant_str().into(), + shape: shape.to_scylla_vec().into_iter().map(|x| x as u32).collect(), + unit: "".into(), + description: "".into(), + is_api_0: None, + }; + res.push(k); + } + let ret = ChannelSearchResult { channels: res }; + Ok(ret) +} + pub async fn search_channel_archeng( query: ChannelSearchQuery, backend: String, @@ -106,7 +168,7 @@ pub async fn search_channel_archeng( " limit 100" )); let cl = create_connection(database).await?; - let rows = cl.query(sql.as_str(), &[&query.name_regex]).await.errconv()?; + let rows = cl.query(sql.as_str(), &[&query.name_regex]).await.err_conv()?; let mut res = vec![]; for row in rows { let name: String = row.get(0); @@ -174,6 +236,8 @@ pub async fn search_channel_archeng( let k = ChannelSearchSingleResult { backend: backend.clone(), name, + // TODO provide a unique id also within this backend: + series: 0, source: String::new(), ty: st.into(), shape, @@ -191,9 +255,11 @@ pub async fn search_channel( query: ChannelSearchQuery, node_config: &NodeConfigCached, ) -> Result { - let database = &node_config.node_config.cluster.database; - if 
let Some(conf) = node_config.node.channel_archiver.as_ref() { - search_channel_archeng(query, node_config.node_config.cluster.backend.clone(), conf, database).await + let pgconf = &node_config.node_config.cluster.database; + if let Some(scyconf) = node_config.node_config.cluster.scylla.as_ref() { + search_channel_scylla(query, scyconf, pgconf).await + } else if let Some(conf) = node_config.node.channel_archiver.as_ref() { + search_channel_archeng(query, node_config.node_config.cluster.backend.clone(), conf, pgconf).await } else if let Some(_conf) = node_config.node.archiver_appliance.as_ref() { // TODO err::todoval() diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index eb4fc55..e530027 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -20,7 +20,6 @@ pub fn make_test_node(id: u32) -> Node { }), archiver_appliance: None, channel_archiver: None, - access_scylla: false, } } @@ -40,6 +39,7 @@ async fn agg_x_dim_0_inner() { channel: Channel { backend: "sf-databuffer".into(), name: "S10BC01-DBAM070:EOM1_T1".into(), + series: None, }, keyspace: 2, time_bin_size: Nanos { ns: DAY }, @@ -94,6 +94,7 @@ async fn agg_x_dim_1_inner() { channel: Channel { backend: "ks".into(), name: "wave1".into(), + series: None, }, keyspace: 3, time_bin_size: Nanos { ns: DAY }, diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 4de12dc..6bf5c5f 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -54,7 +54,7 @@ impl ChannelExecFunction for BinnedBinaryChannelExec { fn exec( self, _byte_order: END, - _scalar_type: ScalarType, + scalar_type: ScalarType, shape: Shape, event_value_shape: EVS, _events_node_proc: ENP, @@ -93,6 +93,7 @@ impl ChannelExecFunction for BinnedBinaryChannelExec { PreBinnedPatchIterator::from_range(pre_range), self.query.channel().clone(), range.clone(), + scalar_type, shape, self.query.agg_kind().clone(), self.query.cache_usage().clone(), @@ -147,12 +148,16 @@ impl ChannelExecFunction for BinnedBinaryChannelExec { pub async fn binned_bytes_for_http( query: &BinnedQuery, + scalar_type: ScalarType, + shape: Shape, node_config: &NodeConfigCached, ) -> Result> + Send>>, Error> { let ret = channel_exec( BinnedBinaryChannelExec::new(query.clone(), node_config.clone()), query.channel(), query.range(), + scalar_type, + shape, query.agg_kind().clone(), node_config, ) @@ -306,7 +311,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec { fn exec( self, _byte_order: END, - _scalar_type: ScalarType, + scalar_type: ScalarType, shape: Shape, event_value_shape: EVS, _events_node_proc: ENP, @@ -346,6 +351,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec { PreBinnedPatchIterator::from_range(pre_range), self.query.channel().clone(), range.clone(), + scalar_type, shape, self.query.agg_kind().clone(), self.query.cache_usage().clone(), @@ -400,12 +406,16 @@ impl ChannelExecFunction for BinnedJsonChannelExec { pub async fn binned_json( query: &BinnedQuery, + scalar_type: ScalarType, + shape: Shape, node_config: &NodeConfigCached, ) -> Result> + Send>>, Error> { let ret = channel_exec( BinnedJsonChannelExec::new(query.clone(), query.timeout(), node_config.clone()), query.channel(), query.range(), + scalar_type, + shape, query.agg_kind().clone(), node_config, ) diff --git a/disk/src/binned/binnedfrompbv.rs b/disk/src/binned/binnedfrompbv.rs index 1db22f6..fbdf262 100644 --- a/disk/src/binned/binnedfrompbv.rs +++ b/disk/src/binned/binnedfrompbv.rs @@ -12,7 +12,7 @@ use netpod::log::*; use netpod::query::CacheUsage; use netpod::{ x_bin_count, AggKind, AppendToUrl, BinnedRange, 
ByteSize, Channel, NodeConfigCached, PerfOpts, - PreBinnedPatchIterator, Shape, + PreBinnedPatchIterator, ScalarType, Shape, }; use serde::de::DeserializeOwned; use std::future::ready; @@ -164,6 +164,7 @@ where patch_it: PreBinnedPatchIterator, channel: Channel, range: BinnedRange, + scalar_type: ScalarType, shape: Shape, agg_kind: AggKind, cache_usage: CacheUsage, @@ -185,12 +186,15 @@ where let pmax = patches.len(); let inp = futures_util::stream::iter(patches.into_iter().enumerate()) .map({ + let shape = shape.clone(); let agg_kind = agg_kind.clone(); let node_config = node_config.clone(); move |(pix, patch)| { let query = PreBinnedQuery::new( patch, channel.clone(), + scalar_type.clone(), + shape.clone(), agg_kind.clone(), cache_usage.clone(), disk_io_buffer_size, diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index bb41b7f..e39816d 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -17,7 +17,7 @@ use items::{ use netpod::log::*; use netpod::query::{CacheUsage, RawEventsQuery}; use netpod::{ - x_bin_count, AggKind, BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange, Shape, + x_bin_count, AggKind, BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange, }; use serde::de::DeserializeOwned; use serde::Serialize; @@ -37,7 +37,6 @@ where ENP: EventsNodeProcessor>::Batch>, { query: PreBinnedQuery, - shape: Shape, agg_kind: AggKind, node_config: NodeConfigCached, open_check_local_file: Option> + Send>>>, @@ -71,10 +70,9 @@ where // TODO who exactly needs this DeserializeOwned? Sitemty<<::Output as TimeBinnableType>::Output>: FrameType + DeserializeOwned, { - pub fn new(query: PreBinnedQuery, shape: Shape, agg_kind: AggKind, node_config: &NodeConfigCached) -> Self { + pub fn new(query: PreBinnedQuery, agg_kind: AggKind, node_config: &NodeConfigCached) -> Self { Self { query, - shape, agg_kind, node_config: node_config.clone(), open_check_local_file: None, @@ -133,7 +131,7 @@ where let ret = TBinnerStream::<_, ::Output>::new( s, range, - x_bin_count(&self.shape, &self.agg_kind), + x_bin_count(&self.query.shape().clone(), &self.agg_kind), self.agg_kind.do_time_weighted(), ); Ok(Box::pin(ret)) @@ -180,6 +178,8 @@ where let query = PreBinnedQuery::new( patch, q2.channel().clone(), + q2.scalar_type().clone(), + q2.shape().clone(), q2.agg_kind().clone(), q2.cache_usage().clone(), disk_io_buffer_size, diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index b15d633..17c638f 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -13,13 +13,13 @@ use items::numops::{BoolNum, NumOps, StringNum}; use items::{ Appendable, Clearable, EventsNodeProcessor, Framable, FrameType, PushableIndex, Sitemty, TimeBinnableType, }; -use netpod::{AggKind, ByteOrder, ChannelConfigQuery, NodeConfigCached, ScalarType, Shape}; +use netpod::{AggKind, ByteOrder, NodeConfigCached, ScalarType, Shape}; use serde::de::DeserializeOwned; use serde::Serialize; use std::pin::Pin; fn make_num_pipeline_nty_end_evs_enp( - shape: Shape, + _shape: Shape, agg_kind: AggKind, _event_value_shape: EVS, _events_node_proc: ENP, @@ -36,7 +36,7 @@ where Sitemty<<::Output as TimeBinnableType>::Output>: Framable + FrameType + DeserializeOwned, { - let ret = PreBinnedValueStream::::new(query, shape, agg_kind, node_config); + let ret = PreBinnedValueStream::::new(query, agg_kind, node_config); let ret = StreamExt::map(ret, |item| Box::new(item) as Box); Box::pin(ret) } @@ -138,9 +138,6 @@ macro_rules! 
match_end { }; } -// TODO is the distinction on byte order necessary here? -// We should rely on the "events" http api to deliver data, and the cache, both -// of those have fixed endianness. fn make_num_pipeline( scalar_type: ScalarType, byte_order: ByteOrder, @@ -185,17 +182,11 @@ pub async fn pre_binned_bytes_for_http( )); return Err(err); } - let q = ChannelConfigQuery { - channel: query.channel().clone(), - range: query.patch().patch_range(), - expand: query.agg_kind().need_expand(), - }; - let conf = httpclient::get_channel_config(&q, node_config).await?; let ret = make_num_pipeline( - conf.scalar_type.clone(), + query.scalar_type().clone(), // TODO actually, make_num_pipeline should not depend on endianness. - conf.byte_order.unwrap_or(ByteOrder::LE).clone(), - conf.shape.clone(), + ByteOrder::LE, + query.shape().clone(), query.agg_kind().clone(), query.clone(), node_config, diff --git a/disk/src/binned/query.rs b/disk/src/binned/query.rs index d8164b6..98e5778 100644 --- a/disk/src/binned/query.rs +++ b/disk/src/binned/query.rs @@ -1,7 +1,10 @@ use err::Error; use http::request::Parts; use netpod::query::{agg_kind_from_binning_scheme, binning_scheme_append_to_url, CacheUsage}; -use netpod::{channel_from_pairs, AggKind, AppendToUrl, ByteSize, Channel, PreBinnedPatchCoord}; +use netpod::{ + channel_append_to_url, channel_from_pairs, AggKind, AppendToUrl, ByteSize, Channel, PreBinnedPatchCoord, + ScalarType, Shape, +}; use std::collections::BTreeMap; use url::Url; @@ -10,6 +13,8 @@ pub struct PreBinnedQuery { patch: PreBinnedPatchCoord, agg_kind: AggKind, channel: Channel, + scalar_type: ScalarType, + shape: Shape, cache_usage: CacheUsage, disk_io_buffer_size: usize, disk_stats_every: ByteSize, @@ -20,6 +25,8 @@ impl PreBinnedQuery { pub fn new( patch: PreBinnedPatchCoord, channel: Channel, + scalar_type: ScalarType, + shape: Shape, agg_kind: AggKind, cache_usage: CacheUsage, disk_io_buffer_size: usize, @@ -28,8 +35,10 @@ impl PreBinnedQuery { ) -> Self { Self { patch, - agg_kind, channel, + scalar_type, + shape, + agg_kind, cache_usage, disk_io_buffer_size, disk_stats_every, @@ -45,25 +54,35 @@ impl PreBinnedQuery { let pairs = pairs; let bin_t_len = pairs .get("binTlen") - .ok_or(Error::with_msg("missing binTlen"))? + .ok_or_else(|| Error::with_msg("missing binTlen"))? .parse()?; let patch_t_len = pairs .get("patchTlen") - .ok_or(Error::with_msg("missing patchTlen"))? + .ok_or_else(|| Error::with_msg("missing patchTlen"))? .parse()?; let patch_ix = pairs .get("patchIx") - .ok_or(Error::with_msg("missing patchIx"))? + .ok_or_else(|| Error::with_msg("missing patchIx"))? 
.parse()?; let disk_stats_every = pairs .get("diskStatsEveryKb") - .ok_or(Error::with_msg("missing diskStatsEveryKb"))?; + .ok_or_else(|| Error::with_msg("missing diskStatsEveryKb"))?; let disk_stats_every = disk_stats_every .parse() .map_err(|e| Error::with_msg(format!("can not parse diskStatsEveryKb {:?}", e)))?; + let scalar_type = pairs + .get("scalarType") + .ok_or_else(|| Error::with_msg("missing scalarType")) + .map(|x| ScalarType::from_url_str(&x))??; + let shape = pairs + .get("shape") + .ok_or_else(|| Error::with_msg("missing shape")) + .map(|x| Shape::from_url_str(&x))??; let ret = Self { patch: PreBinnedPatchCoord::new(bin_t_len, patch_t_len, patch_ix), channel: channel_from_pairs(&pairs)?, + scalar_type, + shape, agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::DimXBins1), cache_usage: CacheUsage::from_pairs(&pairs)?, disk_io_buffer_size: pairs @@ -99,6 +118,14 @@ impl PreBinnedQuery { &self.channel } + pub fn scalar_type(&self) -> &ScalarType { + &self.scalar_type + } + + pub fn shape(&self) -> &Shape { + &self.shape + } + pub fn agg_kind(&self) -> &AggKind { &self.agg_kind } @@ -120,9 +147,10 @@ impl AppendToUrl for PreBinnedQuery { fn append_to_url(&self, url: &mut Url) { self.patch.append_to_url(url); binning_scheme_append_to_url(&self.agg_kind, url); + channel_append_to_url(url, &self.channel); let mut g = url.query_pairs_mut(); - g.append_pair("channelBackend", &self.channel.backend); - g.append_pair("channelName", &self.channel.name); + g.append_pair("scalarType", &format!("{:?}", self.scalar_type)); + g.append_pair("shape", &format!("{:?}", self.shape)); g.append_pair("cacheUsage", &format!("{}", self.cache_usage.query_param_value())); g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size)); g.append_pair("diskStatsEveryKb", &format!("{}", self.disk_stats_every.bytes() / 1024)); diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index fd481ba..e3e4ddc 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -197,29 +197,20 @@ pub async fn channel_exec( f: F, channel: &Channel, range: &NanoRange, + scalar_type: ScalarType, + shape: Shape, agg_kind: AggKind, node_config: &NodeConfigCached, ) -> Result where F: ChannelExecFunction, { - let q = ChannelConfigQuery { - channel: channel.clone(), - range: range.clone(), - expand: agg_kind.need_expand(), - }; - let conf = httpclient::get_channel_config(&q, node_config).await.map_err(|e| { - e.add_public_msg(format!( - "Can not find channel config for channel: {:?}", - q.channel.name() - )) - })?; let ret = channel_exec_config( f, - conf.scalar_type.clone(), - // TODO is the byte order ever important here? - conf.byte_order.unwrap_or(ByteOrder::LE).clone(), - conf.shape.clone(), + scalar_type, + // TODO TODO TODO is the byte order ever important here? + ByteOrder::LE, + shape, agg_kind, node_config, )?; diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index c6d5b40..e05e6e1 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -815,6 +815,7 @@ mod test { let chn = netpod::Channel { backend: "testbackend".into(), name: "scalar-i32-be".into(), + series: None, }; // TODO read config from disk? Or expose the config from data generator? 
let channel_config = ChannelConfig { diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 9a2beef..676548a 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -250,6 +250,7 @@ mod test { let chn = netpod::Channel { backend: "testbackend".into(), name: "scalar-i32-be".into(), + series: None, }; // TODO read config from disk. let channel_config = ChannelConfig { diff --git a/disk/src/events.rs b/disk/src/events.rs index bd6e5d7..79e8446 100644 --- a/disk/src/events.rs +++ b/disk/src/events.rs @@ -1,8 +1,7 @@ use chrono::{DateTime, TimeZone, Utc}; use err::Error; -use netpod::{ - channel_from_pairs, get_url_query_pairs, AppendToUrl, Channel, FromUrl, HasBackend, HasTimeout, NanoRange, ToNanos, -}; +use netpod::{channel_append_to_url, channel_from_pairs, get_url_query_pairs}; +use netpod::{AppendToUrl, Channel, FromUrl, HasBackend, HasTimeout, NanoRange, ToNanos}; use std::time::Duration; use url::Url; @@ -32,11 +31,11 @@ impl PlainEventsBinaryQuery { let beg_date = pairs.get("begDate").ok_or(Error::with_msg("missing begDate"))?; let end_date = pairs.get("endDate").ok_or(Error::with_msg("missing endDate"))?; let ret = Self { + channel: channel_from_pairs(&pairs)?, range: NanoRange { beg: beg_date.parse::>()?.to_nanos(), end: end_date.parse::>()?.to_nanos(), }, - channel: channel_from_pairs(&pairs)?, disk_io_buffer_size: pairs .get("diskIoBufferSize") .map_or("4096", |k| k) @@ -85,9 +84,8 @@ impl PlainEventsBinaryQuery { impl AppendToUrl for PlainEventsBinaryQuery { fn append_to_url(&self, url: &mut Url) { let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; + channel_append_to_url(url, &self.channel); let mut g = url.query_pairs_mut(); - g.append_pair("channelBackend", &self.channel.backend); - g.append_pair("channelName", &self.channel.name); g.append_pair( "begDate", &Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(), @@ -137,11 +135,11 @@ impl PlainEventsJsonQuery { let beg_date = pairs.get("begDate").ok_or(Error::with_public_msg("missing begDate"))?; let end_date = pairs.get("endDate").ok_or(Error::with_public_msg("missing endDate"))?; let ret = Self { + channel: channel_from_pairs(&pairs)?, range: NanoRange { beg: beg_date.parse::>()?.to_nanos(), end: end_date.parse::>()?.to_nanos(), }, - channel: channel_from_pairs(&pairs)?, disk_io_buffer_size: pairs .get("diskIoBufferSize") .map_or("4096", |k| k) @@ -176,14 +174,14 @@ impl PlainEventsJsonQuery { Self::from_url(&url) } - pub fn range(&self) -> &NanoRange { - &self.range - } - pub fn channel(&self) -> &Channel { &self.channel } + pub fn range(&self) -> &NanoRange { + &self.range + } + pub fn report_error(&self) -> bool { self.report_error } @@ -210,9 +208,8 @@ impl PlainEventsJsonQuery { pub fn append_to_url(&self, url: &mut Url) { let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; + channel_append_to_url(url, &self.channel); let mut g = url.query_pairs_mut(); - g.append_pair("channelBackend", &self.channel.backend); - g.append_pair("channelName", &self.channel.name); g.append_pair( "begDate", &Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(), diff --git a/disk/src/frame/inmem.rs b/disk/src/frame/inmem.rs index ca6b21d..6fcf968 100644 --- a/disk/src/frame/inmem.rs +++ b/disk/src/frame/inmem.rs @@ -103,7 +103,6 @@ where buf: BytesMut, wp: usize, ) -> (Option>>, BytesMut, usize) { - let mut buf = buf; let nb = wp; if nb >= INMEM_FRAME_HEAD { let magic = u32::from_le_bytes(*arrayref::array_ref![buf, 0, 4]); @@ -162,16 +161,20 @@ where let payload_crc_match = payload_crc_ind == 
payload_crc; let frame_crc_match = frame_crc_ind == frame_crc; if !payload_crc_match || !frame_crc_match { + let ss = String::from_utf8_lossy(&buf[..buf.len().min(256)]); + warn!("CRC mismatch A\n{ss:?}"); return ( Some(Some(Err(Error::with_msg(format!( - "InMemoryFrameAsyncReadStream tryparse crc mismatch {} {}", + "InMemoryFrameAsyncReadStream tryparse crc mismatch A {} {}", payload_crc_match, frame_crc_match, ))))), buf, wp, ); } + let mut buf = buf; let mut buf3 = buf.split_to(nl); + let buf = buf; buf3.advance(INMEM_FRAME_HEAD); buf3.truncate(len as usize); let mut h = crc32fast::Hasher::new(); @@ -179,9 +182,12 @@ where let payload_crc_2 = h.finalize(); let payload_crc_2_match = payload_crc_2 == payload_crc_ind; if !payload_crc_2_match { + let sa = String::from_utf8_lossy(&buf[..buf.len().min(256)]); + let sb = String::from_utf8_lossy(&buf3[..buf3.len().min(256)]); + warn!("CRC mismatch B\n{sa:?}\n{sb:?}"); return ( Some(Some(Err(Error::with_msg(format!( - "InMemoryFrameAsyncReadStream tryparse crc mismatch {} {} {}", + "InMemoryFrameAsyncReadStream tryparse crc mismatch B {} {} {}", payload_crc_match, frame_crc_match, payload_crc_2_match, ))))), buf, diff --git a/disk/src/gen.rs b/disk/src/gen.rs index da3cd39..47dcdd9 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -31,6 +31,7 @@ pub async fn gen_test_data() -> Result<(), Error> { channel: Channel { backend: "testbackend".into(), name: "scalar-i32-be".into(), + series: None, }, keyspace: 2, time_bin_size: Nanos { ns: DAY }, @@ -49,6 +50,7 @@ pub async fn gen_test_data() -> Result<(), Error> { channel: Channel { backend: "testbackend".into(), name: "wave-f64-be-n21".into(), + series: None, }, keyspace: 3, time_bin_size: Nanos { ns: DAY }, @@ -67,6 +69,7 @@ pub async fn gen_test_data() -> Result<(), Error> { channel: Channel { backend: "testbackend".into(), name: "wave-u16-le-n77".into(), + series: None, }, keyspace: 3, time_bin_size: Nanos { ns: DAY }, @@ -85,6 +88,7 @@ pub async fn gen_test_data() -> Result<(), Error> { channel: Channel { backend: "testbackend".into(), name: "tw-scalar-i32-be".into(), + series: None, }, keyspace: 2, time_bin_size: Nanos { ns: DAY }, @@ -103,6 +107,7 @@ pub async fn gen_test_data() -> Result<(), Error> { channel: Channel { backend: "testbackend".into(), name: "const-regular-scalar-i32-be".into(), + series: None, }, keyspace: 2, time_bin_size: Nanos { ns: DAY }, @@ -131,7 +136,6 @@ pub async fn gen_test_data() -> Result<(), Error> { }), archiver_appliance: None, channel_archiver: None, - access_scylla: false, }; ensemble.nodes.push(node); } diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 56abddb..38b9f07 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -345,6 +345,7 @@ mod test { channel: Channel { backend: "testbackend".into(), name: "scalar-i32-be".into(), + series: None, }, keyspace: 2, time_bin_size: Nanos { ns: DAY }, diff --git a/dq/Cargo.toml b/dq/Cargo.toml index e1a1951..7a5a029 100644 --- a/dq/Cargo.toml +++ b/dq/Cargo.toml @@ -10,7 +10,7 @@ path = "src/dq.rs" [dependencies] #serde = { version = "1.0", features = ["derive"] } #serde_json = "1.0" -tokio = { version = "=1.16.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "1.18.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } futures-util = "0.3.14" clap = { version = "3.0.6", features = ["derive", "cargo"] } chrono = "0.4.19" diff --git a/dq/src/bin/dq.rs b/dq/src/bin/dq.rs index 66aaf5c..a065df8 100644 --- a/dq/src/bin/dq.rs +++ 
b/dq/src/bin/dq.rs @@ -79,6 +79,7 @@ pub fn main() -> Result<(), Error> { channel: Channel { backend: String::new(), name: config.channel_name.clone(), + series: None, }, keyspace: ce.ks as u8, time_bin_size: ce.bs, diff --git a/dq/src/dq.rs b/dq/src/dq.rs index ab2a0fd..b0f5daa 100644 --- a/dq/src/dq.rs +++ b/dq/src/dq.rs @@ -337,6 +337,7 @@ pub async fn convert(convert_params: ConvertParams) -> Result<(), Error> { let channel = Channel { backend: String::new(), name: convert_params.channel_name.into(), + series: None, }; let mut data_writer = DataWriter::new( convert_params.output_dir, diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index 6a6a468..fc5eb36 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -26,6 +26,7 @@ chrono = "0.4.19" err = { path = "../err" } netpod = { path = "../netpod" } dbconn = { path = "../dbconn" } +tokio-postgres = { version = "0.7.6", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] } disk = { path = "../disk" } items = { path = "../items" } parse = { path = "../parse" } diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 0920e63..eca3c17 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -930,6 +930,7 @@ impl Api1EventsBinaryHandler { .map(|x| Channel { backend: backend.into(), name: x.clone(), + series: None, }) .collect(); // TODO use a better stream protocol with built-in error delivery. diff --git a/httpret/src/channelarchiver.rs b/httpret/src/channelarchiver.rs index 0ced47b..b8022a9 100644 --- a/httpret/src/channelarchiver.rs +++ b/httpret/src/channelarchiver.rs @@ -270,6 +270,7 @@ impl BlockRefStream { let channel = Channel { backend: "".into(), name: channel_name, + series: None, //name: "ARIDI-PCT:CURRENT".into(), }; use archapp_wrap::archapp::archeng; @@ -335,6 +336,7 @@ impl BlockStream { let channel = Channel { backend: node_config.node_config.cluster.backend.clone(), name: channel_name, + series: None, }; use archapp_wrap::archapp::archeng; let ixpaths = archeng::indexfiles::index_file_path_list(channel.clone(), database.clone()).await?; diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index 3e01d8e..25f3822 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -1,19 +1,89 @@ -use std::collections::BTreeMap; - use crate::err::Error; use crate::{response, ToPublicResponse}; +use dbconn::create_connection; +use disk::binned::query::PreBinnedQuery; +use disk::events::{PlainEventsBinaryQuery, PlainEventsJsonQuery}; use http::{Method, Request, Response, StatusCode}; use hyper::Body; use netpod::log::*; -use netpod::{get_url_query_pairs, Channel, ChannelConfigQuery, FromUrl, ScalarType, ScyllaConfig, Shape}; +use netpod::query::BinnedQuery; +use netpod::{get_url_query_pairs, Channel, ChannelConfigQuery, Database, FromUrl, ScalarType, ScyllaConfig, Shape}; use netpod::{ChannelConfigResponse, NodeConfigCached}; use netpod::{ACCEPT_ALL, APP_JSON}; use scylla::batch::Consistency; use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError; use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError}; use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; use url::Url; +pub struct ChConf { + pub scalar_type: ScalarType, + pub shape: Shape, +} + +pub async fn chconf_from_events_binary(_q: &PlainEventsBinaryQuery, _conf: &NodeConfigCached) -> Result { + err::todoval() +} + +pub async fn chconf_from_events_json(q: &PlainEventsJsonQuery, ncc: &NodeConfigCached) -> Result { + if 
q.channel().backend != ncc.node_config.cluster.backend { + warn!( + "Mismatched backend {} VS {}", + q.channel().backend, + ncc.node_config.cluster.backend + ); + } + if let Some(_conf) = &ncc.node_config.cluster.scylla { + // This requires the series id. + let series = q + .channel() + .series + .ok_or_else(|| Error::with_msg_no_trace(format!("needs a series id")))?; + // TODO use a common already running worker pool for these queries: + let dbconf = &ncc.node_config.cluster.database; + let dburl = format!( + "postgresql://{}:{}@{}:{}/{}", + dbconf.user, dbconf.pass, dbconf.host, dbconf.port, dbconf.name + ); + let (pgclient, pgconn) = tokio_postgres::connect(&dburl, tokio_postgres::NoTls) + .await + .err_conv()?; + tokio::spawn(pgconn); + let res = pgclient + .query( + "select scalar_type, shape_dims from series_by_channel where series = $1", + &[&(series as i64)], + ) + .await + .err_conv()?; + if res.len() == 0 { + error!("can not find channel for series {series}"); + err::todoval() + } else if res.len() > 1 { + error!("can not find channel for series {series}"); + err::todoval() + } else { + let row = res.first().unwrap(); + let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(0) as u8)?; + // TODO can I get a slice from psql driver? + let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec>(1))?; + let ret = ChConf { scalar_type, shape }; + Ok(ret) + } + } else { + err::todoval() + } +} + +pub async fn chconf_from_prebinned(_q: &PreBinnedQuery, _conf: &NodeConfigCached) -> Result { + err::todoval() +} + +pub async fn chconf_from_binned(_q: &BinnedQuery, _conf: &NodeConfigCached) -> Result { + err::todoval() +} + pub struct ChannelConfigHandler {} impl ChannelConfigHandler { @@ -80,15 +150,25 @@ impl ErrConv for Result { } } +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} + async fn config_from_scylla( chq: ChannelConfigQuery, - scyco: &ScyllaConfig, + _pgconf: Database, + scyconf: ScyllaConfig, _node_config: &NodeConfigCached, ) -> Result { // Find the "series" id. let scy = scylla::SessionBuilder::new() - .known_nodes(&scyco.hosts) - .use_keyspace(&scyco.keyspace, true) + .known_nodes(&scyconf.hosts) + .use_keyspace(&scyconf.keyspace, true) .default_consistency(Consistency::One) .build() .await @@ -130,14 +210,9 @@ pub async fn channel_config(req: Request, node_config: &NodeConfigCached) let url = Url::parse(&format!("dummy:{}", req.uri()))?; let q = ChannelConfigQuery::from_url(&url)?; info!("channel_config for q {q:?}"); - let conf = if q.channel.backend == "scylla" { - let scyco = node_config - .node_config - .cluster - .scylla - .as_ref() - .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?; - config_from_scylla(q, scyco, node_config).await? + let conf = if let Some(scyco) = &node_config.node_config.cluster.scylla { + let pgconf = node_config.node_config.cluster.database.clone(); + config_from_scylla(q, pgconf, scyco.clone(), node_config).await? } else if let Some(conf) = &node_config.node.channel_archiver { archapp_wrap::archapp::archeng::channel_config_from_db(&q, conf, &node_config.node_config.cluster.database) .await? 
@@ -326,10 +401,11 @@ impl ScyllaChannelsWithType { .err_conv()?; let mut list = Vec::new(); for row in res.rows_typed_or_empty::<(String, i64)>() { - let (channel_name, _series) = row.err_conv()?; + let (channel_name, series) = row.err_conv()?; let ch = Channel { backend: facility.into(), name: channel_name, + series: Some(series as u64), }; list.push(ch); } @@ -337,3 +413,300 @@ impl ScyllaChannelsWithType { Ok(ret) } } + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ScyllaChannelsActiveQuery { + tsedge: u64, + shape_kind: u32, + scalar_type: ScalarType, +} + +impl FromUrl for ScyllaChannelsActiveQuery { + fn from_url(url: &Url) -> Result { + let pairs = get_url_query_pairs(url); + let s = pairs + .get("tsedge") + .ok_or_else(|| Error::with_public_msg_no_trace("missing tsedge"))?; + let tsedge: u64 = s.parse()?; + let s = pairs + .get("shape_kind") + .ok_or_else(|| Error::with_public_msg_no_trace("missing shape_kind"))?; + let shape_kind: u32 = s.parse()?; + let s = pairs + .get("scalar_type") + .ok_or_else(|| Error::with_public_msg_no_trace("missing scalar_type"))?; + let scalar_type: ScalarType = serde_json::from_str(&format!("\"{s}\""))?; + info!("parsed scalar type inp: {s:?} val: {scalar_type:?}"); + Ok(Self { + tsedge, + scalar_type, + shape_kind, + }) + } +} + +pub struct ScyllaChannelsActive {} + +impl ScyllaChannelsActive { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/4/scylla/channels/active" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + if req.method() == Method::GET { + let accept_def = APP_JSON; + let accept = req + .headers() + .get(http::header::ACCEPT) + .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); + if accept == APP_JSON || accept == ACCEPT_ALL { + let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let q = ScyllaChannelsActiveQuery::from_url(&url)?; + let res = self.get_channels(&q, node_config).await?; + let body = Body::from(serde_json::to_vec(&res)?); + Ok(response(StatusCode::OK).body(body)?) + } else { + Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) + } + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } + } + + async fn get_channels( + &self, + q: &ScyllaChannelsActiveQuery, + node_config: &NodeConfigCached, + ) -> Result, Error> { + let scyco = node_config + .node_config + .cluster + .scylla + .as_ref() + .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?; + let scy = scylla::SessionBuilder::new() + .known_nodes(&scyco.hosts) + .use_keyspace(&scyco.keyspace, true) + .default_consistency(Consistency::One) + .build() + .await + .err_conv()?; + // Database stores tsedge/ts_msp in units of (10 sec), and we additionally map to the grid. + let tsedge = q.tsedge / 10 / (6 * 2) * (6 * 2); + info!( + "ScyllaChannelsActive::get_channels tsedge {} (10s) {} (s)", + tsedge, + tsedge * 10 + ); + let mut ret = Vec::new(); + for part in 0..256 { + let mut res = scy + .query_iter( + "select series from series_by_ts_msp where part = ? and ts_msp = ? and shape_kind = ? 
and scalar_type = ?", + (part as i32, tsedge as i32, q.shape_kind as i32, q.scalar_type.to_scylla_i32()), + ) + .await + .err_conv()?; + use futures_util::StreamExt; + while let Some(row) = res.next().await { + let row = row.err_conv()?; + let (series,): (i64,) = row.into_typed().err_conv()?; + if series == 1254561075907984640 { + info!( + "FOUND ACTIVE series {} part {} tsedge {} scalar_type {}", + series, + part, + tsedge, + q.scalar_type.to_scylla_i32() + ); + } + ret.push(series as u64); + } + } + Ok(ret) + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ChannelFromSeriesQuery { + series: u64, +} + +impl FromUrl for ChannelFromSeriesQuery { + fn from_url(url: &Url) -> Result { + let pairs = get_url_query_pairs(url); + let s = pairs + .get("series") + .ok_or_else(|| Error::with_public_msg_no_trace("missing series"))?; + let series: u64 = s.parse()?; + Ok(Self { series }) + } +} + +#[derive(Clone, Debug, Serialize)] +pub struct ChannelFromSeriesResponse { + facility: String, + channel: String, + scalar_type: ScalarType, + shape: Shape, + // TODO need a unique representation of the agg kind in the registry. + agg_kind: u32, +} + +pub struct ChannelFromSeries {} + +impl ChannelFromSeries { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/4/channel/series" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + if req.method() == Method::GET { + let accept_def = APP_JSON; + let accept = req + .headers() + .get(http::header::ACCEPT) + .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); + if accept == APP_JSON || accept == ACCEPT_ALL { + let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let q = ChannelFromSeriesQuery::from_url(&url)?; + let res = self.get_data(&q, node_config).await?; + let body = Body::from(serde_json::to_vec(&res)?); + Ok(response(StatusCode::OK).body(body)?) + } else { + Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) + } + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } + } + + async fn get_data( + &self, + q: &ChannelFromSeriesQuery, + node_config: &NodeConfigCached, + ) -> Result { + let series = q.series as i64; + //let pgconn = create_connection(&node_config.node_config.cluster.database).await?; + let dbconf = &node_config.node_config.cluster.database; + // TODO unify the database nodes + let uri = format!( + "postgresql://{}:{}@{}:{}/{}", + dbconf.user, dbconf.pass, dbconf.host, 5432, dbconf.name + ); + let (pgclient, conn) = tokio_postgres::connect(&uri, tokio_postgres::NoTls).await.err_conv()?; + // TODO monitor connection drop. 
+ let _cjh = tokio::spawn(async move { + if let Err(e) = conn.await { + error!("connection error: {}", e); + } + Ok::<_, Error>(()) + }); + let res = pgclient + .query( + "select facility, channel, scalar_type, shape_dims, agg_kind from series_by_channel where series = $1", + &[&series], + ) + .await?; + let res = if let Some(row) = res.first() { + row + } else { + // TODO return code 204 + return Err(Error::with_msg_no_trace("can not find series")); + }; + let facility: String = res.get(0); + let channel: String = res.get(1); + let scalar_type: i32 = res.get(2); + // TODO check and document the format in the storage: + let scalar_type = ScalarType::from_dtype_index(scalar_type as u8)?; + let shape: Vec = res.get(3); + let shape = Shape::from_scylla_shape_dims(&shape)?; + let agg_kind: i32 = res.get(4); + // TODO method is called from_scylla_shape_dims but document that postgres uses the same format. + let ret = ChannelFromSeriesResponse { + facility, + channel, + scalar_type, + shape, + agg_kind: agg_kind as u32, + }; + Ok(ret) + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct IocForChannelQuery { + channel_regex: String, +} + +impl FromUrl for IocForChannelQuery { + fn from_url(url: &Url) -> Result { + let pairs = get_url_query_pairs(url); + let channel_regex = pairs + .get("channel_regex") + .ok_or_else(|| Error::with_public_msg_no_trace("missing channel_regex"))? + .into(); + Ok(Self { channel_regex }) + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ChannelIoc { + channel: String, + ioc: String, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct IocForChannelRes { + channels: Vec, +} + +pub struct IocForChannel {} + +impl IocForChannel { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/4/ioc/channel" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + if req.method() == Method::GET { + let accept_def = APP_JSON; + let accept = req + .headers() + .get(http::header::ACCEPT) + .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); + if accept == APP_JSON || accept == ACCEPT_ALL { + let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let q = IocForChannelQuery::from_url(&url)?; + let res = self.find(&q, node_config).await?; + let body = Body::from(serde_json::to_vec(&res)?); + Ok(response(StatusCode::OK).body(body)?) + } else { + Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) + } + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) 
+ } + } + + async fn find(&self, q: &IocForChannelQuery, node_config: &NodeConfigCached) -> Result { + // TODO implement lookup in postgres + let _ = q; + let _pgconn = create_connection(&node_config.node_config.cluster.database).await?; + let _facility = "scylla"; + let ret = IocForChannelRes { channels: vec![] }; + Ok(ret) + } +} diff --git a/httpret/src/events.rs b/httpret/src/events.rs index c8cba14..c7cce38 100644 --- a/httpret/src/events.rs +++ b/httpret/src/events.rs @@ -1,3 +1,4 @@ +use crate::channelconfig::{chconf_from_events_binary, chconf_from_events_json}; use crate::err::Error; use crate::{response, response_err, BodyStream, ToPublicResponse}; use disk::events::{PlainEventsBinaryQuery, PlainEventsJsonQuery}; @@ -52,13 +53,23 @@ async fn plain_events_binary(req: Request, node_config: &NodeConfigCached) debug!("httpret plain_events_binary req: {:?}", req); let url = Url::parse(&format!("dummy:{}", req.uri()))?; let query = PlainEventsBinaryQuery::from_url(&url)?; + let chconf = chconf_from_events_binary(&query, node_config).await?; let op = disk::channelexec::PlainEvents::new( query.channel().clone(), query.range().clone(), query.disk_io_buffer_size(), node_config.clone(), ); - let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?; + let s = disk::channelexec::channel_exec( + op, + query.channel(), + query.range(), + chconf.scalar_type, + chconf.shape, + AggKind::Plain, + node_config, + ) + .await?; let s = s.map(|item| item.make_frame()); let ret = response(StatusCode::OK).body(BodyStream::wrapped( s.map_err(Error::from), @@ -71,6 +82,7 @@ async fn plain_events_json(req: Request, node_config: &NodeConfigCached) - info!("httpret plain_events_json req: {:?}", req); let (head, _body) = req.into_parts(); let query = PlainEventsJsonQuery::from_request_head(&head)?; + let chconf = chconf_from_events_json(&query, node_config).await?; let op = disk::channelexec::PlainEventsJson::new( query.channel().clone(), query.range().clone(), @@ -80,7 +92,16 @@ async fn plain_events_json(req: Request, node_config: &NodeConfigCached) - query.events_max().unwrap_or(u64::MAX), query.do_log(), ); - let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?; + let s = disk::channelexec::channel_exec( + op, + query.channel(), + query.range(), + chconf.scalar_type, + chconf.shape, + AggKind::Plain, + node_config, + ) + .await?; let ret = response(StatusCode::OK).body(BodyStream::wrapped( s.map_err(Error::from), format!("plain_events_json"), diff --git a/httpret/src/evinfo.rs b/httpret/src/evinfo.rs index cd60a75..f335366 100644 --- a/httpret/src/evinfo.rs +++ b/httpret/src/evinfo.rs @@ -1,3 +1,4 @@ +use crate::channelconfig::chconf_from_events_json; use crate::err::Error; use crate::response; use bytes::Bytes; @@ -73,6 +74,7 @@ impl EventInfoScan { query: &PlainEventsJsonQuery, node_config: &NodeConfigCached, ) -> Result> + Send>>, Error> { + let chconf = chconf_from_events_json(&query, node_config).await?; let ret = channel_exec( EvInfoFunc::new( query.clone(), @@ -82,6 +84,8 @@ impl EventInfoScan { ), query.channel(), query.range(), + chconf.scalar_type, + chconf.shape, AggKind::Stats1, node_config, ) diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index e532fe7..3c658de 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -15,6 +15,7 @@ use crate::err::Error; use crate::gather::gather_get_json; use crate::pulsemap::UpdateTask; use bytes::Bytes; +use 
channelconfig::{chconf_from_binned, ChConf}; use disk::binned::query::PreBinnedQuery; use future::Future; use futures_core::Stream; @@ -206,7 +207,7 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> let ret = serde_json::json!({ "data_api_version": { "major": 4u32, - "minor": 1u32, + "minor": 2u32, "patch": 0u32, }, }); @@ -228,6 +229,12 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> h.handle(req, &node_config).await } else if let Some(h) = channelconfig::ScyllaChannelsWithType::handler(&req) { h.handle(req, &node_config).await + } else if let Some(h) = channelconfig::IocForChannel::handler(&req) { + h.handle(req, &node_config).await + } else if let Some(h) = channelconfig::ScyllaChannelsActive::handler(&req) { + h.handle(req, &node_config).await + } else if let Some(h) = channelconfig::ChannelFromSeries::handler(&req) { + h.handle(req, &node_config).await } else if let Some(h) = events::EventsHandler::handler(&req) { h.handle(req, &node_config).await } else if path == "/api/4/binned" { @@ -527,20 +534,25 @@ async fn binned_inner(req: Request, node_config: &NodeConfigCached) -> Res let (head, _body) = req.into_parts(); let url = Url::parse(&format!("dummy:{}", head.uri))?; let query = BinnedQuery::from_url(&url)?; + let chconf = chconf_from_binned(&query, node_config).await?; let desc = format!("binned-BEG-{}-END-{}", query.range().beg / SEC, query.range().end / SEC); let span1 = span!(Level::INFO, "httpret::binned", desc = &desc.as_str()); span1.in_scope(|| { debug!("binned STARTING {:?}", query); }); match head.headers.get(http::header::ACCEPT) { - Some(v) if v == APP_OCTET => binned_binary(query, node_config).await, - Some(v) if v == APP_JSON || v == ACCEPT_ALL => binned_json(query, node_config).await, + Some(v) if v == APP_OCTET => binned_binary(query, chconf, node_config).await, + Some(v) if v == APP_JSON || v == ACCEPT_ALL => binned_json(query, chconf, node_config).await, _ => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?), } } -async fn binned_binary(query: BinnedQuery, node_config: &NodeConfigCached) -> Result, Error> { - let ret = match disk::binned::binned_bytes_for_http(&query, node_config).await { +async fn binned_binary( + query: BinnedQuery, + chconf: ChConf, + node_config: &NodeConfigCached, +) -> Result, Error> { + let ret = match disk::binned::binned_bytes_for_http(&query, chconf.scalar_type, chconf.shape, node_config).await { Ok(s) => { response(StatusCode::OK).body(BodyStream::wrapped(s.map_err(Error::from), format!("binned_binary")))? 
} @@ -556,8 +568,12 @@ async fn binned_binary(query: BinnedQuery, node_config: &NodeConfigCached) -> Re Ok(ret) } -async fn binned_json(query: BinnedQuery, node_config: &NodeConfigCached) -> Result, Error> { - let ret = match disk::binned::binned_json(&query, node_config).await { +async fn binned_json( + query: BinnedQuery, + chconf: ChConf, + node_config: &NodeConfigCached, +) -> Result, Error> { + let ret = match disk::binned::binned_json(&query, chconf.scalar_type, chconf.shape, node_config).await { Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s.map_err(Error::from), format!("binned_json")))?, Err(e) => { if query.report_error() { diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index d5e11ed..283058b 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -310,6 +310,8 @@ pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> R backend: c.backend.clone(), description: String::new(), name: c.name.clone(), + // TODO api 0 does not provide a series id + series: 0, shape: vec![], source: c.source.clone(), ty: c.ty.clone(), diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 2a6b961..8064fad 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -41,7 +41,7 @@ pub struct BodyStream { pub inner: Box> + Send + Unpin>, } -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)] +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] pub enum ScalarType { U8, U16, @@ -57,6 +57,68 @@ pub enum ScalarType { STRING, } +impl Serialize for ScalarType { + fn serialize(&self, ser: S) -> Result + where + S::Error: serde::ser::Error, + { + use ScalarType::*; + match self { + U8 => ser.serialize_str("u8"), + U16 => ser.serialize_str("u16"), + U32 => ser.serialize_str("u32"), + U64 => ser.serialize_str("u64"), + I8 => ser.serialize_str("i8"), + I16 => ser.serialize_str("i16"), + I32 => ser.serialize_str("i32"), + I64 => ser.serialize_str("i64"), + F32 => ser.serialize_str("f32"), + F64 => ser.serialize_str("f64"), + BOOL => ser.serialize_str("bool"), + STRING => ser.serialize_str("string"), + } + } +} + +struct ScalarTypeVis; + +impl<'de> serde::de::Visitor<'de> for ScalarTypeVis { + type Value = ScalarType; + + fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.write_str("a string describing the ScalarType variant") + } + + fn visit_str(self, value: &str) -> Result { + let s = value.to_lowercase(); + let ret = match s.as_str() { + "u8" => ScalarType::U8, + "u16" => ScalarType::U16, + "u32" => ScalarType::U32, + "u64" => ScalarType::U64, + "i8" => ScalarType::I8, + "i16" => ScalarType::I16, + "i32" => ScalarType::I32, + "i64" => ScalarType::I64, + "f32" => ScalarType::F32, + "f64" => ScalarType::F64, + "bool" => ScalarType::BOOL, + "string" => ScalarType::STRING, + k => return Err(E::custom(format!("can not understand variant {k:?}"))), + }; + Ok(ret) + } +} + +impl<'de> Deserialize<'de> for ScalarType { + fn deserialize(de: D) -> Result + where + D: serde::Deserializer<'de>, + { + de.deserialize_str(ScalarTypeVis) + } +} + pub trait HasScalarType { fn scalar_type(&self) -> ScalarType; } @@ -85,6 +147,24 @@ impl ScalarType { Ok(g) } + pub fn to_variant_str(&self) -> &'static str { + use ScalarType::*; + match self { + U8 => "u8", + U16 => "u16", + U32 => "u32", + U64 => "u64", + I8 => "i8", + I16 => "i16", + I32 => "i32", + I64 => "i64", + F32 => "f32", + F64 => "f64", + BOOL => "bool", + STRING => "string", + } + } + pub fn to_bsread_str(&self) -> &'static str { use ScalarType::*; match self { @@ 
-232,6 +312,11 @@ impl ScalarType { pub fn to_scylla_i32(&self) -> i32 { self.index() as i32 } + + pub fn from_url_str(s: &str) -> Result { + let ret = serde_json::from_str(&format!("\"{s}\""))?; + Ok(ret) + } } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -264,8 +349,6 @@ pub struct Node { pub sf_databuffer: Option, pub archiver_appliance: Option, pub channel_archiver: Option, - #[serde(default)] - pub access_scylla: bool, } struct Visit1 {} @@ -336,7 +419,6 @@ impl Node { }), archiver_appliance: None, channel_archiver: None, - access_scylla: false, } } } @@ -345,6 +427,7 @@ impl Node { pub struct Database { pub name: String, pub host: String, + pub port: u16, pub user: String, pub pass: String, } @@ -440,6 +523,8 @@ pub struct NodeStatus { pub struct Channel { pub backend: String, pub name: String, + // TODO ideally, all channels would have a unique id. For scylla backends, we require the id. + pub series: Option, } pub struct HostPort { @@ -705,7 +790,7 @@ impl Shape { ))) } else if k == 1 { Ok(Shape::Scalar) - } else if k <= 2048 { + } else if k <= 1024 * 32 { Ok(Shape::Wave(k as u32)) } else { Err(Error::with_public_msg_no_trace(format!( @@ -722,6 +807,11 @@ impl Shape { Image(n, m) => vec![*n as i32, *m as i32], } } + + pub fn from_url_str(s: &str) -> Result { + let ret = serde_json::from_str(s)?; + Ok(ret) + } } pub trait HasShape { @@ -1492,10 +1582,22 @@ pub fn channel_from_pairs(pairs: &BTreeMap) -> Result().map_or(None, |x| Some(x))), }; Ok(ret) } +pub fn channel_append_to_url(url: &mut Url, channel: &Channel) { + let mut qp = url.query_pairs_mut(); + qp.append_pair("channelBackend", &channel.backend); + qp.append_pair("channelName", &channel.name); + if let Some(series) = &channel.series { + qp.append_pair("seriesId", &format!("{}", series)); + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct ChannelSearchQuery { pub backend: Option, @@ -1547,6 +1649,7 @@ mod test { pub struct ChannelSearchSingleResult { pub backend: String, pub name: String, + pub series: u64, pub source: String, #[serde(rename = "type")] pub ty: String, @@ -1653,9 +1756,8 @@ impl FromUrl for ChannelConfigQuery { impl AppendToUrl for ChannelConfigQuery { fn append_to_url(&self, url: &mut Url) { let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; + channel_append_to_url(url, &self.channel); let mut g = url.query_pairs_mut(); - g.append_pair("channelBackend", &self.channel.backend); - g.append_pair("channelName", &self.channel.name); g.append_pair( "begDate", &Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt).to_string(), @@ -1730,7 +1832,6 @@ pub fn test_cluster() -> Cluster { }), archiver_appliance: None, channel_archiver: None, - access_scylla: false, }) .collect(); Cluster { @@ -1738,6 +1839,7 @@ pub fn test_cluster() -> Cluster { nodes, database: Database { host: "127.0.0.1".into(), + port: 5432, name: "testingdaq".into(), user: "testingdaq".into(), pass: "testingdaq".into(), @@ -1763,7 +1865,6 @@ pub fn sls_test_cluster() -> Cluster { channel_archiver: Some(ChannelArchiver { data_base_paths: vec![test_data_base_path_channel_archiver_sls()], }), - access_scylla: false, }) .collect(); Cluster { @@ -1771,6 +1872,7 @@ pub fn sls_test_cluster() -> Cluster { nodes, database: Database { host: "127.0.0.1".into(), + port: 5432, name: "testingdaq".into(), user: "testingdaq".into(), pass: "testingdaq".into(), @@ -1796,7 +1898,6 @@ pub fn archapp_test_cluster() -> Cluster { archiver_appliance: Some(ArchiverAppliance { data_base_paths: vec![test_data_base_path_archiver_appliance()], }), - access_scylla: 
false, }) .collect(); Cluster { @@ -1804,6 +1905,7 @@ pub fn archapp_test_cluster() -> Cluster { nodes, database: Database { host: "127.0.0.1".into(), + port: 5432, name: "testingdaq".into(), user: "testingdaq".into(), pass: "testingdaq".into(), @@ -1839,3 +1941,30 @@ pub fn test_data_base_path_archiver_appliance() -> PathBuf { .join("ArchiverStore"); data_base_path } + +#[cfg(test)] +mod test_parse { + use super::*; + + #[test] + fn parse_scalar_type_shape() { + let mut url: Url = "http://test/path".parse().unwrap(); + { + let mut g = url.query_pairs_mut(); + g.append_pair("scalarType", &format!("{:?}", ScalarType::F32)); + g.append_pair("shape", &format!("{:?}", Shape::Image(3, 4))); + } + let url = url; + let urls = format!("{}", url); + let url: Url = urls.parse().unwrap(); + let mut a = BTreeMap::new(); + for (k, v) in url.query_pairs() { + let k = k.to_string(); + let v = v.to_string(); + info!("k {k:?} v {v:?}"); + a.insert(k, v); + } + assert_eq!(a.get("scalarType").unwrap(), "f32"); + assert_eq!(a.get("shape").unwrap(), "Image(3, 4)"); + } +} diff --git a/netpod/src/query.rs b/netpod/src/query.rs index 1ec0eb4..c22983b 100644 --- a/netpod/src/query.rs +++ b/netpod/src/query.rs @@ -1,6 +1,6 @@ use crate::{ - channel_from_pairs, get_url_query_pairs, AggKind, AppendToUrl, ByteSize, Channel, FromUrl, HasBackend, HasTimeout, - NanoRange, ToNanos, + channel_append_to_url, channel_from_pairs, get_url_query_pairs, AggKind, AppendToUrl, ByteSize, Channel, FromUrl, + HasBackend, HasTimeout, NanoRange, ToNanos, }; use crate::{log::*, DiskIoTune}; use chrono::{DateTime, TimeZone, Utc}; @@ -239,10 +239,9 @@ impl AppendToUrl for BinnedQuery { fn append_to_url(&self, url: &mut Url) { let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; { + channel_append_to_url(url, &self.channel); let mut g = url.query_pairs_mut(); g.append_pair("cacheUsage", &self.cache_usage.to_string()); - g.append_pair("channelBackend", &self.channel.backend); - g.append_pair("channelName", &self.channel.name); g.append_pair("binCount", &format!("{}", self.bin_count)); g.append_pair( "begDate", diff --git a/nodenet/Cargo.toml b/nodenet/Cargo.toml index 3b4d8c9..657b92b 100644 --- a/nodenet/Cargo.toml +++ b/nodenet/Cargo.toml @@ -24,6 +24,7 @@ futures-util = "0.3.14" tracing = "0.1.25" hex = "0.4.3" scylla = "0.4.4" +tokio-postgres = "0.7.6" err = { path = "../err" } netpod = { path = "../netpod" } disk = { path = "../disk" } diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index d47f82e..7236510 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -116,40 +116,38 @@ async fn events_conn_handler_inner_try( return Err((Error::with_msg("json parse error"), netout))?; } }; - debug!("--- got query evq {:?}", evq); + info!("events_conn_handler_inner_try evq {:?}", evq); - let mut p1: Pin> + Send>> = if evq.channel.backend == "scylla" { - if node_config.node.access_scylla { - let scyco = node_config.node_config.cluster.scylla.as_ref().unwrap(); - match make_scylla_stream(&evq, scyco).await { + let mut p1: Pin> + Send>> = + if let Some(conf) = &node_config.node_config.cluster.scylla { + let scyco = conf; + let dbconf = node_config.node_config.cluster.database.clone(); + match make_scylla_stream(&evq, scyco, dbconf).await { + Ok(j) => j, + Err(e) => return Err((e, netout))?, + } + } else if let Some(aa) = &node_config.node.channel_archiver { + match archapp_wrap::archapp::archeng::pipe::make_event_pipe(&evq, node_config.clone(), aa.clone()).await { + Ok(j) => j, + Err(e) => return Err((e, netout))?, + } + } else if let Some(aa) = 
&node_config.node.archiver_appliance { + match archapp_wrap::make_event_pipe(&evq, aa).await { Ok(j) => j, Err(e) => return Err((e, netout))?, } } else { - Box::pin(futures_util::stream::empty()) - } - } else if let Some(aa) = &node_config.node.channel_archiver { - match archapp_wrap::archapp::archeng::pipe::make_event_pipe(&evq, node_config.clone(), aa.clone()).await { - Ok(j) => j, - Err(e) => return Err((e, netout))?, - } - } else if let Some(aa) = &node_config.node.archiver_appliance { - match archapp_wrap::make_event_pipe(&evq, aa).await { - Ok(j) => j, - Err(e) => return Err((e, netout))?, - } - } else { - match evq.agg_kind { - AggKind::EventBlobs => match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await { - Ok(j) => j, - Err(e) => return Err((e, netout))?, - }, - _ => match disk::raw::conn::make_event_pipe(&evq, node_config).await { - Ok(j) => j, - Err(e) => return Err((e, netout))?, - }, - } - }; + match evq.agg_kind { + AggKind::EventBlobs => match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await { + Ok(j) => j, + Err(e) => return Err((e, netout))?, + }, + _ => match disk::raw::conn::make_event_pipe(&evq, node_config).await { + Ok(j) => j, + Err(e) => return Err((e, netout))?, + }, + } + }; let mut buf_len_histo = HistoLog2::new(5); while let Some(item) = p1.next().await { let item = item.make_frame(); diff --git a/nodenet/src/scylla.rs b/nodenet/src/scylla.rs index 99710da..65895f8 100644 --- a/nodenet/src/scylla.rs +++ b/nodenet/src/scylla.rs @@ -6,7 +6,7 @@ use items::waveevents::WaveEvents; use items::{Framable, RangeCompletableItem, StreamItem}; use netpod::log::*; use netpod::query::RawEventsQuery; -use netpod::{NanoRange, ScalarType, ScyllaConfig, Shape}; +use netpod::{Channel, Database, NanoRange, ScalarType, ScyllaConfig, Shape}; use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError; use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError}; use scylla::Session as ScySession; @@ -14,6 +14,7 @@ use std::collections::VecDeque; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use tokio_postgres::Client as PgClient; trait ErrConv { fn err_conv(self) -> Result; @@ -46,6 +47,15 @@ impl ErrConv for Result { } } +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} + macro_rules! impl_read_values_fut { ($fname:ident, $self:expr, $ts_msp:expr) => {{ let fut = $fname($self.series, $ts_msp, $self.range.clone(), $self.scy.clone()); @@ -101,14 +111,18 @@ impl ReadValues { fn next(&mut self) -> bool { if let Some(ts_msp) = self.ts_msp.pop_front() { - self.fut = self.make_fut(ts_msp); + self.fut = self.make_fut(ts_msp, self.ts_msp.len() > 1); true } else { false } } - fn make_fut(&mut self, ts_msp: u64) -> Pin> + Send>> { + fn make_fut( + &mut self, + ts_msp: u64, + _has_more_msp: bool, + ) -> Pin> + Send>> { // TODO this also needs to differentiate on Shape. 
let fut = match &self.shape { Shape::Scalar => match &self.scalar_type { @@ -137,7 +151,7 @@ impl ReadValues { enum FrState { New, - FindSeries(Pin> + Send>>), + FindSeries(Pin> + Send>>), FindMsp(Pin, Error>> + Send>>), ReadValues(ReadValues), Done, @@ -145,26 +159,30 @@ enum FrState { pub struct ScyllaFramableStream { state: FrState, - facility: String, - channel_name: String, + #[allow(unused)] + evq: RawEventsQuery, + #[allow(unused)] + channel: Channel, + series: u64, range: NanoRange, scalar_type: Option, shape: Option, - series: i64, scy: Arc, + pgclient: Arc, } impl ScyllaFramableStream { - pub fn new(evq: &RawEventsQuery, scy: Arc) -> Self { + pub fn new(evq: &RawEventsQuery, scy: Arc, pgclient: Arc) -> Self { Self { state: FrState::New, - facility: evq.channel.backend.clone(), - channel_name: evq.channel.name().into(), + series: evq.channel.series.unwrap(), + evq: evq.clone(), + channel: evq.channel.clone(), range: evq.range.clone(), scalar_type: None, shape: None, - series: 0, scy, + pgclient, } } } @@ -177,18 +195,17 @@ impl Stream for ScyllaFramableStream { loop { break match self.state { FrState::New => { - let fut = find_series(self.facility.clone(), self.channel_name.clone(), self.scy.clone()); + let fut = find_series(self.series, self.pgclient.clone()); let fut = Box::pin(fut); self.state = FrState::FindSeries(fut); continue; } FrState::FindSeries(ref mut fut) => match fut.poll_unpin(cx) { - Ready(Ok((series, scalar_type, shape))) => { - info!("ScyllaFramableStream found series {}", series); - self.series = series; + Ready(Ok((scalar_type, shape))) => { + info!("ScyllaFramableStream found series {:?} {:?}", scalar_type, shape); self.scalar_type = Some(scalar_type); self.shape = Some(shape); - let fut = find_ts_msp(series, self.range.clone(), self.scy.clone()); + let fut = find_ts_msp(self.series as i64, self.range.clone(), self.scy.clone()); let fut = Box::pin(fut); self.state = FrState::FindMsp(fut); continue; @@ -206,10 +223,11 @@ impl Stream for ScyllaFramableStream { info!("found ts_msp {ts_msp:?}"); // TODO get rid of into() for VecDeque let mut st = ReadValues::new( - self.series, + self.series as i64, self.scalar_type.as_ref().unwrap().clone(), self.shape.as_ref().unwrap().clone(), self.range.clone(), + // TODO get rid of the conversion: ts_msp.into(), self.scy.clone(), ); @@ -245,25 +263,19 @@ impl Stream for ScyllaFramableStream { } } -async fn find_series( - facility: String, - channel_name: String, - scy: Arc, -) -> Result<(i64, ScalarType, Shape), Error> { - info!("find_series"); - let res = { - let cql = - "select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?"; - scy.query(cql, (&facility, &channel_name)).await.err_conv()? +async fn find_series(series: u64, pgclient: Arc) -> Result<(ScalarType, Shape), Error> { + info!("find_series series {}", series); + let rows = { + let q = "select facility, channel, scalar_type, shape_dims from series_by_channel where series = $1"; + pgclient.query(q, &[&(series as i64)]).await.err_conv()? 
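+        // The series id is passed as i64 because postgres has no unsigned integer type.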
}; - let rows: Vec<_> = res.rows_typed_or_empty::<(i64, i32, Vec)>().collect(); - if rows.len() > 1 { - error!("Multiple series found for channel, can not return data for ambiguous series"); + if rows.len() < 1 { return Err(Error::with_public_msg_no_trace( "Multiple series found for channel, can not return data for ambiguous series", )); } - if rows.len() < 1 { + if rows.len() > 1 { + error!("Multiple series found for channel, can not return data for ambiguous series"); return Err(Error::with_public_msg_no_trace( "Multiple series found for channel, can not return data for ambiguous series", )); @@ -271,28 +283,43 @@ async fn find_series( let row = rows .into_iter() .next() - .ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))? - .err_conv()?; - info!("make_scylla_stream row {row:?}"); - let series = row.0; - let scalar_type = ScalarType::from_scylla_i32(row.1)?; - let shape = Shape::from_scylla_shape_dims(&row.2)?; + .ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))?; + info!("row {row:?}"); + let _facility: String = row.get(0); + let _channel: String = row.get(1); + let a: i32 = row.get(2); + let scalar_type = ScalarType::from_scylla_i32(a)?; + let a: Vec = row.get(3); + let shape = Shape::from_scylla_shape_dims(&a)?; info!("make_scylla_stream series {series} scalar_type {scalar_type:?} shape {shape:?}"); - Ok((series, scalar_type, shape)) + Ok((scalar_type, shape)) } async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc) -> Result, Error> { + info!("find_ts_msp series {} {:?}", series, range); + // TODO use prepared statements + let cql = "select ts_msp from ts_msp where series = ? and ts_msp < ? order by ts_msp desc limit 1"; + let res = scy.query(cql, (series, range.beg as i64)).await.err_conv()?; + let mut before = vec![]; + for row in res.rows_typed_or_empty::<(i64,)>() { + let row = row.err_conv()?; + before.push(row.0 as u64); + } + info!("FOUND BEFORE THE REQUESTED TIME: {} {:?}", before.len(), before); let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? and ts_msp < ?"; let res = scy .query(cql, (series, range.beg as i64, range.end as i64)) .await .err_conv()?; let mut ret = vec![]; + for x in before { + ret.push(x); + } for row in res.rows_typed_or_empty::<(i64,)>() { let row = row.err_conv()?; ret.push(row.0 as u64); } - info!("found in total {} rows", ret.len()); + info!("found in total {} rows {:?}", ret.len(), ret); Ok(ret) } @@ -307,34 +334,31 @@ macro_rules! read_next_scalar_values { type ST = $st; type SCYTY = $scyty; info!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp); - // TODO add the constraint on range! - warn!("remove the limit clause, add range check"); - // TODO use typed timestamp.. - let ts_lsp_max = if range.end < ts_msp { - // We should not be here anyway. - warn!("range.end < ts_msp"); - 0 - } else { - range.end - ts_msp - }; let cql = concat!( "select ts_lsp, pulse, value from ", $table_name, - " where series = ? and ts_msp = ? and ts_lsp < ?" + " where series = ? and ts_msp = ?" 
); - let res = scy - .query(cql, (series, ts_msp as i64, ts_lsp_max as i64)) - .await - .err_conv()?; + let res = scy.query(cql, (series, ts_msp as i64)).await.err_conv()?; let mut ret = ScalarEvents::::empty(); + let mut discarded = 0; for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() { let row = row.err_conv()?; let ts = ts_msp + row.0 as u64; let pulse = row.1 as u64; let value = row.2 as ST; - ret.push(ts, pulse, value); + if ts < range.beg || ts >= range.end { + discarded += 1; + } else { + ret.push(ts, pulse, value); + } } - info!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp); + info!( + "found in total {} events ts_msp {} discarded {}", + ret.tss.len(), + ts_msp, + discarded + ); Ok(ret) } }; @@ -351,25 +375,12 @@ macro_rules! read_next_array_values { type ST = $st; type SCYTY = $scyty; info!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp); - // TODO add the constraint on range! - warn!("remove the limit clause, add range check"); - // TODO use typed timestamp.. - let ts_lsp_max = if range.end < ts_msp { - // We should not be here anyway. - warn!("range.end < ts_msp"); - 0 - } else { - range.end - ts_msp - }; let cql = concat!( "select ts_lsp, pulse, value from ", $table_name, - " where series = ? and ts_msp = ? and ts_lsp < ?" + " where series = ? and ts_msp = ?" ); - let res = scy - .query(cql, (series, ts_msp as i64, ts_lsp_max as i64)) - .await - .err_conv()?; + let res = scy.query(cql, (series, ts_msp as i64)).await.err_conv()?; let mut ret = WaveEvents::::empty(); for row in res.rows_typed_or_empty::<(i64, i64, Vec)>() { let row = row.err_conv()?; @@ -393,16 +404,26 @@ read_next_array_values!(read_next_values_array_u16, u16, i16, "events_wave_u16") pub async fn make_scylla_stream( evq: &RawEventsQuery, scyco: &ScyllaConfig, + dbconf: Database, ) -> Result> + Send>>, Error> { info!("make_scylla_stream open scylla connection"); + // TODO reuse existing connection: let scy = scylla::SessionBuilder::new() .known_nodes(&scyco.hosts) .use_keyspace(&scyco.keyspace, true) .build() .await .err_conv()?; + let u = { + let d = &dbconf; + format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name) + }; + let (pgclient, pgconn) = tokio_postgres::connect(&u, tokio_postgres::NoTls).await.err_conv()?; + // TODO use common connection/pool: + tokio::spawn(pgconn); + let pgclient = Arc::new(pgclient); let scy = Arc::new(scy); - let res = Box::pin(ScyllaFramableStream::new(evq, scy)) as _; + let res = Box::pin(ScyllaFramableStream::new(evq, scy, pgclient)) as _; Ok(res) } diff --git a/parse/Cargo.toml b/parse/Cargo.toml index 9cdbe38..54c49dd 100644 --- a/parse/Cargo.toml +++ b/parse/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" [dependencies] serde = { version = "1.0", features = ["derive"] } -tokio = { version = "=1.16.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "1.18.1" } chrono = { version = "0.4.19", features = ["serde"] } bytes = "1.0.1" byteorder = "1.4.3" diff --git a/taskrun/Cargo.toml b/taskrun/Cargo.toml index 0bd9cc6..571db2b 100644 --- a/taskrun/Cargo.toml +++ b/taskrun/Cargo.toml @@ -8,9 +8,11 @@ edition = "2021" path = "src/taskrun.rs" [dependencies] -tokio = { version = "=1.16.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } -tracing = "0.1.25" -tracing-subscriber = { version = "0.2.17", features= ["chrono"] } +tokio = { version = "1.18.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs", "tracing"] } +tracing = "0.1.34" 
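+# tracing-subscriber 0.3 drops the chrono-based timer; the time crate below supplies the UtcTime formatter used in taskrun.rs.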
+tracing-subscriber = { version = "0.3.11", features = ["fmt", "time"] } +time = { version = "0.3", features = ["formatting"] } +console-subscriber = "0.1.5" backtrace = "0.3.56" lazy_static = "1.4.0" chrono = "0.4" diff --git a/taskrun/src/taskrun.rs b/taskrun/src/taskrun.rs index aef8f96..d18011c 100644 --- a/taskrun/src/taskrun.rs +++ b/taskrun/src/taskrun.rs @@ -18,13 +18,17 @@ lazy_static::lazy_static! { } pub fn get_runtime() -> Arc { + get_runtime_opts(24, 128) +} + +pub fn get_runtime_opts(nworkers: usize, nblocking: usize) -> Arc { let mut g = RUNTIME.lock().unwrap(); match g.as_ref() { None => { tracing_init(); let res = tokio::runtime::Builder::new_multi_thread() - .worker_threads(12) - .max_blocking_threads(256) + .worker_threads(nworkers) + .max_blocking_threads(nblocking) .enable_all() .on_thread_start(|| { let _old = panic::take_hook(); @@ -81,16 +85,24 @@ lazy_static::lazy_static! { pub fn tracing_init() { let mut g = INITMX.lock().unwrap(); if *g == 0 { + //use tracing_subscriber::fmt::time::FormatTime; + let fmtstr = "[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"; + //let format = tracing_subscriber::fmt::format().with_timer(timer); + let timer = tracing_subscriber::fmt::time::UtcTime::new(time::format_description::parse(fmtstr).unwrap()); + //use tracing_subscriber::prelude::*; + //let trsub = tracing_subscriber::fmt::layer(); + //let console_layer = console_subscriber::spawn(); + //tracing_subscriber::registry().with(console_layer).with(trsub).init(); + //console_subscriber::init(); tracing_subscriber::fmt() - .with_timer(tracing_subscriber::fmt::time::ChronoUtc::with_format( - "%Y-%m-%dT%H:%M:%S%.3fZ".into(), - )) - //.with_timer(tracing_subscriber::fmt::time::uptime()) + .with_timer(timer) .with_target(true) .with_thread_names(true) //.with_max_level(tracing::Level::INFO) .with_env_filter(tracing_subscriber::EnvFilter::new( [ + //"tokio=trace", + //"runtime=trace", "info", "archapp::archeng=info", "archapp::archeng::datablockstream=info",