From 1b3e9ebd2a0efc52fb571ffb95203edd1174ee62 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 5 Dec 2023 15:44:11 +0100 Subject: [PATCH] WIP --- crates/daqbuffer/Cargo.toml | 13 +- crates/daqbuffer/src/bin/daqbuffer.rs | 17 +- crates/daqbuffer/src/err.rs | 2 - crates/daqbufp2/Cargo.toml | 4 +- crates/daqbufp2/src/client.rs | 31 +- .../daqbufp2/src/test/api1/data_api_python.rs | 18 +- crates/daqbufp2/src/test/api4/binnedjson.rs | 23 +- crates/daqbufp2/src/test/api4/common.rs | 37 +- crates/daqbufp2/src/test/api4/eventsjson.rs | 19 +- crates/daqbufp2/src/test/timeweightedjson.rs | 21 +- crates/dbconn/src/dbconn.rs | 1 + crates/disk/Cargo.toml | 22 +- crates/disk/src/disk.rs | 7 +- crates/err/Cargo.toml | 3 + crates/err/src/lib.rs | 82 ++- crates/httpclient/Cargo.toml | 7 +- crates/httpclient/src/httpclient.rs | 298 +++++---- crates/httpret/Cargo.toml | 14 +- crates/httpret/src/api1.rs | 121 ++-- crates/httpret/src/bodystream.rs | 21 +- crates/httpret/src/cache.rs | 114 ++++ crates/httpret/src/download.rs | 46 +- crates/httpret/src/err.rs | 1 + crates/httpret/src/gather.rs | 187 +++--- crates/httpret/src/httpret.rs | 238 ++++---- crates/httpret/src/prometheus.rs | 5 +- crates/httpret/src/proxy/api4.rs | 1 - crates/httpret/src/pulsemap.rs | 572 ++++++++---------- crates/httpret/src/settings.rs | 6 +- crates/items_2/src/eventsdim0.rs | 2 +- crates/streams/Cargo.toml | 2 +- crates/streams/src/tcprawclient.rs | 35 +- crates/taskrun/Cargo.toml | 5 +- crates/taskrun/src/formatter.rs | 123 ++++ crates/taskrun/src/taskrun.rs | 30 +- 35 files changed, 1180 insertions(+), 948 deletions(-) create mode 100644 crates/httpret/src/cache.rs create mode 100644 crates/taskrun/src/formatter.rs diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index 412bbba..2f50fb5 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -5,20 +5,19 @@ authors = ["Dominik Werder "] edition = "2021" [dependencies] -hyper = "0.14" -http = "0.2" futures-util = "0.3.14" -bytes = "1.0.1" +bytes = "1.5.0" #dashmap = "3" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" -serde_yaml = "0.9.16" -chrono = "0.4" -url = "2.2.2" -clap = { version = "4.3.21", features = ["derive", "cargo"] } +serde_yaml = "0.9.27" +chrono = "0.4.31" +url = "2.5.0" +clap = { version = "4.4.11", features = ["derive", "cargo"] } err = { path = "../err" } taskrun = { path = "../taskrun" } netpod = { path = "../netpod" } disk = { path = "../disk" } +httpclient = { path = "../httpclient" } daqbufp2 = { path = "../daqbufp2" } diff --git a/crates/daqbuffer/src/bin/daqbuffer.rs b/crates/daqbuffer/src/bin/daqbuffer.rs index edadc74..0aeab3b 100644 --- a/crates/daqbuffer/src/bin/daqbuffer.rs +++ b/crates/daqbuffer/src/bin/daqbuffer.rs @@ -84,7 +84,18 @@ async fn go() -> Result<(), Error> { let cfg = cfg?; daqbufp2::run_node(cfg, service_version).await?; } else if let Ok(cfg) = serde_yaml::from_slice::(&buf) { - info!("Parsed yaml config from {}", subcmd.config); + let sp = span!(Level::INFO, "parse", id = 123u32); + sp.in_scope(|| { + let sp = span!(Level::TRACE, "sptrace"); + sp.in_scope(|| { + let sp = span!(Level::INFO, "cfg", file = "some"); + sp.in_scope(|| { + debug!("Parsed yaml config from {}", subcmd.config); + info!("Parsed yaml config from {}", subcmd.config); + warn!("Parsed yaml config from {}", subcmd.config); + }); + }); + }); let cfg: Result = cfg.into(); let cfg = cfg?; daqbufp2::run_node(cfg, service_version).await?; @@ -138,9 +149,11 @@ async fn go() -> Result<(), Error> { Ok(()) } -// TODO test data needs to be generated +// TODO test data needs to be generated. +// TODO use httpclient for the request: need to add binary POST. //#[test] #[allow(unused)] +#[cfg(DISABLED)] fn simple_fetch() { use daqbuffer::err::ErrConv; use netpod::timeunits::*; diff --git a/crates/daqbuffer/src/err.rs b/crates/daqbuffer/src/err.rs index 6036a66..0e70b2c 100644 --- a/crates/daqbuffer/src/err.rs +++ b/crates/daqbuffer/src/err.rs @@ -13,6 +13,4 @@ impl ErrConv for Result { } } -impl Convable for http::Error {} -impl Convable for hyper::Error {} impl Convable for serde_yaml::Error {} diff --git a/crates/daqbufp2/Cargo.toml b/crates/daqbufp2/Cargo.toml index 7ba9bd4..e5f8781 100644 --- a/crates/daqbufp2/Cargo.toml +++ b/crates/daqbufp2/Cargo.toml @@ -9,8 +9,8 @@ path = "src/daqbufp2.rs" [dependencies] tokio = { version = "1.22.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } -hyper = "0.14" -http = "0.2" +hyper = { version = "1.0.1", features = ["client", "http1", "http2"] } +http = "1" tracing = "0.1.25" tracing-subscriber = "0.3.16" futures-util = "0.3.25" diff --git a/crates/daqbufp2/src/client.rs b/crates/daqbufp2/src/client.rs index 2ddba40..bb792d7 100644 --- a/crates/daqbufp2/src/client.rs +++ b/crates/daqbufp2/src/client.rs @@ -1,12 +1,11 @@ use crate::err::ErrConv; +use bytes::Bytes; use chrono::DateTime; use chrono::Utc; use disk::streamlog::Streamlog; use err::Error; use futures_util::TryStreamExt; use http::StatusCode; -use httpclient::HttpBodyAsAsyncRead; -use hyper::Body; use items_0::streamitem::StreamItem; use netpod::log::*; use netpod::query::CacheUsage; @@ -18,7 +17,6 @@ use netpod::SfDbChannel; use netpod::APP_OCTET; use query::api4::binned::BinnedQuery; use streams::frames::inmem::InMemoryFrameStream; -use streams::frames::inmem::TcpReadAsBytes; use url::Url; pub async fn status(host: String, port: u16) -> Result<(), Error> { @@ -27,16 +25,16 @@ pub async fn status(host: String, port: u16) -> Result<(), Error> { let req = hyper::Request::builder() .method(http::Method::GET) .uri(uri) - .body(Body::empty()) - .ec()?; - let client = hyper::Client::new(); - let res = client.request(req).await.ec()?; + .body(httpclient::Full::new(Bytes::new()))?; + let mut client = httpclient::connect_client(req.uri()).await?; + let res = client.send_request(req).await?; if res.status() != StatusCode::OK { error!("Server error {:?}", res); return Err(Error::with_msg(format!("Server error {:?}", res))); } - let body = hyper::body::to_bytes(res.into_body()).await.ec()?; - let res = String::from_utf8(body.to_vec())?; + let (_, body) = res.into_parts(); + let body = httpclient::read_body_bytes(body).await?; + let res = String::from_utf8_lossy(&body); let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; info!("node_status DONE duration: {} ms", ms); @@ -75,15 +73,15 @@ pub async fn get_binned( .method(http::Method::GET) .uri(url.to_string()) .header(http::header::ACCEPT, APP_OCTET) - .body(Body::empty()) + .body(httpclient::Full::new(Bytes::new())) .ec()?; - let client = hyper::Client::new(); - let res = client.request(req).await.ec()?; + let mut client = httpclient::connect_client(req.uri()).await?; + let res = client.send_request(req).await?; if res.status() != StatusCode::OK { error!("Server error {:?}", res); let (head, body) = res.into_parts(); - let buf = hyper::body::to_bytes(body).await.ec()?; - let s = String::from_utf8_lossy(&buf); + let body = httpclient::read_body_bytes(body).await?; + let s = String::from_utf8_lossy(&body); return Err(Error::with_msg(format!( concat!( "Server error {:?}\n", @@ -94,8 +92,9 @@ pub async fn get_binned( head, s ))); } - let s1 = HttpBodyAsAsyncRead::new(res); - let s2 = InMemoryFrameStream::new(TcpReadAsBytes::new(s1), ByteSize::from_kb(8)); + let (_head, body) = res.into_parts(); + let inp = httpclient::IncomingStream::new(body); + let s2 = InMemoryFrameStream::new(inp, ByteSize::from_kb(8)); use futures_util::StreamExt; use std::future::ready; let s3 = s2 diff --git a/crates/daqbufp2/src/test/api1/data_api_python.rs b/crates/daqbufp2/src/test/api1/data_api_python.rs index da85c4f..2312af9 100644 --- a/crates/daqbufp2/src/test/api1/data_api_python.rs +++ b/crates/daqbufp2/src/test/api1/data_api_python.rs @@ -1,9 +1,6 @@ -use crate::err::ErrConv; use crate::nodes::require_test_hosts_running; use chrono::Utc; use err::Error; -use http::StatusCode; -use hyper::Body; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::timeunits::MS; @@ -46,20 +43,7 @@ async fn fetch_data_api_python_blob( let hp = HostPort::from_node(node0); let url = Url::parse(&format!("http://{}:{}/api/1/query", hp.host, hp.port))?; info!("http get {}", url); - let req = hyper::Request::builder() - .method(http::Method::POST) - .uri(url.to_string()) - .header(http::header::CONTENT_TYPE, APP_JSON) - //.header(http::header::ACCEPT, APP_JSON) - .body(Body::from(query_str)) - .ec()?; - let client = hyper::Client::new(); - let res = client.request(req).await.ec()?; - if res.status() != StatusCode::OK { - error!("client response {:?}", res); - return Err(Error::with_msg_no_trace(format!("bad result {res:?}"))); - } - let buf = hyper::body::to_bytes(res.into_body()).await.ec()?; + let buf = httpclient::http_post(url, APP_JSON, query_str).await?; let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; // TODO add timeout diff --git a/crates/daqbufp2/src/test/api4/binnedjson.rs b/crates/daqbufp2/src/test/api4/binnedjson.rs index bf9f931..dcfa983 100644 --- a/crates/daqbufp2/src/test/api4/binnedjson.rs +++ b/crates/daqbufp2/src/test/api4/binnedjson.rs @@ -1,9 +1,6 @@ -use crate::err::ErrConv; use crate::nodes::require_test_hosts_running; use chrono::Utc; use err::Error; -use http::StatusCode; -use hyper::Body; use items_0::test::f32_iter_cmp_near; use items_0::test::f64_iter_cmp_near; use items_0::WithLen; @@ -352,24 +349,8 @@ async fn get_binned_json( let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?; query.append_to_url(&mut url); let url = url; - debug!("http get {}", url); - let req = hyper::Request::builder() - .method(http::Method::GET) - .uri(url.to_string()) - .header(http::header::ACCEPT, APP_JSON) - .body(Body::empty()) - .ec()?; - let client = hyper::Client::new(); - let res = client.request(req).await.ec()?; - if res.status() != StatusCode::OK { - error!("error response {:?}", res); - let buf = hyper::body::to_bytes(res.into_body()).await.ec()?; - let s = String::from_utf8_lossy(&buf); - error!("body of error response: {s}"); - return Err(Error::with_msg_no_trace(format!("error response"))); - } - let buf = hyper::body::to_bytes(res.into_body()).await.ec()?; - let s = String::from_utf8_lossy(&buf); + let res = httpclient::http_get(url, APP_JSON).await?; + let s = String::from_utf8_lossy(&res.body); let res: JsonValue = serde_json::from_str(&s)?; let pretty = serde_json::to_string_pretty(&res)?; debug!("get_binned_json pretty {pretty}"); diff --git a/crates/daqbufp2/src/test/api4/common.rs b/crates/daqbufp2/src/test/api4/common.rs index 7e95e27..c6c10b5 100644 --- a/crates/daqbufp2/src/test/api4/common.rs +++ b/crates/daqbufp2/src/test/api4/common.rs @@ -1,8 +1,5 @@ -use crate::err::ErrConv; use chrono::Utc; use err::Error; -use http::StatusCode; -use hyper::Body; use netpod::log::*; use netpod::AppendToUrl; use netpod::Cluster; @@ -21,21 +18,8 @@ pub async fn fetch_events_json(query: PlainEventsQuery, cluster: &Cluster) -> Re let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; query.append_to_url(&mut url); let url = url; - debug!("fetch_events_json url {}", url); - let req = hyper::Request::builder() - .method(http::Method::GET) - .uri(url.to_string()) - .header(http::header::ACCEPT, APP_JSON) - .body(Body::empty()) - .ec()?; - let client = hyper::Client::new(); - let res = client.request(req).await.ec()?; - if res.status() != StatusCode::OK { - error!("client response {:?}", res); - return Err(Error::with_msg_no_trace(format!("bad result {res:?}"))); - } - let buf = hyper::body::to_bytes(res.into_body()).await.ec()?; - let s = String::from_utf8_lossy(&buf); + let res = httpclient::http_get(url, APP_JSON).await?; + let s = String::from_utf8_lossy(&res.body); let res: JsonValue = serde_json::from_str(&s)?; let pretty = serde_json::to_string_pretty(&res)?; debug!("fetch_binned_json pretty: {pretty}"); @@ -54,21 +38,8 @@ pub async fn fetch_binned_json(query: BinnedQuery, cluster: &Cluster) -> Result< let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?; query.append_to_url(&mut url); let url = url; - debug!("fetch_binned_json url {}", url); - let req = hyper::Request::builder() - .method(http::Method::GET) - .uri(url.to_string()) - .header(http::header::ACCEPT, APP_JSON) - .body(Body::empty()) - .ec()?; - let client = hyper::Client::new(); - let res = client.request(req).await.ec()?; - if res.status() != StatusCode::OK { - error!("client response {:?}", res); - return Err(Error::with_msg_no_trace(format!("bad result {res:?}"))); - } - let buf = hyper::body::to_bytes(res.into_body()).await.ec()?; - let s = String::from_utf8_lossy(&buf); + let res = httpclient::http_get(url, APP_JSON).await?; + let s = String::from_utf8_lossy(&res.body); let res: JsonValue = serde_json::from_str(&s)?; let pretty = serde_json::to_string_pretty(&res)?; debug!("fetch_binned_json pretty: {pretty}"); diff --git a/crates/daqbufp2/src/test/api4/eventsjson.rs b/crates/daqbufp2/src/test/api4/eventsjson.rs index bbb9432..328f415 100644 --- a/crates/daqbufp2/src/test/api4/eventsjson.rs +++ b/crates/daqbufp2/src/test/api4/eventsjson.rs @@ -1,10 +1,8 @@ -use crate::err::ErrConv; use crate::nodes::require_test_hosts_running; use crate::test::api4::common::fetch_events_json; use chrono::Utc; use err::Error; use http::StatusCode; -use hyper::Body; use items_0::WithLen; use items_2::eventsdim0::EventsDim0CollectorOutput; use netpod::log::*; @@ -86,21 +84,8 @@ async fn events_plain_json( let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; query.append_to_url(&mut url); let url = url; - info!("http get {}", url); - let req = hyper::Request::builder() - .method(http::Method::GET) - .uri(url.to_string()) - .header(http::header::ACCEPT, APP_JSON) - .body(Body::empty()) - .ec()?; - let client = hyper::Client::new(); - let res = client.request(req).await.ec()?; - if res.status() != StatusCode::OK { - error!("client response {:?}", res); - return Err(Error::with_msg_no_trace(format!("bad result {res:?}"))); - } - let buf = hyper::body::to_bytes(res.into_body()).await.ec()?; - let s = String::from_utf8_lossy(&buf); + let res = httpclient::http_get(url, APP_JSON).await?; + let s = String::from_utf8_lossy(&res.body); let res: JsonValue = serde_json::from_str(&s)?; let pretty = serde_json::to_string_pretty(&res)?; info!("{pretty}"); diff --git a/crates/daqbufp2/src/test/timeweightedjson.rs b/crates/daqbufp2/src/test/timeweightedjson.rs index 48f1dd5..039b4ca 100644 --- a/crates/daqbufp2/src/test/timeweightedjson.rs +++ b/crates/daqbufp2/src/test/timeweightedjson.rs @@ -1,9 +1,6 @@ -use crate::err::ErrConv; use chrono::DateTime; use chrono::Utc; use err::Error; -use http::StatusCode; -use hyper::Body; use netpod::log::*; use netpod::query::CacheUsage; use netpod::range::evrange::NanoRange; @@ -47,25 +44,13 @@ async fn get_json_common( let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?; query.append_to_url(&mut url); let url = url; - debug!("get_json_common get {}", url); - let req = hyper::Request::builder() - .method(http::Method::GET) - .uri(url.to_string()) - .header(http::header::ACCEPT, APP_JSON) - .body(Body::empty()) - .ec()?; - let client = hyper::Client::new(); - let res = client.request(req).await.ec()?; - if res.status() != StatusCode::OK { - error!("get_json_common client response {:?}", res); - } - let res = hyper::body::to_bytes(res.into_body()).await.ec()?; + let res = httpclient::http_get(url, APP_JSON).await?; + let s = String::from_utf8_lossy(&res.body); let t2 = chrono::Utc::now(); let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; // TODO add timeout debug!("get_json_common DONE time {} ms", ms); - let res = String::from_utf8_lossy(&res).to_string(); - let res: serde_json::Value = serde_json::from_str(res.as_str())?; + let res: serde_json::Value = serde_json::from_str(&s)?; // TODO assert these: debug!( "result from endpoint: --------------\n{}\n--------------", diff --git a/crates/dbconn/src/dbconn.rs b/crates/dbconn/src/dbconn.rs index 0865c99..6e507dc 100644 --- a/crates/dbconn/src/dbconn.rs +++ b/crates/dbconn/src/dbconn.rs @@ -7,6 +7,7 @@ pub mod pg { pub use tokio_postgres::Client; pub use tokio_postgres::Error; pub use tokio_postgres::NoTls; + pub use tokio_postgres::Statement; } use err::anyhow; diff --git a/crates/disk/Cargo.toml b/crates/disk/Cargo.toml index 24e65a3..14e565e 100644 --- a/crates/disk/Cargo.toml +++ b/crates/disk/Cargo.toml @@ -8,29 +8,27 @@ edition = "2021" path = "src/disk.rs" [dependencies] -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -serde_cbor = "0.11.1" -http = "0.2" -chrono = { version = "0.4.19", features = ["serde"] } -tokio-stream = {version = "0.1.5", features = ["fs"]} -hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] } +serde = { version = "1.0.193", features = ["derive"] } +serde_json = "1.0.108" +serde_cbor = "0.11.2" +chrono = { version = "0.4.31", features = ["serde"] } +tokio-stream = {version = "0.1.14", features = ["fs"]} async-channel = "1.9.0" -crossbeam = "0.8" -bytes = "1.4.0" +crossbeam = "0.8.2" +bytes = "1.5.0" crc32fast = "1.3.2" arrayref = "0.3.6" -byteorder = "1.4.3" +byteorder = "1.5.0" futures-util = "0.3.14" async-stream = "0.3.0" -tracing = "0.1.25" +tracing = "0.1.40" tracing-futures = { version = "0.2.5", features = ["futures-03", "std-future"] } fs2 = "0.4.3" libc = "0.2.93" hex = "0.4.3" num-traits = "0.2.14" num-derive = "0.4.0" -url = "2.4.0" +url = "2.5.0" tiny-keccak = { version = "2.0", features = ["sha3"] } err = { path = "../err" } taskrun = { path = "../taskrun" } diff --git a/crates/disk/src/disk.rs b/crates/disk/src/disk.rs index 6b669dc..e8acfd6 100644 --- a/crates/disk/src/disk.rs +++ b/crates/disk/src/disk.rs @@ -769,7 +769,7 @@ impl BlockingTaskIntoChannel { match tx.send_blocking(Err(Error::with_msg_no_trace(msg))) { Ok(()) => (), Err(e) => { - error!("blocking_task_into_channel can not send into channel {e}"); + error!("blocking_task_into_channel can not send Err into channel {e}"); } } break; @@ -783,7 +783,8 @@ impl BlockingTaskIntoChannel { match tx.send_blocking(Ok(item)) { Ok(()) => (), Err(e) => { - error!("blocking_task_into_channel can not send into channel {e}"); + // Receiver most likely disconnected. + // error!("blocking_task_into_channel can not send into channel {e}"); break; } } @@ -793,7 +794,7 @@ impl BlockingTaskIntoChannel { match tx.send_blocking(Err(e.into())) { Ok(()) => (), Err(e) => { - error!("blocking_task_into_channel can not send into channel {e}"); + error!("blocking_task_into_channel can not send Err into channel {e}"); } } break; diff --git a/crates/err/Cargo.toml b/crates/err/Cargo.toml index eb6b83c..bb52edf 100644 --- a/crates/err/Cargo.toml +++ b/crates/err/Cargo.toml @@ -14,10 +14,13 @@ serde_json = "1.0" serde_cbor = "0.11.2" rmp-serde = "1.1.1" async-channel = "1.9.0" +async_channel_2 = { package = "async-channel", version = "2.0.0" } chrono = { version = "0.4.26", features = ["serde"] } url = "2.4.0" regex = "1.9.1" http = "0.2.9" +http_1 = { package = "http", version = "1.0.0" } +hyper_1 = { package = "hyper", version = "1.0.1" } thiserror = "=0.0.1" anyhow = "1.0" tokio = "1" diff --git a/crates/err/src/lib.rs b/crates/err/src/lib.rs index 4afdc5c..0ec420e 100644 --- a/crates/err/src/lib.rs +++ b/crates/err/src/lib.rs @@ -118,10 +118,10 @@ impl Error { Self::with_msg_no_trace(e.to_string()) } - pub fn add_backtrace(self) -> Self { - let mut ret = self; - ret.trace_str = Some(fmt_backtrace(&backtrace::Backtrace::new())); - ret + pub fn add_backtrace(mut self) -> Self { + self.msg.extend(" (add_backtrace DISABLED)".chars()); + // ret.trace_str = Some(fmt_backtrace(&backtrace::Backtrace::new())); + self } pub fn mark_bad_request(mut self) -> Self { @@ -165,7 +165,11 @@ impl Error { } } +#[allow(unused)] fn fmt_backtrace(trace: &backtrace::Backtrace) -> String { + if true { + return String::from("fmt_backtrace DISABLED"); + } use std::io::Write; let mut buf = Vec::new(); let mut c1 = 0; @@ -295,139 +299,163 @@ impl ToErr for Infallible { impl From for Error { fn from(k: String) -> Self { - Self::with_msg(k) + Self::from_string(k) } } impl From<&str> for Error { fn from(k: &str) -> Self { - Self::with_msg(k) + Self::from_string(k) } } impl From for Error { fn from(k: std::io::Error) -> Self { - Self::with_msg(k.to_string()) + Self::from_string(k) } } impl From for Error { fn from(k: AddrParseError) -> Self { - Self::with_msg(k.to_string()) + Self::from_string(k) } } impl From for Error { fn from(k: serde_json::Error) -> Self { - Self::with_msg(k.to_string()) + Self::from_string(k) } } impl From> for Error { fn from(k: async_channel::SendError) -> Self { - Self::with_msg(format!("{:?}", k)) + Self::from_string(k) } } impl From for Error { fn from(k: async_channel::RecvError) -> Self { - Self::with_msg(k.to_string()) + Self::from_string(k) + } +} + +impl From> for Error { + fn from(k: async_channel_2::SendError) -> Self { + Self::from_string(k) + } +} + +impl From for Error { + fn from(k: async_channel_2::RecvError) -> Self { + Self::from_string(k) } } impl From for Error { fn from(k: chrono::format::ParseError) -> Self { - Self::with_msg(k.to_string()) + Self::from_string(k) } } impl From for Error { fn from(k: ParseIntError) -> Self { - Self::with_msg(k.to_string()) + Self::from_string(k) } } impl From for Error { fn from(k: ParseFloatError) -> Self { - Self::with_msg(k.to_string()) + Self::from_string(k) } } impl From for Error { fn from(k: FromUtf8Error) -> Self { - Self::with_msg(k.to_string()) + Self::from_string(k) } } impl From for Error { fn from(k: std::str::Utf8Error) -> Self { - Self::with_msg(k.to_string()) + Self::from_string(k) } } impl From for Error { fn from(k: serde_cbor::Error) -> Self { - Self::with_msg(k.to_string()) + Self::from_string(k) } } impl From for Error { fn from(k: std::fmt::Error) -> Self { - Self::with_msg(k.to_string()) + Self::from_string(k) } } impl From for Error { fn from(k: regex::Error) -> Self { - Self::with_msg(k.to_string()) + Self::from_string(k) } } impl From> for Error { fn from(_: PoisonError) -> Self { - Self::with_msg("PoisonError") + Self::from_string("PoisonError") } } impl From for Error { fn from(k: url::ParseError) -> Self { - Self::with_msg(format!("{:?}", k)) + Self::from_string(format!("{:?}", k)) } } impl From for Error { fn from(k: TryFromSliceError) -> Self { - Self::with_msg(format!("{:?}", k)) + Self::from_string(format!("{:?}", k)) } } impl From for Error { fn from(k: rmp_serde::encode::Error) -> Self { - Self::with_msg(format!("{:?}", k)) + Self::from_string(format!("{:?}", k)) } } impl From for Error { fn from(k: rmp_serde::decode::Error) -> Self { - Self::with_msg(format!("{:?}", k)) + Self::from_string(format!("{:?}", k)) } } impl From for Error { fn from(k: http::header::ToStrError) -> Self { - Self::with_msg(format!("{:?}", k)) + Self::from_string(format!("{:?}", k)) } } impl From for Error { fn from(k: anyhow::Error) -> Self { - Self::with_msg(format!("{k}")) + Self::from_string(format!("{k}")) } } impl From for Error { fn from(k: tokio::task::JoinError) -> Self { - Self::with_msg(format!("{k}")) + Self::from_string(format!("{k}")) + } +} + +impl From for Error { + fn from(k: http_1::Error) -> Self { + Self::from_string(k) + } +} + +impl From for Error { + fn from(k: hyper_1::Error) -> Self { + Self::from_string(k) } } diff --git a/crates/httpclient/Cargo.toml b/crates/httpclient/Cargo.toml index 024a627..b8e5d62 100644 --- a/crates/httpclient/Cargo.toml +++ b/crates/httpclient/Cargo.toml @@ -8,12 +8,15 @@ edition = "2021" futures-util = "0.3.25" serde = { version = "1.0.147", features = ["derive"] } serde_json = "1.0.89" -http = "0.2.8" url = "2.3.1" tokio = { version = "1.22.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } tracing = "0.1.37" -hyper = { version = "0.14.23", features = ["http1", "http2", "client", "server", "tcp", "stream"] } +http = "1.0.0" +http-body = "1.0.0" +http-body-util = "0.1.0" +hyper = { version = "1.0.1", features = ["http1", "http2", "client", "server"] } hyper-tls = { version = "0.5.0" } +hyper-util = { version = "0.1.1", features = ["full"] } bytes = "1.3.0" async-channel = "1.8.0" err = { path = "../err" } diff --git a/crates/httpclient/src/httpclient.rs b/crates/httpclient/src/httpclient.rs index b5483da..9da8370 100644 --- a/crates/httpclient/src/httpclient.rs +++ b/crates/httpclient/src/httpclient.rs @@ -1,44 +1,95 @@ +pub use hyper_util; + +pub use http_body_util; +pub use http_body_util::Full; + use bytes::Bytes; -use err::Error; +use bytes::BytesMut; use err::PublicError; use futures_util::pin_mut; use http::header; use http::Request; use http::Response; use http::StatusCode; -use hyper::body::HttpBody; -use hyper::Body; +use http_body_util::combinators::BoxBody; +use hyper::body::Body; +use hyper::client::conn::http2::SendRequest; use hyper::Method; use netpod::log::*; use netpod::AppendToUrl; use netpod::ChannelConfigQuery; use netpod::ChannelConfigResponse; use netpod::NodeConfigCached; +use netpod::APP_JSON; +use std::fmt; use std::pin::Pin; use std::task::Context; use std::task::Poll; use tokio::io; use tokio::io::AsyncRead; use tokio::io::ReadBuf; +use tokio::net::TcpStream; use url::Url; -pub trait ErrConv { - fn ec(self) -> Result; +pub type BodyBox = BoxBody; +pub type RespBox = Response; + +#[derive(Debug)] +pub enum BodyError { + Bad, } -pub trait Convable: ToString {} - -impl ErrConv for Result { - fn ec(self) -> Result { - match self { - Ok(x) => Ok(x), - Err(e) => Err(::err::Error::from_string(e.to_string())), - } +impl fmt::Display for BodyError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.write_str("Bad") } } -impl Convable for http::Error {} -impl Convable for hyper::Error {} +impl std::error::Error for BodyError {} + +impl From for BodyError { + fn from(_value: std::convert::Infallible) -> Self { + BodyError::Bad + } +} + +#[derive(Debug)] +pub enum Error { + BadUrl, + Connection, + IO, + Http, +} + +impl From for Error { + fn from(value: std::io::Error) -> Self { + Self::IO + } +} + +impl From for Error { + fn from(value: http::Error) -> Self { + Self::Http + } +} + +impl From for Error { + fn from(value: hyper::Error) -> Self { + Self::Http + } +} + +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{self:?}") + } +} + +impl err::ToErr for Error { + fn to_err(self) -> err::Error { + err::Error::with_msg_no_trace(format!("self")) + } +} pub struct HttpResponse { pub head: http::response::Parts, @@ -49,143 +100,130 @@ pub async fn http_get(url: Url, accept: &str) -> Result { let req = Request::builder() .method(http::Method::GET) .uri(url.to_string()) + .header(header::HOST, url.host_str().ok_or_else(|| Error::BadUrl)?) .header(header::ACCEPT, accept) - .body(Body::empty()) - .ec()?; - let client = hyper::Client::new(); - let res = client.request(req).await.ec()?; - let (head, body) = res.into_parts(); + .body(Full::new(Bytes::new()))?; + let mut send_req = connect_client(req.uri()).await?; + let res = send_req.send_request(req).await?; + let (head, mut body) = res.into_parts(); debug!("http_get head {head:?}"); - let body = hyper::body::to_bytes(body).await.ec()?; - let ret = HttpResponse { head, body }; + use bytes::BufMut; + use http_body_util::BodyExt; + let mut buf = BytesMut::new(); + while let Some(x) = body.frame().await { + match x { + Ok(mut x) => { + if let Some(x) = x.data_mut() { + buf.put(x); + } + } + Err(e) => return Err(e.into()), + } + } + let ret = HttpResponse { + head, + body: buf.freeze(), + }; Ok(ret) } pub async fn http_post(url: Url, accept: &str, body: String) -> Result { + let body = Bytes::from(body.as_bytes().to_vec()); let req = Request::builder() .method(http::Method::POST) .uri(url.to_string()) + .header(header::HOST, url.host_str().ok_or_else(|| Error::BadUrl)?) + .header(header::CONTENT_TYPE, APP_JSON) .header(header::ACCEPT, accept) - .body(Body::from(body)) - .ec()?; - let client = hyper::Client::new(); - let res = client.request(req).await.ec()?; + .body(Full::new(body))?; + let mut send_req = connect_client(req.uri()).await?; + let res = send_req.send_request(req).await?; if res.status() != StatusCode::OK { error!("Server error {:?}", res); - let (head, body) = res.into_parts(); - let buf = hyper::body::to_bytes(body).await.ec()?; + let (_head, body) = res.into_parts(); + let buf = read_body_bytes(body).await?; let s = String::from_utf8_lossy(&buf); - return Err(Error::with_msg(format!( - concat!( - "Server error {:?}\n", - "---------------------- message from http body:\n", - "{}\n", - "---------------------- end of http body", - ), - head, s - ))); + return Err(Error::Http); } - let body = hyper::body::to_bytes(res.into_body()).await.ec()?; - Ok(body) -} - -// TODO move to a better fitting module: -pub struct HttpBodyAsAsyncRead { - inp: Response, - left: Bytes, - rp: usize, -} - -impl HttpBodyAsAsyncRead { - pub fn new(inp: Response) -> Self { - Self { - inp, - left: Bytes::new(), - rp: 0, + let (head, mut body) = res.into_parts(); + debug!("http_get head {head:?}"); + use bytes::BufMut; + use http_body_util::BodyExt; + let mut buf = BytesMut::new(); + while let Some(x) = body.frame().await { + match x { + Ok(mut x) => { + if let Some(x) = x.data_mut() { + buf.put(x); + } + } + Err(e) => return Err(e.into()), } } + let buf = read_body_bytes(body).await?; + Ok(buf) } -impl AsyncRead for HttpBodyAsAsyncRead { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut ReadBuf) -> Poll> { - trace!("impl AsyncRead for HttpBodyAsAsyncRead"); - use Poll::*; - if self.left.len() != 0 { - let n1 = buf.remaining(); - let n2 = self.left.len() - self.rp; - if n2 <= n1 { - buf.put_slice(self.left[self.rp..].as_ref()); - self.left = Bytes::new(); - self.rp = 0; - Ready(Ok(())) - } else { - buf.put_slice(self.left[self.rp..(self.rp + n2)].as_ref()); - self.rp += n2; - Ready(Ok(())) +pub async fn connect_client(uri: &http::Uri) -> Result>, Error> { + let host = uri.host().ok_or_else(|| Error::BadUrl)?; + let port = uri.port_u16().ok_or_else(|| Error::BadUrl)?; + let stream = TcpStream::connect(format!("{host}:{port}")).await?; + let executor = hyper_util::rt::TokioExecutor::new(); + let (send_req, conn) = hyper::client::conn::http2::Builder::new(executor) + .handshake(hyper_util::rt::TokioIo::new(stream)) + .await?; + // TODO would need to take greater care of this task to catch connection-level errors. + tokio::spawn(conn); + Ok(send_req) +} + +pub async fn read_body_bytes(mut body: hyper::body::Incoming) -> Result { + use bytes::BufMut; + use http_body_util::BodyExt; + let mut buf = BytesMut::new(); + while let Some(x) = body.frame().await { + match x { + Ok(mut x) => { + if let Some(x) = x.data_mut() { + buf.put(x); + } } - } else { - let f = &mut self.inp; - pin_mut!(f); - match f.poll_data(cx) { - Ready(Some(Ok(k))) => { - let n1 = buf.remaining(); - if k.len() <= n1 { - buf.put_slice(k.as_ref()); - Ready(Ok(())) + Err(e) => return Err(e.into()), + } + } + Ok(buf.freeze()) +} + +pub struct IncomingStream { + inp: hyper::body::Incoming, +} + +impl IncomingStream { + pub fn new(inp: hyper::body::Incoming) -> Self { + Self { inp } + } +} + +impl futures_util::Stream for IncomingStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + let j = &mut self.get_mut().inp; + let k = Pin::new(j); + match hyper::body::Body::poll_frame(k, cx) { + Ready(Some(x)) => match x { + Ok(x) => { + if let Ok(x) = x.into_data() { + Ready(Some(Ok(x))) } else { - buf.put_slice(k[..n1].as_ref()); - self.left = k; - self.rp = n1; - Ready(Ok(())) + Ready(Some(Ok(Bytes::new()))) } } - Ready(Some(Err(e))) => Ready(Err(io::Error::new( - io::ErrorKind::Other, - Error::with_msg(format!("Received by HttpBodyAsAsyncRead: {:?}", e)), - ))), - Ready(None) => Ready(Ok(())), - Pending => Pending, - } - } - } -} - -pub async fn get_channel_config( - q: &ChannelConfigQuery, - node_config: &NodeConfigCached, -) -> Result { - let mut url = Url::parse(&format!( - "http://{}:{}/api/4/channel/config", - node_config.node.host, node_config.node.port - ))?; - q.append_to_url(&mut url); - let req = hyper::Request::builder() - .method(Method::GET) - .uri(url.as_str()) - .body(Body::empty()) - .map_err(Error::from_string)?; - let client = hyper::Client::new(); - let res = client - .request(req) - .await - .map_err(|e| Error::with_msg(format!("get_channel_config request error: {e:?}")))?; - if res.status().is_success() { - let buf = hyper::body::to_bytes(res.into_body()) - .await - .map_err(|e| Error::with_msg(format!("can not read response: {e:?}")))?; - let ret: ChannelConfigResponse = serde_json::from_slice(&buf) - .map_err(|e| Error::with_msg(format!("can not parse the channel config response json: {e:?}")))?; - Ok(ret) - } else { - let buf = hyper::body::to_bytes(res.into_body()) - .await - .map_err(|e| Error::with_msg(format!("can not read response: {e:?}")))?; - match serde_json::from_slice::(&buf) { - Ok(e) => Err(e.into()), - Err(_) => Err(Error::with_msg(format!( - "can not parse the http error body: {:?}", - String::from_utf8_lossy(&buf) - ))), + Err(e) => Ready(Some(Err(e.into()))), + }, + Ready(None) => Ready(None), + Pending => Pending, } } } diff --git a/crates/httpret/Cargo.toml b/crates/httpret/Cargo.toml index e3292de..60f8f6f 100644 --- a/crates/httpret/Cargo.toml +++ b/crates/httpret/Cargo.toml @@ -10,18 +10,20 @@ path = "src/httpret.rs" [dependencies] serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -http = "0.2.9" -url = "2.4.0" -hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] } -bytes = "1.4.0" +url = "2.5.0" +http = "1.0.0" +http-body-util = { version = "0.1.0" } +hyper = { version = "1.0.1", features = ["http1", "http2", "client", "server"] } +hyper-util = { version = "0.1.1", features = ["http1", "http2", "client", "server"] } +bytes = "1.5.0" futures-util = "0.3.14" tracing = "0.1" tracing-futures = "0.2" async-channel = "1.9.0" itertools = "0.11.0" chrono = "0.4.23" -md-5 = "0.10.5" -regex = "1.9.3" +md-5 = "0.10.6" +regex = "1.10.2" err = { path = "../err" } netpod = { path = "../netpod" } query = { path = "../query" } diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs index 940ac22..5b48def 100644 --- a/crates/httpret/src/api1.rs +++ b/crates/httpret/src/api1.rs @@ -1,17 +1,24 @@ +use crate::body_empty; +use crate::body_string; use crate::err::Error; use crate::gather::gather_get_json_generic; use crate::gather::SubRes; use crate::response; use crate::ReqCtx; +use crate::Requ; +use crate::RespFull; use bytes::BufMut; +use bytes::Bytes; use bytes::BytesMut; use disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes; use futures_util::Stream; use futures_util::StreamExt; +use http::header; use http::Method; use http::StatusCode; -use hyper::Body; -use hyper::Client; +use http_body_util::Full; +use httpclient::connect_client; +use httpclient::read_body_bytes; use hyper::Request; use hyper::Response; use items_0::streamitem::RangeCompletableItem; @@ -131,9 +138,9 @@ impl FromErrorCode for ChannelSearchResultItemV1 { #[derive(Debug, Serialize, Deserialize)] pub struct ChannelSearchResultV1(pub Vec); -pub async fn channel_search_list_v1(req: Request, proxy_config: &ProxyConfig) -> Result, Error> { +pub async fn channel_search_list_v1(req: Requ, proxy_config: &ProxyConfig) -> Result { let (head, reqbody) = req.into_parts(); - let bodybytes = hyper::body::to_bytes(reqbody).await?; + let bodybytes = read_body_bytes(reqbody).await?; let query: ChannelSearchQueryV1 = serde_json::from_slice(&bodybytes)?; match head.headers.get(http::header::ACCEPT) { Some(v) => { @@ -156,17 +163,17 @@ pub async fn channel_search_list_v1(req: Request, proxy_config: &ProxyConf } Err(e) => Err(Error::with_msg(format!("parse error for: {:?} {:?}", sh, e))), }) - .fold_ok(vec![], |mut a, x| { + .fold_ok(Vec::new(), |mut a, x| { a.push(x); a })?; let tags: Vec<_> = urls.iter().map(|k| k.to_string()).collect(); let nt = |tag, res| { let fut = async { - let body = hyper::body::to_bytes(res).await?; + let body = read_body_bytes(res).await?; let res: ChannelSearchResult = match serde_json::from_slice(&body) { Ok(k) => k, - Err(_) => ChannelSearchResult { channels: vec![] }, + Err(_) => ChannelSearchResult { channels: Vec::new() }, }; let ret = SubRes { tag, @@ -211,7 +218,7 @@ pub async fn channel_search_list_v1(req: Request, proxy_config: &ProxyConf } let res = response(StatusCode::OK) .header(http::header::CONTENT_TYPE, APP_JSON) - .body(Body::from(serde_json::to_string(&res)?))?; + .body(body_string(serde_json::to_string(&res)?))?; Ok(res) }; let bodies = (0..urls.len()).into_iter().map(|_| None).collect(); @@ -227,19 +234,16 @@ pub async fn channel_search_list_v1(req: Request, proxy_config: &ProxyConf .await?; Ok(ret) } else { - Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?) + Ok(response(StatusCode::NOT_ACCEPTABLE).body(Full::new(Bytes::new()))?) } } - None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?), + None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Full::new(Bytes::new()))?), } } -pub async fn channel_search_configs_v1( - req: Request, - proxy_config: &ProxyConfig, -) -> Result, Error> { +pub async fn channel_search_configs_v1(req: Requ, proxy_config: &ProxyConfig) -> Result { let (head, reqbody) = req.into_parts(); - let bodybytes = hyper::body::to_bytes(reqbody).await?; + let bodybytes = read_body_bytes(reqbody).await?; let query: ChannelSearchQueryV1 = serde_json::from_slice(&bodybytes)?; match head.headers.get(http::header::ACCEPT) { Some(v) => { @@ -270,7 +274,7 @@ pub async fn channel_search_configs_v1( let tags: Vec<_> = urls.iter().map(|k| k.to_string()).collect(); let nt = |tag, res| { let fut = async { - let body = hyper::body::to_bytes(res).await?; + let body = read_body_bytes(res).await?; let res: ChannelSearchResult = match serde_json::from_slice(&body) { Ok(k) => k, Err(_) => ChannelSearchResult { channels: vec![] }, @@ -336,7 +340,7 @@ pub async fn channel_search_configs_v1( } let res = response(StatusCode::OK) .header(http::header::CONTENT_TYPE, APP_JSON) - .body(Body::from(serde_json::to_string(&res)?))?; + .body(Full::new(serde_json::to_string(&res)?))?; Ok(res) }; let bodies = (0..urls.len()).into_iter().map(|_| None).collect(); @@ -352,10 +356,10 @@ pub async fn channel_search_configs_v1( .await?; Ok(ret) } else { - Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?) + Ok(response(StatusCode::NOT_ACCEPTABLE).body(Full::new(Bytes::new()))?) } } - None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?), + None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Full::new(Bytes::new()))?), } } @@ -409,39 +413,37 @@ impl FromErrorCode for ChannelBackendConfigsV1 { fn from_error_code(backend: &str, code: ErrorCode) -> Self { Self { backend: backend.into(), - channels: vec![], + channels: Vec::new(), error: Some(ErrorDescription { code }), } } } // TODO replace usage of this by gather-generic -pub async fn gather_json_2_v1( - req: Request, - pathpre: &str, - _proxy_config: &ProxyConfig, -) -> Result, Error> { +pub async fn gather_json_2_v1(req: Requ, pathpre: &str, _proxy_config: &ProxyConfig) -> Result { let (part_head, part_body) = req.into_parts(); - let bodyslice = hyper::body::to_bytes(part_body).await?; + let bodyslice = read_body_bytes(part_body).await?; let gather_from: GatherFromV1 = serde_json::from_slice(&bodyslice)?; - let mut spawned = vec![]; + let mut spawned = Vec::new(); let uri = part_head.uri; let path_post = &uri.path()[pathpre.len()..]; //let hds = part_head.headers; for gh in gather_from.hosts { let uri = format!("http://{}:{}/{}", gh.host, gh.port, path_post); - let req = Request::builder().method(Method::GET).uri(uri); + let req = Request::builder() + .method(Method::GET) + .uri(uri) + .header(header::HOST, gh.host); let req = if gh.inst.len() > 0 { req.header("retrieval_instance", &gh.inst) } else { req }; let req = req.header(http::header::ACCEPT, APP_JSON); - //.body(Body::from(serde_json::to_string(&q)?))?; - let req = req.body(Body::empty()); + let req = req.body(Full::new(Bytes::new())); let task = tokio::spawn(async move { - //let res = Client::new().request(req); - let res = Client::new().request(req?).await; + let mut client = connect_client(req.uri()).await?; + let res = client.send_request(req).await?; Ok::<_, Error>(process_answer(res?).await?) }); let task = tokio::time::timeout(std::time::Duration::from_millis(5000), task); @@ -488,10 +490,9 @@ struct GatherHostV1 { inst: String, } -async fn process_answer(res: Response) -> Result { +async fn process_answer(res: RespFull) -> Result { let (pre, mut body) = res.into_parts(); if pre.status != StatusCode::OK { - use hyper::body::HttpBody; if let Some(c) = body.data().await { let c: bytes::Bytes = c?; let s1 = String::from_utf8(c.to_vec())?; @@ -504,9 +505,7 @@ async fn process_answer(res: Response) -> Result { Ok(JsonValue::String(format!("status {}", pre.status.as_str()))) } } else { - let body: hyper::Body = body; - let body_all = hyper::body::to_bytes(body).await?; - let val = match serde_json::from_slice(&body_all) { + let val = match serde_json::from_slice(the_data) { Ok(k) => k, Err(_e) => JsonValue::String(String::from_utf8(body_all.to_vec())?), }; @@ -533,6 +532,8 @@ pub struct DataApiPython3DataStream { data_done: bool, completed: bool, stats: Api1WarningStats, + count_emits: u64, + count_bytes: u64, } impl DataApiPython3DataStream { @@ -565,6 +566,8 @@ impl DataApiPython3DataStream { data_done: false, completed: false, stats: Api1WarningStats::new(), + count_emits: 0, + count_bytes: 0, } } @@ -776,12 +779,21 @@ impl Stream for DataApiPython3DataStream { panic!("poll on completed") } else if self.data_done { self.completed = true; + let reqid = self.reqctx.reqid(); + info!( + "{} response body sent {} bytes ({})", + reqid, self.count_bytes, self.count_emits + ); Ready(None) } else { if let Some(stream) = &mut self.chan_stream { match stream.poll_next_unpin(cx) { Ready(Some(k)) => match self.handle_chan_stream_ready(k) { - Ok(k) => Ready(Some(Ok(k))), + Ok(k) => { + self.count_emits += 1; + self.count_bytes += k.len() as u64; + Ready(Some(Ok(k))) + } Err(e) => { error!("{e}"); self.chan_stream = None; @@ -854,7 +866,7 @@ fn shape_to_api3proto(sh: &Option>) -> Vec { pub struct Api1EventsBinaryHandler {} impl Api1EventsBinaryHandler { - pub fn handler(req: &Request) -> Option { + pub fn handler(req: &Requ) -> Option { if req.uri().path() == "/api/1/query" { Some(Self {}) } else { @@ -862,14 +874,9 @@ impl Api1EventsBinaryHandler { } } - pub async fn handle( - &self, - req: Request, - _ctx: &ReqCtx, - node_config: &NodeConfigCached, - ) -> Result, Error> { + pub async fn handle(&self, req: Requ, _ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result { if req.method() != Method::POST { - return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?); + return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Full::new(Bytes::new()))?); } let (head, body) = req.into_parts(); let accept = head @@ -878,7 +885,7 @@ impl Api1EventsBinaryHandler { .map_or(Ok(ACCEPT_ALL), |k| k.to_str()) .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))? .to_owned(); - let body_data = hyper::body::to_bytes(body).await?; + let body_data = read_body_bytes(body).await?; if body_data.len() < 1024 * 2 && body_data.first() == Some(&"{".as_bytes()[0]) { debug!("request body_data string: {}", String::from_utf8_lossy(&body_data)); } @@ -932,7 +939,7 @@ impl Api1EventsBinaryHandler { span: tracing::Span, reqidspan: tracing::Span, ncc: &NodeConfigCached, - ) -> Result, Error> { + ) -> Result { let self_name = any::type_name::(); // TODO this should go to usage statistics: debug!( @@ -1004,7 +1011,7 @@ impl Api1EventsBinaryHandler { // TODO set the public error code and message and return Err(e). let e = Error::with_public_msg_no_trace(format!("{self_name} unsupported Accept: {}", accept)); error!("{self_name} {e}"); - Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?) + Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty)?) } } } @@ -1016,7 +1023,7 @@ impl RequestStatusHandler { "/api/1/requestStatus/" } - pub fn handler(req: &Request) -> Option { + pub fn handler(req: &Requ) -> Option { if req.uri().path().starts_with(Self::path_prefix()) { Some(Self {}) } else { @@ -1024,29 +1031,29 @@ impl RequestStatusHandler { } } - pub async fn handle(&self, req: Request, _ncc: &NodeConfigCached) -> Result, Error> { + pub async fn handle(&self, req: Requ, _ncc: &NodeConfigCached) -> Result { let (head, body) = req.into_parts(); if head.method != Method::GET { - return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?); + return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?); } let accept = head .headers - .get(http::header::ACCEPT) + .get(header::ACCEPT) .map_or(Ok(ACCEPT_ALL), |k| k.to_str()) .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))? .to_owned(); if accept != APP_JSON && accept != ACCEPT_ALL { // TODO set the public error code and message and return Err(e). - let e = Error::with_public_msg_no_trace(format!("Unsupported Accept: {:?}", accept)); + let e = Error::with_public_msg_no_trace(format!("unsupported accept: {:?}", accept)); error!("{e}"); - return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } - let _body_data = hyper::body::to_bytes(body).await?; + let _body_data = read_body_bytes(body).await?; let status_id = &head.uri.path()[Self::path_prefix().len()..]; debug!("RequestStatusHandler status_id {:?}", status_id); let status = crate::status_board()?.status_as_json(status_id); let s = serde_json::to_string(&status)?; - let ret = response(StatusCode::OK).body(Body::from(s))?; + let ret = response(StatusCode::OK).body(Full::new(s))?; Ok(ret) } } diff --git a/crates/httpret/src/bodystream.rs b/crates/httpret/src/bodystream.rs index c08400e..329a9eb 100644 --- a/crates/httpret/src/bodystream.rs +++ b/crates/httpret/src/bodystream.rs @@ -3,7 +3,6 @@ use futures_util::StreamExt; use http::HeaderMap; use http::Response; use http::StatusCode; -use hyper::Body; use netpod::log::*; use netpod::APP_JSON; use std::pin::Pin; @@ -58,15 +57,15 @@ impl ToPublicResponse for ::err::Error { struct BodyStreamWrap(netpod::BodyStream); -impl hyper::body::HttpBody for BodyStreamWrap { - type Data = bytes::Bytes; - type Error = ::err::Error; +// impl hyper::body::HttpBody for BodyStreamWrap { +// type Data = bytes::Bytes; +// type Error = ::err::Error; - fn poll_data(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { - self.0.inner.poll_next_unpin(cx) - } +// fn poll_data(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { +// self.0.inner.poll_next_unpin(cx) +// } - fn poll_trailers(self: Pin<&mut Self>, _cx: &mut Context) -> Poll, Self::Error>> { - Poll::Ready(Ok(None)) - } -} +// fn poll_trailers(self: Pin<&mut Self>, _cx: &mut Context) -> Poll, Self::Error>> { +// Poll::Ready(Ok(None)) +// } +// } diff --git a/crates/httpret/src/cache.rs b/crates/httpret/src/cache.rs new file mode 100644 index 0000000..1923d6f --- /dev/null +++ b/crates/httpret/src/cache.rs @@ -0,0 +1,114 @@ +use async_channel::Receiver; +use async_channel::Sender; +use netpod::log::*; +use std::collections::BTreeMap; +use std::sync::Mutex; +use std::time::SystemTime; + +pub struct Dummy(u32); + +pub enum CachePortal { + Fresh, + Existing(Receiver), + Known(V), +} + +impl CachePortal {} + +enum CacheEntry { + Waiting(SystemTime, Sender, Receiver), + Known(SystemTime, V), +} + +impl CacheEntry { + fn ts(&self) -> &SystemTime { + match self { + CacheEntry::Waiting(ts, _, _) => ts, + CacheEntry::Known(ts, _) => ts, + } + } +} + +struct CacheInner { + map: BTreeMap>, +} + +impl CacheInner +where + K: Ord, +{ + const fn new() -> Self { + Self { map: BTreeMap::new() } + } + + fn housekeeping(&mut self) { + if self.map.len() > 200 { + info!("trigger housekeeping with len {}", self.map.len()); + let mut v: Vec<_> = self.map.iter().map(|(k, v)| (v.ts(), k)).collect(); + v.sort(); + let ts0 = v[v.len() / 2].0.clone(); + //let tsnow = SystemTime::now(); + //let tscut = tsnow.checked_sub(Duration::from_secs(60 * 10)).unwrap_or(tsnow); + self.map.retain(|_k, v| v.ts() >= &ts0); + info!("housekeeping kept len {}", self.map.len()); + } + } +} + +pub struct Cache { + inner: Mutex>, +} + +impl Cache +where + K: Ord, + V: Clone, +{ + pub const fn new() -> Self { + Self { + inner: Mutex::new(CacheInner::new()), + } + } + + pub fn housekeeping(&self) { + let mut g = self.inner.lock().unwrap(); + g.housekeeping(); + } + + pub fn portal(&self, key: K) -> CachePortal { + use std::collections::btree_map::Entry; + let mut g = self.inner.lock().unwrap(); + g.housekeeping(); + match g.map.entry(key) { + Entry::Vacant(e) => { + let (tx, rx) = async_channel::bounded(16); + let ret = CachePortal::Fresh; + let v = CacheEntry::Waiting(SystemTime::now(), tx, rx); + e.insert(v); + ret + } + Entry::Occupied(e) => match e.get() { + CacheEntry::Waiting(_ts, _tx, rx) => CachePortal::Existing(rx.clone()), + CacheEntry::Known(_ts, v) => CachePortal::Known(v.clone()), + }, + } + } + + pub fn set_value(&self, key: K, val: V) { + let mut g = self.inner.lock().unwrap(); + if let Some(e) = g.map.get_mut(&key) { + match e { + CacheEntry::Waiting(ts, tx, _rx) => { + let tx = tx.clone(); + *e = CacheEntry::Known(*ts, val); + tx.close(); + } + CacheEntry::Known(_ts, _val) => { + error!("set_value already known"); + } + } + } else { + error!("set_value no entry for key"); + } + } +} diff --git a/crates/httpret/src/download.rs b/crates/httpret/src/download.rs index a70ca4a..e6a3d51 100644 --- a/crates/httpret/src/download.rs +++ b/crates/httpret/src/download.rs @@ -1,11 +1,18 @@ +use crate::body_empty; use crate::err::Error; use crate::response; +use crate::Requ; +use crate::RespFull; +use crate::StreamBody; +use bytes::Bytes; +use futures_util::Stream; use futures_util::TryStreamExt; use http::Method; +use http::Response; use http::StatusCode; -use hyper::Body; -use hyper::Request; -use hyper::Response; +use http_body_util::BodyExt; +use httpclient::httpclient::http_body_util; +use httpclient::RespBox; use netpod::get_url_query_pairs; use netpod::log::*; use netpod::DiskIoTune; @@ -57,7 +64,7 @@ impl DownloadHandler { "/api/4/test/download/" } - pub fn handler(req: &Request) -> Option { + pub fn handler(req: &Requ) -> Option { if req.uri().path().starts_with(Self::path_prefix()) { Some(Self {}) } else { @@ -65,7 +72,15 @@ impl DownloadHandler { } } - pub async fn get(&self, req: Request, ncc: &NodeConfigCached) -> Result, Error> { + pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result { + if req.method() == Method::GET { + self.get(req, node_config).await + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) + } + } + + pub async fn get(&self, req: Requ, ncc: &NodeConfigCached) -> Result { let (head, _body) = req.into_parts(); let p2 = &head.uri.path()[Self::path_prefix().len()..]; let base = match &ncc.node.sf_databuffer { @@ -78,15 +93,18 @@ impl DownloadHandler { let pp = base.join(p2); info!("Try to open {pp:?}"); let file = tokio::fs::OpenOptions::new().read(true).open(&pp).await?; - let s = disk::file_content_stream(pp, file, query.disk_io_tune.clone(), "download").map_ok(|x| x.into_buf()); - Ok(response(StatusCode::OK).body(Body::wrap_stream(s))?) - } + let stream = + disk::file_content_stream(pp, file, query.disk_io_tune.clone(), "download").map_ok(|x| x.into_buf()); - pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { - if req.method() == Method::GET { - self.get(req, node_config).await - } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) - } + use futures_util::StreamExt; + use hyper::body::Frame; + let stream = stream.map(|item| item.map(|x| Frame::data(x.freeze()))); + let body = httpclient::httpclient::http_body_util::StreamBody::new(stream); + let body = BodyExt::boxed(body); + // let body = http_body_util::combinators::BoxBody::new(body); + // let body: StreamBody = Box::pin(body); + // let body: Pin, err::Error>>>> = Box::pin(body); + let res = response(StatusCode::OK).body(body)?; + Ok(res) } } diff --git a/crates/httpret/src/err.rs b/crates/httpret/src/err.rs index a60675d..55637f7 100644 --- a/crates/httpret/src/err.rs +++ b/crates/httpret/src/err.rs @@ -99,3 +99,4 @@ impl Convable for hyper::Error {} impl Convable for std::array::TryFromSliceError {} impl Convable for err::anyhow::Error {} impl Convable for crate::RetrievalError {} +impl Convable for httpclient::Error {} diff --git a/crates/httpret/src/gather.rs b/crates/httpret/src/gather.rs index cf07a05..40f8f54 100644 --- a/crates/httpret/src/gather.rs +++ b/crates/httpret/src/gather.rs @@ -1,11 +1,16 @@ +use crate::body_empty; +use crate::body_string; use crate::err::Error; use crate::response; +use crate::Requ; +use crate::RespFull; use futures_util::select; use futures_util::FutureExt; use http::Method; use http::StatusCode; -use hyper::Body; -use hyper::Client; +use httpclient::connect_client; +use httpclient::read_body_bytes; +use hyper::body::Incoming; use hyper::Request; use hyper::Response; use netpod::log::*; @@ -34,24 +39,14 @@ struct GatherHost { inst: String, } -async fn process_answer(res: Response) -> Result { - let (pre, mut body) = res.into_parts(); +async fn process_answer(res: Response) -> Result { + let (pre, body) = res.into_parts(); if pre.status != StatusCode::OK { - use hyper::body::HttpBody; - if let Some(c) = body.data().await { - let c: bytes::Bytes = c?; - let s1 = String::from_utf8(c.to_vec())?; - Ok(JsonValue::String(format!( - "status {} body {}", - pre.status.as_str(), - s1 - ))) - } else { - Ok(JsonValue::String(format!("status {}", pre.status.as_str()))) - } + let buf = read_body_bytes(body).await?; + let s = String::from_utf8(buf.to_vec())?; + Ok(JsonValue::String(format!("status {} body {}", pre.status.as_str(), s))) } else { - let body: hyper::Body = body; - let body_all = hyper::body::to_bytes(body).await?; + let body_all = read_body_bytes(body).await?; let val = match serde_json::from_slice(&body_all) { Ok(k) => k, Err(_e) => JsonValue::String(String::from_utf8(body_all.to_vec())?), @@ -60,62 +55,9 @@ async fn process_answer(res: Response) -> Result { } } -pub async fn unused_gather_json_from_hosts(req: Request, pathpre: &str) -> Result, Error> { - let (part_head, part_body) = req.into_parts(); - let bodyslice = hyper::body::to_bytes(part_body).await?; - let gather_from: GatherFrom = serde_json::from_slice(&bodyslice)?; - let mut spawned = vec![]; - let uri = part_head.uri; - let path_post = &uri.path()[pathpre.len()..]; - for gh in gather_from.hosts { - let uri = format!("http://{}:{}/{}", gh.host, gh.port, path_post); - let req = Request::builder().method(Method::GET).uri(uri); - let req = if gh.inst.len() > 0 { - req.header("retrieval_instance", &gh.inst) - } else { - req - }; - let req = req.header(http::header::ACCEPT, APP_JSON); - let req = req.body(Body::empty()); - let task = tokio::spawn(async move { - select! { - _ = sleep(Duration::from_millis(1500)).fuse() => { - Err(Error::with_msg_no_trace(format!("timeout"))) - } - res = Client::new().request(req?).fuse() => Ok(process_answer(res?).await?) - } - }); - spawned.push((gh.clone(), task)); - } - #[derive(Serialize)] - struct Hres { - gh: GatherHost, - res: JsonValue, - } - #[derive(Serialize)] - struct Jres { - hosts: Vec, - } - let mut a = vec![]; - for tr in spawned { - let res = match tr.1.await { - Ok(k) => match k { - Ok(k) => k, - Err(e) => JsonValue::String(format!("ERROR({:?})", e)), - }, - Err(e) => JsonValue::String(format!("ERROR({:?})", e)), - }; - a.push(Hres { gh: tr.0, res }); - } - let res = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, APP_JSON) - .body(serde_json::to_string(&Jres { hosts: a })?.into())?; - Ok(res) -} - -pub async fn gather_get_json(req: Request, node_config: &NodeConfigCached) -> Result, Error> { +pub async fn gather_get_json(req: Requ, node_config: &NodeConfigCached) -> Result { let (head, body) = req.into_parts(); - let _bodyslice = hyper::body::to_bytes(body).await?; + let _bodyslice = read_body_bytes(body).await?; let pathpre = "/api/4/gather/"; let pathsuf = &head.uri.path()[pathpre.len()..]; let spawned: Vec<_> = node_config @@ -123,20 +65,35 @@ pub async fn gather_get_json(req: Request, node_config: &NodeConfigCached) .cluster .nodes .iter() - .map(|node| { + .filter_map(|node| { let uri = format!("http://{}:{}/api/4/{}", node.host, node.port, pathsuf); let req = Request::builder().method(Method::GET).uri(uri); let req = req.header(http::header::ACCEPT, APP_JSON); - let req = req.body(Body::empty()); - let task = tokio::spawn(async move { - select! { - _ = sleep(Duration::from_millis(1500)).fuse() => { - Err(Error::with_msg_no_trace(format!("timeout"))) - } - res = Client::new().request(req?).fuse() => Ok(process_answer(res?).await?) + match req.body(body_empty()) { + Ok(req) => { + let task = tokio::spawn(async move { + select! { + _ = sleep(Duration::from_millis(1500)).fuse() => { + Err(Error::with_msg_no_trace(format!("timeout"))) + } + res = async move { + let mut client = if let Ok(x) = connect_client(req.uri()).await {x} + else { return Err(Error::with_msg("can not make request")); }; + let res = if let Ok(x) = client.send_request(req).await { x } + else { return Err(Error::with_msg("can not make request")); }; + Ok(res) + }.fuse() => { + Ok(process_answer(res?).await?) + } + } + }); + Some((node.clone(), task)) } - }); - (node.clone(), task) + Err(e) => { + error!("bad request: {e}"); + None + } + } }) .collect(); #[derive(Serialize)] @@ -148,7 +105,7 @@ pub async fn gather_get_json(req: Request, node_config: &NodeConfigCached) struct Jres { hosts: Vec, } - let mut a = vec![]; + let mut a = Vec::new(); for (node, jh) in spawned { let res = match jh.await { Ok(k) => match k { @@ -182,7 +139,7 @@ pub struct SubRes { pub async fn gather_get_json_generic( _method: http::Method, urls: Vec, - bodies: Vec>, + bodies: Vec>, tags: Vec, nt: NT, ft: FT, @@ -192,7 +149,7 @@ pub async fn gather_get_json_generic( ) -> Result where SM: Send + 'static, - NT: Fn(String, Response) -> Pin, Error>> + Send>> + NT: Fn(String, Response) -> Pin, Error>> + Send>> + Send + Sync + Copy @@ -211,7 +168,7 @@ where .into_iter() .zip(bodies.into_iter()) .zip(tags.into_iter()) - .map(move |((url, body), tag)| { + .filter_map(move |((url, body), tag)| { info!("Try gather from {}", url); let url_str = url.as_str(); let req = if body.is_some() { @@ -226,29 +183,43 @@ where req }; let body = match body { - None => Body::empty(), - Some(body) => body, + None => body_empty(), + Some(body) => body_string(body), }; - let req = req.body(body); - let tag2 = tag.clone(); - let jh = tokio::spawn(async move { - select! { - _ = sleep(timeout + extra_timeout).fuse() => { - error!("PROXY TIMEOUT"); - Err(Error::with_msg_no_trace(format!("timeout"))) - } - res = { - let client = Client::new(); - client.request(req?).fuse() - } => { - info!("received result in time"); - let ret = nt(tag2, res?).await?; - info!("transformed result in time"); - Ok(ret) - } + match req.body(body) { + Ok(req) => { + let tag2 = tag.clone(); + let jh = tokio::spawn(async move { + select! { + _ = sleep(timeout + extra_timeout).fuse() => { + error!("PROXY TIMEOUT"); + Err(Error::with_msg_no_trace(format!("timeout"))) + } + res = async move { + let mut client = match connect_client(req.uri()).await { + Ok(x) => x, + Err(e) => return Err(Error::from_to_string(e)), + }; + let res = match client.send_request(req).await { + Ok(x) => x, + Err(e) => return Err(Error::from_to_string(e)), + }; + Ok(res) + }.fuse() => { + info!("received result in time"); + let ret = nt(tag2, res?).await?; + info!("transformed result in time"); + Ok(ret) + } + } + }); + Some((url, tag, jh)) } - }); - (url, tag, jh) + Err(e) => { + error!("bad request: {e}"); + None + } + } }) .collect(); let mut a = Vec::new(); diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index 79fdf6f..45fb225 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -1,11 +1,13 @@ pub mod api1; pub mod api4; pub mod bodystream; +pub mod cache; pub mod channel_status; pub mod channelconfig; pub mod download; pub mod err; pub mod gather; +#[cfg(DISABLED)] pub mod prometheus; pub mod proxy; pub mod pulsemap; @@ -17,18 +19,19 @@ use crate::err::Error; use crate::gather::gather_get_json; use ::err::thiserror; use ::err::ThisError; +use bytes::Bytes; use futures_util::Future; use futures_util::FutureExt; use futures_util::StreamExt; use http::Method; use http::StatusCode; -use hyper::server::conn::AddrStream; -use hyper::server::Server; -use hyper::service::make_service_fn; +use http_body_util::combinators::BoxBody; +use http_body_util::Full; +use hyper::body::Incoming; use hyper::service::service_fn; -use hyper::Body; use hyper::Request; use hyper::Response; +use hyper_util::rt::TokioIo; use net::SocketAddr; use netpod::log::*; use netpod::query::prebinned::PreBinnedQuery; @@ -56,6 +59,7 @@ use std::time::SystemTime; use task::Context; use task::Poll; use taskrun::tokio; +use taskrun::tokio::net::TcpListener; pub const PSI_DAQBUFFER_SERVICE_MARK: &'static str = "PSI-Daqbuffer-Service-Mark"; pub const PSI_DAQBUFFER_SEEN_URL: &'static str = "PSI-Daqbuffer-Seen-Url"; @@ -82,6 +86,7 @@ impl IntoBoxedError for net::AddrParseError {} impl IntoBoxedError for tokio::task::JoinError {} impl IntoBoxedError for api4::databuffer_tools::FindActiveError {} impl IntoBoxedError for std::string::FromUtf8Error {} +impl IntoBoxedError for std::io::Error {} impl From for RetrievalError where @@ -118,6 +123,23 @@ pub fn accepts_octets(hm: &http::HeaderMap) -> bool { } } +pub type Requ = Request; +pub type RespFull = Response>; + +use http_body_util::BodyExt; +use httpclient::BodyBox; +use httpclient::RespBox; + +pub fn body_empty() -> BodyBox { + Full::new(Bytes::new()).map_err(Into::into).boxed() +} + +pub fn body_string(body: S) -> BodyBox { + Full::new(Bytes::from(body.to_string())).map_err(Into::into).boxed() +} + +pub type StreamBody = Pin, ::err::Error>>>>; + pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion) -> Result<(), RetrievalError> { static STATUS_BOARD_INIT: Once = Once::new(); STATUS_BOARD_INIT.call_once(|| { @@ -126,48 +148,64 @@ pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion let x = Box::new(a); STATUS_BOARD.store(Box::into_raw(x), Ordering::SeqCst); }); + #[cfg(DISABLED)] if let Some(bind) = node_config.node.prometheus_api_bind { tokio::spawn(prometheus::host(bind)); } // let rawjh = taskrun::spawn(nodenet::conn::events_service(node_config.clone())); use std::str::FromStr; - let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen(), node_config.node.port))?; - let make_service = make_service_fn({ - move |conn: &AddrStream| { - debug!("new connection from {:?}", conn.remote_addr()); - let node_config = node_config.clone(); - let addr = conn.remote_addr(); - let service_version = service_version.clone(); - async move { - let ret = service_fn(move |req| { - // TODO send to logstash - info!( - "http-request {:?} - {:?} - {:?} - {:?}", - addr, - req.method(), - req.uri(), - req.headers() - ); - let f = http_service(req, node_config.clone(), service_version.clone()); - Cont { f: Box::pin(f) } - }); - Ok::<_, Error>(ret) + let bind_addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen(), node_config.node.port))?; + + let listener = TcpListener::bind(bind_addr).await?; + loop { + let (stream, addr) = listener.accept().await?; + debug!("new connection from {addr}"); + let node_config = node_config.clone(); + let service_version = service_version.clone(); + let io = TokioIo::new(stream); + tokio::task::spawn(async move { + let res = hyper::server::conn::http1::Builder::new() + .serve_connection( + io, + service_fn(move |req| the_service_fn(req, addr, node_config.clone(), service_version.clone())), + ) + .await; + match res { + Ok(()) => {} + Err(e) => { + error!("{e}"); + } } - } - }); - Server::bind(&addr) - .serve(make_service) - .await - .map(|e| RetrievalError::TextError(format!("{e:?}")))?; + }); + } + // rawjh.await??; Ok(()) } -async fn http_service( - req: Request, +async fn the_service_fn( + req: Requ, + addr: SocketAddr, node_config: NodeConfigCached, service_version: ServiceVersion, -) -> Result, Error> { +) -> Result, Error> { + info!( + "http-request {:?} - {:?} - {:?} - {:?}", + addr, + req.method(), + req.uri(), + req.headers() + ); + let f = http_service(req, node_config, service_version).await; + // Cont { f: Box::pin(f) } + f +} + +async fn http_service( + req: Requ, + node_config: NodeConfigCached, + service_version: ServiceVersion, +) -> Result, Error> { match http_service_try(req, &node_config, &service_version).await { Ok(k) => Ok(k), Err(e) => { @@ -244,7 +282,7 @@ impl ReqCtx { } // TODO remove because I want error bodies to be json. -pub fn response_err(status: StatusCode, msg: T) -> Result, RetrievalError> +pub fn response_err(status: StatusCode, msg: T) -> Result where T: AsRef, { @@ -257,7 +295,7 @@ where ), msg.as_ref() ); - let ret = response(status).body(Body::from(msg))?; + let ret = response(status).body(Full::new(msg))?; Ok(ret) } @@ -267,7 +305,7 @@ macro_rules! static_http { let c = include_bytes!(concat!("../static/documentation/", $tgtex)); let ret = response(StatusCode::OK) .header("content-type", $ctype) - .body(Body::from(&c[..]))?; + .body(Full::new(&c[..]))?; return Ok(ret); } }; @@ -276,7 +314,7 @@ macro_rules! static_http { let c = include_bytes!(concat!("../static/documentation/", $tgt)); let ret = response(StatusCode::OK) .header("content-type", $ctype) - .body(Body::from(&c[..]))?; + .body(Full::new(&c[..]))?; return Ok(ret); } }; @@ -288,7 +326,7 @@ macro_rules! static_http_api1 { let c = include_bytes!(concat!("../static/documentation/", $tgtex)); let ret = response(StatusCode::OK) .header("content-type", $ctype) - .body(Body::from(&c[..]))?; + .body(Full::new(&c[..]))?; return Ok(ret); } }; @@ -297,17 +335,17 @@ macro_rules! static_http_api1 { let c = include_bytes!(concat!("../static/documentation/", $tgt)); let ret = response(StatusCode::OK) .header("content-type", $ctype) - .body(Body::from(&c[..]))?; + .body(Full::new(&c[..]))?; return Ok(ret); } }; } async fn http_service_try( - req: Request, + req: Requ, node_config: &NodeConfigCached, service_version: &ServiceVersion, -) -> Result, Error> { +) -> Result { use http::HeaderValue; let mut urlmarks = Vec::new(); urlmarks.push(format!("{}:{}", req.method(), req.uri())); @@ -317,7 +355,7 @@ async fn http_service_try( urlmarks.push(s.into()); } } - let ctx = ReqCtx::with_node(&req, node_config); + let ctx = ReqCtx::with_node(&req, &node_config); let mut res = http_service_inner(req, &ctx, node_config, service_version).await?; let hm = res.headers_mut(); hm.append("Access-Control-Allow-Origin", "*".parse().unwrap()); @@ -334,11 +372,11 @@ async fn http_service_try( } async fn http_service_inner( - req: Request, + req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached, service_version: &ServiceVersion, -) -> Result, RetrievalError> { +) -> Result { let uri = req.uri().clone(); let path = uri.path(); if path == "/api/4/private/version" { @@ -350,9 +388,9 @@ async fn http_service_inner( "patch": service_version.patch, }, }); - Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?) + Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ret)?))?) } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) } } else if path.starts_with("/api/4/private/logtest/") { if req.method() == Method::GET { @@ -366,12 +404,24 @@ async fn http_service_inner( warn!("test warn log output"); } else if path.ends_with("/error") { error!("test error log output"); + } else if path.ends_with("/mixed") { + warn!("test warn log output"); + let sp_info = span!(Level::INFO, "sp_info", f1 = "v1"); + sp_info.in_scope(|| { + warn!("test warn log output in sp_info"); + info!("test info log output in sp_info"); + let sp_debug = span!(Level::DEBUG, "sp_debug", f1 = "v1"); + sp_debug.in_scope(|| { + info!("test info log output in sp_info:sp_debug"); + debug!("test debug log output in sp_info:sp_debug"); + }); + }); } else { error!("test unknown log output"); } - Ok(response(StatusCode::OK).body(Body::empty())?) + Ok(response(StatusCode::OK).body(body_empty())?) } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) } } else if let Some(h) = api4::eventdata::EventDataHandler::handler(&req) { Ok(h.handle(req, ctx, &node_config, service_version) @@ -413,43 +463,43 @@ async fn http_service_inner( if req.method() == Method::GET { Ok(prebinned(req, ctx, &node_config).await?) } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) } } else if path == "/api/4/random/channel" { if req.method() == Method::GET { Ok(random_channel(req, ctx, &node_config).await?) } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) } } else if path.starts_with("/api/4/gather/") { if req.method() == Method::GET { Ok(gather_get_json(req, &node_config).await?) } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) } } else if path == "/api/4/clear_cache" { if req.method() == Method::GET { Ok(clear_cache_all(req, ctx, &node_config).await?) } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) } } else if path == "/api/4/update_db_with_channel_names" { if req.method() == Method::GET { Ok(update_db_with_channel_names(req, ctx, &node_config).await?) } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) } } else if path == "/api/4/update_db_with_all_channel_configs" { if req.method() == Method::GET { Ok(update_db_with_all_channel_configs(req, ctx, &node_config).await?) } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) } } else if path == "/api/4/update_search_cache" { if req.method() == Method::GET { Ok(update_search_cache(req, ctx, &node_config).await?) } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?) } } else if let Some(h) = download::DownloadHandler::handler(&req) { Ok(h.handle(req, &node_config).await?) @@ -479,13 +529,13 @@ async fn http_service_inner( if req.method() == Method::GET { api_1_docs(path) } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Full::new(Bytes::new()))?) } } else if path.starts_with("/api/4/documentation/") { if req.method() == Method::GET { api_4_docs(path) } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Full::new(Bytes::new()))?) } } else { use std::fmt::Write; @@ -502,29 +552,29 @@ async fn http_service_inner( write!(out, "HEADER {hn:?}: {hv:?}
\n")?; } write!(out, "\n")?; - Ok(response(StatusCode::NOT_FOUND).body(Body::from(body))?) + Ok(response(StatusCode::NOT_FOUND).body(body_string(body))?) } } -pub fn api_4_docs(path: &str) -> Result, RetrievalError> { +pub fn api_4_docs(path: &str) -> Result { static_http!(path, "", "api4.html", "text/html"); static_http!(path, "style.css", "text/css"); static_http!(path, "script.js", "text/javascript"); static_http!(path, "status-main.html", "text/html"); - Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?) + Ok(response(StatusCode::NOT_FOUND).body(body_empty())?) } -pub fn api_1_docs(path: &str) -> Result, RetrievalError> { +pub fn api_1_docs(path: &str) -> Result { static_http_api1!(path, "", "api1.html", "text/html"); static_http_api1!(path, "style.css", "text/css"); static_http_api1!(path, "script.js", "text/javascript"); - Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?) + Ok(response(StatusCode::NOT_FOUND).body(body_empty())?) } pub struct StatusBoardAllHandler {} impl StatusBoardAllHandler { - pub fn handler(req: &Request) -> Option { + pub fn handler(req: &Requ) -> Option { if req.uri().path() == "/api/4/status/board/all" { Some(Self {}) } else { @@ -532,38 +582,26 @@ impl StatusBoardAllHandler { } } - pub async fn handle( - &self, - _req: Request, - _node_config: &NodeConfigCached, - ) -> Result, RetrievalError> { + pub async fn handle(&self, _req: Requ, _node_config: &NodeConfigCached) -> Result { use std::ops::Deref; let sb = status_board().unwrap(); - let buf = serde_json::to_vec(sb.deref()).unwrap(); - let res = response(StatusCode::OK).body(Body::from(buf))?; + let buf = serde_json::to_string(sb.deref()).unwrap(); + let res = response(StatusCode::OK).body(body_string(buf))?; Ok(res) } } -async fn prebinned( - req: Request, - ctx: &ReqCtx, - node_config: &NodeConfigCached, -) -> Result, RetrievalError> { +async fn prebinned(req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result { match prebinned_inner(req, ctx, node_config).await { Ok(ret) => Ok(ret), Err(e) => { error!("fn prebinned: {e:?}"); - Ok(response(StatusCode::BAD_REQUEST).body(Body::from(format!("[prebinned-error]")))?) + Ok(response(StatusCode::BAD_REQUEST).body(body_string(format!("[prebinned-error]")))?) } } } -async fn prebinned_inner( - req: Request, - _ctx: &ReqCtx, - _node_config: &NodeConfigCached, -) -> Result, RetrievalError> { +async fn prebinned_inner(req: Requ, _ctx: &ReqCtx, _node_config: &NodeConfigCached) -> Result { let (head, _body) = req.into_parts(); let url: url::Url = format!("dummy://{}", head.uri).parse()?; let query = PreBinnedQuery::from_url(&url)?; @@ -576,22 +614,14 @@ async fn prebinned_inner( todo!() } -async fn random_channel( - req: Request, - _ctx: &ReqCtx, - node_config: &NodeConfigCached, -) -> Result, RetrievalError> { +async fn random_channel(req: Requ, _ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result { let (_head, _body) = req.into_parts(); let ret = dbconn::random_channel(node_config).await?; - let ret = response(StatusCode::OK).body(Body::from(ret))?; + let ret = response(StatusCode::OK).body(body_string(ret))?; Ok(ret) } -async fn clear_cache_all( - req: Request, - _ctx: &ReqCtx, - node_config: &NodeConfigCached, -) -> Result, RetrievalError> { +async fn clear_cache_all(req: Requ, _ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result { let (head, _body) = req.into_parts(); let dry = match head.uri.query() { Some(q) => q.contains("dry"), @@ -600,15 +630,15 @@ async fn clear_cache_all( let res = disk::cache::clear_cache_all(node_config, dry).await?; let ret = response(StatusCode::OK) .header(http::header::CONTENT_TYPE, APP_JSON) - .body(Body::from(serde_json::to_string(&res)?))?; + .body(body_string(serde_json::to_string(&res)?))?; Ok(ret) } async fn update_db_with_channel_names( - req: Request, + req: Requ, _ctx: &ReqCtx, node_config: &NodeConfigCached, -) -> Result, RetrievalError> { +) -> Result { info!("httpret::update_db_with_channel_names"); let (head, _body) = req.into_parts(); let _dry = match head.uri.query() { @@ -635,7 +665,7 @@ async fn update_db_with_channel_names( let p = serde_json::to_string(&e)?; let res = response(StatusCode::OK) .header(http::header::CONTENT_TYPE, APP_JSON_LINES) - .body(Body::from(p))?; + .body(body_string(p))?; Ok(res) } } @@ -643,10 +673,10 @@ async fn update_db_with_channel_names( #[allow(unused)] async fn update_db_with_channel_names_3( - req: Request, + req: Requ, _ctx: &ReqCtx, node_config: &NodeConfigCached, -) -> Result, RetrievalError> { +) -> Result { let (head, _body) = req.into_parts(); let _dry = match head.uri.query() { Some(q) => q.contains("dry"), @@ -666,10 +696,10 @@ async fn update_db_with_channel_names_3( } async fn update_db_with_all_channel_configs( - req: Request, + req: Requ, _ctx: &ReqCtx, node_config: &NodeConfigCached, -) -> Result, RetrievalError> { +) -> Result { let (head, _body) = req.into_parts(); let _dry = match head.uri.query() { Some(q) => q.contains("dry"), @@ -689,10 +719,10 @@ async fn update_db_with_all_channel_configs( } async fn update_search_cache( - req: Request, + req: Requ, _ctx: &ReqCtx, node_config: &NodeConfigCached, -) -> Result, RetrievalError> { +) -> Result { let (head, _body) = req.into_parts(); let _dry = match head.uri.query() { Some(q) => q.contains("dry"), @@ -701,7 +731,7 @@ async fn update_search_cache( let res = dbconn::scan::update_search_cache(node_config).await?; let ret = response(StatusCode::OK) .header(http::header::CONTENT_TYPE, APP_JSON) - .body(Body::from(serde_json::to_string(&res)?))?; + .body(body_string(serde_json::to_string(&res)?))?; Ok(ret) } diff --git a/crates/httpret/src/prometheus.rs b/crates/httpret/src/prometheus.rs index 2e46700..80fd74d 100644 --- a/crates/httpret/src/prometheus.rs +++ b/crates/httpret/src/prometheus.rs @@ -5,6 +5,7 @@ use http::Method; use http::Request; use http::Response; use http::StatusCode; +use httpclient::read_body_bytes; use hyper::server::conn::AddrStream; use hyper::service::make_service_fn; use hyper::service::service_fn; @@ -276,7 +277,7 @@ impl QueryHandler { info!("{} for {:?}", std::any::type_name::(), req); let url = url::Url::parse(&format!("dummy://{}", &req.uri())); info!("/api/v1/query parsed url: {:?}", url); - let body = hyper::body::to_bytes(req.into_body()).await?; + let body = read_body_bytes(req.into_body()).await?; let body_str = String::from_utf8_lossy(&body); info!("/api/v1/query body_str: {:?}", body_str); let formurl = url::Url::parse(&format!("dummy:///?{}", body_str)); @@ -308,7 +309,7 @@ impl QueryRangeHandler { info!("{} for {:?}", std::any::type_name::(), req); let url = url::Url::parse(&format!("dummy://{}", &req.uri())); info!("/api/v1/query_range parsed url: {:?}", url); - let body = hyper::body::to_bytes(req.into_body()).await?; + let body = read_body_bytes(req.into_body()).await?; let body_str = String::from_utf8_lossy(&body); info!("/api/v1/query_range body_str: {:?}", body_str); let formurl = url::Url::parse(&format!("dummy:///?{}", body_str)); diff --git a/crates/httpret/src/proxy/api4.rs b/crates/httpret/src/proxy/api4.rs index a48a214..d21dfcc 100644 --- a/crates/httpret/src/proxy/api4.rs +++ b/crates/httpret/src/proxy/api4.rs @@ -12,7 +12,6 @@ use http::Method; use http::Request; use http::Response; use http::StatusCode; -use hyper::Body; use netpod::log::*; use netpod::ChannelSearchQuery; use netpod::ChannelSearchResult; diff --git a/crates/httpret/src/pulsemap.rs b/crates/httpret/src/pulsemap.rs index c06a66a..e53beaf 100644 --- a/crates/httpret/src/pulsemap.rs +++ b/crates/httpret/src/pulsemap.rs @@ -1,7 +1,10 @@ +use crate::body_empty; +use crate::body_string; +use crate::cache::Cache; use crate::err::Error; use crate::response; -use async_channel::Receiver; -use async_channel::Sender; +use crate::Requ; +use crate::RespFull; use bytes::Buf; use bytes::BufMut; use bytes::BytesMut; @@ -10,12 +13,13 @@ use chrono::Utc; use futures_util::stream::FuturesOrdered; use futures_util::stream::FuturesUnordered; use futures_util::FutureExt; +use http::header; use http::Method; use http::StatusCode; use http::Uri; -use hyper::Body; +use httpclient::connect_client; +use httpclient::read_body_bytes; use hyper::Request; -use hyper::Response; use netpod::log::*; use netpod::timeunits::SEC; use netpod::AppendToUrl; @@ -36,7 +40,6 @@ use std::pin::Pin; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; -use std::sync::Mutex; use std::task::Context; use std::task::Poll; use std::time::Duration; @@ -50,114 +53,6 @@ use tokio::task::JoinHandle; use tokio::time::error::Elapsed; use url::Url; -struct Dummy; - -enum CachePortal { - Fresh, - Existing(Receiver), - Known(V), -} - -impl CachePortal {} - -enum CacheEntry { - Waiting(SystemTime, Sender, Receiver), - Known(SystemTime, V), -} - -impl CacheEntry { - fn ts(&self) -> &SystemTime { - match self { - CacheEntry::Waiting(ts, _, _) => ts, - CacheEntry::Known(ts, _) => ts, - } - } -} - -struct CacheInner { - map: BTreeMap>, -} - -impl CacheInner -where - K: Ord, -{ - const fn new() -> Self { - Self { map: BTreeMap::new() } - } - - fn housekeeping(&mut self) { - if self.map.len() > 200 { - info!("trigger housekeeping with len {}", self.map.len()); - let mut v: Vec<_> = self.map.iter().map(|(k, v)| (v.ts(), k)).collect(); - v.sort(); - let ts0 = v[v.len() / 2].0.clone(); - //let tsnow = SystemTime::now(); - //let tscut = tsnow.checked_sub(Duration::from_secs(60 * 10)).unwrap_or(tsnow); - self.map.retain(|_k, v| v.ts() >= &ts0); - info!("housekeeping kept len {}", self.map.len()); - } - } -} - -struct Cache { - inner: Mutex>, -} - -impl Cache -where - K: Ord, - V: Clone, -{ - const fn new() -> Self { - Self { - inner: Mutex::new(CacheInner::new()), - } - } - - fn housekeeping(&self) { - let mut g = self.inner.lock().unwrap(); - g.housekeeping(); - } - - fn portal(&self, key: K) -> CachePortal { - use std::collections::btree_map::Entry; - let mut g = self.inner.lock().unwrap(); - g.housekeeping(); - match g.map.entry(key) { - Entry::Vacant(e) => { - let (tx, rx) = async_channel::bounded(16); - let ret = CachePortal::Fresh; - let v = CacheEntry::Waiting(SystemTime::now(), tx, rx); - e.insert(v); - ret - } - Entry::Occupied(e) => match e.get() { - CacheEntry::Waiting(_ts, _tx, rx) => CachePortal::Existing(rx.clone()), - CacheEntry::Known(_ts, v) => CachePortal::Known(v.clone()), - }, - } - } - - fn set_value(&self, key: K, val: V) { - let mut g = self.inner.lock().unwrap(); - if let Some(e) = g.map.get_mut(&key) { - match e { - CacheEntry::Waiting(ts, tx, _rx) => { - let tx = tx.clone(); - *e = CacheEntry::Known(*ts, val); - tx.close(); - } - CacheEntry::Known(_ts, _val) => { - error!("set_value already known"); - } - } - } else { - error!("set_value no entry for key"); - } - } -} - static CACHE: Cache = Cache::new(); pub struct MapPulseHisto { @@ -176,26 +71,25 @@ const API_4_MAP_PULSE_URL_PREFIX: &'static str = "/api/4/map/pulse/"; const MAP_PULSE_LOCAL_TIMEOUT: Duration = Duration::from_millis(8000); const MAP_PULSE_QUERY_TIMEOUT: Duration = Duration::from_millis(10000); -async fn make_tables(node_config: &NodeConfigCached) -> Result<(), Error> { - let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?; +async fn make_tables(pgc: &dbconn::pg::Client) -> Result<(), Error> { let sql = "set client_min_messages = 'warning'"; - conn.execute(sql, &[]).await?; + pgc.execute(sql, &[]).await?; let sql = "create table if not exists map_pulse_channels (name text, tbmax int)"; - conn.execute(sql, &[]).await?; + pgc.execute(sql, &[]).await?; let sql = "create table if not exists map_pulse_files (channel text not null, split int not null, timebin int not null, closed int not null default 0, pulse_min int8 not null, pulse_max int8 not null)"; - conn.execute(sql, &[]).await?; + pgc.execute(sql, &[]).await?; let sql = "create unique index if not exists map_pulse_files_ix1 on map_pulse_files (channel, split, timebin)"; - conn.execute(sql, &[]).await?; + pgc.execute(sql, &[]).await?; let sql = "alter table map_pulse_files add if not exists upc1 int not null default 0"; - conn.execute(sql, &[]).await?; + pgc.execute(sql, &[]).await?; let sql = "alter table map_pulse_files add if not exists hostname text not null default ''"; - conn.execute(sql, &[]).await?; + pgc.execute(sql, &[]).await?; let sql = "alter table map_pulse_files add if not exists ks int not null default 2"; - conn.execute(sql, &[]).await?; + pgc.execute(sql, &[]).await?; let sql = "create index if not exists map_pulse_files_ix2 on map_pulse_files (hostname)"; - conn.execute(sql, &[]).await?; + pgc.execute(sql, &[]).await?; let sql = "set client_min_messages = 'notice'"; - conn.execute(sql, &[]).await?; + pgc.execute(sql, &[]).await?; Ok(()) } @@ -223,12 +117,13 @@ fn timer_channel_names() -> Vec { #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] enum MapfilePath { Scalar(PathBuf), - Index(PathBuf, PathBuf), + Index(PathBuf), } -async fn datafiles_for_channel(name: String, node_config: &NodeConfigCached) -> Result, Error> { +async fn datafiles_for_channel(name: &str, node_config: &NodeConfigCached) -> Result, Error> { let mut a = Vec::new(); let sfc = node_config.node.sf_databuffer.as_ref().unwrap(); + let data_base_path = &sfc.data_base_path; let channel_path = sfc .data_base_path .join(format!("{}_2", sfc.ksprefix)) @@ -252,34 +147,42 @@ async fn datafiles_for_channel(name: String, node_config: &NodeConfigCached) -> } Err(e) => match e.kind() { std::io::ErrorKind::NotFound => { - let channel_path = sfc - .data_base_path - .join(format!("{}_3", sfc.ksprefix)) - .join("byTime") - .join(&name); - match tokio::fs::read_dir(&channel_path).await { - Ok(mut rd) => { - while let Ok(Some(entry)) = rd.next_entry().await { - let mut rd2 = tokio::fs::read_dir(entry.path()).await?; - while let Ok(Some(e2)) = rd2.next_entry().await { - let mut rd3 = tokio::fs::read_dir(e2.path()).await?; - while let Ok(Some(e3)) = rd3.next_entry().await { - if e3.file_name().to_string_lossy().ends_with("_00000_Data_Index") { - let fns = e3.file_name().to_string_lossy().to_string(); - let path_data = e3.path().parent().unwrap().join(&fns[..fns.len() - 6]); - let x = MapfilePath::Index(e3.path(), path_data); - a.push(x); - } - } - } + files_recursive(name, &data_base_path, &sfc.ksprefix, 3, "_00000_Data_Index").await + } + _ => return Err(e)?, + }, + } +} + +async fn files_recursive( + name: &str, + data_base_path: &Path, + ksprefix: &str, + ks: u32, + data_file_suffix: &str, +) -> Result, Error> { + let mut a = Vec::new(); + let channel_path = data_base_path + .join(format!("{}_{}", ksprefix, ks)) + .join("byTime") + .join(&name); + match tokio::fs::read_dir(&channel_path).await { + Ok(mut rd) => { + while let Ok(Some(entry)) = rd.next_entry().await { + let mut rd2 = tokio::fs::read_dir(entry.path()).await?; + while let Ok(Some(e2)) = rd2.next_entry().await { + let mut rd3 = tokio::fs::read_dir(e2.path()).await?; + while let Ok(Some(e3)) = rd3.next_entry().await { + if e3.file_name().to_string_lossy().ends_with(data_file_suffix) { + let x = MapfilePath::Index(e3.path()); + a.push(x); } - Ok(a) } - Err(e) => match e.kind() { - _ => return Err(e)?, - }, } } + Ok(a) + } + Err(e) => match e.kind() { _ => return Err(e)?, }, } @@ -490,7 +393,7 @@ async fn read_chunk_at(mut file: File, pos: u64, chunk_len: Option) -> Resu pub struct IndexFullHttpFunction {} impl IndexFullHttpFunction { - pub fn handler(req: &Request) -> Option { + pub fn handler(req: &Requ) -> Option { if req.uri().path().eq("/api/1/map/index/full") { Some(Self {}) } else { @@ -498,42 +401,90 @@ impl IndexFullHttpFunction { } } - pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result { if req.method() != Method::GET { - return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } let ret = match Self::index(false, node_config).await { - Ok(msg) => response(StatusCode::OK).body(Body::from(msg))?, - Err(e) => response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?, + Ok(msg) => response(StatusCode::OK).body(body_string(msg))?, + Err(e) => { + error!("IndexFullHttpFunction {e}"); + response(StatusCode::INTERNAL_SERVER_ERROR).body(body_string(format!("{:?}", e)))? + } }; Ok(ret) } - pub async fn index_channel( - channel_name: String, - conn: &dbconn::pg::Client, + async fn index(do_print: bool, node_config: &NodeConfigCached) -> Result { + // TODO avoid double-insert on central storage. + let mut msg = format!("LOG"); + let pgc = dbconn::create_connection(&node_config.node_config.cluster.database).await?; + // TODO remove update of static columns when older clients are removed. + let sql = "insert into map_pulse_files (channel, split, timebin, pulse_min, pulse_max, hostname, ks) values ($1, $2, $3, $4, $5, $6, $7) on conflict (channel, split, timebin) do update set pulse_min = $4, pulse_max = $5, upc1 = map_pulse_files.upc1 + 1, hostname = $6"; + let insert_01 = pgc.prepare(sql).await?; + make_tables(&pgc).await?; + let chs = timer_channel_names(); + for channel_name in chs { + match Self::index_channel(&channel_name, &pgc, do_print, &insert_01, node_config).await { + Ok(m) => { + msg.push_str("\n"); + msg.push_str(&m); + } + Err(e) => { + error!("error while indexing {} {:?}", channel_name, e); + //return Err(e); + } + } + } + Ok(msg) + } + + async fn index_channel( + channel_name: &str, + pgc: &dbconn::pg::Client, do_print: bool, + insert_01: &dbconn::pg::Statement, node_config: &NodeConfigCached, ) -> Result { let mut msg = format!("Index channel {}", channel_name); - let files = datafiles_for_channel(channel_name.clone(), node_config).await?; + let files = datafiles_for_channel(channel_name, node_config).await?; let mut files = files; files.sort(); let files = files; msg = format!("{}\n{:?}", msg, files); let mut latest_pair = (0, 0); - let n1 = files.len().min(3); - let m1 = files.len() - n1; - for ch in &files[m1..] { - trace!(" index over {:?}", ch); - } - for mp in files[m1..].into_iter() { + let files_from = files.len() - files.len().min(2); + for mp in files[files_from..].into_iter() { match mp { MapfilePath::Scalar(path) => { + trace!("Scalar {path:?}"); let splitted: Vec<_> = path.to_str().unwrap().split("/").collect(); let timebin: u64 = splitted[splitted.len() - 3].parse()?; let split: u64 = splitted[splitted.len() - 2].parse()?; - let file = tokio::fs::OpenOptions::new().read(true).open(&path).await?; + if false { + // Not needed, we any use only the last N files. + // TODO the timebin unit depends on the keyspace. + // In worst case could depend on the current channel config, and could have changed + // at each config change. That would be madness. Luckily, it seems always 1d for ks 2 and 3. + let timebin_dt = Duration::from_secs(60 * 60 * 24 * timebin); + let timebin_ts = SystemTime::UNIX_EPOCH.checked_add(timebin_dt).unwrap(); + let tsnow = SystemTime::now(); + if timebin_ts + Duration::from_secs(60 * 60 * 24 * 2) < tsnow { + debug!("FILTER PAST {timebin} {path:?}"); + } else if timebin_ts > tsnow + Duration::from_secs(60 * 60 * 24 * 2) { + debug!("FILTER FUTU {timebin} {path:?}"); + } else { + debug!("KEEP TIMEBI {timebin} {path:?}"); + } + } + let file = match tokio::fs::OpenOptions::new().read(true).open(&path).await { + Ok(x) => x, + Err(e) => { + let e = Error::with_msg_no_trace(format!("MapfilePath::Scalar {e} {path:?}")); + error!("{e}"); + return Err(e); + } + }; let (r2, file) = read_first_chunk(file).await?; msg = format!("{}\n{:?}", msg, r2); if let Some(r2) = r2 { @@ -543,10 +494,8 @@ impl IndexFullHttpFunction { if r3.pulse > latest_pair.0 { latest_pair = (r3.pulse, r3.ts); } - // TODO remove update of static columns when older clients are removed. - let sql = "insert into map_pulse_files (channel, split, timebin, pulse_min, pulse_max, hostname) values ($1, $2, $3, $4, $5, $6) on conflict (channel, split, timebin) do update set pulse_min = $4, pulse_max = $5, upc1 = map_pulse_files.upc1 + 1, hostname = $6"; - conn.execute( - sql, + pgc.execute( + insert_01, &[ &channel_name, &(split as i32), @@ -554,21 +503,42 @@ impl IndexFullHttpFunction { &(r2.pulse as i64), &(r3.pulse as i64), &node_config.node.host, + &(2 as i32), ], ) .await?; + } else { + warn!("could not find last event chunk in {path:?}"); } } else { warn!("could not find first event chunk in {path:?}"); } } - MapfilePath::Index(path_index, path_data) => { + MapfilePath::Index(path_index) => { trace!("Index {path_index:?}"); + let path_data = { + let fns = path_index.file_name().unwrap().to_str().unwrap(); + path_index.parent().unwrap().join(&fns[..fns.len() - 6]) + }; let splitted: Vec<_> = path_index.to_str().unwrap().split("/").collect(); let timebin: u64 = splitted[splitted.len() - 3].parse()?; let split: u64 = splitted[splitted.len() - 2].parse()?; - let file_index = tokio::fs::OpenOptions::new().read(true).open(&path_index).await?; - let file_data = tokio::fs::OpenOptions::new().read(true).open(&path_data).await?; + let file_index = match tokio::fs::OpenOptions::new().read(true).open(&path_index).await { + Ok(x) => x, + Err(e) => { + let e = Error::with_msg_no_trace(format!("MapfilePath::Index {e} {path_index:?}")); + error!("{e}"); + return Err(e); + } + }; + let file_data = match tokio::fs::OpenOptions::new().read(true).open(&path_data).await { + Ok(x) => x, + Err(e) => { + let e = Error::with_msg_no_trace(format!("MapfilePath::Index {e} {path_data:?}")); + error!("{e}"); + return Err(e); + } + }; let (r2, file_index, file_data) = read_first_index_chunk(file_index, file_data).await?; msg = format!("{}\n{:?}", msg, r2); if let Some(r2) = r2 { @@ -578,10 +548,8 @@ impl IndexFullHttpFunction { if r3.pulse > latest_pair.0 { latest_pair = (r3.pulse, r3.ts); } - // TODO remove update of static columns when older clients are removed. - let sql = "insert into map_pulse_files (channel, split, timebin, pulse_min, pulse_max, hostname, ks) values ($1, $2, $3, $4, $5, $6, 3) on conflict (channel, split, timebin) do update set pulse_min = $4, pulse_max = $5, upc1 = map_pulse_files.upc1 + 1, hostname = $6"; - conn.execute( - sql, + pgc.execute( + insert_01, &[ &channel_name, &(split as i32), @@ -589,12 +557,15 @@ impl IndexFullHttpFunction { &(r2.pulse as i64), &(r3.pulse as i64), &node_config.node.host, + &(3 as i32), ], ) .await?; + } else { + warn!("could not find last index chunk in {path_index:?}"); } } else { - warn!("could not find first event chunk in {path_index:?}"); + warn!("could not find first index chunk in {path_index:?}"); } } } @@ -609,27 +580,6 @@ impl IndexFullHttpFunction { } Ok(msg) } - - pub async fn index(do_print: bool, node_config: &NodeConfigCached) -> Result { - // TODO avoid double-insert on central storage. - let mut msg = format!("LOG"); - make_tables(node_config).await?; - let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?; - let chs = timer_channel_names(); - for channel_name in &chs[..] { - match Self::index_channel(channel_name.clone(), &conn, do_print, node_config).await { - Ok(m) => { - msg.push_str("\n"); - msg.push_str(&m); - } - Err(e) => { - error!("error while indexing {} {:?}", channel_name, e); - //return Err(e); - } - } - } - Ok(msg) - } } pub struct UpdateTaskGuard { @@ -728,7 +678,7 @@ impl UpdateTask { /// Returns a guard which must be kept alive as long as the service should run. /// Should instead of this use a system-timer and call the rest api. #[allow(unused)] - fn new(node_config: NodeConfigCached) -> UpdateTaskGuard { + fn _new(node_config: NodeConfigCached) -> UpdateTaskGuard { let do_abort = Arc::new(AtomicUsize::default()); let task = Self { do_abort: do_abort.clone(), @@ -950,7 +900,7 @@ impl MapPulseScyllaHandler { "/api/4/scylla/map/pulse/" } - pub fn handler(req: &Request) -> Option { + pub fn handler(req: &Requ) -> Option { if req.uri().path().starts_with(Self::prefix()) { Some(Self {}) } else { @@ -958,9 +908,9 @@ impl MapPulseScyllaHandler { } } - pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result { if req.method() != Method::GET { - return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } let urls = format!("dummy://{}", req.uri()); let url = url::Url::parse(&urls)?; @@ -995,14 +945,14 @@ impl MapPulseScyllaHandler { channels.push(ch.into()); } let ret = LocalMap { pulse, tss, channels }; - Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?) + Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ret)?))?) } } pub struct MapPulseLocalHttpFunction {} impl MapPulseLocalHttpFunction { - pub fn handler(req: &Request) -> Option { + pub fn handler(req: &Requ) -> Option { if req.uri().path().starts_with(MAP_PULSE_LOCAL_URL_PREFIX) { Some(Self {}) } else { @@ -1010,9 +960,9 @@ impl MapPulseLocalHttpFunction { } } - pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result { if req.method() != Method::GET { - return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } let urls = req.uri().to_string(); let pulse: u64 = urls[MAP_PULSE_LOCAL_URL_PREFIX.len()..] @@ -1042,9 +992,6 @@ impl MapPulseLocalHttpFunction { dt.as_secs_f32() * 1e3 ); } - //let mut msg = String::new(); - //use std::fmt::Write; - //write!(&mut msg, "cands: {:?}\n", cands)?; let mut futs = FuturesUnordered::new(); for (ch, hostname, tb, sp, ks) in cands { futs.push(Self::search(pulse, ch, hostname, tb, sp, ks, node_config)); @@ -1067,7 +1014,7 @@ impl MapPulseLocalHttpFunction { } } let ret = LocalMap { pulse, tss, channels }; - Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?) + Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ret)?))?) } async fn search( @@ -1088,57 +1035,68 @@ impl MapPulseLocalHttpFunction { ch ); if ks == 2 { - match disk::paths::data_path_tb(ks, &ch, tb, 86400000, sp, &node_config.node) { - Ok(path) => { - //write!(&mut msg, "data_path_tb: {:?}\n", path)?; - match search_pulse(pulse, &path).await { - Ok(ts) => { - //write!(&mut msg, "SEARCH: {:?} for {}\n", ts, pulse)?; - if let Some(ts) = ts { - info!("Found in ks {} sp {} tb {} ch {} ts {}", ks, sp, tb, ch, ts); - Ok(Some((ts, ch))) - } else { - Ok(None) - } - } - Err(e) => { - warn!("can not map pulse with {ch} {sp} {tb} {e}"); - return Err(e); + match disk::paths::data_path_tb(ks, &ch, tb, 1000 * 60 * 60 * 24, sp, &node_config.node) { + Ok(path) => match search_pulse(pulse, &path).await { + Ok(ts) => { + if let Some(ts) = ts { + info!("pulse {pulse} found in ks {ks} sp {sp} tb {tb} ch {ch} ts {ts}"); + Ok(Some((ts, ch))) + } else { + Ok(None) } } - } + Err(e) => { + let e = Error::with_msg_no_trace(format!( + "pulse {pulse} can not map ks {ks} sp {sp} tb {tb} ch {ch} {e}" + )); + error!("{e}"); + return Err(e); + } + }, Err(e) => { - warn!("can not get path to files {ch} {e}"); - return Err(e)?; + let e = Error::with_msg_no_trace(format!( + "pulse {pulse} no path ks {ks} sp {sp} tb {tb} ch {ch} {e}" + )); + error!("{e}"); + return Err(e); } } } else if ks == 3 { - match disk::paths::data_path_tb(ks, &ch, tb, 86400000, sp, &node_config.node) { - Ok(path) => { - //write!(&mut msg, "data_path_tb: {:?}\n", path)?; - match search_index_pulse(pulse, &path).await { - Ok(ts) => { - //write!(&mut msg, "SEARCH: {:?} for {}\n", ts, pulse)?; - if let Some(ts) = ts { - info!("Found in ks {} sp {} tb {} ch {} ts {}", ks, sp, tb, ch, ts); - Ok(Some((ts, ch))) - } else { - Ok(None) - } - } - Err(e) => { - warn!("can not map pulse with {ch} {sp} {tb} {e}"); - return Err(e); + match disk::paths::data_path_tb(ks, &ch, tb, 1000 * 60 * 60 * 24, sp, &node_config.node) { + Ok(path) => match search_index_pulse(pulse, &path).await { + Ok(ts) => { + if let Some(ts) = ts { + info!( + "pulse {} found in ks {} sp {} tb {} ch {} ts {}", + pulse, ks, sp, tb, ch, ts + ); + Ok(Some((ts, ch))) + } else { + Ok(None) } } - } + Err(e) => { + let e = Error::with_msg_no_trace(format!( + "pulse {pulse} can not map ks {ks} sp {sp} tb {tb} ch {ch} {e}" + )); + error!("{e}"); + return Err(e); + } + }, Err(e) => { - warn!("can not get path to files {ch} {e}"); - return Err(e)?; + let e = Error::with_msg_no_trace(format!( + "pulse {pulse} no path ks {ks} sp {sp} tb {tb} ch {ch} {e}" + )); + error!("{e}"); + return Err(e); } } } else { - return Err(Error::with_msg_no_trace(format!("bad keyspace {ks}"))); + let e = Error::with_msg_no_trace(format!( + "pulse {pulse} bad keyspace ks {ks} sp {sp} tb {tb} ch {ch}" + )); + error!("{e}"); + return Err(e); } } } @@ -1153,7 +1111,7 @@ pub struct TsHisto { pub struct MapPulseHistoHttpFunction {} impl MapPulseHistoHttpFunction { - pub fn handler(req: &Request) -> Option { + pub fn handler(req: &Requ) -> Option { if req.uri().path().starts_with(MAP_PULSE_HISTO_URL_PREFIX) { Some(Self {}) } else { @@ -1161,14 +1119,14 @@ impl MapPulseHistoHttpFunction { } } - pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result { if req.method() != Method::GET { - return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } let urls = format!("{}", req.uri()); let pulse: u64 = urls[MAP_PULSE_HISTO_URL_PREFIX.len()..].parse()?; let ret = Self::histo(pulse, node_config).await?; - Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?) + Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ret)?))?) } pub async fn histo(pulse: u64, node_config: &NodeConfigCached) -> Result { @@ -1179,12 +1137,21 @@ impl MapPulseHistoHttpFunction { node.host, node.port, MAP_PULSE_LOCAL_URL_PREFIX, pulse ); let uri: Uri = s.parse()?; - let req = Request::get(uri) + let req = Request::get(&uri) + .header(header::HOST, uri.host().unwrap()) .header("x-req-from", &node_config.node.host) - .body(Body::empty())?; - let fut = hyper::Client::new().request(req); - //let fut = hyper::Client::new().get(uri); - let fut = tokio::time::timeout(MAP_PULSE_LOCAL_TIMEOUT, fut); + .body(body_empty())?; + let fut = async move { + match connect_client(req.uri()).await { + Ok(mut client) => { + let fut = client.send_request(req); + tokio::time::timeout(MAP_PULSE_LOCAL_TIMEOUT, fut) + .await + .map(|x| x.map_err(Error::from_to_string)) + } + Err(e) => Ok(Err(Error::from_to_string(e))), + } + }; futs.push_back(fut); } use futures_util::stream::StreamExt; @@ -1192,7 +1159,7 @@ impl MapPulseHistoHttpFunction { while let Some(futres) = futs.next().await { match futres { Ok(res) => match res { - Ok(res) => match hyper::body::to_bytes(res.into_body()).await { + Ok(res) => match read_body_bytes(res.into_body()).await { Ok(body) => match serde_json::from_slice::(&body) { Ok(lm) => { for ts in lm.tss { @@ -1227,6 +1194,7 @@ impl MapPulseHistoHttpFunction { tss: map.keys().map(|j| *j).collect(), counts: map.values().map(|j| *j).collect(), }; + info!("pulse {pulse} histo {ret:?}"); Ok(ret) } } @@ -1234,7 +1202,7 @@ impl MapPulseHistoHttpFunction { pub struct MapPulseHttpFunction {} impl MapPulseHttpFunction { - pub fn handler(req: &Request) -> Option { + pub fn handler(req: &Requ) -> Option { if req.uri().path().starts_with(MAP_PULSE_URL_PREFIX) { Some(Self {}) } else { @@ -1242,16 +1210,16 @@ impl MapPulseHttpFunction { } } - pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result { + use crate::cache::CachePortal; if req.method() != Method::GET { - return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } trace!("MapPulseHttpFunction handle uri: {:?}", req.uri()); let urls = format!("{}", req.uri()); let pulse: u64 = urls[MAP_PULSE_URL_PREFIX.len()..].parse()?; match CACHE.portal(pulse) { CachePortal::Fresh => { - trace!("value not yet in cache pulse {pulse}"); let histo = MapPulseHistoHttpFunction::histo(pulse, node_config).await?; let mut i1 = 0; let mut max = 0; @@ -1264,9 +1232,9 @@ impl MapPulseHttpFunction { if max > 0 { let val = histo.tss[i1]; CACHE.set_value(pulse, val); - Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?) + Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&val)?))?) } else { - Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?) + Ok(response(StatusCode::NO_CONTENT).body(body_empty())?) } } CachePortal::Existing(rx) => { @@ -1274,30 +1242,27 @@ impl MapPulseHttpFunction { match rx.recv().await { Ok(_) => { error!("should never recv from existing operation pulse {pulse}"); - Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?) + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?) } - Err(_e) => { - trace!("woken up while value wait pulse {pulse}"); - match CACHE.portal(pulse) { - CachePortal::Known(val) => { - info!("good, value after wakeup pulse {pulse}"); - Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?) - } - CachePortal::Fresh => { - error!("woken up, but portal fresh pulse {pulse}"); - Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?) - } - CachePortal::Existing(..) => { - error!("woken up, but portal existing pulse {pulse}"); - Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?) - } + Err(_e) => match CACHE.portal(pulse) { + CachePortal::Known(ts) => { + info!("pulse {pulse} known from cache ts {ts}"); + Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ts)?))?) } - } + CachePortal::Fresh => { + error!("pulse {pulse} woken up, but fresh"); + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?) + } + CachePortal::Existing(..) => { + error!("pulse {pulse} woken up, but existing"); + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?) + } + }, } } - CachePortal::Known(val) => { - trace!("value already in cache pulse {pulse} ts {val}"); - Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?) + CachePortal::Known(ts) => { + info!("pulse {pulse} in cache ts {ts}"); + Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ts)?))?) } } } @@ -1306,7 +1271,7 @@ impl MapPulseHttpFunction { pub struct Api4MapPulseHttpFunction {} impl Api4MapPulseHttpFunction { - pub fn handler(req: &Request) -> Option { + pub fn handler(req: &Requ) -> Option { if req.uri().path().starts_with(API_4_MAP_PULSE_URL_PREFIX) { Some(Self {}) } else { @@ -1319,6 +1284,7 @@ impl Api4MapPulseHttpFunction { } pub async fn find_timestamp(q: MapPulseQuery, ncc: &NodeConfigCached) -> Result, Error> { + use crate::cache::CachePortal; let pulse = q.pulse; let res = match CACHE.portal(pulse) { CachePortal::Fresh => { @@ -1377,20 +1343,20 @@ impl Api4MapPulseHttpFunction { res } - pub async fn handle(&self, req: Request, ncc: &NodeConfigCached) -> Result, Error> { + pub async fn handle(&self, req: Requ, ncc: &NodeConfigCached) -> Result { if req.method() != Method::GET { - return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } let ts1 = Instant::now(); trace!("Api4MapPulseHttpFunction handle uri: {:?}", req.uri()); let url = Url::parse(&format!("dummy:{}", req.uri()))?; let q = MapPulseQuery::from_url(&url)?; let ret = match Self::find_timestamp(q, ncc).await { - Ok(Some(val)) => Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?), - Ok(None) => Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?), + Ok(Some(val)) => Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&val)?))?), + Ok(None) => Ok(response(StatusCode::NO_CONTENT).body(body_empty())?), Err(e) => { error!("find_timestamp {e}"); - Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?) + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?) } }; let ts2 = Instant::now(); @@ -1416,7 +1382,7 @@ impl Api4MapPulse2HttpFunction { "/api/4/map/pulse-v2/" } - pub fn handler(req: &Request) -> Option { + pub fn handler(req: &Requ) -> Option { if req.uri().path().starts_with(Self::path_prefix()) { Some(Self {}) } else { @@ -1428,9 +1394,9 @@ impl Api4MapPulse2HttpFunction { path.starts_with(Self::path_prefix()) } - pub async fn handle(&self, req: Request, ncc: &NodeConfigCached) -> Result, Error> { + pub async fn handle(&self, req: Requ, ncc: &NodeConfigCached) -> Result { if req.method() != Method::GET { - return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } let ts1 = Instant::now(); let url = Url::parse(&format!("dummy:{}", req.uri()))?; @@ -1446,12 +1412,12 @@ impl Api4MapPulse2HttpFunction { .format(DATETIME_FMT_9MS) .to_string(); let res = Api4MapPulse2Response { sec, ns, datetime }; - Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&res)?))?) + Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&res)?))?) } - Ok(None) => Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?), + Ok(None) => Ok(response(StatusCode::NO_CONTENT).body(body_empty())?), Err(e) => { error!("find_timestamp {e}"); - Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?) + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?) } }; let ts2 = Instant::now(); @@ -1466,7 +1432,7 @@ impl Api4MapPulse2HttpFunction { pub struct MarkClosedHttpFunction {} impl MarkClosedHttpFunction { - pub fn handler(req: &Request) -> Option { + pub fn handler(req: &Requ) -> Option { if req.uri().path().starts_with(MAP_PULSE_MARK_CLOSED_URL_PREFIX) { Some(Self {}) } else { @@ -1474,19 +1440,19 @@ impl MarkClosedHttpFunction { } } - pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result { if req.method() != Method::GET { - return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?); } info!("MarkClosedHttpFunction handle uri: {:?}", req.uri()); match MarkClosedHttpFunction::mark_closed(node_config).await { Ok(_) => { - let ret = response(StatusCode::OK).body(Body::empty())?; + let ret = response(StatusCode::OK).body(body_empty())?; Ok(ret) } Err(e) => { let msg = format!("{:?}", e); - let ret = response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(msg))?; + let ret = response(StatusCode::INTERNAL_SERVER_ERROR).body(body_string(msg))?; Ok(ret) } } diff --git a/crates/httpret/src/settings.rs b/crates/httpret/src/settings.rs index c9dbac1..8170234 100644 --- a/crates/httpret/src/settings.rs +++ b/crates/httpret/src/settings.rs @@ -1,8 +1,8 @@ use crate::err::Error; use crate::response; +use http::header; use http::Method; use http::StatusCode; -use hyper::Body; use hyper::Request; use hyper::Response; use netpod::log::*; @@ -29,7 +29,7 @@ impl SettingsThreadsMaxHandler { let (head, body) = req.into_parts(); let accept = head .headers - .get(http::header::ACCEPT) + .get(header::ACCEPT) .map_or(Ok(ACCEPT_ALL), |k| k.to_str()) .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))? .to_owned(); @@ -39,7 +39,7 @@ impl SettingsThreadsMaxHandler { error!("{e}"); return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } - let body = hyper::body::to_bytes(body).await?; + let body = httpclient::read_body_bytes(body).await?; //let threads_max: usize = head.uri.path()[Self::path_prefix().len()..].parse()?; let threads_max: usize = String::from_utf8_lossy(&body).parse()?; info!("threads_max {threads_max}"); diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index 4f0d0be..d71617b 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -1091,7 +1091,7 @@ mod test_frame { #[test] fn events_serialize() { - taskrun::tracing_init().unwrap(); + taskrun::tracing_init_testing().unwrap(); let mut events = EventsDim0::empty(); events.push(123, 234, 55f32); let events = events; diff --git a/crates/streams/Cargo.toml b/crates/streams/Cargo.toml index ddd463f..7ab8312 100644 --- a/crates/streams/Cargo.toml +++ b/crates/streams/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Dominik Werder "] edition = "2021" [dependencies] -tokio = { version = "1.21.2", features = ["io-util", "net", "time", "sync", "fs"] } +tokio = { version = "1.34", features = ["io-util", "net", "time", "sync", "fs"] } futures-util = "0.3.15" pin-project = "1.0.12" serde = { version = "1.0", features = ["derive"] } diff --git a/crates/streams/src/tcprawclient.rs b/crates/streams/src/tcprawclient.rs index 794bc1a..d1b6e41 100644 --- a/crates/streams/src/tcprawclient.rs +++ b/crates/streams/src/tcprawclient.rs @@ -18,6 +18,7 @@ use items_2::frame::make_term_frame; use netpod::log::*; use netpod::Cluster; use netpod::Node; +use netpod::APP_OCTET; use query::api4::events::EventsSubQuery; use query::api4::events::Frame1Parts; use serde::de::DeserializeOwned; @@ -63,32 +64,29 @@ pub async fn x_processed_event_blobs_stream_from_node_http( use http::Request; use httpclient::http; use httpclient::hyper; - use hyper::Body; use hyper::StatusCode; let frame1 = make_node_command_frame(subq.clone())?; let item = sitem_data(frame1.clone()); - let buf = item.make_frame()?; + let buf = item.make_frame()?.freeze(); let url = node.baseurl().join("/api/4/private/eventdata/frames").unwrap(); debug!("open_event_data_streams_http post {url}"); let req = Request::builder() .method(Method::POST) .uri(url.to_string()) - .header(header::ACCEPT, "application/octet-stream") - .body(Body::from(buf.to_vec())) + .header(header::ACCEPT, APP_OCTET) + .body(httpclient::Full::new(buf)) .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; - let client = hyper::Client::new(); + let mut client = httpclient::connect_client(req.uri()).await?; let res = client - .request(req) + .send_request(req) .await .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; if res.status() != StatusCode::OK { error!("Server error {:?}", res); let (head, body) = res.into_parts(); - let buf = hyper::body::to_bytes(body) - .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let buf = httpclient::read_body_bytes(body).await?; let s = String::from_utf8_lossy(&buf); return Err(Error::with_msg(format!( concat!( @@ -101,7 +99,7 @@ pub async fn x_processed_event_blobs_stream_from_node_http( ))); } let (_head, body) = res.into_parts(); - let frames = InMemoryFrameStream::new(body, subq.inmem_bufcap()); + let frames = InMemoryFrameStream::new(httpclient::IncomingStream::new(body), subq.inmem_bufcap()); let frames = Box::pin(frames); let stream = EventsFromFrames::new(frames, url.to_string()); debug!("open_event_data_streams_http done {url}"); @@ -165,31 +163,28 @@ where use http::Request; use httpclient::http; use httpclient::hyper; - use hyper::Body; use hyper::StatusCode; let item = sitem_data(frame1.clone()); - let buf = item.make_frame()?; + let buf = item.make_frame()?.freeze(); let url = node.baseurl().join("/api/4/private/eventdata/frames").unwrap(); debug!("open_event_data_streams_http post {url}"); let req = Request::builder() .method(Method::POST) .uri(url.to_string()) - .header(header::ACCEPT, "application/octet-stream") - .body(Body::from(buf.to_vec())) + .header(header::ACCEPT, APP_OCTET) + .body(httpclient::Full::new(buf)) .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; - let client = hyper::Client::new(); + let mut client = httpclient::connect_client(req.uri()).await?; let res = client - .request(req) + .send_request(req) .await .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; if res.status() != StatusCode::OK { error!("Server error {:?}", res); let (head, body) = res.into_parts(); - let buf = hyper::body::to_bytes(body) - .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let buf = httpclient::read_body_bytes(body).await?; let s = String::from_utf8_lossy(&buf); return Err(Error::with_msg(format!( concat!( @@ -202,7 +197,7 @@ where ))); } let (_head, body) = res.into_parts(); - let frames = InMemoryFrameStream::new(body, subq.inmem_bufcap()); + let frames = InMemoryFrameStream::new(httpclient::IncomingStream::new(body), subq.inmem_bufcap()); let frames = Box::pin(frames); let stream = EventsFromFrames::::new(frames, url.to_string()); debug!("open_event_data_streams_http done {url}"); diff --git a/crates/taskrun/Cargo.toml b/crates/taskrun/Cargo.toml index b273d4b..8e6dbc1 100644 --- a/crates/taskrun/Cargo.toml +++ b/crates/taskrun/Cargo.toml @@ -10,10 +10,11 @@ path = "src/taskrun.rs" [dependencies] tokio = { version = "1.32.0", features = ["full", "tracing", "time"] } futures-util = "0.3.28" -tracing = "0.1.37" +tracing = "0.1.40" +tracing-log = "0.2.0" tracing-subscriber = { version = "0.3.17", features = ["fmt", "time"] } #tracing-loki = { version = "0.2.1", default-features = false, features = ["compat-0-2-1"] } -console-subscriber = { version = "0.1.10" } +console-subscriber = { version = "0.2.0" } time = { version = "0.3", features = ["formatting"] } backtrace = "0.3.56" lazy_static = "1.4.0" diff --git a/crates/taskrun/src/formatter.rs b/crates/taskrun/src/formatter.rs new file mode 100644 index 0000000..1e4c797 --- /dev/null +++ b/crates/taskrun/src/formatter.rs @@ -0,0 +1,123 @@ +use std::fmt; +use time::format_description::well_known::Rfc3339; +use time::OffsetDateTime; +use tracing::Event; +use tracing::Subscriber; +use tracing_log::NormalizeEvent; +use tracing_subscriber::fmt::format::Writer; +use tracing_subscriber::fmt::FmtContext; +use tracing_subscriber::fmt::FormatEvent; +use tracing_subscriber::fmt::FormatFields; +use tracing_subscriber::fmt::FormattedFields; +use tracing_subscriber::registry::LookupSpan; + +fn _dummyyyy() { + let _ = tracing_subscriber::fmt::format::Full; +} + +pub struct FormatTxt; + +impl FormatEvent for FormatTxt +where + S: Subscriber + for<'a> LookupSpan<'a>, + N: for<'a> FormatFields<'a> + 'static, +{ + fn format_event(&self, ctx: &FmtContext<'_, S, N>, mut writer: Writer<'_>, event: &Event<'_>) -> fmt::Result { + let normalized_meta = event.normalized_metadata(); + let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata()); + // Without tracing-log: + // let meta = event.metadata(); + // write!(w, "{}", datetime::DateTime::from(std::time::SystemTime::now())); + // write!(writer, "{} ", FmtLevel::new(meta.level()))?; + // Using crate `time` doing `DateTime.format_into(..)` + + // tracing_subscriber::fmt::time::datetime is private: + // tracing_subscriber::fmt::time::datetime::DateTime::from(std::time::SystemTime::now()); + + if false { + // TODO restrict to milliseconds. + // TODO there must be a better way than via cursor? + let tsnow = OffsetDateTime::now_utc(); + let buf = [0u8; 64]; + let mut cr = std::io::Cursor::new(buf); + let n = tsnow.format_into(&mut cr, &Rfc3339).unwrap(); + let buf = cr.into_inner(); + writer.write_str(std::str::from_utf8(&buf[..n]).unwrap())?; + // writer.write_char(' ')?; + } + + if true { + const DATETIME_FMT_3MS: &str = "%Y-%m-%dT%H:%M:%S.%3fZ"; + let ts = chrono::Utc::now(); + let tsfmt = ts.format(DATETIME_FMT_3MS); + writer.write_str(&tsfmt.to_string())?; + // writer.write_char(' ')?; + } + + write!(writer, " {:>5} ", meta.level().as_str())?; + + writer.write_str("[THR ")?; + let current_thread = std::thread::current(); + match current_thread.name() { + Some(name) => { + let n = name.len(); + let max = 14; + if n > max { + writer.write_str(&name[0..2])?; + writer.write_char('.')?; + writer.write_str(&name[name.len() + 3 - max..])?; + } else { + writer.write_str(name)?; + } + } + None => { + // write!(writer, "{:0>2?} ", current_thread.id())?; + write!(writer, "{:?} ", current_thread.id())?; + } + } + writer.write_char(' ')?; + + writer.write_str("[TGT ")?; + writer.write_str(meta.target())?; + writer.write_char(' ')?; + + writer.write_str("[SCP ")?; + if let Some(sc) = ctx.event_scope() { + for (i, span) in sc.from_root().enumerate() { + if i != 0 { + writer.write_char(',')?; + } + let meta = span.metadata(); + writer.write_str(meta.name())?; + let ext = span.extensions(); + if let Some(fields) = ext.get::>() { + if fields.is_empty() { + } else { + writer.write_char('{')?; + writer.write_str(fields)?; + // write!(writer, "{{{}}}", fields)?; + writer.write_char('}')?; + } + } + } + } + writer.write_char(' ')?; + + if false { + writer.write_str("[FIL ")?; + if let Some(x) = meta.file() { + writer.write_str(x)?; + if let Some(x) = meta.line() { + write!(writer, ":{x}")?; + } + } + writer.write_char(' ')?; + } + + writer.write_str("[MSG ")?; + ctx.format_fields(writer.by_ref(), event)?; + + writer.write_char('\n')?; + Ok(()) + } +} diff --git a/crates/taskrun/src/taskrun.rs b/crates/taskrun/src/taskrun.rs index c18b29d..e1826e1 100644 --- a/crates/taskrun/src/taskrun.rs +++ b/crates/taskrun/src/taskrun.rs @@ -1,3 +1,5 @@ +pub mod formatter; + pub use tokio; use crate::log::*; @@ -23,7 +25,7 @@ pub fn get_runtime() -> Arc { get_runtime_opts(24, 128) } -// #[allow(unused)] +#[allow(unused)] fn on_thread_start() { let old = panic::take_hook(); panic::set_hook(Box::new(move |info| { @@ -86,7 +88,7 @@ where E: fmt::Display, { let runtime = get_runtime(); - match tracing_init() { + match tracing_init(TracingMode::Development) { Ok(_) => {} Err(()) => { eprintln!("ERROR tracing: can not init"); @@ -102,7 +104,7 @@ where } } -fn tracing_init_inner() -> Result<(), Error> { +fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> { use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::Layer; @@ -110,7 +112,12 @@ fn tracing_init_inner() -> Result<(), Error> { let timer = tracing_subscriber::fmt::time::UtcTime::new( time::format_description::parse(fmtstr).map_err(|e| format!("{e}"))?, ); - if true { + if let TracingMode::Console = mode { + // Only async console + console_subscriber::init(); + } else { + // #[cfg(DISABLED)] + // Logging setup let filter = tracing_subscriber::EnvFilter::builder() .with_default_directive(tracing::metadata::LevelFilter::INFO.into()) .from_env() @@ -121,6 +128,7 @@ fn tracing_init_inner() -> Result<(), Error> { .with_target(true) .with_ansi(false) .with_thread_names(true) + .event_format(formatter::FormatTxt) .with_filter(filter); let reg = tracing_subscriber::registry(); @@ -191,11 +199,21 @@ fn tracing_init_inner() -> Result<(), Error> { Ok(()) } -pub fn tracing_init() -> Result<(), ()> { +pub enum TracingMode { + Production, + Development, + Console, +} + +pub fn tracing_init_testing() -> Result<(), ()> { + tracing_init(TracingMode::Development) +} + +pub fn tracing_init(mode: TracingMode) -> Result<(), ()> { match INIT_TRACING_ONCE.lock() { Ok(mut initg) => { if *initg == 0 { - match tracing_init_inner() { + match tracing_init_inner(mode) { Ok(_) => { *initg = 1; }