Write bins, support config reload

This commit is contained in:
Dominik Werder
2024-10-10 12:27:34 +02:00
parent d4b8beaa82
commit 87e6dfdcaa
16 changed files with 938 additions and 226 deletions
+139 -11
View File
@@ -710,6 +710,48 @@ impl ChannelState {
ChannelState::Closing(st) => st.cssid.clone(),
}
}
fn created_state(&self) -> Option<&CreatedState> {
match self {
ChannelState::Init(_) => None,
ChannelState::Creating(_) => None,
ChannelState::FetchEnumDetails(_) => None,
ChannelState::FetchCaStatusSeries(st2) => Some(&st2.channel),
ChannelState::MakingSeriesWriter(st2) => Some(&st2.channel),
ChannelState::Writable(st2) => Some(&st2.channel),
ChannelState::Closing(_) => None,
ChannelState::Error(_) => None,
ChannelState::Ended(_) => None,
}
}
fn cid(&self) -> Option<Cid> {
match self {
ChannelState::Init(_) => None,
ChannelState::Creating(_) => None,
ChannelState::FetchEnumDetails(_) => None,
ChannelState::FetchCaStatusSeries(st2) => Some(st2.channel.cid),
ChannelState::MakingSeriesWriter(st2) => Some(st2.channel.cid),
ChannelState::Writable(st2) => Some(st2.channel.cid),
ChannelState::Closing(_) => None,
ChannelState::Error(_) => None,
ChannelState::Ended(_) => None,
}
}
fn sid(&self) -> Option<Sid> {
match self {
ChannelState::Init(_) => None,
ChannelState::Creating(_) => None,
ChannelState::FetchEnumDetails(_) => None,
ChannelState::FetchCaStatusSeries(st2) => Some(st2.channel.sid),
ChannelState::MakingSeriesWriter(st2) => Some(st2.channel.sid),
ChannelState::Writable(st2) => Some(st2.channel.sid),
ChannelState::Closing(_) => None,
ChannelState::Error(_) => None,
ChannelState::Ended(_) => None,
}
}
}
#[derive(Debug)]
@@ -967,6 +1009,7 @@ pub enum CaConnEventValue {
ChannelStatus(ChannelStatusPartial),
ChannelCreateFail(String),
EndOfStream(EndOfStreamReason),
ChannelRemoved(String),
}
impl CaConnEventValue {
@@ -978,6 +1021,7 @@ impl CaConnEventValue {
CaConnEventValue::ChannelStatus(_) => "ChannelStatus",
CaConnEventValue::ChannelCreateFail(_) => "ChannelCreateFail",
CaConnEventValue::EndOfStream(_) => "EndOfStream",
CaConnEventValue::ChannelRemoved(_) => "ChannelRemoved",
}
}
}
@@ -1116,7 +1160,7 @@ impl CaConn {
iqsp: Box::pin(InsertSenderPolling::new(iqtx)),
ca_conn_event_out_queue: VecDeque::new(),
ca_conn_event_out_queue_max: 2000,
thr_msg_poll: ThrottleTrace::new(Duration::from_millis(10000)),
thr_msg_poll: ThrottleTrace::new(Duration::from_millis(2000)),
ca_proto_stats,
rng,
channel_info_query_qu: VecDeque::new(),
@@ -1470,6 +1514,7 @@ impl CaConn {
}
pub fn channel_add(&mut self, conf: ChannelConfig, cssid: ChannelStatusSeriesId) -> Result<(), Error> {
debug!("channel_add {conf:?} {cssid:?}");
if false {
if netpod::trigger.contains(&conf.name()) {
self.trace_channel_poll = true;
@@ -1500,10 +1545,82 @@ impl CaConn {
}
pub fn channel_close(&mut self, name: String) {
error!("TODO actually cause the channel to get closed and removed {}", name);
debug!("channel_close {}", name);
let tsnow = Instant::now();
let stnow = SystemTime::now();
let cid = if let Some(x) = self.cid_by_name.get(&name) {
x.clone()
} else {
debug!("channel_close {} can not find channel", name);
return;
};
self.cid_by_name.remove(&name);
if let Some(conf) = self.channels.get_mut(&cid) {
let mut item_deque = VecDeque::new();
let item = ChannelStatusItem {
ts: stnow,
cssid: conf.state.cssid(),
status: ChannelStatus::Closed(ChannelStatusClosedReason::ChannelRemove),
};
let deque = &mut item_deque;
if conf.wrst.emit_channel_status_item(item, deque).is_err() {
self.stats.logic_error().inc();
}
for x in item_deque {
self.iqdqs.st_rf3_qu.push_back(x);
}
// TODO shutdown the internal writer structures.
if let Some(cst) = conf.state.created_state() {
if let Some(proto) = self.proto.as_mut() {
let ty = CaMsgTy::ChannelClose(ChannelClose {
sid: cst.sid.to_u32(),
cid: cid.0,
});
let item = CaMsg::from_ty_ts(ty, tsnow);
proto.push_out(item);
}
}
{
let mut it = self.cid_by_subid.extract_if(|_, v| *v == cid);
if let Some((subid, _cid)) = it.next() {
it.count();
if let Some(cst) = conf.state.created_state() {
if let Some(proto) = self.proto.as_mut() {
let ty = CaMsgTy::EventCancel(EventCancel {
data_type: cst.ca_dbr_type,
data_count: cst.ca_dbr_count,
sid: cst.sid.to_u32(),
subid: subid.to_u32(),
});
let item = CaMsg::from_ty_ts(ty, tsnow);
proto.push_out(item);
}
}
};
}
} else {
debug!("channel_close {} no channel block", name);
};
{
let it = self.cid_by_sid.extract_if(|_, v| *v == cid);
it.count();
}
self.channels.remove(&cid);
// TODO emit CaConn item to let CaConnSet know that we have closed the channel.
// TODO may be too full
let value = CaConnEventValue::ChannelRemoved(name);
let item = CaConnEvent::new_now(value);
self.ca_conn_event_out_queue.push_back(item);
}
pub fn channel_remove(&mut self, name: String) {
fn channel_remove_by_name(&mut self, name: String) {
if let Some(cid) = self.cid_by_name(&name) {
self.channel_remove_by_cid(cid);
} else {
@@ -1652,7 +1769,10 @@ impl CaConn {
let cid = if let Some(x) = self.cid_by_subid.get(&subid) {
*x
} else {
warn!("can not find cid for subid {subid:?}");
if self.thr_msg_poll.is_action() {
self.stats.no_cid_for_subid().inc();
// debug!("can not find cid for subid {subid:?}");
}
// return Err(Error::with_msg_no_trace());
return Ok(());
};
@@ -1819,7 +1939,10 @@ impl CaConn {
let cid = if let Some(x) = self.cid_by_subid.get(&subid) {
*x
} else {
warn!("can not find cid for subid {subid:?}");
if self.thr_msg_poll.is_action() {
self.stats.no_cid_for_subid().inc();
// debug!("can not find cid for subid {subid:?}");
}
// return Err(Error::with_msg_no_trace());
return Ok(());
};
@@ -2101,20 +2224,20 @@ impl CaConn {
stats.logic_error().inc();
}
}
let ts_local = TsNano::from_system_time(stnow);
let tsev_local = TsNano::from_system_time(stnow);
{
let ts = value.ts().ok_or_else(|| Error::MissingTimestamp)?;
let ts_diff = ts.abs_diff(ts_local.ns());
let ts_diff = ts.abs_diff(tsev_local.ns());
stats.ca_ts_off().ingest((ts_diff / MS) as u32);
}
{
let tsev = ts_local;
let tsev = tsev_local;
Self::check_ev_value_data(&value.data, &writer.scalar_type())?;
crst.muted_before = 0;
crst.insert_item_ivl_ema.tick(tsnow);
// let ts_ioc = TsNano::from_ns(ts);
// let ts_local = TsNano::from_ns(ts_local);
// binwriter.ingest(ts_ioc, ts_local, &val, iqdqs)?;
binwriter.ingest(tsev_local, value.f32_for_binning(), iqdqs)?;
{
let wres = writer.write(CaWriterValue::new(value, crst), tsnow, tsev, iqdqs)?;
crst.status_emit_count += wres.nstatus() as u64;
@@ -3087,7 +3210,9 @@ impl CaConn {
for (_, chconf) in &mut self.channels {
let chst = &mut chconf.state;
if let ChannelState::Writable(st2) = chst {
st2.writer.tick(&mut self.iqdqs)?;
let iqdqs = &mut self.iqdqs;
st2.writer.tick(iqdqs)?;
st2.binwriter.tick(iqdqs)?;
}
}
Ok(())
@@ -3204,6 +3329,9 @@ macro_rules! flush_queue_dqs {
// let sp = std::pin::pin!(obj.iqsp.$sp);
// let sp = &mut obj.iqsp.$sp;
// let sp = std::pin::pin!(sp);
if qu.len() < qu.capacity() * 4 / 10 {
qu.shrink_to(qu.capacity() * 7 / 10);
}
let sp = obj.iqsp.as_mut().$sp();
match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats) {
Ok(Ready(Some(()))) => {
@@ -3603,7 +3731,7 @@ impl EmittableType for CaWriterValue {
state.last_accepted_val = Some(self.clone());
let byte_size = self.byte_size();
if diff_data {
debug!("diff_data emit {:?}", state.series_data);
// debug!("diff_data emit {:?}", state.series_data);
let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_data.split(ts, self.byte_size());
let data_value = {
use super::proto::CaDataValue;