From a5c927538e8a6497e3d01002786ee9c764b5f45c Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 6 Jan 2023 16:09:25 +0100 Subject: [PATCH] Ingest swissfel ca until before electric test --- daqingest/Cargo.toml | 1 + daqingest/src/daemon.rs | 176 +++++++++++++++++++++++++++++++---- netfetch/Cargo.toml | 2 +- netfetch/src/ca.rs | 7 +- netfetch/src/ca/findioc.rs | 4 - netfetch/src/ca/search.rs | 10 +- netfetch/src/insertworker.rs | 15 ++- 7 files changed, 179 insertions(+), 36 deletions(-) diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 66ee052..769fedc 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -25,5 +25,6 @@ serde = { version = "1.0", features = ["derive"] } err = { path = "../../daqbuffer/err" } log = { path = "../log" } netpod = { path = "../../daqbuffer/netpod" } +stats = { path = "../stats" } netfetch = { path = "../netfetch" } taskrun = { path = "../../daqbuffer/taskrun" } diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 3a9b7f0..fdad9c9 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -29,15 +29,16 @@ use std::sync::Arc; use std::time::Duration; use std::time::SystemTime; use tokio_postgres::Client as PgClient; +use tokio_postgres::Statement as PgStatement; const CHECK_CHANS_PER_TICK: usize = 10000; const FINDER_TIMEOUT: usize = 100; +const TIMEOUT_WARN: usize = 3000; const FINDER_JOB_QUEUE_LEN_MAX: usize = 20; -const FINDER_IN_FLIGHT_MAX: usize = 200; +const FINDER_IN_FLIGHT_MAX: usize = 400; const FINDER_BATCH_SIZE: usize = 8; -const CURRENT_SEARCH_PENDING_MAX: usize = 220; +const CURRENT_SEARCH_PENDING_MAX: usize = 420; const SEARCH_PENDING_TIMEOUT: usize = 10000; -const TIMEOUT_WARN_FACTOR: usize = 10; #[derive(Clone, Debug, Serialize, PartialEq, PartialOrd, Eq, Ord)] pub struct Channel { @@ -180,6 +181,13 @@ pub async fn make_pg_client(d: &Database) -> Result { Ok(client) } +struct AddrInsertJob { + backend: String, + channel: Channel, + response_addr: SocketAddrV4, + addr: SocketAddrV4, +} + pub struct Daemon { opts: DaemonOpts, channel_states: BTreeMap, @@ -198,19 +206,79 @@ pub struct Daemon { count_unassigned: usize, count_assigned: usize, last_status_print: SystemTime, + insert_workers_jh: Vec>, + #[allow(unused)] + pg_client: Arc, + #[allow(unused)] + qu_addr_insert: PgStatement, + ioc_addr_inserter_jh: tokio::task::JoinHandle<()>, + ioc_addr_inserter_tx: Sender, } impl Daemon { pub async fn new(opts: DaemonOpts) -> Result { let pg_client = Arc::new(make_pg_client(&opts.pgconf).await?); - let datastore = DataStore::new(&opts.scyconf, pg_client).await?; + let datastore = DataStore::new(&opts.scyconf, pg_client.clone()).await?; let datastore = Arc::new(datastore); let (tx, rx) = async_channel::bounded(32); let tgts = opts.search_tgts.clone(); let (search_tx, ioc_finder_jh) = Self::start_finder(tx.clone(), tgts); let common_insert_item_queue = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap)); + let common_insert_item_queue_2 = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap)); let insert_queue_counter = Arc::new(AtomicUsize::new(0)); + // Insert queue hook + if true { + tokio::spawn({ + let rx = common_insert_item_queue.receiver(); + let tx = common_insert_item_queue_2.sender().await; + let insert_queue_counter = insert_queue_counter.clone(); + async move { + let mut printed_last = SystemTime::now(); + let mut histo = BTreeMap::new(); + while let Ok(item) = rx.recv().await { + insert_queue_counter.fetch_add(1, atomic::Ordering::AcqRel); + //trace!("insert queue item {item:?}"); + match &item { + netfetch::store::QueryItem::Insert(item) => { + item.pulse; + histo + .entry(item.series.clone()) + .and_modify(|(c, msp, lsp, pulse)| { + *c += 1; + *msp = item.ts_msp; + *lsp = item.ts_lsp; + *pulse = item.pulse; + }) + .or_insert((0 as usize, item.ts_msp, item.ts_lsp, item.pulse)); + } + _ => {} + } + match tx.send(item).await { + Ok(_) => {} + Err(e) => { + error!("insert queue hook send {e}"); + break; + } + } + let tsnow = SystemTime::now(); + if tsnow.duration_since(printed_last).unwrap_or(Duration::ZERO) >= Duration::from_millis(2000) { + printed_last = tsnow; + let mut all: Vec<_> = histo + .iter() + .map(|(k, (c, msp, lsp, pulse))| (usize::MAX - *c, k.clone(), *msp, *lsp, *pulse)) + .collect(); + all.sort_unstable(); + for (c, sid, msp, lsp, pulse) in all.into_iter().take(4) { + info!("{:10} {:20} {:14} {:20} {:?}", usize::MAX - c, msp, lsp, pulse, sid); + } + histo.clear(); + } + } + } + }); + } + let ingest_commons = netfetch::ca::IngestCommons { pgconf: Arc::new(opts.pgconf.clone()), backend: opts.backend().into(), @@ -223,19 +291,74 @@ impl Daemon { insert_frac: AtomicU64::new(1000), ca_conn_set: netfetch::ca::connset::CaConnSet::new(), }; - let _ingest_commons = Arc::new(ingest_commons); + let ingest_commons = Arc::new(ingest_commons); - // TODO hook up with insert worker - tokio::spawn({ - let rx = common_insert_item_queue.receiver(); - let insert_queue_counter = insert_queue_counter.clone(); + let qu_addr_insert = { + const TEXT: tokio_postgres::types::Type = tokio_postgres::types::Type::TEXT; + let sql = "insert into ioc_by_channel_log (facility, channel, responseaddr, addr) values ($1, $2, $3, $4)"; + let res = pg_client + .prepare_typed(sql, &[TEXT, TEXT, TEXT, TEXT]) + .await + .map_err(|e| Error::from(e.to_string()))?; + res + }; + + let insert_scylla_sessions = 1; + let insert_worker_count = 1000; + let use_rate_limit_queue = false; + + // TODO use a new stats type: + let store_stats = Arc::new(stats::CaConnStats::new()); + let ttls = netfetch::insertworker::Ttls { + index: Duration::from_secs(60 * 60 * 24 * 4), + d0: Duration::from_secs(60 * 60 * 24 * 3), + d1: Duration::from_secs(60 * 60 * 4), + }; + let jh_insert_workers = netfetch::insertworker::spawn_scylla_insert_workers( + opts.scyconf.clone(), + insert_scylla_sessions, + insert_worker_count, + common_insert_item_queue_2.clone(), + ingest_commons.clone(), + pg_client.clone(), + store_stats.clone(), + use_rate_limit_queue, + ttls, + ) + .await?; + + let (ioc_addr_inserter_tx, ioc_addr_inserter_rx) = async_channel::bounded(64); + let fut = { + let pg_client = pg_client.clone(); + let qu_addr_insert = qu_addr_insert.clone(); async move { - while let Ok(item) = rx.recv().await { - insert_queue_counter.fetch_add(1, atomic::Ordering::AcqRel); - trace!("insert queue item {item:?}"); + let mut stream = ioc_addr_inserter_rx + .map(|k: AddrInsertJob| { + let pgc = pg_client.clone(); + let qu_addr_insert = qu_addr_insert.clone(); + async move { + let response_addr = k.response_addr.to_string(); + let addr = k.addr.to_string(); + let res = pgc + .execute(&qu_addr_insert, &[&k.backend, &k.channel.id(), &response_addr, &addr]) + .await + .map_err(|e| Error::from(e.to_string()))?; + Ok::<_, Error>(res) + } + }) + .buffer_unordered(16); + while let Some(item) = stream.next().await { + match item { + Ok(_) => {} + Err(e) => { + error!("{e}"); + } + } } } - }); + }; + let ioc_addr_inserter_task = tokio::spawn(fut); + let ret = Self { opts, channel_states: BTreeMap::new(), @@ -254,6 +377,11 @@ impl Daemon { count_unassigned: 0, count_assigned: 0, last_status_print: SystemTime::now(), + insert_workers_jh: jh_insert_workers, + pg_client, + qu_addr_insert, + ioc_addr_inserter_jh: ioc_addr_inserter_task, + ioc_addr_inserter_tx, }; Ok(ret) } @@ -622,7 +750,7 @@ impl Daemon { Ok(()) } - fn handle_search_done(&mut self, item: Result, Error>) -> Result<(), Error> { + async fn handle_search_done(&mut self, item: Result, Error>) -> Result<(), Error> { //debug!("handle SearchDone: {res:?}"); match item { Ok(a) => { @@ -633,7 +761,7 @@ impl Daemon { if let Some(st) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(ActiveChannelState::SearchPending { since }) = &st.value { let dt = SystemTime::now().duration_since(*since).unwrap(); - if dt > Duration::from_millis(FINDER_TIMEOUT as u64 * TIMEOUT_WARN_FACTOR as u64) { + if dt > Duration::from_millis(TIMEOUT_WARN as u64) { warn!( " FOUND {:5.0} {:5.0} {addr}", 1e3 * dt.as_secs_f32(), @@ -647,6 +775,15 @@ impl Daemon { }, }); st.value = stnew; + if let (Some(response_addr),) = (res.response_addr,) { + let job = AddrInsertJob { + backend: self.opts.backend().into(), + channel: ch.clone(), + response_addr: response_addr, + addr: addr, + }; + self.ioc_addr_inserter_tx.send(job).await?; + } } else { warn!( "address found, but state for {ch:?} is not SearchPending: {:?}", @@ -662,7 +799,7 @@ impl Daemon { if let Some(st) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(ActiveChannelState::SearchPending { since }) = &st.value { let dt = SystemTime::now().duration_since(*since).unwrap(); - if dt > Duration::from_millis(FINDER_TIMEOUT as u64 * TIMEOUT_WARN_FACTOR as u64) { + if dt > Duration::from_millis(TIMEOUT_WARN as u64) { warn!( "NOT FOUND {:5.0} {:5.0}", 1e3 * dt.as_secs_f32(), @@ -692,7 +829,7 @@ impl Daemon { TimerTick => self.handle_timer_tick().await, ChannelAdd(ch) => self.handle_channel_add(ch), ChannelRemove(ch) => self.handle_channel_remove(ch), - SearchDone(item) => self.handle_search_done(item), + SearchDone(item) => self.handle_search_done(item).await, } } @@ -727,8 +864,11 @@ impl Daemon { } } } - warn!("TODO shut down IOC finder properly"); + warn!("TODO wait for IOC finder properly"); let _ = &self.ioc_finder_jh; + warn!("TODO wait for insert workers"); + let _ = &self.insert_workers_jh; + let _ = &self.ioc_addr_inserter_jh; Ok(()) } } diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 597039a..434d58e 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -11,7 +11,7 @@ path = "src/netfetch.rs" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_cbor = "0.11" -serde_yaml = "0.8.23" +serde_yaml = "0.9.16" tokio = { version = "1.23.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs", "tracing"] } tokio-stream = { version = "0.1", features = ["fs"]} async-channel = "1.6" diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 261b4b0..e7be0be 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -199,6 +199,11 @@ pub async fn ca_connect(opts: CaIngestOpts, channels: &Vec) -> Result<() // TODO use a new stats type: let store_stats = Arc::new(CaConnStats::new()); + let ttls = crate::insertworker::Ttls { + index: opts.ttl_index(), + d0: opts.ttl_d0(), + d1: opts.ttl_d1(), + }; let jh_insert_workers = spawn_scylla_insert_workers( opts.scylla().clone(), opts.insert_scylla_sessions(), @@ -208,7 +213,7 @@ pub async fn ca_connect(opts: CaIngestOpts, channels: &Vec) -> Result<() pg_client.clone(), store_stats.clone(), opts.use_rate_limit_queue(), - opts.clone(), + ttls, ) .await?; diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index ca52389..ea2505a 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -45,7 +45,6 @@ struct SearchBatch { #[derive(Debug)] pub struct FindIocRes { pub channel: String, - pub query_addr: Option, pub response_addr: Option, pub addr: Option, pub dt: Duration, @@ -392,8 +391,6 @@ impl FindIocStream { let dt = tsnow.saturating_duration_since(batch.ts_beg); let res = FindIocRes { channel: ch.into(), - // TODO associate a batch with a specific query address. - query_addr: None, response_addr: Some(src.clone()), addr: Some(addr), dt, @@ -463,7 +460,6 @@ impl FindIocStream { } for ((sid, ch), dt) in sids.into_iter().zip(chns).zip(dts) { let res = FindIocRes { - query_addr: None, response_addr: None, channel: ch, addr: None, diff --git a/netfetch/src/ca/search.rs b/netfetch/src/ca/search.rs index 19d3ba0..1abce28 100644 --- a/netfetch/src/ca/search.rs +++ b/netfetch/src/ca/search.rs @@ -60,8 +60,8 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec) -> Result<(), const TEXT: tokio_postgres::types::Type = tokio_postgres::types::Type::TEXT; pg_client .prepare_typed( - "insert into ioc_by_channel_log (facility, channel, queryaddr, responseaddr, addr) values ($1, $2, $3, $4, $5)", - &[TEXT, TEXT, TEXT, TEXT, TEXT], + "insert into ioc_by_channel_log (facility, channel, responseaddr, addr) values ($1, $2, $3, $4)", + &[TEXT, TEXT, TEXT, TEXT], ) .await .unwrap() @@ -154,14 +154,10 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec) -> Result<(), if do_block { info!("blacklisting {item:?}"); } else { - let queryaddr = item.query_addr.map(|x| x.to_string()); let responseaddr = item.response_addr.map(|x| x.to_string()); let addr = item.addr.map(|x| x.to_string()); pg_client - .execute( - &qu_insert, - &[&opts.backend(), &item.channel, &queryaddr, &responseaddr, &addr], - ) + .execute(&qu_insert, &[&opts.backend(), &item.channel, &responseaddr, &addr]) .await .unwrap(); } diff --git a/netfetch/src/insertworker.rs b/netfetch/src/insertworker.rs index 219df3d..ec919b3 100644 --- a/netfetch/src/insertworker.rs +++ b/netfetch/src/insertworker.rs @@ -1,6 +1,5 @@ use crate::ca::store::DataStore; use crate::ca::IngestCommons; -use crate::conf::CaIngestOpts; use crate::rt::JoinHandle; use crate::store::{CommonInsertItemQueue, IntoSimplerError, QueryItem}; use err::Error; @@ -44,6 +43,12 @@ async fn back_off_sleep(backoff_dt: &mut Duration) { tokio::time::sleep(*backoff_dt).await; } +pub struct Ttls { + pub index: Duration, + pub d0: Duration, + pub d1: Duration, +} + pub async fn spawn_scylla_insert_workers( scyconf: ScyllaConfig, insert_scylla_sessions: usize, @@ -53,7 +58,7 @@ pub async fn spawn_scylla_insert_workers( pg_client: Arc, store_stats: Arc, use_rate_limit_queue: bool, - opts: CaIngestOpts, + ttls: Ttls, ) -> Result>, Error> { let (q2_tx, q2_rx) = async_channel::bounded(insert_item_queue.receiver().capacity().unwrap_or(20000)); { @@ -125,9 +130,9 @@ pub async fn spawn_scylla_insert_workers( insert_item_queue.receiver() }; let ingest_commons = ingest_commons.clone(); - let ttl_msp = opts.ttl_index(); - let ttl_0d = opts.ttl_d0(); - let ttl_1d = opts.ttl_d1(); + let ttl_msp = ttls.index; + let ttl_0d = ttls.d0; + let ttl_1d = ttls.d1; let fut = async move { let backoff_0 = Duration::from_millis(10); let mut backoff = backoff_0.clone();