Factor out channel config handler

This commit is contained in:
Dominik Werder
2022-02-28 13:55:34 +01:00
parent 36ecc858fd
commit f82989f5c9
14 changed files with 248 additions and 83 deletions

View File

@@ -0,0 +1,62 @@
use crate::err::Error;
use crate::{response, ToPublicResponse};
use http::{Method, Request, Response, StatusCode};
use hyper::Body;
use netpod::log::*;
use netpod::NodeConfigCached;
use netpod::{ChannelConfigQuery, FromUrl};
use netpod::{ACCEPT_ALL, APP_JSON};
use url::Url;
pub struct ChannelConfigHandler {}
impl ChannelConfigHandler {
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path() == "/api/4/channel/config" {
Some(Self {})
} else {
None
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
.headers()
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept == APP_JSON || accept == ACCEPT_ALL {
match channel_config(req, &node_config).await {
Ok(k) => Ok(k),
Err(e) => {
warn!("ChannelConfigHandler::handle: got error from channel_config: {e:?}");
Ok(e.to_public_response())
}
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
}
}
pub async fn channel_config(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
//let pairs = get_url_query_pairs(&url);
let q = ChannelConfigQuery::from_url(&url)?;
let conf = if let Some(conf) = &node_config.node.channel_archiver {
archapp_wrap::archapp::archeng::channel_config_from_db(&q, conf, &node_config.node_config.cluster.database)
.await?
} else if let Some(conf) = &node_config.node.archiver_appliance {
archapp_wrap::channel_config(&q, conf).await?
} else {
parse::channelconfig::channel_config(&q, &node_config.node).await?
};
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Body::from(serde_json::to_string(&conf)?))?;
Ok(ret)
}

View File

@@ -1,6 +1,8 @@
use serde::{Deserialize, Serialize};
use std::fmt;
pub struct Error(::err::Error);
#[derive(Serialize, Deserialize)]
pub struct Error(pub ::err::Error);
impl Error {
pub fn with_msg<S: Into<String>>(s: S) -> Self {

View File

@@ -1,5 +1,6 @@
pub mod api1;
pub mod channelarchiver;
pub mod channelconfig;
pub mod err;
pub mod events;
pub mod evinfo;
@@ -21,14 +22,12 @@ use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use hyper::{server::Server, Body, Request, Response};
use net::SocketAddr;
use netpod::log::*;
use netpod::query::BinnedQuery;
use netpod::timeunits::SEC;
use netpod::{
channel_from_pairs, get_url_query_pairs, ChannelConfigQuery, FromUrl, NodeConfigCached, NodeStatus,
NodeStatusArchiverAppliance,
};
use netpod::{log::*, ACCEPT_ALL};
use netpod::{APP_JSON, APP_JSON_LINES, APP_OCTET};
use netpod::{channel_from_pairs, get_url_query_pairs};
use netpod::{FromUrl, NodeConfigCached, NodeStatus, NodeStatusArchiverAppliance};
use netpod::{ACCEPT_ALL, APP_JSON, APP_JSON_LINES, APP_OCTET};
use nodenet::conn::events_service;
use panic::{AssertUnwindSafe, UnwindSafe};
use pin::Pin;
@@ -119,6 +118,7 @@ where
impl<F> UnwindSafe for Cont<F> {}
// TODO remove because I want error bodies to be json.
pub fn response_err<T>(status: StatusCode, msg: T) -> Result<Response<Body>, Error>
where
T: AsRef<str>,
@@ -205,6 +205,8 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if let Some(h) = channelconfig::ChannelConfigHandler::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = events::EventsHandler::handler(&req) {
h.handle(req, &node_config).await
} else if path == "/api/4/binned" {
@@ -285,15 +287,6 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/channel/config" {
if req.method() == Method::GET {
match channel_config(req, &node_config).await {
Ok(k) => Ok(k),
Err(e) => Ok(e.to_public_response()),
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/1/query" {
if req.method() == Method::POST {
Ok(api1::api1_binary_events(req, &node_config).await?)
@@ -454,36 +447,35 @@ trait ToPublicResponse {
impl ToPublicResponse for Error {
fn to_public_response(&self) -> Response<Body> {
error!("ToPublicResponse converts: {self:?}");
use std::fmt::Write;
let status = match self.reason() {
Some(::err::Reason::BadRequest) => StatusCode::BAD_REQUEST,
Some(::err::Reason::InternalError) => StatusCode::INTERNAL_SERVER_ERROR,
_ => StatusCode::INTERNAL_SERVER_ERROR,
};
let mut msg = match self.public_msg() {
Some(v) => v.join("\n"),
_ => String::new(),
};
write!(msg, "\n\nhttps://data-api.psi.ch/api/4/documentation\n").unwrap();
response(status).body(Body::from(msg)).unwrap()
self.0.to_public_response()
}
}
impl ToPublicResponse for ::err::Error {
fn to_public_response(&self) -> Response<Body> {
use std::fmt::Write;
let status = match self.reason() {
Some(::err::Reason::BadRequest) => StatusCode::BAD_REQUEST,
Some(::err::Reason::InternalError) => StatusCode::INTERNAL_SERVER_ERROR,
use ::err::Reason;
let e = self.to_public_error();
let status = match e.reason() {
Some(Reason::BadRequest) => StatusCode::BAD_REQUEST,
Some(Reason::InternalError) => StatusCode::INTERNAL_SERVER_ERROR,
_ => StatusCode::INTERNAL_SERVER_ERROR,
};
let mut msg = match self.public_msg() {
Some(v) => v.join("\n"),
_ => String::new(),
let msg = match serde_json::to_string(&e) {
Ok(s) => s,
Err(_) => "can not serialize error".into(),
};
write!(msg, "\n\nhttps://data-api.psi.ch/api/4/documentation\n").unwrap();
response(status).body(Body::from(msg)).unwrap()
match response(status)
.header(http::header::ACCEPT, APP_JSON)
.body(Body::from(msg))
{
Ok(res) => res,
Err(e) => {
error!("can not generate http error response {e:?}");
let mut res = Response::new(Body::default());
*res.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
res
}
}
}
}
@@ -739,24 +731,6 @@ pub async fn update_search_cache(req: Request<Body>, node_config: &NodeConfigCac
Ok(ret)
}
pub async fn channel_config(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
//let pairs = get_url_query_pairs(&url);
let q = ChannelConfigQuery::from_url(&url)?;
let conf = if let Some(conf) = &node_config.node.channel_archiver {
archapp_wrap::archapp::archeng::channel_config_from_db(&q, conf, &node_config.node_config.cluster.database)
.await?
} else if let Some(conf) = &node_config.node.archiver_appliance {
archapp_wrap::channel_config(&q, conf).await?
} else {
parse::channelconfig::channel_config(&q, &node_config.node).await?
};
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Body::from(serde_json::to_string(&conf)?))?;
Ok(ret)
}
pub async fn ca_connect_1(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let pairs = get_url_query_pairs(&url);

View File

@@ -326,12 +326,11 @@ async fn update_task(do_abort: Arc<AtomicUsize>, node_config: NodeConfigCached)
info!("update_task break A");
break;
}
tokio::time::sleep(Duration::from_millis(60000)).await;
tokio::time::sleep(Duration::from_millis(165000 + 0x7fff * commonio::tokio_rand().await?)).await;
if do_abort.load(Ordering::SeqCst) != 0 {
info!("update_task break B");
break;
}
info!("Start update task");
let ts1 = Instant::now();
match IndexFullHttpFunction::index(&node_config).await {
Ok(_) => {}