diff --git a/dbconn/Cargo.toml b/dbconn/Cargo.toml index d7def8d..644a8e3 100644 --- a/dbconn/Cargo.toml +++ b/dbconn/Cargo.toml @@ -19,6 +19,7 @@ bincode = "1.3.3" #async-channel = "1" #dashmap = "3" tokio-postgres = { version = "0.7", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] } +async-channel = "1.6" chrono = "0.4" regex = "1.5.4" err = { path = "../err" } diff --git a/dbconn/src/lib.rs b/dbconn/src/lib.rs index 4f2b26d..c58a55a 100644 --- a/dbconn/src/lib.rs +++ b/dbconn/src/lib.rs @@ -33,6 +33,20 @@ pub async fn create_connection(node_config: &NodeConfigCached) -> Result Result { + let d = &node_config.node_config.cluster.database; + let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name); + let (cl, conn) = tokio_postgres::connect(&uri, NoTls).await?; + // TODO monitor connection drop. + let _cjh = tokio::spawn(async move { + if let Err(e) = conn.await { + error!("connection error: {}", e); + } + Ok::<_, Error>(()) + }); + Ok(cl) +} + pub async fn channel_exists(channel: &Channel, node_config: &NodeConfigCached) -> Result { let cl = create_connection(node_config).await?; let rows = cl diff --git a/dbconn/src/scan.rs b/dbconn/src/scan.rs index 12571d5..5259f21 100644 --- a/dbconn/src/scan.rs +++ b/dbconn/src/scan.rs @@ -1,15 +1,21 @@ +use async_channel::{bounded, Receiver, Sender}; use chrono::{DateTime, Utc}; use err::Error; +use futures_core::Stream; +use futures_util::FutureExt; use netpod::log::*; use netpod::NodeConfigCached; use serde::{Deserialize, Serialize}; use std::future::Future; use std::io::ErrorKind; +use std::marker::PhantomPinned; use std::os::unix::ffi::OsStringExt; use std::path::{Path, PathBuf}; use std::pin::Pin; +use std::ptr::NonNull; +use std::sync::{Arc, RwLock}; +use std::task::{Context, Poll}; use tokio_postgres::Client; -use std::sync::{RwLock, Arc}; #[derive(Debug, Serialize, Deserialize)] pub struct NodeDiskIdent { @@ -67,34 +73,124 @@ where #[derive(Debug, Serialize, Deserialize)] pub struct UpdatedDbWithChannelNames { + msg: String, count: u32, } -pub async fn update_db_with_channel_names(node_config: &NodeConfigCached) -> Result { - let dbc = crate::create_connection(node_config).await?; - let node_ident = get_node_disk_ident(node_config, &dbc).await?; - let c1 = Arc::new(RwLock::new(0u32)); - let rows = dbc - .query("select rowid from facilities where name = $1", &[&node_ident.facility]) - .await?; - if rows.len() != 1 { - return Err(Error::with_msg(format!( - "can not find facility {}", - node_ident.facility - ))); +pub struct UpdatedDbWithChannelNamesStream { + errored: bool, + data_complete: bool, + node_config: Pin>, + node_config_ptr: NonNull, + //_pin: PhantomPinned, + client_fut: Option> + Send>>>, + client: Option>>, + client_ptr: *const Client, + ident_fut: Option> + Send>>>, + ident: Option, +} + +unsafe impl Send for UpdatedDbWithChannelNamesStream {} + +impl UpdatedDbWithChannelNamesStream { + pub fn new(node_config: NodeConfigCached) -> Result { + let node_config = Box::pin(node_config.clone()); + let mut ret = Self { + errored: false, + data_complete: false, + node_config_ptr: NonNull::dangling(), + node_config, + //_pin: PhantomPinned, + client_fut: None, + client: None, + client_ptr: std::ptr::null(), + ident_fut: None, + ident: None, + }; + ret.node_config_ptr = NonNull::from(&*ret.node_config); + ret.client_fut = Some(Box::pin(crate::create_connection(unsafe { + &*ret.node_config_ptr.as_ptr() + }))); + Ok(ret) } - let facility: i64 = rows[0].try_get(0)?; +} + +impl Stream for UpdatedDbWithChannelNamesStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + 'outer: loop { + break if self.errored { + Ready(None) + } else if self.data_complete { + Ready(None) + } else if let Some(fut) = &mut self.ident_fut { + match fut.poll_unpin(cx) { + Ready(Ok(item)) => { + self.ident = Some(item); + self.ident_fut = None; + let ret = UpdatedDbWithChannelNames { + msg: format!("ALL DONE"), + count: 42, + }; + self.data_complete = true; + Ready(Some(Ok(ret))) + } + Ready(Err(e)) => { + self.errored = true; + Ready(Some(Err(e))) + } + Pending => Pending, + } + } else if let Some(fut) = &mut self.client_fut { + match fut.poll_unpin(cx) { + Ready(Ok(item)) => { + self.client_fut = None; + self.client = Some(Box::pin(item)); + self.client_ptr = self.client.as_ref().unwrap() as &Client as *const _; + let p1 = unsafe { &*self.client_ptr }; + self.ident_fut = Some(Box::pin(get_node_disk_ident( + unsafe { &*self.node_config_ptr.as_ptr() }, + &p1, + ))); + let ret = UpdatedDbWithChannelNames { + msg: format!("Client opened connection"), + count: 42, + }; + Ready(Some(Ok(ret))) + } + Ready(Err(e)) => { + self.errored = true; + Ready(Some(Err(e))) + } + Pending => Pending, + } + } else { + Ready(None) + }; + } + } +} + +pub async fn update_db_with_channel_names( + node_config: &NodeConfigCached, +) -> Result>, Error> { + let dbc = crate::create_connection(node_config).await?; + let node_disk_ident = get_node_disk_ident(node_config, &dbc).await?; + let c1 = Arc::new(RwLock::new(0u32)); dbc.query("begin", &[]).await?; let dbc = Arc::new(dbc); find_channel_names_from_config(&node_config.node.data_base_path, |ch| { let ch = ch.to_owned(); let dbc = dbc.clone(); let c1 = c1.clone(); + let fac = node_disk_ident.facility; async move { crate::delay_io_short().await; dbc.query( "insert into channels (facility, name) values ($1, $2) on conflict do nothing", - &[&facility, &ch], + &[&fac, &ch], ) .await?; let c2 = { @@ -114,9 +210,10 @@ pub async fn update_db_with_channel_names(node_config: &NodeConfigCached) -> Res .await?; dbc.query("commit", &[]).await?; let ret = UpdatedDbWithChannelNames { + msg: format!("done"), count: *c1.read()?, }; - Ok(ret) + Ok(bounded(16).1) } #[derive(Debug, Serialize, Deserialize)] @@ -124,7 +221,9 @@ pub struct UpdatedDbWithAllChannelConfigs { count: u32, } -pub async fn update_db_with_all_channel_configs(node_config: &NodeConfigCached) -> Result { +pub async fn update_db_with_all_channel_configs( + node_config: &NodeConfigCached, +) -> Result { let dbc = crate::create_connection(node_config).await?; let dbc = Arc::new(dbc); let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?; @@ -178,9 +277,7 @@ pub async fn update_db_with_all_channel_configs(node_config: &NodeConfigCached) } } dbc.query("commit", &[]).await?; - let ret = UpdatedDbWithAllChannelConfigs { - count: c1, - }; + let ret = UpdatedDbWithAllChannelConfigs { count: c1 }; Ok(ret) } diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 6f88c3c..f9448b9 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -171,6 +171,12 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } + } else if path == "/api/4/update_search_cache" { + if req.method() == Method::GET { + Ok(update_search_cache(req, &node_config).await?) + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } } else if path.starts_with("/api/4/documentation/") { if req.method() == Method::GET { static_http!(path, "", "index.html", "text/html"); @@ -387,25 +393,35 @@ pub async fn clear_cache_all(req: Request, node_config: &NodeConfigCached) }; let res = disk::cache::clear_cache_all(node_config, dry).await?; let ret = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, "application/json") - .body(Body::from(serde_json::to_string(&res)?))?; + .header(http::header::CONTENT_TYPE, "application/json") + .body(Body::from(serde_json::to_string(&res)?))?; Ok(ret) } -pub async fn update_db_with_channel_names(req: Request, node_config: &NodeConfigCached) -> Result, Error> { +pub async fn update_db_with_channel_names( + req: Request, + node_config: &NodeConfigCached, +) -> Result, Error> { let (head, _body) = req.into_parts(); let _dry = match head.uri.query() { Some(q) => q.contains("dry"), None => false, }; - let res = dbconn::scan::update_db_with_channel_names(node_config).await?; + let res = dbconn::scan::UpdatedDbWithChannelNamesStream::new(node_config.clone())?; + //let res = dbconn::scan::update_db_with_channel_names(node_config).await?; let ret = response(StatusCode::OK) - .header(http::header::CONTENT_TYPE, "application/json") - .body(Body::from(serde_json::to_string(&res)?))?; + .header(http::header::CONTENT_TYPE, "application/json") + .body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) { + Ok(item) => Ok(item), + Err(e) => Err(e), + })))?; Ok(ret) } -pub async fn update_db_with_all_channel_configs(req: Request, node_config: &NodeConfigCached) -> Result, Error> { +pub async fn update_db_with_all_channel_configs( + req: Request, + node_config: &NodeConfigCached, +) -> Result, Error> { let (head, _body) = req.into_parts(); let _dry = match head.uri.query() { Some(q) => q.contains("dry"), @@ -417,3 +433,16 @@ pub async fn update_db_with_all_channel_configs(req: Request, node_config: .body(Body::from(serde_json::to_string(&res)?))?; Ok(ret) } + +pub async fn update_search_cache(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + let (head, _body) = req.into_parts(); + let _dry = match head.uri.query() { + Some(q) => q.contains("dry"), + None => false, + }; + let res = dbconn::scan::update_search_cache(node_config).await?; + let ret = response(StatusCode::OK) + .header(http::header::CONTENT_TYPE, "application/json") + .body(Body::from(serde_json::to_string(&res)?))?; + Ok(ret) +}