From 837265a7b376c0e5ca0f9796c0d3b7f1a95735f9 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Sun, 27 Aug 2023 22:45:10 +0200 Subject: [PATCH] WIP --- daqingest/src/daemon.rs | 6 +- dbpg/Cargo.toml | 13 + dbpg/src/conn.rs | 17 + dbpg/src/err.rs | 20 ++ dbpg/src/iocindex.rs | 114 +++++++ dbpg/src/lib.rs | 5 + dbpg/src/pool.rs | 114 +++++++ netfetch/src/dbpg.rs => dbpg/src/schema.rs | 39 +-- netfetch/Cargo.toml | 2 +- netfetch/src/ca/search.rs | 102 ++---- netfetch/src/lib.rs | 1 - scywr/src/schema.rs | 377 +++++++++++++++------ scywr/src/session.rs | 11 +- 13 files changed, 607 insertions(+), 214 deletions(-) create mode 100644 dbpg/Cargo.toml create mode 100644 dbpg/src/conn.rs create mode 100644 dbpg/src/err.rs create mode 100644 dbpg/src/iocindex.rs create mode 100644 dbpg/src/lib.rs create mode 100644 dbpg/src/pool.rs rename netfetch/src/dbpg.rs => dbpg/src/schema.rs (58%) diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index f3553a9..6ef834e 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -1553,9 +1553,13 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> netfetch::dbpg::schema_check(opts.postgresql_config()).await?; - scywr::schema::migrate_keyspace(opts.scylla_config()) + scywr::schema::migrate_scylla_data_schema(opts.scylla_config()) .await .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + info!("migrate_keyspace done"); + if true { + return Ok(()); + } // TODO use a new stats type: //let store_stats = Arc::new(CaConnStats::new()); diff --git a/dbpg/Cargo.toml b/dbpg/Cargo.toml new file mode 100644 index 0000000..e15a610 --- /dev/null +++ b/dbpg/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "dbpg" +version = "0.0.1" +authors = ["Dominik Werder "] +edition = "2021" + +[dependencies] +log = { path = "../log" } +err = { path = "../../daqbuffer/crates/err" } +netpod = { path = "../../daqbuffer/crates/netpod" } +taskrun = { path = "../../daqbuffer/crates/taskrun" } +tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4"] } +async-channel = "1.9.0" diff --git a/dbpg/src/conn.rs b/dbpg/src/conn.rs new file mode 100644 index 0000000..0aca872 --- /dev/null +++ b/dbpg/src/conn.rs @@ -0,0 +1,17 @@ +use crate::err::Error; +use log::*; +use netpod::Database; +use taskrun::tokio; +use tokio_postgres::Client; + +pub type PgClient = Client; + +pub async fn make_pg_client(dbconf: &Database) -> Result { + let d = dbconf; + let url = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name); + info!("connect to {url}"); + let (client, pg_conn) = tokio_postgres::connect(&url, tokio_postgres::tls::NoTls).await?; + // TODO allow clean shutdown on ctrl-c and join the pg_conn in the end: + tokio::spawn(pg_conn); + Ok(client) +} diff --git a/dbpg/src/err.rs b/dbpg/src/err.rs new file mode 100644 index 0000000..f27d63e --- /dev/null +++ b/dbpg/src/err.rs @@ -0,0 +1,20 @@ +use err::thiserror; +use err::ThisError; + +#[derive(Debug)] +pub struct Msg(pub String); + +#[derive(Debug, ThisError)] +pub enum Error { + Postgres(#[from] tokio_postgres::Error), + Msg(Msg), +} + +impl Error { + pub fn from_msg(msg: T) -> Self + where + T: Into, + { + Self::Msg(Msg(msg.into())) + } +} diff --git a/dbpg/src/iocindex.rs b/dbpg/src/iocindex.rs new file mode 100644 index 0000000..1f4188d --- /dev/null +++ b/dbpg/src/iocindex.rs @@ -0,0 +1,114 @@ +use crate::conn::PgClient; +use crate::err::Error; +use async_channel::Receiver; +use log::*; +use std::net::SocketAddrV4; +use std::time::Duration; +use tokio_postgres::Statement; + +const TEXT: tokio_postgres::types::Type = tokio_postgres::types::Type::TEXT; + +#[derive(Debug)] +pub struct IocItem { + pub channel: String, + pub response_addr: Option, + pub addr: Option, + pub dt: Duration, +} + +impl IocItem { + pub fn new(channel: String, response_addr: Option, addr: Option, dt: Duration) -> Self { + Self { + channel, + response_addr, + addr, + dt, + } + } +} + +// TODO +// Issue: the prepared statements are bound to a connection. +// A connection pool is therefore often not a good model. +// Better would probably be to hand a database-work-item to a queue. +// A worker can then deal with the query. +// Must choose between: some async fn waits for that item being finished. +// Or: enqueue-and-forget and let the result of that query be enqueued +// in yet another queue. +pub struct IocSearchIndexWorker { + backend: String, + pg: PgClient, + qu_select: Statement, + qu_update_tsmod: Statement, + qu_update_archived: Statement, + qu_insert: Statement, + rx: Receiver, +} + +impl IocSearchIndexWorker { + pub async fn prepare(rx: Receiver, backend: String, pg: PgClient) -> Result { + let qu_select = { + let sql = "select channel, addr from ioc_by_channel_log where facility = $1 and channel = $2 and addr is not distinct from $3 and archived = 0"; + pg.prepare(sql).await.unwrap() + }; + let qu_update_tsmod = { + let sql = "update ioc_by_channel_log set tsmod = now(), responseaddr = $4 where facility = $1 and channel = $2 and addr is not distinct from $3 and archived = 0"; + pg.prepare(sql).await.unwrap() + }; + let qu_update_archived = { + let sql = + "update ioc_by_channel_log set archived = 1 where facility = $1 and channel = $2 and archived = 0"; + pg.prepare(sql).await.unwrap() + }; + let qu_insert = { + let sql = "insert into ioc_by_channel_log (facility, channel, addr, responseaddr) values ($1, $2, $3, $4)"; + pg.prepare_typed(sql, &[TEXT, TEXT, TEXT, TEXT]).await.unwrap() + }; + let ret = Self { + backend, + pg, + qu_select, + qu_update_tsmod, + qu_update_archived, + qu_insert, + rx, + }; + Ok(ret) + } + + pub async fn worker(&self) { + let backend = &self.backend; + let rx = &self.rx; + let pg = &self.pg; + while let Ok(item) = rx.recv().await { + let responseaddr = item.response_addr.map(|x| x.to_string()); + let addr = item.addr.map(|x| x.to_string()); + let res = pg + .query(&self.qu_select, &[backend, &item.channel, &addr]) + .await + .unwrap(); + if res.len() == 0 { + pg.execute(&self.qu_update_archived, &[backend, &item.channel]) + .await + .unwrap(); + pg.execute(&self.qu_insert, &[backend, &item.channel, &addr, &responseaddr]) + .await + .unwrap(); + } else if res.len() == 1 { + pg.execute(&self.qu_update_tsmod, &[backend, &item.channel, &addr, &responseaddr]) + .await + .unwrap(); + } else { + warn!("Duplicate for {}", item.channel); + let sql = concat!( + "with q1 as (select ctid from ioc_by_channel_log where facility = $1 and channel = $2 and addr is not distinct from $3 order by tsmod desc, ctid desc limit 1)", + " update ioc_by_channel_log t set archived = 1 from q1 where t.facility = $1 and t.channel = $2 and t.ctid != q1.ctid and archived != 1", + ); + pg.execute(sql, &[backend, &item.channel, &addr]).await.unwrap(); + pg.execute(&self.qu_update_tsmod, &[&backend, &item.channel, &addr, &responseaddr]) + .await + .unwrap(); + } + } + } +} diff --git a/dbpg/src/lib.rs b/dbpg/src/lib.rs new file mode 100644 index 0000000..e21871b --- /dev/null +++ b/dbpg/src/lib.rs @@ -0,0 +1,5 @@ +pub mod conn; +pub mod err; +pub mod iocindex; +pub mod pool; +pub mod schema; diff --git a/dbpg/src/pool.rs b/dbpg/src/pool.rs new file mode 100644 index 0000000..605e5f2 --- /dev/null +++ b/dbpg/src/pool.rs @@ -0,0 +1,114 @@ +use crate::conn::PgClient; +use async_channel::Receiver; +use async_channel::RecvError; +use async_channel::SendError; +use async_channel::Sender; +use err::thiserror; +use err::ThisError; +use log::*; +use netpod::Database; + +#[derive(Debug, ThisError)] +pub enum Error { + Postgres(#[from] tokio_postgres::Error), + EndOfPool, + ChannelRecv(#[from] RecvError), + ChannelSend, + Msg(String), +} + +impl From for Error { + fn from(value: crate::err::Error) -> Self { + type G = crate::err::Error; + match value { + G::Postgres(e) => Error::Postgres(e), + G::Msg(e) => Error::Msg(e.0), + } + } +} + +impl From> for Error { + fn from(_: SendError) -> Self { + Self::ChannelSend + } +} + +struct PgClientInner { + pgc: PgClient, + handout_count: u64, +} + +pub struct PgClientPooled { + pgc: Option, + tx: Sender, +} + +impl Drop for PgClientPooled { + fn drop(&mut self) { + // async: the channel capacity is chosen large enough so + // all pooled connections have space. + match self.tx.try_send(self.pgc.take().unwrap()) { + Ok(()) => {} + Err(e) => { + use async_channel::TrySendError; + match e { + TrySendError::Full(_) => { + error!("can not return pooled database handle, pool is blocked"); + } + TrySendError::Closed(_) => { + warn!("can not return pooled database handle, pool is closed") + } + } + } + } + } +} + +impl PgClientPooled { + pub fn pgc(&self) -> &PgClient { + &self.pgc.as_ref().unwrap().pgc + } +} + +pub struct PgPool { + tx: Sender, + rx: Receiver, + handout_count: u64, +} + +impl PgPool { + pub async fn new(cap: usize, dbconf: &Database) -> Result { + let (tx, rx) = async_channel::bounded(2 + cap); + for _ in 0..cap { + let pgc = crate::conn::make_pg_client(dbconf).await?; + let pgc = PgClientInner { pgc, handout_count: 0 }; + tx.send(pgc).await?; + } + let ret = PgPool { + tx, + rx, + handout_count: 0, + }; + Ok(ret) + } + + pub async fn conn(&self) -> Result { + let mut pgc = self.rx.recv().await?; + pgc.handout_count += 1; + let ret = PgClientPooled { + pgc: Some(pgc), + tx: self.tx.clone(), + }; + is_it_send(&ret); + is_it_sync(&ret); + let mut ret = ret; + is_it_mut_sync(&mut ret); + Ok(ret) + } +} + +fn is_it_send(_: &T) {} + +fn is_it_sync(_: &T) {} + +fn is_it_mut_sync(_: &mut T) {} diff --git a/netfetch/src/dbpg.rs b/dbpg/src/schema.rs similarity index 58% rename from netfetch/src/dbpg.rs rename to dbpg/src/schema.rs index c393797..a805acc 100644 --- a/netfetch/src/dbpg.rs +++ b/dbpg/src/schema.rs @@ -1,20 +1,6 @@ -use crate::errconv::ErrConv; -use err::Error; -use netpod::log::*; -use netpod::Database; -use taskrun::tokio; -use tokio_postgres::Client as PgClient; - -pub async fn make_pg_client(d: &Database) -> Result { - let url = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name); - info!("connect to {url}"); - let (client, pg_conn) = tokio_postgres::connect(&url, tokio_postgres::tls::NoTls) - .await - .err_conv()?; - // TODO allow clean shutdown on ctrl-c and join the pg_conn in the end: - tokio::spawn(pg_conn); - Ok(client) -} +use crate::conn::PgClient; +use crate::err::Error; +use log::*; async fn has_column(table: &str, column: &str, pgc: &PgClient) -> Result { let rows = pgc @@ -22,8 +8,7 @@ async fn has_column(table: &str, column: &str, pgc: &PgClient) -> Result Result Result<(), Error> { "alter table ioc_by_channel_log add tscreate timestamptz not null default now()", &[], ) - .await - .err_conv()?; + .await?; } if !has_column("ioc_by_channel_log", "archived", pgc).await? { pgc.execute( "alter table ioc_by_channel_log add archived int not null default 0", &[], ) - .await - .err_conv()?; + .await?; } { match pgc.execute("alter table series_by_channel add constraint series_by_channel_nondup unique (facility, channel, scalar_type, shape_dims, agg_kind)", &[]).await { @@ -71,8 +51,7 @@ async fn migrate_00(pgc: &PgClient) -> Result<(), Error> { Ok(()) } -pub async fn schema_check(db: &Database) -> Result<(), Error> { - let pgc = make_pg_client(db).await?; +pub async fn schema_check(pgc: &PgClient) -> Result<(), Error> { migrate_00(&pgc).await?; info!("schema_check done"); Ok(()) diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 8e66fe3..0d4ed3f 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -16,7 +16,6 @@ bytes = "1.4" arrayref = "0.3" byteorder = "1.4" futures-util = "0.3" -tokio-postgres = "0.7.8" md-5 = "0.10" hex = "0.4" libc = "0.2" @@ -33,6 +32,7 @@ lazy_static = "1" log = { path = "../log" } stats = { path = "../stats" } scywr = { path = "../scywr" } +dbpg = { path = "../dbpg" } err = { path = "../../daqbuffer/crates/err" } netpod = { path = "../../daqbuffer/crates/netpod" } items_0 = { path = "../../daqbuffer/crates/items_0" } diff --git a/netfetch/src/ca/search.rs b/netfetch/src/ca/search.rs index 50f17a0..7d28a7b 100644 --- a/netfetch/src/ca/search.rs +++ b/netfetch/src/ca/search.rs @@ -1,19 +1,19 @@ -use super::findioc::FindIocRes; use crate::ca::findioc::FindIocStream; use crate::conf::CaIngestOpts; use async_channel::Receiver; use async_channel::Sender; +use dbpg::conn::PgClient; +use dbpg::iocindex::IocItem; +use dbpg::iocindex::IocSearchIndexWorker; use err::Error; use futures_util::StreamExt; use log::*; -use netpod::Database; use std::net::IpAddr; use std::net::SocketAddr; use std::time::Duration; use std::time::Instant; use taskrun::tokio; use tokio::task::JoinHandle; -use tokio_postgres::Client as PgClient; const DB_WORKER_COUNT: usize = 4; @@ -59,89 +59,23 @@ struct DbUpdateWorker { } impl DbUpdateWorker { - fn new(rx: Receiver, backend: String, database: Database) -> Self { - let jh = tokio::spawn(Self::worker(rx, backend, database)); - Self { jh } - } - - async fn worker(rx: Receiver, backend: String, database: Database) { - let d = &database; - let (pg_client, pg_conn) = tokio_postgres::connect( - &format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name), - tokio_postgres::tls::NoTls, - ) - .await - .unwrap(); - let (pgconn_out_tx, pgconn_out_rx) = async_channel::bounded(16); - tokio::spawn(async move { - if let Err(e) = pgconn_out_tx.send(pg_conn.await).await { - error!("can not report status of pg conn {e}"); - } - }); - let pg_client: PgClient = pg_client; - let qu_select = { - let sql = "select channel, addr from ioc_by_channel_log where facility = $1 and channel = $2 and addr is not distinct from $3 and archived = 0"; - pg_client.prepare(sql).await.unwrap() - }; - let qu_update_tsmod = { - let sql = "update ioc_by_channel_log set tsmod = now(), responseaddr = $4 where facility = $1 and channel = $2 and addr is not distinct from $3 and archived = 0"; - pg_client.prepare(sql).await.unwrap() - }; - let qu_update_archived = { - let sql = - "update ioc_by_channel_log set archived = 1 where facility = $1 and channel = $2 and archived = 0"; - pg_client.prepare(sql).await.unwrap() - }; - let qu_insert = { - let sql = "insert into ioc_by_channel_log (facility, channel, addr, responseaddr) values ($1, $2, $3, $4)"; - const TEXT: tokio_postgres::types::Type = tokio_postgres::types::Type::TEXT; - pg_client.prepare_typed(sql, &[TEXT, TEXT, TEXT, TEXT]).await.unwrap() - }; - while let Ok(item) = rx.recv().await { - let responseaddr = item.response_addr.map(|x| x.to_string()); - let addr = item.addr.map(|x| x.to_string()); - let res = pg_client - .query(&qu_select, &[&backend, &item.channel, &addr]) - .await - .unwrap(); - if res.len() == 0 { - pg_client - .execute(&qu_update_archived, &[&backend, &item.channel]) - .await - .unwrap(); - pg_client - .execute(&qu_insert, &[&backend, &item.channel, &addr, &responseaddr]) - .await - .unwrap(); - } else if res.len() == 1 { - pg_client - .execute(&qu_update_tsmod, &[&backend, &item.channel, &addr, &responseaddr]) - .await - .unwrap(); - } else { - warn!("Duplicate for {}", item.channel); - let sql = concat!( - "with q1 as (select ctid from ioc_by_channel_log where facility = $1 and channel = $2 and addr is not distinct from $3 order by tsmod desc, ctid desc limit 1)", - " update ioc_by_channel_log t set archived = 1 from q1 where t.facility = $1 and t.channel = $2 and t.ctid != q1.ctid and archived != 1", - ); - pg_client.execute(sql, &[&backend, &item.channel, &addr]).await.unwrap(); - pg_client - .execute(&qu_update_tsmod, &[&backend, &item.channel, &addr, &responseaddr]) - .await - .unwrap(); - } - } - drop(pg_client); - let x = pgconn_out_rx.recv().await; - if let Err(e) = x { - error!("db worker sees: {e}"); - } + async fn new(rx: Receiver, backend: String, pg: PgClient) -> Result { + let worker = IocSearchIndexWorker::prepare(rx, backend, pg) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let jh = tokio::spawn(async move { worker.worker().await }); + Ok(Self { jh }) } } pub async fn ca_search(opts: CaIngestOpts, channels: &Vec) -> Result<(), Error> { info!("ca_search begin"); - crate::dbpg::schema_check(opts.postgresql_config()).await?; + let pg = dbpg::conn::make_pg_client(opts.postgresql_config()) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + dbpg::schema::schema_check(&pg) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; let mut addrs = Vec::new(); for s in opts.search() { match resolve_address(s).await { @@ -189,7 +123,10 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec) -> Result<(), let mut dbworkers = Vec::new(); for _ in 0..DB_WORKER_COUNT { - let w = DbUpdateWorker::new(dbrx.clone(), opts.backend().into(), opts.postgresql_config().clone()); + let pg = dbpg::conn::make_pg_client(opts.postgresql_config()) + .await + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let w = DbUpdateWorker::new(dbrx.clone(), opts.backend().into(), pg).await?; dbworkers.push(w); } drop(dbrx); @@ -241,6 +178,7 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec) -> Result<(), if do_block { info!("blacklisting {item:?}"); } else { + let item = IocItem::new(item.channel, item.response_addr, item.addr, item.dt); match dbtx.send(item).await { Ok(_) => {} Err(_) => { diff --git a/netfetch/src/lib.rs b/netfetch/src/lib.rs index b7fd74c..1264cfe 100644 --- a/netfetch/src/lib.rs +++ b/netfetch/src/lib.rs @@ -5,7 +5,6 @@ pub mod bsreadclient; pub mod ca; pub mod conf; pub mod daemon_common; -pub mod dbpg; pub mod errconv; pub mod insertworker; pub mod linuxhelper; diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index ab327ba..564849d 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -1,12 +1,14 @@ -use crate::session::create_session; +use crate::session::create_session_no_ks; use crate::session::ScySession; use err::thiserror; use err::ThisError; use futures_util::StreamExt; +use log::*; use netpod::ScyllaConfig; use scylla::transport::errors::DbError; use scylla::transport::errors::QueryError; use std::fmt; +use std::time::Duration; #[derive(Debug, ThisError)] pub enum Error { @@ -24,14 +26,24 @@ impl From for Error { } } +pub async fn has_keyspace(name: &str, scy: &ScySession) -> Result { + let cql = "select keyspace_name from system_schema.keyspaces where keyspace_name = ?"; + let mut res = scy.query_iter(cql, (name,)).await?; + while let Some(k) = res.next().await { + let row = k?; + if let Some(table_name) = row.columns[0].as_ref().unwrap().as_text() { + if table_name == name { + return Ok(true); + } + } + } + Ok(false) +} + pub async fn has_table(name: &str, scy: &ScySession) -> Result { + let cql = "select table_name from system_schema.tables where keyspace_name = ?"; let ks = scy.get_keyspace().ok_or_else(|| Error::NoKeyspaceChosen)?; - let mut res = scy - .query_iter( - "select table_name from system_schema.tables where keyspace_name = ?", - (ks.as_ref(),), - ) - .await?; + let mut res = scy.query_iter(cql, (ks.as_ref(),)).await?; while let Some(k) = res.next().await { let row = k?; if let Some(table_name) = row.columns[0].as_ref().unwrap().as_text() { @@ -79,30 +91,120 @@ pub async fn create_table_ts_msp(scy: &ScySession) -> Result<(), Error> { Ok(()) } +fn dhours(x: u64) -> Duration { + Duration::from_secs(60 * 60 * x) +} + +fn ddays(x: u64) -> Duration { + Duration::from_secs(60 * 60 * 24 * x) +} + struct GenTwcsTab { name: String, - cql: String, - default_time_to_live: usize, - compaction_window_size: usize, + col_names: Vec, + col_types: Vec, + partition_keys: Vec, + cluster_keys: Vec, + default_time_to_live: Duration, + compaction_window_size: Duration, } impl GenTwcsTab { - fn name(&self) -> String { - self.name.clone() + // name: "series_by_ts_msp".into(), + // cql: "(part int, ts_msp int, shape_kind int, scalar_type int, series bigint, primary key ((part, ts_msp, shape_kind, scalar_type), series))".into(), + // default_time_to_live: 60 * 60 * 5, + // compaction_window_size: 24 * 4, + pub fn new<'a, N, CI, A, B, I2, I2A, I3, I3A>( + name: N, + cols: CI, + partition_keys: I2, + cluster_keys: I3, + default_time_to_live: Duration, + compaction_window_size: Duration, + ) -> Self + where + N: Into, + CI: IntoIterator, + // TODO could make for idiomatic to skip extra clone if passed value is already String + A: AsRef + 'a, + B: AsRef + 'a, + I2: IntoIterator, + I3: IntoIterator, + I2A: Into, + I3A: Into, + { + let mut col_names = Vec::new(); + let mut col_types = Vec::new(); + cols.into_iter().for_each(|(a, b)| { + col_names.push(a.as_ref().into()); + col_types.push(b.as_ref().into()); + }); + Self { + name: name.into(), + col_names, + col_types, + partition_keys: partition_keys.into_iter().map(Into::into).collect(), + cluster_keys: cluster_keys.into_iter().map(Into::into).collect(), + default_time_to_live, + compaction_window_size, + } + } + + fn name(&self) -> &str { + &self.name } fn cql(&self) -> String { use std::fmt::Write; + let pkey = if self.partition_keys.len() == 0 { + panic!("some partition key is required"); + } else { + self.partition_keys.join(", ") + }; + let pkeys = format!("({})", pkey); + let pkeys = if self.cluster_keys.len() == 0 { + format!("({})", pkeys) + } else { + format!("({}, {})", pkeys, self.cluster_keys.join(", ")) + }; let mut s = String::new(); write!(s, "create table {}", self.name()).unwrap(); - s.write_str(&self.cql).unwrap(); - write!(s, " with default_time_to_live = {}", self.default_time_to_live).unwrap(); + let mut cols: Vec<_> = self + .col_names + .iter() + .zip(self.col_types.iter()) + .map(|(n, t)| format!("{} {}", n, t)) + .collect(); + cols.push(format!("primary key {pkeys}")); + let cols = cols.join(", "); + write!(s, " ({})", cols).unwrap(); + write!( + s, + " with default_time_to_live = {}", + self.default_time_to_live.as_secs() + ) + .unwrap(); s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'HOURS'") .unwrap(); - write!(s, ", 'compaction_window_size': {}", self.compaction_window_size).unwrap(); + write!( + s, + ", 'compaction_window_size': {}", + self.compaction_window_size.as_secs() / 60 / 60 + ) + .unwrap(); s.write_str(" }").unwrap(); s } + + async fn create_if_missing(&self, scy: &ScySession) -> Result<(), Error> { + // TODO check for more details (all columns, correct types, correct kinds, etc) + if !has_table(self.name(), scy).await? { + let cql = self.cql(); + info!("CREATE CQL: {cql}"); + scy.query(self.cql(), ()).await?; + } + Ok(()) + } } struct EvTabDim0 { @@ -119,7 +221,7 @@ impl EvTabDim0 { format!("events_scalar_{}", self.sty) } - fn cql(&self) -> String { + fn cql_create(&self) -> String { use std::fmt::Write; let mut s = String::new(); write!(s, "create table {}", self.name()).unwrap(); @@ -161,6 +263,24 @@ impl EvTabDim1 { } } +#[allow(unused)] +async fn get_columns(keyspace: &str, table: &str, scy: &ScySession) -> Result, Error> { + let mut ret = Vec::new(); + let cql = "select column_name, kind, type from system_schema.columns where keyspace_name = ? and table_name = ?"; + let params = (keyspace, table); + let mut res = scy.query_iter(cql, params).await?; + while let Some(row) = res.next().await { + // columns: + // kind (text): regular, clustering, partition_key. + // column_name (text) + // type (text): text, blob, int, ... + let row = row?; + let name = row.columns[0].as_ref().unwrap().as_text().unwrap(); + ret.push(name.into()); + } + Ok(ret) +} + async fn check_event_tables(scy: &ScySession) -> Result<(), Error> { let stys = [ "u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64", "bool", "string", @@ -177,8 +297,8 @@ async fn check_event_tables(scy: &ScySession) -> Result<(), Error> { default_time_to_live: 60 * 60 * 1, compaction_window_size: 48, }; - if !check_table_readable(&desc.name(), scy).await? { - scy.query(desc.cql(), ()).await?; + if !has_table(&desc.name(), scy).await? { + scy.query(desc.cql_create(), ()).await?; } let desc = EvTabDim1 { sty: sty.into(), @@ -194,103 +314,166 @@ async fn check_event_tables(scy: &ScySession) -> Result<(), Error> { Ok(()) } -pub async fn migrate_keyspace(scyconf: &ScyllaConfig) -> Result<(), Error> { - let scy2 = create_session(scyconf).await?; +pub async fn migrate_scylla_data_schema(scyconf: &ScyllaConfig) -> Result<(), Error> { + let scy2 = create_session_no_ks(scyconf).await?; let scy = &scy2; - if !check_table_readable("ts_msp", &scy).await? { + + if !has_keyspace(&scyconf.keyspace, scy).await? { + let rf = 2; + let cql = format!("create keyspace {} with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }} and durable_writes = true;", scyconf.keyspace, rf); + scy.query_iter(cql, ()).await?; + info!("keyspace created"); + } + + scy.use_keyspace(&scyconf.keyspace, true).await?; + + if !has_table("ts_msp", &scy).await? { create_table_ts_msp(scy).await?; } + check_event_tables(scy).await?; - { - let desc = GenTwcsTab { - name: "series_by_ts_msp".into(), - cql: "(part int, ts_msp int, shape_kind int, scalar_type int, series bigint, primary key ((part, ts_msp, shape_kind, scalar_type), series))".into(), - default_time_to_live: 60 * 60 * 5, - compaction_window_size: 24 * 4, - }; - if !check_table_readable(&desc.name(), scy).await? { - scy.query(desc.cql(), ()).await?; - } + + if false { + info!("early abort schema"); + return Ok(()); } { - let desc = GenTwcsTab { - name: "connection_status".into(), - cql: "(ts_msp bigint, ts_lsp bigint, kind int, addr text, primary key (ts_msp, ts_lsp))".into(), - default_time_to_live: 60 * 60 * 1, - compaction_window_size: 24 * 4, - }; - if !check_table_readable(&desc.name(), scy).await? { - scy.query(desc.cql(), ()).await?; - } + let tab = GenTwcsTab::new( + "series_by_ts_msp", + &[ + ("part", "int"), + ("ts_msp", "int"), + ("shape_kind", "int"), + ("scalar_type", "int"), + ("series", "bigint"), + ], + ["part", "ts_msp", "shape_kind", "scalar_type"], + ["series"], + dhours(5), + ddays(4), + ); + tab.create_if_missing(scy).await?; } { - let desc = GenTwcsTab { - name: "channel_status".into(), - cql: "(series bigint, ts_msp bigint, ts_lsp bigint, kind int, primary key ((series, ts_msp), ts_lsp))" - .into(), - default_time_to_live: 60 * 60 * 1, - compaction_window_size: 24 * 4, - }; - if !check_table_readable(&desc.name(), scy).await? { - scy.query(desc.cql(), ()).await?; - } + let tab = GenTwcsTab::new( + "connection_status", + &[ + ("ts_msp", "bigint"), + ("ts_lsp", "bigint"), + ("kind", "int"), + ("addr", "text"), + ], + ["ts_msp"], + ["ts_lsp"], + dhours(1), + ddays(4), + ); + tab.create_if_missing(scy).await?; } { - let desc = GenTwcsTab { - name: "channel_status_by_ts_msp".into(), - cql: "(ts_msp bigint, ts_lsp bigint, series bigint, kind int, primary key (ts_msp, ts_lsp))".into(), - default_time_to_live: 60 * 60 * 1, - compaction_window_size: 24 * 4, - }; - if !check_table_readable(&desc.name(), scy).await? { - scy.query(desc.cql(), ()).await?; - } + let tab = GenTwcsTab::new( + "channel_status", + &[ + ("series", "bigint"), + ("ts_msp", "bigint"), + ("ts_lsp", "bigint"), + ("kind", "int"), + ], + ["series", "ts_msp"], + ["ts_lsp"], + dhours(1), + ddays(4), + ); + tab.create_if_missing(scy).await?; } { - let desc = GenTwcsTab { - name: "channel_ping".into(), - cql: "(part int, ts_msp int, series bigint, ivl float, interest float, evsize int, primary key ((part, ts_msp), series))" - .into(), - default_time_to_live: 60 * 60 * 1, - compaction_window_size: 24 * 4, - }; - if !check_table_readable(&desc.name(), scy).await? { - scy.query(desc.cql(), ()).await?; - } + let tab = GenTwcsTab::new( + "channel_status_by_ts_msp", + &[ + ("ts_msp", "bigint"), + ("ts_lsp", "bigint"), + ("series", "bigint"), + ("kind", "int"), + ], + ["ts_msp"], + ["ts_lsp"], + dhours(1), + ddays(4), + ); + tab.create_if_missing(scy).await?; } { - let desc = GenTwcsTab { - name: "muted".into(), - cql: "(part int, series bigint, ts bigint, ema float, emd float, primary key (part, series, ts))".into(), - default_time_to_live: 60 * 60 * 4, - compaction_window_size: 24 * 1, - }; - if !check_table_readable(&desc.name(), scy).await? { - scy.query(desc.cql(), ()).await?; - } + let tab = GenTwcsTab::new( + "channel_ping", + &[ + ("part", "int"), + ("ts_msp", "int"), + ("series", "bigint"), + ("ivl", "float"), + ("interest", "float"), + ("evsize", "int"), + ], + ["part", "ts_msp"], + ["series"], + dhours(1), + ddays(4), + ); + tab.create_if_missing(scy).await?; } { - let desc = GenTwcsTab { - name: "item_recv_ivl".into(), - cql: "(part int, series bigint, ts bigint, ema float, emd float, primary key (part, series, ts))".into(), - default_time_to_live: 60 * 60 * 4, - compaction_window_size: 24 * 1, - }; - if !check_table_readable(&desc.name(), scy).await? { - scy.query(desc.cql(), ()).await?; - } + let tab = GenTwcsTab::new( + "muted", + &[ + ("part", "int"), + ("series", "bigint"), + ("ts", "bigint"), + ("ema", "float"), + ("emd", "float"), + ], + ["part"], + ["series", "ts"], + dhours(4), + ddays(1), + ); + tab.create_if_missing(scy).await?; } { - let desc = GenTwcsTab { - name: "binned_scalar_f32_v01".into(), - cql: "(series bigint, bin_len_sec int, bin_count int, off_msp int, off_lsp int, counts frozen>, mins frozen>, maxs frozen>, avgs frozen>, primary key ((series, bin_len_sec, bin_count, off_msp), off_lsp))" - .into(), - default_time_to_live: 60 * 60 * 24 * 30, - compaction_window_size: 24 * 4, - }; - if !check_table_readable(&desc.name(), scy).await? { - scy.query(desc.cql(), ()).await?; - } + let tab = GenTwcsTab::new( + "item_recv_ivl", + &[ + ("part", "int"), + ("series", "bigint"), + ("ts", "bigint"), + ("ema", "float"), + ("emd", "float"), + ], + ["part"], + ["series", "ts"], + dhours(4), + ddays(1), + ); + tab.create_if_missing(scy).await?; + } + { + let tab = GenTwcsTab::new( + "binned_scalar_f32_v01", + &[ + ("series", "bigint"), + ("bin_len_sec", "int"), + ("bin_count", "int"), + ("off_msp", "int"), + ("off_lsp", "int"), + ("counts", "frozen>"), + ("mins", "frozen>"), + ("maxs", "frozen>"), + ("avgs", "frozen>"), + ], + ["series", "bin_len_sec", "bin_count", "off_msp"], + ["off_lsp"], + ddays(30), + ddays(4), + ); + tab.create_if_missing(scy).await?; } Ok(()) } diff --git a/scywr/src/session.rs b/scywr/src/session.rs index d669ea7..f84893e 100644 --- a/scywr/src/session.rs +++ b/scywr/src/session.rs @@ -20,10 +20,9 @@ impl From for Error { } } -pub async fn create_session(scyconf: &ScyllaConfig) -> Result, Error> { +pub async fn create_session_no_ks(scyconf: &ScyllaConfig) -> Result, Error> { let scy = scylla::SessionBuilder::new() .known_nodes(&scyconf.hosts) - .use_keyspace(&scyconf.keyspace, true) .default_execution_profile_handle( ExecutionProfileBuilder::default() .consistency(Consistency::LocalOne) @@ -35,3 +34,11 @@ pub async fn create_session(scyconf: &ScyllaConfig) -> Result, Erro let scy = Arc::new(scy); Ok(scy) } + +pub async fn create_session(scyconf: &ScyllaConfig) -> Result, Error> { + let scy = create_session_no_ks(scyconf).await?; + scy.use_keyspace(&scyconf.keyspace, true) + .await + .map_err(|e| Error::NewSession(e.to_string()))?; + Ok(scy) +}