Add more supported data types and stats counter

Dominik Werder
2022-05-24 14:05:36 +02:00
parent 2f9a4092c8
commit 277597400e
9 changed files with 338 additions and 97 deletions

View File

@@ -102,6 +102,7 @@ impl From<CaChannel> for CaConnectOpts {
timeout: 2000,
abort_after_search: 0,
pg_pass: "".into(),
array_truncate: 512,
}
}
}

View File

@@ -2,6 +2,8 @@ pub mod conn;
pub mod proto;
pub mod store;
use crate::store::CommonInsertQueue;
use self::conn::FindIocStream;
use self::store::DataStore;
use conn::CaConn;
@@ -11,7 +13,7 @@ use log::*;
use netpod::Database;
use scylla::batch::Consistency;
use serde::{Deserialize, Serialize};
use stats::{CaConnStats2Agg, CaConnStats2AggDiff};
use stats::{CaConnStatsAgg, CaConnStatsAggDiff};
use std::collections::BTreeMap;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::PathBuf;
@@ -21,17 +23,15 @@ use tokio::fs::OpenOptions;
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
static mut METRICS: Option<Mutex<Option<CaConnStats2Agg>>> = None;
static mut METRICS: Option<Mutex<Option<CaConnStatsAgg>>> = None;
static METRICS_ONCE: Once = Once::new();
fn get_metrics() -> &'static mut Option<CaConnStats2Agg> {
fn get_metrics() -> &'static mut Option<CaConnStatsAgg> {
METRICS_ONCE.call_once(|| unsafe {
METRICS = Some(Mutex::new(None));
});
let mut g = unsafe { METRICS.as_mut().unwrap().lock().unwrap() };
//let ret = g.as_mut().unwrap();
//let ret = g.as_mut(;
let ret: &mut Option<CaConnStats2Agg> = &mut *g;
let ret: &mut Option<CaConnStatsAgg> = &mut *g;
let ret = unsafe { &mut *(ret as *mut _) };
ret
}
@@ -49,6 +49,7 @@ struct ChannelConfig {
#[serde(default)]
abort_after_search: u32,
pg_pass: String,
array_truncate: Option<usize>,
}
pub struct ListenFromFileOpts {
@@ -86,6 +87,7 @@ pub async fn parse_config(config: PathBuf) -> Result<CaConnectOpts, Error> {
timeout: conf.timeout.unwrap_or(2000),
abort_after_search: conf.abort_after_search,
pg_pass: conf.pg_pass,
array_truncate: conf.array_truncate.unwrap_or(512),
})
}
@@ -98,6 +100,7 @@ pub struct CaConnectOpts {
pub timeout: u64,
pub abort_after_search: u32,
pub pg_pass: String,
pub array_truncate: usize,
}
async fn resolve_address(addr_str: &str) -> Result<SocketAddrV4, Error> {
@@ -230,6 +233,10 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
tokio::spawn(start_metrics_service());
// TODO maybe this should hold the resources needed by the futures?
let ciq = CommonInsertQueue::new();
let facility = "scylla";
let opts = parse_config(opts.config).await?;
let d = Database {
@@ -257,36 +264,53 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
let scy = Arc::new(scy);
info!("FIND IOCS");
let qu_find_addr = pg_client
.prepare("select t2.addr from ioc_by_channel t1, ioc_by_channel t2 where t2.facility = t1.facility and t2.channel = t1.channel and t1.facility = $1 and t1.channel = $2")
.prepare("select t2.channel, t2.addr from ioc_by_channel t1, ioc_by_channel t2 where t2.facility = t1.facility and t2.channel = t1.channel and t1.facility = $1 and t1.channel in ($2, $3, $4, $5, $6, $7, $8, $9)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let mut channels_by_host = BTreeMap::new();
for (ix, ch) in opts.channels.iter().enumerate() {
let mut chns_todo = &opts.channels[..];
let mut chstmp = ["__NONE__"; 8];
let mut ix = 0;
while chns_todo.len() > 0 {
for (s1, s2) in chns_todo.iter().zip(chstmp.iter_mut()) {
*s2 = s1;
}
chns_todo = &chns_todo[chstmp.len().min(chns_todo.len())..];
let rows = pg_client
.query(&qu_find_addr, &[&facility, ch])
.query(
&qu_find_addr,
&[
&facility, &chstmp[0], &chstmp[1], &chstmp[2], &chstmp[3], &chstmp[4], &chstmp[5], &chstmp[6],
&chstmp[7],
],
)
.await
.map_err(|e| Error::with_msg_no_trace(format!("PG error: {e:?}")))?;
if rows.is_empty() {
error!("can not find address of channel {}", ch);
error!("can not find any addresses of channels {:?}", chstmp);
} else {
let addr: &str = rows[0].get(0);
if addr == "" {
// TODO the address was searched before but could not be found.
} else {
let addr: SocketAddrV4 = match addr.parse() {
Ok(k) => k,
Err(e) => {
error!("can not parse {addr:?} {e:?}");
continue;
}
};
if ix % 200 == 0 {
info!("{} {} {:?}", ix, ch, addr);
}
if !channels_by_host.contains_key(&addr) {
channels_by_host.insert(addr, vec![ch.to_string()]);
for row in rows {
let ch: &str = row.get(0);
let addr: &str = row.get(1);
if addr == "" {
// TODO the address was searched before but could not be found.
} else {
channels_by_host.get_mut(&addr).unwrap().push(ch.to_string());
let addr: SocketAddrV4 = match addr.parse() {
Ok(k) => k,
Err(e) => {
error!("can not parse {addr:?} for channel {ch:?} {e:?}");
continue;
}
};
ix += 1;
if ix % 1000 == 0 {
info!("{} of {} {} {:?}", ix, opts.channels.len(), ch, addr);
}
if !channels_by_host.contains_key(&addr) {
channels_by_host.insert(addr, vec![ch.to_string()]);
} else {
channels_by_host.get_mut(&addr).unwrap().push(ch.to_string());
}
}
}
}
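
The lookup above replaces per-channel queries with batches of eight: the prepared statement has exactly eight channel placeholders ($2..$9), so each round fills a fixed buffer and pads unused slots with the "__NONE__" sentinel, which matches no real channel name. A standalone sketch of that batching, with a hypothetical lookup_batch standing in for the pg_client.query round trip:

// Standalone sketch of the 8-at-a-time batching used above.
// `lookup_batch` is a hypothetical stand-in for the Postgres query.
fn lookup_batch(batch: &[&str; 8]) {
    println!("querying {:?}", batch);
}

fn batch_channels(channels: &[String]) {
    let mut chns_todo = &channels[..];
    while !chns_todo.is_empty() {
        // Unused slots keep the sentinel, so the IN list always has 8 entries.
        let mut chstmp = ["__NONE__"; 8];
        for (src, dst) in chns_todo.iter().zip(chstmp.iter_mut()) {
            *dst = src.as_str();
        }
        chns_todo = &chns_todo[chstmp.len().min(chns_todo.len())..];
        lookup_batch(&chstmp);
    }
}

fn main() {
    let channels: Vec<String> = (0..19).map(|i| format!("CH-{i:03}")).collect();
    // 19 channels -> three batches: 8 + 8 + 3 (plus five sentinel slots).
    batch_channels(&channels);
}

Keeping the placeholder count fixed lets one prepared statement be reused for every batch; the sentinel entries simply produce no matching rows.
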
@@ -294,7 +318,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
if opts.abort_after_search == 1 {
return Ok(());
}
let data_store = Arc::new(DataStore::new(pg_client, scy.clone()).await?);
let data_store = Arc::new(DataStore::new(pg_client, scy.clone(), ciq.sender()).await?);
let mut conn_jhs = vec![];
let mut conn_stats = vec![];
for (host, channels) in channels_by_host {
@@ -311,8 +335,8 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
continue;
}
};
let mut conn = CaConn::new(tcp, addr, data_store.clone());
conn_stats.push(conn.stats2());
let mut conn = CaConn::new(tcp, addr, data_store.clone(), opts.array_truncate);
conn_stats.push(conn.stats());
for c in channels {
conn.channel_add(c);
}
@@ -334,17 +358,19 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
let jh = tokio::spawn(conn_block);
conn_jhs.push(jh);
}
let mut agg_last = CaConnStats2Agg::new();
let mut agg_last = CaConnStatsAgg::new();
loop {
tokio::time::sleep(Duration::from_millis(2000)).await;
let agg = CaConnStats2Agg::new();
tokio::time::sleep(Duration::from_millis(500)).await;
let agg = CaConnStatsAgg::new();
for g in &conn_stats {
agg.push(&g);
}
let m = get_metrics();
*m = Some(agg.clone());
let diff = CaConnStats2AggDiff::diff_from(&agg_last, &agg);
info!("{}", diff.display());
if false {
let diff = CaConnStatsAggDiff::diff_from(&agg_last, &agg);
info!("{}", diff.display());
}
agg_last = agg;
if false {
break;
@@ -372,8 +398,14 @@ async fn start_metrics_service() {
axum::routing::get(|| async {
let stats = get_metrics();
match stats {
Some(s) => s.prometheus(),
None => String::new(),
Some(s) => {
trace!("Metrics");
s.prometheus()
}
None => {
trace!("Metrics empty");
String::new()
}
}
}),
);

View File

@@ -11,7 +11,7 @@ use libc::c_int;
use log::*;
use netpod::timeunits::SEC;
use netpod::{ScalarType, Shape};
use stats::{CaConnStats2, IntervalEma};
use stats::{CaConnStats, IntervalEma};
use std::collections::{BTreeMap, VecDeque};
use std::net::{Ipv4Addr, SocketAddrV4};
use std::pin::Pin;
@@ -22,8 +22,8 @@ use std::time::{Duration, Instant, SystemTime};
use tokio::io::unix::AsyncFd;
use tokio::net::TcpStream;
const INSERT_FUTS_MAX: usize = 200;
const INSERT_FUTS_LIM: usize = 80000;
const INSERT_FUTS_MAX: usize = 2;
const INSERT_FUTS_LIM: usize = 16;
const TABLE_SERIES_MOD: u32 = 128;
#[derive(Debug)]
@@ -105,7 +105,7 @@ macro_rules! insert_scalar_impl {
ts_msp_changed: bool,
st: ScalarType,
sh: Shape,
stats: Arc<CaConnStats2>,
stats: Arc<CaConnStats>,
) {
if futs_queue.len() >= INSERT_FUTS_LIM {
stats.inserts_discard.fetch_add(1, Ordering::AcqRel);
@@ -144,6 +144,7 @@ macro_rules! insert_scalar_impl {
Box::pin(fut3) as _
};
futs_queue.push(fut);
stats.inserts_queue_push_inc();
}
};
}
@@ -162,7 +163,7 @@ macro_rules! insert_array_impl {
ts_msp_changed: bool,
st: ScalarType,
sh: Shape,
stats: Arc<CaConnStats2>,
stats: Arc<CaConnStats>,
) {
if futs_queue.len() >= INSERT_FUTS_LIM {
stats.inserts_discard.fetch_add(1, Ordering::AcqRel);
@@ -201,6 +202,7 @@ macro_rules! insert_array_impl {
Box::pin(fut3) as _
};
futs_queue.push(fut);
stats.inserts_queue_push_inc();
}
};
}
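
Both insert macros follow the same backpressure pattern: once the per-connection future queue reaches INSERT_FUTS_LIM the value is dropped and counted as a discard, otherwise the insert future is queued and the new push counter is incremented; the matching pop counter is bumped in handle_insert_futs when a future completes. That pair of monotonic counters replaces the old inserts_queue_len gauge (see the stats struct later in this commit), and the instantaneous queue depth can be recovered as their difference. A small sketch of that bookkeeping, with plain atomics standing in for the generated CaConnStats fields (the AtomicU64 representation is an assumption based on the fetch_add/load calls in this diff):

use std::sync::atomic::{AtomicU64, Ordering};

// Stand-ins for the generated CaConnStats counters.
#[derive(Default)]
struct QueueCounters {
    inserts_queue_push: AtomicU64,
    inserts_queue_pop: AtomicU64,
}

impl QueueCounters {
    fn push_inc(&self) {
        self.inserts_queue_push.fetch_add(1, Ordering::AcqRel);
    }
    fn pop_inc(&self) {
        self.inserts_queue_pop.fetch_add(1, Ordering::AcqRel);
    }
    // Queue depth at scrape time: pushes that have not yet been popped.
    fn in_flight(&self) -> u64 {
        let push = self.inserts_queue_push.load(Ordering::Acquire);
        let pop = self.inserts_queue_pop.load(Ordering::Acquire);
        push.saturating_sub(pop)
    }
}
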
@@ -213,6 +215,8 @@ insert_scalar_impl!(insert_scalar_f64, f64, qu_insert_scalar_f64);
insert_scalar_impl!(insert_scalar_string, String, qu_insert_scalar_string);
insert_array_impl!(insert_array_i8, i8, qu_insert_array_i8);
insert_array_impl!(insert_array_i16, i16, qu_insert_array_i16);
insert_array_impl!(insert_array_i32, i32, qu_insert_array_i32);
insert_array_impl!(insert_array_f32, f32, qu_insert_array_f32);
insert_array_impl!(insert_array_f64, f64, qu_insert_array_f64);
@@ -298,14 +302,19 @@ pub struct CaConn {
FuturesOrdered<Pin<Box<dyn Future<Output = Result<(u32, u32, u16, u16, Existence<SeriesId>), Error>> + Send>>>,
value_insert_futs: FuturesOrdered<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
remote_addr_dbg: SocketAddrV4,
stats2: Arc<CaConnStats2>,
stats: Arc<CaConnStats>,
}
impl CaConn {
pub fn new(tcp: TcpStream, remote_addr_dbg: SocketAddrV4, data_store: Arc<DataStore>) -> Self {
pub fn new(
tcp: TcpStream,
remote_addr_dbg: SocketAddrV4,
data_store: Arc<DataStore>,
array_truncate: usize,
) -> Self {
Self {
state: CaConnState::Init,
proto: CaProto::new(tcp, remote_addr_dbg),
proto: CaProto::new(tcp, remote_addr_dbg, array_truncate),
cid_store: IdStore::new(),
ioid_store: IdStore::new(),
subid_store: IdStore::new(),
@@ -320,12 +329,12 @@ impl CaConn {
fut_get_series: FuturesOrdered::new(),
value_insert_futs: FuturesOrdered::new(),
remote_addr_dbg,
stats2: Arc::new(CaConnStats2::new()),
stats: Arc::new(CaConnStats::new()),
}
}
pub fn stats2(&self) -> Arc<CaConnStats2> {
self.stats2.clone()
pub fn stats(&self) -> Arc<CaConnStats> {
self.stats.clone()
}
pub fn channel_add(&mut self, channel: String) {
@@ -359,7 +368,9 @@ impl CaConn {
while self.value_insert_futs.len() > 0 {
match self.value_insert_futs.poll_next_unpin(cx) {
Pending => break,
_ => {}
_ => {
self.stats.inserts_queue_pop_inc();
}
}
}
Ok(())
@@ -490,7 +501,7 @@ impl CaConn {
ts_msp_changed,
scalar_type,
shape,
self.stats2.clone(),
self.stats.clone(),
);
match ev.value {
Scalar(v) => match v {
@@ -508,6 +519,8 @@ impl CaConn {
use crate::ca::proto::CaDataArrayValue::*;
match v {
I8(val) => match_array_value_insert!(I8, insert_array_i8, val, comm),
I16(val) => match_array_value_insert!(I16, insert_array_i16, val, comm),
I32(val) => match_array_value_insert!(I32, insert_array_i32, val, comm),
F32(val) => match_array_value_insert!(F32, insert_array_f32, val, comm),
F64(val) => match_array_value_insert!(F64, insert_array_f64, val, comm),
_ => {
@@ -662,7 +675,7 @@ impl CaConn {
let mut msgs_tmp = vec![];
self.check_channels_state_init(&mut msgs_tmp)?;
let ts2 = Instant::now();
self.stats2
self.stats
.time_check_channels_state_init
.fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::Release);
ts1 = ts2;
@@ -730,7 +743,7 @@ impl CaConn {
CaMsgTy::EventAddRes(k) => {
let res = Self::handle_event_add_res(self, k);
let ts2 = Instant::now();
self.stats2
self.stats
.time_handle_event_add_res
.fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::Release);
ts1 = ts2;
@@ -774,13 +787,13 @@ impl Stream for CaConn {
let ret = loop {
self.handle_insert_futs(cx)?;
let ts2 = Instant::now();
self.stats2
self.stats
.poll_time_handle_insert_futs
.fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel);
ts1 = ts2;
self.handle_get_series_futs(cx)?;
let ts2 = Instant::now();
self.stats2
self.stats
.poll_time_get_series_futs
.fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel);
ts1 = ts2;
@@ -805,7 +818,7 @@ impl Stream for CaConn {
CaConnState::Listen => match {
let res = self.handle_conn_listen(cx);
let ts2 = Instant::now();
self.stats2
self.stats
.time_handle_conn_listen
.fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel);
ts1 = ts2;
@@ -817,20 +830,19 @@ impl Stream for CaConn {
CaConnState::PeerReady => {
let res = self.handle_peer_ready(cx);
let ts2 = Instant::now();
self.stats2.time_handle_peer_ready_dur(ts2.duration_since(ts1));
self.stats.time_handle_peer_ready_dur(ts2.duration_since(ts1));
ts1 = ts2;
res
}
CaConnState::Done => Ready(None),
};
};
let nn = self.value_insert_futs.len() as u64;
if nn > 1000 {
warn!("insert_queue_len {nn}");
let nn = self.value_insert_futs.len();
if nn > INSERT_FUTS_LIM {
warn!("value_insert_futs len {nn}");
}
self.stats2.inserts_queue_len.store(nn, Ordering::Release);
let ts_outer_2 = Instant::now();
self.stats2.poll_time_all_dur(ts_outer_2.duration_since(ts_outer_1));
self.stats.poll_time_all_dur(ts_outer_2.duration_since(ts_outer_1));
ret
}
}
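
The timing counters touched above accumulate elapsed time as whole microseconds: multiplying a Duration by 1,000,000 scales microseconds up to seconds, so as_secs then returns the original duration in microseconds. A quick check of that conversion:

use std::time::Duration;

fn to_micros_like_above(d: Duration) -> u64 {
    // (d * 1_000_000).as_secs() == whole microseconds of d
    (d * 1_000_000).as_secs()
}

fn main() {
    let d = Duration::from_micros(2500);
    assert_eq!(to_micros_like_above(d), 2500);
    assert_eq!(d.as_micros() as u64, 2500);
}
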
@@ -1099,7 +1111,7 @@ impl FindIocStream {
error!("incomplete message, missing payload");
break;
}
let msg = CaMsg::from_proto_infos(&hi, nb.data())?;
let msg = CaMsg::from_proto_infos(&hi, nb.data(), 32)?;
nb.adv(hi.payload())?;
msgs.push(msg);
}

View File

@@ -2,7 +2,7 @@ use crate::netbuf::NetBuf;
use err::Error;
use futures_util::{pin_mut, Stream};
use log::*;
use std::collections::VecDeque;
use std::collections::{BTreeMap, VecDeque};
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -44,6 +44,11 @@ pub struct CreateChanRes {
pub sid: u32,
}
#[derive(Debug)]
pub struct CreateChanFail {
pub cid: u32,
}
#[derive(Debug)]
pub struct AccessRightsRes {
pub cid: u32,
@@ -108,6 +113,8 @@ pub enum CaDataScalarValue {
#[derive(Clone, Debug)]
pub enum CaDataArrayValue {
I8(Vec<i8>),
I16(Vec<i16>),
I32(Vec<i32>),
F32(Vec<f32>),
F64(Vec<f64>),
}
@@ -146,6 +153,7 @@ pub enum CaMsgTy {
SearchRes(SearchRes),
CreateChan(CreateChan),
CreateChanRes(CreateChanRes),
CreateChanFail(CreateChanFail),
AccessRightsRes(AccessRightsRes),
EventAdd(EventAdd),
EventAddRes(EventAddRes),
@@ -166,6 +174,7 @@ impl CaMsgTy {
SearchRes(_) => 0x06,
CreateChan(_) => 0x12,
CreateChanRes(_) => 0x12,
CreateChanFail(_) => 0x1a,
AccessRightsRes(_) => 0x16,
EventAdd(_) => 0x01,
EventAddRes(_) => 0x01,
@@ -190,6 +199,7 @@ impl CaMsgTy {
SearchRes(_) => 8,
CreateChan(x) => (x.channel.len() + 1 + 7) / 8 * 8,
CreateChanRes(_) => 0,
CreateChanFail(_) => 0,
AccessRightsRes(_) => 0,
EventAdd(_) => 16,
EventAddRes(_) => {
@@ -219,6 +229,7 @@ impl CaMsgTy {
SearchRes(x) => x.tcp_port,
CreateChan(_) => 0,
CreateChanRes(x) => x.data_type,
CreateChanFail(_) => 0,
AccessRightsRes(_) => 0,
EventAdd(x) => x.data_type,
EventAddRes(x) => x.data_type,
@@ -239,6 +250,7 @@ impl CaMsgTy {
SearchRes(_) => 0,
CreateChan(_) => 0,
CreateChanRes(x) => x.data_count,
CreateChanFail(_) => 0,
AccessRightsRes(_) => 0,
EventAdd(x) => x.data_count,
EventAddRes(x) => x.data_count,
@@ -259,6 +271,7 @@ impl CaMsgTy {
SearchRes(x) => x.addr,
CreateChan(x) => x.cid,
CreateChanRes(x) => x.cid,
CreateChanFail(x) => x.cid,
AccessRightsRes(x) => x.cid,
EventAdd(x) => x.sid,
EventAddRes(x) => x.status,
@@ -279,6 +292,7 @@ impl CaMsgTy {
SearchRes(x) => x.id,
CreateChan(_) => CA_PROTO_VERSION as _,
CreateChanRes(x) => x.sid,
CreateChanFail(_) => 0,
AccessRightsRes(x) => x.rights,
EventAdd(x) => x.subid,
EventAddRes(x) => x.subid,
@@ -337,10 +351,11 @@ impl CaMsgTy {
unsafe { std::ptr::copy(&d[0] as _, &mut buf[0] as _, d.len()) };
}
CreateChanRes(_) => {}
CreateChanFail(_) => {}
AccessRightsRes(_) => {}
EventAdd(_) => {
// TODO allow to customize the mask. Test if it works.
buf.copy_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0, 0]);
buf.copy_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x0e, 0, 0]);
}
EventAddRes(_) => {}
ReadNotify(_) => {}
@@ -391,7 +406,7 @@ impl CaMsg {
}
}
pub fn from_proto_infos(hi: &HeadInfo, payload: &[u8]) -> Result<Self, Error> {
pub fn from_proto_infos(hi: &HeadInfo, payload: &[u8], array_truncate: usize) -> Result<Self, Error> {
let msg = match hi.cmdid {
0 => CaMsg {
ty: CaMsgTy::VersionRes(hi.data_count),
@@ -446,12 +461,42 @@ impl CaMsg {
}),
}
}
26 => {
CaMsg {
// TODO use different structs for request and response:
ty: CaMsgTy::CreateChanFail(CreateChanFail { cid: hi.param1 }),
}
}
1 => {
use netpod::Shape;
let ca_st = CaScalarType::from_ca_u16(hi.data_type)?;
let ca_sh = Shape::from_ca_count(hi.data_count)?;
let value = match ca_sh {
Shape::Scalar => match ca_st {
CaScalarType::I8 => {
type ST = i8;
const STL: usize = std::mem::size_of::<ST>();
if payload.len() < STL {
return Err(Error::with_msg_no_trace(format!(
"not enough payload for i8 {}",
payload.len()
)));
}
let v = ST::from_be_bytes(payload[..STL].try_into()?);
CaDataValue::Scalar(CaDataScalarValue::I8(v))
}
CaScalarType::I16 => {
type ST = i16;
const STL: usize = std::mem::size_of::<ST>();
if payload.len() < STL {
return Err(Error::with_msg_no_trace(format!(
"not enough payload for i16 {}",
payload.len()
)));
}
let v = ST::from_be_bytes(payload[..STL].try_into()?);
CaDataValue::Scalar(CaDataScalarValue::I16(v))
}
CaScalarType::I32 => {
type ST = i32;
const STL: usize = std::mem::size_of::<ST>();
@@ -513,18 +558,12 @@ impl CaMsg {
let v = String::from_utf8_lossy(&payload[..ixn]);
CaDataValue::Scalar(CaDataScalarValue::String(v.into()))
}
_ => {
warn!("TODO handle {ca_st:?}");
return Err(Error::with_msg_no_trace(format!(
"can not yet handle type scalar {ca_st:?}"
)));
}
},
Shape::Wave(n) => match ca_st {
CaScalarType::I8 => {
type ST = i8;
const STL: usize = std::mem::size_of::<ST>();
let nn = (n as usize).min(payload.len() / STL);
let nn = (n as usize).min(payload.len() / STL).min(array_truncate);
let mut a = Vec::with_capacity(nn);
let mut bb = &payload[..];
for _ in 0..nn {
@@ -534,10 +573,36 @@ impl CaMsg {
}
CaDataValue::Array(CaDataArrayValue::I8(a))
}
CaScalarType::I16 => {
type ST = i16;
const STL: usize = std::mem::size_of::<ST>();
let nn = (n as usize).min(payload.len() / STL).min(array_truncate);
let mut a = Vec::with_capacity(nn);
let mut bb = &payload[..];
for _ in 0..nn {
let v = ST::from_be_bytes(bb[..STL].try_into()?);
bb = &bb[STL..];
a.push(v);
}
CaDataValue::Array(CaDataArrayValue::I16(a))
}
CaScalarType::I32 => {
type ST = i32;
const STL: usize = std::mem::size_of::<ST>();
let nn = (n as usize).min(payload.len() / STL).min(array_truncate);
let mut a = Vec::with_capacity(nn);
let mut bb = &payload[..];
for _ in 0..nn {
let v = ST::from_be_bytes(bb[..STL].try_into()?);
bb = &bb[STL..];
a.push(v);
}
CaDataValue::Array(CaDataArrayValue::I32(a))
}
CaScalarType::F32 => {
type ST = f32;
const STL: usize = std::mem::size_of::<ST>();
let nn = (n as usize).min(payload.len() / STL);
let nn = (n as usize).min(payload.len() / STL).min(array_truncate);
let mut a = Vec::with_capacity(nn);
let mut bb = &payload[..];
for _ in 0..nn {
@@ -550,7 +615,7 @@ impl CaMsg {
CaScalarType::F64 => {
type ST = f64;
const STL: usize = std::mem::size_of::<ST>();
let nn = (n as usize).min(payload.len() / STL);
let nn = (n as usize).min(payload.len() / STL).min(array_truncate);
let mut a = Vec::with_capacity(nn);
let mut bb = &payload[..];
for _ in 0..nn {
@@ -561,7 +626,7 @@ impl CaMsg {
CaDataValue::Array(CaDataArrayValue::F64(a))
}
_ => {
warn!("TODO handle {ca_st:?}");
warn!("TODO handle array {ca_st:?}");
return Err(Error::with_msg_no_trace(format!(
"can not yet handle type array {ca_st:?}"
)));
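
For waveforms, the decoded element count is now the minimum of the advertised data_count, the number of elements the payload actually holds, and the configured array_truncate (512 by default). A small check of that arithmetic, mirroring the f32 branch above:

// Element-count calculation used by the Shape::Wave branches above,
// shown for f32 (4 bytes per element).
fn truncated_len(data_count: usize, payload_len: usize, array_truncate: usize) -> usize {
    const STL: usize = std::mem::size_of::<f32>();
    data_count.min(payload_len / STL).min(array_truncate)
}

fn main() {
    // 4096-element waveform, full payload present, default cap of 512:
    assert_eq!(truncated_len(4096, 4096 * 4, 512), 512);
    // Short payload wins over both the count and the cap:
    assert_eq!(truncated_len(4096, 100 * 4, 512), 100);
}
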
@@ -687,10 +752,12 @@ pub struct CaProto {
buf: NetBuf,
outbuf: NetBuf,
out: VecDeque<CaMsg>,
array_truncate: usize,
logged_proto_error_for_cid: BTreeMap<u32, bool>,
}
impl CaProto {
pub fn new(tcp: TcpStream, remote_addr_dbg: SocketAddrV4) -> Self {
pub fn new(tcp: TcpStream, remote_addr_dbg: SocketAddrV4, array_truncate: usize) -> Self {
Self {
tcp,
remote_addr_dbg,
@@ -698,6 +765,8 @@ impl CaProto {
buf: NetBuf::new(1024 * 128),
outbuf: NetBuf::new(1024 * 128),
out: VecDeque::new(),
array_truncate,
logged_proto_error_for_cid: BTreeMap::new(),
}
}
@@ -855,13 +924,22 @@ impl CaProto {
break match &self.state {
CaState::StdHead => {
let hi = HeadInfo::from_netbuf(&mut self.buf)?;
if hi.cmdid == 6
|| hi.cmdid > 26
|| hi.data_type > 10
|| hi.data_count > 4096
|| hi.payload_size > 1024 * 32
{
warn!("StdHead sees {hi:?}");
if hi.cmdid == 1 || hi.cmdid == 15 {
let sid = hi.param1;
if hi.payload_size == 0xffff && hi.data_count == 0 {
} else if hi.payload_size > 16368 {
if !self.logged_proto_error_for_cid.contains_key(&sid) {

// TODO emit this as Item so that downstream can translate SID to name.
warn!(
"Protocol error payload_size 0x{:04x} data_count 0x{:04x} hi {:?}",
hi.payload_size, hi.data_count, hi
);
self.logged_proto_error_for_cid.insert(sid, true);
}
}
}
if hi.cmdid > 26 {
warn!("Enexpected cmdid {hi:?}");
}
if hi.payload_size == 0xffff && hi.data_count == 0 {
self.state = CaState::ExtHead(hi);
@@ -869,7 +947,7 @@ impl CaProto {
} else {
if hi.payload_size == 0 {
self.state = CaState::StdHead;
let msg = CaMsg::from_proto_infos(&hi, &[])?;
let msg = CaMsg::from_proto_infos(&hi, &[], self.array_truncate)?;
Ok(Some(CaItem::Msg(msg)))
} else {
self.state = CaState::Payload(hi);
@@ -880,9 +958,18 @@ impl CaProto {
CaState::ExtHead(hi) => {
let payload_size = self.buf.read_u32_be()?;
let data_count = self.buf.read_u32_be()?;
warn!("ExtHead payload_size {payload_size} data_count {data_count}");
if payload_size == 0 {
let msg = CaMsg::from_proto_infos(hi, &[])?;
if payload_size > 1024 * 256 {
warn!(
"ExtHead data_type {} payload_size {payload_size} data_count {data_count}",
hi.data_type
);
}
if payload_size <= 16368 {
warn!(
"ExtHead data_type {} payload_size {payload_size} data_count {data_count}",
hi.data_type
);
let msg = CaMsg::from_proto_infos(hi, &[], self.array_truncate)?;
self.state = CaState::StdHead;
Ok(Some(CaItem::Msg(msg)))
} else {
@@ -892,7 +979,7 @@ impl CaProto {
}
CaState::Payload(hi) => {
let g = self.buf.read_bytes(hi.payload_size as _)?;
let msg = CaMsg::from_proto_infos(hi, g)?;
let msg = CaMsg::from_proto_infos(hi, g, self.array_truncate)?;
self.state = CaState::StdHead;
Ok(Some(CaItem::Msg(msg)))
}
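
The header state machine keeps its existing shape: a standard header whose payload_size is 0xffff with data_count 0 announces an extended header carrying the real 32-bit sizes, a zero payload yields a message immediately, and anything else switches to the Payload state before parsing. A condensed sketch of that decision, with a reduced Hdr standing in for HeadInfo:

// Condensed model of the StdHead decision above; `Hdr` stands in for HeadInfo.
struct Hdr {
    payload_size: u32,
    data_count: u32,
}

enum Next {
    ExtHead,            // read 2 x u32 (payload_size, data_count) next
    MsgWithoutPayload,  // parse the message from an empty payload
    Payload,            // read payload_size bytes, then parse the message
}

fn after_std_head(hi: &Hdr) -> Next {
    if hi.payload_size == 0xffff && hi.data_count == 0 {
        Next::ExtHead
    } else if hi.payload_size == 0 {
        Next::MsgWithoutPayload
    } else {
        Next::Payload
    }
}
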

View File

@@ -1,5 +1,6 @@
use crate::bsread::ChannelDescDecoded;
use crate::series::{Existence, SeriesId};
use crate::store::{CommonInsertQueue, CommonInsertQueueSender};
use async_channel::{Receiver, Sender};
use err::Error;
use scylla::prepared_statement::PreparedStatement;
@@ -48,13 +49,20 @@ pub struct DataStore {
pub qu_insert_scalar_f64: Arc<PreparedStatement>,
pub qu_insert_scalar_string: Arc<PreparedStatement>,
pub qu_insert_array_i8: Arc<PreparedStatement>,
pub qu_insert_array_i16: Arc<PreparedStatement>,
pub qu_insert_array_i32: Arc<PreparedStatement>,
pub qu_insert_array_f32: Arc<PreparedStatement>,
pub qu_insert_array_f64: Arc<PreparedStatement>,
pub chan_reg: Arc<ChannelRegistry>,
pub ciqs: CommonInsertQueueSender,
}
impl DataStore {
pub async fn new(pg_client: Arc<PgClient>, scy: Arc<ScySession>) -> Result<Self, Error> {
pub async fn new(
pg_client: Arc<PgClient>,
scy: Arc<ScySession>,
ciqs: CommonInsertQueueSender,
) -> Result<Self, Error> {
let q = scy
.prepare("insert into series (part, series, ts_msp, scalar_type, shape_dims) values (?, ?, ?, ?, ?)")
.await
@@ -101,6 +109,16 @@ impl DataStore {
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_array_i8 = Arc::new(q);
let q = scy
.prepare("insert into events_array_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_array_i16 = Arc::new(q);
let q = scy
.prepare("insert into events_array_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let qu_insert_array_i32 = Arc::new(q);
let q = scy
.prepare("insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
.await
@@ -123,8 +141,11 @@ impl DataStore {
qu_insert_scalar_f64,
qu_insert_scalar_string,
qu_insert_array_i8,
qu_insert_array_i16,
qu_insert_array_i32,
qu_insert_array_f32,
qu_insert_array_f64,
ciqs,
};
Ok(ret)
}
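
Each new array type gets its own prepared insert with the same column list (series, ts_msp, ts_lsp, pulse, value). A hedged sketch of how one of these statements could be executed with the scylla driver used here; the helper itself and the concrete bound types (i64 keys, Vec<i32> values) are assumptions, not taken from the diff:

use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::QueryError;
use scylla::Session;
use std::sync::Arc;

// Hypothetical helper: bind and run the i32-array insert prepared above.
// Column order follows the statement text: (series, ts_msp, ts_lsp, pulse, value).
async fn insert_array_i32_row(
    scy: &Session,
    qu_insert_array_i32: &Arc<PreparedStatement>,
    series: i64,
    ts_msp: i64,
    ts_lsp: i64,
    pulse: i64,
    value: Vec<i32>,
) -> Result<(), QueryError> {
    scy.execute(qu_insert_array_i32, (series, ts_msp, ts_lsp, pulse, value))
        .await?;
    Ok(())
}
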

View File

@@ -142,11 +142,14 @@ pub async fn get_series_id(pg_client: &PgClient, cd: &ChannelDescDecoded) -> Res
let series = Existence::Created(SeriesId(series));
return Ok(series);
} else {
error!("tried to insert {series:?} for {channel_name} but it exists");
warn!(
"tried to insert {series:?} for {facility} {channel_name} {scalar_type:?} {shape:?} trying again..."
);
}
tokio::time::sleep(Duration::from_millis(20)).await;
series += 1;
}
error!("tried to insert {series:?} for {facility} {channel_name} {scalar_type:?} {shape:?} but it failed");
Err(Error::with_msg_no_trace(format!("get_series_id can not create and insert series id {facility:?} {channel_name:?} {scalar_type:?} {shape:?}")))
} else {
let series = all[0] as u64;

View File

@@ -1,16 +1,21 @@
use crate::errconv::ErrConv;
use err::Error;
use futures_util::{Future, FutureExt};
use futures_util::stream::FuturesOrdered;
use futures_util::{Future, FutureExt, Stream, StreamExt};
use log::*;
use scylla::frame::value::ValueList;
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::QueryError;
use scylla::{QueryResult, Session as ScySession};
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
const CHANNEL_CAP: usize = 128;
const POLLING_CAP: usize = 32;
pub struct ScyInsertFut {
#[allow(unused)]
scy: Arc<ScySession>,
@@ -78,3 +83,82 @@ impl Future for ScyInsertFut {
}
}
}
type FutTy = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
pub struct CommonInsertQueueSender {
sender: async_channel::Sender<FutTy>,
}
impl CommonInsertQueueSender {
pub async fn send(&self, k: FutTy) -> Result<(), Error> {
self.sender
.send(k)
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))
}
}
pub struct CommonInsertQueue {
sender: async_channel::Sender<FutTy>,
recv: async_channel::Receiver<FutTy>,
futs: FuturesOrdered<FutTy>,
inp_done: bool,
}
impl CommonInsertQueue {
pub fn new() -> Self {
let (tx, rx) = async_channel::bounded(CHANNEL_CAP);
Self {
sender: tx.clone(),
recv: rx,
futs: FuturesOrdered::new(),
inp_done: false,
}
}
pub fn sender(&self) -> CommonInsertQueueSender {
CommonInsertQueueSender {
sender: self.sender.clone(),
}
}
}
impl Stream for CommonInsertQueue {
type Item = Result<(), Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
let _res_inp = if self.futs.len() < POLLING_CAP && !self.inp_done {
match self.recv.poll_next_unpin(cx) {
Ready(Some(k)) => {
self.futs.push(k);
continue;
}
Ready(None) => {
self.inp_done = true;
Ready(None)
}
Pending => Pending,
}
} else {
Ready(Some(()))
};
let res_qu = match self.futs.poll_next_unpin(cx) {
Ready(Some(Ok(_k))) => Ready(Some(Ok(()))),
Ready(Some(Err(e))) => Ready(Some(Err(e))),
Ready(None) => {
if self.inp_done {
Ready(None)
} else {
Pending
}
}
Pending => Pending,
};
// TODO monitor queue length and queue pushes per poll of this.
break res_qu;
}
}
}
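
CommonInsertQueue couples a bounded async_channel of boxed insert futures with a FuturesOrdered drained through its Stream impl, so something must poll it for queued inserts to make progress; this commit only wires the sender into DataStore. An illustrative sketch of how the consuming side could be driven; the spawned task and the placeholder future are not part of this commit:

use crate::store::{CommonInsertQueue, CommonInsertQueueSender};
use err::Error;
use futures_util::StreamExt;

// Illustrative wiring: build the queue, hand the sender to producers
// (DataStore receives one above), and keep polling the Stream so that
// queued insert futures actually run. Note that CommonInsertQueue keeps
// its own Sender clone, so the stream does not end merely because
// external senders are dropped.
fn spawn_common_insert_queue() -> CommonInsertQueueSender {
    let mut ciq = CommonInsertQueue::new();
    let ciqs = ciq.sender();
    tokio::spawn(async move {
        while let Some(res) = ciq.next().await {
            if let Err(e) = res {
                log::error!("queued insert failed: {e:?}");
            }
        }
    });
    ciqs
}

// Producer side, e.g. from an insert path; the queued future is a placeholder.
async fn submit_noop(ciqs: &CommonInsertQueueSender) -> Result<(), Error> {
    ciqs.send(Box::pin(async { Ok::<(), Error>(()) })).await
}
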

View File

@@ -113,12 +113,13 @@ impl IntervalEma {
stats_proc::stats_struct!((
stats_struct(
name(CaConnStats2),
name(CaConnStats),
counters(
inserts_val,
inserts_msp,
inserts_discard,
inserts_queue_len,
inserts_queue_push,
inserts_queue_pop,
poll_time_all,
poll_time_handle_insert_futs,
poll_time_get_series_futs,
@@ -128,6 +129,6 @@ stats_proc::stats_struct!((
time_handle_event_add_res,
),
),
agg(name(CaConnStats2Agg), parent(CaConnStats2)),
diff(name(CaConnStats2AggDiff), input(CaConnStats2Agg)),
agg(name(CaConnStatsAgg), parent(CaConnStats)),
diff(name(CaConnStatsAggDiff), input(CaConnStatsAgg)),
));

View File

@@ -185,7 +185,7 @@ impl {name} {{
for x in &st.counters {
let n = x.to_string();
buf.push_str(&format!(
"ret.push_str(&format!(\"{} {{}}\\n\", self.{}.load(Ordering::Acquire)));\n",
"ret.push_str(&format!(\"daqingest_{} {{}}\\n\", self.{}.load(Ordering::Acquire)));\n",
n, n
));
}
@@ -193,7 +193,7 @@ impl {name} {{
"
pub fn prometheus(&self) -> String {{
let mut ret = String::new();
ret.push_str(&format!(\"aggcount {{}}\\n\", self.aggcount.load(Ordering::Acquire)));
ret.push_str(&format!(\"daqingest_aggcount {{}}\\n\", self.aggcount.load(Ordering::Acquire)));
{buf}
ret
}}
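
For a counter list like CaConnStats, the generator above now emits scrape lines prefixed with daqingest_. An approximate, hand-written expansion of the resulting prometheus() method (the exact generated code is an assumption; only a few counters are shown):

use std::sync::atomic::{AtomicU64, Ordering};

// Approximate shape of the prometheus() method emitted by stats_proc
// for CaConnStatsAgg; field set reduced for illustration.
pub struct CaConnStatsAggSketch {
    aggcount: AtomicU64,
    inserts_val: AtomicU64,
    inserts_queue_push: AtomicU64,
}

impl CaConnStatsAggSketch {
    pub fn prometheus(&self) -> String {
        let mut ret = String::new();
        ret.push_str(&format!("daqingest_aggcount {}\n", self.aggcount.load(Ordering::Acquire)));
        ret.push_str(&format!("daqingest_inserts_val {}\n", self.inserts_val.load(Ordering::Acquire)));
        ret.push_str(&format!("daqingest_inserts_queue_push {}\n", self.inserts_queue_push.load(Ordering::Acquire)));
        ret
    }
}
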