From b3cd133bbdc11e991397b91be150c197839cdd8f Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 28 Feb 2025 09:15:23 +0100 Subject: [PATCH] Store both divs --- scywr/src/insertworker.rs | 26 +++++++++++++++++--------- scywr/src/iteminsertqueue.rs | 7 ++++--- scywr/src/schema.rs | 19 +++++++++++++------ scywr/src/store.rs | 12 ++++++------ serieswriter/src/binwriter.rs | 11 ++++++----- 5 files changed, 46 insertions(+), 29 deletions(-) diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 603d09e..a913724 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -1,7 +1,7 @@ use crate::config::ScyllaIngestConfig; use crate::iteminsertqueue::Accounting; use crate::iteminsertqueue::AccountingRecv; -use crate::iteminsertqueue::BinWriteIndexV00; +use crate::iteminsertqueue::BinWriteIndexV01; use crate::iteminsertqueue::InsertFut; use crate::iteminsertqueue::InsertItem; use crate::iteminsertqueue::MspItem; @@ -273,11 +273,11 @@ where prepare_timebin_v02_insert_futs(item, &data_store, &stats, tsnow) } } - QueryItem::BinWriteIndexV00(item) => { + QueryItem::BinWriteIndexV01(item) => { if ignore_writes { SmallVec::new() } else { - prepare_bin_write_index_v00_insert_futs(item, &data_store, &stats, tsnow) + prepare_bin_write_index_v01_insert_futs(item, &data_store, &stats, tsnow) } } QueryItem::Accounting(item) => { @@ -321,8 +321,8 @@ fn inspect_items( QueryItem::TimeBinSimpleF32V02(_) => { trace_item_execute!("execute {worker_name} TimeBinSimpleF32V02"); } - QueryItem::BinWriteIndexV00(_) => { - trace_item_execute!("execute {worker_name} BinWriteIndexV00"); + QueryItem::BinWriteIndexV01(_) => { + trace_item_execute!("execute {worker_name} BinWriteIndexV01"); } QueryItem::Accounting(_) => { trace_item_execute!("execute {worker_name} Accounting {item:?}"); @@ -420,18 +420,26 @@ fn prepare_timebin_v02_insert_futs( futs } -fn prepare_bin_write_index_v00_insert_futs( - item: BinWriteIndexV00, +fn prepare_bin_write_index_v01_insert_futs( + item: BinWriteIndexV01, data_store: &Arc, stats: &Arc, tsnow: Instant, ) -> SmallVec<[InsertFut; 4]> { - let params = (item.series, item.div, item.quo, item.rem, item.rt, item.binlen); + let params = ( + item.series, + item.dv1, + item.dv2, + item.quo, + item.rem, + item.rt, + item.binlen, + ); // TODO would be better to count inserts only on completed insert stats.inserted_binned().inc(); let fut = InsertFut::new( data_store.scy.clone(), - data_store.qu_insert_bin_write_index_v00.clone(), + data_store.qu_insert_bin_write_index_v01.clone(), params, tsnow, stats.clone(), diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index c7779e4..d74fa3e 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -551,9 +551,10 @@ pub struct TimeBinSimpleF32V02 { } #[derive(Debug, Clone)] -pub struct BinWriteIndexV00 { +pub struct BinWriteIndexV01 { pub series: i64, - pub div: i32, + pub dv1: i32, + pub dv2: i32, pub quo: i64, pub rem: i32, pub rt: i32, @@ -566,7 +567,7 @@ pub enum QueryItem { Insert(InsertItem), Msp(MspItem), TimeBinSimpleF32V02(TimeBinSimpleF32V02), - BinWriteIndexV00(BinWriteIndexV00), + BinWriteIndexV01(BinWriteIndexV01), Accounting(Accounting), AccountingRecv(AccountingRecv), } diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index d3e47f4..590142d 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -233,7 +233,7 @@ impl GenTwcsTab { format!("({}, {})", pkeys, self.cluster_keys.join(", ")) }; let mut s = String::new(); - write!(s, "create table {}", self.name()).unwrap(); + write!(s, "create table {}.{}", self.keyspace(), self.name()).unwrap(); let mut cols: Vec<_> = self .col_names .iter() @@ -245,7 +245,7 @@ impl GenTwcsTab { write!(s, " ({})", cols).unwrap(); write!( s, - " with default_time_to_live = {}, gc_grace_seconds = {}", + " with default_time_to_live = {} and gc_grace_seconds = {}", self.default_time_to_live.as_secs(), self.gc_grace.as_secs() ) @@ -636,16 +636,17 @@ async fn migrate_scylla_data_schema( let tab = GenTwcsTab::new( ks, rett.table_prefix(), - "bin_write_index_v00", + "bin_write_index_v01", &[ ("series", "bigint"), - ("div", "int"), + ("dv1", "int"), + ("dv2", "int"), ("quo", "bigint"), ("rem", "int"), ("rt", "int"), ("binlen", "int"), ], - ["series", "div", "quo"], + ["series", "dv1", "dv2", "quo"], ["rem", "rt", "binlen"], rett.ttl_binned(), ); @@ -730,7 +731,13 @@ pub async fn migrate_scylla_data_schema_all_rt( let scy2 = create_session_no_ks(scyconf).await?; let scy = &scy2; for cql in chs.todo.iter() { - scy.query_unpaged(cql.as_str(), ()).await?; + match scy.query_unpaged(cql.as_str(), ()).await { + Ok(_) => {} + Err(e) => { + info!("cql error {}", cql); + return Err(e.into()); + } + } } } } diff --git a/scywr/src/store.rs b/scywr/src/store.rs index f158277..6acfb6b 100644 --- a/scywr/src/store.rs +++ b/scywr/src/store.rs @@ -45,7 +45,7 @@ pub struct DataStore { pub qu_insert_array_f64: Arc, pub qu_insert_array_bool: Arc, pub qu_insert_binned_scalar_f32_v02: Arc, - pub qu_insert_bin_write_index_v00: Arc, + pub qu_insert_bin_write_index_v01: Arc, pub qu_account_00: Arc, pub qu_account_recv_00: Arc, pub qu_dummy: Arc, @@ -157,10 +157,10 @@ impl DataStore { scy ); - let qu_insert_bin_write_index_v00 = prep_qu_ins_c!( - "bin_write_index_v00", - "series, div, quo, rem, rt, binlen", - "?, ?, ?, ?, ?, ?", + let qu_insert_bin_write_index_v01 = prep_qu_ins_c!( + "bin_write_index_v01", + "series, dv1, dv2, quo, rem, rt, binlen", + "?, ?, ?, ?, ?, ?, ?", rett, scy ); @@ -219,7 +219,7 @@ impl DataStore { qu_insert_array_f64, qu_insert_array_bool, qu_insert_binned_scalar_f32_v02, - qu_insert_bin_write_index_v00, + qu_insert_bin_write_index_v01, qu_account_00, qu_account_recv_00, qu_dummy, diff --git a/serieswriter/src/binwriter.rs b/serieswriter/src/binwriter.rs index db03ac0..6595a7d 100644 --- a/serieswriter/src/binwriter.rs +++ b/serieswriter/src/binwriter.rs @@ -16,7 +16,7 @@ use netpod::Shape; use netpod::TsNano; use netpod::ttl::RetentionTime; use scywr::insertqueues::InsertDeques; -use scywr::iteminsertqueue::BinWriteIndexV00; +use scywr::iteminsertqueue::BinWriteIndexV01; use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::TimeBinSimpleF32V02; use series::ChannelStatusSeriesId; @@ -413,18 +413,19 @@ impl BinWriter { } } let div = PrebinnedPartitioning::Day1; - let (quo, rem) = div.quo_rem(ts1.to_ts_ms()); + let (quo, rem, dv1, dv2) = div.quo_rem(ts1.to_ts_ms()); if index_written.should_write(div.clone(), quo, rem) { index_written.mark_written(div.clone(), quo, rem); - let item = BinWriteIndexV00 { + let item = BinWriteIndexV01 { series: series.id() as i64, - div: div.msp_div().ms() as i32, + dv1: dv1 as i32, + dv2: dv2 as i32, quo: quo as i64, rem: rem as i32, rt: rt.index_db_i32(), binlen: pbp.bin_len().ms() as i32, }; - let item = QueryItem::BinWriteIndexV00(item); + let item = QueryItem::BinWriteIndexV01(item); match rt { RetentionTime::Short => { iqdqs.st_rf3_qu.push_back(item);