diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 6c2b9e4..392c10e 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -7,8 +7,7 @@ use crate::iteminsertqueue::InsertFut; use crate::iteminsertqueue::InsertItem; use crate::iteminsertqueue::MspItem; use crate::iteminsertqueue::QueryItem; -use crate::iteminsertqueue::TimeBinSimpleF32; -use crate::iteminsertqueue::TimeBinSimpleF32V01; +use crate::iteminsertqueue::TimeBinSimpleF32V02; use crate::store::DataStore; use async_channel::Receiver; use atomic::AtomicU64; @@ -266,9 +265,8 @@ where let futs = match item { QueryItem::Insert(item) => prepare_query_insert_futs(item, &data_store, &stats, tsnow), QueryItem::Msp(item) => prepare_msp_insert_futs(item, &data_store, &stats, tsnow), - QueryItem::TimeBinSimpleF32(item) => prepare_timebin_insert_futs(item, &data_store, &stats, tsnow), - QueryItem::TimeBinSimpleF32V01(item) => { - prepare_timebin_v01_insert_futs(item, &data_store, &stats, tsnow) + QueryItem::TimeBinSimpleF32V02(item) => { + prepare_timebin_v02_insert_futs(item, &data_store, &stats, tsnow) } QueryItem::Accounting(item) => prepare_accounting_insert_futs(item, &data_store, &stats, tsnow), QueryItem::AccountingRecv(item) => { @@ -298,11 +296,8 @@ fn inspect_items( QueryItem::Msp(item) => { trace_item_execute!("execute {worker_name} Msp {}", item.string_short()); } - QueryItem::TimeBinSimpleF32(_) => { - trace_item_execute!("execute {worker_name} TimeBinSimpleF32"); - } - QueryItem::TimeBinSimpleF32V01(_) => { - trace_item_execute!("execute {worker_name} TimeBinSimpleF32V01"); + QueryItem::TimeBinSimpleF32V02(_) => { + trace_item_execute!("execute {worker_name} TimeBinSimpleF32V02"); } QueryItem::Accounting(_) => { trace_item_execute!("execute {worker_name} Accounting {item:?}"); @@ -357,22 +352,22 @@ fn prepare_query_insert_futs( futs } -fn prepare_timebin_insert_futs( - item: TimeBinSimpleF32, +fn prepare_timebin_v02_insert_futs( + item: TimeBinSimpleF32V02, data_store: &Arc, stats: &Arc, tsnow: Instant, ) -> SmallVec<[InsertFut; 4]> { - trace!("have time bin patch to insert: {item:?}"); let params = ( item.series.id() as i64, - item.bin_len_ms, - item.ts_msp.to_i64(), + item.binlen, + item.msp, item.off, - item.count, + item.cnt, item.min, item.max, item.avg, + item.dev, ); // TODO would be better to count inserts only on completed insert stats.inserted_binned().inc(); @@ -399,48 +394,6 @@ fn prepare_timebin_insert_futs( futs } -fn prepare_timebin_v01_insert_futs( - item: TimeBinSimpleF32V01, - data_store: &Arc, - stats: &Arc, - tsnow: Instant, -) -> SmallVec<[InsertFut; 4]> { - trace!("have time bin patch to insert: {item:?}"); - let params = ( - item.series.id() as i64, - item.bin_len_ms, - item.ts_msp.to_i64(), - item.off, - item.count, - item.min, - item.max, - item.avg, - ); - // TODO would be better to count inserts only on completed insert - stats.inserted_binned().inc(); - let fut = InsertFut::new( - data_store.scy.clone(), - data_store.qu_insert_binned_scalar_f32_v01.clone(), - params, - tsnow, - stats.clone(), - ); - let futs = smallvec![fut]; - - // TODO match on the query result: - // match qres { - // Ok(_) => { - // backoff = backoff_0; - // } - // Err(e) => { - // stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e)); - // back_off_sleep(&mut backoff).await; - // } - // } - - futs -} - fn prepare_accounting_insert_futs( item: Accounting, data_store: &Arc, diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index e4fb115..2bdb3b8 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -538,27 +538,16 @@ impl InsertItem { } #[derive(Debug, Clone)] -pub struct TimeBinSimpleF32 { +pub struct TimeBinSimpleF32V02 { pub series: SeriesId, - pub bin_len_ms: i32, - pub ts_msp: TsMs, + pub binlen: i32, + pub msp: i64, pub off: i32, - pub count: i64, - pub min: f32, - pub max: f32, - pub avg: f32, -} - -#[derive(Debug, Clone)] -pub struct TimeBinSimpleF32V01 { - pub series: SeriesId, - pub bin_len_ms: i32, - pub ts_msp: TsMs, - pub off: i32, - pub count: i64, + pub cnt: i64, pub min: f32, pub max: f32, pub avg: f32, + pub dev: f32, } // Needs to be Clone to send it to multiple retention times if required. @@ -566,8 +555,7 @@ pub struct TimeBinSimpleF32V01 { pub enum QueryItem { Insert(InsertItem), Msp(MspItem), - TimeBinSimpleF32(TimeBinSimpleF32), - TimeBinSimpleF32V01(TimeBinSimpleF32V01), + TimeBinSimpleF32V02(TimeBinSimpleF32V02), Accounting(Accounting), AccountingRecv(AccountingRecv), } diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index 7e30f3f..ec4ddf2 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -657,39 +657,19 @@ pub async fn migrate_scylla_data_schema( let tab = GenTwcsTab::new( ks, rett.table_prefix(), - "binned_scalar_f32", + "binned_scalar_f32_v02", &[ ("series", "bigint"), - ("bin_len_ms", "int"), - ("ts_msp", "bigint"), + ("binlen", "int"), + ("msp", "bigint"), ("off", "int"), - ("count", "bigint"), + ("cnt", "bigint"), ("min", "float"), ("max", "float"), ("avg", "float"), + ("dev", "float"), ], - ["series", "bin_len_ms", "ts_msp"], - ["off"], - rett.ttl_binned(), - ); - tab.setup(do_change, scy).await?; - } - { - let tab = GenTwcsTab::new( - ks, - rett.table_prefix(), - "binned_scalar_f32_v01", - &[ - ("series", "bigint"), - ("bin_len_ms", "int"), - ("ts_msp", "bigint"), - ("off", "int"), - ("count", "bigint"), - ("min", "float"), - ("max", "float"), - ("avg", "float"), - ], - ["series", "bin_len_ms", "ts_msp"], + ["series", "binlen", "msp"], ["off"], rett.ttl_binned(), ); diff --git a/scywr/src/store.rs b/scywr/src/store.rs index a5b5acb..086d1b4 100644 --- a/scywr/src/store.rs +++ b/scywr/src/store.rs @@ -45,7 +45,6 @@ pub struct DataStore { pub qu_insert_array_f32: Arc, pub qu_insert_array_f64: Arc, pub qu_insert_array_bool: Arc, - pub qu_insert_binned_scalar_f32_v01: Arc, pub qu_insert_binned_scalar_f32_v02: Arc, pub qu_account_00: Arc, pub qu_account_recv_00: Arc, @@ -150,18 +149,10 @@ impl DataStore { let qu_insert_array_f64 = prep_qu_ins_b!("events_array_f64", rett, scy); let qu_insert_array_bool = prep_qu_ins_b!("events_array_bool", rett, scy); - let qu_insert_binned_scalar_f32_v01 = prep_qu_ins_c!( - "binned_scalar_f32_v01", - "series, bin_len_ms, ts_msp, off, count, min, max, avg", - "?, ?, ?, ?, ?, ?, ?, ?", - rett, - scy - ); - let qu_insert_binned_scalar_f32_v02 = prep_qu_ins_c!( - "binned_scalar_f32", - "series, bin_len_ms, ts_msp, off, count, min, max, avg", - "?, ?, ?, ?, ?, ?, ?, ?", + "binned_scalar_f32_v02", + "series, binlen, msp, off, count, min, max, avg, dev", + "?, ?, ?, ?, ?, ?, ?, ?, ?", rett, scy ); @@ -219,7 +210,6 @@ impl DataStore { qu_insert_array_f32, qu_insert_array_f64, qu_insert_array_bool, - qu_insert_binned_scalar_f32_v01, qu_insert_binned_scalar_f32_v02, qu_account_00, qu_account_recv_00, diff --git a/serieswriter/src/binwritergrid.rs b/serieswriter/src/binwritergrid.rs index 6f7d9bf..a490b5a 100644 --- a/serieswriter/src/binwritergrid.rs +++ b/serieswriter/src/binwritergrid.rs @@ -10,11 +10,10 @@ use netpod::BinnedRange; use netpod::DtMs; use netpod::ScalarType; use netpod::Shape; -use netpod::TsMs; use netpod::TsNano; use scywr::insertqueues::InsertDeques; use scywr::iteminsertqueue::QueryItem; -use scywr::iteminsertqueue::TimeBinSimpleF32V01; +use scywr::iteminsertqueue::TimeBinSimpleF32V02; use series::ChannelStatusSeriesId; use series::SeriesId; @@ -114,17 +113,21 @@ impl BinWriterGrid { // TODO return Err(Error::UnsupportedBinGrid(bin_len)); }; - let ts_msp = TsMs::from_ms_u64(ts1.ms() / div.ms() * div.ms()); - let off = (ts1.ms() - ts_msp.ms()) / bin_len.ms(); - let item = QueryItem::TimeBinSimpleF32V01(TimeBinSimpleF32V01 { + if div.ns() % bin_len.ns() != 0 { + panic!("divisor not a multiple {:?} {:?}", bin_len, div); + } + let msp = ts1.ms() / div.ms(); + let off = (ts1.ms() - div.ms() * msp) / bin_len.ms(); + let item = QueryItem::TimeBinSimpleF32V02(TimeBinSimpleF32V02 { series: self.sid.clone(), - bin_len_ms: bin_len.ms() as i32, - ts_msp, + binlen: bin_len.ms() as i32, + msp: msp as i64, off: off as i32, - count: cnt as i64, + cnt: cnt as i64, min, max, avg, + dev: f32::NAN, }); match &self.rt { RetentionTime::Short => {