From f1655302632f749f40ea01876cc2b7c6fdfc90d7 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Mon, 31 May 2021 14:31:29 +0200
Subject: [PATCH] Add scan endpoints

---
 dbconn/src/scan.rs                      | 68 +++++++++++++++----------
 err/src/lib.rs                          |  7 +++
 httpret/src/lib.rs                      | 38 ++++++++++++++
 httpret/static/documentation/index.html |  2 +
 4 files changed, 89 insertions(+), 26 deletions(-)

diff --git a/dbconn/src/scan.rs b/dbconn/src/scan.rs
index cd03e28..12571d5 100644
--- a/dbconn/src/scan.rs
+++ b/dbconn/src/scan.rs
@@ -3,14 +3,13 @@
 use err::Error;
 use netpod::log::*;
 use netpod::NodeConfigCached;
 use serde::{Deserialize, Serialize};
-use std::cell::RefCell;
 use std::future::Future;
 use std::io::ErrorKind;
 use std::os::unix::ffi::OsStringExt;
 use std::path::{Path, PathBuf};
 use std::pin::Pin;
-use std::rc::Rc;
 use tokio_postgres::Client;
+use std::sync::{RwLock, Arc};
 #[derive(Debug, Serialize, Deserialize)]
 pub struct NodeDiskIdent {
@@ -34,9 +33,8 @@ fn _get_hostname() -> Result<String, Error> {
     Ok(String::from_utf8(out.stdout[..out.stdout.len() - 1].to_vec())?)
 }
 
-pub async fn get_node_disk_ident(node_config: &NodeConfigCached) -> Result<NodeDiskIdent, Error> {
-    let con1 = crate::create_connection(node_config).await?;
-    let rows = con1.query("select nodes.rowid, facility, split, hostname from nodes, facilities where facilities.name = $1 and facility = facilities.rowid and hostname = $2", &[&node_config.node.backend, &node_config.node.host]).await?;
+pub async fn get_node_disk_ident(node_config: &NodeConfigCached, dbc: &Client) -> Result<NodeDiskIdent, Error> {
+    let rows = dbc.query("select nodes.rowid, facility, split, hostname from nodes, facilities where facilities.name = $1 and facility = facilities.rowid and hostname = $2", &[&node_config.node.backend, &node_config.node.host]).await?;
     if rows.len() != 1 {
         return Err(Error::with_msg(format!(
             "get_node can't find unique entry for {} {}",
@@ -67,10 +65,15 @@ where
     Ok(())
 }
 
-pub async fn update_db_with_channel_names(node_config: &NodeConfigCached) -> Result<(), Error> {
-    let node_ident = get_node_disk_ident(node_config).await?;
-    let c1 = Rc::new(RefCell::new(0u32));
+#[derive(Debug, Serialize, Deserialize)]
+pub struct UpdatedDbWithChannelNames {
+    count: u32,
+}
+
+pub async fn update_db_with_channel_names(node_config: &NodeConfigCached) -> Result<UpdatedDbWithChannelNames, Error> {
     let dbc = crate::create_connection(node_config).await?;
+    let node_ident = get_node_disk_ident(node_config, &dbc).await?;
+    let c1 = Arc::new(RwLock::new(0u32));
     let rows = dbc
         .query("select rowid from facilities where name = $1", &[&node_ident.facility])
         .await?;
@@ -82,7 +85,7 @@ pub async fn update_db_with_channel_names(node_config: &NodeConfigCached) -> Res
     }
     let facility: i64 = rows[0].try_get(0)?;
     dbc.query("begin", &[]).await?;
-    let dbc = Rc::new(dbc);
+    let dbc = Arc::new(dbc);
     find_channel_names_from_config(&node_config.node.data_base_path, |ch| {
         let ch = ch.to_owned();
         let dbc = dbc.clone();
@@ -94,8 +97,11 @@ pub async fn update_db_with_channel_names(node_config: &NodeConfigCached) -> Res
                     &[&facility, &ch],
                 )
                 .await?;
-            *c1.borrow_mut() += 1;
-            let c2 = *c1.borrow();
+            let c2 = {
+                let mut g = c1.write()?;
+                *g += 1;
+                *g
+            };
             if c2 % 200 == 0 {
                 trace!("channels {:6} current {}", c2, ch);
                 dbc.query("commit", &[]).await?;
@@ -107,13 +113,21 @@ pub async fn update_db_with_channel_names(node_config: &NodeConfigCached) -> Res
         })
         .await?;
     dbc.query("commit", &[]).await?;
-    Ok(())
+    let ret = UpdatedDbWithChannelNames {
+        count: *c1.read()?,
+    };
+    Ok(ret)
 }
 
-pub async fn update_db_with_all_channel_configs(node_config: &NodeConfigCached, ks_prefix: &str) -> Result<(), Error> {
+#[derive(Debug, Serialize, Deserialize)]
+pub struct UpdatedDbWithAllChannelConfigs {
+    count: u32,
+}
+
+pub async fn update_db_with_all_channel_configs(node_config: &NodeConfigCached) -> Result<UpdatedDbWithAllChannelConfigs, Error> {
     let dbc = crate::create_connection(node_config).await?;
-    let dbc = Rc::new(dbc);
-    let node_disk_ident = &get_node_disk_ident(node_config).await?;
+    let dbc = Arc::new(dbc);
+    let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?;
     let rows = dbc
         .query(
             "select rowid, facility, name from channels where facility = $1 order by facility, name",
@@ -131,7 +145,6 @@ pub async fn update_db_with_all_channel_configs(node_config: &NodeConfigCached,
         match update_db_with_channel_config(
             node_config,
             node_disk_ident,
-            ks_prefix,
             rowid,
             &channel,
             dbc.clone(),
@@ -165,7 +178,10 @@ pub async fn update_db_with_all_channel_configs(node_config: &NodeConfigCached,
         }
     }
     dbc.query("commit", &[]).await?;
-    Ok(())
+    let ret = UpdatedDbWithAllChannelConfigs {
+        count: c1,
+    };
+    Ok(ret)
 }
 
 pub async fn update_search_cache(node_config: &NodeConfigCached) -> Result<(), Error> {
@@ -180,10 +196,9 @@ Parse the config of the given channel and update database.
 pub async fn update_db_with_channel_config(
     node_config: &NodeConfigCached,
     node_disk_ident: &NodeDiskIdent,
-    _ks_prefix: &str,
     channel_id: i64,
     channel: &str,
-    dbc: Rc<Client>,
+    dbc: Arc<Client>,
     count_inserted: &mut usize,
     count_updated: &mut usize,
 ) -> Result<(), Error> {
@@ -259,7 +274,7 @@ pub async fn update_db_with_all_channel_datafiles(
     node_disk_ident: &NodeDiskIdent,
     ks_prefix: &str,
 ) -> Result<(), Error> {
-    let dbc = Rc::new(crate::create_connection(node_config).await?);
+    let dbc = Arc::new(crate::create_connection(node_config).await?);
     let rows = dbc
         .query(
             "select rowid, facility, name from channels where facility = $1 order by facility, name",
@@ -290,9 +305,10 @@ pub async fn update_db_with_all_channel_datafiles(
 struct DatafileDbWriter {
     channel_id: i64,
     node_id: i64,
-    dbc: Rc<Client>,
-    c1: Rc<RefCell<u32>>,
+    dbc: Arc<Client>,
+    c1: Arc<RwLock<u32>>,
 }
+
 #[derive(Debug, Serialize)]
 pub struct ChannelDesc {
     name: String,
@@ -347,7 +363,7 @@ impl ChannelDatafileDescSink for DatafileDbWriter {
                 &serde_json::to_value(k)?,
             ]
             ).await?;
-            *c1.try_borrow_mut().unwrap() += 1;
+            *c1.write()? += 1;
             Ok(())
         })
     }
@@ -430,13 +446,13 @@ pub async fn update_db_with_channel_datafiles(
     ks_prefix: &str,
     channel_id: i64,
     channel: &str,
-    dbc: Rc<Client>,
+    dbc: Arc<Client>,
 ) -> Result<(), Error> {
     let writer = DatafileDbWriter {
         node_id: node_disk_ident.rowid(),
         channel_id: channel_id,
         dbc: dbc.clone(),
-        c1: Rc::new(RefCell::new(0)),
+        c1: Arc::new(RwLock::new(0)),
     };
     let mut n_nothing = 0;
     for ks in &[2, 3, 4] {
@@ -451,7 +467,7 @@ pub async fn update_db_with_channel_datafiles(
                 x?;
             }
         };
-        if false && *writer.c1.borrow() >= 10 {
+        if false && *writer.c1.read()? >= 10 {
             break;
         }
     }
diff --git a/err/src/lib.rs b/err/src/lib.rs
index 0178a69..5cb9fda 100644
--- a/err/src/lib.rs
+++ b/err/src/lib.rs
@@ -10,6 +10,7 @@ use std::net::AddrParseError;
 use std::num::{ParseFloatError, ParseIntError};
 use std::string::FromUtf8Error;
 use tokio::task::JoinError;
+use std::sync::PoisonError;
 
 /**
 The common error type for this application.
@@ -237,6 +238,12 @@ impl From for Error {
     }
 }
 
+impl<T> From<PoisonError<T>> for Error {
+    fn from(_: PoisonError<T>) -> Self {
+        Self::with_msg("PoisonError")
+    }
+}
+
 pub fn todo() {
     todo!("TODO");
 }
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index 74c886c..6f88c3c 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -159,6 +159,18 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
         } else {
             Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
         }
+    } else if path == "/api/4/update_db_with_channel_names" {
+        if req.method() == Method::GET {
+            Ok(update_db_with_channel_names(req, &node_config).await?)
+        } else {
+            Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
+        }
+    } else if path == "/api/4/update_db_with_all_channel_configs" {
+        if req.method() == Method::GET {
+            Ok(update_db_with_all_channel_configs(req, &node_config).await?)
+        } else {
+            Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
+        }
     } else if path.starts_with("/api/4/documentation/") {
         if req.method() == Method::GET {
             static_http!(path, "", "index.html", "text/html");
@@ -374,6 +386,32 @@ pub async fn clear_cache_all(req: Request<Body>, node_config: &NodeConfigCached)
         None => false,
     };
     let res = disk::cache::clear_cache_all(node_config, dry).await?;
+    let ret = response(StatusCode::OK)
+        .header(http::header::CONTENT_TYPE, "application/json")
+        .body(Body::from(serde_json::to_string(&res)?))?;
+    Ok(ret)
+}
+
+pub async fn update_db_with_channel_names(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
+    let (head, _body) = req.into_parts();
+    let _dry = match head.uri.query() {
+        Some(q) => q.contains("dry"),
+        None => false,
+    };
+    let res = dbconn::scan::update_db_with_channel_names(node_config).await?;
+    let ret = response(StatusCode::OK)
+        .header(http::header::CONTENT_TYPE, "application/json")
+        .body(Body::from(serde_json::to_string(&res)?))?;
+    Ok(ret)
+}
+
+pub async fn update_db_with_all_channel_configs(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
+    let (head, _body) = req.into_parts();
+    let _dry = match head.uri.query() {
+        Some(q) => q.contains("dry"),
+        None => false,
+    };
+    let res = dbconn::scan::update_db_with_all_channel_configs(node_config).await?;
     let ret = response(StatusCode::OK)
         .header(http::header::CONTENT_TYPE, "application/json")
         .body(Body::from(serde_json::to_string(&res)?))?;
     Ok(ret)
 }
diff --git a/httpret/static/documentation/index.html b/httpret/static/documentation/index.html
index 000cbc4..f0f9361 100644
--- a/httpret/static/documentation/index.html
+++ b/httpret/static/documentation/index.html
@@ -121,6 +121,7 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/search/channel
 "channels": [
   {
     "name": "S10MA01-DBPM120:Y2",
+    "backend": "sf-databuffer",
     "source": "tcp://S20-CVME-DBPM2371:9000",
     "type": "Float32",
     "shape": [],
@@ -129,6 +130,7 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/search/channel
   },
   {
     "name": "S20SY02-DBPM120:Y2",
+    "backend": "sf-databuffer",
     "source": "tcp://S20-CVME-DBPM2371:9000",
     "type": "Float32",
     "shape": [],
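
Usage sketch, not part of the patch itself: the two new scan routes accept only GET and reply with a JSON object carrying a count field (the serialized UpdatedDbWithChannelNames / UpdatedDbWithAllChannelConfigs). The host and port below are placeholders, not defined by this change; substitute the address of the node whose database should be scanned.

    # scan the data directory and insert channel names into the database
    curl -H 'Accept: application/json' 'http://HOST:PORT/api/4/update_db_with_channel_names'

    # parse the channel configs and update the database
    curl -H 'Accept: application/json' 'http://HOST:PORT/api/4/update_db_with_all_channel_configs'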