Clean up shutdown

This commit is contained in:
Dominik Werder
2023-09-13 07:28:34 +02:00
parent 6407af9574
commit 869ed8e5dd
7 changed files with 501 additions and 199 deletions

View File

@@ -295,7 +295,7 @@ impl Daemon {
}
async fn handle_channel_add(&mut self, ch: Channel) -> Result<(), Error> {
debug!("handle_channel_add {ch:?}");
// debug!("handle_channel_add {ch:?}");
self.connset_ctrl
.add_channel(
self.opts.backend.clone(),
@@ -380,12 +380,15 @@ impl Daemon {
}
}
}
Error(e) => {
error!("error from CaConnSet: {e}");
self.handle_shutdown().await?;
}
}
Ok(())
}
async fn handle_shutdown(&mut self) -> Result<(), Error> {
error!("TODO handle_shutdown");
if self.shutting_down {
warn!("already shutting down");
} else {
@@ -398,6 +401,7 @@ impl Daemon {
// await the connection sets.
// await other workers that we've spawned.
self.connset_ctrl.shutdown().await?;
self.rx.close();
}
Ok(())
}
@@ -601,9 +605,12 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
let mut i = 0;
for s in &channels {
let ch = Channel::new(s.into());
tx.send(DaemonEvent::ChannelAdd(ch)).await?;
match tx.send(DaemonEvent::ChannelAdd(ch)).await {
Ok(()) => {}
Err(_) => break,
}
i += 1;
if i % 1000 == 0 {
if i % 100 == 0 {
debug!("sent {} ChannelAdd", i);
}
}

View File

@@ -321,12 +321,15 @@ impl Worker {
};
trace3!("try to send result for {:?}", item);
let fut = r.tx.make_send(Ok(item));
match fut.await {
Ok(()) => {}
Err(_e) => {
match tokio::time::timeout(Duration::from_millis(2000), fut).await {
Ok(Ok(())) => {}
Ok(Err(_e)) => {
warn!("can not deliver result");
return Err(Error::ChannelError);
}
Err(_) => {
debug!("timeout can not deliver result");
}
}
}
}

View File

@@ -3,6 +3,7 @@ use super::ExtraInsertsConf;
use crate::senderpolling::SenderPolling;
use crate::timebin::ConnTimeBin;
use async_channel::Sender;
use core::fmt;
use dbpg::seriesbychannel::CanSendChannelInfoResult;
use dbpg::seriesbychannel::ChannelInfoQuery;
use dbpg::seriesbychannel::ChannelInfoResult;
@@ -272,6 +273,21 @@ enum CaConnState {
EndOfStream,
}
// Manual Debug impl: some variants carry payloads that are deliberately
// omitted from the output (presumably non-Debug futures — TODO confirm),
// so derive(Debug) is not used here.
impl fmt::Debug for CaConnState {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Unconnected => write!(fmt, "Unconnected"),
// Second field is hidden; NOTE(review): looks like an in-flight connect future — confirm.
Self::Connecting(arg0, _) => fmt.debug_tuple("Connecting").field(arg0).finish(),
Self::Init => write!(fmt, "Init"),
Self::Listen => write!(fmt, "Listen"),
Self::PeerReady => write!(fmt, "PeerReady"),
// Payload hidden; an empty debug_tuple prints just `Wait` (no parentheses).
Self::Wait(_) => fmt.debug_tuple("Wait").finish(),
Self::Shutdown => write!(fmt, "Shutdown"),
Self::EndOfStream => write!(fmt, "EndOfStream"),
}
}
}
fn wait_fut(dt: u64) -> Pin<Box<dyn Future<Output = ()> + Send>> {
let fut = tokio::time::sleep(Duration::from_millis(dt));
Box::pin(fut)
@@ -672,37 +688,41 @@ impl CaConn {
use Poll::*;
loop {
self.stats.caconn_loop3_count.inc();
break match self.conn_command_rx.poll_next_unpin(cx) {
Ready(Some(a)) => {
trace3!("handle_conn_command received a command {}", self.remote_addr_dbg);
match a.kind {
ConnCommandKind::ChannelAdd(name, cssid) => {
self.cmd_channel_add(name, cssid);
Ready(Some(Ok(())))
break if self.is_shutdown() {
Ready(None)
} else {
match self.conn_command_rx.poll_next_unpin(cx) {
Ready(Some(a)) => {
trace3!("handle_conn_command received a command {}", self.remote_addr_dbg);
match a.kind {
ConnCommandKind::ChannelAdd(name, cssid) => {
self.cmd_channel_add(name, cssid);
Ready(Some(Ok(())))
}
ConnCommandKind::ChannelRemove(name) => {
self.cmd_channel_remove(name);
Ready(Some(Ok(())))
}
ConnCommandKind::CheckHealth => {
self.cmd_check_health();
Ready(Some(Ok(())))
}
ConnCommandKind::Shutdown => {
self.cmd_shutdown();
Ready(Some(Ok(())))
}
ConnCommandKind::SeriesLookupResult(x) => match self.handle_series_lookup_result(x) {
Ok(()) => Ready(Some(Ok(()))),
Err(e) => Ready(Some(Err(e))),
},
}
ConnCommandKind::ChannelRemove(name) => {
self.cmd_channel_remove(name);
Ready(Some(Ok(())))
}
ConnCommandKind::CheckHealth => {
self.cmd_check_health();
Ready(Some(Ok(())))
}
ConnCommandKind::Shutdown => {
self.cmd_shutdown();
Ready(Some(Ok(())))
}
ConnCommandKind::SeriesLookupResult(x) => match self.handle_series_lookup_result(x) {
Ok(()) => Ready(Some(Ok(()))),
Err(e) => Ready(Some(Err(e))),
},
}
Ready(None) => {
error!("Command queue closed");
Ready(None)
}
Pending => Pending,
}
Ready(None) => {
error!("Command queue closed");
Ready(None)
}
Pending => Pending,
};
}
}
@@ -1688,6 +1708,7 @@ impl CaConn {
}
fn handle_own_ticker_tick(self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> {
debug!("tick CaConn {}", self.remote_addr_dbg);
let this = self.get_mut();
if false {
for (_, tb) in this.time_binners.iter_mut() {
@@ -1699,26 +1720,32 @@ impl CaConn {
}
fn queues_async_out_flushed(&self) -> bool {
self.channel_info_query_queue.is_empty() && self.channel_info_query_sending.is_idle()
// self.channel_info_query_queue.is_empty() && self.channel_info_query_sending.is_idle()
// TODO re-enable later
true
}
fn attempt_flush_channel_info_query(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> {
use Poll::*;
loop {
let sd = &mut self.channel_info_query_sending;
break if sd.is_sending() {
match sd.poll_unpin(cx) {
Ready(Ok(())) => continue,
Ready(Err(_)) => Err(Error::with_msg_no_trace("can not send into channel")),
Pending => Ok(()),
}
} else if let Some(item) = self.channel_info_query_queue.pop_front() {
trace3!("send series query {item:?}");
let sd = &mut self.channel_info_query_sending;
sd.send(item);
continue;
} else {
break if self.is_shutdown() {
Ok(())
} else {
let sd = &mut self.channel_info_query_sending;
if sd.is_sending() {
match sd.poll_unpin(cx) {
Ready(Ok(())) => continue,
Ready(Err(_)) => Err(Error::with_msg_no_trace("can not send into channel")),
Pending => Ok(()),
}
} else if let Some(item) = self.channel_info_query_queue.pop_front() {
trace3!("send series query {item:?}");
let sd = &mut self.channel_info_query_sending;
sd.send(item);
continue;
} else {
Ok(())
}
};
}
}
@@ -1764,10 +1791,15 @@ impl Stream for CaConn {
ts: Instant::now(),
value: CaConnEventValue::None,
};
if self.is_shutdown() && self.queues_async_out_flushed() {
debug!("end of stream {}", self.remote_addr_dbg);
self.state = CaConnState::EndOfStream;
Ready(None)
if self.is_shutdown() {
if self.queues_async_out_flushed() == false {
debug!("shutdown, but async queues not flushed");
continue;
} else {
debug!("end of stream {}", self.remote_addr_dbg);
self.state = CaConnState::EndOfStream;
Ready(None)
}
} else {
continue;
}

View File

@@ -15,6 +15,7 @@ use crate::daemon_common::Channel;
use crate::errconv::ErrConv;
use crate::rt::JoinHandle;
use crate::rt::TokMx;
use crate::senderpolling::SenderPolling;
use async_channel::Receiver;
use async_channel::Sender;
use atomic::AtomicUsize;
@@ -24,6 +25,7 @@ use dbpg::seriesbychannel::ChannelInfoQuery;
use dbpg::seriesbychannel::ChannelInfoResult;
use err::Error;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use log::*;
use netpod::Database;
@@ -46,8 +48,11 @@ use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::sync::atomic;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
@@ -144,12 +149,8 @@ pub struct ChannelRemove {
#[derive(Debug)]
pub enum ConnSetCmd {
SeriesLookupResult(Result<ChannelInfoResult, dbpg::seriesbychannel::Error>),
ChannelAdd(ChannelAdd),
ChannelAddWithStatusId(ChannelAddWithStatusId),
ChannelAddWithAddr(ChannelAddWithAddr),
ChannelRemove(ChannelRemove),
IocAddrQueryResult(VecDeque<FindIocRes>),
CheckHealth(Instant),
Shutdown,
}
@@ -157,11 +158,11 @@ pub enum ConnSetCmd {
#[derive(Debug)]
pub enum CaConnSetEvent {
ConnSetCmd(ConnSetCmd),
CaConnEvent((SocketAddr, CaConnEvent)),
}
#[derive(Debug, Clone)]
pub enum CaConnSetItem {
Error(Error),
Healthy(Instant, Instant),
}
@@ -218,14 +219,14 @@ pub struct IocAddrQuery {
}
struct SeriesLookupSender {
tx: Sender<CaConnSetEvent>,
tx: Sender<Result<ChannelInfoResult, Error>>,
}
impl CanSendChannelInfoResult for SeriesLookupSender {
fn make_send(&self, item: Result<ChannelInfoResult, dbpg::seriesbychannel::Error>) -> BoxedSend {
let tx = self.tx.clone();
let fut = async move {
tx.send(CaConnSetEvent::ConnSetCmd(ConnSetCmd::SeriesLookupResult(item)))
tx.send(item.map_err(|e| Error::with_msg_no_trace(e.to_string())))
.await
.map_err(|_| ())
};
@@ -236,20 +237,29 @@ impl CanSendChannelInfoResult for SeriesLookupSender {
pub struct CaConnSet {
backend: String,
local_epics_hostname: String,
search_tx: Sender<IocAddrQuery>,
ca_conn_ress: BTreeMap<SocketAddr, CaConnRes>,
channel_states: ChannelStateMap,
connset_tx: Sender<CaConnSetEvent>,
// connset_rx: Receiver<CaConnSetEvent>,
connset_rx: crate::ca::connset_input_merge::InputMerge,
connset_inp_rx: Receiver<CaConnSetEvent>,
channel_info_query_queue: VecDeque<ChannelInfoQuery>,
channel_info_query_sender: SenderPolling<ChannelInfoQuery>,
channel_info_query_tx: Sender<ChannelInfoQuery>,
storage_insert_tx: Sender<QueryItem>,
channel_info_res_tx: Sender<Result<ChannelInfoResult, Error>>,
channel_info_res_rx: Receiver<Result<ChannelInfoResult, Error>>,
find_ioc_query_queue: VecDeque<IocAddrQuery>,
find_ioc_query_sender: SenderPolling<IocAddrQuery>,
find_ioc_res_rx: Receiver<VecDeque<FindIocRes>>,
storage_insert_queue: VecDeque<QueryItem>,
storage_insert_sender: SenderPolling<QueryItem>,
ca_conn_res_tx: Sender<(SocketAddr, CaConnEvent)>,
ca_conn_res_rx: Receiver<(SocketAddr, CaConnEvent)>,
connset_out_queue: VecDeque<CaConnSetItem>,
connset_out_tx: Sender<CaConnSetItem>,
shutdown_stopping: bool,
shutdown_done: bool,
chan_check_next: Option<Channel>,
stats: CaConnSetStats,
connset_out_tx: Sender<CaConnSetItem>,
ioc_finder_jh: JoinHandle<Result<(), Error>>,
await_ca_conn_jhs: VecDeque<(SocketAddr, JoinHandle<Result<(), Error>>)>,
}
impl CaConnSet {
@@ -260,28 +270,40 @@ impl CaConnSet {
channel_info_query_tx: Sender<ChannelInfoQuery>,
pgconf: Database,
) -> CaConnSetCtrl {
let (connset_inp_tx, connset_inp_rx) = async_channel::bounded(256);
let (connset_out_tx, connset_out_rx) = async_channel::bounded(256);
let (find_ioc_res_tx, find_ioc_res_rx) = async_channel::bounded(10000);
let (search_tx, ioc_finder_jh) = super::finder::start_finder(find_ioc_res_tx.clone(), backend.clone(), pgconf);
let input_merge = InputMerge::new(todo!(), find_ioc_res_rx);
let (ca_conn_res_tx, ca_conn_res_rx) = async_channel::bounded(5000);
let (connset_inp_tx, connset_inp_rx) = async_channel::bounded(5000);
let (connset_out_tx, connset_out_rx) = async_channel::bounded(5000);
let (find_ioc_res_tx, find_ioc_res_rx) = async_channel::bounded(5000);
let (find_ioc_query_tx, ioc_finder_jh) =
super::finder::start_finder(find_ioc_res_tx.clone(), backend.clone(), pgconf);
let (channel_info_res_tx, channel_info_res_rx) = async_channel::bounded(5000);
let connset = Self {
backend,
local_epics_hostname,
search_tx,
ca_conn_ress: BTreeMap::new(),
channel_states: ChannelStateMap::new(),
connset_tx: connset_inp_tx,
// connset_rx: find_ioc_res_rx,
connset_rx: todo!(),
connset_inp_rx,
channel_info_query_queue: VecDeque::new(),
channel_info_query_sender: SenderPolling::new(channel_info_query_tx.clone()),
channel_info_query_tx,
storage_insert_tx,
channel_info_res_tx,
channel_info_res_rx,
find_ioc_query_queue: VecDeque::new(),
find_ioc_query_sender: SenderPolling::new(find_ioc_query_tx),
find_ioc_res_rx,
storage_insert_queue: VecDeque::new(),
storage_insert_sender: SenderPolling::new(storage_insert_tx),
ca_conn_res_tx,
ca_conn_res_rx,
shutdown_stopping: false,
shutdown_done: false,
chan_check_next: None,
stats: CaConnSetStats::new(),
connset_out_tx,
connset_out_queue: VecDeque::new(),
// connset_out_sender: SenderPolling::new(connset_out_tx),
ioc_finder_jh,
await_ca_conn_jhs: VecDeque::new(),
};
// TODO await on jh
let jh = tokio::spawn(CaConnSet::run(connset));
@@ -294,66 +316,60 @@ impl CaConnSet {
async fn run(mut this: CaConnSet) -> Result<(), Error> {
loop {
let x = this.connset_rx.next().await;
let x = this.next().await;
match x {
Some(ev) => this.handle_event(ev).await?,
None => {
if this.shutdown_stopping {
// all fine
break;
} else {
error!("channel closed without shutdown_stopping");
}
}
}
if this.shutdown_stopping {
break;
Some(x) => this.connset_out_tx.send(x).await?,
None => break,
}
}
debug!(
"search_tx sender {} receiver {}",
this.search_tx.sender_count(),
this.search_tx.receiver_count()
);
// debug!(
// "search_tx sender {} receiver {}",
// this.find_ioc_query_tx.sender_count(),
// this.find_ioc_query_tx.receiver_count()
// );
this.ioc_finder_jh
.await
.map_err(|e| Error::with_msg_no_trace(e.to_string()))??;
debug!("joined ioc_finder_jh");
this.connset_out_tx.close();
this.connset_rx.close();
this.connset_inp_rx.close();
this.shutdown_done = true;
Ok(())
}
async fn handle_event(&mut self, ev: CaConnSetEvent) -> Result<(), Error> {
fn handle_event(&mut self, ev: CaConnSetEvent) -> Result<(), Error> {
// trace!("handle_event {ev:?}");
match ev {
CaConnSetEvent::ConnSetCmd(cmd) => match cmd {
ConnSetCmd::ChannelAdd(x) => self.handle_add_channel(x).await,
ConnSetCmd::ChannelAddWithStatusId(x) => self.handle_add_channel_with_status_id(x).await,
ConnSetCmd::ChannelAddWithAddr(x) => self.handle_add_channel_with_addr(x).await,
ConnSetCmd::ChannelRemove(x) => self.handle_remove_channel(x).await,
ConnSetCmd::IocAddrQueryResult(x) => self.handle_ioc_query_result(x).await,
ConnSetCmd::SeriesLookupResult(x) => self.handle_series_lookup_result(x).await,
ConnSetCmd::CheckHealth(ts1) => self.handle_check_health(ts1).await,
ConnSetCmd::Shutdown => self.handle_shutdown().await,
},
CaConnSetEvent::CaConnEvent((addr, ev)) => match ev.value {
CaConnEventValue::None => Ok(()),
CaConnEventValue::EchoTimeout => todo!(),
CaConnEventValue::ConnCommandResult(_) => todo!(),
CaConnEventValue::QueryItem(item) => {
self.storage_insert_tx.send(item).await?;
Ok(())
}
CaConnEventValue::EndOfStream => self.handle_ca_conn_eos(addr).await,
ConnSetCmd::ChannelAdd(x) => self.handle_add_channel(x),
// ConnSetCmd::ChannelAddWithStatusId(x) => self.handle_add_channel_with_status_id(x),
// ConnSetCmd::ChannelAddWithAddr(x) => self.handle_add_channel_with_addr(x),
ConnSetCmd::ChannelRemove(x) => self.handle_remove_channel(x),
// ConnSetCmd::IocAddrQueryResult(x) => self.handle_ioc_query_result(x).await,
// ConnSetCmd::SeriesLookupResult(x) => self.handle_series_lookup_result(x).await,
ConnSetCmd::CheckHealth(ts1) => self.handle_check_health(ts1),
ConnSetCmd::Shutdown => self.handle_shutdown(),
},
}
}
async fn handle_series_lookup_result(
&mut self,
res: Result<ChannelInfoResult, dbpg::seriesbychannel::Error>,
) -> Result<(), Error> {
// Dispatch one event received from the CaConn task at `addr`.
// Query items are buffered locally; the poll loop drains the queue into
// the storage sender. End-of-stream hands off to `handle_ca_conn_eos`.
fn handle_ca_conn_event(&mut self, addr: SocketAddr, ev: CaConnEvent) -> Result<(), Error> {
match ev.value {
CaConnEventValue::QueryItem(item) => {
// Buffer for later delivery; avoids awaiting inside the handler.
self.storage_insert_queue.push_back(item);
Ok(())
}
CaConnEventValue::EndOfStream => self.handle_ca_conn_eos(addr),
CaConnEventValue::None => Ok(()),
// Not yet implemented event kinds; reaching these is a bug.
CaConnEventValue::EchoTimeout => todo!(),
CaConnEventValue::ConnCommandResult(_) => todo!(),
}
}
fn handle_series_lookup_result(&mut self, res: Result<ChannelInfoResult, Error>) -> Result<(), Error> {
if self.shutdown_stopping {
return Ok(());
}
trace3!("handle_series_lookup_result {res:?}");
match res {
Ok(res) => {
@@ -363,9 +379,7 @@ impl CaConnSet {
local_epics_hostname: self.local_epics_hostname.clone(),
cssid: ChannelStatusSeriesId::new(res.series.into_inner().id()),
};
self.connset_tx
.send(CaConnSetEvent::ConnSetCmd(ConnSetCmd::ChannelAddWithStatusId(add)))
.await?;
self.handle_add_channel_with_status_id(add)?;
}
Err(e) => {
warn!("TODO handle error {e}");
@@ -374,9 +388,10 @@ impl CaConnSet {
Ok(())
}
async fn handle_add_channel(&mut self, add: ChannelAdd) -> Result<(), Error> {
fn handle_add_channel(&mut self, add: ChannelAdd) -> Result<(), Error> {
trace3!("handle_add_channel {}", add.name);
if self.shutdown_stopping {
debug!("handle_add_channel but shutdown_stopping");
trace3!("handle_add_channel but shutdown_stopping");
return Ok(());
}
// TODO should I add the transition through ActiveChannelState::Init as well?
@@ -392,19 +407,19 @@ impl CaConnSet {
scalar_type: CHANNEL_STATUS_DUMMY_SCALAR_TYPE,
shape_dims: Vec::new(),
tx: Box::pin(SeriesLookupSender {
tx: self.connset_tx.clone(),
tx: self.channel_info_res_tx.clone(),
}),
};
self.channel_info_query_tx.send(item).await?;
self.channel_info_query_queue.push_back(item);
Ok(())
}
async fn handle_add_channel_with_status_id(&mut self, add: ChannelAddWithStatusId) -> Result<(), Error> {
fn handle_add_channel_with_status_id(&mut self, add: ChannelAddWithStatusId) -> Result<(), Error> {
trace3!("handle_add_channel_with_status_id {}", add.name);
if self.shutdown_stopping {
debug!("handle_add_channel but shutdown_stopping");
return Ok(());
}
trace3!("handle_add_channel_with_status_id {add:?}");
let ch = Channel::new(add.name.clone());
if let Some(chst) = self.channel_states.inner().get_mut(&ch) {
if let ChannelStateValue::Active(chst2) = &mut chst.value {
@@ -418,7 +433,7 @@ impl CaConnSet {
},
};
let qu = IocAddrQuery { name: add.name };
self.search_tx.send(qu).await?;
self.find_ioc_query_queue.push_back(qu);
} else {
warn!("TODO have a status series id but no more channel");
}
@@ -431,9 +446,9 @@ impl CaConnSet {
Ok(())
}
async fn handle_add_channel_with_addr(&mut self, add: ChannelAddWithAddr) -> Result<(), Error> {
fn handle_add_channel_with_addr(&mut self, add: ChannelAddWithAddr) -> Result<(), Error> {
if self.shutdown_stopping {
debug!("handle_add_channel but shutdown_stopping");
trace3!("handle_add_channel but shutdown_stopping");
return Ok(());
}
if !self.ca_conn_ress.contains_key(&add.addr) {
@@ -442,11 +457,16 @@ impl CaConnSet {
}
let conn_ress = self.ca_conn_ress.get_mut(&add.addr).unwrap();
let cmd = ConnCommand::channel_add(add.name, add.cssid);
conn_ress.sender.send(cmd).await?;
// TODO not the nicest
let tx = conn_ress.sender.clone();
tokio::spawn(async move { tx.send(cmd).await });
Ok(())
}
async fn handle_remove_channel(&mut self, add: ChannelRemove) -> Result<(), Error> {
fn handle_remove_channel(&mut self, add: ChannelRemove) -> Result<(), Error> {
if self.shutdown_stopping {
return Ok(());
}
let ch = Channel::new(add.name);
if let Some(k) = self.channel_states.inner().get_mut(&ch) {
match &k.value {
@@ -483,7 +503,11 @@ impl CaConnSet {
Ok(())
}
async fn handle_ioc_query_result(&mut self, res: VecDeque<FindIocRes>) -> Result<(), Error> {
fn handle_ioc_query_result(&mut self, res: VecDeque<FindIocRes>) -> Result<(), Error> {
if self.shutdown_stopping {
return Ok(());
}
trace3!("handle_ioc_query_result");
for e in res {
let ch = Channel::new(e.channel.clone());
if let Some(chst) = self.channel_states.inner().get_mut(&ch) {
@@ -502,16 +526,15 @@ impl CaConnSet {
cssid: status_series_id.clone(),
local_epics_hostname: self.local_epics_hostname.clone(),
};
self.connset_tx
.send(CaConnSetEvent::ConnSetCmd(ConnSetCmd::ChannelAddWithAddr(add)))
.await?;
let since = SystemTime::now();
state.inner = WithStatusSeriesIdStateInner::WithAddress {
addr,
state: WithAddressState::Unassigned { since },
}
};
// TODO move state change also in there?
self.handle_add_channel_with_addr(add)?;
} else {
debug!("ioc not found {e:?}");
trace3!("ioc not found {e:?}");
let since = SystemTime::now();
state.inner = WithStatusSeriesIdStateInner::UnknownAddress { since };
}
@@ -528,50 +551,56 @@ impl CaConnSet {
Ok(())
}
async fn handle_check_health(&mut self, ts1: Instant) -> Result<(), Error> {
fn handle_check_health(&mut self, ts1: Instant) -> Result<(), Error> {
if self.shutdown_stopping {
return Ok(());
}
debug!("TODO handle_check_health");
let ts2 = Instant::now();
let item = CaConnSetItem::Healthy(ts1, ts2);
self.connset_out_tx.send(item).await?;
self.connset_out_queue.push_back(item);
Ok(())
}
async fn handle_shutdown(&mut self) -> Result<(), Error> {
debug!("TODO handle_shutdown");
debug!("shutdown received");
fn handle_shutdown(&mut self) -> Result<(), Error> {
if self.shutdown_stopping {
return Ok(());
}
debug!("handle_shutdown");
self.shutdown_stopping = true;
self.search_tx.close();
self.channel_info_query_sender.drop();
self.find_ioc_query_sender.drop();
for (addr, res) in self.ca_conn_ress.iter() {
let item = ConnCommand::shutdown();
res.sender.send(item).await?;
// TODO not the nicest
let tx = res.sender.clone();
tokio::spawn(async move { tx.send(item).await });
}
Ok(())
}
async fn handle_ca_conn_eos(&mut self, addr: SocketAddr) -> Result<(), Error> {
fn handle_ca_conn_eos(&mut self, addr: SocketAddr) -> Result<(), Error> {
debug!("handle_ca_conn_eos {addr}");
if let Some(e) = self.ca_conn_ress.remove(&addr) {
match e.jh.await {
Ok(Ok(())) => {
self.stats.ca_conn_task_join_done_ok.inc();
debug!("CaConn {addr} finished well");
}
Ok(Err(e)) => {
self.stats.ca_conn_task_join_done_err.inc();
error!("CaConn {addr} task error: {e}");
}
Err(e) => {
self.stats.ca_conn_task_join_err.inc();
error!("CaConn {addr} join error: {e}");
}
}
self.await_ca_conn_jhs.push_back((addr, e.jh));
} else {
self.stats.ca_conn_task_eos_non_exist.inc();
warn!("end-of-stream received for non-existent CaConn {addr}");
}
debug!("still CaConn left {}", self.ca_conn_ress.len());
Ok(())
}
// True when this set can end its stream: no live CaConn entries remain
// and at most one join handle is still waiting to be reaped.
// NOTE(review): tolerating one outstanding join handle (`len() == 1`)
// looks intentional but is worth confirming; requiring both collections
// empty would be the stricter condition.
fn ready_for_end_of_stream(&self) -> bool {
self.ca_conn_ress.is_empty() && self.await_ca_conn_jhs.len() <= 1
}
fn create_ca_conn(&self, add: ChannelAddWithAddr) -> Result<CaConnRes, Error> {
// TODO should we save this as event?
let opts = CaConnOpts::default();
@@ -591,8 +620,8 @@ impl CaConnSet {
);
let conn_tx = conn.conn_command_tx();
let conn_stats = conn.stats();
let conn_item_tx = self.connset_tx.clone();
let jh = tokio::spawn(Self::ca_conn_item_merge(conn, conn_item_tx, addr_v4));
let ca_conn_res_tx = self.ca_conn_res_tx.clone();
let jh = tokio::spawn(Self::ca_conn_item_merge(conn, ca_conn_res_tx, addr));
let ca_conn_res = CaConnRes {
state: CaConnState::new(CaConnStateValue::Fresh),
sender: conn_tx,
@@ -604,8 +633,8 @@ impl CaConnSet {
async fn ca_conn_item_merge(
conn: CaConn,
conn_item_tx: Sender<CaConnSetEvent>,
addr: SocketAddrV4,
tx: Sender<(SocketAddr, CaConnEvent)>,
addr: SocketAddr,
) -> Result<(), Error> {
debug!("ca_conn_consumer begin {}", addr);
let stats = conn.stats();
@@ -615,9 +644,7 @@ impl CaConnSet {
match item {
Ok(item) => {
stats.conn_item_count.inc();
conn_item_tx
.send(CaConnSetEvent::CaConnEvent((SocketAddr::V4(addr), item)))
.await?;
tx.send((addr, item)).await?;
}
Err(e) => {
error!("CaConn gives error: {e:?}");
@@ -626,15 +653,14 @@ impl CaConnSet {
}
}
debug!("ca_conn_consumer ended {}", addr);
conn_item_tx
.send(CaConnSetEvent::CaConnEvent((
SocketAddr::V4(addr),
CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::EndOfStream,
},
)))
.await?;
tx.send((
addr,
CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::EndOfStream,
},
))
.await?;
debug!("ca_conn_consumer signaled {}", addr);
ret
}
@@ -694,7 +720,7 @@ impl CaConnSet {
rxs
}
pub async fn wait_stopped(&self) -> Result<(), Error> {
async fn wait_stopped(&self) -> Result<(), Error> {
warn!("Lock for wait_stopped");
// let mut g = self.ca_conn_ress.lock().await;
// let mm = std::mem::replace(&mut *g, BTreeMap::new());
@@ -948,3 +974,180 @@ impl CaConnSet {
(search_pending,)
}
}
// Hand-rolled Stream impl: a single poll loop drives all of CaConnSet's
// internal queues, senders, and receivers, yielding CaConnSetItem values
// (Healthy / Error) to the owner. Returning Ready(None) ends the stream.
impl Stream for CaConnSet {
type Item = CaConnSetItem;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
// NOTE(review): debug! on every poll can be very noisy under load.
debug!("CaConnSet::poll");
loop {
// Whether at least one sub-poll returned Pending during this pass;
// chooses between Pending and the terminal branches at the bottom.
let mut have_pending = false;
// Deliver any already-produced output item first.
if let Some(item) = self.connset_out_queue.pop_front() {
break Ready(Some(item));
}
// Reap at most one finished CaConn task per pass (front of the queue).
if let Some((addr, jh)) = self.await_ca_conn_jhs.front_mut() {
match jh.poll_unpin(cx) {
Ready(x) => {
let addr = *addr;
self.await_ca_conn_jhs.pop_front();
debug!("await_ca_conn_jhs still jhs left {}", self.await_ca_conn_jhs.len());
// Outcomes: task ok, task returned Err, or join error (panic/abort).
match x {
Ok(Ok(())) => {
self.stats.ca_conn_task_join_done_ok.inc();
debug!("CaConn {addr} finished well");
}
Ok(Err(e)) => {
self.stats.ca_conn_task_join_done_err.inc();
error!("CaConn {addr} task error: {e}");
}
Err(e) => {
self.stats.ca_conn_task_join_err.inc();
error!("CaConn {addr} join error: {e}");
}
}
}
// NOTE(review): this Pending arm does not set have_pending. The join
// handle's registered waker still guarantees a wakeup, but the
// bookkeeping is inconsistent with the other Pending arms — confirm.
Pending => {}
}
}
// Storage inserts: move one queued item into the sender, then drive it.
if self.storage_insert_sender.is_idle() {
if let Some(item) = self.storage_insert_queue.pop_front() {
self.storage_insert_sender.send(item);
}
}
if self.storage_insert_sender.is_sending() {
match self.storage_insert_sender.poll_unpin(cx) {
Ready(Ok(())) => continue,
// NOTE(review): the bound `e` is immediately shadowed by the
// synthetic error below; the original send error is discarded.
Ready(Err(e)) => {
let e = Error::with_msg_no_trace("can not send into channel");
error!("{e}");
break Ready(Some(CaConnSetItem::Error(e)));
}
Pending => {
have_pending = true;
}
}
}
// IOC address queries: same queue-then-drive pattern as above.
if self.find_ioc_query_sender.is_idle() {
if let Some(item) = self.find_ioc_query_queue.pop_front() {
self.find_ioc_query_sender.send(item);
}
}
if self.find_ioc_query_sender.is_sending() {
match self.find_ioc_query_sender.poll_unpin(cx) {
Ready(Ok(())) => continue,
Ready(Err(e)) => {
let e = Error::with_msg_no_trace("can not send into channel");
error!("{e}");
break Ready(Some(CaConnSetItem::Error(e)));
}
Pending => {
have_pending = true;
}
}
}
// Channel-info (series) queries: same queue-then-drive pattern.
if self.channel_info_query_sender.is_idle() {
if let Some(item) = self.channel_info_query_queue.pop_front() {
self.channel_info_query_sender.send(item);
}
}
if self.channel_info_query_sender.is_sending() {
match self.channel_info_query_sender.poll_unpin(cx) {
Ready(Ok(())) => continue,
Ready(Err(e)) => {
let e = Error::with_msg_no_trace("can not send into channel");
error!("{e}");
break Ready(Some(CaConnSetItem::Error(e)));
}
Pending => {
have_pending = true;
}
}
}
// Inbound: IOC search results.
let item = match self.find_ioc_res_rx.poll_next_unpin(cx) {
Ready(Some(x)) => match self.handle_ioc_query_result(x) {
Ok(()) => continue,
Err(e) => Ready(Some(CaConnSetItem::Error(e))),
},
// NOTE(review): Ready(None) computed here is discarded by the match
// below (only Ready(Some) breaks), so a closed receiver is silently
// ignored and neither ends the stream nor sets have_pending — confirm
// this is intended for all four receivers.
Ready(None) => Ready(None),
Pending => {
have_pending = true;
Pending
}
};
match item {
Ready(Some(x)) => break Ready(Some(x)),
_ => {}
}
// Inbound: events from individual CaConn tasks.
let item = match self.ca_conn_res_rx.poll_next_unpin(cx) {
Ready(Some((addr, ev))) => match self.handle_ca_conn_event(addr, ev) {
Ok(()) => continue,
Err(e) => Ready(Some(CaConnSetItem::Error(e))),
},
Ready(None) => Ready(None),
Pending => {
have_pending = true;
Pending
}
};
match item {
Ready(Some(x)) => break Ready(Some(x)),
_ => {}
}
// Inbound: series lookup results.
let item = match self.channel_info_res_rx.poll_next_unpin(cx) {
Ready(Some(x)) => match self.handle_series_lookup_result(x) {
Ok(()) => continue,
Err(e) => Ready(Some(CaConnSetItem::Error(e))),
},
Ready(None) => Ready(None),
Pending => {
have_pending = true;
Pending
}
};
match item {
Ready(Some(x)) => break Ready(Some(x)),
_ => {}
}
// Inbound: external commands addressed to the connection set.
let item = match self.connset_inp_rx.poll_next_unpin(cx) {
Ready(Some(x)) => match self.handle_event(x) {
Ok(()) => continue,
Err(e) => Ready(Some(CaConnSetItem::Error(e))),
},
Ready(None) => Ready(None),
Pending => {
have_pending = true;
Pending
}
};
match item {
Ready(Some(x)) => break Ready(Some(x)),
_ => {}
}
// No sub-poll made progress this pass: decide how to leave the loop.
break if have_pending {
// NOTE(review): ending the stream while some sub-future is still
// Pending may abandon in-flight work — confirm shutdown ordering.
if self.shutdown_stopping && self.ready_for_end_of_stream() {
Ready(None)
} else {
Pending
}
} else if self.shutdown_stopping && self.ready_for_end_of_stream() {
debug!("nothing to do but shutdown");
Ready(None)
} else {
// Nothing Pending and not shutting down: no waker is registered, so
// this task would never be polled again — surface it as an error
// instead of hanging silently.
let e = Error::with_msg_no_trace("connset not pending and not shutdown");
error!("{e}");
Ready(Some(CaConnSetItem::Error(e)))
};
}
}
}

View File

@@ -2,6 +2,8 @@ use super::connset::CaConnSetEvent;
use super::findioc::FindIocRes;
use crate::ca::connset::ConnSetCmd;
use async_channel::Receiver;
use dbpg::seriesbychannel::ChannelInfoResult;
use err::Error;
use futures_util::StreamExt;
use std::collections::VecDeque;
use std::pin::Pin;
@@ -11,13 +13,19 @@ use std::task::Poll;
// Merges three input channels into one event stream for CaConnSet.
// Each receiver is wrapped in Option so it can be dropped (set to None)
// once it reports end-of-stream, leaving the others active.
pub struct InputMerge {
// Commands/events addressed to the connection set.
inp1: Option<Receiver<CaConnSetEvent>>,
// Batches of IOC address search results.
inp2: Option<Receiver<VecDeque<FindIocRes>>>,
// Results of channel-info (series) lookups.
inp3: Option<Receiver<Result<ChannelInfoResult, Error>>>,
}
impl InputMerge {
pub fn new(inp1: Receiver<CaConnSetEvent>, inp2: Receiver<VecDeque<FindIocRes>>) -> Self {
pub fn new(
inp1: Receiver<CaConnSetEvent>,
inp2: Receiver<VecDeque<FindIocRes>>,
inp3: Receiver<Result<ChannelInfoResult, Error>>,
) -> Self {
Self {
inp1: Some(inp1),
inp2: Some(inp2),
inp3: Some(inp3),
}
}
@@ -33,18 +41,35 @@ impl futures_util::Stream for InputMerge {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let mut poll_next = false;
let ret = if let Some(inp) = &mut self.inp2 {
match inp.poll_next_unpin(cx) {
Ready(Some(x)) => Some(CaConnSetEvent::ConnSetCmd(ConnSetCmd::IocAddrQueryResult(x))),
Ready(None) => {
self.inp2 = None;
None
let ret = {
if let Some(inp) = &mut self.inp3 {
match inp.poll_next_unpin(cx) {
Ready(Some(x)) => Some(CaConnSetEvent::ConnSetCmd(todo!())),
Ready(None) => {
self.inp2 = None;
None
}
Pending => None,
}
Pending => None,
} else {
None
}
};
let ret = if let Some(x) = ret {
Some(x)
} else {
None
if let Some(inp) = &mut self.inp2 {
match inp.poll_next_unpin(cx) {
Ready(Some(x)) => Some(CaConnSetEvent::ConnSetCmd(todo!())),
Ready(None) => {
self.inp2 = None;
None
}
Pending => None,
}
} else {
None
}
};
if let Some(x) = ret {
Ready(Some(x))

View File

@@ -22,7 +22,7 @@ pub struct SenderPolling<T>
where
T: 'static,
{
sender: Box<Sender<T>>,
sender: Option<Box<Sender<T>>>,
sender_ptr: NonNull<Sender<T>>,
fut: Option<Send<'static, T>>,
_pin: PhantomPinned,
@@ -33,17 +33,17 @@ unsafe impl<T> core::marker::Send for SenderPolling<T> where T: core::marker::Se
impl<T> SenderPolling<T> {
pub fn new(sender: Sender<T>) -> Self {
let mut ret = Self {
sender: Box::new(sender),
sender: Some(Box::new(sender)),
sender_ptr: NonNull::dangling(),
fut: None,
_pin: PhantomPinned,
};
ret.sender_ptr = NonNull::from(ret.sender.as_ref());
ret.sender_ptr = NonNull::from(ret.sender.as_ref().unwrap().as_ref());
ret
}
pub fn is_idle(&self) -> bool {
self.fut.is_none()
self.sender.is_some() && self.fut.is_none()
}
pub fn is_sending(&self) -> bool {
@@ -51,19 +51,30 @@ impl<T> SenderPolling<T> {
}
pub fn send_pin(self: Pin<&mut Self>, item: T) {
let (tx, fut) = unsafe {
let x = Pin::get_unchecked_mut(self);
(x.sender_ptr.as_mut(), &mut x.fut)
};
let s = tx.send(item);
*fut = Some(s);
unsafe { Pin::get_unchecked_mut(self) }.send(item)
}
pub fn send(&mut self, item: T) {
if self.sender.is_none() {
// panic!("send on dropped sender");
// TODO
return;
}
let sender = unsafe { self.sender_ptr.as_mut() };
let s = sender.send(item);
self.fut = Some(s);
}
pub fn close(&self) {
if let Some(tx) = self.sender.as_ref() {
tx.close();
}
}
pub fn drop(&mut self) {
self.sender = None;
self.fut = None;
}
}
impl<T> Future for SenderPolling<T> {

View File

@@ -14,8 +14,11 @@ use scylla::transport::errors::QueryError;
use series::SeriesId;
use stats::CaConnStats;
use std::net::SocketAddrV4;
use std::sync::atomic;
use std::sync::atomic::AtomicU64;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
#[derive(Debug, ThisError)]
@@ -377,6 +380,8 @@ where
Ok(())
}
static warn_last: AtomicU64 = AtomicU64::new(0);
pub async fn insert_item(
item: InsertItem,
ttl_index: Duration,
@@ -423,8 +428,24 @@ pub async fn insert_item(
I32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i32, &data_store).await?,
F32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f32, &data_store).await?,
F64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f64, &data_store).await?,
String(_) => warn!("TODO string insert"),
Bool(_v) => warn!("TODO bool insert"),
String(val) => {
let ts = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map_or(0, |x| x.as_secs());
if ts > warn_last.load(atomic::Ordering::Acquire) + 10 {
warn_last.store(ts, atomic::Ordering::Release);
warn!("TODO string insert {val}");
}
}
Bool(val) => {
let ts = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map_or(0, |x| x.as_secs());
if ts > warn_last.load(atomic::Ordering::Acquire) + 10 {
warn_last.store(ts, atomic::Ordering::Release);
warn!("TODO bool insert {val}");
}
}
}
}
Array(val) => {