Refactor channel state struct

This commit is contained in:
Dominik Werder
2024-01-14 11:17:22 +01:00
parent 2b185b8277
commit 05de9938c8

View File

@@ -102,7 +102,6 @@ pub enum ChannelConnectedInfo {
Connecting,
Connected,
Error,
Ended,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@@ -121,6 +120,7 @@ pub struct ChannelStateInfo {
#[serde(with = "ser_instant")]
pub ts_event_last: Option<Instant>,
pub recv_count: Option<u64>,
pub recv_bytes: Option<u64>,
// #[serde(skip_serializing_if = "Option::is_none")]
pub item_recv_ivl_ema: Option<f32>,
pub interest_score: f32,
@@ -176,26 +176,74 @@ struct Cid(pub u32);
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Subid(pub u32);
impl Subid {
pub fn to_u32(&self) -> u32 {
self.0
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Sid(pub u32);
impl Sid {
pub fn to_u32(&self) -> u32 {
self.0
}
}
#[derive(Clone, Debug)]
enum ChannelError {
CreateChanFail(ChannelStatusSeriesId),
}
#[derive(Debug, Clone)]
struct EventedState {
ts_last: Instant,
recv_count: u64,
recv_bytes: u64,
struct CreatingState {
tsbeg: Instant,
cssid: ChannelStatusSeriesId,
cid: Cid,
}
#[derive(Debug, Clone)]
enum MonitoringState {
FetchSeriesId,
AddingEvent(SeriesId),
Evented(SeriesId, EventedState),
struct MakingSeriesWriterState {
tsbeg: Instant,
channel: CreatedState,
}
#[derive(Debug, Clone)]
struct EnableMonitoringState {
tsbeg: Instant,
}
#[derive(Debug, Clone)]
struct MonitoringState {
tsbeg: Instant,
}
#[derive(Debug, Clone)]
struct StopMonitoringForPollingState {
tsbeg: Instant,
}
#[derive(Debug, Clone)]
struct PollingState {
tsbeg: Instant,
poll_ivl: Duration,
}
#[derive(Debug)]
struct CreatedState22 {
tsbeg: Instant,
channel: CreatedState,
writer: SeriesWriter,
reading: ReadingState,
}
#[derive(Debug, Clone)]
enum ReadingState {
EnableMonitoring(EnableMonitoringState),
Monitoring(MonitoringState),
StopMonitoringForPolling(StopMonitoringForPollingState),
Polling(PollingState),
}
#[derive(Debug, Clone)]
@@ -203,10 +251,8 @@ struct CreatedState {
cssid: ChannelStatusSeriesId,
cid: Cid,
sid: Sid,
#[allow(unused)]
ts_created: Instant,
ts_alive_last: Instant,
state: MonitoringState,
ts_msp_last: u64,
ts_msp_grid_last: u32,
inserted_in_ts_msp: u64,
@@ -215,25 +261,43 @@ struct CreatedState {
insert_recv_ivl_last: Instant,
insert_next_earliest: Instant,
muted_before: u32,
insert_ivl_min_mus: u32,
info_store_msp_last: u32,
recv_count: u64,
recv_bytes: u64,
}
#[derive(Debug)]
struct WritableState {
created: CreatedState,
writer: SeriesWriter,
impl CreatedState {
fn dummy() -> Self {
let tsnow = Instant::now();
Self {
cssid: ChannelStatusSeriesId::new(0),
cid: Cid(0),
sid: Sid(0),
ts_created: tsnow,
ts_alive_last: tsnow,
ts_msp_last: 0,
ts_msp_grid_last: 0,
inserted_in_ts_msp: 0,
insert_item_ivl_ema: IntervalEma::new(),
item_recv_ivl_ema: IntervalEma::new(),
insert_recv_ivl_last: tsnow,
insert_next_earliest: tsnow,
muted_before: 0,
insert_ivl_min_mus: 0,
info_store_msp_last: 0,
recv_count: 0,
recv_bytes: 0,
}
}
}
#[derive(Debug)]
enum ChannelState {
Init(ChannelStatusSeriesId),
Creating {
cssid: ChannelStatusSeriesId,
cid: Cid,
ts_beg: Instant,
},
MakingSeriesWriter(CreatedState),
Writable(WritableState),
Creating(CreatingState),
MakingSeriesWriter(MakingSeriesWriterState),
Created(CreatedState22),
Error(ChannelError),
Ended(ChannelStatusSeriesId),
}
@@ -244,39 +308,37 @@ impl ChannelState {
ChannelState::Init(..) => ChannelConnectedInfo::Disconnected,
ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting,
ChannelState::MakingSeriesWriter(_) => ChannelConnectedInfo::Connecting,
ChannelState::Writable(..) => ChannelConnectedInfo::Connected,
ChannelState::Created(_) => ChannelConnectedInfo::Connected,
ChannelState::Error(_) => ChannelConnectedInfo::Error,
ChannelState::Ended(_) => ChannelConnectedInfo::Ended,
ChannelState::Ended(_) => ChannelConnectedInfo::Disconnected,
};
let scalar_type = match self {
ChannelState::Writable(s) => Some(s.writer.scalar_type().clone()),
ChannelState::Created(s) => Some(s.writer.scalar_type().clone()),
_ => None,
};
let shape = match self {
ChannelState::Writable(s) => Some(s.writer.shape().clone()),
ChannelState::Created(s) => Some(s.writer.shape().clone()),
_ => None,
};
let ts_created = match self {
ChannelState::Writable(s) => Some(s.created.ts_created.clone()),
ChannelState::Created(s) => Some(s.channel.ts_created.clone()),
_ => None,
};
let ts_event_last = match self {
ChannelState::Writable(s) => match &s.created.state {
MonitoringState::Evented(_, s) => Some(s.ts_last),
_ => None,
},
ChannelState::Created(s) => Some(s.channel.ts_alive_last),
_ => None,
};
let recv_count = match self {
ChannelState::Writable(s) => match &s.created.state {
MonitoringState::Evented(_, s) => Some(s.recv_count),
_ => None,
},
ChannelState::Created(s) => Some(s.channel.recv_count),
_ => None,
};
let recv_bytes = match self {
ChannelState::Created(s) => Some(s.channel.recv_bytes),
_ => None,
};
let item_recv_ivl_ema = match self {
ChannelState::Writable(s) => {
let ema = s.created.item_recv_ivl_ema.ema();
ChannelState::Created(s) => {
let ema = s.channel.item_recv_ivl_ema.ema();
if ema.update_count() == 0 {
None
} else {
@@ -286,7 +348,7 @@ impl ChannelState {
_ => None,
};
let series = match self {
ChannelState::Writable(s) => Some(s.writer.sid()),
ChannelState::Created(s) => Some(s.writer.sid()),
_ => None,
};
let interest_score = 1. / item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10);
@@ -300,6 +362,7 @@ impl ChannelState {
ts_created,
ts_event_last,
recv_count,
recv_bytes,
item_recv_ivl_ema,
interest_score,
}
@@ -308,9 +371,9 @@ impl ChannelState {
fn cssid(&self) -> ChannelStatusSeriesId {
match self {
ChannelState::Init(cssid) => cssid.clone(),
ChannelState::Creating { cssid, .. } => cssid.clone(),
ChannelState::MakingSeriesWriter(st) => st.cssid.clone(),
ChannelState::Writable(st) => st.created.cssid.clone(),
ChannelState::Creating(st) => st.cssid.clone(),
ChannelState::MakingSeriesWriter(st) => st.channel.cssid.clone(),
ChannelState::Created(st) => st.channel.cssid.clone(),
ChannelState::Error(e) => match e {
ChannelError::CreateChanFail(cssid) => cssid.clone(),
},
@@ -558,12 +621,11 @@ pub struct CaConn {
remote_addr_dbg: SocketAddrV4,
local_epics_hostname: String,
stats: Arc<CaConnStats>,
insert_ivl_min_mus: u64,
insert_ivl_min_mus: u32,
conn_command_tx: Pin<Box<Sender<ConnCommand>>>,
conn_command_rx: Pin<Box<Receiver<ConnCommand>>>,
conn_backoff: f32,
conn_backoff_beg: f32,
inserts_counter: u64,
extra_inserts_conf: ExtraInsertsConf,
ioc_ping_last: Instant,
ioc_ping_next: Instant,
@@ -580,6 +642,7 @@ pub struct CaConn {
writer_tx: Sender<(JobId, Result<SeriesWriter, serieswriter::writer::Error>)>,
writer_rx: Pin<Box<Receiver<(JobId, Result<SeriesWriter, serieswriter::writer::Error>)>>>,
tmp_ts_poll: SystemTime,
poll_tsnow: Instant,
}
impl Drop for CaConn {
@@ -600,6 +663,7 @@ impl CaConn {
ca_proto_stats: Arc<CaProtoStats>,
writer_establish_tx: Sender<EstablishWorkerJob>,
) -> Self {
let _ = channel_info_query_tx;
let tsnow = Instant::now();
let (writer_tx, writer_rx) = async_channel::bounded(32);
let (cq_tx, cq_rx) = async_channel::bounded(32);
@@ -628,7 +692,6 @@ impl CaConn {
conn_command_rx: Box::pin(cq_rx),
conn_backoff: 0.02,
conn_backoff_beg: 0.02,
inserts_counter: 0,
extra_inserts_conf: ExtraInsertsConf::new(),
ioc_ping_last: tsnow,
ioc_ping_next: tsnow + Self::ioc_ping_ivl_rng(&mut rng),
@@ -645,6 +708,7 @@ impl CaConn {
writer_tx,
writer_rx: Box::pin(writer_rx),
tmp_ts_poll: SystemTime::now(),
poll_tsnow: tsnow,
}
}
@@ -890,7 +954,7 @@ impl CaConn {
}
}
fn handle_writer_establish_inner(&mut self, cid: Cid, wr: SeriesWriter) -> Result<(), Error> {
fn handle_writer_establish_inner(&mut self, cid: Cid, writer: SeriesWriter) -> Result<(), Error> {
debug!("handle_writer_establish_inner {cid:?}");
// At this point we have created the channel and created a writer for that type and sid.
// We do not yet monitor.
@@ -901,18 +965,22 @@ impl CaConn {
if let Some(chst) = self.channels.get_mut(&cid) {
if let ChannelState::MakingSeriesWriter(st2) = chst {
self.stats.get_series_id_ok.inc();
let item = QueryItem::ChannelStatus(ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: st2.cssid.clone(),
status: ChannelStatus::Opened,
});
self.insert_item_queue.push_back(item);
{
let data_type = wr.scalar_type().to_ca_id()?;
let item = QueryItem::ChannelStatus(ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: st2.channel.cssid.clone(),
status: ChannelStatus::Opened,
});
self.insert_item_queue.push_back(item);
}
{
let data_type = writer.scalar_type().to_ca_id()?;
if data_type > 6 {
error!("data type of series unexpected {} {:?}", data_type, wr.scalar_type());
error!(
"data type of series unexpected {} {:?}",
data_type,
writer.scalar_type()
);
}
let subid = self.subid_store.next();
self.cid_by_subid.insert(subid, cid);
@@ -920,20 +988,22 @@ impl CaConn {
let data_type_asked = data_type + 14;
debug!("send out EventAdd for {cid:?}");
let ty = CaMsgTy::EventAdd(EventAdd {
sid: st2.sid.0,
sid: st2.channel.sid.to_u32(),
data_type: data_type_asked,
data_count: wr.shape().to_ca_count()? as _,
subid: subid.0,
data_count: writer.shape().to_ca_count()? as u16,
subid: subid.to_u32(),
});
let msg = CaMsg::from_ty_ts(ty, Instant::now());
let msg = CaMsg::from_ty_ts(ty, self.poll_tsnow);
let proto = self.proto.as_mut().unwrap();
proto.push_out(msg);
}
st2.state = MonitoringState::AddingEvent(wr.sid());
*chst = ChannelState::Writable(WritableState {
created: st2.clone(),
writer: wr,
});
let created_state = CreatedState22 {
tsbeg: self.poll_tsnow,
channel: std::mem::replace(&mut st2.channel, CreatedState::dummy()),
writer,
reading: ReadingState::EnableMonitoring(EnableMonitoringState { tsbeg: self.poll_tsnow }),
};
*chst = ChannelState::Created(created_state);
Ok(())
} else {
warn!("TODO handle_series_lookup_result channel in bad state, reset");
@@ -1047,14 +1117,16 @@ impl CaConn {
ChannelState::Init(cssid) => {
*chst = ChannelState::Ended(cssid.clone());
}
ChannelState::Creating { cssid, .. } => {
*chst = ChannelState::Ended(cssid.clone());
ChannelState::Creating(st2) => {
*chst = ChannelState::Ended(st2.cssid.clone());
}
ChannelState::MakingSeriesWriter(st) => {
*chst = ChannelState::Ended(st.cssid.clone());
*chst = ChannelState::Ended(st.channel.cssid.clone());
}
ChannelState::Writable(st2) => {
let cssid = st2.created.cssid.clone();
ChannelState::Created(st2) => {
let cssid = st2.channel.cssid.clone();
// TODO should call the proper channel-close handler which in turn emits the status item.
// Make sure I record the reason for the "Close": user command, IOC error, etc..
let item = QueryItem::ChannelStatus(ChannelStatusItem {
ts: self.tmp_ts_poll,
cssid: cssid.clone(),
@@ -1106,8 +1178,9 @@ impl CaConn {
let mut not_alive_count = 0;
for (_, st) in &self.channels {
match st {
ChannelState::Writable(st) => {
if tsnow.duration_since(st.created.ts_alive_last) >= Duration::from_millis(10000) {
ChannelState::Created(st2) => {
if tsnow.duration_since(st2.channel.ts_alive_last) >= Duration::from_millis(10000) {
warn!("TODO assume channel not alive because nothing received, but should do CAGET");
not_alive_count += 1;
} else {
alive_count += 1;
@@ -1126,29 +1199,25 @@ impl CaConn {
let timenow = self.tmp_ts_poll;
for (_, st) in &mut self.channels {
match st {
ChannelState::Init(_cssid) => {
ChannelState::Init(..) => {
// TODO need last-save-ts for this state.
}
ChannelState::Creating {
cid: _,
ts_beg: _,
cssid: _,
} => {
ChannelState::Creating(..) => {
// TODO need last-save-ts for this state.
}
ChannelState::MakingSeriesWriter(..) => {
// TODO ?
}
ChannelState::Writable(st) => {
let created = &mut st.created;
ChannelState::Created(st) => {
let crst = &mut st.channel;
// TODO if we don't wave a series id yet, dont' save? write-ampl.
let msp = info_store_msp_from_time(timenow.clone());
if msp != created.info_store_msp_last {
created.info_store_msp_last = msp;
if msp != crst.info_store_msp_last {
crst.info_store_msp_last = msp;
let item = QueryItem::ChannelInfo(ChannelInfoItem {
ts_msp: msp,
series: st.writer.sid(),
ivl: created.item_recv_ivl_ema.ema().ema(),
ivl: crst.item_recv_ivl_ema.ema().ema(),
interest: 0.,
evsize: 0,
});
@@ -1190,95 +1259,35 @@ impl CaConn {
};
// debug!("handle_event_add_res {ev:?}");
match ch_s {
ChannelState::Writable(st) => {
let created = &mut st.created;
created.ts_alive_last = tsnow;
created.item_recv_ivl_ema.tick(tsnow);
let series = match &mut created.state {
MonitoringState::AddingEvent(series) => {
let series = series.clone();
created.state = MonitoringState::Evented(
series.clone(),
EventedState {
ts_last: tsnow,
recv_count: 0,
recv_bytes: 0,
},
);
series
}
MonitoringState::Evented(series, st) => {
st.ts_last = tsnow;
series.clone()
}
_ => {
let e = Error::from_string(format!("unexpected state: EventAddRes while having {:?}", created));
error!("{e}");
return Err(e);
}
};
// TODO should attach these counters already to Writable state.
if let MonitoringState::Evented(_, st2) = &mut created.state {
st2.recv_count += 1;
st2.recv_bytes += ev.payload_len as u64;
ChannelState::Created(st) => match &mut st.reading {
ReadingState::EnableMonitoring(st2) => {
let dt = st2.tsbeg.elapsed().as_secs_f32();
debug!("change to Monitoring after dt {dt:.0} ms");
st.reading = ReadingState::Monitoring(MonitoringState { tsbeg: tsnow });
Self::event_add_ingest(
ev,
&mut st.channel,
&mut st.writer,
&mut self.insert_item_queue,
tsnow,
self.tmp_ts_poll,
self.stats.as_ref(),
)?;
}
let ts_local = {
let ts = self.tmp_ts_poll;
let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
epoch.as_secs() * SEC + epoch.subsec_nanos() as u64
};
let ts = ev.value.ts;
let ts_diff = ts.abs_diff(ts_local);
self.stats.ca_ts_off().ingest((ts_diff / MS) as u32);
if tsnow >= created.insert_next_earliest {
{
created.muted_before = 0;
created.insert_item_ivl_ema.tick(tsnow);
let em = created.insert_item_ivl_ema.ema();
let ema = em.ema();
let ivl_min = (self.insert_ivl_min_mus as f32) * 1e-6;
let dt = (ivl_min - ema).max(0.) / em.k();
created.insert_next_earliest = tsnow + Duration::from_micros((dt * 1e6) as u64);
}
Self::check_ev_value_data(&ev.value.data, st.writer.scalar_type())?;
{
let val: DataValue = ev.value.data.into();
st.writer
.write(
TsNano::from_ns(ts),
TsNano::from_ns(ts_local),
val,
&mut self.insert_item_queue,
)
.map_err(|e| Error::from_string(e))?;
self.inserts_counter += 1;
}
} else {
self.stats.channel_fast_item_drop.inc();
if tsnow.duration_since(created.insert_recv_ivl_last) >= Duration::from_millis(10000) {
created.insert_recv_ivl_last = tsnow;
let ema = created.insert_item_ivl_ema.ema();
let item = IvlItem {
series: series.clone(),
ts,
ema: ema.ema(),
emd: ema.emv().sqrt(),
};
self.insert_item_queue.push_back(QueryItem::Ivl(item));
}
if false && created.muted_before == 0 {
let ema = created.insert_item_ivl_ema.ema();
let item = MuteItem {
series: series.clone(),
ts,
ema: ema.ema(),
emd: ema.emv().sqrt(),
};
self.insert_item_queue.push_back(QueryItem::Mute(item));
}
created.muted_before = 1;
ReadingState::Monitoring(_st2) => {
let crst = &mut st.channel;
let writer = &mut st.writer;
let iiq = &mut self.insert_item_queue;
let stats = self.stats.as_ref();
Self::event_add_ingest(ev, crst, writer, iiq, tsnow, self.tmp_ts_poll, stats)?;
}
}
ReadingState::StopMonitoringForPolling(..) => {
error!("TODO handle_event_add_res handle StopMonitoringForPolling");
}
ReadingState::Polling(..) => {
error!("TODO handle_event_add_res handle Polling");
}
},
_ => {
// TODO count instead of print
error!("unexpected state: EventAddRes while having {ch_s:?}");
@@ -1287,6 +1296,74 @@ impl CaConn {
Ok(())
}
fn event_add_ingest(
ev: proto::EventAddRes,
crst: &mut CreatedState,
writer: &mut SeriesWriter,
iiq: &mut VecDeque<QueryItem>,
tsnow: Instant,
stnow: SystemTime,
stats: &CaConnStats,
) -> Result<(), Error> {
crst.ts_alive_last = tsnow;
crst.item_recv_ivl_ema.tick(tsnow);
crst.recv_count += 1;
crst.recv_bytes += ev.payload_len as u64;
let series = writer.sid();
// TODO should attach these counters already to Writable state.
let ts_local = {
let epoch = stnow.duration_since(std::time::UNIX_EPOCH).unwrap();
epoch.as_secs() * SEC + epoch.subsec_nanos() as u64
};
let ts = ev.value.ts;
let ts_diff = ts.abs_diff(ts_local);
stats.ca_ts_off().ingest((ts_diff / MS) as u32);
if tsnow >= crst.insert_next_earliest {
{
crst.muted_before = 0;
crst.insert_item_ivl_ema.tick(tsnow);
let em = crst.insert_item_ivl_ema.ema();
let ema = em.ema();
let ivl_min = (crst.insert_ivl_min_mus as f32) * 1e-6;
let dt = (ivl_min - ema).max(0.) / em.k();
crst.insert_next_earliest = tsnow + Duration::from_micros((dt * 1e6) as u64);
}
Self::check_ev_value_data(&ev.value.data, writer.scalar_type())?;
{
let val: DataValue = ev.value.data.into();
writer
.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iiq)
.map_err(|e| Error::from_string(e))?;
}
Ok(())
} else {
stats.channel_fast_item_drop.inc();
if tsnow.duration_since(crst.insert_recv_ivl_last) >= Duration::from_millis(10000) {
crst.insert_recv_ivl_last = tsnow;
let ema = crst.insert_item_ivl_ema.ema();
let item = IvlItem {
series: series.clone(),
ts,
ema: ema.ema(),
emd: ema.emv().sqrt(),
};
iiq.push_back(QueryItem::Ivl(item));
}
if false && crst.muted_before == 0 {
let ema = crst.insert_item_ivl_ema.ema();
let item = MuteItem {
series: series.clone(),
ts,
ema: ema.ema(),
emd: ema.emv().sqrt(),
};
iiq.push_back(QueryItem::Mute(item));
}
crst.muted_before = 1;
Ok(())
}
}
fn check_ev_value_data(data: &proto::CaDataValue, scalar_type: &ScalarType) -> Result<(), Error> {
use crate::ca::proto::CaDataScalarValue;
use crate::ca::proto::CaDataValue;
@@ -1372,14 +1449,14 @@ impl CaConn {
}
}
fn check_channels_state_init(&mut self, do_wake_again: &mut bool) -> Result<(), Error> {
fn check_channels_state_init(&mut self, tsnow: Instant, do_wake_again: &mut bool) -> Result<(), Error> {
// TODO profile, efficient enough?
if self.init_state_count == 0 {
return Ok(());
}
let keys: Vec<Cid> = self.channels.keys().map(|x| *x).collect();
for cid in keys {
match self.channels.get_mut(&cid).unwrap() {
match self.channels.get(&cid).unwrap() {
ChannelState::Init(cssid) => {
let cssid = cssid.clone();
let name = self
@@ -1394,17 +1471,17 @@ impl CaConn {
cid: cid.0,
channel: name.into(),
}),
Instant::now(),
tsnow,
);
*do_wake_again = true;
self.proto.as_mut().unwrap().push_out(msg);
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
*ch_s = ChannelState::Creating {
*ch_s = ChannelState::Creating(CreatingState {
tsbeg: tsnow,
cssid,
cid,
ts_beg: Instant::now(),
};
});
self.init_state_count -= 1;
}
_ => {}
@@ -1420,7 +1497,7 @@ impl CaConn {
let mut ts1 = Instant::now();
// TODO unify with Listen state where protocol gets polled as well.
let mut do_wake_again = false;
self.check_channels_state_init(&mut do_wake_again)?;
self.check_channels_state_init(ts1, &mut do_wake_again)?;
let ts2 = Instant::now();
self.stats
.time_check_channels_state_init
@@ -1543,10 +1620,23 @@ impl CaConn {
}
fn handle_create_chan_res(&mut self, k: proto::CreateChanRes, tsnow: Instant) -> Result<(), Error> {
// TODO handle cid-not-found which can also indicate peer error.
let cid = Cid(k.cid);
let sid = Sid(k.sid);
let name = if let Some(x) = self.name_by_cid(cid) {
let channels = &mut self.channels;
let name_by_cid = &self.name_by_cid;
// TODO handle not-found error:
let ch_s = channels.get_mut(&cid).unwrap();
let cssid = match ch_s {
ChannelState::Creating(st) => st.cssid.clone(),
_ => {
// TODO handle in better way:
// Remove channel and emit notice that channel is removed with reason.
let e = Error::with_msg_no_trace("handle_peer_ready bad state");
return Err(e);
}
};
// TODO handle cid-not-found which can also indicate peer error.
let name = if let Some(x) = name_by_cid.get(&cid) {
x.to_string()
} else {
return Err(Error::with_msg_no_trace(format!("no name for {cid:?}")));
@@ -1557,24 +1647,12 @@ impl CaConn {
}
let scalar_type = ScalarType::from_ca_id(k.data_type)?;
let shape = Shape::from_ca_count(k.data_count)?;
// TODO handle not-found error:
let ch_s = self.channels.get_mut(&cid).unwrap();
let cssid = match ch_s {
ChannelState::Creating { cssid, .. } => cssid.clone(),
_ => {
// TODO handle in better way:
// Remove channel and emit notice that channel is removed with reason.
let e = Error::with_msg_no_trace("handle_peer_ready bad state");
return Err(e);
}
};
let created_state = CreatedState {
let channel = CreatedState {
cssid,
cid,
sid,
ts_created: tsnow,
ts_alive_last: tsnow,
state: MonitoringState::FetchSeriesId,
ts_msp_last: 0,
ts_msp_grid_last: 0,
inserted_in_ts_msp: u64::MAX,
@@ -1583,9 +1661,12 @@ impl CaConn {
insert_recv_ivl_last: tsnow,
insert_next_earliest: tsnow,
muted_before: 0,
insert_ivl_min_mus: self.insert_ivl_min_mus,
info_store_msp_last: info_store_msp_from_time(self.tmp_ts_poll),
recv_count: 0,
recv_bytes: 0,
};
*ch_s = ChannelState::MakingSeriesWriter(created_state);
*ch_s = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { tsbeg: tsnow, channel });
let name = self
.name_by_cid(cid)
.ok_or_else(|| Error::from_string(format!("no name for cid {cid:?}")))?;
@@ -1865,7 +1946,7 @@ impl CaConn {
fn tick_writers(&mut self) -> Result<(), Error> {
for (k, st) in &mut self.channels {
if let ChannelState::Writable(st2) = st {
if let ChannelState::Created(st2) = st {
st2.writer
.tick(&mut self.insert_item_queue)
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
@@ -1888,110 +1969,6 @@ impl CaConn {
self.insert_item_queue.is_empty() && self.storage_insert_sender.is_idle()
}
fn attempt_flush_storage_queue(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
use Poll::*;
let (qu, sd, stats) = Self::storage_queue_vars(&mut self);
{
// TODO use stats histogram type to test the native prometheus histogram feature
let n = qu.len();
if n >= 128 {
stats.storage_queue_above_128().inc();
} else if n >= 32 {
stats.storage_queue_above_32().inc();
} else if n >= 8 {
stats.storage_queue_above_8().inc();
}
}
let mut have_progress = false;
let mut i = 0;
loop {
i += 1;
if i > 120 {
break;
}
if !sd.has_sender() {
return Err(Error::with_msg_no_trace("attempt_flush_storage_queue no more sender"));
}
if sd.is_idle() {
if qu.len() != 0 {
let item: VecDeque<_> = qu.drain(..).collect();
stats.storage_queue_send().add(item.len() as _);
sd.as_mut().send_pin(item);
} else {
break;
}
}
if sd.is_sending() {
match sd.poll_unpin(cx) {
Ready(Ok(())) => {
have_progress = true;
}
Ready(Err(_)) => {
return Err(Error::with_msg_no_trace(
"attempt_flush_storage_queue can not send into channel",
));
}
Pending => {
stats.storage_queue_pending().inc();
return Ok(Pending);
}
}
}
}
if have_progress {
Ok(Ready(Some(())))
} else {
Ok(Ready(None))
}
}
// TODO refactor, put together in separate type:
fn storage_queue_vars(
this: &mut CaConn,
) -> (
&mut VecDeque<QueryItem>,
&mut Pin<Box<SenderPolling<VecDeque<QueryItem>>>>,
&CaConnStats,
) {
(
&mut this.insert_item_queue,
&mut this.storage_insert_sender,
&this.stats,
)
}
fn attempt_flush_writer_establish(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<Poll<Option<()>>, Error> {
use Poll::*;
let sd = self.writer_establish_tx.as_mut();
if !sd.has_sender() {
return Err(Error::with_msg_no_trace(
"attempt_flush_channel_info_query no more sender",
));
}
if sd.is_idle() {
if let Some(item) = self.writer_establish_qu.pop_front() {
trace3!("send EstablishWorkerJob");
let sd = self.writer_establish_tx.as_mut();
sd.send_pin(item);
}
}
let sd = &mut self.writer_establish_tx;
if sd.is_sending() {
match sd.poll_unpin(cx) {
Ready(Ok(())) => {
debug!("flushed writer establish job");
Ok(Ready(Some(())))
}
Ready(Err(_)) => Err(Error::with_msg_no_trace(
"attempt_flush_channel_info_query can not send into channel",
)),
Pending => Ok(Pending),
}
} else {
Ok(Ready(None))
}
}
fn attempt_flush_queue<T, Q, FB>(
qu: &mut VecDeque<T>,
sp: &mut Pin<Box<SenderPolling<Q>>>,
@@ -2046,19 +2023,38 @@ impl CaConn {
Ok(Ready(None))
}
}
}
fn send_individual<T>(qu: &mut VecDeque<T>) -> Option<T> {
qu.pop_front()
}
fn send_batched<const N: usize, T>(qu: &mut VecDeque<T>) -> Option<VecDeque<T>> {
let n = qu.len();
if n == 0 {
None
} else {
let batch = qu.drain(..n.min(N)).collect();
Some(batch)
// $have is tuple (have_progress, have_pending))
macro_rules! flush_queue {
($self:expr, $qu:ident, $sp:ident, $batcher:expr, $loop_max:expr, $have:expr, $id:expr, $cx:expr) => {
let obj = $self.as_mut().get_mut();
let qu = &mut obj.$qu;
let sp = &mut obj.$sp;
match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id) {
Ok(Ready(Some(()))) => {
*$have.0 |= true;
}
Ok(Ready(None)) => {}
Ok(Pending) => {
*$have.1 |= true;
}
Err(e) => break Ready(Some(Err(e))),
}
};
}
fn send_individual<T>(qu: &mut VecDeque<T>) -> Option<T> {
qu.pop_front()
}
fn send_batched<const N: usize, T>(qu: &mut VecDeque<T>) -> Option<VecDeque<T>> {
let n = qu.len();
if n == 0 {
None
} else {
let batch = qu.drain(..n.min(N)).collect();
Some(batch)
}
}
@@ -2067,6 +2063,7 @@ impl Stream for CaConn {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
self.poll_tsnow = Instant::now();
self.tmp_ts_poll = SystemTime::now();
let poll_ts1 = Instant::now();
self.stats.poll_count().inc();
@@ -2103,41 +2100,51 @@ impl Stream for CaConn {
Err(e) => break Ready(Some(Err(e))),
}
if !self.is_shutdown() {
let obj = self.as_mut().get_mut();
let qu = &mut obj.insert_item_queue;
let sp = &mut obj.storage_insert_sender;
match Self::attempt_flush_queue(qu, sp, Self::send_batched::<32, _>, 32, cx, "strg") {
Ok(Ready(Some(()))) => {
have_progress = true;
}
Ok(Ready(None)) => {}
Ok(Pending) => {
have_pending = true;
}
Err(e) => break Ready(Some(Err(e))),
{
// TODO use stats histogram type to test the native prometheus histogram feature
let qu = &self.insert_item_queue;
let stats = &self.stats;
let n = qu.len();
if n >= 128 {
stats.storage_queue_above_128().inc();
} else if n >= 32 {
stats.storage_queue_above_32().inc();
} else if n >= 8 {
stats.storage_queue_above_8().inc();
}
}
let lts3 = Instant::now();
let lts2;
let lts3;
if !self.is_shutdown() {
let obj = self.as_mut().get_mut();
let qu = &mut obj.writer_establish_qu;
let sp = &mut obj.writer_establish_tx;
match Self::attempt_flush_queue(qu, sp, Self::send_individual, 32, cx, "wr-est") {
Ok(Ready(Some(()))) => {
have_progress = true;
}
Ok(Ready(None)) => {}
Ok(Pending) => {
have_pending = true;
}
Err(e) => break Ready(Some(Err(e))),
}
}
flush_queue!(
self,
insert_item_queue,
storage_insert_sender,
send_batched::<48, _>,
32,
(&mut have_progress, &mut have_pending),
"strg",
cx
);
lts2 = Instant::now();
let lts2 = Instant::now();
flush_queue!(
self,
writer_establish_qu,
writer_establish_tx,
send_individual,
32,
(&mut have_progress, &mut have_pending),
"wrest",
cx
);
lts3 = Instant::now();
} else {
lts2 = Instant::now();
lts3 = Instant::now();
}
match self.as_mut().handle_writer_establish_result(cx) {
Ok(Ready(Some(()))) => {