Adapt to changed errors

This commit is contained in:
Dominik Werder
2024-12-08 07:40:19 +01:00
parent 162084d302
commit 477ceb9402
10 changed files with 39 additions and 41 deletions
+5 -5
View File
@@ -81,13 +81,13 @@ async fn go() -> Result<(), Error> {
config_file.read_to_end(&mut buf).await?; config_file.read_to_end(&mut buf).await?;
if let Ok(cfg) = serde_json::from_slice::<NodeConfig>(b"nothing") { if let Ok(cfg) = serde_json::from_slice::<NodeConfig>(b"nothing") {
info!("Parsed json config from {}", subcmd.config); info!("Parsed json config from {}", subcmd.config);
let cfg: Result<NodeConfigCached, Error> = cfg.into(); let cfg: Result<NodeConfigCached, netpod::Error> = cfg.into();
let cfg = cfg?; let cfg = cfg.map_err(Error::from_string)?;
daqbufp2::run_node(cfg, service_version).await?; daqbufp2::run_node(cfg, service_version).await?;
} else if let Ok(cfg) = serde_yaml::from_slice::<NodeConfig>(&buf) { } else if let Ok(cfg) = serde_yaml::from_slice::<NodeConfig>(&buf) {
info!("Parsed yaml config from {}", subcmd.config); info!("Parsed yaml config from {}", subcmd.config);
let cfg: Result<NodeConfigCached, Error> = cfg.into(); let cfg: Result<NodeConfigCached, netpod::Error> = cfg.into();
let cfg = cfg?; let cfg = cfg.map_err(Error::from_string)?;
daqbufp2::run_node(cfg, service_version).await?; daqbufp2::run_node(cfg, service_version).await?;
} else { } else {
return Err(Error::with_msg_no_trace(format!( return Err(Error::with_msg_no_trace(format!(
@@ -114,7 +114,7 @@ async fn go() -> Result<(), Error> {
ClientType::Binned(opts) => { ClientType::Binned(opts) => {
let beg = parse_ts(&opts.beg)?; let beg = parse_ts(&opts.beg)?;
let end = parse_ts(&opts.end)?; let end = parse_ts(&opts.end)?;
let cache_usage = CacheUsage::from_string(&opts.cache)?; let cache_usage = CacheUsage::from_string(&opts.cache).map_err(Error::from_string)?;
daqbufp2::client::get_binned( daqbufp2::client::get_binned(
opts.host, opts.host,
opts.port, opts.port,
+1 -1
View File
@@ -26,7 +26,7 @@ pub fn spawn_test_hosts(cluster: Cluster) -> Vec<JoinHandle<Result<(), Error>>>
cluster: cluster.clone(), cluster: cluster.clone(),
name: format!("{}:{}", node.host, node.port), name: format!("{}:{}", node.host, node.port),
}; };
let node_config: Result<NodeConfigCached, Error> = node_config.into(); let node_config: Result<NodeConfigCached, netpod::Error> = node_config.into();
let node_config = node_config.unwrap(); let node_config = node_config.unwrap();
let h = tokio::spawn(httpret::host(node_config, service_version.clone()).map_err(Error::from)); let h = tokio::spawn(httpret::host(node_config, service_version.clone()).map_err(Error::from));
ret.push(h); ret.push(h);
+5 -1
View File
@@ -61,7 +61,11 @@ fn events_f64_plain() -> Result<(), Error> {
let node = &cluster.nodes[0]; let node = &cluster.nodes[0];
let url: Url = format!("http://{}:{}/api/1/query", node.host, node.port).parse()?; let url: Url = format!("http://{}:{}/api/1/query", node.host, node.port).parse()?;
let accept = APP_OCTET; let accept = APP_OCTET;
let range = Api1Range::new("1970-01-01T00:00:00Z".try_into()?, "1970-01-01T00:01:00Z".try_into()?)?; let range = Api1Range::new(
"1970-01-01T00:00:00Z".try_into().map_err(Error::from_string)?,
"1970-01-01T00:01:00Z".try_into().map_err(Error::from_string)?,
)
.map_err(Error::from_string)?;
// TODO the channel list needs to get pre-processed to check for backend prefix! // TODO the channel list needs to get pre-processed to check for backend prefix!
let ch = ChannelTuple::new(TEST_BACKEND.into(), "test-gen-i32-dim0-v01".into()); let ch = ChannelTuple::new(TEST_BACKEND.into(), "test-gen-i32-dim0-v01".into());
let qu = Api1Query::new(range, vec![ch]); let qu = Api1Query::new(range, vec![ch]);
+1 -1
View File
@@ -828,7 +828,7 @@ impl Stream for DataApiPython3DataStream {
self.current_fetch_info = None; self.current_fetch_info = None;
self.data_done = true; self.data_done = true;
let mut sb = crate::status_board().unwrap(); let mut sb = crate::status_board().unwrap();
sb.add_error(self.ctx.reqid_this(), e.0.clone()); sb.add_error(self.ctx.reqid_this(), e.0.to_string());
Ready(Some(Err(e))) Ready(Some(Err(e)))
} }
}, },
+4 -4
View File
@@ -156,25 +156,25 @@ fn make_read_provider(
.clone() .clone()
.map(|qu| ScyllaEventReadProvider::new(qu)) .map(|qu| ScyllaEventReadProvider::new(qu))
.map(|x| Arc::new(x) as Arc<dyn EventsReadProvider>) .map(|x| Arc::new(x) as Arc<dyn EventsReadProvider>)
.expect("expect scylla queue") .expect("scylla queue")
} else if ncc.node.sf_databuffer.is_some() { } else if ncc.node.sf_databuffer.is_some() {
// TODO do not clone the request. Pass an Arc up to here. // TODO do not clone the request. Pass an Arc up to here.
let x = SfDatabufferEventReadProvider::new(Arc::new(ctx.clone()), open_bytes); let x = SfDatabufferEventReadProvider::new(Arc::new(ctx.clone()), open_bytes);
Arc::new(x) Arc::new(x)
} else { } else {
panic!() panic!("unexpected backend")
}; };
let cache_read_provider = if ncc.node_config.cluster.scylla_lt().is_some() { let cache_read_provider = if ncc.node_config.cluster.scylla_lt().is_some() {
scyqueue scyqueue
.clone() .clone()
.map(|qu| ScyllaCacheReadProvider::new(qu)) .map(|qu| ScyllaCacheReadProvider::new(qu))
.map(|x| Arc::new(x) as Arc<dyn CacheReadProvider>) .map(|x| Arc::new(x) as Arc<dyn CacheReadProvider>)
.expect("expect scylla queue") .expect("scylla queue")
} else if ncc.node.sf_databuffer.is_some() { } else if ncc.node.sf_databuffer.is_some() {
let x = DummyCacheReadProvider::new(); let x = DummyCacheReadProvider::new();
Arc::new(x) Arc::new(x)
} else { } else {
panic!() panic!("unexpected backend")
}; };
(events_read_provider, cache_read_provider) (events_read_provider, cache_read_provider)
} }
+17 -25
View File
@@ -57,6 +57,7 @@ pub enum Error {
MissingShapeKind, MissingShapeKind,
MissingEdge, MissingEdge,
MissingTimerange, MissingTimerange,
MissingChannelName,
Uri(#[from] netpod::UriError), Uri(#[from] netpod::UriError),
ChannelConfigQuery(daqbuf_err::Error), ChannelConfigQuery(daqbuf_err::Error),
ExpectScyllaBackend, ExpectScyllaBackend,
@@ -67,7 +68,7 @@ pub enum Error {
PgWorker(#[from] dbconn::worker::Error), PgWorker(#[from] dbconn::worker::Error),
Async(#[from] netpod::AsyncChannelError), Async(#[from] netpod::AsyncChannelError),
ChannelConfig(#[from] dbconn::channelconfig::Error), ChannelConfig(#[from] dbconn::channelconfig::Error),
Netpod(#[from] netpod::NetpodError), Netpod(#[from] netpod::Error),
ScyllaQuery(#[from] scyllaconn::scylla::transport::errors::QueryError), ScyllaQuery(#[from] scyllaconn::scylla::transport::errors::QueryError),
ScyllaTypeCheck(#[from] scyllaconn::scylla::deserialize::TypeCheckError), ScyllaTypeCheck(#[from] scyllaconn::scylla::deserialize::TypeCheckError),
} }
@@ -330,7 +331,7 @@ pub struct ChannelsWithTypeQuery {
} }
impl FromUrl for ChannelsWithTypeQuery { impl FromUrl for ChannelsWithTypeQuery {
type Error = daqbuf_err::Error; type Error = Error;
fn from_url(url: &Url) -> Result<Self, Self::Error> { fn from_url(url: &Url) -> Result<Self, Self::Error> {
let pairs = get_url_query_pairs(url); let pairs = get_url_query_pairs(url);
@@ -338,14 +339,11 @@ impl FromUrl for ChannelsWithTypeQuery {
} }
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Self::Error> { fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Self::Error> {
let s = pairs let s = pairs.get("scalar_type").ok_or_else(|| Error::MissingScalarType)?;
.get("scalar_type")
.ok_or_else(|| daqbuf_err::Error::with_public_msg_no_trace("missing scalar_type"))?;
//let scalar_type = ScalarType::from_bsread_str(s)?; //let scalar_type = ScalarType::from_bsread_str(s)?;
let scalar_type: ScalarType = serde_json::from_str(&format!("\"{s}\""))?; let scalar_type: ScalarType =
let s = pairs serde_json::from_str(&format!("\"{s}\"")).map_err(|_| Error::MissingScalarType)?;
.get("shape") let s = pairs.get("shape").ok_or_else(|| Error::MissingShape)?;
.ok_or_else(|| daqbuf_err::Error::with_public_msg_no_trace("missing shape"))?;
let shape = Shape::from_dims_str(s)?; let shape = Shape::from_dims_str(s)?;
Ok(Self { scalar_type, shape }) Ok(Self { scalar_type, shape })
} }
@@ -368,29 +366,23 @@ fn bool_false(x: &bool) -> bool {
} }
impl FromUrl for ScyllaChannelEventSeriesIdQuery { impl FromUrl for ScyllaChannelEventSeriesIdQuery {
type Error = daqbuf_err::Error; type Error = Error;
fn from_url(url: &Url) -> Result<Self, daqbuf_err::Error> { fn from_url(url: &Url) -> Result<Self, Error> {
let pairs = get_url_query_pairs(url); let pairs = get_url_query_pairs(url);
Self::from_pairs(&pairs) Self::from_pairs(&pairs)
} }
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, daqbuf_err::Error> { fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Error> {
let backend = pairs let backend = pairs.get("backend").ok_or_else(|| Error::MissingBackend)?.into();
.get("backend")
.ok_or_else(|| daqbuf_err::Error::with_public_msg_no_trace("missing backend"))?
.into();
let name = pairs let name = pairs
.get("channelName") .get("channelName")
.ok_or_else(|| daqbuf_err::Error::with_public_msg_no_trace("missing channelName"))? .ok_or_else(|| Error::MissingChannelName)?
.into(); .into();
let s = pairs let s = pairs.get("scalarType").ok_or_else(|| Error::MissingScalarType)?;
.get("scalarType") let scalar_type: ScalarType =
.ok_or_else(|| daqbuf_err::Error::with_public_msg_no_trace("missing scalarType"))?; serde_json::from_str(&format!("\"{s}\"")).map_err(|_| Error::MissingScalarType)?;
let scalar_type: ScalarType = serde_json::from_str(&format!("\"{s}\""))?; let s = pairs.get("shape").ok_or_else(|| Error::MissingShape)?;
let s = pairs
.get("shape")
.ok_or_else(|| daqbuf_err::Error::with_public_msg_no_trace("missing shape"))?;
let shape = Shape::from_dims_str(s)?; let shape = Shape::from_dims_str(s)?;
let do_create = pairs.get("doCreate").map_or("false", |x| x.as_str()) == "true"; let do_create = pairs.get("doCreate").map_or("false", |x| x.as_str()) == "true";
Ok(Self { Ok(Self {
@@ -809,7 +801,7 @@ impl AmbigiousChannelNames {
series: row.get::<_, i64>(0) as u64, series: row.get::<_, i64>(0) as u64,
name: row.get(1), name: row.get(1),
scalar_type: ScalarType::from_scylla_i32(row.get(2))?, scalar_type: ScalarType::from_scylla_i32(row.get(2))?,
shape: Shape::from_scylla_shape_dims(&row.get::<_, Vec<i32>>(3)).map_err(other_err_error)?, shape: Shape::from_scylla_shape_dims(&row.get::<_, Vec<i32>>(3))?,
}; };
ret.ambigious.push(g); ret.ambigious.push(g);
} }
+1 -1
View File
@@ -113,4 +113,4 @@ impl Convable for nodenet::configquorum::Error {}
impl Convable for nodenet::channelconfig::Error {} impl Convable for nodenet::channelconfig::Error {}
impl Convable for query::api4::Error {} impl Convable for query::api4::Error {}
impl Convable for query::api4::events::Error {} impl Convable for query::api4::events::Error {}
impl Convable for netpod::NetpodError {} impl Convable for netpod::Error {}
+1 -1
View File
@@ -78,7 +78,7 @@ pub enum RetrievalError {
#[serde(skip)] #[serde(skip)]
Url(#[from] url::ParseError), Url(#[from] url::ParseError),
#[serde(skip)] #[serde(skip)]
Netpod(#[from] netpod::NetpodError), Netpod(#[from] netpod::Error),
} }
trait IntoBoxedError: std::error::Error {} trait IntoBoxedError: std::error::Error {}
+3 -2
View File
@@ -233,14 +233,15 @@ impl StatusNodesRecursive {
for (tag, sr) in all { for (tag, sr) in all {
match sr { match sr {
Ok(sr) => { Ok(sr) => {
let s: Result<NodeStatus, _> = serde_json::from_value(sr.val).map_err(daqbuf_err::Error::from); let s: Result<NodeStatus, _> =
serde_json::from_value(sr.val).map_err(|e| netpod::NodeStatusSubError::Msg(e.to_string()));
let sub = NodeStatusSub { url: tag.0, status: s }; let sub = NodeStatusSub { url: tag.0, status: s };
subs.push_back(sub); subs.push_back(sub);
} }
Err(e) => { Err(e) => {
let sub = NodeStatusSub { let sub = NodeStatusSub {
url: tag.0, url: tag.0,
status: Err(daqbuf_err::Error::from(e)), status: Err(netpod::NodeStatusSubError::Msg(e.to_string())),
}; };
subs.push_back(sub); subs.push_back(sub);
} }
+1
View File
@@ -56,6 +56,7 @@ pub enum Error {
Frame(#[from] items_2::frame::Error), Frame(#[from] items_2::frame::Error),
InMem(#[from] streams::frames::inmem::Error), InMem(#[from] streams::frames::inmem::Error),
FramedStream(#[from] streams::frames::Error), FramedStream(#[from] streams::frames::Error),
Netpod(#[from] netpod::Error),
} }
pub async fn events_service(ncc: NodeConfigCached) -> Result<(), Error> { pub async fn events_service(ncc: NodeConfigCached) -> Result<(), Error> {