Simplify cli option, add channel search

This commit is contained in:
Dominik Werder
2021-05-27 16:23:38 +02:00
parent e55751ad67
commit 53957a261a
9 changed files with 155 additions and 22 deletions

View File

@@ -3,6 +3,8 @@ use netpod::log::*;
use netpod::{Channel, NodeConfigCached}; use netpod::{Channel, NodeConfigCached};
use tokio_postgres::{Client, NoTls}; use tokio_postgres::{Client, NoTls};
pub mod search;
pub async fn create_connection(node_config: &NodeConfigCached) -> Result<Client, Error> { pub async fn create_connection(node_config: &NodeConfigCached) -> Result<Client, Error> {
let d = &node_config.node_config.cluster.database; let d = &node_config.node_config.cluster.database;
let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name); let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name);

58
dbconn/src/search.rs Normal file
View File

@@ -0,0 +1,58 @@
use crate::create_connection;
use err::Error;
use netpod::{ChannelSearchQuery, ChannelSearchResult, ChannelSearchSingleResult, NodeConfigCached};
pub async fn search_channel(
query: ChannelSearchQuery,
node_config: &NodeConfigCached,
) -> Result<ChannelSearchResult, Error> {
let sql = format!(concat!(
"select ",
"channel_id, channel_name, source_name, dtype, shape, unit, description",
" from searchext($1, $2, $3, $4)",
));
let cl = create_connection(node_config).await?;
let rows = cl
.query(
sql.as_str(),
&[&query.name_regex, &query.source_regex, &query.description_regex, &"asc"],
)
.await?;
let mut res = vec![];
for row in rows {
let shapedb: Option<serde_json::Value> = row.get(4);
let shape = match &shapedb {
Some(top) => match top {
serde_json::Value::Null => vec![],
serde_json::Value::Array(items) => {
let mut a = vec![];
for item in items {
match item {
serde_json::Value::Number(n) => match n.as_i64() {
Some(n) => {
a.push(n as u32);
}
None => return Err(Error::with_msg(format!("can not understand shape {:?}", shapedb))),
},
_ => return Err(Error::with_msg(format!("can not understand shape {:?}", shapedb))),
}
}
a
}
_ => return Err(Error::with_msg(format!("can not understand shape {:?}", shapedb))),
},
None => vec![],
};
let k = ChannelSearchSingleResult {
name: row.get(1),
source: row.get(2),
ty: row.get(3),
shape: shape,
unit: row.get(5),
description: row.get(6),
};
res.push(k);
}
let ret = ChannelSearchResult { channels: res };
Ok(ret)
}

View File

@@ -367,7 +367,7 @@ where
fn new(file: File) -> Self { fn new(file: File) -> Self {
Self { Self {
// TODO make buffer size a parameter: // TODO make buffer size a parameter:
buf: vec![0; 4096], buf: vec![0; 1024 * 32],
all: vec![], all: vec![],
file: Some(file), file: Some(file),
_marker: std::marker::PhantomData::default(), _marker: std::marker::PhantomData::default(),

View File

@@ -60,6 +60,19 @@ impl CacheUsage {
})?; })?;
Ok(ret) Ok(ret)
} }
pub fn from_string(s: &str) -> Result<Self, Error> {
let ret = if s == "ignore" {
CacheUsage::Ignore
} else if s == "recreate" {
CacheUsage::Recreate
} else if s == "use" {
CacheUsage::Use
} else {
return Err(Error::with_msg(format!("can not interpret cache usage string: {}", s)));
};
Ok(ret)
}
} }
impl Display for CacheUsage { impl Display for CacheUsage {

View File

@@ -21,6 +21,7 @@ use task::{Context, Poll};
use tracing::field::Empty; use tracing::field::Empty;
pub mod gather; pub mod gather;
pub mod search;
pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> { pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> {
let node_config = node_config.clone(); let node_config = node_config.clone();
@@ -115,6 +116,13 @@ async fn data_api_proxy_try(req: Request<Body>, node_config: &NodeConfigCached)
} else { } else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
} }
} else if path == "/api/4/search/channel" {
if req.method() == Method::GET {
// TODO multi-facility search
Ok(search::channel_search(req, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path == "/api/4/table_sizes" { } else if path == "/api/4/table_sizes" {
if req.method() == Method::GET { if req.method() == Method::GET {
Ok(table_sizes(req, &node_config).await?) Ok(table_sizes(req, &node_config).await?)
@@ -254,7 +262,8 @@ async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Re
let query = disk::cache::BinnedQuery::from_request(&head)?; let query = disk::cache::BinnedQuery::from_request(&head)?;
match head.headers.get("accept") { match head.headers.get("accept") {
Some(v) if v == "application/octet-stream" => binned_binary(query, node_config).await, Some(v) if v == "application/octet-stream" => binned_binary(query, node_config).await,
_ => binned_json(query, node_config).await, Some(v) if v == "application/json" => binned_json(query, node_config).await,
_ => Err(Error::with_msg("binned with unknown accept")),
} }
} }

12
httpret/src/search.rs Normal file
View File

@@ -0,0 +1,12 @@
use err::Error;
use hyper::{Body, Request, Response, StatusCode};
use netpod::{ChannelSearchQuery, NodeConfigCached};
pub async fn channel_search(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
let (head, _body) = req.into_parts();
let query = ChannelSearchQuery::from_request(head.uri.query())?;
let res = dbconn::search::search_channel(query, node_config).await?;
let body = Body::from(serde_json::to_string(&res)?);
let ret = super::response(StatusCode::OK).body(body)?;
Ok(ret)
}

View File

@@ -10,6 +10,7 @@
</head> </head>
<body> <body>
<h1>Retrieval Documentation</h1> <h1>Retrieval Documentation</h1>
<h2>HTTP API documentation</h2> <h2>HTTP API documentation</h2>
@@ -19,12 +20,13 @@
<p>Currently available:</p> <p>Currently available:</p>
<ul> <ul>
<li><a href="#query-binned">Query binned data</a></li> <li><a href="#query-binned">Query binned data</a></li>
<li><a href="#search-channel">Search channel</a></li>
</ul> </ul>
<a id="query-binned"></a> <a id="query-binned"></a>
<h2>Query binned data</h2> <h2>Query binned data</h2>
<p><strong>Method:</strong> GET</p> <p><strong>Method:</strong> GET</p>
<p><strong>URL:</strong> https://data-api.psi.ch/api/4/binned</p> <p><strong>URL:</strong> http://sf-daqbuf-21:8380/api/4/binned</p>
<p><strong>Query parameters:</strong></p> <p><strong>Query parameters:</strong></p>
<ul> <ul>
<li>channel_backend (e.g. "sf-databuffer")</li> <li>channel_backend (e.g. "sf-databuffer")</li>
@@ -89,6 +91,19 @@ curl -H 'Accept: application/json' 'http://sf-daqbuf-21:8380/api/4/binned?channe
then it will add the flag <strong>finalised_range</strong> to the response.</p> then it will add the flag <strong>finalised_range</strong> to the response.</p>
<a id="search-channel"></a>
<h2>Search channel</h2>
<p><strong>Method:</strong> GET</p>
<p><strong>URL:</strong> http://sf-daqbuf-21:8380/api/4/search/channel</p>
<p><strong>Query parameters:</strong></p>
<ul>
<li>nameRegex (e.g. "LSCP.*6")</li>
<li>sourceRegex (e.g. "178:9999")</li>
<li>descriptionRegex (e.g. "celsius")</li>
</ul>
<p><strong>Request header:</strong> "Accept" must be "application/json"</p>
<h2>Feedback and comments very much appreciated!</h2> <h2>Feedback and comments very much appreciated!</h2>
<p>dominik.werder@psi.ch</p> <p>dominik.werder@psi.ch</p>
<p>or please assign me a JIRA ticket.</p> <p>or please assign me a JIRA ticket.</p>

View File

@@ -758,3 +758,38 @@ impl ByteSize {
self.0 self.0
} }
} }
#[derive(Serialize, Deserialize)]
pub struct ChannelSearchQuery {
pub name_regex: String,
pub source_regex: String,
pub description_regex: String,
}
impl ChannelSearchQuery {
pub fn from_request(query: Option<&str>) -> Result<Self, Error> {
let params = query_params(query);
let ret = Self {
name_regex: params.get("nameRegex").map_or("".into(), |k| k.into()),
source_regex: params.get("sourceRegex").map_or("".into(), |k| k.into()),
description_regex: params.get("descriptionRegex").map_or("".into(), |k| k.into()),
};
Ok(ret)
}
}
#[derive(Serialize, Deserialize)]
pub struct ChannelSearchSingleResult {
pub name: String,
pub source: String,
#[serde(rename = "type")]
pub ty: String,
pub shape: Vec<u32>,
pub unit: String,
pub description: String,
}
#[derive(Serialize, Deserialize)]
pub struct ChannelSearchResult {
pub channels: Vec<ChannelSearchSingleResult>,
}

View File

@@ -38,6 +38,11 @@ fn parse_ts_rel(s: &str) -> Result<DateTime<Utc>, Error> {
} }
} }
fn parse_ts(s: &str) -> Result<DateTime<Utc>, Error> {
let ret = if s.contains("-") { s.parse()? } else { parse_ts_rel(s)? };
Ok(ret)
}
async fn go() -> Result<(), Error> { async fn go() -> Result<(), Error> {
use clap::Clap; use clap::Clap;
use retrieval::cli::{ClientType, Opts, SubCmd}; use retrieval::cli::{ClientType, Opts, SubCmd};
@@ -60,25 +65,9 @@ async fn go() -> Result<(), Error> {
retrieval::client::status(opts.host, opts.port).await?; retrieval::client::status(opts.host, opts.port).await?;
} }
ClientType::Binned(opts) => { ClientType::Binned(opts) => {
let beg = if opts.beg.contains("-") { let beg = parse_ts(&opts.beg)?;
opts.beg.parse()? let end = parse_ts(&opts.end)?;
} else { let cache_usage = CacheUsage::from_string(&opts.cache)?;
parse_ts_rel(&opts.beg)?
};
let end = if opts.end.contains("-") {
opts.end.parse()?
} else {
parse_ts_rel(&opts.end)?
};
let cache_usage = if opts.cache == "ignore" {
CacheUsage::Ignore
} else if opts.cache == "recreate" {
CacheUsage::Recreate
} else if opts.cache == "use" {
CacheUsage::Use
} else {
return Err(Error::with_msg(format!("can not interpret --cache {}", opts.cache)));
};
retrieval::client::get_binned( retrieval::client::get_binned(
opts.host, opts.host,
opts.port, opts.port,