Update node status

This commit is contained in:
Dominik Werder
2022-12-16 16:22:48 +01:00
parent 6c5ada63e7
commit f6d92966cd
9 changed files with 377 additions and 401 deletions

View File

@@ -8,6 +8,7 @@ pub mod pg {
use err::Error;
use netpod::log::*;
use netpod::TableSizes;
use netpod::{Channel, Database, NodeConfigCached};
use netpod::{ScalarType, Shape};
use std::sync::Arc;
@@ -102,10 +103,6 @@ pub async fn database_size(node_config: &NodeConfigCached) -> Result<u64, Error>
Ok(size)
}
pub struct TableSizes {
pub sizes: Vec<(String, String)>,
}
pub async fn table_sizes(node_config: &NodeConfigCached) -> Result<TableSizes, Error> {
let sql = format!(
"{} {} {} {} {} {} {}",

View File

@@ -114,7 +114,7 @@ pub async fn channel_search_list_v1(req: Request<Body>, proxy_config: &ProxyConf
description_regex: query.description_regex.map_or(String::new(), |k| k),
};
let urls = proxy_config
.backends_search
.backends
.iter()
.map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh.url)) {
Ok(mut url) => {
@@ -144,29 +144,36 @@ pub async fn channel_search_list_v1(req: Request<Body>, proxy_config: &ProxyConf
};
Box::pin(fut) as Pin<Box<dyn Future<Output = _> + Send>>
};
let ft = |all: Vec<SubRes<ChannelSearchResult>>| {
let mut res = ChannelSearchResultV1(vec![]);
for j in all {
for k in j.val.channels {
let mut found = false;
let mut i2 = 0;
for i1 in 0..res.0.len() {
if res.0[i1].backend == k.backend {
found = true;
i2 = i1;
break;
let ft = |all: Vec<(crate::gather::Tag, Result<SubRes<ChannelSearchResult>, Error>)>| {
let mut res = ChannelSearchResultV1(Vec::new());
for (_tag, j) in all {
match j {
Ok(j) => {
for k in j.val.channels {
let mut found = false;
let mut i2 = 0;
for i1 in 0..res.0.len() {
if res.0[i1].backend == k.backend {
found = true;
i2 = i1;
break;
}
}
if !found {
let u = ChannelSearchResultItemV1 {
backend: k.backend,
channels: Vec::new(),
error: None,
};
res.0.push(u);
i2 = res.0.len() - 1;
}
res.0[i2].channels.push(k.name);
}
}
if !found {
let u = ChannelSearchResultItemV1 {
backend: k.backend,
channels: vec![],
error: None,
};
res.0.push(u);
i2 = res.0.len() - 1;
Err(e) => {
warn!("{e}");
}
res.0[i2].channels.push(k.name);
}
}
let res = response(StatusCode::OK)
@@ -213,7 +220,7 @@ pub async fn channel_search_configs_v1(
description_regex: query.description_regex.map_or(String::new(), |k| k),
};
let urls = proxy_config
.backends_search
.backends
.iter()
.map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh.url)) {
Ok(mut url) => {
@@ -243,46 +250,53 @@ pub async fn channel_search_configs_v1(
};
Box::pin(fut) as Pin<Box<dyn Future<Output = _> + Send>>
};
let ft = |all: Vec<SubRes<ChannelSearchResult>>| {
let mut res = ChannelConfigsResponseV1(vec![]);
for j in all {
for k in j.val.channels {
let mut found = false;
let mut i2 = 0;
for i1 in 0..res.0.len() {
if res.0[i1].backend == k.backend {
found = true;
i2 = i1;
break;
let ft = |all: Vec<(crate::gather::Tag, Result<SubRes<ChannelSearchResult>, Error>)>| {
let mut res = ChannelConfigsResponseV1(Vec::new());
for (_tag, j) in all {
match j {
Ok(j) => {
for k in j.val.channels {
let mut found = false;
let mut i2 = 0;
for i1 in 0..res.0.len() {
if res.0[i1].backend == k.backend {
found = true;
i2 = i1;
break;
}
}
if !found {
let u = ChannelBackendConfigsV1 {
backend: k.backend.clone(),
channels: Vec::new(),
error: None,
};
res.0.push(u);
i2 = res.0.len() - 1;
}
{
let shape = if k.shape.len() == 0 { None } else { Some(k.shape) };
let unit = if k.unit.len() == 0 { None } else { Some(k.unit) };
let description = if k.description.len() == 0 {
None
} else {
Some(k.description)
};
let t = ChannelConfigV1 {
backend: k.backend,
name: k.name,
source: k.source,
description,
ty: k.ty,
shape,
unit,
};
res.0[i2].channels.push(t);
}
}
}
if !found {
let u = ChannelBackendConfigsV1 {
backend: k.backend.clone(),
channels: vec![],
error: None,
};
res.0.push(u);
i2 = res.0.len() - 1;
}
{
let shape = if k.shape.len() == 0 { None } else { Some(k.shape) };
let unit = if k.unit.len() == 0 { None } else { Some(k.unit) };
let description = if k.description.len() == 0 {
None
} else {
Some(k.description)
};
let t = ChannelConfigV1 {
backend: k.backend,
name: k.name,
source: k.source,
description,
ty: k.ty,
shape,
unit,
};
res.0[i2].channels.push(t);
Err(e) => {
warn!("{e}");
}
}
}

View File

@@ -1,2 +1,3 @@
pub mod binned;
pub mod search;
pub mod status;

107
httpret/src/api4/status.rs Normal file
View File

@@ -0,0 +1,107 @@
use crate::bodystream::response;
use crate::err::Error;
use crate::ReqCtx;
use http::Request;
use http::Response;
use http::StatusCode;
use hyper::Body;
use netpod::log::*;
use netpod::NodeConfigCached;
use netpod::NodeStatus;
use netpod::NodeStatusArchiverAppliance;
use netpod::TableSizes;
use std::time::Duration;
async fn table_sizes(node_config: &NodeConfigCached) -> Result<TableSizes, Error> {
let ret = dbconn::table_sizes(node_config).await?;
Ok(ret)
}
pub struct StatusNodesRecursive {}
impl StatusNodesRecursive {
pub fn path() -> &'static str {
"/api/4/private/status/nodes/recursive"
}
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path() == Self::path() {
Some(Self {})
} else {
None
}
}
pub async fn handle(
&self,
req: Request<Body>,
ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
let res = tokio::time::timeout(Duration::from_millis(1200), self.status(req, ctx, node_config)).await;
let res = match res {
Ok(res) => res,
Err(e) => {
let e = Error::from(e);
return Ok(crate::bodystream::ToPublicResponse::to_public_response(&e));
}
};
match res {
Ok(status) => {
let body = serde_json::to_vec(&status)?;
let ret = response(StatusCode::OK).body(Body::from(body))?;
Ok(ret)
}
Err(e) => {
error!("{e}");
let ret = crate::bodystream::ToPublicResponse::to_public_response(&e);
Ok(ret)
}
}
}
async fn status(
&self,
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<NodeStatus, Error> {
let (_head, _body) = req.into_parts();
let archiver_appliance_status = match node_config.node.archiver_appliance.as_ref() {
Some(k) => {
let mut st = Vec::new();
for p in &k.data_base_paths {
let _m = match tokio::fs::metadata(p).await {
Ok(m) => m,
Err(_e) => {
st.push((p.into(), false));
continue;
}
};
let _ = match tokio::fs::read_dir(p).await {
Ok(rd) => rd,
Err(_e) => {
st.push((p.into(), false));
continue;
}
};
st.push((p.into(), true));
}
Some(NodeStatusArchiverAppliance { readable: st })
}
None => None,
};
let database_size = dbconn::database_size(node_config).await.map_err(|e| format!("{e:?}"));
let ret = NodeStatus {
name: format!("{}:{}", node_config.node.host, node_config.node.port),
is_sf_databuffer: node_config.node.sf_databuffer.is_some(),
is_archiver_engine: node_config.node.channel_archiver.is_some(),
is_archiver_appliance: node_config.node.archiver_appliance.is_some(),
database_size: Some(database_size),
table_sizes: Some(table_sizes(node_config).await.map_err(Into::into)),
archiver_appliance_status,
subs: None,
};
Ok(ret)
}
}

View File

@@ -162,13 +162,16 @@ pub async fn gather_get_json(req: Request<Body>, node_config: &NodeConfigCached)
Ok(res)
}
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Serialize, Deserialize)]
pub struct Tag(pub String);
pub struct SubRes<T> {
pub tag: String,
pub status: StatusCode,
pub val: T,
}
pub async fn gather_get_json_generic<SM, NT, FT>(
pub async fn gather_get_json_generic<SM, NT, FT, OUT>(
_method: http::Method,
urls: Vec<Url>,
bodies: Vec<Option<Body>>,
@@ -177,7 +180,7 @@ pub async fn gather_get_json_generic<SM, NT, FT>(
ft: FT,
// TODO use deadline instead
timeout: Duration,
) -> Result<Response<Body>, Error>
) -> Result<OUT, Error>
where
SM: Send + 'static,
NT: Fn(String, Response<Body>) -> Pin<Box<dyn Future<Output = Result<SubRes<SM>, Error>> + Send>>
@@ -185,7 +188,7 @@ where
+ Sync
+ Copy
+ 'static,
FT: Fn(Vec<SubRes<SM>>) -> Result<Response<Body>, Error>,
FT: Fn(Vec<(Tag, Result<SubRes<SM>, Error>)>) -> Result<OUT, Error>,
{
if urls.len() != bodies.len() {
return Err(Error::with_msg_no_trace("unequal numbers of urls and bodies"));
@@ -198,6 +201,7 @@ where
.zip(bodies.into_iter())
.zip(tags.into_iter())
.map(move |((url, body), tag)| {
info!("Try gather from {}", url);
let url_str = url.as_str();
let req = if body.is_some() {
Request::builder().method(Method::POST).uri(url_str)
@@ -215,36 +219,37 @@ where
Some(body) => body,
};
let req = req.body(body);
let tag2 = tag.clone();
let jh = tokio::spawn(async move {
select! {
_ = sleep(timeout).fuse() => {
Err(Error::with_msg("timeout"))
Err(Error::with_msg_no_trace("timeout"))
}
res = {
let client = Client::new();
client.request(req?).fuse()
} => {
let ret = nt(tag, res?).await?;
let ret = nt(tag2, res?).await?;
Ok(ret)
}
}
});
(url, jh)
(url, tag, jh)
})
.collect();
let mut a: Vec<SubRes<SM>> = Vec::new();
for (_url, jh) in spawned {
let res: SubRes<SM> = match jh.await {
let mut a = Vec::new();
for (_url, tag, jh) in spawned {
let res = match jh.await {
Ok(k) => match k {
Ok(k) => k,
Ok(k) => (Tag(tag), Ok(k)),
Err(e) => {
warn!("{e:?}");
return Err(e);
(Tag(tag), Err(e))
}
},
Err(e) => {
warn!("{e:?}");
return Err(e.into());
(Tag(tag), Err(e.into()))
}
};
a.push(res);
@@ -260,9 +265,9 @@ mod test {
fn try_search() {
let fut = gather_get_json_generic(
hyper::Method::GET,
vec![],
vec![],
vec![],
Vec::new(),
Vec::new(),
Vec::new(),
|tag, _res| {
let fut = async {
let ret = SubRes {
@@ -274,13 +279,8 @@ mod test {
};
Box::pin(fut)
},
|_all| {
let res = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(serde_json::to_string(&42)?.into())?;
Ok(res)
},
Duration::from_millis(4000),
|_all| Ok(String::from("DUMMY-SEARCH-TEST-RESULT-TODO")),
Duration::from_millis(800),
);
let _ = fut;
}

View File

@@ -18,16 +18,20 @@ use crate::err::Error;
use crate::gather::gather_get_json;
use crate::pulsemap::UpdateTask;
use futures_util::{Future, FutureExt, StreamExt};
use http::{Method, StatusCode};
use http::Method;
use http::StatusCode;
use hyper::server::conn::AddrStream;
use hyper::server::Server;
use hyper::service::{make_service_fn, service_fn};
use hyper::{server::Server, Body, Request, Response};
use hyper::Body;
use hyper::Request;
use hyper::Response;
use net::SocketAddr;
use netpod::log::*;
use netpod::query::prebinned::PreBinnedQuery;
use netpod::timeunits::SEC;
use netpod::NodeConfigCached;
use netpod::ProxyConfig;
use netpod::{NodeConfigCached, NodeStatus, NodeStatusArchiverAppliance};
use netpod::{APP_JSON, APP_JSON_LINES};
use nodenet::conn::events_service;
use panic::{AssertUnwindSafe, UnwindSafe};
@@ -262,25 +266,24 @@ async fn http_service_inner(
) -> Result<Response<Body>, Error> {
let uri = req.uri().clone();
let path = uri.path();
if path == "/api/4/node_status" {
if req.method() == Method::GET {
Ok(node_status(req, ctx, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/version" {
if path == "/api/4/private/version" {
if req.method() == Method::GET {
let ver_maj: u32 = std::env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap_or(0);
let ver_min: u32 = std::env!("CARGO_PKG_VERSION_MINOR").parse().unwrap_or(0);
let ver_pat: u32 = std::env!("CARGO_PKG_VERSION_PATCH").parse().unwrap_or(0);
let ret = serde_json::json!({
"data_api_version": {
"major": 4u32,
"minor": 2u32,
"patch": 0u32,
"daqbuf_version": {
"major": ver_maj,
"minor": ver_min,
"patch": ver_pat,
},
});
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if let Some(h) = api4::status::StatusNodesRecursive::handler(&req) {
h.handle(req, ctx, &node_config).await
} else if let Some(h) = StatusBoardAllHandler::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = api4::search::ChannelSearchHandler::handler(&req) {
@@ -311,12 +314,6 @@ async fn http_service_inner(
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/table_sizes" {
if req.method() == Method::GET {
Ok(table_sizes(req, ctx, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/random/channel" {
if req.method() == Method::GET {
Ok(random_channel(req, ctx, &node_config).await?)
@@ -472,65 +469,6 @@ async fn prebinned_inner(
todo!()
}
async fn node_status(
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
let (_head, _body) = req.into_parts();
let archiver_appliance_status = match node_config.node.archiver_appliance.as_ref() {
Some(k) => {
let mut st = Vec::new();
for p in &k.data_base_paths {
let _m = match tokio::fs::metadata(p).await {
Ok(m) => m,
Err(_e) => {
st.push((p.into(), false));
continue;
}
};
let _ = match tokio::fs::read_dir(p).await {
Ok(rd) => rd,
Err(_e) => {
st.push((p.into(), false));
continue;
}
};
st.push((p.into(), true));
}
Some(NodeStatusArchiverAppliance { readable: st })
}
None => None,
};
let database_size = dbconn::database_size(node_config).await.map_err(|e| format!("{e:?}"));
let ret = NodeStatus {
is_sf_databuffer: node_config.node.sf_databuffer.is_some(),
is_archiver_engine: node_config.node.channel_archiver.is_some(),
is_archiver_appliance: node_config.node.archiver_appliance.is_some(),
database_size,
archiver_appliance_status,
};
let ret = serde_json::to_vec(&ret)?;
let ret = response(StatusCode::OK).body(Body::from(ret))?;
Ok(ret)
}
async fn table_sizes(
req: Request<Body>,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
let (_head, _body) = req.into_parts();
let sizes = dbconn::table_sizes(node_config).await?;
let mut ret = String::new();
for size in sizes.sizes {
use std::fmt::Write;
write!(ret, "{:60} {:20}\n", size.0, size.1)?;
}
let ret = response(StatusCode::OK).body(Body::from(ret))?;
Ok(ret)
}
pub async fn random_channel(
req: Request<Body>,
_ctx: &ReqCtx,

View File

@@ -90,31 +90,26 @@ async fn proxy_http_service_inner(
Ok(channel_search_list_v1(req, proxy_config).await?)
} else if path == "/api/1/channels/config" {
Ok(channel_search_configs_v1(req, proxy_config).await?)
} else if path == "/api/1/stats/version" {
Err(Error::with_msg("todo"))
} else if path == "/api/1/stats/" {
Err(Error::with_msg("todo"))
} else if path == "/api/1/query" {
Ok(proxy_api1_single_backend_query(req, proxy_config).await?)
} else if path.starts_with("/api/1/map/pulse/") {
warn!("/api/1/map/pulse/ DEPRECATED");
Ok(proxy_api1_map_pulse(req, ctx, proxy_config).await?)
} else if path.starts_with("/api/1/gather/") {
Ok(gather_json_2_v1(req, "/api/1/gather/", proxy_config).await?)
} else if path == "/api/4/version" {
} else if path == "/api/4/private/version" {
if req.method() == Method::GET {
let ver_maj: u32 = std::env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap_or(0);
let ver_min: u32 = std::env!("CARGO_PKG_VERSION_MINOR").parse().unwrap_or(0);
let ver_pat: u32 = std::env!("CARGO_PKG_VERSION_PATCH").parse().unwrap_or(0);
let ret = serde_json::json!({
"data_api_version": {
"major": 4,
"minor": 1,
"daqbuf_version": {
"major": ver_maj,
"minor": ver_min,
"patch": ver_pat,
},
});
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/node_status" {
Ok(api4::node_status(req, proxy_config).await?)
} else if let Some(h) = api4::StatusNodesRecursive::handler(&req) {
h.handle(req, ctx, &proxy_config).await
} else if path == "/api/4/backends" {
Ok(backends(req, proxy_config).await?)
} else if path == "/api/4/search/channel" {
@@ -254,7 +249,7 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
let mut methods = vec![];
let mut bodies = vec![];
let mut urls = proxy_config
.backends_search
.backends
.iter()
.map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh.url)) {
Ok(mut url) => {
@@ -269,9 +264,8 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
bodies.push(None);
a
})?;
if let (Some(hosts), Some(backends)) =
(&proxy_config.api_0_search_hosts, &proxy_config.api_0_search_backends)
{
// TODO probably no longer needed?
if let (Some(hosts), Some(backends)) = (None::<&Vec<String>>, None::<&Vec<String>>) {
#[derive(Serialize)]
struct QueryApi0 {
backends: Vec<String>,
@@ -371,11 +365,18 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
};
Box::pin(fut) as Pin<Box<dyn Future<Output = _> + Send>>
};
let ft = |all: Vec<SubRes<ChannelSearchResult>>| {
let mut res = vec![];
for j in all {
for k in j.val.channels {
res.push(k);
let ft = |all: Vec<(crate::gather::Tag, Result<SubRes<ChannelSearchResult>, Error>)>| {
let mut res = Vec::new();
for (_tag, j) in all {
match j {
Ok(j) => {
for k in j.val.channels {
res.push(k);
}
}
Err(e) => {
warn!("{e}");
}
}
}
let res = ChannelSearchResult { channels: res };
@@ -407,126 +408,6 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
}
}
pub async fn proxy_api1_map_pulse(
req: Request<Body>,
_ctx: &ReqCtx,
proxy_config: &ProxyConfig,
) -> Result<Response<Body>, Error> {
let s2 = format!("http://dummy/{}", req.uri());
info!("s2: {:?}", s2);
let url = Url::parse(&s2)?;
let mut backend = None;
for (k, v) in url.query_pairs() {
if k == "backend" {
backend = Some(v.to_string());
}
}
let backend = if let Some(backend) = backend {
backend
} else {
return Ok(super::response_err(
StatusCode::BAD_REQUEST,
"Required parameter `backend` not specified.",
)?);
};
let pulseid = if let Some(k) = url.path_segments() {
if let Some(k) = k.rev().next() {
if let Ok(k) = k.to_string().parse::<u64>() {
k
} else {
return Ok(super::response_err(
StatusCode::BAD_REQUEST,
"Can not parse parameter `pulseid`.",
)?);
}
} else {
return Ok(super::response_err(
StatusCode::BAD_REQUEST,
"Can not parse parameter `pulseid`.",
)?);
}
} else {
return Ok(super::response_err(
StatusCode::BAD_REQUEST,
"Required parameter `pulseid` not specified.",
)?);
};
match proxy_config
.backends_pulse_map
.iter()
.filter(|x| x.name == backend)
.next()
{
Some(g) => {
let sh = &g.url;
let url = format!("{}/api/1/map/pulse/{}", sh, pulseid);
let req = Request::builder().method(Method::GET).uri(url).body(Body::empty())?;
let c = hyper::Client::new();
let res = c.request(req).await?;
let ret = response(StatusCode::OK).body(res.into_body())?;
Ok(ret)
}
None => {
return Ok(super::response_err(
StatusCode::BAD_REQUEST,
format!("can not find backend for api1 pulse map"),
)?);
}
}
}
pub async fn proxy_api1_single_backend_query(
_req: Request<Body>,
_proxy_config: &ProxyConfig,
) -> Result<Response<Body>, Error> {
// TODO
/*
if let Some(back) = proxy_config.backends_event_download.first() {
let is_tls = req
.uri()
.scheme()
.ok_or_else(|| Error::with_msg_no_trace("no uri scheme"))?
== &http::uri::Scheme::HTTPS;
let bld = Request::builder().method(req.method());
let bld = bld.uri(req.uri());
// TODO to proxy events over multiple backends, we also have to concat results from different backends.
// TODO Carry on needed headers (but should not simply append all)
for (k, v) in req.headers() {
bld.header(k, v);
}
{
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
proxy_config.name.hash(&mut hasher);
let mid = hasher.finish();
bld.header(format!("proxy-mark-{mid:0x}"), proxy_config.name);
}
let body_data = hyper::body::to_bytes(req.into_body()).await?;
let reqout = bld.body(Body::from(body_data))?;
let resfut = {
use hyper::Client;
if is_tls {
let https = HttpsConnector::new();
let client = Client::builder().build::<_, Body>(https);
let req = client.request(reqout);
let req = Box::pin(req) as Pin<Box<dyn Future<Output = Result<Response<Body>, hyper::Error>> + Send>>;
req
} else {
let client = Client::new();
let req = client.request(reqout);
let req = Box::pin(req) as _;
req
}
};
resfut.timeout();
} else {
Err(Error::with_msg_no_trace(format!("no api1 event backend configured")))
}
*/
todo!()
}
pub async fn proxy_single_backend_query<QT>(
req: Request<Body>,
_ctx: &ReqCtx,
@@ -535,8 +416,8 @@ pub async fn proxy_single_backend_query<QT>(
where
QT: FromUrl + AppendToUrl + HasBackend + HasTimeout,
{
info!("proxy_single_backend_query");
let (head, _body) = req.into_parts();
info!("proxy_single_backend_query {}", head.uri);
match head.headers.get(http::header::ACCEPT) {
Some(v) => {
if v == APP_JSON || v == ACCEPT_ALL {
@@ -548,15 +429,7 @@ where
return Ok(response_err(StatusCode::BAD_REQUEST, msg)?);
}
};
// TODO remove this special case
// SPECIAL CASE
// For pulse mapping we want to use a separate list of backends from the config file.
// In general, caller of this function should already provide the chosen list of backends.
let sh = if url.as_str().contains("/map/pulse/") {
get_query_host_for_backend_2(&query.backend(), proxy_config)?
} else {
get_query_host_for_backend(&query.backend(), proxy_config)?
};
let sh = get_query_host_for_backend(&query.backend(), proxy_config)?;
// TODO remove this special case
// SPECIAL CASE:
// Since the inner proxy is not yet handling map-pulse requests without backend,
@@ -626,16 +499,25 @@ where
};
Box::pin(fut) as Pin<Box<dyn Future<Output = Result<SubRes<serde_json::Value>, Error>> + Send>>
};
let ft = |mut all: Vec<SubRes<JsonValue>>| {
let ft = |mut all: Vec<(crate::gather::Tag, Result<SubRes<JsonValue>, Error>)>| {
if all.len() > 0 {
all.truncate(1);
let z = all.pop().unwrap();
let res = z.val;
// TODO want to pass arbitrary body type:
let res = response(z.status)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Body::from(serde_json::to_string(&res)?))?;
return Ok(res);
let (_tag, z) = all.pop().unwrap();
match z {
Ok(z) => {
let res = z.val;
// TODO want to pass arbitrary body type:
let res = response(z.status)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Body::from(serde_json::to_string(&res)?))?;
return Ok(res);
}
Err(e) => {
warn!("{e}");
let res = crate::bodystream::ToPublicResponse::to_public_response(&e);
return Ok(res);
}
}
} else {
return Err(Error::with_msg("no response from upstream"));
}
@@ -660,12 +542,3 @@ fn get_query_host_for_backend(backend: &str, proxy_config: &ProxyConfig) -> Resu
}
return Err(Error::with_msg(format!("host not found for backend {:?}", backend)));
}
fn get_query_host_for_backend_2(backend: &str, proxy_config: &ProxyConfig) -> Result<String, Error> {
for back in &proxy_config.backends_pulse_map {
if back.name == backend {
return Ok(back.url.clone());
}
}
return Err(Error::with_msg(format!("host not found for backend {:?}", backend)));
}

View File

@@ -1,14 +1,16 @@
use crate::err::Error;
use crate::gather::{gather_get_json_generic, SubRes};
use crate::response;
use crate::gather::{gather_get_json_generic, SubRes, Tag};
use crate::{response, ReqCtx};
use futures_util::Future;
use http::{header, Request, Response, StatusCode};
use hyper::Body;
use itertools::Itertools;
use netpod::log::*;
use netpod::NodeStatus;
use netpod::ACCEPT_ALL;
use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON};
use serde_json::Value as JsVal;
use std::collections::BTreeMap;
use std::pin::Pin;
use std::time::Duration;
use url::Url;
@@ -22,7 +24,7 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
let query = ChannelSearchQuery::from_url(&inpurl)?;
let mut bodies = vec![];
let urls = proxy_config
.backends_search
.backends
.iter()
.filter(|k| {
if let Some(back) = &query.backend {
@@ -65,11 +67,18 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
};
Box::pin(fut) as Pin<Box<dyn Future<Output = _> + Send>>
};
let ft = |all: Vec<SubRes<ChannelSearchResult>>| {
let mut res = vec![];
for j in all {
for k in j.val.channels {
res.push(k);
let ft = |all: Vec<(Tag, Result<SubRes<ChannelSearchResult>, Error>)>| {
let mut res = Vec::new();
for (_tag, j) in all {
match j {
Ok(j) => {
for k in j.val.channels {
res.push(k);
}
}
Err(e) => {
warn!("{e}");
}
}
}
let res = ChannelSearchResult { channels: res };
@@ -97,35 +106,69 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
}
}
pub async fn node_status(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
let (head, _body) = req.into_parts();
let v = head
.headers
.get(http::header::ACCEPT)
.ok_or(Error::with_msg_no_trace("no accept header"))?;
let v = v.to_str()?;
if v.contains(APP_JSON) || v.contains(ACCEPT_ALL) {
pub struct StatusNodesRecursive {}
impl StatusNodesRecursive {
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path() == crate::api4::status::StatusNodesRecursive::path() {
Some(Self {})
} else {
None
}
}
pub async fn handle(
&self,
req: Request<Body>,
ctx: &ReqCtx,
node_config: &ProxyConfig,
) -> Result<Response<Body>, Error> {
match self.status(req, ctx, node_config).await {
Ok(status) => {
let body = serde_json::to_vec(&status)?;
let ret = response(StatusCode::OK).body(Body::from(body))?;
Ok(ret)
}
Err(e) => {
error!("{e}");
let ret = crate::bodystream::ToPublicResponse::to_public_response(&e);
Ok(ret)
}
}
}
async fn status(
&self,
_req: Request<Body>,
_ctx: &ReqCtx,
proxy_config: &ProxyConfig,
) -> Result<NodeStatus, Error> {
let path = crate::api4::status::StatusNodesRecursive::path();
let mut bodies = Vec::new();
let urls = proxy_config
.backends_status
.iter()
.map(|b| match Url::parse(&format!("{}/api/4/node_status", b.url)) {
Ok(url) => Ok(url),
Err(e) => Err(Error::with_msg(format!("parse error for: {b:?} {e:?}"))),
})
.fold_ok(Vec::new(), |mut a, x| {
a.push(x);
bodies.push(None);
a
})?;
let tags = urls.iter().map(|k| k.to_string()).collect();
let mut urls = Vec::new();
let mut tags = Vec::new();
for backend in &proxy_config.backends {
match Url::parse(&format!("{}{}", backend.url, path)) {
Ok(url) => {
bodies.push(None);
tags.push(url.to_string());
urls.push(url);
}
Err(e) => return Err(Error::with_msg_no_trace(format!("parse error for: {backend:?} {e:?}"))),
}
}
let nt = |tag, res| {
let fut = async {
let body = hyper::body::to_bytes(res).await?;
let res: JsVal = match serde_json::from_slice(&body) {
Ok(k) => k,
Err(_) => {
let msg = format!("can not parse result: {}", String::from_utf8_lossy(&body));
Err(e) => {
error!("{e}");
let msg = format!(
"gather sub responses, can not parse result: {} {}",
String::from_utf8_lossy(&body),
e,
);
error!("{}", msg);
return Err(Error::with_msg_no_trace(msg));
}
@@ -139,23 +182,30 @@ pub async fn node_status(req: Request<Body>, proxy_config: &ProxyConfig) -> Resu
};
Box::pin(fut) as Pin<Box<dyn Future<Output = _> + Send>>
};
let ft = |all: Vec<SubRes<JsVal>>| {
let mut sres = Vec::new();
for j in all {
let val = serde_json::json!({
j.tag: j.val,
});
sres.push(val);
let ft = |all: Vec<(Tag, Result<SubRes<JsVal>, Error>)>| {
let mut subs = BTreeMap::new();
for (tag, sr) in all {
match sr {
Ok(sr) => {
let s: Result<NodeStatus, _> = serde_json::from_value(sr.val).map_err(err::Error::from);
subs.insert(tag.0, s);
}
Err(e) => {
subs.insert(tag.0, Err(err::Error::from(e)));
}
}
}
let res = serde_json::json!({
proxy_config.name.clone(): "TODO proxy status",
"subnodes": sres,
});
let res = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(Body::from(serde_json::to_string(&res)?))
.map_err(Error::from)?;
Ok(res)
let ret = NodeStatus {
name: format!("{}:{}", proxy_config.name, proxy_config.port),
is_sf_databuffer: false,
is_archiver_engine: false,
is_archiver_appliance: false,
database_size: None,
table_sizes: None,
archiver_appliance_status: None,
subs: Some(subs),
};
Ok(ret)
};
let ret = gather_get_json_generic(
http::Method::GET,
@@ -164,13 +214,9 @@ pub async fn node_status(req: Request<Body>, proxy_config: &ProxyConfig) -> Resu
tags,
nt,
ft,
Duration::from_millis(3000),
Duration::from_millis(1200),
)
.await?;
Ok(ret)
} else {
Ok(response(StatusCode::NOT_ACCEPTABLE)
.body(Body::from(format!("{:?}", proxy_config.name)))
.map_err(Error::from)?)
}
}

View File

@@ -538,14 +538,25 @@ pub struct NodeStatusArchiverAppliance {
pub readable: Vec<(PathBuf, bool)>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct TableSizes {
pub sizes: Vec<(String, String)>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct NodeStatus {
//pub node: NodeConfig,
pub name: String,
pub is_sf_databuffer: bool,
pub is_archiver_engine: bool,
pub is_archiver_appliance: bool,
pub database_size: Result<u64, String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub database_size: Option<Result<u64, String>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub table_sizes: Option<Result<TableSizes, Error>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub archiver_appliance_status: Option<NodeStatusArchiverAppliance>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub subs: Option<BTreeMap<String, Result<NodeStatus, Error>>>,
}
// Describes a "channel" which is a time-series with a unique name within a "backend".
@@ -1966,18 +1977,7 @@ pub struct ProxyConfig {
pub name: String,
pub listen: String,
pub port: u16,
#[serde(default)]
pub backends_status: Vec<ProxyBackend>,
#[serde(default)]
pub backends: Vec<ProxyBackend>,
#[serde(default)]
pub backends_pulse_map: Vec<ProxyBackend>,
#[serde(default)]
pub backends_search: Vec<ProxyBackend>,
#[serde(default)]
pub backends_event_download: Vec<ProxyBackend>,
pub api_0_search_hosts: Option<Vec<String>>,
pub api_0_search_backends: Option<Vec<String>>,
}
pub trait HasBackend {