From 3a23fa42753a715e7a41d18c035251e572ed8648 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 20 Jan 2023 06:24:49 +0100 Subject: [PATCH] Refactor --- daqingest/src/bin/daqingest.rs | 4 - daqingest/src/daemon.rs | 62 ++------- daqingest/src/opts.rs | 1 - netfetch/src/batchquery/series_by_channel.rs | 134 ++++++++++--------- netfetch/src/daemon_common.rs | 52 +++++++ netfetch/src/metrics.rs | 6 + netfetch/src/netfetch.rs | 1 + 7 files changed, 136 insertions(+), 124 deletions(-) create mode 100644 netfetch/src/daemon_common.rs diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 4948cc8..341d4de 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -28,10 +28,6 @@ pub fn main() -> Result<(), Error> { netfetch::ca::search::ca_search(conf, &channels).await? } ChannelAccess::CaIngest(k) => { - let (conf, channels) = parse_config(k.config.into()).await?; - netfetch::ca::ca_connect(conf, &channels).await? - } - ChannelAccess::CaIngestNew(k) => { let (conf, channels) = parse_config(k.config.into()).await?; daqingest::daemon::run(conf, channels).await? } diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index fb498d7..6769bf5 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -4,7 +4,6 @@ use err::Error; use futures_util::FutureExt; use futures_util::StreamExt; use log::*; -use netfetch::ca::conn::CaConnEvent; use netfetch::ca::conn::ConnCommand; use netfetch::ca::connset::CaConnSet; use netfetch::ca::findioc::FindIocRes; @@ -13,6 +12,8 @@ use netfetch::ca::store::DataStore; use netfetch::ca::IngestCommons; use netfetch::ca::SlowWarnable; use netfetch::conf::CaIngestOpts; +use netfetch::daemon_common::Channel; +use netfetch::daemon_common::DaemonEvent; use netfetch::errconv::ErrConv; use netfetch::insertworker::Ttls; use netfetch::metrics::ExtraInsertsConf; @@ -82,21 +83,6 @@ macro_rules! trace_batch { }); } -#[derive(Clone, Debug, Serialize, PartialEq, PartialOrd, Eq, Ord)] -pub struct Channel { - id: String, -} - -impl Channel { - pub fn new(id: String) -> Self { - Self { id } - } - - pub fn id(&self) -> &str { - &self.id - } -} - #[allow(non_snake_case)] mod serde_Instant { use serde::Serializer; @@ -177,37 +163,6 @@ pub struct CaConnState { value: CaConnStateValue, } -#[derive(Debug)] -pub enum DaemonEvent { - TimerTick, - ChannelAdd(Channel), - ChannelRemove(Channel), - SearchDone(Result, Error>), - CaConnEvent(SocketAddrV4, CaConnEvent), -} - -impl DaemonEvent { - pub fn summary(&self) -> String { - use DaemonEvent::*; - match self { - TimerTick => format!("TimerTick"), - ChannelAdd(x) => format!("ChannelAdd {x:?}"), - ChannelRemove(x) => format!("ChannelRemove {x:?}"), - SearchDone(_x) => format!("SearchDone"), - CaConnEvent(_a, b) => { - use netfetch::ca::conn::CaConnEventValue::*; - match &b.value { - None => format!("CaConnEvent/None"), - EchoTimeout => format!("CaConnEvent/EchoTimeout"), - HealthCheckDone => format!("CaConnEvent/HealthCheckDone"), - ConnCommandResult(_) => format!("CaConnEvent/ConnCommandResult"), - EndOfStream => format!("CaConnEvent/EndOfStream"), - } - } - } - } -} - #[derive(Debug, Clone)] pub struct DaemonOpts { backend: String, @@ -1186,13 +1141,10 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> netfetch::linuxhelper::set_signal_handler(libc::SIGINT, handler_sigint)?; netfetch::linuxhelper::set_signal_handler(libc::SIGTERM, handler_sigterm)?; - let dcom = Arc::new(netfetch::metrics::DaemonComm::dummy()); - netfetch::metrics::start_metrics_service(opts.api_bind(), dcom); - // TODO use a new stats type: - let store_stats = Arc::new(CaConnStats::new()); - let metrics_agg_fut = metrics_agg_task(ingest_commons.clone(), local_stats.clone(), store_stats.clone()); - let metrics_agg_jh = tokio::spawn(metrics_agg_fut); + //let store_stats = Arc::new(CaConnStats::new()); + //let metrics_agg_fut = metrics_agg_task(ingest_commons.clone(), local_stats.clone(), store_stats.clone()); + //let metrics_agg_jh = tokio::spawn(metrics_agg_fut); let opts2 = DaemonOpts { backend: opts.backend().into(), @@ -1209,6 +1161,10 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> }; let mut daemon = Daemon::new(opts2).await?; let tx = daemon.tx.clone(); + + let dcom = Arc::new(netfetch::metrics::DaemonComm::dummy()); + netfetch::metrics::start_metrics_service(opts.api_bind(), dcom); + let daemon_jh = taskrun::spawn(async move { // TODO handle Err daemon.daemon().await.unwrap(); diff --git a/daqingest/src/opts.rs b/daqingest/src/opts.rs index 11667d0..94f6e98 100644 --- a/daqingest/src/opts.rs +++ b/daqingest/src/opts.rs @@ -79,7 +79,6 @@ pub struct BsreadDump { #[derive(Debug, Parser)] pub enum ChannelAccess { CaIngest(CaConfig), - CaIngestNew(CaConfig), CaSearch(CaSearch), } diff --git a/netfetch/src/batchquery/series_by_channel.rs b/netfetch/src/batchquery/series_by_channel.rs index c386112..f548c41 100644 --- a/netfetch/src/batchquery/series_by_channel.rs +++ b/netfetch/src/batchquery/series_by_channel.rs @@ -59,8 +59,68 @@ async fn prepare_pgcs(sql: &str, pgcn: usize, db: &Database) -> Result<(Sender

Result { - err::todoval() +async fn fetch_data(batch: Vec, pgres: PgRes) -> Result<(ChannelInfoResult, PgRes), Error> { + let mut backend = Vec::new(); + let mut channel = Vec::new(); + let mut scalar_type = Vec::new(); + let mut shape_dims: Vec = Vec::new(); + let mut rid = Vec::new(); + let mut tx = Vec::new(); + for (i, e) in batch.into_iter().enumerate() { + backend.push(e.backend); + channel.push(e.channel); + scalar_type.push(e.scalar_type); + let mut dims = String::with_capacity(16); + dims.push('{'); + for (i, v) in e.shape_dims.into_iter().enumerate() { + if i > 0 { + dims.push(','); + } + use std::fmt::Write; + write!(dims, "{}", v).unwrap(); + } + dims.push('}'); + shape_dims.push(dims); + rid.push(i as i32); + tx.push((i as u32, e.tx)); + } + match pgres + .pgc + .query(&pgres.st, &[&backend, &channel, &scalar_type, &shape_dims, &rid]) + .await + .map_err(|e| { + error!("{e}"); + Error::from(e.to_string()) + }) { + Ok(rows) => { + let mut series_ids = Vec::new(); + let mut txs = Vec::new(); + let mut it1 = rows.into_iter(); + let mut e1 = it1.next(); + for (qrid, tx) in tx { + if let Some(row) = &e1 { + let rid: i32 = row.get(1); + if rid as u32 == qrid { + let series: i64 = row.get(0); + let series = SeriesId::new(series as _); + series_ids.push(Existence::Existing(series)); + txs.push(tx); + } + e1 = it1.next(); + } + } + let result = ChannelInfoResult { + series: series_ids, + tx: txs, + }; + Ok((result, pgres)) + } + Err(e) => { + error!("error in pg query {e}"); + tokio::time::sleep(Duration::from_millis(2000)).await; + Err(e) + } + } } async fn run_queries( @@ -75,70 +135,12 @@ async fn run_queries( let pgc_tx = pgc_tx.clone(); async move { if let Ok(pgres) = pgc_rx.recv().await { - let mut backend = Vec::new(); - let mut channel = Vec::new(); - let mut scalar_type = Vec::new(); - let mut shape_dims: Vec = Vec::new(); - let mut rid = Vec::new(); - let mut tx = Vec::new(); - for (i, e) in batch.into_iter().enumerate() { - backend.push(e.backend); - channel.push(e.channel); - scalar_type.push(e.scalar_type); - let mut dims = String::with_capacity(16); - dims.push('{'); - for (i, v) in e.shape_dims.into_iter().enumerate() { - if i > 0 { - dims.push(','); - } - use std::fmt::Write; - write!(dims, "{}", v).unwrap(); - } - dims.push('}'); - shape_dims.push(dims); - rid.push(i as i32); - tx.push((i as u32, e.tx)); - } - match pgres - .pgc - .query(&pgres.st, &[&backend, &channel, &scalar_type, &shape_dims, &rid]) - .await - .map_err(|e| { - error!("{e}"); - Error::from(e.to_string()) - }) { - Ok(rows) => { - if pgc_tx.send(pgres).await.is_err() { - Err(Error::with_msg_no_trace("can not hand pgres back")) - } else { - let mut series_ids = Vec::new(); - let mut txs = Vec::new(); - let mut it1 = rows.into_iter(); - let mut e1 = it1.next(); - for (qrid, tx) in tx { - if let Some(row) = &e1 { - let rid: i32 = row.get(1); - if rid as u32 == qrid { - let series: i64 = row.get(0); - let series = SeriesId::new(series as _); - series_ids.push(Existence::Existing(series)); - txs.push(tx); - } - e1 = it1.next(); - } - } - let result = ChannelInfoResult { - series: series_ids, - tx: txs, - }; - Ok(result) - } - } - Err(e) => { - error!("error in pg query {e}"); - tokio::time::sleep(Duration::from_millis(2000)).await; - Err(e) - } + let (res, pgres) = fetch_data(batch, pgres).await?; + if let Err(_) = pgc_tx.send(pgres).await { + error!("can not hand back pgres"); + Err(Error::with_msg_no_trace("can not hand back pgres")) + } else { + Ok(res) } } else { error!("can not get pgc"); diff --git a/netfetch/src/daemon_common.rs b/netfetch/src/daemon_common.rs new file mode 100644 index 0000000..556284a --- /dev/null +++ b/netfetch/src/daemon_common.rs @@ -0,0 +1,52 @@ +use crate::ca::conn::CaConnEvent; +use crate::ca::findioc::FindIocRes; +use err::Error; +use serde::Serialize; +use std::collections::VecDeque; +use std::net::SocketAddrV4; + +#[derive(Clone, Debug, Serialize, PartialEq, PartialOrd, Eq, Ord)] +pub struct Channel { + id: String, +} + +impl Channel { + pub fn new(id: String) -> Self { + Self { id } + } + + pub fn id(&self) -> &str { + &self.id + } +} + +#[derive(Debug)] +pub enum DaemonEvent { + TimerTick, + ChannelAdd(Channel), + ChannelRemove(Channel), + SearchDone(Result, Error>), + CaConnEvent(SocketAddrV4, CaConnEvent), +} + +impl DaemonEvent { + pub fn summary(&self) -> String { + use DaemonEvent::*; + match self { + TimerTick => format!("TimerTick"), + ChannelAdd(x) => format!("ChannelAdd {x:?}"), + ChannelRemove(x) => format!("ChannelRemove {x:?}"), + SearchDone(_x) => format!("SearchDone"), + CaConnEvent(_a, b) => { + use crate::ca::conn::CaConnEventValue::*; + match &b.value { + None => format!("CaConnEvent/None"), + EchoTimeout => format!("CaConnEvent/EchoTimeout"), + HealthCheckDone => format!("CaConnEvent/HealthCheckDone"), + ConnCommandResult(_) => format!("CaConnEvent/ConnCommandResult"), + EndOfStream => format!("CaConnEvent/EndOfStream"), + } + } + } + } +} diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 77e94f5..f9c4502 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -1,5 +1,7 @@ use crate::ca::IngestCommons; use crate::ca::METRICS; +use crate::daemon_common::DaemonEvent; +use async_channel::Sender; use axum::extract::Query; use err::Error; use http::Request; @@ -113,6 +115,10 @@ struct DummyQuery { pub struct DaemonComm {} impl DaemonComm { + pub fn new(tx: Sender) -> Self { + Self {} + } + pub fn dummy() -> Self { Self {} } diff --git a/netfetch/src/netfetch.rs b/netfetch/src/netfetch.rs index dd6c60c..f712f98 100644 --- a/netfetch/src/netfetch.rs +++ b/netfetch/src/netfetch.rs @@ -4,6 +4,7 @@ pub mod bsread; pub mod ca; pub mod channelwriter; pub mod conf; +pub mod daemon_common; pub mod dbpg; pub mod errconv; pub mod insertworker;