Move API 1 docs here
This commit is contained in:
@@ -33,20 +33,6 @@ pub async fn create_connection(node_config: &NodeConfigCached) -> Result<Client,
|
||||
Ok(cl)
|
||||
}
|
||||
|
||||
pub async fn create_connection_2(node_config: NodeConfigCached) -> Result<Client, Error> {
|
||||
let d = &node_config.node_config.cluster.database;
|
||||
let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name);
|
||||
let (cl, conn) = tokio_postgres::connect(&uri, NoTls).await?;
|
||||
// TODO monitor connection drop.
|
||||
let _cjh = tokio::spawn(async move {
|
||||
if let Err(e) = conn.await {
|
||||
error!("connection error: {}", e);
|
||||
}
|
||||
Ok::<_, Error>(())
|
||||
});
|
||||
Ok(cl)
|
||||
}
|
||||
|
||||
pub async fn channel_exists(channel: &Channel, node_config: &NodeConfigCached) -> Result<bool, Error> {
|
||||
let cl = create_connection(node_config).await?;
|
||||
let rows = cl
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use async_channel::{bounded, Receiver, Sender};
|
||||
use async_channel::{bounded, Receiver};
|
||||
use chrono::{DateTime, Utc};
|
||||
use err::Error;
|
||||
use futures_core::Stream;
|
||||
@@ -8,11 +8,9 @@ use netpod::NodeConfigCached;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::future::Future;
|
||||
use std::io::ErrorKind;
|
||||
use std::marker::PhantomPinned;
|
||||
use std::os::unix::ffi::OsStringExt;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::pin::Pin;
|
||||
use std::ptr::NonNull;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::task::{Context, Poll};
|
||||
use tokio_postgres::Client;
|
||||
@@ -80,12 +78,12 @@ pub struct UpdatedDbWithChannelNames {
|
||||
pub struct UpdatedDbWithChannelNamesStream {
|
||||
errored: bool,
|
||||
data_complete: bool,
|
||||
#[allow(dead_code)]
|
||||
node_config: Pin<Box<NodeConfigCached>>,
|
||||
node_config_ptr: NonNull<NodeConfigCached>,
|
||||
//_pin: PhantomPinned,
|
||||
node_config_ref: &'static NodeConfigCached,
|
||||
client_fut: Option<Pin<Box<dyn Future<Output = Result<Client, Error>> + Send>>>,
|
||||
client: Option<Pin<Box<Client>>>,
|
||||
client_ptr: *const Client,
|
||||
client_ref: Option<&'static Client>,
|
||||
ident_fut: Option<Pin<Box<dyn Future<Output = Result<NodeDiskIdent, Error>> + Send>>>,
|
||||
ident: Option<NodeDiskIdent>,
|
||||
}
|
||||
@@ -95,22 +93,19 @@ unsafe impl Send for UpdatedDbWithChannelNamesStream {}
|
||||
impl UpdatedDbWithChannelNamesStream {
|
||||
pub fn new(node_config: NodeConfigCached) -> Result<Self, Error> {
|
||||
let node_config = Box::pin(node_config.clone());
|
||||
let node_config_ref = unsafe { &*(&node_config as &NodeConfigCached as *const _) };
|
||||
let mut ret = Self {
|
||||
errored: false,
|
||||
data_complete: false,
|
||||
node_config_ptr: NonNull::dangling(),
|
||||
node_config,
|
||||
//_pin: PhantomPinned,
|
||||
node_config_ref,
|
||||
client_fut: None,
|
||||
client: None,
|
||||
client_ptr: std::ptr::null(),
|
||||
client_ref: None,
|
||||
ident_fut: None,
|
||||
ident: None,
|
||||
};
|
||||
ret.node_config_ptr = NonNull::from(&*ret.node_config);
|
||||
ret.client_fut = Some(Box::pin(crate::create_connection(unsafe {
|
||||
&*ret.node_config_ptr.as_ptr()
|
||||
})));
|
||||
ret.client_fut = Some(Box::pin(crate::create_connection(ret.node_config_ref)));
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
@@ -120,7 +115,7 @@ impl Stream for UpdatedDbWithChannelNamesStream {
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
'outer: loop {
|
||||
loop {
|
||||
break if self.errored {
|
||||
Ready(None)
|
||||
} else if self.data_complete {
|
||||
@@ -148,11 +143,10 @@ impl Stream for UpdatedDbWithChannelNamesStream {
|
||||
Ready(Ok(item)) => {
|
||||
self.client_fut = None;
|
||||
self.client = Some(Box::pin(item));
|
||||
self.client_ptr = self.client.as_ref().unwrap() as &Client as *const _;
|
||||
let p1 = unsafe { &*self.client_ptr };
|
||||
self.client_ref = Some(unsafe { &*(&self.client.as_ref().unwrap() as &Client as *const _) });
|
||||
self.ident_fut = Some(Box::pin(get_node_disk_ident(
|
||||
unsafe { &*self.node_config_ptr.as_ptr() },
|
||||
&p1,
|
||||
self.node_config_ref,
|
||||
self.client_ref.as_ref().unwrap(),
|
||||
)));
|
||||
let ret = UpdatedDbWithChannelNames {
|
||||
msg: format!("Client opened connection"),
|
||||
@@ -209,7 +203,7 @@ pub async fn update_db_with_channel_names(
|
||||
})
|
||||
.await?;
|
||||
dbc.query("commit", &[]).await?;
|
||||
let ret = UpdatedDbWithChannelNames {
|
||||
let _ret = UpdatedDbWithChannelNames {
|
||||
msg: format!("done"),
|
||||
count: *c1.read()?,
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user