diff --git a/daqbuffer/src/bin/daqbuffer.rs b/daqbuffer/src/bin/daqbuffer.rs
index e3c974f..a5d6159 100644
--- a/daqbuffer/src/bin/daqbuffer.rs
+++ b/daqbuffer/src/bin/daqbuffer.rs
@@ -1,8 +1,10 @@
 use chrono::{DateTime, Duration, Utc};
+use clap::Clap;
+use daqbuffer::cli::{ClientType, Opts, SubCmd};
 use disk::binned::query::CacheUsage;
 use err::Error;
 use netpod::log::*;
-use netpod::{NodeConfig, NodeConfigCached};
+use netpod::{NodeConfig, NodeConfigCached, ProxyConfig};
 use tokio::fs::File;
 use tokio::io::AsyncReadExt;
@@ -45,8 +47,6 @@ fn parse_ts(s: &str) -> Result<DateTime<Utc>, Error> {
 }
 
 async fn go() -> Result<(), Error> {
-    use clap::Clap;
-    use daqbuffer::cli::{ClientType, Opts, SubCmd};
     let opts = Opts::parse();
     match opts.subcmd {
         SubCmd::Retrieval(subcmd) => {
@@ -61,6 +61,14 @@ async fn go() -> Result<(), Error> {
             let node_config = node_config?;
             daqbuffer::run_node(node_config.clone()).await?;
         }
+        SubCmd::Proxy(subcmd) => {
+            info!("daqbuffer proxy {}", clap::crate_version!());
+            let mut config_file = File::open(subcmd.config).await?;
+            let mut buf = vec![];
+            config_file.read_to_end(&mut buf).await?;
+            let proxy_config: ProxyConfig = serde_json::from_slice(&buf)?;
+            daqbuffer::run_proxy(proxy_config.clone()).await?;
+        }
         SubCmd::Client(client) => match client.client_type {
             ClientType::Status(opts) => {
                 daqbuffer::client::status(opts.host, opts.port).await?;
diff --git a/daqbuffer/src/cli.rs b/daqbuffer/src/cli.rs
index 4d9681e..94f7358 100644
--- a/daqbuffer/src/cli.rs
+++ b/daqbuffer/src/cli.rs
@@ -12,6 +12,7 @@ pub struct Opts {
 #[derive(Debug, Clap)]
 pub enum SubCmd {
     Retrieval(Retrieval),
+    Proxy(Proxy),
     Client(Client),
     GenerateTestData,
 }
@@ -22,6 +23,12 @@ pub struct Retrieval {
     pub config: String,
 }
 
+#[derive(Debug, Clap)]
+pub struct Proxy {
+    #[clap(long)]
+    pub config: String,
+}
+
 #[derive(Debug, Clap)]
 pub struct Client {
     #[clap(subcommand)]
diff --git a/daqbuffer/src/lib.rs b/daqbuffer/src/lib.rs
index 5cdb0dd..b8a8ecb 100644
--- a/daqbuffer/src/lib.rs
+++ b/daqbuffer/src/lib.rs
@@ -1,5 +1,5 @@
 use err::Error;
-use netpod::{Cluster, NodeConfig, NodeConfigCached};
+use netpod::{Cluster, NodeConfig, NodeConfigCached, ProxyConfig};
 use tokio::task::JoinHandle;
 #[allow(unused_imports)]
 use tracing::{debug, error, info, trace, warn};
@@ -29,3 +29,8 @@ pub async fn run_node(node_config: NodeConfigCached) -> Result<(), Error> {
     httpret::host(node_config).await?;
     Ok(())
 }
+
+pub async fn run_proxy(proxy_config: ProxyConfig) -> Result<(), Error> {
+    httpret::proxy(proxy_config).await?;
+    Ok(())
+}
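For reference, a config file for the new `proxy` subcommand could look as follows. This is a minimal sketch: the field names come from the `ProxyConfig` struct added in netpod at the end of this patch, while the concrete values are invented.

```rust
use netpod::ProxyConfig;

fn main() -> Result<(), err::Error> {
    // Hypothetical proxy.json content; only the field names are fixed by ProxyConfig.
    let buf = br#"{
        "listen": "0.0.0.0",
        "port": 8371,
        "search_hosts": ["http://sf-daqbuf-33.psi.ch:8371"]
    }"#;
    // Same deserialization path as the subcommand above.
    let proxy_config: ProxyConfig = serde_json::from_slice(&buf[..])?;
    assert_eq!(proxy_config.port, 8371);
    Ok(())
}
```

The subcommand would then be invoked as `daqbuffer proxy --config proxy.json`.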
diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index c9b553e..014c8df 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -655,6 +655,7 @@ where
     S: Stream<Item = Sitemty<T>> + Unpin,
     T: Collectable,
 {
+    info!("\n\nConstruct deadline with timeout {:?}\n\n", timeout);
     let deadline = tokio::time::Instant::now() + timeout;
     let mut collector = <T as Collectable>::new_collector(bin_count_exp);
     let mut i1 = 0;
@@ -711,15 +712,11 @@ pub struct BinnedJsonChannelExec {
 }
 
 impl BinnedJsonChannelExec {
-    pub fn new(query: BinnedQuery, node_config: NodeConfigCached) -> Self {
-        info!(
-            "BinnedJsonChannelExec AggKind: {:?}\n--------------------------------------------------------------",
-            query.agg_kind()
-        );
+    pub fn new(query: BinnedQuery, timeout: Duration, node_config: NodeConfigCached) -> Self {
         Self {
             query,
             node_config,
-            timeout: Duration::from_millis(3000),
+            timeout,
         }
     }
 }
@@ -819,7 +816,7 @@ pub async fn binned_json(
     node_config: &NodeConfigCached,
 ) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>, Error> {
     let ret = channel_exec(
-        BinnedJsonChannelExec::new(query.clone(), node_config.clone()),
+        BinnedJsonChannelExec::new(query.clone(), query.timeout(), node_config.clone()),
         query.channel(),
         query.range(),
         query.agg_kind().clone(),
diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs
index a239e21..cf6dc9f 100644
--- a/disk/src/binned/prebinned.rs
+++ b/disk/src/binned/prebinned.rs
@@ -148,7 +148,6 @@ fn make_num_pipeline(
     }
 }
 
-// TODO after the refactor, return direct value instead of boxed.
 pub async fn pre_binned_bytes_for_http(
     node_config: &NodeConfigCached,
     query: &PreBinnedQuery,
diff --git a/disk/src/binned/query.rs b/disk/src/binned/query.rs
index 4742d16..fbc4b4b 100644
--- a/disk/src/binned/query.rs
+++ b/disk/src/binned/query.rs
@@ -76,7 +76,7 @@ impl PreBinnedQuery {
             self.patch.to_url_params_strings(),
             self.channel.backend,
             self.channel.name,
-            binning_scheme_string(&self.agg_kind),
+            binning_scheme_query_string(&self.agg_kind),
             self.cache_usage,
             self.disk_stats_every.bytes() / 1024,
             self.report_error(),
@@ -293,7 +293,7 @@ impl BinnedQuery {
             self.bin_count,
             Utc.timestamp_nanos(self.range.beg as i64).format(date_fmt),
             Utc.timestamp_nanos(self.range.end as i64).format(date_fmt),
-            binning_scheme_string(&self.agg_kind),
+            binning_scheme_query_string(&self.agg_kind),
             self.disk_stats_every.bytes() / 1024,
             self.timeout.as_millis(),
             self.abort_after_bin_count,
@@ -301,17 +301,16 @@ impl BinnedQuery {
         )
     }
 }
 
-fn binning_scheme_string(agg_kind: &AggKind) -> String {
+fn binning_scheme_query_string(agg_kind: &AggKind) -> String {
     match agg_kind {
         AggKind::Plain => "fullValue".into(),
         AggKind::DimXBins1 => "toScalarX".into(),
-        AggKind::DimXBinsN(n) => format!("binnedXcount{}", n),
+        AggKind::DimXBinsN(n) => format!("binnedX&binnedXcount={}", n),
     }
 }
 
 fn agg_kind_from_binning_scheme(params: &BTreeMap<String, String>) -> Result<AggKind, Error> {
     let key = "binningScheme";
-    let tok1 = "binnedXcount";
     let s = params
         .get(key)
         .map_or(Err(Error::with_msg(format!("can not find {}", key))), |k| Ok(k))?;
@@ -319,8 +318,9 @@ fn agg_kind_from_binning_scheme(params: &BTreeMap<String, String>) -> Result<AggKind, Error> {
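The renamed helper now emits two URL parameters instead of one fused token. A sketch of the intended round trip, assuming `AggKind` derives `Debug` and `PartialEq` (test values are illustrative):

```rust
#[test]
fn binning_scheme_round_trip() {
    // DimXBinsN(13) now serializes as "binnedX&binnedXcount=13", so the final
    // URL carries binningScheme=binnedX plus a separate binnedXcount=13.
    assert_eq!(
        binning_scheme_query_string(&AggKind::DimXBinsN(13)),
        "binnedX&binnedXcount=13"
    );
    // agg_kind_from_binning_scheme() reads both parameters back from the map.
    let mut params = std::collections::BTreeMap::new();
    params.insert("binningScheme".to_string(), "binnedX".to_string());
    params.insert("binnedXcount".to_string(), "13".to_string());
    assert_eq!(
        agg_kind_from_binning_scheme(&params).unwrap(),
        AggKind::DimXBinsN(13)
    );
}
```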
 where
     S: Stream<Item = Sitemty<T>> + Unpin,
     T: Collectable + Debug,
 {
+    info!("\n\nConstruct deadline with timeout {:?}\n\n", timeout);
     let deadline = tokio::time::Instant::now() + timeout;
     // TODO in general a Collector does not need to know about the expected number of bins.
     // It would make more sense for some specific Collector kind to know.
@@ -335,7 +336,6 @@ where
                 collector.set_range_complete();
             }
             RangeCompletableItem::Data(item) => {
-                info!("collect_plain_events_json GOT ITEM {:?}", item);
                 collector.ingest(&item);
                 i1 += 1;
             }
diff --git a/disk/src/events.rs b/disk/src/events.rs
index 3a6835b..ba5d01c 100644
--- a/disk/src/events.rs
+++ b/disk/src/events.rs
@@ -19,7 +19,7 @@ impl PlainEventsQuery {
             channel,
             range,
             report_error: false,
-            timeout: Duration::from_millis(2000),
+            timeout: Duration::from_millis(10000),
         }
     }
 
@@ -40,7 +40,7 @@ impl PlainEventsQuery {
                 .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,
             timeout: params
                 .get("timeout")
-                .map_or("2000", |k| k)
+                .map_or("10000", |k| k)
                 .parse::<u64>()
                 .map(|k| Duration::from_millis(k))
                 .map_err(|e| Error::with_msg(format!("can not parse timeout {:?}", e)))?,
@@ -98,7 +98,7 @@ impl PlainEventsJsonQuery {
             channel,
             range,
             report_error: false,
-            timeout: Duration::from_millis(2000),
+            timeout: Duration::from_millis(10000),
         }
     }
 
@@ -119,7 +119,7 @@ impl PlainEventsJsonQuery {
                 .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,
             timeout: params
                 .get("timeout")
-                .map_or("2000", |k| k)
+                .map_or("10000", |k| k)
                 .parse::<u64>()
                 .map(|k| Duration::from_millis(k))
                 .map_err(|e| Error::with_msg(format!("can not parse timeout {:?}", e)))?,
diff --git a/err/src/lib.rs b/err/src/lib.rs
index d04f069..97320ab 100644
--- a/err/src/lib.rs
+++ b/err/src/lib.rs
@@ -2,6 +2,7 @@
 Error handling and reporting.
 */
 
+use http::header::InvalidHeaderValue;
 use http::uri::InvalidUri;
 use nom::error::ErrorKind;
 use serde::{Deserialize, Serialize};
@@ -11,6 +12,7 @@ use std::num::{ParseFloatError, ParseIntError};
 use std::string::FromUtf8Error;
 use std::sync::PoisonError;
 use tokio::task::JoinError;
+use tokio::time::error::Elapsed;
 
 /**
 The common error type for this application.
@@ -240,6 +242,18 @@ impl<T> From<PoisonError<T>> for Error {
     }
 }
 
+impl From<InvalidHeaderValue> for Error {
+    fn from(k: InvalidHeaderValue) -> Self {
+        Self::with_msg(format!("{:?}", k))
+    }
+}
+
+impl From<Elapsed> for Error {
+    fn from(k: Elapsed) -> Self {
+        Self::with_msg(format!("{:?}", k))
+    }
+}
+
 pub fn todo() {
     todo!("TODO");
 }
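The default event-query timeout rises from 2 s to 10 s in both constructors and both URL parsers above; the parsing idiom is identical in all four places. Condensed into one helper for illustration (helper name chosen here):

```rust
use std::collections::BTreeMap;
use std::time::Duration;

// Same map_or/parse chain as above: use the "timeout" URL parameter
// (milliseconds) when present, otherwise fall back to the new 10 s default.
fn timeout_from_params(params: &BTreeMap<String, String>) -> Result<Duration, std::num::ParseIntError> {
    params
        .get("timeout")
        .map_or("10000", |k| k)
        .parse::<u64>()
        .map(Duration::from_millis)
}
```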
diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs
new file mode 100644
index 0000000..19be26a
--- /dev/null
+++ b/httpret/src/api1.rs
@@ -0,0 +1,390 @@
+use crate::response;
+use err::Error;
+use http::{Method, StatusCode};
+use hyper::{Body, Client, Request, Response};
+use netpod::log::*;
+use serde::{Deserialize, Serialize};
+use serde_json::Value as JsonValue;
+use std::future::Future;
+use std::pin::Pin;
+use std::time::Duration;
+use tokio::time::timeout_at;
+
+fn get_backends() -> [(&'static str, &'static str, u16); 6] {
+    // TODO take from config.
+    err::todo();
+    [
+        ("gls-archive", "gls-data-api.psi.ch", 8371),
+        ("hipa-archive", "hipa-data-api.psi.ch", 8082),
+        ("sf-databuffer", "sf-daqbuf-33.psi.ch", 8371),
+        ("sf-imagebuffer", "sf-daq-5.psi.ch", 8371),
+        ("timeout", "sf-daqbuf-33.psi.ch", 8371),
+        ("error500", "sf-daqbuf-33.psi.ch", 8371),
+    ]
+}
+
+fn get_live_hosts() -> &'static [(&'static str, u16)] {
+    // TODO take from config.
+    err::todo();
+    &[
+        ("sf-daqbuf-21", 8371),
+        ("sf-daqbuf-22", 8371),
+        ("sf-daqbuf-23", 8371),
+        ("sf-daqbuf-24", 8371),
+        ("sf-daqbuf-25", 8371),
+        ("sf-daqbuf-26", 8371),
+        ("sf-daqbuf-27", 8371),
+        ("sf-daqbuf-28", 8371),
+        ("sf-daqbuf-29", 8371),
+        ("sf-daqbuf-30", 8371),
+        ("sf-daqbuf-31", 8371),
+        ("sf-daqbuf-32", 8371),
+        ("sf-daqbuf-33", 8371),
+        ("sf-daq-5", 8371),
+        ("sf-daq-6", 8371),
+        ("hipa-data-api", 8082),
+        ("gls-data-api", 8371),
+    ]
+}
+
+pub trait BackendAware {
+    fn backend(&self) -> &str;
+}
+
+pub trait FromErrorCode {
+    fn from_error_code(backend: &str, code: ErrorCode) -> Self;
+}
+
+#[derive(Debug, PartialEq, Serialize, Deserialize)]
+pub enum ErrorCode {
+    Error,
+    Timeout,
+}
+
+#[derive(Debug, PartialEq, Serialize, Deserialize)]
+pub struct ErrorDescription {
+    code: ErrorCode,
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub enum Ordering {
+    #[serde(rename = "none")]
+    NONE,
+    #[serde(rename = "asc")]
+    ASC,
+    #[serde(rename = "desc")]
+    DESC,
+}
+
+#[derive(Debug, PartialEq, Serialize, Deserialize)]
+pub struct ChannelSearchQueryV1 {
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub regex: Option<String>,
+    #[serde(rename = "sourceRegex", skip_serializing_if = "Option::is_none")]
+    pub source_regex: Option<String>,
+    #[serde(rename = "descriptionRegex", skip_serializing_if = "Option::is_none")]
+    pub description_regex: Option<String>,
+    #[serde(skip_serializing_if = "Vec::is_empty")]
+    pub backends: Vec<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub ordering: Option<Ordering>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ChannelSearchResultItemV1 {
+    pub backend: String,
+    pub channels: Vec<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub error: Option<ErrorDescription>,
+}
+
+impl BackendAware for ChannelSearchResultItemV1 {
+    fn backend(&self) -> &str {
+        &self.backend
+    }
+}
+
+impl FromErrorCode for ChannelSearchResultItemV1 {
+    fn from_error_code(backend: &str, code: ErrorCode) -> Self {
+        Self {
+            backend: backend.into(),
+            channels: vec![],
+            error: Some(ErrorDescription { code }),
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ChannelSearchResultV1(pub Vec<ChannelSearchResultItemV1>);
+
+pub async fn channels_list_v1(req: Request<Body>) -> Result<Response<Body>, Error> {
+    let reqbody = req.into_body();
+    let bodyslice = hyper::body::to_bytes(reqbody).await?;
+    let query: ChannelSearchQueryV1 = serde_json::from_slice(&bodyslice)?;
+    let subq_maker = |backend: &str| -> JsonValue {
+        serde_json::to_value(ChannelSearchQueryV1 {
+            regex: query.regex.clone(),
+            source_regex: query.source_regex.clone(),
+            description_regex: query.description_regex.clone(),
+            backends: vec![backend.into()],
+            ordering: query.ordering.clone(),
+        })
+        .unwrap()
+    };
+    let back2: Vec<_> = query.backends.iter().map(|x| x.as_str()).collect();
+    let spawned = subreq(&back2[..], "channels", &subq_maker)?;
+    let mut res = vec![];
+    for (backend, s) in spawned {
+        res.push((backend, s.await));
+    }
+    let res2 = ChannelSearchResultV1(extr(res));
+    let body = serde_json::to_string(&res2.0)?;
+    let res = response(StatusCode::OK).body(body.into())?;
+    Ok(res)
+}
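Given the serde attributes above, a populated search query serializes compactly: None fields and an empty backends list stay off the wire. A sketch with illustrative values:

```rust
// Serializes to: {"regex":"AMPLT","backends":["sf-databuffer"],"ordering":"asc"}
// sourceRegex and descriptionRegex are skipped because they are None.
let q = ChannelSearchQueryV1 {
    regex: Some("AMPLT".into()),
    source_regex: None,
    description_regex: None,
    backends: vec!["sf-databuffer".into()],
    ordering: Some(Ordering::ASC),
};
let s = serde_json::to_string(&q).unwrap();
assert_eq!(s, r#"{"regex":"AMPLT","backends":["sf-databuffer"],"ordering":"asc"}"#);
```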
+type TT0 = (
+    (&'static str, &'static str, u16),
+    http::response::Parts,
+    hyper::body::Bytes,
+);
+type TT1 = Result<TT0, Error>;
+type TT2 = tokio::task::JoinHandle<TT1>;
+type TT3 = Result<TT1, tokio::task::JoinError>;
+type TT4 = Result<TT3, tokio::time::error::Elapsed>;
+type TT7 = Pin<Box<dyn Future<Output = TT4> + Send>>;
+type TT8 = (&'static str, TT7);
+
+fn subreq(backends_req: &[&str], endp: &str, subq_maker: &dyn Fn(&str) -> JsonValue) -> Result<Vec<TT8>, Error> {
+    let backends = get_backends();
+    let mut spawned = vec![];
+    for back in &backends {
+        if backends_req.contains(&back.0) {
+            let back = back.clone();
+            let q = subq_maker(back.0);
+            let endp = match back.0 {
+                "timeout" => "channels_timeout",
+                "error500" => "channels_error500",
+                _ => endp,
+            };
+            let uri = format!("http://{}:{}{}/{}", back.1, back.2, "/api/1", endp);
+            let req = Request::builder()
+                .method(Method::POST)
+                .uri(uri)
+                .header("content-type", "application/json")
+                .body(Body::from(serde_json::to_string(&q)?))?;
+            let jh: TT2 = tokio::spawn(async move {
+                let res = Client::new().request(req).await?;
+                let (pre, body) = res.into_parts();
+                //info!("Answer from {} status {}", back.1, pre.status);
+                let body_all = hyper::body::to_bytes(body).await?;
+                //info!("Got {} bytes from {}", body_all.len(), back.1);
+                Ok::<_, Error>((back, pre, body_all))
+            });
+            let jh = tokio::time::timeout(std::time::Duration::from_millis(5000), jh);
+            let bx: TT7 = Box::pin(jh);
+            spawned.push((back.0, bx));
+        }
+    }
+    Ok(spawned)
+}
+
+//fn extr<'a, T: BackendAware + FromErrorCode + Deserialize<'a>>(results: Vec<(&str, TT4)>) -> Vec<T> {
+fn extr<T: BackendAware + FromErrorCode + for<'a> Deserialize<'a>>(results: Vec<(&str, TT4)>) -> Vec<T> {
+    let mut ret = vec![];
+    for (backend, r) in results {
+        if let Ok(r20) = r {
+            if let Ok(r30) = r20 {
+                if let Ok(r2) = r30 {
+                    if r2.1.status == 200 {
+                        let inp_res: Result<Vec<T>, _> = serde_json::from_slice(&r2.2);
+                        if let Ok(inp) = inp_res {
+                            if inp.len() > 1 {
+                                error!("more than one result item from {:?}", r2.0);
+                            } else {
+                                for inp2 in inp {
+                                    if inp2.backend() == r2.0 .0 {
+                                        ret.push(inp2);
+                                    }
+                                }
+                            }
+                        } else {
+                            error!("malformed answer from {:?}", r2.0);
+                            ret.push(T::from_error_code(backend, ErrorCode::Error));
+                        }
+                    } else {
+                        error!("bad answer from {:?}", r2.0);
+                        ret.push(T::from_error_code(backend, ErrorCode::Error));
+                    }
+                } else {
+                    error!("bad answer from {:?}", r30);
+                    ret.push(T::from_error_code(backend, ErrorCode::Error));
+                }
+            } else {
+                error!("subrequest join handle error {:?}", r20);
+                ret.push(T::from_error_code(backend, ErrorCode::Error));
+            }
+        } else {
+            error!("subrequest timeout {:?}", r);
+            ret.push(T::from_error_code(backend, ErrorCode::Timeout));
+        }
+    }
+    ret
+}
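extr() unwraps three nested layers per backend (the 5 s timeout, the task join, and the subrequest's own Result) and records an error entry instead of failing the whole gather. A condensed sketch of that flattening, using the TT aliases as reconstructed above:

```rust
// One entry of the gathered results, flattened to either the successful
// (backend, parts, body) triple or the ErrorCode that extr() would report.
fn flatten_one(r: TT4) -> Result<TT0, ErrorCode> {
    match r {
        Err(_elapsed) => Err(ErrorCode::Timeout),         // subrequest timed out
        Ok(Err(_join_error)) => Err(ErrorCode::Error),    // task panicked or was cancelled
        Ok(Ok(Err(_sub_error))) => Err(ErrorCode::Error), // HTTP or body failure
        Ok(Ok(Ok(v))) => Ok(v),
    }
}
```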
+#[derive(Debug, PartialEq, Serialize, Deserialize)]
+pub struct ChannelConfigV1 {
+    pub backend: String,
+    pub name: String,
+    pub source: String,
+    #[serde(rename = "type")]
+    pub ty: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub shape: Option<Vec<u32>>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub unit: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub description: Option<String>,
+}
+
+#[derive(Debug, PartialEq, Serialize, Deserialize)]
+pub struct ChannelConfigsQueryV1 {
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub regex: Option<String>,
+    #[serde(rename = "sourceRegex")]
+    pub source_regex: Option<String>,
+    #[serde(rename = "descriptionRegex")]
+    pub description_regex: Option<String>,
+    #[serde(skip_serializing_if = "Vec::is_empty", default)]
+    pub backends: Vec<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub ordering: Option<Ordering>,
+}
+
+#[derive(Debug, PartialEq, Serialize, Deserialize)]
+pub struct ChannelBackendConfigsV1 {
+    pub backend: String,
+    pub channels: Vec<ChannelConfigV1>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub error: Option<ErrorDescription>,
+}
+
+#[derive(Debug, PartialEq, Serialize, Deserialize)]
+pub struct ChannelConfigsResponseV1(pub Vec<ChannelBackendConfigsV1>);
+
+impl BackendAware for ChannelBackendConfigsV1 {
+    fn backend(&self) -> &str {
+        &self.backend
+    }
+}
+
+impl FromErrorCode for ChannelBackendConfigsV1 {
+    fn from_error_code(backend: &str, code: ErrorCode) -> Self {
+        Self {
+            backend: backend.into(),
+            channels: vec![],
+            error: Some(ErrorDescription { code }),
+        }
+    }
+}
+
+pub async fn channels_config_v1(req: Request<Body>) -> Result<Response<Body>, Error> {
+    let reqbody = req.into_body();
+    let bodyslice = hyper::body::to_bytes(reqbody).await?;
+    let query: ChannelConfigsQueryV1 = serde_json::from_slice(&bodyslice)?;
+    let subq_maker = |backend: &str| -> JsonValue {
+        serde_json::to_value(ChannelConfigsQueryV1 {
+            regex: query.regex.clone(),
+            source_regex: query.source_regex.clone(),
+            description_regex: query.description_regex.clone(),
+            backends: vec![backend.into()],
+            ordering: query.ordering.clone(),
+        })
+        .unwrap()
+    };
+    let back2: Vec<_> = query.backends.iter().map(|x| x.as_str()).collect();
+    let spawned = subreq(&back2[..], "channels/config", &subq_maker)?;
+    let mut res = vec![];
+    for (backend, s) in spawned {
+        res.push((backend, s.await));
+    }
+    let res2 = ChannelConfigsResponseV1(extr(res));
+    let body = serde_json::to_string(&res2.0)?;
+    let res = response(StatusCode::OK).body(body.into())?;
+    Ok(res)
+}
+
+pub async fn gather_json_v1(req_m: Request<Body>, path: &str) -> Result<Response<Body>, Error> {
+    let mut spawned = vec![];
+    let (req_h, _) = req_m.into_parts();
+    for host in get_live_hosts() {
+        for inst in &["00", "01", "02"] {
+            let req_hh = req_h.headers.clone();
+            let host_filter = if req_hh.contains_key("host_filter") {
+                Some(req_hh.get("host_filter").unwrap().to_str().unwrap())
+            } else {
+                None
+            };
+            let path = path.to_string();
+            let task = if host_filter.is_none() || host_filter.as_ref().unwrap() == &host.0 {
+                let task = (
+                    host.clone(),
+                    inst.to_string(),
+                    tokio::spawn(async move {
+                        let uri = format!("http://{}:{}{}", host.0, host.1, path);
+                        let req = Request::builder().method(Method::GET).uri(uri);
+                        let req = if false && req_hh.contains_key("retrieval_instance") {
+                            req.header("retrieval_instance", req_hh.get("retrieval_instance").unwrap())
+                        } else {
+                            req
+                        };
+                        let req = req.header("retrieval_instance", *inst);
+                        //.header("content-type", "application/json")
+                        //.body(Body::from(serde_json::to_string(&q)?))?;
+                        let req = req.body(Body::empty())?;
+                        let deadline = tokio::time::Instant::now() + Duration::from_millis(1000);
+                        let fut = async {
+                            let res = Client::new().request(req).await?;
+                            let (pre, body) = res.into_parts();
+                            if pre.status != StatusCode::OK {
+                                Err(Error::with_msg(format!("request failed, got {}", pre.status)))
+                            } else {
+                                // aggregate returns a hyper Buf which is not Read
+                                let body_all = hyper::body::to_bytes(body).await?;
+                                let val = match serde_json::from_slice(&body_all) {
+                                    Ok(k) => k,
+                                    Err(_e) => JsonValue::String(String::from_utf8(body_all.to_vec())?),
+                                };
+                                Ok(val)
+                            }
+                        };
+                        let ret = timeout_at(deadline, fut).await??;
+                        Ok::<_, Error>(ret)
+                    }),
+                );
+                Some(task)
+            } else {
+                None
+            };
+            if let Some(task) = task {
+                spawned.push(task);
+            }
+        }
+    }
+    use serde_json::Map;
+    let mut m = Map::new();
+    for h in spawned {
+        let res = match h.2.await {
+            Ok(k) => match k {
+                Ok(k) => k,
+                Err(_e) => JsonValue::String(format!("ERROR")),
+            },
+            Err(_e) => JsonValue::String(format!("ERROR")),
+        };
+        m.insert(format!("{}:{}-{}", h.0 .0, h.0 .1, h.1), res);
+    }
+    let res = response(200)
+        .header("Content-Type", "application/json")
+        .body(serde_json::to_string(&m)?.into())?;
+    Ok(res)
+}
diff --git a/httpret/src/gather.rs b/httpret/src/gather.rs
index 1bccd36..53df77f 100644
--- a/httpret/src/gather.rs
+++ b/httpret/src/gather.rs
@@ -1,10 +1,15 @@
 use crate::response;
 use err::Error;
+use futures_util::{select, FutureExt};
 use http::{Method, StatusCode};
 use hyper::{Body, Client, Request, Response};
 use netpod::{Node, NodeConfigCached};
 use serde::{Deserialize, Serialize};
 use serde_json::Value as JsonValue;
+use std::future::Future;
+use std::pin::Pin;
+use std::time::Duration;
+use tokio::time::sleep;
 
 #[derive(Clone, Serialize, Deserialize)]
 struct GatherFrom {
@@ -44,7 +49,7 @@ async fn process_answer(res: Response<Body>) -> Result<JsonValue, Error> {
     }
 }
 
-pub async fn gather_json_from_hosts(req: Request<Body>, pathpre: &str) -> Result<Response<Body>, Error> {
+pub async fn unused_gather_json_from_hosts(req: Request<Body>, pathpre: &str) -> Result<Response<Body>, Error> {
     let (part_head, part_body) = req.into_parts();
     let bodyslice = hyper::body::to_bytes(part_body).await?;
     let gather_from: GatherFrom = serde_json::from_slice(&bodyslice)?;
@@ -61,10 +66,6 @@ pub async fn gather_json_from_hosts(req: Request<Body>, pathpre: &str) -> Result<Response<Body>, Error> {
         };
         let req = req.header(http::header::ACCEPT, "application/json");
         let req = req.body(Body::empty());
-        use futures_util::select;
-        use futures_util::FutureExt;
-        use std::time::Duration;
-        use tokio::time::sleep;
         let task = tokio::spawn(async move {
             select! {
                 _ = sleep(Duration::from_millis(1500)).fuse() => {
@@ -114,13 +115,9 @@ pub async fn gather_get_json(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
             .map(|node| {
                 let uri = format!("http://{}:{}/api/4/{}", node.host, node.port, pathsuf);
                 let req = Request::builder().method(Method::GET).uri(uri);
-                let req = req.header("x-node-from-name", format!("{}", node_config.node_config.name));
+                let req = req.header("x-log-from-node-name", format!("{}", node_config.node_config.name));
                 let req = req.header(http::header::ACCEPT, "application/json");
                 let req = req.body(Body::empty());
-                use futures_util::select;
-                use futures_util::FutureExt;
-                use std::time::Duration;
-                use tokio::time::sleep;
                 let task = tokio::spawn(async move {
                     select! {
                         _ = sleep(Duration::from_millis(1500)).fuse() => {
@@ -162,3 +159,77 @@ pub async fn gather_get_json(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
         .body(serde_json::to_string(&Jres { hosts: a })?.into())?;
     Ok(res)
 }
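Both gather functions now share these lifted imports for the same race-against-a-timer pattern. The pattern in isolation, as a sketch (the helper name and signature are chosen here, not taken from the patch):

```rust
use futures_util::FutureExt;
use std::time::Duration;
use tokio::time::sleep;

// Race a fallible future against a timer; a slow peer yields Err("timeout")
// instead of stalling the whole gather.
async fn with_timeout<T>(
    fut: std::pin::Pin<Box<dyn std::future::Future<Output = Result<T, err::Error>> + Send>>,
    ms: u64,
) -> Result<T, err::Error> {
    let mut fut = fut.fuse();
    let mut timer = Box::pin(sleep(Duration::from_millis(ms)).fuse());
    futures_util::select! {
        _ = timer => Err(err::Error::with_msg("timeout")),
        res = fut => res,
    }
}
```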
+
+pub async fn gather_get_json_generic<SM, NT, FT>(
+    method: http::Method,
+    uri: String,
+    schemehostports: Vec<String>,
+    nt: NT,
+    ft: FT,
+    timeout: Duration,
+) -> Result<Response<Body>, Error>
+where
+    SM: Send + 'static,
+    NT: Fn(Response<Body>) -> Pin<Box<dyn Future<Output = Result<SM, Error>> + Send>> + Send + Sync + Copy + 'static,
+    FT: Fn(Vec<SM>) -> Result<Response<Body>, Error>,
+{
+    let spawned: Vec<_> = schemehostports
+        .iter()
+        .map(move |schemehostport| {
+            let uri = format!("{}{}", schemehostport, uri.clone());
+            let req = Request::builder().method(method.clone()).uri(uri);
+            //let req = req.header("x-log-from-node-name", format!("{}", node_config.node_config.name));
+            let req = req.header(http::header::ACCEPT, "application/json");
+            let req = req.body(Body::empty());
+            let task = tokio::spawn(async move {
+                select! {
+                    _ = sleep(timeout).fuse() => {
+                        Err(Error::with_msg("timeout"))
+                    }
+                    res = Client::new().request(req?).fuse() => Ok(nt(res?).await?)
+                }
+            });
+            (schemehostport.clone(), task)
+        })
+        .collect();
+    let mut a = vec![];
+    for (_schemehostport, jh) in spawned {
+        let res = match jh.await {
+            Ok(k) => match k {
+                Ok(k) => k,
+                Err(e) => return Err(e),
+            },
+            Err(e) => return Err(e.into()),
+        };
+        a.push(res);
+    }
+    let a = a;
+    ft(a)
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn try_search() {
+        let schemehostports = ["http://sf-daqbuf-22:8371".into()];
+        let fut = gather_get_json_generic(
+            hyper::Method::GET,
+            format!("/api/4/search/channel"),
+            schemehostports.to_vec(),
+            |_res| {
+                let fut = async { Ok(()) };
+                Box::pin(fut)
+            },
+            |_all| {
+                let res = response(StatusCode::OK)
+                    .header(http::header::CONTENT_TYPE, "application/json")
+                    .body(serde_json::to_string(&42)?.into())?;
+                Ok(res)
+            },
+            Duration::from_millis(4000),
+        );
+        let _ = fut;
+    }
+}
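The try_search test above only constructs the future and never drives it. Actually running the gatherer could look like the following sketch; it assumes a reachable host and tokio's test macro:

```rust
// Drive the gatherer: nt maps each sub-response to (), ft merges the
// collected units into one JSON reply.
#[tokio::test]
async fn try_search_live() -> Result<(), Error> {
    let hosts: Vec<String> = vec!["http://sf-daqbuf-22:8371".into()];
    let res = gather_get_json_generic(
        hyper::Method::GET,
        "/api/4/search/channel".to_string(),
        hosts,
        |_res| Box::pin(async { Ok(()) }) as Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
        |all| {
            Ok(response(StatusCode::OK)
                .header(http::header::CONTENT_TYPE, "application/json")
                .body(serde_json::to_string(&all.len())?.into())?)
        },
        Duration::from_millis(4000),
    )
    .await?;
    assert_eq!(res.status(), StatusCode::OK);
    Ok(())
}
```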
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index 06ce426..3fd20a7 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -1,3 +1,4 @@
+use crate::api1::{channels_config_v1, channels_list_v1, gather_json_v1};
 use crate::gather::gather_get_json;
 use bytes::Bytes;
 use disk::binned::prebinned::pre_binned_bytes_for_http;
@@ -13,7 +14,7 @@ use hyper::service::{make_service_fn, service_fn};
 use hyper::{server::Server, Body, Request, Response};
 use net::SocketAddr;
 use netpod::log::*;
-use netpod::{AggKind, Channel, NodeConfigCached};
+use netpod::{AggKind, Channel, NodeConfigCached, ProxyConfig};
 use panic::{AssertUnwindSafe, UnwindSafe};
 use pin::Pin;
 use serde::{Deserialize, Serialize};
@@ -22,11 +23,12 @@ use task::{Context, Poll};
 use tracing::field::Empty;
 use tracing::Instrument;
 
+pub mod api1;
 pub mod gather;
+pub mod proxy;
 pub mod search;
 
 pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> {
-    let node_config = node_config.clone();
     let rawjh = taskrun::spawn(events_service(node_config.clone()));
     use std::str::FromStr;
     let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen, node_config.node.port))?;
@@ -52,7 +54,7 @@ async fn http_service(req: Request<Body>, node_config: NodeConfigCached) -> Result<Response<Body>, Error> {
     match http_service_try(req, &node_config).await {
         Ok(k) => Ok(k),
         Err(e) => {
-            error!("data_api_proxy sees error: {:?}", e);
+            error!("daqbuffer node http_service sees error: {:?}", e);
             Err(e)
         }
     }
@@ -141,7 +143,6 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     } else if path == "/api/4/search/channel" {
         if req.method() == Method::GET {
-            // TODO multi-facility search
             Ok(search::channel_search(req, &node_config).await?)
         } else {
             Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
@@ -237,6 +238,14 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
         } else {
             Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
         }
+    } else if path == "/api/1/channels" {
+        Ok(channels_list_v1(req).await?)
+    } else if path == "/api/1/channels/config" {
+        Ok(channels_config_v1(req).await?)
+    } else if path == "/api/1/stats/version" {
+        Ok(gather_json_v1(req, "/stats/version").await?)
+    } else if path.starts_with("/api/1/stats/") {
+        Ok(gather_json_v1(req, path).await?)
     } else {
         Ok(response(StatusCode::NOT_FOUND).body(Body::from(format!(
             "Sorry, not found: {:?} {:?} {:?}",
             req.method(),
             req.uri().path(),
             req.uri().query(),
         )))?)
     }
@@ -594,3 +603,52 @@ pub async fn ca_connect_1(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     })))?;
     Ok(ret)
 }
+
+pub async fn proxy(proxy_config: ProxyConfig) -> Result<(), Error> {
+    use std::str::FromStr;
+    let addr = SocketAddr::from_str(&format!("{}:{}", proxy_config.listen, proxy_config.port))?;
+    let make_service = make_service_fn({
+        move |_conn| {
+            let proxy_config = proxy_config.clone();
+            async move {
+                Ok::<_, Error>(service_fn({
+                    move |req| {
+                        let f = proxy_http_service(req, proxy_config.clone());
+                        Cont { f: Box::pin(f) }
+                    }
+                }))
+            }
+        }
+    });
+    Server::bind(&addr).serve(make_service).await?;
+    Ok(())
+}
+
+async fn proxy_http_service(req: Request<Body>, proxy_config: ProxyConfig) -> Result<Response<Body>, Error> {
+    match proxy_http_service_try(req, &proxy_config).await {
+        Ok(k) => Ok(k),
+        Err(e) => {
+            error!("data_api_proxy sees error: {:?}", e);
+            Err(e)
+        }
+    }
+}
+
+async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
+    let uri = req.uri().clone();
+    let path = uri.path();
+    if path == "/api/4/search/channel" {
+        if req.method() == Method::GET {
+            Ok(proxy::channel_search(req, &proxy_config).await?)
+        } else {
+            Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
+        }
+    } else {
+        Ok(response(StatusCode::NOT_FOUND).body(Body::from(format!(
+            "Sorry, not found: {:?} {:?} {:?}",
+            req.method(),
+            req.uri().path(),
+            req.uri().query(),
+        )))?)
+    }
+}
diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs
new file mode 100644
index 0000000..e32e13d
--- /dev/null
+++ b/httpret/src/proxy.rs
@@ -0,0 +1,47 @@
+use crate::response;
+use err::Error;
+use http::{HeaderValue, StatusCode};
+use hyper::{Body, Request, Response};
+use netpod::{ChannelSearchQuery, ProxyConfig};
+use std::future::Future;
+use std::pin::Pin;
+use std::time::Duration;
+
+pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> Result<Response<Body>, Error> {
+    let (head, _body) = req.into_parts();
+    match head.headers.get("accept") {
+        Some(v) => {
+            if v == "application/json" {
+                // TODO actually pass on the query parameters to the sub query.
+                err::todo();
+                let query = ChannelSearchQuery::from_request(head.uri.query())?;
+                let uri = format!("/api/4/search/channel");
+                let nt = |_res| {
+                    let fut = async { Ok(0f32) };
+                    Box::pin(fut) as Pin<Box<dyn Future<Output = Result<f32, Error>> + Send>>
+                };
+                let ft = |_all| {
+                    let res = response(StatusCode::OK)
+                        .header(http::header::CONTENT_TYPE, "application/json")
+                        .body(serde_json::to_string(&42)?.into())?;
+                    Ok(res)
+                };
+                let mut ret = crate::gather::gather_get_json_generic::<f32, _, _>(
+                    http::Method::GET,
+                    uri,
+                    proxy_config.search_hosts.clone(),
+                    nt,
+                    ft,
+                    Duration::from_millis(3000),
+                )
+                .await?;
+                ret.headers_mut()
+                    .append("x-proxy-log-mark", HeaderValue::from_str("proxied")?);
+                Ok(ret)
+            } else {
+                Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?)
+            }
+        }
+        _ => Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?),
+    }
+}
diff --git a/httpret/static/documentation/api4.html b/httpret/static/documentation/api4.html
index e0da2fb..2efdbdd 100644
--- a/httpret/static/documentation/api4.html
+++ b/httpret/static/documentation/api4.html
@@ -23,12 +23,31 @@ Currently available:

+<h2>Timestamp format</h2>
+
+<p>The result encodes timestamps in the form:</p>
+<pre>
+{
+  "tsAnchor": 1623909860,                    // Time-anchor of this result in UNIX epoch seconds.
+  "tsOffMs": [573, 15671, 37932, ...],       // Millisecond-offset to tsAnchor for each event/bin-edge.
+  "tsOffNs": [422901, 422902, 422903, ...],  // Nanosecond-offset to tsAnchor in addition to tsOffMs for each event/bin-edge.
+}
+</pre>
+
+<p>which results in these nanosecond-timestamps:</p>
+<pre>
+1623909860573422901
+1623909875671422902
+1623909897932422903
+</pre>
+
+<p>Formally: tsAbsolute = tsAnchor * 10<sup>9</sup> + tsOffMs * 10<sup>6</sup> + tsOffNs</p>
+
+<p>Two reasons lead to this choice of timestamp format:</p>
+<ul>
+<li>Javascript can not represent the full nanosecond-resolution timestamps in a single numeric variable.</li>
+<li>The lowest 6 digits of the nanosecond timestamp are anyway abused by the timing system to emit a pulse-id.</li>
+</ul>
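A reader-side check of that formula, as a plain Rust sketch (the function name is chosen here; the three events are the ones from the example above):

```rust
/// Reconstruct absolute nanosecond timestamps from the anchored form:
/// tsAbsolute = tsAnchor * 10^9 + tsOffMs * 10^6 + tsOffNs.
fn ts_absolute(ts_anchor: u64, ts_off_ms: &[u64], ts_off_ns: &[u64]) -> Vec<u64> {
    ts_off_ms
        .iter()
        .zip(ts_off_ns)
        .map(|(ms, ns)| ts_anchor * 1_000_000_000 + ms * 1_000_000 + ns)
        .collect()
}

fn main() {
    let ts = ts_absolute(1623909860, &[573, 15671, 37932], &[422901, 422902, 422903]);
    assert_eq!(ts[0], 1623909860573422901);
    assert_eq!(ts[2], 1623909897932422903);
}
```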

 <h2>API functions</h2>
 
 <p>Currently available functionality:</p>

@@ -82,56 +101,56 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/search/channel

 <h2>Query event data</h2>
 
+<p>Returns the full event values in a given time range.</p>
 
 <p>Method: GET</p>
 
 <p>URL: https://data-api.psi.ch/api/4/events</p>
 
 <p>Query parameters:</p>
 <ul>
 <li>channelBackend (e.g. "sf-databuffer")</li>
-<li>channelName (e.g. "SLAAR-LSCP4-LAS6891:CH7:1")</li>
+<li>channelName (e.g. "S10CB02-RBOC-DCP10:FOR-AMPLT-AVG")</li>
 <li>begDate (e.g. "2021-05-26T07:10:00.000Z")</li>
 <li>endDate (e.g. "2021-05-26T07:16:00.000Z")</li>
 </ul>
 
 <p>Request header: "Accept" must be "application/json"</p>
 
+<h2>Timeout</h2>
+
+<p>If the requested range takes too long to retrieve, then the flag timedOut: true will be set.</p>
 
 <p>CURL example:</p>
 <pre>
 curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/events?channelBackend=sf-databuffer
-  &channelName=SLAAR-LSCP4-LAS6891:CH7:1&begDate=2021-06-11T07:00:00.000Z&endDate=2021-06-11T07:00:01.000Z'
+  &channelName=S10CB02-RBOC-DCP10:FOR-AMPLT-AVG&begDate=2021-05-26T07:10:00.000Z&endDate=2021-05-26T07:16:00.000Z'
 
-<h2>Timestamp format</h2>
-
-<p>Javascript can not represent the full 64-bit integer and the databuffer nanosecond timestamps would lose precision.
-Therefore, timestamps are represented in the response by ts0 which gives an absolute anchor
-in time in units of seconds, and the array tsoff with the offset of each event in nanoseconds.</p>
-
-<h2>Timeout</h2>
-
-<p>If the requested range takes too long to retrieve, then the flags timedOut: true will be set.</p>
 
 <p>Example response:</p>
 <pre>
 {
   "finalisedRange": true,
-  "ts0": 1623394800,
-  "tsoff": [
-    68461150,
-    169461160,
-    269461170,
-    369461180,
-    479461191,
-    579461201,
-    ...
+  "tsAnchor": 1623763172,
+  "tsMs": [
+    5,
+    15,
+    25,
+    35
+  ],
+  "tsNs": [
+    299319,
+    299320,
+    299321,
+    299322
   ],
   "values": [
-    [378, 325, 321, 381, ... waveform of 1st event ],
-    [334, 355, 360, 345, ... waveform of 2nd event ],
-    ...
+    0.6080216765403748,
+    0.6080366969108582,
+    0.6080275177955627,
+    0.6080636382102966
   ]
 }
 
 </pre>
 
 <h2>Finalised range</h2>
 
 <p>If the server can determine that no more data will be added to the requested time range
-then it will add the flag finalisedRange: true to the response.</p>
+then it will add the flag finalisedRange: true to the response.</p>
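A consumer can map this response shape onto a struct. A sketch, where the struct and field names are chosen for illustration and only the JSON keys come from the example (waveform channels would need nested arrays in values):

```rust
use serde::Deserialize;

// Client-side view of the /api/4/events JSON response shown above.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct EventsResponse {
    #[serde(default)]
    finalised_range: bool,
    ts_anchor: u64,
    ts_ms: Vec<u64>,
    ts_ns: Vec<u64>,
    values: Vec<f64>,
}

fn main() -> Result<(), serde_json::Error> {
    let body = r#"{"finalisedRange":true,"tsAnchor":1623763172,
        "tsMs":[5,15,25,35],"tsNs":[299319,299320,299321,299322],
        "values":[0.6080216765403748,0.6080366969108582,0.6080275177955627,0.6080636382102966]}"#;
    let r: EventsResponse = serde_json::from_str(body)?;
    // First event timestamp in nanoseconds, per the documented formula.
    assert_eq!(r.ts_anchor * 1_000_000_000 + r.ts_ms[0] * 1_000_000 + r.ts_ns[0], 1623763172005299319);
    Ok(())
}
```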
@@ -142,11 +161,17 @@ in time in units of seconds, and the array tsoff with the offse

 <p>URL: https://data-api.psi.ch/api/4/binned</p>
 
 <p>Query parameters:</p>
 <ul>
-<li>channelBackend (e.g. "sf-databuffer")</li>
-<li>channelName (e.g. "SLAAR-LSCP4-LAS6891:CH7:1")</li>
-<li>begDate (e.g. "2021-05-26T07:10:00.000Z")</li>
-<li>endDate (e.g. "2021-05-26T07:16:00.000Z")</li>
-<li>binCount (e.g. "6")</li>
+<li>channelBackend (e.g. "sf-databuffer")</li>
+<li>channelName (e.g. "SLAAR-LSCP4-LAS6891:CH7:1")</li>
+<li>begDate (e.g. "2021-05-26T07:10:00.000Z")</li>
+<li>endDate (e.g. "2021-05-26T07:16:00.000Z")</li>
+<li>binCount (number of requested bins in time-dimension, e.g. "6")</li>
+<li>binningScheme (optional; see the example below the parameter list)
+  <ul>
+  <li>if not specified: waveform gets first binned to a scalar.</li>
+  <li>"binningScheme=binnedX&binnedXcount=13": waveform gets first binned to 13 bins in X-dimension (waveform-dimension).</li>
+  <li>"binningScheme=binnedX&binnedXcount=0": waveform is not binned in X-dimension but kept at full length.</li>
+  </ul>
+</li>
 </ul>
 
 <p>Request header: "Accept" must be "application/json"</p>

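As referenced in the parameter list above, composing a binned request with X-binning. A sketch only; the parameter names are the ones documented here, the channel and dates come from the existing curl example:

```rust
// binnedXcount=13 requests 13 waveform bins in X; binCount=6 bins in time.
let url = format!(
    "https://data-api.psi.ch/api/4/binned?channelBackend={}&channelName={}&begDate={}&endDate={}&binCount={}&binningScheme=binnedX&binnedXcount={}",
    "sf-databuffer",
    "SLAAR-LSCP4-LAS6891:CH7:1",
    "2021-05-26T07:10:00.000Z",
    "2021-05-26T07:16:00.000Z",
    6,
    13,
);
```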
@@ -157,89 +182,167 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/binned?channel

 <h2>Partial result</h2>
 
-<p>If the requested range takes longer time to retrieve, then a partial result with at least one bin is returned.</p>
-
-<p>The partial result will contain the necessary information to send another request with a range that
-starts with the first not-yet-retrieved bin.</p>
-
-<p>This information is provided by the continueAt and missingBins fields.</p>
-
-<p>This enables the user agent to start the presentation to the user while updating the user interface
+<p>If the requested range takes longer to retrieve, then a partial result with at least one bin is returned.
+The partial result will contain the necessary information to send another request with a range that
+starts with the first not-yet-retrieved bin.
+This information is provided by the continueAt and missingBins fields.
+This enables the user agent to start the presentation to the user while updating the user interface
 as new bins are received.</p>
 
-<p>Example response:</p>
-<pre>
-{
-  "ts0": 1623304800,
-  "missingBins": 1,
-  "continueAt": 86400000000000,
-  "tsoff": [
-    0,
-    7200000000000,
-    14400000000000,
-    21600000000000,
-    28800000000000,
-    36000000000000,
-    43200000000000,
-    50400000000000,
-    57600000000000,
-    64800000000000,
-    72000000000000,
-    79200000000000,
-    86400000000000
-  ],
+<p>Example response (without usage of binningScheme):</p>
+<pre>
+{
   "avgs": [
-    341.3874206542969,
-    341.5171203613281,
-    341.70989990234375,
-    341.2113952636719,
-    341.84088134765625,
-    342.1435546875,
-    341.16558837890625,
-    342.2756652832031,
-    342.9447326660156,
-    343.0351867675781,
-    342.6963195800781,
-    342.054931640625
+    16204.087890625,
+    16204.3798828125,
+    16203.9296875,
+    16204.232421875,
+    16202.974609375,
+    16203.208984375,
+    16203.4345703125
   ],
   "counts": [
-    71539,
-    71537,
-    71538,
-    71539,
-    71538,
-    71537,
-    71538,
-    71539,
-    71539,
-    71539,
-    71538,
-    71299
+    1000,
+    999,
+    1000,
+    999,
+    1000,
+    999,
+    1000
   ],
+  "finalisedRange": true,
   "maxs": [
-    450,
-    450,
-    462,
-    458,
-    454,
-    450,
-    452,
-    451,
-    450,
-    453,
-    464,
-    448
+    48096,
+    48100,
+    48094,
+    48096,
+    48096,
+    48095,
+    48096
   ],
   "mins": [
-    224,
-    239,
-    242,
-    235,
-    243,
-    239,
-    239,
-    241,
-    243,
-    229,
-    244,
-    225
+    0,
+    0,
+    0,
+    0,
+    0,
+    0,
+    0
+  ],
+  "tsAnchor": 1623769850,
+  "tsMs": [
+    0,
+    10000,
+    20000,
+    30000,
+    40000,
+    50000,
+    60000,
+    70000
+  ],
+  "tsNs": [
+    0,
+    0,
+    0,
+    0,
+    0,
+    0,
+    0,
+    0
+  ]
+}
+</pre>
 
+<p>Example response (waveform channel and usage of binningScheme):</p>
+<pre>
+{
+  "tsAnchor": 1623769950,
+  "tsMs": [
+    0,
+    10000,
+    20000,
+    30000,
+    40000,
+    50000,
+    60000,
+    70000
+  ],
+  "tsNs": [
+    0,
+    0,
+    0,
+    0,
+    0,
+    0,
+    0,
+    0
+  ],
+  "finalisedRange": true,
+  "counts": [
+    1000,
+    1000,
+    ...
+  ],
+  "avgs": [
+    [
+      0.013631398789584637,
+      34936.76953125,
+      45045.5078125,
+      31676.30859375,
+      880.7999877929688,
+      576.4010620117188,
+      295.1236877441406
+    ],
+    [
+      0.01851877197623253,
+      34935.734375,
+      45044.2734375,
+      31675.359375,
+      880.7310791015625,
+      576.3038330078125,
+      295.06134033203125
+    ],
+    ...
+  ],
+  "maxs": [
+    [
+      111,
+      48093,
+      45804,
+      47122,
+      1446,
+      783,
+      431
+    ],
+    [
+      120,
+      48092,
+      45803,
+      47124,
+      1452,
+      782,
+      431
+    ],
+    ...
+  ],
+  "mins": [
+    [
+      0,
+      0,
+      44329,
+      267,
+      519,
+      394,
+      0
+    ],
+    [
+      0,
+      0,
+      44327,
+      265,
+      514,
+      395,
+      0
+    ],
+    ...
   ]
 }
 
 </pre>
 
diff --git a/httpret/static/documentation/style.css b/httpret/static/documentation/style.css
index 11b81dc..8129850 100644
--- a/httpret/static/documentation/style.css
+++ b/httpret/static/documentation/style.css
@@ -4,7 +4,7 @@ div, h1, h2, h3, h4, h5, pre, code, p {
 }
 
 h1, h2, h3, h4, h5 {
-    margin-top: 1.2em;
+    margin-top: 1.6em;
     margin-bottom: 0.6em;
 }
 
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index a0423d2..aa03cb2 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -880,3 +880,10 @@ pub struct ChannelSearchSingleResult {
 pub struct ChannelSearchResult {
     pub channels: Vec<ChannelSearchSingleResult>,
 }
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct ProxyConfig {
+    pub listen: String,
+    pub port: u16,
+    pub search_hosts: Vec<String>,
+}