Try to reconnect on TCP error, take timestamp from EPICS, warn if timestamp is off

Dominik Werder
2022-07-21 11:27:38 +02:00
parent 27e93f2539
commit 41ac41f3ae
8 changed files with 369 additions and 310 deletions
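Editor's note on the timestamp part of this commit: events are now stamped with the EPICS DBR_TIME metadata (seconds since the 1990-01-01 EPICS epoch plus nanoseconds) instead of the local clock, and the deviation from local time is only counted through the new ca_ts_off_1..4 stats. Below is a minimal standalone sketch of that conversion and bucketing, assuming SEC is nanoseconds per second, the same EPICS_EPOCH_OFFSET, and the 3/20/120/300 second thresholds seen in the diff; the names epics_to_unix_ns and skew_bucket are illustrative and not part of the crate.

use std::time::{SystemTime, UNIX_EPOCH};

/// Seconds between the Unix epoch (1970-01-01) and the EPICS epoch (1990-01-01).
const EPICS_EPOCH_OFFSET: u64 = 631_152_000;
/// Nanoseconds per second.
const SEC: u64 = 1_000_000_000;

/// Convert CA DBR_TIME seconds/nanoseconds into nanoseconds since the Unix epoch.
fn epics_to_unix_ns(ca_secs: u32, ca_nanos: u32) -> u64 {
    SEC * (ca_secs as u64 + EPICS_EPOCH_OFFSET) + ca_nanos as u64
}

/// Bucket the absolute difference between the EPICS timestamp and the local clock,
/// mirroring the 3 s / 20 s / 120 s / 300 s thresholds used for ca_ts_off_1..4.
fn skew_bucket(ts_epics_ns: u64) -> Option<u8> {
    let local = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before 1970");
    let ts_local_ns = local.as_secs() * SEC + local.subsec_nanos() as u64;
    let diff = ts_epics_ns.abs_diff(ts_local_ns);
    match diff {
        d if d > 300 * SEC => Some(4),
        d if d > 120 * SEC => Some(3),
        d if d > 20 * SEC => Some(2),
        d if d > 3 * SEC => Some(1),
        _ => None,
    }
}

fn main() {
    // Example value: roughly 2022-07-21 09:27 UTC expressed as EPICS-epoch seconds.
    let ts = epics_to_unix_ns(1_027_243_658, 123_456_789);
    match skew_bucket(ts) {
        Some(level) => println!("timestamp off, bucket ca_ts_off_{level}"),
        None => println!("timestamp within tolerance"),
    }
}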

View File

@@ -5,7 +5,7 @@ use log::*;
pub fn main() -> Result<(), Error> {
let opts = DaqIngestOpts::parse();
log::info!("daqingest version {}", clap::crate_version!());
info!("daqingest version {}", clap::crate_version!());
let runtime = taskrun::get_runtime_opts(opts.nworkers.unwrap_or(12), 32);
let res = runtime.block_on(async move {
match opts.subcmd {

View File

@@ -3,7 +3,6 @@ pub mod proto;
pub mod search;
pub mod store;
use self::conn::FindIocStream;
use self::store::DataStore;
use crate::store::{CommonInsertItemQueue, QueryItem};
use conn::CaConn;
@@ -15,20 +14,19 @@ use scylla::batch::Consistency;
use serde::{Deserialize, Serialize};
use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff};
use std::collections::BTreeMap;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::net::{Ipv4Addr, SocketAddrV4};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, Once};
use std::time::{Duration, Instant};
use std::time::Duration;
use tokio::fs::OpenOptions;
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use tokio_postgres::Client as PgClient;
static mut METRICS: Option<Mutex<Option<CaConnStatsAgg>>> = None;
static METRICS_ONCE: Once = Once::new();
fn get_metrics() -> &'static mut Option<CaConnStatsAgg> {
pub fn get_metrics() -> &'static mut Option<CaConnStatsAgg> {
METRICS_ONCE.call_once(|| unsafe {
METRICS = Some(Mutex::new(None));
});
@@ -58,6 +56,7 @@ struct ChannelConfig {
insert_queue_max: Option<usize>,
insert_item_queue_cap: Option<usize>,
api_bind: Option<String>,
local_epics_hostname: String,
}
pub struct ListenFromFileOpts {
@@ -70,16 +69,15 @@ pub async fn parse_config(config: PathBuf) -> Result<CaConnectOpts, Error> {
file.read_to_end(&mut buf).await?;
let mut conf: ChannelConfig =
serde_yaml::from_slice(&buf).map_err(|e| Error::with_msg_no_trace(format!("{:?}", e)))?;
let re1 = regex::Regex::new(&conf.whitelist)?;
let re2 = regex::Regex::new(&conf.blacklist)?;
let re_p = regex::Regex::new(&conf.whitelist)?;
let re_n = regex::Regex::new(&conf.blacklist)?;
conf.channels = conf
.channels
.into_iter()
.filter(|ch| {
if let Some(_cs) = re1.captures(&ch) {
//let m = cs.get(1).unwrap();
if let Some(_cs) = re_p.captures(&ch) {
true
} else if re2.is_match(&ch) {
} else if re_n.is_match(&ch) {
false
} else {
true
@@ -101,6 +99,7 @@ pub async fn parse_config(config: PathBuf) -> Result<CaConnectOpts, Error> {
insert_queue_max: conf.insert_queue_max.unwrap_or(32),
insert_item_queue_cap: conf.insert_item_queue_cap.unwrap_or(380000),
api_bind: conf.api_bind.unwrap_or_else(|| "0.0.0.0:3011".into()),
local_epics_hostname: conf.local_epics_hostname,
})
}
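The channel filter above keeps a channel when the whitelist regex matches, drops it only when the blacklist matches and the whitelist does not, and keeps everything that matches neither. A standalone sketch of that precedence, assuming the regex crate as used in the diff; is_match is used for both patterns here since the captures result is not otherwise used, and the example patterns and channel names are made up.

use regex::Regex;

/// Keep channels matching `whitelist`; of the rest, drop those matching `blacklist`.
fn filter_channels(channels: Vec<String>, whitelist: &str, blacklist: &str) -> Result<Vec<String>, regex::Error> {
    let re_p = Regex::new(whitelist)?;
    let re_n = Regex::new(blacklist)?;
    Ok(channels
        .into_iter()
        .filter(|ch| {
            if re_p.is_match(ch) {
                true
            } else if re_n.is_match(ch) {
                false
            } else {
                true
            }
        })
        .collect())
}

fn main() -> Result<(), regex::Error> {
    let chans = vec!["A:KEEP:1".to_string(), "B:TEST:2".to_string(), "C:OTHER:3".to_string()];
    let kept = filter_channels(chans, "KEEP", "TEST")?;
    println!("{kept:?}"); // ["A:KEEP:1", "C:OTHER:3"]
    Ok(())
}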
@@ -119,6 +118,7 @@ pub struct CaConnectOpts {
pub insert_queue_max: usize,
pub insert_item_queue_cap: usize,
pub api_bind: String,
pub local_epics_hostname: String,
}
async fn spawn_scylla_insert_workers(
@@ -235,7 +235,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
let insert_ivl_min = Arc::new(AtomicU64::new(8800));
let opts = parse_config(opts.config).await?;
let scyconf = opts.scyconf.clone();
tokio::spawn(start_metrics_service(
tokio::spawn(crate::metrics::start_metrics_service(
opts.api_bind.clone(),
insert_frac.clone(),
insert_ivl_min.clone(),
@@ -290,7 +290,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
],
)
.await
.map_err(|e| Error::with_msg_no_trace(format!("PG error: {e:?}")))?;
.map_err(|e| Error::with_msg_no_trace(format!("pg lookup error: {e:?}")))?;
if rows.is_empty() {
error!("can not find any addresses of channels {:?}", chstmp);
} else {
@@ -347,28 +347,11 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
let mut conn_stats = vec![];
info!("channels_by_host len {}", channels_by_host.len());
for (host, channels) in channels_by_host {
if false && host.ip() != &"172.26.24.76".parse::<Ipv4Addr>().unwrap() {
continue;
}
let data_store = data_store.clone();
//debug!("Create TCP connection to {:?}", (host.ip(), host.port()));
let addr = SocketAddrV4::new(host.ip().clone(), host.port());
// TODO establish the connection in the future SM.
let tcp = match tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr)).await {
Ok(Ok(k)) => k,
Ok(Err(e)) => {
error!("Can not connect to {addr:?} {e:?}");
continue;
}
Err(e) => {
error!("Can not connect to {addr:?} {e:?}");
continue;
}
};
local_stats.tcp_connected_inc();
let mut conn = CaConn::new(
tcp,
addr,
opts.local_epics_hostname.clone(),
data_store.clone(),
insert_item_queue.sender(),
opts.array_truncate,
@@ -385,8 +368,6 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
match item {
Ok(_) => {
stats2.conn_item_count_inc();
// TODO test if performance can be noticed:
//trace!("CaConn gives item: {k:?}");
}
Err(e) => {
error!("CaConn gives error: {e:?}");
@@ -435,39 +416,3 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
}
Ok(())
}
async fn start_metrics_service(bind_to: String, insert_frac: Arc<AtomicU64>, insert_ivl_min: Arc<AtomicU64>) {
let app = axum::Router::new()
.route(
"/metrics",
axum::routing::get(|| async {
let stats = get_metrics();
match stats {
Some(s) => {
trace!("Metrics");
s.prometheus()
}
None => {
trace!("Metrics empty");
String::new()
}
}
}),
)
.route(
"/insert_frac",
axum::routing::put(|v: axum::extract::Json<u64>| async move {
insert_frac.store(v.0, Ordering::Release);
}),
)
.route(
"/insert_ivl_min",
axum::routing::put(|v: axum::extract::Json<u64>| async move {
insert_ivl_min.store(v.0, Ordering::Release);
}),
);
axum::Server::bind(&bind_to.parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap()
}

View File

@@ -2,6 +2,7 @@ use super::proto::{self, CaItem, CaMsg, CaMsgTy, CaProto};
use super::store::DataStore;
use crate::bsread::ChannelDescDecoded;
use crate::ca::proto::{CreateChan, EventAdd, HeadInfo};
use crate::ca::store::ChannelRegistry;
use crate::series::{Existence, SeriesId};
use crate::store::{CommonInsertItemQueueSender, InsertItem, IvlItem, MuteItem, QueryItem};
use err::Error;
@@ -78,10 +79,17 @@ enum ChannelState {
}
enum CaConnState {
Unconnected,
Connecting(Pin<Box<dyn Future<Output = Result<TcpStream, Error>> + Send>>),
Init,
Listen,
PeerReady,
Done,
Wait(Pin<Box<dyn Future<Output = ()> + Send>>),
}
fn wait_fut(dt: u64) -> Pin<Box<dyn Future<Output = ()> + Send>> {
let fut = tokio::time::sleep(Duration::from_millis(dt));
Box::pin(fut)
}
struct IdStore {
@@ -103,7 +111,7 @@ impl IdStore {
#[allow(unused)]
pub struct CaConn {
state: CaConnState,
proto: CaProto,
proto: Option<CaProto>,
cid_store: IdStore,
ioid_store: IdStore,
subid_store: IdStore,
@@ -121,6 +129,8 @@ pub struct CaConn {
fut_get_series:
FuturesOrdered<Pin<Box<dyn Future<Output = Result<(u32, u32, u16, u16, Existence<SeriesId>), Error>> + Send>>>,
remote_addr_dbg: SocketAddrV4,
local_epics_hostname: String,
array_truncate: usize,
stats: Arc<CaConnStats>,
insert_queue_max: usize,
insert_ivl_min: Arc<AtomicU64>,
@@ -128,8 +138,8 @@ pub struct CaConn {
impl CaConn {
pub fn new(
tcp: TcpStream,
remote_addr_dbg: SocketAddrV4,
local_epics_hostname: String,
data_store: Arc<DataStore>,
insert_item_sender: CommonInsertItemQueueSender,
array_truncate: usize,
@@ -137,8 +147,8 @@ impl CaConn {
insert_ivl_min: Arc<AtomicU64>,
) -> Self {
Self {
state: CaConnState::Init,
proto: CaProto::new(tcp, remote_addr_dbg, array_truncate),
state: CaConnState::Unconnected,
proto: None,
cid_store: IdStore::new(),
ioid_store: IdStore::new(),
subid_store: IdStore::new(),
@@ -155,6 +165,8 @@ impl CaConn {
insert_item_send_fut: None,
fut_get_series: FuturesOrdered::new(),
remote_addr_dbg,
local_epics_hostname,
array_truncate,
stats: Arc::new(CaConnStats::new()),
insert_queue_max,
insert_ivl_min,
@@ -190,7 +202,6 @@ impl CaConn {
self.name_by_cid.get(&cid).map(|x| x.as_str())
}
#[inline(never)]
fn handle_insert_futs(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
use Poll::*;
loop {
@@ -246,18 +257,24 @@ impl CaConn {
if series.id() == 0 {
warn!("Weird series id: {series:?}");
}
if data_type > 6 {
error!("data type of series unexpected: {}", data_type);
}
let subid = self.subid_store.next();
self.cid_by_subid.insert(subid, cid);
let name = self.name_by_cid(cid).unwrap().to_string();
// TODO convert first to CaDbrType, set to `Time`, then convert to ix:
let data_type_asked = data_type + 14;
let msg = CaMsg {
ty: CaMsgTy::EventAdd(EventAdd {
sid,
data_type,
data_type: data_type_asked,
data_count,
subid,
}),
};
self.proto.push_out(msg);
let proto = self.proto.as_mut().unwrap();
proto.push_out(msg);
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
*ch_s = ChannelState::Created(CreatedState {
@@ -298,7 +315,6 @@ impl CaConn {
Ok(())
}
#[inline(never)]
fn event_add_insert(
&mut self,
series: SeriesId,
@@ -313,8 +329,8 @@ impl CaConn {
) -> Result<(), Error> {
// TODO decide on better msp/lsp: random offset!
// As long as one writer is active, the msp is arbitrary.
let ts_msp = if inserted_in_ts_msp > 2000 {
let ts_msp = ts / (60 * SEC) * (60 * SEC);
let ts_msp = if inserted_in_ts_msp > 20000 {
let ts_msp = ts / (10 * SEC) * (10 * SEC);
if let ChannelState::Created(st) = self.channels.get_mut(&cid).unwrap() {
st.ts_msp_last = ts_msp;
st.inserted_in_ts_msp = 1;
@@ -351,7 +367,7 @@ impl CaConn {
pulse: 0,
scalar_type,
shape,
val: ev.value,
val: ev.value.data,
ts_msp_grid,
};
item_queue.push_back(QueryItem::Insert(item));
@@ -359,17 +375,17 @@ impl CaConn {
Ok(())
}
#[inline(never)]
fn handle_event_add_res(&mut self, ev: proto::EventAddRes) -> Result<(), Error> {
fn handle_event_add_res(&mut self, ev: proto::EventAddRes, tsnow: Instant) -> Result<(), Error> {
// TODO handle subid-not-found which can also be peer error:
let cid = *self.cid_by_subid.get(&ev.subid).unwrap();
//let name = self.name_by_cid(cid).unwrap().to_string();
// TODO get rid of the string clone when I don't want the log output any longer:
//let name: String = self.name_by_cid(cid).unwrap().into();
// TODO handle not-found error:
let mut series_2 = None;
let ch_s = self.channels.get_mut(&cid).unwrap();
match ch_s {
ChannelState::Created(st) => {
st.item_recv_ivl_ema.tick(Instant::now());
let scalar_type = st.scalar_type.clone();
let shape = st.shape.clone();
match st.state {
@@ -401,12 +417,24 @@ impl CaConn {
return Err(format!("no series id on insert").into());
}
};
let ts = {
let ts_local = {
let ts = SystemTime::now();
let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
epoch.as_secs() * SEC + epoch.subsec_nanos() as u64
};
let tsnow = Instant::now();
let ts = ev.value.ts.map_or(0, |x| x.get());
let ts_diff = ts.abs_diff(ts_local);
if ts_diff > SEC * 300 {
self.stats.ca_ts_off_4_inc();
//warn!("Bad time for {name} {ts} vs {ts_local} diff {}", ts_diff / SEC);
// TODO mute this channel for some time, discard the event.
} else if ts_diff > SEC * 120 {
self.stats.ca_ts_off_3_inc();
} else if ts_diff > SEC * 20 {
self.stats.ca_ts_off_2_inc();
} else if ts_diff > SEC * 3 {
self.stats.ca_ts_off_1_inc();
}
if tsnow >= st.insert_next_earliest {
st.muted_before = 0;
st.insert_item_ivl_ema.tick(tsnow);
@@ -441,7 +469,7 @@ impl CaConn {
)?;
} else {
self.stats.channel_fast_item_drop_inc();
if tsnow.duration_since(st.insert_recv_ivl_last) >= Duration::from_millis(2000) {
if tsnow.duration_since(st.insert_recv_ivl_last) >= Duration::from_millis(10000) {
st.insert_recv_ivl_last = tsnow;
let ema = st.insert_item_ivl_ema.ema();
let item = IvlItem {
@@ -477,10 +505,9 @@ impl CaConn {
Pending
Ready(no-more-work, something-was-done, error)
*/
#[inline(never)]
fn handle_conn_listen(&mut self, cx: &mut Context) -> Poll<Option<Result<(), Error>>> {
use Poll::*;
match self.proto.poll_next_unpin(cx) {
match self.proto.as_mut().unwrap().poll_next_unpin(cx) {
Ready(Some(k)) => match k {
Ok(k) => match k {
CaItem::Empty => {
@@ -513,14 +540,14 @@ impl CaConn {
},
Ready(None) => {
warn!("CaProto is done {:?}", self.remote_addr_dbg);
self.state = CaConnState::Done;
self.state = CaConnState::Wait(wait_fut(10000));
self.proto = None;
Ready(None)
}
Pending => Pending,
}
}
#[inline(never)]
fn check_channels_state_init(&mut self, msgs_tmp: &mut Vec<CaMsg>) -> Result<(), Error> {
// TODO profile, efficient enough?
if self.init_state_count == 0 {
@@ -561,7 +588,6 @@ impl CaConn {
// Can return:
// Pending, error, work-done (pending state unknown), no-more-work-ever-again.
#[inline(never)]
fn handle_peer_ready(&mut self, cx: &mut Context) -> Poll<Option<Result<(), Error>>> {
use Poll::*;
let mut ts1 = Instant::now();
@@ -578,11 +604,15 @@ impl CaConn {
//info!("msgs_tmp.len() {}", msgs_tmp.len());
do_wake_again = true;
}
// TODO be careful to not overload outgoing message queue.
for msg in msgs_tmp {
self.proto.push_out(msg);
{
let proto = self.proto.as_mut().unwrap();
// TODO be careful to not overload outgoing message queue.
for msg in msgs_tmp {
proto.push_out(msg);
}
}
let res = match self.proto.poll_next_unpin(cx) {
let tsnow = Instant::now();
let res = match self.proto.as_mut().unwrap().poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
match k {
CaItem::Msg(k) => {
@@ -599,6 +629,12 @@ impl CaConn {
// TODO handle error:
let name = self.name_by_cid(cid).unwrap().to_string();
debug!("CreateChanRes {name:?}");
if false && name.contains(".STAT") {
info!("Channel created for {}", name);
}
if k.data_type > 6 {
error!("CreateChanRes with unexpected data_type {}", k.data_type);
}
let scalar_type = ScalarType::from_ca_id(k.data_type)?;
let shape = Shape::from_ca_count(k.data_count)?;
// TODO handle not-found error:
@@ -608,15 +644,15 @@ impl CaConn {
sid,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
ts_created: Instant::now(),
ts_created: tsnow,
state: MonitoringState::FetchSeriesId,
ts_msp_last: 0,
ts_msp_grid_last: 0,
inserted_in_ts_msp: u64::MAX,
insert_item_ivl_ema: IntervalEma::new(),
item_recv_ivl_ema: IntervalEma::new(),
insert_recv_ivl_last: Instant::now(),
insert_next_earliest: Instant::now(),
insert_recv_ivl_last: tsnow,
insert_next_earliest: tsnow,
muted_before: 0,
});
// TODO handle error in different way. Should most likely not abort.
@@ -629,10 +665,10 @@ impl CaConn {
byte_order: netpod::ByteOrder::LE,
compression: None,
};
let y = unsafe { &*(&self as &Self as *const CaConn) };
let fut = y
.data_store
.chan_reg
let z = unsafe {
&*(&self.data_store.chan_reg as &ChannelRegistry as *const ChannelRegistry)
};
let fut = z
.get_series_id(cd)
.map_ok(move |series| (cid, k.sid, k.data_type, k.data_count, series));
// TODO throttle execution rate:
@@ -640,7 +676,7 @@ impl CaConn {
do_wake_again = true;
}
CaMsgTy::EventAddRes(k) => {
let res = Self::handle_event_add_res(self, k);
let res = Self::handle_event_add_res(self, k, tsnow);
let ts2 = Instant::now();
self.stats
.time_handle_event_add_res
@@ -662,7 +698,8 @@ impl CaConn {
}
Ready(None) => {
warn!("CaProto is done");
self.state = CaConnState::Done;
self.state = CaConnState::Wait(wait_fut(10000));
self.proto = None;
Ready(None)
}
Pending => Pending,
@@ -697,28 +734,63 @@ impl Stream for CaConn {
Pending => break Pending,
}
self.handle_get_series_futs(cx)?;
let ts2 = Instant::now();
self.stats
.poll_time_get_series_futs
.fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::AcqRel);
ts1 = ts2;
if self.insert_item_queue.len() >= self.insert_queue_max {
break Pending;
}
break loop {
break match &self.state {
break match &mut self.state {
CaConnState::Unconnected => {
let addr = self.remote_addr_dbg.clone();
let fut = async move {
trace!("create tcp connection to {:?}", (addr.ip(), addr.port()));
match tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr)).await {
Ok(Ok(k)) => Ok(k),
Ok(Err(e)) => {
error!("Can not connect to {addr:?} {e:?}");
Err(e.into())
}
Err(e) => {
error!("Can not connect to {addr:?} {e:?}");
Err(Error::with_msg_no_trace(format!("timeout")))
}
}
};
self.state = CaConnState::Connecting(Box::pin(fut));
continue 'outer;
}
CaConnState::Connecting(ref mut fut) => {
match fut.poll_unpin(cx) {
Ready(Ok(tcp)) => {
let proto = CaProto::new(tcp, self.remote_addr_dbg.clone(), self.array_truncate);
self.state = CaConnState::Init;
self.proto = Some(proto);
continue 'outer;
}
Ready(Err(e)) => {
error!("Connection error: {e:?}");
// We can not connect to the remote.
// TODO do exponential backoff.
self.state = CaConnState::Wait(wait_fut(10000));
self.proto = None;
continue 'outer;
}
Pending => Pending,
}
}
CaConnState::Init => {
let hostname = self.local_epics_hostname.clone();
let proto = self.proto.as_mut().unwrap();
let msg = CaMsg { ty: CaMsgTy::Version };
self.proto.push_out(msg);
proto.push_out(msg);
let msg = CaMsg {
ty: CaMsgTy::ClientName,
};
self.proto.push_out(msg);
let msg = CaMsg { ty: CaMsgTy::HostName };
self.proto.push_out(msg);
proto.push_out(msg);
let msg = CaMsg {
ty: CaMsgTy::HostName(hostname),
};
proto.push_out(msg);
self.state = CaConnState::Listen;
continue 'outer;
}
@@ -737,6 +809,14 @@ impl Stream for CaConn {
Pending => Pending,
},
CaConnState::PeerReady => {
{
self.handle_get_series_futs(cx)?;
let ts2 = Instant::now();
self.stats
.poll_time_get_series_futs
.fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::AcqRel);
ts1 = ts2;
}
let res = self.handle_peer_ready(cx);
let ts2 = Instant::now();
self.stats.time_handle_peer_ready_dur(ts2.duration_since(ts1));
@@ -757,10 +837,14 @@ impl Stream for CaConn {
Pending => Pending,
}
}
CaConnState::Done => {
// TODO handle better
Pending
}
CaConnState::Wait(inst) => match inst.poll_unpin(cx) {
Ready(_) => {
self.state = CaConnState::Unconnected;
self.proto = None;
continue 'outer;
}
Pending => Pending,
},
};
};
};
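The new CaConnState::{Unconnected, Connecting, Wait} states above turn the connection into a retry loop: a failed or dropped TCP connection now leads back to Unconnected after a pause instead of ending in Done. A simplified standalone sketch of the same idea outside the Stream poll loop, assuming tokio with the rt and net features, the 500 ms connect timeout and the flat 10 s wait from wait_fut(10000); the exponential backoff mentioned in the TODO is likewise not implemented here, and the address in main is a hypothetical IOC endpoint.

use std::net::SocketAddrV4;
use std::time::Duration;
use tokio::net::TcpStream;

/// Keep trying to establish the TCP connection, sleeping between attempts.
/// Mirrors Unconnected -> Connecting -> (Init on success | Wait on error).
async fn connect_with_retry(addr: SocketAddrV4) -> TcpStream {
    loop {
        match tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr)).await {
            Ok(Ok(tcp)) => return tcp,
            Ok(Err(e)) => eprintln!("can not connect to {addr:?} {e:?}"),
            Err(_elapsed) => eprintln!("connect to {addr:?} timed out"),
        }
        // Same flat 10 s pause as wait_fut(10000); a real version might back off exponentially.
        tokio::time::sleep(Duration::from_millis(10000)).await;
    }
}

#[tokio::main]
async fn main() {
    let addr: SocketAddrV4 = "127.0.0.1:5064".parse().unwrap(); // hypothetical IOC address
    let _tcp = connect_with_retry(addr).await;
}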

View File

@@ -2,14 +2,17 @@ use crate::netbuf::NetBuf;
use err::Error;
use futures_util::{pin_mut, Stream};
use log::*;
use netpod::timeunits::*;
use std::collections::{BTreeMap, VecDeque};
use std::net::SocketAddrV4;
use std::num::{NonZeroU16, NonZeroU64};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;
const CA_PROTO_VERSION: u16 = 13;
const EPICS_EPOCH_OFFSET: u64 = 631152000;
#[derive(Debug)]
pub struct Search {
@@ -69,7 +72,7 @@ pub struct EventAddRes {
pub data_count: u16,
pub status: u32,
pub subid: u32,
pub value: CaDataValue,
pub value: CaEventValue,
}
#[derive(Debug)]
@@ -99,6 +102,49 @@ enum CaScalarType {
String,
}
#[derive(Debug)]
enum CaDbrMetaType {
Plain,
Status,
Time,
}
#[derive(Debug)]
pub struct CaDbrType {
meta: CaDbrMetaType,
scalar_type: CaScalarType,
}
impl CaDbrType {
pub fn from_ca_u16(k: u16) -> Result<Self, Error> {
if k > 20 {
return Err(Error::with_msg_no_trace(format!(
"can not understand ca dbr type id {}",
k
)));
}
let (meta, k) = if k >= 14 {
(CaDbrMetaType::Time, k - 14)
} else if k >= 7 {
(CaDbrMetaType::Status, k - 7)
} else {
(CaDbrMetaType::Plain, k)
};
use CaScalarType::*;
let scalar_type = match k {
4 => I8,
1 => I16,
5 => I32,
2 => F32,
6 => F64,
3 => Enum,
0 => String,
k => return Err(Error::with_msg_no_trace(format!("bad ca scalar type id: {k}"))),
};
Ok(CaDbrType { meta, scalar_type })
}
}
#[derive(Clone, Debug)]
pub enum CaDataScalarValue {
I8(i8),
@@ -125,21 +171,12 @@ pub enum CaDataValue {
Array(CaDataArrayValue),
}
impl CaScalarType {
fn from_ca_u16(k: u16) -> Result<Self, Error> {
use CaScalarType::*;
let ret = match k {
4 => I8,
1 => I16,
5 => I32,
2 => F32,
6 => F64,
3 => Enum,
0 => String,
k => return Err(Error::with_msg_no_trace(format!("bad dbr type id: {k}"))),
};
Ok(ret)
}
#[derive(Clone, Debug)]
pub struct CaEventValue {
pub ts: Option<NonZeroU64>,
pub status: Option<NonZeroU16>,
pub severity: Option<NonZeroU16>,
pub data: CaDataValue,
}
#[derive(Debug)]
@@ -148,7 +185,7 @@ pub enum CaMsgTy {
VersionRes(u16),
ClientName,
ClientNameRes(ClientNameRes),
HostName,
HostName(String),
Search(Search),
SearchRes(SearchRes),
CreateChan(CreateChan),
@@ -169,7 +206,7 @@ impl CaMsgTy {
VersionRes(_) => 0,
ClientName => 0x14,
ClientNameRes(_) => 0x14,
HostName => 0x15,
HostName(_) => 0x15,
Search(_) => 0x06,
SearchRes(_) => 0x06,
CreateChan(_) => 0x12,
@@ -194,7 +231,7 @@ impl CaMsgTy {
VersionRes(_) => 0,
ClientName => 0x10,
ClientNameRes(x) => (x.name.len() + 1 + 7) / 8 * 8,
HostName => 0x18,
HostName(_) => 0x18,
Search(x) => (x.channel.len() + 1 + 7) / 8 * 8,
SearchRes(_) => 8,
CreateChan(x) => (x.channel.len() + 1 + 7) / 8 * 8,
@@ -221,7 +258,7 @@ impl CaMsgTy {
VersionRes(n) => *n,
ClientName => 0,
ClientNameRes(_) => 0,
HostName => 0,
HostName(_) => 0,
Search(_) => {
// Reply-flag
1
@@ -245,7 +282,7 @@ impl CaMsgTy {
VersionRes(_) => 0,
ClientName => 0,
ClientNameRes(_) => 0,
HostName => 0,
HostName(_) => 0,
Search(_) => CA_PROTO_VERSION,
SearchRes(_) => 0,
CreateChan(_) => 0,
@@ -266,7 +303,7 @@ impl CaMsgTy {
VersionRes(_) => 0,
ClientName => 0,
ClientNameRes(_) => 0,
HostName => 0,
HostName(_) => 0,
Search(e) => e.id,
SearchRes(x) => x.addr,
CreateChan(x) => x.cid,
@@ -287,7 +324,7 @@ impl CaMsgTy {
VersionRes(_) => 0,
ClientName => 0,
ClientNameRes(_) => 0,
HostName => 0,
HostName(_) => 0,
Search(e) => e.id,
SearchRes(x) => x.id,
CreateChan(_) => CA_PROTO_VERSION as _,
@@ -317,9 +354,8 @@ impl CaMsgTy {
error!("should not attempt to write ClientNameRes");
panic!();
}
HostName => {
// TODO allow variable host name. Null-extend always to 8 byte align.
let s = "sf-nube-11.psi.ch".as_bytes();
HostName(name) => {
let s = name.as_bytes();
let n = s.len();
buf.fill(0);
buf[..n].copy_from_slice(s);
@@ -364,6 +400,39 @@ impl CaMsgTy {
}
}
macro_rules! convert_scalar_value {
($st:ty, $var:ident, $buf:expr) => {{
type ST = $st;
const STL: usize = std::mem::size_of::<ST>();
if $buf.len() < STL {
return Err(Error::with_msg_no_trace(format!(
"not enough payload for {} {}",
std::any::type_name::<ST>(),
$buf.len()
)));
}
let v = ST::from_be_bytes($buf[..STL].try_into()?);
CaDataValue::Scalar(CaDataScalarValue::$var(v))
}};
}
macro_rules! convert_wave_value {
($st:ty, $var:ident, $n:expr, $buf:expr) => {{
type ST = $st;
const STL: usize = std::mem::size_of::<ST>();
let nn = $n.min($buf.len() / STL);
let mut a = Vec::with_capacity(nn);
// TODO optimize with unsafe?
let mut bb = &$buf[..];
for _ in 0..nn {
let v = ST::from_be_bytes(bb[..STL].try_into()?);
bb = &bb[STL..];
a.push(v);
}
CaDataValue::Array(CaDataArrayValue::$var(a))
}};
}
#[derive(Debug)]
pub struct CaMsg {
pub ty: CaMsgTy,
@@ -406,6 +475,48 @@ impl CaMsg {
}
}
fn ca_scalar_value(scalar_type: &CaScalarType, buf: &[u8]) -> Result<CaDataValue, Error> {
let val = match scalar_type {
CaScalarType::I8 => convert_scalar_value!(i8, I8, buf),
CaScalarType::I16 => convert_scalar_value!(i16, I16, buf),
CaScalarType::I32 => convert_scalar_value!(i32, I32, buf),
CaScalarType::F32 => convert_scalar_value!(f32, F32, buf),
CaScalarType::F64 => convert_scalar_value!(f64, F64, buf),
CaScalarType::Enum => convert_scalar_value!(i16, I16, buf),
CaScalarType::String => {
// TODO constrain string length to the CA `data_count`.
let mut ixn = buf.len();
for (i, &c) in buf.iter().enumerate() {
if c == 0 {
ixn = i;
break;
}
}
//info!("try to read string from payload len {} ixn {}", buf.len(), ixn);
let v = String::from_utf8_lossy(&buf[..ixn]);
CaDataValue::Scalar(CaDataScalarValue::String(v.into()))
}
};
Ok(val)
}
fn ca_wave_value(scalar_type: &CaScalarType, n: usize, buf: &[u8]) -> Result<CaDataValue, Error> {
let val = match scalar_type {
CaScalarType::I8 => convert_wave_value!(i8, I8, n, buf),
CaScalarType::I16 => convert_wave_value!(i16, I16, n, buf),
CaScalarType::I32 => convert_wave_value!(i32, I32, n, buf),
CaScalarType::F32 => convert_wave_value!(f32, F32, n, buf),
CaScalarType::F64 => convert_wave_value!(f64, F64, n, buf),
_ => {
warn!("TODO conversion array {scalar_type:?}");
return Err(Error::with_msg_no_trace(format!(
"can not yet handle conversion of type array {scalar_type:?}"
)));
}
};
Ok(val)
}
pub fn from_proto_infos(hi: &HeadInfo, payload: &[u8], array_truncate: usize) -> Result<Self, Error> {
let msg = match hi.cmdid {
0 => CaMsg {
@@ -420,7 +531,9 @@ impl CaMsg {
}
}
// TODO make response type for host name:
21 => CaMsg { ty: CaMsgTy::HostName },
21 => CaMsg {
ty: CaMsgTy::HostName("TODOx5288".into()),
},
6 => {
if hi.payload_size != 8 {
warn!("protocol error: search result is expected with fixed payload size 8");
@@ -469,174 +582,43 @@ impl CaMsg {
}
1 => {
use netpod::Shape;
let ca_st = CaScalarType::from_ca_u16(hi.data_type)?;
let ca_dbr_ty = CaDbrType::from_ca_u16(hi.data_type)?;
if let CaDbrMetaType::Time = ca_dbr_ty.meta {
} else {
return Err(Error::with_msg_no_trace(format!(
"expect ca dbr time type, got: {:?}",
ca_dbr_ty
)));
}
if payload.len() < 12 {
return Err(Error::with_msg_no_trace(format!(
"not enough payload for time metadata {}",
payload.len()
)));
}
let ca_status = u16::from_be_bytes(payload[0..2].try_into()?);
let ca_severity = u16::from_be_bytes(payload[2..4].try_into()?);
let ca_secs = u32::from_be_bytes(payload[4..8].try_into()?);
let ca_nanos = u32::from_be_bytes(payload[8..12].try_into()?);
let ca_sh = Shape::from_ca_count(hi.data_count)?;
let valbuf = &payload[12..];
let value = match ca_sh {
Shape::Scalar => match ca_st {
CaScalarType::I8 => {
type ST = i8;
const STL: usize = std::mem::size_of::<ST>();
if payload.len() < STL {
return Err(Error::with_msg_no_trace(format!(
"not enough payload for i8 {}",
payload.len()
)));
}
let v = ST::from_be_bytes(payload[..STL].try_into()?);
CaDataValue::Scalar(CaDataScalarValue::I8(v))
}
CaScalarType::I16 => {
type ST = i16;
const STL: usize = std::mem::size_of::<ST>();
if payload.len() < STL {
return Err(Error::with_msg_no_trace(format!(
"not enough payload for i16 {}",
payload.len()
)));
}
let v = ST::from_be_bytes(payload[..STL].try_into()?);
CaDataValue::Scalar(CaDataScalarValue::I16(v))
}
CaScalarType::I32 => {
type ST = i32;
const STL: usize = std::mem::size_of::<ST>();
if payload.len() < STL {
return Err(Error::with_msg_no_trace(format!(
"not enough payload for i32 {}",
payload.len()
)));
}
let v = ST::from_be_bytes(payload[..STL].try_into()?);
CaDataValue::Scalar(CaDataScalarValue::I32(v))
}
CaScalarType::F32 => {
type ST = f32;
const STL: usize = std::mem::size_of::<ST>();
if payload.len() < STL {
return Err(Error::with_msg_no_trace(format!(
"not enough payload for f32 {}",
payload.len()
)));
}
let v = ST::from_be_bytes(payload[..STL].try_into()?);
CaDataValue::Scalar(CaDataScalarValue::F32(v))
}
CaScalarType::F64 => {
type ST = f64;
const STL: usize = std::mem::size_of::<ST>();
if payload.len() < STL {
return Err(Error::with_msg_no_trace(format!(
"not enough payload for f64 {}",
payload.len()
)));
}
let v = ST::from_be_bytes(payload[..STL].try_into()?);
CaDataValue::Scalar(CaDataScalarValue::F64(v))
}
CaScalarType::Enum => {
type ST = i16;
const STL: usize = std::mem::size_of::<ST>();
if payload.len() < STL {
return Err(Error::with_msg_no_trace(format!(
"not enough payload for i16 {}",
payload.len()
)));
}
let v = ST::from_be_bytes(payload[..STL].try_into()?);
CaDataValue::Scalar(CaDataScalarValue::I16(v))
}
CaScalarType::String => {
// TODO constrain string length to the CA `data_count`.
let mut ixn = payload.len();
for (i, &c) in payload.iter().enumerate() {
if c == 0 {
ixn = i;
break;
}
}
//info!("try to read string from payload len {} ixn {}", payload.len(), ixn);
let v = String::from_utf8_lossy(&payload[..ixn]);
CaDataValue::Scalar(CaDataScalarValue::String(v.into()))
}
},
Shape::Wave(n) => match ca_st {
CaScalarType::I8 => {
type ST = i8;
const STL: usize = std::mem::size_of::<ST>();
let nn = (n as usize).min(payload.len() / STL).min(array_truncate);
let mut a = Vec::with_capacity(nn);
let mut bb = &payload[..];
for _ in 0..nn {
let v = ST::from_be_bytes(bb[..STL].try_into()?);
bb = &bb[STL..];
a.push(v);
}
CaDataValue::Array(CaDataArrayValue::I8(a))
}
CaScalarType::I16 => {
type ST = i16;
const STL: usize = std::mem::size_of::<ST>();
let nn = (n as usize).min(payload.len() / STL).min(array_truncate);
let mut a = Vec::with_capacity(nn);
let mut bb = &payload[..];
for _ in 0..nn {
let v = ST::from_be_bytes(bb[..STL].try_into()?);
bb = &bb[STL..];
a.push(v);
}
CaDataValue::Array(CaDataArrayValue::I16(a))
}
CaScalarType::I32 => {
type ST = i32;
const STL: usize = std::mem::size_of::<ST>();
let nn = (n as usize).min(payload.len() / STL).min(array_truncate);
let mut a = Vec::with_capacity(nn);
let mut bb = &payload[..];
for _ in 0..nn {
let v = ST::from_be_bytes(bb[..STL].try_into()?);
bb = &bb[STL..];
a.push(v);
}
CaDataValue::Array(CaDataArrayValue::I32(a))
}
CaScalarType::F32 => {
type ST = f32;
const STL: usize = std::mem::size_of::<ST>();
let nn = (n as usize).min(payload.len() / STL).min(array_truncate);
let mut a = Vec::with_capacity(nn);
let mut bb = &payload[..];
for _ in 0..nn {
let v = ST::from_be_bytes(bb[..STL].try_into()?);
bb = &bb[STL..];
a.push(v);
}
CaDataValue::Array(CaDataArrayValue::F32(a))
}
CaScalarType::F64 => {
type ST = f64;
const STL: usize = std::mem::size_of::<ST>();
let nn = (n as usize).min(payload.len() / STL).min(array_truncate);
let mut a = Vec::with_capacity(nn);
let mut bb = &payload[..];
for _ in 0..nn {
let v = ST::from_be_bytes(bb[..STL].try_into()?);
bb = &bb[STL..];
a.push(v);
}
CaDataValue::Array(CaDataArrayValue::F64(a))
}
_ => {
warn!("TODO handle array {ca_st:?}");
return Err(Error::with_msg_no_trace(format!(
"can not yet handle type array {ca_st:?}"
)));
}
},
Shape::Scalar => Self::ca_scalar_value(&ca_dbr_ty.scalar_type, valbuf)?,
Shape::Wave(n) => {
Self::ca_wave_value(&ca_dbr_ty.scalar_type, (n as usize).min(array_truncate), valbuf)?
}
Shape::Image(_, _) => {
error!("Can not get Image from CA");
error!("Can not handle image from channel access");
err::todoval()
}
};
let ts = SEC * (ca_secs as u64 + EPICS_EPOCH_OFFSET) + ca_nanos as u64;
let value = CaEventValue {
ts: NonZeroU64::new(ts),
status: NonZeroU16::new(ca_status),
severity: NonZeroU16::new(ca_severity),
data: value,
};
let d = EventAddRes {
data_type: hi.data_type,
data_count: hi.data_count,
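The proto changes above classify DBR type ids into Plain (0..=6), Status (7..=13) and Time (14..=20) before splitting off the scalar type, while the request side in conn.rs asks for the Time variant by adding 14 to the base id. A small self-contained sketch of that id arithmetic, assuming the standard DBR numbering; the enum and function names are local to the example and only the meta class plus base scalar id are returned, not the full CaDbrType.

/// DBR meta classes as encoded in the Channel Access type id ranges.
#[derive(Debug, PartialEq)]
enum Meta {
    Plain,  // ids 0..=6
    Status, // ids 7..=13
    Time,   // ids 14..=20
}

/// Split a DBR type id into its meta class and the base scalar id (0..=6).
fn classify(id: u16) -> Option<(Meta, u16)> {
    match id {
        0..=6 => Some((Meta::Plain, id)),
        7..=13 => Some((Meta::Status, id - 7)),
        14..=20 => Some((Meta::Time, id - 14)),
        _ => None,
    }
}

fn main() {
    // A channel reported as plain DBR_DOUBLE (id 6) is subscribed as the Time variant:
    let data_type = 6u16;
    let data_type_asked = data_type + 14; // 20, as in the EventAdd request above
    assert_eq!(classify(data_type_asked), Some((Meta::Time, 6)));
    println!("asked for id {data_type_asked} -> {:?}", classify(data_type_asked));
}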

netfetch/src/metrics.rs (new file, 39 lines)
View File

@@ -0,0 +1,39 @@
use log::*;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
pub async fn start_metrics_service(bind_to: String, insert_frac: Arc<AtomicU64>, insert_ivl_min: Arc<AtomicU64>) {
let app = axum::Router::new()
.route(
"/metrics",
axum::routing::get(|| async {
let stats = crate::ca::get_metrics();
match stats {
Some(s) => {
trace!("Metrics");
s.prometheus()
}
None => {
trace!("Metrics empty");
String::new()
}
}
}),
)
.route(
"/insert_frac",
axum::routing::put(|v: axum::extract::Json<u64>| async move {
insert_frac.store(v.0, Ordering::Release);
}),
)
.route(
"/insert_ivl_min",
axum::routing::put(|v: axum::extract::Json<u64>| async move {
insert_ivl_min.store(v.0, Ordering::Release);
}),
);
axum::Server::bind(&bind_to.parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap()
}

View File

@@ -2,6 +2,7 @@ pub mod bsread;
pub mod ca;
pub mod channelwriter;
pub mod errconv;
pub mod metrics;
pub mod netbuf;
pub mod series;
pub mod store;

View File

@@ -15,6 +15,10 @@ pub enum Existence<T> {
pub struct SeriesId(u64);
impl SeriesId {
pub fn new(id: u64) -> Self {
Self(id)
}
pub fn id(&self) -> u64 {
self.0
}

View File

@@ -160,6 +160,10 @@ stats_proc::stats_struct!((
conn_item_count,
conn_stream_ready,
conn_stream_pending,
ca_ts_off_1,
ca_ts_off_2,
ca_ts_off_3,
ca_ts_off_4,
),
),
agg(name(CaConnStatsAgg), parent(CaConnStats)),