Rework some error types

This commit is contained in:
Dominik Werder
2025-02-07 12:04:31 +01:00
parent 1aab18dc7f
commit 80dfb4069d
12 changed files with 94 additions and 85 deletions

View File

@@ -58,7 +58,7 @@ pub async fn delay_us(mu: u64) {
}
pub async fn delay_io_short() {
delay_us(1000).await;
delay_us(500).await;
}
pub async fn delay_io_medium() {

View File

@@ -54,6 +54,7 @@ fn _get_hostname() -> Result<String, Error> {
}
pub async fn get_node_disk_ident(node_config: &NodeConfigCached, dbc: &Client) -> Result<NodeDiskIdent, Error> {
info!("get_node_disk_ident");
let sql = "select nodes.rowid, facility, split, hostname from nodes, facilities where facilities.name = $1 and facility = facilities.rowid and hostname = $2";
let rows = dbc
.query(sql, &[&node_config.node_config.cluster.backend, &node_config.node.host])
@@ -61,7 +62,7 @@ pub async fn get_node_disk_ident(node_config: &NodeConfigCached, dbc: &Client) -
.err_conv()?;
if rows.len() != 1 {
return Err(Error::with_msg(format!(
"get_node can't find unique entry for {} {}",
"get_node_disk_ident can not find unique entry for {:?} {:?}",
node_config.node.host, node_config.node_config.cluster.backend
)));
}
@@ -202,7 +203,19 @@ async fn update_db_with_channel_names_inner(
) -> Result<(), Error> {
let (dbc, _pgjh) = create_connection(&db_config).await?;
info!("update_db_with_channel_names connection done");
let node_disk_ident = get_node_disk_ident(&node_config, &dbc).await?;
let node_disk_ident = match get_node_disk_ident(&node_config, &dbc).await {
Ok(x) => {
// info!("node_disk_ident {:?}", x);
debug!("node_disk_ident {:?}", x);
x
}
Err(e) => {
let p1 = &node_config.node.host;
let p2 = &node_config.node_config.cluster.backend;
error!("can not get_node_disk_ident {:?} {:?}", p1, p2);
return Err(e);
}
};
info!("update_db_with_channel_names get_node_disk_ident done");
let insert_sql = concat!(
"insert into channels (facility, name) select facility, name from (values ($1::bigint, $2::text)) v1 (facility, name)",
@@ -229,8 +242,13 @@ async fn update_db_with_channel_names_inner(
.as_ref()
.ok_or_else(|| Error::with_msg(format!("missing sf databuffer config in node")))?
.data_base_path;
debug!("base_path {:?}", base_path);
let channel_names = find_channel_names_from_config(base_path).await?;
for ch in channel_names {
if ch.contains("BEAT") || ch.contains("BEAM") {
debug!("ch {:?}", ch);
}
debug!("ch {:?}", ch);
let fac = node_disk_ident.facility;
crate::delay_io_short().await;
let ret = dbc
@@ -336,7 +354,19 @@ async fn update_db_with_all_channel_configs_inner(
let node_config = &node_config;
let (dbc, _pgjh) = create_connection(&node_config.node_config.cluster.database).await?;
let dbc = Arc::new(dbc);
let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?;
let node_disk_ident = match get_node_disk_ident(node_config, &dbc).await {
Ok(x) => {
// info!("node_disk_ident {:?}", x);
debug!("node_disk_ident {:?}", x);
x
}
Err(e) => {
let p1 = &node_config.node.host;
let p2 = &node_config.node_config.cluster.backend;
error!("ww can not get_node_disk_ident {:?} {:?}", p1, p2);
return Err(e);
}
};
let rows = dbc
.query(
"select rowid, facility, name from channels where facility = $1 order by facility, name",
@@ -353,9 +383,10 @@ async fn update_db_with_all_channel_configs_inner(
let rowid: i64 = row.try_get(0).err_conv()?;
let _facility: i64 = row.try_get(1).err_conv()?;
let channel: String = row.try_get(2).err_conv()?;
debug!("updating config {:?}", channel);
match update_db_with_channel_config(
node_config,
node_disk_ident,
&node_disk_ident,
rowid,
&channel,
dbc.clone(),

View File

@@ -28,6 +28,7 @@ rand = "0.8.5"
ciborium = "0.2.1"
flate2 = "1"
brotli = "7.0.0"
autoerr = "0.0.3"
daqbuf-err = { path = "../../../daqbuf-err" }
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
query = { path = "../../../daqbuf-query", package = "daqbuf-query" }

View File

@@ -1,6 +1,5 @@
use crate::response;
use crate::ServiceSharedResources;
use daqbuf_err::thiserror;
use dbconn::create_connection;
use dbconn::worker::PgQueue;
use futures_util::StreamExt;
@@ -41,37 +40,38 @@ use serde::Serialize;
use std::collections::BTreeMap;
use url::Url;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "ChannelConfigError")]
pub enum Error {
NotFound(SfDbChannel),
ConfigQuorum(#[from] nodenet::configquorum::Error),
ConfigNode(#[from] nodenet::channelconfig::Error),
Http(#[from] crate::Error),
HttpCrate(#[from] http::Error),
// TODO create dedicated error type for query parsing
BadQuery(#[from] daqbuf_err::Error),
MissingBackend,
MissingScalarType,
MissingShape,
MissingShapeKind,
MissingEdge,
MissingTimerange,
MissingChannelName,
Uri(#[from] netpod::UriError),
ChannelConfigQuery(daqbuf_err::Error),
ExpectScyllaBackend,
Pg(#[from] dbconn::pg::Error),
Scylla(String),
Join,
OtherErr(daqbuf_err::Error),
PgWorker(#[from] dbconn::worker::Error),
Async(#[from] netpod::AsyncChannelError),
ChannelConfig(#[from] dbconn::channelconfig::Error),
Netpod(#[from] netpod::Error),
ScyllaQuery(#[from] scyllaconn::scylla::transport::errors::QueryError),
ScyllaTypeCheck(#[from] scyllaconn::scylla::deserialize::TypeCheckError),
}
autoerr::create_error_v1!(
name(Error, "ChannelConfigError"),
enum variants {
NotFound(SfDbChannel),
ConfigQuorum(#[from] nodenet::configquorum::Error),
ConfigNode(#[from] nodenet::channelconfig::Error),
Http(#[from] crate::Error),
HttpCrate(#[from] http::Error),
// TODO create dedicated error type for query parsing
BadQuery(#[from] daqbuf_err::Error),
MissingBackend,
MissingScalarType,
MissingShape,
MissingShapeKind,
MissingEdge,
MissingTimerange,
MissingChannelName,
Uri(#[from] netpod::UriError),
ChannelConfigQuery(daqbuf_err::Error),
ExpectScyllaBackend,
Pg(#[from] dbconn::pg::Error),
Scylla(String),
Join,
OtherErr(daqbuf_err::Error),
PgWorker(#[from] dbconn::worker::Error),
Async(#[from] netpod::AsyncChannelError),
ChannelConfig(#[from] dbconn::channelconfig::Error),
Netpod(#[from] netpod::Error),
ScyllaQuery(#[from] scyllaconn::scylla::transport::errors::QueryError),
ScyllaTypeCheck(#[from] scyllaconn::scylla::deserialize::TypeCheckError),
},
);
fn other_err_error(e: daqbuf_err::Error) -> Error {
Error::OtherErr(e)

View File

@@ -17,8 +17,6 @@ pub mod settings;
use crate::bodystream::response;
use crate::err::Error;
use daqbuf_err;
use daqbuf_err::thiserror;
use daqbuf_err::ThisError;
use dbconn::worker::PgQueue;
use dbconn::worker::PgWorker;
use futures_util::Future;
@@ -42,14 +40,11 @@ use netpod::status_board_init;
use netpod::NodeConfigCached;
use netpod::ReqCtx;
use netpod::ServiceVersion;
use netpod::APP_JSON;
use panic::AssertUnwindSafe;
use panic::UnwindSafe;
use pin::Pin;
use scyllaconn::worker::ScyllaQueue;
use scyllaconn::worker::ScyllaWorker;
use serde::Deserialize;
use serde::Serialize;
use std::net;
use std::panic;
use std::pin;
@@ -61,25 +56,20 @@ use taskrun::tokio;
use taskrun::tokio::net::TcpListener;
use tracing::Instrument;
#[derive(Debug, ThisError, Serialize, Deserialize)]
#[cstm(name = "Retrieval")]
pub enum RetrievalError {
Error(#[from] daqbuf_err::Error),
Error2(#[from] crate::err::Error),
TextError(String),
#[serde(skip)]
Hyper(#[from] hyper::Error),
#[serde(skip)]
Http(#[from] http::Error),
#[serde(skip)]
Serde(#[from] serde_json::Error),
#[serde(skip)]
Fmt(#[from] std::fmt::Error),
#[serde(skip)]
Url(#[from] url::ParseError),
#[serde(skip)]
Netpod(#[from] netpod::Error),
}
autoerr::create_error_v1!(
name(RetrievalError, "Retrieval"),
enum variants {
Error(#[from] daqbuf_err::Error),
Error2(#[from] crate::err::Error),
TextError(String),
Hyper(#[from] hyper::Error),
Http(#[from] http::Error),
Serde(#[from] serde_json::Error),
Fmt(#[from] std::fmt::Error),
Url(#[from] url::ParseError),
Netpod(#[from] netpod::Error),
},
);
trait IntoBoxedError: std::error::Error {}
impl IntoBoxedError for net::AddrParseError {}

View File

@@ -189,8 +189,6 @@ async fn proxy_http_service_inner(
}
} else if let Some(h) = api4::StatusNodesRecursive::handler(&req) {
h.handle(req, ctx, &proxy_config, service_version).await
} else if path == "/api/4/backends" {
Ok(backends(req, proxy_config).await?)
} else if let Some(h) = api4::backend::BackendListHandler::handler(&req) {
h.handle(req, ctx, &proxy_config).await
} else if let Some(h) = api4::ChannelSearchAggHandler::handler(&req) {

View File

@@ -1,7 +1,5 @@
use crate::log::*;
use daqbuf_err as err;
use err::thiserror;
use err::ThisError;
use futures_util::TryStreamExt;
use netpod::ttl::RetentionTime;
use netpod::TsMs;

View File

@@ -11,7 +11,7 @@ use netpod::TsNano;
use scylla::Session as ScySession;
use std::ops::Range;
use streams::timebin::cached::reader::BinsReadRes;
use streams::timebin::cached::reader::PrebinnedPartitioning;
use daqbuf_series::msp::PrebinnedPartitioning;
async fn scylla_read_prebinned_f32(
series: u64,

View File

@@ -1,6 +1,5 @@
use daqbuf_err as err;
use err::Error;
use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError;
use scylla::transport::errors::NewSessionError as ScyNewSessionError;
use scylla::transport::errors::QueryError as ScyQueryError;
@@ -34,15 +33,6 @@ impl<T> ErrConv<T> for Result<T, ScyNewSessionError> {
}
}
impl<T> ErrConv<T> for Result<T, ScyFromRowError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, scylla::deserialize::TypeCheckError> {
fn err_conv(self) -> Result<T, Error> {
match self {

View File

@@ -32,7 +32,6 @@ autoerr::create_error_v1!(
Prepare(#[from] crate::events2::prepare::Error),
ScyllaQuery(#[from] scylla::transport::errors::QueryError),
ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),
ScyllaTypeConv(#[from] scylla::cql_to_rust::FromRowError),
ScyllaWorker(Box<crate::worker::Error>),
ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError),
MissingQuery(String),

View File

@@ -21,6 +21,7 @@ byteorder = "1.4.3"
async-channel = "1.9.0"
rand_xoshiro = "0.6.0"
thiserror = "=0.0.1"
autoerr = "0.0.3"
chrono = { version = "0.4.19", features = ["serde"] }
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
query = { path = "../../../daqbuf-query", package = "daqbuf-query" }

View File

@@ -25,14 +25,15 @@ use streams::tcprawclient::HttpSimplePost;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "TcpRawClient")]
pub enum Error {
IO(#[from] std::io::Error),
Frame(#[from] items_2::frame::Error),
Framable(#[from] items_2::framable::Error),
StreamsRawClient(#[from] streams::tcprawclient::Error),
}
autoerr::create_error_v1!(
name(Error, "TcpRawClient"),
enum variants {
IO(#[from] std::io::Error),
Frame(#[from] items_2::frame::Error),
Framable(#[from] items_2::framable::Error),
StreamsRawClient(#[from] streams::tcprawclient::Error),
},
);
pub type BoxedStream<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;