From b3bd344f5df4423ac28336b6d09b6540a1eaf2f8 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 6 Dec 2022 21:45:46 +0100 Subject: [PATCH] Rate limit enable option --- daqingest/src/bin/daqingest.rs | 4 +++- netfetch/src/ca.rs | 27 ++++++++++++++++++--------- netfetch/src/ca/conn.rs | 1 + netfetch/src/ca/findioc.rs | 22 +++++++++++++--------- netfetch/src/ca/search.rs | 5 +++-- netfetch/src/insertworker.rs | 6 +++++- 6 files changed, 43 insertions(+), 22 deletions(-) diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index d8abdbd..ee08a69 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -5,8 +5,10 @@ use log::*; pub fn main() -> Result<(), Error> { let opts = DaqIngestOpts::parse(); - info!("daqingest version {}", clap::crate_version!()); + // TODO offer again function to get runtime and configure tracing in one call let runtime = taskrun::get_runtime_opts(opts.nworkers.unwrap_or(12), 32); + taskrun::tracing_init().unwrap(); + info!("daqingest version {}", clap::crate_version!()); let res = runtime.block_on(async move { match opts.subcmd { SubCmd::Bsread(k) => netfetch::zmtp::zmtp_client(k.into()).await?, diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 94bfdf3..e2e392c 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -63,6 +63,7 @@ struct ChannelConfig { local_epics_hostname: Option, store_workers_rate: Option, insert_frac: Option, + use_rate_limit_queue: Option, } #[test] @@ -103,7 +104,7 @@ pub struct ListenFromFileOpts { pub async fn parse_config(config: PathBuf) -> Result { let mut file = OpenOptions::new().read(true).open(config).await?; - let mut buf = vec![]; + let mut buf = Vec::new(); file.read_to_end(&mut buf).await?; let mut conf: ChannelConfig = serde_yaml::from_slice(&buf).map_err(|e| Error::with_msg_no_trace(format!("{:?}", e)))?; @@ -141,6 +142,7 @@ pub async fn parse_config(config: PathBuf) -> Result { local_epics_hostname: conf.local_epics_hostname.unwrap_or_else(local_hostname), store_workers_rate: conf.store_workers_rate.unwrap_or(10000), insert_frac: conf.insert_frac.unwrap_or(1000), + use_rate_limit_queue: conf.use_rate_limit_queue.unwrap_or(false), }) } @@ -163,6 +165,7 @@ pub struct CaConnectOpts { pub local_epics_hostname: String, pub store_workers_rate: u64, pub insert_frac: u64, + pub use_rate_limit_queue: bool, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -209,7 +212,7 @@ pub async fn find_channel_addr( let pg_client = Arc::new(pg_client); let qu_find_addr = pg_client .prepare( - "select t1.facility, t1.channel, t1.addr from ioc_by_channel t1 where t1.facility = $1 and t1.channel = $2", + "select t1.facility, t1.channel, t1.addr from ioc_by_channel_log t1 where t1.facility = $1 and t1.channel = $2 and addr is not null order by tsmod desc limit 1", ) .await .err_conv()?; @@ -219,16 +222,16 @@ pub async fn find_channel_addr( Err(Error::with_msg_no_trace(format!("no address for channel {}", name))) } else { for row in rows { - let addr: &str = row.get(2); - if addr == "" { - return Err(Error::with_msg_no_trace(format!("no address for channel {}", name))); - } else { - match addr.parse::() { + match row.try_get::<_, &str>(2) { + Ok(addr) => match addr.parse::() { Ok(addr) => return Ok(Some(addr)), Err(e) => { error!("can not parse {e:?}"); return Err(Error::with_msg_no_trace(format!("no address for channel {}", name))); } + }, + Err(e) => { + error!("can not find addr for {name} {e:?}"); } } } @@ -241,7 +244,7 @@ async fn query_addr_multiple(pg_client: &PgClient) -> Result<(), Error> { let backend: &String = err::todoval(); // TODO factor the find loop into a separate Stream. let qu_find_addr = pg_client - .prepare("with q1 as (select t1.facility, t1.channel, t1.addr from ioc_by_channel t1 where t1.facility = $1 and t1.channel in ($2, $3, $4, $5, $6, $7, $8, $9) and t1.addr != '' order by t1.tsmod desc) select distinct on (q1.facility, q1.channel) q1.facility, q1.channel, q1.addr from q1") + .prepare("with q1 as (select t1.facility, t1.channel, t1.addr from ioc_by_channel_log t1 where t1.facility = $1 and t1.channel in ($2, $3, $4, $5, $6, $7, $8, $9) and t1.addr is not null order by t1.tsmod desc) select distinct on (q1.facility, q1.channel) q1.facility, q1.channel, q1.addr from q1") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let mut chns_todo: &[String] = err::todoval(); @@ -308,9 +311,13 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { // TODO use a new type: let local_stats = Arc::new(CaConnStats::new()); + info!("fetch phonebook begin"); // Fetch all addresses for all channels. let rows = pg_client - .query("select channel, addr from ioc_by_channel", &[]) + .query( + "select distinct on (facility, channel) channel, addr from ioc_by_channel_log where channel is not null and addr is not null order by facility, channel, tsmod desc", + &[], + ) .await .err_conv()?; let mut phonebook = BTreeMap::new(); @@ -322,6 +329,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { .map_err(|_| Error::with_msg_no_trace(format!("can not parse address {addr}")))?; phonebook.insert(channel, addr); } + info!("fetch phonebook done"); let mut channels_by_host = BTreeMap::new(); @@ -352,6 +360,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { ingest_commons.clone(), pg_client.clone(), store_stats.clone(), + opts.use_rate_limit_queue, ) .await?; diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 9eda047..970ce1c 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1250,6 +1250,7 @@ impl CaConn { do_wake_again = true; } CaMsgTy::EventAddRes(k) => { + trace!("got EventAddRes: {k:?}"); self.stats.caconn_recv_data_inc(); let res = Self::handle_event_add_res(self, k, tsnow); let ts2 = Instant::now(); diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index 3f510d3..618d673 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -259,7 +259,7 @@ impl FindIocStream { } let mut nb = crate::netbuf::NetBuf::new(2048); nb.put_slice(&buf[..ec as usize])?; - let mut msgs = vec![]; + let mut msgs = Vec::new(); let mut accounted = 0; loop { let n = nb.data().len(); @@ -291,10 +291,14 @@ impl FindIocStream { if msgs.len() != 2 { info!("expect always 2 commands in the response, instead got {}", msgs.len()); } - let mut res = vec![]; + for m in &msgs { + debug!("m: {m:?}"); + } + let mut res = Vec::new(); for msg in msgs.iter() { match &msg.ty { CaMsgTy::SearchRes(k) => { + info!("SearchRes: {k:?}"); let addr = SocketAddrV4::new(src_addr, k.tcp_port); res.push((SearchId(k.id), addr)); } @@ -330,8 +334,8 @@ impl FindIocStream { fn create_in_flight(&mut self) { let bid = BATCH_ID.fetch_add(1, Ordering::AcqRel); let bid = BatchId(bid as u32); - let mut sids = vec![]; - let mut chs = vec![]; + let mut sids = Vec::new(); + let mut chs = Vec::new(); while chs.len() < self.channels_per_batch && self.channels_input.len() > 0 { let sid = SEARCH_ID2.fetch_add(1, Ordering::AcqRel); let sid = SearchId(sid as u32); @@ -344,14 +348,14 @@ impl FindIocStream { channels: chs, tgts: self.tgts.iter().enumerate().map(|x| x.0).collect(), sids, - done: vec![], + done: Vec::new(), }; self.in_flight.insert(bid.clone(), batch); self.batch_send_queue.push_back(bid); } fn handle_result(&mut self, src: SocketAddrV4, res: Vec<(SearchId, SocketAddrV4)>) { - let mut sids_remove = vec![]; + let mut sids_remove = Vec::new(); for (sid, addr) in res { self.sids_done.insert(sid.clone(), ()); match self.bid_by_sid.get(&sid) { @@ -420,9 +424,9 @@ impl FindIocStream { fn clear_timed_out(&mut self) { let now = Instant::now(); - let mut bids = vec![]; - let mut sids = vec![]; - let mut chns = vec![]; + let mut bids = Vec::new(); + let mut sids = Vec::new(); + let mut chns = Vec::new(); for (bid, batch) in &mut self.in_flight { if now.duration_since(batch.ts_beg) > self.batch_run_max { self.bids_timed_out.insert(bid.clone(), ()); diff --git a/netfetch/src/ca/search.rs b/netfetch/src/ca/search.rs index 661cbb1..2c711ae 100644 --- a/netfetch/src/ca/search.rs +++ b/netfetch/src/ca/search.rs @@ -46,6 +46,7 @@ async fn resolve_address(addr_str: &str) -> Result { } pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { + info!("ca_search begin"); let facility = "scylla"; let opts = parse_config(opts.config).await?; let d = Database { @@ -74,7 +75,7 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { .await .unwrap() }; - let mut addrs = vec![]; + let mut addrs = Vec::new(); for s in &opts.search { match resolve_address(s).await { Ok(addr) => { @@ -87,7 +88,7 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { } } let gw_addrs = { - let mut gw_addrs = vec![]; + let mut gw_addrs = Vec::new(); for s in &opts.search_blacklist { match resolve_address(s).await { Ok(addr) => { diff --git a/netfetch/src/insertworker.rs b/netfetch/src/insertworker.rs index 4f2a5a2..9d5599d 100644 --- a/netfetch/src/insertworker.rs +++ b/netfetch/src/insertworker.rs @@ -51,6 +51,7 @@ pub async fn spawn_scylla_insert_workers( ingest_commons: Arc, pg_client: Arc, store_stats: Arc, + use_rate_limit_queue: bool, ) -> Result>, Error> { let (q2_tx, q2_rx) = async_channel::bounded(insert_item_queue.receiver().capacity().unwrap_or(20000)); { @@ -59,6 +60,9 @@ pub async fn spawn_scylla_insert_workers( let recv = insert_item_queue.receiver(); let store_stats = store_stats.clone(); let fut = async move { + if !use_rate_limit_queue { + return; + } let mut ts_forward_last = Instant::now(); let mut ivl_ema = stats::Ema64::with_k(0.00001); loop { @@ -113,7 +117,7 @@ pub async fn spawn_scylla_insert_workers( for i1 in 0..insert_worker_count { let data_store = data_stores[i1 * data_stores.len() / insert_worker_count].clone(); let stats = store_stats.clone(); - let recv = if true { + let recv = if use_rate_limit_queue { q2_rx.clone() } else { insert_item_queue.receiver()