Emit accounting data

This commit is contained in:
Dominik Werder
2024-01-31 14:27:54 +01:00
parent 1e39a9c2ac
commit 1e4d544fc0
10 changed files with 182 additions and 56 deletions

View File

@@ -75,11 +75,11 @@ create table if not exists series_by_channel (
scalar_type int not null,
shape_dims int[] not null,
agg_kind int not null,
tscreate timestamptz not null default 'now()'
tscreate timestamptz not null default now()
)";
let _ = pgc.execute(sql, &[]).await;
let sql = "alter table series_by_channel add tscreate timestamptz not null default 'now()'";
let sql = "alter table series_by_channel add tscreate timestamptz not null default now()";
let _ = pgc.execute(sql, &[]).await;
if !has_table("ioc_by_channel_log", pgc).await? {

View File

@@ -20,6 +20,7 @@ use netpod::timeunits::*;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
use netpod::EMIT_ACCOUNTING_SNAP;
use proto::CaItem;
use proto::CaMsg;
use proto::CaMsgTy;
@@ -27,6 +28,7 @@ use proto::CaProto;
use proto::CreateChan;
use proto::EventAdd;
use scywr::iteminsertqueue as scywriiq;
use scywr::iteminsertqueue::Accounting;
use scywr::iteminsertqueue::DataValue;
use scywriiq::ChannelInfoItem;
use scywriiq::ChannelStatus;
@@ -310,6 +312,9 @@ struct CreatedState {
stwin_ts: u64,
stwin_count: u32,
stwin_bytes: u32,
account_emit_last: u64,
account_count: u64,
account_bytes: u64,
}
impl CreatedState {
@@ -338,6 +343,9 @@ impl CreatedState {
stwin_ts: 0,
stwin_count: 0,
stwin_bytes: 0,
account_emit_last: 0,
account_count: 0,
account_bytes: 0,
}
}
}
@@ -1620,6 +1628,11 @@ impl CaConn {
let ts_diff = ts.abs_diff(ts_local);
stats.ca_ts_off().ingest((ts_diff / MS) as u32);
if tsnow >= crst.insert_next_earliest {
{
crst.account_count += 1;
// TODO how do we account for bytes? Here, we also add 8 bytes for the timestamp.
crst.account_bytes += 8 + payload_len as u64;
}
{
crst.muted_before = 0;
crst.insert_item_ivl_ema.tick(tsnow);
@@ -1882,23 +1895,13 @@ impl CaConn {
cx.waker().wake_by_ref();
}
CaMsgTy::EventAddRes(ev) => {
trace!("got EventAddRes {:?} cnt {}", camsg.ts, ev.data_count);
trace2!("got EventAddRes {:?} cnt {}", camsg.ts, ev.data_count);
self.stats.event_add_res_recv.inc();
let res = Self::handle_event_add_res(self, ev, tsnow);
let ts2 = Instant::now();
self.stats
.time_handle_event_add_res
.add((ts2.duration_since(tsnow) * MS as u32).as_secs());
res?;
Self::handle_event_add_res(self, ev, tsnow)?
}
CaMsgTy::EventAddResEmpty(ev) => {
trace!("got EventAddResEmpty {:?}", camsg.ts);
let res = Self::handle_event_add_res_empty(self, ev, tsnow);
let ts2 = Instant::now();
self.stats
.time_handle_event_add_res
.add((ts2.duration_since(tsnow) * MS as u32).as_secs());
res?;
trace2!("got EventAddResEmpty {:?}", camsg.ts);
Self::handle_event_add_res_empty(self, ev, tsnow)?
}
CaMsgTy::ReadNotifyRes(ev) => Self::handle_read_notify_res(self, ev, tsnow)?,
CaMsgTy::Echo => {
@@ -2036,6 +2039,9 @@ impl CaConn {
stwin_ts: 0,
stwin_count: 0,
stwin_bytes: 0,
account_emit_last: 0,
account_count: 0,
account_bytes: 0,
};
*ch_s = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { tsbeg: tsnow, channel });
let job = EstablishWorkerJob::new(
@@ -2230,6 +2236,7 @@ impl CaConn {
if self.channel_status_emit_last + Duration::from_millis(3000) <= tsnow {
self.channel_status_emit_last = tsnow;
self.emit_channel_status()?;
self.emit_accounting()?;
}
if self.tick_last_writer + Duration::from_millis(2000) <= tsnow {
self.tick_last_writer = tsnow;
@@ -2276,6 +2283,39 @@ impl CaConn {
Ok(())
}
/// Queue one accounting record per writable channel for the current snap interval.
///
/// The wall-clock time in `self.tmp_ts_poll` is converted to whole seconds since
/// the Unix epoch and rounded down to a multiple of `EMIT_ACCOUNTING_SNAP`.
/// For every channel in the `Writable` state whose `account_emit_last` differs
/// from that snapped timestamp, the timestamp is recorded and, if any events were
/// counted, an `Accounting` item is pushed to `self.insert_item_queue` and the
/// channel's counters are reset to zero.
///
/// Panics if `self.tmp_ts_poll` lies before the Unix epoch.
fn emit_accounting(&mut self) -> Result<(), Error> {
    let secs = self
        .tmp_ts_poll
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_secs();
    // Round down to the start of the current accounting interval.
    let snap = secs - secs % EMIT_ACCOUNTING_SNAP;
    for (_, state) in self.channels.iter_mut() {
        if let ChannelState::Writable(writable) = state {
            let crst = &mut writable.channel;
            // At most one emission per channel and snap interval.
            if crst.account_emit_last == snap {
                continue;
            }
            crst.account_emit_last = snap;
            // Nothing was counted since the last emission: no record needed.
            if crst.account_count == 0 {
                continue;
            }
            // NOTE(review): the id is taken from `cssid`; confirm that this is the
            // series the accounting rows are meant to be keyed on.
            let sid = crst.cssid.id();
            let item = QueryItem::Accounting(Accounting {
                // Low 8 bits of the id select the partition.
                part: (sid & 0xff) as i32,
                ts: snap as i64,
                series: SeriesId::new(sid),
                count: crst.account_count as i64,
                bytes: crst.account_bytes as i64,
            });
            crst.account_count = 0;
            crst.account_bytes = 0;
            self.insert_item_queue.push_back(item);
        }
    }
    Ok(())
}
fn tick_writers(&mut self) -> Result<(), Error> {
for (k, st) in &mut self.channels {
if let ChannelState::Writable(st2) = st {

View File

@@ -3,6 +3,7 @@ use super::findioc::FindIocRes;
use crate::ca::conn;
use crate::ca::statemap;
use crate::ca::statemap::CaConnState;
use crate::ca::statemap::MaybeWrongAddressState;
use crate::ca::statemap::WithAddressState;
use crate::conf::CaIngestOpts;
use crate::daemon_common::Channel;
@@ -321,6 +322,10 @@ impl IocAddrQuery {
}
}
/// Advance a retry back-off counter by one step, capped at 10.
///
/// A saturating add is used so that a counter already at `u32::MAX` is clamped
/// to the cap instead of overflowing: a plain `1 + *x` would panic in debug
/// builds and wrap to 0 in release builds, silently resetting the back-off.
fn bump_backoff(x: &mut u32) {
    *x = x.saturating_add(1).min(10);
}
struct SeriesLookupSender {
tx: Sender<Result<ChannelInfoResult, Error>>,
}
@@ -724,22 +729,13 @@ impl CaConnSet {
if let Some(addr) = res.addr {
self.stats.ioc_addr_found().inc();
trace!("ioc found {res:?}");
if false {
let since = SystemTime::now();
st2.addr_find_backoff = 0;
st2.inner = WithStatusSeriesIdStateInner::WithAddress {
addr,
state: WithAddressState::Unassigned { since },
};
} else {
let cmd = ChannelAddWithAddr {
backend: self.backend.clone(),
name: res.channel,
addr: SocketAddr::V4(addr),
cssid: st2.cssid.clone(),
};
self.handle_add_channel_with_addr(cmd)?;
}
let cmd = ChannelAddWithAddr {
backend: self.backend.clone(),
name: res.channel,
addr: SocketAddr::V4(addr),
cssid: st2.cssid.clone(),
};
self.handle_add_channel_with_addr(cmd)?;
} else {
self.stats.ioc_addr_not_found().inc();
trace!("ioc not found {res:?}");
@@ -879,8 +875,11 @@ impl CaConnSet {
if let ChannelStateValue::Active(st2) = &mut st1.value {
if let ActiveChannelState::WithStatusSeriesId(st3) = st2 {
trace!("handle_channel_create_fail {addr} {ch:?} set to MaybeWrongAddress");
st3.addr_find_backoff = (st3.addr_find_backoff + 1).min(20);
st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress { since: tsnow };
bump_backoff(&mut st3.addr_find_backoff);
st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress(MaybeWrongAddressState::new(
tsnow,
st3.addr_find_backoff,
));
}
}
}
@@ -917,14 +916,11 @@ impl CaConnSet {
}
fn handle_connect_fail(&mut self, addr: SocketAddr) -> Result<(), Error> {
// TODO ideally should only remove on EOS.
self.ca_conn_ress.remove(&addr);
self.transition_channels_to_maybe_wrong_address(addr)?;
Ok(())
}
fn transition_channels_to_maybe_wrong_address(&mut self, addr: SocketAddr) -> Result<(), Error> {
trace2!("handle_connect_fail {addr}");
let tsnow = SystemTime::now();
for (ch, st1) in self.channel_states.iter_mut() {
match &mut st1.value {
@@ -945,8 +941,10 @@ impl CaConnSet {
if self.connect_fail_count > 400 {
std::process::exit(1);
}
st3.addr_find_backoff = (st3.addr_find_backoff + 1).min(20);
st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress { since: tsnow };
bump_backoff(&mut st3.addr_find_backoff);
st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress(
MaybeWrongAddressState::new(tsnow, st3.addr_find_backoff),
);
}
}
}
@@ -1333,14 +1331,12 @@ impl CaConnSet {
}
let addr = SocketAddr::V4(*addr_v4);
cmd_remove_channel.push((addr, ch.clone()));
if st.health_timeout_count < 3 {
st3.addr_find_backoff = (st3.addr_find_backoff + 1).min(20);
st3.inner =
WithStatusSeriesIdStateInner::MaybeWrongAddress { since: stnow };
let item =
ChannelStatusItem::new_closed_conn_timeout(stnow, st3.cssid.clone());
channel_status_items.push(item);
}
bump_backoff(&mut st3.addr_find_backoff);
st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress(
MaybeWrongAddressState::new(stnow, st3.addr_find_backoff),
);
let item = ChannelStatusItem::new_closed_conn_timeout(stnow, st3.cssid.clone());
channel_status_items.push(item);
}
}
}
@@ -1350,8 +1346,8 @@ impl CaConnSet {
st3.inner = WithStatusSeriesIdStateInner::UnknownAddress { since: stnow };
}
}
WithStatusSeriesIdStateInner::MaybeWrongAddress { since } => {
if *since + (MAYBE_WRONG_ADDRESS_STAY * st3.addr_find_backoff.max(1).min(10)) < stnow {
WithStatusSeriesIdStateInner::MaybeWrongAddress(st4) => {
if st4.since + st4.backoff_dt < stnow {
if search_pending_count < CURRENT_SEARCH_PENDING_MAX as _ {
trace!("try again channel after MaybeWrongAddress");
if trigger.contains(&ch.id()) {

View File

@@ -8,6 +8,7 @@ use std::collections::BTreeMap;
use std::collections::HashMap;
use std::net::SocketAddrV4;
use std::ops::RangeBounds;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
@@ -82,10 +83,25 @@ pub enum WithStatusSeriesIdStateInner {
#[serde(with = "humantime_serde")]
since: SystemTime,
},
MaybeWrongAddress {
#[serde(with = "humantime_serde")]
since: SystemTime,
},
MaybeWrongAddress(MaybeWrongAddressState),
}
/// Payload of the `MaybeWrongAddress` channel state: when the state was entered
/// and how long to stay in it before another attempt is made.
#[derive(Debug, Clone, Serialize)]
pub struct MaybeWrongAddressState {
    /// Wall-clock time at which this state was entered; serialized through
    /// `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub since: SystemTime,
    /// Back-off interval added to `since`; the state is left only after
    /// `since + backoff_dt` has passed.
    pub backoff_dt: Duration,
}
impl MaybeWrongAddressState {
    /// Build the state from its entry time and the current back-off counter.
    ///
    /// The back-off interval is `4000 ms * (1 + 10 * tanh(backoff_cnt / 4))`,
    /// computed in `f32` and truncated to whole milliseconds. It starts at
    /// 4 s for a counter of 0 and levels off towards 44 s as the counter grows.
    pub fn new(since: SystemTime, backoff_cnt: u32) -> Self {
        const BASE_MS: f32 = 4e3;
        // Smooth ramp in [0, 1): grows roughly linearly at first, then flattens.
        let ramp = (backoff_cnt as f32 / 4.).tanh();
        let factor = 1. + 10. * ramp;
        let millis = (BASE_MS * factor) as u64;
        let backoff_dt = Duration::from_millis(millis);
        Self { since, backoff_dt }
    }
}
#[derive(Debug, Clone, Serialize)]

View File

@@ -5,6 +5,7 @@ use crate::iteminsertqueue::insert_connection_status_fut;
use crate::iteminsertqueue::insert_item;
use crate::iteminsertqueue::insert_item_fut;
use crate::iteminsertqueue::insert_msp_fut;
use crate::iteminsertqueue::Accounting;
use crate::iteminsertqueue::ConnectionStatusItem;
use crate::iteminsertqueue::InsertFut;
use crate::iteminsertqueue::InsertItem;
@@ -294,6 +295,7 @@ async fn worker(
info!("have time bin patch to insert: {item:?}");
return Err(Error::with_msg_no_trace("TODO insert item old path"));
}
QueryItem::Accounting(..) => {}
}
}
stats.worker_finish().inc();
@@ -344,6 +346,9 @@ async fn worker_streamed(
QueryItem::TimeBinSimpleF32(item) => {
prepare_timebin_insert_futs(item, &ttls, &data_store, &stats, tsnow_u64)
}
QueryItem::Accounting(item) => {
prepare_accounting_insert_futs(item, &ttls, &data_store, &stats, tsnow_u64)
}
_ => {
// TODO
debug!("TODO insert item {item:?}");
@@ -489,3 +494,29 @@ fn prepare_timebin_insert_futs(
futs
}
fn prepare_accounting_insert_futs(
item: Accounting,
ttls: &Ttls,
data_store: &Arc<DataStore>,
stats: &Arc<InsertWorkerStats>,
tsnow_u64: u64,
) -> SmallVec<[InsertFut; 4]> {
let params = (
item.part,
item.ts,
item.series.id() as i64,
item.count,
item.bytes,
ttls.binned.as_secs() as i32,
);
let fut = InsertFut::new(
data_store.scy.clone(),
data_store.qu_account_00.clone(),
params,
tsnow_u64,
stats.clone(),
);
let futs = smallvec![fut];
futs
}

View File

@@ -352,6 +352,16 @@ pub enum QueryItem {
Ivl(IvlItem),
ChannelInfo(ChannelInfoItem),
TimeBinSimpleF32(TimeBinSimpleF32),
Accounting(Accounting),
}
/// One accounting record: how many events and bytes were counted for a series
/// within one accounting interval.
#[derive(Debug)]
pub struct Accounting {
    /// Partition number; the emitter fills it with the low 8 bits of the series id.
    pub part: i32,
    /// Start of the accounting interval in seconds since the Unix epoch,
    /// rounded down to the snap interval by the emitter.
    pub ts: i64,
    /// Series the counters belong to.
    pub series: SeriesId,
    /// Number of events counted in the interval.
    pub count: i64,
    /// Number of bytes counted in the interval.
    pub bytes: i64,
}
struct InsParCom {

View File

@@ -507,5 +507,22 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaConfig) -> Result<(), Er
);
tab.create_if_missing(scy).await?;
}
{
let tab = GenTwcsTab::new(
"account_00",
&[
("part", "int"),
("ts", "bigint"),
("series", "bigint"),
("count", "bigint"),
("bytes", "bigint"),
],
["part", "ts"],
["series"],
ddays(30),
ddays(4),
);
tab.create_if_missing(scy).await?;
}
Ok(())
}

View File

@@ -41,6 +41,7 @@ pub struct DataStore {
pub qu_insert_channel_status_by_ts_msp: Arc<PreparedStatement>,
pub qu_insert_channel_ping: Arc<PreparedStatement>,
pub qu_insert_binned_scalar_f32_v02: Arc<PreparedStatement>,
pub qu_account_00: Arc<PreparedStatement>,
}
impl DataStore {
@@ -169,6 +170,15 @@ impl DataStore {
);
let q = scy.prepare(cql).await?;
let qu_insert_binned_scalar_f32_v02 = Arc::new(q);
let cql = concat!(
"insert into account_00",
" (part, ts, series, count, bytes)",
" values (?, ?, ?, ?, ?) using ttl ?"
);
let q = scy.prepare(cql).await?;
let qu_account_00 = Arc::new(q);
let ret = Self {
scy,
qu_insert_ts_msp,
@@ -195,6 +205,7 @@ impl DataStore {
qu_insert_channel_status_by_ts_msp,
qu_insert_channel_ping,
qu_insert_binned_scalar_f32_v02,
qu_account_00,
};
Ok(ret)
}

View File

@@ -252,6 +252,7 @@ impl EstablishWriterWorker {
.map(move |item| {
let wtx = self.worker_tx.clone();
let cnt = cnt.clone();
let stats = self.stats.clone();
async move {
let res = SeriesWriter::establish(
wtx.clone(),
@@ -264,7 +265,8 @@ impl EstablishWriterWorker {
.await;
cnt.fetch_add(1, atomic::Ordering::SeqCst);
if item.restx.send((item.job_id, res)).await.is_err() {
warn!("can not send writer establish result");
stats.result_send_fail().inc();
trace!("can not send writer establish result");
}
}
})

View File

@@ -318,7 +318,6 @@ stats_proc::stats_struct!((
time_handle_conn_listen,
time_handle_peer_ready,
time_check_channels_state_init,
time_handle_event_add_res,
tcp_connected,
get_series_id_ok,
item_count,
@@ -494,7 +493,11 @@ stats_proc::stats_struct!((
),
values(db_lookup_workers,)
),
stats_struct(name(SeriesWriterEstablishStats), prefix(wrest), counters(job_recv,),),
stats_struct(
name(SeriesWriterEstablishStats),
prefix(wrest),
counters(job_recv, result_send_fail,),
),
));
stats_proc::stats_struct!((