From a0bc67d3a3106147c28ae3cee7ec1a657b9dafee Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 22 Feb 2022 16:54:14 +0100 Subject: [PATCH] Factor out handler, allow strings as port numbers to simplify Jinja2 templates --- httpret/src/events.rs | 96 +++++++++++++++++++++++++++++++++++++++ httpret/src/httpret.rs | 76 ++----------------------------- httpret/src/proxy/api4.rs | 9 ++-- netpod/src/netpod.rs | 55 ++++++++++++++++++++++ 4 files changed, 161 insertions(+), 75 deletions(-) create mode 100644 httpret/src/events.rs diff --git a/httpret/src/events.rs b/httpret/src/events.rs new file mode 100644 index 0000000..1b37a3d --- /dev/null +++ b/httpret/src/events.rs @@ -0,0 +1,96 @@ +use crate::err::Error; +use crate::{response, response_err, BodyStream, ToPublicResponse}; +use disk::events::{PlainEventsBinaryQuery, PlainEventsJsonQuery}; +use futures_util::{StreamExt, TryStreamExt}; +use http::{Method, Request, Response, StatusCode}; +use hyper::Body; +use netpod::log::*; +use netpod::{AggKind, NodeConfigCached}; +use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET}; +use url::Url; + +pub struct EventsHandler {} + +impl EventsHandler { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/4/events" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + if req.method() != Method::GET { + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + } + let ret = match plain_events(req, node_config).await { + Ok(res) => res, + Err(e) => response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?, + }; + Ok(ret) + } +} + +async fn plain_events(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + match plain_events_inner(req, node_config).await { + Ok(ret) => Ok(ret), + Err(e) => Ok(e.to_public_response()), + } +} + +async fn plain_events_inner(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + info!("httpret plain_events_inner req: {:?}", req); + let accept_def = APP_JSON; + let accept = req + .headers() + .get(http::header::ACCEPT) + .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); + if accept == APP_JSON || accept == ACCEPT_ALL { + Ok(plain_events_json(req, node_config).await?) + } else if accept == APP_OCTET { + Ok(plain_events_binary(req, node_config).await?) + } else { + let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("Unsupported Accept: {:?}", accept))?; + Ok(ret) + } +} + +async fn plain_events_binary(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + debug!("httpret plain_events_binary req: {:?}", req); + let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let query = PlainEventsBinaryQuery::from_url(&url)?; + let op = disk::channelexec::PlainEvents::new( + query.channel().clone(), + query.range().clone(), + query.disk_io_buffer_size(), + node_config.clone(), + ); + let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?; + let s = s.map(|item| item.make_frame()); + let ret = response(StatusCode::OK).body(BodyStream::wrapped( + s.map_err(Error::from), + format!("plain_events_binary"), + ))?; + Ok(ret) +} + +async fn plain_events_json(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + debug!("httpret plain_events_json req: {:?}", req); + let (head, _body) = req.into_parts(); + let query = PlainEventsJsonQuery::from_request_head(&head)?; + let op = disk::channelexec::PlainEventsJson::new( + query.channel().clone(), + query.range().clone(), + query.disk_io_buffer_size(), + query.timeout(), + node_config.clone(), + query.do_log(), + ); + let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?; + let ret = response(StatusCode::OK).body(BodyStream::wrapped( + s.map_err(Error::from), + format!("plain_events_json"), + ))?; + Ok(ret) +} diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 60ac1a4..f5fdd27 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -1,6 +1,7 @@ pub mod api1; pub mod channelarchiver; pub mod err; +pub mod events; pub mod evinfo; pub mod gather; pub mod proxy; @@ -12,7 +13,6 @@ use crate::gather::gather_get_json; use crate::pulsemap::UpdateTask; use bytes::Bytes; use disk::binned::query::PreBinnedQuery; -use disk::events::{PlainEventsBinaryQuery, PlainEventsJsonQuery}; use future::Future; use futures_core::Stream; use futures_util::{FutureExt, StreamExt, TryStreamExt}; @@ -24,7 +24,7 @@ use net::SocketAddr; use netpod::query::BinnedQuery; use netpod::timeunits::SEC; use netpod::{ - channel_from_pairs, get_url_query_pairs, AggKind, ChannelConfigQuery, FromUrl, NodeConfigCached, NodeStatus, + channel_from_pairs, get_url_query_pairs, ChannelConfigQuery, FromUrl, NodeConfigCached, NodeStatus, NodeStatusArchiverAppliance, }; use netpod::{log::*, ACCEPT_ALL}; @@ -190,7 +190,6 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else if path == "/api/4/version" { if req.method() == Method::GET { let ret = serde_json::json!({ - //"data_api_version": "4.0.0-beta", "data_api_version": { "major": 4, "minor": 0, @@ -206,12 +205,8 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } - } else if path == "/api/4/events" { - if req.method() == Method::GET { - Ok(plain_events(req, &node_config).await?) - } else { - Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) - } + } else if let Some(h) = events::EventsHandler::handler(&req) { + h.handle(req, &node_config).await } else if path == "/api/4/binned" { if req.method() == Method::GET { Ok(binned(req, node_config).await?) @@ -577,69 +572,6 @@ async fn prebinned_inner(req: Request, node_config: &NodeConfigCached) -> Ok(ret) } -async fn plain_events(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - match plain_events_inner(req, node_config).await { - Ok(ret) => Ok(ret), - Err(e) => Ok(e.to_public_response()), - } -} - -async fn plain_events_inner(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - info!("httpret plain_events_inner req: {:?}", req); - let accept_def = APP_JSON; - let accept = req - .headers() - .get(http::header::ACCEPT) - .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); - if accept == APP_JSON || accept == ACCEPT_ALL { - Ok(plain_events_json(req, node_config).await?) - } else if accept == APP_OCTET { - Ok(plain_events_binary(req, node_config).await?) - } else { - let ret = response_err(StatusCode::NOT_ACCEPTABLE, format!("unsupported Accept: {:?}", accept))?; - Ok(ret) - } -} - -async fn plain_events_binary(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - debug!("httpret plain_events_binary req: {:?}", req); - let url = Url::parse(&format!("dummy:{}", req.uri()))?; - let query = PlainEventsBinaryQuery::from_url(&url)?; - let op = disk::channelexec::PlainEvents::new( - query.channel().clone(), - query.range().clone(), - query.disk_io_buffer_size(), - node_config.clone(), - ); - let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?; - let s = s.map(|item| item.make_frame()); - let ret = response(StatusCode::OK).body(BodyStream::wrapped( - s.map_err(Error::from), - format!("plain_events_binary"), - ))?; - Ok(ret) -} - -async fn plain_events_json(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - debug!("httpret plain_events_json req: {:?}", req); - let (head, _body) = req.into_parts(); - let query = PlainEventsJsonQuery::from_request_head(&head)?; - let op = disk::channelexec::PlainEventsJson::new( - query.channel().clone(), - query.range().clone(), - query.disk_io_buffer_size(), - query.timeout(), - node_config.clone(), - query.do_log(), - ); - let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?; - let ret = response(StatusCode::OK).body(BodyStream::wrapped( - s.map_err(Error::from), - format!("plain_events_json"), - ))?; - Ok(ret) -} - async fn node_status(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let (_head, _body) = req.into_parts(); let archiver_appliance_status = match node_config.node.archiver_appliance.as_ref() { diff --git a/httpret/src/proxy/api4.rs b/httpret/src/proxy/api4.rs index 71e02b5..e9debee 100644 --- a/httpret/src/proxy/api4.rs +++ b/httpret/src/proxy/api4.rs @@ -102,7 +102,6 @@ pub async fn node_status(req: Request, proxy_config: &ProxyConfig) -> Resu let vdef = header::HeaderValue::from_static(APP_JSON); let v = head.headers.get(header::ACCEPT).unwrap_or(&vdef); if v == APP_JSON || v == ACCEPT_ALL { - let inpurl = Url::parse(&format!("dummy:{}", head.uri))?; let mut bodies = vec![]; let urls = proxy_config .backends_status @@ -138,13 +137,17 @@ pub async fn node_status(req: Request, proxy_config: &ProxyConfig) -> Resu Box::pin(fut) as Pin + Send>> }; let ft = |all: Vec>| { - let mut res = vec![]; + let mut sres = vec![]; for j in all { let val = serde_json::json!({ j.tag: j.val, }); - res.push(val); + sres.push(val); } + let res = serde_json::json!({ + "THIS_PROXY": "status here...", + "subnodes": sres, + }); let res = response(StatusCode::OK) .header(http::header::CONTENT_TYPE, APP_JSON) .body(Body::from(serde_json::to_string(&res)?)) diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 60ac819..e16a362 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -220,8 +220,11 @@ pub struct ChannelArchiver { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Node { pub host: String, + // TODO for `listen` and the ports, would be great to allow a default on Cluster level. pub listen: String, + #[serde(deserialize_with = "port_from_any")] pub port: u16, + #[serde(deserialize_with = "port_from_any")] pub port_raw: u16, pub cache_base_path: PathBuf, pub sf_databuffer: Option, @@ -229,6 +232,58 @@ pub struct Node { pub channel_archiver: Option, } +struct Visit1 {} + +impl<'de> serde::de::Visitor<'de> for Visit1 { + type Value = u16; + + fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "a tcp port number, in numeric or string form.") + } + + fn visit_u64(self, v: u64) -> Result + where + E: serde::de::Error, + { + if v > u16::MAX as u64 { + Err(serde::de::Error::invalid_type( + serde::de::Unexpected::Unsigned(v), + &self, + )) + } else { + self.visit_i64(v as i64) + } + } + + fn visit_i64(self, v: i64) -> Result + where + E: serde::de::Error, + { + if v < 1 || v > u16::MAX as i64 { + Err(serde::de::Error::invalid_type(serde::de::Unexpected::Signed(v), &self)) + } else { + Ok(v as u16) + } + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + match v.parse::() { + Err(_) => Err(serde::de::Error::invalid_type(serde::de::Unexpected::Str(v), &self)), + Ok(v) => Ok(v), + } + } +} + +fn port_from_any<'de, D>(de: D) -> Result +where + D: serde::Deserializer<'de>, +{ + de.deserialize_any(Visit1 {}) +} + impl Node { // TODO needed? Could `sf_databuffer` be None? pub fn dummy() -> Self {