Add partial IPv6 support and refactor a bit

Dominik Werder
2022-09-01 18:51:49 +02:00
parent 399dfdb4bf
commit 3beae66bbf
5 changed files with 96 additions and 80 deletions

View File

@@ -16,13 +16,12 @@ use futures_util::stream::FuturesUnordered;
use futures_util::{FutureExt, StreamExt};
use log::*;
use netpod::{Database, ScyllaConfig};
use scylla::batch::Consistency;
use serde::{Deserialize, Serialize};
use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff};
use std::collections::{BTreeMap, VecDeque};
use std::ffi::CStr;
use std::mem::MaybeUninit;
use std::net::{Ipv4Addr, SocketAddrV4};
use std::net::{IpAddr, Ipv4Addr, SocketAddrV4};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, Once};
@@ -55,8 +54,8 @@ struct ChannelConfig {
search_blacklist: Vec<String>,
#[serde(default)]
tmp_remove: Vec<String>,
addr_bind: Option<Ipv4Addr>,
addr_conn: Option<Ipv4Addr>,
addr_bind: Option<IpAddr>,
addr_conn: Option<IpAddr>,
whitelist: Option<String>,
blacklist: Option<String>,
max_simul: Option<usize>,
@@ -102,8 +101,8 @@ pub async fn parse_config(config: PathBuf) -> Result<CaConnectOpts, Error> {
channels: conf.channels,
search: conf.search,
search_blacklist: conf.search_blacklist,
addr_bind: conf.addr_bind.unwrap_or(Ipv4Addr::new(0, 0, 0, 0)),
addr_conn: conf.addr_conn.unwrap_or(Ipv4Addr::new(255, 255, 255, 255)),
addr_bind: conf.addr_bind.unwrap_or(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))),
addr_conn: conf.addr_conn.unwrap_or(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255))),
timeout: conf.timeout.unwrap_or(2000),
pgconf: conf.postgresql,
scyconf: conf.scylla,
@@ -122,8 +121,8 @@ pub struct CaConnectOpts {
pub channels: Vec<String>,
pub search: Vec<String>,
pub search_blacklist: Vec<String>,
pub addr_bind: Ipv4Addr,
pub addr_conn: Ipv4Addr,
pub addr_bind: IpAddr,
pub addr_conn: IpAddr,
pub timeout: u64,
pub pgconf: Database,
pub scyconf: ScyllaConfig,
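
Note: addr_bind and addr_conn are now plain std::net::IpAddr, so the config accepts either address family while the defaults stay IPv4. A minimal standalone sketch of that behavior (values are illustrative, not from the crate):

use std::net::{IpAddr, Ipv4Addr};

fn main() {
    // Both IPv4 and IPv6 literals parse into the same IpAddr type.
    let v4: IpAddr = "0.0.0.0".parse().unwrap();
    let v6: IpAddr = "::".parse().unwrap();
    // A missing addr_bind falls back to the IPv4 wildcard, mirroring the
    // unwrap_or defaults in parse_config above.
    let addr_bind: Option<IpAddr> = None;
    let bind = addr_bind.unwrap_or(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)));
    assert_eq!(bind, v4);
    println!("bind {bind}  v6 wildcard {v6}");
}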
@@ -148,15 +147,7 @@ async fn spawn_scylla_insert_workers(
let mut jhs = Vec::new();
let mut data_stores = Vec::new();
for _ in 0..insert_scylla_sessions {
let scy = scylla::SessionBuilder::new()
.known_nodes(&scyconf.hosts)
.default_consistency(Consistency::One)
.use_keyspace(&scyconf.keyspace, true)
.build()
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let scy = Arc::new(scy);
let data_store = Arc::new(DataStore::new(pg_client.clone(), scy.clone()).await?);
let data_store = Arc::new(DataStore::new(&scyconf, pg_client.clone()).await?);
data_stores.push(data_store);
}
for i1 in 0..insert_worker_count {
@@ -490,14 +481,6 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
tokio::spawn(pg_conn);
let pg_client = Arc::new(pg_client);
let scy = scylla::SessionBuilder::new()
.known_nodes(&scyconf.hosts)
.default_consistency(Consistency::One)
.use_keyspace(scyconf.keyspace, true)
.build()
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let scy = Arc::new(scy);
// TODO use new struct:
let local_stats = Arc::new(CaConnStats::new());
// TODO factor the find loop into a separate Stream.
@@ -523,7 +506,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
let mut channels_by_host = BTreeMap::new();
let data_store = Arc::new(DataStore::new(pg_client.clone(), scy.clone()).await?);
let data_store = Arc::new(DataStore::new(&scyconf, pg_client.clone()).await?);
let insert_item_queue = CommonInsertItemQueue::new(opts.insert_item_queue_cap);
let insert_item_queue = Arc::new(insert_item_queue);
// TODO use a new stats struct
@@ -717,13 +700,12 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
let mut sent_stop_commands = false;
loop {
if SIGINT.load(Ordering::Acquire) != 0 {
let receiver = insert_item_queue.receiver();
info!(
"item queue AGAIN senders {} receivers {}",
receiver.sender_count(),
receiver.receiver_count()
);
info!("Stopping");
if false {
let receiver = insert_item_queue.receiver();
let sc = receiver.sender_count();
let rc = receiver.receiver_count();
info!("item queue senders {} receivers {}", sc, rc);
}
if !sent_stop_commands {
sent_stop_commands = true;
info!("sending stop command");
@@ -743,7 +725,6 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
a = jh => match a {
Ok(k) => match k {
Ok(_) => {
info!("joined");
}
Err(e) => {
error!("{e:?}");
@@ -755,6 +736,9 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
},
_b = tokio::time::sleep(Duration::from_millis(1000)).fuse() => {
conn_jhs.push_back(jh);
if sent_stop_commands {
info!("waiting for {} connections", conn_jhs.len());
}
}
};
}
@@ -764,24 +748,47 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
metrics_agg_jh.abort();
drop(metrics_agg_jh);
if false {
let sender = insert_item_queue.sender_raw();
sender.close();
let receiver = insert_item_queue.receiver();
receiver.close();
}
if true {
let receiver = insert_item_queue.receiver();
let sc = receiver.sender_count();
let rc = receiver.receiver_count();
info!("item queue A senders {} receivers {}", sc, rc);
}
let receiver = insert_item_queue.receiver();
drop(insert_item_queue);
info!(
"item queue AGAIN senders {} receivers {}",
receiver.sender_count(),
receiver.receiver_count()
);
if true {
let sc = receiver.sender_count();
let rc = receiver.receiver_count();
info!("item queue B senders {} receivers {}", sc, rc);
}
let mut futs = FuturesUnordered::from_iter(jh_insert_workers);
while let Some(x) = futs.next().await {
match x {
Ok(_) => {
info!("insert worker done");
loop {
futures_util::select!(
x = futs.next() => match x {
Some(Ok(_)) => {
info!("waiting for {} inserts", futs.len());
}
Some(Err(e)) => {
error!("error on shutdown: {e:?}");
}
None => break,
},
_ = tokio::time::sleep(Duration::from_millis(1000)).fuse() => {
info!("waiting for {} inserters", futs.len());
if true {
let sc = receiver.sender_count();
let rc = receiver.receiver_count();
info!("item queue B senders {} receivers {}", sc, rc);
}
}
Err(e) => {
error!("error on shutdown: {e:?}");
}
}
);
}
info!("all insert workers done.");
Ok(())
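
Note: the new shutdown path drains the insert-worker join handles through futures_util::select!, logging progress once per second instead of once per finished worker. A minimal self-contained sketch of the same pattern, assuming tokio and futures-util as above; the spawned tasks are placeholders, not the crate's insert workers:

use futures_util::stream::FuturesUnordered;
use futures_util::{FutureExt, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Placeholder tasks standing in for the insert-worker join handles.
    let mut futs: FuturesUnordered<_> = (0..4u64)
        .map(|i| tokio::spawn(tokio::time::sleep(Duration::from_millis(400 * i))))
        .collect();
    loop {
        futures_util::select!(
            x = futs.next() => match x {
                Some(Ok(())) => println!("waiting for {} workers", futs.len()),
                Some(Err(e)) => eprintln!("error on shutdown: {e:?}"),
                None => break,
            },
            _ = tokio::time::sleep(Duration::from_millis(1000)).fuse() => {
                // Periodic progress line even while no worker finishes.
                println!("still waiting for {} workers", futs.len());
            }
        );
    }
    println!("all workers done");
}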

View File

@@ -18,7 +18,7 @@ use netpod::{ScalarType, Shape};
use serde::Serialize;
use stats::{CaConnStats, IntervalEma};
use std::collections::{BTreeMap, VecDeque};
use std::net::{Ipv4Addr, SocketAddrV4};
use std::net::SocketAddrV4;
use std::ops::ControlFlow;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
@@ -1205,13 +1205,8 @@ impl CaConn {
self.proto = Some(proto);
None
}
Ok(Err(e)) => {
Ok(Err(_e)) => {
// TODO log with exponential backoff
// 172.26.24.118:2072
const ADDR2: Ipv4Addr = Ipv4Addr::new(172, 26, 24, 118);
if addr.ip() == &ADDR2 && addr.port() == 2072 {
warn!("error during connect to {addr:?} {e:?}");
}
let addr = addr.clone();
self.insert_item_queue
.push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
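
Note: the removed branch warned only for one hard-coded peer; the remaining TODO asks for logging with exponential backoff. A hypothetical sketch of such a throttle, not part of the crate (names are made up):

/// Hypothetical helper: emit only the 1st, 2nd, 4th, 8th, ... occurrence of a
/// repeating error so the log stays readable during a connect storm.
struct BackoffLog {
    seen: u64,
    next_at: u64,
}

impl BackoffLog {
    fn new() -> Self {
        Self { seen: 0, next_at: 1 }
    }

    /// Returns true when the caller should actually log this occurrence.
    fn should_log(&mut self) -> bool {
        self.seen += 1;
        if self.seen >= self.next_at {
            self.next_at = self.next_at.saturating_mul(2);
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut b = BackoffLog::new();
    // Out of 20 repeats, only occurrences 1, 2, 4, 8 and 16 get through.
    let logged = (0..20).filter(|_| b.should_log()).count();
    assert_eq!(logged, 5);
}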

View File

@@ -4,20 +4,20 @@ use err::Error;
use futures_util::StreamExt;
use log::*;
use netpod::Database;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::time::{Duration, Instant};
async fn resolve_address(addr_str: &str) -> Result<SocketAddrV4, Error> {
async fn resolve_address(addr_str: &str) -> Result<SocketAddr, Error> {
const PORT_DEFAULT: u16 = 5064;
let ac = match addr_str.parse::<SocketAddrV4>() {
let ac = match addr_str.parse::<SocketAddr>() {
Ok(k) => k,
Err(_) => {
trace!("can not parse {addr_str} as SocketAddrV4");
match addr_str.parse::<Ipv4Addr>() {
Ok(k) => SocketAddrV4::new(k, PORT_DEFAULT),
Err(e) => {
trace!("can not parse {addr_str} as Ipv4Addr");
trace!("can not parse {addr_str} as SocketAddr");
match addr_str.parse::<IpAddr>() {
Ok(k) => SocketAddr::new(k, PORT_DEFAULT),
Err(_e) => {
trace!("can not parse {addr_str} as IpAddr");
let (hostname, port) = if addr_str.contains(":") {
let mut it = addr_str.split(":");
(
@@ -27,23 +27,13 @@ async fn resolve_address(addr_str: &str) -> Result<SocketAddrV4, Error> {
} else {
(addr_str.to_string(), PORT_DEFAULT)
};
match tokio::net::lookup_host(format!("{}:33", hostname.clone())).await {
Ok(k) => {
let vs: Vec<_> = k
.filter_map(|x| match x {
SocketAddr::V4(k) => Some(k),
SocketAddr::V6(_) => {
error!("TODO ipv6 support");
None
}
})
.collect();
if let Some(k) = vs.first() {
let mut k = *k;
k.set_port(port);
let host = format!("{}:{}", hostname.clone(), port);
match tokio::net::lookup_host(host.clone()).await {
Ok(mut k) => {
if let Some(k) = k.next() {
k
} else {
return Err(e.into());
return Err(Error::with_msg_no_trace(format!("can not lookup host {host}")));
}
}
Err(e) => return Err(e.into()),
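
Note: with old and new lines interleaved above, the new resolve_address order is: try a full SocketAddr, then a bare IpAddr with the default port 5064, then a DNS lookup of host:port taking the first result of either family. A condensed sketch of that order, with the error type simplified to String instead of the crate's err::Error:

use std::net::{IpAddr, SocketAddr};

// Condensed sketch; the crate's version returns err::Error and logs each
// fallback at trace level.
async fn resolve_address_sketch(addr_str: &str) -> Result<SocketAddr, String> {
    const PORT_DEFAULT: u16 = 5064;
    if let Ok(sa) = addr_str.parse::<SocketAddr>() {
        return Ok(sa);
    }
    if let Ok(ip) = addr_str.parse::<IpAddr>() {
        return Ok(SocketAddr::new(ip, PORT_DEFAULT));
    }
    // Fall back to DNS: split off an explicit port if present, else use the default.
    let (hostname, port) = match addr_str.split_once(':') {
        Some((h, p)) => (h.to_string(), p.parse::<u16>().map_err(|e| format!("{e:?}"))?),
        None => (addr_str.to_string(), PORT_DEFAULT),
    };
    let host = format!("{hostname}:{port}");
    let mut it = tokio::net::lookup_host(host.as_str())
        .await
        .map_err(|e| format!("{e:?}"))?;
    it.next()
        .ok_or_else(|| format!("can not lookup host {host}"))
}

#[tokio::main]
async fn main() {
    for s in ["127.0.0.1:5064", "::1", "localhost:5064"] {
        println!("{s} -> {:?}", resolve_address_sketch(s).await);
    }
}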
@@ -112,6 +102,16 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
gw_addrs
};
info!("Blacklisting {} gateways", gw_addrs.len());
let addrs = addrs
.into_iter()
.filter_map(|x| match x {
SocketAddr::V4(x) => Some(x),
SocketAddr::V6(_) => {
error!("TODO check ipv6 support for IOCs");
None
}
})
.collect();
let mut finder = FindIocStream::new(addrs);
for ch in &opts.channels {
finder.push(ch.into());
@@ -145,7 +145,7 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
let mut do_block = false;
for a2 in &gw_addrs {
if let Some(response_addr) = &item.response_addr {
if response_addr == a2 {
if &SocketAddr::V4(*response_addr) == a2 {
do_block = true;
warn!("gateways responded to search");
}
@@ -153,7 +153,7 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
}
if let Some(a1) = item.addr.as_ref() {
for a2 in &gw_addrs {
if a1 == a2 {
if &SocketAddr::V4(*a1) == a2 {
do_block = true;
warn!("do not use gateways as ioc address");
}
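
Note: search responses still carry SocketAddrV4 while the resolved gateway list is now SocketAddr, hence the explicit SocketAddr::V4(..) wrapping in the comparisons above. A small standalone sketch of that check (addresses are made up):

use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};

fn main() {
    // Gateway addresses after resolution can be either family.
    let gw_addrs: Vec<SocketAddr> = vec![
        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 5064),
        "[::1]:5064".parse().unwrap(),
    ];
    // A search response address is still IPv4-only.
    let response_addr = SocketAddrV4::new(Ipv4Addr::new(10, 0, 0, 1), 5064);
    let do_block = gw_addrs.iter().any(|a2| &SocketAddr::V4(response_addr) == a2);
    assert!(do_block);
}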

View File

@@ -2,7 +2,9 @@ use crate::bsread::ChannelDescDecoded;
use crate::series::{Existence, SeriesId};
use async_channel::{Receiver, Sender};
use err::Error;
use netpod::ScyllaConfig;
use scylla::prepared_statement::PreparedStatement;
use scylla::statement::Consistency;
use scylla::Session as ScySession;
use std::sync::Arc;
use tokio_postgres::Client as PgClient;
@@ -57,7 +59,15 @@ pub struct DataStore {
}
impl DataStore {
pub async fn new(pg_client: Arc<PgClient>, scy: Arc<ScySession>) -> Result<Self, Error> {
pub async fn new(scyconf: &ScyllaConfig, pg_client: Arc<PgClient>) -> Result<Self, Error> {
let scy = scylla::SessionBuilder::new()
.known_nodes(&scyconf.hosts)
.default_consistency(Consistency::One)
.use_keyspace(&scyconf.keyspace, true)
.build()
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let scy = Arc::new(scy);
let q = scy
.prepare("insert into ts_msp (series, ts_msp) values (?, ?)")
.await

View File

@@ -200,6 +200,10 @@ impl CommonInsertItemQueue {
}
}
pub fn sender_raw(&self) -> async_channel::Sender<QueryItem> {
self.sender.clone()
}
pub fn receiver(&self) -> async_channel::Receiver<QueryItem> {
self.recv.clone()
}
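
Note: these accessors hand out clones of the underlying async_channel endpoints; ca_connect uses receiver() during shutdown to watch the sender and receiver counts. A standalone sketch of that bookkeeping, using async_channel directly instead of CommonInsertItemQueue:

fn main() {
    let (tx, rx) = async_channel::bounded::<u32>(16);
    // Keep one receiver clone alive, as ca_connect does via insert_item_queue.receiver().
    let rx2 = rx.clone();
    println!("senders {} receivers {}", rx2.sender_count(), rx2.receiver_count());
    // Dropping the original endpoints leaves only the clone; once the sender
    // count reaches zero the consumers see the channel as closed and drain out.
    drop(tx);
    drop(rx);
    println!("senders {} receivers {}", rx2.sender_count(), rx2.receiver_count());
}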