From 79e3a1ea4750084d5fd371306a54691b07b79d91 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 2 Jun 2021 17:18:49 +0200 Subject: [PATCH] Enable streaming results for channel index --- dbconn/src/lib.rs | 14 +-- dbconn/src/scan.rs | 258 +++++++++++++++++++++++++++---------------- dbconn/src/search.rs | 2 +- httpret/src/lib.rs | 46 ++++++-- 4 files changed, 208 insertions(+), 112 deletions(-) diff --git a/dbconn/src/lib.rs b/dbconn/src/lib.rs index 4f2b26d..8ca1375 100644 --- a/dbconn/src/lib.rs +++ b/dbconn/src/lib.rs @@ -1,6 +1,6 @@ use err::Error; use netpod::log::*; -use netpod::{Channel, NodeConfigCached}; +use netpod::{Channel, Database, NodeConfigCached}; use std::time::Duration; use tokio_postgres::{Client, NoTls}; @@ -19,8 +19,8 @@ pub async fn delay_io_medium() { delay_us(2000).await; } -pub async fn create_connection(node_config: &NodeConfigCached) -> Result { - let d = &node_config.node_config.cluster.database; +pub async fn create_connection(db_config: &Database) -> Result { + let d = db_config; let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name); let (cl, conn) = tokio_postgres::connect(&uri, NoTls).await?; // TODO monitor connection drop. @@ -34,7 +34,7 @@ pub async fn create_connection(node_config: &NodeConfigCached) -> Result Result { - let cl = create_connection(node_config).await?; + let cl = create_connection(&node_config.node_config.cluster.database).await?; let rows = cl .query("select rowid from channels where name = $1::text", &[&channel.name]) .await?; @@ -51,7 +51,7 @@ pub async fn channel_exists(channel: &Channel, node_config: &NodeConfigCached) - } pub async fn database_size(node_config: &NodeConfigCached) -> Result { - let cl = create_connection(node_config).await?; + let cl = create_connection(&node_config.node_config.cluster.database).await?; let rows = cl .query( "select pg_database_size($1::text)", @@ -82,7 +82,7 @@ pub async fn table_sizes(node_config: &NodeConfigCached) -> Result Result Result { let sql = "select name from channels order by rowid limit 1 offset (random() * (select count(rowid) from channels))::bigint"; - let cl = create_connection(node_config).await?; + let cl = create_connection(&node_config.node_config.cluster.database).await?; let rows = cl.query(sql, &[]).await?; if rows.len() == 0 { Err(Error::with_msg("can not get random channel"))?; diff --git a/dbconn/src/scan.rs b/dbconn/src/scan.rs index fe9b743..58cfb7e 100644 --- a/dbconn/src/scan.rs +++ b/dbconn/src/scan.rs @@ -3,9 +3,9 @@ use async_channel::{bounded, Receiver}; use chrono::{DateTime, Utc}; use err::Error; use futures_core::Stream; -use futures_util::{pin_mut, FutureExt}; +use futures_util::{pin_mut, FutureExt, StreamExt}; use netpod::log::*; -use netpod::NodeConfigCached; +use netpod::{Database, NodeConfigCached}; use pin_project::pin_project; use serde::{Deserialize, Serialize}; use std::future::Future; @@ -203,7 +203,9 @@ impl UpdatedDbWithChannelNamesStream { channel_inp_done: false, clist: vec![], }; - ret.client_fut = Some(Box::pin(crate::create_connection(ret.node_config_ref))); + ret.client_fut = Some(Box::pin(crate::create_connection( + &ret.node_config_ref.node_config.cluster.database, + ))); Ok(ret) } } @@ -320,115 +322,177 @@ async fn update_db_with_channel_name_list(list: Vec, backend: i64, dbc: } pub async fn update_db_with_channel_names( - node_config: &NodeConfigCached, + node_config: NodeConfigCached, + db_config: &Database, ) -> Result>, Error> { - let dbc = crate::create_connection(node_config).await?; - let node_disk_ident = get_node_disk_ident(node_config, &dbc).await?; - let c1 = Arc::new(RwLock::new(0u32)); - dbc.query("begin", &[]).await?; - let dbc = Arc::new(dbc); - find_channel_names_from_config(&node_config.node.data_base_path, |ch| { - let ch = ch.to_owned(); - let dbc = dbc.clone(); - let c1 = c1.clone(); - let fac = node_disk_ident.facility; - async move { - crate::delay_io_short().await; - dbc.query( - "insert into channels (facility, name) values ($1, $2) on conflict do nothing", - &[&fac, &ch], - ) - .await?; - let c2 = { - let mut g = c1.write()?; - *g += 1; - *g - }; - if c2 % 200 == 0 { - trace!("channels {:6} current {}", c2, ch); - dbc.query("commit", &[]).await?; - crate::delay_io_medium().await; - dbc.query("begin", &[]).await?; + let (tx, rx) = bounded(16); + let db_config = db_config.clone(); + tokio::spawn(async move { + let dbc = crate::create_connection(&db_config).await?; + let node_disk_ident = get_node_disk_ident(&node_config, &dbc).await?; + let c1 = Arc::new(RwLock::new(0u32)); + dbc.query("begin", &[]).await?; + let dbc = Arc::new(dbc); + let tx = Arc::new(tx); + find_channel_names_from_config(&node_config.node.data_base_path, |ch| { + let ch = ch.to_owned(); + let dbc = dbc.clone(); + let c1 = c1.clone(); + let tx = tx.clone(); + let fac = node_disk_ident.facility; + async move { + crate::delay_io_short().await; + dbc.query( + "insert into channels (facility, name) values ($1, $2) on conflict do nothing", + &[&fac, &ch], + ) + .await?; + let c2 = { + let mut g = c1.write()?; + *g += 1; + *g + }; + if c2 % 200 == 0 { + dbc.query("commit", &[]).await?; + let ret = UpdatedDbWithChannelNames { + msg: format!("current {}", ch), + count: c2, + }; + tx.send(Ok(ret)).await?; + crate::delay_io_medium().await; + dbc.query("begin", &[]).await?; + } + Ok(()) } - Ok(()) - } - }) - .await?; - dbc.query("commit", &[]).await?; - let _ret = UpdatedDbWithChannelNames { - msg: format!("done"), - count: *c1.read()?, - }; - Ok(bounded(16).1) + }) + .await?; + dbc.query("commit", &[]).await?; + let c2 = *c1.read()?; + let ret = UpdatedDbWithChannelNames { + msg: format!("all done"), + count: c2, + }; + tx.send(Ok(ret)).await?; + Ok::<_, Error>(()) + }); + Ok(rx) +} + +pub fn update_db_with_channel_names_3<'a>( + node_config: &'a NodeConfigCached, +) -> impl Stream> + 'static { + futures_util::future::ready(node_config.node.data_base_path.clone()) + .then(|path| tokio::fs::read_dir(path)) + .map(Result::unwrap) + .map(|rd| { + futures_util::stream::unfold(rd, move |rd| { + //let fut = rd.next_entry(); + futures_util::future::ready(Ok(None)).map(move |item: Result, Error>| match item { + Ok(Some(item)) => Some((item, rd)), + Ok(None) => None, + Err(_e) => None, + }) + }) + }) + .map(|_conf| Err(Error::with_msg("TODO"))) + .into_stream() } #[derive(Debug, Serialize, Deserialize)] pub struct UpdatedDbWithAllChannelConfigs { + msg: String, count: u32, } pub async fn update_db_with_all_channel_configs( - node_config: &NodeConfigCached, -) -> Result { - let dbc = crate::create_connection(node_config).await?; - let dbc = Arc::new(dbc); - let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?; - let rows = dbc - .query( - "select rowid, facility, name from channels where facility = $1 order by facility, name", - &[&node_config.node.backend], - ) - .await?; - let mut c1 = 0; - dbc.query("begin", &[]).await?; - let mut count_inserted = 0; - let mut count_updated = 0; - for row in rows { - let rowid: i64 = row.try_get(0)?; - let _facility: i64 = row.try_get(1)?; - let channel: String = row.try_get(2)?; - match update_db_with_channel_config( - node_config, - node_disk_ident, - rowid, - &channel, - dbc.clone(), - &mut count_inserted, - &mut count_updated, - ) - .await - { - /*Err(Error::ChannelConfigdirNotFound { .. }) => { - warn!("can not find channel config {}", channel); - crate::delay_io_medium().await; - }*/ - Err(e) => { - error!("{:?}", e); - crate::delay_io_medium().await; - } - _ => { - c1 += 1; - if c1 % 200 == 0 { - trace!( - "channel no {:6} inserted {:6} updated {:6}", - c1, - count_inserted, - count_updated - ); - dbc.query("commit", &[]).await?; - dbc.query("begin", &[]).await?; + node_config: NodeConfigCached, +) -> Result>, Error> { + let (tx, rx) = bounded(16); + let tx = Arc::new(tx); + let tx2 = tx.clone(); + tokio::spawn( + async move { + let node_config = &node_config; + let dbc = crate::create_connection(&node_config.node_config.cluster.database).await?; + let dbc = Arc::new(dbc); + let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?; + let rows = dbc + .query( + "select rowid, facility, name from channels where facility = $1 order by facility, name", + &[&node_disk_ident.facility], + ) + .await?; + let mut c1 = 0; + dbc.query("begin", &[]).await?; + let mut count_inserted = 0; + let mut count_updated = 0; + for row in rows { + let rowid: i64 = row.try_get(0)?; + let _facility: i64 = row.try_get(1)?; + let channel: String = row.try_get(2)?; + match update_db_with_channel_config( + node_config, + node_disk_ident, + rowid, + &channel, + dbc.clone(), + &mut count_inserted, + &mut count_updated, + ) + .await + { + /*Err(Error::ChannelConfigdirNotFound { .. }) => { + warn!("can not find channel config {}", channel); + crate::delay_io_medium().await; + }*/ + Err(e) => { + error!("{:?}", e); + crate::delay_io_medium().await; + } + _ => { + c1 += 1; + if c1 % 200 == 0 { + dbc.query("commit", &[]).await?; + let msg = format!( + "channel no {:6} inserted {:6} updated {:6}", + c1, count_inserted, count_updated + ); + let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 }; + tx.send(Ok(ret)).await?; + dbc.query("begin", &[]).await?; + } + crate::delay_io_short().await; + } } - crate::delay_io_short().await; } + dbc.query("commit", &[]).await?; + let msg = format!( + "ALL DONE channel no {:6} inserted {:6} updated {:6}", + c1, count_inserted, count_updated + ); + let ret = UpdatedDbWithAllChannelConfigs { msg, count: c1 }; + tx.send(Ok(ret)).await?; + Ok::<_, Error>(()) } - } - dbc.query("commit", &[]).await?; - let ret = UpdatedDbWithAllChannelConfigs { count: c1 }; - Ok(ret) + .then({ + |item| async move { + match item { + Ok(_) => {} + Err(e) => { + let msg = format!("Seeing error: {:?}", e); + let ret = UpdatedDbWithAllChannelConfigs { msg, count: 0 }; + tx2.send(Ok(ret)).await?; + } + } + Ok::<_, Error>(()) + } + }), + ); + Ok(rx) } pub async fn update_search_cache(node_config: &NodeConfigCached) -> Result<(), Error> { - let dbc = crate::create_connection(node_config).await?; + let dbc = crate::create_connection(&node_config.node_config.cluster.database).await?; dbc.query("select update_cache()", &[]).await?; Ok(()) } @@ -517,7 +581,7 @@ pub async fn update_db_with_all_channel_datafiles( node_disk_ident: &NodeDiskIdent, ks_prefix: &str, ) -> Result<(), Error> { - let dbc = Arc::new(crate::create_connection(node_config).await?); + let dbc = Arc::new(crate::create_connection(&node_config.node_config.cluster.database).await?); let rows = dbc .query( "select rowid, facility, name from channels where facility = $1 order by facility, name", diff --git a/dbconn/src/search.rs b/dbconn/src/search.rs index 3bd2cce..bf0d3ac 100644 --- a/dbconn/src/search.rs +++ b/dbconn/src/search.rs @@ -11,7 +11,7 @@ pub async fn search_channel( "channel_id, channel_name, source_name, dtype, shape, unit, description, channel_backend", " from searchext($1, $2, $3, $4)", )); - let cl = create_connection(node_config).await?; + let cl = create_connection(&node_config.node_config.cluster.database).await?; let rows = cl .query( sql.as_str(), diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 209d61f..1dcf674 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -443,12 +443,38 @@ pub async fn update_db_with_channel_names( Some(q) => q.contains("dry"), None => false, }; - let res = dbconn::scan::UpdatedDbWithChannelNamesStream::new(node_config.clone())?; - //let res = dbconn::scan::update_db_with_channel_names(node_config).await?; + let res = + dbconn::scan::update_db_with_channel_names(node_config.clone(), &node_config.node_config.cluster.database) + .await?; let ret = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, "application/json") + .header(http::header::CONTENT_TYPE, "application/jsonlines") .body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) { - Ok(item) => Ok(item), + Ok(mut item) => { + item.push('\n'); + Ok(item) + } + Err(e) => Err(e), + })))?; + Ok(ret) +} + +pub async fn update_db_with_channel_names_3( + req: Request, + node_config: &NodeConfigCached, +) -> Result, Error> { + let (head, _body) = req.into_parts(); + let _dry = match head.uri.query() { + Some(q) => q.contains("dry"), + None => false, + }; + let res = dbconn::scan::update_db_with_channel_names_3(node_config); + let ret = response(StatusCode::OK) + .header(http::header::CONTENT_TYPE, "application/jsonlines") + .body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) { + Ok(mut item) => { + item.push('\n'); + Ok(item) + } Err(e) => Err(e), })))?; Ok(ret) @@ -463,10 +489,16 @@ pub async fn update_db_with_all_channel_configs( Some(q) => q.contains("dry"), None => false, }; - let res = dbconn::scan::update_db_with_all_channel_configs(node_config).await?; + let res = dbconn::scan::update_db_with_all_channel_configs(node_config.clone()).await?; let ret = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, "application/json") - .body(Body::from(serde_json::to_string(&res)?))?; + .header(http::header::CONTENT_TYPE, "application/jsonlines") + .body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) { + Ok(mut item) => { + item.push('\n'); + Ok(item) + } + Err(e) => Err(e), + })))?; Ok(ret) }