From 477ceb94020d974ee41a7c4fccce9d1f7ebfe251 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Sun, 8 Dec 2024 07:40:19 +0100 Subject: [PATCH] Adapt to changed errors --- crates/daqbuffer/src/bin/daqbuffer.rs | 10 +++---- crates/daqbufp2/src/daqbufp2.rs | 2 +- crates/daqbufp2/src/test/api1.rs | 6 +++- crates/httpret/src/api1.rs | 2 +- crates/httpret/src/api4/binned.rs | 8 ++--- crates/httpret/src/channelconfig.rs | 42 +++++++++++---------------- crates/httpret/src/err.rs | 2 +- crates/httpret/src/httpret.rs | 2 +- crates/httpret/src/proxy/api4.rs | 5 ++-- crates/nodenet/src/conn.rs | 1 + 10 files changed, 39 insertions(+), 41 deletions(-) diff --git a/crates/daqbuffer/src/bin/daqbuffer.rs b/crates/daqbuffer/src/bin/daqbuffer.rs index 021850f..a0715fd 100644 --- a/crates/daqbuffer/src/bin/daqbuffer.rs +++ b/crates/daqbuffer/src/bin/daqbuffer.rs @@ -81,13 +81,13 @@ async fn go() -> Result<(), Error> { config_file.read_to_end(&mut buf).await?; if let Ok(cfg) = serde_json::from_slice::(b"nothing") { info!("Parsed json config from {}", subcmd.config); - let cfg: Result = cfg.into(); - let cfg = cfg?; + let cfg: Result = cfg.into(); + let cfg = cfg.map_err(Error::from_string)?; daqbufp2::run_node(cfg, service_version).await?; } else if let Ok(cfg) = serde_yaml::from_slice::(&buf) { info!("Parsed yaml config from {}", subcmd.config); - let cfg: Result = cfg.into(); - let cfg = cfg?; + let cfg: Result = cfg.into(); + let cfg = cfg.map_err(Error::from_string)?; daqbufp2::run_node(cfg, service_version).await?; } else { return Err(Error::with_msg_no_trace(format!( @@ -114,7 +114,7 @@ async fn go() -> Result<(), Error> { ClientType::Binned(opts) => { let beg = parse_ts(&opts.beg)?; let end = parse_ts(&opts.end)?; - let cache_usage = CacheUsage::from_string(&opts.cache)?; + let cache_usage = CacheUsage::from_string(&opts.cache).map_err(Error::from_string)?; daqbufp2::client::get_binned( opts.host, opts.port, diff --git a/crates/daqbufp2/src/daqbufp2.rs b/crates/daqbufp2/src/daqbufp2.rs index f3a82f2..2fd759c 100644 --- a/crates/daqbufp2/src/daqbufp2.rs +++ b/crates/daqbufp2/src/daqbufp2.rs @@ -26,7 +26,7 @@ pub fn spawn_test_hosts(cluster: Cluster) -> Vec>> cluster: cluster.clone(), name: format!("{}:{}", node.host, node.port), }; - let node_config: Result = node_config.into(); + let node_config: Result = node_config.into(); let node_config = node_config.unwrap(); let h = tokio::spawn(httpret::host(node_config, service_version.clone()).map_err(Error::from)); ret.push(h); diff --git a/crates/daqbufp2/src/test/api1.rs b/crates/daqbufp2/src/test/api1.rs index f777426..3aeb1a7 100644 --- a/crates/daqbufp2/src/test/api1.rs +++ b/crates/daqbufp2/src/test/api1.rs @@ -61,7 +61,11 @@ fn events_f64_plain() -> Result<(), Error> { let node = &cluster.nodes[0]; let url: Url = format!("http://{}:{}/api/1/query", node.host, node.port).parse()?; let accept = APP_OCTET; - let range = Api1Range::new("1970-01-01T00:00:00Z".try_into()?, "1970-01-01T00:01:00Z".try_into()?)?; + let range = Api1Range::new( + "1970-01-01T00:00:00Z".try_into().map_err(Error::from_string)?, + "1970-01-01T00:01:00Z".try_into().map_err(Error::from_string)?, + ) + .map_err(Error::from_string)?; // TODO the channel list needs to get pre-processed to check for backend prefix! let ch = ChannelTuple::new(TEST_BACKEND.into(), "test-gen-i32-dim0-v01".into()); let qu = Api1Query::new(range, vec![ch]); diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs index a04f2cb..0efd9e0 100644 --- a/crates/httpret/src/api1.rs +++ b/crates/httpret/src/api1.rs @@ -828,7 +828,7 @@ impl Stream for DataApiPython3DataStream { self.current_fetch_info = None; self.data_done = true; let mut sb = crate::status_board().unwrap(); - sb.add_error(self.ctx.reqid_this(), e.0.clone()); + sb.add_error(self.ctx.reqid_this(), e.0.to_string()); Ready(Some(Err(e))) } }, diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index 5850328..a0ce87e 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -156,25 +156,25 @@ fn make_read_provider( .clone() .map(|qu| ScyllaEventReadProvider::new(qu)) .map(|x| Arc::new(x) as Arc) - .expect("expect scylla queue") + .expect("scylla queue") } else if ncc.node.sf_databuffer.is_some() { // TODO do not clone the request. Pass an Arc up to here. let x = SfDatabufferEventReadProvider::new(Arc::new(ctx.clone()), open_bytes); Arc::new(x) } else { - panic!() + panic!("unexpected backend") }; let cache_read_provider = if ncc.node_config.cluster.scylla_lt().is_some() { scyqueue .clone() .map(|qu| ScyllaCacheReadProvider::new(qu)) .map(|x| Arc::new(x) as Arc) - .expect("expect scylla queue") + .expect("scylla queue") } else if ncc.node.sf_databuffer.is_some() { let x = DummyCacheReadProvider::new(); Arc::new(x) } else { - panic!() + panic!("unexpected backend") }; (events_read_provider, cache_read_provider) } diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs index 2c4ed09..3063707 100644 --- a/crates/httpret/src/channelconfig.rs +++ b/crates/httpret/src/channelconfig.rs @@ -57,6 +57,7 @@ pub enum Error { MissingShapeKind, MissingEdge, MissingTimerange, + MissingChannelName, Uri(#[from] netpod::UriError), ChannelConfigQuery(daqbuf_err::Error), ExpectScyllaBackend, @@ -67,7 +68,7 @@ pub enum Error { PgWorker(#[from] dbconn::worker::Error), Async(#[from] netpod::AsyncChannelError), ChannelConfig(#[from] dbconn::channelconfig::Error), - Netpod(#[from] netpod::NetpodError), + Netpod(#[from] netpod::Error), ScyllaQuery(#[from] scyllaconn::scylla::transport::errors::QueryError), ScyllaTypeCheck(#[from] scyllaconn::scylla::deserialize::TypeCheckError), } @@ -330,7 +331,7 @@ pub struct ChannelsWithTypeQuery { } impl FromUrl for ChannelsWithTypeQuery { - type Error = daqbuf_err::Error; + type Error = Error; fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); @@ -338,14 +339,11 @@ impl FromUrl for ChannelsWithTypeQuery { } fn from_pairs(pairs: &BTreeMap) -> Result { - let s = pairs - .get("scalar_type") - .ok_or_else(|| daqbuf_err::Error::with_public_msg_no_trace("missing scalar_type"))?; + let s = pairs.get("scalar_type").ok_or_else(|| Error::MissingScalarType)?; //let scalar_type = ScalarType::from_bsread_str(s)?; - let scalar_type: ScalarType = serde_json::from_str(&format!("\"{s}\""))?; - let s = pairs - .get("shape") - .ok_or_else(|| daqbuf_err::Error::with_public_msg_no_trace("missing shape"))?; + let scalar_type: ScalarType = + serde_json::from_str(&format!("\"{s}\"")).map_err(|_| Error::MissingScalarType)?; + let s = pairs.get("shape").ok_or_else(|| Error::MissingShape)?; let shape = Shape::from_dims_str(s)?; Ok(Self { scalar_type, shape }) } @@ -368,29 +366,23 @@ fn bool_false(x: &bool) -> bool { } impl FromUrl for ScyllaChannelEventSeriesIdQuery { - type Error = daqbuf_err::Error; + type Error = Error; - fn from_url(url: &Url) -> Result { + fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); Self::from_pairs(&pairs) } - fn from_pairs(pairs: &BTreeMap) -> Result { - let backend = pairs - .get("backend") - .ok_or_else(|| daqbuf_err::Error::with_public_msg_no_trace("missing backend"))? - .into(); + fn from_pairs(pairs: &BTreeMap) -> Result { + let backend = pairs.get("backend").ok_or_else(|| Error::MissingBackend)?.into(); let name = pairs .get("channelName") - .ok_or_else(|| daqbuf_err::Error::with_public_msg_no_trace("missing channelName"))? + .ok_or_else(|| Error::MissingChannelName)? .into(); - let s = pairs - .get("scalarType") - .ok_or_else(|| daqbuf_err::Error::with_public_msg_no_trace("missing scalarType"))?; - let scalar_type: ScalarType = serde_json::from_str(&format!("\"{s}\""))?; - let s = pairs - .get("shape") - .ok_or_else(|| daqbuf_err::Error::with_public_msg_no_trace("missing shape"))?; + let s = pairs.get("scalarType").ok_or_else(|| Error::MissingScalarType)?; + let scalar_type: ScalarType = + serde_json::from_str(&format!("\"{s}\"")).map_err(|_| Error::MissingScalarType)?; + let s = pairs.get("shape").ok_or_else(|| Error::MissingShape)?; let shape = Shape::from_dims_str(s)?; let do_create = pairs.get("doCreate").map_or("false", |x| x.as_str()) == "true"; Ok(Self { @@ -809,7 +801,7 @@ impl AmbigiousChannelNames { series: row.get::<_, i64>(0) as u64, name: row.get(1), scalar_type: ScalarType::from_scylla_i32(row.get(2))?, - shape: Shape::from_scylla_shape_dims(&row.get::<_, Vec>(3)).map_err(other_err_error)?, + shape: Shape::from_scylla_shape_dims(&row.get::<_, Vec>(3))?, }; ret.ambigious.push(g); } diff --git a/crates/httpret/src/err.rs b/crates/httpret/src/err.rs index 840f1c1..58eb12c 100644 --- a/crates/httpret/src/err.rs +++ b/crates/httpret/src/err.rs @@ -113,4 +113,4 @@ impl Convable for nodenet::configquorum::Error {} impl Convable for nodenet::channelconfig::Error {} impl Convable for query::api4::Error {} impl Convable for query::api4::events::Error {} -impl Convable for netpod::NetpodError {} +impl Convable for netpod::Error {} diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 843a0ab..4a27f34 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -78,7 +78,7 @@ pub enum RetrievalError { #[serde(skip)] Url(#[from] url::ParseError), #[serde(skip)] - Netpod(#[from] netpod::NetpodError), + Netpod(#[from] netpod::Error), } trait IntoBoxedError: std::error::Error {} diff --git a/crates/httpret/src/proxy/api4.rs b/crates/httpret/src/proxy/api4.rs index 928c939..4c0a320 100644 --- a/crates/httpret/src/proxy/api4.rs +++ b/crates/httpret/src/proxy/api4.rs @@ -233,14 +233,15 @@ impl StatusNodesRecursive { for (tag, sr) in all { match sr { Ok(sr) => { - let s: Result = serde_json::from_value(sr.val).map_err(daqbuf_err::Error::from); + let s: Result = + serde_json::from_value(sr.val).map_err(|e| netpod::NodeStatusSubError::Msg(e.to_string())); let sub = NodeStatusSub { url: tag.0, status: s }; subs.push_back(sub); } Err(e) => { let sub = NodeStatusSub { url: tag.0, - status: Err(daqbuf_err::Error::from(e)), + status: Err(netpod::NodeStatusSubError::Msg(e.to_string())), }; subs.push_back(sub); } diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index 14ec307..7da06a4 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -56,6 +56,7 @@ pub enum Error { Frame(#[from] items_2::frame::Error), InMem(#[from] streams::frames::inmem::Error), FramedStream(#[from] streams::frames::Error), + Netpod(#[from] netpod::Error), } pub async fn events_service(ncc: NodeConfigCached) -> Result<(), Error> {