From 3270d4843686509fd187a7b143659410ef573f96 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 7 Jul 2025 16:26:11 +0200 Subject: [PATCH] Refactor msp split type and add channel status variants --- .cargo/cargo-lock | 62 +++++++---- daqingest/Cargo.toml | 2 +- netfetch/src/ca/conn.rs | 134 ++++++++++++++--------- netfetch/src/ca/connset.rs | 90 ++++++++++----- netfetch/src/ca/statemap.rs | 6 - netfetch/src/conf.rs | 10 +- netfetch/src/metrics/ingest.rs | 38 ++----- netfetch/src/metrics/ingest/write_v02.rs | 41 ++----- netfetch/src/metrics/status.rs | 55 +++++++++- scywr/src/insertworker.rs | 2 +- serieswriter/src/fixgridwriter.rs | 48 +------- serieswriter/src/msptool.rs | 68 ++---------- serieswriter/src/msptool/dyngrid.rs | 78 +++++++++++++ serieswriter/src/msptool/fixgrid.rs | 35 ++++-- serieswriter/src/ratelimitwriter.rs | 16 +-- serieswriter/src/rtwriter.rs | 41 +++++-- serieswriter/src/writer.rs | 17 ++- stats/mettdecl.rs | 4 + 18 files changed, 432 insertions(+), 315 deletions(-) create mode 100644 serieswriter/src/msptool/dyngrid.rs diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index 1cf1d19..bf6c636 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -129,9 +129,9 @@ checksum = "76a2e8124351fda1ef8aaaa3bbd7ebbcb486bbcd4225aca0aa0d84bb2db8fecb" [[package]] name = "async-channel" -version = "2.3.1" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" dependencies = [ "concurrent-queue", "event-listener-strategy", @@ -296,9 +296,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.18.1" +version = "3.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "793db76d6187cd04dff33004d8e6c9cc4e05cd330500379d2394209271b4aeee" +checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" [[package]] name = "byteorder" @@ -314,9 +314,9 @@ checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" [[package]] name = "cc" -version = "1.2.27" +version = "1.2.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d487aa071b5f64da6f19a3e848e3578944b726ee5a4854b82172f02aa876bfdc" +checksum = "5c1599538de2394445747c8cf7935946e3cc27e9625f889d979bfb2aaf569362" dependencies = [ "shlex", ] @@ -411,9 +411,12 @@ checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675" [[package]] name = "cobs" -version = "0.2.3" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67ba02a97a2bd10f4b59b25c7973101c79642302776489e030cd13cdab09ed15" +checksum = "0fa961b519f0b462e3a3b4a34b64d119eeaca1d59af726fe450bbba07a9fc0a1" +dependencies = [ + "thiserror", +] [[package]] name = "colorchoice" @@ -468,9 +471,9 @@ checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" [[package]] name = "crunchy" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43da5946c66ffcc7745f48db692ffbb10a83bfe0afd96235c5c2a4fb23994929" +checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" [[package]] name = "crypto-common" @@ -733,7 +736,7 @@ dependencies = [ [[package]] name = "daqingest" -version = "0.3.0-aa.8" +version = "0.3.0" dependencies = [ "async-channel", "autoerr", @@ -1407,9 +1410,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.9.0" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e" +checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" dependencies = [ "equivalent", "hashbrown 0.15.4", @@ -1434,6 +1437,17 @@ dependencies = [ "rustversion", ] +[[package]] +name = "io-uring" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b86e202f00093dcba4275d4636b93ef9dd75d025ae560d2521b45ea28ab49013" +dependencies = [ + "bitflags", + "cfg-if", + "libc", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -1520,9 +1534,9 @@ checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" [[package]] name = "lz4_flex" -version = "0.11.4" +version = "0.11.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c592ad9fbc1b7838633b3ae55ce69b17d01150c72fcef229fbb819d39ee51ee" +checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a" dependencies = [ "twox-hash", ] @@ -1819,9 +1833,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "postcard" -version = "1.1.1" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "170a2601f67cc9dba8edd8c4870b15f71a6a2dc196daec8c83f72b59dff628a8" +checksum = "6c1de96e20f51df24ca73cafcc4690e044854d803259db27a00a461cb3b9d17a" dependencies = [ "cobs", "embedded-io 0.4.0", @@ -1888,9 +1902,9 @@ dependencies = [ [[package]] name = "prettyplease" -version = "0.2.34" +version = "0.2.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6837b9e10d61f45f987d50808f83d1ee3d206c66acf650c3e4ae2e1f6ddedf55" +checksum = "061c1221631e079b26479d25bbf2275bfe5917ae8419cd7e34f13bfc2aa7539a" dependencies = [ "proc-macro2", "syn", @@ -2498,9 +2512,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.103" +version = "2.0.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4307e30089d6fd6aff212f2da3a1f9e32f3223b1f010fb09b7c95f90f3ca1e8" +checksum = "17b6f705963418cdb9927482fa304bc562ece2fdd4f616084c50b7023b435a40" dependencies = [ "proc-macro2", "quote", @@ -2626,17 +2640,19 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.45.1" +version = "1.46.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779" +checksum = "0cc3a2344dafbe23a245241fe8b09735b521110d30fcefbbd5feb1797ca35d17" dependencies = [ "backtrace", "bytes", + "io-uring", "libc", "mio", "parking_lot", "pin-project-lite", "signal-hook-registry", + "slab", "socket2", "tokio-macros", "tracing", diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 07b38f3..6e39cb6 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.3.0-aa.9" +version = "0.3.0-ab.1" authors = ["Dominik Werder "] edition = "2024" diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 3789981..255b3c6 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -58,8 +58,7 @@ use series::SeriesId; use serieswriter::binwriter::BinWriter; use serieswriter::binwriter::DiscardFirstOutput; use serieswriter::fixgridwriter::ChannelStatusSeriesWriter; -use serieswriter::fixgridwriter::ChannelStatusWriteState; -use serieswriter::msptool::MspSplit; +use serieswriter::msptool::fixgrid::MspSplitFixGrid; use serieswriter::rtwriter::RtWriter; use serieswriter::writer::EmittableType; use stats::IntervalEma; @@ -152,6 +151,10 @@ autoerr::create_error_v1!( }, ); +pub const fn channel_status_retention_time() -> RetentionTime { + RetentionTime::Long +} + impl err::ToErr for Error { fn to_err(self) -> err::Error { err::Error::with_msg_no_trace(self.to_string()) @@ -567,11 +570,11 @@ impl ChannelConf { conf, state: ChannelState::Init(cssid), wrst: WriterStatus { - writer_status: serieswriter::writer::SeriesWriter::new(SeriesId::new(cssid.id())).unwrap(), - writer_status_state: serieswriter::fixgridwriter::ChannelStatusWriteState::new( + writer_status: serieswriter::writer::SeriesWriter::new( SeriesId::new(cssid.id()), - serieswriter::fixgridwriter::CHANNEL_STATUS_GRID, - ), + MspSplitFixGrid::for_channel_status(), + ) + .unwrap(), }, } } @@ -750,7 +753,6 @@ impl ChannelState { #[derive(Debug)] struct WriterStatus { writer_status: ChannelStatusSeriesWriter, - writer_status_state: ChannelStatusWriteState, } impl WriterStatus { @@ -763,7 +765,7 @@ impl WriterStatus { let (ts, val) = item.to_ts_val(); self.writer_status.write( serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, val), - &mut self.writer_status_state, + &mut (), Instant::now(), tsev, deque, @@ -1107,12 +1109,27 @@ impl<'a> EventAddIngestRefobj<'a> { pub type CmdResTx = Sender>; -#[derive(Debug)] pub struct CmdChannelInspectFull { name: String, tx: Sender, } +impl fmt::Debug for CmdChannelInspectFull { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("CmdChannelInspectFull").finish() + } +} + +struct StatusPrivate { + tx: Sender, +} + +impl fmt::Debug for StatusPrivate { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("StatusPrivate").finish() + } +} + #[derive(Debug)] pub enum ConnCommandKind { ChannelAdd(ChannelConfig, ChannelStatusSeriesId), @@ -1120,6 +1137,7 @@ pub enum ConnCommandKind { ChannelCloseReconf(String), Shutdown, ChannelInspectFull(CmdChannelInspectFull), + StatusPrivate(StatusPrivate), } #[derive(Debug)] @@ -1164,6 +1182,15 @@ impl ConnCommand { } } + pub async fn status_private() -> Self { + let (tx, rx) = async_channel::bounded(16); + let cmd = ConnCommand { + id: Self::make_id(), + kind: ConnCommandKind::StatusPrivate(StatusPrivate { tx }), + }; + cmd + } + fn make_id() -> usize { static ID: AtomicUsize = AtomicUsize::new(0); ID.fetch_add(1, atomic::Ordering::AcqRel) @@ -1639,6 +1666,7 @@ impl CaConn { } } } + ConnCommandKind::StatusPrivate(cmd) => todo!(), } } Ready(None) => { @@ -1703,7 +1731,13 @@ impl CaConn { ch.conf.min_quiets(), ch.conf.is_polled(), ch.conf.replication(), - &|| CaWriterValueState::new(st.series_status, chinfo.series.to_series()), + &|| { + CaWriterValueState::new( + st.series_status, + chinfo.series.to_series(), + channel_status_retention_time(), + ) + }, )?; self.handle_writer_establish_inner(cid, writer)?; have_progress = true; @@ -2140,18 +2174,8 @@ impl CaConn { // return Err(Error::with_msg_no_trace()); return Ok(()); }; - if false && dbg_chn { - trace!("handle_event_add_res {:?} {:?}", cid, ev); - } match ch_s { ChannelState::Writable(st) => { - if false && dbg_chn { - trace!("handle_event_add_res Writable {:?} {:?}", cid, ev); - } - // debug!( - // "CaConn sees data_count {} payload_len {}", - // ev.data_count, ev.payload_len - // ); let stnow = self.tmp_ts_poll; let crst = &mut st.channel; let stwin_ts = stnow.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() / 4; @@ -2205,11 +2229,6 @@ impl CaConn { monitoring_event_last: Some(ev.clone()), last_comparisons: VecDeque::new(), }); - let binwriter = if self.opts.binwriter_enable { - Some(&mut st.binwriter) - } else { - None - }; let mut robj = EventAddIngestRefobj::from_writable_state( &self.opts, &mut self.iqdqs, @@ -2233,11 +2252,6 @@ impl CaConn { } } st2.monitoring_event_last = Some(ev.clone()); - let binwriter = if self.opts.binwriter_enable { - Some(&mut st.binwriter) - } else { - None - }; let mut robj = EventAddIngestRefobj::from_writable_state( &self.opts, &mut self.iqdqs, @@ -2309,7 +2323,6 @@ impl CaConn { // return Err(Error::with_msg_no_trace()); return Ok(()); }; - // debug!("handle_event_add_res {ev:?}"); match ch_s { ChannelState::Writable(st) => match &mut st.reading { ReadingState::StopMonitoringForPolling(..) => { @@ -2412,11 +2425,6 @@ impl CaConn { trace!("make next poll idle at {:?} tsnow {:?}", next, tsnow); } st2.tick = PollTickState::Idle(PollTickStateIdle { next }); - let binwriter = if self.opts.binwriter_enable { - Some(&mut st.binwriter) - } else { - None - }; let mut robj = EventAddIngestRefobj::from_writable_state( &self.opts, &mut self.iqdqs, @@ -2463,6 +2471,7 @@ impl CaConn { ts_silence_read_next: tsnow + Self::silence_read_next_ivl_rng(&mut self.rng), }); if read_expected { + self.mett.monitoring_read_expected().inc(); let item = ChannelStatusItem { ts: self.tmp_ts_poll, cssid: st.channel.cssid.clone(), @@ -2471,6 +2480,7 @@ impl CaConn { ch_wrst .emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?; } else { + self.mett.monitoring_read_unexpected().inc(); let item = ChannelStatusItem { ts: self.tmp_ts_poll, cssid: st.channel.cssid.clone(), @@ -2479,26 +2489,50 @@ impl CaConn { ch_wrst .emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?; } - let iqdqs = &mut self.iqdqs; - let mett = &mut self.mett; // NOTE we do not update the last value in this ev handler. { if let Some(lst) = st2.monitoring_event_last.as_ref() { // TODO compare with last monitoring value - if ev.value.data == lst.value.data { - if ev.value.meta == lst.value.meta { - st2.last_comparisons - .push_back((UtcDateTime::now(), MonitorReadCmp::Equal)); - } else { - st2.last_comparisons - .push_back((UtcDateTime::now(), MonitorReadCmp::DiffTime)); - } + if ev.value.meta == lst.value.meta { + st2.last_comparisons + .push_back((UtcDateTime::now(), MonitorReadCmp::Equal)); } else { + self.mett.monitoring_read_diff_time().inc(); + st2.last_comparisons + .push_back((UtcDateTime::now(), MonitorReadCmp::DiffTime)); + { + let item = ChannelStatusItem { + ts: self.tmp_ts_poll, + cssid: st.channel.cssid.clone(), + status: ChannelStatus::MonitoringReadDiffTime, + }; + ch_wrst.emit_channel_status_item( + item, + Self::channel_status_qu(&mut self.iqdqs), + )?; + } + } + if ev.value.data == lst.value.data { + st2.last_comparisons + .push_back((UtcDateTime::now(), MonitorReadCmp::Equal)); + } else { + self.mett.monitoring_read_diff_value().inc(); st2.last_comparisons .push_back((UtcDateTime::now(), MonitorReadCmp::DiffValue)); + { + let item = ChannelStatusItem { + ts: self.tmp_ts_poll, + cssid: st.channel.cssid.clone(), + status: ChannelStatus::MonitoringReadDiffValue, + }; + ch_wrst.emit_channel_status_item( + item, + Self::channel_status_qu(&mut self.iqdqs), + )?; + } } } - while st2.last_comparisons.len() > 6 { + while st2.last_comparisons.len() > 12 { st2.last_comparisons.pop_front(); } } @@ -3994,19 +4028,15 @@ struct CaWriterValueState { series_status: SeriesId, last_accepted_ts: TsNano, last_accepted_val: Option, - msp_split_status: MspSplit, - msp_split_data: MspSplit, } impl CaWriterValueState { - fn new(series_status: SeriesId, series_data: SeriesId) -> Self { + fn new(series_status: SeriesId, series_data: SeriesId, rt: RetentionTime) -> Self { Self { series_data, series_status, last_accepted_ts: TsNano::from_ns(0), last_accepted_val: None, - msp_split_status: MspSplit::new(1024 * 64, 1024 * 1024 * 10), - msp_split_data: MspSplit::new(1024 * 64, 1024 * 1024 * 10), } } } diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index ee075b7..2ac02eb 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -56,6 +56,7 @@ use netpod::OnDrop; use netpod::TsNano; use scywr::insertqueues::InsertQueuesTx; use series::SeriesId; +use serieswriter::msptool::fixgrid::MspSplitFixGrid; use stats::mett::CaConnSetMetrics; use std::sync::Arc; use std::task::Context; @@ -198,17 +199,34 @@ pub struct ChannelStatusesRequest { pub tx: Sender, } +impl fmt::Debug for ChannelStatusesRequest { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("ChannelStatusesRequest").finish() + } +} + #[derive(Debug, Serialize)] pub struct ChannelStatusesResponse { pub channels_ca_conn_set: BTreeMap, } -impl fmt::Debug for ChannelStatusesRequest { +pub struct ChannelStatusesPrivateRequest { + pub name: String, + pub limit: u64, + pub tx: Sender, +} + +impl fmt::Debug for ChannelStatusesPrivateRequest { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("ChannelStatusesRequest").finish() + fmt.debug_struct("ChannelStatusesPrivateRequest").finish() } } +#[derive(Debug, Serialize)] +pub struct ChannelStatusesPrivateResponse { + pub channels_ca_conn_set: BTreeMap, +} + #[derive(Debug)] pub struct ChannelCommand { pub channel: String, @@ -223,6 +241,7 @@ pub enum ConnSetCmd { ChannelRemove(ChannelRemove), Shutdown, ChannelStatuses(ChannelStatusesRequest), + ChannelStatusesPrivate(ChannelStatusesPrivateRequest), // TODO rename to ConnCommand because it must be handled by some specific Conn ChannelCommand(ChannelCommand), } @@ -532,6 +551,7 @@ impl CaConnSet { ConnSetCmd::ChannelRemove(x) => self.handle_remove_channel(x), ConnSetCmd::Shutdown => self.handle_shutdown(), ConnSetCmd::ChannelStatuses(x) => self.handle_channel_statuses_req(x), + ConnSetCmd::ChannelStatusesPrivate(x) => self.handle_channel_statuses_private_req(x), ConnSetCmd::ChannelCommand(x) => self.handle_channel_command(x), }, } @@ -757,20 +777,18 @@ impl CaConnSet { self.cssid_latency_max = dt + Duration::from_millis(2000); debug!("slow cssid fetch dt {:.0} ms {:?}", 1e3 * dt.as_secs_f32(), cmd); } - let mut writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id()))?; - let mut writer_status_state = serieswriter::fixgridwriter::ChannelStatusWriteState::new( + let mut writer_status = serieswriter::writer::SeriesWriter::new( SeriesId::new(cmd.cssid.id()), - serieswriter::fixgridwriter::CHANNEL_STATUS_GRID, - ); + MspSplitFixGrid::for_channel_status(), + )?; { let status = netpod::channelstatus::ChannelStatus::HaveStatusId; let stnow = SystemTime::now(); let ts = TsNano::from_system_time(stnow); let item = serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, status.to_u64()); - let state = &mut writer_status_state; let ts_net = Instant::now(); let deque = &mut self.storage_insert_lt_qu_l1; - writer_status.write(item, state, ts_net, ts, deque)?; + writer_status.write(item, &mut (), ts_net, ts, deque)?; } *chst2 = ActiveChannelState::WithStatusSeriesId(WithStatusSeriesIdState { cssid: cmd.cssid, @@ -779,7 +797,6 @@ impl CaConnSet { since: SystemTime::now(), }, writer_status: Some(writer_status), - writer_status_state: Some(writer_status_state), }); let qu = IocAddrQuery::cached(name.into()); self.find_ioc_query_queue.push_back(qu); @@ -822,35 +839,36 @@ impl CaConnSet { trace!("handle_add_channel_with_addr INNER {:?}", cmd); self.mett.handle_add_channel_with_addr().inc(); let tsnow = SystemTime::now(); - let mut writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id()))?; - let mut writer_status_state = serieswriter::fixgridwriter::ChannelStatusWriteState::new( - SeriesId::new(cmd.cssid.id()), - serieswriter::fixgridwriter::CHANNEL_STATUS_GRID, - ); { let status = netpod::channelstatus::ChannelStatus::HaveAddress; let stnow = SystemTime::now(); let ts = TsNano::from_system_time(stnow); let item = serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, status.to_u64()); - let state = &mut writer_status_state; let ts_net = Instant::now(); let deque = &mut self.storage_insert_lt_qu_l1; - writer_status.write(item, state, ts_net, ts, deque)?; + st3.writer_status + .as_mut() + .unwrap() + .write(item, &mut (), ts_net, ts, deque)?; } - *st3 = WithStatusSeriesIdState { - cssid: cmd.cssid.clone(), - addr_find_backoff: 0, - inner: WithStatusSeriesIdStateInner::WithAddress { - addr: addr_v4, - state: WithAddressState::Assigned(ConnectionState { - updated: tsnow, - health_update_count: 0, - value: ConnectionStateValue::Unknown, - }), - }, - writer_status: Some(writer_status), - writer_status_state: Some(writer_status_state), + st3.cssid = cmd.cssid.clone(); + st3.addr_find_backoff = 0; + st3.inner = WithStatusSeriesIdStateInner::WithAddress { + addr: addr_v4, + state: WithAddressState::Assigned(ConnectionState { + updated: tsnow, + health_update_count: 0, + value: ConnectionStateValue::Unknown, + }), }; + if false { + let _ = WithStatusSeriesIdState { + cssid: todo!(), + addr_find_backoff: todo!(), + inner: todo!(), + writer_status: todo!(), + }; + } let addr = cmd.addr; if self.ca_conn_ress.contains_key(&addr) { trace!("ca_conn_ress has already {:?}", addr_v4); @@ -1004,6 +1022,18 @@ impl CaConnSet { Ok(()) } + fn handle_channel_statuses_private_req(&mut self, req: ChannelStatusesPrivateRequest) -> Result<(), Error> { + if self.shutdown_stopping { + return Ok(()); + } + for (addr, ca_conn) in self.ca_conn_ress.iter() { + // let item = ConnCommand::status_private(); + // ca_conn.cmd_queue.push_back(item); + } + let reg1 = regex::Regex::new(&req.name)?; + Ok(()) + } + fn handle_channel_command(&mut self, cmd: ChannelCommand) -> Result<(), Error> { if self.shutdown_stopping { return Ok(()); @@ -1480,7 +1510,7 @@ impl CaConnSet { let deque = &mut lt_qu_2; st3.writer_status.as_mut().unwrap().write( serieswriter::fixgridwriter::ChannelStatusWriteValue::new(tsev, val), - st3.writer_status_state.as_mut().unwrap(), + &mut (), tsnow, tsev, deque, diff --git a/netfetch/src/ca/statemap.rs b/netfetch/src/ca/statemap.rs index 264c5c1..e2d6cc5 100644 --- a/netfetch/src/ca/statemap.rs +++ b/netfetch/src/ca/statemap.rs @@ -6,7 +6,6 @@ use serde::Serialize; use serde_helper::serde_instant::serde_Instant_elapsed_ms; use series::ChannelStatusSeriesId; use serieswriter::fixgridwriter::ChannelStatusSeriesWriter; -use serieswriter::fixgridwriter::ChannelStatusWriteState; use std::collections::BTreeMap; use std::collections::HashMap; use std::collections::btree_map::RangeMut; @@ -124,12 +123,8 @@ pub struct WithStatusSeriesIdState { pub inner: WithStatusSeriesIdStateInner, #[serde(serialize_with = "serde_ser_channel_status_writer")] pub writer_status: Option, - #[serde(skip)] - pub writer_status_state: Option, } -// Need Clone because we use the state tree for metrics output -// TODO use a new info struct impl Clone for WithStatusSeriesIdState { fn clone(&self) -> Self { Self { @@ -137,7 +132,6 @@ impl Clone for WithStatusSeriesIdState { addr_find_backoff: self.addr_find_backoff.clone(), inner: self.inner.clone(), writer_status: None, - writer_status_state: None, } } } diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index a05bc8e..f51a563 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -1,6 +1,6 @@ use err::Error; use netpod::Database; -use netpod::log::*; +use netpod::log; use regex::Regex; use scywr::config::ScyllaIngestConfig; use serde::Deserialize; @@ -171,7 +171,7 @@ impl CaIngestOpts { let has_default_hosts = self.scylla.is_some(); for c in confs.iter() { if c.hosts.is_none() && !has_default_hosts { - warn!("scylla config is missing hosts"); + log::warn!("scylla config is missing hosts"); return false; } } @@ -334,7 +334,7 @@ async fn parse_channel_config_txt(fname: &Path) -> Result conf.channels.push(item); } } - info!("Parsed {} channels", conf.channels.len()); + log::info!("Parsed {} channels", conf.channels.len()); Ok(conf) } @@ -392,10 +392,10 @@ async fn parse_config_dir(dir: &Path) -> Result { let buf = tokio::fs::read(e.path()).await?; let conf: BTreeMap = serde_yaml::from_slice(&buf).map_err(Error::from_string)?; - info!("parsed {} channels from {}", conf.len(), fns); + log::info!("parsed {} channels from {}", conf.len(), fns); ret.push_from_parsed(&conf, basename); } else { - debug!("ignore channel config file {:?}", e.path()); + log::debug!("ignore channel config file {:?}", e.path()); } } Ok(ret) diff --git a/netfetch/src/metrics/ingest.rs b/netfetch/src/metrics/ingest.rs index 8af66ac..bb22c69 100644 --- a/netfetch/src/metrics/ingest.rs +++ b/netfetch/src/metrics/ingest.rs @@ -29,7 +29,7 @@ use scywr::iteminsertqueue::ScalarValue; use serde::Deserialize; use serde::Serialize; use series::SeriesId; -use serieswriter::msptool::MspSplit; +use serieswriter::msptool::dyngrid::MspSplitDyn; use serieswriter::writer::EmittableType; use serieswriter::writer::SeriesWriter; use std::collections::HashMap; @@ -48,20 +48,16 @@ macro_rules! trace_input { ($($arg:tt)*) => { if true { log::trace!($($arg)*); } macro_rules! trace_queues { ($($arg:tt)*) => { if true { log::trace!($($arg)*); } }; } -type ValueSeriesWriter = SeriesWriter; +type ValueSeriesWriter = SeriesWriter; #[derive(Debug, Serialize)] struct WritableTypeState { series: SeriesId, - msp_split_data: MspSplit, } impl WritableTypeState { - fn new(series: SeriesId) -> Self { - Self { - series, - msp_split_data: MspSplit::new(10000, 1024 * 256), - } + fn new(series: SeriesId, rt: RetentionTime) -> Self { + Self { series } } } @@ -91,23 +87,6 @@ impl EmittableType for WritableType { ) -> serieswriter::writer::EmitRes { let bytes = ByteSize(self.byte_size()); let data_item = self.1; - // let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_data.split(self.0.clone(), self.byte_size()); - // let item = QueryItem::Insert(scywr::iteminsertqueue::InsertItem { - // series: state.series.clone(), - // ts_msp: ts_msp.to_ts_ms(), - // ts_lsp, - // val: self.1.clone(), - // ts_net, - // }); - // let mut items = smallvec::SmallVec::new(); - // items.push(item); - // if ts_msp_chg { - // items.push(QueryItem::Msp(scywr::iteminsertqueue::MspItem::new( - // state.series.clone(), - // ts_msp.to_ts_ms(), - // ts_net, - // ))); - // } serieswriter::writer::EmitRes { data_item, bytes } } } @@ -191,7 +170,8 @@ async fn post_v01_try( }; rres.worker_tx.send(qu).await.unwrap(); let chinfo = rx.recv().await.unwrap().unwrap(); - let mut writer = SeriesWriter::new(chinfo.series.to_series())?; + let msp_split = MspSplitDyn::new(1024 * 64, 1024 * 1024 * 10, rt.clone()); + let mut writer = SeriesWriter::new(chinfo.series.to_series(), msp_split)?; debug_setup!("series writer established"); let mut iqdqs = InsertDeques::new(); let mut iqtx = rres.iqtx.clone(); @@ -342,7 +322,7 @@ where let stnow = SystemTime::now(); let tsev = TsNano::from_system_time(stnow); let tsnow = Instant::now(); - let mut emit_state = WritableTypeState::new(writer.sid()); + let mut emit_state = WritableTypeState::new(writer.sid(), writer.rt()); for (i, (ts, val)) in evs.iter_zip().enumerate() { let val = val.clone(); trace_input!("ev {:6} {:20} {:20?}", i, ts, val); @@ -367,7 +347,7 @@ fn evpush_dim0_enum( let stnow = SystemTime::now(); let tsev = TsNano::from_system_time(stnow); let tsnow = Instant::now(); - let mut emit_state = WritableTypeState::new(writer.sid()); + let mut emit_state = WritableTypeState::new(writer.sid(), writer.rt()); for (i, (ts, val)) in evs.iter_zip().enumerate() { let val = val.clone(); trace_input!("ev {:6} {:20} {:20?}", i, ts, val); @@ -397,7 +377,7 @@ where let stnow = SystemTime::now(); let tsev = TsNano::from_system_time(stnow); let tsnow = Instant::now(); - let mut emit_state = WritableTypeState::new(writer.sid()); + let mut emit_state = WritableTypeState::new(writer.sid(), writer.rt()); for (i, (ts, val)) in evs.iter_zip().enumerate() { let val = val.clone(); trace_input!("ev {:6} {:20} {:20?}", i, ts, val); diff --git a/netfetch/src/metrics/ingest/write_v02.rs b/netfetch/src/metrics/ingest/write_v02.rs index 2829734..ad579f6 100644 --- a/netfetch/src/metrics/ingest/write_v02.rs +++ b/netfetch/src/metrics/ingest/write_v02.rs @@ -32,7 +32,7 @@ use series::SeriesId; use serieswriter::binwriter::BinWriter; use serieswriter::binwriter::DiscardFirstOutput; use serieswriter::binwriter::WriteCntZero; -use serieswriter::msptool::MspSplit; +use serieswriter::msptool::dyngrid::MspSplitDyn; use serieswriter::rtwriter::MinQuiets; use serieswriter::writer::EmittableType; use serieswriter::writer::SeriesWriter; @@ -81,20 +81,16 @@ autoerr::create_error_v1!( }, ); -type ValueSeriesWriter = SeriesWriter; +type ValueSeriesWriter = SeriesWriter; #[derive(Debug, Serialize)] struct WritableTypeState { series: SeriesId, - msp_split_data: MspSplit, } impl WritableTypeState { - fn new(series: SeriesId) -> Self { - Self { - series, - msp_split_data: MspSplit::new(10000, 1024 * 256), - } + fn new(series: SeriesId, rt: RetentionTime) -> Self { + Self { series } } } @@ -124,21 +120,6 @@ impl EmittableType for WritableType { ) -> serieswriter::writer::EmitRes { let bytes = ByteSize(self.byte_size()); let data_item = self.1; - // let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_data.split(self.0.clone(), self.byte_size()); - // let item = QueryItem::Insert(scywr::iteminsertqueue::InsertItem { - // series: state.series.clone(), - // ts_msp: ts_msp.to_ts_ms(), - // ts_lsp, - // val: self.1.clone(), - // ts_net, - // }); - // if ts_msp_chg { - // items.push(QueryItem::Msp(scywr::iteminsertqueue::MspItem::new( - // state.series.clone(), - // ts_msp.to_ts_ms(), - // ts_net, - // ))); - // } serieswriter::writer::EmitRes { data_item, bytes } } } @@ -168,7 +149,7 @@ where let stnow = SystemTime::now(); let tsev = TsNano::from_system_time(stnow); let tsnow = Instant::now(); - let mut emit_state = WritableTypeState::new(params.writer.sid()); + let mut emit_state = WritableTypeState::new(params.writer.sid(), params.rt.clone()); if evs.len() != 0 { if params.binwriter.is_none() { for (i, (ts, val)) in evs.iter_zip().enumerate() { @@ -226,7 +207,7 @@ fn evpush_dim0_enum(mut params: EvPushParams) -> Result<(), Error> { let stnow = SystemTime::now(); let tsev = TsNano::from_system_time(stnow); let tsnow = Instant::now(); - let mut emit_state = WritableTypeState::new(params.writer.sid()); + let mut emit_state = WritableTypeState::new(params.writer.sid(), params.rt.clone()); let deque = params.iqdqs.deque(params.rt.clone()); for (i, (ts, val)) in evs.iter_zip().enumerate() { let val = val.clone(); @@ -254,7 +235,7 @@ where let stnow = SystemTime::now(); let tsev = TsNano::from_system_time(stnow); let tsnow = Instant::now(); - let mut emit_state = WritableTypeState::new(params.writer.sid()); + let mut emit_state = WritableTypeState::new(params.writer.sid(), params.rt.clone()); let deque = params.iqdqs.deque(params.rt.clone()); for (i, (ts, val)) in evs.iter_zip().enumerate() { let val = val.clone(); @@ -273,7 +254,7 @@ fn frame_write( rt: RetentionTime, scalar_type: ScalarType, shape: Shape, - writer: &mut SeriesWriter, + writer: &mut SeriesWriter, binwriter: &mut Option, iqdqs: &mut InsertDeques, ) -> Result<(), Error> { @@ -421,7 +402,8 @@ async fn write_with_fresh_msps_inner( .await .map_err(|_| Error::ConfigLookup)? .map_err(|_| Error::ConfigLookup)?; - let mut writer = SeriesWriter::new(chinfo.series.to_series())?; + let msp_split = MspSplitDyn::new(1024 * 64, 1024 * 1024 * 10, rt.clone()); + let mut writer = SeriesWriter::new(chinfo.series.to_series(), msp_split)?; let mut binwriter = None; debug_setup!("series writer established"); let mut iqdqs = InsertDeques::new(); @@ -616,7 +598,8 @@ async fn write_events_exact_2( ) -> Result, Error> { debug_setup!("write_events_exact {:?} {:?}", conf, rt); let series = SeriesId::new(conf.series); - let mut writer = SeriesWriter::new(series)?; + let msp_split = MspSplitDyn::new(1024 * 64, 1024 * 1024 * 10, rt.clone()); + let mut writer = SeriesWriter::new(series, msp_split)?; let mut binwriter = None; debug_setup!("series writer established"); let mut iqdqs = InsertDeques::new(); diff --git a/netfetch/src/metrics/status.rs b/netfetch/src/metrics/status.rs index 747b508..c6439b1 100644 --- a/netfetch/src/metrics/status.rs +++ b/netfetch/src/metrics/status.rs @@ -22,8 +22,6 @@ autoerr::create_error_v1!( #[derive(Debug, Serialize)] pub struct ChannelStates { running_since: DateTime, - // #[serde(with = "humantime_serde")] - // running_since_2: SystemTime, channels: BTreeMap, } @@ -290,3 +288,56 @@ async fn channel_states_try( } Ok(axum::Json(states)) } + +pub async fn channel_states_private( + params: HashMap, + tx: Sender, +) -> Result, axum::Json> { + match channel_states_private_try(params, tx).await { + Ok(x) => Ok(x), + Err(e) => Err(axum::Json(e.to_string())), + } +} + +async fn channel_states_private_try( + params: HashMap, + tx: Sender, +) -> Result, Error> { + let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string(); + let limit = params.get("limit").and_then(|x| x.parse().ok()).unwrap_or(40); + let (tx2, rx2) = async_channel::bounded(1); + let req = ChannelStatusesRequest { name, limit, tx: tx2 }; + let item = CaConnSetEvent::ConnSetCmd(ConnSetCmd::ChannelStatuses(req)); + // TODO handle error + tx.send(item).await.unwrap(); + let res = rx2.recv().await.unwrap(); + Ok(axum::Json(serde_json::Value::Null)) +} + +#[derive(Debug, Serialize)] +pub struct ChannelStatesPrivate { + running_since: DateTime, + channels: BTreeMap, +} + +#[derive(Debug, Serialize)] +struct ChannelStatePrivate { + ioc_address: Option, + // connection: ConnectionState, + // archiving_configuration: ChannelConfigForStatesApi, + // recv_count: u64, + // recv_bytes: u64, + // #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")] + // recv_last: SystemTime, + // #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")] + // write_st_last: SystemTime, + // #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")] + // write_mt_last: SystemTime, + // #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")] + // write_lt_last: SystemTime, + // #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")] + // updated: SystemTime, + // #[serde(with = "humantime_serde")] + // pong_last: Option, + // private: StatePrivate, +} diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 5972cf8..078ae9d 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -218,7 +218,7 @@ async fn worker_streamed( let tsnow = Instant::now(); let mut mett = stats::mett::ScyllaInsertWorker::new(); let mut mett_emit_last = tsnow; - let metrics_ivl = Duration::from_millis(1000); + let metrics_ivl = Duration::from_millis(500); insert_worker_opts .insert_workers_running .fetch_add(1, atomic::Ordering::AcqRel); diff --git a/serieswriter/src/fixgridwriter.rs b/serieswriter/src/fixgridwriter.rs index 7fa6caf..fb208eb 100644 --- a/serieswriter/src/fixgridwriter.rs +++ b/serieswriter/src/fixgridwriter.rs @@ -6,16 +6,12 @@ use netpod::ByteSize; use netpod::DtMs; use netpod::TsNano; use scywr::iteminsertqueue::DataValue; -use scywr::iteminsertqueue::MspItem; -use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::ScalarValue; -use serde::Serialize; -use series::SeriesId; use std::time::Instant; pub const CHANNEL_STATUS_GRID: DtMs = DtMs::from_ms_u64(1000 * 60 * 60); -pub type ChannelStatusSeriesWriter = SeriesWriter; +pub type ChannelStatusSeriesWriter = SeriesWriter; #[derive(Debug, Clone)] pub struct ChannelStatusWriteValue(TsNano, u64); @@ -27,7 +23,7 @@ impl ChannelStatusWriteValue { } impl EmittableType for ChannelStatusWriteValue { - type State = ChannelStatusWriteState; + type State = (); fn ts(&self) -> TsNano { self.0 @@ -50,27 +46,6 @@ impl EmittableType for ChannelStatusWriteValue { ) -> serieswriter::writer::EmitRes { let byte_size = self.byte_size(); let data_item = DataValue::Scalar(ScalarValue::U64(self.1)); - // let ts = tsev; - // state.last_accepted_ts = ts; - // state.last_accepted_val = Some(self.1); - { - // let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split.split(ts, self.byte_size()); - // if ts_msp_chg { - // items.push(QueryItem::Msp(MspItem::new( - // state.series.clone(), - // ts_msp.to_ts_ms(), - // ts_net, - // ))); - // } - // let item = scywr::iteminsertqueue::InsertItem { - // series: state.series.clone(), - // ts_msp: ts_msp.to_ts_ms(), - // ts_lsp, - // ts_net, - // val: DataValue::Scalar(ScalarValue::U64(self.1)), - // }; - // items.push(QueryItem::Insert(item)); - } let ret = serieswriter::writer::EmitRes { data_item, bytes: ByteSize(byte_size), @@ -78,22 +53,3 @@ impl EmittableType for ChannelStatusWriteValue { ret } } - -#[derive(Debug, Serialize)] -pub struct ChannelStatusWriteState { - series: SeriesId, - msp_split: MspSplitFixGrid, - last_accepted_ts: TsNano, - last_accepted_val: Option, -} - -impl ChannelStatusWriteState { - pub fn new(series: SeriesId, grid_dt: DtMs) -> Self { - Self { - series, - msp_split: MspSplitFixGrid::new(grid_dt), - last_accepted_ts: TsNano::from_ns(0), - last_accepted_val: None, - } - } -} diff --git a/serieswriter/src/msptool.rs b/serieswriter/src/msptool.rs index 397c4bf..601a3cd 100644 --- a/serieswriter/src/msptool.rs +++ b/serieswriter/src/msptool.rs @@ -1,67 +1,13 @@ +pub mod dyngrid; pub mod fixgrid; use netpod::DtNano; use netpod::TsNano; -use netpod::timeunits::DAY; -use netpod::timeunits::SEC; -use serde::Serialize; +use netpod::ttl::RetentionTime; -#[derive(Debug, Serialize)] -pub struct MspSplit { - last: Option, - count: u32, - bytes: u32, - count_max: u32, - bytes_max: u32, -} - -impl MspSplit { - pub fn new(count_max: u32, bytes_max: u32) -> Self { - Self { - last: None, - count: 0, - bytes: 0, - count_max, - bytes_max, - } - } - - pub fn split(&mut self, ts: TsNano, item_bytes: u32) -> (TsNano, DtNano, bool, Option) { - // Maximum resolution of the ts msp: - let msp_res_max = SEC * 2; - let ts_inp = ts; - let (ts_msp, changed, ts_msp_retired) = match self.last { - Some(ts_msp_last) => { - if self.count >= self.count_max || self.bytes >= self.bytes_max || ts_msp_last.add_ns(DAY) <= ts_inp { - let ts_msp = ts_inp.div(msp_res_max).mul(msp_res_max); - if ts_msp == ts_msp_last { - // TODO should count these for metrics - (ts_msp, false, None) - } else { - self.last = Some(ts_msp); - self.count = 1; - self.bytes = item_bytes; - (ts_msp, true, Some(ts_msp_last)) - } - } else { - self.count += 1; - self.bytes += item_bytes; - (ts_msp_last, false, None) - } - } - None => { - let ts_msp = ts_inp.div(msp_res_max).mul(msp_res_max); - self.last = Some(ts_msp); - self.count = 1; - self.bytes = item_bytes; - (ts_msp, true, None) - } - }; - let ts_lsp = ts_inp.delta(ts_msp); - (ts_msp, ts_lsp, changed, ts_msp_retired) - } - - pub fn ts_msp_current(&self) -> Option { - self.last - } +pub trait MspSplit { + // fn new() -> Self; + fn split(&mut self, ts: TsNano, item_bytes: u32) -> (TsNano, DtNano, bool, Option); + fn ts_msp_current(&self) -> Option; + fn rt(&self) -> RetentionTime; } diff --git a/serieswriter/src/msptool/dyngrid.rs b/serieswriter/src/msptool/dyngrid.rs new file mode 100644 index 0000000..7d319fb --- /dev/null +++ b/serieswriter/src/msptool/dyngrid.rs @@ -0,0 +1,78 @@ +use crate::msptool::MspSplit; +use netpod::DtNano; +use netpod::TsNano; +use netpod::timeunits::SEC; +use netpod::ttl::RetentionTime; +use serde::Serialize; + +#[derive(Debug, Serialize)] +pub struct MspSplitDyn { + last: Option, + count: u32, + bytes: u32, + count_max: u32, + bytes_max: u32, + rt: RetentionTime, + rollover_at: TsNano, +} + +impl MspSplitDyn { + pub fn new(count_max: u32, bytes_max: u32, rt: RetentionTime) -> Self { + Self { + last: None, + count: 0, + bytes: 0, + count_max, + bytes_max, + rt, + rollover_at: TsNano::from_ns(0), + } + } +} + +impl MspSplit for MspSplitDyn { + fn split(&mut self, ts: TsNano, item_bytes: u32) -> (TsNano, DtNano, bool, Option) { + // Maximum resolution of the ts msp: + let msp_res_max = SEC * 2; + let ts_inp = ts; + let (ts_msp, changed, ts_msp_retired) = match self.last { + Some(ts_msp_last) => { + if self.count >= self.count_max || self.bytes >= self.bytes_max || self.rollover_at <= ts_inp { + let ts_msp = ts_inp.div(msp_res_max).mul(msp_res_max); + if ts_msp == ts_msp_last { + // TODO should count these for metrics + (ts_msp, false, None) + } else { + self.last = Some(ts_msp); + self.count = 1; + self.bytes = item_bytes; + self.rollover_at = TsNano::from_ns(ts_inp.ns() + SEC * self.rt.ttl_ts_msp().as_secs() / 20); + (ts_msp, true, Some(ts_msp_last)) + } + } else { + self.count += 1; + self.bytes += item_bytes; + (ts_msp_last, false, None) + } + } + None => { + let ts_msp = ts_inp.div(msp_res_max).mul(msp_res_max); + self.last = Some(ts_msp); + self.count = 1; + self.bytes = item_bytes; + self.rollover_at = TsNano::from_ns(ts_inp.ns() + SEC * self.rt.ttl_ts_msp().as_secs() / 20); + (ts_msp, true, None) + } + }; + let ts_lsp = ts_inp.delta(ts_msp); + (ts_msp, ts_lsp, changed, ts_msp_retired) + } + + fn ts_msp_current(&self) -> Option { + self.last + } + + fn rt(&self) -> RetentionTime { + self.rt.clone() + } +} diff --git a/serieswriter/src/msptool/fixgrid.rs b/serieswriter/src/msptool/fixgrid.rs index 2400a7f..fd50d18 100644 --- a/serieswriter/src/msptool/fixgrid.rs +++ b/serieswriter/src/msptool/fixgrid.rs @@ -1,13 +1,14 @@ +use crate::fixgridwriter::CHANNEL_STATUS_GRID; +use crate::msptool::MspSplit; use netpod::DtMs; use netpod::DtNano; -use netpod::TsMs; use netpod::TsNano; use serde::Serialize; #[derive(Debug, Serialize)] pub struct MspSplitFixGrid { grid_dt: DtMs, - last: Option, + last: Option, } impl MspSplitFixGrid { @@ -15,16 +16,34 @@ impl MspSplitFixGrid { Self { grid_dt, last: None } } - pub fn split(&mut self, ts: TsNano, _item_bytes: u32) -> (TsNano, DtNano, bool) { + pub fn for_channel_status() -> Self { + Self::new(CHANNEL_STATUS_GRID) + } +} + +impl MspSplit for MspSplitFixGrid { + fn split(&mut self, ts: TsNano, _item_bytes: u32) -> (TsNano, DtNano, bool, Option) { let (msp, _) = ts.to_ts_ms().to_grid_02(self.grid_dt); - let changed = if self.last != Some(msp) { - self.last = Some(msp); - true + let (changed, ts_msp_retired) = if let Some(ts_msp_last) = self.last { + if msp.ns() != ts_msp_last { + self.last = Some(msp.ns()); + (true, Some(ts_msp_last)) + } else { + (false, None) + } } else { - false + (true, None) }; let msp = msp.ns(); let lsp = ts.delta(msp); - (msp, lsp, changed) + (msp, lsp, changed, ts_msp_retired) + } + + fn ts_msp_current(&self) -> Option { + todo!() + } + + fn rt(&self) -> netpod::ttl::RetentionTime { + todo!() } } diff --git a/serieswriter/src/ratelimitwriter.rs b/serieswriter/src/ratelimitwriter.rs index ef24789..997808f 100644 --- a/serieswriter/src/ratelimitwriter.rs +++ b/serieswriter/src/ratelimitwriter.rs @@ -1,3 +1,4 @@ +use crate::msptool::MspSplit; use crate::writer::EmittableType; use crate::writer::SeriesWriter; use core::fmt; @@ -8,7 +9,6 @@ use scywr::iteminsertqueue::QueryItem; use serde::Serialize; use series::SeriesId; use std::collections::VecDeque; -use std::marker::PhantomData; use std::time::Duration; use std::time::Instant; @@ -36,7 +36,7 @@ pub struct HousekeepingRes { } #[derive(Serialize)] -pub struct RateLimitWriter +pub struct RateLimitWriter where ET: EmittableType, { @@ -47,14 +47,14 @@ where last_insert_ts: TsNano, last_insert_val: Option, dbgname: String, - writer: SeriesWriter, + writer: SeriesWriter, do_trace_detail: bool, - _t1: PhantomData, } -impl RateLimitWriter +impl RateLimitWriter where ET: EmittableType, + SPL: MspSplit, { pub fn new( series: SeriesId, @@ -62,8 +62,9 @@ where is_polled: bool, emit_state: ::State, dbgname: String, + spl: SPL, ) -> Result { - let writer = SeriesWriter::new(series)?; + let writer = SeriesWriter::new(series, spl)?; let ret = Self { series, min_quiet, @@ -74,7 +75,6 @@ where dbgname, writer, do_trace_detail: series::dbg::dbg_series(series), - _t1: PhantomData, }; if ret.do_trace_detail { debug!("debug test for detail series"); @@ -182,7 +182,7 @@ where } } -impl fmt::Debug for RateLimitWriter +impl fmt::Debug for RateLimitWriter where ET: EmittableType, { diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index c43bd7e..51ae470 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -1,9 +1,11 @@ use crate::log; +use crate::msptool::dyngrid::MspSplitDyn; use crate::ratelimitwriter::RateLimitWriter; use crate::writer::EmittableType; use netpod::ScalarType; use netpod::Shape; use netpod::TsNano; +use netpod::ttl::RetentionTime; use scywr::insertqueues::InsertDeques; use scywr::iteminsertqueue::QueryItem; use serde::Serialize; @@ -59,11 +61,11 @@ impl MinQuiets { } #[derive(Debug, Serialize)] -struct State +struct State where ET: EmittableType, { - writer: RateLimitWriter, + writer: RateLimitWriter, } #[derive(Debug)] @@ -113,9 +115,9 @@ where series: SeriesId, scalar_type: ScalarType, shape: Shape, - state_st: State, - state_mt: State, - state_lt: State, + state_st: State, + state_mt: State, + state_lt: State, min_quiets: MinQuiets, do_trace_detail: bool, do_st_rf1: bool, @@ -139,15 +141,36 @@ where let dtd = series::dbg::dbg_series(series); debug_init!(dtd, "new {:?} is_polled {}", min_quiets, is_polled); let state_st = { - let writer = RateLimitWriter::new(series, min_quiets.st, is_polled, emit_state_new(), "st".into())?; + let writer = RateLimitWriter::new( + series, + min_quiets.st, + is_polled, + emit_state_new(), + "st".into(), + MspSplitDyn::new(1024 * 64, 1024 * 1024 * 10, RetentionTime::Short), + )?; State { writer } }; let state_mt = { - let writer = RateLimitWriter::new(series, min_quiets.mt, is_polled, emit_state_new(), "mt".into())?; + let writer = RateLimitWriter::new( + series, + min_quiets.mt, + is_polled, + emit_state_new(), + "mt".into(), + MspSplitDyn::new(1024 * 64, 1024 * 1024 * 10, RetentionTime::Medium), + )?; State { writer } }; let state_lt = { - let writer = RateLimitWriter::new(series, min_quiets.lt, is_polled, emit_state_new(), "lt".into())?; + let writer = RateLimitWriter::new( + series, + min_quiets.lt, + is_polled, + emit_state_new(), + "lt".into(), + MspSplitDyn::new(1024 * 64, 1024 * 1024 * 10, RetentionTime::Long), + )?; State { writer } }; let ret = Self { @@ -247,7 +270,7 @@ where } fn write_inner( - state: &mut State, + state: &mut State, item: ET, ts_net: Instant, tsev: TsNano, diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 4d18904..ddbc625 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -8,7 +8,9 @@ use std::fmt; use std::marker::PhantomData; use std::time::Instant; +use crate::msptool::MspSplit; use netpod::ByteSize; +use netpod::ttl::RetentionTime; use scywr::iteminsertqueue::MspItem; pub use smallvec::SmallVec; use std::time::Duration; @@ -72,9 +74,9 @@ pub struct OnCloseRes { } #[derive(Debug, Serialize)] -pub struct SeriesWriter { +pub struct SeriesWriter { series: SeriesId, - msp_split: crate::msptool::MspSplit, + msp_split: SPL, evts_on_msp_write: TsNano, evts_latest: TsNano, do_trace_detail: bool, @@ -85,14 +87,15 @@ pub struct SeriesWriter { _t1: PhantomData, } -impl SeriesWriter +impl SeriesWriter where ET: EmittableType, + SPL: MspSplit, { - pub fn new(series: SeriesId) -> Result { + pub fn new(series: SeriesId, spl: SPL) -> Result { let res = Self { series, - msp_split: crate::msptool::MspSplit::new(1024 * 64, 1024 * 1024 * 10), + msp_split: spl, evts_on_msp_write: TsNano::from_ns(0), evts_latest: TsNano::from_ns(0), do_trace_detail: series::dbg::dbg_series(series), @@ -107,6 +110,10 @@ where self.series.clone() } + pub fn rt(&self) -> RetentionTime { + self.msp_split.rt() + } + pub fn write( &mut self, item: ET, diff --git a/stats/mettdecl.rs b/stats/mettdecl.rs index 5cb55d8..0779153 100644 --- a/stats/mettdecl.rs +++ b/stats/mettdecl.rs @@ -58,6 +58,10 @@ mod Metrics { recv_read_notify_but_no_longer_ready, recv_read_notify_but_not_init_yet, recv_event_add_while_wait_on_read_notify, + monitoring_read_expected, + monitoring_read_unexpected, + monitoring_read_diff_time, + monitoring_read_diff_value, transition_to_polling, transition_to_polling_bad_state, transition_to_polling_already_in,