diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 67c16f8..74ec170 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -1,4 +1,5 @@ pub mod conn; +pub mod connset; pub mod findioc; pub mod proto; pub mod search; @@ -6,28 +7,27 @@ pub mod store; use self::store::DataStore; use crate::ca::conn::ConnCommand; +use crate::ca::connset::CaConnSet; use crate::errconv::ErrConv; use crate::linuxhelper::local_hostname; -use crate::store::{CommonInsertItemQueue, CommonInsertItemQueueSender, IntoSimplerError, QueryItem}; -use async_channel::Sender; -use conn::CaConn; +use crate::metrics::metrics_agg_task; +use crate::rt::JoinHandle; +use crate::store::{CommonInsertItemQueue, IntoSimplerError, QueryItem}; use err::Error; use futures_util::stream::FuturesUnordered; use futures_util::{FutureExt, StreamExt}; use log::*; use netpod::{Database, ScyllaConfig}; use serde::{Deserialize, Serialize}; -use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff}; -use std::collections::{BTreeMap, VecDeque}; +use stats::{CaConnStats, CaConnStatsAgg}; +use std::collections::BTreeMap; use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}; use std::path::PathBuf; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::fs::OpenOptions; use tokio::io::AsyncReadExt; -use tokio::sync::Mutex as TokMx; -use tokio::task::JoinHandle; use tokio_postgres::Client as PgClient; pub static SIGINT: AtomicU32 = AtomicU32::new(0); @@ -60,6 +60,7 @@ struct ChannelConfig { insert_item_queue_cap: Option, api_bind: Option, local_epics_hostname: Option, + store_workers_rate: Option, } #[test] @@ -136,6 +137,7 @@ pub async fn parse_config(config: PathBuf) -> Result { insert_item_queue_cap: conf.insert_item_queue_cap.unwrap_or(200000), api_bind: conf.api_bind.unwrap_or_else(|| "0.0.0.0:3011".into()), local_epics_hostname: conf.local_epics_hostname.unwrap_or_else(local_hostname), + store_workers_rate: conf.store_workers_rate.unwrap_or(10000), }) } @@ -156,6 +158,7 @@ pub struct CaConnectOpts { pub insert_item_queue_cap: usize, pub api_bind: String, pub local_epics_hostname: String, + pub store_workers_rate: u64, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -206,6 +209,7 @@ async fn spawn_scylla_insert_workers( insert_worker_count: usize, insert_item_queue: Arc, insert_frac: Arc, + ingest_commons: Arc, pg_client: Arc, store_stats: Arc, ) -> Result>, Error> { @@ -220,17 +224,34 @@ async fn spawn_scylla_insert_workers( let stats = store_stats.clone(); let recv = insert_item_queue.receiver(); let insert_frac = insert_frac.clone(); + let ingest_commons = ingest_commons.clone(); let fut = async move { let backoff_0 = Duration::from_millis(10); let mut backoff = backoff_0.clone(); let mut i1 = 0; - while let Ok(item) = recv.recv().await { + let mut ts_recv_last = Instant::now(); + loop { + let tsnow = Instant::now(); + let dt = tsnow.duration_since(ts_recv_last); + let dt_min = { + let rate = ingest_commons.store_workers_rate.load(Ordering::Acquire); + Duration::from_nanos(insert_worker_count as u64 * 1000000000 / rate) + }; + if dt < dt_min { + tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await; + } + let item = if let Ok(item) = recv.recv().await { + item + } else { + break; + }; + ts_recv_last = Instant::now(); stats.store_worker_item_recv_inc(); match item { QueryItem::ConnectionStatus(item) => { match crate::store::insert_connection_status(item, &data_store, &stats).await { Ok(_) => { - stats.store_worker_insert_done_inc(); + stats.connection_status_insert_done_inc(); backoff = backoff_0; } Err(e) => { @@ -242,7 +263,7 @@ async fn spawn_scylla_insert_workers( QueryItem::ChannelStatus(item) => { match crate::store::insert_channel_status(item, &data_store, &stats).await { Ok(_) => { - stats.store_worker_insert_done_inc(); + stats.channel_status_insert_done_inc(); backoff = backoff_0; } Err(e) => { @@ -280,7 +301,7 @@ async fn spawn_scylla_insert_workers( let qres = data_store.scy.execute(&data_store.qu_insert_muted, values).await; match qres { Ok(_) => { - stats.store_worker_insert_done_inc(); + stats.mute_insert_done_inc(); backoff = backoff_0; } Err(e) => { @@ -304,7 +325,7 @@ async fn spawn_scylla_insert_workers( .await; match qres { Ok(_) => { - stats.store_worker_insert_done_inc(); + stats.ivl_insert_done_inc(); backoff = backoff_0; } Err(e) => { @@ -316,6 +337,7 @@ async fn spawn_scylla_insert_workers( } QueryItem::ChannelInfo(item) => { let params = ( + (item.series.id() & 0xff) as i32, item.ts_msp as i32, item.series.id() as i64, item.ivl, @@ -325,7 +347,7 @@ async fn spawn_scylla_insert_workers( let qres = data_store.scy.execute(&data_store.qu_insert_channel_ping, params).await; match qres { Ok(_) => { - stats.store_worker_insert_done_inc(); + stats.channel_info_insert_done_inc(); backoff = backoff_0; } Err(e) => { @@ -345,273 +367,6 @@ async fn spawn_scylla_insert_workers( Ok(jhs) } -pub struct CommandQueueSet { - queues: tokio::sync::Mutex>>, -} - -impl CommandQueueSet { - pub fn new() -> Self { - Self { - queues: tokio::sync::Mutex::new(BTreeMap::>::new()), - } - } - - pub async fn queues(&self) -> &tokio::sync::Mutex>> { - &self.queues - } - - pub async fn queues_locked(&self) -> tokio::sync::MutexGuard>> { - let mut g = self.queues.lock().await; - let mut rm = Vec::new(); - for (k, v) in g.iter() { - if v.is_closed() { - rm.push(*k); - } - } - for x in rm { - g.remove(&x); - } - g - } -} - -struct CaConnRess { - sender: Sender, - stats: Arc, - jh: JoinHandle>, -} - -// TODO -// Resources belonging to the same CaConn also belong together here. -// Only add or remove them from the set at once. -// That means, they should go together. -// Does not hold the actual CaConn, because that struct is in a task. -// Always create the CaConn via a common code path which also takes care -// to add it to the correct list. -// There, make spawning part of this function? -pub struct CaConnSet { - ca_conn_ress: TokMx>, -} - -impl CaConnSet { - pub fn new() -> Self { - Self { - ca_conn_ress: Default::default(), - } - } - - pub async fn create_ca_conn( - &self, - addr: SocketAddrV4, - local_epics_hostname: String, - array_truncate: usize, - insert_queue_max: usize, - insert_item_queue_sender: CommonInsertItemQueueSender, - data_store: Arc, - ingest_commons: Arc, - with_channels: Vec, - ) -> Result<(), Error> { - info!("create new CaConn {:?}", addr); - let addr2 = SocketAddr::V4(addr.clone()); - let mut conn = CaConn::new( - addr, - local_epics_hostname, - data_store.clone(), - insert_item_queue_sender, - array_truncate, - insert_queue_max, - ingest_commons, - ); - for ch in with_channels { - conn.channel_add(ch); - } - let conn = conn; - let conn_tx = conn.conn_command_tx(); - let conn_stats = conn.stats(); - let conn_fut = async move { - let stats = conn.stats(); - let mut conn = conn; - while let Some(item) = conn.next().await { - match item { - Ok(_) => { - stats.conn_item_count_inc(); - } - Err(e) => { - error!("CaConn gives error: {e:?}"); - break; - } - } - } - Ok::<_, Error>(()) - }; - let jh = tokio::spawn(conn_fut); - let ca_conn_ress = CaConnRess { - sender: conn_tx, - stats: conn_stats, - jh, - }; - self.ca_conn_ress.lock().await.insert(addr2, ca_conn_ress); - Ok(()) - } - - pub async fn send_command_to_all(&self, cmdgen: F) -> Result, Error> - where - F: Fn() -> (ConnCommand, async_channel::Receiver), - { - //let it = self.ca_conn_ress.iter().map(|x| x); - //Self::send_command_inner(it, move || cmd.clone()); - let mut rxs = Vec::new(); - for (_addr, ress) in &*self.ca_conn_ress.lock().await { - let (cmd, rx) = cmdgen(); - match ress.sender.send(cmd).await { - Ok(()) => { - rxs.push(rx); - } - Err(e) => { - error!("can not send command {e:?}"); - } - } - } - let mut res = Vec::new(); - for rx in rxs { - let x = rx.recv().await?; - res.push(x); - } - Ok(res) - } - - pub async fn send_command_to_addr(&self, addr: &SocketAddr, cmdgen: F) -> Result - where - F: Fn() -> (ConnCommand, async_channel::Receiver), - { - if let Some(ress) = self.ca_conn_ress.lock().await.get(addr) { - let (cmd, rx) = cmdgen(); - ress.sender.send(cmd).await.err_conv()?; - let ret = rx.recv().await.err_conv()?; - Ok(ret) - } else { - Err(Error::with_msg_no_trace(format!("addr not found"))) - } - } - - #[allow(unused)] - async fn send_command_inner<'a, IT, F, R>(it: &mut IT, cmdgen: F) -> Vec> - where - IT: Iterator)>, - F: Fn() -> (ConnCommand, async_channel::Receiver), - { - let mut rxs = Vec::new(); - for (_, tx) in it { - let (cmd, rx) = cmdgen(); - match tx.send(cmd).await { - Ok(()) => { - rxs.push(rx); - } - Err(e) => { - error!("can not send command {e:?}"); - } - } - } - rxs - } - - pub async fn send_stop(&self) -> Result<(), Error> { - self.send_command_to_all(|| ConnCommand::shutdown()).await?; - Ok(()) - } - - pub async fn wait_stopped(&self) -> Result<(), Error> { - let mut g = self.ca_conn_ress.lock().await; - let mm = std::mem::replace(&mut *g, BTreeMap::new()); - let mut jhs: VecDeque<_> = VecDeque::new(); - for t in mm { - jhs.push_back(t.1.jh.fuse()); - } - loop { - let mut jh = if let Some(x) = jhs.pop_front() { - x - } else { - break; - }; - futures_util::select! { - a = jh => match a { - Ok(k) => match k { - Ok(_) => {} - Err(e) => { - error!("{e:?}"); - } - }, - Err(e) => { - error!("{e:?}"); - } - }, - _b = tokio::time::sleep(Duration::from_millis(1000)).fuse() => { - jhs.push_back(jh); - info!("waiting for {} connections", jhs.len()); - } - }; - } - Ok(()) - } - - pub async fn add_channel_to_addr( - &self, - addr: SocketAddr, - channel_name: String, - ingest_commons: Arc, - ) -> Result<(), Error> { - let g = self.ca_conn_ress.lock().await; - match g.get(&addr) { - Some(ca_conn) => { - //info!("try to add to existing... {addr} {channel_name}"); - let (cmd, rx) = ConnCommand::channel_add(channel_name); - ca_conn.sender.send(cmd).await.err_conv()?; - let a = rx.recv().await.err_conv()?; - if a { - Ok(()) - } else { - Err(Error::with_msg_no_trace(format!("channel add failed"))) - } - } - None => { - //info!("create new {addr} {channel_name}"); - drop(g); - let addr = if let SocketAddr::V4(x) = addr { - x - } else { - return Err(Error::with_msg_no_trace(format!("only ipv4 supported for IOC"))); - }; - // TODO use parameters: - self.create_ca_conn( - addr, - ingest_commons.local_epics_hostname.clone(), - 512, - 200, - ingest_commons.insert_item_queue.sender().await, - ingest_commons.data_store.clone(), - ingest_commons.clone(), - vec![channel_name], - ) - .await?; - Ok(()) - } - } - } - - pub async fn has_addr(&self, addr: &SocketAddr) -> bool { - // TODO only used to check on add-channel whether we want to add channel to conn, or create new conn. - // TODO must do that atomic. - self.ca_conn_ress.lock().await.contains_key(addr) - } - - pub async fn addr_nth_mod(&self, n: usize) -> Option { - let g = self.ca_conn_ress.lock().await; - let u = g.len(); - let n = n % u; - g.keys().take(n).last().map(Clone::clone) - } -} - pub struct IngestCommons { pub pgconf: Arc, pub local_epics_hostname: String, @@ -620,6 +375,7 @@ pub struct IngestCommons { pub insert_frac: Arc, pub insert_ivl_min: Arc, pub extra_inserts_conf: Mutex, + pub store_workers_rate: AtomicU64, pub ca_conn_set: CaConnSet, } @@ -714,33 +470,6 @@ async fn query_addr_multiple(pg_client: &PgClient) -> Result<(), Error> { Ok(()) } -async fn metrics_agg_task( - ingest_commons: Arc, - local_stats: Arc, - store_stats: Arc, -) -> Result<(), Error> { - let mut agg_last = CaConnStatsAgg::new(); - loop { - tokio::time::sleep(Duration::from_millis(671)).await; - let agg = CaConnStatsAgg::new(); - agg.push(&local_stats); - agg.push(&store_stats); - { - let conn_stats_guard = ingest_commons.ca_conn_set.ca_conn_ress.lock().await; - for (_, g) in conn_stats_guard.iter() { - agg.push(&g.stats); - } - } - let mut m = METRICS.lock().unwrap(); - *m = Some(agg.clone()); - if false { - let diff = CaConnStatsAggDiff::diff_from(&agg_last, &agg); - info!("{}", diff.display()); - } - agg_last = agg; - } -} - pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { crate::linuxhelper::set_signal_handler()?; let insert_frac = Arc::new(AtomicU64::new(1000)); @@ -791,6 +520,20 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let data_store = Arc::new(DataStore::new(&scyconf, pg_client.clone()).await?); let insert_item_queue = CommonInsertItemQueue::new(opts.insert_item_queue_cap); let insert_item_queue = Arc::new(insert_item_queue); + + let ingest_commons = IngestCommons { + pgconf: Arc::new(pgconf.clone()), + local_epics_hostname: opts.local_epics_hostname.clone(), + insert_item_queue: insert_item_queue.clone(), + data_store: data_store.clone(), + insert_ivl_min: insert_ivl_min.clone(), + insert_frac: insert_frac.clone(), + extra_inserts_conf, + store_workers_rate: AtomicU64::new(opts.store_workers_rate), + ca_conn_set: CaConnSet::new(), + }; + let ingest_commons = Arc::new(ingest_commons); + // TODO use a new stats type: let store_stats = Arc::new(CaConnStats::new()); let jh_insert_workers = spawn_scylla_insert_workers( @@ -799,23 +542,12 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { opts.insert_worker_count, insert_item_queue.clone(), insert_frac.clone(), + ingest_commons.clone(), pg_client.clone(), store_stats.clone(), ) .await?; - let ingest_commons = IngestCommons { - pgconf: Arc::new(pgconf.clone()), - local_epics_hostname: opts.local_epics_hostname.clone(), - insert_item_queue: insert_item_queue.clone(), - data_store: data_store.clone(), - insert_ivl_min: insert_ivl_min.clone(), - insert_frac, - extra_inserts_conf, - ca_conn_set: CaConnSet::new(), - }; - let ingest_commons = Arc::new(ingest_commons); - if true { tokio::spawn(crate::metrics::start_metrics_service( opts.api_bind.clone(), @@ -858,18 +590,24 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { if SIGINT.load(Ordering::Acquire) != 0 { break; } - let addr = ingest_commons.ca_conn_set.addr_nth_mod(iper).await; - if let Some(addr) = addr { - fn cmdgen() -> (ConnCommand, async_channel::Receiver) { - ConnCommand::check_channels_alive() + // TODO remove magic number, make adaptive: + if ingest_commons.insert_item_queue.receiver().len() < 10000 { + let addr = ingest_commons.ca_conn_set.addr_nth_mod(iper).await; + if let Some(addr) = addr { + //info!("channel info for addr {addr}"); + fn cmdgen() -> (ConnCommand, async_channel::Receiver) { + ConnCommand::check_channels_alive() + } + // TODO race between getting nth address and command send, so ignore error so far. + let _res = ingest_commons.ca_conn_set.send_command_to_addr(&addr, cmdgen).await; + let cmdgen = || ConnCommand::save_conn_info(); + // TODO race between getting nth address and command send, so ignore error so far. + let _res = ingest_commons.ca_conn_set.send_command_to_addr(&addr, cmdgen).await; + } else { + //info!("nothing to save iper {iper}"); } - // TODO race between getting nth address and command send, so ignore error so far. - let _res = ingest_commons.ca_conn_set.send_command_to_addr(&addr, cmdgen).await; - let cmdgen = || ConnCommand::save_conn_info(); - // TODO race between getting nth address and command send, so ignore error so far. - let _res = ingest_commons.ca_conn_set.send_command_to_addr(&addr, cmdgen).await; + iper += 1; } - iper += 1; tokio::time::sleep(Duration::from_millis(10)).await; } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index c2b44ce..5266365 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -249,7 +249,7 @@ impl SubidStore { fn info_store_msp_from_time(ts: SystemTime) -> u32 { let dt = ts.duration_since(SystemTime::UNIX_EPOCH).unwrap_or(Duration::ZERO); - (dt.as_secs() / MIN * MIN / SEC) as u32 + (dt.as_secs() / 60 * 60) as u32 } #[derive(Debug)] @@ -635,7 +635,7 @@ impl CaConn { self.insert_item_queue.push_back(item); } else { if warn_max < 10 { - warn!("no series for cid {:?}", st.cid); + debug!("no series for cid {:?}", st.cid); warn_max += 1; } } diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs new file mode 100644 index 0000000..2d8344d --- /dev/null +++ b/netfetch/src/ca/connset.rs @@ -0,0 +1,293 @@ +use super::conn::ConnCommand; +use super::store::DataStore; +use super::IngestCommons; +use crate::ca::conn::CaConn; +use crate::errconv::ErrConv; +use crate::rt::{JoinHandle, TokMx}; +use crate::store::CommonInsertItemQueueSender; +use async_channel::Sender; +use err::Error; +use futures_util::{FutureExt, StreamExt}; +use netpod::log::*; +use stats::CaConnStats; +use std::collections::{BTreeMap, VecDeque}; +use std::net::{SocketAddr, SocketAddrV4}; +use std::sync::Arc; +use std::time::Duration; + +pub struct CommandQueueSet { + queues: TokMx>>, +} + +impl CommandQueueSet { + pub fn new() -> Self { + Self { + queues: TokMx::new(BTreeMap::>::new()), + } + } + + pub async fn queues(&self) -> &TokMx>> { + &self.queues + } + + pub async fn queues_locked(&self) -> tokio::sync::MutexGuard>> { + let mut g = self.queues.lock().await; + let mut rm = Vec::new(); + for (k, v) in g.iter() { + if v.is_closed() { + rm.push(*k); + } + } + for x in rm { + g.remove(&x); + } + g + } +} + +pub struct CaConnRess { + sender: Sender, + stats: Arc, + jh: JoinHandle>, +} + +impl CaConnRess { + pub fn stats(&self) -> &Arc { + &self.stats + } +} + +// TODO +// Resources belonging to the same CaConn also belong together here. +// Only add or remove them from the set at once. +// That means, they should go together. +// Does not hold the actual CaConn, because that struct is in a task. +// Always create the CaConn via a common code path which also takes care +// to add it to the correct list. +// There, make spawning part of this function? +pub struct CaConnSet { + ca_conn_ress: TokMx>, +} + +impl CaConnSet { + pub fn new() -> Self { + Self { + ca_conn_ress: Default::default(), + } + } + + pub fn ca_conn_ress(&self) -> &TokMx> { + &self.ca_conn_ress + } + + pub async fn create_ca_conn( + &self, + addr: SocketAddrV4, + local_epics_hostname: String, + array_truncate: usize, + insert_queue_max: usize, + insert_item_queue_sender: CommonInsertItemQueueSender, + data_store: Arc, + ingest_commons: Arc, + with_channels: Vec, + ) -> Result<(), Error> { + info!("create new CaConn {:?}", addr); + let addr2 = SocketAddr::V4(addr.clone()); + let mut conn = CaConn::new( + addr, + local_epics_hostname, + data_store.clone(), + insert_item_queue_sender, + array_truncate, + insert_queue_max, + ingest_commons, + ); + for ch in with_channels { + conn.channel_add(ch); + } + let conn = conn; + let conn_tx = conn.conn_command_tx(); + let conn_stats = conn.stats(); + let conn_fut = async move { + let stats = conn.stats(); + let mut conn = conn; + while let Some(item) = conn.next().await { + match item { + Ok(_) => { + stats.conn_item_count_inc(); + } + Err(e) => { + error!("CaConn gives error: {e:?}"); + break; + } + } + } + Ok::<_, Error>(()) + }; + let jh = tokio::spawn(conn_fut); + let ca_conn_ress = CaConnRess { + sender: conn_tx, + stats: conn_stats, + jh, + }; + self.ca_conn_ress.lock().await.insert(addr2, ca_conn_ress); + Ok(()) + } + + pub async fn send_command_to_all(&self, cmdgen: F) -> Result, Error> + where + F: Fn() -> (ConnCommand, async_channel::Receiver), + { + //let it = self.ca_conn_ress.iter().map(|x| x); + //Self::send_command_inner(it, move || cmd.clone()); + let mut rxs = Vec::new(); + for (_addr, ress) in &*self.ca_conn_ress.lock().await { + let (cmd, rx) = cmdgen(); + match ress.sender.send(cmd).await { + Ok(()) => { + rxs.push(rx); + } + Err(e) => { + error!("can not send command {e:?}"); + } + } + } + let mut res = Vec::new(); + for rx in rxs { + let x = rx.recv().await?; + res.push(x); + } + Ok(res) + } + + pub async fn send_command_to_addr(&self, addr: &SocketAddr, cmdgen: F) -> Result + where + F: Fn() -> (ConnCommand, async_channel::Receiver), + { + if let Some(ress) = self.ca_conn_ress.lock().await.get(addr) { + let (cmd, rx) = cmdgen(); + ress.sender.send(cmd).await.err_conv()?; + let ret = rx.recv().await.err_conv()?; + Ok(ret) + } else { + Err(Error::with_msg_no_trace(format!("addr not found"))) + } + } + + #[allow(unused)] + async fn send_command_inner<'a, IT, F, R>(it: &mut IT, cmdgen: F) -> Vec> + where + IT: Iterator)>, + F: Fn() -> (ConnCommand, async_channel::Receiver), + { + let mut rxs = Vec::new(); + for (_, tx) in it { + let (cmd, rx) = cmdgen(); + match tx.send(cmd).await { + Ok(()) => { + rxs.push(rx); + } + Err(e) => { + error!("can not send command {e:?}"); + } + } + } + rxs + } + + pub async fn send_stop(&self) -> Result<(), Error> { + self.send_command_to_all(|| ConnCommand::shutdown()).await?; + Ok(()) + } + + pub async fn wait_stopped(&self) -> Result<(), Error> { + let mut g = self.ca_conn_ress.lock().await; + let mm = std::mem::replace(&mut *g, BTreeMap::new()); + let mut jhs: VecDeque<_> = VecDeque::new(); + for t in mm { + jhs.push_back(t.1.jh.fuse()); + } + loop { + let mut jh = if let Some(x) = jhs.pop_front() { + x + } else { + break; + }; + futures_util::select! { + a = jh => match a { + Ok(k) => match k { + Ok(_) => {} + Err(e) => { + error!("{e:?}"); + } + }, + Err(e) => { + error!("{e:?}"); + } + }, + _b = crate::rt::sleep(Duration::from_millis(1000)).fuse() => { + jhs.push_back(jh); + info!("waiting for {} connections", jhs.len()); + } + }; + } + Ok(()) + } + + pub async fn add_channel_to_addr( + &self, + addr: SocketAddr, + channel_name: String, + ingest_commons: Arc, + ) -> Result<(), Error> { + let g = self.ca_conn_ress.lock().await; + match g.get(&addr) { + Some(ca_conn) => { + //info!("try to add to existing... {addr} {channel_name}"); + let (cmd, rx) = ConnCommand::channel_add(channel_name); + ca_conn.sender.send(cmd).await.err_conv()?; + let a = rx.recv().await.err_conv()?; + if a { + Ok(()) + } else { + Err(Error::with_msg_no_trace(format!("channel add failed"))) + } + } + None => { + //info!("create new {addr} {channel_name}"); + drop(g); + let addr = if let SocketAddr::V4(x) = addr { + x + } else { + return Err(Error::with_msg_no_trace(format!("only ipv4 supported for IOC"))); + }; + // TODO use parameters: + self.create_ca_conn( + addr, + ingest_commons.local_epics_hostname.clone(), + 512, + 200, + ingest_commons.insert_item_queue.sender().await, + ingest_commons.data_store.clone(), + ingest_commons.clone(), + vec![channel_name], + ) + .await?; + Ok(()) + } + } + } + + pub async fn has_addr(&self, addr: &SocketAddr) -> bool { + // TODO only used to check on add-channel whether we want to add channel to conn, or create new conn. + // TODO must do that atomic. + self.ca_conn_ress.lock().await.contains_key(addr) + } + + pub async fn addr_nth_mod(&self, n: usize) -> Option { + let g = self.ca_conn_ress.lock().await; + let u = g.len(); + let n = n % u; + g.keys().take(n).last().map(Clone::clone) + } +} diff --git a/netfetch/src/ca/store.rs b/netfetch/src/ca/store.rs index 1663905..06a3b2e 100644 --- a/netfetch/src/ca/store.rs +++ b/netfetch/src/ca/store.rs @@ -171,7 +171,7 @@ impl DataStore { .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_channel_status_by_ts_msp = Arc::new(q); let q = scy - .prepare("insert into channel_ping (ts_msp, series, ivl, interest, evsize) values (?, ?, ?, ?, ?)") + .prepare("insert into channel_ping (part, ts_msp, series, ivl, interest, evsize) values (?, ?, ?, ?, ?, ?)") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let qu_insert_channel_ping = Arc::new(q); diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index ad880eb..2db0729 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -1,39 +1,20 @@ use crate::ca::conn::ConnCommand; -use crate::ca::{ExtraInsertsConf, IngestCommons}; +use crate::ca::{ExtraInsertsConf, IngestCommons, METRICS}; use axum::extract::Query; use err::Error; use http::request::Parts; use log::*; +use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff}; use std::collections::HashMap; use std::net::{SocketAddr, SocketAddrV4}; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::time::Duration; async fn get_empty() -> String { String::new() } -#[allow(unused)] -async fn send_command<'a, IT, F, R>(it: &mut IT, cmdgen: F) -> Vec> -where - IT: Iterator)>, - F: Fn() -> (ConnCommand, async_channel::Receiver), -{ - let mut rxs = Vec::new(); - for (_, tx) in it { - let (cmd, rx) = cmdgen(); - match tx.send(cmd).await { - Ok(()) => { - rxs.push(rx); - } - Err(e) => { - error!("can not send command {e:?}"); - } - } - } - rxs -} - async fn find_channel( params: HashMap, ingest_commons: Arc, @@ -276,3 +257,34 @@ pub async fn start_metrics_service(bind_to: String, ingest_commons: Arc, + local_stats: Arc, + store_stats: Arc, +) -> Result<(), Error> { + let mut agg_last = CaConnStatsAgg::new(); + loop { + tokio::time::sleep(Duration::from_millis(671)).await; + let agg = CaConnStatsAgg::new(); + agg.push(&local_stats); + agg.push(&store_stats); + { + let conn_stats_guard = ingest_commons.ca_conn_set.ca_conn_ress().lock().await; + for (_, g) in conn_stats_guard.iter() { + agg.push(g.stats()); + } + } + { + let val = ingest_commons.insert_item_queue.receiver().len() as u64; + agg.store_worker_recv_queue_len.store(val, Ordering::Release); + } + let mut m = METRICS.lock().unwrap(); + *m = Some(agg.clone()); + if false { + let diff = CaConnStatsAggDiff::diff_from(&agg_last, &agg); + info!("{}", diff.display()); + } + agg_last = agg; + } +} diff --git a/netfetch/src/netfetch.rs b/netfetch/src/netfetch.rs index ed0b429..d883690 100644 --- a/netfetch/src/netfetch.rs +++ b/netfetch/src/netfetch.rs @@ -5,6 +5,7 @@ pub mod errconv; pub mod linuxhelper; pub mod metrics; pub mod netbuf; +pub mod rt; pub mod series; pub mod store; #[cfg(test)] diff --git a/netfetch/src/rt.rs b/netfetch/src/rt.rs new file mode 100644 index 0000000..df3ff9e --- /dev/null +++ b/netfetch/src/rt.rs @@ -0,0 +1,3 @@ +pub use tokio::sync::Mutex as TokMx; +pub use tokio::task::JoinHandle; +pub use tokio::time::sleep; diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index 0101e67..48b29e5 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -441,6 +441,9 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> { .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let scy = Arc::new(scy); + + error!("TODO redo the pulse mapping"); + err::todo(); let qu1 = scy .prepare("insert into pulse_pkey (a, pulse_a) values (?, ?)") .await diff --git a/stats/src/stats.rs b/stats/src/stats.rs index af7c236..e42bc7f 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -152,6 +152,7 @@ stats_proc::stats_struct!(( inserts_queue_push, inserts_queue_drop, channel_fast_item_drop, + store_worker_recv_queue_len, store_worker_item_recv, // TODO rename to make clear that this drop is voluntary because of user config choice: store_worker_item_drop, @@ -160,6 +161,11 @@ stats_proc::stats_struct!(( store_worker_insert_timeout, store_worker_insert_unavailable, store_worker_insert_error, + connection_status_insert_done, + channel_status_insert_done, + channel_info_insert_done, + ivl_insert_done, + mute_insert_done, caconn_poll_count, caconn_loop1_count, caconn_loop2_count,