diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 6535264..98df7e3 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -243,7 +243,7 @@ impl Daemon { let rx = inserthook::active_channel_insert_hook(common_insert_item_queue.receiver().unwrap()); let common_insert_item_queue_2 = rx; - let conn_set_ctrl = CaConnSet::new(channel_info_query_tx.clone()); + let conn_set_ctrl = CaConnSet::start(channel_info_query_tx.clone()); let ingest_commons = IngestCommons { pgconf: Arc::new(opts.pgconf.clone()), diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 0f01ee8..7e17b76 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -1,6 +1,5 @@ pub mod conn; pub mod connset; -pub mod connset_consume; pub mod findioc; pub mod proto; pub mod search; diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 7402a06..25aedd7 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -1,7 +1,5 @@ use super::conn::CaConnEvent; use super::conn::ConnCommand; -use super::connset_consume::ConnSetConsume; -use super::SlowWarnable; use crate::ca::conn::CaConn; use crate::ca::conn::CaConnEventValue; use crate::ca::conn::CaConnOpts; @@ -15,9 +13,6 @@ use err::Error; use futures_util::FutureExt; use futures_util::StreamExt; use netpod::log::*; -use scywr::iteminsertqueue::CommonInsertItemQueue; -use scywr::iteminsertqueue::CommonInsertItemQueueSender; -use scywr::store::DataStore; use series::ChannelStatusSeriesId; use stats::CaConnStats; use std::collections::BTreeMap; @@ -28,8 +23,6 @@ use std::sync::Arc; use std::time::Duration; use std::time::Instant; use taskrun::tokio; -use tracing::info_span; -use tracing::Instrument; #[derive(Debug, PartialEq, Eq)] pub struct CmdId(SocketAddrV4, usize); @@ -61,9 +54,15 @@ pub enum ConnSetCmd { Shutdown, } +#[derive(Debug)] +pub enum CaConnSetEvent { + ConnSetCmd(ConnSetCmd), + CaConnEvent((SocketAddr, CaConnEvent)), +} + #[derive(Clone)] pub struct CaConnSetCtrl { - cmd_tx: Sender, + tx: Sender, } impl CaConnSetCtrl { @@ -83,46 +82,41 @@ impl CaConnSetCtrl { local_epics_hostname, }; let cmd = ConnSetCmd::ChannelAdd(cmd); - self.cmd_tx.send(cmd).await?; + self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?; Ok(()) } } pub struct CaConnSet { - cmd_rx: Receiver, ca_conn_ress: BTreeMap, - conn_item_tx: Sender<(SocketAddr, CaConnEvent)>, + connset_tx: Sender, + connset_rx: Receiver, channel_info_query_tx: Sender, - conn_evs_collect_jh: JoinHandle>, shutdown: bool, } impl CaConnSet { - pub fn new(channel_info_query_tx: Sender) -> CaConnSetCtrl { - let (cmd_tx, cmd_rx) = async_channel::bounded(32); - let (conn_item_tx, conn_item_rx) = async_channel::bounded(10000); - let (consume_jh,) = ConnSetConsume::new(conn_item_rx); - // TODO where to use this? - // TODO receive and use CaConn items, as well as receive and use ConnSetCmd - err::todo(); + pub fn start(channel_info_query_tx: Sender) -> CaConnSetCtrl { + let (connset_tx, connset_rx) = async_channel::bounded(10000); let connset = Self { - cmd_rx, ca_conn_ress: BTreeMap::new(), - conn_item_tx, + connset_tx: connset_tx.clone(), + connset_rx, channel_info_query_tx, - conn_evs_collect_jh: consume_jh, shutdown: false, }; - CaConnSetCtrl { cmd_tx } + // TODO use jh + let jh = tokio::spawn(CaConnSet::run(connset)); + CaConnSetCtrl { tx: connset_tx } } - async fn run(mut self) -> Result<(), Error> { + async fn run(mut this: CaConnSet) -> Result<(), Error> { loop { - let x = self.cmd_rx.recv().await; + let x = this.connset_rx.recv().await; match x { - Ok(ev) => self.handle_event(ev).await?, + Ok(ev) => this.handle_event(ev).await?, Err(_) => { - if self.shutdown { + if this.shutdown { // all fine break Ok(()); } else { @@ -133,14 +127,16 @@ impl CaConnSet { } } - async fn handle_event(&mut self, ev: ConnSetCmd) -> Result<(), Error> { - use ConnSetCmd::*; + async fn handle_event(&mut self, ev: CaConnSetEvent) -> Result<(), Error> { match ev { - ChannelAdd(x) => self.add_channel_to_addr(x).await, - Shutdown => { - self.shutdown = true; - Ok(()) - } + CaConnSetEvent::ConnSetCmd(cmd) => match cmd { + ConnSetCmd::ChannelAdd(x) => self.add_channel_to_addr(x).await, + ConnSetCmd::Shutdown => { + self.shutdown = true; + Ok(()) + } + }, + CaConnSetEvent::CaConnEvent((addr, ev)) => todo!(), } } @@ -174,7 +170,7 @@ impl CaConnSet { ); let conn_tx = conn.conn_command_tx(); let conn_stats = conn.stats(); - let conn_item_tx = self.conn_item_tx.clone(); + let conn_item_tx = self.connset_tx.clone(); let jh = tokio::spawn(Self::ca_conn_item_merge(conn, conn_item_tx, addr_v4)); let ca_conn_res = CaConnRes { sender: conn_tx, @@ -186,7 +182,7 @@ impl CaConnSet { async fn ca_conn_item_merge( conn: CaConn, - conn_item_tx: Sender<(SocketAddr, CaConnEvent)>, + conn_item_tx: Sender, addr: SocketAddrV4, ) -> Result<(), Error> { debug!("ca_conn_consumer begin {}", addr); @@ -197,7 +193,9 @@ impl CaConnSet { match item { Ok(item) => { stats.conn_item_count_inc(); - conn_item_tx.send((SocketAddr::V4(addr), item)).await?; + conn_item_tx + .send(CaConnSetEvent::CaConnEvent((SocketAddr::V4(addr), item))) + .await?; } Err(e) => { error!("CaConn gives error: {e:?}"); @@ -207,13 +205,13 @@ impl CaConnSet { } debug!("ca_conn_consumer ended {}", addr); conn_item_tx - .send(( + .send(CaConnSetEvent::CaConnEvent(( SocketAddr::V4(addr), CaConnEvent { ts: Instant::now(), value: CaConnEventValue::EndOfStream, }, - )) + ))) .await?; debug!("ca_conn_consumer signaled {}", addr); ret diff --git a/netfetch/src/ca/connset_consume.rs b/netfetch/src/ca/connset_consume.rs deleted file mode 100644 index da406b0..0000000 --- a/netfetch/src/ca/connset_consume.rs +++ /dev/null @@ -1,41 +0,0 @@ -use crate::ca::conn::CaConnEvent; -use async_channel::Receiver; -use async_channel::Sender; -use err::thiserror; -use log::*; -use std::net::SocketAddr; -use taskrun::spawn; -use taskrun::tokio::task::JoinHandle; -use thiserror::Error; - -#[derive(Debug, Error)] -pub enum Error {} - -pub struct ConnSetConsumeInp(Receiver); - -pub struct ConnSetConsume { - inp: Receiver<(SocketAddr, CaConnEvent)>, -} - -impl ConnSetConsume { - pub fn new(inp: Receiver<(SocketAddr, CaConnEvent)>) -> (JoinHandle>,) { - let ret = Self { inp }; - let jh = spawn(ret.run()); - (jh,) - } - - fn handle_event(&mut self, addr: SocketAddr, ev: CaConnEvent) {} - - async fn run(mut self) -> Result<(), Error> { - loop { - match self.inp.recv().await { - Ok((addr, item)) => self.handle_event(addr, item), - Err(e) => { - error!("{e}"); - break; - } - } - } - Ok(()) - } -}