Clean up and transition more stats counters

Dominik Werder
2025-04-28 17:12:46 +02:00
parent a653816ed2
commit 265f4b9bd9
25 changed files with 265 additions and 2116 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.3.0-aa.1"
version = "0.3.0-aa.2"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2024"

View File

@@ -84,8 +84,6 @@ pub struct Daemon {
series_conf_by_id_tx: Sender<()>,
iqtx: Option<InsertQueuesTx>,
daemon_metrics: stats::mett::DaemonMetrics,
cpu_latest: u64,
rss_latest: u64,
}
impl Daemon {
@@ -331,7 +329,6 @@ impl Daemon {
insert_worker_jhs.extend(jh);
};
let stats = Arc::new(DaemonStats::new());
stats.insert_worker_spawned().add(insert_worker_jhs.len() as _);
#[cfg(feature = "bsread")]
if let Some(bsaddr) = &opts.test_bsread_addr {
@@ -419,8 +416,6 @@ impl Daemon {
series_conf_by_id_tx,
iqtx: Some(iqtx2),
daemon_metrics: stats::mett::DaemonMetrics::new(),
cpu_latest: 0,
rss_latest: 0,
};
Ok(ret)
}
@@ -479,15 +474,7 @@ impl Daemon {
fn update_cpu_usage(&mut self) {
let cpu = Self::get_cpu_usage();
if cpu > self.cpu_latest {
let diff = cpu - self.cpu_latest;
self.cpu_latest = cpu;
self.daemon_metrics.proc_cpu_v0_inc().add(diff as u32);
} else if cpu < self.cpu_latest {
let diff = self.cpu_latest - cpu;
self.cpu_latest = cpu;
self.daemon_metrics.proc_cpu_v0_dec().add(diff as u32);
}
self.daemon_metrics.proc_cpu_v0().set(cpu as _);
}
fn get_memory_usage() -> u64 {
@@ -518,15 +505,7 @@ impl Daemon {
fn update_memory_usage(&mut self) {
let rss = Self::get_memory_usage();
if rss > self.rss_latest {
let diff = rss - self.rss_latest;
self.rss_latest = rss;
self.daemon_metrics.proc_mem_rss_inc().add(diff as u32);
} else if rss < self.rss_latest {
let diff = self.rss_latest - rss;
self.rss_latest = rss;
self.daemon_metrics.proc_mem_rss_dec().add(diff as u32);
}
self.daemon_metrics.proc_mem_rss().set(rss as _);
}
async fn handle_timer_tick(&mut self) -> Result<(), Error> {
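Note: the two hunks above drop the inc/dec counter-pair bookkeeping (together with the cpu_latest/rss_latest fields removed earlier in this file) in favor of plain gauge samples. A minimal, self-contained sketch of the two reporting styles, using bare atomics instead of the project's metric types (names here are illustrative only):

use std::sync::atomic::{AtomicU64, Ordering};

static RSS_INC: AtomicU64 = AtomicU64::new(0); // monotone "grew by" counter (old style)
static RSS_DEC: AtomicU64 = AtomicU64::new(0); // monotone "shrank by" counter (old style)
static RSS_GAUGE: AtomicU64 = AtomicU64::new(0); // current sampled value (new style)

fn report_rss(last: &mut u64, rss: u64) {
    // old style: the scraper reconstructs the current value as RSS_INC - RSS_DEC
    if rss >= *last {
        RSS_INC.fetch_add(rss - *last, Ordering::Relaxed);
    } else {
        RSS_DEC.fetch_add(*last - rss, Ordering::Relaxed);
    }
    *last = rss;
    // new style: publish the sample directly, no *_latest state needed
    RSS_GAUGE.store(rss, Ordering::Relaxed);
}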
@@ -545,7 +524,6 @@ impl Daemon {
std::process::exit(0);
}
}
self.stats.handle_timer_tick_count.inc();
let tsnow = SystemTime::now();
{
let n = SIGINT.load(atomic::Ordering::Acquire);
@@ -583,18 +561,19 @@ impl Daemon {
.as_ref()
.map(|x| netfetch::metrics::types::InsertQueuesTxMetrics::from(x));
if let Some(iqtxm) = iqtxm {
// TODO metrics
self.stats().iqtx_len_st_rf1().set(iqtxm.st_rf1_len as _);
self.stats().iqtx_len_st_rf3().set(iqtxm.st_rf3_len as _);
self.stats().iqtx_len_mt_rf3().set(iqtxm.mt_rf3_len as _);
self.stats().iqtx_len_lt_rf3().set(iqtxm.lt_rf3_len as _);
self.stats().iqtx_len_lt_rf3_lat5().set(iqtxm.lt_rf3_lat5_len as _);
self.daemon_metrics.iqtx_len_st_rf1().set(iqtxm.st_rf1_len as _);
self.daemon_metrics.iqtx_len_st_rf3().set(iqtxm.st_rf3_len as _);
self.daemon_metrics.iqtx_len_mt_rf3().set(iqtxm.mt_rf3_len as _);
self.daemon_metrics.iqtx_len_lt_rf3().set(iqtxm.lt_rf3_len as _);
self.daemon_metrics
.iqtx_len_lt_rf3_lat5()
.set(iqtxm.lt_rf3_lat5_len as _);
} else {
self.stats().iqtx_len_st_rf1().set(2);
self.stats().iqtx_len_st_rf3().set(2);
self.stats().iqtx_len_mt_rf3().set(2);
self.stats().iqtx_len_lt_rf3().set(2);
self.stats().iqtx_len_lt_rf3_lat5().set(2);
self.daemon_metrics.iqtx_len_st_rf1().set(0);
self.daemon_metrics.iqtx_len_st_rf3().set(0);
self.daemon_metrics.iqtx_len_mt_rf3().set(0);
self.daemon_metrics.iqtx_len_lt_rf3().set(0);
self.daemon_metrics.iqtx_len_lt_rf3_lat5().set(0);
}
self.update_cpu_usage();
self.update_memory_usage();
@@ -677,7 +656,7 @@ impl Daemon {
Healthy => {
let tsnow = Instant::now();
self.connset_status_last = tsnow;
self.stats.caconnset_health_response().inc();
self.daemon_metrics.caconnset_health_response().inc();
}
Error(e) => {
error!("error from CaConnSet: {e}");
@@ -766,21 +745,21 @@ impl Daemon {
match self.handle_config_reload_inner().await {
Ok(()) => {
if tx.send(0).await.is_err() {
self.stats.channel_send_err().inc();
self.daemon_metrics.channel_send_err().inc();
}
Ok(())
}
Err(e) => {
error!("{e}");
if tx.send(127).await.is_err() {
self.stats.channel_send_err().inc();
self.daemon_metrics.channel_send_err().inc();
}
Ok(())
}
}
}
#[cfg(target_abi = "x32")]
#[cfg(feature = "DISABLED")]
async fn handle_shutdown(&mut self) -> Result<(), Error> {
warn!("received shutdown event");
if self.shutting_down {
@@ -795,7 +774,7 @@ impl Daemon {
async fn handle_event(&mut self, item: DaemonEvent) -> Result<(), Error> {
use DaemonEvent::*;
self.stats.events.inc();
self.daemon_metrics.handle_event().inc();
let ts1 = Instant::now();
let item_summary = item.summary();
let ret = match item {
@@ -805,7 +784,6 @@ impl Daemon {
match tx.send(i.wrapping_add(1)).await {
Ok(()) => {}
Err(_) => {
self.stats.ticker_token_release_error.inc();
error!("can not send ticker token");
return Err(Error::with_msg_no_trace("can not send ticker token"));
}
@@ -871,7 +849,7 @@ impl Daemon {
match ticker_inp_rx.recv().await {
Ok(_) => {}
Err(_) => {
stats.ticker_token_acquire_error.inc();
panic!("can not acquire timer ticker token");
break;
}
}
@@ -909,7 +887,6 @@ impl Daemon {
daemon_stats,
conn_set_stats,
ca_conn_stats,
self.connset_ctrl.ca_proto_stats().clone(),
self.insert_worker_stats.clone(),
self.series_by_channel_stats.clone(),
self.connset_ctrl.ioc_finder_stats().clone(),
@@ -960,17 +937,12 @@ impl Daemon {
while let Some(jh) = self.insert_workers_jhs.pop() {
match jh.await.map_err(Error::from_string) {
Ok(x) => match x {
Ok(()) => {
self.stats.insert_worker_join_ok().inc();
// debug!("joined insert worker");
}
Ok(()) => {}
Err(e) => {
self.stats.insert_worker_join_ok_err().inc();
error!("joined insert worker, error {e}");
}
},
Err(e) => {
self.stats.insert_worker_join_err().inc();
error!("insert worker join error {e}");
}
}

View File

@@ -3,8 +3,8 @@ use async_channel::Sender;
use chrono::DateTime;
use chrono::Utc;
use core::fmt;
use err::thiserror;
use err::ThisError;
use err::thiserror;
use futures_util::Future;
use futures_util::TryFutureExt;
use log::*;
@@ -626,6 +626,7 @@ pub async fn start_lookup_workers<FR: HashSalter>(
Ok((query_tx, jhs, bjh))
}
#[allow(unused)]
struct SalterTest;
impl HashSalter for SalterTest {
@@ -639,6 +640,8 @@ pub struct SalterRandom;
impl HashSalter for SalterRandom {
fn hupd(hupd: &mut dyn FnMut(&[u8]), i1: u16, i2: u16) {
let _ = i1;
let _ = i2;
let tsnow = Instant::now();
let b = unsafe { &*(&tsnow as *const Instant as *const [u8; core::mem::size_of::<Instant>()]) };
hupd(b)
@@ -647,9 +650,9 @@ impl HashSalter for SalterRandom {
#[cfg(test)]
async fn psql_play(db: &Database) -> Result<(), Error> {
use tokio_postgres::types::ToSql;
// use tokio_postgres::types::ToSql;
use tokio_postgres::types::Type;
let (pg, pg_client_jh) = crate::conn::make_pg_client(db).await?;
let (pg, _pg_client_jh) = crate::conn::make_pg_client(db).await?;
if false {
let sql = concat!("select pg_typeof($1)");
let qu = pg.prepare_typed(sql, &[Type::INT4_ARRAY]).await?;
@@ -763,7 +766,7 @@ fn test_series_by_channel_01() {
}
}
// TODO keep join handles and await later
let (channel_info_query_tx, jhs, jh) =
let (channel_info_query_tx, _jhs, _jh) =
dbpg::seriesbychannel::start_lookup_workers::<SalterTest>(1, &pgconf, series_by_channel_stats.clone())
.await?;
@@ -895,10 +898,3 @@ fn test_db_conf() -> Database {
}
}
}
#[cfg(test)]
async fn test_db_conn() -> Result<PgClient, Error> {
let db = test_db_conf();
let (pg, pg_client_jh) = crate::conn::make_pg_client(&db).await?;
Ok(pg)
}

View File

@@ -8,7 +8,6 @@ pub mod findioc;
pub mod search;
pub mod statemap;
use crate::metrics::ExtraInsertsConf;
use futures_util::Future;
use futures_util::FutureExt;
use log::*;

View File

@@ -41,7 +41,6 @@ pub async fn listen_beacons(
worker_tx: Sender<ChannelInfoQuery>,
backend: String,
) -> Result<(), Error> {
let stnow = SystemTime::now();
let channel = "epics-ca-beacons".to_string();
let scalar_type = ScalarType::U64;
let shape = Shape::Scalar;
@@ -56,6 +55,7 @@ pub async fn listen_beacons(
};
worker_tx.send(qu).await?;
let chinfo = rx.recv().await??;
let _ = chinfo;
// TODO
// let mut writer = SeriesWriter::new(chinfo.series.to_series());
// let mut deque = VecDeque::new();
@@ -91,6 +91,8 @@ pub async fn listen_beacons(
let ts_local = ts;
let blob = addr_u32 as i64;
let val = DataValue::Scalar(ScalarValue::I64(blob));
let _ = ts_local;
let _ = val;
// writer.write(ts, ts_local, val, &mut deque)?;
}
}

View File

@@ -781,6 +781,7 @@ enum CaConnState {
PeerReady,
Shutdown(EndOfStreamReason),
EndOfStream,
MetricsEmitted,
}
impl fmt::Debug for CaConnState {
@@ -793,6 +794,7 @@ impl fmt::Debug for CaConnState {
Self::PeerReady => fmt.debug_tuple("PeerReady").finish(),
Self::Shutdown(v0) => fmt.debug_tuple("Shutdown").field(v0).finish(),
Self::EndOfStream => fmt.debug_tuple("EndOfStream").finish(),
Self::MetricsEmitted => fmt.debug_tuple("MetricsEmitted").finish(),
}
}
}
@@ -1101,7 +1103,6 @@ pub struct CaConn {
ca_conn_event_out_queue: VecDeque<CaConnEvent>,
ca_conn_event_out_queue_max: usize,
thr_msg_poll: ThrottleTrace,
ca_proto_stats: Arc<CaProtoStats>,
rng: Xoshiro128PlusPlus,
channel_info_query_qu: VecDeque<ChannelInfoQuery>,
channel_info_query_tx: Pin<Box<SenderPolling<ChannelInfoQuery>>>,
@@ -1118,6 +1119,7 @@ pub struct CaConn {
ts_channel_status_pong_last: Instant,
mett: stats::mett::CaConnMetrics,
metrics_emit_last: Instant,
fionread_last: u32,
}
impl Drop for CaConn {
@@ -1135,7 +1137,6 @@ impl CaConn {
iqtx: InsertQueuesTx,
channel_info_query_tx: Sender<ChannelInfoQuery>,
stats: Arc<CaConnStats>,
ca_proto_stats: Arc<CaProtoStats>,
) -> Self {
let tsnow = Instant::now();
let (cq_tx, cq_rx) = async_channel::bounded(32);
@@ -1174,7 +1175,6 @@ impl CaConn {
ca_conn_event_out_queue: VecDeque::new(),
ca_conn_event_out_queue_max: 2000,
thr_msg_poll: ThrottleTrace::new(Duration::from_millis(2000)),
ca_proto_stats,
rng,
channel_info_query_qu: VecDeque::new(),
channel_info_query_tx: Box::pin(SenderPolling::new(channel_info_query_tx)),
@@ -1188,6 +1188,7 @@ impl CaConn {
ts_channel_status_pong_last: tsnow,
mett: stats::mett::CaConnMetrics::new(),
metrics_emit_last: tsnow,
fionread_last: 0,
}
}
@@ -1528,12 +1529,14 @@ impl CaConn {
if dbg_chn_cid {
info!("send out EventAdd for {cid:?}");
}
let ty = CaMsgTy::EventAdd(EventAdd {
sid: st2.channel.sid.to_u32(),
data_type: st2.channel.ca_dbr_type,
data_count: st2.channel.ca_dbr_count,
subid: subid.to_u32(),
});
let data_count = st2.channel.ca_dbr_count;
let _data_count = 0;
let ty = CaMsgTy::EventAdd(EventAdd::new(
st2.channel.ca_dbr_type,
data_count,
st2.channel.sid.to_u32(),
subid.to_u32(),
));
let msg = CaMsg::from_ty_ts(ty, self.poll_tsnow);
let proto = self.proto.as_mut().unwrap();
proto.push_out(msg);
@@ -1802,7 +1805,12 @@ impl CaConn {
Ok(())
}
fn handle_event_add_res(&mut self, ev: proto::EventAddRes, tsnow: Instant) -> Result<(), Error> {
fn handle_event_add_res(
&mut self,
ev: proto::EventAddRes,
tsnow: Instant,
tscaproto: Instant,
) -> Result<(), Error> {
let subid = Subid(ev.subid);
// TODO handle subid-not-found which can also be peer error:
let cid = if let Some(x) = self.cid_by_subid.get(&subid) {
@@ -1910,6 +1918,7 @@ impl CaConn {
iqdqs,
tsnow,
stnow,
tscaproto,
ch_conf.use_ioc_time(),
stats,
&mut self.rng,
@@ -1942,6 +1951,7 @@ impl CaConn {
iqdqs,
tsnow,
stnow,
tscaproto,
ch_conf.use_ioc_time(),
stats,
&mut self.rng,
@@ -2049,8 +2059,8 @@ impl CaConn {
fn handle_read_notify_res(
&mut self,
ev: proto::ReadNotifyRes,
camsg_ts: Instant,
tsnow: Instant,
tscaproto: Instant,
) -> Result<(), Error> {
// trace!("handle_read_notify_res {ev:?}");
// TODO can not rely on the SID in the response.
@@ -2058,10 +2068,7 @@ impl CaConn {
let ioid = Ioid(ev.ioid);
if let Some(pp) = self.handler_by_ioid.get_mut(&ioid) {
if let Some(mut fut) = pp.take() {
let camsg = CaMsg {
ty: CaMsgTy::ReadNotifyRes(ev),
ts: camsg_ts,
};
let camsg = CaMsg::from_ty_ts(CaMsgTy::ReadNotifyRes(ev), tscaproto);
fut.as_mut().camsg(camsg, self)?;
Ok(())
} else {
@@ -2119,6 +2126,7 @@ impl CaConn {
iqdqs,
stnow,
tsnow,
tscaproto,
ch_conf.use_ioc_time(),
stats,
&mut self.rng,
@@ -2211,6 +2219,7 @@ impl CaConn {
iqdqs,
stnow,
tsnow,
tscaproto,
ch_conf.use_ioc_time(),
stats,
&mut self.rng,
@@ -2243,6 +2252,7 @@ impl CaConn {
iqdqs: &mut InsertDeques,
stnow: SystemTime,
tsnow: Instant,
tscaproto: Instant,
use_ioc_time: bool,
stats: &CaConnStats,
rng: &mut Xoshiro128PlusPlus,
@@ -2260,6 +2270,7 @@ impl CaConn {
iqdqs,
tsnow,
stnow,
tscaproto,
use_ioc_time,
stats,
rng,
@@ -2277,6 +2288,7 @@ impl CaConn {
iqdqs: &mut InsertDeques,
tsnow: Instant,
stnow: SystemTime,
tscaproto: Instant,
use_ioc_time: bool,
stats: &CaConnStats,
rng: &mut Xoshiro128PlusPlus,
@@ -2340,7 +2352,7 @@ impl CaConn {
crst.insert_item_ivl_ema.tick(tsnow);
// binwriter.ingest(tsev, value.f32_for_binning(), iqdqs)?;
{
let wres = writer.write(CaWriterValue::new(value, crst), tsnow, tsev, iqdqs)?;
let wres = writer.write(CaWriterValue::new(value, crst), tscaproto, tsev, iqdqs)?;
crst.status_emit_count += wres.nstatus() as u64;
if wres.st.accept {
crst.dw_st_last = stnow;
@@ -2595,13 +2607,11 @@ impl CaConn {
// Do not go directly into error state: need to at least attempt to close the channel and wait/timeout for reply.
let proto = self.proto.as_mut().ok_or(Error::NoProtocol)?;
let item = CaMsg {
ty: CaMsgTy::ChannelClose(ChannelClose {
sid: st2.channel.sid.0,
cid: st2.channel.cid.0,
}),
ts: tsnow,
};
let ty = CaMsgTy::ChannelClose(ChannelClose {
sid: st2.channel.sid.0,
cid: st2.channel.cid.0,
});
let item = CaMsg::from_ty_ts(ty, tsnow);
proto.push_out(item);
*chst = ChannelState::Closing(ClosingState {
tsbeg: tsnow,
@@ -2775,7 +2785,8 @@ impl CaConn {
Ready(Some(Ok(k))) => {
match k {
CaItem::Msg(camsg) => {
match &camsg.ty {
let (msgcom, ty) = camsg.into_parts();
match &ty {
CaMsgTy::Version => {
if !self.version_seen {
self.version_seen = true;
@@ -2804,7 +2815,7 @@ impl CaConn {
}
}
}
match camsg.ty {
match ty {
CaMsgTy::SearchRes(k) => {
let a = k.addr.to_be_bytes();
let addr = format!("{}.{}.{}.{}:{}", a[0], a[1], a[2], a[3], k.tcp_port);
@@ -2817,15 +2828,15 @@ impl CaConn {
cx.waker().wake_by_ref();
}
CaMsgTy::EventAddRes(ev) => {
trace4!("got EventAddRes {:?} cnt {}", camsg.ts, ev.data_count);
trace4!("got EventAddRes {:?} cnt {}", msgcom.ts(), ev.data_count);
self.mett.event_add_res_recv().inc();
Self::handle_event_add_res(self, ev, tsnow)?
Self::handle_event_add_res(self, ev, tsnow, msgcom.ts())?
}
CaMsgTy::EventAddResEmpty(ev) => {
trace4!("got EventAddResEmpty {:?}", camsg.ts);
trace4!("got EventAddResEmpty {:?}", msgcom.ts());
Self::handle_event_add_res_empty(self, ev, tsnow)?
}
CaMsgTy::ReadNotifyRes(ev) => Self::handle_read_notify_res(self, ev, camsg.ts, tsnow)?,
CaMsgTy::ReadNotifyRes(ev) => Self::handle_read_notify_res(self, ev, tsnow, msgcom.ts())?,
CaMsgTy::Echo => {
if let Some(started) = self.ioc_ping_start {
let dt = started.elapsed();
@@ -2874,11 +2885,11 @@ impl CaConn {
}
self.version_seen = true;
}
CaMsgTy::ChannelCloseRes(x) => {
self.handle_channel_close_res(x, tsnow)?;
CaMsgTy::ChannelCloseRes(ty) => {
self.handle_channel_close_res(ty, tsnow)?;
}
_ => {
warn!("Received unexpected protocol message {:?}", camsg);
warn!("Received unexpected protocol message {:?} {:?}", msgcom, ty);
}
}
}
@@ -3014,6 +3025,7 @@ impl CaConn {
}
fn handle_channel_close_res(&mut self, k: proto::ChannelCloseRes, tsnow: Instant) -> Result<(), Error> {
let _ = tsnow;
debug!("{:?}", k);
Ok(())
}
@@ -3037,6 +3049,11 @@ impl CaConn {
Ready(connect_result) => {
match connect_result {
Ok(Ok(tcp)) => {
let raw_fd = {
use std::os::fd::AsRawFd;
let raw_fd = tcp.as_raw_fd();
raw_fd
};
self.mett.tcp_connected().inc();
let addr = addr.clone();
self.emit_connection_status_item(ConnectionStatusItem {
@@ -3047,6 +3064,7 @@ impl CaConn {
self.backoff_reset();
let proto = CaProto::new(
TcpAsyncWriteRead::from(tcp),
Some(raw_fd),
self.remote_addr_dbg.to_string(),
self.opts.array_truncate,
);
@@ -3128,6 +3146,7 @@ impl CaConn {
}
CaConnState::Shutdown(..) => Ok(Ready(None)),
CaConnState::EndOfStream => Ok(Ready(None)),
CaConnState::MetricsEmitted => Ok(Ready(None)),
};
}
}
@@ -3230,6 +3249,7 @@ impl CaConn {
CaConnState::PeerReady => {}
CaConnState::Shutdown(..) => {}
CaConnState::EndOfStream => {}
CaConnState::MetricsEmitted => {}
}
self.iqdqs.housekeeping();
if self.metrics_emit_last + METRICS_EMIT_IVL <= tsnow {
@@ -3243,8 +3263,29 @@ impl CaConn {
fn metrics_emit(&mut self) {
if let Some(x) = self.proto.as_mut() {
let fionread = if let Some(rawfd) = x.get_raw_socket_fd() {
let mut v = 0;
if unsafe { libc::ioctl(rawfd, libc::FIONREAD, &mut v) } == 0 {
Some(v as u32)
} else {
None
}
} else {
None
};
let mett = x.mett();
mett.metrics_emit().inc();
if let Some(fionread) = fionread {
if fionread > self.fionread_last {
let diff = fionread - self.fionread_last;
self.fionread_last = fionread;
mett.fionread_inc().add(diff);
} else if fionread < self.fionread_last {
let diff = self.fionread_last - fionread;
self.fionread_last = fionread;
mett.fionread_dec().add(diff);
}
}
let m = mett.take_and_reset();
self.mett.proto().ingest(m);
}
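Note: the hunk above samples the socket's kernel receive-queue length with the FIONREAD ioctl and reports the change as an inc/dec counter pair. A small, self-contained sketch of just the FIONREAD read, assuming a raw fd obtained via AsRawFd as done in the connect hunk further up:

use std::os::fd::RawFd;

fn pending_rx_bytes(fd: RawFd) -> Option<u32> {
    let mut v: libc::c_int = 0;
    // On success ioctl returns 0 and fills `v` with the number of bytes
    // currently queued in the kernel receive buffer for this socket.
    if unsafe { libc::ioctl(fd, libc::FIONREAD, &mut v) } == 0 {
        Some(v as u32)
    } else {
        None
    }
}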
@@ -3552,8 +3593,14 @@ impl Stream for CaConn {
let mut have_pending = false;
let mut have_progress = false;
if let CaConnState::EndOfStream = self.state {
if let CaConnState::MetricsEmitted = self.state {
break Ready(None);
} else if let CaConnState::EndOfStream = self.state {
self.mett.metrics_emit_final().inc();
let mett = self.mett.take_and_reset();
self.state = CaConnState::MetricsEmitted;
break Ready(Some(CaConnEvent::new_now(CaConnEventValue::Metrics(mett))));
// break Ready(None);
} else if let Some(item) = self.ca_conn_event_out_queue.pop_front() {
break Ready(Some(item));
}
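Note: the hunk above changes end-of-stream handling so the connection emits its accumulated metrics one final time before terminating, via the new MetricsEmitted state. A hedged, self-contained sketch of that "flush once, then end" stream pattern (not the CaConn code itself):

use futures_util::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};

enum State {
    Running,
    EndOfStream,
    MetricsEmitted,
}

struct Conn {
    state: State,
    metric_snapshot: u64,
}

impl Stream for Conn {
    type Item = u64;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u64>> {
        match self.state {
            // terminal state: nothing more to yield
            State::MetricsEmitted => Poll::Ready(None),
            // emit the final metrics exactly once, then become terminal
            State::EndOfStream => {
                self.state = State::MetricsEmitted;
                Poll::Ready(Some(self.metric_snapshot))
            }
            // the real connection drives the protocol here
            State::Running => Poll::Pending,
        }
    }
}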

View File

@@ -2,7 +2,6 @@ use super::conn::EndOfStreamReason;
use super::findioc::FindIocRes;
use crate::ca::conn;
use crate::ca::statemap;
use crate::ca::statemap::CaConnState;
use crate::ca::statemap::MaybeWrongAddressState;
use crate::ca::statemap::WithAddressState;
use crate::conf::CaIngestOpts;
@@ -38,7 +37,6 @@ use scywr::senderpolling::SenderPolling;
use serde::Serialize;
use series::ChannelStatusSeriesId;
use statemap::ActiveChannelState;
use statemap::CaConnStateValue;
use statemap::ChannelState;
use statemap::ChannelStateMap;
use statemap::ChannelStateValue;
@@ -50,8 +48,6 @@ use stats::CaConnSetStats;
use stats::CaConnStats;
use stats::CaProtoStats;
use stats::IocFinderStats;
use stats::rand_xoshiro::Xoshiro128PlusPlus;
use stats::rand_xoshiro::rand_core::RngCore;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::fmt;
@@ -76,18 +72,16 @@ use tracing::Instrument;
const CHECK_CHANS_PER_TICK: usize = 10000000;
pub const SEARCH_BATCH_MAX: usize = 64;
pub const CURRENT_SEARCH_PENDING_MAX: usize = SEARCH_BATCH_MAX * 4;
const UNKNOWN_ADDRESS_STAY: Duration = Duration::from_millis(15000);
const NO_ADDRESS_STAY: Duration = Duration::from_millis(20000);
const MAYBE_WRONG_ADDRESS_STAY: Duration = Duration::from_millis(4000);
const SEARCH_PENDING_TIMEOUT: Duration = Duration::from_millis(30000);
const CHANNEL_HEALTH_TIMEOUT: Duration = Duration::from_millis(30000);
const CHANNEL_UNASSIGNED_TIMEOUT: Duration = Duration::from_millis(0);
const UNASSIGN_FOR_CONFIG_CHANGE_TIMEOUT: Duration = Duration::from_millis(1000 * 10);
const CHANNEL_MAX_WITHOUT_HEALTH_UPDATE: usize = 3000000;
macro_rules! trace2 { ($($arg:tt)*) => { if false { trace!($($arg)*); } }; }
macro_rules! trace2 { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ); }
macro_rules! trace3 { ($($arg:tt)*) => { if false { trace!($($arg)*); } }; }
macro_rules! trace3 { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ); }
macro_rules! trace4 { ($($arg:tt)*) => { if false { trace!($($arg)*); } }; }
@@ -139,7 +133,6 @@ impl From<Error> for ::err::Error {
pub struct CmdId(SocketAddrV4, usize);
pub struct CaConnRes {
state: CaConnState,
sender: Pin<Box<SenderPolling<ConnCommand>>>,
stats: Arc<CaConnStats>,
cmd_queue: VecDeque<ConnCommand>,
@@ -265,11 +258,8 @@ pub struct CaConnSetCtrl {
rx: Receiver<CaConnSetItem>,
stats: Arc<CaConnSetStats>,
ca_conn_stats: Arc<CaConnStats>,
ca_proto_stats: Arc<CaProtoStats>,
ioc_finder_stats: Arc<IocFinderStats>,
jh: JoinHandle<Result<(), Error>>,
rng: Xoshiro128PlusPlus,
idcnt: u32,
}
impl CaConnSetCtrl {
@@ -335,19 +325,9 @@ impl CaConnSetCtrl {
&self.ca_conn_stats
}
pub fn ca_proto_stats(&self) -> &Arc<CaProtoStats> {
&self.ca_proto_stats
}
pub fn ioc_finder_stats(&self) -> &Arc<IocFinderStats> {
&self.ioc_finder_stats
}
fn make_id(&mut self) -> u32 {
let id = self.idcnt;
self.idcnt += 1;
self.rng.next_u32() & 0xffff | (id << 16)
}
}
#[derive(Debug)]
@@ -452,11 +432,8 @@ pub struct CaConnSet {
ca_conn_stats: Arc<CaConnStats>,
ioc_finder_jh: JoinHandle<Result<(), crate::ca::finder::Error>>,
await_ca_conn_jhs: VecDeque<(SocketAddr, JoinHandle<Result<(), Error>>)>,
thr_msg_poll_1: ThrottleTrace,
thr_msg_storage_len: ThrottleTrace,
ca_proto_stats: Arc<CaProtoStats>,
rogue_channel_count: u64,
connect_fail_count: usize,
cssid_latency_max: Duration,
ca_connset_metrics: stats::mett::CaConnSetMetrics,
}
@@ -526,11 +503,8 @@ impl CaConnSet {
// connset_out_sender: SenderPolling::new(connset_out_tx),
ioc_finder_jh,
await_ca_conn_jhs: VecDeque::new(),
thr_msg_poll_1: ThrottleTrace::new(Duration::from_millis(2000)),
thr_msg_storage_len: ThrottleTrace::new(Duration::from_millis(1000)),
ca_proto_stats: ca_proto_stats.clone(),
rogue_channel_count: 0,
connect_fail_count: 0,
cssid_latency_max: Duration::from_millis(2000),
ca_connset_metrics: stats::mett::CaConnSetMetrics::new(),
};
@@ -541,11 +515,8 @@ impl CaConnSet {
rx: connset_out_rx,
stats,
ca_conn_stats,
ca_proto_stats,
ioc_finder_stats,
jh,
idcnt: 0,
rng: stats::xoshiro_from_time(),
}
}
@@ -1249,7 +1220,6 @@ impl CaConnSet {
fn handle_ca_conn_channel_removed(&mut self, addr: SocketAddr, name: String) -> Result<(), Error> {
debug!("handle_ca_conn_channel_removed {addr} {name}");
let stnow = SystemTime::now();
let name = ChannelName::new(name);
if let Some(st1) = self.channel_states.get_mut(&name) {
match &mut st1.value {
@@ -1368,7 +1338,6 @@ impl CaConnSet {
.clone()
.ok_or_else(|| Error::MissingChannelInfoChannelTx)?,
self.ca_conn_stats.clone(),
self.ca_proto_stats.clone(),
);
let conn_tx = conn.conn_command_tx();
let conn_stats = conn.stats();
@@ -1387,7 +1356,6 @@ impl CaConnSet {
let fut = fut.instrument(logspan);
let jh = tokio::spawn(fut);
let ca_conn_res = CaConnRes {
state: CaConnState::new(CaConnStateValue::Fresh),
sender: Box::pin(conn_tx.into()),
stats: conn_stats,
cmd_queue: VecDeque::new(),
@@ -1470,80 +1438,6 @@ impl CaConnSet {
}
}
async fn wait_stopped(&self) -> Result<(), Error> {
warn!("Lock for wait_stopped");
// let mut g = self.ca_conn_ress.lock().await;
// let mm = std::mem::replace(&mut *g, BTreeMap::new());
let mm: BTreeMap<SocketAddrV4, JoinHandle<Result<(), Error>>> = BTreeMap::new();
let mut jhs: VecDeque<_> = VecDeque::new();
for t in mm {
jhs.push_back(t.1.fuse());
}
loop {
let mut jh = if let Some(x) = jhs.pop_front() {
x
} else {
break;
};
futures_util::select! {
a = jh => match a {
Ok(k) => match k {
Ok(_) => {}
Err(e) => {
error!("{e:?}");
}
},
Err(e) => {
error!("{e:?}");
}
},
_b = crate::rt::sleep(Duration::from_millis(1000)).fuse() => {
jhs.push_back(jh);
info!("waiting for {} connections", jhs.len());
}
};
}
Ok(())
}
fn check_connection_states(&mut self) -> Result<(), Error> {
let tsnow = Instant::now();
for (addr, val) in &mut self.ca_conn_ress {
let state = &mut val.state;
let v = &mut state.value;
match v {
CaConnStateValue::Fresh => {
// TODO check for delta t since last issued status command.
if tsnow.duration_since(state.last_feedback) > Duration::from_millis(20000) {
error!("TODO Fresh timeout send connection-close for {addr}");
// TODO collect in metrics
// self.stats.ca_conn_status_feedback_timeout.inc();
// TODO send shutdown to this CaConn, check that we've received
// a 'shutdown' state from it. (see below)
*v = CaConnStateValue::Shutdown { since: tsnow };
}
}
CaConnStateValue::HadFeedback => {
// TODO check for delta t since last issued status command.
if tsnow.duration_since(state.last_feedback) > Duration::from_millis(20000) {
error!("TODO HadFeedback timeout send connection-close for {addr}");
// TODO collect in metrics
// self.stats.ca_conn_status_feedback_timeout.inc();
*v = CaConnStateValue::Shutdown { since: tsnow };
}
}
CaConnStateValue::Shutdown { since } => {
if tsnow.saturating_duration_since(*since) > Duration::from_millis(10000) {
// TODO collect in metrics as severe error, this would be a bug.
// self.stats.critical_error.inc();
error!("Shutdown of CaConn failed for {addr}");
}
}
}
}
Ok(())
}
fn check_channel_states(&mut self, tsnow: Instant, stnow: SystemTime) -> Result<(), Error> {
let (mut search_pending_count, mut assigned_without_health_update) = self.update_channel_state_counts();
let mut cmd_remove_channel = Vec::new();
@@ -1555,6 +1449,7 @@ impl CaConnSet {
} else {
self.channel_states.range_mut(..)
};
#[allow(unused)]
let mut st_qu_2 = VecDeque::new();
let mut lt_qu_2 = VecDeque::new();
for (i, (ch, st)) in it.enumerate() {

View File

@@ -41,6 +41,10 @@ impl InputMerge {
}
}
fn todoval<T>() -> T {
todo!()
}
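Note: todoval above is a small placeholder helper used in the poll_next arms below. One plausible reason for it over an inline todo!(): todo!() has the never type, so wrapping it in a constructor makes the whole expression an unreachable-code lint target, while todoval() infers the concrete type the caller needs and only panics if actually reached. A hedged, self-contained illustration:

fn todoval<T>() -> T {
    todo!()
}

fn classify(x: Option<u32>) -> Option<String> {
    match x {
        Some(v) => Some(v.to_string()),
        // placeholder payload: with an inline todo!() here the enclosing
        // Some(..) expression would be flagged as unreachable; todoval()
        // type-checks as String and panics only at runtime if reached.
        None => Some(todoval()),
    }
}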
impl Stream for InputMerge {
type Item = CaConnSetEvent;
@@ -50,7 +54,7 @@ impl Stream for InputMerge {
let mut selfp = self.as_mut().project();
if let Some(inp) = selfp.inp3.as_mut().as_pin_mut() {
match inp.poll_next(cx) {
Ready(Some(x)) => Some(CaConnSetEvent::ConnSetCmd(todo!())),
Ready(Some(_)) => Some(CaConnSetEvent::ConnSetCmd(todoval())),
Ready(None) => {
unsafe {
// TODO what guarantees that I can drop the content here like this?
@@ -70,7 +74,7 @@ impl Stream for InputMerge {
let mut selfp = self.as_mut().project();
if let Some(inp) = selfp.inp2.as_mut().as_pin_mut() {
match inp.poll_next(cx) {
Ready(Some(x)) => Some(CaConnSetEvent::ConnSetCmd(todo!())),
Ready(Some(_)) => Some(CaConnSetEvent::ConnSetCmd(todoval())),
Ready(None) => {
unsafe {
// TODO what guarantees that I can drop the content here like this?

View File

@@ -40,7 +40,7 @@ fn transform_pgres(rows: Vec<PgRow>) -> VecDeque<FindIocRes> {
let n: Result<i32, _> = row.try_get(0);
let ch: Result<String, _> = row.try_get(1);
match (n, ch) {
(Ok(n), Ok(ch)) => {
(Ok(_n), Ok(ch)) => {
if let Some(addr) = row.get::<_, Option<String>>(3) {
let addr = addr.parse().map_or(None, |x| Some(x));
let item = FindIocRes {

File diff suppressed because it is too large

View File

@@ -24,21 +24,6 @@ pub enum CaConnStateValue {
Shutdown { since: Instant },
}
#[derive(Debug)]
pub struct CaConnState {
pub last_feedback: Instant,
pub value: CaConnStateValue,
}
impl CaConnState {
pub fn new(value: CaConnStateValue) -> Self {
Self {
last_feedback: Instant::now(),
value,
}
}
}
#[derive(Debug, Clone, Serialize)]
pub enum ConnectionStateValue {
Unknown,
@@ -243,6 +228,8 @@ impl ChannelStateMap {
}
pub fn insert(&mut self, k: ChannelName, v: ChannelState) -> Option<ChannelState> {
let _ = &self.map2;
let _ = &self.map3;
self.map.insert(k, v)
}

View File

@@ -23,8 +23,6 @@ pub struct CaIngestOpts {
search: Vec<String>,
#[serde(default)]
search_blacklist: Vec<String>,
whitelist: Option<String>,
blacklist: Option<String>,
#[allow(unused)]
#[serde(default, with = "humantime_serde")]
timeout: Option<Duration>,
@@ -424,6 +422,7 @@ mod serde_replication_bool {
use serde::de;
use std::fmt;
#[allow(unused)]
pub fn serialize<S>(v: &bool, ser: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
@@ -488,6 +487,7 @@ mod serde_option_channel_read_config {
use std::fmt;
use std::time::Duration;
#[allow(unused)]
pub fn serialize<S>(v: &Option<ChannelReadConfig>, ser: S) -> Result<S::Ok, S::Error>
where
S: Serializer,

View File

@@ -135,7 +135,6 @@ pub struct StatsSet {
daemon: Arc<DaemonStats>,
ca_conn_set: Arc<CaConnSetStats>,
ca_conn: Arc<CaConnStats>,
ca_proto: Arc<CaProtoStats>,
insert_worker_stats: Arc<InsertWorkerStats>,
series_by_channel_stats: Arc<SeriesByChannelStats>,
ioc_finder_stats: Arc<IocFinderStats>,
@@ -147,7 +146,6 @@ impl StatsSet {
daemon: Arc<DaemonStats>,
ca_conn_set: Arc<CaConnSetStats>,
ca_conn: Arc<CaConnStats>,
ca_proto: Arc<CaProtoStats>,
insert_worker_stats: Arc<InsertWorkerStats>,
series_by_channel_stats: Arc<SeriesByChannelStats>,
ioc_finder_stats: Arc<IocFinderStats>,
@@ -157,7 +155,6 @@ impl StatsSet {
daemon,
ca_conn_set,
ca_conn,
ca_proto,
insert_worker_stats,
series_by_channel_stats,
ioc_finder_stats,
@@ -366,9 +363,8 @@ fn metrics(stats_set: &StatsSet) -> String {
let s3 = stats_set.insert_worker_stats.prometheus();
let s4 = stats_set.ca_conn.prometheus();
let s5 = stats_set.series_by_channel_stats.prometheus();
let s6 = stats_set.ca_proto.prometheus();
let s7 = stats_set.ioc_finder_stats.prometheus();
[s1, s2, s3, s4, s5, s6, s7].join("")
[s1, s2, s3, s4, s5, s7].join("")
}
pub struct RoutesResources {

View File

@@ -1,47 +0,0 @@
use crate::access::Error;
use crate::session::ScySession;
use futures_util::Future;
use futures_util::FutureExt;
use scylla::QueryResult;
use scylla::frame::value::ValueList;
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::QueryError;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
pub struct ScyQueryFut<'a> {
fut: Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send + 'a>>,
}
impl<'a> ScyQueryFut<'a> {
pub fn new<V>(scy: &'a ScySession, query: Option<&'a PreparedStatement>, values: V) -> Self
where
V: ValueList + Send + 'static,
{
let _ = scy;
let _ = query;
let _ = values;
if true {
todo!("ScyQueryFut")
};
//let fut = scy.execute(query, values);
let fut = futures_util::future::ready(Err(QueryError::TimeoutError));
Self { fut: Box::pin(fut) }
}
}
impl<'a> Future for ScyQueryFut<'a> {
type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
match self.fut.poll_unpin(cx) {
Ready(k) => match k {
Ok(_) => Ready(Ok(())),
Err(e) => Ready(Err(e.into())),
},
Pending => Pending,
}
}
}

View File

@@ -2,12 +2,11 @@ use crate::access::Error;
use crate::session::ScySession;
use futures_util::Future;
use futures_util::FutureExt;
use netpod::log::*;
use scylla::frame::value::ValueList;
use netpod::log::error;
use scylla::QueryResult;
use scylla::prepared_statement::PreparedStatement;
use scylla::serialize::row::SerializeRow;
use scylla::transport::errors::QueryError;
use scylla::QueryResult;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
@@ -25,7 +24,7 @@ impl<'a> ScyInsertFut<'a> {
pub fn new<V>(scy: &'a ScySession, query: &'a PreparedStatement, values: V) -> Self
where
V: ValueList + SerializeRow + Send + 'static,
V: SerializeRow + Send + 'static,
{
let fut = scy.execute_unpaged(query, values);
let fut = Box::pin(fut) as _;
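Note: the hunk above drops the old ValueList bound and keeps only SerializeRow, matching the newer scylla-driver API where plain tuples of supported column types implement SerializeRow directly. A hedged, minimal sketch of the resulting call shape (function and parameter names are illustrative, not taken from this crate):

use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::QueryError;
use scylla::{QueryResult, Session};

async fn run_insert(scy: &Session, qu: &PreparedStatement) -> Result<QueryResult, QueryError> {
    // (series, ts_msp, ts_lsp, value): a tuple of supported column types
    // is enough, no extra ValueList wrapper required.
    let params = (1i64, 2i64, 3i64, 4.0f64);
    scy.execute_unpaged(qu, params).await
}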

View File

@@ -1,108 +0,0 @@
use crate::access::Error;
use crate::session::ScySession;
use futures_util::Future;
use futures_util::FutureExt;
use netpod::log::*;
use scylla::frame::value::ValueList;
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::QueryError;
use scylla::QueryResult;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Instant;
pub struct InsertLoopFut<'a> {
futs: Vec<Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send + 'a>>>,
fut_ix: usize,
polled: usize,
ts_create: Instant,
ts_poll_start: Instant,
}
impl<'a> InsertLoopFut<'a> {
pub fn new<V>(scy: &'a ScySession, query: Option<&'a PreparedStatement>, values: Vec<V>, skip_insert: bool) -> Self
where
V: ValueList + Send + Sync + 'static,
{
let _ = scy;
let _ = query;
let mut values = values;
if skip_insert {
values.clear();
}
// TODO
// Can I store the values in some better generic form?
// Or is it acceptable to generate all insert futures right here and poll them later?
let futs: Vec<_> = values
.into_iter()
.map(|_vs| {
if true {
todo!("InsertLoopFut")
};
//let fut = scy.execute(query, vs);
let fut = futures_util::future::ready(Err(QueryError::TimeoutError));
Box::pin(fut) as _
})
.collect();
let tsnow = Instant::now();
Self {
futs,
fut_ix: 0,
polled: 0,
ts_create: tsnow,
ts_poll_start: tsnow,
}
}
}
impl<'a> Future for InsertLoopFut<'a> {
type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
if self.polled == 0 {
self.ts_poll_start = Instant::now();
}
self.polled += 1;
if self.futs.is_empty() {
return Ready(Ok(()));
}
loop {
let fut_ix = self.fut_ix;
break match self.futs[fut_ix].poll_unpin(cx) {
Ready(k) => match k {
Ok(_) => {
self.fut_ix += 1;
if self.fut_ix >= self.futs.len() {
if false {
let tsnow = Instant::now();
let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3;
let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3;
info!(
"InsertLoopFut polled {} dt_created {:6.2} ms dt_polled {:6.2} ms",
self.polled, dt_created, dt_polled
);
}
continue;
} else {
Ready(Ok(()))
}
}
Err(e) => {
let tsnow = Instant::now();
let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3;
let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3;
warn!(
"InsertLoopFut polled {} dt_created {:6.2} ms dt_polled {:6.2} ms",
self.polled, dt_created, dt_polled
);
warn!("InsertLoopFut done Err {e:?}");
Ready(Err(e.into()))
}
},
Pending => Pending,
};
}
}
}

View File

@@ -13,38 +13,38 @@ use crate::store::DataStore;
use async_channel::Receiver;
use async_channel::Sender;
use atomic::AtomicU64;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use log::*;
use log;
use netpod::ttl::RetentionTime;
use smallvec::SmallVec;
use smallvec::smallvec;
use stats::InsertWorkerStats;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
use tokio::task::JoinHandle;
macro_rules! trace2 {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
macro_rules! error { ($($arg:expr),*) => ( if true { log::error!($($arg),*); } ); }
macro_rules! trace_item_execute {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
macro_rules! warn { ($($arg:expr),*) => ( if true { log::warn!($($arg),*); } ); }
macro_rules! debug_setup { ($($arg:expr),*) => ( if false { debug!($($arg),*); } ); }
macro_rules! trace2 { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ); }
macro_rules! trace_transform { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ); }
macro_rules! trace_inspect { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ); }
macro_rules! trace_item_execute { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ); }
macro_rules! debug_setup { ($($arg:expr),*) => ( if false { log::debug!($($arg),*); } ); }
autoerr::create_error_v1!(
name(Error, "ScyllaInsertWorker"),
@@ -67,7 +67,7 @@ fn stats_inc_for_err(stats: &stats::InsertWorkerStats, err: &crate::iteminsertqu
}
Error::DbError(_) => {
if true {
warn!("db error {err}");
warn!("db error {}", err);
}
stats.db_error().inc();
}
@@ -180,6 +180,46 @@ pub async fn spawn_scylla_insert_workers_dummy(
Ok(jhs)
}
struct FutTrackDt<F> {
ts1: Instant,
ts2: Instant,
ts_net: Instant,
poll1: bool,
fut: F,
}
impl FutTrackDt<InsertFut> {
fn from_fut_job(job: FutJob) -> Self {
let tsnow = Instant::now();
Self {
ts1: tsnow,
ts2: tsnow,
ts_net: job.ts_net,
poll1: false,
fut: job.fut,
}
}
}
impl<F> Future for FutTrackDt<F>
where
F: Future + Unpin,
{
type Output = (Instant, Instant, Instant, F::Output);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
if self.poll1 == false {
self.poll1 = true;
self.ts2 = Instant::now();
}
match self.as_mut().fut.poll_unpin(cx) {
Ready(x) => Ready((self.ts_net, self.ts1, self.ts2, x)),
Pending => Pending,
}
}
}
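Note: FutTrackDt above tags each insert future with three timestamps (ts_net from the job, creation time, first-poll time), and worker_streamed below turns them into the job_dt1/job_dt2/job_dt_net histograms. A short usage sketch, assuming the FutJob and FutTrackDt types from this hunk and std::time::Instant; `job` is a hypothetical FutJob:

async fn time_one_insert(job: FutJob) {
    let (ts_net, ts_created, ts_first_poll, _result) = FutTrackDt::from_fut_job(job).await;
    let tsnow = std::time::Instant::now();
    // time spent queued before the executor first polled the future
    let dt_queue = ts_first_poll.saturating_duration_since(ts_created);
    // time from first poll to completion
    let dt_exec = tsnow.saturating_duration_since(ts_first_poll);
    // latency from when the data was first seen (ts_net) to the completed insert
    let dt_net = tsnow.saturating_duration_since(ts_net);
    let _ = (dt_queue, dt_exec, dt_net);
}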
async fn worker_streamed(
worker_ix: usize,
concurrency: usize,
@@ -208,16 +248,22 @@ async fn worker_streamed(
let stream = stream
.map(|x| futures_util::stream::iter(x))
.flatten_unordered(Some(1))
.map(|x| FutTrackDt::from_fut_job(x))
.buffer_unordered(concurrency);
let mut stream = Box::pin(stream);
debug_setup!("waiting for item");
while let Some(item) = stream.next().await {
while let Some((ts_net, ts1, ts2, item)) = stream.next().await {
trace_item_execute!("see item");
let tsnow = Instant::now();
match item {
Ok(_) => {
mett.job_ok().inc();
// TODO compute the insert latency bin and count.
let dt1 = tsnow.saturating_duration_since(ts1);
let dt2 = tsnow.saturating_duration_since(ts2);
let dt_net = tsnow.saturating_duration_since(ts_net);
mett.job_dt1().push_dur_100us(dt1);
mett.job_dt2().push_dur_100us(dt2);
mett.job_dt_net().push_dur_100us(dt_net);
}
Err(e) => {
use scylla::transport::errors::QueryError;
@@ -261,21 +307,26 @@ async fn worker_streamed(
Ok(())
}
struct FutJob {
fut: InsertFut,
ts_net: Instant,
}
fn transform_to_db_futures<S>(
item_inp: S,
data_store: Arc<DataStore>,
ignore_writes: bool,
stats: Arc<InsertWorkerStats>,
) -> impl Stream<Item = Vec<InsertFut>>
) -> impl Stream<Item = Vec<FutJob>>
where
S: Stream<Item = VecDeque<QueryItem>>,
{
trace!("transform_to_db_futures begin");
trace_transform!("transform_to_db_futures begin");
// TODO possible without box?
// let item_inp = Box::pin(item_inp);
item_inp.map(move |batch| {
stats.item_recv.inc();
trace!("transform_to_db_futures have batch len {}", batch.len());
trace_transform!("transform_to_db_futures have batch len {}", batch.len());
let tsnow = Instant::now();
let mut res = Vec::with_capacity(32);
for item in batch {
@@ -284,46 +335,46 @@ where
if ignore_writes {
SmallVec::new()
} else {
prepare_query_insert_futs(item, &data_store, &stats, tsnow)
prepare_query_insert_futs(item, &data_store)
}
}
QueryItem::Msp(item) => {
if ignore_writes {
SmallVec::new()
} else {
prepare_msp_insert_futs(item, &data_store, &stats, tsnow)
prepare_msp_insert_futs(item, &data_store)
}
}
QueryItem::TimeBinSimpleF32V02(item) => {
if ignore_writes {
SmallVec::new()
} else {
prepare_timebin_v02_insert_futs(item, &data_store, &stats, tsnow)
prepare_timebin_v02_insert_futs(item, &data_store, tsnow)
}
}
QueryItem::BinWriteIndexV03(item) => {
if ignore_writes {
SmallVec::new()
} else {
prepare_bin_write_index_v03_insert_futs(item, &data_store, &stats, tsnow)
prepare_bin_write_index_v03_insert_futs(item, &data_store, tsnow)
}
}
QueryItem::Accounting(item) => {
if ignore_writes {
SmallVec::new()
} else {
prepare_accounting_insert_futs(item, &data_store, &stats, tsnow)
prepare_accounting_insert_futs(item, &data_store, tsnow)
}
}
QueryItem::AccountingRecv(item) => {
if ignore_writes {
SmallVec::new()
} else {
prepare_accounting_recv_insert_futs(item, &data_store, &stats, tsnow)
prepare_accounting_recv_insert_futs(item, &data_store, tsnow)
}
}
};
trace!("prepared futs len {}", futs.len());
trace_transform!("prepared futs len {}", futs.len());
res.extend(futs.into_iter());
}
res
@@ -334,7 +385,7 @@ fn inspect_items(
item_inp: Receiver<VecDeque<QueryItem>>,
worker_name: String,
) -> impl Stream<Item = VecDeque<QueryItem>> {
trace!("transform_to_db_futures begin");
trace_inspect!("transform_to_db_futures begin");
// TODO possible without box?
// let item_inp = Box::pin(item_inp);
item_inp.inspect(move |batch| {
@@ -363,44 +414,30 @@ fn inspect_items(
})
}
fn prepare_msp_insert_futs(
item: MspItem,
data_store: &Arc<DataStore>,
stats: &Arc<InsertWorkerStats>,
tsnow: Instant,
) -> SmallVec<[InsertFut; 4]> {
fn prepare_msp_insert_futs(item: MspItem, data_store: &Arc<DataStore>) -> SmallVec<[FutJob; 4]> {
trace2!("execute MSP bump");
stats.inserts_msp().inc();
{
let dt = tsnow.saturating_duration_since(item.ts_net());
let dt_ms = 1000 * dt.as_secs() as u32 + dt.subsec_millis();
stats.item_lat_net_worker().ingest(dt_ms);
}
let fut = insert_msp_fut(
item.series(),
item.ts_msp(),
item.ts_net(),
data_store.scy.clone(),
data_store.qu_insert_ts_msp.clone(),
stats.clone(),
);
let fut = FutJob {
fut,
ts_net: item.ts_net(),
};
let futs = smallvec![fut];
futs
}
fn prepare_query_insert_futs(
item: InsertItem,
data_store: &Arc<DataStore>,
stats: &Arc<InsertWorkerStats>,
tsnow: Instant,
) -> SmallVec<[InsertFut; 4]> {
stats.inserts_value().inc();
fn prepare_query_insert_futs(item: InsertItem, data_store: &Arc<DataStore>) -> SmallVec<[FutJob; 4]> {
let item_ts_net = item.ts_net;
let dt = tsnow.saturating_duration_since(item_ts_net);
let dt_ms = 1000 * dt.as_secs() as u32 + dt.subsec_millis();
stats.item_lat_net_worker().ingest(dt_ms);
let do_insert = true;
let fut = insert_item_fut(item, &data_store, do_insert, stats);
let fut = insert_item_fut(item, &data_store, do_insert);
let fut = FutJob {
fut,
ts_net: item_ts_net,
};
let futs = smallvec![fut];
futs
}
@@ -408,9 +445,8 @@ fn prepare_query_insert_futs(
fn prepare_timebin_v02_insert_futs(
item: TimeBinSimpleF32V02,
data_store: &Arc<DataStore>,
stats: &Arc<InsertWorkerStats>,
tsnow: Instant,
) -> SmallVec<[InsertFut; 4]> {
) -> SmallVec<[FutJob; 4]> {
let params = (
item.series.id() as i64,
item.binlen,
@@ -423,15 +459,12 @@ fn prepare_timebin_v02_insert_futs(
item.dev,
item.lst,
);
// TODO would be better to count inserts only on completed insert
stats.inserted_binned().inc();
let fut = InsertFut::new(
data_store.scy.clone(),
data_store.qu_insert_binned_scalar_f32_v02.clone(),
params,
tsnow,
stats.clone(),
);
let fut = FutJob { fut, ts_net: tsnow };
let futs = smallvec![fut];
// TODO match on the query result:
@@ -451,19 +484,15 @@ fn prepare_timebin_v02_insert_futs(
fn prepare_bin_write_index_v03_insert_futs(
item: BinWriteIndexV03,
data_store: &Arc<DataStore>,
stats: &Arc<InsertWorkerStats>,
tsnow: Instant,
) -> SmallVec<[InsertFut; 4]> {
) -> SmallVec<[FutJob; 4]> {
let params = (item.series, item.pbp, item.msp, item.rt, item.lsp, item.binlen);
// TODO would be better to count inserts only on completed insert
stats.inserted_binned().inc();
let fut = InsertFut::new(
data_store.scy.clone(),
data_store.qu_insert_bin_write_index_v03.clone(),
params,
tsnow,
stats.clone(),
);
let fut = FutJob { fut, ts_net: tsnow };
let futs = smallvec![fut];
// TODO match on the query result:
@@ -483,9 +512,8 @@ fn prepare_bin_write_index_v03_insert_futs(
fn prepare_accounting_insert_futs(
item: Accounting,
data_store: &Arc<DataStore>,
stats: &Arc<InsertWorkerStats>,
tsnow: Instant,
) -> SmallVec<[InsertFut; 4]> {
) -> SmallVec<[FutJob; 4]> {
let params = (
item.part,
item.ts.sec() as i64,
@@ -493,13 +521,8 @@ fn prepare_accounting_insert_futs(
item.count,
item.bytes,
);
let fut = InsertFut::new(
data_store.scy.clone(),
data_store.qu_account_00.clone(),
params,
tsnow,
stats.clone(),
);
let fut = InsertFut::new(data_store.scy.clone(), data_store.qu_account_00.clone(), params);
let fut = FutJob { fut, ts_net: tsnow };
let futs = smallvec![fut];
futs
}
@@ -507,9 +530,8 @@ fn prepare_accounting_insert_futs(
fn prepare_accounting_recv_insert_futs(
item: AccountingRecv,
data_store: &Arc<DataStore>,
stats: &Arc<InsertWorkerStats>,
tsnow: Instant,
) -> SmallVec<[InsertFut; 4]> {
) -> SmallVec<[FutJob; 4]> {
let params = (
item.part,
item.ts.sec() as i64,
@@ -517,13 +539,8 @@ fn prepare_accounting_recv_insert_futs(
item.count,
item.bytes,
);
let fut = InsertFut::new(
data_store.scy.clone(),
data_store.qu_account_recv_00.clone(),
params,
tsnow,
stats.clone(),
);
let fut = InsertFut::new(data_store.scy.clone(), data_store.qu_account_recv_00.clone(), params);
let fut = FutJob { fut, ts_net: tsnow };
let futs = smallvec![fut];
futs
}

View File

@@ -12,8 +12,6 @@ use netpod::TsNano;
use netpod::channelstatus::ChannelStatus;
use netpod::channelstatus::ChannelStatusClosedReason;
use scylla::QueryResult;
use scylla::frame::value::Value;
use scylla::frame::value::ValueList;
use scylla::prepared_statement::PreparedStatement;
use scylla::serialize::row::SerializeRow;
use scylla::serialize::value::SerializeValue;
@@ -21,8 +19,6 @@ use scylla::transport::errors::DbError;
use scylla::transport::errors::QueryError;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use series::msp::PrebinnedPartitioning;
use stats::InsertWorkerStats;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::ptr::NonNull;
@@ -593,18 +589,16 @@ struct InsParCom {
series: SeriesId,
ts_msp: TsMs,
ts_lsp: DtNano,
ts_net: Instant,
#[allow(unused)]
do_insert: bool,
stats: Arc<InsertWorkerStats>,
}
fn insert_scalar_gen_fut<ST>(par: InsParCom, val: ST, qu: Arc<PreparedStatement>, scy: Arc<ScySession>) -> InsertFut
where
ST: Value + SerializeValue + Send + 'static,
ST: SerializeValue + Send + 'static,
{
let params = (par.series.to_i64(), par.ts_msp.to_i64(), par.ts_lsp.to_i64(), val);
InsertFut::new(scy, qu, params, par.ts_net, par.stats)
InsertFut::new(scy, qu, params)
}
fn insert_scalar_enum_gen_fut<ST1, ST2>(
@@ -615,8 +609,8 @@ fn insert_scalar_enum_gen_fut<ST1, ST2>(
scy: Arc<ScySession>,
) -> InsertFut
where
ST1: Value + SerializeValue + Send + 'static,
ST2: Value + SerializeValue + Send + 'static,
ST1: SerializeValue + Send + 'static,
ST2: SerializeValue + Send + 'static,
{
let params = (
par.series.to_i64(),
@@ -625,13 +619,12 @@ where
val,
valstr,
);
InsertFut::new(scy, qu, params, par.ts_net, par.stats)
InsertFut::new(scy, qu, params)
}
// val: Vec<ST> where ST: Value + SerializeValue + Send + 'static,
fn insert_array_gen_fut(par: InsParCom, val: Vec<u8>, qu: Arc<PreparedStatement>, scy: Arc<ScySession>) -> InsertFut {
let params = (par.series.to_i64(), par.ts_msp.to_i64(), par.ts_lsp.to_i64(), val);
InsertFut::new(scy, qu, params, par.ts_net, par.stats)
InsertFut::new(scy, qu, params)
}
#[pin_project::pin_project]
@@ -646,26 +639,13 @@ pub struct InsertFut {
}
impl InsertFut {
pub fn new<V: ValueList + SerializeRow + Send + 'static>(
scy: Arc<ScySession>,
qu: Arc<PreparedStatement>,
params: V,
// timestamp when we first encountered the data to-be inserted, for metrics
tsnet: Instant,
stats: Arc<InsertWorkerStats>,
) -> Self {
pub fn new<V: SerializeRow + Send + 'static>(scy: Arc<ScySession>, qu: Arc<PreparedStatement>, params: V) -> Self {
let scy_ref = unsafe { NonNull::from(scy.as_ref()).as_ref() };
let qu_ref = unsafe { NonNull::from(qu.as_ref()).as_ref() };
let fut = scy_ref.execute_unpaged(qu_ref, params);
let fut = fut.map(move |x| {
let dt = tsnet.elapsed();
let dt_ms = 1000 * dt.as_secs() as u32 + dt.subsec_millis();
stats.item_lat_net_store().ingest(dt_ms);
x
});
let fut = taskrun::tokio::task::unconstrained(fut);
let fut = Box::pin(fut);
// let fut = StackFuture::from(fut);
// let _ff = StackFuture::from(fut);
Self { scy, qu, fut }
}
@@ -687,25 +667,12 @@ impl Future for InsertFut {
}
}
pub fn insert_msp_fut(
series: SeriesId,
ts_msp: TsMs,
// for stats, the timestamp when we received that data
tsnet: Instant,
scy: Arc<ScySession>,
qu: Arc<PreparedStatement>,
stats: Arc<InsertWorkerStats>,
) -> InsertFut {
pub fn insert_msp_fut(series: SeriesId, ts_msp: TsMs, scy: Arc<ScySession>, qu: Arc<PreparedStatement>) -> InsertFut {
let params = (series.to_i64(), ts_msp.to_i64());
InsertFut::new(scy, qu, params, tsnet, stats)
InsertFut::new(scy, qu, params)
}
pub fn insert_item_fut(
item: InsertItem,
data_store: &DataStore,
do_insert: bool,
stats: &Arc<InsertWorkerStats>,
) -> InsertFut {
pub fn insert_item_fut(item: InsertItem, data_store: &DataStore, do_insert: bool) -> InsertFut {
let scy = data_store.scy.clone();
use DataValue::*;
match item.val {
@@ -714,9 +681,7 @@ pub fn insert_item_fut(
series: item.series,
ts_msp: item.ts_msp,
ts_lsp: item.ts_lsp,
ts_net: item.ts_net,
do_insert,
stats: stats.clone(),
};
use ScalarValue::*;
match val {
@@ -742,9 +707,7 @@ pub fn insert_item_fut(
series: item.series,
ts_msp: item.ts_msp,
ts_lsp: item.ts_lsp,
ts_net: item.ts_net,
do_insert,
stats: stats.clone(),
};
use ArrayValue::*;
let blob = val.to_binary_blob();

View File

@@ -1,10 +1,8 @@
pub mod access;
pub mod config;
pub mod fut;
pub mod futbatch;
pub mod futbatchgen;
pub mod futinsert;
pub mod futinsertloop;
pub mod insertqueues;
pub mod insertworker;
pub mod iteminsertqueue;

View File

@@ -1,7 +1,6 @@
#[allow(non_snake_case)]
pub mod serde_dummy {
use serde::Serializer;
use std::time::Instant;
#[allow(unused)]
pub fn serialize<S, T>(val: &T, ser: S) -> Result<S::Ok, S::Error>

View File

@@ -198,7 +198,6 @@ where
res_lt = Self::write_inner(&mut self.state_lt, item.clone(), ts_net, tsev, &mut iqdqs.lt_rf3_qu)?;
res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, tsev, &mut iqdqs.mt_rf3_qu)?;
res_st = if self.do_st_rf1 {
// Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf1_qu)?
Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)?
} else {
Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)?

View File

@@ -13,5 +13,5 @@ rand_xoshiro = "0.6.0"
stats_types = { path = "../stats_types" }
stats_proc = { path = "../stats_proc" }
log = { path = "../log" }
mettrics = { version = "0.0.6", path = "../../mettrics" }
mettrics = { version = "0.0.7", path = "../../mettrics" }
ca_proto = { path = "../../daqbuf-ca-proto", package = "daqbuf-ca-proto" }

View File

@@ -5,12 +5,18 @@ mod Metrics {
job_ok,
job_err,
}
enum histolog2s {
job_dt1,
job_dt2,
job_dt_net,
}
}
mod Metrics {
type StructName = CaConnMetrics;
enum counters {
metrics_emit,
metrics_emit_final,
ioid_read_begin,
ioid_read_done,
ioid_read_timeout,
@@ -91,9 +97,17 @@ mod Metrics {
type Name = scy_inswork;
}
enum counters {
proc_cpu_v0_inc,
proc_cpu_v0_dec,
proc_mem_rss_inc,
proc_mem_rss_dec,
handle_event,
caconnset_health_response,
channel_send_err,
}
enum values {
proc_cpu_v0,
proc_mem_rss,
iqtx_len_st_rf1,
iqtx_len_st_rf3,
iqtx_len_mt_rf3,
iqtx_len_lt_rf3,
iqtx_len_lt_rf3_lat5,
}
}
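Note: the declarations above are consumed by mettrics::macros::make_metrics! (see the next file) to generate the typed accessors used elsewhere in this commit: .inc()/.add() for counters, .set() for values, and .push_dur_100us() for the new histolog2s entries. Purely as an illustration of the log2-bucketed duration histogram idea, and explicitly not the mettrics crate's actual implementation, a self-contained sketch:

use std::time::Duration;

struct HistoLog2 {
    // one bucket per power of two of the 100 µs tick count
    buckets: [u64; 32],
}

impl HistoLog2 {
    fn push_dur_100us(&mut self, dt: Duration) {
        let ticks = (dt.as_micros() / 100) as u64;
        // bucket index ~ floor(log2(ticks)), clamped to the array length
        let idx = (64 - ticks.leading_zeros() as usize).min(self.buckets.len() - 1);
        self.buckets[idx] += 1;
    }
}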

View File

@@ -1,4 +1,5 @@
use mettrics::types::CounterU32;
use mettrics::types::HistoLog2;
use mettrics::types::ValueU32;
mettrics::macros::make_metrics!("mettdecl.rs");

View File

@@ -287,38 +287,14 @@ stats_proc::stats_struct!((
stats_struct(
name(DaemonStats),
prefix(daemon),
counters(
critical_error,
todo_mark,
ticker_token_acquire_error,
ticker_token_release_error,
handle_timer_tick_count,
ioc_search_err,
ioc_search_some,
ioc_search_none,
lookupaddr_ok,
events,
event_ca_conn,
ca_conn_status_done,
ca_conn_status_feedback_timeout,
ca_conn_status_feedback_recv,
ca_conn_status_feedback_no_dst,
ca_echo_timeout_total,
caconn_done_channel_state_reset,
insert_worker_spawned,
insert_worker_join_ok,
insert_worker_join_ok_err,
insert_worker_join_err,
caconnset_health_response,
channel_send_err,
),
counters(asdasd,),
values(
channel_unknown_address,
channel_search_pending,
channel_with_address,
channel_no_address,
connset_health_lat_ema,
iqtx_len_st_rf1,
// iqtx_len_st_rf1,
iqtx_len_st_rf3,
iqtx_len_mt_rf3,
iqtx_len_lt_rf3,