Factor out handler, allow strings as port numbers to simplify Jinja2 templates
This commit is contained in:
96
httpret/src/events.rs
Normal file
96
httpret/src/events.rs
Normal file
@@ -0,0 +1,96 @@
|
||||
use crate::err::Error;
|
||||
use crate::{response, response_err, BodyStream, ToPublicResponse};
|
||||
use disk::events::{PlainEventsBinaryQuery, PlainEventsJsonQuery};
|
||||
use futures_util::{StreamExt, TryStreamExt};
|
||||
use http::{Method, Request, Response, StatusCode};
|
||||
use hyper::Body;
|
||||
use netpod::log::*;
|
||||
use netpod::{AggKind, NodeConfigCached};
|
||||
use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET};
|
||||
use url::Url;
|
||||
|
||||
pub struct EventsHandler {}
|
||||
|
||||
impl EventsHandler {
|
||||
pub fn handler(req: &Request<Body>) -> Option<Self> {
|
||||
if req.uri().path() == "/api/4/events" {
|
||||
Some(Self {})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
if req.method() != Method::GET {
|
||||
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
|
||||
}
|
||||
let ret = match plain_events(req, node_config).await {
|
||||
Ok(res) => res,
|
||||
Err(e) => response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
async fn plain_events(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
match plain_events_inner(req, node_config).await {
|
||||
Ok(ret) => Ok(ret),
|
||||
Err(e) => Ok(e.to_public_response()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn plain_events_inner(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
info!("httpret plain_events_inner req: {:?}", req);
|
||||
let accept_def = APP_JSON;
|
||||
let accept = req
|
||||
.headers()
|
||||
.get(http::header::ACCEPT)
|
||||
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
|
||||
if accept == APP_JSON || accept == ACCEPT_ALL {
|
||||
Ok(plain_events_json(req, node_config).await?)
|
||||
} else if accept == APP_OCTET {
|
||||
Ok(plain_events_binary(req, node_config).await?)
|
||||
} else {
|
||||
let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("Unsupported Accept: {:?}", accept))?;
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
async fn plain_events_binary(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
debug!("httpret plain_events_binary req: {:?}", req);
|
||||
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
|
||||
let query = PlainEventsBinaryQuery::from_url(&url)?;
|
||||
let op = disk::channelexec::PlainEvents::new(
|
||||
query.channel().clone(),
|
||||
query.range().clone(),
|
||||
query.disk_io_buffer_size(),
|
||||
node_config.clone(),
|
||||
);
|
||||
let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?;
|
||||
let s = s.map(|item| item.make_frame());
|
||||
let ret = response(StatusCode::OK).body(BodyStream::wrapped(
|
||||
s.map_err(Error::from),
|
||||
format!("plain_events_binary"),
|
||||
))?;
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
async fn plain_events_json(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
debug!("httpret plain_events_json req: {:?}", req);
|
||||
let (head, _body) = req.into_parts();
|
||||
let query = PlainEventsJsonQuery::from_request_head(&head)?;
|
||||
let op = disk::channelexec::PlainEventsJson::new(
|
||||
query.channel().clone(),
|
||||
query.range().clone(),
|
||||
query.disk_io_buffer_size(),
|
||||
query.timeout(),
|
||||
node_config.clone(),
|
||||
query.do_log(),
|
||||
);
|
||||
let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?;
|
||||
let ret = response(StatusCode::OK).body(BodyStream::wrapped(
|
||||
s.map_err(Error::from),
|
||||
format!("plain_events_json"),
|
||||
))?;
|
||||
Ok(ret)
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
pub mod api1;
|
||||
pub mod channelarchiver;
|
||||
pub mod err;
|
||||
pub mod events;
|
||||
pub mod evinfo;
|
||||
pub mod gather;
|
||||
pub mod proxy;
|
||||
@@ -12,7 +13,6 @@ use crate::gather::gather_get_json;
|
||||
use crate::pulsemap::UpdateTask;
|
||||
use bytes::Bytes;
|
||||
use disk::binned::query::PreBinnedQuery;
|
||||
use disk::events::{PlainEventsBinaryQuery, PlainEventsJsonQuery};
|
||||
use future::Future;
|
||||
use futures_core::Stream;
|
||||
use futures_util::{FutureExt, StreamExt, TryStreamExt};
|
||||
@@ -24,7 +24,7 @@ use net::SocketAddr;
|
||||
use netpod::query::BinnedQuery;
|
||||
use netpod::timeunits::SEC;
|
||||
use netpod::{
|
||||
channel_from_pairs, get_url_query_pairs, AggKind, ChannelConfigQuery, FromUrl, NodeConfigCached, NodeStatus,
|
||||
channel_from_pairs, get_url_query_pairs, ChannelConfigQuery, FromUrl, NodeConfigCached, NodeStatus,
|
||||
NodeStatusArchiverAppliance,
|
||||
};
|
||||
use netpod::{log::*, ACCEPT_ALL};
|
||||
@@ -190,7 +190,6 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
|
||||
} else if path == "/api/4/version" {
|
||||
if req.method() == Method::GET {
|
||||
let ret = serde_json::json!({
|
||||
//"data_api_version": "4.0.0-beta",
|
||||
"data_api_version": {
|
||||
"major": 4,
|
||||
"minor": 0,
|
||||
@@ -206,12 +205,8 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
|
||||
} else {
|
||||
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
|
||||
}
|
||||
} else if path == "/api/4/events" {
|
||||
if req.method() == Method::GET {
|
||||
Ok(plain_events(req, &node_config).await?)
|
||||
} else {
|
||||
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
|
||||
}
|
||||
} else if let Some(h) = events::EventsHandler::handler(&req) {
|
||||
h.handle(req, &node_config).await
|
||||
} else if path == "/api/4/binned" {
|
||||
if req.method() == Method::GET {
|
||||
Ok(binned(req, node_config).await?)
|
||||
@@ -577,69 +572,6 @@ async fn prebinned_inner(req: Request<Body>, node_config: &NodeConfigCached) ->
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
async fn plain_events(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
match plain_events_inner(req, node_config).await {
|
||||
Ok(ret) => Ok(ret),
|
||||
Err(e) => Ok(e.to_public_response()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn plain_events_inner(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
info!("httpret plain_events_inner req: {:?}", req);
|
||||
let accept_def = APP_JSON;
|
||||
let accept = req
|
||||
.headers()
|
||||
.get(http::header::ACCEPT)
|
||||
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
|
||||
if accept == APP_JSON || accept == ACCEPT_ALL {
|
||||
Ok(plain_events_json(req, node_config).await?)
|
||||
} else if accept == APP_OCTET {
|
||||
Ok(plain_events_binary(req, node_config).await?)
|
||||
} else {
|
||||
let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("unsupported Accept: {:?}", accept))?;
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
async fn plain_events_binary(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
debug!("httpret plain_events_binary req: {:?}", req);
|
||||
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
|
||||
let query = PlainEventsBinaryQuery::from_url(&url)?;
|
||||
let op = disk::channelexec::PlainEvents::new(
|
||||
query.channel().clone(),
|
||||
query.range().clone(),
|
||||
query.disk_io_buffer_size(),
|
||||
node_config.clone(),
|
||||
);
|
||||
let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?;
|
||||
let s = s.map(|item| item.make_frame());
|
||||
let ret = response(StatusCode::OK).body(BodyStream::wrapped(
|
||||
s.map_err(Error::from),
|
||||
format!("plain_events_binary"),
|
||||
))?;
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
async fn plain_events_json(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
debug!("httpret plain_events_json req: {:?}", req);
|
||||
let (head, _body) = req.into_parts();
|
||||
let query = PlainEventsJsonQuery::from_request_head(&head)?;
|
||||
let op = disk::channelexec::PlainEventsJson::new(
|
||||
query.channel().clone(),
|
||||
query.range().clone(),
|
||||
query.disk_io_buffer_size(),
|
||||
query.timeout(),
|
||||
node_config.clone(),
|
||||
query.do_log(),
|
||||
);
|
||||
let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?;
|
||||
let ret = response(StatusCode::OK).body(BodyStream::wrapped(
|
||||
s.map_err(Error::from),
|
||||
format!("plain_events_json"),
|
||||
))?;
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
async fn node_status(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
let (_head, _body) = req.into_parts();
|
||||
let archiver_appliance_status = match node_config.node.archiver_appliance.as_ref() {
|
||||
|
||||
@@ -102,7 +102,6 @@ pub async fn node_status(req: Request<Body>, proxy_config: &ProxyConfig) -> Resu
|
||||
let vdef = header::HeaderValue::from_static(APP_JSON);
|
||||
let v = head.headers.get(header::ACCEPT).unwrap_or(&vdef);
|
||||
if v == APP_JSON || v == ACCEPT_ALL {
|
||||
let inpurl = Url::parse(&format!("dummy:{}", head.uri))?;
|
||||
let mut bodies = vec![];
|
||||
let urls = proxy_config
|
||||
.backends_status
|
||||
@@ -138,13 +137,17 @@ pub async fn node_status(req: Request<Body>, proxy_config: &ProxyConfig) -> Resu
|
||||
Box::pin(fut) as Pin<Box<dyn Future<Output = _> + Send>>
|
||||
};
|
||||
let ft = |all: Vec<SubRes<JsVal>>| {
|
||||
let mut res = vec![];
|
||||
let mut sres = vec![];
|
||||
for j in all {
|
||||
let val = serde_json::json!({
|
||||
j.tag: j.val,
|
||||
});
|
||||
res.push(val);
|
||||
sres.push(val);
|
||||
}
|
||||
let res = serde_json::json!({
|
||||
"THIS_PROXY": "status here...",
|
||||
"subnodes": sres,
|
||||
});
|
||||
let res = response(StatusCode::OK)
|
||||
.header(http::header::CONTENT_TYPE, APP_JSON)
|
||||
.body(Body::from(serde_json::to_string(&res)?))
|
||||
|
||||
@@ -220,8 +220,11 @@ pub struct ChannelArchiver {
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct Node {
|
||||
pub host: String,
|
||||
// TODO for `listen` and the ports, would be great to allow a default on Cluster level.
|
||||
pub listen: String,
|
||||
#[serde(deserialize_with = "port_from_any")]
|
||||
pub port: u16,
|
||||
#[serde(deserialize_with = "port_from_any")]
|
||||
pub port_raw: u16,
|
||||
pub cache_base_path: PathBuf,
|
||||
pub sf_databuffer: Option<SfDatabuffer>,
|
||||
@@ -229,6 +232,58 @@ pub struct Node {
|
||||
pub channel_archiver: Option<ChannelArchiver>,
|
||||
}
|
||||
|
||||
struct Visit1 {}
|
||||
|
||||
impl<'de> serde::de::Visitor<'de> for Visit1 {
|
||||
type Value = u16;
|
||||
|
||||
fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(fmt, "a tcp port number, in numeric or string form.")
|
||||
}
|
||||
|
||||
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
|
||||
where
|
||||
E: serde::de::Error,
|
||||
{
|
||||
if v > u16::MAX as u64 {
|
||||
Err(serde::de::Error::invalid_type(
|
||||
serde::de::Unexpected::Unsigned(v),
|
||||
&self,
|
||||
))
|
||||
} else {
|
||||
self.visit_i64(v as i64)
|
||||
}
|
||||
}
|
||||
|
||||
fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
|
||||
where
|
||||
E: serde::de::Error,
|
||||
{
|
||||
if v < 1 || v > u16::MAX as i64 {
|
||||
Err(serde::de::Error::invalid_type(serde::de::Unexpected::Signed(v), &self))
|
||||
} else {
|
||||
Ok(v as u16)
|
||||
}
|
||||
}
|
||||
|
||||
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
|
||||
where
|
||||
E: serde::de::Error,
|
||||
{
|
||||
match v.parse::<u16>() {
|
||||
Err(_) => Err(serde::de::Error::invalid_type(serde::de::Unexpected::Str(v), &self)),
|
||||
Ok(v) => Ok(v),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn port_from_any<'de, D>(de: D) -> Result<u16, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
de.deserialize_any(Visit1 {})
|
||||
}
|
||||
|
||||
impl Node {
|
||||
// TODO needed? Could `sf_databuffer` be None?
|
||||
pub fn dummy() -> Self {
|
||||
|
||||
Reference in New Issue
Block a user