From f1286d9ba6de47d140d87c304441053a9390ba7f Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 26 Mar 2025 09:08:14 +0100 Subject: [PATCH] By default more tasks for mt, lt write, WIP on channel inspect --- daqingest/Cargo.toml | 2 +- daqingest/src/daemon.rs | 84 +++++++++++- netfetch/src/ca/beacons.rs | 2 - netfetch/src/ca/conn.rs | 166 +++++++++++++++++------ netfetch/src/ca/connset.rs | 63 ++++++++- netfetch/src/ca/connset_input_merge.rs | 3 +- netfetch/src/ca/finder.rs | 7 +- netfetch/src/ca/statemap.rs | 7 +- netfetch/src/conf.rs | 14 +- netfetch/src/daemon_common.rs | 2 + netfetch/src/metrics.rs | 9 +- netfetch/src/metrics/ingest.rs | 10 +- netfetch/src/metrics/ingest/write_v02.rs | 10 +- netfetch/src/metrics/types.rs | 27 ++++ scywr/src/insertqueues.rs | 26 ++-- serde_helper/src/lib.rs | 16 +-- serde_helper/src/serde_dummy.rs | 13 ++ serde_helper/src/serde_instant.rs | 14 ++ serieswriter/src/binwriter.rs | 17 +-- serieswriter/src/fixgridwriter.rs | 3 +- serieswriter/src/msptool.rs | 3 +- serieswriter/src/msptool/fixgrid.rs | 3 +- serieswriter/src/ratelimitwriter.rs | 43 ++++-- serieswriter/src/rtwriter.rs | 46 +++---- serieswriter/src/writer.rs | 14 +- stats/src/stats.rs | 5 + 26 files changed, 449 insertions(+), 160 deletions(-) create mode 100644 netfetch/src/metrics/types.rs create mode 100644 serde_helper/src/serde_dummy.rs create mode 100644 serde_helper/src/serde_instant.rs diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index a64ae8e..849be30 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.7-aa.7" +version = "0.2.7-aa.10" authors = ["Dominik Werder "] edition = "2024" diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index a073125..9bf67d2 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -45,7 +45,8 @@ const CHECK_CHANNEL_SLOW_WARN: Duration = Duration::from_millis(500); pub struct DaemonOpts { pgconf: Database, - scyconf_st: ScyllaIngestConfig, + scyconf_st_rf3: ScyllaIngestConfig, + scyconf_st_rf1: ScyllaIngestConfig, scyconf_mt: ScyllaIngestConfig, scyconf_lt: ScyllaIngestConfig, #[allow(unused)] @@ -201,6 +202,16 @@ impl Daemon { let mut insert_worker_jhs = Vec::new(); if ingest_opts.scylla_disable() { + let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy( + ingest_opts.insert_worker_count(), + ingest_opts.insert_worker_concurrency(), + iqrx.st_rf1_rx, + insert_worker_opts.clone(), + insert_worker_stats.clone(), + ) + .await + .map_err(Error::from_string)?; + insert_worker_jhs.extend(jh); let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy( ingest_opts.insert_worker_count(), ingest_opts.insert_worker_concurrency(), @@ -231,10 +242,36 @@ impl Daemon { .await .map_err(Error::from_string)?; insert_worker_jhs.extend(jh); + let jh = scywr::insertworker::spawn_scylla_insert_workers_dummy( + ingest_opts.insert_worker_count(), + ingest_opts.insert_worker_concurrency(), + iqrx.lt_rf3_lat5_rx, + insert_worker_opts.clone(), + insert_worker_stats.clone(), + ) + .await + .map_err(Error::from_string)?; + insert_worker_jhs.extend(jh); } else { let jh = scywr::insertworker::spawn_scylla_insert_workers( RetentionTime::Short, - opts.scyconf_st.clone(), + opts.scyconf_st_rf1.clone(), + ingest_opts.insert_scylla_sessions(), + ingest_opts.insert_worker_count(), + ingest_opts.insert_worker_concurrency(), + iqrx.st_rf1_rx, + insert_worker_opts.clone(), + insert_worker_stats.clone(), + ingest_opts.use_rate_limit_queue(), + ignore_writes, + ) + .await + .map_err(Error::from_string)?; + insert_worker_jhs.extend(jh); + + let jh = scywr::insertworker::spawn_scylla_insert_workers( + RetentionTime::Short, + opts.scyconf_st_rf3.clone(), ingest_opts.insert_scylla_sessions(), ingest_opts.insert_worker_count(), ingest_opts.insert_worker_concurrency(), @@ -252,8 +289,8 @@ impl Daemon { RetentionTime::Medium, opts.scyconf_mt.clone(), ingest_opts.insert_scylla_sessions(), - ingest_opts.insert_worker_count().min(2), - ingest_opts.insert_worker_concurrency().min(8), + ingest_opts.insert_worker_count(), + ingest_opts.insert_worker_concurrency(), iqrx.mt_rf3_rx, insert_worker_opts.clone(), insert_worker_stats.clone(), @@ -270,8 +307,8 @@ impl Daemon { RetentionTime::Long, opts.scyconf_lt.clone(), ingest_opts.insert_scylla_sessions(), - ingest_opts.insert_worker_count().min(2), - ingest_opts.insert_worker_concurrency().min(8), + ingest_opts.insert_worker_count(), + ingest_opts.insert_worker_concurrency(), lt_rx_combined, insert_worker_opts.clone(), insert_worker_stats.clone(), @@ -422,6 +459,23 @@ impl Daemon { self.insert_queue_counter.load(atomic::Ordering::Acquire), ); } + let iqtxm = self + .iqtx + .as_ref() + .map(|x| netfetch::metrics::types::InsertQueuesTxMetrics::from(x)); + if let Some(iqtxm) = iqtxm { + self.stats().iqtx_len_st_rf1().set(iqtxm.st_rf1_len as _); + self.stats().iqtx_len_st_rf3().set(iqtxm.st_rf3_len as _); + self.stats().iqtx_len_mt_rf3().set(iqtxm.mt_rf3_len as _); + self.stats().iqtx_len_lt_rf3().set(iqtxm.lt_rf3_len as _); + self.stats().iqtx_len_lt_rf3_lat5().set(iqtxm.lt_rf3_lat5_len as _); + } else { + self.stats().iqtx_len_st_rf1().set(2); + self.stats().iqtx_len_st_rf3().set(2); + self.stats().iqtx_len_mt_rf3().set(2); + self.stats().iqtx_len_lt_rf3().set(2); + self.stats().iqtx_len_lt_rf3_lat5().set(2); + } Ok(()) } @@ -439,6 +493,11 @@ impl Daemon { Ok(()) } + async fn handle_channel_command(&mut self, cmd: netfetch::ca::connset::ChannelCommand) -> Result<(), Error> { + self.connset_ctrl.send_channel_command(cmd).await?; + Ok(()) + } + #[cfg(target_abi = "x32")] async fn handle_ca_conn_done(&mut self, conn_addr: SocketAddrV4) -> Result<(), Error> { info!("handle_ca_conn_done {conn_addr:?}"); @@ -510,6 +569,7 @@ impl Daemon { if self.shutting_down { warn!("already shutting down"); } else { + info!("handle_shutdown"); self.shutting_down = true; // TODO make sure we: // set a flag so that we don't attempt to use resources any longer (why could that happen?) @@ -518,6 +578,13 @@ impl Daemon { // drop our ends of channels to workers (gate them behind option?). // await the connection sets. // await other workers that we've spawned. + if let Some(iqtx) = &self.iqtx { + info!("scylla output channels, closing all"); + iqtx.close_all(); + } else { + info!("scylla output channels, not set"); + } + drop(self.iqtx.take()); self.connset_ctrl.shutdown().await?; self.rx.close(); } @@ -624,6 +691,7 @@ impl Daemon { } ChannelAdd(ch, tx) => self.handle_channel_add(ch, tx).await, ChannelRemove(ch) => self.handle_channel_remove(ch).await, + ChannelCommand(cmd) => self.handle_channel_command(cmd).await, CaConnSetItem(item) => self.handle_ca_conn_set_item(item).await, Shutdown => self.handle_shutdown().await, ConfigReload(tx) => self.handle_config_reload(tx).await, @@ -683,6 +751,7 @@ impl Daemon { self.channel_info_query_tx.clone(), self.series_conf_by_id_tx.clone(), self.iqtx + .clone() .take() .ok_or_else(|| Error::with_msg_no_trace("no iqtx available"))?, self.ingest_opts.scylla_config_st().clone(), @@ -833,7 +902,8 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> let opts2 = DaemonOpts { pgconf: opts.postgresql_config().clone(), - scyconf_st: opts.scylla_config_st().clone(), + scyconf_st_rf3: opts.scylla_config_st().clone(), + scyconf_st_rf1: opts.scylla_config_st_rf1().clone(), scyconf_mt: opts.scylla_config_mt().clone(), scyconf_lt: opts.scylla_config_lt().clone(), test_bsread_addr: opts.test_bsread_addr.clone(), diff --git a/netfetch/src/ca/beacons.rs b/netfetch/src/ca/beacons.rs index 379c4d1..0535e26 100644 --- a/netfetch/src/ca/beacons.rs +++ b/netfetch/src/ca/beacons.rs @@ -8,8 +8,6 @@ use netpod::Shape; use netpod::TsNano; use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::ScalarValue; -use serieswriter::writer::SeriesWriter; -use std::collections::VecDeque; use std::io::Cursor; use std::net::Ipv4Addr; use std::time::SystemTime; diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index e58d1e5..c0e5010 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -2,6 +2,7 @@ mod enumfetch; use crate::conf::ChannelConfig; use crate::metrics::status::StorageUsage; +use crate::metrics::types::CaConnMetrics; use crate::throttletrace::ThrottleTrace; use async_channel::Receiver; use async_channel::Sender; @@ -51,6 +52,7 @@ use scywriiq::MspItem; use scywriiq::QueryItem; use scywriiq::ShutdownReason; use serde::Serialize; +use serde_helper::serde_instant::serde_Instant_elapsed_ms; use series::ChannelStatusSeriesId; use series::SeriesId; use serieswriter::binwriter::BinWriter; @@ -230,7 +232,7 @@ mod ser_instant { } } -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)] pub struct Cid(pub u32); impl fmt::Display for Cid { @@ -239,7 +241,7 @@ impl fmt::Display for Cid { } } -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)] struct Subid(pub u32); impl Subid { @@ -248,7 +250,7 @@ impl Subid { } } -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)] struct Sid(pub u32); impl Sid { @@ -257,57 +259,71 @@ impl Sid { } } -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)] struct Ioid(pub u32); -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize)] enum ChannelError { CreateChanFail(ChannelStatusSeriesId), } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] struct CreatingState { + #[serde(with = "serde_Instant_elapsed_ms")] tsbeg: Instant, cssid: ChannelStatusSeriesId, cid: Cid, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] struct MakingSeriesWriterState { + #[serde(with = "serde_Instant_elapsed_ms")] tsbeg: Instant, channel: CreatedState, series_status: SeriesId, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] struct FetchEnumDetails { + #[serde(with = "serde_Instant_elapsed_ms")] tsbeg: Instant, cssid: ChannelStatusSeriesId, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] struct EnableMonitoringState { + #[serde(with = "serde_Instant_elapsed_ms")] tsbeg: Instant, subid: Subid, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] struct ReadPendingState { + #[serde(with = "serde_Instant_elapsed_ms")] tsbeg: Instant, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] struct Monitoring2PassiveState { // Holds instant when we entered this state. A receive of an event is considered a re-enter of the state, // so the instant gets updated. Used for timeout check. + #[serde(with = "serde_Instant_elapsed_ms")] tsbeg: Instant, + #[serde(with = "serde_Instant_elapsed_ms")] ts_silence_read_next: Instant, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] +struct Monitoring2ReadPendingState { + #[serde(with = "serde_Instant_elapsed_ms")] + tsbeg: Instant, + ioid: Ioid, +} + +#[derive(Debug, Clone, Serialize)] enum Monitoring2State { Passive(Monitoring2PassiveState), - ReadPending(Ioid, Instant), + ReadPending(Monitoring2ReadPendingState), } #[derive(Debug, Clone, Serialize)] enum MonitorReadCmp { @@ -317,8 +333,9 @@ enum MonitorReadCmp { DiffValue, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] struct MonitoringState { + #[serde(with = "serde_Instant_elapsed_ms")] tsbeg: Instant, subid: Subid, mon2state: Monitoring2State, @@ -326,20 +343,23 @@ struct MonitoringState { last_comparisons: VecDeque<(time::UtcDateTime, MonitorReadCmp)>, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] struct StopMonitoringForPollingState { + #[serde(with = "serde_Instant_elapsed_ms")] tsbeg: Instant, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] struct PollingState { + #[serde(with = "serde_Instant_elapsed_ms")] tsbeg: Instant, poll_ivl: Duration, tick: PollTickState, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] struct PollTickStateIdle { + #[serde(with = "serde_Instant_elapsed_ms")] next: Instant, } @@ -358,21 +378,24 @@ impl PollTickStateIdle { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] struct PollTickStateWait { + #[serde(with = "serde_Instant_elapsed_ms")] next_backup: Instant, + #[serde(with = "serde_Instant_elapsed_ms")] since: Instant, ioid: Ioid, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] enum PollTickState { Idle(PollTickStateIdle), Wait(PollTickStateWait), } -#[derive(Debug)] +#[derive(Debug, Serialize)] struct WritableState { + #[serde(with = "serde_Instant_elapsed_ms")] tsbeg: Instant, channel: CreatedState, writer: CaRtWriter, @@ -380,7 +403,7 @@ struct WritableState { reading: ReadingState, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] enum ReadingState { EnableMonitoring(EnableMonitoringState), Monitoring(MonitoringState), @@ -388,7 +411,7 @@ enum ReadingState { Polling(PollingState), } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] struct AccountingInfo { usage: StorageUsage, beg: TsMs, @@ -416,21 +439,29 @@ impl AccountingInfo { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] struct CreatedState { cssid: ChannelStatusSeriesId, cid: Cid, sid: Sid, ca_dbr_type: u16, ca_dbr_count: u32, + #[serde(with = "serde_Instant_elapsed_ms")] ts_created: Instant, // Updated when we receive something via monitoring or polling + #[serde(with = "serde_Instant_elapsed_ms")] ts_alive_last: Instant, // Updated on monitoring, polling or when the channel config changes to reset the timeout + #[serde(with = "serde_Instant_elapsed_ms")] ts_activity_last: Instant, st_activity_last: SystemTime, + // TODO + #[serde(skip)] insert_item_ivl_ema: IntervalEma, + // TODO + #[serde(skip)] item_recv_ivl_ema: IntervalEma, + #[serde(with = "serde_Instant_elapsed_ms")] insert_recv_ivl_last: Instant, muted_before: u32, recv_count: u64, @@ -450,6 +481,7 @@ struct CreatedState { name: String, enum_str_table: Option>, status_emit_count: u64, + #[serde(with = "serde_Instant_elapsed_ms")] ts_recv_value_status_emit_next: Instant, } @@ -498,7 +530,7 @@ impl CreatedState { } } -#[derive(Debug)] +#[derive(Debug, Serialize)] enum ChannelState { Init(ChannelStatusSeriesId), Creating(CreatingState), @@ -511,16 +543,19 @@ enum ChannelState { Ended(ChannelStatusSeriesId), } -#[derive(Debug)] +#[derive(Debug, Serialize)] struct ClosingState { + #[serde(with = "serde_Instant_elapsed_ms")] tsbeg: Instant, cssid: ChannelStatusSeriesId, } -#[derive(Debug)] +#[derive(Debug, Serialize)] struct ChannelConf { conf: ChannelConfig, state: ChannelState, + // TODO + #[serde(skip)] wrst: WriterStatus, } @@ -842,11 +877,18 @@ fn info_store_msp_from_time(ts: SystemTime) -> u32 { pub type CmdResTx = Sender>; +#[derive(Debug)] +pub struct CmdChannelInspectFull { + name: String, + tx: Sender, +} + #[derive(Debug)] pub enum ConnCommandKind { ChannelAdd(ChannelConfig, ChannelStatusSeriesId), ChannelClose(String), Shutdown, + ChannelInspectFull(CmdChannelInspectFull), } #[derive(Debug)] @@ -965,6 +1007,7 @@ pub enum CaConnEventValue { ChannelCreateFail(String), EndOfStream(EndOfStreamReason), ChannelRemoved(String), + Metrics(CaConnMetrics), } impl CaConnEventValue { @@ -977,6 +1020,7 @@ impl CaConnEventValue { CaConnEventValue::ChannelCreateFail(_) => "ChannelCreateFail", CaConnEventValue::EndOfStream(_) => "EndOfStream", CaConnEventValue::ChannelRemoved(_) => "ChannelRemoved", + CaConnEventValue::Metrics(_) => "Metrics", } } } @@ -1279,6 +1323,34 @@ impl CaConn { self.cmd_shutdown(); Ok(Ready(Some(()))) } + ConnCommandKind::ChannelInspectFull(cmd) => match self.cid_by_name(&cmd.name) { + Some(cid) => match self.channels.get(&cid) { + Some(ch) => match serde_json::to_value(&ch) { + Ok(val) => match cmd.tx.send_blocking(val) { + Ok(()) => { + // all fine + Ok(Ready(Some(()))) + } + Err(_) => { + // TODO count in metrics + Ok(Ready(Some(()))) + } + }, + Err(_) => { + // TODO count in metrics + Ok(Ready(Some(()))) + } + }, + None => { + // TODO count in metrics + Ok(Ready(Some(()))) + } + }, + None => { + // cmd.tx.close(); + Ok(Ready(Some(()))) + } + }, } } Ready(None) => { @@ -1333,6 +1405,9 @@ impl CaConn { ChannelState::MakingSeriesWriter(st) => { let scalar_type = st.channel.scalar_type.clone(); let shape = st.channel.shape.clone(); + if series::dbg::dbg_chn(st.channel.name()) { + info!("call RtWriter::new {:?} {:?}", chinfo, ch.conf); + } let writer = RtWriter::new( chinfo.series.to_series(), scalar_type, @@ -1639,7 +1714,9 @@ impl CaConn { } for (_cid, conf) in &mut self.channels { if series::dbg::dbg_chn(conf.conf.name()) { - info!("channel_state_on_shutdown {:?}", conf); + let js = serde_json::to_string(conf).unwrap(); + info!("channel_state_on_shutdown debug {:?}", conf); + info!("channel_state_on_shutdown json {}", js); } let chst = &mut conf.state; match chst { @@ -1770,7 +1847,7 @@ impl CaConn { match x.mon2state { // actually, no differing behavior needed so far. Monitoring2State::Passive(_) => {} - Monitoring2State::ReadPending(_, _) => {} + Monitoring2State::ReadPending(_) => {} } Some(x.subid.clone()) } @@ -1831,7 +1908,7 @@ impl CaConn { Monitoring2State::Passive(st3) => { st3.tsbeg = tsnow; } - Monitoring2State::ReadPending(_ioid, _since) => { + Monitoring2State::ReadPending(_) => { // Received EventAdd while still waiting for answer to explicit ReadNotify. // This is fine. self.stats.recv_event_add_while_wait_on_read_notify.inc(); @@ -1939,7 +2016,7 @@ impl CaConn { Monitoring2State::Passive(st3) => { st3.tsbeg = tsnow; } - Monitoring2State::ReadPending(_, _) => {} + Monitoring2State::ReadPending(_) => {} } let name = self.name_by_cid(cid); warn!("received event-cancel but channel {name:?} in wrong state"); @@ -2045,10 +2122,10 @@ impl CaConn { } st3.tsbeg = tsnow; } - Monitoring2State::ReadPending(ioid2, _since) => { + Monitoring2State::ReadPending(st3) => { // We don't check again for `since` here. That's done in timeout checking. // So we could be here a little beyond timeout but we don't care about that. - if ioid != *ioid2 { + if ioid != st3.ioid { // warn!("IOID mismatch ReadNotifyRes on Monitor Read Pending {ioid:?} {ioid2:?}"); self.stats.recv_read_notify_state_read_pending_bad_ioid.inc(); } else { @@ -2243,7 +2320,7 @@ impl CaConn { Self::check_ev_value_data(&value.data, &writer.scalar_type())?; crst.muted_before = 0; crst.insert_item_ivl_ema.tick(tsnow); - binwriter.ingest(tsev, value.f32_for_binning(), iqdqs)?; + // binwriter.ingest(tsev, value.f32_for_binning(), iqdqs)?; { let wres = writer.write(CaWriterValue::new(value, crst), tsnow, tsev, iqdqs)?; crst.status_emit_count += wres.nstatus() as u64; @@ -2449,7 +2526,8 @@ impl CaConn { ); do_wake_again = true; self.proto.as_mut().ok_or_else(|| Error::NoProtocol)?.push_out(msg); - st3.mon2state = Monitoring2State::ReadPending(ioid, tsnow); + st3.mon2state = + Monitoring2State::ReadPending(Monitoring2ReadPendingState { tsbeg: tsnow, ioid }); self.stats.caget_issued().inc(); { let item = ChannelStatusItem { @@ -2462,8 +2540,8 @@ impl CaConn { } } } - Monitoring2State::ReadPending(ioid, since) => { - if *since + MONITOR_POLL_TIMEOUT < tsnow { + Monitoring2State::ReadPending(st4) => { + if st4.tsbeg + MONITOR_POLL_TIMEOUT < tsnow { // Something is wrong with this channel. // Maybe we lost connection, maybe the IOC went down, maybe there is a bug where only // this or a subset of the subscribed channels no longer give updates. @@ -2472,7 +2550,7 @@ impl CaConn { trace_monitor_stale!( "channel monitor explicit read timeout {} ioid {:?}", name, - ioid + st4.ioid ); { let item = ChannelStatusItem { @@ -2609,7 +2687,7 @@ impl CaConn { } ReadingState::Monitoring(st3) => match &st3.mon2state { Monitoring2State::Passive(_st4) => {} - Monitoring2State::ReadPending(_, _) => { + Monitoring2State::ReadPending(_) => { // This is handled in check_channels_state_poll // TODO should unify. } @@ -3095,6 +3173,7 @@ impl CaConn { CaConnState::EndOfStream => {} } self.iqdqs.housekeeping(); + self.metrics_emit(); Ok(()) } @@ -3106,6 +3185,14 @@ impl CaConn { } } + fn metrics_emit(&mut self) { + let item = CaConnMetrics { + ca_conn_event_out_queue_len: self.ca_conn_event_out_queue.len(), + }; + let item = CaConnEvent::new(Instant::now(), CaConnEventValue::Metrics(item)); + self.ca_conn_event_out_queue.push_back(item); + } + fn emit_channel_status(&mut self) -> Result<(), Error> { let stnow = SystemTime::now(); let mut channel_statuses = BTreeMap::new(); @@ -3665,6 +3752,7 @@ impl Stream for CaConn { } } +#[derive(Debug, Serialize)] struct CaWriterValueState { series_data: SeriesId, series_status: SeriesId, @@ -3687,7 +3775,7 @@ impl CaWriterValueState { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] struct CaWriterValue(CaEventValue, Option); impl CaWriterValue { diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 84697ef..0793abc 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -71,6 +71,7 @@ use std::time::Duration; use std::time::Instant; use std::time::SystemTime; use taskrun::tokio; +use tracing::Instrument; const CHECK_CHANS_PER_TICK: usize = 10000000; pub const SEARCH_BATCH_MAX: usize = 64; @@ -225,6 +226,17 @@ impl fmt::Debug for ChannelStatusesRequest { } } +#[derive(Debug)] +pub enum ChannelCommandKind { + InspectDetail, +} + +#[derive(Debug)] +pub struct ChannelCommand { + pub channel: String, + pub kind: ChannelCommandKind, +} + #[derive(Debug)] pub enum ConnSetCmd { ChannelConfigFlagReset(ChannelConfigFlagReset), @@ -233,6 +245,7 @@ pub enum ConnSetCmd { ChannelRemove(ChannelRemove), Shutdown, ChannelStatuses(ChannelStatusesRequest), + ChannelCommand(ChannelCommand), } #[derive(Debug)] @@ -299,6 +312,13 @@ impl CaConnSetCtrl { Ok(()) } + pub async fn send_channel_command(&self, cmd: ChannelCommand) -> Result<(), Error> { + self.tx + .send(CaConnSetEvent::ConnSetCmd(ConnSetCmd::ChannelCommand(cmd))) + .await?; + Ok(()) + } + pub async fn shutdown(&self) -> Result<(), Error> { let cmd = ConnSetCmd::Shutdown; self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?; @@ -582,6 +602,7 @@ impl CaConnSet { ConnSetCmd::ChannelRemove(x) => self.handle_remove_channel(x), ConnSetCmd::Shutdown => self.handle_shutdown(), ConnSetCmd::ChannelStatuses(x) => self.handle_channel_statuses_req(x), + ConnSetCmd::ChannelCommand(x) => self.handle_channel_command(x), }, } } @@ -721,6 +742,9 @@ impl CaConnSet { trace3!("handle_add_channel but shutdown_stopping"); return Ok(()); } + if series::dbg::dbg_chn(cmd.name()) { + info!("handle_add_channel {:?}", cmd); + } trace_channel_state!("handle_add_channel {:?}", cmd); self.stats.channel_add().inc(); // TODO should I add the transition through ActiveChannelState::Init as well? @@ -744,6 +768,10 @@ impl CaConnSet { CaConnEventValue::ChannelStatus(st) => self.apply_ca_conn_health_update(addr, st), CaConnEventValue::EndOfStream(reason) => self.handle_ca_conn_eos(addr, reason), CaConnEventValue::ChannelRemoved(name) => self.handle_ca_conn_channel_removed(addr, name), + CaConnEventValue::Metrics(v) => { + // TODO aggregate metrics and stats + Ok(()) + } } } @@ -1047,6 +1075,24 @@ impl CaConnSet { Ok(()) } + fn handle_channel_command(&mut self, cmd: ChannelCommand) -> Result<(), Error> { + if self.shutdown_stopping { + return Ok(()); + } + // TODO handle, send to corresponding CaConn + // let channels_ca_conn_set = self + // .channel_states + // .iter() + // .filter(|(k, _)| k.name() == cmd.channel) + // .map(|(k, v)| (k.name().to_string(), v.clone())) + // .collect(); + // let item = ChannelStatusesResponse { channels_ca_conn_set }; + // if req.tx.try_send(item).is_err() { + // self.stats.response_tx_fail.inc(); + // } + Ok(()) + } + fn handle_shutdown(&mut self) -> Result<(), Error> { if self.shutdown_stopping { return Ok(()); @@ -1317,7 +1363,19 @@ impl CaConnSet { let conn_tx = conn.conn_command_tx(); let conn_stats = conn.stats(); let tx1 = self.ca_conn_res_tx.as_ref().get_ref().clone(); - let jh = tokio::spawn(Self::ca_conn_item_merge(conn, tx1, addr, self.stats.clone())); + let log_level = "trace"; + let logspan = if log_level == "trace" { + trace!("enable trace for handler"); + tracing::span!(tracing::Level::INFO, "log_span_trace") + } else if log_level == "debug" { + debug!("enable debug for handler"); + tracing::span!(tracing::Level::INFO, "log_span_debug") + } else { + tracing::Span::none() + }; + let fut = Self::ca_conn_item_merge(conn, tx1, addr, self.stats.clone()); + let fut = fut.instrument(logspan); + let jh = tokio::spawn(fut); let ca_conn_res = CaConnRes { state: CaConnState::new(CaConnStateValue::Fresh), sender: Box::pin(conn_tx.into()), @@ -1391,6 +1449,9 @@ impl CaConnSet { return Err(e.into()); } } + CaConnEventValue::Metrics(_) => { + // TODO merge metrics + } } } if let Some(x) = eos_reason { diff --git a/netfetch/src/ca/connset_input_merge.rs b/netfetch/src/ca/connset_input_merge.rs index 3886aa8..86402e0 100644 --- a/netfetch/src/ca/connset_input_merge.rs +++ b/netfetch/src/ca/connset_input_merge.rs @@ -1,14 +1,13 @@ use super::connset::CaConnSetEvent; use super::findioc::FindIocRes; -use crate::ca::connset::ConnSetCmd; use async_channel::Receiver; use dbpg::seriesbychannel::ChannelInfoResult; use err::Error; use futures_util::Stream; use pin_project::pin_project; use std::collections::VecDeque; -use std::pin::pin; use std::pin::Pin; +use std::pin::pin; use std::task::Context; use std::task::Poll; diff --git a/netfetch/src/ca/finder.rs b/netfetch/src/ca/finder.rs index 5732b10..1a6e2d3 100644 --- a/netfetch/src/ca/finder.rs +++ b/netfetch/src/ca/finder.rs @@ -1,5 +1,5 @@ -use super::connset::IocAddrQuery; use super::connset::CURRENT_SEARCH_PENDING_MAX; +use super::connset::IocAddrQuery; use super::connset::SEARCH_BATCH_MAX; use super::search::ca_search_workers_start; use crate::ca::findioc::FindIocRes; @@ -10,7 +10,6 @@ use dbpg::conn::make_pg_client; use dbpg::iocindex::IocItem; use dbpg::iocindex::IocSearchIndexWorker; use dbpg::postgres::Row as PgRow; -use hashbrown::HashMap; use log::*; use netpod::Database; use stats::IocFinderStats; @@ -23,9 +22,7 @@ use tokio::task::JoinHandle; const SEARCH_DB_WORKER_CNT: usize = 2; -macro_rules! debug_batch { ($($arg:tt)*) => ( if false { debug!($($arg)*); } ) } - -macro_rules! trace_batch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! debug_batch { ($($arg:expr),*) => ( if false { debug!($($arg),*); } ); } autoerr::create_error_v1!( name(Error, "Finder"), diff --git a/netfetch/src/ca/statemap.rs b/netfetch/src/ca/statemap.rs index 40ff236..0f26201 100644 --- a/netfetch/src/ca/statemap.rs +++ b/netfetch/src/ca/statemap.rs @@ -3,12 +3,13 @@ use crate::conf::ChannelConfig; use crate::daemon_common::ChannelName; use dashmap::DashMap; use serde::Serialize; +use serde_helper::serde_instant::serde_Instant_elapsed_ms; use series::ChannelStatusSeriesId; use serieswriter::fixgridwriter::ChannelStatusSeriesWriter; use serieswriter::fixgridwriter::ChannelStatusWriteState; -use std::collections::btree_map::RangeMut; use std::collections::BTreeMap; use std::collections::HashMap; +use std::collections::btree_map::RangeMut; use std::net::SocketAddr; use std::net::SocketAddrV4; use std::ops::RangeBounds; @@ -65,7 +66,7 @@ pub enum WithAddressState { pub struct UnassignedState { #[serde(with = "humantime_serde")] since: SystemTime, - #[serde(with = "serde_helper::serde_Instant")] + #[serde(with = "serde_Instant_elapsed_ms")] unused_since_ts: Instant, } @@ -73,7 +74,7 @@ pub struct UnassignedState { pub struct UnassigningForConfigChangeState { pub config_new: ChannelConfig, pub addr: SocketAddr, - #[serde(with = "serde_helper::serde_Instant")] + #[serde(with = "serde_Instant_elapsed_ms")] pub since: Instant, } diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index feffa61..b427fd0 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -102,11 +102,11 @@ impl CaIngestOpts { } pub fn insert_worker_count(&self) -> usize { - self.insert_worker_count.unwrap_or(8) + self.insert_worker_count.unwrap_or(10) } pub fn insert_worker_concurrency(&self) -> usize { - self.insert_worker_concurrency.unwrap_or(32) + self.insert_worker_concurrency.unwrap_or(64) } pub fn array_truncate(&self) -> u64 { @@ -352,10 +352,6 @@ impl IngestConfigArchiving { } } -fn bool_is_false(x: &bool) -> bool { - *x == false -} - fn bool_true() -> bool { true } @@ -363,12 +359,9 @@ fn bool_true() -> bool { mod serde_ingest_config_archiving { use super::ChannelReadConfigApiFormat; use super::IngestConfigArchiving; - use serde::Deserializer; use serde::Serializer; - use serde::de; use serde::ser; use serde::ser::SerializeMap; - use std::fmt; impl ser::Serialize for IngestConfigArchiving { fn serialize(&self, ser: S) -> Result @@ -682,7 +675,8 @@ impl ChannelConfig { } pub fn replication(&self) -> bool { - self.arch.replication + // self.arch.replication + true } pub fn poll_conf(&self) -> Option<(u64,)> { diff --git a/netfetch/src/daemon_common.rs b/netfetch/src/daemon_common.rs index b793257..ace5211 100644 --- a/netfetch/src/daemon_common.rs +++ b/netfetch/src/daemon_common.rs @@ -23,6 +23,7 @@ pub enum DaemonEvent { TimerTick(u32, Sender), ChannelAdd(ChannelConfig, crate::ca::conn::CmdResTx), ChannelRemove(ChannelName), + ChannelCommand(crate::ca::connset::ChannelCommand), CaConnSetItem(CaConnSetItem), Shutdown, ConfigReload(async_channel::Sender), @@ -35,6 +36,7 @@ impl DaemonEvent { TimerTick(_, _) => format!("TimerTick"), ChannelAdd(x, _) => format!("ChannelAdd {x:?}"), ChannelRemove(x) => format!("ChannelRemove {x:?}"), + ChannelCommand(x) => format!("ChannelCommand {x:?}"), CaConnSetItem(_) => format!("CaConnSetItem"), Shutdown => format!("Shutdown"), ConfigReload(..) => format!("ConfigReload"), diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index be2a0b0..705cdce 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -2,6 +2,7 @@ pub mod delete; pub mod ingest; pub mod status; +pub mod types; use crate::ca::conn::ChannelStateInfo; use crate::ca::connset::CaConnSetEvent; @@ -46,9 +47,9 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::net::SocketAddrV4; use std::pin::Pin; +use std::sync::Arc; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; -use std::sync::Arc; use std::task::Context; use std::task::Poll; use std::time::Duration; @@ -367,9 +368,9 @@ fn make_routes( connset_cmd_tx: Sender, stats_set: StatsSet, ) -> axum::Router { + use axum::Router; use axum::extract; use axum::routing::{get, post, put}; - use axum::Router; use http::StatusCode; Router::new() @@ -518,9 +519,9 @@ fn make_routes_channel( connset_cmd_tx: Sender, stats_set: StatsSet, ) -> axum::Router { + use axum::Router; use axum::extract; use axum::routing::{get, post, put}; - use axum::Router; use http::StatusCode; Router::new() .fallback(|| async { axum::Json(json!({"subcommands":["states"]})) }) @@ -560,9 +561,9 @@ fn make_routes_ingest( connset_cmd_tx: Sender, stats_set: StatsSet, ) -> axum::Router { + use axum::Router; use axum::extract; use axum::routing::{get, post, put}; - use axum::Router; use http::StatusCode; Router::new() .nest( diff --git a/netfetch/src/metrics/ingest.rs b/netfetch/src/metrics/ingest.rs index 3ff1a5b..b2c499b 100644 --- a/netfetch/src/metrics/ingest.rs +++ b/netfetch/src/metrics/ingest.rs @@ -1,10 +1,10 @@ pub mod write_v02; use super::RoutesResources; +use axum::Json; use axum::extract::FromRequest; use axum::extract::Query; use axum::http::HeaderMap; -use axum::Json; use bytes::Bytes; use core::fmt; use dbpg::seriesbychannel::ChannelInfoQuery; @@ -12,20 +12,21 @@ use futures_util::StreamExt; use futures_util::TryStreamExt; use items_2::binning::container_events::ContainerEvents; use items_2::binning::container_events::EventValueType; -use netpod::log::*; -use netpod::ttl::RetentionTime; +use netpod::APP_CBOR_FRAMED; use netpod::EnumVariant; use netpod::ScalarType; use netpod::SeriesKind; use netpod::Shape; use netpod::TsNano; -use netpod::APP_CBOR_FRAMED; +use netpod::log::*; +use netpod::ttl::RetentionTime; use scywr::insertqueues::InsertDeques; use scywr::iteminsertqueue::ArrayValue; use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::ScalarValue; use serde::Deserialize; +use serde::Serialize; use series::SeriesId; use serieswriter::msptool::MspSplit; use serieswriter::writer::EmittableType; @@ -70,6 +71,7 @@ macro_rules! trace_queues { type ValueSeriesWriter = SeriesWriter; +#[derive(Debug, Serialize)] struct WritableTypeState { series: SeriesId, msp_split_data: MspSplit, diff --git a/netfetch/src/metrics/ingest/write_v02.rs b/netfetch/src/metrics/ingest/write_v02.rs index cc23247..7f698f3 100644 --- a/netfetch/src/metrics/ingest/write_v02.rs +++ b/netfetch/src/metrics/ingest/write_v02.rs @@ -1,8 +1,8 @@ use crate::metrics::RoutesResources; +use axum::Json; use axum::extract::FromRequest; use axum::extract::Query; use axum::http::HeaderMap; -use axum::Json; use bytes::Bytes; use core::fmt; use dbpg::seriesbychannel::ChannelInfoQuery; @@ -10,21 +10,22 @@ use futures_util::StreamExt; use futures_util::TryStreamExt; use items_2::binning::container_events::ContainerEvents; use items_2::binning::container_events::EventValueType; -use netpod::log; -use netpod::ttl::RetentionTime; +use netpod::APP_CBOR_FRAMED; use netpod::DaqbufChannelConfig; use netpod::EnumVariant; use netpod::ScalarType; use netpod::SeriesKind; use netpod::Shape; use netpod::TsNano; -use netpod::APP_CBOR_FRAMED; +use netpod::log; +use netpod::ttl::RetentionTime; use scywr::insertqueues::InsertDeques; use scywr::iteminsertqueue::ArrayValue; use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::ScalarValue; use serde::Deserialize; +use serde::Serialize; use series::SeriesId; use serieswriter::msptool::MspSplit; use serieswriter::writer::EmittableType; @@ -76,6 +77,7 @@ autoerr::create_error_v1!( type ValueSeriesWriter = SeriesWriter; +#[derive(Debug, Serialize)] struct WritableTypeState { series: SeriesId, msp_split_data: MspSplit, diff --git a/netfetch/src/metrics/types.rs b/netfetch/src/metrics/types.rs new file mode 100644 index 0000000..f8332a5 --- /dev/null +++ b/netfetch/src/metrics/types.rs @@ -0,0 +1,27 @@ +use scywr::insertqueues::InsertQueuesTx; +use serde::Serialize; + +#[derive(Debug, Serialize)] +pub struct CaConnMetrics { + pub ca_conn_event_out_queue_len: usize, +} + +pub struct InsertQueuesTxMetrics { + pub st_rf1_len: usize, + pub st_rf3_len: usize, + pub mt_rf3_len: usize, + pub lt_rf3_len: usize, + pub lt_rf3_lat5_len: usize, +} + +impl From<&InsertQueuesTx> for InsertQueuesTxMetrics { + fn from(value: &InsertQueuesTx) -> Self { + Self { + st_rf1_len: value.st_rf1_tx.len(), + st_rf3_len: value.st_rf3_tx.len(), + mt_rf3_len: value.mt_rf3_tx.len(), + lt_rf3_len: value.lt_rf3_tx.len(), + lt_rf3_lat5_len: value.lt_rf3_lat5_tx.len(), + } + } +} diff --git a/scywr/src/insertqueues.rs b/scywr/src/insertqueues.rs index 09e6608..7e20625 100644 --- a/scywr/src/insertqueues.rs +++ b/scywr/src/insertqueues.rs @@ -96,6 +96,14 @@ impl InsertQueuesTx { Ok(()) } + pub fn close_all(&self) { + self.st_rf1_tx.close(); + self.st_rf3_tx.close(); + self.mt_rf3_tx.close(); + self.lt_rf3_tx.close(); + self.lt_rf3_lat5_tx.close(); + } + pub fn clone2(&self) -> Self { self.clone() } @@ -231,11 +239,12 @@ impl<'a> fmt::Display for InsertDequesSummary<'a> { let obj = self.obj; write!( fmt, - "InsertDeques {{ st_rf1_len: {}, st_rf3_len: {}, mt_rf3_len: {}, lt_rf3_len: {} }}", + "InsertDeques {{ st_rf1_len: {}, st_rf3_len: {}, mt_rf3_len: {}, lt_rf3_len: {}, lt_rf3_lat5_len: {} }}", obj.st_rf1_qu.len(), obj.st_rf3_qu.len(), obj.mt_rf3_qu.len(), - obj.lt_rf3_qu.len() + obj.lt_rf3_qu.len(), + obj.lt_rf3_lat5_qu.len() ) } } @@ -266,7 +275,11 @@ impl InsertSenderPolling { } pub fn is_idle(&self) -> bool { - self.st_rf1_sp.is_idle() && self.st_rf3_sp.is_idle() && self.mt_rf3_sp.is_idle() && self.lt_rf3_sp.is_idle() + self.st_rf1_sp.is_idle() + && self.st_rf3_sp.is_idle() + && self.mt_rf3_sp.is_idle() + && self.lt_rf3_sp.is_idle() + && self.lt_rf3_lat5_sp.is_idle() } pub fn st_rf1_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling>> { @@ -289,13 +302,6 @@ impl InsertSenderPolling { self.project().lt_rf3_lat5_sp } - pub fn __st_rf1_sp_pin(self: Pin<&mut Self>) -> Pin<&mut SenderPolling>> { - if true { - panic!("encapsulated by pin_project"); - } - unsafe { self.map_unchecked_mut(|x| &mut x.st_rf1_sp) } - } - pub fn summary(&self) -> InsertSenderPollingSummary { InsertSenderPollingSummary { obj: self } } diff --git a/serde_helper/src/lib.rs b/serde_helper/src/lib.rs index 54b71b5..3164c32 100644 --- a/serde_helper/src/lib.rs +++ b/serde_helper/src/lib.rs @@ -1,14 +1,2 @@ -#[allow(non_snake_case)] -pub mod serde_Instant { - use serde::Serializer; - use std::time::Instant; - - #[allow(unused)] - pub fn serialize(val: &Instant, ser: S) -> Result - where - S: Serializer, - { - let dur = val.elapsed(); - ser.serialize_u64(dur.as_secs() * 1000 + dur.subsec_millis() as u64) - } -} +pub mod serde_dummy; +pub mod serde_instant; diff --git a/serde_helper/src/serde_dummy.rs b/serde_helper/src/serde_dummy.rs new file mode 100644 index 0000000..c66f919 --- /dev/null +++ b/serde_helper/src/serde_dummy.rs @@ -0,0 +1,13 @@ +#[allow(non_snake_case)] +pub mod serde_dummy { + use serde::Serializer; + use std::time::Instant; + + #[allow(unused)] + pub fn serialize(val: &T, ser: S) -> Result + where + S: Serializer, + { + ser.serialize_str("DUMMY") + } +} diff --git a/serde_helper/src/serde_instant.rs b/serde_helper/src/serde_instant.rs new file mode 100644 index 0000000..384b553 --- /dev/null +++ b/serde_helper/src/serde_instant.rs @@ -0,0 +1,14 @@ +#[allow(non_snake_case)] +pub mod serde_Instant_elapsed_ms { + use serde::Serializer; + use std::time::Instant; + + #[allow(unused)] + pub fn serialize(val: &Instant, ser: S) -> Result + where + S: Serializer, + { + let dur = val.elapsed(); + ser.serialize_u64(dur.as_secs() * 1000 + dur.subsec_millis() as u64) + } +} diff --git a/serieswriter/src/binwriter.rs b/serieswriter/src/binwriter.rs index e1d8993..78b21d8 100644 --- a/serieswriter/src/binwriter.rs +++ b/serieswriter/src/binwriter.rs @@ -1,14 +1,11 @@ use crate::log; use crate::rtwriter::MinQuiets; use items_0::timebin::BinnedBinsTimeweightTrait; -use items_0::timebin::BinnedEventsTimeweightTrait; use items_0::timebin::BinsBoxed; use items_2::binning::container_bins::ContainerBins; use items_2::binning::container_events::ContainerEvents; use items_2::binning::timeweight::timeweight_bins::BinnedBinsTimeweight; -use items_2::binning::timeweight::timeweight_bins_lazy::BinnedBinsTimeweightLazy; use items_2::binning::timeweight::timeweight_events::BinnedEventsTimeweight; -use items_2::binning::timeweight::timeweight_events_dyn::BinnedEventsTimeweightLazy; use netpod::BinnedRange; use netpod::DtMs; use netpod::ScalarType; @@ -19,10 +16,10 @@ use scywr::insertqueues::InsertDeques; use scywr::iteminsertqueue::BinWriteIndexV03; use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::TimeBinSimpleF32V02; +use serde::Serialize; use series::ChannelStatusSeriesId; use series::SeriesId; use series::msp::PrebinnedPartitioning; -use std::time::Duration; macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ) } @@ -31,7 +28,7 @@ macro_rules! debug_bin { ($t:expr, $($arg:expr),*) => ( if true { if $t { log::d macro_rules! trace_ingest { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } macro_rules! trace_tick { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) } -macro_rules! trace_tick_verbose { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) } +macro_rules! trace_tick_verbose { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ) } macro_rules! trace_bin { ($t:expr, $($arg:expr),*) => ( if true { if $t { log::trace!($($arg),*); } } ) } @@ -68,7 +65,7 @@ fn bin_len_clamp(dur: DtMs) -> PrebinnedPartitioning { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] enum WriteCntZero { Enable, Disable, @@ -83,7 +80,7 @@ impl WriteCntZero { } } -#[derive(Debug)] +#[derive(Debug, Serialize)] enum IndexWritten { None, Last(u32, u32), @@ -107,7 +104,7 @@ impl IndexWritten { } } -#[derive(Debug)] +#[derive(Debug, Serialize)] pub struct BinWriter { chname: String, cssid: ChannelStatusSeriesId, @@ -425,6 +422,10 @@ impl BinWriter { iqdqs: &mut InsertDeques, ) -> Result<(), Error> { let selfname = "handle_output_ready"; + if true { + trace_tick!("{selfname} bins ready len {} DISCARDING", bins.len()); + return Ok(()); + } trace_tick!("{selfname} bins ready len {}", bins.len()); for e in bins.iter_debug() { trace_tick_verbose!("{e:?}"); diff --git a/serieswriter/src/fixgridwriter.rs b/serieswriter/src/fixgridwriter.rs index 5c516d3..def3510 100644 --- a/serieswriter/src/fixgridwriter.rs +++ b/serieswriter/src/fixgridwriter.rs @@ -8,6 +8,7 @@ use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::MspItem; use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::ScalarValue; +use serde::Serialize; use series::SeriesId; use std::time::Instant; @@ -78,7 +79,7 @@ impl EmittableType for ChannelStatusWriteValue { } } -#[derive(Debug)] +#[derive(Debug, Serialize)] pub struct ChannelStatusWriteState { series: SeriesId, msp_split: MspSplitFixGrid, diff --git a/serieswriter/src/msptool.rs b/serieswriter/src/msptool.rs index 159068b..cdaf87f 100644 --- a/serieswriter/src/msptool.rs +++ b/serieswriter/src/msptool.rs @@ -2,11 +2,12 @@ pub mod fixgrid; use netpod::DtNano; use netpod::TsNano; +use serde::Serialize; const SEC: u64 = 1000_000_000; const HOUR: u64 = SEC * 60 * 60 * 24; -#[derive(Debug)] +#[derive(Debug, Serialize)] pub struct MspSplit { last: Option, count: u32, diff --git a/serieswriter/src/msptool/fixgrid.rs b/serieswriter/src/msptool/fixgrid.rs index cc0d6b4..2400a7f 100644 --- a/serieswriter/src/msptool/fixgrid.rs +++ b/serieswriter/src/msptool/fixgrid.rs @@ -2,8 +2,9 @@ use netpod::DtMs; use netpod::DtNano; use netpod::TsMs; use netpod::TsNano; +use serde::Serialize; -#[derive(Debug)] +#[derive(Debug, Serialize)] pub struct MspSplitFixGrid { grid_dt: DtMs, last: Option, diff --git a/serieswriter/src/ratelimitwriter.rs b/serieswriter/src/ratelimitwriter.rs index 5e431b2..04351e4 100644 --- a/serieswriter/src/ratelimitwriter.rs +++ b/serieswriter/src/ratelimitwriter.rs @@ -5,6 +5,7 @@ use netpod::DtNano; use netpod::TsNano; use netpod::log; use scywr::iteminsertqueue::QueryItem; +use serde::Serialize; use series::SeriesId; use std::collections::VecDeque; use std::marker::PhantomData; @@ -13,7 +14,7 @@ use std::time::Instant; macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); } macro_rules! trace { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ); } -macro_rules! trace_rt_decision { ($det:expr, $($arg:expr),*) => ( if $det { log::trace!($($arg),*); } ); } +macro_rules! trace_rt_decision { ($dtd:expr, $($arg:expr),*) => ( if $dtd { log::trace!($($arg),*); } ); } autoerr::create_error_v1!( name(Error, "RateLimitWriter"), @@ -29,6 +30,7 @@ pub struct WriteRes { pub status: u8, } +#[derive(Serialize)] pub struct RateLimitWriter where ET: EmittableType, @@ -66,7 +68,7 @@ where last_insert_val: None, dbgname, writer, - do_trace_detail: netpod::TRACE_SERIES_ID.contains(&series.id()), + do_trace_detail: series::dbg::dbg_series(series), _t1: PhantomData, }; if ret.do_trace_detail { @@ -83,9 +85,7 @@ where tsev: TsNano, deque: &mut VecDeque, ) -> Result { - // Decide whether we want to write. - // TODO catch already in CaConn the cases when the IOC-timestamp did not change. - let det = self.do_trace_detail; + let dtd = self.do_trace_detail; let dbgname = &self.dbgname; let sid = &self.series; let min_quiet = 1000 * self.min_quiet.as_secs() + self.min_quiet.subsec_millis() as u64; @@ -93,7 +93,7 @@ where let ts = tsev; if false { trace_rt_decision!( - det, + dtd, "{} {} min_quiet {:?} ts1 {:?} ts2 {:?} item {:?}", dbgname, sid, @@ -105,23 +105,42 @@ where } let do_write = { if !self.is_polled && ts.ms() < tsl.ms() + min_quiet { - trace_rt_decision!(det, "{dbgname} {sid} ignore, because not min quiet {ts:?} {tsl:?}"); + trace_rt_decision!( + dtd, + "{} {} ignore, because not min quiet {} {}", + dbgname, + sid, + ts, + tsl + ); false } else if self.is_polled && ts.ms() + 800 < tsl.ms() + min_quiet { trace_rt_decision!( - det, - "{dbgname} {sid} ignore, because not is-polled min quiet {ts:?} {tsl:?}" + dtd, + "{} {} ignore, because not is-polled min quiet {} {}", + dbgname, + sid, + ts, + tsl ); false - } else if ts < tsl.add_dt_nano(DtNano::from_ms(5)) { - trace_rt_decision!(det, "{dbgname} {sid} ignore, because store rate cap"); + } else if ts < tsl.add_dt_nano(DtNano::from_ms(1)) { + trace_rt_decision!( + dtd, + "{} {} ignore, because store rate cap {} {}", + dbgname, + sid, + ts, + tsl + ); false } else { - trace_rt_decision!(det, "{dbgname} {sid} accept"); + trace_rt_decision!(dtd, "{} {} accept {} {}", dbgname, sid, ts, tsl); true } }; if do_write { + self.last_insert_ts = ts; let res = self.writer.write(item, &mut self.emit_state, ts_net, ts, deque)?; let ret = WriteRes { accept: true, diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index 24a1a08..ec412e4 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -6,11 +6,13 @@ use netpod::Shape; use netpod::TsNano; use scywr::insertqueues::InsertDeques; use scywr::iteminsertqueue::QueryItem; +use serde::Serialize; use series::SeriesId; use std::collections::VecDeque; use std::time::Duration; use std::time::Instant; +macro_rules! debug_init { ($det:expr, $($arg:expr),*) => ( if $det { log::info!($($arg),*); } ); } macro_rules! trace_emit { ($det:expr, $($arg:expr),*) => ( if $det { log::trace!($($arg),*); } ); } macro_rules! trace_rt_decision { ($det:expr, $($arg:expr),*) => ( if $det { log::trace!($($arg),*); } ); } @@ -23,14 +25,14 @@ autoerr::create_error_v1!( }, ); -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub struct MinQuiets { pub st: Duration, pub mt: Duration, pub lt: Duration, } -#[derive(Debug)] +#[derive(Debug, Serialize)] struct State where ET: EmittableType, @@ -72,7 +74,7 @@ impl Default for WriteRtRes { } } -#[derive(Debug)] +#[derive(Debug, Serialize)] pub struct RtWriter where ET: EmittableType, @@ -103,8 +105,9 @@ where do_st_rf1: bool, emit_state_new: &dyn Fn() -> ::State, ) -> Result { + let dtd = series::dbg::dbg_series(series); + debug_init!(dtd, "new {:?} is_polled {}", min_quiets, is_polled); let state_st = { - // let writer = SeriesWriter::establish_with_sid(sid, stnow)?; let writer = RateLimitWriter::new(series, min_quiets.st, is_polled, emit_state_new(), "st".into())?; State { writer } }; @@ -124,7 +127,7 @@ where state_mt, state_lt, min_quiets, - do_trace_detail: netpod::TRACE_SERIES_ID.contains(&series.id()), + do_trace_detail: dtd, do_st_rf1, last_insert_ts: TsNano::from_ns(0), last_insert_val: None, @@ -157,6 +160,9 @@ where ) -> Result { let det = self.do_trace_detail; trace_emit!(det, "write {:?}", item.ts()); + // TODO + // Optimize for the common case that we only write into one of the stores. + // Make the decision first, based on ref, then clone only as required. let res_lt; let res_mt; let res_st; @@ -182,31 +188,21 @@ where .as_ref() .map(|k| item.has_change(k)) .unwrap_or(true) + == false { - // TODO filter duplicate values already here - res_lt = Self::write_inner(&mut self.state_lt, item.clone(), ts_net, tsev, &mut iqdqs.lt_rf3_qu)?; - if !res_lt.accept { - res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, tsev, &mut iqdqs.mt_rf3_qu)?; - if !res_mt.accept { - if self.do_st_rf1 { - res_st = - Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf1_qu)?; - } else { - res_st = - Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)?; - } - } else { - res_st = WriteRtRes::default(); - } - } else { - res_mt = WriteRtRes::default(); - res_st = WriteRtRes::default(); - } - } else { trace_rt_decision!(det, "{} ignore, because value did not change", self.series); res_lt = WriteRtRes::default(); res_mt = WriteRtRes::default(); res_st = WriteRtRes::default(); + } else { + res_lt = Self::write_inner(&mut self.state_lt, item.clone(), ts_net, tsev, &mut iqdqs.lt_rf3_qu)?; + res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, tsev, &mut iqdqs.mt_rf3_qu)?; + res_st = if self.do_st_rf1 { + // Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf1_qu)? + Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)? + } else { + Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)? + }; } let ret = WriteRes { st: res_st, diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 0ff0b08..d13c1a9 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -1,13 +1,15 @@ -use core::fmt; use log::*; use netpod::TsNano; use scywr::iteminsertqueue::QueryItem; +use serde::Serialize; use series::SeriesId; -pub use smallvec::SmallVec; use std::collections::VecDeque; +use std::fmt; use std::marker::PhantomData; use std::time::Instant; +pub use smallvec::SmallVec; + macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if $det { trace!($($arg)*); } ) } autoerr::create_error_v1!( @@ -32,7 +34,7 @@ pub struct EmitRes { } pub trait EmittableType: fmt::Debug + Clone { - type State; + type State: fmt::Debug + Serialize; fn ts(&self) -> TsNano; fn has_change(&self, k: &Self) -> bool; fn byte_size(&self) -> u32; @@ -57,7 +59,7 @@ pub struct WriteRes { pub status: u8, } -#[derive(Debug)] +#[derive(Debug, Serialize)] pub struct SeriesWriter { series: SeriesId, do_trace_detail: bool, @@ -71,7 +73,7 @@ where pub fn new(series: SeriesId) -> Result { let res = Self { series, - do_trace_detail: netpod::TRACE_SERIES_ID.contains(&series.id()), + do_trace_detail: series::dbg::dbg_series(series), _t1: PhantomData, }; Ok(res) @@ -92,7 +94,7 @@ where let det = self.do_trace_detail; let ts_main = item.ts(); let res = item.into_query_item(ts_net, tsev, state); - trace_emit!(det, "emit value for ts {:?} items len {}", ts_main, res.items.len()); + trace_emit!(det, "emit value for ts {} items len {}", ts_main, res.items.len()); for item in res.items { deque.push_back(item); } diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 4211d74..a902ff8 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -316,6 +316,11 @@ stats_proc::stats_struct!(( channel_with_address, channel_no_address, connset_health_lat_ema, + iqtx_len_st_rf1, + iqtx_len_st_rf3, + iqtx_len_mt_rf3, + iqtx_len_lt_rf3, + iqtx_len_lt_rf3_lat5, ), ), agg(name(DaemonStatsAgg), parent(DaemonStats)),