diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs
index 52e0588..d8abdbd 100644
--- a/daqingest/src/bin/daqingest.rs
+++ b/daqingest/src/bin/daqingest.rs
@@ -5,7 +5,7 @@ use log::*;
 pub fn main() -> Result<(), Error> {
     let opts = DaqIngestOpts::parse();
-    log::info!("daqingest version {}", clap::crate_version!());
+    info!("daqingest version {}", clap::crate_version!());
     let runtime = taskrun::get_runtime_opts(opts.nworkers.unwrap_or(12), 32);
     let res = runtime.block_on(async move {
         match opts.subcmd {
diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs
index b033d9e..4daa108 100644
--- a/netfetch/src/ca.rs
+++ b/netfetch/src/ca.rs
@@ -3,7 +3,6 @@ pub mod proto;
 pub mod search;
 pub mod store;
 
-use self::conn::FindIocStream;
 use self::store::DataStore;
 use crate::store::{CommonInsertItemQueue, QueryItem};
 use conn::CaConn;
@@ -15,20 +14,19 @@ use scylla::batch::Consistency;
 use serde::{Deserialize, Serialize};
 use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff};
 use std::collections::BTreeMap;
-use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
+use std::net::{Ipv4Addr, SocketAddrV4};
 use std::path::PathBuf;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::{Arc, Mutex, Once};
-use std::time::{Duration, Instant};
+use std::time::Duration;
 use tokio::fs::OpenOptions;
 use tokio::io::AsyncReadExt;
-use tokio::net::TcpStream;
 use tokio_postgres::Client as PgClient;
 
 static mut METRICS: Option>> = None;
 static METRICS_ONCE: Once = Once::new();
 
-fn get_metrics() -> &'static mut Option {
+pub fn get_metrics() -> &'static mut Option {
     METRICS_ONCE.call_once(|| unsafe {
         METRICS = Some(Mutex::new(None));
     });
@@ -58,6 +56,7 @@ struct ChannelConfig {
     insert_queue_max: Option<usize>,
     insert_item_queue_cap: Option<usize>,
     api_bind: Option<String>,
+    local_epics_hostname: String,
 }
 
 pub struct ListenFromFileOpts {
@@ -70,16 +69,15 @@ pub async fn parse_config(config: PathBuf) -> Result<CaConnectOpts, Error> {
     file.read_to_end(&mut buf).await?;
     let mut conf: ChannelConfig = serde_yaml::from_slice(&buf).map_err(|e| Error::with_msg_no_trace(format!("{:?}", e)))?;
-    let re1 = regex::Regex::new(&conf.whitelist)?;
-    let re2 = regex::Regex::new(&conf.blacklist)?;
+    let re_p = regex::Regex::new(&conf.whitelist)?;
+    let re_n = regex::Regex::new(&conf.blacklist)?;
     conf.channels = conf
         .channels
         .into_iter()
         .filter(|ch| {
-            if let Some(_cs) = re1.captures(&ch) {
-                //let m = cs.get(1).unwrap();
+            if let Some(_cs) = re_p.captures(&ch) {
                 true
-            } else if re2.is_match(&ch) {
+            } else if re_n.is_match(&ch) {
                 false
             } else {
                 true
@@ -101,6 +99,7 @@ pub async fn parse_config(config: PathBuf) -> Result<CaConnectOpts, Error> {
         insert_queue_max: conf.insert_queue_max.unwrap_or(32),
         insert_item_queue_cap: conf.insert_item_queue_cap.unwrap_or(380000),
         api_bind: conf.api_bind.unwrap_or_else(|| "0.0.0.0:3011".into()),
+        local_epics_hostname: conf.local_epics_hostname,
     })
 }
 
@@ -119,6 +118,7 @@ pub struct CaConnectOpts {
     pub insert_queue_max: usize,
     pub insert_item_queue_cap: usize,
    pub api_bind: String,
+    pub local_epics_hostname: String,
 }
 
 async fn spawn_scylla_insert_workers(
@@ -235,7 +235,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
     let insert_ivl_min = Arc::new(AtomicU64::new(8800));
     let opts = parse_config(opts.config).await?;
     let scyconf = opts.scyconf.clone();
-    tokio::spawn(start_metrics_service(
+    tokio::spawn(crate::metrics::start_metrics_service(
         opts.api_bind.clone(),
         insert_frac.clone(),
         insert_ivl_min.clone(),
@@ -290,7 +290,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
             ],
         )
         .await
-        .map_err(|e| Error::with_msg_no_trace(format!("PG error: {e:?}")))?;
+        .map_err(|e| Error::with_msg_no_trace(format!("pg lookup error: {e:?}")))?;
         if rows.is_empty() {
             error!("can not find any addresses of channels {:?}", chstmp);
         } else {
@@ -347,28 +347,11 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
     let mut conn_stats = vec![];
     info!("channels_by_host len {}", channels_by_host.len());
     for (host, channels) in channels_by_host {
-        if false && host.ip() != &"172.26.24.76".parse::<Ipv4Addr>().unwrap() {
-            continue;
-        }
         let data_store = data_store.clone();
-        //debug!("Create TCP connection to {:?}", (host.ip(), host.port()));
         let addr = SocketAddrV4::new(host.ip().clone(), host.port());
-        // TODO establish the connection in the future SM.
-        let tcp = match tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr)).await {
-            Ok(Ok(k)) => k,
-            Ok(Err(e)) => {
-                error!("Can not connect to {addr:?} {e:?}");
-                continue;
-            }
-            Err(e) => {
-                error!("Can not connect to {addr:?} {e:?}");
-                continue;
-            }
-        };
-        local_stats.tcp_connected_inc();
         let mut conn = CaConn::new(
-            tcp,
             addr,
+            opts.local_epics_hostname.clone(),
             data_store.clone(),
             insert_item_queue.sender(),
             opts.array_truncate,
@@ -385,8 +368,6 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
             match item {
                 Ok(_) => {
                     stats2.conn_item_count_inc();
-                    // TODO test if performance can be noticed:
-                    //trace!("CaConn gives item: {k:?}");
                 }
                 Err(e) => {
                     error!("CaConn gives error: {e:?}");
@@ -435,39 +416,3 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
     }
     Ok(())
 }
-
-async fn start_metrics_service(bind_to: String, insert_frac: Arc<AtomicU64>, insert_ivl_min: Arc<AtomicU64>) {
-    let app = axum::Router::new()
-        .route(
-            "/metrics",
-            axum::routing::get(|| async {
-                let stats = get_metrics();
-                match stats {
-                    Some(s) => {
-                        trace!("Metrics");
-                        s.prometheus()
-                    }
-                    None => {
-                        trace!("Metrics empty");
-                        String::new()
-                    }
-                }
-            }),
-        )
-        .route(
-            "/insert_frac",
-            axum::routing::put(|v: axum::extract::Json<u64>| async move {
-                insert_frac.store(v.0, Ordering::Release);
-            }),
-        )
-        .route(
-            "/insert_ivl_min",
-            axum::routing::put(|v: axum::extract::Json<u64>| async move {
-                insert_ivl_min.store(v.0, Ordering::Release);
-            }),
-        );
-    axum::Server::bind(&bind_to.parse().unwrap())
-        .serve(app.into_make_service())
-        .await
-        .unwrap()
-}
diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs
index 9e5d883..cfaec83 100644
--- a/netfetch/src/ca/conn.rs
+++ b/netfetch/src/ca/conn.rs
@@ -2,6 +2,7 @@ use super::proto::{self, CaItem, CaMsg, CaMsgTy, CaProto};
 use super::store::DataStore;
 use crate::bsread::ChannelDescDecoded;
 use crate::ca::proto::{CreateChan, EventAdd, HeadInfo};
+use crate::ca::store::ChannelRegistry;
 use crate::series::{Existence, SeriesId};
 use crate::store::{CommonInsertItemQueueSender, InsertItem, IvlItem, MuteItem, QueryItem};
 use err::Error;
@@ -78,10 +79,17 @@ enum ChannelState {
 }
 
 enum CaConnState {
+    Unconnected,
+    Connecting(Pin<Box<dyn Future<Output = Result<TcpStream, Error>> + Send>>),
     Init,
     Listen,
     PeerReady,
-    Done,
+    Wait(Pin<Box<dyn Future<Output = ()> + Send>>),
+}
+
+fn wait_fut(dt: u64) -> Pin<Box<dyn Future<Output = ()> + Send>> {
+    let fut = tokio::time::sleep(Duration::from_millis(dt));
+    Box::pin(fut)
 }
 
 struct IdStore {
@@ -103,7 +111,7 @@ impl IdStore {
 #[allow(unused)]
 pub struct CaConn {
     state: CaConnState,
-    proto: CaProto,
+    proto: Option<CaProto>,
     cid_store: IdStore,
     ioid_store: IdStore,
     subid_store: IdStore,
@@ -121,6 +129,8 @@ pub struct CaConn {
     fut_get_series: FuturesOrdered), Error>> + Send>>>,
     remote_addr_dbg: SocketAddrV4,
+    local_epics_hostname: String,
+    array_truncate: usize,
     stats: Arc<CaConnStats>,
     insert_queue_max: usize,
     insert_ivl_min: Arc<AtomicU64>,
@@ -128,8 +138,8 @@ impl CaConn {
     pub fn new(
-        tcp: TcpStream,
         remote_addr_dbg: SocketAddrV4,
+        local_epics_hostname: String,
         data_store: Arc<DataStore>,
         insert_item_sender: CommonInsertItemQueueSender,
         array_truncate: usize,
@@ -137,8 +147,8 @@ impl CaConn {
         insert_ivl_min: Arc<AtomicU64>,
     ) -> Self {
         Self {
-            state: CaConnState::Init,
-            proto: CaProto::new(tcp, remote_addr_dbg, array_truncate),
+            state: CaConnState::Unconnected,
+            proto: None,
             cid_store: IdStore::new(),
             ioid_store: IdStore::new(),
             subid_store: IdStore::new(),
@@ -155,6 +165,8 @@ impl CaConn {
             insert_item_send_fut: None,
             fut_get_series: FuturesOrdered::new(),
             remote_addr_dbg,
+            local_epics_hostname,
+            array_truncate,
             stats: Arc::new(CaConnStats::new()),
             insert_queue_max,
             insert_ivl_min,
@@ -190,7 +202,6 @@ impl CaConn {
         self.name_by_cid.get(&cid).map(|x| x.as_str())
     }
 
-    #[inline(never)]
     fn handle_insert_futs(&mut self, cx: &mut Context) -> Poll> {
         use Poll::*;
         loop {
@@ -246,18 +257,24 @@ impl CaConn {
                 if series.id() == 0 {
                     warn!("Weird series id: {series:?}");
                 }
+                if data_type > 6 {
+                    error!("data type of series unexpected: {}", data_type);
+                }
                 let subid = self.subid_store.next();
                 self.cid_by_subid.insert(subid, cid);
                 let name = self.name_by_cid(cid).unwrap().to_string();
+                // TODO convert first to CaDbrType, set to `Time`, then convert to ix:
+                let data_type_asked = data_type + 14;
                 let msg = CaMsg {
                     ty: CaMsgTy::EventAdd(EventAdd {
                         sid,
-                        data_type,
+                        data_type: data_type_asked,
                         data_count,
                         subid,
                     }),
                 };
-                self.proto.push_out(msg);
+                let proto = self.proto.as_mut().unwrap();
+                proto.push_out(msg);
                 // TODO handle not-found error:
                 let ch_s = self.channels.get_mut(&cid).unwrap();
                 *ch_s = ChannelState::Created(CreatedState {
@@ -298,7 +315,6 @@ impl CaConn {
         Ok(())
     }
 
-    #[inline(never)]
     fn event_add_insert(
         &mut self,
         series: SeriesId,
@@ -313,8 +329,8 @@ impl CaConn {
     ) -> Result<(), Error> {
         // TODO decide on better msp/lsp: random offset!
         // As long as one writer is active, the msp is arbitrary.
-        let ts_msp = if inserted_in_ts_msp > 2000 {
-            let ts_msp = ts / (60 * SEC) * (60 * SEC);
+        let ts_msp = if inserted_in_ts_msp > 20000 {
+            let ts_msp = ts / (10 * SEC) * (10 * SEC);
             if let ChannelState::Created(st) = self.channels.get_mut(&cid).unwrap() {
                 st.ts_msp_last = ts_msp;
                 st.inserted_in_ts_msp = 1;
@@ -351,7 +367,7 @@ impl CaConn {
             pulse: 0,
             scalar_type,
             shape,
-            val: ev.value,
+            val: ev.value.data,
             ts_msp_grid,
         };
         item_queue.push_back(QueryItem::Insert(item));
@@ -359,17 +375,17 @@ impl CaConn {
         Ok(())
     }
 
-    #[inline(never)]
-    fn handle_event_add_res(&mut self, ev: proto::EventAddRes) -> Result<(), Error> {
+    fn handle_event_add_res(&mut self, ev: proto::EventAddRes, tsnow: Instant) -> Result<(), Error> {
         // TODO handle subid-not-found which can also be peer error:
         let cid = *self.cid_by_subid.get(&ev.subid).unwrap();
+        //let name = self.name_by_cid(cid).unwrap().to_string();
         // TODO get rid of the string clone when I don't want the log output any longer:
-        //let name: String = self.name_by_cid(cid).unwrap().into();
         // TODO handle not-found error:
         let mut series_2 = None;
         let ch_s = self.channels.get_mut(&cid).unwrap();
         match ch_s {
             ChannelState::Created(st) => {
+                st.item_recv_ivl_ema.tick(Instant::now());
                 let scalar_type = st.scalar_type.clone();
                 let shape = st.shape.clone();
                 match st.state {
@@ -401,12 +417,24 @@ impl CaConn {
                         return Err(format!("no series id on insert").into());
                     }
                 };
-                let ts = {
+                let ts_local = {
                     let ts = SystemTime::now();
                     let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
                     epoch.as_secs() * SEC + epoch.subsec_nanos() as u64
                 };
-                let tsnow = Instant::now();
+                let ts = ev.value.ts.map_or(0, |x| x.get());
+                let ts_diff = ts.abs_diff(ts_local);
+                if ts_diff > SEC * 300 {
+                    self.stats.ca_ts_off_4_inc();
+                    //warn!("Bad time for {name} {ts} vs {ts_local} diff {}", ts_diff / SEC);
+                    // TODO mute this channel for some time, discard the event.
+                } else if ts_diff > SEC * 120 {
+                    self.stats.ca_ts_off_3_inc();
+                } else if ts_diff > SEC * 20 {
+                    self.stats.ca_ts_off_2_inc();
+                } else if ts_diff > SEC * 3 {
+                    self.stats.ca_ts_off_1_inc();
+                }
                 if tsnow >= st.insert_next_earliest {
                     st.muted_before = 0;
                     st.insert_item_ivl_ema.tick(tsnow);
@@ -441,7 +469,7 @@ impl CaConn {
                     )?;
                 } else {
                     self.stats.channel_fast_item_drop_inc();
-                    if tsnow.duration_since(st.insert_recv_ivl_last) >= Duration::from_millis(2000) {
+                    if tsnow.duration_since(st.insert_recv_ivl_last) >= Duration::from_millis(10000) {
                         st.insert_recv_ivl_last = tsnow;
                         let ema = st.insert_item_ivl_ema.ema();
                         let item = IvlItem {
@@ -477,10 +505,9 @@ impl CaConn {
     Pending
     Ready(no-more-work, something-was-done, error)
     */
-    #[inline(never)]
     fn handle_conn_listen(&mut self, cx: &mut Context) -> Poll>> {
         use Poll::*;
-        match self.proto.poll_next_unpin(cx) {
+        match self.proto.as_mut().unwrap().poll_next_unpin(cx) {
             Ready(Some(k)) => match k {
                 Ok(k) => match k {
                     CaItem::Empty => {
@@ -513,14 +540,14 @@ impl CaConn {
             },
             Ready(None) => {
                 warn!("CaProto is done {:?}", self.remote_addr_dbg);
-                self.state = CaConnState::Done;
+                self.state = CaConnState::Wait(wait_fut(10000));
+                self.proto = None;
                 Ready(None)
             }
             Pending => Pending,
         }
     }
 
-    #[inline(never)]
     fn check_channels_state_init(&mut self, msgs_tmp: &mut Vec<CaMsg>) -> Result<(), Error> {
         // TODO profile, efficient enough?
         if self.init_state_count == 0 {
@@ -561,7 +588,6 @@ impl CaConn {
 
     // Can return:
     // Pending, error, work-done (pending state unknown), no-more-work-ever-again.
-    #[inline(never)]
     fn handle_peer_ready(&mut self, cx: &mut Context) -> Poll>> {
         use Poll::*;
         let mut ts1 = Instant::now();
@@ -578,11 +604,15 @@ impl CaConn {
             //info!("msgs_tmp.len() {}", msgs_tmp.len());
             do_wake_again = true;
         }
-        // TODO be careful to not overload outgoing message queue.
-        for msg in msgs_tmp {
-            self.proto.push_out(msg);
+        {
+            let proto = self.proto.as_mut().unwrap();
+            // TODO be careful to not overload outgoing message queue.
+            for msg in msgs_tmp {
+                proto.push_out(msg);
+            }
         }
-        let res = match self.proto.poll_next_unpin(cx) {
+        let tsnow = Instant::now();
+        let res = match self.proto.as_mut().unwrap().poll_next_unpin(cx) {
             Ready(Some(Ok(k))) => {
                 match k {
                     CaItem::Msg(k) => {
@@ -599,6 +629,12 @@ impl CaConn {
                             // TODO handle error:
                             let name = self.name_by_cid(cid).unwrap().to_string();
                             debug!("CreateChanRes {name:?}");
+                            if false && name.contains(".STAT") {
+                                info!("Channel created for {}", name);
+                            }
+                            if k.data_type > 6 {
+                                error!("CreateChanRes with unexpected data_type {}", k.data_type);
+                            }
                             let scalar_type = ScalarType::from_ca_id(k.data_type)?;
                             let shape = Shape::from_ca_count(k.data_count)?;
                             // TODO handle not-found error:
@@ -608,15 +644,15 @@ impl CaConn {
                                 sid,
                                 scalar_type: scalar_type.clone(),
                                 shape: shape.clone(),
-                                ts_created: Instant::now(),
+                                ts_created: tsnow,
                                 state: MonitoringState::FetchSeriesId,
                                 ts_msp_last: 0,
                                 ts_msp_grid_last: 0,
                                 inserted_in_ts_msp: u64::MAX,
                                 insert_item_ivl_ema: IntervalEma::new(),
                                 item_recv_ivl_ema: IntervalEma::new(),
-                                insert_recv_ivl_last: Instant::now(),
-                                insert_next_earliest: Instant::now(),
+                                insert_recv_ivl_last: tsnow,
+                                insert_next_earliest: tsnow,
                                 muted_before: 0,
                             });
                             // TODO handle error in different way. Should most likely not abort.
@@ -629,10 +665,10 @@ impl CaConn {
                                 byte_order: netpod::ByteOrder::LE,
                                 compression: None,
                             };
-                            let y = unsafe { &*(&self as &Self as *const CaConn) };
-                            let fut = y
-                                .data_store
-                                .chan_reg
+                            let z = unsafe {
+                                &*(&self.data_store.chan_reg as &ChannelRegistry as *const ChannelRegistry)
+                            };
+                            let fut = z
                                 .get_series_id(cd)
                                 .map_ok(move |series| (cid, k.sid, k.data_type, k.data_count, series));
                             // TODO throttle execution rate:
@@ -640,7 +676,7 @@ impl CaConn {
                             do_wake_again = true;
                         }
                         CaMsgTy::EventAddRes(k) => {
-                            let res = Self::handle_event_add_res(self, k);
+                            let res = Self::handle_event_add_res(self, k, tsnow);
                             let ts2 = Instant::now();
                             self.stats
                                 .time_handle_event_add_res
@@ -662,7 +698,8 @@ impl CaConn {
             }
             Ready(None) => {
                 warn!("CaProto is done");
-                self.state = CaConnState::Done;
+                self.state = CaConnState::Wait(wait_fut(10000));
+                self.proto = None;
                 Ready(None)
             }
             Pending => Pending,
@@ -697,28 +734,63 @@ impl Stream for CaConn {
                 Pending => break Pending,
             }
-            self.handle_get_series_futs(cx)?;
-            let ts2 = Instant::now();
-            self.stats
-                .poll_time_get_series_futs
-                .fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::AcqRel);
-            ts1 = ts2;
-
             if self.insert_item_queue.len() >= self.insert_queue_max {
                 break Pending;
             }
             break loop {
-                break match &self.state {
+                break match &mut self.state {
+                    CaConnState::Unconnected => {
+                        let addr = self.remote_addr_dbg.clone();
+                        let fut = async move {
+                            trace!("create tcp connection to {:?}", (addr.ip(), addr.port()));
+                            match tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr)).await {
+                                Ok(Ok(k)) => Ok(k),
+                                Ok(Err(e)) => {
+                                    error!("Can not connect to {addr:?} {e:?}");
+                                    Err(e.into())
+                                }
+                                Err(e) => {
+                                    error!("Can not connect to {addr:?} {e:?}");
+                                    Err(Error::with_msg_no_trace(format!("timeout")))
+                                }
+ } + }; + self.state = CaConnState::Connecting(Box::pin(fut)); + continue 'outer; + } + CaConnState::Connecting(ref mut fut) => { + match fut.poll_unpin(cx) { + Ready(Ok(tcp)) => { + let proto = CaProto::new(tcp, self.remote_addr_dbg.clone(), self.array_truncate); + self.state = CaConnState::Init; + self.proto = Some(proto); + continue 'outer; + } + Ready(Err(e)) => { + error!("Connection error: {e:?}"); + // We can not connect to the remote. + // TODO do exponential backoff. + self.state = CaConnState::Wait(wait_fut(10000)); + self.proto = None; + continue 'outer; + } + Pending => Pending, + } + } CaConnState::Init => { + let hostname = self.local_epics_hostname.clone(); + let proto = self.proto.as_mut().unwrap(); let msg = CaMsg { ty: CaMsgTy::Version }; - self.proto.push_out(msg); + proto.push_out(msg); let msg = CaMsg { ty: CaMsgTy::ClientName, }; - self.proto.push_out(msg); - let msg = CaMsg { ty: CaMsgTy::HostName }; - self.proto.push_out(msg); + proto.push_out(msg); + let msg = CaMsg { + ty: CaMsgTy::HostName(hostname), + }; + proto.push_out(msg); self.state = CaConnState::Listen; continue 'outer; } @@ -737,6 +809,14 @@ impl Stream for CaConn { Pending => Pending, }, CaConnState::PeerReady => { + { + self.handle_get_series_futs(cx)?; + let ts2 = Instant::now(); + self.stats + .poll_time_get_series_futs + .fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::AcqRel); + ts1 = ts2; + } let res = self.handle_peer_ready(cx); let ts2 = Instant::now(); self.stats.time_handle_peer_ready_dur(ts2.duration_since(ts1)); @@ -757,10 +837,14 @@ impl Stream for CaConn { Pending => Pending, } } - CaConnState::Done => { - // TODO handle better - Pending - } + CaConnState::Wait(inst) => match inst.poll_unpin(cx) { + Ready(_) => { + self.state = CaConnState::Unconnected; + self.proto = None; + continue 'outer; + } + Pending => Pending, + }, }; }; }; diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 43aaef6..7f32501 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -2,14 +2,17 @@ use crate::netbuf::NetBuf; use err::Error; use futures_util::{pin_mut, Stream}; use log::*; +use netpod::timeunits::*; use std::collections::{BTreeMap, VecDeque}; use std::net::SocketAddrV4; +use std::num::{NonZeroU16, NonZeroU64}; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; const CA_PROTO_VERSION: u16 = 13; +const EPICS_EPOCH_OFFSET: u64 = 631152000; #[derive(Debug)] pub struct Search { @@ -69,7 +72,7 @@ pub struct EventAddRes { pub data_count: u16, pub status: u32, pub subid: u32, - pub value: CaDataValue, + pub value: CaEventValue, } #[derive(Debug)] @@ -99,6 +102,49 @@ enum CaScalarType { String, } +#[derive(Debug)] +enum CaDbrMetaType { + Plain, + Status, + Time, +} + +#[derive(Debug)] +pub struct CaDbrType { + meta: CaDbrMetaType, + scalar_type: CaScalarType, +} + +impl CaDbrType { + pub fn from_ca_u16(k: u16) -> Result { + if k > 20 { + return Err(Error::with_msg_no_trace(format!( + "can not understand ca dbr type id {}", + k + ))); + } + let (meta, k) = if k >= 14 { + (CaDbrMetaType::Time, k - 14) + } else if k >= 7 { + (CaDbrMetaType::Status, k - 7) + } else { + (CaDbrMetaType::Plain, k) + }; + use CaScalarType::*; + let scalar_type = match k { + 4 => I8, + 1 => I16, + 5 => I32, + 2 => F32, + 6 => F64, + 3 => Enum, + 0 => String, + k => return Err(Error::with_msg_no_trace(format!("bad ca scalar type id: {k}"))), + }; + Ok(CaDbrType { meta, scalar_type }) + } +} + #[derive(Clone, 
 #[derive(Clone, Debug)]
 pub enum CaDataScalarValue {
     I8(i8),
@@ -125,21 +171,12 @@ pub enum CaDataValue {
     Array(CaDataArrayValue),
 }
 
-impl CaScalarType {
-    fn from_ca_u16(k: u16) -> Result<Self, Error> {
-        use CaScalarType::*;
-        let ret = match k {
-            4 => I8,
-            1 => I16,
-            5 => I32,
-            2 => F32,
-            6 => F64,
-            3 => Enum,
-            0 => String,
-            k => return Err(Error::with_msg_no_trace(format!("bad dbr type id: {k}"))),
-        };
-        Ok(ret)
-    }
+#[derive(Clone, Debug)]
+pub struct CaEventValue {
+    pub ts: Option<NonZeroU64>,
+    pub status: Option<NonZeroU16>,
+    pub severity: Option<NonZeroU16>,
+    pub data: CaDataValue,
 }
 
 #[derive(Debug)]
@@ -148,7 +185,7 @@ pub enum CaMsgTy {
     VersionRes(u16),
     ClientName,
     ClientNameRes(ClientNameRes),
-    HostName,
+    HostName(String),
     Search(Search),
     SearchRes(SearchRes),
     CreateChan(CreateChan),
@@ -169,7 +206,7 @@ impl CaMsgTy {
             VersionRes(_) => 0,
             ClientName => 0x14,
             ClientNameRes(_) => 0x14,
-            HostName => 0x15,
+            HostName(_) => 0x15,
             Search(_) => 0x06,
             SearchRes(_) => 0x06,
             CreateChan(_) => 0x12,
@@ -194,7 +231,7 @@ impl CaMsgTy {
             VersionRes(_) => 0,
             ClientName => 0x10,
             ClientNameRes(x) => (x.name.len() + 1 + 7) / 8 * 8,
-            HostName => 0x18,
+            HostName(_) => 0x18,
             Search(x) => (x.channel.len() + 1 + 7) / 8 * 8,
             SearchRes(_) => 8,
             CreateChan(x) => (x.channel.len() + 1 + 7) / 8 * 8,
@@ -221,7 +258,7 @@ impl CaMsgTy {
             VersionRes(n) => *n,
             ClientName => 0,
             ClientNameRes(_) => 0,
-            HostName => 0,
+            HostName(_) => 0,
             Search(_) => {
                 // Reply-flag
                 1
@@ -245,7 +282,7 @@ impl CaMsgTy {
             VersionRes(_) => 0,
             ClientName => 0,
             ClientNameRes(_) => 0,
-            HostName => 0,
+            HostName(_) => 0,
             Search(_) => CA_PROTO_VERSION,
             SearchRes(_) => 0,
             CreateChan(_) => 0,
@@ -266,7 +303,7 @@ impl CaMsgTy {
             VersionRes(_) => 0,
             ClientName => 0,
             ClientNameRes(_) => 0,
-            HostName => 0,
+            HostName(_) => 0,
             Search(e) => e.id,
             SearchRes(x) => x.addr,
             CreateChan(x) => x.cid,
@@ -287,7 +324,7 @@ impl CaMsgTy {
             VersionRes(_) => 0,
             ClientName => 0,
             ClientNameRes(_) => 0,
-            HostName => 0,
+            HostName(_) => 0,
             Search(e) => e.id,
             SearchRes(x) => x.id,
             CreateChan(_) => CA_PROTO_VERSION as _,
@@ -317,9 +354,8 @@ impl CaMsgTy {
                 error!("should not attempt to write ClientNameRes");
                 panic!();
             }
-            HostName => {
-                // TODO allow variable host name. Null-extend always to 8 byte align.
-                let s = "sf-nube-11.psi.ch".as_bytes();
+            HostName(name) => {
+                let s = name.as_bytes();
                 let n = s.len();
                 buf.fill(0);
                 buf[..n].copy_from_slice(s);
@@ -364,6 +400,39 @@ impl CaMsgTy {
     }
 }
 
+macro_rules! convert_scalar_value {
+    ($st:ty, $var:ident, $buf:expr) => {{
+        type ST = $st;
+        const STL: usize = std::mem::size_of::<ST>();
+        if $buf.len() < STL {
+            return Err(Error::with_msg_no_trace(format!(
+                "not enough payload for {} {}",
+                std::any::type_name::<ST>(),
+                $buf.len()
+            )));
+        }
+        let v = ST::from_be_bytes($buf[..STL].try_into()?);
+        CaDataValue::Scalar(CaDataScalarValue::$var(v))
+    }};
+}
+
+macro_rules! convert_wave_value {
+    ($st:ty, $var:ident, $n:expr, $buf:expr) => {{
+        type ST = $st;
+        const STL: usize = std::mem::size_of::<ST>();
+        let nn = $n.min($buf.len() / STL);
+        let mut a = Vec::with_capacity(nn);
+        // TODO optimize with unsafe?
+        let mut bb = &$buf[..];
+        for _ in 0..nn {
+            let v = ST::from_be_bytes(bb[..STL].try_into()?);
+            bb = &bb[STL..];
+            a.push(v);
+        }
+        CaDataValue::Array(CaDataArrayValue::$var(a))
+    }};
+}
+
 #[derive(Debug)]
 pub struct CaMsg {
     pub ty: CaMsgTy,
@@ -406,6 +475,48 @@ impl CaMsg {
         }
     }
 
+    fn ca_scalar_value(scalar_type: &CaScalarType, buf: &[u8]) -> Result<CaDataValue, Error> {
+        let val = match scalar_type {
+            CaScalarType::I8 => convert_scalar_value!(i8, I8, buf),
+            CaScalarType::I16 => convert_scalar_value!(i16, I16, buf),
+            CaScalarType::I32 => convert_scalar_value!(i32, I32, buf),
+            CaScalarType::F32 => convert_scalar_value!(f32, F32, buf),
+            CaScalarType::F64 => convert_scalar_value!(f64, F64, buf),
+            CaScalarType::Enum => convert_scalar_value!(i16, I16, buf),
+            CaScalarType::String => {
+                // TODO constrain string length to the CA `data_count`.
+                let mut ixn = buf.len();
+                for (i, &c) in buf.iter().enumerate() {
+                    if c == 0 {
+                        ixn = i;
+                        break;
+                    }
+                }
+                //info!("try to read string from payload len {} ixn {}", buf.len(), ixn);
+                let v = String::from_utf8_lossy(&buf[..ixn]);
+                CaDataValue::Scalar(CaDataScalarValue::String(v.into()))
+            }
+        };
+        Ok(val)
+    }
+
+    fn ca_wave_value(scalar_type: &CaScalarType, n: usize, buf: &[u8]) -> Result<CaDataValue, Error> {
+        let val = match scalar_type {
+            CaScalarType::I8 => convert_wave_value!(i8, I8, n, buf),
+            CaScalarType::I16 => convert_wave_value!(i16, I16, n, buf),
+            CaScalarType::I32 => convert_wave_value!(i32, I32, n, buf),
+            CaScalarType::F32 => convert_wave_value!(f32, F32, n, buf),
+            CaScalarType::F64 => convert_wave_value!(f64, F64, n, buf),
+            _ => {
+                warn!("TODO conversion array {scalar_type:?}");
+                return Err(Error::with_msg_no_trace(format!(
+                    "can not yet handle conversion of type array {scalar_type:?}"
+                )));
+            }
+        };
+        Ok(val)
+    }
+
     pub fn from_proto_infos(hi: &HeadInfo, payload: &[u8], array_truncate: usize) -> Result<Self, Error> {
         let msg = match hi.cmdid {
             0 => CaMsg {
@@ -420,7 +531,9 @@ impl CaMsg {
                 }
             }
             // TODO make response type for host name:
-            21 => CaMsg { ty: CaMsgTy::HostName },
+            21 => CaMsg {
+                ty: CaMsgTy::HostName("TODOx5288".into()),
+            },
             6 => {
                 if hi.payload_size != 8 {
                     warn!("protocol error: search result is expected with fixed payload size 8");
@@ -469,174 +582,43 @@ impl CaMsg {
                 }
             }
             1 => {
                 use netpod::Shape;
-                let ca_st = CaScalarType::from_ca_u16(hi.data_type)?;
+                let ca_dbr_ty = CaDbrType::from_ca_u16(hi.data_type)?;
+                if let CaDbrMetaType::Time = ca_dbr_ty.meta {
+                } else {
+                    return Err(Error::with_msg_no_trace(format!(
+                        "expect ca dbr time type, got: {:?}",
+                        ca_dbr_ty
+                    )));
+                }
+                if payload.len() < 12 {
+                    return Err(Error::with_msg_no_trace(format!(
+                        "not enough payload for time metadata {}",
+                        payload.len()
+                    )));
+                }
+                let ca_status = u16::from_be_bytes(payload[0..2].try_into()?);
+                let ca_severity = u16::from_be_bytes(payload[2..4].try_into()?);
+                let ca_secs = u32::from_be_bytes(payload[4..8].try_into()?);
+                let ca_nanos = u32::from_be_bytes(payload[8..12].try_into()?);
                 let ca_sh = Shape::from_ca_count(hi.data_count)?;
+                let valbuf = &payload[12..];
                 let value = match ca_sh {
-                    Shape::Scalar => match ca_st {
-                        CaScalarType::I8 => {
-                            type ST = i8;
-                            const STL: usize = std::mem::size_of::<ST>();
-                            if payload.len() < STL {
-                                return Err(Error::with_msg_no_trace(format!(
-                                    "not enough payload for i8 {}",
-                                    payload.len()
-                                )));
-                            }
-                            let v = ST::from_be_bytes(payload[..STL].try_into()?);
-                            CaDataValue::Scalar(CaDataScalarValue::I8(v))
-                        }
-                        CaScalarType::I16 => {
-                            type ST = i16;
-                            const STL: usize = std::mem::size_of::<ST>();
-                            if payload.len() < STL {
-                                return Err(Error::with_msg_no_trace(format!(
-                                    "not enough payload for i16 {}",
-                                    payload.len()
-                                )));
-                            }
-                            let v = ST::from_be_bytes(payload[..STL].try_into()?);
-                            CaDataValue::Scalar(CaDataScalarValue::I16(v))
-                        }
-                        CaScalarType::I32 => {
-                            type ST = i32;
-                            const STL: usize = std::mem::size_of::<ST>();
-                            if payload.len() < STL {
-                                return Err(Error::with_msg_no_trace(format!(
-                                    "not enough payload for i32 {}",
-                                    payload.len()
-                                )));
-                            }
-                            let v = ST::from_be_bytes(payload[..STL].try_into()?);
-                            CaDataValue::Scalar(CaDataScalarValue::I32(v))
-                        }
-                        CaScalarType::F32 => {
-                            type ST = f32;
-                            const STL: usize = std::mem::size_of::<ST>();
-                            if payload.len() < STL {
-                                return Err(Error::with_msg_no_trace(format!(
-                                    "not enough payload for f32 {}",
-                                    payload.len()
-                                )));
-                            }
-                            let v = ST::from_be_bytes(payload[..STL].try_into()?);
-                            CaDataValue::Scalar(CaDataScalarValue::F32(v))
-                        }
-                        CaScalarType::F64 => {
-                            type ST = f64;
-                            const STL: usize = std::mem::size_of::<ST>();
-                            if payload.len() < STL {
-                                return Err(Error::with_msg_no_trace(format!(
-                                    "not enough payload for f64 {}",
-                                    payload.len()
-                                )));
-                            }
-                            let v = ST::from_be_bytes(payload[..STL].try_into()?);
-                            CaDataValue::Scalar(CaDataScalarValue::F64(v))
-                        }
-                        CaScalarType::Enum => {
-                            type ST = i16;
-                            const STL: usize = std::mem::size_of::<ST>();
-                            if payload.len() < STL {
-                                return Err(Error::with_msg_no_trace(format!(
-                                    "not enough payload for i16 {}",
-                                    payload.len()
-                                )));
-                            }
-                            let v = ST::from_be_bytes(payload[..STL].try_into()?);
-                            CaDataValue::Scalar(CaDataScalarValue::I16(v))
-                        }
-                        CaScalarType::String => {
-                            // TODO constrain string length to the CA `data_count`.
-                            let mut ixn = payload.len();
-                            for (i, &c) in payload.iter().enumerate() {
-                                if c == 0 {
-                                    ixn = i;
-                                    break;
-                                }
-                            }
-                            //info!("try to read string from payload len {} ixn {}", payload.len(), ixn);
-                            let v = String::from_utf8_lossy(&payload[..ixn]);
-                            CaDataValue::Scalar(CaDataScalarValue::String(v.into()))
-                        }
-                    },
-                    Shape::Wave(n) => match ca_st {
-                        CaScalarType::I8 => {
-                            type ST = i8;
-                            const STL: usize = std::mem::size_of::<ST>();
-                            let nn = (n as usize).min(payload.len() / STL).min(array_truncate);
-                            let mut a = Vec::with_capacity(nn);
-                            let mut bb = &payload[..];
-                            for _ in 0..nn {
-                                let v = ST::from_be_bytes(bb[..STL].try_into()?);
-                                bb = &bb[STL..];
-                                a.push(v);
-                            }
-                            CaDataValue::Array(CaDataArrayValue::I8(a))
-                        }
-                        CaScalarType::I16 => {
-                            type ST = i16;
-                            const STL: usize = std::mem::size_of::<ST>();
-                            let nn = (n as usize).min(payload.len() / STL).min(array_truncate);
-                            let mut a = Vec::with_capacity(nn);
-                            let mut bb = &payload[..];
-                            for _ in 0..nn {
-                                let v = ST::from_be_bytes(bb[..STL].try_into()?);
-                                bb = &bb[STL..];
-                                a.push(v);
-                            }
-                            CaDataValue::Array(CaDataArrayValue::I16(a))
-                        }
-                        CaScalarType::I32 => {
-                            type ST = i32;
-                            const STL: usize = std::mem::size_of::<ST>();
-                            let nn = (n as usize).min(payload.len() / STL).min(array_truncate);
-                            let mut a = Vec::with_capacity(nn);
-                            let mut bb = &payload[..];
-                            for _ in 0..nn {
-                                let v = ST::from_be_bytes(bb[..STL].try_into()?);
-                                bb = &bb[STL..];
-                                a.push(v);
-                            }
-                            CaDataValue::Array(CaDataArrayValue::I32(a))
-                        }
-                        CaScalarType::F32 => {
-                            type ST = f32;
-                            const STL: usize = std::mem::size_of::<ST>();
-                            let nn = (n as usize).min(payload.len() / STL).min(array_truncate);
-                            let mut a = Vec::with_capacity(nn);
-                            let mut bb = &payload[..];
-                            for _ in 0..nn {
-                                let v = ST::from_be_bytes(bb[..STL].try_into()?);
-                                bb = &bb[STL..];
-                                a.push(v);
-                            }
-                            CaDataValue::Array(CaDataArrayValue::F32(a))
-                        }
-                        CaScalarType::F64 => {
-                            type ST = f64;
-                            const STL: usize = std::mem::size_of::<ST>();
-                            let nn = (n as usize).min(payload.len() / STL).min(array_truncate);
-                            let mut a = Vec::with_capacity(nn);
-                            let mut bb = &payload[..];
-                            for _ in 0..nn {
-                                let v = ST::from_be_bytes(bb[..STL].try_into()?);
-                                bb = &bb[STL..];
-                                a.push(v);
-                            }
-                            CaDataValue::Array(CaDataArrayValue::F64(a))
-                        }
-                        _ => {
-                            warn!("TODO handle array {ca_st:?}");
-                            return Err(Error::with_msg_no_trace(format!(
-                                "can not yet handle type array {ca_st:?}"
-                            )));
-                        }
-                    },
+                    Shape::Scalar => Self::ca_scalar_value(&ca_dbr_ty.scalar_type, valbuf)?,
+                    Shape::Wave(n) => {
+                        Self::ca_wave_value(&ca_dbr_ty.scalar_type, (n as usize).min(array_truncate), valbuf)?
+                    }
                     Shape::Image(_, _) => {
-                        error!("Can not get Image from CA");
+                        error!("Can not handle image from channel access");
                         err::todoval()
                     }
                 };
+                let ts = SEC * (ca_secs as u64 + EPICS_EPOCH_OFFSET) + ca_nanos as u64;
+                let value = CaEventValue {
+                    ts: NonZeroU64::new(ts),
+                    status: NonZeroU16::new(ca_status),
+                    severity: NonZeroU16::new(ca_severity),
+                    data: value,
+                };
                 let d = EventAddRes {
                     data_type: hi.data_type,
                     data_count: hi.data_count,
diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs
new file mode 100644
index 0000000..4e5a5bb
--- /dev/null
+++ b/netfetch/src/metrics.rs
@@ -0,0 +1,39 @@
+use log::*;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
+
+pub async fn start_metrics_service(bind_to: String, insert_frac: Arc<AtomicU64>, insert_ivl_min: Arc<AtomicU64>) {
+    let app = axum::Router::new()
+        .route(
+            "/metrics",
+            axum::routing::get(|| async {
+                let stats = crate::ca::get_metrics();
+                match stats {
+                    Some(s) => {
+                        trace!("Metrics");
+                        s.prometheus()
+                    }
+                    None => {
+                        trace!("Metrics empty");
+                        String::new()
+                    }
+                }
+            }),
+        )
+        .route(
+            "/insert_frac",
+            axum::routing::put(|v: axum::extract::Json<u64>| async move {
+                insert_frac.store(v.0, Ordering::Release);
+            }),
+        )
+        .route(
+            "/insert_ivl_min",
+            axum::routing::put(|v: axum::extract::Json<u64>| async move {
+                insert_ivl_min.store(v.0, Ordering::Release);
+            }),
+        );
+    axum::Server::bind(&bind_to.parse().unwrap())
+        .serve(app.into_make_service())
+        .await
+        .unwrap()
+}
diff --git a/netfetch/src/netfetch.rs b/netfetch/src/netfetch.rs
index b775d23..c8c5ea5 100644
--- a/netfetch/src/netfetch.rs
+++ b/netfetch/src/netfetch.rs
@@ -2,6 +2,7 @@ pub mod bsread;
 pub mod ca;
 pub mod channelwriter;
 pub mod errconv;
+pub mod metrics;
 pub mod netbuf;
 pub mod series;
 pub mod store;
diff --git a/netfetch/src/series.rs b/netfetch/src/series.rs
index 34bb1de..978fce6 100644
--- a/netfetch/src/series.rs
+++ b/netfetch/src/series.rs
@@ -15,6 +15,10 @@ pub enum Existence {
 pub struct SeriesId(u64);
 
 impl SeriesId {
+    pub fn new(id: u64) -> Self {
+        Self(id)
+    }
+
     pub fn id(&self) -> u64 {
         self.0
     }
diff --git a/stats/src/stats.rs b/stats/src/stats.rs
index 817981c..a56fbe0 100644
--- a/stats/src/stats.rs
+++ b/stats/src/stats.rs
@@ -160,6 +160,10 @@ stats_proc::stats_struct!((
         conn_item_count,
         conn_stream_ready,
         conn_stream_pending,
+        ca_ts_off_1,
+        ca_ts_off_2,
+        ca_ts_off_3,
+        ca_ts_off_4,
     ),
 ),
 agg(name(CaConnStatsAgg), parent(CaConnStats)),
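
Note on the DBR_TIME decoding introduced in proto.rs above: events are now requested with the time meta-type (ids 14..=20), whose first 12 payload bytes carry status, severity, seconds and nanoseconds, with seconds counted from the EPICS epoch (1990-01-01). The following is a minimal standalone sketch of that header decoding, not part of the patch; it assumes SEC means nanoseconds per second (matching how SEC is used in conn.rs), and the helper name decode_dbr_time_meta is illustrative only.

    // Illustrative sketch, not part of the patch: decode the 12-byte DBR_TIME
    // metadata header that precedes the value bytes in an EventAddRes payload.
    const EPICS_EPOCH_OFFSET: u64 = 631152000; // seconds from 1970-01-01 to 1990-01-01
    const SEC: u64 = 1_000_000_000; // nanoseconds per second (assumed, as SEC is used above)

    // Returns (status, severity, unix timestamp in nanoseconds), or None if the payload is too short.
    fn decode_dbr_time_meta(payload: &[u8]) -> Option<(u16, u16, u64)> {
        if payload.len() < 12 {
            return None;
        }
        let status = u16::from_be_bytes(payload[0..2].try_into().ok()?);
        let severity = u16::from_be_bytes(payload[2..4].try_into().ok()?);
        let secs = u32::from_be_bytes(payload[4..8].try_into().ok()?) as u64;
        let nanos = u32::from_be_bytes(payload[8..12].try_into().ok()?) as u64;
        let ts = SEC * (secs + EPICS_EPOCH_OFFSET) + nanos;
        Some((status, severity, ts))
    }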