Ingest SwissFEL CA data, up to but not including the electric test

This commit is contained in:
Dominik Werder
2023-01-06 16:09:25 +01:00
parent ec205ee9df
commit a5c927538e
7 changed files with 179 additions and 36 deletions

View File

@@ -25,5 +25,6 @@ serde = { version = "1.0", features = ["derive"] }
err = { path = "../../daqbuffer/err" }
log = { path = "../log" }
netpod = { path = "../../daqbuffer/netpod" }
stats = { path = "../stats" }
netfetch = { path = "../netfetch" }
taskrun = { path = "../../daqbuffer/taskrun" }

View File

@@ -29,15 +29,16 @@ use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use tokio_postgres::Client as PgClient;
use tokio_postgres::Statement as PgStatement;
const CHECK_CHANS_PER_TICK: usize = 10000;
const FINDER_TIMEOUT: usize = 100;
const TIMEOUT_WARN: usize = 3000;
const FINDER_JOB_QUEUE_LEN_MAX: usize = 20;
const FINDER_IN_FLIGHT_MAX: usize = 200;
const FINDER_IN_FLIGHT_MAX: usize = 400;
const FINDER_BATCH_SIZE: usize = 8;
const CURRENT_SEARCH_PENDING_MAX: usize = 220;
const CURRENT_SEARCH_PENDING_MAX: usize = 420;
const SEARCH_PENDING_TIMEOUT: usize = 10000;
const TIMEOUT_WARN_FACTOR: usize = 10;
#[derive(Clone, Debug, Serialize, PartialEq, PartialOrd, Eq, Ord)]
pub struct Channel {
@@ -180,6 +181,13 @@ pub async fn make_pg_client(d: &Database) -> Result<PgClient, Error> {
Ok(client)
}
/// Work item for the IOC-address inserter task: one row destined for the
/// `ioc_by_channel_log` table (columns: facility, channel, responseaddr, addr).
struct AddrInsertJob {
// Backend name; bound to the `facility` column on insert.
backend: String,
// Channel whose IOC address was discovered; `channel.id()` is bound as the `channel` column.
channel: Channel,
// Address the CA search response arrived from (`responseaddr` column).
response_addr: SocketAddrV4,
// Address of the IOC serving the channel (`addr` column).
addr: SocketAddrV4,
}
pub struct Daemon {
opts: DaemonOpts,
channel_states: BTreeMap<Channel, ChannelState>,
@@ -198,19 +206,79 @@ pub struct Daemon {
count_unassigned: usize,
count_assigned: usize,
last_status_print: SystemTime,
insert_workers_jh: Vec<tokio::task::JoinHandle<()>>,
#[allow(unused)]
pg_client: Arc<PgClient>,
#[allow(unused)]
qu_addr_insert: PgStatement,
ioc_addr_inserter_jh: tokio::task::JoinHandle<()>,
ioc_addr_inserter_tx: Sender<AddrInsertJob>,
}
impl Daemon {
pub async fn new(opts: DaemonOpts) -> Result<Self, Error> {
let pg_client = Arc::new(make_pg_client(&opts.pgconf).await?);
let datastore = DataStore::new(&opts.scyconf, pg_client).await?;
let datastore = DataStore::new(&opts.scyconf, pg_client.clone()).await?;
let datastore = Arc::new(datastore);
let (tx, rx) = async_channel::bounded(32);
let tgts = opts.search_tgts.clone();
let (search_tx, ioc_finder_jh) = Self::start_finder(tx.clone(), tgts);
let common_insert_item_queue = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap));
let common_insert_item_queue_2 = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap));
let insert_queue_counter = Arc::new(AtomicUsize::new(0));
// Insert queue hook
if true {
tokio::spawn({
let rx = common_insert_item_queue.receiver();
let tx = common_insert_item_queue_2.sender().await;
let insert_queue_counter = insert_queue_counter.clone();
async move {
let mut printed_last = SystemTime::now();
let mut histo = BTreeMap::new();
while let Ok(item) = rx.recv().await {
insert_queue_counter.fetch_add(1, atomic::Ordering::AcqRel);
//trace!("insert queue item {item:?}");
match &item {
netfetch::store::QueryItem::Insert(item) => {
item.pulse;
histo
.entry(item.series.clone())
.and_modify(|(c, msp, lsp, pulse)| {
*c += 1;
*msp = item.ts_msp;
*lsp = item.ts_lsp;
*pulse = item.pulse;
})
.or_insert((0 as usize, item.ts_msp, item.ts_lsp, item.pulse));
}
_ => {}
}
match tx.send(item).await {
Ok(_) => {}
Err(e) => {
error!("insert queue hook send {e}");
break;
}
}
let tsnow = SystemTime::now();
if tsnow.duration_since(printed_last).unwrap_or(Duration::ZERO) >= Duration::from_millis(2000) {
printed_last = tsnow;
let mut all: Vec<_> = histo
.iter()
.map(|(k, (c, msp, lsp, pulse))| (usize::MAX - *c, k.clone(), *msp, *lsp, *pulse))
.collect();
all.sort_unstable();
for (c, sid, msp, lsp, pulse) in all.into_iter().take(4) {
info!("{:10} {:20} {:14} {:20} {:?}", usize::MAX - c, msp, lsp, pulse, sid);
}
histo.clear();
}
}
}
});
}
let ingest_commons = netfetch::ca::IngestCommons {
pgconf: Arc::new(opts.pgconf.clone()),
backend: opts.backend().into(),
@@ -223,19 +291,74 @@ impl Daemon {
insert_frac: AtomicU64::new(1000),
ca_conn_set: netfetch::ca::connset::CaConnSet::new(),
};
let _ingest_commons = Arc::new(ingest_commons);
let ingest_commons = Arc::new(ingest_commons);
// TODO hook up with insert worker
tokio::spawn({
let rx = common_insert_item_queue.receiver();
let insert_queue_counter = insert_queue_counter.clone();
let qu_addr_insert = {
const TEXT: tokio_postgres::types::Type = tokio_postgres::types::Type::TEXT;
let sql = "insert into ioc_by_channel_log (facility, channel, responseaddr, addr) values ($1, $2, $3, $4)";
let res = pg_client
.prepare_typed(sql, &[TEXT, TEXT, TEXT, TEXT])
.await
.map_err(|e| Error::from(e.to_string()))?;
res
};
let insert_scylla_sessions = 1;
let insert_worker_count = 1000;
let use_rate_limit_queue = false;
// TODO use a new stats type:
let store_stats = Arc::new(stats::CaConnStats::new());
let ttls = netfetch::insertworker::Ttls {
index: Duration::from_secs(60 * 60 * 24 * 4),
d0: Duration::from_secs(60 * 60 * 24 * 3),
d1: Duration::from_secs(60 * 60 * 4),
};
let jh_insert_workers = netfetch::insertworker::spawn_scylla_insert_workers(
opts.scyconf.clone(),
insert_scylla_sessions,
insert_worker_count,
common_insert_item_queue_2.clone(),
ingest_commons.clone(),
pg_client.clone(),
store_stats.clone(),
use_rate_limit_queue,
ttls,
)
.await?;
let (ioc_addr_inserter_tx, ioc_addr_inserter_rx) = async_channel::bounded(64);
let fut = {
let pg_client = pg_client.clone();
let qu_addr_insert = qu_addr_insert.clone();
async move {
while let Ok(item) = rx.recv().await {
insert_queue_counter.fetch_add(1, atomic::Ordering::AcqRel);
trace!("insert queue item {item:?}");
let mut stream = ioc_addr_inserter_rx
.map(|k: AddrInsertJob| {
let pgc = pg_client.clone();
let qu_addr_insert = qu_addr_insert.clone();
async move {
let response_addr = k.response_addr.to_string();
let addr = k.addr.to_string();
let res = pgc
.execute(&qu_addr_insert, &[&k.backend, &k.channel.id(), &response_addr, &addr])
.await
.map_err(|e| Error::from(e.to_string()))?;
Ok::<_, Error>(res)
}
})
.buffer_unordered(16);
while let Some(item) = stream.next().await {
match item {
Ok(_) => {}
Err(e) => {
error!("{e}");
}
}
}
}
});
};
let ioc_addr_inserter_task = tokio::spawn(fut);
let ret = Self {
opts,
channel_states: BTreeMap::new(),
@@ -254,6 +377,11 @@ impl Daemon {
count_unassigned: 0,
count_assigned: 0,
last_status_print: SystemTime::now(),
insert_workers_jh: jh_insert_workers,
pg_client,
qu_addr_insert,
ioc_addr_inserter_jh: ioc_addr_inserter_task,
ioc_addr_inserter_tx,
};
Ok(ret)
}
@@ -622,7 +750,7 @@ impl Daemon {
Ok(())
}
fn handle_search_done(&mut self, item: Result<VecDeque<FindIocRes>, Error>) -> Result<(), Error> {
async fn handle_search_done(&mut self, item: Result<VecDeque<FindIocRes>, Error>) -> Result<(), Error> {
//debug!("handle SearchDone: {res:?}");
match item {
Ok(a) => {
@@ -633,7 +761,7 @@ impl Daemon {
if let Some(st) = self.channel_states.get_mut(&ch) {
if let ChannelStateValue::Active(ActiveChannelState::SearchPending { since }) = &st.value {
let dt = SystemTime::now().duration_since(*since).unwrap();
if dt > Duration::from_millis(FINDER_TIMEOUT as u64 * TIMEOUT_WARN_FACTOR as u64) {
if dt > Duration::from_millis(TIMEOUT_WARN as u64) {
warn!(
" FOUND {:5.0} {:5.0} {addr}",
1e3 * dt.as_secs_f32(),
@@ -647,6 +775,15 @@ impl Daemon {
},
});
st.value = stnew;
if let (Some(response_addr),) = (res.response_addr,) {
let job = AddrInsertJob {
backend: self.opts.backend().into(),
channel: ch.clone(),
response_addr: response_addr,
addr: addr,
};
self.ioc_addr_inserter_tx.send(job).await?;
}
} else {
warn!(
"address found, but state for {ch:?} is not SearchPending: {:?}",
@@ -662,7 +799,7 @@ impl Daemon {
if let Some(st) = self.channel_states.get_mut(&ch) {
if let ChannelStateValue::Active(ActiveChannelState::SearchPending { since }) = &st.value {
let dt = SystemTime::now().duration_since(*since).unwrap();
if dt > Duration::from_millis(FINDER_TIMEOUT as u64 * TIMEOUT_WARN_FACTOR as u64) {
if dt > Duration::from_millis(TIMEOUT_WARN as u64) {
warn!(
"NOT FOUND {:5.0} {:5.0}",
1e3 * dt.as_secs_f32(),
@@ -692,7 +829,7 @@ impl Daemon {
TimerTick => self.handle_timer_tick().await,
ChannelAdd(ch) => self.handle_channel_add(ch),
ChannelRemove(ch) => self.handle_channel_remove(ch),
SearchDone(item) => self.handle_search_done(item),
SearchDone(item) => self.handle_search_done(item).await,
}
}
@@ -727,8 +864,11 @@ impl Daemon {
}
}
}
warn!("TODO shut down IOC finder properly");
warn!("TODO wait for IOC finder properly");
let _ = &self.ioc_finder_jh;
warn!("TODO wait for insert workers");
let _ = &self.insert_workers_jh;
let _ = &self.ioc_addr_inserter_jh;
Ok(())
}
}

View File

@@ -11,7 +11,7 @@ path = "src/netfetch.rs"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_cbor = "0.11"
serde_yaml = "0.8.23"
serde_yaml = "0.9.16"
tokio = { version = "1.23.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs", "tracing"] }
tokio-stream = { version = "0.1", features = ["fs"]}
async-channel = "1.6"

View File

@@ -199,6 +199,11 @@ pub async fn ca_connect(opts: CaIngestOpts, channels: &Vec<String>) -> Result<()
// TODO use a new stats type:
let store_stats = Arc::new(CaConnStats::new());
let ttls = crate::insertworker::Ttls {
index: opts.ttl_index(),
d0: opts.ttl_d0(),
d1: opts.ttl_d1(),
};
let jh_insert_workers = spawn_scylla_insert_workers(
opts.scylla().clone(),
opts.insert_scylla_sessions(),
@@ -208,7 +213,7 @@ pub async fn ca_connect(opts: CaIngestOpts, channels: &Vec<String>) -> Result<()
pg_client.clone(),
store_stats.clone(),
opts.use_rate_limit_queue(),
opts.clone(),
ttls,
)
.await?;

View File

@@ -45,7 +45,6 @@ struct SearchBatch {
#[derive(Debug)]
pub struct FindIocRes {
pub channel: String,
pub query_addr: Option<SocketAddrV4>,
pub response_addr: Option<SocketAddrV4>,
pub addr: Option<SocketAddrV4>,
pub dt: Duration,
@@ -392,8 +391,6 @@ impl FindIocStream {
let dt = tsnow.saturating_duration_since(batch.ts_beg);
let res = FindIocRes {
channel: ch.into(),
// TODO associate a batch with a specific query address.
query_addr: None,
response_addr: Some(src.clone()),
addr: Some(addr),
dt,
@@ -463,7 +460,6 @@ impl FindIocStream {
}
for ((sid, ch), dt) in sids.into_iter().zip(chns).zip(dts) {
let res = FindIocRes {
query_addr: None,
response_addr: None,
channel: ch,
addr: None,

View File

@@ -60,8 +60,8 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec<String>) -> Result<(),
const TEXT: tokio_postgres::types::Type = tokio_postgres::types::Type::TEXT;
pg_client
.prepare_typed(
"insert into ioc_by_channel_log (facility, channel, queryaddr, responseaddr, addr) values ($1, $2, $3, $4, $5)",
&[TEXT, TEXT, TEXT, TEXT, TEXT],
"insert into ioc_by_channel_log (facility, channel, responseaddr, addr) values ($1, $2, $3, $4)",
&[TEXT, TEXT, TEXT, TEXT],
)
.await
.unwrap()
@@ -154,14 +154,10 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec<String>) -> Result<(),
if do_block {
info!("blacklisting {item:?}");
} else {
let queryaddr = item.query_addr.map(|x| x.to_string());
let responseaddr = item.response_addr.map(|x| x.to_string());
let addr = item.addr.map(|x| x.to_string());
pg_client
.execute(
&qu_insert,
&[&opts.backend(), &item.channel, &queryaddr, &responseaddr, &addr],
)
.execute(&qu_insert, &[&opts.backend(), &item.channel, &responseaddr, &addr])
.await
.unwrap();
}

View File

@@ -1,6 +1,5 @@
use crate::ca::store::DataStore;
use crate::ca::IngestCommons;
use crate::conf::CaIngestOpts;
use crate::rt::JoinHandle;
use crate::store::{CommonInsertItemQueue, IntoSimplerError, QueryItem};
use err::Error;
@@ -44,6 +43,12 @@ async fn back_off_sleep(backoff_dt: &mut Duration) {
tokio::time::sleep(*backoff_dt).await;
}
/// Time-to-live settings handed to the scylla insert workers.
///
/// At the worker spawn site these map to `ttl_msp` (`index`), `ttl_0d` (`d0`)
/// and `ttl_1d` (`d1`). All fields are plain `Duration`s, so the struct is
/// cheap to copy; deriving `Copy` keeps existing move-by-value call sites
/// working unchanged while allowing reuse of a single config value.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Ttls {
// TTL for the MSP/index rows.
pub index: Duration,
// TTL for the d0 data rows.
pub d0: Duration,
// TTL for the d1 data rows.
pub d1: Duration,
}
pub async fn spawn_scylla_insert_workers(
scyconf: ScyllaConfig,
insert_scylla_sessions: usize,
@@ -53,7 +58,7 @@ pub async fn spawn_scylla_insert_workers(
pg_client: Arc<PgClient>,
store_stats: Arc<stats::CaConnStats>,
use_rate_limit_queue: bool,
opts: CaIngestOpts,
ttls: Ttls,
) -> Result<Vec<JoinHandle<()>>, Error> {
let (q2_tx, q2_rx) = async_channel::bounded(insert_item_queue.receiver().capacity().unwrap_or(20000));
{
@@ -125,9 +130,9 @@ pub async fn spawn_scylla_insert_workers(
insert_item_queue.receiver()
};
let ingest_commons = ingest_commons.clone();
let ttl_msp = opts.ttl_index();
let ttl_0d = opts.ttl_d0();
let ttl_1d = opts.ttl_d1();
let ttl_msp = ttls.index;
let ttl_0d = ttls.d0;
let ttl_1d = ttls.d1;
let fut = async move {
let backoff_0 = Duration::from_millis(10);
let mut backoff = backoff_0.clone();