From 265f4b9bd9121229c3d020b02b69a398f12bb3ca Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 28 Apr 2025 17:12:46 +0200 Subject: [PATCH] Cleaning and transition more stats counters --- daqingest/Cargo.toml | 2 +- daqingest/src/daemon.rs | 70 +- dbpg/src/seriesbychannel.rs | 18 +- netfetch/src/ca.rs | 1 - netfetch/src/ca/beacons.rs | 4 +- netfetch/src/ca/conn.rs | 113 +- netfetch/src/ca/connset.rs | 111 +- netfetch/src/ca/connset_input_merge.rs | 8 +- netfetch/src/ca/finder.rs | 2 +- netfetch/src/ca/proto.rs | 1560 ------------------------ netfetch/src/ca/statemap.rs | 17 +- netfetch/src/conf.rs | 4 +- netfetch/src/metrics.rs | 6 +- scywr/src/fut.rs | 47 - scywr/src/futinsert.rs | 7 +- scywr/src/futinsertloop.rs | 108 -- scywr/src/insertworker.rs | 187 +-- scywr/src/iteminsertqueue.rs | 59 +- scywr/src/lib.rs | 2 - serde_helper/src/serde_dummy.rs | 1 - serieswriter/src/rtwriter.rs | 1 - stats/Cargo.toml | 2 +- stats/mettdecl.rs | 22 +- stats/src/mett.rs | 1 + stats/src/stats.rs | 28 +- 25 files changed, 265 insertions(+), 2116 deletions(-) delete mode 100644 netfetch/src/ca/proto.rs delete mode 100644 scywr/src/fut.rs delete mode 100644 scywr/src/futinsertloop.rs diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index e1ad923..29d0267 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.3.0-aa.1" +version = "0.3.0-aa.2" authors = ["Dominik Werder "] edition = "2024" diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index a55d7db..2b3d063 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -84,8 +84,6 @@ pub struct Daemon { series_conf_by_id_tx: Sender<()>, iqtx: Option, daemon_metrics: stats::mett::DaemonMetrics, - cpu_latest: u64, - rss_latest: u64, } impl Daemon { @@ -331,7 +329,6 @@ impl Daemon { insert_worker_jhs.extend(jh); }; let stats = Arc::new(DaemonStats::new()); - stats.insert_worker_spawned().add(insert_worker_jhs.len() as _); #[cfg(feature = "bsread")] if let Some(bsaddr) = &opts.test_bsread_addr { @@ -419,8 +416,6 @@ impl Daemon { series_conf_by_id_tx, iqtx: Some(iqtx2), daemon_metrics: stats::mett::DaemonMetrics::new(), - cpu_latest: 0, - rss_latest: 0, }; Ok(ret) } @@ -479,15 +474,7 @@ impl Daemon { fn update_cpu_usage(&mut self) { let cpu = Self::get_cpu_usage(); - if cpu > self.cpu_latest { - let diff = cpu - self.cpu_latest; - self.cpu_latest = cpu; - self.daemon_metrics.proc_cpu_v0_inc().add(diff as u32); - } else if cpu < self.cpu_latest { - let diff = self.cpu_latest - cpu; - self.cpu_latest = cpu; - self.daemon_metrics.proc_cpu_v0_dec().add(diff as u32); - } + self.daemon_metrics.proc_cpu_v0().set(cpu as _); } fn get_memory_usage() -> u64 { @@ -518,15 +505,7 @@ impl Daemon { fn update_memory_usage(&mut self) { let rss = Self::get_memory_usage(); - if rss > self.rss_latest { - let diff = rss - self.rss_latest; - self.rss_latest = rss; - self.daemon_metrics.proc_mem_rss_inc().add(diff as u32); - } else if rss < self.rss_latest { - let diff = self.rss_latest - rss; - self.rss_latest = rss; - self.daemon_metrics.proc_mem_rss_dec().add(diff as u32); - } + self.daemon_metrics.proc_mem_rss().set(rss as _); } async fn handle_timer_tick(&mut self) -> Result<(), Error> { @@ -545,7 +524,6 @@ impl Daemon { std::process::exit(0); } } - self.stats.handle_timer_tick_count.inc(); let tsnow = SystemTime::now(); { let n = SIGINT.load(atomic::Ordering::Acquire); @@ -583,18 +561,19 @@ impl Daemon { .as_ref() .map(|x| netfetch::metrics::types::InsertQueuesTxMetrics::from(x)); if let Some(iqtxm) = iqtxm { - // TODO metrics - self.stats().iqtx_len_st_rf1().set(iqtxm.st_rf1_len as _); - self.stats().iqtx_len_st_rf3().set(iqtxm.st_rf3_len as _); - self.stats().iqtx_len_mt_rf3().set(iqtxm.mt_rf3_len as _); - self.stats().iqtx_len_lt_rf3().set(iqtxm.lt_rf3_len as _); - self.stats().iqtx_len_lt_rf3_lat5().set(iqtxm.lt_rf3_lat5_len as _); + self.daemon_metrics.iqtx_len_st_rf1().set(iqtxm.st_rf1_len as _); + self.daemon_metrics.iqtx_len_st_rf3().set(iqtxm.st_rf3_len as _); + self.daemon_metrics.iqtx_len_mt_rf3().set(iqtxm.mt_rf3_len as _); + self.daemon_metrics.iqtx_len_lt_rf3().set(iqtxm.lt_rf3_len as _); + self.daemon_metrics + .iqtx_len_lt_rf3_lat5() + .set(iqtxm.lt_rf3_lat5_len as _); } else { - self.stats().iqtx_len_st_rf1().set(2); - self.stats().iqtx_len_st_rf3().set(2); - self.stats().iqtx_len_mt_rf3().set(2); - self.stats().iqtx_len_lt_rf3().set(2); - self.stats().iqtx_len_lt_rf3_lat5().set(2); + self.daemon_metrics.iqtx_len_st_rf1().set(0); + self.daemon_metrics.iqtx_len_st_rf3().set(0); + self.daemon_metrics.iqtx_len_mt_rf3().set(0); + self.daemon_metrics.iqtx_len_lt_rf3().set(0); + self.daemon_metrics.iqtx_len_lt_rf3_lat5().set(0); } self.update_cpu_usage(); self.update_memory_usage(); @@ -677,7 +656,7 @@ impl Daemon { Healthy => { let tsnow = Instant::now(); self.connset_status_last = tsnow; - self.stats.caconnset_health_response().inc(); + self.daemon_metrics.caconnset_health_response().inc(); } Error(e) => { error!("error from CaConnSet: {e}"); @@ -766,21 +745,21 @@ impl Daemon { match self.handle_config_reload_inner().await { Ok(()) => { if tx.send(0).await.is_err() { - self.stats.channel_send_err().inc(); + self.daemon_metrics.channel_send_err().inc(); } Ok(()) } Err(e) => { error!("{e}"); if tx.send(127).await.is_err() { - self.stats.channel_send_err().inc(); + self.daemon_metrics.channel_send_err().inc(); } Ok(()) } } } - #[cfg(target_abi = "x32")] + #[cfg(feature = "DISABLED")] async fn handle_shutdown(&mut self) -> Result<(), Error> { warn!("received shutdown event"); if self.shutting_down { @@ -795,7 +774,7 @@ impl Daemon { async fn handle_event(&mut self, item: DaemonEvent) -> Result<(), Error> { use DaemonEvent::*; - self.stats.events.inc(); + self.daemon_metrics.handle_event().inc(); let ts1 = Instant::now(); let item_summary = item.summary(); let ret = match item { @@ -805,7 +784,6 @@ impl Daemon { match tx.send(i.wrapping_add(1)).await { Ok(()) => {} Err(_) => { - self.stats.ticker_token_release_error.inc(); error!("can not send ticker token"); return Err(Error::with_msg_no_trace("can not send ticker token")); } @@ -871,7 +849,7 @@ impl Daemon { match ticker_inp_rx.recv().await { Ok(_) => {} Err(_) => { - stats.ticker_token_acquire_error.inc(); + panic!("can not acquire timer ticker token"); break; } } @@ -909,7 +887,6 @@ impl Daemon { daemon_stats, conn_set_stats, ca_conn_stats, - self.connset_ctrl.ca_proto_stats().clone(), self.insert_worker_stats.clone(), self.series_by_channel_stats.clone(), self.connset_ctrl.ioc_finder_stats().clone(), @@ -960,17 +937,12 @@ impl Daemon { while let Some(jh) = self.insert_workers_jhs.pop() { match jh.await.map_err(Error::from_string) { Ok(x) => match x { - Ok(()) => { - self.stats.insert_worker_join_ok().inc(); - // debug!("joined insert worker"); - } + Ok(()) => {} Err(e) => { - self.stats.insert_worker_join_ok_err().inc(); error!("joined insert worker, error {e}"); } }, Err(e) => { - self.stats.insert_worker_join_err().inc(); error!("insert worker join error {e}"); } } diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index 064910c..26b0f67 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -3,8 +3,8 @@ use async_channel::Sender; use chrono::DateTime; use chrono::Utc; use core::fmt; -use err::thiserror; use err::ThisError; +use err::thiserror; use futures_util::Future; use futures_util::TryFutureExt; use log::*; @@ -626,6 +626,7 @@ pub async fn start_lookup_workers( Ok((query_tx, jhs, bjh)) } +#[allow(unused)] struct SalterTest; impl HashSalter for SalterTest { @@ -639,6 +640,8 @@ pub struct SalterRandom; impl HashSalter for SalterRandom { fn hupd(hupd: &mut dyn FnMut(&[u8]), i1: u16, i2: u16) { + let _ = i1; + let _ = i2; let tsnow = Instant::now(); let b = unsafe { &*(&tsnow as *const Instant as *const [u8; core::mem::size_of::()]) }; hupd(b) @@ -647,9 +650,9 @@ impl HashSalter for SalterRandom { #[cfg(test)] async fn psql_play(db: &Database) -> Result<(), Error> { - use tokio_postgres::types::ToSql; + // use tokio_postgres::types::ToSql; use tokio_postgres::types::Type; - let (pg, pg_client_jh) = crate::conn::make_pg_client(db).await?; + let (pg, _pg_client_jh) = crate::conn::make_pg_client(db).await?; if false { let sql = concat!("select pg_typeof($1)"); let qu = pg.prepare_typed(sql, &[Type::INT4_ARRAY]).await?; @@ -763,7 +766,7 @@ fn test_series_by_channel_01() { } } // TODO keep join handles and await later - let (channel_info_query_tx, jhs, jh) = + let (channel_info_query_tx, _jhs, _jh) = dbpg::seriesbychannel::start_lookup_workers::(1, &pgconf, series_by_channel_stats.clone()) .await?; @@ -895,10 +898,3 @@ fn test_db_conf() -> Database { } } } - -#[cfg(test)] -async fn test_db_conn() -> Result { - let db = test_db_conf(); - let (pg, pg_client_jh) = crate::conn::make_pg_client(&db).await?; - Ok(pg) -} diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 10eb155..2fb722e 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -8,7 +8,6 @@ pub mod findioc; pub mod search; pub mod statemap; -use crate::metrics::ExtraInsertsConf; use futures_util::Future; use futures_util::FutureExt; use log::*; diff --git a/netfetch/src/ca/beacons.rs b/netfetch/src/ca/beacons.rs index 0535e26..6740272 100644 --- a/netfetch/src/ca/beacons.rs +++ b/netfetch/src/ca/beacons.rs @@ -41,7 +41,6 @@ pub async fn listen_beacons( worker_tx: Sender, backend: String, ) -> Result<(), Error> { - let stnow = SystemTime::now(); let channel = "epics-ca-beacons".to_string(); let scalar_type = ScalarType::U64; let shape = Shape::Scalar; @@ -56,6 +55,7 @@ pub async fn listen_beacons( }; worker_tx.send(qu).await?; let chinfo = rx.recv().await??; + let _ = chinfo; // TODO // let mut writer = SeriesWriter::new(chinfo.series.to_series()); // let mut deque = VecDeque::new(); @@ -91,6 +91,8 @@ pub async fn listen_beacons( let ts_local = ts; let blob = addr_u32 as i64; let val = DataValue::Scalar(ScalarValue::I64(blob)); + let _ = ts_local; + let _ = val; // writer.write(ts, ts_local, val, &mut deque)?; } } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 04e815c..1ac25cd 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -781,6 +781,7 @@ enum CaConnState { PeerReady, Shutdown(EndOfStreamReason), EndOfStream, + MetricsEmitted, } impl fmt::Debug for CaConnState { @@ -793,6 +794,7 @@ impl fmt::Debug for CaConnState { Self::PeerReady => fmt.debug_tuple("PeerReady").finish(), Self::Shutdown(v0) => fmt.debug_tuple("Shutdown").field(v0).finish(), Self::EndOfStream => fmt.debug_tuple("EndOfStream").finish(), + Self::MetricsEmitted => fmt.debug_tuple("MetricsEmitted").finish(), } } } @@ -1101,7 +1103,6 @@ pub struct CaConn { ca_conn_event_out_queue: VecDeque, ca_conn_event_out_queue_max: usize, thr_msg_poll: ThrottleTrace, - ca_proto_stats: Arc, rng: Xoshiro128PlusPlus, channel_info_query_qu: VecDeque, channel_info_query_tx: Pin>>, @@ -1118,6 +1119,7 @@ pub struct CaConn { ts_channel_status_pong_last: Instant, mett: stats::mett::CaConnMetrics, metrics_emit_last: Instant, + fionread_last: u32, } impl Drop for CaConn { @@ -1135,7 +1137,6 @@ impl CaConn { iqtx: InsertQueuesTx, channel_info_query_tx: Sender, stats: Arc, - ca_proto_stats: Arc, ) -> Self { let tsnow = Instant::now(); let (cq_tx, cq_rx) = async_channel::bounded(32); @@ -1174,7 +1175,6 @@ impl CaConn { ca_conn_event_out_queue: VecDeque::new(), ca_conn_event_out_queue_max: 2000, thr_msg_poll: ThrottleTrace::new(Duration::from_millis(2000)), - ca_proto_stats, rng, channel_info_query_qu: VecDeque::new(), channel_info_query_tx: Box::pin(SenderPolling::new(channel_info_query_tx)), @@ -1188,6 +1188,7 @@ impl CaConn { ts_channel_status_pong_last: tsnow, mett: stats::mett::CaConnMetrics::new(), metrics_emit_last: tsnow, + fionread_last: 0, } } @@ -1528,12 +1529,14 @@ impl CaConn { if dbg_chn_cid { info!("send out EventAdd for {cid:?}"); } - let ty = CaMsgTy::EventAdd(EventAdd { - sid: st2.channel.sid.to_u32(), - data_type: st2.channel.ca_dbr_type, - data_count: st2.channel.ca_dbr_count, - subid: subid.to_u32(), - }); + let data_count = st2.channel.ca_dbr_count; + let _data_count = 0; + let ty = CaMsgTy::EventAdd(EventAdd::new( + st2.channel.ca_dbr_type, + data_count, + st2.channel.sid.to_u32(), + subid.to_u32(), + )); let msg = CaMsg::from_ty_ts(ty, self.poll_tsnow); let proto = self.proto.as_mut().unwrap(); proto.push_out(msg); @@ -1802,7 +1805,12 @@ impl CaConn { Ok(()) } - fn handle_event_add_res(&mut self, ev: proto::EventAddRes, tsnow: Instant) -> Result<(), Error> { + fn handle_event_add_res( + &mut self, + ev: proto::EventAddRes, + tsnow: Instant, + tscaproto: Instant, + ) -> Result<(), Error> { let subid = Subid(ev.subid); // TODO handle subid-not-found which can also be peer error: let cid = if let Some(x) = self.cid_by_subid.get(&subid) { @@ -1910,6 +1918,7 @@ impl CaConn { iqdqs, tsnow, stnow, + tscaproto, ch_conf.use_ioc_time(), stats, &mut self.rng, @@ -1942,6 +1951,7 @@ impl CaConn { iqdqs, tsnow, stnow, + tscaproto, ch_conf.use_ioc_time(), stats, &mut self.rng, @@ -2049,8 +2059,8 @@ impl CaConn { fn handle_read_notify_res( &mut self, ev: proto::ReadNotifyRes, - camsg_ts: Instant, tsnow: Instant, + tscaproto: Instant, ) -> Result<(), Error> { // trace!("handle_read_notify_res {ev:?}"); // TODO can not rely on the SID in the response. @@ -2058,10 +2068,7 @@ impl CaConn { let ioid = Ioid(ev.ioid); if let Some(pp) = self.handler_by_ioid.get_mut(&ioid) { if let Some(mut fut) = pp.take() { - let camsg = CaMsg { - ty: CaMsgTy::ReadNotifyRes(ev), - ts: camsg_ts, - }; + let camsg = CaMsg::from_ty_ts(CaMsgTy::ReadNotifyRes(ev), tscaproto); fut.as_mut().camsg(camsg, self)?; Ok(()) } else { @@ -2119,6 +2126,7 @@ impl CaConn { iqdqs, stnow, tsnow, + tscaproto, ch_conf.use_ioc_time(), stats, &mut self.rng, @@ -2211,6 +2219,7 @@ impl CaConn { iqdqs, stnow, tsnow, + tscaproto, ch_conf.use_ioc_time(), stats, &mut self.rng, @@ -2243,6 +2252,7 @@ impl CaConn { iqdqs: &mut InsertDeques, stnow: SystemTime, tsnow: Instant, + tscaproto: Instant, use_ioc_time: bool, stats: &CaConnStats, rng: &mut Xoshiro128PlusPlus, @@ -2260,6 +2270,7 @@ impl CaConn { iqdqs, tsnow, stnow, + tscaproto, use_ioc_time, stats, rng, @@ -2277,6 +2288,7 @@ impl CaConn { iqdqs: &mut InsertDeques, tsnow: Instant, stnow: SystemTime, + tscaproto: Instant, use_ioc_time: bool, stats: &CaConnStats, rng: &mut Xoshiro128PlusPlus, @@ -2340,7 +2352,7 @@ impl CaConn { crst.insert_item_ivl_ema.tick(tsnow); // binwriter.ingest(tsev, value.f32_for_binning(), iqdqs)?; { - let wres = writer.write(CaWriterValue::new(value, crst), tsnow, tsev, iqdqs)?; + let wres = writer.write(CaWriterValue::new(value, crst), tscaproto, tsev, iqdqs)?; crst.status_emit_count += wres.nstatus() as u64; if wres.st.accept { crst.dw_st_last = stnow; @@ -2595,13 +2607,11 @@ impl CaConn { // Do not go directly into error state: need to at least attempt to close the channel and wait/timeout for reply. let proto = self.proto.as_mut().ok_or(Error::NoProtocol)?; - let item = CaMsg { - ty: CaMsgTy::ChannelClose(ChannelClose { - sid: st2.channel.sid.0, - cid: st2.channel.cid.0, - }), - ts: tsnow, - }; + let ty = CaMsgTy::ChannelClose(ChannelClose { + sid: st2.channel.sid.0, + cid: st2.channel.cid.0, + }); + let item = CaMsg::from_ty_ts(ty, tsnow); proto.push_out(item); *chst = ChannelState::Closing(ClosingState { tsbeg: tsnow, @@ -2775,7 +2785,8 @@ impl CaConn { Ready(Some(Ok(k))) => { match k { CaItem::Msg(camsg) => { - match &camsg.ty { + let (msgcom, ty) = camsg.into_parts(); + match &ty { CaMsgTy::Version => { if !self.version_seen { self.version_seen = true; @@ -2804,7 +2815,7 @@ impl CaConn { } } } - match camsg.ty { + match ty { CaMsgTy::SearchRes(k) => { let a = k.addr.to_be_bytes(); let addr = format!("{}.{}.{}.{}:{}", a[0], a[1], a[2], a[3], k.tcp_port); @@ -2817,15 +2828,15 @@ impl CaConn { cx.waker().wake_by_ref(); } CaMsgTy::EventAddRes(ev) => { - trace4!("got EventAddRes {:?} cnt {}", camsg.ts, ev.data_count); + trace4!("got EventAddRes {:?} cnt {}", msgcom.ts(), ev.data_count); self.mett.event_add_res_recv().inc(); - Self::handle_event_add_res(self, ev, tsnow)? + Self::handle_event_add_res(self, ev, tsnow, msgcom.ts())? } CaMsgTy::EventAddResEmpty(ev) => { - trace4!("got EventAddResEmpty {:?}", camsg.ts); + trace4!("got EventAddResEmpty {:?}", msgcom.ts()); Self::handle_event_add_res_empty(self, ev, tsnow)? } - CaMsgTy::ReadNotifyRes(ev) => Self::handle_read_notify_res(self, ev, camsg.ts, tsnow)?, + CaMsgTy::ReadNotifyRes(ev) => Self::handle_read_notify_res(self, ev, tsnow, msgcom.ts())?, CaMsgTy::Echo => { if let Some(started) = self.ioc_ping_start { let dt = started.elapsed(); @@ -2874,11 +2885,11 @@ impl CaConn { } self.version_seen = true; } - CaMsgTy::ChannelCloseRes(x) => { - self.handle_channel_close_res(x, tsnow)?; + CaMsgTy::ChannelCloseRes(ty) => { + self.handle_channel_close_res(ty, tsnow)?; } _ => { - warn!("Received unexpected protocol message {:?}", camsg); + warn!("Received unexpected protocol message {:?} {:?}", msgcom, ty); } } } @@ -3014,6 +3025,7 @@ impl CaConn { } fn handle_channel_close_res(&mut self, k: proto::ChannelCloseRes, tsnow: Instant) -> Result<(), Error> { + let _ = tsnow; debug!("{:?}", k); Ok(()) } @@ -3037,6 +3049,11 @@ impl CaConn { Ready(connect_result) => { match connect_result { Ok(Ok(tcp)) => { + let raw_fd = { + use std::os::fd::AsRawFd; + let raw_fd = tcp.as_raw_fd(); + raw_fd + }; self.mett.tcp_connected().inc(); let addr = addr.clone(); self.emit_connection_status_item(ConnectionStatusItem { @@ -3047,6 +3064,7 @@ impl CaConn { self.backoff_reset(); let proto = CaProto::new( TcpAsyncWriteRead::from(tcp), + Some(raw_fd), self.remote_addr_dbg.to_string(), self.opts.array_truncate, ); @@ -3128,6 +3146,7 @@ impl CaConn { } CaConnState::Shutdown(..) => Ok(Ready(None)), CaConnState::EndOfStream => Ok(Ready(None)), + CaConnState::MetricsEmitted => Ok(Ready(None)), }; } } @@ -3230,6 +3249,7 @@ impl CaConn { CaConnState::PeerReady => {} CaConnState::Shutdown(..) => {} CaConnState::EndOfStream => {} + CaConnState::MetricsEmitted => {} } self.iqdqs.housekeeping(); if self.metrics_emit_last + METRICS_EMIT_IVL <= tsnow { @@ -3243,8 +3263,29 @@ impl CaConn { fn metrics_emit(&mut self) { if let Some(x) = self.proto.as_mut() { + let fionread = if let Some(rawfd) = x.get_raw_socket_fd() { + let mut v = 0; + if unsafe { libc::ioctl(rawfd, libc::FIONREAD, &mut v) } == 0 { + Some(v as u32) + } else { + None + } + } else { + None + }; let mett = x.mett(); mett.metrics_emit().inc(); + if let Some(fionread) = fionread { + if fionread > self.fionread_last { + let diff = fionread - self.fionread_last; + self.fionread_last = fionread; + mett.fionread_inc().add(diff); + } else if fionread < self.fionread_last { + let diff = self.fionread_last - fionread; + self.fionread_last = fionread; + mett.fionread_dec().add(diff); + } + } let m = mett.take_and_reset(); self.mett.proto().ingest(m); } @@ -3552,8 +3593,14 @@ impl Stream for CaConn { let mut have_pending = false; let mut have_progress = false; - if let CaConnState::EndOfStream = self.state { + if let CaConnState::MetricsEmitted = self.state { break Ready(None); + } else if let CaConnState::EndOfStream = self.state { + self.mett.metrics_emit_final().inc(); + let mett = self.mett.take_and_reset(); + self.state = CaConnState::MetricsEmitted; + break Ready(Some(CaConnEvent::new_now(CaConnEventValue::Metrics(mett)))); + // break Ready(None); } else if let Some(item) = self.ca_conn_event_out_queue.pop_front() { break Ready(Some(item)); } diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index b9a8a3a..d7541af 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -2,7 +2,6 @@ use super::conn::EndOfStreamReason; use super::findioc::FindIocRes; use crate::ca::conn; use crate::ca::statemap; -use crate::ca::statemap::CaConnState; use crate::ca::statemap::MaybeWrongAddressState; use crate::ca::statemap::WithAddressState; use crate::conf::CaIngestOpts; @@ -38,7 +37,6 @@ use scywr::senderpolling::SenderPolling; use serde::Serialize; use series::ChannelStatusSeriesId; use statemap::ActiveChannelState; -use statemap::CaConnStateValue; use statemap::ChannelState; use statemap::ChannelStateMap; use statemap::ChannelStateValue; @@ -50,8 +48,6 @@ use stats::CaConnSetStats; use stats::CaConnStats; use stats::CaProtoStats; use stats::IocFinderStats; -use stats::rand_xoshiro::Xoshiro128PlusPlus; -use stats::rand_xoshiro::rand_core::RngCore; use std::collections::BTreeMap; use std::collections::VecDeque; use std::fmt; @@ -76,18 +72,16 @@ use tracing::Instrument; const CHECK_CHANS_PER_TICK: usize = 10000000; pub const SEARCH_BATCH_MAX: usize = 64; pub const CURRENT_SEARCH_PENDING_MAX: usize = SEARCH_BATCH_MAX * 4; -const UNKNOWN_ADDRESS_STAY: Duration = Duration::from_millis(15000); const NO_ADDRESS_STAY: Duration = Duration::from_millis(20000); -const MAYBE_WRONG_ADDRESS_STAY: Duration = Duration::from_millis(4000); const SEARCH_PENDING_TIMEOUT: Duration = Duration::from_millis(30000); const CHANNEL_HEALTH_TIMEOUT: Duration = Duration::from_millis(30000); const CHANNEL_UNASSIGNED_TIMEOUT: Duration = Duration::from_millis(0); const UNASSIGN_FOR_CONFIG_CHANGE_TIMEOUT: Duration = Duration::from_millis(1000 * 10); const CHANNEL_MAX_WITHOUT_HEALTH_UPDATE: usize = 3000000; -macro_rules! trace2 { ($($arg:tt)*) => { if false { trace!($($arg)*); } }; } +macro_rules! trace2 { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ); } -macro_rules! trace3 { ($($arg:tt)*) => { if false { trace!($($arg)*); } }; } +macro_rules! trace3 { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ); } macro_rules! trace4 { ($($arg:tt)*) => { if false { trace!($($arg)*); } }; } @@ -139,7 +133,6 @@ impl From for ::err::Error { pub struct CmdId(SocketAddrV4, usize); pub struct CaConnRes { - state: CaConnState, sender: Pin>>, stats: Arc, cmd_queue: VecDeque, @@ -265,11 +258,8 @@ pub struct CaConnSetCtrl { rx: Receiver, stats: Arc, ca_conn_stats: Arc, - ca_proto_stats: Arc, ioc_finder_stats: Arc, jh: JoinHandle>, - rng: Xoshiro128PlusPlus, - idcnt: u32, } impl CaConnSetCtrl { @@ -335,19 +325,9 @@ impl CaConnSetCtrl { &self.ca_conn_stats } - pub fn ca_proto_stats(&self) -> &Arc { - &self.ca_proto_stats - } - pub fn ioc_finder_stats(&self) -> &Arc { &self.ioc_finder_stats } - - fn make_id(&mut self) -> u32 { - let id = self.idcnt; - self.idcnt += 1; - self.rng.next_u32() & 0xffff | (id << 16) - } } #[derive(Debug)] @@ -452,11 +432,8 @@ pub struct CaConnSet { ca_conn_stats: Arc, ioc_finder_jh: JoinHandle>, await_ca_conn_jhs: VecDeque<(SocketAddr, JoinHandle>)>, - thr_msg_poll_1: ThrottleTrace, thr_msg_storage_len: ThrottleTrace, - ca_proto_stats: Arc, rogue_channel_count: u64, - connect_fail_count: usize, cssid_latency_max: Duration, ca_connset_metrics: stats::mett::CaConnSetMetrics, } @@ -526,11 +503,8 @@ impl CaConnSet { // connset_out_sender: SenderPolling::new(connset_out_tx), ioc_finder_jh, await_ca_conn_jhs: VecDeque::new(), - thr_msg_poll_1: ThrottleTrace::new(Duration::from_millis(2000)), thr_msg_storage_len: ThrottleTrace::new(Duration::from_millis(1000)), - ca_proto_stats: ca_proto_stats.clone(), rogue_channel_count: 0, - connect_fail_count: 0, cssid_latency_max: Duration::from_millis(2000), ca_connset_metrics: stats::mett::CaConnSetMetrics::new(), }; @@ -541,11 +515,8 @@ impl CaConnSet { rx: connset_out_rx, stats, ca_conn_stats, - ca_proto_stats, ioc_finder_stats, jh, - idcnt: 0, - rng: stats::xoshiro_from_time(), } } @@ -1249,7 +1220,6 @@ impl CaConnSet { fn handle_ca_conn_channel_removed(&mut self, addr: SocketAddr, name: String) -> Result<(), Error> { debug!("handle_ca_conn_channel_removed {addr} {name}"); - let stnow = SystemTime::now(); let name = ChannelName::new(name); if let Some(st1) = self.channel_states.get_mut(&name) { match &mut st1.value { @@ -1368,7 +1338,6 @@ impl CaConnSet { .clone() .ok_or_else(|| Error::MissingChannelInfoChannelTx)?, self.ca_conn_stats.clone(), - self.ca_proto_stats.clone(), ); let conn_tx = conn.conn_command_tx(); let conn_stats = conn.stats(); @@ -1387,7 +1356,6 @@ impl CaConnSet { let fut = fut.instrument(logspan); let jh = tokio::spawn(fut); let ca_conn_res = CaConnRes { - state: CaConnState::new(CaConnStateValue::Fresh), sender: Box::pin(conn_tx.into()), stats: conn_stats, cmd_queue: VecDeque::new(), @@ -1470,80 +1438,6 @@ impl CaConnSet { } } - async fn wait_stopped(&self) -> Result<(), Error> { - warn!("Lock for wait_stopped"); - // let mut g = self.ca_conn_ress.lock().await; - // let mm = std::mem::replace(&mut *g, BTreeMap::new()); - let mm: BTreeMap>> = BTreeMap::new(); - let mut jhs: VecDeque<_> = VecDeque::new(); - for t in mm { - jhs.push_back(t.1.fuse()); - } - loop { - let mut jh = if let Some(x) = jhs.pop_front() { - x - } else { - break; - }; - futures_util::select! { - a = jh => match a { - Ok(k) => match k { - Ok(_) => {} - Err(e) => { - error!("{e:?}"); - } - }, - Err(e) => { - error!("{e:?}"); - } - }, - _b = crate::rt::sleep(Duration::from_millis(1000)).fuse() => { - jhs.push_back(jh); - info!("waiting for {} connections", jhs.len()); - } - }; - } - Ok(()) - } - - fn check_connection_states(&mut self) -> Result<(), Error> { - let tsnow = Instant::now(); - for (addr, val) in &mut self.ca_conn_ress { - let state = &mut val.state; - let v = &mut state.value; - match v { - CaConnStateValue::Fresh => { - // TODO check for delta t since last issued status command. - if tsnow.duration_since(state.last_feedback) > Duration::from_millis(20000) { - error!("TODO Fresh timeout send connection-close for {addr}"); - // TODO collect in metrics - // self.stats.ca_conn_status_feedback_timeout.inc(); - // TODO send shutdown to this CaConn, check that we've received - // a 'shutdown' state from it. (see below) - *v = CaConnStateValue::Shutdown { since: tsnow }; - } - } - CaConnStateValue::HadFeedback => { - // TODO check for delta t since last issued status command. - if tsnow.duration_since(state.last_feedback) > Duration::from_millis(20000) { - error!("TODO HadFeedback timeout send connection-close for {addr}"); - // TODO collect in metrics - // self.stats.ca_conn_status_feedback_timeout.inc(); - *v = CaConnStateValue::Shutdown { since: tsnow }; - } - } - CaConnStateValue::Shutdown { since } => { - if tsnow.saturating_duration_since(*since) > Duration::from_millis(10000) { - // TODO collect in metrics as severe error, this would be a bug. - // self.stats.critical_error.inc(); - error!("Shutdown of CaConn failed for {addr}"); - } - } - } - } - Ok(()) - } - fn check_channel_states(&mut self, tsnow: Instant, stnow: SystemTime) -> Result<(), Error> { let (mut search_pending_count, mut assigned_without_health_update) = self.update_channel_state_counts(); let mut cmd_remove_channel = Vec::new(); @@ -1555,6 +1449,7 @@ impl CaConnSet { } else { self.channel_states.range_mut(..) }; + #[allow(unused)] let mut st_qu_2 = VecDeque::new(); let mut lt_qu_2 = VecDeque::new(); for (i, (ch, st)) in it.enumerate() { diff --git a/netfetch/src/ca/connset_input_merge.rs b/netfetch/src/ca/connset_input_merge.rs index 86402e0..56ea660 100644 --- a/netfetch/src/ca/connset_input_merge.rs +++ b/netfetch/src/ca/connset_input_merge.rs @@ -41,6 +41,10 @@ impl InputMerge { } } +fn todoval() -> T { + todo!() +} + impl Stream for InputMerge { type Item = CaConnSetEvent; @@ -50,7 +54,7 @@ impl Stream for InputMerge { let mut selfp = self.as_mut().project(); if let Some(inp) = selfp.inp3.as_mut().as_pin_mut() { match inp.poll_next(cx) { - Ready(Some(x)) => Some(CaConnSetEvent::ConnSetCmd(todo!())), + Ready(Some(_)) => Some(CaConnSetEvent::ConnSetCmd(todoval())), Ready(None) => { unsafe { // TODO what guarantees that I can drop the content here like this? @@ -70,7 +74,7 @@ impl Stream for InputMerge { let mut selfp = self.as_mut().project(); if let Some(inp) = selfp.inp2.as_mut().as_pin_mut() { match inp.poll_next(cx) { - Ready(Some(x)) => Some(CaConnSetEvent::ConnSetCmd(todo!())), + Ready(Some(_)) => Some(CaConnSetEvent::ConnSetCmd(todoval())), Ready(None) => { unsafe { // TODO what guarantees that I can drop the content here like this? diff --git a/netfetch/src/ca/finder.rs b/netfetch/src/ca/finder.rs index 1a6e2d3..826ba8b 100644 --- a/netfetch/src/ca/finder.rs +++ b/netfetch/src/ca/finder.rs @@ -40,7 +40,7 @@ fn transform_pgres(rows: Vec) -> VecDeque { let n: Result = row.try_get(0); let ch: Result = row.try_get(1); match (n, ch) { - (Ok(n), Ok(ch)) => { + (Ok(_n), Ok(ch)) => { if let Some(addr) = row.get::<_, Option>(3) { let addr = addr.parse().map_or(None, |x| Some(x)); let item = FindIocRes { diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs deleted file mode 100644 index 107a183..0000000 --- a/netfetch/src/ca/proto.rs +++ /dev/null @@ -1,1560 +0,0 @@ -use futures_util::AsyncRead; -use futures_util::AsyncWrite; -use futures_util::Stream; -use log::*; -use netpod::timeunits::*; -use slidebuf::SlideBuf; -use std::collections::VecDeque; -use std::io; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; -use std::time::Instant; - -#[derive(Debug, thiserror::Error)] -#[cstm(name = "CaProto")] -pub enum Error { - SlideBuf(#[from] slidebuf::Error), - #[error("BufferTooSmallForNeedMin({0}, {1})")] - BufferTooSmallForNeedMin(usize, usize), - IO(#[from] io::Error), - BadSlice, - BadCaDbrTypeId(u16), - BadCaScalarTypeId(u16), - GetValHelpInnerTypeMismatch, - GetValHelpTodoWaveform, - NotEnoughPayload, - TodoConversionArray, - CaProtoVersionMissing, - NotEnoughPayloadTimeMetadata(usize), - MismatchDbrTimeType, - BadCaCount, - CaCommandNotSupported(u16), - ParseAttemptInDoneState, - UnexpectedHeader, - ExtendedHeaderBadCount, - NoReadBufferSpace, - NeitherPendingNorProgress, - OutputBufferTooSmall, - LogicError, - BadPayload, -} - -const CA_PROTO_VERSION: u32 = 13; -const EPICS_EPOCH_OFFSET: u64 = 631152000; -const PAYLOAD_LEN_MAX: u32 = 1024 * 1024 * 32; -const PROTO_INPUT_BUF_CAP: u32 = 1024 * 1024 * 40; - -const TESTING_UNRESPONSIVE_TODO_REMOVE: bool = false; -const TESTING_EVENT_ADD_RES_MAX: u32 = 3; - -const TESTING_PROTOCOL_ERROR_TODO_REMOVE: bool = false; -const TESTING_PROTOCOL_ERROR_AFTER_BYTES: u32 = 400; - -pub trait StatsCounter { - fn inc(&mut self); -} - -pub trait StatsCumulative { - fn add(&mut self, v: u64); -} - -pub trait StatsHisto { - fn ingest(&mut self, v: u32); -} - -impl StatsCounter for () { - fn inc(&mut self) {} -} - -impl StatsCumulative for () { - fn add(&mut self, _v: u64) {} -} - -impl StatsHisto for () { - fn ingest(&mut self, _v: u32) {} -} - -pub trait CaProtoStatsRecv: Unpin { - fn out_msg_placed(&mut self) -> &mut dyn StatsCounter; - fn out_bytes(&mut self) -> &mut dyn StatsCumulative; - fn outbuf_len(&mut self) -> &mut dyn StatsHisto; - fn tcp_recv_count(&mut self) -> &mut dyn StatsCounter; - fn tcp_recv_bytes(&mut self) -> &mut dyn StatsCumulative; - fn payload_ext_very_large(&mut self) -> &mut dyn StatsCounter; - fn payload_ext_but_small(&mut self) -> &mut dyn StatsCounter; - fn payload_size(&mut self) -> &mut dyn StatsHisto; - fn protocol_issue(&mut self) -> &mut dyn StatsCounter; - fn data_count(&mut self) -> &mut dyn StatsHisto; -} - -impl CaProtoStatsRecv for () { - fn out_msg_placed(&mut self) -> &mut dyn StatsCounter { - self - } - - fn out_bytes(&mut self) -> &mut dyn StatsCumulative { - self - } - - fn outbuf_len(&mut self) -> &mut dyn StatsHisto { - self - } - - fn tcp_recv_count(&mut self) -> &mut dyn StatsCounter { - self - } - - fn tcp_recv_bytes(&mut self) -> &mut dyn StatsCumulative { - self - } - - fn payload_ext_very_large(&mut self) -> &mut dyn StatsCounter { - self - } - - fn payload_ext_but_small(&mut self) -> &mut dyn StatsCounter { - self - } - - fn payload_size(&mut self) -> &mut dyn StatsHisto { - self - } - - fn protocol_issue(&mut self) -> &mut dyn StatsCounter { - self - } - - fn data_count(&mut self) -> &mut dyn StatsHisto { - self - } -} - -#[derive(Debug)] -pub struct Search { - pub id: u32, - pub channel: String, -} - -#[derive(Debug)] -pub struct SearchRes { - pub addr: u32, - pub tcp_port: u16, - pub id: u32, - pub proto_version: u16, -} - -#[derive(Debug)] -pub struct ErrorCmd { - pub cid: u32, - pub eid: u32, - pub msg: String, -} - -#[derive(Debug)] -pub struct ClientNameRes { - pub name: String, -} - -#[derive(Debug)] -pub struct CreateChan { - pub cid: u32, - pub channel: String, -} - -#[derive(Debug)] -pub struct CreateChanRes { - pub data_type: u16, - pub data_count: u32, - pub cid: u32, - pub sid: u32, -} - -#[derive(Debug)] -pub struct CreateChanFail { - pub cid: u32, -} - -#[derive(Debug)] -pub struct AccessRightsRes { - pub cid: u32, - pub rights: u32, -} - -#[derive(Debug)] -pub struct EventAdd { - pub data_type: u16, - pub data_count: u32, - pub sid: u32, - pub subid: u32, -} - -#[derive(Debug)] -pub struct EventCancel { - pub data_type: u16, - pub data_count: u32, - pub sid: u32, - pub subid: u32, -} - -#[derive(Debug)] -pub struct EventCancelRes { - pub data_type: u16, - pub sid: u32, - pub subid: u32, -} - -// TODO Clone is only used for testing purposes and should get removed later. -#[derive(Debug, Clone)] -pub struct EventAddRes { - pub data_type: u16, - pub data_count: u32, - pub status: u32, - pub subid: u32, - pub payload_len: u32, - pub value: CaEventValue, -} - -#[derive(Debug, Clone)] -pub struct EventAddResEmpty { - pub data_type: u16, - pub sid: u32, - pub subid: u32, -} - -#[derive(Debug)] -pub struct ReadNotify { - pub data_type: u16, - pub data_count: u32, - pub sid: u32, - pub ioid: u32, -} - -#[derive(Debug)] -pub struct ReadNotifyRes { - pub data_type: u16, - pub data_count: u32, - pub sid: u32, - pub ioid: u32, - pub payload_len: u32, - pub value: CaEventValue, -} - -#[derive(Debug)] -pub struct ChannelClose { - pub sid: u32, - pub cid: u32, -} - -#[derive(Debug)] -pub struct ChannelCloseRes { - pub sid: u32, - pub cid: u32, -} - -// This message is only sent from server to client, on server's initiative. -#[derive(Debug)] -pub struct ChannelDisconnect { - pub cid: u32, -} - -#[derive(Debug)] -enum CaScalarType { - I8, - I16, - I32, - F32, - F64, - Enum, - String, -} - -#[derive(Debug)] -enum CaDbrMetaType { - Plain, - Status, - Time, - Ctrl, -} - -#[derive(Debug)] -pub struct CaDbrType { - meta: CaDbrMetaType, - scalar_type: CaScalarType, -} - -impl CaDbrType { - pub fn from_ca_u16(k: u16) -> Result { - if k == 31 { - let ret = CaDbrType { - meta: CaDbrMetaType::Ctrl, - scalar_type: CaScalarType::Enum, - }; - return Ok(ret); - } - if k > 20 { - return Err(Error::BadCaDbrTypeId(k)); - } - let (meta, k) = if k >= 14 { - (CaDbrMetaType::Time, k - 14) - } else if k >= 7 { - (CaDbrMetaType::Status, k - 7) - } else { - (CaDbrMetaType::Plain, k) - }; - use CaScalarType::*; - let scalar_type = match k { - 4 => I8, - 1 => I16, - 5 => I32, - 2 => F32, - 6 => F64, - 3 => Enum, - 0 => String, - k => return Err(Error::BadCaScalarTypeId(k)), - }; - Ok(CaDbrType { meta, scalar_type }) - } -} - -#[derive(Debug, Clone, PartialEq)] -pub enum CaDataScalarValue { - I8(i8), - I16(i16), - I32(i32), - F32(f32), - F64(f64), - Enum(i16), - String(String), - // TODO remove, CA has no bool, make new enum for other use cases. - Bool(bool), -} - -impl CaDataScalarValue { - fn byte_size(&self) -> u32 { - match self { - CaDataScalarValue::I8(_) => 1, - CaDataScalarValue::I16(_) => 2, - CaDataScalarValue::I32(_) => 4, - CaDataScalarValue::F32(_) => 4, - CaDataScalarValue::F64(_) => 8, - CaDataScalarValue::Enum(_) => 2, - CaDataScalarValue::String(v) => v.len() as u32, - CaDataScalarValue::Bool(_) => 1, - } - } -} - -#[derive(Debug, Clone, PartialEq)] -pub enum CaDataArrayValue { - I8(Vec), - I16(Vec), - I32(Vec), - F32(Vec), - F64(Vec), - // TODO remove, CA has no bool, make new enum for other use cases. - Bool(Vec), -} - -impl CaDataArrayValue { - fn byte_size(&self) -> u32 { - match self { - CaDataArrayValue::I8(x) => 1 * x.len() as u32, - CaDataArrayValue::I16(x) => 2 * x.len() as u32, - CaDataArrayValue::I32(x) => 4 * x.len() as u32, - CaDataArrayValue::F32(x) => 4 * x.len() as u32, - CaDataArrayValue::F64(x) => 8 * x.len() as u32, - CaDataArrayValue::Bool(x) => 1 * x.len() as u32, - } - } -} - -#[derive(Debug, Clone, PartialEq)] -pub enum CaDataValue { - Scalar(CaDataScalarValue), - Array(CaDataArrayValue), -} - -impl CaDataValue { - pub fn byte_size(&self) -> u32 { - match self { - CaDataValue::Scalar(x) => x.byte_size(), - CaDataValue::Array(x) => x.byte_size(), - } - } -} - -#[derive(Debug, Clone, PartialEq)] -pub struct CaEventValue { - pub data: CaDataValue, - pub meta: CaMetaValue, -} - -impl CaEventValue { - // Timestamp ns from unix epoch. - pub fn ts(&self) -> Option { - match &self.meta { - CaMetaValue::CaMetaTime(x) => { - let ts = SEC * (x.ca_secs as u64 + EPICS_EPOCH_OFFSET) + x.ca_nanos as u64; - Some(ts) - } - CaMetaValue::CaMetaVariants(_) => None, - } - } - - pub fn f32_for_binning(&self) -> f32 { - match &self.data { - CaDataValue::Scalar(val) => { - use super::proto::CaDataScalarValue::*; - match val { - I8(x) => *x as f32, - I16(x) => *x as f32, - I32(x) => *x as f32, - F32(x) => *x as f32, - F64(x) => *x as f32, - Enum(x) => *x as f32, - String(x) => x.len() as f32, - Bool(x) => f32::from(*x), - } - } - CaDataValue::Array(val) => { - use super::proto::CaDataArrayValue::*; - match val { - I8(x) => x.iter().fold(0., |a, x| a + *x as f32), - I16(x) => x.iter().fold(0., |a, x| a + *x as f32), - I32(x) => x.iter().fold(0., |a, x| a + *x as f32), - F32(x) => x.iter().fold(0., |a, x| a + *x as f32), - F64(x) => x.iter().fold(0., |a, x| a + *x as f32), - Bool(x) => x.iter().fold(0., |a, x| a + f32::from(*x)), - } - } - } - } -} - -#[derive(Debug, Clone, PartialEq)] -pub enum CaMetaValue { - CaMetaTime(CaMetaTime), - CaMetaVariants(CaMetaVariants), -} - -#[derive(Debug, Clone, PartialEq)] -pub struct CaMetaTime { - pub status: u16, - pub severity: u16, - pub ca_secs: u32, - pub ca_nanos: u32, -} - -#[derive(Debug, Clone, PartialEq)] -pub struct CaMetaVariants { - pub status: u16, - pub severity: u16, - pub variants: Vec, -} - -#[derive(Debug)] -pub enum CaMsgTy { - Version, - VersionRes(u16), - Error(ErrorCmd), - ClientName, - ClientNameRes(ClientNameRes), - HostName(String), - Search(Search), - SearchRes(SearchRes), - CreateChan(CreateChan), - CreateChanRes(CreateChanRes), - CreateChanFail(CreateChanFail), - AccessRightsRes(AccessRightsRes), - EventAdd(EventAdd), - EventAddRes(EventAddRes), - EventAddResEmpty(EventAddResEmpty), - EventCancel(EventCancel), - EventCancelRes(EventCancelRes), - ReadNotify(ReadNotify), - ReadNotifyRes(ReadNotifyRes), - ChannelClose(ChannelClose), - ChannelCloseRes(ChannelCloseRes), - ChannelDisconnect(ChannelDisconnect), - Echo, -} - -impl CaMsgTy { - fn cmdid(&self) -> u16 { - use CaMsgTy::*; - match self { - Version => 0, - VersionRes(_) => 0, - Error(_) => 0x0b, - ClientName => 0x14, - ClientNameRes(_) => 0x14, - HostName(_) => 0x15, - Search(_) => 0x06, - SearchRes(_) => 0x06, - CreateChan(_) => 0x12, - CreateChanRes(_) => 0x12, - CreateChanFail(_) => 0x1a, - AccessRightsRes(_) => 0x16, - EventAdd(_) => 0x01, - EventAddRes(_) => 0x01, - // sic: the response to event-cancel is an event-add: - EventAddResEmpty(_) => 0x01, - EventCancel(_) => 0x02, - // sic: the response to event-cancel is an event-add: - EventCancelRes(_) => 0x01, - ReadNotify(_) => 0x0f, - ReadNotifyRes(_) => 0x0f, - ChannelClose(_) => 0x0c, - ChannelCloseRes(_) => 0x0c, - ChannelDisconnect(_) => 0x1b, - Echo => 0x17, - } - } - - fn len(&self) -> usize { - if self.payload_len() <= 0x3ff0 && self.data_count() <= 0xffff { - 16 + self.payload_len() - } else { - 24 + self.payload_len() - } - } - - fn payload_len(&self) -> usize { - use CaMsgTy::*; - match self { - Version => 0, - VersionRes(_) => 0, - Error(x) => (16 + x.msg.len() + 1 + 7) / 8 * 8, - ClientName => 0x10, - ClientNameRes(x) => (x.name.len() + 1 + 7) / 8 * 8, - HostName(x) => (x.len() + 1 + 7) / 8 * 8, - Search(x) => (x.channel.len() + 1 + 7) / 8 * 8, - SearchRes(_) => 8, - CreateChan(x) => (x.channel.len() + 1 + 7) / 8 * 8, - CreateChanRes(_) => 0, - CreateChanFail(_) => 0, - AccessRightsRes(_) => 0, - EventAdd(_) => 16, - EventAddRes(_) => { - error!("should not attempt to serialize the response again"); - panic!(); - } - EventAddResEmpty(_) => 0, - EventCancel(_) => 0, - EventCancelRes(_) => 0, - ReadNotify(_) => 0, - ReadNotifyRes(_) => { - error!("should not attempt to serialize the response again"); - panic!(); - } - ChannelClose(_) => 0, - ChannelCloseRes(_) => 0, - ChannelDisconnect(_) => 0, - Echo => 0, - } - } - - fn data_type(&self) -> u16 { - use CaMsgTy::*; - match self { - Version => 0, - VersionRes(n) => *n, - Error(_) => 0, - ClientName => 0, - ClientNameRes(_) => 0, - HostName(_) => 0, - Search(_) => { - // Reply-flag - 1 - } - SearchRes(x) => x.tcp_port, - CreateChan(_) => 0, - CreateChanRes(x) => x.data_type, - CreateChanFail(_) => 0, - AccessRightsRes(_) => 0, - EventAdd(x) => x.data_type, - EventAddRes(x) => x.data_type, - EventAddResEmpty(x) => x.data_type, - EventCancel(x) => x.data_type, - EventCancelRes(x) => x.data_type, - ReadNotify(x) => x.data_type, - ReadNotifyRes(x) => x.data_type, - ChannelClose(_) => 0, - ChannelCloseRes(_) => 0, - ChannelDisconnect(_) => 0, - Echo => 0, - } - } - - fn data_count(&self) -> u32 { - use CaMsgTy::*; - match self { - Version => CA_PROTO_VERSION, - VersionRes(_) => 0, - Error(_) => 0, - ClientName => 0, - ClientNameRes(_) => 0, - HostName(_) => 0, - Search(_) => CA_PROTO_VERSION, - SearchRes(_) => 0, - CreateChan(_) => 0, - CreateChanRes(..) => { - panic!(); - // x.data_count as _ - } - CreateChanFail(_) => 0, - AccessRightsRes(_) => 0, - EventAdd(x) => x.data_count, - EventAddRes(..) => { - panic!(); - // x.data_count as _ - } - EventAddResEmpty(_) => 0, - EventCancel(x) => x.data_count, - EventCancelRes(..) => 0, - ReadNotify(x) => x.data_count, - ReadNotifyRes(..) => { - panic!(); - // x.data_count as _ - } - ChannelClose(_) => 0, - ChannelCloseRes(_) => 0, - ChannelDisconnect(_) => 0, - Echo => 0, - } - } - - fn param1(&self) -> u32 { - use CaMsgTy::*; - match self { - Version => 0, - VersionRes(_) => 0, - Error(_) => 0, - ClientName => 0, - ClientNameRes(_) => 0, - HostName(_) => 0, - Search(e) => e.id, - SearchRes(x) => x.addr, - CreateChan(x) => x.cid, - CreateChanRes(x) => x.cid, - CreateChanFail(x) => x.cid, - AccessRightsRes(x) => x.cid, - EventAdd(x) => x.sid, - EventAddRes(x) => x.status, - EventAddResEmpty(x) => x.sid, - EventCancel(x) => x.sid, - EventCancelRes(x) => x.sid, - ReadNotify(x) => x.sid, - ReadNotifyRes(x) => x.sid, - ChannelClose(x) => x.sid, - ChannelCloseRes(x) => x.sid, - ChannelDisconnect(x) => x.cid, - Echo => 0, - } - } - - fn param2(&self) -> u32 { - use CaMsgTy::*; - match self { - Version => 0, - VersionRes(_) => 0, - Error(_) => 0, - ClientName => 0, - ClientNameRes(_) => 0, - HostName(_) => 0, - Search(e) => e.id, - SearchRes(x) => x.id, - CreateChan(_) => CA_PROTO_VERSION as _, - CreateChanRes(x) => x.sid, - CreateChanFail(_) => 0, - AccessRightsRes(x) => x.rights, - EventAdd(x) => x.subid, - EventAddRes(x) => x.subid, - EventAddResEmpty(x) => x.subid, - EventCancel(x) => x.subid, - EventCancelRes(x) => x.subid, - ReadNotify(x) => x.ioid, - ReadNotifyRes(x) => x.ioid, - ChannelClose(x) => x.cid, - ChannelCloseRes(x) => x.cid, - ChannelDisconnect(_) => 0, - Echo => 0, - } - } - - fn place_payload_into(&self, buf: &mut [u8]) { - use CaMsgTy::*; - match self { - Version => {} - VersionRes(_) => {} - // Specs: error cmd only from server to client. - Error(_) => todo!(), - ClientName => { - // TODO allow variable client name. - let s = "daqingest".as_bytes(); - let n = s.len(); - buf.fill(0); - buf[..n].copy_from_slice(s); - } - ClientNameRes(_) => { - error!("should not attempt to write ClientNameRes"); - panic!(); - } - HostName(name) => { - let s = name.as_bytes(); - let n = s.len(); - buf.fill(0); - buf[..n].copy_from_slice(s); - } - Search(e) => { - for x in &mut buf[..] { - *x = 0; - } - let d = e.channel.as_bytes(); - if buf.len() < d.len() + 1 { - error!("bad buffer given for search payload {} vs {}", buf.len(), d.len()); - panic!(); - } - buf[0..d.len()].copy_from_slice(&d[0..d.len()]); - } - SearchRes(_) => { - error!("should not attempt to write SearchRes"); - panic!(); - } - CreateChan(x) => { - for x in &mut buf[..] { - *x = 0; - } - let d = x.channel.as_bytes(); - if buf.len() < d.len() + 1 { - error!("bad buffer given for create chan payload {} vs {}", buf.len(), d.len()); - panic!(); - } - buf[0..d.len()].copy_from_slice(&d[0..d.len()]); - } - CreateChanRes(_) => {} - CreateChanFail(_) => {} - AccessRightsRes(_) => {} - EventAdd(_) => { - // Using flags DBE_ARCHIVE, DBE_ALARM, DBE_PROPERTY. - let dbe_value = 0x01; - let dbe_log = 0x02; - let dbe_alarm = 0x04; - let dbe_property = 0x08; - let _ = dbe_value | dbe_property; - let flags = dbe_log | dbe_alarm; - buf.copy_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, flags, 0, 0]); - } - EventAddRes(_) => {} - EventAddResEmpty(_) => {} - EventCancel(_) => {} - EventCancelRes(_) => {} - ReadNotify(_) => {} - ReadNotifyRes(_) => {} - ChannelClose(_) => {} - ChannelCloseRes(_) => {} - ChannelDisconnect(_) => {} - Echo => {} - } - } -} - -macro_rules! convert_scalar_value { - ($st:ty, $var:ident, $buf:expr) => {{ - type ST = $st; - const STL: usize = std::mem::size_of::(); - if $buf.len() < STL { - return Err(Error::NotEnoughPayload); - } - let v = ST::from_be_bytes($buf[..STL].try_into().map_err(|_| Error::BadSlice)?); - CaDataValue::Scalar(CaDataScalarValue::$var(v)) - }}; -} - -macro_rules! convert_scalar_enum_value { - ($st:ty, $buf:expr) => {{ - type ST = $st; - const STL: usize = std::mem::size_of::(); - if $buf.len() < STL { - return Err(Error::NotEnoughPayload); - } - let v = ST::from_be_bytes($buf[..STL].try_into().map_err(|_| Error::BadSlice)?); - CaDataValue::Scalar(CaDataScalarValue::Enum(v)) - }}; -} - -macro_rules! convert_wave_value { - ($st:ty, $var:ident, $n:expr, $buf:expr) => {{ - type ST = $st; - const STL: usize = std::mem::size_of::(); - let nn = $n.min($buf.len() / STL); - let mut a = Vec::with_capacity(nn); - // TODO should optimize? - let mut bb = &$buf[..]; - for _ in 0..nn { - let v = ST::from_be_bytes(bb[..STL].try_into().map_err(|_| Error::BadSlice)?); - bb = &bb[STL..]; - a.push(v); - } - CaDataValue::Array(CaDataArrayValue::$var(a)) - }}; -} - -#[derive(Debug)] -pub struct CaMsg { - pub ty: CaMsgTy, - pub ts: Instant, -} - -impl CaMsg { - pub fn from_ty_ts(ty: CaMsgTy, ts: Instant) -> Self { - Self { ty, ts } - } - - fn len(&self) -> usize { - self.ty.len() - } - - fn place_into(&self, buf: &mut [u8]) { - if self.ty.payload_len() <= 0x3ff0 && self.ty.data_count() <= 0xffff { - let pls = self.ty.payload_len() as u16; - let cnt = self.ty.data_count() as u16; - let t = self.ty.cmdid().to_be_bytes(); - buf[0] = t[0]; - buf[1] = t[1]; - let t = pls.to_be_bytes(); - buf[2] = t[0]; - buf[3] = t[1]; - let t = self.ty.data_type().to_be_bytes(); - buf[4] = t[0]; - buf[5] = t[1]; - let t = cnt.to_be_bytes(); - buf[6] = t[0]; - buf[7] = t[1]; - let t = self.ty.param1().to_be_bytes(); - buf[8] = t[0]; - buf[9] = t[1]; - buf[10] = t[2]; - buf[11] = t[3]; - let t = self.ty.param2().to_be_bytes(); - buf[12] = t[0]; - buf[13] = t[1]; - buf[14] = t[2]; - buf[15] = t[3]; - self.ty.place_payload_into(&mut buf[16..]); - } else { - let pls = self.ty.payload_len(); - let cnt = self.ty.data_count(); - let t = self.ty.cmdid().to_be_bytes(); - buf[0] = t[0]; - buf[1] = t[1]; - buf[2] = 0xff; - buf[3] = 0xff; - let t = self.ty.data_type().to_be_bytes(); - buf[4] = t[0]; - buf[5] = t[1]; - buf[6] = 0x00; - buf[7] = 0x00; - let t = self.ty.param1().to_be_bytes(); - buf[8] = t[0]; - buf[9] = t[1]; - buf[10] = t[2]; - buf[11] = t[3]; - let t = self.ty.param2().to_be_bytes(); - buf[12] = t[0]; - buf[13] = t[1]; - buf[14] = t[2]; - buf[15] = t[3]; - let t = pls.to_be_bytes(); - buf[16] = t[0]; - buf[17] = t[1]; - buf[18] = t[2]; - buf[19] = t[3]; - let t = cnt.to_be_bytes(); - buf[20] = t[0]; - buf[21] = t[1]; - buf[22] = t[2]; - buf[23] = t[3]; - self.ty.place_payload_into(&mut buf[24..]); - } - } - - fn ca_scalar_value(scalar_type: &CaScalarType, buf: &[u8]) -> Result { - let val = match scalar_type { - CaScalarType::I8 => convert_scalar_value!(i8, I8, buf), - CaScalarType::I16 => convert_scalar_value!(i16, I16, buf), - CaScalarType::I32 => convert_scalar_value!(i32, I32, buf), - CaScalarType::F32 => convert_scalar_value!(f32, F32, buf), - CaScalarType::F64 => convert_scalar_value!(f64, F64, buf), - CaScalarType::Enum => convert_scalar_enum_value!(i16, buf), - CaScalarType::String => { - // TODO constrain string length to the CA `data_count`. - let mut ixn = buf.len(); - for (i, &c) in buf.iter().enumerate() { - if c == 0 { - ixn = i; - break; - } - } - //info!("try to read string from payload len {} ixn {}", buf.len(), ixn); - let v = String::from_utf8_lossy(&buf[..ixn]); - CaDataValue::Scalar(CaDataScalarValue::String(v.into())) - } - }; - Ok(val) - } - - fn ca_wave_value(scalar_type: &CaScalarType, n: usize, buf: &[u8]) -> Result { - let val = match scalar_type { - CaScalarType::I8 => convert_wave_value!(i8, I8, n, buf), - CaScalarType::I16 => convert_wave_value!(i16, I16, n, buf), - CaScalarType::I32 => convert_wave_value!(i32, I32, n, buf), - CaScalarType::F32 => convert_wave_value!(f32, F32, n, buf), - CaScalarType::F64 => convert_wave_value!(f64, F64, n, buf), - CaScalarType::String => CaDataValue::Scalar(CaDataScalarValue::String("todo-array-string".into())), - _ => { - warn!("TODO conversion array {scalar_type:?}"); - return Err(Error::TodoConversionArray); - } - }; - Ok(val) - } - - pub fn from_proto_infos( - hi: &HeadInfo, - payload: &[u8], - tsnow: Instant, - array_truncate: usize, - ) -> Result { - let msg = match hi.cmdid { - 0x00 => CaMsg::from_ty_ts(CaMsgTy::VersionRes(hi.data_count() as u16), tsnow), - 0x0b => { - let mut s = String::new(); - s.extend(format!("{:?}", &payload[..payload.len().min(16)]).chars()); - if payload.len() >= 17 { - s.extend(" msg: ".chars()); - s.extend(String::from_utf8_lossy(&payload[17..payload.len() - 1]).chars()); - } - let e = ErrorCmd { - cid: hi.param1, - eid: hi.param2, - msg: s, - }; - CaMsg::from_ty_ts(CaMsgTy::Error(e), tsnow) - } - 0x06 => { - if hi.payload_len() != 8 { - warn!("protocol error: search result is expected with fixed payload size 8"); - } - if hi.data_count() != 0 { - warn!("protocol error: search result is expected with data count 0"); - } - if payload.len() < 2 { - return Err(Error::CaProtoVersionMissing); - } - let proto_version = u16::from_be_bytes(payload[0..2].try_into().map_err(|_| Error::BadSlice)?); - let ty = CaMsgTy::SearchRes(SearchRes { - tcp_port: hi.data_type, - addr: hi.param1, - id: hi.param2, - proto_version, - }); - CaMsg::from_ty_ts(ty, tsnow) - } - 0x01 => { - if payload.len() < 12 { - if payload.len() == 0 { - if hi.data_count() != 0 { - // TODO according to protocol, this should not happen. Count for metrics. - } - let ty = CaMsgTy::EventAddResEmpty(EventAddResEmpty { - data_type: hi.data_type, - sid: hi.param1, - subid: hi.param2, - }); - return Ok(CaMsg::from_ty_ts(ty, tsnow)); - } else { - error!("EventAddRes but bad header {hi:?}"); - return Err(Error::NotEnoughPayloadTimeMetadata(payload.len())); - } - } - let value = Self::extract_ca_data_value(hi, payload, array_truncate)?; - let d = EventAddRes { - data_type: hi.data_type, - data_count: hi.data_count() as _, - status: hi.param1, - subid: hi.param2, - payload_len: hi.payload_len() as u32, - value, - }; - let ty = CaMsgTy::EventAddRes(d); - CaMsg::from_ty_ts(ty, tsnow) - } - 0x0c => { - if payload.len() != 0 { - return Err(Error::BadPayload); - } - let ty = CaMsgTy::ChannelCloseRes(ChannelCloseRes { - sid: hi.param1, - cid: hi.param2, - }); - CaMsg::from_ty_ts(ty, tsnow) - } - 0x0f => { - if payload.len() == 8 { - let v = u64::from_be_bytes(payload.try_into().map_err(|_| Error::BadSlice)?); - debug!("Payload as u64: {v}"); - let v = i64::from_be_bytes(payload.try_into().map_err(|_| Error::BadSlice)?); - debug!("Payload as i64: {v}"); - let v = f64::from_be_bytes(payload.try_into().map_err(|_| Error::BadSlice)?); - debug!("Payload as f64: {v}"); - } - let value = Self::extract_ca_data_value(hi, payload, array_truncate)?; - let ty = CaMsgTy::ReadNotifyRes(ReadNotifyRes { - data_type: hi.data_type, - data_count: hi.data_count() as _, - sid: hi.param1, - ioid: hi.param2, - payload_len: hi.payload_len() as u32, - value, - }); - CaMsg::from_ty_ts(ty, tsnow) - } - 0x12 => { - let ty = CaMsgTy::CreateChanRes(CreateChanRes { - data_type: hi.data_type, - // TODO what am I supposed to use here in case of extended header? - data_count: hi.data_count() as _, - cid: hi.param1, - sid: hi.param2, - }); - CaMsg::from_ty_ts(ty, tsnow) - } - 0x16 => { - let ty = CaMsgTy::AccessRightsRes(AccessRightsRes { - cid: hi.param1, - rights: hi.param2, - }); - CaMsg::from_ty_ts(ty, tsnow) - } - 0x17 => { - let ty = CaMsgTy::Echo; - CaMsg::from_ty_ts(ty, tsnow) - } - 0x1a => { - // TODO use different structs for request and response: - let ty = CaMsgTy::CreateChanFail(CreateChanFail { cid: hi.param1 }); - CaMsg::from_ty_ts(ty, tsnow) - } - 0x14 => { - let name = std::ffi::CString::new(payload) - .map(|s| s.into_string().unwrap_or_else(|e| format!("{e:?}"))) - .unwrap_or_else(|e| format!("{e:?}")); - CaMsg::from_ty_ts(CaMsgTy::ClientNameRes(ClientNameRes { name }), tsnow) - } - // TODO make response type for host name: - 0x15 => CaMsg::from_ty_ts(CaMsgTy::HostName("TODOx5288".into()), tsnow), - 0x1b => { - warn!("HANDLE_SERVER_CHANNEL_DISCONNECT"); - return Err(Error::CaCommandNotSupported(x)); - } - x => return Err(Error::CaCommandNotSupported(x)), - }; - Ok(msg) - } - - fn extract_ca_data_value(hi: &HeadInfo, payload: &[u8], array_truncate: usize) -> Result { - use netpod::Shape; - let ca_dbr_ty = CaDbrType::from_ca_u16(hi.data_type)?; - let ca_sh = Shape::from_ca_count(hi.data_count() as _).map_err(|_| { - error!("BadCaCount {hi:?}"); - Error::BadCaCount - })?; - let (meta, data_offset) = match &ca_dbr_ty.meta { - CaDbrMetaType::Plain => return Err(Error::MismatchDbrTimeType), - CaDbrMetaType::Status => return Err(Error::MismatchDbrTimeType), - CaDbrMetaType::Time => { - let status = u16::from_be_bytes(payload[0..2].try_into().map_err(|_| Error::BadSlice)?); - let severity = u16::from_be_bytes(payload[2..4].try_into().map_err(|_| Error::BadSlice)?); - let ca_secs = u32::from_be_bytes(payload[4..8].try_into().map_err(|_| Error::BadSlice)?); - let ca_nanos = u32::from_be_bytes(payload[8..12].try_into().map_err(|_| Error::BadSlice)?); - let meta = CaMetaValue::CaMetaTime(CaMetaTime { - status, - severity, - ca_secs, - ca_nanos, - }); - (meta, 12) - } - CaDbrMetaType::Ctrl => { - let status = u16::from_be_bytes(payload[0..2].try_into().map_err(|_| Error::BadSlice)?); - let severity = u16::from_be_bytes(payload[2..4].try_into().map_err(|_| Error::BadSlice)?); - let varcnt = u16::from_be_bytes(payload[4..6].try_into().map_err(|_| Error::BadSlice)?); - if varcnt > 16 { - return Err(Error::BadCaCount); - } - // let s = String::from_utf8_lossy(&payload[6..6 + 26 * 16]); - let mut variants = Vec::new(); - for i in 0..varcnt { - let p = (6 + 26 * i) as usize; - let s1 = std::ffi::CStr::from_bytes_until_nul(&payload[p..p + 26]) - .map_or(String::from("encodingerror"), |x| { - x.to_str().map_or(String::from("encodingerror"), |x| x.to_string()) - }); - let s1 = if s1.len() >= 26 { - String::from("toolongerror") - } else { - s1 - }; - variants.push(s1); - } - // info!("enum variants debug {varcnt} {s} {variants:?}"); - let meta = CaMetaValue::CaMetaVariants(CaMetaVariants { - status, - severity, - variants, - }); - (meta, 2 + 2 + 2 + 26 * 16) - } - }; - let meta_padding = match ca_dbr_ty.meta { - CaDbrMetaType::Plain => 0, - CaDbrMetaType::Status => match ca_dbr_ty.scalar_type { - CaScalarType::I8 => 1, - CaScalarType::I16 => 0, - CaScalarType::I32 => 0, - CaScalarType::F32 => 0, - CaScalarType::F64 => 4, - CaScalarType::Enum => 0, - CaScalarType::String => 0, - }, - CaDbrMetaType::Time => match ca_dbr_ty.scalar_type { - CaScalarType::I8 => 3, - CaScalarType::I16 => 2, - CaScalarType::I32 => 0, - CaScalarType::F32 => 0, - CaScalarType::F64 => 4, - CaScalarType::Enum => 2, - CaScalarType::String => 0, - }, - CaDbrMetaType::Ctrl => match ca_dbr_ty.scalar_type { - CaScalarType::I8 => 1, - CaScalarType::I16 => 0, - CaScalarType::I32 => 0, - CaScalarType::F32 => 0, - CaScalarType::F64 => 0, - CaScalarType::Enum => 0, - CaScalarType::String => 0, - }, - }; - let valbuf = &payload[data_offset + meta_padding..]; - let value = match ca_sh { - Shape::Scalar => Self::ca_scalar_value(&ca_dbr_ty.scalar_type, valbuf)?, - Shape::Wave(n) => Self::ca_wave_value(&ca_dbr_ty.scalar_type, (n as usize).min(array_truncate), valbuf)?, - Shape::Image(_, _) => { - error!("Can not handle image from channel access"); - err::todoval() - } - }; - let value = CaEventValue { data: value, meta }; - Ok(value) - } -} - -#[derive(Debug)] -pub enum CaItem { - Empty, - Msg(CaMsg), -} - -#[derive(Clone, Debug)] -pub struct HeadInfo { - cmdid: u16, - payload_size: u32, - data_type: u16, - data_count: u32, - param1: u32, - param2: u32, - is_ext: bool, -} - -impl HeadInfo { - pub fn from_netbuf(buf: &mut SlideBuf) -> Result { - let command = buf.read_u16_be()?; - let payload_size = buf.read_u16_be()? as u32; - let data_type = buf.read_u16_be()?; - let data_count = buf.read_u16_be()? as u32; - let param1 = buf.read_u32_be()?; - let param2 = buf.read_u32_be()?; - let hi = HeadInfo { - cmdid: command, - payload_size, - data_type, - data_count, - param1, - param2, - is_ext: false, - }; - Ok(hi) - } - - fn with_ext(mut self, payload: u32, datacount: u32) -> Self { - self.is_ext = true; - self.payload_size = payload; - self.data_count = datacount; - self - } - - pub fn cmdid(&self) -> u16 { - self.cmdid - } - - pub fn payload_len(&self) -> u32 { - self.payload_size - } - - pub fn data_count(&self) -> u32 { - self.data_count - } - - // only for debug purpose - pub fn param2(&self) -> u32 { - self.param2 - } -} - -#[derive(Debug)] -enum CaState { - StdHead, - ExtHead(HeadInfo), - Payload(HeadInfo), - Done, -} - -impl CaState { - fn need_min(&self) -> usize { - use CaState::*; - match self { - StdHead => 16, - ExtHead(_) => 8, - Payload(k) => k.payload_len() as usize, - Done => 123, - } - } -} - -pub trait AsyncWriteRead: AsyncWrite + AsyncRead + Send + 'static {} - -impl AsyncWriteRead for T where T: AsyncWrite + AsyncRead + Send + 'static {} - -pub struct CaProto { - tcp: Pin>, - tcp_eof: bool, - remote_name: String, - state: CaState, - buf: SlideBuf, - outbuf: SlideBuf, - out: VecDeque, - array_truncate: usize, - stats: STATS, - resqu: VecDeque, - event_add_res_cnt: u32, - bytes_recv_testing: u32, -} - -impl CaProto -where - STATS: CaProtoStatsRecv, -{ - pub fn new(tcp: T, remote_name: String, array_truncate: usize, stats: STATS) -> Self { - Self { - tcp: Box::pin(tcp), - tcp_eof: false, - remote_name, - state: CaState::StdHead, - buf: SlideBuf::new(PROTO_INPUT_BUF_CAP as usize), - outbuf: SlideBuf::new(1024 * 256), - out: VecDeque::new(), - array_truncate, - stats, - resqu: VecDeque::with_capacity(256), - event_add_res_cnt: 0, - bytes_recv_testing: 0, - } - } - - pub fn proto_out_len(&self) -> usize { - self.out.len() - } - - pub fn push_out(&mut self, item: CaMsg) { - self.out.push_back(item); - } - - fn out_msg_buf(&mut self) -> Option<(&CaMsg, &mut [u8])> { - if let Some(item) = self.out.front() { - match self.outbuf.available_writable_area(item.len()) { - Ok(buf) => Some((item, buf)), - Err(_) => { - // TODO is this the correct behavior? - None - } - } - } else { - None - } - } - - fn attempt_output(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - let this = self.as_mut().get_mut(); - let w = &mut this.tcp; - let b = this.outbuf.data(); - let w = Pin::new(w); - match w.poll_write(cx, b) { - Ready(k) => match k { - Ok(k) => match self.outbuf.adv(k) { - Ok(()) => { - self.stats.out_bytes().add(k as u64); - Ready(Ok(k)) - } - Err(e) => { - error!("advance error {:?}", e); - Ready(Err(e.into())) - } - }, - Err(e) => { - error!("output write error {:?}", e); - Ready(Err(e.into())) - } - }, - Pending => Pending, - } - } - - fn loop_body(mut self: Pin<&mut Self>, cx: &mut Context) -> Result, Error> { - use Poll::*; - let mut have_pending = false; - let mut have_progress = false; - let tsnow = Instant::now(); - { - let g = self.outbuf.len(); - self.stats.outbuf_len().ingest(g as u32); - } - while let Some((msg, buf)) = self.out_msg_buf() { - let msglen = msg.len(); - if msglen > buf.len() { - break; - } - msg.place_into(&mut buf[..msglen]); - self.outbuf.wadv(msglen)?; - self.out.pop_front(); - self.stats.out_msg_placed().inc(); - } - while self.outbuf.len() != 0 { - match Self::attempt_output(self.as_mut(), cx)? { - Ready(n) => { - if n == 0 { - let e = Error::LogicError; - return Err(e); - } - have_progress = true; - } - Pending => { - have_pending = true; - break; - } - } - } - let need_min = self.state.need_min(); - { - let cap = self.buf.cap(); - if cap < need_min { - let e = Error::BufferTooSmallForNeedMin(cap, need_min); - warn!("{e}"); - return Err(e); - } - } - loop { - if self.tcp_eof { - break; - } - let this = self.as_mut().get_mut(); - let tcp = Pin::new(&mut this.tcp); - let buf = this.buf.available_writable_area(need_min)?; - if buf.len() == 0 { - return Err(Error::NoReadBufferSpace); - } - break match tcp.poll_read(cx, buf) { - Ready(k) => match k { - Ok(nf) => { - // let nf = rbuf.filled().len(); - if nf == 0 { - debug!("peer done {:?} {:?}", self.remote_name, self.state); - self.tcp_eof = true; - } else { - // if false { - // debug!("received {} bytes", nf); - // let t = nf.min(32); - // debug!("received data {:?}", &rbuf.filled()[0..t]); - // } - if TESTING_PROTOCOL_ERROR_TODO_REMOVE { - self.bytes_recv_testing = self.bytes_recv_testing.saturating_add(nf as u32); - if self.bytes_recv_testing <= TESTING_PROTOCOL_ERROR_AFTER_BYTES { - self.buf.wadv(nf)?; - } else { - let nr = - (self.bytes_recv_testing - TESTING_PROTOCOL_ERROR_AFTER_BYTES).min(nf as u32); - self.buf.wadv(nf - nr as usize)?; - for _ in 0..nr { - self.buf.put_u8(0x55)?; - } - } - } else { - self.buf.wadv(nf)?; - } - have_progress = true; - self.stats.tcp_recv_count().inc(); - self.stats.tcp_recv_bytes().add(nf as _); - continue; - } - } - Err(e) => { - return Err(e.into()); - } - }, - Pending => { - have_pending = true; - } - }; - } - while self.resqu.len() < self.resqu.capacity() { - if self.buf.len() >= self.state.need_min() { - if let Some(item) = self.parse_item(tsnow)? { - self.resqu.push_back(item); - } else { - // Nothing to do - } - have_progress = true; - } else { - break; - } - } - if have_progress { - Ok(Ready(())) - } else if have_pending { - Ok(Pending) - } else { - if self.tcp_eof { - self.state = CaState::Done; - Ok(Ready(())) - } else { - Err(Error::NeitherPendingNorProgress) - } - } - } - - fn parse_item(&mut self, tsnow: Instant) -> Result, Error> { - match &self.state { - CaState::StdHead => { - let hi = HeadInfo::from_netbuf(&mut self.buf)?; - if hi.cmdid > 26 { - // TODO count as logic error - self.stats.protocol_issue().inc(); - } - if hi.payload_size == 0xffff { - self.state = CaState::ExtHead(hi); - Ok(None) - } else { - self.stats.payload_size().ingest(hi.payload_len() as u32); - if hi.payload_size == 0 { - self.state = CaState::StdHead; - let msg = CaMsg::from_proto_infos(&hi, &[], tsnow, self.array_truncate)?; - Ok(Some(CaItem::Msg(msg))) - } else { - self.state = CaState::Payload(hi); - Ok(None) - } - } - } - CaState::ExtHead(hi) => { - let payload_size = self.buf.read_u32_be()?; - let data_count = self.buf.read_u32_be()?; - self.stats.payload_size().ingest(hi.payload_len() as u32); - if payload_size > PAYLOAD_LEN_MAX { - self.stats.payload_ext_very_large().inc(); - if false { - warn!( - "ExtHead data_type {} payload_size {payload_size} data_count {data_count}", - hi.data_type - ); - } - } - if payload_size <= 0x3ff0 { - // NOTE can happen even with zero payload, just because data-count exceeds u16. - self.stats.payload_ext_but_small().inc(); - if false { - warn!( - "ExtHead data_type {} payload_size {payload_size} data_count {data_count}", - hi.data_type - ); - } - } - let hi = hi.clone().with_ext(payload_size, data_count); - self.state = CaState::Payload(hi); - Ok(None) - } - CaState::Payload(hi) => { - let g = self.buf.read_bytes(hi.payload_len() as usize)?; - let msg = CaMsg::from_proto_infos(hi, g, tsnow, self.array_truncate)?; - // data-count is only reasonable for event messages - let ret = match &msg.ty { - CaMsgTy::EventAddRes(..) => { - self.stats.data_count().ingest(hi.data_count() as u32); - if TESTING_UNRESPONSIVE_TODO_REMOVE { - if self.event_add_res_cnt < TESTING_EVENT_ADD_RES_MAX { - self.event_add_res_cnt += 1; - Ok(Some(CaItem::Msg(msg))) - } else { - Ok(None) - } - } else { - self.event_add_res_cnt += 1; - Ok(Some(CaItem::Msg(msg))) - } - } - _ => Ok(Some(CaItem::Msg(msg))), - }; - self.state = CaState::StdHead; - ret - } - CaState::Done => Err(Error::ParseAttemptInDoneState), - } - } -} - -impl Stream for CaProto { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - loop { - break if let Some(item) = self.resqu.pop_front() { - Ready(Some(Ok(item))) - } else if let CaState::Done = self.state { - Ready(None) - } else { - let k = Self::loop_body(self.as_mut(), cx); - match k { - Ok(Ready(())) => continue, - Ok(Pending) => Pending, - Err(e) => { - self.state = CaState::Done; - Ready(Some(Err(e))) - } - } - }; - } - } -} diff --git a/netfetch/src/ca/statemap.rs b/netfetch/src/ca/statemap.rs index 0f26201..264c5c1 100644 --- a/netfetch/src/ca/statemap.rs +++ b/netfetch/src/ca/statemap.rs @@ -24,21 +24,6 @@ pub enum CaConnStateValue { Shutdown { since: Instant }, } -#[derive(Debug)] -pub struct CaConnState { - pub last_feedback: Instant, - pub value: CaConnStateValue, -} - -impl CaConnState { - pub fn new(value: CaConnStateValue) -> Self { - Self { - last_feedback: Instant::now(), - value, - } - } -} - #[derive(Debug, Clone, Serialize)] pub enum ConnectionStateValue { Unknown, @@ -243,6 +228,8 @@ impl ChannelStateMap { } pub fn insert(&mut self, k: ChannelName, v: ChannelState) -> Option { + let _ = &self.map2; + let _ = &self.map3; self.map.insert(k, v) } diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index b427fd0..922b41a 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -23,8 +23,6 @@ pub struct CaIngestOpts { search: Vec, #[serde(default)] search_blacklist: Vec, - whitelist: Option, - blacklist: Option, #[allow(unused)] #[serde(default, with = "humantime_serde")] timeout: Option, @@ -424,6 +422,7 @@ mod serde_replication_bool { use serde::de; use std::fmt; + #[allow(unused)] pub fn serialize(v: &bool, ser: S) -> Result where S: Serializer, @@ -488,6 +487,7 @@ mod serde_option_channel_read_config { use std::fmt; use std::time::Duration; + #[allow(unused)] pub fn serialize(v: &Option, ser: S) -> Result where S: Serializer, diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 12bd67b..030f345 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -135,7 +135,6 @@ pub struct StatsSet { daemon: Arc, ca_conn_set: Arc, ca_conn: Arc, - ca_proto: Arc, insert_worker_stats: Arc, series_by_channel_stats: Arc, ioc_finder_stats: Arc, @@ -147,7 +146,6 @@ impl StatsSet { daemon: Arc, ca_conn_set: Arc, ca_conn: Arc, - ca_proto: Arc, insert_worker_stats: Arc, series_by_channel_stats: Arc, ioc_finder_stats: Arc, @@ -157,7 +155,6 @@ impl StatsSet { daemon, ca_conn_set, ca_conn, - ca_proto, insert_worker_stats, series_by_channel_stats, ioc_finder_stats, @@ -366,9 +363,8 @@ fn metrics(stats_set: &StatsSet) -> String { let s3 = stats_set.insert_worker_stats.prometheus(); let s4 = stats_set.ca_conn.prometheus(); let s5 = stats_set.series_by_channel_stats.prometheus(); - let s6 = stats_set.ca_proto.prometheus(); let s7 = stats_set.ioc_finder_stats.prometheus(); - [s1, s2, s3, s4, s5, s6, s7].join("") + [s1, s2, s3, s4, s5, s7].join("") } pub struct RoutesResources { diff --git a/scywr/src/fut.rs b/scywr/src/fut.rs deleted file mode 100644 index 4f77adf..0000000 --- a/scywr/src/fut.rs +++ /dev/null @@ -1,47 +0,0 @@ -use crate::access::Error; -use crate::session::ScySession; -use futures_util::Future; -use futures_util::FutureExt; -use scylla::QueryResult; -use scylla::frame::value::ValueList; -use scylla::prepared_statement::PreparedStatement; -use scylla::transport::errors::QueryError; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; - -pub struct ScyQueryFut<'a> { - fut: Pin> + Send + 'a>>, -} - -impl<'a> ScyQueryFut<'a> { - pub fn new(scy: &'a ScySession, query: Option<&'a PreparedStatement>, values: V) -> Self - where - V: ValueList + Send + 'static, - { - let _ = scy; - let _ = query; - let _ = values; - if true { - todo!("ScyQueryFut") - }; - //let fut = scy.execute(query, values); - let fut = futures_util::future::ready(Err(QueryError::TimeoutError)); - Self { fut: Box::pin(fut) } - } -} - -impl<'a> Future for ScyQueryFut<'a> { - type Output = Result<(), Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - use Poll::*; - match self.fut.poll_unpin(cx) { - Ready(k) => match k { - Ok(_) => Ready(Ok(())), - Err(e) => Ready(Err(e.into())), - }, - Pending => Pending, - } - } -} diff --git a/scywr/src/futinsert.rs b/scywr/src/futinsert.rs index 3328fbc..6e4a137 100644 --- a/scywr/src/futinsert.rs +++ b/scywr/src/futinsert.rs @@ -2,12 +2,11 @@ use crate::access::Error; use crate::session::ScySession; use futures_util::Future; use futures_util::FutureExt; -use netpod::log::*; -use scylla::frame::value::ValueList; +use netpod::log::error; +use scylla::QueryResult; use scylla::prepared_statement::PreparedStatement; use scylla::serialize::row::SerializeRow; use scylla::transport::errors::QueryError; -use scylla::QueryResult; use std::pin::Pin; use std::task::Context; use std::task::Poll; @@ -25,7 +24,7 @@ impl<'a> ScyInsertFut<'a> { pub fn new(scy: &'a ScySession, query: &'a PreparedStatement, values: V) -> Self where - V: ValueList + SerializeRow + Send + 'static, + V: SerializeRow + Send + 'static, { let fut = scy.execute_unpaged(query, values); let fut = Box::pin(fut) as _; diff --git a/scywr/src/futinsertloop.rs b/scywr/src/futinsertloop.rs deleted file mode 100644 index bd81921..0000000 --- a/scywr/src/futinsertloop.rs +++ /dev/null @@ -1,108 +0,0 @@ -use crate::access::Error; -use crate::session::ScySession; -use futures_util::Future; -use futures_util::FutureExt; -use netpod::log::*; -use scylla::frame::value::ValueList; -use scylla::prepared_statement::PreparedStatement; -use scylla::transport::errors::QueryError; -use scylla::QueryResult; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; -use std::time::Instant; - -pub struct InsertLoopFut<'a> { - futs: Vec> + Send + 'a>>>, - fut_ix: usize, - polled: usize, - ts_create: Instant, - ts_poll_start: Instant, -} - -impl<'a> InsertLoopFut<'a> { - pub fn new(scy: &'a ScySession, query: Option<&'a PreparedStatement>, values: Vec, skip_insert: bool) -> Self - where - V: ValueList + Send + Sync + 'static, - { - let _ = scy; - let _ = query; - let mut values = values; - if skip_insert { - values.clear(); - } - // TODO - // Can I store the values in some better generic form? - // Or is it acceptable to generate all insert futures right here and poll them later? - let futs: Vec<_> = values - .into_iter() - .map(|_vs| { - if true { - todo!("InsertLoopFut") - }; - //let fut = scy.execute(query, vs); - let fut = futures_util::future::ready(Err(QueryError::TimeoutError)); - Box::pin(fut) as _ - }) - .collect(); - let tsnow = Instant::now(); - Self { - futs, - fut_ix: 0, - polled: 0, - ts_create: tsnow, - ts_poll_start: tsnow, - } - } -} - -impl<'a> Future for InsertLoopFut<'a> { - type Output = Result<(), Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - use Poll::*; - if self.polled == 0 { - self.ts_poll_start = Instant::now(); - } - self.polled += 1; - if self.futs.is_empty() { - return Ready(Ok(())); - } - loop { - let fut_ix = self.fut_ix; - break match self.futs[fut_ix].poll_unpin(cx) { - Ready(k) => match k { - Ok(_) => { - self.fut_ix += 1; - if self.fut_ix >= self.futs.len() { - if false { - let tsnow = Instant::now(); - let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3; - let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3; - info!( - "InsertLoopFut polled {} dt_created {:6.2} ms dt_polled {:6.2} ms", - self.polled, dt_created, dt_polled - ); - } - continue; - } else { - Ready(Ok(())) - } - } - Err(e) => { - let tsnow = Instant::now(); - let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3; - let dt_polled = tsnow.duration_since(self.ts_poll_start).as_secs_f32() * 1e3; - warn!( - "InsertLoopFut polled {} dt_created {:6.2} ms dt_polled {:6.2} ms", - self.polled, dt_created, dt_polled - ); - warn!("InsertLoopFut done Err {e:?}"); - Ready(Err(e.into())) - } - }, - Pending => Pending, - }; - } - } -} diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index f91f395..0e8a0fd 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -13,38 +13,38 @@ use crate::store::DataStore; use async_channel::Receiver; use async_channel::Sender; use atomic::AtomicU64; +use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; -use log::*; +use log; use netpod::ttl::RetentionTime; use smallvec::SmallVec; use smallvec::smallvec; use stats::InsertWorkerStats; use std::collections::VecDeque; +use std::pin::Pin; use std::sync::Arc; use std::sync::atomic; +use std::task::Context; +use std::task::Poll; use std::time::Duration; use std::time::Instant; use taskrun::tokio; use tokio::task::JoinHandle; -macro_rules! trace2 { - ($($arg:tt)*) => { - if false { - trace!($($arg)*); - } - }; -} +macro_rules! error { ($($arg:expr),*) => ( if true { log::error!($($arg),*); } ); } -macro_rules! trace_item_execute { - ($($arg:tt)*) => { - if false { - trace!($($arg)*); - } - }; -} +macro_rules! warn { ($($arg:expr),*) => ( if true { log::warn!($($arg),*); } ); } -macro_rules! debug_setup { ($($arg:expr),*) => ( if false { debug!($($arg),*); } ); } +macro_rules! trace2 { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ); } + +macro_rules! trace_transform { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ); } + +macro_rules! trace_inspect { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ); } + +macro_rules! trace_item_execute { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ); } + +macro_rules! debug_setup { ($($arg:expr),*) => ( if false { log::debug!($($arg),*); } ); } autoerr::create_error_v1!( name(Error, "ScyllaInsertWorker"), @@ -67,7 +67,7 @@ fn stats_inc_for_err(stats: &stats::InsertWorkerStats, err: &crate::iteminsertqu } Error::DbError(_) => { if true { - warn!("db error {err}"); + warn!("db error {}", err); } stats.db_error().inc(); } @@ -180,6 +180,46 @@ pub async fn spawn_scylla_insert_workers_dummy( Ok(jhs) } +struct FutTrackDt { + ts1: Instant, + ts2: Instant, + ts_net: Instant, + poll1: bool, + fut: F, +} + +impl FutTrackDt { + fn from_fut_job(job: FutJob) -> Self { + let tsnow = Instant::now(); + Self { + ts1: tsnow, + ts2: tsnow, + ts_net: job.ts_net, + poll1: false, + fut: job.fut, + } + } +} + +impl Future for FutTrackDt +where + F: Future + Unpin, +{ + type Output = (Instant, Instant, Instant, F::Output); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + use Poll::*; + if self.poll1 == false { + self.poll1 = true; + self.ts2 = Instant::now(); + } + match self.as_mut().fut.poll_unpin(cx) { + Ready(x) => Ready((self.ts_net, self.ts1, self.ts2, x)), + Pending => Pending, + } + } +} + async fn worker_streamed( worker_ix: usize, concurrency: usize, @@ -208,16 +248,22 @@ async fn worker_streamed( let stream = stream .map(|x| futures_util::stream::iter(x)) .flatten_unordered(Some(1)) + .map(|x| FutTrackDt::from_fut_job(x)) .buffer_unordered(concurrency); let mut stream = Box::pin(stream); debug_setup!("waiting for item"); - while let Some(item) = stream.next().await { + while let Some((ts_net, ts1, ts2, item)) = stream.next().await { trace_item_execute!("see item"); let tsnow = Instant::now(); match item { Ok(_) => { mett.job_ok().inc(); - // TODO compute the insert latency bin and count. + let dt1 = tsnow.saturating_duration_since(ts1); + let dt2 = tsnow.saturating_duration_since(ts2); + let dt_net = tsnow.saturating_duration_since(ts_net); + mett.job_dt1().push_dur_100us(dt1); + mett.job_dt2().push_dur_100us(dt2); + mett.job_dt_net().push_dur_100us(dt_net); } Err(e) => { use scylla::transport::errors::QueryError; @@ -261,21 +307,26 @@ async fn worker_streamed( Ok(()) } +struct FutJob { + fut: InsertFut, + ts_net: Instant, +} + fn transform_to_db_futures( item_inp: S, data_store: Arc, ignore_writes: bool, stats: Arc, -) -> impl Stream> +) -> impl Stream> where S: Stream>, { - trace!("transform_to_db_futures begin"); + trace_transform!("transform_to_db_futures begin"); // TODO possible without box? // let item_inp = Box::pin(item_inp); item_inp.map(move |batch| { stats.item_recv.inc(); - trace!("transform_to_db_futures have batch len {}", batch.len()); + trace_transform!("transform_to_db_futures have batch len {}", batch.len()); let tsnow = Instant::now(); let mut res = Vec::with_capacity(32); for item in batch { @@ -284,46 +335,46 @@ where if ignore_writes { SmallVec::new() } else { - prepare_query_insert_futs(item, &data_store, &stats, tsnow) + prepare_query_insert_futs(item, &data_store) } } QueryItem::Msp(item) => { if ignore_writes { SmallVec::new() } else { - prepare_msp_insert_futs(item, &data_store, &stats, tsnow) + prepare_msp_insert_futs(item, &data_store) } } QueryItem::TimeBinSimpleF32V02(item) => { if ignore_writes { SmallVec::new() } else { - prepare_timebin_v02_insert_futs(item, &data_store, &stats, tsnow) + prepare_timebin_v02_insert_futs(item, &data_store, tsnow) } } QueryItem::BinWriteIndexV03(item) => { if ignore_writes { SmallVec::new() } else { - prepare_bin_write_index_v03_insert_futs(item, &data_store, &stats, tsnow) + prepare_bin_write_index_v03_insert_futs(item, &data_store, tsnow) } } QueryItem::Accounting(item) => { if ignore_writes { SmallVec::new() } else { - prepare_accounting_insert_futs(item, &data_store, &stats, tsnow) + prepare_accounting_insert_futs(item, &data_store, tsnow) } } QueryItem::AccountingRecv(item) => { if ignore_writes { SmallVec::new() } else { - prepare_accounting_recv_insert_futs(item, &data_store, &stats, tsnow) + prepare_accounting_recv_insert_futs(item, &data_store, tsnow) } } }; - trace!("prepared futs len {}", futs.len()); + trace_transform!("prepared futs len {}", futs.len()); res.extend(futs.into_iter()); } res @@ -334,7 +385,7 @@ fn inspect_items( item_inp: Receiver>, worker_name: String, ) -> impl Stream> { - trace!("transform_to_db_futures begin"); + trace_inspect!("transform_to_db_futures begin"); // TODO possible without box? // let item_inp = Box::pin(item_inp); item_inp.inspect(move |batch| { @@ -363,44 +414,30 @@ fn inspect_items( }) } -fn prepare_msp_insert_futs( - item: MspItem, - data_store: &Arc, - stats: &Arc, - tsnow: Instant, -) -> SmallVec<[InsertFut; 4]> { +fn prepare_msp_insert_futs(item: MspItem, data_store: &Arc) -> SmallVec<[FutJob; 4]> { trace2!("execute MSP bump"); - stats.inserts_msp().inc(); - { - let dt = tsnow.saturating_duration_since(item.ts_net()); - let dt_ms = 1000 * dt.as_secs() as u32 + dt.subsec_millis(); - stats.item_lat_net_worker().ingest(dt_ms); - } let fut = insert_msp_fut( item.series(), item.ts_msp(), - item.ts_net(), data_store.scy.clone(), data_store.qu_insert_ts_msp.clone(), - stats.clone(), ); + let fut = FutJob { + fut, + ts_net: item.ts_net(), + }; let futs = smallvec![fut]; futs } -fn prepare_query_insert_futs( - item: InsertItem, - data_store: &Arc, - stats: &Arc, - tsnow: Instant, -) -> SmallVec<[InsertFut; 4]> { - stats.inserts_value().inc(); +fn prepare_query_insert_futs(item: InsertItem, data_store: &Arc) -> SmallVec<[FutJob; 4]> { let item_ts_net = item.ts_net; - let dt = tsnow.saturating_duration_since(item_ts_net); - let dt_ms = 1000 * dt.as_secs() as u32 + dt.subsec_millis(); - stats.item_lat_net_worker().ingest(dt_ms); let do_insert = true; - let fut = insert_item_fut(item, &data_store, do_insert, stats); + let fut = insert_item_fut(item, &data_store, do_insert); + let fut = FutJob { + fut, + ts_net: item_ts_net, + }; let futs = smallvec![fut]; futs } @@ -408,9 +445,8 @@ fn prepare_query_insert_futs( fn prepare_timebin_v02_insert_futs( item: TimeBinSimpleF32V02, data_store: &Arc, - stats: &Arc, tsnow: Instant, -) -> SmallVec<[InsertFut; 4]> { +) -> SmallVec<[FutJob; 4]> { let params = ( item.series.id() as i64, item.binlen, @@ -423,15 +459,12 @@ fn prepare_timebin_v02_insert_futs( item.dev, item.lst, ); - // TODO would be better to count inserts only on completed insert - stats.inserted_binned().inc(); let fut = InsertFut::new( data_store.scy.clone(), data_store.qu_insert_binned_scalar_f32_v02.clone(), params, - tsnow, - stats.clone(), ); + let fut = FutJob { fut, ts_net: tsnow }; let futs = smallvec![fut]; // TODO match on the query result: @@ -451,19 +484,15 @@ fn prepare_timebin_v02_insert_futs( fn prepare_bin_write_index_v03_insert_futs( item: BinWriteIndexV03, data_store: &Arc, - stats: &Arc, tsnow: Instant, -) -> SmallVec<[InsertFut; 4]> { +) -> SmallVec<[FutJob; 4]> { let params = (item.series, item.pbp, item.msp, item.rt, item.lsp, item.binlen); - // TODO would be better to count inserts only on completed insert - stats.inserted_binned().inc(); let fut = InsertFut::new( data_store.scy.clone(), data_store.qu_insert_bin_write_index_v03.clone(), params, - tsnow, - stats.clone(), ); + let fut = FutJob { fut, ts_net: tsnow }; let futs = smallvec![fut]; // TODO match on the query result: @@ -483,9 +512,8 @@ fn prepare_bin_write_index_v03_insert_futs( fn prepare_accounting_insert_futs( item: Accounting, data_store: &Arc, - stats: &Arc, tsnow: Instant, -) -> SmallVec<[InsertFut; 4]> { +) -> SmallVec<[FutJob; 4]> { let params = ( item.part, item.ts.sec() as i64, @@ -493,13 +521,8 @@ fn prepare_accounting_insert_futs( item.count, item.bytes, ); - let fut = InsertFut::new( - data_store.scy.clone(), - data_store.qu_account_00.clone(), - params, - tsnow, - stats.clone(), - ); + let fut = InsertFut::new(data_store.scy.clone(), data_store.qu_account_00.clone(), params); + let fut = FutJob { fut, ts_net: tsnow }; let futs = smallvec![fut]; futs } @@ -507,9 +530,8 @@ fn prepare_accounting_insert_futs( fn prepare_accounting_recv_insert_futs( item: AccountingRecv, data_store: &Arc, - stats: &Arc, tsnow: Instant, -) -> SmallVec<[InsertFut; 4]> { +) -> SmallVec<[FutJob; 4]> { let params = ( item.part, item.ts.sec() as i64, @@ -517,13 +539,8 @@ fn prepare_accounting_recv_insert_futs( item.count, item.bytes, ); - let fut = InsertFut::new( - data_store.scy.clone(), - data_store.qu_account_recv_00.clone(), - params, - tsnow, - stats.clone(), - ); + let fut = InsertFut::new(data_store.scy.clone(), data_store.qu_account_recv_00.clone(), params); + let fut = FutJob { fut, ts_net: tsnow }; let futs = smallvec![fut]; futs } diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 819879c..482486d 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -12,8 +12,6 @@ use netpod::TsNano; use netpod::channelstatus::ChannelStatus; use netpod::channelstatus::ChannelStatusClosedReason; use scylla::QueryResult; -use scylla::frame::value::Value; -use scylla::frame::value::ValueList; use scylla::prepared_statement::PreparedStatement; use scylla::serialize::row::SerializeRow; use scylla::serialize::value::SerializeValue; @@ -21,8 +19,6 @@ use scylla::transport::errors::DbError; use scylla::transport::errors::QueryError; use series::ChannelStatusSeriesId; use series::SeriesId; -use series::msp::PrebinnedPartitioning; -use stats::InsertWorkerStats; use std::net::SocketAddrV4; use std::pin::Pin; use std::ptr::NonNull; @@ -593,18 +589,16 @@ struct InsParCom { series: SeriesId, ts_msp: TsMs, ts_lsp: DtNano, - ts_net: Instant, #[allow(unused)] do_insert: bool, - stats: Arc, } fn insert_scalar_gen_fut(par: InsParCom, val: ST, qu: Arc, scy: Arc) -> InsertFut where - ST: Value + SerializeValue + Send + 'static, + ST: SerializeValue + Send + 'static, { let params = (par.series.to_i64(), par.ts_msp.to_i64(), par.ts_lsp.to_i64(), val); - InsertFut::new(scy, qu, params, par.ts_net, par.stats) + InsertFut::new(scy, qu, params) } fn insert_scalar_enum_gen_fut( @@ -615,8 +609,8 @@ fn insert_scalar_enum_gen_fut( scy: Arc, ) -> InsertFut where - ST1: Value + SerializeValue + Send + 'static, - ST2: Value + SerializeValue + Send + 'static, + ST1: SerializeValue + Send + 'static, + ST2: SerializeValue + Send + 'static, { let params = ( par.series.to_i64(), @@ -625,13 +619,12 @@ where val, valstr, ); - InsertFut::new(scy, qu, params, par.ts_net, par.stats) + InsertFut::new(scy, qu, params) } -// val: Vec where ST: Value + SerializeValue + Send + 'static, fn insert_array_gen_fut(par: InsParCom, val: Vec, qu: Arc, scy: Arc) -> InsertFut { let params = (par.series.to_i64(), par.ts_msp.to_i64(), par.ts_lsp.to_i64(), val); - InsertFut::new(scy, qu, params, par.ts_net, par.stats) + InsertFut::new(scy, qu, params) } #[pin_project::pin_project] @@ -646,26 +639,13 @@ pub struct InsertFut { } impl InsertFut { - pub fn new( - scy: Arc, - qu: Arc, - params: V, - // timestamp when we first encountered the data to-be inserted, for metrics - tsnet: Instant, - stats: Arc, - ) -> Self { + pub fn new(scy: Arc, qu: Arc, params: V) -> Self { let scy_ref = unsafe { NonNull::from(scy.as_ref()).as_ref() }; let qu_ref = unsafe { NonNull::from(qu.as_ref()).as_ref() }; let fut = scy_ref.execute_unpaged(qu_ref, params); - let fut = fut.map(move |x| { - let dt = tsnet.elapsed(); - let dt_ms = 1000 * dt.as_secs() as u32 + dt.subsec_millis(); - stats.item_lat_net_store().ingest(dt_ms); - x - }); let fut = taskrun::tokio::task::unconstrained(fut); let fut = Box::pin(fut); - // let fut = StackFuture::from(fut); + // let _ff = StackFuture::from(fut); Self { scy, qu, fut } } @@ -687,25 +667,12 @@ impl Future for InsertFut { } } -pub fn insert_msp_fut( - series: SeriesId, - ts_msp: TsMs, - // for stats, the timestamp when we received that data - tsnet: Instant, - scy: Arc, - qu: Arc, - stats: Arc, -) -> InsertFut { +pub fn insert_msp_fut(series: SeriesId, ts_msp: TsMs, scy: Arc, qu: Arc) -> InsertFut { let params = (series.to_i64(), ts_msp.to_i64()); - InsertFut::new(scy, qu, params, tsnet, stats) + InsertFut::new(scy, qu, params) } -pub fn insert_item_fut( - item: InsertItem, - data_store: &DataStore, - do_insert: bool, - stats: &Arc, -) -> InsertFut { +pub fn insert_item_fut(item: InsertItem, data_store: &DataStore, do_insert: bool) -> InsertFut { let scy = data_store.scy.clone(); use DataValue::*; match item.val { @@ -714,9 +681,7 @@ pub fn insert_item_fut( series: item.series, ts_msp: item.ts_msp, ts_lsp: item.ts_lsp, - ts_net: item.ts_net, do_insert, - stats: stats.clone(), }; use ScalarValue::*; match val { @@ -742,9 +707,7 @@ pub fn insert_item_fut( series: item.series, ts_msp: item.ts_msp, ts_lsp: item.ts_lsp, - ts_net: item.ts_net, do_insert, - stats: stats.clone(), }; use ArrayValue::*; let blob = val.to_binary_blob(); diff --git a/scywr/src/lib.rs b/scywr/src/lib.rs index b18151b..df6760a 100644 --- a/scywr/src/lib.rs +++ b/scywr/src/lib.rs @@ -1,10 +1,8 @@ pub mod access; pub mod config; -pub mod fut; pub mod futbatch; pub mod futbatchgen; pub mod futinsert; -pub mod futinsertloop; pub mod insertqueues; pub mod insertworker; pub mod iteminsertqueue; diff --git a/serde_helper/src/serde_dummy.rs b/serde_helper/src/serde_dummy.rs index c66f919..ffbc0eb 100644 --- a/serde_helper/src/serde_dummy.rs +++ b/serde_helper/src/serde_dummy.rs @@ -1,7 +1,6 @@ #[allow(non_snake_case)] pub mod serde_dummy { use serde::Serializer; - use std::time::Instant; #[allow(unused)] pub fn serialize(val: &T, ser: S) -> Result diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index ec412e4..c83cadc 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -198,7 +198,6 @@ where res_lt = Self::write_inner(&mut self.state_lt, item.clone(), ts_net, tsev, &mut iqdqs.lt_rf3_qu)?; res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, tsev, &mut iqdqs.mt_rf3_qu)?; res_st = if self.do_st_rf1 { - // Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf1_qu)? Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)? } else { Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)? diff --git a/stats/Cargo.toml b/stats/Cargo.toml index 055158a..46f1ce9 100644 --- a/stats/Cargo.toml +++ b/stats/Cargo.toml @@ -13,5 +13,5 @@ rand_xoshiro = "0.6.0" stats_types = { path = "../stats_types" } stats_proc = { path = "../stats_proc" } log = { path = "../log" } -mettrics = { version = "0.0.6", path = "../../mettrics" } +mettrics = { version = "0.0.7", path = "../../mettrics" } ca_proto = { path = "../../daqbuf-ca-proto", package = "daqbuf-ca-proto" } diff --git a/stats/mettdecl.rs b/stats/mettdecl.rs index f89e9b1..1b1005e 100644 --- a/stats/mettdecl.rs +++ b/stats/mettdecl.rs @@ -5,12 +5,18 @@ mod Metrics { job_ok, job_err, } + enum histolog2s { + job_dt1, + job_dt2, + job_dt_net, + } } mod Metrics { type StructName = CaConnMetrics; enum counters { metrics_emit, + metrics_emit_final, ioid_read_begin, ioid_read_done, ioid_read_timeout, @@ -91,9 +97,17 @@ mod Metrics { type Name = scy_inswork; } enum counters { - proc_cpu_v0_inc, - proc_cpu_v0_dec, - proc_mem_rss_inc, - proc_mem_rss_dec, + handle_event, + caconnset_health_response, + channel_send_err, + } + enum values { + proc_cpu_v0, + proc_mem_rss, + iqtx_len_st_rf1, + iqtx_len_st_rf3, + iqtx_len_mt_rf3, + iqtx_len_lt_rf3, + iqtx_len_lt_rf3_lat5, } } diff --git a/stats/src/mett.rs b/stats/src/mett.rs index 821f7ab..65b5cff 100644 --- a/stats/src/mett.rs +++ b/stats/src/mett.rs @@ -1,4 +1,5 @@ use mettrics::types::CounterU32; use mettrics::types::HistoLog2; +use mettrics::types::ValueU32; mettrics::macros::make_metrics!("mettdecl.rs"); diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 282f024..edc3965 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -287,38 +287,14 @@ stats_proc::stats_struct!(( stats_struct( name(DaemonStats), prefix(daemon), - counters( - critical_error, - todo_mark, - ticker_token_acquire_error, - ticker_token_release_error, - handle_timer_tick_count, - ioc_search_err, - ioc_search_some, - ioc_search_none, - lookupaddr_ok, - events, - event_ca_conn, - ca_conn_status_done, - ca_conn_status_feedback_timeout, - ca_conn_status_feedback_recv, - ca_conn_status_feedback_no_dst, - ca_echo_timeout_total, - caconn_done_channel_state_reset, - insert_worker_spawned, - insert_worker_join_ok, - insert_worker_join_ok_err, - insert_worker_join_err, - caconnset_health_response, - channel_send_err, - ), + counters(asdasd,), values( channel_unknown_address, channel_search_pending, channel_with_address, channel_no_address, connset_health_lat_ema, - iqtx_len_st_rf1, + // iqtx_len_st_rf1, iqtx_len_st_rf3, iqtx_len_mt_rf3, iqtx_len_lt_rf3,