Proxy api1 status request
This commit is contained in:
@@ -92,7 +92,7 @@ pub async fn get_binned(
|
|||||||
head, s
|
head, s
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
let perf_opts = PerfOpts { inmem_bufcap: 512 };
|
let perf_opts = PerfOpts::default();
|
||||||
let s1 = HttpBodyAsAsyncRead::new(res);
|
let s1 = HttpBodyAsAsyncRead::new(res);
|
||||||
let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap);
|
let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap);
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
|
|||||||
@@ -101,7 +101,7 @@ where
|
|||||||
let beg_date: DateTime<Utc> = beg_date.parse()?;
|
let beg_date: DateTime<Utc> = beg_date.parse()?;
|
||||||
let end_date: DateTime<Utc> = end_date.parse()?;
|
let end_date: DateTime<Utc> = end_date.parse()?;
|
||||||
let channel_backend = "testbackend";
|
let channel_backend = "testbackend";
|
||||||
let perf_opts = PerfOpts { inmem_bufcap: 512 };
|
let perf_opts = PerfOpts::default();
|
||||||
let channel = Channel {
|
let channel = Channel {
|
||||||
backend: channel_backend.into(),
|
backend: channel_backend.into(),
|
||||||
name: channel_name.into(),
|
name: channel_name.into(),
|
||||||
|
|||||||
@@ -76,7 +76,7 @@ where
|
|||||||
let beg_date: DateTime<Utc> = beg_date.parse()?;
|
let beg_date: DateTime<Utc> = beg_date.parse()?;
|
||||||
let end_date: DateTime<Utc> = end_date.parse()?;
|
let end_date: DateTime<Utc> = end_date.parse()?;
|
||||||
let channel_backend = "testbackend";
|
let channel_backend = "testbackend";
|
||||||
let perf_opts = PerfOpts { inmem_bufcap: 512 };
|
let perf_opts = PerfOpts::default();
|
||||||
let channel = Channel {
|
let channel = Channel {
|
||||||
backend: channel_backend.into(),
|
backend: channel_backend.into(),
|
||||||
name: channel_name.into(),
|
name: channel_name.into(),
|
||||||
|
|||||||
@@ -796,7 +796,8 @@ impl Stream for DataApiPython3DataStream {
|
|||||||
None,
|
None,
|
||||||
true,
|
true,
|
||||||
);
|
);
|
||||||
let perf_opts = PerfOpts { inmem_bufcap: 1024 * 4 };
|
warn!("fix magic inmem_bufcap");
|
||||||
|
let perf_opts = PerfOpts::default();
|
||||||
// TODO is this a good to place decide this?
|
// TODO is this a good to place decide this?
|
||||||
let s = if self.node_config.node_config.cluster.is_central_storage {
|
let s = if self.node_config.node_config.cluster.is_central_storage {
|
||||||
info!("Set up central storage stream");
|
info!("Set up central storage stream");
|
||||||
|
|||||||
@@ -187,6 +187,10 @@ async fn proxy_http_service_inner(
|
|||||||
} else if path.starts_with("/api/4/test/log/debug") {
|
} else if path.starts_with("/api/4/test/log/debug") {
|
||||||
debug!("{path}");
|
debug!("{path}");
|
||||||
Ok(response(StatusCode::OK).body(Body::empty())?)
|
Ok(response(StatusCode::OK).body(Body::empty())?)
|
||||||
|
} else if let Some(h) = api1::PythonDataApi1Query::handler(&req) {
|
||||||
|
h.handle(req, ctx, proxy_config).await
|
||||||
|
} else if let Some(h) = api1::reqstatus::RequestStatusHandler::handler(&req) {
|
||||||
|
h.handle(req, proxy_config).await
|
||||||
} else if path.starts_with(DISTRI_PRE) {
|
} else if path.starts_with(DISTRI_PRE) {
|
||||||
proxy_distribute_v2(req).await
|
proxy_distribute_v2(req).await
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -1,11 +1,15 @@
|
|||||||
|
pub mod reqstatus;
|
||||||
|
|
||||||
use crate::bodystream::response;
|
use crate::bodystream::response;
|
||||||
use crate::err::Error;
|
use crate::err::Error;
|
||||||
use crate::ReqCtx;
|
use crate::ReqCtx;
|
||||||
|
use http::HeaderValue;
|
||||||
use http::Method;
|
use http::Method;
|
||||||
use http::Request;
|
use http::Request;
|
||||||
use http::Response;
|
use http::Response;
|
||||||
use http::StatusCode;
|
use http::StatusCode;
|
||||||
use hyper::Body;
|
use hyper::Body;
|
||||||
|
use hyper::Client;
|
||||||
use netpod::log::*;
|
use netpod::log::*;
|
||||||
use netpod::query::api1::Api1Query;
|
use netpod::query::api1::Api1Query;
|
||||||
use netpod::ProxyConfig;
|
use netpod::ProxyConfig;
|
||||||
@@ -29,14 +33,14 @@ impl PythonDataApi1Query {
|
|||||||
pub async fn handle(
|
pub async fn handle(
|
||||||
&self,
|
&self,
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
ctx: &ReqCtx,
|
_ctx: &ReqCtx,
|
||||||
proxy_config: &ProxyConfig,
|
proxy_config: &ProxyConfig,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
if req.method() != Method::POST {
|
if req.method() != Method::POST {
|
||||||
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?);
|
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?);
|
||||||
}
|
}
|
||||||
let (head, body) = req.into_parts();
|
let (head, body) = req.into_parts();
|
||||||
let accept = head
|
let _accept = head
|
||||||
.headers
|
.headers
|
||||||
.get(http::header::ACCEPT)
|
.get(http::header::ACCEPT)
|
||||||
.map_or(Ok(ACCEPT_ALL), |k| k.to_str())
|
.map_or(Ok(ACCEPT_ALL), |k| k.to_str())
|
||||||
@@ -55,6 +59,39 @@ impl PythonDataApi1Query {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
info!("Proxy sees request: {qu:?}");
|
info!("Proxy sees request: {qu:?}");
|
||||||
Ok(response(StatusCode::OK).body(Body::empty())?)
|
let back = {
|
||||||
|
let mut ret = None;
|
||||||
|
for b in &proxy_config.backends {
|
||||||
|
if b.name == "sf-databuffer" {
|
||||||
|
ret = Some(b);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ret
|
||||||
|
};
|
||||||
|
if let Some(back) = back {
|
||||||
|
let url_str = format!("{}/api/1/query", back.url);
|
||||||
|
info!("try to ask {url_str}");
|
||||||
|
let req = Request::builder()
|
||||||
|
.method(Method::POST)
|
||||||
|
.uri(url_str)
|
||||||
|
.body(Body::from(body_data))?;
|
||||||
|
let client = Client::new();
|
||||||
|
let res = client.request(req).await?;
|
||||||
|
let (head, body) = res.into_parts();
|
||||||
|
if head.status != StatusCode::OK {
|
||||||
|
error!("backend returned error: {head:?}");
|
||||||
|
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?)
|
||||||
|
} else {
|
||||||
|
info!("backend returned OK");
|
||||||
|
let riq_def = HeaderValue::from_static("(none)");
|
||||||
|
let riq = head.headers.get("x-daqbuffer-request-id").unwrap_or(&riq_def);
|
||||||
|
Ok(response(StatusCode::OK)
|
||||||
|
.header("x-daqbuffer-request-id", riq)
|
||||||
|
.body(body)?)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
81
httpret/src/proxy/api1/reqstatus.rs
Normal file
81
httpret/src/proxy/api1/reqstatus.rs
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
use crate::bodystream::response;
|
||||||
|
use crate::err::Error;
|
||||||
|
use http::Method;
|
||||||
|
use http::Request;
|
||||||
|
use http::Response;
|
||||||
|
use http::StatusCode;
|
||||||
|
use hyper::Body;
|
||||||
|
use hyper::Client;
|
||||||
|
use netpod::log::*;
|
||||||
|
use netpod::ProxyConfig;
|
||||||
|
use netpod::ACCEPT_ALL;
|
||||||
|
use netpod::APP_JSON;
|
||||||
|
|
||||||
|
pub struct RequestStatusHandler {}
|
||||||
|
|
||||||
|
impl RequestStatusHandler {
|
||||||
|
pub fn path_prefix() -> &'static str {
|
||||||
|
"/api/1/requestStatus/"
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn handler(req: &Request<Body>) -> Option<Self> {
|
||||||
|
if req.uri().path().starts_with(Self::path_prefix()) {
|
||||||
|
Some(Self {})
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn handle(&self, req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
|
||||||
|
let (head, body) = req.into_parts();
|
||||||
|
if head.method != Method::GET {
|
||||||
|
return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?);
|
||||||
|
}
|
||||||
|
let accept = head
|
||||||
|
.headers
|
||||||
|
.get(http::header::ACCEPT)
|
||||||
|
.map_or(Ok(ACCEPT_ALL), |k| k.to_str())
|
||||||
|
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?
|
||||||
|
.to_owned();
|
||||||
|
if accept != APP_JSON && accept != ACCEPT_ALL {
|
||||||
|
// TODO set the public error code and message and return Err(e).
|
||||||
|
let e = Error::with_public_msg(format!("Unsupported Accept: {:?}", accept));
|
||||||
|
error!("{e}");
|
||||||
|
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
|
||||||
|
}
|
||||||
|
let _body_data = hyper::body::to_bytes(body).await?;
|
||||||
|
let status_id = &head.uri.path()[Self::path_prefix().len()..];
|
||||||
|
info!("RequestStatusHandler status_id {:?}", status_id);
|
||||||
|
|
||||||
|
let back = {
|
||||||
|
let mut ret = None;
|
||||||
|
for b in &proxy_config.backends {
|
||||||
|
if b.name == "sf-databuffer" {
|
||||||
|
ret = Some(b);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ret
|
||||||
|
};
|
||||||
|
if let Some(back) = back {
|
||||||
|
let url_str = format!("{}{}{}", back.url, Self::path_prefix(), status_id);
|
||||||
|
info!("try to ask {url_str}");
|
||||||
|
let req = Request::builder()
|
||||||
|
.method(Method::GET)
|
||||||
|
.uri(url_str)
|
||||||
|
.body(Body::empty())?;
|
||||||
|
let client = Client::new();
|
||||||
|
let res = client.request(req).await?;
|
||||||
|
let (head, body) = res.into_parts();
|
||||||
|
if head.status != StatusCode::OK {
|
||||||
|
error!("backend returned error: {head:?}");
|
||||||
|
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?)
|
||||||
|
} else {
|
||||||
|
info!("backend returned OK");
|
||||||
|
Ok(response(StatusCode::OK).body(body)?)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1819,6 +1819,14 @@ pub struct PerfOpts {
|
|||||||
pub inmem_bufcap: usize,
|
pub inmem_bufcap: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl PerfOpts {
|
||||||
|
pub fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
inmem_bufcap: 1024 * 512,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct ByteSize(pub u32);
|
pub struct ByteSize(pub u32);
|
||||||
|
|
||||||
|
|||||||
@@ -131,7 +131,7 @@ impl PreBinnedQuery {
|
|||||||
self.cache_usage.clone()
|
self.cache_usage.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn disk_io_buffer_size(&self) -> usize {
|
pub fn disk_io_buffer_sizee(&self) -> usize {
|
||||||
self.disk_io_buffer_size
|
self.disk_io_buffer_size
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -182,7 +182,8 @@ async fn events_conn_handler_inner_try(
|
|||||||
) -> Result<(), ConnErr> {
|
) -> Result<(), ConnErr> {
|
||||||
let _ = addr;
|
let _ = addr;
|
||||||
let (netin, mut netout) = stream.into_split();
|
let (netin, mut netout) = stream.into_split();
|
||||||
let perf_opts = PerfOpts { inmem_bufcap: 512 };
|
warn!("fix magic inmem_bufcap option");
|
||||||
|
let perf_opts = PerfOpts::default();
|
||||||
let mut h = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap);
|
let mut h = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap);
|
||||||
let mut frames = Vec::new();
|
let mut frames = Vec::new();
|
||||||
while let Some(k) = h
|
while let Some(k) = h
|
||||||
|
|||||||
Reference in New Issue
Block a user