diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 5eb8d50..67c16f8 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -8,7 +8,7 @@ use self::store::DataStore; use crate::ca::conn::ConnCommand; use crate::errconv::ErrConv; use crate::linuxhelper::local_hostname; -use crate::store::{CommonInsertItemQueue, CommonInsertItemQueueSender, QueryItem}; +use crate::store::{CommonInsertItemQueue, CommonInsertItemQueueSender, IntoSimplerError, QueryItem}; use async_channel::Sender; use conn::CaConn; use err::Error; @@ -169,6 +169,37 @@ impl ExtraInsertsConf { } } +fn stats_inc_for_err(stats: &stats::CaConnStats, err: &crate::store::Error) { + use crate::store::Error; + match err { + Error::DbOverload => { + stats.store_worker_insert_overload_inc(); + } + Error::DbTimeout => { + stats.store_worker_insert_timeout_inc(); + } + Error::DbUnavailable => { + stats.store_worker_insert_unavailable_inc(); + } + Error::DbError(_) => { + stats.store_worker_insert_error_inc(); + } + } +} + +fn back_off_next(backoff_dt: &mut Duration) { + *backoff_dt = *backoff_dt + (*backoff_dt) * 3 / 2; + let dtmax = Duration::from_millis(4000); + if *backoff_dt > dtmax { + *backoff_dt = dtmax; + } +} + +async fn back_off_sleep(backoff_dt: &mut Duration) { + back_off_next(backoff_dt); + tokio::time::sleep(*backoff_dt).await; +} + async fn spawn_scylla_insert_workers( scyconf: ScyllaConfig, insert_scylla_sessions: usize, @@ -190,63 +221,47 @@ async fn spawn_scylla_insert_workers( let recv = insert_item_queue.receiver(); let insert_frac = insert_frac.clone(); let fut = async move { + let backoff_0 = Duration::from_millis(10); + let mut backoff = backoff_0.clone(); let mut i1 = 0; while let Ok(item) = recv.recv().await { + stats.store_worker_item_recv_inc(); match item { QueryItem::ConnectionStatus(item) => { match crate::store::insert_connection_status(item, &data_store, &stats).await { Ok(_) => { - stats.store_worker_item_insert_inc(); + stats.store_worker_insert_done_inc(); + backoff = backoff_0; } Err(e) => { - stats.store_worker_item_error_inc(); - // TODO introduce more structured error variants. - if e.msg().contains("WriteTimeout") { - tokio::time::sleep(Duration::from_millis(100)).await; - } else { - // TODO back off but continue. - error!("insert worker sees error: {e:?}"); - break; - } + stats_inc_for_err(&stats, &e); + back_off_sleep(&mut backoff).await; } } } QueryItem::ChannelStatus(item) => { match crate::store::insert_channel_status(item, &data_store, &stats).await { Ok(_) => { - stats.store_worker_item_insert_inc(); + stats.store_worker_insert_done_inc(); + backoff = backoff_0; } Err(e) => { - stats.store_worker_item_error_inc(); - // TODO introduce more structured error variants. - if e.msg().contains("WriteTimeout") { - tokio::time::sleep(Duration::from_millis(100)).await; - } else { - // TODO back off but continue. - error!("insert worker sees error: {e:?}"); - break; - } + stats_inc_for_err(&stats, &e); + back_off_sleep(&mut backoff).await; } } } QueryItem::Insert(item) => { - stats.store_worker_item_recv_inc(); let insert_frac = insert_frac.load(Ordering::Acquire); if i1 % 1000 < insert_frac { match crate::store::insert_item(item, &data_store, &stats).await { Ok(_) => { - stats.store_worker_item_insert_inc(); + stats.store_worker_insert_done_inc(); + backoff = backoff_0; } Err(e) => { - stats.store_worker_item_error_inc(); - // TODO introduce more structured error variants. - if e.msg().contains("WriteTimeout") { - tokio::time::sleep(Duration::from_millis(100)).await; - } else { - // TODO back off but continue. - error!("insert worker sees error: {e:?}"); - break; - } + stats_inc_for_err(&stats, &e); + back_off_sleep(&mut backoff).await; } } } else { @@ -262,17 +277,16 @@ async fn spawn_scylla_insert_workers( item.ema, item.emd, ); - let qres = data_store - .scy - .query( - "insert into muted (part, series, ts, ema, emd) values (?, ?, ?, ?, ?)", - values, - ) - .await; + let qres = data_store.scy.execute(&data_store.qu_insert_muted, values).await; match qres { - Ok(_) => {} - Err(_) => { - stats.store_worker_item_error_inc(); + Ok(_) => { + stats.store_worker_insert_done_inc(); + backoff = backoff_0; + } + Err(e) => { + let e = e.into_simpler(); + stats_inc_for_err(&stats, &e); + back_off_sleep(&mut backoff).await; } } } @@ -286,21 +300,44 @@ async fn spawn_scylla_insert_workers( ); let qres = data_store .scy - .query( - "insert into item_recv_ivl (part, series, ts, ema, emd) values (?, ?, ?, ?, ?)", - values, - ) + .execute(&data_store.qu_insert_item_recv_ivl, values) .await; match qres { - Ok(_) => {} - Err(_) => { - stats.store_worker_item_error_inc(); + Ok(_) => { + stats.store_worker_insert_done_inc(); + backoff = backoff_0; + } + Err(e) => { + let e = e.into_simpler(); + stats_inc_for_err(&stats, &e); + back_off_sleep(&mut backoff).await; + } + } + } + QueryItem::ChannelInfo(item) => { + let params = ( + item.ts_msp as i32, + item.series.id() as i64, + item.ivl, + item.interest, + item.evsize as i32, + ); + let qres = data_store.scy.execute(&data_store.qu_insert_channel_ping, params).await; + match qres { + Ok(_) => { + stats.store_worker_insert_done_inc(); + backoff = backoff_0; + } + Err(e) => { + let e = e.into_simpler(); + stats_inc_for_err(&stats, &e); + back_off_sleep(&mut backoff).await; } } } } } - info!("insert worker has no more messages"); + trace!("insert worker has no more messages"); }; let jh = tokio::spawn(fut); jhs.push(jh); @@ -550,7 +587,7 @@ impl CaConnSet { ingest_commons.local_epics_hostname.clone(), 512, 200, - ingest_commons.insert_item_queue.sender(), + ingest_commons.insert_item_queue.sender().await, ingest_commons.data_store.clone(), ingest_commons.clone(), vec![channel_name], @@ -567,12 +604,11 @@ impl CaConnSet { self.ca_conn_ress.lock().await.contains_key(addr) } - pub async fn add_channel_or_create_conn() -> Result<(), Error> { - // TODO fix race: - // Must not drop mutex in-between calls. - // Pass mutex on? - - Ok(()) + pub async fn addr_nth_mod(&self, n: usize) -> Option { + let g = self.ca_conn_ress.lock().await; + let u = g.len(); + let n = n % u; + g.keys().take(n).last().map(Clone::clone) } } @@ -816,6 +852,27 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { } info!("channels_by_host len {}", channels_by_host.len()); + // Periodic tasks triggered by commands: + let mut iper = 0; + loop { + if SIGINT.load(Ordering::Acquire) != 0 { + break; + } + let addr = ingest_commons.ca_conn_set.addr_nth_mod(iper).await; + if let Some(addr) = addr { + fn cmdgen() -> (ConnCommand, async_channel::Receiver) { + ConnCommand::check_channels_alive() + } + // TODO race between getting nth address and command send, so ignore error so far. + let _res = ingest_commons.ca_conn_set.send_command_to_addr(&addr, cmdgen).await; + let cmdgen = || ConnCommand::save_conn_info(); + // TODO race between getting nth address and command send, so ignore error so far. + let _res = ingest_commons.ca_conn_set.send_command_to_addr(&addr, cmdgen).await; + } + iper += 1; + tokio::time::sleep(Duration::from_millis(10)).await; + } + loop { if SIGINT.load(Ordering::Acquire) != 0 { if false { @@ -833,12 +890,14 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { ingest_commons.ca_conn_set.wait_stopped().await?; info!("all connections done."); + insert_item_queue.drop_sender().await; + drop(ingest_commons); metrics_agg_jh.abort(); drop(metrics_agg_jh); if false { - let sender = insert_item_queue.sender_raw(); + let sender = insert_item_queue.sender_raw().await; sender.close(); let receiver = insert_item_queue.receiver(); receiver.close(); @@ -856,6 +915,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let rc = receiver.receiver_count(); info!("item queue B senders {} receivers {}", sc, rc); } + receiver.close(); let mut futs = FuturesUnordered::from_iter(jh_insert_workers); loop { @@ -868,11 +928,10 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { None => break, }, _ = tokio::time::sleep(Duration::from_millis(1000)).fuse() => { - info!("waiting for {} inserters", futs.len()); if true { let sc = receiver.sender_count(); let rc = receiver.receiver_count(); - info!("item queue B senders {} receivers {}", sc, rc); + info!("waiting inserters {} items {} senders {} receivers {}", futs.len(), receiver.len(), sc, rc); } } ); diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index c7c2041..c2b44ce 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -6,8 +6,8 @@ use crate::ca::proto::{CreateChan, EventAdd}; use crate::ca::store::ChannelRegistry; use crate::series::{Existence, SeriesId}; use crate::store::{ - ChannelStatus, ChannelStatusItem, CommonInsertItemQueueSender, ConnectionStatus, ConnectionStatusItem, InsertItem, - IvlItem, MuteItem, QueryItem, + ChannelInfoItem, ChannelStatus, ChannelStatusItem, CommonInsertItemQueueSender, ConnectionStatus, + ConnectionStatusItem, InsertItem, IvlItem, MuteItem, QueryItem, }; use async_channel::Sender; use err::Error; @@ -122,6 +122,7 @@ struct CreatedState { insert_next_earliest: Instant, muted_before: u32, series: Option, + info_store_msp_last: u32, } #[allow(unused)] @@ -246,6 +247,11 @@ impl SubidStore { } } +fn info_store_msp_from_time(ts: SystemTime) -> u32 { + let dt = ts.duration_since(SystemTime::UNIX_EPOCH).unwrap_or(Duration::ZERO); + (dt.as_secs() / MIN * MIN / SEC) as u32 +} + #[derive(Debug)] pub enum ConnCommandKind { FindChannel(String, Sender<(SocketAddrV4, Vec)>), @@ -255,6 +261,8 @@ pub enum ConnCommandKind { ChannelRemove(String, Sender), Shutdown(Sender), ExtraInsertsConf(ExtraInsertsConf, Sender), + CheckChannelsAlive(Sender), + SaveConnInfo(Sender), } #[derive(Debug)] @@ -326,6 +334,22 @@ impl ConnCommand { }; (cmd, rx) } + + pub fn check_channels_alive() -> (ConnCommand, async_channel::Receiver) { + let (tx, rx) = async_channel::bounded(1); + let cmd = Self { + kind: ConnCommandKind::CheckChannelsAlive(tx), + }; + (cmd, rx) + } + + pub fn save_conn_info() -> (ConnCommand, async_channel::Receiver) { + let (tx, rx) = async_channel::bounded(1); + let cmd = Self { + kind: ConnCommandKind::SaveConnInfo(tx), + }; + (cmd, rx) + } } pub struct CaConn { @@ -351,7 +375,6 @@ pub struct CaConn { stats: Arc, insert_queue_max: usize, insert_ivl_min_mus: u64, - ts_channel_alive_check_last: Instant, conn_command_tx: async_channel::Sender, conn_command_rx: async_channel::Receiver, conn_backoff: f32, @@ -395,7 +418,6 @@ impl CaConn { stats: Arc::new(CaConnStats::new()), insert_queue_max, insert_ivl_min_mus: 1000 * 6, - ts_channel_alive_check_last: Instant::now(), conn_command_tx: cq_tx, conn_command_rx: cq_rx, conn_backoff: 0.02, @@ -418,23 +440,24 @@ impl CaConn { match self.conn_command_rx.poll_next_unpin(cx) { Ready(Some(a)) => match a.kind { ConnCommandKind::FindChannel(pattern, tx) => { - //info!("Search for {pattern:?}"); - let mut res = Vec::new(); - for name in self.name_by_cid.values() { - if !pattern.is_empty() && name.contains(&pattern) { - res.push(name.clone()); - } - } + let res = if let Ok(re) = regex::Regex::new(&pattern) { + self.name_by_cid + .values() + .filter(|x| re.is_match(x)) + .map(ToString::to_string) + .collect() + } else { + Vec::new() + }; let msg = (self.remote_addr_dbg.clone(), res); match tx.try_send(msg) { Ok(_) => {} Err(_) => { - error!("response channel full or closed"); + self.stats.caconn_command_can_not_reply_inc(); } } } ConnCommandKind::ChannelState(name, tx) => { - //info!("State for {name:?}"); let res = match self.cid_by_name.get(&name) { Some(cid) => match self.channels.get(cid) { Some(state) => Some(state.to_info(name, self.remote_addr_dbg.clone())), @@ -449,7 +472,7 @@ impl CaConn { match tx.try_send(msg) { Ok(_) => {} Err(_) => { - error!("response channel full or closed"); + self.stats.caconn_command_can_not_reply_inc(); } } } @@ -469,7 +492,7 @@ impl CaConn { match tx.try_send(msg) { Ok(_) => {} Err(_) => { - error!("response channel full or closed"); + self.stats.caconn_command_can_not_reply_inc(); } } } @@ -478,7 +501,7 @@ impl CaConn { match tx.try_send(true) { Ok(_) => {} Err(_) => { - error!("response channel full or closed"); + self.stats.caconn_command_can_not_reply_inc(); } } } @@ -487,19 +510,19 @@ impl CaConn { match tx.try_send(true) { Ok(_) => {} Err(_) => { - error!("response channel full or closed"); + self.stats.caconn_command_can_not_reply_inc(); } } } ConnCommandKind::Shutdown(tx) => { self.shutdown = true; - let _ = self.before_reset_of_channel_state(); + let res = self.before_reset_of_channel_state(); self.state = CaConnState::Shutdown; self.proto = None; - match tx.try_send(true) { + match tx.try_send(res.is_ok()) { Ok(_) => {} Err(_) => { - //error!("response channel full or closed"); + self.stats.caconn_command_can_not_reply_inc(); } } } @@ -508,7 +531,27 @@ impl CaConn { match tx.try_send(true) { Ok(_) => {} Err(_) => { - //error!("response channel full or closed"); + self.stats.caconn_command_can_not_reply_inc(); + } + } + } + ConnCommandKind::CheckChannelsAlive(tx) => { + let res = self.check_channels_alive(); + let res = res.is_ok(); + match tx.try_send(res) { + Ok(_) => {} + Err(_) => { + self.stats.caconn_command_can_not_reply_inc(); + } + } + } + ConnCommandKind::SaveConnInfo(tx) => { + let res = self.save_conn_info(); + let res = res.is_ok(); + match tx.try_send(res) { + Ok(_) => {} + Err(_) => { + self.stats.caconn_command_can_not_reply_inc(); } } } @@ -574,8 +617,9 @@ impl CaConn { } fn before_reset_of_channel_state(&mut self) -> Result<(), Error> { - warn!("before_reset_of_channel_state channels {}", self.channels.len()); + trace!("before_reset_of_channel_state channels {}", self.channels.len()); let mut created = 0; + let mut warn_max = 0; for (_cid, chst) in &self.channels { match chst { ChannelState::Created(st) => { @@ -585,13 +629,14 @@ impl CaConn { series: series.clone(), status: ChannelStatus::Closed, }); - if created < 20 { - //info!("store {:?}", item); + if created < 10 { + trace!("store {:?}", item); } self.insert_item_queue.push_back(item); } else { - if created < 20 { - //info!("no series for cid {:?}", st.cid); + if warn_max < 10 { + warn!("no series for cid {:?}", st.cid); + warn_max += 1; } } created += 1; @@ -674,6 +719,38 @@ impl CaConn { Ok(()) } + fn save_conn_info(&mut self) -> Result<(), Error> { + let timenow = SystemTime::now(); + for (_, st) in &mut self.channels { + match st { + ChannelState::Init => { + // TODO need last-save-ts for this state. + } + ChannelState::Creating { cid: _, ts_beg: _ } => { + // TODO need last-save-ts for this state. + } + ChannelState::Created(st) => { + let msp = info_store_msp_from_time(timenow.clone()); + if msp != st.info_store_msp_last { + st.info_store_msp_last = msp; + let item = QueryItem::ChannelInfo(ChannelInfoItem { + ts_msp: msp, + series: st.series.clone().unwrap_or(SeriesId::new(0)), + ivl: st.item_recv_ivl_ema.ema().ema(), + interest: 0., + evsize: 0, + }); + self.insert_item_queue.push_back(item); + } + } + ChannelState::Error(_) => { + // TODO need last-save-ts for this state. + } + } + } + Ok(()) + } + fn channel_to_evented( &mut self, cid: Cid, @@ -730,6 +807,7 @@ impl CaConn { insert_next_earliest: tsnow, muted_before: 0, series: Some(series), + info_store_msp_last: info_store_msp_from_time(SystemTime::now()), }); let scalar_type = ScalarType::from_ca_id(data_type)?; let shape = Shape::from_ca_count(data_count)?; @@ -1149,6 +1227,7 @@ impl CaConn { insert_next_earliest: tsnow, muted_before: 0, series: None, + info_store_msp_last: info_store_msp_from_time(SystemTime::now()), }); // TODO handle error in different way. Should most likely not abort. let cd = ChannelDescDecoded { @@ -1375,13 +1454,6 @@ impl Stream for CaConn { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; self.stats.caconn_poll_count_inc(); - let tsnow = Instant::now(); - if tsnow.duration_since(self.ts_channel_alive_check_last) >= Duration::from_millis(4000) { - self.ts_channel_alive_check_last = tsnow; - if let Err(e) = self.check_channels_alive() { - error!("check_dead_channels {e:?}"); - } - } if self.shutdown { info!("CaConn poll"); } @@ -1400,7 +1472,7 @@ impl Stream for CaConn { } if self.shutdown { if self.insert_item_queue.len() == 0 { - info!("no more items to flush"); + trace!("no more items to flush"); break Ready(Ok(())); } else { info!("more items {}", self.insert_item_queue.len()); diff --git a/netfetch/src/ca/store.rs b/netfetch/src/ca/store.rs index 5f8730d..1663905 100644 --- a/netfetch/src/ca/store.rs +++ b/netfetch/src/ca/store.rs @@ -55,6 +55,12 @@ pub struct DataStore { pub qu_insert_array_i32: Arc, pub qu_insert_array_f32: Arc, pub qu_insert_array_f64: Arc, + pub qu_insert_muted: Arc, + pub qu_insert_item_recv_ivl: Arc, + pub qu_insert_connection_status: Arc, + pub qu_insert_channel_status: Arc, + pub qu_insert_channel_status_by_ts_msp: Arc, + pub qu_insert_channel_ping: Arc, pub chan_reg: Arc, } @@ -137,6 +143,38 @@ impl DataStore { .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_array_f64 = Arc::new(q); + // Others: + let q = scy + .prepare("insert into muted (part, series, ts, ema, emd) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_muted = Arc::new(q); + let q = scy + .prepare("insert into item_recv_ivl (part, series, ts, ema, emd) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_item_recv_ivl = Arc::new(q); + // Connection status: + let q = scy + .prepare("insert into connection_status (ts_msp, ts_lsp, kind, addr) values (?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_connection_status = Arc::new(q); + let q = scy + .prepare("insert into channel_status (series, ts_msp, ts_lsp, kind) values (?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_channel_status = Arc::new(q); + let q = scy + .prepare("insert into channel_status_by_ts_msp (ts_msp, ts_lsp, series, kind) values (?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_channel_status_by_ts_msp = Arc::new(q); + let q = scy + .prepare("insert into channel_ping (ts_msp, series, ivl, interest, evsize) values (?, ?, ?, ?, ?)") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let qu_insert_channel_ping = Arc::new(q); let ret = Self { chan_reg: Arc::new(ChannelRegistry::new(pg_client)), scy, @@ -153,6 +191,12 @@ impl DataStore { qu_insert_array_i32, qu_insert_array_f32, qu_insert_array_f64, + qu_insert_muted, + qu_insert_item_recv_ivl, + qu_insert_connection_status, + qu_insert_channel_status, + qu_insert_channel_status_by_ts_msp, + qu_insert_channel_ping, }; Ok(ret) } diff --git a/netfetch/src/store.rs b/netfetch/src/store.rs index eb1e44c..5da880f 100644 --- a/netfetch/src/store.rs +++ b/netfetch/src/store.rs @@ -2,19 +2,68 @@ use crate::ca::proto::{CaDataArrayValue, CaDataScalarValue, CaDataValue}; use crate::ca::store::DataStore; use crate::errconv::ErrConv; use crate::series::SeriesId; -use err::Error; use futures_util::{Future, FutureExt}; use log::*; use netpod::{ScalarType, Shape}; use scylla::frame::value::ValueList; use scylla::prepared_statement::PreparedStatement; -use scylla::transport::errors::QueryError; +use scylla::transport::errors::{DbError, QueryError}; use scylla::{QueryResult, Session as ScySession}; use stats::CaConnStats; use std::net::SocketAddrV4; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Instant, SystemTime}; +use tokio::sync::Mutex as TokMx; + +pub const CONNECTION_STATUS_DIV: u64 = netpod::timeunits::DAY; + +#[derive(Debug)] +pub enum Error { + DbUnavailable, + DbOverload, + DbTimeout, + DbError(String), +} + +impl From for err::Error { + fn from(e: Error) -> Self { + err::Error::with_msg_no_trace(format!("{e:?}")) + } +} + +pub trait IntoSimplerError { + fn into_simpler(self) -> Error; +} + +impl IntoSimplerError for QueryError { + fn into_simpler(self) -> Error { + let e = self; + match e { + QueryError::DbError(e, msg) => match e { + DbError::Unavailable { .. } => Error::DbUnavailable, + DbError::Overloaded => Error::DbOverload, + DbError::IsBootstrapping => Error::DbUnavailable, + DbError::ReadTimeout { .. } => Error::DbTimeout, + DbError::WriteTimeout { .. } => Error::DbTimeout, + _ => Error::DbError(format!("{e} {msg}")), + }, + QueryError::BadQuery(e) => Error::DbError(e.to_string()), + QueryError::IoError(e) => Error::DbError(e.to_string()), + QueryError::ProtocolError(e) => Error::DbError(e.to_string()), + QueryError::InvalidMessage(e) => Error::DbError(e.to_string()), + QueryError::TimeoutError => Error::DbTimeout, + QueryError::TooManyOrphanedStreamIds(e) => Error::DbError(e.to_string()), + QueryError::UnableToAllocStreamId => Error::DbError(e.to_string()), + } + } +} + +impl From for Error { + fn from(e: T) -> Self { + e.into_simpler() + } +} pub struct ScyInsertFut<'a> { fut: Pin> + Send + 'a>>, @@ -43,7 +92,7 @@ impl<'a> ScyInsertFut<'a> { } impl<'a> Future for ScyInsertFut<'a> { - type Output = Result<(), Error>; + type Output = Result<(), err::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { use Poll::*; @@ -146,6 +195,15 @@ pub struct IvlItem { pub emd: f32, } +#[derive(Debug)] +pub struct ChannelInfoItem { + pub ts_msp: u32, + pub series: SeriesId, + pub ivl: f32, + pub interest: f32, + pub evsize: u32, +} + #[derive(Debug)] pub enum QueryItem { ConnectionStatus(ConnectionStatusItem), @@ -153,6 +211,7 @@ pub enum QueryItem { Insert(InsertItem), Mute(MuteItem), Ivl(IvlItem), + ChannelInfo(ChannelInfoItem), } pub struct CommonInsertItemQueueSender { @@ -172,7 +231,7 @@ impl CommonInsertItemQueueSender { } pub struct CommonInsertItemQueue { - sender: async_channel::Sender, + sender: TokMx>, recv: async_channel::Receiver, } @@ -180,27 +239,27 @@ impl CommonInsertItemQueue { pub fn new(cap: usize) -> Self { let (tx, rx) = async_channel::bounded(cap); Self { - sender: tx.clone(), + sender: TokMx::new(tx.clone()), recv: rx, } } - pub fn sender(&self) -> CommonInsertItemQueueSender { + pub async fn sender(&self) -> CommonInsertItemQueueSender { CommonInsertItemQueueSender { - sender: self.sender.clone(), + sender: self.sender.lock().await.clone(), } } - pub fn sender_raw(&self) -> async_channel::Sender { - self.sender.clone() + pub async fn sender_raw(&self) -> async_channel::Sender { + self.sender.lock().await.clone() } pub fn receiver(&self) -> async_channel::Receiver { self.recv.clone() } - pub fn sender_count(&self) -> usize { - self.sender.sender_count() + pub async fn sender_count(&self) -> usize { + self.sender.lock().await.sender_count() } pub fn sender_count2(&self) -> usize { @@ -210,6 +269,12 @@ impl CommonInsertItemQueue { pub fn receiver_count(&self) -> usize { self.recv.receiver_count() } + + // TODO should mark this such that a future call to sender() will fail + pub async fn drop_sender(&self) { + let x = std::mem::replace(&mut *self.sender.lock().await, async_channel::bounded(1).0); + drop(x); + } } struct InsParCom { @@ -235,8 +300,18 @@ where par.pulse as i64, val, ); - data_store.scy.execute(qu, params).await.err_conv()?; - Ok(()) + let y = data_store.scy.execute(qu, params).await; + match y { + Ok(_) => Ok(()), + Err(e) => match e { + QueryError::TimeoutError => Err(Error::DbTimeout), + QueryError::DbError(e, msg) => match e { + DbError::Overloaded => Err(Error::DbOverload), + _ => Err(Error::DbError(format!("{e} {msg}"))), + }, + _ => Err(Error::DbError(format!("{e}"))), + }, + } } async fn insert_array_gen( @@ -255,19 +330,15 @@ where par.pulse as i64, val, ); - data_store.scy.execute(qu, params).await.err_conv()?; + data_store.scy.execute(qu, params).await?; Ok(()) } pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaConnStats) -> Result<(), Error> { if item.msp_bump { let params = (item.series.id() as i64, item.ts_msp as i64); - data_store - .scy - .execute(&data_store.qu_insert_ts_msp, params) - .await - .err_conv()?; - stats.inserts_msp_inc() + data_store.scy.execute(&data_store.qu_insert_ts_msp, params).await?; + stats.inserts_msp_inc(); } if let Some(ts_msp_grid) = item.ts_msp_grid { let params = ( @@ -280,9 +351,8 @@ pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaCon data_store .scy .execute(&data_store.qu_insert_series_by_ts_msp, params) - .await - .err_conv()?; - stats.inserts_msp_grid_inc() + .await?; + stats.inserts_msp_grid_inc(); } let par = InsParCom { series: item.series.id(), @@ -328,20 +398,15 @@ pub async fn insert_connection_status( let secs = tsunix.as_secs() * netpod::timeunits::SEC; let nanos = tsunix.subsec_nanos() as u64; let ts = secs + nanos; - let div = netpod::timeunits::SEC * 600; - let ts_msp = ts / div * div; + let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV; let ts_lsp = ts - ts_msp; let kind = item.status as u32; let addr = format!("{}", item.addr); let params = (ts_msp as i64, ts_lsp as i64, kind as i32, addr); data_store .scy - .query( - "insert into connection_status (ts_msp, ts_lsp, kind, addr) values (?, ?, ?, ?)", - params, - ) - .await - .err_conv()?; + .execute(&data_store.qu_insert_connection_status, params) + .await?; Ok(()) } @@ -354,19 +419,19 @@ pub async fn insert_channel_status( let secs = tsunix.as_secs() * netpod::timeunits::SEC; let nanos = tsunix.subsec_nanos() as u64; let ts = secs + nanos; - let div = netpod::timeunits::DAY; - let ts_msp = ts / div * div; + let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV; let ts_lsp = ts - ts_msp; let kind = item.status.kind(); let series = item.series.id(); let params = (series as i64, ts_msp as i64, ts_lsp as i64, kind as i32); data_store .scy - .query( - "insert into channel_status (series, ts_msp, ts_lsp, kind) values (?, ?, ?, ?)", - params, - ) - .await - .err_conv()?; + .execute(&data_store.qu_insert_channel_status, params) + .await?; + let params = (ts_msp as i64, ts_lsp as i64, series as i64, kind as i32); + data_store + .scy + .execute(&data_store.qu_insert_channel_status_by_ts_msp, params) + .await?; Ok(()) } diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 13f7351..af7c236 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -153,14 +153,19 @@ stats_proc::stats_struct!(( inserts_queue_drop, channel_fast_item_drop, store_worker_item_recv, - store_worker_item_insert, + // TODO rename to make clear that this drop is voluntary because of user config choice: store_worker_item_drop, - store_worker_item_error, + store_worker_insert_done, + store_worker_insert_overload, + store_worker_insert_timeout, + store_worker_insert_unavailable, + store_worker_insert_error, caconn_poll_count, caconn_loop1_count, caconn_loop2_count, caconn_loop3_count, caconn_loop4_count, + caconn_command_can_not_reply, time_handle_conn_listen, time_handle_peer_ready, time_check_channels_state_init,