Use explicit TTL

This commit is contained in:
Dominik Werder
2022-11-07 09:17:10 +01:00
parent e5ad477c57
commit 2d2721389d
3 changed files with 38 additions and 16 deletions

View File

@@ -75,7 +75,7 @@ impl DataStore {
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let scy = Arc::new(scy);
let q = scy
.prepare("insert into ts_msp (series, ts_msp) values (?, ?)")
.prepare("insert into ts_msp (series, ts_msp) values (?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_ts_msp = Arc::new(q);
@@ -86,37 +86,39 @@ impl DataStore {
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_series_by_ts_msp = Arc::new(q);
// scalar:
let q = scy
.prepare("insert into events_scalar_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.prepare("insert into events_scalar_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_scalar_i8 = Arc::new(q);
let q = scy
.prepare("insert into events_scalar_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.prepare("insert into events_scalar_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_scalar_i16 = Arc::new(q);
let q = scy
.prepare("insert into events_scalar_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.prepare("insert into events_scalar_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_scalar_i32 = Arc::new(q);
let q = scy
.prepare("insert into events_scalar_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.prepare("insert into events_scalar_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_scalar_f32 = Arc::new(q);
let q = scy
.prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.prepare("insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_scalar_f64 = Arc::new(q);
let q = scy
.prepare("insert into events_scalar_string (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.prepare("insert into events_scalar_string (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_scalar_string = Arc::new(q);
// array
let q = scy
.prepare("insert into events_array_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")

View File

@@ -119,6 +119,9 @@ pub async fn spawn_scylla_insert_workers(
insert_item_queue.receiver()
};
let ingest_commons = ingest_commons.clone();
let ttl_msp = 60 * 60 * 24 * 4;
let ttl_0d = 60 * 60 * 24 * 2;
let ttl_1d = 60 * 60 * 12;
let fut = async move {
let backoff_0 = Duration::from_millis(10);
let mut backoff = backoff_0.clone();
@@ -158,7 +161,7 @@ pub async fn spawn_scylla_insert_workers(
QueryItem::Insert(item) => {
let insert_frac = ingest_commons.insert_frac.load(Ordering::Acquire);
if i1 % 1000 < insert_frac {
match crate::store::insert_item(item, &data_store, &stats).await {
match crate::store::insert_item(item, ttl_msp, ttl_0d, ttl_1d, &data_store, &stats).await {
Ok(_) => {
stats.store_worker_insert_done_inc();
backoff = backoff_0;

View File

@@ -282,6 +282,7 @@ struct InsParCom {
ts_msp: u64,
ts_lsp: u64,
pulse: u64,
ttl: u32,
}
async fn insert_scalar_gen<ST>(
@@ -299,6 +300,7 @@ where
par.ts_lsp as i64,
par.pulse as i64,
val,
par.ttl as i32,
);
let y = data_store.scy.execute(qu, params).await;
match y {
@@ -334,9 +336,16 @@ where
Ok(())
}
pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaConnStats) -> Result<(), Error> {
pub async fn insert_item(
item: InsertItem,
ttl_msp: u32,
ttl_0d: u32,
ttl_1d: u32,
data_store: &DataStore,
stats: &CaConnStats,
) -> Result<(), Error> {
if item.msp_bump {
let params = (item.series.id() as i64, item.ts_msp as i64);
let params = (item.series.id() as i64, item.ts_msp as i64, ttl_msp as i32);
data_store.scy.execute(&data_store.qu_insert_ts_msp, params).await?;
stats.inserts_msp_inc();
}
@@ -354,15 +363,16 @@ pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaCon
.await?;
stats.inserts_msp_grid_inc();
}
let par = InsParCom {
series: item.series.id(),
ts_msp: item.ts_msp,
ts_lsp: item.ts_lsp,
pulse: item.pulse,
};
use CaDataValue::*;
match item.val {
Scalar(val) => {
let par = InsParCom {
series: item.series.id(),
ts_msp: item.ts_msp,
ts_lsp: item.ts_lsp,
pulse: item.pulse,
ttl: ttl_0d,
};
use CaDataScalarValue::*;
match val {
I8(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i8, &data_store).await?,
@@ -375,6 +385,13 @@ pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaCon
}
}
Array(val) => {
let par = InsParCom {
series: item.series.id(),
ts_msp: item.ts_msp,
ts_lsp: item.ts_lsp,
pulse: item.pulse,
ttl: ttl_1d,
};
use CaDataArrayValue::*;
match val {
I8(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i8, &data_store).await?,