diff --git a/crates/dbconn/src/dbconn.rs b/crates/dbconn/src/dbconn.rs index ffed680..9c465e6 100644 --- a/crates/dbconn/src/dbconn.rs +++ b/crates/dbconn/src/dbconn.rs @@ -58,7 +58,7 @@ pub async fn delay_us(mu: u64) { } pub async fn delay_io_short() { - delay_us(1000).await; + delay_us(500).await; } pub async fn delay_io_medium() { diff --git a/crates/dbconn/src/scan.rs b/crates/dbconn/src/scan.rs index f827342..9b2f7cc 100644 --- a/crates/dbconn/src/scan.rs +++ b/crates/dbconn/src/scan.rs @@ -54,6 +54,7 @@ fn _get_hostname() -> Result { } pub async fn get_node_disk_ident(node_config: &NodeConfigCached, dbc: &Client) -> Result { + info!("get_node_disk_ident"); let sql = "select nodes.rowid, facility, split, hostname from nodes, facilities where facilities.name = $1 and facility = facilities.rowid and hostname = $2"; let rows = dbc .query(sql, &[&node_config.node_config.cluster.backend, &node_config.node.host]) @@ -61,7 +62,7 @@ pub async fn get_node_disk_ident(node_config: &NodeConfigCached, dbc: &Client) - .err_conv()?; if rows.len() != 1 { return Err(Error::with_msg(format!( - "get_node can't find unique entry for {} {}", + "get_node_disk_ident can not find unique entry for {:?} {:?}", node_config.node.host, node_config.node_config.cluster.backend ))); } @@ -202,7 +203,19 @@ async fn update_db_with_channel_names_inner( ) -> Result<(), Error> { let (dbc, _pgjh) = create_connection(&db_config).await?; info!("update_db_with_channel_names connection done"); - let node_disk_ident = get_node_disk_ident(&node_config, &dbc).await?; + let node_disk_ident = match get_node_disk_ident(&node_config, &dbc).await { + Ok(x) => { + // info!("node_disk_ident {:?}", x); + debug!("node_disk_ident {:?}", x); + x + } + Err(e) => { + let p1 = &node_config.node.host; + let p2 = &node_config.node_config.cluster.backend; + error!("can not get_node_disk_ident {:?} {:?}", p1, p2); + return Err(e); + } + }; info!("update_db_with_channel_names get_node_disk_ident done"); let insert_sql = 
concat!( "insert into channels (facility, name) select facility, name from (values ($1::bigint, $2::text)) v1 (facility, name)", @@ -229,8 +242,13 @@ async fn update_db_with_channel_names_inner( .as_ref() .ok_or_else(|| Error::with_msg(format!("missing sf databuffer config in node")))? .data_base_path; + debug!("base_path {:?}", base_path); let channel_names = find_channel_names_from_config(base_path).await?; for ch in channel_names { + if ch.contains("BEAT") || ch.contains("BEAM") { + debug!("ch {:?}", ch); + } + debug!("ch {:?}", ch); let fac = node_disk_ident.facility; crate::delay_io_short().await; let ret = dbc @@ -336,7 +354,19 @@ async fn update_db_with_all_channel_configs_inner( let node_config = &node_config; let (dbc, _pgjh) = create_connection(&node_config.node_config.cluster.database).await?; let dbc = Arc::new(dbc); - let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?; + let node_disk_ident = match get_node_disk_ident(node_config, &dbc).await { + Ok(x) => { + // info!("node_disk_ident {:?}", x); + debug!("node_disk_ident {:?}", x); + x + } + Err(e) => { + let p1 = &node_config.node.host; + let p2 = &node_config.node_config.cluster.backend; + error!("can not get_node_disk_ident {:?} {:?}", p1, p2); + return Err(e); + } + }; let rows = dbc .query( "select rowid, facility, name from channels where facility = $1 order by facility, name", @@ -353,9 +383,10 @@ async fn update_db_with_all_channel_configs_inner( let rowid: i64 = row.try_get(0).err_conv()?; let _facility: i64 = row.try_get(1).err_conv()?; let channel: String = row.try_get(2).err_conv()?; + debug!("updating config {:?}", channel); match update_db_with_channel_config( node_config, - node_disk_ident, + &node_disk_ident, rowid, &channel, dbc.clone(), diff --git a/crates/httpret/Cargo.toml b/crates/httpret/Cargo.toml index cc78958..ef1af7c 100644 --- a/crates/httpret/Cargo.toml +++ b/crates/httpret/Cargo.toml @@ -28,6 +28,7 @@ rand = "0.8.5" ciborium = "0.2.1" flate2 = "1" 
brotli = "7.0.0" +autoerr = "0.0.3" daqbuf-err = { path = "../../../daqbuf-err" } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } query = { path = "../../../daqbuf-query", package = "daqbuf-query" } diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs index 3063707..3f01467 100644 --- a/crates/httpret/src/channelconfig.rs +++ b/crates/httpret/src/channelconfig.rs @@ -1,6 +1,5 @@ use crate::response; use crate::ServiceSharedResources; -use daqbuf_err::thiserror; use dbconn::create_connection; use dbconn::worker::PgQueue; use futures_util::StreamExt; @@ -41,37 +40,38 @@ use serde::Serialize; use std::collections::BTreeMap; use url::Url; -#[derive(Debug, thiserror::Error)] -#[cstm(name = "ChannelConfigError")] -pub enum Error { - NotFound(SfDbChannel), - ConfigQuorum(#[from] nodenet::configquorum::Error), - ConfigNode(#[from] nodenet::channelconfig::Error), - Http(#[from] crate::Error), - HttpCrate(#[from] http::Error), - // TODO create dedicated error type for query parsing - BadQuery(#[from] daqbuf_err::Error), - MissingBackend, - MissingScalarType, - MissingShape, - MissingShapeKind, - MissingEdge, - MissingTimerange, - MissingChannelName, - Uri(#[from] netpod::UriError), - ChannelConfigQuery(daqbuf_err::Error), - ExpectScyllaBackend, - Pg(#[from] dbconn::pg::Error), - Scylla(String), - Join, - OtherErr(daqbuf_err::Error), - PgWorker(#[from] dbconn::worker::Error), - Async(#[from] netpod::AsyncChannelError), - ChannelConfig(#[from] dbconn::channelconfig::Error), - Netpod(#[from] netpod::Error), - ScyllaQuery(#[from] scyllaconn::scylla::transport::errors::QueryError), - ScyllaTypeCheck(#[from] scyllaconn::scylla::deserialize::TypeCheckError), -} +autoerr::create_error_v1!( + name(Error, "ChannelConfigError"), + enum variants { + NotFound(SfDbChannel), + ConfigQuorum(#[from] nodenet::configquorum::Error), + ConfigNode(#[from] nodenet::channelconfig::Error), + Http(#[from] crate::Error), + HttpCrate(#[from] 
http::Error), + // TODO create dedicated error type for query parsing + BadQuery(#[from] daqbuf_err::Error), + MissingBackend, + MissingScalarType, + MissingShape, + MissingShapeKind, + MissingEdge, + MissingTimerange, + MissingChannelName, + Uri(#[from] netpod::UriError), + ChannelConfigQuery(daqbuf_err::Error), + ExpectScyllaBackend, + Pg(#[from] dbconn::pg::Error), + Scylla(String), + Join, + OtherErr(daqbuf_err::Error), + PgWorker(#[from] dbconn::worker::Error), + Async(#[from] netpod::AsyncChannelError), + ChannelConfig(#[from] dbconn::channelconfig::Error), + Netpod(#[from] netpod::Error), + ScyllaQuery(#[from] scyllaconn::scylla::transport::errors::QueryError), + ScyllaTypeCheck(#[from] scyllaconn::scylla::deserialize::TypeCheckError), + }, +); fn other_err_error(e: daqbuf_err::Error) -> Error { Error::OtherErr(e) diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 4a27f34..221a92a 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -17,8 +17,6 @@ pub mod settings; use crate::bodystream::response; use crate::err::Error; use daqbuf_err; -use daqbuf_err::thiserror; -use daqbuf_err::ThisError; use dbconn::worker::PgQueue; use dbconn::worker::PgWorker; use futures_util::Future; @@ -42,14 +40,11 @@ use netpod::status_board_init; use netpod::NodeConfigCached; use netpod::ReqCtx; use netpod::ServiceVersion; -use netpod::APP_JSON; use panic::AssertUnwindSafe; use panic::UnwindSafe; use pin::Pin; use scyllaconn::worker::ScyllaQueue; use scyllaconn::worker::ScyllaWorker; -use serde::Deserialize; -use serde::Serialize; use std::net; use std::panic; use std::pin; @@ -61,25 +56,20 @@ use taskrun::tokio; use taskrun::tokio::net::TcpListener; use tracing::Instrument; -#[derive(Debug, ThisError, Serialize, Deserialize)] -#[cstm(name = "Retrieval")] -pub enum RetrievalError { - Error(#[from] daqbuf_err::Error), - Error2(#[from] crate::err::Error), - TextError(String), - #[serde(skip)] - Hyper(#[from] hyper::Error), - 
#[serde(skip)] - Http(#[from] http::Error), - #[serde(skip)] - Serde(#[from] serde_json::Error), - #[serde(skip)] - Fmt(#[from] std::fmt::Error), - #[serde(skip)] - Url(#[from] url::ParseError), - #[serde(skip)] - Netpod(#[from] netpod::Error), -} +autoerr::create_error_v1!( + name(RetrievalError, "Retrieval"), + enum variants { + Error(#[from] daqbuf_err::Error), + Error2(#[from] crate::err::Error), + TextError(String), + Hyper(#[from] hyper::Error), + Http(#[from] http::Error), + Serde(#[from] serde_json::Error), + Fmt(#[from] std::fmt::Error), + Url(#[from] url::ParseError), + Netpod(#[from] netpod::Error), + }, +); trait IntoBoxedError: std::error::Error {} impl IntoBoxedError for net::AddrParseError {} diff --git a/crates/httpret/src/proxy.rs b/crates/httpret/src/proxy.rs index f9aed63..d5aab3d 100644 --- a/crates/httpret/src/proxy.rs +++ b/crates/httpret/src/proxy.rs @@ -189,8 +189,6 @@ async fn proxy_http_service_inner( } } else if let Some(h) = api4::StatusNodesRecursive::handler(&req) { h.handle(req, ctx, &proxy_config, service_version).await - } else if path == "/api/4/backends" { - Ok(backends(req, proxy_config).await?) 
} else if let Some(h) = api4::backend::BackendListHandler::handler(&req) { h.handle(req, ctx, &proxy_config).await } else if let Some(h) = api4::ChannelSearchAggHandler::handler(&req) { diff --git a/crates/scyllaconn/src/accounting/toplist.rs b/crates/scyllaconn/src/accounting/toplist.rs index 66ab645..691c3bf 100644 --- a/crates/scyllaconn/src/accounting/toplist.rs +++ b/crates/scyllaconn/src/accounting/toplist.rs @@ -1,7 +1,5 @@ use crate::log::*; use daqbuf_err as err; -use err::thiserror; -use err::ThisError; use futures_util::TryStreamExt; use netpod::ttl::RetentionTime; use netpod::TsMs; diff --git a/crates/scyllaconn/src/bincache.rs b/crates/scyllaconn/src/bincache.rs index 9e78a3f..b93caa6 100644 --- a/crates/scyllaconn/src/bincache.rs +++ b/crates/scyllaconn/src/bincache.rs @@ -11,7 +11,7 @@ use netpod::TsNano; use scylla::Session as ScySession; use std::ops::Range; use streams::timebin::cached::reader::BinsReadRes; -use streams::timebin::cached::reader::PrebinnedPartitioning; +use daqbuf_series::msp::PrebinnedPartitioning; async fn scylla_read_prebinned_f32( series: u64, diff --git a/crates/scyllaconn/src/errconv.rs b/crates/scyllaconn/src/errconv.rs index 6967d75..82070b2 100644 --- a/crates/scyllaconn/src/errconv.rs +++ b/crates/scyllaconn/src/errconv.rs @@ -1,6 +1,5 @@ use daqbuf_err as err; use err::Error; -use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError; use scylla::transport::errors::NewSessionError as ScyNewSessionError; use scylla::transport::errors::QueryError as ScyQueryError; @@ -34,15 +33,6 @@ impl ErrConv for Result { } } -impl ErrConv for Result { - fn err_conv(self) -> Result { - match self { - Ok(k) => Ok(k), - Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), - } - } -} - impl ErrConv for Result { fn err_conv(self) -> Result { match self { diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs index f7fc34f..d9bfc9a 100644 --- a/crates/scyllaconn/src/events.rs +++ 
b/crates/scyllaconn/src/events.rs @@ -32,7 +32,6 @@ autoerr::create_error_v1!( Prepare(#[from] crate::events2::prepare::Error), ScyllaQuery(#[from] scylla::transport::errors::QueryError), ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError), - ScyllaTypeConv(#[from] scylla::cql_to_rust::FromRowError), ScyllaWorker(Box), ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError), MissingQuery(String), diff --git a/crates/streamio/Cargo.toml b/crates/streamio/Cargo.toml index 2306ff8..19c7f17 100644 --- a/crates/streamio/Cargo.toml +++ b/crates/streamio/Cargo.toml @@ -21,6 +21,7 @@ byteorder = "1.4.3" async-channel = "1.9.0" rand_xoshiro = "0.6.0" thiserror = "=0.0.1" +autoerr = "0.0.3" chrono = { version = "0.4.19", features = ["serde"] } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } query = { path = "../../../daqbuf-query", package = "daqbuf-query" } diff --git a/crates/streamio/src/tcprawclient.rs b/crates/streamio/src/tcprawclient.rs index d61a3a8..2b772da 100644 --- a/crates/streamio/src/tcprawclient.rs +++ b/crates/streamio/src/tcprawclient.rs @@ -25,14 +25,15 @@ use streams::tcprawclient::HttpSimplePost; use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; -#[derive(Debug, thiserror::Error)] -#[cstm(name = "TcpRawClient")] -pub enum Error { - IO(#[from] std::io::Error), - Frame(#[from] items_2::frame::Error), - Framable(#[from] items_2::framable::Error), - StreamsRawClient(#[from] streams::tcprawclient::Error), -} +autoerr::create_error_v1!( + name(Error, "TcpRawClient"), + enum variants { + IO(#[from] std::io::Error), + Frame(#[from] items_2::frame::Error), + Framable(#[from] items_2::framable::Error), + StreamsRawClient(#[from] streams::tcprawclient::Error), + }, +); pub type BoxedStream = Pin> + Send>>;