Generate net recv accounting data

This commit is contained in:
Dominik Werder
2024-06-27 15:30:40 +02:00
parent d3ee2417a3
commit 0bb299c2b1
11 changed files with 135 additions and 15 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.2.1"
version = "0.2.2-aa.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -30,6 +30,7 @@ async fn main_run(opts: DaqIngestOpts) -> Result<(), Error> {
}
async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
let buildmark = "+0006";
use daqingest::opts::ChannelAccess;
use daqingest::opts::SubCmd;
match opts.subcmd {
@@ -89,7 +90,7 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
netfetch::ca::search::ca_search(conf, &channels).await?
}
ChannelAccess::CaIngest(k) => {
info!("daqingest version {} +0004", clap::crate_version!());
info!("daqingest version {} {}", clap::crate_version!(), buildmark);
let (conf, channels_config) = parse_config(k.config.into()).await?;
daqingest::daemon::run(conf, channels_config).await?
}
@@ -106,6 +107,18 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
SubCmd::Version => {
println!("{}", clap::crate_version!());
}
SubCmd::LogTest => {
info!("log-test");
warn!("log-test");
error!("log-test");
debug!("log-test");
trace!("log-test");
series::log_test();
let _spg = tracing::span!(tracing::Level::INFO, "log_span_debug");
_spg.in_scope(|| {
series::log_test();
})
}
}
Ok(())
}

View File

@@ -30,6 +30,7 @@ pub enum SubCmd {
#[cfg(feature = "bsread")]
BsreadDump(BsreadDump),
Version,
LogTest,
}
#[derive(Debug, clap::Parser)]

View File

@@ -36,6 +36,7 @@ use scywr::insertqueues::InsertQueuesTx;
use scywr::insertqueues::InsertSenderPolling;
use scywr::iteminsertqueue as scywriiq;
use scywr::iteminsertqueue::Accounting;
use scywr::iteminsertqueue::AccountingRecv;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::ShutdownReason;
@@ -406,6 +407,7 @@ struct CreatedState {
stwin_ts: u64,
stwin_count: u32,
stwin_bytes: u32,
acc_recv: AccountingInfo,
acc_st: AccountingInfo,
acc_mt: AccountingInfo,
acc_lt: AccountingInfo,
@@ -442,6 +444,7 @@ impl CreatedState {
stwin_ts: 0,
stwin_count: 0,
stwin_bytes: 0,
acc_recv: AccountingInfo::new(acc_msp),
acc_st: AccountingInfo::new(acc_msp),
acc_mt: AccountingInfo::new(acc_msp),
acc_lt: AccountingInfo::new(acc_msp),
@@ -1689,6 +1692,7 @@ impl CaConn {
crst.item_recv_ivl_ema.tick(tsnow);
crst.recv_count += 1;
crst.recv_bytes += payload_len as u64;
crst.acc_recv.push_written(payload_len);
// TODO should attach these counters already to Writable state.
let ts_local = {
let epoch = stnow.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO);
@@ -2239,6 +2243,7 @@ impl CaConn {
stwin_ts: 0,
stwin_count: 0,
stwin_bytes: 0,
acc_recv: AccountingInfo::new(acc_msp),
acc_st: AccountingInfo::new(acc_msp),
acc_mt: AccountingInfo::new(acc_msp),
acc_lt: AccountingInfo::new(acc_msp),
@@ -2524,10 +2529,27 @@ impl CaConn {
count: acc.usage().count() as _,
bytes: acc.usage().bytes() as _,
};
//info!("EMIT ITEM {rt:?} {item:?}");
trace!("EMIT ITEM {rt:?} {item:?}");
self.iqdqs.emit_accounting_item(rt, item)?;
acc.reset(msp);
}
acc.reset(msp);
}
}
{
let acc = &mut ch.acc_recv;
if acc.beg != msp {
if acc.usage().count() != 0 {
let series = st1.writer.sid();
let item = AccountingRecv {
part: (series.id() & 0xff) as i32,
ts: acc.beg,
series,
count: acc.usage().count() as _,
bytes: acc.usage().bytes() as _,
};
self.iqdqs.emit_accounting_recv(item)?;
}
acc.reset(msp);
}
}
}

View File

@@ -1511,8 +1511,9 @@ impl CaConnSet {
}
Ready(Err(e)) => match e {
scywr::senderpolling::Error::NoSendInProgress => {
error!("try_push_ca_conn_cmds {e}");
return Err(Error::with_msg_no_trace(format!("{e}")));
let e = Error::with_msg_no_trace(format!("try_push_ca_conn_cmds E-A {addr} {e}"));
error!("{e}");
return Err(e);
}
scywr::senderpolling::Error::Closed(_) => {
// TODO

View File

@@ -1,4 +1,5 @@
use crate::iteminsertqueue::Accounting;
use crate::iteminsertqueue::AccountingRecv;
use crate::iteminsertqueue::QueryItem;
use crate::senderpolling::SenderPolling;
use async_channel::Receiver;
@@ -152,6 +153,14 @@ impl InsertDeques {
Ok(())
}
// Emits a receive-accounting item into the insert queues.
// It encapsulates the decision to which queue(s) we want to send this kind of item.
/// Enqueue a receive-accounting item for insertion.
///
/// Receive accounting is routed only to the short-retention queue.
pub fn emit_accounting_recv(&mut self, item: AccountingRecv) -> Result<(), Error> {
    let queue = self.deque(RetentionTime::Short);
    queue.push_back(QueryItem::AccountingRecv(item));
    Ok(())
}
pub fn deque(&mut self, rt: RetentionTime) -> &mut VecDeque<QueryItem> {
match rt {
RetentionTime::Short => &mut self.st_rf3_rx,

View File

@@ -1,11 +1,10 @@
use crate::config::ScyllaIngestConfig;
use crate::iteminsertqueue::insert_channel_status;
use crate::iteminsertqueue::insert_channel_status_fut;
use crate::iteminsertqueue::insert_connection_status;
use crate::iteminsertqueue::insert_connection_status_fut;
use crate::iteminsertqueue::insert_item_fut;
use crate::iteminsertqueue::insert_msp_fut;
use crate::iteminsertqueue::Accounting;
use crate::iteminsertqueue::AccountingRecv;
use crate::iteminsertqueue::InsertFut;
use crate::iteminsertqueue::InsertItem;
use crate::iteminsertqueue::QueryItem;
@@ -13,7 +12,6 @@ use crate::iteminsertqueue::TimeBinSimpleF32;
use crate::store::DataStore;
use async_channel::Receiver;
use atomic::AtomicU64;
use atomic::Ordering;
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
@@ -97,6 +95,7 @@ fn stats_inc_for_err(stats: &stats::InsertWorkerStats, err: &crate::iteminsertqu
}
}
#[allow(unused)]
fn back_off_next(backoff_dt: &mut Duration) {
*backoff_dt = *backoff_dt + (*backoff_dt) * 3 / 2;
let dtmax = Duration::from_millis(4000);
@@ -105,6 +104,7 @@ fn back_off_next(backoff_dt: &mut Duration) {
}
}
#[allow(unused)]
async fn back_off_sleep(backoff_dt: &mut Duration) {
back_off_next(backoff_dt);
tokio::time::sleep(*backoff_dt).await;
@@ -285,6 +285,9 @@ where
}
QueryItem::TimeBinSimpleF32(item) => prepare_timebin_insert_futs(item, &data_store, &stats, tsnow),
QueryItem::Accounting(item) => prepare_accounting_insert_futs(item, &data_store, &stats, tsnow),
QueryItem::AccountingRecv(item) => {
prepare_accounting_recv_insert_futs(item, &data_store, &stats, tsnow)
}
};
trace!("prepared futs len {}", futs.len());
res.extend(futs.into_iter());
@@ -315,12 +318,11 @@ fn inspect_items(
QueryItem::TimeBinSimpleF32(_) => {
trace_item_execute!("execute {worker_name} TimeBinSimpleF32");
}
QueryItem::Accounting(x) => {
if x.series.id() & 0x7f == 200 {
debug!("execute {worker_name} Accounting {item:?}");
} else {
trace_item_execute!("execute {worker_name} Accounting {item:?}");
}
QueryItem::Accounting(_) => {
trace_item_execute!("execute {worker_name} Accounting {item:?}");
}
QueryItem::AccountingRecv(_) => {
trace_item_execute!("execute {worker_name} Accounting {item:?}");
}
}
}
@@ -425,3 +427,27 @@ fn prepare_accounting_insert_futs(
let futs = smallvec![fut];
futs
}
fn prepare_accounting_recv_insert_futs(
item: AccountingRecv,
data_store: &Arc<DataStore>,
stats: &Arc<InsertWorkerStats>,
tsnow: TsMs,
) -> SmallVec<[InsertFut; 4]> {
let params = (
item.part,
item.ts.sec() as i64,
item.series.id() as i64,
item.count,
item.bytes,
);
let fut = InsertFut::new(
data_store.scy.clone(),
data_store.qu_account_recv_00.clone(),
params,
tsnow,
stats.clone(),
);
let futs = smallvec![fut];
futs
}

View File

@@ -640,6 +640,7 @@ pub enum QueryItem {
Insert(InsertItem),
TimeBinSimpleF32(TimeBinSimpleF32),
Accounting(Accounting),
AccountingRecv(AccountingRecv),
}
#[derive(Debug, Clone)]
@@ -651,6 +652,15 @@ pub struct Accounting {
pub bytes: i64,
}
/// Per-series accounting record for received payload data,
/// inserted into the `account_recv_00` table.
#[derive(Debug, Clone)]
pub struct AccountingRecv {
    /// Partition key; derived from the low byte of the series id at the emit site.
    pub part: i32,
    /// Timestamp of the accounting window — presumably the window begin
    /// (`acc.beg` at the emit site); confirm against the accounting reset logic.
    pub ts: TsMs,
    /// Series this record accounts for.
    pub series: SeriesId,
    /// Number of received items counted in the window.
    pub count: i64,
    /// Number of received payload bytes counted in the window.
    pub bytes: i64,
}
struct InsParCom {
series: SeriesId,
ts_msp: TsMs,

View File

@@ -579,5 +579,23 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete
);
tab.setup(scy).await?;
}
{
let tab = GenTwcsTab::new(
ks,
rett.table_prefix(),
"account_recv_00",
&[
("part", "int"),
("ts", "bigint"),
("series", "bigint"),
("count", "bigint"),
("bytes", "bigint"),
],
["part", "ts"],
["series"],
rett.ttl_channel_status(),
);
tab.setup(scy).await?;
}
Ok(())
}

View File

@@ -48,6 +48,7 @@ pub struct DataStore {
pub qu_insert_channel_status_by_ts_msp: Arc<PreparedStatement>,
pub qu_insert_binned_scalar_f32_v02: Arc<PreparedStatement>,
pub qu_account_00: Arc<PreparedStatement>,
pub qu_account_recv_00: Arc<PreparedStatement>,
pub qu_dummy: Arc<PreparedStatement>,
}
@@ -174,6 +175,14 @@ impl DataStore {
scy
);
let qu_account_recv_00 = prep_qu_ins_c!(
"account_recv_00",
"part, ts, series, count, bytes",
"?, ?, ?, ?, ?",
rett,
scy
);
let q = scy
.prepare(format!(
concat!("select * from {}{} limit 1"),
@@ -215,6 +224,7 @@ impl DataStore {
qu_insert_channel_status_by_ts_msp,
qu_insert_binned_scalar_f32_v02,
qu_account_00,
qu_account_recv_00,
qu_dummy,
};
Ok(ret)

View File

@@ -2,3 +2,13 @@ pub mod series;
pub use series::ChannelStatusSeriesId;
pub use series::SeriesId;
use log::*;
pub fn log_test() {
info!("log-test");
warn!("log-test");
error!("log-test");
debug!("log-test");
trace!("log-test");
}