Refactor channel update
This commit is contained in:
@@ -16,6 +16,7 @@ futures-core = "0.3.14"
|
|||||||
futures-util = "0.3.14"
|
futures-util = "0.3.14"
|
||||||
bytes = "1.0.1"
|
bytes = "1.0.1"
|
||||||
bincode = "1.3.3"
|
bincode = "1.3.3"
|
||||||
|
pin-project = "1.0.7"
|
||||||
#async-channel = "1"
|
#async-channel = "1"
|
||||||
#dashmap = "3"
|
#dashmap = "3"
|
||||||
tokio-postgres = { version = "0.7", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] }
|
tokio-postgres = { version = "0.7", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] }
|
||||||
|
|||||||
+178
-26
@@ -1,18 +1,22 @@
|
|||||||
|
#![allow(unused_imports)]
|
||||||
use async_channel::{bounded, Receiver};
|
use async_channel::{bounded, Receiver};
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use err::Error;
|
use err::Error;
|
||||||
use futures_core::Stream;
|
use futures_core::Stream;
|
||||||
use futures_util::FutureExt;
|
use futures_util::{pin_mut, FutureExt};
|
||||||
use netpod::log::*;
|
use netpod::log::*;
|
||||||
use netpod::NodeConfigCached;
|
use netpod::NodeConfigCached;
|
||||||
|
use pin_project::pin_project;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::io::ErrorKind;
|
use std::io::ErrorKind;
|
||||||
|
use std::marker::PhantomPinned;
|
||||||
use std::os::unix::ffi::OsStringExt;
|
use std::os::unix::ffi::OsStringExt;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
use tokio::fs::{DirEntry, ReadDir};
|
||||||
use tokio_postgres::Client;
|
use tokio_postgres::Client;
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
@@ -54,13 +58,94 @@ pub async fn get_node_disk_ident(node_config: &NodeConfigCached, dbc: &Client) -
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn get_node_disk_ident_2(
|
||||||
|
node_config: Pin<&NodeConfigCached>,
|
||||||
|
dbc: Pin<&Client>,
|
||||||
|
) -> Result<NodeDiskIdent, Error> {
|
||||||
|
let rows = dbc.query("select nodes.rowid, facility, split, hostname from nodes, facilities where facilities.name = $1 and facility = facilities.rowid and hostname = $2", &[&node_config.node.backend, &node_config.node.host]).await?;
|
||||||
|
if rows.len() != 1 {
|
||||||
|
return Err(Error::with_msg(format!(
|
||||||
|
"get_node can't find unique entry for {} {}",
|
||||||
|
node_config.node.host, node_config.node.backend
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
let row = &rows[0];
|
||||||
|
Ok(NodeDiskIdent {
|
||||||
|
rowid: row.get(0),
|
||||||
|
facility: row.get(1),
|
||||||
|
split: row.get(2),
|
||||||
|
hostname: row.get(3),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[pin_project]
|
||||||
|
pub struct FindChannelNamesFromConfigReadDir {
|
||||||
|
#[pin]
|
||||||
|
read_dir_fut: Option<Pin<Box<dyn Future<Output = std::io::Result<ReadDir>> + Send>>>,
|
||||||
|
read_dir: Option<ReadDir>,
|
||||||
|
#[pin]
|
||||||
|
dir_entry_fut: Option<Pin<Box<dyn Future<Output = std::io::Result<Option<DirEntry>>> + Send>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FindChannelNamesFromConfigReadDir {
|
||||||
|
pub fn new(base_dir: impl AsRef<Path>) -> Self {
|
||||||
|
Self {
|
||||||
|
read_dir_fut: Some(Box::pin(tokio::fs::read_dir(base_dir.as_ref().join("config")))),
|
||||||
|
read_dir: None,
|
||||||
|
dir_entry_fut: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Stream for FindChannelNamesFromConfigReadDir {
|
||||||
|
type Item = Result<DirEntry, Error>;
|
||||||
|
|
||||||
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
|
use Poll::*;
|
||||||
|
let mut pself = self.project();
|
||||||
|
loop {
|
||||||
|
break if let Some(fut) = pself.dir_entry_fut.as_mut().as_pin_mut() {
|
||||||
|
match fut.poll(cx) {
|
||||||
|
Ready(Ok(Some(item))) => {
|
||||||
|
let g = unsafe { &mut *(pself.read_dir.as_mut().unwrap() as *mut ReadDir) };
|
||||||
|
let fut = g.next_entry();
|
||||||
|
*pself.dir_entry_fut = Some(Box::pin(fut));
|
||||||
|
Ready(Some(Ok(item)))
|
||||||
|
}
|
||||||
|
Ready(Ok(None)) => {
|
||||||
|
*pself.dir_entry_fut = None;
|
||||||
|
Ready(None)
|
||||||
|
}
|
||||||
|
Ready(Err(e)) => Ready(Some(Err(e.into()))),
|
||||||
|
Pending => Pending,
|
||||||
|
}
|
||||||
|
} else if let Some(fut) = pself.read_dir_fut.as_mut().as_pin_mut() {
|
||||||
|
match fut.poll(cx) {
|
||||||
|
Ready(Ok(item)) => {
|
||||||
|
*pself.read_dir_fut = None;
|
||||||
|
*pself.read_dir = Some(item);
|
||||||
|
//let fut = pself.read_dir.as_mut().unwrap().next_entry();
|
||||||
|
let g = unsafe { &mut *(pself.read_dir.as_mut().unwrap() as *mut ReadDir) };
|
||||||
|
let fut = g.next_entry();
|
||||||
|
*pself.dir_entry_fut = Some(Box::pin(fut));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Ready(Err(e)) => Ready(Some(Err(e.into()))),
|
||||||
|
Pending => Pending,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Pending
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn find_channel_names_from_config<F, Fut>(base_dir: impl AsRef<Path>, mut cb: F) -> Result<(), Error>
|
async fn find_channel_names_from_config<F, Fut>(base_dir: impl AsRef<Path>, mut cb: F) -> Result<(), Error>
|
||||||
where
|
where
|
||||||
F: FnMut(&str) -> Fut,
|
F: FnMut(&str) -> Fut,
|
||||||
Fut: Future<Output = Result<(), Error>>,
|
Fut: Future<Output = Result<(), Error>>,
|
||||||
{
|
{
|
||||||
let mut path2: PathBuf = base_dir.as_ref().into();
|
let path2: PathBuf = base_dir.as_ref().join("config");
|
||||||
path2.push("config");
|
|
||||||
let mut rd = tokio::fs::read_dir(&path2).await?;
|
let mut rd = tokio::fs::read_dir(&path2).await?;
|
||||||
while let Ok(Some(entry)) = rd.next_entry().await {
|
while let Ok(Some(entry)) = rd.next_entry().await {
|
||||||
let fname = String::from_utf8(entry.file_name().into_vec())?;
|
let fname = String::from_utf8(entry.file_name().into_vec())?;
|
||||||
@@ -75,21 +160,30 @@ pub struct UpdatedDbWithChannelNames {
|
|||||||
count: u32,
|
count: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[pin_project]
|
||||||
pub struct UpdatedDbWithChannelNamesStream {
|
pub struct UpdatedDbWithChannelNamesStream {
|
||||||
errored: bool,
|
errored: bool,
|
||||||
data_complete: bool,
|
data_complete: bool,
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
node_config: Pin<Box<NodeConfigCached>>,
|
node_config: Pin<Box<NodeConfigCached>>,
|
||||||
|
// TODO can we pass a Pin to the async fn instead of creating static ref?
|
||||||
node_config_ref: &'static NodeConfigCached,
|
node_config_ref: &'static NodeConfigCached,
|
||||||
|
#[pin]
|
||||||
client_fut: Option<Pin<Box<dyn Future<Output = Result<Client, Error>> + Send>>>,
|
client_fut: Option<Pin<Box<dyn Future<Output = Result<Client, Error>> + Send>>>,
|
||||||
client: Option<Pin<Box<Client>>>,
|
#[pin]
|
||||||
|
client: Option<Client>,
|
||||||
client_ref: Option<&'static Client>,
|
client_ref: Option<&'static Client>,
|
||||||
|
#[pin]
|
||||||
ident_fut: Option<Pin<Box<dyn Future<Output = Result<NodeDiskIdent, Error>> + Send>>>,
|
ident_fut: Option<Pin<Box<dyn Future<Output = Result<NodeDiskIdent, Error>> + Send>>>,
|
||||||
ident: Option<NodeDiskIdent>,
|
ident: Option<NodeDiskIdent>,
|
||||||
|
#[pin]
|
||||||
|
find: Option<FindChannelNamesFromConfigReadDir>,
|
||||||
|
#[pin]
|
||||||
|
update_batch: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
|
||||||
|
channel_inp_done: bool,
|
||||||
|
clist: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe impl Send for UpdatedDbWithChannelNamesStream {}
|
|
||||||
|
|
||||||
impl UpdatedDbWithChannelNamesStream {
|
impl UpdatedDbWithChannelNamesStream {
|
||||||
pub fn new(node_config: NodeConfigCached) -> Result<Self, Error> {
|
pub fn new(node_config: NodeConfigCached) -> Result<Self, Error> {
|
||||||
let node_config = Box::pin(node_config.clone());
|
let node_config = Box::pin(node_config.clone());
|
||||||
@@ -104,6 +198,10 @@ impl UpdatedDbWithChannelNamesStream {
|
|||||||
client_ref: None,
|
client_ref: None,
|
||||||
ident_fut: None,
|
ident_fut: None,
|
||||||
ident: None,
|
ident: None,
|
||||||
|
find: None,
|
||||||
|
update_batch: None,
|
||||||
|
channel_inp_done: false,
|
||||||
|
clist: vec![],
|
||||||
};
|
};
|
||||||
ret.client_fut = Some(Box::pin(crate::create_connection(ret.node_config_ref)));
|
ret.client_fut = Some(Box::pin(crate::create_connection(ret.node_config_ref)));
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
@@ -113,40 +211,80 @@ impl UpdatedDbWithChannelNamesStream {
|
|||||||
impl Stream for UpdatedDbWithChannelNamesStream {
|
impl Stream for UpdatedDbWithChannelNamesStream {
|
||||||
type Item = Result<UpdatedDbWithChannelNames, Error>;
|
type Item = Result<UpdatedDbWithChannelNames, Error>;
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
use Poll::*;
|
use Poll::*;
|
||||||
|
let mut pself = self.project();
|
||||||
loop {
|
loop {
|
||||||
break if self.errored {
|
break if *pself.errored {
|
||||||
Ready(None)
|
Ready(None)
|
||||||
} else if self.data_complete {
|
} else if *pself.data_complete {
|
||||||
Ready(None)
|
Ready(None)
|
||||||
} else if let Some(fut) = &mut self.ident_fut {
|
} else if let Some(fut) = pself.find.as_mut().as_pin_mut() {
|
||||||
match fut.poll_unpin(cx) {
|
match fut.poll_next(cx) {
|
||||||
|
Ready(Some(Ok(item))) => {
|
||||||
|
pself
|
||||||
|
.clist
|
||||||
|
.push(String::from_utf8(item.file_name().into_vec()).unwrap());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Ready(Some(Err(e))) => {
|
||||||
|
*pself.errored = true;
|
||||||
|
Ready(Some(Err(e)))
|
||||||
|
}
|
||||||
|
Ready(None) => {
|
||||||
|
*pself.channel_inp_done = true;
|
||||||
|
// Work through the collected items
|
||||||
|
let l = std::mem::replace(pself.clist, vec![]);
|
||||||
|
let fut = update_db_with_channel_name_list(
|
||||||
|
l,
|
||||||
|
pself.ident.as_ref().unwrap().facility,
|
||||||
|
pself.client.as_ref().get_ref().as_ref().unwrap(),
|
||||||
|
);
|
||||||
|
// TODO
|
||||||
|
//pself.update_batch.replace(Box::pin(fut));
|
||||||
|
let _ = fut;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Pending => Pending,
|
||||||
|
}
|
||||||
|
} else if let Some(fut) = pself.ident_fut.as_mut().as_pin_mut() {
|
||||||
|
match fut.poll(cx) {
|
||||||
Ready(Ok(item)) => {
|
Ready(Ok(item)) => {
|
||||||
self.ident = Some(item);
|
*pself.ident_fut = None;
|
||||||
self.ident_fut = None;
|
*pself.ident = Some(item);
|
||||||
let ret = UpdatedDbWithChannelNames {
|
let ret = UpdatedDbWithChannelNames {
|
||||||
msg: format!("ALL DONE"),
|
msg: format!("Got ident {:?}", pself.ident),
|
||||||
count: 42,
|
count: 43,
|
||||||
};
|
};
|
||||||
self.data_complete = true;
|
let s = FindChannelNamesFromConfigReadDir::new(&pself.node_config.node.data_base_path);
|
||||||
|
*pself.find = Some(s);
|
||||||
Ready(Some(Ok(ret)))
|
Ready(Some(Ok(ret)))
|
||||||
}
|
}
|
||||||
Ready(Err(e)) => {
|
Ready(Err(e)) => {
|
||||||
self.errored = true;
|
*pself.errored = true;
|
||||||
Ready(Some(Err(e)))
|
Ready(Some(Err(e)))
|
||||||
}
|
}
|
||||||
Pending => Pending,
|
Pending => Pending,
|
||||||
}
|
}
|
||||||
} else if let Some(fut) = &mut self.client_fut {
|
} else if let Some(fut) = pself.client_fut.as_mut().as_pin_mut() {
|
||||||
match fut.poll_unpin(cx) {
|
match fut.poll(cx) {
|
||||||
Ready(Ok(item)) => {
|
Ready(Ok(item)) => {
|
||||||
self.client_fut = None;
|
*pself.client_fut = None;
|
||||||
self.client = Some(Box::pin(item));
|
//*pself.client = Some(Box::pin(item));
|
||||||
self.client_ref = Some(unsafe { &*(&self.client.as_ref().unwrap() as &Client as *const _) });
|
//*pself.client_ref = Some(unsafe { &*(&pself.client.as_ref().unwrap() as &Client as *const _) });
|
||||||
self.ident_fut = Some(Box::pin(get_node_disk_ident(
|
*pself.client = Some(item);
|
||||||
self.node_config_ref,
|
let c2: &Client = pself.client.as_ref().get_ref().as_ref().unwrap();
|
||||||
self.client_ref.as_ref().unwrap(),
|
*pself.client_ref = Some(unsafe { &*(c2 as *const _) });
|
||||||
|
|
||||||
|
//() == pself.node_config.as_ref();
|
||||||
|
//() == pself.client.as_ref().as_pin_ref().unwrap();
|
||||||
|
/* *pself.ident_fut = Some(Box::pin(get_node_disk_ident_2(
|
||||||
|
pself.node_config.as_ref(),
|
||||||
|
pself.client.as_ref().as_pin_ref().unwrap(),
|
||||||
|
)));*/
|
||||||
|
*pself.ident_fut = Some(Box::pin(get_node_disk_ident(
|
||||||
|
pself.node_config_ref,
|
||||||
|
pself.client_ref.as_ref().unwrap(),
|
||||||
)));
|
)));
|
||||||
let ret = UpdatedDbWithChannelNames {
|
let ret = UpdatedDbWithChannelNames {
|
||||||
msg: format!("Client opened connection"),
|
msg: format!("Client opened connection"),
|
||||||
@@ -155,7 +293,7 @@ impl Stream for UpdatedDbWithChannelNamesStream {
|
|||||||
Ready(Some(Ok(ret)))
|
Ready(Some(Ok(ret)))
|
||||||
}
|
}
|
||||||
Ready(Err(e)) => {
|
Ready(Err(e)) => {
|
||||||
self.errored = true;
|
*pself.errored = true;
|
||||||
Ready(Some(Err(e)))
|
Ready(Some(Err(e)))
|
||||||
}
|
}
|
||||||
Pending => Pending,
|
Pending => Pending,
|
||||||
@@ -167,6 +305,20 @@ impl Stream for UpdatedDbWithChannelNamesStream {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn update_db_with_channel_name_list(list: Vec<String>, backend: i64, dbc: &Client) -> Result<(), Error> {
|
||||||
|
crate::delay_io_short().await;
|
||||||
|
dbc.query("begin", &[]).await?;
|
||||||
|
for ch in list {
|
||||||
|
dbc.query(
|
||||||
|
"insert into channels (facility, name) values ($1, $2) on conflict do nothing",
|
||||||
|
&[&backend, &ch],
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
dbc.query("commit", &[]).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn update_db_with_channel_names(
|
pub async fn update_db_with_channel_names(
|
||||||
node_config: &NodeConfigCached,
|
node_config: &NodeConfigCached,
|
||||||
) -> Result<Receiver<Result<UpdatedDbWithChannelNames, Error>>, Error> {
|
) -> Result<Receiver<Result<UpdatedDbWithChannelNames, Error>>, Error> {
|
||||||
|
|||||||
Reference in New Issue
Block a user