Support larger CA data count, use histogram in metrics
This commit is contained in:
+41
-55
@@ -42,6 +42,7 @@ use series::SeriesId;
|
||||
use stats::CaConnStats;
|
||||
use stats::CaProtoStats;
|
||||
use stats::IntervalEma;
|
||||
use stats::XorShift32;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::VecDeque;
|
||||
use std::net::SocketAddrV4;
|
||||
@@ -59,6 +60,7 @@ use taskrun::tokio;
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
const CONNECTING_TIMEOUT: Duration = Duration::from_millis(6000);
|
||||
const IOC_PING_IVL: Duration = Duration::from_millis(80000);
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace2 {
|
||||
@@ -171,7 +173,7 @@ struct CreatedState {
|
||||
#[allow(unused)]
|
||||
sid: u32,
|
||||
data_type: u16,
|
||||
data_count: u16,
|
||||
data_count: u32,
|
||||
scalar_type: ScalarType,
|
||||
shape: Shape,
|
||||
#[allow(unused)]
|
||||
@@ -501,6 +503,7 @@ pub struct CaConn {
|
||||
inserts_counter: u64,
|
||||
extra_inserts_conf: ExtraInsertsConf,
|
||||
ioc_ping_last: Instant,
|
||||
ioc_ping_next: Instant,
|
||||
ioc_ping_start: Option<Instant>,
|
||||
storage_insert_sender: SenderPolling<QueryItem>,
|
||||
ca_conn_event_out_queue: VecDeque<CaConnEvent>,
|
||||
@@ -510,6 +513,7 @@ pub struct CaConn {
|
||||
thr_msg_poll: ThrottleTrace,
|
||||
ca_proto_stats: Arc<CaProtoStats>,
|
||||
weird_count: usize,
|
||||
rng: XorShift32,
|
||||
}
|
||||
|
||||
#[cfg(DISABLED)]
|
||||
@@ -531,6 +535,7 @@ impl CaConn {
|
||||
ca_proto_stats: Arc<CaProtoStats>,
|
||||
) -> Self {
|
||||
let (cq_tx, cq_rx) = async_channel::bounded(32);
|
||||
let mut rng = XorShift32::new_from_time();
|
||||
Self {
|
||||
opts,
|
||||
backend,
|
||||
@@ -556,6 +561,7 @@ impl CaConn {
|
||||
inserts_counter: 0,
|
||||
extra_inserts_conf: ExtraInsertsConf::new(),
|
||||
ioc_ping_last: Instant::now(),
|
||||
ioc_ping_next: Instant::now() + Self::ioc_ping_ivl_rng(&mut rng),
|
||||
ioc_ping_start: None,
|
||||
storage_insert_sender: SenderPolling::new(storage_insert_tx),
|
||||
ca_conn_event_out_queue: VecDeque::new(),
|
||||
@@ -565,9 +571,14 @@ impl CaConn {
|
||||
thr_msg_poll: ThrottleTrace::new(Duration::from_millis(10000)),
|
||||
ca_proto_stats,
|
||||
weird_count: 0,
|
||||
rng,
|
||||
}
|
||||
}
|
||||
|
||||
fn ioc_ping_ivl_rng(rng: &mut XorShift32) -> Duration {
|
||||
IOC_PING_IVL * 100 / (70 + (rng.next() % 60))
|
||||
}
|
||||
|
||||
fn new_self_ticker() -> Pin<Box<tokio::time::Sleep>> {
|
||||
Box::pin(tokio::time::sleep(Duration::from_millis(500)))
|
||||
}
|
||||
@@ -965,12 +976,11 @@ impl CaConn {
|
||||
self.trigger_shutdown(ChannelStatusClosedReason::IocTimeout);
|
||||
}
|
||||
} else {
|
||||
// TODO randomize delay a bit
|
||||
if self.ioc_ping_last.elapsed() > Duration::from_millis(120000) {
|
||||
if self.ioc_ping_next < tsnow {
|
||||
if let Some(proto) = &mut self.proto {
|
||||
self.stats.ping_start().inc();
|
||||
self.ioc_ping_start = Some(Instant::now());
|
||||
let msg = CaMsg { ty: CaMsgTy::Echo };
|
||||
let msg = CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow);
|
||||
proto.push_out(msg);
|
||||
} else {
|
||||
self.stats.ping_no_proto().inc();
|
||||
@@ -1046,7 +1056,7 @@ impl CaConn {
|
||||
cid: Cid,
|
||||
sid: u32,
|
||||
data_type: u16,
|
||||
data_count: u16,
|
||||
data_count: u32,
|
||||
series: SeriesId,
|
||||
) -> Result<(), Error> {
|
||||
let tsnow = Instant::now();
|
||||
@@ -1069,14 +1079,13 @@ impl CaConn {
|
||||
self.cid_by_subid.insert(subid, cid);
|
||||
// TODO convert first to CaDbrType, set to `Time`, then convert to ix:
|
||||
let data_type_asked = data_type + 14;
|
||||
let msg = CaMsg {
|
||||
ty: CaMsgTy::EventAdd(EventAdd {
|
||||
sid,
|
||||
data_type: data_type_asked,
|
||||
data_count,
|
||||
subid,
|
||||
}),
|
||||
};
|
||||
let ty = CaMsgTy::EventAdd(EventAdd {
|
||||
sid,
|
||||
data_type: data_type_asked,
|
||||
data_count: data_count as _,
|
||||
subid,
|
||||
});
|
||||
let msg = CaMsg::from_ty_ts(ty, tsnow);
|
||||
let proto = self.proto.as_mut().unwrap();
|
||||
proto.push_out(msg);
|
||||
// TODO handle not-found error:
|
||||
@@ -1271,19 +1280,9 @@ impl CaConn {
|
||||
let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
|
||||
epoch.as_secs() * SEC + epoch.subsec_nanos() as u64
|
||||
};
|
||||
let ts = ev.value.ts.map_or(0, |x| x.get());
|
||||
let ts = ev.value.ts;
|
||||
let ts_diff = ts.abs_diff(ts_local);
|
||||
if ts_diff > SEC * 300 {
|
||||
self.stats.ca_ts_off_4.inc();
|
||||
//warn!("Bad time for {name} {ts} vs {ts_local} diff {}", ts_diff / SEC);
|
||||
// TODO mute this channel for some time, discard the event.
|
||||
} else if ts_diff > SEC * 120 {
|
||||
self.stats.ca_ts_off_3.inc();
|
||||
} else if ts_diff > SEC * 20 {
|
||||
self.stats.ca_ts_off_2.inc();
|
||||
} else if ts_diff > SEC * 3 {
|
||||
self.stats.ca_ts_off_1.inc();
|
||||
}
|
||||
self.stats.ca_ts_off().ingest((ts_diff / MS) as u32);
|
||||
if tsnow >= st.insert_next_earliest {
|
||||
//let channel_state = self.channels.get_mut(&cid).unwrap();
|
||||
let item_queue = &mut self.insert_item_queue;
|
||||
@@ -1441,12 +1440,13 @@ impl CaConn {
|
||||
Ok(k) => k.to_string(),
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
let msg = CaMsg {
|
||||
ty: CaMsgTy::CreateChan(CreateChan {
|
||||
let msg = CaMsg::from_ty_ts(
|
||||
CaMsgTy::CreateChan(CreateChan {
|
||||
cid: cid.0,
|
||||
channel: name.into(),
|
||||
}),
|
||||
};
|
||||
Instant::now(),
|
||||
);
|
||||
msgs_tmp.push(msg);
|
||||
// TODO handle not-found error:
|
||||
let ch_s = self.channels.get_mut(&cid).unwrap();
|
||||
@@ -1573,28 +1573,15 @@ impl CaConn {
|
||||
CaMsgTy::Echo => {
|
||||
// let addr = &self.remote_addr_dbg;
|
||||
if let Some(started) = self.ioc_ping_start {
|
||||
let dt = started.elapsed().as_secs_f32() * 1e3;
|
||||
if dt <= 10. {
|
||||
self.stats.pong_recv_010ms().inc();
|
||||
} else if dt <= 25. {
|
||||
self.stats.pong_recv_025ms().inc();
|
||||
} else if dt <= 50. {
|
||||
self.stats.pong_recv_050ms().inc();
|
||||
} else if dt <= 100. {
|
||||
self.stats.pong_recv_100ms().inc();
|
||||
} else if dt <= 200. {
|
||||
self.stats.pong_recv_200ms().inc();
|
||||
} else if dt <= 400. {
|
||||
self.stats.pong_recv_400ms().inc();
|
||||
} else {
|
||||
self.stats.pong_recv_slow().inc();
|
||||
// warn!("Received Echo {dt:10.0}ms {addr:?}");
|
||||
}
|
||||
// Round-trip time of the echo in whole milliseconds.
// `as_millis` already combines the seconds and sub-second parts; adding `as_secs()`
// directly to `subsec_millis()` would mix seconds with milliseconds.
// Clamp to u32::MAX so that an absurdly long duration cannot wrap in the cast.
let dt = started.elapsed();
let dt = dt.as_millis().min(u32::MAX as u128) as u32;
self.stats.pong_recv_lat().ingest(dt);
|
||||
} else {
|
||||
let addr = &self.remote_addr_dbg;
|
||||
warn!("Received Echo even though we didn't asked for it {addr:?}");
|
||||
}
|
||||
self.ioc_ping_last = Instant::now();
|
||||
self.ioc_ping_last = tsnow;
|
||||
self.ioc_ping_next = tsnow + Self::ioc_ping_ivl_rng(&mut self.rng);
|
||||
self.ioc_ping_start = None;
|
||||
}
|
||||
CaMsgTy::CreateChanFail(msg) => {
|
||||
@@ -1622,6 +1609,7 @@ impl CaConn {
|
||||
warn!("CaConn sees: {msg:?}");
|
||||
}
|
||||
}
|
||||
#[cfg(DISABLED)]
|
||||
CaMsgTy::IssueDataCount(hi, stat, sev, secs, nanos) => {
|
||||
let cid = *self.cid_by_subid.get(&hi.param2()).unwrap();
|
||||
let name = self.name_by_cid.get(&cid).unwrap();
|
||||
@@ -1676,7 +1664,7 @@ impl CaConn {
|
||||
Break(Pending)
|
||||
}
|
||||
|
||||
fn handle_conn_state(&mut self, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
|
||||
fn handle_conn_state(&mut self, tsnow: Instant, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
|
||||
use Poll::*;
|
||||
match &mut self.state {
|
||||
CaConnState::Unconnected(_since) => {
|
||||
@@ -1694,6 +1682,7 @@ impl CaConn {
|
||||
Ready(connect_result) => {
|
||||
match connect_result {
|
||||
Ok(Ok(tcp)) => {
|
||||
self.stats.tcp_connected.inc();
|
||||
let addr = addr.clone();
|
||||
self.insert_item_queue
|
||||
.push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
|
||||
@@ -1761,15 +1750,11 @@ impl CaConn {
|
||||
trace4!("Init");
|
||||
let hostname = self.local_epics_hostname.clone();
|
||||
let proto = self.proto.as_mut().unwrap();
|
||||
let msg = CaMsg { ty: CaMsgTy::Version };
|
||||
let msg = CaMsg::from_ty_ts(CaMsgTy::Version, tsnow);
|
||||
proto.push_out(msg);
|
||||
let msg = CaMsg {
|
||||
ty: CaMsgTy::ClientName,
|
||||
};
|
||||
let msg = CaMsg::from_ty_ts(CaMsgTy::ClientName, tsnow);
|
||||
proto.push_out(msg);
|
||||
let msg = CaMsg {
|
||||
ty: CaMsgTy::HostName(hostname),
|
||||
};
|
||||
let msg = CaMsg::from_ty_ts(CaMsgTy::HostName(hostname), tsnow);
|
||||
proto.push_out(msg);
|
||||
self.state = CaConnState::Listen;
|
||||
Ok(Ready(Some(())))
|
||||
@@ -1820,6 +1805,7 @@ impl CaConn {
|
||||
|
||||
fn loop_inner(&mut self, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
|
||||
use Poll::*;
|
||||
let tsnow = Instant::now();
|
||||
let mut have_progress = false;
|
||||
for _ in 0..64 {
|
||||
self.stats.caconn_loop2_count.inc();
|
||||
@@ -1828,7 +1814,7 @@ impl CaConn {
|
||||
} else if self.insert_item_queue.len() >= self.opts.insert_queue_max {
|
||||
break;
|
||||
} else {
|
||||
match self.handle_conn_state(cx) {
|
||||
match self.handle_conn_state(tsnow, cx) {
|
||||
Ok(x) => match x {
|
||||
Ready(Some(())) => {
|
||||
have_progress = true;
|
||||
|
||||
@@ -274,6 +274,7 @@ impl FindIocStream {
|
||||
sock: i32,
|
||||
stats: &IocFinderStats,
|
||||
) -> Poll<Result<(SocketAddrV4, Vec<(SearchId, SocketAddrV4)>), Error>> {
|
||||
let tsnow = Instant::now();
|
||||
let mut saddr_mem = [0u8; std::mem::size_of::<libc::sockaddr>()];
|
||||
let mut saddr_len: libc::socklen_t = saddr_mem.len() as _;
|
||||
let mut buf = vec![0u8; 1024];
|
||||
@@ -343,7 +344,7 @@ impl FindIocStream {
|
||||
error!("incomplete message, missing payload");
|
||||
break;
|
||||
}
|
||||
let msg = CaMsg::from_proto_infos(&hi, nb.data(), 32).map_err(|e| e.to_string())?;
|
||||
let msg = CaMsg::from_proto_infos(&hi, nb.data(), tsnow, 32).map_err(|e| e.to_string())?;
|
||||
nb.adv(hi.payload_len()).map_err(|e| e.to_string())?;
|
||||
msgs.push(msg);
|
||||
accounted += 16 + hi.payload_len();
|
||||
|
||||
+91
-96
@@ -17,6 +17,7 @@ use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
use std::time::Instant;
|
||||
use taskrun::tokio;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::io::AsyncWrite;
|
||||
@@ -44,6 +45,7 @@ pub enum Error {
|
||||
CaCommandNotSupported(u16),
|
||||
ParseAttemptInDoneState,
|
||||
UnexpectedHeader,
|
||||
ExtendedHeaderBadCount,
|
||||
}
|
||||
|
||||
const CA_PROTO_VERSION: u16 = 13;
|
||||
@@ -84,7 +86,7 @@ pub struct CreateChan {
|
||||
#[derive(Debug)]
|
||||
pub struct CreateChanRes {
|
||||
pub data_type: u16,
|
||||
pub data_count: u16,
|
||||
pub data_count: u32,
|
||||
pub cid: u32,
|
||||
pub sid: u32,
|
||||
}
|
||||
@@ -112,7 +114,7 @@ pub struct EventAdd {
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct EventAddRes {
|
||||
pub data_type: u16,
|
||||
pub data_count: u16,
|
||||
pub data_count: u32,
|
||||
pub status: u32,
|
||||
pub subid: u32,
|
||||
pub value: CaEventValue,
|
||||
@@ -129,7 +131,7 @@ pub struct ReadNotify {
|
||||
#[derive(Debug)]
|
||||
pub struct ReadNotifyRes {
|
||||
pub data_type: u16,
|
||||
pub data_count: u16,
|
||||
pub data_count: u32,
|
||||
pub sid: u32,
|
||||
pub ioid: u32,
|
||||
}
|
||||
@@ -342,9 +344,9 @@ impl From<CaDataValue> for scywr::iteminsertqueue::DataValue {
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct CaEventValue {
|
||||
pub ts: Option<NonZeroU64>,
|
||||
pub status: Option<NonZeroU16>,
|
||||
pub severity: Option<NonZeroU16>,
|
||||
pub ts: u64,
|
||||
pub status: u16,
|
||||
pub severity: u16,
|
||||
pub data: CaDataValue,
|
||||
}
|
||||
|
||||
@@ -367,7 +369,6 @@ pub enum CaMsgTy {
|
||||
ReadNotify(ReadNotify),
|
||||
ReadNotifyRes(ReadNotifyRes),
|
||||
Echo,
|
||||
IssueDataCount(HeadInfo, u16, u16, u32, u32),
|
||||
}
|
||||
|
||||
impl CaMsgTy {
|
||||
@@ -391,7 +392,6 @@ impl CaMsgTy {
|
||||
ReadNotify(_) => 0x0f,
|
||||
ReadNotifyRes(_) => 0x0f,
|
||||
Echo => 0x17,
|
||||
IssueDataCount(..) => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -425,7 +425,6 @@ impl CaMsgTy {
|
||||
panic!();
|
||||
}
|
||||
Echo => 0,
|
||||
IssueDataCount(..) => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -452,7 +451,6 @@ impl CaMsgTy {
|
||||
ReadNotify(x) => x.data_type,
|
||||
ReadNotifyRes(x) => x.data_type,
|
||||
Echo => 0,
|
||||
IssueDataCount(..) => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -468,15 +466,23 @@ impl CaMsgTy {
|
||||
Search(_) => CA_PROTO_VERSION,
|
||||
SearchRes(_) => 0,
|
||||
CreateChan(_) => 0,
|
||||
CreateChanRes(x) => x.data_count,
|
||||
CreateChanRes(x) => {
|
||||
panic!();
|
||||
x.data_count as _
|
||||
}
|
||||
CreateChanFail(_) => 0,
|
||||
AccessRightsRes(_) => 0,
|
||||
EventAdd(x) => x.data_count,
|
||||
EventAddRes(x) => x.data_count,
|
||||
EventAddRes(x) => {
|
||||
panic!();
|
||||
x.data_count as _
|
||||
}
|
||||
ReadNotify(x) => x.data_count,
|
||||
ReadNotifyRes(x) => x.data_count,
|
||||
ReadNotifyRes(x) => {
|
||||
panic!();
|
||||
x.data_count as _
|
||||
}
|
||||
Echo => 0,
|
||||
IssueDataCount(..) => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -500,7 +506,6 @@ impl CaMsgTy {
|
||||
ReadNotify(x) => x.sid,
|
||||
ReadNotifyRes(x) => x.sid,
|
||||
Echo => 0,
|
||||
IssueDataCount(..) => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -524,7 +529,6 @@ impl CaMsgTy {
|
||||
ReadNotify(x) => x.ioid,
|
||||
ReadNotifyRes(x) => x.ioid,
|
||||
Echo => 0,
|
||||
IssueDataCount(..) => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -589,7 +593,6 @@ impl CaMsgTy {
|
||||
ReadNotify(_) => {}
|
||||
ReadNotifyRes(_) => {}
|
||||
Echo => {}
|
||||
IssueDataCount(..) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -626,9 +629,14 @@ macro_rules! convert_wave_value {
|
||||
/// A CA protocol message together with a locally assigned timestamp.
#[derive(Debug)]
|
||||
pub struct CaMsg {
|
||||
/// The typed content of the message.
pub ty: CaMsgTy,
|
||||
/// `Instant` supplied by whoever constructs the message; the visible callers pass
/// the current time of the poll/parse iteration that creates or receives it.
pub ts: Instant,
|
||||
}
|
||||
|
||||
impl CaMsg {
|
||||
pub fn from_ty_ts(ty: CaMsgTy, ts: Instant) -> Self {
|
||||
Self { ty, ts }
|
||||
}
|
||||
|
||||
/// Returns the length reported by the message type; forwards to `self.ty.len()`.
fn len(&self) -> usize {
|
||||
self.ty.len()
|
||||
}
|
||||
@@ -706,11 +714,14 @@ impl CaMsg {
|
||||
Ok(val)
|
||||
}
|
||||
|
||||
pub fn from_proto_infos(hi: &HeadInfo, payload: &[u8], array_truncate: usize) -> Result<Self, Error> {
|
||||
pub fn from_proto_infos(
|
||||
hi: &HeadInfo,
|
||||
payload: &[u8],
|
||||
tsnow: Instant,
|
||||
array_truncate: usize,
|
||||
) -> Result<Self, Error> {
|
||||
let msg = match hi.cmdid {
|
||||
0x00 => CaMsg {
|
||||
ty: CaMsgTy::VersionRes(hi.data_count),
|
||||
},
|
||||
0x00 => CaMsg::from_ty_ts(CaMsgTy::VersionRes(hi.data_count), tsnow),
|
||||
0x0b => {
|
||||
let mut s = String::new();
|
||||
s.extend(format!("{:?}", &payload[..payload.len().min(16)]).chars());
|
||||
@@ -723,20 +734,16 @@ impl CaMsg {
|
||||
eid: hi.param2,
|
||||
msg: s,
|
||||
};
|
||||
CaMsg { ty: CaMsgTy::Error(e) }
|
||||
CaMsg::from_ty_ts(CaMsgTy::Error(e), tsnow)
|
||||
}
|
||||
20 => {
|
||||
let name = std::ffi::CString::new(payload)
|
||||
.map(|s| s.into_string().unwrap_or_else(|e| format!("{e:?}")))
|
||||
.unwrap_or_else(|e| format!("{e:?}"));
|
||||
CaMsg {
|
||||
ty: CaMsgTy::ClientNameRes(ClientNameRes { name }),
|
||||
}
|
||||
CaMsg::from_ty_ts(CaMsgTy::ClientNameRes(ClientNameRes { name }), tsnow)
|
||||
}
|
||||
// TODO make response type for host name:
|
||||
21 => CaMsg {
|
||||
ty: CaMsgTy::HostName("TODOx5288".into()),
|
||||
},
|
||||
21 => CaMsg::from_ty_ts(CaMsgTy::HostName("TODOx5288".into()), tsnow),
|
||||
6 => {
|
||||
if hi.payload_len() != 8 {
|
||||
warn!("protocol error: search result is expected with fixed payload size 8");
|
||||
@@ -748,40 +755,36 @@ impl CaMsg {
|
||||
return Err(Error::CaProtoVersionMissing);
|
||||
}
|
||||
let proto_version = u16::from_be_bytes(payload[0..2].try_into().map_err(|_| Error::BadSlice)?);
|
||||
CaMsg {
|
||||
ty: CaMsgTy::SearchRes(SearchRes {
|
||||
tcp_port: hi.data_type,
|
||||
addr: hi.param1,
|
||||
id: hi.param2,
|
||||
proto_version,
|
||||
}),
|
||||
}
|
||||
let ty = CaMsgTy::SearchRes(SearchRes {
|
||||
tcp_port: hi.data_type,
|
||||
addr: hi.param1,
|
||||
id: hi.param2,
|
||||
proto_version,
|
||||
});
|
||||
CaMsg::from_ty_ts(ty, tsnow)
|
||||
}
|
||||
18 => {
|
||||
CaMsg {
|
||||
// TODO use different structs for request and response:
|
||||
ty: CaMsgTy::CreateChanRes(CreateChanRes {
|
||||
data_type: hi.data_type,
|
||||
data_count: hi.data_count,
|
||||
cid: hi.param1,
|
||||
sid: hi.param2,
|
||||
}),
|
||||
}
|
||||
let ty = CaMsgTy::CreateChanRes(CreateChanRes {
|
||||
data_type: hi.data_type,
|
||||
// TODO what am I supposed to use here in case of extended header?
|
||||
data_count: hi.data_count() as _,
|
||||
cid: hi.param1,
|
||||
sid: hi.param2,
|
||||
});
|
||||
CaMsg::from_ty_ts(ty, tsnow)
|
||||
}
|
||||
22 => {
|
||||
CaMsg {
|
||||
// TODO use different structs for request and response:
|
||||
ty: CaMsgTy::AccessRightsRes(AccessRightsRes {
|
||||
cid: hi.param1,
|
||||
rights: hi.param2,
|
||||
}),
|
||||
}
|
||||
// TODO use different structs for request and response:
|
||||
let ty = CaMsgTy::AccessRightsRes(AccessRightsRes {
|
||||
cid: hi.param1,
|
||||
rights: hi.param2,
|
||||
});
|
||||
CaMsg::from_ty_ts(ty, tsnow)
|
||||
}
|
||||
26 => {
|
||||
CaMsg {
|
||||
// TODO use different structs for request and response:
|
||||
ty: CaMsgTy::CreateChanFail(CreateChanFail { cid: hi.param1 }),
|
||||
}
|
||||
// TODO use different structs for request and response:
|
||||
let ty = CaMsgTy::CreateChanFail(CreateChanFail { cid: hi.param1 });
|
||||
CaMsg::from_ty_ts(ty, tsnow)
|
||||
}
|
||||
1 => {
|
||||
use netpod::Shape;
|
||||
@@ -797,14 +800,8 @@ impl CaMsg {
|
||||
let ca_severity = u16::from_be_bytes(payload[2..4].try_into().map_err(|_| Error::BadSlice)?);
|
||||
let ca_secs = u32::from_be_bytes(payload[4..8].try_into().map_err(|_| Error::BadSlice)?);
|
||||
let ca_nanos = u32::from_be_bytes(payload[8..12].try_into().map_err(|_| Error::BadSlice)?);
|
||||
if hi.data_count == 0 || hi.data_count > 1024 * 32 {
|
||||
let msg = CaMsg {
|
||||
ty: CaMsgTy::IssueDataCount(hi.clone(), ca_status, ca_severity, ca_secs, ca_nanos),
|
||||
};
|
||||
return Ok(msg);
|
||||
}
|
||||
let ca_sh = Shape::from_ca_count(hi.data_count).map_err(|_| {
|
||||
error!("BadCaCount hi.data_count {}", hi.data_count);
|
||||
let ca_sh = Shape::from_ca_count(hi.data_count() as _).map_err(|_| {
|
||||
error!("BadCaCount {hi:?}");
|
||||
Error::BadCaCount
|
||||
})?;
|
||||
let meta_padding = match ca_dbr_ty.meta {
|
||||
@@ -841,21 +838,20 @@ impl CaMsg {
|
||||
};
|
||||
let ts = SEC * (ca_secs as u64 + EPICS_EPOCH_OFFSET) + ca_nanos as u64;
|
||||
let value = CaEventValue {
|
||||
ts: NonZeroU64::new(ts),
|
||||
status: NonZeroU16::new(ca_status),
|
||||
severity: NonZeroU16::new(ca_severity),
|
||||
ts,
|
||||
status: ca_status,
|
||||
severity: ca_severity,
|
||||
data: value,
|
||||
};
|
||||
let d = EventAddRes {
|
||||
data_type: hi.data_type,
|
||||
data_count: hi.data_count,
|
||||
data_count: hi.data_count() as _,
|
||||
status: hi.param1,
|
||||
subid: hi.param2,
|
||||
value,
|
||||
};
|
||||
CaMsg {
|
||||
ty: CaMsgTy::EventAddRes(d),
|
||||
}
|
||||
let ty = CaMsgTy::EventAddRes(d);
|
||||
CaMsg::from_ty_ts(ty, tsnow)
|
||||
}
|
||||
15 => {
|
||||
if payload.len() == 8 {
|
||||
@@ -872,17 +868,16 @@ impl CaMsg {
|
||||
&payload[..payload.len().min(12)],
|
||||
);
|
||||
}
|
||||
CaMsg {
|
||||
// TODO use different structs for request and response:
|
||||
ty: CaMsgTy::ReadNotifyRes(ReadNotifyRes {
|
||||
data_type: hi.data_type,
|
||||
data_count: hi.data_count,
|
||||
sid: hi.param1,
|
||||
ioid: hi.param2,
|
||||
}),
|
||||
}
|
||||
// TODO use different structs for request and response:
|
||||
let ty = CaMsgTy::ReadNotifyRes(ReadNotifyRes {
|
||||
data_type: hi.data_type,
|
||||
data_count: hi.data_count() as _,
|
||||
sid: hi.param1,
|
||||
ioid: hi.param2,
|
||||
});
|
||||
CaMsg::from_ty_ts(ty, tsnow)
|
||||
}
|
||||
0x17 => CaMsg { ty: CaMsgTy::Echo },
|
||||
0x17 => CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow),
|
||||
x => return Err(Error::CaCommandNotSupported(x)),
|
||||
};
|
||||
Ok(msg)
|
||||
@@ -1065,6 +1060,7 @@ impl CaProto {
|
||||
|
||||
fn loop_body(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<Option<Poll<CaItem>>, Error> {
|
||||
use Poll::*;
|
||||
let tsnow = Instant::now();
|
||||
let output_res_1: Option<Poll<()>> = 'll1: loop {
|
||||
if self.out.len() == 0 {
|
||||
break None;
|
||||
@@ -1152,7 +1148,7 @@ impl CaProto {
|
||||
Ok(None)
|
||||
}
|
||||
}?;
|
||||
let parse_res: Option<CaItem> = self.parse_item()?;
|
||||
let parse_res: Option<CaItem> = self.parse_item(tsnow)?;
|
||||
match (output_res_2, read_res, parse_res) {
|
||||
(_, _, Some(item)) => Ok(Some(Ready(item))),
|
||||
(Some(Pending), _, _) => Ok(Some(Pending)),
|
||||
@@ -1165,7 +1161,7 @@ impl CaProto {
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_item(&mut self) -> Result<Option<CaItem>, Error> {
|
||||
fn parse_item(&mut self, tsnow: Instant) -> Result<Option<CaItem>, Error> {
|
||||
loop {
|
||||
if self.buf.len() < self.state.need_min() {
|
||||
break Ok(None);
|
||||
@@ -1178,17 +1174,12 @@ impl CaProto {
|
||||
if hi.payload_size == 0xffff {
|
||||
if hi.data_count != 0 {
|
||||
warn!("protocol error: {hi:?}");
|
||||
return Err(Error::UnexpectedHeader);
|
||||
return Err(Error::ExtendedHeaderBadCount);
|
||||
}
|
||||
}
|
||||
if hi.payload_size == 0xffff {
|
||||
} else if hi.payload_size > 16368 {
|
||||
if self.logged_proto_error_for_cid.contains_key(&sid) {
|
||||
// TODO emit this as Item so that downstream can translate SID to name.
|
||||
self.logged_proto_error_for_cid.insert(sid, true);
|
||||
}
|
||||
warn!("protocol error: {hi:?}");
|
||||
return Err(Error::UnexpectedHeader);
|
||||
self.stats.payload_std_too_large().inc();
|
||||
}
|
||||
}
|
||||
if hi.cmdid > 26 {
|
||||
@@ -1199,9 +1190,11 @@ impl CaProto {
|
||||
self.state = CaState::ExtHead(hi);
|
||||
Ok(None)
|
||||
} else {
|
||||
// For extended messages, ingest on receive of extended header
|
||||
self.stats.payload_size().ingest(hi.payload_len() as u32);
|
||||
if hi.payload_size == 0 {
|
||||
self.state = CaState::StdHead;
|
||||
let msg = CaMsg::from_proto_infos(&hi, &[], self.array_truncate)?;
|
||||
let msg = CaMsg::from_proto_infos(&hi, &[], tsnow, self.array_truncate)?;
|
||||
Ok(Some(CaItem::Msg(msg)))
|
||||
} else {
|
||||
self.state = CaState::Payload(hi);
|
||||
@@ -1212,8 +1205,9 @@ impl CaProto {
|
||||
CaState::ExtHead(hi) => {
|
||||
let payload_size = self.buf.read_u32_be()?;
|
||||
let data_count = self.buf.read_u32_be()?;
|
||||
// `hi` is still the standard header here (the extended values are only applied by
// `with_ext` further down), so ingest the payload size just read from the extended
// header instead of `hi.payload_len()`.
// NOTE(review): assumes `read_u32_be` yields a `u32` — confirm.
self.stats.payload_size().ingest(payload_size);
|
||||
if payload_size > 1024 * 1024 * 32 {
|
||||
self.stats.payload_very_large().inc();
|
||||
self.stats.payload_ext_very_large().inc();
|
||||
if false {
|
||||
warn!(
|
||||
"ExtHead data_type {} payload_size {payload_size} data_count {data_count}",
|
||||
@@ -1227,9 +1221,6 @@ impl CaProto {
|
||||
"ExtHead data_type {} payload_size {payload_size} data_count {data_count}",
|
||||
hi.data_type
|
||||
);
|
||||
// let msg = CaMsg::from_proto_infos(hi, &[], self.array_truncate)?;
|
||||
// self.state = CaState::StdHead;
|
||||
// Ok(Some(CaItem::Msg(msg)))
|
||||
}
|
||||
let hi = hi.clone().with_ext(payload_size, data_count);
|
||||
self.state = CaState::Payload(hi);
|
||||
@@ -1237,7 +1228,11 @@ impl CaProto {
|
||||
}
|
||||
CaState::Payload(hi) => {
|
||||
let g = self.buf.read_bytes(hi.payload_len())?;
|
||||
let msg = CaMsg::from_proto_infos(hi, g, self.array_truncate)?;
|
||||
let msg = CaMsg::from_proto_infos(hi, g, tsnow, self.array_truncate)?;
|
||||
// data-count is only reasonable for event messages
|
||||
if let CaMsgTy::EventAddRes(e) = &msg.ty {
|
||||
self.stats.data_count().ingest(hi.data_count() as u32);
|
||||
}
|
||||
self.state = CaState::StdHead;
|
||||
Ok(Some(CaItem::Msg(msg)))
|
||||
}
|
||||
|
||||
+21
-40
@@ -191,31 +191,11 @@ async fn worker(
|
||||
let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
|
||||
epoch.as_secs() * SEC + epoch.subsec_nanos() as u64
|
||||
};
|
||||
let dt = (tsnow / 1000000) as i64 - (item_ts_local / 1000000) as i64;
|
||||
if dt < 0 {
|
||||
stats.item_latency_neg().inc();
|
||||
} else if dt <= 25 {
|
||||
stats.item_latency_025ms().inc();
|
||||
} else if dt <= 50 {
|
||||
stats.item_latency_050ms().inc();
|
||||
} else if dt <= 100 {
|
||||
stats.item_latency_100ms().inc();
|
||||
} else if dt <= 200 {
|
||||
stats.item_latency_200ms().inc();
|
||||
} else if dt <= 400 {
|
||||
stats.item_latency_400ms().inc();
|
||||
} else if dt <= 800 {
|
||||
stats.item_latency_800ms().inc();
|
||||
} else if dt <= 1600 {
|
||||
stats.item_latency_1600ms().inc();
|
||||
} else if dt <= 3200 {
|
||||
stats.item_latency_3200ms().inc();
|
||||
} else {
|
||||
stats.item_latency_large().inc();
|
||||
}
|
||||
// Latency in milliseconds from receiving the item to the worker picking it up.
// Subtract in full-width nanoseconds first: truncating each epoch timestamp to u32
// before subtracting gives a wrong difference whenever the low 32 bits wrap between
// the two values. Negative differences still saturate to 0.
let dt = (tsnow.saturating_sub(item_ts_local) / 1000000).min(u32::MAX as u64) as u32;
stats.item_lat_net_worker().ingest(dt);
|
||||
let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire);
|
||||
let do_insert = i1 % 1000 < insert_frac;
|
||||
match insert_item(item, &ttls, &data_store, &stats, do_insert).await {
|
||||
match insert_item(item, &ttls, &data_store, do_insert, &stats).await {
|
||||
Ok(_) => {
|
||||
stats.inserted_values().inc();
|
||||
let tsnow = {
|
||||
@@ -223,18 +203,8 @@ async fn worker(
|
||||
let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
|
||||
epoch.as_secs() * SEC + epoch.subsec_nanos() as u64
|
||||
};
|
||||
let dt = (tsnow / 1000000) as i64 - (item_ts_local / 1000000) as i64;
|
||||
if dt <= 50 {
|
||||
stats.item_commit_latency_0050ms().inc();
|
||||
} else if dt <= 200 {
|
||||
stats.item_commit_latency_0200ms().inc();
|
||||
} else if dt <= 800 {
|
||||
stats.item_commit_latency_0800ms().inc();
|
||||
} else if dt <= 3200 {
|
||||
stats.item_commit_latency_3200ms().inc();
|
||||
} else {
|
||||
stats.item_commit_latency_large().inc();
|
||||
}
|
||||
// Latency in milliseconds from receiving the item to the completed insert.
// Subtract in full-width nanoseconds first: truncating each epoch timestamp to u32
// before subtracting gives a wrong difference whenever the low 32 bits wrap between
// the two values. Negative differences still saturate to 0.
let dt = (tsnow.saturating_sub(item_ts_local) / 1000000).min(u32::MAX as u64) as u32;
stats.item_lat_net_store().ingest(dt);
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -367,16 +337,21 @@ async fn worker_streamed(
|
||||
let mut stream = item_inp
|
||||
.map(|item| {
|
||||
stats.item_recv.inc();
|
||||
let tsnow_u64 = {
|
||||
let ts = SystemTime::now();
|
||||
let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
|
||||
epoch.as_secs() * SEC + epoch.subsec_nanos() as u64
|
||||
};
|
||||
match item {
|
||||
QueryItem::Insert(item) => prepare_query_insert_futs(item, &ttls, &data_store, &stats),
|
||||
QueryItem::Insert(item) => prepare_query_insert_futs(item, &ttls, &data_store, &stats, tsnow_u64),
|
||||
QueryItem::ConnectionStatus(item) => {
|
||||
stats.inserted_connection_status().inc();
|
||||
let fut = insert_connection_status_fut(item, &ttls, &data_store);
|
||||
let fut = insert_connection_status_fut(item, &ttls, &data_store, stats.clone());
|
||||
smallvec![fut]
|
||||
}
|
||||
QueryItem::ChannelStatus(item) => {
|
||||
stats.inserted_channel_status().inc();
|
||||
insert_channel_status_fut(item, &ttls, &data_store)
|
||||
insert_channel_status_fut(item, &ttls, &data_store, stats.clone())
|
||||
}
|
||||
_ => {
|
||||
// TODO
|
||||
@@ -420,23 +395,29 @@ fn prepare_query_insert_futs(
|
||||
item: InsertItem,
|
||||
ttls: &Ttls,
|
||||
data_store: &Arc<DataStore>,
|
||||
stats: &InsertWorkerStats,
|
||||
stats: &Arc<InsertWorkerStats>,
|
||||
tsnow_u64: u64,
|
||||
) -> SmallVec<[InsertFut; 4]> {
|
||||
stats.inserts_value().inc();
|
||||
let item_ts_local = item.ts_local;
|
||||
// Latency in milliseconds from receiving the item to preparing its insert futures.
// Subtract in full-width nanoseconds first: truncating each epoch timestamp to u32
// before subtracting gives a wrong difference whenever the low 32 bits wrap between
// the two values. Negative differences still saturate to 0.
let dt = (tsnow_u64.saturating_sub(item_ts_local) / 1000000).min(u32::MAX as u64) as u32;
stats.item_lat_net_worker().ingest(dt);
|
||||
let msp_bump = item.msp_bump;
|
||||
let series = item.series.clone();
|
||||
let ts_msp = item.ts_msp;
|
||||
let do_insert = true;
|
||||
let fut = insert_item_fut(item, &ttls, &data_store, do_insert);
|
||||
let fut = insert_item_fut(item, &ttls, &data_store, do_insert, stats);
|
||||
let mut futs = smallvec![fut];
|
||||
if msp_bump {
|
||||
stats.inserts_msp().inc();
|
||||
let fut = insert_msp_fut(
|
||||
series,
|
||||
ts_msp,
|
||||
item_ts_local,
|
||||
ttls,
|
||||
data_store.scy.clone(),
|
||||
data_store.qu_insert_ts_msp.clone(),
|
||||
stats.clone(),
|
||||
);
|
||||
futs.push(fut);
|
||||
}
|
||||
|
||||
@@ -269,9 +269,11 @@ struct InsParCom {
|
||||
series: u64,
|
||||
ts_msp: u64,
|
||||
ts_lsp: u64,
|
||||
ts_local: u64,
|
||||
pulse: u64,
|
||||
ttl: u32,
|
||||
do_insert: bool,
|
||||
stats: Arc<InsertWorkerStats>,
|
||||
}
|
||||
|
||||
fn insert_scalar_gen_fut<ST>(par: InsParCom, val: ST, qu: Arc<PreparedStatement>, scy: Arc<ScySession>) -> InsertFut
|
||||
@@ -286,7 +288,7 @@ where
|
||||
val,
|
||||
par.ttl as i32,
|
||||
);
|
||||
InsertFut::new(scy, qu, params)
|
||||
InsertFut::new(scy, qu, params, par.ts_local, par.stats)
|
||||
}
|
||||
|
||||
fn insert_array_gen_fut<ST>(par: InsParCom, val: ST, qu: Arc<PreparedStatement>, scy: Arc<ScySession>) -> InsertFut
|
||||
@@ -301,7 +303,7 @@ where
|
||||
val,
|
||||
par.ttl as i32,
|
||||
);
|
||||
InsertFut::new(scy, qu, params)
|
||||
InsertFut::new(scy, qu, params, par.ts_local, par.stats)
|
||||
}
|
||||
|
||||
#[pin_project::pin_project]
|
||||
@@ -320,10 +322,24 @@ impl InsertFut {
|
||||
scy: Arc<ScySession>,
|
||||
qu: Arc<PreparedStatement>,
|
||||
params: V,
|
||||
// timestamp when we first encountered the data to-be inserted, for metrics
|
||||
tsnet: u64,
|
||||
stats: Arc<InsertWorkerStats>,
|
||||
) -> Self {
|
||||
let scy_ref = unsafe { NonNull::from(scy.as_ref()).as_ref() };
|
||||
let qu_ref = unsafe { NonNull::from(qu.as_ref()).as_ref() };
|
||||
let fut = scy_ref.execute_paged(qu_ref, params, None);
|
||||
// When the query completes, record the latency in milliseconds between `tsnet`
// (when the data was first seen) and now, then pass the query result through unchanged.
let fut = fut.map(move |x| {
    let tsnow_u64 = {
        let ts = SystemTime::now();
        let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
        epoch.as_secs() * SEC + epoch.subsec_nanos() as u64
    };
    // Subtract in full-width nanoseconds first: truncating each epoch timestamp to
    // u32 before subtracting gives a wrong difference whenever the low 32 bits wrap
    // between the two values. Negative differences still saturate to 0.
    let dt_ms = tsnow_u64.saturating_sub(tsnet) / 1000000;
    stats.item_lat_net_store().ingest(dt_ms.min(u32::MAX as u64) as u32);
    x
});
|
||||
let fut = taskrun::tokio::task::unconstrained(fut);
|
||||
let fut = Box::pin(fut);
|
||||
// let fut = StackFuture::from(fut);
|
||||
@@ -416,8 +432,8 @@ pub async fn insert_item(
|
||||
item: InsertItem,
|
||||
ttls: &Ttls,
|
||||
data_store: &DataStore,
|
||||
stats: &InsertWorkerStats,
|
||||
do_insert: bool,
|
||||
stats: &Arc<InsertWorkerStats>,
|
||||
) -> Result<(), Error> {
|
||||
if item.msp_bump {
|
||||
let params = (item.series.id() as i64, item.ts_msp as i64, ttls.index.as_secs() as i32);
|
||||
@@ -446,9 +462,11 @@ pub async fn insert_item(
|
||||
series: item.series.id(),
|
||||
ts_msp: item.ts_msp,
|
||||
ts_lsp: item.ts_lsp,
|
||||
ts_local: item.ts_local,
|
||||
pulse: item.pulse,
|
||||
ttl: ttls.d0.as_secs() as _,
|
||||
do_insert,
|
||||
stats: stats.clone(),
|
||||
};
|
||||
use ScalarValue::*;
|
||||
match val {
|
||||
@@ -467,9 +485,11 @@ pub async fn insert_item(
|
||||
series: item.series.id(),
|
||||
ts_msp: item.ts_msp,
|
||||
ts_lsp: item.ts_lsp,
|
||||
ts_local: item.ts_local,
|
||||
pulse: item.pulse,
|
||||
ttl: ttls.d1.as_secs() as _,
|
||||
do_insert,
|
||||
stats: stats.clone(),
|
||||
};
|
||||
use ArrayValue::*;
|
||||
match val {
|
||||
@@ -489,15 +509,24 @@ pub async fn insert_item(
|
||||
pub fn insert_msp_fut(
|
||||
series: SeriesId,
|
||||
ts_msp: u64,
|
||||
// for stats, the timestamp when we received that data
|
||||
tsnet: u64,
|
||||
ttls: &Ttls,
|
||||
scy: Arc<ScySession>,
|
||||
qu: Arc<PreparedStatement>,
|
||||
stats: Arc<InsertWorkerStats>,
|
||||
) -> InsertFut {
|
||||
let params = (series.id() as i64, ts_msp as i64, ttls.index.as_secs() as i32);
|
||||
InsertFut::new(scy, qu, params)
|
||||
InsertFut::new(scy, qu, params, tsnet, stats)
|
||||
}
|
||||
|
||||
pub fn insert_item_fut(item: InsertItem, ttls: &Ttls, data_store: &DataStore, do_insert: bool) -> InsertFut {
|
||||
pub fn insert_item_fut(
|
||||
item: InsertItem,
|
||||
ttls: &Ttls,
|
||||
data_store: &DataStore,
|
||||
do_insert: bool,
|
||||
stats: &Arc<InsertWorkerStats>,
|
||||
) -> InsertFut {
|
||||
let scy = data_store.scy.clone();
|
||||
use DataValue::*;
|
||||
match item.val {
|
||||
@@ -506,9 +535,11 @@ pub fn insert_item_fut(item: InsertItem, ttls: &Ttls, data_store: &DataStore, do
|
||||
series: item.series.id(),
|
||||
ts_msp: item.ts_msp,
|
||||
ts_lsp: item.ts_lsp,
|
||||
ts_local: item.ts_local,
|
||||
pulse: item.pulse,
|
||||
ttl: ttls.d0.as_secs() as _,
|
||||
do_insert,
|
||||
stats: stats.clone(),
|
||||
};
|
||||
use ScalarValue::*;
|
||||
match val {
|
||||
@@ -527,9 +558,11 @@ pub fn insert_item_fut(item: InsertItem, ttls: &Ttls, data_store: &DataStore, do
|
||||
series: item.series.id(),
|
||||
ts_msp: item.ts_msp,
|
||||
ts_lsp: item.ts_lsp,
|
||||
ts_local: item.ts_local,
|
||||
pulse: item.pulse,
|
||||
ttl: ttls.d1.as_secs() as _,
|
||||
do_insert,
|
||||
stats: stats.clone(),
|
||||
};
|
||||
use ArrayValue::*;
|
||||
match val {
|
||||
@@ -544,13 +577,20 @@ pub fn insert_item_fut(item: InsertItem, ttls: &Ttls, data_store: &DataStore, do
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert_connection_status_fut(item: ConnectionStatusItem, ttls: &Ttls, data_store: &DataStore) -> InsertFut {
|
||||
pub fn insert_connection_status_fut(
|
||||
item: ConnectionStatusItem,
|
||||
ttls: &Ttls,
|
||||
data_store: &DataStore,
|
||||
stats: Arc<InsertWorkerStats>,
|
||||
) -> InsertFut {
|
||||
let tsunix = item.ts.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO);
|
||||
let secs = tsunix.as_secs() * SEC;
|
||||
let nanos = tsunix.subsec_nanos() as u64;
|
||||
let ts = secs + nanos;
|
||||
let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV;
|
||||
let ts_lsp = ts - ts_msp;
|
||||
// TODO is that the good tsnet to use?
|
||||
let tsnet = ts;
|
||||
let kind = item.status.to_kind();
|
||||
let addr = format!("{}", item.addr);
|
||||
let params = (
|
||||
@@ -564,6 +604,8 @@ pub fn insert_connection_status_fut(item: ConnectionStatusItem, ttls: &Ttls, dat
|
||||
data_store.scy.clone(),
|
||||
data_store.qu_insert_connection_status.clone(),
|
||||
params,
|
||||
tsnet,
|
||||
stats,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -571,6 +613,7 @@ pub fn insert_channel_status_fut(
|
||||
item: ChannelStatusItem,
|
||||
ttls: &Ttls,
|
||||
data_store: &DataStore,
|
||||
stats: Arc<InsertWorkerStats>,
|
||||
) -> SmallVec<[InsertFut; 4]> {
|
||||
let tsunix = item.ts.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO);
|
||||
let secs = tsunix.as_secs() * SEC;
|
||||
@@ -578,6 +621,7 @@ pub fn insert_channel_status_fut(
|
||||
let ts = secs + nanos;
|
||||
let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV;
|
||||
let ts_lsp = ts - ts_msp;
|
||||
let tsnet = ts;
|
||||
let kind = item.status.to_kind();
|
||||
let cssid = item.cssid.id();
|
||||
let params = (
|
||||
@@ -591,6 +635,8 @@ pub fn insert_channel_status_fut(
|
||||
data_store.scy.clone(),
|
||||
data_store.qu_insert_channel_status.clone(),
|
||||
params,
|
||||
tsnet,
|
||||
stats.clone(),
|
||||
);
|
||||
let params = (
|
||||
ts_msp as i64,
|
||||
@@ -603,6 +649,8 @@ pub fn insert_channel_status_fut(
|
||||
data_store.scy.clone(),
|
||||
data_store.qu_insert_channel_status_by_ts_msp.clone(),
|
||||
params,
|
||||
tsnet,
|
||||
stats,
|
||||
);
|
||||
smallvec![fut1, fut2]
|
||||
}
|
||||
|
||||
@@ -11,5 +11,3 @@ path = "src/stats.rs"
|
||||
stats_types = { path = "../stats_types" }
|
||||
stats_proc = { path = "../stats_proc" }
|
||||
log = { path = "../log" }
|
||||
err = { path = "../../daqbuffer/crates/err" }
|
||||
libc = "0.2"
|
||||
|
||||
+38
-33
@@ -1,6 +1,7 @@
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
const US: u64 = 1000;
|
||||
const MS: u64 = US * 1000;
|
||||
@@ -207,7 +208,35 @@ impl IntervalEma {
|
||||
}
|
||||
}
|
||||
|
||||
// #[cfg(DISABLED)]
|
||||
/// Minimal xorshift32 pseudo-random number generator.
///
/// Intended for cheap jitter (e.g. randomizing intervals), not for anything
/// security sensitive. The internal state is never zero, because zero is a
/// fixed point of the xorshift step and would make the generator emit `0`
/// forever.
pub struct XorShift32 {
    state: u32,
}

impl XorShift32 {
    /// Seed substituted when a zero seed is requested.
    const FALLBACK_SEED: u32 = 2_463_534_242;

    /// Creates a generator from the given seed.
    ///
    /// A seed of `0` is replaced by a fixed non-zero constant, since an
    /// all-zero state can never leave zero.
    pub fn new(state: u32) -> Self {
        let state = if state == 0 { Self::FALLBACK_SEED } else { state };
        Self { state }
    }

    /// Creates a generator seeded from the sub-second microseconds of the
    /// current wall-clock time.
    ///
    /// If the system clock reports a time before the Unix epoch, the seed
    /// falls back to the constant used by [`XorShift32::new`] for zero instead
    /// of panicking.
    pub fn new_from_time() -> Self {
        use std::time::SystemTime;
        let micros = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .map(|d| d.subsec_micros())
            .unwrap_or(0);
        Self::new(micros)
    }

    /// Advances the generator by one step and returns the new state.
    pub fn next(&mut self) -> u32 {
        let mut x = self.state;
        x ^= x << 13;
        x ^= x >> 17;
        x ^= x << 5;
        self.state = x;
        x
    }
}
|
||||
|
||||
stats_proc::stats_struct!((
|
||||
stats_struct(
|
||||
name(CaProtoStats),
|
||||
@@ -216,9 +245,11 @@ stats_proc::stats_struct!((
|
||||
tcp_recv_count,
|
||||
tcp_recv_bytes,
|
||||
protocol_issue,
|
||||
payload_very_large,
|
||||
payload_std_too_large,
|
||||
payload_ext_but_small,
|
||||
payload_ext_very_large,
|
||||
),
|
||||
histolog2s(payload_size, data_count,),
|
||||
),
|
||||
stats_struct(
|
||||
name(CaConnSetStats),
|
||||
@@ -301,24 +332,10 @@ stats_proc::stats_struct!((
|
||||
inserts_msp_grid,
|
||||
inserts_value,
|
||||
ratelimit_drop,
|
||||
item_latency_neg,
|
||||
item_latency_025ms,
|
||||
item_latency_050ms,
|
||||
item_latency_100ms,
|
||||
item_latency_200ms,
|
||||
item_latency_400ms,
|
||||
item_latency_800ms,
|
||||
item_latency_1600ms,
|
||||
item_latency_3200ms,
|
||||
item_latency_large,
|
||||
item_commit_latency_0050ms,
|
||||
item_commit_latency_0200ms,
|
||||
item_commit_latency_0800ms,
|
||||
item_commit_latency_3200ms,
|
||||
item_commit_latency_large,
|
||||
worker_start,
|
||||
worker_finish,
|
||||
)
|
||||
),
|
||||
histolog2s(item_lat_net_worker, item_lat_net_store,),
|
||||
),
|
||||
stats_struct(
|
||||
name(IocFinderStats),
|
||||
@@ -346,7 +363,6 @@ stats_proc::stats_struct!((
|
||||
),
|
||||
));
|
||||
|
||||
// #[cfg(DISABLED)]
|
||||
stats_proc::stats_struct!((
|
||||
stats_struct(
|
||||
name(CaConnStats),
|
||||
@@ -399,19 +415,8 @@ stats_proc::stats_struct!((
|
||||
channel_alive_count,
|
||||
channel_not_alive_count,
|
||||
channel_series_lookup_already_pending,
|
||||
ca_ts_off_1,
|
||||
ca_ts_off_2,
|
||||
ca_ts_off_3,
|
||||
ca_ts_off_4,
|
||||
ping_start,
|
||||
ping_no_proto,
|
||||
pong_recv_010ms,
|
||||
pong_recv_025ms,
|
||||
pong_recv_050ms,
|
||||
pong_recv_100ms,
|
||||
pong_recv_200ms,
|
||||
pong_recv_400ms,
|
||||
pong_recv_slow,
|
||||
pong_timeout,
|
||||
ca_conn_poll_fn_begin,
|
||||
ca_conn_poll_loop_begin,
|
||||
@@ -419,13 +424,13 @@ stats_proc::stats_struct!((
|
||||
ca_conn_poll_pending,
|
||||
ca_conn_poll_no_progress_no_pending,
|
||||
),
|
||||
values(inter_ivl_ema)
|
||||
values(inter_ivl_ema),
|
||||
histolog2s(pong_recv_lat, ca_ts_off,),
|
||||
),
|
||||
agg(name(CaConnStatsAgg), parent(CaConnStats)),
|
||||
diff(name(CaConnStatsAggDiff), input(CaConnStatsAgg)),
|
||||
));
|
||||
|
||||
// #[cfg(DISABLED)]
|
||||
stats_proc::stats_struct!((
|
||||
stats_struct(
|
||||
name(DaemonStats),
|
||||
|
||||
@@ -17,6 +17,7 @@ struct StatsStructDef {
|
||||
prefix: Option<syn::Ident>,
|
||||
counters: Vec<syn::Ident>,
|
||||
values: Vec<syn::Ident>,
|
||||
histolog2s: Vec<syn::Ident>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -49,7 +50,11 @@ fn stats_struct_impl(st: &StatsStructDef) -> String {
|
||||
.values
|
||||
.iter()
|
||||
.map(|x| format!("{:12}{}: stats_types::Value::new()", "", x.to_string()));
|
||||
let inits: Vec<_> = inits1.into_iter().chain(inits2).collect();
|
||||
let init_histolog2s = st
|
||||
.histolog2s
|
||||
.iter()
|
||||
.map(|x| format!("{:12}{}: stats_types::HistoLog2::new(0)", "", x.to_string()));
|
||||
let inits: Vec<_> = inits1.into_iter().chain(inits2).chain(init_histolog2s).collect();
|
||||
let inits = inits.join(",\n");
|
||||
let incers: String = st
|
||||
.counters
|
||||
@@ -76,7 +81,22 @@ fn stats_struct_impl(st: &StatsStructDef) -> String {
|
||||
write!(
|
||||
buf,
|
||||
"
|
||||
pub fn {nn}(&self) -> &stats_types::Value {{
|
||||
pub fn {nn}(&self) -> &stats_types::Value {{
|
||||
&self.{nn}
|
||||
}}
|
||||
"
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
buf
|
||||
};
|
||||
let histolog2s = {
|
||||
let mut buf = String::new();
|
||||
for nn in &st.histolog2s {
|
||||
write!(
|
||||
buf,
|
||||
"
|
||||
pub fn {nn}(&self) -> &stats_types::HistoLog2 {{
|
||||
&self.{nn}
|
||||
}}
|
||||
"
|
||||
@@ -111,6 +131,17 @@ fn stats_struct_impl(st: &StatsStructDef) -> String {
|
||||
pre, n, n
|
||||
));
|
||||
}
|
||||
for x in &st.histolog2s {
|
||||
let n = x.to_string();
|
||||
let fullname = if let Some(x) = &st.prefix {
|
||||
format!("daqingest_{x}_{n}")
|
||||
} else {
|
||||
format!("daqingest_{n}")
|
||||
};
|
||||
buf.push_str(&format!("let fullname = \"{fullname}\";\n"));
|
||||
buf.push_str(&format!("let s = self.{n}.to_prometheus(fullname);\n"));
|
||||
buf.push_str(&format!("ret.push_str(&s);\n"));
|
||||
}
|
||||
format!(
|
||||
"
|
||||
pub fn prometheus(&self) -> String {{
|
||||
@@ -158,6 +189,8 @@ impl {name} {{
|
||||
|
||||
{values}
|
||||
|
||||
{histolog2s}
|
||||
|
||||
{fn_prometheus}
|
||||
|
||||
{fn_snapshot}
|
||||
@@ -185,6 +218,11 @@ fn stats_struct_decl_impl(st: &StatsStructDef) -> String {
|
||||
.iter()
|
||||
.map(|x| format!("{:4}pub {}: stats_types::Value,\n", "", x.to_string()))
|
||||
.fold(String::new(), extend_str);
|
||||
let histolog2s_decl = st
|
||||
.histolog2s
|
||||
.iter()
|
||||
.map(|x| format!("{:4}pub {}: stats_types::HistoLog2,\n", "", x.to_string()))
|
||||
.fold(String::new(), extend_str);
|
||||
let structt = format!(
|
||||
"
|
||||
pub struct {name} {{
|
||||
@@ -192,6 +230,7 @@ pub struct {name} {{
|
||||
dropped: stats_types::Value,
|
||||
{counters_decl}
|
||||
{values_decl}
|
||||
{histolog2s_decl}
|
||||
}}
|
||||
|
||||
"
|
||||
@@ -407,7 +446,8 @@ fn idents_from_exprs(inp: PunctExpr) -> syn::Result<Vec<syn::Ident>> {
|
||||
|
||||
fn func_name_from_expr(inp: syn::Expr) -> syn::Result<syn::Ident> {
|
||||
use syn::spanned::Spanned;
|
||||
use syn::{Error, Expr};
|
||||
use syn::Error;
|
||||
use syn::Expr;
|
||||
match inp {
|
||||
Expr::Path(k) => {
|
||||
if k.path.segments.len() != 1 {
|
||||
@@ -425,7 +465,8 @@ fn func_name_from_expr(inp: syn::Expr) -> syn::Result<syn::Ident> {
|
||||
impl FuncCallWithArgs {
|
||||
fn from_expr(inp: syn::Expr) -> Result<Self, syn::Error> {
|
||||
use syn::spanned::Spanned;
|
||||
use syn::{Error, Expr};
|
||||
use syn::Error;
|
||||
use syn::Expr;
|
||||
let span_all = inp.span();
|
||||
match inp {
|
||||
Expr::Call(k) => {
|
||||
@@ -448,6 +489,7 @@ impl StatsStructDef {
|
||||
prefix: syn::parse_str("__empty").unwrap(),
|
||||
counters: Vec::new(),
|
||||
values: Vec::new(),
|
||||
histolog2s: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -456,6 +498,7 @@ impl StatsStructDef {
|
||||
let mut prefix = None;
|
||||
let mut counters = None;
|
||||
let mut values = None;
|
||||
let mut histolog2s = None;
|
||||
for k in inp {
|
||||
let fa = FuncCallWithArgs::from_expr(k)?;
|
||||
if fa.name == "name" {
|
||||
@@ -470,6 +513,9 @@ impl StatsStructDef {
|
||||
} else if fa.name == "values" {
|
||||
let idents = idents_from_exprs(fa.args)?;
|
||||
values = Some(idents);
|
||||
} else if fa.name == "histolog2s" {
|
||||
let idents = idents_from_exprs(fa.args)?;
|
||||
histolog2s = Some(idents);
|
||||
} else {
|
||||
panic!("fa.name: {:?}", fa.name);
|
||||
}
|
||||
@@ -479,6 +525,7 @@ impl StatsStructDef {
|
||||
prefix,
|
||||
counters: counters.unwrap_or(Vec::new()),
|
||||
values: values.unwrap_or(Vec::new()),
|
||||
histolog2s: histolog2s.unwrap_or(Vec::new()),
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
@@ -587,6 +634,7 @@ pub fn stats_struct(ts: TokenStream) -> TokenStream {
|
||||
prefix: None,
|
||||
counters: j.stats.counters.clone(),
|
||||
values: Vec::new(),
|
||||
histolog2s: Vec::new(),
|
||||
};
|
||||
def.stats_struct_defs.push(h);
|
||||
}
|
||||
@@ -620,6 +668,8 @@ pub fn stats_struct(ts: TokenStream) -> TokenStream {
|
||||
counters: j.stats.counters.clone(),
|
||||
// TODO compute values
|
||||
values: Vec::new(),
|
||||
// TODO not supported yet
|
||||
histolog2s: Vec::new(),
|
||||
};
|
||||
let s = diff_decl_impl(k, &p);
|
||||
code.push_str(&s);
|
||||
|
||||
@@ -65,6 +65,14 @@ impl Value {
|
||||
self.0.store(x, Release);
|
||||
}
|
||||
|
||||
pub fn inc(&self) {
|
||||
self.0.fetch_add(1, AcqRel);
|
||||
}
|
||||
|
||||
pub fn dec(&self) {
|
||||
self.0.fetch_sub(1, AcqRel);
|
||||
}
|
||||
|
||||
pub fn load(&self) -> u64 {
|
||||
self.0.load(Acquire)
|
||||
}
|
||||
@@ -108,3 +116,106 @@ struct StatsAReader {
|
||||
}
|
||||
|
||||
impl StatsAReader {}
|
||||
|
||||
/// Histogram with 16 power-of-two buckets backed by atomic counters.
///
/// An ingested value is first right-shifted by `sub` bits and then counted in
/// the bucket whose index equals the bit length of the shifted value. Bucket
/// `i` therefore covers values `v <= 2^(i + sub) - 1`.
///
/// NOTE(review): the last bucket also absorbs every value that is too large
/// for it, yet it is still exported under its nominal `le` bound. Confirm
/// whether that is acceptable for the consumers of these metrics.
pub struct HistoLog2 {
    histo: [AtomicU64; 16],
    sub: u16,
}

// Expands one expression into a 16-element array literal. Needed because
// `AtomicU64` is not `Copy`, so `[expr; 16]` cannot be used.
macro_rules! rep16 {
    ([$x:expr]) => {
        [$x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x, $x]
    };
}

impl HistoLog2 {
    /// Creates an empty histogram.
    ///
    /// `sub` is the number of low bits dropped from every ingested value
    /// before it is assigned to a bucket.
    pub fn new(sub: u16) -> Self {
        Self {
            histo: rep16!([AtomicU64::new(0)]),
            sub,
        }
    }

    /// Inclusive upper bound of the bucket at `index` for a histogram that
    /// pre-shifts by `sub` bits, saturating at `u64::MAX`.
    fn bucket_upper_bound(index: u32, sub: u16) -> u64 {
        let bits = index + u32::from(sub);
        if bits >= u64::BITS {
            u64::MAX
        } else {
            (1u64 << bits) - 1
        }
    }

    /// Counts `v` in its bucket.
    #[inline]
    pub fn ingest(&self, v: u32) {
        // Shifting a u32 by 32 or more bits is an overflow (panics in debug
        // builds); every bit is shifted out in that case, so the result is 0.
        let mut v = v.checked_shr(u32::from(self.sub)).unwrap_or(0);
        let mut po = 0;
        // The bucket index is the bit length of the shifted value, capped at
        // the last bucket.
        while v != 0 && po < self.histo.len() - 1 {
            v >>= 1;
            po += 1;
        }
        self.histo[po].fetch_add(1, AcqRel);
    }

    /// Renders the histogram in Prometheus text exposition format under the
    /// metric name `name`.
    ///
    /// Bucket counts are emitted cumulatively. Individual values are not
    /// retained, so `_sum` is an approximation that weights each bucket's
    /// count by the bucket's upper bound.
    pub fn to_prometheus(&self, name: &str) -> String {
        let mut ret = String::with_capacity(2048);
        ret.push_str("# HELP ");
        ret.push_str(name);
        ret.push_str(" help-text-missing\n");
        ret.push_str("# TYPE ");
        ret.push_str(name);
        ret.push_str(" histogram\n");
        let mut cnt: u64 = 0;
        let mut sum: u64 = 0;
        for (i, a) in self.histo.iter().enumerate() {
            // Account for the pre-shift so the label reflects the real value range.
            let le = Self::bucket_upper_bound(i as u32, self.sub);
            let v = a.load(Acquire);
            cnt += v;
            sum = sum.saturating_add(v.saturating_mul(le));
            ret.push_str(name);
            ret.push_str("_bucket{le=\"");
            ret.push_str(&le.to_string());
            ret.push_str("\"} ");
            ret.push_str(&cnt.to_string());
            ret.push('\n');
        }
        ret.push_str(name);
        ret.push_str("_bucket{le=\"+Inf\"} ");
        ret.push_str(&cnt.to_string());
        ret.push('\n');
        ret.push_str(name);
        ret.push_str("_count ");
        ret.push_str(&cnt.to_string());
        ret.push('\n');
        ret.push_str(name);
        ret.push_str("_sum ");
        ret.push_str(&sum.to_string());
        ret.push('\n');
        ret
    }
}
|
||||
|
||||
/// Ingests the values 3 and 4 into an unshifted histogram and compares the
/// complete Prometheus rendering against the expected text.
#[test]
fn histo_00() {
    let expected = r##"# HELP latA help-text-missing
# TYPE latA histogram
latA_bucket{le="0"} 0
latA_bucket{le="1"} 0
latA_bucket{le="3"} 1
latA_bucket{le="7"} 2
latA_bucket{le="15"} 2
latA_bucket{le="31"} 2
latA_bucket{le="63"} 2
latA_bucket{le="127"} 2
latA_bucket{le="255"} 2
latA_bucket{le="511"} 2
latA_bucket{le="1023"} 2
latA_bucket{le="2047"} 2
latA_bucket{le="4095"} 2
latA_bucket{le="8191"} 2
latA_bucket{le="16383"} 2
latA_bucket{le="32767"} 2
latA_bucket{le="+Inf"} 2
latA_count 2
latA_sum 10
"##;
    let h = HistoLog2::new(0);
    for sample in [3, 4] {
        h.ingest(sample);
    }
    let rendered = h.to_prometheus("latA");
    assert_eq!(rendered, expected);
}
|
||||
|
||||
Reference in New Issue
Block a user