From 2d2721389ddaf397193bde9c5ddc912387c9580f Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 7 Nov 2022 09:17:10 +0100 Subject: [PATCH] Use explicit TTL --- netfetch/src/ca/store.rs | 16 +++++++++------- netfetch/src/insertworker.rs | 5 ++++- netfetch/src/store.rs | 33 +++++++++++++++++++++++++-------- 3 files changed, 38 insertions(+), 16 deletions(-) diff --git a/netfetch/src/ca/store.rs b/netfetch/src/ca/store.rs index 06a3b2e..fcffe38 100644 --- a/netfetch/src/ca/store.rs +++ b/netfetch/src/ca/store.rs @@ -75,7 +75,7 @@ impl DataStore { .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let scy = Arc::new(scy); let q = scy - .prepare("insert into ts_msp (series, ts_msp) values (?, ?)") + .prepare("insert into ts_msp (series, ts_msp) values (?, ?) using ttl ?") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_ts_msp = Arc::new(q); @@ -86,37 +86,39 @@ impl DataStore { .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_series_by_ts_msp = Arc::new(q); + // scalar: let q = scy - .prepare("insert into events_scalar_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .prepare("insert into events_scalar_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_scalar_i8 = Arc::new(q); let q = scy - .prepare("insert into events_scalar_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .prepare("insert into events_scalar_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_scalar_i16 = Arc::new(q); let q = scy - .prepare("insert into events_scalar_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .prepare("insert into events_scalar_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_scalar_i32 = Arc::new(q); let q = scy - .prepare("insert into events_scalar_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .prepare("insert into events_scalar_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_scalar_f32 = Arc::new(q); let q = scy - .prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_scalar_f64 = Arc::new(q); let q = scy - .prepare("insert into events_scalar_string (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .prepare("insert into events_scalar_string (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_scalar_string = Arc::new(q); + // array let q = scy .prepare("insert into events_array_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") diff --git a/netfetch/src/insertworker.rs b/netfetch/src/insertworker.rs index 7b078cc..4f2a5a2 100644 --- a/netfetch/src/insertworker.rs +++ b/netfetch/src/insertworker.rs @@ -119,6 +119,9 @@ pub async fn spawn_scylla_insert_workers( insert_item_queue.receiver() }; let ingest_commons = ingest_commons.clone(); + let ttl_msp = 60 * 60 * 24 * 4; + let ttl_0d = 60 * 60 * 24 * 2; + let ttl_1d = 60 * 60 * 12; let fut = async move { let backoff_0 = Duration::from_millis(10); let mut backoff = backoff_0.clone(); @@ -158,7 +161,7 @@ pub async fn spawn_scylla_insert_workers( QueryItem::Insert(item) => { let insert_frac = ingest_commons.insert_frac.load(Ordering::Acquire); if i1 % 1000 < insert_frac { - match crate::store::insert_item(item, &data_store, &stats).await { + match crate::store::insert_item(item, ttl_msp, ttl_0d, ttl_1d, &data_store, &stats).await { Ok(_) => { stats.store_worker_insert_done_inc(); backoff = backoff_0; diff --git a/netfetch/src/store.rs b/netfetch/src/store.rs index 5da880f..7a586d8 100644 --- a/netfetch/src/store.rs +++ b/netfetch/src/store.rs @@ -282,6 +282,7 @@ struct InsParCom { ts_msp: u64, ts_lsp: u64, pulse: u64, + ttl: u32, } async fn insert_scalar_gen( @@ -299,6 +300,7 @@ where par.ts_lsp as i64, par.pulse as i64, val, + par.ttl as i32, ); let y = data_store.scy.execute(qu, params).await; match y { @@ -334,9 +336,16 @@ where Ok(()) } -pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaConnStats) -> Result<(), Error> { +pub async fn insert_item( + item: InsertItem, + ttl_msp: u32, + ttl_0d: u32, + ttl_1d: u32, + data_store: &DataStore, + stats: &CaConnStats, +) -> Result<(), Error> { if item.msp_bump { - let params = (item.series.id() as i64, item.ts_msp as i64); + let params = (item.series.id() as i64, item.ts_msp as i64, ttl_msp as i32); data_store.scy.execute(&data_store.qu_insert_ts_msp, params).await?; stats.inserts_msp_inc(); } @@ -354,15 +363,16 @@ pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaCon .await?; stats.inserts_msp_grid_inc(); } - let par = InsParCom { - series: item.series.id(), - ts_msp: item.ts_msp, - ts_lsp: item.ts_lsp, - pulse: item.pulse, - }; use CaDataValue::*; match item.val { Scalar(val) => { + let par = InsParCom { + series: item.series.id(), + ts_msp: item.ts_msp, + ts_lsp: item.ts_lsp, + pulse: item.pulse, + ttl: ttl_0d, + }; use CaDataScalarValue::*; match val { I8(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i8, &data_store).await?, @@ -375,6 +385,13 @@ pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaCon } } Array(val) => { + let par = InsParCom { + series: item.series.id(), + ts_msp: item.ts_msp, + ts_lsp: item.ts_lsp, + pulse: item.pulse, + ttl: ttl_1d, + }; use CaDataArrayValue::*; match val { I8(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i8, &data_store).await?,