Add scan endpoints

Dominik Werder
2021-05-31 14:31:29 +02:00
parent d3fa58563b
commit f165530263
4 changed files with 89 additions and 26 deletions


@@ -3,14 +3,13 @@ use err::Error;
 use netpod::log::*;
 use netpod::NodeConfigCached;
 use serde::{Deserialize, Serialize};
-use std::cell::RefCell;
 use std::future::Future;
 use std::io::ErrorKind;
 use std::os::unix::ffi::OsStringExt;
 use std::path::{Path, PathBuf};
 use std::pin::Pin;
-use std::rc::Rc;
 use tokio_postgres::Client;
+use std::sync::{RwLock, Arc};
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct NodeDiskIdent {
@@ -34,9 +33,8 @@ fn _get_hostname() -> Result<String, Error> {
     Ok(String::from_utf8(out.stdout[..out.stdout.len() - 1].to_vec())?)
 }
 
-pub async fn get_node_disk_ident(node_config: &NodeConfigCached) -> Result<NodeDiskIdent, Error> {
-    let con1 = crate::create_connection(node_config).await?;
-    let rows = con1.query("select nodes.rowid, facility, split, hostname from nodes, facilities where facilities.name = $1 and facility = facilities.rowid and hostname = $2", &[&node_config.node.backend, &node_config.node.host]).await?;
+pub async fn get_node_disk_ident(node_config: &NodeConfigCached, dbc: &Client) -> Result<NodeDiskIdent, Error> {
+    let rows = dbc.query("select nodes.rowid, facility, split, hostname from nodes, facilities where facilities.name = $1 and facility = facilities.rowid and hostname = $2", &[&node_config.node.backend, &node_config.node.host]).await?;
     if rows.len() != 1 {
         return Err(Error::with_msg(format!(
             "get_node can't find unique entry for {} {}",
@@ -67,10 +65,15 @@ where
     Ok(())
 }
 
-pub async fn update_db_with_channel_names(node_config: &NodeConfigCached) -> Result<(), Error> {
-    let node_ident = get_node_disk_ident(node_config).await?;
-    let c1 = Rc::new(RefCell::new(0u32));
+#[derive(Debug, Serialize, Deserialize)]
+pub struct UpdatedDbWithChannelNames {
+    count: u32,
+}
+
+pub async fn update_db_with_channel_names(node_config: &NodeConfigCached) -> Result<UpdatedDbWithChannelNames, Error> {
+    let dbc = crate::create_connection(node_config).await?;
+    let node_ident = get_node_disk_ident(node_config, &dbc).await?;
+    let c1 = Arc::new(RwLock::new(0u32));
     let rows = dbc
         .query("select rowid from facilities where name = $1", &[&node_ident.facility])
         .await?;
@@ -82,7 +85,7 @@ pub async fn update_db_with_channel_names(node_config: &NodeConfigCached) -> Res
     }
     let facility: i64 = rows[0].try_get(0)?;
     dbc.query("begin", &[]).await?;
-    let dbc = Rc::new(dbc);
+    let dbc = Arc::new(dbc);
     find_channel_names_from_config(&node_config.node.data_base_path, |ch| {
         let ch = ch.to_owned();
         let dbc = dbc.clone();
@@ -94,8 +97,11 @@ pub async fn update_db_with_channel_names(node_config: &NodeConfigCached) -> Res
                     &[&facility, &ch],
                 )
                 .await?;
-            *c1.borrow_mut() += 1;
-            let c2 = *c1.borrow();
+            let c2 = {
+                let mut g = c1.write()?;
+                *g += 1;
+                *g
+            };
             if c2 % 200 == 0 {
                 trace!("channels {:6} current {}", c2, ch);
                 dbc.query("commit", &[]).await?;
@@ -107,13 +113,21 @@ pub async fn update_db_with_channel_names(node_config: &NodeConfigCached) -> Res
     })
     .await?;
     dbc.query("commit", &[]).await?;
-    Ok(())
+    let ret = UpdatedDbWithChannelNames {
+        count: *c1.read()?,
+    };
+    Ok(ret)
 }
 
-pub async fn update_db_with_all_channel_configs(node_config: &NodeConfigCached, ks_prefix: &str) -> Result<(), Error> {
+#[derive(Debug, Serialize, Deserialize)]
+pub struct UpdatedDbWithAllChannelConfigs {
+    count: u32,
+}
+
+pub async fn update_db_with_all_channel_configs(node_config: &NodeConfigCached) -> Result<UpdatedDbWithAllChannelConfigs, Error> {
     let dbc = crate::create_connection(node_config).await?;
-    let dbc = Rc::new(dbc);
-    let node_disk_ident = &get_node_disk_ident(node_config).await?;
+    let dbc = Arc::new(dbc);
+    let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?;
     let rows = dbc
         .query(
             "select rowid, facility, name from channels where facility = $1 order by facility, name",
@@ -131,7 +145,6 @@ pub async fn update_db_with_all_channel_configs(node_config: &NodeConfigCached,
         match update_db_with_channel_config(
             node_config,
             node_disk_ident,
-            ks_prefix,
             rowid,
             &channel,
             dbc.clone(),
@@ -165,7 +178,10 @@ pub async fn update_db_with_all_channel_configs(node_config: &NodeConfigCached,
         }
     }
     dbc.query("commit", &[]).await?;
-    Ok(())
+    let ret = UpdatedDbWithAllChannelConfigs {
+        count: c1,
+    };
+    Ok(ret)
 }
 
 pub async fn update_search_cache(node_config: &NodeConfigCached) -> Result<(), Error> {
@@ -180,10 +196,9 @@ Parse the config of the given channel and update database.
 pub async fn update_db_with_channel_config(
     node_config: &NodeConfigCached,
     node_disk_ident: &NodeDiskIdent,
-    _ks_prefix: &str,
     channel_id: i64,
     channel: &str,
-    dbc: Rc<Client>,
+    dbc: Arc<Client>,
     count_inserted: &mut usize,
     count_updated: &mut usize,
 ) -> Result<(), Error> {
@@ -259,7 +274,7 @@ pub async fn update_db_with_all_channel_datafiles(
     node_disk_ident: &NodeDiskIdent,
     ks_prefix: &str,
 ) -> Result<(), Error> {
-    let dbc = Rc::new(crate::create_connection(node_config).await?);
+    let dbc = Arc::new(crate::create_connection(node_config).await?);
     let rows = dbc
         .query(
             "select rowid, facility, name from channels where facility = $1 order by facility, name",
@@ -290,9 +305,10 @@ pub async fn update_db_with_all_channel_datafiles(
 struct DatafileDbWriter {
     channel_id: i64,
     node_id: i64,
-    dbc: Rc<Client>,
-    c1: Rc<RefCell<u32>>,
+    dbc: Arc<Client>,
+    c1: Arc<RwLock<u32>>,
 }
 
 #[derive(Debug, Serialize)]
 pub struct ChannelDesc {
     name: String,
@@ -347,7 +363,7 @@ impl ChannelDatafileDescSink for DatafileDbWriter {
                     &serde_json::to_value(k)?,
                 ]
             ).await?;
-            *c1.try_borrow_mut().unwrap() += 1;
+            *c1.write()? += 1;
             Ok(())
         })
     }
@@ -430,13 +446,13 @@ pub async fn update_db_with_channel_datafiles(
     ks_prefix: &str,
     channel_id: i64,
     channel: &str,
-    dbc: Rc<Client>,
+    dbc: Arc<Client>,
 ) -> Result<(), Error> {
     let writer = DatafileDbWriter {
         node_id: node_disk_ident.rowid(),
         channel_id: channel_id,
         dbc: dbc.clone(),
-        c1: Rc::new(RefCell::new(0)),
+        c1: Arc::new(RwLock::new(0)),
     };
     let mut n_nothing = 0;
     for ks in &[2, 3, 4] {
@@ -451,7 +467,7 @@ pub async fn update_db_with_channel_datafiles(
                 x?;
             }
         };
-        if false && *writer.c1.borrow() >= 10 {
+        if false && *writer.c1.read()? >= 10 {
             break;
         }
     }
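
The recurring change in this diff is the switch from Rc<RefCell<u32>> to Arc<RwLock<u32>> for the progress counters, plus returning a serializable count (UpdatedDbWithChannelNames, UpdatedDbWithAllChannelConfigs) instead of (), so the new scan endpoints have a result to report. Below is a minimal, dependency-free sketch of that shared-counter pattern; it is an illustration only, not code from this repository, and plain threads stand in for the async tasks.

// Standalone sketch: why the counters move from Rc<RefCell<u32>> to Arc<RwLock<u32>>.
// Rc/RefCell are neither Send nor Sync, so a counter captured by work that may run on
// several threads has to become Arc/RwLock. Threads stand in for the async tasks here.
use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
    // Shared progress counter, playing the role of `c1` in update_db_with_channel_names.
    let c1 = Arc::new(RwLock::new(0u32));

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let c1 = c1.clone();
            thread::spawn(move || {
                for _ in 0..50 {
                    // Same shape as the patched code: take the write lock, bump the count.
                    let mut g = c1.write().unwrap();
                    *g += 1;
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }

    // Same shape as building the result from *c1.read()? at the end of the scan.
    let count = *c1.read().unwrap();
    println!("count = {}", count);
}

In the patched functions the locks are taken with `?` rather than `unwrap()`, which presumably relies on the crate's Error type converting from a poisoned-lock error; the final count is then read once and returned in the serializable result struct.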