diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml
index 2bf32d7..18dd0df 100644
--- a/daqingest/Cargo.toml
+++ b/daqingest/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "daqingest"
-version = "0.2.7-aa.4"
+version = "0.2.7-aa.5"
 authors = ["Dominik Werder "]
 edition = "2024"
 
diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs
index 0629dcb..e0c9796 100644
--- a/netfetch/src/ca/conn.rs
+++ b/netfetch/src/ca/conn.rs
@@ -7,7 +7,6 @@ use async_channel::Receiver;
 use async_channel::Sender;
 use ca_proto::ca::proto;
 use ca_proto_tokio::tcpasyncwriteread::TcpAsyncWriteRead;
-use core::fmt;
 use dbpg::seriesbychannel::ChannelInfoQuery;
 use dbpg::seriesbychannel::ChannelInfoResult;
 use enumfetch::ConnFuture;
@@ -17,16 +16,16 @@ use futures_util::Stream;
 use futures_util::StreamExt;
 use hashbrown::HashMap;
 use log::*;
-use netpod::channelstatus::ChannelStatus;
-use netpod::channelstatus::ChannelStatusClosedReason;
-use netpod::timeunits::*;
-use netpod::ttl::RetentionTime;
+use netpod::EMIT_ACCOUNTING_SNAP;
 use netpod::ScalarType;
 use netpod::SeriesKind;
 use netpod::Shape;
 use netpod::TsMs;
 use netpod::TsNano;
-use netpod::EMIT_ACCOUNTING_SNAP;
+use netpod::channelstatus::ChannelStatus;
+use netpod::channelstatus::ChannelStatusClosedReason;
+use netpod::timeunits::*;
+use netpod::ttl::RetentionTime;
 use proto::CaDataValue;
 use proto::CaEventValue;
 use proto::CaItem;
@@ -60,19 +59,20 @@ use serieswriter::fixgridwriter::ChannelStatusWriteState;
 use serieswriter::msptool::MspSplit;
 use serieswriter::rtwriter::RtWriter;
 use serieswriter::writer::EmittableType;
-use stats::rand_xoshiro::rand_core::RngCore;
-use stats::rand_xoshiro::rand_core::SeedableRng;
-use stats::rand_xoshiro::Xoshiro128PlusPlus;
 use stats::CaConnStats;
 use stats::CaProtoStats;
 use stats::IntervalEma;
+use stats::rand_xoshiro::Xoshiro128PlusPlus;
+use stats::rand_xoshiro::rand_core::RngCore;
+use stats::rand_xoshiro::rand_core::SeedableRng;
 use std::collections::BTreeMap;
 use std::collections::VecDeque;
+use std::fmt;
 use std::net::SocketAddrV4;
 use std::pin::Pin;
+use std::sync::Arc;
 use std::sync::atomic;
 use std::sync::atomic::AtomicUsize;
-use std::sync::Arc;
 use std::task::Context;
 use std::task::Poll;
 use std::time::Duration;
@@ -91,60 +91,17 @@ const READ_CHANNEL_VALUE_STATUS_EMIT_QUIET_MIN: Duration = Duration::from_millis(
 const SILENCE_READ_NEXT_IVL: Duration = Duration::from_millis(1000 * 200);
 const POLL_READ_TIMEOUT: Duration = Duration::from_millis(1000 * 10);
 const DO_RATE_CHECK: bool = false;
+const CHANNEL_STATUS_PONG_QUIET: Duration = Duration::from_millis(1000 * 60 * 60);
 
-#[allow(unused)]
-macro_rules! trace2 {
-    ($($arg:tt)*) => {
-        if true {
-            trace!($($arg)*);
-        }
-    };
-}
+macro_rules! trace3 { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ); }
 
-#[allow(unused)]
-macro_rules! trace3 {
-    ($($arg:tt)*) => {
-        if false {
-            trace!($($arg)*);
-        }
-    };
-}
+macro_rules! trace4 { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ); }
 
-#[allow(unused)]
-macro_rules! trace4 {
-    ($($arg:tt)*) => {
-        if false {
-            trace!($($arg)*);
-        }
-    };
-}
+macro_rules! trace_flush_queue { ($($arg:expr),*) => ( if false { trace3!($($arg),*); } ); }
 
-#[allow(unused)]
-macro_rules! trace_flush_queue {
-    ($($arg:tt)*) => {
-        if false {
-            trace3!($($arg)*);
-        }
-    };
-}
+macro_rules! trace_event_incoming { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ); }
 
-#[allow(unused)]
-macro_rules! trace_event_incoming {
-    ($($arg:tt)*) => {
-        if false {
-            trace!($($arg)*);
-        }
-    };
-}
-
-#[allow(unused)]
-macro_rules! trace_monitor_stale {
-    ($($arg:tt)*) => {
-        if false {
-            trace!($($arg)*);
-        }
-    };
-}
+macro_rules! trace_monitor_stale { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ); }
 
 fn dbg_chn_cid(cid: Cid, conn: &CaConn) -> bool {
     if let Some(name) = conn.name_by_cid(cid) {
@@ -272,7 +229,7 @@ mod ser_instant {
 }
 
 #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
-struct Cid(pub u32);
+pub struct Cid(pub u32);
 
 impl fmt::Display for Cid {
     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
@@ -636,11 +593,7 @@ impl ChannelState {
         let item_recv_ivl_ema = match self {
             ChannelState::Writable(s) => {
                 let ema = s.channel.item_recv_ivl_ema.ema();
-                if ema.update_count() == 0 {
-                    None
-                } else {
-                    Some(ema.ema())
-                }
+                if ema.update_count() == 0 { None } else { Some(ema.ema()) }
             }
             _ => None,
         };
@@ -1091,6 +1044,7 @@ pub struct CaConn {
    trace_channel_poll: bool,
    ca_msg_recv_count: u64,
    ca_version_recv_count: u64,
+    ts_channel_status_pong_last: Instant,
 }
 
 impl Drop for CaConn {
@@ -1157,6 +1111,7 @@ impl CaConn {
            trace_channel_poll: false,
            ca_msg_recv_count: 0,
            ca_version_recv_count: 0,
+            ts_channel_status_pong_last: tsnow,
        }
    }
 
@@ -1195,6 +1150,14 @@ impl CaConn {
        Ioid(self.ioid)
    }
 
+    fn channel_status_qu(iqdqs: &mut InsertDeques) -> &mut VecDeque {
+        &mut iqdqs.lt_rf3_qu
+    }
+
+    fn channel_status_pong_qu(iqdqs: &mut InsertDeques) -> &mut VecDeque {
+        &mut iqdqs.st_rf3_qu
+    }
+
    pub fn conn_command_tx(&self) -> Sender {
        self.conn_command_tx.as_ref().get_ref().clone()
    }
@@ -1422,7 +1385,8 @@ impl CaConn {
                cssid: st2.channel.cssid.clone(),
                status: ChannelStatus::Opened,
            };
-            conf.wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?;
+            conf.wrst
+                .emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?;
        }
        if let Some((ivl,)) = conf_poll_conf {
            let ivl = Duration::from_millis(ivl);
@@ -1449,10 +1413,7 @@ impl CaConn {
            self.cid_by_subid.insert(subid, cid);
            trace!(
                "new {:?} for {:?} chst {:?} {:?}",
-                subid,
-                cid,
-                st2.channel.cid,
-                st2.channel.sid
+                subid, cid, st2.channel.cid, st2.channel.sid
            );
            subid
        };
@@ -1532,7 +1493,6 @@ impl CaConn {
        debug!("channel_close {}", name);
        let tsnow = Instant::now();
        let stnow = SystemTime::now();
-
        let cid = if let Some(x) = self.cid_by_name.get(&name) {
            x.clone()
        } else {
@@ -1540,22 +1500,16 @@ impl CaConn {
            return;
        };
        self.cid_by_name.remove(&name);
-
        if let Some(conf) = self.channels.get_mut(&cid) {
-            let mut item_deque = VecDeque::new();
            let item = ChannelStatusItem {
                ts: stnow,
                cssid: conf.state.cssid(),
                status: ChannelStatus::Closed(ChannelStatusClosedReason::ChannelRemove),
            };
-            let deque = &mut item_deque;
-            if conf.wrst.emit_channel_status_item(item, deque).is_err() {
+            let qu = Self::channel_status_qu(&mut self.iqdqs);
+            if conf.wrst.emit_channel_status_item(item, qu).is_err() {
                self.stats.logic_error().inc();
            }
-            for x in item_deque {
-                self.iqdqs.st_rf3_qu.push_back(x);
-            }
-
            // TODO shutdown the internal writer structures.
            if let Some(cst) = conf.state.created_state() {
                if let Some(proto) = self.proto.as_mut() {
@@ -1567,7 +1521,6 @@ impl CaConn {
                    proto.push_out(item);
                }
            }
-
            {
                let mut it = self.cid_by_subid.extract_if(|_, v| *v == cid);
                if let Some((subid, _cid)) = it.next() {
@@ -1589,14 +1542,11 @@ impl CaConn {
        } else {
            debug!("channel_close {} no channel block", name);
        };
-
        {
            let it = self.cid_by_sid.extract_if(|_, v| *v == cid);
            it.count();
        }
-
        self.channels.remove(&cid);
-
        // TODO emit CaConn item to let CaConnSet know that we have closed the channel.
        // TODO may be too full
        let value = CaConnEventValue::ChannelRemoved(name);
@@ -1656,21 +1606,17 @@ impl CaConn {
        // TODO can I reuse emit_channel_info_insert_items ?
        trace!("channel_state_on_shutdown channels {}", self.channels.len());
        let stnow = self.tmp_ts_poll;
-        let mut item_deque = VecDeque::new();
+        let status_qu = Self::channel_status_qu(&mut self.iqdqs);
        for (_cid, conf) in &mut self.channels {
            let item = ChannelStatusItem {
                ts: stnow,
                cssid: conf.state.cssid(),
                status: ChannelStatus::Closed(channel_reason.clone()),
            };
-            let deque = &mut item_deque;
-            if conf.wrst.emit_channel_status_item(item, deque).is_err() {
+            if conf.wrst.emit_channel_status_item(item, status_qu).is_err() {
                self.stats.logic_error().inc();
            }
        }
-        for x in item_deque {
-            self.iqdqs.st_rf3_qu.push_back(x);
-        }
        for (_cid, conf) in &mut self.channels {
            if series::dbg::dbg_chn(conf.conf.name()) {
                info!("channel_state_on_shutdown {:?}", conf);
            }
@@ -2046,7 +1992,7 @@ impl CaConn {
                self.read_ioids.remove(&st3.ioid);
                let next = PollTickStateIdle::decide_next(st3.next_backup, st2.poll_ivl, tsnow);
                if self.trace_channel_poll {
-                    trace!("make next poll idle at {next:?} tsnow {tsnow:?}");
+                    trace!("make next poll idle at {:?} tsnow {:?}", next, tsnow);
                }
                st2.tick = PollTickState::Idle(PollTickStateIdle { next });
                let iqdqs = &mut self.iqdqs;
@@ -2085,7 +2031,7 @@ impl CaConn {
                } else {
                    self.stats.recv_read_notify_state_read_pending.inc();
                }
-                let read_expected = if let Some(cid) = self.read_ioids.remove(&ioid) {
+                let read_expected = if let Some(_cid) = self.read_ioids.remove(&ioid) {
                    true
                } else {
                    false
                };
@@ -2100,14 +2046,16 @@ impl CaConn {
                        cssid: st.channel.cssid.clone(),
                        status: ChannelStatus::MonitoringReadResultExpected,
                    };
-                    ch_wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?;
+                    ch_wrst
+                        .emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?;
                } else {
                    let item = ChannelStatusItem {
                        ts: self.tmp_ts_poll,
                        cssid: st.channel.cssid.clone(),
                        status: ChannelStatus::MonitoringReadResultUnexpected,
                    };
-                    ch_wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?;
+                    ch_wrst
+                        .emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?;
                }
                let iqdqs = &mut self.iqdqs;
                let stats = self.stats.as_ref();
@@ -2224,8 +2172,10 @@ impl CaConn {
                        cssid: crst.cssid,
                        status: ChannelStatus::MonitoringSilenceReadUnchanged,
                    };
-                    let deque = &mut iqdqs.st_rf3_qu;
-                    if wrst.emit_channel_status_item(item, deque).is_err() {
+                    if wrst
+                        .emit_channel_status_item(item, Self::channel_status_qu(iqdqs))
+                        .is_err()
+                    {
                        stats.logic_error().inc();
                    }
                }
@@ -2463,7 +2413,8 @@ impl CaConn {
                    cssid: st2.channel.cssid.clone(),
                    status: ChannelStatus::MonitoringSilenceReadStart,
                };
-                conf.wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?;
+                conf.wrst
+                    .emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?;
            }
        }
    }
@@ -2485,7 +2436,8 @@ impl CaConn {
                cssid: st2.channel.cssid.clone(),
                status: ChannelStatus::MonitoringSilenceReadTimeout,
            };
-            conf.wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?;
+            conf.wrst
+                .emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?;
        }
        if false {
            // Here we try to close the channel at hand.
@@ -3204,8 +3156,16 @@ impl CaConn {
                    cssid: st1.channel.cssid,
                    status: ChannelStatus::Pong,
                };
-                let deque = &mut self.iqdqs.st_rf3_qu;
-                if ch.wrst.emit_channel_status_item(item, deque).is_err() {
+                let dt = self
+                    .poll_tsnow
+                    .saturating_duration_since(self.ts_channel_status_pong_last);
+                let qu = if dt >= CHANNEL_STATUS_PONG_QUIET {
+                    self.ts_channel_status_pong_last = self.poll_tsnow;
+                    Self::channel_status_qu(&mut self.iqdqs)
+                } else {
+                    Self::channel_status_pong_qu(&mut self.iqdqs)
+                };
+                if ch.wrst.emit_channel_status_item(item, qu).is_err() {
                    self.stats.logic_error().inc();
                }
            }
diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs
index 334798c..84697ef 100644
--- a/netfetch/src/ca/connset.rs
+++ b/netfetch/src/ca/connset.rs
@@ -46,12 +46,12 @@ use statemap::ConnectionState;
 use statemap::ConnectionStateValue;
 use statemap::WithStatusSeriesIdState;
 use statemap::WithStatusSeriesIdStateInner;
-use stats::rand_xoshiro::rand_core::RngCore;
-use stats::rand_xoshiro::Xoshiro128PlusPlus;
 use stats::CaConnSetStats;
 use stats::CaConnStats;
 use stats::CaProtoStats;
 use stats::IocFinderStats;
+use stats::rand_xoshiro::Xoshiro128PlusPlus;
+use stats::rand_xoshiro::rand_core::RngCore;
 use std::collections::BTreeMap;
 use std::collections::VecDeque;
 use std::fmt;
@@ -418,9 +418,12 @@ pub struct CaConnSet {
    find_ioc_res_rx: Pin>>>,
    find_ioc_queue_set: QueueSet,
    iqtx: Pin>,
-    storage_insert_queue_l1: VecDeque,
-    storage_insert_queue: VecDeque>,
-    storage_insert_sender: Pin>>>,
+    storage_insert_st_qu: VecDeque>,
+    storage_insert_st_qu_l1: VecDeque,
+    storage_insert_st_tx: Pin>>>,
+    storage_insert_lt_qu: VecDeque>,
+    storage_insert_lt_qu_l1: VecDeque,
+    storage_insert_lt_tx: Pin>>>,
    ca_conn_res_tx: Pin>>,
    ca_conn_res_rx: Pin>>,
    connset_out_queue: VecDeque,
@@ -486,12 +489,13 @@ impl CaConnSet {
            find_ioc_res_rx: Box::pin(find_ioc_res_rx),
            find_ioc_queue_set: QueueSet::new(),
            iqtx: Box::pin(iqtx.clone()),
-            storage_insert_queue_l1: VecDeque::new(),
-            storage_insert_queue: VecDeque::new(),
-
+            storage_insert_st_qu: VecDeque::new(),
+            storage_insert_st_qu_l1: VecDeque::new(),
+            storage_insert_st_tx: Box::pin(SenderPolling::new(iqtx.st_rf3_tx.clone())),
            // TODO simplify for all combinations
-            storage_insert_sender: Box::pin(SenderPolling::new(iqtx.st_rf3_tx.clone())),
-
+            storage_insert_lt_qu: VecDeque::new(),
+            storage_insert_lt_qu_l1: VecDeque::new(),
+            storage_insert_lt_tx: Box::pin(SenderPolling::new(iqtx.lt_rf3_tx.clone())),
            ca_conn_res_tx: Box::pin(ca_conn_res_tx),
            ca_conn_res_rx: Box::pin(ca_conn_res_rx),
            shutdown_stopping: false,
@@ -808,7 +812,7 @@ impl CaConnSet {
                        let item = serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, status.to_u64());
                        let state = &mut writer_status_state;
                        let ts_net = Instant::now();
-                        let deque = &mut self.storage_insert_queue_l1;
+                        let deque = &mut self.storage_insert_lt_qu_l1;
                        writer_status.write(item, state, ts_net, ts, deque)?;
                    }
                    *chst2 = ActiveChannelState::WithStatusSeriesId(WithStatusSeriesIdState {
@@ -873,7 +877,7 @@ impl CaConnSet {
                        let item = serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, status.to_u64());
                        let state = &mut writer_status_state;
                        let ts_net = Instant::now();
-                        let deque = &mut self.storage_insert_queue_l1;
+                        let deque = &mut self.storage_insert_lt_qu_l1;
                        writer_status.write(item, state, ts_net, ts, deque)?;
                    }
                    *st3 = WithStatusSeriesIdState {
@@ -1013,8 +1017,8 @@ impl CaConnSet {
            Ok(())
        } else {
            if false {
+                // TODO
                self.thr_msg_storage_len
-                    .trigger("connset handle_check_health", &[&self.storage_insert_sender.len()]);
+                    .trigger("connset handle_check_health", &[&self.storage_insert_st_tx.len()]);
            }
            self.check_channel_states(tsnow, stnow)?;
            let item = CaConnSetItem::Healthy;
@@ -1136,8 +1141,7 @@ impl CaConnSet {
        } else {
            trace!(
                "handle_channel_create_fail {:?} {:?} set to MaybeWrongAddress",
-                ch,
-                addr
+                ch, addr
            );
        }
        bump_backoff(&mut st3.addr_find_backoff);
@@ -1482,7 +1486,8 @@ impl CaConnSet {
        } else {
            self.channel_states.range_mut(..)
        };
-        let mut item_deque = VecDeque::new();
+        let mut st_qu_2 = VecDeque::new();
+        let mut lt_qu_2 = VecDeque::new();
        for (i, (ch, st)) in it.enumerate() {
            match &mut st.value {
                ChannelStateValue::Active(st2) => match st2 {
@@ -1556,7 +1561,7 @@ impl CaConnSet {
                            st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress(snew);
                            let item = ChannelStatusItem::new_closed_conn_timeout(stnow, st3.cssid.clone());
                            let (tsev, val) = item.to_ts_val();
-                            let deque = &mut item_deque;
+                            let deque = &mut lt_qu_2;
                            st3.writer_status.as_mut().unwrap().write(
                                serieswriter::fixgridwriter::ChannelStatusWriteValue::new(tsev, val),
                                st3.writer_status_state.as_mut().unwrap(),
@@ -1642,7 +1647,8 @@ impl CaConnSet {
                }
            };
        }
-        self.storage_insert_queue.push_back(item_deque);
+        self.storage_insert_st_qu.push_back(st_qu_2);
+        self.storage_insert_lt_qu.push_back(lt_qu_2);
        for (addr, ch) in cmd_remove_channel {
            if let Some(g) = self.ca_conn_ress.get_mut(&addr) {
                let cmd = ConnCommand::channel_close(ch.name().into());
@@ -1793,9 +1799,15 @@ impl CaConnSet {
        }
        self.handle_check_health()?;
        {
-            if self.storage_insert_queue_l1.len() != 0 {
-                let a = core::mem::replace(&mut self.storage_insert_queue_l1, VecDeque::new());
-                self.storage_insert_queue.push_back(a);
+            if self.storage_insert_st_qu_l1.len() != 0 {
+                let a = std::mem::replace(&mut self.storage_insert_st_qu_l1, VecDeque::new());
+                self.storage_insert_st_qu.push_back(a);
+            }
+        }
+        {
+            if self.storage_insert_lt_qu_l1.len() != 0 {
+                let a = std::mem::replace(&mut self.storage_insert_lt_qu_l1, VecDeque::new());
+                self.storage_insert_lt_qu.push_back(a);
            }
        }
        Ok(())
@@ -1907,7 +1919,7 @@ impl Stream for CaConnSet {
        self.stats.storage_insert_tx_len.set(self.iqtx.st_rf3_tx.len() as _);
        self.stats
            .storage_insert_queue_len
-            .set(self.storage_insert_queue.len() as _);
+            .set(self.storage_insert_st_qu.len() as _);
        self.stats
            .channel_info_query_queue_len
            .set(self.channel_info_query_qu.len() as _);
@@ -1978,8 +1990,20 @@ impl Stream for CaConnSet {
 
        {
            let this = self.as_mut().get_mut();
-            let qu = &mut this.storage_insert_queue;
-            let tx = this.storage_insert_sender.as_mut();
+            let qu = &mut this.storage_insert_st_qu;
+            let tx = this.storage_insert_st_tx.as_mut();
+            let counter = this.stats.storage_insert_queue_send();
+            let x = sender_polling_send(qu, tx, cx, || {
+                counter.inc();
+            });
+            if let Err(e) = merge_pending_progress(x, &mut penpro) {
+                break Ready(Some(CaConnSetItem::Error(e)));
+            }
+        }
+        {
+            let this = self.as_mut().get_mut();
+            let qu = &mut this.storage_insert_lt_qu;
+            let tx = this.storage_insert_lt_tx.as_mut();
            let counter = this.stats.storage_insert_queue_send();
            let x = sender_polling_send(qu, tx, cx, || {
                counter.inc();
diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs
index a913724..c3998d3 100644
--- a/scywr/src/insertworker.rs
+++ b/scywr/src/insertworker.rs
@@ -1,7 +1,7 @@
 use crate::config::ScyllaIngestConfig;
 use crate::iteminsertqueue::Accounting;
 use crate::iteminsertqueue::AccountingRecv;
-use crate::iteminsertqueue::BinWriteIndexV01;
+use crate::iteminsertqueue::BinWriteIndexV03;
 use crate::iteminsertqueue::InsertFut;
 use crate::iteminsertqueue::InsertItem;
 use crate::iteminsertqueue::MspItem;
@@ -273,11 +273,11 @@ where
                    prepare_timebin_v02_insert_futs(item, &data_store, &stats, tsnow)
                }
            }
-            QueryItem::BinWriteIndexV01(item) => {
+            QueryItem::BinWriteIndexV03(item) => {
                if ignore_writes {
                    SmallVec::new()
                } else {
-                    prepare_bin_write_index_v01_insert_futs(item, &data_store, &stats, tsnow)
+                    prepare_bin_write_index_v03_insert_futs(item, &data_store, &stats, tsnow)
                }
            }
            QueryItem::Accounting(item) => {
@@ -321,8 +321,8 @@ fn inspect_items(
        QueryItem::TimeBinSimpleF32V02(_) => {
            trace_item_execute!("execute {worker_name} TimeBinSimpleF32V02");
        }
-        QueryItem::BinWriteIndexV01(_) => {
-            trace_item_execute!("execute {worker_name} BinWriteIndexV01");
+        QueryItem::BinWriteIndexV03(_) => {
+            trace_item_execute!("execute {worker_name} BinWriteIndexV03");
        }
        QueryItem::Accounting(_) => {
            trace_item_execute!("execute {worker_name} Accounting {item:?}");
        }
@@ -420,26 +420,18 @@ fn prepare_timebin_v02_insert_futs(
    futs
 }
 
-fn prepare_bin_write_index_v01_insert_futs(
-    item: BinWriteIndexV01,
+fn prepare_bin_write_index_v03_insert_futs(
+    item: BinWriteIndexV03,
    data_store: &Arc,
    stats: &Arc,
    tsnow: Instant,
 ) -> SmallVec<[InsertFut; 4]> {
-    let params = (
-        item.series,
-        item.dv1,
-        item.dv2,
-        item.quo,
-        item.rem,
-        item.rt,
-        item.binlen,
-    );
+    let params = (item.series, item.pbp, item.msp, item.rt, item.lsp, item.binlen);
    // TODO would be better to count inserts only on completed insert
    stats.inserted_binned().inc();
    let fut = InsertFut::new(
        data_store.scy.clone(),
-        data_store.qu_insert_bin_write_index_v01.clone(),
+        data_store.qu_insert_bin_write_index_v03.clone(),
        params,
        tsnow,
        stats.clone(),
diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs
index d74fa3e..819879c 100644
--- a/scywr/src/iteminsertqueue.rs
+++ b/scywr/src/iteminsertqueue.rs
@@ -551,13 +551,12 @@ pub struct TimeBinSimpleF32V02 {
 }
 
 #[derive(Debug, Clone)]
-pub struct BinWriteIndexV01 {
+pub struct BinWriteIndexV03 {
    pub series: i64,
-    pub dv1: i32,
-    pub dv2: i32,
-    pub quo: i64,
-    pub rem: i32,
-    pub rt: i32,
+    pub pbp: i16,
+    pub msp: i32,
+    pub rt: i16,
+    pub lsp: i32,
    pub binlen: i32,
 }
 
@@ -567,7 +566,7 @@ pub enum QueryItem {
    Insert(InsertItem),
    Msp(MspItem),
    TimeBinSimpleF32V02(TimeBinSimpleF32V02),
-    BinWriteIndexV01(BinWriteIndexV01),
+    BinWriteIndexV03(BinWriteIndexV03),
    Accounting(Accounting),
    AccountingRecv(AccountingRecv),
 }
diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs
index 590142d..5fc15b0 100644
--- a/scywr/src/schema.rs
+++ b/scywr/src/schema.rs
@@ -636,18 +636,17 @@ async fn migrate_scylla_data_schema(
        let tab = GenTwcsTab::new(
            ks,
            rett.table_prefix(),
-            "bin_write_index_v01",
+            "bin_write_index_v03",
            &[
                ("series", "bigint"),
-                ("dv1", "int"),
-                ("dv2", "int"),
-                ("quo", "bigint"),
-                ("rem", "int"),
-                ("rt", "int"),
+                ("pbp", "smallint"),
+                ("msp", "int"),
+                ("rt", "smallint"),
+                ("lsp", "int"),
                ("binlen", "int"),
            ],
-            ["series", "dv1", "dv2", "quo"],
-            ["rem", "rt", "binlen"],
+            ["series", "pbp", "msp"],
+            ["rt", "lsp", "binlen"],
            rett.ttl_binned(),
        );
        tab.setup(chs, scy).await?;
@@ -711,6 +710,18 @@ async fn migrate_scylla_data_schema(
        );
        tab.setup(chs, scy).await?;
    }
+    {
+        let tn = format!("{}{}", rett.table_prefix(), "bin_write_index_v00");
+        if has_table(&tn, scy).await? {
+            chs.add_todo(format!("drop table {}.{}", ks, tn));
+        }
+    }
+    {
+        let tn = format!("{}{}", rett.table_prefix(), "bin_write_index_v01");
+        if has_table(&tn, scy).await? {
+            chs.add_todo(format!("drop table {}.{}", ks, tn));
+        }
+    }
    Ok(())
 }
 
diff --git a/scywr/src/store.rs b/scywr/src/store.rs
index 6acfb6b..dc0ad8b 100644
--- a/scywr/src/store.rs
+++ b/scywr/src/store.rs
@@ -45,7 +45,7 @@ pub struct DataStore {
    pub qu_insert_array_f64: Arc,
    pub qu_insert_array_bool: Arc,
    pub qu_insert_binned_scalar_f32_v02: Arc,
-    pub qu_insert_bin_write_index_v01: Arc,
+    pub qu_insert_bin_write_index_v03: Arc,
    pub qu_account_00: Arc,
    pub qu_account_recv_00: Arc,
    pub qu_dummy: Arc,
@@ -157,10 +157,10 @@ impl DataStore {
            scy
        );
 
-        let qu_insert_bin_write_index_v01 = prep_qu_ins_c!(
-            "bin_write_index_v01",
-            "series, dv1, dv2, quo, rem, rt, binlen",
-            "?, ?, ?, ?, ?, ?, ?",
+        let qu_insert_bin_write_index_v03 = prep_qu_ins_c!(
+            "bin_write_index_v03",
+            "series, pbp, msp, rt, lsp, binlen",
+            "?, ?, ?, ?, ?, ?",
            rett,
            scy
        );
@@ -219,7 +219,7 @@ impl DataStore {
            qu_insert_array_f64,
            qu_insert_array_bool,
            qu_insert_binned_scalar_f32_v02,
-            qu_insert_bin_write_index_v01,
+            qu_insert_bin_write_index_v03,
            qu_account_00,
            qu_account_recv_00,
            qu_dummy,
diff --git a/serieswriter/src/binwriter.rs b/serieswriter/src/binwriter.rs
index 6595a7d..5ca0fa2 100644
--- a/serieswriter/src/binwriter.rs
+++ b/serieswriter/src/binwriter.rs
@@ -16,7 +16,7 @@ use netpod::Shape;
 use netpod::TsNano;
 use netpod::ttl::RetentionTime;
 use scywr::insertqueues::InsertDeques;
-use scywr::iteminsertqueue::BinWriteIndexV01;
+use scywr::iteminsertqueue::BinWriteIndexV03;
 use scywr::iteminsertqueue::QueryItem;
 use scywr::iteminsertqueue::TimeBinSimpleF32V02;
 use series::ChannelStatusSeriesId;
@@ -84,25 +84,26 @@ impl WriteCntZero {
 }
 
 #[derive(Debug)]
-struct IndexWritten {
-    last: Option<(PrebinnedPartitioning, u64, u32)>,
+enum IndexWritten {
+    None,
+    Last(u32, u32),
 }
 
 impl IndexWritten {
    fn new() -> Self {
-        Self { last: None }
+        IndexWritten::None
    }
 
-    fn should_write(&self, _div: PrebinnedPartitioning, quo: u64, rem: u32) -> bool {
-        if let Some((_div0, quo0, rem0)) = &self.last {
-            *quo0 != quo || *rem0 != rem
+    fn should_write(&self, msp: u32, lsp: u32) -> bool {
+        if let IndexWritten::Last(lmsp, llsp) = self {
+            *lmsp != msp || *llsp != lsp
        } else {
            true
        }
    }
 
-    fn mark_written(&mut self, div: PrebinnedPartitioning, quo: u64, rem: u32) {
-        self.last = Some((div, quo, rem));
+    fn mark_written(&mut self, msp: u32, lsp: u32) {
+        *self = IndexWritten::Last(msp, lsp);
    }
 }
 
@@ -119,14 +120,17 @@ pub struct BinWriter {
        BinnedEventsTimeweight,
        WriteCntZero,
        PrebinnedPartitioning,
+        IndexWritten,
+        Option,
    )>,
    binner_others: Vec<(
        RetentionTime,
        BinnedBinsTimeweight,
        WriteCntZero,
        PrebinnedPartitioning,
+        IndexWritten,
+        Option,
    )>,
-    index_written: IndexWritten,
    trd: bool,
 }
 
@@ -151,13 +155,13 @@ impl BinWriter {
        let quiets = [min_quiets.st.clone(), min_quiets.mt.clone(), min_quiets.lt.clone()];
        let mut binner_1st = None;
        let mut binner_others = Vec::new();
-        let mut has_monitor = false;
+        let mut has_monitor = None;
        let mut combs: Vec<_> = rts
            .into_iter()
            .zip(quiets.into_iter().map(|x| DtMs::from_ms_u64(x.as_millis() as u64)))
            .inspect(|x| {
                if x.1 <= DUR_ZERO {
-                    has_monitor = true;
+                    has_monitor = Some(x.0.clone());
                }
            })
            .filter(|x| x.1 > DUR_ZERO && x.1 < DUR_MAX)
@@ -181,9 +185,42 @@ impl BinWriter {
                    combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable));
                }
            }
+        } else {
+            match &has_monitor {
+                Some(RetentionTime::Short) => {
+                    combs.push((RetentionTime::Short, PrebinnedPartitioning::Min1, WriteCntZero::Disable));
+                    combs.push((
+                        RetentionTime::Medium,
+                        PrebinnedPartitioning::Hour1,
+                        WriteCntZero::Disable,
+                    ));
+                    combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable));
+                }
+                Some(RetentionTime::Medium) => {
+                    combs.push((RetentionTime::Short, PrebinnedPartitioning::Min1, WriteCntZero::Disable));
+                    combs.push((
+                        RetentionTime::Medium,
+                        PrebinnedPartitioning::Hour1,
+                        WriteCntZero::Disable,
+                    ));
+                    combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable));
+                }
+                Some(RetentionTime::Long) => {
+                    combs.push((
+                        RetentionTime::Medium,
+                        PrebinnedPartitioning::Min1,
+                        WriteCntZero::Disable,
+                    ));
+                    combs.push((RetentionTime::Long, PrebinnedPartitioning::Hour1, WriteCntZero::Disable));
+                    combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable));
+                }
+                None => {
+                    combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable));
+                }
+            }
        }
        debug_init!(trd, "combs B {:?}", combs);
-        if !is_polled && !has_monitor && combs.len() > 1 {
+        if combs.len() > 1 && has_monitor.is_none() && is_polled {
            combs.remove(0);
        }
        let combs = combs;
@@ -196,15 +233,24 @@ impl BinWriter {
            if let WriteCntZero::Enable = write_zero {
                binner.cnt_zero_enable();
            }
-            binner_1st = Some((rt, binner, write_zero, pbp));
+            let iw2 = if pbp.uses_index_min10() {
+                Some(IndexWritten::new())
+            } else {
+                None
+            };
+            binner_1st = Some((rt, binner, write_zero, pbp, IndexWritten::new(), iw2));
        } else {
            let range = BinnedRange::from_beg_to_inf(beg, pbp.bin_len());
-            let binner = BinnedBinsTimeweight::new(range);
+            let mut binner = BinnedBinsTimeweight::new(range);
            if let WriteCntZero::Enable = write_zero {
-                // TODO
-                // binner.cnt_zero_enable();
+                binner.cnt_zero_enable();
            }
-            binner_others.push((rt, binner, write_zero, pbp));
+            let iw2 = if pbp.uses_index_min10() {
+                Some(IndexWritten::new())
+            } else {
+                None
+            };
+            binner_others.push((rt, binner, write_zero, pbp, IndexWritten::new(), iw2));
        }
    }
    let ret = Self {
@@ -216,7 +262,6 @@ impl BinWriter {
            evbuf: ContainerEvents::new(),
            binner_1st,
            binner_others,
-            index_written: IndexWritten::new(),
            trd,
        };
        let _ = ret.cssid;
@@ -278,6 +323,8 @@ impl BinWriter {
            let write_zero = ee.2.clone();
            let binner = &mut ee.1;
            let pbp = ee.3.clone();
+            let index_written = &mut ee.4;
+            let iw2 = &mut ee.5;
            // TODO avoid boxing
            let bufbox = Box::new(buf);
            use items_0::timebin::IngestReport;
@@ -297,18 +344,20 @@ impl BinWriter {
                trace_bin!(self.trd, "binner_1st out len {}", bins.len());
                Self::handle_output_ready(
                    self.trd,
+                    true,
                    self.sid,
                    rt,
                    &bins,
                    write_zero,
-                    &mut self.index_written,
+                    index_written,
+                    iw2,
                    pbp,
                    iqdqs,
                )?;
                // TODO avoid boxing
                let mut bins2: BinsBoxed = Box::new(bins);
                for i in 0..self.binner_others.len() {
-                    let (rt, binner, write_zero, pbp) = &mut self.binner_others[i];
+                    let (rt, binner, write_zero, pbp, index_written, iw2) = &mut self.binner_others[i];
                    let write_zero = write_zero.clone();
                    binner.ingest(&bins2)?;
                    let bb: Option = binner.output()?;
@@ -319,11 +368,13 @@ impl BinWriter {
                    if let Some(bb2) = bb.as_any_ref().downcast_ref::>() {
                        Self::handle_output_ready(
                            self.trd,
+                            false,
                            self.sid,
                            rt.clone(),
                            &bb2,
                            write_zero,
-                            &mut self.index_written,
+                            index_written,
+                            iw2,
                            pbp.clone(),
                            iqdqs,
                        )?;
@@ -348,17 +399,21 @@ impl BinWriter {
                Ok(())
            }
        } else {
+            // TODO should rather make the top-level binner non-optional
+            self.evbuf.clear();
            Ok(())
        }
    }
 
    fn handle_output_ready(
        trd: bool,
+        is_from_events: bool,
        series: SeriesId,
        rt: RetentionTime,
        bins: &ContainerBins,
        write_zero: WriteCntZero,
-        index_written: &mut IndexWritten,
+        iw1: &mut IndexWritten,
+        iw2: &mut Option,
        pbp: PrebinnedPartitioning,
        iqdqs: &mut InsertDeques,
    ) -> Result<(), Error> {
@@ -373,71 +428,91 @@ impl BinWriter {
        if fnl == false {
            info!("non final bin {:?}", series);
        } else if cnt == 0 && !write_zero.enabled() {
-            info!("zero count bin {:?}", series);
+            if is_from_events {
+                info!("zero count bin from events {:?}", series);
+            } else {
+                info!("zero count bin from bins {:?}", series);
+            }
        } else {
            if bin_len != pbp.bin_len() {
                let e = Error::UnexpectedBinLen(bin_len, pbp);
                return Err(e);
            }
-            let div = pbp.msp_div();
-            if div.ns() % bin_len.ns() != 0 {
-                let e = Error::UnsupportedGridDiv(bin_len, div);
-                return Err(e);
-            }
-            let msp = ts1.ms() / div.ms();
-            let off = (ts1.ms() - div.ms() * msp) / bin_len.ms();
-            let item = QueryItem::TimeBinSimpleF32V02(TimeBinSimpleF32V02 {
-                series,
-                binlen: bin_len.ms() as i32,
-                msp: msp as i64,
-                off: off as i32,
-                cnt: cnt as i64,
-                min,
-                max,
-                avg,
-                dev: f32::NAN,
-                lst,
-            });
-            if true || bin_len >= DtMs::from_ms_u64(1000 * 60 * 60) {
-                debug_bin!(trd, "handle_output_ready emit {:?} len {} {:?}", rt, bins_len, item);
-            }
-            match rt {
-                RetentionTime::Short => {
-                    iqdqs.st_rf3_qu.push_back(item);
+            {
+                let (msp, lsp) = pbp.msp_lsp(ts1.to_ts_ms());
+                let item = QueryItem::TimeBinSimpleF32V02(TimeBinSimpleF32V02 {
+                    series,
+                    binlen: bin_len.ms() as i32,
+                    msp: msp as i64,
+                    off: lsp as i32,
+                    cnt: cnt as i64,
+                    min,
+                    max,
+                    avg,
+                    dev: f32::NAN,
+                    lst,
+                });
+                if true || bin_len >= DtMs::from_ms_u64(1000 * 60 * 60) {
+                    debug_bin!(trd, "handle_output_ready emit {:?} len {} {:?}", rt, bins_len, item);
                }
-                RetentionTime::Medium => {
-                    iqdqs.mt_rf3_qu.push_back(item);
-                }
-                RetentionTime::Long => {
-                    iqdqs.lt_rf3_qu.push_back(item);
+                let qu = iqdqs.deque(rt.clone());
+                qu.push_back(item);
+            }
+            if pbp.uses_index_min10() {
+                let pbp_ix = PrebinnedPartitioning::Min10;
+                let (msp, lsp) = pbp_ix.msp_lsp(ts1.to_ts_ms());
+                debug_bin!(
+                    trd,
+                    "handle_output_ready index {:?} {:?} {:?} {:?} {:?} {:?}",
+                    series,
+                    pbp_ix,
+                    pbp,
+                    rt,
+                    msp,
+                    lsp
+                );
+                let iw = iw2.as_mut().unwrap();
+                if iw.should_write(msp, lsp) {
+                    iw.mark_written(msp, lsp);
+                    let item = BinWriteIndexV03 {
+                        series: series.id() as i64,
+                        pbp: pbp_ix.db_ix() as i16,
+                        msp: msp as i32,
+                        rt: rt.index_db_i32() as i16,
+                        lsp: lsp as i32,
+                        binlen: pbp.bin_len().ms() as i32,
+                    };
+                    let item = QueryItem::BinWriteIndexV03(item);
+                    iqdqs.deque(rt.clone()).push_back(item);
                }
            }
-            let div = PrebinnedPartitioning::Day1;
-            let (quo, rem, dv1, dv2) = div.quo_rem(ts1.to_ts_ms());
-            if index_written.should_write(div.clone(), quo, rem) {
-                index_written.mark_written(div.clone(), quo, rem);
-                let item = BinWriteIndexV01 {
-                    series: series.id() as i64,
-                    dv1: dv1 as i32,
-                    dv2: dv2 as i32,
-                    quo: quo as i64,
-                    rem: rem as i32,
-                    rt: rt.index_db_i32(),
-                    binlen: pbp.bin_len().ms() as i32,
-                };
-                let item = QueryItem::BinWriteIndexV01(item);
-                match rt {
-                    RetentionTime::Short => {
-                        iqdqs.st_rf3_qu.push_back(item);
-                    }
-                    RetentionTime::Medium => {
-                        iqdqs.mt_rf3_qu.push_back(item);
-                    }
-                    RetentionTime::Long => {
-                        iqdqs.lt_rf3_qu.push_back(item);
-                    }
+            if true {
+                let pbp_ix = PrebinnedPartitioning::Day1;
+                let (msp, lsp) = pbp_ix.msp_lsp(ts1.to_ts_ms());
+                debug_bin!(
+                    trd,
+                    "handle_output_ready index {:?} {:?} {:?} {:?} {:?} {:?}",
+                    series,
+                    pbp_ix,
+                    pbp,
+                    rt,
+                    msp,
+                    lsp
+                );
+                // let iw = iw1;
+                if iw1.should_write(msp, lsp) {
+                    iw1.mark_written(msp, lsp);
+                    let item = BinWriteIndexV03 {
+                        series: series.id() as i64,
+                        pbp: pbp_ix.db_ix() as i16,
+                        msp: msp as i32,
+                        rt: rt.index_db_i32() as i16,
+                        lsp: lsp as i32,
+                        binlen: pbp.bin_len().ms() as i32,
+                    };
+                    let item = QueryItem::BinWriteIndexV03(item);
+                    iqdqs.deque(rt.clone()).push_back(item);
                }
-            } else {
            }
        }
    }