Start improving client facing errors

This commit is contained in:
Dominik Werder
2024-08-20 16:16:43 +02:00
parent cb92317bf6
commit 7a8d071c7a
23 changed files with 984 additions and 654 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[package]
name = "daqbuffer"
version = "0.5.2"
version = "0.5.3-aa.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -1,7 +1,7 @@
use crate::ErrConv;
use chrono::DateTime;
use chrono::Utc;
use err::Error;
use err::thiserror;
use err::ThisError;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::ChConf;
@@ -13,6 +13,19 @@ use netpod::TsMs;
use std::time::Duration;
use tokio_postgres::Client;
#[derive(Debug, ThisError)]
#[cstm(name = "DbChannelConfig")]
pub enum Error {
Pg(#[from] tokio_postgres::Error),
#[error("NotFound({0}, {1})")]
NotFound(SfDbChannel, NanoRange),
SeriesNotFound(String, u64),
BadScalarType(i32),
BadShape(Vec<i32>),
BadKind(i16),
NoInput,
}
/// It is an unsolved question as to how we want to uniquely address channels.
/// Currently, the usual (backend, channelname) works in 99% of the cases, but the edge-cases
/// are not solved. At the same time, it is desirable to avoid to complicate things for users.
@@ -27,21 +40,6 @@ pub(super) async fn chconf_best_matching_for_name_and_range(
pg: &Client,
) -> Result<ChConf, Error> {
debug!("chconf_best_matching_for_name_and_range {channel:?} {range:?}");
#[cfg(DISABLED)]
if ncc.node_config.cluster.scylla.is_none() {
let e = Error::with_msg_no_trace(format!(
"chconf_best_matching_for_name_and_range but not a scylla backend"
));
error!("{e}");
return Err(e);
};
#[cfg(DISABLED)]
if backend != ncc.node_config.cluster.backend {
warn!(
"mismatched backend {} vs {}",
backend, ncc.node_config.cluster.backend
);
}
let sql = concat!(
"select unnest(tscs) as tsc, series, scalar_type, shape_dims",
" from series_by_channel",
@@ -52,10 +50,9 @@ pub(super) async fn chconf_best_matching_for_name_and_range(
);
let res = pg
.query(sql, &[&channel.backend(), &channel.name(), &channel.kind().to_db_i16()])
.await
.err_conv()?;
.await?;
if res.len() == 0 {
let e = Error::with_public_msg_no_trace(format!("can not find channel information for {channel:?} {range:?}"));
let e = Error::NotFound(channel, range);
warn!("{e}");
Err(e)
} else if res.len() > 1 {
@@ -67,8 +64,9 @@ pub(super) async fn chconf_best_matching_for_name_and_range(
// TODO can I get a slice from psql driver?
let shape_dims: Vec<i32> = r.get(3);
let series = series as u64;
let _scalar_type = ScalarType::from_scylla_i32(scalar_type)?;
let _shape = Shape::from_scylla_shape_dims(&shape_dims)?;
let _scalar_type =
ScalarType::from_scylla_i32(scalar_type).map_err(|_| Error::BadScalarType(scalar_type))?;
let _shape = Shape::from_scylla_shape_dims(&shape_dims).map_err(|_| Error::BadShape(shape_dims))?;
let tsms = tsc.signed_duration_since(DateTime::UNIX_EPOCH).num_milliseconds() as u64;
let ts = TsMs::from_ms_u64(tsms);
rows.push((ts, series));
@@ -88,8 +86,8 @@ pub(super) async fn chconf_best_matching_for_name_and_range(
let shape_dims: Vec<i32> = r.get(3);
let series = series as u64;
let kind = channel.kind();
let scalar_type = ScalarType::from_scylla_i32(scalar_type)?;
let shape = Shape::from_scylla_shape_dims(&shape_dims)?;
let scalar_type = ScalarType::from_scylla_i32(scalar_type).map_err(|_| Error::BadScalarType(scalar_type))?;
let shape = Shape::from_scylla_shape_dims(&shape_dims).map_err(|_| Error::BadShape(shape_dims))?;
let ret = ChConf::new(channel.backend(), series, kind, scalar_type, shape, channel.name());
Ok(ret)
}
@@ -97,7 +95,7 @@ pub(super) async fn chconf_best_matching_for_name_and_range(
fn decide_best_matching_index(range: (TsMs, TsMs), rows: &[TsMs]) -> Result<usize, Error> {
if rows.len() < 1 {
let e = Error::with_msg_no_trace("decide_best_matching_index no rows");
let e = Error::NoInput;
warn!("{e}");
Err(e)
} else {
@@ -205,22 +203,22 @@ pub(super) async fn chconf_for_series(backend: &str, series: u64, pg: &Client) -
"select channel, scalar_type, shape_dims, kind from series_by_channel where facility = $1 and series = $2",
&[&backend, &(series as i64)],
)
.await
.err_conv()?;
.await?;
if res.len() < 1 {
let e = Error::with_public_msg_no_trace(format!(
"can not find channel information backend {backend} series {series}"
));
let e = Error::SeriesNotFound(backend.into(), series);
warn!("{e}");
Err(e)
} else {
let row = res.first().unwrap();
let name: String = row.get(0);
let scalar_type = ScalarType::from_dtype_index(row.get::<_, i32>(1) as u8)?;
let scalar_type = row.get::<_, i32>(1);
let scalar_type =
ScalarType::from_dtype_index(scalar_type as _).map_err(|_| Error::BadScalarType(scalar_type))?;
// TODO can I get a slice from psql driver?
let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec<i32>>(2))?;
let shape = row.get::<_, Vec<i32>>(2);
let shape = Shape::from_scylla_shape_dims(&shape).map_err(|_| Error::BadShape(shape))?;
let kind: i16 = row.get(3);
let kind = SeriesKind::from_db_i16(kind)?;
let kind = SeriesKind::from_db_i16(kind).map_err(|_| Error::BadKind(kind))?;
let ret = ChConf::new(backend, series, kind, scalar_type, shape, name);
Ok(ret)
}

View File

@@ -23,6 +23,7 @@ pub enum Error {
ChannelSend,
ChannelRecv,
Join,
ChannelConfig(#[from] crate::channelconfig::Error),
}
impl From<RecvError> for Error {
@@ -39,8 +40,12 @@ impl err::ToErr for Error {
#[derive(Debug)]
enum Job {
ChConfBestMatchingNameRange(SfDbChannel, NanoRange, Sender<Result<ChConf, Error>>),
ChConfForSeries(String, u64, Sender<Result<ChConf, Error>>),
ChConfBestMatchingNameRange(
SfDbChannel,
NanoRange,
Sender<Result<ChConf, crate::channelconfig::Error>>,
),
ChConfForSeries(String, u64, Sender<Result<ChConf, crate::channelconfig::Error>>),
InfoForSeriesIds(
Vec<u64>,
Sender<Result<Vec<Option<crate::channelinfo::ChannelInfo>>, crate::channelinfo::Error>>,
@@ -58,26 +63,28 @@ pub struct PgQueue {
}
impl PgQueue {
pub async fn chconf_for_series(
&self,
backend: &str,
series: u64,
) -> Result<Receiver<Result<ChConf, Error>>, Error> {
let (tx, rx) = async_channel::bounded(1);
let job = Job::ChConfForSeries(backend.into(), series, tx);
self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
Ok(rx)
}
pub async fn chconf_best_matching_name_range(
&self,
channel: SfDbChannel,
range: NanoRange,
) -> Result<Receiver<Result<ChConf, Error>>, Error> {
) -> Result<Result<ChConf, crate::channelconfig::Error>, netpod::AsyncChannelError> {
let (tx, rx) = async_channel::bounded(1);
let job = Job::ChConfBestMatchingNameRange(channel, range, tx);
self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
Ok(rx)
self.tx.send(job).await.map_err(|_| netpod::AsyncChannelError::Send)?;
let res = rx.recv().await.map_err(|_| netpod::AsyncChannelError::Recv)?;
Ok(res)
}
pub async fn chconf_for_series(
&self,
backend: &str,
series: u64,
) -> Result<Result<ChConf, crate::channelconfig::Error>, netpod::AsyncChannelError> {
let (tx, rx) = async_channel::bounded(1);
let job = Job::ChConfForSeries(backend.into(), series, tx);
self.tx.send(job).await.map_err(|_| netpod::AsyncChannelError::Send)?;
let res = rx.recv().await.map_err(|_| netpod::AsyncChannelError::Recv)?;
Ok(res)
}
pub async fn info_for_series_ids(

View File

@@ -74,6 +74,57 @@ pub fn body_bytes<D: Into<Bytes>>(body: D) -> StreamBody {
http_body_util::StreamBody::new(Box::pin(stream))
}
pub fn internal_error() -> http::Response<StreamBody> {
let mut res = http::Response::new(body_empty());
*res.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
res
}
pub fn error_response(msg: String, reqid: impl AsRef<str>) -> http::Response<StreamBody> {
let status = StatusCode::INTERNAL_SERVER_ERROR;
let js = serde_json::json!({
"message": msg.to_string(),
"requestid": reqid.as_ref(),
});
if let Ok(body) = serde_json::to_string_pretty(&js) {
match Response::builder()
.status(status)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(body_string(body))
{
Ok(res) => res,
Err(e) => {
error!("can not generate http error response {e}");
internal_error()
}
}
} else {
internal_error()
}
}
pub fn not_found_response(msg: String, reqid: impl AsRef<str>) -> http::Response<StreamBody> {
let status = StatusCode::NOT_FOUND;
let js = serde_json::json!({
"message": msg.to_string(),
"requestid": reqid.as_ref(),
});
if let Ok(body) = serde_json::to_string_pretty(&js) {
match Response::builder()
.status(status)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(body_string(body))
{
Ok(res) => res,
Err(e) => {
error!("can not generate http error response {e}");
internal_error()
}
}
} else {
internal_error()
}
}
pub trait IntoBody {
fn into_body(self) -> StreamBody;
}
@@ -153,6 +204,7 @@ impl Stream for StreamIncoming {
if x.is_data() {
Ready(Some(Ok(x.into_data().unwrap())))
} else {
warn!("non-data in stream: {x:?}");
Ready(Some(Ok(Bytes::new())))
}
}

View File

@@ -1,15 +1,16 @@
use crate::bodystream::response;
use crate::bodystream::response_err_msg;
use crate::bodystream::ToPublicResponse;
use crate::channelconfig::ch_conf_from_binned;
use crate::err::Error;
use crate::requests::accepts_json_or_all;
use crate::requests::accepts_octets;
use crate::ServiceSharedResources;
use dbconn::worker::PgQueue;
use err::thiserror;
use err::ThisError;
use http::Method;
use http::StatusCode;
use httpclient::body_empty;
use httpclient::error_response;
use httpclient::not_found_response;
use httpclient::IntoBody;
use httpclient::Requ;
use httpclient::StreamResponse;
@@ -25,69 +26,27 @@ use query::api4::binned::BinnedQuery;
use tracing::Instrument;
use url::Url;
async fn binned_json(
url: Url,
req: Requ,
ctx: &ReqCtx,
pgqueue: &PgQueue,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
debug!("{:?}", req);
let reqid = crate::status_board()
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?
.new_status_id();
let (_head, _body) = req.into_parts();
let query = BinnedQuery::from_url(&url).map_err(|e| {
error!("binned_json: {e:?}");
let msg = format!("can not parse query: {}", e.msg());
e.add_public_msg(msg)
})?;
// TODO handle None case better and return 404
let ch_conf = ch_conf_from_binned(&query, ctx, pgqueue, ncc)
.await?
.ok_or_else(|| Error::with_msg_no_trace("channel not found"))?;
let span1 = span!(
Level::INFO,
"httpret::binned",
reqid,
beg = query.range().beg_u64() / SEC,
end = query.range().end_u64() / SEC,
ch = query.channel().name(),
);
span1.in_scope(|| {
debug!("begin");
});
let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
let open_bytes = Box::pin(open_bytes);
let item = streams::timebinnedjson::timebinned_json(query, ch_conf, ctx, open_bytes)
.instrument(span1)
.await?;
let ret = response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?;
Ok(ret)
#[derive(Debug, ThisError)]
#[cstm(name = "Api4Binned")]
pub enum Error {
ChannelNotFound,
BadQuery(String),
HttpLib(#[from] http::Error),
ChannelConfig(crate::channelconfig::Error),
Retrieval(#[from] crate::RetrievalError),
EventsCbor(#[from] streams::plaineventscbor::Error),
EventsJson(#[from] streams::plaineventsjson::Error),
ServerError,
BinnedStream(::err::Error),
}
async fn binned(req: Requ, ctx: &ReqCtx, pgqueue: &PgQueue, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
let url = req_uri_to_url(req.uri())?;
if req
.uri()
.path_and_query()
.map_or(false, |x| x.as_str().contains("DOERR"))
{
Err(Error::with_msg_no_trace("hidden message").add_public_msg("PublicMessage"))?;
}
if accepts_json_or_all(&req.headers()) {
Ok(binned_json(url, req, ctx, pgqueue, ncc).await?)
} else if accepts_octets(&req.headers()) {
Ok(response_err_msg(
StatusCode::NOT_ACCEPTABLE,
format!("binary binned data not yet available"),
)?)
} else {
let ret = response_err_msg(
StatusCode::NOT_ACCEPTABLE,
format!("Unsupported Accept: {:?}", req.headers()),
)?;
Ok(ret)
impl From<crate::channelconfig::Error> for Error {
fn from(value: crate::channelconfig::Error) -> Self {
use crate::channelconfig::Error::*;
match value {
NotFound(_) => Self::ChannelNotFound,
_ => Self::ChannelConfig(value),
}
}
}
@@ -110,14 +69,86 @@ impl BinnedHandler {
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?);
}
match binned(req, ctx, &shared_res.pgqueue, ncc).await {
Ok(ret) => Ok(ret),
Err(e) => {
warn!("BinnedHandler handle sees: {e}");
Ok(e.to_public_response())
}
Err(e) => match e {
Error::ChannelNotFound => {
let res = not_found_response("channel not found".into(), ctx.reqid());
Ok(res)
}
Error::BadQuery(msg) => {
let res = error_response(format!("bad query: {msg}"), ctx.reqid());
Ok(res)
}
_ => {
error!("EventsHandler sees: {e}");
Ok(error_response(e.public_message(), ctx.reqid()))
}
},
}
}
}
async fn binned(req: Requ, ctx: &ReqCtx, pgqueue: &PgQueue, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
let url = req_uri_to_url(req.uri()).map_err(|e| Error::BadQuery(e.to_string()))?;
if req
.uri()
.path_and_query()
.map_or(false, |x| x.as_str().contains("DOERR"))
{
Err(Error::ServerError)?;
}
if accepts_json_or_all(&req.headers()) {
Ok(binned_json(url, req, ctx, pgqueue, ncc).await?)
} else if accepts_octets(&req.headers()) {
Ok(error_response(
format!("binary binned data not yet available"),
ctx.reqid(),
))
} else {
let ret = error_response(format!("Unsupported Accept: {:?}", req.headers()), ctx.reqid());
Ok(ret)
}
}
async fn binned_json(
url: Url,
req: Requ,
ctx: &ReqCtx,
pgqueue: &PgQueue,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
debug!("{:?}", req);
let reqid = crate::status_board().map_err(|_e| Error::ServerError)?.new_status_id();
let (_head, _body) = req.into_parts();
let query = BinnedQuery::from_url(&url).map_err(|e| {
error!("binned_json: {e:?}");
Error::BadQuery(e.to_string())
})?;
// TODO handle None case better and return 404
let ch_conf = ch_conf_from_binned(&query, ctx, pgqueue, ncc)
.await?
.ok_or_else(|| Error::ChannelNotFound)?;
let span1 = span!(
Level::INFO,
"httpret::binned",
reqid,
beg = query.range().beg_u64() / SEC,
end = query.range().end_u64() / SEC,
ch = query.channel().name(),
);
span1.in_scope(|| {
debug!("begin");
});
let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
let open_bytes = Box::pin(open_bytes);
let item = streams::timebinnedjson::timebinned_json(query, ch_conf, ctx, open_bytes)
.instrument(span1)
.await
.map_err(|e| Error::BinnedStream(e))?;
let ret = response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?;
// let ret = error_response(e.public_message(), ctx.reqid());
Ok(ret)
}

View File

@@ -1,5 +1,4 @@
use crate::bodystream::response;
use crate::bodystream::response_err_msg;
use async_channel::Receiver;
use async_channel::Sender;
use bytes::Bytes;
@@ -14,6 +13,7 @@ use http::Response;
use http::StatusCode;
use httpclient::body_empty;
use httpclient::body_stream;
use httpclient::error_response;
use httpclient::Requ;
use httpclient::StreamResponse;
use netpod::log::*;
@@ -76,8 +76,7 @@ impl FindActiveHandler {
Ok(ret) => Ok(ret),
Err(e) => {
error!("{e}");
let res = response_err_msg(StatusCode::NOT_ACCEPTABLE, e.to_public_error())
.map_err(|_| FindActiveError::InternalError)?;
let res = error_response(e.to_public_error().to_string(), "missing-req");
Ok(res)
}
}

View File

@@ -1,4 +1,3 @@
use crate::bodystream::response_err_msg;
use crate::response;
use crate::ReqCtx;
use crate::ServiceSharedResources;
@@ -10,6 +9,7 @@ use http::Method;
use http::StatusCode;
use httpclient::body_empty;
use httpclient::body_stream;
use httpclient::error_response;
use httpclient::read_body_bytes;
use httpclient::Requ;
use httpclient::StreamResponse;
@@ -50,7 +50,7 @@ impl EventDataHandler {
pub async fn handle(
&self,
req: Requ,
_ctx: &ReqCtx,
ctx: &ReqCtx,
ncc: &NodeConfigCached,
shared_res: Arc<ServiceSharedResources>,
) -> Result<StreamResponse, EventDataError> {
@@ -63,8 +63,7 @@ impl EventDataHandler {
Ok(ret) => Ok(ret),
Err(e) => {
error!("{e}");
let res = response_err_msg(StatusCode::NOT_ACCEPTABLE, e.to_public_error())
.map_err(|_| EventDataError::InternalError)?;
let res = error_response(e.to_public_error().to_string(), ctx.reqid());
Ok(res)
}
}

View File

@@ -1,23 +1,25 @@
use crate::bodystream::response_err_msg;
use crate::channelconfig::chconf_from_events_quorum;
use crate::err::Error;
use crate::requests::accepts_cbor_framed;
use crate::requests::accepts_json_framed;
use crate::requests::accepts_json_or_all;
use crate::response;
use crate::ServiceSharedResources;
use crate::ToPublicResponse;
use bytes::Bytes;
use bytes::BytesMut;
use dbconn::worker::PgQueue;
use err::thiserror;
use err::ThisError;
use futures_util::future;
use futures_util::stream;
use futures_util::Stream;
use futures_util::StreamExt;
use http::header::CONTENT_TYPE;
use http::Method;
use http::StatusCode;
use httpclient::body_empty;
use httpclient::body_stream;
use httpclient::error_response;
use httpclient::not_found_response;
use httpclient::IntoBody;
use httpclient::Requ;
use httpclient::StreamResponse;
@@ -28,11 +30,36 @@ use netpod::ChannelTypeConfigGen;
use netpod::FromUrl;
use netpod::NodeConfigCached;
use netpod::ReqCtx;
use netpod::APP_CBOR_FRAMED;
use netpod::APP_JSON;
use netpod::APP_JSON_FRAMED;
use netpod::HEADER_NAME_REQUEST_ID;
use nodenet::client::OpenBoxedBytesViaHttp;
use query::api4::events::PlainEventsQuery;
use streams::instrument::InstrumentStream;
use tracing::Instrument;
#[derive(Debug, ThisError)]
#[cstm(name = "Api4Events")]
pub enum Error {
ChannelNotFound,
HttpLib(#[from] http::Error),
ChannelConfig(crate::channelconfig::Error),
Retrieval(#[from] crate::RetrievalError),
EventsCbor(#[from] streams::plaineventscbor::Error),
EventsJson(#[from] streams::plaineventsjson::Error),
}
impl From<crate::channelconfig::Error> for Error {
fn from(value: crate::channelconfig::Error) -> Self {
use crate::channelconfig::Error::*;
match value {
NotFound(_) => Self::ChannelNotFound,
_ => Self::ChannelConfig(value),
}
}
}
pub struct EventsHandler {}
impl EventsHandler {
@@ -50,9 +77,9 @@ impl EventsHandler {
ctx: &ReqCtx,
shared_res: &ServiceSharedResources,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
) -> Result<StreamResponse, crate::err::Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?);
}
let self_name = "handle";
let url = req_uri_to_url(req.uri())?;
@@ -73,10 +100,16 @@ impl EventsHandler {
.await
{
Ok(ret) => Ok(ret),
Err(e) => {
error!("EventsHandler sees: {e}");
Ok(e.to_public_response())
}
Err(e) => match e {
Error::ChannelNotFound => {
let res = not_found_response("channel not found".into(), ctx.reqid());
Ok(res)
}
_ => {
error!("EventsHandler sees: {e}");
Ok(error_response(e.public_message(), ctx.reqid()))
}
},
}
}
}
@@ -90,7 +123,7 @@ async fn plain_events(
) -> Result<StreamResponse, Error> {
let ch_conf = chconf_from_events_quorum(&evq, ctx, pgqueue, ncc)
.await?
.ok_or_else(|| Error::with_msg_no_trace("channel not found"))?;
.ok_or_else(|| Error::ChannelNotFound)?;
if accepts_cbor_framed(req.headers()) {
Ok(plain_events_cbor_framed(req, evq, ch_conf, ctx, ncc).await?)
} else if accepts_json_framed(req.headers()) {
@@ -98,7 +131,7 @@ async fn plain_events(
} else if accepts_json_or_all(req.headers()) {
Ok(plain_events_json(req, evq, ch_conf, ctx, ncc).await?)
} else {
let ret = response_err_msg(StatusCode::NOT_ACCEPTABLE, format!("unsupported accept {:?}", req))?;
let ret = error_response(format!("unsupported accept"), ctx.reqid());
Ok(ret)
}
}
@@ -124,7 +157,10 @@ async fn plain_events_cbor_framed(
tracing::Span::none()
};
let stream = InstrumentStream::new(stream, logspan);
let ret = response(StatusCode::OK).body(body_stream(stream))?;
let ret = response(StatusCode::OK)
.header(CONTENT_TYPE, APP_CBOR_FRAMED)
.header(HEADER_NAME_REQUEST_ID, ctx.reqid())
.body(body_stream(stream))?;
Ok(ret)
}
@@ -139,7 +175,10 @@ async fn plain_events_json_framed(
let open_bytes = OpenBoxedBytesViaHttp::new(ncc.node_config.cluster.clone());
let stream = streams::plaineventsjson::plain_events_json_stream(&evq, ch_conf, ctx, Box::pin(open_bytes)).await?;
let stream = bytes_chunks_to_len_framed_str(stream);
let ret = response(StatusCode::OK).body(body_stream(stream))?;
let ret = response(StatusCode::OK)
.header(CONTENT_TYPE, APP_JSON_FRAMED)
.header(HEADER_NAME_REQUEST_ID, ctx.reqid())
.body(body_stream(stream))?;
Ok(ret)
}
@@ -167,12 +206,15 @@ async fn plain_events_json(
return Err(e.into());
}
};
let ret = response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?;
let ret = response(StatusCode::OK)
.header(CONTENT_TYPE, APP_JSON)
.header(HEADER_NAME_REQUEST_ID, ctx.reqid())
.body(ToJsonBody::from(&item).into_body())?;
debug!("{self_name} response created");
Ok(ret)
}
fn bytes_chunks_to_framed<S, T>(stream: S) -> impl Stream<Item = Result<Bytes, Error>>
fn bytes_chunks_to_framed<S, T>(stream: S) -> impl Stream<Item = Result<Bytes, crate::err::Error>>
where
S: Stream<Item = Result<T, err::Error>>,
T: Into<Bytes>,
@@ -191,19 +233,19 @@ where
b2.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
let mut b3 = BytesMut::with_capacity(16);
b3.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0][..pad]);
stream::iter([Ok::<_, Error>(b2.freeze()), Ok(buf), Ok(b3.freeze())])
stream::iter([Ok::<_, crate::err::Error>(b2.freeze()), Ok(buf), Ok(b3.freeze())])
}
Err(e) => {
let e = Error::with_msg_no_trace(e.to_string());
let e = crate::err::Error::with_msg_no_trace(e.to_string());
stream::iter([Err(e), Ok(Bytes::new()), Ok(Bytes::new())])
}
})
.filter(|x| if let Ok(x) = x { ready(x.len() > 0) } else { ready(true) })
}
fn bytes_chunks_to_len_framed_str<S, T>(stream: S) -> impl Stream<Item = Result<String, Error>>
fn bytes_chunks_to_len_framed_str<S, T>(stream: S) -> impl Stream<Item = Result<String, crate::err::Error>>
where
S: Stream<Item = Result<T, err::Error>>,
S: Stream<Item = Result<T, ::err::Error>>,
T: Into<String>,
{
use future::ready;
@@ -214,10 +256,10 @@ where
let s = y.into();
let mut b2 = String::with_capacity(16);
write!(b2, "\n{}\n", s.len()).unwrap();
stream::iter([Ok::<_, Error>(b2), Ok(s)])
stream::iter([Ok::<_, crate::err::Error>(b2), Ok(s)])
}
Err(e) => {
let e = Error::with_msg_no_trace(e.to_string());
let e = crate::err::Error::with_msg_no_trace(e.to_string());
stream::iter([Err(e), Ok(String::new())])
}
})

View File

@@ -17,14 +17,6 @@ where
Response::builder().status(status)
}
pub fn response_err_msg<T>(status: StatusCode, msg: T) -> Result<StreamResponse, RetrievalError>
where
T: ToString,
{
let ret = response(status).body(body_string(msg))?;
Ok(ret)
}
pub trait ToPublicResponse {
fn to_public_response(&self) -> StreamResponse;
}

View File

@@ -1,7 +1,6 @@
use crate::err::Error;
use crate::response;
use crate::ServiceSharedResources;
use crate::ToPublicResponse;
use core::fmt;
use dbconn::create_connection;
use dbconn::worker::PgQueue;
use futures_util::StreamExt;
@@ -36,12 +35,191 @@ use netpod::APP_JSON;
use nodenet::configquorum::find_config_basics_quorum;
use query::api4::binned::BinnedQuery;
use query::api4::events::PlainEventsQuery;
use scyllaconn::errconv::ErrConv;
use serde::Deserialize;
use serde::Serialize;
use std::collections::BTreeMap;
use url::Url;
#[derive(Debug)]
pub enum Error {
NotFound(SfDbChannel),
ConfigQuorum(nodenet::configquorum::Error),
ConfigNode(nodenet::channelconfig::Error),
Http(crate::Error),
HttpCrate(http::Error),
// TODO create dedicated error type for query parsing
BadQuery(err::Error),
MissingBackend,
MissingScalarType,
MissingShape,
MissingShapeKind,
MissingEdge,
Uri(netpod::UriError),
ChannelConfigQuery(err::Error),
ExpectScyllaBackend,
Pg(dbconn::pg::Error),
Scylla(String),
Join,
OtherErr(err::Error),
PgWorker(dbconn::worker::Error),
Async(netpod::AsyncChannelError),
ChannelConfig(dbconn::channelconfig::Error),
}
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let name = "HttpChannelConfigError";
write!(fmt, "{name}(")?;
match self {
Error::NotFound(chn) => write!(fmt, "NotFound({chn}")?,
Error::ConfigQuorum(e) => write!(fmt, "ConfigQuorum({e})")?,
Error::ConfigNode(e) => write!(fmt, "ConfigNode({e})")?,
Error::Http(e) => write!(fmt, "Http({e})")?,
Error::HttpCrate(e) => write!(fmt, "HttpCrate({e})")?,
Error::BadQuery(e) => write!(fmt, "BadQuery({e})")?,
Error::MissingBackend => write!(fmt, "MissingBackend")?,
Error::MissingScalarType => write!(fmt, "MissingScalarType")?,
Error::MissingShape => write!(fmt, "MissingShape")?,
Error::MissingShapeKind => write!(fmt, "MissingShapeKind")?,
Error::MissingEdge => write!(fmt, "MissingEdge")?,
Error::Uri(x) => write!(fmt, "Uri({x})")?,
Error::ChannelConfigQuery(e) => write!(fmt, "ChannelConfigQuery({e})")?,
Error::ExpectScyllaBackend => write!(fmt, "ExpectScyllaBackend")?,
Error::Pg(e) => write!(fmt, "Pg({e})")?,
Error::Scylla(e) => write!(fmt, "Scylla({e})")?,
Error::Join => write!(fmt, "Join")?,
Error::OtherErr(e) => write!(fmt, "OtherErr({e})")?,
Error::PgWorker(e) => write!(fmt, "PgWorker({e})")?,
Error::Async(e) => write!(fmt, "Async({e})")?,
Error::ChannelConfig(e) => write!(fmt, "ChannelConfig({e})")?,
}
write!(fmt, ")")?;
Ok(())
}
}
fn other_err_error(e: err::Error) -> Error {
Error::OtherErr(e)
}
impl std::error::Error for Error {}
impl From<crate::Error> for Error {
fn from(e: crate::Error) -> Self {
Self::Http(e)
}
}
impl From<http::Error> for Error {
fn from(e: http::Error) -> Self {
Self::HttpCrate(e)
}
}
impl From<nodenet::configquorum::Error> for Error {
fn from(e: nodenet::configquorum::Error) -> Self {
use nodenet::configquorum::Error::*;
match e {
NotFound(a) => Self::NotFound(a),
_ => Self::ConfigQuorum(e),
}
}
}
impl From<nodenet::channelconfig::Error> for Error {
fn from(e: nodenet::channelconfig::Error) -> Self {
match e {
nodenet::channelconfig::Error::NotFoundChannel(a) => Self::NotFound(a),
_ => Self::ConfigNode(e),
}
}
}
impl From<netpod::UriError> for Error {
fn from(e: netpod::UriError) -> Self {
Self::Uri(e)
}
}
impl From<dbconn::pg::Error> for Error {
fn from(e: dbconn::pg::Error) -> Self {
Self::Pg(e)
}
}
impl From<dbconn::worker::Error> for Error {
fn from(e: dbconn::worker::Error) -> Self {
Self::PgWorker(e)
}
}
impl From<scyllaconn::scylla::cql_to_rust::FromRowError> for Error {
fn from(e: scyllaconn::scylla::cql_to_rust::FromRowError) -> Self {
Self::Scylla(e.to_string())
}
}
impl From<scyllaconn::scylla::transport::errors::QueryError> for Error {
fn from(e: scyllaconn::scylla::transport::errors::QueryError) -> Self {
Self::Scylla(e.to_string())
}
}
impl From<scyllaconn::scylla::transport::iterator::NextRowError> for Error {
fn from(e: scyllaconn::scylla::transport::iterator::NextRowError) -> Self {
Self::Scylla(e.to_string())
}
}
impl From<taskrun::tokio::task::JoinError> for Error {
fn from(_e: taskrun::tokio::task::JoinError) -> Self {
Self::Join
}
}
impl From<netpod::AsyncChannelError> for Error {
fn from(e: netpod::AsyncChannelError) -> Self {
Self::Async(e)
}
}
impl From<dbconn::channelconfig::Error> for Error {
fn from(e: dbconn::channelconfig::Error) -> Self {
Self::ChannelConfig(e)
}
}
impl From<Error> for crate::err::Error {
fn from(e: Error) -> Self {
Self::with_msg_no_trace(format!("{e} TODO add public message"))
}
}
impl Error {
fn to_public_response(self) -> http::Response<httpclient::StreamBody> {
use httpclient::internal_error;
let status = StatusCode::INTERNAL_SERVER_ERROR;
let js = serde_json::json!({
"message": self.to_string(),
});
if let Ok(body) = serde_json::to_string_pretty(&js) {
match response(status)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(body_string(body))
{
Ok(res) => res,
Err(e) => {
error!("can not generate http error response {e}");
internal_error()
}
}
} else {
internal_error()
}
}
}
impl crate::IntoBoxedError for Error {}
pub async fn chconf_from_events_quorum(
q: &PlainEventsQuery,
ctx: &ReqCtx,
@@ -118,7 +296,7 @@ impl ChannelConfigHandler {
node_config: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
let url = req_uri_to_url(req.uri())?;
let q = ChannelConfigQuery::from_url(&url)?;
let q = ChannelConfigQuery::from_url(&url).map_err(|e| Error::BadQuery(e))?;
let conf =
nodenet::channelconfig::channel_config(q.range.clone(), q.channel.clone(), pgqueue, node_config).await?;
match conf {
@@ -176,7 +354,7 @@ impl ChannelConfigsHandler {
async fn channel_configs(&self, req: Requ, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
info!("channel_configs");
let url = req_uri_to_url(req.uri())?;
let q = ChannelConfigQuery::from_url(&url)?;
let q = ChannelConfigQuery::from_url(&url).map_err(|e| Error::BadQuery(e))?;
info!("channel_configs for q {q:?}");
let ch_confs = nodenet::channelconfig::channel_configs(q.channel, ncc).await?;
let ret = response(StatusCode::OK)
@@ -235,7 +413,7 @@ impl ChannelConfigQuorumHandler {
) -> Result<StreamResponse, Error> {
info!("channel_config_quorum");
let url = req_uri_to_url(req.uri())?;
let q = ChannelConfigQuery::from_url(&url)?;
let q = ChannelConfigQuery::from_url(&url).map_err(|e| Error::ChannelConfigQuery(e))?;
info!("channel_config_quorum for q {q:?}");
let ch_confs =
nodenet::configquorum::find_config_basics_quorum(q.channel, q.range.into(), ctx, pgqueue, ncc).await?;
@@ -266,12 +444,12 @@ impl FromUrl for ChannelsWithTypeQuery {
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, err::Error> {
let s = pairs
.get("scalar_type")
.ok_or_else(|| Error::with_public_msg_no_trace("missing scalar_type"))?;
.ok_or_else(|| err::Error::with_public_msg_no_trace("missing scalar_type"))?;
//let scalar_type = ScalarType::from_bsread_str(s)?;
let scalar_type: ScalarType = serde_json::from_str(&format!("\"{s}\""))?;
let s = pairs
.get("shape")
.ok_or_else(|| Error::with_public_msg_no_trace("missing shape"))?;
.ok_or_else(|| err::Error::with_public_msg_no_trace("missing shape"))?;
let shape = Shape::from_dims_str(s)?;
Ok(Self { scalar_type, shape })
}
@@ -302,19 +480,19 @@ impl FromUrl for ScyllaChannelEventSeriesIdQuery {
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, err::Error> {
let backend = pairs
.get("backend")
.ok_or_else(|| Error::with_public_msg_no_trace("missing backend"))?
.ok_or_else(|| err::Error::with_public_msg_no_trace("missing backend"))?
.into();
let name = pairs
.get("channelName")
.ok_or_else(|| Error::with_public_msg_no_trace("missing channelName"))?
.ok_or_else(|| err::Error::with_public_msg_no_trace("missing channelName"))?
.into();
let s = pairs
.get("scalarType")
.ok_or_else(|| Error::with_public_msg_no_trace("missing scalarType"))?;
.ok_or_else(|| err::Error::with_public_msg_no_trace("missing scalarType"))?;
let scalar_type: ScalarType = serde_json::from_str(&format!("\"{s}\""))?;
let s = pairs
.get("shape")
.ok_or_else(|| Error::with_public_msg_no_trace("missing shape"))?;
.ok_or_else(|| err::Error::with_public_msg_no_trace("missing shape"))?;
let shape = Shape::from_dims_str(s)?;
let do_create = pairs.get("doCreate").map_or("false", |x| x.as_str()) == "true";
Ok(Self {
@@ -351,15 +529,15 @@ impl FromUrl for ScyllaChannelsActiveQuery {
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, err::Error> {
let s = pairs
.get("tsedge")
.ok_or_else(|| Error::with_public_msg_no_trace("missing tsedge"))?;
.ok_or_else(|| err::Error::with_public_msg_no_trace("missing tsedge"))?;
let tsedge: u64 = s.parse()?;
let s = pairs
.get("shapeKind")
.ok_or_else(|| Error::with_public_msg_no_trace("missing shapeKind"))?;
.ok_or_else(|| err::Error::with_public_msg_no_trace("missing shapeKind"))?;
let shape_kind: u32 = s.parse()?;
let s = pairs
.get("scalarType")
.ok_or_else(|| Error::with_public_msg_no_trace("missing scalarType"))?;
.ok_or_else(|| err::Error::with_public_msg_no_trace("missing scalarType"))?;
let scalar_type: ScalarType = serde_json::from_str(&format!("\"{s}\""))?;
info!("parsed scalar type inp: {s:?} val: {scalar_type:?}");
Ok(Self {
@@ -390,7 +568,7 @@ impl ScyllaChannelsActive {
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept == APP_JSON || accept == ACCEPT_ALL {
let url = req_uri_to_url(req.uri())?;
let q = ScyllaChannelsActiveQuery::from_url(&url)?;
let q = ScyllaChannelsActiveQuery::from_url(&url).map_err(|e| Error::BadQuery(e))?;
let res = self.get_channels(&q, node_config).await?;
let body = ToJsonBody::from(&res).into_body();
Ok(response(StatusCode::OK).body(body)?)
@@ -411,8 +589,10 @@ impl ScyllaChannelsActive {
.node_config
.cluster
.scylla_st()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?;
let scy = scyllaconn::conn::create_scy_session(scyco).await?;
.ok_or_else(|| Error::ExpectScyllaBackend)?;
let scy = scyllaconn::conn::create_scy_session(scyco)
.await
.map_err(other_err_error)?;
// Database stores tsedge/ts_msp in units of (10 sec), and we additionally map to the grid.
let tsedge = q.tsedge / 10 / (6 * 2) * (6 * 2);
info!(
@@ -427,11 +607,10 @@ impl ScyllaChannelsActive {
"select series from series_by_ts_msp where part = ? and ts_msp = ? and shape_kind = ? and scalar_type = ?",
(part as i32, tsedge as i32, q.shape_kind as i32, q.scalar_type.to_scylla_i32()),
)
.await
.err_conv()?;
.await.map_err(|e| Error::Scylla(e.to_string()))?;
while let Some(row) = res.next().await {
let row = row.err_conv()?;
let (series,): (i64,) = row.into_typed().err_conv()?;
let row = row?;
let (series,): (i64,) = row.into_typed()?;
ret.push(series as u64);
}
}
@@ -456,11 +635,11 @@ impl FromUrl for IocForChannelQuery {
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, err::Error> {
let backend = pairs
.get("backend")
.ok_or_else(|| Error::with_public_msg_no_trace("missing backend"))?
.ok_or_else(|| err::Error::with_public_msg_no_trace("missing backend"))?
.into();
let name = pairs
.get("channelName")
.ok_or_else(|| Error::with_public_msg_no_trace("missing channelName"))?
.ok_or_else(|| err::Error::with_public_msg_no_trace("missing channelName"))?
.into();
Ok(Self { backend, name })
}
@@ -492,16 +671,13 @@ impl IocForChannel {
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept == APP_JSON || accept == ACCEPT_ALL {
let url = req_uri_to_url(req.uri())?;
let q = IocForChannelQuery::from_url(&url)?;
let q = IocForChannelQuery::from_url(&url).map_err(|e| Error::BadQuery(e))?;
match self.find(&q, node_config).await {
Ok(k) => {
let body = ToJsonBody::from(&k).into_body();
Ok(response(StatusCode::OK).body(body)?)
}
Err(e) => {
let body = body_string(format!("{:?}", e.public_msg()));
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body)?)
}
Err(e) => Ok(e.to_public_response()),
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?)
@@ -517,7 +693,7 @@ impl IocForChannel {
node_config: &NodeConfigCached,
) -> Result<Option<IocForChannelRes>, Error> {
let dbconf = &node_config.node_config.cluster.database;
let (pg_client, pgjh) = create_connection(dbconf).await?;
let (pg_client, pgjh) = create_connection(dbconf).await.map_err(other_err_error)?;
let rows = pg_client
.query(
"select addr from ioc_by_channel where facility = $1 and channel = $2",
@@ -525,7 +701,7 @@ impl IocForChannel {
)
.await?;
drop(pg_client);
pgjh.await??;
pgjh.await?.map_err(other_err_error)?;
if let Some(row) = rows.first() {
let ioc_addr = row.get(0);
let ret = IocForChannelRes { ioc_addr };
@@ -593,14 +769,13 @@ impl ScyllaSeriesTsMsp {
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept == APP_JSON || accept == ACCEPT_ALL {
let url = req_uri_to_url(req.uri())?;
let q = ScyllaSeriesTsMspQuery::from_url(&url)?;
let q = ScyllaSeriesTsMspQuery::from_url(&url).map_err(|e| Error::BadQuery(e))?;
match self.get_ts_msps(&q, shared_res).await {
Ok(k) => {
let body = ToJsonBody::from(&k).into_body();
Ok(response(StatusCode::OK).body(body)?)
}
Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR)
.body(body_string(format!("{:?}", e.public_msg())))?),
Err(e) => Ok(e.to_public_response()),
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?)
@@ -623,12 +798,7 @@ impl ScyllaSeriesTsMsp {
let chconf = shared_res
.pgqueue
.chconf_best_matching_name_range(q.channel.clone(), nano_range)
.await
.map_err(|e| Error::with_msg_no_trace(format!("error from pg worker: {e}")))?
.recv()
.await
.unwrap()
.unwrap();
.await??;
use scyllaconn::SeriesId;
let sid = SeriesId::new(chconf.series());
let scyqueue = shared_res.scyqueue.clone().unwrap();
@@ -710,8 +880,7 @@ impl AmbigiousChannelNames {
let body = ToJsonBody::from(&k).into_body();
Ok(response(StatusCode::OK).body(body)?)
}
Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR)
.body(body_string(format!("{:?}", e.public_msg())))?),
Err(e) => Ok(e.to_public_response()),
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?)
@@ -723,7 +892,7 @@ impl AmbigiousChannelNames {
async fn process(&self, ncc: &NodeConfigCached) -> Result<AmbigiousChannelNamesResponse, Error> {
let dbconf = &ncc.node_config.cluster.database;
let (pg_client, pgjh) = create_connection(dbconf).await?;
let (pg_client, pgjh) = create_connection(dbconf).await.map_err(other_err_error)?;
let rows = pg_client
.query(
"select t2.series, t2.channel, t2.scalar_type, t2.shape_dims, t2.agg_kind from series_by_channel t1, series_by_channel t2 where t2.channel = t1.channel and t2.series != t1.series",
@@ -731,14 +900,14 @@ impl AmbigiousChannelNames {
)
.await?;
drop(pg_client);
pgjh.await??;
pgjh.await?.map_err(other_err_error)?;
let mut ret = AmbigiousChannelNamesResponse { ambigious: Vec::new() };
for row in rows {
let g = AmbigiousChannel {
series: row.get::<_, i64>(0) as u64,
name: row.get(1),
scalar_type: ScalarType::from_scylla_i32(row.get(2))?,
shape: Shape::from_scylla_shape_dims(&row.get::<_, Vec<i32>>(3))?,
scalar_type: ScalarType::from_scylla_i32(row.get(2)).map_err(other_err_error)?,
shape: Shape::from_scylla_shape_dims(&row.get::<_, Vec<i32>>(3)).map_err(other_err_error)?,
};
ret.ambigious.push(g);
}
@@ -798,8 +967,7 @@ impl GenerateScyllaTestData {
let body = ToJsonBody::from(&k).into_body();
Ok(response(StatusCode::OK).body(body)?)
}
Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR)
.body(body_string(format!("{:?}", e.public_msg())))?),
Err(e) => Ok(e.to_public_response()),
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?)
@@ -811,25 +979,24 @@ impl GenerateScyllaTestData {
async fn process(&self, node_config: &NodeConfigCached) -> Result<(), Error> {
let scyconf = node_config.node_config.cluster.scylla_st().unwrap();
let scy = scyllaconn::conn::create_scy_session(scyconf).await?;
let scy = scyllaconn::conn::create_scy_session(scyconf)
.await
.map_err(other_err_error)?;
let series: u64 = 42001;
// TODO query `ts_msp` for all MSP values und use that to delete from event table first.
// Only later delete also from the `ts_msp` table.
let it = scy
.query_iter("select ts_msp from ts_msp where series = ?", (series as i64,))
.await
.err_conv()?;
.await?;
let mut it = it.into_typed::<(i64,)>();
while let Some(row) = it.next().await {
let row = row.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let row = row?;
let values = (series as i64, row.0);
scy.query("delete from events_scalar_f64 where series = ? and ts_msp = ?", values)
.await
.err_conv()?;
.await?;
}
scy.query("delete from ts_msp where series = ?", (series as i64,))
.await
.err_conv()?;
.await?;
// Generate
let (msps, lsps, pulses, vals) = test_data_f64_01();
@@ -840,8 +1007,7 @@ impl GenerateScyllaTestData {
"insert into ts_msp (series, ts_msp) values (?, ?)",
(series as i64, msp as i64),
)
.await
.err_conv()?;
.await?;
}
last = msp;
}
@@ -850,8 +1016,7 @@ impl GenerateScyllaTestData {
"insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)",
(series as i64, msp as i64, lsp as i64, pulse as i64, val),
)
.await
.err_conv()?;
.await?;
}
Ok(())
}

View File

@@ -1,7 +1,6 @@
use err::ToPublicError;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value as JsVal;
use std::fmt;
use taskrun::tokio;
@@ -108,3 +107,6 @@ impl Convable for std::array::TryFromSliceError {}
impl Convable for err::anyhow::Error {}
impl Convable for crate::RetrievalError {}
impl Convable for httpclient::Error {}
impl Convable for netpod::UriError {}
impl Convable for nodenet::configquorum::Error {}
impl Convable for nodenet::channelconfig::Error {}

View File

@@ -87,6 +87,8 @@ impl IntoBoxedError for api4::databuffer_tools::FindActiveError {}
impl IntoBoxedError for std::string::FromUtf8Error {}
impl IntoBoxedError for std::io::Error {}
impl IntoBoxedError for dbconn::worker::Error {}
impl IntoBoxedError for netpod::UriError {}
impl IntoBoxedError for crate::api4::binned::Error {}
impl<E> From<E> for RetrievalError
where

View File

@@ -4,7 +4,6 @@ pub mod api4;
use crate::api1::channel_search_configs_v1;
use crate::api1::channel_search_list_v1;
use crate::api1::gather_json_2_v1;
use crate::bodystream::response_err_msg;
use crate::err::Error;
use crate::gather::gather_get_json_generic;
use crate::gather::SubRes;
@@ -21,6 +20,7 @@ use http::StatusCode;
use httpclient::body_empty;
use httpclient::body_stream;
use httpclient::body_string;
use httpclient::error_response;
use httpclient::http;
use httpclient::http::header;
use httpclient::read_body_bytes;
@@ -509,9 +509,9 @@ where
let mut query = match QT::from_url(&url) {
Ok(k) => k,
Err(_) => {
let msg = format!("malformed request or missing parameters {:?}", req.uri());
let msg = format!("malformed request or missing parameters {}", req.uri());
warn!("{msg}");
return Ok(response_err_msg(StatusCode::BAD_REQUEST, msg)?);
return Ok(error_response(msg, ctx.reqid()));
}
};
trace!("proxy_backend_query {:?} {:?}", query, req.uri());

View File

@@ -8,6 +8,7 @@ use crate::ReqCtx;
use http::header;
use http::Method;
use http::Request;
use http::Response;
use http::StatusCode;
use http::Uri;
use httpclient::body_empty;
@@ -68,7 +69,7 @@ impl EventsHandler {
let url = req_uri_to_url(&head.uri)?;
let pairs = get_url_query_pairs(&url);
let evq = PlainEventsQuery::from_pairs(&pairs)?;
debug!("{:?}", evq);
debug!("handle_framed {evq:?}");
let query_host = get_query_host_for_backend(evq.backend(), proxy_config)?;
let url_str = format!(
"{}{}",
@@ -92,10 +93,14 @@ impl EventsHandler {
let (head, body) = res.into_parts();
if head.status != StatusCode::OK {
warn!("backend returned error: {head:?}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?)
} else {
debug!("backend returned OK");
Ok(response(StatusCode::OK).body(body_stream(StreamIncoming::new(body)))?)
}
let mut resb = Response::builder().status(head.status);
for h in head.headers {
if let (Some(hn), hv) = h {
resb = resb.header(hn, hv);
}
}
let res = resb.body(body_stream(StreamIncoming::new(body)))?;
Ok(res)
}
}

View File

@@ -63,7 +63,9 @@ use bytes::Bytes;
use chrono::DateTime;
use chrono::TimeZone;
use chrono::Utc;
use err::thiserror;
use err::Error;
use err::ThisError;
use futures_util::Stream;
use futures_util::StreamExt;
use http::Request;
@@ -103,6 +105,7 @@ pub const APP_CBOR_FRAMED: &str = "application/cbor-framed";
pub const APP_JSON_FRAMED: &str = "application/json-framed";
pub const ACCEPT_ALL: &str = "*/*";
pub const X_DAQBUF_REQID: &str = "x-daqbuffer-request-id";
pub const HEADER_NAME_REQUEST_ID: &str = "requestid";
pub const CONNECTION_STATUS_DIV: DtMs = DtMs::from_ms_u64(1000 * 60 * 60);
// pub const TS_MSP_GRID_UNIT: DtMs = DtMs::from_ms_u64(1000 * 10);
@@ -176,6 +179,13 @@ impl CmpZero for usize {
}
}
#[derive(Debug, err::ThisError)]
#[cstm(name = "AsyncChannelError")]
pub enum AsyncChannelError {
Send,
Recv,
}
pub struct BodyStream {
//pub receiver: async_channel::Receiver<Result<Bytes, Error>>,
pub inner: Box<dyn Stream<Item = Result<Bytes, Error>> + Send + Unpin>,
@@ -1071,6 +1081,16 @@ impl SfDbChannel {
}
}
impl fmt::Display for SfDbChannel {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(
fmt,
"SfDbChannel {{ series: {:?}, backend: {:?}, name: {:?}, kind: {:?} }}",
self.series, self.backend, self.name, self.kind
)
}
}
impl FromUrl for SfDbChannel {
fn from_url(url: &Url) -> Result<Self, Error> {
let pairs = get_url_query_pairs(url);
@@ -4123,14 +4143,18 @@ pub fn status_board_init() {
});
}
pub fn req_uri_to_url(uri: &Uri) -> Result<Url, Error> {
#[derive(Debug, ThisError)]
#[cstm(name = "UriError")]
pub enum UriError {
ParseError(Uri),
}
pub fn req_uri_to_url(uri: &Uri) -> Result<Url, UriError> {
if uri.scheme().is_none() {
format!("dummy:{uri}")
.parse()
.map_err(|_| Error::with_msg_no_trace(format!("can not use uri {uri}")))
.map_err(|_| UriError::ParseError(uri.clone()))
} else {
uri.to_string()
.parse()
.map_err(|_| Error::with_msg_no_trace(format!("can not use uri {uri}")))
uri.to_string().parse().map_err(|_| UriError::ParseError(uri.clone()))
}
}

View File

@@ -60,6 +60,12 @@ impl fmt::Debug for NanoRange {
}
}
impl fmt::Display for NanoRange {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(self, fmt)
}
}
impl NanoRange {
pub fn from_date_time(beg: DateTime<Utc>, end: DateTime<Utc>) -> Self {
Self {

View File

@@ -1,5 +1,6 @@
use dbconn::worker::PgQueue;
use err::Error;
use err::thiserror;
use err::ThisError;
use httpclient::url::Url;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
@@ -20,6 +21,50 @@ use netpod::Shape;
use netpod::APP_JSON;
use serde::Serialize;
#[derive(Debug, ThisError)]
#[cstm(name = "ChannelConfigNode")]
pub enum Error {
NotFoundChannel(SfDbChannel),
ChannelConfig(dbconn::channelconfig::Error),
DbWorker(#[from] dbconn::worker::Error),
DiskConfig(#[from] disk::channelconfig::ConfigError),
BackendConfigError,
BadTestSetup,
HttpReqError,
HttpClient(#[from] httpclient::Error),
ConfigParse(#[from] disk::parse::channelconfig::ConfigParseError),
JsonParse(#[from] serde_json::Error),
SearchWithGivenSeries,
AsyncSend,
AsyncRecv,
Todo,
}
impl From<async_channel::RecvError> for Error {
fn from(_value: async_channel::RecvError) -> Self {
Error::AsyncRecv
}
}
impl From<netpod::AsyncChannelError> for Error {
fn from(value: netpod::AsyncChannelError) -> Self {
match value {
netpod::AsyncChannelError::Send => Self::AsyncSend,
netpod::AsyncChannelError::Recv => Self::AsyncRecv,
}
}
}
impl From<dbconn::channelconfig::Error> for Error {
fn from(value: dbconn::channelconfig::Error) -> Self {
use dbconn::channelconfig::Error::*;
match value {
NotFound(chn, _) => Self::NotFoundChannel(chn),
_ => Self::ChannelConfig(value),
}
}
}
const TEST_BACKEND: &str = "testbackend-00";
fn channel_config_test_backend(channel: SfDbChannel) -> Result<ChannelTypeConfigGen, Error> {
@@ -92,8 +137,7 @@ fn channel_config_test_backend(channel: SfDbChannel) -> Result<ChannelTypeConfig
ret
} else {
error!("no test information");
return Err(Error::with_msg_no_trace(format!("no test information"))
.add_public_msg("No channel config for test channel {:?}"));
return Err(Error::NotFoundChannel(channel));
};
Ok(ChannelTypeConfigGen::SfDatabuffer(ret))
}
@@ -108,15 +152,11 @@ pub async fn channel_config(
Ok(Some(channel_config_test_backend(channel)?))
} else if ncc.node_config.cluster.scylla_st().is_some() {
debug!("try to get ChConf for scylla type backend");
let ret = scylla_chconf_from_sf_db_channel(range, channel, pgqueue)
.await
.map_err(Error::from)?;
let ret = scylla_chconf_from_sf_db_channel(range, channel, pgqueue).await?;
Ok(Some(ChannelTypeConfigGen::Scylla(ret)))
} else if ncc.node.sf_databuffer.is_some() {
debug!("channel_config channel {channel:?}");
let k = disk::channelconfig::channel_config_best_match(range, channel.clone(), ncc)
.await
.map_err(|e| Error::from(e.to_string()))?;
let k = disk::channelconfig::channel_config_best_match(range, channel.clone(), ncc).await?;
match k {
Some(config) => {
debug!("channel_config config {config:?}");
@@ -135,10 +175,7 @@ pub async fn channel_config(
None => Ok(None),
}
} else {
return Err(
Error::with_msg_no_trace(format!("no channel config for backend {}", channel.backend()))
.add_public_msg(format!("no channel config for backend {}", channel.backend())),
);
Err(Error::BackendConfigError)
}
}
@@ -154,7 +191,7 @@ pub async fn channel_configs(channel: SfDbChannel, ncc: &NodeConfigCached) -> Re
ChannelTypeConfigGen::Scylla(x) => ChannelConfigsGen::Scylla(x),
ChannelTypeConfigGen::SfDatabuffer(_) => {
// ChannelConfigsGen::SfDatabuffer(todo!())
let e = Error::with_msg_no_trace("channel_configs test backend TODO SfDatabuffer");
let e = Error::BadTestSetup;
warn!("{e}");
return Err(e);
}
@@ -168,15 +205,10 @@ pub async fn channel_configs(channel: SfDbChannel, ncc: &NodeConfigCached) -> Re
Ok(ChannelConfigsGen::Scylla(ret))
} else if ncc.node.sf_databuffer.is_some() {
debug!("channel_config channel {channel:?}");
let configs = disk::channelconfig::channel_configs(channel.clone(), ncc)
.await
.map_err(|e| Error::from(e.to_string()))?;
let configs = disk::channelconfig::channel_configs(channel.clone(), ncc).await?;
Ok(ChannelConfigsGen::SfDatabuffer(configs))
} else {
return Err(
Error::with_msg_no_trace(format!("no channel config for backend {}", channel.backend()))
.add_public_msg(format!("no channel config for backend {}", channel.backend())),
);
return Err(Error::BackendConfigError);
}
}
@@ -196,12 +228,9 @@ pub async fn http_get_channel_config(
let ret: ChannelConfigResponse = serde_json::from_slice(&res.body)?;
Ok(Some(ret))
} else {
let b = &res.body;
let s = String::from_utf8_lossy(&b[0..b.len().min(256)]);
Err(Error::with_msg_no_trace(format!(
"http_get_channel_config {} {}",
res.head.status, s
)))
// let b = &res.body;
// let s = String::from_utf8_lossy(&b[0..b.len().min(256)]);
Err(Error::HttpReqError)
}
}
@@ -210,43 +239,21 @@ async fn scylla_chconf_from_sf_db_channel(
channel: SfDbChannel,
pgqueue: &PgQueue,
) -> Result<ChConf, Error> {
trace!("scylla_chconf_from_sf_db_channel {:?}", channel);
if let Some(series) = channel.series() {
let ret = pgqueue
.chconf_for_series(channel.backend(), series)
.await?
.recv()
.await??;
let ret = pgqueue.chconf_for_series(channel.backend(), series).await??;
Ok(ret)
} else {
// TODO let called function allow to return None instead of error-not-found
let ret = pgqueue
.chconf_best_matching_name_range(channel, range)
.await?
.recv()
.await??;
let ret = pgqueue.chconf_best_matching_name_range(channel, range).await??;
Ok(ret)
}
}
async fn scylla_all_chconf_from_sf_db_channel(channel: &SfDbChannel, _ncc: &NodeConfigCached) -> Result<ChConf, Error> {
if let Some(_) = channel.series() {
let e = Error::with_msg_no_trace(format!(
"scylla_all_chconf_from_sf_db_channel but series anyways specified {channel:?}"
));
// dbconn::channelconfig::chconf_for_series(channel.backend(), series, ncc).await
warn!("{e}");
Err(e)
Err(Error::SearchWithGivenSeries)
} else {
#[cfg(DISABLED)]
{
// TODO let called function allow to return None instead of error-not-found
let ret = dbconn::channelconfig::chconf_from_scylla_type_backend(&channel, ncc)
.await
.map_err(Error::from)?;
Ok(Some(ChannelTypeConfigGen::Scylla(ret)))
}
let e = Error::with_msg_no_trace(format!("scylla_all_chconf_from_sf_db_channel TODO"));
warn!("{e}");
Err(e)
Err(Error::Todo)
}
}

View File

@@ -1,6 +1,7 @@
use crate::channelconfig::http_get_channel_config;
use dbconn::worker::PgQueue;
use err::Error;
use err::thiserror;
use err::ThisError;
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
use netpod::ChConf;
@@ -16,6 +17,30 @@ use std::collections::BTreeMap;
use std::time::Duration;
use taskrun::tokio;
#[derive(Debug, ThisError)]
#[cstm(name = "ConfigQuorum")]
pub enum Error {
NotFound(SfDbChannel),
MissingTimeRange,
Timeout,
ChannelConfig(crate::channelconfig::Error),
ExpectSfDatabufferBackend,
UnsupportedBackend,
BadTimeRange,
DbWorker(#[from] dbconn::worker::Error),
FindChannel(#[from] dbconn::FindChannelError),
}
impl From<crate::channelconfig::Error> for Error {
fn from(value: crate::channelconfig::Error) -> Self {
use crate::channelconfig::Error::*;
match value {
NotFoundChannel(chn) => Self::NotFound(chn),
_ => Self::ChannelConfig(value),
}
}
}
fn decide_sf_ch_config_quorum(inp: Vec<ChannelConfigResponse>) -> Result<Option<ChannelTypeConfigGen>, Error> {
let mut histo = BTreeMap::new();
for item in inp {
@@ -55,7 +80,7 @@ async fn find_sf_ch_config_quorum(
) -> Result<Option<SfChFetchInfo>, Error> {
let range = match range {
SeriesRange::TimeRange(x) => x,
SeriesRange::PulseRange(_) => return Err(Error::with_msg_no_trace("expect TimeRange")),
SeriesRange::PulseRange(_) => return Err(Error::MissingTimeRange),
};
let mut all = Vec::new();
for node in &ncc.node_config.cluster.nodes {
@@ -71,16 +96,14 @@ async fn find_sf_ch_config_quorum(
http_get_channel_config(qu, node.baseurl(), ctx),
)
.await
.map_err(|_| Error::with_msg_no_trace("timeout"))??;
.map_err(|_| Error::Timeout)??;
all.push(res);
}
let all: Vec<_> = all.into_iter().filter_map(|x| x).collect();
let qu = decide_sf_ch_config_quorum(all)?;
match qu {
Some(item) => match item {
ChannelTypeConfigGen::Scylla(_) => Err(Error::with_msg_no_trace(
"find_sf_ch_config_quorum not a sf-databuffer config",
)),
ChannelTypeConfigGen::Scylla(_) => Err(Error::ExpectSfDatabufferBackend),
ChannelTypeConfigGen::SfDatabuffer(item) => Ok(Some(item)),
},
None => Ok(None),
@@ -98,11 +121,7 @@ pub async fn find_config_basics_quorum(
if let Some(_cfg) = &ncc.node.sf_databuffer {
let channel = if channel.name().is_empty() {
if let Some(_) = channel.series() {
pgqueue
.find_sf_channel_by_series(channel)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?
pgqueue.find_sf_channel_by_series(channel).await??
} else {
channel
}
@@ -114,12 +133,10 @@ pub async fn find_config_basics_quorum(
None => Ok(None),
}
} else if let Some(_) = &ncc.node_config.cluster.scylla_st() {
let range = netpod::range::evrange::NanoRange::try_from(&range)?;
let range = netpod::range::evrange::NanoRange::try_from(&range).map_err(|_| Error::BadTimeRange)?;
let ret = crate::channelconfig::channel_config(range, channel, pgqueue, ncc).await?;
Ok(ret)
} else {
Err(Error::with_msg_no_trace(
"find_config_basics_quorum not supported backend",
))
Err(Error::UnsupportedBackend)
}
}

View File

@@ -53,6 +53,15 @@ macro_rules! warn_item {
};
}
#[allow(unused)]
macro_rules! trace_every_event {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
#[derive(Debug, Clone)]
pub struct EventReadOpts {
pub with_values: bool,
@@ -433,7 +442,7 @@ impl Stream for EventsStreamRt {
use items_2::merger::Mergeable;
trace_fetch!("ReadingBck FetchEvents got len {}", x.len());
for ts in Mergeable::tss(&x) {
trace_fetch!("ReadingBck FetchEvents ts {}", ts.fmt());
trace_every_event!("ReadingBck FetchEvents ts {}", ts.fmt());
}
if let Some(ix) = Mergeable::find_highest_index_lt(&x, self.range.beg().ns()) {
trace_fetch!("ReadingBck FetchEvents find_highest_index_lt {:?}", ix);
@@ -480,10 +489,10 @@ impl Stream for EventsStreamRt {
},
ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) {
Ready(Ok(x)) => {
use items_2::merger::Mergeable;
trace_fetch!("ReadingFwd FetchEvents got len {:?}", x.len());
for ts_ns in x.tss() {
let ts = TsNano::from_ns(*ts_ns).to_ts_ms();
trace_fetch!("ReadingFwd FetchEvents ts {}", ts.fmt());
for ts in Mergeable::tss(&x) {
trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt());
}
self.out.push_back(x);
self.setup_fwd_read();

View File

@@ -4,12 +4,19 @@ use crate::firsterr::non_empty;
use crate::firsterr::only_first_err;
use crate::plaineventsstream::dyn_events_stream;
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
use err::Error;
use err::thiserror;
use err::ThisError;
use netpod::log::*;
use netpod::ChannelTypeConfigGen;
use netpod::ReqCtx;
use query::api4::events::PlainEventsQuery;
#[derive(Debug, ThisError)]
#[cstm(name = "PlainEventsCbor")]
pub enum Error {
Stream(#[from] crate::plaineventsstream::Error),
}
pub async fn plain_events_cbor_stream(
evq: &PlainEventsQuery,
ch_conf: ChannelTypeConfigGen,

View File

@@ -5,7 +5,8 @@ use crate::json_stream::events_stream_to_json_stream;
use crate::json_stream::JsonStream;
use crate::plaineventsstream::dyn_events_stream;
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
use err::Error;
use err::thiserror;
use err::ThisError;
use futures_util::StreamExt;
use items_0::collect_s::Collectable;
use items_0::on_sitemty_data;
@@ -17,6 +18,14 @@ use query::api4::events::PlainEventsQuery;
use serde_json::Value as JsonValue;
use std::time::Instant;
#[derive(Debug, ThisError)]
#[cstm(name = "PlainEventsJson")]
pub enum Error {
Stream(#[from] crate::plaineventsstream::Error),
Collect(err::Error),
Json(#[from] serde_json::Error),
}
pub async fn plain_events_json(
evq: &PlainEventsQuery,
ch_conf: ChannelTypeConfigGen,
@@ -49,7 +58,8 @@ pub async fn plain_events_json(
Some(evq.range().clone()),
None,
)
.await?;
.await
.map_err(Error::Collect)?;
debug!("plain_events_json collected");
let jsval = serde_json::to_value(&collected)?;
debug!("plain_events_json json serialized");

View File

@@ -2,7 +2,8 @@ use crate::tcprawclient::container_stream_from_bytes_stream;
use crate::tcprawclient::make_sub_query;
use crate::tcprawclient::OpenBoxedBytesStreamsBox;
use crate::transform::build_merged_event_transform;
use err::Error;
use err::thiserror;
use err::ThisError;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::on_sitemty_data;
@@ -18,6 +19,12 @@ use netpod::ReqCtx;
use query::api4::events::PlainEventsQuery;
use std::pin::Pin;
#[derive(Debug, ThisError)]
#[cstm(name = "PlainEventsStream")]
pub enum Error {
OtherErr(#[from] err::Error),
}
pub type DynEventsStream = Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Events>>> + Send>>;
pub async fn dyn_events_stream(
@@ -86,9 +93,9 @@ async fn transform_wasm<INP>(
stream: INP,
_wasmname: &str,
_ctx: &ReqCtx,
) -> Result<impl Stream<Item = Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, Error>> + Send, Error>
) -> Result<impl Stream<Item = Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, err::Error>> + Send, err::Error>
where
INP: Stream<Item = Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, Error>> + Send + 'static,
INP: Stream<Item = Result<StreamItem<RangeCompletableItem<Box<dyn Events>>>, err::Error>> + Send + 'static,
{
let ret: Pin<Box<dyn Stream<Item = Sitemty<Box<dyn Events>>> + Send>> = Box::pin(stream);
Ok(ret)