Refactor scylla config and default ttls

This commit is contained in:
Dominik Werder
2024-02-20 09:44:38 +01:00
parent ce60445af3
commit 7beb5a9ced
17 changed files with 307 additions and 315 deletions

View File

@@ -478,7 +478,7 @@ impl CaConnSet {
}
async fn run(mut this: CaConnSet) -> Result<(), Error> {
debug!("CaConnSet run begin");
trace!("CaConnSet run begin");
loop {
let x = this.next().await;
match x {
@@ -486,19 +486,14 @@ impl CaConnSet {
None => break,
}
}
// debug!(
// "search_tx sender {} receiver {}",
// this.find_ioc_query_tx.sender_count(),
// this.find_ioc_query_tx.receiver_count()
// );
debug!("CaConnSet EndOfStream");
debug!("join ioc_finder_jh A {:?}", this.find_ioc_query_sender.len());
trace!("CaConnSet EndOfStream");
trace!("join ioc_finder_jh A {:?}", this.find_ioc_query_sender.len());
this.find_ioc_query_sender.as_mut().drop();
debug!("join ioc_finder_jh B {:?}", this.find_ioc_query_sender.len());
trace!("join ioc_finder_jh B {:?}", this.find_ioc_query_sender.len());
this.ioc_finder_jh
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))??;
debug!("joined ioc_finder_jh");
trace!("joined ioc_finder_jh");
this.connset_out_tx.close();
this.connset_inp_rx.close();
this.shutdown_done = true;

View File

@@ -96,7 +96,10 @@ async fn finder_full(
));
let jh2 = taskrun::spawn(finder_network_if_not_found(rx1, tx, opts.clone(), stats));
jh1.await??;
trace!("finder::finder_full awaited A");
jh2.await??;
trace!("finder::finder_full awaited B");
trace!("finder::finder_full done");
Ok(())
}
@@ -108,21 +111,28 @@ async fn finder_worker(
stats: Arc<IocFinderStats>,
) -> Result<(), Error> {
// TODO do something with join handle
let (batch_rx, jh) = batchtools::batcher::batch(
let (batch_rx, jh_batch) = batchtools::batcher::batch(
SEARCH_BATCH_MAX,
Duration::from_millis(200),
SEARCH_DB_PIPELINE_LEN,
qrx,
);
let mut jhs = Vec::new();
for _ in 0..SEARCH_DB_PIPELINE_LEN {
// TODO use join handle
tokio::spawn(finder_worker_single(
let jh = tokio::spawn(finder_worker_single(
batch_rx.clone(),
tx.clone(),
backend.clone(),
db.clone(),
stats.clone(),
));
jhs.push(jh);
}
jh_batch.await?;
trace!("finder_worker jh_batch awaited");
for (i, jh) in jhs.into_iter().enumerate() {
jh.await??;
trace!("finder_worker single {i} awaited");
}
Ok(())
}
@@ -165,17 +175,7 @@ async fn finder_worker_single(
dt.as_secs_f32() * 1e3
);
if dt > Duration::from_millis(5000) {
let mut out = String::from("[");
for e in &batch {
if out.len() > 1 {
out.push_str(", ");
}
out.push('\'');
out.push_str(e.name());
out.push('\'');
}
out.push(']');
trace!("very slow query\n{out}");
warn!("very slow query");
}
match qres {
Ok(rows) => {
@@ -237,8 +237,9 @@ async fn finder_worker_single(
Err(_e) => break,
}
}
debug!("finder_worker_single done");
drop(pg);
jh.await?.map_err(|e| Error::from_string(e))?;
trace!("finder_worker_single done");
Ok(())
}
@@ -248,13 +249,14 @@ async fn finder_network_if_not_found(
opts: CaIngestOpts,
stats: Arc<IocFinderStats>,
) -> Result<(), Error> {
let (net_tx, net_rx, jh, jhs) = ca_search_workers_start(&opts, stats.clone()).await.unwrap();
let self_name = "finder_network_if_not_found";
let (net_tx, net_rx, jh_ca_search) = ca_search_workers_start(&opts, stats.clone()).await?;
let jh2 = taskrun::spawn(process_net_result(net_rx, tx.clone(), opts.clone()));
'outer: while let Ok(item) = rx.recv().await {
let mut res = VecDeque::new();
let mut net = VecDeque::new();
for e in item {
trace!("finder_network_if_not_found sees {e:?}");
trace!("{self_name} sees {e:?}");
if e.addr.is_none() {
net.push_back(e.channel);
} else {
@@ -262,20 +264,22 @@ async fn finder_network_if_not_found(
}
}
if let Err(_) = tx.send(res).await {
debug!("{self_name} res send error, break");
break;
}
for ch in net {
if let Err(_) = net_tx.send(ch).await {
debug!("{self_name} net ch send error, break");
break 'outer;
}
}
}
for jh in jhs {
jh.await??;
}
jh.await??;
drop(net_tx);
trace!("{self_name} loop end");
jh_ca_search.await??;
trace!("{self_name} jh_ca_search awaited");
jh2.await??;
debug!("finder_network_if_not_found done");
trace!("{self_name} process_net_result awaited");
Ok(())
}
@@ -290,9 +294,13 @@ async fn process_net_result(
let mut index_worker_pg_jh = Vec::new();
for _ in 0..IOC_SEARCH_INDEX_WORKER_COUNT {
let backend = opts.backend().into();
let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config()).await.unwrap();
let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config())
.await
.map_err(Error::from_string)?;
index_worker_pg_jh.push(jh);
let worker = IocSearchIndexWorker::prepare(dbrx.clone(), backend, pg).await.unwrap();
let worker = IocSearchIndexWorker::prepare(dbrx.clone(), backend, pg)
.await
.map_err(Error::from_string)?;
let jh = tokio::spawn(async move { worker.worker().await });
ioc_search_index_worker_jhs.push(jh);
}
@@ -317,6 +325,13 @@ async fn process_net_result(
}
}
}
trace!("process_net_result break loop");
dbtx.close();
trace!("process_net_result dbtx closed");
for (i, jh) in ioc_search_index_worker_jhs.into_iter().enumerate() {
jh.await?;
trace!("process_net_result search index worker {i} awaited");
}
Ok(())
}

View File

@@ -104,7 +104,8 @@ pub struct FindIocStream {
bids_timed_out: BTreeMap<BatchId, ()>,
sids_done: BTreeMap<SearchId, ()>,
result_for_done_sid_count: u64,
sleeper: Pin<Box<dyn Future<Output = ()> + Send>>,
sleep_count: u8,
sleeper: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
#[allow(unused)]
thr_msg_0: ThrottleTrace,
#[allow(unused)]
@@ -145,7 +146,8 @@ impl FindIocStream {
in_flight_max,
channels_per_batch: batch_size,
batch_run_max,
sleeper: Box::pin(tokio::time::sleep(Duration::from_millis(500))),
sleep_count: 0,
sleeper: Some(Box::pin(tokio::time::sleep(Duration::from_millis(500)))),
thr_msg_0: ThrottleTrace::new(Duration::from_millis(1000)),
thr_msg_1: ThrottleTrace::new(Duration::from_millis(1000)),
thr_msg_2: ThrottleTrace::new(Duration::from_millis(1000)),
@@ -155,7 +157,8 @@ impl FindIocStream {
pub fn quick_state(&self) -> String {
format!(
"channels_input {} in_flight {} bid_by_sid {} out_queue {} result_for_done_sid_count {} bids_timed_out {}",
"channels_input {} {} in_flight {} bid_by_sid {} out_queue {} result_for_done_sid_count {} bids_timed_out {}",
self.channels_input.is_closed(),
self.channels_input.len(),
self.in_flight.len(),
self.bid_by_sid.len(),
@@ -562,7 +565,10 @@ impl FindIocStream {
}
fn ready_for_end_of_stream(&self) -> bool {
self.channels_input.is_closed() && self.in_flight.is_empty() && self.out_queue.is_empty()
self.channels_input.is_closed()
&& self.channels_input.is_empty()
&& self.in_flight.is_empty()
&& self.out_queue.is_empty()
}
}
@@ -571,6 +577,9 @@ impl Stream for FindIocStream {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
if self.channels_input.is_closed() {
debug!("{}", self.quick_state());
}
// self.thr_msg_0.trigger("FindIocStream::poll_next", &[]);
match self.ping.poll_unpin(cx) {
Ready(_) => {
@@ -582,7 +591,7 @@ impl Stream for FindIocStream {
self.clear_timed_out();
loop {
let mut have_progress = false;
if self.out_queue.is_empty() == false {
if !self.out_queue.is_empty() {
let ret = std::mem::replace(&mut self.out_queue, VecDeque::new());
break Ready(Some(Ok(ret)));
}
@@ -675,39 +684,69 @@ impl Stream for FindIocStream {
}
}
}
if self.ready_for_end_of_stream() {
// debug!("ready_for_end_of_stream but in late part");
}
break match self.afd.poll_read_ready(cx) {
Ready(Ok(mut g)) => match unsafe { Self::try_read(self.sock.0, &self.stats) } {
Ready(Ok((src, res))) => {
self.handle_result(src, res);
continue;
Ready(Ok(mut g)) => {
debug!("BLOCK AA");
match unsafe { Self::try_read(self.sock.0, &self.stats) } {
Ready(Ok((src, res))) => {
self.handle_result(src, res);
if self.ready_for_end_of_stream() {
debug!("ready_for_end_of_stream continue after handle_result");
}
continue;
}
Ready(Err(e)) => {
error!("Error from try_read {e:?}");
Ready(Some(Err(e)))
}
Pending => {
g.clear_ready();
if self.ready_for_end_of_stream() {
debug!("ready_for_end_of_stream continue after clear_ready");
}
continue;
}
}
Ready(Err(e)) => {
error!("Error from try_read {e:?}");
Ready(Some(Err(e)))
}
Pending => {
g.clear_ready();
continue;
}
},
}
Ready(Err(e)) => {
let e = Error::with_msg_no_trace(format!("{e:?}"));
error!("poll_read_ready {e:?}");
Ready(Some(Err(e)))
}
Pending => {
// debug!("BLOCK BB");
if have_progress {
if self.ready_for_end_of_stream() {
debug!("ready_for_end_of_stream continue after progress");
}
continue;
} else {
// debug!("BLOCK BC");
if self.ready_for_end_of_stream() {
match self.sleeper.poll_unpin(cx) {
Ready(_) => {
self.sleeper = Box::pin(tokio::time::sleep(Duration::from_millis(500)));
continue;
// debug!("BLOCK BD");
if let Some(fut) = self.sleeper.as_mut() {
match fut.poll_unpin(cx) {
Ready(()) => {
if self.sleep_count < 0 {
self.sleeper =
Some(Box::pin(tokio::time::sleep(Duration::from_millis(100))));
self.sleep_count += 1;
} else {
self.sleeper = None;
}
continue;
}
Pending => Pending,
}
Pending => Pending,
} else {
// debug!("BLOCK DONE");
Ready(None)
}
} else {
// debug!("BLOCK BE");
Pending
}
}

View File

@@ -179,7 +179,6 @@ pub async fn ca_search_workers_start(
Sender<String>,
Receiver<Result<VecDeque<FindIocRes>, Error>>,
JoinHandle<Result<(), Error>>,
Vec<JoinHandle<Result<(), Error>>>,
),
Error,
> {
@@ -189,8 +188,7 @@ pub async fn ca_search_workers_start(
let (out_tx, out_rx) = async_channel::bounded(256);
let finder = FindIocStream::new(inp_rx, search_tgts, blacklist, batch_run_max, 20, 16, stats);
let jh = taskrun::spawn(finder_run(finder, out_tx));
let jhs = Vec::new();
Ok((inp_tx, out_rx, jh, jhs))
Ok((inp_tx, out_rx, jh))
}
async fn search_tgts_from_opts(opts: &CaIngestOpts) -> Result<(Vec<SocketAddrV4>, Vec<SocketAddrV4>), Error> {
@@ -245,6 +243,6 @@ async fn finder_run(finder: FindIocStream, tx: Sender<Result<VecDeque<FindIocRes
break;
}
}
debug!("finder_run done");
trace!("finder_run done");
Ok(())
}

View File

@@ -1,8 +1,8 @@
use err::Error;
use netpod::log::*;
use netpod::Database;
use netpod::ScyllaConfig;
use regex::Regex;
use scywr::config::ScyllaIngestConfig;
use serde::Deserialize;
use serde::Serialize;
use std::collections::BTreeMap;
@@ -13,7 +13,7 @@ use taskrun::tokio;
use tokio::fs::OpenOptions;
use tokio::io::AsyncReadExt;
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct CaIngestOpts {
backend: String,
channels: Option<PathBuf>,
@@ -23,11 +23,15 @@ pub struct CaIngestOpts {
search_blacklist: Vec<String>,
whitelist: Option<String>,
blacklist: Option<String>,
max_simul: Option<usize>,
#[allow(unused)]
#[serde(default, with = "humantime_serde")]
timeout: Option<Duration>,
postgresql: Database,
scylla: ScyllaConfig,
scylla: ScyllaIngestConfig,
#[serde(default)]
scylla_mt: Option<ScyllaIngestConfig>,
#[serde(default)]
scylla_lt: Option<ScyllaIngestConfig>,
array_truncate: Option<u64>,
insert_worker_count: Option<usize>,
insert_worker_concurrency: Option<usize>,
@@ -60,10 +64,18 @@ impl CaIngestOpts {
&self.postgresql
}
pub fn scylla_config(&self) -> &ScyllaConfig {
pub fn scylla_config(&self) -> &ScyllaIngestConfig {
&self.scylla
}
/// Borrows the optional `scylla_mt` ingest configuration.
///
/// Returns `None` when the `scylla_mt` field is unset; the field is declared
/// with `#[serde(default)]`, so it may be omitted from the config input.
// NOTE(review): "mt" presumably stands for "medium term" — confirm.
pub fn scylla_config_mt(&self) -> Option<&ScyllaIngestConfig> {
self.scylla_mt.as_ref()
}
/// Borrows the optional `scylla_lt` ingest configuration.
///
/// Returns `None` when the `scylla_lt` field is unset; the field is declared
/// with `#[serde(default)]`, so it may be omitted from the config input.
// NOTE(review): "lt" presumably stands for "long term" — confirm.
pub fn scylla_config_lt(&self) -> Option<&ScyllaIngestConfig> {
self.scylla_lt.as_ref()
}
/// Borrows the list of configured `search` entries.
// NOTE(review): entries look like search target addresses (the config test
// uses a value such as "172.26.0.255") — confirm the expected format.
pub fn search(&self) -> &Vec<String> {
&self.search
}
@@ -76,28 +88,28 @@ impl CaIngestOpts {
Duration::from_millis(1200)
}
/// Returns the configured `insert_scylla_sessions` value, falling back to `1`
/// when the option is not set.
pub fn insert_scylla_sessions(&self) -> usize {
self.insert_scylla_sessions.unwrap_or(1)
}
pub fn insert_worker_count(&self) -> usize {
self.insert_worker_count.unwrap_or(4)
self.insert_worker_count.unwrap_or(8)
}
/// Returns the configured `insert_worker_concurrency` value, falling back to
/// `32` when the option is not set.
pub fn insert_worker_concurrency(&self) -> usize {
self.insert_worker_concurrency.unwrap_or(32)
}
/// Returns the configured `insert_scylla_sessions` value, falling back to `1`
/// when the option is not set.
pub fn insert_scylla_sessions(&self) -> usize {
self.insert_scylla_sessions.unwrap_or(1)
}
pub fn array_truncate(&self) -> u64 {
self.array_truncate.unwrap_or(512)
self.array_truncate.unwrap_or(1024 * 64)
}
pub fn insert_item_queue_cap(&self) -> usize {
self.insert_item_queue_cap.unwrap_or(80000)
self.insert_item_queue_cap.unwrap_or(1000 * 1000)
}
pub fn store_workers_rate(&self) -> u64 {
self.store_workers_rate.unwrap_or(5000)
self.store_workers_rate.unwrap_or(1000 * 500)
}
pub fn insert_frac(&self) -> u64 {
@@ -111,17 +123,19 @@ impl CaIngestOpts {
pub fn ttl_index(&self) -> Duration {
self.ttl_index
.clone()
.unwrap_or_else(|| Duration::from_secs(60 * 60 * 24 * 3))
.unwrap_or_else(|| Duration::from_secs(60 * 60 * 24 * 50))
}
pub fn ttl_d0(&self) -> Duration {
self.ttl_d0
.clone()
.unwrap_or_else(|| Duration::from_secs(60 * 60 * 24 * 1))
.unwrap_or_else(|| Duration::from_secs(60 * 60 * 24 * 40))
}
pub fn ttl_d1(&self) -> Duration {
self.ttl_d1.clone().unwrap_or_else(|| Duration::from_secs(60 * 60 * 12))
self.ttl_d1
.clone()
.unwrap_or_else(|| Duration::from_secs(60 * 60 * 24 * 10))
}
pub fn ttl_binned(&self) -> Duration {
@@ -158,7 +172,7 @@ scylla:
assert_eq!(conf.channels, Some(PathBuf::from("/some/path/file.txt")));
assert_eq!(&conf.api_bind, "0.0.0.0:3011");
assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string()));
assert_eq!(conf.scylla.hosts.get(1), Some(&"sf-nube-12:19042".to_string()));
assert_eq!(conf.scylla.hosts().get(1), Some(&"sf-nube-12:19042".to_string()));
assert_eq!(conf.ttl_d1, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45)));
assert_eq!(conf.ttl_binned, None);
}
@@ -427,7 +441,7 @@ mod serde_option_channel_read_config {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum ChannelReadConfig {
Monitor,
Poll(Duration),
@@ -455,6 +469,10 @@ CH-03:
archiving_configuration:
"###;
let x: BTreeMap<String, ChannelConfigParse> = serde_yaml::from_str(inp).unwrap();
assert_eq!(
x.get("CH-00").as_ref().unwrap().archiving_configuration.medium_term,
Some(ChannelReadConfig::Poll(Duration::from_millis(1000 * 60)))
);
}
#[derive(Debug)]