From 9fd27332c3b46a465a66e71454ce713582e37fc6 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 5 Nov 2024 14:53:13 +0100 Subject: [PATCH] WIP typechecks --- crates/daqbuffer/Cargo.toml | 2 +- crates/httpret/src/api4/binned.rs | 7 ++--- crates/httpret/src/api4/events.rs | 25 ++++++++-------- crates/httpret/src/channelconfig.rs | 44 +++++++++++++++++++++-------- crates/httpret/src/download.rs | 6 ++-- crates/httpret/src/err.rs | 3 ++ crates/httpret/src/httpret.rs | 2 ++ crates/httpret/src/pulsemap.rs | 10 +++---- crates/nodenet/src/conn/test.rs | 2 +- crates/streams/src/cbor_stream.rs | 5 ++-- 10 files changed, 67 insertions(+), 39 deletions(-) diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index a71a2f9..296796f 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqbuffer" -version = "0.5.4" +version = "0.5.5-aa.0" authors = ["Dominik Werder "] edition = "2021" diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index b1c4728..bf2b806 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -56,6 +56,7 @@ pub enum Error { EventsJson(#[from] streams::plaineventsjson::Error), ServerError, BinnedStream(::err::Error), + TimebinnedJson(#[from] streams::timebinnedjson::Error), } impl From for Error { @@ -212,8 +213,7 @@ async fn binned_json_single( make_read_provider(ch_conf.name(), scyqueue, open_bytes, ctx, ncc); let item = streams::timebinnedjson::timebinned_json(query, ch_conf, ctx, cache_read_provider, events_read_provider) .instrument(span1) - .await - .map_err(|e| Error::BinnedStream(e))?; + .await?; match item { CollectResult::Some(item) => { let ret = response(StatusCode::OK) @@ -269,8 +269,7 @@ async fn binned_json_framed( let stream = streams::timebinnedjson::timebinned_json_framed(query, ch_conf, ctx, cache_read_provider, events_read_provider) .instrument(span1) - .await - .map_err(|e| Error::BinnedStream(e))?; + .await?; let stream = bytes_chunks_to_len_framed_str(stream); let ret = response(StatusCode::OK) .header(CONTENT_TYPE, APP_JSON_FRAMED) diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index 816a574..a3c2f47 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -124,8 +124,7 @@ impl EventsHandler { } let self_name = "handle"; let url = req_uri_to_url(req.uri())?; - let evq = - PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?; + let evq = PlainEventsQuery::from_url(&url)?; debug!("{self_name} evq {evq:?}"); let logspan = if evq.log_level() == "trace" { trace!("enable trace for handler"); @@ -260,10 +259,11 @@ async fn plain_events_json( } } -fn bytes_chunks_to_framed(stream: S) -> impl Stream> +fn bytes_chunks_to_framed(stream: S) -> impl Stream> where - S: Stream>, + S: Stream>, T: Into, + E: std::error::Error, { use future::ready; stream @@ -279,21 +279,22 @@ where b2.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]); let mut b3 = BytesMut::with_capacity(16); b3.put_slice(&[0, 0, 0, 0, 0, 0, 0, 0][..pad]); - stream::iter([Ok::<_, crate::err::Error>(b2.freeze()), Ok(buf), Ok(b3.freeze())]) + stream::iter([Ok(b2.freeze()), Ok(buf), Ok(b3.freeze())]) } Err(e) => { - let e = crate::err::Error::with_msg_no_trace(e.to_string()); - stream::iter([Err(e), Ok(Bytes::new()), Ok(Bytes::new())]) + error!("{e}"); + stream::iter([Ok(Bytes::new()), Ok(Bytes::new()), Ok(Bytes::new())]) } }) .filter(|x| if let Ok(x) = x { ready(x.len() > 0) } else { ready(true) }) } // TODO move this, it's also used by binned. -pub fn bytes_chunks_to_len_framed_str(stream: S) -> impl Stream> +pub fn bytes_chunks_to_len_framed_str(stream: S) -> impl Stream> where - S: Stream>, + S: Stream>, T: Into, + E: std::error::Error, { use future::ready; stream @@ -303,11 +304,11 @@ where let s = y.into(); let mut b2 = String::with_capacity(16); write!(b2, "{:15}\n", s.len()).unwrap(); - stream::iter([Ok::<_, crate::err::Error>(b2), Ok(s), Ok(String::from("\n"))]) + stream::iter([Ok::<_, E>(b2), Ok(s), Ok(String::from("\n"))]) } Err(e) => { - let e = crate::err::Error::with_msg_no_trace(e.to_string()); - stream::iter([Err(e), Ok(String::new()), Ok(String::new())]) + error!("{e}"); + stream::iter([Ok(String::new()), Ok(String::new()), Ok(String::new())]) } }) .filter(|x| if let Ok(x) = x { ready(x.len() > 0) } else { ready(true) }) diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs index 39a2d9c..4954091 100644 --- a/crates/httpret/src/channelconfig.rs +++ b/crates/httpret/src/channelconfig.rs @@ -54,6 +54,7 @@ pub enum Error { MissingShape, MissingShapeKind, MissingEdge, + MissingTimerange, Uri(netpod::UriError), ChannelConfigQuery(err::Error), ExpectScyllaBackend, @@ -64,6 +65,7 @@ pub enum Error { PgWorker(dbconn::worker::Error), Async(netpod::AsyncChannelError), ChannelConfig(dbconn::channelconfig::Error), + Netpod(netpod::NetpodError), } impl fmt::Display for Error { @@ -82,6 +84,7 @@ impl fmt::Display for Error { Error::MissingShape => write!(fmt, "MissingShape")?, Error::MissingShapeKind => write!(fmt, "MissingShapeKind")?, Error::MissingEdge => write!(fmt, "MissingEdge")?, + Error::MissingTimerange => write!(fmt, "MissingTimerange")?, Error::Uri(x) => write!(fmt, "Uri({x})")?, Error::ChannelConfigQuery(e) => write!(fmt, "ChannelConfigQuery({e})")?, Error::ExpectScyllaBackend => write!(fmt, "ExpectScyllaBackend")?, @@ -92,6 +95,7 @@ impl fmt::Display for Error { Error::PgWorker(e) => write!(fmt, "PgWorker({e})")?, Error::Async(e) => write!(fmt, "Async({e})")?, Error::ChannelConfig(e) => write!(fmt, "ChannelConfig({e})")?, + Error::Netpod(e) => write!(fmt, "Netpod({e})")?, } write!(fmt, ")")?; Ok(()) @@ -188,6 +192,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: netpod::NetpodError) -> Self { + Self::Netpod(e) + } +} + impl From for crate::err::Error { fn from(e: Error) -> Self { Self::with_msg_no_trace(format!("{e} TODO add public message")) @@ -296,7 +306,7 @@ impl ChannelConfigHandler { node_config: &NodeConfigCached, ) -> Result { let url = req_uri_to_url(req.uri())?; - let q = ChannelConfigQuery::from_url(&url).map_err(|e| Error::BadQuery(e))?; + let q = ChannelConfigQuery::from_url(&url)?; let conf = nodenet::channelconfig::channel_config(q.range.clone(), q.channel.clone(), pgqueue, node_config).await?; match conf { @@ -354,7 +364,7 @@ impl ChannelConfigsHandler { async fn channel_configs(&self, req: Requ, ncc: &NodeConfigCached) -> Result { info!("channel_configs"); let url = req_uri_to_url(req.uri())?; - let q = ChannelConfigQuery::from_url(&url).map_err(|e| Error::BadQuery(e))?; + let q = ChannelConfigQuery::from_url(&url)?; info!("channel_configs for q {q:?}"); let ch_confs = nodenet::channelconfig::channel_configs(q.channel, ncc).await?; let ret = response(StatusCode::OK) @@ -413,7 +423,7 @@ impl ChannelConfigQuorumHandler { ) -> Result { info!("channel_config_quorum"); let url = req_uri_to_url(req.uri())?; - let q = ChannelConfigQuery::from_url(&url).map_err(|e| Error::ChannelConfigQuery(e))?; + let q = ChannelConfigQuery::from_url(&url)?; info!("channel_config_quorum for q {q:?}"); let ch_confs = nodenet::configquorum::find_config_basics_quorum(q.channel, q.range.into(), ctx, pgqueue, ncc).await?; @@ -436,12 +446,14 @@ pub struct ChannelsWithTypeQuery { } impl FromUrl for ChannelsWithTypeQuery { - fn from_url(url: &Url) -> Result { + type Error = err::Error; + + fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); Self::from_pairs(&pairs) } - fn from_pairs(pairs: &BTreeMap) -> Result { + fn from_pairs(pairs: &BTreeMap) -> Result { let s = pairs .get("scalar_type") .ok_or_else(|| err::Error::with_public_msg_no_trace("missing scalar_type"))?; @@ -472,6 +484,8 @@ fn bool_false(x: &bool) -> bool { } impl FromUrl for ScyllaChannelEventSeriesIdQuery { + type Error = err::Error; + fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); Self::from_pairs(&pairs) @@ -521,6 +535,8 @@ pub struct ScyllaChannelsActiveQuery { } impl FromUrl for ScyllaChannelsActiveQuery { + type Error = err::Error; + fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); Self::from_pairs(&pairs) @@ -627,12 +643,14 @@ pub struct IocForChannelQuery { } impl FromUrl for IocForChannelQuery { - fn from_url(url: &Url) -> Result { + type Error = err::Error; + + fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); Self::from_pairs(&pairs) } - fn from_pairs(pairs: &BTreeMap) -> Result { + fn from_pairs(pairs: &BTreeMap) -> Result { let backend = pairs .get("backend") .ok_or_else(|| err::Error::with_public_msg_no_trace("missing backend"))? @@ -719,19 +737,21 @@ pub struct ScyllaSeriesTsMspQuery { } impl FromUrl for ScyllaSeriesTsMspQuery { - fn from_url(url: &Url) -> Result { + type Error = Error; + + fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); Self::from_pairs(&pairs) } - fn from_pairs(pairs: &BTreeMap) -> Result { + fn from_pairs(pairs: &BTreeMap) -> Result { let channel = SfDbChannel::from_pairs(pairs)?; let range = if let Ok(x) = TimeRangeQuery::from_pairs(pairs) { SeriesRange::TimeRange(x.into()) } else if let Ok(x) = PulseRangeQuery::from_pairs(pairs) { SeriesRange::PulseRange(x.into()) } else { - return Err(err::Error::with_public_msg_no_trace("no time range in url")); + return Err(Error::MissingTimerange); }; Ok(Self { channel, range }) } @@ -769,7 +789,7 @@ impl ScyllaSeriesTsMsp { .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); if accept == APP_JSON || accept == ACCEPT_ALL { let url = req_uri_to_url(req.uri())?; - let q = ScyllaSeriesTsMspQuery::from_url(&url).map_err(|e| Error::BadQuery(e))?; + let q = ScyllaSeriesTsMspQuery::from_url(&url)?; match self.get_ts_msps(&q, shared_res).await { Ok(k) => { let body = ToJsonBody::from(&k).into_body(); @@ -906,7 +926,7 @@ impl AmbigiousChannelNames { let g = AmbigiousChannel { series: row.get::<_, i64>(0) as u64, name: row.get(1), - scalar_type: ScalarType::from_scylla_i32(row.get(2)).map_err(other_err_error)?, + scalar_type: ScalarType::from_scylla_i32(row.get(2))?, shape: Shape::from_scylla_shape_dims(&row.get::<_, Vec>(3)).map_err(other_err_error)?, }; ret.ambigious.push(g); diff --git a/crates/httpret/src/download.rs b/crates/httpret/src/download.rs index e72d73e..c244a9b 100644 --- a/crates/httpret/src/download.rs +++ b/crates/httpret/src/download.rs @@ -21,12 +21,14 @@ pub struct DownloadQuery { } impl FromUrl for DownloadQuery { - fn from_url(url: &Url) -> Result { + type Error = crate::err::Error; + + fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); Self::from_pairs(&pairs) } - fn from_pairs(pairs: &std::collections::BTreeMap) -> Result { + fn from_pairs(pairs: &std::collections::BTreeMap) -> Result { let read_sys = pairs .get("ReadSys") .map(|x| x.as_str().into()) diff --git a/crates/httpret/src/err.rs b/crates/httpret/src/err.rs index 1ff3e8e..537ab33 100644 --- a/crates/httpret/src/err.rs +++ b/crates/httpret/src/err.rs @@ -110,3 +110,6 @@ impl Convable for httpclient::Error {} impl Convable for netpod::UriError {} impl Convable for nodenet::configquorum::Error {} impl Convable for nodenet::channelconfig::Error {} +impl Convable for query::api4::Error {} +impl Convable for query::api4::events::Error {} +impl Convable for netpod::NetpodError {} diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 15768e2..34da205 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -76,6 +76,8 @@ pub enum RetrievalError { Fmt(#[from] std::fmt::Error), #[serde(skip)] Url(#[from] url::ParseError), + #[serde(skip)] + Netpod(#[from] netpod::NetpodError), } trait IntoBoxedError: std::error::Error {} diff --git a/crates/httpret/src/pulsemap.rs b/crates/httpret/src/pulsemap.rs index 4b02de2..d8e94ed 100644 --- a/crates/httpret/src/pulsemap.rs +++ b/crates/httpret/src/pulsemap.rs @@ -849,7 +849,9 @@ impl HasTimeout for MapPulseQuery { } impl FromUrl for MapPulseQuery { - fn from_url(url: &url::Url) -> Result { + type Error = Error; + + fn from_url(url: &url::Url) -> Result { let mut pit = url .path_segments() .ok_or_else(|| Error::with_msg_no_trace(format!("no path in url {url}")))? @@ -870,10 +872,8 @@ impl FromUrl for MapPulseQuery { Ok(ret) } - fn from_pairs(_pairs: &BTreeMap) -> Result { - Err(err::Error::with_msg_no_trace(format!( - "can not only construct from pairs" - ))) + fn from_pairs(_pairs: &BTreeMap) -> Result { + Err(Error::with_msg_no_trace(format!("can not only construct from pairs"))) } } diff --git a/crates/nodenet/src/conn/test.rs b/crates/nodenet/src/conn/test.rs index 33e53b2..a808fc9 100644 --- a/crates/nodenet/src/conn/test.rs +++ b/crates/nodenet/src/conn/test.rs @@ -89,7 +89,7 @@ fn raw_data_00() { let qu = EventsSubQuery::from_parts(select, settings, "dummy".into(), log_level); let frame1 = Frame1Parts::new(qu.clone()); let query = EventQueryJsonStringFrame(serde_json::to_string(&frame1).unwrap()); - let frame = sitem_data(query).make_frame_dyn()?; + let frame = sitem_data(query).make_frame_dyn().map_err(Error::from_string)?; let scyqueue = err::todoval(); let jh = taskrun::spawn(events_conn_handler(client, addr, scyqueue, cfg)); con.write_all(&frame).await.unwrap(); diff --git a/crates/streams/src/cbor_stream.rs b/crates/streams/src/cbor_stream.rs index 678e942..78feab8 100644 --- a/crates/streams/src/cbor_stream.rs +++ b/crates/streams/src/cbor_stream.rs @@ -281,9 +281,10 @@ impl FramedBytesToSitemtyDynEventsStream { } } -impl Stream for FramedBytesToSitemtyDynEventsStream +impl Stream for FramedBytesToSitemtyDynEventsStream where - S: Stream> + Unpin, + S: Stream> + Unpin, + E: std::error::Error, { type Item = ::Item;