Restructure binned schema

This commit is contained in:
Dominik Werder
2024-11-08 13:54:34 +01:00
parent b163369b1c
commit ae08472a90
5 changed files with 37 additions and 123 deletions

View File

@@ -7,8 +7,7 @@ use crate::iteminsertqueue::InsertFut;
use crate::iteminsertqueue::InsertItem;
use crate::iteminsertqueue::MspItem;
use crate::iteminsertqueue::QueryItem;
use crate::iteminsertqueue::TimeBinSimpleF32;
use crate::iteminsertqueue::TimeBinSimpleF32V01;
use crate::iteminsertqueue::TimeBinSimpleF32V02;
use crate::store::DataStore;
use async_channel::Receiver;
use atomic::AtomicU64;
@@ -266,9 +265,8 @@ where
let futs = match item {
QueryItem::Insert(item) => prepare_query_insert_futs(item, &data_store, &stats, tsnow),
QueryItem::Msp(item) => prepare_msp_insert_futs(item, &data_store, &stats, tsnow),
QueryItem::TimeBinSimpleF32(item) => prepare_timebin_insert_futs(item, &data_store, &stats, tsnow),
QueryItem::TimeBinSimpleF32V01(item) => {
prepare_timebin_v01_insert_futs(item, &data_store, &stats, tsnow)
QueryItem::TimeBinSimpleF32V02(item) => {
prepare_timebin_v02_insert_futs(item, &data_store, &stats, tsnow)
}
QueryItem::Accounting(item) => prepare_accounting_insert_futs(item, &data_store, &stats, tsnow),
QueryItem::AccountingRecv(item) => {
@@ -298,11 +296,8 @@ fn inspect_items(
QueryItem::Msp(item) => {
trace_item_execute!("execute {worker_name} Msp {}", item.string_short());
}
QueryItem::TimeBinSimpleF32(_) => {
trace_item_execute!("execute {worker_name} TimeBinSimpleF32");
}
QueryItem::TimeBinSimpleF32V01(_) => {
trace_item_execute!("execute {worker_name} TimeBinSimpleF32V01");
QueryItem::TimeBinSimpleF32V02(_) => {
trace_item_execute!("execute {worker_name} TimeBinSimpleF32V02");
}
QueryItem::Accounting(_) => {
trace_item_execute!("execute {worker_name} Accounting {item:?}");
@@ -357,22 +352,22 @@ fn prepare_query_insert_futs(
futs
}
fn prepare_timebin_insert_futs(
item: TimeBinSimpleF32,
fn prepare_timebin_v02_insert_futs(
item: TimeBinSimpleF32V02,
data_store: &Arc<DataStore>,
stats: &Arc<InsertWorkerStats>,
tsnow: Instant,
) -> SmallVec<[InsertFut; 4]> {
trace!("have time bin patch to insert: {item:?}");
let params = (
item.series.id() as i64,
item.bin_len_ms,
item.ts_msp.to_i64(),
item.binlen,
item.msp,
item.off,
item.count,
item.cnt,
item.min,
item.max,
item.avg,
item.dev,
);
// TODO would be better to count inserts only on completed insert
stats.inserted_binned().inc();
@@ -399,48 +394,6 @@ fn prepare_timebin_insert_futs(
futs
}
fn prepare_timebin_v01_insert_futs(
item: TimeBinSimpleF32V01,
data_store: &Arc<DataStore>,
stats: &Arc<InsertWorkerStats>,
tsnow: Instant,
) -> SmallVec<[InsertFut; 4]> {
trace!("have time bin patch to insert: {item:?}");
let params = (
item.series.id() as i64,
item.bin_len_ms,
item.ts_msp.to_i64(),
item.off,
item.count,
item.min,
item.max,
item.avg,
);
// TODO would be better to count inserts only on completed insert
stats.inserted_binned().inc();
let fut = InsertFut::new(
data_store.scy.clone(),
data_store.qu_insert_binned_scalar_f32_v01.clone(),
params,
tsnow,
stats.clone(),
);
let futs = smallvec![fut];
// TODO match on the query result:
// match qres {
// Ok(_) => {
// backoff = backoff_0;
// }
// Err(e) => {
// stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
// back_off_sleep(&mut backoff).await;
// }
// }
futs
}
fn prepare_accounting_insert_futs(
item: Accounting,
data_store: &Arc<DataStore>,

View File

@@ -538,27 +538,16 @@ impl InsertItem {
}
#[derive(Debug, Clone)]
pub struct TimeBinSimpleF32 {
pub struct TimeBinSimpleF32V02 {
pub series: SeriesId,
pub bin_len_ms: i32,
pub ts_msp: TsMs,
pub binlen: i32,
pub msp: i64,
pub off: i32,
pub count: i64,
pub min: f32,
pub max: f32,
pub avg: f32,
}
#[derive(Debug, Clone)]
pub struct TimeBinSimpleF32V01 {
pub series: SeriesId,
pub bin_len_ms: i32,
pub ts_msp: TsMs,
pub off: i32,
pub count: i64,
pub cnt: i64,
pub min: f32,
pub max: f32,
pub avg: f32,
pub dev: f32,
}
// Needs to be Clone to send it to multiple retention times if required.
@@ -566,8 +555,7 @@ pub struct TimeBinSimpleF32V01 {
pub enum QueryItem {
Insert(InsertItem),
Msp(MspItem),
TimeBinSimpleF32(TimeBinSimpleF32),
TimeBinSimpleF32V01(TimeBinSimpleF32V01),
TimeBinSimpleF32V02(TimeBinSimpleF32V02),
Accounting(Accounting),
AccountingRecv(AccountingRecv),
}

View File

@@ -657,39 +657,19 @@ pub async fn migrate_scylla_data_schema(
let tab = GenTwcsTab::new(
ks,
rett.table_prefix(),
"binned_scalar_f32",
"binned_scalar_f32_v02",
&[
("series", "bigint"),
("bin_len_ms", "int"),
("ts_msp", "bigint"),
("binlen", "int"),
("msp", "bigint"),
("off", "int"),
("count", "bigint"),
("cnt", "bigint"),
("min", "float"),
("max", "float"),
("avg", "float"),
("dev", "float"),
],
["series", "bin_len_ms", "ts_msp"],
["off"],
rett.ttl_binned(),
);
tab.setup(do_change, scy).await?;
}
{
let tab = GenTwcsTab::new(
ks,
rett.table_prefix(),
"binned_scalar_f32_v01",
&[
("series", "bigint"),
("bin_len_ms", "int"),
("ts_msp", "bigint"),
("off", "int"),
("count", "bigint"),
("min", "float"),
("max", "float"),
("avg", "float"),
],
["series", "bin_len_ms", "ts_msp"],
["series", "binlen", "msp"],
["off"],
rett.ttl_binned(),
);

View File

@@ -45,7 +45,6 @@ pub struct DataStore {
pub qu_insert_array_f32: Arc<PreparedStatement>,
pub qu_insert_array_f64: Arc<PreparedStatement>,
pub qu_insert_array_bool: Arc<PreparedStatement>,
pub qu_insert_binned_scalar_f32_v01: Arc<PreparedStatement>,
pub qu_insert_binned_scalar_f32_v02: Arc<PreparedStatement>,
pub qu_account_00: Arc<PreparedStatement>,
pub qu_account_recv_00: Arc<PreparedStatement>,
@@ -150,18 +149,10 @@ impl DataStore {
let qu_insert_array_f64 = prep_qu_ins_b!("events_array_f64", rett, scy);
let qu_insert_array_bool = prep_qu_ins_b!("events_array_bool", rett, scy);
let qu_insert_binned_scalar_f32_v01 = prep_qu_ins_c!(
"binned_scalar_f32_v01",
"series, bin_len_ms, ts_msp, off, count, min, max, avg",
"?, ?, ?, ?, ?, ?, ?, ?",
rett,
scy
);
let qu_insert_binned_scalar_f32_v02 = prep_qu_ins_c!(
"binned_scalar_f32",
"series, bin_len_ms, ts_msp, off, count, min, max, avg",
"?, ?, ?, ?, ?, ?, ?, ?",
"binned_scalar_f32_v02",
"series, binlen, msp, off, count, min, max, avg, dev",
"?, ?, ?, ?, ?, ?, ?, ?, ?",
rett,
scy
);
@@ -219,7 +210,6 @@ impl DataStore {
qu_insert_array_f32,
qu_insert_array_f64,
qu_insert_array_bool,
qu_insert_binned_scalar_f32_v01,
qu_insert_binned_scalar_f32_v02,
qu_account_00,
qu_account_recv_00,

View File

@@ -10,11 +10,10 @@ use netpod::BinnedRange;
use netpod::DtMs;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsMs;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::TimeBinSimpleF32V01;
use scywr::iteminsertqueue::TimeBinSimpleF32V02;
use series::ChannelStatusSeriesId;
use series::SeriesId;
@@ -114,17 +113,21 @@ impl BinWriterGrid {
// TODO
return Err(Error::UnsupportedBinGrid(bin_len));
};
let ts_msp = TsMs::from_ms_u64(ts1.ms() / div.ms() * div.ms());
let off = (ts1.ms() - ts_msp.ms()) / bin_len.ms();
let item = QueryItem::TimeBinSimpleF32V01(TimeBinSimpleF32V01 {
if div.ns() % bin_len.ns() != 0 {
panic!("divisor not a multiple {:?} {:?}", bin_len, div);
}
let msp = ts1.ms() / div.ms();
let off = (ts1.ms() - div.ms() * msp) / bin_len.ms();
let item = QueryItem::TimeBinSimpleF32V02(TimeBinSimpleF32V02 {
series: self.sid.clone(),
bin_len_ms: bin_len.ms() as i32,
ts_msp,
binlen: bin_len.ms() as i32,
msp: msp as i64,
off: off as i32,
count: cnt as i64,
cnt: cnt as i64,
min,
max,
avg,
dev: f32::NAN,
});
match &self.rt {
RetentionTime::Short => {