Stream type for channel list

This commit is contained in:
Dominik Werder
2021-05-31 22:24:53 +02:00
parent f165530263
commit 3d17fea919
4 changed files with 168 additions and 27 deletions
+1
View File
@@ -19,6 +19,7 @@ bincode = "1.3.3"
#async-channel = "1" #async-channel = "1"
#dashmap = "3" #dashmap = "3"
tokio-postgres = { version = "0.7", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] } tokio-postgres = { version = "0.7", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] }
async-channel = "1.6"
chrono = "0.4" chrono = "0.4"
regex = "1.5.4" regex = "1.5.4"
err = { path = "../err" } err = { path = "../err" }
+14
View File
@@ -33,6 +33,20 @@ pub async fn create_connection(node_config: &NodeConfigCached) -> Result<Client,
Ok(cl) Ok(cl)
} }
pub async fn create_connection_2(node_config: NodeConfigCached) -> Result<Client, Error> {
let d = &node_config.node_config.cluster.database;
let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name);
let (cl, conn) = tokio_postgres::connect(&uri, NoTls).await?;
// TODO monitor connection drop.
let _cjh = tokio::spawn(async move {
if let Err(e) = conn.await {
error!("connection error: {}", e);
}
Ok::<_, Error>(())
});
Ok(cl)
}
pub async fn channel_exists(channel: &Channel, node_config: &NodeConfigCached) -> Result<bool, Error> { pub async fn channel_exists(channel: &Channel, node_config: &NodeConfigCached) -> Result<bool, Error> {
let cl = create_connection(node_config).await?; let cl = create_connection(node_config).await?;
let rows = cl let rows = cl
+117 -20
View File
@@ -1,15 +1,21 @@
use async_channel::{bounded, Receiver, Sender};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use err::Error; use err::Error;
use futures_core::Stream;
use futures_util::FutureExt;
use netpod::log::*; use netpod::log::*;
use netpod::NodeConfigCached; use netpod::NodeConfigCached;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::future::Future; use std::future::Future;
use std::io::ErrorKind; use std::io::ErrorKind;
use std::marker::PhantomPinned;
use std::os::unix::ffi::OsStringExt; use std::os::unix::ffi::OsStringExt;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::pin::Pin; use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use tokio_postgres::Client; use tokio_postgres::Client;
use std::sync::{RwLock, Arc};
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct NodeDiskIdent { pub struct NodeDiskIdent {
@@ -67,34 +73,124 @@ where
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct UpdatedDbWithChannelNames { pub struct UpdatedDbWithChannelNames {
msg: String,
count: u32, count: u32,
} }
pub async fn update_db_with_channel_names(node_config: &NodeConfigCached) -> Result<UpdatedDbWithChannelNames, Error> { pub struct UpdatedDbWithChannelNamesStream {
let dbc = crate::create_connection(node_config).await?; errored: bool,
let node_ident = get_node_disk_ident(node_config, &dbc).await?; data_complete: bool,
let c1 = Arc::new(RwLock::new(0u32)); node_config: Pin<Box<NodeConfigCached>>,
let rows = dbc node_config_ptr: NonNull<NodeConfigCached>,
.query("select rowid from facilities where name = $1", &[&node_ident.facility]) //_pin: PhantomPinned,
.await?; client_fut: Option<Pin<Box<dyn Future<Output = Result<Client, Error>> + Send>>>,
if rows.len() != 1 { client: Option<Pin<Box<Client>>>,
return Err(Error::with_msg(format!( client_ptr: *const Client,
"can not find facility {}", ident_fut: Option<Pin<Box<dyn Future<Output = Result<NodeDiskIdent, Error>> + Send>>>,
node_ident.facility ident: Option<NodeDiskIdent>,
))); }
unsafe impl Send for UpdatedDbWithChannelNamesStream {}
impl UpdatedDbWithChannelNamesStream {
pub fn new(node_config: NodeConfigCached) -> Result<Self, Error> {
let node_config = Box::pin(node_config.clone());
let mut ret = Self {
errored: false,
data_complete: false,
node_config_ptr: NonNull::dangling(),
node_config,
//_pin: PhantomPinned,
client_fut: None,
client: None,
client_ptr: std::ptr::null(),
ident_fut: None,
ident: None,
};
ret.node_config_ptr = NonNull::from(&*ret.node_config);
ret.client_fut = Some(Box::pin(crate::create_connection(unsafe {
&*ret.node_config_ptr.as_ptr()
})));
Ok(ret)
} }
let facility: i64 = rows[0].try_get(0)?; }
impl Stream for UpdatedDbWithChannelNamesStream {
type Item = Result<UpdatedDbWithChannelNames, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
'outer: loop {
break if self.errored {
Ready(None)
} else if self.data_complete {
Ready(None)
} else if let Some(fut) = &mut self.ident_fut {
match fut.poll_unpin(cx) {
Ready(Ok(item)) => {
self.ident = Some(item);
self.ident_fut = None;
let ret = UpdatedDbWithChannelNames {
msg: format!("ALL DONE"),
count: 42,
};
self.data_complete = true;
Ready(Some(Ok(ret)))
}
Ready(Err(e)) => {
self.errored = true;
Ready(Some(Err(e)))
}
Pending => Pending,
}
} else if let Some(fut) = &mut self.client_fut {
match fut.poll_unpin(cx) {
Ready(Ok(item)) => {
self.client_fut = None;
self.client = Some(Box::pin(item));
self.client_ptr = self.client.as_ref().unwrap() as &Client as *const _;
let p1 = unsafe { &*self.client_ptr };
self.ident_fut = Some(Box::pin(get_node_disk_ident(
unsafe { &*self.node_config_ptr.as_ptr() },
&p1,
)));
let ret = UpdatedDbWithChannelNames {
msg: format!("Client opened connection"),
count: 42,
};
Ready(Some(Ok(ret)))
}
Ready(Err(e)) => {
self.errored = true;
Ready(Some(Err(e)))
}
Pending => Pending,
}
} else {
Ready(None)
};
}
}
}
pub async fn update_db_with_channel_names(
node_config: &NodeConfigCached,
) -> Result<Receiver<Result<UpdatedDbWithChannelNames, Error>>, Error> {
let dbc = crate::create_connection(node_config).await?;
let node_disk_ident = get_node_disk_ident(node_config, &dbc).await?;
let c1 = Arc::new(RwLock::new(0u32));
dbc.query("begin", &[]).await?; dbc.query("begin", &[]).await?;
let dbc = Arc::new(dbc); let dbc = Arc::new(dbc);
find_channel_names_from_config(&node_config.node.data_base_path, |ch| { find_channel_names_from_config(&node_config.node.data_base_path, |ch| {
let ch = ch.to_owned(); let ch = ch.to_owned();
let dbc = dbc.clone(); let dbc = dbc.clone();
let c1 = c1.clone(); let c1 = c1.clone();
let fac = node_disk_ident.facility;
async move { async move {
crate::delay_io_short().await; crate::delay_io_short().await;
dbc.query( dbc.query(
"insert into channels (facility, name) values ($1, $2) on conflict do nothing", "insert into channels (facility, name) values ($1, $2) on conflict do nothing",
&[&facility, &ch], &[&fac, &ch],
) )
.await?; .await?;
let c2 = { let c2 = {
@@ -114,9 +210,10 @@ pub async fn update_db_with_channel_names(node_config: &NodeConfigCached) -> Res
.await?; .await?;
dbc.query("commit", &[]).await?; dbc.query("commit", &[]).await?;
let ret = UpdatedDbWithChannelNames { let ret = UpdatedDbWithChannelNames {
msg: format!("done"),
count: *c1.read()?, count: *c1.read()?,
}; };
Ok(ret) Ok(bounded(16).1)
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
@@ -124,7 +221,9 @@ pub struct UpdatedDbWithAllChannelConfigs {
count: u32, count: u32,
} }
pub async fn update_db_with_all_channel_configs(node_config: &NodeConfigCached) -> Result<UpdatedDbWithAllChannelConfigs, Error> { pub async fn update_db_with_all_channel_configs(
node_config: &NodeConfigCached,
) -> Result<UpdatedDbWithAllChannelConfigs, Error> {
let dbc = crate::create_connection(node_config).await?; let dbc = crate::create_connection(node_config).await?;
let dbc = Arc::new(dbc); let dbc = Arc::new(dbc);
let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?; let node_disk_ident = &get_node_disk_ident(node_config, &dbc).await?;
@@ -178,9 +277,7 @@ pub async fn update_db_with_all_channel_configs(node_config: &NodeConfigCached)
} }
} }
dbc.query("commit", &[]).await?; dbc.query("commit", &[]).await?;
let ret = UpdatedDbWithAllChannelConfigs { let ret = UpdatedDbWithAllChannelConfigs { count: c1 };
count: c1,
};
Ok(ret) Ok(ret)
} }
+36 -7
View File
@@ -171,6 +171,12 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
} else { } else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
} }
} else if path == "/api/4/update_search_cache" {
if req.method() == Method::GET {
Ok(update_search_cache(req, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
} else if path.starts_with("/api/4/documentation/") { } else if path.starts_with("/api/4/documentation/") {
if req.method() == Method::GET { if req.method() == Method::GET {
static_http!(path, "", "index.html", "text/html"); static_http!(path, "", "index.html", "text/html");
@@ -387,25 +393,35 @@ pub async fn clear_cache_all(req: Request<Body>, node_config: &NodeConfigCached)
}; };
let res = disk::cache::clear_cache_all(node_config, dry).await?; let res = disk::cache::clear_cache_all(node_config, dry).await?;
let ret = response(StatusCode::OK) let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/json") .header(http::header::CONTENT_TYPE, "application/json")
.body(Body::from(serde_json::to_string(&res)?))?; .body(Body::from(serde_json::to_string(&res)?))?;
Ok(ret) Ok(ret)
} }
pub async fn update_db_with_channel_names(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> { pub async fn update_db_with_channel_names(
req: Request<Body>,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
let (head, _body) = req.into_parts(); let (head, _body) = req.into_parts();
let _dry = match head.uri.query() { let _dry = match head.uri.query() {
Some(q) => q.contains("dry"), Some(q) => q.contains("dry"),
None => false, None => false,
}; };
let res = dbconn::scan::update_db_with_channel_names(node_config).await?; let res = dbconn::scan::UpdatedDbWithChannelNamesStream::new(node_config.clone())?;
//let res = dbconn::scan::update_db_with_channel_names(node_config).await?;
let ret = response(StatusCode::OK) let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/json") .header(http::header::CONTENT_TYPE, "application/json")
.body(Body::from(serde_json::to_string(&res)?))?; .body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) {
Ok(item) => Ok(item),
Err(e) => Err(e),
})))?;
Ok(ret) Ok(ret)
} }
pub async fn update_db_with_all_channel_configs(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> { pub async fn update_db_with_all_channel_configs(
req: Request<Body>,
node_config: &NodeConfigCached,
) -> Result<Response<Body>, Error> {
let (head, _body) = req.into_parts(); let (head, _body) = req.into_parts();
let _dry = match head.uri.query() { let _dry = match head.uri.query() {
Some(q) => q.contains("dry"), Some(q) => q.contains("dry"),
@@ -417,3 +433,16 @@ pub async fn update_db_with_all_channel_configs(req: Request<Body>, node_config:
.body(Body::from(serde_json::to_string(&res)?))?; .body(Body::from(serde_json::to_string(&res)?))?;
Ok(ret) Ok(ret)
} }
pub async fn update_search_cache(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
let (head, _body) = req.into_parts();
let _dry = match head.uri.query() {
Some(q) => q.contains("dry"),
None => false,
};
let res = dbconn::scan::update_search_cache(node_config).await?;
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/json")
.body(Body::from(serde_json::to_string(&res)?))?;
Ok(ret)
}