Update http deps

This commit is contained in:
Dominik Werder
2023-12-06 17:03:56 +01:00
parent 1b3e9ebd2a
commit c887db1b3d
28 changed files with 1251 additions and 926 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[package]
name = "daqbuffer"
version = "0.4.5-alpha.0"
version = "0.5.0-alpha.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -1,11 +1,12 @@
use crate::err::ErrConv;
use bytes::Bytes;
use chrono::DateTime;
use chrono::Utc;
use disk::streamlog::Streamlog;
use err::Error;
use futures_util::TryStreamExt;
use http::StatusCode;
use http::Uri;
use httpclient::body_empty;
use items_0::streamitem::StreamItem;
use netpod::log::*;
use netpod::query::CacheUsage;
@@ -22,10 +23,12 @@ use url::Url;
pub async fn status(host: String, port: u16) -> Result<(), Error> {
let t1 = Utc::now();
let uri = format!("http://{}:{}/api/4/node_status", host, port,);
let req = hyper::Request::builder()
let uri: Uri = uri.parse()?;
let req = http::Request::builder()
.method(http::Method::GET)
.header(http::header::HOST, uri.host().unwrap())
.uri(uri)
.body(httpclient::Full::new(Bytes::new()))?;
.body(body_empty())?;
let mut client = httpclient::connect_client(req.uri()).await?;
let res = client.send_request(req).await?;
if res.status() != StatusCode::OK {
@@ -69,11 +72,13 @@ pub async fn get_binned(
let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?;
query.append_to_url(&mut url);
let url = url;
let req = hyper::Request::builder()
let uri: Uri = url.as_str().parse()?;
let req = http::Request::builder()
.method(http::Method::GET)
.uri(url.to_string())
.header(http::header::HOST, uri.host().unwrap())
.header(http::header::ACCEPT, APP_OCTET)
.body(httpclient::Full::new(Bytes::new()))
.uri(uri)
.body(body_empty())
.ec()?;
let mut client = httpclient::connect_client(req.uri()).await?;
let res = client.send_request(req).await?;

View File

@@ -18,9 +18,8 @@ async_channel_2 = { package = "async-channel", version = "2.0.0" }
chrono = { version = "0.4.26", features = ["serde"] }
url = "2.4.0"
regex = "1.9.1"
http = "0.2.9"
http_1 = { package = "http", version = "1.0.0" }
hyper_1 = { package = "hyper", version = "1.0.1" }
http = "1.0.0"
hyper = "1.0.1"
thiserror = "=0.0.1"
anyhow = "1.0"
tokio = "1"

View File

@@ -429,12 +429,6 @@ impl From<rmp_serde::decode::Error> for Error {
}
}
impl From<http::header::ToStrError> for Error {
fn from(k: http::header::ToStrError) -> Self {
Self::from_string(format!("{:?}", k))
}
}
impl From<anyhow::Error> for Error {
fn from(k: anyhow::Error) -> Self {
Self::from_string(format!("{k}"))
@@ -447,14 +441,20 @@ impl From<tokio::task::JoinError> for Error {
}
}
impl From<http_1::Error> for Error {
fn from(k: http_1::Error) -> Self {
impl From<http::Error> for Error {
fn from(k: http::Error) -> Self {
Self::from_string(k)
}
}
impl From<hyper_1::Error> for Error {
fn from(k: hyper_1::Error) -> Self {
impl From<http::uri::InvalidUri> for Error {
fn from(k: http::uri::InvalidUri) -> Self {
Self::from_string(k)
}
}
impl From<hyper::Error> for Error {
fn from(k: hyper::Error) -> Self {
Self::from_string(k)
}
}

View File

@@ -8,17 +8,17 @@ edition = "2021"
futures-util = "0.3.25"
serde = { version = "1.0.147", features = ["derive"] }
serde_json = "1.0.89"
url = "2.3.1"
tokio = { version = "1.22.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
tracing = "0.1.37"
url = "2.5.0"
tokio = { version = "1.34.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
tracing = "0.1.40"
http = "1.0.0"
http-body = "1.0.0"
http-body-util = "0.1.0"
hyper = { version = "1.0.1", features = ["http1", "http2", "client", "server"] }
hyper-tls = { version = "0.5.0" }
hyper-tls = { version = "0.6.0" }
hyper-util = { version = "0.1.1", features = ["full"] }
bytes = "1.3.0"
async-channel = "1.8.0"
bytes = "1.5.0"
async-channel = "1.9.0"
err = { path = "../err" }
netpod = { path = "../netpod" }
parse = { path = "../parse" }

View File

@@ -1,18 +1,22 @@
pub use hyper_util;
pub use http_body_util;
pub use http_body_util::Full;
pub use hyper_util;
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use err::PublicError;
use futures_util::pin_mut;
use futures_util::Stream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use http::header;
use http::Request;
use http::Response;
use http::StatusCode;
use http_body::Frame;
use http_body_util::combinators::BoxBody;
use http_body_util::BodyExt;
use hyper::body::Body;
use hyper::body::Incoming;
use hyper::client::conn::http2::SendRequest;
use hyper::Method;
use netpod::log::*;
@@ -21,18 +25,150 @@ use netpod::ChannelConfigQuery;
use netpod::ChannelConfigResponse;
use netpod::NodeConfigCached;
use netpod::APP_JSON;
use serde::Serialize;
use std::fmt;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio::io;
use tokio::io::AsyncRead;
use tokio::io::ReadBuf;
use tokio::net::TcpStream;
use url::Url;
pub type BodyBox = BoxBody<Bytes, BodyError>;
pub type RespBox = Response<BodyBox>;
pub type Requ = Request<Incoming>;
pub type RespFull = Response<Full<Bytes>>;
// TODO rename: too similar.
pub type StreamBody = http_body_util::StreamBody<Pin<Box<dyn Stream<Item = Result<Frame<Bytes>, BodyError>> + Send>>>;
pub type StreamResponse = Response<StreamBody>;
fn _assert1() {
let body: Full<Bytes> = todoval();
let _: &dyn Body<Data = _, Error = _> = &body;
}
fn _assert2() {
let stream: Pin<Box<dyn futures_util::Stream<Item = Result<Frame<Bytes>, BodyError>>>> = todoval();
let body = http_body_util::StreamBody::new(stream);
let _: &dyn Body<Data = _, Error = _> = &body;
}
#[allow(unused)]
fn todoval<T>() -> T {
todo!()
}
pub fn body_empty() -> StreamBody {
// Full::new(Bytes::new()).map_err(Into::into).boxed()
let fr = Frame::data(Bytes::new());
let stream = futures_util::stream::iter([Ok(fr)]);
http_body_util::StreamBody::new(Box::pin(stream))
}
pub fn body_string<S: ToString>(body: S) -> StreamBody {
// Full::new(Bytes::from(body.to_string())).map_err(Into::into).boxed()
let fr = Frame::data(Bytes::from(body.to_string()));
let stream = futures_util::stream::iter([Ok(fr)]);
http_body_util::StreamBody::new(Box::pin(stream))
}
pub fn body_bytes<D: Into<Bytes>>(body: D) -> StreamBody {
let fr = Frame::data(body.into());
let stream = futures_util::stream::iter([Ok(fr)]);
http_body_util::StreamBody::new(Box::pin(stream))
}
pub trait IntoBody {
fn into_body(self) -> StreamBody;
}
pub struct StringBody {
body: String,
}
impl<S: ToString> From<S> for StringBody {
fn from(value: S) -> Self {
Self {
body: value.to_string(),
}
}
}
impl IntoBody for StringBody {
fn into_body(self) -> StreamBody {
let fr = Frame::data(Bytes::from(self.body.as_bytes().to_vec()));
let stream = futures_util::stream::iter([Ok(fr)]);
http_body_util::StreamBody::new(Box::pin(stream))
}
}
pub struct ToJsonBody {
body: Vec<u8>,
}
impl<S: Serialize> From<&S> for ToJsonBody {
fn from(value: &S) -> Self {
Self {
body: serde_json::to_vec(value).unwrap_or(Vec::new()),
}
}
}
impl IntoBody for ToJsonBody {
fn into_body(self) -> StreamBody {
let fr = Frame::data(Bytes::from(self.body));
let stream = futures_util::stream::iter([Ok(fr)]);
http_body_util::StreamBody::new(Box::pin(stream))
}
}
pub fn body_stream<S, I, E>(stream: S) -> StreamBody
where
S: Stream<Item = Result<I, E>> + Send + 'static,
I: Into<Bytes>,
E: fmt::Display,
{
let stream = stream.map(|x| match x {
Ok(x) => Ok(Frame::data(x.into())),
Err(_e) => Err(BodyError::Bad),
});
StreamBody::new(Box::pin(stream))
}
pub struct StreamIncoming {
inp: http_body_util::BodyStream<Incoming>,
}
impl StreamIncoming {
pub fn new(inp: Incoming) -> Self {
Self {
inp: http_body_util::BodyStream::new(inp),
}
}
}
impl Stream for StreamIncoming {
type Item = Result<Bytes, BodyError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(x))) => {
if x.is_data() {
Ready(Some(Ok(x.into_data().unwrap())))
} else {
Ready(Some(Ok(Bytes::new())))
}
}
Ready(Some(Err(e))) => {
error!("{e}");
Ready(Some(Err(BodyError::Bad)))
}
Ready(None) => Ready(None),
Pending => Pending,
}
}
}
#[derive(Debug)]
pub enum BodyError {
@@ -62,19 +198,19 @@ pub enum Error {
}
impl From<std::io::Error> for Error {
fn from(value: std::io::Error) -> Self {
fn from(_: std::io::Error) -> Self {
Self::IO
}
}
impl From<http::Error> for Error {
fn from(value: http::Error) -> Self {
fn from(_: http::Error) -> Self {
Self::Http
}
}
impl From<hyper::Error> for Error {
fn from(value: hyper::Error) -> Self {
fn from(_: hyper::Error) -> Self {
Self::Http
}
}
@@ -102,13 +238,11 @@ pub async fn http_get(url: Url, accept: &str) -> Result<HttpResponse, Error> {
.uri(url.to_string())
.header(header::HOST, url.host_str().ok_or_else(|| Error::BadUrl)?)
.header(header::ACCEPT, accept)
.body(Full::new(Bytes::new()))?;
.body(body_empty())?;
let mut send_req = connect_client(req.uri()).await?;
let res = send_req.send_request(req).await?;
let (head, mut body) = res.into_parts();
debug!("http_get head {head:?}");
use bytes::BufMut;
use http_body_util::BodyExt;
let mut buf = BytesMut::new();
while let Some(x) = body.frame().await {
match x {
@@ -128,14 +262,13 @@ pub async fn http_get(url: Url, accept: &str) -> Result<HttpResponse, Error> {
}
pub async fn http_post(url: Url, accept: &str, body: String) -> Result<Bytes, Error> {
let body = Bytes::from(body.as_bytes().to_vec());
let req = Request::builder()
.method(http::Method::POST)
.uri(url.to_string())
.header(header::HOST, url.host_str().ok_or_else(|| Error::BadUrl)?)
.header(header::CONTENT_TYPE, APP_JSON)
.header(header::ACCEPT, accept)
.body(Full::new(body))?;
.body(body_string(body))?;
let mut send_req = connect_client(req.uri()).await?;
let res = send_req.send_request(req).await?;
if res.status() != StatusCode::OK {
@@ -143,12 +276,12 @@ pub async fn http_post(url: Url, accept: &str, body: String) -> Result<Bytes, Er
let (_head, body) = res.into_parts();
let buf = read_body_bytes(body).await?;
let s = String::from_utf8_lossy(&buf);
error!("{s}");
// TODO return error
return Err(Error::Http);
}
let (head, mut body) = res.into_parts();
debug!("http_get head {head:?}");
use bytes::BufMut;
use http_body_util::BodyExt;
let mut buf = BytesMut::new();
while let Some(x) = body.frame().await {
match x {
@@ -164,7 +297,7 @@ pub async fn http_post(url: Url, accept: &str, body: String) -> Result<Bytes, Er
Ok(buf)
}
pub async fn connect_client(uri: &http::Uri) -> Result<SendRequest<Full<Bytes>>, Error> {
pub async fn connect_client(uri: &http::Uri) -> Result<SendRequest<StreamBody>, Error> {
let host = uri.host().ok_or_else(|| Error::BadUrl)?;
let port = uri.port_u16().ok_or_else(|| Error::BadUrl)?;
let stream = TcpStream::connect(format!("{host}:{port}")).await?;
@@ -178,8 +311,6 @@ pub async fn connect_client(uri: &http::Uri) -> Result<SendRequest<Full<Bytes>>,
}
pub async fn read_body_bytes(mut body: hyper::body::Incoming) -> Result<Bytes, Error> {
use bytes::BufMut;
use http_body_util::BodyExt;
let mut buf = BytesMut::new();
while let Some(x) = body.frame().await {
match x {

View File

@@ -6,21 +6,23 @@ use crate::gather::SubRes;
use crate::response;
use crate::ReqCtx;
use crate::Requ;
use crate::RespFull;
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes;
use futures_util::Stream;
use futures_util::StreamExt;
use http::header;
use http::Method;
use http::Response;
use http::StatusCode;
use http_body_util::Full;
use httpclient::body_stream;
use httpclient::connect_client;
use httpclient::read_body_bytes;
use httpclient::IntoBody;
use httpclient::StreamResponse;
use httpclient::ToJsonBody;
use hyper::body::Incoming;
use hyper::Request;
use hyper::Response;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
@@ -138,7 +140,7 @@ impl FromErrorCode for ChannelSearchResultItemV1 {
#[derive(Debug, Serialize, Deserialize)]
pub struct ChannelSearchResultV1(pub Vec<ChannelSearchResultItemV1>);
pub async fn channel_search_list_v1(req: Requ, proxy_config: &ProxyConfig) -> Result<RespFull, Error> {
pub async fn channel_search_list_v1(req: Requ, proxy_config: &ProxyConfig) -> Result<StreamResponse, Error> {
let (head, reqbody) = req.into_parts();
let bodybytes = read_body_bytes(reqbody).await?;
let query: ChannelSearchQueryV1 = serde_json::from_slice(&bodybytes)?;
@@ -168,9 +170,10 @@ pub async fn channel_search_list_v1(req: Requ, proxy_config: &ProxyConfig) -> Re
a
})?;
let tags: Vec<_> = urls.iter().map(|k| k.to_string()).collect();
let nt = |tag, res| {
let nt = |tag: String, res: Response<Incoming>| {
let fut = async {
let body = read_body_bytes(res).await?;
let (_head, body) = res.into_parts();
let body = read_body_bytes(body).await?;
let res: ChannelSearchResult = match serde_json::from_slice(&body) {
Ok(k) => k,
Err(_) => ChannelSearchResult { channels: Vec::new() },
@@ -234,14 +237,14 @@ pub async fn channel_search_list_v1(req: Requ, proxy_config: &ProxyConfig) -> Re
.await?;
Ok(ret)
} else {
Ok(response(StatusCode::NOT_ACCEPTABLE).body(Full::new(Bytes::new()))?)
Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?)
}
}
None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Full::new(Bytes::new()))?),
None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?),
}
}
pub async fn channel_search_configs_v1(req: Requ, proxy_config: &ProxyConfig) -> Result<RespFull, Error> {
pub async fn channel_search_configs_v1(req: Requ, proxy_config: &ProxyConfig) -> Result<StreamResponse, Error> {
let (head, reqbody) = req.into_parts();
let bodybytes = read_body_bytes(reqbody).await?;
let query: ChannelSearchQueryV1 = serde_json::from_slice(&bodybytes)?;
@@ -267,17 +270,18 @@ pub async fn channel_search_configs_v1(req: Requ, proxy_config: &ProxyConfig) ->
}
Err(e) => Err(Error::with_msg(format!("parse error for: {:?} {:?}", sh, e))),
})
.fold_ok(vec![], |mut a, x| {
.fold_ok(Vec::new(), |mut a, x| {
a.push(x);
a
})?;
let tags: Vec<_> = urls.iter().map(|k| k.to_string()).collect();
let nt = |tag, res| {
let nt = |tag: String, res: Response<Incoming>| {
let fut = async {
let body = read_body_bytes(res).await?;
let (_head, body) = res.into_parts();
let body = read_body_bytes(body).await?;
let res: ChannelSearchResult = match serde_json::from_slice(&body) {
Ok(k) => k,
Err(_) => ChannelSearchResult { channels: vec![] },
Err(_) => ChannelSearchResult { channels: Vec::new() },
};
let ret = SubRes {
tag,
@@ -340,7 +344,7 @@ pub async fn channel_search_configs_v1(req: Requ, proxy_config: &ProxyConfig) ->
}
let res = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Full::new(serde_json::to_string(&res)?))?;
.body(ToJsonBody::from(&res).into_body())?;
Ok(res)
};
let bodies = (0..urls.len()).into_iter().map(|_| None).collect();
@@ -356,10 +360,10 @@ pub async fn channel_search_configs_v1(req: Requ, proxy_config: &ProxyConfig) ->
.await?;
Ok(ret)
} else {
Ok(response(StatusCode::NOT_ACCEPTABLE).body(Full::new(Bytes::new()))?)
Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?)
}
}
None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Full::new(Bytes::new()))?),
None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?),
}
}
@@ -420,7 +424,7 @@ impl FromErrorCode for ChannelBackendConfigsV1 {
}
// TODO replace usage of this by gather-generic
pub async fn gather_json_2_v1(req: Requ, pathpre: &str, _proxy_config: &ProxyConfig) -> Result<RespFull, Error> {
pub async fn gather_json_2_v1(req: Requ, pathpre: &str, _proxy_config: &ProxyConfig) -> Result<StreamResponse, Error> {
let (part_head, part_body) = req.into_parts();
let bodyslice = read_body_bytes(part_body).await?;
let gather_from: GatherFromV1 = serde_json::from_slice(&bodyslice)?;
@@ -432,19 +436,19 @@ pub async fn gather_json_2_v1(req: Requ, pathpre: &str, _proxy_config: &ProxyCon
let uri = format!("http://{}:{}/{}", gh.host, gh.port, path_post);
let req = Request::builder()
.method(Method::GET)
.uri(uri)
.header(header::HOST, gh.host);
.header(header::HOST, &gh.host)
.header(header::ACCEPT, APP_JSON)
.uri(uri);
let req = if gh.inst.len() > 0 {
req.header("retrieval_instance", &gh.inst)
} else {
req
};
let req = req.header(http::header::ACCEPT, APP_JSON);
let req = req.body(Full::new(Bytes::new()));
let req = req.body(body_empty())?;
let task = tokio::spawn(async move {
let mut client = connect_client(req.uri()).await?;
let res = client.send_request(req).await?;
Ok::<_, Error>(process_answer(res?).await?)
Ok::<_, Error>(process_answer(res).await?)
});
let task = tokio::time::timeout(std::time::Duration::from_millis(5000), task);
spawned.push((gh.clone(), task));
@@ -474,7 +478,7 @@ pub async fn gather_json_2_v1(req: Requ, pathpre: &str, _proxy_config: &ProxyCon
}
let res = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(serde_json::to_string(&Jres { hosts: a })?.into())?;
.body(ToJsonBody::from(&Jres { hosts: a }).into_body())?;
Ok(res)
}
@@ -490,24 +494,20 @@ struct GatherHostV1 {
inst: String,
}
async fn process_answer(res: RespFull) -> Result<JsonValue, Error> {
let (pre, mut body) = res.into_parts();
async fn process_answer(res: Response<Incoming>) -> Result<JsonValue, Error> {
let (pre, body) = res.into_parts();
let body = read_body_bytes(body).await?;
let body = String::from_utf8(body.to_vec())?;
if pre.status != StatusCode::OK {
if let Some(c) = body.data().await {
let c: bytes::Bytes = c?;
let s1 = String::from_utf8(c.to_vec())?;
Ok(JsonValue::String(format!(
"status {} body {}",
pre.status.as_str(),
s1
)))
} else {
Ok(JsonValue::String(format!("status {}", pre.status.as_str())))
}
Ok(JsonValue::String(format!(
"status {} body {}",
pre.status.as_str(),
body
)))
} else {
let val = match serde_json::from_slice(the_data) {
let val = match serde_json::from_str(&body) {
Ok(k) => k,
Err(_e) => JsonValue::String(String::from_utf8(body_all.to_vec())?),
Err(_e) => JsonValue::String(body),
};
Ok::<_, Error>(val)
}
@@ -850,7 +850,7 @@ impl Stream for DataApiPython3DataStream {
#[allow(unused)]
fn shape_to_api3proto(sh: &Option<Vec<u32>>) -> Vec<u32> {
match sh {
None => vec![],
None => Vec::new(),
Some(g) => {
if g.len() == 1 {
vec![g[0]]
@@ -874,9 +874,14 @@ impl Api1EventsBinaryHandler {
}
}
pub async fn handle(&self, req: Requ, _ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result<RespFull, Error> {
pub async fn handle(
&self,
req: Requ,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
if req.method() != Method::POST {
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Full::new(Bytes::new()))?);
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?);
}
let (head, body) = req.into_parts();
let accept = head
@@ -939,7 +944,7 @@ impl Api1EventsBinaryHandler {
span: tracing::Span,
reqidspan: tracing::Span,
ncc: &NodeConfigCached,
) -> Result<RespFull, Error> {
) -> Result<StreamResponse, Error> {
let self_name = any::type_name::<Self>();
// TODO this should go to usage statistics:
debug!(
@@ -1003,7 +1008,7 @@ impl Api1EventsBinaryHandler {
ncc.clone(),
);
let s = s.instrument(span).instrument(reqidspan);
let body = Body::wrap_stream(s);
let body = body_stream(s);
let ret = response(StatusCode::OK).header(X_DAQBUF_REQID, reqctx.reqid());
let ret = ret.body(body)?;
Ok(ret)
@@ -1011,7 +1016,7 @@ impl Api1EventsBinaryHandler {
// TODO set the public error code and message and return Err(e).
let e = Error::with_public_msg_no_trace(format!("{self_name} unsupported Accept: {}", accept));
error!("{self_name} {e}");
Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty)?)
Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?)
}
}
}
@@ -1031,7 +1036,7 @@ impl RequestStatusHandler {
}
}
pub async fn handle(&self, req: Requ, _ncc: &NodeConfigCached) -> Result<RespFull, Error> {
pub async fn handle(&self, req: Requ, _ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
let (head, body) = req.into_parts();
if head.method != Method::GET {
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?);
@@ -1053,7 +1058,7 @@ impl RequestStatusHandler {
debug!("RequestStatusHandler status_id {:?}", status_id);
let status = crate::status_board()?.status_as_json(status_id);
let s = serde_json::to_string(&status)?;
let ret = response(StatusCode::OK).body(Full::new(s))?;
let ret = response(StatusCode::OK).body(body_string(s))?;
Ok(ret)
}
}

View File

@@ -4,10 +4,12 @@ use crate::channelconfig::ch_conf_from_binned;
use crate::err::Error;
use crate::response_err;
use http::Method;
use http::Request;
use http::Response;
use http::StatusCode;
use hyper::Body;
use httpclient::body_empty;
use httpclient::IntoBody;
use httpclient::Requ;
use httpclient::StreamResponse;
use httpclient::ToJsonBody;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::FromUrl;
@@ -16,7 +18,7 @@ use query::api4::binned::BinnedQuery;
use tracing::Instrument;
use url::Url;
async fn binned_json(url: Url, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
async fn binned_json(url: Url, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
debug!("{:?}", req);
let reqid = crate::status_board()
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?
@@ -45,12 +47,11 @@ async fn binned_json(url: Url, req: Request<Body>, node_config: &NodeConfigCache
let item = streams::timebinnedjson::timebinned_json(query, ch_conf, reqid, node_config.node_config.cluster.clone())
.instrument(span1)
.await?;
let buf = serde_json::to_vec(&item)?;
let ret = response(StatusCode::OK).body(Body::from(buf))?;
let ret = response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?;
Ok(ret)
}
async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
async fn binned(req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
let url = {
let s1 = format!("dummy:{}", req.uri());
Url::parse(&s1)
@@ -83,7 +84,7 @@ async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Re
pub struct BinnedHandler {}
impl BinnedHandler {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/4/binned" {
Some(Self {})
} else {
@@ -91,9 +92,9 @@ impl BinnedHandler {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}
match binned(req, node_config).await {
Ok(ret) => Ok(ret),

View File

@@ -9,10 +9,12 @@ use err::ToPublicError;
use futures_util::Stream;
use futures_util::StreamExt;
use http::Method;
use http::Request;
use http::Response;
use http::StatusCode;
use hyper::Body;
use httpclient::body_empty;
use httpclient::body_stream;
use httpclient::Requ;
use httpclient::StreamResponse;
use netpod::log::*;
use netpod::Node;
use netpod::NodeConfigCached;
@@ -54,7 +56,7 @@ impl ToPublicError for FindActiveError {
pub struct FindActiveHandler {}
impl FindActiveHandler {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/4/tool/sfdatabuffer/find/channel/active" {
Some(Self {})
} else {
@@ -62,10 +64,10 @@ impl FindActiveHandler {
}
}
pub async fn handle(&self, req: Request<Body>, ncc: &NodeConfigCached) -> Result<Response<Body>, FindActiveError> {
pub async fn handle(&self, req: Requ, ncc: &NodeConfigCached) -> Result<StreamResponse, FindActiveError> {
if req.method() != Method::GET {
Ok(response(StatusCode::NOT_ACCEPTABLE)
.body(Body::empty())
.body(body_empty())
.map_err(|_| FindActiveError::InternalError)?)
} else {
match Self::handle_req(req, ncc).await {
@@ -80,7 +82,7 @@ impl FindActiveHandler {
}
}
async fn handle_req(req: Request<Body>, ncc: &NodeConfigCached) -> Result<Response<Body>, FindActiveError> {
async fn handle_req(req: Requ, ncc: &NodeConfigCached) -> Result<StreamResponse, FindActiveError> {
let accept_def = APP_JSON;
let accept = req
.headers()
@@ -108,7 +110,7 @@ impl FindActiveHandler {
}
})
.map(|x| Ok::<_, String>(Bytes::from(x)));
let body = Body::wrap_stream(Box::pin(stream));
let body = body_stream(stream);
Ok(Response::builder().status(StatusCode::OK).body(body).unwrap())
} else {
Err(FindActiveError::HttpBadAccept)

View File

@@ -4,12 +4,13 @@ use crate::ReqCtx;
use err::thiserror;
use err::ThisError;
use err::ToPublicError;
use futures_util::TryStreamExt;
use http::Method;
use http::Request;
use http::Response;
use http::StatusCode;
use hyper::Body;
use httpclient::body_empty;
use httpclient::body_stream;
use httpclient::read_body_bytes;
use httpclient::Requ;
use httpclient::StreamResponse;
use netpod::log::*;
use netpod::NodeConfigCached;
use netpod::ServiceVersion;
@@ -35,7 +36,7 @@ impl ToPublicError for EventDataError {
pub struct EventDataHandler {}
impl EventDataHandler {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path().eq("/api/4/private/eventdata/frames") {
Some(Self {})
} else {
@@ -45,14 +46,14 @@ impl EventDataHandler {
pub async fn handle(
&self,
req: Request<Body>,
req: Requ,
_ctx: &ReqCtx,
ncc: &NodeConfigCached,
_service_version: &ServiceVersion,
) -> Result<Response<Body>, EventDataError> {
) -> Result<StreamResponse, EventDataError> {
if req.method() != Method::POST {
Ok(response(StatusCode::NOT_ACCEPTABLE)
.body(Body::empty())
.body(body_empty())
.map_err(|_| EventDataError::InternalError)?)
} else {
match Self::handle_req(req, ncc).await {
@@ -67,18 +68,21 @@ impl EventDataHandler {
}
}
async fn handle_req(req: Request<Body>, ncc: &NodeConfigCached) -> Result<Response<Body>, EventDataError> {
async fn handle_req(req: Requ, ncc: &NodeConfigCached) -> Result<StreamResponse, EventDataError> {
let (_head, body) = req.into_parts();
let frames =
nodenet::conn::events_get_input_frames(body.map_err(|e| err::Error::with_msg_no_trace(e.to_string())))
.await
.map_err(|_| EventDataError::InternalError)?;
let body = read_body_bytes(body)
.await
.map_err(|_e| EventDataError::InternalError)?;
let inp = futures_util::stream::iter([Ok(body)]);
let frames = nodenet::conn::events_get_input_frames(inp)
.await
.map_err(|_| EventDataError::InternalError)?;
let (evsubq,) = nodenet::conn::events_parse_input_query(frames).map_err(|_| EventDataError::QueryParse)?;
let stream = nodenet::conn::create_response_bytes_stream(evsubq, ncc)
.await
.map_err(|e| EventDataError::Error(Box::new(e)))?;
let ret = response(StatusCode::OK)
.body(Body::wrap_stream(stream))
.body(body_stream(stream))
.map_err(|_| EventDataError::InternalError)?;
Ok(ret)
}

View File

@@ -6,10 +6,13 @@ use crate::ToPublicResponse;
use futures_util::stream;
use futures_util::TryStreamExt;
use http::Method;
use http::Request;
use http::Response;
use http::StatusCode;
use hyper::Body;
use httpclient::body_empty;
use httpclient::body_stream;
use httpclient::IntoBody;
use httpclient::Requ;
use httpclient::StreamResponse;
use httpclient::ToJsonBody;
use netpod::log::*;
use netpod::FromUrl;
use netpod::NodeConfigCached;
@@ -22,7 +25,7 @@ use url::Url;
pub struct EventsHandler {}
impl EventsHandler {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/4/events" {
Some(Self {})
} else {
@@ -30,9 +33,9 @@ impl EventsHandler {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}
match plain_events(req, node_config).await {
Ok(ret) => Ok(ret),
@@ -44,7 +47,7 @@ impl EventsHandler {
}
}
async fn plain_events(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
async fn plain_events(req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
let accept_def = APP_JSON;
let accept = req
.headers()
@@ -66,25 +69,18 @@ async fn plain_events(req: Request<Body>, node_config: &NodeConfigCached) -> Res
}
}
async fn plain_events_binary(
url: Url,
req: Request<Body>,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
async fn plain_events_binary(url: Url, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
debug!("{:?}", req);
let query = PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?;
let ch_conf = chconf_from_events_quorum(&query, node_config).await?;
info!("plain_events_binary chconf_from_events_quorum: {ch_conf:?}");
let s = stream::iter([Ok::<_, Error>(String::from("TODO_PREBINNED_BINARY_STREAM"))]);
let ret = response(StatusCode::OK).body(Body::wrap_stream(s.map_err(Error::from)))?;
let s = s.map_err(Error::from);
let ret = response(StatusCode::OK).body(body_stream(s))?;
Ok(ret)
}
async fn plain_events_json(
url: Url,
req: Request<Body>,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
async fn plain_events_json(url: Url, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
let reqid = crate::status_board()?.new_status_id();
info!("plain_events_json req: {:?}", req);
let (_head, _body) = req.into_parts();
@@ -105,7 +101,6 @@ async fn plain_events_json(
return Err(e.into());
}
};
let buf = serde_json::to_vec(&item)?;
let ret = response(StatusCode::OK).body(Body::from(buf))?;
let ret = response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?;
Ok(ret)
}

View File

@@ -2,10 +2,12 @@ use crate::bodystream::response;
use crate::bodystream::ToPublicResponse;
use crate::err::Error;
use http::Method;
use http::Request;
use http::Response;
use http::StatusCode;
use hyper::Body;
use httpclient::body_empty;
use httpclient::IntoBody;
use httpclient::Requ;
use httpclient::StreamResponse;
use httpclient::ToJsonBody;
use netpod::log::*;
use netpod::ChannelSearchQuery;
use netpod::ChannelSearchResult;
@@ -14,7 +16,7 @@ use netpod::ACCEPT_ALL;
use netpod::APP_JSON;
use url::Url;
pub async fn channel_search(req: Request<Body>, node_config: &NodeConfigCached) -> Result<ChannelSearchResult, Error> {
pub async fn channel_search(req: Requ, node_config: &NodeConfigCached) -> Result<ChannelSearchResult, Error> {
let url = Url::parse(&format!("dummy://{}", req.uri()))?;
let query = ChannelSearchQuery::from_url(&url)?;
info!("search query: {:?}", query);
@@ -25,7 +27,7 @@ pub async fn channel_search(req: Request<Body>, node_config: &NodeConfigCached)
pub struct ChannelSearchHandler {}
impl ChannelSearchHandler {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/4/search/channel" {
Some(Self {})
} else {
@@ -33,7 +35,7 @@ impl ChannelSearchHandler {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
@@ -42,20 +44,17 @@ impl ChannelSearchHandler {
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
match channel_search(req, node_config).await {
Ok(item) => {
let buf = serde_json::to_vec(&item)?;
Ok(response(StatusCode::OK).body(Body::from(buf))?)
}
Ok(item) => Ok(response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?),
Err(e) => {
warn!("handle: got error from channel_search: {e:?}");
Ok(e.to_public_response())
}
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
}
}

View File

@@ -1,10 +1,11 @@
use crate::bodystream::response;
use crate::err::Error;
use crate::ReqCtx;
use http::Request;
use http::Response;
use http::StatusCode;
use hyper::Body;
use httpclient::IntoBody;
use httpclient::Requ;
use httpclient::StreamResponse;
use httpclient::ToJsonBody;
use netpod::log::*;
use netpod::NodeConfigCached;
use netpod::NodeStatus;
@@ -28,7 +29,7 @@ impl StatusNodesRecursive {
"/api/4/private/status/nodes/recursive"
}
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == Self::path() {
Some(Self {})
} else {
@@ -38,11 +39,11 @@ impl StatusNodesRecursive {
pub async fn handle(
&self,
req: Request<Body>,
req: Requ,
ctx: &ReqCtx,
node_config: &NodeConfigCached,
service_version: &ServiceVersion,
) -> Result<Response<Body>, Error> {
) -> Result<StreamResponse, Error> {
let res = tokio::time::timeout(
Duration::from_millis(1200),
self.status(req, ctx, node_config, service_version),
@@ -57,8 +58,7 @@ impl StatusNodesRecursive {
};
match res {
Ok(status) => {
let body = serde_json::to_vec(&status)?;
let ret = response(StatusCode::OK).body(Body::from(body))?;
let ret = response(StatusCode::OK).body(ToJsonBody::from(&status).into_body())?;
Ok(ret)
}
Err(e) => {
@@ -71,7 +71,7 @@ impl StatusNodesRecursive {
async fn status(
&self,
req: Request<Body>,
req: Requ,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
service_version: &ServiceVersion,

View File

@@ -1,13 +1,11 @@
use crate::err::Error;
use futures_util::StreamExt;
use http::HeaderMap;
use http::Response;
use http::StatusCode;
use httpclient::body_empty;
use httpclient::body_string;
use httpclient::StreamResponse;
use netpod::log::*;
use netpod::APP_JSON;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
pub fn response<T>(status: T) -> http::response::Builder
where
@@ -18,17 +16,17 @@ where
}
pub trait ToPublicResponse {
fn to_public_response(&self) -> Response<Body>;
fn to_public_response(&self) -> StreamResponse;
}
impl ToPublicResponse for Error {
fn to_public_response(&self) -> Response<Body> {
fn to_public_response(&self) -> StreamResponse {
self.0.to_public_response()
}
}
impl ToPublicResponse for ::err::Error {
fn to_public_response(&self) -> Response<Body> {
fn to_public_response(&self) -> StreamResponse {
use err::Reason;
let e = self.to_public_error();
let status = match e.reason() {
@@ -42,30 +40,15 @@ impl ToPublicResponse for ::err::Error {
};
match response(status)
.header(http::header::ACCEPT, APP_JSON)
.body(Body::from(msg))
.body(body_string(msg))
{
Ok(res) => res,
Err(e) => {
error!("can not generate http error response {e:?}");
let mut res = Response::new(Body::default());
let mut res = Response::new(body_empty());
*res.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
res
}
}
}
}
struct BodyStreamWrap(netpod::BodyStream);
// impl hyper::body::HttpBody for BodyStreamWrap {
// type Data = bytes::Bytes;
// type Error = ::err::Error;
// fn poll_data(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Self::Data, Self::Error>>> {
// self.0.inner.poll_next_unpin(cx)
// }
// fn poll_trailers(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
// Poll::Ready(Ok(None))
// }
// }

View File

@@ -3,10 +3,13 @@ use crate::err::Error;
use crate::ReqCtx;
use futures_util::StreamExt;
use http::Method;
use http::Request;
use http::Response;
use http::StatusCode;
use hyper::Body;
use httpclient::body_empty;
use httpclient::body_string;
use httpclient::IntoBody;
use httpclient::Requ;
use httpclient::StreamResponse;
use httpclient::ToJsonBody;
use items_0::Empty;
use items_0::Extendable;
use items_2::channelevents::ChannelStatusEvents;
@@ -22,7 +25,7 @@ use url::Url;
pub struct ConnectionStatusEvents {}
impl ConnectionStatusEvents {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/4/status/connection/events" {
Some(Self {})
} else {
@@ -32,10 +35,10 @@ impl ConnectionStatusEvents {
pub async fn handle(
&self,
req: Request<Body>,
req: Requ,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
@@ -47,20 +50,20 @@ impl ConnectionStatusEvents {
let q = ChannelStateEventsQuery::from_url(&url)?;
match self.fetch_data(&q, node_config).await {
Ok(k) => {
let body = Body::from(serde_json::to_vec(&k)?);
let body = ToJsonBody::from(&k).into_body();
Ok(response(StatusCode::OK).body(body)?)
}
Err(e) => {
error!("{e}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("{:?}", e.public_msg())))?)
.body(body_string(format!("{:?}", e.public_msg())))?)
}
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
}
@@ -96,7 +99,7 @@ impl ConnectionStatusEvents {
pub struct ChannelStatusEventsHandler {}
impl ChannelStatusEventsHandler {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/4/status/channel/events" {
Some(Self {})
} else {
@@ -106,10 +109,10 @@ impl ChannelStatusEventsHandler {
pub async fn handle(
&self,
req: Request<Body>,
req: Requ,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
@@ -121,20 +124,20 @@ impl ChannelStatusEventsHandler {
let q = ChannelStateEventsQuery::from_url(&url)?;
match self.fetch_data(&q, node_config).await {
Ok(k) => {
let body = Body::from(serde_json::to_vec(&k)?);
let body = ToJsonBody::from(&k).into_body();
Ok(response(StatusCode::OK).body(body)?)
}
Err(e) => {
error!("{e}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("{:?}", e.public_msg())))?)
.body(body_string(format!("{:?}", e.public_msg())))?)
}
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
}

View File

@@ -4,10 +4,13 @@ use crate::ToPublicResponse;
use dbconn::create_connection;
use futures_util::StreamExt;
use http::Method;
use http::Request;
use http::Response;
use http::StatusCode;
use hyper::Body;
use httpclient::body_empty;
use httpclient::body_string;
use httpclient::IntoBody;
use httpclient::Requ;
use httpclient::StreamResponse;
use httpclient::ToJsonBody;
use netpod::get_url_query_pairs;
use netpod::log::*;
use netpod::query::prebinned::PreBinnedQuery;
@@ -58,7 +61,7 @@ pub async fn ch_conf_from_binned(
pub struct ChannelConfigHandler {}
impl ChannelConfigHandler {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/4/channel/config" {
Some(Self {})
} else {
@@ -66,7 +69,7 @@ impl ChannelConfigHandler {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
@@ -82,18 +85,14 @@ impl ChannelConfigHandler {
}
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
}
async fn channel_config(
&self,
req: Request<Body>,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
async fn channel_config(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let q = ChannelConfigQuery::from_url(&url)?;
let conf = nodenet::channelconfig::channel_config(q.range.clone(), q.channel.clone(), node_config).await?;
@@ -102,13 +101,13 @@ impl ChannelConfigHandler {
let res: ChannelConfigResponse = conf.into();
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Body::from(serde_json::to_string(&res)?))?;
.body(ToJsonBody::from(&res).into_body())?;
Ok(ret)
}
None => {
let ret = response(StatusCode::NOT_FOUND)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Body::empty())?;
.body(body_empty())?;
Ok(ret)
}
}
@@ -118,7 +117,7 @@ impl ChannelConfigHandler {
pub struct ChannelConfigsHandler {}
impl ChannelConfigsHandler {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/4/channel/configs" {
Some(Self {})
} else {
@@ -126,7 +125,7 @@ impl ChannelConfigsHandler {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
@@ -142,14 +141,14 @@ impl ChannelConfigsHandler {
}
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
}
async fn channel_configs(&self, req: Request<Body>, ncc: &NodeConfigCached) -> Result<Response<Body>, Error> {
async fn channel_configs(&self, req: Requ, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
info!("channel_configs");
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let q = ChannelConfigQuery::from_url(&url)?;
@@ -157,7 +156,7 @@ impl ChannelConfigsHandler {
let ch_confs = nodenet::channelconfig::channel_configs(q.channel, ncc).await?;
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Body::from(serde_json::to_string(&ch_confs)?))?;
.body(ToJsonBody::from(&ch_confs).into_body())?;
Ok(ret)
}
}
@@ -165,7 +164,7 @@ impl ChannelConfigsHandler {
pub struct ChannelConfigQuorumHandler {}
impl ChannelConfigQuorumHandler {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/4/channel/config/quorum" {
Some(Self {})
} else {
@@ -173,7 +172,7 @@ impl ChannelConfigQuorumHandler {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
@@ -189,14 +188,14 @@ impl ChannelConfigQuorumHandler {
}
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
}
async fn channel_config_quorum(&self, req: Request<Body>, ncc: &NodeConfigCached) -> Result<Response<Body>, Error> {
async fn channel_config_quorum(&self, req: Requ, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
info!("channel_config_quorum");
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let q = ChannelConfigQuery::from_url(&url)?;
@@ -204,7 +203,7 @@ impl ChannelConfigQuorumHandler {
let ch_confs = nodenet::configquorum::find_config_basics_quorum(q.channel, q.range.into(), ncc).await?;
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Body::from(serde_json::to_string(&ch_confs)?))?;
.body(ToJsonBody::from(&ch_confs).into_body())?;
Ok(ret)
}
}
@@ -217,7 +216,7 @@ pub struct ConfigsHisto {
pub struct ScyllaConfigsHisto {}
impl ScyllaConfigsHisto {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/4/scylla/configs/histo" {
Some(Self {})
} else {
@@ -225,7 +224,7 @@ impl ScyllaConfigsHisto {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
@@ -236,13 +235,13 @@ impl ScyllaConfigsHisto {
let res = self
.make_histo(&node_config.node_config.cluster.backend, node_config)
.await?;
let body = Body::from(serde_json::to_vec(&res)?);
let body = ToJsonBody::from(&res).into_body();
Ok(response(StatusCode::OK).body(body)?)
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
}
@@ -325,7 +324,7 @@ pub struct ChannelListWithType {
pub struct ScyllaChannelsWithType {}
impl ScyllaChannelsWithType {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/4/scylla/channels/with_type" {
Some(Self {})
} else {
@@ -333,7 +332,7 @@ impl ScyllaChannelsWithType {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
@@ -346,13 +345,13 @@ impl ScyllaChannelsWithType {
let res = self
.get_channels(&q, &node_config.node_config.cluster.backend, node_config)
.await?;
let body = Body::from(serde_json::to_vec(&res)?);
let body = ToJsonBody::from(&res).into_body();
Ok(response(StatusCode::OK).body(body)?)
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
}
@@ -483,7 +482,7 @@ impl FromUrl for ScyllaChannelsActiveQuery {
pub struct ScyllaChannelsActive {}
impl ScyllaChannelsActive {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/4/channels/active" {
Some(Self {})
} else {
@@ -491,7 +490,7 @@ impl ScyllaChannelsActive {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
@@ -502,13 +501,13 @@ impl ScyllaChannelsActive {
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let q = ScyllaChannelsActiveQuery::from_url(&url)?;
let res = self.get_channels(&q, node_config).await?;
let body = Body::from(serde_json::to_vec(&res)?);
let body = ToJsonBody::from(&res).into_body();
Ok(response(StatusCode::OK).body(body)?)
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
}
@@ -586,7 +585,7 @@ pub struct IocForChannelRes {
pub struct IocForChannel {}
impl IocForChannel {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/4/channel/ioc" {
Some(Self {})
} else {
@@ -594,7 +593,7 @@ impl IocForChannel {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
@@ -606,19 +605,19 @@ impl IocForChannel {
let q = IocForChannelQuery::from_url(&url)?;
match self.find(&q, node_config).await {
Ok(k) => {
let body = Body::from(serde_json::to_vec(&k)?);
let body = ToJsonBody::from(&k).into_body();
Ok(response(StatusCode::OK).body(body)?)
}
Err(e) => {
let body = Body::from(format!("{:?}", e.public_msg()));
let body = body_string(format!("{:?}", e.public_msg()));
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body)?)
}
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
}
@@ -675,7 +674,7 @@ pub struct ScyllaSeriesTsMspResponse {
pub struct ScyllaSeriesTsMsp {}
impl ScyllaSeriesTsMsp {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/4/scylla/series/tsMsps" {
Some(Self {})
} else {
@@ -683,7 +682,7 @@ impl ScyllaSeriesTsMsp {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
@@ -695,17 +694,17 @@ impl ScyllaSeriesTsMsp {
let q = ScyllaSeriesTsMspQuery::from_url(&url)?;
match self.get_ts_msps(&q, node_config).await {
Ok(k) => {
let body = Body::from(serde_json::to_vec(&k)?);
let body = ToJsonBody::from(&k).into_body();
Ok(response(StatusCode::OK).body(body)?)
}
Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("{:?}", e.public_msg())))?),
.body(body_string(format!("{:?}", e.public_msg())))?),
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
}
@@ -752,7 +751,7 @@ pub struct AmbigiousChannelNamesResponse {
pub struct AmbigiousChannelNames {}
impl AmbigiousChannelNames {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/4/channels/ambigious" {
Some(Self {})
} else {
@@ -760,7 +759,7 @@ impl AmbigiousChannelNames {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
@@ -770,17 +769,17 @@ impl AmbigiousChannelNames {
if accept == APP_JSON || accept == ACCEPT_ALL {
match self.process(node_config).await {
Ok(k) => {
let body = Body::from(serde_json::to_vec(&k)?);
let body = ToJsonBody::from(&k).into_body();
Ok(response(StatusCode::OK).body(body)?)
}
Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("{:?}", e.public_msg())))?),
.body(body_string(format!("{:?}", e.public_msg())))?),
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
}
@@ -848,7 +847,7 @@ fn test_data_f64_01() -> (Msps, Lsps, Pulses, ValsF64) {
pub struct GenerateScyllaTestData {}
impl GenerateScyllaTestData {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/4/test/generate/scylla" {
Some(Self {})
} else {
@@ -856,7 +855,7 @@ impl GenerateScyllaTestData {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
@@ -866,17 +865,17 @@ impl GenerateScyllaTestData {
if accept == APP_JSON || accept == ACCEPT_ALL {
match self.process(node_config).await {
Ok(k) => {
let body = Body::from(serde_json::to_vec(&k)?);
let body = ToJsonBody::from(&k).into_body();
Ok(response(StatusCode::OK).body(body)?)
}
Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("{:?}", e.public_msg())))?),
.body(body_string(format!("{:?}", e.public_msg())))?),
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
}

View File

@@ -2,17 +2,11 @@ use crate::body_empty;
use crate::err::Error;
use crate::response;
use crate::Requ;
use crate::RespFull;
use crate::StreamBody;
use bytes::Bytes;
use futures_util::Stream;
use futures_util::TryStreamExt;
use http::Method;
use http::Response;
use http::StatusCode;
use http_body_util::BodyExt;
use httpclient::httpclient::http_body_util;
use httpclient::RespBox;
use httpclient::body_stream;
use httpclient::StreamResponse;
use netpod::get_url_query_pairs;
use netpod::log::*;
use netpod::DiskIoTune;
@@ -72,7 +66,7 @@ impl DownloadHandler {
}
}
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<RespBox, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
self.get(req, node_config).await
} else {
@@ -80,7 +74,7 @@ impl DownloadHandler {
}
}
pub async fn get(&self, req: Requ, ncc: &NodeConfigCached) -> Result<RespBox, Error> {
pub async fn get(&self, req: Requ, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
let (head, _body) = req.into_parts();
let p2 = &head.uri.path()[Self::path_prefix().len()..];
let base = match &ncc.node.sf_databuffer {
@@ -95,16 +89,7 @@ impl DownloadHandler {
let file = tokio::fs::OpenOptions::new().read(true).open(&pp).await?;
let stream =
disk::file_content_stream(pp, file, query.disk_io_tune.clone(), "download").map_ok(|x| x.into_buf());
use futures_util::StreamExt;
use hyper::body::Frame;
let stream = stream.map(|item| item.map(|x| Frame::data(x.freeze())));
let body = httpclient::httpclient::http_body_util::StreamBody::new(stream);
let body = BodyExt::boxed(body);
// let body = http_body_util::combinators::BoxBody::new(body);
// let body: StreamBody = Box::pin(body);
// let body: Pin<Box<dyn Stream<Item = Result<Frame<Bytes>, err::Error>>>> = Box::pin(body);
let res = response(StatusCode::OK).body(body)?;
let res = response(StatusCode::OK).body(body_stream(stream))?;
Ok(res)
}
}

View File

@@ -3,13 +3,15 @@ use crate::body_string;
use crate::err::Error;
use crate::response;
use crate::Requ;
use crate::RespFull;
use futures_util::select;
use futures_util::FutureExt;
use http::Method;
use http::StatusCode;
use httpclient::connect_client;
use httpclient::read_body_bytes;
use httpclient::IntoBody;
use httpclient::StreamResponse;
use httpclient::ToJsonBody;
use hyper::body::Incoming;
use hyper::Request;
use hyper::Response;
@@ -55,7 +57,7 @@ async fn process_answer(res: Response<hyper::body::Incoming>) -> Result<JsonValu
}
}
pub async fn gather_get_json(req: Requ, node_config: &NodeConfigCached) -> Result<RespFull, Error> {
pub async fn gather_get_json(req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
let (head, body) = req.into_parts();
let _bodyslice = read_body_bytes(body).await?;
let pathpre = "/api/4/gather/";
@@ -68,6 +70,7 @@ pub async fn gather_get_json(req: Requ, node_config: &NodeConfigCached) -> Resul
.filter_map(|node| {
let uri = format!("http://{}:{}/api/4/{}", node.host, node.port, pathsuf);
let req = Request::builder().method(Method::GET).uri(uri);
let req = req.header(http::header::HOST, &node.host);
let req = req.header(http::header::ACCEPT, APP_JSON);
match req.body(body_empty()) {
Ok(req) => {
@@ -123,7 +126,7 @@ pub async fn gather_get_json(req: Requ, node_config: &NodeConfigCached) -> Resul
let a = a;
let res = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(serde_json::to_string(&Jres { hosts: a })?.into())?;
.body(ToJsonBody::from(&Jres { hosts: a }).into_body())?;
Ok(res)
}

View File

@@ -25,12 +25,16 @@ use futures_util::FutureExt;
use futures_util::StreamExt;
use http::Method;
use http::StatusCode;
use http_body_util::combinators::BoxBody;
use http_body_util::Full;
use hyper::body::Incoming;
use httpclient::body_bytes;
use httpclient::body_empty;
use httpclient::body_stream;
use httpclient::body_string;
use httpclient::IntoBody;
use httpclient::Requ;
use httpclient::StreamResponse;
use httpclient::ToJsonBody;
use hyper::service::service_fn;
use hyper::Request;
use hyper::Response;
use hyper_util::rt::TokioIo;
use net::SocketAddr;
use netpod::log::*;
@@ -123,23 +127,6 @@ pub fn accepts_octets(hm: &http::HeaderMap) -> bool {
}
}
pub type Requ = Request<Incoming>;
pub type RespFull = Response<Full<Bytes>>;
use http_body_util::BodyExt;
use httpclient::BodyBox;
use httpclient::RespBox;
pub fn body_empty() -> BodyBox {
Full::new(Bytes::new()).map_err(Into::into).boxed()
}
pub fn body_string<S: ToString>(body: S) -> BodyBox {
Full::new(Bytes::from(body.to_string())).map_err(Into::into).boxed()
}
pub type StreamBody = Pin<Box<dyn futures_util::Stream<Item = Result<hyper::body::Frame<Bytes>, ::err::Error>>>>;
pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion) -> Result<(), RetrievalError> {
static STATUS_BOARD_INIT: Once = Once::new();
STATUS_BOARD_INIT.call_once(|| {
@@ -158,7 +145,11 @@ pub async fn host(node_config: NodeConfigCached, service_version: ServiceVersion
let listener = TcpListener::bind(bind_addr).await?;
loop {
let (stream, addr) = listener.accept().await?;
let (stream, addr) = if let Ok(x) = listener.accept().await {
x
} else {
break;
};
debug!("new connection from {addr}");
let node_config = node_config.clone();
let service_version = service_version.clone();
@@ -188,7 +179,7 @@ async fn the_service_fn(
addr: SocketAddr,
node_config: NodeConfigCached,
service_version: ServiceVersion,
) -> Result<Response<RespBox>, Error> {
) -> Result<StreamResponse, Error> {
info!(
"http-request {:?} - {:?} - {:?} - {:?}",
addr,
@@ -205,7 +196,7 @@ async fn http_service(
req: Requ,
node_config: NodeConfigCached,
service_version: ServiceVersion,
) -> Result<Response<RespBox>, Error> {
) -> Result<StreamResponse, Error> {
match http_service_try(req, &node_config, &service_version).await {
Ok(k) => Ok(k),
Err(e) => {
@@ -282,7 +273,7 @@ impl ReqCtx {
}
// TODO remove because I want error bodies to be json.
pub fn response_err<T>(status: StatusCode, msg: T) -> Result<RespFull, RetrievalError>
pub fn response_err<T>(status: StatusCode, msg: T) -> Result<StreamResponse, RetrievalError>
where
T: AsRef<str>,
{
@@ -295,7 +286,7 @@ where
),
msg.as_ref()
);
let ret = response(status).body(Full::new(msg))?;
let ret = response(status).body(body_string(msg))?;
Ok(ret)
}
@@ -305,7 +296,7 @@ macro_rules! static_http {
let c = include_bytes!(concat!("../static/documentation/", $tgtex));
let ret = response(StatusCode::OK)
.header("content-type", $ctype)
.body(Full::new(&c[..]))?;
.body(body_bytes(c.to_vec()))?;
return Ok(ret);
}
};
@@ -314,7 +305,7 @@ macro_rules! static_http {
let c = include_bytes!(concat!("../static/documentation/", $tgt));
let ret = response(StatusCode::OK)
.header("content-type", $ctype)
.body(Full::new(&c[..]))?;
.body(body_bytes(c.to_vec()))?;
return Ok(ret);
}
};
@@ -326,7 +317,7 @@ macro_rules! static_http_api1 {
let c = include_bytes!(concat!("../static/documentation/", $tgtex));
let ret = response(StatusCode::OK)
.header("content-type", $ctype)
.body(Full::new(&c[..]))?;
.body(body_bytes(c.to_vec()))?;
return Ok(ret);
}
};
@@ -335,7 +326,7 @@ macro_rules! static_http_api1 {
let c = include_bytes!(concat!("../static/documentation/", $tgt));
let ret = response(StatusCode::OK)
.header("content-type", $ctype)
.body(Full::new(&c[..]))?;
.body(body_bytes(c.to_vec()))?;
return Ok(ret);
}
};
@@ -345,7 +336,7 @@ async fn http_service_try(
req: Requ,
node_config: &NodeConfigCached,
service_version: &ServiceVersion,
) -> Result<RespBox, Error> {
) -> Result<StreamResponse, Error> {
use http::HeaderValue;
let mut urlmarks = Vec::new();
urlmarks.push(format!("{}:{}", req.method(), req.uri()));
@@ -376,7 +367,7 @@ async fn http_service_inner(
ctx: &ReqCtx,
node_config: &NodeConfigCached,
service_version: &ServiceVersion,
) -> Result<RespBox, RetrievalError> {
) -> Result<StreamResponse, RetrievalError> {
let uri = req.uri().clone();
let path = uri.path();
if path == "/api/4/private/version" {
@@ -388,7 +379,7 @@ async fn http_service_inner(
"patch": service_version.patch,
},
});
Ok(response(StatusCode::OK).body(body_string(serde_json::to_string(&ret)?))?)
Ok(response(StatusCode::OK).body(ToJsonBody::from(&ret).into_body())?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
@@ -529,13 +520,13 @@ async fn http_service_inner(
if req.method() == Method::GET {
api_1_docs(path)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Full::new(Bytes::new()))?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
} else if path.starts_with("/api/4/documentation/") {
if req.method() == Method::GET {
api_4_docs(path)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Full::new(Bytes::new()))?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
} else {
use std::fmt::Write;
@@ -556,7 +547,7 @@ async fn http_service_inner(
}
}
pub fn api_4_docs(path: &str) -> Result<RespFull, RetrievalError> {
pub fn api_4_docs(path: &str) -> Result<StreamResponse, RetrievalError> {
static_http!(path, "", "api4.html", "text/html");
static_http!(path, "style.css", "text/css");
static_http!(path, "script.js", "text/javascript");
@@ -564,7 +555,7 @@ pub fn api_4_docs(path: &str) -> Result<RespFull, RetrievalError> {
Ok(response(StatusCode::NOT_FOUND).body(body_empty())?)
}
pub fn api_1_docs(path: &str) -> Result<RespFull, RetrievalError> {
pub fn api_1_docs(path: &str) -> Result<StreamResponse, RetrievalError> {
static_http_api1!(path, "", "api1.html", "text/html");
static_http_api1!(path, "style.css", "text/css");
static_http_api1!(path, "script.js", "text/javascript");
@@ -582,7 +573,7 @@ impl StatusBoardAllHandler {
}
}
pub async fn handle(&self, _req: Requ, _node_config: &NodeConfigCached) -> Result<RespBox, RetrievalError> {
pub async fn handle(&self, _req: Requ, _node_config: &NodeConfigCached) -> Result<StreamResponse, RetrievalError> {
use std::ops::Deref;
let sb = status_board().unwrap();
let buf = serde_json::to_string(sb.deref()).unwrap();
@@ -591,7 +582,7 @@ impl StatusBoardAllHandler {
}
}
async fn prebinned(req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result<RespBox, RetrievalError> {
async fn prebinned(req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result<StreamResponse, RetrievalError> {
match prebinned_inner(req, ctx, node_config).await {
Ok(ret) => Ok(ret),
Err(e) => {
@@ -601,7 +592,11 @@ async fn prebinned(req: Requ, ctx: &ReqCtx, node_config: &NodeConfigCached) -> R
}
}
async fn prebinned_inner(req: Requ, _ctx: &ReqCtx, _node_config: &NodeConfigCached) -> Result<RespBox, RetrievalError> {
async fn prebinned_inner(
req: Requ,
_ctx: &ReqCtx,
_node_config: &NodeConfigCached,
) -> Result<StreamResponse, RetrievalError> {
let (head, _body) = req.into_parts();
let url: url::Url = format!("dummy://{}", head.uri).parse()?;
let query = PreBinnedQuery::from_url(&url)?;
@@ -614,14 +609,22 @@ async fn prebinned_inner(req: Requ, _ctx: &ReqCtx, _node_config: &NodeConfigCach
todo!()
}
async fn random_channel(req: Requ, _ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result<RespBox, RetrievalError> {
async fn random_channel(
req: Requ,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<StreamResponse, RetrievalError> {
let (_head, _body) = req.into_parts();
let ret = dbconn::random_channel(node_config).await?;
let ret = response(StatusCode::OK).body(body_string(ret))?;
Ok(ret)
}
async fn clear_cache_all(req: Requ, _ctx: &ReqCtx, node_config: &NodeConfigCached) -> Result<RespBox, RetrievalError> {
async fn clear_cache_all(
req: Requ,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<StreamResponse, RetrievalError> {
let (head, _body) = req.into_parts();
let dry = match head.uri.query() {
Some(q) => q.contains("dry"),
@@ -638,7 +641,7 @@ async fn update_db_with_channel_names(
req: Requ,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<RespBox, RetrievalError> {
) -> Result<StreamResponse, RetrievalError> {
info!("httpret::update_db_with_channel_names");
let (head, _body) = req.into_parts();
let _dry = match head.uri.query() {
@@ -650,15 +653,16 @@ async fn update_db_with_channel_names(
.await;
match res {
Ok(res) => {
let stream = res.map(|k| match serde_json::to_string(&k) {
Ok(mut item) => {
item.push('\n');
Ok(Bytes::from(item))
}
Err(e) => Err(e),
});
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON_LINES)
.body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) {
Ok(mut item) => {
item.push('\n');
Ok(item)
}
Err(e) => Err(e),
})))?;
.body(body_stream(stream))?;
Ok(ret)
}
Err(e) => {
@@ -676,22 +680,23 @@ async fn update_db_with_channel_names_3(
req: Requ,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<RespBox, RetrievalError> {
) -> Result<StreamResponse, RetrievalError> {
let (head, _body) = req.into_parts();
let _dry = match head.uri.query() {
Some(q) => q.contains("dry"),
None => false,
};
let res = dbconn::scan::update_db_with_channel_names_3(node_config);
let stream = res.map(|k| match serde_json::to_string(&k) {
Ok(mut item) => {
item.push('\n');
Ok(Bytes::from(item))
}
Err(e) => Err(e),
});
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON_LINES)
.body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) {
Ok(mut item) => {
item.push('\n');
Ok(item)
}
Err(e) => Err(e),
})))?;
.body(body_stream(stream))?;
Ok(ret)
}
@@ -699,22 +704,23 @@ async fn update_db_with_all_channel_configs(
req: Requ,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<RespBox, RetrievalError> {
) -> Result<StreamResponse, RetrievalError> {
let (head, _body) = req.into_parts();
let _dry = match head.uri.query() {
Some(q) => q.contains("dry"),
None => false,
};
let res = dbconn::scan::update_db_with_all_channel_configs(node_config.clone()).await?;
let stream = res.map(|k| match serde_json::to_string(&k) {
Ok(mut item) => {
item.push('\n');
Ok(Bytes::from(item))
}
Err(e) => Err(e),
});
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON_LINES)
.body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) {
Ok(mut item) => {
item.push('\n');
Ok(item)
}
Err(e) => Err(e),
})))?;
.body(body_stream(stream))?;
Ok(ret)
}
@@ -722,7 +728,7 @@ async fn update_search_cache(
req: Requ,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<RespBox, RetrievalError> {
) -> Result<StreamResponse, RetrievalError> {
let (head, _body) = req.into_parts();
let _dry = match head.uri.query() {
Some(q) => q.contains("dry"),

View File

@@ -19,12 +19,17 @@ use futures_util::pin_mut;
use futures_util::Stream;
use http::Method;
use http::StatusCode;
use hyper::service::make_service_fn;
use httpclient::body_empty;
use httpclient::body_stream;
use httpclient::body_string;
use httpclient::read_body_bytes;
use httpclient::IntoBody;
use httpclient::Requ;
use httpclient::StreamResponse;
use httpclient::ToJsonBody;
use hyper::service::service_fn;
use hyper::Body;
use hyper::Request;
use hyper::Response;
use hyper::Server;
use hyper_util::rt::TokioIo;
use itertools::Itertools;
use netpod::log::*;
use netpod::query::ChannelStateEventsQuery;
@@ -55,43 +60,60 @@ use taskrun::tokio;
use tokio::fs::File;
use tokio::io::AsyncRead;
use tokio::io::ReadBuf;
use tokio::net::TcpListener;
use url::Url;
const DISTRI_PRE: &str = "/distri/";
pub async fn proxy(proxy_config: ProxyConfig, service_version: ServiceVersion) -> Result<(), Error> {
use std::str::FromStr;
let addr = SocketAddr::from_str(&format!("{}:{}", proxy_config.listen, proxy_config.port))?;
let make_service = make_service_fn({
move |_conn| {
let proxy_config = proxy_config.clone();
let service_version = service_version.clone();
async move {
Ok::<_, Error>(service_fn({
move |req| {
info!(
"http-request {:?} - {:?} - {:?} - {:?}",
addr,
req.method(),
req.uri(),
req.headers()
);
let f = proxy_http_service(req, proxy_config.clone(), service_version.clone());
Cont { f: Box::pin(f) }
}
}))
let bind_addr = SocketAddr::from_str(&format!("{}:{}", proxy_config.listen, proxy_config.port))?;
let listener = TcpListener::bind(bind_addr).await?;
loop {
let (stream, addr) = if let Ok(x) = listener.accept().await {
x
} else {
break;
};
debug!("new connection from {addr}");
let proxy_config = proxy_config.clone();
let service_version = service_version.clone();
let io = TokioIo::new(stream);
tokio::task::spawn(async move {
let res = hyper::server::conn::http1::Builder::new()
.serve_connection(
io,
service_fn({
move |req| {
info!(
"http-request {:?} - {:?} - {:?} - {:?}",
bind_addr,
req.method(),
req.uri(),
req.headers()
);
let f = proxy_http_service(req, proxy_config.clone(), service_version.clone());
Cont { f: Box::pin(f) }
}
}),
)
.await;
match res {
Ok(()) => {}
Err(e) => {
error!("{e}");
}
}
}
});
Server::bind(&addr).serve(make_service).await?;
});
}
Ok(())
}
async fn proxy_http_service(
req: Request<Body>,
req: Requ,
proxy_config: ProxyConfig,
service_version: ServiceVersion,
) -> Result<Response<Body>, Error> {
) -> Result<StreamResponse, Error> {
match proxy_http_service_try(req, &proxy_config, &service_version).await {
Ok(k) => Ok(k),
Err(e) => {
@@ -102,10 +124,10 @@ async fn proxy_http_service(
}
async fn proxy_http_service_try(
req: Request<Body>,
req: Requ,
proxy_config: &ProxyConfig,
service_version: &ServiceVersion,
) -> Result<Response<Body>, Error> {
) -> Result<StreamResponse, Error> {
let ctx = ReqCtx::with_proxy(&req, proxy_config);
let mut res = proxy_http_service_inner(req, &ctx, proxy_config, &service_version).await?;
let hm = res.headers_mut();
@@ -119,11 +141,11 @@ async fn proxy_http_service_try(
}
async fn proxy_http_service_inner(
req: Request<Body>,
req: Requ,
ctx: &ReqCtx,
proxy_config: &ProxyConfig,
service_version: &ServiceVersion,
) -> Result<Response<Body>, Error> {
) -> Result<StreamResponse, Error> {
let uri = req.uri().clone();
let path = uri.path();
if path == "/api/1/channels" {
@@ -141,9 +163,9 @@ async fn proxy_http_service_inner(
"patch": service_version.patch,
},
});
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?)
Ok(response(StatusCode::OK).body(ToJsonBody::from(&ret).into_body())?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
} else if let Some(h) = api4::StatusNodesRecursive::handler(&req) {
h.handle(req, ctx, &proxy_config, service_version).await
@@ -169,34 +191,34 @@ async fn proxy_http_service_inner(
if req.method() == Method::GET {
Ok(api_1_docs(path)?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
} else if path.starts_with("/api/4/documentation/") {
if req.method() == Method::GET {
Ok(api_4_docs(path)?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
} else if path.starts_with("/api/4/test/http/204") {
Ok(response(StatusCode::NO_CONTENT).body(Body::from("No Content"))?)
Ok(response(StatusCode::NO_CONTENT).body(body_string("No Content"))?)
} else if path.starts_with("/api/4/test/http/400") {
Ok(response(StatusCode::BAD_REQUEST).body(Body::from("Bad Request"))?)
Ok(response(StatusCode::BAD_REQUEST).body(body_string("Bad Request"))?)
} else if path.starts_with("/api/4/test/http/405") {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::from("Method Not Allowed"))?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_string("Method Not Allowed"))?)
} else if path.starts_with("/api/4/test/http/406") {
Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::from("Not Acceptable"))?)
Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_string("Not Acceptable"))?)
} else if path.starts_with("/api/4/test/log/error") {
error!("{path}");
Ok(response(StatusCode::OK).body(Body::empty())?)
Ok(response(StatusCode::OK).body(body_empty())?)
} else if path.starts_with("/api/4/test/log/warn") {
warn!("{path}");
Ok(response(StatusCode::OK).body(Body::empty())?)
Ok(response(StatusCode::OK).body(body_empty())?)
} else if path.starts_with("/api/4/test/log/info") {
info!("{path}");
Ok(response(StatusCode::OK).body(Body::empty())?)
Ok(response(StatusCode::OK).body(body_empty())?)
} else if path.starts_with("/api/4/test/log/debug") {
debug!("{path}");
Ok(response(StatusCode::OK).body(Body::empty())?)
Ok(response(StatusCode::OK).body(body_empty())?)
} else if let Some(h) = api1::PythonDataApi1Query::handler(&req) {
h.handle(req, ctx, proxy_config).await
} else if let Some(h) = api1::reqstatus::RequestStatusHandler::handler(&req) {
@@ -218,11 +240,11 @@ async fn proxy_http_service_inner(
write!(out, "HEADER {hn:?}: {hv:?}<br>\n")?;
}
write!(out, "</pre>\n")?;
Ok(response(StatusCode::NOT_FOUND).body(Body::from(body))?)
Ok(response(StatusCode::NOT_FOUND).body(body_string(body))?)
}
}
pub async fn proxy_distribute_v2(req: Request<Body>) -> Result<Response<Body>, Error> {
pub async fn proxy_distribute_v2(req: Requ) -> Result<StreamResponse, Error> {
let path = req.uri().path();
if path
.chars()
@@ -233,9 +255,9 @@ pub async fn proxy_distribute_v2(req: Request<Body>) -> Result<Response<Body>, E
let s = FileStream {
file: File::open(format!("/opt/distri/{}", &path[DISTRI_PRE.len()..])).await?,
};
Ok(response(StatusCode::OK).body(Body::wrap_stream(s))?)
Ok(response(StatusCode::OK).body(body_stream(s))?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
}
@@ -276,14 +298,14 @@ pub struct BackendsResponse {
backends: Vec<String>,
}
pub async fn backends(_req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
pub async fn backends(_req: Requ, proxy_config: &ProxyConfig) -> Result<StreamResponse, Error> {
let backends: Vec<_> = proxy_config.backends.iter().map(|k| k.name.to_string()).collect();
let res = BackendsResponse { backends };
let ret = response(StatusCode::OK).body(Body::from(serde_json::to_vec(&res)?))?;
let ret = response(StatusCode::OK).body(ToJsonBody::from(&res).into_body())?;
Ok(ret)
}
pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
pub async fn channel_search(req: Requ, proxy_config: &ProxyConfig) -> Result<StreamResponse, Error> {
let (head, _body) = req.into_parts();
match head.headers.get(http::header::ACCEPT) {
Some(v) => {
@@ -331,13 +353,18 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
};
let qs = serde_json::to_string(&q).unwrap();
methods.push(http::Method::POST);
bodies.push(Some(Body::from(qs)));
bodies.push(Some(qs));
});
}
let tags = urls.iter().map(|k| k.to_string()).collect();
let nt = |tag, res| {
// let nt = |tag: String, res: Response<hyper::body::Incoming>| {
fn fn_nt(
tag: String,
res: Response<hyper::body::Incoming>,
) -> Pin<Box<dyn Future<Output = Result<SubRes<ChannelSearchResult>, Error>> + Send>> {
let fut = async {
let body = hyper::body::to_bytes(res).await?;
let (_head, body) = res.into_parts();
let body = read_body_bytes(body).await?;
//info!("got a result {:?}", body);
let res: SubRes<ChannelSearchResult> =
match serde_json::from_slice::<ChannelSearchResult>(&body) {
@@ -408,7 +435,7 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
Ok(res)
};
Box::pin(fut) as Pin<Box<dyn Future<Output = _> + Send>>
};
}
let ft = |all: Vec<(crate::gather::Tag, Result<SubRes<ChannelSearchResult>, Error>)>| {
let mut res = Vec::new();
for (_tag, j) in all {
@@ -426,7 +453,7 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
let res = ChannelSearchResult { channels: res };
let res = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Body::from(serde_json::to_string(&res)?))?;
.body(ToJsonBody::from(&res).into_body())?;
Ok(res)
};
// TODO gather_get_json_generic must for this case accept a Method for each Request.
@@ -438,25 +465,25 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
urls,
bodies,
tags,
nt,
fn_nt,
ft,
Duration::from_millis(3000),
)
.await?;
Ok(ret)
} else {
Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?)
}
}
None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?),
None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?),
}
}
pub async fn proxy_single_backend_query<QT>(
req: Request<Body>,
req: Requ,
_ctx: &ReqCtx,
proxy_config: &ProxyConfig,
) -> Result<Response<Body>, Error>
) -> Result<StreamResponse, Error>
where
QT: FromUrl + AppendToUrl + HasBackend + HasTimeout,
{
@@ -510,11 +537,11 @@ where
a
})?;
let tags: Vec<_> = urls.iter().map(|k| k.to_string()).collect();
let nt = |tag: String, res: Response<Body>| {
let nt = |tag: String, res: Response<hyper::body::Incoming>| {
let fut = async {
let (head, body) = res.into_parts();
if head.status == StatusCode::OK {
let body = hyper::body::to_bytes(body).await?;
let body = read_body_bytes(body).await?;
match serde_json::from_slice::<JsonValue>(&body) {
Ok(val) => {
let ret = SubRes {
@@ -530,7 +557,7 @@ where
}
}
} else {
let body = hyper::body::to_bytes(body).await?;
let body = read_body_bytes(body).await?;
let b = String::from_utf8_lossy(&body);
let ret = SubRes {
tag,
@@ -553,7 +580,7 @@ where
// TODO want to pass arbitrary body type:
let res = response(z.status)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Body::from(serde_json::to_string(&res)?))?;
.body(ToJsonBody::from(&res).into_body())?;
return Ok(res);
}
Err(e) => {
@@ -571,10 +598,10 @@ where
gather_get_json_generic(http::Method::GET, urls, bodies, tags, nt, ft, query.timeout()).await?;
Ok(ret)
} else {
Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?)
}
}
None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?),
None => Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?),
}
}

View File

@@ -3,13 +3,20 @@ pub mod reqstatus;
use crate::bodystream::response;
use crate::err::Error;
use crate::ReqCtx;
use http::header;
use http::HeaderValue;
use http::Method;
use http::Request;
use http::Response;
use http::StatusCode;
use hyper::Body;
use hyper::Client;
use http::Uri;
use httpclient::body_bytes;
use httpclient::body_empty;
use httpclient::body_stream;
use httpclient::connect_client;
use httpclient::read_body_bytes;
use httpclient::Requ;
use httpclient::StreamIncoming;
use httpclient::StreamResponse;
use netpod::log::*;
use netpod::query::api1::Api1Query;
use netpod::ProxyConfig;
@@ -23,7 +30,7 @@ impl PythonDataApi1Query {
"/api/1/query"
}
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == Self::path() {
Some(Self {})
} else {
@@ -31,14 +38,9 @@ impl PythonDataApi1Query {
}
}
pub async fn handle(
&self,
req: Request<Body>,
_ctx: &ReqCtx,
proxy_config: &ProxyConfig,
) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, _ctx: &ReqCtx, proxy_config: &ProxyConfig) -> Result<StreamResponse, Error> {
if req.method() != Method::POST {
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?);
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?);
}
let (head, body) = req.into_parts();
let _accept = head
@@ -47,7 +49,7 @@ impl PythonDataApi1Query {
.map_or(Ok(ACCEPT_ALL), |k| k.to_str())
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?
.to_owned();
let body_data = hyper::body::to_bytes(body).await?;
let body_data = read_body_bytes(body).await?;
if body_data.len() < 512 && body_data.first() == Some(&"{".as_bytes()[0]) {
info!("request body_data string: {}", String::from_utf8_lossy(&body_data));
}
@@ -73,24 +75,28 @@ impl PythonDataApi1Query {
if let Some(back) = back {
let url_str = format!("{}/api/1/query", back.url);
info!("try to ask {url_str}");
let uri: Uri = url_str.parse()?;
let req = Request::builder()
.method(Method::POST)
.uri(url_str)
.body(Body::from(body_data))?;
let client = Client::new();
let res = client.request(req).await?;
.header(header::HOST, uri.host().unwrap())
.uri(&uri)
.body(body_bytes(body_data))?;
let mut client = connect_client(&uri).await?;
let res = client.send_request(req).await?;
let (head, body) = res.into_parts();
if head.status != StatusCode::OK {
error!("backend returned error: {head:?}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?)
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?)
} else {
info!("backend returned OK");
let riq_def = HeaderValue::from_static("(none)");
let riq = head.headers.get(X_DAQBUF_REQID).unwrap_or(&riq_def);
Ok(response(StatusCode::OK).header(X_DAQBUF_REQID, riq).body(body)?)
Ok(response(StatusCode::OK)
.header(X_DAQBUF_REQID, riq)
.body(body_stream(StreamIncoming::new(body)))?)
}
} else {
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?)
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?)
}
}
}

View File

@@ -1,11 +1,16 @@
use crate::bodystream::response;
use crate::err::Error;
use http::header;
use http::Method;
use http::Request;
use http::Response;
use http::StatusCode;
use hyper::Body;
use hyper::Client;
use http::Uri;
use httpclient::body_bytes;
use httpclient::body_empty;
use httpclient::connect_client;
use httpclient::read_body_bytes;
use httpclient::Requ;
use httpclient::StreamResponse;
use netpod::log::*;
use netpod::ProxyConfig;
use netpod::ACCEPT_ALL;
@@ -18,7 +23,7 @@ impl RequestStatusHandler {
"/api/1/requestStatus/"
}
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path().starts_with(Self::path_prefix()) {
Some(Self {})
} else {
@@ -26,10 +31,10 @@ impl RequestStatusHandler {
}
}
pub async fn handle(&self, req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, proxy_config: &ProxyConfig) -> Result<StreamResponse, Error> {
let (head, body) = req.into_parts();
if head.method != Method::GET {
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?);
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?);
}
let accept = head
.headers
@@ -41,9 +46,9 @@ impl RequestStatusHandler {
// TODO set the public error code and message and return Err(e).
let e = Error::with_public_msg_no_trace(format!("Unsupported Accept: {:?}", accept));
error!("{e}");
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}
let _body_data = hyper::body::to_bytes(body).await?;
let _body_data = read_body_bytes(body).await?;
let status_id = &head.uri.path()[Self::path_prefix().len()..];
debug!("RequestStatusHandler status_id {:?}", status_id);
@@ -60,22 +65,24 @@ impl RequestStatusHandler {
if let Some(back) = back {
let url_str = format!("{}{}{}", back.url, Self::path_prefix(), status_id);
debug!("try to ask {url_str}");
let uri: Uri = url_str.parse()?;
let req = Request::builder()
.method(Method::GET)
.uri(url_str)
.body(Body::empty())?;
let client = Client::new();
let res = client.request(req).await?;
.header(header::HOST, uri.host().unwrap())
.uri(&uri)
.body(body_empty())?;
let res = connect_client(&uri).await?.send_request(req).await?;
let (head, body) = res.into_parts();
if head.status != StatusCode::OK {
error!("backend returned error: {head:?}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?)
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?)
} else {
debug!("backend returned OK");
Ok(response(StatusCode::OK).body(body)?)
let body = read_body_bytes(body).await?;
Ok(response(StatusCode::OK).body(body_bytes(body))?)
}
} else {
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?)
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_empty())?)
}
}
}

View File

@@ -9,9 +9,15 @@ use crate::response;
use crate::ReqCtx;
use futures_util::Future;
use http::Method;
use http::Request;
use http::Response;
use http::StatusCode;
use httpclient::body_empty;
use httpclient::read_body_bytes;
use httpclient::IntoBody;
use httpclient::Requ;
use httpclient::StreamResponse;
use httpclient::ToJsonBody;
use hyper::body::Incoming;
use netpod::log::*;
use netpod::ChannelSearchQuery;
use netpod::ChannelSearchResult;
@@ -34,7 +40,7 @@ use url::Url;
// The aggregators and leaf nodes behind should as well not depend on backend,
// but simply answer all matching.
pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<ChannelSearchResult, Error> {
pub async fn channel_search(req: Requ, proxy_config: &ProxyConfig) -> Result<ChannelSearchResult, Error> {
let (head, _body) = req.into_parts();
let inpurl = Url::parse(&format!("dummy:{}", head.uri))?;
let query = ChannelSearchQuery::from_url(&inpurl)?;
@@ -58,9 +64,10 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
}
}
}
let nt = |tag, res| {
let nt = |tag: String, res: Response<Incoming>| {
let fut = async {
let body = hyper::body::to_bytes(res).await?;
let (_head, body) = res.into_parts();
let body = read_body_bytes(body).await?;
//info!("got a result {:?}", body);
let res: ChannelSearchResult = match serde_json::from_slice(&body) {
Ok(k) => k,
@@ -112,7 +119,7 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
pub struct ChannelSearchAggHandler {}
impl ChannelSearchAggHandler {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/4/search/channel" {
Some(Self {})
} else {
@@ -120,7 +127,7 @@ impl ChannelSearchAggHandler {
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &ProxyConfig) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &ProxyConfig) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
@@ -129,20 +136,17 @@ impl ChannelSearchAggHandler {
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept.contains(APP_JSON) || accept.contains(ACCEPT_ALL) {
match channel_search(req, node_config).await {
Ok(item) => {
let buf = serde_json::to_vec(&item)?;
Ok(response(StatusCode::OK).body(Body::from(buf))?)
}
Ok(item) => Ok(response(StatusCode::OK).body(ToJsonBody::from(&item).into_body())?),
Err(e) => {
warn!("ChannelConfigHandler::handle: got error from channel_config: {e:?}");
Ok(e.to_public_response())
}
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
Ok(response(StatusCode::BAD_REQUEST).body(body_empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
}
}
@@ -150,7 +154,7 @@ impl ChannelSearchAggHandler {
pub struct StatusNodesRecursive {}
impl StatusNodesRecursive {
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == crate::api4::status::StatusNodesRecursive::path() {
Some(Self {})
} else {
@@ -160,15 +164,14 @@ impl StatusNodesRecursive {
pub async fn handle(
&self,
req: Request<Body>,
req: Requ,
ctx: &ReqCtx,
proxy_config: &ProxyConfig,
service_version: &ServiceVersion,
) -> Result<Response<Body>, Error> {
) -> Result<StreamResponse, Error> {
match self.status(req, ctx, proxy_config, service_version).await {
Ok(status) => {
let body = serde_json::to_vec(&status)?;
let ret = response(StatusCode::OK).body(Body::from(body))?;
let ret = response(StatusCode::OK).body(ToJsonBody::from(&status).into_body())?;
Ok(ret)
}
Err(e) => {
@@ -181,7 +184,7 @@ impl StatusNodesRecursive {
async fn status(
&self,
_req: Request<Body>,
_req: Requ,
_ctx: &ReqCtx,
proxy_config: &ProxyConfig,
service_version: &ServiceVersion,
@@ -200,9 +203,10 @@ impl StatusNodesRecursive {
Err(e) => return Err(Error::with_msg_no_trace(format!("parse error for: {sub:?} {e:?}"))),
}
}
let nt = |tag, res| {
let nt = |tag: String, res: Response<Incoming>| {
let fut = async {
let body = hyper::body::to_bytes(res).await?;
let (_head, body) = res.into_parts();
let body = read_body_bytes(body).await?;
let res: JsVal = match serde_json::from_slice(&body) {
Ok(k) => k,
Err(e) => {

View File

@@ -1,10 +1,11 @@
use crate::bodystream::response;
use crate::err::Error;
use crate::ReqCtx;
use http::Request;
use http::Response;
use http::StatusCode;
use hyper::Body;
use httpclient::IntoBody;
use httpclient::Requ;
use httpclient::StreamResponse;
use httpclient::ToJsonBody;
use netpod::log::*;
use netpod::ProxyConfig;
@@ -15,7 +16,7 @@ impl CaIocLookup {
"/api/4/channel-access/search/addr"
}
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == Self::path() {
Some(Self {})
} else {
@@ -23,16 +24,10 @@ impl CaIocLookup {
}
}
pub async fn handle(
&self,
req: Request<Body>,
ctx: &ReqCtx,
node_config: &ProxyConfig,
) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, ctx: &ReqCtx, node_config: &ProxyConfig) -> Result<StreamResponse, Error> {
match self.search(req, ctx, node_config).await {
Ok(status) => {
let body = serde_json::to_vec(&status)?;
let ret = response(StatusCode::OK).body(Body::from(body))?;
let ret = response(StatusCode::OK).body(ToJsonBody::from(&status).into_body())?;
Ok(ret)
}
Err(e) => {
@@ -43,12 +38,7 @@ impl CaIocLookup {
}
}
async fn search(
&self,
_req: Request<Body>,
_ctx: &ReqCtx,
_proxy_config: &ProxyConfig,
) -> Result<Option<String>, Error> {
async fn search(&self, _req: Requ, _ctx: &ReqCtx, _proxy_config: &ProxyConfig) -> Result<Option<String>, Error> {
Ok(None)
}
}

View File

@@ -4,7 +4,6 @@ use crate::cache::Cache;
use crate::err::Error;
use crate::response;
use crate::Requ;
use crate::RespFull;
use bytes::Buf;
use bytes::BufMut;
use bytes::BytesMut;
@@ -19,6 +18,7 @@ use http::StatusCode;
use http::Uri;
use httpclient::connect_client;
use httpclient::read_body_bytes;
use httpclient::StreamResponse;
use hyper::Request;
use netpod::log::*;
use netpod::timeunits::SEC;
@@ -401,7 +401,7 @@ impl IndexFullHttpFunction {
}
}
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<RespFull, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}
@@ -908,7 +908,7 @@ impl MapPulseScyllaHandler {
}
}
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<RespFull, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}
@@ -960,7 +960,7 @@ impl MapPulseLocalHttpFunction {
}
}
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<RespFull, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}
@@ -1119,7 +1119,7 @@ impl MapPulseHistoHttpFunction {
}
}
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<RespFull, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}
@@ -1210,7 +1210,7 @@ impl MapPulseHttpFunction {
}
}
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<RespFull, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
use crate::cache::CachePortal;
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
@@ -1343,7 +1343,7 @@ impl Api4MapPulseHttpFunction {
res
}
pub async fn handle(&self, req: Requ, ncc: &NodeConfigCached) -> Result<RespFull, Error> {
pub async fn handle(&self, req: Requ, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}
@@ -1394,7 +1394,7 @@ impl Api4MapPulse2HttpFunction {
path.starts_with(Self::path_prefix())
}
pub async fn handle(&self, req: Requ, ncc: &NodeConfigCached) -> Result<RespFull, Error> {
pub async fn handle(&self, req: Requ, ncc: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}
@@ -1440,7 +1440,7 @@ impl MarkClosedHttpFunction {
}
}
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<RespFull, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}

View File

@@ -3,8 +3,10 @@ use crate::response;
use http::header;
use http::Method;
use http::StatusCode;
use hyper::Request;
use hyper::Response;
use httpclient::body_empty;
use httpclient::body_string;
use httpclient::Requ;
use httpclient::StreamResponse;
use netpod::log::*;
use netpod::NodeConfigCached;
use netpod::ACCEPT_ALL;
@@ -17,7 +19,7 @@ impl SettingsThreadsMaxHandler {
"/api/4/settings/read3/threads_max"
}
pub fn handler(req: &Request<Body>) -> Option<Self> {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path().starts_with(Self::path_prefix()) {
Some(Self {})
} else {
@@ -25,7 +27,7 @@ impl SettingsThreadsMaxHandler {
}
}
pub async fn put(&self, req: Request<Body>, _node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn put(&self, req: Requ, _node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
let (head, body) = req.into_parts();
let accept = head
.headers
@@ -35,32 +37,32 @@ impl SettingsThreadsMaxHandler {
.to_owned();
if accept != APP_JSON && accept != ACCEPT_ALL {
// TODO set the public error code and message and return Err(e).
let e = Error::with_public_msg_no_trace(format!("Unsupported Accept: {:?}", accept));
let e = Error::with_public_msg_no_trace(format!("unsupported accept: {:?}", accept));
error!("{e}");
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(body_empty())?);
}
let body = httpclient::read_body_bytes(body).await?;
//let threads_max: usize = head.uri.path()[Self::path_prefix().len()..].parse()?;
let threads_max: usize = String::from_utf8_lossy(&body).parse()?;
info!("threads_max {threads_max}");
disk::read3::Read3::get().set_threads_max(threads_max);
let ret = response(StatusCode::NO_CONTENT).body(Body::empty())?;
let ret = response(StatusCode::NO_CONTENT).body(body_empty())?;
Ok(ret)
}
pub async fn get(&self, _req: Request<Body>, _node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn get(&self, _req: Requ, _node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
let threads_max = disk::read3::Read3::get().threads_max();
let ret = response(StatusCode::OK).body(Body::from(format!("{threads_max}")))?;
let ret = response(StatusCode::OK).body(body_string(format!("{threads_max}")))?;
Ok(ret)
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
self.get(req, node_config).await
} else if req.method() == Method::PUT {
self.put(req, node_config).await
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
}
}

View File

@@ -8,6 +8,9 @@ use crate::frames::inmem::InMemoryFrameStream;
use crate::frames::inmem::TcpReadAsBytes;
use err::Error;
use futures_util::Stream;
use http::Uri;
use httpclient::body_bytes;
use httpclient::http;
use items_0::framable::FrameTypeInnerStatic;
use items_0::streamitem::sitem_data;
use items_0::streamitem::Sitemty;
@@ -62,21 +65,22 @@ pub async fn x_processed_event_blobs_stream_from_node_http(
use http::header;
use http::Method;
use http::Request;
use httpclient::http;
use httpclient::hyper;
use hyper::StatusCode;
let frame1 = make_node_command_frame(subq.clone())?;
let item = sitem_data(frame1.clone());
let buf = item.make_frame()?.freeze();
let buf = item.make_frame()?;
let url = node.baseurl().join("/api/4/private/eventdata/frames").unwrap();
debug!("open_event_data_streams_http post {url}");
let uri: Uri = url.as_str().parse().unwrap();
let req = Request::builder()
.method(Method::POST)
.uri(url.to_string())
.uri(&uri)
.header(header::HOST, uri.host().unwrap())
.header(header::ACCEPT, APP_OCTET)
.body(httpclient::Full::new(buf))
.body(body_bytes(buf))
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let mut client = httpclient::connect_client(req.uri()).await?;
let res = client
@@ -161,20 +165,20 @@ where
use http::header;
use http::Method;
use http::Request;
use httpclient::http;
use httpclient::hyper;
use hyper::StatusCode;
use httpclient::hyper::StatusCode;
let item = sitem_data(frame1.clone());
let buf = item.make_frame()?.freeze();
let buf = item.make_frame()?;
let url = node.baseurl().join("/api/4/private/eventdata/frames").unwrap();
debug!("open_event_data_streams_http post {url}");
let uri: Uri = url.as_str().parse().unwrap();
let req = Request::builder()
.method(Method::POST)
.uri(url.to_string())
.uri(&uri)
.header(header::HOST, uri.host().unwrap())
.header(header::ACCEPT, APP_OCTET)
.body(httpclient::Full::new(buf))
.body(body_bytes(buf))
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let mut client = httpclient::connect_client(req.uri()).await?;
let res = client