diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 781f9b1..603d09e 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -1,13 +1,14 @@ use crate::config::ScyllaIngestConfig; -use crate::iteminsertqueue::insert_item_fut; -use crate::iteminsertqueue::insert_msp_fut; use crate::iteminsertqueue::Accounting; use crate::iteminsertqueue::AccountingRecv; +use crate::iteminsertqueue::BinWriteIndexV00; use crate::iteminsertqueue::InsertFut; use crate::iteminsertqueue::InsertItem; use crate::iteminsertqueue::MspItem; use crate::iteminsertqueue::QueryItem; use crate::iteminsertqueue::TimeBinSimpleF32V02; +use crate::iteminsertqueue::insert_item_fut; +use crate::iteminsertqueue::insert_msp_fut; use crate::store::DataStore; use async_channel::Receiver; use atomic::AtomicU64; @@ -15,12 +16,12 @@ use futures_util::Stream; use futures_util::StreamExt; use log::*; use netpod::ttl::RetentionTime; -use smallvec::smallvec; use smallvec::SmallVec; +use smallvec::smallvec; use stats::InsertWorkerStats; use std::collections::VecDeque; -use std::sync::atomic; use std::sync::Arc; +use std::sync::atomic; use std::time::Duration; use std::time::Instant; use taskrun::tokio; @@ -42,13 +43,7 @@ macro_rules! trace_item_execute { }; } -macro_rules! debug_setup { - ($($arg:tt)*) => { - if false { - debug!($($arg)*); - } - }; -} +macro_rules! debug_setup { ($($arg:expr),*) => ( if false { debug!($($arg),*); } ); } autoerr::create_error_v1!( name(Error, "ScyllaInsertWorker"), @@ -278,6 +273,13 @@ where prepare_timebin_v02_insert_futs(item, &data_store, &stats, tsnow) } } + QueryItem::BinWriteIndexV00(item) => { + if ignore_writes { + SmallVec::new() + } else { + prepare_bin_write_index_v00_insert_futs(item, &data_store, &stats, tsnow) + } + } QueryItem::Accounting(item) => { if ignore_writes { SmallVec::new() @@ -319,6 +321,9 @@ fn inspect_items( QueryItem::TimeBinSimpleF32V02(_) => { trace_item_execute!("execute {worker_name} TimeBinSimpleF32V02"); } + QueryItem::BinWriteIndexV00(_) => { + trace_item_execute!("execute {worker_name} BinWriteIndexV00"); + } QueryItem::Accounting(_) => { trace_item_execute!("execute {worker_name} Accounting {item:?}"); } @@ -415,6 +420,38 @@ fn prepare_timebin_v02_insert_futs( futs } +fn prepare_bin_write_index_v00_insert_futs( + item: BinWriteIndexV00, + data_store: &Arc, + stats: &Arc, + tsnow: Instant, +) -> SmallVec<[InsertFut; 4]> { + let params = (item.series, item.div, item.quo, item.rem, item.rt, item.binlen); + // TODO would be better to count inserts only on completed insert + stats.inserted_binned().inc(); + let fut = InsertFut::new( + data_store.scy.clone(), + data_store.qu_insert_bin_write_index_v00.clone(), + params, + tsnow, + stats.clone(), + ); + let futs = smallvec![fut]; + + // TODO match on the query result: + // match qres { + // Ok(_) => { + // backoff = backoff_0; + // } + // Err(e) => { + // stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e)); + // back_off_sleep(&mut backoff).await; + // } + // } + + futs +} + fn prepare_accounting_insert_futs( item: Accounting, data_store: &Arc, diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index d4ac427..c7779e4 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -5,12 +5,13 @@ use crate::store::DataStore; use bytes::BufMut; use futures_util::Future; use futures_util::FutureExt; -use netpod::channelstatus::ChannelStatus; -use netpod::channelstatus::ChannelStatusClosedReason; use netpod::DtNano; use netpod::Shape; use netpod::TsMs; use netpod::TsNano; +use netpod::channelstatus::ChannelStatus; +use netpod::channelstatus::ChannelStatusClosedReason; +use scylla::QueryResult; use scylla::frame::value::Value; use scylla::frame::value::ValueList; use scylla::prepared_statement::PreparedStatement; @@ -18,9 +19,9 @@ use scylla::serialize::row::SerializeRow; use scylla::serialize::value::SerializeValue; use scylla::transport::errors::DbError; use scylla::transport::errors::QueryError; -use scylla::QueryResult; use series::ChannelStatusSeriesId; use series::SeriesId; +use series::msp::PrebinnedPartitioning; use stats::InsertWorkerStats; use std::net::SocketAddrV4; use std::pin::Pin; @@ -549,12 +550,23 @@ pub struct TimeBinSimpleF32V02 { pub lst: f32, } +#[derive(Debug, Clone)] +pub struct BinWriteIndexV00 { + pub series: i64, + pub div: i32, + pub quo: i64, + pub rem: i32, + pub rt: i32, + pub binlen: i32, +} + // Needs to be Clone to send it to multiple retention times if required. #[derive(Debug, Clone)] pub enum QueryItem { Insert(InsertItem), Msp(MspItem), TimeBinSimpleF32V02(TimeBinSimpleF32V02), + BinWriteIndexV00(BinWriteIndexV00), Accounting(Accounting), AccountingRecv(AccountingRecv), } diff --git a/scywr/src/store.rs b/scywr/src/store.rs index 5b63d5a..f158277 100644 --- a/scywr/src/store.rs +++ b/scywr/src/store.rs @@ -1,10 +1,10 @@ use crate::config::ScyllaIngestConfig; use crate::session::create_session; use netpod::ttl::RetentionTime; +use scylla::Session as ScySession; use scylla::prepared_statement::PreparedStatement; use scylla::transport::errors::NewSessionError; use scylla::transport::errors::QueryError; -use scylla::Session as ScySession; use std::sync::Arc; autoerr::create_error_v1!( @@ -45,6 +45,7 @@ pub struct DataStore { pub qu_insert_array_f64: Arc, pub qu_insert_array_bool: Arc, pub qu_insert_binned_scalar_f32_v02: Arc, + pub qu_insert_bin_write_index_v00: Arc, pub qu_account_00: Arc, pub qu_account_recv_00: Arc, pub qu_dummy: Arc, @@ -156,6 +157,14 @@ impl DataStore { scy ); + let qu_insert_bin_write_index_v00 = prep_qu_ins_c!( + "bin_write_index_v00", + "series, div, quo, rem, rt, binlen", + "?, ?, ?, ?, ?, ?", + rett, + scy + ); + let qu_account_00 = prep_qu_ins_c!( "account_00", "part, ts, series, count, bytes", @@ -210,6 +219,7 @@ impl DataStore { qu_insert_array_f64, qu_insert_array_bool, qu_insert_binned_scalar_f32_v02, + qu_insert_bin_write_index_v00, qu_account_00, qu_account_recv_00, qu_dummy, diff --git a/serieswriter/src/binwriter.rs b/serieswriter/src/binwriter.rs index 6568644..db03ac0 100644 --- a/serieswriter/src/binwriter.rs +++ b/serieswriter/src/binwriter.rs @@ -16,6 +16,7 @@ use netpod::Shape; use netpod::TsNano; use netpod::ttl::RetentionTime; use scywr::insertqueues::InsertDeques; +use scywr::iteminsertqueue::BinWriteIndexV00; use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::TimeBinSimpleF32V02; use series::ChannelStatusSeriesId; @@ -25,12 +26,14 @@ use std::time::Duration; macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ) } -macro_rules! trace_ingest { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } -macro_rules! trace_tick { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } -macro_rules! trace_tick_verbose { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } - +macro_rules! debug_init { ($t:expr, $($arg:expr),*) => ( if true { if $t { log::debug!($($arg),*); } } ) } macro_rules! debug_bin { ($t:expr, $($arg:expr),*) => ( if true { if $t { log::debug!($($arg),*); } } ) } -macro_rules! trace_bin { ($t:expr, $($arg:expr),*) => ( if false { if $t { log::trace!($($arg),*); } } ) } + +macro_rules! trace_ingest { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } +macro_rules! trace_tick { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) } +macro_rules! trace_tick_verbose { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) } + +macro_rules! trace_bin { ($t:expr, $($arg:expr),*) => ( if true { if $t { log::trace!($($arg),*); } } ) } autoerr::create_error_v1!( name(Error, "SerieswriterBinwriter"), @@ -43,6 +46,7 @@ autoerr::create_error_v1!( UnexpectedContainerType, PartitionMsp(#[from] series::msp::Error), UnsupportedGridDiv(DtMs, DtMs), + UnexpectedBinLen(DtMs, PrebinnedPartitioning), BinnerNoProgress, IngestLoopLimit, }, @@ -81,25 +85,24 @@ impl WriteCntZero { #[derive(Debug)] struct IndexWritten { - last: (u32, u64, u32), + last: Option<(PrebinnedPartitioning, u64, u32)>, } impl IndexWritten { fn new() -> Self { - Self { last: (0, 0, 0) } + Self { last: None } } - fn should_write(&self, div: u32, quo: u64, rem: u32) -> bool { - let (div0, quo0, rem0) = self.last; - if div0 == 0 || quo0 != quo || rem0 != rem { - true + fn should_write(&self, _div: PrebinnedPartitioning, quo: u64, rem: u32) -> bool { + if let Some((_div0, quo0, rem0)) = &self.last { + *quo0 != quo || *rem0 != rem } else { - false + true } } - fn mark_written(&mut self, div: u32, quo: u64, rem: u32) { - self.last = (div, quo, rem); + fn mark_written(&mut self, div: PrebinnedPartitioning, quo: u64, rem: u32) { + self.last = Some((div, quo, rem)); } } @@ -111,8 +114,18 @@ pub struct BinWriter { scalar_type: ScalarType, shape: Shape, evbuf: ContainerEvents, - binner_1st: Option<(RetentionTime, BinnedEventsTimeweight, WriteCntZero)>, - binner_others: Vec<(RetentionTime, BinnedBinsTimeweight, WriteCntZero)>, + binner_1st: Option<( + RetentionTime, + BinnedEventsTimeweight, + WriteCntZero, + PrebinnedPartitioning, + )>, + binner_others: Vec<( + RetentionTime, + BinnedBinsTimeweight, + WriteCntZero, + PrebinnedPartitioning, + )>, index_written: IndexWritten, trd: bool, } @@ -128,7 +141,7 @@ impl BinWriter { shape: Shape, chname: String, ) -> Result { - let trd = series::dbg::dbg_chn(&chname); + let trd = true || series::dbg::dbg_chn(&chname); if trd { debug_bin!(trd, "enabled debug for {}", chname); } @@ -138,13 +151,21 @@ impl BinWriter { let quiets = [min_quiets.st.clone(), min_quiets.mt.clone(), min_quiets.lt.clone()]; let mut binner_1st = None; let mut binner_others = Vec::new(); + let mut has_monitor = false; let mut combs: Vec<_> = rts .into_iter() .zip(quiets.into_iter().map(|x| DtMs::from_ms_u64(x.as_millis() as u64))) + .inspect(|x| { + if x.1 <= DUR_ZERO { + has_monitor = true; + } + }) .filter(|x| x.1 > DUR_ZERO && x.1 < DUR_MAX) - .map(|x| (x.0, bin_len_clamp(x.1))) - .map(|x| (x.0, x.1, WriteCntZero::Disable)) + .map(|x| (x.0, bin_len_clamp(x.1), WriteCntZero::Disable)) .collect(); + let has_monitor = has_monitor; + debug_init!(trd, "has_monitor {:?} is_polled {:?}", has_monitor, is_polled); + debug_init!(trd, "combs A {:?}", combs); if let Some(last) = combs.last_mut() { match &last.1 { PrebinnedPartitioning::Day1 => { @@ -161,10 +182,12 @@ impl BinWriter { } } } - if !is_polled && combs.len() > 1 { + debug_init!(trd, "combs B {:?}", combs); + if !is_polled && !has_monitor && combs.len() > 1 { combs.remove(0); } let combs = combs; + debug_init!(trd, "combs C {:?}", combs); debug_bin!(trd, "{:?} binning combs {:?}", chname, combs); for (rt, pbp, write_zero) in combs { if binner_1st.is_none() { @@ -173,7 +196,7 @@ impl BinWriter { if let WriteCntZero::Enable = write_zero { binner.cnt_zero_enable(); } - binner_1st = Some((rt, binner, write_zero)); + binner_1st = Some((rt, binner, write_zero, pbp)); } else { let range = BinnedRange::from_beg_to_inf(beg, pbp.bin_len()); let binner = BinnedBinsTimeweight::new(range); @@ -181,7 +204,7 @@ impl BinWriter { // TODO // binner.cnt_zero_enable(); } - binner_others.push((rt, binner, write_zero)); + binner_others.push((rt, binner, write_zero, pbp)); } } let ret = Self { @@ -254,6 +277,7 @@ impl BinWriter { let rt = ee.0.clone(); let write_zero = ee.2.clone(); let binner = &mut ee.1; + let pbp = ee.3.clone(); // TODO avoid boxing let bufbox = Box::new(buf); use items_0::timebin::IngestReport; @@ -278,12 +302,13 @@ impl BinWriter { &bins, write_zero, &mut self.index_written, + pbp, iqdqs, )?; // TODO avoid boxing let mut bins2: BinsBoxed = Box::new(bins); for i in 0..self.binner_others.len() { - let (rt, binner, write_zero) = &mut self.binner_others[i]; + let (rt, binner, write_zero, pbp) = &mut self.binner_others[i]; let write_zero = write_zero.clone(); binner.ingest(&bins2)?; let bb: Option = binner.output()?; @@ -298,7 +323,8 @@ impl BinWriter { rt.clone(), &bb2, write_zero, - todo!(), + &mut self.index_written, + pbp.clone(), iqdqs, )?; } else { @@ -333,6 +359,7 @@ impl BinWriter { bins: &ContainerBins, write_zero: WriteCntZero, index_written: &mut IndexWritten, + pbp: PrebinnedPartitioning, iqdqs: &mut InsertDeques, ) -> Result<(), Error> { let selfname = "handle_output_ready"; @@ -348,7 +375,10 @@ impl BinWriter { } else if cnt == 0 && !write_zero.enabled() { info!("zero count bin {:?}", series); } else { - let pbp = PrebinnedPartitioning::try_from(bin_len)?; + if bin_len != pbp.bin_len() { + let e = Error::UnexpectedBinLen(bin_len, pbp); + return Err(e); + } let div = pbp.msp_div(); if div.ns() % bin_len.ns() != 0 { let e = Error::UnsupportedGridDiv(bin_len, div); @@ -368,7 +398,7 @@ impl BinWriter { dev: f32::NAN, lst, }); - if bin_len >= DtMs::from_ms_u64(1000 * 60 * 60) { + if true || bin_len >= DtMs::from_ms_u64(1000 * 60 * 60) { debug_bin!(trd, "handle_output_ready emit {:?} len {} {:?}", rt, bins_len, item); } match rt { @@ -382,10 +412,32 @@ impl BinWriter { iqdqs.lt_rf3_qu.push_back(item); } } - let div = PrebinnedPartitioning::Day1; - series.id(); - ts1.ms() / div.msp_div().ms(); + let (quo, rem) = div.quo_rem(ts1.to_ts_ms()); + if index_written.should_write(div.clone(), quo, rem) { + index_written.mark_written(div.clone(), quo, rem); + let item = BinWriteIndexV00 { + series: series.id() as i64, + div: div.msp_div().ms() as i32, + quo: quo as i64, + rem: rem as i32, + rt: rt.index_db_i32(), + binlen: pbp.bin_len().ms() as i32, + }; + let item = QueryItem::BinWriteIndexV00(item); + match rt { + RetentionTime::Short => { + iqdqs.st_rf3_qu.push_back(item); + } + RetentionTime::Medium => { + iqdqs.mt_rf3_qu.push_back(item); + } + RetentionTime::Long => { + iqdqs.lt_rf3_qu.push_back(item); + } + } + } else { + } } } Ok(())