diff --git a/crates/daqbufp2/src/test/api1.rs b/crates/daqbufp2/src/test/api1.rs index 2ea6b91..3acffbe 100644 --- a/crates/daqbufp2/src/test/api1.rs +++ b/crates/daqbufp2/src/test/api1.rs @@ -8,6 +8,7 @@ use netpod::log::*; use netpod::query::api1::Api1Query; use netpod::query::api1::Api1Range; use netpod::query::api1::ChannelTuple; +use netpod::ReqCtx; use netpod::APP_OCTET; use parse::api1_parse; use parse::api1_parse::Api1Frame; @@ -54,6 +55,7 @@ fn events_f64_plain() -> Result<(), Error> { return Ok(()); } let fut = async { + let ctx = ReqCtx::for_test(); let rh = require_test_hosts_running()?; let cluster = &rh.cluster; let node = &cluster.nodes[0]; @@ -64,7 +66,7 @@ fn events_f64_plain() -> Result<(), Error> { let ch = ChannelTuple::new(TEST_BACKEND.into(), "test-gen-i32-dim0-v01".into()); let qu = Api1Query::new(range, vec![ch]); let body = serde_json::to_string(&qu)?; - let buf = http_post(url, accept, body.into()).await?; + let buf = http_post(url, accept, body.into(), &ctx).await?; eprintln!("body received: {}", buf.len()); match api1_parse::api1_frames::>(&buf) { Ok((_, frames)) => { diff --git a/crates/daqbufp2/src/test/api1/data_api_python.rs b/crates/daqbufp2/src/test/api1/data_api_python.rs index 2312af9..0c21054 100644 --- a/crates/daqbufp2/src/test/api1/data_api_python.rs +++ b/crates/daqbufp2/src/test/api1/data_api_python.rs @@ -6,6 +6,7 @@ use netpod::range::evrange::NanoRange; use netpod::timeunits::MS; use netpod::Cluster; use netpod::HostPort; +use netpod::ReqCtx; use netpod::SfDbChannel; use netpod::APP_JSON; use netpod::DATETIME_FMT_3MS; @@ -23,6 +24,7 @@ async fn fetch_data_api_python_blob( end_date: &str, cluster: &Cluster, ) -> Result, Error> { + let ctx = ReqCtx::for_test(); let t1 = Utc::now(); let node0 = &cluster.nodes[0]; let beg_date = beg_date.parse()?; @@ -43,7 +45,7 @@ async fn fetch_data_api_python_blob( let hp = HostPort::from_node(node0); let url = Url::parse(&format!("http://{}:{}/api/1/query", hp.host, hp.port))?; info!("http get {}", url); - let buf = httpclient::http_post(url, APP_JSON, query_str).await?; + let buf = httpclient::http_post(url, APP_JSON, query_str, &ctx).await?; let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; // TODO add timeout diff --git a/crates/daqbufp2/src/test/api4/binnedjson.rs b/crates/daqbufp2/src/test/api4/binnedjson.rs index dcfa983..23a72e4 100644 --- a/crates/daqbufp2/src/test/api4/binnedjson.rs +++ b/crates/daqbufp2/src/test/api4/binnedjson.rs @@ -10,6 +10,7 @@ use netpod::range::evrange::NanoRange; use netpod::AppendToUrl; use netpod::Cluster; use netpod::HostPort; +use netpod::ReqCtx; use netpod::SfDbChannel; use netpod::APP_JSON; use query::api4::binned::BinnedQuery; @@ -338,6 +339,7 @@ async fn get_binned_json( bin_count: u32, cluster: &Cluster, ) -> Result { + let ctx = ReqCtx::for_test(); let t1 = Utc::now(); let node0 = &cluster.nodes[0]; let beg_date = beg_date.parse()?; @@ -349,7 +351,7 @@ async fn get_binned_json( let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?; query.append_to_url(&mut url); let url = url; - let res = httpclient::http_get(url, APP_JSON).await?; + let res = httpclient::http_get(url, APP_JSON, &ctx).await?; let s = String::from_utf8_lossy(&res.body); let res: JsonValue = serde_json::from_str(&s)?; let pretty = serde_json::to_string_pretty(&res)?; diff --git a/crates/daqbufp2/src/test/api4/common.rs b/crates/daqbufp2/src/test/api4/common.rs index c6c10b5..707e90c 100644 --- a/crates/daqbufp2/src/test/api4/common.rs +++ b/crates/daqbufp2/src/test/api4/common.rs @@ -4,6 +4,7 @@ use netpod::log::*; use netpod::AppendToUrl; use netpod::Cluster; use netpod::HostPort; +use netpod::ReqCtx; use netpod::APP_JSON; use query::api4::binned::BinnedQuery; use query::api4::events::PlainEventsQuery; @@ -12,13 +13,14 @@ use url::Url; // TODO improve by a more information-rich return type. pub async fn fetch_events_json(query: PlainEventsQuery, cluster: &Cluster) -> Result { + let ctx = ReqCtx::for_test(); let t1 = Utc::now(); let node0 = &cluster.nodes[0]; let hp = HostPort::from_node(node0); let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; query.append_to_url(&mut url); let url = url; - let res = httpclient::http_get(url, APP_JSON).await?; + let res = httpclient::http_get(url, APP_JSON, &ctx).await?; let s = String::from_utf8_lossy(&res.body); let res: JsonValue = serde_json::from_str(&s)?; let pretty = serde_json::to_string_pretty(&res)?; @@ -32,13 +34,14 @@ pub async fn fetch_events_json(query: PlainEventsQuery, cluster: &Cluster) -> Re // TODO improve by a more information-rich return type. pub async fn fetch_binned_json(query: BinnedQuery, cluster: &Cluster) -> Result { + let ctx = ReqCtx::for_test(); let t1 = Utc::now(); let node0 = &cluster.nodes[0]; let hp = HostPort::from_node(node0); let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?; query.append_to_url(&mut url); let url = url; - let res = httpclient::http_get(url, APP_JSON).await?; + let res = httpclient::http_get(url, APP_JSON, &ctx).await?; let s = String::from_utf8_lossy(&res.body); let res: JsonValue = serde_json::from_str(&s)?; let pretty = serde_json::to_string_pretty(&res)?; diff --git a/crates/daqbufp2/src/test/api4/eventsjson.rs b/crates/daqbufp2/src/test/api4/eventsjson.rs index 328f415..4e2e663 100644 --- a/crates/daqbufp2/src/test/api4/eventsjson.rs +++ b/crates/daqbufp2/src/test/api4/eventsjson.rs @@ -2,7 +2,6 @@ use crate::nodes::require_test_hosts_running; use crate::test::api4::common::fetch_events_json; use chrono::Utc; use err::Error; -use http::StatusCode; use items_0::WithLen; use items_2::eventsdim0::EventsDim0CollectorOutput; use netpod::log::*; @@ -10,6 +9,7 @@ use netpod::range::evrange::NanoRange; use netpod::AppendToUrl; use netpod::Cluster; use netpod::HostPort; +use netpod::ReqCtx; use netpod::SfDbChannel; use netpod::APP_JSON; use query::api4::events::PlainEventsQuery; @@ -74,6 +74,7 @@ async fn events_plain_json( end_date: &str, cluster: &Cluster, ) -> Result { + let ctx = ReqCtx::for_test(); let t1 = Utc::now(); let node0 = &cluster.nodes[0]; let beg_date = beg_date.parse()?; @@ -84,7 +85,7 @@ async fn events_plain_json( let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; query.append_to_url(&mut url); let url = url; - let res = httpclient::http_get(url, APP_JSON).await?; + let res = httpclient::http_get(url, APP_JSON, &ctx).await?; let s = String::from_utf8_lossy(&res.body); let res: JsonValue = serde_json::from_str(&s)?; let pretty = serde_json::to_string_pretty(&res)?; diff --git a/crates/daqbufp2/src/test/timeweightedjson.rs b/crates/daqbufp2/src/test/timeweightedjson.rs index 039b4ca..ebd643d 100644 --- a/crates/daqbufp2/src/test/timeweightedjson.rs +++ b/crates/daqbufp2/src/test/timeweightedjson.rs @@ -6,6 +6,7 @@ use netpod::query::CacheUsage; use netpod::range::evrange::NanoRange; use netpod::AppendToUrl; use netpod::Cluster; +use netpod::ReqCtx; use netpod::SfDbChannel; use netpod::APP_JSON; use query::api4::binned::BinnedQuery; @@ -44,7 +45,8 @@ async fn get_json_common( let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?; query.append_to_url(&mut url); let url = url; - let res = httpclient::http_get(url, APP_JSON).await?; + let ctx = ReqCtx::for_test(); + let res = httpclient::http_get(url, APP_JSON, &ctx).await?; let s = String::from_utf8_lossy(&res.body); let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; diff --git a/crates/disk/src/aggtest.rs b/crates/disk/src/aggtest.rs index 161b818..b3882f9 100644 --- a/crates/disk/src/aggtest.rs +++ b/crates/disk/src/aggtest.rs @@ -88,7 +88,7 @@ async fn agg_x_dim_0_inner() { true, // TODO 32, - netpod::ReqCtx::new("req-000"), + netpod::ReqCtx::new("req-000").into(), ); let _ = fut1; // TODO add the binning and expectation and await the result. @@ -150,7 +150,7 @@ async fn agg_x_dim_1_inner() { true, // TODO 32, - netpod::ReqCtx::new("req-000"), + netpod::ReqCtx::new("req-000").into(), ); let _ = fut1; // TODO add the binning and expectation and await the result. diff --git a/crates/disk/src/merge/mergedblobsfromremotes.rs b/crates/disk/src/merge/mergedblobsfromremotes.rs index 3dbb7c8..57d3495 100644 --- a/crates/disk/src/merge/mergedblobsfromremotes.rs +++ b/crates/disk/src/merge/mergedblobsfromremotes.rs @@ -7,6 +7,7 @@ use items_2::eventfull::EventFull; use items_2::merger::Merger; use netpod::log::*; use netpod::Cluster; +use netpod::ReqCtx; use query::api4::events::EventsSubQuery; use std::future::Future; use std::pin::Pin; @@ -26,11 +27,11 @@ pub struct MergedBlobsFromRemotes { } impl MergedBlobsFromRemotes { - pub fn new(subq: EventsSubQuery, cluster: Cluster) -> Self { + pub fn new(subq: EventsSubQuery, ctx: &ReqCtx, cluster: Cluster) -> Self { debug!("MergedBlobsFromRemotes::new subq {:?}", subq); let mut tcp_establish_futs = Vec::new(); for node in &cluster.nodes { - let f = x_processed_event_blobs_stream_from_node(subq.clone(), node.clone()); + let f = x_processed_event_blobs_stream_from_node(subq.clone(), node.clone(), ctx.clone()); let f: T002 = Box::pin(f); tcp_establish_futs.push(f); } diff --git a/crates/httpclient/src/httpclient.rs b/crates/httpclient/src/httpclient.rs index 2fa13cc..1eb8791 100644 --- a/crates/httpclient/src/httpclient.rs +++ b/crates/httpclient/src/httpclient.rs @@ -17,13 +17,14 @@ use http_body_util::combinators::BoxBody; use http_body_util::BodyExt; use hyper::body::Body; use hyper::body::Incoming; -use hyper::client::conn::http2::SendRequest; +use hyper::client::conn::http1::SendRequest; use hyper::Method; use netpod::log::*; use netpod::AppendToUrl; use netpod::ChannelConfigQuery; use netpod::ChannelConfigResponse; use netpod::NodeConfigCached; +use netpod::ReqCtx; use netpod::APP_JSON; use serde::Serialize; use std::fmt; @@ -232,12 +233,13 @@ pub struct HttpResponse { pub body: Bytes, } -pub async fn http_get(url: Url, accept: &str) -> Result { +pub async fn http_get(url: Url, accept: &str, ctx: &ReqCtx) -> Result { let req = Request::builder() .method(http::Method::GET) .uri(url.to_string()) .header(header::HOST, url.host_str().ok_or_else(|| Error::BadUrl)?) .header(header::ACCEPT, accept) + .header("daqbuf-reqid", ctx.reqid()) .body(body_empty())?; let mut send_req = connect_client(req.uri()).await?; let res = send_req.send_request(req).await?; @@ -261,13 +263,14 @@ pub async fn http_get(url: Url, accept: &str) -> Result { Ok(ret) } -pub async fn http_post(url: Url, accept: &str, body: String) -> Result { +pub async fn http_post(url: Url, accept: &str, body: String, ctx: &ReqCtx) -> Result { let req = Request::builder() .method(http::Method::POST) .uri(url.to_string()) .header(header::HOST, url.host_str().ok_or_else(|| Error::BadUrl)?) .header(header::CONTENT_TYPE, APP_JSON) .header(header::ACCEPT, accept) + .header("daqbuf-reqid", ctx.reqid()) .body(body_string(body))?; let mut send_req = connect_client(req.uri()).await?; let res = send_req.send_request(req).await?; @@ -301,8 +304,12 @@ pub async fn connect_client(uri: &http::Uri) -> Result, let host = uri.host().ok_or_else(|| Error::BadUrl)?; let port = uri.port_u16().ok_or_else(|| Error::BadUrl)?; let stream = TcpStream::connect(format!("{host}:{port}")).await?; - let executor = hyper_util::rt::TokioExecutor::new(); - let (send_req, conn) = hyper::client::conn::http2::Builder::new(executor) + #[cfg(DISABLED)] + { + let executor = hyper_util::rt::TokioExecutor::new(); + hyper::client::conn::http2::Builder::new(executor); + } + let (send_req, conn) = hyper::client::conn::http1::Builder::new() .handshake(hyper_util::rt::TokioIo::new(stream)) .await?; // TODO would need to take greater care of this task to catch connection-level errors. diff --git a/crates/httpret/Cargo.toml b/crates/httpret/Cargo.toml index 60f8f6f..bf664a7 100644 --- a/crates/httpret/Cargo.toml +++ b/crates/httpret/Cargo.toml @@ -24,6 +24,7 @@ itertools = "0.11.0" chrono = "0.4.23" md-5 = "0.10.6" regex = "1.10.2" +rand = "0.8.5" err = { path = "../err" } netpod = { path = "../netpod" } query = { path = "../query" } diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs index 8a968f1..52d4bcc 100644 --- a/crates/httpret/src/api1.rs +++ b/crates/httpret/src/api1.rs @@ -62,6 +62,7 @@ use std::any; use std::collections::VecDeque; use std::future::Future; use std::pin::Pin; +use std::sync::Arc; use std::task::Context; use std::task::Poll; use std::time::Duration; @@ -527,7 +528,7 @@ pub struct DataApiPython3DataStream { event_count: usize, events_max: u64, header_out: bool, - reqctx: ReqCtxArc, + ctx: ReqCtxArc, ping_last: Instant, data_done: bool, completed: bool, @@ -561,7 +562,7 @@ impl DataApiPython3DataStream { event_count: 0, events_max, header_out: false, - reqctx, + ctx: reqctx, ping_last: Instant::now(), data_done: false, completed: false, @@ -737,7 +738,7 @@ impl DataApiPython3DataStream { if tsnow.duration_since(self.ping_last) >= Duration::from_millis(500) { self.ping_last = tsnow; let mut sb = crate::status_board().unwrap(); - sb.mark_alive(self.reqctx.reqid()); + sb.mark_alive(self.ctx.reqid()); } ret } @@ -749,7 +750,7 @@ impl DataApiPython3DataStream { self.range.clone().into(), TransformQuery::for_event_blobs(), ); - let subq = EventsSubQuery::from_parts(select, self.settings.clone(), self.reqctx.reqid().into()); + let subq = EventsSubQuery::from_parts(select, self.settings.clone(), self.ctx.reqid().into()); debug!("query for event blobs retrieval subq {subq:?}"); // TODO important TODO debug!("TODO fix magic inmem_bufcap"); @@ -757,10 +758,10 @@ impl DataApiPython3DataStream { // TODO is this a good to place decide this? let stream = if self.node_config.node_config.cluster.is_central_storage { debug!("set up central storage stream"); - disk::raw::conn::make_event_blobs_pipe(&subq, &fetch_info, self.reqctx.clone(), &self.node_config)? + disk::raw::conn::make_event_blobs_pipe(&subq, &fetch_info, self.ctx.clone(), &self.node_config)? } else { debug!("set up merged remote stream {}", fetch_info.name()); - let s = MergedBlobsFromRemotes::new(subq, self.node_config.node_config.cluster.clone()); + let s = MergedBlobsFromRemotes::new(subq, &self.ctx, self.node_config.node_config.cluster.clone()); Box::pin(s) as Pin> + Send>> }; self.chan_stream = Some(Box::pin(stream)); @@ -779,7 +780,7 @@ impl Stream for DataApiPython3DataStream { panic!("poll on completed") } else if self.data_done { self.completed = true; - let reqid = self.reqctx.reqid(); + let reqid = self.ctx.reqid(); info!( "{} response body sent {} bytes ({})", reqid, self.count_bytes, self.count_emits @@ -801,7 +802,7 @@ impl Stream for DataApiPython3DataStream { self.current_fetch_info = None; self.data_done = true; let mut sb = crate::status_board().unwrap(); - sb.add_error(self.reqctx.reqid(), e.0.clone()); + sb.add_error(self.ctx.reqid(), e.0.clone()); Ready(Some(Err(e))) } }, @@ -836,8 +837,8 @@ impl Stream for DataApiPython3DataStream { let n = Instant::now(); self.ping_last = n; let mut sb = crate::status_board().unwrap(); - sb.mark_alive(self.reqctx.reqid()); - sb.mark_done(self.reqctx.reqid()); + sb.mark_alive(self.ctx.reqid()); + sb.mark_done(self.ctx.reqid()); } continue; } @@ -877,7 +878,7 @@ impl Api1EventsBinaryHandler { pub async fn handle( &self, req: Requ, - _ctx: &ReqCtx, + ctx: &ReqCtx, node_config: &NodeConfigCached, ) -> Result { if req.method() != Method::POST { @@ -902,8 +903,6 @@ impl Api1EventsBinaryHandler { return Err(Error::with_msg_no_trace("can not parse query")); } }; - let reqid = super::status_board()?.new_status_id(); - let reqctx = netpod::ReqCtx::new(reqid); let span = if qu.log_level() == "trace" { debug!("enable trace for handler"); tracing::span!(tracing::Level::TRACE, "log_span_trace") @@ -920,7 +919,9 @@ impl Api1EventsBinaryHandler { .map_err(|e| e.add_public_msg(format!("Can not parse query url")))? }; let disk_tune = DiskIoTune::from_url(&url)?; - let reqidspan = tracing::info_span!("api1query", reqid = reqctx.reqid()); + let reqidspan = tracing::info_span!("api1query", reqid = ctx.reqid()); + // TODO do not clone here + let reqctx = Arc::new(ctx.clone()); self.handle_for_query( qu, accept, @@ -928,6 +929,7 @@ impl Api1EventsBinaryHandler { &reqctx, span.clone(), reqidspan.clone(), + ctx, node_config, ) .instrument(span) @@ -943,6 +945,7 @@ impl Api1EventsBinaryHandler { reqctx: &ReqCtxArc, span: tracing::Span, reqidspan: tracing::Span, + ctx: &ReqCtx, ncc: &NodeConfigCached, ) -> Result { let self_name = any::type_name::(); @@ -973,7 +976,8 @@ impl Api1EventsBinaryHandler { debug!("try to find config quorum for {ch:?}"); let ch = SfDbChannel::from_name(backend, ch.name()); let ch_conf = - nodenet::configquorum::find_config_basics_quorum(ch.clone(), range.clone().into(), ncc).await?; + nodenet::configquorum::find_config_basics_quorum(ch.clone(), range.clone().into(), ctx, ncc) + .await?; match ch_conf { Some(x) => { debug!("found quorum {ch:?} {x:?}"); diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index b37845d..d4fbc3f 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -14,11 +14,17 @@ use netpod::log::*; use netpod::timeunits::SEC; use netpod::FromUrl; use netpod::NodeConfigCached; +use netpod::ReqCtx; use query::api4::binned::BinnedQuery; use tracing::Instrument; use url::Url; -async fn binned_json(url: Url, req: Requ, node_config: &NodeConfigCached) -> Result { +async fn binned_json( + url: Url, + req: Requ, + ctx: &ReqCtx, + node_config: &NodeConfigCached, +) -> Result { debug!("{:?}", req); let reqid = crate::status_board() .map_err(|e| Error::with_msg_no_trace(e.to_string()))? @@ -30,7 +36,7 @@ async fn binned_json(url: Url, req: Requ, node_config: &NodeConfigCached) -> Res e.add_public_msg(msg) })?; // TODO handle None case better and return 404 - let ch_conf = ch_conf_from_binned(&query, node_config) + let ch_conf = ch_conf_from_binned(&query, ctx, node_config) .await? .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?; let span1 = span!( @@ -44,14 +50,14 @@ async fn binned_json(url: Url, req: Requ, node_config: &NodeConfigCached) -> Res span1.in_scope(|| { debug!("begin"); }); - let item = streams::timebinnedjson::timebinned_json(query, ch_conf, reqid, node_config.node_config.cluster.clone()) + let item = streams::timebinnedjson::timebinned_json(query, ch_conf, ctx, node_config.node_config.cluster.clone()) .instrument(span1) .await?; let ret = response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?; Ok(ret) } -async fn binned(req: Requ, node_config: &NodeConfigCached) -> Result { +async fn binned(req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result { let url = { let s1 = format!("dummy:{}", req.uri()); Url::parse(&s1) @@ -66,7 +72,7 @@ async fn binned(req: Requ, node_config: &NodeConfigCached) -> Result Result { + pub async fn handle( + &self, + req: Requ, + ctx: &ReqCtx, + node_config: &NodeConfigCached, + ) -> Result { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } - match binned(req, node_config).await { + match binned(req, ctx, node_config).await { Ok(ret) => Ok(ret), Err(e) => { warn!("BinnedHandler handle sees: {e}"); diff --git a/crates/httpret/src/api4/events.rs b/crates/httpret/src/api4/events.rs index a7d180d..14aba3b 100644 --- a/crates/httpret/src/api4/events.rs +++ b/crates/httpret/src/api4/events.rs @@ -16,6 +16,7 @@ use httpclient::ToJsonBody; use netpod::log::*; use netpod::FromUrl; use netpod::NodeConfigCached; +use netpod::ReqCtx; use netpod::ACCEPT_ALL; use netpod::APP_JSON; use netpod::APP_OCTET; @@ -33,11 +34,16 @@ impl EventsHandler { } } - pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result { + pub async fn handle( + &self, + req: Requ, + ctx: &ReqCtx, + node_config: &NodeConfigCached, + ) -> Result { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } - match plain_events(req, node_config).await { + match plain_events(req, ctx, node_config).await { Ok(ret) => Ok(ret), Err(e) => { error!("EventsHandler sees: {e}"); @@ -47,7 +53,7 @@ impl EventsHandler { } } -async fn plain_events(req: Requ, node_config: &NodeConfigCached) -> Result { +async fn plain_events(req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result { let accept_def = APP_JSON; let accept = req .headers() @@ -60,19 +66,24 @@ async fn plain_events(req: Requ, node_config: &NodeConfigCached) -> Result Result { +async fn plain_events_binary( + url: Url, + req: Requ, + ctx: &ReqCtx, + node_config: &NodeConfigCached, +) -> Result { debug!("{:?}", req); let query = PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?; - let ch_conf = chconf_from_events_quorum(&query, node_config).await?; + let ch_conf = chconf_from_events_quorum(&query, ctx, node_config).await?; info!("plain_events_binary chconf_from_events_quorum: {ch_conf:?}"); let s = stream::iter([Ok::<_, Error>(String::from("TODO_PREBINNED_BINARY_STREAM"))]); let s = s.map_err(Error::from); @@ -80,20 +91,24 @@ async fn plain_events_binary(url: Url, req: Requ, node_config: &NodeConfigCached Ok(ret) } -async fn plain_events_json(url: Url, req: Requ, node_config: &NodeConfigCached) -> Result { - let reqid = crate::status_board()?.new_status_id(); +async fn plain_events_json( + url: Url, + req: Requ, + ctx: &ReqCtx, + node_config: &NodeConfigCached, +) -> Result { info!("plain_events_json req: {:?}", req); let (_head, _body) = req.into_parts(); let query = PlainEventsQuery::from_url(&url)?; info!("plain_events_json query {query:?}"); // TODO handle None case better and return 404 - let ch_conf = chconf_from_events_quorum(&query, node_config) + let ch_conf = chconf_from_events_quorum(&query, ctx, node_config) .await .map_err(Error::from)? .ok_or_else(|| Error::with_msg_no_trace("channel not found"))?; info!("plain_events_json chconf_from_events_quorum: {ch_conf:?}"); let item = - streams::plaineventsjson::plain_events_json(&query, ch_conf, reqid, &node_config.node_config.cluster).await; + streams::plaineventsjson::plain_events_json(&query, ch_conf, ctx, &node_config.node_config.cluster).await; let item = match item { Ok(item) => item, Err(e) => { diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs index dec04ea..41848ef 100644 --- a/crates/httpret/src/channelconfig.rs +++ b/crates/httpret/src/channelconfig.rs @@ -20,6 +20,7 @@ use netpod::ChannelConfigResponse; use netpod::ChannelTypeConfigGen; use netpod::FromUrl; use netpod::NodeConfigCached; +use netpod::ReqCtx; use netpod::ScalarType; use netpod::SfDbChannel; use netpod::Shape; @@ -36,25 +37,28 @@ use url::Url; pub async fn chconf_from_events_quorum( q: &PlainEventsQuery, + ctx: &ReqCtx, ncc: &NodeConfigCached, ) -> Result, Error> { - let ret = find_config_basics_quorum(q.channel().clone(), q.range().clone(), ncc).await?; + let ret = find_config_basics_quorum(q.channel().clone(), q.range().clone(), ctx, ncc).await?; Ok(ret) } pub async fn chconf_from_prebinned( q: &PreBinnedQuery, + ctx: &ReqCtx, ncc: &NodeConfigCached, ) -> Result, Error> { - let ret = find_config_basics_quorum(q.channel().clone(), q.patch().patch_range(), ncc).await?; + let ret = find_config_basics_quorum(q.channel().clone(), q.patch().patch_range(), ctx, ncc).await?; Ok(ret) } pub async fn ch_conf_from_binned( q: &BinnedQuery, + ctx: &ReqCtx, ncc: &NodeConfigCached, ) -> Result, Error> { - let ret = find_config_basics_quorum(q.channel().clone(), q.range().clone(), ncc).await?; + let ret = find_config_basics_quorum(q.channel().clone(), q.range().clone(), ctx, ncc).await?; Ok(ret) } @@ -172,7 +176,12 @@ impl ChannelConfigQuorumHandler { } } - pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result { + pub async fn handle( + &self, + req: Requ, + ctx: &ReqCtx, + node_config: &NodeConfigCached, + ) -> Result { if req.method() == Method::GET { let accept_def = APP_JSON; let accept = req @@ -180,7 +189,7 @@ impl ChannelConfigQuorumHandler { .get(http::header::ACCEPT) .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) { - match self.channel_config_quorum(req, &node_config).await { + match self.channel_config_quorum(req, ctx, &node_config).await { Ok(k) => Ok(k), Err(e) => { warn!("from channel_config_quorum: {e}"); @@ -195,12 +204,17 @@ impl ChannelConfigQuorumHandler { } } - async fn channel_config_quorum(&self, req: Requ, ncc: &NodeConfigCached) -> Result { + async fn channel_config_quorum( + &self, + req: Requ, + ctx: &ReqCtx, + ncc: &NodeConfigCached, + ) -> Result { info!("channel_config_quorum"); let url = Url::parse(&format!("dummy:{}", req.uri()))?; let q = ChannelConfigQuery::from_url(&url)?; info!("channel_config_quorum for q {q:?}"); - let ch_confs = nodenet::configquorum::find_config_basics_quorum(q.channel, q.range.into(), ncc).await?; + let ch_confs = nodenet::configquorum::find_config_basics_quorum(q.channel, q.range.into(), ctx, ncc).await?; let ret = response(StatusCode::OK) .header(http::header::CONTENT_TYPE, APP_JSON) .body(ToJsonBody::from(&ch_confs).into_body())?; diff --git a/crates/httpret/src/gather.rs b/crates/httpret/src/gather.rs index 4deac8d..d2aeb64 100644 --- a/crates/httpret/src/gather.rs +++ b/crates/httpret/src/gather.rs @@ -5,8 +5,10 @@ use crate::response; use crate::Requ; use futures_util::select; use futures_util::FutureExt; +use http::header; use http::Method; use http::StatusCode; +use http::Uri; use httpclient::connect_client; use httpclient::read_body_bytes; use httpclient::IntoBody; @@ -22,6 +24,7 @@ use netpod::APP_JSON; use serde::Deserialize; use serde::Serialize; use serde_json::Value as JsonValue; +use std::fmt; use std::future::Future; use std::pin::Pin; use std::time::Duration; @@ -69,9 +72,11 @@ pub async fn gather_get_json(req: Requ, node_config: &NodeConfigCached) -> Resul .iter() .filter_map(|node| { let uri = format!("http://{}:{}/api/4/{}", node.host, node.port, pathsuf); - let req = Request::builder().method(Method::GET).uri(uri); - let req = req.header(http::header::HOST, &node.host); - let req = req.header(http::header::ACCEPT, APP_JSON); + let req = Request::builder() + .method(Method::GET) + .header(http::header::HOST, &node.host) + .header(http::header::ACCEPT, APP_JSON) + .uri(uri); match req.body(body_empty()) { Ok(req) => { let task = tokio::spawn(async move { @@ -133,6 +138,7 @@ pub async fn gather_get_json(req: Requ, node_config: &NodeConfigCached) -> Resul #[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Serialize, Deserialize)] pub struct Tag(pub String); +#[derive(Debug)] pub struct SubRes { pub tag: String, pub status: StatusCode, @@ -158,6 +164,7 @@ where + Copy + 'static, FT: Fn(Vec<(Tag, Result, Error>)>) -> Result, + SubRes: fmt::Debug, { // TODO remove magic constant let extra_timeout = Duration::from_millis(3000); @@ -173,17 +180,26 @@ where .zip(tags.into_iter()) .filter_map(move |((url, body), tag)| { info!("Try gather from {}", url); - let url_str = url.as_str(); - let req = if body.is_some() { - Request::builder().method(Method::POST).uri(url_str) + let uri: Uri = if let Ok(x) = url.as_str().parse() { + x } else { - Request::builder().method(Method::GET).uri(url_str) + warn!("can not parse {url}"); + return None; }; - let req = req.header(http::header::ACCEPT, APP_JSON); let req = if body.is_some() { - req.header(http::header::CONTENT_TYPE, APP_JSON) + Request::builder() + .method(Method::POST) + .header(header::HOST, uri.host().unwrap()) + .header(http::header::CONTENT_TYPE, APP_JSON) + .header(http::header::ACCEPT, APP_JSON) + .uri(uri) } else { - req + Request::builder() + .method(Method::GET) + .header(header::HOST, uri.host().unwrap()) + .header(http::header::CONTENT_TYPE, APP_JSON) + .header(http::header::ACCEPT, APP_JSON) + .uri(uri) }; let body = match body { None => body_empty(), @@ -209,9 +225,9 @@ where }; Ok(res) }.fuse() => { - info!("received result in time"); + debug!("received result in time {res:?}"); let ret = nt(tag2, res?).await?; - info!("transformed result in time"); + debug!("transformed result in time {ret:?}"); Ok(ret) } } diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 5dfdd69..d6e9047 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -34,13 +34,12 @@ use httpclient::Requ; use httpclient::StreamResponse; use httpclient::ToJsonBody; use hyper::service::service_fn; -use hyper::Request; use hyper_util::rt::TokioIo; use net::SocketAddr; use netpod::log::*; use netpod::query::prebinned::PreBinnedQuery; use netpod::NodeConfigCached; -use netpod::ProxyConfig; +use netpod::ReqCtx; use netpod::ServiceVersion; use netpod::APP_JSON; use netpod::APP_JSON_LINES; @@ -64,9 +63,7 @@ use task::Context; use task::Poll; use taskrun::tokio; use taskrun::tokio::net::TcpListener; - -pub const PSI_DAQBUFFER_SERVICE_MARK: &'static str = "PSI-Daqbuffer-Service-Mark"; -pub const PSI_DAQBUFFER_SEEN_URL: &'static str = "PSI-Daqbuffer-Seen-Url"; +use tracing::Instrument; #[derive(Debug, ThisError, Serialize, Deserialize)] pub enum RetrievalError { @@ -128,13 +125,7 @@ pub fn accepts_octets(hm: &http::HeaderMap) -> bool { } pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion) -> Result<(), RetrievalError> { - static STATUS_BOARD_INIT: Once = Once::new(); - STATUS_BOARD_INIT.call_once(|| { - let b = StatusBoard::new(); - let a = RwLock::new(b); - let x = Box::new(a); - STATUS_BOARD.store(Box::into_raw(x), Ordering::SeqCst); - }); + status_board_init(); #[cfg(DISABLED)] if let Some(bind) = node_config.node.prometheus_api_bind { tokio::spawn(prometheus::host(bind)); @@ -179,6 +170,20 @@ async fn the_service_fn( addr: SocketAddr, node_config: NodeConfigCached, service_version: ServiceVersion, +) -> Result { + let ctx = ReqCtx::new(status_board()?.new_status_id()).with_node(&req, &node_config); + let reqid_span = span!(Level::INFO, "req", reqid = ctx.reqid()); + let f = http_service(req, addr, ctx, node_config, service_version); + let f = Cont { f: Box::pin(f) }; + f.instrument(reqid_span).await +} + +async fn http_service( + req: Requ, + addr: SocketAddr, + ctx: ReqCtx, + node_config: NodeConfigCached, + service_version: ServiceVersion, ) -> Result { info!( "http-request {:?} - {:?} - {:?} - {:?}", @@ -187,17 +192,7 @@ async fn the_service_fn( req.uri(), req.headers() ); - let f = http_service(req, node_config, service_version).await; - // Cont { f: Box::pin(f) } - f -} - -async fn http_service( - req: Requ, - node_config: NodeConfigCached, - service_version: ServiceVersion, -) -> Result { - match http_service_try(req, &node_config, &service_version).await { + match http_service_try(req, ctx, &node_config, &service_version).await { Ok(k) => Ok(k), Err(e) => { error!("daqbuffer node http_service sees error: {}", e); @@ -237,41 +232,6 @@ where impl UnwindSafe for Cont {} -pub struct ReqCtx { - pub marks: Vec, - pub mark: String, -} - -impl ReqCtx { - fn with_node(req: &Request, nc: &NodeConfigCached) -> Self { - let mut marks = Vec::new(); - for (n, v) in req.headers().iter() { - if n == PSI_DAQBUFFER_SERVICE_MARK { - marks.push(String::from_utf8_lossy(v.as_bytes()).to_string()); - } - } - Self { - marks, - mark: format!("{}:{}", nc.node_config.name, nc.node.port), - } - } -} - -impl ReqCtx { - fn with_proxy(req: &Request, proxy: &ProxyConfig) -> Self { - let mut marks = Vec::new(); - for (n, v) in req.headers().iter() { - if n == PSI_DAQBUFFER_SERVICE_MARK { - marks.push(String::from_utf8_lossy(v.as_bytes()).to_string()); - } - } - Self { - marks, - mark: format!("{}:{}", proxy.name, proxy.port), - } - } -} - // TODO remove because I want error bodies to be json. pub fn response_err(status: StatusCode, msg: T) -> Result where @@ -334,6 +294,7 @@ macro_rules! static_http_api1 { async fn http_service_try( req: Requ, + ctx: ReqCtx, node_config: &NodeConfigCached, service_version: &ServiceVersion, ) -> Result { @@ -341,23 +302,22 @@ async fn http_service_try( let mut urlmarks = Vec::new(); urlmarks.push(format!("{}:{}", req.method(), req.uri())); for (k, v) in req.headers() { - if k == PSI_DAQBUFFER_SEEN_URL { + if k == netpod::PSI_DAQBUFFER_SEEN_URL { let s = String::from_utf8_lossy(v.as_bytes()); urlmarks.push(s.into()); } } - let ctx = ReqCtx::with_node(&req, &node_config); let mut res = http_service_inner(req, &ctx, node_config, service_version).await?; let hm = res.headers_mut(); hm.append("Access-Control-Allow-Origin", "*".parse().unwrap()); hm.append("Access-Control-Allow-Headers", "*".parse().unwrap()); - for m in &ctx.marks { - hm.append(PSI_DAQBUFFER_SERVICE_MARK, m.parse().unwrap()); + for m in ctx.marks() { + hm.append(netpod::PSI_DAQBUFFER_SERVICE_MARK, m.parse().unwrap()); } - hm.append(PSI_DAQBUFFER_SERVICE_MARK, ctx.mark.parse().unwrap()); + hm.append(netpod::PSI_DAQBUFFER_SERVICE_MARK, ctx.mark().parse().unwrap()); for s in urlmarks { let v = HeaderValue::from_str(&s).unwrap_or_else(|_| HeaderValue::from_static("invalid")); - hm.append(PSI_DAQBUFFER_SEEN_URL, v); + hm.append(netpod::PSI_DAQBUFFER_SEEN_URL, v); } Ok(res) } @@ -427,9 +387,9 @@ async fn http_service_inner( } else if let Some(h) = api4::search::ChannelSearchHandler::handler(&req) { Ok(h.handle(req, &node_config).await?) } else if let Some(h) = api4::binned::BinnedHandler::handler(&req) { - Ok(h.handle(req, &node_config).await?) + Ok(h.handle(req, ctx, &node_config).await?) } else if let Some(h) = channelconfig::ChannelConfigQuorumHandler::handler(&req) { - Ok(h.handle(req, &node_config).await?) + Ok(h.handle(req, ctx, &node_config).await?) } else if let Some(h) = channelconfig::ChannelConfigsHandler::handler(&req) { Ok(h.handle(req, &node_config).await?) } else if let Some(h) = channelconfig::ChannelConfigHandler::handler(&req) { @@ -445,7 +405,7 @@ async fn http_service_inner( } else if let Some(h) = channelconfig::AmbigiousChannelNames::handler(&req) { Ok(h.handle(req, &node_config).await?) } else if let Some(h) = api4::events::EventsHandler::handler(&req) { - Ok(h.handle(req, &node_config).await?) + Ok(h.handle(req, ctx, &node_config).await?) } else if let Some(h) = channel_status::ConnectionStatusEvents::handler(&req) { Ok(h.handle(req, ctx, &node_config).await?) } else if let Some(h) = channel_status::ChannelStatusEventsHandler::handler(&req) { @@ -845,20 +805,14 @@ impl StatusBoard { } pub fn new_status_id(&mut self) -> String { - use std::fs::File; - use std::io::Read; - self.clean(); - let mut f = File::open("/dev/urandom").unwrap(); - let mut buf = [0; 4]; - f.read_exact(&mut buf).unwrap(); - let n = u32::from_le_bytes(buf); + self.clean_if_needed(); + let n: u32 = rand::random(); let s = format!("{:08x}", n); - debug!("new_status_id {s}"); self.entries.insert(s.clone(), StatusBoardEntry::new()); s } - pub fn clean(&mut self) { + pub fn clean_if_needed(&mut self) { if self.entries.len() > 15000 { let mut tss: Vec<_> = self.entries.values().map(|e| e.ts_updated).collect(); tss.sort_unstable(); @@ -916,7 +870,7 @@ impl StatusBoard { Some(e) => e.into(), None => { error!("can not find status id {}", status_id); - let _e = ::err::Error::with_public_msg_no_trace(format!("Request status ID unknown {status_id}")); + let _e = ::err::Error::with_public_msg_no_trace(format!("request-id unknown {status_id}")); StatusBoardEntryUser { error_count: 1, warn_count: 0, @@ -937,3 +891,13 @@ pub fn status_board() -> Result, Retrieva Err(e) => Err(RetrievalError::TextError(format!("{e}"))), } } + +pub fn status_board_init() { + static STATUS_BOARD_INIT: Once = Once::new(); + STATUS_BOARD_INIT.call_once(|| { + let b = StatusBoard::new(); + let a = RwLock::new(b); + let x = Box::new(a); + STATUS_BOARD.store(Box::into_raw(x), Ordering::SeqCst); + }); +} diff --git a/crates/httpret/src/proxy.rs b/crates/httpret/src/proxy.rs index 0446e24..20537fe 100644 --- a/crates/httpret/src/proxy.rs +++ b/crates/httpret/src/proxy.rs @@ -12,9 +12,9 @@ use crate::gather::SubRes; use crate::pulsemap::MapPulseQuery; use crate::response; use crate::response_err; +use crate::status_board; +use crate::status_board_init; use crate::Cont; -use crate::ReqCtx; -use crate::PSI_DAQBUFFER_SERVICE_MARK; use futures_util::pin_mut; use futures_util::Stream; use http::Method; @@ -42,9 +42,11 @@ use netpod::FromUrl; use netpod::HasBackend; use netpod::HasTimeout; use netpod::ProxyConfig; +use netpod::ReqCtx; use netpod::ServiceVersion; use netpod::ACCEPT_ALL; use netpod::APP_JSON; +use netpod::PSI_DAQBUFFER_SERVICE_MARK; use query::api4::binned::BinnedQuery; use query::api4::events::PlainEventsQuery; use serde::Deserialize; @@ -61,19 +63,23 @@ use tokio::fs::File; use tokio::io::AsyncRead; use tokio::io::ReadBuf; use tokio::net::TcpListener; +use tracing::Instrument; use url::Url; const DISTRI_PRE: &str = "/distri/"; pub async fn proxy(proxy_config: ProxyConfig, service_version: ServiceVersion) -> Result<(), Error> { + status_board_init(); use std::str::FromStr; let bind_addr = SocketAddr::from_str(&format!("{}:{}", proxy_config.listen, proxy_config.port))?; let listener = TcpListener::bind(bind_addr).await?; loop { - let (stream, addr) = if let Ok(x) = listener.accept().await { - x - } else { - break; + let (stream, addr) = match listener.accept().await { + Ok(x) => x, + Err(e) => { + error!("{e}"); + break; + } }; debug!("new connection from {addr}"); let proxy_config = proxy_config.clone(); @@ -83,19 +89,7 @@ pub async fn proxy(proxy_config: ProxyConfig, service_version: ServiceVersion) - let res = hyper::server::conn::http1::Builder::new() .serve_connection( io, - service_fn({ - move |req| { - info!( - "http-request {:?} - {:?} - {:?} - {:?}", - bind_addr, - req.method(), - req.uri(), - req.headers() - ); - let f = proxy_http_service(req, proxy_config.clone(), service_version.clone()); - Cont { f: Box::pin(f) } - } - }), + service_fn(move |req| the_service_fn(req, addr, proxy_config.clone(), service_version.clone())), ) .await; match res { @@ -106,15 +100,38 @@ pub async fn proxy(proxy_config: ProxyConfig, service_version: ServiceVersion) - } }); } + info!("proxy done"); Ok(()) } +async fn the_service_fn( + req: Requ, + addr: SocketAddr, + proxy_config: ProxyConfig, + service_version: ServiceVersion, +) -> Result { + let ctx = ReqCtx::new(status_board().unwrap().new_status_id()).with_proxy(&req, &proxy_config); + let reqid_span = span!(Level::INFO, "req", reqid = ctx.reqid()); + let f = proxy_http_service(req, addr, ctx, proxy_config.clone(), service_version.clone()); + let f = Cont { f: Box::pin(f) }; + f.instrument(reqid_span).await +} + async fn proxy_http_service( req: Requ, + addr: SocketAddr, + ctx: ReqCtx, proxy_config: ProxyConfig, service_version: ServiceVersion, ) -> Result { - match proxy_http_service_try(req, &proxy_config, &service_version).await { + info!( + "http-request {:?} - {:?} - {:?} - {:?}", + addr, + req.method(), + req.uri(), + req.headers() + ); + match proxy_http_service_try(req, ctx, &proxy_config, &service_version).await { Ok(k) => Ok(k), Err(e) => { error!("data_api_proxy sees error: {:?}", e); @@ -125,18 +142,18 @@ async fn proxy_http_service( async fn proxy_http_service_try( req: Requ, + ctx: ReqCtx, proxy_config: &ProxyConfig, service_version: &ServiceVersion, ) -> Result { - let ctx = ReqCtx::with_proxy(&req, proxy_config); let mut res = proxy_http_service_inner(req, &ctx, proxy_config, &service_version).await?; let hm = res.headers_mut(); hm.insert("Access-Control-Allow-Origin", "*".parse().unwrap()); hm.insert("Access-Control-Allow-Headers", "*".parse().unwrap()); - for m in &ctx.marks { + for m in ctx.marks() { hm.append(PSI_DAQBUFFER_SERVICE_MARK, m.parse().unwrap()); } - hm.append(PSI_DAQBUFFER_SERVICE_MARK, ctx.mark.parse().unwrap()); + hm.append(PSI_DAQBUFFER_SERVICE_MARK, ctx.mark().parse().unwrap()); Ok(res) } diff --git a/crates/httpret/src/proxy/api1.rs b/crates/httpret/src/proxy/api1.rs index 73c9bdd..8cee64e 100644 --- a/crates/httpret/src/proxy/api1.rs +++ b/crates/httpret/src/proxy/api1.rs @@ -38,7 +38,7 @@ impl PythonDataApi1Query { } } - pub async fn handle(&self, req: Requ, _ctx: &ReqCtx, proxy_config: &ProxyConfig) -> Result { + pub async fn handle(&self, req: Requ, ctx: &ReqCtx, proxy_config: &ProxyConfig) -> Result { if req.method() != Method::POST { return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?); } @@ -50,18 +50,16 @@ impl PythonDataApi1Query { .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))? .to_owned(); let body_data = read_body_bytes(body).await?; - if body_data.len() < 512 && body_data.first() == Some(&"{".as_bytes()[0]) { - info!("request body_data string: {}", String::from_utf8_lossy(&body_data)); - } let qu = match serde_json::from_slice::(&body_data) { Ok(qu) => qu, Err(e) => { - error!("got body_data: {:?}", String::from_utf8_lossy(&body_data[..])); + let buf = &body_data[..body_data.len().min(200)]; + error!("got body_data: {:?}", String::from_utf8_lossy(buf)); error!("can not parse: {e}"); return Err(Error::with_msg_no_trace("can not parse query")); } }; - info!("Proxy sees request: {qu:?}"); + info!("{qu:?}"); let back = { let mut ret = None; for b in &proxy_config.backends { @@ -73,12 +71,24 @@ impl PythonDataApi1Query { ret }; if let Some(back) = back { + // TODO remove special code, make it part of configuration + let back = if back.url.contains("sf-daqbuf-23.psi.ch") { + let id = 21 + rand::random::() % 13; + let url = back.url.replace("-23.", &format!("-{id}.")); + netpod::ProxyBackend { + name: back.name.clone(), + url, + } + } else { + back.clone() + }; let url_str = format!("{}/api/1/query", back.url); info!("try to ask {url_str}"); let uri: Uri = url_str.parse()?; let req = Request::builder() .method(Method::POST) .header(header::HOST, uri.host().unwrap()) + .header(ctx.header_name(), ctx.header_value()) .uri(&uri) .body(body_bytes(body_data))?; let mut client = connect_client(&uri).await?; diff --git a/crates/httpret/src/pulsemap.rs b/crates/httpret/src/pulsemap.rs index 3223d98..a11a211 100644 --- a/crates/httpret/src/pulsemap.rs +++ b/crates/httpret/src/pulsemap.rs @@ -835,11 +835,11 @@ impl FromUrl for MapPulseQuery { fn from_url(url: &url::Url) -> Result { let mut pit = url .path_segments() - .ok_or_else(|| Error::with_msg_no_trace("no path in url"))? + .ok_or_else(|| Error::with_msg_no_trace(format!("no path in url {url}")))? .rev(); let pulsestr = pit .next() - .ok_or_else(|| Error::with_msg_no_trace("no pulse in url path"))?; + .ok_or_else(|| Error::with_msg_no_trace(format!("no pulse in url path {pit:?}")))?; let backend = pit.next().unwrap_or("sf-databuffer").into(); // TODO legacy: use a default backend if not specified. let backend = if backend == "pulse" { diff --git a/crates/netpod/Cargo.toml b/crates/netpod/Cargo.toml index b2afb5d..939b009 100644 --- a/crates/netpod/Cargo.toml +++ b/crates/netpod/Cargo.toml @@ -10,6 +10,7 @@ path = "src/netpod.rs" [dependencies] serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +http = "1.0.0" humantime-serde = "1.1.1" async-channel = "1.8.0" bytes = "1.4.0" diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 43b5c98..813e704 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -17,6 +17,7 @@ use chrono::Utc; use err::Error; use futures_util::Stream; use futures_util::StreamExt; +use http::Request; use range::evrange::NanoRange; use range::evrange::PulseRange; use range::evrange::SeriesRange; @@ -2528,7 +2529,7 @@ mod test { } } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct ChannelSearchSingleResult { pub backend: String, pub name: String, @@ -2544,7 +2545,7 @@ pub struct ChannelSearchSingleResult { pub is_api_0: Option, } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct ChannelSearchResult { pub channels: Vec, } @@ -3108,22 +3109,82 @@ mod test_parse { } } +pub const PSI_DAQBUFFER_SERVICE_MARK: &'static str = "PSI-Daqbuffer-Service-Mark"; +pub const PSI_DAQBUFFER_SEEN_URL: &'static str = "PSI-Daqbuffer-Seen-Url"; + +#[derive(Debug, Clone)] pub struct ReqCtx { reqid: String, + marks: Vec, + mark: String, } impl ReqCtx { - pub fn new(reqid: S) -> std::sync::Arc + pub fn new(reqid: S) -> Self where S: Into, { - let ret = Self { reqid: reqid.into() }; - std::sync::Arc::new(ret) + let ret = Self { + reqid: reqid.into(), + marks: Vec::new(), + mark: String::new(), + }; + ret + } + + pub fn for_test() -> Self { + Self { + reqid: "TESTID".into(), + marks: Vec::new(), + mark: String::new(), + } + } + + pub fn with_node(mut self, req: &Request, nc: &NodeConfigCached) -> Self { + if let Some(reqid) = req.headers().get("daqbuf-reqid") { + self.reqid = format!("{}-{}", reqid.to_str().unwrap_or("BADID"), self.reqid()); + } + self.mark = format!("{}:{}", nc.node_config.name, nc.node.port); + for (n, v) in req.headers().iter() { + if n == PSI_DAQBUFFER_SERVICE_MARK { + self.marks.push(String::from_utf8_lossy(v.as_bytes()).to_string()); + } + } + self + } + + pub fn with_proxy(mut self, req: &Request, proxy: &ProxyConfig) -> Self { + if let Some(reqid) = req.headers().get("daqbuf-reqid") { + self.reqid = format!("{}-{}", reqid.to_str().unwrap_or("BADID"), self.reqid()); + } + self.mark = format!("{}:{}", proxy.name, proxy.port); + for (n, v) in req.headers().iter() { + if n == PSI_DAQBUFFER_SERVICE_MARK { + self.marks.push(String::from_utf8_lossy(v.as_bytes()).to_string()); + } + } + self } pub fn reqid(&self) -> &str { &self.reqid } + + pub fn mark(&self) -> &str { + &self.mark + } + + pub fn marks(&self) -> &[String] { + &self.marks + } + + pub fn header_name(&self) -> &'static str { + "daqbuf-reqid" + } + + pub fn header_value(&self) -> &str { + &self.reqid + } } pub type ReqCtxArc = std::sync::Arc; diff --git a/crates/nodenet/src/channelconfig.rs b/crates/nodenet/src/channelconfig.rs index eafbbb9..cfd3feb 100644 --- a/crates/nodenet/src/channelconfig.rs +++ b/crates/nodenet/src/channelconfig.rs @@ -11,6 +11,7 @@ use netpod::ChannelConfigResponse; use netpod::ChannelTypeConfigGen; use netpod::DtNano; use netpod::NodeConfigCached; +use netpod::ReqCtx; use netpod::ScalarType; use netpod::SfChFetchInfo; use netpod::SfDbChannel; @@ -175,11 +176,12 @@ pub async fn channel_configs(channel: SfDbChannel, ncc: &NodeConfigCached) -> Re pub async fn http_get_channel_config( qu: ChannelConfigQuery, baseurl: Url, + ctx: &ReqCtx, ) -> Result, Error> { let url = baseurl; let mut url = url.join("/api/4/channel/config").unwrap(); qu.append_to_url(&mut url); - let res = httpclient::http_get(url, APP_JSON).await?; + let res = httpclient::http_get(url, APP_JSON, ctx).await?; use httpclient::http::StatusCode; if res.head.status == StatusCode::NOT_FOUND { Ok(None) diff --git a/crates/nodenet/src/configquorum.rs b/crates/nodenet/src/configquorum.rs index 0c84660..6613a5b 100644 --- a/crates/nodenet/src/configquorum.rs +++ b/crates/nodenet/src/configquorum.rs @@ -8,6 +8,7 @@ use netpod::ChannelConfigResponse; use netpod::ChannelTypeConfigGen; use netpod::DtNano; use netpod::NodeConfigCached; +use netpod::ReqCtx; use netpod::SfChFetchInfo; use netpod::SfDbChannel; use std::collections::BTreeMap; @@ -48,6 +49,7 @@ fn decide_sf_ch_config_quorum(inp: Vec) -> Result Result, Error> { let range = match range { @@ -63,9 +65,12 @@ async fn find_sf_ch_config_quorum( // TODO expand: false, }; - let res = tokio::time::timeout(Duration::from_millis(4000), http_get_channel_config(qu, node.baseurl())) - .await - .map_err(|_| Error::with_msg_no_trace("timeout"))??; + let res = tokio::time::timeout( + Duration::from_millis(4000), + http_get_channel_config(qu, node.baseurl(), ctx), + ) + .await + .map_err(|_| Error::with_msg_no_trace("timeout"))??; all.push(res); } let all: Vec<_> = all.into_iter().filter_map(|x| x).collect(); @@ -84,6 +89,7 @@ async fn find_sf_ch_config_quorum( pub async fn find_config_basics_quorum( channel: SfDbChannel, range: SeriesRange, + ctx: &ReqCtx, ncc: &NodeConfigCached, ) -> Result, Error> { if let Some(_cfg) = &ncc.node.sf_databuffer { @@ -100,7 +106,7 @@ pub async fn find_config_basics_quorum( } else { channel }; - match find_sf_ch_config_quorum(channel, range, ncc).await? { + match find_sf_ch_config_quorum(channel, range, ctx, ncc).await? { Some(x) => Ok(Some(ChannelTypeConfigGen::SfDatabuffer(x))), None => Ok(None), } diff --git a/crates/nodenet/src/conn.rs b/crates/nodenet/src/conn.rs index 1bd46ea..e5ce42a 100644 --- a/crates/nodenet/src/conn.rs +++ b/crates/nodenet/src/conn.rs @@ -166,7 +166,7 @@ pub async fn create_response_bytes_stream( evq.ch_conf().shape(), ); debug!("wasm1 {:?}", evq.wasm1()); - let reqctx = netpod::ReqCtx::new(evq.reqid()); + let reqctx = netpod::ReqCtx::new(evq.reqid()).into(); if evq.create_errors_contains("nodenet_parse_query") { let e = Error::with_msg_no_trace("produced error on request nodenet_parse_query"); return Err(e); diff --git a/crates/streams/src/plaineventsjson.rs b/crates/streams/src/plaineventsjson.rs index ab84fb6..44fdc3e 100644 --- a/crates/streams/src/plaineventsjson.rs +++ b/crates/streams/src/plaineventsjson.rs @@ -14,6 +14,7 @@ use items_2::streams::PlainEventStream; use netpod::log::*; use netpod::ChannelTypeConfigGen; use netpod::Cluster; +use netpod::ReqCtx; use query::api4::events::EventsSubQuery; use query::api4::events::EventsSubQuerySelect; use query::api4::events::EventsSubQuerySettings; @@ -24,7 +25,7 @@ use std::time::Instant; pub async fn plain_events_json( evq: &PlainEventsQuery, ch_conf: ChannelTypeConfigGen, - reqid: String, + ctx: &ReqCtx, cluster: &Cluster, ) -> Result { info!("plain_events_json evquery {:?}", evq); @@ -33,12 +34,12 @@ pub async fn plain_events_json( select.set_wasm1(x.into()); } let settings = EventsSubQuerySettings::from(evq); - let subq = EventsSubQuery::from_parts(select, settings, reqid); + let subq = EventsSubQuery::from_parts(select, settings, ctx.reqid().into()); // TODO remove magic constant let deadline = Instant::now() + evq.timeout(); let mut tr = build_merged_event_transform(evq.transform())?; // TODO make sure the empty container arrives over the network. - let inps = open_event_data_streams::(subq, cluster).await?; + let inps = open_event_data_streams::(subq, ctx, cluster).await?; // TODO propagate also the max-buf-len for the first stage event reader. // TODO use a mixture of count and byte-size as threshold. let stream = Merger::new(inps, evq.merger_out_len_max()); @@ -72,6 +73,7 @@ pub async fn plain_events_json( let t = httpclient::http_get( Url::parse(&format!("http://data-api.psi.ch/distri/{}", wasmname)).unwrap(), "*/*", + ctx, ) .await .unwrap(); diff --git a/crates/streams/src/tcprawclient.rs b/crates/streams/src/tcprawclient.rs index 9aa48ac..d7012b7 100644 --- a/crates/streams/src/tcprawclient.rs +++ b/crates/streams/src/tcprawclient.rs @@ -21,6 +21,7 @@ use items_2::frame::make_term_frame; use netpod::log::*; use netpod::Cluster; use netpod::Node; +use netpod::ReqCtx; use netpod::APP_OCTET; use query::api4::events::EventsSubQuery; use query::api4::events::Frame1Parts; @@ -61,6 +62,7 @@ pub async fn x_processed_event_blobs_stream_from_node_tcp( pub async fn x_processed_event_blobs_stream_from_node_http( subq: EventsSubQuery, node: Node, + ctx: &ReqCtx, ) -> Result> + Send>>, Error> { use http::header; use http::Method; @@ -80,6 +82,7 @@ pub async fn x_processed_event_blobs_stream_from_node_http( .uri(&uri) .header(header::HOST, uri.host().unwrap()) .header(header::ACCEPT, APP_OCTET) + .header(ctx.header_name(), ctx.header_value()) .body(body_bytes(buf)) .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; let mut client = httpclient::connect_client(req.uri()).await?; @@ -115,9 +118,10 @@ pub async fn x_processed_event_blobs_stream_from_node_http( pub async fn x_processed_event_blobs_stream_from_node( subq: EventsSubQuery, node: Node, + ctx: ReqCtx, ) -> Result> + Send>>, Error> { if true { - x_processed_event_blobs_stream_from_node_http(subq, node).await + x_processed_event_blobs_stream_from_node_http(subq, node, &ctx).await } else { x_processed_event_blobs_stream_from_node_tcp(subq, node).await } @@ -154,7 +158,11 @@ where Ok(streams) } -async fn open_event_data_streams_http(subq: EventsSubQuery, cluster: &Cluster) -> Result>, Error> +async fn open_event_data_streams_http( + subq: EventsSubQuery, + ctx: &ReqCtx, + cluster: &Cluster, +) -> Result>, Error> where // TODO group bounds in new trait T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static, @@ -178,6 +186,7 @@ where .uri(&uri) .header(header::HOST, uri.host().unwrap()) .header(header::ACCEPT, APP_OCTET) + .header(ctx.header_name(), ctx.header_value()) .body(body_bytes(buf)) .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; let mut client = httpclient::connect_client(req.uri()).await?; @@ -210,13 +219,17 @@ where Ok(streams) } -pub async fn open_event_data_streams(subq: EventsSubQuery, cluster: &Cluster) -> Result>, Error> +pub async fn open_event_data_streams( + subq: EventsSubQuery, + ctx: &ReqCtx, + cluster: &Cluster, +) -> Result>, Error> where // TODO group bounds in new trait T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static, { if true { - open_event_data_streams_http(subq, cluster).await + open_event_data_streams_http(subq, ctx, cluster).await } else { open_event_data_streams_tcp(subq, cluster).await } diff --git a/crates/streams/src/timebinnedjson.rs b/crates/streams/src/timebinnedjson.rs index b715bda..43a6837 100644 --- a/crates/streams/src/timebinnedjson.rs +++ b/crates/streams/src/timebinnedjson.rs @@ -23,6 +23,7 @@ use netpod::range::evrange::NanoRange; use netpod::BinnedRangeEnum; use netpod::ChannelTypeConfigGen; use netpod::Cluster; +use netpod::ReqCtx; use query::api4::binned::BinnedQuery; use query::api4::events::EventsSubQuery; use query::api4::events::EventsSubQuerySelect; @@ -41,7 +42,7 @@ async fn timebinnable_stream( range: NanoRange, one_before_range: bool, ch_conf: ChannelTypeConfigGen, - reqid: String, + ctx: &ReqCtx, cluster: Cluster, ) -> Result { let mut select = EventsSubQuerySelect::new(ch_conf, range.clone().into(), query.transform().clone()); @@ -49,9 +50,9 @@ async fn timebinnable_stream( select.set_wasm1(wasm1.into()); } let settings = EventsSubQuerySettings::from(&query); - let subq = EventsSubQuery::from_parts(select.clone(), settings, reqid); + let subq = EventsSubQuery::from_parts(select.clone(), settings, ctx.reqid().into()); let mut tr = build_merged_event_transform(subq.transform())?; - let inps = open_event_data_streams::(subq, &cluster).await?; + let inps = open_event_data_streams::(subq, ctx, &cluster).await?; // TODO propagate also the max-buf-len for the first stage event reader. // TODO use a mixture of count and byte-size as threshold. let stream = Merger::new(inps, query.merger_out_len_max()); @@ -75,6 +76,7 @@ async fn timebinnable_stream( let t = httpclient::http_get( Url::parse(&format!("http://data-api.psi.ch/distri/{}", wasmname)).unwrap(), "*/*", + ctx, ) .await .unwrap(); @@ -209,7 +211,7 @@ async fn timebinned_stream( query: BinnedQuery, binned_range: BinnedRangeEnum, ch_conf: ChannelTypeConfigGen, - reqid: String, + ctx: &ReqCtx, cluster: Cluster, ) -> Result>> + Send>>, Error> { let range = binned_range.binned_range_time().to_nano_range(); @@ -217,7 +219,7 @@ async fn timebinned_stream( let do_time_weight = true; let one_before_range = true; - let stream = timebinnable_stream(query.clone(), range, one_before_range, ch_conf, reqid, cluster).await?; + let stream = timebinnable_stream(query.clone(), range, one_before_range, ch_conf, ctx, cluster).await?; let stream: Pin> = stream.0; let stream = Box::pin(stream); // TODO rename TimeBinnedStream to make it more clear that it is the component which initiates the time binning. @@ -243,13 +245,13 @@ fn timebinned_to_collectable( pub async fn timebinned_json( query: BinnedQuery, ch_conf: ChannelTypeConfigGen, - reqid: String, + ctx: &ReqCtx, cluster: Cluster, ) -> Result { let deadline = Instant::now().checked_add(query.timeout_value()).unwrap(); let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?; let collect_max = 10000; - let stream = timebinned_stream(query.clone(), binned_range.clone(), ch_conf, reqid, cluster).await?; + let stream = timebinned_stream(query.clone(), binned_range.clone(), ch_conf, ctx, cluster).await?; let stream = timebinned_to_collectable(stream); let collected = Collect::new(stream, deadline, collect_max, None, Some(binned_range)); let collected: BoxFuture<_> = Box::pin(collected); diff --git a/crates/taskrun/src/formatter.rs b/crates/taskrun/src/formatter.rs index 1e4c797..ae69c29 100644 --- a/crates/taskrun/src/formatter.rs +++ b/crates/taskrun/src/formatter.rs @@ -61,11 +61,12 @@ where match current_thread.name() { Some(name) => { let n = name.len(); - let max = 14; + let max = 32; if n > max { - writer.write_str(&name[0..2])?; + let pre = 3; + writer.write_str(&name[0..3])?; writer.write_char('.')?; - writer.write_str(&name[name.len() + 3 - max..])?; + writer.write_str(&name[name.len() + 1 + pre - max..])?; } else { writer.write_str(name)?; }