diff --git a/Cargo.toml b/Cargo.toml index 65e7573..8259feb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,11 +3,11 @@ members = ["daqbuffer", "h5out", "items", "items_proc", "nodenet", "httpclient", [profile.release] opt-level = 1 -debug = 1 +debug = 0 overflow-checks = false debug-assertions = false -lto = "off" -codegen-units = 64 +lto = "thin" +codegen-units = 128 incremental = true [profile.release2] diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index e530027..8ab4a03 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -20,6 +20,7 @@ pub fn make_test_node(id: u32) -> Node { }), archiver_appliance: None, channel_archiver: None, + prometheus_api_bind: None, } } diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 47dcdd9..586fde2 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -136,6 +136,7 @@ pub async fn gen_test_data() -> Result<(), Error> { }), archiver_appliance: None, channel_archiver: None, + prometheus_api_bind: None, }; ensemble.nodes.push(node); } diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index 6be57dc..07d80fa 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -2,7 +2,7 @@ name = "httpret" version = "0.0.1-a.0" authors = ["Dominik Werder "] -edition = "2018" +edition = "2021" [lib] path = "src/httpret.rs" @@ -35,3 +35,4 @@ commonio = { path = "../commonio" } taskrun = { path = "../taskrun" } scylla = "0.4" md-5 = "0.9" +regex = "1.6" diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index b3627e9..8e3bbad 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -6,6 +6,7 @@ pub mod err; pub mod events; pub mod evinfo; pub mod gather; +pub mod prometheus; pub mod proxy; pub mod pulsemap; pub mod search; @@ -51,6 +52,9 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> { let x = Box::new(a); STATUS_BOARD.store(Box::into_raw(x), Ordering::SeqCst); }); + if let Some(bind) = node_config.node.prometheus_api_bind { + tokio::spawn(prometheus::host(bind)); + } let _update_task = if node_config.node_config.cluster.run_map_pulse_task { Some(UpdateTask::new(node_config.clone())) } else { diff --git a/httpret/src/prometheus.rs b/httpret/src/prometheus.rs new file mode 100644 index 0000000..01c0ce4 --- /dev/null +++ b/httpret/src/prometheus.rs @@ -0,0 +1,432 @@ +use crate::err::Error; +use http::{HeaderMap, HeaderValue, Method, Request, Response, StatusCode}; +use hyper::server::conn::AddrStream; +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Server}; +use netpod::log::*; +use netpod::{ACCEPT_ALL, APP_JSON}; +use serde_json::{json, Value}; +use std::collections::BTreeMap; +use std::net::SocketAddr; +use std::sync::Once; + +fn response(status: T) -> http::response::Builder +where + http::StatusCode: std::convert::TryFrom, + >::Error: Into, +{ + Response::builder() + .status(status) + .header("Access-Control-Allow-Origin", "*") + .header("Access-Control-Allow-Headers", "*") +} + +fn accepts_json(headers: &HeaderMap) -> bool { + match headers.get(http::header::ACCEPT) { + Some(h) => match h.to_str() { + Ok(hv) => { + if hv.contains(APP_JSON) { + true + } else if hv.contains(ACCEPT_ALL) { + true + } else { + false + } + } + Err(_) => false, + }, + None => false, + } +} + +pub struct StatusBuildInfoHandler {} + +impl StatusBuildInfoHandler { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/v1/status/buildinfo" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request) -> Result, Error> { + info!("{} for {:?}", std::any::type_name::(), req); + if req.method() == Method::GET { + if accepts_json(req.headers()) { + if true { + let res = json!({ + "status": "success", + "data": { + "version": "2.37", + "revision": "daqingest", + "branch": "dev", + "buildUser": "dominik.werder", + "buildDate": "2022-07-21", + "goVersion": "nogo" + } + }); + let body = Body::from(serde_json::to_vec(&res)?); + Ok(response(StatusCode::OK).body(body)?) + } else { + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("error")))?) + } + } else { + Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) + } + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } + } +} + +pub struct SeriesHandler {} + +impl SeriesHandler { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/v1/series" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request) -> Result, Error> { + info!("{} for {:?}", std::any::type_name::(), req); + if req.method() == Method::GET || req.method() == Method::POST { + if accepts_json(req.headers()) { + let res = json!({ + "status": "success", + "data": [ + { + "__name__": "series1", + //"job": "daqingest", + //"instance": "node1" + } + ] + }); + let body = Body::from(serde_json::to_vec(&res)?); + Ok(response(StatusCode::OK).body(body)?) + } else { + Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) + } + } else { + info!("series handler with {:?}", req); + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } + } +} + +pub struct MetadataHandler {} + +impl MetadataHandler { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/v1/metadata" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request) -> Result, Error> { + info!("{} for {:?}", std::any::type_name::(), req); + if req.method() == Method::GET { + if accepts_json(req.headers()) { + if true { + let res = json!({ + "status": "success", + "data": {} + }); + let body = Body::from(serde_json::to_vec(&res)?); + Ok(response(StatusCode::OK).body(body)?) + } else { + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("error")))?) + } + } else { + Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) + } + } else { + info!("metadata handler with {:?}", req); + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } + } +} + +pub struct LabelsHandler {} + +impl LabelsHandler { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/v1/labels" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request) -> Result, Error> { + let self_name = std::any::type_name::(); + info!("{} for {:?}", self_name, req); + if req.method() == Method::GET || req.method() == Method::POST { + if accepts_json(req.headers()) { + if true { + let res = json!({ + "status": "success", + "data": ["__name__"] + }); + info!("return labels {:?}", res); + let body = Body::from(serde_json::to_vec(&res)?); + Ok(response(StatusCode::OK).body(body)?) + } else { + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("error")))?) + } + } else { + warn!("{} BAD_REQUEST {:?}", self_name, req); + Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) + } + } else { + warn!("{} METHOD_NOT_ALLOWED {:?}", self_name, req); + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } + } +} + +pub struct LabelValuesHandler { + label: String, +} + +impl LabelValuesHandler { + pub fn handler(req: &Request) -> Option { + use regex::Regex; + static mut RE1: Option = None; + static RE1_INIT: Once = Once::new(); + let re1 = unsafe { + RE1_INIT.call_once(|| { + RE1 = Some(Regex::new(r#"/api/v1/label/([-:._a-zA-Z0-9]+)/values"#).unwrap()); + }); + RE1.as_mut().unwrap_unchecked() + }; + if let Some(caps) = re1.captures(req.uri().path()) { + if let Some(label) = caps.get(1) { + Some(LabelValuesHandler { + label: label.as_str().into(), + }) + } else { + None + } + } else { + None + } + } + + pub async fn handle(&self, req: Request) -> Result, Error> { + let self_name = std::any::type_name::(); + info!("{} for {:?}", self_name, req); + info!("LABEL {:?}", self.label); + if req.method() == Method::GET || req.method() == Method::POST { + if accepts_json(req.headers()) { + if self.label == "__name__" { + let res = json!({ + "status": "success", + "data": ["series1", "series2"] + }); + info!("label values {:?}", res); + let body = Body::from(serde_json::to_vec(&res)?); + Ok(response(StatusCode::OK).body(body)?) + } else { + let res = json!({ + "status": "success", + "data": [] + }); + warn!("return empty label values"); + let body = Body::from(serde_json::to_vec(&res)?); + Ok(response(StatusCode::OK).body(body)?) + } + } else { + warn!("{} bad accept", self_name); + Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) + } + } else { + warn!("{} label value handler with {:?}", self_name, req); + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } + } +} + +pub struct QueryHandler {} + +impl QueryHandler { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/v1/query" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request) -> Result, Error> { + info!("{} for {:?}", std::any::type_name::(), req); + let url = url::Url::parse(&format!("dummy://{}", &req.uri())); + info!("/api/v1/query parsed url: {:?}", url); + let body = hyper::body::to_bytes(req.into_body()).await?; + let body_str = String::from_utf8_lossy(&body); + info!("/api/v1/query body_str: {:?}", body_str); + let formurl = url::Url::parse(&format!("dummy:///?{}", body_str)); + info!("/api/v1/query formurl: {:?}", formurl); + let res = json!({ + "status": "success", + "data": { + "resultType": "scalar", + "result": [40, "2"] + } + }); + let body = Body::from(serde_json::to_vec(&res)?); + Ok(response(StatusCode::OK).body(body)?) + } +} + +pub struct QueryRangeHandler {} + +impl QueryRangeHandler { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/v1/query_range" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request) -> Result, Error> { + info!("{} for {:?}", std::any::type_name::(), req); + let url = url::Url::parse(&format!("dummy://{}", &req.uri())); + info!("/api/v1/query_range parsed url: {:?}", url); + let body = hyper::body::to_bytes(req.into_body()).await?; + let body_str = String::from_utf8_lossy(&body); + info!("/api/v1/query_range body_str: {:?}", body_str); + let formurl = url::Url::parse(&format!("dummy:///?{}", body_str)); + info!("/api/v1/query_range formurl: {:?}", formurl); + match formurl { + Ok(formurl) => { + for (k, v) in formurl.query_pairs() { + info!("KEY {} VALUE {}", k, v); + } + let qps: BTreeMap<_, _> = formurl.query_pairs().collect(); + if let (Some(ts1), Some(ts2)) = (qps.get("start"), qps.get("end")) { + let ts1: f64 = ts1.parse().unwrap_or(1.); + let ts2: f64 = ts2.parse().unwrap_or(2.); + let ts1 = if !ts1.is_normal() { 1. } else { ts1 }; + let ts2 = if !ts2.is_normal() { 2. } else { ts2 }; + let ts1 = ts1.min(3e9); + let ts2 = ts2.min(3e9); + let ts2 = if ts2 <= ts1 { ts1 + 1. } else { ts2 }; + let dt = (ts2 - ts1) / 20.; + info!("ts1 {} ts2 {} dt {}", ts1, ts2, dt); + let mut ts = ts1; + let mut vals = Vec::new(); + while ts < ts2 { + ts += dt; + vals.push((ts, 1.2f32)); + } + let v: Vec<_> = vals + .into_iter() + .map(|(k, v)| json!([json!(k), Value::String(v.to_string())])) + .collect(); + let res = serde_json::json!({ + "status": "success", + "data": { + "resultType": "matrix", + "result": [ + { + "metric": { + "__name__": "series1", + }, + "values": v + } + ] + } + }); + info!("return {:?}", res); + let body = Body::from(serde_json::to_vec(&res)?); + return Ok(response(StatusCode::OK).body(body)?); + } else { + } + } + Err(e) => { + warn!("can not parse {e:?}"); + } + } + info!("query range returning default empty"); + let res = json!({ + "status": "success", + "data": { + "resultType": "matrix", + "result": [ + { + "metric": { + "__name__": "series1", + }, + "values": [ + ] + } + ] + } + }); + let body = Body::from(serde_json::to_vec(&res)?); + Ok(response(StatusCode::OK).body(body)?) + } +} + +async fn http_service_inner(req: Request) -> Result, Error> { + if let Some(h) = StatusBuildInfoHandler::handler(&req) { + h.handle(req).await + } else if let Some(h) = SeriesHandler::handler(&req) { + h.handle(req).await + } else if let Some(h) = MetadataHandler::handler(&req) { + h.handle(req).await + } else if let Some(h) = LabelsHandler::handler(&req) { + h.handle(req).await + } else if let Some(h) = LabelValuesHandler::handler(&req) { + h.handle(req).await + } else if let Some(h) = QueryHandler::handler(&req) { + h.handle(req).await + } else if let Some(h) = QueryRangeHandler::handler(&req) { + h.handle(req).await + } else { + warn!("no handler found for {:?}", req); + Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?) + } +} + +async fn http_service(req: Request) -> Result, Error> { + match http_service_inner(req).await { + Ok(k) => Ok(k), + Err(e) => { + error!("daqbuffer node http_service sees error: {}", e); + Err(e) + } + } +} + +pub async fn host(bind: SocketAddr) -> Result<(), Error> { + let make_service = make_service_fn({ + move |conn: &AddrStream| { + let addr = conn.remote_addr(); + async move { + Ok::<_, Error>(service_fn({ + move |req| { + info!( + "REQUEST {:?} - {:?} - {:?} - {:?}", + addr, + req.method(), + req.uri(), + req.headers() + ); + let f = http_service(req); + Box::pin(f) + } + })) + } + } + }); + Server::bind(&bind).serve(make_service).await?; + Ok(()) +} diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index baf9501..93b022f 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -13,6 +13,7 @@ use serde_json::Value as JsVal; use std::collections::BTreeMap; use std::fmt; use std::iter::FromIterator; +use std::net::SocketAddr; use std::path::PathBuf; use std::pin::Pin; use std::str::FromStr; @@ -356,6 +357,7 @@ pub struct Node { pub sf_databuffer: Option, pub archiver_appliance: Option, pub channel_archiver: Option, + pub prometheus_api_bind: Option, } struct Visit1 {} @@ -426,6 +428,7 @@ impl Node { }), archiver_appliance: None, channel_archiver: None, + prometheus_api_bind: None, } } } @@ -2043,6 +2046,7 @@ pub fn test_cluster() -> Cluster { }), archiver_appliance: None, channel_archiver: None, + prometheus_api_bind: None, }) .collect(); Cluster { @@ -2077,6 +2081,7 @@ pub fn sls_test_cluster() -> Cluster { channel_archiver: Some(ChannelArchiver { data_base_paths: vec![test_data_base_path_channel_archiver_sls()], }), + prometheus_api_bind: None, }) .collect(); Cluster { @@ -2111,6 +2116,7 @@ pub fn archapp_test_cluster() -> Cluster { archiver_appliance: Some(ArchiverAppliance { data_base_paths: vec![test_data_base_path_archiver_appliance()], }), + prometheus_api_bind: None, }) .collect(); Cluster {