diff --git a/daqingest/src/query.rs b/daqingest/src/query.rs
index 156d379..da983a3 100644
--- a/daqingest/src/query.rs
+++ b/daqingest/src/query.rs
@@ -106,6 +106,8 @@ pub async fn list_pulses() -> Result<(), Error> {
 }
 
 pub async fn fetch_events(opts: FetchEvents) -> Result<(), Error> {
+    // TODO use the keyspace from commandline.
+    err::todo();
     let scy = SessionBuilder::new()
         .known_nodes(&opts.scylla)
         .default_consistency(Consistency::LocalOne)
diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs
index cf1a659..d482b90 100644
--- a/netfetch/src/ca.rs
+++ b/netfetch/src/ca.rs
@@ -159,6 +159,17 @@ pub struct CaConnectOpts {
     pub local_epics_hostname: String,
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ExtraInsertsConf {
+    pub copies: Vec<(u64, u64)>,
+}
+
+impl ExtraInsertsConf {
+    pub fn new() -> Self {
+        Self { copies: Vec::new() }
+    }
+}
+
 async fn spawn_scylla_insert_workers(
     scyconf: ScyllaConfig,
     insert_scylla_sessions: usize,
@@ -333,7 +344,9 @@ pub struct IngestCommons {
     pub local_epics_hostname: String,
     pub insert_item_queue: Arc,
     pub data_store: Arc,
+    pub insert_frac: Arc,
     pub insert_ivl_min: Arc,
+    pub extra_inserts_conf: Mutex,
     pub conn_stats: Arc>>>,
     pub command_queue_set: Arc,
 }
@@ -393,6 +406,7 @@ pub async fn create_ca_conn(
     insert_ivl_min: Arc,
     conn_stats: Arc>>>,
     command_queue_set: Arc,
+    ingest_commons: Arc,
 ) -> Result>, Error> {
     info!("create new CaConn {:?}", addr);
     let data_store = data_store.clone();
@@ -404,6 +418,7 @@ pub async fn create_ca_conn(
         array_truncate,
         insert_queue_max,
         insert_ivl_min.clone(),
+        ingest_commons.clone(),
     );
     conn_stats.lock().await.push(conn.stats());
     let stats2 = conn.stats();
@@ -434,6 +449,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
     crate::linuxhelper::set_signal_handler()?;
     let insert_frac = Arc::new(AtomicU64::new(1000));
+    let extra_inserts_conf = Mutex::new(ExtraInsertsConf { copies: Vec::new() });
     let insert_ivl_min = Arc::new(AtomicU64::new(8800));
     let opts = parse_config(opts.config).await?;
     let scyconf = opts.scyconf.clone();
@@ -510,14 +526,14 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
         insert_ivl_min: insert_ivl_min.clone(),
         conn_stats: conn_stats.clone(),
         command_queue_set: command_queue_set.clone(),
+        insert_frac,
+        extra_inserts_conf,
     };
     let ingest_commons = Arc::new(ingest_commons);
 
     if true {
         tokio::spawn(crate::metrics::start_metrics_service(
             opts.api_bind.clone(),
-            insert_frac.clone(),
-            insert_ivl_min.clone(),
             ingest_commons.clone(),
         ));
     }
@@ -626,6 +642,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
             opts.array_truncate,
             opts.insert_queue_max,
             insert_ivl_min.clone(),
+            ingest_commons.clone(),
         );
         conn_stats.lock().await.push(conn.stats());
         let stats2 = conn.stats();
diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs
index 54b98d5..5e0786d 100644
--- a/netfetch/src/ca/conn.rs
+++ b/netfetch/src/ca/conn.rs
@@ -1,5 +1,6 @@
 use super::proto::{self, CaItem, CaMsg, CaMsgTy, CaProto};
 use super::store::DataStore;
+use super::{ExtraInsertsConf, IngestCommons};
 use crate::bsread::ChannelDescDecoded;
 use crate::ca::proto::{CreateChan, EventAdd};
 use crate::ca::store::ChannelRegistry;
@@ -253,6 +254,7 @@ pub enum ConnCommandKind {
     ChannelAdd(String, Sender),
     ChannelRemove(String, Sender),
     Shutdown(Sender),
+    ExtraInsertsConf(ExtraInsertsConf, Sender),
 }
 
 #[derive(Debug)]
@@ -316,6 +318,14 @@ impl ConnCommand {
         };
         (cmd, rx)
     }
+
+    pub fn extra_inserts_conf_set(k: ExtraInsertsConf) -> (ConnCommand, async_channel::Receiver) {
+        let (tx, rx) = async_channel::bounded(1);
+        let cmd = Self {
+            kind: ConnCommandKind::ExtraInsertsConf(k, tx),
+        };
+        (cmd, rx)
+    }
 }
 
 pub struct CaConn {
@@ -346,6 +356,10 @@ pub struct CaConn {
     conn_command_rx: async_channel::Receiver,
     conn_backoff: f32,
     conn_backoff_beg: f32,
+    inserts_counter: u64,
+    #[allow(unused)]
+    ingest_commons: Arc,
+    extra_inserts_conf: ExtraInsertsConf,
 }
 
 impl CaConn {
@@ -357,6 +371,7 @@ impl CaConn {
         array_truncate: usize,
         insert_queue_max: usize,
         insert_ivl_min: Arc,
+        ingest_commons: Arc,
     ) -> Self {
         let (cq_tx, cq_rx) = async_channel::bounded(32);
         Self {
@@ -386,6 +401,9 @@ impl CaConn {
            conn_command_rx: cq_rx,
            conn_backoff: 0.02,
            conn_backoff_beg: 0.02,
+           inserts_counter: 0,
+           ingest_commons,
+           extra_inserts_conf: ExtraInsertsConf::new(),
        }
    }
 
@@ -486,6 +504,15 @@ impl CaConn {
                        }
                    }
                }
+               ConnCommandKind::ExtraInsertsConf(k, tx) => {
+                   self.extra_inserts_conf = k;
+                   match tx.try_send(true) {
+                       Ok(_) => {}
+                       Err(_) => {
+                           //error!("response channel full or closed");
+                       }
+                   }
+               }
            },
            Ready(None) => {
                error!("Command queue closed");
@@ -802,6 +829,8 @@ impl CaConn {
         item_queue: &mut VecDeque,
         insert_ivl_min: Arc,
         stats: Arc,
+        inserts_counter: &mut u64,
+        extra_inserts_conf: &ExtraInsertsConf,
     ) -> Result<(), Error> {
         st.muted_before = 0;
         st.insert_item_ivl_ema.tick(tsnow);
@@ -823,6 +852,23 @@ impl CaConn {
         } else {
             None
         };
+        for &(m, l) in extra_inserts_conf.copies.iter() {
+            if *inserts_counter % m == l {
+                Self::event_add_insert(
+                    st,
+                    series.clone(),
+                    scalar_type.clone(),
+                    shape.clone(),
+                    ts - 2,
+                    ev.clone(),
+                    item_queue,
+                    ts_msp_last,
+                    inserted_in_ts_msp,
+                    ts_msp_grid,
+                    stats.clone(),
+                )?;
+            }
+        }
         Self::event_add_insert(
             st,
             series,
@@ -836,6 +882,7 @@ impl CaConn {
             ts_msp_grid,
             stats,
         )?;
+        *inserts_counter += 1;
         Ok(())
     }
 
@@ -895,6 +942,8 @@ impl CaConn {
         if tsnow >= st.insert_next_earliest {
             //let channel_state = self.channels.get_mut(&cid).unwrap();
             let item_queue = &mut self.insert_item_queue;
+            let inserts_counter = &mut self.inserts_counter;
+            let extra_inserts_conf = &self.extra_inserts_conf;
             Self::do_event_insert(
                 st,
                 series,
@@ -906,6 +955,8 @@ impl CaConn {
                 item_queue,
                 self.insert_ivl_min.clone(),
                 self.stats.clone(),
+                inserts_counter,
+                extra_inserts_conf,
             )?;
         } else {
             self.stats.channel_fast_item_drop_inc();
diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs
index 474885a..82c7eb9 100644
--- a/netfetch/src/ca/proto.rs
+++ b/netfetch/src/ca/proto.rs
@@ -73,7 +73,8 @@ pub struct EventAdd {
     pub subid: u32,
 }
 
-#[derive(Debug)]
+// TODO Clone is only used for testing purposes and should get removed later.
+#[derive(Debug, Clone)]
 pub struct EventAddRes {
     pub data_type: u16,
     pub data_count: u16,
diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs
index 1ff8b3c..f83e806 100644
--- a/netfetch/src/metrics.rs
+++ b/netfetch/src/metrics.rs
@@ -1,11 +1,11 @@
 use crate::ca::conn::ConnCommand;
-use crate::ca::IngestCommons;
+use crate::ca::{ExtraInsertsConf, IngestCommons};
 use axum::extract::Query;
 use http::request::Parts;
 use log::*;
 use std::collections::HashMap;
 use std::net::SocketAddrV4;
-use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::atomic::Ordering;
 use std::sync::Arc;
 
 async fn get_empty() -> String {
@@ -73,6 +73,7 @@ async fn channel_add(params: HashMap, ingest_commons: Arc
-pub async fn start_metrics_service(
-    bind_to: String,
-    insert_frac: Arc,
-    insert_ivl_min: Arc,
-    ingest_commons: Arc,
-) {
+async fn extra_inserts_conf_set(v: ExtraInsertsConf, ingest_commons: Arc) -> axum::Json {
+    *ingest_commons.extra_inserts_conf.lock().unwrap() = v.clone();
+    let g = ingest_commons.command_queue_set.queues_locked().await;
+    let mut it = g.iter();
+    let rxs = send_command(&mut it, || ConnCommand::extra_inserts_conf_set(v.clone())).await;
+    for rx in rxs {
+        let _item = rx.recv().await.unwrap();
+    }
+    axum::Json(true)
+}
+
+pub async fn start_metrics_service(bind_to: String, ingest_commons: Arc) {
     use axum::routing::{get, put};
     use axum::{extract, Router};
     let app = Router::new()
@@ -276,14 +283,27 @@ pub async fn start_metrics_service(
         )
         .route(
             "/insert_frac",
-            get(get_empty).put(|v: extract::Json| async move {
-                insert_frac.store(v.0, Ordering::Release);
+            get(get_empty).put({
+                let insert_frac = ingest_commons.insert_frac.clone();
+                |v: extract::Json| async move {
+                    insert_frac.store(v.0, Ordering::Release);
+                }
             }),
         )
         .route(
             "/insert_ivl_min",
-            put(|v: extract::Json| async move {
-                insert_ivl_min.store(v.0, Ordering::Release);
+            put({
+                let insert_ivl_min = ingest_commons.insert_ivl_min.clone();
+                |v: extract::Json| async move {
+                    insert_ivl_min.store(v.0, Ordering::Release);
+                }
+            }),
+        )
+        .route(
+            "/extra_inserts_conf",
+            put({
+                let ingest_commons = ingest_commons.clone();
+                |v: extract::Json| extra_inserts_conf_set(v.0, ingest_commons)
             }),
         )
         .fallback(
diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs
index 56d96c6..0101e67 100644
--- a/netfetch/src/zmtp.rs
+++ b/netfetch/src/zmtp.rs
@@ -433,6 +433,8 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> {
     for a in &opts.scylla {
         scy = scy.known_node(a);
     }
+    // TODO use keyspace from configuration.
+    err::todo();
     let scy = scy
         .use_keyspace("ks1", false)
         .build()
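
Note for review context: the rule added in CaConn::do_event_insert writes one extra copy of an event (with ts - 2) for every configured pair (m, l) in ExtraInsertsConf::copies whenever inserts_counter % m == l, and increments the counter afterwards. The standalone sketch below only illustrates that selection rule outside the real CaConn and insert-queue types; the helper name extra_copies, the main function, and the sample (m, l) pairs are assumptions for the example and not part of the patch.

// Illustrative sketch of the duplication rule, assuming nothing beyond the
// ExtraInsertsConf shape introduced in this patch.
#[derive(Debug, Clone)]
struct ExtraInsertsConf {
    // Each (m, l) pair requests one extra insert whenever inserts_counter % m == l.
    copies: Vec<(u64, u64)>,
}

// Count how many extra copies the configuration requests for a given counter value.
fn extra_copies(conf: &ExtraInsertsConf, inserts_counter: u64) -> usize {
    conf.copies.iter().filter(|&&(m, l)| inserts_counter % m == l).count()
}

fn main() {
    // Assumed example: duplicate every 10th event, plus every 100th event offset by one.
    let conf = ExtraInsertsConf {
        copies: vec![(10, 0), (100, 1)],
    };
    for counter in 0..12u64 {
        // The real code performs this many additional event_add_insert calls (with ts - 2)
        // before the regular insert, then increments inserts_counter.
        println!("counter={} extra_inserts={}", counter, extra_copies(&conf, counter));
    }
}

With serde's default representation of Vec<(u64, u64)>, the same example configuration would be submitted to the new PUT /extra_inserts_conf route as the JSON body {"copies":[[10,0],[100,1]]}.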