diff --git a/dbconn/src/lib.rs b/dbconn/src/lib.rs index 4c74199..dda2fbd 100644 --- a/dbconn/src/lib.rs +++ b/dbconn/src/lib.rs @@ -1,9 +1,9 @@ use err::Error; use netpod::log::*; use netpod::{Channel, NodeConfigCached}; -use tokio_postgres::NoTls; +use tokio_postgres::{Client, NoTls}; -pub async fn channel_exists(channel: &Channel, node_config: &NodeConfigCached) -> Result { +pub async fn create_connection(node_config: &NodeConfigCached) -> Result { let d = &node_config.node_config.cluster.database; let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name); let (cl, conn) = tokio_postgres::connect(&uri, NoTls).await?; @@ -14,6 +14,11 @@ pub async fn channel_exists(channel: &Channel, node_config: &NodeConfigCached) - } Ok::<_, Error>(()) }); + Ok(cl) +} + +pub async fn channel_exists(channel: &Channel, node_config: &NodeConfigCached) -> Result { + let cl = create_connection(node_config).await?; let rows = cl .query("select rowid from channels where name = $1::text", &[&channel.name]) .await?; @@ -28,3 +33,19 @@ pub async fn channel_exists(channel: &Channel, node_config: &NodeConfigCached) - } Ok(true) } + +pub async fn database_size(node_config: &NodeConfigCached) -> Result { + let cl = create_connection(node_config).await?; + let rows = cl + .query( + "select pg_database_size($1::text)", + &[&node_config.node_config.cluster.database.name], + ) + .await?; + if rows.len() == 0 { + Err(Error::with_msg("could not get database size"))?; + } + let size: i64 = rows[0].get(0); + let size = size as u64; + Ok(size) +} diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index 85cfdb1..6e6e09a 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" [dependencies] serde_json = "1.0" +serde = { version = "1.0", features = ["derive"] } http = "0.2" url = "2.2" tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } @@ -17,5 +18,6 @@ tracing = "0.1.25" async-channel = "1.6" err = { path = "../err" } netpod = { path = "../netpod" } +dbconn = { path = "../dbconn" } disk = { path = "../disk" } taskrun = { path = "../taskrun" } diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 324135a..33ad4af 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -13,6 +13,7 @@ use net::SocketAddr; use netpod::{ByteSize, Node, NodeConfigCached}; use panic::{AssertUnwindSafe, UnwindSafe}; use pin::Pin; +use serde::{Deserialize, Serialize}; use std::{future, net, panic, pin, task}; use task::{Context, Poll}; use tracing::field::Empty; @@ -85,7 +86,13 @@ impl UnwindSafe for Cont {} async fn data_api_proxy_try(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let uri = req.uri().clone(); let path = uri.path(); - if path == "/api/1/parsed_raw" { + if path == "/api/1/node_status" { + if req.method() == Method::GET { + Ok(node_status(req, &node_config).await?) + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } + } else if path == "/api/1/parsed_raw" { if req.method() == Method::POST { Ok(parsed_raw(req, &node_config.node).await?) } else { @@ -256,3 +263,18 @@ async fn prebinned(req: Request, node_config: &NodeConfigCached) -> Result Ok(ret) }) } + +#[derive(Debug, Serialize, Deserialize)] +pub struct NodeStatus { + database_size: u64, +} + +async fn node_status(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + let (_head, _body) = req.into_parts(); + let ret = NodeStatus { + database_size: dbconn::database_size(node_config).await?, + }; + let ret = serde_json::to_vec(&ret)?; + let ret = response(StatusCode::OK).body(Body::from(ret))?; + Ok(ret) +} diff --git a/retrieval/src/bin/retrieval.rs b/retrieval/src/bin/retrieval.rs index 2e1f52d..8f705e8 100644 --- a/retrieval/src/bin/retrieval.rs +++ b/retrieval/src/bin/retrieval.rs @@ -34,6 +34,9 @@ async fn go() -> Result<(), Error> { retrieval::run_node(node_config.clone()).await?; } SubCmd::Client(client) => match client.client_type { + ClientType::Status(opts) => { + retrieval::client::status(opts.host, opts.port).await?; + } ClientType::Binned(opts) => { let beg = opts.beg.parse()?; let end = opts.end.parse()?; diff --git a/retrieval/src/cli.rs b/retrieval/src/cli.rs index d078104..97379b0 100644 --- a/retrieval/src/cli.rs +++ b/retrieval/src/cli.rs @@ -31,6 +31,15 @@ pub struct Client { #[derive(Debug, Clap)] pub enum ClientType { Binned(BinnedClient), + Status(StatusClient), +} + +#[derive(Debug, Clap)] +pub struct StatusClient { + #[clap(long)] + pub host: String, + #[clap(long)] + pub port: u16, } #[derive(Debug, Clap)] diff --git a/retrieval/src/client.rs b/retrieval/src/client.rs index 15fde32..1e0ac8c 100644 --- a/retrieval/src/client.rs +++ b/retrieval/src/client.rs @@ -10,6 +10,28 @@ use hyper::Body; use netpod::log::*; use netpod::PerfOpts; +pub async fn status(host: String, port: u16) -> Result<(), Error> { + let t1 = Utc::now(); + let uri = format!("http://{}:{}/api/1/node_status", host, port,); + let req = hyper::Request::builder() + .method(http::Method::GET) + .uri(uri) + .body(Body::empty())?; + let client = hyper::Client::new(); + let res = client.request(req).await?; + if res.status() != StatusCode::OK { + error!("Server error {:?}", res); + return Err(Error::with_msg(format!("Server error {:?}", res))); + } + let body = hyper::body::to_bytes(res.into_body()).await?; + let res = String::from_utf8(body.to_vec())?; + let t2 = chrono::Utc::now(); + let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; + info!("node_status DONE duration: {} ms", ms); + println!("{}", res); + Ok(()) +} + pub async fn get_binned( host: String, port: u16,