From 277597400e11e32c85a0c038d11fdc4fa02b9503 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 24 May 2022 14:05:36 +0200 Subject: [PATCH] Add more supported data types and stats counter --- daqingest/src/daqingest.rs | 1 + netfetch/src/ca.rs | 106 ++++++++++++++++---------- netfetch/src/ca/conn.rs | 62 +++++++++------- netfetch/src/ca/proto.rs | 139 ++++++++++++++++++++++++++++------- netfetch/src/ca/store.rs | 23 +++++- netfetch/src/series.rs | 5 +- netfetch/src/store.rs | 86 +++++++++++++++++++++- stats/src/stats.rs | 9 ++- stats_proc/src/stats_proc.rs | 4 +- 9 files changed, 338 insertions(+), 97 deletions(-) diff --git a/daqingest/src/daqingest.rs b/daqingest/src/daqingest.rs index e4542a1..e1bf6c8 100644 --- a/daqingest/src/daqingest.rs +++ b/daqingest/src/daqingest.rs @@ -102,6 +102,7 @@ impl From for CaConnectOpts { timeout: 2000, abort_after_search: 0, pg_pass: "".into(), + array_truncate: 512, } } } diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index b963747..610dbdd 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -2,6 +2,8 @@ pub mod conn; pub mod proto; pub mod store; +use crate::store::CommonInsertQueue; + use self::conn::FindIocStream; use self::store::DataStore; use conn::CaConn; @@ -11,7 +13,7 @@ use log::*; use netpod::Database; use scylla::batch::Consistency; use serde::{Deserialize, Serialize}; -use stats::{CaConnStats2Agg, CaConnStats2AggDiff}; +use stats::{CaConnStatsAgg, CaConnStatsAggDiff}; use std::collections::BTreeMap; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::path::PathBuf; @@ -21,17 +23,15 @@ use tokio::fs::OpenOptions; use tokio::io::AsyncReadExt; use tokio::net::TcpStream; -static mut METRICS: Option>> = None; +static mut METRICS: Option>> = None; static METRICS_ONCE: Once = Once::new(); -fn get_metrics() -> &'static mut Option { +fn get_metrics() -> &'static mut Option { METRICS_ONCE.call_once(|| unsafe { METRICS = Some(Mutex::new(None)); }); let mut g = unsafe { METRICS.as_mut().unwrap().lock().unwrap() }; - //let ret = g.as_mut().unwrap(); - //let ret = g.as_mut(; - let ret: &mut Option = &mut *g; + let ret: &mut Option = &mut *g; let ret = unsafe { &mut *(ret as *mut _) }; ret } @@ -49,6 +49,7 @@ struct ChannelConfig { #[serde(default)] abort_after_search: u32, pg_pass: String, + array_truncate: Option, } pub struct ListenFromFileOpts { @@ -86,6 +87,7 @@ pub async fn parse_config(config: PathBuf) -> Result { timeout: conf.timeout.unwrap_or(2000), abort_after_search: conf.abort_after_search, pg_pass: conf.pg_pass, + array_truncate: conf.array_truncate.unwrap_or(512), }) } @@ -98,6 +100,7 @@ pub struct CaConnectOpts { pub timeout: u64, pub abort_after_search: u32, pub pg_pass: String, + pub array_truncate: usize, } async fn resolve_address(addr_str: &str) -> Result { @@ -230,6 +233,10 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { tokio::spawn(start_metrics_service()); + + // TODO maybe this should hold the resources needed by the futures? 
+ let ciq = CommonInsertQueue::new(); + let facility = "scylla"; let opts = parse_config(opts.config).await?; let d = Database { @@ -257,36 +264,53 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let scy = Arc::new(scy); info!("FIND IOCS"); let qu_find_addr = pg_client - .prepare("select t2.addr from ioc_by_channel t1, ioc_by_channel t2 where t2.facility = t1.facility and t2.channel = t1.channel and t1.facility = $1 and t1.channel = $2") + .prepare("select t2.channel, t2.addr from ioc_by_channel t1, ioc_by_channel t2 where t2.facility = t1.facility and t2.channel = t1.channel and t1.facility = $1 and t1.channel in ($2, $3, $4, $5, $6, $7, $8, $9)") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let mut channels_by_host = BTreeMap::new(); - for (ix, ch) in opts.channels.iter().enumerate() { + let mut chns_todo = &opts.channels[..]; + let mut chstmp = ["__NONE__"; 8]; + let mut ix = 0; + while chns_todo.len() > 0 { + for (s1, s2) in chns_todo.iter().zip(chstmp.iter_mut()) { + *s2 = s1; + } + chns_todo = &chns_todo[chstmp.len().min(chns_todo.len())..]; let rows = pg_client - .query(&qu_find_addr, &[&facility, ch]) + .query( + &qu_find_addr, + &[ + &facility, &chstmp[0], &chstmp[1], &chstmp[2], &chstmp[3], &chstmp[4], &chstmp[5], &chstmp[6], + &chstmp[7], + ], + ) .await .map_err(|e| Error::with_msg_no_trace(format!("PG error: {e:?}")))?; if rows.is_empty() { - error!("can not find address of channel {}", ch); + error!("can not find any addresses of channels {:?}", chstmp); } else { - let addr: &str = rows[0].get(0); - if addr == "" { - // TODO the address was searched before but could not be found. - } else { - let addr: SocketAddrV4 = match addr.parse() { - Ok(k) => k, - Err(e) => { - error!("can not parse {addr:?} {e:?}"); - continue; - } - }; - if ix % 200 == 0 { - info!("{} {} {:?}", ix, ch, addr); - } - if !channels_by_host.contains_key(&addr) { - channels_by_host.insert(addr, vec![ch.to_string()]); + for row in rows { + let ch: &str = row.get(0); + let addr: &str = row.get(1); + if addr == "" { + // TODO the address was searched before but could not be found. 
} else { - channels_by_host.get_mut(&addr).unwrap().push(ch.to_string()); + let addr: SocketAddrV4 = match addr.parse() { + Ok(k) => k, + Err(e) => { + error!("can not parse {addr:?} for channel {ch:?} {e:?}"); + continue; + } + }; + ix += 1; + if ix % 1000 == 0 { + info!("{} of {} {} {:?}", ix, opts.channels.len(), ch, addr); + } + if !channels_by_host.contains_key(&addr) { + channels_by_host.insert(addr, vec![ch.to_string()]); + } else { + channels_by_host.get_mut(&addr).unwrap().push(ch.to_string()); + } } } } @@ -294,7 +318,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { if opts.abort_after_search == 1 { return Ok(()); } - let data_store = Arc::new(DataStore::new(pg_client, scy.clone()).await?); + let data_store = Arc::new(DataStore::new(pg_client, scy.clone(), ciq.sender()).await?); let mut conn_jhs = vec![]; let mut conn_stats = vec![]; for (host, channels) in channels_by_host { @@ -311,8 +335,8 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { continue; } }; - let mut conn = CaConn::new(tcp, addr, data_store.clone()); - conn_stats.push(conn.stats2()); + let mut conn = CaConn::new(tcp, addr, data_store.clone(), opts.array_truncate); + conn_stats.push(conn.stats()); for c in channels { conn.channel_add(c); } @@ -334,17 +358,19 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let jh = tokio::spawn(conn_block); conn_jhs.push(jh); } - let mut agg_last = CaConnStats2Agg::new(); + let mut agg_last = CaConnStatsAgg::new(); loop { - tokio::time::sleep(Duration::from_millis(2000)).await; - let agg = CaConnStats2Agg::new(); + tokio::time::sleep(Duration::from_millis(500)).await; + let agg = CaConnStatsAgg::new(); for g in &conn_stats { agg.push(&g); } let m = get_metrics(); *m = Some(agg.clone()); - let diff = CaConnStats2AggDiff::diff_from(&agg_last, &agg); - info!("{}", diff.display()); + if false { + let diff = CaConnStatsAggDiff::diff_from(&agg_last, &agg); + info!("{}", diff.display()); + } agg_last = agg; if false { break; @@ -372,8 +398,14 @@ async fn start_metrics_service() { axum::routing::get(|| async { let stats = get_metrics(); match stats { - Some(s) => s.prometheus(), - None => String::new(), + Some(s) => { + trace!("Metrics"); + s.prometheus() + } + None => { + trace!("Metrics empty"); + String::new() + } } }), ); diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 3bf2064..804050f 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -11,7 +11,7 @@ use libc::c_int; use log::*; use netpod::timeunits::SEC; use netpod::{ScalarType, Shape}; -use stats::{CaConnStats2, IntervalEma}; +use stats::{CaConnStats, IntervalEma}; use std::collections::{BTreeMap, VecDeque}; use std::net::{Ipv4Addr, SocketAddrV4}; use std::pin::Pin; @@ -22,8 +22,8 @@ use std::time::{Duration, Instant, SystemTime}; use tokio::io::unix::AsyncFd; use tokio::net::TcpStream; -const INSERT_FUTS_MAX: usize = 200; -const INSERT_FUTS_LIM: usize = 80000; +const INSERT_FUTS_MAX: usize = 2; +const INSERT_FUTS_LIM: usize = 16; const TABLE_SERIES_MOD: u32 = 128; #[derive(Debug)] @@ -105,7 +105,7 @@ macro_rules! insert_scalar_impl { ts_msp_changed: bool, st: ScalarType, sh: Shape, - stats: Arc, + stats: Arc, ) { if futs_queue.len() >= INSERT_FUTS_LIM { stats.inserts_discard.fetch_add(1, Ordering::AcqRel); @@ -144,6 +144,7 @@ macro_rules! insert_scalar_impl { Box::pin(fut3) as _ }; futs_queue.push(fut); + stats.inserts_queue_push_inc(); } }; } @@ -162,7 +163,7 @@ macro_rules! 
insert_array_impl { ts_msp_changed: bool, st: ScalarType, sh: Shape, - stats: Arc, + stats: Arc, ) { if futs_queue.len() >= INSERT_FUTS_LIM { stats.inserts_discard.fetch_add(1, Ordering::AcqRel); @@ -201,6 +202,7 @@ macro_rules! insert_array_impl { Box::pin(fut3) as _ }; futs_queue.push(fut); + stats.inserts_queue_push_inc(); } }; } @@ -213,6 +215,8 @@ insert_scalar_impl!(insert_scalar_f64, f64, qu_insert_scalar_f64); insert_scalar_impl!(insert_scalar_string, String, qu_insert_scalar_string); insert_array_impl!(insert_array_i8, i8, qu_insert_array_i8); +insert_array_impl!(insert_array_i16, i16, qu_insert_array_i16); +insert_array_impl!(insert_array_i32, i32, qu_insert_array_i32); insert_array_impl!(insert_array_f32, f32, qu_insert_array_f32); insert_array_impl!(insert_array_f64, f64, qu_insert_array_f64); @@ -298,14 +302,19 @@ pub struct CaConn { FuturesOrdered), Error>> + Send>>>, value_insert_futs: FuturesOrdered> + Send>>>, remote_addr_dbg: SocketAddrV4, - stats2: Arc, + stats: Arc, } impl CaConn { - pub fn new(tcp: TcpStream, remote_addr_dbg: SocketAddrV4, data_store: Arc) -> Self { + pub fn new( + tcp: TcpStream, + remote_addr_dbg: SocketAddrV4, + data_store: Arc, + array_truncate: usize, + ) -> Self { Self { state: CaConnState::Init, - proto: CaProto::new(tcp, remote_addr_dbg), + proto: CaProto::new(tcp, remote_addr_dbg, array_truncate), cid_store: IdStore::new(), ioid_store: IdStore::new(), subid_store: IdStore::new(), @@ -320,12 +329,12 @@ impl CaConn { fut_get_series: FuturesOrdered::new(), value_insert_futs: FuturesOrdered::new(), remote_addr_dbg, - stats2: Arc::new(CaConnStats2::new()), + stats: Arc::new(CaConnStats::new()), } } - pub fn stats2(&self) -> Arc { - self.stats2.clone() + pub fn stats(&self) -> Arc { + self.stats.clone() } pub fn channel_add(&mut self, channel: String) { @@ -359,7 +368,9 @@ impl CaConn { while self.value_insert_futs.len() > 0 { match self.value_insert_futs.poll_next_unpin(cx) { Pending => break, - _ => {} + _ => { + self.stats.inserts_queue_pop_inc(); + } } } Ok(()) @@ -490,7 +501,7 @@ impl CaConn { ts_msp_changed, scalar_type, shape, - self.stats2.clone(), + self.stats.clone(), ); match ev.value { Scalar(v) => match v { @@ -508,6 +519,8 @@ impl CaConn { use crate::ca::proto::CaDataArrayValue::*; match v { I8(val) => match_array_value_insert!(I8, insert_array_i8, val, comm), + I16(val) => match_array_value_insert!(I16, insert_array_i16, val, comm), + I32(val) => match_array_value_insert!(I32, insert_array_i32, val, comm), F32(val) => match_array_value_insert!(F32, insert_array_f32, val, comm), F64(val) => match_array_value_insert!(F64, insert_array_f64, val, comm), _ => { @@ -662,7 +675,7 @@ impl CaConn { let mut msgs_tmp = vec![]; self.check_channels_state_init(&mut msgs_tmp)?; let ts2 = Instant::now(); - self.stats2 + self.stats .time_check_channels_state_init .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::Release); ts1 = ts2; @@ -730,7 +743,7 @@ impl CaConn { CaMsgTy::EventAddRes(k) => { let res = Self::handle_event_add_res(self, k); let ts2 = Instant::now(); - self.stats2 + self.stats .time_handle_event_add_res .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::Release); ts1 = ts2; @@ -774,13 +787,13 @@ impl Stream for CaConn { let ret = loop { self.handle_insert_futs(cx)?; let ts2 = Instant::now(); - self.stats2 + self.stats .poll_time_handle_insert_futs .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel); ts1 = ts2; self.handle_get_series_futs(cx)?; let ts2 = Instant::now(); - 
self.stats2 + self.stats .poll_time_get_series_futs .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel); ts1 = ts2; @@ -805,7 +818,7 @@ impl Stream for CaConn { CaConnState::Listen => match { let res = self.handle_conn_listen(cx); let ts2 = Instant::now(); - self.stats2 + self.stats .time_handle_conn_listen .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel); ts1 = ts2; @@ -817,20 +830,19 @@ impl Stream for CaConn { CaConnState::PeerReady => { let res = self.handle_peer_ready(cx); let ts2 = Instant::now(); - self.stats2.time_handle_peer_ready_dur(ts2.duration_since(ts1)); + self.stats.time_handle_peer_ready_dur(ts2.duration_since(ts1)); ts1 = ts2; res } CaConnState::Done => Ready(None), }; }; - let nn = self.value_insert_futs.len() as u64; - if nn > 1000 { - warn!("insert_queue_len {nn}"); + let nn = self.value_insert_futs.len(); + if nn > INSERT_FUTS_LIM { + warn!("value_insert_futs len {nn}"); } - self.stats2.inserts_queue_len.store(nn, Ordering::Release); let ts_outer_2 = Instant::now(); - self.stats2.poll_time_all_dur(ts_outer_2.duration_since(ts_outer_1)); + self.stats.poll_time_all_dur(ts_outer_2.duration_since(ts_outer_1)); ret } } @@ -1099,7 +1111,7 @@ impl FindIocStream { error!("incomplete message, missing payload"); break; } - let msg = CaMsg::from_proto_infos(&hi, nb.data())?; + let msg = CaMsg::from_proto_infos(&hi, nb.data(), 32)?; nb.adv(hi.payload())?; msgs.push(msg); } diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index aa4164c..43aaef6 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -2,7 +2,7 @@ use crate::netbuf::NetBuf; use err::Error; use futures_util::{pin_mut, Stream}; use log::*; -use std::collections::VecDeque; +use std::collections::{BTreeMap, VecDeque}; use std::net::SocketAddrV4; use std::pin::Pin; use std::task::{Context, Poll}; @@ -44,6 +44,11 @@ pub struct CreateChanRes { pub sid: u32, } +#[derive(Debug)] +pub struct CreateChanFail { + pub cid: u32, +} + #[derive(Debug)] pub struct AccessRightsRes { pub cid: u32, @@ -108,6 +113,8 @@ pub enum CaDataScalarValue { #[derive(Clone, Debug)] pub enum CaDataArrayValue { I8(Vec), + I16(Vec), + I32(Vec), F32(Vec), F64(Vec), } @@ -146,6 +153,7 @@ pub enum CaMsgTy { SearchRes(SearchRes), CreateChan(CreateChan), CreateChanRes(CreateChanRes), + CreateChanFail(CreateChanFail), AccessRightsRes(AccessRightsRes), EventAdd(EventAdd), EventAddRes(EventAddRes), @@ -166,6 +174,7 @@ impl CaMsgTy { SearchRes(_) => 0x06, CreateChan(_) => 0x12, CreateChanRes(_) => 0x12, + CreateChanFail(_) => 0x1a, AccessRightsRes(_) => 0x16, EventAdd(_) => 0x01, EventAddRes(_) => 0x01, @@ -190,6 +199,7 @@ impl CaMsgTy { SearchRes(_) => 8, CreateChan(x) => (x.channel.len() + 1 + 7) / 8 * 8, CreateChanRes(_) => 0, + CreateChanFail(_) => 0, AccessRightsRes(_) => 0, EventAdd(_) => 16, EventAddRes(_) => { @@ -219,6 +229,7 @@ impl CaMsgTy { SearchRes(x) => x.tcp_port, CreateChan(_) => 0, CreateChanRes(x) => x.data_type, + CreateChanFail(_) => 0, AccessRightsRes(_) => 0, EventAdd(x) => x.data_type, EventAddRes(x) => x.data_type, @@ -239,6 +250,7 @@ impl CaMsgTy { SearchRes(_) => 0, CreateChan(_) => 0, CreateChanRes(x) => x.data_count, + CreateChanFail(_) => 0, AccessRightsRes(_) => 0, EventAdd(x) => x.data_count, EventAddRes(x) => x.data_count, @@ -259,6 +271,7 @@ impl CaMsgTy { SearchRes(x) => x.addr, CreateChan(x) => x.cid, CreateChanRes(x) => x.cid, + CreateChanFail(x) => x.cid, AccessRightsRes(x) => x.cid, EventAdd(x) => x.sid, EventAddRes(x) => x.status, @@ 
-279,6 +292,7 @@ impl CaMsgTy { SearchRes(x) => x.id, CreateChan(_) => CA_PROTO_VERSION as _, CreateChanRes(x) => x.sid, + CreateChanFail(_) => 0, AccessRightsRes(x) => x.rights, EventAdd(x) => x.subid, EventAddRes(x) => x.subid, @@ -337,10 +351,11 @@ impl CaMsgTy { unsafe { std::ptr::copy(&d[0] as _, &mut buf[0] as _, d.len()) }; } CreateChanRes(_) => {} + CreateChanFail(_) => {} AccessRightsRes(_) => {} EventAdd(_) => { // TODO allow to customize the mask. Test if it works. - buf.copy_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0, 0]); + buf.copy_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x0e, 0, 0]); } EventAddRes(_) => {} ReadNotify(_) => {} @@ -391,7 +406,7 @@ impl CaMsg { } } - pub fn from_proto_infos(hi: &HeadInfo, payload: &[u8]) -> Result { + pub fn from_proto_infos(hi: &HeadInfo, payload: &[u8], array_truncate: usize) -> Result { let msg = match hi.cmdid { 0 => CaMsg { ty: CaMsgTy::VersionRes(hi.data_count), @@ -446,12 +461,42 @@ impl CaMsg { }), } } + 26 => { + CaMsg { + // TODO use different structs for request and response: + ty: CaMsgTy::CreateChanFail(CreateChanFail { cid: hi.param1 }), + } + } 1 => { use netpod::Shape; let ca_st = CaScalarType::from_ca_u16(hi.data_type)?; let ca_sh = Shape::from_ca_count(hi.data_count)?; let value = match ca_sh { Shape::Scalar => match ca_st { + CaScalarType::I8 => { + type ST = i8; + const STL: usize = std::mem::size_of::(); + if payload.len() < STL { + return Err(Error::with_msg_no_trace(format!( + "not enough payload for i8 {}", + payload.len() + ))); + } + let v = ST::from_be_bytes(payload[..STL].try_into()?); + CaDataValue::Scalar(CaDataScalarValue::I8(v)) + } + CaScalarType::I16 => { + type ST = i16; + const STL: usize = std::mem::size_of::(); + if payload.len() < STL { + return Err(Error::with_msg_no_trace(format!( + "not enough payload for i16 {}", + payload.len() + ))); + } + let v = ST::from_be_bytes(payload[..STL].try_into()?); + CaDataValue::Scalar(CaDataScalarValue::I16(v)) + } CaScalarType::I32 => { type ST = i32; const STL: usize = std::mem::size_of::(); @@ -513,18 +558,12 @@ impl CaMsg { let v = String::from_utf8_lossy(&payload[..ixn]); CaDataValue::Scalar(CaDataScalarValue::String(v.into())) } - _ => { - warn!("TODO handle {ca_st:?}"); - return Err(Error::with_msg_no_trace(format!( - "can not yet handle type scalar {ca_st:?}" - ))); - } }, Shape::Wave(n) => match ca_st { CaScalarType::I8 => { type ST = i8; const STL: usize = std::mem::size_of::(); - let nn = (n as usize).min(payload.len() / STL); + let nn = (n as usize).min(payload.len() / STL).min(array_truncate); let mut a = Vec::with_capacity(nn); let mut bb = &payload[..]; for _ in 0..nn { @@ -534,10 +573,36 @@ impl CaMsg { } CaDataValue::Array(CaDataArrayValue::I8(a)) } + CaScalarType::I16 => { + type ST = i16; + const STL: usize = std::mem::size_of::(); + let nn = (n as usize).min(payload.len() / STL).min(array_truncate); + let mut a = Vec::with_capacity(nn); + let mut bb = &payload[..]; + for _ in 0..nn { + let v = ST::from_be_bytes(bb[..STL].try_into()?); + bb = &bb[STL..]; + a.push(v); + } + CaDataValue::Array(CaDataArrayValue::I16(a)) + } + CaScalarType::I32 => { + type ST = i32; + const STL: usize = std::mem::size_of::(); + let nn = (n as usize).min(payload.len() / STL).min(array_truncate); + let mut a = Vec::with_capacity(nn); + let mut bb = &payload[..]; + for _ in 0..nn { + let v = ST::from_be_bytes(bb[..STL].try_into()?); + bb = &bb[STL..]; + a.push(v); + } + CaDataValue::Array(CaDataArrayValue::I32(a)) + } CaScalarType::F32 => { type ST 
= f32; const STL: usize = std::mem::size_of::(); - let nn = (n as usize).min(payload.len() / STL); + let nn = (n as usize).min(payload.len() / STL).min(array_truncate); let mut a = Vec::with_capacity(nn); let mut bb = &payload[..]; for _ in 0..nn { @@ -550,7 +615,7 @@ impl CaMsg { CaScalarType::F64 => { type ST = f64; const STL: usize = std::mem::size_of::(); - let nn = (n as usize).min(payload.len() / STL); + let nn = (n as usize).min(payload.len() / STL).min(array_truncate); let mut a = Vec::with_capacity(nn); let mut bb = &payload[..]; for _ in 0..nn { @@ -561,7 +626,7 @@ impl CaMsg { CaDataValue::Array(CaDataArrayValue::F64(a)) } _ => { - warn!("TODO handle {ca_st:?}"); + warn!("TODO handle array {ca_st:?}"); return Err(Error::with_msg_no_trace(format!( "can not yet handle type array {ca_st:?}" ))); @@ -687,10 +752,12 @@ pub struct CaProto { buf: NetBuf, outbuf: NetBuf, out: VecDeque, + array_truncate: usize, + logged_proto_error_for_cid: BTreeMap, } impl CaProto { - pub fn new(tcp: TcpStream, remote_addr_dbg: SocketAddrV4) -> Self { + pub fn new(tcp: TcpStream, remote_addr_dbg: SocketAddrV4, array_truncate: usize) -> Self { Self { tcp, remote_addr_dbg, @@ -698,6 +765,8 @@ impl CaProto { buf: NetBuf::new(1024 * 128), outbuf: NetBuf::new(1024 * 128), out: VecDeque::new(), + array_truncate, + logged_proto_error_for_cid: BTreeMap::new(), } } @@ -855,13 +924,22 @@ impl CaProto { break match &self.state { CaState::StdHead => { let hi = HeadInfo::from_netbuf(&mut self.buf)?; - if hi.cmdid == 6 - || hi.cmdid > 26 - || hi.data_type > 10 - || hi.data_count > 4096 - || hi.payload_size > 1024 * 32 - { - warn!("StdHead sees {hi:?}"); + if hi.cmdid == 1 || hi.cmdid == 15 { + let sid = hi.param1; + if hi.payload_size == 0xffff && hi.data_count == 0 { + } else if hi.payload_size > 16368 { + if self.logged_proto_error_for_cid.contains_key(&sid) { + // TODO emit this as Item so that downstream can translate SID to name. 
+ warn!( + "Protocol error payload_size 0x{:04x} data_count 0x{:04x} hi {:?}", + hi.payload_size, hi.data_count, hi + ); + self.logged_proto_error_for_cid.insert(sid, true); + } + } + } + if hi.cmdid > 26 { + warn!("Enexpected cmdid {hi:?}"); } if hi.payload_size == 0xffff && hi.data_count == 0 { self.state = CaState::ExtHead(hi); @@ -869,7 +947,7 @@ impl CaProto { } else { if hi.payload_size == 0 { self.state = CaState::StdHead; - let msg = CaMsg::from_proto_infos(&hi, &[])?; + let msg = CaMsg::from_proto_infos(&hi, &[], self.array_truncate)?; Ok(Some(CaItem::Msg(msg))) } else { self.state = CaState::Payload(hi); @@ -880,9 +958,18 @@ impl CaProto { CaState::ExtHead(hi) => { let payload_size = self.buf.read_u32_be()?; let data_count = self.buf.read_u32_be()?; - warn!("ExtHead payload_size {payload_size} data_count {data_count}"); - if payload_size == 0 { - let msg = CaMsg::from_proto_infos(hi, &[])?; + if payload_size > 1024 * 256 { + warn!( + "ExtHead data_type {} payload_size {payload_size} data_count {data_count}", + hi.data_type + ); + } + if payload_size <= 16368 { + warn!( + "ExtHead data_type {} payload_size {payload_size} data_count {data_count}", + hi.data_type + ); + let msg = CaMsg::from_proto_infos(hi, &[], self.array_truncate)?; self.state = CaState::StdHead; Ok(Some(CaItem::Msg(msg))) } else { @@ -892,7 +979,7 @@ impl CaProto { } CaState::Payload(hi) => { let g = self.buf.read_bytes(hi.payload_size as _)?; - let msg = CaMsg::from_proto_infos(hi, g)?; + let msg = CaMsg::from_proto_infos(hi, g, self.array_truncate)?; self.state = CaState::StdHead; Ok(Some(CaItem::Msg(msg))) } diff --git a/netfetch/src/ca/store.rs b/netfetch/src/ca/store.rs index 3e7cb46..41a043f 100644 --- a/netfetch/src/ca/store.rs +++ b/netfetch/src/ca/store.rs @@ -1,5 +1,6 @@ use crate::bsread::ChannelDescDecoded; use crate::series::{Existence, SeriesId}; +use crate::store::{CommonInsertQueue, CommonInsertQueueSender}; use async_channel::{Receiver, Sender}; use err::Error; use scylla::prepared_statement::PreparedStatement; @@ -48,13 +49,20 @@ pub struct DataStore { pub qu_insert_scalar_f64: Arc, pub qu_insert_scalar_string: Arc, pub qu_insert_array_i8: Arc, + pub qu_insert_array_i16: Arc, + pub qu_insert_array_i32: Arc, pub qu_insert_array_f32: Arc, pub qu_insert_array_f64: Arc, pub chan_reg: Arc, + pub ciqs: CommonInsertQueueSender, } impl DataStore { - pub async fn new(pg_client: Arc, scy: Arc) -> Result { + pub async fn new( + pg_client: Arc, + scy: Arc, + ciqs: CommonInsertQueueSender, + ) -> Result { let q = scy .prepare("insert into series (part, series, ts_msp, scalar_type, shape_dims) values (?, ?, ?, ?, ?)") .await @@ -101,6 +109,16 @@ impl DataStore { .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_array_i8 = Arc::new(q); + let q = scy + .prepare("insert into events_array_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_array_i16 = Arc::new(q); + let q = scy + .prepare("insert into events_array_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_array_i32 = Arc::new(q); let q = scy .prepare("insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)") .await @@ -123,8 +141,11 @@ impl DataStore { qu_insert_scalar_f64, qu_insert_scalar_string, qu_insert_array_i8, + qu_insert_array_i16, + qu_insert_array_i32, qu_insert_array_f32, 
qu_insert_array_f64, + ciqs, }; Ok(ret) } diff --git a/netfetch/src/series.rs b/netfetch/src/series.rs index 30a357c..1daabff 100644 --- a/netfetch/src/series.rs +++ b/netfetch/src/series.rs @@ -142,11 +142,14 @@ pub async fn get_series_id(pg_client: &PgClient, cd: &ChannelDescDecoded) -> Res let series = Existence::Created(SeriesId(series)); return Ok(series); } else { - error!("tried to insert {series:?} for {channel_name} but it exists"); + warn!( + "tried to insert {series:?} for {facility} {channel_name} {scalar_type:?} {shape:?} trying again..." + ); } tokio::time::sleep(Duration::from_millis(20)).await; series += 1; } + error!("tried to insert {series:?} for {facility} {channel_name} {scalar_type:?} {shape:?} but it failed"); Err(Error::with_msg_no_trace(format!("get_series_id can not create and insert series id {facility:?} {channel_name:?} {scalar_type:?} {shape:?}"))) } else { let series = all[0] as u64; diff --git a/netfetch/src/store.rs b/netfetch/src/store.rs index b0e2488..98a2753 100644 --- a/netfetch/src/store.rs +++ b/netfetch/src/store.rs @@ -1,16 +1,21 @@ use crate::errconv::ErrConv; use err::Error; -use futures_util::{Future, FutureExt}; +use futures_util::stream::FuturesOrdered; +use futures_util::{Future, FutureExt, Stream, StreamExt}; use log::*; use scylla::frame::value::ValueList; use scylla::prepared_statement::PreparedStatement; use scylla::transport::errors::QueryError; use scylla::{QueryResult, Session as ScySession}; +use std::collections::VecDeque; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Instant; +const CHANNEL_CAP: usize = 128; +const POLLING_CAP: usize = 32; + pub struct ScyInsertFut { #[allow(unused)] scy: Arc, @@ -78,3 +83,82 @@ impl Future for ScyInsertFut { } } } + +type FutTy = Pin> + Send>>; + +pub struct CommonInsertQueueSender { + sender: async_channel::Sender, +} + +impl CommonInsertQueueSender { + pub async fn send(&self, k: FutTy) -> Result<(), Error> { + self.sender + .send(k) + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}"))) + } +} + +pub struct CommonInsertQueue { + sender: async_channel::Sender, + recv: async_channel::Receiver, + futs: FuturesOrdered, + inp_done: bool, +} + +impl CommonInsertQueue { + pub fn new() -> Self { + let (tx, rx) = async_channel::bounded(CHANNEL_CAP); + Self { + sender: tx.clone(), + recv: rx, + futs: FuturesOrdered::new(), + inp_done: false, + } + } + + pub fn sender(&self) -> CommonInsertQueueSender { + CommonInsertQueueSender { + sender: self.sender.clone(), + } + } +} + +impl Stream for CommonInsertQueue { + type Item = Result<(), Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + let _res_inp = if self.futs.len() < POLLING_CAP && !self.inp_done { + match self.recv.poll_next_unpin(cx) { + Ready(Some(k)) => { + self.futs.push(k); + continue; + } + Ready(None) => { + self.inp_done = true; + Ready(None) + } + Pending => Pending, + } + } else { + Ready(Some(())) + }; + let res_qu = match self.futs.poll_next_unpin(cx) { + Ready(Some(Ok(_k))) => Ready(Some(Ok(()))), + Ready(Some(Err(e))) => Ready(Some(Err(e))), + Ready(None) => { + if self.inp_done { + Ready(None) + } else { + Pending + } + } + Pending => Pending, + }; + // TODO monitor queue length and queue pushes per poll of this. 
+ break res_qu; + } + } +} diff --git a/stats/src/stats.rs b/stats/src/stats.rs index b71bfa9..e794a79 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -113,12 +113,13 @@ impl IntervalEma { stats_proc::stats_struct!(( stats_struct( - name(CaConnStats2), + name(CaConnStats), counters( inserts_val, inserts_msp, inserts_discard, - inserts_queue_len, + inserts_queue_push, + inserts_queue_pop, poll_time_all, poll_time_handle_insert_futs, poll_time_get_series_futs, @@ -128,6 +129,6 @@ stats_proc::stats_struct!(( time_handle_event_add_res, ), ), - agg(name(CaConnStats2Agg), parent(CaConnStats2)), - diff(name(CaConnStats2AggDiff), input(CaConnStats2Agg)), + agg(name(CaConnStatsAgg), parent(CaConnStats)), + diff(name(CaConnStatsAggDiff), input(CaConnStatsAgg)), )); diff --git a/stats_proc/src/stats_proc.rs b/stats_proc/src/stats_proc.rs index a88f689..6340ddb 100644 --- a/stats_proc/src/stats_proc.rs +++ b/stats_proc/src/stats_proc.rs @@ -185,7 +185,7 @@ impl {name} {{ for x in &st.counters { let n = x.to_string(); buf.push_str(&format!( - "ret.push_str(&format!(\"{} {{}}\\n\", self.{}.load(Ordering::Acquire)));\n", + "ret.push_str(&format!(\"daqingest_{} {{}}\\n\", self.{}.load(Ordering::Acquire)));\n", n, n )); } @@ -193,7 +193,7 @@ impl {name} {{ " pub fn prometheus(&self) -> String {{ let mut ret = String::new(); - ret.push_str(&format!(\"aggcount {{}}\\n\", self.aggcount.load(Ordering::Acquire))); + ret.push_str(&format!(\"daqingest_aggcount {{}}\\n\", self.aggcount.load(Ordering::Acquire))); {buf} ret }}
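Notes on the main mechanisms this change touches follow; the sketches are illustrative stand-ins, not part of the patch.

The IOC address lookup in netfetch/src/ca.rs now resolves channels in batches: the prepared statement binds the facility plus eight channel names ($2..$9) and returns (channel, addr) pairs, and a short final batch is padded with the "__NONE__" filler (presumably a name that never matches a real channel) so the same statement can be reused. A minimal sketch of the padding step, with a hypothetical fill_query_batch helper standing in for the inline code in ca_connect:

    /// Pad one batch of channel names to the fixed arity of the prepared
    /// statement (eight placeholders) and advance the remaining slice.
    fn fill_query_batch<'a>(chns_todo: &mut &'a [String]) -> [&'a str; 8] {
        let mut chstmp = ["__NONE__"; 8];
        for (dst, src) in chstmp.iter_mut().zip(chns_todo.iter()) {
            *dst = src.as_str();
        }
        let taken = chstmp.len().min(chns_todo.len());
        *chns_todo = &chns_todo[taken..];
        chstmp
    }

    fn main() {
        let channels: Vec<String> = (0..11).map(|i| format!("CH:{i:02}")).collect();
        let mut todo = &channels[..];
        while !todo.is_empty() {
            // The real query binds the facility as $1 and these eight names as $2..$9.
            let batch = fill_query_batch(&mut todo);
            println!("{batch:?}");
        }
    }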
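In netfetch/src/ca/conn.rs the per-connection insert queue no longer publishes its length; instead inserts_queue_push and inserts_queue_pop count futures entering and leaving the queue, and a queue at INSERT_FUTS_LIM (now 16) still drops the value and bumps inserts_discard. A reduced model of that accounting, with a plain Vec standing in for the FuturesOrdered queue and bare atomics for the CaConnStats counters:

    use std::sync::atomic::{AtomicU64, Ordering};

    const INSERT_FUTS_LIM: usize = 16;

    #[derive(Default)]
    struct Counters {
        inserts_discard: AtomicU64,
        inserts_queue_push: AtomicU64,
    }

    /// Drop the value when the queue is saturated, otherwise queue it and
    /// count the push, mirroring the rule in the insert_*_impl macros.
    fn enqueue_or_discard<T>(queue: &mut Vec<T>, item: T, stats: &Counters) {
        if queue.len() >= INSERT_FUTS_LIM {
            stats.inserts_discard.fetch_add(1, Ordering::AcqRel);
        } else {
            queue.push(item);
            stats.inserts_queue_push.fetch_add(1, Ordering::AcqRel);
        }
    }

    fn main() {
        let stats = Counters::default();
        let mut queue: Vec<u32> = Vec::new();
        for v in 0..20 {
            enqueue_or_discard(&mut queue, v, &stats);
        }
        assert_eq!(queue.len(), INSERT_FUTS_LIM);
        assert_eq!(stats.inserts_discard.load(Ordering::Acquire), 4);
    }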
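CaMsg::from_proto_infos in netfetch/src/ca/proto.rs now takes an array_truncate limit and applies it when decoding waveform payloads, alongside the new i16/i32 scalar and array variants. A standalone sketch of the decode path for a big-endian i16 wave; the real code sits inside the Shape::Wave match and uses the same min() chain over data_count, payload length, and array_truncate:

    /// Decode a big-endian i16 waveform payload, keeping at most
    /// `array_truncate` elements.
    fn decode_wave_i16(payload: &[u8], data_count: usize, array_truncate: usize) -> Vec<i16> {
        const STL: usize = std::mem::size_of::<i16>();
        let nn = data_count.min(payload.len() / STL).min(array_truncate);
        let mut a = Vec::with_capacity(nn);
        let mut bb = payload;
        for _ in 0..nn {
            let v = i16::from_be_bytes(bb[..STL].try_into().unwrap());
            bb = &bb[STL..];
            a.push(v);
        }
        a
    }

    fn main() {
        // Six i16 values on the wire; the caller keeps at most four.
        let payload: Vec<u8> = [1i16, -2, 3, -4, 5, -6]
            .iter()
            .flat_map(|v| v.to_be_bytes())
            .collect();
        assert_eq!(decode_wave_i16(&payload, 6, 4), vec![1, -2, 3, -4]);
    }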
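netfetch/src/store.rs adds CommonInsertQueue and CommonInsertQueueSender: producers send boxed insert futures over a bounded async_channel (CHANNEL_CAP = 128) and a single Stream drives up to POLLING_CAP = 32 of them through a FuturesOrdered. In this patch the queue is created in ca_connect and its sender is stored in DataStore, but nothing drains it yet. A simplified, self-contained model of the flow, assuming tokio, async-channel, and futures-util as dependencies and using String in place of err::Error; the real implementation interleaves channel reads and future polling inside poll_next, while this linear version only shows the data path:

    use futures_util::stream::{FuturesOrdered, StreamExt};
    use std::pin::Pin;

    type InsertFut = Pin<Box<dyn std::future::Future<Output = Result<(), String>> + Send>>;

    #[tokio::main]
    async fn main() -> Result<(), String> {
        let (tx, rx) = async_channel::bounded::<InsertFut>(128);

        // Producer side: roughly what DataStore would do through a
        // CommonInsertQueueSender; here the "insert" only logs.
        let _producer = tokio::spawn(async move {
            for i in 0..3u32 {
                let fut: InsertFut = Box::pin(async move {
                    println!("insert {i} done");
                    Ok::<(), String>(())
                });
                let _ = tx.send(fut).await;
            }
        });

        // Consumer side: keep at most 32 futures in flight, complete them in order.
        let mut futs: FuturesOrdered<InsertFut> = FuturesOrdered::new();
        loop {
            while futs.len() < 32 {
                match rx.recv().await {
                    Ok(f) => futs.push(f),
                    Err(_) => break,
                }
            }
            match futs.next().await {
                Some(res) => res?,
                None => break,
            }
        }
        Ok(())
    }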
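The stats_proc change prefixes every generated Prometheus line with daqingest_, so CaConnStatsAgg::prometheus() now yields series such as daqingest_aggcount and daqingest_inserts_val. A hand-written equivalent for two counters showing the line format the macro emits; the struct is a stand-in, not the generated type:

    use std::sync::atomic::{AtomicU64, Ordering};

    struct AggLike {
        aggcount: AtomicU64,
        inserts_val: AtomicU64,
    }

    impl AggLike {
        fn prometheus(&self) -> String {
            let mut ret = String::new();
            ret.push_str(&format!("daqingest_aggcount {}\n", self.aggcount.load(Ordering::Acquire)));
            ret.push_str(&format!("daqingest_inserts_val {}\n", self.inserts_val.load(Ordering::Acquire)));
            ret
        }
    }

    fn main() {
        let agg = AggLike {
            aggcount: AtomicU64::new(3),
            inserts_val: AtomicU64::new(12345),
        };
        // Prints:
        // daqingest_aggcount 3
        // daqingest_inserts_val 12345
        print!("{}", agg.prometheus());
    }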
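get_metrics() in netfetch/src/ca.rs keeps the unsafe static plus Once pattern and only switches the stored type to CaConnStatsAgg. For comparison, a sketch of the same slot without the unsafe pointer cast; this is an alternative, not what the patch does, and it assumes OnceLock (std since Rust 1.70, newer than this patch; once_cell::sync::OnceCell is the contemporary equivalent) plus the workspace's stats crate:

    use std::sync::{Mutex, MutexGuard, OnceLock};
    use stats::CaConnStatsAgg;

    // Process-wide slot; returning the guard keeps the Mutex actually
    // protecting the value instead of handing out a raw &'static mut.
    static METRICS: OnceLock<Mutex<Option<CaConnStatsAgg>>> = OnceLock::new();

    fn metrics_slot() -> MutexGuard<'static, Option<CaConnStatsAgg>> {
        METRICS
            .get_or_init(|| Mutex::new(None))
            .lock()
            .expect("metrics mutex poisoned")
    }

The aggregation loop would then write through *metrics_slot() = Some(agg.clone()) and the axum handler would read while holding the guard.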