From 4e758dc4b8a3cc0f4753190d6c7437ae42328fda Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 22 Dec 2023 21:31:00 +0100 Subject: [PATCH] WIP --- .cargo/config.toml | 2 +- daqingest/src/bin/daqingest.rs | 105 +++++++------ daqingest/src/daemon.rs | 69 +++------ dbpg/src/schema.rs | 65 +++++++- ingest-linux/src/signal.rs | 2 - netfetch/Cargo.toml | 1 + netfetch/src/ca/conn.rs | 274 +++++++++++++++++++++++---------- netfetch/src/ca/connset.rs | 237 +++++++++++++++------------- netfetch/src/ca/finder.rs | 12 -- netfetch/src/ca/findioc.rs | 2 +- netfetch/src/ca/statemap.rs | 47 +++++- netfetch/src/conf.rs | 1 - netfetch/src/daemon_common.rs | 2 +- scywr/Cargo.toml | 2 +- scywr/src/futbatch.rs | 2 +- scywr/src/futbatchgen.rs | 2 +- scywr/src/futinsert.rs | 3 +- scywr/src/iteminsertqueue.rs | 15 +- series/src/series.rs | 2 +- stats/src/stats.rs | 4 + 20 files changed, 529 insertions(+), 320 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index bc814c1..32b7a6e 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -9,7 +9,7 @@ rustflags = [ #"-C", "inline-threshold=1000", #"-Z", "time-passes=yes", #"-Z", "time-llvm-passes=yes", - #"--cfg", "tokio_unstable", + "--cfg", "tokio_unstable", ] rustdocflags = [ diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index fa9fcd9..2ebad51 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -3,67 +3,74 @@ use daqingest::opts::DaqIngestOpts; use err::Error; use log::*; use netfetch::conf::parse_config; +use taskrun::TracingMode; pub fn main() -> Result<(), Error> { let opts = DaqIngestOpts::parse(); // TODO offer again function to get runtime and configure tracing in one call let runtime = taskrun::get_runtime_opts(opts.worker_threads.unwrap_or(8), opts.blocking_threads.unwrap_or(256)); - match taskrun::tracing_init() { + match taskrun::tracing_init(TracingMode::Production) { Ok(()) => {} Err(()) => return Err(Error::with_msg_no_trace("tracing init failed")), } - let res = runtime.block_on(async move { - use daqingest::opts::ChannelAccess; - use daqingest::opts::SubCmd; - match opts.subcmd { - SubCmd::ListPkey => { - // TODO must take scylla config from CLI - let scylla_conf = err::todoval(); - scywr::tools::list_pkey(&scylla_conf).await? - } - SubCmd::ListPulses => { - // TODO must take scylla config from CLI - let scylla_conf = err::todoval(); - scywr::tools::list_pulses(&scylla_conf).await? - } - SubCmd::FetchEvents(k) => { - // TODO must take scylla config from CLI - let scylla_conf = err::todoval(); - scywr::tools::fetch_events(&k.backend, &k.channel, &scylla_conf).await? - } - SubCmd::ChannelAccess(k) => match k { - #[cfg(DISABLED)] - ChannelAccess::CaSearch(k) => { - info!("daqingest version {}", clap::crate_version!()); - let (conf, channels) = parse_config(k.config.into()).await?; - netfetch::ca::search::ca_search(conf, &channels).await? - } - ChannelAccess::CaIngest(k) => { - info!("daqingest version {}", clap::crate_version!()); - let (conf, channels) = parse_config(k.config.into()).await?; - daqingest::daemon::run(conf, channels).await? - } - }, - #[cfg(feature = "bsread")] - SubCmd::Bsread(k) => ingest_bsread::zmtp::zmtp_client(k.into()) - .await - .map_err(|e| Error::from(e.to_string()))?, - #[cfg(feature = "bsread")] - SubCmd::BsreadDump(k) => { - let mut f = ingest_bsread::zmtp::dumper::BsreadDumper::new(k.source); - f.run().await.map_err(|e| Error::from(e.to_string()))? 
-            }
-            SubCmd::Version => {
-                println!("{}", clap::crate_version!());
-            }
-        }
-        Ok(())
-    });
+    let res = runtime.block_on(main_run(opts));
     match res {
         Ok(k) => Ok(k),
         Err(e) => {
-            error!("Catched: {:?}", e);
+            error!("caught: {:?}", e);
             Err(e)
         }
     }
 }
+
+async fn main_run(opts: DaqIngestOpts) -> Result<(), Error> {
+    // Run the work as a spawned task; `?` propagates a JoinError while the
+    // inner Result is returned as-is.
+    taskrun::tokio::spawn(main_run_inner(opts)).await?
+}
+
+async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
+    use daqingest::opts::ChannelAccess;
+    use daqingest::opts::SubCmd;
+    match opts.subcmd {
+        SubCmd::ListPkey => {
+            // TODO must take scylla config from CLI
+            let scylla_conf = err::todoval();
+            scywr::tools::list_pkey(&scylla_conf).await?
+        }
+        SubCmd::ListPulses => {
+            // TODO must take scylla config from CLI
+            let scylla_conf = err::todoval();
+            scywr::tools::list_pulses(&scylla_conf).await?
+        }
+        SubCmd::FetchEvents(k) => {
+            // TODO must take scylla config from CLI
+            let scylla_conf = err::todoval();
+            scywr::tools::fetch_events(&k.backend, &k.channel, &scylla_conf).await?
+        }
+        SubCmd::ChannelAccess(k) => match k {
+            #[cfg(DISABLED)]
+            ChannelAccess::CaSearch(k) => {
+                info!("daqingest version {}", clap::crate_version!());
+                let (conf, channels) = parse_config(k.config.into()).await?;
+                netfetch::ca::search::ca_search(conf, &channels).await?
+            }
+            ChannelAccess::CaIngest(k) => {
+                info!("daqingest version {}", clap::crate_version!());
+                let (conf, channels) = parse_config(k.config.into()).await?;
+                daqingest::daemon::run(conf, channels).await?
+            }
+        },
+        #[cfg(feature = "bsread")]
+        SubCmd::Bsread(k) => ingest_bsread::zmtp::zmtp_client(k.into())
+            .await
+            .map_err(|e| Error::from(e.to_string()))?,
+        #[cfg(feature = "bsread")]
+        SubCmd::BsreadDump(k) => {
+            let mut f = ingest_bsread::zmtp::dumper::BsreadDumper::new(k.source);
+            f.run().await.map_err(|e| Error::from(e.to_string()))?
+        }
+        SubCmd::Version => {
+            println!("{}", clap::crate_version!());
+        }
+    }
+    Ok(())
+}
diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs
index 32bb82b..3a44275 100644
--- a/daqingest/src/daemon.rs
+++ b/daqingest/src/daemon.rs
@@ -35,17 +35,11 @@ use taskrun::tokio;
 use tokio::task::JoinHandle;
 
 const CHECK_HEALTH_IVL: Duration = Duration::from_millis(2000);
-const CHECK_HEALTH_TIMEOUT: Duration = Duration::from_millis(1500);
+const CHECK_HEALTH_TIMEOUT: Duration = Duration::from_millis(5000);
 const PRINT_ACTIVE_INTERVAL: Duration = Duration::from_millis(60000);
 const PRINT_STATUS_INTERVAL: Duration = Duration::from_millis(20000);
 const CHECK_CHANNEL_SLOW_WARN: Duration = Duration::from_millis(500);
 
-#[derive(Debug)]
-enum CheckPeriodic {
-    Waiting(Instant),
-    Ongoing(u32, Instant),
-}
-
 pub struct DaemonOpts {
     pgconf: Database,
     scyconf: ScyllaConfig,
@@ -75,7 +69,7 @@ pub struct Daemon {
     series_by_channel_stats: Arc,
     shutting_down: bool,
     connset_ctrl: CaConnSetCtrl,
-    connset_status_last: CheckPeriodic,
+    connset_status_last: Instant,
+    // TODO should be a stats object?
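+    // Watchdog timestamp: refreshed whenever a CaConnSetItem::Healthy arrives
+    // (see handle_ca_conn_set_item below) and compared against
+    // CHECK_HEALTH_TIMEOUT in check_health_connset(), roughly:
+    //
+    //     let dt = self.connset_status_last.elapsed();
+    //     if dt > CHECK_HEALTH_TIMEOUT {
+    //         error!("CaConnSet has not reported health status for {:.0} ms", ..);
+    //     }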
     insert_workers_running: AtomicU64,
     query_item_tx_weak: WeakSender>,
@@ -231,7 +225,7 @@
             series_by_channel_stats,
             shutting_down: false,
             connset_ctrl: conn_set_ctrl,
-            connset_status_last: CheckPeriodic::Waiting(Instant::now()),
+            connset_status_last: Instant::now(),
             insert_workers_running: AtomicU64::new(0),
             query_item_tx_weak,
             connset_health_lat_ema: 0.,
@@ -243,23 +237,18 @@ impl Daemon {
         &self.stats
     }
 
-    async fn check_caconn_chans(&mut self, ts1: Instant) -> Result<(), Error> {
-        match &self.connset_status_last {
-            CheckPeriodic::Waiting(since) => {
-                if *since + CHECK_HEALTH_IVL <= ts1 {
-                    let id = self.connset_ctrl.check_health().await?;
-                    self.connset_status_last = CheckPeriodic::Ongoing(id, ts1);
-                }
-            }
-            CheckPeriodic::Ongoing(idexp, since) => {
-                let dt = ts1.saturating_duration_since(*since);
-                if dt > CHECK_HEALTH_TIMEOUT {
-                    error!(
-                        "CaConnSet has not reported health status since {:.0} idexp {idexp:08x}",
-                        dt.as_secs_f32() * 1e3
-                    );
-                }
-            }
+    async fn check_health(&mut self, ts1: Instant) -> Result<(), Error> {
+        self.check_health_connset(ts1)?;
+        Ok(())
+    }
+
+    fn check_health_connset(&mut self, ts1: Instant) -> Result<(), Error> {
+        let dt = self.connset_status_last.elapsed();
+        if dt > CHECK_HEALTH_TIMEOUT {
+            error!(
+                "CaConnSet has not reported health status for {:.0} ms",
+                dt.as_secs_f32() * 1e3
+            );
         }
         Ok(())
     }
@@ -288,7 +277,7 @@ impl Daemon {
             SIGTERM.store(2, atomic::Ordering::Release);
         }
         let ts1 = Instant::now();
-        self.check_caconn_chans(ts1).await?;
+        self.check_health(ts1).await?;
         let dt = ts1.elapsed();
         if dt > CHECK_CHANNEL_SLOW_WARN {
             info!("slow check_chans {:.0} ms", dt.as_secs_f32() * 1e3);
@@ -376,30 +365,10 @@ impl Daemon {
     async fn handle_ca_conn_set_item(&mut self, item: CaConnSetItem) -> Result<(), Error> {
         use CaConnSetItem::*;
         match item {
-            Healthy(id, ts1, ts2) => {
+            Healthy => {
                 let tsnow = Instant::now();
-                let dt1 = tsnow.duration_since(ts1).as_secs_f32() * 1e3;
-                let dt2 = tsnow.duration_since(ts2).as_secs_f32() * 1e3;
-                match &self.connset_status_last {
-                    CheckPeriodic::Waiting(_since) => {
-                        error!("received CaConnSet health report without having asked {dt1:.0} ms {dt2:.0} ms");
-                    }
-                    CheckPeriodic::Ongoing(idexp, since) => {
-                        if id != *idexp {
-                            warn!("unexpected check health answer id {id:08x} idexp {idexp:08x}");
-                        }
-                        // TODO insert response time as series to scylla.
-                        let dtsince = tsnow.duration_since(*since).as_secs_f32() * 1e3;
-                        {
-                            let v = &mut self.connset_health_lat_ema;
-                            *v += (dtsince - *v) * 0.2;
-                            self.stats.connset_health_lat_ema().set(*v as _);
-                        }
-                        trace!("received CaConnSet Healthy dtsince {dtsince:.0} ms {dt1:.0} ms {dt2:.0} ms");
-                        self.connset_status_last = CheckPeriodic::Waiting(tsnow);
-                        self.stats.caconnset_health_response().inc();
-                    }
-                }
+                self.connset_status_last = tsnow;
+                self.stats.caconnset_health_response().inc();
             }
             Error(e) => {
                 error!("error from CaConnSet: {e}");
diff --git a/dbpg/src/schema.rs b/dbpg/src/schema.rs
index a639290..9ba9f15 100644
--- a/dbpg/src/schema.rs
+++ b/dbpg/src/schema.rs
@@ -18,6 +18,30 @@ impl Error {
     }
 }
 
+/// Check in information_schema whether the given base table exists.
+async fn has_table(table: &str, pgc: &PgClient) -> Result<bool, Error> {
+    let rows = pgc
+        .query(
+            "select count(*) as c from information_schema.tables where table_name = $1 and table_type = 'BASE TABLE' limit 10",
+            &[&table],
+        )
+        .await?;
+    if rows.len() == 1 {
+        let c: i64 = rows[0].get(0);
+        if c == 0 {
+            Ok(false)
+        } else if c == 1 {
+            Ok(true)
+        } else {
+            Err(Error::from_logic_msg(format!("has_table bad count {}", c)))
+        }
+    } else {
+        Err(Error::from_logic_msg(format!(
+            "has_table bad row count {}",
+            rows.len()
+        )))
+    }
+}
+
 async fn has_column(table: &str, column: &str, pgc: &PgClient) -> Result<bool, Error> {
     let rows = pgc
         .query(
@@ -32,19 +56,51 @@ async fn has_column(table: &str, column: &str, pgc: &PgClient) -> Result<bool, Error>
 
 async fn migrate_00(pgc: &PgClient) -> Result<(), Error> {
+    if !has_table("ioc_by_channel_log", pgc).await? {
+        let _ = pgc
+            .execute(
+                "
+create table if not exists ioc_by_channel_log (
+    facility text not null,
+    channel text not null,
+    tscreate timestamptz not null default now(),
+    tsmod timestamptz not null default now(),
+    archived int not null default 0,
+    queryaddr text,
+    responseaddr text,
+    addr text
+)
+",
+                &[],
+            )
+            .await;
+        let _ = pgc
+            .execute(
+                "
+create index if not exists ioc_by_channel_log_channel on ioc_by_channel_log (
+    facility,
+    channel
+)
+",
+                &[],
+            )
+            .await;
+    }
+    Ok(())
+}
+
+async fn migrate_01(pgc: &PgClient) -> Result<(), Error> {
     if !has_column("ioc_by_channel_log", "tscreate", pgc).await?
{ pgc.execute( "alter table ioc_by_channel_log add tscreate timestamptz not null default now()", @@ -79,6 +135,7 @@ async fn migrate_00(pgc: &PgClient) -> Result<(), Error> { pub async fn schema_check(pgc: &PgClient) -> Result<(), Error> { migrate_00(&pgc).await?; + migrate_01(&pgc).await?; info!("schema_check done"); Ok(()) } diff --git a/ingest-linux/src/signal.rs b/ingest-linux/src/signal.rs index 1dc6e8f..95fb22f 100644 --- a/ingest-linux/src/signal.rs +++ b/ingest-linux/src/signal.rs @@ -1,10 +1,8 @@ -use log::*; use std::ffi::CStr; use std::mem::MaybeUninit; use thiserror::Error; #[derive(Debug, Error)] -#[error("{self}")] pub enum Error { SignalHandlerSet, SignalHandlerUnset, diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 707293d..1e590ce 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -32,6 +32,7 @@ pin-project = "1" lazy_static = "1" libc = "0.2" slidebuf = "0.0.1" +dashmap = "5.5.3" log = { path = "../log" } series = { path = "../series" } stats = { path = "../stats" } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index dd9d92b..9059ab8 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1,5 +1,4 @@ use super::proto; -use super::proto::CreateChanRes; use super::ExtraInsertsConf; use crate::senderpolling::SenderPolling; use crate::throttletrace::ThrottleTrace; @@ -105,7 +104,7 @@ pub enum ChannelConnectedInfo { #[derive(Clone, Debug, Serialize)] pub struct ChannelStateInfo { - pub name: String, + pub cssid: ChannelStatusSeriesId, pub addr: SocketAddrV4, pub series: Option, pub channel_connected_info: ChannelConnectedInfo, @@ -159,7 +158,7 @@ struct Subid(pub u32); #[derive(Clone, Debug)] enum ChannelError { - CreateChanFail, + CreateChanFail(ChannelStatusSeriesId), } #[derive(Clone, Debug)] @@ -213,18 +212,18 @@ enum ChannelState { FetchingSeriesId(CreatedState), Created(SeriesId, CreatedState), Error(ChannelError), - Ended, + Ended(ChannelStatusSeriesId), } impl ChannelState { - fn to_info(&self, name: String, addr: SocketAddrV4) -> ChannelStateInfo { + fn to_info(&self, cssid: ChannelStatusSeriesId, addr: SocketAddrV4) -> ChannelStateInfo { let channel_connected_info = match self { ChannelState::Init(..) => ChannelConnectedInfo::Disconnected, ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting, - ChannelState::FetchingSeriesId(..) => ChannelConnectedInfo::Connecting, + ChannelState::FetchingSeriesId(_) => ChannelConnectedInfo::Connecting, ChannelState::Created(..) => ChannelConnectedInfo::Connected, - ChannelState::Error(..) => ChannelConnectedInfo::Error, - ChannelState::Ended => ChannelConnectedInfo::Ended, + ChannelState::Error(_) => ChannelConnectedInfo::Error, + ChannelState::Ended(_) => ChannelConnectedInfo::Ended, }; let scalar_type = match self { ChannelState::Created(_series, s) => Some(s.scalar_type.clone()), @@ -269,7 +268,7 @@ impl ChannelState { }; let interest_score = 1. / item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10); ChannelStateInfo { - name, + cssid, addr, series, channel_connected_info, @@ -282,6 +281,19 @@ impl ChannelState { interest_score, } } + + fn cssid(&self) -> ChannelStatusSeriesId { + match self { + ChannelState::Init(cssid) => cssid.clone(), + ChannelState::Creating { cssid, .. 
} => cssid.clone(),
+            ChannelState::FetchingSeriesId(st) => st.cssid.clone(),
+            ChannelState::Created(_, st) => st.cssid.clone(),
+            ChannelState::Error(e) => match e {
+                ChannelError::CreateChanFail(cssid) => cssid.clone(),
+            },
+            ChannelState::Ended(cssid) => cssid.clone(),
+        }
+    }
 }
 
 enum CaConnState {
@@ -320,12 +332,14 @@ fn wait_fut(dt: u64) -> Pin<Box<dyn Future<Output = ()> + Send>> {
 }
 
 struct CidStore {
+    cnt: u32,
     rng: Xoshiro128PlusPlus,
 }
 
 impl CidStore {
     fn new(seed: u32) -> Self {
         Self {
+            cnt: 0,
             rng: Xoshiro128PlusPlus::seed_from_u64(seed as _),
         }
     }
@@ -340,17 +354,25 @@ impl CidStore {
     }
 
     fn next(&mut self) -> Cid {
-        Cid(self.rng.next_u32())
+        // Counter in the upper bits, xor-folded randomness masked to the low byte.
+        let c = self.cnt << 8;
+        self.cnt += 1;
+        let r = self.rng.next_u32();
+        let r = r ^ (r >> 8);
+        let r = r ^ (r >> 8);
+        let r = r ^ (r >> 8);
+        Cid(c | (r & 0xff))
     }
 }
 
 struct SubidStore {
+    cnt: u32,
     rng: Xoshiro128PlusPlus,
 }
 
 impl SubidStore {
     fn new(seed: u32) -> Self {
         Self {
+            cnt: 0,
            rng: Xoshiro128PlusPlus::seed_from_u64(seed as _),
        }
    }
@@ -365,7 +387,13 @@ impl SubidStore {
     }
 
     fn next(&mut self) -> Subid {
-        Subid(self.rng.next_u32())
+        // Same scheme as CidStore::next: counter above, random low byte.
+        let c = self.cnt << 8;
+        self.cnt += 1;
+        let r = self.rng.next_u32();
+        let r = r ^ (r >> 8);
+        let r = r ^ (r >> 8);
+        let r = r ^ (r >> 8);
+        Subid(c | (r & 0xff))
     }
 }
 
@@ -381,7 +409,6 @@ pub enum ConnCommandKind {
     SeriesLookupResult(Result),
     ChannelAdd(String, ChannelStatusSeriesId),
     ChannelRemove(String),
-    CheckHealth,
     Shutdown,
 }
@@ -413,13 +440,6 @@ impl ConnCommand {
         }
     }
 
-    pub fn check_health() -> Self {
-        Self {
-            id: Self::make_id(),
-            kind: ConnCommandKind::CheckHealth,
-        }
-    }
-
     pub fn shutdown() -> Self {
         Self {
             id: Self::make_id(),
@@ -438,13 +458,13 @@ impl ConnCommand {
 }
 
 #[derive(Debug)]
-pub struct CheckHealthResult {
-    pub channel_statuses: BTreeMap<String, ChannelStateInfo>,
+pub struct ChannelStatusPartial {
+    pub channel_statuses: BTreeMap<ChannelStatusSeriesId, ChannelStateInfo>,
 }
 
 #[derive(Debug)]
 pub enum ConnCommandResultKind {
-    CheckHealth(CheckHealthResult),
+    Unused,
 }
 
 #[derive(Debug)]
@@ -469,6 +489,7 @@ pub enum CaConnEventValue {
     None,
     EchoTimeout,
     ConnCommandResult(ConnCommandResult),
+    ChannelStatus(ChannelStatusPartial),
     QueryItem(QueryItem),
     ChannelCreateFail(String),
     EndOfStream,
@@ -526,10 +547,12 @@ pub struct CaConn {
     cid_store: CidStore,
     subid_store: SubidStore,
     channels: HashMap<Cid, ChannelState>,
-    cid_by_name: HashMap<String, Cid>,
+    // BTreeMap because the periodic status scan needs a stable iteration order:
+    cid_by_name: BTreeMap<String, Cid>,
     cid_by_subid: HashMap<Subid, Cid>,
     name_by_cid: HashMap<Cid, String>,
     time_binners: HashMap,
+    channel_status_last_done: Option<Cid>,
     init_state_count: u64,
     insert_item_queue: VecDeque<QueryItem>,
     remote_addr_dbg: SocketAddrV4,
@@ -585,10 +608,11 @@ impl CaConn {
             subid_store: SubidStore::new_from_time(),
             init_state_count: 0,
             channels: HashMap::new(),
-            cid_by_name: HashMap::new(),
+            cid_by_name: BTreeMap::new(),
             cid_by_subid: HashMap::new(),
             name_by_cid: HashMap::new(),
             time_binners: HashMap::new(),
+            channel_status_last_done: None,
             insert_item_queue: VecDeque::new(),
             remote_addr_dbg,
             local_epics_hostname,
@@ -668,7 +692,16 @@ impl CaConn {
     }
 
     fn cmd_check_health(&mut self) {
+        // TODO
+        // No longer in use: CaConn emits health updates by itself.
+        // Make sure that the checks below still run at regular intervals.
+        trace!("cmd_check_health");
+
+        // TODO
+        // what actions are taken here?
+        // what status is modified here?
         match self.check_channels_alive() {
             Ok(_) => {}
             Err(e) => {
@@ -676,25 +709,42 @@ impl CaConn {
                 self.trigger_shutdown(ChannelStatusClosedReason::InternalError);
             }
         }
-        // TODO return the result
+
+        // TODO
+        // Time this, is it fast enough?
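+        // Sketch of a resumable scan over `cid_by_name` (assumptions: the
+        // cursor is keyed on the channel name, i.e. the ordered map key, and
+        // `BATCH` is a placeholder limit; the WIP field `channel_status_last_done`
+        // currently stores a `Cid` instead):
+        //
+        //     use std::ops::Bound;
+        //     let start = match &self.channel_status_name_cursor {
+        //         Some(last) => Bound::Excluded(last.clone()),
+        //         None => Bound::Unbounded,
+        //     };
+        //     for (name, _cid) in self.cid_by_name.range((start, Bound::Unbounded)).take(BATCH) {
+        //         // collect the status for this channel, then advance the cursor:
+        //         // self.channel_status_name_cursor = Some(name.clone());
+        //     }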
+ + let mut kit = self.cid_by_name.values(); + if let Some(mut kk) = kit.next().map(Clone::clone) { + let mut start = Some(kk.clone()); + if let Some(last) = self.channel_status_last_done.take() { + while kk <= last { + kk = if let Some(x) = kit.next().map(Clone::clone) { + start = Some(x.clone()); + x + } else { + start = None; + break; + }; + } + } + if let Some(mut kk) = start { + loop { + kk = if let Some(x) = kit.next().map(Clone::clone) { + x + } else { + break; + }; + } + } else { + // Nothing to do, will continue on next call from front. + } + } + while let Some(kk) = kit.next() {} let mut channel_statuses = BTreeMap::new(); for (k, v) in self.channels.iter() { - let name = self - .name_by_cid(*k) - .map_or_else(|| format!("{k:?}"), ToString::to_string); - let info = v.to_info(name.clone(), self.remote_addr_dbg); - channel_statuses.insert(name, info); + let info = v.to_info(v.cssid(), self.remote_addr_dbg); + channel_statuses.insert(v.cssid(), info); } - let health = CheckHealthResult { channel_statuses }; - let res = ConnCommandResult { - id: ConnCommandResult::make_id(), - kind: ConnCommandResultKind::CheckHealth(health), - }; - let item = CaConnEvent { - ts: Instant::now(), - value: CaConnEventValue::ConnCommandResult(res), - }; - self.ca_conn_event_out_queue.push_back(item); } fn cmd_find_channel(&self, pattern: &str) { @@ -713,7 +763,7 @@ impl CaConn { fn cmd_channel_state(&self, name: String) { let res = match self.cid_by_name.get(&name) { Some(cid) => match self.channels.get(cid) { - Some(state) => Some(state.to_info(name, self.remote_addr_dbg.clone())), + Some(state) => Some(state.to_info(state.cssid(), self.remote_addr_dbg.clone())), None => None, }, None => None, @@ -730,11 +780,11 @@ impl CaConn { .channels .iter() .map(|(cid, state)| { - let name = self - .name_by_cid - .get(cid) - .map_or("--unknown--".into(), |x| x.to_string()); - state.to_info(name, self.remote_addr_dbg.clone()) + // let name = self + // .name_by_cid + // .get(cid) + // .map_or("--unknown--".into(), |x| x.to_string()); + state.to_info(state.cssid(), self.remote_addr_dbg.clone()) }) .collect(); let msg = (self.remote_addr_dbg.clone(), res); @@ -832,10 +882,6 @@ impl CaConn { self.cmd_channel_remove(name); Ok(Ready(Some(()))) } - ConnCommandKind::CheckHealth => { - self.cmd_check_health(); - Ok(Ready(Some(()))) - } ConnCommandKind::Shutdown => { self.cmd_shutdown(); Ok(Ready(Some(()))) @@ -899,7 +945,7 @@ impl CaConn { fn channel_remove_expl( name: String, channels: &mut HashMap, - cid_by_name: &mut HashMap, + cid_by_name: &mut BTreeMap, name_by_cid: &mut HashMap, cid_store: &mut CidStore, time_binners: &mut HashMap, @@ -926,7 +972,7 @@ impl CaConn { fn cid_by_name_expl( name: &str, - cid_by_name: &mut HashMap, + cid_by_name: &mut BTreeMap, name_by_cid: &mut HashMap, cid_store: &mut CidStore, ) -> Cid { @@ -955,17 +1001,18 @@ impl CaConn { } fn channel_state_on_shutdown(&mut self, channel_reason: ChannelStatusClosedReason) { + // TODO can I reuse emit_channel_info_insert_items ? trace!("channel_state_on_shutdown channels {}", self.channels.len()); for (_cid, chst) in &mut self.channels { match chst { - ChannelState::Init(..) => { - *chst = ChannelState::Ended; + ChannelState::Init(cssid) => { + *chst = ChannelState::Ended(cssid.clone()); } - ChannelState::Creating { .. } => { - *chst = ChannelState::Ended; + ChannelState::Creating { cssid, .. } => { + *chst = ChannelState::Ended(cssid.clone()); } - ChannelState::FetchingSeriesId(..) 
=> { - *chst = ChannelState::Ended; + ChannelState::FetchingSeriesId(st) => { + *chst = ChannelState::Ended(st.cssid.clone()); } ChannelState::Created(series, st2) => { let item = QueryItem::ChannelStatus(ChannelStatusItem { @@ -974,12 +1021,13 @@ impl CaConn { status: ChannelStatus::Closed(channel_reason.clone()), }); self.insert_item_queue.push_back(item); - *chst = ChannelState::Ended; + *chst = ChannelState::Ended(st2.cssid.clone()); } ChannelState::Error(..) => { - *chst = ChannelState::Ended; + warn!("TODO emit error status"); + // *chst = ChannelState::Ended; } - ChannelState::Ended => {} + ChannelState::Ended(cssid) => {} } } } @@ -1070,7 +1118,7 @@ impl CaConn { ChannelState::Error(_) => { // TODO need last-save-ts for this state. } - ChannelState::Ended => {} + ChannelState::Ended(_) => {} } } Ok(()) @@ -1261,6 +1309,17 @@ impl CaConn { } fn handle_event_add_res(&mut self, ev: proto::EventAddRes, tsnow: Instant) -> Result<(), Error> { + trace!("got EventAddRes: {ev:?}"); + self.stats.event_add_res_recv.inc(); + let res = Self::handle_event_add_res_inner(self, ev, tsnow); + let ts2 = Instant::now(); + self.stats + .time_handle_event_add_res + .add((ts2.duration_since(tsnow) * MS as u32).as_secs()); + res + } + + fn handle_event_add_res_inner(&mut self, ev: proto::EventAddRes, tsnow: Instant) -> Result<(), Error> { let subid = Subid(ev.subid); // TODO handle subid-not-found which can also be peer error: let cid = if let Some(x) = self.cid_by_subid.get(&subid) { @@ -1446,6 +1505,7 @@ impl CaConn { } CaItem::Msg(msg) => match msg.ty { CaMsgTy::VersionRes(n) => { + debug!("see incoming {:?} {:?}", self.remote_addr_dbg, msg); if n < 12 || n > 13 { error!("See some unexpected version {n} channel search may not work."); Ready(Some(Ok(()))) @@ -1553,18 +1613,7 @@ impl CaConn { self.handle_create_chan_res(k, tsnow)?; do_wake_again = true; } - CaMsgTy::EventAddRes(k) => { - trace4!("got EventAddRes: {k:?}"); - self.stats.event_add_res_recv.inc(); - let res = Self::handle_event_add_res(self, k, tsnow); - let ts2 = Instant::now(); - self.stats - .time_handle_event_add_res - .add((ts2.duration_since(ts1) * MS as u32).as_secs()); - ts1 = ts2; - let _ = ts1; - res? - } + CaMsgTy::EventAddRes(k) => self.handle_event_add_res(k, tsnow)?, CaMsgTy::Echo => { // let addr = &self.remote_addr_dbg; if let Some(started) = self.ioc_ping_start { @@ -1649,7 +1698,7 @@ impl CaConn { res.map_err(|e| Error::from(e.to_string())) } - fn handle_create_chan_res(&mut self, k: CreateChanRes, tsnow: Instant) -> Result<(), Error> { + fn handle_create_chan_res(&mut self, k: proto::CreateChanRes, tsnow: Instant) -> Result<(), Error> { // TODO handle cid-not-found which can also indicate peer error. let cid = Cid(k.cid); let sid = k.sid; @@ -1947,6 +1996,24 @@ impl CaConn { Ok(()) } + fn emit_channel_status(&mut self) { + // TODO limit the queue length. + // Maybe factor the actual push item into new function. + // What to do if limit reached? + // Increase some error counter. 
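+        // Bounded-push sketch for the TODO above (names are placeholders; the
+        // const and the stats counter do not exist yet):
+        //
+        //     const EVENT_OUT_QUEUE_MAX: usize = 1024;
+        //     if self.ca_conn_event_out_queue.len() >= EVENT_OUT_QUEUE_MAX {
+        //         self.stats.ca_conn_event_out_queue_full().inc();
+        //         return;
+        //     }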
+
+        // if self.ca_conn_event_out_queue.len()>
+
+        let val = ChannelStatusPartial {
+            channel_statuses: Default::default(),
+        };
+        let item = CaConnEvent {
+            ts: Instant::now(),
+            value: CaConnEventValue::ChannelStatus(val),
+        };
+        self.ca_conn_event_out_queue.push_back(item);
+    }
+
     fn check_ticker_connecting_timeout(&mut self, since: Instant) -> Result<(), Error> {
         Ok(())
     }
@@ -2071,10 +2138,13 @@ impl Stream for CaConn {
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         use Poll::*;
-        self.stats.poll_count().inc();
         let poll_ts1 = Instant::now();
+        self.stats.poll_count().inc();
         self.stats.poll_fn_begin().inc();
+        let mut reloops: u32 = 0;
         let ret = loop {
+            let lts1 = Instant::now();
+
             self.stats.poll_loop_begin().inc();
             let qlen = self.insert_item_queue.len();
             if qlen >= self.opts.insert_queue_max * 2 / 3 {
@@ -2093,6 +2163,8 @@ impl Stream for CaConn {
                 break Ready(Some(Ok(item)));
             }
 
+            let lts2 = Instant::now();
+
             match self.as_mut().handle_own_ticker(cx) {
                 Ok(Ready(())) => {
                     have_progress = true;
@@ -2114,6 +2186,8 @@ impl Stream for CaConn {
                 Err(e) => break Ready(Some(Err(e))),
             }
 
+            let lts3 = Instant::now();
+
             match self.as_mut().attempt_flush_channel_info_query(cx) {
                 Ok(Ready(Some(()))) => {
                     have_progress = true;
@@ -2125,6 +2199,8 @@ impl Stream for CaConn {
                 Err(e) => break Ready(Some(Err(e))),
             }
 
+            let lts4 = Instant::now();
+
             match self.as_mut().handle_conn_command(cx) {
                 Ok(Ready(Some(()))) => {
                     have_progress = true;
@@ -2136,6 +2212,8 @@ impl Stream for CaConn {
                 Err(e) => break Ready(Some(Err(e))),
             }
 
+            let lts5 = Instant::now();
+
             match self.loop_inner(cx) {
                 Ok(Ready(Some(()))) => {
                     have_progress = true;
@@ -2151,6 +2229,30 @@ impl Stream for CaConn {
                }
            }
 
+            let lts6 = Instant::now();
+
+            let max = Duration::from_millis(14);
+            let dt = lts2.saturating_duration_since(lts1);
+            if dt > max {
+                debug!("LONG OPERATION 2 {dt:?}");
+            }
+            let dt = lts3.saturating_duration_since(lts2);
+            if dt > max {
+                debug!("LONG OPERATION 3 {dt:?}");
+            }
+            let dt = lts4.saturating_duration_since(lts3);
+            if dt > max {
+                debug!("LONG OPERATION 4 {dt:?}");
+            }
+            let dt = lts5.saturating_duration_since(lts4);
+            if dt > max {
+                debug!("LONG OPERATION 5 {dt:?}");
+            }
+            let dt = lts6.saturating_duration_since(lts5);
+            if dt > max {
+                debug!("LONG OPERATION 6 {dt:?}");
+            }
+
             break if self.is_shutdown() {
                 if self.queues_out_flushed() {
                     // debug!("end of stream {}", self.remote_addr_dbg);
@@ -2160,6 +2258,7 @@ impl Stream for CaConn {
                     // debug!("queues_out_flushed false");
                     if have_progress {
                         self.stats.poll_reloop().inc();
+                        reloops += 1;
                         continue;
                     } else if have_pending {
                         self.stats.poll_pending().inc();
@@ -2174,8 +2273,18 @@ impl Stream for CaConn {
                 }
             } else {
                 if have_progress {
-                    self.stats.poll_reloop().inc();
-                    continue;
+                    if poll_ts1.elapsed() > Duration::from_millis(5) {
+                        self.stats.poll_wake_break().inc();
+                        cx.waker().wake_by_ref();
+                        break Ready(Some(Ok(CaConnEvent {
+                            ts: poll_ts1,
+                            value: CaConnEventValue::None,
+                        })));
+                    } else {
+                        self.stats.poll_reloop().inc();
+                        reloops += 1;
+                        continue;
+                    }
                 } else if have_pending {
                     self.stats.poll_pending().inc();
                     Pending
@@ -2186,13 +2295,20 @@ impl Stream for CaConn {
                 }
             };
         };
+        if reloops >= 512 {
+            self.stats.poll_reloops_512().inc();
+        } else if reloops >= 64 {
+            self.stats.poll_reloops_64().inc();
+        } else if reloops >= 8 {
+            self.stats.poll_reloops_8().inc();
+        }
         let poll_ts2 = Instant::now();
         let dt = poll_ts2.saturating_duration_since(poll_ts1);
         if dt > Duration::from_millis(80) {
             warn!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3)
         } else if dt > Duration::from_millis(40) {
             info!("long poll duration {:.0} ms", dt.as_secs_f32() *
1e3)
-        } else if false && dt > Duration::from_millis(5) {
+        } else if dt > Duration::from_millis(14) {
             debug!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3)
         }
         ret
diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs
index 0c05068..0ac7805 100644
--- a/netfetch/src/ca/connset.rs
+++ b/netfetch/src/ca/connset.rs
@@ -1,16 +1,6 @@
-use super::conn::ChannelStateInfo;
-use super::conn::CheckHealthResult;
-use super::conn::ConnCommandResult;
 use super::findioc::FindIocRes;
-use super::statemap;
-use super::statemap::ChannelState;
-use super::statemap::ConnectionState;
-use super::statemap::ConnectionStateValue;
-use crate::ca::conn::CaConn;
-use crate::ca::conn::CaConnEvent;
-use crate::ca::conn::CaConnEventValue;
-use crate::ca::conn::CaConnOpts;
-use crate::ca::conn::ConnCommand;
+use crate::ca::conn;
+use crate::ca::statemap;
 use crate::ca::statemap::CaConnState;
 use crate::ca::statemap::WithAddressState;
 use crate::conf::CaIngestOpts;
@@ -22,6 +12,14 @@
 use async_channel::Receiver;
 use async_channel::Sender;
 use atomic::AtomicUsize;
+use conn::CaConn;
+use conn::CaConnEvent;
+use conn::CaConnEventValue;
+use conn::CaConnOpts;
+use conn::ChannelStateInfo;
+use conn::ChannelStatusPartial;
+use conn::ConnCommand;
+use conn::ConnCommandResult;
 use core::fmt;
 use dbpg::seriesbychannel::BoxedSend;
 use dbpg::seriesbychannel::CanSendChannelInfoResult;
@@ -41,8 +39,11 @@
 use serde::Serialize;
 use series::ChannelStatusSeriesId;
 use statemap::ActiveChannelState;
 use statemap::CaConnStateValue;
+use statemap::ChannelState;
 use statemap::ChannelStateMap;
 use statemap::ChannelStateValue;
+use statemap::ConnectionState;
+use statemap::ConnectionStateValue;
 use statemap::WithStatusSeriesIdState;
 use statemap::WithStatusSeriesIdStateInner;
 use statemap::CHANNEL_STATUS_DUMMY_SCALAR_TYPE;
@@ -53,6 +54,7 @@
 use stats::CaConnStats;
 use stats::CaProtoStats;
 use stats::IocFinderStats;
 use std::collections::BTreeMap;
+use std::collections::HashMap;
 use std::collections::VecDeque;
 use std::net::SocketAddr;
 use std::net::SocketAddrV4;
@@ -196,7 +198,6 @@ impl fmt::Debug for ChannelStatusesRequest {
 pub enum ConnSetCmd {
     ChannelAdd(ChannelAdd),
     ChannelRemove(ChannelRemove),
-    CheckHealth(u32, Instant),
     Shutdown,
     ChannelStatuses(ChannelStatusesRequest),
 }
@@ -213,7 +214,7 @@ impl CaConnSetEvent {
 #[derive(Debug, Clone)]
 pub enum CaConnSetItem {
     Error(Error),
-    Healthy(u32, Instant, Instant),
+    Healthy,
 }
 
 pub struct CaConnSetCtrl {
@@ -266,19 +267,6 @@ impl CaConnSetCtrl {
         Ok(())
     }
 
-    pub async fn check_health(&mut self) -> Result<u32, Error> {
-        let id = self.make_id();
-        let cmd = ConnSetCmd::CheckHealth(id, Instant::now());
-        let n = self.tx.len();
-        if n > 0 {
-            debug!("check_health self.tx.len() {:?}", n);
-        }
-        let s = format!("{:?}", cmd);
-        self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?;
-        trace!("check_health enqueued {s}");
-        Ok(id)
-    }
-
     pub async fn join(self) -> Result<(), Error> {
         self.jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??;
         Ok(())
@@ -352,6 +340,7 @@ impl CanSendChannelInfoResult for SeriesLookupSender {
 }
 
 pub struct CaConnSet {
+    ticker: Pin<Box<tokio::time::Sleep>>,
     backend: String,
     local_epics_hostname: String,
     ca_conn_ress: BTreeMap,
@@ -384,9 +373,14 @@ pub struct CaConnSet {
     ca_proto_stats: Arc<CaProtoStats>,
     rogue_channel_count: u64,
     connect_fail_count: usize,
+    name_by_cssid: HashMap<ChannelStatusSeriesId, String>,
 }
 
 impl CaConnSet {
+    pub fn self_name() -> &'static str {
+        std::any::type_name::<Self>()
+    }
+
     pub fn start(
         backend: String,
         local_epics_hostname: String,
@@ -411,6 +405,7 @@
impl CaConnSet { let ca_proto_stats = Arc::new(CaProtoStats::new()); let ca_conn_stats = Arc::new(CaConnStats::new()); let connset = Self { + ticker: Self::new_self_ticker(), backend, local_epics_hostname, ca_conn_ress: BTreeMap::new(), @@ -444,6 +439,7 @@ impl CaConnSet { ca_proto_stats: ca_proto_stats.clone(), rogue_channel_count: 0, connect_fail_count: 0, + name_by_cssid: HashMap::new(), }; // TODO await on jh let jh = tokio::spawn(CaConnSet::run(connset)); @@ -460,6 +456,10 @@ impl CaConnSet { } } + fn new_self_ticker() -> Pin> { + Box::pin(tokio::time::sleep(Duration::from_millis(500))) + } + async fn run(mut this: CaConnSet) -> Result<(), Error> { debug!("CaConnSet run begin"); loop { @@ -498,7 +498,6 @@ impl CaConnSet { ConnSetCmd::ChannelRemove(x) => self.handle_remove_channel(x), // ConnSetCmd::IocAddrQueryResult(x) => self.handle_ioc_query_result(x).await, // ConnSetCmd::SeriesLookupResult(x) => self.handle_series_lookup_result(x).await, - ConnSetCmd::CheckHealth(id, ts1) => self.handle_check_health(id, ts1), ConnSetCmd::Shutdown => self.handle_shutdown(), ConnSetCmd::ChannelStatuses(x) => self.handle_channel_statuses_req(x), }, @@ -517,13 +516,19 @@ impl CaConnSet { self.stats.channel_add().inc(); // TODO should I add the transition through ActiveChannelState::Init as well? let ch = Channel::new(cmd.name.clone()); - let _st = self.channel_states.inner().entry(ch).or_insert_with(|| ChannelState { - value: ChannelStateValue::Active(ActiveChannelState::WaitForStatusSeriesId { - since: SystemTime::now(), - }), - running_cmd_id: None, - health_timeout_count: 0, - }); + let _st = if let Some(e) = self.channel_states.get_mut(&ch) { + e + } else { + let item = ChannelState { + value: ChannelStateValue::Active(ActiveChannelState::WaitForStatusSeriesId { + since: SystemTime::now(), + }), + running_cmd_id: None, + health_timeout_count: 0, + }; + self.channel_states.insert(ch.clone(), item); + self.channel_states.get_mut(&ch).unwrap() + }; let tx = self.channel_info_res_tx.as_ref().get_ref().clone(); let item = ChannelInfoQuery { backend: cmd.backend, @@ -545,13 +550,17 @@ impl CaConnSet { CaConnEventValue::EchoTimeout => Ok(()), CaConnEventValue::ConnCommandResult(x) => self.handle_conn_command_result(addr, x), CaConnEventValue::QueryItem(item) => { - todo!("remove this insert case"); + error!("TODO remove this insert case"); // self.storage_insert_queue.push_back(item); Ok(()) } CaConnEventValue::ChannelCreateFail(x) => self.handle_channel_create_fail(addr, x), CaConnEventValue::EndOfStream => self.handle_ca_conn_eos(addr), CaConnEventValue::ConnectFail => self.handle_connect_fail(addr), + CaConnEventValue::ChannelStatus(st) => { + error!("TODO handle_ca_conn_event update channel status view"); + Ok(()) + } } } @@ -562,10 +571,12 @@ impl CaConnSet { trace3!("handle_series_lookup_result {res:?}"); match res { Ok(res) => { + let cssid = ChannelStatusSeriesId::new(res.series.into_inner().id()); + self.name_by_cssid.insert(cssid.clone(), res.channel.clone()); let add = ChannelAddWithStatusId { backend: res.backend, name: res.channel, - cssid: ChannelStatusSeriesId::new(res.series.into_inner().id()), + cssid, }; self.handle_add_channel_with_status_id(add)?; } @@ -587,7 +598,7 @@ impl CaConnSet { debug!("handle_add_channel_with_status_id {cmd:?}"); } let ch = Channel::new(cmd.name.clone()); - if let Some(chst) = self.channel_states.inner().get_mut(&ch) { + if let Some(chst) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(chst2) = &mut chst.value { if let 
ActiveChannelState::WaitForStatusSeriesId { .. } = chst2 { *chst2 = ActiveChannelState::WithStatusSeriesId { @@ -631,7 +642,7 @@ impl CaConnSet { debug!("handle_add_channel_with_addr {cmd:?}"); } let ch = Channel::new(cmd.name.clone()); - if let Some(chst) = self.channel_states.inner().get_mut(&ch) { + if let Some(chst) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(ast) = &mut chst.value { if let ActiveChannelState::WithStatusSeriesId { status_series_id: _, @@ -669,7 +680,7 @@ impl CaConnSet { return Ok(()); } let ch = Channel::new(cmd.name); - if let Some(k) = self.channel_states.inner().get_mut(&ch) { + if let Some(k) = self.channel_states.get_mut(&ch) { match &k.value { ChannelStateValue::Active(j) => match j { ActiveChannelState::Init { .. } => { @@ -717,7 +728,7 @@ impl CaConnSet { if trigger.contains(&ch.id()) { debug!("handle_ioc_query_result {res:?}"); } - if let Some(chst) = self.channel_states.inner().get_mut(&ch) { + if let Some(chst) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(ast) = &mut chst.value { if let ActiveChannelState::WithStatusSeriesId { status_series_id, @@ -764,33 +775,20 @@ impl CaConnSet { Ok(()) } - fn handle_check_health(&mut self, id: u32, ts1: Instant) -> Result<(), Error> { - trace2!("handle_check_health {id:08x}"); + fn handle_check_health(&mut self) -> Result<(), Error> { + trace2!("handle_check_health"); if self.shutdown_stopping { - return Ok(()); + Ok(()) + } else { + if false { + self.thr_msg_storage_len + .trigger("connset handle_check_health", &[&self.storage_insert_sender.len()]); + } + self.check_channel_states()?; + let item = CaConnSetItem::Healthy; + self.connset_out_queue.push_back(item); + Ok(()) } - if false { - self.thr_msg_storage_len - .trigger("connset handle_check_health", &[&self.storage_insert_sender.len()]); - } - self.check_channel_states()?; - - // Trigger already the next health check, but use the current data that we have. - // TODO do the full check before sending the reply to daemon. 
- for (_, res) in self.ca_conn_ress.iter_mut() { - let item = ConnCommand::check_health(); - res.cmd_queue.push_back(item); - trace2!( - "handle_check_health pushed check command {:?} {:?}", - res.cmd_queue.len(), - res.sender.len() - ); - } - - let ts2 = Instant::now(); - let item = CaConnSetItem::Healthy(id, ts1, ts2); - self.connset_out_queue.push_back(item); - Ok(()) } fn handle_channel_statuses_req(&mut self, req: ChannelStatusesRequest) -> Result<(), Error> { @@ -801,7 +799,6 @@ impl CaConnSet { let reg1 = regex::Regex::new(&req.name)?; let channels_ca_conn_set = self .channel_states - .inner() .iter() .filter(|(k, _)| reg1.is_match(k.id())) .map(|(k, v)| (k.id().to_string(), v.clone())) @@ -835,17 +832,23 @@ impl CaConnSet { fn handle_conn_command_result(&mut self, addr: SocketAddr, res: ConnCommandResult) -> Result<(), Error> { use crate::ca::conn::ConnCommandResultKind::*; match res.kind { - CheckHealth(res) => self.apply_ca_conn_health_update(addr, res), + Unused => Ok(()), + //CheckHealth(res) => self.apply_ca_conn_health_update(addr, res), } } - fn apply_ca_conn_health_update(&mut self, addr: SocketAddr, res: CheckHealthResult) -> Result<(), Error> { + fn apply_ca_conn_health_update(&mut self, addr: SocketAddr, res: ChannelStatusPartial) -> Result<(), Error> { trace2!("apply_ca_conn_health_update {addr}"); let tsnow = SystemTime::now(); self.rogue_channel_count = 0; for (k, v) in res.channel_statuses { - let ch = Channel::new(k); - if let Some(st1) = self.channel_states.inner().get_mut(&ch) { + let name = if let Some(x) = self.name_by_cssid.get(&v.cssid) { + x + } else { + return Err(Error::with_msg_no_trace(format!("unknown cssid {:?}", v.cssid))); + }; + let ch = Channel::new(name.clone()); + if let Some(st1) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(st2) = &mut st1.value { if let ActiveChannelState::WithStatusSeriesId { status_series_id: _, @@ -888,7 +891,7 @@ impl CaConnSet { trace!("handle_channel_create_fail {addr} {name}"); let tsnow = SystemTime::now(); let ch = Channel::new(name); - if let Some(st1) = self.channel_states.inner().get_mut(&ch) { + if let Some(st1) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(st2) = &mut st1.value { if let ActiveChannelState::WithStatusSeriesId { status_series_id: _, @@ -921,7 +924,7 @@ impl CaConnSet { fn handle_connect_fail(&mut self, addr: SocketAddr) -> Result<(), Error> { trace2!("handle_connect_fail {addr}"); let tsnow = SystemTime::now(); - for (ch, st1) in self.channel_states.inner().iter_mut() { + for (ch, st1) in self.channel_states.iter_mut() { match &mut st1.value { ChannelStateValue::Active(st2) => match st2 { ActiveChannelState::Init { since: _ } => {} @@ -1033,32 +1036,13 @@ impl CaConnSet { break; } } - CaConnEventValue::None => { - if let Err(_) = tx1.send((addr, item)).await { - break; - } - } - CaConnEventValue::EchoTimeout => { - if let Err(_) = tx1.send((addr, item)).await { - break; - } - } - CaConnEventValue::ConnCommandResult(_) => { - if let Err(_) = tx1.send((addr, item)).await { - break; - } - } - CaConnEventValue::ChannelCreateFail(_) => { - if let Err(_) = tx1.send((addr, item)).await { - break; - } - } - CaConnEventValue::EndOfStream => { - if let Err(_) = tx1.send((addr, item)).await { - break; - } - } - CaConnEventValue::ConnectFail => { + CaConnEventValue::None + | CaConnEventValue::EchoTimeout + | CaConnEventValue::ConnCommandResult(..) + | CaConnEventValue::ChannelCreateFail(..) 
+ | CaConnEventValue::EndOfStream + | CaConnEventValue::ConnectFail + | CaConnEventValue::ChannelStatus(..) => { if let Err(_) = tx1.send((addr, item)).await { break; } @@ -1240,9 +1224,9 @@ impl CaConnSet { let k = self.chan_check_next.take(); let it = if let Some(last) = k { trace!("check_chans start at {:?}", last); - self.channel_states.inner().range_mut(last..) + self.channel_states.range_mut(last..) } else { - self.channel_states.inner().range_mut(..) + self.channel_states.range_mut(..) }; let tsnow = SystemTime::now(); @@ -1306,11 +1290,14 @@ impl CaConnSet { } } Assigned(st4) => { + if st4.updated + CHANNEL_HEALTH_TIMEOUT / 3 < tsnow { + warn!("soon health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~"); + } if st4.updated + CHANNEL_HEALTH_TIMEOUT < tsnow { self.stats.channel_health_timeout().inc(); trace!("health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~"); // TODO - error!("health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~"); + error!("TODO health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~"); std::process::exit(1); let addr = SocketAddr::V4(*addr_v4); cmd_remove_channel.push((addr, ch.clone())); @@ -1385,7 +1372,7 @@ impl CaConnSet { let mut connected = 0; let mut maybe_wrong_address = 0; let mut assigned_without_health_update = 0; - for (_ch, st) in self.channel_states.inner().iter() { + for (_ch, st) in self.channel_states.iter() { match &st.value { ChannelStateValue::Active(st2) => match st2 { ActiveChannelState::Init { .. } => { @@ -1477,16 +1464,28 @@ impl CaConnSet { } Ok(()) } + + fn handle_own_ticker_tick(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> { + debug!("handle_own_ticker_tick {}", Self::self_name()); + if !self.ready_for_end_of_stream() { + self.ticker = Self::new_self_ticker(); + let _ = self.ticker.poll_unpin(cx); + // cx.waker().wake_by_ref(); + } + self.handle_check_health()?; + Ok(()) + } } impl Stream for CaConnSet { type Item = CaConnSetItem; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - trace4!("CaConnSet poll begin"); use Poll::*; + trace4!("CaConnSet poll begin"); + let poll_ts1 = Instant::now(); self.stats.poll_fn_begin().inc(); - let res = loop { + let ret = loop { trace4!("CaConnSet poll loop"); self.stats.poll_loop_begin().inc(); @@ -1519,6 +1518,22 @@ impl Stream for CaConnSet { break Ready(Some(item)); } + match self.ticker.poll_unpin(cx) { + Ready(()) => match self.as_mut().handle_own_ticker_tick(cx) { + Ok(()) => { + have_progress = true; + } + Err(e) => { + have_progress = true; + error!("ticker {e}"); + break Ready(Some(CaConnSetItem::Error(e))); + } + }, + Pending => { + have_pending = true; + } + } + if let Some((addr, jh)) = self.await_ca_conn_jhs.front_mut() { match jh.poll_unpin(cx) { Ready(x) => { @@ -1591,6 +1606,7 @@ impl Stream for CaConnSet { } if self.channel_info_query_sender.is_idle() { + // if self.channel_info_query_sender.len().unwrap_or(0) <= 10 {} if let Some(item) = self.channel_info_query_queue.pop_front() { self.channel_info_query_sender.as_mut().send_pin(item); } @@ -1690,6 +1706,15 @@ impl Stream for CaConnSet { }; }; trace4!("CaConnSet poll done"); - res + let poll_ts2 = Instant::now(); + let dt = poll_ts2.saturating_duration_since(poll_ts1); + if dt > Duration::from_millis(80) { + warn!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3) + } else if dt > Duration::from_millis(40) { + info!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3) + } else if dt > Duration::from_millis(5) { + debug!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3) + } + ret } } diff --git 
a/netfetch/src/ca/finder.rs b/netfetch/src/ca/finder.rs
index 61eecc7..b43a520 100644
--- a/netfetch/src/ca/finder.rs
+++ b/netfetch/src/ca/finder.rs
@@ -1,12 +1,9 @@
-use super::connset::CaConnSetEvent;
 use super::connset::IocAddrQuery;
 use super::connset::CURRENT_SEARCH_PENDING_MAX;
 use super::connset::SEARCH_BATCH_MAX;
 use super::search::ca_search_workers_start;
 use crate::ca::findioc::FindIocRes;
-use crate::ca::findioc::FindIocStream;
 use crate::conf::CaIngestOpts;
-use crate::daemon_common::DaemonEvent;
 use async_channel::Receiver;
 use async_channel::Sender;
 use dbpg::conn::make_pg_client;
@@ -14,16 +11,11 @@
 use dbpg::iocindex::IocItem;
 use dbpg::iocindex::IocSearchIndexWorker;
 use dbpg::postgres::Row as PgRow;
 use err::Error;
-use futures_util::FutureExt;
-use futures_util::StreamExt;
 use log::*;
 use netpod::Database;
 use stats::IocFinderStats;
 use std::collections::HashMap;
 use std::collections::VecDeque;
-use std::net::SocketAddrV4;
-use std::sync::atomic;
-use std::sync::atomic::AtomicUsize;
 use std::sync::Arc;
 use std::time::Duration;
 use std::time::Instant;
@@ -31,10 +23,6 @@
 use taskrun::tokio;
 use tokio::task::JoinHandle;
 
 const SEARCH_DB_PIPELINE_LEN: usize = 4;
-const FINDER_JOB_QUEUE_LEN_MAX: usize = 10;
-const FINDER_BATCH_SIZE: usize = 8;
-const FINDER_IN_FLIGHT_MAX: usize = 800;
-const FINDER_TIMEOUT: Duration = Duration::from_millis(100);
 
 #[allow(unused)]
 macro_rules! debug_batch {
diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs
index 0dc505b..a5f5e91 100644
--- a/netfetch/src/ca/findioc.rs
+++ b/netfetch/src/ca/findioc.rs
@@ -616,10 +616,10 @@ impl Stream for FindIocStream {
             match batch.tgts.pop_front() {
                 Some(tgtix) => {
                     Self::serialize_batch(buf1, batch);
+                    debug!("serialized for search {:?}", batch.channels);
                     match self.tgts.get(tgtix) {
                         Some(tgt) => {
                             let tgt = tgt.clone();
-                            //info!("Serialize and queue {bid:?}");
                             self.send_addr = tgt.clone();
                             self.batch_send_queue.push_back(bid);
                             have_progress = true;
diff --git a/netfetch/src/ca/statemap.rs b/netfetch/src/ca/statemap.rs
index 00c767e..fa8707c 100644
--- a/netfetch/src/ca/statemap.rs
+++ b/netfetch/src/ca/statemap.rs
@@ -1,9 +1,13 @@
 use crate::ca::conn::ChannelStateInfo;
 use crate::daemon_common::Channel;
+use dashmap::DashMap;
 use serde::Serialize;
 use series::ChannelStatusSeriesId;
+use std::collections::btree_map::RangeMut;
 use std::collections::BTreeMap;
+use std::collections::HashMap;
 use std::net::SocketAddrV4;
+use std::ops::RangeBounds;
 use std::time::Instant;
 use std::time::SystemTime;
 
@@ -117,14 +121,51 @@ pub struct ChannelState {
 
 #[derive(Debug, Clone, Serialize)]
 pub struct ChannelStateMap {
     map: BTreeMap<Channel, ChannelState>,
+    #[serde(skip)]
+    map2: HashMap<Channel, ChannelState>,
+    // TODO implement same interface via dashmap and compare
+    #[serde(skip)]
+    map3: DashMap<Channel, ChannelState>,
 }
 
 impl ChannelStateMap {
     pub fn new() -> Self {
-        Self { map: BTreeMap::new() }
+        Self {
+            map: BTreeMap::new(),
+            map2: HashMap::new(),
+            map3: DashMap::new(),
+        }
     }
 
-    pub fn inner(&mut self) -> &mut BTreeMap<Channel, ChannelState> {
-        &mut self.map
+    pub fn insert(&mut self, k: Channel, v: ChannelState) -> Option<ChannelState> {
+        self.map.insert(k, v)
+    }
+
+    pub fn get_mut(&mut self, k: &Channel) -> Option<&mut ChannelState> {
+        self.map.get_mut(k)
+    }
+
+    pub fn iter(&self) -> impl Iterator<Item = (&Channel, &ChannelState)> {
+        self.map.iter()
+    }
+
+    pub fn iter_mut(&mut self) -> impl Iterator<Item = (&Channel, &mut ChannelState)> {
+        self.map.iter_mut()
+    }
+
+    pub fn iter_mut_dash(&mut self) -> ChannelStateIter {
+        todo!()
+    }
+
+    pub fn range_mut<R>(&mut self, range: R) -> RangeMut<Channel, ChannelState>
+    where
+        R: RangeBounds<Channel>,
+    {
self.map.range_mut(range) } } + +pub struct ChannelStateIter<'a> { + _m1: &'a u32, +} diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 4631d0d..8211079 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -1,5 +1,4 @@ use err::Error; -use ingest_linux::net::local_hostname; use netpod::log::*; use netpod::Database; use netpod::ScyllaConfig; diff --git a/netfetch/src/daemon_common.rs b/netfetch/src/daemon_common.rs index 3fd2b47..19596b7 100644 --- a/netfetch/src/daemon_common.rs +++ b/netfetch/src/daemon_common.rs @@ -2,7 +2,7 @@ use crate::ca::connset::CaConnSetItem; use async_channel::Sender; use serde::Serialize; -#[derive(Clone, Debug, Serialize, PartialEq, PartialOrd, Eq, Ord)] +#[derive(Clone, Debug, Serialize, PartialEq, PartialOrd, Eq, Ord, Hash)] pub struct Channel { id: String, } diff --git a/scywr/Cargo.toml b/scywr/Cargo.toml index 8bc8e11..c796fff 100644 --- a/scywr/Cargo.toml +++ b/scywr/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] futures-util = "0.3.28" async-channel = "2.0.0" -scylla = "0.10.1" +scylla = "0.11.0" smallvec = "1.11.0" pin-project = "1.1.3" stackfuture = "0.3.0" diff --git a/scywr/src/futbatch.rs b/scywr/src/futbatch.rs index e8a8d53..f6d5b43 100644 --- a/scywr/src/futbatch.rs +++ b/scywr/src/futbatch.rs @@ -4,7 +4,7 @@ use futures_util::Future; use futures_util::FutureExt; use netpod::log::*; use scylla::batch::Batch; -use scylla::frame::value::BatchValues; +use scylla::serialize::batch::BatchValues; use scylla::transport::errors::QueryError; use scylla::QueryResult; use std::pin::Pin; diff --git a/scywr/src/futbatchgen.rs b/scywr/src/futbatchgen.rs index fd276d3..2de6240 100644 --- a/scywr/src/futbatchgen.rs +++ b/scywr/src/futbatchgen.rs @@ -4,7 +4,7 @@ use futures_util::Future; use futures_util::FutureExt; use netpod::log::*; use scylla::batch::Batch; -use scylla::frame::value::BatchValues; +use scylla::serialize::batch::BatchValues; use scylla::transport::errors::QueryError; use scylla::QueryResult; use std::pin::Pin; diff --git a/scywr/src/futinsert.rs b/scywr/src/futinsert.rs index 9a91f7d..87cf87b 100644 --- a/scywr/src/futinsert.rs +++ b/scywr/src/futinsert.rs @@ -5,6 +5,7 @@ use futures_util::FutureExt; use netpod::log::*; use scylla::frame::value::ValueList; use scylla::prepared_statement::PreparedStatement; +use scylla::serialize::row::SerializeRow; use scylla::transport::errors::QueryError; use scylla::QueryResult; use std::pin::Pin; @@ -24,7 +25,7 @@ impl<'a> ScyInsertFut<'a> { pub fn new(scy: &'a ScySession, query: &'a PreparedStatement, values: V) -> Self where - V: ValueList + Send + 'static, + V: ValueList + SerializeRow + Send + 'static, { let fut = scy.execute(query, values); let fut = Box::pin(fut) as _; diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 06e5bae..7ebdb0c 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -10,7 +10,11 @@ use futures_util::FutureExt; use netpod::timeunits::SEC; use netpod::ScalarType; use netpod::Shape; +use scylla::frame::value::Value; +use scylla::frame::value::ValueList; use scylla::prepared_statement::PreparedStatement; +use scylla::serialize::row::SerializeRow; +use scylla::serialize::value::SerializeCql; use scylla::transport::errors::DbError; use scylla::transport::errors::QueryError; use scylla::QueryResult; @@ -18,7 +22,6 @@ use series::ChannelStatusSeriesId; use series::SeriesId; use smallvec::smallvec; use smallvec::SmallVec; -use stackfuture::StackFuture; use stats::InsertWorkerStats; use 
std::net::SocketAddrV4; use std::pin::Pin; @@ -278,7 +281,7 @@ struct InsParCom { fn insert_scalar_gen_fut(par: InsParCom, val: ST, qu: Arc, scy: Arc) -> InsertFut where - ST: scylla::frame::value::Value + Send + 'static, + ST: Value + SerializeCql + Send + 'static, { let params = ( par.series as i64, @@ -293,7 +296,7 @@ where fn insert_array_gen_fut(par: InsParCom, val: ST, qu: Arc, scy: Arc) -> InsertFut where - ST: scylla::frame::value::Value + Send + 'static, + ST: Value + SerializeCql + Send + 'static, { let params = ( par.series as i64, @@ -318,7 +321,7 @@ pub struct InsertFut { } impl InsertFut { - pub fn new( + pub fn new( scy: Arc, qu: Arc, params: V, @@ -363,7 +366,7 @@ async fn insert_scalar_gen( data_store: &DataStore, ) -> Result<(), Error> where - ST: scylla::frame::value::Value, + ST: Value + SerializeCql, { let params = ( par.series as i64, @@ -399,7 +402,7 @@ async fn insert_array_gen( data_store: &DataStore, ) -> Result<(), Error> where - ST: scylla::frame::value::Value, + ST: Value + SerializeCql, { if par.do_insert { let params = ( diff --git a/series/src/series.rs b/series/src/series.rs index eae62bb..fc6bcac 100644 --- a/series/src/series.rs +++ b/series/src/series.rs @@ -30,7 +30,7 @@ impl SeriesId { } } -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize)] +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Serialize)] pub struct ChannelStatusSeriesId(u64); impl ChannelStatusSeriesId { diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 7760131..c228f94 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -426,6 +426,10 @@ stats_proc::stats_struct!(( poll_reloop, poll_pending, poll_no_progress_no_pending, + poll_reloops_8, + poll_reloops_64, + poll_reloops_512, + poll_wake_break, storage_queue_send, storage_queue_pending, storage_queue_above_8,