WIP typechecks

This commit is contained in:
Dominik Werder
2024-11-05 14:53:13 +01:00
parent ef021ff971
commit 9fd27332c3
10 changed files with 67 additions and 39 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqbuffer"
version = "0.5.4"
version = "0.5.5-aa.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -56,6 +56,7 @@ pub enum Error {
EventsJson(#[from] streams::plaineventsjson::Error),
ServerError,
BinnedStream(::err::Error),
TimebinnedJson(#[from] streams::timebinnedjson::Error),
}
impl From<crate::channelconfig::Error> for Error {
@@ -212,8 +213,7 @@ async fn binned_json_single(
make_read_provider(ch_conf.name(), scyqueue, open_bytes, ctx, ncc);
let item = streams::timebinnedjson::timebinned_json(query, ch_conf, ctx, cache_read_provider, events_read_provider)
.instrument(span1)
.await
.map_err(|e| Error::BinnedStream(e))?;
.await?;
match item {
CollectResult::Some(item) => {
let ret = response(StatusCode::OK)
@@ -269,8 +269,7 @@ async fn binned_json_framed(
let stream =
streams::timebinnedjson::timebinned_json_framed(query, ch_conf, ctx, cache_read_provider, events_read_provider)
.instrument(span1)
.await
.map_err(|e| Error::BinnedStream(e))?;
.await?;
let stream = bytes_chunks_to_len_framed_str(stream);
let ret = response(StatusCode::OK)
.header(CONTENT_TYPE, APP_JSON_FRAMED)

View File

@@ -124,8 +124,7 @@ impl EventsHandler {
}
let self_name = "handle";
let url = req_uri_to_url(req.uri())?;
let evq =
PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?;
let evq = PlainEventsQuery::from_url(&url)?;
debug!("{self_name} evq {evq:?}");
let logspan = if evq.log_level() == "trace" {
trace!("enable trace for handler");
@@ -260,10 +259,11 @@ async fn plain_events_json(
}
}
fn bytes_chunks_to_framed<S, T>(stream: S) -> impl Stream<Item = Result<Bytes, crate::err::Error>>
fn bytes_chunks_to_framed<S, T, E>(stream: S) -> impl Stream<Item = Result<Bytes, E>>
where
S: Stream<Item = Result<T, err::Error>>,
S: Stream<Item = Result<T, E>>,
T: Into<Bytes>,
E: std::error::Error,
{
use future::ready;
stream
@@ -279,21 +279,22 @@ where
b2.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]);
let mut b3 = BytesMut::with_capacity(16);
b3.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0][..pad]);
stream::iter([Ok::<_, crate::err::Error>(b2.freeze()), Ok(buf), Ok(b3.freeze())])
stream::iter([Ok(b2.freeze()), Ok(buf), Ok(b3.freeze())])
}
Err(e) => {
let e = crate::err::Error::with_msg_no_trace(e.to_string());
stream::iter([Err(e), Ok(Bytes::new()), Ok(Bytes::new())])
error!("{e}");
stream::iter([Ok(Bytes::new()), Ok(Bytes::new()), Ok(Bytes::new())])
}
})
.filter(|x| if let Ok(x) = x { ready(x.len() > 0) } else { ready(true) })
}
// TODO move this, it's also used by binned.
pub fn bytes_chunks_to_len_framed_str<S, T>(stream: S) -> impl Stream<Item = Result<String, crate::err::Error>>
pub fn bytes_chunks_to_len_framed_str<S, T, E>(stream: S) -> impl Stream<Item = Result<String, E>>
where
S: Stream<Item = Result<T, ::err::Error>>,
S: Stream<Item = Result<T, E>>,
T: Into<String>,
E: std::error::Error,
{
use future::ready;
stream
@@ -303,11 +304,11 @@ where
let s = y.into();
let mut b2 = String::with_capacity(16);
write!(b2, "{:15}\n", s.len()).unwrap();
stream::iter([Ok::<_, crate::err::Error>(b2), Ok(s), Ok(String::from("\n"))])
stream::iter([Ok::<_, E>(b2), Ok(s), Ok(String::from("\n"))])
}
Err(e) => {
let e = crate::err::Error::with_msg_no_trace(e.to_string());
stream::iter([Err(e), Ok(String::new()), Ok(String::new())])
error!("{e}");
stream::iter([Ok(String::new()), Ok(String::new()), Ok(String::new())])
}
})
.filter(|x| if let Ok(x) = x { ready(x.len() > 0) } else { ready(true) })

View File

@@ -54,6 +54,7 @@ pub enum Error {
MissingShape,
MissingShapeKind,
MissingEdge,
MissingTimerange,
Uri(netpod::UriError),
ChannelConfigQuery(err::Error),
ExpectScyllaBackend,
@@ -64,6 +65,7 @@ pub enum Error {
PgWorker(dbconn::worker::Error),
Async(netpod::AsyncChannelError),
ChannelConfig(dbconn::channelconfig::Error),
Netpod(netpod::NetpodError),
}
impl fmt::Display for Error {
@@ -82,6 +84,7 @@ impl fmt::Display for Error {
Error::MissingShape => write!(fmt, "MissingShape")?,
Error::MissingShapeKind => write!(fmt, "MissingShapeKind")?,
Error::MissingEdge => write!(fmt, "MissingEdge")?,
Error::MissingTimerange => write!(fmt, "MissingTimerange")?,
Error::Uri(x) => write!(fmt, "Uri({x})")?,
Error::ChannelConfigQuery(e) => write!(fmt, "ChannelConfigQuery({e})")?,
Error::ExpectScyllaBackend => write!(fmt, "ExpectScyllaBackend")?,
@@ -92,6 +95,7 @@ impl fmt::Display for Error {
Error::PgWorker(e) => write!(fmt, "PgWorker({e})")?,
Error::Async(e) => write!(fmt, "Async({e})")?,
Error::ChannelConfig(e) => write!(fmt, "ChannelConfig({e})")?,
Error::Netpod(e) => write!(fmt, "Netpod({e})")?,
}
write!(fmt, ")")?;
Ok(())
@@ -188,6 +192,12 @@ impl From<dbconn::channelconfig::Error> for Error {
}
}
impl From<netpod::NetpodError> for Error {
fn from(e: netpod::NetpodError) -> Self {
Self::Netpod(e)
}
}
impl From<Error> for crate::err::Error {
fn from(e: Error) -> Self {
Self::with_msg_no_trace(format!("{e} TODO add public message"))
@@ -296,7 +306,7 @@ impl ChannelConfigHandler {
node_config: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
let url = req_uri_to_url(req.uri())?;
let q = ChannelConfigQuery::from_url(&url).map_err(|e| Error::BadQuery(e))?;
let q = ChannelConfigQuery::from_url(&url)?;
let conf =
nodenet::channelconfig::channel_config(q.range.clone(), q.channel.clone(), pgqueue, node_config).await?;
match conf {
@@ -354,7 +364,7 @@ impl ChannelConfigsHandler {
async fn channel_configs(&self, req: Requ, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
info!("channel_configs");
let url = req_uri_to_url(req.uri())?;
let q = ChannelConfigQuery::from_url(&url).map_err(|e| Error::BadQuery(e))?;
let q = ChannelConfigQuery::from_url(&url)?;
info!("channel_configs for q {q:?}");
let ch_confs = nodenet::channelconfig::channel_configs(q.channel, ncc).await?;
let ret = response(StatusCode::OK)
@@ -413,7 +423,7 @@ impl ChannelConfigQuorumHandler {
) -> Result<StreamResponse, Error> {
info!("channel_config_quorum");
let url = req_uri_to_url(req.uri())?;
let q = ChannelConfigQuery::from_url(&url).map_err(|e| Error::ChannelConfigQuery(e))?;
let q = ChannelConfigQuery::from_url(&url)?;
info!("channel_config_quorum for q {q:?}");
let ch_confs =
nodenet::configquorum::find_config_basics_quorum(q.channel, q.range.into(), ctx, pgqueue, ncc).await?;
@@ -436,12 +446,14 @@ pub struct ChannelsWithTypeQuery {
}
impl FromUrl for ChannelsWithTypeQuery {
fn from_url(url: &Url) -> Result<Self, err::Error> {
type Error = err::Error;
fn from_url(url: &Url) -> Result<Self, Self::Error> {
let pairs = get_url_query_pairs(url);
Self::from_pairs(&pairs)
}
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, err::Error> {
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Self::Error> {
let s = pairs
.get("scalar_type")
.ok_or_else(|| err::Error::with_public_msg_no_trace("missing scalar_type"))?;
@@ -472,6 +484,8 @@ fn bool_false(x: &bool) -> bool {
}
impl FromUrl for ScyllaChannelEventSeriesIdQuery {
type Error = err::Error;
fn from_url(url: &Url) -> Result<Self, err::Error> {
let pairs = get_url_query_pairs(url);
Self::from_pairs(&pairs)
@@ -521,6 +535,8 @@ pub struct ScyllaChannelsActiveQuery {
}
impl FromUrl for ScyllaChannelsActiveQuery {
type Error = err::Error;
fn from_url(url: &Url) -> Result<Self, err::Error> {
let pairs = get_url_query_pairs(url);
Self::from_pairs(&pairs)
@@ -627,12 +643,14 @@ pub struct IocForChannelQuery {
}
impl FromUrl for IocForChannelQuery {
fn from_url(url: &Url) -> Result<Self, err::Error> {
type Error = err::Error;
fn from_url(url: &Url) -> Result<Self, Self::Error> {
let pairs = get_url_query_pairs(url);
Self::from_pairs(&pairs)
}
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, err::Error> {
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Self::Error> {
let backend = pairs
.get("backend")
.ok_or_else(|| err::Error::with_public_msg_no_trace("missing backend"))?
@@ -719,19 +737,21 @@ pub struct ScyllaSeriesTsMspQuery {
}
impl FromUrl for ScyllaSeriesTsMspQuery {
fn from_url(url: &Url) -> Result<Self, err::Error> {
type Error = Error;
fn from_url(url: &Url) -> Result<Self, Self::Error> {
let pairs = get_url_query_pairs(url);
Self::from_pairs(&pairs)
}
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, err::Error> {
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Self::Error> {
let channel = SfDbChannel::from_pairs(pairs)?;
let range = if let Ok(x) = TimeRangeQuery::from_pairs(pairs) {
SeriesRange::TimeRange(x.into())
} else if let Ok(x) = PulseRangeQuery::from_pairs(pairs) {
SeriesRange::PulseRange(x.into())
} else {
return Err(err::Error::with_public_msg_no_trace("no time range in url"));
return Err(Error::MissingTimerange);
};
Ok(Self { channel, range })
}
@@ -769,7 +789,7 @@ impl ScyllaSeriesTsMsp {
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept == APP_JSON || accept == ACCEPT_ALL {
let url = req_uri_to_url(req.uri())?;
let q = ScyllaSeriesTsMspQuery::from_url(&url).map_err(|e| Error::BadQuery(e))?;
let q = ScyllaSeriesTsMspQuery::from_url(&url)?;
match self.get_ts_msps(&q, shared_res).await {
Ok(k) => {
let body = ToJsonBody::from(&k).into_body();
@@ -906,7 +926,7 @@ impl AmbigiousChannelNames {
let g = AmbigiousChannel {
series: row.get::<_, i64>(0) as u64,
name: row.get(1),
scalar_type: ScalarType::from_scylla_i32(row.get(2)).map_err(other_err_error)?,
scalar_type: ScalarType::from_scylla_i32(row.get(2))?,
shape: Shape::from_scylla_shape_dims(&row.get::<_, Vec<i32>>(3)).map_err(other_err_error)?,
};
ret.ambigious.push(g);

View File

@@ -21,12 +21,14 @@ pub struct DownloadQuery {
}
impl FromUrl for DownloadQuery {
fn from_url(url: &Url) -> Result<Self, ::err::Error> {
type Error = crate::err::Error;
fn from_url(url: &Url) -> Result<Self, Self::Error> {
let pairs = get_url_query_pairs(url);
Self::from_pairs(&pairs)
}
fn from_pairs(pairs: &std::collections::BTreeMap<String, String>) -> Result<Self, err::Error> {
fn from_pairs(pairs: &std::collections::BTreeMap<String, String>) -> Result<Self, Self::Error> {
let read_sys = pairs
.get("ReadSys")
.map(|x| x.as_str().into())

View File

@@ -110,3 +110,6 @@ impl Convable for httpclient::Error {}
impl Convable for netpod::UriError {}
impl Convable for nodenet::configquorum::Error {}
impl Convable for nodenet::channelconfig::Error {}
impl Convable for query::api4::Error {}
impl Convable for query::api4::events::Error {}
impl Convable for netpod::NetpodError {}

View File

@@ -76,6 +76,8 @@ pub enum RetrievalError {
Fmt(#[from] std::fmt::Error),
#[serde(skip)]
Url(#[from] url::ParseError),
#[serde(skip)]
Netpod(#[from] netpod::NetpodError),
}
trait IntoBoxedError: std::error::Error {}

View File

@@ -849,7 +849,9 @@ impl HasTimeout for MapPulseQuery {
}
impl FromUrl for MapPulseQuery {
fn from_url(url: &url::Url) -> Result<Self, err::Error> {
type Error = Error;
fn from_url(url: &url::Url) -> Result<Self, Self::Error> {
let mut pit = url
.path_segments()
.ok_or_else(|| Error::with_msg_no_trace(format!("no path in url {url}")))?
@@ -870,10 +872,8 @@ impl FromUrl for MapPulseQuery {
Ok(ret)
}
fn from_pairs(_pairs: &BTreeMap<String, String>) -> Result<Self, err::Error> {
Err(err::Error::with_msg_no_trace(format!(
"can not only construct from pairs"
)))
fn from_pairs(_pairs: &BTreeMap<String, String>) -> Result<Self, Self::Error> {
Err(Error::with_msg_no_trace(format!("can not only construct from pairs")))
}
}

View File

@@ -89,7 +89,7 @@ fn raw_data_00() {
let qu = EventsSubQuery::from_parts(select, settings, "dummy".into(), log_level);
let frame1 = Frame1Parts::new(qu.clone());
let query = EventQueryJsonStringFrame(serde_json::to_string(&frame1).unwrap());
let frame = sitem_data(query).make_frame_dyn()?;
let frame = sitem_data(query).make_frame_dyn().map_err(Error::from_string)?;
let scyqueue = err::todoval();
let jh = taskrun::spawn(events_conn_handler(client, addr, scyqueue, cfg));
con.write_all(&frame).await.unwrap();

View File

@@ -281,9 +281,10 @@ impl<S> FramedBytesToSitemtyDynEventsStream<S> {
}
}
impl<S> Stream for FramedBytesToSitemtyDynEventsStream<S>
impl<S, E> Stream for FramedBytesToSitemtyDynEventsStream<S>
where
S: Stream<Item = Result<Bytes, Error>> + Unpin,
S: Stream<Item = Result<Bytes, E>> + Unpin,
E: std::error::Error,
{
type Item = <SitemtyDynEventsStream as Stream>::Item;