diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 6362f7c..f8d86c9 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -9,8 +9,9 @@ use http::{Method, StatusCode}; use hyper::{Body, Client, Request, Response}; use items::{RangeCompletableItem, Sitemty, StreamItem}; use itertools::Itertools; +use netpod::log::*; use netpod::query::RawEventsQuery; -use netpod::{log::*, ByteSize, Channel, FileIoBufferSize, NanoRange, NodeConfigCached, PerfOpts, Shape, APP_OCTET}; +use netpod::{ByteSize, Channel, FileIoBufferSize, NanoRange, NodeConfigCached, PerfOpts, Shape, APP_OCTET}; use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON}; use parse::channelconfig::{ extract_matching_config_entry, read_local_config, Config, ConfigEntry, MatchingConfigEntry, @@ -108,9 +109,9 @@ pub async fn channel_search_list_v1(req: Request, proxy_config: &ProxyConf description_regex: query.description_regex.map_or(String::new(), |k| k), }; let urls = proxy_config - .search_hosts + .backends_search .iter() - .map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh)) { + .map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh.url)) { Ok(mut url) => { query.append_to_url(&mut url); Ok(url) @@ -207,9 +208,9 @@ pub async fn channel_search_configs_v1( description_regex: query.description_regex.map_or(String::new(), |k| k), }; let urls = proxy_config - .search_hosts + .backends_search .iter() - .map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh)) { + .map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh.url)) { Ok(mut url) => { query.append_to_url(&mut url); Ok(url) diff --git a/httpret/src/gather.rs b/httpret/src/gather.rs index f45a484..be89294 100644 --- a/httpret/src/gather.rs +++ b/httpret/src/gather.rs @@ -158,7 +158,7 @@ pub async fn gather_get_json(req: Request, node_config: &NodeConfigCached) } let a = a; let res = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, "application/json") + .header(http::header::CONTENT_TYPE, APP_JSON) .body(serde_json::to_string(&Jres { hosts: a })?.into())?; Ok(res) } @@ -213,7 +213,7 @@ where Some(body) => body, }; let req = req.body(body); - let task = tokio::spawn(async move { + let jh = tokio::spawn(async move { select! { _ = sleep(timeout).fuse() => { Err(Error::with_msg("timeout")) @@ -234,11 +234,11 @@ where } } }); - (url, task) + (url, jh) }) .collect(); let mut a: Vec> = vec![]; - for (_schemehostport, jh) in spawned { + for (_url, jh) in spawned { let res: SubRes = match jh.await { Ok(k) => match k { Ok(k) => k, diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index d2d0fe5..60ac1a4 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -23,13 +23,15 @@ use hyper::{server::Server, Body, Request, Response}; use net::SocketAddr; use netpod::query::BinnedQuery; use netpod::timeunits::SEC; -use netpod::{channel_from_pairs, get_url_query_pairs, AggKind, ChannelConfigQuery, FromUrl, NodeConfigCached}; +use netpod::{ + channel_from_pairs, get_url_query_pairs, AggKind, ChannelConfigQuery, FromUrl, NodeConfigCached, NodeStatus, + NodeStatusArchiverAppliance, +}; use netpod::{log::*, ACCEPT_ALL}; use netpod::{APP_JSON, APP_JSON_LINES, APP_OCTET}; use nodenet::conn::events_service; use panic::{AssertUnwindSafe, UnwindSafe}; use pin::Pin; -use serde::{Deserialize, Serialize}; use std::{future, net, panic, pin, task}; use task::{Context, Poll}; use tracing::field::Empty; @@ -638,15 +640,39 @@ async fn plain_events_json(req: Request, node_config: &NodeConfigCached) - Ok(ret) } -#[derive(Debug, Serialize, Deserialize)] -pub struct NodeStatus { - database_size: u64, -} - async fn node_status(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let (_head, _body) = req.into_parts(); + let archiver_appliance_status = match node_config.node.archiver_appliance.as_ref() { + Some(k) => { + let mut st = vec![]; + for p in &k.data_base_paths { + let _m = match tokio::fs::metadata(p).await { + Ok(m) => m, + Err(_e) => { + st.push((p.into(), false)); + continue; + } + }; + let _ = match tokio::fs::read_dir(p).await { + Ok(rd) => rd, + Err(_e) => { + st.push((p.into(), false)); + continue; + } + }; + st.push((p.into(), true)); + } + Some(NodeStatusArchiverAppliance { readable: st }) + } + None => None, + }; + let database_size = dbconn::database_size(node_config).await.map_err(|e| format!("{e:?}")); let ret = NodeStatus { - database_size: dbconn::database_size(node_config).await?, + is_sf_databuffer: node_config.node.sf_databuffer.is_some(), + is_archiver_engine: node_config.node.channel_archiver.is_some(), + is_archiver_appliance: node_config.node.archiver_appliance.is_some(), + database_size, + archiver_appliance_status, }; let ret = serde_json::to_vec(&ret)?; let ret = response(StatusCode::OK).body(Body::from(ret))?; diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index 14926ff..d356ad3 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -101,6 +101,8 @@ async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } + } else if path == "/api/4/node_status" { + Ok(api4::node_status(req, proxy_config).await?) } else if path == "/api/4/backends" { Ok(backends(req, proxy_config).await?) } else if path == "/api/4/search/channel" { @@ -226,11 +228,12 @@ pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> R if v == APP_JSON { let url = Url::parse(&format!("dummy:{}", head.uri))?; let query = ChannelSearchQuery::from_url(&url)?; + let mut methods = vec![]; let mut bodies = vec![]; let mut urls = proxy_config - .search_hosts + .backends_search .iter() - .map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh)) { + .map(|sh| match Url::parse(&format!("{}/api/4/search/channel", sh.url)) { Ok(mut url) => { query.append_to_url(&mut url); Ok(url) @@ -239,6 +242,7 @@ pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> R }) .fold_ok(vec![], |mut a, x| { a.push(x); + methods.push(http::Method::GET); bodies.push(None); a })?; @@ -265,6 +269,7 @@ pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> R source_regex: query.source_regex.clone(), }; let qs = serde_json::to_string(&q).unwrap(); + methods.push(http::Method::POST); bodies.push(Some(Body::from(qs))); }); } @@ -354,6 +359,10 @@ pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> R .body(Body::from(serde_json::to_string(&res)?))?; Ok(res) }; + // TODO gather_get_json_generic must for this case accept a Method for each Request. + // Currently it is inferred via presence of the body. + // On the other hand, I want to gather over rather homogeneous requests. + // So: better enforce same method. let ret = gather_get_json_generic( http::Method::GET, urls, @@ -413,26 +422,33 @@ pub async fn proxy_api1_map_pulse(req: Request, proxy_config: &ProxyConfig "Required parameter `pulseid` not specified.", )?); }; - if backend == "sf-databuffer" { - if proxy_config.search_hosts.len() == 1 { - let url = format!("http://sf-daqbuf-21:8380/api/1/map/pulse/{}", pulseid); + match proxy_config + .backends_pulse_map + .iter() + .filter(|x| x.name == backend) + .next() + { + Some(g) => { + let sh = &g.url; + let url = format!("{}/api/1/map/pulse/{}", sh, pulseid); let req = Request::builder().method(Method::GET).uri(url).body(Body::empty())?; - let res = hyper::Client::new().request(req).await?; - let ret = response(StatusCode::OK).body(res.into_body())?; - Ok(ret) - } else { - let url = format!("https://sf-data-api.psi.ch/api/1/map/pulse/{}", pulseid); - let req = Request::builder().method(Method::GET).uri(url).body(Body::empty())?; - let https = HttpsConnector::new(); - let res = hyper::Client::builder().build(https).request(req).await?; + let res = if sh.starts_with("https") { + let https = HttpsConnector::new(); + let c = hyper::Client::builder().build(https); + c.request(req).await? + } else { + let c = hyper::Client::new(); + c.request(req).await? + }; let ret = response(StatusCode::OK).body(res.into_body())?; Ok(ret) } - } else { - return Ok(super::response_err( - StatusCode::BAD_REQUEST, - format!("backend \"{}\" not supported for this action", backend), - )?); + None => { + return Ok(super::response_err( + StatusCode::BAD_REQUEST, + format!("can not find backend for api1 pulse map"), + )?); + } } } diff --git a/httpret/src/proxy/api4.rs b/httpret/src/proxy/api4.rs index e01bdf9..fe17e88 100644 --- a/httpret/src/proxy/api4.rs +++ b/httpret/src/proxy/api4.rs @@ -8,6 +8,7 @@ use itertools::Itertools; use netpod::log::*; use netpod::ACCEPT_ALL; use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON}; +use serde_json::Value as JsVal; use std::pin::Pin; use std::time::Duration; use url::Url; @@ -95,3 +96,86 @@ pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> R .map_err(Error::from)?) } } + +pub async fn node_status(req: Request, proxy_config: &ProxyConfig) -> Result, Error> { + let (head, _body) = req.into_parts(); + let vdef = header::HeaderValue::from_static(APP_JSON); + let v = head.headers.get(header::ACCEPT).unwrap_or(&vdef); + if v == APP_JSON || v == ACCEPT_ALL { + let inpurl = Url::parse(&format!("dummy:{}", head.uri))?; + let query = ChannelSearchQuery::from_url(&inpurl)?; + let mut bodies = vec![]; + let urls = proxy_config + .backends_status + .iter() + .filter(|k| { + if let Some(back) = &query.backend { + back == &k.name + } else { + true + } + }) + .map(|pb| match Url::parse(&format!("{}/api/4/node_status", pb.url)) { + Ok(mut url) => { + query.append_to_url(&mut url); + Ok(url) + } + Err(_) => Err(Error::with_msg(format!("parse error for: {:?}", pb))), + }) + .fold_ok(vec![], |mut a, x| { + a.push(x); + bodies.push(None); + a + })?; + let tags = urls.iter().map(|k| k.to_string()).collect(); + let nt = |tag, res| { + let fut = async { + let body = hyper::body::to_bytes(res).await?; + let res: JsVal = match serde_json::from_slice(&body) { + Ok(k) => k, + Err(_) => { + let msg = format!("can not parse result: {}", String::from_utf8_lossy(&body)); + error!("{}", msg); + return Err(Error::with_msg_no_trace(msg)); + } + }; + let ret = SubRes { + tag, + status: StatusCode::OK, + val: res, + }; + Ok(ret) + }; + Box::pin(fut) as Pin + Send>> + }; + let ft = |all: Vec>| { + let mut res = vec![]; + for j in all { + let val = serde_json::json!({ + j.tag: j.val, + }); + res.push(val); + } + let res = response(StatusCode::OK) + .header(http::header::CONTENT_TYPE, APP_JSON) + .body(Body::from(serde_json::to_string(&res)?)) + .map_err(Error::from)?; + Ok(res) + }; + let ret = gather_get_json_generic( + http::Method::GET, + urls, + bodies, + tags, + nt, + ft, + Duration::from_millis(3000), + ) + .await?; + Ok(ret) + } else { + Ok(response(StatusCode::NOT_ACCEPTABLE) + .body(Body::from(format!("{:?}", proxy_config.name))) + .map_err(Error::from)?) + } +} diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 8718eb0..4bc7bb4 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -323,6 +323,21 @@ impl From for Result { } } +#[derive(Debug, Serialize, Deserialize)] +pub struct NodeStatusArchiverAppliance { + pub readable: Vec<(PathBuf, bool)>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct NodeStatus { + //pub node: NodeConfig, + pub is_sf_databuffer: bool, + pub is_archiver_engine: bool, + pub is_archiver_appliance: bool, + pub database_size: Result, + pub archiver_appliance_status: Option, +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Channel { pub backend: String, @@ -1354,7 +1369,7 @@ pub struct ProxyConfig { pub name: String, pub listen: String, pub port: u16, - pub search_hosts: Vec, + pub backends_status: Vec, pub backends: Vec, pub backends_pulse_map: Vec, pub backends_search: Vec,