This commit is contained in:
Dominik Werder
2023-09-07 22:47:54 +02:00
parent 649011b2e2
commit 4a31f3f81f
5 changed files with 274 additions and 191 deletions
+6 -3
View File
@@ -78,8 +78,10 @@ jobs:
working-directory: ${{steps.wdset.outputs.gh}}
- run: git clone --branch dev https://github.com/paulscherrerinstitute/daqbuffer.git
working-directory: ${{steps.wdset.outputs.gh}}/build
- run: git clone --branch dev https://github.com/paulscherrerinstitute/daqingest.git
- run: git clone https://github.com/paulscherrerinstitute/daqingest.git
working-directory: ${{steps.wdset.outputs.gh}}/build
- run: git reset --hard $GITHUB_SHA
working-directory: ${{steps.wdset.outputs.gh}}/build/daqingest
# - run: ls -la $GITHUB_WORKSPACE
# - run: find $GITHUB_WORKSPACE -type f -and \( -name \*.rs -or -name \*.toml \)
# - run: find ${{steps.wdset.outputs.gh}} -type f -and \( -name \*.rs -or -name \*.toml \)
@@ -102,8 +104,9 @@ jobs:
- run: "echo 'version: [${{env.DAQVER}}]'"
- run: echo "SELFPKG=daqingest-$DAQVER" >> $GITHUB_ENV
- run: echo "SELFPKGTGT=$SELFPKG-amd64-rhel7" >> $GITHUB_ENV
- run: echo SELFPKG $SELFPKG
- run: echo SELFPKGTGT $SELFPKGTGT
- run: echo DAQVER ..$DAQVER..
- run: echo SELFPKG ..$SELFPKG..
- run: echo SELFPKGTGT ..$SELFPKGTGT..
- run: mkdir $SELFPKGTGT
- run: cp ${{steps.wdset.outputs.gh}}/build/daqingest/target/release/daqingest $SELFPKGTGT/daqingest
- run: tar -czf $SELFPKGTGT.tar.gz $SELFPKGTGT
+51 -17
View File
@@ -11,6 +11,7 @@ use netfetch::ca::conn::CaConnEvent;
use netfetch::ca::conn::ConnCommand;
use netfetch::ca::connset::CaConnSet;
use netfetch::ca::connset::CaConnSetCtrl;
use netfetch::ca::connset::CaConnSetItem;
use netfetch::ca::findioc::FindIocRes;
use netfetch::ca::IngestCommons;
use netfetch::ca::SlowWarnable;
@@ -95,8 +96,8 @@ pub struct Daemon {
stats: Arc<DaemonStats>,
shutting_down: bool,
insert_rx_weak: WeakReceiver<QueryItem>,
channel_info_query_tx: Sender<ChannelInfoQuery>,
connset_ctrl: CaConnSetCtrl,
connset_status_last: Instant,
query_item_tx: Sender<QueryItem>,
}
@@ -124,10 +125,27 @@ impl Daemon {
opts.backend.clone(),
opts.local_epics_hostname.clone(),
common_insert_item_queue.sender().unwrap().inner().clone(),
channel_info_query_tx.clone(),
channel_info_query_tx,
opts.pgconf.clone(),
);
// TODO remove
tokio::spawn({
let rx = conn_set_ctrl.rx.clone();
let tx = daemon_ev_tx.clone();
async move {
loop {
match rx.recv().await {
Ok(item) => {
let item = DaemonEvent::CaConnSetItem(item);
tx.send(item).await;
}
Err(e) => break,
}
}
}
});
let ingest_commons = IngestCommons {
pgconf: Arc::new(opts.pgconf.clone()),
backend: opts.backend().into(),
@@ -211,8 +229,8 @@ impl Daemon {
stats: Arc::new(DaemonStats::new()),
shutting_down: false,
insert_rx_weak: common_insert_item_queue_2.downgrade(),
channel_info_query_tx,
connset_ctrl: conn_set_ctrl,
connset_status_last: Instant::now(),
query_item_tx: common_insert_item_queue.sender().unwrap().inner().clone(),
};
Ok(ret)
@@ -268,10 +286,12 @@ impl Daemon {
warn!("Received SIGTERM");
SIGTERM.store(2, atomic::Ordering::Release);
}
warn!("TODO let CaConnSet check health");
// TODO
// self.check_connection_states()?;
// self.check_channel_states().await?;
if self.connset_status_last + Duration::from_millis(2000) < ts1 {
self.connset_ctrl.check_health().await?;
}
if self.connset_status_last + Duration::from_millis(10000) < ts1 {
error!("CaConnSet has not reported health status");
}
let dt = ts1.elapsed();
if dt > Duration::from_millis(500) {
info!("slow check_chans {}ms", dt.as_secs_f32() * 1e3);
@@ -563,16 +583,31 @@ impl Daemon {
}
}
async fn handle_ca_conn_set_item(&mut self, item: CaConnSetItem) -> Result<(), Error> {
use CaConnSetItem::*;
match item {
Healthy => {
self.connset_status_last = Instant::now();
}
}
Ok(())
}
async fn handle_shutdown(&mut self) -> Result<(), Error> {
error!("TODO handle_shutdown");
// TODO make sure we:
// set a flag so that we don't attempt to use resources any longer (why could that happen?)
// does anybody might still want to communicate with us? can't be excluded.
// send shutdown signal to everyone.
// drop our ends of channels to workers (gate them behind option?).
// await the connection sets.
// await other workers that we've spawned.
self.connset_ctrl.shutdown().await?;
if self.shutting_down {
warn!("already shutting down");
} else {
self.shutting_down = true;
// TODO make sure we:
// set a flag so that we don't attempt to use resources any longer (why could that happen?)
// does anybody might still want to communicate with us? can't be excluded.
// send shutdown signal to everyone.
// drop our ends of channels to workers (gate them behind option?).
// await the connection sets.
// await other workers that we've spawned.
self.connset_ctrl.shutdown().await?;
}
Ok(())
}
@@ -580,10 +615,8 @@ impl Daemon {
async fn handle_shutdown(&mut self) -> Result<(), Error> {
warn!("received shutdown event");
if self.shutting_down {
info!("already shutting down");
Ok(())
} else {
self.shutting_down = true;
self.channel_states.clear();
self.ca_conn_send_shutdown().await?;
self.ingest_commons.insert_item_queue.drop_sender();
@@ -616,6 +649,7 @@ impl Daemon {
ChannelRemove(ch) => self.handle_channel_remove(ch).await,
SearchDone(item) => self.handle_search_done(item).await,
CaConnEvent(addr, item) => self.handle_ca_conn_event(addr, item).await,
CaConnSetItem(item) => self.handle_ca_conn_set_item(item).await,
Shutdown => self.handle_shutdown().await,
};
let dt = ts1.elapsed();
+154 -156
View File
@@ -13,7 +13,6 @@ use dbpg::seriesbychannel::CanSendChannelInfoResult;
use dbpg::seriesbychannel::ChannelInfoQuery;
use dbpg::seriesbychannel::ChannelInfoResult;
use err::Error;
use futures_util::stream::FuturesUnordered;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
@@ -36,7 +35,6 @@ use scywriiq::IvlItem;
use scywriiq::MuteItem;
use scywriiq::QueryItem;
use serde::Serialize;
use series::series::Existence;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use stats::CaConnStats;
@@ -455,7 +453,6 @@ pub struct CaConn {
cid_by_subid: BTreeMap<u32, Cid>,
name_by_cid: BTreeMap<Cid, String>,
insert_item_queue: VecDeque<QueryItem>,
sender_polling: SenderPolling<QueryItem>,
remote_addr_dbg: SocketAddrV4,
local_epics_hostname: String,
stats: Arc<CaConnStats>,
@@ -473,7 +470,6 @@ pub struct CaConn {
channel_info_query_queue: VecDeque<ChannelInfoQuery>,
channel_info_query_sending: SenderPolling<ChannelInfoQuery>,
time_binners: BTreeMap<Cid, ConnTimeBin>,
ts_earliest_warn_poll_slow: Instant,
}
impl Drop for CaConn {
@@ -505,7 +501,6 @@ impl CaConn {
cid_by_subid: BTreeMap::new(),
name_by_cid: BTreeMap::new(),
insert_item_queue: VecDeque::new(),
sender_polling: SenderPolling::new(async_channel::bounded(1).0),
remote_addr_dbg,
local_epics_hostname,
stats: Arc::new(CaConnStats::new()),
@@ -523,12 +518,11 @@ impl CaConn {
channel_info_query_queue: VecDeque::new(),
channel_info_query_sending: SenderPolling::new(channel_info_query_tx),
time_binners: BTreeMap::new(),
ts_earliest_warn_poll_slow: Instant::now(),
}
}
fn new_self_ticker() -> Pin<Box<tokio::time::Sleep>> {
Box::pin(tokio::time::sleep(Duration::from_millis(1000)))
Box::pin(tokio::time::sleep(Duration::from_millis(500)))
}
pub fn conn_command_tx(&self) -> async_channel::Sender<ConnCommand> {
@@ -623,6 +617,7 @@ impl CaConn {
}
fn cmd_shutdown(&mut self) {
debug!("cmd_shutdown {}", self.remote_addr_dbg);
self.trigger_shutdown(ChannelStatusClosedReason::ShutdownCommand);
}
@@ -660,17 +655,17 @@ impl CaConn {
match self.channel_to_evented(cid, sid, data_type, data_count, series) {
Ok(_) => {}
Err(e) => {
error!("channel_to_evented {e}");
error!("handle_series_lookup_result {e}");
}
}
} else {
warn!("TODO channel in bad state, reset");
warn!("TODO handle_series_lookup_result channel in bad state, reset");
}
} else {
warn!("TODO channel in bad state, reset");
warn!("TODO handle_series_lookup_result channel in bad state, reset");
}
} else {
warn!("TODO channel in bad state, reset");
warn!("TODO handle_series_lookup_result channel in bad state, reset");
}
}
Err(e) => {
@@ -683,11 +678,10 @@ impl CaConn {
fn handle_conn_command(&mut self, cx: &mut Context) -> Poll<Option<Result<(), Error>>> {
// TODO if this loops for too long time, yield and make sure we get wake up again.
use Poll::*;
trace!("handle_conn_command {}", self.remote_addr_dbg);
self.stats.caconn_loop3_count_inc();
match self.conn_command_rx.poll_next_unpin(cx) {
Ready(Some(a)) => {
trace!("handle_conn_command received a command");
trace!("handle_conn_command received a command {}", self.remote_addr_dbg);
match a.kind {
ConnCommandKind::ChannelAdd(name, cssid) => {
self.cmd_channel_add(name, cssid);
@@ -1019,9 +1013,6 @@ impl CaConn {
info_store_msp_last: info_store_msp_from_time(SystemTime::now()),
};
*ch_s = ChannelState::Created(series, created_state);
let scalar_type = ScalarType::from_ca_id(data_type)?;
let shape = Shape::from_ca_count(data_count)?;
error!("TODO channel_to_evented make sure we get polled again?");
Ok(())
}
@@ -1534,8 +1525,7 @@ impl CaConn {
Break(Pending)
}
// `?` works not in here.
fn handle_conn_state(&mut self, cx: &mut Context) -> Option<Poll<Result<(), Error>>> {
fn handle_conn_state(&mut self, cx: &mut Context) -> Result<Option<Poll<()>>, Error> {
use Poll::*;
match &mut self.state {
CaConnState::Unconnected => {
@@ -1543,7 +1533,7 @@ impl CaConn {
trace!("create tcp connection to {:?}", (addr.ip(), addr.port()));
let fut = tokio::time::timeout(Duration::from_millis(1000), TcpStream::connect(addr));
self.state = CaConnState::Connecting(addr, Box::pin(fut));
None
Ok(None)
}
CaConnState::Connecting(ref addr, ref mut fut) => {
match fut.poll_unpin(cx) {
@@ -1561,7 +1551,7 @@ impl CaConn {
let proto = CaProto::new(tcp, self.remote_addr_dbg.clone(), self.opts.array_truncate);
self.state = CaConnState::Init;
self.proto = Some(proto);
None
Ok(None)
}
Ok(Err(_e)) => {
// TODO log with exponential backoff
@@ -1575,7 +1565,7 @@ impl CaConn {
let dt = self.backoff_next();
self.state = CaConnState::Wait(wait_fut(dt));
self.proto = None;
None
Ok(None)
}
Err(e) => {
// TODO log with exponential backoff
@@ -1590,11 +1580,11 @@ impl CaConn {
let dt = self.backoff_next();
self.state = CaConnState::Wait(wait_fut(dt));
self.proto = None;
None
Ok(None)
}
}
}
Pending => Some(Pending),
Pending => Ok(Some(Pending)),
}
}
CaConnState::Init => {
@@ -1611,49 +1601,52 @@ impl CaConn {
};
proto.push_out(msg);
self.state = CaConnState::Listen;
None
Ok(None)
}
CaConnState::Listen => match {
let res = self.handle_conn_listen(cx);
res
} {
Ready(Some(Ok(()))) => Some(Ready(Ok(()))),
Ready(Some(Err(e))) => Some(Ready(Err(e))),
Ready(None) => None,
Pending => Some(Pending),
Ready(Some(Ok(()))) => Ok(Some(Ready(()))),
Ready(Some(Err(e))) => Err(e),
Ready(None) => Ok(None),
Pending => Ok(Some(Pending)),
},
CaConnState::PeerReady => {
let res = self.handle_peer_ready(cx);
match res {
Ready(Some(Ok(()))) => None,
Ready(Some(Err(e))) => Some(Ready(Err(e))),
Ready(None) => None,
Pending => Some(Pending),
Ready(Some(Ok(()))) => Ok(None),
Ready(Some(Err(e))) => Err(e),
Ready(None) => Ok(None),
Pending => Ok(Some(Pending)),
}
}
CaConnState::Wait(inst) => match inst.poll_unpin(cx) {
Ready(_) => {
self.state = CaConnState::Unconnected;
self.proto = None;
None
Ok(None)
}
Pending => Some(Pending),
Pending => Ok(Some(Pending)),
},
CaConnState::Shutdown => None,
CaConnState::Shutdown => Ok(None),
}
}
fn loop_inner(&mut self, cx: &mut Context) -> Option<Poll<Result<(), Error>>> {
fn loop_inner(&mut self, cx: &mut Context) -> Result<Option<Poll<()>>, Error> {
use Poll::*;
loop {
self.stats.caconn_loop2_count_inc();
if let Some(v) = self.handle_conn_state(cx) {
break Some(v);
if self.is_shutdown() {
break Ok(None);
}
if self.insert_item_queue.len() >= self.opts.insert_queue_max {
break None;
break Ok(None);
}
if self.is_shutdown() {
break None;
match self.handle_conn_state(cx)? {
Some(Ready(_)) => continue,
Some(Pending) => break Ok(Some(Pending)),
None => break Ok(None),
}
}
}
@@ -1699,7 +1692,31 @@ impl CaConn {
Self::apply_channel_ops_with_res(res)
}
fn handle_own_ticker_tick(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> {
fn handle_own_ticker(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> {
use Poll::*;
match self.ticker.poll_unpin(cx) {
Ready(()) => {
match self.as_mut().handle_own_ticker_tick(cx) {
Ok(_) => {
if !self.is_shutdown() {
self.ticker = Self::new_self_ticker();
let _ = self.ticker.poll_unpin(cx);
// cx.waker().wake_by_ref();
}
Ok(())
}
Err(e) => {
error!("handle_own_ticker {e}");
self.trigger_shutdown(ChannelStatusClosedReason::InternalError);
Err(e)
}
}
}
Pending => Ok(()),
}
}
fn handle_own_ticker_tick(self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> {
let this = self.get_mut();
for (_, tb) in this.time_binners.iter_mut() {
let iiq = &mut this.insert_item_queue;
@@ -1709,7 +1726,27 @@ impl CaConn {
}
fn outgoing_queues_empty(&self) -> bool {
self.insert_item_queue.is_empty() && !self.sender_polling.is_sending()
self.channel_info_query_queue.is_empty() && !self.channel_info_query_sending.is_sending()
}
fn attempt_flush_channel_info_query(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> {
use Poll::*;
loop {
let sd = &mut self.channel_info_query_sending;
break if sd.is_sending() {
match sd.poll_unpin(cx) {
Ready(Ok(())) => continue,
Ready(Err(e)) => Err(Error::with_msg_no_trace("can not send into channel")),
Pending => Ok(()),
}
} else if let Some(item) = self.channel_info_query_queue.pop_front() {
let sd = &mut self.channel_info_query_sending;
sd.send2(item);
continue;
} else {
Ok(())
};
}
}
}
@@ -1718,127 +1755,88 @@ impl Stream for CaConn {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let poll_ts1 = Instant::now();
self.stats.caconn_poll_count_inc();
match self.ticker.poll_unpin(cx) {
Ready(()) => {
match self.as_mut().handle_own_ticker_tick(cx) {
Ok(_) => {
let _ = self.ticker.poll_unpin(cx);
}
Err(e) => {
error!("{e}");
self.trigger_shutdown(ChannelStatusClosedReason::InternalError);
return Ready(Some(Err(e)));
}
}
self.ticker = Self::new_self_ticker();
let _ = self.ticker.poll_unpin(cx);
// cx.waker().wake_by_ref();
}
Pending => {}
}
let ret = if let Some(item) = self.cmd_res_queue.pop_front() {
Ready(Some(Ok(CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::ConnCommandResult(item),
})))
} else if let Some(item) = self.ca_conn_event_out_queue.pop_front() {
Ready(Some(Ok(item)))
} else if let Some(item) = self.insert_item_queue.pop_front() {
let ev = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::QueryItem(item),
};
Ready(Some(Ok(ev)))
} else {
let _ = loop {
let sd = &mut self.channel_info_query_sending;
break if sd.is_sending() {
match sd.poll_unpin(cx) {
Ready(Ok(())) => continue,
Ready(Err(e)) => Ready(Some(e)),
Pending => Pending,
}
} else if let Some(item) = self.channel_info_query_queue.pop_front() {
let sd = &mut self.channel_info_query_sending;
sd.send2(item);
continue;
} else {
Ready(None)
loop {
let mut have_pending = false;
break if let Err(e) = self.as_mut().handle_own_ticker(cx) {
Ready(Some(Err(e)))
} else if let Some(item) = self.cmd_res_queue.pop_front() {
let item = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::ConnCommandResult(item),
};
};
let ret = loop {
self.stats.caconn_loop1_count_inc();
loop {
break if self.is_shutdown() {
()
} else {
match self.handle_conn_command(cx) {
Ready(Some(Ok(_))) => (),
Ready(Some(Err(e))) => {
error!("{e}");
self.trigger_shutdown(ChannelStatusClosedReason::InternalError);
()
}
Ready(None) => {
warn!("command input queue closed, do shutdown");
self.trigger_shutdown(ChannelStatusClosedReason::InternalError);
()
}
Pending => (),
}
};
}
Ready(Some(Ok(item)))
} else if let Some(item) = self.ca_conn_event_out_queue.pop_front() {
Ready(Some(Ok(item)))
} else if let Some(item) = self.insert_item_queue.pop_front() {
let ev = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::QueryItem(item),
};
Ready(Some(Ok(ev)))
} else if let Err(e) = self.as_mut().attempt_flush_channel_info_query(cx) {
Ready(Some(Err(e)))
} else if let Ready(Some(Err(e))) = self.as_mut().handle_conn_command(cx) {
Ready(Some(Err(e)))
} else if let Some(item) = {
if self.is_shutdown() {
if self.outgoing_queues_empty() {
debug!("shut down and all items flushed {}", self.remote_addr_dbg);
break Ready(Ok(()));
} else {
// trace!("more items {}", self.insert_item_queue.len());
None
} else {
match self.loop_inner(cx) {
// TODO what does this mean: should we re-loop or yield something?
Ok(Some(Ready(()))) => None,
// This is the last step, so we yield Pending.
// But in general, this does not compose well when we would add another step.
Ok(Some(Pending)) => {
have_pending = true;
None
}
Ok(None) => None,
Err(e) => Some(Err(e)),
}
}
if self.insert_item_queue.len() >= self.opts.insert_queue_max {
break Pending;
}
if !self.is_shutdown() {
if let Some(v) = self.loop_inner(cx) {
break v;
}
} {
Ready(Some(item))
} else {
// Ready(_) => self.stats.conn_stream_ready_inc(),
// Pending => self.stats.conn_stream_pending_inc(),
let _item = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::None,
};
if have_pending {
Pending
} else {
continue;
}
};
match &ret {
Ready(_) => self.stats.conn_stream_ready_inc(),
Pending => self.stats.conn_stream_pending_inc(),
}
if self.is_shutdown() && self.outgoing_queues_empty() {
debug!("end stream {}", self.remote_addr_dbg);
Ready(None)
} else {
match ret {
Ready(Ok(())) => {
let item = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::None,
};
Ready(Some(Ok(item)))
}
Ready(Err(e)) => Ready(Some(Err(e))),
Pending => Pending,
}
}
};
{
let tsnow = Instant::now();
let dt = tsnow.saturating_duration_since(poll_ts1);
if dt > Duration::from_millis(40) {
if poll_ts1 > self.ts_earliest_warn_poll_slow {
// TODO factor out the rate limit logic in reusable type
self.ts_earliest_warn_poll_slow = tsnow + Duration::from_millis(2000);
warn!("slow poll: {}ms", dt.as_secs_f32() * 1e3);
}
}
}
}
}
pub struct PollTimer<INP> {
inp: INP,
}
impl<INP> PollTimer<INP> {
pub fn new(inp: INP) -> Self {
Self { inp }
}
}
impl<INP> Stream for PollTimer<INP>
where
INP: Stream + Unpin,
{
type Item = <INP as Stream>::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let poll_ts1 = Instant::now();
let inp = &mut self.inp;
let ret = inp.poll_next_unpin(cx);
let poll_ts2 = Instant::now();
let dt = poll_ts2.saturating_duration_since(poll_ts1);
if dt > Duration::from_millis(40) {}
ret
}
}
+60 -15
View File
@@ -126,9 +126,15 @@ pub enum CaConnSetEvent {
CaConnEvent((SocketAddr, CaConnEvent)),
}
#[derive(Debug)]
pub enum CaConnSetItem {
Healthy,
}
#[derive(Clone)]
pub struct CaConnSetCtrl {
tx: Sender<CaConnSetEvent>,
pub rx: Receiver<CaConnSetItem>,
}
impl CaConnSetCtrl {
@@ -187,9 +193,11 @@ pub struct CaConnSet {
connset_rx: Receiver<CaConnSetEvent>,
channel_info_query_tx: Sender<ChannelInfoQuery>,
storage_insert_tx: Sender<QueryItem>,
shutdown: bool,
shutdown_stopping: bool,
shutdown_done: bool,
chan_check_next: Option<Channel>,
stats: CaConnSetStats,
connset_out_tx: Sender<CaConnSetItem>,
}
impl CaConnSet {
@@ -200,6 +208,7 @@ impl CaConnSet {
channel_info_query_tx: Sender<ChannelInfoQuery>,
pgconf: Database,
) -> CaConnSetCtrl {
let (connset_out_tx, connset_out_rx) = async_channel::bounded(256);
let (connset_tx, connset_rx) = async_channel::bounded(10000);
let (search_tx, ioc_finder_jh) = super::finder::start_finder(connset_tx.clone(), backend.clone(), pgconf);
let connset = Self {
@@ -212,13 +221,18 @@ impl CaConnSet {
connset_rx,
channel_info_query_tx,
storage_insert_tx,
shutdown: false,
shutdown_stopping: false,
shutdown_done: false,
chan_check_next: None,
stats: CaConnSetStats::new(),
connset_out_tx,
};
// TODO await on jh
let jh = tokio::spawn(CaConnSet::run(connset));
CaConnSetCtrl { tx: connset_tx }
CaConnSetCtrl {
tx: connset_tx,
rx: connset_out_rx,
}
}
async fn run(mut this: CaConnSet) -> Result<(), Error> {
@@ -227,11 +241,11 @@ impl CaConnSet {
match x {
Ok(ev) => this.handle_event(ev).await?,
Err(_) => {
if this.shutdown {
if this.shutdown_done {
// all fine
break Ok(());
} else {
error!("channel closed without shutdown");
error!("channel closed without shutdown_done");
}
}
}
@@ -245,16 +259,9 @@ impl CaConnSet {
ConnSetCmd::ChannelAddWithStatusId(x) => self.handle_add_channel_with_status_id(x).await,
ConnSetCmd::ChannelAddWithAddr(x) => self.handle_add_channel_with_addr(x).await,
ConnSetCmd::IocAddrQueryResult(x) => self.handle_ioc_query_result(x).await,
ConnSetCmd::CheckHealth => {
error!("TODO implement check health");
Ok(())
}
ConnSetCmd::Shutdown => {
debug!("shutdown received");
self.shutdown = true;
Ok(())
}
ConnSetCmd::SeriesLookupResult(x) => self.handle_series_lookup_result(x).await,
ConnSetCmd::CheckHealth => self.handle_check_health().await,
ConnSetCmd::Shutdown => self.handle_shutdown().await,
},
CaConnSetEvent::CaConnEvent((addr, ev)) => match ev.value {
CaConnEventValue::None => Ok(()),
@@ -264,7 +271,7 @@ impl CaConnSet {
self.storage_insert_tx.send(item).await?;
Ok(())
}
CaConnEventValue::EndOfStream => todo!(),
CaConnEventValue::EndOfStream => self.handle_ca_conn_eos(addr).await,
},
}
}
@@ -398,6 +405,44 @@ impl CaConnSet {
Ok(())
}
async fn handle_check_health(&mut self) -> Result<(), Error> {
debug!("TODO handle_check_health");
let item = CaConnSetItem::Healthy;
self.connset_out_tx.send(item).await?;
Ok(())
}
async fn handle_shutdown(&mut self) -> Result<(), Error> {
debug!("TODO handle_shutdown");
debug!("shutdown received");
self.shutdown_stopping = true;
for (addr, res) in self.ca_conn_ress.iter() {
let item = ConnCommand::shutdown();
res.sender.send(item).await?;
}
Ok(())
}
async fn handle_ca_conn_eos(&mut self, addr: SocketAddr) -> Result<(), Error> {
debug!("handle_ca_conn_eos {addr}");
if let Some(e) = self.ca_conn_ress.remove(&addr) {
match e.jh.await {
Ok(Ok(())) => {
debug!("CaConn {addr} finished well");
}
Ok(Err(e)) => {
error!("CaConn {addr} task error: {e}");
}
Err(e) => {
error!("CaConn {addr} join error: {e}");
}
}
} else {
warn!("end-of-stream received for non-existent CaConn {addr}");
}
Ok(())
}
fn create_ca_conn(&self, add: ChannelAddWithAddr) -> Result<CaConnRes, Error> {
// TODO should we save this as event?
let opts = CaConnOpts::default();
+3
View File
@@ -1,4 +1,5 @@
use crate::ca::conn::CaConnEvent;
use crate::ca::connset::CaConnSetItem;
use crate::ca::findioc::FindIocRes;
use async_channel::Sender;
use err::Error;
@@ -28,6 +29,7 @@ pub enum DaemonEvent {
ChannelRemove(Channel),
SearchDone(Result<VecDeque<FindIocRes>, Error>),
CaConnEvent(SocketAddrV4, CaConnEvent),
CaConnSetItem(CaConnSetItem),
Shutdown,
}
@@ -49,6 +51,7 @@ impl DaemonEvent {
EndOfStream => format!("CaConnEvent/EndOfStream"),
}
}
CaConnSetItem(_) => format!("CaConnSetItem"),
Shutdown => format!("Shutdown"),
}
}