diff --git a/httpret/src/events.rs b/httpret/src/events.rs
new file mode 100644
index 0000000..1b37a3d
--- /dev/null
+++ b/httpret/src/events.rs
@@ -0,0 +1,96 @@
+use crate::err::Error;
+use crate::{response, response_err, BodyStream, ToPublicResponse};
+use disk::events::{PlainEventsBinaryQuery, PlainEventsJsonQuery};
+use futures_util::{StreamExt, TryStreamExt};
+use http::{Method, Request, Response, StatusCode};
+use hyper::Body;
+use netpod::log::*;
+use netpod::{AggKind, NodeConfigCached};
+use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET};
+use url::Url;
+
+pub struct EventsHandler {}
+
+impl EventsHandler {
+ pub fn handler(req: &Request
) -> Option {
+ if req.uri().path() == "/api/4/events" {
+ Some(Self {})
+ } else {
+ None
+ }
+ }
+
+ pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> {
+ if req.method() != Method::GET {
+ return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
+ }
+ let ret = match plain_events(req, node_config).await {
+ Ok(res) => res,
+ Err(e) => response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?,
+ };
+ Ok(ret)
+ }
+}
+
+async fn plain_events(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
+ match plain_events_inner(req, node_config).await {
+ Ok(ret) => Ok(ret),
+ Err(e) => Ok(e.to_public_response()),
+ }
+}
+
+async fn plain_events_inner(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
+ info!("httpret plain_events_inner req: {:?}", req);
+ let accept_def = APP_JSON;
+ let accept = req
+ .headers()
+ .get(http::header::ACCEPT)
+ .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
+ if accept == APP_JSON || accept == ACCEPT_ALL {
+ Ok(plain_events_json(req, node_config).await?)
+ } else if accept == APP_OCTET {
+ Ok(plain_events_binary(req, node_config).await?)
+ } else {
+ let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("Unsupported Accept: {:?}", accept))?;
+ Ok(ret)
+ }
+}
+
+async fn plain_events_binary(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
+ debug!("httpret plain_events_binary req: {:?}", req);
+ let url = Url::parse(&format!("dummy:{}", req.uri()))?;
+ let query = PlainEventsBinaryQuery::from_url(&url)?;
+ let op = disk::channelexec::PlainEvents::new(
+ query.channel().clone(),
+ query.range().clone(),
+ query.disk_io_buffer_size(),
+ node_config.clone(),
+ );
+ let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?;
+ let s = s.map(|item| item.make_frame());
+ let ret = response(StatusCode::OK).body(BodyStream::wrapped(
+ s.map_err(Error::from),
+ format!("plain_events_binary"),
+ ))?;
+ Ok(ret)
+}
+
+async fn plain_events_json(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
+ debug!("httpret plain_events_json req: {:?}", req);
+ let (head, _body) = req.into_parts();
+ let query = PlainEventsJsonQuery::from_request_head(&head)?;
+ let op = disk::channelexec::PlainEventsJson::new(
+ query.channel().clone(),
+ query.range().clone(),
+ query.disk_io_buffer_size(),
+ query.timeout(),
+ node_config.clone(),
+ query.do_log(),
+ );
+ let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?;
+ let ret = response(StatusCode::OK).body(BodyStream::wrapped(
+ s.map_err(Error::from),
+ format!("plain_events_json"),
+ ))?;
+ Ok(ret)
+}
diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs
index 60ac1a4..f5fdd27 100644
--- a/httpret/src/httpret.rs
+++ b/httpret/src/httpret.rs
@@ -1,6 +1,7 @@
pub mod api1;
pub mod channelarchiver;
pub mod err;
+pub mod events;
pub mod evinfo;
pub mod gather;
pub mod proxy;
@@ -12,7 +13,6 @@ use crate::gather::gather_get_json;
use crate::pulsemap::UpdateTask;
use bytes::Bytes;
use disk::binned::query::PreBinnedQuery;
-use disk::events::{PlainEventsBinaryQuery, PlainEventsJsonQuery};
use future::Future;
use futures_core::Stream;
use futures_util::{FutureExt, StreamExt, TryStreamExt};
@@ -24,7 +24,7 @@ use net::SocketAddr;
use netpod::query::BinnedQuery;
use netpod::timeunits::SEC;
use netpod::{
- channel_from_pairs, get_url_query_pairs, AggKind, ChannelConfigQuery, FromUrl, NodeConfigCached, NodeStatus,
+ channel_from_pairs, get_url_query_pairs, ChannelConfigQuery, FromUrl, NodeConfigCached, NodeStatus,
NodeStatusArchiverAppliance,
};
use netpod::{log::*, ACCEPT_ALL};
@@ -190,7 +190,6 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) ->
} else if path == "/api/4/version" {
if req.method() == Method::GET {
let ret = serde_json::json!({
- //"data_api_version": "4.0.0-beta",
"data_api_version": {
"major": 4,
"minor": 0,
@@ -206,12 +205,8 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) ->
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
- } else if path == "/api/4/events" {
- if req.method() == Method::GET {
- Ok(plain_events(req, &node_config).await?)
- } else {
- Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
- }
+ } else if let Some(h) = events::EventsHandler::handler(&req) {
+ h.handle(req, &node_config).await
} else if path == "/api/4/binned" {
if req.method() == Method::GET {
Ok(binned(req, node_config).await?)
@@ -577,69 +572,6 @@ async fn prebinned_inner(req: Request, node_config: &NodeConfigCached) ->
Ok(ret)
}
-async fn plain_events(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
- match plain_events_inner(req, node_config).await {
- Ok(ret) => Ok(ret),
- Err(e) => Ok(e.to_public_response()),
- }
-}
-
-async fn plain_events_inner(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
- info!("httpret plain_events_inner req: {:?}", req);
- let accept_def = APP_JSON;
- let accept = req
- .headers()
- .get(http::header::ACCEPT)
- .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
- if accept == APP_JSON || accept == ACCEPT_ALL {
- Ok(plain_events_json(req, node_config).await?)
- } else if accept == APP_OCTET {
- Ok(plain_events_binary(req, node_config).await?)
- } else {
- let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("unsupported Accept: {:?}", accept))?;
- Ok(ret)
- }
-}
-
-async fn plain_events_binary(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
- debug!("httpret plain_events_binary req: {:?}", req);
- let url = Url::parse(&format!("dummy:{}", req.uri()))?;
- let query = PlainEventsBinaryQuery::from_url(&url)?;
- let op = disk::channelexec::PlainEvents::new(
- query.channel().clone(),
- query.range().clone(),
- query.disk_io_buffer_size(),
- node_config.clone(),
- );
- let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?;
- let s = s.map(|item| item.make_frame());
- let ret = response(StatusCode::OK).body(BodyStream::wrapped(
- s.map_err(Error::from),
- format!("plain_events_binary"),
- ))?;
- Ok(ret)
-}
-
-async fn plain_events_json(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
- debug!("httpret plain_events_json req: {:?}", req);
- let (head, _body) = req.into_parts();
- let query = PlainEventsJsonQuery::from_request_head(&head)?;
- let op = disk::channelexec::PlainEventsJson::new(
- query.channel().clone(),
- query.range().clone(),
- query.disk_io_buffer_size(),
- query.timeout(),
- node_config.clone(),
- query.do_log(),
- );
- let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?;
- let ret = response(StatusCode::OK).body(BodyStream::wrapped(
- s.map_err(Error::from),
- format!("plain_events_json"),
- ))?;
- Ok(ret)
-}
-
async fn node_status(req: Request, node_config: &NodeConfigCached) -> Result, Error> {
let (_head, _body) = req.into_parts();
let archiver_appliance_status = match node_config.node.archiver_appliance.as_ref() {
diff --git a/httpret/src/proxy/api4.rs b/httpret/src/proxy/api4.rs
index 71e02b5..e9debee 100644
--- a/httpret/src/proxy/api4.rs
+++ b/httpret/src/proxy/api4.rs
@@ -102,7 +102,6 @@ pub async fn node_status(req: Request, proxy_config: &ProxyConfig) -> Resu
let vdef = header::HeaderValue::from_static(APP_JSON);
let v = head.headers.get(header::ACCEPT).unwrap_or(&vdef);
if v == APP_JSON || v == ACCEPT_ALL {
- let inpurl = Url::parse(&format!("dummy:{}", head.uri))?;
let mut bodies = vec![];
let urls = proxy_config
.backends_status
@@ -138,13 +137,17 @@ pub async fn node_status(req: Request, proxy_config: &ProxyConfig) -> Resu
Box::pin(fut) as Pin + Send>>
};
let ft = |all: Vec>| {
- let mut res = vec![];
+ let mut sres = vec![];
for j in all {
let val = serde_json::json!({
j.tag: j.val,
});
- res.push(val);
+ sres.push(val);
}
+ let res = serde_json::json!({
+ "THIS_PROXY": "status here...",
+ "subnodes": sres,
+ });
let res = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Body::from(serde_json::to_string(&res)?))
diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs
index 60ac819..e16a362 100644
--- a/netpod/src/netpod.rs
+++ b/netpod/src/netpod.rs
@@ -220,8 +220,11 @@ pub struct ChannelArchiver {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Node {
pub host: String,
+ // TODO for `listen` and the ports, would be great to allow a default on Cluster level.
pub listen: String,
+ #[serde(deserialize_with = "port_from_any")]
pub port: u16,
+ #[serde(deserialize_with = "port_from_any")]
pub port_raw: u16,
pub cache_base_path: PathBuf,
pub sf_databuffer: Option,
@@ -229,6 +232,58 @@ pub struct Node {
pub channel_archiver: Option,
}
+struct Visit1 {}
+
+impl<'de> serde::de::Visitor<'de> for Visit1 {
+ type Value = u16;
+
+ fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ write!(fmt, "a tcp port number, in numeric or string form.")
+ }
+
+ fn visit_u64(self, v: u64) -> Result
+ where
+ E: serde::de::Error,
+ {
+ if v > u16::MAX as u64 {
+ Err(serde::de::Error::invalid_type(
+ serde::de::Unexpected::Unsigned(v),
+ &self,
+ ))
+ } else {
+ self.visit_i64(v as i64)
+ }
+ }
+
+ fn visit_i64(self, v: i64) -> Result
+ where
+ E: serde::de::Error,
+ {
+ if v < 1 || v > u16::MAX as i64 {
+ Err(serde::de::Error::invalid_type(serde::de::Unexpected::Signed(v), &self))
+ } else {
+ Ok(v as u16)
+ }
+ }
+
+ fn visit_str(self, v: &str) -> Result
+ where
+ E: serde::de::Error,
+ {
+ match v.parse::() {
+ Err(_) => Err(serde::de::Error::invalid_type(serde::de::Unexpected::Str(v), &self)),
+ Ok(v) => Ok(v),
+ }
+ }
+}
+
+fn port_from_any<'de, D>(de: D) -> Result
+where
+ D: serde::Deserializer<'de>,
+{
+ de.deserialize_any(Visit1 {})
+}
+
impl Node {
// TODO needed? Could `sf_databuffer` be None?
pub fn dummy() -> Self {