From 9ddc0de9afd80dcc4cbbc590f0b832dc9dbfa56f Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 1 Mar 2024 11:46:02 +0100 Subject: [PATCH] Check schema for more table options --- daqingest/src/bin/daqingest.rs | 3 + daqingest/src/daemon.rs | 8 -- daqingest/src/tools.rs | 1 - netfetch/src/conf.rs | 37 +------- scywr/src/insertworker.rs | 20 +---- scywr/src/iteminsertqueue.rs | 51 ++--------- scywr/src/schema.rs | 150 +++++++++++++++++++++++++-------- 7 files changed, 128 insertions(+), 142 deletions(-) diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index ba60fcf..cefab86 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -58,6 +58,9 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { name: k.pg_name, }; let scyconf = ScyllaIngestConfig::new([k.scylla_host], k.scylla_keyspace); + scywr::schema::migrate_scylla_data_schema(&scyconf, netpod::ttl::RetentionTime::Short) + .await + .map_err(Error::from_string)?; match k.sub { DbSub::Data(u) => { use daqingest::opts::DbDataSub; diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 84d6b93..647be7d 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -19,7 +19,6 @@ use netpod::ttl::RetentionTime; use netpod::Database; use scywr::config::ScyllaIngestConfig; use scywr::insertworker::InsertWorkerOpts; -use scywr::insertworker::Ttls; use scywr::iteminsertqueue as scywriiq; use scywriiq::QueryItem; use stats::DaemonStats; @@ -46,7 +45,6 @@ const CHECK_CHANNEL_SLOW_WARN: Duration = Duration::from_millis(500); pub struct DaemonOpts { pgconf: Database, scyconf: ScyllaIngestConfig, - ttls: Ttls, #[allow(unused)] test_bsread_addr: Option, insert_frac: Arc, @@ -591,12 +589,6 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> let opts2 = DaemonOpts { pgconf: opts.postgresql_config().clone(), scyconf: opts.scylla_config().clone(), - ttls: Ttls { - index: opts.ttl_index(), - d0: opts.ttl_d0(), - d1: 
opts.ttl_d1(), - binned: opts.ttl_binned(), - }, test_bsread_addr: opts.test_bsread_addr.clone(), insert_frac: insert_frac.clone(), store_workers_rate, diff --git a/daqingest/src/tools.rs b/daqingest/src/tools.rs index e8f1d91..9cb02be 100644 --- a/daqingest/src/tools.rs +++ b/daqingest/src/tools.rs @@ -34,7 +34,6 @@ pub async fn remove_older( pgconf: &Database, scyconf: &ScyllaIngestConfig, ) -> Result<(), Error> { - scywr::schema::migrate_scylla_data_schema(scyconf, netpod::ttl::RetentionTime::Short).await?; let date_cut = parse_date_str(&params.date)?; let ts_cut = date_to_ts_ns(date_cut); debug!("chosen date is {:?} {}", date_cut, ts_cut); diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 683eb48..4536f0f 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -40,14 +40,6 @@ pub struct CaIngestOpts { store_workers_rate: Option, insert_frac: Option, use_rate_limit_queue: Option, - #[serde(default, with = "humantime_serde")] - ttl_index: Option, - #[serde(default, with = "humantime_serde")] - ttl_d0: Option, - #[serde(default, with = "humantime_serde")] - ttl_d1: Option, - #[serde(default, with = "humantime_serde")] - ttl_binned: Option, pub test_bsread_addr: Option, } @@ -119,37 +111,13 @@ impl CaIngestOpts { pub fn use_rate_limit_queue(&self) -> bool { self.use_rate_limit_queue.unwrap_or(false) } - - pub fn ttl_index(&self) -> Duration { - self.ttl_index - .clone() - .unwrap_or_else(|| Duration::from_secs(60 * 60 * 24 * 50)) - } - - pub fn ttl_d0(&self) -> Duration { - self.ttl_d0 - .clone() - .unwrap_or_else(|| Duration::from_secs(60 * 60 * 24 * 40)) - } - - pub fn ttl_d1(&self) -> Duration { - self.ttl_d1 - .clone() - .unwrap_or_else(|| Duration::from_secs(60 * 60 * 24 * 10)) - } - - pub fn ttl_binned(&self) -> Duration { - self.ttl_binned - .clone() - .unwrap_or_else(|| Duration::from_secs(60 * 60 * 24 * 40)) - } } #[test] fn parse_config_minimal() { let conf = r###" backend: scylla -ttl_d1: 10m 3s 45ms +timeout: 10m 3s 45ms api_bind: 
"0.0.0.0:3011" channels: /some/path/file.txt search: @@ -173,8 +141,7 @@ scylla: assert_eq!(&conf.api_bind, "0.0.0.0:3011"); assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string())); assert_eq!(conf.scylla.hosts().get(1), Some(&"sf-nube-12:19042".to_string())); - assert_eq!(conf.ttl_d1, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45))); - assert_eq!(conf.ttl_binned, None); + assert_eq!(conf.timeout, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45))); } #[test] diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 7ea2998..066fed8 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -83,14 +83,6 @@ async fn back_off_sleep(backoff_dt: &mut Duration) { tokio::time::sleep(*backoff_dt).await; } -#[derive(Debug, Clone)] -pub struct Ttls { - pub index: Duration, - pub d0: Duration, - pub d1: Duration, - pub binned: Duration, -} - pub struct InsertWorkerOpts { pub store_workers_rate: Arc, pub insert_workers_running: Arc, @@ -125,7 +117,6 @@ pub async fn spawn_scylla_insert_workers( let jh = tokio::spawn(worker( worker_ix, item_inp.clone(), - ttls.clone(), insert_worker_opts.clone(), data_store, store_stats.clone(), @@ -147,7 +138,6 @@ pub async fn spawn_scylla_insert_workers( async fn worker( worker_ix: usize, item_inp: Receiver, - ttls: Ttls, insert_worker_opts: Arc, data_store: Arc, stats: Arc, @@ -167,7 +157,7 @@ async fn worker( break; }; match item { - QueryItem::ConnectionStatus(item) => match insert_connection_status(item, ttls.index, &data_store).await { + QueryItem::ConnectionStatus(item) => match insert_connection_status(item, &data_store).await { Ok(_) => { stats.inserted_connection_status().inc(); backoff = backoff_0; @@ -177,7 +167,7 @@ async fn worker( back_off_sleep(&mut backoff).await; } }, - QueryItem::ChannelStatus(item) => match insert_channel_status(item, ttls.index, &data_store).await { + QueryItem::ChannelStatus(item) => match insert_channel_status(item, &data_store).await { Ok(_) => { 
stats.inserted_channel_status().inc(); backoff = backoff_0; @@ -198,7 +188,7 @@ async fn worker( stats.item_lat_net_worker().ingest(dt); let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire); let do_insert = i1 % 1000 < insert_frac; - match insert_item(item, &ttls, &data_store, do_insert, &stats).await { + match insert_item(item, &data_store, do_insert, &stats).await { Ok(_) => { stats.inserted_values().inc(); let tsnow = { @@ -224,7 +214,7 @@ async fn worker( item.ts as i64, item.ema, item.emd, - ttls.index.as_secs() as i32, ); let qu = err::todoval(); let qres = data_store.scy.execute(&qu, values).await; @@ -246,7 +235,6 @@ async fn worker( item.ts as i64, item.ema, item.emd, - ttls.index.as_secs() as i32, ); let qu = err::todoval(); let qres = data_store.scy.execute(&qu, values).await; @@ -269,7 +257,6 @@ async fn worker( item.ivl, item.interest, item.evsize as i32, - ttls.index.as_secs() as i32, ); let qu = err::todoval(); let qres = data_store.scy.execute(&qu, params).await; @@ -427,7 +414,6 @@ fn prepare_query_insert_futs( if item.shape.to_scylla_vec().is_empty() { 0 } else { 1 } as i32, item.scalar_type.to_scylla_i32(), item.series.id() as i64, - ttls.index.as_secs() as i32, ); data_store .scy diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 2d02ae4..bc01cb8 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -1,6 +1,5 @@ pub use netpod::CONNECTION_STATUS_DIV; -use crate::insertworker::Ttls; use crate::session::ScySession; use crate::store::DataStore; use err::thiserror; @@ -522,15 +521,15 @@ where } } +// TODO currently not in use, anything to merge? 
pub async fn insert_item( item: InsertItem, - ttls: &Ttls, data_store: &DataStore, do_insert: bool, stats: &Arc, ) -> Result<(), Error> { if item.msp_bump { - let params = (item.series.id() as i64, item.ts_msp as i64, ttls.index.as_secs() as i32); + let params = (item.series.id() as i64, item.ts_msp as i64); data_store.scy.execute(&data_store.qu_insert_ts_msp, params).await?; stats.inserts_msp().inc(); } @@ -541,7 +540,6 @@ pub async fn insert_item( if item.shape.to_scylla_vec().is_empty() { 0 } else { 1 } as i32, item.scalar_type.to_scylla_i32(), item.series.id() as i64, - ttls.index.as_secs() as i32, ); data_store .scy @@ -723,11 +721,7 @@ pub fn insert_channel_status_fut( smallvec![fut1, fut2] } -pub async fn insert_connection_status( - item: ConnectionStatusItem, - ttl: Duration, - data_store: &DataStore, -) -> Result<(), Error> { +pub async fn insert_connection_status(item: ConnectionStatusItem, data_store: &DataStore) -> Result<(), Error> { let tsunix = item.ts.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO); let secs = tsunix.as_secs() * SEC; let nanos = tsunix.subsec_nanos() as u64; @@ -736,7 +730,7 @@ pub async fn insert_connection_status( let ts_lsp = ts - ts_msp; let kind = item.status.to_kind(); let addr = format!("{}", item.addr); - let params = (ts_msp as i64, ts_lsp as i64, kind as i32, addr, ttl.as_secs() as i32); + let params = (ts_msp as i64, ts_lsp as i64, kind as i32, addr); data_store .scy .execute(&data_store.qu_insert_connection_status, params) @@ -744,11 +738,7 @@ pub async fn insert_connection_status( Ok(()) } -pub async fn insert_channel_status( - item: ChannelStatusItem, - ttl: Duration, - data_store: &DataStore, -) -> Result<(), Error> { +pub async fn insert_channel_status(item: ChannelStatusItem, data_store: &DataStore) -> Result<(), Error> { let tsunix = item.ts.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO); let secs = tsunix.as_secs() * SEC; let nanos = tsunix.subsec_nanos() as u64; @@ -757,24 +747,12 @@ pub async fn 
insert_channel_status( let ts_lsp = ts - ts_msp; let kind = item.status.to_kind(); let cssid = item.cssid.id(); - let params = ( - cssid as i64, - ts_msp as i64, - ts_lsp as i64, - kind as i32, - ttl.as_secs() as i32, - ); + let params = (cssid as i64, ts_msp as i64, ts_lsp as i64, kind as i32); data_store .scy .execute(&data_store.qu_insert_channel_status, params) .await?; - let params = ( - ts_msp as i64, - ts_lsp as i64, - cssid as i64, - kind as i32, - ttl.as_secs() as i32, - ); + let params = (ts_msp as i64, ts_lsp as i64, cssid as i64, kind as i32); data_store .scy .execute(&data_store.qu_insert_channel_status_by_ts_msp, params) @@ -782,21 +760,6 @@ pub async fn insert_channel_status( Ok(()) } -pub struct InsertFut2 { - data_store: Arc, - stats: Arc, - kind: InsertFutKind, -} - -impl Future for InsertFut2 { - type Output = Result<(), Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - use Poll::*; - todo!() - } -} - pub enum InsertFutKind { Value, } diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index 75f8d0c..85b03a8 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -8,6 +8,8 @@ use log::*; use netpod::ttl::RetentionTime; use scylla::transport::errors::DbError; use scylla::transport::errors::QueryError; +use scylla::transport::iterator::NextRowError; +use std::collections::BTreeMap; use std::fmt; use std::time::Duration; @@ -17,6 +19,8 @@ pub enum Error { Fmt(#[from] fmt::Error), Query(#[from] QueryError), NewSession(String), + ScyllaNextRow(#[from] NextRowError), + MissingData, } impl From for Error { @@ -111,6 +115,7 @@ struct GenTwcsTab { cluster_keys: Vec, default_time_to_live: Duration, compaction_window_size: Duration, + gc_grace: Duration, } impl GenTwcsTab { @@ -125,7 +130,6 @@ impl GenTwcsTab { partition_keys: I2, cluster_keys: I3, default_time_to_live: Duration, - compaction_window_size: Duration, ) -> Self where PRE: AsRef, @@ -146,7 +150,7 @@ impl GenTwcsTab { partition_keys, cluster_keys, 
default_time_to_live, - compaction_window_size, + default_time_to_live / 40, ) } @@ -182,6 +186,7 @@ impl GenTwcsTab { cluster_keys: cluster_keys.into_iter().map(Into::into).collect(), default_time_to_live, compaction_window_size, + gc_grace: Duration::from_secs(60 * 60 * 12), } } @@ -189,6 +194,22 @@ impl GenTwcsTab { &self.name } + async fn setup(&self, scy: &ScySession) -> Result<(), Error> { + self.create_if_missing(scy).await?; + self.check_table_options(scy).await?; + Ok(()) + } + + async fn create_if_missing(&self, scy: &ScySession) -> Result<(), Error> { + // TODO check for more details (all columns, correct types, correct kinds, etc) + if !has_table(self.name(), scy).await? { + let cql = self.cql(); + info!("scylla create table {} {}", self.name(), cql); + scy.query(cql, ()).await?; + } + Ok(()) + } + fn cql(&self) -> String { use std::fmt::Write; let pkey = if self.partition_keys.len() == 0 { @@ -219,24 +240,81 @@ impl GenTwcsTab { self.default_time_to_live.as_secs() ) .unwrap(); - s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'HOURS'") - .unwrap(); + s.write_str(" and compaction = { ").unwrap(); write!( s, - ", 'compaction_window_size': {}", - self.compaction_window_size.as_secs() / 60 / 60 + concat!( + "'class': 'TimeWindowCompactionStrategy'", + ", 'compaction_window_unit': 'MINUTES'", + ", 'compaction_window_size': {}", + ), + self.compaction_window_size.as_secs() / 60 ) .unwrap(); s.write_str(" }").unwrap(); s } - async fn create_if_missing(&self, scy: &ScySession) -> Result<(), Error> { - // TODO check for more details (all columns, correct types, correct kinds, etc) - if !has_table(self.name(), scy).await? 
{ - let cql = self.cql(); - info!("scylla create table {} {}", self.name(), cql); - scy.query(cql, ()).await?; + fn compaction_options(&self) -> BTreeMap { + let win_mins = self.compaction_window_size.as_secs() / 60; + let mut map = BTreeMap::new(); + map.insert("class".into(), "TimeWindowCompactionStrategy".into()); + map.insert("compaction_window_unit".into(), "MINUTES".into()); + map.insert("compaction_window_size".into(), win_mins.to_string()); + map + } + + async fn check_table_options(&self, scy: &ScySession) -> Result<(), Error> { + let cql = concat!( + "select default_time_to_live, gc_grace_seconds, compaction", + " from system_schema.tables where keyspace_name = ? and table_name = ?" + ); + let x = scy + .query_iter(cql, (scy.get_keyspace().unwrap().as_ref(), &self.name())) + .await?; + let mut it = x.into_typed::<(i32, i32, BTreeMap)>(); + let mut rows = Vec::new(); + while let Some(u) = it.next().await { + let row = u?; + rows.push((row.0 as u64, row.1 as u64, row.2)); + } + if let Some(row) = rows.get(0) { + if row.0 != self.default_time_to_live.as_secs() { + let cql = format!( + concat!("alter table {} with default_time_to_live = {}"), + self.name(), + self.default_time_to_live.as_secs() + ); + debug!("{cql}"); + scy.query(cql, ()).await?; + } + if row.1 != self.gc_grace.as_secs() { + let cql = format!( + concat!("alter table {} with gc_grace_seconds = {}"), + self.name(), + self.gc_grace.as_secs() + ); + debug!("{cql}"); + scy.query(cql, ()).await?; + } + if row.2 != self.compaction_options() { + let params: Vec<_> = self + .compaction_options() + .iter() + .map(|(k, v)| format!("'{k}': '{v}'")) + .collect(); + let params = params.join(", "); + let cql = format!( + concat!("alter table {} with compaction = {{ {} }}"), + self.name(), + params + ); + debug!("{cql}"); + scy.query(cql, ()).await?; + } + } else { + let e = Error::MissingData; + return Err(e); } Ok(()) } @@ -404,14 +482,19 @@ pub async fn migrate_scylla_data_schema(scyconf: 
&ScyllaIngestConfig, rett: Rete scy.use_keyspace(scyconf.keyspace(), true).await?; - { - let table_name = format!("{}ts_msp", rett.table_prefix()); - if !has_table(&table_name, &scy).await? { - create_table_ts_msp(&table_name, scy).await?; - } - } check_event_tables(rett.clone(), scy).await?; + { + let tab = GenTwcsTab::new( + rett.table_prefix(), + "ts_msp", + &[("series", "bigint"), ("ts_msp", "bigint")], + ["series"], + ["ts_msp_ms"], + rett.ttl_ts_msp(), + ); + tab.setup(scy).await?; + } { let tab = GenTwcsTab::new( rett.table_prefix(), @@ -419,10 +502,9 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete &[("series", "bigint"), ("ts_msp_ms", "bigint")], ["series"], ["ts_msp_ms"], - rett.ttl_ts_msp() / 40, rett.ttl_ts_msp(), ); - tab.create_if_missing(scy).await?; + tab.setup(scy).await?; } { let tab = GenTwcsTab::new( @@ -438,9 +520,8 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete ["part", "ts_msp", "shape_kind", "scalar_type"], ["series"], dhours(5), - ddays(4), ); - tab.create_if_missing(scy).await?; + tab.setup(scy).await?; } { let tab = GenTwcsTab::new( @@ -454,10 +535,9 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete ], ["ts_msp"], ["ts_lsp"], - dhours(1), - ddays(4), + rett.ttl_channel_status(), ); - tab.create_if_missing(scy).await?; + tab.setup(scy).await?; } { let tab = GenTwcsTab::new( @@ -471,10 +551,9 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete ], ["series", "ts_msp"], ["ts_lsp"], - dhours(1), - ddays(4), + rett.ttl_channel_status(), ); - tab.create_if_missing(scy).await?; + tab.setup(scy).await?; } { let tab = GenTwcsTab::new( @@ -488,10 +567,9 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete ], ["ts_msp"], ["ts_lsp"], - dhours(1), - ddays(4), + rett.ttl_channel_status(), ); - tab.create_if_missing(scy).await?; + tab.setup(scy).await?; } { let tab = GenTwcsTab::new( @@ 
-509,10 +587,9 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete ], ["series", "bin_len_ms", "ts_msp"], ["off"], - ddays(30), - ddays(4), + rett.ttl_binned(), ); - tab.create_if_missing(scy).await?; + tab.setup(scy).await?; } { let tab = GenTwcsTab::new( @@ -527,10 +604,9 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete ], ["part", "ts"], ["series"], - ddays(30), - ddays(4), + rett.ttl_channel_status(), ); - tab.create_if_missing(scy).await?; + tab.setup(scy).await?; } Ok(()) }