diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index 67cd312..12eb82e 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -54,9 +54,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.6.4" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44" +checksum = "d664a92ecae85fd0a7392615844904654d1d5f5514837f471ddef4a057aba1b6" dependencies = [ "anstyle", "anstyle-parse", @@ -83,9 +83,9 @@ dependencies = [ [[package]] name = "anstyle-query" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3a318f1f38d2418400f8209655bfd825785afd25aa30bb7ba6cc792e4596748" +checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" dependencies = [ "windows-sys 0.52.0", ] @@ -167,7 +167,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -178,7 +178,7 @@ checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -208,7 +208,7 @@ dependencies = [ "bytes", "futures-util", "http 0.2.11", - "http-body 0.4.5", + "http-body 0.4.6", "hyper 0.14.27", "itoa", "matchit", @@ -234,7 +234,7 @@ dependencies = [ "bytes", "futures-util", "http 0.2.11", - "http-body 0.4.5", + "http-body 0.4.6", "mime", "rustversion", "tower-layer", @@ -424,7 +424,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -801,7 +801,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -812,7 +812,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -986,7 +986,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -1192,7 +1192,7 @@ checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -1437,9 +1437,9 @@ dependencies = [ [[package]] name = "http-body" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", "http 0.2.11", @@ -1529,6 +1529,7 @@ dependencies = [ "nodenet", "parse", "query", + "rand", "regex", "scyllaconn", "serde", @@ -1568,7 +1569,7 @@ dependencies = [ "futures-util", "h2 0.3.22", "http 0.2.11", - "http-body 0.4.5", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -1752,7 +1753,7 @@ dependencies = [ name = "items_proc" version = "0.0.2" dependencies = [ - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -1766,9 +1767,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" +checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" [[package]] name = "js-sys" @@ -1793,9 +1794,9 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" [[package]] name = "libc" -version = "0.2.150" +version = "0.2.151" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" +checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" [[package]] name = "linux-raw-sys" @@ -1961,8 +1962,10 @@ dependencies = [ "err", "futures-util", "hex", + "http 1.0.0", "humantime-serde", "num-traits", + "rand", "serde", "serde_json", "tracing", @@ -2037,7 +2040,7 @@ checksum = "cfb77679af88f8b125209d354a202862602672222e7f2313fdd6dc349bad4712" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -2087,7 +2090,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -2101,9 +2104,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "openssl" @@ -2128,7 +2131,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -2248,7 +2251,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -2388,7 +2391,7 @@ dependencies = [ "itertools", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -2664,9 +2667,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.26" +version = "0.38.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9470c4bf8246c8daf25f9598dca807fb6510347b1e1cfa55749113850c79d88a" +checksum = "72e572a5e8ca657d7366229cdde4bd14c4eb5499a9573d4d366fe1b599daa316" dependencies = [ "bitflags 2.4.1", "errno", @@ -2683,9 +2686,9 @@ checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" [[package]] name = "ryu" -version = "1.0.15" +version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" +checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" [[package]] name = "schannel" @@ -2704,9 +2707,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "scylla" -version = "0.9.1" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8338911f1c2a7bf8ba4036997196e49e9eda8d4414af10548e9420e23a5dbd6f" +checksum = "7b30067d0d0313a0bbcda9c31a503678f18dacd5426dd8a63c3f581cf80f5c30" dependencies = [ "arc-swap", "async-trait", @@ -2738,9 +2741,9 @@ dependencies = [ [[package]] name = "scylla-cql" -version = "0.0.8" +version = "0.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "048bdf0be96308ec0f5aeed2847bb2270f53355425b3afd67e64efc99d70b3e3" +checksum = "c9642864295085ef2fe62493eec405a109fbaae9f3a9b80ea087556af4a97df2" dependencies = [ "async-trait", "bigdecimal", @@ -2765,7 +2768,7 @@ checksum = "5757ded3dfb10967ca7d1ff1084d072d565b5e10b2b21c286d5335c245425a7e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -2861,7 +2864,7 @@ checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -3090,9 +3093,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.39" +version = "2.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a" +checksum = "13fa70a4ee923979ffb522cacce59d34421ebdea5625e1073c4326ef9d2dd42e" dependencies = [ "proc-macro2", "quote", @@ -3171,7 +3174,7 @@ source = "git+https://github.com/dominikwerder/thiserror.git#052df05c18b5f26b462 dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -3182,7 +3185,7 @@ checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -3250,9 +3253,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.34.0" +version = "1.35.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9" +checksum = "841d45b238a16291a4e1584e61820b8ae57d696cc5015c459c229ccc6990cc1c" dependencies = [ "backtrace", "bytes", @@ -3286,7 +3289,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -3380,7 +3383,7 @@ dependencies = [ "bytes", "h2 0.3.22", "http 0.2.11", - "http-body 0.4.5", + "http-body 0.4.6", "hyper 0.14.27", "hyper-timeout", "percent-encoding", @@ -3446,7 +3449,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -3503,9 +3506,9 @@ dependencies = [ [[package]] name = "try-lock" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "twox-hash" @@ -3525,9 +3528,9 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" [[package]] name = "unicode-bidi" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" +checksum = "6f2528f27a9eb2b21e69c95319b30bd0efd85d09c379741b0f78ea1d86be2416" [[package]] name = "unicode-ident" @@ -3636,7 +3639,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", "wasm-bindgen-shared", ] @@ -3658,7 +3661,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -4036,9 +4039,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "winnow" -version = "0.5.25" +version = "0.5.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7e87b8dfbe3baffbe687eef2e164e32286eff31a5ee16463ce03d991643ec94" +checksum = "b67b5f0a4e7a27a64c651977932b9dc5667ca7fc31ac44b03ed37a0cf42fdfff" dependencies = [ "memchr", ] diff --git a/crates/daqbuffer/src/bin/daqbuffer.rs b/crates/daqbuffer/src/bin/daqbuffer.rs index 0aeab3b..3ba65da 100644 --- a/crates/daqbuffer/src/bin/daqbuffer.rs +++ b/crates/daqbuffer/src/bin/daqbuffer.rs @@ -84,18 +84,7 @@ async fn go() -> Result<(), Error> { let cfg = cfg?; daqbufp2::run_node(cfg, service_version).await?; } else if let Ok(cfg) = serde_yaml::from_slice::(&buf) { - let sp = span!(Level::INFO, "parse", id = 123u32); - sp.in_scope(|| { - let sp = span!(Level::TRACE, "sptrace"); - sp.in_scope(|| { - let sp = span!(Level::INFO, "cfg", file = "some"); - sp.in_scope(|| { - debug!("Parsed yaml config from {}", subcmd.config); - info!("Parsed yaml config from {}", subcmd.config); - warn!("Parsed yaml config from {}", subcmd.config); - }); - }); - }); + info!("Parsed yaml config from {}", subcmd.config); let cfg: Result = cfg.into(); let cfg = cfg?; daqbufp2::run_node(cfg, service_version).await?; diff --git a/crates/disk/src/aggtest.rs b/crates/disk/src/aggtest.rs index b3882f9..96ccece 100644 --- a/crates/disk/src/aggtest.rs +++ b/crates/disk/src/aggtest.rs @@ -88,7 +88,7 @@ async fn agg_x_dim_0_inner() { true, // TODO 32, - netpod::ReqCtx::new("req-000").into(), + netpod::ReqCtx::for_test().into(), ); let _ = fut1; // TODO add the binning and expectation and await the result. @@ -150,7 +150,7 @@ async fn agg_x_dim_1_inner() { true, // TODO 32, - netpod::ReqCtx::new("req-000").into(), + netpod::ReqCtx::for_test().into(), ); let _ = fut1; // TODO add the binning and expectation and await the result. diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs index f165578..3b6aa97 100644 --- a/crates/httpret/src/api1.rs +++ b/crates/httpret/src/api1.rs @@ -32,6 +32,7 @@ use itertools::Itertools; use netpod::log::*; use netpod::query::api1::Api1Query; use netpod::range::evrange::NanoRange; +use netpod::req_uri_to_url; use netpod::timeunits::SEC; use netpod::Api1WarningStats; use netpod::ChannelSearchQuery; @@ -70,7 +71,6 @@ use std::time::Instant; use taskrun::tokio; use tracing_futures::Instrument; use url::Url; -use netpod::req_uri_to_url; pub trait BackendAware { fn backend(&self) -> &str; @@ -142,7 +142,11 @@ impl FromErrorCode for ChannelSearchResultItemV1 { #[derive(Debug, Serialize, Deserialize)] pub struct ChannelSearchResultV1(pub Vec); -pub async fn channel_search_list_v1(req: Requ, proxy_config: &ProxyConfig) -> Result { +pub async fn channel_search_list_v1( + req: Requ, + ctx: &ReqCtx, + proxy_config: &ProxyConfig, +) -> Result { let (head, reqbody) = req.into_parts(); let bodybytes = read_body_bytes(reqbody).await?; let query: ChannelSearchQueryV1 = serde_json::from_slice(&bodybytes)?; @@ -235,6 +239,7 @@ pub async fn channel_search_list_v1(req: Requ, proxy_config: &ProxyConfig) -> Re nt, ft, Duration::from_millis(3000), + ctx, ) .await?; Ok(ret) @@ -246,7 +251,11 @@ pub async fn channel_search_list_v1(req: Requ, proxy_config: &ProxyConfig) -> Re } } -pub async fn channel_search_configs_v1(req: Requ, proxy_config: &ProxyConfig) -> Result { +pub async fn channel_search_configs_v1( + req: Requ, + ctx: &ReqCtx, + proxy_config: &ProxyConfig, +) -> Result { let (head, reqbody) = req.into_parts(); let bodybytes = read_body_bytes(reqbody).await?; let query: ChannelSearchQueryV1 = serde_json::from_slice(&bodybytes)?; @@ -358,6 +367,7 @@ pub async fn channel_search_configs_v1(req: Requ, proxy_config: &ProxyConfig) -> nt, ft, Duration::from_millis(3000), + ctx, ) .await?; Ok(ret) @@ -516,6 +526,7 @@ async fn process_answer(res: Response) -> Result { } pub struct DataApiPython3DataStream { + ts_ctor: Instant, range: NanoRange, channels: VecDeque, settings: EventsSubQuerySettings, @@ -551,6 +562,7 @@ impl DataApiPython3DataStream { ) -> Self { debug!("DataApiPython3DataStream::new settings {settings:?} disk_io_tune {disk_io_tune:?}"); Self { + ts_ctor: Instant::now(), range, channels: channels.into_iter().collect(), settings, @@ -739,7 +751,7 @@ impl DataApiPython3DataStream { if tsnow.duration_since(self.ping_last) >= Duration::from_millis(500) { self.ping_last = tsnow; let mut sb = crate::status_board().unwrap(); - sb.mark_alive(self.ctx.reqid()); + sb.mark_alive(self.ctx.reqid_this()); } ret } @@ -781,10 +793,12 @@ impl Stream for DataApiPython3DataStream { panic!("poll on completed") } else if self.data_done { self.completed = true; - let reqid = self.ctx.reqid(); + let dt = self.ts_ctor.elapsed().as_secs_f32(); info!( - "{} response body sent {} bytes ({})", - reqid, self.count_bytes, self.count_emits + "response body sent {} bytes {} items {:0} ms", + self.count_bytes, + self.count_emits, + 1e3 * dt ); Ready(None) } else { @@ -803,7 +817,7 @@ impl Stream for DataApiPython3DataStream { self.current_fetch_info = None; self.data_done = true; let mut sb = crate::status_board().unwrap(); - sb.add_error(self.ctx.reqid(), e.0.clone()); + sb.add_error(self.ctx.reqid_this(), e.0.clone()); Ready(Some(Err(e))) } }, @@ -838,8 +852,8 @@ impl Stream for DataApiPython3DataStream { let n = Instant::now(); self.ping_last = n; let mut sb = crate::status_board().unwrap(); - sb.mark_alive(self.ctx.reqid()); - sb.mark_done(self.ctx.reqid()); + sb.mark_alive(self.ctx.reqid_this()); + sb.mark_done(self.ctx.reqid_this()); } continue; } @@ -915,7 +929,7 @@ impl Api1EventsBinaryHandler { }; let url = req_uri_to_url(&head.uri)?; let disk_tune = DiskIoTune::from_url(&url)?; - let reqidspan = tracing::info_span!("api1query", reqid = ctx.reqid()); + let reqidspan = tracing::info_span!("api1query"); // TODO do not clone here let reqctx = Arc::new(ctx.clone()); self.handle_for_query( @@ -981,8 +995,8 @@ impl Api1EventsBinaryHandler { // This means, the request status must provide these counters. error!("no config quorum found for {ch:?}"); let mut sb = crate::status_board().unwrap(); - sb.mark_alive(reqctx.reqid()); - if let Some(e) = sb.get_entry(reqctx.reqid()) { + sb.mark_alive(reqctx.reqid_this()); + if let Some(e) = sb.get_entry(reqctx.reqid_this()) { e.channel_not_found_inc(); } } @@ -1005,7 +1019,23 @@ impl Api1EventsBinaryHandler { ); let s = s.instrument(span).instrument(reqidspan); let body = body_stream(s); - let ret = response(StatusCode::OK).header(X_DAQBUF_REQID, reqctx.reqid()); + let n2 = ncc + .node_config + .cluster + .nodes + .get(ncc.ix) + .ok_or_else(|| Error::with_msg_no_trace(format!("node ix {} not found", ncc.ix)))?; + let nodeno_pre = "sf-daqbuf-"; + let nodeno: u32 = if n2.host.starts_with(nodeno_pre) { + n2.host[nodeno_pre.len()..nodeno_pre.len() + 2] + .parse() + .map_err(|e| Error::with_msg_no_trace(format!("{e}")))? + } else { + 0 + }; + let req_stat_id = format!("{}{:02}", reqctx.reqid_this(), nodeno); + info!("return req_stat_id {req_stat_id}"); + let ret = response(StatusCode::OK).header(X_DAQBUF_REQID, req_stat_id); let ret = ret.body(body)?; Ok(ret) } else { @@ -1052,7 +1082,7 @@ impl RequestStatusHandler { let _body_data = read_body_bytes(body).await?; let status_id = &head.uri.path()[Self::path_prefix().len()..]; debug!("RequestStatusHandler status_id {:?}", status_id); - let status = crate::status_board()?.status_as_json(status_id); + let status = crate::status_board().unwrap().status_as_json(status_id); let s = serde_json::to_string(&status)?; let ret = response(StatusCode::OK).body(body_string(s))?; Ok(ret) diff --git a/crates/httpret/src/api4.rs b/crates/httpret/src/api4.rs index abaea30..69c9b8d 100644 --- a/crates/httpret/src/api4.rs +++ b/crates/httpret/src/api4.rs @@ -2,5 +2,6 @@ pub mod binned; pub mod databuffer_tools; pub mod eventdata; pub mod events; +pub mod maintenance; pub mod search; pub mod status; diff --git a/crates/httpret/src/api4/maintenance.rs b/crates/httpret/src/api4/maintenance.rs new file mode 100644 index 0000000..6c86daf --- /dev/null +++ b/crates/httpret/src/api4/maintenance.rs @@ -0,0 +1,201 @@ +use crate::bodystream::response; +use crate::err::Error; +use crate::RetrievalError; +use bytes::Bytes; +use futures_util::StreamExt; +use http::Method; +use http::StatusCode; +use httpclient::body_empty; +use httpclient::body_stream; +use httpclient::body_string; +use httpclient::Requ; +use httpclient::StreamResponse; +use netpod::log::*; +use netpod::NodeConfigCached; +use netpod::ReqCtx; +use netpod::APP_JSON; +use netpod::APP_JSON_LINES; + +pub struct UpdateDbWithChannelNamesHandler {} + +impl UpdateDbWithChannelNamesHandler { + pub fn self_name() -> &'static str { + std::any::type_name::() + } + + pub fn prefix() -> &'static str { + "/api/4/maintenance/update_db_with_channel_names" + } + + pub fn handler(req: &Requ) -> Option { + if req.uri().path().starts_with(Self::prefix()) { + Some(Self {}) + } else { + None + } + } + + pub async fn handle( + &self, + req: Requ, + ctx: &ReqCtx, + node_config: &NodeConfigCached, + ) -> Result { + if req.method() != Method::GET { + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); + } + info!("{}", Self::self_name()); + let (head, _body) = req.into_parts(); + let _dry = match head.uri.query() { + Some(q) => q.contains("dry"), + None => false, + }; + let res = + dbconn::scan::update_db_with_channel_names(node_config.clone(), &node_config.node_config.cluster.database) + .await; + match res { + Ok(res) => { + let stream = res.map(|k| match serde_json::to_string(&k) { + Ok(mut item) => { + item.push('\n'); + Ok(Bytes::from(item)) + } + Err(e) => Err(e), + }); + let ret = response(StatusCode::OK) + .header(http::header::CONTENT_TYPE, APP_JSON_LINES) + .body(body_stream(stream))?; + let dt = ctx.ts_ctor().elapsed(); + info!("{} response dt {:.0} ms", Self::self_name(), 1e3 * dt.as_secs_f32()); + Ok(ret) + } + Err(e) => { + let p = serde_json::to_string(&e)?; + let res = response(StatusCode::OK) + .header(http::header::CONTENT_TYPE, APP_JSON_LINES) + .body(body_string(p))?; + Ok(res) + } + } + } +} + +pub struct UpdateDbWithAllChannelConfigsHandler {} + +impl UpdateDbWithAllChannelConfigsHandler { + pub fn self_name() -> &'static str { + std::any::type_name::() + } + + pub fn prefix() -> &'static str { + "/api/4/maintenance/update_db_with_all_channel_configs" + } + + pub fn handler(req: &Requ) -> Option { + if req.uri().path().starts_with(Self::prefix()) { + Some(Self {}) + } else { + None + } + } + + pub async fn handle( + &self, + req: Requ, + ctx: &ReqCtx, + node_config: &NodeConfigCached, + ) -> Result { + if req.method() != Method::GET { + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); + } + info!("{}", Self::self_name()); + let (head, _body) = req.into_parts(); + let _dry = match head.uri.query() { + Some(q) => q.contains("dry"), + None => false, + }; + let res = dbconn::scan::update_db_with_all_channel_configs(node_config.clone()).await?; + let stream = res.map(|k| match serde_json::to_string(&k) { + Ok(mut item) => { + item.push('\n'); + Ok(Bytes::from(item)) + } + Err(e) => Err(e), + }); + let ret = response(StatusCode::OK) + .header(http::header::CONTENT_TYPE, APP_JSON_LINES) + .body(body_stream(stream))?; + let dt = ctx.ts_ctor().elapsed(); + info!("{} response dt {:.0} ms", Self::self_name(), 1e3 * dt.as_secs_f32()); + Ok(ret) + } +} + +pub struct UpdateSearchCacheHandler {} + +impl UpdateSearchCacheHandler { + pub fn self_name() -> &'static str { + std::any::type_name::() + } + + pub fn prefix() -> &'static str { + "/api/4/maintenance/update_search_cache" + } + + pub fn handler(req: &Requ) -> Option { + if req.uri().path().starts_with(Self::prefix()) { + Some(Self {}) + } else { + None + } + } + + pub async fn handle( + &self, + req: Requ, + ctx: &ReqCtx, + node_config: &NodeConfigCached, + ) -> Result { + if req.method() != Method::GET { + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); + } + info!("{}", Self::self_name()); + let (head, _body) = req.into_parts(); + let _dry = match head.uri.query() { + Some(q) => q.contains("dry"), + None => false, + }; + let res = dbconn::scan::update_search_cache(node_config).await?; + let ret = response(StatusCode::OK) + .header(http::header::CONTENT_TYPE, APP_JSON) + .body(body_string(serde_json::to_string(&res)?))?; + let dt = ctx.ts_ctor().elapsed(); + info!("{} response dt {:.0} ms", Self::self_name(), 1e3 * dt.as_secs_f32()); + Ok(ret) + } +} + +#[allow(unused)] +async fn update_db_with_channel_names_3( + req: Requ, + _ctx: &ReqCtx, + node_config: &NodeConfigCached, +) -> Result { + let (head, _body) = req.into_parts(); + let _dry = match head.uri.query() { + Some(q) => q.contains("dry"), + None => false, + }; + let res = dbconn::scan::update_db_with_channel_names_3(node_config); + let stream = res.map(|k| match serde_json::to_string(&k) { + Ok(mut item) => { + item.push('\n'); + Ok(Bytes::from(item)) + } + Err(e) => Err(e), + }); + let ret = response(StatusCode::OK) + .header(http::header::CONTENT_TYPE, APP_JSON_LINES) + .body(body_stream(stream))?; + Ok(ret) +} diff --git a/crates/httpret/src/gather.rs b/crates/httpret/src/gather.rs index d2aeb64..4d4d79f 100644 --- a/crates/httpret/src/gather.rs +++ b/crates/httpret/src/gather.rs @@ -1,8 +1,6 @@ use crate::body_empty; use crate::body_string; use crate::err::Error; -use crate::response; -use crate::Requ; use futures_util::select; use futures_util::FutureExt; use http::header; @@ -10,20 +8,14 @@ use http::Method; use http::StatusCode; use http::Uri; use httpclient::connect_client; -use httpclient::read_body_bytes; -use httpclient::IntoBody; -use httpclient::StreamResponse; -use httpclient::ToJsonBody; use hyper::body::Incoming; use hyper::Request; use hyper::Response; use netpod::log::*; -use netpod::Node; -use netpod::NodeConfigCached; +use netpod::ReqCtx; use netpod::APP_JSON; use serde::Deserialize; use serde::Serialize; -use serde_json::Value as JsonValue; use std::fmt; use std::future::Future; use std::pin::Pin; @@ -44,97 +36,6 @@ struct GatherHost { inst: String, } -async fn process_answer(res: Response) -> Result { - let (pre, body) = res.into_parts(); - if pre.status != StatusCode::OK { - let buf = read_body_bytes(body).await?; - let s = String::from_utf8(buf.to_vec())?; - Ok(JsonValue::String(format!("status {} body {}", pre.status.as_str(), s))) - } else { - let body_all = read_body_bytes(body).await?; - let val = match serde_json::from_slice(&body_all) { - Ok(k) => k, - Err(_e) => JsonValue::String(String::from_utf8(body_all.to_vec())?), - }; - Ok::<_, Error>(val) - } -} - -pub async fn gather_get_json(req: Requ, node_config: &NodeConfigCached) -> Result { - let (head, body) = req.into_parts(); - let _bodyslice = read_body_bytes(body).await?; - let pathpre = "/api/4/gather/"; - let pathsuf = &head.uri.path()[pathpre.len()..]; - let spawned: Vec<_> = node_config - .node_config - .cluster - .nodes - .iter() - .filter_map(|node| { - let uri = format!("http://{}:{}/api/4/{}", node.host, node.port, pathsuf); - let req = Request::builder() - .method(Method::GET) - .header(http::header::HOST, &node.host) - .header(http::header::ACCEPT, APP_JSON) - .uri(uri); - match req.body(body_empty()) { - Ok(req) => { - let task = tokio::spawn(async move { - select! { - _ = sleep(Duration::from_millis(1500)).fuse() => { - Err(Error::with_msg_no_trace(format!("timeout"))) - } - res = async move { - let mut client = if let Ok(x) = connect_client(req.uri()).await {x} - else { return Err(Error::with_msg("can not make request")); }; - let res = if let Ok(x) = client.send_request(req).await { x } - else { return Err(Error::with_msg("can not make request")); }; - Ok(res) - }.fuse() => { - Ok(process_answer(res?).await?) - } - } - }); - Some((node.clone(), task)) - } - Err(e) => { - error!("bad request: {e}"); - None - } - } - }) - .collect(); - #[derive(Serialize)] - struct Hres { - node: Node, - res: JsonValue, - } - #[derive(Serialize)] - struct Jres { - hosts: Vec, - } - let mut a = Vec::new(); - for (node, jh) in spawned { - let res = match jh.await { - Ok(k) => match k { - Ok(k) => k, - Err(e) => JsonValue::String(format!("ERROR({:?})", e)), - }, - Err(e) => JsonValue::String(format!("ERROR({:?})", e)), - }; - let v = Hres { - node: node.clone(), - res, - }; - a.push(v); - } - let a = a; - let res = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, APP_JSON) - .body(ToJsonBody::from(&Jres { hosts: a }).into_body())?; - Ok(res) -} - #[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Serialize, Deserialize)] pub struct Tag(pub String); @@ -146,7 +47,7 @@ pub struct SubRes { } pub async fn gather_get_json_generic( - _method: http::Method, + method: http::Method, urls: Vec, bodies: Vec>, tags: Vec, @@ -155,6 +56,7 @@ pub async fn gather_get_json_generic( // TODO use deadline instead. // TODO Wait a bit longer compared to remote to receive partial results. timeout: Duration, + ctx: &ReqCtx, ) -> Result where SM: Send + 'static, @@ -179,7 +81,7 @@ where .zip(bodies.into_iter()) .zip(tags.into_iter()) .filter_map(move |((url, body), tag)| { - info!("Try gather from {}", url); + info!("try gather from {}", url); let uri: Uri = if let Ok(x) = url.as_str().parse() { x } else { @@ -187,18 +89,23 @@ where return None; }; let req = if body.is_some() { + if method == Method::GET { + warn!("gather sends body via GET"); + } Request::builder() - .method(Method::POST) + .method(method.clone()) .header(header::HOST, uri.host().unwrap()) .header(http::header::CONTENT_TYPE, APP_JSON) .header(http::header::ACCEPT, APP_JSON) + .header(ctx.header_name(), ctx.header_value()) .uri(uri) } else { Request::builder() - .method(Method::GET) + .method(method.clone()) .header(header::HOST, uri.host().unwrap()) .header(http::header::CONTENT_TYPE, APP_JSON) .header(http::header::ACCEPT, APP_JSON) + .header(ctx.header_name(), ctx.header_value()) .uri(uri) }; let body = match body { @@ -267,6 +174,7 @@ mod test { #[test] fn try_search() { + let ctx = ReqCtx::for_test(); let fut = gather_get_json_generic( hyper::Method::GET, Vec::new(), @@ -285,6 +193,7 @@ mod test { }, |_all| Ok(String::from("DUMMY-SEARCH-TEST-RESULT-TODO")), Duration::from_millis(800), + &ctx, ); let _ = fut; } diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index f70f04d..8be6793 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -16,18 +16,14 @@ pub mod settings; use self::bodystream::ToPublicResponse; use crate::bodystream::response; use crate::err::Error; -use crate::gather::gather_get_json; use ::err::thiserror; use ::err::ThisError; -use bytes::Bytes; use futures_util::Future; use futures_util::FutureExt; -use futures_util::StreamExt; use http::Method; use http::StatusCode; use httpclient::body_bytes; use httpclient::body_empty; -use httpclient::body_stream; use httpclient::body_string; use httpclient::IntoBody; use httpclient::Requ; @@ -39,27 +35,21 @@ use net::SocketAddr; use netpod::log::*; use netpod::query::prebinned::PreBinnedQuery; use netpod::req_uri_to_url; +use netpod::status_board; +use netpod::status_board_init; use netpod::NodeConfigCached; use netpod::ReqCtx; use netpod::ServiceVersion; use netpod::APP_JSON; -use netpod::APP_JSON_LINES; use panic::AssertUnwindSafe; use panic::UnwindSafe; use pin::Pin; use serde::Deserialize; use serde::Serialize; -use std::collections::BTreeMap; use std::net; use std::panic; use std::pin; -use std::sync::atomic::AtomicPtr; -use std::sync::atomic::Ordering; -use std::sync::Once; -use std::sync::RwLock; -use std::sync::RwLockWriteGuard; use std::task; -use std::time::SystemTime; use task::Context; use task::Poll; use taskrun::tokio; @@ -172,7 +162,7 @@ async fn the_service_fn( node_config: NodeConfigCached, service_version: ServiceVersion, ) -> Result { - let ctx = ReqCtx::new(status_board()?.new_status_id()).with_node(&req, &node_config); + let ctx = ReqCtx::new_with_node(&req, &node_config); let reqid_span = span!(Level::INFO, "req", reqid = ctx.reqid()); let f = http_service(req, addr, ctx, node_config, service_version); let f = Cont { f: Box::pin(f) }; @@ -423,36 +413,18 @@ async fn http_service_inner( } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) } - } else if path.starts_with("/api/4/gather/") { - if req.method() == Method::GET { - Ok(gather_get_json(req, &node_config).await?) - } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) - } } else if path == "/api/4/clear_cache" { if req.method() == Method::GET { Ok(clear_cache_all(req, ctx, &node_config).await?) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) } - } else if path == "/api/4/update_db_with_channel_names" { - if req.method() == Method::GET { - Ok(update_db_with_channel_names(req, ctx, &node_config).await?) - } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) - } - } else if path == "/api/4/update_db_with_all_channel_configs" { - if req.method() == Method::GET { - Ok(update_db_with_all_channel_configs(req, ctx, &node_config).await?) - } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) - } - } else if path == "/api/4/update_search_cache" { - if req.method() == Method::GET { - Ok(update_search_cache(req, ctx, &node_config).await?) - } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) - } + } else if let Some(h) = api4::maintenance::UpdateDbWithChannelNamesHandler::handler(&req) { + Ok(h.handle(req, ctx, &node_config).await?) + } else if let Some(h) = api4::maintenance::UpdateDbWithAllChannelConfigsHandler::handler(&req) { + Ok(h.handle(req, ctx, &node_config).await?) + } else if let Some(h) = api4::maintenance::UpdateSearchCacheHandler::handler(&req) { + Ok(h.handle(req, ctx, &node_config).await?) } else if let Some(h) = download::DownloadHandler::handler(&req) { Ok(h.handle(req, &node_config).await?) } else if let Some(h) = settings::SettingsThreadsMaxHandler::handler(&req) { @@ -463,6 +435,8 @@ async fn http_service_inner( Ok(h.handle(req, &node_config).await?) } else if let Some(h) = pulsemap::IndexFullHttpFunction::handler(&req) { Ok(h.handle(req, &node_config).await?) + } else if let Some(h) = pulsemap::IndexChannelHttpFunction::handler(&req) { + Ok(h.handle(req, ctx, &node_config).await?) } else if let Some(h) = pulsemap::MarkClosedHttpFunction::handler(&req) { Ok(h.handle(req, &node_config).await?) } else if let Some(h) = pulsemap::MapPulseLocalHttpFunction::handler(&req) { @@ -535,9 +509,8 @@ impl StatusBoardAllHandler { } pub async fn handle(&self, _req: Requ, _node_config: &NodeConfigCached) -> Result { - use std::ops::Deref; let sb = status_board().unwrap(); - let buf = serde_json::to_string(sb.deref()).unwrap(); + let buf = serde_json::to_string(std::ops::Deref::deref(&sb)).unwrap(); let res = response(StatusCode::OK).body(body_string(buf))?; Ok(res) } @@ -597,308 +570,3 @@ async fn clear_cache_all( .body(body_string(serde_json::to_string(&res)?))?; Ok(ret) } - -async fn update_db_with_channel_names( - req: Requ, - _ctx: &ReqCtx, - node_config: &NodeConfigCached, -) -> Result { - info!("httpret::update_db_with_channel_names"); - let (head, _body) = req.into_parts(); - let _dry = match head.uri.query() { - Some(q) => q.contains("dry"), - None => false, - }; - let res = - dbconn::scan::update_db_with_channel_names(node_config.clone(), &node_config.node_config.cluster.database) - .await; - match res { - Ok(res) => { - let stream = res.map(|k| match serde_json::to_string(&k) { - Ok(mut item) => { - item.push('\n'); - Ok(Bytes::from(item)) - } - Err(e) => Err(e), - }); - let ret = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, APP_JSON_LINES) - .body(body_stream(stream))?; - Ok(ret) - } - Err(e) => { - let p = serde_json::to_string(&e)?; - let res = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, APP_JSON_LINES) - .body(body_string(p))?; - Ok(res) - } - } -} - -#[allow(unused)] -async fn update_db_with_channel_names_3( - req: Requ, - _ctx: &ReqCtx, - node_config: &NodeConfigCached, -) -> Result { - let (head, _body) = req.into_parts(); - let _dry = match head.uri.query() { - Some(q) => q.contains("dry"), - None => false, - }; - let res = dbconn::scan::update_db_with_channel_names_3(node_config); - let stream = res.map(|k| match serde_json::to_string(&k) { - Ok(mut item) => { - item.push('\n'); - Ok(Bytes::from(item)) - } - Err(e) => Err(e), - }); - let ret = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, APP_JSON_LINES) - .body(body_stream(stream))?; - Ok(ret) -} - -async fn update_db_with_all_channel_configs( - req: Requ, - _ctx: &ReqCtx, - node_config: &NodeConfigCached, -) -> Result { - let (head, _body) = req.into_parts(); - let _dry = match head.uri.query() { - Some(q) => q.contains("dry"), - None => false, - }; - let res = dbconn::scan::update_db_with_all_channel_configs(node_config.clone()).await?; - let stream = res.map(|k| match serde_json::to_string(&k) { - Ok(mut item) => { - item.push('\n'); - Ok(Bytes::from(item)) - } - Err(e) => Err(e), - }); - let ret = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, APP_JSON_LINES) - .body(body_stream(stream))?; - Ok(ret) -} - -async fn update_search_cache( - req: Requ, - _ctx: &ReqCtx, - node_config: &NodeConfigCached, -) -> Result { - let (head, _body) = req.into_parts(); - let _dry = match head.uri.query() { - Some(q) => q.contains("dry"), - None => false, - }; - let res = dbconn::scan::update_search_cache(node_config).await?; - let ret = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, APP_JSON) - .body(body_string(serde_json::to_string(&res)?))?; - Ok(ret) -} - -#[derive(Debug, Serialize)] -pub struct StatusBoardEntry { - #[allow(unused)] - #[serde(serialize_with = "instant_serde::ser")] - ts_created: SystemTime, - #[serde(serialize_with = "instant_serde::ser")] - ts_updated: SystemTime, - // #[serde(skip_serializing_if = "is_false")] - done: bool, - // #[serde(skip_serializing_if = "Vec::is_empty")] - errors: Vec<::err::Error>, - // TODO make this a better Stats container and remove pub access. - // #[serde(default, skip_serializing_if = "CmpZero::is_zero")] - error_count: usize, - // #[serde(default, skip_serializing_if = "CmpZero::is_zero")] - warn_count: usize, - // #[serde(default, skip_serializing_if = "CmpZero::is_zero")] - channel_not_found: usize, - // #[serde(default, skip_serializing_if = "CmpZero::is_zero")] - subreq_fail: usize, -} - -mod instant_serde { - use super::*; - use netpod::DATETIME_FMT_3MS; - use serde::Serializer; - - pub fn ser(x: &SystemTime, ser: S) -> Result { - use chrono::LocalResult; - let dur = x.duration_since(std::time::UNIX_EPOCH).unwrap(); - let res = chrono::TimeZone::timestamp_opt(&chrono::Utc, dur.as_secs() as i64, dur.subsec_nanos()); - match res { - LocalResult::None => Err(serde::ser::Error::custom(format!("Bad local instant conversion"))), - LocalResult::Single(dt) => { - let s = dt.format(DATETIME_FMT_3MS).to_string(); - ser.serialize_str(&s) - } - LocalResult::Ambiguous(dt, _dt2) => { - let s = dt.format(DATETIME_FMT_3MS).to_string(); - ser.serialize_str(&s) - } - } - } -} - -impl StatusBoardEntry { - pub fn new() -> Self { - Self { - ts_created: SystemTime::now(), - ts_updated: SystemTime::now(), - done: false, - errors: Vec::new(), - error_count: 0, - warn_count: 0, - channel_not_found: 0, - subreq_fail: 0, - } - } - - pub fn warn_inc(&mut self) { - self.warn_count += 1; - } - - pub fn channel_not_found_inc(&mut self) { - self.channel_not_found += 1; - } -} - -#[derive(Debug, Serialize)] -pub struct StatusBoardEntryUser { - // #[serde(default, skip_serializing_if = "CmpZero::is_zero")] - error_count: usize, - // #[serde(default, skip_serializing_if = "CmpZero::is_zero")] - warn_count: usize, - // #[serde(default, skip_serializing_if = "CmpZero::is_zero")] - channel_not_found: usize, - #[serde(skip_serializing_if = "Vec::is_empty")] - errors: Vec<::err::PublicError>, -} - -impl From<&StatusBoardEntry> for StatusBoardEntryUser { - fn from(e: &StatusBoardEntry) -> Self { - Self { - error_count: e.error_count, - warn_count: e.warn_count, - channel_not_found: e.channel_not_found, - errors: e.errors.iter().map(|e| e.to_public_error()).collect(), - } - } -} - -#[derive(Debug, Serialize)] -pub struct StatusBoard { - entries: BTreeMap, -} - -impl StatusBoard { - pub fn new() -> Self { - Self { - entries: BTreeMap::new(), - } - } - - pub fn new_status_id(&mut self) -> String { - self.clean_if_needed(); - let n: u32 = rand::random(); - let s = format!("{:08x}", n); - self.entries.insert(s.clone(), StatusBoardEntry::new()); - s - } - - pub fn clean_if_needed(&mut self) { - if self.entries.len() > 15000 { - let mut tss: Vec<_> = self.entries.values().map(|e| e.ts_updated).collect(); - tss.sort_unstable(); - let tss = tss; - let tsm = tss[tss.len() / 3]; - let a = std::mem::replace(&mut self.entries, BTreeMap::new()); - self.entries = a.into_iter().filter(|(_k, v)| v.ts_updated >= tsm).collect(); - } - } - - pub fn get_entry(&mut self, status_id: &str) -> Option<&mut StatusBoardEntry> { - self.entries.get_mut(status_id) - } - - pub fn mark_alive(&mut self, status_id: &str) { - match self.entries.get_mut(status_id) { - Some(e) => { - e.ts_updated = SystemTime::now(); - } - None => { - error!("can not find status id {}", status_id); - } - } - } - - pub fn mark_done(&mut self, status_id: &str) { - match self.entries.get_mut(status_id) { - Some(e) => { - e.ts_updated = SystemTime::now(); - e.done = true; - } - None => { - error!("can not find status id {}", status_id); - } - } - } - - pub fn add_error(&mut self, status_id: &str, err: ::err::Error) { - match self.entries.get_mut(status_id) { - Some(e) => { - e.ts_updated = SystemTime::now(); - if e.errors.len() < 100 { - e.errors.push(err); - e.error_count += 1; - } - } - None => { - error!("can not find status id {}", status_id); - } - } - } - - pub fn status_as_json(&self, status_id: &str) -> StatusBoardEntryUser { - match self.entries.get(status_id) { - Some(e) => e.into(), - None => { - error!("can not find status id {}", status_id); - let _e = ::err::Error::with_public_msg_no_trace(format!("request-id unknown {status_id}")); - StatusBoardEntryUser { - error_count: 1, - warn_count: 0, - channel_not_found: 0, - errors: vec![::err::Error::with_public_msg_no_trace("request-id not found").into()], - } - } - } - } -} - -static STATUS_BOARD: AtomicPtr> = AtomicPtr::new(std::ptr::null_mut()); - -pub fn status_board() -> Result, RetrievalError> { - let x = unsafe { &*STATUS_BOARD.load(Ordering::SeqCst) }.write(); - match x { - Ok(x) => Ok(x), - Err(e) => Err(RetrievalError::TextError(format!("{e}"))), - } -} - -pub fn status_board_init() { - static STATUS_BOARD_INIT: Once = Once::new(); - STATUS_BOARD_INIT.call_once(|| { - let b = StatusBoard::new(); - let a = RwLock::new(b); - let x = Box::new(a); - STATUS_BOARD.store(Box::into_raw(x), Ordering::SeqCst); - }); -} diff --git a/crates/httpret/src/proxy.rs b/crates/httpret/src/proxy.rs index 92015d0..93f01d7 100644 --- a/crates/httpret/src/proxy.rs +++ b/crates/httpret/src/proxy.rs @@ -111,7 +111,7 @@ async fn the_service_fn( proxy_config: ProxyConfig, service_version: ServiceVersion, ) -> Result { - let ctx = ReqCtx::new(status_board().unwrap().new_status_id()).with_proxy(&req, &proxy_config); + let ctx = ReqCtx::new_with_proxy(&req, &proxy_config); let reqid_span = span!(Level::INFO, "req", reqid = ctx.reqid()); let f = proxy_http_service(req, addr, ctx, proxy_config.clone(), service_version.clone()); let f = Cont { f: Box::pin(f) }; @@ -167,9 +167,9 @@ async fn proxy_http_service_inner( let uri = req.uri().clone(); let path = uri.path(); if path == "/api/1/channels" { - Ok(channel_search_list_v1(req, proxy_config).await?) + Ok(channel_search_list_v1(req, ctx, proxy_config).await?) } else if path == "/api/1/channels/config" { - Ok(channel_search_configs_v1(req, proxy_config).await?) + Ok(channel_search_configs_v1(req, ctx, proxy_config).await?) } else if path.starts_with("/api/1/gather/") { Ok(gather_json_2_v1(req, "/api/1/gather/", proxy_config).await?) } else if path == "/api/4/private/version" { @@ -190,7 +190,7 @@ async fn proxy_http_service_inner( } else if path == "/api/4/backends" { Ok(backends(req, proxy_config).await?) } else if let Some(h) = api4::ChannelSearchAggHandler::handler(&req) { - h.handle(req, &proxy_config).await + h.handle(req, ctx, &proxy_config).await } else if path == "/api/4/events" { Ok(proxy_single_backend_query::(req, ctx, proxy_config).await?) } else if path == "/api/4/status/connection/events" { @@ -323,7 +323,7 @@ pub async fn backends(_req: Requ, proxy_config: &ProxyConfig) -> Result Result { +pub async fn channel_search(req: Requ, ctx: &ReqCtx, proxy_config: &ProxyConfig) -> Result { let (head, _body) = req.into_parts(); match head.headers.get(http::header::ACCEPT) { Some(v) => { @@ -486,6 +486,7 @@ pub async fn channel_search(req: Requ, proxy_config: &ProxyConfig) -> Result Result( req: Requ, - _ctx: &ReqCtx, + ctx: &ReqCtx, proxy_config: &ProxyConfig, ) -> Result where @@ -612,8 +613,8 @@ where } }; let bodies = (0..urls.len()).into_iter().map(|_| None).collect(); - let ret = - gather_get_json_generic(http::Method::GET, urls, bodies, tags, nt, ft, query.timeout()).await?; + let ret = gather_get_json_generic(http::Method::GET, urls, bodies, tags, nt, ft, query.timeout(), ctx) + .await?; Ok(ret) } else { Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?) diff --git a/crates/httpret/src/proxy/api1.rs b/crates/httpret/src/proxy/api1.rs index 8cee64e..d2ce1f9 100644 --- a/crates/httpret/src/proxy/api1.rs +++ b/crates/httpret/src/proxy/api1.rs @@ -99,7 +99,7 @@ impl PythonDataApi1Query { Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?) } else { info!("backend returned OK"); - let riq_def = HeaderValue::from_static("(none)"); + let riq_def = HeaderValue::from_static("none"); let riq = head.headers.get(X_DAQBUF_REQID).unwrap_or(&riq_def); Ok(response(StatusCode::OK) .header(X_DAQBUF_REQID, riq) diff --git a/crates/httpret/src/proxy/api1/reqstatus.rs b/crates/httpret/src/proxy/api1/reqstatus.rs index 9551492..219d77d 100644 --- a/crates/httpret/src/proxy/api1/reqstatus.rs +++ b/crates/httpret/src/proxy/api1/reqstatus.rs @@ -51,6 +51,9 @@ impl RequestStatusHandler { let _body_data = read_body_bytes(body).await?; let status_id = &head.uri.path()[Self::path_prefix().len()..]; debug!("RequestStatusHandler status_id {:?}", status_id); + if status_id.len() < 8 { + return Err(Error::with_msg_no_trace(format!("bad status id {}", status_id))); + } let back = { let mut ret = None; @@ -63,7 +66,14 @@ impl RequestStatusHandler { ret }; if let Some(back) = back { - let url_str = format!("{}{}{}", back.url, Self::path_prefix(), status_id); + let (status_id, url) = if back.url.contains("sf-daqbuf-23.psi.ch") { + // TODO split_at may panic on bad input + let (status_id, node_tgt) = status_id.split_at(status_id.len() - 2); + (status_id, back.url.replace("-23.", &format!("-{}.", node_tgt))) + } else { + (status_id, back.url.clone()) + }; + let url_str = format!("{}{}{}", url, Self::path_prefix(), status_id); debug!("try to ask {url_str}"); let uri: Uri = url_str.parse()?; let req = Request::builder() diff --git a/crates/httpret/src/proxy/api4.rs b/crates/httpret/src/proxy/api4.rs index a8efc17..5bb1c7c 100644 --- a/crates/httpret/src/proxy/api4.rs +++ b/crates/httpret/src/proxy/api4.rs @@ -41,7 +41,7 @@ use url::Url; // The aggregators and leaf nodes behind should as well not depend on backend, // but simply answer all matching. -pub async fn channel_search(req: Requ, proxy_config: &ProxyConfig) -> Result { +pub async fn channel_search(req: Requ, ctx: &ReqCtx, proxy_config: &ProxyConfig) -> Result { let (head, _body) = req.into_parts(); let inpurl = req_uri_to_url(&head.uri)?; let query = ChannelSearchQuery::from_url(&inpurl)?; @@ -112,6 +112,7 @@ pub async fn channel_search(req: Requ, proxy_config: &ProxyConfig) -> Result Result { + pub async fn handle(&self, req: Requ, ctx: &ReqCtx, node_config: &ProxyConfig) -> Result { if req.method() == Method::GET { let accept_def = APP_JSON; let accept = req @@ -136,7 +137,7 @@ impl ChannelSearchAggHandler { .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { - match channel_search(req, node_config).await { + match channel_search(req, ctx, node_config).await { Ok(item) => Ok(response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?), Err(e) => { warn!("ChannelConfigHandler::handle: got error from channel_config: {e:?}"); @@ -186,7 +187,7 @@ impl StatusNodesRecursive { async fn status( &self, _req: Requ, - _ctx: &ReqCtx, + ctx: &ReqCtx, proxy_config: &ProxyConfig, service_version: &ServiceVersion, ) -> Result { @@ -268,6 +269,7 @@ impl StatusNodesRecursive { nt, ft, Duration::from_millis(1200), + ctx, ) .await?; Ok(ret) diff --git a/crates/httpret/src/pulsemap.rs b/crates/httpret/src/pulsemap.rs index db0de04..892121d 100644 --- a/crates/httpret/src/pulsemap.rs +++ b/crates/httpret/src/pulsemap.rs @@ -28,6 +28,7 @@ use netpod::FromUrl; use netpod::HasBackend; use netpod::HasTimeout; use netpod::NodeConfigCached; +use netpod::ReqCtx; use netpod::DATETIME_FMT_9MS; use scyllaconn::scylla; use serde::Deserialize; @@ -62,10 +63,6 @@ pub struct MapPulseHisto { _counts: Vec, } -const _MAP_INDEX_FAST_URL_PREFIX: &'static str = "/api/1/map/index/fast/"; -const MAP_PULSE_HISTO_URL_PREFIX: &'static str = "/api/1/map/pulse/histo/"; -const MAP_PULSE_URL_PREFIX: &'static str = "/api/1/map/pulse/"; -const MAP_PULSE_LOCAL_URL_PREFIX: &'static str = "/api/1/map/pulse/local/"; const MAP_PULSE_MARK_CLOSED_URL_PREFIX: &'static str = "/api/1/map/pulse/mark/closed/"; const API_4_MAP_PULSE_URL_PREFIX: &'static str = "/api/4/map/pulse/"; @@ -106,7 +103,7 @@ fn timer_channel_names() -> Vec { }) .flatten() .collect(); - all.push("SIN-CVME-TIFGUN-EVR0:RX-PULSEID".into()); + // all.push("SIN-CVME-TIFGUN-EVR0:RX-PULSEID".into()); all.push("SAR-CVME-TIFALL4:EvtSet".into()); all.push("SAR-CVME-TIFALL5:EvtSet".into()); all.push("SAR-CVME-TIFALL6:EvtSet".into()); @@ -426,7 +423,8 @@ impl IndexFullHttpFunction { make_tables(&pgc).await?; let chs = timer_channel_names(); for channel_name in chs { - match Self::index_channel(&channel_name, &pgc, do_print, &insert_01, node_config).await { + match IndexChannelHttpFunction::index_channel(&channel_name, &pgc, do_print, &insert_01, node_config).await + { Ok(m) => { msg.push_str("\n"); msg.push_str(&m); @@ -439,6 +437,67 @@ impl IndexFullHttpFunction { } Ok(msg) } +} + +pub struct IndexChannelHttpFunction {} + +impl IndexChannelHttpFunction { + pub fn handler(req: &Requ) -> Option { + if req.uri().path().eq("/api/1/maintenance/index/pulse/channel") { + Some(Self {}) + } else { + None + } + } + + pub async fn handle( + &self, + req: Requ, + ctx: &ReqCtx, + node_config: &NodeConfigCached, + ) -> Result { + if req.method() != Method::GET { + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); + } + let ret = match Self::index(req, false, node_config).await { + Ok(msg) => { + let dt = ctx.ts_ctor().elapsed(); + let res = response(StatusCode::OK).body(body_string(msg))?; + info!("IndexChannelHttpFunction response dt {:.0} ms", 1e3 * dt.as_secs_f32()); + res + } + Err(e) => { + error!("IndexChannelHttpFunction {e}"); + response(StatusCode::INTERNAL_SERVER_ERROR).body(body_string(format!("{:?}", e)))? + } + }; + Ok(ret) + } + + async fn index(req: Requ, do_print: bool, node_config: &NodeConfigCached) -> Result { + // TODO avoid double-insert on central storage. + let pgc = dbconn::create_connection(&node_config.node_config.cluster.database).await?; + // TODO remove update of static columns when older clients are removed. + let sql = "insert into map_pulse_files (channel, split, timebin, pulse_min, pulse_max, hostname, ks) values ($1, $2, $3, $4, $5, $6, $7) on conflict (channel, split, timebin) do update set pulse_min = $4, pulse_max = $5, upc1 = map_pulse_files.upc1 + 1, hostname = $6"; + let insert_01 = pgc.prepare(sql).await?; + make_tables(&pgc).await?; + let url = Url::parse(&format!("dummy:{}", req.uri())) + .map_err(|e| Error::with_msg_no_trace(format!("{} {}", e, req.uri())))?; + let pairs: std::collections::HashMap<_, _> = url.query_pairs().collect(); + let channel_name = if let Some(x) = pairs.get("name") { + x + } else { + return Err(Error::with_msg_no_trace(format!("no name given"))); + }; + match Self::index_channel(&channel_name, &pgc, do_print, &insert_01, node_config).await { + Ok(m) => Ok(m), + Err(e) => { + error!("error while indexing {} {:?}", channel_name, e); + let s = format!("error from index_channel {e}"); + Ok(s) + } + } + } async fn index_channel( channel_name: &str, @@ -970,8 +1029,12 @@ fn extract_path_number_after_prefix(req: &Requ, prefix: &str) -> Result &'static str { + "/api/1/map/pulse/local/" + } + pub fn handler(req: &Requ) -> Option { - if req.uri().path().starts_with(MAP_PULSE_LOCAL_URL_PREFIX) { + if req.uri().path().starts_with(Self::prefix()) { Some(Self {}) } else { None @@ -982,8 +1045,16 @@ impl MapPulseLocalHttpFunction { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } - let pulse = extract_path_number_after_prefix(&req, MAP_PULSE_LOCAL_URL_PREFIX)?; - let req_from = req.headers().get("x-req-from").map_or(None, |x| Some(format!("{x:?}"))); + let pulse = extract_path_number_after_prefix(&req, Self::prefix())?; + let req_from = req + .headers() + .get("x-req-from") + .map(|x| { + x.to_str() + .map(|x| x.to_string()) + .unwrap_or_else(|_| format!("bad string in x-req-from")) + }) + .unwrap_or_else(|| String::from("missing x-req-from")); let ts1 = Instant::now(); let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?; let sql = "select channel, hostname, timebin, split, ks from map_pulse_files where hostname = $1 and pulse_min <= $2 and (pulse_max >= $2 or closed = 0)"; @@ -1000,7 +1071,7 @@ impl MapPulseLocalHttpFunction { }) .collect(); let dt = Instant::now().duration_since(ts1); - if dt >= Duration::from_millis(500) { + if dt >= Duration::from_millis(300) { info!( "map pulse local req-from {:?} candidate list in {:.0}ms", req_from, @@ -1126,8 +1197,12 @@ pub struct TsHisto { pub struct MapPulseHistoHttpFunction {} impl MapPulseHistoHttpFunction { + pub fn prefix() -> &'static str { + "/api/1/map/pulse/histo/" + } + pub fn handler(req: &Requ) -> Option { - if req.uri().path().starts_with(MAP_PULSE_HISTO_URL_PREFIX) { + if req.uri().path().starts_with(Self::prefix()) { Some(Self {}) } else { None @@ -1138,7 +1213,7 @@ impl MapPulseHistoHttpFunction { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } - let pulse = extract_path_number_after_prefix(&req, MAP_PULSE_HISTO_URL_PREFIX)?; + let pulse = extract_path_number_after_prefix(&req, Self::prefix())?; let ret = Self::histo(pulse, node_config).await?; Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ret)?))?) } @@ -1148,7 +1223,10 @@ impl MapPulseHistoHttpFunction { for node in &node_config.node_config.cluster.nodes { let s = format!( "http://{}:{}{}{}", - node.host, node.port, MAP_PULSE_LOCAL_URL_PREFIX, pulse + node.host, + node.port, + MapPulseLocalHttpFunction::prefix(), + pulse ); let uri: Uri = s.parse()?; let req = Request::get(&uri) @@ -1216,8 +1294,12 @@ impl MapPulseHistoHttpFunction { pub struct MapPulseHttpFunction {} impl MapPulseHttpFunction { + pub fn prefix() -> &'static str { + "/api/1/map/pulse/" + } + pub fn handler(req: &Requ) -> Option { - if req.uri().path().starts_with(MAP_PULSE_URL_PREFIX) { + if req.uri().path().starts_with(Self::prefix()) { Some(Self {}) } else { None @@ -1230,7 +1312,7 @@ impl MapPulseHttpFunction { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } trace!("MapPulseHttpFunction handle uri: {:?}", req.uri()); - let pulse = extract_path_number_after_prefix(&req, MAP_PULSE_URL_PREFIX)?; + let pulse = extract_path_number_after_prefix(&req, Self::prefix())?; match CACHE.portal(pulse) { CachePortal::Fresh => { let histo = MapPulseHistoHttpFunction::histo(pulse, node_config).await?; diff --git a/crates/netpod/Cargo.toml b/crates/netpod/Cargo.toml index 90966c1..3a5b286 100644 --- a/crates/netpod/Cargo.toml +++ b/crates/netpod/Cargo.toml @@ -20,4 +20,5 @@ tracing = "0.1.37" url = "2.5.0" num-traits = "0.2.16" hex = "0.4.3" +rand = "0.8.5" err = { path = "../err" } diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 1eb0094..c471b7b 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -33,9 +33,16 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::pin::Pin; use std::str::FromStr; +use std::sync::atomic; +use std::sync::atomic::AtomicPtr; +use std::sync::Once; +use std::sync::RwLock; +use std::sync::RwLockWriteGuard; use std::task::Context; use std::task::Poll; use std::time::Duration; +use std::time::Instant; +use std::time::SystemTime; use timeunits::*; use url::Url; @@ -3115,62 +3122,94 @@ pub const PSI_DAQBUFFER_SEEN_URL: &'static str = "PSI-Daqbuffer-Seen-Url"; #[derive(Debug, Clone)] pub struct ReqCtx { + ts_ctor: Instant, reqid: String, + reqid_this: String, marks: Vec, mark: String, } impl ReqCtx { - pub fn new(reqid: S) -> Self - where - S: Into, - { - let ret = Self { - reqid: reqid.into(), + pub fn new_with_node(req: &Request, nc: &NodeConfigCached) -> Self { + let reqid_this = status_board().unwrap().new_status_id(); + let reqid = if let Some(reqid_parent) = req.headers().get("daqbuf-reqid") { + let parent = reqid_parent.to_str().unwrap_or("badid"); + format!("{}-{}", parent, reqid_this) + } else { + reqid_this.clone() + }; + let mark = format!("{}:{}", nc.node_config.name, nc.node.port); + let mut marks = Vec::new(); + for (n, v) in req.headers().iter() { + if n == PSI_DAQBUFFER_SERVICE_MARK { + marks.push(String::from_utf8_lossy(v.as_bytes()).to_string()); + } + } + Self { + ts_ctor: Instant::now(), + reqid, + reqid_this, + marks, + mark, + } + } + + pub fn new_with_proxy(req: &Request, proxy: &ProxyConfig) -> Self { + let reqid_this = status_board().unwrap().new_status_id(); + let reqid = if let Some(reqid_parent) = req.headers().get("daqbuf-reqid") { + let parent = reqid_parent.to_str().unwrap_or("badid"); + format!("{}-{}", parent, reqid_this) + } else { + reqid_this.clone() + }; + let mark = format!("{}:{}", proxy.name, proxy.port); + let mut marks = Vec::new(); + for (n, v) in req.headers().iter() { + if n == PSI_DAQBUFFER_SERVICE_MARK { + marks.push(String::from_utf8_lossy(v.as_bytes()).to_string()); + } + } + Self { + ts_ctor: Instant::now(), + reqid, + reqid_this, + marks, + mark, + } + } + + pub fn new_from_single_reqid(reqid: String) -> Self { + Self { + ts_ctor: Instant::now(), + reqid_this: reqid.clone(), + reqid, marks: Vec::new(), mark: String::new(), - }; - ret + } } pub fn for_test() -> Self { Self { - reqid: "TESTID".into(), + ts_ctor: Instant::now(), + reqid: "PARENTID-TESTID".into(), + reqid_this: "TESTID".into(), marks: Vec::new(), mark: String::new(), } } - pub fn with_node(mut self, req: &Request, nc: &NodeConfigCached) -> Self { - if let Some(reqid) = req.headers().get("daqbuf-reqid") { - self.reqid = format!("{}-{}", reqid.to_str().unwrap_or("BADID"), self.reqid()); - } - self.mark = format!("{}:{}", nc.node_config.name, nc.node.port); - for (n, v) in req.headers().iter() { - if n == PSI_DAQBUFFER_SERVICE_MARK { - self.marks.push(String::from_utf8_lossy(v.as_bytes()).to_string()); - } - } - self - } - - pub fn with_proxy(mut self, req: &Request, proxy: &ProxyConfig) -> Self { - if let Some(reqid) = req.headers().get("daqbuf-reqid") { - self.reqid = format!("{}-{}", reqid.to_str().unwrap_or("BADID"), self.reqid()); - } - self.mark = format!("{}:{}", proxy.name, proxy.port); - for (n, v) in req.headers().iter() { - if n == PSI_DAQBUFFER_SERVICE_MARK { - self.marks.push(String::from_utf8_lossy(v.as_bytes()).to_string()); - } - } - self + pub fn ts_ctor(&self) -> Instant { + self.ts_ctor.clone() } pub fn reqid(&self) -> &str { &self.reqid } + pub fn reqid_this(&self) -> &str { + &self.reqid_this + } + pub fn mark(&self) -> &str { &self.mark } @@ -3190,6 +3229,221 @@ impl ReqCtx { pub type ReqCtxArc = std::sync::Arc; +static STATUS_BOARD: AtomicPtr> = AtomicPtr::new(std::ptr::null_mut()); + +#[derive(Debug, Serialize)] +pub struct StatusBoardEntry { + #[allow(unused)] + #[serde(serialize_with = "instant_serde::ser")] + ts_created: SystemTime, + #[serde(serialize_with = "instant_serde::ser")] + ts_updated: SystemTime, + // #[serde(skip_serializing_if = "is_false")] + done: bool, + // #[serde(skip_serializing_if = "Vec::is_empty")] + errors: Vec<::err::Error>, + // TODO make this a better Stats container and remove pub access. + // #[serde(default, skip_serializing_if = "CmpZero::is_zero")] + error_count: usize, + // #[serde(default, skip_serializing_if = "CmpZero::is_zero")] + warn_count: usize, + // #[serde(default, skip_serializing_if = "CmpZero::is_zero")] + channel_not_found: usize, + // #[serde(default, skip_serializing_if = "CmpZero::is_zero")] + subreq_fail: usize, +} + +mod instant_serde { + use super::DATETIME_FMT_3MS; + use serde::Serializer; + use std::time::SystemTime; + + pub fn ser(x: &SystemTime, ser: S) -> Result { + use chrono::LocalResult; + let dur = x.duration_since(std::time::UNIX_EPOCH).unwrap(); + let res = chrono::TimeZone::timestamp_opt(&chrono::Utc, dur.as_secs() as i64, dur.subsec_nanos()); + match res { + LocalResult::None => Err(serde::ser::Error::custom(format!("Bad local instant conversion"))), + LocalResult::Single(dt) => { + let s = dt.format(DATETIME_FMT_3MS).to_string(); + ser.serialize_str(&s) + } + LocalResult::Ambiguous(dt, _dt2) => { + let s = dt.format(DATETIME_FMT_3MS).to_string(); + ser.serialize_str(&s) + } + } + } +} + +impl StatusBoardEntry { + pub fn new() -> Self { + Self { + ts_created: SystemTime::now(), + ts_updated: SystemTime::now(), + done: false, + errors: Vec::new(), + error_count: 0, + warn_count: 0, + channel_not_found: 0, + subreq_fail: 0, + } + } + + pub fn warn_inc(&mut self) { + self.warn_count += 1; + } + + pub fn channel_not_found_inc(&mut self) { + self.channel_not_found += 1; + } +} + +#[derive(Debug, Serialize)] +pub struct StatusBoardEntryUser { + // #[serde(default, skip_serializing_if = "CmpZero::is_zero")] + error_count: usize, + // #[serde(default, skip_serializing_if = "CmpZero::is_zero")] + warn_count: usize, + // #[serde(default, skip_serializing_if = "CmpZero::is_zero")] + channel_not_found: usize, + #[serde(skip_serializing_if = "Vec::is_empty")] + errors: Vec<::err::PublicError>, +} + +impl From<&StatusBoardEntry> for StatusBoardEntryUser { + fn from(e: &StatusBoardEntry) -> Self { + Self { + error_count: e.error_count, + warn_count: e.warn_count, + channel_not_found: e.channel_not_found, + errors: e.errors.iter().map(|e| e.to_public_error()).collect(), + } + } +} + +#[derive(Debug, Serialize)] +pub struct StatusBoard { + entries: BTreeMap, +} + +impl StatusBoard { + pub fn new() -> Self { + Self { + entries: BTreeMap::new(), + } + } + + pub fn new_status_id(&mut self) -> String { + self.clean_if_needed(); + let n: u32 = rand::random(); + let s = format!("{:08x}", n); + self.entries.insert(s.clone(), StatusBoardEntry::new()); + s + } + + pub fn clean_if_needed(&mut self) { + if self.entries.len() > 15000 { + let mut tss: Vec<_> = self.entries.values().map(|e| e.ts_updated).collect(); + tss.sort_unstable(); + let tss = tss; + let tsm = tss[tss.len() / 3]; + let a = std::mem::replace(&mut self.entries, BTreeMap::new()); + self.entries = a.into_iter().filter(|(_k, v)| v.ts_updated >= tsm).collect(); + } + } + + pub fn get_entry(&mut self, status_id: &str) -> Option<&mut StatusBoardEntry> { + self.entries.get_mut(status_id) + } + + pub fn mark_alive(&mut self, status_id: &str) { + match self.entries.get_mut(status_id) { + Some(e) => { + e.ts_updated = SystemTime::now(); + } + None => { + error!("can not find status id {}", status_id); + } + } + } + + pub fn mark_done(&mut self, status_id: &str) { + match self.entries.get_mut(status_id) { + Some(e) => { + e.ts_updated = SystemTime::now(); + e.done = true; + } + None => { + error!("can not find status id {}", status_id); + } + } + } + + pub fn add_error(&mut self, status_id: &str, err: ::err::Error) { + match self.entries.get_mut(status_id) { + Some(e) => { + e.ts_updated = SystemTime::now(); + if e.errors.len() < 100 { + e.errors.push(err); + e.error_count += 1; + } + } + None => { + error!("can not find status id {}", status_id); + } + } + } + + pub fn status_as_json(&self, status_id: &str) -> StatusBoardEntryUser { + match self.entries.get(status_id) { + Some(e) => e.into(), + None => { + error!("can not find status id {}", status_id); + let _e = ::err::Error::with_public_msg_no_trace(format!("request-id unknown {status_id}")); + StatusBoardEntryUser { + error_count: 1, + warn_count: 0, + channel_not_found: 0, + errors: vec![::err::Error::with_public_msg_no_trace("request-id not found").into()], + } + } + } + } +} + +#[derive(Debug)] +pub enum StatusBoardError { + CantAcquire, +} + +impl fmt::Display for StatusBoardError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{self:?}") + } +} + +pub fn status_board() -> Result, StatusBoardError> { + let x = unsafe { &*STATUS_BOARD.load(atomic::Ordering::SeqCst) }.write(); + match x { + Ok(x) => Ok(x), + Err(e) => { + error!("{e}"); + Err(StatusBoardError::CantAcquire) + } + } +} + +pub fn status_board_init() { + static STATUS_BOARD_INIT: Once = Once::new(); + STATUS_BOARD_INIT.call_once(|| { + let b = StatusBoard::new(); + let a = RwLock::new(b); + let x = Box::new(a); + STATUS_BOARD.store(Box::into_raw(x), atomic::Ordering::SeqCst); + }); +} + pub fn req_uri_to_url(uri: &Uri) -> Result { if uri.scheme().is_none() { format!("dummy:{uri}") diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index e5ce42a..837906c 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -166,7 +166,7 @@ pub async fn create_response_bytes_stream( evq.ch_conf().shape(), ); debug!("wasm1 {:?}", evq.wasm1()); - let reqctx = netpod::ReqCtx::new(evq.reqid()).into(); + let reqctx = netpod::ReqCtx::new_from_single_reqid(evq.reqid().into()).into(); if evq.create_errors_contains("nodenet_parse_query") { let e = Error::with_msg_no_trace("produced error on request nodenet_parse_query"); return Err(e);