From 6224df534a89dfb216710eb6e1b343e17a7ee77d Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 16 May 2024 23:33:34 +0200 Subject: [PATCH] Refactor series lookup --- crates/daqbuffer/src/bin/daqbuffer.rs | 2 +- crates/dbconn/src/channelconfig.rs | 20 +- crates/dbconn/src/dbconn.rs | 31 +- crates/dbconn/src/query.rs | 3 +- crates/dbconn/src/scan.rs | 11 +- crates/dbconn/src/scan/updatechannelnames.rs | 171 ---------- crates/dbconn/src/search.rs | 10 +- crates/dbconn/src/worker.rs | 125 ++++++++ crates/disk/src/channelconfig.rs | 8 +- crates/disk/src/dataopen.rs | 4 +- crates/disk/src/disk.rs | 3 +- crates/disk/src/gen.rs | 6 +- crates/disk/src/index.rs | 2 +- crates/disk/src/raw/conn.rs | 18 +- crates/httpret/src/api1.rs | 15 +- crates/httpret/src/api4/accounting.rs | 13 +- crates/httpret/src/api4/binned.rs | 21 +- crates/httpret/src/api4/eventdata.rs | 12 +- crates/httpret/src/api4/events.rs | 35 ++- crates/httpret/src/channel_status.rs | 41 ++- crates/httpret/src/channelconfig.rs | 58 ++-- crates/httpret/src/httpret.rs | 68 ++-- crates/httpret/src/pulsemap.rs | 8 +- crates/items_2/src/binsdim0.rs | 14 +- crates/items_2/src/eventsdim0.rs | 6 +- crates/items_2/src/framable.rs | 10 +- crates/items_2/src/frame.rs | 20 +- crates/netpod/src/netpod.rs | 126 ++++++-- crates/netpod/src/ttl.rs | 43 ++- crates/nodenet/src/channelconfig.rs | 30 +- crates/nodenet/src/configquorum.rs | 8 +- crates/nodenet/src/conn.rs | 6 +- crates/nodenet/src/conn/test.rs | 18 +- crates/nodenet/src/scylla.rs | 2 +- crates/query/src/api4.rs | 10 +- crates/query/src/api4/events.rs | 13 +- crates/scyllaconn/src/accounting/toplist.rs | 3 +- crates/scyllaconn/src/events.rs | 310 +++++++++++-------- crates/scyllaconn/src/range.rs | 9 +- crates/streams/src/plaineventsstream.rs | 2 +- crates/taskrun/Cargo.toml | 9 +- 41 files changed, 762 insertions(+), 562 deletions(-) delete mode 100644 crates/dbconn/src/scan/updatechannelnames.rs create mode 100644 crates/dbconn/src/worker.rs diff --git a/crates/daqbuffer/src/bin/daqbuffer.rs b/crates/daqbuffer/src/bin/daqbuffer.rs index 1f23500..08d5920 100644 --- a/crates/daqbuffer/src/bin/daqbuffer.rs +++ b/crates/daqbuffer/src/bin/daqbuffer.rs @@ -64,7 +64,7 @@ async fn go() -> Result<(), Error> { }; match opts.subcmd { SubCmd::Retrieval(subcmd) => { - info!("daqbuffer version {}", clap::crate_version!()); + info!("daqbuffer version {} +0002", clap::crate_version!()); info!(" service_version {}", service_version); if false { #[allow(non_snake_case)] diff --git a/crates/dbconn/src/channelconfig.rs b/crates/dbconn/src/channelconfig.rs index 9859e0d..c031bd3 100644 --- a/crates/dbconn/src/channelconfig.rs +++ b/crates/dbconn/src/channelconfig.rs @@ -5,11 +5,11 @@ use err::Error; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::ChConf; -use netpod::NodeConfigCached; use netpod::ScalarType; use netpod::Shape; use netpod::TsMs; use std::time::Duration; +use tokio_postgres::Client; /// It is an unsolved question as to how we want to uniquely address channels. /// Currently, the usual (backend, channelname) works in 99% of the cases, but the edge-cases @@ -19,13 +19,14 @@ use std::time::Duration; /// Otherwise we try to uniquely identify the series id from the given information. /// In the future, we can even try to involve time range information for that, but backends like /// old archivers and sf databuffer do not support such lookup. -pub async fn chconf_best_matching_for_name_and_range( +pub(super) async fn chconf_best_matching_for_name_and_range( backend: &str, name: &str, range: NanoRange, - ncc: &NodeConfigCached, + pg: &Client, ) -> Result { debug!("chconf_best_matching_for_name_and_range {backend} {name} {range:?}"); + #[cfg(DISABLED)] if ncc.node_config.cluster.scylla.is_none() { let e = Error::with_msg_no_trace(format!( "chconf_best_matching_for_name_and_range but not a scylla backend" @@ -33,21 +34,20 @@ pub async fn chconf_best_matching_for_name_and_range( error!("{e}"); return Err(e); }; + #[cfg(DISABLED)] if backend != ncc.node_config.cluster.backend { warn!( "mismatched backend {} vs {}", backend, ncc.node_config.cluster.backend ); } - let dbconf = &ncc.node_config.cluster.database; - let pgclient = crate::create_connection(dbconf).await?; let sql = concat!( "select unnest(tscs) as tsc, series, scalar_type, shape_dims", " from series_by_channel", " where kind = 2 and facility = $1 and channel = $2", " order by tsc", ); - let res = pgclient.query(sql, &[&backend, &name]).await.err_conv()?; + let res = pg.query(sql, &[&backend, &name]).await.err_conv()?; if res.len() == 0 { let e = Error::with_public_msg_no_trace(format!("can not find channel information for {name}")); warn!("{e}"); @@ -70,7 +70,7 @@ pub async fn chconf_best_matching_for_name_and_range( let tsmss: Vec<_> = rows.iter().map(|x| x.0.clone()).collect(); let range = (TsMs(range.beg / 1000), TsMs(range.end / 1000)); let res = decide_best_matching_index(range, &tsmss)?; - let ch_conf = chconf_for_series(backend, rows[res].1, ncc).await?; + let ch_conf = chconf_for_series(backend, rows[res].1, pg).await?; Ok(ch_conf) } else { let r = res.first().unwrap(); @@ -191,10 +191,8 @@ fn test_decide_best_matching_index_after_01() { assert_eq!(i, 0); } -pub async fn chconf_for_series(backend: &str, series: u64, ncc: &NodeConfigCached) -> Result { - let dbconf = &ncc.node_config.cluster.database; - let pgclient = crate::create_connection(dbconf).await?; - let res = pgclient +pub(super) async fn chconf_for_series(backend: &str, series: u64, pg: &Client) -> Result { + let res = pg .query( "select channel, scalar_type, shape_dims from series_by_channel where facility = $1 and series = $2", &[&backend, &(series as i64)], diff --git a/crates/dbconn/src/dbconn.rs b/crates/dbconn/src/dbconn.rs index 9ac9823..35c4d62 100644 --- a/crates/dbconn/src/dbconn.rs +++ b/crates/dbconn/src/dbconn.rs @@ -3,6 +3,7 @@ pub mod channelinfo; pub mod query; pub mod scan; pub mod search; +pub mod worker; pub mod pg { pub use tokio_postgres::types::Type; @@ -28,6 +29,7 @@ use serde::Serialize; use std::sync::Arc; use std::time::Duration; use taskrun::tokio; +use tokio::task::JoinHandle; trait ErrConv { fn err_conv(self) -> Result; @@ -63,27 +65,28 @@ pub async fn delay_io_medium() { delay_us(2000).await; } -pub async fn create_connection(db_config: &Database) -> Result { +pub async fn create_connection(db_config: &Database) -> Result<(PgClient, JoinHandle>), Error> { + warn!("create_connection\n\n CREATING CONNECTION\n\n"); // TODO use a common already running worker pool for these queries: let d = db_config; let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name); let (cl, conn) = tokio_postgres::connect(&uri, NoTls) .await - .map_err(|e| format!("Can not connect to database: {e:?}")) - //.errconv() - ?; - // TODO monitor connection drop. - let _cjh = tokio::spawn(async move { - if let Err(e) = conn.await { - error!("connection error: {}", e); + .map_err(|e| format!("Can not connect to database: {e}"))?; + let jh = tokio::spawn(async move { + match conn.await { + Ok(()) => Ok(()), + Err(e) => { + error!("connection error: {}", e); + Err(Error::from_string(e)) + } } - Ok::<_, Error>(()) }); - Ok(cl) + Ok((cl, jh)) } pub async fn channel_exists(channel_name: &str, node_config: &NodeConfigCached) -> Result { - let cl = create_connection(&node_config.node_config.cluster.database).await?; + let (cl, _pgjh) = create_connection(&node_config.node_config.cluster.database).await?; let rows = cl .query("select rowid from channels where name = $1::text", &[&channel_name]) .await @@ -101,7 +104,7 @@ pub async fn channel_exists(channel_name: &str, node_config: &NodeConfigCached) } pub async fn database_size(node_config: &NodeConfigCached) -> Result { - let cl = create_connection(&node_config.node_config.cluster.database).await?; + let (cl, _pgjh) = create_connection(&node_config.node_config.cluster.database).await?; let rows = cl .query( "select pg_database_size($1::text)", @@ -129,7 +132,7 @@ pub async fn table_sizes(node_config: &NodeConfigCached) -> Result Result Result { let sql = "select name from channels order by rowid limit 1 offset (random() * (select count(rowid) from channels))::bigint"; - let cl = create_connection(&node_config.node_config.cluster.database).await?; + let (cl, _pgjh) = create_connection(&node_config.node_config.cluster.database).await?; let rows = cl.query(sql, &[]).await.err_conv()?; if rows.len() == 0 { Err(Error::with_msg("can not get random channel"))?; diff --git a/crates/dbconn/src/query.rs b/crates/dbconn/src/query.rs index 0d2c263..3c85a85 100644 --- a/crates/dbconn/src/query.rs +++ b/crates/dbconn/src/query.rs @@ -6,7 +6,6 @@ use netpod::NodeConfigCached; use netpod::SfDbChannel; // For sf-databuffer backend, given a Channel, try to complete the information if only id is given. -#[allow(unused)] async fn sf_databuffer_fetch_channel_by_series( channel: SfDbChannel, ncc: &NodeConfigCached, @@ -24,7 +23,7 @@ async fn sf_databuffer_fetch_channel_by_series( let series = channel .series() .ok_or_else(|| Error::with_msg_no_trace("no series id given"))? as i64; - let pgcon = create_connection(&ncc.node_config.cluster.database).await?; + let (pgcon, _pgjh) = create_connection(&ncc.node_config.cluster.database).await?; let mut rows = pgcon .query("select name from channels where rowid = $1", &[&series]) .await diff --git a/crates/dbconn/src/scan.rs b/crates/dbconn/src/scan.rs index 297194b..d438baf 100644 --- a/crates/dbconn/src/scan.rs +++ b/crates/dbconn/src/scan.rs @@ -30,8 +30,6 @@ use tokio::fs::DirEntry; use tokio::fs::ReadDir; use tokio_postgres::Client; -mod updatechannelnames; - #[derive(Debug, Serialize, Deserialize)] pub struct NodeDiskIdent { pub rowid: i64, @@ -201,7 +199,7 @@ async fn update_db_with_channel_names_inner( node_config: NodeConfigCached, db_config: Database, ) -> Result<(), Error> { - let dbc = create_connection(&db_config).await?; + let (dbc, _pgjh) = create_connection(&db_config).await?; info!("update_db_with_channel_names connection done"); let node_disk_ident = get_node_disk_ident(&node_config, &dbc).await?; info!("update_db_with_channel_names get_node_disk_ident done"); @@ -335,7 +333,7 @@ async fn update_db_with_all_channel_configs_inner( node_config: NodeConfigCached, ) -> Result<(), Error> { let node_config = &node_config; - let dbc = create_connection(&node_config.node_config.cluster.database).await?; + let (dbc, _pgjh) = create_connection(&node_config.node_config.cluster.database).await?; let dbc = Arc::new(dbc); let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?; let rows = dbc @@ -437,7 +435,7 @@ pub async fn update_db_with_all_channel_configs( } pub async fn update_search_cache(node_config: &NodeConfigCached) -> Result { - let dbc = create_connection(&node_config.node_config.cluster.database).await?; + let (dbc, _pgjh) = create_connection(&node_config.node_config.cluster.database).await?; dbc.query("select update_cache()", &[]) .await .err_conv() @@ -554,7 +552,8 @@ pub async fn update_db_with_all_channel_datafiles( node_disk_ident: &NodeDiskIdent, ks_prefix: &str, ) -> Result<(), Error> { - let dbc = Arc::new(create_connection(&node_config.node_config.cluster.database).await?); + let (dbc, _pgjh) = create_connection(&node_config.node_config.cluster.database).await?; + let dbc = Arc::new(dbc); let rows = dbc .query( "select rowid, facility, name from channels where facility = $1 order by facility, name", diff --git a/crates/dbconn/src/scan/updatechannelnames.rs b/crates/dbconn/src/scan/updatechannelnames.rs deleted file mode 100644 index a0220da..0000000 --- a/crates/dbconn/src/scan/updatechannelnames.rs +++ /dev/null @@ -1,171 +0,0 @@ -use super::get_node_disk_ident; -use super::update_db_with_channel_name_list; -use super::FindChannelNamesFromConfigReadDir; -use super::NodeDiskIdent; -use super::UpdatedDbWithChannelNames; -use crate::create_connection; -use crate::pg::Client as PgClient; -use err::Error; -use futures_util::Future; -use futures_util::Stream; -use netpod::NodeConfigCached; -use pin_project::pin_project; -use std::os::unix::prelude::OsStringExt; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; - -#[pin_project] -struct UpdatedDbWithChannelNamesStream { - errored: bool, - data_complete: bool, - #[allow(dead_code)] - node_config: Pin>, - // TODO can we pass a Pin to the async fn instead of creating static ref? - node_config_ref: &'static NodeConfigCached, - #[pin] - client_fut: Option> + Send>>>, - #[pin] - client: Option, - client_ref: Option<&'static PgClient>, - #[pin] - ident_fut: Option> + Send>>>, - ident: Option, - #[pin] - find: Option, - #[pin] - update_batch: Option> + Send>>>, - channel_inp_done: bool, - clist: Vec, -} - -impl UpdatedDbWithChannelNamesStream { - #[allow(unused)] - fn new(node_config: NodeConfigCached) -> Result { - let node_config = Box::pin(node_config.clone()); - let node_config_ref = unsafe { &*(&node_config as &NodeConfigCached as *const _) }; - let mut ret = Self { - errored: false, - data_complete: false, - node_config, - node_config_ref, - client_fut: None, - client: None, - client_ref: None, - ident_fut: None, - ident: None, - find: None, - update_batch: None, - channel_inp_done: false, - clist: Vec::new(), - }; - ret.client_fut = Some(Box::pin(create_connection( - &ret.node_config_ref.node_config.cluster.database, - ))); - Ok(ret) - } -} - -impl Stream for UpdatedDbWithChannelNamesStream { - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - let mut pself = self.project(); - loop { - break if *pself.errored { - Ready(None) - } else if *pself.data_complete { - Ready(None) - } else if let Some(fut) = pself.find.as_mut().as_pin_mut() { - match fut.poll_next(cx) { - Ready(Some(Ok(item))) => { - pself - .clist - .push(String::from_utf8(item.file_name().into_vec()).unwrap()); - continue; - } - Ready(Some(Err(e))) => { - *pself.errored = true; - Ready(Some(Err(e))) - } - Ready(None) => { - *pself.channel_inp_done = true; - // Work through the collected items - let l = std::mem::replace(pself.clist, Vec::new()); - let fut = update_db_with_channel_name_list( - l, - pself.ident.as_ref().unwrap().facility, - pself.client.as_ref().get_ref().as_ref().unwrap(), - ); - // TODO - //pself.update_batch.replace(Box::pin(fut)); - let _ = fut; - continue; - } - Pending => Pending, - } - } else if let Some(fut) = pself.ident_fut.as_mut().as_pin_mut() { - match fut.poll(cx) { - Ready(Ok(item)) => { - *pself.ident_fut = None; - *pself.ident = Some(item); - let ret = UpdatedDbWithChannelNames { - msg: format!("Got ident {:?}", pself.ident), - count: 43, - }; - let base_path = &pself - .node_config - .node - .sf_databuffer - .as_ref() - .ok_or_else(|| Error::with_msg(format!("missing sf databuffer config in node")))? - .data_base_path; - let s = FindChannelNamesFromConfigReadDir::new(base_path); - *pself.find = Some(s); - Ready(Some(Ok(ret))) - } - Ready(Err(e)) => { - *pself.errored = true; - Ready(Some(Err(e))) - } - Pending => Pending, - } - } else if let Some(fut) = pself.client_fut.as_mut().as_pin_mut() { - match fut.poll(cx) { - Ready(Ok(item)) => { - *pself.client_fut = None; - //*pself.client = Some(Box::pin(item)); - //*pself.client_ref = Some(unsafe { &*(&pself.client.as_ref().unwrap() as &Client as *const _) }); - *pself.client = Some(item); - let c2: &PgClient = pself.client.as_ref().get_ref().as_ref().unwrap(); - *pself.client_ref = Some(unsafe { &*(c2 as *const _) }); - - //() == pself.node_config.as_ref(); - //() == pself.client.as_ref().as_pin_ref().unwrap(); - /* *pself.ident_fut = Some(Box::pin(get_node_disk_ident_2( - pself.node_config.as_ref(), - pself.client.as_ref().as_pin_ref().unwrap(), - )));*/ - *pself.ident_fut = Some(Box::pin(get_node_disk_ident( - pself.node_config_ref, - pself.client_ref.as_ref().unwrap(), - ))); - let ret = UpdatedDbWithChannelNames { - msg: format!("Client opened connection"), - count: 42, - }; - Ready(Some(Ok(ret))) - } - Ready(Err(e)) => { - *pself.errored = true; - Ready(Some(Err(e))) - } - Pending => Pending, - } - } else { - Ready(None) - }; - } - } -} diff --git a/crates/dbconn/src/search.rs b/crates/dbconn/src/search.rs index 7d8eb56..ec64b3e 100644 --- a/crates/dbconn/src/search.rs +++ b/crates/dbconn/src/search.rs @@ -34,8 +34,8 @@ pub async fn search_channel_databuffer( " dtype, shape, unit, description, channel_backend", " from searchext($1, $2, $3, $4)", ); - let cl = create_connection(&node_config.node_config.cluster.database).await?; - let rows = cl + let (pg, _pgjh) = create_connection(&node_config.node_config.cluster.database).await?; + let rows = pg .query( sql, &[&query.name_regex, &query.source_regex, &query.description_regex, &"asc"], @@ -115,7 +115,7 @@ pub async fn search_channel_scylla(query: ChannelSearchQuery, pgconf: &Database) ), regop ); - let pgclient = crate::create_connection(pgconf).await?; + let (pgclient, _pgjh) = crate::create_connection(pgconf).await?; let rows = pgclient .query(sql, &[&ch_kind, &query.name_regex, &cb1, &cb2]) .await @@ -182,7 +182,7 @@ async fn search_channel_archeng( " order by c.name", " limit 100" )); - let cl = create_connection(database).await?; + let (cl, _pgjh) = create_connection(database).await?; let rows = cl.query(sql.as_str(), &[&query.name_regex]).await.err_conv()?; let mut res = Vec::new(); for row in rows { @@ -271,7 +271,7 @@ pub async fn search_channel( node_config: &NodeConfigCached, ) -> Result { let pgconf = &node_config.node_config.cluster.database; - if let Some(_scyconf) = node_config.node_config.cluster.scylla.as_ref() { + if let Some(_scyconf) = node_config.node_config.cluster.scylla_st() { search_channel_scylla(query, pgconf).await } else if let Some(conf) = node_config.node.channel_archiver.as_ref() { search_channel_archeng(query, node_config.node_config.cluster.backend.clone(), conf, pgconf).await diff --git a/crates/dbconn/src/worker.rs b/crates/dbconn/src/worker.rs new file mode 100644 index 0000000..e7fed33 --- /dev/null +++ b/crates/dbconn/src/worker.rs @@ -0,0 +1,125 @@ +use crate::create_connection; +use async_channel::Receiver; +use async_channel::Sender; +use err::thiserror; +use err::ThisError; +use netpod::log::*; +use netpod::range::evrange::NanoRange; +use netpod::ChConf; +use netpod::Database; +use taskrun::tokio; +use tokio::task::JoinHandle; +use tokio_postgres::Client; + +#[derive(Debug, ThisError)] +pub enum Error { + Error(#[from] err::Error), + ChannelSend, + ChannelRecv, + Join, +} + +impl err::ToErr for Error { + fn to_err(self) -> err::Error { + err::Error::from_string(self) + } +} + +#[derive(Debug)] +enum Job { + ChConfBestMatchingNameRange(String, String, NanoRange, Sender>), + ChConfForSeries(String, u64, Sender>), +} + +#[derive(Debug, Clone)] +pub struct PgQueue { + tx: Sender, +} + +impl PgQueue { + pub async fn chconf_for_series( + &self, + backend: &str, + series: u64, + ) -> Result>, Error> { + let (tx, rx) = async_channel::bounded(1); + let job = Job::ChConfForSeries(backend.into(), series, tx); + self.tx.send(job).await.map_err(|_| Error::ChannelSend)?; + Ok(rx) + } + + pub async fn chconf_best_matching_name_range_job( + &self, + backend: &str, + name: &str, + range: NanoRange, + ) -> Result>, Error> { + let (tx, rx) = async_channel::bounded(1); + let job = Job::ChConfBestMatchingNameRange(backend.into(), name.into(), range, tx); + self.tx.send(job).await.map_err(|_| Error::ChannelSend)?; + Ok(rx) + } +} + +#[derive(Debug)] +pub struct PgWorker { + rx: Receiver, + pg: Client, + pgjh: Option>>, +} + +impl PgWorker { + pub async fn new(pgconf: &Database) -> Result<(PgQueue, Self), Error> { + let (tx, rx) = async_channel::bounded(64); + let (pg, pgjh) = create_connection(pgconf).await?; + let queue = PgQueue { tx }; + let worker = Self { + rx, + pg, + pgjh: Some(pgjh), + }; + Ok((queue, worker)) + } + + pub async fn work(self) -> Result<(), Error> { + loop { + let x = self.rx.recv().await; + let job = match x { + Ok(x) => x, + Err(_) => { + error!("PgWorker can not receive from channel"); + return Err(Error::ChannelRecv); + } + }; + match job { + Job::ChConfBestMatchingNameRange(backend, name, range, tx) => { + let res = + crate::channelconfig::chconf_best_matching_for_name_and_range(&backend, &name, range, &self.pg) + .await; + if tx.send(res.map_err(Into::into)).await.is_err() { + // TODO count for stats + } + } + Job::ChConfForSeries(backend, series, tx) => { + let res = crate::channelconfig::chconf_for_series(&backend, series, &self.pg).await; + if tx.send(res.map_err(Into::into)).await.is_err() { + // TODO count for stats + } + } + } + } + } + + pub async fn join(&mut self) -> Result<(), Error> { + if let Some(jh) = self.pgjh.take() { + jh.await.map_err(|_| Error::Join)?.map_err(Error::from)?; + Ok(()) + } else { + Ok(()) + } + } + + pub fn close(&self) { + self.rx.close(); + } +} diff --git a/crates/disk/src/channelconfig.rs b/crates/disk/src/channelconfig.rs index 934fcb1..f9c0346 100644 --- a/crates/disk/src/channelconfig.rs +++ b/crates/disk/src/channelconfig.rs @@ -30,9 +30,9 @@ impl From for ConfigError { pub async fn config_entry_best_match( range: &NanoRange, channel: SfDbChannel, - node_config: &NodeConfigCached, + ncc: &NodeConfigCached, ) -> Result, ConfigError> { - let channel_config = match read_local_config(channel.clone(), node_config.clone()).await { + let channel_config = match read_local_config(channel.clone(), ncc.clone()).await { Ok(x) => x, Err(e) => match e { ConfigParseError::FileNotFound => return Ok(None), @@ -59,9 +59,9 @@ pub async fn channel_configs( pub async fn channel_config_best_match( range: NanoRange, channel: SfDbChannel, - node_config: &NodeConfigCached, + ncc: &NodeConfigCached, ) -> Result, ConfigError> { - let best = config_entry_best_match(&range, channel.clone(), node_config).await?; + let best = config_entry_best_match(&range, channel.clone(), ncc).await?; match best { None => Ok(None), Some(entry) => { diff --git a/crates/disk/src/dataopen.rs b/crates/disk/src/dataopen.rs index c9e12c3..fd661a1 100644 --- a/crates/disk/src/dataopen.rs +++ b/crates/disk/src/dataopen.rs @@ -255,7 +255,7 @@ async fn open_files_inner( return Ok(()); } for &tb in &timebins { - let ts_bin = TsNano(tb * fetch_info.bs().ns()); + let ts_bin = TsNano::from_ns(tb * fetch_info.bs().ns()); if ts_bin.ns() >= range.end { continue; } @@ -350,7 +350,7 @@ async fn open_expanded_files_inner( } let mut p1 = None; for (i1, tb) in timebins.iter().enumerate().rev() { - let ts_bin = TsNano(tb * fetch_info.bs().ns()); + let ts_bin = TsNano::from_ns(tb * fetch_info.bs().ns()); if ts_bin.ns() <= range.beg { p1 = Some(i1); break; diff --git a/crates/disk/src/disk.rs b/crates/disk/src/disk.rs index e8acfd6..d7bc14d 100644 --- a/crates/disk/src/disk.rs +++ b/crates/disk/src/disk.rs @@ -385,7 +385,8 @@ impl FileContentStream2 { fn make_reading(&mut self) { let mut buf = Box::new(BytesMut::with_capacity(self.disk_io_tune.read_buffer_len)); - let bufref = unsafe { &mut *((&mut buf as &mut BytesMut) as *mut BytesMut) }; + // let bufref = unsafe { &mut *((&mut buf as &mut BytesMut) as *mut BytesMut) }; + let bufref: &mut BytesMut = err::todoval(); let fileref = unsafe { &mut *((&mut self.file) as *mut Pin>) }; let fut = AsyncReadExt::read_buf(fileref, bufref).map_err(|e| e.into()); self.fcs = FCS2::Reading((buf, Box::pin(fut))); diff --git a/crates/disk/src/gen.rs b/crates/disk/src/gen.rs index 8393daa..00f2e41 100644 --- a/crates/disk/src/gen.rs +++ b/crates/disk/src/gen.rs @@ -164,7 +164,7 @@ async fn gen_channel(chn: &ChannelGenProps, split: u32, node: &Node, ensemble: & .await .map_err(|k| Error::with_msg(format!("can not generate config {:?}", k)))?; let mut evix = 0; - let mut ts = TsNano(0); + let mut ts = TsNano::from_ns(0); let mut pulse = 0; while ts.ns() < DAY * 3 { let res = gen_timebin( @@ -352,7 +352,7 @@ async fn gen_timebin( let mut evix = evix; let mut ts = ts; let mut pulse = pulse; - let tsmax = TsNano((tb + 1) * config.time_bin_size.ns()); + let tsmax = TsNano::from_ns((tb + 1) * config.time_bin_size.ns()); while ts.ns() < tsmax.ns() { match gen_var { // TODO @@ -377,7 +377,7 @@ async fn gen_timebin( } } evix += 1; - ts.0 += ts_spacing; + ts = ts.add_ns(ts_spacing); pulse += 1; } let ret = GenTimebinRes { evix, ts, pulse }; diff --git a/crates/disk/src/index.rs b/crates/disk/src/index.rs index 6753934..77f43d3 100644 --- a/crates/disk/src/index.rs +++ b/crates/disk/src/index.rs @@ -197,7 +197,7 @@ pub fn parse_event(buf: &[u8]) -> Result<(u32, TsNano), Error> { return Err(Error::with_msg(format!("len mismatch len1: {} len2: {}", len1, len2))); } let ts = u64::from_be_bytes(*array_ref![buf, 12, 8]); - Ok((len1 as u32, TsNano(ts))) + Ok((len1 as u32, TsNano::from_ns(ts))) } pub async fn read_event_at(pos: u64, file: &mut File) -> Result<(u32, TsNano), Error> { diff --git a/crates/disk/src/raw/conn.rs b/crates/disk/src/raw/conn.rs index 5b34c82..5f3fee0 100644 --- a/crates/disk/src/raw/conn.rs +++ b/crates/disk/src/raw/conn.rs @@ -98,12 +98,12 @@ pub fn make_event_blobs_stream( event_chunker_conf: EventChunkerConf, disk_io_tune: DiskIoTune, reqctx: ReqCtxArc, - node_config: &NodeConfigCached, + ncc: &NodeConfigCached, ) -> Result { debug!("make_local_event_blobs_stream {fetch_info:?} disk_io_tune {disk_io_tune:?}"); // TODO should not need this for correctness. // Should limit based on return size and latency. - let out_max_len = if node_config.node_config.cluster.is_central_storage { + let out_max_len = if ncc.node_config.cluster.is_central_storage { 128 } else { 128 @@ -111,8 +111,8 @@ pub fn make_event_blobs_stream( let event_blobs = EventChunkerMultifile::new( range, fetch_info.clone(), - node_config.node.clone(), - node_config.ix, + ncc.node.clone(), + ncc.ix, disk_io_tune, event_chunker_conf, expand, @@ -126,7 +126,7 @@ pub fn make_event_blobs_pipe_real( subq: &EventsSubQuery, fetch_info: &SfChFetchInfo, reqctx: ReqCtxArc, - node_config: &NodeConfigCached, + ncc: &NodeConfigCached, ) -> Result> + Send>>, Error> { let expand = subq.transform().need_one_before_range(); let range = subq.range(); @@ -138,7 +138,7 @@ pub fn make_event_blobs_pipe_real( event_chunker_conf, subq.disk_io_tune(), reqctx, - node_config, + ncc, )?; let pipe = Box::pin(event_blobs) as _; Ok(pipe) @@ -191,12 +191,12 @@ pub fn make_event_blobs_pipe( subq: &EventsSubQuery, fetch_info: &SfChFetchInfo, reqctx: ReqCtxArc, - node_config: &NodeConfigCached, + ncc: &NodeConfigCached, ) -> Result> + Send>>, Error> { debug!("make_event_blobs_pipe {subq:?}"); if subq.backend() == TEST_BACKEND { - make_event_blobs_pipe_test(subq, node_config) + make_event_blobs_pipe_test(subq, ncc) } else { - make_event_blobs_pipe_real(subq, fetch_info, reqctx, node_config) + make_event_blobs_pipe_real(subq, fetch_info, reqctx, ncc) } } diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs index 3b9d61c..9818809 100644 --- a/crates/httpret/src/api1.rs +++ b/crates/httpret/src/api1.rs @@ -6,6 +6,7 @@ use crate::gather::SubRes; use crate::response; use crate::ReqCtx; use crate::Requ; +use crate::ServiceSharedResources; use bytes::BufMut; use bytes::BytesMut; use disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes; @@ -896,6 +897,7 @@ impl Api1EventsBinaryHandler { &self, req: Requ, ctx: &ReqCtx, + shared_res: &ServiceSharedResources, node_config: &NodeConfigCached, ) -> Result { if req.method() != Method::POST { @@ -942,6 +944,7 @@ impl Api1EventsBinaryHandler { span.clone(), reqidspan.clone(), ctx, + shared_res, node_config, ) .instrument(span) @@ -958,6 +961,7 @@ impl Api1EventsBinaryHandler { span: tracing::Span, reqidspan: tracing::Span, ctx: &ReqCtx, + shared_res: &ServiceSharedResources, ncc: &NodeConfigCached, ) -> Result { let self_name = any::type_name::(); @@ -983,9 +987,14 @@ impl Api1EventsBinaryHandler { for ch in qu.channels() { debug!("try to find config quorum for {ch:?}"); let ch = SfDbChannel::from_name(backend, ch.name()); - let ch_conf = - nodenet::configquorum::find_config_basics_quorum(ch.clone(), range.clone().into(), ctx, ncc) - .await?; + let ch_conf = nodenet::configquorum::find_config_basics_quorum( + ch.clone(), + range.clone().into(), + ctx, + &shared_res.pgqueue, + ncc, + ) + .await?; match ch_conf { Some(x) => { debug!("found quorum {ch:?} {x:?}"); diff --git a/crates/httpret/src/api4/accounting.rs b/crates/httpret/src/api4/accounting.rs index c96ec7d..36c4f12 100644 --- a/crates/httpret/src/api4/accounting.rs +++ b/crates/httpret/src/api4/accounting.rs @@ -72,8 +72,7 @@ impl AccountingIngestedBytes { let scyco = ncc .node_config .cluster - .scylla - .as_ref() + .scylla_st() .ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?; let scy = scyllaconn::conn::create_scy_session(scyco).await?; let mut stream = scyllaconn::accounting::totals::AccountingStreamScylla::new(q.range().try_into()?, scy); @@ -136,16 +135,16 @@ impl AccountingToplistCounts { _ctx: &ReqCtx, ncc: &NodeConfigCached, ) -> Result { + // TODO assumes that accounting data is in the LT keyspace let scyco = ncc .node_config .cluster - .scylla - .as_ref() - .ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?; + .scylla_lt() + .ok_or_else(|| Error::with_public_msg_no_trace(format!("no lt scylla configured")))?; let scy = scyllaconn::conn::create_scy_session(scyco).await?; let pgconf = &ncc.node_config.cluster.database; - let pg = dbconn::create_connection(&pgconf).await?; - let mut top1 = scyllaconn::accounting::toplist::read_ts(qu.ts().0, scy).await?; + let (pg, pgjh) = dbconn::create_connection(&pgconf).await?; + let mut top1 = scyllaconn::accounting::toplist::read_ts(qu.ts().ns(), scy).await?; top1.sort_by_bytes(); let mut ret = Toplist { toplist: Vec::new() }; let series_ids: Vec<_> = top1.usage().iter().take(qu.limit() as _).map(|x| x.0).collect(); diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index 2dde50f..e87517f 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -5,6 +5,8 @@ use crate::channelconfig::ch_conf_from_binned; use crate::err::Error; use crate::requests::accepts_json_or_all; use crate::requests::accepts_octets; +use crate::ServiceSharedResources; +use dbconn::worker::PgQueue; use http::Method; use http::StatusCode; use httpclient::body_empty; @@ -23,7 +25,13 @@ use query::api4::binned::BinnedQuery; use tracing::Instrument; use url::Url; -async fn binned_json(url: Url, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result { +async fn binned_json( + url: Url, + req: Requ, + ctx: &ReqCtx, + pgqueue: &PgQueue, + ncc: &NodeConfigCached, +) -> Result { debug!("{:?}", req); let reqid = crate::status_board() .map_err(|e| Error::with_msg_no_trace(e.to_string()))? @@ -35,7 +43,7 @@ async fn binned_json(url: Url, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) e.add_public_msg(msg) })?; // TODO handle None case better and return 404 - let ch_conf = ch_conf_from_binned(&query, ctx, ncc) + let ch_conf = ch_conf_from_binned(&query, ctx, pgqueue, ncc) .await? .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?; let span1 = span!( @@ -58,7 +66,7 @@ async fn binned_json(url: Url, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) Ok(ret) } -async fn binned(req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result { +async fn binned(req: Requ, ctx: &ReqCtx, pgqueue: &PgQueue, ncc: &NodeConfigCached) -> Result { let url = req_uri_to_url(req.uri())?; if req .uri() @@ -68,7 +76,7 @@ async fn binned(req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Resu Err(Error::with_msg_no_trace("hidden message").add_public_msg("PublicMessage"))?; } if accepts_json_or_all(&req.headers()) { - Ok(binned_json(url, req, ctx, node_config).await?) + Ok(binned_json(url, req, ctx, pgqueue, ncc).await?) } else if accepts_octets(&req.headers()) { Ok(response_err_msg( StatusCode::NOT_ACCEPTABLE, @@ -98,12 +106,13 @@ impl BinnedHandler { &self, req: Requ, ctx: &ReqCtx, - node_config: &NodeConfigCached, + shared_res: &ServiceSharedResources, + ncc: &NodeConfigCached, ) -> Result { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } - match binned(req, ctx, node_config).await { + match binned(req, ctx, &shared_res.pgqueue, ncc).await { Ok(ret) => Ok(ret), Err(e) => { warn!("BinnedHandler handle sees: {e}"); diff --git a/crates/httpret/src/api4/eventdata.rs b/crates/httpret/src/api4/eventdata.rs index 36ff716..5ed07f7 100644 --- a/crates/httpret/src/api4/eventdata.rs +++ b/crates/httpret/src/api4/eventdata.rs @@ -1,6 +1,7 @@ use crate::bodystream::response_err_msg; use crate::response; use crate::ReqCtx; +use crate::ServiceSharedResources; use err::thiserror; use err::PublicError; use err::ThisError; @@ -15,6 +16,7 @@ use httpclient::StreamResponse; use netpod::log::*; use netpod::NodeConfigCached; use netpod::ServiceVersion; +use std::sync::Arc; #[derive(Debug, ThisError)] pub enum EventDataError { @@ -50,14 +52,14 @@ impl EventDataHandler { req: Requ, _ctx: &ReqCtx, ncc: &NodeConfigCached, - _service_version: &ServiceVersion, + shared_res: Arc, ) -> Result { if req.method() != Method::POST { Ok(response(StatusCode::NOT_ACCEPTABLE) .body(body_empty()) .map_err(|_| EventDataError::InternalError)?) } else { - match Self::handle_req(req, ncc).await { + match Self::handle_req(req, ncc, shared_res).await { Ok(ret) => Ok(ret), Err(e) => { error!("{e}"); @@ -69,7 +71,11 @@ impl EventDataHandler { } } - async fn handle_req(req: Requ, ncc: &NodeConfigCached) -> Result { + async fn handle_req( + req: Requ, + ncc: &NodeConfigCached, + shared_res: Arc, + ) -> Result { let (_head, body) = req.into_parts(); let body = read_body_bytes(body) .await diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index ae66812..684c9d8 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -5,9 +5,11 @@ use crate::requests::accepts_cbor_framed; use crate::requests::accepts_json_framed; use crate::requests::accepts_json_or_all; use crate::response; +use crate::ServiceSharedResources; use crate::ToPublicResponse; use bytes::Bytes; use bytes::BytesMut; +use dbconn::worker::PgQueue; use futures_util::future; use futures_util::stream; use futures_util::Stream; @@ -44,12 +46,13 @@ impl EventsHandler { &self, req: Requ, ctx: &ReqCtx, - node_config: &NodeConfigCached, + shared_res: &ServiceSharedResources, + ncc: &NodeConfigCached, ) -> Result { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } - match plain_events(req, ctx, node_config).await { + match plain_events(req, ctx, &shared_res.pgqueue, ncc).await { Ok(ret) => Ok(ret), Err(e) => { error!("EventsHandler sees: {e}"); @@ -59,14 +62,19 @@ impl EventsHandler { } } -async fn plain_events(req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result { +async fn plain_events( + req: Requ, + ctx: &ReqCtx, + pgqueue: &PgQueue, + ncc: &NodeConfigCached, +) -> Result { let url = req_uri_to_url(req.uri())?; if accepts_cbor_framed(req.headers()) { - Ok(plain_events_cbor_framed(url, req, ctx, node_config).await?) + Ok(plain_events_cbor_framed(url, req, ctx, pgqueue, ncc).await?) } else if accepts_json_framed(req.headers()) { - Ok(plain_events_json_framed(url, req, ctx, node_config).await?) + Ok(plain_events_json_framed(url, req, ctx, pgqueue, ncc).await?) } else if accepts_json_or_all(req.headers()) { - Ok(plain_events_json(url, req, ctx, node_config).await?) + Ok(plain_events_json(url, req, ctx, pgqueue, ncc).await?) } else { let ret = response_err_msg(StatusCode::NOT_ACCEPTABLE, format!("unsupported accept {:?}", req))?; Ok(ret) @@ -77,10 +85,11 @@ async fn plain_events_cbor_framed( url: Url, req: Requ, ctx: &ReqCtx, + pgqueue: &PgQueue, ncc: &NodeConfigCached, ) -> Result { let evq = PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?; - let ch_conf = chconf_from_events_quorum(&evq, ctx, ncc) + let ch_conf = chconf_from_events_quorum(&evq, ctx, pgqueue, ncc) .await? .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?; info!("plain_events_cbor_framed chconf_from_events_quorum: {ch_conf:?} {req:?}"); @@ -115,10 +124,11 @@ async fn plain_events_json_framed( url: Url, req: Requ, ctx: &ReqCtx, + pgqueue: &PgQueue, ncc: &NodeConfigCached, ) -> Result { let evq = PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?; - let ch_conf = chconf_from_events_quorum(&evq, ctx, ncc) + let ch_conf = chconf_from_events_quorum(&evq, ctx, pgqueue, ncc) .await? .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?; info!("plain_events_json_framed chconf_from_events_quorum: {ch_conf:?} {req:?}"); @@ -133,7 +143,8 @@ async fn plain_events_json( url: Url, req: Requ, ctx: &ReqCtx, - node_config: &NodeConfigCached, + pgqueue: &PgQueue, + ncc: &NodeConfigCached, ) -> Result { let self_name = "plain_events_json"; info!("{self_name} req: {:?}", req); @@ -141,17 +152,17 @@ async fn plain_events_json( let query = PlainEventsQuery::from_url(&url)?; info!("{self_name} query {query:?}"); // TODO handle None case better and return 404 - let ch_conf = chconf_from_events_quorum(&query, ctx, node_config) + let ch_conf = chconf_from_events_quorum(&query, ctx, pgqueue, ncc) .await .map_err(Error::from)? .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?; info!("{self_name} chconf_from_events_quorum: {ch_conf:?}"); - let open_bytes = OpenBoxedBytesViaHttp::new(node_config.node_config.cluster.clone()); + let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone()); let item = streams::plaineventsjson::plain_events_json( &query, ch_conf, ctx, - &node_config.node_config.cluster, + &ncc.node_config.cluster, Box::pin(open_bytes), ) .await; diff --git a/crates/httpret/src/channel_status.rs b/crates/httpret/src/channel_status.rs index b67d9f1..f1d8d3f 100644 --- a/crates/httpret/src/channel_status.rs +++ b/crates/httpret/src/channel_status.rs @@ -1,6 +1,7 @@ use crate::bodystream::response; use crate::err::Error; use crate::ReqCtx; +use crate::ServiceSharedResources; use futures_util::StreamExt; use http::Method; use http::StatusCode; @@ -37,7 +38,8 @@ impl ConnectionStatusEvents { &self, req: Requ, _ctx: &ReqCtx, - node_config: &NodeConfigCached, + shared_res: &ServiceSharedResources, + ncc: &NodeConfigCached, ) -> Result { if req.method() == Method::GET { let accept_def = APP_JSON; @@ -48,7 +50,7 @@ impl ConnectionStatusEvents { if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { let url = req_uri_to_url(req.uri())?; let q = ChannelStateEventsQuery::from_url(&url)?; - match self.fetch_data(&q, node_config).await { + match self.fetch_data(&q, shared_res, ncc).await { Ok(k) => { let body = ToJsonBody::from(&k).into_body(); Ok(response(StatusCode::OK).body(body)?) @@ -70,17 +72,18 @@ impl ConnectionStatusEvents { async fn fetch_data( &self, q: &ChannelStateEventsQuery, - node_config: &NodeConfigCached, + shared_res: &ServiceSharedResources, + ncc: &NodeConfigCached, ) -> Result, Error> { - let scyco = node_config + let scyco = ncc .node_config .cluster - .scylla - .as_ref() + .scylla_st() .ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?; let _scy = scyllaconn::conn::create_scy_session(scyco).await?; let _chconf = - nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), node_config).await?; + nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), &shared_res.pgqueue, ncc) + .await?; let _do_one_before_range = true; let ret = Vec::new(); if true { @@ -111,7 +114,8 @@ impl ChannelStatusEventsHandler { &self, req: Requ, _ctx: &ReqCtx, - node_config: &NodeConfigCached, + shared_res: &ServiceSharedResources, + ncc: &NodeConfigCached, ) -> Result { if req.method() == Method::GET { let accept_def = APP_JSON; @@ -122,7 +126,7 @@ impl ChannelStatusEventsHandler { if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { let url = req_uri_to_url(req.uri())?; let q = ChannelStateEventsQuery::from_url(&url)?; - match self.fetch_data(&q, node_config).await { + match self.fetch_data(&q, shared_res, ncc).await { Ok(k) => { let body = ToJsonBody::from(&k).into_body(); Ok(response(StatusCode::OK).body(body)?) @@ -144,20 +148,25 @@ impl ChannelStatusEventsHandler { async fn fetch_data( &self, q: &ChannelStateEventsQuery, - node_config: &NodeConfigCached, + shared_res: &ServiceSharedResources, + ncc: &NodeConfigCached, ) -> Result { - let scyco = node_config + let scyco = ncc .node_config .cluster - .scylla - .as_ref() + .scylla_st() .ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?; let scy = scyllaconn::conn::create_scy_session(scyco).await?; let do_one_before_range = true; if false { - let chconf = nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), node_config) - .await? - .ok_or_else(|| Error::with_msg_no_trace("channel config not found"))?; + let chconf = nodenet::channelconfig::channel_config( + q.range().clone(), + q.channel().clone(), + &shared_res.pgqueue, + ncc, + ) + .await? + .ok_or_else(|| Error::with_msg_no_trace("channel config not found"))?; use netpod::ChannelTypeConfigGen; match chconf { ChannelTypeConfigGen::Scylla(_x) => todo!(), diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs index 565d595..1d3be9d 100644 --- a/crates/httpret/src/channelconfig.rs +++ b/crates/httpret/src/channelconfig.rs @@ -2,6 +2,7 @@ use crate::err::Error; use crate::response; use crate::ToPublicResponse; use dbconn::create_connection; +use dbconn::worker::PgQueue; use futures_util::StreamExt; use http::Method; use http::StatusCode; @@ -38,27 +39,30 @@ use url::Url; pub async fn chconf_from_events_quorum( q: &PlainEventsQuery, ctx: &ReqCtx, + pgqueue: &PgQueue, ncc: &NodeConfigCached, ) -> Result, Error> { - let ret = find_config_basics_quorum(q.channel().clone(), q.range().clone(), ctx, ncc).await?; + let ret = find_config_basics_quorum(q.channel().clone(), q.range().clone(), ctx, pgqueue, ncc).await?; Ok(ret) } pub async fn chconf_from_prebinned( q: &PreBinnedQuery, ctx: &ReqCtx, + pgqueue: &PgQueue, ncc: &NodeConfigCached, ) -> Result, Error> { - let ret = find_config_basics_quorum(q.channel().clone(), q.patch().patch_range(), ctx, ncc).await?; + let ret = find_config_basics_quorum(q.channel().clone(), q.patch().patch_range(), ctx, pgqueue, ncc).await?; Ok(ret) } pub async fn ch_conf_from_binned( q: &BinnedQuery, ctx: &ReqCtx, + pgqueue: &PgQueue, ncc: &NodeConfigCached, ) -> Result, Error> { - let ret = find_config_basics_quorum(q.channel().clone(), q.range().clone(), ctx, ncc).await?; + let ret = find_config_basics_quorum(q.channel().clone(), q.range().clone(), ctx, pgqueue, ncc).await?; Ok(ret) } @@ -73,7 +77,12 @@ impl ChannelConfigHandler { } } - pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result { + pub async fn handle( + &self, + req: Requ, + pgqueue: &PgQueue, + node_config: &NodeConfigCached, + ) -> Result { if req.method() == Method::GET { let accept_def = APP_JSON; let accept = req @@ -81,7 +90,7 @@ impl ChannelConfigHandler { .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { - match self.channel_config(req, &node_config).await { + match self.channel_config(req, pgqueue, &node_config).await { Ok(k) => Ok(k), Err(e) => { warn!("ChannelConfigHandler::handle: got error from channel_config: {e:?}"); @@ -96,10 +105,16 @@ impl ChannelConfigHandler { } } - async fn channel_config(&self, req: Requ, node_config: &NodeConfigCached) -> Result { + async fn channel_config( + &self, + req: Requ, + pgqueue: &PgQueue, + node_config: &NodeConfigCached, + ) -> Result { let url = req_uri_to_url(req.uri())?; let q = ChannelConfigQuery::from_url(&url)?; - let conf = nodenet::channelconfig::channel_config(q.range.clone(), q.channel.clone(), node_config).await?; + let conf = + nodenet::channelconfig::channel_config(q.range.clone(), q.channel.clone(), pgqueue, node_config).await?; match conf { Some(conf) => { let res: ChannelConfigResponse = conf.into(); @@ -180,6 +195,7 @@ impl ChannelConfigQuorumHandler { &self, req: Requ, ctx: &ReqCtx, + pgqueue: &PgQueue, node_config: &NodeConfigCached, ) -> Result { if req.method() == Method::GET { @@ -189,7 +205,7 @@ impl ChannelConfigQuorumHandler { .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { - match self.channel_config_quorum(req, ctx, &node_config).await { + match self.channel_config_quorum(req, ctx, pgqueue, &node_config).await { Ok(k) => Ok(k), Err(e) => { warn!("from channel_config_quorum: {e}"); @@ -208,13 +224,15 @@ impl ChannelConfigQuorumHandler { &self, req: Requ, ctx: &ReqCtx, + pgqueue: &PgQueue, ncc: &NodeConfigCached, ) -> Result { info!("channel_config_quorum"); let url = req_uri_to_url(req.uri())?; let q = ChannelConfigQuery::from_url(&url)?; info!("channel_config_quorum for q {q:?}"); - let ch_confs = nodenet::configquorum::find_config_basics_quorum(q.channel, q.range.into(), ctx, ncc).await?; + let ch_confs = + nodenet::configquorum::find_config_basics_quorum(q.channel, q.range.into(), ctx, pgqueue, ncc).await?; let ret = response(StatusCode::OK) .header(http::header::CONTENT_TYPE, APP_JSON) .body(ToJsonBody::from(&ch_confs).into_body())?; @@ -386,8 +404,7 @@ impl ScyllaChannelsActive { let scyco = node_config .node_config .cluster - .scylla - .as_ref() + .scylla_st() .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?; let scy = scyllaconn::conn::create_scy_session(scyco).await?; // Database stores tsedge/ts_msp in units of (10 sec), and we additionally map to the grid. @@ -494,7 +511,7 @@ impl IocForChannel { node_config: &NodeConfigCached, ) -> Result, Error> { let dbconf = &node_config.node_config.cluster.database; - let pg_client = create_connection(dbconf).await?; + let (pg_client, pgjh) = create_connection(dbconf).await?; let rows = pg_client .query( "select addr from ioc_by_channel where facility = $1 and channel = $2", @@ -583,8 +600,7 @@ impl ScyllaSeriesTsMsp { let scyco = node_config .node_config .cluster - .scylla - .as_ref() + .scylla_st() .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?; let scy = scyllaconn::conn::create_scy_session(scyco).await?; let mut ts_msps = Vec::new(); @@ -626,7 +642,7 @@ impl AmbigiousChannelNames { } } - pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result { + pub async fn handle(&self, req: Requ, ncc: &NodeConfigCached) -> Result { if req.method() == Method::GET { let accept_def = APP_JSON; let accept = req @@ -634,7 +650,7 @@ impl AmbigiousChannelNames { .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); if accept == APP_JSON || accept == ACCEPT_ALL { - match self.process(node_config).await { + match self.process(ncc).await { Ok(k) => { let body = ToJsonBody::from(&k).into_body(); Ok(response(StatusCode::OK).body(body)?) @@ -650,9 +666,9 @@ impl AmbigiousChannelNames { } } - async fn process(&self, node_config: &NodeConfigCached) -> Result { - let dbconf = &node_config.node_config.cluster.database; - let pg_client = create_connection(dbconf).await?; + async fn process(&self, ncc: &NodeConfigCached) -> Result { + let dbconf = &ncc.node_config.cluster.database; + let (pg_client, pgjh) = create_connection(dbconf).await?; let rows = pg_client .query( "select t2.series, t2.channel, t2.scalar_type, t2.shape_dims, t2.agg_kind from series_by_channel t1, series_by_channel t2 where t2.channel = t1.channel and t2.series != t1.series", @@ -747,9 +763,7 @@ impl GenerateScyllaTestData { } async fn process(&self, node_config: &NodeConfigCached) -> Result<(), Error> { - let dbconf = &node_config.node_config.cluster.database; - let _pg_client = create_connection(dbconf).await?; - let scyconf = node_config.node_config.cluster.scylla.as_ref().unwrap(); + let scyconf = node_config.node_config.cluster.scylla_st().unwrap(); let scy = scyllaconn::conn::create_scy_session(scyconf).await?; let series: u64 = 42001; // TODO query `ts_msp` for all MSP values und use that to delete from event table first. diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index dd4e8cc..f777007 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -19,6 +19,8 @@ use crate::bodystream::response; use crate::err::Error; use ::err::thiserror; use ::err::ThisError; +use dbconn::worker::PgQueue; +use dbconn::worker::PgWorker; use futures_util::Future; use futures_util::FutureExt; use http::Method; @@ -37,6 +39,7 @@ use netpod::query::prebinned::PreBinnedQuery; use netpod::req_uri_to_url; use netpod::status_board; use netpod::status_board_init; +use netpod::Database; use netpod::NodeConfigCached; use netpod::ReqCtx; use netpod::ServiceVersion; @@ -49,6 +52,7 @@ use serde::Serialize; use std::net; use std::panic; use std::pin; +use std::sync::Arc; use std::task; use task::Context; use task::Poll; @@ -79,6 +83,7 @@ impl IntoBoxedError for tokio::task::JoinError {} impl IntoBoxedError for api4::databuffer_tools::FindActiveError {} impl IntoBoxedError for std::string::FromUtf8Error {} impl IntoBoxedError for std::io::Error {} +impl IntoBoxedError for dbconn::worker::Error {} impl From for RetrievalError where @@ -95,16 +100,29 @@ impl ::err::ToErr for RetrievalError { } } -pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion) -> Result<(), RetrievalError> { +pub struct ServiceSharedResources { + pgqueue: PgQueue, +} + +impl ServiceSharedResources { + pub fn new(pgqueue: PgQueue) -> Self { + Self { pgqueue } + } +} + +pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Result<(), RetrievalError> { status_board_init(); #[cfg(DISABLED)] - if let Some(bind) = node_config.node.prometheus_api_bind { + if let Some(bind) = ncc.node.prometheus_api_bind { tokio::spawn(prometheus::host(bind)); } // let rawjh = taskrun::spawn(nodenet::conn::events_service(node_config.clone())); + let (pgqueue, pgworker) = PgWorker::new(&ncc.node_config.cluster.database).await?; + let pgworker_jh = taskrun::spawn(pgworker.work()); + let shared_res = ServiceSharedResources::new(pgqueue); + let shared_res = Arc::new(shared_res); use std::str::FromStr; - let bind_addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen(), node_config.node.port))?; - + let bind_addr = SocketAddr::from_str(&format!("{}:{}", ncc.node.listen(), ncc.node.port))?; // tokio::net::TcpSocket::new_v4()?.listen(200)? let listener = TcpListener::bind(bind_addr).await?; loop { @@ -114,14 +132,24 @@ pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion break; }; debug!("new connection from {addr}"); - let node_config = node_config.clone(); + let node_config = ncc.clone(); let service_version = service_version.clone(); let io = TokioIo::new(stream); + let shared_res = shared_res.clone(); + // let shared_res = &shared_res; tokio::task::spawn(async move { let res = hyper::server::conn::http1::Builder::new() .serve_connection( io, - service_fn(move |req| the_service_fn(req, addr, node_config.clone(), service_version.clone())), + service_fn(move |req| { + the_service_fn( + req, + addr, + node_config.clone(), + service_version.clone(), + shared_res.clone(), + ) + }), ) .await; match res { @@ -132,7 +160,7 @@ pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion } }); } - + info!("http host done"); // rawjh.await??; Ok(()) } @@ -142,10 +170,11 @@ async fn the_service_fn( addr: SocketAddr, node_config: NodeConfigCached, service_version: ServiceVersion, + shared_res: Arc, ) -> Result { let ctx = ReqCtx::new_with_node(&req, &node_config); let reqid_span = span!(Level::INFO, "req", reqid = ctx.reqid()); - let f = http_service(req, addr, ctx, node_config, service_version); + let f = http_service(req, addr, ctx, node_config, service_version, shared_res); let f = Cont { f: Box::pin(f) }; f.instrument(reqid_span).await } @@ -156,6 +185,7 @@ async fn http_service( ctx: ReqCtx, node_config: NodeConfigCached, service_version: ServiceVersion, + shared_res: Arc, ) -> Result { info!( "http-request {:?} - {:?} - {:?} - {:?}", @@ -164,7 +194,7 @@ async fn http_service( req.uri(), req.headers() ); - match http_service_try(req, ctx, &node_config, &service_version).await { + match http_service_try(req, ctx, &node_config, &service_version, shared_res).await { Ok(k) => Ok(k), Err(e) => { error!("daqbuffer node http_service sees error from http_service_try: {}", e); @@ -209,6 +239,7 @@ async fn http_service_try( ctx: ReqCtx, node_config: &NodeConfigCached, service_version: &ServiceVersion, + shared_res: Arc, ) -> Result { use http::HeaderValue; let mut urlmarks = Vec::new(); @@ -221,7 +252,7 @@ async fn http_service_try( } } } - let mut res = http_service_inner(req, &ctx, node_config, service_version).await?; + let mut res = http_service_inner(req, &ctx, node_config, service_version, shared_res).await?; let hm = res.headers_mut(); hm.append("Access-Control-Allow-Origin", "*".parse().unwrap()); hm.append("Access-Control-Allow-Headers", "*".parse().unwrap()); @@ -243,6 +274,7 @@ async fn http_service_inner( ctx: &ReqCtx, node_config: &NodeConfigCached, service_version: &ServiceVersion, + shared_res: Arc, ) -> Result { let uri = req.uri().clone(); let path = uri.path(); @@ -291,7 +323,7 @@ async fn http_service_inner( Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) } } else if let Some(h) = api4::eventdata::EventDataHandler::handler(&req) { - Ok(h.handle(req, ctx, &node_config, service_version) + Ok(h.handle(req, ctx, &node_config, shared_res) .await .map_err(|e| Error::with_msg_no_trace(e.to_string()))?) } else if let Some(h) = api4::status::StatusNodesRecursive::handler(&req) { @@ -303,19 +335,19 @@ async fn http_service_inner( } else if let Some(h) = api4::search::ChannelSearchHandler::handler(&req) { Ok(h.handle(req, &node_config).await?) } else if let Some(h) = channel_status::ConnectionStatusEvents::handler(&req) { - Ok(h.handle(req, ctx, &node_config).await?) + Ok(h.handle(req, ctx, &shared_res, &node_config).await?) } else if let Some(h) = channel_status::ChannelStatusEventsHandler::handler(&req) { - Ok(h.handle(req, ctx, &node_config).await?) + Ok(h.handle(req, ctx, &shared_res, &node_config).await?) } else if let Some(h) = api4::events::EventsHandler::handler(&req) { - Ok(h.handle(req, ctx, &node_config).await?) + Ok(h.handle(req, ctx, &shared_res, &node_config).await?) } else if let Some(h) = api4::binned::BinnedHandler::handler(&req) { - Ok(h.handle(req, ctx, &node_config).await?) + Ok(h.handle(req, ctx, &shared_res, &node_config).await?) } else if let Some(h) = channelconfig::ChannelConfigQuorumHandler::handler(&req) { - Ok(h.handle(req, ctx, &node_config).await?) + Ok(h.handle(req, ctx, &shared_res.pgqueue, &node_config).await?) } else if let Some(h) = channelconfig::ChannelConfigsHandler::handler(&req) { Ok(h.handle(req, &node_config).await?) } else if let Some(h) = channelconfig::ChannelConfigHandler::handler(&req) { - Ok(h.handle(req, &node_config).await?) + Ok(h.handle(req, &shared_res.pgqueue, &node_config).await?) } else if let Some(h) = channelconfig::IocForChannel::handler(&req) { Ok(h.handle(req, &node_config).await?) } else if let Some(h) = channelconfig::ScyllaChannelsActive::handler(&req) { @@ -357,7 +389,7 @@ async fn http_service_inner( } else if let Some(h) = settings::SettingsThreadsMaxHandler::handler(&req) { Ok(h.handle(req, &node_config).await?) } else if let Some(h) = api1::Api1EventsBinaryHandler::handler(&req) { - Ok(h.handle(req, ctx, &node_config).await?) + Ok(h.handle(req, ctx, &shared_res, &node_config).await?) } else if let Some(h) = pulsemap::MapPulseScyllaHandler::handler(&req) { Ok(h.handle(req, &node_config).await?) } else if let Some(h) = pulsemap::IndexChannelHttpFunction::handler(&req) { diff --git a/crates/httpret/src/pulsemap.rs b/crates/httpret/src/pulsemap.rs index 0db75c8..245f502 100644 --- a/crates/httpret/src/pulsemap.rs +++ b/crates/httpret/src/pulsemap.rs @@ -425,7 +425,7 @@ impl IndexChannelHttpFunction { async fn index(req: Requ, do_print: bool, node_config: &NodeConfigCached) -> Result { // TODO avoid double-insert on central storage. - let pgc = dbconn::create_connection(&node_config.node_config.cluster.database).await?; + let (pgc, pgjh) = dbconn::create_connection(&node_config.node_config.cluster.database).await?; // TODO remove update of static columns when older clients are removed. let sql = "insert into map_pulse_files (channel, split, timebin, pulse_min, pulse_max, hostname, ks) values ($1, $2, $3, $4, $5, $6, $7) on conflict (channel, split, timebin) do update set pulse_min = $4, pulse_max = $5, upc1 = map_pulse_files.upc1 + 1, hostname = $6"; let insert_01 = pgc.prepare(sql).await?; @@ -936,7 +936,7 @@ impl MapPulseScyllaHandler { let url = req_uri_to_url(req.uri())?; let query = MapPulseQuery::from_url(&url)?; let pulse = query.pulse; - let scyconf = if let Some(x) = node_config.node_config.cluster.scylla.as_ref() { + let scyconf = if let Some(x) = node_config.node_config.cluster.scylla_st() { x } else { return Err(Error::with_public_msg_no_trace("no scylla configured")); @@ -1017,7 +1017,7 @@ impl MapPulseLocalHttpFunction { }) .unwrap_or_else(|| String::from("missing x-req-from")); let ts1 = Instant::now(); - let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?; + let (conn, pgjh) = dbconn::create_connection(&node_config.node_config.cluster.database).await?; let sql = "select channel, hostname, timebin, split, ks from map_pulse_files where hostname = $1 and pulse_min <= $2 and (pulse_max >= $2 or closed = 0)"; let rows = conn.query(sql, &[&node_config.node.host, &(pulse as i64)]).await?; let cands: Vec<_> = rows @@ -1516,7 +1516,7 @@ impl MarkClosedHttpFunction { } pub async fn mark_closed(node_config: &NodeConfigCached) -> Result<(), Error> { - let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?; + let (conn, pgjh) = dbconn::create_connection(&node_config.node_config.cluster.database).await?; let sql = "select distinct channel from map_pulse_files order by channel"; let rows = conn.query(sql, &[]).await?; let chns: Vec<_> = rows.iter().map(|r| r.get::<_, String>(0)).collect(); diff --git a/crates/items_2/src/binsdim0.rs b/crates/items_2/src/binsdim0.rs index 6229869..fe8ed32 100644 --- a/crates/items_2/src/binsdim0.rs +++ b/crates/items_2/src/binsdim0.rs @@ -1011,7 +1011,7 @@ impl TimeBinned for BinsDim0 { fn bins_timebin_fill_empty_00() { let mut bins = BinsDim0::::empty(); let binrange = BinnedRangeEnum::Time(BinnedRange { - bin_len: TsNano(SEC * 2), + bin_len: TsNano::from_ns(SEC * 2), bin_off: 9, bin_cnt: 5, }); @@ -1033,7 +1033,7 @@ fn bins_timebin_fill_empty_00() { fn bins_timebin_fill_empty_01() { let mut bins = BinsDim0::::empty(); let binrange = BinnedRangeEnum::Time(BinnedRange { - bin_len: TsNano(SEC * 2), + bin_len: TsNano::from_ns(SEC * 2), bin_off: 9, bin_cnt: 5, }); @@ -1056,7 +1056,7 @@ fn bins_timebin_fill_empty_01() { fn bins_timebin_push_empty_00() { let mut bins = BinsDim0::::empty(); let binrange = BinnedRangeEnum::Time(BinnedRange { - bin_len: TsNano(SEC * 2), + bin_len: TsNano::from_ns(SEC * 2), bin_off: 9, bin_cnt: 5, }); @@ -1078,7 +1078,7 @@ fn bins_timebin_push_empty_00() { fn bins_timebin_push_empty_01() { let mut bins = BinsDim0::::empty(); let binrange = BinnedRangeEnum::Time(BinnedRange { - bin_len: TsNano(SEC * 2), + bin_len: TsNano::from_ns(SEC * 2), bin_off: 9, bin_cnt: 5, }); @@ -1104,7 +1104,7 @@ fn bins_timebin_ingest_only_before() { bins.push(SEC * 2, SEC * 4, 3, 7, 9, 8.1); bins.push(SEC * 4, SEC * 6, 3, 6, 9, 8.2); let binrange = BinnedRangeEnum::Time(BinnedRange { - bin_len: TsNano(SEC * 2), + bin_len: TsNano::from_ns(SEC * 2), bin_off: 9, bin_cnt: 5, }); @@ -1127,7 +1127,7 @@ fn bins_timebin_ingest_00() { bins.push(SEC * 21, SEC * 22, 5, 71, 93, 86.); bins.push(SEC * 23, SEC * 24, 6, 72, 92, 81.); let binrange = BinnedRangeEnum::Time(BinnedRange { - bin_len: TsNano(SEC * 2), + bin_len: TsNano::from_ns(SEC * 2), bin_off: 9, bin_cnt: 5, }); @@ -1148,7 +1148,7 @@ fn bins_timebin_ingest_00() { #[test] fn bins_timebin_ingest_continuous_00() { let binrange = BinnedRangeEnum::Time(BinnedRange { - bin_len: TsNano(SEC * 2), + bin_len: TsNano::from_ns(SEC * 2), bin_off: 9, bin_cnt: 20, }); diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index 93743c7..9eb32db 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -1292,7 +1292,7 @@ fn binner_00() { let mut ev1 = EventsDim0::empty(); ev1.push(MS * 1200, 3, 1.2f32); ev1.push(MS * 3200, 3, 3.2f32); - let binrange = BinnedRangeEnum::from_custom(TsNano(SEC), 0, 10); + let binrange = BinnedRangeEnum::from_custom(TsNano::from_ns(SEC), 0, 10); let mut binner = ev1.time_binner_new(binrange, true); binner.ingest(ev1.as_time_binnable_mut()); eprintln!("{:?}", binner); @@ -1306,7 +1306,7 @@ fn binner_01() { ev1.push(MS * 1300, 3, 1.3); ev1.push(MS * 2100, 3, 2.1); ev1.push(MS * 2300, 3, 2.3); - let binrange = BinnedRangeEnum::from_custom(TsNano(SEC), 0, 10); + let binrange = BinnedRangeEnum::from_custom(TsNano::from_ns(SEC), 0, 10); let mut binner = ev1.time_binner_new(binrange, true); binner.ingest(ev1.as_time_binnable_mut()); eprintln!("{:?}", binner); @@ -1386,7 +1386,7 @@ fn bin_binned_02() { #[test] fn events_timebin_ingest_continuous_00() { let binrange = BinnedRangeEnum::Time(BinnedRange { - bin_len: TsNano(SEC * 2), + bin_len: TsNano::from_ns(SEC * 2), bin_off: 9, bin_cnt: 20, }); diff --git a/crates/items_2/src/framable.rs b/crates/items_2/src/framable.rs index d27fe82..576ea32 100644 --- a/crates/items_2/src/framable.rs +++ b/crates/items_2/src/framable.rs @@ -16,6 +16,7 @@ use items_0::streamitem::ERROR_FRAME_TYPE_ID; use items_0::streamitem::EVENT_QUERY_JSON_STRING_FRAME; use items_0::streamitem::SITEMTY_NONSPEC_FRAME_TYPE_ID; use items_0::Events; +use netpod::log::*; use serde::de::DeserializeOwned; use serde::Deserialize; use serde::Serialize; @@ -82,7 +83,10 @@ where Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => make_range_complete_frame(), Ok(StreamItem::Log(item)) => make_log_frame(item), Ok(StreamItem::Stats(item)) => make_stats_frame(item), - Err(e) => make_error_frame(e), + Err(e) => { + info!("calling make_error_frame for [[{e}]]"); + make_error_frame(e) + } } } } @@ -181,7 +185,7 @@ fn test_frame_log() { #[test] fn test_frame_error() { use crate::channelevents::ChannelEvents; - use crate::frame::decode_from_slice; + use crate::frame::json_from_slice; let item: Sitemty = Err(Error::with_msg_no_trace(format!("dummy-error-message"))); let buf = Framable::make_frame(&item).unwrap(); let len = u32::from_le_bytes(buf[12..16].try_into().unwrap()); @@ -190,5 +194,5 @@ fn test_frame_error() { panic!("bad tyid"); } eprintln!("buf len {} len {}", buf.len(), len); - let item2: Error = decode_from_slice(&buf[20..20 + len as usize]).unwrap(); + let item2: Error = json_from_slice(&buf[20..20 + len as usize]).unwrap(); } diff --git a/crates/items_2/src/frame.rs b/crates/items_2/src/frame.rs index 5517e67..d40ae96 100644 --- a/crates/items_2/src/frame.rs +++ b/crates/items_2/src/frame.rs @@ -146,6 +146,20 @@ where postcard::from_bytes(buf).map_err(|e| format!("{e}").into()) } +fn json_to_vec(item: T) -> Result, Error> +where + T: Serialize, +{ + serde_json::to_vec(&item).map_err(Error::from_string) +} + +pub fn json_from_slice(buf: &[u8]) -> Result +where + T: for<'de> serde::Deserialize<'de>, +{ + serde_json::from_slice(buf).map_err(Error::from_string) +} + pub fn encode_to_vec(item: T) -> Result, Error> where T: Serialize, @@ -213,7 +227,8 @@ where // TODO remove duplication for these similar `make_*_frame` functions: pub fn make_error_frame(error: &err::Error) -> Result { - match encode_to_vec(error) { + // error frames are always encoded as json + match json_to_vec(error) { Ok(enc) => { let mut h = crc32fast::Hasher::new(); h.update(&enc); @@ -335,7 +350,8 @@ where ))); } if frame.tyid() == ERROR_FRAME_TYPE_ID { - let k: err::Error = match decode_from_slice(frame.buf()) { + // error frames are always encoded as json + let k: err::Error = match json_from_slice(frame.buf()) { Ok(item) => item, Err(e) => { error!("deserialize len {} ERROR_FRAME_TYPE_ID {}", frame.buf().len(), e); diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index ff7d34c..72af0cc 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -642,8 +642,14 @@ pub struct Cluster { pub is_central_storage: bool, #[serde(rename = "fileIoBufferSize", default)] pub file_io_buffer_size: FileIoBufferSize, - pub scylla: Option, - pub cache_scylla: Option, + scylla: Option, + #[serde(rename = "scylla_st")] + scylla_st: Option, + #[serde(rename = "scylla_mt")] + scylla_mt: Option, + #[serde(rename = "scylla_lt")] + scylla_lt: Option, + cache_scylla: Option, } impl Cluster { @@ -654,6 +660,40 @@ impl Cluster { true } } + + pub fn scylla_st(&self) -> Option<&ScyllaConfig> { + self.scylla_st.as_ref().map_or_else(|| self.scylla.as_ref(), Some) + } + + pub fn scylla_mt(&self) -> Option<&ScyllaConfig> { + self.scylla_mt.as_ref() + } + + pub fn scylla_lt(&self) -> Option<&ScyllaConfig> { + self.scylla_lt.as_ref() + } + + pub fn test_00() -> Self { + Self { + backend: "testbackend-00".into(), + nodes: Vec::new(), + database: Database { + name: "".into(), + host: "".into(), + port: 5432, + user: "".into(), + pass: "".into(), + }, + run_map_pulse_task: false, + is_central_storage: false, + file_io_buffer_size: FileIoBufferSize(1024 * 8), + scylla: None, + scylla_st: None, + scylla_mt: None, + scylla_lt: None, + cache_scylla: None, + } + } } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -1368,20 +1408,20 @@ where pub struct DtNano(u64); impl DtNano { - pub fn from_ns(ns: u64) -> Self { + pub const fn from_ns(ns: u64) -> Self { Self(ns) } - pub fn from_ms(ns: u64) -> Self { - Self(MS * ns) + pub const fn from_ms(ns: u64) -> Self { + Self(1000000 * ns) } - pub fn ns(&self) -> u64 { + pub const fn ns(&self) -> u64 { self.0 } - pub fn ms(&self) -> u64 { - self.0 / MS + pub const fn ms(&self) -> u64 { + self.0 / 1000000 } pub fn to_i64(&self) -> i64 { @@ -1389,13 +1429,13 @@ impl DtNano { } } -#[cfg(DISABLED)] -impl fmt::Debug for DtNano { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { +impl fmt::Display for DtNano { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { let sec = self.0 / SEC; let ms = (self.0 - SEC * sec) / MS; let ns = self.0 - SEC * sec - MS * ms; - f.debug_tuple("DtNano").field(&sec).field(&ms).field(&ns).finish() + // fmt.debug_tuple("DtNano").field(&sec).field(&ms).field(&ns).finish() + write!(fmt, "DtNano {{ sec {} ms {} ns {} }}", sec, ms, ns) } } @@ -1452,7 +1492,11 @@ impl DtMs { } pub const fn ms(&self) -> u64 { - self.0 / MS + self.0 + } + + pub const fn ns(&self) -> u64 { + 1000000 * self.0 } pub const fn to_i64(&self) -> i64 { @@ -1461,7 +1505,7 @@ impl DtMs { } #[derive(Copy, Clone, PartialEq, PartialOrd, Eq, Ord)] -pub struct TsNano(pub u64); +pub struct TsNano(u64); mod ts_nano_ser { use super::TsNano; @@ -1522,7 +1566,7 @@ impl TsNano { } pub const fn from_ms(ns: u64) -> Self { - Self(MS * ns) + Self(1000000 * ns) } pub const fn ns(&self) -> u64 { @@ -1530,7 +1574,7 @@ impl TsNano { } pub const fn ms(&self) -> u64 { - self.0 / MS + self.0 / 1000000 } pub const fn sub(self, v: DtNano) -> Self { @@ -1560,19 +1604,26 @@ impl TsNano { impl fmt::Debug for TsNano { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let ts = Utc.timestamp_opt((self.0 / SEC) as i64, (self.0 % SEC) as u32); + let ts = Utc + .timestamp_opt((self.0 / SEC) as i64, (self.0 % SEC) as u32) + .earliest() + .unwrap_or(Default::default()); f.debug_struct("TsNano") - .field( - "ts", - &ts.earliest() - .unwrap_or(Default::default()) - .format(DATETIME_FMT_3MS) - .to_string(), - ) + .field("ts", &ts.format(DATETIME_FMT_3MS).to_string()) .finish() } } +impl fmt::Display for TsNano { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let ts = Utc + .timestamp_opt((self.0 / SEC) as i64, (self.0 % SEC) as u32) + .earliest() + .unwrap_or(Default::default()); + ts.format(DATETIME_FMT_3MS).fmt(fmt) + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd)] pub struct PulseId(u64); @@ -2372,6 +2423,18 @@ impl TsMs { self.0 } + pub const fn ns(self) -> TsNano { + TsNano::from_ms(self.0) + } + + pub const fn ns_u64(self) -> u64 { + 1000000 * self.0 + } + + pub const fn sec(self) -> u64 { + self.0 / 1000 + } + pub const fn to_u64(self) -> u64 { self.0 } @@ -2387,6 +2450,12 @@ impl TsMs { } } +impl fmt::Display for TsMs { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "TsMs {{ {} }}", self.0) + } +} + impl std::ops::Sub for TsMs { type Output = TsMs; @@ -3215,6 +3284,9 @@ pub fn test_cluster() -> Cluster { pass: "testingdaq".into(), }, scylla: None, + scylla_st: None, + scylla_mt: None, + scylla_lt: None, cache_scylla: None, run_map_pulse_task: false, is_central_storage: false, @@ -3249,6 +3321,9 @@ pub fn sls_test_cluster() -> Cluster { pass: "testingdaq".into(), }, scylla: None, + scylla_st: None, + scylla_mt: None, + scylla_lt: None, cache_scylla: None, run_map_pulse_task: false, is_central_storage: false, @@ -3283,6 +3358,9 @@ pub fn archapp_test_cluster() -> Cluster { pass: "testingdaq".into(), }, scylla: None, + scylla_st: None, + scylla_mt: None, + scylla_lt: None, cache_scylla: None, run_map_pulse_task: false, is_central_storage: false, diff --git a/crates/netpod/src/ttl.rs b/crates/netpod/src/ttl.rs index d1c5123..f07bbbd 100644 --- a/crates/netpod/src/ttl.rs +++ b/crates/netpod/src/ttl.rs @@ -8,41 +8,54 @@ pub enum RetentionTime { } impl RetentionTime { + pub fn debug_tag(&self) -> &'static str { + use RetentionTime::*; + match self { + Short => "ST", + Medium => "MT", + Long => "LT", + } + } + pub fn table_prefix(&self) -> &'static str { use RetentionTime::*; match self { - Short => "", + Short => "st_", Medium => "mt_", Long => "lt_", } } pub fn ttl_events_d0(&self) -> Duration { - match self { - RetentionTime::Short => Duration::from_secs(60 * 60 * 12), - RetentionTime::Medium => Duration::from_secs(60 * 60 * 24 * 100), - RetentionTime::Long => Duration::from_secs(60 * 60 * 24 * 31 * 12 * 11), - } + let day = 60 * 60 * 24; + let margin_max = Duration::from_secs(day * 2); + let ttl = self.ttl_ts_msp(); + let margin = ttl / 10; + let margin = if margin >= margin_max { margin_max } else { margin }; + ttl + margin } pub fn ttl_events_d1(&self) -> Duration { - match self { - RetentionTime::Short => Duration::from_secs(60 * 60 * 12), - RetentionTime::Medium => Duration::from_secs(60 * 60 * 24 * 100), - RetentionTime::Long => Duration::from_secs(60 * 60 * 24 * 31 * 12 * 11), - } + // TTL now depends only on RetentionTime, not on data type or shape. + self.ttl_events_d0() } pub fn ttl_ts_msp(&self) -> Duration { - let dt = self.ttl_events_d0(); - dt + dt / 30 + let day = 60 * 60 * 24; + match self { + RetentionTime::Short => Duration::from_secs(day * 40), + RetentionTime::Medium => Duration::from_secs(day * 31 * 13), + RetentionTime::Long => Duration::from_secs(day * 31 * 12 * 17), + } } pub fn ttl_binned(&self) -> Duration { - self.ttl_events_d0() * 2 + // Current choice is to keep the TTL the same as for events + self.ttl_events_d0() } pub fn ttl_channel_status(&self) -> Duration { - self.ttl_binned() + // Current choice is to keep the TTL the same as for events + self.ttl_events_d0() } } diff --git a/crates/nodenet/src/channelconfig.rs b/crates/nodenet/src/channelconfig.rs index 1efa51a..257d035 100644 --- a/crates/nodenet/src/channelconfig.rs +++ b/crates/nodenet/src/channelconfig.rs @@ -1,3 +1,4 @@ +use dbconn::worker::PgQueue; use err::Error; use httpclient::url::Url; use netpod::log::*; @@ -100,13 +101,14 @@ fn channel_config_test_backend(channel: SfDbChannel) -> Result Result, Error> { if channel.backend() == TEST_BACKEND { Ok(Some(channel_config_test_backend(channel)?)) - } else if ncc.node_config.cluster.scylla.is_some() { + } else if ncc.node_config.cluster.scylla_st().is_some() { debug!("try to get ChConf for scylla type backend"); - let ret = scylla_chconf_from_sf_db_channel(range, &channel, ncc) + let ret = scylla_chconf_from_sf_db_channel(range, &channel, pgqueue) .await .map_err(Error::from)?; Ok(Some(ChannelTypeConfigGen::Scylla(ret))) @@ -158,7 +160,7 @@ pub async fn channel_configs(channel: SfDbChannel, ncc: &NodeConfigCached) -> Re } }; Ok(ret) - } else if ncc.node_config.cluster.scylla.is_some() { + } else if ncc.node_config.cluster.scylla_st().is_some() { debug!("try to get ChConf for scylla type backend"); let ret = scylla_all_chconf_from_sf_db_channel(&channel, ncc) .await @@ -206,20 +208,22 @@ pub async fn http_get_channel_config( async fn scylla_chconf_from_sf_db_channel( range: NanoRange, channel: &SfDbChannel, - ncc: &NodeConfigCached, + pgqueue: &PgQueue, ) -> Result { if let Some(series) = channel.series() { - dbconn::channelconfig::chconf_for_series(channel.backend(), series, ncc).await + let ret = pgqueue + .chconf_for_series(channel.backend(), series) + .await? + .recv() + .await??; + Ok(ret) } else { // TODO let called function allow to return None instead of error-not-found - let ret = dbconn::channelconfig::chconf_best_matching_for_name_and_range( - channel.backend(), - channel.name(), - range, - ncc, - ) - .await - .map_err(Error::from)?; + let ret = pgqueue + .chconf_best_matching_name_range_job(channel.backend(), channel.name(), range) + .await? + .recv() + .await??; Ok(ret) } } diff --git a/crates/nodenet/src/configquorum.rs b/crates/nodenet/src/configquorum.rs index 682b5fe..1763c5b 100644 --- a/crates/nodenet/src/configquorum.rs +++ b/crates/nodenet/src/configquorum.rs @@ -1,4 +1,5 @@ use crate::channelconfig::http_get_channel_config; +use dbconn::worker::PgQueue; use err::Error; use netpod::log::*; use netpod::range::evrange::SeriesRange; @@ -90,13 +91,14 @@ pub async fn find_config_basics_quorum( channel: SfDbChannel, range: SeriesRange, ctx: &ReqCtx, + pgqueue: &PgQueue, ncc: &NodeConfigCached, ) -> Result, Error> { trace!("find_config_basics_quorum"); if let Some(_cfg) = &ncc.node.sf_databuffer { let channel = if channel.name().is_empty() { if let Some(_) = channel.series() { - let pgclient = dbconn::create_connection(&ncc.node_config.cluster.database).await?; + let (pgclient, _pgjh) = dbconn::create_connection(&ncc.node_config.cluster.database).await?; let pgclient = std::sync::Arc::new(pgclient); dbconn::find_sf_channel_by_series(channel, pgclient) .await @@ -111,9 +113,9 @@ pub async fn find_config_basics_quorum( Some(x) => Ok(Some(ChannelTypeConfigGen::SfDatabuffer(x))), None => Ok(None), } - } else if let Some(_) = &ncc.node_config.cluster.scylla { + } else if let Some(_) = &ncc.node_config.cluster.scylla_st() { let range = netpod::range::evrange::NanoRange::try_from(&range)?; - let ret = crate::channelconfig::channel_config(range, channel, ncc).await?; + let ret = crate::channelconfig::channel_config(range, channel, pgqueue, ncc).await?; Ok(ret) } else { Err(Error::with_msg_no_trace( diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index 6aa3150..d0863ef 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -17,6 +17,7 @@ use items_2::empty::empty_events_dyn_ev; use items_2::framable::EventQueryJsonStringFrame; use items_2::framable::Framable; use items_2::frame::decode_frame; +use items_2::frame::make_error_frame; use items_2::frame::make_term_frame; use items_2::inmem::InMemoryFrame; use netpod::histo::HistoLog2; @@ -81,7 +82,7 @@ async fn make_channel_events_stream_data( let node_count = ncc.node_config.cluster.nodes.len() as u64; let node_ix = ncc.ix as u64; streams::generators::make_test_channel_events_stream_data(subq, node_count, node_ix) - } else if let Some(scyconf) = &ncc.node_config.cluster.scylla { + } else if let Some(scyconf) = &ncc.node_config.cluster.scylla_st() { let cfg = subq.ch_conf().to_scylla()?; scylla_channel_event_stream(subq, cfg, scyconf, ncc).await } else if let Some(_) = &ncc.node.channel_archiver { @@ -125,6 +126,7 @@ pub async fn create_response_bytes_stream( return Err(e); } if evq.is_event_blobs() { + // This is only relevant for "api-1" queries in sf-data/imagebuffer based backends. // TODO support event blobs as transform let fetch_info = evq.ch_conf().to_sf_databuffer()?; let stream = disk::raw::conn::make_event_blobs_pipe(&evq, &fetch_info, reqctx, ncc)?; @@ -151,7 +153,7 @@ pub async fn create_response_bytes_stream( }) }); // let stream = stream.map(move |x| Box::new(x) as Box); - let stream = stream.map(|x| x.make_frame().map(|x| x.freeze())); + let stream = stream.map(|x| x.make_frame().map(bytes::BytesMut::freeze)); let ret = Box::pin(stream); Ok(ret) } diff --git a/crates/nodenet/src/conn/test.rs b/crates/nodenet/src/conn/test.rs index 092bff9..4a03254 100644 --- a/crates/nodenet/src/conn/test.rs +++ b/crates/nodenet/src/conn/test.rs @@ -39,6 +39,7 @@ use tokio::io::AsyncWriteExt; use tokio::net::TcpListener; use tokio::net::TcpStream; +// TODO unify with Cluster::test_00() const TEST_BACKEND: &str = "testbackend-00"; #[test] @@ -50,22 +51,7 @@ fn raw_data_00() { let cfg = NodeConfigCached { node_config: NodeConfig { name: "node_name_dummy".into(), - cluster: Cluster { - backend: TEST_BACKEND.into(), - nodes: Vec::new(), - database: Database { - name: "".into(), - host: "".into(), - port: 5432, - user: "".into(), - pass: "".into(), - }, - run_map_pulse_task: false, - is_central_storage: false, - file_io_buffer_size: FileIoBufferSize(1024 * 8), - scylla: None, - cache_scylla: None, - }, + cluster: Cluster::test_00(), }, node: Node { host: "empty".into(), diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index 2eaa6cd..9c32fa8 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -29,7 +29,7 @@ pub async fn scylla_channel_event_stream( let shape = chconf.shape(); let do_test_stream_error = false; let with_values = evq.need_value_data(); - debug!("Make EventsStreamScylla for {series:?} {scalar_type:?} {shape:?}"); + debug!("\n\nmake EventsStreamScylla {series:?} {scalar_type:?} {shape:?}\n"); let stream = scyllaconn::events::EventsStreamScylla::new( series, evq.range().into(), diff --git a/crates/query/src/api4.rs b/crates/query/src/api4.rs index 3395f28..6f5c6b9 100644 --- a/crates/query/src/api4.rs +++ b/crates/query/src/api4.rs @@ -59,7 +59,7 @@ impl FromUrl for AccountingIngestedBytesQuery { let ret = Self { backend: pairs .get("backend") - .ok_or_else(|| Error::with_msg_no_trace("missing backend"))? + .ok_or_else(|| Error::with_public_msg_no_trace("missing backend"))? .to_string(), range: SeriesRange::from_pairs(pairs)?, }; @@ -121,14 +121,16 @@ impl FromUrl for AccountingToplistQuery { fn from_pairs(pairs: &BTreeMap) -> Result { let fn1 = |pairs: &BTreeMap| { - let v = pairs.get("tsDate").ok_or(Error::with_public_msg("missing tsDate"))?; + let v = pairs + .get("tsDate") + .ok_or(Error::with_public_msg_no_trace("missing tsDate"))?; let w = v.parse::>()?; - Ok::<_, Error>(TsNano(w.to_nanos())) + Ok::<_, Error>(TsNano::from_ns(w.to_nanos())) }; let ret = Self { backend: pairs .get("backend") - .ok_or_else(|| Error::with_msg_no_trace("missing backend"))? + .ok_or_else(|| Error::with_public_msg_no_trace("missing backend"))? .to_string(), ts: fn1(pairs)?, limit: pairs.get("limit").map_or(None, |x| x.parse().ok()).unwrap_or(20), diff --git a/crates/query/src/api4/events.rs b/crates/query/src/api4/events.rs index d3983c4..01b7cc2 100644 --- a/crates/query/src/api4/events.rs +++ b/crates/query/src/api4/events.rs @@ -170,11 +170,6 @@ impl PlainEventsQuery { self.do_test_stream_error = k; } - pub fn for_event_blobs(mut self) -> Self { - self.transform = TransformQuery::for_event_blobs(); - self - } - pub fn for_time_weighted_scalar(mut self) -> Self { self.transform = TransformQuery::for_time_weighted_scalar(); self @@ -196,6 +191,14 @@ impl PlainEventsQuery { pub fn create_errors_contains(&self, x: &str) -> bool { self.create_errors.contains(&String::from(x)) } + + pub fn summary_short(&self) -> String { + format!( + "PlainEventsQuery {{ chn: {}, range: {:?} }}", + self.channel().name(), + self.range() + ) + } } impl HasBackend for PlainEventsQuery { diff --git a/crates/scyllaconn/src/accounting/toplist.rs b/crates/scyllaconn/src/accounting/toplist.rs index 6fd6f29..2842636 100644 --- a/crates/scyllaconn/src/accounting/toplist.rs +++ b/crates/scyllaconn/src/accounting/toplist.rs @@ -41,8 +41,9 @@ pub async fn read_ts(ts: u64, scy: Arc) -> Result // TODO toplist::read_ts refactor info!("TODO toplist::read_ts refactor"); let snap = EMIT_ACCOUNTING_SNAP.ms() / 1000; + info!("ts {ts} snap {snap:?}"); let ts = ts / timeunits::SEC / snap * snap; - let cql = concat!("select series, count, bytes from account_00 where part = ? and ts = ?"); + let cql = concat!("select series, count, bytes from lt_account_00 where part = ? and ts = ?"); let qu = prep(cql, scy.clone()).await?; let ret = read_ts_inner(ts, qu, scy).await?; Ok(ret) diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs index a7e3ef8..6900ff3 100644 --- a/crates/scyllaconn/src/events.rs +++ b/crates/scyllaconn/src/events.rs @@ -4,6 +4,7 @@ use err::Error; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; +use futures_util::StreamExt; use items_0::scalar_ops::ScalarOps; use items_0::Appendable; use items_0::Empty; @@ -13,8 +14,12 @@ use items_2::channelevents::ChannelEvents; use items_2::eventsdim0::EventsDim0; use items_2::eventsdim1::EventsDim1; use netpod::log::*; +use netpod::DtNano; use netpod::ScalarType; use netpod::Shape; +use netpod::TsMs; +use netpod::TsNano; +use scylla::frame::response::result::Row; use scylla::Session as ScySession; use std::collections::VecDeque; use std::mem; @@ -27,33 +32,42 @@ async fn find_ts_msp( series: u64, range: ScyllaSeriesRange, scy: Arc, -) -> Result<(VecDeque, VecDeque), Error> { - trace!("find_ts_msp series {} {:?}", series, range); +) -> Result<(VecDeque, VecDeque), Error> { + trace!("find_ts_msp series {:?} {:?}", series, range); let mut ret1 = VecDeque::new(); let mut ret2 = VecDeque::new(); // TODO use prepared statements - let cql = "select ts_msp from ts_msp where series = ? and ts_msp < ? order by ts_msp desc limit 2"; - let res = scy.query(cql, (series as i64, range.beg() as i64)).await.err_conv()?; + let cql = "select ts_msp from st_ts_msp where series = ? and ts_msp < ? order by ts_msp desc limit 2"; + let params = (series as i64, range.beg().ms() as i64); + trace!("find_ts_msp query 1 params {:?}", params); + let res = scy.query(cql, params).await.err_conv()?; for row in res.rows_typed_or_empty::<(i64,)>() { let row = row.err_conv()?; - ret1.push_front(row.0 as u64); + let ts = TsMs::from_ms_u64(row.0 as u64); + trace!("query 1 ts_msp {}", ts); + ret1.push_front(ts); } - let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? and ts_msp < ?"; - let res = scy - .query(cql, (series as i64, range.beg() as i64, range.end() as i64)) - .await - .err_conv()?; + let cql = "select ts_msp from st_ts_msp where series = ? and ts_msp >= ? and ts_msp < ?"; + let params = (series as i64, range.beg().ms() as i64, 1 + range.end().ms() as i64); + trace!("find_ts_msp query 2 params {:?}", params); + let res = scy.query(cql, params).await.err_conv()?; for row in res.rows_typed_or_empty::<(i64,)>() { let row = row.err_conv()?; - ret2.push_back(row.0 as u64); + let ts = TsMs::from_ms_u64(row.0 as u64); + trace!("query 2 ts_msp {}", ts); + ret2.push_back(ts); } - let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? limit 1"; - let res = scy.query(cql, (series as i64, range.end() as i64)).await.err_conv()?; + let cql = "select ts_msp from st_ts_msp where series = ? and ts_msp >= ? limit 1"; + let params = (series as i64, range.end().ms() as i64); + trace!("find_ts_msp query 3 params {:?}", params); + let res = scy.query(cql, params).await.err_conv()?; for row in res.rows_typed_or_empty::<(i64,)>() { let row = row.err_conv()?; - ret2.push_back(row.0 as u64); + let ts = TsMs::from_ms_u64(row.0 as u64); + trace!("query 3 ts_msp {}", ts); + ret2.push_back(ts); } - trace!("find_ts_msp n1 {} n2 {}", ret1.len(), ret2.len()); + trace!("find_ts_msp n1 {:?} n2 {:?}", ret1.len(), ret2.len()); Ok((ret1, ret2)) } @@ -64,6 +78,7 @@ trait ValTy: Sized { fn from_scyty(inp: Self::ScyTy) -> Self; fn table_name() -> &'static str; fn default() -> Self; + fn is_valueblob() -> bool; } macro_rules! impl_scaty_scalar { @@ -81,6 +96,9 @@ macro_rules! impl_scaty_scalar { fn default() -> Self { ::default() } + fn is_valueblob() -> bool { + false + } } }; } @@ -100,39 +118,42 @@ macro_rules! impl_scaty_array { fn default() -> Self { Vec::new() } + fn is_valueblob() -> bool { + true + } } }; } -impl_scaty_scalar!(u8, i8, "events_scalar_u8"); -impl_scaty_scalar!(u16, i16, "events_scalar_u16"); -impl_scaty_scalar!(u32, i32, "events_scalar_u32"); -impl_scaty_scalar!(u64, i64, "events_scalar_u64"); -impl_scaty_scalar!(i8, i8, "events_scalar_i8"); -impl_scaty_scalar!(i16, i16, "events_scalar_i16"); -impl_scaty_scalar!(i32, i32, "events_scalar_i32"); -impl_scaty_scalar!(i64, i64, "events_scalar_i64"); -impl_scaty_scalar!(f32, f32, "events_scalar_f32"); -impl_scaty_scalar!(f64, f64, "events_scalar_f64"); -impl_scaty_scalar!(bool, bool, "events_scalar_bool"); -impl_scaty_scalar!(String, String, "events_scalar_string"); +impl_scaty_scalar!(u8, i8, "st_events_scalar_u8"); +impl_scaty_scalar!(u16, i16, "st_events_scalar_u16"); +impl_scaty_scalar!(u32, i32, "st_events_scalar_u32"); +impl_scaty_scalar!(u64, i64, "st_events_scalar_u64"); +impl_scaty_scalar!(i8, i8, "st_events_scalar_i8"); +impl_scaty_scalar!(i16, i16, "st_events_scalar_i16"); +impl_scaty_scalar!(i32, i32, "st_events_scalar_i32"); +impl_scaty_scalar!(i64, i64, "st_events_scalar_i64"); +impl_scaty_scalar!(f32, f32, "st_events_scalar_f32"); +impl_scaty_scalar!(f64, f64, "st_events_scalar_f64"); +impl_scaty_scalar!(bool, bool, "st_events_scalar_bool"); +impl_scaty_scalar!(String, String, "st_events_scalar_string"); -impl_scaty_array!(Vec, u8, Vec, "events_array_u8"); -impl_scaty_array!(Vec, u16, Vec, "events_array_u16"); -impl_scaty_array!(Vec, u32, Vec, "events_array_u32"); -impl_scaty_array!(Vec, u64, Vec, "events_array_u64"); -impl_scaty_array!(Vec, i8, Vec, "events_array_i8"); -impl_scaty_array!(Vec, i16, Vec, "events_array_i16"); -impl_scaty_array!(Vec, i32, Vec, "events_array_i32"); -impl_scaty_array!(Vec, i64, Vec, "events_array_i64"); -impl_scaty_array!(Vec, f32, Vec, "events_array_f32"); -impl_scaty_array!(Vec, f64, Vec, "events_array_f64"); -impl_scaty_array!(Vec, bool, Vec, "events_array_bool"); -impl_scaty_array!(Vec, String, Vec, "events_array_string"); +impl_scaty_array!(Vec, u8, Vec, "st_events_array_u8"); +impl_scaty_array!(Vec, u16, Vec, "st_events_array_u16"); +impl_scaty_array!(Vec, u32, Vec, "st_events_array_u32"); +impl_scaty_array!(Vec, u64, Vec, "st_events_array_u64"); +impl_scaty_array!(Vec, i8, Vec, "st_events_array_i8"); +impl_scaty_array!(Vec, i16, Vec, "st_events_array_i16"); +impl_scaty_array!(Vec, i32, Vec, "st_events_array_i32"); +impl_scaty_array!(Vec, i64, Vec, "st_events_array_i64"); +impl_scaty_array!(Vec, f32, Vec, "st_events_array_f32"); +impl_scaty_array!(Vec, f64, Vec, "st_events_array_f64"); +impl_scaty_array!(Vec, bool, Vec, "st_events_array_bool"); +impl_scaty_array!(Vec, String, Vec, "st_events_array_string"); struct ReadNextValuesOpts { series: u64, - ts_msp: u64, + ts_msp: TsMs, range: ScyllaSeriesRange, fwd: bool, with_values: bool, @@ -143,30 +164,41 @@ async fn read_next_values(opts: ReadNextValuesOpts) -> Result i64::MAX as u64 { + if range.end() > TsNano::from_ns(i64::MAX as u64) { return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); } let cql_fields = if opts.with_values { - "ts_lsp, pulse, value" + if ST::is_valueblob() { + "ts_lsp, pulse, valueblob" + } else { + "ts_lsp, pulse, value" + } } else { "ts_lsp, pulse" }; let ret = if fwd { - let ts_lsp_min = if ts_msp < range.beg() { range.beg() - ts_msp } else { 0 }; - let ts_lsp_max = if ts_msp < range.end() { range.end() - ts_msp } else { 0 }; + let ts_lsp_min = if range.beg() > ts_msp.ns() { + range.beg().delta(ts_msp.ns()) + } else { + DtNano::from_ns(0) + }; + let ts_lsp_max = if range.end() > ts_msp.ns() { + range.end().delta(ts_msp.ns()) + } else { + DtNano::from_ns(0) + }; trace!( - "FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} beg {} end {} {}", + "FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} {}", ts_msp, ts_lsp_min, ts_lsp_max, - range.beg(), - range.end(), table_name, ); // TODO use prepared! @@ -177,54 +209,28 @@ where ), cql_fields, table_name, ); - let res = scy - .query( - cql, - (series as i64, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64), - ) - .await - .err_conv()?; - let mut last_before = None; - let mut ret = ST::Container::empty(); - for row in res.rows().err_conv()? { - let (ts, pulse, value) = if opts.with_values { - let row: (i64, i64, ST::ScyTy) = row.into_typed().err_conv()?; - let ts = ts_msp + row.0 as u64; - let pulse = row.1 as u64; - let value = ValTy::from_scyty(row.2); - (ts, pulse, value) - } else { - let row: (i64, i64) = row.into_typed().err_conv()?; - let ts = ts_msp + row.0 as u64; - let pulse = row.1 as u64; - let value = ValTy::default(); - (ts, pulse, value) - }; - if ts >= range.end() { - // TODO count as logic error - error!("ts >= range.end"); - } else if ts >= range.beg() { - if pulse % 27 != 3618 { - ret.push(ts, pulse, value); - } - } else { - if last_before.is_none() { - warn!("encounter event before range in forward read {ts}"); - } - last_before = Some((ts, pulse, value)); - } + let params = ( + series as i64, + ts_msp.ms() as i64, + ts_lsp_min.ns() as i64, + ts_lsp_max.ns() as i64, + ); + trace!("FWD event search params {:?}", params); + let mut res = scy.query_iter(cql, params).await.err_conv()?; + let mut rows = Vec::new(); + while let Some(x) = res.next().await { + rows.push(x.err_conv()?); } + let mut last_before = None; + let ret = convert_rows::(rows, range, ts_msp, opts.with_values, !fwd, &mut last_before)?; ret } else { - let ts_lsp_max = if ts_msp < range.beg() { range.beg() - ts_msp } else { 0 }; - trace!( - "BCK ts_msp {} ts_lsp_max {} beg {} end {} {}", - ts_msp, - ts_lsp_max, - range.beg(), - range.end(), - table_name, - ); + let ts_lsp_max = if ts_msp.ns() < range.beg() { + range.beg().delta(ts_msp.ns()) + } else { + DtNano::from_ns(0) + }; + trace!("BCK ts_msp {} ts_lsp_max {} {}", ts_msp, ts_lsp_max, table_name,); // TODO use prepared! let cql = format!( concat!( @@ -233,54 +239,89 @@ where ), cql_fields, table_name, ); - let res = scy - .query(cql, (series as i64, ts_msp as i64, ts_lsp_max as i64)) - .await - .err_conv()?; - let mut seen_before = false; - let mut ret = ST::Container::empty(); - for row in res.rows().err_conv()? { - let (ts, pulse, value) = if opts.with_values { - let row: (i64, i64, ST::ScyTy) = row.into_typed().err_conv()?; - let ts = ts_msp + row.0 as u64; - let pulse = row.1 as u64; - let value = ValTy::from_scyty(row.2); - (ts, pulse, value) - } else { - let row: (i64, i64) = row.into_typed().err_conv()?; - let ts = ts_msp + row.0 as u64; - let pulse = row.1 as u64; - let value = ValTy::default(); - (ts, pulse, value) - }; - if ts >= range.beg() { - // TODO count as logic error - error!("ts >= range.beg"); - } else if ts < range.beg() { - if pulse % 27 != 3618 { - ret.push(ts, pulse, value); - } - } else { - seen_before = true; - } + let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64); + trace!("BCK event search params {:?}", params); + let mut res = scy.query_iter(cql, params).await.err_conv()?; + let mut rows = Vec::new(); + while let Some(x) = res.next().await { + rows.push(x.err_conv()?); } - let _ = seen_before; + let mut _last_before = None; + let ret = convert_rows::(rows, range, ts_msp, opts.with_values, !fwd, &mut _last_before)?; if ret.len() > 1 { error!("multiple events in backwards search {}", ret.len()); } ret }; - trace!("read ts_msp {} len {}", ts_msp, ret.len()); + trace!("read ts_msp {:?} len {}", ts_msp, ret.len()); let ret = Box::new(ret); Ok(ret) } +fn convert_rows( + rows: Vec, + range: ScyllaSeriesRange, + ts_msp: TsMs, + with_values: bool, + bck: bool, + last_before: &mut Option<(TsNano, u64, ST)>, +) -> Result<::Container, Error> { + let mut ret = ::Container::empty(); + for row in rows { + let (ts, pulse, value) = if with_values { + if ST::is_valueblob() { + let row: (i64, i64, Vec) = row.into_typed().err_conv()?; + trace!("read a value blob len {}", row.2.len()); + let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + let pulse = row.1 as u64; + let value = ValTy::default(); + (ts, pulse, value) + } else { + let row: (i64, i64, ST::ScyTy) = row.into_typed().err_conv()?; + let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + let pulse = row.1 as u64; + let value = ValTy::from_scyty(row.2); + (ts, pulse, value) + } + } else { + let row: (i64, i64) = row.into_typed().err_conv()?; + let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + let pulse = row.1 as u64; + let value = ValTy::default(); + (ts, pulse, value) + }; + if bck { + if ts >= range.beg() { + // TODO count as logic error + error!("ts >= range.beg"); + } else if ts < range.beg() { + ret.push(ts.ns(), pulse, value); + } else { + *last_before = Some((ts, pulse, value)); + } + } else { + if ts >= range.end() { + // TODO count as logic error + error!("ts >= range.end"); + } else if ts >= range.beg() { + ret.push(ts.ns(), pulse, value); + } else { + if last_before.is_none() { + warn!("encounter event before range in forward read {ts}"); + } + *last_before = Some((ts, pulse, value)); + } + } + } + Ok(ret) +} + struct ReadValues { series: u64, scalar_type: ScalarType, shape: Shape, range: ScyllaSeriesRange, - ts_msps: VecDeque, + ts_msps: VecDeque, fwd: bool, with_values: bool, fut: Pin, Error>> + Send>>, @@ -294,7 +335,7 @@ impl ReadValues { scalar_type: ScalarType, shape: Shape, range: ScyllaSeriesRange, - ts_msps: VecDeque, + ts_msps: VecDeque, fwd: bool, with_values: bool, scy: Arc, @@ -327,7 +368,7 @@ impl ReadValues { } } - fn make_fut(&mut self, ts_msp: u64) -> Pin, Error>> + Send>> { + fn make_fut(&mut self, ts_msp: TsMs) -> Pin, Error>> + Send>> { let opts = ReadNextValuesOpts { series: self.series.clone(), ts_msp, @@ -387,7 +428,7 @@ impl ReadValues { enum FrState { New, - FindMsp(Pin, VecDeque), Error>> + Send>>), + FindMsp(Pin, VecDeque), Error>> + Send>>), ReadBack1(ReadValues), ReadBack2(ReadValues), ReadValues(ReadValues), @@ -402,8 +443,8 @@ pub struct EventsStreamScylla { shape: Shape, range: ScyllaSeriesRange, do_one_before_range: bool, - ts_msp_bck: VecDeque, - ts_msp_fwd: VecDeque, + ts_msp_bck: VecDeque, + ts_msp_fwd: VecDeque, scy: Arc, do_test_stream_error: bool, found_one_after: bool, @@ -422,6 +463,7 @@ impl EventsStreamScylla { scy: Arc, do_test_stream_error: bool, ) -> Self { + debug!("EventsStreamScylla::new"); Self { state: FrState::New, series, @@ -439,13 +481,13 @@ impl EventsStreamScylla { } } - fn ts_msps_found(&mut self, msps1: VecDeque, msps2: VecDeque) { + fn ts_msps_found(&mut self, msps1: VecDeque, msps2: VecDeque) { trace!("ts_msps_found msps1 {msps1:?} msps2 {msps2:?}"); self.ts_msp_bck = msps1; self.ts_msp_fwd = msps2; for x in self.ts_msp_bck.iter().rev() { let x = x.clone(); - if x >= self.range.end() { + if x.ns() >= self.range.end() { info!("FOUND one-after because of MSP"); self.found_one_after = true; } @@ -589,6 +631,7 @@ impl Stream for EventsStreamScylla { continue; } Ready(Err(e)) => { + error!("EventsStreamScylla FindMsp {e}"); self.state = FrState::DataDone; Ready(Some(Err(e))) } @@ -601,6 +644,7 @@ impl Stream for EventsStreamScylla { continue; } Ready(Err(e)) => { + error!("EventsStreamScylla ReadBack1 {e}"); st.fut_done = true; self.state = FrState::DataDone; Ready(Some(Err(e))) @@ -614,6 +658,7 @@ impl Stream for EventsStreamScylla { continue; } Ready(Err(e)) => { + error!("EventsStreamScylla ReadBack2 {e}"); st.fut_done = true; self.state = FrState::DataDone; Ready(Some(Err(e))) @@ -633,6 +678,7 @@ impl Stream for EventsStreamScylla { continue; } Ready(Err(e)) => { + error!("EventsStreamScylla ReadValues {e}"); st.fut_done = true; Ready(Some(Err(e))) } diff --git a/crates/scyllaconn/src/range.rs b/crates/scyllaconn/src/range.rs index ac0fe66..40859fb 100644 --- a/crates/scyllaconn/src/range.rs +++ b/crates/scyllaconn/src/range.rs @@ -1,4 +1,5 @@ use netpod::range::evrange::SeriesRange; +use netpod::TsNano; #[derive(Debug, Clone)] pub struct ScyllaSeriesRange { @@ -7,12 +8,12 @@ pub struct ScyllaSeriesRange { } impl ScyllaSeriesRange { - pub fn beg(&self) -> u64 { - self.beg + pub fn beg(&self) -> TsNano { + TsNano::from_ns(self.beg) } - pub fn end(&self) -> u64 { - self.end + pub fn end(&self) -> TsNano { + TsNano::from_ns(self.end) } } diff --git a/crates/streams/src/plaineventsstream.rs b/crates/streams/src/plaineventsstream.rs index 36d25f4..d838d0d 100644 --- a/crates/streams/src/plaineventsstream.rs +++ b/crates/streams/src/plaineventsstream.rs @@ -26,7 +26,7 @@ pub async fn dyn_events_stream( ctx: &ReqCtx, open_bytes: OpenBoxedBytesStreamsBox, ) -> Result { - trace!("dyn_events_stream begin"); + trace!("dyn_events_stream {}", evq.summary_short()); let subq = make_sub_query( ch_conf, evq.range().clone(), diff --git a/crates/taskrun/Cargo.toml b/crates/taskrun/Cargo.toml index 8e6dbc1..4bf9336 100644 --- a/crates/taskrun/Cargo.toml +++ b/crates/taskrun/Cargo.toml @@ -8,15 +8,14 @@ edition = "2021" path = "src/taskrun.rs" [dependencies] -tokio = { version = "1.32.0", features = ["full", "tracing", "time"] } +tokio = { version = "1.37.0", features = ["full", "tracing", "time"] } futures-util = "0.3.28" tracing = "0.1.40" tracing-log = "0.2.0" -tracing-subscriber = { version = "0.3.17", features = ["fmt", "time"] } +tracing-subscriber = { version = "0.3.18", features = ["fmt", "time"] } #tracing-loki = { version = "0.2.1", default-features = false, features = ["compat-0-2-1"] } console-subscriber = { version = "0.2.0" } time = { version = "0.3", features = ["formatting"] } -backtrace = "0.3.56" -lazy_static = "1.4.0" -chrono = "0.4" +backtrace = "0.3.71" +chrono = "0.4.38" err = { path = "../err" }