WIP factor initial get

This commit is contained in:
Dominik Werder
2024-07-01 17:33:07 +02:00
parent e34fee60fd
commit 816f7f130f
7 changed files with 342 additions and 120 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.2.2-aa.2"
version = "0.2.2-aa.3"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -34,7 +34,7 @@ pin-project = "1"
lazy_static = "1"
libc = "0.2"
slidebuf = "0.0.1"
dashmap = "5.5.3"
dashmap = "6.0.1"
hashbrown = "0.14.3"
log = { path = "../log" }
series = { path = "../series" }

View File

@@ -1,3 +1,5 @@
mod enumfetch;
use super::proto;
use super::proto::CaEventValue;
use super::proto::ReadNotify;
@@ -10,6 +12,7 @@ use async_channel::Receiver;
use async_channel::Sender;
use core::fmt;
use dbpg::seriesbychannel::ChannelInfoQuery;
use enumfetch::ConnFuture;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
@@ -152,6 +155,7 @@ pub enum Error {
DurationOutOfBounds,
NoFreeCid,
InsertQueues(#[from] scywr::insertqueues::Error),
FutLogic,
}
impl err::ToErr for Error {
@@ -285,6 +289,12 @@ struct MakingSeriesWriterState {
channel: CreatedState,
}
#[derive(Debug, Clone)]
struct FetchEnumDetails {
tsbeg: Instant,
cssid: ChannelStatusSeriesId,
}
#[derive(Debug, Clone)]
struct EnableMonitoringState {
tsbeg: Instant,
@@ -419,6 +429,8 @@ struct CreatedState {
dw_lt_last: SystemTime,
scalar_type: ScalarType,
shape: Shape,
log_more: bool,
name: String,
}
impl CreatedState {
@@ -458,14 +470,21 @@ impl CreatedState {
dw_lt_last: SystemTime::UNIX_EPOCH,
scalar_type: ScalarType::I8,
shape: Shape::Scalar,
log_more: false,
name: String::new(),
}
}
pub fn name(&self) -> &str {
&self.name
}
}
#[derive(Debug)]
enum ChannelState {
Init(ChannelStatusSeriesId),
Creating(CreatingState),
FetchEnumDetails(FetchEnumDetails),
MakingSeriesWriter(MakingSeriesWriterState),
Writable(WritableState),
Closing(ClosingState),
@@ -502,6 +521,7 @@ impl ChannelState {
let channel_connected_info = match self {
ChannelState::Init(..) => ChannelConnectedInfo::Disconnected,
ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting,
ChannelState::FetchEnumDetails(_) => ChannelConnectedInfo::Connecting,
ChannelState::MakingSeriesWriter(_) => ChannelConnectedInfo::Connecting,
ChannelState::Writable(_) => ChannelConnectedInfo::Connected,
ChannelState::Error(_) => ChannelConnectedInfo::Error,
@@ -587,6 +607,7 @@ impl ChannelState {
match self {
ChannelState::Init(cssid) => cssid.clone(),
ChannelState::Creating(st) => st.cssid.clone(),
ChannelState::FetchEnumDetails(st) => st.cssid.clone(),
ChannelState::MakingSeriesWriter(st) => st.channel.cssid.clone(),
ChannelState::Writable(st) => st.channel.cssid.clone(),
ChannelState::Error(e) => match e {
@@ -898,6 +919,7 @@ pub struct CaConn {
poll_tsnow: Instant,
ioid: u32,
read_ioids: HashMap<Ioid, Cid>,
handler_by_ioid: HashMap<Ioid, Option<Pin<Box<dyn ConnFuture>>>>,
}
impl Drop for CaConn {
@@ -964,6 +986,7 @@ impl CaConn {
poll_tsnow: tsnow,
ioid: 100,
read_ioids: HashMap::new(),
handler_by_ioid: HashMap::new(),
}
}
@@ -979,6 +1002,15 @@ impl CaConn {
Box::pin(tokio::time::sleep(Duration::from_millis(500)))
}
fn proto(&mut self) -> Option<&mut CaProto> {
self.proto.as_mut()
}
fn ioid_next(&mut self) -> Ioid {
self.ioid = self.ioid.wrapping_add(1);
Ioid(self.ioid)
}
pub fn conn_command_tx(&self) -> Sender<ConnCommand> {
self.conn_command_tx.as_ref().get_ref().clone()
}
@@ -1298,6 +1330,9 @@ impl CaConn {
ChannelState::Creating(st2) => {
*chst = ChannelState::Ended(st2.cssid.clone());
}
ChannelState::FetchEnumDetails(st) => {
*chst = ChannelState::Ended(st.cssid.clone());
}
ChannelState::MakingSeriesWriter(st) => {
*chst = ChannelState::Ended(st.channel.cssid.clone());
}
@@ -1331,6 +1366,9 @@ impl CaConn {
ChannelState::Init(..) => {
// TODO need last-save-ts for this state.
}
ChannelState::FetchEnumDetails(..) => {
// TODO need last-save-ts for this state.
}
ChannelState::Creating(..) => {
// TODO need last-save-ts for this state.
}
@@ -1535,7 +1573,10 @@ impl CaConn {
}
}
}
ChannelState::Creating(_) | ChannelState::Init(_) | ChannelState::MakingSeriesWriter(_) => {
ChannelState::Creating(_)
| ChannelState::Init(_)
| ChannelState::FetchEnumDetails(_)
| ChannelState::MakingSeriesWriter(_) => {
self.stats.recv_read_notify_but_not_init_yet.inc();
}
ChannelState::Closing(_) | ChannelState::Ended(_) | ChannelState::Error(_) => {
@@ -1607,94 +1648,112 @@ impl CaConn {
Ok(())
}
fn handle_read_notify_res(&mut self, ev: proto::ReadNotifyRes, tsnow: Instant) -> Result<(), Error> {
fn handle_read_notify_res(
&mut self,
ev: proto::ReadNotifyRes,
camsg_ts: Instant,
tsnow: Instant,
) -> Result<(), Error> {
// trace!("handle_read_notify_res {ev:?}");
// TODO can not rely on the SID in the response.
let sid_ev = Sid(ev.sid);
let ioid = Ioid(ev.ioid);
if let Some(cid) = self.read_ioids.get(&ioid) {
let ch_s = if let Some(x) = self.channels.get_mut(cid) {
&mut x.state
if let Some(pp) = self.handler_by_ioid.get_mut(&ioid) {
if let Some(mut fut) = pp.take() {
let camsg = CaMsg {
ty: CaMsgTy::ReadNotifyRes(ev),
ts: camsg_ts,
};
fut.as_mut().camsg(camsg, self);
Ok(())
} else {
warn!("handle_read_notify_res can not find channel for {cid:?} {ioid:?}");
return Ok(());
};
match ch_s {
ChannelState::Writable(st) => {
if st.channel.sid != sid_ev {
// TODO count for metrics
// warn!("mismatch in ReadNotifyRes {:?} {:?}", st.channel.sid, sid_ev);
}
let stnow = self.tmp_ts_poll;
let crst = &mut st.channel;
let stwin_ts = stnow.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() / 1;
if crst.stwin_ts != stwin_ts {
crst.stwin_ts = stwin_ts;
crst.stwin_count = 0;
}
{
crst.stwin_count += 1;
crst.stwin_bytes += ev.payload_len;
}
match &mut st.reading {
ReadingState::Polling(st2) => match &mut st2.tick {
PollTickState::Idle(_st3) => {
self.stats.recv_read_notify_while_polling_idle.inc();
}
PollTickState::Wait(st3, ioid) => {
let dt = tsnow.saturating_duration_since(*st3);
self.stats.caget_lat().ingest((1e3 * dt.as_secs_f32()) as u32);
// TODO maintain histogram of read-notify latencies
self.read_ioids.remove(ioid);
st2.tick = PollTickState::Idle(tsnow);
let iqdqs = &mut self.iqdqs;
let stats = self.stats.as_ref();
Self::read_notify_res_for_write(ev, st, iqdqs, stnow, tsnow, stats)?;
}
},
ReadingState::EnableMonitoring(_) => {
self.stats.recv_read_notify_while_enabling_monitoring.inc();
}
ReadingState::Monitoring(st2) => match &mut st2.mon2state {
Monitoring2State::Passive(st3) => {
if self.read_ioids.remove(&ioid).is_some() {
self.stats.recv_read_notify_state_passive_found_ioid.inc();
} else {
self.stats.recv_read_notify_state_passive.inc();
}
st3.tsbeg = tsnow;
}
Monitoring2State::ReadPending(ioid2, _since) => {
// We don't check again for `since` here. That's done in timeout checking.
// So we could be here a little beyond timeout but we don't care about that.
if ioid != *ioid2 {
// warn!("IOID mismatch ReadNotifyRes on Monitor Read Pending {ioid:?} {ioid2:?}");
self.stats.recv_read_notify_state_read_pending_bad_ioid.inc();
} else {
self.stats.recv_read_notify_state_read_pending.inc();
}
self.read_ioids.remove(&ioid);
st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow });
let iqdqs = &mut self.iqdqs;
let stats = self.stats.as_ref();
Self::read_notify_res_for_write(ev, st, iqdqs, stnow, tsnow, stats)?;
}
},
ReadingState::StopMonitoringForPolling(..) => {
error!("TODO handle_read_notify_res handle StopMonitoringForPolling");
}
}
}
_ => {
// TODO count instead of print
error!("unexpected state: ReadNotifyRes while having {ch_s:?}");
}
Err(Error::FutLogic)
}
} else {
// warn!("unknown {ioid:?}");
self.stats.unknown_ioid().inc();
if let Some(cid) = self.read_ioids.get(&ioid) {
let ch_s = if let Some(x) = self.channels.get_mut(cid) {
&mut x.state
} else {
warn!("handle_read_notify_res can not find channel for {cid:?} {ioid:?}");
return Ok(());
};
match ch_s {
ChannelState::Writable(st) => {
if st.channel.sid != sid_ev {
// TODO count for metrics
// warn!("mismatch in ReadNotifyRes {:?} {:?}", st.channel.sid, sid_ev);
}
let stnow = self.tmp_ts_poll;
let crst = &mut st.channel;
let stwin_ts = stnow.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() / 1;
if crst.stwin_ts != stwin_ts {
crst.stwin_ts = stwin_ts;
crst.stwin_count = 0;
}
{
crst.stwin_count += 1;
crst.stwin_bytes += ev.payload_len;
}
match &mut st.reading {
ReadingState::Polling(st2) => match &mut st2.tick {
PollTickState::Idle(_st3) => {
self.stats.recv_read_notify_while_polling_idle.inc();
}
PollTickState::Wait(st3, ioid) => {
let dt = tsnow.saturating_duration_since(*st3);
self.stats.caget_lat().ingest((1e3 * dt.as_secs_f32()) as u32);
// TODO maintain histogram of read-notify latencies
self.read_ioids.remove(ioid);
st2.tick = PollTickState::Idle(tsnow);
let iqdqs = &mut self.iqdqs;
let stats = self.stats.as_ref();
Self::read_notify_res_for_write(ev, st, iqdqs, stnow, tsnow, stats)?;
}
},
ReadingState::EnableMonitoring(_) => {
self.stats.recv_read_notify_while_enabling_monitoring.inc();
}
ReadingState::Monitoring(st2) => match &mut st2.mon2state {
Monitoring2State::Passive(st3) => {
if self.read_ioids.remove(&ioid).is_some() {
self.stats.recv_read_notify_state_passive_found_ioid.inc();
} else {
self.stats.recv_read_notify_state_passive.inc();
}
st3.tsbeg = tsnow;
}
Monitoring2State::ReadPending(ioid2, _since) => {
// We don't check again for `since` here. That's done in timeout checking.
// So we could be here a little beyond timeout but we don't care about that.
if ioid != *ioid2 {
// warn!("IOID mismatch ReadNotifyRes on Monitor Read Pending {ioid:?} {ioid2:?}");
self.stats.recv_read_notify_state_read_pending_bad_ioid.inc();
} else {
self.stats.recv_read_notify_state_read_pending.inc();
}
self.read_ioids.remove(&ioid);
st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow });
let iqdqs = &mut self.iqdqs;
let stats = self.stats.as_ref();
Self::read_notify_res_for_write(ev, st, iqdqs, stnow, tsnow, stats)?;
}
},
ReadingState::StopMonitoringForPolling(..) => {
error!("TODO handle_read_notify_res handle StopMonitoringForPolling");
}
}
}
_ => {
// TODO count instead of print
error!("unexpected state: ReadNotifyRes while having {ch_s:?}");
}
}
} else {
// warn!("unknown {ioid:?}");
self.stats.unknown_ioid().inc();
}
Ok(())
}
Ok(())
}
fn read_notify_res_for_write(
@@ -1733,13 +1792,20 @@ impl CaConn {
stnow: SystemTime,
stats: &CaConnStats,
) -> Result<(), Error> {
trace_event_incoming!(
"event_add_ingest payload_len {} value {:?} {} {}",
payload_len,
value,
value.status,
value.severity
);
if crst.log_more {
info!(
"event_add_ingest payload_len {} value {:?} {} {}",
payload_len, value, value.status, value.severity
);
} else {
trace_event_incoming!(
"event_add_ingest payload_len {} value {:?} {} {}",
payload_len,
value,
value.status,
value.severity
);
}
crst.ts_alive_last = tsnow;
crst.ts_activity_last = tsnow;
crst.st_activity_last = stnow;
@@ -1934,6 +2000,7 @@ impl CaConn {
match chst {
ChannelState::Init(_) => {}
ChannelState::Creating(_) => {}
ChannelState::FetchEnumDetails(_) => {}
ChannelState::MakingSeriesWriter(_) => {}
ChannelState::Writable(st2) => match &mut st2.reading {
ReadingState::EnableMonitoring(_) => {}
@@ -2158,7 +2225,7 @@ impl CaConn {
trace4!("got EventAddResEmpty {:?}", camsg.ts);
Self::handle_event_add_res_empty(self, ev, tsnow)?
}
CaMsgTy::ReadNotifyRes(ev) => Self::handle_read_notify_res(self, ev, tsnow)?,
CaMsgTy::ReadNotifyRes(ev) => Self::handle_read_notify_res(self, ev, camsg.ts, tsnow)?,
CaMsgTy::Echo => {
// let addr = &self.remote_addr_dbg;
if let Some(started) = self.ioc_ping_start {
@@ -2276,8 +2343,22 @@ impl CaConn {
let ca_dbr_type = k.data_type + 14;
let scalar_type = ScalarType::from_ca_id(k.data_type)?;
let shape = Shape::from_ca_count(k.data_count)?;
let log_more = match &scalar_type {
ScalarType::Enum => {
if cssid.id() % 20 == 14 {
let name = conf.conf.name();
info!("ENUM {}", name);
true
} else {
false
}
}
_ => false,
};
let (acc_msp, _) = TsMs::from_system_time(stnow).to_grid_02(EMIT_ACCOUNTING_SNAP);
let channel = CreatedState {
let created_state = CreatedState {
cssid,
cid,
sid,
@@ -2309,20 +2390,40 @@ impl CaConn {
dw_lt_last: SystemTime::UNIX_EPOCH,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
log_more,
name: conf.conf.name().into(),
};
*chst = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { tsbeg: tsnow, channel });
let job = EstablishWorkerJob::new(
JobId(cid.0 as _),
self.backend.clone(),
conf.conf.name().into(),
cssid,
scalar_type,
shape,
conf.conf.min_quiets(),
self.writer_tx.clone(),
self.tmp_ts_poll,
);
self.writer_establish_qu.push_back(job);
match &scalar_type {
ScalarType::Enum => {
if created_state.log_more {
let min_quiets = conf.conf.min_quiets();
let fut = enumfetch::EnumFetch::new(created_state, self, min_quiets);
// TODO should always check if the slot is free.
let ioid = fut.ioid();
let x = Box::pin(fut);
self.handler_by_ioid.insert(ioid, Some(x));
} else {
}
}
_ => {
*chst = ChannelState::MakingSeriesWriter(MakingSeriesWriterState {
tsbeg: tsnow,
channel: created_state,
});
let job = EstablishWorkerJob::new(
JobId(cid.0 as _),
self.backend.clone(),
conf.conf.name().into(),
cssid,
scalar_type,
shape,
conf.conf.min_quiets(),
self.writer_tx.clone(),
self.tmp_ts_poll,
);
self.writer_establish_qu.push_back(job);
}
}
Ok(())
}

View File

@@ -0,0 +1,83 @@
use super::CaConn;
use super::CreatedState;
use super::Ioid;
use crate::ca::proto::CaMsg;
use crate::ca::proto::ReadNotify;
use err::thiserror;
use err::ThisError;
use log::*;
use serieswriter::establish_worker::EstablishWorkerJob;
use std::pin::Pin;
use std::task::Poll;
use std::time::Instant;
#[derive(Debug, ThisError)]
pub enum Error {}
pub trait ConnFuture: Send {
fn camsg(self: Pin<&mut Self>, camsg: CaMsg, conn: &mut CaConn) -> Poll<()>;
}
pub struct EnumFetch {
created_state: CreatedState,
ioid: Ioid,
min_quiets: serieswriter::rtwriter::MinQuiets,
}
impl EnumFetch {
pub fn new(created_state: CreatedState, conn: &mut CaConn, min_quiets: serieswriter::rtwriter::MinQuiets) -> Self {
let name = created_state.name();
info!("EnumFetch::new name {name}");
let dbr_ctrl_enum = 31;
let ioid = conn.ioid_next();
let ty = crate::ca::proto::CaMsgTy::ReadNotify(ReadNotify {
data_type: dbr_ctrl_enum,
data_count: 0,
sid: created_state.sid.to_u32(),
ioid: ioid.0,
});
let ts = Instant::now();
let item = CaMsg::from_ty_ts(ty, ts);
conn.proto().unwrap().push_out(item);
Self {
created_state,
ioid,
min_quiets,
}
}
pub fn ioid(&self) -> Ioid {
self.ioid
}
}
impl ConnFuture for EnumFetch {
fn camsg(self: Pin<&mut Self>, camsg: CaMsg, conn: &mut CaConn) -> Poll<()> {
use Poll::*;
let tsnow = Instant::now();
let crst = &self.created_state;
let name = self.created_state.name();
info!("EnumFetch::poll {name}");
//*chst =
super::ChannelState::MakingSeriesWriter(super::MakingSeriesWriterState {
tsbeg: tsnow,
channel: crst.clone(),
});
let job = EstablishWorkerJob::new(
serieswriter::establish_worker::JobId(crst.cid.0 as _),
conn.backend.clone(),
crst.name().into(),
crst.cssid.clone(),
crst.scalar_type.clone(),
crst.shape.clone(),
self.min_quiets.clone(),
conn.writer_tx.clone(),
conn.tmp_ts_poll,
);
conn.writer_establish_qu.push_back(job);
Pending
}
}

View File

@@ -202,6 +202,7 @@ enum CaDbrMetaType {
Plain,
Status,
Time,
Ctrl,
}
#[derive(Debug)]
@@ -212,6 +213,13 @@ pub struct CaDbrType {
impl CaDbrType {
pub fn from_ca_u16(k: u16) -> Result<Self, Error> {
if k == 31 {
let ret = CaDbrType {
meta: CaDbrMetaType::Ctrl,
scalar_type: CaScalarType::Enum,
};
return Ok(ret);
}
if k > 20 {
return Err(Error::BadCaDbrTypeId(k));
}
@@ -908,18 +916,39 @@ impl CaMsg {
fn extract_ca_data_value(hi: &HeadInfo, payload: &[u8], array_truncate: usize) -> Result<CaEventValue, Error> {
use netpod::Shape;
let ca_dbr_ty = CaDbrType::from_ca_u16(hi.data_type)?;
if let CaDbrMetaType::Time = ca_dbr_ty.meta {
} else {
return Err(Error::MismatchDbrTimeType);
}
let ca_status = u16::from_be_bytes(payload[0..2].try_into().map_err(|_| Error::BadSlice)?);
let ca_severity = u16::from_be_bytes(payload[2..4].try_into().map_err(|_| Error::BadSlice)?);
let ca_secs = u32::from_be_bytes(payload[4..8].try_into().map_err(|_| Error::BadSlice)?);
let ca_nanos = u32::from_be_bytes(payload[8..12].try_into().map_err(|_| Error::BadSlice)?);
let ca_sh = Shape::from_ca_count(hi.data_count() as _).map_err(|_| {
error!("BadCaCount {hi:?}");
Error::BadCaCount
})?;
let ca_status;
let ca_severity;
let ca_secs;
let ca_nanos;
let ca_sh;
let data_offset = match &ca_dbr_ty.meta {
CaDbrMetaType::Plain => return Err(Error::MismatchDbrTimeType),
CaDbrMetaType::Status => return Err(Error::MismatchDbrTimeType),
CaDbrMetaType::Time => {
ca_status = u16::from_be_bytes(payload[0..2].try_into().map_err(|_| Error::BadSlice)?);
ca_severity = u16::from_be_bytes(payload[2..4].try_into().map_err(|_| Error::BadSlice)?);
ca_secs = u32::from_be_bytes(payload[4..8].try_into().map_err(|_| Error::BadSlice)?);
ca_nanos = u32::from_be_bytes(payload[8..12].try_into().map_err(|_| Error::BadSlice)?);
ca_sh = Shape::from_ca_count(hi.data_count() as _).map_err(|_| {
error!("BadCaCount {hi:?}");
Error::BadCaCount
})?;
12
}
CaDbrMetaType::Ctrl => {
ca_status = u16::from_be_bytes(payload[0..2].try_into().map_err(|_| Error::BadSlice)?);
ca_severity = u16::from_be_bytes(payload[2..4].try_into().map_err(|_| Error::BadSlice)?);
let st = std::time::SystemTime::now();
let dt = st.duration_since(std::time::SystemTime::UNIX_EPOCH).unwrap();
ca_secs = (dt.as_secs() - EPICS_EPOCH_OFFSET) as u32;
ca_nanos = dt.subsec_nanos();
ca_sh = Shape::Scalar;
let varcnt = u16::from_be_bytes(payload[4..6].try_into().map_err(|_| Error::BadSlice)?);
let s = String::from_utf8_lossy(&payload[6..6 + 26 * 16]);
info!("enum variants debug {varcnt} {s}");
2 + 2 + 2 + 26 * 16
}
};
let meta_padding = match ca_dbr_ty.meta {
CaDbrMetaType::Plain => 0,
CaDbrMetaType::Status => match ca_dbr_ty.scalar_type {
@@ -940,8 +969,17 @@ impl CaMsg {
CaScalarType::Enum => 2,
CaScalarType::String => 0,
},
CaDbrMetaType::Ctrl => match ca_dbr_ty.scalar_type {
CaScalarType::I8 => 1,
CaScalarType::I16 => 0,
CaScalarType::I32 => 0,
CaScalarType::F32 => 0,
CaScalarType::F64 => 0,
CaScalarType::Enum => 0,
CaScalarType::String => 0,
},
};
let valbuf = &payload[12 + meta_padding..];
let valbuf = &payload[data_offset + meta_padding..];
let value = match ca_sh {
Shape::Scalar => Self::ca_scalar_value(&ca_dbr_ty.scalar_type, valbuf)?,
Shape::Wave(n) => Self::ca_wave_value(&ca_dbr_ty.scalar_type, (n as usize).min(array_truncate), valbuf)?,

View File

@@ -7,7 +7,7 @@ edition = "2021"
[dependencies]
futures-util = "0.3.28"
async-channel = "2.3.1"
scylla = "0.11.0"
scylla = "0.13.0"
smallvec = "1.11.0"
pin-project = "1.1.5"
stackfuture = "0.3.0"

View File

@@ -33,7 +33,7 @@ pub enum Error {
SeriesWriter(#[from] crate::writer::Error),
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MinQuiets {
pub st: Duration,
pub mt: Duration,