Unify search backends config, add basic node status proxy support
This commit is contained in:
@@ -9,8 +9,9 @@ use http::{Method, StatusCode};
|
||||
use hyper::{Body, Client, Request, Response};
|
||||
use items::{RangeCompletableItem, Sitemty, StreamItem};
|
||||
use itertools::Itertools;
|
||||
use netpod::log::*;
|
||||
use netpod::query::RawEventsQuery;
|
||||
use netpod::{log::*, ByteSize, Channel, FileIoBufferSize, NanoRange, NodeConfigCached, PerfOpts, Shape, APP_OCTET};
|
||||
use netpod::{ByteSize, Channel, FileIoBufferSize, NanoRange, NodeConfigCached, PerfOpts, Shape, APP_OCTET};
|
||||
use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON};
|
||||
use parse::channelconfig::{
|
||||
extract_matching_config_entry, read_local_config, Config, ConfigEntry, MatchingConfigEntry,
|
||||
@@ -108,9 +109,9 @@ pub async fn channel_search_list_v1(req: Request<Body>, proxy_config: &ProxyConf
|
||||
description_regex: query.description_regex.map_or(String::new(), |k| k),
|
||||
};
|
||||
let urls = proxy_config
|
||||
.search_hosts
|
||||
.backends_search
|
||||
.iter()
|
||||
.map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh)) {
|
||||
.map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh.url)) {
|
||||
Ok(mut url) => {
|
||||
query.append_to_url(&mut url);
|
||||
Ok(url)
|
||||
@@ -207,9 +208,9 @@ pub async fn channel_search_configs_v1(
|
||||
description_regex: query.description_regex.map_or(String::new(), |k| k),
|
||||
};
|
||||
let urls = proxy_config
|
||||
.search_hosts
|
||||
.backends_search
|
||||
.iter()
|
||||
.map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh)) {
|
||||
.map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh.url)) {
|
||||
Ok(mut url) => {
|
||||
query.append_to_url(&mut url);
|
||||
Ok(url)
|
||||
|
||||
@@ -158,7 +158,7 @@ pub async fn gather_get_json(req: Request<Body>, node_config: &NodeConfigCached)
|
||||
}
|
||||
let a = a;
|
||||
let res = response(StatusCode::OK)
|
||||
.header(http::header::CONTENT_TYPE, "application/json")
|
||||
.header(http::header::CONTENT_TYPE, APP_JSON)
|
||||
.body(serde_json::to_string(&Jres { hosts: a })?.into())?;
|
||||
Ok(res)
|
||||
}
|
||||
@@ -213,7 +213,7 @@ where
|
||||
Some(body) => body,
|
||||
};
|
||||
let req = req.body(body);
|
||||
let task = tokio::spawn(async move {
|
||||
let jh = tokio::spawn(async move {
|
||||
select! {
|
||||
_ = sleep(timeout).fuse() => {
|
||||
Err(Error::with_msg("timeout"))
|
||||
@@ -234,11 +234,11 @@ where
|
||||
}
|
||||
}
|
||||
});
|
||||
(url, task)
|
||||
(url, jh)
|
||||
})
|
||||
.collect();
|
||||
let mut a: Vec<SubRes<SM>> = vec![];
|
||||
for (_schemehostport, jh) in spawned {
|
||||
for (_url, jh) in spawned {
|
||||
let res: SubRes<SM> = match jh.await {
|
||||
Ok(k) => match k {
|
||||
Ok(k) => k,
|
||||
|
||||
@@ -23,13 +23,15 @@ use hyper::{server::Server, Body, Request, Response};
|
||||
use net::SocketAddr;
|
||||
use netpod::query::BinnedQuery;
|
||||
use netpod::timeunits::SEC;
|
||||
use netpod::{channel_from_pairs, get_url_query_pairs, AggKind, ChannelConfigQuery, FromUrl, NodeConfigCached};
|
||||
use netpod::{
|
||||
channel_from_pairs, get_url_query_pairs, AggKind, ChannelConfigQuery, FromUrl, NodeConfigCached, NodeStatus,
|
||||
NodeStatusArchiverAppliance,
|
||||
};
|
||||
use netpod::{log::*, ACCEPT_ALL};
|
||||
use netpod::{APP_JSON, APP_JSON_LINES, APP_OCTET};
|
||||
use nodenet::conn::events_service;
|
||||
use panic::{AssertUnwindSafe, UnwindSafe};
|
||||
use pin::Pin;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{future, net, panic, pin, task};
|
||||
use task::{Context, Poll};
|
||||
use tracing::field::Empty;
|
||||
@@ -638,15 +640,39 @@ async fn plain_events_json(req: Request<Body>, node_config: &NodeConfigCached) -
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct NodeStatus {
|
||||
database_size: u64,
|
||||
}
|
||||
|
||||
async fn node_status(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
let (_head, _body) = req.into_parts();
|
||||
let archiver_appliance_status = match node_config.node.archiver_appliance.as_ref() {
|
||||
Some(k) => {
|
||||
let mut st = vec![];
|
||||
for p in &k.data_base_paths {
|
||||
let _m = match tokio::fs::metadata(p).await {
|
||||
Ok(m) => m,
|
||||
Err(_e) => {
|
||||
st.push((p.into(), false));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let _ = match tokio::fs::read_dir(p).await {
|
||||
Ok(rd) => rd,
|
||||
Err(_e) => {
|
||||
st.push((p.into(), false));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
st.push((p.into(), true));
|
||||
}
|
||||
Some(NodeStatusArchiverAppliance { readable: st })
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
let database_size = dbconn::database_size(node_config).await.map_err(|e| format!("{e:?}"));
|
||||
let ret = NodeStatus {
|
||||
database_size: dbconn::database_size(node_config).await?,
|
||||
is_sf_databuffer: node_config.node.sf_databuffer.is_some(),
|
||||
is_archiver_engine: node_config.node.channel_archiver.is_some(),
|
||||
is_archiver_appliance: node_config.node.archiver_appliance.is_some(),
|
||||
database_size,
|
||||
archiver_appliance_status,
|
||||
};
|
||||
let ret = serde_json::to_vec(&ret)?;
|
||||
let ret = response(StatusCode::OK).body(Body::from(ret))?;
|
||||
|
||||
@@ -101,6 +101,8 @@ async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig)
|
||||
} else {
|
||||
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
|
||||
}
|
||||
} else if path == "/api/4/node_status" {
|
||||
Ok(api4::node_status(req, proxy_config).await?)
|
||||
} else if path == "/api/4/backends" {
|
||||
Ok(backends(req, proxy_config).await?)
|
||||
} else if path == "/api/4/search/channel" {
|
||||
@@ -226,11 +228,12 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
|
||||
if v == APP_JSON {
|
||||
let url = Url::parse(&format!("dummy:{}", head.uri))?;
|
||||
let query = ChannelSearchQuery::from_url(&url)?;
|
||||
let mut methods = vec![];
|
||||
let mut bodies = vec![];
|
||||
let mut urls = proxy_config
|
||||
.search_hosts
|
||||
.backends_search
|
||||
.iter()
|
||||
.map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh)) {
|
||||
.map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh.url)) {
|
||||
Ok(mut url) => {
|
||||
query.append_to_url(&mut url);
|
||||
Ok(url)
|
||||
@@ -239,6 +242,7 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
|
||||
})
|
||||
.fold_ok(vec![], |mut a, x| {
|
||||
a.push(x);
|
||||
methods.push(http::Method::GET);
|
||||
bodies.push(None);
|
||||
a
|
||||
})?;
|
||||
@@ -265,6 +269,7 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
|
||||
source_regex: query.source_regex.clone(),
|
||||
};
|
||||
let qs = serde_json::to_string(&q).unwrap();
|
||||
methods.push(http::Method::POST);
|
||||
bodies.push(Some(Body::from(qs)));
|
||||
});
|
||||
}
|
||||
@@ -354,6 +359,10 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
|
||||
.body(Body::from(serde_json::to_string(&res)?))?;
|
||||
Ok(res)
|
||||
};
|
||||
// TODO gather_get_json_generic must for this case accept a Method for each Request.
|
||||
// Currently it is inferred via presence of the body.
|
||||
// On the other hand, I want to gather over rather homogeneous requests.
|
||||
// So: better enforce same method.
|
||||
let ret = gather_get_json_generic(
|
||||
http::Method::GET,
|
||||
urls,
|
||||
@@ -413,26 +422,33 @@ pub async fn proxy_api1_map_pulse(req: Request<Body>, proxy_config: &ProxyConfig
|
||||
"Required parameter `pulseid` not specified.",
|
||||
)?);
|
||||
};
|
||||
if backend == "sf-databuffer" {
|
||||
if proxy_config.search_hosts.len() == 1 {
|
||||
let url = format!("http://sf-daqbuf-21:8380/api/1/map/pulse/{}", pulseid);
|
||||
match proxy_config
|
||||
.backends_pulse_map
|
||||
.iter()
|
||||
.filter(|x| x.name == backend)
|
||||
.next()
|
||||
{
|
||||
Some(g) => {
|
||||
let sh = &g.url;
|
||||
let url = format!("{}/api/1/map/pulse/{}", sh, pulseid);
|
||||
let req = Request::builder().method(Method::GET).uri(url).body(Body::empty())?;
|
||||
let res = hyper::Client::new().request(req).await?;
|
||||
let ret = response(StatusCode::OK).body(res.into_body())?;
|
||||
Ok(ret)
|
||||
} else {
|
||||
let url = format!("https://sf-data-api.psi.ch/api/1/map/pulse/{}", pulseid);
|
||||
let req = Request::builder().method(Method::GET).uri(url).body(Body::empty())?;
|
||||
let https = HttpsConnector::new();
|
||||
let res = hyper::Client::builder().build(https).request(req).await?;
|
||||
let res = if sh.starts_with("https") {
|
||||
let https = HttpsConnector::new();
|
||||
let c = hyper::Client::builder().build(https);
|
||||
c.request(req).await?
|
||||
} else {
|
||||
let c = hyper::Client::new();
|
||||
c.request(req).await?
|
||||
};
|
||||
let ret = response(StatusCode::OK).body(res.into_body())?;
|
||||
Ok(ret)
|
||||
}
|
||||
} else {
|
||||
return Ok(super::response_err(
|
||||
StatusCode::BAD_REQUEST,
|
||||
format!("backend \"{}\" not supported for this action", backend),
|
||||
)?);
|
||||
None => {
|
||||
return Ok(super::response_err(
|
||||
StatusCode::BAD_REQUEST,
|
||||
format!("can not find backend for api1 pulse map"),
|
||||
)?);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ use itertools::Itertools;
|
||||
use netpod::log::*;
|
||||
use netpod::ACCEPT_ALL;
|
||||
use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON};
|
||||
use serde_json::Value as JsVal;
|
||||
use std::pin::Pin;
|
||||
use std::time::Duration;
|
||||
use url::Url;
|
||||
@@ -95,3 +96,86 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
|
||||
.map_err(Error::from)?)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn node_status(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
|
||||
let (head, _body) = req.into_parts();
|
||||
let vdef = header::HeaderValue::from_static(APP_JSON);
|
||||
let v = head.headers.get(header::ACCEPT).unwrap_or(&vdef);
|
||||
if v == APP_JSON || v == ACCEPT_ALL {
|
||||
let inpurl = Url::parse(&format!("dummy:{}", head.uri))?;
|
||||
let query = ChannelSearchQuery::from_url(&inpurl)?;
|
||||
let mut bodies = vec![];
|
||||
let urls = proxy_config
|
||||
.backends_status
|
||||
.iter()
|
||||
.filter(|k| {
|
||||
if let Some(back) = &query.backend {
|
||||
back == &k.name
|
||||
} else {
|
||||
true
|
||||
}
|
||||
})
|
||||
.map(|pb| match Url::parse(&format!("{}/api/4/node_status", pb.url)) {
|
||||
Ok(mut url) => {
|
||||
query.append_to_url(&mut url);
|
||||
Ok(url)
|
||||
}
|
||||
Err(_) => Err(Error::with_msg(format!("parse error for: {:?}", pb))),
|
||||
})
|
||||
.fold_ok(vec![], |mut a, x| {
|
||||
a.push(x);
|
||||
bodies.push(None);
|
||||
a
|
||||
})?;
|
||||
let tags = urls.iter().map(|k| k.to_string()).collect();
|
||||
let nt = |tag, res| {
|
||||
let fut = async {
|
||||
let body = hyper::body::to_bytes(res).await?;
|
||||
let res: JsVal = match serde_json::from_slice(&body) {
|
||||
Ok(k) => k,
|
||||
Err(_) => {
|
||||
let msg = format!("can not parse result: {}", String::from_utf8_lossy(&body));
|
||||
error!("{}", msg);
|
||||
return Err(Error::with_msg_no_trace(msg));
|
||||
}
|
||||
};
|
||||
let ret = SubRes {
|
||||
tag,
|
||||
status: StatusCode::OK,
|
||||
val: res,
|
||||
};
|
||||
Ok(ret)
|
||||
};
|
||||
Box::pin(fut) as Pin<Box<dyn Future<Output = _> + Send>>
|
||||
};
|
||||
let ft = |all: Vec<SubRes<JsVal>>| {
|
||||
let mut res = vec![];
|
||||
for j in all {
|
||||
let val = serde_json::json!({
|
||||
j.tag: j.val,
|
||||
});
|
||||
res.push(val);
|
||||
}
|
||||
let res = response(StatusCode::OK)
|
||||
.header(http::header::CONTENT_TYPE, APP_JSON)
|
||||
.body(Body::from(serde_json::to_string(&res)?))
|
||||
.map_err(Error::from)?;
|
||||
Ok(res)
|
||||
};
|
||||
let ret = gather_get_json_generic(
|
||||
http::Method::GET,
|
||||
urls,
|
||||
bodies,
|
||||
tags,
|
||||
nt,
|
||||
ft,
|
||||
Duration::from_millis(3000),
|
||||
)
|
||||
.await?;
|
||||
Ok(ret)
|
||||
} else {
|
||||
Ok(response(StatusCode::NOT_ACCEPTABLE)
|
||||
.body(Body::from(format!("{:?}", proxy_config.name)))
|
||||
.map_err(Error::from)?)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -323,6 +323,21 @@ impl From<NodeConfig> for Result<NodeConfigCached, Error> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct NodeStatusArchiverAppliance {
|
||||
pub readable: Vec<(PathBuf, bool)>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct NodeStatus {
|
||||
//pub node: NodeConfig,
|
||||
pub is_sf_databuffer: bool,
|
||||
pub is_archiver_engine: bool,
|
||||
pub is_archiver_appliance: bool,
|
||||
pub database_size: Result<u64, String>,
|
||||
pub archiver_appliance_status: Option<NodeStatusArchiverAppliance>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct Channel {
|
||||
pub backend: String,
|
||||
@@ -1354,7 +1369,7 @@ pub struct ProxyConfig {
|
||||
pub name: String,
|
||||
pub listen: String,
|
||||
pub port: u16,
|
||||
pub search_hosts: Vec<String>,
|
||||
pub backends_status: Vec<ProxyBackend>,
|
||||
pub backends: Vec<ProxyBackend>,
|
||||
pub backends_pulse_map: Vec<ProxyBackend>,
|
||||
pub backends_search: Vec<ProxyBackend>,
|
||||
|
||||
Reference in New Issue
Block a user