diff --git a/.cargo/config.toml b/.cargo/config.toml index 83a9246..3522d2d 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,6 +1,6 @@ [build] rustflags = [ - #"-C", "force-frame-pointers=yes", + "-C", "force-frame-pointers=yes", #"-C", "force-unwind-tables=yes", #"-C", "embed-bitcode=no", #"-C", "relocation-model=pic", diff --git a/Cargo.toml b/Cargo.toml index 70d9ee4..f88a491 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ members = ["crates/*"] [profile.release] -opt-level = 1 +opt-level = 2 debug = 0 overflow-checks = false debug-assertions = false diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index d46dbae..f2df0bd 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,11 +1,11 @@ [package] name = "daqbuffer" -version = "0.4.1" +version = "0.4.3-alpha" authors = ["Dominik Werder "] edition = "2021" [dependencies] -tokio = { version = "1.18.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "1.22.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } hyper = "0.14" http = "0.2" tracing = "0.1.25" @@ -20,7 +20,7 @@ serde_json = "1.0" serde_yaml = "0.9.16" chrono = "0.4" url = "2.2.2" -clap = { version = "4.0.22", features = ["derive", "cargo"] } +clap = { version = "4.3.15", features = ["derive", "cargo"] } lazy_static = "1.4.0" err = { path = "../err" } taskrun = { path = "../taskrun" } diff --git a/crates/daqbuffer/src/bin/daqbuffer.rs b/crates/daqbuffer/src/bin/daqbuffer.rs index cf6d8a8..785ac87 100644 --- a/crates/daqbuffer/src/bin/daqbuffer.rs +++ b/crates/daqbuffer/src/bin/daqbuffer.rs @@ -14,6 +14,7 @@ use netpod::DtNano; use netpod::NodeConfig; use netpod::NodeConfigCached; use netpod::ProxyConfig; +use netpod::ServiceVersion; use tokio::fs::File; use tokio::io::AsyncReadExt; @@ -55,9 +56,16 @@ fn parse_ts(s: &str) -> Result, Error> { async fn go() -> Result<(), Error> { let opts = Opts::parse(); + let service_version = ServiceVersion { + major: std::env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap_or(0), + minor: std::env!("CARGO_PKG_VERSION_MINOR").parse().unwrap_or(0), + patch: std::env!("CARGO_PKG_VERSION_PATCH").parse().unwrap_or(0), + pre: std::option_env!("CARGO_PKG_VERSION_PRE").map(Into::into), + }; match opts.subcmd { SubCmd::Retrieval(subcmd) => { - info!("daqbuffer {}", clap::crate_version!()); + info!("daqbuffer version {} 0000", clap::crate_version!()); + info!("{:?}", service_version); let mut config_file = File::open(&subcmd.config).await?; let mut buf = Vec::new(); config_file.read_to_end(&mut buf).await?; @@ -65,12 +73,12 @@ async fn go() -> Result<(), Error> { info!("Parsed json config from {}", subcmd.config); let cfg: Result = cfg.into(); let cfg = cfg?; - daqbufp2::run_node(cfg).await?; + daqbufp2::run_node(cfg, service_version).await?; } else if let Ok(cfg) = serde_yaml::from_slice::(&buf) { info!("Parsed yaml config from {}", subcmd.config); let cfg: Result = cfg.into(); let cfg = cfg?; - daqbufp2::run_node(cfg).await?; + daqbufp2::run_node(cfg, service_version).await?; } else { return Err(Error::with_msg_no_trace(format!( "can not parse config at {}", @@ -86,7 +94,7 @@ async fn go() -> Result<(), Error> { let proxy_config: ProxyConfig = serde_yaml::from_slice(&buf).map_err(|e| Error::with_msg_no_trace(e.to_string()))?; info!("Parsed yaml config from {}", subcmd.config); - daqbufp2::run_proxy(proxy_config.clone()).await?; + daqbufp2::run_proxy(proxy_config.clone(), service_version).await?; } SubCmd::Client(client) => match client.client_type { ClientType::Status(opts) => { diff --git a/crates/daqbufp2/Cargo.toml b/crates/daqbufp2/Cargo.toml index b82846c..7ba9bd4 100644 --- a/crates/daqbufp2/Cargo.toml +++ b/crates/daqbufp2/Cargo.toml @@ -18,7 +18,6 @@ bytes = "1.0.1" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" -rmp-serde = "1.1.1" chrono = "0.4" url = "2.2.2" lazy_static = "1.4.0" diff --git a/crates/daqbufp2/src/daqbufp2.rs b/crates/daqbufp2/src/daqbufp2.rs index 49b1d15..66d9c23 100644 --- a/crates/daqbufp2/src/daqbufp2.rs +++ b/crates/daqbufp2/src/daqbufp2.rs @@ -6,10 +6,20 @@ pub mod test; use ::err::Error; use futures_util::TryFutureExt; -use netpod::{Cluster, NodeConfig, NodeConfigCached, ProxyConfig}; +use netpod::Cluster; +use netpod::NodeConfig; +use netpod::NodeConfigCached; +use netpod::ProxyConfig; +use netpod::ServiceVersion; use tokio::task::JoinHandle; pub fn spawn_test_hosts(cluster: Cluster) -> Vec>> { + let service_version = ServiceVersion { + major: 0, + minor: 0, + patch: 0, + pre: None, + }; let mut ret = Vec::new(); for node in &cluster.nodes { let node_config = NodeConfig { @@ -18,7 +28,7 @@ pub fn spawn_test_hosts(cluster: Cluster) -> Vec>> }; let node_config: Result = node_config.into(); let node_config = node_config.unwrap(); - let h = tokio::spawn(httpret::host(node_config).map_err(Error::from)); + let h = tokio::spawn(httpret::host(node_config, service_version.clone()).map_err(Error::from)); ret.push(h); } @@ -27,12 +37,12 @@ pub fn spawn_test_hosts(cluster: Cluster) -> Vec>> ret } -pub async fn run_node(node_config: NodeConfigCached) -> Result<(), Error> { - httpret::host(node_config).await?; +pub async fn run_node(node_config: NodeConfigCached, service_version: ServiceVersion) -> Result<(), Error> { + httpret::host(node_config, service_version).await?; Ok(()) } -pub async fn run_proxy(proxy_config: ProxyConfig) -> Result<(), Error> { - httpret::proxy::proxy(proxy_config).await?; +pub async fn run_proxy(proxy_config: ProxyConfig, service_version: ServiceVersion) -> Result<(), Error> { + httpret::proxy::proxy(proxy_config, service_version).await?; Ok(()) } diff --git a/crates/daqbufp2/src/test/timeweightedjson.rs b/crates/daqbufp2/src/test/timeweightedjson.rs index c5fca81..48f1dd5 100644 --- a/crates/daqbufp2/src/test/timeweightedjson.rs +++ b/crates/daqbufp2/src/test/timeweightedjson.rs @@ -76,9 +76,9 @@ async fn get_json_common( if expect_finalised_range { if !res .get("rangeFinal") - .ok_or(Error::with_msg("missing rangeFinal"))? + .ok_or_else(|| Error::with_msg("missing rangeFinal"))? .as_bool() - .ok_or(Error::with_msg("key rangeFinal not bool"))? + .ok_or_else(|| Error::with_msg("key rangeFinal not bool"))? { return Err(Error::with_msg("expected rangeFinal")); } diff --git a/crates/httpclient/Cargo.toml b/crates/httpclient/Cargo.toml index 9d84bdc..024a627 100644 --- a/crates/httpclient/Cargo.toml +++ b/crates/httpclient/Cargo.toml @@ -8,7 +8,6 @@ edition = "2021" futures-util = "0.3.25" serde = { version = "1.0.147", features = ["derive"] } serde_json = "1.0.89" -rmp-serde = "1.1.1" http = "0.2.8" url = "2.3.1" tokio = { version = "1.22.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs index 66bc380..fe0e799 100644 --- a/crates/httpret/src/api1.rs +++ b/crates/httpret/src/api1.rs @@ -995,7 +995,7 @@ impl Api1EventsBinaryHandler { Ok(ret) } else { // TODO set the public error code and message and return Err(e). - let e = Error::with_public_msg(format!("{self_name} unsupported Accept: {}", accept)); + let e = Error::with_public_msg_no_trace(format!("{self_name} unsupported Accept: {}", accept)); error!("{self_name} {e}"); Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?) } @@ -1030,7 +1030,7 @@ impl RequestStatusHandler { .to_owned(); if accept != APP_JSON && accept != ACCEPT_ALL { // TODO set the public error code and message and return Err(e). - let e = Error::with_public_msg(format!("Unsupported Accept: {:?}", accept)); + let e = Error::with_public_msg_no_trace(format!("Unsupported Accept: {:?}", accept)); error!("{e}"); return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index d91d1be..7feb272 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -12,9 +12,6 @@ use netpod::log::*; use netpod::timeunits::SEC; use netpod::FromUrl; use netpod::NodeConfigCached; -use netpod::ACCEPT_ALL; -use netpod::APP_JSON; -use netpod::APP_OCTET; use query::api4::binned::BinnedQuery; use tracing::Instrument; use url::Url; @@ -37,6 +34,7 @@ async fn binned_json(url: Url, req: Request, node_config: &NodeConfigCache let span1 = span!( Level::INFO, "httpret::binned", + reqid, beg = query.range().beg_u64() / SEC, end = query.range().end_u64() / SEC, ch = query.channel().name().clone(), @@ -53,10 +51,6 @@ async fn binned_json(url: Url, req: Request, node_config: &NodeConfigCache } async fn binned(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - let accept = req - .headers() - .get(http::header::ACCEPT) - .map_or(ACCEPT_ALL, |k| k.to_str().unwrap_or(ACCEPT_ALL)); let url = { let s1 = format!("dummy:{}", req.uri()); Url::parse(&s1) @@ -70,15 +64,18 @@ async fn binned(req: Request, node_config: &NodeConfigCached) -> Result, ctx: &ReqCtx, node_config: &NodeConfigCached, + service_version: &ServiceVersion, ) -> Result, Error> { - let res = tokio::time::timeout(Duration::from_millis(1200), self.status(req, ctx, node_config)).await; + let res = tokio::time::timeout( + Duration::from_millis(1200), + self.status(req, ctx, node_config, service_version), + ) + .await; let res = match res { Ok(res) => res, Err(e) => { @@ -67,6 +73,7 @@ impl StatusNodesRecursive { req: Request, _ctx: &ReqCtx, node_config: &NodeConfigCached, + service_version: &ServiceVersion, ) -> Result { let (_head, _body) = req.into_parts(); let archiver_appliance_status = match node_config.node.archiver_appliance.as_ref() { @@ -96,7 +103,7 @@ impl StatusNodesRecursive { let database_size = dbconn::database_size(node_config).await.map_err(|e| format!("{e}")); let ret = NodeStatus { name: format!("{}:{}", node_config.node.host, node_config.node.port), - version: core::env!("CARGO_PKG_VERSION").into(), + version: service_version.to_string(), is_sf_databuffer: node_config.node.sf_databuffer.is_some(), is_archiver_engine: node_config.node.channel_archiver.is_some(), is_archiver_appliance: node_config.node.archiver_appliance.is_some(), diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 876856b..8b82824 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -37,6 +37,7 @@ use netpod::query::prebinned::PreBinnedQuery; use netpod::CmpZero; use netpod::NodeConfigCached; use netpod::ProxyConfig; +use netpod::ServiceVersion; use netpod::APP_JSON; use netpod::APP_JSON_LINES; use nodenet::conn::events_service; @@ -100,7 +101,27 @@ impl ::err::ToErr for RetrievalError { } } -pub async fn host(node_config: NodeConfigCached) -> Result<(), RetrievalError> { +pub fn accepts_json(hm: &http::HeaderMap) -> bool { + match hm.get(http::header::ACCEPT) { + Some(x) => match x.to_str() { + Ok(x) => x.contains(netpod::APP_JSON) || x.contains(netpod::ACCEPT_ALL), + Err(_) => false, + }, + None => false, + } +} + +pub fn accepts_octets(hm: &http::HeaderMap) -> bool { + match hm.get(http::header::ACCEPT) { + Some(x) => match x.to_str() { + Ok(x) => x.contains(netpod::APP_OCTET), + Err(_) => false, + }, + None => false, + } +} + +pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion) -> Result<(), RetrievalError> { static STATUS_BOARD_INIT: Once = Once::new(); STATUS_BOARD_INIT.call_once(|| { let b = StatusBoard::new(); @@ -124,6 +145,7 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), RetrievalError> { debug!("new connection from {:?}", conn.remote_addr()); let node_config = node_config.clone(); let addr = conn.remote_addr(); + let service_version = service_version.clone(); async move { Ok::<_, Error>(service_fn({ move |req| { @@ -135,7 +157,7 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), RetrievalError> { req.uri(), req.headers() ); - let f = http_service(req, node_config.clone()); + let f = http_service(req, node_config.clone(), service_version.clone()); Cont { f: Box::pin(f) } } })) @@ -150,8 +172,12 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), RetrievalError> { Ok(()) } -async fn http_service(req: Request, node_config: NodeConfigCached) -> Result, Error> { - match http_service_try(req, &node_config).await { +async fn http_service( + req: Request, + node_config: NodeConfigCached, + service_version: ServiceVersion, +) -> Result, Error> { + match http_service_try(req, &node_config, &service_version).await { Ok(k) => Ok(k), Err(e) => { error!("daqbuffer node http_service sees error: {}", e); @@ -286,7 +312,11 @@ macro_rules! static_http_api1 { }; } -async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> Result, Error> { +async fn http_service_try( + req: Request, + node_config: &NodeConfigCached, + service_version: &ServiceVersion, +) -> Result, Error> { use http::HeaderValue; let mut urlmarks = Vec::new(); urlmarks.push(format!("{}:{}", req.method(), req.uri())); @@ -297,7 +327,7 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } } let ctx = ReqCtx::with_node(&req, node_config); - let mut res = http_service_inner(req, &ctx, node_config).await?; + let mut res = http_service_inner(req, &ctx, node_config, service_version).await?; let hm = res.headers_mut(); hm.append("Access-Control-Allow-Origin", "*".parse().unwrap()); hm.append("Access-Control-Allow-Headers", "*".parse().unwrap()); @@ -316,19 +346,17 @@ async fn http_service_inner( req: Request, ctx: &ReqCtx, node_config: &NodeConfigCached, + service_version: &ServiceVersion, ) -> Result, RetrievalError> { let uri = req.uri().clone(); let path = uri.path(); if path == "/api/4/private/version" { if req.method() == Method::GET { - let ver_maj: u32 = std::env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap_or(0); - let ver_min: u32 = std::env!("CARGO_PKG_VERSION_MINOR").parse().unwrap_or(0); - let ver_pat: u32 = std::env!("CARGO_PKG_VERSION_PATCH").parse().unwrap_or(0); let ret = serde_json::json!({ "daqbuf_version": { - "major": ver_maj, - "minor": ver_min, - "patch": ver_pat, + "major": service_version.major, + "minor": service_version.minor, + "patch": service_version.patch, }, }); Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?) @@ -355,7 +383,7 @@ async fn http_service_inner( Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } } else if let Some(h) = api4::status::StatusNodesRecursive::handler(&req) { - Ok(h.handle(req, ctx, &node_config).await?) + Ok(h.handle(req, ctx, &node_config, service_version).await?) } else if let Some(h) = StatusBoardAllHandler::handler(&req) { h.handle(req, &node_config).await } else if let Some(h) = api4::databuffer_tools::FindActiveHandler::handler(&req) { diff --git a/crates/httpret/src/proxy.rs b/crates/httpret/src/proxy.rs index 5630187..e7faf8f 100644 --- a/crates/httpret/src/proxy.rs +++ b/crates/httpret/src/proxy.rs @@ -37,6 +37,7 @@ use netpod::FromUrl; use netpod::HasBackend; use netpod::HasTimeout; use netpod::ProxyConfig; +use netpod::ServiceVersion; use netpod::ACCEPT_ALL; use netpod::APP_JSON; use query::api4::binned::BinnedQuery; @@ -57,12 +58,13 @@ use url::Url; const DISTRI_PRE: &str = "/distri/"; -pub async fn proxy(proxy_config: ProxyConfig) -> Result<(), Error> { +pub async fn proxy(proxy_config: ProxyConfig, service_version: ServiceVersion) -> Result<(), Error> { use std::str::FromStr; let addr = SocketAddr::from_str(&format!("{}:{}", proxy_config.listen, proxy_config.port))?; let make_service = make_service_fn({ move |_conn| { let proxy_config = proxy_config.clone(); + let service_version = service_version.clone(); async move { Ok::<_, Error>(service_fn({ move |req| { @@ -73,7 +75,7 @@ pub async fn proxy(proxy_config: ProxyConfig) -> Result<(), Error> { req.uri(), req.headers() ); - let f = proxy_http_service(req, proxy_config.clone()); + let f = proxy_http_service(req, proxy_config.clone(), service_version.clone()); Cont { f: Box::pin(f) } } })) @@ -84,8 +86,12 @@ pub async fn proxy(proxy_config: ProxyConfig) -> Result<(), Error> { Ok(()) } -async fn proxy_http_service(req: Request, proxy_config: ProxyConfig) -> Result, Error> { - match proxy_http_service_try(req, &proxy_config).await { +async fn proxy_http_service( + req: Request, + proxy_config: ProxyConfig, + service_version: ServiceVersion, +) -> Result, Error> { + match proxy_http_service_try(req, &proxy_config, &service_version).await { Ok(k) => Ok(k), Err(e) => { error!("data_api_proxy sees error: {:?}", e); @@ -94,9 +100,13 @@ async fn proxy_http_service(req: Request, proxy_config: ProxyConfig) -> Re } } -async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig) -> Result, Error> { +async fn proxy_http_service_try( + req: Request, + proxy_config: &ProxyConfig, + service_version: &ServiceVersion, +) -> Result, Error> { let ctx = ReqCtx::with_proxy(&req, proxy_config); - let mut res = proxy_http_service_inner(req, &ctx, proxy_config).await?; + let mut res = proxy_http_service_inner(req, &ctx, proxy_config, &service_version).await?; let hm = res.headers_mut(); hm.insert("Access-Control-Allow-Origin", "*".parse().unwrap()); hm.insert("Access-Control-Allow-Headers", "*".parse().unwrap()); @@ -111,6 +121,7 @@ async fn proxy_http_service_inner( req: Request, ctx: &ReqCtx, proxy_config: &ProxyConfig, + service_version: &ServiceVersion, ) -> Result, Error> { let uri = req.uri().clone(); let path = uri.path(); @@ -122,14 +133,11 @@ async fn proxy_http_service_inner( Ok(gather_json_2_v1(req, "/api/1/gather/", proxy_config).await?) } else if path == "/api/4/private/version" { if req.method() == Method::GET { - let ver_maj: u32 = std::env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap_or(0); - let ver_min: u32 = std::env!("CARGO_PKG_VERSION_MINOR").parse().unwrap_or(0); - let ver_pat: u32 = std::env!("CARGO_PKG_VERSION_PATCH").parse().unwrap_or(0); let ret = serde_json::json!({ "daqbuf_version": { - "major": ver_maj, - "minor": ver_min, - "patch": ver_pat, + "major": service_version.major, + "minor": service_version.minor, + "patch": service_version.patch, }, }); Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?) @@ -137,7 +145,7 @@ async fn proxy_http_service_inner( Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } } else if let Some(h) = api4::StatusNodesRecursive::handler(&req) { - h.handle(req, ctx, &proxy_config).await + h.handle(req, ctx, &proxy_config, service_version).await } else if path == "/api/4/backends" { Ok(backends(req, proxy_config).await?) } else if let Some(h) = api4::ChannelSearchAggHandler::handler(&req) { diff --git a/crates/httpret/src/proxy/api1/reqstatus.rs b/crates/httpret/src/proxy/api1/reqstatus.rs index b365793..c891905 100644 --- a/crates/httpret/src/proxy/api1/reqstatus.rs +++ b/crates/httpret/src/proxy/api1/reqstatus.rs @@ -39,7 +39,7 @@ impl RequestStatusHandler { .to_owned(); if accept != APP_JSON && accept != ACCEPT_ALL { // TODO set the public error code and message and return Err(e). - let e = Error::with_public_msg(format!("Unsupported Accept: {:?}", accept)); + let e = Error::with_public_msg_no_trace(format!("Unsupported Accept: {:?}", accept)); error!("{e}"); return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } diff --git a/crates/httpret/src/proxy/api4.rs b/crates/httpret/src/proxy/api4.rs index 4a5a3be..a48a214 100644 --- a/crates/httpret/src/proxy/api4.rs +++ b/crates/httpret/src/proxy/api4.rs @@ -19,6 +19,7 @@ use netpod::ChannelSearchResult; use netpod::NodeStatus; use netpod::NodeStatusSub; use netpod::ProxyConfig; +use netpod::ServiceVersion; use netpod::ACCEPT_ALL; use netpod::APP_JSON; use serde_json::Value as JsVal; @@ -162,9 +163,10 @@ impl StatusNodesRecursive { &self, req: Request, ctx: &ReqCtx, - node_config: &ProxyConfig, + proxy_config: &ProxyConfig, + service_version: &ServiceVersion, ) -> Result, Error> { - match self.status(req, ctx, node_config).await { + match self.status(req, ctx, proxy_config, service_version).await { Ok(status) => { let body = serde_json::to_vec(&status)?; let ret = response(StatusCode::OK).body(Body::from(body))?; @@ -183,6 +185,7 @@ impl StatusNodesRecursive { _req: Request, _ctx: &ReqCtx, proxy_config: &ProxyConfig, + service_version: &ServiceVersion, ) -> Result { let path = crate::api4::status::StatusNodesRecursive::path(); let mut bodies = Vec::new(); @@ -243,7 +246,7 @@ impl StatusNodesRecursive { } let ret = NodeStatus { name: format!("{}:{}", proxy_config.name, proxy_config.port), - version: core::env!("CARGO_PKG_VERSION").into(), + version: service_version.to_string(), is_sf_databuffer: false, is_archiver_engine: false, is_archiver_appliance: false, diff --git a/crates/httpret/src/pulsemap.rs b/crates/httpret/src/pulsemap.rs index aa39bcf..e6c2b17 100644 --- a/crates/httpret/src/pulsemap.rs +++ b/crates/httpret/src/pulsemap.rs @@ -881,9 +881,11 @@ impl FromUrl for MapPulseQuery { fn from_url(url: &url::Url) -> Result { let mut pit = url .path_segments() - .ok_or(Error::with_msg_no_trace("no path in url"))? + .ok_or_else(|| Error::with_msg_no_trace("no path in url"))? .rev(); - let pulsestr = pit.next().ok_or(Error::with_msg_no_trace("no pulse in url path"))?; + let pulsestr = pit + .next() + .ok_or_else(|| Error::with_msg_no_trace("no pulse in url path"))?; let backend = pit.next().unwrap_or("sf-databuffer").into(); // TODO legacy: use a default backend if not specified. let backend = if backend == "pulse" { diff --git a/crates/httpret/src/settings.rs b/crates/httpret/src/settings.rs index 3b42ccc..c9dbac1 100644 --- a/crates/httpret/src/settings.rs +++ b/crates/httpret/src/settings.rs @@ -35,7 +35,7 @@ impl SettingsThreadsMaxHandler { .to_owned(); if accept != APP_JSON && accept != ACCEPT_ALL { // TODO set the public error code and message and return Err(e). - let e = Error::with_public_msg(format!("Unsupported Accept: {:?}", accept)); + let e = Error::with_public_msg_no_trace(format!("Unsupported Accept: {:?}", accept)); error!("{e}"); return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } diff --git a/crates/items_0/src/items_0.rs b/crates/items_0/src/items_0.rs index 7fc5297..3b33fd5 100644 --- a/crates/items_0/src/items_0.rs +++ b/crates/items_0/src/items_0.rs @@ -91,7 +91,13 @@ pub enum MergeError { impl From for err::Error { fn from(e: MergeError) -> Self { - format!("{e:?}").into() + e.to_string().into() + } +} + +impl fmt::Display for MergeError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{:?}", self) } } diff --git a/crates/items_2/Cargo.toml b/crates/items_2/Cargo.toml index 07bcf1c..4dbd349 100644 --- a/crates/items_2/Cargo.toml +++ b/crates/items_2/Cargo.toml @@ -12,6 +12,8 @@ doctest = false serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_cbor = "0.11.2" +rmp-serde = "1.1.1" +postcard = { version = "1.0.0", features = ["use-std"] } erased-serde = "0.3" bytes = "1.2.1" num-traits = "0.2.15" @@ -20,7 +22,6 @@ crc32fast = "1.3.2" futures-util = "0.3.24" tokio = { version = "1.20", features = ["rt-multi-thread", "sync", "time"] } humantime-serde = "1.1.1" -rmp-serde = "1.1.1" err = { path = "../err" } items_0 = { path = "../items_0" } items_proc = { path = "../items_proc" } diff --git a/crates/items_2/src/channelevents.rs b/crates/items_2/src/channelevents.rs index 6fa9ff6..3229978 100644 --- a/crates/items_2/src/channelevents.rs +++ b/crates/items_2/src/channelevents.rs @@ -202,52 +202,63 @@ mod serde_channel_events { where A: de::SeqAccess<'de>, { - let e0: &str = seq.next_element()?.ok_or(de::Error::missing_field("[0] cty"))?; - let e1: u32 = seq.next_element()?.ok_or(de::Error::missing_field("[1] nty"))?; + let e0: &str = seq.next_element()?.ok_or_else(|| de::Error::missing_field("[0] cty"))?; + let e1: u32 = seq.next_element()?.ok_or_else(|| de::Error::missing_field("[1] nty"))?; if e0 == EventsDim0::::serde_id() { match e1 { u8::SUB => { - let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } u16::SUB => { - let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } u32::SUB => { - let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } u64::SUB => { - let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } i8::SUB => { - let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } i16::SUB => { - let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } i32::SUB => { - let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } i64::SUB => { - let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } f32::SUB => { - let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } f64::SUB => { - let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } bool::SUB => { - let obj: EventsDim0 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + let obj: EventsDim0 = + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } _ => Err(de::Error::custom(&format!("unknown nty {e1}"))), @@ -255,15 +266,18 @@ mod serde_channel_events { } else if e0 == EventsDim1::::serde_id() { match e1 { f32::SUB => { - let obj: EventsDim1 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + let obj: EventsDim1 = + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } f64::SUB => { - let obj: EventsDim1 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + let obj: EventsDim1 = + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } bool::SUB => { - let obj: EventsDim1 = seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + let obj: EventsDim1 = + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } _ => Err(de::Error::custom(&format!("unknown nty {e1}"))), @@ -272,17 +286,17 @@ mod serde_channel_events { match e1 { f32::SUB => { let obj: EventsXbinDim0 = - seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } f64::SUB => { let obj: EventsXbinDim0 = - seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } bool::SUB => { let obj: EventsXbinDim0 = - seq.next_element()?.ok_or(de::Error::missing_field("[2] obj"))?; + seq.next_element()?.ok_or_else(|| de::Error::missing_field("[2] obj"))?; Ok(EvBox(Box::new(obj))) } _ => Err(de::Error::custom(&format!("unknown nty {e1}"))), diff --git a/crates/items_2/src/frame.rs b/crates/items_2/src/frame.rs index 5ebe4e3..aef00dd 100644 --- a/crates/items_2/src/frame.rs +++ b/crates/items_2/src/frame.rs @@ -1,5 +1,6 @@ use crate::framable::FrameDecodable; use crate::framable::INMEM_FRAME_ENCID; +use crate::framable::INMEM_FRAME_FOOT; use crate::framable::INMEM_FRAME_HEAD; use crate::framable::INMEM_FRAME_MAGIC; use crate::inmem::InMemoryFrame; @@ -112,25 +113,57 @@ where rmp_serde::from_slice(buf).map_err(|e| format!("{e}").into()) } +pub fn postcard_to_vec(item: T) -> Result, Error> +where + T: Serialize, +{ + postcard::to_stdvec(&item).map_err(|e| format!("{e}").into()) +} + +pub fn postcard_erased_to_vec(item: T) -> Result, Error> +where + T: erased_serde::Serialize, +{ + use postcard::ser_flavors::Flavor; + let mut ser1 = postcard::Serializer { + output: postcard::ser_flavors::AllocVec::new(), + }; + let mut ser2 = ::erase(&mut ser1); + item.erased_serialize(&mut ser2) + .map_err(|e| Error::from(format!("{e}")))?; + let ret = ser1.output.finalize().map_err(|e| format!("{e}").into()); + ret +} + +pub fn postcard_from_slice(buf: &[u8]) -> Result +where + T: for<'de> serde::Deserialize<'de>, +{ + postcard::from_bytes(buf).map_err(|e| format!("{e}").into()) +} + pub fn encode_to_vec(item: T) -> Result, Error> where T: Serialize, { - msgpack_to_vec(item) + // msgpack_to_vec(item) + postcard_to_vec(item) } pub fn encode_erased_to_vec(item: T) -> Result, Error> where T: erased_serde::Serialize, { - msgpack_erased_to_vec(item) + // msgpack_erased_to_vec(item) + postcard_erased_to_vec(item) } pub fn decode_from_slice(buf: &[u8]) -> Result where T: for<'de> serde::Deserialize<'de>, { - msgpack_from_slice(buf) + // msgpack_from_slice(buf) + postcard_from_slice(buf) } pub fn make_frame_2(item: T, fty: u32) -> Result @@ -145,7 +178,7 @@ where h.update(&enc); let payload_crc = h.finalize(); // TODO reserve also for footer via constant - let mut buf = BytesMut::with_capacity(enc.len() + INMEM_FRAME_HEAD); + let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD + INMEM_FRAME_FOOT + enc.len()); buf.put_u32_le(INMEM_FRAME_MAGIC); buf.put_u32_le(INMEM_FRAME_ENCID); buf.put_u32_le(fty); @@ -168,7 +201,7 @@ pub fn make_error_frame(error: &err::Error) -> Result { let mut h = crc32fast::Hasher::new(); h.update(&enc); let payload_crc = h.finalize(); - let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD); + let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD + INMEM_FRAME_FOOT + enc.len()); buf.put_u32_le(INMEM_FRAME_MAGIC); buf.put_u32_le(INMEM_FRAME_ENCID); buf.put_u32_le(ERROR_FRAME_TYPE_ID); @@ -191,7 +224,7 @@ pub fn make_log_frame(item: &LogItem) -> Result { let mut h = crc32fast::Hasher::new(); h.update(&enc); let payload_crc = h.finalize(); - let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD); + let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD + INMEM_FRAME_FOOT + enc.len()); buf.put_u32_le(INMEM_FRAME_MAGIC); buf.put_u32_le(INMEM_FRAME_ENCID); buf.put_u32_le(LOG_FRAME_TYPE_ID); @@ -214,7 +247,7 @@ pub fn make_stats_frame(item: &StatsItem) -> Result { let mut h = crc32fast::Hasher::new(); h.update(&enc); let payload_crc = h.finalize(); - let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD); + let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD + INMEM_FRAME_FOOT + enc.len()); buf.put_u32_le(INMEM_FRAME_MAGIC); buf.put_u32_le(INMEM_FRAME_ENCID); buf.put_u32_le(STATS_FRAME_TYPE_ID); @@ -236,7 +269,7 @@ pub fn make_range_complete_frame() -> Result { let mut h = crc32fast::Hasher::new(); h.update(&enc); let payload_crc = h.finalize(); - let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD); + let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD + INMEM_FRAME_FOOT + enc.len()); buf.put_u32_le(INMEM_FRAME_MAGIC); buf.put_u32_le(INMEM_FRAME_ENCID); buf.put_u32_le(RANGE_COMPLETE_FRAME_TYPE_ID); @@ -255,7 +288,7 @@ pub fn make_term_frame() -> Result { let mut h = crc32fast::Hasher::new(); h.update(&enc); let payload_crc = h.finalize(); - let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD); + let mut buf = BytesMut::with_capacity(INMEM_FRAME_HEAD + INMEM_FRAME_FOOT + enc.len()); buf.put_u32_le(INMEM_FRAME_MAGIC); buf.put_u32_le(INMEM_FRAME_ENCID); buf.put_u32_le(TERM_FRAME_TYPE_ID); diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index abef033..f1fb0d9 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -562,6 +562,23 @@ impl NodeConfig { } } +#[derive(Clone, Debug)] +pub struct ServiceVersion { + pub major: u32, + pub minor: u32, + pub patch: u32, + pub pre: Option, +} + +impl fmt::Display for ServiceVersion { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match &self.pre { + Some(pre) => write!(fmt, "{}.{}.{}-{}", self.major, self.minor, self.patch, pre), + None => write!(fmt, "{}.{}.{}", self.major, self.minor, self.patch), + } + } +} + #[derive(Clone, Debug)] pub struct NodeConfigCached { pub node_config: NodeConfig, @@ -679,11 +696,10 @@ impl FromUrl for SfDbChannel { let ret = SfDbChannel { backend: pairs .get("backend") - .ok_or(Error::with_public_msg("missing backend"))? + .ok_or_else(|| Error::with_public_msg_no_trace("missing backend"))? .into(), name: pairs .get("channelName") - //.ok_or(Error::with_public_msg("missing channelName"))? .map(String::from) .unwrap_or(String::new()) .into(), @@ -692,7 +708,7 @@ impl FromUrl for SfDbChannel { .and_then(|x| x.parse::().map_or(None, |x| Some(x))), }; if ret.name.is_empty() && ret.series.is_none() { - return Err(Error::with_public_msg(format!( + return Err(Error::with_public_msg_no_trace(format!( "Missing one of channelName or seriesId parameters." ))); } @@ -764,11 +780,11 @@ impl FromUrl for DaqbufSeries { let ret = DaqbufSeries { series: pairs .get("seriesId") - .ok_or_else(|| Error::with_public_msg("missing seriesId")) + .ok_or_else(|| Error::with_public_msg_no_trace("missing seriesId")) .map(|x| x.parse::())??, backend: pairs .get("backend") - .ok_or_else(|| Error::with_public_msg("missing backend"))? + .ok_or_else(|| Error::with_public_msg_no_trace("missing backend"))? .into(), name: pairs .get("channelName") diff --git a/crates/netpod/src/query.rs b/crates/netpod/src/query.rs index 7dbeae7..5d664a3 100644 --- a/crates/netpod/src/query.rs +++ b/crates/netpod/src/query.rs @@ -112,7 +112,7 @@ impl FromUrl for TimeRangeQuery { }; Ok(ret) } else { - Err(Error::with_public_msg("missing date range")) + Err(Error::with_public_msg_no_trace("missing date range")) } } } @@ -178,7 +178,7 @@ impl FromUrl for PulseRangeQuery { }; Ok(ret) } else { - Err(Error::with_public_msg("missing pulse range")) + Err(Error::with_public_msg_no_trace("missing pulse range")) } } } @@ -298,8 +298,12 @@ impl FromUrl for ChannelStateEventsQuery { } fn from_pairs(pairs: &BTreeMap) -> Result { - let beg_date = pairs.get("begDate").ok_or(Error::with_msg("missing begDate"))?; - let end_date = pairs.get("endDate").ok_or(Error::with_msg("missing endDate"))?; + let beg_date = pairs + .get("begDate") + .ok_or_else(|| Error::with_msg_no_trace("missing begDate"))?; + let end_date = pairs + .get("endDate") + .ok_or_else(|| Error::with_msg_no_trace("missing endDate"))?; let ret = Self { channel: SfDbChannel::from_pairs(&pairs)?, range: NanoRange { diff --git a/crates/query/src/api4/binned.rs b/crates/query/src/api4/binned.rs index d404167..aa3d10a 100644 --- a/crates/query/src/api4/binned.rs +++ b/crates/query/src/api4/binned.rs @@ -173,9 +173,9 @@ impl FromUrl for BinnedQuery { range, bin_count: pairs .get("binCount") - .ok_or(Error::with_msg("missing binCount"))? + .ok_or_else(|| Error::with_msg_no_trace("missing binCount"))? .parse() - .map_err(|e| Error::with_msg(format!("can not parse binCount {:?}", e)))?, + .map_err(|e| Error::with_msg_no_trace(format!("can not parse binCount {:?}", e)))?, transform: TransformQuery::from_pairs(pairs)?, cache_usage: CacheUsage::from_pairs(&pairs)?, buf_len_disk_io: pairs diff --git a/crates/query/src/api4/events.rs b/crates/query/src/api4/events.rs index 854e309..37b169b 100644 --- a/crates/query/src/api4/events.rs +++ b/crates/query/src/api4/events.rs @@ -221,12 +221,12 @@ impl FromUrl for PlainEventsQuery { .get("doTestMainError") .map_or("false", |k| k) .parse() - .map_err(|e| Error::with_public_msg(format!("can not parse doTestMainError: {}", e)))?, + .map_err(|e| Error::with_public_msg_no_trace(format!("can not parse doTestMainError: {}", e)))?, do_test_stream_error: pairs .get("doTestStreamError") .map_or("false", |k| k) .parse() - .map_err(|e| Error::with_public_msg(format!("can not parse doTestStreamError: {}", e)))?, + .map_err(|e| Error::with_public_msg_no_trace(format!("can not parse doTestStreamError: {}", e)))?, test_do_wasm: pairs .get("testDoWasm") .map(|x| x.parse::().ok()) diff --git a/crates/streams/src/rangefilter2.rs b/crates/streams/src/rangefilter2.rs index dc5a6ae..69d78a9 100644 --- a/crates/streams/src/rangefilter2.rs +++ b/crates/streams/src/rangefilter2.rs @@ -45,7 +45,7 @@ where pub fn new(inp: S, range: NanoRange, one_before_range: bool) -> Self { trace!( - "{}::new range: {:?} one_before_range: {:?}", + "{}::new range: {:?} one_before_range {:?}", Self::type_name(), range, one_before_range @@ -112,16 +112,18 @@ where } else { let mut dummy = item.new_empty(); item.drain_into(&mut dummy, (0, ilge - 1)) - .map_err(|e| format!("{e:?} unexpected MergeError while remove of items"))?; + .map_err(|e| format!("{e} unexpected MergeError while remove of items"))?; self.slot1 = None; item } } None => { + // TODO keep stats about this case + debug!("drain into to keep one before"); let n = item.len(); let mut keep = item.new_empty(); item.drain_into(&mut keep, (n.max(1) - 1, n)) - .map_err(|e| format!("{e:?} unexpected MergeError while remove of items"))?; + .map_err(|e| format!("{e} unexpected MergeError while remove of items"))?; self.slot1 = Some(keep); item.new_empty() }