From 21655b4a6733907b4f2aef64b0afbadc1efffe78 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 18 Mar 2025 14:57:21 +0100 Subject: [PATCH] Check for repeated value already in RtWriter --- daqingest/src/bin/daqingest.rs | 5 +-- netfetch/Cargo.toml | 1 + netfetch/src/ca/conn.rs | 43 +++++++++++++++++++++++++ netfetch/src/ca/proto.rs | 4 +++ scywr/src/config.rs | 8 +---- scywr/src/schema.rs | 50 ++++++++++++++++++----------- serieswriter/src/binwriter.rs | 4 +-- serieswriter/src/ratelimitwriter.rs | 35 +++++++++----------- serieswriter/src/rtwriter.rs | 33 +++++++++++++++++-- 9 files changed, 130 insertions(+), 53 deletions(-) diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 2b6eef9..634ab8e 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -67,10 +67,7 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { pass: k.pg_pass, name: k.pg_name, }; - let scyconf = ScyllaIngestConfig::new([k.scylla_host], k.scylla_keyspace, 3); - // scywr::schema::migrate_scylla_data_schema(&scyconf, netpod::ttl::RetentionTime::Short) - // .await - // .map_err(Error::from_string)?; + let scyconf = ScyllaIngestConfig::new([k.scylla_host], k.scylla_keyspace); match k.sub { DbSub::Data(u) => { use daqingest::opts::DbDataSub; diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 777a7a0..549c329 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -27,6 +27,7 @@ axum = "0.8.1" http-body = "1" url = "2.5" chrono = "0.4" +time = { version = "0.3.40", features = ["serde"] } humantime = "2.1.0" humantime-serde = "1.1.1" pin-project = "1" diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index b949500..e58d1e5 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -79,6 +79,7 @@ use std::time::Duration; use std::time::Instant; use std::time::SystemTime; use taskrun::tokio; +use time::UtcDateTime; use tokio::net::TcpStream; const CONNECTING_TIMEOUT: Duration = Duration::from_millis(1000 * 6); @@ -188,6 +189,7 @@ pub struct ChannelStateInfo { pub write_mt_last: SystemTime, pub write_lt_last: SystemTime, pub status_emit_count: u64, + pub last_comparisons: Option>, } mod ser_instant { @@ -307,12 +309,21 @@ enum Monitoring2State { Passive(Monitoring2PassiveState), ReadPending(Ioid, Instant), } +#[derive(Debug, Clone, Serialize)] +enum MonitorReadCmp { + Equal, + DiffTime, + DiffTimeValue, + DiffValue, +} #[derive(Debug, Clone)] struct MonitoringState { tsbeg: Instant, subid: Subid, mon2state: Monitoring2State, + monitoring_event_last: Option, + last_comparisons: VecDeque<(time::UtcDateTime, MonitorReadCmp)>, } #[derive(Debug, Clone)] @@ -606,6 +617,13 @@ impl ChannelState { ChannelState::Writable(s) => s.channel.status_emit_count, _ => 0, }; + let last_comparisons = match self { + ChannelState::Writable(s) => match &s.reading { + ReadingState::Monitoring(st2) => Some(st2.last_comparisons.clone()), + _ => None, + }, + _ => None, + }; ChannelStateInfo { stnow, cssid, @@ -628,6 +646,7 @@ impl ChannelState { write_mt_last, write_lt_last, status_emit_count, + last_comparisons, } } @@ -1784,6 +1803,8 @@ impl CaConn { tsbeg: tsnow, ts_silence_read_next: tsnow + Self::silence_read_next_ivl_rng(&mut self.rng), }), + monitoring_event_last: Some(ev.clone()), + last_comparisons: VecDeque::new(), }); let crst = &mut st.channel; let writer = &mut st.writer; @@ -1821,6 +1842,7 @@ impl CaConn { let binwriter = &mut st.binwriter; let iqdqs = &mut self.iqdqs; let stats = self.stats.as_ref(); + st2.monitoring_event_last = Some(ev.clone()); Self::event_add_ingest( ev.payload_len, ev.value, @@ -2060,6 +2082,27 @@ impl CaConn { } let iqdqs = &mut self.iqdqs; let stats = self.stats.as_ref(); + // NOTE we do not update the last value in this ev handler. + { + if let Some(lst) = st2.monitoring_event_last.as_ref() { + // TODO compare with last monitoring value + if ev.value.data == lst.value.data { + if ev.value.meta == lst.value.meta { + st2.last_comparisons + .push_back((UtcDateTime::now(), MonitorReadCmp::Equal)); + } else { + st2.last_comparisons + .push_back((UtcDateTime::now(), MonitorReadCmp::DiffTime)); + } + } else { + st2.last_comparisons + .push_back((UtcDateTime::now(), MonitorReadCmp::DiffValue)); + } + } + while st2.last_comparisons.len() > 6 { + st2.last_comparisons.pop_front(); + } + } // TODO check ADEL to see if monitor should have fired. // But there is still a small chance that the monitor will just received slightly later. // More involved check would be to raise a flag, wait for the expected monitor for some diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 0d36c65..107a183 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -1055,6 +1055,10 @@ impl CaMsg { } // TODO make response type for host name: 0x15 => CaMsg::from_ty_ts(CaMsgTy::HostName("TODOx5288".into()), tsnow), + 0x1b => { + warn!("HANDLE_SERVER_CHANNEL_DISCONNECT"); + return Err(Error::CaCommandNotSupported(x)); + } x => return Err(Error::CaCommandNotSupported(x)), }; Ok(msg) diff --git a/scywr/src/config.rs b/scywr/src/config.rs index 84d1587..e8a91b7 100644 --- a/scywr/src/config.rs +++ b/scywr/src/config.rs @@ -4,11 +4,10 @@ use serde::Deserialize; pub struct ScyllaIngestConfig { hosts: Vec, keyspace: String, - rf: u8, } impl ScyllaIngestConfig { - pub fn new(hosts: I, ks: K1, rf: u8) -> Self + pub fn new(hosts: I, ks: K1) -> Self where I: IntoIterator, H: Into, @@ -17,7 +16,6 @@ impl ScyllaIngestConfig { Self { hosts: hosts.into_iter().map(Into::into).collect(), keyspace: ks.into(), - rf, } } @@ -28,8 +26,4 @@ impl ScyllaIngestConfig { pub fn keyspace(&self) -> &String { &self.keyspace } - - pub fn rf(&self) -> u8 { - self.rf - } } diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index 3327204..fc3d280 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -54,7 +54,7 @@ impl Changeset { fn log_statements(&self) { for q in &self.todo { - info!("WOULD DO {q}"); + info!("would execute:\n{q}\n"); } } } @@ -70,10 +70,9 @@ pub async fn has_keyspace(name: &str, scy: &ScySession) -> Result { Ok(false) } -pub async fn has_table(name: &str, scy: &ScySession) -> Result { +pub async fn has_table(ks: &str, name: &str, scy: &ScySession) -> Result { let cql = "select table_name from system_schema.tables where keyspace_name = ?"; - let ks = scy.get_keyspace().ok_or_else(|| Error::NoKeyspaceChosen)?; - let mut res = scy.query_iter(cql, (ks.as_ref(),)).await?.rows_stream::<(String,)>()?; + let mut res = scy.query_iter(cql, (ks,)).await?.rows_stream::<(String,)>()?; while let Some((table_name,)) = res.try_next().await? { if table_name == name { return Ok(true); @@ -82,9 +81,12 @@ pub async fn has_table(name: &str, scy: &ScySession) -> Result { Ok(false) } -pub async fn check_table_readable(name: &str, scy: &ScySession) -> Result { +pub async fn check_table_readable(ks: &str, name: &str, scy: &ScySession) -> Result { use crate::scylla::transport::errors::QueryError; - match scy.query_unpaged(format!("select * from {} limit 1", name), ()).await { + match scy + .query_unpaged(format!("select * from {}.{} limit 1", ks, name), ()) + .await + { Ok(_) => Ok(true), Err(e) => match &e { QueryError::DbError(e2, msg) => match e2 { @@ -216,7 +218,7 @@ impl GenTwcsTab { } async fn has_table_name(&self, scy: &ScySession) -> Result { - has_table(self.name(), scy).await + has_table(self.keyspace(), self.name(), scy).await } fn cql(&self) -> String { @@ -283,7 +285,6 @@ impl GenTwcsTab { ); let x = scy.query_iter(cql, (self.keyspace(), self.name())).await?; let mut it = x.rows_stream::<(i32, i32, BTreeMap)>()?; - // let mut it = x.into_typed::<(i32, i32, BTreeMap)>(); let mut rows = Vec::new(); while let Some(u) = it.next().await { let row = u?; @@ -478,7 +479,7 @@ async fn check_event_tables( ], ["series", "ts_msp"], ["ts_lsp"], - rett.ttl_events_d1(), + rett.ttl_events_d0(), ); tab.setup(chs, scy).await?; } @@ -495,7 +496,7 @@ async fn check_event_tables( ], ["series", "ts_msp"], ["ts_lsp"], - rett.ttl_events_d1(), + rett.ttl_events_d0(), ); tab.setup(chs, scy).await?; } @@ -512,7 +513,7 @@ async fn check_event_tables( ], ["series", "ts_msp"], ["ts_lsp"], - rett.ttl_events_d1(), + rett.ttl_events_d0(), ); tab.setup(chs, scy).await?; } @@ -522,6 +523,7 @@ async fn check_event_tables( async fn migrate_scylla_data_schema( scyconf: &ScyllaIngestConfig, rett: RetentionTime, + rf: u8, chs: &mut Changeset, ) -> Result<(), Error> { let scy2 = create_session_no_ks(scyconf).await?; @@ -530,14 +532,13 @@ async fn migrate_scylla_data_schema( let ks = scyconf.keyspace(); if !has_keyspace(ks, scy).await? { - let replication = scyconf.rf(); let cql = format!( concat!( "create keyspace {}", " with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }}", " and durable_writes = {};" ), - ks, replication, durable + ks, rf, durable ); info!("scylla create keyspace {cql}"); chs.add_todo(cql); @@ -545,8 +546,6 @@ async fn migrate_scylla_data_schema( info!("scylla has keyspace {ks}"); } - scy.use_keyspace(ks, true).await?; - check_event_tables(ks, rett.clone(), chs, scy).await?; { @@ -692,13 +691,19 @@ async fn migrate_scylla_data_schema( } { let tn = format!("{}{}", rett.table_prefix(), "bin_write_index_v00"); - if has_table(&tn, scy).await? { + if has_table(ks, &tn, scy).await? { chs.add_todo(format!("drop table {}.{}", ks, tn)); } } { let tn = format!("{}{}", rett.table_prefix(), "bin_write_index_v01"); - if has_table(&tn, scy).await? { + if has_table(&ks, &tn, scy).await? { + chs.add_todo(format!("drop table {}.{}", ks, tn)); + } + } + { + let tn = format!("{}{}", rett.table_prefix(), "bin_write_index_v02"); + if has_table(&ks, &tn, scy).await? { chs.add_todo(format!("drop table {}.{}", ks, tn)); } } @@ -716,8 +721,15 @@ pub async fn migrate_scylla_data_schema_all_rt( RetentionTime::Long, RetentionTime::Short, ]; - for ((rt, scyconf), chs) in rts.clone().into_iter().zip(scyconfs.iter()).zip(chsa.iter_mut()) { - migrate_scylla_data_schema(scyconf, rt, chs).await?; + let rfs = [3, 3, 3, 1]; + for (((rt, scyconf), chs), rf) in rts + .clone() + .into_iter() + .zip(scyconfs.iter()) + .zip(chsa.iter_mut()) + .zip(rfs.iter().map(|&x| x)) + { + migrate_scylla_data_schema(scyconf, rt, rf, chs).await?; } let todo = chsa.iter().any(|x| x.has_to_do()); if do_change { diff --git a/serieswriter/src/binwriter.rs b/serieswriter/src/binwriter.rs index 75b0801..e1d8993 100644 --- a/serieswriter/src/binwriter.rs +++ b/serieswriter/src/binwriter.rs @@ -485,7 +485,7 @@ impl BinWriter { series: series.id() as i64, pbp: pbp_ix.db_ix() as i16, msp: msp as i32, - rt: rt.index_db_i32() as i16, + rt: rt.to_index_db_i32() as i16, lsp: lsp as i32, binlen: pbp.bin_len().ms() as i32, }; @@ -513,7 +513,7 @@ impl BinWriter { series: series.id() as i64, pbp: pbp_ix.db_ix() as i16, msp: msp as i32, - rt: rt.index_db_i32() as i16, + rt: rt.to_index_db_i32() as i16, lsp: lsp as i32, binlen: pbp.bin_len().ms() as i32, }; diff --git a/serieswriter/src/ratelimitwriter.rs b/serieswriter/src/ratelimitwriter.rs index ecd3fd2..9b30e66 100644 --- a/serieswriter/src/ratelimitwriter.rs +++ b/serieswriter/src/ratelimitwriter.rs @@ -1,9 +1,9 @@ use crate::writer::EmittableType; use crate::writer::SeriesWriter; use core::fmt; -use netpod::log::*; use netpod::DtNano; use netpod::TsNano; +use netpod::log; use scywr::iteminsertqueue::QueryItem; use series::SeriesId; use std::collections::VecDeque; @@ -11,7 +11,9 @@ use std::marker::PhantomData; use std::time::Duration; use std::time::Instant; -macro_rules! trace_rt_decision { ($det:expr, $($arg:tt)*) => { if $det { trace!($($arg)*); } }; } +macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); } +macro_rules! trace { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ); } +macro_rules! trace_rt_decision { ($det:expr, $($arg:expr),*) => ( if $det { log::trace!($($arg),*); } ); } autoerr::create_error_v1!( name(Error, "RateLimitWriter"), @@ -92,10 +94,13 @@ where if false { trace_rt_decision!( det, - "{dbgname} {sid} min_quiet {min_quiet:?} ts1 {ts1:?} ts2 {ts2:?} item {item:?}", - ts1 = ts.ms(), - ts2 = tsl.ms(), - item = item, + "{} {} min_quiet {:?} ts1 {:?} ts2 {:?} item {:?}", + dbgname, + sid, + min_quiet, + ts.ms(), + tsl.ms(), + item ); } let do_write = { @@ -105,7 +110,11 @@ where } else if ts < tsl { trace_rt_decision!( det, - "{dbgname} {sid} ignore, because ts_local rewind {ts:?} {tsl:?}", + "{} {} ignore, because ts_local rewind {:?} {:?}", + dbgname, + sid, + ts, + tsl ); false } else if !self.is_polled && ts.ms() < tsl.ms() + min_quiet { @@ -120,20 +129,8 @@ where } else if ts < tsl.add_dt_nano(DtNano::from_ms(5)) { trace_rt_decision!(det, "{dbgname} {sid} ignore, because store rate cap"); false - } else if self - .last_insert_val - .as_ref() - .map(|k| !item.has_change(k)) - .unwrap_or(false) - { - trace_rt_decision!(det, "{dbgname} {sid} ignore, because value did not change"); - false } else { trace_rt_decision!(det, "{dbgname} {sid} accept"); - if true { - self.last_insert_val = Some(item.clone()); - } - self.last_insert_ts = ts.clone(); true } }; diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index c747bac..5a8ab2d 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -12,6 +12,7 @@ use std::time::Duration; use std::time::Instant; macro_rules! trace_emit { ($det:expr, $($arg:expr),*) => ( if $det { log::trace!($($arg),*); } ); } +macro_rules! trace_rt_decision { ($det:expr, $($arg:expr),*) => ( if $det { log::trace!($($arg),*); } ); } autoerr::create_error_v1!( name(Error, "SerieswriterRtwriter"), @@ -48,6 +49,10 @@ impl WriteRes { pub fn nstatus(&self) -> u8 { self.st.status + self.mt.status + self.lt.status } + + pub fn accept_any(&self) -> bool { + self.lt.accept || self.mt.accept || self.st.accept + } } #[derive(Debug)] @@ -81,6 +86,8 @@ where min_quiets: MinQuiets, do_trace_detail: bool, do_st_rf1: bool, + last_insert_ts: TsNano, + last_insert_val: Option, } impl RtWriter @@ -119,6 +126,8 @@ where min_quiets, do_trace_detail: netpod::TRACE_SERIES_ID.contains(&series.id()), do_st_rf1, + last_insert_ts: TsNano::from_ns(0), + last_insert_val: None, }; Ok(ret) } @@ -152,9 +161,15 @@ where // Optimize for the common case that we only write into one of the stores. // Make the decision first, based on ref, then clone only as required. let res_lt; - let mut res_mt = WriteRtRes::default(); - let mut res_st = WriteRtRes::default(); + let res_mt; + let res_st; + if self + .last_insert_val + .as_ref() + .map(|k| item.has_change(k)) + .unwrap_or(true) { + // TODO filter duplicate values already here res_lt = Self::write_inner(&mut self.state_lt, item.clone(), ts_net, tsev, &mut iqdqs.lt_rf3_qu)?; if !res_lt.accept { res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, tsev, &mut iqdqs.mt_rf3_qu)?; @@ -166,14 +181,28 @@ where res_st = Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)?; } + } else { + res_st = WriteRtRes::default(); } + } else { + res_mt = WriteRtRes::default(); + res_st = WriteRtRes::default(); } + } else { + trace_rt_decision!(det, "{} ignore, because value did not change", self.series); + res_lt = WriteRtRes::default(); + res_mt = WriteRtRes::default(); + res_st = WriteRtRes::default(); } let ret = WriteRes { st: res_st, mt: res_mt, lt: res_lt, }; + if ret.accept_any() { + self.last_insert_ts = tsev.clone(); + self.last_insert_val = Some(item.clone()); + } Ok(ret) }