Files
daqingest/netfetch/src/ca/conn.rs
2024-01-19 16:23:56 +01:00

2572 lines
94 KiB
Rust

use super::proto;
use super::proto::CaEventValue;
use super::proto::ReadNotify;
use super::ExtraInsertsConf;
use crate::ca::proto::EventCancel;
use crate::senderpolling::SenderPolling;
use crate::throttletrace::ThrottleTrace;
use async_channel::Receiver;
use async_channel::Sender;
use core::fmt;
use dbpg::seriesbychannel::ChannelInfoQuery;
use err::Error;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use log::*;
use netpod::timeunits::*;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
use proto::CaItem;
use proto::CaMsg;
use proto::CaMsgTy;
use proto::CaProto;
use proto::CreateChan;
use proto::EventAdd;
use scywr::iteminsertqueue as scywriiq;
use scywr::iteminsertqueue::DataValue;
use scywriiq::ChannelInfoItem;
use scywriiq::ChannelStatus;
use scywriiq::ChannelStatusClosedReason;
use scywriiq::ChannelStatusItem;
use scywriiq::ConnectionStatus;
use scywriiq::ConnectionStatusItem;
use scywriiq::IvlItem;
use scywriiq::MuteItem;
use scywriiq::QueryItem;
use serde::Deserialize;
use serde::Serialize;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use serieswriter::writer::EstablishWorkerJob;
use serieswriter::writer::JobId;
use serieswriter::writer::SeriesWriter;
use stats::rand_xoshiro::rand_core::RngCore;
use stats::rand_xoshiro::rand_core::SeedableRng;
use stats::rand_xoshiro::Xoshiro128PlusPlus;
use stats::CaConnStats;
use stats::CaProtoStats;
use stats::IntervalEma;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::net::SocketAddrV4;
use std::ops::ControlFlow;
use std::pin::Pin;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use taskrun::tokio;
use tokio::net::TcpStream;
// Timeout for establishing the TCP connection (presumably wraps the connect
// future held in CaConnState::Connecting — confirm at the use site).
const CONNECTING_TIMEOUT: Duration = Duration::from_millis(6000);
// Base interval for CA Echo keep-alive pings; jittered per connection in `ioc_ping_ivl_rng`.
const IOC_PING_IVL: Duration = Duration::from_millis(80000);
// Extra-verbose trace macros, statically disabled: the `if false` keeps the
// arguments type-checked while the call compiles to nothing. Flip to enable.
#[allow(unused)]
macro_rules! trace2 {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
// Same as trace2!, next verbosity level; disabled at compile time.
#[allow(unused)]
macro_rules! trace3 {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
// Same as trace2!, next verbosity level; disabled at compile time.
#[allow(unused)]
macro_rules! trace4 {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
/// Coarse connection status of a single channel, reported via `ChannelStateInfo`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ChannelConnectedInfo {
Disconnected,
Connecting,
Connected,
Error,
}
/// Serializable snapshot of one channel's state, built by `ChannelState::to_info`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChannelStateInfo {
pub cssid: ChannelStatusSeriesId,
/// Address of the IOC this channel lives on.
pub addr: SocketAddrV4,
/// Series id; only known once the channel is Writable.
pub series: Option<SeriesId>,
pub channel_connected_info: ChannelConnectedInfo,
pub scalar_type: Option<ScalarType>,
pub shape: Option<Shape>,
// NOTE: this solution can yield to the same Instant serialize to different string representations.
// #[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "ser_instant")]
pub ts_created: Option<Instant>,
// #[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "ser_instant")]
pub ts_event_last: Option<Instant>,
pub recv_count: Option<u64>,
pub recv_bytes: Option<u64>,
// #[serde(skip_serializing_if = "Option::is_none")]
/// EMA of the interval between received items; None while the EMA has no samples.
pub item_recv_ivl_ema: Option<f32>,
/// 1 / clamped item_recv_ivl_ema: larger for more frequently updating channels.
pub interest_score: f32,
}
mod ser_instant {
    //! Serde helper for `Option<Instant>`: an `Instant` has no absolute epoch, so
    //! it is projected onto the wall clock by offsetting `chrono::Utc::now()` by the
    //! distance between the value and `Instant::now()`, then formatted as UTC text.
    use super::*;
    use serde::Deserializer;
    use serde::Serializer;

    /// Serialize as e.g. `"2024-01-19T15:23:56.123Z"`, or as none for `None`.
    pub fn serialize<S>(val: &Option<Instant>, ser: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        match val {
            Some(val) => {
                let now = chrono::Utc::now();
                let tsnow = Instant::now();
                let t1 = if tsnow >= *val {
                    // Instant lies in the past: subtract its distance from wall-clock now.
                    let dur = tsnow.duration_since(*val);
                    let dur2 = chrono::Duration::seconds(dur.as_secs() as i64)
                        .checked_add(&chrono::Duration::microseconds(dur.subsec_micros() as i64))
                        .unwrap();
                    now.checked_sub_signed(dur2).unwrap()
                } else {
                    // Instant lies in the future: add its distance to wall-clock now.
                    // BUGFIX: the sub-second microseconds must be ADDED to the whole
                    // seconds (this mirrors the past-branch above); it was `checked_sub`,
                    // which produced a duration smaller than the real offset.
                    let dur = (*val).duration_since(tsnow);
                    let dur2 = chrono::Duration::seconds(dur.as_secs() as i64)
                        .checked_add(&chrono::Duration::microseconds(dur.subsec_micros() as i64))
                        .unwrap();
                    now.checked_add_signed(dur2).unwrap()
                };
                //info!("formatting {:?}", t1);
                let s = t1.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
                //info!("final string {:?}", s);
                ser.serialize_str(&s)
            }
            None => ser.serialize_none(),
        }
    }

    /// Deserialization is not implemented; always returns an error.
    pub fn deserialize<'de, D>(de: D) -> Result<Option<Instant>, D::Error>
    where
        D: Deserializer<'de>,
    {
        // Consume the deserializer explicitly to avoid an unused-parameter warning.
        let _ = de;
        let e = serde::de::Error::custom("todo deserialize for ser_instant");
        Err(e)
    }
}
/// Client-generated channel id (CA "cid"), allocated by `CidStore`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Cid(pub u32);
/// Client-generated monitor subscription id, allocated by `SubidStore`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Subid(pub u32);
impl Subid {
pub fn to_u32(&self) -> u32 {
self.0
}
}
/// Channel id used with the peer (CA "sid"; presumably assigned by the IOC
/// in the CreateChan reply — confirm where CreatedState is built).
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Sid(pub u32);
impl Sid {
pub fn to_u32(&self) -> u32 {
self.0
}
}
/// IO operation id for read requests; mapped back to channels via `read_ioids`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Ioid(pub u32);
/// Error states a channel can end up in.
#[derive(Clone, Debug)]
enum ChannelError {
CreateChanFail(ChannelStatusSeriesId),
}
/// CreateChan request sent; waiting for the peer's reply.
#[derive(Debug, Clone)]
struct CreatingState {
tsbeg: Instant,
cssid: ChannelStatusSeriesId,
cid: Cid,
}
/// Channel created; waiting for a `SeriesWriter` to be established
/// (result arrives via `handle_writer_establish_result`).
#[derive(Debug, Clone)]
struct MakingSeriesWriterState {
tsbeg: Instant,
channel: CreatedState,
}
/// EventAdd sent; waiting for the first monitor update.
#[derive(Debug, Clone)]
struct EnableMonitoringState {
tsbeg: Instant,
subid: Subid,
}
/// Receiving monitor updates.
#[derive(Debug, Clone)]
struct MonitoringState {
tsbeg: Instant,
subid: Subid,
}
/// EventCancel sent to switch from monitoring to polling (see `transition_to_polling`).
#[derive(Debug, Clone)]
struct StopMonitoringForPollingState {
tsbeg: Instant,
}
/// Data acquired via periodic reads instead of a monitor subscription.
#[derive(Debug, Clone)]
struct PollingState {
tsbeg: Instant,
poll_ivl: Duration,
tick: PollTickState,
}
/// Whether a poll read is currently outstanding (Wait holds its Ioid).
#[derive(Debug, Clone)]
enum PollTickState {
Idle(Instant),
Wait(Instant, Ioid),
}
/// Channel fully established: a series writer is present and data can be ingested.
#[derive(Debug)]
struct WritableState {
tsbeg: Instant,
channel: CreatedState,
writer: SeriesWriter,
reading: ReadingState,
}
/// How data is currently acquired for a Writable channel.
#[derive(Debug, Clone)]
enum ReadingState {
EnableMonitoring(EnableMonitoringState),
Monitoring(MonitoringState),
StopMonitoringForPolling(StopMonitoringForPollingState),
Polling(PollingState),
}
/// Per-channel bookkeeping once the CA channel exists on the peer.
#[derive(Debug, Clone)]
struct CreatedState {
cssid: ChannelStatusSeriesId,
cid: Cid,
sid: Sid,
/// DBR type/count for this channel; echoed back in EventAdd and EventCancel.
ca_dbr_type: u16,
ca_dbr_count: u16,
ts_created: Instant,
/// Last time anything was received for this channel (used by `check_channels_alive`).
ts_alive_last: Instant,
// NOTE(review): the ts_msp_* / inserted_in_ts_msp fields look like
// insert-partitioning bookkeeping — confirm against the ingest code paths.
ts_msp_last: u64,
ts_msp_grid_last: u32,
inserted_in_ts_msp: u64,
insert_item_ivl_ema: IntervalEma,
item_recv_ivl_ema: IntervalEma,
insert_recv_ivl_last: Instant,
insert_next_earliest: Instant,
muted_before: u32,
insert_ivl_min_mus: u32,
/// Minute-aligned timestamp of the last ChannelInfo insert (see `emit_channel_info_insert_items`).
info_store_msp_last: u32,
recv_count: u64,
recv_bytes: u64,
/// 10-second window used to rate-limit events (see `handle_event_add_res`).
stwin_ts: u64,
stwin_count: u32,
stwin_bytes: u32,
}
impl CreatedState {
    /// Inert placeholder used with `std::mem::replace` to move a real
    /// `CreatedState` out of one channel-state variant without cloning.
    fn dummy() -> Self {
        let now = Instant::now();
        Self {
            cid: Cid(0),
            sid: Sid(0),
            cssid: ChannelStatusSeriesId::new(0),
            ca_dbr_count: 0,
            ca_dbr_type: 0,
            ts_created: now,
            ts_alive_last: now,
            inserted_in_ts_msp: 0,
            ts_msp_last: 0,
            ts_msp_grid_last: 0,
            item_recv_ivl_ema: IntervalEma::new(),
            insert_item_ivl_ema: IntervalEma::new(),
            insert_recv_ivl_last: now,
            insert_next_earliest: now,
            insert_ivl_min_mus: 0,
            muted_before: 0,
            info_store_msp_last: 0,
            recv_count: 0,
            recv_bytes: 0,
            stwin_ts: 0,
            stwin_bytes: 0,
            stwin_count: 0,
        }
    }
}
/// Lifecycle of one channel on this connection:
/// Init -> Creating -> MakingSeriesWriter -> Writable, terminating in Ended or Error.
#[derive(Debug)]
enum ChannelState {
Init(ChannelStatusSeriesId),
Creating(CreatingState),
MakingSeriesWriter(MakingSeriesWriterState),
Writable(WritableState),
Error(ChannelError),
Ended(ChannelStatusSeriesId),
}
impl ChannelState {
fn to_info(&self, cssid: ChannelStatusSeriesId, addr: SocketAddrV4) -> ChannelStateInfo {
let channel_connected_info = match self {
ChannelState::Init(..) => ChannelConnectedInfo::Disconnected,
ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting,
ChannelState::MakingSeriesWriter(_) => ChannelConnectedInfo::Connecting,
ChannelState::Writable(_) => ChannelConnectedInfo::Connected,
ChannelState::Error(_) => ChannelConnectedInfo::Error,
ChannelState::Ended(_) => ChannelConnectedInfo::Disconnected,
};
let scalar_type = match self {
ChannelState::Writable(s) => Some(s.writer.scalar_type().clone()),
_ => None,
};
let shape = match self {
ChannelState::Writable(s) => Some(s.writer.shape().clone()),
_ => None,
};
let ts_created = match self {
ChannelState::Writable(s) => Some(s.channel.ts_created.clone()),
_ => None,
};
let ts_event_last = match self {
ChannelState::Writable(s) => Some(s.channel.ts_alive_last),
_ => None,
};
let recv_count = match self {
ChannelState::Writable(s) => Some(s.channel.recv_count),
_ => None,
};
let recv_bytes = match self {
ChannelState::Writable(s) => Some(s.channel.recv_bytes),
_ => None,
};
let item_recv_ivl_ema = match self {
ChannelState::Writable(s) => {
let ema = s.channel.item_recv_ivl_ema.ema();
if ema.update_count() == 0 {
None
} else {
Some(ema.ema())
}
}
_ => None,
};
let series = match self {
ChannelState::Writable(s) => Some(s.writer.sid()),
_ => None,
};
let interest_score = 1. / item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10);
ChannelStateInfo {
cssid,
addr,
series,
channel_connected_info,
scalar_type,
shape,
ts_created,
ts_event_last,
recv_count,
recv_bytes,
item_recv_ivl_ema,
interest_score,
}
}
fn cssid(&self) -> ChannelStatusSeriesId {
match self {
ChannelState::Init(cssid) => cssid.clone(),
ChannelState::Creating(st) => st.cssid.clone(),
ChannelState::MakingSeriesWriter(st) => st.channel.cssid.clone(),
ChannelState::Writable(st) => st.channel.cssid.clone(),
ChannelState::Error(e) => match e {
ChannelError::CreateChanFail(cssid) => cssid.clone(),
},
ChannelState::Ended(cssid) => cssid.clone(),
}
}
}
/// Connection-level state machine of the TCP/CA session.
enum CaConnState {
Unconnected(Instant),
/// Connect attempt in flight: start time, peer address, and the timeout-wrapped connect future.
Connecting(
Instant,
SocketAddrV4,
Pin<Box<dyn Future<Output = Result<Result<TcpStream, std::io::Error>, tokio::time::error::Elapsed>> + Send>>,
),
Init,
Handshake,
PeerReady,
/// Delay future (see `wait_fut`); presumably the reconnect backoff — confirm at the use site.
Wait(Pin<Box<dyn Future<Output = ()> + Send>>),
Shutdown,
EndOfStream,
}
impl fmt::Debug for CaConnState {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Unconnected(since) => fmt.debug_tuple("Unconnected").field(since).finish(),
Self::Connecting(since, addr, _) => fmt.debug_tuple("Connecting").field(since).field(addr).finish(),
Self::Init => write!(fmt, "Init"),
Self::Handshake => write!(fmt, "Handshake"),
Self::PeerReady => write!(fmt, "PeerReady"),
Self::Wait(_) => fmt.debug_tuple("Wait").finish(),
Self::Shutdown => write!(fmt, "Shutdown"),
Self::EndOfStream => write!(fmt, "EndOfStream"),
}
}
}
/// Boxed sleep future delaying `dt` milliseconds (used for `CaConnState::Wait`).
fn wait_fut(dt: u64) -> Pin<Box<dyn Future<Output = ()> + Send>> {
    Box::pin(tokio::time::sleep(Duration::from_millis(dt)))
}
/// Allocator for client-side channel ids.
struct CidStore {
    cnt: u32,
    rng: Xoshiro128PlusPlus,
    reg: HashSet<u32>,
}
impl CidStore {
    /// When true, `next` mixes a random low byte into each id (the counter is
    /// shifted into the upper bits). Currently disabled: plain sequential ids.
    /// Replaces the previous `if true { ... return }` dead-code construct.
    const RANDOMIZE: bool = false;
    fn new(seed: u32) -> Self {
        Self {
            cnt: 0,
            rng: Xoshiro128PlusPlus::seed_from_u64(seed as _),
            reg: HashSet::new(),
        }
    }
    /// Seed the store from the sub-second part of the current wall clock.
    fn new_from_time() -> Self {
        Self::new(
            SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .subsec_nanos(),
        )
    }
    /// Return the next channel id.
    fn next(&mut self) -> Cid {
        if !Self::RANDOMIZE {
            let cnt = self.cnt;
            self.cnt += 1;
            return Cid(cnt);
        }
        let c = self.cnt << 8;
        self.cnt += 1;
        let r = self.rng.next_u32();
        Cid(c | ((r ^ (r >> 8) ^ (r >> 16) ^ (r >> 24)) & 0xff))
    }
}
/// Allocator for client-side monitor subscription ids.
struct SubidStore {
    cnt: u32,
    rng: Xoshiro128PlusPlus,
}
impl SubidStore {
    /// When true, `next` folds random bits into each id (the counter is shifted
    /// into the upper bits). Currently disabled: plain sequential ids.
    /// Replaces the previous `if true { ... return }` dead-code construct.
    const RANDOMIZE: bool = false;
    fn new(seed: u32) -> Self {
        Self {
            cnt: 0,
            rng: Xoshiro128PlusPlus::seed_from_u64(seed as _),
        }
    }
    /// Seed the store from the sub-second part of the current wall clock.
    fn new_from_time() -> Self {
        Self::new(
            SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .unwrap()
                .subsec_nanos(),
        )
    }
    /// Return the next subscription id.
    fn next(&mut self) -> Subid {
        if !Self::RANDOMIZE {
            let cnt = self.cnt;
            self.cnt += 1;
            return Subid(cnt);
        }
        let c = self.cnt << 8;
        self.cnt += 1;
        let r = self.rng.next_u32();
        let r = r ^ (r >> 8);
        let r = r ^ (r >> 8);
        let r = r ^ (r >> 8);
        Subid(c | r)
    }
}
/// Map a wall-clock timestamp to a minute-aligned epoch-seconds value,
/// used to partition channel-info inserts. Times before the epoch yield 0.
fn info_store_msp_from_time(ts: SystemTime) -> u32 {
    let secs = ts
        .duration_since(SystemTime::UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);
    (secs - secs % 60) as u32
}
/// Reply channel for commands that report success or failure.
pub type CmdResTx = Sender<Result<(), Error>>;
/// The operations a `ConnCommand` can request.
#[derive(Debug)]
pub enum ConnCommandKind {
ChannelAdd(String, ChannelStatusSeriesId),
ChannelClose(String),
Shutdown,
}
/// Command sent into a `CaConn` through its command queue; `id` is process-unique.
#[derive(Debug)]
pub struct ConnCommand {
id: usize,
kind: ConnCommandKind,
}
impl ConnCommand {
pub fn channel_add(name: String, cssid: ChannelStatusSeriesId) -> Self {
Self {
id: Self::make_id(),
kind: ConnCommandKind::ChannelAdd(name, cssid),
}
}
pub fn channel_close(name: String) -> Self {
Self {
id: Self::make_id(),
kind: ConnCommandKind::ChannelClose(name),
}
}
pub fn shutdown() -> Self {
Self {
id: Self::make_id(),
kind: ConnCommandKind::Shutdown,
}
}
fn make_id() -> usize {
static ID: AtomicUsize = AtomicUsize::new(0);
ID.fetch_add(1, atomic::Ordering::AcqRel)
}
pub fn id(&self) -> usize {
self.id
}
}
/// Partial batch of per-channel status snapshots, keyed by status-series id.
#[derive(Debug)]
pub struct ChannelStatusPartial {
pub channel_statuses: BTreeMap<ChannelStatusSeriesId, ChannelStateInfo>,
}
/// Result payload kinds for `ConnCommandResult` (currently a placeholder).
#[derive(Debug)]
pub enum ConnCommandResultKind {
Unused,
}
/// Result of a `ConnCommand`, matched to the request by `id`.
#[derive(Debug)]
pub struct ConnCommandResult {
pub id: usize,
pub kind: ConnCommandResultKind,
}
impl ConnCommandResult {
pub fn id(&self) -> usize {
self.id
}
// NOTE(review): appears unused in this chunk; mirrors ConnCommand::make_id.
fn make_id() -> usize {
static ID: AtomicUsize = AtomicUsize::new(0);
ID.fetch_add(1, atomic::Ordering::AcqRel)
}
}
/// Events a `CaConn` emits towards its owner.
#[derive(Debug)]
pub enum CaConnEventValue {
None,
/// The IOC did not answer an Echo in time (see `check_channels_alive`).
EchoTimeout,
ConnCommandResult(ConnCommandResult),
ChannelStatus(ChannelStatusPartial),
QueryItem(QueryItem),
ChannelCreateFail(String),
EndOfStream,
/// Emitted when shutdown is triggered with `ChannelStatusClosedReason::ConnectFail`.
ConnectFail,
}
/// Timestamped event from a `CaConn`.
#[derive(Debug)]
pub struct CaConnEvent {
pub ts: Instant,
pub value: CaConnEventValue,
}
/// Tunables for a `CaConn`.
pub struct CaConnOpts {
    // Maximum number of items buffered for storage insertion.
    insert_queue_max: usize,
    // Arrays longer than this are truncated on ingest.
    array_truncate: usize,
}
impl CaConnOpts {
    /// Builder-style setter for the insert queue capacity.
    pub fn with_insert_queue_max(mut self, val: usize) -> Self {
        self.insert_queue_max = val;
        self
    }
    /// Builder-style setter for the array truncation length.
    /// Added for consistency: previously only `insert_queue_max` had a setter.
    pub fn with_array_truncate(mut self, val: usize) -> Self {
        self.array_truncate = val;
        self
    }
}
impl Default for CaConnOpts {
    fn default() -> Self {
        Self {
            insert_queue_max: 20000,
            array_truncate: 2000,
        }
    }
}
/// One Channel Access TCP connection to an IOC, multiplexing many channels.
/// Drives the protocol state machine and queues insert items for storage.
pub struct CaConn {
opts: CaConnOpts,
backend: String,
state: CaConnState,
ticker: Pin<Box<tokio::time::Sleep>>,
proto: Option<CaProto>,
// Id allocators for client-side channel / subscription ids.
cid_store: CidStore,
subid_store: SubidStore,
channels: HashMap<Cid, ChannelState>,
// btree because require order:
cid_by_name: BTreeMap<String, Cid>,
cid_by_subid: HashMap<Subid, Cid>,
cid_by_sid: HashMap<Sid, Cid>,
name_by_cid: HashMap<Cid, String>,
channel_status_emit_last: Instant,
tick_last_writer: Instant,
init_state_count: u64,
// Items waiting to be handed to the storage insert sender.
insert_item_queue: VecDeque<QueryItem>,
remote_addr_dbg: SocketAddrV4,
local_epics_hostname: String,
stats: Arc<CaConnStats>,
insert_ivl_min_mus: u32,
// Self-owned command channel; `conn_command_tx()` hands out clones of the sender.
conn_command_tx: Pin<Box<Sender<ConnCommand>>>,
conn_command_rx: Pin<Box<Receiver<ConnCommand>>>,
// Reconnect backoff factor, grown via tanh in `backoff_next`.
conn_backoff: f32,
conn_backoff_beg: f32,
extra_inserts_conf: ExtraInsertsConf,
// Echo/ping bookkeeping (see `check_channels_alive`).
ioc_ping_last: Instant,
ioc_ping_next: Instant,
ioc_ping_start: Option<Instant>,
storage_insert_sender: Pin<Box<SenderPolling<VecDeque<QueryItem>>>>,
// Outgoing events towards the owner of this connection.
ca_conn_event_out_queue: VecDeque<CaConnEvent>,
ca_conn_event_out_queue_max: usize,
thr_msg_poll: ThrottleTrace,
ca_proto_stats: Arc<CaProtoStats>,
weird_count: usize,
rng: Xoshiro128PlusPlus,
// Series-writer establishment: jobs out, results back (the JobId encodes the Cid).
writer_establish_qu: VecDeque<EstablishWorkerJob>,
writer_establish_tx: Pin<Box<SenderPolling<EstablishWorkerJob>>>,
writer_tx: Sender<(JobId, Result<SeriesWriter, serieswriter::writer::Error>)>,
writer_rx: Pin<Box<Receiver<(JobId, Result<SeriesWriter, serieswriter::writer::Error>)>>>,
// Wall-clock / monotonic "now", presumably refreshed during polling — confirm in poll loop.
tmp_ts_poll: SystemTime,
poll_tsnow: Instant,
// Next io id for read requests, and the map back to the owning channel.
ioid: u32,
read_ioids: HashMap<Ioid, Cid>,
}
impl Drop for CaConn {
    /// Log connection teardown, keyed by the remote IOC address.
    fn drop(&mut self) {
        let addr = self.remote_addr_dbg;
        debug!("drop CaConn {}", addr);
    }
}
impl CaConn {
/// Construct a connection handler for the IOC at `remote_addr_dbg`.
/// No I/O happens here: the state starts at `Unconnected` and is driven later.
pub fn new(
opts: CaConnOpts,
backend: String,
remote_addr_dbg: SocketAddrV4,
local_epics_hostname: String,
storage_insert_tx: Sender<VecDeque<QueryItem>>,
channel_info_query_tx: Sender<ChannelInfoQuery>,
stats: Arc<CaConnStats>,
ca_proto_stats: Arc<CaProtoStats>,
writer_establish_tx: Sender<EstablishWorkerJob>,
) -> Self {
// Accepted for interface compatibility; currently unused.
let _ = channel_info_query_tx;
let tsnow = Instant::now();
// Results of writer-establish jobs come back over this pair.
let (writer_tx, writer_rx) = async_channel::bounded(32);
// Self-owned command channel; the sender side is exposed via `conn_command_tx()`.
let (cq_tx, cq_rx) = async_channel::bounded(32);
let mut rng = stats::xoshiro_from_time();
Self {
opts,
backend,
state: CaConnState::Unconnected(tsnow),
ticker: Self::new_self_ticker(),
proto: None,
cid_store: CidStore::new_from_time(),
subid_store: SubidStore::new_from_time(),
init_state_count: 0,
channels: HashMap::new(),
cid_by_name: BTreeMap::new(),
cid_by_subid: HashMap::new(),
cid_by_sid: HashMap::new(),
name_by_cid: HashMap::new(),
channel_status_emit_last: tsnow,
tick_last_writer: tsnow,
insert_item_queue: VecDeque::new(),
remote_addr_dbg,
local_epics_hostname,
stats,
insert_ivl_min_mus: 1000 * 4,
conn_command_tx: Box::pin(cq_tx),
conn_command_rx: Box::pin(cq_rx),
conn_backoff: 0.02,
conn_backoff_beg: 0.02,
extra_inserts_conf: ExtraInsertsConf::new(),
ioc_ping_last: tsnow,
// First ping due after a jittered interval to avoid synchronized pings.
ioc_ping_next: tsnow + Self::ioc_ping_ivl_rng(&mut rng),
ioc_ping_start: None,
storage_insert_sender: Box::pin(SenderPolling::new(storage_insert_tx)),
ca_conn_event_out_queue: VecDeque::new(),
ca_conn_event_out_queue_max: 2000,
thr_msg_poll: ThrottleTrace::new(Duration::from_millis(10000)),
ca_proto_stats,
weird_count: 0,
rng,
writer_establish_qu: VecDeque::new(),
writer_establish_tx: Box::pin(SenderPolling::new(writer_establish_tx)),
writer_tx,
writer_rx: Box::pin(writer_rx),
tmp_ts_poll: SystemTime::now(),
poll_tsnow: tsnow,
ioid: 100,
read_ioids: HashMap::new(),
}
}
/// Jittered ping interval: `IOC_PING_IVL * 100 / (70..=129)`, i.e. roughly
/// 0.78x to 1.43x of the base interval, so pings do not synchronize across IOCs.
fn ioc_ping_ivl_rng(rng: &mut Xoshiro128PlusPlus) -> Duration {
    let denom = 70 + (rng.next_u32() % 60);
    IOC_PING_IVL * 100 / denom
}
/// Fresh 500 ms sleep used as the connection's internal tick timer.
fn new_self_ticker() -> Pin<Box<tokio::time::Sleep>> {
    let sleep = tokio::time::sleep(Duration::from_millis(500));
    Box::pin(sleep)
}
/// Hand out a clone of the sender side of this connection's command queue.
pub fn conn_command_tx(&self) -> Sender<ConnCommand> {
    let tx = self.conn_command_tx.as_ref().get_ref();
    tx.clone()
}
/// True once the connection has entered its terminal Shutdown state
/// (see `trigger_shutdown`). Uses `matches!` instead of a manual if-let.
fn is_shutdown(&self) -> bool {
    matches!(self.state, CaConnState::Shutdown)
}
/// Move the connection into `Shutdown`, drop the protocol handler, emit a
/// `ConnectFail` event when that is the reason, mark all channels as ended,
/// and queue a connection-status insert item.
fn trigger_shutdown(&mut self, channel_reason: ChannelStatusClosedReason) {
self.state = CaConnState::Shutdown;
self.proto = None;
// Only ConnectFail produces an out-event here; the exhaustive match keeps
// the compiler enforcing a decision for every future reason variant.
match &channel_reason {
ChannelStatusClosedReason::ShutdownCommand => {}
ChannelStatusClosedReason::ChannelRemove => {}
ChannelStatusClosedReason::ProtocolError => {}
ChannelStatusClosedReason::FrequencyQuota => {}
ChannelStatusClosedReason::BandwidthQuota => {}
ChannelStatusClosedReason::InternalError => {}
ChannelStatusClosedReason::IocTimeout => {}
ChannelStatusClosedReason::NoProtocol => {}
ChannelStatusClosedReason::ProtocolDone => {}
ChannelStatusClosedReason::ConnectFail => {
debug!("emit status ConnectFail");
let item = CaConnEvent {
ts: Instant::now(),
value: CaConnEventValue::ConnectFail,
};
self.ca_conn_event_out_queue.push_back(item);
}
}
// Per-channel status transitions and insert items.
self.channel_state_on_shutdown(channel_reason);
let addr = self.remote_addr_dbg.clone();
self.insert_item_queue
.push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
ts: self.tmp_ts_poll,
addr,
// TODO map to appropriate status
status: ConnectionStatus::Closing,
}));
}
/// Legacy health-check command handler: currently just runs `check_channels_alive`
/// and shuts the connection down on internal error.
fn cmd_check_health(&mut self) {
// TODO
// no longer in use.
// CaConn emits health updates by iteself.
// Make sure that we do also the checks here on regular intervals.
trace!("cmd_check_health");
// TODO
// what actions are taken here?
// what status is modified here?
match self.check_channels_alive() {
Ok(_) => {}
Err(e) => {
error!("{e}");
self.trigger_shutdown(ChannelStatusClosedReason::InternalError);
}
}
// TODO
// Time this, is it fast enough?
// let mut kit = self.cid_by_name.values();
// if let Some(mut kk) = kit.next().map(Clone::clone) {
// let mut start = Some(kk.clone());
// if let Some(last) = self.channel_status_last_done.take() {
// while kk <= last {
// kk = if let Some(x) = kit.next().map(Clone::clone) {
// start = Some(x.clone());
// x
// } else {
// start = None;
// break;
// };
// }
// }
// if let Some(mut kk) = start {
// loop {
// kk = if let Some(x) = kit.next().map(Clone::clone) {
// x
// } else {
// break;
// };
// }
// } else {
// // Nothing to do, will continue on next call from front.
// }
// }
// while let Some(kk) = kit.next() {}
// let mut channel_statuses = BTreeMap::new();
// for (k, v) in self.channels.iter() {
// let info = v.to_info(v.cssid(), self.remote_addr_dbg);
// channel_statuses.insert(v.cssid(), info);
// }
}
/// Collect channel names on this connection matching the given regex pattern.
/// An invalid pattern yields an empty result. The result is not yet delivered
/// anywhere (see TODO). Fixes: needless `&pattern` borrow; explicit result
/// type; explicit discard instead of an unused-variable warning.
fn cmd_find_channel(&self, pattern: &str) {
    let res: Vec<String> = if let Ok(re) = regex::Regex::new(pattern) {
        self.name_by_cid
            .values()
            .filter(|x| re.is_match(x))
            .map(ToString::to_string)
            .collect()
    } else {
        Vec::new()
    };
    // TODO return the result
    let _ = res;
}
/// Look up the state snapshot for a single channel by name (None if unknown
/// or without state). The result is not yet delivered anywhere (see TODO).
fn cmd_channel_state(&self, name: String) {
let res = match self.cid_by_name(&name) {
Some(cid) => match self.channels.get(&cid) {
Some(state) => Some(state.to_info(state.cssid(), self.remote_addr_dbg.clone())),
None => None,
},
None => None,
};
let msg = (self.remote_addr_dbg.clone(), res);
if msg.1.is_some() {
info!("Sending back {msg:?}");
}
// TODO return the result
}
/// Snapshot the state info of every channel on this connection.
/// The result is not yet delivered anywhere (see TODO). Fixes: iterate
/// `.values()` instead of binding an unused `cid`; explicit discard instead
/// of an unused-variable warning.
fn cmd_channel_states_all(&self) {
    let res: Vec<_> = self
        .channels
        .values()
        .map(|state| state.to_info(state.cssid(), self.remote_addr_dbg.clone()))
        .collect();
    let msg = (self.remote_addr_dbg.clone(), res);
    // TODO return the result
    let _ = msg;
}
/// Command handler: register a channel (delegates to `channel_add`).
fn cmd_channel_add(&mut self, name: String, cssid: ChannelStatusSeriesId) {
self.channel_add(name, cssid);
}
/// Command handler: close a channel (delegates to `channel_close`).
fn cmd_channel_close(&mut self, name: String) {
self.channel_close(name);
// TODO return the result
//self.stats.caconn_command_can_not_reply.inc();
}
/// Command handler: shut this connection down on user request.
fn cmd_shutdown(&mut self) {
debug!("cmd_shutdown {}", self.remote_addr_dbg);
self.trigger_shutdown(ChannelStatusClosedReason::ShutdownCommand);
}
/// Command handler: replace the extra-inserts configuration.
fn cmd_extra_inserts_conf(&mut self, extra_inserts_conf: ExtraInsertsConf) {
self.extra_inserts_conf = extra_inserts_conf;
// TODO return the result
}
/// Command handler: queue channel-info insert items (see `emit_channel_info_insert_items`).
fn cmd_save_conn_info(&mut self) {
let res = self.emit_channel_info_insert_items();
// TODO return the result
}
/// Poll the command queue once and dispatch at most one command.
/// Returns Ready(Some(())) when a command was handled, Ready(None) when the
/// connection is shut down or the queue is closed, Pending otherwise.
fn handle_conn_command(&mut self, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
use Poll::*;
self.stats.loop3_count.inc();
if self.is_shutdown() {
Ok(Ready(None))
} else {
let rx = self.conn_command_rx.as_mut();
match rx.poll_next(cx) {
Ready(Some(a)) => {
trace3!("handle_conn_command received a command {}", self.remote_addr_dbg);
match a.kind {
ConnCommandKind::ChannelAdd(name, cssid) => {
self.cmd_channel_add(name, cssid);
Ok(Ready(Some(())))
}
ConnCommandKind::ChannelClose(name) => {
self.cmd_channel_close(name);
Ok(Ready(Some(())))
}
ConnCommandKind::Shutdown => {
self.cmd_shutdown();
Ok(Ready(Some(())))
}
}
}
Ready(None) => {
error!("command queue closed");
Ok(Ready(None))
}
Pending => Ok(Pending),
}
}
}
/// Poll for completed series-writer establishment jobs and apply the result.
/// A failed establishment is escalated as an error to the caller.
fn handle_writer_establish_result(&mut self, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
use Poll::*;
if self.is_shutdown() {
Ok(Ready(None))
} else {
let rx = self.writer_rx.as_mut();
match rx.poll_next(cx) {
Ready(Some(res)) => {
trace!("handle_writer_establish_result recv {}", self.remote_addr_dbg);
let jobid = res.0;
// by convention:
let cid = Cid(jobid.0 as _);
match res.1 {
Ok(wr) => {
self.handle_writer_establish_inner(cid, wr)?;
Ok(Ready(Some(())))
}
Err(e) => Err(Error::from_string(e.to_string())),
}
}
Ready(None) => {
error!("writer_establish queue closed");
Ok(Ready(None))
}
Pending => Ok(Pending),
}
}
}
/// A `SeriesWriter` was established for `cid`: emit a ChannelStatus::Opened
/// item, allocate a subscription id, send EventAdd to start monitoring, and
/// transition the channel from MakingSeriesWriter to Writable.
fn handle_writer_establish_inner(&mut self, cid: Cid, writer: SeriesWriter) -> Result<(), Error> {
trace!("handle_writer_establish_inner {cid:?}");
// At this point we have created the channel and created a writer for that type and sid.
// We do not yet monitor.
// TODO main objectives now:
// Store the writer with the channel state.
// Create a monitor for the channel.
// NOTE: must store the Writer even if not yet in Evented, we could also transition to Polled!
if let Some(chst) = self.channels.get_mut(&cid) {
if let ChannelState::MakingSeriesWriter(st2) = chst {
self.stats.get_series_id_ok.inc();
{
let item = QueryItem::ChannelStatus(ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: st2.channel.cssid.clone(),
status: ChannelStatus::Opened,
});
self.insert_item_queue.push_back(item);
}
let subid = {
let subid = self.subid_store.next();
self.cid_by_subid.insert(subid, cid);
trace!(
"new {:?} for {:?} chst {:?} {:?}",
subid,
cid,
st2.channel.cid,
st2.channel.sid
);
subid
};
{
trace!("send out EventAdd for {cid:?}");
let ty = CaMsgTy::EventAdd(EventAdd {
sid: st2.channel.sid.to_u32(),
data_type: st2.channel.ca_dbr_type,
data_count: st2.channel.ca_dbr_count,
subid: subid.to_u32(),
});
let msg = CaMsg::from_ty_ts(ty, self.poll_tsnow);
let proto = self.proto.as_mut().unwrap();
proto.push_out(msg);
}
// Move the CreatedState out of the old variant; a dummy takes its place
// until the whole state is overwritten just below.
let created_state = WritableState {
tsbeg: self.poll_tsnow,
channel: std::mem::replace(&mut st2.channel, CreatedState::dummy()),
writer,
reading: ReadingState::EnableMonitoring(EnableMonitoringState {
tsbeg: self.poll_tsnow,
subid,
}),
};
*chst = ChannelState::Writable(created_state);
Ok(())
} else {
warn!("TODO handle_series_lookup_result channel in bad state, reset");
Ok(())
}
} else {
warn!("TODO handle_series_lookup_result channel in bad state, reset");
Ok(())
}
}
/// Shared handle to this connection's statistics counters.
pub fn stats(&self) -> Arc<CaConnStats> {
    // Explicit Arc::clone: a cheap refcount bump, not a data copy.
    Arc::clone(&self.stats)
}
/// Register a channel for acquisition; the channel starts in the `Init` state.
/// A name that is already registered is ignored.
pub fn channel_add(&mut self, channel: String, cssid: ChannelStatusSeriesId) {
if self.cid_by_name(&channel).is_some() {
// TODO count for metrics
return;
}
let cid = self.cid_by_name_or_insert(&channel);
if self.channels.contains_key(&cid) {
error!("logic error channel already exists {channel}");
} else {
self.channels.insert(cid, ChannelState::Init(cssid));
// TODO do not count, use separate queue for those channels.
self.init_state_count += 1;
}
}
/// Close a channel. Not yet implemented; currently only logs the request.
pub fn channel_close(&mut self, name: String) {
error!("TODO actually cause the channel to get closed and removed {}", name);
}
/// Remove a channel and all its index entries; warn when the name is unknown.
pub fn channel_remove(&mut self, name: String) {
    match self.cid_by_name(&name) {
        Some(cid) => self.channel_remove_by_cid(cid),
        None => warn!("channel_remove does not exist {name}"),
    }
}
/// Drop every index entry pointing at this cid, then the channel state itself.
fn channel_remove_by_cid(&mut self, cid: Cid) {
    self.channels.remove(&cid);
    self.name_by_cid.remove(&cid);
    self.cid_by_name.retain(|_name, v| *v != cid);
}
/// Look up the client channel id for a name.
/// `Cid` is `Copy`, so use `copied()` instead of `map(Clone::clone)`.
fn cid_by_name(&self, name: &str) -> Option<Cid> {
    self.cid_by_name.get(name).copied()
}
/// Return the cid for `name`, allocating one and registering both name->cid
/// and cid->name mappings when the name is new.
fn cid_by_name_or_insert(&mut self, name: &str) -> Cid {
    match self.cid_by_name.get(name) {
        Some(&cid) => cid,
        None => {
            // Not using the entry API: it would force an owned key
            // allocation even on lookup hits.
            let cid = self.cid_store.next();
            self.cid_by_name.insert(name.into(), cid);
            self.name_by_cid.insert(cid, name.into());
            cid
        }
    }
}
/// Reverse lookup: channel name for a cid, if known.
fn name_by_cid(&self, cid: Cid) -> Option<&str> {
    self.name_by_cid.get(&cid).map(String::as_str)
}
fn backoff_next(&mut self) -> u64 {
let dt = (self.conn_backoff * 300. * 1e3) as u64;
self.conn_backoff = (self.conn_backoff * 2.).tanh();
dt
}
fn backoff_reset(&mut self) {
self.conn_backoff = self.conn_backoff_beg;
}
/// Move every channel into `Ended` as part of connection shutdown, emitting a
/// `ChannelStatus::Closed` insert item for channels that were Writable.
fn channel_state_on_shutdown(&mut self, channel_reason: ChannelStatusClosedReason) {
// TODO can I reuse emit_channel_info_insert_items ?
trace!("channel_state_on_shutdown channels {}", self.channels.len());
for (_cid, chst) in &mut self.channels {
match chst {
ChannelState::Init(cssid) => {
*chst = ChannelState::Ended(cssid.clone());
}
ChannelState::Creating(st2) => {
*chst = ChannelState::Ended(st2.cssid.clone());
}
ChannelState::MakingSeriesWriter(st) => {
*chst = ChannelState::Ended(st.channel.cssid.clone());
}
ChannelState::Writable(st2) => {
let cssid = st2.channel.cssid.clone();
// TODO should call the proper channel-close handler which in turn emits the status item.
// Make sure I record the reason for the "Close": user command, IOC error, etc..
let item = QueryItem::ChannelStatus(ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: cssid.clone(),
status: ChannelStatus::Closed(channel_reason.clone()),
});
self.insert_item_queue.push_back(item);
*chst = ChannelState::Ended(cssid);
}
ChannelState::Error(..) => {
warn!("TODO emit error status");
// *chst = ChannelState::Ended;
}
ChannelState::Ended(_) => {}
}
}
}
/// Periodic health check:
/// - if an Echo (ping) has been outstanding for more than 4 s, emit EchoTimeout
///   and shut the connection down;
/// - otherwise, once the jittered ping interval elapsed, send an Echo (or shut
///   down with NoProtocol if there is no protocol handler);
/// - classify each Writable channel as alive if it received something within
///   the last 10 s and publish the counters to stats.
fn check_channels_alive(&mut self) -> Result<(), Error> {
let tsnow = Instant::now();
trace2!("check_channels_alive {addr:?}", addr = &self.remote_addr_dbg);
if let Some(started) = self.ioc_ping_start {
if started + Duration::from_millis(4000) < tsnow {
self.stats.pong_timeout().inc();
warn!("pong timeout {addr:?}", addr = self.remote_addr_dbg);
self.ioc_ping_start = None;
let item = CaConnEvent {
ts: tsnow,
value: CaConnEventValue::EchoTimeout,
};
self.ca_conn_event_out_queue.push_back(item);
self.trigger_shutdown(ChannelStatusClosedReason::IocTimeout);
}
} else {
if self.ioc_ping_next < tsnow {
if let Some(proto) = &mut self.proto {
self.stats.ping_start().inc();
info!("start ping");
self.ioc_ping_start = Some(Instant::now());
let msg = CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow);
proto.push_out(msg);
} else {
self.stats.ping_no_proto().inc();
warn!("can not ping {} no proto", self.remote_addr_dbg);
self.trigger_shutdown(ChannelStatusClosedReason::NoProtocol);
}
}
}
let mut alive_count = 0;
let mut not_alive_count = 0;
for (_, st) in &self.channels {
match st {
ChannelState::Writable(st2) => {
if tsnow.duration_since(st2.channel.ts_alive_last) >= Duration::from_millis(10000) {
warn!("TODO assume channel not alive because nothing received, but should do CAGET");
not_alive_count += 1;
} else {
alive_count += 1;
}
}
_ => {}
}
}
self.stats.channel_all_count.__set(self.channels.len() as _);
self.stats.channel_alive_count.__set(alive_count as _);
self.stats.channel_not_alive_count.__set(not_alive_count as _);
Ok(())
}
/// Queue one ChannelInfo insert item per Writable channel, at most once per
/// minute-aligned window (`info_store_msp_from_time`); other states are TODO.
fn emit_channel_info_insert_items(&mut self) -> Result<(), Error> {
let timenow = self.tmp_ts_poll;
for (_, st) in &mut self.channels {
match st {
ChannelState::Init(..) => {
// TODO need last-save-ts for this state.
}
ChannelState::Creating(..) => {
// TODO need last-save-ts for this state.
}
ChannelState::MakingSeriesWriter(..) => {
// TODO ?
}
ChannelState::Writable(st) => {
let crst = &mut st.channel;
// TODO if we don't wave a series id yet, dont' save? write-ampl.
let msp = info_store_msp_from_time(timenow.clone());
// Only write once per minute-window to limit write amplification.
if msp != crst.info_store_msp_last {
crst.info_store_msp_last = msp;
let item = QueryItem::ChannelInfo(ChannelInfoItem {
ts_msp: msp,
series: st.writer.sid(),
ivl: crst.item_recv_ivl_ema.ema().ema(),
interest: 0.,
evsize: 0,
});
self.insert_item_queue.push_back(item);
}
}
ChannelState::Error(_) => {
// TODO need last-save-ts for this state.
}
ChannelState::Ended(_) => {}
}
}
Ok(())
}
fn transition_to_polling(&mut self, subid: Subid, tsnow: Instant) -> Result<(), Error> {
let cid = if let Some(x) = self.cid_by_subid.get(&subid) {
*x
} else {
let e = Error::with_msg_no_trace("unknown {subid:?}");
error!("{e}");
return Err(e);
};
let ch_s = if let Some(x) = self.channels.get_mut(&cid) {
x
} else {
// TODO return better as error and let caller decide (with more structured errors)
// TODO
// When removing a channel, keep it in "closed" btree for some time because messages can
// still arrive from all buffers.
// If we don't have it in the "closed" btree, then close connection to the IOC and count
// as logic error.
// Close connection to the IOC. Cout as logic error.
let e = Error::with_msg_no_trace(format!(
"TODO handle_event_add_res can not find channel for {cid:?} {subid:?}"
));
error!("{e}");
return Err(e);
};
if let ChannelState::Writable(st2) = ch_s {
// TODO emit
// TODO for cancel, must supply again the same DBR type and count as in the EventAdd.
let ty = CaMsgTy::EventCancel(EventCancel {
data_type: st2.channel.ca_dbr_type,
data_count: st2.channel.ca_dbr_count,
sid: st2.channel.sid.to_u32(),
subid: subid.to_u32(),
});
let msg = CaMsg::from_ty_ts(ty, self.poll_tsnow);
let proto = self.proto.as_mut().unwrap();
proto.push_out(msg);
st2.reading = ReadingState::StopMonitoringForPolling(StopMonitoringForPollingState { tsbeg: tsnow });
} else {
warn!("can not transition to polling, channel not established");
}
Ok(())
}
/// Handle an incoming `EventAddRes` (monitor update) from the IOC.
///
/// Resolves the subscription id to its channel, does per-channel rate and
/// volume accounting over a 10 s window and, when the channel is too chatty
/// (more than 5 events or more than 1 MiB within the window), transitions it
/// from monitoring to polling. Otherwise the value is ingested via
/// `event_add_ingest`.
///
/// Unknown subid/cid are tolerated (messages can arrive late after channel
/// removal) and only logged.
fn handle_event_add_res(&mut self, ev: proto::EventAddRes, tsnow: Instant) -> Result<(), Error> {
    let subid = Subid(ev.subid);
    // TODO handle subid-not-found which can also be peer error:
    let cid = if let Some(x) = self.cid_by_subid.get(&subid) {
        *x
    } else {
        warn!("can not find cid for subid {subid:?}");
        // return Err(Error::with_msg_no_trace());
        return Ok(());
    };
    let ch_s = if let Some(x) = self.channels.get_mut(&cid) {
        x
    } else {
        // TODO return better as error and let caller decide (with more structured errors)
        warn!("TODO handle_event_add_res can not find channel for {cid:?} {subid:?}");
        // TODO
        // When removing a channel, keep it in "closed" btree for some time because messages can
        // still arrive from all buffers.
        // If we don't have it in the "closed" btree, then close connection to the IOC and count
        // as logic error.
        // Close connection to the IOC. Count as logic error.
        // return Err(Error::with_msg_no_trace());
        return Ok(());
    };
    // debug!("handle_event_add_res {ev:?}");
    match ch_s {
        ChannelState::Writable(st) => {
            let stnow = self.tmp_ts_poll;
            let crst = &mut st.channel;
            // Statistics window: 10 s buckets of wall-clock time.
            let stwin_ts = stnow.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() / 10;
            if crst.stwin_ts != stwin_ts {
                // New window: reset the per-window counters.
                // Fix: previously only `stwin_count` was reset here, so
                // `stwin_bytes` accumulated across windows and eventually
                // pushed every channel over the byte threshold permanently.
                crst.stwin_ts = stwin_ts;
                crst.stwin_count = 0;
                crst.stwin_bytes = 0;
            }
            {
                crst.stwin_count += 1;
                crst.stwin_bytes += ev.payload_len;
                // Too many or too large events within the window: cancel the
                // monitor and poll this channel instead.
                if crst.stwin_count > 5 || crst.stwin_bytes > 1024 * 1024 * 1 {
                    let subid = match &mut st.reading {
                        ReadingState::EnableMonitoring(x) => Some(x.subid.clone()),
                        ReadingState::Monitoring(x) => Some(x.subid.clone()),
                        ReadingState::StopMonitoringForPolling(_) => None,
                        ReadingState::Polling(_) => None,
                    };
                    if let Some(subid) = subid {
                        self.transition_to_polling(subid, tsnow)?;
                    }
                    return Ok(());
                }
            }
            match &mut st.reading {
                ReadingState::EnableMonitoring(st2) => {
                    // First event after EventAdd: the monitor is live now.
                    // Fix: value was in seconds while the message says ms.
                    let dt = 1e3 * st2.tsbeg.elapsed().as_secs_f32();
                    trace!("change to Monitoring after dt {dt:.0} ms");
                    st.reading = ReadingState::Monitoring(MonitoringState {
                        tsbeg: tsnow,
                        subid: st2.subid,
                    });
                    let crst = &mut st.channel;
                    let writer = &mut st.writer;
                    let iiq = &mut self.insert_item_queue;
                    let stats = self.stats.as_ref();
                    Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iiq, tsnow, stnow, stats)?;
                }
                ReadingState::Monitoring(_st2) => {
                    let crst = &mut st.channel;
                    let writer = &mut st.writer;
                    let iiq = &mut self.insert_item_queue;
                    let stats = self.stats.as_ref();
                    Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iiq, tsnow, stnow, stats)?;
                }
                ReadingState::StopMonitoringForPolling(st2) => {
                    // Events may still trickle in shortly after EventCancel
                    // was sent; only treat it as fatal after a grace period.
                    // TODO count for metrics
                    if st2.tsbeg + Duration::from_millis(2000) < tsnow {
                        error!("TODO handle_event_add_res handle StopMonitoringForPolling");
                        std::process::exit(1);
                    }
                }
                ReadingState::Polling(st2) => {
                    // TODO count for metrics
                    if st2.tsbeg + Duration::from_millis(2000) < tsnow {
                        error!("TODO handle_event_add_res handle Polling");
                        std::process::exit(1);
                    }
                }
            }
        }
        _ => {
            // TODO count instead of print
            error!("unexpected state: EventAddRes while having {ch_s:?}");
        }
    }
    Ok(())
}
fn handle_event_add_res_empty(&mut self, ev: proto::EventAddResEmpty, tsnow: Instant) -> Result<(), Error> {
let subid = Subid(ev.subid);
// TODO handle subid-not-found which can also be peer error:
let cid = if let Some(x) = self.cid_by_subid.get(&subid) {
*x
} else {
warn!("can not find cid for subid {subid:?}");
// return Err(Error::with_msg_no_trace());
return Ok(());
};
let ch_s = if let Some(x) = self.channels.get_mut(&cid) {
x
} else {
// TODO return better as error and let caller decide (with more structured errors)
warn!("TODO handle_event_add_res can not find channel for {cid:?} {subid:?}");
// TODO
// When removing a channel, keep it in "closed" btree for some time because messages can
// still arrive from all buffers.
// If we don't have it in the "closed" btree, then close connection to the IOC and count
// as logic error.
// Close connection to the IOC. Cout as logic error.
// return Err(Error::with_msg_no_trace());
return Ok(());
};
// debug!("handle_event_add_res {ev:?}");
match ch_s {
ChannelState::Writable(st) => match &mut st.reading {
ReadingState::StopMonitoringForPolling(..) => {
st.reading = ReadingState::Polling(PollingState {
tsbeg: tsnow,
poll_ivl: Duration::from_millis(2000),
tick: PollTickState::Idle(tsnow),
});
}
ReadingState::EnableMonitoring(..) => {
let name = self.name_by_cid(cid);
warn!("received event-cancel but channel {name:?} in wrong state");
}
ReadingState::Monitoring(..) => {
let name = self.name_by_cid(cid);
warn!("received event-cancel but channel {name:?} in wrong state");
}
ReadingState::Polling(..) => {
let name = self.name_by_cid(cid);
warn!("received event-cancel but channel {name:?} in wrong state");
}
},
_ => {
// TODO count instead of print
error!("unexpected state: EventAddRes while having {ch_s:?}");
}
}
Ok(())
}
/// Handle a `ReadNotifyRes`, i.e. the reply to a `ReadNotify` (caget) that
/// was issued from the polling path in `check_channels_state_poll`.
///
/// The channel is located via the ioid; the SID in the response is not
/// trusted. A reply while in `Polling/Wait` records the round-trip latency,
/// returns the tick to `Idle` and ingests the value; replies in any other
/// state are only logged. Unknown ioids are tolerated.
fn handle_read_notify_res(&mut self, ev: proto::ReadNotifyRes, tsnow: Instant) -> Result<(), Error> {
    // trace!("handle_read_notify_res {ev:?}");
    // TODO can not rely on the SID in the response.
    let sid_ev = Sid(ev.sid);
    let ioid = Ioid(ev.ioid);
    if let Some(cid) = self.read_ioids.get(&ioid) {
        let ch_s = if let Some(x) = self.channels.get_mut(&cid) {
            x
        } else {
            warn!("handle_read_notify_res can not find channel for {cid:?} {ioid:?}");
            return Ok(());
        };
        match ch_s {
            ChannelState::Writable(st) => {
                if st.channel.sid != sid_ev {
                    // TODO count for metrics
                    // warn!("mismatch in ReadNotifyRes {:?} {:?}", st.channel.sid, sid_ev);
                }
                let stnow = self.tmp_ts_poll;
                let crst = &mut st.channel;
                // Statistics window: 1 s buckets of wall-clock time.
                let stwin_ts = stnow.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() / 1;
                if crst.stwin_ts != stwin_ts {
                    crst.stwin_ts = stwin_ts;
                    crst.stwin_count = 0;
                    // NOTE(review): `stwin_bytes` is not reset on window roll,
                    // so it accumulates across windows — confirm intended.
                }
                {
                    crst.stwin_count += 1;
                    crst.stwin_bytes += ev.payload_len;
                }
                match &mut st.reading {
                    ReadingState::Polling(st2) => match &mut st2.tick {
                        PollTickState::Idle(_st3) => {
                            // NOTE(review): this arm is the Idle state, but the
                            // message says "Wait" — looks inverted; confirm.
                            warn!("received ReadNotifyRes while in Wait state");
                        }
                        PollTickState::Wait(st3, ioid) => {
                            // Round-trip latency of this caget, in milliseconds.
                            let dt = tsnow.saturating_duration_since(*st3);
                            self.stats.caget_lat().ingest((1e3 * dt.as_secs_f32()) as u32);
                            self.read_ioids.remove(ioid);
                            // TODO maintain histogram of read-notify latencies
                            st2.tick = PollTickState::Idle(tsnow);
                            let crst = &mut st.channel;
                            let writer = &mut st.writer;
                            let iiq = &mut self.insert_item_queue;
                            let stats = self.stats.as_ref();
                            Self::event_add_ingest(
                                ev.payload_len,
                                ev.value,
                                crst,
                                writer,
                                iiq,
                                tsnow,
                                stnow,
                                stats,
                            )?;
                        }
                    },
                    ReadingState::EnableMonitoring(..) => {
                        error!("TODO handle_read_notify_res handle EnableMonitoring");
                    }
                    ReadingState::Monitoring(..) => {
                        error!("TODO handle_read_notify_res handle Monitoring");
                    }
                    ReadingState::StopMonitoringForPolling(..) => {
                        error!("TODO handle_read_notify_res handle StopMonitoringForPolling");
                    }
                }
            }
            _ => {
                // TODO count instead of print
                error!("unexpected state: ReadNotifyRes while having {ch_s:?}");
            }
        }
    } else {
        warn!("unknown {ioid:?}");
    }
    Ok(())
}
/// Ingest one received value (monitor event or caget reply) for a channel.
///
/// Updates per-channel liveness/rate bookkeeping, records the offset between
/// the IOC timestamp and the local wall-clock, and — unless the channel is
/// currently rate-limited — validates the payload type and hands the value
/// to the series writer. Rate-limited values are dropped; at most every 10 s
/// an `Ivl` summary item is queued for them instead.
///
/// `payload_len` is the wire size of the event payload in bytes, `stnow` the
/// wall-clock taken at poll entry, `tsnow` the monotonic poll time.
fn event_add_ingest(
    payload_len: u32,
    value: CaEventValue,
    crst: &mut CreatedState,
    writer: &mut SeriesWriter,
    iiq: &mut VecDeque<QueryItem>,
    tsnow: Instant,
    stnow: SystemTime,
    stats: &CaConnStats,
) -> Result<(), Error> {
    crst.ts_alive_last = tsnow;
    crst.item_recv_ivl_ema.tick(tsnow);
    crst.recv_count += 1;
    crst.recv_bytes += payload_len as u64;
    let series = writer.sid();
    // TODO should attach these counters already to Writable state.
    // Local wall-clock in nanoseconds since the unix epoch.
    let ts_local = {
        let epoch = stnow.duration_since(std::time::UNIX_EPOCH).unwrap();
        epoch.as_secs() * SEC + epoch.subsec_nanos() as u64
    };
    let ts = value.ts;
    // Record IOC-vs-local clock offset in milliseconds.
    let ts_diff = ts.abs_diff(ts_local);
    stats.ca_ts_off().ingest((ts_diff / MS) as u32);
    if tsnow >= crst.insert_next_earliest {
        {
            // Accepting this item: update the insert-interval EMA and compute
            // the earliest time the next item may be accepted so the average
            // interval stays above `insert_ivl_min_mus`.
            crst.muted_before = 0;
            crst.insert_item_ivl_ema.tick(tsnow);
            let em = crst.insert_item_ivl_ema.ema();
            let ema = em.ema();
            let ivl_min = (crst.insert_ivl_min_mus as f32) * 1e-6;
            let dt = (ivl_min - ema).max(0.) / em.k();
            crst.insert_next_earliest = tsnow + Duration::from_micros((dt * 1e6) as u64);
        }
        Self::check_ev_value_data(&value.data, writer.scalar_type())?;
        {
            let val: DataValue = value.data.into();
            writer
                .write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iiq)
                .map_err(|e| Error::from_string(e))?;
        }
        Ok(())
    } else {
        // Channel emits faster than the configured minimum interval: drop the
        // item; at most every 10 s queue an interval summary instead.
        stats.channel_fast_item_drop.inc();
        if tsnow.duration_since(crst.insert_recv_ivl_last) >= Duration::from_millis(10000) {
            crst.insert_recv_ivl_last = tsnow;
            let ema = crst.insert_item_ivl_ema.ema();
            let item = IvlItem {
                series: series.clone(),
                ts,
                ema: ema.ema(),
                emd: ema.emv().sqrt(),
            };
            iiq.push_back(QueryItem::Ivl(item));
        }
        // Mute-notification path is deliberately disabled (`if false`).
        if false && crst.muted_before == 0 {
            let ema = crst.insert_item_ivl_ema.ema();
            let item = MuteItem {
                series: series.clone(),
                ts,
                ema: ema.ema(),
                emd: ema.emv().sqrt(),
            };
            iiq.push_back(QueryItem::Mute(item));
        }
        crst.muted_before = 1;
        Ok(())
    }
}
fn check_ev_value_data(data: &proto::CaDataValue, scalar_type: &ScalarType) -> Result<(), Error> {
use crate::ca::proto::CaDataScalarValue;
use crate::ca::proto::CaDataValue;
match data {
CaDataValue::Scalar(x) => match &x {
CaDataScalarValue::F32(..) => match &scalar_type {
ScalarType::F32 => {}
_ => {
error!("MISMATCH got f32 exp {:?}", scalar_type);
}
},
CaDataScalarValue::F64(..) => match &scalar_type {
ScalarType::F64 => {}
_ => {
error!("MISMATCH got f64 exp {:?}", scalar_type);
}
},
CaDataScalarValue::I16(..) => match &scalar_type {
ScalarType::I16 => {}
_ => {
error!("MISMATCH got i16 exp {:?}", scalar_type);
}
},
CaDataScalarValue::I32(..) => match &scalar_type {
ScalarType::I32 => {}
_ => {
error!("MISMATCH got i32 exp {:?}", scalar_type);
}
},
_ => {}
},
_ => {}
}
Ok(())
}
/*
Acts more like a stream: each call yields either
Pending, or
Ready carrying one of: no-more-work, something-was-done, or an error.
*/
/// Drive the protocol during the version handshake.
///
/// Stream-poll semantics: `Pending`; `Ready(Some(Ok(())))` after one handled
/// item; `Ready(Some(Err(..)))` on a protocol error; `Ready(None)` when the
/// protocol stream ended, in which case the connection enters backoff wait.
fn handle_handshake(&mut self, cx: &mut Context) -> Poll<Option<Result<(), Error>>> {
    use Poll::*;
    match self.proto.as_mut().unwrap().poll_next_unpin(cx) {
        Ready(Some(Ok(CaItem::Empty))) => {
            info!("CaItem::Empty");
            Ready(Some(Ok(())))
        }
        Ready(Some(Ok(CaItem::Msg(msg)))) => match msg.ty {
            CaMsgTy::VersionRes(n) => {
                debug!("see incoming {:?} {:?}", self.remote_addr_dbg, msg);
                if !(12..=13).contains(&n) {
                    // Version outside the supported window: keep going, but
                    // channel search may be unreliable.
                    error!("See some unexpected version {n} channel search may not work.");
                    Ready(Some(Ok(())))
                } else {
                    if n != 13 {
                        warn!("Received peer version {n}");
                    }
                    // Handshake done; regular message handling takes over.
                    self.state = CaConnState::PeerReady;
                    Ready(Some(Ok(())))
                }
            }
            k => {
                warn!("Got some other unhandled message: {k:?}");
                Ready(Some(Ok(())))
            }
        },
        Ready(Some(Err(e))) => {
            error!("got error item from CaProto {e:?}");
            Ready(Some(Err(e.to_string().into())))
        }
        Ready(None) => {
            warn!("handle_conn_listen CaProto is done {:?}", self.remote_addr_dbg);
            // Protocol stream ended: drop it and back off before reconnect.
            self.state = CaConnState::Wait(wait_fut(self.backoff_next()));
            self.proto = None;
            Ready(None)
        }
        Pending => Pending,
    }
}
fn check_channels_state_init(&mut self, tsnow: Instant, cx: &mut Context) -> Result<(), Error> {
let mut do_wake_again = false;
// TODO profile, efficient enough?
if self.init_state_count == 0 {
return Ok(());
}
let keys: Vec<Cid> = self.channels.keys().map(|x| *x).collect();
for cid in keys {
match self.channels.get(&cid).unwrap() {
ChannelState::Init(cssid) => {
let cssid = cssid.clone();
let name = self
.name_by_cid(cid)
.ok_or_else(|| Error::with_msg_no_trace("name for cid not known"));
let name = match name {
Ok(k) => k.to_string(),
Err(e) => return Err(e),
};
let msg = CaMsg::from_ty_ts(
CaMsgTy::CreateChan(CreateChan {
cid: cid.0,
channel: name.into(),
}),
tsnow,
);
do_wake_again = true;
self.proto.as_mut().unwrap().push_out(msg);
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
*ch_s = ChannelState::Creating(CreatingState {
tsbeg: tsnow,
cssid,
cid,
});
self.init_state_count -= 1;
}
_ => {}
}
}
if do_wake_again {
cx.waker().wake_by_ref();
}
Ok(())
}
/// Periodic scan of all channels currently in `Polling` mode.
///
/// For a `Polling` channel whose tick is `Idle` and whose poll interval has
/// elapsed, a `ReadNotify` (caget) is queued under a fresh ioid and the tick
/// moves to `Wait`. A `Wait` older than 10 s counts as a caget timeout: the
/// ioid is dropped and a stats counter incremented. A self-wake is requested
/// whenever protocol output was queued.
fn check_channels_state_poll(&mut self, tsnow: Instant, cx: &mut Context) -> Result<(), Error> {
    let mut do_wake_again = false;
    let channels = &mut self.channels;
    for (_k, v) in channels {
        match v {
            ChannelState::Init(_) => {}
            ChannelState::Creating(_) => {}
            ChannelState::MakingSeriesWriter(_) => {}
            ChannelState::Writable(st2) => match &mut st2.reading {
                ReadingState::EnableMonitoring(_) => {}
                ReadingState::Monitoring(_) => {}
                ReadingState::StopMonitoringForPolling(_) => {}
                ReadingState::Polling(st3) => match &mut st3.tick {
                    PollTickState::Idle(x) => {
                        if *x + st3.poll_ivl <= tsnow {
                            // Time for the next poll: allocate an ioid and
                            // remember which channel it belongs to.
                            let ioid = Ioid(self.ioid);
                            self.ioid = self.ioid.wrapping_add(1);
                            self.read_ioids.insert(ioid, st2.channel.cid.clone());
                            let msg = CaMsg::from_ty_ts(
                                CaMsgTy::ReadNotify(ReadNotify {
                                    data_type: st2.channel.ca_dbr_type,
                                    data_count: st2.channel.ca_dbr_count,
                                    sid: st2.channel.sid.to_u32(),
                                    ioid: ioid.0,
                                }),
                                tsnow,
                            );
                            do_wake_again = true;
                            self.proto.as_mut().unwrap().push_out(msg);
                            st3.tick = PollTickState::Wait(tsnow, ioid);
                        }
                    }
                    PollTickState::Wait(x, ioid) => {
                        // No reply within 10 s: count as caget timeout.
                        // NOTE(review): the tick stays in `Wait` and is not
                        // returned to `Idle` here — confirm intended.
                        if *x + Duration::from_millis(10000) <= tsnow {
                            self.read_ioids.remove(ioid);
                            self.stats.caget_timeout().inc();
                            // warn!("channel caget timeout");
                            // std::process::exit(1);
                        }
                    }
                },
            },
            ChannelState::Error(_) => {}
            ChannelState::Ended(_) => {}
        }
    }
    if do_wake_again {
        cx.waker().wake_by_ref();
    }
    Ok(())
}
// Can return:
// Pending, error, work-done (pending state unknown), no-more-work-ever-again.
/// Main protocol pump once the peer handshake has completed.
///
/// Polls the protocol stream once and dispatches the received message.
/// Stream-poll semantics: `Pending`; `Ready(Some(Ok(())))` after one handled
/// item; `Ready(Some(Err(..)))` on a protocol error (shutdown is triggered);
/// `Ready(None)` when the protocol stream ended.
fn handle_peer_ready(&mut self, cx: &mut Context) -> Poll<Option<Result<(), Error>>> {
    use Poll::*;
    // NOTE(review): nothing happens between ts1 and ts2, so this timing
    // records (almost) zero — looks like leftover from removed code.
    let mut ts1 = Instant::now();
    // TODO unify with Listen state where protocol gets polled as well.
    let ts2 = Instant::now();
    self.stats
        .time_check_channels_state_init
        .add((ts2.duration_since(ts1) * MS as u32).as_secs());
    ts1 = ts2;
    let _ = ts1;
    let tsnow = Instant::now();
    let proto = if let Some(x) = self.proto.as_mut() {
        x
    } else {
        return Ready(Some(Err(Error::with_msg_no_trace("handle_peer_ready but no proto"))));
    };
    let res = match proto.poll_next_unpin(cx) {
        Ready(Some(Ok(k))) => {
            match k {
                CaItem::Msg(camsg) => {
                    match camsg.ty {
                        CaMsgTy::SearchRes(k) => {
                            // Unexpected on an established TCP connection;
                            // searches are normally answered via UDP.
                            let a = k.addr.to_be_bytes();
                            let addr = format!("{}.{}.{}.{}:{}", a[0], a[1], a[2], a[3], k.tcp_port);
                            trace!("Search result indicates server address: {addr}");
                            // TODO count this unexpected case.
                        }
                        CaMsgTy::CreateChanRes(k) => {
                            self.handle_create_chan_res(k, tsnow)?;
                            cx.waker().wake_by_ref();
                        }
                        CaMsgTy::EventAddRes(ev) => {
                            trace!("got EventAddRes {:?} cnt {}", camsg.ts, ev.data_count);
                            self.stats.event_add_res_recv.inc();
                            let res = Self::handle_event_add_res(self, ev, tsnow);
                            let ts2 = Instant::now();
                            self.stats
                                .time_handle_event_add_res
                                .add((ts2.duration_since(tsnow) * MS as u32).as_secs());
                            res?;
                        }
                        CaMsgTy::EventAddResEmpty(ev) => {
                            trace!("got EventAddResEmpty {:?}", camsg.ts);
                            let res = Self::handle_event_add_res_empty(self, ev, tsnow);
                            let ts2 = Instant::now();
                            self.stats
                                .time_handle_event_add_res
                                .add((ts2.duration_since(tsnow) * MS as u32).as_secs());
                            res?;
                        }
                        CaMsgTy::ReadNotifyRes(ev) => Self::handle_read_notify_res(self, ev, tsnow)?,
                        CaMsgTy::Echo => {
                            // Pong for our keep-alive echo: record latency and
                            // schedule the next ping.
                            // let addr = &self.remote_addr_dbg;
                            if let Some(started) = self.ioc_ping_start {
                                let dt = started.elapsed();
                                // NOTE(review): whole seconds are added to
                                // millis without a *1000 factor — value looks
                                // wrong for round-trips above 1 s. Confirm.
                                let dt = dt.as_secs() as u32 + dt.subsec_millis();
                                self.stats.pong_recv_lat().ingest(dt);
                            } else {
                                let addr = &self.remote_addr_dbg;
                                warn!("Received Echo even though we didn't asked for it {addr:?}");
                            }
                            self.ioc_ping_last = tsnow;
                            self.ioc_ping_next = tsnow + Self::ioc_ping_ivl_rng(&mut self.rng);
                            self.ioc_ping_start = None;
                        }
                        CaMsgTy::CreateChanFail(msg) => {
                            // TODO
                            // Here, must indicate that the address could be wrong!
                            // The channel status must be "Fail" so that ConnSet can decide to re-search.
                            // TODO how to transition the channel state? Any invariants or simply write to the map?
                            let cid = Cid(msg.cid);
                            if let Some(name) = self.name_by_cid.get(&cid) {
                                debug!("queue event to notive channel create fail {name}");
                                let item = CaConnEvent {
                                    ts: tsnow,
                                    value: CaConnEventValue::ChannelCreateFail(name.into()),
                                };
                                self.ca_conn_event_out_queue.push_back(item);
                            }
                            self.channel_remove_by_cid(cid);
                            warn!("CaConn sees: {msg:?}");
                        }
                        CaMsgTy::Error(msg) => {
                            warn!("CaConn sees: {msg:?}");
                        }
                        CaMsgTy::AccessRightsRes(msg) => {
                            // Deliberately silenced.
                            if false {
                                warn!("CaConn sees: {msg:?}");
                            }
                        }
                        #[cfg(DISABLED)]
                        CaMsgTy::IssueDataCount(hi, stat, sev, secs, nanos) => {
                            let cid = *self.cid_by_subid.get(&hi.param2()).unwrap();
                            let name = self.name_by_cid.get(&cid).unwrap();
                            debug!("ca large count for {name} {hi:?} {stat} {sev} {secs} {nanos}");
                            self.weird_count += 1;
                            if self.weird_count > 200 {
                                std::process::exit(13);
                            }
                        }
                        CaMsgTy::VersionRes(x) => {
                            // Version exchange should be over; abort the
                            // process if the peer keeps sending these.
                            debug!("VersionRes({x})");
                            self.weird_count += 1;
                            if self.weird_count > 200 {
                                std::process::exit(13);
                            }
                        }
                        _ => {
                            warn!("Received unexpected protocol message {:?}", camsg);
                        }
                    }
                }
                CaItem::Empty => {}
            }
            Ready(Some(Ok(())))
        }
        Ready(Some(Err(e))) => {
            error!("CaProto yields error: {e:?} remote {:?}", self.remote_addr_dbg);
            self.trigger_shutdown(ChannelStatusClosedReason::ProtocolError);
            Ready(Some(Err(e)))
        }
        Ready(None) => {
            warn!("handle_peer_ready CaProto is done {:?}", self.remote_addr_dbg);
            self.trigger_shutdown(ChannelStatusClosedReason::ProtocolDone);
            Ready(None)
        }
        Pending => Pending,
    };
    // Convert the protocol error type into this module's Error.
    res.map_err(|e| Error::from(e.to_string()))
}
/// Handle `CreateChanRes`: the IOC accepted our channel and reports its
/// native DBR data type and element count.
///
/// The channel moves `Creating` -> `MakingSeriesWriter` and a job to
/// establish the series writer is queued; the monitor subscription itself is
/// issued later, once the writer exists. Errors on unknown cid or when the
/// channel is not in `Creating` state.
fn handle_create_chan_res(&mut self, k: proto::CreateChanRes, tsnow: Instant) -> Result<(), Error> {
    let cid = Cid(k.cid);
    let sid = Sid(k.sid);
    let channels = &mut self.channels;
    let name_by_cid = &self.name_by_cid;
    // TODO handle cid-not-found which can also indicate peer error.
    let name = if let Some(x) = name_by_cid.get(&cid) {
        x.to_string()
    } else {
        return Err(Error::with_msg_no_trace(format!("no name for {cid:?}")));
    };
    trace!("handle_create_chan_res {k:?} {name:?}");
    // TODO handle not-found error:
    let ch_s = channels.get_mut(&cid).unwrap();
    let cssid = match ch_s {
        ChannelState::Creating(st) => st.cssid.clone(),
        _ => {
            // TODO handle in better way:
            // Remove channel and emit notice that channel is removed with reason.
            let e = Error::with_msg_no_trace("handle_peer_ready bad state");
            return Err(e);
        }
    };
    self.cid_by_sid.insert(sid, cid);
    // Base DBR types are 0..=6; anything else is unexpected.
    if k.data_type > 6 {
        error!("CreateChanRes with unexpected data_type {}", k.data_type);
    }
    // Ask for DBR_TIME_... (base type id + 14 per the CA protocol).
    let ca_dbr_type = k.data_type + 14;
    let scalar_type = ScalarType::from_ca_id(k.data_type)?;
    let shape = Shape::from_ca_count(k.data_count)?;
    let channel = CreatedState {
        cssid,
        cid,
        sid,
        ca_dbr_type,
        // TODO for extended epics messages, can be u32!
        ca_dbr_count: k.data_count as u16,
        ts_created: tsnow,
        ts_alive_last: tsnow,
        ts_msp_last: 0,
        ts_msp_grid_last: 0,
        inserted_in_ts_msp: u64::MAX,
        insert_item_ivl_ema: IntervalEma::new(),
        item_recv_ivl_ema: IntervalEma::new(),
        insert_recv_ivl_last: tsnow,
        insert_next_earliest: tsnow,
        muted_before: 0,
        insert_ivl_min_mus: self.insert_ivl_min_mus,
        info_store_msp_last: info_store_msp_from_time(self.tmp_ts_poll),
        recv_count: 0,
        recv_bytes: 0,
        stwin_ts: 0,
        stwin_count: 0,
        stwin_bytes: 0,
    };
    *ch_s = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { tsbeg: tsnow, channel });
    // Queue the async job that creates/looks up the series writer.
    let job = EstablishWorkerJob::new(
        JobId(cid.0 as _),
        self.backend.clone(),
        name.into(),
        scalar_type,
        shape,
        self.writer_tx.clone(),
        self.tmp_ts_poll,
    );
    self.writer_establish_qu.push_back(job);
    Ok(())
}
// Scratch function: demonstrates that `?` cannot be used with this
// `ControlFlow<Poll<Result<..>>>` return type.
fn _test_control_flow(&mut self, _cx: &mut Context) -> ControlFlow<Poll<Result<(), Error>>> {
    let e = Error::with_msg_no_trace(format!("test"));
    let _ = e;
    ControlFlow::Break(Poll::Pending)
}
/// Advance the connection state machine by one step.
///
/// Returns `Ready(Some(()))` when a transition or some work happened (the
/// caller loops again), `Ready(None)` when the connection reached a terminal
/// state (`Shutdown`/`EndOfStream`), `Pending` when blocked on I/O or timers.
fn handle_conn_state(&mut self, tsnow: Instant, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
    use Poll::*;
    match &mut self.state {
        CaConnState::Unconnected(_since) => {
            // Start a fresh TCP connect attempt with a 1 s timeout.
            let addr = self.remote_addr_dbg.clone();
            // TODO issue a TCP-connect event (and later a "connected")
            trace!("create tcp connection to {:?}", (addr.ip(), addr.port()));
            let fut = tokio::time::timeout(Duration::from_millis(1000), TcpStream::connect(addr));
            self.state = CaConnState::Connecting(Instant::now(), addr, Box::pin(fut));
            Ok(Ready(Some(())))
        }
        CaConnState::Connecting(_since, addr, fut) => {
            match fut.poll_unpin(cx) {
                Ready(connect_result) => {
                    match connect_result {
                        // Outer Ok: no timeout; inner Ok: TCP connected.
                        Ok(Ok(tcp)) => {
                            self.stats.tcp_connected.inc();
                            let addr = addr.clone();
                            self.insert_item_queue
                                .push_back(QueryItem::ConnectionStatus(ConnectionStatusItem {
                                    ts: self.tmp_ts_poll,
                                    addr,
                                    status: ConnectionStatus::Established,
                                }));
                            self.backoff_reset();
                            let proto = CaProto::new(
                                tcp,
                                self.remote_addr_dbg.clone(),
                                self.opts.array_truncate,
                                self.ca_proto_stats.clone(),
                            );
                            self.state = CaConnState::Init;
                            self.proto = Some(proto);
                            Ok(Ready(Some(())))
                        }
                        // Connect refused/failed (no timeout).
                        Ok(Err(e)) => {
                            trace!("error connect to {addr} {e}");
                            // Current policy: shut down on connect failure;
                            // the backoff-and-retry branch is disabled.
                            if true {
                                let addr = addr.clone();
                                self.insert_item_queue.push_back(QueryItem::ConnectionStatus(
                                    ConnectionStatusItem {
                                        ts: self.tmp_ts_poll,
                                        addr,
                                        status: ConnectionStatus::ConnectError,
                                    },
                                ));
                                self.trigger_shutdown(ChannelStatusClosedReason::ConnectFail);
                            } else {
                                // TODO log with exponential backoff
                                let dt = self.backoff_next();
                                self.state = CaConnState::Wait(wait_fut(dt));
                                self.proto = None;
                            }
                            Ok(Ready(Some(())))
                        }
                        // Timeout elapsed before the connect finished.
                        Err(e) => {
                            // TODO log with exponential backoff
                            trace!("timeout connect to {addr} {e}");
                            if true {
                                let addr = addr.clone();
                                self.insert_item_queue.push_back(QueryItem::ConnectionStatus(
                                    ConnectionStatusItem {
                                        ts: self.tmp_ts_poll,
                                        addr,
                                        status: ConnectionStatus::ConnectTimeout,
                                    },
                                ));
                                self.trigger_shutdown(ChannelStatusClosedReason::ConnectFail);
                            } else {
                                let dt = self.backoff_next();
                                self.state = CaConnState::Wait(wait_fut(dt));
                                self.proto = None;
                            }
                            Ok(Ready(Some(())))
                        }
                    }
                }
                Pending => Ok(Pending),
            }
        }
        CaConnState::Init => {
            trace4!("Init");
            // Queue the client-side handshake messages, then wait for the
            // peer's VersionRes in the Handshake state.
            let hostname = self.local_epics_hostname.clone();
            let proto = self.proto.as_mut().unwrap();
            let msg = CaMsg::from_ty_ts(CaMsgTy::Version, tsnow);
            proto.push_out(msg);
            let msg = CaMsg::from_ty_ts(CaMsgTy::ClientName, tsnow);
            proto.push_out(msg);
            let msg = CaMsg::from_ty_ts(CaMsgTy::HostName(hostname), tsnow);
            proto.push_out(msg);
            self.state = CaConnState::Handshake;
            Ok(Ready(Some(())))
        }
        CaConnState::Handshake => {
            match {
                let res = self.handle_handshake(cx);
                res
            } {
                Ready(Some(Ok(()))) => Ok(Ready(Some(()))),
                Ready(Some(Err(e))) => Err(e),
                Ready(None) => Ok(Ready(Some(()))),
                Pending => Ok(Pending),
            }
        }
        CaConnState::PeerReady => {
            trace4!("PeerReady");
            let res = self.handle_peer_ready(cx);
            match res {
                Ready(Some(Ok(()))) => Ok(Ready(Some(()))),
                Ready(Some(Err(e))) => Err(e),
                Ready(None) => Ok(Ready(Some(()))),
                Pending => Ok(Pending),
            }
        }
        CaConnState::Wait(inst) => {
            trace4!("Wait");
            // Backoff timer; when it fires, go back to Unconnected and retry.
            match inst.poll_unpin(cx) {
                Ready(_) => {
                    self.state = CaConnState::Unconnected(Instant::now());
                    self.proto = None;
                    Ok(Ready(Some(())))
                }
                Pending => Ok(Pending),
            }
        }
        CaConnState::Shutdown => {
            trace4!("Shutdown");
            Ok(Ready(None))
        }
        CaConnState::EndOfStream => {
            trace4!("EndOfStream");
            Ok(Ready(None))
        }
    }
}
fn loop_inner(&mut self, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
use Poll::*;
let tsnow = Instant::now();
let mut have_progress = false;
for _ in 0..64 {
self.stats.loop2_count.inc();
if self.is_shutdown() {
break;
} else if self.insert_item_queue.len() >= self.opts.insert_queue_max {
break;
} else {
match self.handle_conn_state(tsnow, cx) {
Ok(x) => match x {
Ready(Some(())) => {
have_progress = true;
continue;
}
Ready(None) => {
error!("handle_conn_state yields {x:?}");
return Err(Error::with_msg_no_trace("logic error"));
}
Pending => return Ok(Pending),
},
Err(e) => return Err(e),
}
};
}
if have_progress {
Ok(Ready(Some(())))
} else {
Ok(Ready(None))
}
}
/// Poll the internal keep-alive ticker; when it fires, run the periodic
/// housekeeping. Always reports `Pending` on success because the ticker is
/// re-armed inside `handle_own_ticker`.
fn poll_own_ticker(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<Poll<()>, Error> {
    if self.ticker.poll_unpin(cx).is_pending() {
        return Ok(Poll::Pending);
    }
    match self.as_mut().handle_own_ticker(cx) {
        Ok(_) => Ok(Poll::Pending),
        Err(e) => {
            // A housekeeping failure is fatal for this connection.
            error!("handle_own_ticker {e}");
            self.trigger_shutdown(ChannelStatusClosedReason::InternalError);
            Err(e)
        }
    }
}
/// Periodic housekeeping, run each time the self-ticker fires.
///
/// Re-arms the ticker (polling it once so the fresh timer registers our
/// waker), runs the per-channel Init/Polling scans, and at coarser intervals
/// emits channel-status snapshots and ticks the series writers. Finally
/// checks the connect-attempt age for diagnostics.
fn handle_own_ticker(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> {
    // debug!("tick CaConn {}", self.remote_addr_dbg);
    let tsnow = Instant::now();
    if !self.is_shutdown() {
        // Re-arm; poll once so the new timer registers the waker.
        self.ticker = Self::new_self_ticker();
        let _ = self.ticker.poll_unpin(cx);
        // cx.waker().wake_by_ref();
    }
    self.check_channels_state_init(tsnow, cx)?;
    self.check_channels_state_poll(tsnow, cx)?;
    // TODO add some random variation
    if self.channel_status_emit_last + Duration::from_millis(3000) <= tsnow {
        self.channel_status_emit_last = tsnow;
        self.emit_channel_status()?;
    }
    if self.tick_last_writer + Duration::from_millis(2000) <= tsnow {
        self.tick_last_writer = tsnow;
        self.tick_writers()?;
    }
    match &self.state {
        CaConnState::Unconnected(_) => {}
        CaConnState::Connecting(since, _, _) => {
            // Only logged; abort/backoff is handled in the state machine.
            if *since + CONNECTING_TIMEOUT < tsnow {
                debug!("CONNECTING_TIMEOUT");
            }
        }
        CaConnState::Init => {}
        CaConnState::Handshake => {}
        CaConnState::PeerReady => {}
        CaConnState::Wait(_) => {}
        CaConnState::Shutdown => {}
        CaConnState::EndOfStream => {}
    }
    Ok(())
}
/// Snapshot the status of every channel on this connection and queue it as a
/// single `ChannelStatus` event, keyed by channel-status series id. The
/// snapshot is dropped (and counted) when the out-queue is over its limit.
fn emit_channel_status(&mut self) -> Result<(), Error> {
    let mut channel_statuses = BTreeMap::new();
    for (_cid, ch) in self.channels.iter() {
        let info = ch.to_info(ch.cssid(), self.remote_addr_dbg);
        channel_statuses.insert(ch.cssid(), info);
    }
    trace!("emit_channel_status {}", channel_statuses.len());
    let value = CaConnEventValue::ChannelStatus(ChannelStatusPartial { channel_statuses });
    let item = CaConnEvent {
        ts: Instant::now(),
        value,
    };
    // TODO limit the queue length.
    // Maybe factor the actual push item into new function.
    // What to do if limit reached?
    // Increase some error counter.
    if self.ca_conn_event_out_queue.len() > self.ca_conn_event_out_queue_max {
        self.stats.out_queue_full().inc();
    } else {
        self.ca_conn_event_out_queue.push_back(item);
    }
    Ok(())
}
/// Give every writable channel's series writer a periodic tick so it can
/// flush time-based work into the insert queue.
///
/// Returns the first writer error, converted into this module's `Error`.
fn tick_writers(&mut self) -> Result<(), Error> {
    // `_`: the channel id is not needed, only the state (fixes an
    // unused-variable warning on the former binding `k`).
    for (_, st) in &mut self.channels {
        if let ChannelState::Writable(st2) = st {
            st2.writer
                .tick(&mut self.insert_item_queue)
                .map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
        }
    }
    Ok(())
}
fn check_ticker_connecting_timeout(&mut self, since: Instant) -> Result<(), Error> {
Ok(())
}
/// True when everything destined for downstream consumers has left this
/// object: the async insert path is drained and the event-out queue is
/// empty. Used to decide when a shutting-down connection may end its stream.
fn queues_out_flushed(&self) -> bool {
    self.queues_async_out_flushed() && self.ca_conn_event_out_queue.is_empty()
}
/// True when the storage-insert path is drained: the local item queue is
/// empty and no send is currently in flight on the storage sender.
fn queues_async_out_flushed(&self) -> bool {
    // self.channel_info_query_queue.is_empty() && self.channel_info_query_sending.is_idle()
    // TODO re-enable later
    self.insert_item_queue.is_empty() && self.storage_insert_sender.is_idle()
}
fn attempt_flush_queue<T, Q, FB, FS>(
qu: &mut VecDeque<T>,
sp: &mut Pin<Box<SenderPolling<Q>>>,
qu_to_si: FB,
loop_max: u32,
cx: &mut Context,
id: &str,
stats: FS,
) -> Result<Poll<Option<()>>, Error>
where
Q: Unpin,
FB: Fn(&mut VecDeque<T>) -> Option<Q>,
FS: Fn(&Q),
{
use Poll::*;
let mut have_progress = false;
let mut i = 0;
loop {
i += 1;
if i > loop_max {
break;
}
if !sp.has_sender() {
return Err(Error::with_msg_no_trace(format!("flush queue {id} no sender")));
}
if sp.is_idle() {
if let Some(item) = qu_to_si(qu) {
stats(&item);
sp.as_mut().send_pin(item);
} else {
break;
}
}
if sp.is_sending() {
match sp.poll_unpin(cx) {
Ready(Ok(())) => {
have_progress = true;
}
Ready(Err(e)) => {
let e = Error::with_msg_no_trace(format!("flush queue {id} {e}"));
return Err(e);
}
Pending => {
return Ok(Pending);
}
}
} else {
let e = Error::with_msg_no_trace(format!("flush queue {id} not sending"));
return Err(e);
}
}
if have_progress {
Ok(Ready(Some(())))
} else {
Ok(Ready(None))
}
}
}
// $have is a tuple (have_progress, have_pending) of `&mut bool` flags.
// Flush one outgoing queue field `$qu` through its `SenderPolling` field
// `$sp` via `Self::attempt_flush_queue`, folding the outcome into the
// caller's progress/pending flags. On error the enclosing loop is exited
// with `break Ready(Some(Err(..)))`, which is why this must be a macro
// (it breaks out of the caller's loop) rather than a method.
macro_rules! flush_queue {
    ($self:expr, $qu:ident, $sp:ident, $batcher:expr, $loop_max:expr, $have:expr, $id:expr, $cx:expr, $stats:expr) => {
        let obj = $self.as_mut().get_mut();
        let qu = &mut obj.$qu;
        let sp = &mut obj.$sp;
        match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats) {
            Ok(Ready(Some(()))) => {
                *$have.0 |= true;
            }
            Ok(Ready(None)) => {}
            Ok(Pending) => {
                *$have.1 |= true;
            }
            Err(e) => break Ready(Some(Err(e))),
        }
    };
}
/// Batching strategy for `flush_queue!`: hand items to the sender one at a
/// time; `None` when the queue is empty.
fn send_individual<T>(qu: &mut VecDeque<T>) -> Option<T> {
    qu.pop_front()
}
/// Batching strategy for `flush_queue!`: drain up to `N` items from the
/// front of the queue into one batch; `None` when the queue is empty.
fn send_batched<const N: usize, T>(qu: &mut VecDeque<T>) -> Option<VecDeque<T>> {
    if qu.is_empty() {
        None
    } else {
        let take = qu.len().min(N);
        Some(qu.drain(..take).collect())
    }
}
impl Stream for CaConn {
    type Item = Result<CaConnEvent, Error>;

    /// Drive the whole connection in one poll: deliver queued events, run
    /// the ticker, flush the storage and writer-establish queues, process
    /// commands and step the protocol state machine, looping until an event
    /// can be yielded, everything is `Pending`, or the stream ends.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        // Cache "now" (monotonic and wall-clock) once per poll.
        self.poll_tsnow = Instant::now();
        self.tmp_ts_poll = SystemTime::now();
        let poll_ts1 = Instant::now();
        self.stats.poll_count().inc();
        self.stats.poll_fn_begin().inc();
        let mut reloops: u32 = 0;
        let ret = loop {
            let lts1 = Instant::now();
            self.stats.poll_loop_begin().inc();
            let qlen = self.insert_item_queue.len();
            // Fix: check "full" before "pressure". Previously the pressure
            // branch (>= 2/3 max) was checked first, which made the "full"
            // branch unreachable since full implies pressure.
            if qlen >= self.opts.insert_queue_max {
                self.stats.insert_item_queue_full().inc();
            } else if qlen >= self.opts.insert_queue_max * 2 / 3 {
                self.stats.insert_item_queue_pressure().inc();
            }
            let mut have_pending = false;
            let mut have_progress = false;
            if let CaConnState::EndOfStream = self.state {
                break Ready(None);
            }
            // Deliver queued out-events before doing more work.
            if let Some(item) = self.ca_conn_event_out_queue.pop_front() {
                break Ready(Some(Ok(item)));
            }
            let lts2 = Instant::now();
            match self.as_mut().poll_own_ticker(cx) {
                Ok(Ready(())) => {
                    have_progress = true;
                }
                Ok(Pending) => {
                    have_pending = true;
                }
                Err(e) => break Ready(Some(Err(e))),
            }
            {
                // TODO use stats histogram type to test the native prometheus histogram feature
                let qu = &self.insert_item_queue;
                let stats = &self.stats;
                let n = qu.len();
                if n >= 128 {
                    stats.storage_queue_above_128().inc();
                } else if n >= 32 {
                    stats.storage_queue_above_32().inc();
                } else if n >= 8 {
                    stats.storage_queue_above_8().inc();
                }
            }
            let lts3;
            if !self.is_shutdown() {
                // Flush storage inserts (batched by up to 256) and
                // writer-establish jobs (one at a time). Either may `break`
                // out of this loop with an error via the macro.
                let stats2 = self.stats.clone();
                let stats_fn = move |item: &VecDeque<QueryItem>| {
                    stats2.iiq_batch_len().ingest(item.len() as u32);
                };
                flush_queue!(
                    self,
                    insert_item_queue,
                    storage_insert_sender,
                    send_batched::<256, _>,
                    32,
                    (&mut have_progress, &mut have_pending),
                    "strg",
                    cx,
                    stats_fn
                );
                lts3 = Instant::now();
                flush_queue!(
                    self,
                    writer_establish_qu,
                    writer_establish_tx,
                    send_individual,
                    32,
                    (&mut have_progress, &mut have_pending),
                    "wrest",
                    cx,
                    |_| {}
                );
            } else {
                lts3 = Instant::now();
            }
            let lts4 = Instant::now();
            match self.as_mut().handle_writer_establish_result(cx) {
                Ok(Ready(Some(()))) => {
                    have_progress = true;
                }
                Ok(Ready(None)) => {}
                Ok(Pending) => {
                    have_pending = true;
                }
                Err(e) => break Ready(Some(Err(e))),
            }
            let lts5 = Instant::now();
            match self.as_mut().handle_conn_command(cx) {
                Ok(Ready(Some(()))) => {
                    have_progress = true;
                }
                Ok(Ready(None)) => {}
                Ok(Pending) => {
                    have_pending = true;
                }
                Err(e) => break Ready(Some(Err(e))),
            }
            let lts6 = Instant::now();
            match self.loop_inner(cx) {
                Ok(Ready(Some(()))) => {
                    have_progress = true;
                }
                Ok(Ready(None)) => {}
                Ok(Pending) => {
                    have_pending = true;
                }
                Err(e) => {
                    // State machine error ends the stream.
                    error!("{e}");
                    self.state = CaConnState::EndOfStream;
                    break Ready(Some(Err(e)));
                }
            }
            let lts7 = Instant::now();
            // Diagnostics: flag any poll phase that took suspiciously long.
            let max = Duration::from_millis(200);
            let dt = lts2.saturating_duration_since(lts1);
            if dt > max {
                debug!("LONG OPERATION 2 {:.0} ms", 1e3 * dt.as_secs_f32());
            }
            let dt = lts3.saturating_duration_since(lts2);
            self.stats.poll_op3_dt().ingest((1e3 * dt.as_secs_f32()) as u32);
            if dt > max {
                debug!("LONG OPERATION 3 {:.0} ms", 1e3 * dt.as_secs_f32());
            }
            let dt = lts4.saturating_duration_since(lts3);
            if dt > max {
                debug!("LONG OPERATION 4 {:.0} ms", 1e3 * dt.as_secs_f32());
            }
            let dt = lts5.saturating_duration_since(lts4);
            if dt > max {
                debug!("LONG OPERATION 5 {:.0} ms", 1e3 * dt.as_secs_f32());
            }
            let dt = lts6.saturating_duration_since(lts5);
            if dt > max {
                debug!("LONG OPERATION 6 {:.0} ms", 1e3 * dt.as_secs_f32());
            }
            let dt = lts7.saturating_duration_since(lts6);
            if dt > max {
                debug!("LONG OPERATION 7 {:.0} ms", 1e3 * dt.as_secs_f32());
            }
            break if self.is_shutdown() {
                if self.queues_out_flushed() {
                    // All output drained: the stream may end now.
                    // debug!("end of stream {}", self.remote_addr_dbg);
                    self.state = CaConnState::EndOfStream;
                    Ready(None)
                } else {
                    // debug!("queues_out_flushed false");
                    if have_progress {
                        self.stats.poll_reloop().inc();
                        reloops += 1;
                        continue;
                    } else if have_pending {
                        self.stats.poll_pending().inc();
                        Pending
                    } else {
                        // TODO error
                        error!("shutting down, queues not flushed, no progress, no pending");
                        self.stats.logic_error().inc();
                        let e = Error::with_msg_no_trace("shutting down, queues not flushed, no progress, no pending");
                        Ready(Some(Err(e)))
                    }
                }
            } else {
                if have_progress {
                    // After 5 ms of continuous work, yield a no-op event and
                    // self-wake so this task does not starve the executor.
                    if poll_ts1.elapsed() > Duration::from_millis(5) {
                        self.stats.poll_wake_break().inc();
                        cx.waker().wake_by_ref();
                        break Ready(Some(Ok(CaConnEvent {
                            ts: poll_ts1,
                            value: CaConnEventValue::None,
                        })));
                    } else {
                        self.stats.poll_reloop().inc();
                        reloops += 1;
                        continue;
                    }
                } else if have_pending {
                    self.stats.poll_pending().inc();
                    Pending
                } else {
                    // Neither progress nor a registered waker: would hang.
                    self.stats.poll_no_progress_no_pending().inc();
                    let e = Error::with_msg_no_trace("no progress no pending");
                    Ready(Some(Err(e)))
                }
            };
        };
        // Per-poll metrics bookkeeping.
        let poll_ts2 = Instant::now();
        let dt = poll_ts2.saturating_duration_since(poll_ts1);
        self.stats.poll_all_dt().ingest((1e3 * dt.as_secs_f32()) as u32);
        self.stats.read_ioids_len().set(self.read_ioids.len() as u64);
        let n = match &self.proto {
            Some(x) => x.proto_out_len() as u64,
            None => 0,
        };
        self.stats.proto_out_len().set(n);
        self.stats.poll_reloops().ingest(reloops);
        ret
    }
}