diff --git a/dbconn/Cargo.toml b/dbconn/Cargo.toml index 644a8e3..e48e23c 100644 --- a/dbconn/Cargo.toml +++ b/dbconn/Cargo.toml @@ -16,6 +16,7 @@ futures-core = "0.3.14" futures-util = "0.3.14" bytes = "1.0.1" bincode = "1.3.3" +pin-project = "1.0.7" #async-channel = "1" #dashmap = "3" tokio-postgres = { version = "0.7", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] } diff --git a/dbconn/src/scan.rs b/dbconn/src/scan.rs index eef9d88..fe9b743 100644 --- a/dbconn/src/scan.rs +++ b/dbconn/src/scan.rs @@ -1,18 +1,22 @@ +#![allow(unused_imports)] use async_channel::{bounded, Receiver}; use chrono::{DateTime, Utc}; use err::Error; use futures_core::Stream; -use futures_util::FutureExt; +use futures_util::{pin_mut, FutureExt}; use netpod::log::*; use netpod::NodeConfigCached; +use pin_project::pin_project; use serde::{Deserialize, Serialize}; use std::future::Future; use std::io::ErrorKind; +use std::marker::PhantomPinned; use std::os::unix::ffi::OsStringExt; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::sync::{Arc, RwLock}; use std::task::{Context, Poll}; +use tokio::fs::{DirEntry, ReadDir}; use tokio_postgres::Client; #[derive(Debug, Serialize, Deserialize)] @@ -54,13 +58,94 @@ pub async fn get_node_disk_ident(node_config: &NodeConfigCached, dbc: &Client) - }) } +pub async fn get_node_disk_ident_2( + node_config: Pin<&NodeConfigCached>, + dbc: Pin<&Client>, +) -> Result { + let rows = dbc.query("select nodes.rowid, facility, split, hostname from nodes, facilities where facilities.name = $1 and facility = facilities.rowid and hostname = $2", &[&node_config.node.backend, &node_config.node.host]).await?; + if rows.len() != 1 { + return Err(Error::with_msg(format!( + "get_node can't find unique entry for {} {}", + node_config.node.host, node_config.node.backend + ))); + } + let row = &rows[0]; + Ok(NodeDiskIdent { + rowid: row.get(0), + facility: row.get(1), + split: row.get(2), + hostname: row.get(3), + }) +} + +#[pin_project] +pub struct FindChannelNamesFromConfigReadDir { + #[pin] + read_dir_fut: Option> + Send>>>, + read_dir: Option, + #[pin] + dir_entry_fut: Option>> + Send>>>, +} + +impl FindChannelNamesFromConfigReadDir { + pub fn new(base_dir: impl AsRef) -> Self { + Self { + read_dir_fut: Some(Box::pin(tokio::fs::read_dir(base_dir.as_ref().join("config")))), + read_dir: None, + dir_entry_fut: None, + } + } +} + +impl Stream for FindChannelNamesFromConfigReadDir { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + let mut pself = self.project(); + loop { + break if let Some(fut) = pself.dir_entry_fut.as_mut().as_pin_mut() { + match fut.poll(cx) { + Ready(Ok(Some(item))) => { + let g = unsafe { &mut *(pself.read_dir.as_mut().unwrap() as *mut ReadDir) }; + let fut = g.next_entry(); + *pself.dir_entry_fut = Some(Box::pin(fut)); + Ready(Some(Ok(item))) + } + Ready(Ok(None)) => { + *pself.dir_entry_fut = None; + Ready(None) + } + Ready(Err(e)) => Ready(Some(Err(e.into()))), + Pending => Pending, + } + } else if let Some(fut) = pself.read_dir_fut.as_mut().as_pin_mut() { + match fut.poll(cx) { + Ready(Ok(item)) => { + *pself.read_dir_fut = None; + *pself.read_dir = Some(item); + //let fut = pself.read_dir.as_mut().unwrap().next_entry(); + let g = unsafe { &mut *(pself.read_dir.as_mut().unwrap() as *mut ReadDir) }; + let fut = g.next_entry(); + *pself.dir_entry_fut = Some(Box::pin(fut)); + continue; + } + Ready(Err(e)) => Ready(Some(Err(e.into()))), + Pending => Pending, + } + } else { + Pending + }; + } + } +} + async fn find_channel_names_from_config(base_dir: impl AsRef, mut cb: F) -> Result<(), Error> where F: FnMut(&str) -> Fut, Fut: Future>, { - let mut path2: PathBuf = base_dir.as_ref().into(); - path2.push("config"); + let path2: PathBuf = base_dir.as_ref().join("config"); let mut rd = tokio::fs::read_dir(&path2).await?; while let Ok(Some(entry)) = rd.next_entry().await { let fname = String::from_utf8(entry.file_name().into_vec())?; @@ -75,21 +160,30 @@ pub struct UpdatedDbWithChannelNames { count: u32, } +#[pin_project] pub struct UpdatedDbWithChannelNamesStream { errored: bool, data_complete: bool, #[allow(dead_code)] node_config: Pin>, + // TODO can we pass a Pin to the async fn instead of creating static ref? node_config_ref: &'static NodeConfigCached, + #[pin] client_fut: Option> + Send>>>, - client: Option>>, + #[pin] + client: Option, client_ref: Option<&'static Client>, + #[pin] ident_fut: Option> + Send>>>, ident: Option, + #[pin] + find: Option, + #[pin] + update_batch: Option> + Send>>>, + channel_inp_done: bool, + clist: Vec, } -unsafe impl Send for UpdatedDbWithChannelNamesStream {} - impl UpdatedDbWithChannelNamesStream { pub fn new(node_config: NodeConfigCached) -> Result { let node_config = Box::pin(node_config.clone()); @@ -104,6 +198,10 @@ impl UpdatedDbWithChannelNamesStream { client_ref: None, ident_fut: None, ident: None, + find: None, + update_batch: None, + channel_inp_done: false, + clist: vec![], }; ret.client_fut = Some(Box::pin(crate::create_connection(ret.node_config_ref))); Ok(ret) @@ -113,40 +211,80 @@ impl UpdatedDbWithChannelNamesStream { impl Stream for UpdatedDbWithChannelNamesStream { type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + let mut pself = self.project(); loop { - break if self.errored { + break if *pself.errored { Ready(None) - } else if self.data_complete { + } else if *pself.data_complete { Ready(None) - } else if let Some(fut) = &mut self.ident_fut { - match fut.poll_unpin(cx) { + } else if let Some(fut) = pself.find.as_mut().as_pin_mut() { + match fut.poll_next(cx) { + Ready(Some(Ok(item))) => { + pself + .clist + .push(String::from_utf8(item.file_name().into_vec()).unwrap()); + continue; + } + Ready(Some(Err(e))) => { + *pself.errored = true; + Ready(Some(Err(e))) + } + Ready(None) => { + *pself.channel_inp_done = true; + // Work through the collected items + let l = std::mem::replace(pself.clist, vec![]); + let fut = update_db_with_channel_name_list( + l, + pself.ident.as_ref().unwrap().facility, + pself.client.as_ref().get_ref().as_ref().unwrap(), + ); + // TODO + //pself.update_batch.replace(Box::pin(fut)); + let _ = fut; + continue; + } + Pending => Pending, + } + } else if let Some(fut) = pself.ident_fut.as_mut().as_pin_mut() { + match fut.poll(cx) { Ready(Ok(item)) => { - self.ident = Some(item); - self.ident_fut = None; + *pself.ident_fut = None; + *pself.ident = Some(item); let ret = UpdatedDbWithChannelNames { - msg: format!("ALL DONE"), - count: 42, + msg: format!("Got ident {:?}", pself.ident), + count: 43, }; - self.data_complete = true; + let s = FindChannelNamesFromConfigReadDir::new(&pself.node_config.node.data_base_path); + *pself.find = Some(s); Ready(Some(Ok(ret))) } Ready(Err(e)) => { - self.errored = true; + *pself.errored = true; Ready(Some(Err(e))) } Pending => Pending, } - } else if let Some(fut) = &mut self.client_fut { - match fut.poll_unpin(cx) { + } else if let Some(fut) = pself.client_fut.as_mut().as_pin_mut() { + match fut.poll(cx) { Ready(Ok(item)) => { - self.client_fut = None; - self.client = Some(Box::pin(item)); - self.client_ref = Some(unsafe { &*(&self.client.as_ref().unwrap() as &Client as *const _) }); - self.ident_fut = Some(Box::pin(get_node_disk_ident( - self.node_config_ref, - self.client_ref.as_ref().unwrap(), + *pself.client_fut = None; + //*pself.client = Some(Box::pin(item)); + //*pself.client_ref = Some(unsafe { &*(&pself.client.as_ref().unwrap() as &Client as *const _) }); + *pself.client = Some(item); + let c2: &Client = pself.client.as_ref().get_ref().as_ref().unwrap(); + *pself.client_ref = Some(unsafe { &*(c2 as *const _) }); + + //() == pself.node_config.as_ref(); + //() == pself.client.as_ref().as_pin_ref().unwrap(); + /* *pself.ident_fut = Some(Box::pin(get_node_disk_ident_2( + pself.node_config.as_ref(), + pself.client.as_ref().as_pin_ref().unwrap(), + )));*/ + *pself.ident_fut = Some(Box::pin(get_node_disk_ident( + pself.node_config_ref, + pself.client_ref.as_ref().unwrap(), ))); let ret = UpdatedDbWithChannelNames { msg: format!("Client opened connection"), @@ -155,7 +293,7 @@ impl Stream for UpdatedDbWithChannelNamesStream { Ready(Some(Ok(ret))) } Ready(Err(e)) => { - self.errored = true; + *pself.errored = true; Ready(Some(Err(e))) } Pending => Pending, @@ -167,6 +305,20 @@ impl Stream for UpdatedDbWithChannelNamesStream { } } +async fn update_db_with_channel_name_list(list: Vec, backend: i64, dbc: &Client) -> Result<(), Error> { + crate::delay_io_short().await; + dbc.query("begin", &[]).await?; + for ch in list { + dbc.query( + "insert into channels (facility, name) values ($1, $2) on conflict do nothing", + &[&backend, &ch], + ) + .await?; + } + dbc.query("commit", &[]).await?; + Ok(()) +} + pub async fn update_db_with_channel_names( node_config: &NodeConfigCached, ) -> Result>, Error> {