This commit is contained in:
Dominik Werder
2023-12-05 15:44:11 +01:00
parent a5d3350747
commit 1b3e9ebd2a
35 changed files with 1180 additions and 948 deletions

View File

@@ -5,20 +5,19 @@ authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[dependencies]
hyper = "0.14"
http = "0.2"
futures-util = "0.3.14"
bytes = "1.0.1"
bytes = "1.5.0"
#dashmap = "3"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
serde_yaml = "0.9.16"
chrono = "0.4"
url = "2.2.2"
clap = { version = "4.3.21", features = ["derive", "cargo"] }
serde_yaml = "0.9.27"
chrono = "0.4.31"
url = "2.5.0"
clap = { version = "4.4.11", features = ["derive", "cargo"] }
err = { path = "../err" }
taskrun = { path = "../taskrun" }
netpod = { path = "../netpod" }
disk = { path = "../disk" }
httpclient = { path = "../httpclient" }
daqbufp2 = { path = "../daqbufp2" }

View File

@@ -84,7 +84,18 @@ async fn go() -> Result<(), Error> {
let cfg = cfg?;
daqbufp2::run_node(cfg, service_version).await?;
} else if let Ok(cfg) = serde_yaml::from_slice::<NodeConfig>(&buf) {
info!("Parsed yaml config from {}", subcmd.config);
let sp = span!(Level::INFO, "parse", id = 123u32);
sp.in_scope(|| {
let sp = span!(Level::TRACE, "sptrace");
sp.in_scope(|| {
let sp = span!(Level::INFO, "cfg", file = "some");
sp.in_scope(|| {
debug!("Parsed yaml config from {}", subcmd.config);
info!("Parsed yaml config from {}", subcmd.config);
warn!("Parsed yaml config from {}", subcmd.config);
});
});
});
let cfg: Result<NodeConfigCached, Error> = cfg.into();
let cfg = cfg?;
daqbufp2::run_node(cfg, service_version).await?;
@@ -138,9 +149,11 @@ async fn go() -> Result<(), Error> {
Ok(())
}
// TODO test data needs to be generated
// TODO test data needs to be generated.
// TODO use httpclient for the request: need to add binary POST.
//#[test]
#[allow(unused)]
#[cfg(DISABLED)]
fn simple_fetch() {
use daqbuffer::err::ErrConv;
use netpod::timeunits::*;

View File

@@ -13,6 +13,4 @@ impl<T, E: Convable> ErrConv<T> for Result<T, E> {
}
}
impl Convable for http::Error {}
impl Convable for hyper::Error {}
impl Convable for serde_yaml::Error {}

View File

@@ -9,8 +9,8 @@ path = "src/daqbufp2.rs"
[dependencies]
tokio = { version = "1.22.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
hyper = "0.14"
http = "0.2"
hyper = { version = "1.0.1", features = ["client", "http1", "http2"] }
http = "1"
tracing = "0.1.25"
tracing-subscriber = "0.3.16"
futures-util = "0.3.25"

View File

@@ -1,12 +1,11 @@
use crate::err::ErrConv;
use bytes::Bytes;
use chrono::DateTime;
use chrono::Utc;
use disk::streamlog::Streamlog;
use err::Error;
use futures_util::TryStreamExt;
use http::StatusCode;
use httpclient::HttpBodyAsAsyncRead;
use hyper::Body;
use items_0::streamitem::StreamItem;
use netpod::log::*;
use netpod::query::CacheUsage;
@@ -18,7 +17,6 @@ use netpod::SfDbChannel;
use netpod::APP_OCTET;
use query::api4::binned::BinnedQuery;
use streams::frames::inmem::InMemoryFrameStream;
use streams::frames::inmem::TcpReadAsBytes;
use url::Url;
pub async fn status(host: String, port: u16) -> Result<(), Error> {
@@ -27,16 +25,16 @@ pub async fn status(host: String, port: u16) -> Result<(), Error> {
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(uri)
.body(Body::empty())
.ec()?;
let client = hyper::Client::new();
let res = client.request(req).await.ec()?;
.body(httpclient::Full::new(Bytes::new()))?;
let mut client = httpclient::connect_client(req.uri()).await?;
let res = client.send_request(req).await?;
if res.status() != StatusCode::OK {
error!("Server error {:?}", res);
return Err(Error::with_msg(format!("Server error {:?}", res)));
}
let body = hyper::body::to_bytes(res.into_body()).await.ec()?;
let res = String::from_utf8(body.to_vec())?;
let (_, body) = res.into_parts();
let body = httpclient::read_body_bytes(body).await?;
let res = String::from_utf8_lossy(&body);
let t2 = chrono::Utc::now();
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
info!("node_status DONE duration: {} ms", ms);
@@ -75,15 +73,15 @@ pub async fn get_binned(
.method(http::Method::GET)
.uri(url.to_string())
.header(http::header::ACCEPT, APP_OCTET)
.body(Body::empty())
.body(httpclient::Full::new(Bytes::new()))
.ec()?;
let client = hyper::Client::new();
let res = client.request(req).await.ec()?;
let mut client = httpclient::connect_client(req.uri()).await?;
let res = client.send_request(req).await?;
if res.status() != StatusCode::OK {
error!("Server error {:?}", res);
let (head, body) = res.into_parts();
let buf = hyper::body::to_bytes(body).await.ec()?;
let s = String::from_utf8_lossy(&buf);
let body = httpclient::read_body_bytes(body).await?;
let s = String::from_utf8_lossy(&body);
return Err(Error::with_msg(format!(
concat!(
"Server error {:?}\n",
@@ -94,8 +92,9 @@ pub async fn get_binned(
head, s
)));
}
let s1 = HttpBodyAsAsyncRead::new(res);
let s2 = InMemoryFrameStream::new(TcpReadAsBytes::new(s1), ByteSize::from_kb(8));
let (_head, body) = res.into_parts();
let inp = httpclient::IncomingStream::new(body);
let s2 = InMemoryFrameStream::new(inp, ByteSize::from_kb(8));
use futures_util::StreamExt;
use std::future::ready;
let s3 = s2

View File

@@ -1,9 +1,6 @@
use crate::err::ErrConv;
use crate::nodes::require_test_hosts_running;
use chrono::Utc;
use err::Error;
use http::StatusCode;
use hyper::Body;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::timeunits::MS;
@@ -46,20 +43,7 @@ async fn fetch_data_api_python_blob(
let hp = HostPort::from_node(node0);
let url = Url::parse(&format!("http://{}:{}/api/1/query", hp.host, hp.port))?;
info!("http get {}", url);
let req = hyper::Request::builder()
.method(http::Method::POST)
.uri(url.to_string())
.header(http::header::CONTENT_TYPE, APP_JSON)
//.header(http::header::ACCEPT, APP_JSON)
.body(Body::from(query_str))
.ec()?;
let client = hyper::Client::new();
let res = client.request(req).await.ec()?;
if res.status() != StatusCode::OK {
error!("client response {:?}", res);
return Err(Error::with_msg_no_trace(format!("bad result {res:?}")));
}
let buf = hyper::body::to_bytes(res.into_body()).await.ec()?;
let buf = httpclient::http_post(url, APP_JSON, query_str).await?;
let t2 = chrono::Utc::now();
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
// TODO add timeout

View File

@@ -1,9 +1,6 @@
use crate::err::ErrConv;
use crate::nodes::require_test_hosts_running;
use chrono::Utc;
use err::Error;
use http::StatusCode;
use hyper::Body;
use items_0::test::f32_iter_cmp_near;
use items_0::test::f64_iter_cmp_near;
use items_0::WithLen;
@@ -352,24 +349,8 @@ async fn get_binned_json(
let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?;
query.append_to_url(&mut url);
let url = url;
debug!("http get {}", url);
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(url.to_string())
.header(http::header::ACCEPT, APP_JSON)
.body(Body::empty())
.ec()?;
let client = hyper::Client::new();
let res = client.request(req).await.ec()?;
if res.status() != StatusCode::OK {
error!("error response {:?}", res);
let buf = hyper::body::to_bytes(res.into_body()).await.ec()?;
let s = String::from_utf8_lossy(&buf);
error!("body of error response: {s}");
return Err(Error::with_msg_no_trace(format!("error response")));
}
let buf = hyper::body::to_bytes(res.into_body()).await.ec()?;
let s = String::from_utf8_lossy(&buf);
let res = httpclient::http_get(url, APP_JSON).await?;
let s = String::from_utf8_lossy(&res.body);
let res: JsonValue = serde_json::from_str(&s)?;
let pretty = serde_json::to_string_pretty(&res)?;
debug!("get_binned_json pretty {pretty}");

View File

@@ -1,8 +1,5 @@
use crate::err::ErrConv;
use chrono::Utc;
use err::Error;
use http::StatusCode;
use hyper::Body;
use netpod::log::*;
use netpod::AppendToUrl;
use netpod::Cluster;
@@ -21,21 +18,8 @@ pub async fn fetch_events_json(query: PlainEventsQuery, cluster: &Cluster) -> Re
let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?;
query.append_to_url(&mut url);
let url = url;
debug!("fetch_events_json url {}", url);
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(url.to_string())
.header(http::header::ACCEPT, APP_JSON)
.body(Body::empty())
.ec()?;
let client = hyper::Client::new();
let res = client.request(req).await.ec()?;
if res.status() != StatusCode::OK {
error!("client response {:?}", res);
return Err(Error::with_msg_no_trace(format!("bad result {res:?}")));
}
let buf = hyper::body::to_bytes(res.into_body()).await.ec()?;
let s = String::from_utf8_lossy(&buf);
let res = httpclient::http_get(url, APP_JSON).await?;
let s = String::from_utf8_lossy(&res.body);
let res: JsonValue = serde_json::from_str(&s)?;
let pretty = serde_json::to_string_pretty(&res)?;
debug!("fetch_binned_json pretty: {pretty}");
@@ -54,21 +38,8 @@ pub async fn fetch_binned_json(query: BinnedQuery, cluster: &Cluster) -> Result<
let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?;
query.append_to_url(&mut url);
let url = url;
debug!("fetch_binned_json url {}", url);
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(url.to_string())
.header(http::header::ACCEPT, APP_JSON)
.body(Body::empty())
.ec()?;
let client = hyper::Client::new();
let res = client.request(req).await.ec()?;
if res.status() != StatusCode::OK {
error!("client response {:?}", res);
return Err(Error::with_msg_no_trace(format!("bad result {res:?}")));
}
let buf = hyper::body::to_bytes(res.into_body()).await.ec()?;
let s = String::from_utf8_lossy(&buf);
let res = httpclient::http_get(url, APP_JSON).await?;
let s = String::from_utf8_lossy(&res.body);
let res: JsonValue = serde_json::from_str(&s)?;
let pretty = serde_json::to_string_pretty(&res)?;
debug!("fetch_binned_json pretty: {pretty}");

View File

@@ -1,10 +1,8 @@
use crate::err::ErrConv;
use crate::nodes::require_test_hosts_running;
use crate::test::api4::common::fetch_events_json;
use chrono::Utc;
use err::Error;
use http::StatusCode;
use hyper::Body;
use items_0::WithLen;
use items_2::eventsdim0::EventsDim0CollectorOutput;
use netpod::log::*;
@@ -86,21 +84,8 @@ async fn events_plain_json(
let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?;
query.append_to_url(&mut url);
let url = url;
info!("http get {}", url);
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(url.to_string())
.header(http::header::ACCEPT, APP_JSON)
.body(Body::empty())
.ec()?;
let client = hyper::Client::new();
let res = client.request(req).await.ec()?;
if res.status() != StatusCode::OK {
error!("client response {:?}", res);
return Err(Error::with_msg_no_trace(format!("bad result {res:?}")));
}
let buf = hyper::body::to_bytes(res.into_body()).await.ec()?;
let s = String::from_utf8_lossy(&buf);
let res = httpclient::http_get(url, APP_JSON).await?;
let s = String::from_utf8_lossy(&res.body);
let res: JsonValue = serde_json::from_str(&s)?;
let pretty = serde_json::to_string_pretty(&res)?;
info!("{pretty}");

View File

@@ -1,9 +1,6 @@
use crate::err::ErrConv;
use chrono::DateTime;
use chrono::Utc;
use err::Error;
use http::StatusCode;
use hyper::Body;
use netpod::log::*;
use netpod::query::CacheUsage;
use netpod::range::evrange::NanoRange;
@@ -47,25 +44,13 @@ async fn get_json_common(
let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", node0.host, node0.port))?;
query.append_to_url(&mut url);
let url = url;
debug!("get_json_common get {}", url);
let req = hyper::Request::builder()
.method(http::Method::GET)
.uri(url.to_string())
.header(http::header::ACCEPT, APP_JSON)
.body(Body::empty())
.ec()?;
let client = hyper::Client::new();
let res = client.request(req).await.ec()?;
if res.status() != StatusCode::OK {
error!("get_json_common client response {:?}", res);
}
let res = hyper::body::to_bytes(res.into_body()).await.ec()?;
let res = httpclient::http_get(url, APP_JSON).await?;
let s = String::from_utf8_lossy(&res.body);
let t2 = chrono::Utc::now();
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
// TODO add timeout
debug!("get_json_common DONE time {} ms", ms);
let res = String::from_utf8_lossy(&res).to_string();
let res: serde_json::Value = serde_json::from_str(res.as_str())?;
let res: serde_json::Value = serde_json::from_str(&s)?;
// TODO assert these:
debug!(
"result from endpoint: --------------\n{}\n--------------",

View File

@@ -7,6 +7,7 @@ pub mod pg {
pub use tokio_postgres::Client;
pub use tokio_postgres::Error;
pub use tokio_postgres::NoTls;
pub use tokio_postgres::Statement;
}
use err::anyhow;

View File

@@ -8,29 +8,27 @@ edition = "2021"
path = "src/disk.rs"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_cbor = "0.11.1"
http = "0.2"
chrono = { version = "0.4.19", features = ["serde"] }
tokio-stream = {version = "0.1.5", features = ["fs"]}
hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] }
serde = { version = "1.0.193", features = ["derive"] }
serde_json = "1.0.108"
serde_cbor = "0.11.2"
chrono = { version = "0.4.31", features = ["serde"] }
tokio-stream = {version = "0.1.14", features = ["fs"]}
async-channel = "1.9.0"
crossbeam = "0.8"
bytes = "1.4.0"
crossbeam = "0.8.2"
bytes = "1.5.0"
crc32fast = "1.3.2"
arrayref = "0.3.6"
byteorder = "1.4.3"
byteorder = "1.5.0"
futures-util = "0.3.14"
async-stream = "0.3.0"
tracing = "0.1.25"
tracing = "0.1.40"
tracing-futures = { version = "0.2.5", features = ["futures-03", "std-future"] }
fs2 = "0.4.3"
libc = "0.2.93"
hex = "0.4.3"
num-traits = "0.2.14"
num-derive = "0.4.0"
url = "2.4.0"
url = "2.5.0"
tiny-keccak = { version = "2.0", features = ["sha3"] }
err = { path = "../err" }
taskrun = { path = "../taskrun" }

View File

@@ -769,7 +769,7 @@ impl BlockingTaskIntoChannel {
match tx.send_blocking(Err(Error::with_msg_no_trace(msg))) {
Ok(()) => (),
Err(e) => {
error!("blocking_task_into_channel can not send into channel {e}");
error!("blocking_task_into_channel can not send Err into channel {e}");
}
}
break;
@@ -783,7 +783,8 @@ impl BlockingTaskIntoChannel {
match tx.send_blocking(Ok(item)) {
Ok(()) => (),
Err(e) => {
error!("blocking_task_into_channel can not send into channel {e}");
// Receiver most likely disconnected.
// error!("blocking_task_into_channel can not send into channel {e}");
break;
}
}
@@ -793,7 +794,7 @@ impl BlockingTaskIntoChannel {
match tx.send_blocking(Err(e.into())) {
Ok(()) => (),
Err(e) => {
error!("blocking_task_into_channel can not send into channel {e}");
error!("blocking_task_into_channel can not send Err into channel {e}");
}
}
break;

View File

@@ -14,10 +14,13 @@ serde_json = "1.0"
serde_cbor = "0.11.2"
rmp-serde = "1.1.1"
async-channel = "1.9.0"
async_channel_2 = { package = "async-channel", version = "2.0.0" }
chrono = { version = "0.4.26", features = ["serde"] }
url = "2.4.0"
regex = "1.9.1"
http = "0.2.9"
http_1 = { package = "http", version = "1.0.0" }
hyper_1 = { package = "hyper", version = "1.0.1" }
thiserror = "=0.0.1"
anyhow = "1.0"
tokio = "1"

View File

@@ -118,10 +118,10 @@ impl Error {
Self::with_msg_no_trace(e.to_string())
}
pub fn add_backtrace(self) -> Self {
let mut ret = self;
ret.trace_str = Some(fmt_backtrace(&backtrace::Backtrace::new()));
ret
pub fn add_backtrace(mut self) -> Self {
self.msg.extend(" (add_backtrace DISABLED)".chars());
// ret.trace_str = Some(fmt_backtrace(&backtrace::Backtrace::new()));
self
}
pub fn mark_bad_request(mut self) -> Self {
@@ -165,7 +165,11 @@ impl Error {
}
}
#[allow(unused)]
fn fmt_backtrace(trace: &backtrace::Backtrace) -> String {
if true {
return String::from("fmt_backtrace DISABLED");
}
use std::io::Write;
let mut buf = Vec::new();
let mut c1 = 0;
@@ -295,139 +299,163 @@ impl ToErr for Infallible {
impl From<String> for Error {
fn from(k: String) -> Self {
Self::with_msg(k)
Self::from_string(k)
}
}
impl From<&str> for Error {
fn from(k: &str) -> Self {
Self::with_msg(k)
Self::from_string(k)
}
}
impl From<std::io::Error> for Error {
fn from(k: std::io::Error) -> Self {
Self::with_msg(k.to_string())
Self::from_string(k)
}
}
impl From<AddrParseError> for Error {
fn from(k: AddrParseError) -> Self {
Self::with_msg(k.to_string())
Self::from_string(k)
}
}
impl From<serde_json::Error> for Error {
fn from(k: serde_json::Error) -> Self {
Self::with_msg(k.to_string())
Self::from_string(k)
}
}
impl<T> From<async_channel::SendError<T>> for Error {
fn from(k: async_channel::SendError<T>) -> Self {
Self::with_msg(format!("{:?}", k))
Self::from_string(k)
}
}
impl From<async_channel::RecvError> for Error {
fn from(k: async_channel::RecvError) -> Self {
Self::with_msg(k.to_string())
Self::from_string(k)
}
}
// Conversions for the 2.x line of the `async-channel` crate (imported under the
// `async_channel_2` alias so both 1.x and 2.x can coexist in the dependency tree).
// NOTE(review): `from_string` presumably accepts any `impl ToString`; `SendError<T>`
// implements `Display` unconditionally, so `T` needs no extra bounds — confirm.
impl<T> From<async_channel_2::SendError<T>> for Error {
fn from(k: async_channel_2::SendError<T>) -> Self {
Self::from_string(k)
}
}
impl From<async_channel_2::RecvError> for Error {
fn from(k: async_channel_2::RecvError) -> Self {
Self::from_string(k)
}
}
impl From<chrono::format::ParseError> for Error {
fn from(k: chrono::format::ParseError) -> Self {
Self::with_msg(k.to_string())
Self::from_string(k)
}
}
impl From<ParseIntError> for Error {
fn from(k: ParseIntError) -> Self {
Self::with_msg(k.to_string())
Self::from_string(k)
}
}
impl From<ParseFloatError> for Error {
fn from(k: ParseFloatError) -> Self {
Self::with_msg(k.to_string())
Self::from_string(k)
}
}
impl From<FromUtf8Error> for Error {
fn from(k: FromUtf8Error) -> Self {
Self::with_msg(k.to_string())
Self::from_string(k)
}
}
impl From<std::str::Utf8Error> for Error {
fn from(k: std::str::Utf8Error) -> Self {
Self::with_msg(k.to_string())
Self::from_string(k)
}
}
impl From<serde_cbor::Error> for Error {
fn from(k: serde_cbor::Error) -> Self {
Self::with_msg(k.to_string())
Self::from_string(k)
}
}
impl From<std::fmt::Error> for Error {
fn from(k: std::fmt::Error) -> Self {
Self::with_msg(k.to_string())
Self::from_string(k)
}
}
impl From<regex::Error> for Error {
fn from(k: regex::Error) -> Self {
Self::with_msg(k.to_string())
Self::from_string(k)
}
}
impl<T> From<PoisonError<T>> for Error {
fn from(_: PoisonError<T>) -> Self {
Self::with_msg("PoisonError")
Self::from_string("PoisonError")
}
}
impl From<url::ParseError> for Error {
fn from(k: url::ParseError) -> Self {
Self::with_msg(format!("{:?}", k))
Self::from_string(format!("{:?}", k))
}
}
impl From<TryFromSliceError> for Error {
fn from(k: TryFromSliceError) -> Self {
Self::with_msg(format!("{:?}", k))
Self::from_string(format!("{:?}", k))
}
}
impl From<rmp_serde::encode::Error> for Error {
fn from(k: rmp_serde::encode::Error) -> Self {
Self::with_msg(format!("{:?}", k))
Self::from_string(format!("{:?}", k))
}
}
impl From<rmp_serde::decode::Error> for Error {
fn from(k: rmp_serde::decode::Error) -> Self {
Self::with_msg(format!("{:?}", k))
Self::from_string(format!("{:?}", k))
}
}
impl From<http::header::ToStrError> for Error {
fn from(k: http::header::ToStrError) -> Self {
Self::with_msg(format!("{:?}", k))
Self::from_string(format!("{:?}", k))
}
}
impl From<anyhow::Error> for Error {
fn from(k: anyhow::Error) -> Self {
Self::with_msg(format!("{k}"))
Self::from_string(format!("{k}"))
}
}
impl From<tokio::task::JoinError> for Error {
fn from(k: tokio::task::JoinError) -> Self {
Self::with_msg(format!("{k}"))
Self::from_string(format!("{k}"))
}
}
// Conversions for the 1.x line of `http`/`hyper` (imported under the `http_1` /
// `hyper_1` package aliases so the 0.x versions used elsewhere keep working
// during the migration). Both error types implement `Display`, which
// `from_string` presumably relies on — TODO confirm its bound.
impl From<http_1::Error> for Error {
fn from(k: http_1::Error) -> Self {
Self::from_string(k)
}
}
impl From<hyper_1::Error> for Error {
fn from(k: hyper_1::Error) -> Self {
Self::from_string(k)
}
}

View File

@@ -8,12 +8,15 @@ edition = "2021"
futures-util = "0.3.25"
serde = { version = "1.0.147", features = ["derive"] }
serde_json = "1.0.89"
http = "0.2.8"
url = "2.3.1"
tokio = { version = "1.22.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
tracing = "0.1.37"
hyper = { version = "0.14.23", features = ["http1", "http2", "client", "server", "tcp", "stream"] }
http = "1.0.0"
http-body = "1.0.0"
http-body-util = "0.1.0"
hyper = { version = "1.0.1", features = ["http1", "http2", "client", "server"] }
hyper-tls = { version = "0.5.0" }
hyper-util = { version = "0.1.1", features = ["full"] }
bytes = "1.3.0"
async-channel = "1.8.0"
err = { path = "../err" }

View File

@@ -1,44 +1,95 @@
pub use hyper_util;
pub use http_body_util;
pub use http_body_util::Full;
use bytes::Bytes;
use err::Error;
use bytes::BytesMut;
use err::PublicError;
use futures_util::pin_mut;
use http::header;
use http::Request;
use http::Response;
use http::StatusCode;
use hyper::body::HttpBody;
use hyper::Body;
use http_body_util::combinators::BoxBody;
use hyper::body::Body;
use hyper::client::conn::http2::SendRequest;
use hyper::Method;
use netpod::log::*;
use netpod::AppendToUrl;
use netpod::ChannelConfigQuery;
use netpod::ChannelConfigResponse;
use netpod::NodeConfigCached;
use netpod::APP_JSON;
use std::fmt;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio::io;
use tokio::io::AsyncRead;
use tokio::io::ReadBuf;
use tokio::net::TcpStream;
use url::Url;
pub trait ErrConv<T> {
fn ec(self) -> Result<T, ::err::Error>;
pub type BodyBox = BoxBody<Bytes, BodyError>;
pub type RespBox = Response<BodyBox>;
/// Error type carried by boxed response bodies (see the `BodyBox` alias above).
#[derive(Debug)]
pub enum BodyError {
// Single catch-all variant; no detail is preserved.
// NOTE(review): consider carrying the source error for diagnosability.
Bad,
}
pub trait Convable: ToString {}
impl<T, E: Convable> ErrConv<T> for Result<T, E> {
fn ec(self) -> Result<T, ::err::Error> {
match self {
Ok(x) => Ok(x),
Err(e) => Err(::err::Error::from_string(e.to_string())),
}
impl fmt::Display for BodyError {
    /// Renders the fixed text "Bad" — the variant carries no further detail.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "Bad")
    }
}
impl Convable for http::Error {}
impl Convable for hyper::Error {}
impl std::error::Error for BodyError {}
// `Infallible` has no values, so this conversion can never actually run; the
// impl exists only to satisfy trait bounds (e.g. bodies whose error type is
// `Infallible`, like `Full<Bytes>`). The `Bad` arm is unreachable in practice.
impl From<std::convert::Infallible> for BodyError {
fn from(_value: std::convert::Infallible) -> Self {
BodyError::Bad
}
}
/// Coarse-grained error categories for this HTTP client crate.
/// Detail from the underlying errors is intentionally dropped by the `From`
/// impls below — only the category survives.
#[derive(Debug)]
pub enum Error {
// URL is missing a host or an explicit port (see `connect_client`).
BadUrl,
// NOTE(review): not constructed anywhere in this view — verify it is still needed.
Connection,
// Wraps `std::io::Error` (e.g. TCP connect failures).
IO,
// Wraps `http::Error` and `hyper::Error`, and also signals non-OK responses.
Http,
}
/// Maps an I/O failure (e.g. the TCP connect in `connect_client`) to `Error::IO`.
/// The original error detail is intentionally discarded; only the category is kept.
impl From<std::io::Error> for Error {
    // Underscore-prefix the unused parameter to silence the compiler warning,
    // consistent with the `From<Infallible> for BodyError` impl in this file.
    fn from(_value: std::io::Error) -> Self {
        Self::IO
    }
}
impl From<http::Error> for Error {
fn from(value: http::Error) -> Self {
Self::Http
}
}
impl From<hyper::Error> for Error {
fn from(value: hyper::Error) -> Self {
Self::Http
}
}
impl fmt::Display for Error {
    /// Human-readable form; reuses the derived `Debug` text (e.g. "BadUrl").
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_fmt(format_args!("{self:?}"))
    }
}
impl err::ToErr for Error {
fn to_err(self) -> err::Error {
err::Error::with_msg_no_trace(format!("self"))
}
}
pub struct HttpResponse {
pub head: http::response::Parts,
@@ -49,143 +100,130 @@ pub async fn http_get(url: Url, accept: &str) -> Result<HttpResponse, Error> {
let req = Request::builder()
.method(http::Method::GET)
.uri(url.to_string())
.header(header::HOST, url.host_str().ok_or_else(|| Error::BadUrl)?)
.header(header::ACCEPT, accept)
.body(Body::empty())
.ec()?;
let client = hyper::Client::new();
let res = client.request(req).await.ec()?;
let (head, body) = res.into_parts();
.body(Full::new(Bytes::new()))?;
let mut send_req = connect_client(req.uri()).await?;
let res = send_req.send_request(req).await?;
let (head, mut body) = res.into_parts();
debug!("http_get head {head:?}");
let body = hyper::body::to_bytes(body).await.ec()?;
let ret = HttpResponse { head, body };
use bytes::BufMut;
use http_body_util::BodyExt;
let mut buf = BytesMut::new();
while let Some(x) = body.frame().await {
match x {
Ok(mut x) => {
if let Some(x) = x.data_mut() {
buf.put(x);
}
}
Err(e) => return Err(e.into()),
}
}
let ret = HttpResponse {
head,
body: buf.freeze(),
};
Ok(ret)
}
pub async fn http_post(url: Url, accept: &str, body: String) -> Result<Bytes, Error> {
let body = Bytes::from(body.as_bytes().to_vec());
let req = Request::builder()
.method(http::Method::POST)
.uri(url.to_string())
.header(header::HOST, url.host_str().ok_or_else(|| Error::BadUrl)?)
.header(header::CONTENT_TYPE, APP_JSON)
.header(header::ACCEPT, accept)
.body(Body::from(body))
.ec()?;
let client = hyper::Client::new();
let res = client.request(req).await.ec()?;
.body(Full::new(body))?;
let mut send_req = connect_client(req.uri()).await?;
let res = send_req.send_request(req).await?;
if res.status() != StatusCode::OK {
error!("Server error {:?}", res);
let (head, body) = res.into_parts();
let buf = hyper::body::to_bytes(body).await.ec()?;
let (_head, body) = res.into_parts();
let buf = read_body_bytes(body).await?;
let s = String::from_utf8_lossy(&buf);
return Err(Error::with_msg(format!(
concat!(
"Server error {:?}\n",
"---------------------- message from http body:\n",
"{}\n",
"---------------------- end of http body",
),
head, s
)));
return Err(Error::Http);
}
let body = hyper::body::to_bytes(res.into_body()).await.ec()?;
Ok(body)
}
// TODO move to a better fitting module:
pub struct HttpBodyAsAsyncRead {
inp: Response<Body>,
left: Bytes,
rp: usize,
}
impl HttpBodyAsAsyncRead {
pub fn new(inp: Response<Body>) -> Self {
Self {
inp,
left: Bytes::new(),
rp: 0,
let (head, mut body) = res.into_parts();
debug!("http_get head {head:?}");
use bytes::BufMut;
use http_body_util::BodyExt;
let mut buf = BytesMut::new();
while let Some(x) = body.frame().await {
match x {
Ok(mut x) => {
if let Some(x) = x.data_mut() {
buf.put(x);
}
}
Err(e) => return Err(e.into()),
}
}
let buf = read_body_bytes(body).await?;
Ok(buf)
}
impl AsyncRead for HttpBodyAsAsyncRead {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut ReadBuf) -> Poll<io::Result<()>> {
trace!("impl AsyncRead for HttpBodyAsAsyncRead");
use Poll::*;
if self.left.len() != 0 {
let n1 = buf.remaining();
let n2 = self.left.len() - self.rp;
if n2 <= n1 {
buf.put_slice(self.left[self.rp..].as_ref());
self.left = Bytes::new();
self.rp = 0;
Ready(Ok(()))
} else {
buf.put_slice(self.left[self.rp..(self.rp + n2)].as_ref());
self.rp += n2;
Ready(Ok(()))
/// Opens a plain-TCP HTTP/2 connection to the host/port of `uri` and returns the
/// request sender. The connection driver task is spawned detached.
///
/// NOTE(review): `uri.port_u16()` is `None` when the URL has no explicit port, so
/// default ports (80/443) are rejected with `BadUrl` — confirm this is intended.
/// The handshake is HTTP/2 over cleartext TCP (no TLS/ALPN), so the server must
/// speak h2 directly.
pub async fn connect_client(uri: &http::Uri) -> Result<SendRequest<Full<Bytes>>, Error> {
let host = uri.host().ok_or_else(|| Error::BadUrl)?;
let port = uri.port_u16().ok_or_else(|| Error::BadUrl)?;
let stream = TcpStream::connect(format!("{host}:{port}")).await?;
let executor = hyper_util::rt::TokioExecutor::new();
let (send_req, conn) = hyper::client::conn::http2::Builder::new(executor)
.handshake(hyper_util::rt::TokioIo::new(stream))
.await?;
// TODO would need to take greater care of this task to catch connection-level errors.
tokio::spawn(conn);
Ok(send_req)
}
pub async fn read_body_bytes(mut body: hyper::body::Incoming) -> Result<Bytes, Error> {
use bytes::BufMut;
use http_body_util::BodyExt;
let mut buf = BytesMut::new();
while let Some(x) = body.frame().await {
match x {
Ok(mut x) => {
if let Some(x) = x.data_mut() {
buf.put(x);
}
}
} else {
let f = &mut self.inp;
pin_mut!(f);
match f.poll_data(cx) {
Ready(Some(Ok(k))) => {
let n1 = buf.remaining();
if k.len() <= n1 {
buf.put_slice(k.as_ref());
Ready(Ok(()))
Err(e) => return Err(e.into()),
}
}
Ok(buf.freeze())
}
/// Adapter exposing a `hyper::body::Incoming` as a `futures_util::Stream` of
/// `Bytes` chunks (see the `Stream` impl below); used e.g. to feed
/// `InMemoryFrameStream` in the client code.
pub struct IncomingStream {
// The wrapped response body being drained frame by frame.
inp: hyper::body::Incoming,
}
impl IncomingStream {
/// Wraps a response body; no I/O happens until the stream is polled.
pub fn new(inp: hyper::body::Incoming) -> Self {
Self { inp }
}
}
impl futures_util::Stream for IncomingStream {
type Item = Result<Bytes, err::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let j = &mut self.get_mut().inp;
let k = Pin::new(j);
match hyper::body::Body::poll_frame(k, cx) {
Ready(Some(x)) => match x {
Ok(x) => {
if let Ok(x) = x.into_data() {
Ready(Some(Ok(x)))
} else {
buf.put_slice(k[..n1].as_ref());
self.left = k;
self.rp = n1;
Ready(Ok(()))
Ready(Some(Ok(Bytes::new())))
}
}
Ready(Some(Err(e))) => Ready(Err(io::Error::new(
io::ErrorKind::Other,
Error::with_msg(format!("Received by HttpBodyAsAsyncRead: {:?}", e)),
))),
Ready(None) => Ready(Ok(())),
Pending => Pending,
}
}
}
}
pub async fn get_channel_config(
q: &ChannelConfigQuery,
node_config: &NodeConfigCached,
) -> Result<ChannelConfigResponse, Error> {
let mut url = Url::parse(&format!(
"http://{}:{}/api/4/channel/config",
node_config.node.host, node_config.node.port
))?;
q.append_to_url(&mut url);
let req = hyper::Request::builder()
.method(Method::GET)
.uri(url.as_str())
.body(Body::empty())
.map_err(Error::from_string)?;
let client = hyper::Client::new();
let res = client
.request(req)
.await
.map_err(|e| Error::with_msg(format!("get_channel_config request error: {e:?}")))?;
if res.status().is_success() {
let buf = hyper::body::to_bytes(res.into_body())
.await
.map_err(|e| Error::with_msg(format!("can not read response: {e:?}")))?;
let ret: ChannelConfigResponse = serde_json::from_slice(&buf)
.map_err(|e| Error::with_msg(format!("can not parse the channel config response json: {e:?}")))?;
Ok(ret)
} else {
let buf = hyper::body::to_bytes(res.into_body())
.await
.map_err(|e| Error::with_msg(format!("can not read response: {e:?}")))?;
match serde_json::from_slice::<PublicError>(&buf) {
Ok(e) => Err(e.into()),
Err(_) => Err(Error::with_msg(format!(
"can not parse the http error body: {:?}",
String::from_utf8_lossy(&buf)
))),
Err(e) => Ready(Some(Err(e.into()))),
},
Ready(None) => Ready(None),
Pending => Pending,
}
}
}

View File

@@ -10,18 +10,20 @@ path = "src/httpret.rs"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
http = "0.2.9"
url = "2.4.0"
hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] }
bytes = "1.4.0"
url = "2.5.0"
http = "1.0.0"
http-body-util = { version = "0.1.0" }
hyper = { version = "1.0.1", features = ["http1", "http2", "client", "server"] }
hyper-util = { version = "0.1.1", features = ["http1", "http2", "client", "server"] }
bytes = "1.5.0"
futures-util = "0.3.14"
tracing = "0.1"
tracing-futures = "0.2"
async-channel = "1.9.0"
itertools = "0.11.0"
chrono = "0.4.23"
md-5 = "0.10.5"
regex = "1.9.3"
md-5 = "0.10.6"
regex = "1.10.2"
err = { path = "../err" }
netpod = { path = "../netpod" }
query = { path = "../query" }

View File

@@ -1,17 +1,24 @@
use crate::body_empty;
use crate::body_string;
use crate::err::Error;
use crate::gather::gather_get_json_generic;
use crate::gather::SubRes;
use crate::response;
use crate::ReqCtx;
use crate::Requ;
use crate::RespFull;
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes;
use futures_util::Stream;
use futures_util::StreamExt;
use http::header;
use http::Method;
use http::StatusCode;
use hyper::Body;
use hyper::Client;
use http_body_util::Full;
use httpclient::connect_client;
use httpclient::read_body_bytes;
use hyper::Request;
use hyper::Response;
use items_0::streamitem::RangeCompletableItem;
@@ -131,9 +138,9 @@ impl FromErrorCode for ChannelSearchResultItemV1 {
#[derive(Debug, Serialize, Deserialize)]
pub struct ChannelSearchResultV1(pub Vec<ChannelSearchResultItemV1>);
pub async fn channel_search_list_v1(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
pub async fn channel_search_list_v1(req: Requ, proxy_config: &ProxyConfig) -> Result<RespFull, Error> {
let (head, reqbody) = req.into_parts();
let bodybytes = hyper::body::to_bytes(reqbody).await?;
let bodybytes = read_body_bytes(reqbody).await?;
let query: ChannelSearchQueryV1 = serde_json::from_slice(&bodybytes)?;
match head.headers.get(http::header::ACCEPT) {
Some(v) => {
@@ -156,17 +163,17 @@ pub async fn channel_search_list_v1(req: Request<Body>, proxy_config: &ProxyConf
}
Err(e) => Err(Error::with_msg(format!("parse error for: {:?} {:?}", sh, e))),
})
.fold_ok(vec![], |mut a, x| {
.fold_ok(Vec::new(), |mut a, x| {
a.push(x);
a
})?;
let tags: Vec<_> = urls.iter().map(|k| k.to_string()).collect();
let nt = |tag, res| {
let fut = async {
let body = hyper::body::to_bytes(res).await?;
let body = read_body_bytes(res).await?;
let res: ChannelSearchResult = match serde_json::from_slice(&body) {
Ok(k) => k,
Err(_) => ChannelSearchResult { channels: vec![] },
Err(_) => ChannelSearchResult { channels: Vec::new() },
};
let ret = SubRes {
tag,
@@ -211,7 +218,7 @@ pub async fn channel_search_list_v1(req: Request<Body>, proxy_config: &ProxyConf
}
let res = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Body::from(serde_json::to_string(&res)?))?;
.body(body_string(serde_json::to_string(&res)?))?;
Ok(res)
};
let bodies = (0..urls.len()).into_iter().map(|_| None).collect();
@@ -227,19 +234,16 @@ pub async fn channel_search_list_v1(req: Request<Body>, proxy_config: &ProxyConf
.await?;
Ok(ret)
} else {
Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
Ok(response(StatusCode::NOT_ACCEPTABLE).body(Full::new(Bytes::new()))?)
}
}
None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?),
None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Full::new(Bytes::new()))?),
}
}
pub async fn channel_search_configs_v1(
req: Request<Body>,
proxy_config: &ProxyConfig,
) -> Result<Response<Body>, Error> {
pub async fn channel_search_configs_v1(req: Requ, proxy_config: &ProxyConfig) -> Result<RespFull, Error> {
let (head, reqbody) = req.into_parts();
let bodybytes = hyper::body::to_bytes(reqbody).await?;
let bodybytes = read_body_bytes(reqbody).await?;
let query: ChannelSearchQueryV1 = serde_json::from_slice(&bodybytes)?;
match head.headers.get(http::header::ACCEPT) {
Some(v) => {
@@ -270,7 +274,7 @@ pub async fn channel_search_configs_v1(
let tags: Vec<_> = urls.iter().map(|k| k.to_string()).collect();
let nt = |tag, res| {
let fut = async {
let body = hyper::body::to_bytes(res).await?;
let body = read_body_bytes(res).await?;
let res: ChannelSearchResult = match serde_json::from_slice(&body) {
Ok(k) => k,
Err(_) => ChannelSearchResult { channels: vec![] },
@@ -336,7 +340,7 @@ pub async fn channel_search_configs_v1(
}
let res = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Body::from(serde_json::to_string(&res)?))?;
.body(Full::new(serde_json::to_string(&res)?))?;
Ok(res)
};
let bodies = (0..urls.len()).into_iter().map(|_| None).collect();
@@ -352,10 +356,10 @@ pub async fn channel_search_configs_v1(
.await?;
Ok(ret)
} else {
Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
Ok(response(StatusCode::NOT_ACCEPTABLE).body(Full::new(Bytes::new()))?)
}
}
None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?),
None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Full::new(Bytes::new()))?),
}
}
@@ -409,39 +413,37 @@ impl FromErrorCode for ChannelBackendConfigsV1 {
fn from_error_code(backend: &str, code: ErrorCode) -> Self {
Self {
backend: backend.into(),
channels: vec![],
channels: Vec::new(),
error: Some(ErrorDescription { code }),
}
}
}
// TODO replace usage of this by gather-generic
pub async fn gather_json_2_v1(
req: Request<Body>,
pathpre: &str,
_proxy_config: &ProxyConfig,
) -> Result<Response<Body>, Error> {
pub async fn gather_json_2_v1(req: Requ, pathpre: &str, _proxy_config: &ProxyConfig) -> Result<RespFull, Error> {
let (part_head, part_body) = req.into_parts();
let bodyslice = hyper::body::to_bytes(part_body).await?;
let bodyslice = read_body_bytes(part_body).await?;
let gather_from: GatherFromV1 = serde_json::from_slice(&bodyslice)?;
let mut spawned = vec![];
let mut spawned = Vec::new();
let uri = part_head.uri;
let path_post = &uri.path()[pathpre.len()..];
//let hds = part_head.headers;
for gh in gather_from.hosts {
let uri = format!("http://{}:{}/{}", gh.host, gh.port, path_post);
let req = Request::builder().method(Method::GET).uri(uri);
let req = Request::builder()
.method(Method::GET)
.uri(uri)
.header(header::HOST, gh.host);
let req = if gh.inst.len() > 0 {
req.header("retrieval_instance", &gh.inst)
} else {
req
};
let req = req.header(http::header::ACCEPT, APP_JSON);
//.body(Body::from(serde_json::to_string(&q)?))?;
let req = req.body(Body::empty());
let req = req.body(Full::new(Bytes::new()));
let task = tokio::spawn(async move {
//let res = Client::new().request(req);
let res = Client::new().request(req?).await;
let mut client = connect_client(req.uri()).await?;
let res = client.send_request(req).await?;
Ok::<_, Error>(process_answer(res?).await?)
});
let task = tokio::time::timeout(std::time::Duration::from_millis(5000), task);
@@ -488,10 +490,9 @@ struct GatherHostV1 {
inst: String,
}
async fn process_answer(res: Response<Body>) -> Result<JsonValue, Error> {
async fn process_answer(res: RespFull) -> Result<JsonValue, Error> {
let (pre, mut body) = res.into_parts();
if pre.status != StatusCode::OK {
use hyper::body::HttpBody;
if let Some(c) = body.data().await {
let c: bytes::Bytes = c?;
let s1 = String::from_utf8(c.to_vec())?;
@@ -504,9 +505,7 @@ async fn process_answer(res: Response<Body>) -> Result<JsonValue, Error> {
Ok(JsonValue::String(format!("status {}", pre.status.as_str())))
}
} else {
let body: hyper::Body = body;
let body_all = hyper::body::to_bytes(body).await?;
let val = match serde_json::from_slice(&body_all) {
let val = match serde_json::from_slice(the_data) {
Ok(k) => k,
Err(_e) => JsonValue::String(String::from_utf8(body_all.to_vec())?),
};
@@ -533,6 +532,8 @@ pub struct DataApiPython3DataStream {
data_done: bool,
completed: bool,
stats: Api1WarningStats,
count_emits: u64,
count_bytes: u64,
}
impl DataApiPython3DataStream {
@@ -565,6 +566,8 @@ impl DataApiPython3DataStream {
data_done: false,
completed: false,
stats: Api1WarningStats::new(),
count_emits: 0,
count_bytes: 0,
}
}
@@ -776,12 +779,21 @@ impl Stream for DataApiPython3DataStream {
panic!("poll on completed")
} else if self.data_done {
self.completed = true;
let reqid = self.reqctx.reqid();
info!(
"{} response body sent {} bytes ({})",
reqid, self.count_bytes, self.count_emits
);
Ready(None)
} else {
if let Some(stream) = &mut self.chan_stream {
match stream.poll_next_unpin(cx) {
Ready(Some(k)) => match self.handle_chan_stream_ready(k) {
Ok(k) => Ready(Some(Ok(k))),
Ok(k) => {
self.count_emits += 1;
self.count_bytes += k.len() as u64;
Ready(Some(Ok(k)))
}
Err(e) => {
error!("{e}");
self.chan_stream = None;
@@ -854,7 +866,7 @@ fn shape_to_api3proto(sh: &Option<Vec<u32>>) -> Vec<u32> {
pub struct Api1EventsBinaryHandler {}
impl Api1EventsBinaryHandler {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/1/query" {
Some(Self {})
} else {
@@ -862,14 +874,9 @@ impl Api1EventsBinaryHandler {
}
}
pub async fn handle(
&self,
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, _ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result<RespFull, Error> {
if req.method() != Method::POST {
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?);
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Full::new(Bytes::new()))?);
}
let (head, body) = req.into_parts();
let accept = head
@@ -878,7 +885,7 @@ impl Api1EventsBinaryHandler {
.map_or(Ok(ACCEPT_ALL), |k| k.to_str())
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?
.to_owned();
let body_data = hyper::body::to_bytes(body).await?;
let body_data = read_body_bytes(body).await?;
if body_data.len() < 1024 * 2 && body_data.first() == Some(&"{".as_bytes()[0]) {
debug!("request body_data string: {}", String::from_utf8_lossy(&body_data));
}
@@ -932,7 +939,7 @@ impl Api1EventsBinaryHandler {
span: tracing::Span,
reqidspan: tracing::Span,
ncc: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
) -> Result<RespFull, Error> {
let self_name = any::type_name::<Self>();
// TODO this should go to usage statistics:
debug!(
@@ -1004,7 +1011,7 @@ impl Api1EventsBinaryHandler {
// TODO set the public error code and message and return Err(e).
let e = Error::with_public_msg_no_trace(format!("{self_name} unsupported Accept: {}", accept));
error!("{self_name} {e}");
Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty)?)
}
}
}
@@ -1016,7 +1023,7 @@ impl RequestStatusHandler {
"/api/1/requestStatus/"
}
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path().starts_with(Self::path_prefix()) {
Some(Self {})
} else {
@@ -1024,29 +1031,29 @@ impl RequestStatusHandler {
}
}
pub async fn handle(&self, req: Request<Body>, _ncc: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, _ncc: &NodeConfigCached) -> Result<RespFull, Error> {
let (head, body) = req.into_parts();
if head.method != Method::GET {
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?);
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?);
}
let accept = head
.headers
.get(http::header::ACCEPT)
.get(header::ACCEPT)
.map_or(Ok(ACCEPT_ALL), |k| k.to_str())
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?
.to_owned();
if accept != APP_JSON && accept != ACCEPT_ALL {
// TODO set the public error code and message and return Err(e).
let e = Error::with_public_msg_no_trace(format!("Unsupported Accept: {:?}", accept));
let e = Error::with_public_msg_no_trace(format!("unsupported accept: {:?}", accept));
error!("{e}");
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}
let _body_data = hyper::body::to_bytes(body).await?;
let _body_data = read_body_bytes(body).await?;
let status_id = &head.uri.path()[Self::path_prefix().len()..];
debug!("RequestStatusHandler status_id {:?}", status_id);
let status = crate::status_board()?.status_as_json(status_id);
let s = serde_json::to_string(&status)?;
let ret = response(StatusCode::OK).body(Body::from(s))?;
let ret = response(StatusCode::OK).body(Full::new(s))?;
Ok(ret)
}
}

View File

@@ -3,7 +3,6 @@ use futures_util::StreamExt;
use http::HeaderMap;
use http::Response;
use http::StatusCode;
use hyper::Body;
use netpod::log::*;
use netpod::APP_JSON;
use std::pin::Pin;
@@ -58,15 +57,15 @@ impl ToPublicResponse for ::err::Error {
struct BodyStreamWrap(netpod::BodyStream);
impl hyper::body::HttpBody for BodyStreamWrap {
type Data = bytes::Bytes;
type Error = ::err::Error;
// impl hyper::body::HttpBody for BodyStreamWrap {
// type Data = bytes::Bytes;
// type Error = ::err::Error;
fn poll_data(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Self::Data, Self::Error>>> {
self.0.inner.poll_next_unpin(cx)
}
// fn poll_data(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Self::Data, Self::Error>>> {
// self.0.inner.poll_next_unpin(cx)
// }
fn poll_trailers(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
}
// fn poll_trailers(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
// Poll::Ready(Ok(None))
// }
// }

114
crates/httpret/src/cache.rs Normal file
View File

@@ -0,0 +1,114 @@
use async_channel::Receiver;
use async_channel::Sender;
use netpod::log::*;
use std::collections::BTreeMap;
use std::sync::Mutex;
use std::time::SystemTime;
// Opaque token type for the wakeup channel. The wrapped u32 is never read
// or constructed in this module; the channel is only ever closed to signal
// waiters (see `Cache::set_value`).
pub struct Dummy(u32);
/// Outcome of a cache lookup via `Cache::portal`.
pub enum CachePortal<V> {
    /// No entry existed for the key. The caller is now responsible for
    /// computing the value and publishing it with `Cache::set_value`.
    Fresh,
    /// Another caller is already computing the value. Await the receiver
    /// (the channel is closed once the value is published), then look the
    /// key up again.
    Existing(Receiver<Dummy>),
    /// The value is already cached; this is a clone of it.
    Known(V),
}
// Internal cache slot: either a value still being computed (Waiting, with
// the channel used to wake waiters once it arrives) or a published value
// (Known). The SystemTime records insertion time and drives eviction in
// `CacheInner::housekeeping`.
enum CacheEntry<V> {
    Waiting(SystemTime, Sender<Dummy>, Receiver<Dummy>),
    Known(SystemTime, V),
}
impl<V> CacheEntry<V> {
fn ts(&self) -> &SystemTime {
match self {
CacheEntry::Waiting(ts, _, _) => ts,
CacheEntry::Known(ts, _) => ts,
}
}
}
// Mutex-protected interior of `Cache`: the actual key -> entry map.
struct CacheInner<K, V> {
    map: BTreeMap<K, CacheEntry<V>>,
}
impl<K, V> CacheInner<K, V>
where
    K: Ord,
{
    /// Create an empty inner map. `const` so `Cache::new` can be const
    /// (and a `Cache` can live in a `static`).
    const fn new() -> Self {
        Self { map: BTreeMap::new() }
    }

    /// Evict roughly the older half of the entries once the map grows past
    /// 200 entries: everything with a timestamp at or after the median is
    /// kept, older entries are dropped.
    fn housekeeping(&mut self) {
        if self.map.len() > 200 {
            info!("trigger housekeeping with len {}", self.map.len());
            let mut v: Vec<_> = self.map.iter().map(|(k, e)| (e.ts(), k)).collect();
            // Sort by (timestamp, key). Unstable sort is sufficient here —
            // we only need the median timestamp, and ties share the same
            // timestamp, so the retain cutoff below is unaffected.
            v.sort_unstable();
            // Indexing is safe: len > 200 guarantees a non-empty vector.
            // SystemTime is Copy, so no clone() is needed.
            let ts0 = *v[v.len() / 2].0;
            self.map.retain(|_k, e| e.ts() >= &ts0);
            info!("housekeeping kept len {}", self.map.len());
        }
    }
}
// Thread-safe in-process cache with single-flight semantics: the first
// caller for a key receives `CachePortal::Fresh` and is expected to
// publish the value via `set_value`; concurrent callers receive a channel
// that is closed once the value arrives.
pub struct Cache<K, V> {
    inner: Mutex<CacheInner<K, V>>,
}
impl<K, V> Cache<K, V>
where
    K: Ord,
    V: Clone,
{
    /// Create an empty cache. `const` so it can be placed in a `static`.
    pub const fn new() -> Self {
        Self {
            inner: Mutex::new(CacheInner::new()),
        }
    }

    /// Run the eviction pass manually.
    pub fn housekeeping(&self) {
        self.inner.lock().unwrap().housekeeping();
    }

    /// Look up `key`, registering a waiting slot when it is absent.
    ///
    /// Returns `Fresh` for the first caller (who must later call
    /// `set_value`), `Existing` with a wakeup receiver while a value is
    /// pending, or `Known` with a clone of the cached value.
    pub fn portal(&self, key: K) -> CachePortal<V> {
        use std::collections::btree_map::Entry;
        let mut guard = self.inner.lock().unwrap();
        guard.housekeeping();
        match guard.map.entry(key) {
            Entry::Occupied(occ) => match occ.get() {
                CacheEntry::Known(_ts, val) => CachePortal::Known(val.clone()),
                CacheEntry::Waiting(_ts, _tx, rx) => CachePortal::Existing(rx.clone()),
            },
            Entry::Vacant(vac) => {
                let (tx, rx) = async_channel::bounded(16);
                vac.insert(CacheEntry::Waiting(SystemTime::now(), tx, rx));
                CachePortal::Fresh
            }
        }
    }

    /// Publish `val` for `key` and wake all waiters.
    ///
    /// The entry is switched to `Known` *before* the channel is closed, so
    /// every woken waiter observes the published value on re-lookup.
    pub fn set_value(&self, key: K, val: V) {
        let mut guard = self.inner.lock().unwrap();
        match guard.map.get_mut(&key) {
            Some(entry) => match entry {
                CacheEntry::Waiting(ts, tx, _rx) => {
                    let waker = tx.clone();
                    *entry = CacheEntry::Known(*ts, val);
                    waker.close();
                }
                CacheEntry::Known(_ts, _val) => {
                    error!("set_value already known");
                }
            },
            None => {
                error!("set_value no entry for key");
            }
        }
    }
}

View File

@@ -1,11 +1,18 @@
use crate::body_empty;
use crate::err::Error;
use crate::response;
use crate::Requ;
use crate::RespFull;
use crate::StreamBody;
use bytes::Bytes;
use futures_util::Stream;
use futures_util::TryStreamExt;
use http::Method;
use http::Response;
use http::StatusCode;
use hyper::Body;
use hyper::Request;
use hyper::Response;
use http_body_util::BodyExt;
use httpclient::httpclient::http_body_util;
use httpclient::RespBox;
use netpod::get_url_query_pairs;
use netpod::log::*;
use netpod::DiskIoTune;
@@ -57,7 +64,7 @@ impl DownloadHandler {
"/api/4/test/download/"
}
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path().starts_with(Self::path_prefix()) {
Some(Self {})
} else {
@@ -65,7 +72,15 @@ impl DownloadHandler {
}
}
pub async fn get(&self, req: Request<Body>, ncc: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<RespBox, Error> {
if req.method() == Method::GET {
self.get(req, node_config).await
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
}
pub async fn get(&self, req: Requ, ncc: &NodeConfigCached) -> Result<RespBox, Error> {
let (head, _body) = req.into_parts();
let p2 = &head.uri.path()[Self::path_prefix().len()..];
let base = match &ncc.node.sf_databuffer {
@@ -78,15 +93,18 @@ impl DownloadHandler {
let pp = base.join(p2);
info!("Try to open {pp:?}");
let file = tokio::fs::OpenOptions::new().read(true).open(&pp).await?;
let s = disk::file_content_stream(pp, file, query.disk_io_tune.clone(), "download").map_ok(|x| x.into_buf());
Ok(response(StatusCode::OK).body(Body::wrap_stream(s))?)
}
let stream =
disk::file_content_stream(pp, file, query.disk_io_tune.clone(), "download").map_ok(|x| x.into_buf());
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
if req.method() == Method::GET {
self.get(req, node_config).await
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
use futures_util::StreamExt;
use hyper::body::Frame;
let stream = stream.map(|item| item.map(|x| Frame::data(x.freeze())));
let body = httpclient::httpclient::http_body_util::StreamBody::new(stream);
let body = BodyExt::boxed(body);
// let body = http_body_util::combinators::BoxBody::new(body);
// let body: StreamBody = Box::pin(body);
// let body: Pin<Box<dyn Stream<Item = Result<Frame<Bytes>, err::Error>>>> = Box::pin(body);
let res = response(StatusCode::OK).body(body)?;
Ok(res)
}
}

View File

@@ -99,3 +99,4 @@ impl Convable for hyper::Error {}
impl Convable for std::array::TryFromSliceError {}
impl Convable for err::anyhow::Error {}
impl Convable for crate::RetrievalError {}
impl Convable for httpclient::Error {}

View File

@@ -1,11 +1,16 @@
use crate::body_empty;
use crate::body_string;
use crate::err::Error;
use crate::response;
use crate::Requ;
use crate::RespFull;
use futures_util::select;
use futures_util::FutureExt;
use http::Method;
use http::StatusCode;
use hyper::Body;
use hyper::Client;
use httpclient::connect_client;
use httpclient::read_body_bytes;
use hyper::body::Incoming;
use hyper::Request;
use hyper::Response;
use netpod::log::*;
@@ -34,24 +39,14 @@ struct GatherHost {
inst: String,
}
async fn process_answer(res: Response<Body>) -> Result<JsonValue, Error> {
let (pre, mut body) = res.into_parts();
async fn process_answer(res: Response<hyper::body::Incoming>) -> Result<JsonValue, Error> {
let (pre, body) = res.into_parts();
if pre.status != StatusCode::OK {
use hyper::body::HttpBody;
if let Some(c) = body.data().await {
let c: bytes::Bytes = c?;
let s1 = String::from_utf8(c.to_vec())?;
Ok(JsonValue::String(format!(
"status {} body {}",
pre.status.as_str(),
s1
)))
} else {
Ok(JsonValue::String(format!("status {}", pre.status.as_str())))
}
let buf = read_body_bytes(body).await?;
let s = String::from_utf8(buf.to_vec())?;
Ok(JsonValue::String(format!("status {} body {}", pre.status.as_str(), s)))
} else {
let body: hyper::Body = body;
let body_all = hyper::body::to_bytes(body).await?;
let body_all = read_body_bytes(body).await?;
let val = match serde_json::from_slice(&body_all) {
Ok(k) => k,
Err(_e) => JsonValue::String(String::from_utf8(body_all.to_vec())?),
@@ -60,62 +55,9 @@ async fn process_answer(res: Response<Body>) -> Result<JsonValue, Error> {
}
}
pub async fn unused_gather_json_from_hosts(req: Request<Body>, pathpre: &str) -> Result<Response<Body>, Error> {
let (part_head, part_body) = req.into_parts();
let bodyslice = hyper::body::to_bytes(part_body).await?;
let gather_from: GatherFrom = serde_json::from_slice(&bodyslice)?;
let mut spawned = vec![];
let uri = part_head.uri;
let path_post = &uri.path()[pathpre.len()..];
for gh in gather_from.hosts {
let uri = format!("http://{}:{}/{}", gh.host, gh.port, path_post);
let req = Request::builder().method(Method::GET).uri(uri);
let req = if gh.inst.len() > 0 {
req.header("retrieval_instance", &gh.inst)
} else {
req
};
let req = req.header(http::header::ACCEPT, APP_JSON);
let req = req.body(Body::empty());
let task = tokio::spawn(async move {
select! {
_ = sleep(Duration::from_millis(1500)).fuse() => {
Err(Error::with_msg_no_trace(format!("timeout")))
}
res = Client::new().request(req?).fuse() => Ok(process_answer(res?).await?)
}
});
spawned.push((gh.clone(), task));
}
#[derive(Serialize)]
struct Hres {
gh: GatherHost,
res: JsonValue,
}
#[derive(Serialize)]
struct Jres {
hosts: Vec<Hres>,
}
let mut a = vec![];
for tr in spawned {
let res = match tr.1.await {
Ok(k) => match k {
Ok(k) => k,
Err(e) => JsonValue::String(format!("ERROR({:?})", e)),
},
Err(e) => JsonValue::String(format!("ERROR({:?})", e)),
};
a.push(Hres { gh: tr.0, res });
}
let res = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(serde_json::to_string(&Jres { hosts: a })?.into())?;
Ok(res)
}
pub async fn gather_get_json(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn gather_get_json(req: Requ, node_config: &NodeConfigCached) -> Result<RespFull, Error> {
let (head, body) = req.into_parts();
let _bodyslice = hyper::body::to_bytes(body).await?;
let _bodyslice = read_body_bytes(body).await?;
let pathpre = "/api/4/gather/";
let pathsuf = &head.uri.path()[pathpre.len()..];
let spawned: Vec<_> = node_config
@@ -123,20 +65,35 @@ pub async fn gather_get_json(req: Request<Body>, node_config: &NodeConfigCached)
.cluster
.nodes
.iter()
.map(|node| {
.filter_map(|node| {
let uri = format!("http://{}:{}/api/4/{}", node.host, node.port, pathsuf);
let req = Request::builder().method(Method::GET).uri(uri);
let req = req.header(http::header::ACCEPT, APP_JSON);
let req = req.body(Body::empty());
let task = tokio::spawn(async move {
select! {
_ = sleep(Duration::from_millis(1500)).fuse() => {
Err(Error::with_msg_no_trace(format!("timeout")))
}
res = Client::new().request(req?).fuse() => Ok(process_answer(res?).await?)
match req.body(body_empty()) {
Ok(req) => {
let task = tokio::spawn(async move {
select! {
_ = sleep(Duration::from_millis(1500)).fuse() => {
Err(Error::with_msg_no_trace(format!("timeout")))
}
res = async move {
let mut client = if let Ok(x) = connect_client(req.uri()).await {x}
else { return Err(Error::with_msg("can not make request")); };
let res = if let Ok(x) = client.send_request(req).await { x }
else { return Err(Error::with_msg("can not make request")); };
Ok(res)
}.fuse() => {
Ok(process_answer(res?).await?)
}
}
});
Some((node.clone(), task))
}
});
(node.clone(), task)
Err(e) => {
error!("bad request: {e}");
None
}
}
})
.collect();
#[derive(Serialize)]
@@ -148,7 +105,7 @@ pub async fn gather_get_json(req: Request<Body>, node_config: &NodeConfigCached)
struct Jres {
hosts: Vec<Hres>,
}
let mut a = vec![];
let mut a = Vec::new();
for (node, jh) in spawned {
let res = match jh.await {
Ok(k) => match k {
@@ -182,7 +139,7 @@ pub struct SubRes<T> {
pub async fn gather_get_json_generic<SM, NT, FT, OUT>(
_method: http::Method,
urls: Vec<Url>,
bodies: Vec<Option<Body>>,
bodies: Vec<Option<String>>,
tags: Vec<String>,
nt: NT,
ft: FT,
@@ -192,7 +149,7 @@ pub async fn gather_get_json_generic<SM, NT, FT, OUT>(
) -> Result<OUT, Error>
where
SM: Send + 'static,
NT: Fn(String, Response<Body>) -> Pin<Box<dyn Future<Output = Result<SubRes<SM>, Error>> + Send>>
NT: Fn(String, Response<Incoming>) -> Pin<Box<dyn Future<Output = Result<SubRes<SM>, Error>> + Send>>
+ Send
+ Sync
+ Copy
@@ -211,7 +168,7 @@ where
.into_iter()
.zip(bodies.into_iter())
.zip(tags.into_iter())
.map(move |((url, body), tag)| {
.filter_map(move |((url, body), tag)| {
info!("Try gather from {}", url);
let url_str = url.as_str();
let req = if body.is_some() {
@@ -226,29 +183,43 @@ where
req
};
let body = match body {
None => Body::empty(),
Some(body) => body,
None => body_empty(),
Some(body) => body_string(body),
};
let req = req.body(body);
let tag2 = tag.clone();
let jh = tokio::spawn(async move {
select! {
_ = sleep(timeout + extra_timeout).fuse() => {
error!("PROXY TIMEOUT");
Err(Error::with_msg_no_trace(format!("timeout")))
}
res = {
let client = Client::new();
client.request(req?).fuse()
} => {
info!("received result in time");
let ret = nt(tag2, res?).await?;
info!("transformed result in time");
Ok(ret)
}
match req.body(body) {
Ok(req) => {
let tag2 = tag.clone();
let jh = tokio::spawn(async move {
select! {
_ = sleep(timeout + extra_timeout).fuse() => {
error!("PROXY TIMEOUT");
Err(Error::with_msg_no_trace(format!("timeout")))
}
res = async move {
let mut client = match connect_client(req.uri()).await {
Ok(x) => x,
Err(e) => return Err(Error::from_to_string(e)),
};
let res = match client.send_request(req).await {
Ok(x) => x,
Err(e) => return Err(Error::from_to_string(e)),
};
Ok(res)
}.fuse() => {
info!("received result in time");
let ret = nt(tag2, res?).await?;
info!("transformed result in time");
Ok(ret)
}
}
});
Some((url, tag, jh))
}
});
(url, tag, jh)
Err(e) => {
error!("bad request: {e}");
None
}
}
})
.collect();
let mut a = Vec::new();

View File

@@ -1,11 +1,13 @@
pub mod api1;
pub mod api4;
pub mod bodystream;
pub mod cache;
pub mod channel_status;
pub mod channelconfig;
pub mod download;
pub mod err;
pub mod gather;
#[cfg(DISABLED)]
pub mod prometheus;
pub mod proxy;
pub mod pulsemap;
@@ -17,18 +19,19 @@ use crate::err::Error;
use crate::gather::gather_get_json;
use ::err::thiserror;
use ::err::ThisError;
use bytes::Bytes;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::StreamExt;
use http::Method;
use http::StatusCode;
use hyper::server::conn::AddrStream;
use hyper::server::Server;
use hyper::service::make_service_fn;
use http_body_util::combinators::BoxBody;
use http_body_util::Full;
use hyper::body::Incoming;
use hyper::service::service_fn;
use hyper::Body;
use hyper::Request;
use hyper::Response;
use hyper_util::rt::TokioIo;
use net::SocketAddr;
use netpod::log::*;
use netpod::query::prebinned::PreBinnedQuery;
@@ -56,6 +59,7 @@ use std::time::SystemTime;
use task::Context;
use task::Poll;
use taskrun::tokio;
use taskrun::tokio::net::TcpListener;
pub const PSI_DAQBUFFER_SERVICE_MARK: &'static str = "PSI-Daqbuffer-Service-Mark";
pub const PSI_DAQBUFFER_SEEN_URL: &'static str = "PSI-Daqbuffer-Seen-Url";
@@ -82,6 +86,7 @@ impl IntoBoxedError for net::AddrParseError {}
impl IntoBoxedError for tokio::task::JoinError {}
impl IntoBoxedError for api4::databuffer_tools::FindActiveError {}
impl IntoBoxedError for std::string::FromUtf8Error {}
impl IntoBoxedError for std::io::Error {}
impl<E> From<E> for RetrievalError
where
@@ -118,6 +123,23 @@ pub fn accepts_octets(hm: &http::HeaderMap) -> bool {
}
}
pub type Requ = Request<Incoming>;
pub type RespFull = Response<Full<Bytes>>;
use http_body_util::BodyExt;
use httpclient::BodyBox;
use httpclient::RespBox;
pub fn body_empty() -> BodyBox {
Full::new(Bytes::new()).map_err(Into::into).boxed()
}
pub fn body_string<S: ToString>(body: S) -> BodyBox {
Full::new(Bytes::from(body.to_string())).map_err(Into::into).boxed()
}
pub type StreamBody = Pin<Box<dyn futures_util::Stream<Item = Result<hyper::body::Frame<Bytes>, ::err::Error>>>>;
pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion) -> Result<(), RetrievalError> {
static STATUS_BOARD_INIT: Once = Once::new();
STATUS_BOARD_INIT.call_once(|| {
@@ -126,48 +148,64 @@ pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion
let x = Box::new(a);
STATUS_BOARD.store(Box::into_raw(x), Ordering::SeqCst);
});
#[cfg(DISABLED)]
if let Some(bind) = node_config.node.prometheus_api_bind {
tokio::spawn(prometheus::host(bind));
}
// let rawjh = taskrun::spawn(nodenet::conn::events_service(node_config.clone()));
use std::str::FromStr;
let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen(), node_config.node.port))?;
let make_service = make_service_fn({
move |conn: &AddrStream| {
debug!("new connection from {:?}", conn.remote_addr());
let node_config = node_config.clone();
let addr = conn.remote_addr();
let service_version = service_version.clone();
async move {
let ret = service_fn(move |req| {
// TODO send to logstash
info!(
"http-request {:?} - {:?} - {:?} - {:?}",
addr,
req.method(),
req.uri(),
req.headers()
);
let f = http_service(req, node_config.clone(), service_version.clone());
Cont { f: Box::pin(f) }
});
Ok::<_, Error>(ret)
let bind_addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen(), node_config.node.port))?;
let listener = TcpListener::bind(bind_addr).await?;
loop {
let (stream, addr) = listener.accept().await?;
debug!("new connection from {addr}");
let node_config = node_config.clone();
let service_version = service_version.clone();
let io = TokioIo::new(stream);
tokio::task::spawn(async move {
let res = hyper::server::conn::http1::Builder::new()
.serve_connection(
io,
service_fn(move |req| the_service_fn(req, addr, node_config.clone(), service_version.clone())),
)
.await;
match res {
Ok(()) => {}
Err(e) => {
error!("{e}");
}
}
}
});
Server::bind(&addr)
.serve(make_service)
.await
.map(|e| RetrievalError::TextError(format!("{e:?}")))?;
});
}
// rawjh.await??;
Ok(())
}
async fn http_service(
req: Request<Body>,
async fn the_service_fn(
req: Requ,
addr: SocketAddr,
node_config: NodeConfigCached,
service_version: ServiceVersion,
) -> Result<Response<Body>, Error> {
) -> Result<Response<RespBox>, Error> {
info!(
"http-request {:?} - {:?} - {:?} - {:?}",
addr,
req.method(),
req.uri(),
req.headers()
);
let f = http_service(req, node_config, service_version).await;
// Cont { f: Box::pin(f) }
f
}
async fn http_service(
req: Requ,
node_config: NodeConfigCached,
service_version: ServiceVersion,
) -> Result<Response<RespBox>, Error> {
match http_service_try(req, &node_config, &service_version).await {
Ok(k) => Ok(k),
Err(e) => {
@@ -244,7 +282,7 @@ impl ReqCtx {
}
// TODO remove because I want error bodies to be json.
pub fn response_err<T>(status: StatusCode, msg: T) -> Result<Response<Body>, RetrievalError>
pub fn response_err<T>(status: StatusCode, msg: T) -> Result<RespFull, RetrievalError>
where
T: AsRef<str>,
{
@@ -257,7 +295,7 @@ where
),
msg.as_ref()
);
let ret = response(status).body(Body::from(msg))?;
let ret = response(status).body(Full::new(msg))?;
Ok(ret)
}
@@ -267,7 +305,7 @@ macro_rules! static_http {
let c = include_bytes!(concat!("../static/documentation/", $tgtex));
let ret = response(StatusCode::OK)
.header("content-type", $ctype)
.body(Body::from(&c[..]))?;
.body(Full::new(&c[..]))?;
return Ok(ret);
}
};
@@ -276,7 +314,7 @@ macro_rules! static_http {
let c = include_bytes!(concat!("../static/documentation/", $tgt));
let ret = response(StatusCode::OK)
.header("content-type", $ctype)
.body(Body::from(&c[..]))?;
.body(Full::new(&c[..]))?;
return Ok(ret);
}
};
@@ -288,7 +326,7 @@ macro_rules! static_http_api1 {
let c = include_bytes!(concat!("../static/documentation/", $tgtex));
let ret = response(StatusCode::OK)
.header("content-type", $ctype)
.body(Body::from(&c[..]))?;
.body(Full::new(&c[..]))?;
return Ok(ret);
}
};
@@ -297,17 +335,17 @@ macro_rules! static_http_api1 {
let c = include_bytes!(concat!("../static/documentation/", $tgt));
let ret = response(StatusCode::OK)
.header("content-type", $ctype)
.body(Body::from(&c[..]))?;
.body(Full::new(&c[..]))?;
return Ok(ret);
}
};
}
async fn http_service_try(
req: Request<Body>,
req: Requ,
node_config: &NodeConfigCached,
service_version: &ServiceVersion,
) -> Result<Response<Body>, Error> {
) -> Result<RespBox, Error> {
use http::HeaderValue;
let mut urlmarks = Vec::new();
urlmarks.push(format!("{}:{}", req.method(), req.uri()));
@@ -317,7 +355,7 @@ async fn http_service_try(
urlmarks.push(s.into());
}
}
let ctx = ReqCtx::with_node(&req, node_config);
let ctx = ReqCtx::with_node(&req, &node_config);
let mut res = http_service_inner(req, &ctx, node_config, service_version).await?;
let hm = res.headers_mut();
hm.append("Access-Control-Allow-Origin", "*".parse().unwrap());
@@ -334,11 +372,11 @@ async fn http_service_try(
}
async fn http_service_inner(
req: Request<Body>,
req: Requ,
ctx: &ReqCtx,
node_config: &NodeConfigCached,
service_version: &ServiceVersion,
) -> Result<Response<Body>, RetrievalError> {
) -> Result<RespBox, RetrievalError> {
let uri = req.uri().clone();
let path = uri.path();
if path == "/api/4/private/version" {
@@ -350,9 +388,9 @@ async fn http_service_inner(
"patch": service_version.patch,
},
});
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?)
Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ret)?))?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
} else if path.starts_with("/api/4/private/logtest/") {
if req.method() == Method::GET {
@@ -366,12 +404,24 @@ async fn http_service_inner(
warn!("test warn log output");
} else if path.ends_with("/error") {
error!("test error log output");
} else if path.ends_with("/mixed") {
warn!("test warn log output");
let sp_info = span!(Level::INFO, "sp_info", f1 = "v1");
sp_info.in_scope(|| {
warn!("test warn log output in sp_info");
info!("test info log output in sp_info");
let sp_debug = span!(Level::DEBUG, "sp_debug", f1 = "v1");
sp_debug.in_scope(|| {
info!("test info log output in sp_info:sp_debug");
debug!("test debug log output in sp_info:sp_debug");
});
});
} else {
error!("test unknown log output");
}
Ok(response(StatusCode::OK).body(Body::empty())?)
Ok(response(StatusCode::OK).body(body_empty())?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
} else if let Some(h) = api4::eventdata::EventDataHandler::handler(&req) {
Ok(h.handle(req, ctx, &node_config, service_version)
@@ -413,43 +463,43 @@ async fn http_service_inner(
if req.method() == Method::GET {
Ok(prebinned(req, ctx, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
} else if path == "/api/4/random/channel" {
if req.method() == Method::GET {
Ok(random_channel(req, ctx, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
} else if path.starts_with("/api/4/gather/") {
if req.method() == Method::GET {
Ok(gather_get_json(req, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
} else if path == "/api/4/clear_cache" {
if req.method() == Method::GET {
Ok(clear_cache_all(req, ctx, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
} else if path == "/api/4/update_db_with_channel_names" {
if req.method() == Method::GET {
Ok(update_db_with_channel_names(req, ctx, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
} else if path == "/api/4/update_db_with_all_channel_configs" {
if req.method() == Method::GET {
Ok(update_db_with_all_channel_configs(req, ctx, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
} else if path == "/api/4/update_search_cache" {
if req.method() == Method::GET {
Ok(update_search_cache(req, ctx, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
} else if let Some(h) = download::DownloadHandler::handler(&req) {
Ok(h.handle(req, &node_config).await?)
@@ -479,13 +529,13 @@ async fn http_service_inner(
if req.method() == Method::GET {
api_1_docs(path)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Full::new(Bytes::new()))?)
}
} else if path.starts_with("/api/4/documentation/") {
if req.method() == Method::GET {
api_4_docs(path)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Full::new(Bytes::new()))?)
}
} else {
use std::fmt::Write;
@@ -502,29 +552,29 @@ async fn http_service_inner(
write!(out, "HEADER {hn:?}: {hv:?}<br>\n")?;
}
write!(out, "</pre>\n")?;
Ok(response(StatusCode::NOT_FOUND).body(Body::from(body))?)
Ok(response(StatusCode::NOT_FOUND).body(body_string(body))?)
}
}
pub fn api_4_docs(path: &str) -> Result<Response<Body>, RetrievalError> {
pub fn api_4_docs(path: &str) -> Result<RespFull, RetrievalError> {
static_http!(path, "", "api4.html", "text/html");
static_http!(path, "style.css", "text/css");
static_http!(path, "script.js", "text/javascript");
static_http!(path, "status-main.html", "text/html");
Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?)
Ok(response(StatusCode::NOT_FOUND).body(body_empty())?)
}
pub fn api_1_docs(path: &str) -> Result<Response<Body>, RetrievalError> {
pub fn api_1_docs(path: &str) -> Result<RespFull, RetrievalError> {
static_http_api1!(path, "", "api1.html", "text/html");
static_http_api1!(path, "style.css", "text/css");
static_http_api1!(path, "script.js", "text/javascript");
Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?)
Ok(response(StatusCode::NOT_FOUND).body(body_empty())?)
}
pub struct StatusBoardAllHandler {}
impl StatusBoardAllHandler {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/4/status/board/all" {
Some(Self {})
} else {
@@ -532,38 +582,26 @@ impl StatusBoardAllHandler {
}
}
pub async fn handle(
&self,
_req: Request<Body>,
_node_config: &NodeConfigCached,
) -> Result<Response<Body>, RetrievalError> {
pub async fn handle(&self, _req: Requ, _node_config: &NodeConfigCached) -> Result<RespBox, RetrievalError> {
use std::ops::Deref;
let sb = status_board().unwrap();
let buf = serde_json::to_vec(sb.deref()).unwrap();
let res = response(StatusCode::OK).body(Body::from(buf))?;
let buf = serde_json::to_string(sb.deref()).unwrap();
let res = response(StatusCode::OK).body(body_string(buf))?;
Ok(res)
}
}
async fn prebinned(
req: Request<Body>,
ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, RetrievalError> {
async fn prebinned(req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result<RespBox, RetrievalError> {
match prebinned_inner(req, ctx, node_config).await {
Ok(ret) => Ok(ret),
Err(e) => {
error!("fn prebinned: {e:?}");
Ok(response(StatusCode::BAD_REQUEST).body(Body::from(format!("[prebinned-error]")))?)
Ok(response(StatusCode::BAD_REQUEST).body(body_string(format!("[prebinned-error]")))?)
}
}
}
async fn prebinned_inner(
req: Request<Body>,
_ctx: &ReqCtx,
_node_config: &NodeConfigCached,
) -> Result<Response<Body>, RetrievalError> {
async fn prebinned_inner(req: Requ, _ctx: &ReqCtx, _node_config: &NodeConfigCached) -> Result<RespBox, RetrievalError> {
let (head, _body) = req.into_parts();
let url: url::Url = format!("dummy://{}", head.uri).parse()?;
let query = PreBinnedQuery::from_url(&url)?;
@@ -576,22 +614,14 @@ async fn prebinned_inner(
todo!()
}
async fn random_channel(
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, RetrievalError> {
async fn random_channel(req: Requ, _ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result<RespBox, RetrievalError> {
let (_head, _body) = req.into_parts();
let ret = dbconn::random_channel(node_config).await?;
let ret = response(StatusCode::OK).body(Body::from(ret))?;
let ret = response(StatusCode::OK).body(body_string(ret))?;
Ok(ret)
}
async fn clear_cache_all(
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, RetrievalError> {
async fn clear_cache_all(req: Requ, _ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result<RespBox, RetrievalError> {
let (head, _body) = req.into_parts();
let dry = match head.uri.query() {
Some(q) => q.contains("dry"),
@@ -600,15 +630,15 @@ async fn clear_cache_all(
let res = disk::cache::clear_cache_all(node_config, dry).await?;
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Body::from(serde_json::to_string(&res)?))?;
.body(body_string(serde_json::to_string(&res)?))?;
Ok(ret)
}
async fn update_db_with_channel_names(
req: Request<Body>,
req: Requ,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, RetrievalError> {
) -> Result<RespBox, RetrievalError> {
info!("httpret::update_db_with_channel_names");
let (head, _body) = req.into_parts();
let _dry = match head.uri.query() {
@@ -635,7 +665,7 @@ async fn update_db_with_channel_names(
let p = serde_json::to_string(&e)?;
let res = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON_LINES)
.body(Body::from(p))?;
.body(body_string(p))?;
Ok(res)
}
}
@@ -643,10 +673,10 @@ async fn update_db_with_channel_names(
#[allow(unused)]
async fn update_db_with_channel_names_3(
req: Request<Body>,
req: Requ,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, RetrievalError> {
) -> Result<RespBox, RetrievalError> {
let (head, _body) = req.into_parts();
let _dry = match head.uri.query() {
Some(q) => q.contains("dry"),
@@ -666,10 +696,10 @@ async fn update_db_with_channel_names_3(
}
async fn update_db_with_all_channel_configs(
req: Request<Body>,
req: Requ,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, RetrievalError> {
) -> Result<RespBox, RetrievalError> {
let (head, _body) = req.into_parts();
let _dry = match head.uri.query() {
Some(q) => q.contains("dry"),
@@ -689,10 +719,10 @@ async fn update_db_with_all_channel_configs(
}
async fn update_search_cache(
req: Request<Body>,
req: Requ,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, RetrievalError> {
) -> Result<RespBox, RetrievalError> {
let (head, _body) = req.into_parts();
let _dry = match head.uri.query() {
Some(q) => q.contains("dry"),
@@ -701,7 +731,7 @@ async fn update_search_cache(
let res = dbconn::scan::update_search_cache(node_config).await?;
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Body::from(serde_json::to_string(&res)?))?;
.body(body_string(serde_json::to_string(&res)?))?;
Ok(ret)
}

View File

@@ -5,6 +5,7 @@ use http::Method;
use http::Request;
use http::Response;
use http::StatusCode;
use httpclient::read_body_bytes;
use hyper::server::conn::AddrStream;
use hyper::service::make_service_fn;
use hyper::service::service_fn;
@@ -276,7 +277,7 @@ impl QueryHandler {
info!("{} for {:?}", std::any::type_name::<Self>(), req);
let url = url::Url::parse(&format!("dummy://{}", &req.uri()));
info!("/api/v1/query parsed url: {:?}", url);
let body = hyper::body::to_bytes(req.into_body()).await?;
let body = read_body_bytes(req.into_body()).await?;
let body_str = String::from_utf8_lossy(&body);
info!("/api/v1/query body_str: {:?}", body_str);
let formurl = url::Url::parse(&format!("dummy:///?{}", body_str));
@@ -308,7 +309,7 @@ impl QueryRangeHandler {
info!("{} for {:?}", std::any::type_name::<Self>(), req);
let url = url::Url::parse(&format!("dummy://{}", &req.uri()));
info!("/api/v1/query_range parsed url: {:?}", url);
let body = hyper::body::to_bytes(req.into_body()).await?;
let body = read_body_bytes(req.into_body()).await?;
let body_str = String::from_utf8_lossy(&body);
info!("/api/v1/query_range body_str: {:?}", body_str);
let formurl = url::Url::parse(&format!("dummy:///?{}", body_str));

View File

@@ -12,7 +12,6 @@ use http::Method;
use http::Request;
use http::Response;
use http::StatusCode;
use hyper::Body;
use netpod::log::*;
use netpod::ChannelSearchQuery;
use netpod::ChannelSearchResult;

View File

@@ -1,7 +1,10 @@
use crate::body_empty;
use crate::body_string;
use crate::cache::Cache;
use crate::err::Error;
use crate::response;
use async_channel::Receiver;
use async_channel::Sender;
use crate::Requ;
use crate::RespFull;
use bytes::Buf;
use bytes::BufMut;
use bytes::BytesMut;
@@ -10,12 +13,13 @@ use chrono::Utc;
use futures_util::stream::FuturesOrdered;
use futures_util::stream::FuturesUnordered;
use futures_util::FutureExt;
use http::header;
use http::Method;
use http::StatusCode;
use http::Uri;
use hyper::Body;
use httpclient::connect_client;
use httpclient::read_body_bytes;
use hyper::Request;
use hyper::Response;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::AppendToUrl;
@@ -36,7 +40,6 @@ use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
@@ -50,114 +53,6 @@ use tokio::task::JoinHandle;
use tokio::time::error::Elapsed;
use url::Url;
struct Dummy;
enum CachePortal<V> {
Fresh,
Existing(Receiver<Dummy>),
Known(V),
}
impl<V> CachePortal<V> {}
enum CacheEntry<V> {
Waiting(SystemTime, Sender<Dummy>, Receiver<Dummy>),
Known(SystemTime, V),
}
impl<V> CacheEntry<V> {
fn ts(&self) -> &SystemTime {
match self {
CacheEntry::Waiting(ts, _, _) => ts,
CacheEntry::Known(ts, _) => ts,
}
}
}
struct CacheInner<K, V> {
map: BTreeMap<K, CacheEntry<V>>,
}
impl<K, V> CacheInner<K, V>
where
K: Ord,
{
const fn new() -> Self {
Self { map: BTreeMap::new() }
}
fn housekeeping(&mut self) {
if self.map.len() > 200 {
info!("trigger housekeeping with len {}", self.map.len());
let mut v: Vec<_> = self.map.iter().map(|(k, v)| (v.ts(), k)).collect();
v.sort();
let ts0 = v[v.len() / 2].0.clone();
//let tsnow = SystemTime::now();
//let tscut = tsnow.checked_sub(Duration::from_secs(60 * 10)).unwrap_or(tsnow);
self.map.retain(|_k, v| v.ts() >= &ts0);
info!("housekeeping kept len {}", self.map.len());
}
}
}
struct Cache<K, V> {
inner: Mutex<CacheInner<K, V>>,
}
impl<K, V> Cache<K, V>
where
K: Ord,
V: Clone,
{
const fn new() -> Self {
Self {
inner: Mutex::new(CacheInner::new()),
}
}
fn housekeeping(&self) {
let mut g = self.inner.lock().unwrap();
g.housekeeping();
}
fn portal(&self, key: K) -> CachePortal<V> {
use std::collections::btree_map::Entry;
let mut g = self.inner.lock().unwrap();
g.housekeeping();
match g.map.entry(key) {
Entry::Vacant(e) => {
let (tx, rx) = async_channel::bounded(16);
let ret = CachePortal::Fresh;
let v = CacheEntry::Waiting(SystemTime::now(), tx, rx);
e.insert(v);
ret
}
Entry::Occupied(e) => match e.get() {
CacheEntry::Waiting(_ts, _tx, rx) => CachePortal::Existing(rx.clone()),
CacheEntry::Known(_ts, v) => CachePortal::Known(v.clone()),
},
}
}
fn set_value(&self, key: K, val: V) {
let mut g = self.inner.lock().unwrap();
if let Some(e) = g.map.get_mut(&key) {
match e {
CacheEntry::Waiting(ts, tx, _rx) => {
let tx = tx.clone();
*e = CacheEntry::Known(*ts, val);
tx.close();
}
CacheEntry::Known(_ts, _val) => {
error!("set_value already known");
}
}
} else {
error!("set_value no entry for key");
}
}
}
static CACHE: Cache<u64, u64> = Cache::new();
pub struct MapPulseHisto {
@@ -176,26 +71,25 @@ const API_4_MAP_PULSE_URL_PREFIX: &'static str = "/api/4/map/pulse/";
const MAP_PULSE_LOCAL_TIMEOUT: Duration = Duration::from_millis(8000);
const MAP_PULSE_QUERY_TIMEOUT: Duration = Duration::from_millis(10000);
async fn make_tables(node_config: &NodeConfigCached) -> Result<(), Error> {
let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
async fn make_tables(pgc: &dbconn::pg::Client) -> Result<(), Error> {
let sql = "set client_min_messages = 'warning'";
conn.execute(sql, &[]).await?;
pgc.execute(sql, &[]).await?;
let sql = "create table if not exists map_pulse_channels (name text, tbmax int)";
conn.execute(sql, &[]).await?;
pgc.execute(sql, &[]).await?;
let sql = "create table if not exists map_pulse_files (channel text not null, split int not null, timebin int not null, closed int not null default 0, pulse_min int8 not null, pulse_max int8 not null)";
conn.execute(sql, &[]).await?;
pgc.execute(sql, &[]).await?;
let sql = "create unique index if not exists map_pulse_files_ix1 on map_pulse_files (channel, split, timebin)";
conn.execute(sql, &[]).await?;
pgc.execute(sql, &[]).await?;
let sql = "alter table map_pulse_files add if not exists upc1 int not null default 0";
conn.execute(sql, &[]).await?;
pgc.execute(sql, &[]).await?;
let sql = "alter table map_pulse_files add if not exists hostname text not null default ''";
conn.execute(sql, &[]).await?;
pgc.execute(sql, &[]).await?;
let sql = "alter table map_pulse_files add if not exists ks int not null default 2";
conn.execute(sql, &[]).await?;
pgc.execute(sql, &[]).await?;
let sql = "create index if not exists map_pulse_files_ix2 on map_pulse_files (hostname)";
conn.execute(sql, &[]).await?;
pgc.execute(sql, &[]).await?;
let sql = "set client_min_messages = 'notice'";
conn.execute(sql, &[]).await?;
pgc.execute(sql, &[]).await?;
Ok(())
}
@@ -223,12 +117,13 @@ fn timer_channel_names() -> Vec<String> {
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum MapfilePath {
Scalar(PathBuf),
Index(PathBuf, PathBuf),
Index(PathBuf),
}
async fn datafiles_for_channel(name: String, node_config: &NodeConfigCached) -> Result<Vec<MapfilePath>, Error> {
async fn datafiles_for_channel(name: &str, node_config: &NodeConfigCached) -> Result<Vec<MapfilePath>, Error> {
let mut a = Vec::new();
let sfc = node_config.node.sf_databuffer.as_ref().unwrap();
let data_base_path = &sfc.data_base_path;
let channel_path = sfc
.data_base_path
.join(format!("{}_2", sfc.ksprefix))
@@ -252,34 +147,42 @@ async fn datafiles_for_channel(name: String, node_config: &NodeConfigCached) ->
}
Err(e) => match e.kind() {
std::io::ErrorKind::NotFound => {
let channel_path = sfc
.data_base_path
.join(format!("{}_3", sfc.ksprefix))
.join("byTime")
.join(&name);
match tokio::fs::read_dir(&channel_path).await {
Ok(mut rd) => {
while let Ok(Some(entry)) = rd.next_entry().await {
let mut rd2 = tokio::fs::read_dir(entry.path()).await?;
while let Ok(Some(e2)) = rd2.next_entry().await {
let mut rd3 = tokio::fs::read_dir(e2.path()).await?;
while let Ok(Some(e3)) = rd3.next_entry().await {
if e3.file_name().to_string_lossy().ends_with("_00000_Data_Index") {
let fns = e3.file_name().to_string_lossy().to_string();
let path_data = e3.path().parent().unwrap().join(&fns[..fns.len() - 6]);
let x = MapfilePath::Index(e3.path(), path_data);
a.push(x);
}
}
}
files_recursive(name, &data_base_path, &sfc.ksprefix, 3, "_00000_Data_Index").await
}
_ => return Err(e)?,
},
}
}
async fn files_recursive(
name: &str,
data_base_path: &Path,
ksprefix: &str,
ks: u32,
data_file_suffix: &str,
) -> Result<Vec<MapfilePath>, Error> {
let mut a = Vec::new();
let channel_path = data_base_path
.join(format!("{}_{}", ksprefix, ks))
.join("byTime")
.join(&name);
match tokio::fs::read_dir(&channel_path).await {
Ok(mut rd) => {
while let Ok(Some(entry)) = rd.next_entry().await {
let mut rd2 = tokio::fs::read_dir(entry.path()).await?;
while let Ok(Some(e2)) = rd2.next_entry().await {
let mut rd3 = tokio::fs::read_dir(e2.path()).await?;
while let Ok(Some(e3)) = rd3.next_entry().await {
if e3.file_name().to_string_lossy().ends_with(data_file_suffix) {
let x = MapfilePath::Index(e3.path());
a.push(x);
}
Ok(a)
}
Err(e) => match e.kind() {
_ => return Err(e)?,
},
}
}
Ok(a)
}
Err(e) => match e.kind() {
_ => return Err(e)?,
},
}
@@ -490,7 +393,7 @@ async fn read_chunk_at(mut file: File, pos: u64, chunk_len: Option<u64>) -> Resu
pub struct IndexFullHttpFunction {}
impl IndexFullHttpFunction {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path().eq("/api/1/map/index/full") {
Some(Self {})
} else {
@@ -498,42 +401,90 @@ impl IndexFullHttpFunction {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<RespFull, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}
let ret = match Self::index(false, node_config).await {
Ok(msg) => response(StatusCode::OK).body(Body::from(msg))?,
Err(e) => response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?,
Ok(msg) => response(StatusCode::OK).body(body_string(msg))?,
Err(e) => {
error!("IndexFullHttpFunction {e}");
response(StatusCode::INTERNAL_SERVER_ERROR).body(body_string(format!("{:?}", e)))?
}
};
Ok(ret)
}
pub async fn index_channel(
channel_name: String,
conn: &dbconn::pg::Client,
async fn index(do_print: bool, node_config: &NodeConfigCached) -> Result<String, Error> {
// TODO avoid double-insert on central storage.
let mut msg = format!("LOG");
let pgc = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
// TODO remove update of static columns when older clients are removed.
let sql = "insert into map_pulse_files (channel, split, timebin, pulse_min, pulse_max, hostname, ks) values ($1, $2, $3, $4, $5, $6, $7) on conflict (channel, split, timebin) do update set pulse_min = $4, pulse_max = $5, upc1 = map_pulse_files.upc1 + 1, hostname = $6";
let insert_01 = pgc.prepare(sql).await?;
make_tables(&pgc).await?;
let chs = timer_channel_names();
for channel_name in chs {
match Self::index_channel(&channel_name, &pgc, do_print, &insert_01, node_config).await {
Ok(m) => {
msg.push_str("\n");
msg.push_str(&m);
}
Err(e) => {
error!("error while indexing {} {:?}", channel_name, e);
//return Err(e);
}
}
}
Ok(msg)
}
async fn index_channel(
channel_name: &str,
pgc: &dbconn::pg::Client,
do_print: bool,
insert_01: &dbconn::pg::Statement,
node_config: &NodeConfigCached,
) -> Result<String, Error> {
let mut msg = format!("Index channel {}", channel_name);
let files = datafiles_for_channel(channel_name.clone(), node_config).await?;
let files = datafiles_for_channel(channel_name, node_config).await?;
let mut files = files;
files.sort();
let files = files;
msg = format!("{}\n{:?}", msg, files);
let mut latest_pair = (0, 0);
let n1 = files.len().min(3);
let m1 = files.len() - n1;
for ch in &files[m1..] {
trace!(" index over {:?}", ch);
}
for mp in files[m1..].into_iter() {
let files_from = files.len() - files.len().min(2);
for mp in files[files_from..].into_iter() {
match mp {
MapfilePath::Scalar(path) => {
trace!("Scalar {path:?}");
let splitted: Vec<_> = path.to_str().unwrap().split("/").collect();
let timebin: u64 = splitted[splitted.len() - 3].parse()?;
let split: u64 = splitted[splitted.len() - 2].parse()?;
let file = tokio::fs::OpenOptions::new().read(true).open(&path).await?;
if false {
// Not needed, we any use only the last N files.
// TODO the timebin unit depends on the keyspace.
// In worst case could depend on the current channel config, and could have changed
// at each config change. That would be madness. Luckily, it seems always 1d for ks 2 and 3.
let timebin_dt = Duration::from_secs(60 * 60 * 24 * timebin);
let timebin_ts = SystemTime::UNIX_EPOCH.checked_add(timebin_dt).unwrap();
let tsnow = SystemTime::now();
if timebin_ts + Duration::from_secs(60 * 60 * 24 * 2) < tsnow {
debug!("FILTER PAST {timebin} {path:?}");
} else if timebin_ts > tsnow + Duration::from_secs(60 * 60 * 24 * 2) {
debug!("FILTER FUTU {timebin} {path:?}");
} else {
debug!("KEEP TIMEBI {timebin} {path:?}");
}
}
let file = match tokio::fs::OpenOptions::new().read(true).open(&path).await {
Ok(x) => x,
Err(e) => {
let e = Error::with_msg_no_trace(format!("MapfilePath::Scalar {e} {path:?}"));
error!("{e}");
return Err(e);
}
};
let (r2, file) = read_first_chunk(file).await?;
msg = format!("{}\n{:?}", msg, r2);
if let Some(r2) = r2 {
@@ -543,10 +494,8 @@ impl IndexFullHttpFunction {
if r3.pulse > latest_pair.0 {
latest_pair = (r3.pulse, r3.ts);
}
// TODO remove update of static columns when older clients are removed.
let sql = "insert into map_pulse_files (channel, split, timebin, pulse_min, pulse_max, hostname) values ($1, $2, $3, $4, $5, $6) on conflict (channel, split, timebin) do update set pulse_min = $4, pulse_max = $5, upc1 = map_pulse_files.upc1 + 1, hostname = $6";
conn.execute(
sql,
pgc.execute(
insert_01,
&[
&channel_name,
&(split as i32),
@@ -554,21 +503,42 @@ impl IndexFullHttpFunction {
&(r2.pulse as i64),
&(r3.pulse as i64),
&node_config.node.host,
&(2 as i32),
],
)
.await?;
} else {
warn!("could not find last event chunk in {path:?}");
}
} else {
warn!("could not find first event chunk in {path:?}");
}
}
MapfilePath::Index(path_index, path_data) => {
MapfilePath::Index(path_index) => {
trace!("Index {path_index:?}");
let path_data = {
let fns = path_index.file_name().unwrap().to_str().unwrap();
path_index.parent().unwrap().join(&fns[..fns.len() - 6])
};
let splitted: Vec<_> = path_index.to_str().unwrap().split("/").collect();
let timebin: u64 = splitted[splitted.len() - 3].parse()?;
let split: u64 = splitted[splitted.len() - 2].parse()?;
let file_index = tokio::fs::OpenOptions::new().read(true).open(&path_index).await?;
let file_data = tokio::fs::OpenOptions::new().read(true).open(&path_data).await?;
let file_index = match tokio::fs::OpenOptions::new().read(true).open(&path_index).await {
Ok(x) => x,
Err(e) => {
let e = Error::with_msg_no_trace(format!("MapfilePath::Index {e} {path_index:?}"));
error!("{e}");
return Err(e);
}
};
let file_data = match tokio::fs::OpenOptions::new().read(true).open(&path_data).await {
Ok(x) => x,
Err(e) => {
let e = Error::with_msg_no_trace(format!("MapfilePath::Index {e} {path_data:?}"));
error!("{e}");
return Err(e);
}
};
let (r2, file_index, file_data) = read_first_index_chunk(file_index, file_data).await?;
msg = format!("{}\n{:?}", msg, r2);
if let Some(r2) = r2 {
@@ -578,10 +548,8 @@ impl IndexFullHttpFunction {
if r3.pulse > latest_pair.0 {
latest_pair = (r3.pulse, r3.ts);
}
// TODO remove update of static columns when older clients are removed.
let sql = "insert into map_pulse_files (channel, split, timebin, pulse_min, pulse_max, hostname, ks) values ($1, $2, $3, $4, $5, $6, 3) on conflict (channel, split, timebin) do update set pulse_min = $4, pulse_max = $5, upc1 = map_pulse_files.upc1 + 1, hostname = $6";
conn.execute(
sql,
pgc.execute(
insert_01,
&[
&channel_name,
&(split as i32),
@@ -589,12 +557,15 @@ impl IndexFullHttpFunction {
&(r2.pulse as i64),
&(r3.pulse as i64),
&node_config.node.host,
&(3 as i32),
],
)
.await?;
} else {
warn!("could not find last index chunk in {path_index:?}");
}
} else {
warn!("could not find first event chunk in {path_index:?}");
warn!("could not find first index chunk in {path_index:?}");
}
}
}
@@ -609,27 +580,6 @@ impl IndexFullHttpFunction {
}
Ok(msg)
}
pub async fn index(do_print: bool, node_config: &NodeConfigCached) -> Result<String, Error> {
// TODO avoid double-insert on central storage.
let mut msg = format!("LOG");
make_tables(node_config).await?;
let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
let chs = timer_channel_names();
for channel_name in &chs[..] {
match Self::index_channel(channel_name.clone(), &conn, do_print, node_config).await {
Ok(m) => {
msg.push_str("\n");
msg.push_str(&m);
}
Err(e) => {
error!("error while indexing {} {:?}", channel_name, e);
//return Err(e);
}
}
}
Ok(msg)
}
}
pub struct UpdateTaskGuard {
@@ -728,7 +678,7 @@ impl UpdateTask {
/// Returns a guard which must be kept alive as long as the service should run.
/// Should instead of this use a system-timer and call the rest api.
#[allow(unused)]
fn new(node_config: NodeConfigCached) -> UpdateTaskGuard {
fn _new(node_config: NodeConfigCached) -> UpdateTaskGuard {
let do_abort = Arc::new(AtomicUsize::default());
let task = Self {
do_abort: do_abort.clone(),
@@ -950,7 +900,7 @@ impl MapPulseScyllaHandler {
"/api/4/scylla/map/pulse/"
}
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path().starts_with(Self::prefix()) {
Some(Self {})
} else {
@@ -958,9 +908,9 @@ impl MapPulseScyllaHandler {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<RespFull, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}
let urls = format!("dummy://{}", req.uri());
let url = url::Url::parse(&urls)?;
@@ -995,14 +945,14 @@ impl MapPulseScyllaHandler {
channels.push(ch.into());
}
let ret = LocalMap { pulse, tss, channels };
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?)
Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ret)?))?)
}
}
pub struct MapPulseLocalHttpFunction {}
impl MapPulseLocalHttpFunction {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path().starts_with(MAP_PULSE_LOCAL_URL_PREFIX) {
Some(Self {})
} else {
@@ -1010,9 +960,9 @@ impl MapPulseLocalHttpFunction {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<RespFull, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}
let urls = req.uri().to_string();
let pulse: u64 = urls[MAP_PULSE_LOCAL_URL_PREFIX.len()..]
@@ -1042,9 +992,6 @@ impl MapPulseLocalHttpFunction {
dt.as_secs_f32() * 1e3
);
}
//let mut msg = String::new();
//use std::fmt::Write;
//write!(&mut msg, "cands: {:?}\n", cands)?;
let mut futs = FuturesUnordered::new();
for (ch, hostname, tb, sp, ks) in cands {
futs.push(Self::search(pulse, ch, hostname, tb, sp, ks, node_config));
@@ -1067,7 +1014,7 @@ impl MapPulseLocalHttpFunction {
}
}
let ret = LocalMap { pulse, tss, channels };
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?)
Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ret)?))?)
}
async fn search(
@@ -1088,57 +1035,68 @@ impl MapPulseLocalHttpFunction {
ch
);
if ks == 2 {
match disk::paths::data_path_tb(ks, &ch, tb, 86400000, sp, &node_config.node) {
Ok(path) => {
//write!(&mut msg, "data_path_tb: {:?}\n", path)?;
match search_pulse(pulse, &path).await {
Ok(ts) => {
//write!(&mut msg, "SEARCH: {:?} for {}\n", ts, pulse)?;
if let Some(ts) = ts {
info!("Found in ks {} sp {} tb {} ch {} ts {}", ks, sp, tb, ch, ts);
Ok(Some((ts, ch)))
} else {
Ok(None)
}
}
Err(e) => {
warn!("can not map pulse with {ch} {sp} {tb} {e}");
return Err(e);
match disk::paths::data_path_tb(ks, &ch, tb, 1000 * 60 * 60 * 24, sp, &node_config.node) {
Ok(path) => match search_pulse(pulse, &path).await {
Ok(ts) => {
if let Some(ts) = ts {
info!("pulse {pulse} found in ks {ks} sp {sp} tb {tb} ch {ch} ts {ts}");
Ok(Some((ts, ch)))
} else {
Ok(None)
}
}
}
Err(e) => {
let e = Error::with_msg_no_trace(format!(
"pulse {pulse} can not map ks {ks} sp {sp} tb {tb} ch {ch} {e}"
));
error!("{e}");
return Err(e);
}
},
Err(e) => {
warn!("can not get path to files {ch} {e}");
return Err(e)?;
let e = Error::with_msg_no_trace(format!(
"pulse {pulse} no path ks {ks} sp {sp} tb {tb} ch {ch} {e}"
));
error!("{e}");
return Err(e);
}
}
} else if ks == 3 {
match disk::paths::data_path_tb(ks, &ch, tb, 86400000, sp, &node_config.node) {
Ok(path) => {
//write!(&mut msg, "data_path_tb: {:?}\n", path)?;
match search_index_pulse(pulse, &path).await {
Ok(ts) => {
//write!(&mut msg, "SEARCH: {:?} for {}\n", ts, pulse)?;
if let Some(ts) = ts {
info!("Found in ks {} sp {} tb {} ch {} ts {}", ks, sp, tb, ch, ts);
Ok(Some((ts, ch)))
} else {
Ok(None)
}
}
Err(e) => {
warn!("can not map pulse with {ch} {sp} {tb} {e}");
return Err(e);
match disk::paths::data_path_tb(ks, &ch, tb, 1000 * 60 * 60 * 24, sp, &node_config.node) {
Ok(path) => match search_index_pulse(pulse, &path).await {
Ok(ts) => {
if let Some(ts) = ts {
info!(
"pulse {} found in ks {} sp {} tb {} ch {} ts {}",
pulse, ks, sp, tb, ch, ts
);
Ok(Some((ts, ch)))
} else {
Ok(None)
}
}
}
Err(e) => {
let e = Error::with_msg_no_trace(format!(
"pulse {pulse} can not map ks {ks} sp {sp} tb {tb} ch {ch} {e}"
));
error!("{e}");
return Err(e);
}
},
Err(e) => {
warn!("can not get path to files {ch} {e}");
return Err(e)?;
let e = Error::with_msg_no_trace(format!(
"pulse {pulse} no path ks {ks} sp {sp} tb {tb} ch {ch} {e}"
));
error!("{e}");
return Err(e);
}
}
} else {
return Err(Error::with_msg_no_trace(format!("bad keyspace {ks}")));
let e = Error::with_msg_no_trace(format!(
"pulse {pulse} bad keyspace ks {ks} sp {sp} tb {tb} ch {ch}"
));
error!("{e}");
return Err(e);
}
}
}
@@ -1153,7 +1111,7 @@ pub struct TsHisto {
pub struct MapPulseHistoHttpFunction {}
impl MapPulseHistoHttpFunction {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path().starts_with(MAP_PULSE_HISTO_URL_PREFIX) {
Some(Self {})
} else {
@@ -1161,14 +1119,14 @@ impl MapPulseHistoHttpFunction {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<RespFull, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}
let urls = format!("{}", req.uri());
let pulse: u64 = urls[MAP_PULSE_HISTO_URL_PREFIX.len()..].parse()?;
let ret = Self::histo(pulse, node_config).await?;
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?)
Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ret)?))?)
}
pub async fn histo(pulse: u64, node_config: &NodeConfigCached) -> Result<TsHisto, Error> {
@@ -1179,12 +1137,21 @@ impl MapPulseHistoHttpFunction {
node.host, node.port, MAP_PULSE_LOCAL_URL_PREFIX, pulse
);
let uri: Uri = s.parse()?;
let req = Request::get(uri)
let req = Request::get(&uri)
.header(header::HOST, uri.host().unwrap())
.header("x-req-from", &node_config.node.host)
.body(Body::empty())?;
let fut = hyper::Client::new().request(req);
//let fut = hyper::Client::new().get(uri);
let fut = tokio::time::timeout(MAP_PULSE_LOCAL_TIMEOUT, fut);
.body(body_empty())?;
let fut = async move {
match connect_client(req.uri()).await {
Ok(mut client) => {
let fut = client.send_request(req);
tokio::time::timeout(MAP_PULSE_LOCAL_TIMEOUT, fut)
.await
.map(|x| x.map_err(Error::from_to_string))
}
Err(e) => Ok(Err(Error::from_to_string(e))),
}
};
futs.push_back(fut);
}
use futures_util::stream::StreamExt;
@@ -1192,7 +1159,7 @@ impl MapPulseHistoHttpFunction {
while let Some(futres) = futs.next().await {
match futres {
Ok(res) => match res {
Ok(res) => match hyper::body::to_bytes(res.into_body()).await {
Ok(res) => match read_body_bytes(res.into_body()).await {
Ok(body) => match serde_json::from_slice::<LocalMap>(&body) {
Ok(lm) => {
for ts in lm.tss {
@@ -1227,6 +1194,7 @@ impl MapPulseHistoHttpFunction {
tss: map.keys().map(|j| *j).collect(),
counts: map.values().map(|j| *j).collect(),
};
info!("pulse {pulse} histo {ret:?}");
Ok(ret)
}
}
@@ -1234,7 +1202,7 @@ impl MapPulseHistoHttpFunction {
pub struct MapPulseHttpFunction {}
impl MapPulseHttpFunction {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path().starts_with(MAP_PULSE_URL_PREFIX) {
Some(Self {})
} else {
@@ -1242,16 +1210,16 @@ impl MapPulseHttpFunction {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<RespFull, Error> {
use crate::cache::CachePortal;
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}
trace!("MapPulseHttpFunction handle uri: {:?}", req.uri());
let urls = format!("{}", req.uri());
let pulse: u64 = urls[MAP_PULSE_URL_PREFIX.len()..].parse()?;
match CACHE.portal(pulse) {
CachePortal::Fresh => {
trace!("value not yet in cache pulse {pulse}");
let histo = MapPulseHistoHttpFunction::histo(pulse, node_config).await?;
let mut i1 = 0;
let mut max = 0;
@@ -1264,9 +1232,9 @@ impl MapPulseHttpFunction {
if max > 0 {
let val = histo.tss[i1];
CACHE.set_value(pulse, val);
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?)
Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&val)?))?)
} else {
Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?)
Ok(response(StatusCode::NO_CONTENT).body(body_empty())?)
}
}
CachePortal::Existing(rx) => {
@@ -1274,30 +1242,27 @@ impl MapPulseHttpFunction {
match rx.recv().await {
Ok(_) => {
error!("should never recv from existing operation pulse {pulse}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?)
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?)
}
Err(_e) => {
trace!("woken up while value wait pulse {pulse}");
match CACHE.portal(pulse) {
CachePortal::Known(val) => {
info!("good, value after wakeup pulse {pulse}");
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?)
}
CachePortal::Fresh => {
error!("woken up, but portal fresh pulse {pulse}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?)
}
CachePortal::Existing(..) => {
error!("woken up, but portal existing pulse {pulse}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?)
}
Err(_e) => match CACHE.portal(pulse) {
CachePortal::Known(ts) => {
info!("pulse {pulse} known from cache ts {ts}");
Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ts)?))?)
}
}
CachePortal::Fresh => {
error!("pulse {pulse} woken up, but fresh");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?)
}
CachePortal::Existing(..) => {
error!("pulse {pulse} woken up, but existing");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?)
}
},
}
}
CachePortal::Known(val) => {
trace!("value already in cache pulse {pulse} ts {val}");
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?)
CachePortal::Known(ts) => {
info!("pulse {pulse} in cache ts {ts}");
Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ts)?))?)
}
}
}
@@ -1306,7 +1271,7 @@ impl MapPulseHttpFunction {
pub struct Api4MapPulseHttpFunction {}
impl Api4MapPulseHttpFunction {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path().starts_with(API_4_MAP_PULSE_URL_PREFIX) {
Some(Self {})
} else {
@@ -1319,6 +1284,7 @@ impl Api4MapPulseHttpFunction {
}
pub async fn find_timestamp(q: MapPulseQuery, ncc: &NodeConfigCached) -> Result<Option<u64>, Error> {
use crate::cache::CachePortal;
let pulse = q.pulse;
let res = match CACHE.portal(pulse) {
CachePortal::Fresh => {
@@ -1377,20 +1343,20 @@ impl Api4MapPulseHttpFunction {
res
}
pub async fn handle(&self, req: Request<Body>, ncc: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, ncc: &NodeConfigCached) -> Result<RespFull, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}
let ts1 = Instant::now();
trace!("Api4MapPulseHttpFunction handle uri: {:?}", req.uri());
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let q = MapPulseQuery::from_url(&url)?;
let ret = match Self::find_timestamp(q, ncc).await {
Ok(Some(val)) => Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?),
Ok(None) => Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?),
Ok(Some(val)) => Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&val)?))?),
Ok(None) => Ok(response(StatusCode::NO_CONTENT).body(body_empty())?),
Err(e) => {
error!("find_timestamp {e}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?)
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?)
}
};
let ts2 = Instant::now();
@@ -1416,7 +1382,7 @@ impl Api4MapPulse2HttpFunction {
"/api/4/map/pulse-v2/"
}
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path().starts_with(Self::path_prefix()) {
Some(Self {})
} else {
@@ -1428,9 +1394,9 @@ impl Api4MapPulse2HttpFunction {
path.starts_with(Self::path_prefix())
}
pub async fn handle(&self, req: Request<Body>, ncc: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, ncc: &NodeConfigCached) -> Result<RespFull, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}
let ts1 = Instant::now();
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
@@ -1446,12 +1412,12 @@ impl Api4MapPulse2HttpFunction {
.format(DATETIME_FMT_9MS)
.to_string();
let res = Api4MapPulse2Response { sec, ns, datetime };
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&res)?))?)
Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&res)?))?)
}
Ok(None) => Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?),
Ok(None) => Ok(response(StatusCode::NO_CONTENT).body(body_empty())?),
Err(e) => {
error!("find_timestamp {e}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?)
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?)
}
};
let ts2 = Instant::now();
@@ -1466,7 +1432,7 @@ impl Api4MapPulse2HttpFunction {
pub struct MarkClosedHttpFunction {}
impl MarkClosedHttpFunction {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path().starts_with(MAP_PULSE_MARK_CLOSED_URL_PREFIX) {
Some(Self {})
} else {
@@ -1474,19 +1440,19 @@ impl MarkClosedHttpFunction {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<RespFull, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}
info!("MarkClosedHttpFunction handle uri: {:?}", req.uri());
match MarkClosedHttpFunction::mark_closed(node_config).await {
Ok(_) => {
let ret = response(StatusCode::OK).body(Body::empty())?;
let ret = response(StatusCode::OK).body(body_empty())?;
Ok(ret)
}
Err(e) => {
let msg = format!("{:?}", e);
let ret = response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(msg))?;
let ret = response(StatusCode::INTERNAL_SERVER_ERROR).body(body_string(msg))?;
Ok(ret)
}
}

View File

@@ -1,8 +1,8 @@
use crate::err::Error;
use crate::response;
use http::header;
use http::Method;
use http::StatusCode;
use hyper::Body;
use hyper::Request;
use hyper::Response;
use netpod::log::*;
@@ -29,7 +29,7 @@ impl SettingsThreadsMaxHandler {
let (head, body) = req.into_parts();
let accept = head
.headers
.get(http::header::ACCEPT)
.get(header::ACCEPT)
.map_or(Ok(ACCEPT_ALL), |k| k.to_str())
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?
.to_owned();
@@ -39,7 +39,7 @@ impl SettingsThreadsMaxHandler {
error!("{e}");
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
}
let body = hyper::body::to_bytes(body).await?;
let body = httpclient::read_body_bytes(body).await?;
//let threads_max: usize = head.uri.path()[Self::path_prefix().len()..].parse()?;
let threads_max: usize = String::from_utf8_lossy(&body).parse()?;
info!("threads_max {threads_max}");

View File

@@ -1091,7 +1091,7 @@ mod test_frame {
#[test]
fn events_serialize() {
taskrun::tracing_init().unwrap();
taskrun::tracing_init_testing().unwrap();
let mut events = EventsDim0::empty();
events.push(123, 234, 55f32);
let events = events;

View File

@@ -5,7 +5,7 @@ authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
[dependencies]
tokio = { version = "1.21.2", features = ["io-util", "net", "time", "sync", "fs"] }
tokio = { version = "1.34", features = ["io-util", "net", "time", "sync", "fs"] }
futures-util = "0.3.15"
pin-project = "1.0.12"
serde = { version = "1.0", features = ["derive"] }

View File

@@ -18,6 +18,7 @@ use items_2::frame::make_term_frame;
use netpod::log::*;
use netpod::Cluster;
use netpod::Node;
use netpod::APP_OCTET;
use query::api4::events::EventsSubQuery;
use query::api4::events::Frame1Parts;
use serde::de::DeserializeOwned;
@@ -63,32 +64,29 @@ pub async fn x_processed_event_blobs_stream_from_node_http(
use http::Request;
use httpclient::http;
use httpclient::hyper;
use hyper::Body;
use hyper::StatusCode;
let frame1 = make_node_command_frame(subq.clone())?;
let item = sitem_data(frame1.clone());
let buf = item.make_frame()?;
let buf = item.make_frame()?.freeze();
let url = node.baseurl().join("/api/4/private/eventdata/frames").unwrap();
debug!("open_event_data_streams_http post {url}");
let req = Request::builder()
.method(Method::POST)
.uri(url.to_string())
.header(header::ACCEPT, "application/octet-stream")
.body(Body::from(buf.to_vec()))
.header(header::ACCEPT, APP_OCTET)
.body(httpclient::Full::new(buf))
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let client = hyper::Client::new();
let mut client = httpclient::connect_client(req.uri()).await?;
let res = client
.request(req)
.send_request(req)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
if res.status() != StatusCode::OK {
error!("Server error {:?}", res);
let (head, body) = res.into_parts();
let buf = hyper::body::to_bytes(body)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let buf = httpclient::read_body_bytes(body).await?;
let s = String::from_utf8_lossy(&buf);
return Err(Error::with_msg(format!(
concat!(
@@ -101,7 +99,7 @@ pub async fn x_processed_event_blobs_stream_from_node_http(
)));
}
let (_head, body) = res.into_parts();
let frames = InMemoryFrameStream::new(body, subq.inmem_bufcap());
let frames = InMemoryFrameStream::new(httpclient::IncomingStream::new(body), subq.inmem_bufcap());
let frames = Box::pin(frames);
let stream = EventsFromFrames::new(frames, url.to_string());
debug!("open_event_data_streams_http done {url}");
@@ -165,31 +163,28 @@ where
use http::Request;
use httpclient::http;
use httpclient::hyper;
use hyper::Body;
use hyper::StatusCode;
let item = sitem_data(frame1.clone());
let buf = item.make_frame()?;
let buf = item.make_frame()?.freeze();
let url = node.baseurl().join("/api/4/private/eventdata/frames").unwrap();
debug!("open_event_data_streams_http post {url}");
let req = Request::builder()
.method(Method::POST)
.uri(url.to_string())
.header(header::ACCEPT, "application/octet-stream")
.body(Body::from(buf.to_vec()))
.header(header::ACCEPT, APP_OCTET)
.body(httpclient::Full::new(buf))
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let client = hyper::Client::new();
let mut client = httpclient::connect_client(req.uri()).await?;
let res = client
.request(req)
.send_request(req)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
if res.status() != StatusCode::OK {
error!("Server error {:?}", res);
let (head, body) = res.into_parts();
let buf = hyper::body::to_bytes(body)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let buf = httpclient::read_body_bytes(body).await?;
let s = String::from_utf8_lossy(&buf);
return Err(Error::with_msg(format!(
concat!(
@@ -202,7 +197,7 @@ where
)));
}
let (_head, body) = res.into_parts();
let frames = InMemoryFrameStream::new(body, subq.inmem_bufcap());
let frames = InMemoryFrameStream::new(httpclient::IncomingStream::new(body), subq.inmem_bufcap());
let frames = Box::pin(frames);
let stream = EventsFromFrames::<T>::new(frames, url.to_string());
debug!("open_event_data_streams_http done {url}");

View File

@@ -10,10 +10,11 @@ path = "src/taskrun.rs"
[dependencies]
tokio = { version = "1.32.0", features = ["full", "tracing", "time"] }
futures-util = "0.3.28"
tracing = "0.1.37"
tracing = "0.1.40"
tracing-log = "0.2.0"
tracing-subscriber = { version = "0.3.17", features = ["fmt", "time"] }
#tracing-loki = { version = "0.2.1", default-features = false, features = ["compat-0-2-1"] }
console-subscriber = { version = "0.1.10" }
console-subscriber = { version = "0.2.0" }
time = { version = "0.3", features = ["formatting"] }
backtrace = "0.3.56"
lazy_static = "1.4.0"

View File

@@ -0,0 +1,123 @@
use std::fmt;
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use tracing::Event;
use tracing::Subscriber;
use tracing_log::NormalizeEvent;
use tracing_subscriber::fmt::format::Writer;
use tracing_subscriber::fmt::FmtContext;
use tracing_subscriber::fmt::FormatEvent;
use tracing_subscriber::fmt::FormatFields;
use tracing_subscriber::fmt::FormattedFields;
use tracing_subscriber::registry::LookupSpan;
fn _dummyyyy() {
let _ = tracing_subscriber::fmt::format::Full;
}
pub struct FormatTxt;
impl<S, N> FormatEvent<S, N> for FormatTxt
where
S: Subscriber + for<'a> LookupSpan<'a>,
N: for<'a> FormatFields<'a> + 'static,
{
fn format_event(&self, ctx: &FmtContext<'_, S, N>, mut writer: Writer<'_>, event: &Event<'_>) -> fmt::Result {
let normalized_meta = event.normalized_metadata();
let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
// Without tracing-log:
// let meta = event.metadata();
// write!(w, "{}", datetime::DateTime::from(std::time::SystemTime::now()));
// write!(writer, "{} ", FmtLevel::new(meta.level()))?;
// Using crate `time` doing `DateTime<somehow-utc>.format_into(..)`
// tracing_subscriber::fmt::time::datetime is private:
// tracing_subscriber::fmt::time::datetime::DateTime::from(std::time::SystemTime::now());
if false {
// TODO restrict to milliseconds.
// TODO there must be a better way than via cursor?
let tsnow = OffsetDateTime::now_utc();
let buf = [0u8; 64];
let mut cr = std::io::Cursor::new(buf);
let n = tsnow.format_into(&mut cr, &Rfc3339).unwrap();
let buf = cr.into_inner();
writer.write_str(std::str::from_utf8(&buf[..n]).unwrap())?;
// writer.write_char(' ')?;
}
if true {
const DATETIME_FMT_3MS: &str = "%Y-%m-%dT%H:%M:%S.%3fZ";
let ts = chrono::Utc::now();
let tsfmt = ts.format(DATETIME_FMT_3MS);
writer.write_str(&tsfmt.to_string())?;
// writer.write_char(' ')?;
}
write!(writer, " {:>5} ", meta.level().as_str())?;
writer.write_str("[THR ")?;
let current_thread = std::thread::current();
match current_thread.name() {
Some(name) => {
let n = name.len();
let max = 14;
if n > max {
writer.write_str(&name[0..2])?;
writer.write_char('.')?;
writer.write_str(&name[name.len() + 3 - max..])?;
} else {
writer.write_str(name)?;
}
}
None => {
// write!(writer, "{:0>2?} ", current_thread.id())?;
write!(writer, "{:?} ", current_thread.id())?;
}
}
writer.write_char(' ')?;
writer.write_str("[TGT ")?;
writer.write_str(meta.target())?;
writer.write_char(' ')?;
writer.write_str("[SCP ")?;
if let Some(sc) = ctx.event_scope() {
for (i, span) in sc.from_root().enumerate() {
if i != 0 {
writer.write_char(',')?;
}
let meta = span.metadata();
writer.write_str(meta.name())?;
let ext = span.extensions();
if let Some(fields) = ext.get::<FormattedFields<N>>() {
if fields.is_empty() {
} else {
writer.write_char('{')?;
writer.write_str(fields)?;
// write!(writer, "{{{}}}", fields)?;
writer.write_char('}')?;
}
}
}
}
writer.write_char(' ')?;
if false {
writer.write_str("[FIL ")?;
if let Some(x) = meta.file() {
writer.write_str(x)?;
if let Some(x) = meta.line() {
write!(writer, ":{x}")?;
}
}
writer.write_char(' ')?;
}
writer.write_str("[MSG ")?;
ctx.format_fields(writer.by_ref(), event)?;
writer.write_char('\n')?;
Ok(())
}
}

View File

@@ -1,3 +1,5 @@
pub mod formatter;
pub use tokio;
use crate::log::*;
@@ -23,7 +25,7 @@ pub fn get_runtime() -> Arc<Runtime> {
get_runtime_opts(24, 128)
}
// #[allow(unused)]
#[allow(unused)]
fn on_thread_start() {
let old = panic::take_hook();
panic::set_hook(Box::new(move |info| {
@@ -86,7 +88,7 @@ where
E: fmt::Display,
{
let runtime = get_runtime();
match tracing_init() {
match tracing_init(TracingMode::Development) {
Ok(_) => {}
Err(()) => {
eprintln!("ERROR tracing: can not init");
@@ -102,7 +104,7 @@ where
}
}
fn tracing_init_inner() -> Result<(), Error> {
fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::Layer;
@@ -110,7 +112,12 @@ fn tracing_init_inner() -> Result<(), Error> {
let timer = tracing_subscriber::fmt::time::UtcTime::new(
time::format_description::parse(fmtstr).map_err(|e| format!("{e}"))?,
);
if true {
if let TracingMode::Console = mode {
// Only async console
console_subscriber::init();
} else {
// #[cfg(DISABLED)]
// Logging setup
let filter = tracing_subscriber::EnvFilter::builder()
.with_default_directive(tracing::metadata::LevelFilter::INFO.into())
.from_env()
@@ -121,6 +128,7 @@ fn tracing_init_inner() -> Result<(), Error> {
.with_target(true)
.with_ansi(false)
.with_thread_names(true)
.event_format(formatter::FormatTxt)
.with_filter(filter);
let reg = tracing_subscriber::registry();
@@ -191,11 +199,21 @@ fn tracing_init_inner() -> Result<(), Error> {
Ok(())
}
pub fn tracing_init() -> Result<(), ()> {
pub enum TracingMode {
Production,
Development,
Console,
}
pub fn tracing_init_testing() -> Result<(), ()> {
tracing_init(TracingMode::Development)
}
pub fn tracing_init(mode: TracingMode) -> Result<(), ()> {
match INIT_TRACING_ONCE.lock() {
Ok(mut initg) => {
if *initg == 0 {
match tracing_init_inner() {
match tracing_init_inner(mode) {
Ok(_) => {
*initg = 1;
}