Remove thiserror from netfetch

This commit is contained in:
Dominik Werder
2025-02-19 15:46:08 +01:00
parent a827478bc7
commit 31538ee23a
11 changed files with 123 additions and 119 deletions

View File

@@ -1,8 +1,6 @@
use async_channel::Sender;
use bytes::Buf;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use log::*;
use netpod::ScalarType;
use netpod::SeriesKind;
@@ -17,15 +15,16 @@ use std::net::Ipv4Addr;
use std::time::SystemTime;
use taskrun::tokio::net::UdpSocket;
#[derive(Debug, ThisError)]
#[cstm(name = "NetfetchBeacons")]
pub enum Error {
Io(#[from] std::io::Error),
SeriesWriter(#[from] serieswriter::writer::Error),
ChannelSend,
ChannelRecv,
ChannelLookup(#[from] dbpg::seriesbychannel::Error),
}
autoerr::create_error_v1!(
name(Error, "NetfetchBeacons"),
enum variants {
Io(#[from] std::io::Error),
SeriesWriter(#[from] serieswriter::writer::Error),
ChannelSend,
ChannelRecv,
ChannelLookup(#[from] dbpg::seriesbychannel::Error),
},
);
impl<T> From<async_channel::SendError<T>> for Error {
fn from(_value: async_channel::SendError<T>) -> Self {

View File

@@ -3,8 +3,6 @@ use super::CreatedState;
use super::Ioid;
use ca_proto::ca::proto;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use log::*;
use proto::CaMsg;
use proto::ReadNotify;
@@ -12,11 +10,12 @@ use series::SeriesId;
use std::pin::Pin;
use std::time::Instant;
#[derive(Debug, ThisError)]
#[cstm(name = "NetfetchEnumfetch")]
pub enum Error {
MissingState,
}
autoerr::create_error_v1!(
name(Error, "NetfetchEnumfetch"),
enum variants {
MissingState,
},
);
pub trait ConnFuture: Send {
fn camsg(self: Pin<&mut Self>, camsg: CaMsg, conn: &mut CaConn) -> Result<(), Error>;

View File

@@ -1,8 +1,11 @@
use ca_proto::ca::proto;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "ConnChannelError")]
pub enum Error {}
autoerr::create_error_v1!(
name(Error, "ConnChannelError"),
enum variants {
Logic,
},
);
trait Channel {
fn can_accept_ca_msg(&self) -> bool;

View File

@@ -14,12 +14,13 @@ use std::time::Duration;
use taskrun::tokio;
use tokio::task::JoinHandle;
#[derive(Debug, thiserror::Error)]
#[cstm(name = "IocSearch")]
pub enum Error {
LookupFailure(String),
IO(#[from] std::io::Error),
}
autoerr::create_error_v1!(
name(Error, "IocSearch"),
enum variants {
LookupFailure(String),
IO(#[from] std::io::Error),
},
);
async fn resolve_address(addr_str: &str) -> Result<SocketAddr, Error> {
const PORT_DEFAULT: u16 = 5064;

View File

@@ -412,36 +412,7 @@ fn make_routes(
)
.nest(
"/channel",
Router::new()
.fallback(|| async { axum::Json(json!({"subcommands":["states"]})) })
.route(
"/error_handler_test",
get({
let tx = connset_cmd_tx.clone();
|Query(params): Query<HashMap<String, String>>| status::error_handler_test()
}),
)
.route(
"/states",
get({
let tx = connset_cmd_tx.clone();
|Query(params): Query<HashMap<String, String>>| status::channel_states(params, tx)
}),
)
.route(
"/add",
get({
let dcom = dcom.clone();
|Query(params): Query<HashMap<String, String>>| channel_add(params, dcom)
}),
)
.route(
"/remove",
get({
let dcom = dcom.clone();
|Query(params): Query<HashMap<String, String>>| channel_remove(params, dcom)
}),
),
make_routes_channel(rres.clone(), dcom.clone(), connset_cmd_tx.clone(), stats_set.clone()),
)
.nest(
"/ingest",
@@ -547,6 +518,48 @@ fn make_routes(
)
}
fn make_routes_channel(
rres: Arc<RoutesResources>,
dcom: Arc<DaemonComm>,
connset_cmd_tx: Sender<CaConnSetEvent>,
stats_set: StatsSet,
) -> axum::Router {
use axum::extract;
use axum::routing::{get, post, put};
use axum::Router;
use http::StatusCode;
Router::new()
.fallback(|| async { axum::Json(json!({"subcommands":["states"]})) })
.route(
"/error_handler_test",
get({
let tx = connset_cmd_tx.clone();
|Query(params): Query<HashMap<String, String>>| status::error_handler_test()
}),
)
.route(
"/states",
get({
let tx = connset_cmd_tx.clone();
|Query(params): Query<HashMap<String, String>>| status::channel_states(params, tx)
}),
)
.route(
"/add",
get({
let dcom = dcom.clone();
|Query(params): Query<HashMap<String, String>>| channel_add(params, dcom)
}),
)
.route(
"/remove",
get({
let dcom = dcom.clone();
|Query(params): Query<HashMap<String, String>>| channel_remove(params, dcom)
}),
)
}
pub async fn metrics_service(
bind_to: String,
dcom: Arc<DaemonComm>,

View File

@@ -8,8 +8,6 @@ use bytes::Bytes;
use chrono::DateTime;
use chrono::Utc;
use core::fmt;
use err::thiserror;
use err::ThisError;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use netpod::log::*;
@@ -47,22 +45,22 @@ macro_rules! debug_cql {
};
}
#[derive(Debug, ThisError)]
#[cstm(name = "HttpDelete")]
pub enum Error {
Logic,
MissingRetentionTime,
MissingSeriesId,
MissingScalarType,
MissingBegDate,
MissingEndDate,
ScyllaTransport(#[from] scylla::transport::errors::NewSessionError),
ScyllaQuery(#[from] scylla::transport::errors::QueryError),
ScyllaRowError(#[from] scylla::cql_to_rust::FromRowError),
ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),
ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError),
InvalidTimestamp,
}
autoerr::create_error_v1!(
name(Error, "HttpDelete"),
enum variants {
Logic,
MissingRetentionTime,
MissingSeriesId,
MissingScalarType,
MissingBegDate,
MissingEndDate,
ScyllaTransport(#[from] scylla::transport::errors::NewSessionError),
ScyllaQuery(#[from] scylla::transport::errors::QueryError),
ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),
ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError),
InvalidTimestamp,
},
);
pub async fn delete(
(headers, Query(params), body): (HeaderMap, Query<HashMap<String, String>>, axum::body::Body),

View File

@@ -6,8 +6,6 @@ use axum::Json;
use bytes::Bytes;
use core::fmt;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::thiserror;
use err::ThisError;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use items_2::binning::container_events::ContainerEvents;
@@ -133,24 +131,24 @@ impl EmittableType for WritableType {
}
}
#[derive(Debug, ThisError)]
#[cstm(name = "MetricsIngest")]
pub enum Error {
UnsupportedContentType,
Logic,
SeriesWriter(#[from] serieswriter::writer::Error),
MissingChannelName,
MissingScalarType,
MissingShape,
SendError,
Decode,
FramedBytes(#[from] streams::framed_bytes::Error),
InsertQueues(#[from] scywr::insertqueues::Error),
Serde(#[from] serde_json::Error),
#[error("Parse({0})")]
Parse(String),
NotSupported,
}
autoerr::create_error_v1!(
name(Error, "MetricsIngest"),
enum variants {
UnsupportedContentType,
Logic,
SeriesWriter(#[from] serieswriter::writer::Error),
MissingChannelName,
MissingScalarType,
MissingShape,
SendError,
Decode,
FramedBytes(#[from] streams::framed_bytes::Error),
InsertQueues(#[from] scywr::insertqueues::Error),
Serde(#[from] serde_json::Error),
Parse(String),
NotSupported,
},
);
pub async fn post_v01(
(headers, Query(params), body): (HeaderMap, Query<HashMap<String, String>>, axum::body::Body),

View File

@@ -6,8 +6,6 @@ use crate::conf::ChannelConfigForStatesApi;
use async_channel::Sender;
use chrono::DateTime;
use chrono::Utc;
use err::thiserror;
use err::ThisError;
use serde::Serialize;
use std::collections::BTreeMap;
use std::collections::HashMap;

View File

@@ -1,18 +1,16 @@
use err::thiserror;
use std::array::TryFromSliceError;
use std::mem;
use taskrun::tokio;
use tokio::io::ReadBuf;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("read {0} have {1}")]
AdvanceOver(usize, usize),
#[error("write {0} have {1}")]
WriteAdvanceOver(usize, usize),
#[error("TryFromSliceError")]
Slice(#[from] TryFromSliceError),
}
autoerr::create_error_v1!(
name(Error, "Error"),
enum variants {
AdvanceOver(usize, usize),
WriteAdvanceOver(usize, usize),
Slice(#[from] TryFromSliceError),
},
);
pub struct NetBuf {
buf: Vec<u8>,