From 16aec2732b53510f04da4527d66314e22eec7226 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 28 May 2025 17:48:45 +0200 Subject: [PATCH] Log fix --- daqingest/src/daemon.rs | 22 +++++----- daqingest/src/daemon/inserthook.rs | 2 +- daqingest/src/tools.rs | 4 +- dbpg/src/findaddr.rs | 6 +-- dbpg/src/seriesbychannel.rs | 24 +++++------ log/src/log.rs | 18 ++++---- netfetch/src/ca/beacons.rs | 4 +- netfetch/src/ca/conn.rs | 66 ++++++++++++++++-------------- netfetch/src/ca/connset.rs | 22 +++++----- netfetch/src/ca/finder.rs | 12 +++--- netfetch/src/ca/findioc.rs | 24 +++++------ netfetch/src/ca/search.rs | 4 +- scywr/src/insertworker.rs | 14 +++---- 13 files changed, 115 insertions(+), 107 deletions(-) diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index f5c7c8e..c683ecd 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -526,7 +526,7 @@ impl Daemon { self.count_no_address, self.count_unassigned, self.count_assigned, - self.insert_queue_counter.load(atomic::Ordering::Acquire), + self.insert_queue_counter.load(atomic::Ordering::Acquire) ); } let iqtxm = self @@ -632,7 +632,7 @@ impl Daemon { self.daemon_metrics.caconnset_health_response().inc(); } Error(e) => { - error!("error from CaConnSet: {e}"); + error!("error from CaConnSet: {}", e); self.handle_shutdown().await?; } Metrics(x) => { @@ -723,7 +723,7 @@ impl Daemon { Ok(()) } Err(e) => { - error!("{e}"); + error!("handle_config_reload {}", e); if tx.send(127).await.is_err() { self.daemon_metrics.channel_send_err().inc(); } @@ -806,7 +806,7 @@ impl Daemon { if SIGINT.load(atomic::Ordering::Acquire) != 0 || SIGTERM.load(atomic::Ordering::Acquire) != 0 { if SHUTDOWN_SENT.load(atomic::Ordering::Acquire) == 0 { if let Err(e) = tx.send(DaemonEvent::Shutdown).await { - error!("can not send TimerTick {e}"); + error!("can not send TimerTick {}", e); break; } else { SHUTDOWN_SENT.store(1, atomic::Ordering::Release); @@ -814,7 +814,7 @@ impl Daemon { } } if let Err(e) = tx.send(DaemonEvent::TimerTick(0, ticker_inp_tx.clone())).await { - error!("can not send TimerTick {e}"); + error!("can not send TimerTick {}", e); break; } let c = ticker_inp_rx.len().max(1); @@ -878,12 +878,12 @@ impl Daemon { Ok(item) => match self.handle_event(item).await { Ok(()) => {} Err(e) => { - error!("fn daemon: error from handle_event {e}"); + error!("fn daemon: error from handle_event {}", e); break; } }, Err(e) => { - error!("{e}"); + error!("daemon {}", e); break; } } @@ -900,11 +900,11 @@ impl Daemon { Ok(x) => match x { Ok(()) => {} Err(e) => { - error!("joined insert worker, error {e}"); + error!("joined insert worker, error {}", e); } }, Err(e) => { - error!("insert worker join error {e}"); + error!("insert worker join error {}", e); } } } @@ -1007,7 +1007,7 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> { Ok(()) => {} Err(e) => { - error!("{e}"); + error!("daemon run {}", e); break; } } @@ -1021,6 +1021,6 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> ); } daemon_jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??; - info!("Joined daemon"); + info!("joined daemon"); Ok(()) } diff --git a/daqingest/src/daemon/inserthook.rs b/daqingest/src/daemon/inserthook.rs index 297c44a..f6f9df4 100644 --- a/daqingest/src/daemon/inserthook.rs +++ b/daqingest/src/daemon/inserthook.rs @@ -41,7 +41,7 @@ pub async fn active_channel_insert_hook_worker(rx: Receiver, tx: Send match tx.send(item).await { Ok(_) => {} Err(e) => { - error!("insert queue hook send {e}"); + error!("insert queue hook send {}", e); break; } } diff --git a/daqingest/src/tools.rs b/daqingest/src/tools.rs index 3385a83..ea822cc 100644 --- a/daqingest/src/tools.rs +++ b/daqingest/src/tools.rs @@ -256,7 +256,7 @@ async fn remove_older_all_series(ts_cut: TsMs, series: SeriesId, stmts: &Stmts, n_keep, n_remove, frac, - series, + series ); } Ok(()) @@ -279,7 +279,7 @@ async fn remove_older_all_series_msps( .buffer_unordered(32) .take_while(|x| { if let Err(e) = &x { - error!("{e}"); + error!("{}", e); } future::ready(x.is_ok()) }) diff --git a/dbpg/src/findaddr.rs b/dbpg/src/findaddr.rs index 6ca604b..e7b9a98 100644 --- a/dbpg/src/findaddr.rs +++ b/dbpg/src/findaddr.rs @@ -27,12 +27,12 @@ async fn __find_channel_addr(backend: &str, name: String, pg: &PgClient) -> Resu Ok(addr) => match addr.parse::() { Ok(addr) => return Ok(Some(addr)), Err(e) => { - error!("can not parse {e:?}"); + error!("can not parse {}", e); return Err(Error::IocAddrNotFound); } }, Err(e) => { - error!("can not find addr for {name} {e:?}"); + error!("can not find addr for {:?} {}", name, e); } } } @@ -75,7 +75,7 @@ async fn __query_addr_multiple(backend: &str, pg_client: &PgClient) -> Result<() let addr: SocketAddrV4 = match addr.parse() { Ok(k) => k, Err(e) => { - error!("can not parse {addr:?} for channel {ch:?} {e:?}"); + error!("can not parse {:?} for channel {:?} {}", addr, ch, e); continue; } }; diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index eab5e64..5dd944d 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -229,14 +229,14 @@ impl Worker { } } Err(e) => { - warn!("commit error {e}"); + warn!("commit error {}", e); self.pg.execute("rollback", &[]).await?; tokio::time::sleep(Duration::from_millis(1000)).await; } }; } Err(e) => { - error!("transaction error {e}"); + error!("transaction error {}", e); self.pg.execute("rollback", &[]).await?; } }; @@ -551,7 +551,7 @@ impl Worker { } async fn update_used_before(&self, sid: Vec) -> Result<(), Error> { - debug!("update_used_before {sid:?}"); + debug!("update_used_before {:?}", sid); if sid.contains(&1605348259462543621) { debug!("UPDATE TSC FOR 1605348259462543621"); } @@ -565,7 +565,7 @@ impl Worker { .prepare_typed(sql, &[tokio_postgres::types::Type::INT8_ARRAY]) .await?; let n = self.pg.execute(&qu, &[&sid]).await?; - trace!("update_used_before n {n}"); + trace!("update_used_before n {}", n); Ok(()) } } @@ -649,22 +649,22 @@ async fn psql_play(db: &Database) -> Result<(), Error> { // let qu = pg.prepare(sql).await?; let p1 = 4f64; let rows = pg.query(&qu, &[&p1]).await; - debug!("{rows:?}"); + debug!("{:?}", rows); let p1 = &[12i32, 13, 14][..]; let rows = pg.query(&qu, &[&p1]).await; - debug!("{rows:?}"); + debug!("{:?}", rows); let p1 = vec![12i32, 13, 14]; let rows = pg.query(&qu, &[&p1]).await; - debug!("{rows:?}"); + debug!("{:?}", rows); let p1 = vec![12i64, 13, 14]; let rows = pg.query(&qu, &[&p1]).await; - debug!("{rows:?}"); + debug!("{:?}", rows); let p1 = vec![String::from("a"), String::from("b")]; let rows = pg.query(&qu, &[&p1]).await; - debug!("{rows:?}"); + debug!("{:?}", rows); let p1 = vec![vec![4i64, 8], vec![10, 12]]; let rows = pg.query(&qu, &[&p1]).await; - debug!("{rows:?}"); + debug!("{:?}", rows); } if false { let sql = concat!( @@ -807,7 +807,7 @@ fn test_series_by_channel_01() { let mut series_ids = Vec::new(); for rx in rxs { let res = rx.recv().await.unwrap(); - debug!("received A: {res:?}"); + debug!("received A: {:?}", res); series_ids.push(res.unwrap().series); } { @@ -830,7 +830,7 @@ fn test_series_by_channel_01() { }; channel_info_query_tx.send(item).await.unwrap(); let res = rx.recv().await.unwrap(); - debug!("received C: {res:?}"); + debug!("received C: {:?}", res); { let rows = pg diff --git a/log/src/log.rs b/log/src/log.rs index 5bb6c52..e08dc55 100644 --- a/log/src/log.rs +++ b/log/src/log.rs @@ -1,10 +1,12 @@ #![allow(unused_imports)] -pub use tracing::debug; -pub use tracing::error; +// pub use tracing::debug; +// pub use tracing::error; // pub use tracing::info; pub use tracing::trace; pub use tracing::warn; +pub use direct_debug as debug; +pub use direct_error as error; pub use direct_info as info; pub mod log_macros_direct { @@ -15,7 +17,7 @@ pub mod log_macros_direct { eprintln!(concat!("TRACE ", $fmt)); }; ($fmt:expr, $($arg:expr),*) => { - eprintln!(concat!("TRACE ", $fmt), $($arg)*); + eprintln!(concat!("TRACE ", $fmt), $($arg),*); }; } #[allow(unused)] @@ -25,7 +27,7 @@ pub mod log_macros_direct { eprintln!(concat!("DEBUG ", $fmt)); }; ($fmt:expr, $($arg:expr),*) => { - eprintln!(concat!("DEBUG ", $fmt), $($arg)*); + eprintln!(concat!("DEBUG ", $fmt), $($arg),*); }; } #[allow(unused)] @@ -34,8 +36,8 @@ pub mod log_macros_direct { ($fmt:expr) => { eprintln!(concat!("INFO ", $fmt)); }; - ($fmt:expr, $($arg:tt)*) => { - eprintln!(concat!("INFO ", $fmt), $($arg)*); + ($fmt:expr, $($arg:expr),*) => { + eprintln!(concat!("INFO ", $fmt), $($arg),*); }; } #[allow(unused)] @@ -45,7 +47,7 @@ pub mod log_macros_direct { eprintln!(concat!("WARN ", $fmt)); }; ($fmt:expr, $($arg:expr),*) => { - eprintln!(concat!("WARN ", $fmt), $($arg)*); + eprintln!(concat!("WARN ", $fmt), $($arg),*); }; } #[allow(unused)] @@ -55,7 +57,7 @@ pub mod log_macros_direct { eprintln!(concat!("ERROR ", $fmt)); }; ($fmt:expr, $($arg:expr),*) => { - eprintln!(concat!("ERROR ", $fmt), $($arg)*); + eprintln!(concat!("ERROR ", $fmt), $($arg),*); }; } } diff --git a/netfetch/src/ca/beacons.rs b/netfetch/src/ca/beacons.rs index 6740272..cce0db1 100644 --- a/netfetch/src/ca/beacons.rs +++ b/netfetch/src/ca/beacons.rs @@ -72,7 +72,7 @@ pub async fn listen_beacons( } }?; if n != 16 { - debug!("len recv {n}"); + debug!("len recv {}", n); } if n >= 16 { let mut cur = Cursor::new(bb); @@ -84,7 +84,7 @@ pub async fn listen_beacons( let addr_u32 = cur.get_u32(); let addr = Ipv4Addr::from(addr_u32); if cmd == 0x0d { - debug!("beacon {remote} {ver} {addr} {port}"); + debug!("beacon {} {} {} {}", remote, ver, addr, port); let stnow = SystemTime::now(); let x = stnow.duration_since(SystemTime::UNIX_EPOCH).unwrap(); let ts = TsNano::from_ms(1000 * x.as_secs() + x.subsec_millis() as u64); diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 91025fc..b511440 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1589,7 +1589,7 @@ impl CaConn { } pub fn channel_add(&mut self, conf: ChannelConfig, cssid: ChannelStatusSeriesId) -> Result<(), Error> { - debug!("channel_add {conf:?} {cssid:?}"); + debug!("channel_add {:?} {:?}", conf, cssid); if false { if series::dbg::dbg_chn(&conf.name()) { self.trace_channel_poll = true; @@ -1598,7 +1598,7 @@ impl CaConn { if self.cid_by_name(conf.name()).is_some() { self.mett.channel_add_exists().inc(); if series::dbg::dbg_chn(&conf.name()) { - error!("logic error channel already exists {conf:?}"); + error!("logic error channel already exists {:?}", conf); } Ok(()) } else { @@ -1606,7 +1606,7 @@ impl CaConn { if self.channels.contains_key(&cid) { self.mett.channel_add_exists().inc(); if series::dbg::dbg_chn(&conf.name()) { - error!("logic error channel already exists {conf:?}"); + error!("logic error channel already exists {:?}", conf); } Ok(()) } else { @@ -1688,7 +1688,7 @@ impl CaConn { if let Some(cid) = self.cid_by_name(&name) { self.channel_remove_by_cid(cid); } else { - warn!("channel_remove does not exist {name}"); + warn!("channel_remove does not exist {}", name); } } @@ -1803,7 +1803,7 @@ impl CaConn { // as logic error. // Close connection to the IOC. Cout as logic error. let e = Error::UnknownCid(cid); - error!("{e}"); + error!("transition_to_polling {}", e); return Err(e); }; if let ChannelState::Writable(st2) = ch_s { @@ -1848,7 +1848,10 @@ impl CaConn { (&mut x.state, &mut x.wrst, &x.conf) } else { // TODO return better as error and let caller decide (with more structured errors) - warn!("TODO handle_event_add_res can not find channel for {cid:?} {subid:?}"); + warn!( + "TODO handle_event_add_res can not find channel for {:?} {:?}", + cid, subid + ); // TODO // When removing a channel, keep it in "closed" btree for some time because messages can // still arrive from all buffers. @@ -1912,7 +1915,7 @@ impl CaConn { match &mut st.reading { ReadingState::EnableMonitoring(st2) => { let dt = st2.tsbeg.elapsed().as_secs_f32(); - trace!("change to Monitoring after dt {dt:.0} ms"); + trace!("change to Monitoring after dt {:.0} ms", dt); st.reading = ReadingState::Monitoring(MonitoringState { tsbeg: tsnow, subid: st2.subid, @@ -2024,7 +2027,10 @@ impl CaConn { &mut x.state } else { // TODO return better as error and let caller decide (with more structured errors) - warn!("TODO handle_event_add_res can not find channel for {cid:?} {subid:?}"); + warn!( + "TODO handle_event_add_res can not find channel for {:?} {:?}", + cid, subid + ); // TODO // When removing a channel, keep it in "closed" btree for some time because messages can // still arrive from all buffers. @@ -2050,7 +2056,7 @@ impl CaConn { } ReadingState::EnableMonitoring(..) => { let name = self.name_by_cid(cid); - warn!("received event-cancel but channel {name:?} in wrong state"); + warn!("received event-cancel but channel {:?} in wrong state", name); } ReadingState::Monitoring(st2) => { match &mut st2.mon2state { @@ -2061,16 +2067,16 @@ impl CaConn { Monitoring2State::ReadPending(_) => {} } let name = self.name_by_cid(cid); - warn!("received event-cancel but channel {name:?} in wrong state"); + warn!("received event-cancel but channel {:?} in wrong state", name); } ReadingState::Polling(..) => { let name = self.name_by_cid(cid); - warn!("received event-cancel but channel {name:?} in wrong state"); + warn!("received event-cancel but channel {:?} in wrong state", name); } }, _ => { // TODO count instead of print - error!("unexpected state: EventAddRes while having {ch_s:?}"); + error!("unexpected state: EventAddRes while having {:?}", ch_s); } } Ok(()) @@ -2099,7 +2105,7 @@ impl CaConn { let (ch_s, ch_wrst, ch_conf) = if let Some(x) = self.channels.get_mut(cid) { (&mut x.state, &mut x.wrst, &x.conf) } else { - warn!("handle_read_notify_res can not find channel for {cid:?} {ioid:?}"); + warn!("handle_read_notify_res can not find channel for {:?} {:?}", cid, ioid); return Ok(()); }; match ch_s { @@ -2254,7 +2260,7 @@ impl CaConn { } _ => { // TODO count instead of print - error!("unexpected state: ReadNotifyRes while having {ch_s:?}"); + error!("unexpected state: ReadNotifyRes while having {:?}", ch_s); } } } else { @@ -2462,32 +2468,32 @@ impl CaConn { CaMsgTy::VersionRes(n) => { // debug!("see incoming {:?} {:?}", self.remote_addr_dbg, msg); if n < 12 || n > 13 { - error!("see some unexpected version {n} channel search may not work."); + error!("see some unexpected version {} channel search may not work", n); Ready(Some(Ok(()))) } else { if n != 13 { - warn!("received peer version {n}"); + warn!("received peer version {}", n); } self.state = CaConnState::PeerReady; Ready(Some(Ok(()))) } } CaMsgTy::CreateChanRes(k) => { - warn!("got unexpected {k:?}",); + warn!("got unexpected {:?}", k); Ready(Some(Ok(()))) } CaMsgTy::AccessRightsRes(k) => { - warn!("got unexpected {k:?}",); + warn!("got unexpected {:?}", k); Ready(Some(Ok(()))) } k => { - warn!("got some other unhandled message: {k:?}"); + warn!("got some other unhandled message: {:?}", k); Ready(Some(Ok(()))) } }, }, Err(e) => { - error!("got error item from CaProto {e:?}"); + error!("got error item from CaProto {:?}", e); Ready(Some(Err(e.into()))) } }, @@ -2933,7 +2939,7 @@ impl CaConn { Ready(Some(Ok(()))) } Ready(Some(Err(e))) => { - error!("CaProto yields error: {e:?} remote {:?}", self.remote_addr_dbg); + error!("CaProto yields error: {} remote {:?}", e, self.remote_addr_dbg); self.trigger_shutdown(ShutdownReason::Protocol); Ready(Some(Err(e))) } @@ -3108,7 +3114,7 @@ impl CaConn { } Ok(Err(e)) => { use std::io::ErrorKind; - debug!("error connect to {addr} {e}"); + debug!("error connect to {} {}", addr, e); let addr = addr.clone(); self.emit_connection_status_item(ConnectionStatusItem { ts: self.tmp_ts_poll, @@ -3124,7 +3130,7 @@ impl CaConn { } Err(e) => { // TODO log with exponential backoff - debug!("timeout connect to {addr} {e}"); + debug!("timeout connect to {} {}", addr, e); let addr = addr.clone(); self.emit_connection_status_item(ConnectionStatusItem { ts: self.tmp_ts_poll, @@ -3204,7 +3210,7 @@ impl CaConn { continue; } Ready(None) => { - error!("handle_conn_state yields {x:?}"); + error!("handle_conn_state yields {:?}", x); return Err(Error::LoopInnerLogicError); } Pending => { @@ -3231,7 +3237,7 @@ impl CaConn { Ready(()) => match self.as_mut().handle_own_ticker(cx) { Ok(_) => Ok(Pending), Err(e) => { - error!("handle_own_ticker {e}"); + error!("handle_own_ticker {}", e); self.trigger_shutdown(ShutdownReason::InternalError); Err(e) } @@ -3267,7 +3273,7 @@ impl CaConn { match self.tick_writers() { Ok(()) => {} Err(e) => { - error!("error in writers: {e}"); + error!("error in writers: {}", e); } } } @@ -3486,7 +3492,7 @@ impl CaConn { let self_name = "attempt_flush_queue"; use Poll::*; if qu.len() != 0 { - trace_flush_queue!("{self_name} id {:10} len {}", id, qu.len()); + trace_flush_queue!("{} id {:10} len {}", self_name, id, qu.len()); } let mut have_progress = false; let mut i = 0; @@ -3509,7 +3515,7 @@ impl CaConn { if sp.is_sending() { match sp.poll_unpin(cx) { Ready(Ok(())) => { - trace_flush_queue!("{self_name} id {:10} send done", id); + trace_flush_queue!("{} id {:10} send done", self_name, id); have_progress = true; } Ready(Err(e)) => { @@ -3517,7 +3523,7 @@ impl CaConn { match e { SpErr::NoSendInProgress => return Err(Error::NotSending), SpErr::Closed(_) => { - error!("{self_name} queue closed id {:10}", id); + error!("{} queue closed id {:10}", self_name, id); return Err(Error::ClosedSending); } } @@ -3778,7 +3784,7 @@ impl Stream for CaConn { have_pending = true; } Err(e) => { - error!("{e}"); + error!("{}", e); self.state = CaConnState::EndOfStream; break Ready(Some(CaConnEvent::err_now(e))); } diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 5987326..6c3bb7f 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -655,7 +655,7 @@ impl CaConnSet { } } for cmd in cmds { - debug!("call handle_remove_channel {cmd:?}"); + debug!("call handle_remove_channel {:?}", cmd); self.handle_remove_channel(cmd)?; } if let Err(_) = cmd.restx.try_send(Ok(())) { @@ -727,7 +727,7 @@ impl CaConnSet { } } Err(e) => { - warn!("TODO handle error {e}"); + warn!("TODO handle error {}", e); Ok(()) } } @@ -887,7 +887,7 @@ impl CaConnSet { k.value = ChannelStateValue::ToRemove { addr: None }; } WithStatusSeriesIdStateInner::WithAddress { addr, state: _ } => { - debug!("send remove {ch:?} to {addr}"); + debug!("send remove {:?} to {}", ch, addr); let conn_ress = self .ca_conn_ress .get_mut(&SocketAddr::V4(addr.clone())) @@ -1140,7 +1140,7 @@ impl CaConnSet { } fn handle_ca_conn_eos(&mut self, addr: SocketAddr, reason: EndOfStreamReason) -> Result<(), Error> { - debug!("handle_ca_conn_eos {addr} {reason:?}"); + debug!("handle_ca_conn_eos {} {:?}", addr, reason); if let Some(e) = self.ca_conn_ress.remove(&addr) { self.mett.ca_conn_eos_ok().inc(); self.await_ca_conn_jhs.push_back((addr, e.jh)); @@ -1411,7 +1411,7 @@ impl CaConnSet { ActiveChannelState::WaitForStatusSeriesId { since } => { let dt = stnow.duration_since(*since).unwrap_or(Duration::ZERO); if dt > Duration::from_millis(20000) { - warn!("timeout can not get status series id for {ch:?}"); + warn!("timeout can not get status series id for {:?}", ch); *st2 = ActiveChannelState::Init { since: stnow }; } else { // TODO @@ -1675,7 +1675,7 @@ impl CaConnSet { Ready(Err(e)) => match e { scywr::senderpolling::Error::NoSendInProgress => { let e = Error::PushCmdsNoSendInProgress(*addr); - error!("{e}"); + error!("try_push_ca_conn_cmds {}", e); return Some(Ready(Err(e))); } scywr::senderpolling::Error::Closed(_) => { @@ -1816,7 +1816,7 @@ where Some(Ready(Ok(()))) } Ready(Err(e)) => { - error!("sender_polling_send {e}"); + error!("sender_polling_send {}", e); Some(Ready(Err(e.into()))) } Pending => Some(Pending), @@ -1871,7 +1871,7 @@ impl Stream for CaConnSet { penpro.mark_progress(); } Err(e) => { - error!("ticker {e}"); + error!("ticker {}", e); break Ready(Some(CaConnSetItem::Error(e))); } }, @@ -1893,15 +1893,15 @@ impl Stream for CaConnSet { match x { Ok(Ok(())) => { self.mett.ca_conn_task_join_done_ok().inc(); - debug!("CaConn {addr} finished well left {left}"); + debug!("CaConn {} finished well left {}", addr, left); } Ok(Err(e)) => { self.mett.ca_conn_task_join_done_err().inc(); - error!("CaConn {addr} task error: {e} left {left}"); + error!("CaConn {} task error: {} left {}", addr, e, left); } Err(e) => { self.mett.ca_conn_task_join_err().inc(); - error!("CaConn {addr} join error: {e} left {left}"); + error!("CaConn {} join error: {} left {}", addr, e, left); } } penpro.mark_progress(); diff --git a/netfetch/src/ca/finder.rs b/netfetch/src/ca/finder.rs index 9426348..8812ba2 100644 --- a/netfetch/src/ca/finder.rs +++ b/netfetch/src/ca/finder.rs @@ -240,7 +240,7 @@ async fn finder_network_if_not_found( let mut res = VecDeque::new(); let mut net = VecDeque::new(); for e in item { - trace!("{self_name} sees {e:?}"); + trace!("{} sees {:?}", self_name, e); if e.addr.is_none() { net.push_back(e.channel); } else { @@ -248,12 +248,12 @@ async fn finder_network_if_not_found( } } if let Err(_) = tx.send(res).await { - debug!("{self_name} res send error, break"); + debug!("{} res send error, break", self_name); break; } for ch in net { if let Err(_) = net_tx.send(ch).await { - debug!("{self_name} net ch send error, break"); + debug!("{} net ch send error, break", self_name); break 'outer; } } @@ -300,7 +300,7 @@ async fn process_net_result( } } Err(e) => { - warn!("error during network search: {e}"); + warn!("error during network search: {}", e); break; } } @@ -310,7 +310,7 @@ async fn process_net_result( trace!("process_net_result dbtx closed"); for (i, jh) in ioc_search_index_worker_jhs.into_iter().enumerate() { jh.await?; - trace!("process_net_result search index worker {i} awaited"); + trace!("process_net_result search index worker {} awaited", i); } Ok(()) } @@ -361,7 +361,7 @@ fn start_finder_ca(tx: Sender, tgts: Vec) -> (Sender< } Err(e) => { // TODO input is done... ignore from here on. - error!("Finder input channel error {e}"); + error!("Finder input channel error {}",e); qrx_more = false; } } diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index 9fd1c50..fa926c9 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -233,7 +233,7 @@ impl FindIocStream { let mut addr_len = std::mem::size_of::(); let ec = unsafe { libc::getsockname(sock.0, &mut addr as *mut _ as _, &mut addr_len as *mut _ as _) }; if ec == -1 { - error!("getsockname {ec}"); + error!("getsockname {}", ec); return Err(Error::SocketConvertTokio); } else { if true { @@ -303,7 +303,7 @@ impl FindIocStream { } } else if ec < 0 { // stats.ca_udp_io_error().inc(); - error!("unexpected received {ec}"); + error!("unexpected received {}", ec); Poll::Ready(Err(Error::ReadFailure)) } else if ec == 0 { // stats.ca_udp_io_empty().inc(); @@ -318,7 +318,7 @@ impl FindIocStream { for i in 0..(ec as usize) { s1.extend(format!(" {:02x}", buf[i]).chars()); } - debug!("received answer {s1}"); + debug!("received answer {}", s1); debug!( "received answer string {}", String::from_utf8_lossy(buf[..ec as usize].into()) @@ -367,12 +367,12 @@ impl FindIocStream { } if msgs.len() == 1 { // stats.ca_udp_warn().inc(); - debug!("received answer with single message: {msgs:?}"); + debug!("received answer with single message: {:?}", msgs); } let mut good = true; if let CaMsgTy::VersionRes(v) = msgs[0].ty { if v != 13 { - warn!("bad version in search response: {v}"); + warn!("bad version in search response: {}", v); good = false; } } else { @@ -484,7 +484,7 @@ impl FindIocStream { } } if !found_sid { - error!("can not find sid {sid:?} in batch {bid:?}"); + error!("can not find sid {:?} in batch {:?}", sid, bid); } let all_done = batch.done.iter().all(|x| *x); if all_done { @@ -494,7 +494,7 @@ impl FindIocStream { } None => { // TODO analyze reasons - error!("no batch for {bid:?}"); + error!("no batch for {:?}", bid); } } } @@ -503,7 +503,7 @@ impl FindIocStream { if self.sids_done.contains_key(&sid) { self.result_for_done_sid_count += 1; } else { - error!("no bid for {sid:?}"); + error!("no bid for {:?}", sid); } } } @@ -608,7 +608,7 @@ impl Stream for FindIocStream { have_progress = true; } Ready(Err(e)) => { - error!("{e}"); + error!("FindIocStream {}", e); } Pending => { g.clear_ready(); @@ -617,7 +617,7 @@ impl Stream for FindIocStream { } }, Ready(Err(e)) => { - error!("poll_write_ready {e}"); + error!("poll_write_ready {}", e); // TODO should we abort? } Pending => {} @@ -693,7 +693,7 @@ impl Stream for FindIocStream { continue; } Ready(Err(e)) => { - error!("Error from try_read {e:?}"); + error!("try_read {}", e); Ready(Some(Err(e))) } Pending => { @@ -706,7 +706,7 @@ impl Stream for FindIocStream { } } Ready(Err(e)) => { - error!("poll_read_ready {e:?}"); + error!("poll_read_ready {}", e); Ready(Some(Err(e.into()))) } Pending => { diff --git a/netfetch/src/ca/search.rs b/netfetch/src/ca/search.rs index e8c7a00..53ac540 100644 --- a/netfetch/src/ca/search.rs +++ b/netfetch/src/ca/search.rs @@ -90,7 +90,7 @@ async fn search_tgts_from_opts(opts: &CaIngestOpts) -> Result<(Vec } } Err(e) => { - error!("can not resolve {s} {e}"); + error!("can not resolve {} {}", s, e); } } } @@ -110,7 +110,7 @@ async fn search_tgts_from_opts(opts: &CaIngestOpts) -> Result<(Vec } } Err(e) => { - warn!("can not resolve {s} {e}"); + warn!("can not resolve {} {}", s, e); } } } diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index f7e0631..5972cf8 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -278,7 +278,7 @@ async fn worker_streamed( insert_worker_opts .insert_workers_running .fetch_sub(1, atomic::Ordering::AcqRel); - debug_setup!("insert worker {worker_ix} done"); + debug_setup!("insert worker {} done", worker_ix); Ok(()) } @@ -367,22 +367,22 @@ fn inspect_items( for item in batch { match &item { QueryItem::Insert(item) => { - trace_item_execute!("execute {worker_name} Insert {}", item.string_short()); + trace_item_execute!("execute {} Insert {}", worker_name, item.string_short()); } QueryItem::Msp(item) => { - trace_item_execute!("execute {worker_name} Msp {}", item.string_short()); + trace_item_execute!("execute {} Msp {}", worker_name, item.string_short()); } QueryItem::TimeBinSimpleF32V02(_) => { - trace_item_execute!("execute {worker_name} TimeBinSimpleF32V02"); + trace_item_execute!("execute {} TimeBinSimpleF32V02", worker_name); } QueryItem::BinWriteIndexV04(_) => { - trace_item_execute!("execute {worker_name} BinWriteIndexV04"); + trace_item_execute!("execute {} BinWriteIndexV04", worker_name); } QueryItem::Accounting(_) => { - trace_item_execute!("execute {worker_name} Accounting {item:?}"); + trace_item_execute!("execute {} Accounting {:?}", worker_name, item); } QueryItem::AccountingRecv(_) => { - trace_item_execute!("execute {worker_name} Accounting {item:?}"); + trace_item_execute!("execute {} Accounting {:?}", worker_name, item); } } }