diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index a9b5cb2..d5c5714 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -6,6 +6,7 @@ pub mod store; use self::store::DataStore; use crate::ca::conn::ConnCommand; +use crate::errconv::ErrConv; use crate::store::{CommonInsertItemQueue, QueryItem}; use async_channel::Sender; use conn::CaConn; @@ -46,6 +47,10 @@ struct ChannelConfig { backend: String, channels: Vec, search: Vec, + #[serde(default)] + search_blacklist: Vec, + #[serde(default)] + tmp_remove: Vec, addr_bind: Option, addr_conn: Option, whitelist: Option, @@ -92,6 +97,7 @@ pub async fn parse_config(config: PathBuf) -> Result { backend: conf.backend, channels: conf.channels, search: conf.search, + search_blacklist: conf.search_blacklist, addr_bind: conf.addr_bind.unwrap_or(Ipv4Addr::new(0, 0, 0, 0)), addr_conn: conf.addr_conn.unwrap_or(Ipv4Addr::new(255, 255, 255, 255)), timeout: conf.timeout.unwrap_or(2000), @@ -111,6 +117,7 @@ pub struct CaConnectOpts { pub backend: String, pub channels: Vec, pub search: Vec, + pub search_blacklist: Vec, pub addr_bind: Ipv4Addr, pub addr_conn: Ipv4Addr, pub timeout: u64, @@ -265,9 +272,23 @@ impl CommandQueueSet { } } - pub fn queues(&self) -> &tokio::sync::Mutex>> { + pub async fn queues(&self) -> &tokio::sync::Mutex>> { &self.queues } + + pub async fn queues_locked(&self) -> tokio::sync::MutexGuard>> { + let mut g = self.queues.lock().await; + let mut rm = Vec::new(); + for (k, v) in g.iter() { + if v.is_closed() { + rm.push(*k); + } + } + for x in rm { + g.remove(&x); + } + g + } } pub struct IngestCommons { @@ -297,29 +318,32 @@ pub async fn find_channel_addr( tokio::spawn(pg_conn); let pg_client = Arc::new(pg_client); let qu_find_addr = pg_client - .prepare("with q1 as (select t1.facility, t1.channel, t1.addr from ioc_by_channel t1 where t1.facility = $1 and t1.channel in ($2) and t1.addr != '' order by t1.tsmod desc) select distinct on (q1.facility, q1.channel) q1.facility, q1.channel, q1.addr from q1") + .prepare( + "select t1.facility, t1.channel, t1.addr from ioc_by_channel t1 where t1.facility = $1 and t1.channel = $2", + ) .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let rows = pg_client - .query(&qu_find_addr, &[&backend, &name]) - .await - .map_err(|e| Error::with_msg_no_trace(format!("pg lookup error: {e:?}")))?; + .err_conv()?; + let rows = pg_client.query(&qu_find_addr, &[&backend, &name]).await.err_conv()?; if rows.is_empty() { error!("can not find any addresses of channels {:?}", name); + Err(Error::with_msg_no_trace(format!("no address for channel {}", name))) } else { for row in rows { let addr: &str = row.get(2); if addr == "" { - return Ok(None); + return Err(Error::with_msg_no_trace(format!("no address for channel {}", name))); } else { match addr.parse::() { Ok(addr) => return Ok(Some(addr)), - Err(_) => return Ok(None), + Err(e) => { + error!("can not parse {e:?}"); + return Err(Error::with_msg_no_trace(format!("no address for channel {}", name))); + } } } } + Ok(None) } - Ok(None) } pub async fn create_ca_conn( @@ -348,7 +372,8 @@ pub async fn create_ca_conn( let stats2 = conn.stats(); let conn_command_tx = conn.conn_command_tx(); { - command_queue_set.queues().lock().await.insert(addr, conn_command_tx); + //command_queue_set.queues().lock().await.insert(addr, conn_command_tx); + command_queue_set.queues_locked().await.insert(addr, conn_command_tx); } let conn_block = async move { let mut conn = conn; @@ -409,6 +434,22 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { .prepare("with q1 as (select t1.facility, t1.channel, t1.addr from ioc_by_channel t1 where t1.facility = $1 and t1.channel in ($2, $3, $4, $5, $6, $7, $8, $9) and t1.addr != '' order by t1.tsmod desc) select distinct on (q1.facility, q1.channel) q1.facility, q1.channel, q1.addr from q1") .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + + // Fetch all addresses for all channels. + let rows = pg_client + .query("select channel, addr from ioc_by_channel", &[]) + .await + .err_conv()?; + let mut phonebook = BTreeMap::new(); + for row in rows { + let channel: String = row.get(0); + let addr: String = row.get(1); + let addr: SocketAddrV4 = addr + .parse() + .map_err(|_| Error::with_msg_no_trace(format!("can not parse address {addr}")))?; + phonebook.insert(channel, addr); + } + let mut channels_by_host = BTreeMap::new(); let data_store = Arc::new(DataStore::new(pg_client.clone(), scy.clone()).await?); @@ -481,30 +522,28 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let mut chstmp = ["__NONE__"; 8]; let mut ix = 0; while chns_todo.len() > 0 { - for (s1, s2) in chns_todo.iter().zip(chstmp.iter_mut()) { - *s2 = s1; - } - chns_todo = &chns_todo[chstmp.len().min(chns_todo.len())..]; - let rows = pg_client - .query( - &qu_find_addr, - &[ - &opts.backend, - &chstmp[0], - &chstmp[1], - &chstmp[2], - &chstmp[3], - &chstmp[4], - &chstmp[5], - &chstmp[6], - &chstmp[7], - ], - ) - .await - .map_err(|e| Error::with_msg_no_trace(format!("pg lookup error: {e:?}")))?; - if rows.is_empty() { - error!("can not find any addresses of channels {:?}", chstmp); - } else { + if false { + for (s1, s2) in chns_todo.iter().zip(chstmp.iter_mut()) { + *s2 = s1; + } + chns_todo = &chns_todo[chstmp.len().min(chns_todo.len())..]; + let rows = pg_client + .query( + &qu_find_addr, + &[ + &opts.backend, + &chstmp[0], + &chstmp[1], + &chstmp[2], + &chstmp[3], + &chstmp[4], + &chstmp[5], + &chstmp[6], + &chstmp[7], + ], + ) + .await + .map_err(|e| Error::with_msg_no_trace(format!("pg lookup error: {e:?}")))?; for row in rows { let ch: &str = row.get(1); let addr: &str = row.get(2); @@ -521,76 +560,84 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { continue; } }; - ix += 1; - if ix % 1000 == 0 { - info!("{} of {} {} {:?}", ix, opts.channels.len(), ch, addr); - } - if !channels_by_host.contains_key(&addr) { - channels_by_host.insert(addr, vec![ch.to_string()]); + let _ = addr; + } + } + } + if let Some(ch) = chns_todo.first() { + let ch = ch.clone(); + chns_todo = &chns_todo[1..]; + if let Some(addr) = phonebook.get(&ch) { + if !channels_by_host.contains_key(&addr) { + channels_by_host.insert(addr, vec![ch.to_string()]); + } else { + channels_by_host.get_mut(&addr).unwrap().push(ch.to_string()); + } + let create_new = { + let g = command_queue_set.queues_locked().await; + if let Some(tx) = g.get(&addr) { + let (cmd, rx) = ConnCommand::channel_add(ch.to_string()); + tx.send(cmd).await.unwrap(); + if !rx.recv().await.unwrap() { + error!("Could not add channel: {}", ch); + } + false } else { - channels_by_host.get_mut(&addr).unwrap().push(ch.to_string()); + true } + }; + if create_new { + info!("create new CaConn {:?} {:?}", addr, ch); + let data_store = data_store.clone(); + let conn = CaConn::new( + addr.clone(), + opts.local_epics_hostname.clone(), + data_store.clone(), + insert_item_queue.sender(), + opts.array_truncate, + opts.insert_queue_max, + insert_ivl_min.clone(), + ); + conn_stats.lock().await.push(conn.stats()); + let stats2 = conn.stats(); + let conn_command_tx = conn.conn_command_tx(); + let tx = conn_command_tx.clone(); { - let create_new = { - let g = command_queue_set.queues().lock().await; - if let Some(tx) = g.get(&addr) { - let (cmd, rx) = ConnCommand::channel_add(ch.to_string()); - tx.send(cmd).await.unwrap(); - if !rx.recv().await.unwrap() { - error!("Could not add channel: {}", ch); + command_queue_set + .queues_locked() + .await + .insert(addr.clone(), conn_command_tx); + } + let conn_block = async move { + let mut conn = conn; + while let Some(item) = conn.next().await { + match item { + Ok(_) => { + stats2.conn_item_count_inc(); } - false - } else { - true - } - }; - if create_new { - info!("create new CaConn {:?} {:?}", addr, ch); - let data_store = data_store.clone(); - let conn = CaConn::new( - addr, - opts.local_epics_hostname.clone(), - data_store.clone(), - insert_item_queue.sender(), - opts.array_truncate, - opts.insert_queue_max, - insert_ivl_min.clone(), - ); - conn_stats.lock().await.push(conn.stats()); - let stats2 = conn.stats(); - let conn_command_tx = conn.conn_command_tx(); - let tx = conn_command_tx.clone(); - { - command_queue_set.queues().lock().await.insert(addr, conn_command_tx); - } - let conn_block = async move { - let mut conn = conn; - while let Some(item) = conn.next().await { - match item { - Ok(_) => { - stats2.conn_item_count_inc(); - } - Err(e) => { - error!("CaConn gives error: {e:?}"); - break; - } - } - } - Ok::<_, Error>(()) - }; - let jh = tokio::spawn(conn_block); - conn_jhs.push(jh); - { - let (cmd, rx) = ConnCommand::channel_add(ch.to_string()); - tx.send(cmd).await.unwrap(); - if !rx.recv().await.unwrap() { - error!("Could not add channel: {}", ch); + Err(e) => { + error!("CaConn gives error: {e:?}"); + break; } } } + Ok::<_, Error>(()) + }; + let jh = tokio::spawn(conn_block); + conn_jhs.push(jh); + { + let (cmd, rx) = ConnCommand::channel_add(ch.to_string()); + tx.send(cmd).await.unwrap(); + if !rx.recv().await.unwrap() { + error!("Could not add channel: {}", ch); + } } } } + ix += 1; + if ix % 1000 == 0 { + info!("{} of {} {}", ix, opts.channels.len(), ch); + } } } info!("channels_by_host len {}", channels_by_host.len()); diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 32ba035..ce625e5 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -852,12 +852,12 @@ impl CaConn { }, Err(e) => { error!("got error item from CaProto {e:?}"); - Ready(Some(Ok(()))) + Ready(Some(Err(e))) } }, Ready(None) => { - warn!("CaProto is done {:?}", self.remote_addr_dbg); - self.state = CaConnState::Wait(wait_fut(10000)); + warn!("handle_conn_listen CaProto is done {:?}", self.remote_addr_dbg); + self.state = CaConnState::Wait(wait_fut(self.backoff_next())); self.proto = None; Ready(None) } @@ -1014,8 +1014,8 @@ impl CaConn { Ready(Some(Err(e))) } Ready(None) => { - warn!("CaProto is done"); - self.state = CaConnState::Wait(wait_fut(10000)); + warn!("handle_peer_ready CaProto is done {:?}", self.remote_addr_dbg); + self.state = CaConnState::Wait(wait_fut(self.backoff_next())); self.proto = None; Ready(None) } @@ -1028,6 +1028,8 @@ impl CaConn { } res } + + //fn loop_inner(&mut self, cx: &mut Context) } impl Stream for CaConn { @@ -1160,6 +1162,7 @@ impl Stream for CaConn { }, CaConnState::PeerReady => { { + // TODO can I move this block somewhere else? let _ = self.handle_get_series_futs(cx)?; let ts2 = Instant::now(); self.stats @@ -1180,10 +1183,7 @@ impl Stream for CaConn { } } Ready(Some(Err(e))) => Ready(Some(Err(e))), - Ready(None) => { - // TODO even though protocol is done, we might still have e.g. insert items to flush! - Ready(None) - } + Ready(None) => continue 'outer, Pending => Pending, } } diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index 4cb1e7f..3f510d3 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -44,8 +44,9 @@ struct SearchBatch { #[derive(Debug)] pub struct FindIocRes { - pub src: SocketAddrV4, pub channel: String, + pub query_addr: Option, + pub response_addr: Option, pub addr: Option, } @@ -90,7 +91,7 @@ impl FindIocStream { bids_timed_out: BTreeMap::new(), sids_done: BTreeMap::new(), result_for_done_sid_count: 0, - in_flight_max: 40, + in_flight_max: 20, channels_per_batch: 10, batch_run_max: Duration::from_millis(2500), } @@ -240,12 +241,6 @@ impl FindIocStream { let saddr2: libc::sockaddr_in = std::mem::transmute_copy(&saddr_mem); let src_addr = Ipv4Addr::from(saddr2.sin_addr.s_addr.to_ne_bytes()); let src_port = u16::from_be(saddr2.sin_port); - trace!( - "received from src_addr {:?} src_port {} ec {}", - src_addr, - src_port, - ec - ); if false { let mut s1 = String::new(); for i in 0..(ec as usize) { @@ -257,10 +252,15 @@ impl FindIocStream { String::from_utf8_lossy(buf[..ec as usize].into()) ); } - // TODO handle if we get a too large answer. + if ec > 2048 { + // TODO handle if we get a too large answer. + error!("received packet too large"); + panic!(); + } let mut nb = crate::netbuf::NetBuf::new(2048); nb.put_slice(&buf[..ec as usize])?; let mut msgs = vec![]; + let mut accounted = 0; loop { let n = nb.data().len(); if n == 0 { @@ -271,6 +271,11 @@ impl FindIocStream { break; } let hi = HeadInfo::from_netbuf(&mut nb)?; + if hi.cmdid() == 0 && hi.payload() == 0 { + } else if hi.cmdid() == 6 && hi.payload() == 8 { + } else { + info!("cmdid {} payload {}", hi.cmdid(), hi.payload()); + } if nb.data().len() < hi.payload() { error!("incomplete message, missing payload"); break; @@ -278,6 +283,13 @@ impl FindIocStream { let msg = CaMsg::from_proto_infos(&hi, nb.data(), 32)?; nb.adv(hi.payload())?; msgs.push(msg); + accounted += 16 + hi.payload(); + } + if accounted != ec as usize { + info!("unaccounted data ec {} accounted {}", ec, accounted); + } + if msgs.len() != 2 { + info!("expect always 2 commands in the response, instead got {}", msgs.len()); } let mut res = vec![]; for msg in msgs.iter() { @@ -286,7 +298,10 @@ impl FindIocStream { let addr = SocketAddrV4::new(src_addr, k.tcp_port); res.push((SearchId(k.id), addr)); } - _ => {} + CaMsgTy::VersionRes(13) => {} + _ => { + warn!("try_read: unknown message received {:?}", msg.ty); + } } } Poll::Ready(Ok((SocketAddrV4::new(src_addr, src_port), res))) @@ -344,14 +359,17 @@ impl FindIocStream { sids_remove.push(sid.clone()); match self.in_flight.get_mut(bid) { Some(batch) => { + // TGT for (i2, s2) in batch.sids.iter().enumerate() { if s2 == &sid { match batch.channels.get(i2) { Some(ch) => { let res = FindIocRes { channel: ch.into(), + // TODO associate a batch with a specific query address. + query_addr: None, + response_addr: Some(src.clone()), addr: Some(addr), - src: src.clone(), }; self.out_queue.push_back(res); } @@ -420,7 +438,8 @@ impl FindIocStream { } for (sid, ch) in sids.into_iter().zip(chns) { let res = FindIocRes { - src: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0), + query_addr: None, + response_addr: None, channel: ch, addr: None, }; diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 5edacd4..596add7 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -723,6 +723,10 @@ impl HeadInfo { Ok(hi) } + pub fn cmdid(&self) -> u16 { + self.cmdid + } + pub fn payload(&self) -> usize { self.payload_size as _ } diff --git a/netfetch/src/ca/search.rs b/netfetch/src/ca/search.rs index 298d1b1..98aceae 100644 --- a/netfetch/src/ca/search.rs +++ b/netfetch/src/ca/search.rs @@ -74,24 +74,16 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { // TODO join pg_conn in the end: tokio::spawn(pg_conn); let pg_client = Arc::new(pg_client); - let qu_select = pg_client - .prepare("select addr from ioc_by_channel where facility = $1 and channel = $2 and searchaddr = $3") - .await - .unwrap(); let qu_insert = { const TEXT: tokio_postgres::types::Type = tokio_postgres::types::Type::TEXT; pg_client .prepare_typed( - "insert into ioc_by_channel (facility, channel, searchaddr, addr) values ($1, $2, $3, $4) on conflict do nothing", - &[TEXT, TEXT, TEXT, TEXT], + "insert into ioc_by_channel_log (facility, channel, queryaddr, responseaddr, addr) values ($1, $2, $3, $4, $5)", + &[TEXT, TEXT, TEXT, TEXT, TEXT], ) .await .unwrap() }; - let qu_update = pg_client - .prepare("update ioc_by_channel set addr = $4, tsmod = now(), modcount = modcount + 1 where facility = $1 and channel = $2 and searchaddr = $3") - .await - .unwrap(); let mut addrs = vec![]; for s in &opts.search { match resolve_address(s).await { @@ -105,19 +97,8 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { } } let gw_addrs = { - // Try to blacklist.. - // TODO if it helps, add a config option for it. - let gateways = [ - "sf-cagw", - "saresa-cagw", - "saresb-cagw", - "saresc-cagw", - "satesd-cagw", - "satese-cagw", - "satesf-cagw", - ]; let mut gw_addrs = vec![]; - for s in gateways { + for s in &opts.search_blacklist { match resolve_address(s).await { Ok(addr) => { info!("resolved {s} as {addr}"); @@ -163,9 +144,11 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { for item in item { let mut do_block = false; for a2 in &gw_addrs { - if &item.src == a2 { - do_block = true; - warn!("gateways responded to search"); + if let Some(response_addr) = &item.response_addr { + if response_addr == a2 { + do_block = true; + warn!("gateways responded to search"); + } } } if let Some(a1) = item.addr.as_ref() { @@ -177,31 +160,41 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { } } if do_block { - info!("blocking {item:?}"); + info!("blacklisting {item:?}"); } else { - info!("using {item:?}"); + /* let srcaddr = item.src.to_string(); let addr = item.addr.map(|x| x.to_string()).unwrap_or(String::new()); let rows = pg_client .query(&qu_select, &[&facility, &item.channel, &srcaddr]) .await .unwrap(); - if rows.is_empty() { - info!("insert {item:?}"); + if true || rows.is_empty() { + //info!("insert {item:?}"); pg_client .execute(&qu_insert, &[&facility, &item.channel, &srcaddr, &addr]) .await .unwrap(); } else { - info!("update {item:?}"); + //info!("update {item:?}"); let addr2: &str = rows[0].get(0); - if addr2 != addr { - pg_client - .execute(&qu_update, &[&facility, &item.channel, &srcaddr, &addr]) - .await - .unwrap(); - } + if addr2 != addr {} + pg_client + .execute(&qu_update, &[&facility, &item.channel, &srcaddr, &addr]) + .await + .unwrap(); } + */ + let queryaddr = item.query_addr.map(|x| x.to_string()); + let responseaddr = item.response_addr.map(|x| x.to_string()); + let addr = item.addr.map(|x| x.to_string()); + pg_client + .execute( + &qu_insert, + &[&facility, &item.channel, &queryaddr, &responseaddr, &addr], + ) + .await + .unwrap(); } } } diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index a001141..4f10134 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -37,7 +37,7 @@ async fn find_channel( ingest_commons: Arc, ) -> axum::Json)>> { let pattern = params.get("pattern").map_or(String::new(), |x| x.clone()).to_string(); - let g = ingest_commons.command_queue_set.queues().lock().await; + let g = ingest_commons.command_queue_set.queues_locked().await; let mut it = g.iter(); let rxs = send_command(&mut it, || ConnCommand::find_channel(pattern.clone())).await; let mut res = Vec::new(); @@ -58,8 +58,7 @@ async fn channel_add(params: HashMap, ingest_commons: Arc { if ingest_commons .command_queue_set - .queues() - .lock() + .queues_locked() .await .contains_key(&addr) { @@ -85,7 +84,7 @@ async fn channel_add(params: HashMap, ingest_commons: Arc, ingest_commons: Arc) -> String { let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string(); - let g = ingest_commons.command_queue_set.queues().lock().await; + let g = ingest_commons.command_queue_set.queues_locked().await; let mut rxs = Vec::new(); for (_, tx) in g.iter() { let (cmd, rx) = ConnCommand::channel_state(name.clone()); @@ -190,7 +189,7 @@ async fn channel_states( _params: HashMap, ingest_commons: Arc, ) -> axum::Json> { - let g = ingest_commons.command_queue_set.queues().lock().await; + let g = ingest_commons.command_queue_set.queues_locked().await; let mut rxs = Vec::new(); for (_, tx) in g.iter() { let (cmd, rx) = ConnCommand::channel_states_all();