From 9f04c7616ce21c684a968833f0d6f7f13451fdc9 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 15 Apr 2024 12:24:27 +0100 Subject: [PATCH] Refactoring --- daqingest/src/bin/daqingest.rs | 2 +- daqingest/src/daemon.rs | 111 ++++++++++----- ingest-00/channels/ch1.yml | 0 ingest-00/ingest.yml | 15 +++ mrucache/src/mucache.rs | 49 ++++--- netfetch/src/ca/conn.rs | 8 +- netfetch/src/conf.rs | 21 +-- netfetch/src/metrics.rs | 7 +- netfetch/src/metrics/postingest.rs | 43 +++--- scywr/src/config.rs | 13 +- scywr/src/insertworker.rs | 210 ++++++++++++++++++----------- scywr/src/schema.rs | 29 +++- serieswriter/src/writer.rs | 4 +- 13 files changed, 336 insertions(+), 176 deletions(-) create mode 100644 ingest-00/channels/ch1.yml create mode 100644 ingest-00/ingest.yml diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 768ef0f..fe687ed 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -57,7 +57,7 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { pass: k.pg_pass, name: k.pg_name, }; - let scyconf = ScyllaIngestConfig::new([k.scylla_host], k.scylla_keyspace); + let scyconf = ScyllaIngestConfig::new([k.scylla_host], k.scylla_keyspace, "DUMMY"); // scywr::schema::migrate_scylla_data_schema(&scyconf, netpod::ttl::RetentionTime::Short) // .await // .map_err(Error::from_string)?; diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 32c6102..e679bfb 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -3,6 +3,7 @@ pub mod inserthook; use async_channel::Receiver; use async_channel::Sender; use async_channel::WeakSender; +use dbpg::seriesbychannel::ChannelInfoQuery; use err::Error; use log::*; use netfetch::ca::connset::CaConnSet; @@ -75,6 +76,10 @@ pub struct Daemon { insert_workers_running: AtomicU64, query_item_tx_weak: WeakSender>, connset_health_lat_ema: f32, + metrics_shutdown_tx: Sender, + metrics_shutdown_rx: Receiver, + metrics_jh: Option>>, + 
channel_info_query_tx: Sender, } impl Daemon { @@ -109,7 +114,7 @@ impl Daemon { ingest_opts.backend().into(), local_epics_hostname, query_item_tx, - channel_info_query_tx, + channel_info_query_tx.clone(), ingest_opts.clone(), writer_establis_tx, ); @@ -170,6 +175,7 @@ impl Daemon { let rett = RetentionTime::Short; + #[cfg(DISABLED)] let insert_workers_jh = scywr::insertworker::spawn_scylla_insert_workers( rett, opts.scyconf.clone(), @@ -182,6 +188,14 @@ impl Daemon { ingest_opts.use_rate_limit_queue(), ) .await?; + let insert_workers_jh = scywr::insertworker::spawn_scylla_insert_workers_dummy( + ingest_opts.insert_worker_count(), + ingest_opts.insert_worker_concurrency(), + query_item_rx, + insert_worker_opts, + insert_worker_stats.clone(), + ) + .await?; let stats = Arc::new(DaemonStats::new()); stats.insert_worker_spawned().add(insert_workers_jh.len() as _); @@ -218,6 +232,8 @@ impl Daemon { //jh.await.map_err(|e| e.to_string()).map_err(Error::from)??; } + let (metrics_shutdown_tx, metrics_shutdown_rx) = async_channel::bounded(8); + let ret = Self { opts, ingest_opts, @@ -241,6 +257,10 @@ impl Daemon { insert_workers_running: AtomicU64::new(0), query_item_tx_weak, connset_health_lat_ema: 0., + metrics_shutdown_tx, + metrics_shutdown_rx, + metrics_jh: None, + channel_info_query_tx, }; Ok(ret) } @@ -499,7 +519,47 @@ impl Daemon { taskrun::spawn(ticker); } + pub async fn spawn_metrics(&mut self) -> Result<(), Error> { + let tx = self.tx.clone(); + let daemon_stats = self.stats().clone(); + let connset_cmd_tx = self.connset_ctrl.sender().clone(); + let ca_conn_stats = self.connset_ctrl.ca_conn_stats().clone(); + let dcom = Arc::new(netfetch::metrics::DaemonComm::new(tx.clone())); + let metrics_jh = { + let conn_set_stats = self.connset_ctrl.stats().clone(); + let stats_set = StatsSet::new( + daemon_stats, + conn_set_stats, + ca_conn_stats, + self.connset_ctrl.ca_proto_stats().clone(), + self.insert_worker_stats.clone(), + self.series_by_channel_stats.clone(), + 
self.connset_ctrl.ioc_finder_stats().clone(), + self.opts.insert_frac.clone(), + ); + let fut = netfetch::metrics::metrics_service( + self.ingest_opts.api_bind(), + dcom, + connset_cmd_tx, + stats_set, + self.metrics_shutdown_rx.clone(), + ); + tokio::task::spawn(fut) + }; + self.metrics_jh = Some(metrics_jh); + Ok(()) + } + pub async fn daemon(mut self) -> Result<(), Error> { + { + let backend = String::new(); + let (item_tx, item_rx) = async_channel::bounded(256); + let info_worker_tx = self.channel_info_query_tx.clone(); + let iiq_tx = self.query_item_tx_weak.upgrade().unwrap(); + let worker_fut = + netfetch::metrics::postingest::process_api_query_items(backend, item_rx, info_worker_tx, iiq_tx); + let worker_jh = taskrun::spawn(worker_fut); + } Self::spawn_ticker(self.tx.clone(), self.stats.clone()); loop { if self.shutting_down { @@ -537,7 +597,12 @@ impl Daemon { } } } - info!("daemon done"); + info!("Wait for metrics handler"); + self.metrics_shutdown_tx.send(1).await?; + if let Some(jh) = self.metrics_jh.take() { + jh.await??; + } + info!("Joined metrics handler"); Ok(()) } } @@ -572,10 +637,16 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> drop(pg); jh.await?.map_err(Error::from_string)?; - scywr::schema::migrate_scylla_data_schema(opts.scylla_config(), 1, true, RetentionTime::Short) + scywr::schema::migrate_scylla_data_schema(opts.scylla_config(), RetentionTime::Short) .await .map_err(Error::from_string)?; + if let Some(scyconf) = opts.scylla_config_lt() { + scywr::schema::migrate_scylla_data_schema(scyconf, RetentionTime::Long) + .await + .map_err(Error::from_string)?; + } + info!("database check done"); // TODO use a new stats type: @@ -600,39 +671,14 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> store_workers_rate, }; let daemon = Daemon::new(opts2, opts.clone()).await?; - let tx = daemon.tx.clone(); - let daemon_stats = daemon.stats().clone(); - let connset_cmd_tx = daemon.connset_ctrl.sender().clone(); - let 
ca_conn_stats = daemon.connset_ctrl.ca_conn_stats().clone(); - - let (metrics_shutdown_tx, metrics_shutdown_rx) = async_channel::bounded(8); - - let dcom = Arc::new(netfetch::metrics::DaemonComm::new(tx.clone())); - let metrics_jh = { - let conn_set_stats = daemon.connset_ctrl.stats().clone(); - let stats_set = StatsSet::new( - daemon_stats, - conn_set_stats, - ca_conn_stats, - daemon.connset_ctrl.ca_proto_stats().clone(), - daemon.insert_worker_stats.clone(), - daemon.series_by_channel_stats.clone(), - daemon.connset_ctrl.ioc_finder_stats().clone(), - insert_frac, - ); - let fut = - netfetch::metrics::metrics_service(opts.api_bind(), dcom, connset_cmd_tx, stats_set, metrics_shutdown_rx); - tokio::task::spawn(fut) - }; - + let daemon_tx = daemon.tx.clone(); let daemon_jh = taskrun::spawn(daemon.daemon()); - if let Some(channels_config) = channels_config { debug!("will configure {} channels", channels_config.len()); let mut thr_msg = ThrottleTrace::new(Duration::from_millis(1000)); let mut i = 0; for ch_cfg in channels_config.channels() { - match tx + match daemon_tx .send(DaemonEvent::ChannelAdd(ch_cfg.clone(), async_channel::bounded(1).0)) .await { @@ -648,9 +694,6 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> debug!("{} configured channels applied", channels_config.len()); } daemon_jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??; - info!("Daemon joined."); - metrics_shutdown_tx.send(1).await?; - metrics_jh.await.unwrap(); - info!("Metrics joined."); + info!("Joined daemon"); Ok(()) } diff --git a/ingest-00/channels/ch1.yml b/ingest-00/channels/ch1.yml new file mode 100644 index 0000000..e69de29 diff --git a/ingest-00/ingest.yml b/ingest-00/ingest.yml new file mode 100644 index 0000000..1bdaeca --- /dev/null +++ b/ingest-00/ingest.yml @@ -0,0 +1,15 @@ +api_bind: 0.0.0.0:2380 +backend: test +channels: channels +search: + - 127.0.0.1 +postgresql: + host: 127.0.0.1 + port: 5432 + user: daqbuffer + pass: daqbuffer + name: 
daqbuffer +scylla: + hosts: + - 127.0.0.1:19042 + keyspace: ks_dummy_st diff --git a/mrucache/src/mucache.rs b/mrucache/src/mucache.rs index 02ca28c..eaaaebb 100644 --- a/mrucache/src/mucache.rs +++ b/mrucache/src/mucache.rs @@ -1,12 +1,13 @@ use hashbrown::HashMap; use log::*; use std::hash::Hash; -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering; use std::sync::OnceLock; -use std::sync::RwLock; use std::time::Instant; +// use std::sync::atomic::AtomicU64; +// use std::sync::atomic::Ordering; +// use std::sync::RwLock; + fn tsref() -> Instant { static C: OnceLock = OnceLock::new(); let c = C.get_or_init(Instant::now); @@ -23,23 +24,25 @@ fn dts_now() -> u64 { pub struct MuCache { cap: usize, - map: RwLock>, + map: HashMap, } -impl MuCache { +impl MuCache { pub fn new(cap: usize) -> Self { Self { cap, - map: RwLock::new(HashMap::with_capacity(cap)), + map: HashMap::with_capacity(cap), } } - pub fn insert(&self, k: K, v: V) -> Result<(), ()> { - let ts = AtomicU64::new(dts_now()); - let mut map = self.map.write().unwrap(); + pub fn insert(&mut self, k: K, v: V) -> Result<(), ()> { + let ts = dts_now(); + // let ts = AtomicU64::new(ts); + // let mut map = self.map.write().unwrap(); + let map = &mut self.map; let nmax = self.cap * 5 / 4; if map.len() >= nmax { - Self::remove_unused(&mut map, self.cap); + Self::remove_unused(map, self.cap); } if map.len() >= nmax { warn!("no space in MuCache"); @@ -50,30 +53,34 @@ impl MuCache { } } - pub fn get(&self, k: &K) -> Option { - let map = self.map.read().unwrap(); - match map.get(k) { + pub fn get(&mut self, k: &K) -> Option<&V> { + // let map = self.map.read().unwrap(); + let map = &mut self.map; + match map.get_mut(k) { Some((lu, v)) => { - lu.store(dts_now(), Ordering::Release); - Some(v.clone()) + let ts = dts_now(); + // lu.store(ts, Ordering::Release); + *lu = ts; + Some(v) } None => None, } } - fn remove_unused(map: &mut HashMap, cap: usize) { + fn remove_unused(map: &mut HashMap, cap: usize) { let 
map1 = core::mem::replace(map, HashMap::new()); - let mut items: Vec<_> = map1 - .into_iter() - .map(|x| (x.1 .0.load(Ordering::Acquire), x.1 .1, x.0)) - .collect(); + let mut items: Vec<_> = map1.into_iter().map(|x| (x.1 .0, x.1 .1, x.0)).collect(); items.sort_unstable_by_key(|x| x.0); let ts_cut = items[items.len() - cap].0; let map2 = items .into_iter() .filter(|x| x.0 > ts_cut) - .map(|x| (x.2, (AtomicU64::new(x.0), x.1))) + .map(|x| (x.2, (x.0, x.1))) .collect(); *map = map2; } + + pub fn all_ref_mut(&mut self) -> Vec<&mut V> { + self.map.iter_mut().map(|x| (&mut x.1 .1)).collect() + } } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 49327f3..89cf10b 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -74,7 +74,7 @@ const DO_RATE_CHECK: bool = false; #[allow(unused)] macro_rules! trace2 { ($($arg:tt)*) => { - if false { + if true { trace!($($arg)*); } }; @@ -83,7 +83,7 @@ macro_rules! trace2 { #[allow(unused)] macro_rules! trace3 { ($($arg:tt)*) => { - if false { + if true { trace!($($arg)*); } }; @@ -92,7 +92,7 @@ macro_rules! trace3 { #[allow(unused)] macro_rules! 
trace4 { ($($arg:tt)*) => { - if false { + if true { trace!($($arg)*); } }; @@ -2277,6 +2277,7 @@ impl CaConn { FS: Fn(&Q), { use Poll::*; + trace3!("attempt_flush_queue id {} len {}", id, qu.len()); let mut have_progress = false; let mut i = 0; loop { @@ -2298,6 +2299,7 @@ impl CaConn { if sp.is_sending() { match sp.poll_unpin(cx) { Ready(Ok(())) => { + trace3!("attempt_flush_queue id {} send done", id); have_progress = true; } Ready(Err(e)) => { diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 271d4f5..c9e9f91 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -174,19 +174,19 @@ pub async fn parse_config(config: PathBuf) -> Result<(CaIngestOpts, Option Result { let buf = tokio::fs::read(e.path()).await?; let conf: BTreeMap = serde_yaml::from_slice(&buf).map_err(Error::from_string)?; + info!("parsed {} channels from {}", conf.len(), fns); ret.push_from_parsed(&conf); } else { debug!("ignore channel config file {:?}", e.path()); diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 82b4aaa..d6578a4 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -395,8 +395,8 @@ pub async fn metrics_service( connset_cmd_tx: Sender, stats_set: StatsSet, shutdown_signal: Receiver, -) { - let addr = bind_to.parse().unwrap(); +) -> Result<(), Error> { + let addr = bind_to.parse().map_err(Error::from_string)?; let router = make_routes(dcom, connset_cmd_tx, stats_set).into_make_service(); axum::Server::bind(&addr) .serve(router) @@ -404,7 +404,8 @@ pub async fn metrics_service( let _ = shutdown_signal.recv().await; }) .await - .unwrap() + .map_err(Error::from_string)?; + Ok(()) } pub async fn metrics_agg_task(local_stats: Arc, store_stats: Arc) -> Result<(), Error> { diff --git a/netfetch/src/metrics/postingest.rs b/netfetch/src/metrics/postingest.rs index 47a2aa8..90d18f6 100644 --- a/netfetch/src/metrics/postingest.rs +++ b/netfetch/src/metrics/postingest.rs @@ -3,6 +3,7 @@ use async_channel::Sender; use 
dbpg::seriesbychannel::ChannelInfoQuery; use err::thiserror; use err::ThisError; +use mrucache::mucache::MuCache; use netpod::ScalarType; use netpod::Shape; use netpod::TsNano; @@ -22,8 +23,8 @@ pub enum Error { SendError, } -impl From> for Error { - fn from(value: async_channel::SendError) -> Self { +impl From>> for Error { + fn from(value: async_channel::SendError>) -> Self { Error::SendError } } @@ -35,12 +36,18 @@ pub struct EventValueItem { val: DataValue, } -async fn process_api_query_items( +struct SeriesWriterIngredients { + writer: SeriesWriter, +} + +pub async fn process_api_query_items( backend: String, item_rx: Receiver, info_worker_tx: Sender, - iiq_tx: Sender, + iiq_tx: Sender>, ) -> Result<(), Error> { + // TODO so far arbitrary upper limit on the number of ad-hoc channels: + let mut mucache: MuCache = MuCache::new(2000); let mut item_qu = VecDeque::new(); let mut sw_tick_last = Instant::now(); @@ -49,7 +56,7 @@ async fn process_api_query_items( let tsnow = Instant::now(); if tsnow.saturating_duration_since(sw_tick_last) >= Duration::from_millis(5000) { sw_tick_last = tsnow; - tick_writers(&mut item_qu)?; + tick_writers(mucache.all_ref_mut(), &mut item_qu)?; } let item = match item { Ok(Ok(item)) => item, @@ -77,21 +84,23 @@ async fn process_api_query_items( let sw = &mut sw; sw.write(item.ts, item.ts, item.val, &mut item_qu)?; - - for e in item_qu.drain(..).into_iter() { - iiq_tx.send(e).await?; - } + let item = core::mem::replace(&mut item_qu, VecDeque::new()); + iiq_tx.send(item).await?; } - // let scalar_type = ScalarType::F32; - // let shape = Shape::Scalar; - - // TODO SeriesWriter need to get ticked. 
- + finish_writers(mucache.all_ref_mut(), &mut item_qu)?; Ok(()) } -fn tick_writers(iiq: &mut VecDeque) -> Result<(), Error> { - let sw: &mut SeriesWriter = err::todoval(); - sw.tick(iiq)?; +fn tick_writers(sws: Vec<&mut SeriesWriter>, iiq: &mut VecDeque) -> Result<(), Error> { + for sw in sws { + sw.tick(iiq)?; + } + Ok(()) +} + +fn finish_writers(sws: Vec<&mut SeriesWriter>, iiq: &mut VecDeque) -> Result<(), Error> { + for sw in sws { + sw.tick(iiq)?; + } Ok(()) } diff --git a/scywr/src/config.rs b/scywr/src/config.rs index 13dfe4f..522daf7 100644 --- a/scywr/src/config.rs +++ b/scywr/src/config.rs @@ -4,18 +4,21 @@ use serde::Deserialize; pub struct ScyllaIngestConfig { hosts: Vec, keyspace: String, + keyspace_rf1: Option, } impl ScyllaIngestConfig { - pub fn new(hosts: I, ks: K) -> Self + pub fn new(hosts: I, ks_rf3: K1, ks_rf1: K2) -> Self where I: IntoIterator, H: Into, - K: Into, + K1: Into, + K2: Into, { Self { hosts: hosts.into_iter().map(Into::into).collect(), - keyspace: ks.into(), + keyspace: ks_rf3.into(), + keyspace_rf1: Some(ks_rf1.into()), } } @@ -26,4 +29,8 @@ impl ScyllaIngestConfig { pub fn keyspace(&self) -> &String { &self.keyspace } + + pub fn keyspace_rf1(&self) -> Option<&String> { + self.keyspace_rf1.as_ref() + } } diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 15240fc..eb6b004 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -16,6 +16,9 @@ use async_channel::Receiver; use atomic::AtomicU64; use atomic::Ordering; use err::Error; +use futures_util::Future; +use futures_util::Stream; +use futures_util::StreamExt; use log::*; use netpod::ttl::RetentionTime; use netpod::TsMs; @@ -125,6 +128,29 @@ pub async fn spawn_scylla_insert_workers( data_store, store_stats.clone(), )); + let jh = tokio::spawn(worker_streamed( + worker_ix, + insert_worker_concurrency, + item_inp.clone(), + insert_worker_opts.clone(), + Some(data_store), + store_stats.clone(), + )); + jhs.push(jh); + } + Ok(jhs) +} + +pub 
async fn spawn_scylla_insert_workers_dummy( + insert_worker_count: usize, + insert_worker_concurrency: usize, + item_inp: Receiver>, + insert_worker_opts: Arc, + store_stats: Arc, +) -> Result>>, Error> { + let mut jhs = Vec::new(); + for worker_ix in 0..insert_worker_count { + let data_store = None; let jh = tokio::spawn(worker_streamed( worker_ix, insert_worker_concurrency, @@ -223,92 +249,54 @@ async fn worker_streamed( concurrency: usize, item_inp: Receiver>, insert_worker_opts: Arc, - data_store: Arc, + data_store: Option>, stats: Arc, ) -> Result<(), Error> { - use futures_util::StreamExt; + trace!("worker_streamed begin"); stats.worker_start().inc(); insert_worker_opts .insert_workers_running .fetch_add(1, atomic::Ordering::AcqRel); - // TODO possible without box? - let item_inp = Box::pin(item_inp); - let mut stream = item_inp - .map(|batch| { - stats.item_recv.inc(); - let tsnow = TsMs::from_system_time(SystemTime::now()); - let mut res = Vec::with_capacity(32); - for item in batch { - if false { - match &item { - QueryItem::ConnectionStatus(_) => { - debug!("execute ConnectionStatus"); - } - QueryItem::ChannelStatus(_) => { - debug!("execute ChannelStatus"); - } - QueryItem::Insert(item) => { - debug!( - "execute Insert {:?} {:?} {:?}", - item.series, - item.ts_msp, - item.val.shape() - ); - } - QueryItem::TimeBinSimpleF32(_) => { - debug!("execute TimeBinSimpleF32"); - } - QueryItem::Accounting(_) => { - debug!("execute Accounting"); - } - } + let stream = item_inp; + let stream = inspect_items(stream); + if let Some(data_store) = data_store { + let stream = transform_to_db_futures(stream, data_store, stats.clone()); + let stream = stream + .map(|x| futures_util::stream::iter(x)) + .flatten_unordered(Some(1)) + // .map(|x| async move { + // drop(x); + // Ok(()) + // }) + .buffer_unordered(concurrency); + let mut stream = Box::pin(stream); + while let Some(item) = stream.next().await { + match item { + Ok(_) => { + stats.inserted_values().inc(); + // TODO 
compute the insert latency bin and count. } - let futs = match item { - QueryItem::Insert(item) => prepare_query_insert_futs(item, &data_store, &stats, tsnow), - QueryItem::ConnectionStatus(item) => { - stats.inserted_connection_status().inc(); - let fut = insert_connection_status_fut(item, &data_store, stats.clone()); - smallvec![fut] - } - QueryItem::ChannelStatus(item) => { - stats.inserted_channel_status().inc(); - insert_channel_status_fut(item, &data_store, stats.clone()) - } - QueryItem::TimeBinSimpleF32(item) => prepare_timebin_insert_futs(item, &data_store, &stats, tsnow), - QueryItem::Accounting(item) => prepare_accounting_insert_futs(item, &data_store, &stats, tsnow), - }; - res.extend(futs.into_iter()); - } - res - }) - .map(|x| futures_util::stream::iter(x)) - .flatten_unordered(Some(1)) - // .map(|x| async move { - // drop(x); - // Ok(()) - // }) - .buffer_unordered(concurrency); - while let Some(item) = stream.next().await { - match item { - Ok(_) => { - stats.inserted_values().inc(); - // TODO compute the insert latency bin and count. 
- } - Err(e) => { - use scylla::transport::errors::QueryError; - let e = match e { - QueryError::TimeoutError => crate::iteminsertqueue::Error::DbTimeout, - // TODO use `msg` - QueryError::DbError(e, _msg) => match e { - scylla::transport::errors::DbError::Overloaded => crate::iteminsertqueue::Error::DbOverload, + Err(e) => { + use scylla::transport::errors::QueryError; + let e = match e { + QueryError::TimeoutError => crate::iteminsertqueue::Error::DbTimeout, + // TODO use `msg` + QueryError::DbError(e, _msg) => match e { + scylla::transport::errors::DbError::Overloaded => crate::iteminsertqueue::Error::DbOverload, + _ => e.into(), + }, _ => e.into(), - }, - _ => e.into(), - }; - stats_inc_for_err(&stats, &e); + }; + stats_inc_for_err(&stats, &e); + } } } - } + } else { + let mut stream = Box::pin(stream); + while let Some(item) = stream.next().await { + drop(item); + } + }; stats.worker_finish().inc(); insert_worker_opts .insert_workers_running @@ -317,6 +305,76 @@ async fn worker_streamed( Ok(()) } +fn transform_to_db_futures( + item_inp: S, + data_store: Arc, + stats: Arc, +) -> impl Stream> +where + S: Stream>, +{ + trace!("transform_to_db_futures begin"); + // TODO possible without box? 
+ // let item_inp = Box::pin(item_inp); + item_inp.map(move |batch| { + stats.item_recv.inc(); + trace!("transform_to_db_futures have batch len {}", batch.len()); + let tsnow = TsMs::from_system_time(SystemTime::now()); + let mut res = Vec::with_capacity(32); + for item in batch { + let futs = match item { + QueryItem::Insert(item) => prepare_query_insert_futs(item, &data_store, &stats, tsnow), + QueryItem::ConnectionStatus(item) => { + stats.inserted_connection_status().inc(); + let fut = insert_connection_status_fut(item, &data_store, stats.clone()); + smallvec![fut] + } + QueryItem::ChannelStatus(item) => { + stats.inserted_channel_status().inc(); + insert_channel_status_fut(item, &data_store, stats.clone()) + } + QueryItem::TimeBinSimpleF32(item) => prepare_timebin_insert_futs(item, &data_store, &stats, tsnow), + QueryItem::Accounting(item) => prepare_accounting_insert_futs(item, &data_store, &stats, tsnow), + }; + trace!("prepared futs len {}", futs.len()); + res.extend(futs.into_iter()); + } + res + }) +} + +fn inspect_items(item_inp: Receiver>) -> impl Stream> { + trace!("inspect_items begin"); + // TODO possible without box? 
+ // let item_inp = Box::pin(item_inp); + item_inp.inspect(move |batch| { + for item in batch { + match &item { + QueryItem::ConnectionStatus(_) => { + debug!("execute ConnectionStatus"); + } + QueryItem::ChannelStatus(_) => { + debug!("execute ChannelStatus"); + } + QueryItem::Insert(item) => { + debug!( + "execute Insert {:?} {:?} {:?}", + item.series, + item.ts_msp, + item.val.shape() + ); + } + QueryItem::TimeBinSimpleF32(_) => { + debug!("execute TimeBinSimpleF32"); + } + QueryItem::Accounting(_) => { + debug!("execute Accounting"); + } + } + } + }) +} + fn prepare_query_insert_futs( item: InsertItem, data_store: &Arc, diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index 0a059f1..41c355d 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -447,16 +447,14 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio Ok(()) } -pub async fn migrate_scylla_data_schema( - scyconf: &ScyllaIngestConfig, - replication: u32, - durable: bool, - rett: RetentionTime, -) -> Result<(), Error> { +pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: RetentionTime) -> Result<(), Error> { let scy2 = create_session_no_ks(scyconf).await?; let scy = &scy2; + let durable = true; if !has_keyspace(scyconf.keyspace(), scy).await? { + // TODO + let replication = 3; let cql = format!( concat!( "create keyspace {}", @@ -472,6 +470,25 @@ pub async fn migrate_scylla_data_schema( info!("keyspace created"); } + if let Some(ks) = scyconf.keyspace_rf1() { + if !has_keyspace(ks, scy).await? 
{ + let replication = 1; + let cql = format!( + concat!( + "create keyspace {}", + " with replication = {{ 'class': 'SimpleStrategy', 'replication_factor': {} }}", + " and durable_writes = {};" + ), + ks, + replication, + durable + ); + info!("scylla create keyspace {cql}"); + scy.query_iter(cql, ()).await?; + info!("keyspace created"); + } + } + let ks = scyconf.keyspace(); scy.use_keyspace(ks, true).await?; diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index e97d6e1..052a452 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -329,10 +329,10 @@ fn write_00() { user: "daqbuffer".into(), pass: "daqbuffer".into(), }; - let scyconf = &ScyllaIngestConfig::new(["127.0.0.1:19042"], "daqingest_test_00"); + let scyconf = &ScyllaIngestConfig::new(["127.0.0.1:19042"], "daqingest_test_00_rf3", "daqingest_test_00_rf1"); let (pgc, pg_jh) = dbpg::conn::make_pg_client(dbconf).await?; dbpg::schema::schema_check(&pgc).await?; - scywr::schema::migrate_scylla_data_schema(scyconf, 1, true, netpod::ttl::RetentionTime::Short).await?; + scywr::schema::migrate_scylla_data_schema(scyconf, netpod::ttl::RetentionTime::Short).await?; let scy = scywr::session::create_session(scyconf).await?; let stats = SeriesByChannelStats::new(); let stats = Arc::new(stats);