Simplify shutdown

Dominik Werder
2023-09-08 16:07:08 +02:00
parent 4a31f3f81f
commit 8bbd6c37d1
10 changed files with 374 additions and 488 deletions


@@ -4,17 +4,12 @@ pub mod inserthook;
use async_channel::Receiver;
use async_channel::Sender;
use async_channel::WeakReceiver;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::Error;
use log::*;
use netfetch::ca::conn::CaConnEvent;
use netfetch::ca::conn::ConnCommand;
use netfetch::ca::connset::CaConnSet;
use netfetch::ca::connset::CaConnSetCtrl;
use netfetch::ca::connset::CaConnSetItem;
use netfetch::ca::findioc::FindIocRes;
use netfetch::ca::IngestCommons;
use netfetch::ca::SlowWarnable;
use netfetch::conf::CaIngestOpts;
use netfetch::daemon_common::Channel;
use netfetch::daemon_common::DaemonEvent;
@@ -25,14 +20,8 @@ use netpod::ScyllaConfig;
use scywr::insertworker::Ttls;
use scywr::iteminsertqueue as scywriiq;
use scywr::store::DataStore;
use scywriiq::ChannelStatus;
use scywriiq::ChannelStatusItem;
use scywriiq::CommonInsertItemQueue;
use scywriiq::ConnectionStatus;
use scywriiq::ConnectionStatusItem;
use scywriiq::QueryItem;
use serde::Serialize;
use series::series::Existence;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use stats::DaemonStats;
@@ -49,8 +38,6 @@ use std::time::Instant;
use std::time::SystemTime;
use taskrun::tokio;
use tokio::task::JoinHandle;
use tracing::info_span;
use tracing::Instrument;
const CA_CONN_INSERT_QUEUE_MAX: usize = 256;
@@ -90,7 +77,7 @@ pub struct Daemon {
count_unassigned: usize,
count_assigned: usize,
last_status_print: SystemTime,
insert_workers_jh: Vec<JoinHandle<()>>,
insert_workers_jh: Vec<JoinHandle<Result<(), Error>>>,
ingest_commons: Arc<IngestCommons>,
caconn_last_channel_check: Instant,
stats: Arc<DaemonStats>,
@@ -98,7 +85,6 @@ pub struct Daemon {
insert_rx_weak: WeakReceiver<QueryItem>,
connset_ctrl: CaConnSetCtrl,
connset_status_last: Instant,
query_item_tx: Sender<QueryItem>,
}
impl Daemon {
@@ -114,33 +100,38 @@ impl Daemon {
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let common_insert_item_queue = Arc::new(CommonInsertItemQueue::new(opts.insert_item_queue_cap));
let (query_item_tx, query_item_rx) = async_channel::bounded(opts.insert_item_queue_cap);
let insert_queue_counter = Arc::new(AtomicUsize::new(0));
// Insert queue hook
let rx = inserthook::active_channel_insert_hook(common_insert_item_queue.receiver().unwrap());
let common_insert_item_queue_2 = rx;
let query_item_rx = inserthook::active_channel_insert_hook(query_item_rx);
let conn_set_ctrl = CaConnSet::start(
opts.backend.clone(),
opts.local_epics_hostname.clone(),
common_insert_item_queue.sender().unwrap().inner().clone(),
query_item_tx,
channel_info_query_tx,
opts.pgconf.clone(),
);
// TODO remove
tokio::spawn({
let rx = conn_set_ctrl.rx.clone();
let rx = conn_set_ctrl.receiver().clone();
let tx = daemon_ev_tx.clone();
async move {
loop {
match rx.recv().await {
Ok(item) => {
let item = DaemonEvent::CaConnSetItem(item);
tx.send(item).await;
if let Err(_) = tx.send(item).await {
debug!("CaConnSet to Daemon adapter: tx closed, break");
break;
}
}
Err(_) => {
debug!("CaConnSet to Daemon adapter: rx done, break");
break;
}
Err(e) => break,
}
}
}
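
This adapter task (marked TODO remove above) forwards every CaConnSetItem from the connection-set output channel into the daemon event channel and stops as soon as either end closes. A standalone sketch of that forwarding pattern, using only async_channel and tokio with stand-in item types:

use async_channel::{bounded, Receiver, Sender};

async fn forward(rx: Receiver<u32>, tx: Sender<String>) {
    loop {
        match rx.recv().await {
            Ok(item) => {
                // Map the source item into the destination event type.
                if tx.send(format!("item {item}")).await.is_err() {
                    // Destination closed: nothing left to forward to.
                    break;
                }
            }
            // Source closed and fully drained: forwarding is done.
            Err(_) => break,
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx1, rx1) = bounded(8);
    let (tx2, rx2) = bounded(8);
    let jh = tokio::spawn(forward(rx1, tx2));
    tx1.send(1u32).await.unwrap();
    drop(tx1); // closing the source lets the adapter exit
    assert_eq!(rx2.recv().await.unwrap(), "item 1");
    jh.await.unwrap();
}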
@@ -150,7 +141,6 @@ impl Daemon {
pgconf: Arc::new(opts.pgconf.clone()),
backend: opts.backend().into(),
local_epics_hostname: opts.local_epics_hostname.clone(),
insert_item_queue: common_insert_item_queue.clone(),
data_store: datastore.clone(),
insert_ivl_min: Arc::new(AtomicU64::new(0)),
extra_inserts_conf: tokio::sync::Mutex::new(ExtraInsertsConf::new()),
@@ -166,11 +156,11 @@ impl Daemon {
let store_stats = Arc::new(stats::CaConnStats::new());
let ttls = opts.ttls.clone();
let insert_worker_opts = Arc::new(ingest_commons.as_ref().into());
let jh_insert_workers = scywr::insertworker::spawn_scylla_insert_workers(
let insert_workers_jh = scywr::insertworker::spawn_scylla_insert_workers(
opts.scyconf.clone(),
opts.insert_scylla_sessions,
opts.insert_worker_count,
common_insert_item_queue_2.clone(),
query_item_rx.clone(),
insert_worker_opts,
store_stats.clone(),
use_rate_limit_queue,
@@ -223,15 +213,14 @@ impl Daemon {
count_unassigned: 0,
count_assigned: 0,
last_status_print: SystemTime::now(),
insert_workers_jh: jh_insert_workers,
insert_workers_jh,
ingest_commons,
caconn_last_channel_check: Instant::now(),
stats: Arc::new(DaemonStats::new()),
shutting_down: false,
insert_rx_weak: common_insert_item_queue_2.downgrade(),
insert_rx_weak: query_item_rx.downgrade(),
connset_ctrl: conn_set_ctrl,
connset_status_last: Instant::now(),
query_item_tx: common_insert_item_queue.sender().unwrap().inner().clone(),
};
Ok(ret)
}
@@ -240,10 +229,6 @@ impl Daemon {
&self.stats
}
fn allow_create_new_connections(&self) -> bool {
!self.shutting_down
}
async fn check_caconn_chans(&mut self) -> Result<(), Error> {
if self.caconn_last_channel_check.elapsed() > CHANNEL_CHECK_INTERVAL {
self.connset_ctrl.check_health().await?;
@@ -252,24 +237,17 @@ impl Daemon {
Ok(())
}
async fn ca_conn_send_shutdown(&mut self) -> Result<(), Error> {
self.connset_ctrl.shutdown().await?;
Ok(())
}
async fn handle_timer_tick(&mut self) -> Result<(), Error> {
if self.shutting_down {
let sa1 = self.ingest_commons.insert_item_queue.sender_count();
let sa2 = self.ingest_commons.insert_item_queue.sender_count_2();
let nworkers = self
.ingest_commons
.insert_workers_running
.load(atomic::Ordering::Acquire);
let nitems = self.insert_rx_weak.upgrade().map(|x| x.len());
info!(
"qu senders A {:?} {:?} nworkers {} nitems {:?}",
sa1, sa2, nworkers, nitems
);
let nitems = self
.insert_rx_weak
.upgrade()
.map(|x| (x.sender_count(), x.receiver_count(), x.len()));
info!("qu senders A nworkers {} nitems {:?}", nworkers, nitems);
if nworkers == 0 {
info!("goodbye");
std::process::exit(0);
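
The shutdown path above deliberately holds only a WeakReceiver of the insert queue, so the daemon can report queue statistics without keeping the queue alive; once the workers drop their receivers, upgrade() returns None. A minimal sketch of that observation technique (async_channel only, stand-in payload type):

use async_channel::{bounded, WeakReceiver};

#[tokio::main]
async fn main() {
    let (tx, rx) = bounded::<u32>(8);
    let weak: WeakReceiver<u32> = rx.downgrade();
    tx.send(7).await.unwrap();
    // While a strong receiver exists, upgrade() succeeds and the queue
    // can be inspected without extending its lifetime.
    if let Some(r) = weak.upgrade() {
        println!(
            "senders {} receivers {} len {}",
            r.sender_count(),
            r.receiver_count(),
            r.len()
        );
    }
    // Once every strong receiver is dropped, upgrade() yields None.
    drop(rx);
    assert!(weak.upgrade().is_none());
}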
@@ -330,163 +308,7 @@ impl Daemon {
}
async fn handle_channel_remove(&mut self, ch: Channel) -> Result<(), Error> {
warn!("TODO handle_channel_remove");
#[cfg(DISABLED)]
if let Some(k) = self.channel_states.get_mut(&ch) {
match &k.value {
ChannelStateValue::Active(j) => match j {
ActiveChannelState::Init { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
ActiveChannelState::WaitForStatusSeriesId { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
ActiveChannelState::WithStatusSeriesId {
status_series_id: _,
state,
} => match state.inner {
WithStatusSeriesIdStateInner::UnknownAddress { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
WithStatusSeriesIdStateInner::SearchPending { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
WithStatusSeriesIdStateInner::WithAddress { addr, .. } => {
k.value = ChannelStateValue::ToRemove {
addr: Some(addr.clone()),
};
}
WithStatusSeriesIdStateInner::NoAddress { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
},
},
ChannelStateValue::ToRemove { .. } => {}
}
}
Ok(())
}
async fn handle_search_done(&mut self, item: Result<VecDeque<FindIocRes>, Error>) -> Result<(), Error> {
warn!("TODO handle_search_done");
//debug!("handle SearchDone: {res:?}");
// let allow_create_new_connections = self.allow_create_new_connections();
// let tsnow = SystemTime::now();
#[cfg(DISABLED)]
match item {
Ok(ress) => {
SEARCH_ANS_COUNT.fetch_add(ress.len(), atomic::Ordering::AcqRel);
for res in ress {
if let Some(addr) = &res.addr {
self.stats.ioc_search_some_inc();
let ch = Channel::new(res.channel);
if let Some(st) = self.channel_states.get_mut(&ch) {
match &st.value {
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::Init { .. } => {}
ActiveChannelState::WaitForStatusSeriesId { .. } => {}
ActiveChannelState::WithStatusSeriesId {
status_series_id,
state,
} => match state.inner {
WithStatusSeriesIdStateInner::SearchPending { since, did_send: _ } => {
if allow_create_new_connections {
let dt = tsnow.duration_since(since).unwrap();
if dt > SEARCH_PENDING_TIMEOUT_WARN {
warn!(
" FOUND {:5.0} {:5.0} {addr}",
1e3 * dt.as_secs_f32(),
1e3 * res.dt.as_secs_f32()
);
}
let stnew =
ChannelStateValue::Active(ActiveChannelState::WithStatusSeriesId {
status_series_id: status_series_id.clone(),
state: WithStatusSeriesIdState {
inner: WithStatusSeriesIdStateInner::WithAddress {
addr: addr.clone(),
state: WithAddressState::Unassigned {
assign_at: tsnow,
},
},
},
});
st.value = stnew;
} else {
// Emit something here?
}
}
_ => {
warn!(
"address found, but state for {ch:?} is not SearchPending: {:?}",
st.value
);
}
},
},
ChannelStateValue::ToRemove { addr: _ } => {}
}
} else {
warn!("can not find channel state for {ch:?}");
}
} else {
//debug!("no addr from search in {res:?}");
let ch = Channel::new(res.channel);
if let Some(st) = self.channel_states.get_mut(&ch) {
let mut unexpected_state = true;
match &st.value {
ChannelStateValue::Active(st2) => match st2 {
ActiveChannelState::Init { .. } => {}
ActiveChannelState::WaitForStatusSeriesId { .. } => {}
ActiveChannelState::WithStatusSeriesId {
status_series_id,
state: st3,
} => match &st3.inner {
WithStatusSeriesIdStateInner::UnknownAddress { .. } => {}
WithStatusSeriesIdStateInner::SearchPending { since, .. } => {
unexpected_state = false;
let dt = tsnow.duration_since(*since).unwrap();
if dt > SEARCH_PENDING_TIMEOUT_WARN {
warn!(
"NOT FOUND {:5.0} {:5.0}",
1e3 * dt.as_secs_f32(),
1e3 * res.dt.as_secs_f32()
);
}
st.value =
ChannelStateValue::Active(ActiveChannelState::WithStatusSeriesId {
status_series_id: status_series_id.clone(),
state: WithStatusSeriesIdState {
inner: WithStatusSeriesIdStateInner::NoAddress { since: tsnow },
},
});
}
WithStatusSeriesIdStateInner::WithAddress { .. } => {}
WithStatusSeriesIdStateInner::NoAddress { .. } => {}
},
},
ChannelStateValue::ToRemove { .. } => {}
}
if unexpected_state {
warn!("no address, but state for {ch:?} is not SearchPending: {:?}", st.value);
}
} else {
warn!("can not find channel state for {ch:?}");
}
}
}
}
Err(e) => {
self.stats.ioc_search_err_inc();
error!("error from search: {e}");
}
}
Ok(())
}
async fn handle_ca_conn_done(&mut self, conn_addr: SocketAddrV4) -> Result<(), Error> {
warn!("TODO handle_ca_conn_done {conn_addr:?}");
self.connset_ctrl.remove_channel(ch.id().into()).await?;
Ok(())
}
@@ -541,48 +363,6 @@ impl Daemon {
Ok(())
}
async fn handle_ca_conn_event(&mut self, addr: SocketAddrV4, item: CaConnEvent) -> Result<(), Error> {
self.stats.event_ca_conn_inc();
use netfetch::ca::conn::CaConnEventValue::*;
match item.value {
None => {
// TODO count, maybe reduce.
Ok(())
}
EchoTimeout => {
self.stats.ca_echo_timeout_total_inc();
error!("TODO on EchoTimeout remove the CaConn and reset channels");
Ok(())
}
ConnCommandResult(item) => {
self.stats.todo_mark_inc();
use netfetch::ca::conn::ConnCommandResultKind::*;
match &item.kind {
CheckHealth => {
todo!("TODO collect the CaConn health check in CaConnSet");
#[cfg(DISABLED)]
if let Some(st) = self.connection_states.get_mut(&addr) {
self.stats.ca_conn_status_feedback_recv_inc();
st.last_feedback = Instant::now();
Ok(())
} else {
self.stats.ca_conn_status_feedback_no_dst_inc();
Ok(())
}
}
}
}
QueryItem(item) => {
self.query_item_tx.send(item).await?;
Ok(())
}
EndOfStream => {
self.stats.ca_conn_status_done_inc();
self.handle_ca_conn_done(addr).await
}
}
}
async fn handle_ca_conn_set_item(&mut self, item: CaConnSetItem) -> Result<(), Error> {
use CaConnSetItem::*;
match item {
@@ -647,8 +427,6 @@ impl Daemon {
}
ChannelAdd(ch) => self.handle_channel_add(ch).await,
ChannelRemove(ch) => self.handle_channel_remove(ch).await,
SearchDone(item) => self.handle_search_done(item).await,
CaConnEvent(addr, item) => self.handle_ca_conn_event(addr, item).await,
CaConnSetItem(item) => self.handle_ca_conn_set_item(item).await,
Shutdown => self.handle_shutdown().await,
};
@@ -696,14 +474,17 @@ impl Daemon {
taskrun::spawn(ticker);
}
pub async fn daemon(&mut self) -> Result<(), Error> {
pub async fn daemon(mut self) -> Result<(), Error> {
Self::spawn_ticker(self.tx.clone(), self.stats.clone());
loop {
if self.shutting_down {
break;
}
match self.rx.recv().await {
Ok(item) => match self.handle_event(item).await {
Ok(_) => {}
Ok(item) => match self.handle_event(item.clone()).await {
Ok(()) => {}
Err(e) => {
error!("daemon: {e}");
error!("fn daemon: error from handle_event {item:?} {e}");
break;
}
},
@@ -713,8 +494,23 @@ impl Daemon {
}
}
}
warn!("TODO should not have to close the channel");
warn!("TODO wait for insert workers");
let _ = &self.insert_workers_jh;
while let Some(jh) = self.insert_workers_jh.pop() {
match jh.await.map_err(Error::from_string) {
Ok(x) => match x {
Ok(()) => {
debug!("joined insert worker");
}
Err(e) => {
error!("joined insert worker, error {e}");
}
},
Err(e) => {
error!("insert worker join error {e}");
}
}
}
info!("daemon done");
Ok(())
}
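
This join loop is the core of the simplified shutdown: each insert worker now returns Result<(), Error>, and the daemon pops and awaits every JoinHandle, distinguishing the worker's own error from a join error (panic or cancellation). The same pattern as a self-contained sketch with an illustrative String error type:

use tokio::task::JoinHandle;

async fn join_workers(mut jhs: Vec<JoinHandle<Result<(), String>>>) {
    while let Some(jh) = jhs.pop() {
        match jh.await {
            // The task completed and reported its own result.
            Ok(Ok(())) => println!("joined worker"),
            Ok(Err(e)) => eprintln!("joined worker, error {e}"),
            // The task panicked or was cancelled before completing.
            Err(e) => eprintln!("worker join error {e}"),
        }
    }
}

#[tokio::main]
async fn main() {
    let jhs: Vec<_> = (0..4)
        .map(|i| {
            tokio::spawn(async move {
                if i == 3 {
                    Err(format!("worker {i} failed"))
                } else {
                    Ok(())
                }
            })
        })
        .collect();
    join_workers(jhs).await;
}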
@@ -778,7 +574,7 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
insert_worker_count: opts.insert_worker_count(),
insert_scylla_sessions: opts.insert_scylla_sessions(),
};
let mut daemon = Daemon::new(opts2).await?;
let daemon = Daemon::new(opts2).await?;
let tx = daemon.tx.clone();
let daemon_stats = daemon.stats().clone();
@@ -789,16 +585,13 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
tokio::task::spawn(fut)
};
let daemon_jh = taskrun::spawn(async move {
// TODO handle Err
daemon.daemon().await.unwrap();
});
let daemon_jh = taskrun::spawn(daemon.daemon());
for s in &channels {
let ch = Channel::new(s.into());
tx.send(DaemonEvent::ChannelAdd(ch)).await?;
}
info!("configured channels applied");
daemon_jh.await.unwrap();
debug!("{} configured channels applied", channels.len());
daemon_jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??;
if false {
metrics_jh.await.unwrap();
}


@@ -6,8 +6,6 @@ pub mod proto;
pub mod search;
pub mod statemap;
use self::connset::CaConnSetCtrl;
use crate::ca::connset::CaConnSet;
use crate::metrics::ExtraInsertsConf;
use crate::rt::TokMx;
use futures_util::Future;
@@ -15,7 +13,6 @@ use futures_util::FutureExt;
use log::*;
use netpod::Database;
use scywr::insertworker::InsertWorkerOpts;
use scywr::iteminsertqueue::CommonInsertItemQueue;
use scywr::store::DataStore;
use stats::CaConnStatsAgg;
use std::pin::Pin;
@@ -39,7 +36,6 @@ pub struct IngestCommons {
pub pgconf: Arc<Database>,
pub backend: String,
pub local_epics_hostname: String,
pub insert_item_queue: Arc<CommonInsertItemQueue>,
pub data_store: Arc<DataStore>,
pub insert_ivl_min: Arc<AtomicU64>,
pub extra_inserts_conf: TokMx<ExtraInsertsConf>,


@@ -262,6 +262,7 @@ enum CaConnState {
PeerReady,
Wait(Pin<Box<dyn Future<Output = ()> + Send>>),
Shutdown,
EndOfStream,
}
fn wait_fut(dt: u64) -> Pin<Box<dyn Future<Output = ()> + Send>> {
@@ -852,11 +853,11 @@ impl CaConn {
fn check_channels_alive(&mut self) -> Result<(), Error> {
let tsnow = Instant::now();
trace!("CheckChannelsAlive {addr:?}", addr = &self.remote_addr_dbg);
trace!("check_channels_alive {addr:?}", addr = &self.remote_addr_dbg);
if self.ioc_ping_last.elapsed() > Duration::from_millis(20000) {
if let Some(started) = self.ioc_ping_start {
if started.elapsed() > Duration::from_millis(4000) {
warn!("Echo timeout {addr:?}", addr = self.remote_addr_dbg);
warn!("pong timeout {addr:?}", addr = self.remote_addr_dbg);
let item = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::EchoTimeout,
@@ -867,11 +868,11 @@ impl CaConn {
} else {
self.ioc_ping_start = Some(Instant::now());
if let Some(proto) = &mut self.proto {
debug!("push echo to {}", self.remote_addr_dbg);
debug!("ping to {}", self.remote_addr_dbg);
let msg = CaMsg { ty: CaMsgTy::Echo };
proto.push_out(msg);
} else {
warn!("can not push echo, no proto {}", self.remote_addr_dbg);
warn!("can not ping {} no proto", self.remote_addr_dbg);
self.trigger_shutdown(ChannelStatusClosedReason::NoProtocol);
}
}
@@ -1630,6 +1631,7 @@ impl CaConn {
Pending => Ok(Some(Pending)),
},
CaConnState::Shutdown => Ok(None),
CaConnState::EndOfStream => Ok(None),
}
}
@@ -1725,8 +1727,8 @@ impl CaConn {
Ok(())
}
fn outgoing_queues_empty(&self) -> bool {
self.channel_info_query_queue.is_empty() && !self.channel_info_query_sending.is_sending()
fn queues_async_out_flushed(&self) -> bool {
self.channel_info_query_queue.is_empty() && self.channel_info_query_sending.is_idle()
}
fn attempt_flush_channel_info_query(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> {
@@ -1741,7 +1743,7 @@ impl CaConn {
}
} else if let Some(item) = self.channel_info_query_queue.pop_front() {
let sd = &mut self.channel_info_query_sending;
sd.send2(item);
sd.send(item);
continue;
} else {
Ok(())
@@ -1758,7 +1760,9 @@ impl Stream for CaConn {
self.stats.caconn_poll_count_inc();
loop {
let mut have_pending = false;
break if let Err(e) = self.as_mut().handle_own_ticker(cx) {
break if let CaConnState::EndOfStream = self.state {
Ready(None)
} else if let Err(e) = self.as_mut().handle_own_ticker(cx) {
Ready(Some(Err(e)))
} else if let Some(item) = self.cmd_res_queue.pop_front() {
let item = CaConnEvent {
@@ -1779,21 +1783,17 @@ impl Stream for CaConn {
} else if let Ready(Some(Err(e))) = self.as_mut().handle_conn_command(cx) {
Ready(Some(Err(e)))
} else if let Some(item) = {
if self.is_shutdown() {
None
} else {
match self.loop_inner(cx) {
// TODO what does this mean: should we re-loop or yield something?
Ok(Some(Ready(()))) => None,
// This is the last step, so we yield Pending.
// But in general, this does not compose well when we would add another step.
Ok(Some(Pending)) => {
have_pending = true;
None
}
Ok(None) => None,
Err(e) => Some(Err(e)),
match self.loop_inner(cx) {
// TODO what does this mean: should we re-loop or yield something?
Ok(Some(Ready(()))) => None,
// This is the last step, so we yield Pending.
// But in general, this does not compose well when we would add another step.
Ok(Some(Pending)) => {
have_pending = true;
None
}
Ok(None) => None,
Err(e) => Some(Err(e)),
}
} {
Ready(Some(item))
@@ -1804,7 +1804,10 @@ impl Stream for CaConn {
ts: Instant::now(),
value: CaConnEventValue::None,
};
if have_pending {
if self.is_shutdown() && self.queues_async_out_flushed() {
self.state = CaConnState::EndOfStream;
Ready(None)
} else if have_pending {
Pending
} else {
continue;
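
These hunks add a fused terminal state: once is_shutdown() holds and the outgoing async queues are flushed, the stream flips to CaConnState::EndOfStream and every later poll returns Ready(None). A reduced sketch of that state machine with stand-in types (the real CaConn drains far more state before finishing):

use futures_util::{Stream, StreamExt};
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};

#[allow(dead_code)]
enum State {
    Running,
    Shutdown,
    EndOfStream,
}

struct Conn {
    state: State,
    queue: VecDeque<u32>,
}

impl Stream for Conn {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<u32>> {
        // Terminal state: a fused stream that stays finished.
        if let State::EndOfStream = self.state {
            return Poll::Ready(None);
        }
        // Even while shutting down, flush what is already queued.
        if let Some(item) = self.queue.pop_front() {
            return Poll::Ready(Some(item));
        }
        match self.state {
            State::Shutdown => {
                // Queues are flushed: flip to the terminal state.
                self.state = State::EndOfStream;
                Poll::Ready(None)
            }
            // Real code would register a waker here.
            _ => Poll::Pending,
        }
    }
}

#[tokio::main]
async fn main() {
    let mut conn = Conn {
        state: State::Shutdown,
        queue: VecDeque::from([1, 2]),
    };
    while let Some(x) = conn.next().await {
        println!("drained {x}");
    }
}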


@@ -109,12 +109,18 @@ pub struct ChannelAdd {
local_epics_hostname: String,
}
#[derive(Debug, Clone)]
pub struct ChannelRemove {
name: String,
}
#[derive(Debug)]
pub enum ConnSetCmd {
SeriesLookupResult(Result<ChannelInfoResult, dbpg::seriesbychannel::Error>),
ChannelAdd(ChannelAdd),
ChannelAddWithStatusId(ChannelAddWithStatusId),
ChannelAddWithAddr(ChannelAddWithAddr),
ChannelRemove(ChannelRemove),
IocAddrQueryResult(VecDeque<FindIocRes>),
CheckHealth,
Shutdown,
@@ -126,18 +132,22 @@ pub enum CaConnSetEvent {
CaConnEvent((SocketAddr, CaConnEvent)),
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum CaConnSetItem {
Healthy,
}
#[derive(Clone)]
pub struct CaConnSetCtrl {
tx: Sender<CaConnSetEvent>,
pub rx: Receiver<CaConnSetItem>,
rx: Receiver<CaConnSetItem>,
jh: JoinHandle<Result<(), Error>>,
}
impl CaConnSetCtrl {
pub fn receiver(&self) -> Receiver<CaConnSetItem> {
self.rx.clone()
}
pub async fn add_channel(&self, backend: String, name: String, local_epics_hostname: String) -> Result<(), Error> {
let cmd = ChannelAdd {
backend,
@@ -149,6 +159,13 @@ impl CaConnSetCtrl {
Ok(())
}
pub async fn remove_channel(&self, name: String) -> Result<(), Error> {
let cmd = ChannelRemove { name };
let cmd = ConnSetCmd::ChannelRemove(cmd);
self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?;
Ok(())
}
pub async fn shutdown(&self) -> Result<(), Error> {
let cmd = ConnSetCmd::Shutdown;
self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?;
@@ -160,6 +177,11 @@ impl CaConnSetCtrl {
self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?;
Ok(())
}
pub async fn join(self) -> Result<(), Error> {
self.jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??;
Ok(())
}
}
#[derive(Debug)]
@@ -198,6 +220,7 @@ pub struct CaConnSet {
chan_check_next: Option<Channel>,
stats: CaConnSetStats,
connset_out_tx: Sender<CaConnSetItem>,
ioc_finder_jh: JoinHandle<Result<(), Error>>,
}
impl CaConnSet {
@@ -226,12 +249,14 @@ impl CaConnSet {
chan_check_next: None,
stats: CaConnSetStats::new(),
connset_out_tx,
ioc_finder_jh,
};
// TODO await on jh
let jh = tokio::spawn(CaConnSet::run(connset));
CaConnSetCtrl {
tx: connset_tx,
rx: connset_out_rx,
jh,
}
}
@@ -241,15 +266,31 @@ impl CaConnSet {
match x {
Ok(ev) => this.handle_event(ev).await?,
Err(_) => {
if this.shutdown_done {
if this.shutdown_stopping {
// all fine
break Ok(());
break;
} else {
error!("channel closed without shutdown_done");
error!("channel closed without shutdown_stopping");
}
}
}
if this.shutdown_stopping {
break;
}
}
debug!(
"search_tx sender {} receiver {}",
this.search_tx.sender_count(),
this.search_tx.receiver_count()
);
this.ioc_finder_jh
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))??;
debug!("joined ioc_finder_jh");
this.connset_out_tx.close();
this.connset_rx.close();
this.shutdown_done = true;
Ok(())
}
async fn handle_event(&mut self, ev: CaConnSetEvent) -> Result<(), Error> {
@@ -258,6 +299,7 @@ impl CaConnSet {
ConnSetCmd::ChannelAdd(x) => self.handle_add_channel(x).await,
ConnSetCmd::ChannelAddWithStatusId(x) => self.handle_add_channel_with_status_id(x).await,
ConnSetCmd::ChannelAddWithAddr(x) => self.handle_add_channel_with_addr(x).await,
ConnSetCmd::ChannelRemove(x) => self.handle_remove_channel(x).await,
ConnSetCmd::IocAddrQueryResult(x) => self.handle_ioc_query_result(x).await,
ConnSetCmd::SeriesLookupResult(x) => self.handle_series_lookup_result(x).await,
ConnSetCmd::CheckHealth => self.handle_check_health().await,
@@ -301,6 +343,10 @@ impl CaConnSet {
}
async fn handle_add_channel(&mut self, add: ChannelAdd) -> Result<(), Error> {
if self.shutdown_stopping {
debug!("handle_add_channel but shutdown_stopping");
return Ok(());
}
// TODO should I add the transition through ActiveChannelState::Init as well?
let ch = Channel::new(add.name.clone());
let _st = self.channel_states.inner().entry(ch).or_insert_with(|| ChannelState {
@@ -322,6 +368,10 @@ impl CaConnSet {
}
async fn handle_add_channel_with_status_id(&mut self, add: ChannelAddWithStatusId) -> Result<(), Error> {
if self.shutdown_stopping {
debug!("handle_add_channel but shutdown_stopping");
return Ok(());
}
debug!("handle_add_channel_with_status_id {add:?}");
let ch = Channel::new(add.name.clone());
if let Some(chst) = self.channel_states.inner().get_mut(&ch) {
@@ -350,6 +400,10 @@ impl CaConnSet {
}
async fn handle_add_channel_with_addr(&mut self, add: ChannelAddWithAddr) -> Result<(), Error> {
if self.shutdown_stopping {
debug!("handle_add_channel but shutdown_stopping");
return Ok(());
}
if !self.ca_conn_ress.contains_key(&add.addr) {
let c = self.create_ca_conn(add.clone())?;
self.ca_conn_ress.insert(add.addr, c);
@@ -360,6 +414,43 @@ impl CaConnSet {
Ok(())
}
async fn handle_remove_channel(&mut self, cmd: ChannelRemove) -> Result<(), Error> {
let ch = Channel::new(cmd.name);
if let Some(k) = self.channel_states.inner().get_mut(&ch) {
match &k.value {
ChannelStateValue::Active(j) => match j {
ActiveChannelState::Init { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
ActiveChannelState::WaitForStatusSeriesId { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
ActiveChannelState::WithStatusSeriesId {
status_series_id: _,
state,
} => match &state.inner {
WithStatusSeriesIdStateInner::UnknownAddress { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
WithStatusSeriesIdStateInner::SearchPending { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
WithStatusSeriesIdStateInner::WithAddress { addr, state: _ } => {
k.value = ChannelStateValue::ToRemove {
addr: Some(addr.clone()),
};
}
WithStatusSeriesIdStateInner::NoAddress { .. } => {
k.value = ChannelStateValue::ToRemove { addr: None };
}
},
},
ChannelStateValue::ToRemove { .. } => {}
}
}
Ok(())
}
async fn handle_ioc_query_result(&mut self, res: VecDeque<FindIocRes>) -> Result<(), Error> {
for e in res {
let ch = Channel::new(e.channel.clone());
@@ -416,6 +507,7 @@ impl CaConnSet {
debug!("TODO handle_shutdown");
debug!("shutdown received");
self.shutdown_stopping = true;
self.search_tx.close();
for (addr, res) in self.ca_conn_ress.iter() {
let item = ConnCommand::shutdown();
res.sender.send(item).await?;
@@ -428,16 +520,20 @@ impl CaConnSet {
if let Some(e) = self.ca_conn_ress.remove(&addr) {
match e.jh.await {
Ok(Ok(())) => {
self.stats.ca_conn_task_join_done_ok_inc();
debug!("CaConn {addr} finished well");
}
Ok(Err(e)) => {
self.stats.ca_conn_task_join_done_err_inc();
error!("CaConn {addr} task error: {e}");
}
Err(e) => {
self.stats.ca_conn_task_join_err_inc();
error!("CaConn {addr} join error: {e}");
}
}
} else {
self.stats.ca_conn_task_eos_non_exist_inc();
warn!("end-of-stream received for non-existent CaConn {addr}");
}
Ok(())
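
The control handle now owns both the command sender and the set task's JoinHandle, so a caller can send Shutdown and then await completion through the same object. A reduced sketch of that handle shape, with illustrative names and a String error stand-in:

use async_channel::{bounded, Sender};
use tokio::task::JoinHandle;

enum Cmd {
    Shutdown,
}

struct Ctrl {
    tx: Sender<Cmd>,
    jh: JoinHandle<Result<(), String>>,
}

impl Ctrl {
    fn start() -> Ctrl {
        let (tx, rx) = bounded(16);
        let jh = tokio::spawn(async move {
            while let Ok(cmd) = rx.recv().await {
                match cmd {
                    // Stop accepting work and fall through to cleanup.
                    Cmd::Shutdown => break,
                }
            }
            Ok(())
        });
        Ctrl { tx, jh }
    }

    async fn shutdown(&self) -> Result<(), String> {
        self.tx.send(Cmd::Shutdown).await.map_err(|e| e.to_string())
    }

    // Consumes the handle: after join() there is nothing left to control.
    async fn join(self) -> Result<(), String> {
        self.jh.await.map_err(|e| e.to_string())?
    }
}

#[tokio::main]
async fn main() {
    let ctrl = Ctrl::start();
    ctrl.shutdown().await.unwrap();
    ctrl.join().await.unwrap();
}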


@@ -338,14 +338,9 @@ fn start_finder_ca(tx: Sender<DaemonEvent>, tgts: Vec<SocketAddrV4>) -> (Sender<
taskrun::spawn({
async move {
while let Ok(item) = arx.recv().await {
match tx.send(DaemonEvent::SearchDone(item)).await {
Ok(_) => {}
Err(e) => {
error!("search res fwd {e}");
}
}
todo!("send the result item");
}
warn!("search res fwd nput broken");
warn!("search res fwd inp closed");
}
});
(qtx, ioc_finder_jh)


@@ -1,11 +1,6 @@
use crate::ca::conn::CaConnEvent;
use crate::ca::connset::CaConnSetItem;
use crate::ca::findioc::FindIocRes;
use async_channel::Sender;
use err::Error;
use serde::Serialize;
use std::collections::VecDeque;
use std::net::SocketAddrV4;
#[derive(Clone, Debug, Serialize, PartialEq, PartialOrd, Eq, Ord)]
pub struct Channel {
@@ -22,13 +17,11 @@ impl Channel {
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum DaemonEvent {
TimerTick(u32, Sender<u32>),
ChannelAdd(Channel),
ChannelRemove(Channel),
SearchDone(Result<VecDeque<FindIocRes>, Error>),
CaConnEvent(SocketAddrV4, CaConnEvent),
CaConnSetItem(CaConnSetItem),
Shutdown,
}
@@ -40,17 +33,6 @@ impl DaemonEvent {
TimerTick(_, _) => format!("TimerTick"),
ChannelAdd(x) => format!("ChannelAdd {x:?}"),
ChannelRemove(x) => format!("ChannelRemove {x:?}"),
SearchDone(_x) => format!("SearchDone"),
CaConnEvent(_a, b) => {
use crate::ca::conn::CaConnEventValue::*;
match &b.value {
None => format!("CaConnEvent/None"),
EchoTimeout => format!("CaConnEvent/EchoTimeout"),
ConnCommandResult(_) => format!("CaConnEvent/ConnCommandResult"),
QueryItem(_) => format!("CaConnEvent/QueryItem"),
EndOfStream => format!("CaConnEvent/EndOfStream"),
}
}
CaConnSetItem(_) => format!("CaConnSetItem"),
Shutdown => format!("Shutdown"),
}


@@ -270,11 +270,10 @@ pub async fn metrics_agg_task(
}
}
{
let val = ingest_commons
.insert_item_queue
.receiver()
.map_or(0, |x| x.len() as u64);
agg.store_worker_recv_queue_len.store(val, Ordering::Release);
warn!("TODO provide metrics with a weak ref to the query_item_channel");
let nitems = 0;
// let nitems = weak.upgrade().map_or(0, |x| x.len() as u64);
agg.store_worker_recv_queue_len.store(nitems, Ordering::Release);
}
let mut m = METRICS.lock().unwrap();
*m = Some(agg.clone());


@@ -42,11 +42,15 @@ impl<T> SenderPolling<T> {
ret
}
pub fn is_idle(&self) -> bool {
self.fut.is_none()
}
pub fn is_sending(&self) -> bool {
self.fut.is_some()
}
pub fn send(self: Pin<&mut Self>, item: T) {
pub fn send_pin(self: Pin<&mut Self>, item: T) {
let (tx, fut) = unsafe {
let x = Pin::get_unchecked_mut(self);
(x.sender_ptr.as_mut(), &mut x.fut)
@@ -55,7 +59,7 @@ impl<T> SenderPolling<T> {
*fut = Some(s);
}
pub fn send2(&mut self, item: T) {
pub fn send(&mut self, item: T) {
let sender = unsafe { self.sender_ptr.as_mut() };
let s = sender.send(item);
self.fut = Some(s);
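
The renames reflect how the type is used: hold at most one in-flight send future, check is_idle() before queueing the next item, and poll the stored future to completion. A safe stand-in sketch of that shape, built on a cloned async_channel sender instead of the raw sender pointer used above:

use async_channel::{bounded, SendError, Sender};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// Safe stand-in for SenderPolling: owns a sender and at most one
// in-flight send future.
struct Sending<T: Send + 'static> {
    tx: Sender<T>,
    fut: Option<Pin<Box<dyn Future<Output = Result<(), SendError<T>>> + Send>>>,
}

impl<T: Send + 'static> Sending<T> {
    fn is_idle(&self) -> bool {
        self.fut.is_none()
    }

    // Queue one item; callers check is_idle() before calling again.
    fn send(&mut self, item: T) {
        let tx = self.tx.clone();
        self.fut = Some(Box::pin(async move { tx.send(item).await }));
    }

    // Drive the in-flight send; Ready means the slot is free again.
    fn poll_flush(&mut self, cx: &mut Context) -> Poll<()> {
        match &mut self.fut {
            Some(fut) => match fut.as_mut().poll(cx) {
                Poll::Ready(_res) => {
                    self.fut = None;
                    Poll::Ready(())
                }
                Poll::Pending => Poll::Pending,
            },
            None => Poll::Ready(()),
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = bounded::<u32>(4);
    let mut s = Sending { tx, fut: None };
    assert!(s.is_idle());
    s.send(1);
    std::future::poll_fn(|cx| s.poll_flush(cx)).await;
    assert!(s.is_idle());
    assert_eq!(rx.recv().await.unwrap(), 1);
}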


@@ -10,6 +10,7 @@ use log::*;
use netpod::timeunits::MS;
use netpod::timeunits::SEC;
use netpod::ScyllaConfig;
use stats::CaConnStats;
use std::sync::atomic;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
@@ -129,6 +130,175 @@ fn rate_limiter(
rx
}
async fn worker(
worker_ix: usize,
item_inp: Receiver<QueryItem>,
ttls: Ttls,
insert_worker_opts: Arc<InsertWorkerOpts>,
data_store: Arc<DataStore>,
stats: Arc<CaConnStats>,
) -> Result<(), Error> {
insert_worker_opts
.insert_workers_running
.fetch_add(1, atomic::Ordering::AcqRel);
let backoff_0 = Duration::from_millis(10);
let mut backoff = backoff_0.clone();
let mut i1 = 0;
loop {
let item = if let Ok(item) = item_inp.recv().await {
stats.store_worker_item_recv_inc();
item
} else {
break;
};
match item {
QueryItem::ConnectionStatus(item) => {
match insert_connection_status(item, ttls.index, &data_store, &stats).await {
Ok(_) => {
stats.connection_status_insert_done_inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &e);
back_off_sleep(&mut backoff).await;
}
}
}
QueryItem::ChannelStatus(item) => {
match insert_channel_status(item, ttls.index, &data_store, &stats).await {
Ok(_) => {
stats.channel_status_insert_done_inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &e);
back_off_sleep(&mut backoff).await;
}
}
}
QueryItem::Insert(item) => {
let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire);
if i1 % 1000 < insert_frac {
match insert_item(item, ttls.index, ttls.d0, ttls.d1, &data_store, &stats).await {
Ok(_) => {
stats.store_worker_insert_done_inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &e);
back_off_sleep(&mut backoff).await;
}
}
} else {
stats.store_worker_fraction_drop_inc();
}
i1 += 1;
}
QueryItem::Mute(item) => {
let values = (
(item.series.id() & 0xff) as i32,
item.series.id() as i64,
item.ts as i64,
item.ema,
item.emd,
ttls.index.as_secs() as i32,
);
let qres = data_store.scy.execute(&data_store.qu_insert_muted, values).await;
match qres {
Ok(_) => {
stats.mute_insert_done_inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
back_off_sleep(&mut backoff).await;
}
}
}
QueryItem::Ivl(item) => {
let values = (
(item.series.id() & 0xff) as i32,
item.series.id() as i64,
item.ts as i64,
item.ema,
item.emd,
ttls.index.as_secs() as i32,
);
let qres = data_store
.scy
.execute(&data_store.qu_insert_item_recv_ivl, values)
.await;
match qres {
Ok(_) => {
stats.ivl_insert_done_inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
back_off_sleep(&mut backoff).await;
}
}
}
QueryItem::ChannelInfo(item) => {
let params = (
(item.series.id() & 0xff) as i32,
item.ts_msp as i32,
item.series.id() as i64,
item.ivl,
item.interest,
item.evsize as i32,
ttls.index.as_secs() as i32,
);
let qres = data_store.scy.execute(&data_store.qu_insert_channel_ping, params).await;
match qres {
Ok(_) => {
stats.channel_info_insert_done_inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
back_off_sleep(&mut backoff).await;
}
}
}
QueryItem::TimeBinPatchSimpleF32(item) => {
info!("have time bin patch to insert: {item:?}");
let params = (
item.series.id() as i64,
item.bin_len_sec as i32,
item.bin_count as i32,
item.off_msp as i32,
item.off_lsp as i32,
item.counts,
item.mins,
item.maxs,
item.avgs,
ttls.binned.as_secs() as i32,
);
let qres = data_store
.scy
.execute(&data_store.qu_insert_binned_scalar_f32_v01, params)
.await;
match qres {
Ok(_) => {
stats.store_worker_insert_binned_done_inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
back_off_sleep(&mut backoff).await;
}
}
}
}
}
insert_worker_opts
.insert_workers_running
.fetch_sub(1, atomic::Ordering::AcqRel);
trace!("insert worker {worker_ix} done");
Ok(())
}
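
With the body lifted into a free function, each insert worker is just a spawned task over a clone of one shared receiver; async_channel is multi-consumer, so items go to whichever worker is free, and recv() fails once all senders are gone and the queue is drained. A minimal sketch of that pool shape with stand-in types:

use async_channel::{bounded, Receiver};
use tokio::task::JoinHandle;

// Every worker holds a clone of the same receiver.
async fn pool_worker(ix: usize, inp: Receiver<u64>) -> Result<(), String> {
    while let Ok(item) = inp.recv().await {
        println!("worker {ix} got {item}");
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    let (tx, rx) = bounded(16);
    let jhs: Vec<JoinHandle<Result<(), String>>> = (0..4)
        .map(|ix| tokio::spawn(pool_worker(ix, rx.clone())))
        .collect();
    for i in 0..8u64 {
        tx.send(i).await.unwrap();
    }
    drop(tx); // close the queue so the workers drain and exit
    for jh in jhs {
        jh.await.unwrap().unwrap();
    }
}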
pub async fn spawn_scylla_insert_workers(
scyconf: ScyllaConfig,
insert_scylla_sessions: usize,
@@ -138,7 +308,7 @@ pub async fn spawn_scylla_insert_workers(
store_stats: Arc<stats::CaConnStats>,
use_rate_limit_queue: bool,
ttls: Ttls,
) -> Result<Vec<JoinHandle<()>>, Error> {
) -> Result<Vec<JoinHandle<Result<(), Error>>>, Error> {
let item_inp = if use_rate_limit_queue {
rate_limiter(item_inp, insert_worker_opts.clone(), store_stats.clone())
} else {
@@ -151,171 +321,15 @@ pub async fn spawn_scylla_insert_workers(
data_stores.push(data_store);
}
for worker_ix in 0..insert_worker_count {
let item_inp = item_inp.clone();
let data_store = data_stores[worker_ix * data_stores.len() / insert_worker_count].clone();
let stats = store_stats.clone();
let insert_worker_opts = insert_worker_opts.clone();
let fut = async move {
insert_worker_opts
.insert_workers_running
.fetch_add(1, atomic::Ordering::AcqRel);
let backoff_0 = Duration::from_millis(10);
let mut backoff = backoff_0.clone();
let mut i1 = 0;
loop {
let item = if let Ok(item) = item_inp.recv().await {
stats.store_worker_item_recv_inc();
item
} else {
break;
};
match item {
QueryItem::ConnectionStatus(item) => {
match insert_connection_status(item, ttls.index, &data_store, &stats).await {
Ok(_) => {
stats.connection_status_insert_done_inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &e);
back_off_sleep(&mut backoff).await;
}
}
}
QueryItem::ChannelStatus(item) => {
match insert_channel_status(item, ttls.index, &data_store, &stats).await {
Ok(_) => {
stats.channel_status_insert_done_inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &e);
back_off_sleep(&mut backoff).await;
}
}
}
QueryItem::Insert(item) => {
let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire);
if i1 % 1000 < insert_frac {
match insert_item(item, ttls.index, ttls.d0, ttls.d1, &data_store, &stats).await {
Ok(_) => {
stats.store_worker_insert_done_inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &e);
back_off_sleep(&mut backoff).await;
}
}
} else {
stats.store_worker_fraction_drop_inc();
}
i1 += 1;
}
QueryItem::Mute(item) => {
let values = (
(item.series.id() & 0xff) as i32,
item.series.id() as i64,
item.ts as i64,
item.ema,
item.emd,
ttls.index.as_secs() as i32,
);
let qres = data_store.scy.execute(&data_store.qu_insert_muted, values).await;
match qres {
Ok(_) => {
stats.mute_insert_done_inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
back_off_sleep(&mut backoff).await;
}
}
}
QueryItem::Ivl(item) => {
let values = (
(item.series.id() & 0xff) as i32,
item.series.id() as i64,
item.ts as i64,
item.ema,
item.emd,
ttls.index.as_secs() as i32,
);
let qres = data_store
.scy
.execute(&data_store.qu_insert_item_recv_ivl, values)
.await;
match qres {
Ok(_) => {
stats.ivl_insert_done_inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
back_off_sleep(&mut backoff).await;
}
}
}
QueryItem::ChannelInfo(item) => {
let params = (
(item.series.id() & 0xff) as i32,
item.ts_msp as i32,
item.series.id() as i64,
item.ivl,
item.interest,
item.evsize as i32,
ttls.index.as_secs() as i32,
);
let qres = data_store.scy.execute(&data_store.qu_insert_channel_ping, params).await;
match qres {
Ok(_) => {
stats.channel_info_insert_done_inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
back_off_sleep(&mut backoff).await;
}
}
}
QueryItem::TimeBinPatchSimpleF32(item) => {
info!("have time bin patch to insert: {item:?}");
let params = (
item.series.id() as i64,
item.bin_len_sec as i32,
item.bin_count as i32,
item.off_msp as i32,
item.off_lsp as i32,
item.counts,
item.mins,
item.maxs,
item.avgs,
ttls.binned.as_secs() as i32,
);
let qres = data_store
.scy
.execute(&data_store.qu_insert_binned_scalar_f32_v01, params)
.await;
match qres {
Ok(_) => {
stats.store_worker_insert_binned_done_inc();
backoff = backoff_0;
}
Err(e) => {
stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
back_off_sleep(&mut backoff).await;
}
}
}
}
}
insert_worker_opts
.insert_workers_running
.fetch_sub(1, atomic::Ordering::AcqRel);
trace!("insert worker {worker_ix} done");
};
let jh = tokio::spawn(fut);
let jh = tokio::spawn(worker(
worker_ix,
item_inp.clone(),
ttls.clone(),
insert_worker_opts.clone(),
data_store,
store_stats.clone(),
));
jhs.push(jh);
}
Ok(jhs)
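
Every worker follows the same retry discipline: a successful insert resets backoff to backoff_0, a failed one calls back_off_sleep before the next attempt. The body of back_off_sleep is not part of this diff, so the sketch below assumes a simple doubling with a cap:

use std::time::Duration;

// Sleep, then grow the delay for the next failure. The doubling and the
// five second cap are assumptions, not taken from this codebase.
async fn back_off_sleep(backoff: &mut Duration) {
    tokio::time::sleep(*backoff).await;
    *backoff = (*backoff * 2).min(Duration::from_secs(5));
}

#[tokio::main]
async fn main() {
    let backoff_0 = Duration::from_millis(10);
    let mut backoff = backoff_0;
    for attempt in 0..4 {
        let ok = attempt == 3; // pretend only the last attempt succeeds
        if ok {
            backoff = backoff_0; // success resets the delay
        } else {
            back_off_sleep(&mut backoff).await;
        }
        println!("attempt {attempt}, next delay {:?}", backoff);
    }
}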


@@ -217,6 +217,10 @@ stats_proc::stats_struct!((
channel_with_address,
channel_unassigned,
channel_assigned,
ca_conn_task_join_done_ok,
ca_conn_task_join_done_err,
ca_conn_task_join_err,
ca_conn_task_eos_non_exist,
),
),
// agg(name(CaConnSetStatsAgg), parent(CaConnSetStats)),