From e2e847989f9d466506a16cdd5df876764b0407a4 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 16 Jan 2023 18:56:01 +0100 Subject: [PATCH] Refactor channel add --- .cargo/cargo-lock | 62 +-- Cargo.toml | 8 +- daqingest/Cargo.toml | 1 + daqingest/src/daemon.rs | 854 ++++++++++++++++++++++++------------ netfetch/Cargo.toml | 1 + netfetch/src/ca.rs | 152 +++++-- netfetch/src/ca/conn.rs | 687 +++++++++++++++++------------ netfetch/src/ca/connset.rs | 228 +++++----- netfetch/src/ca/findioc.rs | 20 +- netfetch/src/ca/search.rs | 161 +++++-- netfetch/src/linuxhelper.rs | 22 +- netfetch/src/metrics.rs | 85 +--- netfetch/src/store.rs | 74 ++-- 13 files changed, 1452 insertions(+), 903 deletions(-) diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index 57bd6ec..1560380 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -255,9 +255,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.0.32" +version = "4.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7db700bc935f9e43e88d00b0850dae18a63773cfbec6d8e070fccf7fef89a39" +checksum = "4ec7a4128863c188deefe750ac1d1dfe66c236909f845af04beed823638dc1b2" dependencies = [ "bitflags", "clap_derive", @@ -270,9 +270,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.0.21" +version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0177313f9f02afc995627906bbd8967e2be069f5261954222dac78290c2b9014" +checksum = "684a277d672e91966334af371f1a7b5833f9aa00b07c84e92fbce95e00208ce8" dependencies = [ "heck 0.4.0", "proc-macro-error", @@ -283,9 +283,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d4198f73e42b4936b35b5bb248d81d2b595ecb170da0bac7655c54eedfa8da8" +checksum = "783fe232adfca04f90f56201b26d79682d4cd2625e0bc7290b95123afe558ade" dependencies = [ "os_str_bytes", ] @@ -302,9 +302,9 @@ dependencies = [ [[package]] name = "concurrent-queue" -version = "2.0.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd7bef69dc86e3c610e4e7aed41035e2a7ed12e72dd7530f61327a6579a4390b" +checksum = "c278839b831783b70278b14df4d45e1beb1aad306c07bb796637de9a0e323e8e" dependencies = [ "crossbeam-utils", ] @@ -452,6 +452,7 @@ dependencies = [ "clap", "err", "futures-util", + "libc", "log 0.0.1", "netfetch", "netpod", @@ -921,9 +922,9 @@ dependencies = [ [[package]] name = "io-lifetimes" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46112a93252b123d31a119a8d1a1ac19deac4fac6e0e8b0df58f0d4e5870e63c" +checksum = "e7d6c6f8c91b4b9ed43484ad1a938e393caf35960fce7f82a040497207bd8e9e" dependencies = [ "libc", "windows-sys", @@ -1125,6 +1126,7 @@ dependencies = [ "tokio", "tokio-postgres", "tokio-stream", + "tracing", "url", ] @@ -1147,9 +1149,9 @@ dependencies = [ [[package]] name = "nom" -version = "7.1.2" +version = "7.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5507769c4919c998e69e49c839d9dc6e693ede4cc4290d6ad8b41d4f09c548c" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" dependencies = [ "memchr", "minimal-lexical", @@ -1265,9 +1267,9 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.5" +version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ff9f3fef3968a3ec5945535ed654cb38ff72d7495a25619e2247fb15a2ed9ba" +checksum = "ba1ef8814b5c993410bb3adfad7a5ed269563e4a2f90c41f5d85be7fb47133bf" dependencies = [ "cfg-if", "libc", @@ -1891,9 +1893,9 @@ dependencies = [ [[package]] name = "termcolor" -version = "1.1.3" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" +checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6" dependencies = [ "winapi-util", ] @@ -2449,42 +2451,42 @@ dependencies = [ [[package]] name = "windows_aarch64_gnullvm" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e" +checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608" [[package]] name = "windows_aarch64_msvc" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4" +checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7" [[package]] name = "windows_i686_gnu" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7" +checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640" [[package]] name = "windows_i686_msvc" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246" +checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605" [[package]] name = "windows_x86_64_gnu" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed" +checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45" [[package]] name = "windows_x86_64_gnullvm" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028" +checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463" [[package]] name = "windows_x86_64_msvc" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" +checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd" diff --git a/Cargo.toml b/Cargo.toml index f40b37e..54cc388 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,13 +2,13 @@ members = ["log", "netfetch", "daqingest"] [profile.release] -opt-level = 3 -debug = 0 +opt-level = 2 +debug = 1 overflow-checks = false debug-assertions = false lto = "thin" -#codegen-units = 32 -incremental = false +codegen-units = 64 +incremental = true [patch.crates-io] #tokio = { git = "https://github.com/dominikwerder/tokio", rev = "995221d8" } diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 769fedc..405f881 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -22,6 +22,7 @@ bytes = "1.1" scylla = "0.7" tokio-postgres = "0.7.7" serde = { version = "1.0", features = ["derive"] } +libc = "0.2" err = { path = "../../daqbuffer/err" } log = { path = "../log" } netpod = { path = "../../daqbuffer/netpod" } diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 3301a6e..6f51e1f 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -1,7 +1,6 @@ use async_channel::Receiver; use async_channel::Sender; use err::Error; -use futures_util::Future; use futures_util::FutureExt; use futures_util::StreamExt; use log::*; @@ -12,6 +11,7 @@ use netfetch::ca::findioc::FindIocRes; use netfetch::ca::findioc::FindIocStream; use netfetch::ca::store::DataStore; use netfetch::ca::IngestCommons; +use netfetch::ca::SlowWarnable; use netfetch::conf::CaIngestOpts; use netfetch::errconv::ErrConv; use netfetch::insertworker::Ttls; @@ -21,11 +21,9 @@ use netpod::Database; use netpod::ScyllaConfig; use serde::Serialize; use std::collections::BTreeMap; +use std::collections::HashMap; use std::collections::VecDeque; -use std::fmt; -use std::net::SocketAddr; use std::net::SocketAddrV4; -use std::pin::Pin; use std::sync::atomic; use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicUsize; @@ -34,16 +32,55 @@ use std::time::Duration; use std::time::Instant; use std::time::SystemTime; use tokio_postgres::Client as PgClient; -use tokio_postgres::Statement as PgStatement; +use tokio_postgres::Row as PgRow; +use tracing::info_span; +use tracing::Instrument; -const CHECK_CHANS_PER_TICK: usize = 10000; -const FINDER_TIMEOUT: usize = 100; -const TIMEOUT_WARN: usize = 6000; -const FINDER_JOB_QUEUE_LEN_MAX: usize = 20; -const FINDER_IN_FLIGHT_MAX: usize = 400; +const SEARCH_BATCH_MAX: usize = 256; +const CURRENT_SEARCH_PENDING_MAX: usize = SEARCH_BATCH_MAX * 4; +const SEARCH_DB_PIPELINE_LEN: usize = 4; +const FINDER_JOB_QUEUE_LEN_MAX: usize = 10; +const FINDER_IN_FLIGHT_MAX: usize = 800; const FINDER_BATCH_SIZE: usize = 8; -const CURRENT_SEARCH_PENDING_MAX: usize = 420; -const SEARCH_PENDING_TIMEOUT: usize = 10000; +const CHECK_CHANS_PER_TICK: usize = 10000; +const CA_CONN_INSERT_QUEUE_MAX: usize = 256; + +const UNKNOWN_ADDRESS_STAY: Duration = Duration::from_millis(2000); +const NO_ADDRESS_STAY: Duration = Duration::from_millis(20000); +const SEARCH_PENDING_TIMEOUT: Duration = Duration::from_millis(30000); +const SEARCH_PENDING_TIMEOUT_WARN: Duration = Duration::from_millis(8000); +const FINDER_TIMEOUT: Duration = Duration::from_millis(100); +const CHANNEL_CHECK_INTERVAL: Duration = Duration::from_millis(5000); +const PRINT_ACTIVE_INTERVAL: Duration = Duration::from_millis(8000); + +const DO_ASSIGN_TO_CA_CONN: bool = true; + +static SEARCH_REQ_MARK_COUNT: AtomicUsize = AtomicUsize::new(0); +static SEARCH_REQ_SEND_COUNT: AtomicUsize = AtomicUsize::new(0); +static SEARCH_REQ_RECV_COUNT: AtomicUsize = AtomicUsize::new(0); +static SEARCH_REQ_BATCH_SEND_COUNT: AtomicUsize = AtomicUsize::new(0); +static SEARCH_REQ_BATCH_RECV_COUNT: AtomicUsize = AtomicUsize::new(0); +static SEARCH_RES_0_COUNT: AtomicUsize = AtomicUsize::new(0); +static SEARCH_RES_1_COUNT: AtomicUsize = AtomicUsize::new(0); +static SEARCH_RES_2_COUNT: AtomicUsize = AtomicUsize::new(0); +static SEARCH_RES_3_COUNT: AtomicUsize = AtomicUsize::new(0); +static SEARCH_ANS_COUNT: AtomicUsize = AtomicUsize::new(0); + +#[allow(unused)] +macro_rules! debug_batch { + (D$($arg:tt)*) => (); + ($($arg:tt)*) => (if false { + info!($($arg)*); + }); +} + +#[allow(unused)] +macro_rules! trace_batch { + (D$($arg:tt)*) => (); + ($($arg:tt)*) => (if false { + trace!($($arg)*); + }); +} #[derive(Clone, Debug, Serialize, PartialEq, PartialOrd, Eq, Ord)] pub struct Channel { @@ -60,52 +97,66 @@ impl Channel { } } +#[allow(non_snake_case)] +mod serde_Instant { + use serde::Serializer; + use std::time::Instant; + + #[allow(unused)] + pub fn serialize(val: &Instant, ser: S) -> Result + where + S: Serializer, + { + let dur = val.elapsed(); + ser.serialize_u64(dur.as_secs() * 1000 + dur.subsec_millis() as u64) + } +} + #[derive(Clone, Debug, Serialize)] pub enum ConnectionStateValue { Unconnected, - Connected { since: SystemTime }, + Connected { + //#[serde(with = "serde_Instant")] + since: SystemTime, + }, } #[derive(Clone, Debug, Serialize)] pub struct ConnectionState { + //#[serde(with = "serde_Instant")] updated: SystemTime, value: ConnectionStateValue, } #[derive(Clone, Debug, Serialize)] pub enum WithAddressState { - Unassigned { assign_at: SystemTime }, + Unassigned { + //#[serde(with = "serde_Instant")] + assign_at: SystemTime, + }, Assigned(ConnectionState), } #[derive(Clone, Debug, Serialize)] pub enum ActiveChannelState { - UnknownAddress, - SearchPending { + UnknownAddress { since: SystemTime, }, + SearchPending { + //#[serde(with = "serde_Instant")] + since: SystemTime, + did_send: bool, + }, WithAddress { addr: SocketAddrV4, state: WithAddressState, }, - NoAddress, + NoAddress { + since: SystemTime, + }, } -enum ChanOp { - Finder(String, SystemTime), - ConnCmd(Pin> + Send>>), -} - -impl fmt::Debug for ChanOp { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - match self { - Self::Finder(arg0, arg1) => fmt.debug_tuple("Finder").field(arg0).field(arg1).finish(), - Self::ConnCmd(_arg0) => fmt.debug_tuple("ConnCmd").finish(), - } - } -} - -#[derive(Debug, Serialize)] +#[derive(Debug)] pub enum ChannelStateValue { Active(ActiveChannelState), ToRemove { addr: Option }, @@ -114,7 +165,16 @@ pub enum ChannelStateValue { #[derive(Debug)] pub struct ChannelState { value: ChannelStateValue, - pending_op: Option, +} + +#[derive(Debug, Serialize)] +pub enum CaConnStateValue {} + +#[derive(Debug)] +pub struct CaConnState { + last_feedback: Instant, + #[allow(unused)] + value: CaConnStateValue, } #[derive(Debug)] @@ -123,7 +183,29 @@ pub enum DaemonEvent { ChannelAdd(Channel), ChannelRemove(Channel), SearchDone(Result, Error>), - CaConnEvent(CaConnEvent), + CaConnEvent(SocketAddrV4, CaConnEvent), +} + +impl DaemonEvent { + pub fn summary(&self) -> String { + use DaemonEvent::*; + match self { + TimerTick => format!("TimerTick"), + ChannelAdd(x) => format!("ChannelAdd {x:?}"), + ChannelRemove(x) => format!("ChannelRemove {x:?}"), + SearchDone(_x) => format!("SearchDone"), + CaConnEvent(_a, b) => { + use netfetch::ca::conn::CaConnEventValue::*; + match &b.value { + None => format!("CaConnEvent/None"), + EchoTimeout => format!("CaConnEvent/EchoTimeout"), + HealthCheckDone => format!("CaConnEvent/HealthCheckDone"), + ConnCommandResult(_) => format!("CaConnEvent/ConnCommandResult"), + EndOfStream => format!("CaConnEvent/EndOfStream"), + } + } + } + } } #[derive(Debug, Clone)] @@ -132,8 +214,6 @@ pub struct DaemonOpts { local_epics_hostname: String, array_truncate: usize, insert_item_queue_cap: usize, - search_tgts: Vec, - //search_excl: Vec, pgconf: Database, scyconf: ScyllaConfig, ttls: Ttls, @@ -189,15 +269,9 @@ pub async fn make_pg_client(d: &Database) -> Result { Ok(client) } -struct AddrInsertJob { - backend: String, - channel: Channel, - response_addr: SocketAddrV4, - addr: SocketAddrV4, -} - pub struct Daemon { opts: DaemonOpts, + connection_states: BTreeMap, channel_states: BTreeMap, tx: Sender, rx: Receiver, @@ -209,6 +283,7 @@ pub struct Daemon { insert_queue_counter: Arc, count_unknown_address: usize, count_search_pending: usize, + count_search_sent: usize, count_no_address: usize, count_unassigned: usize, count_assigned: usize, @@ -216,10 +291,6 @@ pub struct Daemon { insert_workers_jh: Vec>, #[allow(unused)] pg_client: Arc, - #[allow(unused)] - qu_addr_insert: PgStatement, - ioc_addr_inserter_jh: tokio::task::JoinHandle<()>, - ioc_addr_inserter_tx: Sender, ingest_commons: Arc, caconn_last_channel_check: Instant, } @@ -230,8 +301,15 @@ impl Daemon { let datastore = DataStore::new(&opts.scyconf, pg_client.clone()).await?; let datastore = Arc::new(datastore); let (tx, rx) = async_channel::bounded(32); - let tgts = opts.search_tgts.clone(); - let (search_tx, ioc_finder_jh) = Self::start_finder(tx.clone(), tgts); + let pgcs = { + let mut a = Vec::new(); + for _ in 0..SEARCH_DB_PIPELINE_LEN { + let pgc = Arc::new(make_pg_client(&opts.pgconf).await?); + a.push(pgc); + } + a + }; + let (search_tx, ioc_finder_jh) = Self::start_finder(tx.clone(), opts.backend().into(), pgcs); let common_insert_item_queue = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap)); let common_insert_item_queue_2 = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap)); let insert_queue_counter = Arc::new(AtomicUsize::new(0)); @@ -240,10 +318,10 @@ impl Daemon { if true { tokio::spawn({ let rx = common_insert_item_queue.receiver(); - let tx = common_insert_item_queue_2.sender().await; + let tx = common_insert_item_queue_2.sender(); let insert_queue_counter = insert_queue_counter.clone(); async move { - let mut printed_last = SystemTime::now(); + let mut printed_last = Instant::now(); let mut histo = BTreeMap::new(); while let Ok(item) = rx.recv().await { insert_queue_counter.fetch_add(1, atomic::Ordering::AcqRel); @@ -275,8 +353,8 @@ impl Daemon { break; } } - let tsnow = SystemTime::now(); - if tsnow.duration_since(printed_last).unwrap_or(Duration::ZERO) >= Duration::from_millis(2000) { + let tsnow = Instant::now(); + if tsnow.duration_since(printed_last) >= PRINT_ACTIVE_INTERVAL { printed_last = tsnow; let mut all: Vec<_> = histo .iter() @@ -319,24 +397,17 @@ impl Daemon { let tx = tx.clone(); async move { while let Ok(item) = rx.recv().await { - match tx.send(DaemonEvent::CaConnEvent(item)).await { + match tx.send(DaemonEvent::CaConnEvent(item.0, item.1)).await { Ok(_) => {} - Err(e) => break, + Err(e) => { + error!("{e}"); + break; + } } } } }); - let qu_addr_insert = { - const TEXT: tokio_postgres::types::Type = tokio_postgres::types::Type::TEXT; - let sql = "insert into ioc_by_channel_log (facility, channel, responseaddr, addr) values ($1, $2, $3, $4)"; - let res = pg_client - .prepare_typed(sql, &[TEXT, TEXT, TEXT, TEXT]) - .await - .map_err(|e| Error::from(e.to_string()))?; - res - }; - let insert_scylla_sessions = 1; let insert_worker_count = 1000; let use_rate_limit_queue = false; @@ -357,40 +428,9 @@ impl Daemon { ) .await?; - let (ioc_addr_inserter_tx, ioc_addr_inserter_rx) = async_channel::bounded(64); - let fut = { - let pg_client = pg_client.clone(); - let qu_addr_insert = qu_addr_insert.clone(); - async move { - let mut stream = ioc_addr_inserter_rx - .map(|k: AddrInsertJob| { - let pgc = pg_client.clone(); - let qu_addr_insert = qu_addr_insert.clone(); - async move { - let response_addr = k.response_addr.to_string(); - let addr = k.addr.to_string(); - let res = pgc - .execute(&qu_addr_insert, &[&k.backend, &k.channel.id(), &response_addr, &addr]) - .await - .map_err(|e| Error::from(e.to_string()))?; - Ok::<_, Error>(res) - } - }) - .buffer_unordered(16); - while let Some(item) = stream.next().await { - match item { - Ok(_) => {} - Err(e) => { - error!("{e}"); - } - } - } - } - }; - let ioc_addr_inserter_task = tokio::spawn(fut); - let ret = Self { opts, + connection_states: BTreeMap::new(), channel_states: BTreeMap::new(), tx, rx, @@ -402,31 +442,220 @@ impl Daemon { insert_queue_counter, count_unknown_address: 0, count_search_pending: 0, + count_search_sent: 0, count_no_address: 0, count_unassigned: 0, count_assigned: 0, last_status_print: SystemTime::now(), insert_workers_jh: jh_insert_workers, pg_client, - qu_addr_insert, - ioc_addr_inserter_jh: ioc_addr_inserter_task, - ioc_addr_inserter_tx, ingest_commons, caconn_last_channel_check: Instant::now(), }; Ok(ret) } - fn start_finder(tx: Sender, tgts: Vec) -> (Sender, tokio::task::JoinHandle<()>) { + fn start_finder( + tx: Sender, + backend: String, + pgcs: Vec>, + ) -> (Sender, tokio::task::JoinHandle<()>) { + fn transform_pgres(rows: Vec) -> VecDeque { + let mut ret = VecDeque::new(); + for row in rows { + let ch: Result = row.try_get(0); + if let Ok(ch) = ch { + if let Some(addr) = row.get::<_, Option>(1) { + let addr = addr.parse().map_or(None, |x| Some(x)); + let item = FindIocRes { + channel: ch, + response_addr: None, + addr, + dt: Duration::from_millis(0), + }; + ret.push_back(item); + } else { + let item = FindIocRes { + channel: ch, + response_addr: None, + addr: None, + dt: Duration::from_millis(0), + }; + ret.push_back(item); + } + } else if let Err(e) = ch { + error!("bad string from pg: {e:?}"); + } + } + ret + } + let (qtx, qrx) = async_channel::bounded(CURRENT_SEARCH_PENDING_MAX); + let fut = async move { + let (batch_tx, batch_rx) = async_channel::bounded(SEARCH_DB_PIPELINE_LEN); + let fut2 = async move { + let mut batch_ix = 0 as usize; + let mut all = Vec::new(); + let mut do_emit = false; + loop { + if do_emit { + do_emit = false; + let batch = std::mem::replace(&mut all, Vec::new()); + let n = batch.len(); + trace_batch!("--- BATCH TRY SEND"); + match batch_tx.send((batch_ix, batch)).await { + Ok(_) => { + trace_batch!("--- BATCH SEND DONE"); + batch_ix += 1; + SEARCH_REQ_BATCH_SEND_COUNT.fetch_add(n, atomic::Ordering::AcqRel); + } + Err(e) => { + error!("can not send batch"); + all = e.0 .1; + } + } + } + match tokio::time::timeout(Duration::from_millis(200), qrx.recv()).await { + Ok(k) => match k { + Ok(item) => { + SEARCH_REQ_RECV_COUNT.fetch_add(1, atomic::Ordering::AcqRel); + all.push(item); + if all.len() >= SEARCH_BATCH_MAX { + do_emit = true; + } + } + Err(e) => { + error!("error in batcher, no more input {e}"); + break; + } + }, + Err(e) => { + let _e: tokio::time::error::Elapsed = e; + if all.len() > 0 { + do_emit = true; + } + } + } + } + warn!("-------- batcher is done --------------"); + }; + tokio::spawn(fut2); + let (pgc_tx, pgc_rx) = async_channel::bounded(128); + for pgc in pgcs { + let sql = "with q1 as (select * from unnest($2::text[]) as unn (ch)) select distinct on (tt.facility, tt.channel) tt.channel, tt.addr from ioc_by_channel_log tt join q1 on tt.channel = q1.ch and tt.facility = $1 and tt.addr is not null order by tt.facility, tt.channel, tsmod desc"; + let qu_select_multi = pgc.prepare(sql).await.unwrap(); + let qu_select_multi = Arc::new(qu_select_multi); + match pgc_tx.send((pgc, qu_select_multi)).await { + Ok(_) => {} + Err(e) => { + error!("can not enqueue pgc {e}"); + } + } + } + let backend = Arc::new(backend.clone()); + let stream = batch_rx + .map(|(batch_ix, batch): (usize, Vec)| { + let pgc_tx = pgc_tx.clone(); + let pgc_rx = pgc_rx.clone(); + let backend = backend.clone(); + SEARCH_REQ_BATCH_RECV_COUNT.fetch_add(batch.len(), atomic::Ordering::AcqRel); + async move { + let ts1 = Instant::now(); + let (pgc, qu_select_multi) = pgc_rx.recv().await.unwrap(); + debug_batch!("run query batch {} len {}", batch_ix, batch.len()); + let qres = pgc.query(qu_select_multi.as_ref(), &[backend.as_ref(), &batch]).await; + let dt = ts1.elapsed(); + debug_batch!( + "done query batch {} len {}: {} {:.3}ms", + batch_ix, + batch.len(), + qres.is_ok(), + dt.as_secs_f32() * 1e3 + ); + if dt > Duration::from_millis(5000) { + let mut out = String::from("["); + for s in &batch { + if out.len() > 1 { + out.push_str(", "); + } + out.push('\''); + out.push_str(s); + out.push('\''); + } + out.push(']'); + eprintln!("VERY LONG QUERY batch_ix {batch_ix}\n{out}"); + } + pgc_tx.send((pgc, qu_select_multi)).await.unwrap(); + (batch_ix, batch, qres) + } + }) + .buffer_unordered(SEARCH_DB_PIPELINE_LEN); + let mut resdiff = 0; + let mut stream = Box::pin(stream); + while let Some((batch_ix, batch, pgres)) = stream.next().await { + match pgres { + Ok(rows) => { + if rows.len() > batch.len() { + error!("MORE RESULTS THAN INPUT"); + } else if rows.len() < batch.len() { + resdiff += batch.len() - rows.len(); + } + let nbatch = batch.len(); + trace_batch!("received results {} resdiff {}", rows.len(), resdiff); + SEARCH_RES_0_COUNT.fetch_add(rows.len(), atomic::Ordering::AcqRel); + let items = transform_pgres(rows); + let names: HashMap<_, _> = items.iter().map(|x| (&x.channel, true)).collect(); + let mut to_add = Vec::new(); + for s in batch { + if !names.contains_key(&s) { + let item = FindIocRes { + channel: s, + response_addr: None, + addr: None, + dt: Duration::from_millis(0), + }; + to_add.push(item); + } + } + SEARCH_RES_1_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel); + SEARCH_RES_2_COUNT.fetch_add(to_add.len(), atomic::Ordering::AcqRel); + let mut items = items; + items.extend(to_add.into_iter()); + if items.len() != nbatch { + error!("STILL NOT MATCHING LEN"); + } + SEARCH_RES_3_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel); + debug_batch!("TRY SEND batch_ix {batch_ix}"); + let x = tx.send(DaemonEvent::SearchDone(Ok(items))).await; + match x { + Ok(_) => { + debug_batch!("DONE SEND batch_ix {batch_ix}"); + } + Err(e) => { + error!("finder sees: {e}"); + break; + } + } + } + Err(e) => { + error!("finder sees error: {e}"); + tokio::time::sleep(Duration::from_millis(1000)).await; + } + } + } + }; + let jh = taskrun::spawn(fut); + (qtx, jh) + } + + #[allow(unused)] + fn start_finder_ca( + tx: Sender, + tgts: Vec, + ) -> (Sender, tokio::task::JoinHandle<()>) { let (qtx, qrx) = async_channel::bounded(32); let (atx, arx) = async_channel::bounded(32); let ioc_finder_fut = async move { - let mut finder = FindIocStream::new( - tgts, - Duration::from_millis(FINDER_TIMEOUT as u64), - FINDER_IN_FLIGHT_MAX, - FINDER_BATCH_SIZE, - ); + let mut finder = FindIocStream::new(tgts, FINDER_TIMEOUT, FINDER_IN_FLIGHT_MAX, FINDER_BATCH_SIZE); let fut_tick_dur = Duration::from_millis(100); let mut finder_more = true; let mut finder_fut = OptFut::new(finder.next()); @@ -435,14 +664,11 @@ impl Daemon { let mut fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur)); let mut asend = OptFut::empty(); loop { - //tokio::time::sleep(Duration::from_millis(1)).await; tokio::select! { _ = &mut asend, if asend.is_enabled() => { - //info!("finder asend done"); asend = OptFut::empty(); } r1 = &mut finder_fut, if finder_fut.is_enabled() => { - //info!("finder fut1"); finder_fut = OptFut::empty(); match r1 { Some(item) => { @@ -454,7 +680,6 @@ impl Daemon { finder_more = false; } } - //info!("finder.job_queue_len() {}", finder.job_queue_len()); if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX { qrx_fut = OptFut::new(qrx.recv()); } @@ -464,11 +689,9 @@ impl Daemon { fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur)); } r2 = &mut qrx_fut, if qrx_fut.is_enabled() => { - //info!("finder fut2"); qrx_fut = OptFut::empty(); match r2 { Ok(item) => { - //info!("Push to finder: {item:?}"); finder.push(item); } Err(e) => { @@ -477,7 +700,6 @@ impl Daemon { qrx_more = false; } } - //info!("finder.job_queue_len() {}", finder.job_queue_len()); if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX { qrx_fut = OptFut::new(qrx.recv()); } @@ -489,7 +711,6 @@ impl Daemon { fut_tick = Box::pin(tokio::time::sleep(fut_tick_dur)); } _ = &mut fut_tick => { - //info!("finder fut_tick finder.job_queue_len() {}", finder.job_queue_len()); if qrx_more && finder.job_queue_len() < FINDER_JOB_QUEUE_LEN_MAX { qrx_fut = OptFut::new(qrx.recv()); } @@ -511,7 +732,6 @@ impl Daemon { taskrun::spawn({ async move { while let Ok(item) = arx.recv().await { - //info!("forward search result item"); match tx.send(DaemonEvent::SearchDone(item)).await { Ok(_) => {} Err(e) => { @@ -526,9 +746,15 @@ impl Daemon { } async fn check_chans(&mut self) -> Result<(), Error> { - let tsnow = SystemTime::now(); - let k = self.chan_check_next.take(); - trace!("------------ check_chans start at {:?}", k); + { + let tsnow = Instant::now(); + for (k, v) in &self.connection_states { + // TODO check for delta t since last issued status command. + if tsnow.duration_since(v.last_feedback) > Duration::from_millis(20000) { + error!("CaConn without status feedback {k:?}"); + } + } + } let mut currently_search_pending = 0; { for (_ch, st) in &self.channel_states { @@ -537,40 +763,40 @@ impl Daemon { } } } + let k = self.chan_check_next.take(); + trace!("------------ check_chans start at {:?}", k); let it = if let Some(last) = k { self.channel_states.range_mut(last..) } else { self.channel_states.range_mut(..) }; + let tsnow = SystemTime::now(); for (i, (ch, st)) in it.enumerate() { use ActiveChannelState::*; use ChannelStateValue::*; match &mut st.value { Active(st2) => match st2 { - UnknownAddress => { - //info!("UnknownAddress {} {:?}", i, ch); - if currently_search_pending < CURRENT_SEARCH_PENDING_MAX { - currently_search_pending += 1; - if st.pending_op.is_none() { - st.pending_op = Some(ChanOp::Finder(ch.id().to_string(), tsnow)); - st.value = Active(SearchPending { since: tsnow }); + UnknownAddress { since } => { + let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); + if dt > UNKNOWN_ADDRESS_STAY { + //info!("UnknownAddress {} {:?}", i, ch); + if currently_search_pending < CURRENT_SEARCH_PENDING_MAX { + currently_search_pending += 1; + st.value = Active(SearchPending { + since: tsnow, + did_send: false, + }); + SEARCH_REQ_MARK_COUNT.fetch_add(1, atomic::Ordering::AcqRel); } } } - SearchPending { since } => { + SearchPending { since, did_send: _ } => { //info!("SearchPending {} {:?}", i, ch); - // TODO handle Err - match tsnow.duration_since(*since) { - Ok(dt) => { - if dt >= Duration::from_millis(SEARCH_PENDING_TIMEOUT as u64) { - debug!("Search timeout for {ch:?}"); - st.value = Active(ActiveChannelState::NoAddress); - currently_search_pending -= 1; - } - } - Err(e) => { - error!("SearchPending {e}"); - } + let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); + if dt > SEARCH_PENDING_TIMEOUT { + info!("Search timeout for {ch:?}"); + st.value = Active(ActiveChannelState::NoAddress { since: tsnow }); + currently_search_pending -= 1; } } WithAddress { addr, state } => { @@ -578,55 +804,30 @@ impl Daemon { use WithAddressState::*; match state { Unassigned { assign_at } => { - if *assign_at <= tsnow { - if st.pending_op.is_none() { - if self.ingest_commons.ca_conn_set.has_addr(&SocketAddr::V4(*addr)).await { - } else { - debug!("==================== create CaConn for {ch:?}"); - let backend = self.opts.backend().into(); - let local_epics_hostname = self.opts.local_epics_hostname.clone(); - let array_truncate = self.opts.array_truncate; - let insert_item_queue_sender = self.common_insert_item_queue.sender().await; - let insert_queue_max = 256; - let data_store = self.datastore.clone(); - let with_channels = Vec::new(); - // TODO want to atomically use or create a connection. - // TODO creating a connection may block too long, because it establishes TCP first. - self.ingest_commons - .ca_conn_set - .create_ca_conn( - backend, - *addr, - local_epics_hostname, - array_truncate, - insert_queue_max, - insert_item_queue_sender, - data_store, - with_channels, - ) - .await?; - } - { - let backend = self.opts.backend().into(); - let channel_name = ch.id().into(); - let ingest_commons = self.ingest_commons.clone(); - // This operation is meant to complete very quickly - self.ingest_commons - .ca_conn_set - .add_channel_to_addr( - backend, - SocketAddr::V4(*addr), - channel_name, - ingest_commons, - ) - .await?; - let cs = ConnectionState { - updated: tsnow, - value: ConnectionStateValue::Unconnected, - }; - *state = WithAddressState::Assigned(cs) - } - } + if DO_ASSIGN_TO_CA_CONN && *assign_at <= tsnow { + let backend = self.opts.backend().into(); + let channel_name = ch.id().into(); + // This operation is meant to complete very quickly + self.ingest_commons + .ca_conn_set + .add_channel_to_addr( + backend, + *addr, + channel_name, + &self.common_insert_item_queue, + &self.datastore, + CA_CONN_INSERT_QUEUE_MAX, + self.opts.array_truncate, + self.opts.local_epics_hostname.clone(), + ) + .slow_warn(2000) + .instrument(info_span!("add_channel_to_addr")) + .await?; + let cs = ConnectionState { + updated: tsnow, + value: ConnectionStateValue::Unconnected, + }; + *state = WithAddressState::Assigned(cs); } } Assigned(_) => { @@ -634,9 +835,11 @@ impl Daemon { } } } - NoAddress => { - // TODO try to find address again after some randomized timeout - //info!("NoAddress {} {:?}", i, ch); + NoAddress { since } => { + let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO); + if dt > NO_ADDRESS_STAY { + st.value = Active(ActiveChannelState::UnknownAddress { since: tsnow }); + } } }, ToRemove { .. } => { @@ -649,69 +852,43 @@ impl Daemon { } } for (ch, st) in &mut self.channel_states { - match &mut st.pending_op { - Some(op) => match op { - ChanOp::Finder(s, start) => { - if *start + Duration::from_millis(10000) >= tsnow { - match self.search_tx.try_send(s.clone()) { - Ok(_) => { - *start = tsnow; - st.pending_op = None; - } - Err(e) => match e { - async_channel::TrySendError::Full(_) => { - //warn!("Finder channel full"); - *start = tsnow; - } - async_channel::TrySendError::Closed(_) => { - error!("Finder channel closed"); - } - }, - } - } else { - st.pending_op = None; - warn!("ChanOp::Finder send timeout for {ch:?}"); - *st = ChannelState { - value: ChannelStateValue::Active(ActiveChannelState::UnknownAddress), - pending_op: None, - }; + if let ChannelStateValue::Active(ActiveChannelState::SearchPending { since: _, did_send }) = &mut st.value { + if *did_send == false { + match self.search_tx.try_send(ch.id().into()) { + Ok(()) => { + *did_send = true; + SEARCH_REQ_SEND_COUNT.fetch_add(1, atomic::Ordering::AcqRel); } - } - ChanOp::ConnCmd(fut) => { - use std::task::Poll::*; - match futures_util::poll!(fut) { - Ready(res) => { - st.pending_op = None; - match res { - Ok(_) => { - debug!("ChanOp::ConnCmd completed fine"); - } - Err(e) => { - error!("ChanOp::ConnCmd {e}"); - } - } + Err(e) => match e { + async_channel::TrySendError::Full(_) => {} + async_channel::TrySendError::Closed(_) => { + error!("Finder channel closed"); + // TODO recover from this. + panic!(); } - Pending => {} - } + }, } - }, - None => {} + } } } { self.count_unknown_address = 0; self.count_search_pending = 0; + self.count_search_sent = 0; self.count_no_address = 0; self.count_unassigned = 0; self.count_assigned = 0; for (_ch, st) in &self.channel_states { match &st.value { ChannelStateValue::Active(st) => match st { - ActiveChannelState::UnknownAddress => { + ActiveChannelState::UnknownAddress { .. } => { self.count_unknown_address += 1; } - ActiveChannelState::SearchPending { .. } => { + ActiveChannelState::SearchPending { did_send, .. } => { self.count_search_pending += 1; + if *did_send { + self.count_search_sent += 1; + } } ActiveChannelState::WithAddress { state, .. } => match state { WithAddressState::Unassigned { .. } => { @@ -721,7 +898,7 @@ impl Daemon { self.count_assigned += 1; } }, - ActiveChannelState::NoAddress => { + ActiveChannelState::NoAddress { .. } => { self.count_no_address += 1; } }, @@ -733,38 +910,66 @@ impl Daemon { } async fn check_caconn_chans(&mut self) -> Result<(), Error> { - if self.caconn_last_channel_check.elapsed() > Duration::from_millis(5000) { + if self.caconn_last_channel_check.elapsed() > CHANNEL_CHECK_INTERVAL { info!("Issue channel check to all CaConn"); - let res = self - .ingest_commons + self.ingest_commons .ca_conn_set - .send_command_to_all(|| ConnCommand::check_channels_alive()) + .enqueue_command_to_all(|| ConnCommand::check_health()) .await?; - for x in res { - if !x { - warn!("bad check_channels_alive result"); - } - } self.caconn_last_channel_check = Instant::now(); } Ok(()) } async fn handle_timer_tick(&mut self) -> Result<(), Error> { + let ts1 = Instant::now(); let tsnow = SystemTime::now(); + if SIGINT.load(atomic::Ordering::Acquire) == 1 { + warn!("Received SIGINT"); + SIGINT.store(2, atomic::Ordering::Release); + } + if SIGTERM.load(atomic::Ordering::Acquire) == 1 { + warn!("Received SIGTERM"); + SIGTERM.store(2, atomic::Ordering::Release); + } self.check_chans().await?; + let dt = ts1.elapsed(); + if dt > Duration::from_millis(500) { + info!("slow check_chans {}ms", dt.as_secs_f32() * 1e3); + } + let ts1 = Instant::now(); self.check_caconn_chans().await?; + let dt = ts1.elapsed(); + if dt > Duration::from_millis(500) { + info!("slow check_chans {}ms", dt.as_secs_f32() * 1e3); + } if tsnow.duration_since(self.last_status_print).unwrap_or(Duration::ZERO) >= Duration::from_millis(1000) { self.last_status_print = tsnow; info!( - "{:8} {:8} {:8} : {:8} {:8} : {:10}", + "{:8} {:8} {:8} : {:8} : {:8} {:8} : {:10}", self.count_unknown_address, self.count_search_pending, + self.count_search_sent, self.count_no_address, self.count_unassigned, self.count_assigned, self.insert_queue_counter.load(atomic::Ordering::Acquire), ); + if false { + info!( + "{:5} {:5} {:5} {:5} {:5} {:5} {:5} {:5} {:5} {:5}", + SEARCH_REQ_MARK_COUNT.load(atomic::Ordering::Acquire), + SEARCH_REQ_SEND_COUNT.load(atomic::Ordering::Acquire), + SEARCH_REQ_RECV_COUNT.load(atomic::Ordering::Acquire), + SEARCH_REQ_BATCH_SEND_COUNT.load(atomic::Ordering::Acquire), + SEARCH_REQ_BATCH_RECV_COUNT.load(atomic::Ordering::Acquire), + SEARCH_RES_0_COUNT.load(atomic::Ordering::Acquire), + SEARCH_RES_1_COUNT.load(atomic::Ordering::Acquire), + SEARCH_RES_2_COUNT.load(atomic::Ordering::Acquire), + SEARCH_RES_3_COUNT.load(atomic::Ordering::Acquire), + SEARCH_ANS_COUNT.load(atomic::Ordering::Acquire), + ); + } } Ok(()) } @@ -772,8 +977,9 @@ impl Daemon { fn handle_channel_add(&mut self, ch: Channel) -> Result<(), Error> { if !self.channel_states.contains_key(&ch) { let st = ChannelState { - value: ChannelStateValue::Active(ActiveChannelState::UnknownAddress), - pending_op: None, + value: ChannelStateValue::Active(ActiveChannelState::UnknownAddress { + since: SystemTime::now(), + }), }; self.channel_states.insert(ch, st); } @@ -784,7 +990,7 @@ impl Daemon { if let Some(k) = self.channel_states.get_mut(&ch) { match &k.value { ChannelStateValue::Active(j) => match j { - ActiveChannelState::UnknownAddress => { + ActiveChannelState::UnknownAddress { .. } => { k.value = ChannelStateValue::ToRemove { addr: None }; } ActiveChannelState::SearchPending { .. } => { @@ -795,7 +1001,7 @@ impl Daemon { addr: Some(addr.clone()), }; } - ActiveChannelState::NoAddress => { + ActiveChannelState::NoAddress { .. } => { k.value = ChannelStateValue::ToRemove { addr: None }; } }, @@ -807,16 +1013,19 @@ impl Daemon { async fn handle_search_done(&mut self, item: Result, Error>) -> Result<(), Error> { //debug!("handle SearchDone: {res:?}"); + let tsnow = SystemTime::now(); match item { - Ok(a) => { - for res in a { + Ok(ress) => { + SEARCH_ANS_COUNT.fetch_add(ress.len(), atomic::Ordering::AcqRel); + for res in ress { if let Some(addr) = &res.addr { - let addr = addr.clone(); let ch = Channel::new(res.channel); if let Some(st) = self.channel_states.get_mut(&ch) { - if let ChannelStateValue::Active(ActiveChannelState::SearchPending { since }) = &st.value { - let dt = SystemTime::now().duration_since(*since).unwrap(); - if dt > Duration::from_millis(TIMEOUT_WARN as u64) { + if let ChannelStateValue::Active(ActiveChannelState::SearchPending { since, did_send: _ }) = + &st.value + { + let dt = tsnow.duration_since(*since).unwrap(); + if dt > SEARCH_PENDING_TIMEOUT_WARN { warn!( " FOUND {:5.0} {:5.0} {addr}", 1e3 * dt.as_secs_f32(), @@ -824,21 +1033,10 @@ impl Daemon { ); } let stnew = ChannelStateValue::Active(ActiveChannelState::WithAddress { - addr, - state: WithAddressState::Unassigned { - assign_at: SystemTime::now(), - }, + addr: addr.clone(), + state: WithAddressState::Unassigned { assign_at: tsnow }, }); st.value = stnew; - if let (Some(response_addr),) = (res.response_addr,) { - let job = AddrInsertJob { - backend: self.opts.backend().into(), - channel: ch.clone(), - response_addr: response_addr, - addr: addr, - }; - self.ioc_addr_inserter_tx.send(job).await?; - } } else { warn!( "address found, but state for {ch:?} is not SearchPending: {:?}", @@ -852,16 +1050,18 @@ impl Daemon { //debug!("no addr from search in {res:?}"); let ch = Channel::new(res.channel); if let Some(st) = self.channel_states.get_mut(&ch) { - if let ChannelStateValue::Active(ActiveChannelState::SearchPending { since }) = &st.value { - let dt = SystemTime::now().duration_since(*since).unwrap(); - if dt > Duration::from_millis(TIMEOUT_WARN as u64) { + if let ChannelStateValue::Active(ActiveChannelState::SearchPending { since, did_send: _ }) = + &st.value + { + let dt = tsnow.duration_since(*since).unwrap(); + if dt > SEARCH_PENDING_TIMEOUT_WARN { warn!( "NOT FOUND {:5.0} {:5.0}", 1e3 * dt.as_secs_f32(), 1e3 * res.dt.as_secs_f32() ); } - st.value = ChannelStateValue::Active(ActiveChannelState::NoAddress); + st.value = ChannelStateValue::Active(ActiveChannelState::NoAddress { since: tsnow }); } else { warn!("no address, but state for {ch:?} is not SearchPending: {:?}", st.value); } @@ -878,14 +1078,52 @@ impl Daemon { Ok(()) } - async fn handle_event(&mut self, item: DaemonEvent) -> Result<(), Error> { + async fn handle_ca_conn_done(&mut self, conn_addr: SocketAddrV4) -> Result<(), Error> { + info!("handle_ca_conn_done {conn_addr:?}"); + self.connection_states.remove(&conn_addr); + for (k, v) in self.channel_states.iter_mut() { + match &v.value { + ChannelStateValue::Active(st2) => match st2 { + ActiveChannelState::UnknownAddress { .. } => {} + ActiveChannelState::SearchPending { since: _, did_send: _ } => {} + ActiveChannelState::WithAddress { addr, state: _ } => { + if addr == &conn_addr { + info!("ca conn down, reset {k:?}"); + *v = ChannelState { + value: ChannelStateValue::Active(ActiveChannelState::UnknownAddress { + since: SystemTime::now(), + }), + }; + } + } + ActiveChannelState::NoAddress { .. } => {} + }, + ChannelStateValue::ToRemove { addr: _ } => {} + } + } + Ok(()) + } + + async fn handle_event(&mut self, item: DaemonEvent, ticker_inp_tx: &Sender) -> Result<(), Error> { use DaemonEvent::*; - match item { - TimerTick => self.handle_timer_tick().await, + let ts1 = Instant::now(); + let item_summary = item.summary(); + let ret = match item { + TimerTick => { + let ret = self.handle_timer_tick().await; + match ticker_inp_tx.send(42).await { + Ok(_) => {} + Err(_) => { + error!("can not send ticker token"); + return Err(Error::with_msg_no_trace("can not send ticker token")); + } + } + ret + } ChannelAdd(ch) => self.handle_channel_add(ch), ChannelRemove(ch) => self.handle_channel_remove(ch), SearchDone(item) => self.handle_search_done(item).await, - CaConnEvent(item) => { + CaConnEvent(addr, item) => { use netfetch::ca::conn::CaConnEventValue::*; match item.value { None => { @@ -896,30 +1134,56 @@ impl Daemon { error!("TODO on EchoTimeout remove the CaConn and reset channels"); Ok(()) } + HealthCheckDone => { + if let Some(st) = self.connection_states.get_mut(&addr) { + st.last_feedback = Instant::now(); + Ok(()) + } else { + error!("received HealthCheckDone for unknown CaConn"); + // TODO + Ok(()) + } + } + ConnCommandResult(item) => { + info!("TODO handle ConnCommandResult {item:?}"); + Ok(()) + } + EndOfStream => self.handle_ca_conn_done(addr).await, } } + }; + let dt = ts1.elapsed(); + if dt > Duration::from_millis(200) { + warn!("handle_event slow {}ms {}", dt.as_secs_f32() * 1e3, item_summary); } + ret } pub async fn daemon(&mut self) -> Result<(), Error> { + let (ticker_inp_tx, ticker_inp_rx) = async_channel::bounded::(1); let ticker = { let tx = self.tx.clone(); async move { - let mut ticker = tokio::time::interval(Duration::from_millis(100)); - ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { - ticker.tick().await; + tokio::time::sleep(Duration::from_millis(100)).await; if let Err(e) = tx.send(DaemonEvent::TimerTick).await { error!("can not send TimerTick {e}"); break; } + let c = ticker_inp_rx.len().max(1); + for _ in 0..c { + match ticker_inp_rx.recv().await { + Ok(_) => {} + Err(_) => break, + } + } } } }; taskrun::spawn(ticker); loop { match self.rx.recv().await { - Ok(item) => match self.handle_event(item).await { + Ok(item) => match self.handle_event(item, &ticker_inp_tx).await { Ok(_) => {} Err(e) => { error!("daemon: {e}"); @@ -936,19 +1200,28 @@ impl Daemon { let _ = &self.ioc_finder_jh; warn!("TODO wait for insert workers"); let _ = &self.insert_workers_jh; - let _ = &self.ioc_addr_inserter_jh; + info!("daemon done"); Ok(()) } } +static SIGINT: AtomicUsize = AtomicUsize::new(0); +static SIGTERM: AtomicUsize = AtomicUsize::new(0); + +fn handler_sigint(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) { + SIGINT.store(1, atomic::Ordering::Release); + let _ = netfetch::linuxhelper::unset_signal_handler(libc::SIGINT); +} + +fn handler_sigterm(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) { + SIGTERM.store(1, atomic::Ordering::Release); + let _ = netfetch::linuxhelper::unset_signal_handler(libc::SIGTERM); +} + pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> { info!("start up {opts:?}"); - let mut search_tgts = Vec::new(); - for s in opts.search() { - let addr: SocketAddrV4 = s.parse()?; - search_tgts.push(addr); - } - info!("parsed search_tgts {search_tgts:?}"); + netfetch::linuxhelper::set_signal_handler(libc::SIGINT, handler_sigint)?; + netfetch::linuxhelper::set_signal_handler(libc::SIGTERM, handler_sigterm)?; let opts2 = DaemonOpts { backend: opts.backend().into(), local_epics_hostname: opts.local_epics_hostname().into(), @@ -956,7 +1229,6 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> insert_item_queue_cap: opts.insert_item_queue_cap(), pgconf: opts.postgresql().clone(), scyconf: opts.scylla().clone(), - search_tgts, ttls: Ttls { index: opts.ttl_index(), d0: opts.ttl_d0(), diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 434d58e..16c19c4 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -14,6 +14,7 @@ serde_cbor = "0.11" serde_yaml = "0.9.16" tokio = { version = "1.23.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs", "tracing"] } tokio-stream = { version = "0.1", features = ["fs"]} +tracing = "0.1.37" async-channel = "1.6" bytes = "1.3" arrayref = "0.3" diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 3c1d7da..56b7bbb 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -6,25 +6,31 @@ pub mod search; pub mod store; use self::store::DataStore; -use crate::ca::conn::ConnCommand; use crate::ca::connset::CaConnSet; use crate::conf::CaIngestOpts; use crate::errconv::ErrConv; use crate::insertworker::spawn_scylla_insert_workers; -use crate::metrics::{metrics_agg_task, ExtraInsertsConf}; +use crate::metrics::metrics_agg_task; +use crate::metrics::ExtraInsertsConf; use crate::rt::TokMx; use crate::store::CommonInsertItemQueue; use err::Error; use futures_util::stream::FuturesUnordered; +use futures_util::Future; use futures_util::{FutureExt, StreamExt}; use log::*; use netpod::Database; -use stats::{CaConnStats, CaConnStatsAgg}; +use stats::CaConnStats; +use stats::CaConnStatsAgg; use std::collections::BTreeMap; -use std::net::{SocketAddr, SocketAddrV4}; +use std::net::SocketAddrV4; +use std::pin::Pin; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; +use std::sync::Mutex; +use std::task::Poll; use std::time::Duration; +use std::time::Instant; use tokio_postgres::Client as PgClient; pub static SIGINT: AtomicU32 = AtomicU32::new(0); @@ -46,6 +52,92 @@ pub struct IngestCommons { pub ca_conn_set: CaConnSet, } +pub trait SlowWarnable { + fn slow_warn(self, ms: u64) -> SlowWarn>> + where + Self: Sized; +} + +impl SlowWarnable for F +where + F: Future, +{ + fn slow_warn(self, ms: u64) -> SlowWarn>> + where + Self: Sized, + { + SlowWarn::new(ms, Box::pin(self)) + } +} + +pub struct SlowWarn { + ms: u64, + fut: F, + timeout: Option>>>, + first_poll: Option, +} + +impl SlowWarn +where + F: Future + Unpin, +{ + pub fn new(ms: u64, fut: F) -> Self { + Self { + ms, + fut, + timeout: None, + first_poll: None, + } + } + + fn poll_fut(mut self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<::Output> { + use Poll::*; + match self.fut.poll_unpin(cx) { + Ready(x) => { + if let Some(None) = &self.timeout { + let dt = self.first_poll.take().unwrap().elapsed(); + warn!("--------- Completed in {}ms ----------", dt.as_secs_f32()); + } + Ready(x) + } + Pending => Pending, + } + } +} + +impl Future for SlowWarn +where + F: Future + Unpin, +{ + type Output = ::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll { + use Poll::*; + if self.first_poll.is_none() { + self.first_poll = Some(Instant::now()); + } + let a = self.timeout.as_mut(); + match a { + Some(x) => match x { + Some(x) => match x.poll_unpin(cx) { + Ready(()) => { + warn!("---------------- SlowWarn ---------------------"); + self.timeout = Some(None); + Self::poll_fut(self, cx) + } + Pending => Self::poll_fut(self, cx), + }, + None => Self::poll_fut(self, cx), + }, + None => { + self.timeout = Some(Some(Box::pin(tokio::time::sleep(Duration::from_millis(self.ms))))); + cx.waker().wake_by_ref(); + Self::poll_fut(self, cx) + } + } + } +} + pub async fn find_channel_addr( backend: String, name: String, @@ -137,8 +229,13 @@ async fn query_addr_multiple(pg_client: &PgClient) -> Result<(), Error> { Ok(()) } +fn handler_sigaction(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) { + crate::ca::SIGINT.store(1, Ordering::Release); + let _ = crate::linuxhelper::unset_signal_handler(libc::SIGINT); +} + pub async fn ca_connect(opts: CaIngestOpts, channels: &Vec) -> Result<(), Error> { - crate::linuxhelper::set_signal_handler()?; + crate::linuxhelper::set_signal_handler(libc::SIGINT, handler_sigaction)?; let extra_inserts_conf = TokMx::new(ExtraInsertsConf { copies: Vec::new() }); let insert_ivl_min = Arc::new(AtomicU64::new(8800)); let scyconf = opts.scylla().clone(); @@ -250,9 +347,13 @@ pub async fn ca_connect(opts: CaIngestOpts, channels: &Vec) -> Result<() .ca_conn_set .add_channel_to_addr( opts.backend().into(), - SocketAddr::V4(addr.clone()), + *addr, ch.clone(), - ingest_commons.clone(), + &ingest_commons.insert_item_queue, + &ingest_commons.data_store, + opts.insert_queue_max(), + opts.array_truncate(), + opts.local_epics_hostname(), ) .await?; } @@ -263,33 +364,6 @@ pub async fn ca_connect(opts: CaIngestOpts, channels: &Vec) -> Result<() } info!("channels_by_host len {}", channels_by_host.len()); - // Periodic tasks triggered by commands: - let mut iper = 0; - loop { - if SIGINT.load(Ordering::Acquire) != 0 { - break; - } - // TODO remove magic number, make adaptive: - if ingest_commons.insert_item_queue.receiver().len() < 10000 { - let addr = ingest_commons.ca_conn_set.addr_nth_mod(iper).await; - if let Some(addr) = addr { - //info!("channel info for addr {addr}"); - fn cmdgen() -> (ConnCommand, async_channel::Receiver) { - ConnCommand::check_channels_alive() - } - // TODO race between getting nth address and command send, so ignore error so far. - let _res = ingest_commons.ca_conn_set.send_command_to_addr(&addr, cmdgen).await; - let cmdgen = || ConnCommand::save_conn_info(); - // TODO race between getting nth address and command send, so ignore error so far. - let _res = ingest_commons.ca_conn_set.send_command_to_addr(&addr, cmdgen).await; - } else { - //info!("nothing to save iper {iper}"); - } - iper += 1; - } - tokio::time::sleep(Duration::from_millis(10)).await; - } - loop { if SIGINT.load(Ordering::Acquire) != 0 { if false { @@ -298,8 +372,8 @@ pub async fn ca_connect(opts: CaIngestOpts, channels: &Vec) -> Result<() let rc = receiver.receiver_count(); info!("item queue senders {} receivers {}", sc, rc); } - info!("sending stop commands"); - ingest_commons.ca_conn_set.send_stop().await?; + error!("TODO sending stop commands"); + //ingest_commons.ca_conn_set.send_stop().await?; break; } tokio::time::sleep(Duration::from_millis(400)).await; @@ -307,14 +381,14 @@ pub async fn ca_connect(opts: CaIngestOpts, channels: &Vec) -> Result<() ingest_commons.ca_conn_set.wait_stopped().await?; info!("all connections done."); - insert_item_queue.drop_sender().await; + insert_item_queue.close(); drop(ingest_commons); metrics_agg_jh.abort(); drop(metrics_agg_jh); if false { - let sender = insert_item_queue.sender_raw().await; + let sender = insert_item_queue.sender_raw(); sender.close(); let receiver = insert_item_queue.receiver(); receiver.close(); diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 1d69b60..3b0939b 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -17,25 +17,34 @@ use crate::store::CommonInsertItemQueueSender; use crate::store::ConnectionStatus; use crate::store::ConnectionStatusItem; use crate::store::{InsertItem, IvlItem, MuteItem, QueryItem}; -use async_channel::Sender; use err::Error; use futures_util::stream::FuturesOrdered; use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt}; use log::*; use netpod::timeunits::*; +use netpod::ScalarType; +use netpod::Shape; use netpod::TS_MSP_GRID_SPACING; use netpod::TS_MSP_GRID_UNIT; -use netpod::{ScalarType, Shape}; use serde::Serialize; -use stats::{CaConnStats, IntervalEma}; -use std::collections::{BTreeMap, VecDeque}; +use stats::CaConnStats; +use stats::IntervalEma; +use std::collections::BTreeMap; +use std::collections::HashMap; +use std::collections::VecDeque; use std::net::SocketAddrV4; use std::ops::ControlFlow; use std::pin::Pin; +use std::sync::atomic; +use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; -use std::task::{Context, Poll}; -use std::time::{Duration, Instant, SystemTime}; +use std::sync::Mutex as StdMutex; +use std::task::Context; +use std::task::Poll; +use std::time::Duration; +use std::time::Instant; +use std::time::SystemTime; use tokio::net::TcpStream; #[derive(Clone, Debug, Serialize)] @@ -44,6 +53,7 @@ pub enum ChannelConnectedInfo { Connecting, Connected, Error, + Ended, } #[derive(Clone, Debug, Serialize)] @@ -142,6 +152,7 @@ enum ChannelState { Creating { cid: Cid, ts_beg: Instant }, Created(CreatedState), Error(ChannelError), + Ended, } impl ChannelState { @@ -151,6 +162,7 @@ impl ChannelState { ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting, ChannelState::Created(_) => ChannelConnectedInfo::Connected, ChannelState::Error(_) => ChannelConnectedInfo::Error, + ChannelState::Ended => ChannelConnectedInfo::Ended, }; let scalar_type = match self { ChannelState::Created(s) => Some(s.scalar_type.clone()), @@ -264,108 +276,75 @@ fn info_store_msp_from_time(ts: SystemTime) -> u32 { #[derive(Debug)] pub enum ConnCommandKind { - FindChannel(String, Sender<(SocketAddrV4, Vec)>), - ChannelState(String, Sender<(SocketAddrV4, Option)>), - ChannelStatesAll((), Sender<(SocketAddrV4, Vec)>), - ChannelAdd(String, Sender), - ChannelRemove(String, Sender), - Shutdown(Sender), - ExtraInsertsConf(ExtraInsertsConf, Sender), - CheckChannelsAlive(Sender), - SaveConnInfo(Sender), + ChannelAdd(String), + ChannelRemove(String), + CheckHealth, + Shutdown, } #[derive(Debug)] pub struct ConnCommand { + id: usize, kind: ConnCommandKind, } impl ConnCommand { - pub fn find_channel(pattern: String) -> (ConnCommand, async_channel::Receiver<(SocketAddrV4, Vec)>) { - let (tx, rx) = async_channel::bounded(1); - let cmd = Self { - kind: ConnCommandKind::FindChannel(pattern, tx), - }; - (cmd, rx) + pub fn channel_add(name: String) -> Self { + Self { + id: Self::make_id(), + kind: ConnCommandKind::ChannelAdd(name), + } } - pub fn channel_state( - name: String, - ) -> ( - ConnCommand, - async_channel::Receiver<(SocketAddrV4, Option)>, - ) { - let (tx, rx) = async_channel::bounded(1); - let cmd = Self { - kind: ConnCommandKind::ChannelState(name, tx), - }; - (cmd, rx) + pub fn channel_remove(name: String) -> Self { + Self { + id: Self::make_id(), + kind: ConnCommandKind::ChannelRemove(name), + } } - pub fn channel_states_all() -> ( - ConnCommand, - async_channel::Receiver<(SocketAddrV4, Vec)>, - ) { - let (tx, rx) = async_channel::bounded(1); - let cmd = Self { - kind: ConnCommandKind::ChannelStatesAll((), tx), - }; - (cmd, rx) + pub fn check_health() -> Self { + Self { + id: Self::make_id(), + kind: ConnCommandKind::CheckHealth, + } } - pub fn channel_add(name: String) -> (ConnCommand, async_channel::Receiver) { - let (tx, rx) = async_channel::bounded(1); - let cmd = Self { - kind: ConnCommandKind::ChannelAdd(name, tx), - }; - (cmd, rx) + pub fn shutdown() -> Self { + Self { + id: Self::make_id(), + kind: ConnCommandKind::Shutdown, + } } - pub fn channel_remove(name: String) -> (ConnCommand, async_channel::Receiver) { - let (tx, rx) = async_channel::bounded(1); - let cmd = Self { - kind: ConnCommandKind::ChannelRemove(name, tx), - }; - (cmd, rx) + fn make_id() -> usize { + static ID: AtomicUsize = AtomicUsize::new(0); + ID.fetch_add(1, atomic::Ordering::AcqRel) } - pub fn shutdown() -> (ConnCommand, async_channel::Receiver) { - let (tx, rx) = async_channel::bounded(1); - let cmd = Self { - kind: ConnCommandKind::Shutdown(tx), - }; - (cmd, rx) + pub fn id(&self) -> usize { + self.id } +} - pub fn extra_inserts_conf_set(k: ExtraInsertsConf) -> (ConnCommand, async_channel::Receiver) { - let (tx, rx) = async_channel::bounded(1); - let cmd = Self { - kind: ConnCommandKind::ExtraInsertsConf(k, tx), - }; - (cmd, rx) - } +#[derive(Debug)] +pub enum ConnCommandResultKind { + CheckHealth, +} - pub fn check_channels_alive() -> (ConnCommand, async_channel::Receiver) { - let (tx, rx) = async_channel::bounded(1); - let cmd = Self { - kind: ConnCommandKind::CheckChannelsAlive(tx), - }; - (cmd, rx) - } - - pub fn save_conn_info() -> (ConnCommand, async_channel::Receiver) { - let (tx, rx) = async_channel::bounded(1); - let cmd = Self { - kind: ConnCommandKind::SaveConnInfo(tx), - }; - (cmd, rx) - } +#[derive(Debug)] +pub struct ConnCommandResult { + id: usize, + kind: ConnCommandResultKind, } #[derive(Debug)] pub enum CaConnEventValue { None, EchoTimeout, + HealthCheckDone, + ConnCommandResult(ConnCommandResult), + EndOfStream, } #[derive(Debug)] @@ -374,6 +353,31 @@ pub struct CaConnEvent { pub value: CaConnEventValue, } +#[derive(Debug)] +pub enum ChannelSetOp { + Add, + Remove, +} + +pub struct ChannelSetOps { + ops: StdMutex>, + flag: AtomicUsize, +} + +impl ChannelSetOps { + pub fn insert(&self, name: String, op: ChannelSetOp) { + match self.ops.lock() { + Ok(mut g) => { + g.insert(name, op); + self.flag.fetch_add(g.len(), atomic::Ordering::AcqRel); + } + Err(e) => { + error!("can not lock {e}"); + } + } + } +} + pub struct CaConn { state: CaConnState, shutdown: bool, @@ -406,6 +410,8 @@ pub struct CaConn { extra_inserts_conf: ExtraInsertsConf, ioc_ping_last: Instant, ioc_ping_start: Option, + cmd_res_queue: VecDeque, + channel_set_ops: Arc, } impl CaConn { @@ -450,144 +456,165 @@ impl CaConn { extra_inserts_conf: ExtraInsertsConf::new(), ioc_ping_last: Instant::now(), ioc_ping_start: None, + cmd_res_queue: VecDeque::new(), + channel_set_ops: Arc::new(ChannelSetOps { + ops: StdMutex::new(BTreeMap::new()), + flag: AtomicUsize::new(0), + }), } } + pub fn get_channel_set_ops_map(&self) -> Arc { + self.channel_set_ops.clone() + } + pub fn conn_command_tx(&self) -> async_channel::Sender { self.conn_command_tx.clone() } - fn handle_conn_command(&mut self, cx: &mut Context) -> Option> { + fn trigger_shutdown(&mut self) { + self.shutdown = true; + for (k, v) in self.channels.iter_mut() { + match v { + ChannelState::Init => { + *v = ChannelState::Ended; + } + ChannelState::Creating { .. } => { + *v = ChannelState::Ended; + } + ChannelState::Created(st) => { + if let Some(series) = &st.series { + let item = QueryItem::ChannelStatus(ChannelStatusItem { + ts: SystemTime::now(), + series: series.clone(), + status: ChannelStatus::Closed, + }); + info!("emit status item {item:?}"); + self.insert_item_queue.push_back(item); + } + *v = ChannelState::Ended; + } + ChannelState::Error(_) => {} + ChannelState::Ended => {} + } + } + } + + fn cmd_check_health(&mut self) { + match self.check_channels_alive() { + Ok(_) => {} + Err(e) => { + error!("{e}"); + self.trigger_shutdown(); + } + } + //self.stats.caconn_command_can_not_reply_inc(); + // TODO return the result + } + + fn cmd_find_channel(&self, pattern: &str) { + let res = if let Ok(re) = regex::Regex::new(&pattern) { + self.name_by_cid + .values() + .filter(|x| re.is_match(x)) + .map(ToString::to_string) + .collect() + } else { + Vec::new() + }; + // TODO return the result + } + + fn cmd_channel_state(&self, name: String) { + let res = match self.cid_by_name.get(&name) { + Some(cid) => match self.channels.get(cid) { + Some(state) => Some(state.to_info(name, self.remote_addr_dbg.clone())), + None => None, + }, + None => None, + }; + let msg = (self.remote_addr_dbg.clone(), res); + if msg.1.is_some() { + info!("Sending back {msg:?}"); + } + // TODO return the result + } + + fn cmd_channel_states_all(&self) { + let res: Vec<_> = self + .channels + .iter() + .map(|(cid, state)| { + let name = self + .name_by_cid + .get(cid) + .map_or("--unknown--".into(), |x| x.to_string()); + state.to_info(name, self.remote_addr_dbg.clone()) + }) + .collect(); + let msg = (self.remote_addr_dbg.clone(), res); + // TODO return the result + } + + fn cmd_channel_add(&mut self, name: String) { + self.channel_add(name); + // TODO return the result + //self.stats.caconn_command_can_not_reply_inc(); + } + + fn cmd_channel_remove(&mut self, name: String) { + self.channel_remove(name); + // TODO return the result + //self.stats.caconn_command_can_not_reply_inc(); + } + + fn cmd_shutdown(&mut self) { + self.trigger_shutdown(); + let res = self.before_reset_of_channel_state(); + self.state = CaConnState::Shutdown; + self.proto = None; + // TODO return the result + } + + fn cmd_extra_inserts_conf(&mut self, extra_inserts_conf: ExtraInsertsConf) { + self.extra_inserts_conf = extra_inserts_conf; + // TODO return the result + } + + fn cmd_save_conn_info(&mut self) { + let res = self.emit_channel_info_insert_items(); + let res = res.is_ok(); + // TODO return the result + } + + fn handle_conn_command(&mut self, cx: &mut Context) -> Poll>> { // TODO if this loops for too long time, yield and make sure we get wake up again. use Poll::*; - loop { - self.stats.caconn_loop3_count_inc(); - match self.conn_command_rx.poll_next_unpin(cx) { - Ready(Some(a)) => match a.kind { - ConnCommandKind::FindChannel(pattern, tx) => { - let res = if let Ok(re) = regex::Regex::new(&pattern) { - self.name_by_cid - .values() - .filter(|x| re.is_match(x)) - .map(ToString::to_string) - .collect() - } else { - Vec::new() - }; - let msg = (self.remote_addr_dbg.clone(), res); - match tx.try_send(msg) { - Ok(_) => {} - Err(_) => { - self.stats.caconn_command_can_not_reply_inc(); - } - } - } - ConnCommandKind::ChannelState(name, tx) => { - let res = match self.cid_by_name.get(&name) { - Some(cid) => match self.channels.get(cid) { - Some(state) => Some(state.to_info(name, self.remote_addr_dbg.clone())), - None => None, - }, - None => None, - }; - let msg = (self.remote_addr_dbg.clone(), res); - if msg.1.is_some() { - info!("Sending back {msg:?}"); - } - match tx.try_send(msg) { - Ok(_) => {} - Err(_) => { - self.stats.caconn_command_can_not_reply_inc(); - } - } - } - ConnCommandKind::ChannelStatesAll((), tx) => { - let res = self - .channels - .iter() - .map(|(cid, state)| { - let name = self - .name_by_cid - .get(cid) - .map_or("--unknown--".into(), |x| x.to_string()); - state.to_info(name, self.remote_addr_dbg.clone()) - }) - .collect(); - let msg = (self.remote_addr_dbg.clone(), res); - match tx.try_send(msg) { - Ok(_) => {} - Err(_) => { - self.stats.caconn_command_can_not_reply_inc(); - } - } - } - ConnCommandKind::ChannelAdd(name, tx) => { - self.channel_add(name); - match tx.try_send(true) { - Ok(_) => {} - Err(_) => { - self.stats.caconn_command_can_not_reply_inc(); - } - } - } - ConnCommandKind::ChannelRemove(name, tx) => { - self.channel_remove(name); - match tx.try_send(true) { - Ok(_) => {} - Err(_) => { - self.stats.caconn_command_can_not_reply_inc(); - } - } - } - ConnCommandKind::Shutdown(tx) => { - self.shutdown = true; - let res = self.before_reset_of_channel_state(); - self.state = CaConnState::Shutdown; - self.proto = None; - match tx.try_send(res.is_ok()) { - Ok(_) => {} - Err(_) => { - self.stats.caconn_command_can_not_reply_inc(); - } - } - } - ConnCommandKind::ExtraInsertsConf(k, tx) => { - self.extra_inserts_conf = k; - match tx.try_send(true) { - Ok(_) => {} - Err(_) => { - self.stats.caconn_command_can_not_reply_inc(); - } - } - } - ConnCommandKind::CheckChannelsAlive(tx) => { - let res = self.check_channels_alive(); - let res = res.is_ok(); - match tx.try_send(res) { - Ok(_) => {} - Err(_) => { - self.stats.caconn_command_can_not_reply_inc(); - } - } - } - ConnCommandKind::SaveConnInfo(tx) => { - let res = self.save_conn_info(); - let res = res.is_ok(); - match tx.try_send(res) { - Ok(_) => {} - Err(_) => { - self.stats.caconn_command_can_not_reply_inc(); - } - } - } - }, - Ready(None) => { - error!("Command queue closed"); + self.stats.caconn_loop3_count_inc(); + match self.conn_command_rx.poll_next_unpin(cx) { + Ready(Some(a)) => match a.kind { + ConnCommandKind::ChannelAdd(name) => { + self.cmd_channel_add(name); + Ready(Some(Ok(()))) } - Pending => { - break Some(Pending); + ConnCommandKind::ChannelRemove(name) => { + self.cmd_channel_remove(name); + Ready(Some(Ok(()))) } + ConnCommandKind::CheckHealth => { + self.cmd_check_health(); + Ready(Some(Ok(()))) + } + ConnCommandKind::Shutdown => { + self.cmd_shutdown(); + Ready(Some(Ok(()))) + } + }, + Ready(None) => { + error!("Command queue closed"); + Ready(None) } + Pending => Pending, } } @@ -595,38 +622,81 @@ impl CaConn { self.stats.clone() } - pub fn channel_add(&mut self, channel: String) { - if self.cid_by_name.contains_key(&channel) { + fn channel_add_expl( + channel: String, + channels: &mut BTreeMap, + cid_by_name: &mut BTreeMap, + name_by_cid: &mut BTreeMap, + cid_store: &mut CidStore, + init_state_count: &mut u64, + ) { + if cid_by_name.contains_key(&channel) { return; } - let cid = self.cid_by_name(&channel); - if self.channels.contains_key(&cid) { + let cid = Self::cid_by_name_expl(&channel, cid_by_name, name_by_cid, cid_store); + if channels.contains_key(&cid) { error!("logic error"); } else { - self.channels.insert(cid, ChannelState::Init); + channels.insert(cid, ChannelState::Init); // TODO do not count, use separate queue for those channels. - self.init_state_count += 1; + *init_state_count += 1; } } - pub fn channel_remove(&mut self, channel: String) { - let cid = self.cid_by_name(&channel); - if self.channels.contains_key(&cid) { + pub fn channel_add(&mut self, channel: String) { + Self::channel_add_expl( + channel, + &mut self.channels, + &mut self.cid_by_name, + &mut self.name_by_cid, + &mut self.cid_store, + &mut self.init_state_count, + ) + } + + fn channel_remove_expl( + channel: String, + channels: &mut BTreeMap, + cid_by_name: &mut BTreeMap, + name_by_cid: &mut BTreeMap, + cid_store: &mut CidStore, + ) { + let cid = Self::cid_by_name_expl(&channel, cid_by_name, name_by_cid, cid_store); + if channels.contains_key(&cid) { warn!("TODO actually cause the channel to get closed and removed {}", channel); } } - fn cid_by_name(&mut self, name: &str) -> Cid { - if let Some(cid) = self.cid_by_name.get(name) { + pub fn channel_remove(&mut self, channel: String) { + Self::channel_remove_expl( + channel, + &mut self.channels, + &mut self.cid_by_name, + &mut self.name_by_cid, + &mut self.cid_store, + ) + } + + fn cid_by_name_expl( + name: &str, + cid_by_name: &mut BTreeMap, + name_by_cid: &mut BTreeMap, + cid_store: &mut CidStore, + ) -> Cid { + if let Some(cid) = cid_by_name.get(name) { *cid } else { - let cid = self.cid_store.next(); - self.cid_by_name.insert(name.into(), cid); - self.name_by_cid.insert(cid, name.into()); + let cid = cid_store.next(); + cid_by_name.insert(name.into(), cid); + name_by_cid.insert(cid, name.into()); cid } } + fn cid_by_name(&mut self, name: &str) -> Cid { + Self::cid_by_name_expl(name, &mut self.cid_by_name, &mut self.name_by_cid, &mut self.cid_store) + } + fn name_by_cid(&self, cid: Cid) -> Option<&str> { self.name_by_cid.get(&cid).map(|x| x.as_str()) } @@ -721,7 +791,7 @@ impl CaConn { if let Some(started) = self.ioc_ping_start { if started.elapsed() > Duration::from_millis(4000) { warn!("Echo timeout {addr:?}", addr = self.remote_addr_dbg); - self.shutdown = true; + self.trigger_shutdown(); } } else { self.ioc_ping_start = Some(Instant::now()); @@ -731,7 +801,7 @@ impl CaConn { proto.push_out(msg); } else { warn!("can not push echo, no proto"); - self.shutdown = true; + self.trigger_shutdown(); } } } @@ -767,7 +837,7 @@ impl CaConn { Ok(()) } - fn save_conn_info(&mut self) -> Result<(), Error> { + fn emit_channel_info_insert_items(&mut self) -> Result<(), Error> { let timenow = SystemTime::now(); for (_, st) in &mut self.channels { match st { @@ -796,6 +866,7 @@ impl CaConn { ChannelState::Error(_) => { // TODO need last-save-ts for this state. } + ChannelState::Ended => {} } } Ok(()) @@ -1516,6 +1587,50 @@ impl CaConn { } } } + + fn apply_3(res: ChannelOpsResources) { + let mut g = res.channel_set_ops.lock().unwrap(); + let map = std::mem::replace(&mut *g, BTreeMap::new()); + for (ch, op) in map { + match op { + ChannelSetOp::Add => Self::channel_add_expl( + ch, + res.channels, + res.cid_by_name, + res.name_by_cid, + res.cid_store, + res.init_state_count, + ), + ChannelSetOp::Remove => { + Self::channel_remove_expl(ch, res.channels, res.cid_by_name, res.name_by_cid, res.cid_store) + } + } + } + res.channel_set_ops_flag.store(0, atomic::Ordering::Release); + } + + fn apply_2(&mut self) { + let res = ChannelOpsResources { + channel_set_ops: &self.channel_set_ops.ops, + channels: &mut self.channels, + cid_by_name: &mut self.cid_by_name, + name_by_cid: &mut self.name_by_cid, + cid_store: &mut self.cid_store, + init_state_count: &mut self.init_state_count, + channel_set_ops_flag: &self.channel_set_ops.flag, + }; + Self::apply_3(res) + } +} + +struct ChannelOpsResources<'a> { + channel_set_ops: &'a StdMutex>, + channels: &'a mut BTreeMap, + cid_by_name: &'a mut BTreeMap, + name_by_cid: &'a mut BTreeMap, + cid_store: &'a mut CidStore, + init_state_count: &'a mut u64, + channel_set_ops_flag: &'a AtomicUsize, } impl Stream for CaConn { @@ -1523,63 +1638,91 @@ impl Stream for CaConn { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + let poll_ts1 = Instant::now(); self.stats.caconn_poll_count_inc(); - if self.shutdown { - info!("CaConn poll in shutdown"); + if self.channel_set_ops.flag.load(atomic::Ordering::Acquire) > 0 { + Self::apply_2(&mut self); } - let mut i1 = 0; - let ret = loop { - i1 += 1; - if self.shutdown { - info!("CaConn in shutdown loop 1"); - } - self.stats.caconn_loop1_count_inc(); - if !self.shutdown { - self.handle_conn_command(cx); - } - let q = self.handle_insert_futs(cx); - match q { - Ready(_) => {} - Pending => break Pending, - } - if self.shutdown { - if self.insert_item_queue.len() == 0 { - trace!("no more items to flush"); - if i1 >= 10 { - break Ready(Ok(())); + let ret = if let Some(item) = self.cmd_res_queue.pop_front() { + Ready(Some(Ok(CaConnEvent { + ts: Instant::now(), + value: CaConnEventValue::ConnCommandResult(item), + }))) + } else { + let mut i1 = 0; + let ret = loop { + i1 += 1; + self.stats.caconn_loop1_count_inc(); + loop { + if self.shutdown { + break; } - } else { - //info!("more items {}", self.insert_item_queue.len()); + break match self.handle_conn_command(cx) { + Ready(Some(Ok(_))) => {} + Ready(Some(Err(e))) => { + error!("{e}"); + self.trigger_shutdown(); + break; + } + Ready(None) => { + warn!("command input queue closed, do shutdown"); + self.trigger_shutdown(); + break; + } + Pending => break, + }; } - } - if self.insert_item_queue.len() >= self.insert_queue_max { - break Pending; - } - if !self.shutdown { - if let Some(v) = self.loop_inner(cx) { - if i1 >= 10 { - break v; + match self.handle_insert_futs(cx) { + Ready(_) => {} + Pending => break Pending, + } + if self.shutdown { + if self.insert_item_queue.len() == 0 { + trace!("no more items to flush"); + if i1 >= 10 { + break Ready(Ok(())); + } + } else { + //info!("more items {}", self.insert_item_queue.len()); } } + if self.insert_item_queue.len() >= self.insert_queue_max { + break Pending; + } + if !self.shutdown { + if let Some(v) = self.loop_inner(cx) { + if i1 >= 10 { + break v; + } + } + } + }; + match &ret { + Ready(_) => self.stats.conn_stream_ready_inc(), + Pending => self.stats.conn_stream_pending_inc(), + } + if self.shutdown && self.insert_item_queue.len() == 0 { + Ready(None) + } else { + match ret { + Ready(Ok(())) => { + let item = CaConnEvent { + ts: Instant::now(), + value: CaConnEventValue::None, + }; + Ready(Some(Ok(item))) + } + Ready(Err(e)) => Ready(Some(Err(e))), + Pending => Pending, + } } }; - match &ret { - Ready(_) => self.stats.conn_stream_ready_inc(), - Pending => self.stats.conn_stream_pending_inc(), - } - if self.shutdown && self.insert_item_queue.len() == 0 { - return Ready(None); - } - match ret { - Ready(Ok(())) => { - let item = CaConnEvent { - ts: Instant::now(), - value: CaConnEventValue::None, - }; - Ready(Some(Ok(item))) + { + let dt = poll_ts1.elapsed(); + if dt > Duration::from_millis(40) { + warn!("slow poll: {}ms", dt.as_secs_f32() * 1e3); } - Ready(Err(e)) => Ready(Some(Err(e))), - Pending => Pending, } + ret } } diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index dff1037..0e98cd8 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -1,52 +1,38 @@ -use super::conn::{CaConnEvent, ConnCommand}; +use super::conn::CaConnEvent; +use super::conn::ChannelSetOp; +use super::conn::ChannelSetOps; +use super::conn::ConnCommand; use super::store::DataStore; -use super::IngestCommons; +use super::SlowWarnable; use crate::ca::conn::CaConn; +use crate::ca::conn::CaConnEventValue; use crate::errconv::ErrConv; -use crate::rt::{JoinHandle, TokMx}; +use crate::rt::JoinHandle; +use crate::rt::TokMx; +use crate::store::CommonInsertItemQueue; use crate::store::CommonInsertItemQueueSender; -use async_channel::{Receiver, Sender}; +use async_channel::Receiver; +use async_channel::Sender; use err::Error; -use futures_util::{FutureExt, StreamExt}; +use futures_util::FutureExt; +use futures_util::StreamExt; use netpod::log::*; use stats::CaConnStats; -use std::collections::{BTreeMap, VecDeque}; -use std::net::{SocketAddr, SocketAddrV4}; +use std::collections::BTreeMap; +use std::collections::VecDeque; +use std::net::SocketAddrV4; use std::sync::Arc; use std::time::Duration; +use std::time::Instant; +use tracing::info_span; +use tracing::Instrument; -pub struct CommandQueueSet { - queues: TokMx>>, -} - -impl CommandQueueSet { - pub fn new() -> Self { - Self { - queues: TokMx::new(BTreeMap::>::new()), - } - } - - pub async fn queues(&self) -> &TokMx>> { - &self.queues - } - - pub async fn queues_locked(&self) -> tokio::sync::MutexGuard>> { - let mut g = self.queues.lock().await; - let mut rm = Vec::new(); - for (k, v) in g.iter() { - if v.is_closed() { - rm.push(*k); - } - } - for x in rm { - g.remove(&x); - } - g - } -} +#[derive(Debug, PartialEq, Eq)] +pub struct CmdId(SocketAddrV4, usize); pub struct CaConnRess { sender: Sender, + channel_set_ops: Arc, stats: Arc, jh: JoinHandle>, } @@ -66,30 +52,30 @@ impl CaConnRess { // to add it to the correct list. // There, make spawning part of this function? pub struct CaConnSet { - ca_conn_ress: TokMx>, - conn_item_tx: Sender, - conn_item_rx: Receiver, + ca_conn_ress: Arc>>, + conn_item_tx: Sender<(SocketAddrV4, CaConnEvent)>, + conn_item_rx: Receiver<(SocketAddrV4, CaConnEvent)>, } impl CaConnSet { pub fn new() -> Self { let (conn_item_tx, conn_item_rx) = async_channel::bounded(10000); Self { - ca_conn_ress: Default::default(), + ca_conn_ress: Arc::new(TokMx::new(BTreeMap::new())), conn_item_tx, conn_item_rx, } } - pub fn conn_item_rx(&self) -> Receiver { + pub fn conn_item_rx(&self) -> Receiver<(SocketAddrV4, CaConnEvent)> { self.conn_item_rx.clone() } - pub fn ca_conn_ress(&self) -> &TokMx> { + pub fn ca_conn_ress(&self) -> &TokMx> { &self.ca_conn_ress } - pub async fn create_ca_conn( + pub fn create_ca_conn_2( &self, backend: String, addr: SocketAddrV4, @@ -99,9 +85,9 @@ impl CaConnSet { insert_item_queue_sender: CommonInsertItemQueueSender, data_store: Arc, with_channels: Vec, - ) -> Result<(), Error> { - info!("create new CaConn {:?}", addr); - let addr2 = SocketAddr::V4(addr.clone()); + ) -> Result { + // TODO should we save this as event? + trace!("create new CaConn {:?}", addr); let mut conn = CaConn::new( backend.clone(), addr, @@ -117,7 +103,9 @@ impl CaConnSet { let conn = conn; let conn_tx = conn.conn_command_tx(); let conn_stats = conn.stats(); + let channel_set_ops = conn.get_channel_set_ops_map(); let conn_item_tx = self.conn_item_tx.clone(); + let ca_conn_ress = self.ca_conn_ress.clone(); let conn_fut = async move { let stats = conn.stats(); let mut conn = conn; @@ -125,7 +113,7 @@ impl CaConnSet { match item { Ok(item) => { stats.conn_item_count_inc(); - conn_item_tx.send(item).await?; + conn_item_tx.send((addr, item)).await?; } Err(e) => { error!("CaConn gives error: {e:?}"); @@ -133,45 +121,54 @@ impl CaConnSet { } } } + Self::conn_remove(&ca_conn_ress, addr).await?; + conn_item_tx + .send(( + addr, + CaConnEvent { + ts: Instant::now(), + value: CaConnEventValue::EndOfStream, + }, + )) + .await?; Ok(()) }; let jh = tokio::spawn(conn_fut); let ca_conn_ress = CaConnRess { sender: conn_tx, + channel_set_ops, stats: conn_stats, jh, }; - self.ca_conn_ress.lock().await.insert(addr2, ca_conn_ress); - Ok(()) + Ok(ca_conn_ress) } - pub async fn send_command_to_all(&self, cmdgen: F) -> Result, Error> + pub async fn enqueue_command_to_all(&self, cmdgen: F) -> Result, Error> where - F: Fn() -> (ConnCommand, async_channel::Receiver), + F: Fn() -> ConnCommand, { - //let it = self.ca_conn_ress.iter().map(|x| x); - //Self::send_command_inner(it, move || cmd.clone()); - let mut rxs = Vec::new(); - for (_addr, ress) in &*self.ca_conn_ress.lock().await { - let (cmd, rx) = cmdgen(); - match ress.sender.send(cmd).await { + let mut senders = Vec::new(); + for (addr, ress) in &*self.ca_conn_ress.lock().await { + senders.push((*addr, ress.sender.clone())); + } + let mut cmdids = Vec::new(); + for (addr, sender) in senders { + let cmd = cmdgen(); + let cmdid = cmd.id(); + match sender.send(cmd).await { Ok(()) => { - rxs.push(rx); + cmdids.push(CmdId(addr, cmdid)); } Err(e) => { error!("can not send command {e:?}"); } } } - let mut res = Vec::new(); - for rx in rxs { - let x = rx.recv().await?; - res.push(x); - } - Ok(res) + Ok(cmdids) } - pub async fn send_command_to_addr(&self, addr: &SocketAddr, cmdgen: F) -> Result + #[allow(unused)] + async fn send_command_to_addr_disabled(&self, addr: &SocketAddrV4, cmdgen: F) -> Result where F: Fn() -> (ConnCommand, async_channel::Receiver), { @@ -186,7 +183,7 @@ impl CaConnSet { } #[allow(unused)] - async fn send_command_inner<'a, IT, F, R>(it: &mut IT, cmdgen: F) -> Vec> + async fn send_command_inner_disabled<'a, IT, F, R>(it: &mut IT, cmdgen: F) -> Vec> where IT: Iterator)>, F: Fn() -> (ConnCommand, async_channel::Receiver), @@ -206,12 +203,8 @@ impl CaConnSet { rxs } - pub async fn send_stop(&self) -> Result<(), Error> { - self.send_command_to_all(|| ConnCommand::shutdown()).await?; - Ok(()) - } - pub async fn wait_stopped(&self) -> Result<(), Error> { + warn!("Lock for wait_stopped"); let mut g = self.ca_conn_ress.lock().await; let mm = std::mem::replace(&mut *g, BTreeMap::new()); let mut jhs: VecDeque<_> = VecDeque::new(); @@ -249,61 +242,68 @@ impl CaConnSet { pub async fn add_channel_to_addr( &self, backend: String, - addr: SocketAddr, - channel_name: String, - ingest_commons: Arc, + addr: SocketAddrV4, + name: String, + insert_item_queue: &CommonInsertItemQueue, + data_store: &Arc, + insert_queue_max: usize, + array_truncate: usize, + local_epics_hostname: String, ) -> Result<(), Error> { - let g = self.ca_conn_ress.lock().await; + let mut g = self + .ca_conn_ress + .lock() + .slow_warn(500) + .instrument(info_span!("conn_ress.lock")) + .await; + if !g.contains_key(&addr) { + let ca_conn_ress = self.create_ca_conn_2( + backend.clone(), + addr, + local_epics_hostname, + array_truncate, + insert_queue_max, + insert_item_queue.sender(), + data_store.clone(), + Vec::new(), + )?; + g.insert(addr, ca_conn_ress); + } match g.get(&addr) { Some(ca_conn) => { - //info!("try to add to existing... {addr} {channel_name}"); - let (cmd, rx) = ConnCommand::channel_add(channel_name); - ca_conn.sender.send(cmd).await.err_conv()?; - let a = rx.recv().await.err_conv()?; - if a { + if true { + let op = super::conn::ChannelSetOp::Add; + ca_conn.channel_set_ops.insert(name, op); Ok(()) } else { - Err(Error::with_msg_no_trace(format!("channel add failed"))) + let cmd = ConnCommand::channel_add(name); + let _cmdid = CmdId(addr, cmd.id()); + ca_conn + .sender + .send(cmd) + .slow_warn(500) + .instrument(info_span!("ca_conn.send")) + .await + .err_conv()?; + Ok(()) } } None => { - //info!("create new {addr} {channel_name}"); - drop(g); - let addr = if let SocketAddr::V4(x) = addr { - x - } else { - return Err(Error::with_msg_no_trace(format!("only ipv4 supported for IOC"))); - }; - // TODO use parameters: - self.create_ca_conn( - backend.clone(), - addr, - ingest_commons.local_epics_hostname.clone(), - 512, - 200, - ingest_commons.insert_item_queue.sender().await, - ingest_commons.data_store.clone(), - vec![channel_name], - ) - .await?; - Ok(()) + error!("expected to find matching CaConn"); + Err(Error::with_msg_no_trace("CaConn not found")) } } } - pub async fn has_addr(&self, addr: &SocketAddr) -> bool { - // TODO only used to check on add-channel whether we want to add channel to conn, or create new conn. - // TODO must do that atomic. - self.ca_conn_ress.lock().await.contains_key(addr) - } - - pub async fn addr_nth_mod(&self, n: usize) -> Option { - let g = self.ca_conn_ress.lock().await; - let len = g.len(); - if len < 1 { - return None; + async fn conn_remove( + ca_conn_ress: &TokMx>, + addr: SocketAddrV4, + ) -> Result { + warn!("Lock for conn_remove"); + if let Some(_caconn) = ca_conn_ress.lock().await.remove(&addr) { + Ok(true) + } else { + Ok(false) } - let n = n % len; - g.keys().take(n).last().map(Clone::clone) } } diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index ea2505a..9b09b40 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -70,6 +70,7 @@ pub struct FindIocStream { sids_done: BTreeMap, result_for_done_sid_count: u64, sleeper: Pin + Send>>, + stop_on_empty_queue: bool, } impl FindIocStream { @@ -96,9 +97,14 @@ impl FindIocStream { channels_per_batch: batch_size, batch_run_max, sleeper: Box::pin(tokio::time::sleep(Duration::from_millis(500))), + stop_on_empty_queue: false, } } + pub fn set_stop_on_empty_queue(&mut self) { + self.stop_on_empty_queue = true; + } + pub fn quick_state(&self) -> String { format!( "channels_input {} in_flight {} bid_by_sid {} out_queue {} result_for_done_sid_count {} bids_timed_out {}", @@ -589,12 +595,16 @@ impl Stream for FindIocStream { continue; } else { if self.channels_input.is_empty() && self.in_flight.is_empty() && self.out_queue.is_empty() { - match self.sleeper.poll_unpin(cx) { - Ready(_) => { - self.sleeper = Box::pin(tokio::time::sleep(Duration::from_millis(500))); - continue; + if self.stop_on_empty_queue { + Ready(None) + } else { + match self.sleeper.poll_unpin(cx) { + Ready(_) => { + self.sleeper = Box::pin(tokio::time::sleep(Duration::from_millis(500))); + continue; + } + Pending => Pending, } - Pending => Pending, } } else { Pending diff --git a/netfetch/src/ca/search.rs b/netfetch/src/ca/search.rs index 1abce28..2203d6a 100644 --- a/netfetch/src/ca/search.rs +++ b/netfetch/src/ca/search.rs @@ -1,11 +1,20 @@ +use super::findioc::FindIocRes; use crate::ca::findioc::FindIocStream; use crate::conf::CaIngestOpts; +use async_channel::Receiver; +use async_channel::Sender; use err::Error; use futures_util::StreamExt; use log::*; -use std::net::{IpAddr, SocketAddr}; -use std::sync::Arc; -use std::time::{Duration, Instant}; +use netpod::Database; +use std::net::IpAddr; +use std::net::SocketAddr; +use std::time::Duration; +use std::time::Instant; +use tokio::task::JoinHandle; +use tokio_postgres::Client as PgClient; + +const DB_WORKER_COUNT: usize = 4; async fn resolve_address(addr_str: &str) -> Result { const PORT_DEFAULT: u16 = 5064; @@ -44,33 +53,95 @@ async fn resolve_address(addr_str: &str) -> Result { Ok(ac) } +struct DbUpdateWorker { + jh: JoinHandle<()>, +} + +impl DbUpdateWorker { + fn new(rx: Receiver, backend: String, database: Database) -> Self { + let jh = tokio::spawn(Self::worker(rx, backend, database)); + Self { jh } + } + + async fn worker(rx: Receiver, backend: String, database: Database) { + let d = &database; + let (pg_client, pg_conn) = tokio_postgres::connect( + &format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name), + tokio_postgres::tls::NoTls, + ) + .await + .unwrap(); + let (pgconn_out_tx, pgconn_out_rx) = async_channel::bounded(16); + tokio::spawn(async move { + if let Err(e) = pgconn_out_tx.send(pg_conn.await).await { + error!("can not report status of pg conn {e}"); + } + }); + let pg_client: PgClient = pg_client; + let qu_select = { + let sql = "select channel, addr from ioc_by_channel_log where facility = $1 and channel = $2 and addr is not distinct from $3 and archived = 0"; + pg_client.prepare(sql).await.unwrap() + }; + let qu_update_tsmod = { + let sql = "update ioc_by_channel_log set tsmod = now(), responseaddr = $4 where facility = $1 and channel = $2 and addr is not distinct from $3 and archived = 0"; + pg_client.prepare(sql).await.unwrap() + }; + let qu_update_archived = { + let sql = + "update ioc_by_channel_log set archived = 1 where facility = $1 and channel = $2 and archived = 0"; + pg_client.prepare(sql).await.unwrap() + }; + let qu_insert = { + let sql = "insert into ioc_by_channel_log (facility, channel, addr, responseaddr) values ($1, $2, $3, $4)"; + const TEXT: tokio_postgres::types::Type = tokio_postgres::types::Type::TEXT; + pg_client.prepare_typed(sql, &[TEXT, TEXT, TEXT, TEXT]).await.unwrap() + }; + while let Ok(item) = rx.recv().await { + let responseaddr = item.response_addr.map(|x| x.to_string()); + let addr = item.addr.map(|x| x.to_string()); + let res = pg_client + .query(&qu_select, &[&backend, &item.channel, &addr]) + .await + .unwrap(); + if res.len() == 0 { + pg_client + .execute(&qu_update_archived, &[&backend, &item.channel]) + .await + .unwrap(); + pg_client + .execute(&qu_insert, &[&backend, &item.channel, &addr, &responseaddr]) + .await + .unwrap(); + } else if res.len() == 1 { + pg_client + .execute(&qu_update_tsmod, &[&backend, &item.channel, &addr, &responseaddr]) + .await + .unwrap(); + } else { + warn!("Duplicate for {}", item.channel); + let sql="with q1 as (select ctid from ioc_by_channel_log where facility = $1 and channel = $2 and addr is not distinct from $3 order by tsmod desc, ctid desc limit 1) update ioc_by_channel_log t set archived = 1 from q1 where t.facility = $1 and t.channel = $2 and t.addr is not distinct from $3 and t.ctid != q1.ctid"; + pg_client.execute(sql, &[&backend, &item.channel, &addr]).await.unwrap(); + pg_client + .execute(&qu_update_tsmod, &[&backend, &item.channel, &addr, &responseaddr]) + .await + .unwrap(); + } + } + drop(pg_client); + let x = pgconn_out_rx.recv().await; + if let Err(e) = x { + error!("db worker sees: {e}"); + } + } +} + pub async fn ca_search(opts: CaIngestOpts, channels: &Vec) -> Result<(), Error> { info!("ca_search begin"); - let d = opts.postgresql().clone(); - let (pg_client, pg_conn) = tokio_postgres::connect( - &format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name), - tokio_postgres::tls::NoTls, - ) - .await - .unwrap(); - // TODO join pg_conn in the end: - tokio::spawn(pg_conn); - let pg_client = Arc::new(pg_client); - let qu_insert = { - const TEXT: tokio_postgres::types::Type = tokio_postgres::types::Type::TEXT; - pg_client - .prepare_typed( - "insert into ioc_by_channel_log (facility, channel, responseaddr, addr) values ($1, $2, $3, $4)", - &[TEXT, TEXT, TEXT, TEXT], - ) - .await - .unwrap() - }; let mut addrs = Vec::new(); for s in opts.search() { match resolve_address(s).await { Ok(addr) => { - info!("resolved {s} as {addr}"); + trace!("resolved {s} as {addr}"); addrs.push(addr); } Err(e) => { @@ -83,7 +154,7 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec) -> Result<(), for s in opts.search_blacklist() { match resolve_address(s).await { Ok(addr) => { - info!("resolved {s} as {addr}"); + trace!("resolved {s} as {addr}"); gw_addrs.push(addr); } Err(e) => { @@ -93,7 +164,6 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec) -> Result<(), } gw_addrs }; - info!("Blacklisting {} gateways", gw_addrs.len()); let addrs = addrs .into_iter() .filter_map(|x| match x { @@ -104,14 +174,26 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec) -> Result<(), } }) .collect(); - let mut finder = FindIocStream::new(addrs, Duration::from_millis(1000), 20, 1); + let mut finder = FindIocStream::new(addrs, Duration::from_millis(800), 20, 4); + finder.set_stop_on_empty_queue(); for ch in channels.iter() { finder.push(ch.into()); } + + let (dbtx, dbrx) = async_channel::bounded(64); + + let mut dbworkers = Vec::new(); + for _ in 0..DB_WORKER_COUNT { + let w = DbUpdateWorker::new(dbrx.clone(), opts.backend().into(), opts.postgresql().clone()); + dbworkers.push(w); + } + drop(dbrx); + let dbtx: Sender<_> = dbtx; + let mut ts_last = Instant::now(); loop { let ts_now = Instant::now(); - if ts_now.duration_since(ts_last) >= Duration::from_millis(1000) { + if ts_now.duration_since(ts_last) >= Duration::from_millis(2000) { ts_last = ts_now; info!("{}", finder.quick_state()); } @@ -154,14 +236,25 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec) -> Result<(), if do_block { info!("blacklisting {item:?}"); } else { - let responseaddr = item.response_addr.map(|x| x.to_string()); - let addr = item.addr.map(|x| x.to_string()); - pg_client - .execute(&qu_insert, &[&opts.backend(), &item.channel, &responseaddr, &addr]) - .await - .unwrap(); + match dbtx.send(item).await { + Ok(_) => {} + Err(_) => { + error!("dbtx broken"); + break; + } + } } } } + drop(dbtx); + for w in dbworkers { + match w.jh.await { + Ok(_) => {} + Err(e) => { + error!("see error while join on db worker: {e}"); + } + } + } + info!("all done"); Ok(()) } diff --git a/netfetch/src/linuxhelper.rs b/netfetch/src/linuxhelper.rs index a5983c8..a2a5d3b 100644 --- a/netfetch/src/linuxhelper.rs +++ b/netfetch/src/linuxhelper.rs @@ -2,7 +2,6 @@ use err::Error; use log::*; use std::ffi::CStr; use std::mem::MaybeUninit; -use std::sync::atomic::Ordering; use tokio::net::TcpStream; pub fn local_hostname() -> String { @@ -23,18 +22,22 @@ fn test_get_local_hostname() { assert_ne!(local_hostname().len(), 0); } -pub fn set_signal_handler() -> Result<(), Error> { +pub fn set_signal_handler( + signum: libc::c_int, + cb: fn(libc::c_int, *const libc::siginfo_t, *const libc::c_void) -> (), +) -> Result<(), Error> { + //let cb: fn(libc::c_int, *const libc::siginfo_t, *const libc::c_void) -> () = handler_sigaction; // Safe because it creates a valid value: let mask: libc::sigset_t = unsafe { MaybeUninit::zeroed().assume_init() }; - let handler: libc::sighandler_t = handler_sigaction as *const libc::c_void as _; + let sa_sigaction: libc::sighandler_t = cb as *const libc::c_void as _; let act = libc::sigaction { - sa_sigaction: handler, + sa_sigaction, sa_mask: mask, sa_flags: 0, sa_restorer: None, }; let (ec, msg) = unsafe { - let ec = libc::sigaction(libc::SIGINT, &act, std::ptr::null_mut()); + let ec = libc::sigaction(signum, &act, std::ptr::null_mut()); let errno = *libc::__errno_location(); (ec, CStr::from_ptr(libc::strerror(errno))) }; @@ -46,7 +49,7 @@ pub fn set_signal_handler() -> Result<(), Error> { Ok(()) } -fn unset_signal_handler() -> Result<(), Error> { +pub fn unset_signal_handler(signum: libc::c_int) -> Result<(), Error> { // Safe because it creates a valid value: let mask: libc::sigset_t = unsafe { MaybeUninit::zeroed().assume_init() }; let act = libc::sigaction { @@ -56,7 +59,7 @@ fn unset_signal_handler() -> Result<(), Error> { sa_restorer: None, }; let (ec, msg) = unsafe { - let ec = libc::sigaction(libc::SIGINT, &act, std::ptr::null_mut()); + let ec = libc::sigaction(signum, &act, std::ptr::null_mut()); let errno = *libc::__errno_location(); (ec, CStr::from_ptr(libc::strerror(errno))) }; @@ -68,11 +71,6 @@ fn unset_signal_handler() -> Result<(), Error> { Ok(()) } -fn handler_sigaction(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) { - crate::ca::SIGINT.store(1, Ordering::Release); - let _ = unset_signal_handler(); -} - pub fn set_rcv_sock_opts(conn: &mut TcpStream, rcvbuf: u32) -> Result<(), Error> { use std::mem::size_of; use std::os::unix::prelude::AsRawFd; diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index bc1364c..4300aab 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -8,7 +8,6 @@ use log::*; use serde::{Deserialize, Serialize}; use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff}; use std::collections::HashMap; -use std::net::SocketAddr; use std::net::SocketAddrV4; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -30,36 +29,16 @@ async fn find_channel( ingest_commons: Arc, ) -> axum::Json)>> { let pattern = params.get("pattern").map_or(String::new(), |x| x.clone()).to_string(); - // TODO allow usage of `?` in handler: - let res = ingest_commons - .ca_conn_set - .send_command_to_all(|| ConnCommand::find_channel(pattern.clone())) - .await - .unwrap(); - let res = res.into_iter().map(|x| (x.0.to_string(), x.1)).collect(); + // TODO ask Daemon for that information. + error!("TODO find_channel"); + let res = Vec::new(); axum::Json(res) } async fn channel_add_inner(params: HashMap, ingest_commons: Arc) -> Result<(), Error> { if let (Some(backend), Some(name)) = (params.get("backend"), params.get("name")) { - match crate::ca::find_channel_addr(backend.into(), name.into(), &ingest_commons.pgconf).await { - Ok(Some(addr)) => { - ingest_commons - .ca_conn_set - .add_channel_to_addr( - ingest_commons.backend.clone(), - SocketAddr::V4(addr), - name.into(), - ingest_commons.clone(), - ) - .await?; - Ok(()) - } - _ => { - error!("can not find addr for channel"); - Err(Error::with_msg_no_trace(format!("can not find addr for channel"))) - } - } + error!("TODO channel_add_inner"); + Err(Error::with_msg_no_trace(format!("TODO channel_add_inner"))) } else { Err(Error::with_msg_no_trace(format!("wrong parameters given"))) } @@ -98,35 +77,14 @@ async fn channel_remove( } else { return Json(Value::Bool(false)); }; - match ingest_commons - .ca_conn_set - .send_command_to_addr(&SocketAddr::V4(addr), || ConnCommand::channel_remove(name.into())) - .await - { - Ok(k) => Json(Value::Bool(k)), - Err(e) => { - error!("{e:?}"); - Json(Value::Bool(false)) - } - } + error!("TODO channel_remove"); + Json(Value::Bool(false)) } -async fn channel_state(params: HashMap, ingest_commons: Arc) -> String { +async fn channel_state(params: HashMap, ingest_commons: Arc) -> axum::Json { let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string(); - match ingest_commons - .ca_conn_set - .send_command_to_all(|| ConnCommand::channel_state(name.clone())) - .await - { - Ok(k) => { - let a: Vec<_> = k.into_iter().map(|(a, b)| (a.to_string(), b)).collect(); - serde_json::to_string(&a).unwrap() - } - Err(e) => { - error!("{e:?}"); - return format!("null"); - } - } + error!("TODO channel_state"); + axum::Json(false) } async fn channel_states( @@ -134,31 +92,14 @@ async fn channel_states( ingest_commons: Arc, ) -> axum::Json> { let limit = params.get("limit").map(|x| x.parse()).unwrap_or(Ok(40)).unwrap_or(40); - let vals = ingest_commons - .ca_conn_set - .send_command_to_all(|| ConnCommand::channel_states_all()) - .await - .unwrap(); - let mut res = Vec::new(); - for h in vals { - for j in h.1 { - res.push(j); - } - } - res.sort_unstable_by_key(|v| u32::MAX - v.interest_score as u32); - res.truncate(limit); - axum::Json(res) + error!("TODO channel_state"); + axum::Json(Vec::new()) } async fn extra_inserts_conf_set(v: ExtraInsertsConf, ingest_commons: Arc) -> axum::Json { // TODO ingest_commons is the authorative value. Should have common function outside of this metrics which // can update everything to a given value. - *ingest_commons.extra_inserts_conf.lock().await = v.clone(); - ingest_commons - .ca_conn_set - .send_command_to_all(|| ConnCommand::extra_inserts_conf_set(v.clone())) - .await - .unwrap(); + error!("TODO extra_inserts_conf_set"); axum::Json(true) } diff --git a/netfetch/src/store.rs b/netfetch/src/store.rs index d51eec8..de0f8e6 100644 --- a/netfetch/src/store.rs +++ b/netfetch/src/store.rs @@ -2,19 +2,25 @@ use crate::ca::proto::{CaDataArrayValue, CaDataScalarValue, CaDataValue}; use crate::ca::store::DataStore; use crate::errconv::ErrConv; use crate::series::SeriesId; -use futures_util::{Future, FutureExt}; +use futures_util::Future; +use futures_util::FutureExt; use log::*; -use netpod::{ScalarType, Shape}; +use netpod::ScalarType; +use netpod::Shape; use scylla::frame::value::ValueList; use scylla::prepared_statement::PreparedStatement; -use scylla::transport::errors::{DbError, QueryError}; -use scylla::{QueryResult, Session as ScySession}; +use scylla::transport::errors::DbError; +use scylla::transport::errors::QueryError; +use scylla::QueryResult; +use scylla::Session as ScySession; use stats::CaConnStats; use std::net::SocketAddrV4; use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::{Duration, Instant, SystemTime}; -use tokio::sync::Mutex as TokMx; +use std::task::Context; +use std::task::Poll; +use std::time::Duration; +use std::time::Instant; +use std::time::SystemTime; pub use netpod::CONNECTION_STATUS_DIV; @@ -128,11 +134,24 @@ impl<'a> Future for ScyInsertFut<'a> { #[derive(Debug)] pub enum ConnectionStatus { - ConnectError = 1, - ConnectTimeout = 2, - Established = 3, - Closing = 4, - ClosedUnexpected = 5, + ConnectError, + ConnectTimeout, + Established, + Closing, + ClosedUnexpected, +} + +impl ConnectionStatus { + pub fn kind(&self) -> u32 { + use ConnectionStatus::*; + match self { + ConnectError => 1, + ConnectTimeout => 2, + Established => 3, + Closing => 4, + ClosedUnexpected => 5, + } + } } #[derive(Debug)] @@ -232,35 +251,32 @@ impl CommonInsertItemQueueSender { } pub struct CommonInsertItemQueue { - sender: TokMx>, + sender: async_channel::Sender, recv: async_channel::Receiver, } impl CommonInsertItemQueue { pub fn new(cap: usize) -> Self { let (tx, rx) = async_channel::bounded(cap); - Self { - sender: TokMx::new(tx.clone()), - recv: rx, - } + Self { sender: tx, recv: rx } } - pub async fn sender(&self) -> CommonInsertItemQueueSender { + pub fn sender(&self) -> CommonInsertItemQueueSender { CommonInsertItemQueueSender { - sender: self.sender.lock().await.clone(), + sender: self.sender.clone(), } } - pub async fn sender_raw(&self) -> async_channel::Sender { - self.sender.lock().await.clone() + pub fn sender_raw(&self) -> async_channel::Sender { + self.sender.clone() } pub fn receiver(&self) -> async_channel::Receiver { self.recv.clone() } - pub async fn sender_count(&self) -> usize { - self.sender.lock().await.sender_count() + pub fn sender_count(&self) -> usize { + self.sender.sender_count() } pub fn sender_count2(&self) -> usize { @@ -271,10 +287,8 @@ impl CommonInsertItemQueue { self.recv.receiver_count() } - // TODO should mark this such that a future call to sender() will fail - pub async fn drop_sender(&self) { - let x = std::mem::replace(&mut *self.sender.lock().await, async_channel::bounded(1).0); - drop(x); + pub fn close(&self) { + self.sender.close(); } } @@ -415,13 +429,13 @@ pub async fn insert_connection_status( data_store: &DataStore, _stats: &CaConnStats, ) -> Result<(), Error> { - let tsunix = item.ts.duration_since(std::time::UNIX_EPOCH).unwrap(); + let tsunix = item.ts.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO); let secs = tsunix.as_secs() * netpod::timeunits::SEC; let nanos = tsunix.subsec_nanos() as u64; let ts = secs + nanos; let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV; let ts_lsp = ts - ts_msp; - let kind = item.status as u32; + let kind = item.status.kind(); let addr = format!("{}", item.addr); let params = (ts_msp as i64, ts_lsp as i64, kind as i32, addr, ttl.as_secs() as i32); data_store @@ -437,7 +451,7 @@ pub async fn insert_channel_status( data_store: &DataStore, _stats: &CaConnStats, ) -> Result<(), Error> { - let tsunix = item.ts.duration_since(std::time::UNIX_EPOCH).unwrap(); + let tsunix = item.ts.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO); let secs = tsunix.as_secs() * netpod::timeunits::SEC; let nanos = tsunix.subsec_nanos() as u64; let ts = secs + nanos;