From 0bb299c2b1368748c4b2d1f3ffbe00dac3375483 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 27 Jun 2024 15:30:40 +0200 Subject: [PATCH] Generate net recv accounting data --- daqingest/Cargo.toml | 2 +- daqingest/src/bin/daqingest.rs | 15 +++++++++++- daqingest/src/opts.rs | 1 + netfetch/src/ca/conn.rs | 26 ++++++++++++++++++-- netfetch/src/ca/connset.rs | 5 ++-- scywr/src/insertqueues.rs | 9 +++++++ scywr/src/insertworker.rs | 44 +++++++++++++++++++++++++++------- scywr/src/iteminsertqueue.rs | 10 ++++++++ scywr/src/schema.rs | 18 ++++++++++++++ scywr/src/store.rs | 10 ++++++++ series/src/lib.rs | 10 ++++++++ 11 files changed, 135 insertions(+), 15 deletions(-) diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index b060e93..1313ae3 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.1" +version = "0.2.2-aa.0" authors = ["Dominik Werder "] edition = "2021" diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 49b7c3d..06d82a0 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -30,6 +30,7 @@ async fn main_run(opts: DaqIngestOpts) -> Result<(), Error> { } async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { + let buildmark = "+0006"; use daqingest::opts::ChannelAccess; use daqingest::opts::SubCmd; match opts.subcmd { @@ -89,7 +90,7 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { netfetch::ca::search::ca_search(conf, &channels).await? } ChannelAccess::CaIngest(k) => { - info!("daqingest version {} +0004", clap::crate_version!()); + info!("daqingest version {} {}", clap::crate_version!(), buildmark); let (conf, channels_config) = parse_config(k.config.into()).await?; daqingest::daemon::run(conf, channels_config).await? } @@ -106,6 +107,18 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { SubCmd::Version => { println!("{}", clap::crate_version!()); } + SubCmd::LogTest => { + info!("log-test"); + warn!("log-test"); + error!("log-test"); + debug!("log-test"); + trace!("log-test"); + series::log_test(); + let _spg = tracing::span!(tracing::Level::INFO, "log_span_debug"); + _spg.in_scope(|| { + series::log_test(); + }) + } } Ok(()) } diff --git a/daqingest/src/opts.rs b/daqingest/src/opts.rs index 7bf9f9d..9a53288 100644 --- a/daqingest/src/opts.rs +++ b/daqingest/src/opts.rs @@ -30,6 +30,7 @@ pub enum SubCmd { #[cfg(feature = "bsread")] BsreadDump(BsreadDump), Version, + LogTest, } #[derive(Debug, clap::Parser)] diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 6857c99..eaeeb2c 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -36,6 +36,7 @@ use scywr::insertqueues::InsertQueuesTx; use scywr::insertqueues::InsertSenderPolling; use scywr::iteminsertqueue as scywriiq; use scywr::iteminsertqueue::Accounting; +use scywr::iteminsertqueue::AccountingRecv; use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::ShutdownReason; @@ -406,6 +407,7 @@ struct CreatedState { stwin_ts: u64, stwin_count: u32, stwin_bytes: u32, + acc_recv: AccountingInfo, acc_st: AccountingInfo, acc_mt: AccountingInfo, acc_lt: AccountingInfo, @@ -442,6 +444,7 @@ impl CreatedState { stwin_ts: 0, stwin_count: 0, stwin_bytes: 0, + acc_recv: AccountingInfo::new(acc_msp), acc_st: AccountingInfo::new(acc_msp), acc_mt: AccountingInfo::new(acc_msp), acc_lt: AccountingInfo::new(acc_msp), @@ -1689,6 +1692,7 @@ impl CaConn { crst.item_recv_ivl_ema.tick(tsnow); crst.recv_count += 1; crst.recv_bytes += payload_len as u64; + crst.acc_recv.push_written(payload_len); // TODO should attach these counters already to Writable state. let ts_local = { let epoch = stnow.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO); @@ -2239,6 +2243,7 @@ impl CaConn { stwin_ts: 0, stwin_count: 0, stwin_bytes: 0, + acc_recv: AccountingInfo::new(acc_msp), acc_st: AccountingInfo::new(acc_msp), acc_mt: AccountingInfo::new(acc_msp), acc_lt: AccountingInfo::new(acc_msp), @@ -2524,10 +2529,27 @@ impl CaConn { count: acc.usage().count() as _, bytes: acc.usage().bytes() as _, }; - //info!("EMIT ITEM {rt:?} {item:?}"); + trace!("EMIT ITEM {rt:?} {item:?}"); self.iqdqs.emit_accounting_item(rt, item)?; - acc.reset(msp); } + acc.reset(msp); + } + } + { + let acc = &mut ch.acc_recv; + if acc.beg != msp { + if acc.usage().count() != 0 { + let series = st1.writer.sid(); + let item = AccountingRecv { + part: (series.id() & 0xff) as i32, + ts: acc.beg, + series, + count: acc.usage().count() as _, + bytes: acc.usage().bytes() as _, + }; + self.iqdqs.emit_accounting_recv(item)?; + } + acc.reset(msp); } } } diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 412a617..cebda2c 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -1511,8 +1511,9 @@ impl CaConnSet { } Ready(Err(e)) => match e { scywr::senderpolling::Error::NoSendInProgress => { - error!("try_push_ca_conn_cmds {e}"); - return Err(Error::with_msg_no_trace(format!("{e}"))); + let e = Error::with_msg_no_trace(format!("try_push_ca_conn_cmds E-A {addr} {e}")); + error!("{e}"); + return Err(e); } scywr::senderpolling::Error::Closed(_) => { // TODO diff --git a/scywr/src/insertqueues.rs b/scywr/src/insertqueues.rs index 692f149..92d964e 100644 --- a/scywr/src/insertqueues.rs +++ b/scywr/src/insertqueues.rs @@ -1,4 +1,5 @@ use crate::iteminsertqueue::Accounting; +use crate::iteminsertqueue::AccountingRecv; use crate::iteminsertqueue::QueryItem; use crate::senderpolling::SenderPolling; use async_channel::Receiver; @@ -152,6 +153,14 @@ impl InsertDeques { Ok(()) } + // Should be used only for connection and channel status items. + // It encapsulates the decision to which queue(s) we want to send these kind of items. + pub fn emit_accounting_recv(&mut self, item: AccountingRecv) -> Result<(), Error> { + self.deque(RetentionTime::Short) + .push_back(QueryItem::AccountingRecv(item)); + Ok(()) + } + pub fn deque(&mut self, rt: RetentionTime) -> &mut VecDeque { match rt { RetentionTime::Short => &mut self.st_rf3_rx, diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index f895b5c..a6ba258 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -1,11 +1,10 @@ use crate::config::ScyllaIngestConfig; -use crate::iteminsertqueue::insert_channel_status; use crate::iteminsertqueue::insert_channel_status_fut; -use crate::iteminsertqueue::insert_connection_status; use crate::iteminsertqueue::insert_connection_status_fut; use crate::iteminsertqueue::insert_item_fut; use crate::iteminsertqueue::insert_msp_fut; use crate::iteminsertqueue::Accounting; +use crate::iteminsertqueue::AccountingRecv; use crate::iteminsertqueue::InsertFut; use crate::iteminsertqueue::InsertItem; use crate::iteminsertqueue::QueryItem; @@ -13,7 +12,6 @@ use crate::iteminsertqueue::TimeBinSimpleF32; use crate::store::DataStore; use async_channel::Receiver; use atomic::AtomicU64; -use atomic::Ordering; use err::Error; use futures_util::Stream; use futures_util::StreamExt; @@ -97,6 +95,7 @@ fn stats_inc_for_err(stats: &stats::InsertWorkerStats, err: &crate::iteminsertqu } } +#[allow(unused)] fn back_off_next(backoff_dt: &mut Duration) { *backoff_dt = *backoff_dt + (*backoff_dt) * 3 / 2; let dtmax = Duration::from_millis(4000); @@ -105,6 +104,7 @@ fn back_off_next(backoff_dt: &mut Duration) { } } +#[allow(unused)] async fn back_off_sleep(backoff_dt: &mut Duration) { back_off_next(backoff_dt); tokio::time::sleep(*backoff_dt).await; @@ -285,6 +285,9 @@ where } QueryItem::TimeBinSimpleF32(item) => prepare_timebin_insert_futs(item, &data_store, &stats, tsnow), QueryItem::Accounting(item) => prepare_accounting_insert_futs(item, &data_store, &stats, tsnow), + QueryItem::AccountingRecv(item) => { + prepare_accounting_recv_insert_futs(item, &data_store, &stats, tsnow) + } }; trace!("prepared futs len {}", futs.len()); res.extend(futs.into_iter()); @@ -315,12 +318,11 @@ fn inspect_items( QueryItem::TimeBinSimpleF32(_) => { trace_item_execute!("execute {worker_name} TimeBinSimpleF32"); } - QueryItem::Accounting(x) => { - if x.series.id() & 0x7f == 200 { - debug!("execute {worker_name} Accounting {item:?}"); - } else { - trace_item_execute!("execute {worker_name} Accounting {item:?}"); - } + QueryItem::Accounting(_) => { + trace_item_execute!("execute {worker_name} Accounting {item:?}"); + } + QueryItem::AccountingRecv(_) => { + trace_item_execute!("execute {worker_name} Accounting {item:?}"); } } } @@ -425,3 +427,27 @@ fn prepare_accounting_insert_futs( let futs = smallvec![fut]; futs } + +fn prepare_accounting_recv_insert_futs( + item: AccountingRecv, + data_store: &Arc, + stats: &Arc, + tsnow: TsMs, +) -> SmallVec<[InsertFut; 4]> { + let params = ( + item.part, + item.ts.sec() as i64, + item.series.id() as i64, + item.count, + item.bytes, + ); + let fut = InsertFut::new( + data_store.scy.clone(), + data_store.qu_account_recv_00.clone(), + params, + tsnow, + stats.clone(), + ); + let futs = smallvec![fut]; + futs +} diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index c632b2c..a26a970 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -640,6 +640,7 @@ pub enum QueryItem { Insert(InsertItem), TimeBinSimpleF32(TimeBinSimpleF32), Accounting(Accounting), + AccountingRecv(AccountingRecv), } #[derive(Debug, Clone)] @@ -651,6 +652,15 @@ pub struct Accounting { pub bytes: i64, } +#[derive(Debug, Clone)] +pub struct AccountingRecv { + pub part: i32, + pub ts: TsMs, + pub series: SeriesId, + pub count: i64, + pub bytes: i64, +} + struct InsParCom { series: SeriesId, ts_msp: TsMs, diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index 2e7bcff..cd36bca 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -579,5 +579,23 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete ); tab.setup(scy).await?; } + { + let tab = GenTwcsTab::new( + ks, + rett.table_prefix(), + "account_recv_00", + &[ + ("part", "int"), + ("ts", "bigint"), + ("series", "bigint"), + ("count", "bigint"), + ("bytes", "bigint"), + ], + ["part", "ts"], + ["series"], + rett.ttl_channel_status(), + ); + tab.setup(scy).await?; + } Ok(()) } diff --git a/scywr/src/store.rs b/scywr/src/store.rs index 7646059..467aded 100644 --- a/scywr/src/store.rs +++ b/scywr/src/store.rs @@ -48,6 +48,7 @@ pub struct DataStore { pub qu_insert_channel_status_by_ts_msp: Arc, pub qu_insert_binned_scalar_f32_v02: Arc, pub qu_account_00: Arc, + pub qu_account_recv_00: Arc, pub qu_dummy: Arc, } @@ -174,6 +175,14 @@ impl DataStore { scy ); + let qu_account_recv_00 = prep_qu_ins_c!( + "account_recv_00", + "part, ts, series, count, bytes", + "?, ?, ?, ?, ?", + rett, + scy + ); + let q = scy .prepare(format!( concat!("select * from {}{} limit 1"), @@ -215,6 +224,7 @@ impl DataStore { qu_insert_channel_status_by_ts_msp, qu_insert_binned_scalar_f32_v02, qu_account_00, + qu_account_recv_00, qu_dummy, }; Ok(ret) diff --git a/series/src/lib.rs b/series/src/lib.rs index d82fc15..fe705a3 100644 --- a/series/src/lib.rs +++ b/series/src/lib.rs @@ -2,3 +2,13 @@ pub mod series; pub use series::ChannelStatusSeriesId; pub use series::SeriesId; + +use log::*; + +pub fn log_test() { + info!("log-test"); + warn!("log-test"); + error!("log-test"); + debug!("log-test"); + trace!("log-test"); +}