diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 779c80c..9e6174d 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -33,24 +33,30 @@ async fn main_run(opts: DaqIngestOpts) -> Result<(), Error> { } async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { - let buildmark = "+0008"; + let buildmark = "+0009"; use daqingest::opts::ChannelAccess; use daqingest::opts::SubCmd; match opts.subcmd { SubCmd::ListPkey => { // TODO must take scylla config from CLI let scylla_conf = err::todoval(); - scywr::tools::list_pkey(&scylla_conf).await? + scywr::tools::list_pkey(&scylla_conf) + .await + .map_err(Error::from_string)? } SubCmd::ListPulses => { // TODO must take scylla config from CLI let scylla_conf = err::todoval(); - scywr::tools::list_pulses(&scylla_conf).await? + scywr::tools::list_pulses(&scylla_conf) + .await + .map_err(Error::from_string)? } SubCmd::FetchEvents(k) => { // TODO must take scylla config from CLI let scylla_conf = err::todoval(); - scywr::tools::fetch_events(&k.backend, &k.channel, &scylla_conf).await? + scywr::tools::fetch_events(&k.backend, &k.channel, &scylla_conf) + .await + .map_err(Error::from_string)? } SubCmd::Db(k) => { use daqingest::opts::DbSub; diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 712404c..4403425 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -66,7 +66,7 @@ pub struct Daemon { count_unassigned: usize, count_assigned: usize, last_status_print: SystemTime, - insert_workers_jh: Vec>>, + insert_workers_jhs: Vec>>, stats: Arc, insert_worker_stats: Arc, series_by_channel_stats: Arc, @@ -197,7 +197,8 @@ impl Daemon { insert_worker_opts.clone(), insert_worker_stats.clone(), ) - .await?; + .await + .map_err(Error::from_string)?; insert_worker_jhs.extend(jh); let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy( ingest_opts.insert_worker_count(), @@ -206,7 +207,8 @@ impl Daemon { insert_worker_opts.clone(), insert_worker_stats.clone(), ) - .await?; + .await + .map_err(Error::from_string)?; insert_worker_jhs.extend(jh); let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy( ingest_opts.insert_worker_count(), @@ -215,7 +217,8 @@ impl Daemon { insert_worker_opts.clone(), insert_worker_stats.clone(), ) - .await?; + .await + .map_err(Error::from_string)?; insert_worker_jhs.extend(jh); } else { let jh = scywr::insertworker::spawn_scylla_insert_workers( @@ -229,7 +232,8 @@ impl Daemon { insert_worker_stats.clone(), ingest_opts.use_rate_limit_queue(), ) - .await?; + .await + .map_err(Error::from_string)?; insert_worker_jhs.extend(jh); let jh = scywr::insertworker::spawn_scylla_insert_workers( @@ -243,7 +247,8 @@ impl Daemon { insert_worker_stats.clone(), ingest_opts.use_rate_limit_queue(), ) - .await?; + .await + .map_err(Error::from_string)?; insert_worker_jhs.extend(jh); let jh = scywr::insertworker::spawn_scylla_insert_workers( @@ -257,7 +262,8 @@ impl Daemon { insert_worker_stats.clone(), ingest_opts.use_rate_limit_queue(), ) - .await?; + .await + .map_err(Error::from_string)?; insert_worker_jhs.extend(jh); }; let stats = Arc::new(DaemonStats::new()); @@ -311,7 +317,7 @@ impl Daemon { count_unassigned: 0, count_assigned: 0, last_status_print: SystemTime::now(), - insert_workers_jh: insert_worker_jhs, + insert_workers_jhs: insert_worker_jhs, stats, insert_worker_stats, series_by_channel_stats, @@ -720,7 +726,7 @@ impl Daemon { } debug!("joined metrics handler"); debug!("wait for insert workers"); - while let Some(jh) = self.insert_workers_jh.pop() { + while let Some(jh) = self.insert_workers_jhs.pop() { match jh.await.map_err(Error::from_string) { Ok(x) => match x { Ok(()) => { diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 92e8867..92303d7 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -569,37 +569,16 @@ pub async fn metrics_service( } pub async fn metrics_agg_task(local_stats: Arc, store_stats: Arc) -> Result<(), Error> { + use stats::rand_xoshiro::rand_core::RngCore; + let mut rng = stats::xoshiro_from_time(); let mut agg_last = CaConnStatsAgg::new(); loop { - tokio::time::sleep(Duration::from_millis(671)).await; + let dt = rng.next_u32(); + tokio::time::sleep(Duration::from_millis(500 + (dt as u64 & 0x7f))).await; let agg = CaConnStatsAgg::new(); agg.push(&local_stats); agg.push(&store_stats); trace!("TODO metrics_agg_task"); - // TODO when a CaConn is closed, I'll lose the so far collected counts, which creates a jump - // in the metrics. - // To make this sound: - // Let CaConn keep a stats and just count. - // At the tick, create a snapshot: all atomics are copied after each other. - // Diff this new snapshot with an older snapshot and send that. - // Note: some stats are counters, but some are current values. - // e.g. the number of active channels should go down when a CaConn stops. - #[cfg(DISABLED)] - { - let conn_stats_guard = ingest_commons.ca_conn_set.ca_conn_ress().lock().await; - for (_, g) in conn_stats_guard.iter() { - agg.push(g.stats()); - } - } - #[cfg(DISABLED)] - { - let mut m = METRICS.lock().unwrap(); - *m = Some(agg.clone()); - if false { - let diff = CaConnStatsAggDiff::diff_from(&agg_last, &agg); - info!("{}", diff.display()); - } - } agg_last = agg; } } diff --git a/scywr/Cargo.toml b/scywr/Cargo.toml index 50e1f44..b11fd7f 100644 --- a/scywr/Cargo.toml +++ b/scywr/Cargo.toml @@ -10,14 +10,11 @@ async-channel = "2.3.1" scylla = "0.15.0" smallvec = "1.11.0" pin-project = "1.1.5" -stackfuture = "0.3.0" bytes = "1.7.1" autoerr = "0.0.3" serde = { version = "1", features = ["derive"] } log = { path = "../log" } stats = { path = "../stats" } series = { path = "../../daqbuf-series", package = "daqbuf-series" } -err = { path = "../../daqbuf-err", package = "daqbuf-err" } netpod = { path = "../../daqbuf-netpod", package = "daqbuf-netpod" } taskrun = { path = "../../daqbuffer/crates/taskrun" } -#bitshuffle = { path = "../../daqbuffer/crates/bitshuffle" } diff --git a/scywr/src/access.rs b/scywr/src/access.rs index e97de18..964a59b 100644 --- a/scywr/src/access.rs +++ b/scywr/src/access.rs @@ -1,12 +1,11 @@ -use err::thiserror; -use err::ThisError; use scylla::transport::errors::DbError; use scylla::transport::errors::QueryError; -#[derive(Debug, ThisError)] -#[cstm(name = "ScyllaAccess")] -pub enum Error { - DbError(#[from] DbError), - QueryError(#[from] QueryError), - NoKeyspaceChosen, -} +autoerr::create_error_v1!( + name(Error, "ScyllaAccess"), + enum variants { + DbError(#[from] DbError), + QueryError(#[from] QueryError), + NoKeyspaceChosen, + }, +); diff --git a/scywr/src/err.rs b/scywr/src/err.rs deleted file mode 100644 index 62231b3..0000000 --- a/scywr/src/err.rs +++ /dev/null @@ -1,44 +0,0 @@ -use scylla::transport::errors::DbError; -use scylla::transport::errors::QueryError; - -#[derive(Debug)] -pub enum Error { - DbUnavailable, - DbOverload, - DbTimeout, - DbError(String), -} - -impl From for err::Error { - fn from(e: Error) -> Self { - err::Error::with_msg_no_trace(format!("{e:?}")) - } -} - -pub trait IntoSimplerError { - fn into_simpler(self) -> Error; -} - -impl IntoSimplerError for QueryError { - fn into_simpler(self) -> Error { - let e = &self; - match e { - QueryError::DbError(e, msg) => match e { - DbError::Unavailable { .. } => Error::DbUnavailable, - DbError::Overloaded => Error::DbOverload, - DbError::IsBootstrapping => Error::DbUnavailable, - DbError::ReadTimeout { .. } => Error::DbTimeout, - DbError::WriteTimeout { .. } => Error::DbTimeout, - _ => Error::DbError(format!("{e} {msg}")), - }, - QueryError::TimeoutError => Error::DbTimeout, - _ => Error::DbError(e.to_string()), - } - } -} - -impl From for Error { - fn from(e: T) -> Self { - e.into_simpler() - } -} diff --git a/scywr/src/insertqueues.rs b/scywr/src/insertqueues.rs index b21265c..50d89d3 100644 --- a/scywr/src/insertqueues.rs +++ b/scywr/src/insertqueues.rs @@ -5,21 +5,18 @@ use crate::senderpolling::SenderPolling; use async_channel::Receiver; use async_channel::Sender; use core::fmt; -use err::thiserror; -use err::ThisError; -use netpod::log::*; use netpod::ttl::RetentionTime; use pin_project::pin_project; use std::collections::VecDeque; use std::pin::Pin; -#[derive(Debug, ThisError)] -#[cstm(name = "ScyllaInsertQueue")] -pub enum Error { - QueuePush, - #[error("ChannelSend({0}, {1})")] - ChannelSend(RetentionTime, u8), -} +autoerr::create_error_v1!( + name(Error, "ScyllaInsertQueue"), + enum variants { + QueuePush, + ChannelSend(RetentionTime, u8), + }, +); #[derive(Clone)] pub struct InsertQueuesTx { diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index ed66825..428b991 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -11,7 +11,6 @@ use crate::iteminsertqueue::TimeBinSimpleF32V02; use crate::store::DataStore; use async_channel::Receiver; use atomic::AtomicU64; -use err::Error; use futures_util::Stream; use futures_util::StreamExt; use log::*; @@ -27,7 +26,6 @@ use std::time::Instant; use taskrun::tokio; use tokio::task::JoinHandle; -#[allow(unused)] macro_rules! trace2 { ($($arg:tt)*) => { if false { @@ -36,16 +34,6 @@ macro_rules! trace2 { }; } -#[allow(unused)] -macro_rules! trace3 { - ($($arg:tt)*) => { - if false { - trace!($($arg)*); - } - }; -} - -#[allow(unused)] macro_rules! trace_item_execute { ($($arg:tt)*) => { if false { @@ -54,7 +42,6 @@ macro_rules! trace_item_execute { }; } -#[allow(unused)] macro_rules! debug_setup { ($($arg:tt)*) => { if false { @@ -63,6 +50,13 @@ macro_rules! debug_setup { }; } +autoerr::create_error_v1!( + name(Error, "ScyllaInsertWorker"), + enum variants { + Store(#[from] crate::store::Error), + }, +); + fn stats_inc_for_err(stats: &stats::InsertWorkerStats, err: &crate::iteminsertqueue::Error) { use crate::iteminsertqueue::Error; match err { @@ -90,6 +84,9 @@ fn stats_inc_for_err(stats: &stats::InsertWorkerStats, err: &crate::iteminsertqu Error::GetValHelpInnerTypeMismatch => { stats.logic_error().inc(); } + Error::UnknownConnectionStatus => { + stats.logic_error().inc(); + } } } @@ -134,11 +131,7 @@ pub async fn spawn_scylla_insert_workers( let mut jhs = Vec::new(); let mut data_stores = Vec::new(); for _ in 0..insert_scylla_sessions { - let data_store = Arc::new( - DataStore::new(&scyconf, rett.clone()) - .await - .map_err(|e| Error::from(e.to_string()))?, - ); + let data_store = Arc::new(DataStore::new(&scyconf, rett.clone()).await?); data_stores.push(data_store); } for worker_ix in 0..insert_worker_count { diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index d794e3b..d4ac427 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -3,8 +3,6 @@ pub use netpod::CONNECTION_STATUS_DIV; use crate::session::ScySession; use crate::store::DataStore; use bytes::BufMut; -use err::thiserror; -use err::ThisError; use futures_util::Future; use futures_util::FutureExt; use netpod::channelstatus::ChannelStatus; @@ -33,17 +31,19 @@ use std::task::Poll; use std::time::Instant; use std::time::SystemTime; -#[derive(Debug, ThisError)] -#[cstm(name = "ScyllaItemInsertQueue")] -pub enum Error { - DbTimeout, - DbOverload, - DbUnavailable, - DbError(#[from] DbError), - QueryError(#[from] QueryError), - GetValHelpTodoWaveform, - GetValHelpInnerTypeMismatch, -} +autoerr::create_error_v1!( + name(Error, "ScyllaItemInsertQueue"), + enum variants { + DbTimeout, + DbOverload, + DbUnavailable, + DbError(#[from] DbError), + QueryError(#[from] QueryError), + GetValHelpTodoWaveform, + GetValHelpInnerTypeMismatch, + UnknownConnectionStatus, + }, +); #[derive(Clone, Debug, PartialEq)] pub enum ScalarValue { @@ -427,7 +427,7 @@ impl ConnectionStatus { } } - pub fn from_kind(kind: u32) -> Result { + pub fn from_kind(kind: u32) -> Result { use ConnectionStatus::*; let ret = match kind { 1 => ConnectError, @@ -437,9 +437,7 @@ impl ConnectionStatus { 5 => ClosedUnexpected, 6 => ConnectionHandlerDone, _ => { - return Err(err::Error::with_msg_no_trace(format!( - "unknown ConnectionStatus kind {kind}" - ))); + return Err(Error::UnknownConnectionStatus); } }; Ok(ret) diff --git a/scywr/src/lib.rs b/scywr/src/lib.rs index 9071e95..b18151b 100644 --- a/scywr/src/lib.rs +++ b/scywr/src/lib.rs @@ -1,6 +1,5 @@ pub mod access; pub mod config; -pub mod err; pub mod fut; pub mod futbatch; pub mod futbatchgen; diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index 6c72e92..b2cb15f 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -1,8 +1,6 @@ use crate::config::ScyllaIngestConfig; use crate::session::create_session_no_ks; use crate::session::ScySession; -use err::thiserror; -use err::ThisError; use futures_util::StreamExt; use futures_util::TryStreamExt; use log::*; @@ -12,19 +10,20 @@ use std::collections::BTreeMap; use std::fmt; use std::time::Duration; -#[derive(Debug, ThisError)] -#[cstm(name = "ScyllaSchema")] -pub enum Error { - NoKeyspaceChosen, - Fmt(#[from] fmt::Error), - Query(#[from] scylla::transport::errors::QueryError), - NewSession(String), - ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError), - ScyllaTypecheck(#[from] scylla::deserialize::TypeCheckError), - MissingData, - AddColumnImpossible, - BadSchema, -} +autoerr::create_error_v1!( + name(Error, "ScyllaSchema"), + enum variants { + NoKeyspaceChosen, + Fmt(#[from] fmt::Error), + Query(#[from] scylla::transport::errors::QueryError), + NewSession(String), + ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError), + ScyllaTypecheck(#[from] scylla::deserialize::TypeCheckError), + MissingData, + AddColumnImpossible, + BadSchema, + }, +); impl From for Error { fn from(value: crate::session::Error) -> Self { diff --git a/scywr/src/senderpolling.rs b/scywr/src/senderpolling.rs index 9c3ab23..d8ff42f 100644 --- a/scywr/src/senderpolling.rs +++ b/scywr/src/senderpolling.rs @@ -47,7 +47,7 @@ where fn _require_unpin(_: &T) {} fn _check_unpin() { - let _r: &SenderPolling = err::todoval(); + let _r: &SenderPolling = todo!(); // _require_unpin(_r); } diff --git a/scywr/src/session.rs b/scywr/src/session.rs index d6b6695..911da0d 100644 --- a/scywr/src/session.rs +++ b/scywr/src/session.rs @@ -2,18 +2,17 @@ pub use scylla::Session; pub use Session as ScySession; use crate::config::ScyllaIngestConfig; -use err::thiserror; -use err::ThisError; use scylla::execution_profile::ExecutionProfileBuilder; use scylla::statement::Consistency; use scylla::transport::errors::NewSessionError; use std::sync::Arc; -#[derive(Debug, ThisError)] -#[cstm(name = "ScyllaSession")] -pub enum Error { - NewSession(String), -} +autoerr::create_error_v1!( + name(Error, "ScyllaSession"), + enum variants { + NewSession(String), + }, +); impl From for Error { fn from(value: NewSessionError) -> Self { diff --git a/scywr/src/tools.rs b/scywr/src/tools.rs index 14c079e..719a634 100644 --- a/scywr/src/tools.rs +++ b/scywr/src/tools.rs @@ -2,39 +2,19 @@ use crate::config::ScyllaIngestConfig; use crate::session::create_session; use futures_util::TryStreamExt; use log::*; -use scylla::transport::errors::NewSessionError; -use scylla::transport::errors::QueryError; -pub struct Error(err::Error); - -impl err::ToErr for Error { - fn to_err(self) -> err::Error { - self.0 - } -} - -impl From for Error { - fn from(e: NewSessionError) -> Self { - Self(err::Error::with_msg_no_trace(format!("{e:?}"))) - } -} - -impl From for Error { - fn from(e: QueryError) -> Self { - Self(err::Error::with_msg_no_trace(format!("{e:?}"))) - } -} - -impl From for Error { - fn from(e: scylla::deserialize::TypeCheckError) -> Self { - Self(err::Error::with_msg_no_trace(format!("{e:?}"))) - } -} +autoerr::create_error_v1!( + name(Error, "ScyllaTools"), + enum variants { + Session(#[from] crate::session::Error), + ScyllaNewSession(#[from] scylla::transport::errors::NewSessionError), + ScyllaQueryError(#[from] scylla::transport::errors::QueryError), + ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError), + }, +); pub async fn list_pkey(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> { - let scy = create_session(scylla_conf) - .await - .map_err(|e| Error(err::Error::with_msg_no_trace(e.to_string())))?; + let scy = create_session(scylla_conf).await?; let query = scy .prepare("select distinct token(pulse_a), pulse_a from pulse where token(pulse_a) >= ? and token(pulse_a) <= ?") .await?; @@ -65,9 +45,7 @@ pub async fn list_pkey(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> { } pub async fn list_pulses(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> { - let scy = create_session(scylla_conf) - .await - .map_err(|e| Error(err::Error::with_msg_no_trace(e.to_string())))?; + let scy = create_session(scylla_conf).await?; let query = scy .prepare("select token(tsa) as tsatok, tsa, tsb, pulse from pulse where token(tsa) >= ? and token(tsa) <= ?") .await?; @@ -96,10 +74,10 @@ pub async fn list_pulses(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> pub async fn fetch_events(backend: &str, channel: &str, scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> { // TODO use the keyspace from commandline. - err::todo(); - let scy = create_session(scylla_conf) - .await - .map_err(|e| Error(err::Error::with_msg_no_trace(e.to_string())))?; + if true { + todo!(); + } + let scy = create_session(scylla_conf).await?; let qu_series = scy .prepare( "select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?",