Trigger search for channel create fail

This commit is contained in:
Dominik Werder
2023-09-27 14:08:29 +02:00
parent 38ea01e724
commit 1a2ef153f6
16 changed files with 845 additions and 354 deletions

View File

@@ -42,6 +42,7 @@ where
}
},
Err(e) => {
debug!("batcher timeout rx len {}", rx.len());
let _e: tokio::time::error::Elapsed = e;
if all.len() > 0 {
do_emit = true;

View File

@@ -32,6 +32,7 @@ pub fn main() -> Result<(), Error> {
scywr::tools::fetch_events(&k.backend, &k.channel, &scylla_conf).await?
}
SubCmd::ChannelAccess(k) => match k {
#[cfg(DISABLED)]
ChannelAccess::CaSearch(k) => {
info!("daqingest version {}", clap::crate_version!());
let (conf, channels) = parse_config(k.config.into()).await?;

View File

@@ -3,7 +3,6 @@ pub mod inserthook;
use async_channel::Receiver;
use async_channel::Sender;
use async_channel::WeakReceiver;
use async_channel::WeakSender;
use err::Error;
use log::*;
@@ -13,7 +12,6 @@ use netfetch::ca::connset::CaConnSetItem;
use netfetch::conf::CaIngestOpts;
use netfetch::daemon_common::Channel;
use netfetch::daemon_common::DaemonEvent;
use netfetch::metrics::ExtraInsertsConf;
use netfetch::metrics::StatsSet;
use netfetch::throttletrace::ThrottleTrace;
use netpod::Database;
@@ -21,18 +19,10 @@ use netpod::ScyllaConfig;
use scywr::insertworker::InsertWorkerOpts;
use scywr::insertworker::Ttls;
use scywr::iteminsertqueue as scywriiq;
use scywr::store::DataStore;
use scywriiq::QueryItem;
use serde::Serialize;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use stats::DaemonStats;
use stats::InsertWorkerStats;
use stats::SeriesByChannelStats;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::sync::atomic;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
@@ -43,9 +33,6 @@ use std::time::SystemTime;
use taskrun::tokio;
use tokio::task::JoinHandle;
const CA_CONN_INSERT_QUEUE_MAX: usize = 256;
const CHANNEL_CHECK_INTERVAL: Duration = Duration::from_millis(5000);
const PRINT_ACTIVE_INTERVAL: Duration = Duration::from_millis(60000);
const PRINT_STATUS_INTERVAL: Duration = Duration::from_millis(20000);
@@ -98,10 +85,11 @@ pub struct Daemon {
// TODO should be a stats object?
insert_workers_running: AtomicU64,
query_item_tx_weak: WeakSender<QueryItem>,
connset_health_lat_ema: f32,
}
impl Daemon {
pub async fn new(opts: DaemonOpts) -> Result<Self, Error> {
pub async fn new(opts: DaemonOpts, ingest_opts: CaIngestOpts) -> Result<Self, Error> {
let (daemon_ev_tx, daemon_ev_rx) = async_channel::bounded(32);
let series_by_channel_stats = Arc::new(SeriesByChannelStats::new());
@@ -126,7 +114,7 @@ impl Daemon {
opts.local_epics_hostname.clone(),
query_item_tx,
channel_info_query_tx,
opts.pgconf.clone(),
ingest_opts,
);
// TODO remove
@@ -230,6 +218,7 @@ impl Daemon {
connset_status_last: CheckPeriodic::Waiting(Instant::now()),
insert_workers_running: AtomicU64::new(0),
query_item_tx_weak,
connset_health_lat_ema: 0.,
};
Ok(ret)
}
@@ -242,18 +231,17 @@ impl Daemon {
match &self.connset_status_last {
CheckPeriodic::Waiting(since) => {
if *since + Duration::from_millis(5000) < ts1 {
debug!("======================================== issue health check CaConn");
self.connset_ctrl.check_health().await?;
self.connset_status_last = CheckPeriodic::Ongoing(ts1);
if let Some(tx) = self.query_item_tx_weak.upgrade() {
info!("query_item_tx len {}", tx.len());
}
}
}
CheckPeriodic::Ongoing(since) => {
let dt = ts1.saturating_duration_since(*since);
if dt > Duration::from_millis(2000) {
error!("======================================== CaConnSet has not reported health status since {:.0}", dt.as_secs_f32() * 1e3);
error!(
"CaConnSet has not reported health status since {:.0}",
dt.as_secs_f32() * 1e3
);
}
}
}
@@ -289,7 +277,7 @@ impl Daemon {
if dt > Duration::from_millis(500) {
info!("slow check_chans {}ms", dt.as_secs_f32() * 1e3);
}
if tsnow.duration_since(self.last_status_print).unwrap_or(Duration::ZERO) >= PRINT_STATUS_INTERVAL {
if false && tsnow.duration_since(self.last_status_print).unwrap_or(Duration::ZERO) >= PRINT_STATUS_INTERVAL {
self.last_status_print = tsnow;
info!(
"{:8} {:8} {:8} : {:8} : {:8} {:8} : {:10}",
@@ -382,11 +370,17 @@ impl Daemon {
let dt2 = ts3.duration_since(ts2).as_secs_f32() * 1e3;
match &self.connset_status_last {
CheckPeriodic::Waiting(_since) => {
error!("======================================== received CaConnSet health report without having asked {dt1:.0} ms {dt2:.0} ms");
error!("received CaConnSet health report without having asked {dt1:.0} ms {dt2:.0} ms");
}
CheckPeriodic::Ongoing(since) => {
let dtsince = ts3.duration_since(*since).as_secs_f32() * 1e3;
debug!("======================================== received CaConnSet healthy dtsince {dtsince:.0} ms {dt1:.0} ms {dt2:.0} ms");
// TODO insert response time as series to scylla.
let dtsince = ts3.duration_since(*since).as_secs_f32() * 1e6;
{
let v = &mut self.connset_health_lat_ema;
*v += (dtsince - *v) * 0.2;
self.stats.connset_health_lat_ema().set(*v as _);
}
// debug!("======================================== received CaConnSet healthy dtsince {dtsince:.0} ms {dt1:.0} ms {dt2:.0} ms");
self.connset_status_last = CheckPeriodic::Waiting(ts3);
}
}
@@ -604,7 +598,7 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
insert_scylla_sessions: opts.insert_scylla_sessions(),
insert_frac: insert_frac.clone(),
};
let daemon = Daemon::new(opts2).await?;
let daemon = Daemon::new(opts2, opts.clone()).await?;
let tx = daemon.tx.clone();
let daemon_stats = daemon.stats().clone();
let connset_cmd_tx = daemon.connset_ctrl.sender().clone();
@@ -622,6 +616,7 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
daemon.connset_ctrl.ca_proto_stats().clone(),
daemon.insert_worker_stats.clone(),
daemon.series_by_channel_stats.clone(),
daemon.connset_ctrl.ioc_finder_stats().clone(),
insert_frac,
);
let fut =
@@ -640,7 +635,7 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
Ok(()) => {}
Err(_) => break,
}
thr_msg.trigger("sent ChannelAdd", &[&i as &_]);
thr_msg.trigger("daemon sent ChannelAdd", &[&i as &_]);
i += 1;
}
debug!("{} configured channels applied", channels.len());

View File

@@ -31,18 +31,17 @@ pub async fn active_channel_insert_hook_worker(rx: Receiver<QueryItem>, tx: Send
Shape::Wave(_) => 1,
Shape::Image(_, _) => 2,
};
if let ScalarType::STRING = item.scalar_type {
histo
.entry(item.series.clone())
.and_modify(|(c, msp, lsp, pulse, _shape_kind)| {
*c += 1;
*msp = item.ts_msp;
*lsp = item.ts_lsp;
*pulse = item.pulse;
// TODO should check that shape_kind stays the same.
})
.or_insert((0 as usize, item.ts_msp, item.ts_lsp, item.pulse, shape_kind));
}
if let ScalarType::STRING = item.scalar_type {}
histo
.entry(item.series.clone())
.and_modify(|(c, msp, lsp, pulse, _shape_kind)| {
*c += 1;
*msp = item.ts_msp;
*lsp = item.ts_lsp;
*pulse = item.pulse;
// TODO should check that shape_kind stays the same.
})
.or_insert((0 as usize, item.ts_msp, item.ts_lsp, item.pulse, shape_kind));
}
_ => {}
}

View File

@@ -79,6 +79,7 @@ pub struct BsreadDump {
#[derive(Debug, Parser)]
pub enum ChannelAccess {
CaIngest(CaConfig),
#[cfg(DISABLED)]
CaSearch(CaSearch),
}

View File

@@ -58,6 +58,8 @@ use std::time::SystemTime;
use taskrun::tokio;
use tokio::net::TcpStream;
const CONNECTING_TIMEOUT: Duration = Duration::from_millis(6000);
#[allow(unused)]
macro_rules! trace2 {
($($arg:tt)*) => {
@@ -144,8 +146,7 @@ struct Cid(pub u32);
#[derive(Clone, Debug)]
enum ChannelError {
#[allow(unused)]
NoSuccess,
CreateChanFail,
}
#[derive(Clone, Debug)]
@@ -261,8 +262,9 @@ impl ChannelState {
}
enum CaConnState {
Unconnected,
Unconnected(Instant),
Connecting(
Instant,
SocketAddrV4,
Pin<Box<dyn Future<Output = Result<Result<TcpStream, std::io::Error>, tokio::time::error::Elapsed>> + Send>>,
),
@@ -277,8 +279,8 @@ enum CaConnState {
impl fmt::Debug for CaConnState {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Unconnected => write!(fmt, "Unconnected"),
Self::Connecting(arg0, _) => fmt.debug_tuple("Connecting").field(arg0).finish(),
Self::Unconnected(since) => fmt.debug_tuple("Unconnected").field(since).finish(),
Self::Connecting(since, addr, _) => fmt.debug_tuple("Connecting").field(since).field(addr).finish(),
Self::Init => write!(fmt, "Init"),
Self::Listen => write!(fmt, "Listen"),
Self::PeerReady => write!(fmt, "PeerReady"),
@@ -425,7 +427,9 @@ pub enum CaConnEventValue {
EchoTimeout,
ConnCommandResult(ConnCommandResult),
QueryItem(QueryItem),
ChannelCreateFail(String),
EndOfStream,
ConnectFail,
}
#[derive(Debug)]
@@ -497,7 +501,6 @@ pub struct CaConn {
ioc_ping_last: Instant,
ioc_ping_start: Option<Instant>,
storage_insert_sender: SenderPolling<QueryItem>,
cmd_res_queue: VecDeque<ConnCommandResult>,
ca_conn_event_out_queue: VecDeque<CaConnEvent>,
channel_info_query_queue: VecDeque<ChannelInfoQuery>,
channel_info_query_sending: SenderPolling<ChannelInfoQuery>,
@@ -528,7 +531,7 @@ impl CaConn {
Self {
opts,
backend,
state: CaConnState::Unconnected,
state: CaConnState::Unconnected(Instant::now()),
ticker: Self::new_self_ticker(),
proto: None,
cid_store: CidStore::new(),
@@ -552,7 +555,6 @@ impl CaConn {
ioc_ping_last: Instant::now(),
ioc_ping_start: None,
storage_insert_sender: SenderPolling::new(storage_insert_tx),
cmd_res_queue: VecDeque::new(),
ca_conn_event_out_queue: VecDeque::new(),
channel_info_query_queue: VecDeque::new(),
channel_info_query_sending: SenderPolling::new(channel_info_query_tx),
@@ -581,7 +583,34 @@ impl CaConn {
fn trigger_shutdown(&mut self, channel_reason: ChannelStatusClosedReason) {
self.state = CaConnState::Shutdown;
self.proto = None;
match &channel_reason {
ChannelStatusClosedReason::ShutdownCommand => {}
ChannelStatusClosedReason::ChannelRemove => {}
ChannelStatusClosedReason::ProtocolError => {}
ChannelStatusClosedReason::FrequencyQuota => {}
ChannelStatusClosedReason::BandwidthQuota => {}
ChannelStatusClosedReason::InternalError => {}
ChannelStatusClosedReason::IocTimeout => {}
ChannelStatusClosedReason::NoProtocol => {}
ChannelStatusClosedReason::ProtocolDone => {}
ChannelStatusClosedReason::ConnectFail => {
debug!("emit status ConnectFail");
let item = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::ConnectFail,
};
self.ca_conn_event_out_queue.push_back(item);
}
}
self.channel_state_on_shutdown(channel_reason);
let addr = self.remote_addr_dbg.clone();
self.insert_item_queue
.push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
ts: SystemTime::now(),
addr,
// TODO map to appropriate status
status: ConnectionStatus::Closing,
}));
}
fn cmd_check_health(&mut self) {
@@ -606,7 +635,11 @@ impl CaConn {
id: ConnCommandResult::make_id(),
kind: ConnCommandResultKind::CheckHealth(health),
};
self.cmd_res_queue.push_back(res);
let item = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::ConnCommandResult(res),
};
self.ca_conn_event_out_queue.push_back(item);
}
fn cmd_find_channel(&self, pattern: &str) {
@@ -688,16 +721,17 @@ impl CaConn {
trace2!("handle_series_lookup_result {res:?}");
match res {
Ok(res) => {
let series = res.series.into_inner();
let item = QueryItem::ChannelStatus(ChannelStatusItem {
ts: SystemTime::now(),
series: series.clone(),
status: ChannelStatus::Opened,
});
self.insert_item_queue.push_back(item);
if let Some(cid) = self.cid_by_name.get(&res.channel) {
if let Some(chst) = self.channels.get(cid) {
if let ChannelState::FetchingSeriesId(st2) = chst {
let cssid = st2.cssid.clone();
let series = res.series.into_inner();
let item = QueryItem::ChannelStatus(ChannelStatusItem {
ts: SystemTime::now(),
cssid,
status: ChannelStatus::Opened,
});
self.insert_item_queue.push_back(item);
let cid = st2.cid.clone();
let sid = st2.sid;
let data_type = st2.data_type;
@@ -805,17 +839,35 @@ impl CaConn {
)
}
pub fn channel_remove(&mut self, channel: String) {
Self::channel_remove_expl(
channel,
&mut self.channels,
&mut self.cid_by_name,
&mut self.name_by_cid,
&mut self.cid_store,
&mut self.time_binners,
)
}
fn channel_remove_by_cid(&mut self, cid: Cid) {
self.channels.remove(&cid);
self.name_by_cid.remove(&cid);
self.time_binners.remove(&cid);
self.cid_by_name.retain(|_, v| v == &cid);
}
fn channel_remove_expl(
channel: String,
name: String,
channels: &mut BTreeMap<Cid, ChannelState>,
cid_by_name: &mut BTreeMap<String, Cid>,
name_by_cid: &mut BTreeMap<Cid, String>,
cid_store: &mut CidStore,
time_binners: &mut BTreeMap<Cid, ConnTimeBin>,
) {
let cid = Self::cid_by_name_expl(&channel, cid_by_name, name_by_cid, cid_store);
let cid = Self::cid_by_name_expl(&name, cid_by_name, name_by_cid, cid_store);
if channels.contains_key(&cid) {
warn!("TODO actually cause the channel to get closed and removed {}", channel);
warn!("TODO actually cause the channel to get closed and removed {}", name);
}
{
let a: Vec<_> = cid_by_name
@@ -833,17 +885,6 @@ impl CaConn {
time_binners.remove(&cid);
}
pub fn channel_remove(&mut self, channel: String) {
Self::channel_remove_expl(
channel,
&mut self.channels,
&mut self.cid_by_name,
&mut self.name_by_cid,
&mut self.cid_store,
&mut self.time_binners,
)
}
fn cid_by_name_expl(
name: &str,
cid_by_name: &mut BTreeMap<String, Cid>,
@@ -887,10 +928,10 @@ impl CaConn {
ChannelState::FetchingSeriesId(..) => {
*chst = ChannelState::Ended;
}
ChannelState::Created(series, ..) => {
ChannelState::Created(series, st2) => {
let item = QueryItem::ChannelStatus(ChannelStatusItem {
ts: SystemTime::now(),
series: series.clone(),
cssid: st2.cssid.clone(),
status: ChannelStatus::Closed(channel_reason.clone()),
});
self.insert_item_queue.push_back(item);
@@ -920,7 +961,8 @@ impl CaConn {
self.trigger_shutdown(ChannelStatusClosedReason::IocTimeout);
}
} else {
if self.ioc_ping_last.elapsed() > Duration::from_millis(20000) {
// TODO randomize delay a bit
if self.ioc_ping_last.elapsed() > Duration::from_millis(120000) {
if let Some(proto) = &mut self.proto {
self.stats.ping_start().inc();
self.ioc_ping_start = Some(Instant::now());
@@ -1470,6 +1512,8 @@ impl CaConn {
let cssid = match ch_s {
ChannelState::Creating { cssid, .. } => cssid.clone(),
_ => {
// TODO handle in better way:
// Remove channel and emit notice that channel is removed with reason.
let e = Error::with_msg_no_trace("handle_peer_ready bad state");
return Ready(Some(Err(e)));
}
@@ -1522,10 +1566,6 @@ impl CaConn {
let _ = ts1;
res?
}
CaMsgTy::Error(e) => {
warn!("channel access error message {e:?}");
}
CaMsgTy::AccessRightsRes(_) => {}
CaMsgTy::Echo => {
// let addr = &self.remote_addr_dbg;
if let Some(started) = self.ioc_ping_start {
@@ -1553,8 +1593,30 @@ impl CaConn {
self.ioc_ping_last = Instant::now();
self.ioc_ping_start = None;
}
CaMsgTy::CreateChanFail(_) => {
// TODO handle CreateChanFail
CaMsgTy::CreateChanFail(msg) => {
// TODO
// Here, must indicate that the address could be wrong!
// The channel status must be "Fail" so that ConnSet can decide to re-search.
// TODO how to transition the channel state? Any invariants or simply write to the map?
let cid = Cid(msg.cid);
if let Some(name) = self.name_by_cid.get(&cid) {
debug!("queue event to notive channel create fail {name}");
let item = CaConnEvent {
ts: tsnow,
value: CaConnEventValue::ChannelCreateFail(name.into()),
};
self.ca_conn_event_out_queue.push_back(item);
}
self.channel_remove_by_cid(cid);
warn!("CaConn sees: {msg:?}");
}
CaMsgTy::Error(msg) => {
warn!("CaConn sees: {msg:?}");
}
CaMsgTy::AccessRightsRes(msg) => {
if false {
warn!("CaConn sees: {msg:?}");
}
}
_ => {
warn!("Received unexpected protocol message {:?}", k);
@@ -1597,17 +1659,17 @@ impl CaConn {
fn handle_conn_state(&mut self, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
use Poll::*;
match &mut self.state {
CaConnState::Unconnected => {
CaConnState::Unconnected(_since) => {
let addr = self.remote_addr_dbg.clone();
// TODO issue a TCP-connect event (and later a "connected")
trace!("create tcp connection to {:?}", (addr.ip(), addr.port()));
let fut = tokio::time::timeout(Duration::from_millis(1000), TcpStream::connect(addr));
self.state = CaConnState::Connecting(addr, Box::pin(fut));
self.state = CaConnState::Connecting(Instant::now(), addr, Box::pin(fut));
Ok(Ready(Some(())))
}
CaConnState::Connecting(ref addr, ref mut fut) => {
CaConnState::Connecting(_since, addr, fut) => {
match fut.poll_unpin(cx) {
Ready(connect_result) => {
match connect_result {
@@ -1630,33 +1692,44 @@ impl CaConn {
self.proto = Some(proto);
Ok(Ready(Some(())))
}
Ok(Err(_e)) => {
// TODO log with exponential backoff
let addr = addr.clone();
self.insert_item_queue
.push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
ts: SystemTime::now(),
addr,
status: ConnectionStatus::ConnectError,
}));
let dt = self.backoff_next();
self.state = CaConnState::Wait(wait_fut(dt));
self.proto = None;
Ok(Err(e)) => {
trace!("error connect to {addr} {e}");
if true {
let addr = addr.clone();
self.insert_item_queue.push_back(QueryItem::ConnectionStatus(
ConnectionStatusItem {
ts: SystemTime::now(),
addr,
status: ConnectionStatus::ConnectError,
},
));
self.trigger_shutdown(ChannelStatusClosedReason::ConnectFail);
} else {
// TODO log with exponential backoff
let dt = self.backoff_next();
self.state = CaConnState::Wait(wait_fut(dt));
self.proto = None;
}
Ok(Ready(Some(())))
}
Err(e) => {
// TODO log with exponential backoff
trace!("timeout during connect to {addr:?} {e:?}");
let addr = addr.clone();
self.insert_item_queue
.push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
ts: SystemTime::now(),
addr,
status: ConnectionStatus::ConnectTimeout,
}));
let dt = self.backoff_next();
self.state = CaConnState::Wait(wait_fut(dt));
self.proto = None;
trace!("timeout connect to {addr} {e}");
if true {
let addr = addr.clone();
self.insert_item_queue.push_back(QueryItem::ConnectionStatus(
ConnectionStatusItem {
ts: SystemTime::now(),
addr,
status: ConnectionStatus::ConnectTimeout,
},
));
self.trigger_shutdown(ChannelStatusClosedReason::ConnectFail);
} else {
let dt = self.backoff_next();
self.state = CaConnState::Wait(wait_fut(dt));
self.proto = None;
}
Ok(Ready(Some(())))
}
}
@@ -1707,7 +1780,7 @@ impl CaConn {
trace4!("Wait");
match inst.poll_unpin(cx) {
Ready(_) => {
self.state = CaConnState::Unconnected;
self.state = CaConnState::Unconnected(Instant::now());
self.proto = None;
Ok(Ready(Some(())))
}
@@ -1784,7 +1857,22 @@ impl CaConn {
fn handle_own_ticker_tick(self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> {
// debug!("tick CaConn {}", self.remote_addr_dbg);
let tsnow = Instant::now();
let this = self.get_mut();
match &this.state {
CaConnState::Unconnected(since) => {}
CaConnState::Connecting(since, _addr, _) => {
if *since + CONNECTING_TIMEOUT < tsnow {
debug!("CONNECTION TIMEOUT");
}
}
CaConnState::Init => {}
CaConnState::Listen => {}
CaConnState::PeerReady => {}
CaConnState::Wait(_) => {}
CaConnState::Shutdown => {}
CaConnState::EndOfStream => {}
}
if false {
for (_, tb) in this.time_binners.iter_mut() {
let iiq = &mut this.insert_item_queue;
@@ -1794,10 +1882,18 @@ impl CaConn {
Ok(())
}
fn check_ticker_connecting_timeout(&mut self, since: Instant) -> Result<(), Error> {
Ok(())
}
fn queues_out_flushed(&self) -> bool {
self.queues_async_out_flushed() && self.ca_conn_event_out_queue.is_empty()
}
fn queues_async_out_flushed(&self) -> bool {
// self.channel_info_query_queue.is_empty() && self.channel_info_query_sending.is_idle()
// TODO re-enable later
true
self.insert_item_queue.is_empty() && self.storage_insert_sender.is_idle()
}
fn attempt_flush_storage_queue(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
@@ -1877,23 +1973,9 @@ impl Stream for CaConn {
if let CaConnState::EndOfStream = self.state {
break Ready(None);
}
if let Some(item) = self.cmd_res_queue.pop_front() {
let item = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::ConnCommandResult(item),
};
break Ready(Some(Ok(item)));
}
if let Some(item) = self.ca_conn_event_out_queue.pop_front() {
break Ready(Some(Ok(item)));
}
// if let Some(item) = self.insert_item_queue.pop_front() {
// let ev = CaConnEvent {
// ts: Instant::now(),
// value: CaConnEventValue::QueryItem(item),
// };
// break Ready(Some(Ok(ev)));
// }
match self.as_mut().handle_own_ticker(cx) {
Ok(Ready(())) => {
@@ -1954,11 +2036,12 @@ impl Stream for CaConn {
}
break if self.is_shutdown() {
if self.queues_async_out_flushed() {
if self.queues_out_flushed() {
debug!("end of stream {}", self.remote_addr_dbg);
self.state = CaConnState::EndOfStream;
Ready(None)
} else {
debug!("queues_out_flushed false");
if have_progress {
self.stats.ca_conn_poll_reloop().inc();
continue;

View File

@@ -13,6 +13,7 @@ use crate::ca::conn::CaConnOpts;
use crate::ca::conn::ConnCommand;
use crate::ca::statemap::CaConnState;
use crate::ca::statemap::WithAddressState;
use crate::conf::CaIngestOpts;
use crate::daemon_common::Channel;
use crate::errconv::ErrConv;
use crate::rt::JoinHandle;
@@ -32,6 +33,9 @@ use futures_util::Stream;
use futures_util::StreamExt;
use log::*;
use netpod::Database;
use scywr::iteminsertqueue::ChannelInfoItem;
use scywr::iteminsertqueue::ChannelStatus;
use scywr::iteminsertqueue::ChannelStatusItem;
use scywr::iteminsertqueue::QueryItem;
use serde::Serialize;
use series::ChannelStatusSeriesId;
@@ -45,6 +49,7 @@ use statemap::CHANNEL_STATUS_DUMMY_SCALAR_TYPE;
use stats::CaConnSetStats;
use stats::CaConnStats;
use stats::CaProtoStats;
use stats::IocFinderStats;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::net::SocketAddr;
@@ -62,15 +67,13 @@ use taskrun::tokio;
const CHECK_CHANS_PER_TICK: usize = 10000;
pub const SEARCH_BATCH_MAX: usize = 256;
pub const CURRENT_SEARCH_PENDING_MAX: usize = SEARCH_BATCH_MAX * 4;
const UNKNOWN_ADDRESS_STAY: Duration = Duration::from_millis(2000);
const UNKNOWN_ADDRESS_STAY: Duration = Duration::from_millis(15000);
const NO_ADDRESS_STAY: Duration = Duration::from_millis(20000);
const MAYBE_WRONG_ADDRESS_STAY: Duration = Duration::from_millis(400);
const SEARCH_PENDING_TIMEOUT: Duration = Duration::from_millis(30000);
const SEARCH_PENDING_TIMEOUT_WARN: Duration = Duration::from_millis(8000);
const CHANNEL_HEALTH_TIMEOUT: Duration = Duration::from_millis(8000);
const CHANNEL_UNASSIGNED_TIMEOUT: Duration = Duration::from_millis(8000);
// TODO put all these into metrics
static SEARCH_REQ_MARK_COUNT: AtomicUsize = AtomicUsize::new(0);
const SEARCH_PENDING_TIMEOUT_WARN: Duration = Duration::from_millis(15000);
const CHANNEL_HEALTH_TIMEOUT: Duration = Duration::from_millis(30000);
const CHANNEL_UNASSIGNED_TIMEOUT: Duration = Duration::from_millis(15000);
#[allow(unused)]
macro_rules! trace2 {
@@ -209,6 +212,7 @@ pub struct CaConnSetCtrl {
stats: Arc<CaConnSetStats>,
ca_conn_stats: Arc<CaConnStats>,
ca_proto_stats: Arc<CaProtoStats>,
ioc_finder_stats: Arc<IocFinderStats>,
jh: JoinHandle<Result<(), Error>>,
}
@@ -267,11 +271,38 @@ impl CaConnSetCtrl {
pub fn ca_proto_stats(&self) -> &Arc<CaProtoStats> {
&self.ca_proto_stats
}
pub fn ioc_finder_stats(&self) -> &Arc<IocFinderStats> {
&self.ioc_finder_stats
}
}
#[derive(Debug)]
pub struct IocAddrQuery {
pub name: String,
name: String,
use_cache: bool,
}
impl IocAddrQuery {
pub fn cached(name: String) -> Self {
Self { name, use_cache: true }
}
pub fn uncached(name: String) -> Self {
Self { name, use_cache: false }
}
pub fn name(&self) -> &str {
&self.name
}
pub fn name_string(&self) -> &String {
&self.name
}
pub fn use_cache(&self) -> bool {
self.use_cache
}
}
struct SeriesLookupSender {
@@ -332,14 +363,20 @@ impl CaConnSet {
local_epics_hostname: String,
storage_insert_tx: Sender<QueryItem>,
channel_info_query_tx: Sender<ChannelInfoQuery>,
pgconf: Database,
ingest_opts: CaIngestOpts,
) -> CaConnSetCtrl {
let (ca_conn_res_tx, ca_conn_res_rx) = async_channel::bounded(200);
let (connset_inp_tx, connset_inp_rx) = async_channel::bounded(200);
let (connset_out_tx, connset_out_rx) = async_channel::bounded(200);
let (find_ioc_res_tx, find_ioc_res_rx) = async_channel::bounded(400);
let (find_ioc_query_tx, ioc_finder_jh) =
super::finder::start_finder(find_ioc_res_tx.clone(), backend.clone(), pgconf);
let ioc_finder_stats = Arc::new(IocFinderStats::new());
let (find_ioc_query_tx, ioc_finder_jh) = super::finder::start_finder(
find_ioc_res_tx.clone(),
backend.clone(),
ingest_opts,
ioc_finder_stats.clone(),
)
.unwrap();
let (channel_info_res_tx, channel_info_res_rx) = async_channel::bounded(400);
let stats = Arc::new(CaConnSetStats::new());
let ca_proto_stats = Arc::new(CaProtoStats::new());
@@ -388,6 +425,7 @@ impl CaConnSet {
stats,
ca_conn_stats,
ca_proto_stats,
ioc_finder_stats,
jh,
}
}
@@ -407,6 +445,9 @@ impl CaConnSet {
// this.find_ioc_query_tx.receiver_count()
// );
debug!("CaConnSet EndOfStream");
debug!("join ioc_finder_jh A {:?}", this.find_ioc_query_sender.len());
this.find_ioc_query_sender.drop();
debug!("join ioc_finder_jh B {:?}", this.find_ioc_query_sender.len());
this.ioc_finder_jh
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))??;
@@ -443,7 +484,9 @@ impl CaConnSet {
self.storage_insert_queue.push_back(item);
Ok(())
}
CaConnEventValue::ChannelCreateFail(x) => self.handle_channel_create_fail(addr, x),
CaConnEventValue::EndOfStream => self.handle_ca_conn_eos(addr),
CaConnEventValue::ConnectFail => self.handle_connect_fail(addr),
}
}
@@ -482,6 +525,7 @@ impl CaConnSet {
value: ChannelStateValue::Active(ActiveChannelState::WaitForStatusSeriesId {
since: SystemTime::now(),
}),
running_cmd_id: None,
});
let item = ChannelInfoQuery {
backend: cmd.backend,
@@ -510,12 +554,12 @@ impl CaConnSet {
*chst2 = ActiveChannelState::WithStatusSeriesId {
status_series_id: cmd.cssid,
state: WithStatusSeriesIdState {
inner: WithStatusSeriesIdStateInner::SearchPending {
inner: WithStatusSeriesIdStateInner::AddrSearchPending {
since: SystemTime::now(),
},
},
};
let qu = IocAddrQuery { name: cmd.name };
let qu = IocAddrQuery::cached(cmd.name);
self.find_ioc_query_queue.push_back(qu);
self.stats.ioc_search_start().inc();
} else {
@@ -593,7 +637,7 @@ impl CaConnSet {
WithStatusSeriesIdStateInner::UnknownAddress { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
WithStatusSeriesIdStateInner::SearchPending { .. } => {
WithStatusSeriesIdStateInner::AddrSearchPending { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
WithStatusSeriesIdStateInner::WithAddress { addr, state: _ } => {
@@ -604,6 +648,9 @@ impl CaConnSet {
WithStatusSeriesIdStateInner::NoAddress { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
WithStatusSeriesIdStateInner::MaybeWrongAddress { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
},
},
ChannelStateValue::ToRemove { .. } => {}
@@ -616,7 +663,6 @@ impl CaConnSet {
if self.shutdown_stopping {
return Ok(());
}
self.stats.ioc_addr_found().inc();
trace3!("handle_ioc_query_result");
for e in res {
let ch = Channel::new(e.channel.clone());
@@ -628,6 +674,7 @@ impl CaConnSet {
} = ast
{
if let Some(addr) = e.addr {
self.stats.ioc_addr_found().inc();
trace3!("ioc found {e:?}");
let add = ChannelAddWithAddr {
backend: self.backend.clone(),
@@ -643,17 +690,21 @@ impl CaConnSet {
};
self.handle_add_channel_with_addr(add)?;
} else {
self.stats.ioc_addr_not_found().inc();
trace3!("ioc not found {e:?}");
let since = SystemTime::now();
state.inner = WithStatusSeriesIdStateInner::UnknownAddress { since };
}
} else {
self.stats.ioc_addr_result_for_unknown_channel().inc();
warn!("TODO got address but no longer active");
}
} else {
self.stats.ioc_addr_result_for_unknown_channel().inc();
warn!("TODO got address but no longer active");
}
} else {
self.stats.ioc_addr_result_for_unknown_channel().inc();
warn!("ioc addr lookup done but channel no longer here");
}
}
@@ -664,10 +715,10 @@ impl CaConnSet {
if self.shutdown_stopping {
return Ok(());
}
self.thr_msg_storage_len
.trigger("msg", &[&self.storage_insert_sender.len()]);
debug!("TODO handle_check_health");
if false {
self.thr_msg_storage_len
.trigger("connset handle_check_health", &[&self.storage_insert_sender.len()]);
}
self.check_channel_states()?;
// Trigger already the next health check, but use the current data that we have.
@@ -714,6 +765,7 @@ impl CaConnSet {
}
debug!("handle_shutdown");
self.shutdown_stopping = true;
self.find_ioc_res_rx.close();
self.channel_info_query_sender.drop();
self.find_ioc_query_sender.drop();
for (_addr, res) in self.ca_conn_ress.iter() {
@@ -775,6 +827,25 @@ impl CaConnSet {
Ok(())
}
fn handle_channel_create_fail(&mut self, addr: SocketAddr, name: String) -> Result<(), Error> {
trace!("handle_channel_create_fail {addr} {name}");
let tsnow = SystemTime::now();
let ch = Channel::new(name);
if let Some(st1) = self.channel_states.inner().get_mut(&ch) {
if let ChannelStateValue::Active(st2) = &mut st1.value {
if let ActiveChannelState::WithStatusSeriesId {
status_series_id: _,
state: st3,
} = st2
{
trace!("handle_channel_create_fail {addr} {ch:?} set to MaybeWrongAddress");
st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress { since: tsnow };
}
}
}
Ok(())
}
fn handle_ca_conn_eos(&mut self, addr: SocketAddr) -> Result<(), Error> {
trace2!("handle_ca_conn_eos {addr}");
if let Some(e) = self.ca_conn_ress.remove(&addr) {
@@ -789,6 +860,28 @@ impl CaConnSet {
Ok(())
}
fn handle_connect_fail(&mut self, addr: SocketAddr) -> Result<(), Error> {
trace2!("handle_connect_fail {addr}");
let tsnow = SystemTime::now();
for (ch, st1) in self.channel_states.inner().iter_mut() {
match &mut st1.value {
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::Init { since: _ } => {}
ActiveChannelState::WaitForStatusSeriesId { since: _ } => {}
ActiveChannelState::WithStatusSeriesId {
status_series_id: _,
state: st3,
} => {
debug!("connect fail, maybe wrong address for {}", ch.id());
st3.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress { since: tsnow };
}
},
ChannelStateValue::ToRemove { addr: _ } => {}
}
}
Ok(())
}
fn remove_status_for_addr(&mut self, addr: SocketAddr) -> Result<(), Error> {
Ok(())
}
@@ -828,7 +921,7 @@ impl CaConnSet {
let conn_stats = conn.stats();
let tx1 = self.ca_conn_res_tx.clone();
let tx2 = self.storage_insert_tx.clone();
let jh = tokio::spawn(Self::ca_conn_item_merge(conn, tx1, tx2, addr));
let jh = tokio::spawn(Self::ca_conn_item_merge(conn, tx1, tx2, addr, self.stats.clone()));
let ca_conn_res = CaConnRes {
state: CaConnState::new(CaConnStateValue::Fresh),
sender: conn_tx,
@@ -844,30 +937,52 @@ impl CaConnSet {
tx1: Sender<(SocketAddr, CaConnEvent)>,
tx2: Sender<QueryItem>,
addr: SocketAddr,
stats: Arc<CaConnSetStats>,
) -> Result<(), Error> {
stats.ca_conn_task_begin().inc();
trace2!("ca_conn_consumer begin {}", addr);
let stats = conn.stats();
let connstats = conn.stats();
let mut conn = conn;
let mut ret = Ok(());
while let Some(item) = conn.next().await {
match item {
Ok(item) => {
stats.conn_item_count.inc();
connstats.conn_item_count.inc();
match item.value {
CaConnEventValue::QueryItem(x) => {
tx2.send(x).await;
if let Err(_) = tx2.send(x).await {
break;
}
}
CaConnEventValue::None => {
tx1.send((addr, item)).await;
if let Err(_) = tx1.send((addr, item)).await {
break;
}
}
CaConnEventValue::EchoTimeout => {
tx1.send((addr, item)).await;
if let Err(_) = tx1.send((addr, item)).await {
break;
}
}
CaConnEventValue::ConnCommandResult(_) => {
tx1.send((addr, item)).await;
if let Err(_) = tx1.send((addr, item)).await {
break;
}
}
CaConnEventValue::ChannelCreateFail(_) => {
if let Err(_) = tx1.send((addr, item)).await {
break;
}
}
CaConnEventValue::EndOfStream => {
tx1.send((addr, item)).await;
if let Err(_) = tx1.send((addr, item)).await {
break;
}
}
CaConnEventValue::ConnectFail => {
if let Err(_) = tx1.send((addr, item)).await {
break;
}
}
}
}
@@ -888,9 +1003,25 @@ impl CaConnSet {
))
.await?;
trace2!("ca_conn_consumer signaled {}", addr);
stats.ca_conn_task_done().inc();
ret
}
fn push_channel_status(&mut self, item: ChannelStatusItem) -> Result<(), Error> {
if false {
let _ = ChannelInfoItem {
ts_msp: todo!(),
series: todo!(),
ivl: todo!(),
interest: todo!(),
evsize: todo!(),
};
}
let item = QueryItem::ChannelStatus(item);
self.storage_insert_queue.push_back(item);
Ok(())
}
#[allow(unused)]
async fn __enqueue_command_to_all<F>(&self, cmdgen: F) -> Result<Vec<CmdId>, Error>
where
@@ -1024,6 +1155,7 @@ impl CaConnSet {
let (mut search_pending_count,) = self.update_channel_state_counts();
let mut cmd_remove_channel = Vec::new();
let mut cmd_add_channel = Vec::new();
let mut channel_status_items = Vec::new();
let k = self.chan_check_next.take();
let it = if let Some(last) = k {
trace!("check_chans start at {:?}", last);
@@ -1054,17 +1186,17 @@ impl CaConnSet {
state,
} => match &mut state.inner {
WithStatusSeriesIdStateInner::UnknownAddress { since } => {
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > UNKNOWN_ADDRESS_STAY {
//info!("UnknownAddress {} {:?}", i, ch);
if (search_pending_count as usize) < CURRENT_SEARCH_PENDING_MAX {
search_pending_count += 1;
state.inner = WithStatusSeriesIdStateInner::SearchPending { since: tsnow };
SEARCH_REQ_MARK_COUNT.fetch_add(1, atomic::Ordering::AcqRel);
if search_pending_count < CURRENT_SEARCH_PENDING_MAX as _ {
if since.checked_add(UNKNOWN_ADDRESS_STAY).unwrap() < tsnow {
if false {
// TODO
search_pending_count += 1;
state.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: tsnow };
}
}
}
}
WithStatusSeriesIdStateInner::SearchPending { since } => {
WithStatusSeriesIdStateInner::AddrSearchPending { since } => {
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > SEARCH_PENDING_TIMEOUT {
debug!("TODO should receive some error indication instead of timeout for {ch:?}");
@@ -1093,19 +1225,36 @@ impl CaConnSet {
Assigned(st4) => {
if st4.updated + CHANNEL_HEALTH_TIMEOUT < tsnow {
self.stats.channel_health_timeout().inc();
debug!("health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~");
let addr = SocketAddr::V4(*addr_v4);
cmd_remove_channel.push((addr, ch.clone()));
*st3 = WithAddressState::Unassigned { since: tsnow };
state.inner = WithStatusSeriesIdStateInner::MaybeWrongAddress { since: tsnow };
// *st3 = WithAddressState::Unassigned { since: tsnow };
let item =
ChannelStatusItem::new_closed_conn_timeout(tsnow, status_series_id.clone());
channel_status_items.push(item);
}
}
}
}
WithStatusSeriesIdStateInner::NoAddress { since } => {
let dt = tsnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > NO_ADDRESS_STAY {
if *since + NO_ADDRESS_STAY < tsnow {
state.inner = WithStatusSeriesIdStateInner::UnknownAddress { since: tsnow };
}
}
WithStatusSeriesIdStateInner::MaybeWrongAddress { since } => {
if *since + MAYBE_WRONG_ADDRESS_STAY < tsnow {
if search_pending_count < CURRENT_SEARCH_PENDING_MAX as _ {
if since.checked_add(UNKNOWN_ADDRESS_STAY).unwrap() < tsnow {
search_pending_count += 1;
state.inner = WithStatusSeriesIdStateInner::AddrSearchPending { since: tsnow };
let qu = IocAddrQuery::uncached(ch.id().into());
self.find_ioc_query_queue.push_back(qu);
self.stats.ioc_search_start().inc();
}
}
}
}
},
},
ChannelStateValue::ToRemove { .. } => {
@@ -1117,6 +1266,9 @@ impl CaConnSet {
break;
}
}
for item in channel_status_items {
self.push_channel_status(item)?;
}
for (addr, ch) in cmd_remove_channel {
if let Some(g) = self.ca_conn_ress.get_mut(&addr) {
let cmd = ConnCommand::channel_remove(ch.id().into());
@@ -1138,6 +1290,7 @@ impl CaConnSet {
let mut unassigned = 0;
let mut assigned = 0;
let mut connected = 0;
let mut maybe_wrong_address = 0;
for (_ch, st) in self.channel_states.inner().iter() {
match &st.value {
ChannelStateValue::Active(st2) => match st2 {
@@ -1151,7 +1304,7 @@ impl CaConnSet {
WithStatusSeriesIdStateInner::UnknownAddress { .. } => {
unknown_address += 1;
}
WithStatusSeriesIdStateInner::SearchPending { .. } => {
WithStatusSeriesIdStateInner::AddrSearchPending { .. } => {
search_pending += 1;
}
WithStatusSeriesIdStateInner::WithAddress { state, .. } => match state {
@@ -1170,6 +1323,9 @@ impl CaConnSet {
WithStatusSeriesIdStateInner::NoAddress { .. } => {
no_address += 1;
}
WithStatusSeriesIdStateInner::MaybeWrongAddress { .. } => {
maybe_wrong_address += 1;
}
},
},
ChannelStateValue::ToRemove { .. } => {
@@ -1183,6 +1339,7 @@ impl CaConnSet {
self.stats.channel_unassigned.set(unassigned);
self.stats.channel_assigned.set(assigned);
self.stats.channel_connected.set(connected);
self.stats.channel_maybe_wrong_address.set(maybe_wrong_address);
(search_pending,)
}
@@ -1222,9 +1379,11 @@ impl Stream for CaConnSet {
self.stats.poll_fn_begin().inc();
loop {
self.stats.poll_loop_begin().inc();
self.thr_msg_poll_1.trigger("CaConnSet::poll", &[]);
self.stats.storage_insert_tx_len.set(self.storage_insert_tx.len() as _);
self.stats
.storage_insert_queue_len
.set(self.storage_insert_queue.len() as _);
self.stats
.channel_info_query_queue_len
.set(self.channel_info_query_queue.len() as _);

View File

@@ -2,23 +2,29 @@ use super::connset::CaConnSetEvent;
use super::connset::IocAddrQuery;
use super::connset::CURRENT_SEARCH_PENDING_MAX;
use super::connset::SEARCH_BATCH_MAX;
use super::search::ca_search_workers_start;
use crate::ca::findioc::FindIocRes;
use crate::ca::findioc::FindIocStream;
use crate::conf::CaIngestOpts;
use crate::daemon_common::DaemonEvent;
use async_channel::Receiver;
use async_channel::Sender;
use dbpg::conn::make_pg_client;
use dbpg::iocindex::IocItem;
use dbpg::iocindex::IocSearchIndexWorker;
use dbpg::postgres::Row as PgRow;
use err::Error;
use futures_util::FutureExt;
use futures_util::StreamExt;
use log::*;
use netpod::Database;
use stats::IocFinderStats;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::net::SocketAddrV4;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
@@ -30,16 +36,8 @@ const FINDER_BATCH_SIZE: usize = 8;
const FINDER_IN_FLIGHT_MAX: usize = 800;
const FINDER_TIMEOUT: Duration = Duration::from_millis(100);
// TODO pull out into a stats
static SEARCH_REQ_BATCH_RECV_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_RES_0_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_RES_1_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_RES_2_COUNT: AtomicUsize = AtomicUsize::new(0);
static SEARCH_RES_3_COUNT: AtomicUsize = AtomicUsize::new(0);
#[allow(unused)]
macro_rules! debug_batch {
// (D$($arg:tt)*) => ();
($($arg:tt)*) => (if false {
debug!($($arg)*);
});
@@ -47,7 +45,6 @@ macro_rules! debug_batch {
#[allow(unused)]
macro_rules! trace_batch {
// (D$($arg:tt)*) => ();
($($arg:tt)*) => (if false {
trace!($($arg)*);
});
@@ -83,11 +80,50 @@ fn transform_pgres(rows: Vec<PgRow>) -> VecDeque<FindIocRes> {
ret
}
/// Spawn the full IOC finder pipeline (DB lookup with network fallback).
///
/// Returns the sender on which address queries are submitted, plus the
/// join handle of the pipeline task.
pub fn start_finder(
    tx: Sender<VecDeque<FindIocRes>>,
    backend: String,
    opts: CaIngestOpts,
    stats: Arc<IocFinderStats>,
) -> Result<(Sender<IocAddrQuery>, JoinHandle<Result<(), Error>>), Error> {
    // Bounded queue: callers experience backpressure once too many
    // searches are already pending.
    let (query_tx, query_rx) = async_channel::bounded(CURRENT_SEARCH_PENDING_MAX);
    let join_handle = taskrun::spawn(finder_full(query_rx, tx, backend, opts, stats));
    Ok((query_tx, join_handle))
}
async fn finder_worker(
qrx: Receiver<IocAddrQuery>,
tx: Sender<VecDeque<FindIocRes>>,
backend: String,
db: Database,
stats: Arc<IocFinderStats>,
) -> Result<(), Error> {
// TODO do something with join handle
let (batch_rx, jh) = batchtools::batcher::batch(
SEARCH_BATCH_MAX,
Duration::from_millis(200),
SEARCH_DB_PIPELINE_LEN,
qrx,
);
for _ in 0..SEARCH_DB_PIPELINE_LEN {
// TODO use join handle
tokio::spawn(finder_worker_single(
batch_rx.clone(),
tx.clone(),
backend.clone(),
db.clone(),
stats.clone(),
));
}
Ok(())
}
async fn finder_worker_single(
inp: Receiver<Vec<IocAddrQuery>>,
tx: Sender<VecDeque<FindIocRes>>,
backend: String,
db: Database,
stats: Arc<IocFinderStats>,
) -> Result<(), Error> {
let (pg, jh) = make_pg_client(&db)
.await
@@ -106,10 +142,11 @@ async fn finder_worker_single(
loop {
match inp.recv().await {
Ok(batch) => {
SEARCH_REQ_BATCH_RECV_COUNT.fetch_add(batch.len(), atomic::Ordering::AcqRel);
stats.dbsearcher_batch_recv().inc();
stats.dbsearcher_item_recv().add(batch.len() as _);
let ts1 = Instant::now();
debug_batch!("run query batch len {}", batch.len());
let names: Vec<_> = batch.iter().map(|x| x.name.as_str()).collect();
let names: Vec<_> = batch.iter().filter(|x| x.use_cache()).map(|x| x.name()).collect();
let qres = pg.query(&qu_select_multi, &[&backend, &names]).await;
let dt = ts1.elapsed();
debug_batch!(
@@ -125,30 +162,29 @@ async fn finder_worker_single(
out.push_str(", ");
}
out.push('\'');
out.push_str(&e.name);
out.push_str(e.name());
out.push('\'');
}
out.push(']');
eprintln!("VERY SLOW QUERY\n{out}");
trace!("very slow query\n{out}");
}
match qres {
Ok(rows) => {
stats.dbsearcher_select_res_0().add(rows.len() as _);
if rows.len() > batch.len() {
error!("MORE RESULTS THAN INPUT");
stats.dbsearcher_select_error_len_mismatch().inc();
} else if rows.len() < batch.len() {
resdiff += batch.len() - rows.len();
}
let nbatch = batch.len();
trace_batch!("received results {} resdiff {}", rows.len(), resdiff);
SEARCH_RES_0_COUNT.fetch_add(rows.len(), atomic::Ordering::AcqRel);
let items = transform_pgres(rows);
let names: HashMap<_, _> = items.iter().map(|x| (&x.channel, true)).collect();
let mut to_add = Vec::new();
for e in batch {
let s = e.name;
if !names.contains_key(&s) {
if !names.contains_key(e.name_string()) {
let item = FindIocRes {
channel: s,
channel: e.name().into(),
response_addr: None,
addr: None,
dt: Duration::from_millis(0),
@@ -156,17 +192,17 @@ async fn finder_worker_single(
to_add.push(item);
}
}
SEARCH_RES_1_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel);
SEARCH_RES_2_COUNT.fetch_add(to_add.len(), atomic::Ordering::AcqRel);
let mut items = items;
items.extend(to_add.into_iter());
if items.len() != nbatch {
error!("STILL NOT MATCHING LEN");
let items_len = items.len();
if items_len != nbatch {
stats.dbsearcher_select_error_len_mismatch().inc();
}
SEARCH_RES_3_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel);
let x = tx.send(items).await;
match x {
Ok(_) => {}
match tx.send(items).await {
Ok(_) => {
stats.dbsearcher_batch_send().inc();
stats.dbsearcher_item_send().add(items_len as _);
}
Err(e) => {
error!("finder sees: {e}");
break;
@@ -182,76 +218,110 @@ async fn finder_worker_single(
Err(_e) => break,
}
}
debug!("finder_worker_single done");
jh.await?.map_err(|e| Error::from_string(e))?;
Ok(())
}
async fn finder_worker(
async fn finder_full(
qrx: Receiver<IocAddrQuery>,
tx: Sender<VecDeque<FindIocRes>>,
backend: String,
db: Database,
opts: CaIngestOpts,
stats: Arc<IocFinderStats>,
) -> Result<(), Error> {
// TODO do something with join handle
let (batch_rx, _jh) = batchtools::batcher::batch(
SEARCH_BATCH_MAX,
Duration::from_millis(200),
SEARCH_DB_PIPELINE_LEN,
let (tx1, rx1) = async_channel::bounded(20);
let jh1 = taskrun::spawn(finder_worker(
qrx,
);
for _ in 0..SEARCH_DB_PIPELINE_LEN {
// TODO use join handle
tokio::spawn(finder_worker_single(
batch_rx.clone(),
tx.clone(),
backend.clone(),
db.clone(),
));
tx1,
backend,
opts.postgresql_config().clone(),
stats.clone(),
));
let jh2 = taskrun::spawn(finder_network_if_not_found(rx1, tx, opts.clone(), stats));
jh1.await??;
jh2.await??;
Ok(())
}
/// Forward DB search results, falling back to UDP network search for misses.
///
/// Each incoming batch is split: entries with a resolved address are passed
/// straight to `tx`, unresolved channel names are handed to the network
/// search workers, whose results are relayed by `process_net_result`.
async fn finder_network_if_not_found(
    mut rx: Receiver<VecDeque<FindIocRes>>,
    tx: Sender<VecDeque<FindIocRes>>,
    opts: CaIngestOpts,
    stats: Arc<IocFinderStats>,
) -> Result<(), Error> {
    // Propagate startup errors instead of panicking (was `.unwrap()`).
    let (net_tx, net_rx, jh, jhs) = ca_search_workers_start(&opts, stats.clone()).await?;
    let jh2 = taskrun::spawn(process_net_result(net_rx, tx.clone(), opts.clone()));
    'outer: while let Some(item) = rx.next().await {
        // Split the batch: DB hits go downstream, misses go to the network.
        let mut res = VecDeque::new();
        let mut net = VecDeque::new();
        for e in item {
            if e.addr.is_none() {
                net.push_back(e.channel);
            } else {
                res.push_back(e);
            }
        }
        if let Err(_) = tx.send(res).await {
            break;
        }
        for ch in net {
            if let Err(_) = net_tx.send(ch).await {
                break 'outer;
            }
        }
    }
    // Close the network-search input. Without this the finder stream never
    // observes end-of-input, so `jh.await` below could hang forever.
    drop(net_tx);
    for jh in jhs {
        jh.await??;
    }
    jh.await??;
    jh2.await??;
    debug!("finder_network_if_not_found done");
    Ok(())
}
async fn process_net_result(
mut net_rx: Receiver<Result<VecDeque<FindIocRes>, Error>>,
tx: Sender<VecDeque<FindIocRes>>,
opts: CaIngestOpts,
) -> Result<(), Error> {
const IOC_SEARCH_INDEX_WORKER_COUNT: usize = 1;
let (dbtx, dbrx) = async_channel::bounded(64);
let mut ioc_search_index_worker_jhs = Vec::new();
let mut index_worker_pg_jh = Vec::new();
for _ in 0..IOC_SEARCH_INDEX_WORKER_COUNT {
let backend = opts.backend().into();
let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config()).await.unwrap();
index_worker_pg_jh.push(jh);
let worker = IocSearchIndexWorker::prepare(dbrx.clone(), backend, pg).await.unwrap();
let jh = tokio::spawn(async move { worker.worker().await });
ioc_search_index_worker_jhs.push(jh);
}
drop(dbrx);
while let Some(item) = net_rx.next().await {
match item {
Ok(item) => {
for e in item.iter() {
let cacheitem =
IocItem::new(e.channel.clone(), e.response_addr.clone(), e.addr.clone(), e.dt.clone());
if let Err(_) = dbtx.send(cacheitem).await {
break;
}
}
if let Err(_) = tx.send(item).await {
break;
}
}
Err(e) => {
warn!("error during network search: {e}");
break;
}
}
}
Ok(())
}
/// Spawn the DB-backed IOC finder and return the query input channel
/// together with the worker's join handle.
pub fn start_finder(
    tx: Sender<VecDeque<FindIocRes>>,
    backend: String,
    db: Database,
) -> (Sender<IocAddrQuery>, JoinHandle<Result<(), Error>>) {
    // Bounded so that producers block once too many searches are pending.
    let (query_tx, query_rx) = async_channel::bounded(CURRENT_SEARCH_PENDING_MAX);
    let handle = taskrun::spawn(finder_worker(query_rx, tx, backend, db));
    (query_tx, handle)
}
/// A future slot that may be unoccupied.
///
/// With `new` it wraps a concrete future; with `empty` it holds nothing and
/// (per the `Future` impl elsewhere in this file) stays pending.
struct OptFut<F> {
    fut: Option<F>,
}

impl<F> OptFut<F> {
    /// An `OptFut` holding no future.
    fn empty() -> Self {
        OptFut { fut: None }
    }

    /// An `OptFut` wrapping `fut`.
    fn new(fut: F) -> Self {
        OptFut { fut: Some(fut) }
    }

    /// Whether an inner future is present.
    fn is_enabled(&self) -> bool {
        self.fut.is_some()
    }
}
// `OptFut` behaves like its inner future when present; when empty it is
// permanently pending.
impl<F> futures_util::Future for OptFut<F>
where
    F: futures_util::Future + std::marker::Unpin,
{
    type Output = <F as futures_util::Future>::Output;
    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output> {
        match self.fut.as_mut() {
            // Delegate to the inner future (`F: Unpin`, so `poll_unpin` is fine).
            Some(fut) => fut.poll_unpin(cx),
            // Empty slot: report Pending WITHOUT registering a waker.
            // NOTE(review): in this state the future never wakes by itself;
            // it must be polled as part of a task woken by other events —
            // confirm all call sites guarantee that.
            None => std::task::Poll::Pending,
        }
    }
}
#[cfg(DISABLED)]
#[allow(unused)]
fn start_finder_ca(tx: Sender<DaemonEvent>, tgts: Vec<SocketAddrV4>) -> (Sender<String>, JoinHandle<()>) {
let (qtx, qrx) = async_channel::bounded(32);

View File

@@ -1,12 +1,16 @@
use crate::ca::proto::CaMsg;
use crate::ca::proto::CaMsgTy;
use crate::ca::proto::HeadInfo;
use crate::throttletrace::ThrottleTrace;
use async_channel::Receiver;
use err::Error;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use libc::c_int;
use log::*;
use stats::IocFinderStats;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::net::Ipv4Addr;
@@ -14,6 +18,7 @@ use std::net::SocketAddrV4;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
@@ -21,6 +26,15 @@ use std::time::Instant;
use taskrun::tokio;
use tokio::io::unix::AsyncFd;
// Local verbose-trace macro: forwards to `trace!` behind a compile-time
// switch, so this file's extra logging can be silenced by flipping the
// `if true` to `if false` without touching every call site.
#[allow(unused)]
macro_rules! trace2 {
    ($($arg:tt)*) => {
        if true {
            trace!($($arg)*);
        }
    };
}
struct SockBox(c_int);
impl Drop for SockBox {
@@ -36,14 +50,26 @@ impl Drop for SockBox {
// TODO should be able to get away with non-atomic counters.
static BATCH_ID: AtomicUsize = AtomicUsize::new(0);
static SEARCH_ID2: AtomicUsize = AtomicUsize::new(0);
static SEARCH_ID: AtomicUsize = AtomicUsize::new(0);
/// Identifier of one search batch, drawn from a process-wide counter.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct BatchId(u32);

impl BatchId {
    /// Allocate the next batch id (wraps at `u32` via truncation).
    fn next() -> Self {
        let id = BATCH_ID.fetch_add(1, Ordering::AcqRel);
        BatchId(id as u32)
    }
}
/// Identifier of one channel search request within a batch,
/// drawn from a process-wide counter.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct SearchId(u32);

impl SearchId {
    /// Allocate the next search id (wraps at `u32` via truncation).
    fn next() -> Self {
        let id = SEARCH_ID.fetch_add(1, Ordering::AcqRel);
        SearchId(id as u32)
    }
}
struct SearchBatch {
ts_beg: Instant,
tgts: VecDeque<usize>,
@@ -62,7 +88,7 @@ pub struct FindIocRes {
pub struct FindIocStream {
tgts: Vec<SocketAddrV4>,
channels_input: VecDeque<String>,
channels_input: Receiver<String>,
in_flight: BTreeMap<BatchId, SearchBatch>,
in_flight_max: usize,
bid_by_sid: BTreeMap<SearchId, BatchId>,
@@ -80,16 +106,30 @@ pub struct FindIocStream {
sids_done: BTreeMap<SearchId, ()>,
result_for_done_sid_count: u64,
sleeper: Pin<Box<dyn Future<Output = ()> + Send>>,
stop_on_empty_queue: bool,
#[allow(unused)]
thr_msg_0: ThrottleTrace,
#[allow(unused)]
thr_msg_1: ThrottleTrace,
#[allow(unused)]
thr_msg_2: ThrottleTrace,
stats: Arc<IocFinderStats>,
}
impl FindIocStream {
pub fn new(tgts: Vec<SocketAddrV4>, batch_run_max: Duration, in_flight_max: usize, batch_size: usize) -> Self {
pub fn new(
channels_input: Receiver<String>,
tgts: Vec<SocketAddrV4>,
#[allow(unused)] blacklist: Vec<SocketAddrV4>,
batch_run_max: Duration,
in_flight_max: usize,
batch_size: usize,
stats: Arc<IocFinderStats>,
) -> Self {
let sock = unsafe { Self::create_socket() }.unwrap();
let afd = AsyncFd::new(sock.0).unwrap();
Self {
tgts,
channels_input: VecDeque::new(),
channels_input,
in_flight: BTreeMap::new(),
bid_by_sid: BTreeMap::new(),
batch_send_queue: VecDeque::new(),
@@ -107,14 +147,13 @@ impl FindIocStream {
channels_per_batch: batch_size,
batch_run_max,
sleeper: Box::pin(tokio::time::sleep(Duration::from_millis(500))),
stop_on_empty_queue: false,
thr_msg_0: ThrottleTrace::new(Duration::from_millis(1000)),
thr_msg_1: ThrottleTrace::new(Duration::from_millis(1000)),
thr_msg_2: ThrottleTrace::new(Duration::from_millis(1000)),
stats,
}
}
pub fn set_stop_on_empty_queue(&mut self) {
self.stop_on_empty_queue = true;
}
pub fn quick_state(&self) -> String {
format!(
"channels_input {} in_flight {} bid_by_sid {} out_queue {} result_for_done_sid_count {} bids_timed_out {}",
@@ -131,10 +170,6 @@ impl FindIocStream {
self.channels_input.len()
}
pub fn push(&mut self, x: String) {
self.channels_input.push_back(x);
}
fn buf_and_batch(&mut self, bid: &BatchId) -> Option<(&mut Vec<u8>, &mut SearchBatch)> {
match self.in_flight.get_mut(bid) {
Some(batch) => Some((&mut self.buf1, batch)),
@@ -235,7 +270,10 @@ impl FindIocStream {
Poll::Ready(Ok(()))
}
unsafe fn try_read(sock: i32) -> Poll<Result<(SocketAddrV4, Vec<(SearchId, SocketAddrV4)>), Error>> {
unsafe fn try_read(
sock: i32,
stats: &IocFinderStats,
) -> Poll<Result<(SocketAddrV4, Vec<(SearchId, SocketAddrV4)>), Error>> {
let mut saddr_mem = [0u8; std::mem::size_of::<libc::sockaddr>()];
let mut saddr_len: libc::socklen_t = saddr_mem.len() as _;
let mut buf = vec![0u8; 1024];
@@ -255,11 +293,14 @@ impl FindIocStream {
return Poll::Ready(Err("FindIocStream can not read".into()));
}
} else if ec < 0 {
stats.ca_udp_io_error().inc();
error!("unexpected received {ec}");
Poll::Ready(Err(Error::with_msg_no_trace(format!("try_read ec {ec}"))))
} else if ec == 0 {
stats.ca_udp_io_empty().inc();
Poll::Ready(Err(Error::with_msg_no_trace(format!("try_read ec {ec}"))))
} else {
stats.ca_udp_io_recv().inc();
let saddr2: libc::sockaddr_in = std::mem::transmute_copy(&saddr_mem);
let src_addr = Ipv4Addr::from(saddr2.sin_addr.s_addr.to_ne_bytes());
let src_port = u16::from_be(saddr2.sin_port);
@@ -308,13 +349,16 @@ impl FindIocStream {
accounted += 16 + hi.payload();
}
if accounted != ec as usize {
info!("unaccounted data ec {} accounted {}", ec, accounted);
stats.ca_udp_unaccounted_data().inc();
debug!("unaccounted data ec {} accounted {}", ec, accounted);
}
if msgs.len() < 1 {
warn!("received answer without messages");
stats.ca_udp_warn().inc();
debug!("received answer without messages");
}
if msgs.len() == 1 {
warn!("received answer with single message: {msgs:?}");
stats.ca_udp_warn().inc();
debug!("received answer with single message: {msgs:?}");
}
let mut good = true;
if let CaMsgTy::VersionRes(v) = msgs[0].ty {
@@ -323,7 +367,8 @@ impl FindIocStream {
good = false;
}
} else {
debug!("first message is not a version: {:?}", msgs[0].ty);
stats.ca_udp_first_msg_not_version().inc();
// debug!("first message is not a version: {:?}", msgs[0].ty);
// Seems like a bug in many IOCs
//good = false;
}
@@ -337,6 +382,7 @@ impl FindIocStream {
}
//CaMsgTy::VersionRes(13) => {}
_ => {
stats.ca_udp_error().inc();
warn!("try_read: unknown message received {:?}", msg.ty);
}
}
@@ -365,15 +411,15 @@ impl FindIocStream {
}
}
fn create_in_flight(&mut self) {
let bid = BatchId(BATCH_ID.fetch_add(1, Ordering::AcqRel) as u32);
fn create_in_flight(&mut self, chns: Vec<String>) {
let bid = BatchId::next();
let mut sids = Vec::new();
let mut chs = Vec::new();
while chs.len() < self.channels_per_batch && self.channels_input.len() > 0 {
let sid = SearchId(SEARCH_ID2.fetch_add(1, Ordering::AcqRel) as u32);
for ch in chns {
let sid = SearchId::next();
self.bid_by_sid.insert(sid.clone(), bid.clone());
sids.push(sid);
chs.push(self.channels_input.pop_front().unwrap());
chs.push(ch);
}
let n = chs.len();
let batch = SearchBatch {
@@ -385,6 +431,7 @@ impl FindIocStream {
};
self.in_flight.insert(bid.clone(), batch);
self.batch_send_queue.push_back(bid);
self.stats.ca_udp_batch_created().inc();
}
fn handle_result(&mut self, src: SocketAddrV4, res: Vec<(SearchId, SocketAddrV4)>) {
@@ -411,6 +458,7 @@ impl FindIocStream {
addr: Some(addr),
dt,
};
trace!("udp search response {res:?}");
self.out_queue.push_back(res);
}
None => {
@@ -488,6 +536,28 @@ impl FindIocStream {
self.in_flight.remove(&bid);
}
}
/// Drain as many queued channel names as are immediately available,
/// capped at one batch worth (`channels_per_batch`).
///
/// Returns early (possibly empty) when the input is pending or closed.
fn get_input_up_to_batch_max(&mut self, cx: &mut Context) -> Vec<String> {
    use Poll::*;
    let mut ret = Vec::new();
    loop {
        match self.channels_input.poll_next_unpin(cx) {
            Ready(Some(item)) => {
                ret.push(item);
                // Capacity is checked after the push, so at least one item
                // is taken per call whenever input is ready.
                if ret.len() >= self.channels_per_batch {
                    break;
                }
            }
            Ready(None) | Pending => break,
        }
    }
    ret
}
/// True when the stream can terminate: the input channel is closed and
/// there is neither in-flight nor queued-but-unsent work left.
fn ready_for_end_of_stream(&self) -> bool {
    let input_done = self.channels_input.is_closed();
    let nothing_in_flight = self.in_flight.is_empty();
    let nothing_queued = self.out_queue.is_empty();
    input_done && nothing_in_flight && nothing_queued
}
}
impl Stream for FindIocStream {
@@ -495,16 +565,17 @@ impl Stream for FindIocStream {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
// self.thr_msg_0.trigger("FindIocStream::poll_next", &[]);
match self.ping.poll_unpin(cx) {
Ready(_) => {
self.ping = Box::pin(tokio::time::sleep(Duration::from_millis(200)));
cx.waker().wake_by_ref();
let _ = self.ping.poll_unpin(cx);
}
Pending => {}
}
self.clear_timed_out();
loop {
let mut loop_again = false;
let mut have_progress = false;
if self.out_queue.is_empty() == false {
let ret = std::mem::replace(&mut self.out_queue, VecDeque::new());
break Ready(Some(Ok(ret)));
@@ -514,7 +585,7 @@ impl Stream for FindIocStream {
Ready(Ok(mut g)) => match unsafe { Self::try_send(self.sock.0, &self.send_addr, &self.buf1) } {
Ready(Ok(())) => {
self.buf1.clear();
loop_again = true;
have_progress = true;
}
Ready(Err(e)) => {
error!("{e:?}");
@@ -522,7 +593,7 @@ impl Stream for FindIocStream {
Pending => {
g.clear_ready();
warn!("socket seemed ready for write, but is not");
loop_again = true;
have_progress = true;
}
},
Ready(Err(e)) => {
@@ -546,19 +617,19 @@ impl Stream for FindIocStream {
//info!("Serialize and queue {bid:?}");
self.send_addr = tgt.clone();
self.batch_send_queue.push_back(bid);
loop_again = true;
have_progress = true;
}
None => {
self.buf1.clear();
self.batch_send_queue.push_back(bid);
loop_again = true;
have_progress = true;
error!("tgtix does not exist");
}
}
}
None => {
//info!("Batch exhausted");
loop_again = true;
have_progress = true;
}
}
}
@@ -569,19 +640,37 @@ impl Stream for FindIocStream {
} else {
warn!("bid {bid:?} from batch send queue not in flight NOT done");
}
loop_again = true;
have_progress = true;
}
}
}
None => break,
}
}
while !self.channels_input.is_empty() && self.in_flight.len() < self.in_flight_max {
self.create_in_flight();
loop_again = true;
if !self.channels_input.is_closed() {
while self.in_flight.len() < self.in_flight_max {
#[cfg(DISABLED)]
{
let n1 = self.in_flight.len();
self.thr_msg_1.trigger("FindIocStream while A {}", &[&n1]);
}
let chns = self.get_input_up_to_batch_max(cx);
if chns.len() == 0 {
break;
} else {
#[cfg(DISABLED)]
{
let n1 = self.in_flight.len();
let n2 = chns.len();
self.thr_msg_2.trigger("FindIocStream while B {} {}", &[&n1, &n2]);
}
self.create_in_flight(chns);
have_progress = true;
}
}
}
break match self.afd.poll_read_ready(cx) {
Ready(Ok(mut g)) => match unsafe { Self::try_read(self.sock.0) } {
Ready(Ok(mut g)) => match unsafe { Self::try_read(self.sock.0, &self.stats) } {
Ready(Ok((src, res))) => {
self.handle_result(src, res);
continue;
@@ -601,20 +690,16 @@ impl Stream for FindIocStream {
Ready(Some(Err(e)))
}
Pending => {
if loop_again {
if have_progress {
continue;
} else {
if self.channels_input.is_empty() && self.in_flight.is_empty() && self.out_queue.is_empty() {
if self.stop_on_empty_queue {
Ready(None)
} else {
match self.sleeper.poll_unpin(cx) {
Ready(_) => {
self.sleeper = Box::pin(tokio::time::sleep(Duration::from_millis(500)));
continue;
}
Pending => Pending,
if self.ready_for_end_of_stream() {
match self.sleeper.poll_unpin(cx) {
Ready(_) => {
self.sleeper = Box::pin(tokio::time::sleep(Duration::from_millis(500)));
continue;
}
Pending => Pending,
}
} else {
Pending

View File

@@ -1,3 +1,4 @@
use super::findioc::FindIocRes;
use crate::ca::findioc::FindIocStream;
use crate::conf::CaIngestOpts;
use async_channel::Receiver;
@@ -8,15 +9,16 @@ use dbpg::iocindex::IocSearchIndexWorker;
use err::Error;
use futures_util::StreamExt;
use log::*;
use stats::IocFinderStats;
use std::collections::VecDeque;
use std::net::IpAddr;
use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
use tokio::task::JoinHandle;
const DB_WORKER_COUNT: usize = 4;
async fn resolve_address(addr_str: &str) -> Result<SocketAddr, Error> {
const PORT_DEFAULT: u16 = 5064;
let ac = match addr_str.parse::<SocketAddr>() {
@@ -68,6 +70,7 @@ impl DbUpdateWorker {
}
}
#[cfg(DISABLED)]
pub async fn ca_search(opts: CaIngestOpts, channels: &Vec<String>) -> Result<(), Error> {
info!("ca_search begin");
let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config())
@@ -76,51 +79,17 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec<String>) -> Result<(),
dbpg::schema::schema_check(&pg)
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let mut addrs = Vec::new();
for s in opts.search() {
match resolve_address(s).await {
Ok(addr) => {
trace!("resolved {s} as {addr}");
addrs.push(addr);
}
Err(e) => {
error!("can not resolve {s} {e}");
}
}
}
let gw_addrs = {
let mut gw_addrs = Vec::new();
for s in opts.search_blacklist() {
match resolve_address(s).await {
Ok(addr) => {
trace!("resolved {s} as {addr}");
gw_addrs.push(addr);
}
Err(e) => {
warn!("can not resolve {s} {e}");
}
}
}
gw_addrs
};
let addrs = addrs
.into_iter()
.filter_map(|x| match x {
SocketAddr::V4(x) => Some(x),
SocketAddr::V6(_) => {
error!("TODO check ipv6 support for IOCs");
None
}
})
.collect();
let mut finder = FindIocStream::new(addrs, Duration::from_millis(800), 20, 4);
finder.set_stop_on_empty_queue();
for ch in channels.iter() {
finder.push(ch.into());
}
let (search_tgts, blacklist) = search_tgts_from_opts(&opts).await?;
// let mut finder = FindIocStream::new(search_tgts, Duration::from_millis(800), 20, 16);
// finder.set_stop_on_empty_queue();
// for ch in channels.iter() {
// finder.push(ch.into());
// }
const DB_WORKER_COUNT: usize = 1;
let (dbtx, dbrx) = async_channel::bounded(64);
let mut dbworkers = Vec::new();
for _ in 0..DB_WORKER_COUNT {
let (pg, jh) = dbpg::conn::make_pg_client(opts.postgresql_config())
@@ -201,3 +170,80 @@ pub async fn ca_search(opts: CaIngestOpts, channels: &Vec<String>) -> Result<(),
info!("all done");
Ok(())
}
/// Spawn the UDP-based IOC search pipeline.
///
/// Returns the channel-name input sender, the receiver for search results,
/// the join handle of the finder task, and a (currently empty) list of
/// additional worker handles kept for interface stability.
pub async fn ca_search_workers_start(
    opts: &CaIngestOpts,
    stats: Arc<IocFinderStats>,
) -> Result<
    (
        Sender<String>,
        Receiver<Result<VecDeque<FindIocRes>, Error>>,
        JoinHandle<Result<(), Error>>,
        Vec<JoinHandle<Result<(), Error>>>,
    ),
    Error,
> {
    const BATCH_RUN_MAX: Duration = Duration::from_millis(800);
    let (search_tgts, blacklist) = search_tgts_from_opts(opts).await?;
    let (input_tx, input_rx) = async_channel::bounded(256);
    let (result_tx, result_rx) = async_channel::bounded(256);
    let finder = FindIocStream::new(input_rx, search_tgts, blacklist, BATCH_RUN_MAX, 20, 16, stats);
    let finder_jh = taskrun::spawn(finder_run(finder, result_tx));
    Ok((input_tx, result_rx, finder_jh, Vec::new()))
}
/// Resolve the configured search targets and search blacklist to IPv4
/// socket addresses.
///
/// Returns `(search_targets, blacklist)`. Entries that do not resolve, or
/// resolve to IPv6, are logged and skipped: unresolvable search targets log
/// at `error` level, unresolvable blacklist entries only at `warn`.
async fn search_tgts_from_opts(opts: &CaIngestOpts) -> Result<(Vec<SocketAddrV4>, Vec<SocketAddrV4>), Error> {
    // Resolve one host string to IPv4; logs and returns None on failure.
    // `lenient` selects warn vs error for unresolvable names (blacklist
    // entries are non-critical).
    async fn resolve_v4(s: &str, lenient: bool) -> Option<SocketAddrV4> {
        match resolve_address(s).await {
            Ok(addr) => {
                trace!("resolved {s} as {addr}");
                match addr {
                    SocketAddr::V4(addr) => Some(addr),
                    SocketAddr::V6(_) => {
                        error!("no ipv6 for epics");
                        None
                    }
                }
            }
            Err(e) => {
                if lenient {
                    warn!("can not resolve {s} {e}");
                } else {
                    error!("can not resolve {s} {e}");
                }
                None
            }
        }
    }
    let mut addrs = Vec::new();
    for s in opts.search() {
        if let Some(a) = resolve_v4(s, false).await {
            addrs.push(a);
        }
    }
    let mut blacklist = Vec::new();
    for s in opts.search_blacklist() {
        if let Some(a) = resolve_v4(s, true).await {
            blacklist.push(a);
        }
    }
    Ok((addrs, blacklist))
}
/// Drive `finder` to completion, forwarding every item it yields to `tx`.
///
/// Stops early if the receiving side of `tx` has gone away.
async fn finder_run(mut finder: FindIocStream, tx: Sender<Result<VecDeque<FindIocRes>, Error>>) -> Result<(), Error> {
    loop {
        match finder.next().await {
            Some(item) => {
                if tx.send(item).await.is_err() {
                    break;
                }
            }
            None => break,
        }
    }
    debug!("finder_run done");
    Ok(())
}

View File

@@ -7,7 +7,7 @@ use std::net::SocketAddrV4;
use std::time::Instant;
use std::time::SystemTime;
pub const CHANNEL_STATUS_DUMMY_SCALAR_TYPE: i32 = i32::MIN + 1;
pub const CHANNEL_STATUS_DUMMY_SCALAR_TYPE: i32 = 14;
#[derive(Debug)]
pub enum CaConnStateValue {
@@ -60,7 +60,7 @@ pub enum WithStatusSeriesIdStateInner {
#[serde(with = "humantime_serde")]
since: SystemTime,
},
SearchPending {
AddrSearchPending {
#[serde(with = "humantime_serde")]
since: SystemTime,
},
@@ -72,6 +72,10 @@ pub enum WithStatusSeriesIdStateInner {
#[serde(with = "humantime_serde")]
since: SystemTime,
},
MaybeWrongAddress {
#[serde(with = "humantime_serde")]
since: SystemTime,
},
}
#[derive(Debug, Clone, Serialize)]
@@ -104,6 +108,7 @@ pub enum ChannelStateValue {
#[derive(Debug, Clone, Serialize)]
pub struct ChannelState {
pub value: ChannelStateValue,
pub running_cmd_id: Option<usize>,
}
#[derive(Debug, Clone, Serialize)]

View File

@@ -22,6 +22,7 @@ use stats::CaConnStatsAggDiff;
use stats::CaProtoStats;
use stats::DaemonStats;
use stats::InsertWorkerStats;
use stats::IocFinderStats;
use stats::SeriesByChannelStats;
use std::collections::BTreeMap;
use std::collections::HashMap;
@@ -39,6 +40,7 @@ pub struct StatsSet {
ca_proto: Arc<CaProtoStats>,
insert_worker_stats: Arc<InsertWorkerStats>,
series_by_channel_stats: Arc<SeriesByChannelStats>,
ioc_finder_stats: Arc<IocFinderStats>,
insert_frac: Arc<AtomicU64>,
}
@@ -50,6 +52,7 @@ impl StatsSet {
ca_proto: Arc<CaProtoStats>,
insert_worker_stats: Arc<InsertWorkerStats>,
series_by_channel_stats: Arc<SeriesByChannelStats>,
ioc_finder_stats: Arc<IocFinderStats>,
insert_frac: Arc<AtomicU64>,
) -> Self {
Self {
@@ -59,6 +62,7 @@ impl StatsSet {
ca_proto,
insert_worker_stats,
series_by_channel_stats,
ioc_finder_stats,
insert_frac,
}
}
@@ -208,14 +212,14 @@ fn make_routes(dcom: Arc<DaemonComm>, connset_cmd_tx: Sender<CaConnSetEvent>, st
get({
//
|| async move {
debug!("metrics");
let mut s1 = stats_set.daemon.prometheus();
let s1 = stats_set.daemon.prometheus();
let s2 = stats_set.ca_conn_set.prometheus();
let s3 = stats_set.insert_worker_stats.prometheus();
let s4 = stats_set.ca_conn.prometheus();
let s5 = stats_set.series_by_channel_stats.prometheus();
let s6 = stats_set.ca_proto.prometheus();
[s1, s2, s3, s4, s5, s6].join("")
let s7 = stats_set.ioc_finder_stats.prometheus();
[s1, s2, s3, s4, s5, s6, s7].join("")
}
}),
)

View File

@@ -72,8 +72,8 @@ impl<T> SenderPolling<T> {
}
pub fn drop(&mut self) {
self.sender = None;
self.fut = None;
self.sender = None;
}
pub fn len(&self) -> Option<usize> {

View File

@@ -14,6 +14,7 @@ use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::DbError;
use scylla::transport::errors::QueryError;
use scylla::QueryResult;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use smallvec::smallvec;
use smallvec::SmallVec;
@@ -126,6 +127,7 @@ pub enum ChannelStatusClosedReason {
IocTimeout,
NoProtocol,
ProtocolDone,
ConnectFail,
}
#[derive(Debug)]
@@ -152,6 +154,7 @@ impl ChannelStatus {
IocTimeout => 8,
NoProtocol => 9,
ProtocolDone => 10,
ConnectFail => 11,
},
}
}
@@ -170,6 +173,7 @@ impl ChannelStatus {
8 => Closed(IocTimeout),
9 => Closed(NoProtocol),
10 => Closed(ProtocolDone),
11 => Closed(ConnectFail),
24 => AssignedToAddress,
_ => {
return Err(err::Error::with_msg_no_trace(format!(
@@ -184,10 +188,20 @@ impl ChannelStatus {
#[derive(Debug)]
pub struct ChannelStatusItem {
pub ts: SystemTime,
pub series: SeriesId,
pub cssid: ChannelStatusSeriesId,
pub status: ChannelStatus,
}
impl ChannelStatusItem {
pub fn new_closed_conn_timeout(ts: SystemTime, cssid: ChannelStatusSeriesId) -> Self {
Self {
ts,
cssid,
status: ChannelStatus::Closed(ChannelStatusClosedReason::IocTimeout),
}
}
}
#[derive(Debug)]
pub struct InsertItem {
pub series: SeriesId,
@@ -565,9 +579,9 @@ pub fn insert_channel_status_fut(
let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV;
let ts_lsp = ts - ts_msp;
let kind = item.status.to_kind();
let series = item.series.id();
let cssid = item.cssid.id();
let params = (
series as i64,
cssid as i64,
ts_msp as i64,
ts_lsp as i64,
kind as i32,
@@ -581,7 +595,7 @@ pub fn insert_channel_status_fut(
let params = (
ts_msp as i64,
ts_lsp as i64,
series as i64,
cssid as i64,
kind as i32,
ttls.index.as_secs() as i32,
);
@@ -626,9 +640,9 @@ pub async fn insert_channel_status(
let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV;
let ts_lsp = ts - ts_msp;
let kind = item.status.to_kind();
let series = item.series.id();
let cssid = item.cssid.id();
let params = (
series as i64,
cssid as i64,
ts_msp as i64,
ts_lsp as i64,
kind as i32,
@@ -641,7 +655,7 @@ pub async fn insert_channel_status(
let params = (
ts_msp as i64,
ts_lsp as i64,
series as i64,
cssid as i64,
kind as i32,
ttl.as_secs() as i32,
);

View File

@@ -275,7 +275,7 @@ impl EvTabDim1 {
format!("events_array_{}", self.sty)
}
fn cql(&self) -> String {
fn cql_create(&self) -> String {
use std::fmt::Write;
let mut s = String::new();
let ttl = self.default_time_to_live.as_secs();
@@ -332,8 +332,8 @@ async fn check_event_tables(scy: &ScySession) -> Result<(), Error> {
default_time_to_live: dhours(1),
compaction_window_size: dhours(12),
};
if !check_table_readable(&desc.name(), scy).await? {
scy.query(desc.cql(), ()).await?;
if !has_table(&desc.name(), scy).await? {
scy.query(desc.cql_create(), ()).await?;
}
}
Ok(())

View File

@@ -223,6 +223,10 @@ stats_proc::stats_struct!((
channel_health_timeout,
ioc_search_start,
ioc_addr_found,
ioc_addr_not_found,
ioc_addr_result_for_unknown_channel,
ca_conn_task_begin,
ca_conn_task_done,
ca_conn_task_join_done_ok,
ca_conn_task_join_done_err,
ca_conn_task_join_err,
@@ -241,6 +245,7 @@ stats_proc::stats_struct!((
poll_no_progress_no_pending,
),
values(
storage_insert_queue_len,
storage_insert_tx_len,
channel_info_query_queue_len,
channel_info_query_sender_len,
@@ -253,6 +258,7 @@ stats_proc::stats_struct!((
channel_unassigned,
channel_assigned,
channel_connected,
channel_maybe_wrong_address,
channel_rogue,
),
),
@@ -304,6 +310,27 @@ stats_proc::stats_struct!((
worker_finish,
)
),
stats_struct(
name(IocFinderStats),
prefix(ioc_finder),
counters(
dbsearcher_batch_recv,
dbsearcher_item_recv,
dbsearcher_select_res_0,
dbsearcher_select_error_len_mismatch,
dbsearcher_batch_send,
dbsearcher_item_send,
ca_udp_error,
ca_udp_warn,
ca_udp_unaccounted_data,
ca_udp_batch_created,
ca_udp_io_error,
ca_udp_io_empty,
ca_udp_io_recv,
ca_udp_first_msg_not_version,
),
values(db_lookup_workers,)
),
));
// #[cfg(DISABLED)]
@@ -417,7 +444,8 @@ stats_proc::stats_struct!((
channel_unknown_address,
channel_search_pending,
channel_with_address,
channel_no_address
channel_no_address,
connset_health_lat_ema,
),
),
agg(name(DaemonStatsAgg), parent(DaemonStats)),