From 7beb5a9ced749da266ef40bbbbf2201d87516f07 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 20 Feb 2024 09:44:38 +0100 Subject: [PATCH] Refactor scylla config and default ttls --- batchtools/src/batcher.rs | 3 +- daqingest/src/daemon.rs | 12 ++- dbpg/src/seriesbychannel.rs | 2 +- netfetch/src/ca/connset.rs | 15 ++-- netfetch/src/ca/finder.rs | 63 +++++++++------ netfetch/src/ca/findioc.rs | 85 ++++++++++++++------ netfetch/src/ca/search.rs | 6 +- netfetch/src/conf.rs | 54 ++++++++----- scywr/Cargo.toml | 1 + scywr/src/config.rs | 28 +++++++ scywr/src/insertworker.rs | 52 ++++-------- scywr/src/iteminsertqueue.rs | 39 +-------- scywr/src/schema.rs | 148 +++++++++++++++++------------------ scywr/src/session.rs | 10 +-- scywr/src/store.rs | 87 ++++++-------------- scywr/src/tools.rs | 8 +- serieswriter/src/writer.rs | 9 +-- 17 files changed, 307 insertions(+), 315 deletions(-) diff --git a/batchtools/src/batcher.rs b/batchtools/src/batcher.rs index 13ad720..545b901 100644 --- a/batchtools/src/batcher.rs +++ b/batchtools/src/batcher.rs @@ -45,7 +45,6 @@ async fn run_batcher(rx: Receiver, batch_tx: Sender>, batch_limit: } } Err(e) => { - error!("------------------------------------------ error in batcher, no more input {e}"); break; } }, @@ -58,5 +57,5 @@ async fn run_batcher(rx: Receiver, batch_tx: Sender>, batch_limit: } } } - warn!("-------- batcher is done --------------"); + debug!("-------- batcher is done --------------"); } diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index fe4f870..84d6b93 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -5,7 +5,6 @@ use async_channel::Sender; use async_channel::WeakSender; use err::Error; use log::*; -use netfetch::ca::conn; use netfetch::ca::connset::CaConnSet; use netfetch::ca::connset::CaConnSetCtrl; use netfetch::ca::connset::CaConnSetItem; @@ -16,8 +15,9 @@ use netfetch::daemon_common::Channel; use netfetch::daemon_common::DaemonEvent; use netfetch::metrics::StatsSet; use netfetch::throttletrace::ThrottleTrace; +use netpod::ttl::RetentionTime; use netpod::Database; -use netpod::ScyllaConfig; +use scywr::config::ScyllaIngestConfig; use scywr::insertworker::InsertWorkerOpts; use scywr::insertworker::Ttls; use scywr::iteminsertqueue as scywriiq; @@ -45,7 +45,7 @@ const CHECK_CHANNEL_SLOW_WARN: Duration = Duration::from_millis(500); pub struct DaemonOpts { pgconf: Database, - scyconf: ScyllaConfig, + scyconf: ScyllaIngestConfig, ttls: Ttls, #[allow(unused)] test_bsread_addr: Option, @@ -160,7 +160,6 @@ impl Daemon { rx }; - let ttls = opts.ttls.clone(); let insert_worker_opts = InsertWorkerOpts { store_workers_rate: opts.store_workers_rate.clone(), insert_workers_running: Arc::new(AtomicU64::new(0)), @@ -177,7 +176,6 @@ impl Daemon { insert_worker_opts, insert_worker_stats.clone(), ingest_opts.use_rate_limit_queue(), - ttls, ) .await?; let stats = Arc::new(DaemonStats::new()); @@ -560,7 +558,6 @@ fn handler_sigterm(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc: pub async fn run(opts: CaIngestOpts, channels_config: Option) -> Result<(), Error> { info!("start up {opts:?}"); - debug!("channels_config {channels_config:?}"); ingest_linux::signal::set_signal_handler(libc::SIGINT, handler_sigint).map_err(Error::from_string)?; ingest_linux::signal::set_signal_handler(libc::SIGTERM, handler_sigterm).map_err(Error::from_string)?; @@ -569,8 +566,9 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> .map_err(Error::from_string)?; dbpg::schema::schema_check(&pg).await.map_err(Error::from_string)?; drop(pg); + jh.await?.map_err(Error::from_string)?; - scywr::schema::migrate_scylla_data_schema(opts.scylla_config()) + scywr::schema::migrate_scylla_data_schema(opts.scylla_config(), RetentionTime::Short) .await .map_err(Error::from_string)?; diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index e3d45d3..22c2878 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -241,7 +241,7 @@ impl Worker { } }; } - debug!("Worker2 done"); + trace!("Worker2 done"); Ok(()) } diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index d9f6c2a..ba62040 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -478,7 +478,7 @@ impl CaConnSet { } async fn run(mut this: CaConnSet) -> Result<(), Error> { - debug!("CaConnSet run begin"); + trace!("CaConnSet run begin"); loop { let x = this.next().await; match x { @@ -486,19 +486,14 @@ impl CaConnSet { None => break, } } - // debug!( - // "search_tx sender {} receiver {}", - // this.find_ioc_query_tx.sender_count(), - // this.find_ioc_query_tx.receiver_count() - // ); - debug!("CaConnSet EndOfStream"); - debug!("join ioc_finder_jh A {:?}", this.find_ioc_query_sender.len()); + trace!("CaConnSet EndOfStream"); + trace!("join ioc_finder_jh A {:?}", this.find_ioc_query_sender.len()); this.find_ioc_query_sender.as_mut().drop(); - debug!("join ioc_finder_jh B {:?}", this.find_ioc_query_sender.len()); + trace!("join ioc_finder_jh B {:?}", this.find_ioc_query_sender.len()); this.ioc_finder_jh .await .map_err(|e| Error::with_msg_no_trace(e.to_string()))??; - debug!("joined ioc_finder_jh"); + trace!("joined ioc_finder_jh"); this.connset_out_tx.close(); this.connset_inp_rx.close(); this.shutdown_done = true; diff --git a/netfetch/src/ca/finder.rs b/netfetch/src/ca/finder.rs index 75581e6..800392d 100644 --- a/netfetch/src/ca/finder.rs +++ b/netfetch/src/ca/finder.rs @@ -96,7 +96,10 @@ async fn finder_full( )); let jh2 = taskrun::spawn(finder_network_if_not_found(rx1, tx, opts.clone(), stats)); jh1.await??; + trace!("finder::finder_full awaited A"); jh2.await??; + trace!("finder::finder_full awaited B"); + trace!("finder::finder_full done"); Ok(()) } @@ -108,21 +111,28 @@ async fn finder_worker( stats: Arc, ) -> Result<(), Error> { // TODO do something with join handle - let (batch_rx, jh) = batchtools::batcher::batch( + let (batch_rx, jh_batch) = batchtools::batcher::batch( SEARCH_BATCH_MAX, Duration::from_millis(200), SEARCH_DB_PIPELINE_LEN, qrx, ); + let mut jhs = Vec::new(); for _ in 0..SEARCH_DB_PIPELINE_LEN { - // TODO use join handle - tokio::spawn(finder_worker_single( + let jh = tokio::spawn(finder_worker_single( batch_rx.clone(), tx.clone(), backend.clone(), db.clone(), stats.clone(), )); + jhs.push(jh); + } + jh_batch.await?; + trace!("finder_worker jh_batch awaited"); + for (i, jh) in jhs.into_iter().enumerate() { + jh.await??; + trace!("finder_worker single {i} awaited"); } Ok(()) } @@ -165,17 +175,7 @@ async fn finder_worker_single( dt.as_secs_f32() * 1e3 ); if dt > Duration::from_millis(5000) { - let mut out = String::from("["); - for e in &batch { - if out.len() > 1 { - out.push_str(", "); - } - out.push('\''); - out.push_str(e.name()); - out.push('\''); - } - out.push(']'); - trace!("very slow query\n{out}"); + warn!("very slow query"); } match qres { Ok(rows) => { @@ -237,8 +237,9 @@ async fn finder_worker_single( Err(_e) => break, } } - debug!("finder_worker_single done"); + drop(pg); jh.await?.map_err(|e| Error::from_string(e))?; + trace!("finder_worker_single done"); Ok(()) } @@ -248,13 +249,14 @@ async fn finder_network_if_not_found( opts: CaIngestOpts, stats: Arc, ) -> Result<(), Error> { - let (net_tx, net_rx, jh, jhs) = ca_search_workers_start(&opts, stats.clone()).await.unwrap(); + let self_name = "finder_network_if_not_found"; + let (net_tx, net_rx, jh_ca_search) = ca_search_workers_start(&opts, stats.clone()).await?; let jh2 = taskrun::spawn(process_net_result(net_rx, tx.clone(), opts.clone())); 'outer: while let Ok(item) = rx.recv().await { let mut res = VecDeque::new(); let mut net = VecDeque::new(); for e in item { - trace!("finder_network_if_not_found sees {e:?}"); + trace!("{self_name} sees {e:?}"); if e.addr.is_none() { net.push_back(e.channel); } else { @@ -262,20 +264,22 @@ async fn finder_network_if_not_found( } } if let Err(_) = tx.send(res).await { + debug!("{self_name} res send error, break"); break; } for ch in net { if let Err(_) = net_tx.send(ch).await { + debug!("{self_name} net ch send error, break"); break 'outer; } } } - for jh in jhs { - jh.await??; - } - jh.await??; + drop(net_tx); + trace!("{self_name} loop end"); + jh_ca_search.await??; + trace!("{self_name} jh_ca_search awaited"); jh2.await??; - debug!("finder_network_if_not_found done"); + trace!("{self_name} process_net_result awaited"); Ok(()) } @@ -290,9 +294,13 @@ async fn process_net_result( let mut index_worker_pg_jh = Vec::new(); for _ in 0..IOC_SEARCH_INDEX_WORKER_COUNT { let backend = opts.backend().into(); - let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config()).await.unwrap(); + let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config()) + .await + .map_err(Error::from_string)?; index_worker_pg_jh.push(jh); - let worker = IocSearchIndexWorker::prepare(dbrx.clone(), backend, pg).await.unwrap(); + let worker = IocSearchIndexWorker::prepare(dbrx.clone(), backend, pg) + .await + .map_err(Error::from_string)?; let jh = tokio::spawn(async move { worker.worker().await }); ioc_search_index_worker_jhs.push(jh); } @@ -317,6 +325,13 @@ async fn process_net_result( } } } + trace!("process_net_result break loop"); + dbtx.close(); + trace!("process_net_result dbtx closed"); + for (i, jh) in ioc_search_index_worker_jhs.into_iter().enumerate() { + jh.await?; + trace!("process_net_result search index worker {i} awaited"); + } Ok(()) } diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index d9fec45..5ee8817 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -104,7 +104,8 @@ pub struct FindIocStream { bids_timed_out: BTreeMap, sids_done: BTreeMap, result_for_done_sid_count: u64, - sleeper: Pin + Send>>, + sleep_count: u8, + sleeper: Option + Send>>>, #[allow(unused)] thr_msg_0: ThrottleTrace, #[allow(unused)] @@ -145,7 +146,8 @@ impl FindIocStream { in_flight_max, channels_per_batch: batch_size, batch_run_max, - sleeper: Box::pin(tokio::time::sleep(Duration::from_millis(500))), + sleep_count: 0, + sleeper: Some(Box::pin(tokio::time::sleep(Duration::from_millis(500)))), thr_msg_0: ThrottleTrace::new(Duration::from_millis(1000)), thr_msg_1: ThrottleTrace::new(Duration::from_millis(1000)), thr_msg_2: ThrottleTrace::new(Duration::from_millis(1000)), @@ -155,7 +157,8 @@ impl FindIocStream { pub fn quick_state(&self) -> String { format!( - "channels_input {} in_flight {} bid_by_sid {} out_queue {} result_for_done_sid_count {} bids_timed_out {}", + "channels_input {} {} in_flight {} bid_by_sid {} out_queue {} result_for_done_sid_count {} bids_timed_out {}", + self.channels_input.is_closed(), self.channels_input.len(), self.in_flight.len(), self.bid_by_sid.len(), @@ -562,7 +565,10 @@ impl FindIocStream { } fn ready_for_end_of_stream(&self) -> bool { - self.channels_input.is_closed() && self.in_flight.is_empty() && self.out_queue.is_empty() + self.channels_input.is_closed() + && self.channels_input.is_empty() + && self.in_flight.is_empty() + && self.out_queue.is_empty() } } @@ -571,6 +577,9 @@ impl Stream for FindIocStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + if self.channels_input.is_closed() { + debug!("{}", self.quick_state()); + } // self.thr_msg_0.trigger("FindIocStream::poll_next", &[]); match self.ping.poll_unpin(cx) { Ready(_) => { @@ -582,7 +591,7 @@ impl Stream for FindIocStream { self.clear_timed_out(); loop { let mut have_progress = false; - if self.out_queue.is_empty() == false { + if !self.out_queue.is_empty() { let ret = std::mem::replace(&mut self.out_queue, VecDeque::new()); break Ready(Some(Ok(ret))); } @@ -675,39 +684,69 @@ impl Stream for FindIocStream { } } } + if self.ready_for_end_of_stream() { + // debug!("ready_for_end_of_stream but in late part"); + } break match self.afd.poll_read_ready(cx) { - Ready(Ok(mut g)) => match unsafe { Self::try_read(self.sock.0, &self.stats) } { - Ready(Ok((src, res))) => { - self.handle_result(src, res); - continue; + Ready(Ok(mut g)) => { + debug!("BLOCK AA"); + match unsafe { Self::try_read(self.sock.0, &self.stats) } { + Ready(Ok((src, res))) => { + self.handle_result(src, res); + if self.ready_for_end_of_stream() { + debug!("ready_for_end_of_stream continue after handle_result"); + } + continue; + } + Ready(Err(e)) => { + error!("Error from try_read {e:?}"); + Ready(Some(Err(e))) + } + Pending => { + g.clear_ready(); + if self.ready_for_end_of_stream() { + debug!("ready_for_end_of_stream continue after clear_ready"); + } + continue; + } } - Ready(Err(e)) => { - error!("Error from try_read {e:?}"); - Ready(Some(Err(e))) - } - Pending => { - g.clear_ready(); - continue; - } - }, + } Ready(Err(e)) => { let e = Error::with_msg_no_trace(format!("{e:?}")); error!("poll_read_ready {e:?}"); Ready(Some(Err(e))) } Pending => { + // debug!("BLOCK BB"); if have_progress { + if self.ready_for_end_of_stream() { + debug!("ready_for_end_of_stream continue after progress"); + } continue; } else { + // debug!("BLOCK BC"); if self.ready_for_end_of_stream() { - match self.sleeper.poll_unpin(cx) { - Ready(_) => { - self.sleeper = Box::pin(tokio::time::sleep(Duration::from_millis(500))); - continue; + // debug!("BLOCK BD"); + if let Some(fut) = self.sleeper.as_mut() { + match fut.poll_unpin(cx) { + Ready(()) => { + if self.sleep_count < 0 { + self.sleeper = + Some(Box::pin(tokio::time::sleep(Duration::from_millis(100)))); + self.sleep_count += 1; + } else { + self.sleeper = None; + } + continue; + } + Pending => Pending, } - Pending => Pending, + } else { + // debug!("BLOCK DONE"); + Ready(None) } } else { + // debug!("BLOCK BE"); Pending } } diff --git a/netfetch/src/ca/search.rs b/netfetch/src/ca/search.rs index af1782a..f34e0da 100644 --- a/netfetch/src/ca/search.rs +++ b/netfetch/src/ca/search.rs @@ -179,7 +179,6 @@ pub async fn ca_search_workers_start( Sender, Receiver, Error>>, JoinHandle>, - Vec>>, ), Error, > { @@ -189,8 +188,7 @@ pub async fn ca_search_workers_start( let (out_tx, out_rx) = async_channel::bounded(256); let finder = FindIocStream::new(inp_rx, search_tgts, blacklist, batch_run_max, 20, 16, stats); let jh = taskrun::spawn(finder_run(finder, out_tx)); - let jhs = Vec::new(); - Ok((inp_tx, out_rx, jh, jhs)) + Ok((inp_tx, out_rx, jh)) } async fn search_tgts_from_opts(opts: &CaIngestOpts) -> Result<(Vec, Vec), Error> { @@ -245,6 +243,6 @@ async fn finder_run(finder: FindIocStream, tx: Sender, @@ -23,11 +23,15 @@ pub struct CaIngestOpts { search_blacklist: Vec, whitelist: Option, blacklist: Option, - max_simul: Option, + #[allow(unused)] #[serde(default, with = "humantime_serde")] timeout: Option, postgresql: Database, - scylla: ScyllaConfig, + scylla: ScyllaIngestConfig, + #[serde(default)] + scylla_mt: Option, + #[serde(default)] + scylla_lt: Option, array_truncate: Option, insert_worker_count: Option, insert_worker_concurrency: Option, @@ -60,10 +64,18 @@ impl CaIngestOpts { &self.postgresql } - pub fn scylla_config(&self) -> &ScyllaConfig { + pub fn scylla_config(&self) -> &ScyllaIngestConfig { &self.scylla } + pub fn scylla_config_mt(&self) -> Option<&ScyllaIngestConfig> { + self.scylla_mt.as_ref() + } + + pub fn scylla_config_lt(&self) -> Option<&ScyllaIngestConfig> { + self.scylla_lt.as_ref() + } + pub fn search(&self) -> &Vec { &self.search } @@ -76,28 +88,28 @@ impl CaIngestOpts { Duration::from_millis(1200) } + pub fn insert_scylla_sessions(&self) -> usize { + self.insert_scylla_sessions.unwrap_or(1) + } + pub fn insert_worker_count(&self) -> usize { - self.insert_worker_count.unwrap_or(4) + self.insert_worker_count.unwrap_or(8) } pub fn insert_worker_concurrency(&self) -> usize { self.insert_worker_concurrency.unwrap_or(32) } - pub fn insert_scylla_sessions(&self) -> usize { - self.insert_scylla_sessions.unwrap_or(1) - } - pub fn array_truncate(&self) -> u64 { - self.array_truncate.unwrap_or(512) + self.array_truncate.unwrap_or(1024 * 64) } pub fn insert_item_queue_cap(&self) -> usize { - self.insert_item_queue_cap.unwrap_or(80000) + self.insert_item_queue_cap.unwrap_or(1000 * 1000) } pub fn store_workers_rate(&self) -> u64 { - self.store_workers_rate.unwrap_or(5000) + self.store_workers_rate.unwrap_or(1000 * 500) } pub fn insert_frac(&self) -> u64 { @@ -111,17 +123,19 @@ impl CaIngestOpts { pub fn ttl_index(&self) -> Duration { self.ttl_index .clone() - .unwrap_or_else(|| Duration::from_secs(60 * 60 * 24 * 3)) + .unwrap_or_else(|| Duration::from_secs(60 * 60 * 24 * 50)) } pub fn ttl_d0(&self) -> Duration { self.ttl_d0 .clone() - .unwrap_or_else(|| Duration::from_secs(60 * 60 * 24 * 1)) + .unwrap_or_else(|| Duration::from_secs(60 * 60 * 24 * 40)) } pub fn ttl_d1(&self) -> Duration { - self.ttl_d1.clone().unwrap_or_else(|| Duration::from_secs(60 * 60 * 12)) + self.ttl_d1 + .clone() + .unwrap_or_else(|| Duration::from_secs(60 * 60 * 24 * 10)) } pub fn ttl_binned(&self) -> Duration { @@ -158,7 +172,7 @@ scylla: assert_eq!(conf.channels, Some(PathBuf::from("/some/path/file.txt"))); assert_eq!(&conf.api_bind, "0.0.0.0:3011"); assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string())); - assert_eq!(conf.scylla.hosts.get(1), Some(&"sf-nube-12:19042".to_string())); + assert_eq!(conf.scylla.hosts().get(1), Some(&"sf-nube-12:19042".to_string())); assert_eq!(conf.ttl_d1, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45))); assert_eq!(conf.ttl_binned, None); } @@ -427,7 +441,7 @@ mod serde_option_channel_read_config { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum ChannelReadConfig { Monitor, Poll(Duration), @@ -455,6 +469,10 @@ CH-03: archiving_configuration: "###; let x: BTreeMap = serde_yaml::from_str(inp).unwrap(); + assert_eq!( + x.get("CH-00").as_ref().unwrap().archiving_configuration.medium_term, + Some(ChannelReadConfig::Poll(Duration::from_millis(1000 * 60))) + ); } #[derive(Debug)] diff --git a/scywr/Cargo.toml b/scywr/Cargo.toml index c796fff..0a08b56 100644 --- a/scywr/Cargo.toml +++ b/scywr/Cargo.toml @@ -11,6 +11,7 @@ scylla = "0.11.0" smallvec = "1.11.0" pin-project = "1.1.3" stackfuture = "0.3.0" +serde = { version = "1", features = ["derive"] } log = { path = "../log" } stats = { path = "../stats" } series = { path = "../series" } diff --git a/scywr/src/config.rs b/scywr/src/config.rs index 8b13789..13dfe4f 100644 --- a/scywr/src/config.rs +++ b/scywr/src/config.rs @@ -1 +1,29 @@ +use serde::Deserialize; +#[derive(Debug, Clone, Deserialize)] +pub struct ScyllaIngestConfig { + hosts: Vec, + keyspace: String, +} + +impl ScyllaIngestConfig { + pub fn new(hosts: I, ks: K) -> Self + where + I: IntoIterator, + H: Into, + K: Into, + { + Self { + hosts: hosts.into_iter().map(Into::into).collect(), + keyspace: ks.into(), + } + } + + pub fn hosts(&self) -> &Vec { + &self.hosts + } + + pub fn keyspace(&self) -> &String { + &self.keyspace + } +} diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 112c4b1..7ea2998 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -1,3 +1,4 @@ +use crate::config::ScyllaIngestConfig; use crate::iteminsertqueue::insert_channel_status; use crate::iteminsertqueue::insert_channel_status_fut; use crate::iteminsertqueue::insert_connection_status; @@ -6,26 +7,20 @@ use crate::iteminsertqueue::insert_item; use crate::iteminsertqueue::insert_item_fut; use crate::iteminsertqueue::insert_msp_fut; use crate::iteminsertqueue::Accounting; -use crate::iteminsertqueue::ConnectionStatusItem; use crate::iteminsertqueue::InsertFut; use crate::iteminsertqueue::InsertItem; use crate::iteminsertqueue::QueryItem; use crate::iteminsertqueue::TimeBinSimpleF32; use crate::store::DataStore; use async_channel::Receiver; -use async_channel::Sender; use err::Error; -use futures_util::Future; -use futures_util::TryFutureExt; use log::*; use netpod::timeunits::MS; use netpod::timeunits::SEC; -use netpod::ScyllaConfig; use smallvec::smallvec; use smallvec::SmallVec; use stats::InsertWorkerStats; use std::collections::VecDeque; -use std::pin::Pin; use std::sync::atomic; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; @@ -104,7 +99,7 @@ pub struct InsertWorkerOpts { } pub async fn spawn_scylla_insert_workers( - scyconf: ScyllaConfig, + scyconf: ScyllaIngestConfig, insert_scylla_sessions: usize, insert_worker_count: usize, insert_worker_concurrency: usize, @@ -112,7 +107,6 @@ pub async fn spawn_scylla_insert_workers( insert_worker_opts: Arc, store_stats: Arc, use_rate_limit_queue: bool, - ttls: Ttls, ) -> Result>>, Error> { let item_inp = if use_rate_limit_queue { crate::ratelimit::rate_limiter(insert_worker_opts.store_workers_rate.clone(), item_inp) @@ -140,7 +134,6 @@ pub async fn spawn_scylla_insert_workers( worker_ix, insert_worker_concurrency, item_inp.clone(), - ttls.clone(), insert_worker_opts.clone(), data_store, store_stats.clone(), @@ -233,7 +226,8 @@ async fn worker( item.emd, ttls.index.as_secs() as i32, ); - let qres = data_store.scy.execute(&data_store.qu_insert_muted, values).await; + let qu = err::todoval(); + let qres = data_store.scy.execute(&qu, values).await; match qres { Ok(_) => { stats.inserted_mute().inc(); @@ -254,10 +248,8 @@ async fn worker( item.emd, ttls.index.as_secs() as i32, ); - let qres = data_store - .scy - .execute(&data_store.qu_insert_item_recv_ivl, values) - .await; + let qu = err::todoval(); + let qres = data_store.scy.execute(&qu, values).await; match qres { Ok(_) => { stats.inserted_interval().inc(); @@ -279,7 +271,8 @@ async fn worker( item.evsize as i32, ttls.index.as_secs() as i32, ); - let qres = data_store.scy.execute(&data_store.qu_insert_channel_ping, params).await; + let qu = err::todoval(); + let qres = data_store.scy.execute(&qu, params).await; match qres { Ok(_) => { stats.inserted_channel_info().inc(); @@ -310,7 +303,6 @@ async fn worker_streamed( worker_ix: usize, concurrency: usize, item_inp: Receiver>, - ttls: Ttls, insert_worker_opts: Arc, data_store: Arc, stats: Arc, @@ -333,22 +325,20 @@ async fn worker_streamed( let mut res = Vec::with_capacity(32); for item in batch { let futs = match item { - QueryItem::Insert(item) => prepare_query_insert_futs(item, &ttls, &data_store, &stats, tsnow_u64), + QueryItem::Insert(item) => prepare_query_insert_futs(item, &data_store, &stats, tsnow_u64), QueryItem::ConnectionStatus(item) => { stats.inserted_connection_status().inc(); - let fut = insert_connection_status_fut(item, &ttls, &data_store, stats.clone()); + let fut = insert_connection_status_fut(item, &data_store, stats.clone()); smallvec![fut] } QueryItem::ChannelStatus(item) => { stats.inserted_channel_status().inc(); - insert_channel_status_fut(item, &ttls, &data_store, stats.clone()) + insert_channel_status_fut(item, &data_store, stats.clone()) } QueryItem::TimeBinSimpleF32(item) => { - prepare_timebin_insert_futs(item, &ttls, &data_store, &stats, tsnow_u64) - } - QueryItem::Accounting(item) => { - prepare_accounting_insert_futs(item, &ttls, &data_store, &stats, tsnow_u64) + prepare_timebin_insert_futs(item, &data_store, &stats, tsnow_u64) } + QueryItem::Accounting(item) => prepare_accounting_insert_futs(item, &data_store, &stats, tsnow_u64), _ => { // TODO debug!("TODO insert item {item:?}"); @@ -397,7 +387,6 @@ async fn worker_streamed( fn prepare_query_insert_futs( item: InsertItem, - ttls: &Ttls, data_store: &Arc, stats: &Arc, tsnow_u64: u64, @@ -414,7 +403,7 @@ fn prepare_query_insert_futs( // TODO if true || item_ts_local & 0x3f00000 < 0x0600000 { - let fut = insert_item_fut(item, &ttls, &data_store, do_insert, stats); + let fut = insert_item_fut(item, &data_store, do_insert, stats); futs.push(fut); if msp_bump { stats.inserts_msp().inc(); @@ -422,7 +411,6 @@ fn prepare_query_insert_futs( series, ts_msp, item_ts_local, - ttls, data_store.scy.clone(), data_store.qu_insert_ts_msp.clone(), stats.clone(), @@ -453,7 +441,6 @@ fn prepare_query_insert_futs( fn prepare_timebin_insert_futs( item: TimeBinSimpleF32, - ttls: &Ttls, data_store: &Arc, stats: &Arc, tsnow_u64: u64, @@ -468,7 +455,6 @@ fn prepare_timebin_insert_futs( item.min, item.max, item.avg, - ttls.binned.as_secs() as i32, ); // TODO would be better to count inserts only on completed insert stats.inserted_binned().inc(); @@ -497,19 +483,11 @@ fn prepare_timebin_insert_futs( fn prepare_accounting_insert_futs( item: Accounting, - ttls: &Ttls, data_store: &Arc, stats: &Arc, tsnow_u64: u64, ) -> SmallVec<[InsertFut; 4]> { - let params = ( - item.part, - item.ts, - item.series.id() as i64, - item.count, - item.bytes, - ttls.binned.as_secs() as i32, - ); + let params = (item.part, item.ts, item.series.id() as i64, item.count, item.bytes); let fut = InsertFut::new( data_store.scy.clone(), data_store.qu_account_00.clone(), diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 45c87ec..2d02ae4 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -370,7 +370,6 @@ struct InsParCom { ts_lsp: u64, ts_local: u64, pulse: u64, - ttl: u32, do_insert: bool, stats: Arc, } @@ -385,7 +384,6 @@ where par.ts_lsp as i64, par.pulse as i64, val, - par.ttl as i32, ); InsertFut::new(scy, qu, params, par.ts_local, par.stats) } @@ -400,7 +398,6 @@ where par.ts_lsp as i64, par.pulse as i64, val, - par.ttl as i32, ); InsertFut::new(scy, qu, params, par.ts_local, par.stats) } @@ -470,7 +467,6 @@ where par.ts_lsp as i64, par.pulse as i64, val, - par.ttl as i32, ); if par.do_insert { let y = data_store.scy.execute(qu, params).await; @@ -507,7 +503,6 @@ where par.ts_lsp as i64, par.pulse as i64, val, - par.ttl as i32, ); let y = data_store.scy.execute(qu, params).await; match y { @@ -563,7 +558,6 @@ pub async fn insert_item( ts_lsp: item.ts_lsp, ts_local: item.ts_local, pulse: item.pulse, - ttl: ttls.d0.as_secs() as _, do_insert, stats: stats.clone(), }; @@ -586,7 +580,6 @@ pub async fn insert_item( ts_lsp: item.ts_lsp, ts_local: item.ts_local, pulse: item.pulse, - ttl: ttls.d1.as_secs() as _, do_insert, stats: stats.clone(), }; @@ -610,18 +603,16 @@ pub fn insert_msp_fut( ts_msp: u64, // for stats, the timestamp when we received that data tsnet: u64, - ttls: &Ttls, scy: Arc, qu: Arc, stats: Arc, ) -> InsertFut { - let params = (series.id() as i64, ts_msp as i64, ttls.index.as_secs() as i32); + let params = (series.id() as i64, ts_msp as i64); InsertFut::new(scy, qu, params, tsnet, stats) } pub fn insert_item_fut( item: InsertItem, - ttls: &Ttls, data_store: &DataStore, do_insert: bool, stats: &Arc, @@ -636,7 +627,6 @@ pub fn insert_item_fut( ts_lsp: item.ts_lsp, ts_local: item.ts_local, pulse: item.pulse, - ttl: ttls.d0.as_secs() as _, do_insert, stats: stats.clone(), }; @@ -659,7 +649,6 @@ pub fn insert_item_fut( ts_lsp: item.ts_lsp, ts_local: item.ts_local, pulse: item.pulse, - ttl: ttls.d1.as_secs() as _, do_insert, stats: stats.clone(), }; @@ -678,7 +667,6 @@ pub fn insert_item_fut( pub fn insert_connection_status_fut( item: ConnectionStatusItem, - ttls: &Ttls, data_store: &DataStore, stats: Arc, ) -> InsertFut { @@ -692,13 +680,7 @@ pub fn insert_connection_status_fut( let tsnet = ts; let kind = item.status.to_kind(); let addr = format!("{}", item.addr); - let params = ( - ts_msp as i64, - ts_lsp as i64, - kind as i32, - addr, - ttls.index.as_secs() as i32, - ); + let params = (ts_msp as i64, ts_lsp as i64, kind as i32, addr); InsertFut::new( data_store.scy.clone(), data_store.qu_insert_connection_status.clone(), @@ -710,7 +692,6 @@ pub fn insert_connection_status_fut( pub fn insert_channel_status_fut( item: ChannelStatusItem, - ttls: &Ttls, data_store: &DataStore, stats: Arc, ) -> SmallVec<[InsertFut; 4]> { @@ -723,13 +704,7 @@ pub fn insert_channel_status_fut( let tsnet = ts; let kind = item.status.to_kind(); let cssid = item.cssid.id(); - let params = ( - cssid as i64, - ts_msp as i64, - ts_lsp as i64, - kind as i32, - ttls.index.as_secs() as i32, - ); + let params = (cssid as i64, ts_msp as i64, ts_lsp as i64, kind as i32); let fut1 = InsertFut::new( data_store.scy.clone(), data_store.qu_insert_channel_status.clone(), @@ -737,13 +712,7 @@ pub fn insert_channel_status_fut( tsnet, stats.clone(), ); - let params = ( - ts_msp as i64, - ts_lsp as i64, - cssid as i64, - kind as i32, - ttls.index.as_secs() as i32, - ); + let params = (ts_msp as i64, ts_lsp as i64, cssid as i64, kind as i32); let fut2 = InsertFut::new( data_store.scy.clone(), data_store.qu_insert_channel_status_by_ts_msp.clone(), diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index 8314f93..85868b8 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -1,10 +1,11 @@ +use crate::config::ScyllaIngestConfig; use crate::session::create_session_no_ks; use crate::session::ScySession; use err::thiserror; use err::ThisError; use futures_util::StreamExt; use log::*; -use netpod::ScyllaConfig; +use netpod::ttl::RetentionTime; use scylla::transport::errors::DbError; use scylla::transport::errors::QueryError; use std::fmt; @@ -74,19 +75,22 @@ pub async fn check_table_readable(name: &str, scy: &ScySession) -> Result Result<(), Error> { +pub async fn create_table_ts_msp(table_name: &str, scy: &ScySession) -> Result<(), Error> { use std::fmt::Write; // seconds: let default_time_to_live = 60 * 60 * 5; // hours: let twcs_window_index = 24 * 4; let mut s = String::new(); - s.write_str("create table ts_msp (series bigint, ts_msp bigint, primary key (series, ts_msp))")?; + s.write_str("create table ")?; + s.write_str(table_name)?; + s.write_str(" (series bigint, ts_msp bigint, primary key (series, ts_msp))")?; write!(s, " with default_time_to_live = {}", default_time_to_live)?; s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy'")?; s.write_str(", 'compaction_window_unit': 'HOURS'")?; write!(s, ", 'compaction_window_size': {}", twcs_window_index)?; s.write_str(" }")?; + eprintln!("create table cql {s}"); scy.query(s, ()).await?; Ok(()) } @@ -114,7 +118,8 @@ impl GenTwcsTab { // cql: "(part int, ts_msp int, shape_kind int, scalar_type int, series bigint, primary key ((part, ts_msp, shape_kind, scalar_type), series))".into(), // default_time_to_live: 60 * 60 * 5, // compaction_window_size: 24 * 4, - pub fn new<'a, N, CI, A, B, I2, I2A, I3, I3A>( + pub fn new<'a, PRE, N, CI, A, B, I2, I2A, I3, I3A>( + pre: PRE, name: N, cols: CI, partition_keys: I2, @@ -123,7 +128,8 @@ impl GenTwcsTab { compaction_window_size: Duration, ) -> Self where - N: Into, + PRE: AsRef, + N: AsRef, CI: IntoIterator, // TODO could make for idiomatic to skip extra clone if passed value is already String A: AsRef + 'a, @@ -132,6 +138,35 @@ impl GenTwcsTab { I3: IntoIterator, I2A: Into, I3A: Into, + { + Self::new_inner( + pre.as_ref(), + name.as_ref(), + cols, + partition_keys, + cluster_keys, + default_time_to_live, + compaction_window_size, + ) + } + + fn new_inner<'a, CI, A, B, I2, I2A, I3, I3A>( + pre: &str, + name: &str, + cols: CI, + partition_keys: I2, + cluster_keys: I3, + default_time_to_live: Duration, + compaction_window_size: Duration, + ) -> Self + where + CI: IntoIterator, + A: AsRef + 'a, + B: AsRef + 'a, + I2: IntoIterator, + I3: IntoIterator, + I2A: Into, + I3A: Into, { let mut col_names = Vec::new(); let mut col_types = Vec::new(); @@ -140,7 +175,7 @@ impl GenTwcsTab { col_types.push(b.as_ref().into()); }); Self { - name: name.into(), + name: format!("{}{}", pre, name), col_names, col_types, partition_keys: partition_keys.into_iter().map(Into::into).collect(), @@ -200,8 +235,8 @@ impl GenTwcsTab { // TODO check for more details (all columns, correct types, correct kinds, etc) if !has_table(self.name(), scy).await? { let cql = self.cql(); - info!("CREATE CQL: {cql}"); - scy.query(self.cql(), ()).await?; + info!("scylla create table {} {}", self.name(), cql); + scy.query(cql, ()).await?; } Ok(()) } @@ -235,6 +270,7 @@ fn table_param_compaction_twcs(compaction_window_size: Duration) -> String { } struct EvTabDim0 { + pre: String, sty: String, cqlsty: String, // SCYLLA_TTL_EVENTS_DIM0 @@ -245,7 +281,7 @@ struct EvTabDim0 { impl EvTabDim0 { fn name(&self) -> String { - format!("events_scalar_{}", self.sty) + format!("{}events_scalar_{}", self.pre, self.sty) } fn cql_create(&self) -> String { @@ -262,6 +298,7 @@ impl EvTabDim0 { } struct EvTabDim1 { + pre: String, sty: String, cqlsty: String, // SCYLLA_TTL_EVENTS_DIM1 @@ -272,7 +309,7 @@ struct EvTabDim1 { impl EvTabDim1 { fn name(&self) -> String { - format!("events_array_{}", self.sty) + format!("{}events_array_{}", self.pre, self.sty) } fn cql_create(&self) -> String { @@ -306,7 +343,7 @@ async fn get_columns(keyspace: &str, table: &str, scy: &ScySession) -> Result Result<(), Error> { +async fn check_event_tables(rett: RetentionTime, scy: &ScySession) -> Result<(), Error> { let stys = [ "u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64", "bool", "string", ]; @@ -316,6 +353,7 @@ async fn check_event_tables(scy: &ScySession) -> Result<(), Error> { ]; for (sty, cqlsty) in stys.into_iter().zip(cqlstys) { let desc = EvTabDim0 { + pre: rett.table_prefix().into(), sty: sty.into(), cqlsty: cqlsty.into(), // ttl is set in actual data inserts @@ -323,9 +361,11 @@ async fn check_event_tables(scy: &ScySession) -> Result<(), Error> { compaction_window_size: dhours(48), }; if !has_table(&desc.name(), scy).await? { + info!("scylla create table {}", desc.name()); scy.query(desc.cql_create(), ()).await?; } let desc = EvTabDim1 { + pre: rett.table_prefix().into(), sty: sty.into(), cqlsty: format!("frozen>", cqlsty), // ttl is set in actual data inserts @@ -333,18 +373,19 @@ async fn check_event_tables(scy: &ScySession) -> Result<(), Error> { compaction_window_size: dhours(12), }; if !has_table(&desc.name(), scy).await? { + info!("scylla create table {}", desc.name()); scy.query(desc.cql_create(), ()).await?; } } Ok(()) } -pub async fn migrate_scylla_data_schema(scyconf: &ScyllaConfig) -> Result<(), Error> { +pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: RetentionTime) -> Result<(), Error> { let scy2 = create_session_no_ks(scyconf).await?; let scy = &scy2; - if !has_keyspace(&scyconf.keyspace, scy).await? { - let replication = 2; + if !has_keyspace(scyconf.keyspace(), scy).await? { + let replication = 3; let durable = false; let cql = format!( concat!( @@ -352,26 +393,28 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaConfig) -> Result<(), Er " with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }}", " and durable_writes = {};" ), - scyconf.keyspace, replication, durable + scyconf.keyspace(), + replication, + durable ); + info!("scylla create keyspace {cql}"); scy.query_iter(cql, ()).await?; info!("keyspace created"); } - scy.use_keyspace(&scyconf.keyspace, true).await?; + scy.use_keyspace(scyconf.keyspace(), true).await?; - if !has_table("ts_msp", &scy).await? { - create_table_ts_msp(scy).await?; + { + let table_name = format!("{}ts_msp", rett.table_prefix()); + if !has_table(&table_name, &scy).await? { + create_table_ts_msp(&table_name, scy).await?; + } } + check_event_tables(rett.clone(), scy).await?; - check_event_tables(scy).await?; - - if false { - info!("early abort schema"); - return Ok(()); - } { let tab = GenTwcsTab::new( + rett.table_prefix(), "series_by_ts_msp", &[ ("part", "int"), @@ -389,6 +432,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaConfig) -> Result<(), Er } { let tab = GenTwcsTab::new( + rett.table_prefix(), "connection_status", &[ ("ts_msp", "bigint"), @@ -405,6 +449,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaConfig) -> Result<(), Er } { let tab = GenTwcsTab::new( + rett.table_prefix(), "channel_status", &[ ("series", "bigint"), @@ -421,6 +466,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaConfig) -> Result<(), Er } { let tab = GenTwcsTab::new( + rett.table_prefix(), "channel_status_by_ts_msp", &[ ("ts_msp", "bigint"), @@ -437,58 +483,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaConfig) -> Result<(), Er } { let tab = GenTwcsTab::new( - "channel_ping", - &[ - ("part", "int"), - ("ts_msp", "int"), - ("series", "bigint"), - ("ivl", "float"), - ("interest", "float"), - ("evsize", "int"), - ], - ["part", "ts_msp"], - ["series"], - dhours(1), - ddays(4), - ); - tab.create_if_missing(scy).await?; - } - { - let tab = GenTwcsTab::new( - "muted", - &[ - ("part", "int"), - ("series", "bigint"), - ("ts", "bigint"), - ("ema", "float"), - ("emd", "float"), - ], - ["part"], - ["series", "ts"], - dhours(4), - ddays(1), - ); - tab.create_if_missing(scy).await?; - } - { - let tab = GenTwcsTab::new( - "item_recv_ivl", - &[ - ("part", "int"), - ("series", "bigint"), - ("ts", "bigint"), - ("ema", "float"), - ("emd", "float"), - ], - ["part"], - ["series", "ts"], - dhours(4), - ddays(1), - ); - tab.create_if_missing(scy).await?; - } - { - let tab = GenTwcsTab::new( + rett.table_prefix(), "binned_scalar_f32", &[ ("series", "bigint"), @@ -509,6 +504,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaConfig) -> Result<(), Er } { let tab = GenTwcsTab::new( + rett.table_prefix(), "account_00", &[ ("part", "int"), diff --git a/scywr/src/session.rs b/scywr/src/session.rs index f60e28e..cefb966 100644 --- a/scywr/src/session.rs +++ b/scywr/src/session.rs @@ -1,7 +1,7 @@ -pub use netpod::ScyllaConfig; pub use scylla::Session; pub use Session as ScySession; +use crate::config::ScyllaIngestConfig; use err::thiserror; use err::ThisError; use scylla::execution_profile::ExecutionProfileBuilder; @@ -21,7 +21,7 @@ impl From for Error { } } -pub async fn create_session_no_ks(scyconf: &ScyllaConfig) -> Result, Error> { +pub async fn create_session_no_ks(scyconf: &ScyllaIngestConfig) -> Result, Error> { let profile = ExecutionProfileBuilder::default() .consistency(Consistency::LocalOne) .build() @@ -30,7 +30,7 @@ pub async fn create_session_no_ks(scyconf: &ScyllaConfig) -> Result .pool_size(scylla::transport::session::PoolSize::PerShard( NonZeroUsize::new(1).unwrap(), )) - .known_nodes(&scyconf.hosts) + .known_nodes(scyconf.hosts()) .default_execution_profile_handle(profile) .write_coalescing(true) .build() @@ -39,9 +39,9 @@ pub async fn create_session_no_ks(scyconf: &ScyllaConfig) -> Result Ok(scy) } -pub async fn create_session(scyconf: &ScyllaConfig) -> Result, Error> { +pub async fn create_session(scyconf: &ScyllaIngestConfig) -> Result, Error> { let scy = create_session_no_ks(scyconf).await?; - scy.use_keyspace(&scyconf.keyspace, true) + scy.use_keyspace(scyconf.keyspace(), true) .await .map_err(|e| Error::NewSession(e.to_string()))?; Ok(scy) diff --git a/scywr/src/store.rs b/scywr/src/store.rs index ce7f75b..df2e53f 100644 --- a/scywr/src/store.rs +++ b/scywr/src/store.rs @@ -1,7 +1,7 @@ +use crate::config::ScyllaIngestConfig; use crate::session::create_session; use err::thiserror; use err::ThisError; -use netpod::ScyllaConfig; use scylla::prepared_statement::PreparedStatement; use scylla::transport::errors::NewSessionError; use scylla::transport::errors::QueryError; @@ -34,139 +34,103 @@ pub struct DataStore { pub qu_insert_array_f32: Arc, pub qu_insert_array_f64: Arc, pub qu_insert_array_bool: Arc, - pub qu_insert_muted: Arc, - pub qu_insert_item_recv_ivl: Arc, pub qu_insert_connection_status: Arc, pub qu_insert_channel_status: Arc, pub qu_insert_channel_status_by_ts_msp: Arc, - pub qu_insert_channel_ping: Arc, pub qu_insert_binned_scalar_f32_v02: Arc, pub qu_account_00: Arc, } impl DataStore { - pub async fn new(scyconf: &ScyllaConfig) -> Result { + pub async fn new(scyconf: &ScyllaIngestConfig) -> Result { let scy = create_session(scyconf).await.map_err(|_| Error::NewSession)?; - let q = scy - .prepare("insert into ts_msp (series, ts_msp) values (?, ?) using ttl ?") - .await?; + let q = scy.prepare("insert into ts_msp (series, ts_msp) values (?, ?)").await?; let qu_insert_ts_msp = Arc::new(q); - let cql = "insert into series_by_ts_msp (part, ts_msp, shape_kind, scalar_type, series) values (?, ?, ?, ?, ?) using ttl ?"; + let cql = "insert into series_by_ts_msp (part, ts_msp, shape_kind, scalar_type, series) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_series_by_ts_msp = Arc::new(q); // scalar: - let cql = - "insert into events_scalar_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let cql = "insert into events_scalar_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_scalar_i8 = Arc::new(q); - let cql = - "insert into events_scalar_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let cql = "insert into events_scalar_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_scalar_i16 = Arc::new(q); - let cql = - "insert into events_scalar_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let cql = "insert into events_scalar_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_scalar_i32 = Arc::new(q); - let cql = - "insert into events_scalar_i64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let cql = "insert into events_scalar_i64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_scalar_i64 = Arc::new(q); - let cql = - "insert into events_scalar_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let cql = "insert into events_scalar_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_scalar_f32 = Arc::new(q); - let cql = - "insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let cql = "insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_scalar_f64 = Arc::new(q); - let cql = - "insert into events_scalar_bool (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let cql = "insert into events_scalar_bool (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_scalar_bool = Arc::new(q); - let cql="insert into events_scalar_string (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let cql = "insert into events_scalar_string (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_scalar_string = Arc::new(q); // array - let cql = - "insert into events_array_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let cql = "insert into events_array_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_array_i8 = Arc::new(q); - let cql = - "insert into events_array_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let cql = "insert into events_array_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_array_i16 = Arc::new(q); - let cql = - "insert into events_array_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let cql = "insert into events_array_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_array_i32 = Arc::new(q); - let cql = - "insert into events_array_i64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let cql = "insert into events_array_i64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_array_i64 = Arc::new(q); - let cql = - "insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let cql = "insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_array_f32 = Arc::new(q); - let cql = - "insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let cql = "insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_array_f64 = Arc::new(q); - let cql = - "insert into events_array_bool (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?) using ttl ?"; + let cql = "insert into events_array_bool (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_array_bool = Arc::new(q); - // Others: - let cql = "insert into muted (part, series, ts, ema, emd) values (?, ?, ?, ?, ?) using ttl ?"; - let q = scy.prepare(cql).await?; - let qu_insert_muted = Arc::new(q); - - let cql = "insert into item_recv_ivl (part, series, ts, ema, emd) values (?, ?, ?, ?, ?) using ttl ?"; - let q = scy.prepare(cql).await?; - let qu_insert_item_recv_ivl = Arc::new(q); - // Connection status: - let cql = "insert into connection_status (ts_msp, ts_lsp, kind, addr) values (?, ?, ?, ?) using ttl ?"; + let cql = "insert into connection_status (ts_msp, ts_lsp, kind, addr) values (?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_connection_status = Arc::new(q); - let cql = "insert into channel_status (series, ts_msp, ts_lsp, kind) values (?, ?, ?, ?) using ttl ?"; + let cql = "insert into channel_status (series, ts_msp, ts_lsp, kind) values (?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_channel_status = Arc::new(q); - let cql = "insert into channel_status_by_ts_msp (ts_msp, ts_lsp, series, kind) values (?, ?, ?, ?) using ttl ?"; + let cql = "insert into channel_status_by_ts_msp (ts_msp, ts_lsp, series, kind) values (?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_channel_status_by_ts_msp = Arc::new(q); - let cql = concat!( - "insert into channel_ping (", - "part, ts_msp, series, ivl, interest, evsize", - ") values (?, ?, ?, ?, ?, ?) using ttl ?" - ); - let q = scy.prepare(cql).await?; - let qu_insert_channel_ping = Arc::new(q); - let cql = concat!( "insert into binned_scalar_f32 (", "series, bin_len_ms, ts_msp, off, count, min, max, avg)", - " values (?, ?, ?, ?, ?, ?, ?, ?) using ttl ?" + " values (?, ?, ?, ?, ?, ?, ?, ?)" ); let q = scy.prepare(cql).await?; let qu_insert_binned_scalar_f32_v02 = Arc::new(q); @@ -174,7 +138,7 @@ impl DataStore { let cql = concat!( "insert into account_00", " (part, ts, series, count, bytes)", - " values (?, ?, ?, ?, ?) using ttl ?" + " values (?, ?, ?, ?, ?)" ); let q = scy.prepare(cql).await?; let qu_account_00 = Arc::new(q); @@ -198,12 +162,9 @@ impl DataStore { qu_insert_array_f32, qu_insert_array_f64, qu_insert_array_bool, - qu_insert_muted, - qu_insert_item_recv_ivl, qu_insert_connection_status, qu_insert_channel_status, qu_insert_channel_status_by_ts_msp, - qu_insert_channel_ping, qu_insert_binned_scalar_f32_v02, qu_account_00, }; diff --git a/scywr/src/tools.rs b/scywr/src/tools.rs index 7064dc4..9a4eaeb 100644 --- a/scywr/src/tools.rs +++ b/scywr/src/tools.rs @@ -1,6 +1,6 @@ +use crate::config::ScyllaIngestConfig; use crate::session::create_session; use log::*; -use netpod::ScyllaConfig; use scylla::transport::errors::NewSessionError; use scylla::transport::errors::QueryError; @@ -24,7 +24,7 @@ impl From for Error { } } -pub async fn list_pkey(scylla_conf: &ScyllaConfig) -> Result<(), Error> { +pub async fn list_pkey(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> { let scy = create_session(scylla_conf) .await .map_err(|e| Error(err::Error::with_msg_no_trace(e.to_string())))?; @@ -62,7 +62,7 @@ pub async fn list_pkey(scylla_conf: &ScyllaConfig) -> Result<(), Error> { Ok(()) } -pub async fn list_pulses(scylla_conf: &ScyllaConfig) -> Result<(), Error> { +pub async fn list_pulses(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> { let scy = create_session(scylla_conf) .await .map_err(|e| Error(err::Error::with_msg_no_trace(e.to_string())))?; @@ -99,7 +99,7 @@ pub async fn list_pulses(scylla_conf: &ScyllaConfig) -> Result<(), Error> { Ok(()) } -pub async fn fetch_events(backend: &str, channel: &str, scylla_conf: &ScyllaConfig) -> Result<(), Error> { +pub async fn fetch_events(backend: &str, channel: &str, scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> { // TODO use the keyspace from commandline. err::todo(); let scy = create_session(scylla_conf) diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index c2e7c61..a9b4cb2 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -322,7 +322,7 @@ pub fn start_writer_establish_worker( #[test] fn write_00() { use netpod::Database; - use scywr::session::ScyllaConfig; + use scywr::config::ScyllaIngestConfig; use stats::SeriesByChannelStats; use std::sync::Arc; let fut = async { @@ -333,13 +333,10 @@ fn write_00() { user: "daqbuffer".into(), pass: "daqbuffer".into(), }; - let scyconf = &ScyllaConfig { - hosts: vec!["127.0.0.1:19042".into()], - keyspace: "daqingest_test_00".into(), - }; + let scyconf = &ScyllaIngestConfig::new(["127.0.0.1:19042"], "daqingest_test_00"); let (pgc, pg_jh) = dbpg::conn::make_pg_client(dbconf).await?; dbpg::schema::schema_check(&pgc).await?; - scywr::schema::migrate_scylla_data_schema(scyconf).await?; + scywr::schema::migrate_scylla_data_schema(scyconf, netpod::ttl::RetentionTime::Short).await?; let scy = scywr::session::create_session(scyconf).await?; let stats = SeriesByChannelStats::new(); let stats = Arc::new(stats);