Write index to db

This commit is contained in:
Dominik Werder
2025-02-27 16:02:35 +01:00
parent e3074411dc
commit 424ed96ab3
4 changed files with 155 additions and 44 deletions

View File

@@ -1,13 +1,14 @@
use crate::config::ScyllaIngestConfig;
use crate::iteminsertqueue::insert_item_fut;
use crate::iteminsertqueue::insert_msp_fut;
use crate::iteminsertqueue::Accounting;
use crate::iteminsertqueue::AccountingRecv;
use crate::iteminsertqueue::BinWriteIndexV00;
use crate::iteminsertqueue::InsertFut;
use crate::iteminsertqueue::InsertItem;
use crate::iteminsertqueue::MspItem;
use crate::iteminsertqueue::QueryItem;
use crate::iteminsertqueue::TimeBinSimpleF32V02;
use crate::iteminsertqueue::insert_item_fut;
use crate::iteminsertqueue::insert_msp_fut;
use crate::store::DataStore;
use async_channel::Receiver;
use atomic::AtomicU64;
@@ -15,12 +16,12 @@ use futures_util::Stream;
use futures_util::StreamExt;
use log::*;
use netpod::ttl::RetentionTime;
use smallvec::smallvec;
use smallvec::SmallVec;
use smallvec::smallvec;
use stats::InsertWorkerStats;
use std::collections::VecDeque;
use std::sync::atomic;
use std::sync::Arc;
use std::sync::atomic;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
@@ -42,13 +43,7 @@ macro_rules! trace_item_execute {
};
}
macro_rules! debug_setup {
($($arg:tt)*) => {
if false {
debug!($($arg)*);
}
};
}
macro_rules! debug_setup { ($($arg:expr),*) => ( if false { debug!($($arg),*); } ); }
autoerr::create_error_v1!(
name(Error, "ScyllaInsertWorker"),
@@ -278,6 +273,13 @@ where
prepare_timebin_v02_insert_futs(item, &data_store, &stats, tsnow)
}
}
QueryItem::BinWriteIndexV00(item) => {
if ignore_writes {
SmallVec::new()
} else {
prepare_bin_write_index_v00_insert_futs(item, &data_store, &stats, tsnow)
}
}
QueryItem::Accounting(item) => {
if ignore_writes {
SmallVec::new()
@@ -319,6 +321,9 @@ fn inspect_items(
QueryItem::TimeBinSimpleF32V02(_) => {
trace_item_execute!("execute {worker_name} TimeBinSimpleF32V02");
}
QueryItem::BinWriteIndexV00(_) => {
trace_item_execute!("execute {worker_name} BinWriteIndexV00");
}
QueryItem::Accounting(_) => {
trace_item_execute!("execute {worker_name} Accounting {item:?}");
}
@@ -415,6 +420,38 @@ fn prepare_timebin_v02_insert_futs(
futs
}
fn prepare_bin_write_index_v00_insert_futs(
item: BinWriteIndexV00,
data_store: &Arc<DataStore>,
stats: &Arc<InsertWorkerStats>,
tsnow: Instant,
) -> SmallVec<[InsertFut; 4]> {
let params = (item.series, item.div, item.quo, item.rem, item.rt, item.binlen);
// TODO would be better to count inserts only on completed insert
stats.inserted_binned().inc();
let fut = InsertFut::new(
data_store.scy.clone(),
data_store.qu_insert_bin_write_index_v00.clone(),
params,
tsnow,
stats.clone(),
);
let futs = smallvec![fut];
// TODO match on the query result:
// match qres {
// Ok(_) => {
// backoff = backoff_0;
// }
// Err(e) => {
// stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
// back_off_sleep(&mut backoff).await;
// }
// }
futs
}
fn prepare_accounting_insert_futs(
item: Accounting,
data_store: &Arc<DataStore>,

View File

@@ -5,12 +5,13 @@ use crate::store::DataStore;
use bytes::BufMut;
use futures_util::Future;
use futures_util::FutureExt;
use netpod::channelstatus::ChannelStatus;
use netpod::channelstatus::ChannelStatusClosedReason;
use netpod::DtNano;
use netpod::Shape;
use netpod::TsMs;
use netpod::TsNano;
use netpod::channelstatus::ChannelStatus;
use netpod::channelstatus::ChannelStatusClosedReason;
use scylla::QueryResult;
use scylla::frame::value::Value;
use scylla::frame::value::ValueList;
use scylla::prepared_statement::PreparedStatement;
@@ -18,9 +19,9 @@ use scylla::serialize::row::SerializeRow;
use scylla::serialize::value::SerializeValue;
use scylla::transport::errors::DbError;
use scylla::transport::errors::QueryError;
use scylla::QueryResult;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use series::msp::PrebinnedPartitioning;
use stats::InsertWorkerStats;
use std::net::SocketAddrV4;
use std::pin::Pin;
@@ -549,12 +550,23 @@ pub struct TimeBinSimpleF32V02 {
pub lst: f32,
}
#[derive(Debug, Clone)]
pub struct BinWriteIndexV00 {
pub series: i64,
pub div: i32,
pub quo: i64,
pub rem: i32,
pub rt: i32,
pub binlen: i32,
}
// Needs to be Clone to send it to multiple retention times if required.
#[derive(Debug, Clone)]
pub enum QueryItem {
Insert(InsertItem),
Msp(MspItem),
TimeBinSimpleF32V02(TimeBinSimpleF32V02),
BinWriteIndexV00(BinWriteIndexV00),
Accounting(Accounting),
AccountingRecv(AccountingRecv),
}

View File

@@ -1,10 +1,10 @@
use crate::config::ScyllaIngestConfig;
use crate::session::create_session;
use netpod::ttl::RetentionTime;
use scylla::Session as ScySession;
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::NewSessionError;
use scylla::transport::errors::QueryError;
use scylla::Session as ScySession;
use std::sync::Arc;
autoerr::create_error_v1!(
@@ -45,6 +45,7 @@ pub struct DataStore {
pub qu_insert_array_f64: Arc<PreparedStatement>,
pub qu_insert_array_bool: Arc<PreparedStatement>,
pub qu_insert_binned_scalar_f32_v02: Arc<PreparedStatement>,
pub qu_insert_bin_write_index_v00: Arc<PreparedStatement>,
pub qu_account_00: Arc<PreparedStatement>,
pub qu_account_recv_00: Arc<PreparedStatement>,
pub qu_dummy: Arc<PreparedStatement>,
@@ -156,6 +157,14 @@ impl DataStore {
scy
);
let qu_insert_bin_write_index_v00 = prep_qu_ins_c!(
"bin_write_index_v00",
"series, div, quo, rem, rt, binlen",
"?, ?, ?, ?, ?, ?",
rett,
scy
);
let qu_account_00 = prep_qu_ins_c!(
"account_00",
"part, ts, series, count, bytes",
@@ -210,6 +219,7 @@ impl DataStore {
qu_insert_array_f64,
qu_insert_array_bool,
qu_insert_binned_scalar_f32_v02,
qu_insert_bin_write_index_v00,
qu_account_00,
qu_account_recv_00,
qu_dummy,

View File

@@ -16,6 +16,7 @@ use netpod::Shape;
use netpod::TsNano;
use netpod::ttl::RetentionTime;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::BinWriteIndexV00;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::TimeBinSimpleF32V02;
use series::ChannelStatusSeriesId;
@@ -25,12 +26,14 @@ use std::time::Duration;
macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ) }
macro_rules! trace_ingest { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! trace_tick { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! trace_tick_verbose { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! debug_init { ($t:expr, $($arg:expr),*) => ( if true { if $t { log::debug!($($arg),*); } } ) }
macro_rules! debug_bin { ($t:expr, $($arg:expr),*) => ( if true { if $t { log::debug!($($arg),*); } } ) }
macro_rules! trace_bin { ($t:expr, $($arg:expr),*) => ( if false { if $t { log::trace!($($arg),*); } } ) }
macro_rules! trace_ingest { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) }
macro_rules! trace_tick { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) }
macro_rules! trace_tick_verbose { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) }
macro_rules! trace_bin { ($t:expr, $($arg:expr),*) => ( if true { if $t { log::trace!($($arg),*); } } ) }
autoerr::create_error_v1!(
name(Error, "SerieswriterBinwriter"),
@@ -43,6 +46,7 @@ autoerr::create_error_v1!(
UnexpectedContainerType,
PartitionMsp(#[from] series::msp::Error),
UnsupportedGridDiv(DtMs, DtMs),
UnexpectedBinLen(DtMs, PrebinnedPartitioning),
BinnerNoProgress,
IngestLoopLimit,
},
@@ -81,25 +85,24 @@ impl WriteCntZero {
#[derive(Debug)]
struct IndexWritten {
last: (u32, u64, u32),
last: Option<(PrebinnedPartitioning, u64, u32)>,
}
impl IndexWritten {
fn new() -> Self {
Self { last: (0, 0, 0) }
Self { last: None }
}
fn should_write(&self, div: u32, quo: u64, rem: u32) -> bool {
let (div0, quo0, rem0) = self.last;
if div0 == 0 || quo0 != quo || rem0 != rem {
true
fn should_write(&self, _div: PrebinnedPartitioning, quo: u64, rem: u32) -> bool {
if let Some((_div0, quo0, rem0)) = &self.last {
*quo0 != quo || *rem0 != rem
} else {
false
true
}
}
fn mark_written(&mut self, div: u32, quo: u64, rem: u32) {
self.last = (div, quo, rem);
fn mark_written(&mut self, div: PrebinnedPartitioning, quo: u64, rem: u32) {
self.last = Some((div, quo, rem));
}
}
@@ -111,8 +114,18 @@ pub struct BinWriter {
scalar_type: ScalarType,
shape: Shape,
evbuf: ContainerEvents<f32>,
binner_1st: Option<(RetentionTime, BinnedEventsTimeweight<f32>, WriteCntZero)>,
binner_others: Vec<(RetentionTime, BinnedBinsTimeweight<f32, f32>, WriteCntZero)>,
binner_1st: Option<(
RetentionTime,
BinnedEventsTimeweight<f32>,
WriteCntZero,
PrebinnedPartitioning,
)>,
binner_others: Vec<(
RetentionTime,
BinnedBinsTimeweight<f32, f32>,
WriteCntZero,
PrebinnedPartitioning,
)>,
index_written: IndexWritten,
trd: bool,
}
@@ -128,7 +141,7 @@ impl BinWriter {
shape: Shape,
chname: String,
) -> Result<Self, Error> {
let trd = series::dbg::dbg_chn(&chname);
let trd = true || series::dbg::dbg_chn(&chname);
if trd {
debug_bin!(trd, "enabled debug for {}", chname);
}
@@ -138,13 +151,21 @@ impl BinWriter {
let quiets = [min_quiets.st.clone(), min_quiets.mt.clone(), min_quiets.lt.clone()];
let mut binner_1st = None;
let mut binner_others = Vec::new();
let mut has_monitor = false;
let mut combs: Vec<_> = rts
.into_iter()
.zip(quiets.into_iter().map(|x| DtMs::from_ms_u64(x.as_millis() as u64)))
.inspect(|x| {
if x.1 <= DUR_ZERO {
has_monitor = true;
}
})
.filter(|x| x.1 > DUR_ZERO && x.1 < DUR_MAX)
.map(|x| (x.0, bin_len_clamp(x.1)))
.map(|x| (x.0, x.1, WriteCntZero::Disable))
.map(|x| (x.0, bin_len_clamp(x.1), WriteCntZero::Disable))
.collect();
let has_monitor = has_monitor;
debug_init!(trd, "has_monitor {:?} is_polled {:?}", has_monitor, is_polled);
debug_init!(trd, "combs A {:?}", combs);
if let Some(last) = combs.last_mut() {
match &last.1 {
PrebinnedPartitioning::Day1 => {
@@ -161,10 +182,12 @@ impl BinWriter {
}
}
}
if !is_polled && combs.len() > 1 {
debug_init!(trd, "combs B {:?}", combs);
if !is_polled && !has_monitor && combs.len() > 1 {
combs.remove(0);
}
let combs = combs;
debug_init!(trd, "combs C {:?}", combs);
debug_bin!(trd, "{:?} binning combs {:?}", chname, combs);
for (rt, pbp, write_zero) in combs {
if binner_1st.is_none() {
@@ -173,7 +196,7 @@ impl BinWriter {
if let WriteCntZero::Enable = write_zero {
binner.cnt_zero_enable();
}
binner_1st = Some((rt, binner, write_zero));
binner_1st = Some((rt, binner, write_zero, pbp));
} else {
let range = BinnedRange::from_beg_to_inf(beg, pbp.bin_len());
let binner = BinnedBinsTimeweight::new(range);
@@ -181,7 +204,7 @@ impl BinWriter {
// TODO
// binner.cnt_zero_enable();
}
binner_others.push((rt, binner, write_zero));
binner_others.push((rt, binner, write_zero, pbp));
}
}
let ret = Self {
@@ -254,6 +277,7 @@ impl BinWriter {
let rt = ee.0.clone();
let write_zero = ee.2.clone();
let binner = &mut ee.1;
let pbp = ee.3.clone();
// TODO avoid boxing
let bufbox = Box::new(buf);
use items_0::timebin::IngestReport;
@@ -278,12 +302,13 @@ impl BinWriter {
&bins,
write_zero,
&mut self.index_written,
pbp,
iqdqs,
)?;
// TODO avoid boxing
let mut bins2: BinsBoxed = Box::new(bins);
for i in 0..self.binner_others.len() {
let (rt, binner, write_zero) = &mut self.binner_others[i];
let (rt, binner, write_zero, pbp) = &mut self.binner_others[i];
let write_zero = write_zero.clone();
binner.ingest(&bins2)?;
let bb: Option<BinsBoxed> = binner.output()?;
@@ -298,7 +323,8 @@ impl BinWriter {
rt.clone(),
&bb2,
write_zero,
todo!(),
&mut self.index_written,
pbp.clone(),
iqdqs,
)?;
} else {
@@ -333,6 +359,7 @@ impl BinWriter {
bins: &ContainerBins<f32, f32>,
write_zero: WriteCntZero,
index_written: &mut IndexWritten,
pbp: PrebinnedPartitioning,
iqdqs: &mut InsertDeques,
) -> Result<(), Error> {
let selfname = "handle_output_ready";
@@ -348,7 +375,10 @@ impl BinWriter {
} else if cnt == 0 && !write_zero.enabled() {
info!("zero count bin {:?}", series);
} else {
let pbp = PrebinnedPartitioning::try_from(bin_len)?;
if bin_len != pbp.bin_len() {
let e = Error::UnexpectedBinLen(bin_len, pbp);
return Err(e);
}
let div = pbp.msp_div();
if div.ns() % bin_len.ns() != 0 {
let e = Error::UnsupportedGridDiv(bin_len, div);
@@ -368,7 +398,7 @@ impl BinWriter {
dev: f32::NAN,
lst,
});
if bin_len >= DtMs::from_ms_u64(1000 * 60 * 60) {
if true || bin_len >= DtMs::from_ms_u64(1000 * 60 * 60) {
debug_bin!(trd, "handle_output_ready emit {:?} len {} {:?}", rt, bins_len, item);
}
match rt {
@@ -382,10 +412,32 @@ impl BinWriter {
iqdqs.lt_rf3_qu.push_back(item);
}
}
let div = PrebinnedPartitioning::Day1;
series.id();
ts1.ms() / div.msp_div().ms();
let (quo, rem) = div.quo_rem(ts1.to_ts_ms());
if index_written.should_write(div.clone(), quo, rem) {
index_written.mark_written(div.clone(), quo, rem);
let item = BinWriteIndexV00 {
series: series.id() as i64,
div: div.msp_div().ms() as i32,
quo: quo as i64,
rem: rem as i32,
rt: rt.index_db_i32(),
binlen: pbp.bin_len().ms() as i32,
};
let item = QueryItem::BinWriteIndexV00(item);
match rt {
RetentionTime::Short => {
iqdqs.st_rf3_qu.push_back(item);
}
RetentionTime::Medium => {
iqdqs.mt_rf3_qu.push_back(item);
}
RetentionTime::Long => {
iqdqs.lt_rf3_qu.push_back(item);
}
}
} else {
}
}
}
Ok(())