Differentiate error counts, save periodic channel info
This commit is contained in:
@@ -8,7 +8,7 @@ use self::store::DataStore;
|
||||
use crate::ca::conn::ConnCommand;
|
||||
use crate::errconv::ErrConv;
|
||||
use crate::linuxhelper::local_hostname;
|
||||
use crate::store::{CommonInsertItemQueue, CommonInsertItemQueueSender, QueryItem};
|
||||
use crate::store::{CommonInsertItemQueue, CommonInsertItemQueueSender, IntoSimplerError, QueryItem};
|
||||
use async_channel::Sender;
|
||||
use conn::CaConn;
|
||||
use err::Error;
|
||||
@@ -169,6 +169,37 @@ impl ExtraInsertsConf {
|
||||
}
|
||||
}
|
||||
|
||||
fn stats_inc_for_err(stats: &stats::CaConnStats, err: &crate::store::Error) {
|
||||
use crate::store::Error;
|
||||
match err {
|
||||
Error::DbOverload => {
|
||||
stats.store_worker_insert_overload_inc();
|
||||
}
|
||||
Error::DbTimeout => {
|
||||
stats.store_worker_insert_timeout_inc();
|
||||
}
|
||||
Error::DbUnavailable => {
|
||||
stats.store_worker_insert_unavailable_inc();
|
||||
}
|
||||
Error::DbError(_) => {
|
||||
stats.store_worker_insert_error_inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn back_off_next(backoff_dt: &mut Duration) {
|
||||
*backoff_dt = *backoff_dt + (*backoff_dt) * 3 / 2;
|
||||
let dtmax = Duration::from_millis(4000);
|
||||
if *backoff_dt > dtmax {
|
||||
*backoff_dt = dtmax;
|
||||
}
|
||||
}
|
||||
|
||||
async fn back_off_sleep(backoff_dt: &mut Duration) {
|
||||
back_off_next(backoff_dt);
|
||||
tokio::time::sleep(*backoff_dt).await;
|
||||
}
|
||||
|
||||
async fn spawn_scylla_insert_workers(
|
||||
scyconf: ScyllaConfig,
|
||||
insert_scylla_sessions: usize,
|
||||
@@ -190,63 +221,47 @@ async fn spawn_scylla_insert_workers(
|
||||
let recv = insert_item_queue.receiver();
|
||||
let insert_frac = insert_frac.clone();
|
||||
let fut = async move {
|
||||
let backoff_0 = Duration::from_millis(10);
|
||||
let mut backoff = backoff_0.clone();
|
||||
let mut i1 = 0;
|
||||
while let Ok(item) = recv.recv().await {
|
||||
stats.store_worker_item_recv_inc();
|
||||
match item {
|
||||
QueryItem::ConnectionStatus(item) => {
|
||||
match crate::store::insert_connection_status(item, &data_store, &stats).await {
|
||||
Ok(_) => {
|
||||
stats.store_worker_item_insert_inc();
|
||||
stats.store_worker_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
stats.store_worker_item_error_inc();
|
||||
// TODO introduce more structured error variants.
|
||||
if e.msg().contains("WriteTimeout") {
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
} else {
|
||||
// TODO back off but continue.
|
||||
error!("insert worker sees error: {e:?}");
|
||||
break;
|
||||
}
|
||||
stats_inc_for_err(&stats, &e);
|
||||
back_off_sleep(&mut backoff).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
QueryItem::ChannelStatus(item) => {
|
||||
match crate::store::insert_channel_status(item, &data_store, &stats).await {
|
||||
Ok(_) => {
|
||||
stats.store_worker_item_insert_inc();
|
||||
stats.store_worker_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
stats.store_worker_item_error_inc();
|
||||
// TODO introduce more structured error variants.
|
||||
if e.msg().contains("WriteTimeout") {
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
} else {
|
||||
// TODO back off but continue.
|
||||
error!("insert worker sees error: {e:?}");
|
||||
break;
|
||||
}
|
||||
stats_inc_for_err(&stats, &e);
|
||||
back_off_sleep(&mut backoff).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
QueryItem::Insert(item) => {
|
||||
stats.store_worker_item_recv_inc();
|
||||
let insert_frac = insert_frac.load(Ordering::Acquire);
|
||||
if i1 % 1000 < insert_frac {
|
||||
match crate::store::insert_item(item, &data_store, &stats).await {
|
||||
Ok(_) => {
|
||||
stats.store_worker_item_insert_inc();
|
||||
stats.store_worker_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
stats.store_worker_item_error_inc();
|
||||
// TODO introduce more structured error variants.
|
||||
if e.msg().contains("WriteTimeout") {
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
} else {
|
||||
// TODO back off but continue.
|
||||
error!("insert worker sees error: {e:?}");
|
||||
break;
|
||||
}
|
||||
stats_inc_for_err(&stats, &e);
|
||||
back_off_sleep(&mut backoff).await;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -262,17 +277,16 @@ async fn spawn_scylla_insert_workers(
|
||||
item.ema,
|
||||
item.emd,
|
||||
);
|
||||
let qres = data_store
|
||||
.scy
|
||||
.query(
|
||||
"insert into muted (part, series, ts, ema, emd) values (?, ?, ?, ?, ?)",
|
||||
values,
|
||||
)
|
||||
.await;
|
||||
let qres = data_store.scy.execute(&data_store.qu_insert_muted, values).await;
|
||||
match qres {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
stats.store_worker_item_error_inc();
|
||||
Ok(_) => {
|
||||
stats.store_worker_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
let e = e.into_simpler();
|
||||
stats_inc_for_err(&stats, &e);
|
||||
back_off_sleep(&mut backoff).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -286,21 +300,44 @@ async fn spawn_scylla_insert_workers(
|
||||
);
|
||||
let qres = data_store
|
||||
.scy
|
||||
.query(
|
||||
"insert into item_recv_ivl (part, series, ts, ema, emd) values (?, ?, ?, ?, ?)",
|
||||
values,
|
||||
)
|
||||
.execute(&data_store.qu_insert_item_recv_ivl, values)
|
||||
.await;
|
||||
match qres {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
stats.store_worker_item_error_inc();
|
||||
Ok(_) => {
|
||||
stats.store_worker_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
let e = e.into_simpler();
|
||||
stats_inc_for_err(&stats, &e);
|
||||
back_off_sleep(&mut backoff).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
QueryItem::ChannelInfo(item) => {
|
||||
let params = (
|
||||
item.ts_msp as i32,
|
||||
item.series.id() as i64,
|
||||
item.ivl,
|
||||
item.interest,
|
||||
item.evsize as i32,
|
||||
);
|
||||
let qres = data_store.scy.execute(&data_store.qu_insert_channel_ping, params).await;
|
||||
match qres {
|
||||
Ok(_) => {
|
||||
stats.store_worker_insert_done_inc();
|
||||
backoff = backoff_0;
|
||||
}
|
||||
Err(e) => {
|
||||
let e = e.into_simpler();
|
||||
stats_inc_for_err(&stats, &e);
|
||||
back_off_sleep(&mut backoff).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("insert worker has no more messages");
|
||||
trace!("insert worker has no more messages");
|
||||
};
|
||||
let jh = tokio::spawn(fut);
|
||||
jhs.push(jh);
|
||||
@@ -550,7 +587,7 @@ impl CaConnSet {
|
||||
ingest_commons.local_epics_hostname.clone(),
|
||||
512,
|
||||
200,
|
||||
ingest_commons.insert_item_queue.sender(),
|
||||
ingest_commons.insert_item_queue.sender().await,
|
||||
ingest_commons.data_store.clone(),
|
||||
ingest_commons.clone(),
|
||||
vec![channel_name],
|
||||
@@ -567,12 +604,11 @@ impl CaConnSet {
|
||||
self.ca_conn_ress.lock().await.contains_key(addr)
|
||||
}
|
||||
|
||||
pub async fn add_channel_or_create_conn() -> Result<(), Error> {
|
||||
// TODO fix race:
|
||||
// Must not drop mutex in-between calls.
|
||||
// Pass mutex on?
|
||||
|
||||
Ok(())
|
||||
pub async fn addr_nth_mod(&self, n: usize) -> Option<SocketAddr> {
|
||||
let g = self.ca_conn_ress.lock().await;
|
||||
let u = g.len();
|
||||
let n = n % u;
|
||||
g.keys().take(n).last().map(Clone::clone)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -816,6 +852,27 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
}
|
||||
info!("channels_by_host len {}", channels_by_host.len());
|
||||
|
||||
// Periodic tasks triggered by commands:
|
||||
let mut iper = 0;
|
||||
loop {
|
||||
if SIGINT.load(Ordering::Acquire) != 0 {
|
||||
break;
|
||||
}
|
||||
let addr = ingest_commons.ca_conn_set.addr_nth_mod(iper).await;
|
||||
if let Some(addr) = addr {
|
||||
fn cmdgen() -> (ConnCommand, async_channel::Receiver<bool>) {
|
||||
ConnCommand::check_channels_alive()
|
||||
}
|
||||
// TODO race between getting nth address and command send, so ignore error so far.
|
||||
let _res = ingest_commons.ca_conn_set.send_command_to_addr(&addr, cmdgen).await;
|
||||
let cmdgen = || ConnCommand::save_conn_info();
|
||||
// TODO race between getting nth address and command send, so ignore error so far.
|
||||
let _res = ingest_commons.ca_conn_set.send_command_to_addr(&addr, cmdgen).await;
|
||||
}
|
||||
iper += 1;
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
|
||||
loop {
|
||||
if SIGINT.load(Ordering::Acquire) != 0 {
|
||||
if false {
|
||||
@@ -833,12 +890,14 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
ingest_commons.ca_conn_set.wait_stopped().await?;
|
||||
info!("all connections done.");
|
||||
|
||||
insert_item_queue.drop_sender().await;
|
||||
|
||||
drop(ingest_commons);
|
||||
metrics_agg_jh.abort();
|
||||
drop(metrics_agg_jh);
|
||||
|
||||
if false {
|
||||
let sender = insert_item_queue.sender_raw();
|
||||
let sender = insert_item_queue.sender_raw().await;
|
||||
sender.close();
|
||||
let receiver = insert_item_queue.receiver();
|
||||
receiver.close();
|
||||
@@ -856,6 +915,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
let rc = receiver.receiver_count();
|
||||
info!("item queue B senders {} receivers {}", sc, rc);
|
||||
}
|
||||
receiver.close();
|
||||
|
||||
let mut futs = FuturesUnordered::from_iter(jh_insert_workers);
|
||||
loop {
|
||||
@@ -868,11 +928,10 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
None => break,
|
||||
},
|
||||
_ = tokio::time::sleep(Duration::from_millis(1000)).fuse() => {
|
||||
info!("waiting for {} inserters", futs.len());
|
||||
if true {
|
||||
let sc = receiver.sender_count();
|
||||
let rc = receiver.receiver_count();
|
||||
info!("item queue B senders {} receivers {}", sc, rc);
|
||||
info!("waiting inserters {} items {} senders {} receivers {}", futs.len(), receiver.len(), sc, rc);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
@@ -6,8 +6,8 @@ use crate::ca::proto::{CreateChan, EventAdd};
|
||||
use crate::ca::store::ChannelRegistry;
|
||||
use crate::series::{Existence, SeriesId};
|
||||
use crate::store::{
|
||||
ChannelStatus, ChannelStatusItem, CommonInsertItemQueueSender, ConnectionStatus, ConnectionStatusItem, InsertItem,
|
||||
IvlItem, MuteItem, QueryItem,
|
||||
ChannelInfoItem, ChannelStatus, ChannelStatusItem, CommonInsertItemQueueSender, ConnectionStatus,
|
||||
ConnectionStatusItem, InsertItem, IvlItem, MuteItem, QueryItem,
|
||||
};
|
||||
use async_channel::Sender;
|
||||
use err::Error;
|
||||
@@ -122,6 +122,7 @@ struct CreatedState {
|
||||
insert_next_earliest: Instant,
|
||||
muted_before: u32,
|
||||
series: Option<SeriesId>,
|
||||
info_store_msp_last: u32,
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
@@ -246,6 +247,11 @@ impl SubidStore {
|
||||
}
|
||||
}
|
||||
|
||||
fn info_store_msp_from_time(ts: SystemTime) -> u32 {
|
||||
let dt = ts.duration_since(SystemTime::UNIX_EPOCH).unwrap_or(Duration::ZERO);
|
||||
(dt.as_secs() / MIN * MIN / SEC) as u32
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ConnCommandKind {
|
||||
FindChannel(String, Sender<(SocketAddrV4, Vec<String>)>),
|
||||
@@ -255,6 +261,8 @@ pub enum ConnCommandKind {
|
||||
ChannelRemove(String, Sender<bool>),
|
||||
Shutdown(Sender<bool>),
|
||||
ExtraInsertsConf(ExtraInsertsConf, Sender<bool>),
|
||||
CheckChannelsAlive(Sender<bool>),
|
||||
SaveConnInfo(Sender<bool>),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -326,6 +334,22 @@ impl ConnCommand {
|
||||
};
|
||||
(cmd, rx)
|
||||
}
|
||||
|
||||
pub fn check_channels_alive() -> (ConnCommand, async_channel::Receiver<bool>) {
|
||||
let (tx, rx) = async_channel::bounded(1);
|
||||
let cmd = Self {
|
||||
kind: ConnCommandKind::CheckChannelsAlive(tx),
|
||||
};
|
||||
(cmd, rx)
|
||||
}
|
||||
|
||||
pub fn save_conn_info() -> (ConnCommand, async_channel::Receiver<bool>) {
|
||||
let (tx, rx) = async_channel::bounded(1);
|
||||
let cmd = Self {
|
||||
kind: ConnCommandKind::SaveConnInfo(tx),
|
||||
};
|
||||
(cmd, rx)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CaConn {
|
||||
@@ -351,7 +375,6 @@ pub struct CaConn {
|
||||
stats: Arc<CaConnStats>,
|
||||
insert_queue_max: usize,
|
||||
insert_ivl_min_mus: u64,
|
||||
ts_channel_alive_check_last: Instant,
|
||||
conn_command_tx: async_channel::Sender<ConnCommand>,
|
||||
conn_command_rx: async_channel::Receiver<ConnCommand>,
|
||||
conn_backoff: f32,
|
||||
@@ -395,7 +418,6 @@ impl CaConn {
|
||||
stats: Arc::new(CaConnStats::new()),
|
||||
insert_queue_max,
|
||||
insert_ivl_min_mus: 1000 * 6,
|
||||
ts_channel_alive_check_last: Instant::now(),
|
||||
conn_command_tx: cq_tx,
|
||||
conn_command_rx: cq_rx,
|
||||
conn_backoff: 0.02,
|
||||
@@ -418,23 +440,24 @@ impl CaConn {
|
||||
match self.conn_command_rx.poll_next_unpin(cx) {
|
||||
Ready(Some(a)) => match a.kind {
|
||||
ConnCommandKind::FindChannel(pattern, tx) => {
|
||||
//info!("Search for {pattern:?}");
|
||||
let mut res = Vec::new();
|
||||
for name in self.name_by_cid.values() {
|
||||
if !pattern.is_empty() && name.contains(&pattern) {
|
||||
res.push(name.clone());
|
||||
}
|
||||
}
|
||||
let res = if let Ok(re) = regex::Regex::new(&pattern) {
|
||||
self.name_by_cid
|
||||
.values()
|
||||
.filter(|x| re.is_match(x))
|
||||
.map(ToString::to_string)
|
||||
.collect()
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
let msg = (self.remote_addr_dbg.clone(), res);
|
||||
match tx.try_send(msg) {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
error!("response channel full or closed");
|
||||
self.stats.caconn_command_can_not_reply_inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
ConnCommandKind::ChannelState(name, tx) => {
|
||||
//info!("State for {name:?}");
|
||||
let res = match self.cid_by_name.get(&name) {
|
||||
Some(cid) => match self.channels.get(cid) {
|
||||
Some(state) => Some(state.to_info(name, self.remote_addr_dbg.clone())),
|
||||
@@ -449,7 +472,7 @@ impl CaConn {
|
||||
match tx.try_send(msg) {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
error!("response channel full or closed");
|
||||
self.stats.caconn_command_can_not_reply_inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -469,7 +492,7 @@ impl CaConn {
|
||||
match tx.try_send(msg) {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
error!("response channel full or closed");
|
||||
self.stats.caconn_command_can_not_reply_inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -478,7 +501,7 @@ impl CaConn {
|
||||
match tx.try_send(true) {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
error!("response channel full or closed");
|
||||
self.stats.caconn_command_can_not_reply_inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -487,19 +510,19 @@ impl CaConn {
|
||||
match tx.try_send(true) {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
error!("response channel full or closed");
|
||||
self.stats.caconn_command_can_not_reply_inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
ConnCommandKind::Shutdown(tx) => {
|
||||
self.shutdown = true;
|
||||
let _ = self.before_reset_of_channel_state();
|
||||
let res = self.before_reset_of_channel_state();
|
||||
self.state = CaConnState::Shutdown;
|
||||
self.proto = None;
|
||||
match tx.try_send(true) {
|
||||
match tx.try_send(res.is_ok()) {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
//error!("response channel full or closed");
|
||||
self.stats.caconn_command_can_not_reply_inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -508,7 +531,27 @@ impl CaConn {
|
||||
match tx.try_send(true) {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
//error!("response channel full or closed");
|
||||
self.stats.caconn_command_can_not_reply_inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
ConnCommandKind::CheckChannelsAlive(tx) => {
|
||||
let res = self.check_channels_alive();
|
||||
let res = res.is_ok();
|
||||
match tx.try_send(res) {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
self.stats.caconn_command_can_not_reply_inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
ConnCommandKind::SaveConnInfo(tx) => {
|
||||
let res = self.save_conn_info();
|
||||
let res = res.is_ok();
|
||||
match tx.try_send(res) {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
self.stats.caconn_command_can_not_reply_inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -574,8 +617,9 @@ impl CaConn {
|
||||
}
|
||||
|
||||
fn before_reset_of_channel_state(&mut self) -> Result<(), Error> {
|
||||
warn!("before_reset_of_channel_state channels {}", self.channels.len());
|
||||
trace!("before_reset_of_channel_state channels {}", self.channels.len());
|
||||
let mut created = 0;
|
||||
let mut warn_max = 0;
|
||||
for (_cid, chst) in &self.channels {
|
||||
match chst {
|
||||
ChannelState::Created(st) => {
|
||||
@@ -585,13 +629,14 @@ impl CaConn {
|
||||
series: series.clone(),
|
||||
status: ChannelStatus::Closed,
|
||||
});
|
||||
if created < 20 {
|
||||
//info!("store {:?}", item);
|
||||
if created < 10 {
|
||||
trace!("store {:?}", item);
|
||||
}
|
||||
self.insert_item_queue.push_back(item);
|
||||
} else {
|
||||
if created < 20 {
|
||||
//info!("no series for cid {:?}", st.cid);
|
||||
if warn_max < 10 {
|
||||
warn!("no series for cid {:?}", st.cid);
|
||||
warn_max += 1;
|
||||
}
|
||||
}
|
||||
created += 1;
|
||||
@@ -674,6 +719,38 @@ impl CaConn {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn save_conn_info(&mut self) -> Result<(), Error> {
|
||||
let timenow = SystemTime::now();
|
||||
for (_, st) in &mut self.channels {
|
||||
match st {
|
||||
ChannelState::Init => {
|
||||
// TODO need last-save-ts for this state.
|
||||
}
|
||||
ChannelState::Creating { cid: _, ts_beg: _ } => {
|
||||
// TODO need last-save-ts for this state.
|
||||
}
|
||||
ChannelState::Created(st) => {
|
||||
let msp = info_store_msp_from_time(timenow.clone());
|
||||
if msp != st.info_store_msp_last {
|
||||
st.info_store_msp_last = msp;
|
||||
let item = QueryItem::ChannelInfo(ChannelInfoItem {
|
||||
ts_msp: msp,
|
||||
series: st.series.clone().unwrap_or(SeriesId::new(0)),
|
||||
ivl: st.item_recv_ivl_ema.ema().ema(),
|
||||
interest: 0.,
|
||||
evsize: 0,
|
||||
});
|
||||
self.insert_item_queue.push_back(item);
|
||||
}
|
||||
}
|
||||
ChannelState::Error(_) => {
|
||||
// TODO need last-save-ts for this state.
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn channel_to_evented(
|
||||
&mut self,
|
||||
cid: Cid,
|
||||
@@ -730,6 +807,7 @@ impl CaConn {
|
||||
insert_next_earliest: tsnow,
|
||||
muted_before: 0,
|
||||
series: Some(series),
|
||||
info_store_msp_last: info_store_msp_from_time(SystemTime::now()),
|
||||
});
|
||||
let scalar_type = ScalarType::from_ca_id(data_type)?;
|
||||
let shape = Shape::from_ca_count(data_count)?;
|
||||
@@ -1149,6 +1227,7 @@ impl CaConn {
|
||||
insert_next_earliest: tsnow,
|
||||
muted_before: 0,
|
||||
series: None,
|
||||
info_store_msp_last: info_store_msp_from_time(SystemTime::now()),
|
||||
});
|
||||
// TODO handle error in different way. Should most likely not abort.
|
||||
let cd = ChannelDescDecoded {
|
||||
@@ -1375,13 +1454,6 @@ impl Stream for CaConn {
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
self.stats.caconn_poll_count_inc();
|
||||
let tsnow = Instant::now();
|
||||
if tsnow.duration_since(self.ts_channel_alive_check_last) >= Duration::from_millis(4000) {
|
||||
self.ts_channel_alive_check_last = tsnow;
|
||||
if let Err(e) = self.check_channels_alive() {
|
||||
error!("check_dead_channels {e:?}");
|
||||
}
|
||||
}
|
||||
if self.shutdown {
|
||||
info!("CaConn poll");
|
||||
}
|
||||
@@ -1400,7 +1472,7 @@ impl Stream for CaConn {
|
||||
}
|
||||
if self.shutdown {
|
||||
if self.insert_item_queue.len() == 0 {
|
||||
info!("no more items to flush");
|
||||
trace!("no more items to flush");
|
||||
break Ready(Ok(()));
|
||||
} else {
|
||||
info!("more items {}", self.insert_item_queue.len());
|
||||
|
||||
@@ -55,6 +55,12 @@ pub struct DataStore {
|
||||
pub qu_insert_array_i32: Arc<PreparedStatement>,
|
||||
pub qu_insert_array_f32: Arc<PreparedStatement>,
|
||||
pub qu_insert_array_f64: Arc<PreparedStatement>,
|
||||
pub qu_insert_muted: Arc<PreparedStatement>,
|
||||
pub qu_insert_item_recv_ivl: Arc<PreparedStatement>,
|
||||
pub qu_insert_connection_status: Arc<PreparedStatement>,
|
||||
pub qu_insert_channel_status: Arc<PreparedStatement>,
|
||||
pub qu_insert_channel_status_by_ts_msp: Arc<PreparedStatement>,
|
||||
pub qu_insert_channel_ping: Arc<PreparedStatement>,
|
||||
pub chan_reg: Arc<ChannelRegistry>,
|
||||
}
|
||||
|
||||
@@ -137,6 +143,38 @@ impl DataStore {
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
||||
let qu_insert_array_f64 = Arc::new(q);
|
||||
// Others:
|
||||
let q = scy
|
||||
.prepare("insert into muted (part, series, ts, ema, emd) values (?, ?, ?, ?, ?)")
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
||||
let qu_insert_muted = Arc::new(q);
|
||||
let q = scy
|
||||
.prepare("insert into item_recv_ivl (part, series, ts, ema, emd) values (?, ?, ?, ?, ?)")
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
||||
let qu_insert_item_recv_ivl = Arc::new(q);
|
||||
// Connection status:
|
||||
let q = scy
|
||||
.prepare("insert into connection_status (ts_msp, ts_lsp, kind, addr) values (?, ?, ?, ?)")
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
||||
let qu_insert_connection_status = Arc::new(q);
|
||||
let q = scy
|
||||
.prepare("insert into channel_status (series, ts_msp, ts_lsp, kind) values (?, ?, ?, ?)")
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
||||
let qu_insert_channel_status = Arc::new(q);
|
||||
let q = scy
|
||||
.prepare("insert into channel_status_by_ts_msp (ts_msp, ts_lsp, series, kind) values (?, ?, ?, ?)")
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
||||
let qu_insert_channel_status_by_ts_msp = Arc::new(q);
|
||||
let q = scy
|
||||
.prepare("insert into channel_ping (ts_msp, series, ivl, interest, evsize) values (?, ?, ?, ?, ?)")
|
||||
.await
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
|
||||
let qu_insert_channel_ping = Arc::new(q);
|
||||
let ret = Self {
|
||||
chan_reg: Arc::new(ChannelRegistry::new(pg_client)),
|
||||
scy,
|
||||
@@ -153,6 +191,12 @@ impl DataStore {
|
||||
qu_insert_array_i32,
|
||||
qu_insert_array_f32,
|
||||
qu_insert_array_f64,
|
||||
qu_insert_muted,
|
||||
qu_insert_item_recv_ivl,
|
||||
qu_insert_connection_status,
|
||||
qu_insert_channel_status,
|
||||
qu_insert_channel_status_by_ts_msp,
|
||||
qu_insert_channel_ping,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
@@ -2,19 +2,68 @@ use crate::ca::proto::{CaDataArrayValue, CaDataScalarValue, CaDataValue};
|
||||
use crate::ca::store::DataStore;
|
||||
use crate::errconv::ErrConv;
|
||||
use crate::series::SeriesId;
|
||||
use err::Error;
|
||||
use futures_util::{Future, FutureExt};
|
||||
use log::*;
|
||||
use netpod::{ScalarType, Shape};
|
||||
use scylla::frame::value::ValueList;
|
||||
use scylla::prepared_statement::PreparedStatement;
|
||||
use scylla::transport::errors::QueryError;
|
||||
use scylla::transport::errors::{DbError, QueryError};
|
||||
use scylla::{QueryResult, Session as ScySession};
|
||||
use stats::CaConnStats;
|
||||
use std::net::SocketAddrV4;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::{Instant, SystemTime};
|
||||
use tokio::sync::Mutex as TokMx;
|
||||
|
||||
pub const CONNECTION_STATUS_DIV: u64 = netpod::timeunits::DAY;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
DbUnavailable,
|
||||
DbOverload,
|
||||
DbTimeout,
|
||||
DbError(String),
|
||||
}
|
||||
|
||||
impl From<Error> for err::Error {
|
||||
fn from(e: Error) -> Self {
|
||||
err::Error::with_msg_no_trace(format!("{e:?}"))
|
||||
}
|
||||
}
|
||||
|
||||
pub trait IntoSimplerError {
|
||||
fn into_simpler(self) -> Error;
|
||||
}
|
||||
|
||||
impl IntoSimplerError for QueryError {
|
||||
fn into_simpler(self) -> Error {
|
||||
let e = self;
|
||||
match e {
|
||||
QueryError::DbError(e, msg) => match e {
|
||||
DbError::Unavailable { .. } => Error::DbUnavailable,
|
||||
DbError::Overloaded => Error::DbOverload,
|
||||
DbError::IsBootstrapping => Error::DbUnavailable,
|
||||
DbError::ReadTimeout { .. } => Error::DbTimeout,
|
||||
DbError::WriteTimeout { .. } => Error::DbTimeout,
|
||||
_ => Error::DbError(format!("{e} {msg}")),
|
||||
},
|
||||
QueryError::BadQuery(e) => Error::DbError(e.to_string()),
|
||||
QueryError::IoError(e) => Error::DbError(e.to_string()),
|
||||
QueryError::ProtocolError(e) => Error::DbError(e.to_string()),
|
||||
QueryError::InvalidMessage(e) => Error::DbError(e.to_string()),
|
||||
QueryError::TimeoutError => Error::DbTimeout,
|
||||
QueryError::TooManyOrphanedStreamIds(e) => Error::DbError(e.to_string()),
|
||||
QueryError::UnableToAllocStreamId => Error::DbError(e.to_string()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: IntoSimplerError> From<T> for Error {
|
||||
fn from(e: T) -> Self {
|
||||
e.into_simpler()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ScyInsertFut<'a> {
|
||||
fut: Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send + 'a>>,
|
||||
@@ -43,7 +92,7 @@ impl<'a> ScyInsertFut<'a> {
|
||||
}
|
||||
|
||||
impl<'a> Future for ScyInsertFut<'a> {
|
||||
type Output = Result<(), Error>;
|
||||
type Output = Result<(), err::Error>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
use Poll::*;
|
||||
@@ -146,6 +195,15 @@ pub struct IvlItem {
|
||||
pub emd: f32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ChannelInfoItem {
|
||||
pub ts_msp: u32,
|
||||
pub series: SeriesId,
|
||||
pub ivl: f32,
|
||||
pub interest: f32,
|
||||
pub evsize: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum QueryItem {
|
||||
ConnectionStatus(ConnectionStatusItem),
|
||||
@@ -153,6 +211,7 @@ pub enum QueryItem {
|
||||
Insert(InsertItem),
|
||||
Mute(MuteItem),
|
||||
Ivl(IvlItem),
|
||||
ChannelInfo(ChannelInfoItem),
|
||||
}
|
||||
|
||||
pub struct CommonInsertItemQueueSender {
|
||||
@@ -172,7 +231,7 @@ impl CommonInsertItemQueueSender {
|
||||
}
|
||||
|
||||
pub struct CommonInsertItemQueue {
|
||||
sender: async_channel::Sender<QueryItem>,
|
||||
sender: TokMx<async_channel::Sender<QueryItem>>,
|
||||
recv: async_channel::Receiver<QueryItem>,
|
||||
}
|
||||
|
||||
@@ -180,27 +239,27 @@ impl CommonInsertItemQueue {
|
||||
pub fn new(cap: usize) -> Self {
|
||||
let (tx, rx) = async_channel::bounded(cap);
|
||||
Self {
|
||||
sender: tx.clone(),
|
||||
sender: TokMx::new(tx.clone()),
|
||||
recv: rx,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn sender(&self) -> CommonInsertItemQueueSender {
|
||||
pub async fn sender(&self) -> CommonInsertItemQueueSender {
|
||||
CommonInsertItemQueueSender {
|
||||
sender: self.sender.clone(),
|
||||
sender: self.sender.lock().await.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn sender_raw(&self) -> async_channel::Sender<QueryItem> {
|
||||
self.sender.clone()
|
||||
pub async fn sender_raw(&self) -> async_channel::Sender<QueryItem> {
|
||||
self.sender.lock().await.clone()
|
||||
}
|
||||
|
||||
pub fn receiver(&self) -> async_channel::Receiver<QueryItem> {
|
||||
self.recv.clone()
|
||||
}
|
||||
|
||||
pub fn sender_count(&self) -> usize {
|
||||
self.sender.sender_count()
|
||||
pub async fn sender_count(&self) -> usize {
|
||||
self.sender.lock().await.sender_count()
|
||||
}
|
||||
|
||||
pub fn sender_count2(&self) -> usize {
|
||||
@@ -210,6 +269,12 @@ impl CommonInsertItemQueue {
|
||||
pub fn receiver_count(&self) -> usize {
|
||||
self.recv.receiver_count()
|
||||
}
|
||||
|
||||
// TODO should mark this such that a future call to sender() will fail
|
||||
pub async fn drop_sender(&self) {
|
||||
let x = std::mem::replace(&mut *self.sender.lock().await, async_channel::bounded(1).0);
|
||||
drop(x);
|
||||
}
|
||||
}
|
||||
|
||||
struct InsParCom {
|
||||
@@ -235,8 +300,18 @@ where
|
||||
par.pulse as i64,
|
||||
val,
|
||||
);
|
||||
data_store.scy.execute(qu, params).await.err_conv()?;
|
||||
Ok(())
|
||||
let y = data_store.scy.execute(qu, params).await;
|
||||
match y {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => match e {
|
||||
QueryError::TimeoutError => Err(Error::DbTimeout),
|
||||
QueryError::DbError(e, msg) => match e {
|
||||
DbError::Overloaded => Err(Error::DbOverload),
|
||||
_ => Err(Error::DbError(format!("{e} {msg}"))),
|
||||
},
|
||||
_ => Err(Error::DbError(format!("{e}"))),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn insert_array_gen<ST>(
|
||||
@@ -255,19 +330,15 @@ where
|
||||
par.pulse as i64,
|
||||
val,
|
||||
);
|
||||
data_store.scy.execute(qu, params).await.err_conv()?;
|
||||
data_store.scy.execute(qu, params).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaConnStats) -> Result<(), Error> {
|
||||
if item.msp_bump {
|
||||
let params = (item.series.id() as i64, item.ts_msp as i64);
|
||||
data_store
|
||||
.scy
|
||||
.execute(&data_store.qu_insert_ts_msp, params)
|
||||
.await
|
||||
.err_conv()?;
|
||||
stats.inserts_msp_inc()
|
||||
data_store.scy.execute(&data_store.qu_insert_ts_msp, params).await?;
|
||||
stats.inserts_msp_inc();
|
||||
}
|
||||
if let Some(ts_msp_grid) = item.ts_msp_grid {
|
||||
let params = (
|
||||
@@ -280,9 +351,8 @@ pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaCon
|
||||
data_store
|
||||
.scy
|
||||
.execute(&data_store.qu_insert_series_by_ts_msp, params)
|
||||
.await
|
||||
.err_conv()?;
|
||||
stats.inserts_msp_grid_inc()
|
||||
.await?;
|
||||
stats.inserts_msp_grid_inc();
|
||||
}
|
||||
let par = InsParCom {
|
||||
series: item.series.id(),
|
||||
@@ -328,20 +398,15 @@ pub async fn insert_connection_status(
|
||||
let secs = tsunix.as_secs() * netpod::timeunits::SEC;
|
||||
let nanos = tsunix.subsec_nanos() as u64;
|
||||
let ts = secs + nanos;
|
||||
let div = netpod::timeunits::SEC * 600;
|
||||
let ts_msp = ts / div * div;
|
||||
let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV;
|
||||
let ts_lsp = ts - ts_msp;
|
||||
let kind = item.status as u32;
|
||||
let addr = format!("{}", item.addr);
|
||||
let params = (ts_msp as i64, ts_lsp as i64, kind as i32, addr);
|
||||
data_store
|
||||
.scy
|
||||
.query(
|
||||
"insert into connection_status (ts_msp, ts_lsp, kind, addr) values (?, ?, ?, ?)",
|
||||
params,
|
||||
)
|
||||
.await
|
||||
.err_conv()?;
|
||||
.execute(&data_store.qu_insert_connection_status, params)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -354,19 +419,19 @@ pub async fn insert_channel_status(
|
||||
let secs = tsunix.as_secs() * netpod::timeunits::SEC;
|
||||
let nanos = tsunix.subsec_nanos() as u64;
|
||||
let ts = secs + nanos;
|
||||
let div = netpod::timeunits::DAY;
|
||||
let ts_msp = ts / div * div;
|
||||
let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV;
|
||||
let ts_lsp = ts - ts_msp;
|
||||
let kind = item.status.kind();
|
||||
let series = item.series.id();
|
||||
let params = (series as i64, ts_msp as i64, ts_lsp as i64, kind as i32);
|
||||
data_store
|
||||
.scy
|
||||
.query(
|
||||
"insert into channel_status (series, ts_msp, ts_lsp, kind) values (?, ?, ?, ?)",
|
||||
params,
|
||||
)
|
||||
.await
|
||||
.err_conv()?;
|
||||
.execute(&data_store.qu_insert_channel_status, params)
|
||||
.await?;
|
||||
let params = (ts_msp as i64, ts_lsp as i64, series as i64, kind as i32);
|
||||
data_store
|
||||
.scy
|
||||
.execute(&data_store.qu_insert_channel_status_by_ts_msp, params)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -153,14 +153,19 @@ stats_proc::stats_struct!((
|
||||
inserts_queue_drop,
|
||||
channel_fast_item_drop,
|
||||
store_worker_item_recv,
|
||||
store_worker_item_insert,
|
||||
// TODO rename to make clear that this drop is voluntary because of user config choice:
|
||||
store_worker_item_drop,
|
||||
store_worker_item_error,
|
||||
store_worker_insert_done,
|
||||
store_worker_insert_overload,
|
||||
store_worker_insert_timeout,
|
||||
store_worker_insert_unavailable,
|
||||
store_worker_insert_error,
|
||||
caconn_poll_count,
|
||||
caconn_loop1_count,
|
||||
caconn_loop2_count,
|
||||
caconn_loop3_count,
|
||||
caconn_loop4_count,
|
||||
caconn_command_can_not_reply,
|
||||
time_handle_conn_listen,
|
||||
time_handle_peer_ready,
|
||||
time_check_channels_state_init,
|
||||
|
||||
Reference in New Issue
Block a user