From e2b337f84823b7b3c7a170a680033947a64e74be Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 26 Aug 2022 15:59:06 +0200 Subject: [PATCH] Block gateways in search --- netfetch/src/ca.rs | 23 +- netfetch/src/ca/conn.rs | 689 ++++++------------------------------- netfetch/src/ca/findioc.rs | 559 ++++++++++++++++++++++++++++++ netfetch/src/ca/search.rs | 147 ++++++-- netfetch/src/metrics.rs | 175 ++++++---- netfetch/src/store.rs | 61 +++- readme.md | 14 +- 7 files changed, 970 insertions(+), 698 deletions(-) create mode 100644 netfetch/src/ca/findioc.rs diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index c2ae97b..a9b5cb2 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -1,4 +1,5 @@ pub mod conn; +pub mod findioc; pub mod proto; pub mod search; pub mod store; @@ -155,6 +156,27 @@ async fn spawn_scylla_insert_workers( let mut i1 = 0; while let Ok(item) = recv.recv().await { match item { + QueryItem::ConnectionStatus(item) => { + match crate::store::insert_connection_status(item, &data_store, &stats).await { + Ok(_) => { + stats.store_worker_item_insert_inc(); + } + Err(e) => { + stats.store_worker_item_error_inc(); + // TODO introduce more structured error variants. + if e.msg().contains("WriteTimeout") { + tokio::time::sleep(Duration::from_millis(100)).await; + } else { + // TODO back off but continue. + error!("insert worker sees error: {e:?}"); + break; + } + } + } + } + QueryItem::ChannelStatus(_item) => { + // TODO + } QueryItem::Insert(item) => { stats.store_worker_item_recv_inc(); let insert_frac = insert_frac.load(Ordering::Acquire); @@ -424,7 +446,6 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { opts.api_bind.clone(), insert_frac.clone(), insert_ivl_min.clone(), - command_queue_set.clone(), ingest_commons.clone(), )); diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index d86e286..32ba035 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1,15 +1,16 @@ use super::proto::{self, CaItem, CaMsg, CaMsgTy, CaProto}; use super::store::DataStore; use crate::bsread::ChannelDescDecoded; -use crate::ca::proto::{CreateChan, EventAdd, HeadInfo}; +use crate::ca::proto::{CreateChan, EventAdd}; use crate::ca::store::ChannelRegistry; use crate::series::{Existence, SeriesId}; -use crate::store::{CommonInsertItemQueueSender, InsertItem, IvlItem, MuteItem, QueryItem}; +use crate::store::{ + CommonInsertItemQueueSender, ConnectionStatus, ConnectionStatusItem, InsertItem, IvlItem, MuteItem, QueryItem, +}; use async_channel::Sender; use err::Error; use futures_util::stream::FuturesOrdered; use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt}; -use libc::c_int; use log::*; use netpod::timeunits::*; use netpod::{ScalarType, Shape}; @@ -18,16 +19,25 @@ use stats::{CaConnStats, IntervalEma}; use std::collections::{BTreeMap, VecDeque}; use std::net::{Ipv4Addr, SocketAddrV4}; use std::pin::Pin; -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::{Duration, Instant, SystemTime}; -use tokio::io::unix::AsyncFd; use tokio::net::TcpStream; +#[derive(Clone, Debug, Serialize)] +pub enum ChannelConnectedInfo { + Disconnected, + Connecting, + Connected, + Error, +} + #[derive(Clone, Debug, Serialize)] pub struct ChannelStateInfo { pub name: String, + pub addr: SocketAddrV4, + pub channel_connected_info: ChannelConnectedInfo, pub scalar_type: Option, pub shape: Option, // NOTE: this solution can 
yield to the same Instant serialize to different string representations. @@ -123,7 +133,13 @@ enum ChannelState { } impl ChannelState { - fn to_info(&self, name: String) -> ChannelStateInfo { + fn to_info(&self, name: String, addr: SocketAddrV4) -> ChannelStateInfo { + let channel_connected_info = match self { + ChannelState::Init => ChannelConnectedInfo::Disconnected, + ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting, + ChannelState::Created(_) => ChannelConnectedInfo::Connected, + ChannelState::Error(_) => ChannelConnectedInfo::Error, + }; let scalar_type = match self { ChannelState::Created(s) => Some(s.scalar_type.clone()), _ => None, @@ -157,6 +173,8 @@ impl ChannelState { let interest_score = 1. / item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10); ChannelStateInfo { name, + addr, + channel_connected_info, scalar_type, shape, ts_created, @@ -169,7 +187,10 @@ impl ChannelState { enum CaConnState { Unconnected, - Connecting(Pin> + Send>>), + Connecting( + SocketAddrV4, + Pin, tokio::time::error::Elapsed>> + Send>>, + ), Init, Listen, PeerReady, @@ -289,6 +310,8 @@ pub struct CaConn { insert_ivl_min: Arc, conn_command_tx: async_channel::Sender, conn_command_rx: async_channel::Receiver, + conn_backoff: f32, + conn_backoff_beg: f32, } impl CaConn { @@ -327,6 +350,8 @@ impl CaConn { insert_ivl_min, conn_command_tx: cq_tx, conn_command_rx: cq_rx, + conn_backoff: 0.02, + conn_backoff_beg: 0.02, } } @@ -360,7 +385,7 @@ impl CaConn { //info!("State for {name:?}"); let res = match self.cid_by_name.get(&name) { Some(cid) => match self.channels.get(cid) { - Some(state) => Some(state.to_info(name)), + Some(state) => Some(state.to_info(name, self.remote_addr_dbg.clone())), None => None, }, None => None, @@ -385,7 +410,7 @@ impl CaConn { .name_by_cid .get(cid) .map_or("--unknown--".into(), |x| x.to_string()); - state.to_info(name) + state.to_info(name, self.remote_addr_dbg.clone()) }) .collect(); let msg = (self.remote_addr_dbg.clone(), res); @@ -462,6 +487,16 @@ impl CaConn { self.name_by_cid.get(&cid).map(|x| x.as_str()) } + fn backoff_next(&mut self) -> u64 { + let dt = (self.conn_backoff * 300. 
* 1e3) as u64; + self.conn_backoff = (self.conn_backoff * 2.).tanh(); + dt + } + + fn backoff_reset(&mut self) { + self.conn_backoff = self.conn_backoff_beg; + } + fn handle_insert_futs(&mut self, cx: &mut Context) -> Poll> { use Poll::*; loop { @@ -1025,41 +1060,70 @@ impl Stream for CaConn { break match &mut self.state { CaConnState::Unconnected => { let addr = self.remote_addr_dbg.clone(); + trace!("create tcp connection to {:?}", (addr.ip(), addr.port())); let fut = async move { - trace!("create tcp connection to {:?}", (addr.ip(), addr.port())); - match tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr)).await { - Ok(Ok(k)) => Ok(k), - Ok(Err(e)) => { - // TODO keep this in channel status field, or log when we have exponential backoff - trace!("Can not connect to {addr:?} {e:?}"); - Err(e.into()) - } - Err(_) => { - // TODO keep this in channel status field, or log when we have exponential backoff - trace!("Can not connect to {addr:?} timeout"); - Err(Error::with_msg_no_trace(format!("timeout"))) - } - } + tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr)).await }; - self.state = CaConnState::Connecting(Box::pin(fut)); + self.state = CaConnState::Connecting(addr, Box::pin(fut)); continue 'outer; } - CaConnState::Connecting(ref mut fut) => { + CaConnState::Connecting(ref addr, ref mut fut) => { match fut.poll_unpin(cx) { - Ready(Ok(tcp)) => { - let proto = CaProto::new(tcp, self.remote_addr_dbg.clone(), self.array_truncate); - self.state = CaConnState::Init; - self.proto = Some(proto); - continue 'outer; - } - Ready(Err(e)) => { - // TODO keep this in channel status field, or log when we have exponential backoff - trace!("Connection error: {e:?}"); - // We can not connect to the remote. - // TODO do exponential backoff. 
- self.state = CaConnState::Wait(wait_fut(10000)); - self.proto = None; - continue 'outer; + Ready(connect_result) => { + match connect_result { + Ok(Ok(tcp)) => { + let addr = addr.clone(); + self.insert_item_queue.push_back(QueryItem::ConnectionStatus( + ConnectionStatusItem { + ts: SystemTime::now(), + addr, + status: ConnectionStatus::Established, + }, + )); + self.backoff_reset(); + let proto = + CaProto::new(tcp, self.remote_addr_dbg.clone(), self.array_truncate); + self.state = CaConnState::Init; + self.proto = Some(proto); + continue 'outer; + } + Ok(Err(e)) => { + // TODO log with exponential backoff + // 172.26.24.118:2072 + const ADDR2: Ipv4Addr = Ipv4Addr::new(172, 26, 24, 118); + if addr.ip() == &ADDR2 && addr.port() == 2072 { + warn!("error during connect to {addr:?} {e:?}"); + } + let addr = addr.clone(); + self.insert_item_queue.push_back(QueryItem::ConnectionStatus( + ConnectionStatusItem { + ts: SystemTime::now(), + addr, + status: ConnectionStatus::ConnectError, + }, + )); + let dt = self.backoff_next(); + self.state = CaConnState::Wait(wait_fut(dt)); + self.proto = None; + continue 'outer; + } + Err(e) => { + // TODO log with exponential backoff + trace!("timeout during connect to {addr:?} {e:?}"); + let addr = addr.clone(); + self.insert_item_queue.push_back(QueryItem::ConnectionStatus( + ConnectionStatusItem { + ts: SystemTime::now(), + addr, + status: ConnectionStatus::ConnectTimeout, + }, + )); + let dt = self.backoff_next(); + self.state = CaConnState::Wait(wait_fut(dt)); + self.proto = None; + continue 'outer; + } + } } Pending => Pending, } @@ -1144,550 +1208,3 @@ impl Stream for CaConn { ret } } - -struct SockBox(c_int); - -impl Drop for SockBox { - fn drop(self: &mut Self) { - if self.0 != -1 { - unsafe { - libc::close(self.0); - self.0 = -1; - } - } - } -} - -// TODO should be able to get away with non-atomic counters. 
-static BATCH_ID: AtomicUsize = AtomicUsize::new(0); -static SEARCH_ID2: AtomicUsize = AtomicUsize::new(0); - -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] -struct BatchId(u32); - -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] -struct SearchId(u32); - -struct SearchBatch { - ts_beg: Instant, - tgts: VecDeque, - channels: Vec, - sids: Vec, - done: Vec, -} - -#[derive(Debug)] -pub struct FindIocRes { - pub src: SocketAddrV4, - pub channel: String, - pub addr: Option, -} - -pub struct FindIocStream { - tgts: Vec, - channels_input: VecDeque, - in_flight: BTreeMap, - in_flight_max: usize, - bid_by_sid: BTreeMap, - batch_send_queue: VecDeque, - sock: SockBox, - afd: AsyncFd, - buf1: Vec, - send_addr: SocketAddrV4, - out_queue: VecDeque, - ping: Pin>, - channels_per_batch: usize, - batch_run_max: Duration, - bids_all_done: BTreeMap, - bids_timed_out: BTreeMap, - sids_done: BTreeMap, - result_for_done_sid_count: u64, -} - -impl FindIocStream { - pub fn new(tgts: Vec) -> Self { - let sock = unsafe { Self::create_socket() }.unwrap(); - let afd = AsyncFd::new(sock.0).unwrap(); - Self { - tgts, - channels_input: VecDeque::new(), - in_flight: BTreeMap::new(), - bid_by_sid: BTreeMap::new(), - batch_send_queue: VecDeque::new(), - sock, - afd, - buf1: vec![0; 1024], - send_addr: SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 5064), - out_queue: VecDeque::new(), - ping: Box::pin(tokio::time::sleep(Duration::from_millis(200))), - bids_all_done: BTreeMap::new(), - bids_timed_out: BTreeMap::new(), - sids_done: BTreeMap::new(), - result_for_done_sid_count: 0, - in_flight_max: 10, - channels_per_batch: 10, - batch_run_max: Duration::from_millis(1500), - } - } - - pub fn quick_state(&self) -> String { - format!( - "channels_input {} in_flight {} bid_by_sid {} out_queue {} result_for_done_sid_count {} bids_timed_out {}", - self.channels_input.len(), - self.in_flight.len(), - self.bid_by_sid.len(), - self.out_queue.len(), - self.result_for_done_sid_count, - self.bids_timed_out.len() - ) - } - - pub fn push(&mut self, x: String) { - self.channels_input.push_back(x); - } - - fn buf_and_batch(&mut self, bid: &BatchId) -> Option<(&mut Vec, &mut SearchBatch)> { - match self.in_flight.get_mut(bid) { - Some(batch) => Some((&mut self.buf1, batch)), - None => None, - } - } - - unsafe fn create_socket() -> Result { - let ec = libc::socket(libc::AF_INET, libc::SOCK_DGRAM, 0); - if ec == -1 { - return Err("can not create socket".into()); - } - let sock = SockBox(ec); - { - let opt: libc::c_int = 1; - let ec = libc::setsockopt( - sock.0, - libc::SOL_SOCKET, - libc::SO_BROADCAST, - &opt as *const _ as _, - std::mem::size_of::() as _, - ); - if ec == -1 { - return Err("can not enable broadcast".into()); - } - } - { - let ec = libc::fcntl(sock.0, libc::F_SETFL, libc::O_NONBLOCK); - if ec == -1 { - return Err("can not set nonblock".into()); - } - } - let ip: [u8; 4] = [0, 0, 0, 0]; - let addr = libc::sockaddr_in { - sin_family: libc::AF_INET as u16, - sin_port: 0, - sin_addr: libc::in_addr { - s_addr: u32::from_ne_bytes(ip), - }, - sin_zero: [0; 8], - }; - let addr_len = std::mem::size_of::(); - let ec = libc::bind(sock.0, &addr as *const _ as _, addr_len as _); - if ec == -1 { - return Err("can not bind socket".into()); - } - { - let mut addr = libc::sockaddr_in { - sin_family: libc::AF_INET as u16, - sin_port: 0, - sin_addr: libc::in_addr { s_addr: 0 }, - sin_zero: [0; 8], - }; - let mut addr_len = std::mem::size_of::(); - let ec = libc::getsockname(sock.0, &mut addr as *mut _ as _, &mut addr_len as *mut _ as _); 
- if ec == -1 { - error!("getsockname {ec}"); - return Err("can not convert raw socket to tokio socket".into()); - } else { - if false { - let ipv4 = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes()); - let tcp_port = u16::from_be(addr.sin_port); - info!("bound local socket to {:?} port {}", ipv4, tcp_port); - } - } - } - Ok(sock) - } - - unsafe fn try_send(sock: i32, addr: &SocketAddrV4, buf: &[u8]) -> Poll> { - let ip = addr.ip().octets(); - let port = addr.port(); - let addr = libc::sockaddr_in { - sin_family: libc::AF_INET as u16, - sin_port: port.to_be(), - sin_addr: libc::in_addr { - s_addr: u32::from_ne_bytes(ip), - }, - sin_zero: [0; 8], - }; - let addr_len = std::mem::size_of::(); - let ec = libc::sendto( - sock, - &buf[0] as *const _ as _, - buf.len() as _, - 0, - &addr as *const _ as _, - addr_len as _, - ); - if ec == -1 { - let errno = *libc::__errno_location(); - if errno == libc::EAGAIN { - return Poll::Pending; - } else { - return Poll::Ready(Err("FindIocStream can not send".into())); - } - } - Poll::Ready(Ok(())) - } - - unsafe fn try_read(sock: i32) -> Poll), Error>> { - let mut saddr_mem = [0u8; std::mem::size_of::()]; - let mut saddr_len: libc::socklen_t = saddr_mem.len() as _; - let mut buf = vec![0u8; 1024]; - let ec = libc::recvfrom( - sock, - buf.as_mut_ptr() as _, - buf.len() as _, - libc::O_NONBLOCK, - &mut saddr_mem as *mut _ as _, - &mut saddr_len as *mut _ as _, - ); - if ec == -1 { - let errno = *libc::__errno_location(); - if errno == libc::EAGAIN { - return Poll::Pending; - } else { - return Poll::Ready(Err("FindIocStream can not read".into())); - } - } else if ec < 0 { - error!("unexpected received {ec}"); - Poll::Ready(Err(Error::with_msg_no_trace(format!("try_read ec {ec}")))) - } else if ec == 0 { - Poll::Ready(Err(Error::with_msg_no_trace(format!("try_read ec {ec}")))) - } else { - let saddr2: libc::sockaddr_in = std::mem::transmute_copy(&saddr_mem); - let src_addr = Ipv4Addr::from(saddr2.sin_addr.s_addr.to_ne_bytes()); - let src_port = u16::from_be(saddr2.sin_port); - trace!( - "received from src_addr {:?} src_port {} ec {}", - src_addr, - src_port, - ec - ); - if false { - let mut s1 = String::new(); - for i in 0..(ec as usize) { - s1.extend(format!(" {:02x}", buf[i]).chars()); - } - debug!("received answer {s1}"); - debug!( - "received answer string {}", - String::from_utf8_lossy(buf[..ec as usize].into()) - ); - } - // TODO handle if we get a too large answer. 
- let mut nb = crate::netbuf::NetBuf::new(2048); - nb.put_slice(&buf[..ec as usize])?; - let mut msgs = vec![]; - loop { - let n = nb.data().len(); - if n == 0 { - break; - } - if n < 16 { - error!("incomplete message, not enough for header"); - break; - } - let hi = HeadInfo::from_netbuf(&mut nb)?; - if nb.data().len() < hi.payload() { - error!("incomplete message, missing payload"); - break; - } - let msg = CaMsg::from_proto_infos(&hi, nb.data(), 32)?; - nb.adv(hi.payload())?; - msgs.push(msg); - } - let mut res = vec![]; - for msg in msgs.iter() { - match &msg.ty { - CaMsgTy::SearchRes(k) => { - let addr = SocketAddrV4::new(src_addr, k.tcp_port); - res.push((SearchId(k.id), addr)); - } - _ => {} - } - } - Poll::Ready(Ok((SocketAddrV4::new(src_addr, src_port), res))) - } - } - - fn serialize_batch(buf: &mut Vec, batch: &SearchBatch) { - buf.extend_from_slice(&[0, 0, 0, 0]); - buf.extend_from_slice(&[0, 0, 0, 13]); - buf.extend_from_slice(&[0, 0, 0, 0]); - buf.extend_from_slice(&[0, 0, 0, 0]); - for (sid, ch) in batch.sids.iter().zip(batch.channels.iter()) { - let chb = ch.as_bytes(); - let npadded = (chb.len() + 1 + 7) / 8 * 8; - let npad = npadded - chb.len(); - buf.extend_from_slice(&[0, 6]); - buf.extend_from_slice(&(npadded as u16).to_be_bytes()); - buf.extend_from_slice(&[0, 0, 0, 13]); - buf.extend_from_slice(&[0, 0, 0, 0]); - buf.extend_from_slice(&sid.0.to_be_bytes()); - buf.extend_from_slice(chb); - buf.extend_from_slice(&vec![0u8; npad]); - } - } - - fn create_in_flight(&mut self) { - let bid = BATCH_ID.fetch_add(1, Ordering::AcqRel); - let bid = BatchId(bid as u32); - let mut sids = vec![]; - let mut chs = vec![]; - while chs.len() < self.channels_per_batch && self.channels_input.len() > 0 { - let sid = SEARCH_ID2.fetch_add(1, Ordering::AcqRel); - let sid = SearchId(sid as u32); - self.bid_by_sid.insert(sid.clone(), bid.clone()); - sids.push(sid); - chs.push(self.channels_input.pop_front().unwrap()); - } - let batch = SearchBatch { - ts_beg: Instant::now(), - channels: chs, - tgts: self.tgts.iter().enumerate().map(|x| x.0).collect(), - sids, - done: vec![], - }; - self.in_flight.insert(bid.clone(), batch); - self.batch_send_queue.push_back(bid); - } - - fn handle_result(&mut self, src: SocketAddrV4, res: Vec<(SearchId, SocketAddrV4)>) { - let mut sids_remove = vec![]; - for (sid, addr) in res { - self.sids_done.insert(sid.clone(), ()); - match self.bid_by_sid.get(&sid) { - Some(bid) => { - sids_remove.push(sid.clone()); - match self.in_flight.get_mut(bid) { - Some(batch) => { - for (i2, s2) in batch.sids.iter().enumerate() { - if s2 == &sid { - match batch.channels.get(i2) { - Some(ch) => { - let res = FindIocRes { - channel: ch.into(), - addr: Some(addr), - src: src.clone(), - }; - self.out_queue.push_back(res); - } - None => { - error!("no matching channel for {sid:?}"); - } - } - } - } - // Book keeping: - batch.done.push(sid); - let mut all_done = true; - if batch.done.len() >= batch.sids.len() { - for s1 in &batch.sids { - if !batch.done.contains(s1) { - all_done = false; - break; - } - } - } else { - all_done = false; - } - if all_done { - self.bids_all_done.insert(bid.clone(), ()); - self.in_flight.remove(bid); - } - } - None => { - // TODO analyze reasons - error!("no batch for {bid:?}"); - } - } - } - None => { - // TODO analyze reasons - if self.sids_done.contains_key(&sid) { - self.result_for_done_sid_count += 1; - } else { - error!("no bid for {sid:?}"); - } - } - } - } - for sid in sids_remove { - self.bid_by_sid.remove(&sid); - } - } - - fn clear_timed_out(&mut 
self) { - let now = Instant::now(); - let mut bids = vec![]; - let mut sids = vec![]; - let mut chns = vec![]; - for (bid, batch) in &mut self.in_flight { - if now.duration_since(batch.ts_beg) > self.batch_run_max { - self.bids_timed_out.insert(bid.clone(), ()); - for (i2, sid) in batch.sids.iter().enumerate() { - if batch.done.contains(sid) == false { - debug!("Timeout: {bid:?} {}", batch.channels[i2]); - } - sids.push(sid.clone()); - chns.push(batch.channels[i2].clone()); - } - bids.push(bid.clone()); - } - } - for (sid, ch) in sids.into_iter().zip(chns) { - let res = FindIocRes { - src: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0), - channel: ch, - addr: None, - }; - self.out_queue.push_back(res); - self.bid_by_sid.remove(&sid); - } - for bid in bids { - self.in_flight.remove(&bid); - } - } -} - -impl Stream for FindIocStream { - type Item = Result, Error>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - match self.ping.poll_unpin(cx) { - Ready(_) => { - self.ping = Box::pin(tokio::time::sleep(Duration::from_millis(200))); - cx.waker().wake_by_ref(); - } - Pending => {} - } - self.clear_timed_out(); - loop { - let mut loop_again = false; - if self.out_queue.is_empty() == false { - let ret = std::mem::replace(&mut self.out_queue, VecDeque::new()); - break Ready(Some(Ok(ret))); - } - if !self.buf1.is_empty() { - match self.afd.poll_write_ready(cx) { - Ready(Ok(mut g)) => match unsafe { Self::try_send(self.sock.0, &self.send_addr, &self.buf1) } { - Ready(Ok(())) => { - self.buf1.clear(); - loop_again = true; - } - Ready(Err(e)) => { - error!("{e:?}"); - } - Pending => { - g.clear_ready(); - warn!("socket seemed ready for write, but is not"); - loop_again = true; - } - }, - Ready(Err(e)) => { - let e = Error::with_msg_no_trace(format!("{e:?}")); - error!("poll_write_ready {e:?}"); - } - Pending => {} - } - } - while self.buf1.is_empty() { - match self.batch_send_queue.pop_front() { - Some(bid) => { - match self.buf_and_batch(&bid) { - Some((buf1, batch)) => { - match batch.tgts.pop_front() { - Some(tgtix) => { - Self::serialize_batch(buf1, batch); - match self.tgts.get(tgtix) { - Some(tgt) => { - let tgt = tgt.clone(); - //info!("Serialize and queue {bid:?}"); - self.send_addr = tgt.clone(); - self.batch_send_queue.push_back(bid); - loop_again = true; - } - None => { - self.buf1.clear(); - self.batch_send_queue.push_back(bid); - loop_again = true; - error!("tgtix does not exist"); - } - } - } - None => { - //info!("Batch exhausted"); - loop_again = true; - } - } - } - None => { - if self.bids_all_done.contains_key(&bid) { - // TODO count events - } else { - info!("Batch {bid:?} seems already done"); - } - loop_again = true; - } - } - } - None => break, - } - } - while !self.channels_input.is_empty() && self.in_flight.len() < self.in_flight_max { - self.create_in_flight(); - loop_again = true; - } - break match self.afd.poll_read_ready(cx) { - Ready(Ok(mut g)) => match unsafe { Self::try_read(self.sock.0) } { - Ready(Ok((src, res))) => { - self.handle_result(src, res); - continue; - } - Ready(Err(e)) => { - error!("Error from try_read {e:?}"); - Ready(Some(Err(e))) - } - Pending => { - g.clear_ready(); - continue; - } - }, - Ready(Err(e)) => { - let e = Error::with_msg_no_trace(format!("{e:?}")); - error!("poll_read_ready {e:?}"); - Ready(Some(Err(e))) - } - Pending => { - if loop_again { - continue; - } else { - if self.channels_input.is_empty() && self.in_flight.is_empty() && self.out_queue.is_empty() { - Ready(None) - } else { - Pending - } - } 
- } - }; - } - } -} diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs new file mode 100644 index 0000000..4cb1e7f --- /dev/null +++ b/netfetch/src/ca/findioc.rs @@ -0,0 +1,559 @@ +use crate::ca::proto::{CaMsg, CaMsgTy, HeadInfo}; +use err::Error; +use futures_util::{FutureExt, Stream}; +use libc::c_int; +use log::*; +use std::collections::{BTreeMap, VecDeque}; +use std::net::{Ipv4Addr, SocketAddrV4}; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; +use tokio::io::unix::AsyncFd; + +struct SockBox(c_int); + +impl Drop for SockBox { + fn drop(self: &mut Self) { + if self.0 != -1 { + unsafe { + libc::close(self.0); + self.0 = -1; + } + } + } +} + +// TODO should be able to get away with non-atomic counters. +static BATCH_ID: AtomicUsize = AtomicUsize::new(0); +static SEARCH_ID2: AtomicUsize = AtomicUsize::new(0); + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] +struct BatchId(u32); + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] +struct SearchId(u32); + +struct SearchBatch { + ts_beg: Instant, + tgts: VecDeque, + channels: Vec, + sids: Vec, + done: Vec, +} + +#[derive(Debug)] +pub struct FindIocRes { + pub src: SocketAddrV4, + pub channel: String, + pub addr: Option, +} + +pub struct FindIocStream { + tgts: Vec, + channels_input: VecDeque, + in_flight: BTreeMap, + in_flight_max: usize, + bid_by_sid: BTreeMap, + batch_send_queue: VecDeque, + sock: SockBox, + afd: AsyncFd, + buf1: Vec, + send_addr: SocketAddrV4, + out_queue: VecDeque, + ping: Pin>, + channels_per_batch: usize, + batch_run_max: Duration, + bids_all_done: BTreeMap, + bids_timed_out: BTreeMap, + sids_done: BTreeMap, + result_for_done_sid_count: u64, +} + +impl FindIocStream { + pub fn new(tgts: Vec) -> Self { + let sock = unsafe { Self::create_socket() }.unwrap(); + let afd = AsyncFd::new(sock.0).unwrap(); + Self { + tgts, + channels_input: VecDeque::new(), + in_flight: BTreeMap::new(), + bid_by_sid: BTreeMap::new(), + batch_send_queue: VecDeque::new(), + sock, + afd, + buf1: vec![0; 1024], + send_addr: SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 5064), + out_queue: VecDeque::new(), + ping: Box::pin(tokio::time::sleep(Duration::from_millis(200))), + bids_all_done: BTreeMap::new(), + bids_timed_out: BTreeMap::new(), + sids_done: BTreeMap::new(), + result_for_done_sid_count: 0, + in_flight_max: 40, + channels_per_batch: 10, + batch_run_max: Duration::from_millis(2500), + } + } + + pub fn quick_state(&self) -> String { + format!( + "channels_input {} in_flight {} bid_by_sid {} out_queue {} result_for_done_sid_count {} bids_timed_out {}", + self.channels_input.len(), + self.in_flight.len(), + self.bid_by_sid.len(), + self.out_queue.len(), + self.result_for_done_sid_count, + self.bids_timed_out.len() + ) + } + + pub fn push(&mut self, x: String) { + self.channels_input.push_back(x); + } + + fn buf_and_batch(&mut self, bid: &BatchId) -> Option<(&mut Vec, &mut SearchBatch)> { + match self.in_flight.get_mut(bid) { + Some(batch) => Some((&mut self.buf1, batch)), + None => None, + } + } + + unsafe fn create_socket() -> Result { + let ec = libc::socket(libc::AF_INET, libc::SOCK_DGRAM, 0); + if ec == -1 { + return Err("can not create socket".into()); + } + let sock = SockBox(ec); + { + let opt: libc::c_int = 1; + let ec = libc::setsockopt( + sock.0, + libc::SOL_SOCKET, + libc::SO_BROADCAST, + &opt as *const _ as _, + std::mem::size_of::() as _, + ); + if ec == -1 { + return Err("can not enable broadcast".into()); + } 
+ } + { + let ec = libc::fcntl(sock.0, libc::F_SETFL, libc::O_NONBLOCK); + if ec == -1 { + return Err("can not set nonblock".into()); + } + } + let ip: [u8; 4] = [0, 0, 0, 0]; + let addr = libc::sockaddr_in { + sin_family: libc::AF_INET as u16, + sin_port: 0, + sin_addr: libc::in_addr { + s_addr: u32::from_ne_bytes(ip), + }, + sin_zero: [0; 8], + }; + let addr_len = std::mem::size_of::(); + let ec = libc::bind(sock.0, &addr as *const _ as _, addr_len as _); + if ec == -1 { + return Err("can not bind socket".into()); + } + { + let mut addr = libc::sockaddr_in { + sin_family: libc::AF_INET as u16, + sin_port: 0, + sin_addr: libc::in_addr { s_addr: 0 }, + sin_zero: [0; 8], + }; + let mut addr_len = std::mem::size_of::(); + let ec = libc::getsockname(sock.0, &mut addr as *mut _ as _, &mut addr_len as *mut _ as _); + if ec == -1 { + error!("getsockname {ec}"); + return Err("can not convert raw socket to tokio socket".into()); + } else { + if false { + let ipv4 = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes()); + let tcp_port = u16::from_be(addr.sin_port); + info!("bound local socket to {:?} port {}", ipv4, tcp_port); + } + } + } + Ok(sock) + } + + unsafe fn try_send(sock: i32, addr: &SocketAddrV4, buf: &[u8]) -> Poll> { + let ip = addr.ip().octets(); + let port = addr.port(); + let addr = libc::sockaddr_in { + sin_family: libc::AF_INET as u16, + sin_port: port.to_be(), + sin_addr: libc::in_addr { + s_addr: u32::from_ne_bytes(ip), + }, + sin_zero: [0; 8], + }; + let addr_len = std::mem::size_of::(); + let ec = libc::sendto( + sock, + &buf[0] as *const _ as _, + buf.len() as _, + 0, + &addr as *const _ as _, + addr_len as _, + ); + if ec == -1 { + let errno = *libc::__errno_location(); + if errno == libc::EAGAIN { + return Poll::Pending; + } else { + return Poll::Ready(Err("FindIocStream can not send".into())); + } + } + Poll::Ready(Ok(())) + } + + unsafe fn try_read(sock: i32) -> Poll), Error>> { + let mut saddr_mem = [0u8; std::mem::size_of::()]; + let mut saddr_len: libc::socklen_t = saddr_mem.len() as _; + let mut buf = vec![0u8; 1024]; + let ec = libc::recvfrom( + sock, + buf.as_mut_ptr() as _, + buf.len() as _, + libc::O_NONBLOCK, + &mut saddr_mem as *mut _ as _, + &mut saddr_len as *mut _ as _, + ); + if ec == -1 { + let errno = *libc::__errno_location(); + if errno == libc::EAGAIN { + return Poll::Pending; + } else { + return Poll::Ready(Err("FindIocStream can not read".into())); + } + } else if ec < 0 { + error!("unexpected received {ec}"); + Poll::Ready(Err(Error::with_msg_no_trace(format!("try_read ec {ec}")))) + } else if ec == 0 { + Poll::Ready(Err(Error::with_msg_no_trace(format!("try_read ec {ec}")))) + } else { + let saddr2: libc::sockaddr_in = std::mem::transmute_copy(&saddr_mem); + let src_addr = Ipv4Addr::from(saddr2.sin_addr.s_addr.to_ne_bytes()); + let src_port = u16::from_be(saddr2.sin_port); + trace!( + "received from src_addr {:?} src_port {} ec {}", + src_addr, + src_port, + ec + ); + if false { + let mut s1 = String::new(); + for i in 0..(ec as usize) { + s1.extend(format!(" {:02x}", buf[i]).chars()); + } + debug!("received answer {s1}"); + debug!( + "received answer string {}", + String::from_utf8_lossy(buf[..ec as usize].into()) + ); + } + // TODO handle if we get a too large answer. 
+ let mut nb = crate::netbuf::NetBuf::new(2048); + nb.put_slice(&buf[..ec as usize])?; + let mut msgs = vec![]; + loop { + let n = nb.data().len(); + if n == 0 { + break; + } + if n < 16 { + error!("incomplete message, not enough for header"); + break; + } + let hi = HeadInfo::from_netbuf(&mut nb)?; + if nb.data().len() < hi.payload() { + error!("incomplete message, missing payload"); + break; + } + let msg = CaMsg::from_proto_infos(&hi, nb.data(), 32)?; + nb.adv(hi.payload())?; + msgs.push(msg); + } + let mut res = vec![]; + for msg in msgs.iter() { + match &msg.ty { + CaMsgTy::SearchRes(k) => { + let addr = SocketAddrV4::new(src_addr, k.tcp_port); + res.push((SearchId(k.id), addr)); + } + _ => {} + } + } + Poll::Ready(Ok((SocketAddrV4::new(src_addr, src_port), res))) + } + } + + fn serialize_batch(buf: &mut Vec, batch: &SearchBatch) { + buf.extend_from_slice(&[0, 0, 0, 0]); + buf.extend_from_slice(&[0, 0, 0, 13]); + buf.extend_from_slice(&[0, 0, 0, 0]); + buf.extend_from_slice(&[0, 0, 0, 0]); + for (sid, ch) in batch.sids.iter().zip(batch.channels.iter()) { + let chb = ch.as_bytes(); + let npadded = (chb.len() + 1 + 7) / 8 * 8; + let npad = npadded - chb.len(); + buf.extend_from_slice(&[0, 6]); + buf.extend_from_slice(&(npadded as u16).to_be_bytes()); + buf.extend_from_slice(&[0, 0, 0, 13]); + buf.extend_from_slice(&[0, 0, 0, 0]); + buf.extend_from_slice(&sid.0.to_be_bytes()); + buf.extend_from_slice(chb); + buf.extend_from_slice(&vec![0u8; npad]); + } + } + + fn create_in_flight(&mut self) { + let bid = BATCH_ID.fetch_add(1, Ordering::AcqRel); + let bid = BatchId(bid as u32); + let mut sids = vec![]; + let mut chs = vec![]; + while chs.len() < self.channels_per_batch && self.channels_input.len() > 0 { + let sid = SEARCH_ID2.fetch_add(1, Ordering::AcqRel); + let sid = SearchId(sid as u32); + self.bid_by_sid.insert(sid.clone(), bid.clone()); + sids.push(sid); + chs.push(self.channels_input.pop_front().unwrap()); + } + let batch = SearchBatch { + ts_beg: Instant::now(), + channels: chs, + tgts: self.tgts.iter().enumerate().map(|x| x.0).collect(), + sids, + done: vec![], + }; + self.in_flight.insert(bid.clone(), batch); + self.batch_send_queue.push_back(bid); + } + + fn handle_result(&mut self, src: SocketAddrV4, res: Vec<(SearchId, SocketAddrV4)>) { + let mut sids_remove = vec![]; + for (sid, addr) in res { + self.sids_done.insert(sid.clone(), ()); + match self.bid_by_sid.get(&sid) { + Some(bid) => { + sids_remove.push(sid.clone()); + match self.in_flight.get_mut(bid) { + Some(batch) => { + for (i2, s2) in batch.sids.iter().enumerate() { + if s2 == &sid { + match batch.channels.get(i2) { + Some(ch) => { + let res = FindIocRes { + channel: ch.into(), + addr: Some(addr), + src: src.clone(), + }; + self.out_queue.push_back(res); + } + None => { + error!("no matching channel for {sid:?}"); + } + } + } + } + // Book keeping: + batch.done.push(sid); + let mut all_done = true; + if batch.done.len() >= batch.sids.len() { + for s1 in &batch.sids { + if !batch.done.contains(s1) { + all_done = false; + break; + } + } + } else { + all_done = false; + } + if all_done { + self.bids_all_done.insert(bid.clone(), ()); + self.in_flight.remove(bid); + } + } + None => { + // TODO analyze reasons + error!("no batch for {bid:?}"); + } + } + } + None => { + // TODO analyze reasons + if self.sids_done.contains_key(&sid) { + self.result_for_done_sid_count += 1; + } else { + error!("no bid for {sid:?}"); + } + } + } + } + for sid in sids_remove { + self.bid_by_sid.remove(&sid); + } + } + + fn clear_timed_out(&mut 
self) { + let now = Instant::now(); + let mut bids = vec![]; + let mut sids = vec![]; + let mut chns = vec![]; + for (bid, batch) in &mut self.in_flight { + if now.duration_since(batch.ts_beg) > self.batch_run_max { + self.bids_timed_out.insert(bid.clone(), ()); + for (i2, sid) in batch.sids.iter().enumerate() { + if batch.done.contains(sid) == false { + debug!("Timeout: {bid:?} {}", batch.channels[i2]); + } + sids.push(sid.clone()); + chns.push(batch.channels[i2].clone()); + } + bids.push(bid.clone()); + } + } + for (sid, ch) in sids.into_iter().zip(chns) { + let res = FindIocRes { + src: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0), + channel: ch, + addr: None, + }; + self.out_queue.push_back(res); + self.bid_by_sid.remove(&sid); + } + for bid in bids { + self.in_flight.remove(&bid); + } + } +} + +impl Stream for FindIocStream { + type Item = Result, Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + match self.ping.poll_unpin(cx) { + Ready(_) => { + self.ping = Box::pin(tokio::time::sleep(Duration::from_millis(200))); + cx.waker().wake_by_ref(); + } + Pending => {} + } + self.clear_timed_out(); + loop { + let mut loop_again = false; + if self.out_queue.is_empty() == false { + let ret = std::mem::replace(&mut self.out_queue, VecDeque::new()); + break Ready(Some(Ok(ret))); + } + if !self.buf1.is_empty() { + match self.afd.poll_write_ready(cx) { + Ready(Ok(mut g)) => match unsafe { Self::try_send(self.sock.0, &self.send_addr, &self.buf1) } { + Ready(Ok(())) => { + self.buf1.clear(); + loop_again = true; + } + Ready(Err(e)) => { + error!("{e:?}"); + } + Pending => { + g.clear_ready(); + warn!("socket seemed ready for write, but is not"); + loop_again = true; + } + }, + Ready(Err(e)) => { + let e = Error::with_msg_no_trace(format!("{e:?}")); + error!("poll_write_ready {e:?}"); + } + Pending => {} + } + } + while self.buf1.is_empty() { + match self.batch_send_queue.pop_front() { + Some(bid) => { + match self.buf_and_batch(&bid) { + Some((buf1, batch)) => { + match batch.tgts.pop_front() { + Some(tgtix) => { + Self::serialize_batch(buf1, batch); + match self.tgts.get(tgtix) { + Some(tgt) => { + let tgt = tgt.clone(); + //info!("Serialize and queue {bid:?}"); + self.send_addr = tgt.clone(); + self.batch_send_queue.push_back(bid); + loop_again = true; + } + None => { + self.buf1.clear(); + self.batch_send_queue.push_back(bid); + loop_again = true; + error!("tgtix does not exist"); + } + } + } + None => { + //info!("Batch exhausted"); + loop_again = true; + } + } + } + None => { + if self.bids_all_done.contains_key(&bid) { + // TODO count events + } else { + info!("Batch {bid:?} seems already done"); + } + loop_again = true; + } + } + } + None => break, + } + } + while !self.channels_input.is_empty() && self.in_flight.len() < self.in_flight_max { + self.create_in_flight(); + loop_again = true; + } + break match self.afd.poll_read_ready(cx) { + Ready(Ok(mut g)) => match unsafe { Self::try_read(self.sock.0) } { + Ready(Ok((src, res))) => { + self.handle_result(src, res); + continue; + } + Ready(Err(e)) => { + error!("Error from try_read {e:?}"); + Ready(Some(Err(e))) + } + Pending => { + g.clear_ready(); + continue; + } + }, + Ready(Err(e)) => { + let e = Error::with_msg_no_trace(format!("{e:?}")); + error!("poll_read_ready {e:?}"); + Ready(Some(Err(e))) + } + Pending => { + if loop_again { + continue; + } else { + if self.channels_input.is_empty() && self.in_flight.is_empty() && self.out_queue.is_empty() { + Ready(None) + } else { + Pending + } + } 
+ } + }; + } + } +} diff --git a/netfetch/src/ca/search.rs b/netfetch/src/ca/search.rs index 3e4aa95..298d1b1 100644 --- a/netfetch/src/ca/search.rs +++ b/netfetch/src/ca/search.rs @@ -1,3 +1,4 @@ +use crate::ca::findioc::FindIocStream; use crate::ca::{parse_config, ListenFromFileOpts}; use err::Error; use futures_util::StreamExt; @@ -7,35 +8,49 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::sync::Arc; use std::time::{Duration, Instant}; -use super::conn::FindIocStream; - async fn resolve_address(addr_str: &str) -> Result { const PORT_DEFAULT: u16 = 5064; let ac = match addr_str.parse::() { Ok(k) => k, - Err(_) => match addr_str.parse::() { - Ok(k) => SocketAddrV4::new(k, PORT_DEFAULT), - Err(e) => match tokio::net::lookup_host(&addr_str).await { - Ok(k) => { - let vs: Vec<_> = k - .filter_map(|x| match x { - SocketAddr::V4(k) => Some(k), - SocketAddr::V6(_) => None, - }) - .collect(); - if let Some(k) = vs.first() { - *k + Err(_) => { + trace!("can not parse {addr_str} as SocketAddrV4"); + match addr_str.parse::() { + Ok(k) => SocketAddrV4::new(k, PORT_DEFAULT), + Err(e) => { + trace!("can not parse {addr_str} as Ipv4Addr"); + let (hostname, port) = if addr_str.contains(":") { + let mut it = addr_str.split(":"); + ( + it.next().unwrap().to_string(), + it.next().unwrap().parse::().unwrap(), + ) } else { - error!("Can not understand name for {:?} {:?}", addr_str, vs); - return Err(e.into()); + (addr_str.to_string(), PORT_DEFAULT) + }; + match tokio::net::lookup_host(format!("{}:33", hostname.clone())).await { + Ok(k) => { + let vs: Vec<_> = k + .filter_map(|x| match x { + SocketAddr::V4(k) => Some(k), + SocketAddr::V6(_) => { + error!("TODO ipv6 support"); + None + } + }) + .collect(); + if let Some(k) = vs.first() { + let mut k = *k; + k.set_port(port); + k + } else { + return Err(e.into()); + } + } + Err(e) => return Err(e.into()), } } - Err(e) => { - error!("{e:?}"); - return Err(e.into()); - } - }, - }, + } + } }; Ok(ac) } @@ -79,9 +94,43 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { .unwrap(); let mut addrs = vec![]; for s in &opts.search { - let x = resolve_address(s).await?; - addrs.push(x); + match resolve_address(s).await { + Ok(addr) => { + info!("resolved {s} as {addr}"); + addrs.push(addr); + } + Err(e) => { + error!("can not resolve {s} {e}"); + } + } } + let gw_addrs = { + // Try to blacklist.. + // TODO if it helps, add a config option for it. 
+ let gateways = [ + "sf-cagw", + "saresa-cagw", + "saresb-cagw", + "saresc-cagw", + "satesd-cagw", + "satese-cagw", + "satesf-cagw", + ]; + let mut gw_addrs = vec![]; + for s in gateways { + match resolve_address(s).await { + Ok(addr) => { + info!("resolved {s} as {addr}"); + gw_addrs.push(addr); + } + Err(e) => { + error!("can not resolve {s} {e}"); + } + } + } + gw_addrs + }; + info!("Blacklisting {} gateways", gw_addrs.len()); let mut finder = FindIocStream::new(addrs); for ch in &opts.channels { finder.push(ch.into()); @@ -93,7 +142,7 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { ts_last = ts_now; info!("{}", finder.quick_state()); } - let k = tokio::time::timeout(Duration::from_millis(200), finder.next()).await; + let k = tokio::time::timeout(Duration::from_millis(1500), finder.next()).await; let item = match k { Ok(Some(k)) => k, Ok(None) => { @@ -112,24 +161,46 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { } }; for item in item { - let searchaddr = item.src.to_string(); - let addr = item.addr.map(|x| x.to_string()).unwrap_or(String::new()); - let rows = pg_client - .query(&qu_select, &[&facility, &item.channel, &searchaddr]) - .await - .unwrap(); - if rows.is_empty() { - pg_client - .execute(&qu_insert, &[&facility, &item.channel, &searchaddr, &addr]) + let mut do_block = false; + for a2 in &gw_addrs { + if &item.src == a2 { + do_block = true; + warn!("gateways responded to search"); + } + } + if let Some(a1) = item.addr.as_ref() { + for a2 in &gw_addrs { + if a1 == a2 { + do_block = true; + warn!("do not use gateways as ioc address"); + } + } + } + if do_block { + info!("blocking {item:?}"); + } else { + info!("using {item:?}"); + let srcaddr = item.src.to_string(); + let addr = item.addr.map(|x| x.to_string()).unwrap_or(String::new()); + let rows = pg_client + .query(&qu_select, &[&facility, &item.channel, &srcaddr]) .await .unwrap(); - } else { - let addr2: &str = rows[0].get(0); - if addr2 != addr { + if rows.is_empty() { + info!("insert {item:?}"); pg_client - .execute(&qu_update, &[&facility, &item.channel, &searchaddr, &addr]) + .execute(&qu_insert, &[&facility, &item.channel, &srcaddr, &addr]) .await .unwrap(); + } else { + info!("update {item:?}"); + let addr2: &str = rows[0].get(0); + if addr2 != addr { + pg_client + .execute(&qu_update, &[&facility, &item.channel, &srcaddr, &addr]) + .await + .unwrap(); + } } } } diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 2b7da36..a001141 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -1,5 +1,5 @@ use crate::ca::conn::ConnCommand; -use crate::ca::{CommandQueueSet, IngestCommons}; +use crate::ca::IngestCommons; use axum::extract::Query; use http::request::Parts; use log::*; @@ -9,7 +9,46 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; async fn get_empty() -> String { - format!("") + String::new() +} + +async fn send_command<'a, IT, F, R>(it: &mut IT, cmdgen: F) -> Vec> +where + IT: Iterator)>, + F: Fn() -> (ConnCommand, async_channel::Receiver), +{ + let mut rxs = Vec::new(); + for (_, tx) in it { + let (cmd, rx) = cmdgen(); + match tx.send(cmd).await { + Ok(()) => { + rxs.push(rx); + } + Err(e) => { + error!("can not send command {e:?}"); + } + } + } + rxs +} + +async fn find_channel( + params: HashMap, + ingest_commons: Arc, +) -> axum::Json)>> { + let pattern = params.get("pattern").map_or(String::new(), |x| x.clone()).to_string(); + let g = ingest_commons.command_queue_set.queues().lock().await; + let mut it 
= g.iter(); + let rxs = send_command(&mut it, || ConnCommand::find_channel(pattern.clone())).await; + let mut res = Vec::new(); + for rx in rxs { + let item = rx.recv().await.unwrap(); + if item.1.len() > 0 { + let item = (item.0.to_string(), item.1); + res.push(item); + } + } + axum::Json(res) } async fn channel_add(params: HashMap, ingest_commons: Arc) -> String { @@ -121,11 +160,65 @@ async fn channel_remove( } } +async fn channel_state(params: HashMap, ingest_commons: Arc) -> String { + let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string(); + let g = ingest_commons.command_queue_set.queues().lock().await; + let mut rxs = Vec::new(); + for (_, tx) in g.iter() { + let (cmd, rx) = ConnCommand::channel_state(name.clone()); + match tx.send(cmd).await { + Ok(()) => { + rxs.push(rx); + } + Err(e) => { + error!("can not send command {e:?}"); + } + } + } + let mut res = Vec::new(); + for rx in rxs { + let item = rx.recv().await.unwrap(); + if let Some(st) = item.1 { + let item = (item.0.to_string(), st); + res.push(item); + } + } + serde_json::to_string(&res).unwrap() +} + +async fn channel_states( + _params: HashMap, + ingest_commons: Arc, +) -> axum::Json> { + let g = ingest_commons.command_queue_set.queues().lock().await; + let mut rxs = Vec::new(); + for (_, tx) in g.iter() { + let (cmd, rx) = ConnCommand::channel_states_all(); + match tx.send(cmd).await { + Ok(()) => { + rxs.push(rx); + } + Err(e) => { + error!("can not send command {e:?}"); + } + } + } + let mut res = Vec::new(); + for rx in rxs { + let item = rx.recv().await.unwrap(); + for h in item.1 { + res.push(h); + } + } + res.sort_unstable_by_key(|v| u32::MAX - v.interest_score as u32); + //let res: Vec<_> = res.into_iter().rev().take(10).collect(); + axum::Json(res) +} + pub async fn start_metrics_service( bind_to: String, insert_frac: Arc, insert_ivl_min: Arc, - command_queue_set: Arc, ingest_commons: Arc, ) { use axum::routing::{get, put}; @@ -150,96 +243,36 @@ pub async fn start_metrics_service( .route( "/daqingest/find/channel", get({ - let command_queue_set = command_queue_set.clone(); - |Query(params): Query>| async move { - let pattern = params.get("pattern").map_or(String::new(), |x| x.clone()).to_string(); - let g = command_queue_set.queues().lock().await; - let mut rxs = Vec::new(); - for (_, tx) in g.iter() { - let (cmd, rx) = ConnCommand::find_channel(pattern.clone()); - rxs.push(rx); - if let Err(_) = tx.send(cmd).await { - error!("can not send command"); - } - } - let mut res = Vec::new(); - for rx in rxs { - let item = rx.recv().await.unwrap(); - if item.1.len() > 0 { - let item = (item.0.to_string(), item.1); - res.push(item); - } - } - serde_json::to_string(&res).unwrap() - } + let ingest_commons = ingest_commons.clone(); + |Query(params): Query>| find_channel(params, ingest_commons) }), ) .route( "/daqingest/channel/state", get({ - let command_queue_set = command_queue_set.clone(); - |Query(params): Query>| async move { - let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string(); - let g = command_queue_set.queues().lock().await; - let mut rxs = Vec::new(); - for (_, tx) in g.iter() { - let (cmd, rx) = ConnCommand::channel_state(name.clone()); - rxs.push(rx); - if let Err(_) = tx.send(cmd).await { - error!("can not send command"); - } - } - let mut res = Vec::new(); - for rx in rxs { - let item = rx.recv().await.unwrap(); - if let Some(st) = item.1 { - let item = (item.0.to_string(), st); - res.push(item); - } - } - serde_json::to_string(&res).unwrap() - } + let 
ingest_commons = ingest_commons.clone(); + |Query(params): Query>| channel_state(params, ingest_commons) }), ) .route( "/daqingest/channel/states", get({ - let command_queue_set = command_queue_set.clone(); - |Query(_params): Query>| async move { - let g = command_queue_set.queues().lock().await; - let mut rxs = Vec::new(); - for (_, tx) in g.iter() { - let (cmd, rx) = ConnCommand::channel_states_all(); - rxs.push(rx); - if let Err(_) = tx.send(cmd).await { - error!("can not send command"); - } - } - let mut res = Vec::new(); - for rx in rxs { - let item = rx.recv().await.unwrap(); - for h in item.1 { - res.push((item.0.clone(), h)); - } - } - res.sort_unstable_by_key(|(_, v)| v.interest_score as u32); - let res: Vec<_> = res.into_iter().rev().take(10).collect(); - serde_json::to_string(&res).unwrap() - } + let ingest_commons = ingest_commons.clone(); + |Query(params): Query>| channel_states(params, ingest_commons) }), ) .route( "/daqingest/channel/add", get({ let ingest_commons = ingest_commons.clone(); - |Query(params): Query>| async move { channel_add(params, ingest_commons).await } + |Query(params): Query>| channel_add(params, ingest_commons) }), ) .route( "/daqingest/channel/remove", get({ let ingest_commons = ingest_commons.clone(); - |Query(params): Query>| async move { channel_remove(params, ingest_commons).await } + |Query(params): Query>| channel_remove(params, ingest_commons) }), ) .route( diff --git a/netfetch/src/store.rs b/netfetch/src/store.rs index de84795..cb6a649 100644 --- a/netfetch/src/store.rs +++ b/netfetch/src/store.rs @@ -10,10 +10,11 @@ use scylla::prepared_statement::PreparedStatement; use scylla::transport::errors::QueryError; use scylla::{QueryResult, Session as ScySession}; use stats::CaConnStats; +use std::net::SocketAddrV4; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use std::time::Instant; +use std::time::{Instant, SystemTime}; pub struct ScyInsertFut { #[allow(unused)] @@ -83,6 +84,36 @@ impl Future for ScyInsertFut { } } +#[derive(Debug)] +pub enum ConnectionStatus { + ConnectError = 1, + ConnectTimeout = 2, + Established = 3, + Closing = 4, + ClosedUnexpected = 5, +} + +#[derive(Debug)] +pub struct ConnectionStatusItem { + pub ts: SystemTime, + pub addr: SocketAddrV4, + pub status: ConnectionStatus, +} + +#[derive(Debug)] +pub enum ChannelStatus { + Opened = 1, + Closed = 2, + ClosedUnexpected = 3, +} + +#[derive(Debug)] +pub struct ChannelStatusItem { + pub ts: SystemTime, + pub series: u64, + pub status: ChannelStatus, +} + #[derive(Debug)] pub struct InsertItem { pub series: u64, @@ -114,6 +145,8 @@ pub struct IvlItem { #[derive(Debug)] pub enum QueryItem { + ConnectionStatus(ConnectionStatusItem), + ChannelStatus(ChannelStatusItem), Insert(InsertItem), Mute(MuteItem), Ivl(IvlItem), @@ -266,3 +299,29 @@ pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaCon stats.inserts_val_inc(); Ok(()) } + +pub async fn insert_connection_status( + item: ConnectionStatusItem, + data_store: &DataStore, + _stats: &CaConnStats, +) -> Result<(), Error> { + let tsunix = item.ts.duration_since(std::time::UNIX_EPOCH).unwrap(); + let secs = tsunix.as_secs() * netpod::timeunits::SEC; + let nanos = tsunix.subsec_nanos() as u64; + let ts = secs + nanos; + let div = netpod::timeunits::SEC * 600; + let ts_msp = ts / div * div; + let ts_lsp = ts - ts_msp; + let kind = item.status as u32; + let addr = format!("{}", item.addr); + let params = (ts_msp as i64, ts_lsp as i64, kind as i32, addr); + data_store + .scy + .query( + 
"insert into connection_status (ts_msp, ts_lsp, kind, addr) values (?, ?, ?, ?)", + params, + ) + .await + .err_conv()?; + Ok(()) +} diff --git a/readme.md b/readme.md index f6ceb76..6dd4732 100644 --- a/readme.md +++ b/readme.md @@ -26,7 +26,7 @@ api_bind: "0.0.0.0:3011" local_epics_hostname: sf-daqsync-02.psi.ch # The backend name to use for the channels handled by this daqingest instance: backend: scylla -# Hosts to use for channel access search: +# Addresses to use for channel access search: search: - "172.26.0.255" - "172.26.2.255" @@ -49,3 +49,15 @@ channels: - "SOME-CHANNEL:1" - "OTHER-CHANNEL:2" ``` + + +## Access status and configuration of daqingest at runtime + +Status and configuration can be accessed at runtime via http at the address +as configured by the `api_bind` parameter. + +### Check the state of a channel + +```txt +http:///daqingest/channel/state?name=[...] +```