Count inserts

This commit is contained in:
Dominik Werder
2022-05-13 16:31:23 +02:00
parent eee67b916f
commit ae197e2ef2
7 changed files with 327 additions and 53 deletions

View File

@@ -11,7 +11,7 @@ use log::*;
use netpod::Database;
use scylla::batch::Consistency;
use serde::{Deserialize, Serialize};
use stats::CaConnVecStats;
use stats::{CaConnStats2, CaConnVecStats};
use std::collections::BTreeMap;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::PathBuf;
@@ -232,7 +232,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
tokio::spawn(pg_conn);
let pg_client = Arc::new(pg_client);
let scy = scylla::SessionBuilder::new()
.known_node("sf-nube-11:19042")
.known_node("sf-nube-14:19042")
.default_consistency(Consistency::One)
.use_keyspace("ks1", true)
.build()
@@ -241,7 +241,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
let scy = Arc::new(scy);
info!("FIND IOCS");
let qu_find_addr = pg_client
.prepare("select addr from ioc_by_channel where facility = $1 and channel = $2")
.prepare("select t2.addr from ioc_by_channel t1, ioc_by_channel t2 where t2.facility = t1.facility and t2.channel = t1.channel and t1.facility = $1 and t1.channel = $2")
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let mut channels_by_host = BTreeMap::new();
@@ -264,7 +264,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
continue;
}
};
if ix % 100 == 0 {
if ix % 200 == 0 {
info!("{} {} {:?}", ix, ch, addr);
}
if !channels_by_host.contains_key(&addr) {
@@ -281,6 +281,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
let data_store = Arc::new(DataStore::new(pg_client, scy.clone()).await?);
let mut conn_jhs = vec![];
let mut conn_stats_all = vec![];
let mut conn_stats2 = vec![];
for (host, channels) in channels_by_host {
if false && host.ip() != &"172.26.24.76".parse::<Ipv4Addr>().unwrap() {
continue;
@@ -291,6 +292,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
let tcp = TcpStream::connect(addr).await?;
let mut conn = CaConn::new(tcp, addr, data_store.clone());
conn_stats_all.push(conn.stats());
conn_stats2.push(conn.stats2());
for c in channels {
conn.channel_add(c);
}
@@ -313,6 +315,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
conn_jhs.push(jh);
}
let mut agg_last = CaConnVecStats::new(Instant::now());
let mut agg2_last = CaConnStats2Agg::new();
loop {
tokio::time::sleep(Duration::from_millis(2000)).await;
let mut agg = CaConnVecStats::new(Instant::now());

View File

@@ -5,13 +5,13 @@ use crate::ca::proto::{CreateChan, EventAdd, HeadInfo, ReadNotify};
use crate::series::{Existence, SeriesId};
use crate::store::ScyInsertFut;
use err::Error;
use futures_util::stream::FuturesUnordered;
use futures_util::stream::{FuturesOrdered, FuturesUnordered};
use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt};
use libc::c_int;
use log::*;
use netpod::timeunits::SEC;
use netpod::{ScalarType, Shape};
use stats::CaConnStats;
use stats::{CaConnStats, CaConnStats2, IntervalEma};
use std::collections::{BTreeMap, VecDeque};
use std::net::{Ipv4Addr, SocketAddrV4};
use std::pin::Pin;
@@ -23,7 +23,8 @@ use tokio::io::unix::AsyncFd;
use tokio::net::TcpStream;
const INSERT_FUTS_MAX: usize = 200;
const TABLE_SERIES_MOD: u32 = 2;
const INSERT_FUTS_LIM: usize = 80000;
const TABLE_SERIES_MOD: u32 = 128;
#[derive(Debug)]
enum ChannelError {
@@ -56,6 +57,7 @@ struct CreatedState {
state: MonitoringState,
ts_msp_last: u64,
inserted_in_ts_msp: u64,
ivl_ema: IntervalEma,
}
#[derive(Debug)]
@@ -95,7 +97,7 @@ macro_rules! insert_scalar_impl {
fn $fname(
data_store: Arc<DataStore>,
// TODO maybe use a newtype?
futs_queue: &mut FuturesUnordered<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
futs_queue: &mut FuturesOrdered<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
series: SeriesId,
ts_msp: u64,
ts_lsp: u64,
@@ -103,8 +105,12 @@ macro_rules! insert_scalar_impl {
ts_msp_changed: bool,
st: ScalarType,
sh: Shape,
inserts_discarded: &AtomicU64,
stats2: Arc<CaConnStats2>,
) {
if futs_queue.len() >= INSERT_FUTS_LIM {
stats2.inserts_discard.fetch_add(1, Ordering::Release);
return;
}
let pulse = 0 as u64;
let params = (
series.id() as i64,
@@ -114,6 +120,7 @@ macro_rules! insert_scalar_impl {
val,
);
let fut3 = ScyInsertFut::new(data_store.scy.clone(), data_store.$qu_insert.clone(), params);
stats2.inserts_val.fetch_add(1, Ordering::Release);
let fut = if ts_msp_changed {
let fut1 = ScyInsertFut::new(
data_store.scy.clone(),
@@ -131,15 +138,12 @@ macro_rules! insert_scalar_impl {
data_store.qu_insert_ts_msp.clone(),
(series.id() as i64, ts_msp as i64),
);
stats2.inserts_msp.fetch_add(1, Ordering::Release);
Box::pin(fut1.and_then(move |_| fut2).and_then(move |_| fut3)) as _
} else {
Box::pin(fut3) as _
};
if futs_queue.len() >= INSERT_FUTS_MAX {
inserts_discarded.fetch_add(1, Ordering::Release);
} else {
futs_queue.push(fut);
}
futs_queue.push(fut);
}
};
}
@@ -150,7 +154,7 @@ macro_rules! insert_array_impl {
fn $fname(
data_store: Arc<DataStore>,
// TODO maybe use a newtype?
futs_queue: &mut FuturesUnordered<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
futs_queue: &mut FuturesOrdered<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
series: SeriesId,
ts_msp: u64,
ts_lsp: u64,
@@ -158,8 +162,12 @@ macro_rules! insert_array_impl {
ts_msp_changed: bool,
st: ScalarType,
sh: Shape,
inserts_discarded: &AtomicU64,
stats2: Arc<CaConnStats2>,
) {
if futs_queue.len() >= INSERT_FUTS_LIM {
stats2.inserts_discard.fetch_add(1, Ordering::Release);
return;
}
let pulse = 0 as u64;
let params = (
series.id() as i64,
@@ -169,6 +177,7 @@ macro_rules! insert_array_impl {
val,
);
let fut3 = ScyInsertFut::new(data_store.scy.clone(), data_store.$qu_insert.clone(), params);
stats2.inserts_val.fetch_add(1, Ordering::Release);
let fut = if ts_msp_changed {
let fut1 = ScyInsertFut::new(
data_store.scy.clone(),
@@ -186,15 +195,12 @@ macro_rules! insert_array_impl {
data_store.qu_insert_ts_msp.clone(),
(series.id() as i64, ts_msp as i64),
);
stats2.inserts_msp.fetch_add(1, Ordering::Release);
Box::pin(fut1.and_then(move |_| fut2).and_then(move |_| fut3)) as _
} else {
Box::pin(fut3) as _
};
if futs_queue.len() >= INSERT_FUTS_MAX {
inserts_discarded.fetch_add(1, Ordering::Release);
} else {
futs_queue.push(fut);
}
futs_queue.push(fut);
}
};
}
@@ -212,8 +218,7 @@ insert_array_impl!(insert_array_f64, f64, qu_insert_array_f64);
macro_rules! match_scalar_value_insert {
($stv:ident, $insf:ident, $val:expr, $comm:expr) => {{
let (data_store, futs_queue, series, ts_msp, ts_lsp, ts_msp_changed, scalar_type, shape, inserts_discarded) =
$comm;
let (data_store, futs_queue, series, ts_msp, ts_lsp, ts_msp_changed, scalar_type, shape, stats2) = $comm;
match shape {
Shape::Scalar => match scalar_type {
ScalarType::$stv => $insf(
@@ -226,7 +231,7 @@ macro_rules! match_scalar_value_insert {
ts_msp_changed,
scalar_type,
shape,
inserts_discarded,
stats2,
),
_ => {
error!("unexpected value type insf {:?}", stringify!($insf));
@@ -240,13 +245,12 @@ macro_rules! match_scalar_value_insert {
);
}
}
};};
}};
}
macro_rules! match_array_value_insert {
($stv:ident, $insf:ident, $val:expr, $comm:expr) => {{
let (data_store, futs_queue, series, ts_msp, ts_lsp, ts_msp_changed, scalar_type, shape, inserts_discarded) =
$comm;
let (data_store, futs_queue, series, ts_msp, ts_lsp, ts_msp_changed, scalar_type, shape, stats2) = $comm;
match shape {
Shape::Wave(_) => match scalar_type {
ScalarType::$stv => $insf(
@@ -259,7 +263,7 @@ macro_rules! match_array_value_insert {
ts_msp_changed,
scalar_type,
shape,
inserts_discarded,
stats2,
),
_ => {
error!("unexpected value type insf {:?}", stringify!($insf));
@@ -273,7 +277,7 @@ macro_rules! match_array_value_insert {
);
}
}
};};
}};
}
pub struct CaConn {
@@ -290,12 +294,12 @@ pub struct CaConn {
name_by_cid: BTreeMap<u32, String>,
poll_count: usize,
data_store: Arc<DataStore>,
fut_get_series: FuturesUnordered<
Pin<Box<dyn Future<Output = Result<(u32, u32, u16, u16, Existence<SeriesId>), Error>> + Send>>,
>,
value_insert_futs: FuturesUnordered<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
fut_get_series:
FuturesOrdered<Pin<Box<dyn Future<Output = Result<(u32, u32, u16, u16, Existence<SeriesId>), Error>> + Send>>>,
value_insert_futs: FuturesOrdered<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
remote_addr_dbg: SocketAddrV4,
stats: Arc<CaConnStats>,
stats2: Arc<CaConnStats2>,
}
impl CaConn {
@@ -314,10 +318,11 @@ impl CaConn {
name_by_cid: BTreeMap::new(),
poll_count: 0,
data_store,
fut_get_series: FuturesUnordered::new(),
value_insert_futs: FuturesUnordered::new(),
fut_get_series: FuturesOrdered::new(),
value_insert_futs: FuturesOrdered::new(),
remote_addr_dbg,
stats: Arc::new(CaConnStats::new()),
stats2: Arc::new(CaConnStats2::new()),
}
}
@@ -325,6 +330,10 @@ impl CaConn {
self.stats.clone()
}
pub fn stats2(&self) -> Arc<CaConnStats2> {
self.stats2.clone()
}
pub fn channel_add(&mut self, channel: String) {
let cid = self.cid_by_name(&channel);
if self.channels.contains_key(&cid) {
@@ -404,6 +413,7 @@ impl CaConn {
state: MonitoringState::AddingEvent(series),
ts_msp_last: 0,
inserted_in_ts_msp: u64::MAX,
ivl_ema: IntervalEma::new(),
});
let scalar_type = ScalarType::from_ca_id(data_type)?;
let shape = Shape::from_ca_count(data_count)?;
@@ -442,7 +452,7 @@ impl CaConn {
let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
// TODO decide on better msp/lsp: random offset!
// As long as one writer is active, the msp is arbitrary.
let ts = epoch.as_secs() * 1000000000 + epoch.subsec_nanos() as u64;
let ts = epoch.as_secs() * SEC + epoch.subsec_nanos() as u64;
let ts_msp = if inserted_in_ts_msp > 2000 {
let ts_msp = ts / (60 * SEC) * (60 * SEC);
if let ChannelState::Created(st) = self.channels.get_mut(&cid).unwrap() {
@@ -486,7 +496,7 @@ impl CaConn {
ts_msp_changed,
scalar_type,
shape,
&self.stats.inserts_discarded,
self.stats2.clone(),
);
match ev.value {
Scalar(v) => match v {
@@ -702,6 +712,7 @@ impl CaConn {
state: MonitoringState::FetchSeriesId,
ts_msp_last: 0,
inserted_in_ts_msp: u64::MAX,
ivl_ema: IntervalEma::new(),
});
// TODO handle error in different way. Should most likely not abort.
let cd = ChannelDescDecoded {
@@ -764,22 +775,27 @@ impl Stream for CaConn {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let ts1 = Instant::now();
let ts_outer_1 = Instant::now();
let mut ts1 = ts_outer_1;
self.poll_count += 1;
let ret = loop {
let ts1 = Instant::now();
self.handle_insert_futs(cx)?;
let ts2 = Instant::now();
self.stats
.poll_time_handle_insert_futs
.fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel);
let ts1 = ts2;
ts1 = ts2;
self.handle_get_series_futs(cx)?;
let ts2 = Instant::now();
self.stats
.poll_time_get_series_futs
.fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel);
let mut ts1 = ts2;
ts1 = ts2;
if self.value_insert_futs.len() >= INSERT_FUTS_MAX {
// TODO do not do more.
// But: can I assume that in this case we triggered a Pending?
break Pending;
}
break match &self.state {
CaConnState::Init => {
let msg = CaMsg { ty: CaMsgTy::Version };
@@ -817,10 +833,16 @@ impl Stream for CaConn {
CaConnState::Done => Ready(None),
};
};
let ts2 = Instant::now();
self.stats
.poll_time_all
.fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel);
let nn = self.value_insert_futs.len() as u64;
if nn > 1000 {
warn!("insert_queue_len {nn}");
}
self.stats.insert_queue_len.store(nn, Ordering::Release);
let ts_outer_2 = Instant::now();
self.stats.poll_time_all.fetch_add(
(ts_outer_2.duration_since(ts_outer_1) * 1000000).as_secs(),
Ordering::AcqRel,
);
ret
}
}

View File

@@ -294,7 +294,7 @@ impl CaMsgTy {
VersionRes(_) => {}
ClientName => {
// TODO allow variable client name.
let s = "werder_d".as_bytes();
let s = "daqingest".as_bytes();
let n = s.len();
buf.fill(0);
buf[..n].copy_from_slice(s);
@@ -718,7 +718,6 @@ impl CaProto {
if let Ok(buf) = self.outbuf.write_buf(item.len()) {
Some((item, buf))
} else {
error!("output buffer too small for message");
None
}
} else {
@@ -755,8 +754,13 @@ impl CaProto {
break None;
}
while let Some((msg, buf)) = self.out_msg_buf() {
msg.place_into(buf);
self.out.pop_front();
if msg.len() > buf.len() {
error!("got output buffer but too small");
break;
} else {
msg.place_into(buf);
self.out.pop_front();
}
}
while self.outbuf.len() > 0 {
match Self::attempt_output(self.as_mut(), cx)? {
@@ -851,7 +855,12 @@ impl CaProto {
break match &self.state {
CaState::StdHead => {
let hi = HeadInfo::from_netbuf(&mut self.buf)?;
if hi.cmdid == 6 || hi.cmdid > 26 || hi.data_type > 10 || hi.payload_size > 2800 {
if hi.cmdid == 6
|| hi.cmdid > 26
|| hi.data_type > 10
|| hi.data_count > 4096
|| hi.payload_size > 1024 * 32
{
warn!("StdHead sees {hi:?}");
}
if hi.payload_size == 0xffff && hi.data_count == 0 {