Debug enum write

This commit is contained in:
Dominik Werder
2024-07-03 07:29:55 +02:00
parent 816f7f130f
commit 54008718eb
7 changed files with 276 additions and 144 deletions
+98 -38
View File
@@ -130,6 +130,18 @@ macro_rules! trace_event_incoming {
};
}
fn dbg_chn_name(name: impl AsRef<str>) -> bool {
name.as_ref() == "SINSB02-KCOL-ACT:V-EY21700-MAN-ON-SP"
}
fn dbg_chn_cid(cid: Cid, conn: &CaConn) -> bool {
if let Some(name) = conn.name_by_cid(cid) {
dbg_chn_name(name)
} else {
false
}
}
#[derive(Debug, ThisError)]
pub enum Error {
NoProtocol,
@@ -156,6 +168,8 @@ pub enum Error {
NoFreeCid,
InsertQueues(#[from] scywr::insertqueues::Error),
FutLogic,
MissingTimestamp,
EnumFetch(#[from] enumfetch::Error),
}
impl err::ToErr for Error {
@@ -431,6 +445,7 @@ struct CreatedState {
shape: Shape,
log_more: bool,
name: String,
enum_str_table: Option<Vec<String>>,
}
impl CreatedState {
@@ -472,6 +487,7 @@ impl CreatedState {
shape: Shape::Scalar,
log_more: false,
name: String::new(),
enum_str_table: None,
}
}
@@ -999,7 +1015,7 @@ impl CaConn {
}
fn new_self_ticker() -> Pin<Box<tokio::time::Sleep>> {
Box::pin(tokio::time::sleep(Duration::from_millis(500)))
Box::pin(tokio::time::sleep(Duration::from_millis(1500)))
}
fn proto(&mut self) -> Option<&mut CaProto> {
@@ -1144,13 +1160,11 @@ impl CaConn {
fn handle_writer_establish_inner(&mut self, cid: Cid, writer: RtWriter) -> Result<(), Error> {
trace!("handle_writer_establish_inner {cid:?}");
let dbg_chn_cid = dbg_chn_cid(cid, self);
if dbg_chn_cid {
info!("handle_writer_establish_inner {:?}", cid);
}
let stnow = self.tmp_ts_poll.clone();
// At this point we have created the channel and created a writer for that type and sid.
// We do not yet monitor.
// TODO main objectives now:
// Store the writer with the channel state.
// Create a monitor for the channel.
// NOTE: must store the Writer even if not yet in Evented, we could also transition to Polled!
if let Some(conf) = self.channels.get_mut(&cid) {
// TODO refactor, should only execute this when required:
let conf_poll_conf = conf.poll_conf();
@@ -1173,7 +1187,7 @@ impl CaConn {
cssid: st2.channel.cssid.clone(),
status: ChannelStatus::Opened,
});
self.iqdqs.emit_status_item(item);
self.iqdqs.emit_status_item(item)?;
}
if let Some((ivl,)) = conf_poll_conf {
let created_state = WritableState {
@@ -1204,7 +1218,9 @@ impl CaConn {
subid
};
{
trace!("send out EventAdd for {cid:?}");
if dbg_chn_cid {
info!("send out EventAdd for {cid:?}");
}
let ty = CaMsgTy::EventAdd(EventAdd {
sid: st2.channel.sid.to_u32(),
data_type: st2.channel.ca_dbr_type,
@@ -1322,6 +1338,9 @@ impl CaConn {
// TODO can I reuse emit_channel_info_insert_items ?
trace!("channel_state_on_shutdown channels {}", self.channels.len());
for (_cid, conf) in &mut self.channels {
if dbg_chn_name(conf.conf.name()) {
info!("channel_state_on_shutdown {:?}", conf);
}
let chst = &mut conf.state;
match chst {
ChannelState::Init(cssid) => {
@@ -1443,6 +1462,7 @@ impl CaConn {
// return Err(Error::with_msg_no_trace());
return Ok(());
};
let dbg_chn = dbg_chn_cid(cid, self);
let ch_s = if let Some(x) = self.channels.get_mut(&cid) {
&mut x.state
} else {
@@ -1457,9 +1477,14 @@ impl CaConn {
// return Err(Error::with_msg_no_trace());
return Ok(());
};
trace!("handle_event_add_res {:?}", ch_s.cssid());
if dbg_chn {
info!("handle_event_add_res {:?} {:?}", cid, ev);
}
match ch_s {
ChannelState::Writable(st) => {
if dbg_chn {
info!("handle_event_add_res Writable {:?} {:?}", cid, ev);
}
// debug!(
// "CaConn sees data_count {} payload_len {}",
// ev.data_count, ev.payload_len
@@ -1664,7 +1689,7 @@ impl CaConn {
ty: CaMsgTy::ReadNotifyRes(ev),
ts: camsg_ts,
};
fut.as_mut().camsg(camsg, self);
fut.as_mut().camsg(camsg, self)?;
Ok(())
} else {
Err(Error::FutLogic)
@@ -1781,6 +1806,49 @@ impl CaConn {
Ok(())
}
fn convert_event_data(crst: &mut CreatedState, data: super::proto::CaDataValue) -> Result<DataValue, Error> {
use super::proto::CaDataValue;
use scywr::iteminsertqueue::DataValue;
let ret = match data {
CaDataValue::Scalar(val) => DataValue::Scalar({
use super::proto::CaDataScalarValue;
use scywr::iteminsertqueue::ScalarValue;
match val {
CaDataScalarValue::I8(x) => ScalarValue::I8(x),
CaDataScalarValue::I16(x) => ScalarValue::I16(x),
CaDataScalarValue::I32(x) => ScalarValue::I32(x),
CaDataScalarValue::F32(x) => ScalarValue::F32(x),
CaDataScalarValue::F64(x) => ScalarValue::F64(x),
CaDataScalarValue::Enum(x) => ScalarValue::Enum(
x,
crst.enum_str_table.as_ref().map_or_else(
|| String::from("missingstrings"),
|map| {
map.get(x as usize)
.map_or_else(|| String::from("undefined"), String::from)
},
),
),
CaDataScalarValue::String(x) => ScalarValue::String(x),
CaDataScalarValue::Bool(x) => ScalarValue::Bool(x),
}
}),
CaDataValue::Array(val) => DataValue::Array({
use super::proto::CaDataArrayValue;
use scywr::iteminsertqueue::ArrayValue;
match val {
CaDataArrayValue::I8(x) => ArrayValue::I8(x),
CaDataArrayValue::I16(x) => ArrayValue::I16(x),
CaDataArrayValue::I32(x) => ArrayValue::I32(x),
CaDataArrayValue::F32(x) => ArrayValue::F32(x),
CaDataArrayValue::F64(x) => ArrayValue::F64(x),
CaDataArrayValue::Bool(x) => ArrayValue::Bool(x),
}
}),
};
Ok(ret)
}
fn event_add_ingest(
payload_len: u32,
value: CaEventValue,
@@ -1792,20 +1860,7 @@ impl CaConn {
stnow: SystemTime,
stats: &CaConnStats,
) -> Result<(), Error> {
if crst.log_more {
info!(
"event_add_ingest payload_len {} value {:?} {} {}",
payload_len, value, value.status, value.severity
);
} else {
trace_event_incoming!(
"event_add_ingest payload_len {} value {:?} {} {}",
payload_len,
value,
value.status,
value.severity
);
}
trace_event_incoming!("event_add_ingest payload_len {} value {:?}", payload_len, value);
crst.ts_alive_last = tsnow;
crst.ts_activity_last = tsnow;
crst.st_activity_last = stnow;
@@ -1818,7 +1873,7 @@ impl CaConn {
let epoch = stnow.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO);
epoch.as_secs() * SEC + epoch.subsec_nanos() as u64
};
let ts = value.ts;
let ts = value.ts().ok_or_else(|| Error::MissingTimestamp)?;
let ts_diff = ts.abs_diff(ts_local);
stats.ca_ts_off().ingest((ts_diff / MS) as u32);
{
@@ -1827,7 +1882,7 @@ impl CaConn {
crst.insert_item_ivl_ema.tick(tsnow);
let ts_ioc = TsNano::from_ns(ts);
let ts_local = TsNano::from_ns(ts_local);
let val: DataValue = value.data.into();
let val = Self::convert_event_data(crst, value.data)?;
// binwriter.ingest(ts_ioc, ts_local, &val, iqdqs)?;
{
let ((dwst, dwmt, dwlt),) = writer.write(ts_ioc, ts_local, val, iqdqs)?;
@@ -2346,9 +2401,9 @@ impl CaConn {
let log_more = match &scalar_type {
ScalarType::Enum => {
if cssid.id() % 20 == 14 {
if cssid.id() % 60 == 14 {
let name = conf.conf.name();
info!("ENUM {}", name);
// info!("ENUM {}", name);
true
} else {
false
@@ -2392,18 +2447,23 @@ impl CaConn {
shape: shape.clone(),
log_more,
name: conf.conf.name().into(),
enum_str_table: None,
};
if dbg_chn_name(created_state.name()) {
info!(
"handle_create_chan_res {:?} {}",
created_state.cid,
created_state.name()
);
}
match &scalar_type {
ScalarType::Enum => {
if created_state.log_more {
let min_quiets = conf.conf.min_quiets();
let fut = enumfetch::EnumFetch::new(created_state, self, min_quiets);
// TODO should always check if the slot is free.
let ioid = fut.ioid();
let x = Box::pin(fut);
self.handler_by_ioid.insert(ioid, Some(x));
} else {
}
let min_quiets = conf.conf.min_quiets();
let fut = enumfetch::EnumFetch::new(created_state, self, min_quiets);
// TODO should always check if the slot is free.
let ioid = fut.ioid();
let x = Box::pin(fut);
self.handler_by_ioid.insert(ioid, Some(x));
}
_ => {
*chst = ChannelState::MakingSeriesWriter(MakingSeriesWriterState {
+25 -12
View File
@@ -8,14 +8,15 @@ use err::ThisError;
use log::*;
use serieswriter::establish_worker::EstablishWorkerJob;
use std::pin::Pin;
use std::task::Poll;
use std::time::Instant;
#[derive(Debug, ThisError)]
pub enum Error {}
pub enum Error {
MissingState,
}
pub trait ConnFuture: Send {
fn camsg(self: Pin<&mut Self>, camsg: CaMsg, conn: &mut CaConn) -> Poll<()>;
fn camsg(self: Pin<&mut Self>, camsg: CaMsg, conn: &mut CaConn) -> Result<(), Error>;
}
pub struct EnumFetch {
@@ -26,8 +27,9 @@ pub struct EnumFetch {
impl EnumFetch {
pub fn new(created_state: CreatedState, conn: &mut CaConn, min_quiets: serieswriter::rtwriter::MinQuiets) -> Self {
if created_state.cssid.id() == 4705698279895902114 {}
let name = created_state.name();
info!("EnumFetch::new name {name}");
// info!("EnumFetch::new name {name}");
let dbr_ctrl_enum = 31;
let ioid = conn.ioid_next();
let ty = crate::ca::proto::CaMsgTy::ReadNotify(ReadNotify {
@@ -52,16 +54,26 @@ impl EnumFetch {
}
impl ConnFuture for EnumFetch {
fn camsg(self: Pin<&mut Self>, camsg: CaMsg, conn: &mut CaConn) -> Poll<()> {
use Poll::*;
fn camsg(mut self: Pin<&mut Self>, camsg: CaMsg, conn: &mut CaConn) -> Result<(), Error> {
let tsnow = Instant::now();
let crst = &self.created_state;
let crst = &mut self.created_state;
let name = self.created_state.name();
info!("EnumFetch::poll {name}");
let name = crst.name();
// info!("EnumFetch::poll {name}");
//*chst =
super::ChannelState::MakingSeriesWriter(super::MakingSeriesWriterState {
match camsg.ty {
crate::ca::proto::CaMsgTy::ReadNotifyRes(msg2) => match msg2.value.meta {
super::proto::CaMetaValue::CaMetaVariants(meta) => {
crst.enum_str_table = Some(meta.variants);
}
_ => {}
},
_ => {}
};
// This handler must not exist if the channel gets removed.
let conf = conn.channels.get_mut(&crst.cid).ok_or(Error::MissingState)?;
conf.state = super::ChannelState::MakingSeriesWriter(super::MakingSeriesWriterState {
tsbeg: tsnow,
channel: crst.clone(),
});
@@ -78,6 +90,7 @@ impl ConnFuture for EnumFetch {
);
conn.writer_establish_qu.push_back(job);
Pending
conn.handler_by_ioid.remove(&self.ioid);
Ok(())
}
}
+89 -72
View File
@@ -4,6 +4,7 @@ use err::ThisError;
use futures_util::Stream;
use log::*;
use netpod::timeunits::*;
use netpod::TsNano;
use slidebuf::SlideBuf;
use stats::CaProtoStats;
use std::collections::VecDeque;
@@ -252,28 +253,12 @@ pub enum CaDataScalarValue {
I32(i32),
F32(f32),
F64(f64),
Enum(i16, String),
Enum(i16),
String(String),
// TODO remove, CA has no bool, make new enum for other use cases.
Bool(bool),
}
impl From<CaDataScalarValue> for scywr::iteminsertqueue::ScalarValue {
fn from(val: CaDataScalarValue) -> Self {
use scywr::iteminsertqueue::ScalarValue;
match val {
CaDataScalarValue::I8(x) => ScalarValue::I8(x),
CaDataScalarValue::I16(x) => ScalarValue::I16(x),
CaDataScalarValue::I32(x) => ScalarValue::I32(x),
CaDataScalarValue::F32(x) => ScalarValue::F32(x),
CaDataScalarValue::F64(x) => ScalarValue::F64(x),
CaDataScalarValue::Enum(x, y) => ScalarValue::Enum(x, y),
CaDataScalarValue::String(x) => ScalarValue::String(x),
CaDataScalarValue::Bool(x) => ScalarValue::Bool(x),
}
}
}
#[derive(Clone, Debug)]
pub enum CaDataArrayValue {
I8(Vec<i8>),
@@ -285,42 +270,50 @@ pub enum CaDataArrayValue {
Bool(Vec<bool>),
}
impl From<CaDataArrayValue> for scywr::iteminsertqueue::ArrayValue {
fn from(val: CaDataArrayValue) -> Self {
use scywr::iteminsertqueue::ArrayValue;
match val {
CaDataArrayValue::I8(x) => ArrayValue::I8(x),
CaDataArrayValue::I16(x) => ArrayValue::I16(x),
CaDataArrayValue::I32(x) => ArrayValue::I32(x),
CaDataArrayValue::F32(x) => ArrayValue::F32(x),
CaDataArrayValue::F64(x) => ArrayValue::F64(x),
CaDataArrayValue::Bool(x) => ArrayValue::Bool(x),
}
}
}
#[derive(Clone, Debug)]
pub enum CaDataValue {
Scalar(CaDataScalarValue),
Array(CaDataArrayValue),
}
impl From<CaDataValue> for scywr::iteminsertqueue::DataValue {
fn from(value: CaDataValue) -> Self {
use scywr::iteminsertqueue::DataValue;
match value {
CaDataValue::Scalar(x) => DataValue::Scalar(x.into()),
CaDataValue::Array(x) => DataValue::Array(x.into()),
#[derive(Clone, Debug)]
pub struct CaEventValue {
pub data: CaDataValue,
pub meta: CaMetaValue,
}
impl CaEventValue {
// Timestamp ns from unix epoch.
pub fn ts(&self) -> Option<u64> {
match &self.meta {
CaMetaValue::CaMetaTime(x) => {
let ts = SEC * (x.ca_secs as u64 + EPICS_EPOCH_OFFSET) + x.ca_nanos as u64;
Some(ts)
}
CaMetaValue::CaMetaVariants(_) => None,
}
}
}
#[derive(Clone, Debug)]
pub struct CaEventValue {
pub ts: u64,
pub enum CaMetaValue {
CaMetaTime(CaMetaTime),
CaMetaVariants(CaMetaVariants),
}
#[derive(Clone, Debug)]
pub struct CaMetaTime {
pub status: u16,
pub severity: u16,
pub data: CaDataValue,
pub ca_secs: u32,
pub ca_nanos: u32,
}
#[derive(Clone, Debug)]
pub struct CaMetaVariants {
pub status: u16,
pub severity: u16,
pub variants: Vec<String>,
}
#[derive(Debug)]
@@ -637,6 +630,18 @@ macro_rules! convert_scalar_value {
}};
}
macro_rules! convert_scalar_enum_value {
($st:ty, $buf:expr) => {{
type ST = $st;
const STL: usize = std::mem::size_of::<ST>();
if $buf.len() < STL {
return Err(Error::NotEnoughPayload);
}
let v = ST::from_be_bytes($buf[..STL].try_into().map_err(|_| Error::BadSlice)?);
CaDataValue::Scalar(CaDataScalarValue::Enum(v))
}};
}
macro_rules! convert_wave_value {
($st:ty, $var:ident, $n:expr, $buf:expr) => {{
type ST = $st;
@@ -740,7 +745,7 @@ impl CaMsg {
CaScalarType::I32 => convert_scalar_value!(i32, I32, buf),
CaScalarType::F32 => convert_scalar_value!(f32, F32, buf),
CaScalarType::F64 => convert_scalar_value!(f64, F64, buf),
CaScalarType::Enum => convert_scalar_value!(i16, I16, buf),
CaScalarType::Enum => convert_scalar_enum_value!(i16, buf),
CaScalarType::String => {
// TODO constrain string length to the CA `data_count`.
let mut ixn = buf.len();
@@ -916,37 +921,55 @@ impl CaMsg {
fn extract_ca_data_value(hi: &HeadInfo, payload: &[u8], array_truncate: usize) -> Result<CaEventValue, Error> {
use netpod::Shape;
let ca_dbr_ty = CaDbrType::from_ca_u16(hi.data_type)?;
let ca_status;
let ca_severity;
let ca_secs;
let ca_nanos;
let ca_sh;
let data_offset = match &ca_dbr_ty.meta {
let ca_sh = Shape::from_ca_count(hi.data_count() as _).map_err(|_| {
error!("BadCaCount {hi:?}");
Error::BadCaCount
})?;
let (meta, data_offset) = match &ca_dbr_ty.meta {
CaDbrMetaType::Plain => return Err(Error::MismatchDbrTimeType),
CaDbrMetaType::Status => return Err(Error::MismatchDbrTimeType),
CaDbrMetaType::Time => {
ca_status = u16::from_be_bytes(payload[0..2].try_into().map_err(|_| Error::BadSlice)?);
ca_severity = u16::from_be_bytes(payload[2..4].try_into().map_err(|_| Error::BadSlice)?);
ca_secs = u32::from_be_bytes(payload[4..8].try_into().map_err(|_| Error::BadSlice)?);
ca_nanos = u32::from_be_bytes(payload[8..12].try_into().map_err(|_| Error::BadSlice)?);
ca_sh = Shape::from_ca_count(hi.data_count() as _).map_err(|_| {
error!("BadCaCount {hi:?}");
Error::BadCaCount
})?;
12
let status = u16::from_be_bytes(payload[0..2].try_into().map_err(|_| Error::BadSlice)?);
let severity = u16::from_be_bytes(payload[2..4].try_into().map_err(|_| Error::BadSlice)?);
let ca_secs = u32::from_be_bytes(payload[4..8].try_into().map_err(|_| Error::BadSlice)?);
let ca_nanos = u32::from_be_bytes(payload[8..12].try_into().map_err(|_| Error::BadSlice)?);
let meta = CaMetaValue::CaMetaTime(CaMetaTime {
status,
severity,
ca_secs,
ca_nanos,
});
(meta, 12)
}
CaDbrMetaType::Ctrl => {
ca_status = u16::from_be_bytes(payload[0..2].try_into().map_err(|_| Error::BadSlice)?);
ca_severity = u16::from_be_bytes(payload[2..4].try_into().map_err(|_| Error::BadSlice)?);
let st = std::time::SystemTime::now();
let dt = st.duration_since(std::time::SystemTime::UNIX_EPOCH).unwrap();
ca_secs = (dt.as_secs() - EPICS_EPOCH_OFFSET) as u32;
ca_nanos = dt.subsec_nanos();
ca_sh = Shape::Scalar;
let status = u16::from_be_bytes(payload[0..2].try_into().map_err(|_| Error::BadSlice)?);
let severity = u16::from_be_bytes(payload[2..4].try_into().map_err(|_| Error::BadSlice)?);
let varcnt = u16::from_be_bytes(payload[4..6].try_into().map_err(|_| Error::BadSlice)?);
if varcnt > 16 {
return Err(Error::BadCaCount);
}
let s = String::from_utf8_lossy(&payload[6..6 + 26 * 16]);
info!("enum variants debug {varcnt} {s}");
2 + 2 + 2 + 26 * 16
let mut variants = Vec::new();
for i in 0..varcnt {
let p = (6 + 26 * i) as usize;
let s1 = std::ffi::CStr::from_bytes_until_nul(&payload[p..p + 26])
.map_or(String::from("encodingerror"), |x| {
x.to_str().map_or(String::from("encodingerror"), |x| x.to_string())
});
let s1 = if s1.len() >= 26 {
String::from("toolongerror")
} else {
s1
};
variants.push(s1);
}
// info!("enum variants debug {varcnt} {s} {variants:?}");
let meta = CaMetaValue::CaMetaVariants(CaMetaVariants {
status,
severity,
variants,
});
(meta, 2 + 2 + 2 + 26 * 16)
}
};
let meta_padding = match ca_dbr_ty.meta {
@@ -988,13 +1011,7 @@ impl CaMsg {
err::todoval()
}
};
let ts = SEC * (ca_secs as u64 + EPICS_EPOCH_OFFSET) + ca_nanos as u64;
let value = CaEventValue {
ts,
status: ca_status,
severity: ca_severity,
data: value,
};
let value = CaEventValue { data: value, meta };
Ok(value)
}
}
+21 -18
View File
@@ -324,7 +324,7 @@ impl DataValue {
ScalarValue::I64(_) => ScalarType::I64,
ScalarValue::F32(_) => ScalarType::F32,
ScalarValue::F64(_) => ScalarType::F64,
ScalarValue::Enum(_, _) => ScalarType::Enum,
ScalarValue::Enum(..) => ScalarType::Enum,
ScalarValue::String(_) => ScalarType::STRING,
ScalarValue::Bool(_) => ScalarType::BOOL,
},
@@ -617,8 +617,6 @@ struct InsParCom {
ts_msp: TsMs,
ts_lsp: DtNano,
ts_net: TsMs,
ts_alt_1: TsNano,
pulse: u64,
do_insert: bool,
stats: Arc<InsertWorkerStats>,
}
@@ -626,28 +624,35 @@ struct InsParCom {
fn insert_scalar_gen_fut<ST>(par: InsParCom, val: ST, qu: Arc<PreparedStatement>, scy: Arc<ScySession>) -> InsertFut
where
ST: Value + SerializeCql + Send + 'static,
{
let params = (par.series.to_i64(), par.ts_msp.to_i64(), par.ts_lsp.to_i64(), val);
InsertFut::new(scy, qu, params, par.ts_net, par.stats)
}
fn insert_scalar_enum_gen_fut<ST1, ST2>(
par: InsParCom,
val: ST1,
valstr: ST2,
qu: Arc<PreparedStatement>,
scy: Arc<ScySession>,
) -> InsertFut
where
ST1: Value + SerializeCql + Send + 'static,
ST2: Value + SerializeCql + Send + 'static,
{
let params = (
par.series.to_i64(),
par.ts_msp.to_i64(),
par.ts_lsp.to_i64(),
par.ts_alt_1.ns() as i64,
par.pulse as i64,
val,
valstr,
);
InsertFut::new(scy, qu, params, par.ts_net, par.stats)
}
// val: Vec<ST> where ST: Value + SerializeCql + Send + 'static,
fn insert_array_gen_fut(par: InsParCom, val: Vec<u8>, qu: Arc<PreparedStatement>, scy: Arc<ScySession>) -> InsertFut {
let params = (
par.series.to_i64(),
par.ts_msp.to_i64(),
par.ts_lsp.to_i64(),
par.ts_alt_1.ns() as i64,
par.pulse as i64,
val,
);
let params = (par.series.to_i64(), par.ts_msp.to_i64(), par.ts_lsp.to_i64(), val);
InsertFut::new(scy, qu, params, par.ts_net, par.stats)
}
@@ -732,8 +737,6 @@ pub fn insert_item_fut(
ts_msp: item.ts_msp,
ts_lsp: item.ts_lsp,
ts_net: item.ts_net,
ts_alt_1: item.ts_alt_1,
pulse: item.pulse,
do_insert,
stats: stats.clone(),
};
@@ -749,7 +752,9 @@ pub fn insert_item_fut(
I64(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i64.clone(), scy),
F32(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f32.clone(), scy),
F64(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f64.clone(), scy),
Enum(a, b) => insert_scalar_gen_fut(par, a, data_store.qu_insert_scalar_i16.clone(), scy),
Enum(val, valstr) => {
insert_scalar_enum_gen_fut(par, val, valstr, data_store.qu_insert_scalar_enum.clone(), scy)
}
String(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_string.clone(), scy),
Bool(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_bool.clone(), scy),
}
@@ -760,8 +765,6 @@ pub fn insert_item_fut(
ts_msp: item.ts_msp,
ts_lsp: item.ts_lsp,
ts_net: item.ts_net,
ts_alt_1: item.ts_alt_1,
pulse: item.pulse,
do_insert,
stats: stats.clone(),
};
+18
View File
@@ -426,6 +426,24 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio
tab.setup(scy).await?;
}
}
{
let tab = GenTwcsTab::new(
keyspace,
rett.table_prefix(),
format!("events_scalar_enum"),
&[
("series", "bigint"),
("ts_msp", "bigint"),
("ts_lsp", "bigint"),
("value", "smallint"),
("valuestr", "text"),
],
["series", "ts_msp"],
["ts_lsp"],
rett.ttl_events_d1(),
);
tab.setup(scy).await?;
}
Ok(())
}
+22 -4
View File
@@ -32,6 +32,7 @@ pub struct DataStore {
pub qu_insert_scalar_f64: Arc<PreparedStatement>,
pub qu_insert_scalar_bool: Arc<PreparedStatement>,
pub qu_insert_scalar_string: Arc<PreparedStatement>,
pub qu_insert_scalar_enum: Arc<PreparedStatement>,
pub qu_insert_array_u8: Arc<PreparedStatement>,
pub qu_insert_array_u16: Arc<PreparedStatement>,
pub qu_insert_array_u32: Arc<PreparedStatement>,
@@ -56,8 +57,8 @@ macro_rules! prep_qu_ins_a {
($id1:expr, $rett:expr, $scy:expr) => {{
let cql = format!(
concat!(
"insert into {}{} (series, ts_msp, ts_lsp, ts_alt_1, pulse, value)",
" values (?, ?, ?, ?, ?, ?)"
"insert into {}{} (series, ts_msp, ts_lsp, pulse, value)",
" values (?, ?, ?, 0, ?)"
),
$rett.table_prefix(),
$id1
@@ -71,8 +72,23 @@ macro_rules! prep_qu_ins_b {
($id1:expr, $rett:expr, $scy:expr) => {{
let cql = format!(
concat!(
"insert into {}{} (series, ts_msp, ts_lsp, ts_alt_1, pulse, valueblob)",
" values (?, ?, ?, ?, ?, ?)"
"insert into {}{} (series, ts_msp, ts_lsp, pulse, valueblob)",
" values (?, ?, ?, 0, ?)"
),
$rett.table_prefix(),
$id1
);
let q = $scy.prepare(cql).await?;
Arc::new(q)
}};
}
macro_rules! prep_qu_ins_enum {
($id1:expr, $rett:expr, $scy:expr) => {{
let cql = format!(
concat!(
"insert into {}{} (series, ts_msp, ts_lsp, value, valuestr)",
" values (?, ?, ?, ?, ?)"
),
$rett.table_prefix(),
$id1
@@ -121,6 +137,7 @@ impl DataStore {
let qu_insert_scalar_f64 = prep_qu_ins_a!("events_scalar_f64", rett, scy);
let qu_insert_scalar_bool = prep_qu_ins_a!("events_scalar_bool", rett, scy);
let qu_insert_scalar_string = prep_qu_ins_a!("events_scalar_string", rett, scy);
let qu_insert_scalar_enum = prep_qu_ins_enum!("events_scalar_enum", rett, scy);
let qu_insert_array_u8 = prep_qu_ins_b!("events_array_u8", rett, scy);
let qu_insert_array_u16 = prep_qu_ins_b!("events_array_u16", rett, scy);
@@ -208,6 +225,7 @@ impl DataStore {
qu_insert_scalar_f64,
qu_insert_scalar_bool,
qu_insert_scalar_string,
qu_insert_scalar_enum,
qu_insert_array_u8,
qu_insert_array_u16,
qu_insert_array_u32,
+3
View File
@@ -127,6 +127,9 @@ impl RtWriter {
iqdqs: &mut InsertDeques,
) -> Result<((bool, bool, bool),), Error> {
let sid = self.sid;
if sid.id() == 6050300124140774549 {
info!("write {:?}", val);
}
let (did_write_st,) = Self::write_inner(
"ST",
self.min_quiets.st,