Enable streaming results for channel index
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
use err::Error;
|
||||
use netpod::log::*;
|
||||
use netpod::{Channel, NodeConfigCached};
|
||||
use netpod::{Channel, Database, NodeConfigCached};
|
||||
use std::time::Duration;
|
||||
use tokio_postgres::{Client, NoTls};
|
||||
|
||||
@@ -19,8 +19,8 @@ pub async fn delay_io_medium() {
|
||||
delay_us(2000).await;
|
||||
}
|
||||
|
||||
pub async fn create_connection(node_config: &NodeConfigCached) -> Result<Client, Error> {
|
||||
let d = &node_config.node_config.cluster.database;
|
||||
pub async fn create_connection(db_config: &Database) -> Result<Client, Error> {
|
||||
let d = db_config;
|
||||
let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name);
|
||||
let (cl, conn) = tokio_postgres::connect(&uri, NoTls).await?;
|
||||
// TODO monitor connection drop.
|
||||
@@ -34,7 +34,7 @@ pub async fn create_connection(node_config: &NodeConfigCached) -> Result<Client,
|
||||
}
|
||||
|
||||
pub async fn channel_exists(channel: &Channel, node_config: &NodeConfigCached) -> Result<bool, Error> {
|
||||
let cl = create_connection(node_config).await?;
|
||||
let cl = create_connection(&node_config.node_config.cluster.database).await?;
|
||||
let rows = cl
|
||||
.query("select rowid from channels where name = $1::text", &[&channel.name])
|
||||
.await?;
|
||||
@@ -51,7 +51,7 @@ pub async fn channel_exists(channel: &Channel, node_config: &NodeConfigCached) -
|
||||
}
|
||||
|
||||
pub async fn database_size(node_config: &NodeConfigCached) -> Result<u64, Error> {
|
||||
let cl = create_connection(node_config).await?;
|
||||
let cl = create_connection(&node_config.node_config.cluster.database).await?;
|
||||
let rows = cl
|
||||
.query(
|
||||
"select pg_database_size($1::text)",
|
||||
@@ -82,7 +82,7 @@ pub async fn table_sizes(node_config: &NodeConfigCached) -> Result<TableSizes, E
|
||||
"ORDER BY pg_total_relation_size(C.oid) DESC LIMIT 20",
|
||||
);
|
||||
let sql = sql.as_str();
|
||||
let cl = create_connection(node_config).await?;
|
||||
let cl = create_connection(&node_config.node_config.cluster.database).await?;
|
||||
let rows = cl.query(sql, &[]).await?;
|
||||
let mut sizes = TableSizes { sizes: vec![] };
|
||||
sizes.sizes.push((format!("table"), format!("size")));
|
||||
@@ -94,7 +94,7 @@ pub async fn table_sizes(node_config: &NodeConfigCached) -> Result<TableSizes, E
|
||||
|
||||
pub async fn random_channel(node_config: &NodeConfigCached) -> Result<String, Error> {
|
||||
let sql = "select name from channels order by rowid limit 1 offset (random() * (select count(rowid) from channels))::bigint";
|
||||
let cl = create_connection(node_config).await?;
|
||||
let cl = create_connection(&node_config.node_config.cluster.database).await?;
|
||||
let rows = cl.query(sql, &[]).await?;
|
||||
if rows.len() == 0 {
|
||||
Err(Error::with_msg("can not get random channel"))?;
|
||||
|
||||
@@ -3,9 +3,9 @@ use async_channel::{bounded, Receiver};
|
||||
use chrono::{DateTime, Utc};
|
||||
use err::Error;
|
||||
use futures_core::Stream;
|
||||
use futures_util::{pin_mut, FutureExt};
|
||||
use futures_util::{pin_mut, FutureExt, StreamExt};
|
||||
use netpod::log::*;
|
||||
use netpod::NodeConfigCached;
|
||||
use netpod::{Database, NodeConfigCached};
|
||||
use pin_project::pin_project;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::future::Future;
|
||||
@@ -203,7 +203,9 @@ impl UpdatedDbWithChannelNamesStream {
|
||||
channel_inp_done: false,
|
||||
clist: vec![],
|
||||
};
|
||||
ret.client_fut = Some(Box::pin(crate::create_connection(ret.node_config_ref)));
|
||||
ret.client_fut = Some(Box::pin(crate::create_connection(
|
||||
&ret.node_config_ref.node_config.cluster.database,
|
||||
)));
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
@@ -320,115 +322,177 @@ async fn update_db_with_channel_name_list(list: Vec<String>, backend: i64, dbc:
|
||||
}
|
||||
|
||||
pub async fn update_db_with_channel_names(
|
||||
node_config: &NodeConfigCached,
|
||||
node_config: NodeConfigCached,
|
||||
db_config: &Database,
|
||||
) -> Result<Receiver<Result<UpdatedDbWithChannelNames, Error>>, Error> {
|
||||
let dbc = crate::create_connection(node_config).await?;
|
||||
let node_disk_ident = get_node_disk_ident(node_config, &dbc).await?;
|
||||
let c1 = Arc::new(RwLock::new(0u32));
|
||||
dbc.query("begin", &[]).await?;
|
||||
let dbc = Arc::new(dbc);
|
||||
find_channel_names_from_config(&node_config.node.data_base_path, |ch| {
|
||||
let ch = ch.to_owned();
|
||||
let dbc = dbc.clone();
|
||||
let c1 = c1.clone();
|
||||
let fac = node_disk_ident.facility;
|
||||
async move {
|
||||
crate::delay_io_short().await;
|
||||
dbc.query(
|
||||
"insert into channels (facility, name) values ($1, $2) on conflict do nothing",
|
||||
&[&fac, &ch],
|
||||
)
|
||||
.await?;
|
||||
let c2 = {
|
||||
let mut g = c1.write()?;
|
||||
*g += 1;
|
||||
*g
|
||||
};
|
||||
if c2 % 200 == 0 {
|
||||
trace!("channels {:6} current {}", c2, ch);
|
||||
dbc.query("commit", &[]).await?;
|
||||
crate::delay_io_medium().await;
|
||||
dbc.query("begin", &[]).await?;
|
||||
let (tx, rx) = bounded(16);
|
||||
let db_config = db_config.clone();
|
||||
tokio::spawn(async move {
|
||||
let dbc = crate::create_connection(&db_config).await?;
|
||||
let node_disk_ident = get_node_disk_ident(&node_config, &dbc).await?;
|
||||
let c1 = Arc::new(RwLock::new(0u32));
|
||||
dbc.query("begin", &[]).await?;
|
||||
let dbc = Arc::new(dbc);
|
||||
let tx = Arc::new(tx);
|
||||
find_channel_names_from_config(&node_config.node.data_base_path, |ch| {
|
||||
let ch = ch.to_owned();
|
||||
let dbc = dbc.clone();
|
||||
let c1 = c1.clone();
|
||||
let tx = tx.clone();
|
||||
let fac = node_disk_ident.facility;
|
||||
async move {
|
||||
crate::delay_io_short().await;
|
||||
dbc.query(
|
||||
"insert into channels (facility, name) values ($1, $2) on conflict do nothing",
|
||||
&[&fac, &ch],
|
||||
)
|
||||
.await?;
|
||||
let c2 = {
|
||||
let mut g = c1.write()?;
|
||||
*g += 1;
|
||||
*g
|
||||
};
|
||||
if c2 % 200 == 0 {
|
||||
dbc.query("commit", &[]).await?;
|
||||
let ret = UpdatedDbWithChannelNames {
|
||||
msg: format!("current {}", ch),
|
||||
count: c2,
|
||||
};
|
||||
tx.send(Ok(ret)).await?;
|
||||
crate::delay_io_medium().await;
|
||||
dbc.query("begin", &[]).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
dbc.query("commit", &[]).await?;
|
||||
let _ret = UpdatedDbWithChannelNames {
|
||||
msg: format!("done"),
|
||||
count: *c1.read()?,
|
||||
};
|
||||
Ok(bounded(16).1)
|
||||
})
|
||||
.await?;
|
||||
dbc.query("commit", &[]).await?;
|
||||
let c2 = *c1.read()?;
|
||||
let ret = UpdatedDbWithChannelNames {
|
||||
msg: format!("all done"),
|
||||
count: c2,
|
||||
};
|
||||
tx.send(Ok(ret)).await?;
|
||||
Ok::<_, Error>(())
|
||||
});
|
||||
Ok(rx)
|
||||
}
|
||||
|
||||
pub fn update_db_with_channel_names_3<'a>(
|
||||
node_config: &'a NodeConfigCached,
|
||||
) -> impl Stream<Item = Result<UpdatedDbWithChannelNames, Error>> + 'static {
|
||||
futures_util::future::ready(node_config.node.data_base_path.clone())
|
||||
.then(|path| tokio::fs::read_dir(path))
|
||||
.map(Result::unwrap)
|
||||
.map(|rd| {
|
||||
futures_util::stream::unfold(rd, move |rd| {
|
||||
//let fut = rd.next_entry();
|
||||
futures_util::future::ready(Ok(None)).map(move |item: Result<Option<u32>, Error>| match item {
|
||||
Ok(Some(item)) => Some((item, rd)),
|
||||
Ok(None) => None,
|
||||
Err(_e) => None,
|
||||
})
|
||||
})
|
||||
})
|
||||
.map(|_conf| Err(Error::with_msg("TODO")))
|
||||
.into_stream()
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct UpdatedDbWithAllChannelConfigs {
|
||||
msg: String,
|
||||
count: u32,
|
||||
}
|
||||
|
||||
pub async fn update_db_with_all_channel_configs(
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<UpdatedDbWithAllChannelConfigs, Error> {
|
||||
let dbc = crate::create_connection(node_config).await?;
|
||||
let dbc = Arc::new(dbc);
|
||||
let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?;
|
||||
let rows = dbc
|
||||
.query(
|
||||
"select rowid, facility, name from channels where facility = $1 order by facility, name",
|
||||
&[&node_config.node.backend],
|
||||
)
|
||||
.await?;
|
||||
let mut c1 = 0;
|
||||
dbc.query("begin", &[]).await?;
|
||||
let mut count_inserted = 0;
|
||||
let mut count_updated = 0;
|
||||
for row in rows {
|
||||
let rowid: i64 = row.try_get(0)?;
|
||||
let _facility: i64 = row.try_get(1)?;
|
||||
let channel: String = row.try_get(2)?;
|
||||
match update_db_with_channel_config(
|
||||
node_config,
|
||||
node_disk_ident,
|
||||
rowid,
|
||||
&channel,
|
||||
dbc.clone(),
|
||||
&mut count_inserted,
|
||||
&mut count_updated,
|
||||
)
|
||||
.await
|
||||
{
|
||||
/*Err(Error::ChannelConfigdirNotFound { .. }) => {
|
||||
warn!("can not find channel config {}", channel);
|
||||
crate::delay_io_medium().await;
|
||||
}*/
|
||||
Err(e) => {
|
||||
error!("{:?}", e);
|
||||
crate::delay_io_medium().await;
|
||||
}
|
||||
_ => {
|
||||
c1 += 1;
|
||||
if c1 % 200 == 0 {
|
||||
trace!(
|
||||
"channel no {:6} inserted {:6} updated {:6}",
|
||||
c1,
|
||||
count_inserted,
|
||||
count_updated
|
||||
);
|
||||
dbc.query("commit", &[]).await?;
|
||||
dbc.query("begin", &[]).await?;
|
||||
node_config: NodeConfigCached,
|
||||
) -> Result<Receiver<Result<UpdatedDbWithAllChannelConfigs, Error>>, Error> {
|
||||
let (tx, rx) = bounded(16);
|
||||
let tx = Arc::new(tx);
|
||||
let tx2 = tx.clone();
|
||||
tokio::spawn(
|
||||
async move {
|
||||
let node_config = &node_config;
|
||||
let dbc = crate::create_connection(&node_config.node_config.cluster.database).await?;
|
||||
let dbc = Arc::new(dbc);
|
||||
let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?;
|
||||
let rows = dbc
|
||||
.query(
|
||||
"select rowid, facility, name from channels where facility = $1 order by facility, name",
|
||||
&[&node_disk_ident.facility],
|
||||
)
|
||||
.await?;
|
||||
let mut c1 = 0;
|
||||
dbc.query("begin", &[]).await?;
|
||||
let mut count_inserted = 0;
|
||||
let mut count_updated = 0;
|
||||
for row in rows {
|
||||
let rowid: i64 = row.try_get(0)?;
|
||||
let _facility: i64 = row.try_get(1)?;
|
||||
let channel: String = row.try_get(2)?;
|
||||
match update_db_with_channel_config(
|
||||
node_config,
|
||||
node_disk_ident,
|
||||
rowid,
|
||||
&channel,
|
||||
dbc.clone(),
|
||||
&mut count_inserted,
|
||||
&mut count_updated,
|
||||
)
|
||||
.await
|
||||
{
|
||||
/*Err(Error::ChannelConfigdirNotFound { .. }) => {
|
||||
warn!("can not find channel config {}", channel);
|
||||
crate::delay_io_medium().await;
|
||||
}*/
|
||||
Err(e) => {
|
||||
error!("{:?}", e);
|
||||
crate::delay_io_medium().await;
|
||||
}
|
||||
_ => {
|
||||
c1 += 1;
|
||||
if c1 % 200 == 0 {
|
||||
dbc.query("commit", &[]).await?;
|
||||
let msg = format!(
|
||||
"channel no {:6} inserted {:6} updated {:6}",
|
||||
c1, count_inserted, count_updated
|
||||
);
|
||||
let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 };
|
||||
tx.send(Ok(ret)).await?;
|
||||
dbc.query("begin", &[]).await?;
|
||||
}
|
||||
crate::delay_io_short().await;
|
||||
}
|
||||
}
|
||||
crate::delay_io_short().await;
|
||||
}
|
||||
dbc.query("commit", &[]).await?;
|
||||
let msg = format!(
|
||||
"ALL DONE channel no {:6} inserted {:6} updated {:6}",
|
||||
c1, count_inserted, count_updated
|
||||
);
|
||||
let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 };
|
||||
tx.send(Ok(ret)).await?;
|
||||
Ok::<_, Error>(())
|
||||
}
|
||||
}
|
||||
dbc.query("commit", &[]).await?;
|
||||
let ret = UpdatedDbWithAllChannelConfigs { count: c1 };
|
||||
Ok(ret)
|
||||
.then({
|
||||
|item| async move {
|
||||
match item {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
let msg = format!("Seeing error: {:?}", e);
|
||||
let ret = UpdatedDbWithAllChannelConfigs { msg, count: 0 };
|
||||
tx2.send(Ok(ret)).await?;
|
||||
}
|
||||
}
|
||||
Ok::<_, Error>(())
|
||||
}
|
||||
}),
|
||||
);
|
||||
Ok(rx)
|
||||
}
|
||||
|
||||
pub async fn update_search_cache(node_config: &NodeConfigCached) -> Result<(), Error> {
|
||||
let dbc = crate::create_connection(node_config).await?;
|
||||
let dbc = crate::create_connection(&node_config.node_config.cluster.database).await?;
|
||||
dbc.query("select update_cache()", &[]).await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -517,7 +581,7 @@ pub async fn update_db_with_all_channel_datafiles(
|
||||
node_disk_ident: &NodeDiskIdent,
|
||||
ks_prefix: &str,
|
||||
) -> Result<(), Error> {
|
||||
let dbc = Arc::new(crate::create_connection(node_config).await?);
|
||||
let dbc = Arc::new(crate::create_connection(&node_config.node_config.cluster.database).await?);
|
||||
let rows = dbc
|
||||
.query(
|
||||
"select rowid, facility, name from channels where facility = $1 order by facility, name",
|
||||
|
||||
@@ -11,7 +11,7 @@ pub async fn search_channel(
|
||||
"channel_id, channel_name, source_name, dtype, shape, unit, description, channel_backend",
|
||||
" from searchext($1, $2, $3, $4)",
|
||||
));
|
||||
let cl = create_connection(node_config).await?;
|
||||
let cl = create_connection(&node_config.node_config.cluster.database).await?;
|
||||
let rows = cl
|
||||
.query(
|
||||
sql.as_str(),
|
||||
|
||||
Reference in New Issue
Block a user