diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index db8d1be..fb74f70 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -16,13 +16,12 @@ use futures_util::stream::FuturesUnordered; use futures_util::{FutureExt, StreamExt}; use log::*; use netpod::{Database, ScyllaConfig}; -use scylla::batch::Consistency; use serde::{Deserialize, Serialize}; use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff}; use std::collections::{BTreeMap, VecDeque}; use std::ffi::CStr; use std::mem::MaybeUninit; -use std::net::{Ipv4Addr, SocketAddrV4}; +use std::net::{IpAddr, Ipv4Addr, SocketAddrV4}; use std::path::PathBuf; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::{Arc, Mutex, Once}; @@ -55,8 +54,8 @@ struct ChannelConfig { search_blacklist: Vec, #[serde(default)] tmp_remove: Vec, - addr_bind: Option, - addr_conn: Option, + addr_bind: Option, + addr_conn: Option, whitelist: Option, blacklist: Option, max_simul: Option, @@ -102,8 +101,8 @@ pub async fn parse_config(config: PathBuf) -> Result { channels: conf.channels, search: conf.search, search_blacklist: conf.search_blacklist, - addr_bind: conf.addr_bind.unwrap_or(Ipv4Addr::new(0, 0, 0, 0)), - addr_conn: conf.addr_conn.unwrap_or(Ipv4Addr::new(255, 255, 255, 255)), + addr_bind: conf.addr_bind.unwrap_or(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))), + addr_conn: conf.addr_conn.unwrap_or(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255))), timeout: conf.timeout.unwrap_or(2000), pgconf: conf.postgresql, scyconf: conf.scylla, @@ -122,8 +121,8 @@ pub struct CaConnectOpts { pub channels: Vec, pub search: Vec, pub search_blacklist: Vec, - pub addr_bind: Ipv4Addr, - pub addr_conn: Ipv4Addr, + pub addr_bind: IpAddr, + pub addr_conn: IpAddr, pub timeout: u64, pub pgconf: Database, pub scyconf: ScyllaConfig, @@ -148,15 +147,7 @@ async fn spawn_scylla_insert_workers( let mut jhs = Vec::new(); let mut data_stores = Vec::new(); for _ in 0..insert_scylla_sessions { - let scy = scylla::SessionBuilder::new() - .known_nodes(&scyconf.hosts) - .default_consistency(Consistency::One) - .use_keyspace(&scyconf.keyspace, true) - .build() - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let scy = Arc::new(scy); - let data_store = Arc::new(DataStore::new(pg_client.clone(), scy.clone()).await?); + let data_store = Arc::new(DataStore::new(&scyconf, pg_client.clone()).await?); data_stores.push(data_store); } for i1 in 0..insert_worker_count { @@ -490,14 +481,6 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { tokio::spawn(pg_conn); let pg_client = Arc::new(pg_client); - let scy = scylla::SessionBuilder::new() - .known_nodes(&scyconf.hosts) - .default_consistency(Consistency::One) - .use_keyspace(scyconf.keyspace, true) - .build() - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - let scy = Arc::new(scy); // TODO use new struct: let local_stats = Arc::new(CaConnStats::new()); // TODO factor the find loop into a separate Stream. @@ -523,7 +506,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let mut channels_by_host = BTreeMap::new(); - let data_store = Arc::new(DataStore::new(pg_client.clone(), scy.clone()).await?); + let data_store = Arc::new(DataStore::new(&scyconf, pg_client.clone()).await?); let insert_item_queue = CommonInsertItemQueue::new(opts.insert_item_queue_cap); let insert_item_queue = Arc::new(insert_item_queue); // TODO use a new stats struct @@ -717,13 +700,12 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let mut sent_stop_commands = false; loop { if SIGINT.load(Ordering::Acquire) != 0 { - let receiver = insert_item_queue.receiver(); - info!( - "item queue AGAIN senders {} receivers {}", - receiver.sender_count(), - receiver.receiver_count() - ); - info!("Stopping"); + if false { + let receiver = insert_item_queue.receiver(); + let sc = receiver.sender_count(); + let rc = receiver.receiver_count(); + info!("item queue senders {} receivers {}", sc, rc); + } if !sent_stop_commands { sent_stop_commands = true; info!("sending stop command"); @@ -743,7 +725,6 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { a = jh => match a { Ok(k) => match k { Ok(_) => { - info!("joined"); } Err(e) => { error!("{e:?}"); @@ -755,6 +736,9 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { }, _b = tokio::time::sleep(Duration::from_millis(1000)).fuse() => { conn_jhs.push_back(jh); + if sent_stop_commands { + info!("waiting for {} connections", conn_jhs.len()); + } } }; } @@ -764,24 +748,47 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { metrics_agg_jh.abort(); drop(metrics_agg_jh); + if false { + let sender = insert_item_queue.sender_raw(); + sender.close(); + let receiver = insert_item_queue.receiver(); + receiver.close(); + } + if true { + let receiver = insert_item_queue.receiver(); + let sc = receiver.sender_count(); + let rc = receiver.receiver_count(); + info!("item queue A senders {} receivers {}", sc, rc); + } let receiver = insert_item_queue.receiver(); drop(insert_item_queue); - info!( - "item queue AGAIN senders {} receivers {}", - receiver.sender_count(), - receiver.receiver_count() - ); + if true { + let sc = receiver.sender_count(); + let rc = receiver.receiver_count(); + info!("item queue B senders {} receivers {}", sc, rc); + } let mut futs = FuturesUnordered::from_iter(jh_insert_workers); - while let Some(x) = futs.next().await { - match x { - Ok(_) => { - info!("insert worker done"); + loop { + futures_util::select!( + x = futs.next() => match x { + Some(Ok(_)) => { + info!("waiting for {} inserts", futs.len()); + } + Some(Err(e)) => { + error!("error on shutdown: {e:?}"); + } + None => break, + }, + _ = tokio::time::sleep(Duration::from_millis(1000)).fuse() => { + info!("waiting for {} inserters", futs.len()); + if true { + let sc = receiver.sender_count(); + let rc = receiver.receiver_count(); + info!("item queue B senders {} receivers {}", sc, rc); + } } - Err(e) => { - error!("error on shutdown: {e:?}"); - } - } + ); } info!("all insert workers done."); Ok(()) diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 16b32bf..db9b411 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -18,7 +18,7 @@ use netpod::{ScalarType, Shape}; use serde::Serialize; use stats::{CaConnStats, IntervalEma}; use std::collections::{BTreeMap, VecDeque}; -use std::net::{Ipv4Addr, SocketAddrV4}; +use std::net::SocketAddrV4; use std::ops::ControlFlow; use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; @@ -1205,13 +1205,8 @@ impl CaConn { self.proto = Some(proto); None } - Ok(Err(e)) => { + Ok(Err(_e)) => { // TODO log with exponential backoff - // 172.26.24.118:2072 - const ADDR2: Ipv4Addr = Ipv4Addr::new(172, 26, 24, 118); - if addr.ip() == &ADDR2 && addr.port() == 2072 { - warn!("error during connect to {addr:?} {e:?}"); - } let addr = addr.clone(); self.insert_item_queue .push_back(QueryItem::ConnectionStatus(ConnectionStatusItem { diff --git a/netfetch/src/ca/search.rs b/netfetch/src/ca/search.rs index 98aceae..661cbb1 100644 --- a/netfetch/src/ca/search.rs +++ b/netfetch/src/ca/search.rs @@ -4,20 +4,20 @@ use err::Error; use futures_util::StreamExt; use log::*; use netpod::Database; -use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; +use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use std::time::{Duration, Instant}; -async fn resolve_address(addr_str: &str) -> Result { +async fn resolve_address(addr_str: &str) -> Result { const PORT_DEFAULT: u16 = 5064; - let ac = match addr_str.parse::() { + let ac = match addr_str.parse::() { Ok(k) => k, Err(_) => { - trace!("can not parse {addr_str} as SocketAddrV4"); - match addr_str.parse::() { - Ok(k) => SocketAddrV4::new(k, PORT_DEFAULT), - Err(e) => { - trace!("can not parse {addr_str} as Ipv4Addr"); + trace!("can not parse {addr_str} as SocketAddr"); + match addr_str.parse::() { + Ok(k) => SocketAddr::new(k, PORT_DEFAULT), + Err(_e) => { + trace!("can not parse {addr_str} as IpAddr"); let (hostname, port) = if addr_str.contains(":") { let mut it = addr_str.split(":"); ( @@ -27,23 +27,13 @@ async fn resolve_address(addr_str: &str) -> Result { } else { (addr_str.to_string(), PORT_DEFAULT) }; - match tokio::net::lookup_host(format!("{}:33", hostname.clone())).await { - Ok(k) => { - let vs: Vec<_> = k - .filter_map(|x| match x { - SocketAddr::V4(k) => Some(k), - SocketAddr::V6(_) => { - error!("TODO ipv6 support"); - None - } - }) - .collect(); - if let Some(k) = vs.first() { - let mut k = *k; - k.set_port(port); + let host = format!("{}:{}", hostname.clone(), port); + match tokio::net::lookup_host(host.clone()).await { + Ok(mut k) => { + if let Some(k) = k.next() { k } else { - return Err(e.into()); + return Err(Error::with_msg_no_trace(format!("can not lookup host {host}"))); } } Err(e) => return Err(e.into()), @@ -112,6 +102,16 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { gw_addrs }; info!("Blacklisting {} gateways", gw_addrs.len()); + let addrs = addrs + .into_iter() + .filter_map(|x| match x { + SocketAddr::V4(x) => Some(x), + SocketAddr::V6(_) => { + error!("TODO check ipv6 support for IOCs"); + None + } + }) + .collect(); let mut finder = FindIocStream::new(addrs); for ch in &opts.channels { finder.push(ch.into()); @@ -145,7 +145,7 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { let mut do_block = false; for a2 in &gw_addrs { if let Some(response_addr) = &item.response_addr { - if response_addr == a2 { + if &SocketAddr::V4(*response_addr) == a2 { do_block = true; warn!("gateways responded to search"); } @@ -153,7 +153,7 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> { } if let Some(a1) = item.addr.as_ref() { for a2 in &gw_addrs { - if a1 == a2 { + if &SocketAddr::V4(*a1) == a2 { do_block = true; warn!("do not use gateways as ioc address"); } diff --git a/netfetch/src/ca/store.rs b/netfetch/src/ca/store.rs index bb9a3f0..6f471e5 100644 --- a/netfetch/src/ca/store.rs +++ b/netfetch/src/ca/store.rs @@ -2,7 +2,9 @@ use crate::bsread::ChannelDescDecoded; use crate::series::{Existence, SeriesId}; use async_channel::{Receiver, Sender}; use err::Error; +use netpod::ScyllaConfig; use scylla::prepared_statement::PreparedStatement; +use scylla::statement::Consistency; use scylla::Session as ScySession; use std::sync::Arc; use tokio_postgres::Client as PgClient; @@ -57,7 +59,15 @@ pub struct DataStore { } impl DataStore { - pub async fn new(pg_client: Arc, scy: Arc) -> Result { + pub async fn new(scyconf: &ScyllaConfig, pg_client: Arc) -> Result { + let scy = scylla::SessionBuilder::new() + .known_nodes(&scyconf.hosts) + .default_consistency(Consistency::One) + .use_keyspace(&scyconf.keyspace, true) + .build() + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let scy = Arc::new(scy); let q = scy .prepare("insert into ts_msp (series, ts_msp) values (?, ?)") .await diff --git a/netfetch/src/store.rs b/netfetch/src/store.rs index 045e106..57d625f 100644 --- a/netfetch/src/store.rs +++ b/netfetch/src/store.rs @@ -200,6 +200,10 @@ impl CommonInsertItemQueue { } } + pub fn sender_raw(&self) -> async_channel::Sender { + self.sender.clone() + } + pub fn receiver(&self) -> async_channel::Receiver { self.recv.clone() }