Change stop behavior
This commit is contained in:
@@ -11,16 +11,20 @@ use crate::store::{CommonInsertItemQueue, QueryItem};
|
||||
use async_channel::Sender;
|
||||
use conn::CaConn;
|
||||
use err::Error;
|
||||
use futures_util::StreamExt;
|
||||
use futures_util::future::Fuse;
|
||||
use futures_util::stream::FuturesUnordered;
|
||||
use futures_util::{FutureExt, StreamExt};
|
||||
use log::*;
|
||||
use netpod::{Database, ScyllaConfig};
|
||||
use scylla::batch::Consistency;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff};
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
use std::ffi::CStr;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::net::{Ipv4Addr, SocketAddrV4};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex, Once};
|
||||
use std::time::Duration;
|
||||
use tokio::fs::OpenOptions;
|
||||
@@ -140,8 +144,9 @@ async fn spawn_scylla_insert_workers(
|
||||
insert_frac: Arc<AtomicU64>,
|
||||
pg_client: Arc<PgClient>,
|
||||
store_stats: Arc<stats::CaConnStats>,
|
||||
) -> Result<(), Error> {
|
||||
let mut data_stores = vec![];
|
||||
) -> Result<Vec<JoinHandle<()>>, Error> {
|
||||
let mut jhs = Vec::new();
|
||||
let mut data_stores = Vec::new();
|
||||
for _ in 0..insert_scylla_sessions {
|
||||
let scy = scylla::SessionBuilder::new()
|
||||
.known_nodes(&scyconf.hosts)
|
||||
@@ -181,8 +186,23 @@ async fn spawn_scylla_insert_workers(
|
||||
}
|
||||
}
|
||||
}
|
||||
QueryItem::ChannelStatus(_item) => {
|
||||
// TODO
|
||||
QueryItem::ChannelStatus(item) => {
|
||||
match crate::store::insert_channel_status(item, &data_store, &stats).await {
|
||||
Ok(_) => {
|
||||
stats.store_worker_item_insert_inc();
|
||||
}
|
||||
Err(e) => {
|
||||
stats.store_worker_item_error_inc();
|
||||
// TODO introduce more structured error variants.
|
||||
if e.msg().contains("WriteTimeout") {
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
} else {
|
||||
// TODO back off but continue.
|
||||
error!("insert worker sees error: {e:?}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
QueryItem::Insert(item) => {
|
||||
stats.store_worker_item_recv_inc();
|
||||
@@ -211,8 +231,8 @@ async fn spawn_scylla_insert_workers(
|
||||
}
|
||||
QueryItem::Mute(item) => {
|
||||
let values = (
|
||||
(item.series & 0xff) as i32,
|
||||
item.series as i64,
|
||||
(item.series.id() & 0xff) as i32,
|
||||
item.series.id() as i64,
|
||||
item.ts as i64,
|
||||
item.ema,
|
||||
item.emd,
|
||||
@@ -233,8 +253,8 @@ async fn spawn_scylla_insert_workers(
|
||||
}
|
||||
QueryItem::Ivl(item) => {
|
||||
let values = (
|
||||
(item.series & 0xff) as i32,
|
||||
item.series as i64,
|
||||
(item.series.id() & 0xff) as i32,
|
||||
item.series.id() as i64,
|
||||
item.ts as i64,
|
||||
item.ema,
|
||||
item.emd,
|
||||
@@ -255,10 +275,12 @@ async fn spawn_scylla_insert_workers(
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("insert worker has no more messages");
|
||||
};
|
||||
tokio::spawn(fut);
|
||||
let jh = tokio::spawn(fut);
|
||||
jhs.push(jh);
|
||||
}
|
||||
Ok(())
|
||||
Ok(jhs)
|
||||
}
|
||||
|
||||
pub struct CommandQueueSet {
|
||||
@@ -394,7 +416,56 @@ pub async fn create_ca_conn(
|
||||
Ok(jh)
|
||||
}
|
||||
|
||||
pub static SIGINT: AtomicU32 = AtomicU32::new(0);
|
||||
|
||||
fn handler_sigaction(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) {
|
||||
SIGINT.store(1, Ordering::Release);
|
||||
let _ = unset_signal_handler();
|
||||
}
|
||||
|
||||
fn set_signal_handler() -> Result<(), Error> {
|
||||
let mask: libc::sigset_t = unsafe { MaybeUninit::zeroed().assume_init() };
|
||||
let handler: libc::sighandler_t = handler_sigaction as *const libc::c_void as _;
|
||||
let act = libc::sigaction {
|
||||
sa_sigaction: handler,
|
||||
sa_mask: mask,
|
||||
sa_flags: 0,
|
||||
sa_restorer: None,
|
||||
};
|
||||
unsafe {
|
||||
let ec = libc::sigaction(libc::SIGINT, &act, std::ptr::null_mut());
|
||||
if ec != 0 {
|
||||
let errno = *libc::__errno_location();
|
||||
let msg = CStr::from_ptr(libc::strerror(errno));
|
||||
error!("error: {:?}", msg);
|
||||
return Err(Error::with_msg_no_trace(format!("can not set signal handler")));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn unset_signal_handler() -> Result<(), Error> {
|
||||
let mask: libc::sigset_t = unsafe { MaybeUninit::zeroed().assume_init() };
|
||||
let act = libc::sigaction {
|
||||
sa_sigaction: libc::SIG_DFL,
|
||||
sa_mask: mask,
|
||||
sa_flags: 0,
|
||||
sa_restorer: None,
|
||||
};
|
||||
unsafe {
|
||||
let ec = libc::sigaction(libc::SIGINT, &act, std::ptr::null_mut());
|
||||
if ec != 0 {
|
||||
let errno = *libc::__errno_location();
|
||||
let msg = CStr::from_ptr(libc::strerror(errno));
|
||||
error!("error: {:?}", msg);
|
||||
return Err(Error::with_msg_no_trace(format!("can not set signal handler")));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
set_signal_handler()?;
|
||||
let insert_frac = Arc::new(AtomicU64::new(1000));
|
||||
let insert_ivl_min = Arc::new(AtomicU64::new(8800));
|
||||
let opts = parse_config(opts.config).await?;
|
||||
@@ -457,7 +528,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
let insert_item_queue = Arc::new(insert_item_queue);
|
||||
// TODO use a new stats struct
|
||||
let store_stats = Arc::new(CaConnStats::new());
|
||||
spawn_scylla_insert_workers(
|
||||
let jh_insert_workers = spawn_scylla_insert_workers(
|
||||
opts.scyconf.clone(),
|
||||
opts.insert_scylla_sessions,
|
||||
opts.insert_worker_count,
|
||||
@@ -483,12 +554,14 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
};
|
||||
let ingest_commons = Arc::new(ingest_commons);
|
||||
|
||||
tokio::spawn(crate::metrics::start_metrics_service(
|
||||
opts.api_bind.clone(),
|
||||
insert_frac.clone(),
|
||||
insert_ivl_min.clone(),
|
||||
ingest_commons.clone(),
|
||||
));
|
||||
if true {
|
||||
tokio::spawn(crate::metrics::start_metrics_service(
|
||||
opts.api_bind.clone(),
|
||||
insert_frac.clone(),
|
||||
insert_ivl_min.clone(),
|
||||
ingest_commons.clone(),
|
||||
));
|
||||
}
|
||||
|
||||
let metrics_agg_fut = {
|
||||
let conn_stats = conn_stats.clone();
|
||||
@@ -521,7 +594,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
let mut chns_todo = &opts.channels[..];
|
||||
let mut chstmp = ["__NONE__"; 8];
|
||||
let mut ix = 0;
|
||||
while chns_todo.len() > 0 {
|
||||
while chns_todo.len() > 0 && SIGINT.load(Ordering::Acquire) == 0 {
|
||||
if false {
|
||||
for (s1, s2) in chns_todo.iter().zip(chstmp.iter_mut()) {
|
||||
*s2 = s1;
|
||||
@@ -551,10 +624,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
// TODO the address was searched before but could not be found.
|
||||
} else {
|
||||
let addr: SocketAddrV4 = match addr.parse() {
|
||||
Ok(k) => {
|
||||
local_stats.ioc_lookup_inc();
|
||||
k
|
||||
}
|
||||
Ok(k) => k,
|
||||
Err(e) => {
|
||||
error!("can not parse {addr:?} for channel {ch:?} {e:?}");
|
||||
continue;
|
||||
@@ -642,19 +712,77 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
|
||||
}
|
||||
info!("channels_by_host len {}", channels_by_host.len());
|
||||
|
||||
for jh in conn_jhs {
|
||||
match jh.await {
|
||||
Ok(k) => match k {
|
||||
Ok(_) => {}
|
||||
let mut conn_jhs: VecDeque<Fuse<JoinHandle<Result<(), Error>>>> =
|
||||
conn_jhs.into_iter().map(|jh| jh.fuse()).collect();
|
||||
let mut sent_stop_commands = false;
|
||||
loop {
|
||||
if SIGINT.load(Ordering::Acquire) != 0 {
|
||||
let receiver = insert_item_queue.receiver();
|
||||
info!(
|
||||
"item queue AGAIN senders {} receivers {}",
|
||||
receiver.sender_count(),
|
||||
receiver.receiver_count()
|
||||
);
|
||||
info!("Stopping");
|
||||
if !sent_stop_commands {
|
||||
sent_stop_commands = true;
|
||||
info!("sending stop command");
|
||||
let queues = command_queue_set.queues_locked().await;
|
||||
for q in queues.iter() {
|
||||
let (cmd, _rx) = ConnCommand::shutdown();
|
||||
let _ = q.1.send(cmd).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut jh = if let Some(x) = conn_jhs.pop_front() {
|
||||
x
|
||||
} else {
|
||||
break;
|
||||
};
|
||||
futures_util::select! {
|
||||
a = jh => match a {
|
||||
Ok(k) => match k {
|
||||
Ok(_) => {
|
||||
info!("joined");
|
||||
}
|
||||
Err(e) => {
|
||||
error!("{e:?}");
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
error!("{e:?}");
|
||||
}
|
||||
},
|
||||
_b = tokio::time::sleep(Duration::from_millis(1000)).fuse() => {
|
||||
conn_jhs.push_back(jh);
|
||||
}
|
||||
};
|
||||
}
|
||||
info!("all connections done.");
|
||||
|
||||
drop(ingest_commons);
|
||||
metrics_agg_jh.abort();
|
||||
drop(metrics_agg_jh);
|
||||
|
||||
let receiver = insert_item_queue.receiver();
|
||||
drop(insert_item_queue);
|
||||
info!(
|
||||
"item queue AGAIN senders {} receivers {}",
|
||||
receiver.sender_count(),
|
||||
receiver.receiver_count()
|
||||
);
|
||||
|
||||
let mut futs = FuturesUnordered::from_iter(jh_insert_workers);
|
||||
while let Some(x) = futs.next().await {
|
||||
match x {
|
||||
Ok(_) => {
|
||||
info!("insert worker done");
|
||||
}
|
||||
Err(e) => {
|
||||
error!("{e:?}");
|
||||
error!("error on shutdown: {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
metrics_agg_jh.await.unwrap();
|
||||
info!("all insert workers done.");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -5,7 +5,8 @@ use crate::ca::proto::{CreateChan, EventAdd};
|
||||
use crate::ca::store::ChannelRegistry;
|
||||
use crate::series::{Existence, SeriesId};
|
||||
use crate::store::{
|
||||
CommonInsertItemQueueSender, ConnectionStatus, ConnectionStatusItem, InsertItem, IvlItem, MuteItem, QueryItem,
|
||||
ChannelStatus, ChannelStatusItem, CommonInsertItemQueueSender, ConnectionStatus, ConnectionStatusItem, InsertItem,
|
||||
IvlItem, MuteItem, QueryItem,
|
||||
};
|
||||
use async_channel::Sender;
|
||||
use err::Error;
|
||||
@@ -18,6 +19,7 @@ use serde::Serialize;
|
||||
use stats::{CaConnStats, IntervalEma};
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
use std::net::{Ipv4Addr, SocketAddrV4};
|
||||
use std::ops::ControlFlow;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
@@ -77,6 +79,9 @@ fn ser_instant<S: serde::Serializer>(val: &Option<Instant>, ser: S) -> Result<S:
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
|
||||
struct Cid(pub u32);
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
enum ChannelError {
|
||||
#[allow(unused)]
|
||||
@@ -93,25 +98,18 @@ enum MonitoringState {
|
||||
FetchSeriesId,
|
||||
AddingEvent(SeriesId),
|
||||
Evented(SeriesId, EventedState),
|
||||
// TODO we also want to read while staying subscribed:
|
||||
#[allow(unused)]
|
||||
Reading,
|
||||
#[allow(unused)]
|
||||
Read,
|
||||
#[allow(unused)]
|
||||
Muted,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct CreatedState {
|
||||
#[allow(unused)]
|
||||
cid: u32,
|
||||
cid: Cid,
|
||||
#[allow(unused)]
|
||||
sid: u32,
|
||||
scalar_type: ScalarType,
|
||||
shape: Shape,
|
||||
#[allow(unused)]
|
||||
ts_created: Instant,
|
||||
ts_alive_last: Instant,
|
||||
state: MonitoringState,
|
||||
ts_msp_last: u64,
|
||||
ts_msp_grid_last: u32,
|
||||
@@ -121,13 +119,14 @@ struct CreatedState {
|
||||
insert_recv_ivl_last: Instant,
|
||||
insert_next_earliest: Instant,
|
||||
muted_before: u32,
|
||||
series: Option<SeriesId>,
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
#[derive(Clone, Debug)]
|
||||
enum ChannelState {
|
||||
Init,
|
||||
Creating { cid: u32, ts_beg: Instant },
|
||||
Creating { cid: Cid, ts_beg: Instant },
|
||||
Created(CreatedState),
|
||||
Error(ChannelError),
|
||||
}
|
||||
@@ -185,6 +184,11 @@ impl ChannelState {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
struct ChannelsStates {
|
||||
channels: BTreeMap<Cid, ChannelState>,
|
||||
}
|
||||
|
||||
enum CaConnState {
|
||||
Unconnected,
|
||||
Connecting(
|
||||
@@ -195,6 +199,7 @@ enum CaConnState {
|
||||
Listen,
|
||||
PeerReady,
|
||||
Wait(Pin<Box<dyn Future<Output = ()> + Send>>),
|
||||
Shutdown,
|
||||
}
|
||||
|
||||
fn wait_fut(dt: u64) -> Pin<Box<dyn Future<Output = ()> + Send>> {
|
||||
@@ -202,11 +207,27 @@ fn wait_fut(dt: u64) -> Pin<Box<dyn Future<Output = ()> + Send>> {
|
||||
Box::pin(fut)
|
||||
}
|
||||
|
||||
struct IdStore {
|
||||
struct CidStore {
|
||||
next: u32,
|
||||
}
|
||||
|
||||
impl IdStore {
|
||||
impl CidStore {
|
||||
fn new() -> Self {
|
||||
Self { next: 0 }
|
||||
}
|
||||
|
||||
fn next(&mut self) -> Cid {
|
||||
self.next += 1;
|
||||
let ret = self.next;
|
||||
Cid(ret)
|
||||
}
|
||||
}
|
||||
|
||||
struct SubidStore {
|
||||
next: u32,
|
||||
}
|
||||
|
||||
impl SubidStore {
|
||||
fn new() -> Self {
|
||||
Self { next: 0 }
|
||||
}
|
||||
@@ -225,6 +246,7 @@ pub enum ConnCommandKind {
|
||||
ChannelStatesAll((), Sender<(SocketAddrV4, Vec<ChannelStateInfo>)>),
|
||||
ChannelAdd(String, Sender<bool>),
|
||||
ChannelRemove(String, Sender<bool>),
|
||||
Shutdown(Sender<bool>),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -280,34 +302,40 @@ impl ConnCommand {
|
||||
};
|
||||
(cmd, rx)
|
||||
}
|
||||
|
||||
pub fn shutdown() -> (ConnCommand, async_channel::Receiver<bool>) {
|
||||
let (tx, rx) = async_channel::bounded(1);
|
||||
let cmd = Self {
|
||||
kind: ConnCommandKind::Shutdown(tx),
|
||||
};
|
||||
(cmd, rx)
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub struct CaConn {
|
||||
state: CaConnState,
|
||||
shutdown: bool,
|
||||
proto: Option<CaProto>,
|
||||
cid_store: IdStore,
|
||||
ioid_store: IdStore,
|
||||
subid_store: IdStore,
|
||||
// TODO use a Cid or so instead of u32.
|
||||
channels: BTreeMap<u32, ChannelState>,
|
||||
cid_store: CidStore,
|
||||
subid_store: SubidStore,
|
||||
channels: BTreeMap<Cid, ChannelState>,
|
||||
init_state_count: u64,
|
||||
cid_by_name: BTreeMap<String, u32>,
|
||||
cid_by_subid: BTreeMap<u32, u32>,
|
||||
name_by_cid: BTreeMap<u32, String>,
|
||||
poll_count: usize,
|
||||
cid_by_name: BTreeMap<String, Cid>,
|
||||
cid_by_subid: BTreeMap<u32, Cid>,
|
||||
name_by_cid: BTreeMap<Cid, String>,
|
||||
data_store: Arc<DataStore>,
|
||||
insert_item_queue: VecDeque<QueryItem>,
|
||||
insert_item_sender: CommonInsertItemQueueSender,
|
||||
insert_item_send_fut: Option<async_channel::Send<'static, QueryItem>>,
|
||||
fut_get_series:
|
||||
FuturesOrdered<Pin<Box<dyn Future<Output = Result<(u32, u32, u16, u16, Existence<SeriesId>), Error>> + Send>>>,
|
||||
FuturesOrdered<Pin<Box<dyn Future<Output = Result<(Cid, u32, u16, u16, Existence<SeriesId>), Error>> + Send>>>,
|
||||
remote_addr_dbg: SocketAddrV4,
|
||||
local_epics_hostname: String,
|
||||
array_truncate: usize,
|
||||
stats: Arc<CaConnStats>,
|
||||
insert_queue_max: usize,
|
||||
insert_ivl_min: Arc<AtomicU64>,
|
||||
ts_channel_alive_check_last: Instant,
|
||||
conn_command_tx: async_channel::Sender<ConnCommand>,
|
||||
conn_command_rx: async_channel::Receiver<ConnCommand>,
|
||||
conn_backoff: f32,
|
||||
@@ -327,16 +355,15 @@ impl CaConn {
|
||||
let (cq_tx, cq_rx) = async_channel::bounded(32);
|
||||
Self {
|
||||
state: CaConnState::Unconnected,
|
||||
shutdown: false,
|
||||
proto: None,
|
||||
cid_store: IdStore::new(),
|
||||
ioid_store: IdStore::new(),
|
||||
subid_store: IdStore::new(),
|
||||
cid_store: CidStore::new(),
|
||||
subid_store: SubidStore::new(),
|
||||
channels: BTreeMap::new(),
|
||||
init_state_count: 0,
|
||||
cid_by_name: BTreeMap::new(),
|
||||
cid_by_subid: BTreeMap::new(),
|
||||
name_by_cid: BTreeMap::new(),
|
||||
poll_count: 0,
|
||||
data_store,
|
||||
insert_item_queue: VecDeque::new(),
|
||||
insert_item_sender,
|
||||
@@ -348,6 +375,7 @@ impl CaConn {
|
||||
stats: Arc::new(CaConnStats::new()),
|
||||
insert_queue_max,
|
||||
insert_ivl_min,
|
||||
ts_channel_alive_check_last: Instant::now(),
|
||||
conn_command_tx: cq_tx,
|
||||
conn_command_rx: cq_rx,
|
||||
conn_backoff: 0.02,
|
||||
@@ -363,6 +391,7 @@ impl CaConn {
|
||||
// TODO if this loops for too long time, yield and make sure we get wake up again.
|
||||
use Poll::*;
|
||||
loop {
|
||||
self.stats.caconn_loop3_count_inc();
|
||||
match self.conn_command_rx.poll_next_unpin(cx) {
|
||||
Ready(Some(a)) => match a.kind {
|
||||
ConnCommandKind::FindChannel(pattern, tx) => {
|
||||
@@ -431,7 +460,6 @@ impl CaConn {
|
||||
}
|
||||
}
|
||||
ConnCommandKind::ChannelRemove(name, tx) => {
|
||||
info!("remove {}", name);
|
||||
self.channel_remove(name);
|
||||
match tx.try_send(true) {
|
||||
Ok(_) => {}
|
||||
@@ -440,6 +468,18 @@ impl CaConn {
|
||||
}
|
||||
}
|
||||
}
|
||||
ConnCommandKind::Shutdown(tx) => {
|
||||
self.shutdown = true;
|
||||
let _ = self.before_reset_of_channel_state();
|
||||
self.state = CaConnState::Shutdown;
|
||||
self.proto = None;
|
||||
match tx.try_send(true) {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
//error!("response channel full or closed");
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
Ready(None) => {
|
||||
error!("Command queue closed");
|
||||
@@ -472,7 +512,7 @@ impl CaConn {
|
||||
}
|
||||
}
|
||||
|
||||
fn cid_by_name(&mut self, name: &str) -> u32 {
|
||||
fn cid_by_name(&mut self, name: &str) -> Cid {
|
||||
if let Some(cid) = self.cid_by_name.get(name) {
|
||||
*cid
|
||||
} else {
|
||||
@@ -483,7 +523,7 @@ impl CaConn {
|
||||
}
|
||||
}
|
||||
|
||||
fn name_by_cid(&self, cid: u32) -> Option<&str> {
|
||||
fn name_by_cid(&self, cid: Cid) -> Option<&str> {
|
||||
self.name_by_cid.get(&cid).map(|x| x.as_str())
|
||||
}
|
||||
|
||||
@@ -497,9 +537,39 @@ impl CaConn {
|
||||
self.conn_backoff = self.conn_backoff_beg;
|
||||
}
|
||||
|
||||
fn before_reset_of_channel_state(&mut self) -> Result<(), Error> {
|
||||
warn!("before_reset_of_channel_state channels {}", self.channels.len());
|
||||
let mut created = 0;
|
||||
for (_cid, chst) in &self.channels {
|
||||
match chst {
|
||||
ChannelState::Created(st) => {
|
||||
if let Some(series) = &st.series {
|
||||
let item = QueryItem::ChannelStatus(ChannelStatusItem {
|
||||
ts: SystemTime::now(),
|
||||
series: series.clone(),
|
||||
status: ChannelStatus::Closed,
|
||||
});
|
||||
if created < 20 {
|
||||
//info!("store {:?}", item);
|
||||
}
|
||||
self.insert_item_queue.push_back(item);
|
||||
} else {
|
||||
if created < 20 {
|
||||
//info!("no series for cid {:?}", st.cid);
|
||||
}
|
||||
}
|
||||
created += 1;
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_insert_futs(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
|
||||
use Poll::*;
|
||||
loop {
|
||||
self.stats.caconn_loop4_count_inc();
|
||||
match self.insert_item_send_fut.as_mut() {
|
||||
Some(fut) => match fut.poll_unpin(cx) {
|
||||
Ready(Ok(())) => {
|
||||
@@ -534,15 +604,50 @@ impl CaConn {
|
||||
}
|
||||
}
|
||||
|
||||
fn check_channels_alive(&mut self) -> Result<(), Error> {
|
||||
let tsnow = Instant::now();
|
||||
let mut alive_count = 0;
|
||||
let mut not_alive_count = 0;
|
||||
for (_, st) in &self.channels {
|
||||
match st {
|
||||
ChannelState::Creating { cid, ts_beg } => {
|
||||
if tsnow.duration_since(*ts_beg) >= Duration::from_millis(10000) {
|
||||
let name = self.name_by_cid.get(cid);
|
||||
warn!("channel Creating timed out {} {:?}", cid.0, name);
|
||||
}
|
||||
}
|
||||
ChannelState::Created(st) => {
|
||||
if tsnow.duration_since(st.ts_alive_last) >= Duration::from_millis(10000) {
|
||||
not_alive_count += 1;
|
||||
} else {
|
||||
alive_count += 1;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
self.stats
|
||||
.channel_all_count
|
||||
.store(self.channels.len() as _, Ordering::Release);
|
||||
self.stats
|
||||
.channel_alive_count
|
||||
.store(alive_count as _, Ordering::Release);
|
||||
self.stats
|
||||
.channel_not_alive_count
|
||||
.store(not_alive_count as _, Ordering::Release);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn channel_to_evented(
|
||||
&mut self,
|
||||
cid: u32,
|
||||
cid: Cid,
|
||||
sid: u32,
|
||||
data_type: u16,
|
||||
data_count: u16,
|
||||
series: Existence<SeriesId>,
|
||||
cx: &mut Context,
|
||||
) -> Result<(), Error> {
|
||||
let tsnow = Instant::now();
|
||||
self.stats.get_series_id_ok_inc();
|
||||
let series = match series {
|
||||
Existence::Created(k) => k,
|
||||
@@ -577,16 +682,18 @@ impl CaConn {
|
||||
// TODO handle error better! Transition channel to Error state?
|
||||
scalar_type: ScalarType::from_ca_id(data_type)?,
|
||||
shape: Shape::from_ca_count(data_count)?,
|
||||
ts_created: Instant::now(),
|
||||
state: MonitoringState::AddingEvent(series),
|
||||
ts_created: tsnow,
|
||||
ts_alive_last: tsnow,
|
||||
state: MonitoringState::AddingEvent(series.clone()),
|
||||
ts_msp_last: 0,
|
||||
ts_msp_grid_last: 0,
|
||||
inserted_in_ts_msp: u64::MAX,
|
||||
insert_item_ivl_ema: IntervalEma::new(),
|
||||
item_recv_ivl_ema: IntervalEma::new(),
|
||||
insert_recv_ivl_last: Instant::now(),
|
||||
insert_next_earliest: Instant::now(),
|
||||
insert_recv_ivl_last: tsnow,
|
||||
insert_next_earliest: tsnow,
|
||||
muted_before: 0,
|
||||
series: Some(series),
|
||||
});
|
||||
let scalar_type = ScalarType::from_ca_id(data_type)?;
|
||||
let shape = Shape::from_ca_count(data_count)?;
|
||||
@@ -608,6 +715,15 @@ impl CaConn {
|
||||
while self.fut_get_series.len() > 0 {
|
||||
match self.fut_get_series.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok((cid, sid, data_type, data_count, series)))) => {
|
||||
{
|
||||
let series = series.clone().into_inner();
|
||||
let item = QueryItem::ChannelStatus(ChannelStatusItem {
|
||||
ts: SystemTime::now(),
|
||||
series: series,
|
||||
status: ChannelStatus::Opened,
|
||||
});
|
||||
self.insert_item_queue.push_back(item);
|
||||
}
|
||||
match self.channel_to_evented(cid, sid, data_type, data_count, series, cx) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
@@ -653,7 +769,7 @@ impl CaConn {
|
||||
};
|
||||
let ts_lsp = ts - ts_msp;
|
||||
let item = InsertItem {
|
||||
series: series.id(),
|
||||
series,
|
||||
ts_msp,
|
||||
ts_lsp,
|
||||
msp_bump: ts_msp_changed,
|
||||
@@ -719,13 +835,13 @@ impl CaConn {
|
||||
fn handle_event_add_res(&mut self, ev: proto::EventAddRes, tsnow: Instant) -> Result<(), Error> {
|
||||
// TODO handle subid-not-found which can also be peer error:
|
||||
let cid = *self.cid_by_subid.get(&ev.subid).unwrap();
|
||||
let _name = self.name_by_cid(cid).unwrap().to_string();
|
||||
// TODO get rid of the string clone when I don't want the log output any longer:
|
||||
// TODO handle not-found error:
|
||||
let mut series_2 = None;
|
||||
let ch_s = self.channels.get_mut(&cid).unwrap();
|
||||
match ch_s {
|
||||
ChannelState::Created(st) => {
|
||||
st.ts_alive_last = tsnow;
|
||||
st.item_recv_ivl_ema.tick(tsnow);
|
||||
let scalar_type = st.scalar_type.clone();
|
||||
let shape = st.shape.clone();
|
||||
@@ -790,7 +906,7 @@ impl CaConn {
|
||||
st.insert_recv_ivl_last = tsnow;
|
||||
let ema = st.insert_item_ivl_ema.ema();
|
||||
let item = IvlItem {
|
||||
series: series.id(),
|
||||
series: series.clone(),
|
||||
ts,
|
||||
ema: ema.ema(),
|
||||
emd: ema.emv().sqrt(),
|
||||
@@ -800,7 +916,7 @@ impl CaConn {
|
||||
if false && st.muted_before == 0 {
|
||||
let ema = st.insert_item_ivl_ema.ema();
|
||||
let item = MuteItem {
|
||||
series: series.id(),
|
||||
series,
|
||||
ts,
|
||||
ema: ema.ema(),
|
||||
emd: ema.emv().sqrt(),
|
||||
@@ -811,6 +927,7 @@ impl CaConn {
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
// TODO count instead of print
|
||||
error!("unexpected state: EventAddRes while having {ch_s:?}");
|
||||
}
|
||||
}
|
||||
@@ -870,7 +987,7 @@ impl CaConn {
|
||||
if self.init_state_count == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
let keys: Vec<u32> = self.channels.keys().map(|x| *x).collect();
|
||||
let keys: Vec<Cid> = self.channels.keys().map(|x| *x).collect();
|
||||
for cid in keys {
|
||||
match self.channels.get_mut(&cid).unwrap() {
|
||||
ChannelState::Init => {
|
||||
@@ -881,10 +998,9 @@ impl CaConn {
|
||||
Ok(k) => k,
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
debug!("Sending CreateChan for {}", name);
|
||||
let msg = CaMsg {
|
||||
ty: CaMsgTy::CreateChan(CreateChan {
|
||||
cid,
|
||||
cid: cid.0,
|
||||
channel: name.into(),
|
||||
}),
|
||||
};
|
||||
@@ -918,7 +1034,6 @@ impl CaConn {
|
||||
ts1 = ts2;
|
||||
let mut do_wake_again = false;
|
||||
if msgs_tmp.len() > 0 {
|
||||
//info!("msgs_tmp.len() {}", msgs_tmp.len());
|
||||
do_wake_again = true;
|
||||
}
|
||||
{
|
||||
@@ -937,11 +1052,12 @@ impl CaConn {
|
||||
CaMsgTy::SearchRes(k) => {
|
||||
let a = k.addr.to_be_bytes();
|
||||
let addr = format!("{}.{}.{}.{}:{}", a[0], a[1], a[2], a[3], k.tcp_port);
|
||||
info!("Search result indicates server address: {addr}");
|
||||
trace!("Search result indicates server address: {addr}");
|
||||
// TODO count this unexpected case.
|
||||
}
|
||||
CaMsgTy::CreateChanRes(k) => {
|
||||
// TODO handle cid-not-found which can also indicate peer error.
|
||||
let cid = k.cid;
|
||||
let cid = Cid(k.cid);
|
||||
let sid = k.sid;
|
||||
// TODO handle error:
|
||||
let name = self.name_by_cid(cid).unwrap().to_string();
|
||||
@@ -962,6 +1078,7 @@ impl CaConn {
|
||||
scalar_type: scalar_type.clone(),
|
||||
shape: shape.clone(),
|
||||
ts_created: tsnow,
|
||||
ts_alive_last: tsnow,
|
||||
state: MonitoringState::FetchSeriesId,
|
||||
ts_msp_last: 0,
|
||||
ts_msp_grid_last: 0,
|
||||
@@ -971,6 +1088,7 @@ impl CaConn {
|
||||
insert_recv_ivl_last: tsnow,
|
||||
insert_next_earliest: tsnow,
|
||||
muted_before: 0,
|
||||
series: None,
|
||||
});
|
||||
// TODO handle error in different way. Should most likely not abort.
|
||||
let cd = ChannelDescDecoded {
|
||||
@@ -989,7 +1107,7 @@ impl CaConn {
|
||||
.get_series_id(cd)
|
||||
.map_ok(move |series| (cid, k.sid, k.data_type, k.data_count, series));
|
||||
// TODO throttle execution rate:
|
||||
self.fut_get_series.push(Box::pin(fut) as _);
|
||||
self.fut_get_series.push_back(Box::pin(fut) as _);
|
||||
do_wake_again = true;
|
||||
}
|
||||
CaMsgTy::EventAddRes(k) => {
|
||||
@@ -1002,7 +1120,13 @@ impl CaConn {
|
||||
let _ = ts1;
|
||||
res?
|
||||
}
|
||||
_ => {}
|
||||
CaMsgTy::Error(e) => {
|
||||
warn!("channel access error message {e:?}");
|
||||
}
|
||||
CaMsgTy::AccessRightsRes(_) => {}
|
||||
k => {
|
||||
warn!("unexpected ca cmd {k:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
@@ -1010,14 +1134,27 @@ impl CaConn {
|
||||
Ready(Some(Ok(())))
|
||||
}
|
||||
Ready(Some(Err(e))) => {
|
||||
error!("CaProto yields error: {e:?}");
|
||||
error!("CaProto yields error: {e:?} remote {:?}", self.remote_addr_dbg);
|
||||
// TODO unify this handling with the block below
|
||||
let reset_res = self.before_reset_of_channel_state();
|
||||
self.state = CaConnState::Wait(wait_fut(self.backoff_next()));
|
||||
self.proto = None;
|
||||
if let Err(e) = reset_res {
|
||||
error!("can not destruct channel state before reset {e:?}");
|
||||
}
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
Ready(None) => {
|
||||
warn!("handle_peer_ready CaProto is done {:?}", self.remote_addr_dbg);
|
||||
let reset_res = self.before_reset_of_channel_state();
|
||||
self.state = CaConnState::Wait(wait_fut(self.backoff_next()));
|
||||
self.proto = None;
|
||||
Ready(None)
|
||||
if let Err(e) = reset_res {
|
||||
error!("can not destruct channel state before reset {e:?}");
|
||||
Ready(Some(Err(e)))
|
||||
} else {
|
||||
Ready(None)
|
||||
}
|
||||
}
|
||||
Pending => Pending,
|
||||
};
|
||||
@@ -1029,7 +1166,152 @@ impl CaConn {
|
||||
res
|
||||
}
|
||||
|
||||
//fn loop_inner(&mut self, cx: &mut Context)
|
||||
// `?` works not in here.
|
||||
fn _test_control_flow(&mut self, _cx: &mut Context) -> ControlFlow<Poll<Result<(), Error>>> {
|
||||
use ControlFlow::*;
|
||||
use Poll::*;
|
||||
let e = Error::with_msg_no_trace(format!("test"));
|
||||
//Err(e)?;
|
||||
let _ = e;
|
||||
Break(Pending)
|
||||
}
|
||||
|
||||
// `?` works not in here.
|
||||
fn handle_conn_state(&mut self, cx: &mut Context) -> Option<Poll<Result<(), Error>>> {
|
||||
use Poll::*;
|
||||
match &mut self.state {
|
||||
CaConnState::Unconnected => {
|
||||
let addr = self.remote_addr_dbg.clone();
|
||||
trace!("create tcp connection to {:?}", (addr.ip(), addr.port()));
|
||||
let fut = tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr));
|
||||
self.state = CaConnState::Connecting(addr, Box::pin(fut));
|
||||
None
|
||||
}
|
||||
CaConnState::Connecting(ref addr, ref mut fut) => {
|
||||
match fut.poll_unpin(cx) {
|
||||
Ready(connect_result) => {
|
||||
match connect_result {
|
||||
Ok(Ok(tcp)) => {
|
||||
let addr = addr.clone();
|
||||
self.insert_item_queue
|
||||
.push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
|
||||
ts: SystemTime::now(),
|
||||
addr,
|
||||
status: ConnectionStatus::Established,
|
||||
}));
|
||||
self.backoff_reset();
|
||||
let proto = CaProto::new(tcp, self.remote_addr_dbg.clone(), self.array_truncate);
|
||||
self.state = CaConnState::Init;
|
||||
self.proto = Some(proto);
|
||||
None
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
// TODO log with exponential backoff
|
||||
// 172.26.24.118:2072
|
||||
const ADDR2: Ipv4Addr = Ipv4Addr::new(172, 26, 24, 118);
|
||||
if addr.ip() == &ADDR2 && addr.port() == 2072 {
|
||||
warn!("error during connect to {addr:?} {e:?}");
|
||||
}
|
||||
let addr = addr.clone();
|
||||
self.insert_item_queue
|
||||
.push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
|
||||
ts: SystemTime::now(),
|
||||
addr,
|
||||
status: ConnectionStatus::ConnectError,
|
||||
}));
|
||||
let dt = self.backoff_next();
|
||||
self.state = CaConnState::Wait(wait_fut(dt));
|
||||
self.proto = None;
|
||||
None
|
||||
}
|
||||
Err(e) => {
|
||||
// TODO log with exponential backoff
|
||||
trace!("timeout during connect to {addr:?} {e:?}");
|
||||
let addr = addr.clone();
|
||||
self.insert_item_queue
|
||||
.push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
|
||||
ts: SystemTime::now(),
|
||||
addr,
|
||||
status: ConnectionStatus::ConnectTimeout,
|
||||
}));
|
||||
let dt = self.backoff_next();
|
||||
self.state = CaConnState::Wait(wait_fut(dt));
|
||||
self.proto = None;
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
Pending => Some(Pending),
|
||||
}
|
||||
}
|
||||
CaConnState::Init => {
|
||||
let hostname = self.local_epics_hostname.clone();
|
||||
let proto = self.proto.as_mut().unwrap();
|
||||
let msg = CaMsg { ty: CaMsgTy::Version };
|
||||
proto.push_out(msg);
|
||||
let msg = CaMsg {
|
||||
ty: CaMsgTy::ClientName,
|
||||
};
|
||||
proto.push_out(msg);
|
||||
let msg = CaMsg {
|
||||
ty: CaMsgTy::HostName(hostname),
|
||||
};
|
||||
proto.push_out(msg);
|
||||
self.state = CaConnState::Listen;
|
||||
None
|
||||
}
|
||||
CaConnState::Listen => match {
|
||||
let res = self.handle_conn_listen(cx);
|
||||
res
|
||||
} {
|
||||
Ready(Some(Ok(()))) => Some(Ready(Ok(()))),
|
||||
Ready(Some(Err(e))) => Some(Ready(Err(e))),
|
||||
Ready(None) => None,
|
||||
Pending => Some(Pending),
|
||||
},
|
||||
CaConnState::PeerReady => {
|
||||
{
|
||||
// TODO can I move this block somewhere else?
|
||||
match self.handle_get_series_futs(cx) {
|
||||
Ready(Ok(_)) => (),
|
||||
Ready(Err(e)) => return Some(Ready(Err(e))),
|
||||
Pending => (),
|
||||
}
|
||||
}
|
||||
let res = self.handle_peer_ready(cx);
|
||||
match res {
|
||||
Ready(Some(Ok(()))) => None,
|
||||
Ready(Some(Err(e))) => Some(Ready(Err(e))),
|
||||
Ready(None) => None,
|
||||
Pending => Some(Pending),
|
||||
}
|
||||
}
|
||||
CaConnState::Wait(inst) => match inst.poll_unpin(cx) {
|
||||
Ready(_) => {
|
||||
self.state = CaConnState::Unconnected;
|
||||
self.proto = None;
|
||||
None
|
||||
}
|
||||
Pending => Some(Pending),
|
||||
},
|
||||
CaConnState::Shutdown => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn loop_inner(&mut self, cx: &mut Context) -> Option<Poll<Result<(), Error>>> {
|
||||
loop {
|
||||
self.stats.caconn_loop2_count_inc();
|
||||
if let Some(v) = self.handle_conn_state(cx) {
|
||||
break Some(v);
|
||||
}
|
||||
if self.insert_item_queue.len() >= self.insert_queue_max {
|
||||
break None;
|
||||
}
|
||||
if self.shutdown {
|
||||
break None;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for CaConn {
|
||||
@@ -1037,174 +1319,57 @@ impl Stream for CaConn {
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
let ts_outer_1 = Instant::now();
|
||||
let mut ts1 = ts_outer_1;
|
||||
self.poll_count += 1;
|
||||
// TODO factor out the inner loop:
|
||||
let ret = 'outer: loop {
|
||||
self.handle_conn_command(cx);
|
||||
self.stats.caconn_poll_count_inc();
|
||||
let tsnow = Instant::now();
|
||||
if tsnow.duration_since(self.ts_channel_alive_check_last) >= Duration::from_millis(4000) {
|
||||
self.ts_channel_alive_check_last = tsnow;
|
||||
if let Err(e) = self.check_channels_alive() {
|
||||
error!("check_dead_channels {e:?}");
|
||||
}
|
||||
}
|
||||
if self.shutdown {
|
||||
info!("CaConn poll");
|
||||
}
|
||||
let ret = loop {
|
||||
if self.shutdown {
|
||||
info!("CaConn loop 1");
|
||||
}
|
||||
self.stats.caconn_loop1_count_inc();
|
||||
if !self.shutdown {
|
||||
self.handle_conn_command(cx);
|
||||
}
|
||||
let q = self.handle_insert_futs(cx);
|
||||
let ts2 = Instant::now();
|
||||
self.stats
|
||||
.poll_time_handle_insert_futs
|
||||
.fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::AcqRel);
|
||||
ts1 = ts2;
|
||||
match q {
|
||||
Ready(_) => {}
|
||||
Pending => break Pending,
|
||||
}
|
||||
|
||||
if self.shutdown {
|
||||
if self.insert_item_queue.len() == 0 {
|
||||
info!("no more items to flush");
|
||||
break Ready(Ok(()));
|
||||
} else {
|
||||
info!("more items {}", self.insert_item_queue.len());
|
||||
}
|
||||
}
|
||||
if self.insert_item_queue.len() >= self.insert_queue_max {
|
||||
break Pending;
|
||||
}
|
||||
|
||||
break loop {
|
||||
break match &mut self.state {
|
||||
CaConnState::Unconnected => {
|
||||
let addr = self.remote_addr_dbg.clone();
|
||||
trace!("create tcp connection to {:?}", (addr.ip(), addr.port()));
|
||||
let fut = async move {
|
||||
tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr)).await
|
||||
};
|
||||
self.state = CaConnState::Connecting(addr, Box::pin(fut));
|
||||
continue 'outer;
|
||||
}
|
||||
CaConnState::Connecting(ref addr, ref mut fut) => {
|
||||
match fut.poll_unpin(cx) {
|
||||
Ready(connect_result) => {
|
||||
match connect_result {
|
||||
Ok(Ok(tcp)) => {
|
||||
let addr = addr.clone();
|
||||
self.insert_item_queue.push_back(QueryItem::ConnectionStatus(
|
||||
ConnectionStatusItem {
|
||||
ts: SystemTime::now(),
|
||||
addr,
|
||||
status: ConnectionStatus::Established,
|
||||
},
|
||||
));
|
||||
self.backoff_reset();
|
||||
let proto =
|
||||
CaProto::new(tcp, self.remote_addr_dbg.clone(), self.array_truncate);
|
||||
self.state = CaConnState::Init;
|
||||
self.proto = Some(proto);
|
||||
continue 'outer;
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
// TODO log with exponential backoff
|
||||
// 172.26.24.118:2072
|
||||
const ADDR2: Ipv4Addr = Ipv4Addr::new(172, 26, 24, 118);
|
||||
if addr.ip() == &ADDR2 && addr.port() == 2072 {
|
||||
warn!("error during connect to {addr:?} {e:?}");
|
||||
}
|
||||
let addr = addr.clone();
|
||||
self.insert_item_queue.push_back(QueryItem::ConnectionStatus(
|
||||
ConnectionStatusItem {
|
||||
ts: SystemTime::now(),
|
||||
addr,
|
||||
status: ConnectionStatus::ConnectError,
|
||||
},
|
||||
));
|
||||
let dt = self.backoff_next();
|
||||
self.state = CaConnState::Wait(wait_fut(dt));
|
||||
self.proto = None;
|
||||
continue 'outer;
|
||||
}
|
||||
Err(e) => {
|
||||
// TODO log with exponential backoff
|
||||
trace!("timeout during connect to {addr:?} {e:?}");
|
||||
let addr = addr.clone();
|
||||
self.insert_item_queue.push_back(QueryItem::ConnectionStatus(
|
||||
ConnectionStatusItem {
|
||||
ts: SystemTime::now(),
|
||||
addr,
|
||||
status: ConnectionStatus::ConnectTimeout,
|
||||
},
|
||||
));
|
||||
let dt = self.backoff_next();
|
||||
self.state = CaConnState::Wait(wait_fut(dt));
|
||||
self.proto = None;
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
}
|
||||
CaConnState::Init => {
|
||||
let hostname = self.local_epics_hostname.clone();
|
||||
let proto = self.proto.as_mut().unwrap();
|
||||
let msg = CaMsg { ty: CaMsgTy::Version };
|
||||
proto.push_out(msg);
|
||||
let msg = CaMsg {
|
||||
ty: CaMsgTy::ClientName,
|
||||
};
|
||||
proto.push_out(msg);
|
||||
let msg = CaMsg {
|
||||
ty: CaMsgTy::HostName(hostname),
|
||||
};
|
||||
proto.push_out(msg);
|
||||
self.state = CaConnState::Listen;
|
||||
continue 'outer;
|
||||
}
|
||||
CaConnState::Listen => match {
|
||||
let res = self.handle_conn_listen(cx);
|
||||
let ts2 = Instant::now();
|
||||
self.stats
|
||||
.time_handle_conn_listen
|
||||
.fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::AcqRel);
|
||||
ts1 = ts2;
|
||||
res
|
||||
} {
|
||||
Ready(Some(Ok(()))) => Ready(Some(Ok(()))),
|
||||
Ready(Some(Err(e))) => Ready(Some(Err(e))),
|
||||
Ready(None) => continue 'outer,
|
||||
Pending => Pending,
|
||||
},
|
||||
CaConnState::PeerReady => {
|
||||
{
|
||||
// TODO can I move this block somewhere else?
|
||||
let _ = self.handle_get_series_futs(cx)?;
|
||||
let ts2 = Instant::now();
|
||||
self.stats
|
||||
.poll_time_get_series_futs
|
||||
.fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::AcqRel);
|
||||
ts1 = ts2;
|
||||
}
|
||||
let res = self.handle_peer_ready(cx);
|
||||
let ts2 = Instant::now();
|
||||
self.stats.time_handle_peer_ready_dur(ts2.duration_since(ts1));
|
||||
ts1 = ts2;
|
||||
match res {
|
||||
Ready(Some(Ok(()))) => {
|
||||
if self.insert_item_queue.len() >= self.insert_queue_max {
|
||||
continue 'outer;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Ready(Some(Err(e))) => Ready(Some(Err(e))),
|
||||
Ready(None) => continue 'outer,
|
||||
Pending => Pending,
|
||||
}
|
||||
}
|
||||
CaConnState::Wait(inst) => match inst.poll_unpin(cx) {
|
||||
Ready(_) => {
|
||||
self.state = CaConnState::Unconnected;
|
||||
self.proto = None;
|
||||
continue 'outer;
|
||||
}
|
||||
Pending => Pending,
|
||||
},
|
||||
};
|
||||
};
|
||||
if !self.shutdown {
|
||||
if let Some(v) = self.loop_inner(cx) {
|
||||
break v;
|
||||
}
|
||||
}
|
||||
};
|
||||
let ts_outer_2 = Instant::now();
|
||||
self.stats.poll_time_all_dur(ts_outer_2.duration_since(ts_outer_1));
|
||||
// TODO currently, this will never stop by itself
|
||||
match &ret {
|
||||
Ready(_) => self.stats.conn_stream_ready_inc(),
|
||||
Pending => self.stats.conn_stream_pending_inc(),
|
||||
}
|
||||
ret
|
||||
if self.shutdown && self.insert_item_queue.len() == 0 {
|
||||
return Ready(None);
|
||||
}
|
||||
match ret {
|
||||
Ready(x) => Ready(Some(x)),
|
||||
Pending => Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,6 +28,13 @@ pub struct SearchRes {
|
||||
pub proto_version: u16,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ErrorCmd {
|
||||
pub cid: u32,
|
||||
pub eid: u32,
|
||||
pub msg: String,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ClientNameRes {
|
||||
pub name: String,
|
||||
@@ -183,6 +190,7 @@ pub struct CaEventValue {
|
||||
pub enum CaMsgTy {
|
||||
Version,
|
||||
VersionRes(u16),
|
||||
Error(ErrorCmd),
|
||||
ClientName,
|
||||
ClientNameRes(ClientNameRes),
|
||||
HostName(String),
|
||||
@@ -204,6 +212,7 @@ impl CaMsgTy {
|
||||
match self {
|
||||
Version => 0,
|
||||
VersionRes(_) => 0,
|
||||
Error(_) => 0x0b,
|
||||
ClientName => 0x14,
|
||||
ClientNameRes(_) => 0x14,
|
||||
HostName(_) => 0x15,
|
||||
@@ -229,6 +238,7 @@ impl CaMsgTy {
|
||||
match self {
|
||||
Version => 0,
|
||||
VersionRes(_) => 0,
|
||||
Error(x) => (16 + x.msg.len() + 1 + 7) / 8 * 8,
|
||||
ClientName => 0x10,
|
||||
ClientNameRes(x) => (x.name.len() + 1 + 7) / 8 * 8,
|
||||
HostName(_) => 0x18,
|
||||
@@ -256,6 +266,7 @@ impl CaMsgTy {
|
||||
match self {
|
||||
Version => 0,
|
||||
VersionRes(n) => *n,
|
||||
Error(_) => 0,
|
||||
ClientName => 0,
|
||||
ClientNameRes(_) => 0,
|
||||
HostName(_) => 0,
|
||||
@@ -280,6 +291,7 @@ impl CaMsgTy {
|
||||
match self {
|
||||
Version => CA_PROTO_VERSION,
|
||||
VersionRes(_) => 0,
|
||||
Error(_) => 0,
|
||||
ClientName => 0,
|
||||
ClientNameRes(_) => 0,
|
||||
HostName(_) => 0,
|
||||
@@ -301,6 +313,7 @@ impl CaMsgTy {
|
||||
match self {
|
||||
Version => 0,
|
||||
VersionRes(_) => 0,
|
||||
Error(_) => 0,
|
||||
ClientName => 0,
|
||||
ClientNameRes(_) => 0,
|
||||
HostName(_) => 0,
|
||||
@@ -322,6 +335,7 @@ impl CaMsgTy {
|
||||
match self {
|
||||
Version => 0,
|
||||
VersionRes(_) => 0,
|
||||
Error(_) => 0,
|
||||
ClientName => 0,
|
||||
ClientNameRes(_) => 0,
|
||||
HostName(_) => 0,
|
||||
@@ -343,6 +357,8 @@ impl CaMsgTy {
|
||||
match self {
|
||||
Version => {}
|
||||
VersionRes(_) => {}
|
||||
// Specs: error cmd only from server to client.
|
||||
Error(_) => todo!(),
|
||||
ClientName => {
|
||||
// TODO allow variable client name.
|
||||
let s = "daqingest".as_bytes();
|
||||
@@ -519,9 +535,23 @@ impl CaMsg {
|
||||
|
||||
pub fn from_proto_infos(hi: &HeadInfo, payload: &[u8], array_truncate: usize) -> Result<Self, Error> {
|
||||
let msg = match hi.cmdid {
|
||||
0 => CaMsg {
|
||||
0x00 => CaMsg {
|
||||
ty: CaMsgTy::VersionRes(hi.data_count),
|
||||
},
|
||||
0x0b => {
|
||||
let mut s = String::new();
|
||||
s.extend(format!("{:?}", &payload[..payload.len().min(16)]).chars());
|
||||
if payload.len() >= 17 {
|
||||
s.extend(" msg: ".chars());
|
||||
s.extend(String::from_utf8_lossy(&payload[17..payload.len() - 1]).chars());
|
||||
}
|
||||
let e = ErrorCmd {
|
||||
cid: hi.param1,
|
||||
eid: hi.param2,
|
||||
msg: s,
|
||||
};
|
||||
CaMsg { ty: CaMsgTy::Error(e) }
|
||||
}
|
||||
20 => {
|
||||
let name = std::ffi::CString::new(payload)
|
||||
.map(|s| s.into_string().unwrap_or_else(|e| format!("{e:?}")))
|
||||
|
||||
@@ -12,6 +12,16 @@ pub enum Existence<T> {
|
||||
Existing(T),
|
||||
}
|
||||
|
||||
impl<T> Existence<T> {
|
||||
pub fn into_inner(self) -> T {
|
||||
use Existence::*;
|
||||
match self {
|
||||
Created(x) => x,
|
||||
Existing(x) => x,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize)]
|
||||
pub struct SeriesId(u64);
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use crate::ca::proto::{CaDataArrayValue, CaDataScalarValue, CaDataValue};
|
||||
use crate::ca::store::DataStore;
|
||||
use crate::errconv::ErrConv;
|
||||
use crate::series::SeriesId;
|
||||
use err::Error;
|
||||
use futures_util::{Future, FutureExt};
|
||||
use log::*;
|
||||
@@ -102,21 +103,32 @@ pub struct ConnectionStatusItem {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ChannelStatus {
|
||||
Opened = 1,
|
||||
Closed = 2,
|
||||
ClosedUnexpected = 3,
|
||||
Opened,
|
||||
Closed,
|
||||
ClosedUnexpected,
|
||||
}
|
||||
|
||||
impl ChannelStatus {
|
||||
pub fn kind(&self) -> u32 {
|
||||
use ChannelStatus::*;
|
||||
match self {
|
||||
Opened => 1,
|
||||
Closed => 2,
|
||||
ClosedUnexpected => 3,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ChannelStatusItem {
|
||||
pub ts: SystemTime,
|
||||
pub series: u64,
|
||||
pub series: SeriesId,
|
||||
pub status: ChannelStatus,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct InsertItem {
|
||||
pub series: u64,
|
||||
pub series: SeriesId,
|
||||
pub ts_msp: u64,
|
||||
pub ts_lsp: u64,
|
||||
pub msp_bump: bool,
|
||||
@@ -129,7 +141,7 @@ pub struct InsertItem {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MuteItem {
|
||||
pub series: u64,
|
||||
pub series: SeriesId,
|
||||
pub ts: u64,
|
||||
pub ema: f32,
|
||||
pub emd: f32,
|
||||
@@ -137,7 +149,7 @@ pub struct MuteItem {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct IvlItem {
|
||||
pub series: u64,
|
||||
pub series: SeriesId,
|
||||
pub ts: u64,
|
||||
pub ema: f32,
|
||||
pub emd: f32,
|
||||
@@ -191,6 +203,18 @@ impl CommonInsertItemQueue {
|
||||
pub fn receiver(&self) -> async_channel::Receiver<QueryItem> {
|
||||
self.recv.clone()
|
||||
}
|
||||
|
||||
pub fn sender_count(&self) -> usize {
|
||||
self.sender.sender_count()
|
||||
}
|
||||
|
||||
pub fn sender_count2(&self) -> usize {
|
||||
self.recv.sender_count()
|
||||
}
|
||||
|
||||
pub fn receiver_count(&self) -> usize {
|
||||
self.recv.receiver_count()
|
||||
}
|
||||
}
|
||||
|
||||
struct InsParCom {
|
||||
@@ -242,7 +266,7 @@ where
|
||||
|
||||
pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaConnStats) -> Result<(), Error> {
|
||||
if item.msp_bump {
|
||||
let params = (item.series as i64, item.ts_msp as i64);
|
||||
let params = (item.series.id() as i64, item.ts_msp as i64);
|
||||
data_store
|
||||
.scy
|
||||
.execute(&data_store.qu_insert_ts_msp, params)
|
||||
@@ -252,11 +276,11 @@ pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaCon
|
||||
}
|
||||
if let Some(ts_msp_grid) = item.ts_msp_grid {
|
||||
let params = (
|
||||
(item.series as i32) & 0xff,
|
||||
(item.series.id() as i32) & 0xff,
|
||||
ts_msp_grid as i32,
|
||||
if item.shape.to_scylla_vec().is_empty() { 0 } else { 1 } as i32,
|
||||
item.scalar_type.to_scylla_i32(),
|
||||
item.series as i64,
|
||||
item.series.id() as i64,
|
||||
);
|
||||
data_store
|
||||
.scy
|
||||
@@ -266,7 +290,7 @@ pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaCon
|
||||
stats.inserts_msp_grid_inc()
|
||||
}
|
||||
let par = InsParCom {
|
||||
series: item.series,
|
||||
series: item.series.id(),
|
||||
ts_msp: item.ts_msp,
|
||||
ts_lsp: item.ts_lsp,
|
||||
pulse: item.pulse,
|
||||
@@ -325,3 +349,29 @@ pub async fn insert_connection_status(
|
||||
.err_conv()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn insert_channel_status(
|
||||
item: ChannelStatusItem,
|
||||
data_store: &DataStore,
|
||||
_stats: &CaConnStats,
|
||||
) -> Result<(), Error> {
|
||||
let tsunix = item.ts.duration_since(std::time::UNIX_EPOCH).unwrap();
|
||||
let secs = tsunix.as_secs() * netpod::timeunits::SEC;
|
||||
let nanos = tsunix.subsec_nanos() as u64;
|
||||
let ts = secs + nanos;
|
||||
let div = netpod::timeunits::DAY;
|
||||
let ts_msp = ts / div * div;
|
||||
let ts_lsp = ts - ts_msp;
|
||||
let kind = item.status.kind();
|
||||
let series = item.series.id();
|
||||
let params = (series as i64, ts_msp as i64, ts_lsp as i64, kind as i32);
|
||||
data_store
|
||||
.scy
|
||||
.query(
|
||||
"insert into channel_status (series, ts_msp, ts_lsp, kind) values (?, ?, ?, ?)",
|
||||
params,
|
||||
)
|
||||
.await
|
||||
.err_conv()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -156,19 +156,23 @@ stats_proc::stats_struct!((
|
||||
store_worker_item_insert,
|
||||
store_worker_item_drop,
|
||||
store_worker_item_error,
|
||||
poll_time_all,
|
||||
poll_time_handle_insert_futs,
|
||||
poll_time_get_series_futs,
|
||||
caconn_poll_count,
|
||||
caconn_loop1_count,
|
||||
caconn_loop2_count,
|
||||
caconn_loop3_count,
|
||||
caconn_loop4_count,
|
||||
time_handle_conn_listen,
|
||||
time_handle_peer_ready,
|
||||
time_check_channels_state_init,
|
||||
time_handle_event_add_res,
|
||||
ioc_lookup,
|
||||
tcp_connected,
|
||||
get_series_id_ok,
|
||||
conn_item_count,
|
||||
conn_stream_ready,
|
||||
conn_stream_pending,
|
||||
channel_all_count,
|
||||
channel_alive_count,
|
||||
channel_not_alive_count,
|
||||
ca_ts_off_1,
|
||||
ca_ts_off_2,
|
||||
ca_ts_off_3,
|
||||
|
||||
Reference in New Issue
Block a user