Changed error type

This commit is contained in:
Dominik Werder
2024-12-06 13:36:50 +01:00
parent 016a7332d0
commit af49f9e181
14 changed files with 106 additions and 200 deletions
+10 -4
View File
@@ -33,24 +33,30 @@ async fn main_run(opts: DaqIngestOpts) -> Result<(), Error> {
}
async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
let buildmark = "+0008";
let buildmark = "+0009";
use daqingest::opts::ChannelAccess;
use daqingest::opts::SubCmd;
match opts.subcmd {
SubCmd::ListPkey => {
// TODO must take scylla config from CLI
let scylla_conf = err::todoval();
scywr::tools::list_pkey(&scylla_conf).await?
scywr::tools::list_pkey(&scylla_conf)
.await
.map_err(Error::from_string)?
}
SubCmd::ListPulses => {
// TODO must take scylla config from CLI
let scylla_conf = err::todoval();
scywr::tools::list_pulses(&scylla_conf).await?
scywr::tools::list_pulses(&scylla_conf)
.await
.map_err(Error::from_string)?
}
SubCmd::FetchEvents(k) => {
// TODO must take scylla config from CLI
let scylla_conf = err::todoval();
scywr::tools::fetch_events(&k.backend, &k.channel, &scylla_conf).await?
scywr::tools::fetch_events(&k.backend, &k.channel, &scylla_conf)
.await
.map_err(Error::from_string)?
}
SubCmd::Db(k) => {
use daqingest::opts::DbSub;
+15 -9
View File
@@ -66,7 +66,7 @@ pub struct Daemon {
count_unassigned: usize,
count_assigned: usize,
last_status_print: SystemTime,
insert_workers_jh: Vec<JoinHandle<Result<(), Error>>>,
insert_workers_jhs: Vec<JoinHandle<Result<(), scywr::insertworker::Error>>>,
stats: Arc<DaemonStats>,
insert_worker_stats: Arc<InsertWorkerStats>,
series_by_channel_stats: Arc<SeriesByChannelStats>,
@@ -197,7 +197,8 @@ impl Daemon {
insert_worker_opts.clone(),
insert_worker_stats.clone(),
)
.await?;
.await
.map_err(Error::from_string)?;
insert_worker_jhs.extend(jh);
let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy(
ingest_opts.insert_worker_count(),
@@ -206,7 +207,8 @@ impl Daemon {
insert_worker_opts.clone(),
insert_worker_stats.clone(),
)
.await?;
.await
.map_err(Error::from_string)?;
insert_worker_jhs.extend(jh);
let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy(
ingest_opts.insert_worker_count(),
@@ -215,7 +217,8 @@ impl Daemon {
insert_worker_opts.clone(),
insert_worker_stats.clone(),
)
.await?;
.await
.map_err(Error::from_string)?;
insert_worker_jhs.extend(jh);
} else {
let jh = scywr::insertworker::spawn_scylla_insert_workers(
@@ -229,7 +232,8 @@ impl Daemon {
insert_worker_stats.clone(),
ingest_opts.use_rate_limit_queue(),
)
.await?;
.await
.map_err(Error::from_string)?;
insert_worker_jhs.extend(jh);
let jh = scywr::insertworker::spawn_scylla_insert_workers(
@@ -243,7 +247,8 @@ impl Daemon {
insert_worker_stats.clone(),
ingest_opts.use_rate_limit_queue(),
)
.await?;
.await
.map_err(Error::from_string)?;
insert_worker_jhs.extend(jh);
let jh = scywr::insertworker::spawn_scylla_insert_workers(
@@ -257,7 +262,8 @@ impl Daemon {
insert_worker_stats.clone(),
ingest_opts.use_rate_limit_queue(),
)
.await?;
.await
.map_err(Error::from_string)?;
insert_worker_jhs.extend(jh);
};
let stats = Arc::new(DaemonStats::new());
@@ -311,7 +317,7 @@ impl Daemon {
count_unassigned: 0,
count_assigned: 0,
last_status_print: SystemTime::now(),
insert_workers_jh: insert_worker_jhs,
insert_workers_jhs: insert_worker_jhs,
stats,
insert_worker_stats,
series_by_channel_stats,
@@ -720,7 +726,7 @@ impl Daemon {
}
debug!("joined metrics handler");
debug!("wait for insert workers");
while let Some(jh) = self.insert_workers_jh.pop() {
while let Some(jh) = self.insert_workers_jhs.pop() {
match jh.await.map_err(Error::from_string) {
Ok(x) => match x {
Ok(()) => {
+4 -25
View File
@@ -569,37 +569,16 @@ pub async fn metrics_service(
}
pub async fn metrics_agg_task(local_stats: Arc<CaConnStats>, store_stats: Arc<CaConnStats>) -> Result<(), Error> {
use stats::rand_xoshiro::rand_core::RngCore;
let mut rng = stats::xoshiro_from_time();
let mut agg_last = CaConnStatsAgg::new();
loop {
tokio::time::sleep(Duration::from_millis(671)).await;
let dt = rng.next_u32();
tokio::time::sleep(Duration::from_millis(500 + (dt as u64 & 0x7f))).await;
let agg = CaConnStatsAgg::new();
agg.push(&local_stats);
agg.push(&store_stats);
trace!("TODO metrics_agg_task");
// TODO when a CaConn is closed, I'll lose the so far collected counts, which creates a jump
// in the metrics.
// To make this sound:
// Let CaConn keep a stats and just count.
// At the tick, create a snapshot: all atomics are copied after each other.
// Diff this new snapshot with an older snapshot and send that.
// Note: some stats are counters, but some are current values.
// e.g. the number of active channels should go down when a CaConn stops.
#[cfg(DISABLED)]
{
let conn_stats_guard = ingest_commons.ca_conn_set.ca_conn_ress().lock().await;
for (_, g) in conn_stats_guard.iter() {
agg.push(g.stats());
}
}
#[cfg(DISABLED)]
{
let mut m = METRICS.lock().unwrap();
*m = Some(agg.clone());
if false {
let diff = CaConnStatsAggDiff::diff_from(&agg_last, &agg);
info!("{}", diff.display());
}
}
agg_last = agg;
}
}
-3
View File
@@ -10,14 +10,11 @@ async-channel = "2.3.1"
scylla = "0.15.0"
smallvec = "1.11.0"
pin-project = "1.1.5"
stackfuture = "0.3.0"
bytes = "1.7.1"
autoerr = "0.0.3"
serde = { version = "1", features = ["derive"] }
log = { path = "../log" }
stats = { path = "../stats" }
series = { path = "../../daqbuf-series", package = "daqbuf-series" }
err = { path = "../../daqbuf-err", package = "daqbuf-err" }
netpod = { path = "../../daqbuf-netpod", package = "daqbuf-netpod" }
taskrun = { path = "../../daqbuffer/crates/taskrun" }
#bitshuffle = { path = "../../daqbuffer/crates/bitshuffle" }
+8 -9
View File
@@ -1,12 +1,11 @@
use err::thiserror;
use err::ThisError;
use scylla::transport::errors::DbError;
use scylla::transport::errors::QueryError;
#[derive(Debug, ThisError)]
#[cstm(name = "ScyllaAccess")]
pub enum Error {
DbError(#[from] DbError),
QueryError(#[from] QueryError),
NoKeyspaceChosen,
}
autoerr::create_error_v1!(
name(Error, "ScyllaAccess"),
enum variants {
DbError(#[from] DbError),
QueryError(#[from] QueryError),
NoKeyspaceChosen,
},
);
-44
View File
@@ -1,44 +0,0 @@
use scylla::transport::errors::DbError;
use scylla::transport::errors::QueryError;
#[derive(Debug)]
pub enum Error {
DbUnavailable,
DbOverload,
DbTimeout,
DbError(String),
}
impl From<Error> for err::Error {
fn from(e: Error) -> Self {
err::Error::with_msg_no_trace(format!("{e:?}"))
}
}
pub trait IntoSimplerError {
fn into_simpler(self) -> Error;
}
impl IntoSimplerError for QueryError {
fn into_simpler(self) -> Error {
let e = &self;
match e {
QueryError::DbError(e, msg) => match e {
DbError::Unavailable { .. } => Error::DbUnavailable,
DbError::Overloaded => Error::DbOverload,
DbError::IsBootstrapping => Error::DbUnavailable,
DbError::ReadTimeout { .. } => Error::DbTimeout,
DbError::WriteTimeout { .. } => Error::DbTimeout,
_ => Error::DbError(format!("{e} {msg}")),
},
QueryError::TimeoutError => Error::DbTimeout,
_ => Error::DbError(e.to_string()),
}
}
}
impl<T: IntoSimplerError> From<T> for Error {
fn from(e: T) -> Self {
e.into_simpler()
}
}
+7 -10
View File
@@ -5,21 +5,18 @@ use crate::senderpolling::SenderPolling;
use async_channel::Receiver;
use async_channel::Sender;
use core::fmt;
use err::thiserror;
use err::ThisError;
use netpod::log::*;
use netpod::ttl::RetentionTime;
use pin_project::pin_project;
use std::collections::VecDeque;
use std::pin::Pin;
#[derive(Debug, ThisError)]
#[cstm(name = "ScyllaInsertQueue")]
pub enum Error {
QueuePush,
#[error("ChannelSend({0}, {1})")]
ChannelSend(RetentionTime, u8),
}
autoerr::create_error_v1!(
name(Error, "ScyllaInsertQueue"),
enum variants {
QueuePush,
ChannelSend(RetentionTime, u8),
},
);
#[derive(Clone)]
pub struct InsertQueuesTx {
+11 -18
View File
@@ -11,7 +11,6 @@ use crate::iteminsertqueue::TimeBinSimpleF32V02;
use crate::store::DataStore;
use async_channel::Receiver;
use atomic::AtomicU64;
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
use log::*;
@@ -27,7 +26,6 @@ use std::time::Instant;
use taskrun::tokio;
use tokio::task::JoinHandle;
#[allow(unused)]
macro_rules! trace2 {
($($arg:tt)*) => {
if false {
@@ -36,16 +34,6 @@ macro_rules! trace2 {
};
}
#[allow(unused)]
macro_rules! trace3 {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
#[allow(unused)]
macro_rules! trace_item_execute {
($($arg:tt)*) => {
if false {
@@ -54,7 +42,6 @@ macro_rules! trace_item_execute {
};
}
#[allow(unused)]
macro_rules! debug_setup {
($($arg:tt)*) => {
if false {
@@ -63,6 +50,13 @@ macro_rules! debug_setup {
};
}
autoerr::create_error_v1!(
name(Error, "ScyllaInsertWorker"),
enum variants {
Store(#[from] crate::store::Error),
},
);
fn stats_inc_for_err(stats: &stats::InsertWorkerStats, err: &crate::iteminsertqueue::Error) {
use crate::iteminsertqueue::Error;
match err {
@@ -90,6 +84,9 @@ fn stats_inc_for_err(stats: &stats::InsertWorkerStats, err: &crate::iteminsertqu
Error::GetValHelpInnerTypeMismatch => {
stats.logic_error().inc();
}
Error::UnknownConnectionStatus => {
stats.logic_error().inc();
}
}
}
@@ -134,11 +131,7 @@ pub async fn spawn_scylla_insert_workers(
let mut jhs = Vec::new();
let mut data_stores = Vec::new();
for _ in 0..insert_scylla_sessions {
let data_store = Arc::new(
DataStore::new(&scyconf, rett.clone())
.await
.map_err(|e| Error::from(e.to_string()))?,
);
let data_store = Arc::new(DataStore::new(&scyconf, rett.clone()).await?);
data_stores.push(data_store);
}
for worker_ix in 0..insert_worker_count {
+15 -17
View File
@@ -3,8 +3,6 @@ pub use netpod::CONNECTION_STATUS_DIV;
use crate::session::ScySession;
use crate::store::DataStore;
use bytes::BufMut;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::FutureExt;
use netpod::channelstatus::ChannelStatus;
@@ -33,17 +31,19 @@ use std::task::Poll;
use std::time::Instant;
use std::time::SystemTime;
#[derive(Debug, ThisError)]
#[cstm(name = "ScyllaItemInsertQueue")]
pub enum Error {
DbTimeout,
DbOverload,
DbUnavailable,
DbError(#[from] DbError),
QueryError(#[from] QueryError),
GetValHelpTodoWaveform,
GetValHelpInnerTypeMismatch,
}
autoerr::create_error_v1!(
name(Error, "ScyllaItemInsertQueue"),
enum variants {
DbTimeout,
DbOverload,
DbUnavailable,
DbError(#[from] DbError),
QueryError(#[from] QueryError),
GetValHelpTodoWaveform,
GetValHelpInnerTypeMismatch,
UnknownConnectionStatus,
},
);
#[derive(Clone, Debug, PartialEq)]
pub enum ScalarValue {
@@ -427,7 +427,7 @@ impl ConnectionStatus {
}
}
pub fn from_kind(kind: u32) -> Result<Self, err::Error> {
pub fn from_kind(kind: u32) -> Result<Self, Error> {
use ConnectionStatus::*;
let ret = match kind {
1 => ConnectError,
@@ -437,9 +437,7 @@ impl ConnectionStatus {
5 => ClosedUnexpected,
6 => ConnectionHandlerDone,
_ => {
return Err(err::Error::with_msg_no_trace(format!(
"unknown ConnectionStatus kind {kind}"
)));
return Err(Error::UnknownConnectionStatus);
}
};
Ok(ret)
-1
View File
@@ -1,6 +1,5 @@
pub mod access;
pub mod config;
pub mod err;
pub mod fut;
pub mod futbatch;
pub mod futbatchgen;
+14 -15
View File
@@ -1,8 +1,6 @@
use crate::config::ScyllaIngestConfig;
use crate::session::create_session_no_ks;
use crate::session::ScySession;
use err::thiserror;
use err::ThisError;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use log::*;
@@ -12,19 +10,20 @@ use std::collections::BTreeMap;
use std::fmt;
use std::time::Duration;
#[derive(Debug, ThisError)]
#[cstm(name = "ScyllaSchema")]
pub enum Error {
NoKeyspaceChosen,
Fmt(#[from] fmt::Error),
Query(#[from] scylla::transport::errors::QueryError),
NewSession(String),
ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),
ScyllaTypecheck(#[from] scylla::deserialize::TypeCheckError),
MissingData,
AddColumnImpossible,
BadSchema,
}
autoerr::create_error_v1!(
name(Error, "ScyllaSchema"),
enum variants {
NoKeyspaceChosen,
Fmt(#[from] fmt::Error),
Query(#[from] scylla::transport::errors::QueryError),
NewSession(String),
ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),
ScyllaTypecheck(#[from] scylla::deserialize::TypeCheckError),
MissingData,
AddColumnImpossible,
BadSchema,
},
);
impl From<crate::session::Error> for Error {
fn from(value: crate::session::Error) -> Self {
+1 -1
View File
@@ -47,7 +47,7 @@ where
fn _require_unpin<T: Unpin>(_: &T) {}
fn _check_unpin() {
let _r: &SenderPolling<String> = err::todoval();
let _r: &SenderPolling<String> = todo!();
// _require_unpin(_r);
}
+6 -7
View File
@@ -2,18 +2,17 @@ pub use scylla::Session;
pub use Session as ScySession;
use crate::config::ScyllaIngestConfig;
use err::thiserror;
use err::ThisError;
use scylla::execution_profile::ExecutionProfileBuilder;
use scylla::statement::Consistency;
use scylla::transport::errors::NewSessionError;
use std::sync::Arc;
#[derive(Debug, ThisError)]
#[cstm(name = "ScyllaSession")]
pub enum Error {
NewSession(String),
}
autoerr::create_error_v1!(
name(Error, "ScyllaSession"),
enum variants {
NewSession(String),
},
);
impl From<NewSessionError> for Error {
fn from(value: NewSessionError) -> Self {
+15 -37
View File
@@ -2,39 +2,19 @@ use crate::config::ScyllaIngestConfig;
use crate::session::create_session;
use futures_util::TryStreamExt;
use log::*;
use scylla::transport::errors::NewSessionError;
use scylla::transport::errors::QueryError;
pub struct Error(err::Error);
impl err::ToErr for Error {
fn to_err(self) -> err::Error {
self.0
}
}
impl From<NewSessionError> for Error {
fn from(e: NewSessionError) -> Self {
Self(err::Error::with_msg_no_trace(format!("{e:?}")))
}
}
impl From<QueryError> for Error {
fn from(e: QueryError) -> Self {
Self(err::Error::with_msg_no_trace(format!("{e:?}")))
}
}
impl From<scylla::deserialize::TypeCheckError> for Error {
fn from(e: scylla::deserialize::TypeCheckError) -> Self {
Self(err::Error::with_msg_no_trace(format!("{e:?}")))
}
}
autoerr::create_error_v1!(
name(Error, "ScyllaTools"),
enum variants {
Session(#[from] crate::session::Error),
ScyllaNewSession(#[from] scylla::transport::errors::NewSessionError),
ScyllaQueryError(#[from] scylla::transport::errors::QueryError),
ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError),
},
);
pub async fn list_pkey(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> {
let scy = create_session(scylla_conf)
.await
.map_err(|e| Error(err::Error::with_msg_no_trace(e.to_string())))?;
let scy = create_session(scylla_conf).await?;
let query = scy
.prepare("select distinct token(pulse_a), pulse_a from pulse where token(pulse_a) >= ? and token(pulse_a) <= ?")
.await?;
@@ -65,9 +45,7 @@ pub async fn list_pkey(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> {
}
pub async fn list_pulses(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> {
let scy = create_session(scylla_conf)
.await
.map_err(|e| Error(err::Error::with_msg_no_trace(e.to_string())))?;
let scy = create_session(scylla_conf).await?;
let query = scy
.prepare("select token(tsa) as tsatok, tsa, tsb, pulse from pulse where token(tsa) >= ? and token(tsa) <= ?")
.await?;
@@ -96,10 +74,10 @@ pub async fn list_pulses(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error>
pub async fn fetch_events(backend: &str, channel: &str, scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> {
// TODO use the keyspace from commandline.
err::todo();
let scy = create_session(scylla_conf)
.await
.map_err(|e| Error(err::Error::with_msg_no_trace(e.to_string())))?;
if true {
todo!();
}
let scy = create_session(scylla_conf).await?;
let qu_series = scy
.prepare(
"select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?",