diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index d482b90..158ebb4 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -8,11 +8,10 @@ use self::store::DataStore; use crate::ca::conn::ConnCommand; use crate::errconv::ErrConv; use crate::linuxhelper::local_hostname; -use crate::store::{CommonInsertItemQueue, QueryItem}; +use crate::store::{CommonInsertItemQueue, CommonInsertItemQueueSender, QueryItem}; use async_channel::Sender; use conn::CaConn; use err::Error; -use futures_util::future::Fuse; use futures_util::stream::FuturesUnordered; use futures_util::{FutureExt, StreamExt}; use log::*; @@ -20,7 +19,7 @@ use netpod::{Database, ScyllaConfig}; use serde::{Deserialize, Serialize}; use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff}; use std::collections::{BTreeMap, VecDeque}; -use std::net::{IpAddr, Ipv4Addr, SocketAddrV4}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}; use std::path::PathBuf; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; @@ -339,6 +338,241 @@ impl CommandQueueSet { } } +struct CaConnRess { + sender: Sender, + stats: Arc, + jh: JoinHandle>, +} + +// TODO +// Resources belonging to the same CaConn also belong together here. +// Only add or remove them from the set at once. +// That means, they should go together. +// Does not hold the actual CaConn, because that struct is in a task. +// Always create the CaConn via a common code path which also takes care +// to add it to the correct list. +// There, make spawning part of this function? +pub struct CaConnSet { + ca_conn_ress: TokMx>, +} + +impl CaConnSet { + pub fn new() -> Self { + Self { + ca_conn_ress: Default::default(), + } + } + + pub async fn create_ca_conn( + &self, + addr: SocketAddrV4, + local_epics_hostname: String, + array_truncate: usize, + insert_queue_max: usize, + insert_item_queue_sender: CommonInsertItemQueueSender, + data_store: Arc, + ingest_commons: Arc, + with_channels: Vec, + ) -> Result<(), Error> { + info!("create new CaConn {:?}", addr); + let addr2 = SocketAddr::V4(addr.clone()); + let mut conn = CaConn::new( + addr, + local_epics_hostname, + data_store.clone(), + insert_item_queue_sender, + array_truncate, + insert_queue_max, + ingest_commons, + ); + for ch in with_channels { + conn.channel_add(ch); + } + let conn = conn; + let conn_tx = conn.conn_command_tx(); + let conn_stats = conn.stats(); + let conn_fut = async move { + let stats = conn.stats(); + let mut conn = conn; + while let Some(item) = conn.next().await { + match item { + Ok(_) => { + stats.conn_item_count_inc(); + } + Err(e) => { + error!("CaConn gives error: {e:?}"); + break; + } + } + } + Ok::<_, Error>(()) + }; + let jh = tokio::spawn(conn_fut); + let ca_conn_ress = CaConnRess { + sender: conn_tx, + stats: conn_stats, + jh, + }; + self.ca_conn_ress.lock().await.insert(addr2, ca_conn_ress); + Ok(()) + } + + pub async fn send_command_to_all(&self, cmdgen: F) -> Result, Error> + where + F: Fn() -> (ConnCommand, async_channel::Receiver), + { + //let it = self.ca_conn_ress.iter().map(|x| x); + //Self::send_command_inner(it, move || cmd.clone()); + let mut rxs = Vec::new(); + for (_addr, ress) in &*self.ca_conn_ress.lock().await { + let (cmd, rx) = cmdgen(); + match ress.sender.send(cmd).await { + Ok(()) => { + rxs.push(rx); + } + Err(e) => { + error!("can not send command {e:?}"); + } + } + } + let mut res = Vec::new(); + for rx in rxs { + let x = rx.recv().await?; + res.push(x); + } + Ok(res) + } + + pub async fn send_command_to_addr(&self, addr: &SocketAddr, cmdgen: F) -> Result + where + F: Fn() -> (ConnCommand, async_channel::Receiver), + { + if let Some(ress) = self.ca_conn_ress.lock().await.get(addr) { + let (cmd, rx) = cmdgen(); + ress.sender.send(cmd).await.err_conv()?; + let ret = rx.recv().await.err_conv()?; + Ok(ret) + } else { + Err(Error::with_msg_no_trace(format!("addr not found"))) + } + } + + #[allow(unused)] + async fn send_command_inner<'a, IT, F, R>(it: &mut IT, cmdgen: F) -> Vec> + where + IT: Iterator)>, + F: Fn() -> (ConnCommand, async_channel::Receiver), + { + let mut rxs = Vec::new(); + for (_, tx) in it { + let (cmd, rx) = cmdgen(); + match tx.send(cmd).await { + Ok(()) => { + rxs.push(rx); + } + Err(e) => { + error!("can not send command {e:?}"); + } + } + } + rxs + } + + pub async fn send_stop(&self) -> Result<(), Error> { + self.send_command_to_all(|| ConnCommand::shutdown()).await?; + Ok(()) + } + + pub async fn wait_stopped(&self) -> Result<(), Error> { + let mut g = self.ca_conn_ress.lock().await; + let mm = std::mem::replace(&mut *g, BTreeMap::new()); + let mut jhs: VecDeque<_> = VecDeque::new(); + for t in mm { + jhs.push_back(t.1.jh.fuse()); + } + loop { + let mut jh = if let Some(x) = jhs.pop_front() { + x + } else { + break; + }; + futures_util::select! { + a = jh => match a { + Ok(k) => match k { + Ok(_) => {} + Err(e) => { + error!("{e:?}"); + } + }, + Err(e) => { + error!("{e:?}"); + } + }, + _b = tokio::time::sleep(Duration::from_millis(1000)).fuse() => { + jhs.push_back(jh); + info!("waiting for {} connections", jhs.len()); + } + }; + } + err::todoval() + } + + pub async fn add_channel_to_addr( + &self, + addr: SocketAddr, + channel_name: String, + ingest_commons: Arc, + ) -> Result<(), Error> { + let g = self.ca_conn_ress.lock().await; + match g.get(&addr) { + Some(ca_conn) => { + let (cmd, rx) = ConnCommand::channel_add(channel_name); + ca_conn.sender.send(cmd).await.err_conv()?; + let a = rx.recv().await.err_conv()?; + if a { + Ok(()) + } else { + Err(Error::with_msg_no_trace(format!("channel add failed"))) + } + } + None => { + let addr = if let SocketAddr::V4(x) = addr { + x + } else { + return Err(Error::with_msg_no_trace(format!("only ipv4 supported for IOC"))); + }; + // TODO use parameters: + self.create_ca_conn( + addr, + ingest_commons.local_epics_hostname.clone(), + 512, + 200, + ingest_commons.insert_item_queue.sender(), + ingest_commons.data_store.clone(), + ingest_commons.clone(), + vec![channel_name], + ) + .await?; + Ok(()) + } + } + } + + pub async fn has_addr(&self, addr: &SocketAddr) -> bool { + // TODO only used to check on add-channel whether we want to add channel to conn, or create new conn. + // TODO must do that atomic. + self.ca_conn_ress.lock().await.contains_key(addr) + } + + pub async fn add_channel_or_create_conn() -> Result<(), Error> { + // TODO fix race: + // Must not drop mutex in-between calls. + // Pass mutex on? + + Ok(()) + } +} + pub struct IngestCommons { pub pgconf: Arc, pub local_epics_hostname: String, @@ -347,8 +581,7 @@ pub struct IngestCommons { pub insert_frac: Arc, pub insert_ivl_min: Arc, pub extra_inserts_conf: Mutex, - pub conn_stats: Arc>>>, - pub command_queue_set: Arc, + pub ca_conn_set: CaConnSet, } pub async fn find_channel_addr( @@ -365,7 +598,10 @@ pub async fn find_channel_addr( .await .unwrap(); // TODO allow clean shutdown on ctrl-c and join the pg_conn in the end: - tokio::spawn(pg_conn); + tokio::spawn(async { + pg_conn.await.unwrap(); + info!("drop pg conn after find_channel_addr"); + }); let pg_client = Arc::new(pg_client); let qu_find_addr = pg_client .prepare( @@ -396,54 +632,74 @@ pub async fn find_channel_addr( } } -pub async fn create_ca_conn( - addr: SocketAddrV4, - local_epics_hostname: String, - array_truncate: usize, - insert_queue_max: usize, - insert_item_queue: Arc, - data_store: Arc, - insert_ivl_min: Arc, - conn_stats: Arc>>>, - command_queue_set: Arc, - ingest_commons: Arc, -) -> Result>, Error> { - info!("create new CaConn {:?}", addr); - let data_store = data_store.clone(); - let conn = CaConn::new( - addr, - local_epics_hostname, - data_store.clone(), - insert_item_queue.sender(), - array_truncate, - insert_queue_max, - insert_ivl_min.clone(), - ingest_commons.clone(), - ); - conn_stats.lock().await.push(conn.stats()); - let stats2 = conn.stats(); - let conn_command_tx = conn.conn_command_tx(); - { - //command_queue_set.queues().lock().await.insert(addr, conn_command_tx); - command_queue_set.queues_locked().await.insert(addr, conn_command_tx); +#[allow(unused)] +async fn query_addr_multiple(pg_client: &PgClient) -> Result<(), Error> { + let backend: &String = err::todoval(); + // TODO factor the find loop into a separate Stream. + let qu_find_addr = pg_client + .prepare("with q1 as (select t1.facility, t1.channel, t1.addr from ioc_by_channel t1 where t1.facility = $1 and t1.channel in ($2, $3, $4, $5, $6, $7, $8, $9) and t1.addr != '' order by t1.tsmod desc) select distinct on (q1.facility, q1.channel) q1.facility, q1.channel, q1.addr from q1") + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let mut chns_todo: &[String] = err::todoval(); + let mut chstmp = ["__NONE__"; 8]; + for (s1, s2) in chns_todo.iter().zip(chstmp.iter_mut()) { + *s2 = s1; } - let conn_block = async move { - let mut conn = conn; - while let Some(item) = conn.next().await { - match item { - Ok(_) => { - stats2.conn_item_count_inc(); - } + chns_todo = &chns_todo[chstmp.len().min(chns_todo.len())..]; + let rows = pg_client + .query( + &qu_find_addr, + &[ + &backend, &chstmp[0], &chstmp[1], &chstmp[2], &chstmp[3], &chstmp[4], &chstmp[5], &chstmp[6], + &chstmp[7], + ], + ) + .await + .map_err(|e| Error::with_msg_no_trace(format!("pg lookup error: {e:?}")))?; + for row in rows { + let ch: &str = row.get(1); + let addr: &str = row.get(2); + if addr == "" { + // TODO the address was searched before but could not be found. + } else { + let addr: SocketAddrV4 = match addr.parse() { + Ok(k) => k, Err(e) => { - error!("CaConn gives error: {e:?}"); - break; + error!("can not parse {addr:?} for channel {ch:?} {e:?}"); + continue; } + }; + let _ = addr; + } + } + Ok(()) +} + +async fn metrics_agg_task( + ingest_commons: Arc, + local_stats: Arc, + store_stats: Arc, +) -> Result<(), Error> { + let mut agg_last = CaConnStatsAgg::new(); + loop { + tokio::time::sleep(Duration::from_millis(671)).await; + let agg = CaConnStatsAgg::new(); + agg.push(&local_stats); + agg.push(&store_stats); + { + let conn_stats_guard = ingest_commons.ca_conn_set.ca_conn_ress.lock().await; + for (_, g) in conn_stats_guard.iter() { + agg.push(&g.stats); } } - Ok::<_, Error>(()) - }; - let jh = tokio::spawn(conn_block); - Ok(jh) + let mut m = METRICS.lock().unwrap(); + *m = Some(agg.clone()); + if false { + let diff = CaConnStatsAggDiff::diff_from(&agg_last, &agg); + info!("{}", diff.display()); + } + agg_last = agg; + } } pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { @@ -473,13 +729,8 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { tokio::spawn(pg_conn); let pg_client = Arc::new(pg_client); - // TODO use new struct: + // TODO use a new type: let local_stats = Arc::new(CaConnStats::new()); - // TODO factor the find loop into a separate Stream. - let qu_find_addr = pg_client - .prepare("with q1 as (select t1.facility, t1.channel, t1.addr from ioc_by_channel t1 where t1.facility = $1 and t1.channel in ($2, $3, $4, $5, $6, $7, $8, $9) and t1.addr != '' order by t1.tsmod desc) select distinct on (q1.facility, q1.channel) q1.facility, q1.channel, q1.addr from q1") - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; // Fetch all addresses for all channels. let rows = pg_client @@ -501,7 +752,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let data_store = Arc::new(DataStore::new(&scyconf, pg_client.clone()).await?); let insert_item_queue = CommonInsertItemQueue::new(opts.insert_item_queue_cap); let insert_item_queue = Arc::new(insert_item_queue); - // TODO use a new stats struct + // TODO use a new stats type: let store_stats = Arc::new(CaConnStats::new()); let jh_insert_workers = spawn_scylla_insert_workers( opts.scyconf.clone(), @@ -514,20 +765,15 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { ) .await?; - let mut conn_jhs = vec![]; - let conn_stats: Arc>>> = Arc::new(TokMx::new(Vec::new())); - let command_queue_set = Arc::new(CommandQueueSet::new()); - let ingest_commons = IngestCommons { pgconf: Arc::new(pgconf.clone()), local_epics_hostname: opts.local_epics_hostname.clone(), insert_item_queue: insert_item_queue.clone(), data_store: data_store.clone(), insert_ivl_min: insert_ivl_min.clone(), - conn_stats: conn_stats.clone(), - command_queue_set: command_queue_set.clone(), insert_frac, extra_inserts_conf, + ca_conn_set: CaConnSet::new(), }; let ingest_commons = Arc::new(ingest_commons); @@ -538,159 +784,35 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { )); } - let metrics_agg_fut = { - let conn_stats = conn_stats.clone(); - let local_stats = local_stats.clone(); - async move { - let mut agg_last = CaConnStatsAgg::new(); - loop { - tokio::time::sleep(Duration::from_millis(671)).await; - let agg = CaConnStatsAgg::new(); - agg.push(&local_stats); - agg.push(&store_stats); - for g in conn_stats.lock().await.iter() { - agg.push(&g); - } - let mut m = METRICS.lock().unwrap(); - *m = Some(agg.clone()); - if false { - let diff = CaConnStatsAggDiff::diff_from(&agg_last, &agg); - info!("{}", diff.display()); - } - agg_last = agg; - if false { - break; - } - } - } - }; + let metrics_agg_fut = metrics_agg_task(ingest_commons.clone(), local_stats.clone(), store_stats.clone()); let metrics_agg_jh = tokio::spawn(metrics_agg_fut); let mut chns_todo = &opts.channels[..]; - let mut chstmp = ["__NONE__"; 8]; let mut ix = 0; - while chns_todo.len() > 0 && SIGINT.load(Ordering::Acquire) == 0 { - if false { - for (s1, s2) in chns_todo.iter().zip(chstmp.iter_mut()) { - *s2 = s1; - } - chns_todo = &chns_todo[chstmp.len().min(chns_todo.len())..]; - let rows = pg_client - .query( - &qu_find_addr, - &[ - &opts.backend, - &chstmp[0], - &chstmp[1], - &chstmp[2], - &chstmp[3], - &chstmp[4], - &chstmp[5], - &chstmp[6], - &chstmp[7], - ], - ) - .await - .map_err(|e| Error::with_msg_no_trace(format!("pg lookup error: {e:?}")))?; - for row in rows { - let ch: &str = row.get(1); - let addr: &str = row.get(2); - if addr == "" { - // TODO the address was searched before but could not be found. - } else { - let addr: SocketAddrV4 = match addr.parse() { - Ok(k) => k, - Err(e) => { - error!("can not parse {addr:?} for channel {ch:?} {e:?}"); - continue; - } - }; - let _ = addr; - } - } + for ch in chns_todo { + if SIGINT.load(Ordering::Acquire) != 0 { + break; } - if let Some(ch) = chns_todo.first() { - let ch = ch.clone(); - chns_todo = &chns_todo[1..]; - if let Some(addr) = phonebook.get(&ch) { - if !channels_by_host.contains_key(&addr) { - channels_by_host.insert(addr, vec![ch.to_string()]); - } else { - channels_by_host.get_mut(&addr).unwrap().push(ch.to_string()); - } - let create_new = { - let g = command_queue_set.queues_locked().await; - if let Some(tx) = g.get(&addr) { - let (cmd, rx) = ConnCommand::channel_add(ch.to_string()); - tx.send(cmd).await.unwrap(); - if !rx.recv().await.unwrap() { - error!("Could not add channel: {}", ch); - } - false - } else { - true - } - }; - if create_new { - info!("create new CaConn {:?} {:?}", addr, ch); - let data_store = data_store.clone(); - let conn = CaConn::new( - addr.clone(), - opts.local_epics_hostname.clone(), - data_store.clone(), - insert_item_queue.sender(), - opts.array_truncate, - opts.insert_queue_max, - insert_ivl_min.clone(), - ingest_commons.clone(), - ); - conn_stats.lock().await.push(conn.stats()); - let stats2 = conn.stats(); - let conn_command_tx = conn.conn_command_tx(); - let tx = conn_command_tx.clone(); - { - command_queue_set - .queues_locked() - .await - .insert(addr.clone(), conn_command_tx); - } - let conn_block = async move { - let mut conn = conn; - while let Some(item) = conn.next().await { - match item { - Ok(_) => { - stats2.conn_item_count_inc(); - } - Err(e) => { - error!("CaConn gives error: {e:?}"); - break; - } - } - } - Ok::<_, Error>(()) - }; - let jh = tokio::spawn(conn_block); - conn_jhs.push(jh); - { - let (cmd, rx) = ConnCommand::channel_add(ch.to_string()); - tx.send(cmd).await.unwrap(); - if !rx.recv().await.unwrap() { - error!("Could not add channel: {}", ch); - } - } - } - } - ix += 1; - if ix % 1000 == 0 { - info!("{} of {} {}", ix, opts.channels.len(), ch); + let ch = ch.to_string(); + chns_todo = &chns_todo[1..]; + if let Some(addr) = phonebook.get(&ch) { + if !channels_by_host.contains_key(&addr) { + channels_by_host.insert(addr, vec![ch.to_string()]); + } else { + channels_by_host.get_mut(&addr).unwrap().push(ch.to_string()); } + ingest_commons + .ca_conn_set + .add_channel_to_addr(SocketAddr::V4(addr.clone()), ch.clone(), ingest_commons.clone()) + .await?; + } + ix += 1; + if ix % 1000 == 0 { + info!("{} of {} {}", ix, opts.channels.len(), ch); } } info!("channels_by_host len {}", channels_by_host.len()); - let mut conn_jhs: VecDeque>>> = - conn_jhs.into_iter().map(|jh| jh.fuse()).collect(); - let mut sent_stop_commands = false; loop { if SIGINT.load(Ordering::Acquire) != 0 { if false { @@ -699,41 +821,13 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let rc = receiver.receiver_count(); info!("item queue senders {} receivers {}", sc, rc); } - if !sent_stop_commands { - sent_stop_commands = true; - info!("sending stop command"); - let queues = command_queue_set.queues_locked().await; - for q in queues.iter() { - let (cmd, _rx) = ConnCommand::shutdown(); - let _ = q.1.send(cmd).await; - } - } - } - let mut jh = if let Some(x) = conn_jhs.pop_front() { - x - } else { + info!("sending stop commands"); + ingest_commons.ca_conn_set.send_stop().await?; break; - }; - futures_util::select! { - a = jh => match a { - Ok(k) => match k { - Ok(_) => {} - Err(e) => { - error!("{e:?}"); - } - }, - Err(e) => { - error!("{e:?}"); - } - }, - _b = tokio::time::sleep(Duration::from_millis(1000)).fuse() => { - conn_jhs.push_back(jh); - if sent_stop_commands { - info!("waiting for {} connections", conn_jhs.len()); - } - } - }; + } + tokio::time::sleep(Duration::from_millis(400)).await; } + ingest_commons.ca_conn_set.wait_stopped().await?; info!("all connections done."); drop(ingest_commons); diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 5e0786d..974f26a 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -22,7 +22,7 @@ use std::collections::{BTreeMap, VecDeque}; use std::net::SocketAddrV4; use std::ops::ControlFlow; use std::pin::Pin; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::{Duration, Instant, SystemTime}; @@ -350,7 +350,7 @@ pub struct CaConn { array_truncate: usize, stats: Arc, insert_queue_max: usize, - insert_ivl_min: Arc, + insert_ivl_min_mus: u64, ts_channel_alive_check_last: Instant, conn_command_tx: async_channel::Sender, conn_command_rx: async_channel::Receiver, @@ -370,7 +370,6 @@ impl CaConn { insert_item_sender: CommonInsertItemQueueSender, array_truncate: usize, insert_queue_max: usize, - insert_ivl_min: Arc, ingest_commons: Arc, ) -> Self { let (cq_tx, cq_rx) = async_channel::bounded(32); @@ -395,7 +394,7 @@ impl CaConn { array_truncate, stats: Arc::new(CaConnStats::new()), insert_queue_max, - insert_ivl_min, + insert_ivl_min_mus: 1000 * 1000 * 6, ts_channel_alive_check_last: Instant::now(), conn_command_tx: cq_tx, conn_command_rx: cq_rx, @@ -529,8 +528,12 @@ impl CaConn { } pub fn channel_add(&mut self, channel: String) { + if self.cid_by_name.contains_key(&channel) { + return; + } let cid = self.cid_by_name(&channel); if self.channels.contains_key(&cid) { + error!("logic error"); } else { self.channels.insert(cid, ChannelState::Init); // TODO do not count, use separate queue for those channels. @@ -827,7 +830,7 @@ impl CaConn { ev: proto::EventAddRes, tsnow: Instant, item_queue: &mut VecDeque, - insert_ivl_min: Arc, + insert_ivl_min_mus: u64, stats: Arc, inserts_counter: &mut u64, extra_inserts_conf: &ExtraInsertsConf, @@ -836,8 +839,7 @@ impl CaConn { st.insert_item_ivl_ema.tick(tsnow); let em = st.insert_item_ivl_ema.ema(); let ema = em.ema(); - let ivl_min = insert_ivl_min.load(Ordering::Acquire); - let ivl_min = (ivl_min as f32) * 1e-6; + let ivl_min = (insert_ivl_min_mus as f32) * 1e-6; let dt = (ivl_min - ema).max(0.) / em.k(); st.insert_next_earliest = tsnow .checked_add(Duration::from_micros((dt * 1e6) as u64)) @@ -852,14 +854,14 @@ impl CaConn { } else { None }; - for &(m, l) in extra_inserts_conf.copies.iter() { + for (i, &(m, l)) in extra_inserts_conf.copies.iter().enumerate().rev() { if *inserts_counter % m == l { Self::event_add_insert( st, series.clone(), scalar_type.clone(), shape.clone(), - ts - 2, + ts - 1 - i as u64, ev.clone(), item_queue, ts_msp_last, @@ -953,7 +955,7 @@ impl CaConn { ev, tsnow, item_queue, - self.insert_ivl_min.clone(), + self.insert_ivl_min_mus, self.stats.clone(), inserts_counter, extra_inserts_conf, diff --git a/netfetch/src/errconv.rs b/netfetch/src/errconv.rs index 0ecb4a2..5fbfb94 100644 --- a/netfetch/src/errconv.rs +++ b/netfetch/src/errconv.rs @@ -1,3 +1,4 @@ +use async_channel::{RecvError, SendError}; use err::Error; use scylla::transport::errors::QueryError; use scylla::transport::query_result::{FirstRowError, RowsExpectedError}; @@ -6,6 +7,24 @@ pub trait ErrConv { fn err_conv(self) -> Result; } +impl ErrConv for Result> { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} + +impl ErrConv for Result { + fn err_conv(self) -> Result { + match self { + Ok(k) => Ok(k), + Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), + } + } +} + impl ErrConv for Result { fn err_conv(self) -> Result { match self { diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index f83e806..ad880eb 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -1,10 +1,11 @@ use crate::ca::conn::ConnCommand; use crate::ca::{ExtraInsertsConf, IngestCommons}; use axum::extract::Query; +use err::Error; use http::request::Parts; use log::*; use std::collections::HashMap; -use std::net::SocketAddrV4; +use std::net::{SocketAddr, SocketAddrV4}; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -12,6 +13,7 @@ async fn get_empty() -> String { String::new() } +#[allow(unused)] async fn send_command<'a, IT, F, R>(it: &mut IT, cmdgen: F) -> Vec> where IT: Iterator)>, @@ -37,86 +39,44 @@ async fn find_channel( ingest_commons: Arc, ) -> axum::Json)>> { let pattern = params.get("pattern").map_or(String::new(), |x| x.clone()).to_string(); - let g = ingest_commons.command_queue_set.queues_locked().await; - let mut it = g.iter(); - let rxs = send_command(&mut it, || ConnCommand::find_channel(pattern.clone())).await; - let mut res = Vec::new(); - for rx in rxs { - let item = rx.recv().await.unwrap(); - if item.1.len() > 0 { - let item = (item.0.to_string(), item.1); - res.push(item); - } - } + // TODO allow usage of `?` in handler: + let res = ingest_commons + .ca_conn_set + .send_command_to_all(|| ConnCommand::find_channel(pattern.clone())) + .await + .unwrap(); + let res = res.into_iter().map(|x| (x.0.to_string(), x.1)).collect(); axum::Json(res) } -async fn channel_add(params: HashMap, ingest_commons: Arc) -> String { +async fn channel_add_inner(params: HashMap, ingest_commons: Arc) -> Result<(), Error> { if let (Some(backend), Some(name)) = (params.get("backend"), params.get("name")) { - // TODO look up the address. match crate::ca::find_channel_addr(backend.into(), name.into(), &ingest_commons.pgconf).await { Ok(Some(addr)) => { - if ingest_commons - .command_queue_set - .queues_locked() - .await - .contains_key(&addr) - { - } else { - match crate::ca::create_ca_conn( - addr, - ingest_commons.local_epics_hostname.clone(), - 256, - 32, - ingest_commons.insert_item_queue.clone(), - ingest_commons.data_store.clone(), - ingest_commons.insert_ivl_min.clone(), - ingest_commons.conn_stats.clone(), - ingest_commons.command_queue_set.clone(), - ingest_commons.clone(), - ) - .await - { - Ok(_) => { - // TODO keep the join handle. - } - Err(_) => { - error!("can not create CaConn"); - } - } - } - if let Some(tx) = ingest_commons.command_queue_set.queues_locked().await.get(&addr) { - let (cmd, rx) = ConnCommand::channel_add(name.into()); - if let Err(_) = tx.send(cmd).await { - error!("can not send command"); - "false".into() - } else { - match rx.recv().await { - Ok(x) => { - if x { - "true".into() - } else { - "false".into() - } - } - Err(_) => "false".into(), - } - } - } else { - error!("Even after create, can not locate the connection."); - "false".into() - } + ingest_commons + .ca_conn_set + .add_channel_to_addr(SocketAddr::V4(addr), name.into(), ingest_commons.clone()) + .await?; + Ok(()) } _ => { error!("can not find addr for channel"); - "false".into() + Err(Error::with_msg_no_trace(format!("can not find addr for channel"))) } } } else { - "false".into() + Err(Error::with_msg_no_trace(format!("wrong parameters given"))) } } +async fn channel_add(params: HashMap, ingest_commons: Arc) -> axum::Json { + let ret = match channel_add_inner(params, ingest_commons).await { + Ok(_) => true, + Err(_) => false, + }; + axum::Json(ret) +} + async fn channel_remove( params: HashMap, ingest_commons: Arc, @@ -132,7 +92,7 @@ async fn channel_remove( } else { return Json(Value::Bool(false)); }; - let backend = if let Some(x) = params.get("backend") { + let _backend = if let Some(x) = params.get("backend") { x } else { return Json(Value::Bool(false)); @@ -142,87 +102,70 @@ async fn channel_remove( } else { return Json(Value::Bool(false)); }; - if let Some(tx) = ingest_commons.command_queue_set.queues_locked().await.get(&addr) { - // TODO any need to check the backend here? - let _ = backend; - let (cmd, rx) = ConnCommand::channel_remove(name.into()); - if let Err(_) = tx.send(cmd).await { - error!("can not send command"); + match ingest_commons + .ca_conn_set + .send_command_to_addr(&SocketAddr::V4(addr), || ConnCommand::channel_remove(name.into())) + .await + { + Ok(k) => Json(Value::Bool(k)), + Err(e) => { + error!("{e:?}"); Json(Value::Bool(false)) - } else { - match rx.recv().await { - Ok(x) => Json(Value::Bool(x)), - Err(_) => Json(Value::Bool(false)), - } } - } else { - Json(Value::Bool(false)) } } async fn channel_state(params: HashMap, ingest_commons: Arc) -> String { let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string(); - let g = ingest_commons.command_queue_set.queues_locked().await; - let mut rxs = Vec::new(); - for (_, tx) in g.iter() { - let (cmd, rx) = ConnCommand::channel_state(name.clone()); - match tx.send(cmd).await { - Ok(()) => { - rxs.push(rx); - } - Err(e) => { - error!("can not send command {e:?}"); - } + match ingest_commons + .ca_conn_set + .send_command_to_all(|| ConnCommand::channel_state(name.clone())) + .await + { + Ok(k) => { + let a: Vec<_> = k.into_iter().map(|(a, b)| (a.to_string(), b)).collect(); + serde_json::to_string(&a).unwrap() + } + Err(e) => { + error!("{e:?}"); + return format!("null"); } } - let mut res = Vec::new(); - for rx in rxs { - let item = rx.recv().await.unwrap(); - if let Some(st) = item.1 { - let item = (item.0.to_string(), st); - res.push(item); - } - } - serde_json::to_string(&res).unwrap() } async fn channel_states( _params: HashMap, ingest_commons: Arc, ) -> axum::Json> { - let g = ingest_commons.command_queue_set.queues_locked().await; - let mut rxs = Vec::new(); - for (_, tx) in g.iter() { - let (cmd, rx) = ConnCommand::channel_states_all(); - match tx.send(cmd).await { - Ok(()) => { - rxs.push(rx); - } - Err(e) => { - error!("can not send command {e:?}"); - } - } - } + let vals = ingest_commons + .ca_conn_set + .send_command_to_all(|| ConnCommand::channel_states_all()) + .await + .unwrap(); let mut res = Vec::new(); - for rx in rxs { - let item = rx.recv().await.unwrap(); - for h in item.1 { - res.push(h); + for h in vals { + for j in h.1 { + res.push(j); } } res.sort_unstable_by_key(|v| u32::MAX - v.interest_score as u32); - //let res: Vec<_> = res.into_iter().rev().take(10).collect(); + let res = if true { + res.into_iter().rev().take(10).collect() + } else { + res + }; axum::Json(res) } async fn extra_inserts_conf_set(v: ExtraInsertsConf, ingest_commons: Arc) -> axum::Json { + // TODO ingest_commons is the authorative value. Should have common function outside of this metrics which + // can update everything to a given value. *ingest_commons.extra_inserts_conf.lock().unwrap() = v.clone(); - let g = ingest_commons.command_queue_set.queues_locked().await; - let mut it = g.iter(); - let rxs = send_command(&mut it, || ConnCommand::extra_inserts_conf_set(v.clone())).await; - for rx in rxs { - let _item = rx.recv().await.unwrap(); - } + ingest_commons + .ca_conn_set + .send_command_to_all(|| ConnCommand::extra_inserts_conf_set(v.clone())) + .await + .unwrap(); axum::Json(true) }