Finer grained generation of extra inserts for testing
This commit is contained in:
@@ -106,6 +106,8 @@ pub async fn list_pulses() -> Result<(), Error> {
|
||||
}
|
||||
|
||||
pub async fn fetch_events(opts: FetchEvents) -> Result<(), Error> {
|
||||
// TODO use the keyspace from commandline.
|
||||
err::todo();
|
||||
let scy = SessionBuilder::new()
|
||||
.known_nodes(&opts.scylla)
|
||||
.default_consistency(Consistency::LocalOne)
|
||||
|
||||
@@ -159,6 +159,17 @@ pub struct CaConnectOpts {
|
||||
pub local_epics_hostname: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ExtraInsertsConf {
|
||||
pub copies: Vec<(u64, u64)>,
|
||||
}
|
||||
|
||||
impl ExtraInsertsConf {
|
||||
pub fn new() -> Self {
|
||||
Self { copies: Vec::new() }
|
||||
}
|
||||
}
|
||||
|
||||
async fn spawn_scylla_insert_workers(
|
||||
scyconf: ScyllaConfig,
|
||||
insert_scylla_sessions: usize,
|
||||
@@ -333,7 +344,9 @@ pub struct IngestCommons {
|
||||
pub local_epics_hostname: String,
|
||||
pub insert_item_queue: Arc<CommonInsertItemQueue>,
|
||||
pub data_store: Arc<DataStore>,
|
||||
pub insert_frac: Arc<AtomicU64>,
|
||||
pub insert_ivl_min: Arc<AtomicU64>,
|
||||
pub extra_inserts_conf: Mutex<ExtraInsertsConf>,
|
||||
pub conn_stats: Arc<TokMx<Vec<Arc<CaConnStats>>>>,
|
||||
pub command_queue_set: Arc<CommandQueueSet>,
|
||||
}
|
||||
@@ -393,6 +406,7 @@ pub async fn create_ca_conn(
|
||||
insert_ivl_min: Arc<AtomicU64>,
|
||||
conn_stats: Arc<TokMx<Vec<Arc<CaConnStats>>>>,
|
||||
command_queue_set: Arc<CommandQueueSet>,
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
) -> Result<JoinHandle<Result<(), Error>>, Error> {
|
||||
info!("create new CaConn {:?}", addr);
|
||||
let data_store = data_store.clone();
|
||||
@@ -404,6 +418,7 @@ pub async fn create_ca_conn(
|
||||
array_truncate,
|
||||
insert_queue_max,
|
||||
insert_ivl_min.clone(),
|
||||
ingest_commons.clone(),
|
||||
);
|
||||
conn_stats.lock().await.push(conn.stats());
|
||||
let stats2 = conn.stats();
|
||||
@@ -434,6 +449,7 @@ pub async fn create_ca_conn(
|
||||
pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
crate::linuxhelper::set_signal_handler()?;
|
||||
let insert_frac = Arc::new(AtomicU64::new(1000));
|
||||
let extra_inserts_conf = Mutex::new(ExtraInsertsConf { copies: Vec::new() });
|
||||
let insert_ivl_min = Arc::new(AtomicU64::new(8800));
|
||||
let opts = parse_config(opts.config).await?;
|
||||
let scyconf = opts.scyconf.clone();
|
||||
@@ -510,14 +526,14 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
insert_ivl_min: insert_ivl_min.clone(),
|
||||
conn_stats: conn_stats.clone(),
|
||||
command_queue_set: command_queue_set.clone(),
|
||||
insert_frac,
|
||||
extra_inserts_conf,
|
||||
};
|
||||
let ingest_commons = Arc::new(ingest_commons);
|
||||
|
||||
if true {
|
||||
tokio::spawn(crate::metrics::start_metrics_service(
|
||||
opts.api_bind.clone(),
|
||||
insert_frac.clone(),
|
||||
insert_ivl_min.clone(),
|
||||
ingest_commons.clone(),
|
||||
));
|
||||
}
|
||||
@@ -626,6 +642,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
opts.array_truncate,
|
||||
opts.insert_queue_max,
|
||||
insert_ivl_min.clone(),
|
||||
ingest_commons.clone(),
|
||||
);
|
||||
conn_stats.lock().await.push(conn.stats());
|
||||
let stats2 = conn.stats();
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use super::proto::{self, CaItem, CaMsg, CaMsgTy, CaProto};
|
||||
use super::store::DataStore;
|
||||
use super::{ExtraInsertsConf, IngestCommons};
|
||||
use crate::bsread::ChannelDescDecoded;
|
||||
use crate::ca::proto::{CreateChan, EventAdd};
|
||||
use crate::ca::store::ChannelRegistry;
|
||||
@@ -253,6 +254,7 @@ pub enum ConnCommandKind {
|
||||
ChannelAdd(String, Sender<bool>),
|
||||
ChannelRemove(String, Sender<bool>),
|
||||
Shutdown(Sender<bool>),
|
||||
ExtraInsertsConf(ExtraInsertsConf, Sender<bool>),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -316,6 +318,14 @@ impl ConnCommand {
|
||||
};
|
||||
(cmd, rx)
|
||||
}
|
||||
|
||||
pub fn extra_inserts_conf_set(k: ExtraInsertsConf) -> (ConnCommand, async_channel::Receiver<bool>) {
|
||||
let (tx, rx) = async_channel::bounded(1);
|
||||
let cmd = Self {
|
||||
kind: ConnCommandKind::ExtraInsertsConf(k, tx),
|
||||
};
|
||||
(cmd, rx)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CaConn {
|
||||
@@ -346,6 +356,10 @@ pub struct CaConn {
|
||||
conn_command_rx: async_channel::Receiver<ConnCommand>,
|
||||
conn_backoff: f32,
|
||||
conn_backoff_beg: f32,
|
||||
inserts_counter: u64,
|
||||
#[allow(unused)]
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
extra_inserts_conf: ExtraInsertsConf,
|
||||
}
|
||||
|
||||
impl CaConn {
|
||||
@@ -357,6 +371,7 @@ impl CaConn {
|
||||
array_truncate: usize,
|
||||
insert_queue_max: usize,
|
||||
insert_ivl_min: Arc<AtomicU64>,
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
) -> Self {
|
||||
let (cq_tx, cq_rx) = async_channel::bounded(32);
|
||||
Self {
|
||||
@@ -386,6 +401,9 @@ impl CaConn {
|
||||
conn_command_rx: cq_rx,
|
||||
conn_backoff: 0.02,
|
||||
conn_backoff_beg: 0.02,
|
||||
inserts_counter: 0,
|
||||
ingest_commons,
|
||||
extra_inserts_conf: ExtraInsertsConf::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -486,6 +504,15 @@ impl CaConn {
|
||||
}
|
||||
}
|
||||
}
|
||||
ConnCommandKind::ExtraInsertsConf(k, tx) => {
|
||||
self.extra_inserts_conf = k;
|
||||
match tx.try_send(true) {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
//error!("response channel full or closed");
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
Ready(None) => {
|
||||
error!("Command queue closed");
|
||||
@@ -802,6 +829,8 @@ impl CaConn {
|
||||
item_queue: &mut VecDeque<QueryItem>,
|
||||
insert_ivl_min: Arc<AtomicU64>,
|
||||
stats: Arc<CaConnStats>,
|
||||
inserts_counter: &mut u64,
|
||||
extra_inserts_conf: &ExtraInsertsConf,
|
||||
) -> Result<(), Error> {
|
||||
st.muted_before = 0;
|
||||
st.insert_item_ivl_ema.tick(tsnow);
|
||||
@@ -823,6 +852,23 @@ impl CaConn {
|
||||
} else {
|
||||
None
|
||||
};
|
||||
for &(m, l) in extra_inserts_conf.copies.iter() {
|
||||
if *inserts_counter % m == l {
|
||||
Self::event_add_insert(
|
||||
st,
|
||||
series.clone(),
|
||||
scalar_type.clone(),
|
||||
shape.clone(),
|
||||
ts - 2,
|
||||
ev.clone(),
|
||||
item_queue,
|
||||
ts_msp_last,
|
||||
inserted_in_ts_msp,
|
||||
ts_msp_grid,
|
||||
stats.clone(),
|
||||
)?;
|
||||
}
|
||||
}
|
||||
Self::event_add_insert(
|
||||
st,
|
||||
series,
|
||||
@@ -836,6 +882,7 @@ impl CaConn {
|
||||
ts_msp_grid,
|
||||
stats,
|
||||
)?;
|
||||
*inserts_counter += 1;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -895,6 +942,8 @@ impl CaConn {
|
||||
if tsnow >= st.insert_next_earliest {
|
||||
//let channel_state = self.channels.get_mut(&cid).unwrap();
|
||||
let item_queue = &mut self.insert_item_queue;
|
||||
let inserts_counter = &mut self.inserts_counter;
|
||||
let extra_inserts_conf = &self.extra_inserts_conf;
|
||||
Self::do_event_insert(
|
||||
st,
|
||||
series,
|
||||
@@ -906,6 +955,8 @@ impl CaConn {
|
||||
item_queue,
|
||||
self.insert_ivl_min.clone(),
|
||||
self.stats.clone(),
|
||||
inserts_counter,
|
||||
extra_inserts_conf,
|
||||
)?;
|
||||
} else {
|
||||
self.stats.channel_fast_item_drop_inc();
|
||||
|
||||
@@ -73,7 +73,8 @@ pub struct EventAdd {
|
||||
pub subid: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
// TODO Clone is only used for testing purposes and should get removed later.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct EventAddRes {
|
||||
pub data_type: u16,
|
||||
pub data_count: u16,
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
use crate::ca::conn::ConnCommand;
|
||||
use crate::ca::IngestCommons;
|
||||
use crate::ca::{ExtraInsertsConf, IngestCommons};
|
||||
use axum::extract::Query;
|
||||
use http::request::Parts;
|
||||
use log::*;
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddrV4;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
|
||||
async fn get_empty() -> String {
|
||||
@@ -73,6 +73,7 @@ async fn channel_add(params: HashMap<String, String>, ingest_commons: Arc<Ingest
|
||||
ingest_commons.insert_ivl_min.clone(),
|
||||
ingest_commons.conn_stats.clone(),
|
||||
ingest_commons.command_queue_set.clone(),
|
||||
ingest_commons.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -214,12 +215,18 @@ async fn channel_states(
|
||||
axum::Json(res)
|
||||
}
|
||||
|
||||
pub async fn start_metrics_service(
|
||||
bind_to: String,
|
||||
insert_frac: Arc<AtomicU64>,
|
||||
insert_ivl_min: Arc<AtomicU64>,
|
||||
ingest_commons: Arc<IngestCommons>,
|
||||
) {
|
||||
async fn extra_inserts_conf_set(v: ExtraInsertsConf, ingest_commons: Arc<IngestCommons>) -> axum::Json<bool> {
|
||||
*ingest_commons.extra_inserts_conf.lock().unwrap() = v.clone();
|
||||
let g = ingest_commons.command_queue_set.queues_locked().await;
|
||||
let mut it = g.iter();
|
||||
let rxs = send_command(&mut it, || ConnCommand::extra_inserts_conf_set(v.clone())).await;
|
||||
for rx in rxs {
|
||||
let _item = rx.recv().await.unwrap();
|
||||
}
|
||||
axum::Json(true)
|
||||
}
|
||||
|
||||
pub async fn start_metrics_service(bind_to: String, ingest_commons: Arc<IngestCommons>) {
|
||||
use axum::routing::{get, put};
|
||||
use axum::{extract, Router};
|
||||
let app = Router::new()
|
||||
@@ -276,14 +283,27 @@ pub async fn start_metrics_service(
|
||||
)
|
||||
.route(
|
||||
"/insert_frac",
|
||||
get(get_empty).put(|v: extract::Json<u64>| async move {
|
||||
insert_frac.store(v.0, Ordering::Release);
|
||||
get(get_empty).put({
|
||||
let insert_frac = ingest_commons.insert_frac.clone();
|
||||
|v: extract::Json<u64>| async move {
|
||||
insert_frac.store(v.0, Ordering::Release);
|
||||
}
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/insert_ivl_min",
|
||||
put(|v: extract::Json<u64>| async move {
|
||||
insert_ivl_min.store(v.0, Ordering::Release);
|
||||
put({
|
||||
let insert_ivl_min = ingest_commons.insert_ivl_min.clone();
|
||||
|v: extract::Json<u64>| async move {
|
||||
insert_ivl_min.store(v.0, Ordering::Release);
|
||||
}
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/extra_inserts_conf",
|
||||
put({
|
||||
let ingest_commons = ingest_commons.clone();
|
||||
|v: extract::Json<ExtraInsertsConf>| extra_inserts_conf_set(v.0, ingest_commons)
|
||||
}),
|
||||
)
|
||||
.fallback(
|
||||
|
||||
@@ -433,6 +433,8 @@ pub async fn zmtp_client(opts: ZmtpClientOpts) -> Result<(), Error> {
|
||||
for a in &opts.scylla {
|
||||
scy = scy.known_node(a);
|
||||
}
|
||||
// TODO use keyspace from configuration.
|
||||
err::todo();
|
||||
let scy = scy
|
||||
.use_keyspace("ks1", false)
|
||||
.build()
|
||||
|
||||
Reference in New Issue
Block a user