WIP
This commit is contained in:
@@ -1553,9 +1553,13 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
|
||||
|
||||
netfetch::dbpg::schema_check(opts.postgresql_config()).await?;
|
||||
|
||||
scywr::schema::migrate_keyspace(opts.scylla_config())
|
||||
scywr::schema::migrate_scylla_data_schema(opts.scylla_config())
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
|
||||
info!("migrate_keyspace done");
|
||||
if true {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// TODO use a new stats type:
|
||||
//let store_stats = Arc::new(CaConnStats::new());
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
[package]
|
||||
name = "dbpg"
|
||||
version = "0.0.1"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
log = { path = "../log" }
|
||||
err = { path = "../../daqbuffer/crates/err" }
|
||||
netpod = { path = "../../daqbuffer/crates/netpod" }
|
||||
taskrun = { path = "../../daqbuffer/crates/taskrun" }
|
||||
tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4"] }
|
||||
async-channel = "1.9.0"
|
||||
@@ -0,0 +1,17 @@
|
||||
use crate::err::Error;
|
||||
use log::*;
|
||||
use netpod::Database;
|
||||
use taskrun::tokio;
|
||||
use tokio_postgres::Client;
|
||||
|
||||
pub type PgClient = Client;
|
||||
|
||||
pub async fn make_pg_client(dbconf: &Database) -> Result<PgClient, Error> {
|
||||
let d = dbconf;
|
||||
let url = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name);
|
||||
info!("connect to {url}");
|
||||
let (client, pg_conn) = tokio_postgres::connect(&url, tokio_postgres::tls::NoTls).await?;
|
||||
// TODO allow clean shutdown on ctrl-c and join the pg_conn in the end:
|
||||
tokio::spawn(pg_conn);
|
||||
Ok(client)
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
|
||||
/// Plain text message, used as payload of `Error::Msg`.
#[derive(Debug)]
pub struct Msg(
    /// The message text.
    pub String,
);
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
pub enum Error {
|
||||
Postgres(#[from] tokio_postgres::Error),
|
||||
Msg(Msg),
|
||||
}
|
||||
|
||||
impl Error {
|
||||
pub fn from_msg<T>(msg: T) -> Self
|
||||
where
|
||||
T: Into<String>,
|
||||
{
|
||||
Self::Msg(Msg(msg.into()))
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,114 @@
|
||||
use crate::conn::PgClient;
|
||||
use crate::err::Error;
|
||||
use async_channel::Receiver;
|
||||
use log::*;
|
||||
use std::net::SocketAddrV4;
|
||||
use std::time::Duration;
|
||||
use tokio_postgres::Statement;
|
||||
|
||||
const TEXT: tokio_postgres::types::Type = tokio_postgres::types::Type::TEXT;
|
||||
|
||||
/// One search result to be recorded in the `ioc_by_channel_log` table.
#[derive(Debug)]
pub struct IocItem {
    /// Channel name, matched against the `channel` column.
    pub channel: String,
    /// Written in its string form to the `responseaddr` column, NULL if `None`.
    pub response_addr: Option<SocketAddrV4>,
    /// Written in its string form to the `addr` column, NULL if `None`.
    pub addr: Option<SocketAddrV4>,
    /// Duration associated with the item; not read by `IocSearchIndexWorker`.
    pub dt: Duration,
}

impl IocItem {
    /// Bundles the given values into an item without any further processing.
    pub fn new(channel: String, response_addr: Option<SocketAddrV4>, addr: Option<SocketAddrV4>, dt: Duration) -> Self {
        IocItem {
            channel,
            response_addr,
            addr,
            dt,
        }
    }
}
|
||||
|
||||
// TODO
|
||||
// Issue: the prepared statements are bound to a connection.
|
||||
// A connection pool is therefore often not a good model.
|
||||
// Better would probably be to hand a database-work-item to a queue.
|
||||
// A worker can then deal with the query.
|
||||
// Must choose between: some async fn waits for that item being finished.
|
||||
// Or: enqueue-and-forget and let the result of that query be enqueued
|
||||
// in yet another queue.
|
||||
pub struct IocSearchIndexWorker {
|
||||
backend: String,
|
||||
pg: PgClient,
|
||||
qu_select: Statement,
|
||||
qu_update_tsmod: Statement,
|
||||
qu_update_archived: Statement,
|
||||
qu_insert: Statement,
|
||||
rx: Receiver<IocItem>,
|
||||
}
|
||||
|
||||
impl IocSearchIndexWorker {
|
||||
pub async fn prepare(rx: Receiver<IocItem>, backend: String, pg: PgClient) -> Result<Self, Error> {
|
||||
let qu_select = {
|
||||
let sql = "select channel, addr from ioc_by_channel_log where facility = $1 and channel = $2 and addr is not distinct from $3 and archived = 0";
|
||||
pg.prepare(sql).await.unwrap()
|
||||
};
|
||||
let qu_update_tsmod = {
|
||||
let sql = "update ioc_by_channel_log set tsmod = now(), responseaddr = $4 where facility = $1 and channel = $2 and addr is not distinct from $3 and archived = 0";
|
||||
pg.prepare(sql).await.unwrap()
|
||||
};
|
||||
let qu_update_archived = {
|
||||
let sql =
|
||||
"update ioc_by_channel_log set archived = 1 where facility = $1 and channel = $2 and archived = 0";
|
||||
pg.prepare(sql).await.unwrap()
|
||||
};
|
||||
let qu_insert = {
|
||||
let sql = "insert into ioc_by_channel_log (facility, channel, addr, responseaddr) values ($1, $2, $3, $4)";
|
||||
pg.prepare_typed(sql, &[TEXT, TEXT, TEXT, TEXT]).await.unwrap()
|
||||
};
|
||||
let ret = Self {
|
||||
backend,
|
||||
pg,
|
||||
qu_select,
|
||||
qu_update_tsmod,
|
||||
qu_update_archived,
|
||||
qu_insert,
|
||||
rx,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub async fn worker(&self) {
|
||||
let backend = &self.backend;
|
||||
let rx = &self.rx;
|
||||
let pg = &self.pg;
|
||||
while let Ok(item) = rx.recv().await {
|
||||
let responseaddr = item.response_addr.map(|x| x.to_string());
|
||||
let addr = item.addr.map(|x| x.to_string());
|
||||
let res = pg
|
||||
.query(&self.qu_select, &[backend, &item.channel, &addr])
|
||||
.await
|
||||
.unwrap();
|
||||
if res.len() == 0 {
|
||||
pg.execute(&self.qu_update_archived, &[backend, &item.channel])
|
||||
.await
|
||||
.unwrap();
|
||||
pg.execute(&self.qu_insert, &[backend, &item.channel, &addr, &responseaddr])
|
||||
.await
|
||||
.unwrap();
|
||||
} else if res.len() == 1 {
|
||||
pg.execute(&self.qu_update_tsmod, &[backend, &item.channel, &addr, &responseaddr])
|
||||
.await
|
||||
.unwrap();
|
||||
} else {
|
||||
warn!("Duplicate for {}", item.channel);
|
||||
let sql = concat!(
|
||||
"with q1 as (select ctid from ioc_by_channel_log where facility = $1 and channel = $2 and addr is not distinct from $3 order by tsmod desc, ctid desc limit 1)",
|
||||
" update ioc_by_channel_log t set archived = 1 from q1 where t.facility = $1 and t.channel = $2 and t.ctid != q1.ctid and archived != 1",
|
||||
);
|
||||
pg.execute(sql, &[backend, &item.channel, &addr]).await.unwrap();
|
||||
pg.execute(&self.qu_update_tsmod, &[&backend, &item.channel, &addr, &responseaddr])
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
pub mod conn;
|
||||
pub mod err;
|
||||
pub mod iocindex;
|
||||
pub mod pool;
|
||||
pub mod schema;
|
||||
@@ -0,0 +1,114 @@
|
||||
use crate::conn::PgClient;
|
||||
use async_channel::Receiver;
|
||||
use async_channel::RecvError;
|
||||
use async_channel::SendError;
|
||||
use async_channel::Sender;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use log::*;
|
||||
use netpod::Database;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
pub enum Error {
|
||||
Postgres(#[from] tokio_postgres::Error),
|
||||
EndOfPool,
|
||||
ChannelRecv(#[from] RecvError),
|
||||
ChannelSend,
|
||||
Msg(String),
|
||||
}
|
||||
|
||||
impl From<crate::err::Error> for Error {
|
||||
fn from(value: crate::err::Error) -> Self {
|
||||
type G = crate::err::Error;
|
||||
match value {
|
||||
G::Postgres(e) => Error::Postgres(e),
|
||||
G::Msg(e) => Error::Msg(e.0),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<SendError<T>> for Error {
|
||||
fn from(_: SendError<T>) -> Self {
|
||||
Self::ChannelSend
|
||||
}
|
||||
}
|
||||
|
||||
struct PgClientInner {
|
||||
pgc: PgClient,
|
||||
handout_count: u64,
|
||||
}
|
||||
|
||||
pub struct PgClientPooled {
|
||||
pgc: Option<PgClientInner>,
|
||||
tx: Sender<PgClientInner>,
|
||||
}
|
||||
|
||||
impl Drop for PgClientPooled {
|
||||
fn drop(&mut self) {
|
||||
// async: the channel capacity is chosen large enough so
|
||||
// all pooled connections have space.
|
||||
match self.tx.try_send(self.pgc.take().unwrap()) {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
use async_channel::TrySendError;
|
||||
match e {
|
||||
TrySendError::Full(_) => {
|
||||
error!("can not return pooled database handle, pool is blocked");
|
||||
}
|
||||
TrySendError::Closed(_) => {
|
||||
warn!("can not return pooled database handle, pool is closed")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PgClientPooled {
|
||||
pub fn pgc(&self) -> &PgClient {
|
||||
&self.pgc.as_ref().unwrap().pgc
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PgPool {
|
||||
tx: Sender<PgClientInner>,
|
||||
rx: Receiver<PgClientInner>,
|
||||
handout_count: u64,
|
||||
}
|
||||
|
||||
impl PgPool {
|
||||
pub async fn new(cap: usize, dbconf: &Database) -> Result<Self, Error> {
|
||||
let (tx, rx) = async_channel::bounded(2 + cap);
|
||||
for _ in 0..cap {
|
||||
let pgc = crate::conn::make_pg_client(dbconf).await?;
|
||||
let pgc = PgClientInner { pgc, handout_count: 0 };
|
||||
tx.send(pgc).await?;
|
||||
}
|
||||
let ret = PgPool {
|
||||
tx,
|
||||
rx,
|
||||
handout_count: 0,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub async fn conn(&self) -> Result<PgClientPooled, Error> {
|
||||
let mut pgc = self.rx.recv().await?;
|
||||
pgc.handout_count += 1;
|
||||
let ret = PgClientPooled {
|
||||
pgc: Some(pgc),
|
||||
tx: self.tx.clone(),
|
||||
};
|
||||
is_it_send(&ret);
|
||||
is_it_sync(&ret);
|
||||
let mut ret = ret;
|
||||
is_it_mut_sync(&mut ret);
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
/// Compiles only if `T` is `Send`. Does nothing at runtime.
fn is_it_send<T>(_: &T)
where
    T: Send,
{
}

/// Compiles only if `T` is `Sync`. Does nothing at runtime.
fn is_it_sync<T>(_: &T)
where
    T: Sync,
{
}

/// Same check as `is_it_sync`, but takes a mutable reference.
fn is_it_mut_sync<T>(_: &mut T)
where
    T: Sync,
{
}
|
||||
@@ -1,20 +1,6 @@
|
||||
use crate::errconv::ErrConv;
|
||||
use err::Error;
|
||||
use netpod::log::*;
|
||||
use netpod::Database;
|
||||
use taskrun::tokio;
|
||||
use tokio_postgres::Client as PgClient;
|
||||
|
||||
pub async fn make_pg_client(d: &Database) -> Result<PgClient, Error> {
|
||||
let url = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name);
|
||||
info!("connect to {url}");
|
||||
let (client, pg_conn) = tokio_postgres::connect(&url, tokio_postgres::tls::NoTls)
|
||||
.await
|
||||
.err_conv()?;
|
||||
// TODO allow clean shutdown on ctrl-c and join the pg_conn in the end:
|
||||
tokio::spawn(pg_conn);
|
||||
Ok(client)
|
||||
}
|
||||
use crate::conn::PgClient;
|
||||
use crate::err::Error;
|
||||
use log::*;
|
||||
|
||||
async fn has_column(table: &str, column: &str, pgc: &PgClient) -> Result<bool, Error> {
|
||||
let rows = pgc
|
||||
@@ -22,8 +8,7 @@ async fn has_column(table: &str, column: &str, pgc: &PgClient) -> Result<bool, E
|
||||
"select count(*) as c from information_schema.columns where table_name = $1 and column_name = $2 limit 10",
|
||||
&[&table, &column],
|
||||
)
|
||||
.await
|
||||
.err_conv()?;
|
||||
.await?;
|
||||
if rows.len() == 1 {
|
||||
let c: i64 = rows[0].get(0);
|
||||
if c == 0 {
|
||||
@@ -31,15 +16,12 @@ async fn has_column(table: &str, column: &str, pgc: &PgClient) -> Result<bool, E
|
||||
} else if c == 1 {
|
||||
Ok(true)
|
||||
} else {
|
||||
Err(Error::with_msg_no_trace(format!("has_columns bad count {}", c)))
|
||||
Err(Error::from_msg(format!("has_columns bad count {}", c)))
|
||||
}
|
||||
} else if rows.len() == 0 {
|
||||
Ok(false)
|
||||
} else {
|
||||
Err(Error::with_msg_no_trace(format!(
|
||||
"has_columns bad row count {}",
|
||||
rows.len()
|
||||
)))
|
||||
Err(Error::from_msg(format!("has_columns bad row count {}", rows.len())))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,16 +31,14 @@ async fn migrate_00(pgc: &PgClient) -> Result<(), Error> {
|
||||
"alter table ioc_by_channel_log add tscreate timestamptz not null default now()",
|
||||
&[],
|
||||
)
|
||||
.await
|
||||
.err_conv()?;
|
||||
.await?;
|
||||
}
|
||||
if !has_column("ioc_by_channel_log", "archived", pgc).await? {
|
||||
pgc.execute(
|
||||
"alter table ioc_by_channel_log add archived int not null default 0",
|
||||
&[],
|
||||
)
|
||||
.await
|
||||
.err_conv()?;
|
||||
.await?;
|
||||
}
|
||||
{
|
||||
match pgc.execute("alter table series_by_channel add constraint series_by_channel_nondup unique (facility, channel, scalar_type, shape_dims, agg_kind)", &[]).await {
|
||||
@@ -71,8 +51,7 @@ async fn migrate_00(pgc: &PgClient) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn schema_check(db: &Database) -> Result<(), Error> {
|
||||
let pgc = make_pg_client(db).await?;
|
||||
pub async fn schema_check(pgc: &PgClient) -> Result<(), Error> {
|
||||
migrate_00(&pgc).await?;
|
||||
info!("schema_check done");
|
||||
Ok(())
|
||||
+1
-1
@@ -16,7 +16,6 @@ bytes = "1.4"
|
||||
arrayref = "0.3"
|
||||
byteorder = "1.4"
|
||||
futures-util = "0.3"
|
||||
tokio-postgres = "0.7.8"
|
||||
md-5 = "0.10"
|
||||
hex = "0.4"
|
||||
libc = "0.2"
|
||||
@@ -33,6 +32,7 @@ lazy_static = "1"
|
||||
log = { path = "../log" }
|
||||
stats = { path = "../stats" }
|
||||
scywr = { path = "../scywr" }
|
||||
dbpg = { path = "../dbpg" }
|
||||
err = { path = "../../daqbuffer/crates/err" }
|
||||
netpod = { path = "../../daqbuffer/crates/netpod" }
|
||||
items_0 = { path = "../../daqbuffer/crates/items_0" }
|
||||
|
||||
+20
-82
@@ -1,19 +1,19 @@
|
||||
use super::findioc::FindIocRes;
|
||||
use crate::ca::findioc::FindIocStream;
|
||||
use crate::conf::CaIngestOpts;
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
use dbpg::conn::PgClient;
|
||||
use dbpg::iocindex::IocItem;
|
||||
use dbpg::iocindex::IocSearchIndexWorker;
|
||||
use err::Error;
|
||||
use futures_util::StreamExt;
|
||||
use log::*;
|
||||
use netpod::Database;
|
||||
use std::net::IpAddr;
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use taskrun::tokio;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_postgres::Client as PgClient;
|
||||
|
||||
const DB_WORKER_COUNT: usize = 4;
|
||||
|
||||
@@ -59,89 +59,23 @@ struct DbUpdateWorker {
|
||||
}
|
||||
|
||||
impl DbUpdateWorker {
|
||||
fn new(rx: Receiver<FindIocRes>, backend: String, database: Database) -> Self {
|
||||
let jh = tokio::spawn(Self::worker(rx, backend, database));
|
||||
Self { jh }
|
||||
}
|
||||
|
||||
async fn worker(rx: Receiver<FindIocRes>, backend: String, database: Database) {
|
||||
let d = &database;
|
||||
let (pg_client, pg_conn) = tokio_postgres::connect(
|
||||
&format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name),
|
||||
tokio_postgres::tls::NoTls,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let (pgconn_out_tx, pgconn_out_rx) = async_channel::bounded(16);
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = pgconn_out_tx.send(pg_conn.await).await {
|
||||
error!("can not report status of pg conn {e}");
|
||||
}
|
||||
});
|
||||
let pg_client: PgClient = pg_client;
|
||||
let qu_select = {
|
||||
let sql = "select channel, addr from ioc_by_channel_log where facility = $1 and channel = $2 and addr is not distinct from $3 and archived = 0";
|
||||
pg_client.prepare(sql).await.unwrap()
|
||||
};
|
||||
let qu_update_tsmod = {
|
||||
let sql = "update ioc_by_channel_log set tsmod = now(), responseaddr = $4 where facility = $1 and channel = $2 and addr is not distinct from $3 and archived = 0";
|
||||
pg_client.prepare(sql).await.unwrap()
|
||||
};
|
||||
let qu_update_archived = {
|
||||
let sql =
|
||||
"update ioc_by_channel_log set archived = 1 where facility = $1 and channel = $2 and archived = 0";
|
||||
pg_client.prepare(sql).await.unwrap()
|
||||
};
|
||||
let qu_insert = {
|
||||
let sql = "insert into ioc_by_channel_log (facility, channel, addr, responseaddr) values ($1, $2, $3, $4)";
|
||||
const TEXT: tokio_postgres::types::Type = tokio_postgres::types::Type::TEXT;
|
||||
pg_client.prepare_typed(sql, &[TEXT, TEXT, TEXT, TEXT]).await.unwrap()
|
||||
};
|
||||
while let Ok(item) = rx.recv().await {
|
||||
let responseaddr = item.response_addr.map(|x| x.to_string());
|
||||
let addr = item.addr.map(|x| x.to_string());
|
||||
let res = pg_client
|
||||
.query(&qu_select, &[&backend, &item.channel, &addr])
|
||||
.await
|
||||
.unwrap();
|
||||
if res.len() == 0 {
|
||||
pg_client
|
||||
.execute(&qu_update_archived, &[&backend, &item.channel])
|
||||
.await
|
||||
.unwrap();
|
||||
pg_client
|
||||
.execute(&qu_insert, &[&backend, &item.channel, &addr, &responseaddr])
|
||||
.await
|
||||
.unwrap();
|
||||
} else if res.len() == 1 {
|
||||
pg_client
|
||||
.execute(&qu_update_tsmod, &[&backend, &item.channel, &addr, &responseaddr])
|
||||
.await
|
||||
.unwrap();
|
||||
} else {
|
||||
warn!("Duplicate for {}", item.channel);
|
||||
let sql = concat!(
|
||||
"with q1 as (select ctid from ioc_by_channel_log where facility = $1 and channel = $2 and addr is not distinct from $3 order by tsmod desc, ctid desc limit 1)",
|
||||
" update ioc_by_channel_log t set archived = 1 from q1 where t.facility = $1 and t.channel = $2 and t.ctid != q1.ctid and archived != 1",
|
||||
);
|
||||
pg_client.execute(sql, &[&backend, &item.channel, &addr]).await.unwrap();
|
||||
pg_client
|
||||
.execute(&qu_update_tsmod, &[&backend, &item.channel, &addr, &responseaddr])
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
drop(pg_client);
|
||||
let x = pgconn_out_rx.recv().await;
|
||||
if let Err(e) = x {
|
||||
error!("db worker sees: {e}");
|
||||
}
|
||||
async fn new(rx: Receiver<IocItem>, backend: String, pg: PgClient) -> Result<Self, Error> {
|
||||
let worker = IocSearchIndexWorker::prepare(rx, backend, pg)
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
|
||||
let jh = tokio::spawn(async move { worker.worker().await });
|
||||
Ok(Self { jh })
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn ca_search(opts: CaIngestOpts, channels: &Vec<String>) -> Result<(), Error> {
|
||||
info!("ca_search begin");
|
||||
crate::dbpg::schema_check(opts.postgresql_config()).await?;
|
||||
let pg = dbpg::conn::make_pg_client(opts.postgresql_config())
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
|
||||
dbpg::schema::schema_check(&pg)
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
|
||||
let mut addrs = Vec::new();
|
||||
for s in opts.search() {
|
||||
match resolve_address(s).await {
|
||||
@@ -189,7 +123,10 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec<String>) -> Result<(),
|
||||
|
||||
let mut dbworkers = Vec::new();
|
||||
for _ in 0..DB_WORKER_COUNT {
|
||||
let w = DbUpdateWorker::new(dbrx.clone(), opts.backend().into(), opts.postgresql_config().clone());
|
||||
let pg = dbpg::conn::make_pg_client(opts.postgresql_config())
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
|
||||
let w = DbUpdateWorker::new(dbrx.clone(), opts.backend().into(), pg).await?;
|
||||
dbworkers.push(w);
|
||||
}
|
||||
drop(dbrx);
|
||||
@@ -241,6 +178,7 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec<String>) -> Result<(),
|
||||
if do_block {
|
||||
info!("blacklisting {item:?}");
|
||||
} else {
|
||||
let item = IocItem::new(item.channel, item.response_addr, item.addr, item.dt);
|
||||
match dbtx.send(item).await {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
|
||||
@@ -5,7 +5,6 @@ pub mod bsreadclient;
|
||||
pub mod ca;
|
||||
pub mod conf;
|
||||
pub mod daemon_common;
|
||||
pub mod dbpg;
|
||||
pub mod errconv;
|
||||
pub mod insertworker;
|
||||
pub mod linuxhelper;
|
||||
|
||||
+280
-97
@@ -1,12 +1,14 @@
|
||||
use crate::session::create_session;
|
||||
use crate::session::create_session_no_ks;
|
||||
use crate::session::ScySession;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::StreamExt;
|
||||
use log::*;
|
||||
use netpod::ScyllaConfig;
|
||||
use scylla::transport::errors::DbError;
|
||||
use scylla::transport::errors::QueryError;
|
||||
use std::fmt;
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
pub enum Error {
|
||||
@@ -24,14 +26,24 @@ impl From<crate::session::Error> for Error {
|
||||
}
|
||||
}
|
||||
|
||||
/// Tells whether a keyspace called `name` is listed in `system_schema.keyspaces`.
///
/// Rows whose first column is missing, null, or not text are ignored instead of
/// causing a panic.
///
/// # Errors
///
/// Returns an error if the query or the retrieval of a row fails.
pub async fn has_keyspace(name: &str, scy: &ScySession) -> Result<bool, Error> {
    let cql = "select keyspace_name from system_schema.keyspaces where keyspace_name = ?";
    let mut res = scy.query_iter(cql, (name,)).await?;
    while let Some(k) = res.next().await {
        let row = k?;
        let keyspace_name = row
            .columns
            .first()
            .and_then(|c| c.as_ref())
            .and_then(|c| c.as_text());
        if let Some(keyspace_name) = keyspace_name {
            if keyspace_name == name {
                return Ok(true);
            }
        }
    }
    Ok(false)
}
|
||||
|
||||
pub async fn has_table(name: &str, scy: &ScySession) -> Result<bool, Error> {
|
||||
let cql = "select table_name from system_schema.tables where keyspace_name = ?";
|
||||
let ks = scy.get_keyspace().ok_or_else(|| Error::NoKeyspaceChosen)?;
|
||||
let mut res = scy
|
||||
.query_iter(
|
||||
"select table_name from system_schema.tables where keyspace_name = ?",
|
||||
(ks.as_ref(),),
|
||||
)
|
||||
.await?;
|
||||
let mut res = scy.query_iter(cql, (ks.as_ref(),)).await?;
|
||||
while let Some(k) = res.next().await {
|
||||
let row = k?;
|
||||
if let Some(table_name) = row.columns[0].as_ref().unwrap().as_text() {
|
||||
@@ -79,30 +91,120 @@ pub async fn create_table_ts_msp(scy: &ScySession) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Duration of `x` hours.
fn dhours(x: u64) -> Duration {
    let secs = 60 * 60 * x;
    Duration::from_secs(secs)
}
|
||||
|
||||
/// Duration of `x` days of 24 hours each.
fn ddays(x: u64) -> Duration {
    let secs = 60 * 60 * 24 * x;
    Duration::from_secs(secs)
}
|
||||
|
||||
struct GenTwcsTab {
|
||||
name: String,
|
||||
cql: String,
|
||||
default_time_to_live: usize,
|
||||
compaction_window_size: usize,
|
||||
col_names: Vec<String>,
|
||||
col_types: Vec<String>,
|
||||
partition_keys: Vec<String>,
|
||||
cluster_keys: Vec<String>,
|
||||
default_time_to_live: Duration,
|
||||
compaction_window_size: Duration,
|
||||
}
|
||||
|
||||
impl GenTwcsTab {
|
||||
fn name(&self) -> String {
|
||||
self.name.clone()
|
||||
// name: "series_by_ts_msp".into(),
|
||||
// cql: "(part int, ts_msp int, shape_kind int, scalar_type int, series bigint, primary key ((part, ts_msp, shape_kind, scalar_type), series))".into(),
|
||||
// default_time_to_live: 60 * 60 * 5,
|
||||
// compaction_window_size: 24 * 4,
|
||||
pub fn new<'a, N, CI, A, B, I2, I2A, I3, I3A>(
|
||||
name: N,
|
||||
cols: CI,
|
||||
partition_keys: I2,
|
||||
cluster_keys: I3,
|
||||
default_time_to_live: Duration,
|
||||
compaction_window_size: Duration,
|
||||
) -> Self
|
||||
where
|
||||
N: Into<String>,
|
||||
CI: IntoIterator<Item = &'a (A, B)>,
|
||||
// TODO could make for idiomatic to skip extra clone if passed value is already String
|
||||
A: AsRef<str> + 'a,
|
||||
B: AsRef<str> + 'a,
|
||||
I2: IntoIterator<Item = I2A>,
|
||||
I3: IntoIterator<Item = I3A>,
|
||||
I2A: Into<String>,
|
||||
I3A: Into<String>,
|
||||
{
|
||||
let mut col_names = Vec::new();
|
||||
let mut col_types = Vec::new();
|
||||
cols.into_iter().for_each(|(a, b)| {
|
||||
col_names.push(a.as_ref().into());
|
||||
col_types.push(b.as_ref().into());
|
||||
});
|
||||
Self {
|
||||
name: name.into(),
|
||||
col_names,
|
||||
col_types,
|
||||
partition_keys: partition_keys.into_iter().map(Into::into).collect(),
|
||||
cluster_keys: cluster_keys.into_iter().map(Into::into).collect(),
|
||||
default_time_to_live,
|
||||
compaction_window_size,
|
||||
}
|
||||
}
|
||||
|
||||
fn name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
fn cql(&self) -> String {
|
||||
use std::fmt::Write;
|
||||
let pkey = if self.partition_keys.len() == 0 {
|
||||
panic!("some partition key is required");
|
||||
} else {
|
||||
self.partition_keys.join(", ")
|
||||
};
|
||||
let pkeys = format!("({})", pkey);
|
||||
let pkeys = if self.cluster_keys.len() == 0 {
|
||||
format!("({})", pkeys)
|
||||
} else {
|
||||
format!("({}, {})", pkeys, self.cluster_keys.join(", "))
|
||||
};
|
||||
let mut s = String::new();
|
||||
write!(s, "create table {}", self.name()).unwrap();
|
||||
s.write_str(&self.cql).unwrap();
|
||||
write!(s, " with default_time_to_live = {}", self.default_time_to_live).unwrap();
|
||||
let mut cols: Vec<_> = self
|
||||
.col_names
|
||||
.iter()
|
||||
.zip(self.col_types.iter())
|
||||
.map(|(n, t)| format!("{} {}", n, t))
|
||||
.collect();
|
||||
cols.push(format!("primary key {pkeys}"));
|
||||
let cols = cols.join(", ");
|
||||
write!(s, " ({})", cols).unwrap();
|
||||
write!(
|
||||
s,
|
||||
" with default_time_to_live = {}",
|
||||
self.default_time_to_live.as_secs()
|
||||
)
|
||||
.unwrap();
|
||||
s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'HOURS'")
|
||||
.unwrap();
|
||||
write!(s, ", 'compaction_window_size': {}", self.compaction_window_size).unwrap();
|
||||
write!(
|
||||
s,
|
||||
", 'compaction_window_size': {}",
|
||||
self.compaction_window_size.as_secs() / 60 / 60
|
||||
)
|
||||
.unwrap();
|
||||
s.write_str(" }").unwrap();
|
||||
s
|
||||
}
|
||||
|
||||
async fn create_if_missing(&self, scy: &ScySession) -> Result<(), Error> {
|
||||
// TODO check for more details (all columns, correct types, correct kinds, etc)
|
||||
if !has_table(self.name(), scy).await? {
|
||||
let cql = self.cql();
|
||||
info!("CREATE CQL: {cql}");
|
||||
scy.query(self.cql(), ()).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct EvTabDim0 {
|
||||
@@ -119,7 +221,7 @@ impl EvTabDim0 {
|
||||
format!("events_scalar_{}", self.sty)
|
||||
}
|
||||
|
||||
fn cql(&self) -> String {
|
||||
fn cql_create(&self) -> String {
|
||||
use std::fmt::Write;
|
||||
let mut s = String::new();
|
||||
write!(s, "create table {}", self.name()).unwrap();
|
||||
@@ -161,6 +263,24 @@ impl EvTabDim1 {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
async fn get_columns(keyspace: &str, table: &str, scy: &ScySession) -> Result<Vec<String>, Error> {
|
||||
let mut ret = Vec::new();
|
||||
let cql = "select column_name, kind, type from system_schema.columns where keyspace_name = ? and table_name = ?";
|
||||
let params = (keyspace, table);
|
||||
let mut res = scy.query_iter(cql, params).await?;
|
||||
while let Some(row) = res.next().await {
|
||||
// columns:
|
||||
// kind (text): regular, clustering, partition_key.
|
||||
// column_name (text)
|
||||
// type (text): text, blob, int, ...
|
||||
let row = row?;
|
||||
let name = row.columns[0].as_ref().unwrap().as_text().unwrap();
|
||||
ret.push(name.into());
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
async fn check_event_tables(scy: &ScySession) -> Result<(), Error> {
|
||||
let stys = [
|
||||
"u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64", "bool", "string",
|
||||
@@ -177,8 +297,8 @@ async fn check_event_tables(scy: &ScySession) -> Result<(), Error> {
|
||||
default_time_to_live: 60 * 60 * 1,
|
||||
compaction_window_size: 48,
|
||||
};
|
||||
if !check_table_readable(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql(), ()).await?;
|
||||
if !has_table(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql_create(), ()).await?;
|
||||
}
|
||||
let desc = EvTabDim1 {
|
||||
sty: sty.into(),
|
||||
@@ -194,103 +314,166 @@ async fn check_event_tables(scy: &ScySession) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn migrate_keyspace(scyconf: &ScyllaConfig) -> Result<(), Error> {
|
||||
let scy2 = create_session(scyconf).await?;
|
||||
pub async fn migrate_scylla_data_schema(scyconf: &ScyllaConfig) -> Result<(), Error> {
|
||||
let scy2 = create_session_no_ks(scyconf).await?;
|
||||
let scy = &scy2;
|
||||
if !check_table_readable("ts_msp", &scy).await? {
|
||||
|
||||
if !has_keyspace(&scyconf.keyspace, scy).await? {
|
||||
let rf = 2;
|
||||
let cql = format!("create keyspace {} with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }} and durable_writes = true;", scyconf.keyspace, rf);
|
||||
scy.query_iter(cql, ()).await?;
|
||||
info!("keyspace created");
|
||||
}
|
||||
|
||||
scy.use_keyspace(&scyconf.keyspace, true).await?;
|
||||
|
||||
if !has_table("ts_msp", &scy).await? {
|
||||
create_table_ts_msp(scy).await?;
|
||||
}
|
||||
|
||||
check_event_tables(scy).await?;
|
||||
{
|
||||
let desc = GenTwcsTab {
|
||||
name: "series_by_ts_msp".into(),
|
||||
cql: "(part int, ts_msp int, shape_kind int, scalar_type int, series bigint, primary key ((part, ts_msp, shape_kind, scalar_type), series))".into(),
|
||||
default_time_to_live: 60 * 60 * 5,
|
||||
compaction_window_size: 24 * 4,
|
||||
};
|
||||
if !check_table_readable(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql(), ()).await?;
|
||||
}
|
||||
|
||||
if false {
|
||||
info!("early abort schema");
|
||||
return Ok(());
|
||||
}
|
||||
{
|
||||
let desc = GenTwcsTab {
|
||||
name: "connection_status".into(),
|
||||
cql: "(ts_msp bigint, ts_lsp bigint, kind int, addr text, primary key (ts_msp, ts_lsp))".into(),
|
||||
default_time_to_live: 60 * 60 * 1,
|
||||
compaction_window_size: 24 * 4,
|
||||
};
|
||||
if !check_table_readable(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql(), ()).await?;
|
||||
}
|
||||
let tab = GenTwcsTab::new(
|
||||
"series_by_ts_msp",
|
||||
&[
|
||||
("part", "int"),
|
||||
("ts_msp", "int"),
|
||||
("shape_kind", "int"),
|
||||
("scalar_type", "int"),
|
||||
("series", "bigint"),
|
||||
],
|
||||
["part", "ts_msp", "shape_kind", "scalar_type"],
|
||||
["series"],
|
||||
dhours(5),
|
||||
ddays(4),
|
||||
);
|
||||
tab.create_if_missing(scy).await?;
|
||||
}
|
||||
{
|
||||
let desc = GenTwcsTab {
|
||||
name: "channel_status".into(),
|
||||
cql: "(series bigint, ts_msp bigint, ts_lsp bigint, kind int, primary key ((series, ts_msp), ts_lsp))"
|
||||
.into(),
|
||||
default_time_to_live: 60 * 60 * 1,
|
||||
compaction_window_size: 24 * 4,
|
||||
};
|
||||
if !check_table_readable(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql(), ()).await?;
|
||||
}
|
||||
let tab = GenTwcsTab::new(
|
||||
"connection_status",
|
||||
&[
|
||||
("ts_msp", "bigint"),
|
||||
("ts_lsp", "bigint"),
|
||||
("kind", "int"),
|
||||
("addr", "text"),
|
||||
],
|
||||
["ts_msp"],
|
||||
["ts_lsp"],
|
||||
dhours(1),
|
||||
ddays(4),
|
||||
);
|
||||
tab.create_if_missing(scy).await?;
|
||||
}
|
||||
{
|
||||
let desc = GenTwcsTab {
|
||||
name: "channel_status_by_ts_msp".into(),
|
||||
cql: "(ts_msp bigint, ts_lsp bigint, series bigint, kind int, primary key (ts_msp, ts_lsp))".into(),
|
||||
default_time_to_live: 60 * 60 * 1,
|
||||
compaction_window_size: 24 * 4,
|
||||
};
|
||||
if !check_table_readable(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql(), ()).await?;
|
||||
}
|
||||
let tab = GenTwcsTab::new(
|
||||
"channel_status",
|
||||
&[
|
||||
("series", "bigint"),
|
||||
("ts_msp", "bigint"),
|
||||
("ts_lsp", "bigint"),
|
||||
("kind", "int"),
|
||||
],
|
||||
["series", "ts_msp"],
|
||||
["ts_lsp"],
|
||||
dhours(1),
|
||||
ddays(4),
|
||||
);
|
||||
tab.create_if_missing(scy).await?;
|
||||
}
|
||||
{
|
||||
let desc = GenTwcsTab {
|
||||
name: "channel_ping".into(),
|
||||
cql: "(part int, ts_msp int, series bigint, ivl float, interest float, evsize int, primary key ((part, ts_msp), series))"
|
||||
.into(),
|
||||
default_time_to_live: 60 * 60 * 1,
|
||||
compaction_window_size: 24 * 4,
|
||||
};
|
||||
if !check_table_readable(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql(), ()).await?;
|
||||
}
|
||||
let tab = GenTwcsTab::new(
|
||||
"channel_status_by_ts_msp",
|
||||
&[
|
||||
("ts_msp", "bigint"),
|
||||
("ts_lsp", "bigint"),
|
||||
("series", "bigint"),
|
||||
("kind", "int"),
|
||||
],
|
||||
["ts_msp"],
|
||||
["ts_lsp"],
|
||||
dhours(1),
|
||||
ddays(4),
|
||||
);
|
||||
tab.create_if_missing(scy).await?;
|
||||
}
|
||||
{
|
||||
let desc = GenTwcsTab {
|
||||
name: "muted".into(),
|
||||
cql: "(part int, series bigint, ts bigint, ema float, emd float, primary key (part, series, ts))".into(),
|
||||
default_time_to_live: 60 * 60 * 4,
|
||||
compaction_window_size: 24 * 1,
|
||||
};
|
||||
if !check_table_readable(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql(), ()).await?;
|
||||
}
|
||||
let tab = GenTwcsTab::new(
|
||||
"channel_ping",
|
||||
&[
|
||||
("part", "int"),
|
||||
("ts_msp", "int"),
|
||||
("series", "bigint"),
|
||||
("ivl", "float"),
|
||||
("interest", "float"),
|
||||
("evsize", "int"),
|
||||
],
|
||||
["part", "ts_msp"],
|
||||
["series"],
|
||||
dhours(1),
|
||||
ddays(4),
|
||||
);
|
||||
tab.create_if_missing(scy).await?;
|
||||
}
|
||||
{
|
||||
let desc = GenTwcsTab {
|
||||
name: "item_recv_ivl".into(),
|
||||
cql: "(part int, series bigint, ts bigint, ema float, emd float, primary key (part, series, ts))".into(),
|
||||
default_time_to_live: 60 * 60 * 4,
|
||||
compaction_window_size: 24 * 1,
|
||||
};
|
||||
if !check_table_readable(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql(), ()).await?;
|
||||
}
|
||||
let tab = GenTwcsTab::new(
|
||||
"muted",
|
||||
&[
|
||||
("part", "int"),
|
||||
("series", "bigint"),
|
||||
("ts", "bigint"),
|
||||
("ema", "float"),
|
||||
("emd", "float"),
|
||||
],
|
||||
["part"],
|
||||
["series", "ts"],
|
||||
dhours(4),
|
||||
ddays(1),
|
||||
);
|
||||
tab.create_if_missing(scy).await?;
|
||||
}
|
||||
{
|
||||
let desc = GenTwcsTab {
|
||||
name: "binned_scalar_f32_v01".into(),
|
||||
cql: "(series bigint, bin_len_sec int, bin_count int, off_msp int, off_lsp int, counts frozen<list<bigint>>, mins frozen<list<float>>, maxs frozen<list<float>>, avgs frozen<list<float>>, primary key ((series, bin_len_sec, bin_count, off_msp), off_lsp))"
|
||||
.into(),
|
||||
default_time_to_live: 60 * 60 * 24 * 30,
|
||||
compaction_window_size: 24 * 4,
|
||||
};
|
||||
if !check_table_readable(&desc.name(), scy).await? {
|
||||
scy.query(desc.cql(), ()).await?;
|
||||
}
|
||||
let tab = GenTwcsTab::new(
|
||||
"item_recv_ivl",
|
||||
&[
|
||||
("part", "int"),
|
||||
("series", "bigint"),
|
||||
("ts", "bigint"),
|
||||
("ema", "float"),
|
||||
("emd", "float"),
|
||||
],
|
||||
["part"],
|
||||
["series", "ts"],
|
||||
dhours(4),
|
||||
ddays(1),
|
||||
);
|
||||
tab.create_if_missing(scy).await?;
|
||||
}
|
||||
{
|
||||
let tab = GenTwcsTab::new(
|
||||
"binned_scalar_f32_v01",
|
||||
&[
|
||||
("series", "bigint"),
|
||||
("bin_len_sec", "int"),
|
||||
("bin_count", "int"),
|
||||
("off_msp", "int"),
|
||||
("off_lsp", "int"),
|
||||
("counts", "frozen<list<bigint>>"),
|
||||
("mins", "frozen<list<float>>"),
|
||||
("maxs", "frozen<list<float>>"),
|
||||
("avgs", "frozen<list<float>>"),
|
||||
],
|
||||
["series", "bin_len_sec", "bin_count", "off_msp"],
|
||||
["off_lsp"],
|
||||
ddays(30),
|
||||
ddays(4),
|
||||
);
|
||||
tab.create_if_missing(scy).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -20,10 +20,9 @@ impl From<NewSessionError> for Error {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn create_session(scyconf: &ScyllaConfig) -> Result<Arc<Session>, Error> {
|
||||
pub async fn create_session_no_ks(scyconf: &ScyllaConfig) -> Result<Arc<Session>, Error> {
|
||||
let scy = scylla::SessionBuilder::new()
|
||||
.known_nodes(&scyconf.hosts)
|
||||
.use_keyspace(&scyconf.keyspace, true)
|
||||
.default_execution_profile_handle(
|
||||
ExecutionProfileBuilder::default()
|
||||
.consistency(Consistency::LocalOne)
|
||||
@@ -35,3 +34,11 @@ pub async fn create_session(scyconf: &ScyllaConfig) -> Result<Arc<Session>, Erro
|
||||
let scy = Arc::new(scy);
|
||||
Ok(scy)
|
||||
}
|
||||
|
||||
pub async fn create_session(scyconf: &ScyllaConfig) -> Result<Arc<Session>, Error> {
|
||||
let scy = create_session_no_ks(scyconf).await?;
|
||||
scy.use_keyspace(&scyconf.keyspace, true)
|
||||
.await
|
||||
.map_err(|e| Error::NewSession(e.to_string()))?;
|
||||
Ok(scy)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user