From eee67b916fe0c8a5fec8e291a8b9f6775803410d Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 10 May 2022 15:48:25 +0200 Subject: [PATCH] Gather some runtime stats --- Cargo.toml | 2 +- daqingest/src/bin/daqingest.rs | 16 +- daqingest/src/daqingest.rs | 2 + netfetch/src/ca.rs | 66 ++++-- netfetch/src/ca/conn.rs | 407 +++++++++++++++++++++------------ netfetch/src/ca/proto.rs | 14 ++ netfetch/src/ca/store.rs | 7 + netfetch/src/series.rs | 7 +- stats/Cargo.toml | 1 + stats/src/stats.rs | 162 +++++++++++++ 10 files changed, 502 insertions(+), 182 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d4ee7be..ba16d4a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["log", "netfetch", "daqingest"] [profile.release] opt-level = 2 -debug = 1 +debug = 2 overflow-checks = false debug-assertions = false lto = "thin" diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 6056d90..f1a1957 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -3,13 +3,14 @@ use daqingest::{ChannelAccess, DaqIngestOpts, SubCmd}; use err::Error; pub fn main() -> Result<(), Error> { - taskrun::run(async { + let opts = DaqIngestOpts::parse(); + log::info!("daqingest version {}", clap::crate_version!()); + let runtime = taskrun::get_runtime_opts(opts.nworkers.unwrap_or(12), 32); + let res = runtime.block_on(async move { if false { return Err(Error::with_msg_no_trace(format!("unknown command"))); } else { } - let opts = DaqIngestOpts::parse(); - log::info!("daqingest version {}", clap::crate_version!()); match opts.subcmd { SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(k.into()).await?, SubCmd::ListPkey => daqingest::query::list_pkey().await?, @@ -26,5 +27,12 @@ pub fn main() -> Result<(), Error> { }, } Ok(()) - }) + }); + match res { + Ok(k) => Ok(k), + Err(e) => { + log::error!("Catched: {:?}", e); + Err(e) + } + } } diff --git a/daqingest/src/daqingest.rs b/daqingest/src/daqingest.rs index e246cb6..e4542a1 100644 --- a/daqingest/src/daqingest.rs +++ b/daqingest/src/daqingest.rs @@ -14,6 +14,8 @@ pub struct DaqIngestOpts { pub tag: Option, #[clap(subcommand)] pub subcmd: SubCmd, + #[clap(long)] + pub nworkers: Option, } #[derive(Debug, Parser)] diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index f7eb5e9..b43b1a0 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -11,6 +11,7 @@ use log::*; use netpod::Database; use scylla::batch::Consistency; use serde::{Deserialize, Serialize}; +use stats::CaConnVecStats; use std::collections::BTreeMap; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::path::PathBuf; @@ -232,7 +233,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let pg_client = Arc::new(pg_client); let scy = scylla::SessionBuilder::new() .known_node("sf-nube-11:19042") - .default_consistency(Consistency::Quorum) + .default_consistency(Consistency::One) .use_keyspace("ks1", true) .build() .await @@ -253,46 +254,52 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { error!("can not find address of channel {}", ch); } else { let addr: &str = rows[0].get(0); - let addr: SocketAddrV4 = match addr.parse() { - Ok(k) => k, - Err(e) => { - error!("can not parse {addr:?} {e:?}"); - continue; - } - }; - if ix % 1 == 0 { - info!("{} {} {:?}", ix, ch, addr); - } - if !channels_by_host.contains_key(&addr) { - channels_by_host.insert(addr, vec![ch.to_string()]); + if addr == "" { + // TODO the address was searched before but could not be found. 
} else { - channels_by_host.get_mut(&addr).unwrap().push(ch.to_string()); + let addr: SocketAddrV4 = match addr.parse() { + Ok(k) => k, + Err(e) => { + error!("can not parse {addr:?} {e:?}"); + continue; + } + }; + if ix % 100 == 0 { + info!("{} {} {:?}", ix, ch, addr); + } + if !channels_by_host.contains_key(&addr) { + channels_by_host.insert(addr, vec![ch.to_string()]); + } else { + channels_by_host.get_mut(&addr).unwrap().push(ch.to_string()); + } } } } if opts.abort_after_search == 1 { return Ok(()); } - info!("CONNECT TO HOSTS"); let data_store = Arc::new(DataStore::new(pg_client, scy.clone()).await?); let mut conn_jhs = vec![]; + let mut conn_stats_all = vec![]; for (host, channels) in channels_by_host { if false && host.ip() != &"172.26.24.76".parse::().unwrap() { continue; } let data_store = data_store.clone(); + debug!("Create TCP connection to {:?}", (host.ip(), host.port())); + let addr = SocketAddrV4::new(host.ip().clone(), host.port()); + let tcp = TcpStream::connect(addr).await?; + let mut conn = CaConn::new(tcp, addr, data_store.clone()); + conn_stats_all.push(conn.stats()); + for c in channels { + conn.channel_add(c); + } let conn_block = async move { - info!("Create TCP connection to {:?}", (host.ip(), host.port())); - let addr = SocketAddrV4::new(host.ip().clone(), host.port()); - let tcp = TcpStream::connect(addr).await?; - let mut conn = CaConn::new(tcp, addr, data_store.clone()); - for c in channels { - conn.channel_add(c); - } while let Some(item) = conn.next().await { match item { - Ok(k) => { - trace!("CaConn gives item: {k:?}"); + Ok(_) => { + // TODO test if performance can be noticed: + //trace!("CaConn gives item: {k:?}"); } Err(e) => { error!("CaConn gives error: {e:?}"); @@ -305,6 +312,17 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let jh = tokio::spawn(conn_block); conn_jhs.push(jh); } + let mut agg_last = CaConnVecStats::new(Instant::now()); + loop { + tokio::time::sleep(Duration::from_millis(2000)).await; + let mut agg = CaConnVecStats::new(Instant::now()); + for st in &conn_stats_all { + agg.push(&st); + } + let diff = agg.diff_against(&agg_last); + info!("{diff}"); + agg_last = agg; + } for jh in conn_jhs { match jh.await { Ok(k) => match k { diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 90b127b..273c5c5 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -11,10 +11,11 @@ use libc::c_int; use log::*; use netpod::timeunits::SEC; use netpod::{ScalarType, Shape}; +use stats::CaConnStats; use std::collections::{BTreeMap, VecDeque}; use std::net::{Ipv4Addr, SocketAddrV4}; use std::pin::Pin; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::{Duration, Instant, SystemTime}; @@ -53,6 +54,8 @@ struct CreatedState { shape: Shape, ts_created: Instant, state: MonitoringState, + ts_msp_last: u64, + inserted_in_ts_msp: u64, } #[derive(Debug)] @@ -98,8 +101,9 @@ macro_rules! insert_scalar_impl { ts_lsp: u64, val: $valty, ts_msp_changed: bool, - st: Option, - sh: Option, + st: ScalarType, + sh: Shape, + inserts_discarded: &AtomicU64, ) { let pulse = 0 as u64; let params = ( @@ -118,8 +122,8 @@ macro_rules! 
insert_scalar_impl { (series.id() as u32 % TABLE_SERIES_MOD) as i32, series.id() as i64, ts_msp as i64, - st.map(|x| x.to_scylla_i32()), - sh.map(|x| x.to_scylla_vec()), + st.to_scylla_i32(), + sh.to_scylla_vec(), ), ); let fut2 = ScyInsertFut::new( @@ -132,8 +136,7 @@ macro_rules! insert_scalar_impl { Box::pin(fut3) as _ }; if futs_queue.len() >= INSERT_FUTS_MAX { - warn!("can not keep up"); - // TODO count these events, this means dataloss. + inserts_discarded.fetch_add(1, Ordering::Release); } else { futs_queue.push(fut); } @@ -153,8 +156,9 @@ macro_rules! insert_array_impl { ts_lsp: u64, val: Vec<$valty>, ts_msp_changed: bool, - st: Option, - sh: Option, + st: ScalarType, + sh: Shape, + inserts_discarded: &AtomicU64, ) { let pulse = 0 as u64; let params = ( @@ -173,8 +177,8 @@ macro_rules! insert_array_impl { (series.id() as u32 % TABLE_SERIES_MOD) as i32, series.id() as i64, ts_msp as i64, - st.map(|x| x.to_scylla_i32()), - sh.map(|x| x.to_scylla_vec()), + st.to_scylla_i32(), + sh.to_scylla_vec(), ), ); let fut2 = ScyInsertFut::new( @@ -187,8 +191,7 @@ macro_rules! insert_array_impl { Box::pin(fut3) as _ }; if futs_queue.len() >= INSERT_FUTS_MAX { - warn!("can not keep up"); - // TODO count these events, this means dataloss. + inserts_discarded.fetch_add(1, Ordering::Release); } else { futs_queue.push(fut); } @@ -203,81 +206,74 @@ insert_scalar_impl!(insert_scalar_f32, f32, qu_insert_scalar_f32); insert_scalar_impl!(insert_scalar_f64, f64, qu_insert_scalar_f64); insert_scalar_impl!(insert_scalar_string, String, qu_insert_scalar_string); +insert_array_impl!(insert_array_i8, i8, qu_insert_array_i8); insert_array_impl!(insert_array_f32, f32, qu_insert_array_f32); insert_array_impl!(insert_array_f64, f64, qu_insert_array_f64); macro_rules! match_scalar_value_insert { ($stv:ident, $insf:ident, $val:expr, $comm:expr) => {{ - let (data_store, futs_queue, ch_s, series, ts_msp, ts_lsp, ts_msp_changed, channel_scalar_type, channel_shape) = + let (data_store, futs_queue, series, ts_msp, ts_lsp, ts_msp_changed, scalar_type, shape, inserts_discarded) = $comm; - match ch_s { - ChannelState::Created(st) => match st.shape { - Shape::Scalar => match st.scalar_type { - ScalarType::$stv => $insf( - data_store, - futs_queue, - series, - ts_msp, - ts_lsp, - $val, - ts_msp_changed, - channel_scalar_type, - channel_shape, - ), - _ => { - error!("unexpected value type insf {:?}", stringify!($insf)); - } - }, + match shape { + Shape::Scalar => match scalar_type { + ScalarType::$stv => $insf( + data_store, + futs_queue, + series, + ts_msp, + ts_lsp, + $val, + ts_msp_changed, + scalar_type, + shape, + inserts_discarded, + ), _ => { - error!( - "unexpected value shape insf {:?} st.shape {:?}", - stringify!($insf), - st.shape - ); + error!("unexpected value type insf {:?}", stringify!($insf)); } }, _ => { - error!("got value but channel not created insf {:?}", stringify!($insf)); + error!( + "unexpected value shape insf {:?} shape {:?}", + stringify!($insf), + shape + ); } } - }}; + };}; } macro_rules! 
match_array_value_insert { ($stv:ident, $insf:ident, $val:expr, $comm:expr) => {{ - let (data_store, futs_queue, ch_s, series, ts_msp, ts_lsp, ts_msp_changed, channel_scalar_type, channel_shape) = + let (data_store, futs_queue, series, ts_msp, ts_lsp, ts_msp_changed, scalar_type, shape, inserts_discarded) = $comm; - match ch_s { - ChannelState::Created(st) => match st.shape { - Shape::Wave(_) => match st.scalar_type { - ScalarType::$stv => $insf( - data_store, - futs_queue, - series, - ts_msp, - ts_lsp, - $val, - ts_msp_changed, - channel_scalar_type, - channel_shape, - ), - _ => { - error!("unexpected value type insf {:?}", stringify!($insf)); - } - }, + match shape { + Shape::Wave(_) => match scalar_type { + ScalarType::$stv => $insf( + data_store, + futs_queue, + series, + ts_msp, + ts_lsp, + $val, + ts_msp_changed, + scalar_type, + shape, + inserts_discarded, + ), _ => { - error!( - "unexpected value shape insf {:?} st.shape {:?}", - stringify!($insf), - st.shape - ); + error!("unexpected value type insf {:?}", stringify!($insf)); } }, _ => { - error!("got value but channel not created insf {:?}", stringify!($insf)); + error!( + "unexpected value shape insf {:?} shape {:?}", + stringify!($insf), + shape + ); } } - }}; + };}; } pub struct CaConn { @@ -287,6 +283,7 @@ pub struct CaConn { ioid_store: IdStore, subid_store: IdStore, channels: BTreeMap, + init_state_count: u64, cid_by_name: BTreeMap, cid_by_subid: BTreeMap, ts_msp_last_by_series: BTreeMap, @@ -298,6 +295,7 @@ pub struct CaConn { >, value_insert_futs: FuturesUnordered> + Send>>>, remote_addr_dbg: SocketAddrV4, + stats: Arc, } impl CaConn { @@ -309,6 +307,7 @@ impl CaConn { ioid_store: IdStore::new(), subid_store: IdStore::new(), channels: BTreeMap::new(), + init_state_count: 0, cid_by_name: BTreeMap::new(), cid_by_subid: BTreeMap::new(), ts_msp_last_by_series: BTreeMap::new(), @@ -318,14 +317,21 @@ impl CaConn { fut_get_series: FuturesUnordered::new(), value_insert_futs: FuturesUnordered::new(), remote_addr_dbg, + stats: Arc::new(CaConnStats::new()), } } + pub fn stats(&self) -> Arc { + self.stats.clone() + } + pub fn channel_add(&mut self, channel: String) { let cid = self.cid_by_name(&channel); if self.channels.contains_key(&cid) { } else { self.channels.insert(cid, ChannelState::Init); + // TODO do not count, use separate queue for those channels. 
+ self.init_state_count += 1; } } @@ -344,6 +350,7 @@ impl CaConn { self.name_by_cid.get(&cid).map(|x| x.as_str()) } + #[inline(never)] fn handle_insert_futs(&mut self, cx: &mut Context) -> Result<(), Error> { use Poll::*; while self.value_insert_futs.len() > 0 { @@ -355,12 +362,13 @@ impl CaConn { Ok(()) } + #[inline(never)] fn handle_get_series_futs(&mut self, cx: &mut Context) -> Result<(), Error> { use Poll::*; while self.fut_get_series.len() > 0 { match self.fut_get_series.poll_next_unpin(cx) { Ready(Some(Ok(k))) => { - info!("Have SeriesId {k:?}"); + //info!("Have SeriesId {k:?}"); let cid = k.0; let sid = k.1; let data_type = k.2; @@ -369,6 +377,9 @@ impl CaConn { Existence::Created(k) => k, Existence::Existing(k) => k, }; + if series.id() == 0 { + warn!("Weird series id: {series:?}"); + } let subid = self.subid_store.next(); self.cid_by_subid.insert(subid, cid); let name = self.name_by_cid(cid).unwrap().to_string(); @@ -391,6 +402,8 @@ impl CaConn { shape: Shape::from_ca_count(data_count)?, ts_created: Instant::now(), state: MonitoringState::AddingEvent(series), + ts_msp_last: 0, + inserted_in_ts_msp: u64::MAX, }); let scalar_type = ScalarType::from_ca_id(data_type)?; let shape = Shape::from_ca_count(data_count)?; @@ -413,20 +426,109 @@ impl CaConn { Ok(()) } - fn handle_event_add_res(&mut self, ev: proto::EventAddRes) { + #[inline(never)] + fn event_add_insert( + &mut self, + series: SeriesId, + scalar_type: ScalarType, + shape: Shape, + ev: proto::EventAddRes, + cid: u32, + ts_msp_last: u64, + inserted_in_ts_msp: u64, + ) -> Result<(), Error> { + // TODO where to actually get the timestamp of the event from? + let ts = SystemTime::now(); + let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); + // TODO decide on better msp/lsp: random offset! + // As long as one writer is active, the msp is arbitrary. + let ts = epoch.as_secs() * 1000000000 + epoch.subsec_nanos() as u64; + let ts_msp = if inserted_in_ts_msp > 2000 { + let ts_msp = ts / (60 * SEC) * (60 * SEC); + if let ChannelState::Created(st) = self.channels.get_mut(&cid).unwrap() { + st.ts_msp_last = ts_msp; + st.inserted_in_ts_msp = 1; + } else { + error!("logic error expect ChannelState::Created"); + } + ts_msp + } else { + if let ChannelState::Created(st) = self.channels.get_mut(&cid).unwrap() { + st.inserted_in_ts_msp += 1; + } else { + error!("logic error expect ChannelState::Created"); + } + ts_msp_last + }; + let ts_lsp = ts - ts_msp; + let ts_msp_changed = if let Some(ts_msp_cur) = self.ts_msp_last_by_series.get_mut(&series) { + if ts_msp != *ts_msp_cur { + *ts_msp_cur = ts_msp; + true + } else { + false + } + } else { + self.ts_msp_last_by_series.insert(series.clone(), ts_msp); + true + }; + // TODO make sure that I only accept types I expect. 
+ use crate::ca::proto::CaDataScalarValue::*; + use crate::ca::proto::CaDataValue::*; + let data_store = self.data_store.clone(); + let futs_queue = &mut self.value_insert_futs; + let comm = ( + data_store, + futs_queue, + series, + ts_msp, + ts_lsp, + ts_msp_changed, + scalar_type, + shape, + &self.stats.inserts_discarded, + ); + match ev.value { + Scalar(v) => match v { + I8(val) => match_scalar_value_insert!(I8, insert_scalar_i8, val, comm), + I16(val) => match_scalar_value_insert!(I16, insert_scalar_i16, val, comm), + I32(val) => match_scalar_value_insert!(I32, insert_scalar_i32, val, comm), + F32(val) => match_scalar_value_insert!(F32, insert_scalar_f32, val, comm), + F64(val) => match_scalar_value_insert!(F64, insert_scalar_f64, val, comm), + String(val) => match_scalar_value_insert!(STRING, insert_scalar_string, val, comm), + _ => { + warn!("can not handle Scalar {:?}", v); + } + }, + Array(v) => { + use crate::ca::proto::CaDataArrayValue::*; + match v { + I8(val) => match_array_value_insert!(I8, insert_array_i8, val, comm), + F32(val) => match_array_value_insert!(F32, insert_array_f32, val, comm), + F64(val) => match_array_value_insert!(F64, insert_array_f64, val, comm), + _ => { + warn!("can not handle Array ty {} n {}", ev.data_type, ev.data_count); + } + } + } + } + self.stats.inserts_started.fetch_add(1, Ordering::Release); + Ok(()) + } + + #[inline(never)] + fn handle_event_add_res(&mut self, ev: proto::EventAddRes) -> Result<(), Error> { // TODO handle subid-not-found which can also be peer error: let cid = *self.cid_by_subid.get(&ev.subid).unwrap(); // TODO get rid of the string clone when I don't want the log output any longer: - let name: String = self.name_by_cid(cid).unwrap().into(); + //let name: String = self.name_by_cid(cid).unwrap().into(); // TODO handle not-found error: let mut series_2 = None; let ch_s = self.channels.get_mut(&cid).unwrap(); - let mut channel_scalar_type = None; - let mut channel_shape = None; match ch_s { ChannelState::Created(st) => { - channel_scalar_type = Some(st.scalar_type.clone()); - channel_shape = Some(st.shape.clone()); + let scalar_type = st.scalar_type.clone(); + let shape = st.shape.clone(); match st.state { MonitoringState::AddingEvent(ref series) => { let series = series.clone(); @@ -445,84 +547,29 @@ impl CaConn { st.ts_last = Instant::now(); } _ => { - error!("unexpected state: EventAddRes while having {ch_s:?}"); + error!("unexpected state: EventAddRes while having {:?}", st.state); } } + let series = match series_2 { + Some(k) => k, + None => { + error!("handle_event_add_res but no series"); + // TODO allow return Result + return Err(format!("no series id on insert").into()); + } + }; + let ts_msp_last = st.ts_msp_last; + let inserted_in_ts_msp = st.inserted_in_ts_msp; + self.event_add_insert(series, scalar_type, shape, ev, cid, ts_msp_last, inserted_in_ts_msp)?; } _ => { error!("unexpected state: EventAddRes while having {ch_s:?}"); } } - { - let series = match series_2 { - Some(k) => k, - None => { - error!("handle_event_add_res but no series"); - // TODO allow return Result - return; - } - }; - // TODO where to actually get the time from? - let ts = SystemTime::now(); - let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); - // TODO decide on better msp/lsp: random offset! - // As long as one writer is active, the msp is arbitrary. 
- let ts = epoch.as_secs() * 1000000000 + epoch.subsec_nanos() as u64; - let ts_msp = ts / (60 * SEC) * (60 * SEC); - let ts_lsp = ts - ts_msp; - let ts_msp_changed = if let Some(ts_msp_cur) = self.ts_msp_last_by_series.get_mut(&series) { - if ts_msp != *ts_msp_cur { - *ts_msp_cur = ts_msp; - true - } else { - false - } - } else { - self.ts_msp_last_by_series.insert(series.clone(), ts_msp); - true - }; - // TODO make sure that I only accept types I expect. - use crate::ca::proto::CaDataScalarValue::*; - use crate::ca::proto::CaDataValue::*; - let data_store = self.data_store.clone(); - let futs_queue = &mut self.value_insert_futs; - let comm = ( - data_store, - futs_queue, - ch_s, - series, - ts_msp, - ts_lsp, - ts_msp_changed, - channel_scalar_type, - channel_shape, - ); - match ev.value { - Scalar(v) => match v { - I8(val) => match_scalar_value_insert!(I8, insert_scalar_i8, val, comm), - I16(val) => match_scalar_value_insert!(I16, insert_scalar_i16, val, comm), - I32(val) => match_scalar_value_insert!(I32, insert_scalar_i32, val, comm), - F32(val) => match_scalar_value_insert!(F32, insert_scalar_f32, val, comm), - F64(val) => match_scalar_value_insert!(F64, insert_scalar_f64, val, comm), - String(val) => match_scalar_value_insert!(STRING, insert_scalar_string, val, comm), - _ => { - warn!("can not handle Scalar {:?}", v); - } - }, - Array(v) => { - use crate::ca::proto::CaDataArrayValue::*; - match v { - F32(val) => match_array_value_insert!(F32, insert_array_f32, val, comm), - F64(val) => match_array_value_insert!(F64, insert_array_f64, val, comm), - _ => { - warn!("can not handle Array ty {} n {}", ev.data_type, ev.data_count); - } - } - } - } - } + Ok(()) } + #[inline(never)] fn handle_conn_listen(&mut self, cx: &mut Context) -> Option>>> { use Poll::*; match self.proto.poll_next_unpin(cx) { @@ -538,7 +585,9 @@ impl CaConn { error!("See some unexpected version {n} channel search may not work."); Some(Ready(Some(Ok(())))) } else { - info!("Received peer version {n}"); + if n != 13 { + warn!("Received peer version {n}"); + } self.state = CaConnState::PeerReady; None } @@ -563,8 +612,12 @@ impl CaConn { } } + #[inline(never)] fn check_channels_state_init(&mut self, msgs_tmp: &mut Vec) -> Result<(), Error> { // TODO profile, efficient enough? + if self.init_state_count == 0 { + return Ok(()); + } let keys: Vec = self.channels.keys().map(|x| *x).collect(); for cid in keys { match self.channels.get_mut(&cid).unwrap() { @@ -576,7 +629,7 @@ impl CaConn { Ok(k) => k, Err(e) => return Err(e), }; - info!("Sending CreateChan for {}", name); + debug!("Sending CreateChan for {}", name); let msg = CaMsg { ty: CaMsgTy::CreateChan(CreateChan { cid, @@ -590,6 +643,7 @@ impl CaConn { cid, ts_beg: Instant::now(), }; + self.init_state_count -= 1; } _ => {} } @@ -597,14 +651,21 @@ impl CaConn { Ok(()) } + #[inline(never)] fn handle_peer_ready(&mut self, cx: &mut Context) -> Poll>> { use Poll::*; + let mut ts1 = Instant::now(); // TODO unify with Listen state where protocol gets polled as well. let mut msgs_tmp = vec![]; self.check_channels_state_init(&mut msgs_tmp)?; + let ts2 = Instant::now(); + self.stats + .time_check_channels_state_init + .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::Release); + ts1 = ts2; let mut do_wake_again = false; if msgs_tmp.len() > 0 { - info!("msgs_tmp.len() {}", msgs_tmp.len()); + //info!("msgs_tmp.len() {}", msgs_tmp.len()); do_wake_again = true; } // TODO be careful to not overload outgoing message queue. 
@@ -627,7 +688,7 @@ impl CaConn { let sid = k.sid; // TODO handle error: let name = self.name_by_cid(cid).unwrap().to_string(); - info!("CreateChanRes {name:?}"); + debug!("CreateChanRes {name:?}"); let scalar_type = ScalarType::from_ca_id(k.data_type)?; let shape = Shape::from_ca_count(k.data_count)?; // TODO handle not-found error: @@ -639,6 +700,8 @@ impl CaConn { shape: shape.clone(), ts_created: Instant::now(), state: MonitoringState::FetchSeriesId, + ts_msp_last: 0, + inserted_in_ts_msp: u64::MAX, }); // TODO handle error in different way. Should most likely not abort. let cd = ChannelDescDecoded { @@ -660,7 +723,15 @@ impl CaConn { self.fut_get_series.push(Box::pin(fut) as _); do_wake_again = true; } - CaMsgTy::EventAddRes(k) => Self::handle_event_add_res(self, k), + CaMsgTy::EventAddRes(k) => { + let res = Self::handle_event_add_res(self, k); + let ts2 = Instant::now(); + self.stats + .time_handle_event_add_res + .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::Release); + ts1 = ts2; + res? + } _ => {} } } @@ -680,7 +751,8 @@ impl CaConn { Pending => Pending, }; if do_wake_again { - info!("do_wake_again"); + // TODO remove the need for this: + trace!("do_wake_again"); cx.waker().wake_by_ref(); } res @@ -692,10 +764,22 @@ impl Stream for CaConn { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + let ts1 = Instant::now(); self.poll_count += 1; - loop { + let ret = loop { + let ts1 = Instant::now(); self.handle_insert_futs(cx)?; + let ts2 = Instant::now(); + self.stats + .poll_time_handle_insert_futs + .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel); + let ts1 = ts2; self.handle_get_series_futs(cx)?; + let ts2 = Instant::now(); + self.stats + .poll_time_get_series_futs + .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel); + let mut ts1 = ts2; break match &self.state { CaConnState::Init => { let msg = CaMsg { ty: CaMsgTy::Version }; @@ -709,14 +793,35 @@ impl Stream for CaConn { self.state = CaConnState::Listen; continue; } - CaConnState::Listen => match self.handle_conn_listen(cx) { + CaConnState::Listen => match { + let res = self.handle_conn_listen(cx); + let ts2 = Instant::now(); + self.stats + .time_handle_conn_listen + .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel); + ts1 = ts2; + res + } { Some(k) => k, None => continue, }, - CaConnState::PeerReady => self.handle_peer_ready(cx), + CaConnState::PeerReady => { + let res = self.handle_peer_ready(cx); + let ts2 = Instant::now(); + self.stats + .time_handle_peer_ready + .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel); + ts1 = ts2; + res + } CaConnState::Done => Ready(None), }; - } + }; + let ts2 = Instant::now(); + self.stats + .poll_time_all + .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel); + ret } } diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index f149351..3013b75 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -107,6 +107,7 @@ pub enum CaDataScalarValue { #[derive(Clone, Debug)] pub enum CaDataArrayValue { + I8(Vec), F32(Vec), F64(Vec), } @@ -520,6 +521,19 @@ impl CaMsg { } }, Shape::Wave(n) => match ca_st { + CaScalarType::I8 => { + type ST = i8; + const STL: usize = std::mem::size_of::(); + let nn = (n as usize).min(payload.len() / STL); + let mut a = Vec::with_capacity(nn); + let mut bb = &payload[..]; + for _ in 0..nn { + let v = ST::from_be_bytes(bb[..STL].try_into()?); + bb = &bb[STL..]; + 
a.push(v); + } + CaDataValue::Array(CaDataArrayValue::I8(a)) + } CaScalarType::F32 => { type ST = f32; const STL: usize = std::mem::size_of::(); diff --git a/netfetch/src/ca/store.rs b/netfetch/src/ca/store.rs index 60f16be..3e7cb46 100644 --- a/netfetch/src/ca/store.rs +++ b/netfetch/src/ca/store.rs @@ -47,6 +47,7 @@ pub struct DataStore { pub qu_insert_scalar_f32: Arc, pub qu_insert_scalar_f64: Arc, pub qu_insert_scalar_string: Arc, + pub qu_insert_array_i8: Arc, pub qu_insert_array_f32: Arc, pub qu_insert_array_f64: Arc, pub chan_reg: Arc, @@ -95,6 +96,11 @@ impl DataStore { .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_scalar_string = Arc::new(q); // array + let q = scy + .prepare("insert into events_array_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_array_i8 = Arc::new(q); let q = scy .prepare("insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") .await @@ -116,6 +122,7 @@ impl DataStore { qu_insert_scalar_f32, qu_insert_scalar_f64, qu_insert_scalar_string, + qu_insert_array_i8, qu_insert_array_f32, qu_insert_array_f64, }; diff --git a/netfetch/src/series.rs b/netfetch/src/series.rs index 4cfa961..30a357c 100644 --- a/netfetch/src/series.rs +++ b/netfetch/src/series.rs @@ -120,6 +120,9 @@ pub async fn get_series_id(pg_client: &PgClient, cd: &ChannelDescDecoded) -> Res h.update(format!("{:?} {:?}", scalar_type, shape).as_bytes()); let f = h.finalize(); let mut series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); + if series > i64::MAX as u64 { + series &= 0x7fffffffffffffff; + } for _ in 0..2000 { if series > i64::MAX as u64 { series = 0; @@ -129,7 +132,7 @@ pub async fn get_series_id(pg_client: &PgClient, cd: &ChannelDescDecoded) -> Res concat!( "insert into series_by_channel", " (series, facility, channel, scalar_type, shape_dims, agg_kind)", - " values ($1, $2, $3, $4, $5, 0)" + " values ($1, $2, $3, $4, $5, 0) on conflict do nothing" ), &[&(series as i64), &facility, channel_name, &scalar_type, &shape], ) @@ -139,7 +142,7 @@ pub async fn get_series_id(pg_client: &PgClient, cd: &ChannelDescDecoded) -> Res let series = Existence::Created(SeriesId(series)); return Ok(series); } else { - error!("tried to insert but series exists..."); + error!("tried to insert {series:?} for {channel_name} but it exists"); } tokio::time::sleep(Duration::from_millis(20)).await; series += 1; diff --git a/stats/Cargo.toml b/stats/Cargo.toml index 87f31b5..fcf30a2 100644 --- a/stats/Cargo.toml +++ b/stats/Cargo.toml @@ -10,3 +10,4 @@ path = "src/stats.rs" [dependencies] log = { path = "../log" } err = { path = "../../daqbuffer/err" } +libc = "0.2" diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 5ce7e6d..fc23133 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -1,5 +1,13 @@ +use std::fmt; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering::{AcqRel, Acquire}; +use std::sync::RwLock; use std::time::{Duration, Instant}; +const US: u64 = 1000; +const MS: u64 = US * 1000; +const SEC: u64 = MS * 1000; + pub struct EMA { ema: f32, emv: f32, @@ -75,3 +83,157 @@ impl CheckEvery { } } } + +pub struct CaConnStats { + pub poll_time_all: AtomicU64, + pub poll_time_handle_insert_futs: AtomicU64, + pub poll_time_get_series_futs: AtomicU64, + pub time_handle_conn_listen: AtomicU64, + pub time_handle_peer_ready: AtomicU64, + pub time_check_channels_state_init: AtomicU64, + pub 
time_handle_event_add_res: AtomicU64, + pub inserts_started: AtomicU64, + pub inserts_discarded: AtomicU64, +} + +impl CaConnStats { + pub fn new() -> Self { + Self { + poll_time_all: AtomicU64::new(0), + poll_time_handle_insert_futs: AtomicU64::new(0), + poll_time_get_series_futs: AtomicU64::new(0), + time_handle_conn_listen: AtomicU64::new(0), + time_handle_peer_ready: AtomicU64::new(0), + time_check_channels_state_init: AtomicU64::new(0), + time_handle_event_add_res: AtomicU64::new(0), + inserts_started: AtomicU64::new(0), + inserts_discarded: AtomicU64::new(0), + } + } +} + +pub struct CaConnVecStats { + pub ts_create: RwLock, + pub poll_time_all: AtomicU64, + pub poll_time_handle_insert_futs: AtomicU64, + pub poll_time_get_series_futs: AtomicU64, + pub time_handle_conn_listen: AtomicU64, + pub time_handle_peer_ready: AtomicU64, + pub time_check_channels_state_init: AtomicU64, + pub time_handle_event_add_res: AtomicU64, + pub inserts_started: AtomicU64, + pub inserts_discarded: AtomicU64, +} + +pub struct CaConnVecStatsDiff { + pub dt: AtomicU64, + pub poll_time_all: AtomicU64, + pub poll_time_handle_insert_futs: AtomicU64, + pub poll_time_get_series_futs: AtomicU64, + pub time_handle_conn_listen: AtomicU64, + pub time_handle_peer_ready: AtomicU64, + pub time_check_channels_state_init: AtomicU64, + pub time_handle_event_add_res: AtomicU64, + pub inserts_started: AtomicU64, + pub inserts_discarded: AtomicU64, +} + +impl CaConnVecStats { + pub fn new(ts_create: Instant) -> Self { + Self { + ts_create: RwLock::new(ts_create), + poll_time_all: AtomicU64::new(0), + poll_time_handle_insert_futs: AtomicU64::new(0), + poll_time_get_series_futs: AtomicU64::new(0), + time_handle_conn_listen: AtomicU64::new(0), + time_handle_peer_ready: AtomicU64::new(0), + time_check_channels_state_init: AtomicU64::new(0), + time_handle_event_add_res: AtomicU64::new(0), + inserts_started: AtomicU64::new(0), + inserts_discarded: AtomicU64::new(0), + } + } + + pub fn push(&mut self, k: &CaConnStats) { + self.poll_time_all.fetch_add(k.poll_time_all.load(Acquire), AcqRel); + self.poll_time_handle_insert_futs + .fetch_add(k.poll_time_handle_insert_futs.load(Acquire), AcqRel); + self.poll_time_get_series_futs + .fetch_add(k.poll_time_get_series_futs.load(Acquire), AcqRel); + self.time_handle_conn_listen + .fetch_add(k.time_handle_conn_listen.load(Acquire), AcqRel); + self.time_handle_peer_ready + .fetch_add(k.time_handle_peer_ready.load(Acquire), AcqRel); + self.time_check_channels_state_init + .fetch_add(k.time_check_channels_state_init.load(Acquire), AcqRel); + self.time_handle_event_add_res + .fetch_add(k.time_handle_event_add_res.load(Acquire), AcqRel); + self.inserts_started.fetch_add(k.inserts_started.load(Acquire), AcqRel); + self.inserts_discarded + .fetch_add(k.inserts_discarded.load(Acquire), AcqRel); + } + + pub fn diff_against(&self, k: &Self) -> CaConnVecStatsDiff { + let dur = self + .ts_create + .read() + .unwrap() + .duration_since(*k.ts_create.read().unwrap()); + CaConnVecStatsDiff { + dt: AtomicU64::new(dur.as_secs() * SEC + dur.subsec_nanos() as u64), + poll_time_all: AtomicU64::new(self.poll_time_all.load(Acquire) - k.poll_time_all.load(Acquire)), + poll_time_handle_insert_futs: AtomicU64::new( + self.poll_time_handle_insert_futs.load(Acquire) - k.poll_time_handle_insert_futs.load(Acquire), + ), + poll_time_get_series_futs: AtomicU64::new( + self.poll_time_get_series_futs.load(Acquire) - k.poll_time_get_series_futs.load(Acquire), + ), + time_handle_conn_listen: AtomicU64::new( + 
self.time_handle_conn_listen.load(Acquire) - k.time_handle_conn_listen.load(Acquire),
+            ),
+            time_handle_peer_ready: AtomicU64::new(
+                self.time_handle_peer_ready.load(Acquire) - k.time_handle_peer_ready.load(Acquire),
+            ),
+            time_check_channels_state_init: AtomicU64::new(
+                self.time_check_channels_state_init.load(Acquire) - k.time_check_channels_state_init.load(Acquire),
+            ),
+            time_handle_event_add_res: AtomicU64::new(
+                self.time_handle_event_add_res.load(Acquire) - k.time_handle_event_add_res.load(Acquire),
+            ),
+            inserts_started: AtomicU64::new(self.inserts_started.load(Acquire) - k.inserts_started.load(Acquire)),
+            inserts_discarded: AtomicU64::new(self.inserts_discarded.load(Acquire) - k.inserts_discarded.load(Acquire)),
+        }
+    }
+}
+
+impl fmt::Display for CaConnVecStatsDiff {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        // Guard the divisors: on an idle interval dt or poll_time can be zero.
+        let insert_freq = self.inserts_started.load(Acquire) / (self.dt.load(Acquire) / SEC).max(1);
+        let poll_time = self.poll_time_all.load(Acquire).max(1);
+        let poll_time_handle_insert_futs = self.poll_time_handle_insert_futs.load(Acquire);
+        let poll_time_get_series_futs = self.poll_time_get_series_futs.load(Acquire);
+        let time_handle_conn_listen = self.time_handle_conn_listen.load(Acquire);
+        let time_handle_peer_ready = self.time_handle_peer_ready.load(Acquire);
+        let time_check_channels_state_init = self.time_check_channels_state_init.load(Acquire);
+        let time_handle_event_add_res = self.time_handle_event_add_res.load(Acquire);
+        let poll_pct_handle_insert_futs = poll_time_handle_insert_futs * 100 / poll_time;
+        let poll_pct_get_series_futs = poll_time_get_series_futs * 100 / poll_time;
+        let pct_handle_conn_listen = time_handle_conn_listen * 100 / poll_time;
+        let pct_handle_peer_ready = time_handle_peer_ready * 100 / poll_time;
+        let pct_check_channels_state_init = time_check_channels_state_init * 100 / poll_time;
+        let pct_handle_event_add_res = time_handle_event_add_res * 100 / poll_time;
+        let inserts_discarded_freq = self.inserts_discarded.load(Acquire);
+        write!(
+            fmt,
+            "insfreq {} disc {} poll_time {:5} ms inserts {:2}% seriesid {:2}% listen {:2}% peer {:2}% checkinit {:2}% evadd {:2}%",
+            insert_freq,
+            inserts_discarded_freq,
+            poll_time / 1000,
+            poll_pct_handle_insert_futs,
+            poll_pct_get_series_futs,
+            pct_handle_conn_listen,
+            pct_handle_peer_ready,
+            pct_check_channels_state_init,
+            pct_handle_event_add_res
+        )
+    }
+}
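
For reference, a minimal standalone sketch (not part of the patch) of the two time conventions the new counters rely on: stats fields accumulate elapsed time in microseconds via `(ts2.duration_since(ts1) * 1000000).as_secs()`, and event timestamps are split into a per-minute ts_msp bucket plus a ts_lsp offset as in event_add_insert. The SEC constant mirrors the one in stats/src/stats.rs; the timestamp value is arbitrary and purely illustrative.

use std::time::{Duration, Instant};

const SEC: u64 = 1_000_000_000;

fn main() {
    // Counters such as poll_time_all accumulate elapsed time in microseconds:
    // multiplying a Duration by 1_000_000 scales it so that .as_secs() yields
    // whole microseconds.
    let ts1 = Instant::now();
    std::thread::sleep(Duration::from_millis(3));
    let ts2 = Instant::now();
    let micros = (ts2.duration_since(ts1) * 1_000_000).as_secs();
    assert!(micros >= 3_000); // at least 3 ms == 3000 us
    println!("elapsed: {} us", micros);

    // Event timestamps (nanoseconds) are split into a one-minute ts_msp bucket
    // and a ts_lsp offset within that bucket, and recombine losslessly.
    let ts: u64 = 1_652_190_505_123_456_789;
    let ts_msp = ts / (60 * SEC) * (60 * SEC);
    let ts_lsp = ts - ts_msp;
    assert_eq!(ts_msp + ts_lsp, ts);
    println!("ts_msp {} ts_lsp {}", ts_msp, ts_lsp);
}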