Update deps

This commit is contained in:
Dominik Werder
2024-07-04 10:23:01 +02:00
parent 584d977675
commit 06ac90aa70
31 changed files with 362 additions and 778 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -23,4 +23,4 @@ incremental = true
[patch.crates-io]
#tokio = { git = "https://github.com/dominikwerder/tokio", rev = "995221d8" }
thiserror = { git = "https://github.com/dominikwerder/thiserror.git" }
thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" }

View File

@@ -1,6 +1,6 @@
[package]
name = "daqbuffer"
version = "0.5.1-aa.2"
version = "0.5.1-aa.3"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -18,13 +18,13 @@ use streams::cbor_stream::FramedBytesToSitemtyDynEventsStream;
use url::Url;
#[derive(Debug, ThisError)]
#[cstm(name = "DataFetch")]
pub enum Error {
Url(#[from] url::ParseError),
NoHostname,
HttpBody(#[from] http::Error),
HttpClient(#[from] httpclient::Error),
Hyper(#[from] httpclient::hyper::Error),
#[error("RequestFailed({0})")]
RequestFailed(String),
}

View File

@@ -5,6 +5,7 @@ use netpod::Shape;
use tokio_postgres::Client;
#[derive(Debug, ThisError)]
#[cstm(name = "ChannelInfo")]
pub enum Error {
Pg(#[from] crate::pg::Error),
BadValue,

View File

@@ -196,6 +196,7 @@ pub async fn find_series_sf_databuffer(channel: &SfDbChannel, pgclient: Arc<PgCl
}
#[derive(Debug, ThisError, Serialize)]
#[cstm(name = "FindChannel")]
pub enum FindChannelError {
UnknownBackend,
BadSeriesId,

View File

@@ -1,4 +1,5 @@
use crate::create_connection;
use crate::worker::PgQueue;
use crate::ErrConv;
use err::Error;
use netpod::ChannelArchiver;
@@ -10,6 +11,7 @@ use netpod::NodeConfigCached;
use netpod::ScalarType;
use netpod::Shape;
use serde_json::Value as JsVal;
use tokio_postgres::Client as PgClient;
pub async fn search_channel_databuffer(
query: ChannelSearchQuery,
@@ -98,10 +100,9 @@ pub async fn search_channel_databuffer(
Ok(ret)
}
pub async fn search_channel_scylla(
pub(super) async fn search_channel_scylla(
query: ChannelSearchQuery,
backend: &str,
pgconf: &Database,
pgc: &PgClient,
) -> Result<ChannelSearchResult, Error> {
let empty = if !query.name_regex.is_empty() { false } else { true };
if empty {
@@ -109,11 +110,10 @@ pub async fn search_channel_scylla(
return Ok(ret);
}
let ch_kind: i16 = if query.channel_status { 1 } else { 2 };
let tmp_backend = Some(backend.to_string());
let (cb1, cb2) = if let Some(x) = &tmp_backend {
let (cb1, cb2) = if let Some(x) = query.backend.as_ref() {
(false, x.as_str())
} else {
(true, "")
(false, "----------")
};
let regop = if query.icase { "~*" } else { "~" };
let sql = &format!(
@@ -128,8 +128,7 @@ pub async fn search_channel_scylla(
),
regop
);
let (pgclient, _pgjh) = crate::create_connection(pgconf).await?;
let rows = pgclient
let rows = pgc
.query(sql, &[&ch_kind, &query.name_regex, &cb1, &cb2])
.await
.err_conv()?;
@@ -279,11 +278,21 @@ async fn search_channel_archeng(
Ok(ret)
}
pub async fn search_channel(query: ChannelSearchQuery, ncc: &NodeConfigCached) -> Result<ChannelSearchResult, Error> {
pub async fn search_channel(
query: ChannelSearchQuery,
pgqueue: &PgQueue,
ncc: &NodeConfigCached,
) -> Result<ChannelSearchResult, Error> {
let backend = &ncc.node_config.cluster.backend;
let pgconf = &ncc.node_config.cluster.database;
let mut query = query;
query.backend = Some(backend.into());
if let Some(_scyconf) = ncc.node_config.cluster.scylla_st() {
search_channel_scylla(query, backend, pgconf).await
pgqueue
.search_channel_scylla(query)
.await
.map_err(|e| Error::with_msg_no_trace(format!("db worker error {e}")))?
// search_channel_scylla(query, backend, pgconf).await
} else if let Some(conf) = ncc.node.channel_archiver.as_ref() {
search_channel_archeng(query, backend.clone(), conf, pgconf).await
} else if let Some(_conf) = ncc.node.archiver_appliance.as_ref() {

View File

@@ -1,17 +1,21 @@
use crate::create_connection;
use async_channel::Receiver;
use async_channel::RecvError;
use async_channel::Sender;
use err::thiserror;
use err::ThisError;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::ChConf;
use netpod::ChannelSearchQuery;
use netpod::ChannelSearchResult;
use netpod::Database;
use taskrun::tokio;
use tokio::task::JoinHandle;
use tokio_postgres::Client;
#[derive(Debug, ThisError)]
#[cstm(name = "PgWorker")]
pub enum Error {
Error(#[from] err::Error),
ChannelSend,
@@ -19,6 +23,12 @@ pub enum Error {
Join,
}
impl From<RecvError> for Error {
fn from(_value: RecvError) -> Self {
Self::ChannelRecv
}
}
impl err::ToErr for Error {
fn to_err(self) -> err::Error {
err::Error::from_string(self)
@@ -33,6 +43,7 @@ enum Job {
Vec<u64>,
Sender<Result<Vec<Option<crate::channelinfo::ChannelInfo>>, crate::channelinfo::Error>>,
),
SearchChannel(ChannelSearchQuery, Sender<Result<ChannelSearchResult, err::Error>>),
}
#[derive(Debug, Clone)]
@@ -73,6 +84,17 @@ impl PgQueue {
self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
Ok(rx)
}
pub async fn search_channel_scylla(
&self,
query: ChannelSearchQuery,
) -> Result<Result<ChannelSearchResult, err::Error>, Error> {
let (tx, rx) = async_channel::bounded(1);
let job = Job::SearchChannel(query, tx);
self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
let ret = rx.recv().await?;
Ok(ret)
}
}
#[derive(Debug)]
@@ -126,6 +148,12 @@ impl PgWorker {
// TODO count for stats
}
}
Job::SearchChannel(query, tx) => {
let res = crate::search::search_channel_scylla(query, &self.pg).await;
if tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats
}
}
}
}
}

View File

@@ -12,6 +12,7 @@ use parse::channelconfig::ConfigEntry;
use parse::channelconfig::ConfigParseError;
#[derive(Debug, ThisError)]
#[cstm(name = "ChannelConfig")]
pub enum ConfigError {
ParseError(ConfigParseError),
NotFound,

View File

@@ -35,6 +35,7 @@ use streams::filechunkread::FileChunkRead;
use streams::needminbuffer::NeedMinBuffer;
#[derive(Debug, ThisError, Serialize, Deserialize)]
#[cstm(name = "DatabufferDataParse")]
pub enum DataParseError {
DataFrameLengthMismatch,
FileHeaderTooShort,

View File

@@ -21,8 +21,9 @@ regex = "1.9.1"
http = "1.0.0"
hyper = "1.0.1"
thiserror = "=0.0.1"
#thiserror = "1"
anyhow = "1.0"
tokio = "1"
[patch.crates-io]
thiserror = { git = "https://github.com/dominikwerder/thiserror.git" }
thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" }

View File

@@ -1,10 +1,21 @@
//! Error handling and reporting.
#[macro_export]
macro_rules! err_dbg_dis {
($tt:ty, $nn:expr) => {
impl ::core::fmt::Display for $tt {
fn fmt(&self, fmt: &mut ::core::fmt::Formatter) -> ::core::fmt::Result {
write!(fmt, "{}::{:?}", $nn, self)
}
}
};
}
pub use anyhow;
pub use thiserror;
pub use thiserror::Error as ThisError;
pub use thiserror::UserErrorClass;
pub use thiserror::UserErrorContent;
// pub use thiserror::UserErrorClass;
// pub use thiserror::UserErrorContent;
pub mod bt {
pub use backtrace::Backtrace;
@@ -525,19 +536,22 @@ mod test {
use super::*;
#[derive(Debug, ThisError, Serialize, Deserialize)]
#[cstm(name = "SomeErrorEnumA")]
enum SomeErrorEnumA {
BadCase,
WithStringContent(String),
#[error("bad: {0}")]
// #[error("bad: {0}")]
WithStringContentFmt(String),
}
#[derive(Debug, ThisError, Serialize, Deserialize)]
#[cstm(name = "SomeErrorEnumB0")]
enum SomeErrorEnumB0 {
FromA(#[from] SomeErrorEnumA),
}
#[derive(Debug, ThisError, Serialize, Deserialize)]
#[cstm(name = "SomeErrorEnumB1")]
enum SomeErrorEnumB1 {
FromA(#[from] SomeErrorEnumA),
#[error("caffe")]

View File

@@ -30,6 +30,7 @@ use std::task::Poll;
use taskrun::tokio;
#[derive(Debug, ThisError)]
#[cstm(name = "ChannelFindActive")]
pub enum FindActiveError {
HttpBadAccept,
HttpBadUrl,

View File

@@ -20,9 +20,9 @@ use std::sync::Arc;
use streams::instrument::InstrumentStream;
#[derive(Debug, ThisError)]
#[cstm(name = "EventData")]
pub enum EventDataError {
QueryParse,
#[error("Error({0})")]
Error(Box<dyn ToPublicError>),
InternalError,
}

View File

@@ -1,6 +1,7 @@
use crate::bodystream::response;
use crate::bodystream::ToPublicResponse;
use crate::err::Error;
use dbconn::worker::PgQueue;
use http::Method;
use http::StatusCode;
use httpclient::body_empty;
@@ -16,14 +17,6 @@ use netpod::NodeConfigCached;
use netpod::ACCEPT_ALL;
use netpod::APP_JSON;
pub async fn channel_search(req: Requ, node_config: &NodeConfigCached) -> Result<ChannelSearchResult, Error> {
let url = req_uri_to_url(req.uri())?;
let query = ChannelSearchQuery::from_url(&url)?;
info!("search query: {:?}", query);
let res = dbconn::search::search_channel(query, node_config).await?;
Ok(res)
}
pub struct ChannelSearchHandler {}
impl ChannelSearchHandler {
@@ -35,7 +28,7 @@ impl ChannelSearchHandler {
}
}
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
pub async fn handle(&self, req: Requ, pgqueue: &PgQueue, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
@@ -43,7 +36,7 @@ impl ChannelSearchHandler {
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
match channel_search(req, node_config).await {
match channel_search(req, pgqueue, ncc).await {
Ok(item) => Ok(response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?),
Err(e) => {
warn!("handle: got error from channel_search: {e:?}");
@@ -58,3 +51,11 @@ impl ChannelSearchHandler {
}
}
}
async fn channel_search(req: Requ, pgqueue: &PgQueue, ncc: &NodeConfigCached) -> Result<ChannelSearchResult, Error> {
let url = req_uri_to_url(req.uri())?;
let query = ChannelSearchQuery::from_url(&url)?;
info!("search query: {:?}", query);
let res = dbconn::search::search_channel(query, pgqueue, ncc).await?;
Ok(res)
}

View File

@@ -63,6 +63,7 @@ use taskrun::tokio::net::TcpListener;
use tracing::Instrument;
#[derive(Debug, ThisError, Serialize, Deserialize)]
#[cstm(name = "Retrieval")]
pub enum RetrievalError {
Error(#[from] ::err::Error),
Error2(#[from] crate::err::Error),
@@ -353,7 +354,7 @@ async fn http_service_inner(
} else if let Some(h) = api4::databuffer_tools::FindActiveHandler::handler(&req) {
Ok(h.handle(req, &node_config).await?)
} else if let Some(h) = api4::search::ChannelSearchHandler::handler(&req) {
Ok(h.handle(req, &node_config).await?)
Ok(h.handle(req, &shared_res.pgqueue, &node_config).await?)
} else if let Some(h) = channel_status::ConnectionStatusEvents::handler(&req) {
Ok(h.handle(req, ctx, &shared_res, &node_config).await?)
} else if let Some(h) = channel_status::ChannelStatusEventsHandler::handler(&req) {

View File

@@ -268,7 +268,7 @@ impl ScalarOps for EnumVariant {
}
fn zero_b() -> Self {
EnumVariant::empty()
EnumVariant::default()
}
fn equal_slack(&self, rhs: &Self) -> bool {

View File

@@ -210,6 +210,7 @@ mod serde_channel_events {
use crate::eventsxbindim0::EventsXbinDim0;
use items_0::subfr::SubFrId;
use netpod::log::*;
use netpod::EnumVariant;
use serde::de;
use serde::de::EnumAccess;
use serde::de::VariantAccess;
@@ -321,6 +322,11 @@ mod serde_channel_events {
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
EnumVariant::SUB => {
let obj: EventsDim0<EnumVariant> =
seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?;
Ok(EvBox(Box::new(obj)))
}
_ => {
error!("TODO serde cty {cty} nty {nty}");
Err(de::Error::custom(&format!("unknown nty {nty}")))

View File

@@ -259,6 +259,7 @@ impl Mergeable for EventFull {
}
#[derive(Debug, ThisError, Serialize, Deserialize)]
#[cstm(name = "Decompress")]
pub enum DecompError {
TooLittleInput,
BadCompresionBlockSize,

View File

@@ -244,7 +244,7 @@ impl Serialize for ScalarType {
BOOL => ser.serialize_str("bool"),
STRING => ser.serialize_str("string"),
Enum => ser.serialize_str("enum"),
ChannelStatus => ser.serialize_str("ChannelStatus"),
ChannelStatus => ser.serialize_str("channelstatus"),
}
}
}
@@ -274,7 +274,7 @@ impl<'de> serde::de::Visitor<'de> for ScalarTypeVis {
"bool" => ScalarType::BOOL,
"string" => ScalarType::STRING,
"enum" => ScalarType::Enum,
"ChannelStatus" => ScalarType::ChannelStatus,
"channelstatus" => ScalarType::ChannelStatus,
k => return Err(E::custom(format!("can not understand variant {k:?}"))),
};
Ok(ret)
@@ -382,7 +382,7 @@ impl ScalarType {
BOOL => "bool",
STRING => "string",
Enum => "enum",
ChannelStatus => "ChannelStatus",
ChannelStatus => "channelstatus",
}
}
@@ -404,7 +404,7 @@ impl ScalarType {
"bool" => BOOL,
"string" => STRING,
"enum" => Enum,
"ChannelStatus" => ChannelStatus,
"channelstatus" => ChannelStatus,
_ => {
return Err(Error::with_msg_no_trace(format!(
"from_bsread_str can not understand bsread {:?}",
@@ -593,8 +593,8 @@ pub struct EnumVariant {
name: StringFix<26>,
}
impl EnumVariant {
pub fn empty() -> Self {
impl Default for EnumVariant {
fn default() -> Self {
Self {
ix: u16::MAX,
name: StringFix::new(),

View File

@@ -78,10 +78,13 @@ impl fmt::Display for RetentionTime {
}
#[derive(Debug, ThisError)]
#[cstm(name = "TTL")]
pub enum Error {
Parse,
}
// err::err_dbg_dis!(Error, "ttl::Error::");
impl FromStr for RetentionTime {
type Err = Error;

View File

@@ -27,7 +27,7 @@ use tokio::io::ErrorKind;
const TEST_BACKEND: &str = "testbackend-00";
#[derive(Debug, ThisError)]
// #[error("ConfigParseError")]
#[cstm(name = "ConfigParse")]
pub enum ConfigParseError {
NotSupportedOnNode,
FileNotFound,

View File

@@ -9,6 +9,7 @@ use scylla::prepared_statement::PreparedStatement;
use scylla::Session as ScySession;
#[derive(Debug, ThisError)]
#[cstm(name = "AccountingToplist")]
pub enum Error {
ScyllaQuery(#[from] scylla::transport::errors::QueryError),
ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),

View File

@@ -18,6 +18,7 @@ use items_2::eventsdim1::EventsDim1;
use netpod::log::*;
use netpod::ttl::RetentionTime;
use netpod::DtNano;
use netpod::EnumVariant;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsMs;
@@ -33,6 +34,7 @@ use std::task::Context;
use std::task::Poll;
#[derive(Debug, ThisError)]
#[cstm(name = "ScyllaReadEvents")]
pub enum Error {
Prepare(#[from] crate::events2::prepare::Error),
ScyllaQuery(#[from] scylla::transport::errors::QueryError),
@@ -132,6 +134,38 @@ macro_rules! impl_scaty_array {
};
}
impl ValTy for EnumVariant {
type ScaTy = EnumVariant;
type ScyTy = i16;
type Container = EventsDim0<EnumVariant>;
fn from_scyty(inp: Self::ScyTy) -> Self {
let _ = inp;
panic!("uses more specialized impl")
}
fn from_valueblob(inp: Vec<u8>) -> Self {
let _ = inp;
panic!("uses more specialized impl")
}
fn table_name() -> &'static str {
"array_string"
}
fn default() -> Self {
<Self as Default>::default()
}
fn is_valueblob() -> bool {
false
}
fn st_name() -> &'static str {
"enum"
}
}
impl ValTy for Vec<String> {
type ScaTy = String;
type ScyTy = Vec<String>;
@@ -142,6 +176,7 @@ impl ValTy for Vec<String> {
}
fn from_valueblob(inp: Vec<u8>) -> Self {
let _ = inp;
warn!("ValTy::from_valueblob for Vec<String>");
Vec::new()
}
@@ -188,6 +223,7 @@ impl_scaty_array!(Vec<f32>, f32, Vec<f32>, "f32", "f32");
impl_scaty_array!(Vec<f64>, f64, Vec<f64>, "f64", "f64");
impl_scaty_array!(Vec<bool>, bool, Vec<bool>, "bool", "bool");
#[derive(Debug)]
pub(super) struct ReadNextValuesOpts {
rt: RetentionTime,
series: u64,
@@ -226,6 +262,7 @@ where
{
// TODO could take scyqeue out of opts struct.
let scyqueue = opts.scyqueue.clone();
debug!("bbbbbbbbbbbbbbbbbbbbbbbbbbbb");
let futgen = Box::new(|scy: Arc<Session>, stmts: Arc<StmtsEvents>| {
let fut = async {
read_next_values_2::<ST>(opts, scy, stmts)
@@ -246,7 +283,13 @@ async fn read_next_values_2<ST>(
where
ST: ValTy,
{
trace!("read_next_values_2 {} {}", opts.series, opts.ts_msp);
debug!("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa {opts:?}");
trace!(
"read_next_values_2 {} {} st_name {}",
opts.series,
opts.ts_msp,
ST::st_name()
);
let series = opts.series;
let ts_msp = opts.ts_msp;
let range = opts.range;

View File

@@ -13,6 +13,7 @@ use items_0::Events;
use items_2::channelevents::ChannelEvents;
use netpod::log::*;
use netpod::ttl::RetentionTime;
use netpod::EnumVariant;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsMs;
@@ -41,6 +42,7 @@ macro_rules! warn_item {
}
#[derive(Debug, ThisError)]
#[cstm(name = "ScyllaEvents")]
pub enum Error {
Worker(#[from] crate::worker::Error),
Events(#[from] crate::events::Error),
@@ -143,6 +145,7 @@ impl EventsStreamRt {
);
let scalar_type = self.scalar_type.clone();
let shape = self.shape.clone();
debug!("make_read_events_fut {:?} {:?}", shape, scalar_type);
let fut = async move {
let ret = match &shape {
Shape::Scalar => match &scalar_type {
@@ -158,9 +161,15 @@ impl EventsStreamRt {
ScalarType::F64 => read_next_values::<f64>(opts).await,
ScalarType::BOOL => read_next_values::<bool>(opts).await,
ScalarType::STRING => read_next_values::<String>(opts).await,
ScalarType::Enum => read_next_values::<String>(opts).await,
ScalarType::Enum => {
debug!(
"make_read_events_fut {:?} {:?} ------------- good",
shape, scalar_type
);
read_next_values::<EnumVariant>(opts).await
}
ScalarType::ChannelStatus => {
warn!("read scalar channel status not yet supported");
warn!("read not yet supported {:?} {:?}", shape, scalar_type);
err::todoval()
}
},
@@ -177,12 +186,15 @@ impl EventsStreamRt {
ScalarType::F64 => read_next_values::<Vec<f64>>(opts).await,
ScalarType::BOOL => read_next_values::<Vec<bool>>(opts).await,
ScalarType::STRING => {
warn!("read array string not yet supported");
warn!("read not yet supported {:?} {:?}", shape, scalar_type);
err::todoval()
}
ScalarType::Enum => {
warn!("read not yet supported {:?} {:?}", shape, scalar_type);
err::todoval()
}
ScalarType::Enum => read_next_values::<Vec<String>>(opts).await,
ScalarType::ChannelStatus => {
warn!("read array channel status not yet supported");
warn!("read not yet supported {:?} {:?}", shape, scalar_type);
err::todoval()
}
},

View File

@@ -11,6 +11,7 @@ use std::task::Context;
use std::task::Poll;
#[derive(Debug, ThisError)]
#[cstm(name = "EventsFirstBefore")]
pub enum Error {
Unordered,
Logic,

View File

@@ -25,6 +25,7 @@ use std::task::Context;
use std::task::Poll;
#[derive(Debug, ThisError)]
#[cstm(name = "EventsMergeRt")]
pub enum Error {
Input(#[from] crate::events2::firstbefore::Error),
Events(#[from] crate::events2::events::Error),

View File

@@ -18,9 +18,9 @@ use std::task::Context;
use std::task::Poll;
#[derive(Debug, ThisError)]
#[cstm(name = "EventsMsp")]
pub enum Error {
Logic,
#[error("Worker({0})")]
Worker(Box<crate::worker::Error>),
ScyllaQuery(#[from] scylla::transport::errors::QueryError),
ScyllaRow(#[from] scylla::transport::iterator::NextRowError),

View File

@@ -5,6 +5,7 @@ use scylla::prepared_statement::PreparedStatement;
use scylla::Session;
#[derive(Debug, ThisError)]
#[cstm(name = "ScyllaPrepare")]
pub enum Error {
ScyllaQuery(#[from] scylla::transport::errors::QueryError),
ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),

View File

@@ -18,8 +18,8 @@ use std::pin::Pin;
use std::sync::Arc;
#[derive(Debug, ThisError)]
#[cstm(name = "ScyllaWorker")]
pub enum Error {
#[error("ScyllaConnection({0})")]
ScyllaConnection(err::Error),
Prepare(#[from] crate::events2::prepare::Error),
EventsQuery(#[from] crate::events::Error),

View File

@@ -25,6 +25,7 @@ macro_rules! trace_parse {
}
#[derive(Debug, ThisError)]
#[cstm(name = "StreamFramedBytes")]
pub enum Error {
FrameTooLarge,
Logic,