diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index e1e3c51..7ae59e8 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -42,6 +42,7 @@ use series::SeriesId; use stats::CaConnStats; use stats::CaProtoStats; use stats::IntervalEma; +use stats::XorShift32; use std::collections::BTreeMap; use std::collections::VecDeque; use std::net::SocketAddrV4; @@ -59,6 +60,7 @@ use taskrun::tokio; use tokio::net::TcpStream; const CONNECTING_TIMEOUT: Duration = Duration::from_millis(6000); +const IOC_PING_IVL: Duration = Duration::from_millis(80000); #[allow(unused)] macro_rules! trace2 { @@ -171,7 +173,7 @@ struct CreatedState { #[allow(unused)] sid: u32, data_type: u16, - data_count: u16, + data_count: u32, scalar_type: ScalarType, shape: Shape, #[allow(unused)] @@ -501,6 +503,7 @@ pub struct CaConn { inserts_counter: u64, extra_inserts_conf: ExtraInsertsConf, ioc_ping_last: Instant, + ioc_ping_next: Instant, ioc_ping_start: Option, storage_insert_sender: SenderPolling, ca_conn_event_out_queue: VecDeque, @@ -510,6 +513,7 @@ pub struct CaConn { thr_msg_poll: ThrottleTrace, ca_proto_stats: Arc, weird_count: usize, + rng: XorShift32, } #[cfg(DISABLED)] @@ -531,6 +535,7 @@ impl CaConn { ca_proto_stats: Arc, ) -> Self { let (cq_tx, cq_rx) = async_channel::bounded(32); + let mut rng = XorShift32::new_from_time(); Self { opts, backend, @@ -556,6 +561,7 @@ impl CaConn { inserts_counter: 0, extra_inserts_conf: ExtraInsertsConf::new(), ioc_ping_last: Instant::now(), + ioc_ping_next: Instant::now() + Self::ioc_ping_ivl_rng(&mut rng), ioc_ping_start: None, storage_insert_sender: SenderPolling::new(storage_insert_tx), ca_conn_event_out_queue: VecDeque::new(), @@ -565,9 +571,14 @@ impl CaConn { thr_msg_poll: ThrottleTrace::new(Duration::from_millis(10000)), ca_proto_stats, weird_count: 0, + rng, } } + fn ioc_ping_ivl_rng(rng: &mut XorShift32) -> Duration { + IOC_PING_IVL * 100 / (70 + (rng.next() % 60)) + } + fn new_self_ticker() -> Pin> { Box::pin(tokio::time::sleep(Duration::from_millis(500))) } @@ -965,12 +976,11 @@ impl CaConn { self.trigger_shutdown(ChannelStatusClosedReason::IocTimeout); } } else { - // TODO randomize delay a bit - if self.ioc_ping_last.elapsed() > Duration::from_millis(120000) { + if self.ioc_ping_next < tsnow { if let Some(proto) = &mut self.proto { self.stats.ping_start().inc(); self.ioc_ping_start = Some(Instant::now()); - let msg = CaMsg { ty: CaMsgTy::Echo }; + let msg = CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow); proto.push_out(msg); } else { self.stats.ping_no_proto().inc(); @@ -1046,7 +1056,7 @@ impl CaConn { cid: Cid, sid: u32, data_type: u16, - data_count: u16, + data_count: u32, series: SeriesId, ) -> Result<(), Error> { let tsnow = Instant::now(); @@ -1069,14 +1079,13 @@ impl CaConn { self.cid_by_subid.insert(subid, cid); // TODO convert first to CaDbrType, set to `Time`, then convert to ix: let data_type_asked = data_type + 14; - let msg = CaMsg { - ty: CaMsgTy::EventAdd(EventAdd { - sid, - data_type: data_type_asked, - data_count, - subid, - }), - }; + let ty = CaMsgTy::EventAdd(EventAdd { + sid, + data_type: data_type_asked, + data_count: data_count as _, + subid, + }); + let msg = CaMsg::from_ty_ts(ty, tsnow); let proto = self.proto.as_mut().unwrap(); proto.push_out(msg); // TODO handle not-found error: @@ -1271,19 +1280,9 @@ impl CaConn { let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); epoch.as_secs() * SEC + epoch.subsec_nanos() as u64 }; - let ts = ev.value.ts.map_or(0, |x| x.get()); + let ts = ev.value.ts; let ts_diff = ts.abs_diff(ts_local); - if ts_diff > SEC * 300 { - self.stats.ca_ts_off_4.inc(); - //warn!("Bad time for {name} {ts} vs {ts_local} diff {}", ts_diff / SEC); - // TODO mute this channel for some time, discard the event. - } else if ts_diff > SEC * 120 { - self.stats.ca_ts_off_3.inc(); - } else if ts_diff > SEC * 20 { - self.stats.ca_ts_off_2.inc(); - } else if ts_diff > SEC * 3 { - self.stats.ca_ts_off_1.inc(); - } + self.stats.ca_ts_off().ingest((ts_diff / MS) as u32); if tsnow >= st.insert_next_earliest { //let channel_state = self.channels.get_mut(&cid).unwrap(); let item_queue = &mut self.insert_item_queue; @@ -1441,12 +1440,13 @@ impl CaConn { Ok(k) => k.to_string(), Err(e) => return Err(e), }; - let msg = CaMsg { - ty: CaMsgTy::CreateChan(CreateChan { + let msg = CaMsg::from_ty_ts( + CaMsgTy::CreateChan(CreateChan { cid: cid.0, channel: name.into(), }), - }; + Instant::now(), + ); msgs_tmp.push(msg); // TODO handle not-found error: let ch_s = self.channels.get_mut(&cid).unwrap(); @@ -1573,28 +1573,15 @@ impl CaConn { CaMsgTy::Echo => { // let addr = &self.remote_addr_dbg; if let Some(started) = self.ioc_ping_start { - let dt = started.elapsed().as_secs_f32() * 1e3; - if dt <= 10. { - self.stats.pong_recv_010ms().inc(); - } else if dt <= 25. { - self.stats.pong_recv_025ms().inc(); - } else if dt <= 50. { - self.stats.pong_recv_050ms().inc(); - } else if dt <= 100. { - self.stats.pong_recv_100ms().inc(); - } else if dt <= 200. { - self.stats.pong_recv_200ms().inc(); - } else if dt <= 400. { - self.stats.pong_recv_400ms().inc(); - } else { - self.stats.pong_recv_slow().inc(); - // warn!("Received Echo {dt:10.0}ms {addr:?}"); - } + let dt = started.elapsed(); + let dt = dt.as_secs() as u32 + dt.subsec_millis(); + self.stats.pong_recv_lat().ingest(dt); } else { let addr = &self.remote_addr_dbg; warn!("Received Echo even though we didn't asked for it {addr:?}"); } - self.ioc_ping_last = Instant::now(); + self.ioc_ping_last = tsnow; + self.ioc_ping_next = tsnow + Self::ioc_ping_ivl_rng(&mut self.rng); self.ioc_ping_start = None; } CaMsgTy::CreateChanFail(msg) => { @@ -1622,6 +1609,7 @@ impl CaConn { warn!("CaConn sees: {msg:?}"); } } + #[cfg(DISABLED)] CaMsgTy::IssueDataCount(hi, stat, sev, secs, nanos) => { let cid = *self.cid_by_subid.get(&hi.param2()).unwrap(); let name = self.name_by_cid.get(&cid).unwrap(); @@ -1676,7 +1664,7 @@ impl CaConn { Break(Pending) } - fn handle_conn_state(&mut self, cx: &mut Context) -> Result>, Error> { + fn handle_conn_state(&mut self, tsnow: Instant, cx: &mut Context) -> Result>, Error> { use Poll::*; match &mut self.state { CaConnState::Unconnected(_since) => { @@ -1694,6 +1682,7 @@ impl CaConn { Ready(connect_result) => { match connect_result { Ok(Ok(tcp)) => { + self.stats.tcp_connected.inc(); let addr = addr.clone(); self.insert_item_queue .push_back(QueryItem::ConnectionStatus(ConnectionStatusItem { @@ -1761,15 +1750,11 @@ impl CaConn { trace4!("Init"); let hostname = self.local_epics_hostname.clone(); let proto = self.proto.as_mut().unwrap(); - let msg = CaMsg { ty: CaMsgTy::Version }; + let msg = CaMsg::from_ty_ts(CaMsgTy::Version, tsnow); proto.push_out(msg); - let msg = CaMsg { - ty: CaMsgTy::ClientName, - }; + let msg = CaMsg::from_ty_ts(CaMsgTy::ClientName, tsnow); proto.push_out(msg); - let msg = CaMsg { - ty: CaMsgTy::HostName(hostname), - }; + let msg = CaMsg::from_ty_ts(CaMsgTy::HostName(hostname), tsnow); proto.push_out(msg); self.state = CaConnState::Listen; Ok(Ready(Some(()))) @@ -1820,6 +1805,7 @@ impl CaConn { fn loop_inner(&mut self, cx: &mut Context) -> Result>, Error> { use Poll::*; + let tsnow = Instant::now(); let mut have_progress = false; for _ in 0..64 { self.stats.caconn_loop2_count.inc(); @@ -1828,7 +1814,7 @@ impl CaConn { } else if self.insert_item_queue.len() >= self.opts.insert_queue_max { break; } else { - match self.handle_conn_state(cx) { + match self.handle_conn_state(tsnow, cx) { Ok(x) => match x { Ready(Some(())) => { have_progress = true; diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index 3b2cc68..f91512d 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -274,6 +274,7 @@ impl FindIocStream { sock: i32, stats: &IocFinderStats, ) -> Poll), Error>> { + let tsnow = Instant::now(); let mut saddr_mem = [0u8; std::mem::size_of::()]; let mut saddr_len: libc::socklen_t = saddr_mem.len() as _; let mut buf = vec![0u8; 1024]; @@ -343,7 +344,7 @@ impl FindIocStream { error!("incomplete message, missing payload"); break; } - let msg = CaMsg::from_proto_infos(&hi, nb.data(), 32).map_err(|e| e.to_string())?; + let msg = CaMsg::from_proto_infos(&hi, nb.data(), tsnow, 32).map_err(|e| e.to_string())?; nb.adv(hi.payload_len()).map_err(|e| e.to_string())?; msgs.push(msg); accounted += 16 + hi.payload_len(); diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 54e0da3..820fa4a 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -17,6 +17,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::Context; use std::task::Poll; +use std::time::Instant; use taskrun::tokio; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; @@ -44,6 +45,7 @@ pub enum Error { CaCommandNotSupported(u16), ParseAttemptInDoneState, UnexpectedHeader, + ExtendedHeaderBadCount, } const CA_PROTO_VERSION: u16 = 13; @@ -84,7 +86,7 @@ pub struct CreateChan { #[derive(Debug)] pub struct CreateChanRes { pub data_type: u16, - pub data_count: u16, + pub data_count: u32, pub cid: u32, pub sid: u32, } @@ -112,7 +114,7 @@ pub struct EventAdd { #[derive(Debug, Clone)] pub struct EventAddRes { pub data_type: u16, - pub data_count: u16, + pub data_count: u32, pub status: u32, pub subid: u32, pub value: CaEventValue, @@ -129,7 +131,7 @@ pub struct ReadNotify { #[derive(Debug)] pub struct ReadNotifyRes { pub data_type: u16, - pub data_count: u16, + pub data_count: u32, pub sid: u32, pub ioid: u32, } @@ -342,9 +344,9 @@ impl From for scywr::iteminsertqueue::DataValue { #[derive(Clone, Debug)] pub struct CaEventValue { - pub ts: Option, - pub status: Option, - pub severity: Option, + pub ts: u64, + pub status: u16, + pub severity: u16, pub data: CaDataValue, } @@ -367,7 +369,6 @@ pub enum CaMsgTy { ReadNotify(ReadNotify), ReadNotifyRes(ReadNotifyRes), Echo, - IssueDataCount(HeadInfo, u16, u16, u32, u32), } impl CaMsgTy { @@ -391,7 +392,6 @@ impl CaMsgTy { ReadNotify(_) => 0x0f, ReadNotifyRes(_) => 0x0f, Echo => 0x17, - IssueDataCount(..) => panic!(), } } @@ -425,7 +425,6 @@ impl CaMsgTy { panic!(); } Echo => 0, - IssueDataCount(..) => panic!(), } } @@ -452,7 +451,6 @@ impl CaMsgTy { ReadNotify(x) => x.data_type, ReadNotifyRes(x) => x.data_type, Echo => 0, - IssueDataCount(..) => panic!(), } } @@ -468,15 +466,23 @@ impl CaMsgTy { Search(_) => CA_PROTO_VERSION, SearchRes(_) => 0, CreateChan(_) => 0, - CreateChanRes(x) => x.data_count, + CreateChanRes(x) => { + panic!(); + x.data_count as _ + } CreateChanFail(_) => 0, AccessRightsRes(_) => 0, EventAdd(x) => x.data_count, - EventAddRes(x) => x.data_count, + EventAddRes(x) => { + panic!(); + x.data_count as _ + } ReadNotify(x) => x.data_count, - ReadNotifyRes(x) => x.data_count, + ReadNotifyRes(x) => { + panic!(); + x.data_count as _ + } Echo => 0, - IssueDataCount(..) => panic!(), } } @@ -500,7 +506,6 @@ impl CaMsgTy { ReadNotify(x) => x.sid, ReadNotifyRes(x) => x.sid, Echo => 0, - IssueDataCount(..) => panic!(), } } @@ -524,7 +529,6 @@ impl CaMsgTy { ReadNotify(x) => x.ioid, ReadNotifyRes(x) => x.ioid, Echo => 0, - IssueDataCount(..) => panic!(), } } @@ -589,7 +593,6 @@ impl CaMsgTy { ReadNotify(_) => {} ReadNotifyRes(_) => {} Echo => {} - IssueDataCount(..) => {} } } } @@ -626,9 +629,14 @@ macro_rules! convert_wave_value { #[derive(Debug)] pub struct CaMsg { pub ty: CaMsgTy, + pub ts: Instant, } impl CaMsg { + pub fn from_ty_ts(ty: CaMsgTy, ts: Instant) -> Self { + Self { ty, ts } + } + fn len(&self) -> usize { self.ty.len() } @@ -706,11 +714,14 @@ impl CaMsg { Ok(val) } - pub fn from_proto_infos(hi: &HeadInfo, payload: &[u8], array_truncate: usize) -> Result { + pub fn from_proto_infos( + hi: &HeadInfo, + payload: &[u8], + tsnow: Instant, + array_truncate: usize, + ) -> Result { let msg = match hi.cmdid { - 0x00 => CaMsg { - ty: CaMsgTy::VersionRes(hi.data_count), - }, + 0x00 => CaMsg::from_ty_ts(CaMsgTy::VersionRes(hi.data_count), tsnow), 0x0b => { let mut s = String::new(); s.extend(format!("{:?}", &payload[..payload.len().min(16)]).chars()); @@ -723,20 +734,16 @@ impl CaMsg { eid: hi.param2, msg: s, }; - CaMsg { ty: CaMsgTy::Error(e) } + CaMsg::from_ty_ts(CaMsgTy::Error(e), tsnow) } 20 => { let name = std::ffi::CString::new(payload) .map(|s| s.into_string().unwrap_or_else(|e| format!("{e:?}"))) .unwrap_or_else(|e| format!("{e:?}")); - CaMsg { - ty: CaMsgTy::ClientNameRes(ClientNameRes { name }), - } + CaMsg::from_ty_ts(CaMsgTy::ClientNameRes(ClientNameRes { name }), tsnow) } // TODO make response type for host name: - 21 => CaMsg { - ty: CaMsgTy::HostName("TODOx5288".into()), - }, + 21 => CaMsg::from_ty_ts(CaMsgTy::HostName("TODOx5288".into()), tsnow), 6 => { if hi.payload_len() != 8 { warn!("protocol error: search result is expected with fixed payload size 8"); @@ -748,40 +755,36 @@ impl CaMsg { return Err(Error::CaProtoVersionMissing); } let proto_version = u16::from_be_bytes(payload[0..2].try_into().map_err(|_| Error::BadSlice)?); - CaMsg { - ty: CaMsgTy::SearchRes(SearchRes { - tcp_port: hi.data_type, - addr: hi.param1, - id: hi.param2, - proto_version, - }), - } + let ty = CaMsgTy::SearchRes(SearchRes { + tcp_port: hi.data_type, + addr: hi.param1, + id: hi.param2, + proto_version, + }); + CaMsg::from_ty_ts(ty, tsnow) } 18 => { - CaMsg { - // TODO use different structs for request and response: - ty: CaMsgTy::CreateChanRes(CreateChanRes { - data_type: hi.data_type, - data_count: hi.data_count, - cid: hi.param1, - sid: hi.param2, - }), - } + let ty = CaMsgTy::CreateChanRes(CreateChanRes { + data_type: hi.data_type, + // TODO what am I supposed to use here in case of extended header? + data_count: hi.data_count() as _, + cid: hi.param1, + sid: hi.param2, + }); + CaMsg::from_ty_ts(ty, tsnow) } 22 => { - CaMsg { - // TODO use different structs for request and response: - ty: CaMsgTy::AccessRightsRes(AccessRightsRes { - cid: hi.param1, - rights: hi.param2, - }), - } + // TODO use different structs for request and response: + let ty = CaMsgTy::AccessRightsRes(AccessRightsRes { + cid: hi.param1, + rights: hi.param2, + }); + CaMsg::from_ty_ts(ty, tsnow) } 26 => { - CaMsg { - // TODO use different structs for request and response: - ty: CaMsgTy::CreateChanFail(CreateChanFail { cid: hi.param1 }), - } + // TODO use different structs for request and response: + let ty = CaMsgTy::CreateChanFail(CreateChanFail { cid: hi.param1 }); + CaMsg::from_ty_ts(ty, tsnow) } 1 => { use netpod::Shape; @@ -797,14 +800,8 @@ impl CaMsg { let ca_severity = u16::from_be_bytes(payload[2..4].try_into().map_err(|_| Error::BadSlice)?); let ca_secs = u32::from_be_bytes(payload[4..8].try_into().map_err(|_| Error::BadSlice)?); let ca_nanos = u32::from_be_bytes(payload[8..12].try_into().map_err(|_| Error::BadSlice)?); - if hi.data_count == 0 || hi.data_count > 1024 * 32 { - let msg = CaMsg { - ty: CaMsgTy::IssueDataCount(hi.clone(), ca_status, ca_severity, ca_secs, ca_nanos), - }; - return Ok(msg); - } - let ca_sh = Shape::from_ca_count(hi.data_count).map_err(|_| { - error!("BadCaCount hi.data_count {}", hi.data_count); + let ca_sh = Shape::from_ca_count(hi.data_count() as _).map_err(|_| { + error!("BadCaCount {hi:?}"); Error::BadCaCount })?; let meta_padding = match ca_dbr_ty.meta { @@ -841,21 +838,20 @@ impl CaMsg { }; let ts = SEC * (ca_secs as u64 + EPICS_EPOCH_OFFSET) + ca_nanos as u64; let value = CaEventValue { - ts: NonZeroU64::new(ts), - status: NonZeroU16::new(ca_status), - severity: NonZeroU16::new(ca_severity), + ts, + status: ca_status, + severity: ca_severity, data: value, }; let d = EventAddRes { data_type: hi.data_type, - data_count: hi.data_count, + data_count: hi.data_count() as _, status: hi.param1, subid: hi.param2, value, }; - CaMsg { - ty: CaMsgTy::EventAddRes(d), - } + let ty = CaMsgTy::EventAddRes(d); + CaMsg::from_ty_ts(ty, tsnow) } 15 => { if payload.len() == 8 { @@ -872,17 +868,16 @@ impl CaMsg { &payload[..payload.len().min(12)], ); } - CaMsg { - // TODO use different structs for request and response: - ty: CaMsgTy::ReadNotifyRes(ReadNotifyRes { - data_type: hi.data_type, - data_count: hi.data_count, - sid: hi.param1, - ioid: hi.param2, - }), - } + // TODO use different structs for request and response: + let ty = CaMsgTy::ReadNotifyRes(ReadNotifyRes { + data_type: hi.data_type, + data_count: hi.data_count() as _, + sid: hi.param1, + ioid: hi.param2, + }); + CaMsg::from_ty_ts(ty, tsnow) } - 0x17 => CaMsg { ty: CaMsgTy::Echo }, + 0x17 => CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow), x => return Err(Error::CaCommandNotSupported(x)), }; Ok(msg) @@ -1065,6 +1060,7 @@ impl CaProto { fn loop_body(mut self: Pin<&mut Self>, cx: &mut Context) -> Result>, Error> { use Poll::*; + let tsnow = Instant::now(); let output_res_1: Option> = 'll1: loop { if self.out.len() == 0 { break None; @@ -1152,7 +1148,7 @@ impl CaProto { Ok(None) } }?; - let parse_res: Option = self.parse_item()?; + let parse_res: Option = self.parse_item(tsnow)?; match (output_res_2, read_res, parse_res) { (_, _, Some(item)) => Ok(Some(Ready(item))), (Some(Pending), _, _) => Ok(Some(Pending)), @@ -1165,7 +1161,7 @@ impl CaProto { } } - fn parse_item(&mut self) -> Result, Error> { + fn parse_item(&mut self, tsnow: Instant) -> Result, Error> { loop { if self.buf.len() < self.state.need_min() { break Ok(None); @@ -1178,17 +1174,12 @@ impl CaProto { if hi.payload_size == 0xffff { if hi.data_count != 0 { warn!("protocol error: {hi:?}"); - return Err(Error::UnexpectedHeader); + return Err(Error::ExtendedHeaderBadCount); } } if hi.payload_size == 0xffff { } else if hi.payload_size > 16368 { - if self.logged_proto_error_for_cid.contains_key(&sid) { - // TODO emit this as Item so that downstream can translate SID to name. - self.logged_proto_error_for_cid.insert(sid, true); - } - warn!("protocol error: {hi:?}"); - return Err(Error::UnexpectedHeader); + self.stats.payload_std_too_large().inc(); } } if hi.cmdid > 26 { @@ -1199,9 +1190,11 @@ impl CaProto { self.state = CaState::ExtHead(hi); Ok(None) } else { + // For extended messages, ingest on receive of extended header + self.stats.payload_size().ingest(hi.payload_len() as u32); if hi.payload_size == 0 { self.state = CaState::StdHead; - let msg = CaMsg::from_proto_infos(&hi, &[], self.array_truncate)?; + let msg = CaMsg::from_proto_infos(&hi, &[], tsnow, self.array_truncate)?; Ok(Some(CaItem::Msg(msg))) } else { self.state = CaState::Payload(hi); @@ -1212,8 +1205,9 @@ impl CaProto { CaState::ExtHead(hi) => { let payload_size = self.buf.read_u32_be()?; let data_count = self.buf.read_u32_be()?; + self.stats.payload_size().ingest(hi.payload_len() as u32); if payload_size > 1024 * 1024 * 32 { - self.stats.payload_very_large().inc(); + self.stats.payload_ext_very_large().inc(); if false { warn!( "ExtHead data_type {} payload_size {payload_size} data_count {data_count}", @@ -1227,9 +1221,6 @@ impl CaProto { "ExtHead data_type {} payload_size {payload_size} data_count {data_count}", hi.data_type ); - // let msg = CaMsg::from_proto_infos(hi, &[], self.array_truncate)?; - // self.state = CaState::StdHead; - // Ok(Some(CaItem::Msg(msg))) } let hi = hi.clone().with_ext(payload_size, data_count); self.state = CaState::Payload(hi); @@ -1237,7 +1228,11 @@ impl CaProto { } CaState::Payload(hi) => { let g = self.buf.read_bytes(hi.payload_len())?; - let msg = CaMsg::from_proto_infos(hi, g, self.array_truncate)?; + let msg = CaMsg::from_proto_infos(hi, g, tsnow, self.array_truncate)?; + // data-count is only reasonable for event messages + if let CaMsgTy::EventAddRes(e) = &msg.ty { + self.stats.data_count().ingest(hi.data_count() as u32); + } self.state = CaState::StdHead; Ok(Some(CaItem::Msg(msg))) } diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index fbc8eba..42f5b3a 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -191,31 +191,11 @@ async fn worker( let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); epoch.as_secs() * SEC + epoch.subsec_nanos() as u64 }; - let dt = (tsnow / 1000000) as i64 - (item_ts_local / 1000000) as i64; - if dt < 0 { - stats.item_latency_neg().inc(); - } else if dt <= 25 { - stats.item_latency_025ms().inc(); - } else if dt <= 50 { - stats.item_latency_050ms().inc(); - } else if dt <= 100 { - stats.item_latency_100ms().inc(); - } else if dt <= 200 { - stats.item_latency_200ms().inc(); - } else if dt <= 400 { - stats.item_latency_400ms().inc(); - } else if dt <= 800 { - stats.item_latency_800ms().inc(); - } else if dt <= 1600 { - stats.item_latency_1600ms().inc(); - } else if dt <= 3200 { - stats.item_latency_3200ms().inc(); - } else { - stats.item_latency_large().inc(); - } + let dt = ((tsnow / 1000000) as u32).saturating_sub((item_ts_local / 1000000) as u32); + stats.item_lat_net_worker().ingest(dt); let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire); let do_insert = i1 % 1000 < insert_frac; - match insert_item(item, &ttls, &data_store, &stats, do_insert).await { + match insert_item(item, &ttls, &data_store, do_insert, &stats).await { Ok(_) => { stats.inserted_values().inc(); let tsnow = { @@ -223,18 +203,8 @@ async fn worker( let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); epoch.as_secs() * SEC + epoch.subsec_nanos() as u64 }; - let dt = (tsnow / 1000000) as i64 - (item_ts_local / 1000000) as i64; - if dt <= 50 { - stats.item_commit_latency_0050ms().inc(); - } else if dt <= 200 { - stats.item_commit_latency_0200ms().inc(); - } else if dt <= 800 { - stats.item_commit_latency_0800ms().inc(); - } else if dt <= 3200 { - stats.item_commit_latency_3200ms().inc(); - } else { - stats.item_commit_latency_large().inc(); - } + let dt = ((tsnow / 1000000) as u32).saturating_sub((item_ts_local / 1000000) as u32); + stats.item_lat_net_store().ingest(dt); backoff = backoff_0; } Err(e) => { @@ -367,16 +337,21 @@ async fn worker_streamed( let mut stream = item_inp .map(|item| { stats.item_recv.inc(); + let tsnow_u64 = { + let ts = SystemTime::now(); + let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); + epoch.as_secs() * SEC + epoch.subsec_nanos() as u64 + }; match item { - QueryItem::Insert(item) => prepare_query_insert_futs(item, &ttls, &data_store, &stats), + QueryItem::Insert(item) => prepare_query_insert_futs(item, &ttls, &data_store, &stats, tsnow_u64), QueryItem::ConnectionStatus(item) => { stats.inserted_connection_status().inc(); - let fut = insert_connection_status_fut(item, &ttls, &data_store); + let fut = insert_connection_status_fut(item, &ttls, &data_store, stats.clone()); smallvec![fut] } QueryItem::ChannelStatus(item) => { stats.inserted_channel_status().inc(); - insert_channel_status_fut(item, &ttls, &data_store) + insert_channel_status_fut(item, &ttls, &data_store, stats.clone()) } _ => { // TODO @@ -420,23 +395,29 @@ fn prepare_query_insert_futs( item: InsertItem, ttls: &Ttls, data_store: &Arc, - stats: &InsertWorkerStats, + stats: &Arc, + tsnow_u64: u64, ) -> SmallVec<[InsertFut; 4]> { stats.inserts_value().inc(); + let item_ts_local = item.ts_local; + let dt = ((tsnow_u64 / 1000000) as u32).saturating_sub((item_ts_local / 1000000) as u32); + stats.item_lat_net_worker().ingest(dt); let msp_bump = item.msp_bump; let series = item.series.clone(); let ts_msp = item.ts_msp; let do_insert = true; - let fut = insert_item_fut(item, &ttls, &data_store, do_insert); + let fut = insert_item_fut(item, &ttls, &data_store, do_insert, stats); let mut futs = smallvec![fut]; if msp_bump { stats.inserts_msp().inc(); let fut = insert_msp_fut( series, ts_msp, + item_ts_local, ttls, data_store.scy.clone(), data_store.qu_insert_ts_msp.clone(), + stats.clone(), ); futs.push(fut); } diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index dccefc0..b4dd897 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -269,9 +269,11 @@ struct InsParCom { series: u64, ts_msp: u64, ts_lsp: u64, + ts_local: u64, pulse: u64, ttl: u32, do_insert: bool, + stats: Arc, } fn insert_scalar_gen_fut(par: InsParCom, val: ST, qu: Arc, scy: Arc) -> InsertFut @@ -286,7 +288,7 @@ where val, par.ttl as i32, ); - InsertFut::new(scy, qu, params) + InsertFut::new(scy, qu, params, par.ts_local, par.stats) } fn insert_array_gen_fut(par: InsParCom, val: ST, qu: Arc, scy: Arc) -> InsertFut @@ -301,7 +303,7 @@ where val, par.ttl as i32, ); - InsertFut::new(scy, qu, params) + InsertFut::new(scy, qu, params, par.ts_local, par.stats) } #[pin_project::pin_project] @@ -320,10 +322,24 @@ impl InsertFut { scy: Arc, qu: Arc, params: V, + // timestamp when we first encountered the data to-be inserted, for metrics + tsnet: u64, + stats: Arc, ) -> Self { let scy_ref = unsafe { NonNull::from(scy.as_ref()).as_ref() }; let qu_ref = unsafe { NonNull::from(qu.as_ref()).as_ref() }; let fut = scy_ref.execute_paged(qu_ref, params, None); + let fut = fut.map(move |x| { + let item_ts_local = tsnet; + let tsnow_u64 = { + let ts = SystemTime::now(); + let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); + epoch.as_secs() * SEC + epoch.subsec_nanos() as u64 + }; + let dt = ((tsnow_u64 / 1000000) as u32).saturating_sub((item_ts_local / 1000000) as u32); + stats.item_lat_net_store().ingest(dt); + x + }); let fut = taskrun::tokio::task::unconstrained(fut); let fut = Box::pin(fut); // let fut = StackFuture::from(fut); @@ -416,8 +432,8 @@ pub async fn insert_item( item: InsertItem, ttls: &Ttls, data_store: &DataStore, - stats: &InsertWorkerStats, do_insert: bool, + stats: &Arc, ) -> Result<(), Error> { if item.msp_bump { let params = (item.series.id() as i64, item.ts_msp as i64, ttls.index.as_secs() as i32); @@ -446,9 +462,11 @@ pub async fn insert_item( series: item.series.id(), ts_msp: item.ts_msp, ts_lsp: item.ts_lsp, + ts_local: item.ts_local, pulse: item.pulse, ttl: ttls.d0.as_secs() as _, do_insert, + stats: stats.clone(), }; use ScalarValue::*; match val { @@ -467,9 +485,11 @@ pub async fn insert_item( series: item.series.id(), ts_msp: item.ts_msp, ts_lsp: item.ts_lsp, + ts_local: item.ts_local, pulse: item.pulse, ttl: ttls.d1.as_secs() as _, do_insert, + stats: stats.clone(), }; use ArrayValue::*; match val { @@ -489,15 +509,24 @@ pub async fn insert_item( pub fn insert_msp_fut( series: SeriesId, ts_msp: u64, + // for stats, the timestamp when we received that data + tsnet: u64, ttls: &Ttls, scy: Arc, qu: Arc, + stats: Arc, ) -> InsertFut { let params = (series.id() as i64, ts_msp as i64, ttls.index.as_secs() as i32); - InsertFut::new(scy, qu, params) + InsertFut::new(scy, qu, params, tsnet, stats) } -pub fn insert_item_fut(item: InsertItem, ttls: &Ttls, data_store: &DataStore, do_insert: bool) -> InsertFut { +pub fn insert_item_fut( + item: InsertItem, + ttls: &Ttls, + data_store: &DataStore, + do_insert: bool, + stats: &Arc, +) -> InsertFut { let scy = data_store.scy.clone(); use DataValue::*; match item.val { @@ -506,9 +535,11 @@ pub fn insert_item_fut(item: InsertItem, ttls: &Ttls, data_store: &DataStore, do series: item.series.id(), ts_msp: item.ts_msp, ts_lsp: item.ts_lsp, + ts_local: item.ts_local, pulse: item.pulse, ttl: ttls.d0.as_secs() as _, do_insert, + stats: stats.clone(), }; use ScalarValue::*; match val { @@ -527,9 +558,11 @@ pub fn insert_item_fut(item: InsertItem, ttls: &Ttls, data_store: &DataStore, do series: item.series.id(), ts_msp: item.ts_msp, ts_lsp: item.ts_lsp, + ts_local: item.ts_local, pulse: item.pulse, ttl: ttls.d1.as_secs() as _, do_insert, + stats: stats.clone(), }; use ArrayValue::*; match val { @@ -544,13 +577,20 @@ pub fn insert_item_fut(item: InsertItem, ttls: &Ttls, data_store: &DataStore, do } } -pub fn insert_connection_status_fut(item: ConnectionStatusItem, ttls: &Ttls, data_store: &DataStore) -> InsertFut { +pub fn insert_connection_status_fut( + item: ConnectionStatusItem, + ttls: &Ttls, + data_store: &DataStore, + stats: Arc, +) -> InsertFut { let tsunix = item.ts.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO); let secs = tsunix.as_secs() * SEC; let nanos = tsunix.subsec_nanos() as u64; let ts = secs + nanos; let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV; let ts_lsp = ts - ts_msp; + // TODO is that the good tsnet to use? + let tsnet = ts; let kind = item.status.to_kind(); let addr = format!("{}", item.addr); let params = ( @@ -564,6 +604,8 @@ pub fn insert_connection_status_fut(item: ConnectionStatusItem, ttls: &Ttls, dat data_store.scy.clone(), data_store.qu_insert_connection_status.clone(), params, + tsnet, + stats, ) } @@ -571,6 +613,7 @@ pub fn insert_channel_status_fut( item: ChannelStatusItem, ttls: &Ttls, data_store: &DataStore, + stats: Arc, ) -> SmallVec<[InsertFut; 4]> { let tsunix = item.ts.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO); let secs = tsunix.as_secs() * SEC; @@ -578,6 +621,7 @@ pub fn insert_channel_status_fut( let ts = secs + nanos; let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV; let ts_lsp = ts - ts_msp; + let tsnet = ts; let kind = item.status.to_kind(); let cssid = item.cssid.id(); let params = ( @@ -591,6 +635,8 @@ pub fn insert_channel_status_fut( data_store.scy.clone(), data_store.qu_insert_channel_status.clone(), params, + tsnet, + stats.clone(), ); let params = ( ts_msp as i64, @@ -603,6 +649,8 @@ pub fn insert_channel_status_fut( data_store.scy.clone(), data_store.qu_insert_channel_status_by_ts_msp.clone(), params, + tsnet, + stats, ); smallvec![fut1, fut2] } diff --git a/stats/Cargo.toml b/stats/Cargo.toml index 25313f3..5fc571f 100644 --- a/stats/Cargo.toml +++ b/stats/Cargo.toml @@ -11,5 +11,3 @@ path = "src/stats.rs" stats_types = { path = "../stats_types" } stats_proc = { path = "../stats_proc" } log = { path = "../log" } -err = { path = "../../daqbuffer/crates/err" } -libc = "0.2" diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 098dfd4..36bc7d0 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -1,6 +1,7 @@ use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; -use std::time::{Duration, Instant}; +use std::time::Duration; +use std::time::Instant; const US: u64 = 1000; const MS: u64 = US * 1000; @@ -207,7 +208,35 @@ impl IntervalEma { } } -// #[cfg(DISABLED)] +pub struct XorShift32 { + state: u32, +} + +impl XorShift32 { + pub fn new(state: u32) -> Self { + Self { state } + } + + pub fn new_from_time() -> Self { + use std::time::SystemTime; + Self::new( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .subsec_micros(), + ) + } + + pub fn next(&mut self) -> u32 { + let mut x = self.state; + x ^= x << 13; + x ^= x >> 17; + x ^= x << 5; + self.state = x; + x + } +} + stats_proc::stats_struct!(( stats_struct( name(CaProtoStats), @@ -216,9 +245,11 @@ stats_proc::stats_struct!(( tcp_recv_count, tcp_recv_bytes, protocol_issue, - payload_very_large, + payload_std_too_large, payload_ext_but_small, + payload_ext_very_large, ), + histolog2s(payload_size, data_count,), ), stats_struct( name(CaConnSetStats), @@ -301,24 +332,10 @@ stats_proc::stats_struct!(( inserts_msp_grid, inserts_value, ratelimit_drop, - item_latency_neg, - item_latency_025ms, - item_latency_050ms, - item_latency_100ms, - item_latency_200ms, - item_latency_400ms, - item_latency_800ms, - item_latency_1600ms, - item_latency_3200ms, - item_latency_large, - item_commit_latency_0050ms, - item_commit_latency_0200ms, - item_commit_latency_0800ms, - item_commit_latency_3200ms, - item_commit_latency_large, worker_start, worker_finish, - ) + ), + histolog2s(item_lat_net_worker, item_lat_net_store,), ), stats_struct( name(IocFinderStats), @@ -346,7 +363,6 @@ stats_proc::stats_struct!(( ), )); -// #[cfg(DISABLED)] stats_proc::stats_struct!(( stats_struct( name(CaConnStats), @@ -399,19 +415,8 @@ stats_proc::stats_struct!(( channel_alive_count, channel_not_alive_count, channel_series_lookup_already_pending, - ca_ts_off_1, - ca_ts_off_2, - ca_ts_off_3, - ca_ts_off_4, ping_start, ping_no_proto, - pong_recv_010ms, - pong_recv_025ms, - pong_recv_050ms, - pong_recv_100ms, - pong_recv_200ms, - pong_recv_400ms, - pong_recv_slow, pong_timeout, ca_conn_poll_fn_begin, ca_conn_poll_loop_begin, @@ -419,13 +424,13 @@ stats_proc::stats_struct!(( ca_conn_poll_pending, ca_conn_poll_no_progress_no_pending, ), - values(inter_ivl_ema) + values(inter_ivl_ema), + histolog2s(pong_recv_lat, ca_ts_off,), ), agg(name(CaConnStatsAgg), parent(CaConnStats)), diff(name(CaConnStatsAggDiff), input(CaConnStatsAgg)), )); -// #[cfg(DISABLED)] stats_proc::stats_struct!(( stats_struct( name(DaemonStats), diff --git a/stats_proc/src/stats_proc.rs b/stats_proc/src/stats_proc.rs index b61add6..b88a775 100644 --- a/stats_proc/src/stats_proc.rs +++ b/stats_proc/src/stats_proc.rs @@ -17,6 +17,7 @@ struct StatsStructDef { prefix: Option, counters: Vec, values: Vec, + histolog2s: Vec, } #[derive(Debug)] @@ -49,7 +50,11 @@ fn stats_struct_impl(st: &StatsStructDef) -> String { .values .iter() .map(|x| format!("{:12}{}: stats_types::Value::new()", "", x.to_string())); - let inits: Vec<_> = inits1.into_iter().chain(inits2).collect(); + let init_histolog2s = st + .histolog2s + .iter() + .map(|x| format!("{:12}{}: stats_types::HistoLog2::new(0)", "", x.to_string())); + let inits: Vec<_> = inits1.into_iter().chain(inits2).chain(init_histolog2s).collect(); let inits = inits.join(",\n"); let incers: String = st .counters @@ -76,7 +81,22 @@ fn stats_struct_impl(st: &StatsStructDef) -> String { write!( buf, " - pub fn {nn}(&self) -> &stats_types::Value {{ + pub fn {nn}(&self) -> &stats_types::Value {{ + &self.{nn} + }} + " + ) + .unwrap(); + } + buf + }; + let histolog2s = { + let mut buf = String::new(); + for nn in &st.histolog2s { + write!( + buf, + " + pub fn {nn}(&self) -> &stats_types::HistoLog2 {{ &self.{nn} }} " @@ -111,6 +131,17 @@ fn stats_struct_impl(st: &StatsStructDef) -> String { pre, n, n )); } + for x in &st.histolog2s { + let n = x.to_string(); + let fullname = if let Some(x) = &st.prefix { + format!("daqingest_{x}_{n}") + } else { + format!("daqingest_{n}") + }; + buf.push_str(&format!("let fullname = \"{fullname}\";\n")); + buf.push_str(&format!("let s = self.{n}.to_prometheus(fullname);\n")); + buf.push_str(&format!("ret.push_str(&s);\n")); + } format!( " pub fn prometheus(&self) -> String {{ @@ -158,6 +189,8 @@ impl {name} {{ {values} + {histolog2s} + {fn_prometheus} {fn_snapshot} @@ -185,6 +218,11 @@ fn stats_struct_decl_impl(st: &StatsStructDef) -> String { .iter() .map(|x| format!("{:4}pub {}: stats_types::Value,\n", "", x.to_string())) .fold(String::new(), extend_str); + let histolog2s_decl = st + .histolog2s + .iter() + .map(|x| format!("{:4}pub {}: stats_types::HistoLog2,\n", "", x.to_string())) + .fold(String::new(), extend_str); let structt = format!( " pub struct {name} {{ @@ -192,6 +230,7 @@ pub struct {name} {{ dropped: stats_types::Value, {counters_decl} {values_decl} +{histolog2s_decl} }} " @@ -407,7 +446,8 @@ fn idents_from_exprs(inp: PunctExpr) -> syn::Result> { fn func_name_from_expr(inp: syn::Expr) -> syn::Result { use syn::spanned::Spanned; - use syn::{Error, Expr}; + use syn::Error; + use syn::Expr; match inp { Expr::Path(k) => { if k.path.segments.len() != 1 { @@ -425,7 +465,8 @@ fn func_name_from_expr(inp: syn::Expr) -> syn::Result { impl FuncCallWithArgs { fn from_expr(inp: syn::Expr) -> Result { use syn::spanned::Spanned; - use syn::{Error, Expr}; + use syn::Error; + use syn::Expr; let span_all = inp.span(); match inp { Expr::Call(k) => { @@ -448,6 +489,7 @@ impl StatsStructDef { prefix: syn::parse_str("__empty").unwrap(), counters: Vec::new(), values: Vec::new(), + histolog2s: Vec::new(), } } @@ -456,6 +498,7 @@ impl StatsStructDef { let mut prefix = None; let mut counters = None; let mut values = None; + let mut histolog2s = None; for k in inp { let fa = FuncCallWithArgs::from_expr(k)?; if fa.name == "name" { @@ -470,6 +513,9 @@ impl StatsStructDef { } else if fa.name == "values" { let idents = idents_from_exprs(fa.args)?; values = Some(idents); + } else if fa.name == "histolog2s" { + let idents = idents_from_exprs(fa.args)?; + histolog2s = Some(idents); } else { panic!("fa.name: {:?}", fa.name); } @@ -479,6 +525,7 @@ impl StatsStructDef { prefix, counters: counters.unwrap_or(Vec::new()), values: values.unwrap_or(Vec::new()), + histolog2s: histolog2s.unwrap_or(Vec::new()), }; Ok(ret) } @@ -587,6 +634,7 @@ pub fn stats_struct(ts: TokenStream) -> TokenStream { prefix: None, counters: j.stats.counters.clone(), values: Vec::new(), + histolog2s: Vec::new(), }; def.stats_struct_defs.push(h); } @@ -620,6 +668,8 @@ pub fn stats_struct(ts: TokenStream) -> TokenStream { counters: j.stats.counters.clone(), // TODO compute values values: Vec::new(), + // TODO not supported yet + histolog2s: Vec::new(), }; let s = diff_decl_impl(k, &p); code.push_str(&s); diff --git a/stats_types/src/stats_types.rs b/stats_types/src/stats_types.rs index f6d738b..b524244 100644 --- a/stats_types/src/stats_types.rs +++ b/stats_types/src/stats_types.rs @@ -65,6 +65,14 @@ impl Value { self.0.store(x, Release); } + pub fn inc(&self) { + self.0.fetch_add(1, AcqRel); + } + + pub fn dec(&self) { + self.0.fetch_sub(1, AcqRel); + } + pub fn load(&self) -> u64 { self.0.load(Acquire) } @@ -108,3 +116,106 @@ struct StatsAReader { } impl StatsAReader {} + +pub struct HistoLog2 { + histo: [AtomicU64; 16], + sub: u16, +} + +macro_rules! rep16 { + ([$x:expr]) => { + [$x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x] + }; +} + +impl HistoLog2 { + pub fn new(sub: u16) -> Self { + Self { + histo: rep16!([AtomicU64::new(0)]), + sub, + } + } + + #[inline] + pub fn ingest(&self, mut v: u32) { + v >>= self.sub; + let mut po = 0; + while v != 0 && po < self.histo.len() - 1 { + v >>= 1; + po += 1; + } + self.histo[po].fetch_add(1, AcqRel); + } + + pub fn to_prometheus(&self, name: &str) -> String { + let mut ret = String::with_capacity(2048); + ret.push_str("# HELP "); + ret.push_str(name); + ret.push_str(" help-text-missing\n"); + ret.push_str("# TYPE "); + ret.push_str(name); + ret.push_str(" histogram\n"); + let mut cnt = 0; + let mut sum = 0; + for (i, a) in self.histo.iter().enumerate() { + use std::ops::Sub; + let i = i as u32; + let le = 2u32.pow(i).sub(1); + let v = a.load(Acquire); + cnt += v; + sum += v * le as u64; + ret.push_str(name); + ret.push_str("_bucket{le=\""); + ret.push_str(&le.to_string()); + ret.push_str("\"} "); + ret.push_str(&cnt.to_string()); + ret.push_str("\n"); + } + ret.push_str(name); + ret.push_str("_bucket{le=\"+Inf\"} "); + ret.push_str(&cnt.to_string()); + ret.push_str("\n"); + ret.push_str(name); + ret.push_str("_count "); + ret.push_str(&cnt.to_string()); + ret.push_str("\n"); + ret.push_str(name); + ret.push_str("_sum "); + ret.push_str(&sum.to_string()); + ret.push_str("\n"); + ret + } +} + +#[test] +fn histo_00() { + let histo = HistoLog2::new(0); + // histo.ingest(0); + // histo.ingest(1); + // histo.ingest(2); + histo.ingest(3); + histo.ingest(4); + let exp = r##"# HELP latA help-text-missing +# TYPE latA histogram +latA_bucket{le="0"} 0 +latA_bucket{le="1"} 0 +latA_bucket{le="3"} 1 +latA_bucket{le="7"} 2 +latA_bucket{le="15"} 2 +latA_bucket{le="31"} 2 +latA_bucket{le="63"} 2 +latA_bucket{le="127"} 2 +latA_bucket{le="255"} 2 +latA_bucket{le="511"} 2 +latA_bucket{le="1023"} 2 +latA_bucket{le="2047"} 2 +latA_bucket{le="4095"} 2 +latA_bucket{le="8191"} 2 +latA_bucket{le="16383"} 2 +latA_bucket{le="32767"} 2 +latA_bucket{le="+Inf"} 2 +latA_count 2 +latA_sum 10 +"##; + assert_eq!(histo.to_prometheus("latA"), exp); +}