diff --git a/dbpg/src/schema.rs b/dbpg/src/schema.rs index 4ab2be5..16a3da2 100644 --- a/dbpg/src/schema.rs +++ b/dbpg/src/schema.rs @@ -75,11 +75,11 @@ create table if not exists series_by_channel ( scalar_type int not null, shape_dims int[] not null, agg_kind int not null, - tscreate timestamptz not null default 'now()' + tscreate timestamptz not null default now() )"; let _ = pgc.execute(sql, &[]).await; - let sql = "alter table series_by_channel add tscreate timestamptz not null default 'now()'"; + let sql = "alter table series_by_channel add tscreate timestamptz not null default now()"; let _ = pgc.execute(sql, &[]).await; if !has_table("ioc_by_channel_log", pgc).await? { diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 06793d1..6dbd1c3 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -20,6 +20,7 @@ use netpod::timeunits::*; use netpod::ScalarType; use netpod::Shape; use netpod::TsNano; +use netpod::EMIT_ACCOUNTING_SNAP; use proto::CaItem; use proto::CaMsg; use proto::CaMsgTy; @@ -27,6 +28,7 @@ use proto::CaProto; use proto::CreateChan; use proto::EventAdd; use scywr::iteminsertqueue as scywriiq; +use scywr::iteminsertqueue::Accounting; use scywr::iteminsertqueue::DataValue; use scywriiq::ChannelInfoItem; use scywriiq::ChannelStatus; @@ -310,6 +312,9 @@ struct CreatedState { stwin_ts: u64, stwin_count: u32, stwin_bytes: u32, + account_emit_last: u64, + account_count: u64, + account_bytes: u64, } impl CreatedState { @@ -338,6 +343,9 @@ impl CreatedState { stwin_ts: 0, stwin_count: 0, stwin_bytes: 0, + account_emit_last: 0, + account_count: 0, + account_bytes: 0, } } } @@ -1620,6 +1628,11 @@ impl CaConn { let ts_diff = ts.abs_diff(ts_local); stats.ca_ts_off().ingest((ts_diff / MS) as u32); if tsnow >= crst.insert_next_earliest { + { + crst.account_count += 1; + // TODO how do we account for bytes? Here, we also add 8 bytes for the timestamp. + crst.account_bytes += 8 + payload_len as u64; + } { crst.muted_before = 0; crst.insert_item_ivl_ema.tick(tsnow); @@ -1882,23 +1895,13 @@ impl CaConn { cx.waker().wake_by_ref(); } CaMsgTy::EventAddRes(ev) => { - trace!("got EventAddRes {:?} cnt {}", camsg.ts, ev.data_count); + trace2!("got EventAddRes {:?} cnt {}", camsg.ts, ev.data_count); self.stats.event_add_res_recv.inc(); - let res = Self::handle_event_add_res(self, ev, tsnow); - let ts2 = Instant::now(); - self.stats - .time_handle_event_add_res - .add((ts2.duration_since(tsnow) * MS as u32).as_secs()); - res?; + Self::handle_event_add_res(self, ev, tsnow)? } CaMsgTy::EventAddResEmpty(ev) => { - trace!("got EventAddResEmpty {:?}", camsg.ts); - let res = Self::handle_event_add_res_empty(self, ev, tsnow); - let ts2 = Instant::now(); - self.stats - .time_handle_event_add_res - .add((ts2.duration_since(tsnow) * MS as u32).as_secs()); - res?; + trace2!("got EventAddResEmpty {:?}", camsg.ts); + Self::handle_event_add_res_empty(self, ev, tsnow)? } CaMsgTy::ReadNotifyRes(ev) => Self::handle_read_notify_res(self, ev, tsnow)?, CaMsgTy::Echo => { @@ -2036,6 +2039,9 @@ impl CaConn { stwin_ts: 0, stwin_count: 0, stwin_bytes: 0, + account_emit_last: 0, + account_count: 0, + account_bytes: 0, }; *ch_s = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { tsbeg: tsnow, channel }); let job = EstablishWorkerJob::new( @@ -2230,6 +2236,7 @@ impl CaConn { if self.channel_status_emit_last + Duration::from_millis(3000) <= tsnow { self.channel_status_emit_last = tsnow; self.emit_channel_status()?; + self.emit_accounting()?; } if self.tick_last_writer + Duration::from_millis(2000) <= tsnow { self.tick_last_writer = tsnow; @@ -2276,6 +2283,39 @@ impl CaConn { Ok(()) } + fn emit_accounting(&mut self) -> Result<(), Error> { + let stnow = self.tmp_ts_poll; + let ts_sec = stnow.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); + let ts_sec_snap = ts_sec / EMIT_ACCOUNTING_SNAP * EMIT_ACCOUNTING_SNAP; + for (_k, st0) in self.channels.iter_mut() { + match st0 { + ChannelState::Writable(st1) => { + let ch = &mut st1.channel; + if ts_sec_snap != ch.account_emit_last { + ch.account_emit_last = ts_sec_snap; + if ch.account_count != 0 { + let series_id = ch.cssid.id(); + let count = ch.account_count as i64; + let bytes = ch.account_bytes as i64; + ch.account_count = 0; + ch.account_bytes = 0; + let item = QueryItem::Accounting(Accounting { + part: (series_id & 0xff) as i32, + ts: ts_sec_snap as i64, + series: SeriesId::new(series_id), + count, + bytes, + }); + self.insert_item_queue.push_back(item); + } + } + } + _ => {} + } + } + Ok(()) + } + fn tick_writers(&mut self) -> Result<(), Error> { for (k, st) in &mut self.channels { if let ChannelState::Writable(st2) = st { diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index c962b36..e2ed9e1 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -3,6 +3,7 @@ use super::findioc::FindIocRes; use crate::ca::conn; use crate::ca::statemap; use crate::ca::statemap::CaConnState; +use crate::ca::statemap::MaybeWrongAddressState; use crate::ca::statemap::WithAddressState; use crate::conf::CaIngestOpts; use crate::daemon_common::Channel; @@ -321,6 +322,10 @@ impl IocAddrQuery { } } +fn bump_backoff(x: &mut u32) { + *x = (1 + *x).min(10); +} + struct SeriesLookupSender { tx: Sender>, } @@ -724,22 +729,13 @@ impl CaConnSet { if let Some(addr) = res.addr { self.stats.ioc_addr_found().inc(); trace!("ioc found {res:?}"); - if false { - let since = SystemTime::now(); - st2.addr_find_backoff = 0; - st2.inner = WithStatusSeriesIdStateInner::WithAddress { - addr, - state: WithAddressState::Unassigned { since }, - }; - } else { - let cmd = ChannelAddWithAddr { - backend: self.backend.clone(), - name: res.channel, - addr: SocketAddr::V4(addr), - cssid: st2.cssid.clone(), - }; - self.handle_add_channel_with_addr(cmd)?; - } + let cmd = ChannelAddWithAddr { + backend: self.backend.clone(), + name: res.channel, + addr: SocketAddr::V4(addr), + cssid: st2.cssid.clone(), + }; + self.handle_add_channel_with_addr(cmd)?; } else { self.stats.ioc_addr_not_found().inc(); trace!("ioc not found {res:?}"); @@ -879,8 +875,11 @@ impl CaConnSet { if let ChannelStateValue::Active(st2) = &mut st1.value { if let ActiveChannelState::WithStatusSeriesId(st3) = st2 { trace!("handle_channel_create_fail {addr} {ch:?} set to MaybeWrongAddress"); - st3.addr_find_backoff = (st3.addr_find_backoff + 1).min(20); - st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress { since: tsnow }; + bump_backoff(&mut st3.addr_find_backoff); + st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress(MaybeWrongAddressState::new( + tsnow, + st3.addr_find_backoff, + )); } } } @@ -917,14 +916,11 @@ impl CaConnSet { } fn handle_connect_fail(&mut self, addr: SocketAddr) -> Result<(), Error> { - // TODO ideally should only remove on EOS. - self.ca_conn_ress.remove(&addr); self.transition_channels_to_maybe_wrong_address(addr)?; Ok(()) } fn transition_channels_to_maybe_wrong_address(&mut self, addr: SocketAddr) -> Result<(), Error> { - trace2!("handle_connect_fail {addr}"); let tsnow = SystemTime::now(); for (ch, st1) in self.channel_states.iter_mut() { match &mut st1.value { @@ -945,8 +941,10 @@ impl CaConnSet { if self.connect_fail_count > 400 { std::process::exit(1); } - st3.addr_find_backoff = (st3.addr_find_backoff + 1).min(20); - st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress { since: tsnow }; + bump_backoff(&mut st3.addr_find_backoff); + st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress( + MaybeWrongAddressState::new(tsnow, st3.addr_find_backoff), + ); } } } @@ -1333,14 +1331,12 @@ impl CaConnSet { } let addr = SocketAddr::V4(*addr_v4); cmd_remove_channel.push((addr, ch.clone())); - if st.health_timeout_count < 3 { - st3.addr_find_backoff = (st3.addr_find_backoff + 1).min(20); - st3.inner = - WithStatusSeriesIdStateInner::MaybeWrongAddress { since: stnow }; - let item = - ChannelStatusItem::new_closed_conn_timeout(stnow, st3.cssid.clone()); - channel_status_items.push(item); - } + bump_backoff(&mut st3.addr_find_backoff); + st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress( + MaybeWrongAddressState::new(stnow, st3.addr_find_backoff), + ); + let item = ChannelStatusItem::new_closed_conn_timeout(stnow, st3.cssid.clone()); + channel_status_items.push(item); } } } @@ -1350,8 +1346,8 @@ impl CaConnSet { st3.inner = WithStatusSeriesIdStateInner::UnknownAddress { since: stnow }; } } - WithStatusSeriesIdStateInner::MaybeWrongAddress { since } => { - if *since + (MAYBE_WRONG_ADDRESS_STAY * st3.addr_find_backoff.max(1).min(10)) < stnow { + WithStatusSeriesIdStateInner::MaybeWrongAddress(st4) => { + if st4.since + st4.backoff_dt < stnow { if search_pending_count < CURRENT_SEARCH_PENDING_MAX as _ { trace!("try again channel after MaybeWrongAddress"); if trigger.contains(&ch.id()) { diff --git a/netfetch/src/ca/statemap.rs b/netfetch/src/ca/statemap.rs index 4b9728b..4b8c5a2 100644 --- a/netfetch/src/ca/statemap.rs +++ b/netfetch/src/ca/statemap.rs @@ -8,6 +8,7 @@ use std::collections::BTreeMap; use std::collections::HashMap; use std::net::SocketAddrV4; use std::ops::RangeBounds; +use std::time::Duration; use std::time::Instant; use std::time::SystemTime; @@ -82,10 +83,25 @@ pub enum WithStatusSeriesIdStateInner { #[serde(with = "humantime_serde")] since: SystemTime, }, - MaybeWrongAddress { - #[serde(with = "humantime_serde")] - since: SystemTime, - }, + MaybeWrongAddress(MaybeWrongAddressState), +} + +#[derive(Debug, Clone, Serialize)] +pub struct MaybeWrongAddressState { + #[serde(with = "humantime_serde")] + pub since: SystemTime, + pub backoff_dt: Duration, +} + +impl MaybeWrongAddressState { + pub fn new(since: SystemTime, backoff_cnt: u32) -> Self { + let f = 1. + 10. * (backoff_cnt as f32 / 4.).tanh(); + let dtms = 4e3_f32 * f; + Self { + since, + backoff_dt: Duration::from_millis(dtms as u64), + } + } } #[derive(Debug, Clone, Serialize)] diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index aaba377..112c4b1 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -5,6 +5,7 @@ use crate::iteminsertqueue::insert_connection_status_fut; use crate::iteminsertqueue::insert_item; use crate::iteminsertqueue::insert_item_fut; use crate::iteminsertqueue::insert_msp_fut; +use crate::iteminsertqueue::Accounting; use crate::iteminsertqueue::ConnectionStatusItem; use crate::iteminsertqueue::InsertFut; use crate::iteminsertqueue::InsertItem; @@ -294,6 +295,7 @@ async fn worker( info!("have time bin patch to insert: {item:?}"); return Err(Error::with_msg_no_trace("TODO insert item old path")); } + QueryItem::Accounting(..) => {} } } stats.worker_finish().inc(); @@ -344,6 +346,9 @@ async fn worker_streamed( QueryItem::TimeBinSimpleF32(item) => { prepare_timebin_insert_futs(item, &ttls, &data_store, &stats, tsnow_u64) } + QueryItem::Accounting(item) => { + prepare_accounting_insert_futs(item, &ttls, &data_store, &stats, tsnow_u64) + } _ => { // TODO debug!("TODO insert item {item:?}"); @@ -489,3 +494,29 @@ fn prepare_timebin_insert_futs( futs } + +fn prepare_accounting_insert_futs( + item: Accounting, + ttls: &Ttls, + data_store: &Arc, + stats: &Arc, + tsnow_u64: u64, +) -> SmallVec<[InsertFut; 4]> { + let params = ( + item.part, + item.ts, + item.series.id() as i64, + item.count, + item.bytes, + ttls.binned.as_secs() as i32, + ); + let fut = InsertFut::new( + data_store.scy.clone(), + data_store.qu_account_00.clone(), + params, + tsnow_u64, + stats.clone(), + ); + let futs = smallvec![fut]; + futs +} diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 9ef9a02..45c87ec 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -352,6 +352,16 @@ pub enum QueryItem { Ivl(IvlItem), ChannelInfo(ChannelInfoItem), TimeBinSimpleF32(TimeBinSimpleF32), + Accounting(Accounting), +} + +#[derive(Debug)] +pub struct Accounting { + pub part: i32, + pub ts: i64, + pub series: SeriesId, + pub count: i64, + pub bytes: i64, } struct InsParCom { diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index 7d3d126..8314f93 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -507,5 +507,22 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaConfig) -> Result<(), Er ); tab.create_if_missing(scy).await?; } + { + let tab = GenTwcsTab::new( + "account_00", + &[ + ("part", "int"), + ("ts", "bigint"), + ("series", "bigint"), + ("count", "bigint"), + ("bytes", "bigint"), + ], + ["part", "ts"], + ["series"], + ddays(30), + ddays(4), + ); + tab.create_if_missing(scy).await?; + } Ok(()) } diff --git a/scywr/src/store.rs b/scywr/src/store.rs index 053c9e6..ce7f75b 100644 --- a/scywr/src/store.rs +++ b/scywr/src/store.rs @@ -41,6 +41,7 @@ pub struct DataStore { pub qu_insert_channel_status_by_ts_msp: Arc, pub qu_insert_channel_ping: Arc, pub qu_insert_binned_scalar_f32_v02: Arc, + pub qu_account_00: Arc, } impl DataStore { @@ -169,6 +170,15 @@ impl DataStore { ); let q = scy.prepare(cql).await?; let qu_insert_binned_scalar_f32_v02 = Arc::new(q); + + let cql = concat!( + "insert into account_00", + " (part, ts, series, count, bytes)", + " values (?, ?, ?, ?, ?) using ttl ?" + ); + let q = scy.prepare(cql).await?; + let qu_account_00 = Arc::new(q); + let ret = Self { scy, qu_insert_ts_msp, @@ -195,6 +205,7 @@ impl DataStore { qu_insert_channel_status_by_ts_msp, qu_insert_channel_ping, qu_insert_binned_scalar_f32_v02, + qu_account_00, }; Ok(ret) } diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 12dff63..5e8e81a 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -252,6 +252,7 @@ impl EstablishWriterWorker { .map(move |item| { let wtx = self.worker_tx.clone(); let cnt = cnt.clone(); + let stats = self.stats.clone(); async move { let res = SeriesWriter::establish( wtx.clone(), @@ -264,7 +265,8 @@ impl EstablishWriterWorker { .await; cnt.fetch_add(1, atomic::Ordering::SeqCst); if item.restx.send((item.job_id, res)).await.is_err() { - warn!("can not send writer establish result"); + stats.result_send_fail().inc(); + trace!("can not send writer establish result"); } } }) diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 4ae9ada..859bc1f 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -318,7 +318,6 @@ stats_proc::stats_struct!(( time_handle_conn_listen, time_handle_peer_ready, time_check_channels_state_init, - time_handle_event_add_res, tcp_connected, get_series_id_ok, item_count, @@ -494,7 +493,11 @@ stats_proc::stats_struct!(( ), values(db_lookup_workers,) ), - stats_struct(name(SeriesWriterEstablishStats), prefix(wrest), counters(job_recv,),), + stats_struct( + name(SeriesWriterEstablishStats), + prefix(wrest), + counters(job_recv, result_send_fail,), + ), )); stats_proc::stats_struct!((