diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index 8f9c8c5..af140c1 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -371,9 +371,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.31" +version = "4.5.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "027bb0d98429ae334a8698531da7077bdf906419543a35a55c2cb1b66437d767" +checksum = "6088f3ae8c3608d19260cd7445411865a485688711b78b5be70d78cd96136f83" dependencies = [ "clap_builder", "clap_derive", @@ -381,9 +381,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.31" +version = "4.5.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5589e0cba072e0f3d23791efac0fd8627b49c829c196a492e88168e6a669d863" +checksum = "22a7ef7f676155edfb82daa97f99441f3ebf4a58d5e32f295a56259f1b6facc8" dependencies = [ "anstream", "anstyle", @@ -393,9 +393,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.28" +version = "4.5.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf4ced95c6f4a675af3da73304b9ac4ed991640c36374e4b46795c49e17cf1ed" +checksum = "09176aae279615badda0765c0c0b3f6ed53f4709118af73cf4655d85d1530cd7" dependencies = [ "heck", "proc-macro2", @@ -713,7 +713,7 @@ dependencies = [ [[package]] name = "daqingest" -version = "0.2.7-aa.6" +version = "0.2.7-aa.7" dependencies = [ "async-channel", "autoerr", @@ -1152,9 +1152,9 @@ dependencies = [ [[package]] name = "http" -version = "1.2.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f16ca2af56261c99fba8bac40a10251ce8188205a4c448fbb745a2e4daa76fea" +checksum = "f4a85d31aea989eead29a3aaf9e1115a180df8282431156e533de47660892565" dependencies = [ "bytes", "fnv", @@ -1173,12 +1173,12 @@ dependencies = [ [[package]] name = "http-body-util" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" +checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", - "futures-util", + "futures-core", "http", "http-body", "pin-project-lite", @@ -1198,9 +1198,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "humantime" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +checksum = "9b112acc8b3adf4b107a8ec20977da0273a8c386765a3ec0229bd500a1443f9f" [[package]] name = "humantime-serde" @@ -1417,9 +1417,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.7.1" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c9c992b02b5b4c94ea26e32fe5bccb7aa7d9f390ab5c1221ff895bc7ea8b652" +checksum = "3954d50fe15b02142bf25d3b8bdadb634ec3948f103d04ffe3031bc8fe9d7058" dependencies = [ "equivalent", "hashbrown 0.15.2", @@ -1492,9 +1492,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.170" +version = "0.2.171" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828" +checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" [[package]] name = "litemap" @@ -1695,9 +1695,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.20.3" +version = "1.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "945462a4b81e43c4e3ba96bd7b49d834c6f61198356aa858733bc4acf3cbe62e" +checksum = "cde51589ab56b20a6f686b2c68f7a0bd6add753d697abf720d63f8db3ab7b1ad" [[package]] name = "overload" @@ -1849,11 +1849,11 @@ checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" [[package]] name = "ppv-lite86" -version = "0.2.20" +version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" dependencies = [ - "zerocopy 0.7.35", + "zerocopy 0.8.23", ] [[package]] @@ -1867,9 +1867,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.39" +version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1f1914ce909e1658d9907913b4b91947430c7d9be598b15a1912935b8c04801" +checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" dependencies = [ "proc-macro2", ] @@ -2135,9 +2135,9 @@ checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0" [[package]] name = "serde" -version = "1.0.218" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8dfc9d19bdbf6d17e22319da49161d5d0108e4188e8b680aef6299eed22df60" +checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" dependencies = [ "serde_derive", ] @@ -2154,9 +2154,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.218" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f09503e191f4e797cb8aac08e9a4a4695c5edf6a2e70e376d961ddd5c969f82b" +checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" dependencies = [ "proc-macro2", "quote", @@ -2391,9 +2391,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.99" +version = "2.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e02e925281e18ffd9d640e234264753c43edc62d64b2d4cf898f1bc5e75f3fc2" +checksum = "b09a44accad81e1ba1cd74a32461ba89dee89095ba17b32f5d03683b1b1fc2a0" dependencies = [ "proc-macro2", "quote", @@ -2538,9 +2538,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.43.0" +version = "1.44.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" +checksum = "f382da615b842244d4b8738c82ed1275e6c5dd90c459a30941cd07080b06c91a" dependencies = [ "backtrace", "bytes", @@ -3102,7 +3102,6 @@ version = "0.7.35" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ - "byteorder", "zerocopy-derive 0.7.35", ] diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 727fa47..a64ae8e 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.7-aa.6" +version = "0.2.7-aa.7" authors = ["Dominik Werder "] edition = "2024" diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 7307ee7..2b6eef9 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -67,7 +67,7 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { pass: k.pg_pass, name: k.pg_name, }; - let scyconf = ScyllaIngestConfig::new([k.scylla_host], k.scylla_keyspace, "DUMMY"); + let scyconf = ScyllaIngestConfig::new([k.scylla_host], k.scylla_keyspace, 3); // scywr::schema::migrate_scylla_data_schema(&scyconf, netpod::ttl::RetentionTime::Short) // .await // .map_err(Error::from_string)?; @@ -170,6 +170,7 @@ async fn scylla_schema_check(opts: CaIngestOpts, do_change: bool) -> Result<(), opts.scylla_config_st(), opts.scylla_config_mt(), opts.scylla_config_lt(), + opts.scylla_config_st_rf1(), ], do_change, ) diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 130acd0..a073125 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -807,6 +807,7 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> opts.scylla_config_st(), opts.scylla_config_mt(), opts.scylla_config_lt(), + opts.scylla_config_st_rf1(), ], false, ) diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index e0c9796..b949500 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1320,6 +1320,7 @@ impl CaConn { shape, ch.conf.min_quiets(), ch.conf.is_polled(), + ch.conf.replication(), &|| CaWriterValueState::new(st.series_status, chinfo.series.to_series()), )?; self.handle_writer_establish_inner(cid, writer)?; diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index f09ac5e..feffa61 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -1,6 +1,6 @@ use err::Error; -use netpod::log::*; use netpod::Database; +use netpod::log::*; use regex::Regex; use scywr::config::ScyllaIngestConfig; use serde::Deserialize; @@ -32,6 +32,7 @@ pub struct CaIngestOpts { scylla_st: ScyllaIngestConfig, scylla_mt: ScyllaIngestConfig, scylla_lt: ScyllaIngestConfig, + scylla_st_rf1: ScyllaIngestConfig, array_truncate: Option, insert_worker_count: Option, insert_worker_concurrency: Option, @@ -80,6 +81,10 @@ impl CaIngestOpts { &self.scylla_lt } + pub fn scylla_config_st_rf1(&self) -> &ScyllaIngestConfig { + &self.scylla_st_rf1 + } + pub fn search(&self) -> &Vec { &self.search } @@ -358,11 +363,11 @@ fn bool_true() -> bool { mod serde_ingest_config_archiving { use super::ChannelReadConfigApiFormat; use super::IngestConfigArchiving; + use serde::Deserializer; + use serde::Serializer; use serde::de; use serde::ser; use serde::ser::SerializeMap; - use serde::Deserializer; - use serde::Serializer; use std::fmt; impl ser::Serialize for IngestConfigArchiving { @@ -421,9 +426,9 @@ mod serde_ChannelReadConfigApiFormat { } mod serde_replication_bool { - use serde::de; use serde::Deserializer; use serde::Serializer; + use serde::de; use std::fmt; pub fn serialize(v: &bool, ser: S) -> Result @@ -484,9 +489,9 @@ mod serde_replication_bool { mod serde_option_channel_read_config { use super::ChannelReadConfig; - use serde::de; use serde::Deserializer; use serde::Serializer; + use serde::de; use std::fmt; use std::time::Duration; @@ -568,11 +573,7 @@ pub enum ChannelReadConfig { impl ChannelReadConfig { pub fn is_monitor(&self) -> bool { - if let Self::Monitor = self { - true - } else { - false - } + if let Self::Monitor = self { true } else { false } } } @@ -680,6 +681,10 @@ impl ChannelConfig { self.arch.is_polled } + pub fn replication(&self) -> bool { + self.arch.replication + } + pub fn poll_conf(&self) -> Option<(u64,)> { if self.is_polled() { if let Some(ChannelReadConfig::Poll(x)) = self.arch.short_term { diff --git a/scywr/src/config.rs b/scywr/src/config.rs index 522daf7..84d1587 100644 --- a/scywr/src/config.rs +++ b/scywr/src/config.rs @@ -4,21 +4,20 @@ use serde::Deserialize; pub struct ScyllaIngestConfig { hosts: Vec, keyspace: String, - keyspace_rf1: Option, + rf: u8, } impl ScyllaIngestConfig { - pub fn new(hosts: I, ks_rf3: K1, ks_rf1: K2) -> Self + pub fn new(hosts: I, ks: K1, rf: u8) -> Self where I: IntoIterator, H: Into, K1: Into, - K2: Into, { Self { hosts: hosts.into_iter().map(Into::into).collect(), - keyspace: ks_rf3.into(), - keyspace_rf1: Some(ks_rf1.into()), + keyspace: ks.into(), + rf, } } @@ -30,7 +29,7 @@ impl ScyllaIngestConfig { &self.keyspace } - pub fn keyspace_rf1(&self) -> Option<&String> { - self.keyspace_rf1.as_ref() + pub fn rf(&self) -> u8 { + self.rf } } diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index 5fc15b0..3327204 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -527,44 +527,24 @@ async fn migrate_scylla_data_schema( let scy2 = create_session_no_ks(scyconf).await?; let scy = &scy2; let durable = true; + let ks = scyconf.keyspace(); - if !has_keyspace(scyconf.keyspace(), scy).await? { - // TODO - let replication = 3; + if !has_keyspace(ks, scy).await? { + let replication = scyconf.rf(); let cql = format!( concat!( "create keyspace {}", " with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }}", " and durable_writes = {};" ), - scyconf.keyspace(), - replication, - durable + ks, replication, durable ); info!("scylla create keyspace {cql}"); chs.add_todo(cql); + } else { + info!("scylla has keyspace {ks}"); } - if let Some(ks) = scyconf.keyspace_rf1() { - if !has_keyspace(ks, scy).await? { - let replication = 1; - let cql = format!( - concat!( - "create keyspace {}", - " with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }}", - " and durable_writes = {};" - ), - scyconf.keyspace(), - replication, - durable - ); - info!("scylla create keyspace {cql}"); - chs.add_todo(cql); - } - } - - let ks = scyconf.keyspace(); - scy.use_keyspace(ks, true).await?; check_event_tables(ks, rett.clone(), chs, scy).await?; @@ -726,18 +706,23 @@ async fn migrate_scylla_data_schema( } pub async fn migrate_scylla_data_schema_all_rt( - scyconfs: [&ScyllaIngestConfig; 3], + scyconfs: [&ScyllaIngestConfig; 4], do_change: bool, ) -> Result<(), Error> { - let mut chsa = [Changeset::new(), Changeset::new(), Changeset::new()]; - let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long]; - for ((rt, scyconf), chs) in rts.clone().into_iter().zip(scyconfs.into_iter()).zip(chsa.iter_mut()) { + let mut chsa = [Changeset::new(), Changeset::new(), Changeset::new(), Changeset::new()]; + let rts = [ + RetentionTime::Short, + RetentionTime::Medium, + RetentionTime::Long, + RetentionTime::Short, + ]; + for ((rt, scyconf), chs) in rts.clone().into_iter().zip(scyconfs.iter()).zip(chsa.iter_mut()) { migrate_scylla_data_schema(scyconf, rt, chs).await?; } let todo = chsa.iter().any(|x| x.has_to_do()); if do_change { if todo { - for ((_rt, scyconf), chs) in rts.into_iter().zip(scyconfs.into_iter()).zip(chsa.iter_mut()) { + for ((_rt, scyconf), chs) in rts.into_iter().zip(scyconfs.iter()).zip(chsa.iter_mut()) { if chs.has_to_do() { let scy2 = create_session_no_ks(scyconf).await?; let scy = &scy2; diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index 11b2b6c..c747bac 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -1,4 +1,4 @@ -use crate::log::*; +use crate::log; use crate::ratelimitwriter::RateLimitWriter; use crate::writer::EmittableType; use netpod::ScalarType; @@ -11,13 +11,7 @@ use std::collections::VecDeque; use std::time::Duration; use std::time::Instant; -macro_rules! trace_emit { - ($det:expr, $($arg:tt)*) => { - if $det { - trace!($($arg)*); - } - }; -} +macro_rules! trace_emit { ($det:expr, $($arg:expr),*) => ( if $det { log::trace!($($arg),*); } ); } autoerr::create_error_v1!( name(Error, "SerieswriterRtwriter"), @@ -86,6 +80,7 @@ where state_lt: State, min_quiets: MinQuiets, do_trace_detail: bool, + do_st_rf1: bool, } impl RtWriter @@ -98,6 +93,7 @@ where shape: Shape, min_quiets: MinQuiets, is_polled: bool, + do_st_rf1: bool, emit_state_new: &dyn Fn() -> ::State, ) -> Result { let state_st = { @@ -122,6 +118,7 @@ where state_lt, min_quiets, do_trace_detail: netpod::TRACE_SERIES_ID.contains(&series.id()), + do_st_rf1, }; Ok(ret) } @@ -162,7 +159,13 @@ where if !res_lt.accept { res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, tsev, &mut iqdqs.mt_rf3_qu)?; if !res_mt.accept { - res_st = Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)?; + if self.do_st_rf1 { + res_st = + Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf1_qu)?; + } else { + res_st = + Self::write_inner(&mut self.state_st, item.clone(), ts_net, tsev, &mut iqdqs.st_rf3_qu)?; + } } } } @@ -191,7 +194,11 @@ where } pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { - self.state_st.writer.tick(&mut iqdqs.st_rf3_qu)?; + if self.do_st_rf1 { + self.state_st.writer.tick(&mut iqdqs.st_rf1_qu)?; + } else { + self.state_st.writer.tick(&mut iqdqs.st_rf3_qu)?; + } self.state_mt.writer.tick(&mut iqdqs.mt_rf3_qu)?; self.state_lt.writer.tick(&mut iqdqs.lt_rf3_qu)?; Ok(())