diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index f07d74f..5a8c42d 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -130,6 +130,18 @@ macro_rules! trace_event_incoming { }; } +fn dbg_chn_name(name: impl AsRef) -> bool { + name.as_ref() == "SINSB02-KCOL-ACT:V-EY21700-MAN-ON-SP" +} + +fn dbg_chn_cid(cid: Cid, conn: &CaConn) -> bool { + if let Some(name) = conn.name_by_cid(cid) { + dbg_chn_name(name) + } else { + false + } +} + #[derive(Debug, ThisError)] pub enum Error { NoProtocol, @@ -156,6 +168,8 @@ pub enum Error { NoFreeCid, InsertQueues(#[from] scywr::insertqueues::Error), FutLogic, + MissingTimestamp, + EnumFetch(#[from] enumfetch::Error), } impl err::ToErr for Error { @@ -431,6 +445,7 @@ struct CreatedState { shape: Shape, log_more: bool, name: String, + enum_str_table: Option>, } impl CreatedState { @@ -472,6 +487,7 @@ impl CreatedState { shape: Shape::Scalar, log_more: false, name: String::new(), + enum_str_table: None, } } @@ -999,7 +1015,7 @@ impl CaConn { } fn new_self_ticker() -> Pin> { - Box::pin(tokio::time::sleep(Duration::from_millis(500))) + Box::pin(tokio::time::sleep(Duration::from_millis(1500))) } fn proto(&mut self) -> Option<&mut CaProto> { @@ -1144,13 +1160,11 @@ impl CaConn { fn handle_writer_establish_inner(&mut self, cid: Cid, writer: RtWriter) -> Result<(), Error> { trace!("handle_writer_establish_inner {cid:?}"); + let dbg_chn_cid = dbg_chn_cid(cid, self); + if dbg_chn_cid { + info!("handle_writer_establish_inner {:?}", cid); + } let stnow = self.tmp_ts_poll.clone(); - // At this point we have created the channel and created a writer for that type and sid. - // We do not yet monitor. - // TODO main objectives now: - // Store the writer with the channel state. - // Create a monitor for the channel. - // NOTE: must store the Writer even if not yet in Evented, we could also transition to Polled! if let Some(conf) = self.channels.get_mut(&cid) { // TODO refactor, should only execute this when required: let conf_poll_conf = conf.poll_conf(); @@ -1173,7 +1187,7 @@ impl CaConn { cssid: st2.channel.cssid.clone(), status: ChannelStatus::Opened, }); - self.iqdqs.emit_status_item(item); + self.iqdqs.emit_status_item(item)?; } if let Some((ivl,)) = conf_poll_conf { let created_state = WritableState { @@ -1204,7 +1218,9 @@ impl CaConn { subid }; { - trace!("send out EventAdd for {cid:?}"); + if dbg_chn_cid { + info!("send out EventAdd for {cid:?}"); + } let ty = CaMsgTy::EventAdd(EventAdd { sid: st2.channel.sid.to_u32(), data_type: st2.channel.ca_dbr_type, @@ -1322,6 +1338,9 @@ impl CaConn { // TODO can I reuse emit_channel_info_insert_items ? trace!("channel_state_on_shutdown channels {}", self.channels.len()); for (_cid, conf) in &mut self.channels { + if dbg_chn_name(conf.conf.name()) { + info!("channel_state_on_shutdown {:?}", conf); + } let chst = &mut conf.state; match chst { ChannelState::Init(cssid) => { @@ -1443,6 +1462,7 @@ impl CaConn { // return Err(Error::with_msg_no_trace()); return Ok(()); }; + let dbg_chn = dbg_chn_cid(cid, self); let ch_s = if let Some(x) = self.channels.get_mut(&cid) { &mut x.state } else { @@ -1457,9 +1477,14 @@ impl CaConn { // return Err(Error::with_msg_no_trace()); return Ok(()); }; - trace!("handle_event_add_res {:?}", ch_s.cssid()); + if dbg_chn { + info!("handle_event_add_res {:?} {:?}", cid, ev); + } match ch_s { ChannelState::Writable(st) => { + if dbg_chn { + info!("handle_event_add_res Writable {:?} {:?}", cid, ev); + } // debug!( // "CaConn sees data_count {} payload_len {}", // ev.data_count, ev.payload_len @@ -1664,7 +1689,7 @@ impl CaConn { ty: CaMsgTy::ReadNotifyRes(ev), ts: camsg_ts, }; - fut.as_mut().camsg(camsg, self); + fut.as_mut().camsg(camsg, self)?; Ok(()) } else { Err(Error::FutLogic) @@ -1781,6 +1806,49 @@ impl CaConn { Ok(()) } + fn convert_event_data(crst: &mut CreatedState, data: super::proto::CaDataValue) -> Result { + use super::proto::CaDataValue; + use scywr::iteminsertqueue::DataValue; + let ret = match data { + CaDataValue::Scalar(val) => DataValue::Scalar({ + use super::proto::CaDataScalarValue; + use scywr::iteminsertqueue::ScalarValue; + match val { + CaDataScalarValue::I8(x) => ScalarValue::I8(x), + CaDataScalarValue::I16(x) => ScalarValue::I16(x), + CaDataScalarValue::I32(x) => ScalarValue::I32(x), + CaDataScalarValue::F32(x) => ScalarValue::F32(x), + CaDataScalarValue::F64(x) => ScalarValue::F64(x), + CaDataScalarValue::Enum(x) => ScalarValue::Enum( + x, + crst.enum_str_table.as_ref().map_or_else( + || String::from("missingstrings"), + |map| { + map.get(x as usize) + .map_or_else(|| String::from("undefined"), String::from) + }, + ), + ), + CaDataScalarValue::String(x) => ScalarValue::String(x), + CaDataScalarValue::Bool(x) => ScalarValue::Bool(x), + } + }), + CaDataValue::Array(val) => DataValue::Array({ + use super::proto::CaDataArrayValue; + use scywr::iteminsertqueue::ArrayValue; + match val { + CaDataArrayValue::I8(x) => ArrayValue::I8(x), + CaDataArrayValue::I16(x) => ArrayValue::I16(x), + CaDataArrayValue::I32(x) => ArrayValue::I32(x), + CaDataArrayValue::F32(x) => ArrayValue::F32(x), + CaDataArrayValue::F64(x) => ArrayValue::F64(x), + CaDataArrayValue::Bool(x) => ArrayValue::Bool(x), + } + }), + }; + Ok(ret) + } + fn event_add_ingest( payload_len: u32, value: CaEventValue, @@ -1792,20 +1860,7 @@ impl CaConn { stnow: SystemTime, stats: &CaConnStats, ) -> Result<(), Error> { - if crst.log_more { - info!( - "event_add_ingest payload_len {} value {:?} {} {}", - payload_len, value, value.status, value.severity - ); - } else { - trace_event_incoming!( - "event_add_ingest payload_len {} value {:?} {} {}", - payload_len, - value, - value.status, - value.severity - ); - } + trace_event_incoming!("event_add_ingest payload_len {} value {:?}", payload_len, value); crst.ts_alive_last = tsnow; crst.ts_activity_last = tsnow; crst.st_activity_last = stnow; @@ -1818,7 +1873,7 @@ impl CaConn { let epoch = stnow.duration_since(std::time::UNIX_EPOCH).unwrap_or(Duration::ZERO); epoch.as_secs() * SEC + epoch.subsec_nanos() as u64 }; - let ts = value.ts; + let ts = value.ts().ok_or_else(|| Error::MissingTimestamp)?; let ts_diff = ts.abs_diff(ts_local); stats.ca_ts_off().ingest((ts_diff / MS) as u32); { @@ -1827,7 +1882,7 @@ impl CaConn { crst.insert_item_ivl_ema.tick(tsnow); let ts_ioc = TsNano::from_ns(ts); let ts_local = TsNano::from_ns(ts_local); - let val: DataValue = value.data.into(); + let val = Self::convert_event_data(crst, value.data)?; // binwriter.ingest(ts_ioc, ts_local, &val, iqdqs)?; { let ((dwst, dwmt, dwlt),) = writer.write(ts_ioc, ts_local, val, iqdqs)?; @@ -2346,9 +2401,9 @@ impl CaConn { let log_more = match &scalar_type { ScalarType::Enum => { - if cssid.id() % 20 == 14 { + if cssid.id() % 60 == 14 { let name = conf.conf.name(); - info!("ENUM {}", name); + // info!("ENUM {}", name); true } else { false @@ -2392,18 +2447,23 @@ impl CaConn { shape: shape.clone(), log_more, name: conf.conf.name().into(), + enum_str_table: None, }; + if dbg_chn_name(created_state.name()) { + info!( + "handle_create_chan_res {:?} {}", + created_state.cid, + created_state.name() + ); + } match &scalar_type { ScalarType::Enum => { - if created_state.log_more { - let min_quiets = conf.conf.min_quiets(); - let fut = enumfetch::EnumFetch::new(created_state, self, min_quiets); - // TODO should always check if the slot is free. - let ioid = fut.ioid(); - let x = Box::pin(fut); - self.handler_by_ioid.insert(ioid, Some(x)); - } else { - } + let min_quiets = conf.conf.min_quiets(); + let fut = enumfetch::EnumFetch::new(created_state, self, min_quiets); + // TODO should always check if the slot is free. + let ioid = fut.ioid(); + let x = Box::pin(fut); + self.handler_by_ioid.insert(ioid, Some(x)); } _ => { *chst = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { diff --git a/netfetch/src/ca/conn/enumfetch.rs b/netfetch/src/ca/conn/enumfetch.rs index f99994c..c5c9734 100644 --- a/netfetch/src/ca/conn/enumfetch.rs +++ b/netfetch/src/ca/conn/enumfetch.rs @@ -8,14 +8,15 @@ use err::ThisError; use log::*; use serieswriter::establish_worker::EstablishWorkerJob; use std::pin::Pin; -use std::task::Poll; use std::time::Instant; #[derive(Debug, ThisError)] -pub enum Error {} +pub enum Error { + MissingState, +} pub trait ConnFuture: Send { - fn camsg(self: Pin<&mut Self>, camsg: CaMsg, conn: &mut CaConn) -> Poll<()>; + fn camsg(self: Pin<&mut Self>, camsg: CaMsg, conn: &mut CaConn) -> Result<(), Error>; } pub struct EnumFetch { @@ -26,8 +27,9 @@ pub struct EnumFetch { impl EnumFetch { pub fn new(created_state: CreatedState, conn: &mut CaConn, min_quiets: serieswriter::rtwriter::MinQuiets) -> Self { + if created_state.cssid.id() == 4705698279895902114 {} let name = created_state.name(); - info!("EnumFetch::new name {name}"); + // info!("EnumFetch::new name {name}"); let dbr_ctrl_enum = 31; let ioid = conn.ioid_next(); let ty = crate::ca::proto::CaMsgTy::ReadNotify(ReadNotify { @@ -52,16 +54,26 @@ impl EnumFetch { } impl ConnFuture for EnumFetch { - fn camsg(self: Pin<&mut Self>, camsg: CaMsg, conn: &mut CaConn) -> Poll<()> { - use Poll::*; + fn camsg(mut self: Pin<&mut Self>, camsg: CaMsg, conn: &mut CaConn) -> Result<(), Error> { let tsnow = Instant::now(); - let crst = &self.created_state; + let crst = &mut self.created_state; - let name = self.created_state.name(); - info!("EnumFetch::poll {name}"); + let name = crst.name(); + // info!("EnumFetch::poll {name}"); - //*chst = - super::ChannelState::MakingSeriesWriter(super::MakingSeriesWriterState { + match camsg.ty { + crate::ca::proto::CaMsgTy::ReadNotifyRes(msg2) => match msg2.value.meta { + super::proto::CaMetaValue::CaMetaVariants(meta) => { + crst.enum_str_table = Some(meta.variants); + } + _ => {} + }, + _ => {} + }; + + // This handler must not exist if the channel gets removed. + let conf = conn.channels.get_mut(&crst.cid).ok_or(Error::MissingState)?; + conf.state = super::ChannelState::MakingSeriesWriter(super::MakingSeriesWriterState { tsbeg: tsnow, channel: crst.clone(), }); @@ -78,6 +90,7 @@ impl ConnFuture for EnumFetch { ); conn.writer_establish_qu.push_back(job); - Pending + conn.handler_by_ioid.remove(&self.ioid); + Ok(()) } } diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index ac37314..0455b2c 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -4,6 +4,7 @@ use err::ThisError; use futures_util::Stream; use log::*; use netpod::timeunits::*; +use netpod::TsNano; use slidebuf::SlideBuf; use stats::CaProtoStats; use std::collections::VecDeque; @@ -252,28 +253,12 @@ pub enum CaDataScalarValue { I32(i32), F32(f32), F64(f64), - Enum(i16, String), + Enum(i16), String(String), // TODO remove, CA has no bool, make new enum for other use cases. Bool(bool), } -impl From for scywr::iteminsertqueue::ScalarValue { - fn from(val: CaDataScalarValue) -> Self { - use scywr::iteminsertqueue::ScalarValue; - match val { - CaDataScalarValue::I8(x) => ScalarValue::I8(x), - CaDataScalarValue::I16(x) => ScalarValue::I16(x), - CaDataScalarValue::I32(x) => ScalarValue::I32(x), - CaDataScalarValue::F32(x) => ScalarValue::F32(x), - CaDataScalarValue::F64(x) => ScalarValue::F64(x), - CaDataScalarValue::Enum(x, y) => ScalarValue::Enum(x, y), - CaDataScalarValue::String(x) => ScalarValue::String(x), - CaDataScalarValue::Bool(x) => ScalarValue::Bool(x), - } - } -} - #[derive(Clone, Debug)] pub enum CaDataArrayValue { I8(Vec), @@ -285,42 +270,50 @@ pub enum CaDataArrayValue { Bool(Vec), } -impl From for scywr::iteminsertqueue::ArrayValue { - fn from(val: CaDataArrayValue) -> Self { - use scywr::iteminsertqueue::ArrayValue; - match val { - CaDataArrayValue::I8(x) => ArrayValue::I8(x), - CaDataArrayValue::I16(x) => ArrayValue::I16(x), - CaDataArrayValue::I32(x) => ArrayValue::I32(x), - CaDataArrayValue::F32(x) => ArrayValue::F32(x), - CaDataArrayValue::F64(x) => ArrayValue::F64(x), - CaDataArrayValue::Bool(x) => ArrayValue::Bool(x), - } - } -} - #[derive(Clone, Debug)] pub enum CaDataValue { Scalar(CaDataScalarValue), Array(CaDataArrayValue), } -impl From for scywr::iteminsertqueue::DataValue { - fn from(value: CaDataValue) -> Self { - use scywr::iteminsertqueue::DataValue; - match value { - CaDataValue::Scalar(x) => DataValue::Scalar(x.into()), - CaDataValue::Array(x) => DataValue::Array(x.into()), +#[derive(Clone, Debug)] +pub struct CaEventValue { + pub data: CaDataValue, + pub meta: CaMetaValue, +} + +impl CaEventValue { + // Timestamp ns from unix epoch. + pub fn ts(&self) -> Option { + match &self.meta { + CaMetaValue::CaMetaTime(x) => { + let ts = SEC * (x.ca_secs as u64 + EPICS_EPOCH_OFFSET) + x.ca_nanos as u64; + Some(ts) + } + CaMetaValue::CaMetaVariants(_) => None, } } } #[derive(Clone, Debug)] -pub struct CaEventValue { - pub ts: u64, +pub enum CaMetaValue { + CaMetaTime(CaMetaTime), + CaMetaVariants(CaMetaVariants), +} + +#[derive(Clone, Debug)] +pub struct CaMetaTime { pub status: u16, pub severity: u16, - pub data: CaDataValue, + pub ca_secs: u32, + pub ca_nanos: u32, +} + +#[derive(Clone, Debug)] +pub struct CaMetaVariants { + pub status: u16, + pub severity: u16, + pub variants: Vec, } #[derive(Debug)] @@ -637,6 +630,18 @@ macro_rules! convert_scalar_value { }}; } +macro_rules! convert_scalar_enum_value { + ($st:ty, $buf:expr) => {{ + type ST = $st; + const STL: usize = std::mem::size_of::(); + if $buf.len() < STL { + return Err(Error::NotEnoughPayload); + } + let v = ST::from_be_bytes($buf[..STL].try_into().map_err(|_| Error::BadSlice)?); + CaDataValue::Scalar(CaDataScalarValue::Enum(v)) + }}; +} + macro_rules! convert_wave_value { ($st:ty, $var:ident, $n:expr, $buf:expr) => {{ type ST = $st; @@ -740,7 +745,7 @@ impl CaMsg { CaScalarType::I32 => convert_scalar_value!(i32, I32, buf), CaScalarType::F32 => convert_scalar_value!(f32, F32, buf), CaScalarType::F64 => convert_scalar_value!(f64, F64, buf), - CaScalarType::Enum => convert_scalar_value!(i16, I16, buf), + CaScalarType::Enum => convert_scalar_enum_value!(i16, buf), CaScalarType::String => { // TODO constrain string length to the CA `data_count`. let mut ixn = buf.len(); @@ -916,37 +921,55 @@ impl CaMsg { fn extract_ca_data_value(hi: &HeadInfo, payload: &[u8], array_truncate: usize) -> Result { use netpod::Shape; let ca_dbr_ty = CaDbrType::from_ca_u16(hi.data_type)?; - let ca_status; - let ca_severity; - let ca_secs; - let ca_nanos; - let ca_sh; - let data_offset = match &ca_dbr_ty.meta { + let ca_sh = Shape::from_ca_count(hi.data_count() as _).map_err(|_| { + error!("BadCaCount {hi:?}"); + Error::BadCaCount + })?; + let (meta, data_offset) = match &ca_dbr_ty.meta { CaDbrMetaType::Plain => return Err(Error::MismatchDbrTimeType), CaDbrMetaType::Status => return Err(Error::MismatchDbrTimeType), CaDbrMetaType::Time => { - ca_status = u16::from_be_bytes(payload[0..2].try_into().map_err(|_| Error::BadSlice)?); - ca_severity = u16::from_be_bytes(payload[2..4].try_into().map_err(|_| Error::BadSlice)?); - ca_secs = u32::from_be_bytes(payload[4..8].try_into().map_err(|_| Error::BadSlice)?); - ca_nanos = u32::from_be_bytes(payload[8..12].try_into().map_err(|_| Error::BadSlice)?); - ca_sh = Shape::from_ca_count(hi.data_count() as _).map_err(|_| { - error!("BadCaCount {hi:?}"); - Error::BadCaCount - })?; - 12 + let status = u16::from_be_bytes(payload[0..2].try_into().map_err(|_| Error::BadSlice)?); + let severity = u16::from_be_bytes(payload[2..4].try_into().map_err(|_| Error::BadSlice)?); + let ca_secs = u32::from_be_bytes(payload[4..8].try_into().map_err(|_| Error::BadSlice)?); + let ca_nanos = u32::from_be_bytes(payload[8..12].try_into().map_err(|_| Error::BadSlice)?); + let meta = CaMetaValue::CaMetaTime(CaMetaTime { + status, + severity, + ca_secs, + ca_nanos, + }); + (meta, 12) } CaDbrMetaType::Ctrl => { - ca_status = u16::from_be_bytes(payload[0..2].try_into().map_err(|_| Error::BadSlice)?); - ca_severity = u16::from_be_bytes(payload[2..4].try_into().map_err(|_| Error::BadSlice)?); - let st = std::time::SystemTime::now(); - let dt = st.duration_since(std::time::SystemTime::UNIX_EPOCH).unwrap(); - ca_secs = (dt.as_secs() - EPICS_EPOCH_OFFSET) as u32; - ca_nanos = dt.subsec_nanos(); - ca_sh = Shape::Scalar; + let status = u16::from_be_bytes(payload[0..2].try_into().map_err(|_| Error::BadSlice)?); + let severity = u16::from_be_bytes(payload[2..4].try_into().map_err(|_| Error::BadSlice)?); let varcnt = u16::from_be_bytes(payload[4..6].try_into().map_err(|_| Error::BadSlice)?); + if varcnt > 16 { + return Err(Error::BadCaCount); + } let s = String::from_utf8_lossy(&payload[6..6 + 26 * 16]); - info!("enum variants debug {varcnt} {s}"); - 2 + 2 + 2 + 26 * 16 + let mut variants = Vec::new(); + for i in 0..varcnt { + let p = (6 + 26 * i) as usize; + let s1 = std::ffi::CStr::from_bytes_until_nul(&payload[p..p + 26]) + .map_or(String::from("encodingerror"), |x| { + x.to_str().map_or(String::from("encodingerror"), |x| x.to_string()) + }); + let s1 = if s1.len() >= 26 { + String::from("toolongerror") + } else { + s1 + }; + variants.push(s1); + } + // info!("enum variants debug {varcnt} {s} {variants:?}"); + let meta = CaMetaValue::CaMetaVariants(CaMetaVariants { + status, + severity, + variants, + }); + (meta, 2 + 2 + 2 + 26 * 16) } }; let meta_padding = match ca_dbr_ty.meta { @@ -988,13 +1011,7 @@ impl CaMsg { err::todoval() } }; - let ts = SEC * (ca_secs as u64 + EPICS_EPOCH_OFFSET) + ca_nanos as u64; - let value = CaEventValue { - ts, - status: ca_status, - severity: ca_severity, - data: value, - }; + let value = CaEventValue { data: value, meta }; Ok(value) } } diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 3fa0065..037da08 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -324,7 +324,7 @@ impl DataValue { ScalarValue::I64(_) => ScalarType::I64, ScalarValue::F32(_) => ScalarType::F32, ScalarValue::F64(_) => ScalarType::F64, - ScalarValue::Enum(_, _) => ScalarType::Enum, + ScalarValue::Enum(..) => ScalarType::Enum, ScalarValue::String(_) => ScalarType::STRING, ScalarValue::Bool(_) => ScalarType::BOOL, }, @@ -617,8 +617,6 @@ struct InsParCom { ts_msp: TsMs, ts_lsp: DtNano, ts_net: TsMs, - ts_alt_1: TsNano, - pulse: u64, do_insert: bool, stats: Arc, } @@ -626,28 +624,35 @@ struct InsParCom { fn insert_scalar_gen_fut(par: InsParCom, val: ST, qu: Arc, scy: Arc) -> InsertFut where ST: Value + SerializeCql + Send + 'static, +{ + let params = (par.series.to_i64(), par.ts_msp.to_i64(), par.ts_lsp.to_i64(), val); + InsertFut::new(scy, qu, params, par.ts_net, par.stats) +} + +fn insert_scalar_enum_gen_fut( + par: InsParCom, + val: ST1, + valstr: ST2, + qu: Arc, + scy: Arc, +) -> InsertFut +where + ST1: Value + SerializeCql + Send + 'static, + ST2: Value + SerializeCql + Send + 'static, { let params = ( par.series.to_i64(), par.ts_msp.to_i64(), par.ts_lsp.to_i64(), - par.ts_alt_1.ns() as i64, - par.pulse as i64, val, + valstr, ); InsertFut::new(scy, qu, params, par.ts_net, par.stats) } // val: Vec where ST: Value + SerializeCql + Send + 'static, fn insert_array_gen_fut(par: InsParCom, val: Vec, qu: Arc, scy: Arc) -> InsertFut { - let params = ( - par.series.to_i64(), - par.ts_msp.to_i64(), - par.ts_lsp.to_i64(), - par.ts_alt_1.ns() as i64, - par.pulse as i64, - val, - ); + let params = (par.series.to_i64(), par.ts_msp.to_i64(), par.ts_lsp.to_i64(), val); InsertFut::new(scy, qu, params, par.ts_net, par.stats) } @@ -732,8 +737,6 @@ pub fn insert_item_fut( ts_msp: item.ts_msp, ts_lsp: item.ts_lsp, ts_net: item.ts_net, - ts_alt_1: item.ts_alt_1, - pulse: item.pulse, do_insert, stats: stats.clone(), }; @@ -749,7 +752,9 @@ pub fn insert_item_fut( I64(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_i64.clone(), scy), F32(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f32.clone(), scy), F64(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_f64.clone(), scy), - Enum(a, b) => insert_scalar_gen_fut(par, a, data_store.qu_insert_scalar_i16.clone(), scy), + Enum(val, valstr) => { + insert_scalar_enum_gen_fut(par, val, valstr, data_store.qu_insert_scalar_enum.clone(), scy) + } String(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_string.clone(), scy), Bool(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_bool.clone(), scy), } @@ -760,8 +765,6 @@ pub fn insert_item_fut( ts_msp: item.ts_msp, ts_lsp: item.ts_lsp, ts_net: item.ts_net, - ts_alt_1: item.ts_alt_1, - pulse: item.pulse, do_insert, stats: stats.clone(), }; diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index cd36bca..cbd9592 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -426,6 +426,24 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio tab.setup(scy).await?; } } + { + let tab = GenTwcsTab::new( + keyspace, + rett.table_prefix(), + format!("events_scalar_enum"), + &[ + ("series", "bigint"), + ("ts_msp", "bigint"), + ("ts_lsp", "bigint"), + ("value", "smallint"), + ("valuestr", "text"), + ], + ["series", "ts_msp"], + ["ts_lsp"], + rett.ttl_events_d1(), + ); + tab.setup(scy).await?; + } Ok(()) } diff --git a/scywr/src/store.rs b/scywr/src/store.rs index 467aded..9953d27 100644 --- a/scywr/src/store.rs +++ b/scywr/src/store.rs @@ -32,6 +32,7 @@ pub struct DataStore { pub qu_insert_scalar_f64: Arc, pub qu_insert_scalar_bool: Arc, pub qu_insert_scalar_string: Arc, + pub qu_insert_scalar_enum: Arc, pub qu_insert_array_u8: Arc, pub qu_insert_array_u16: Arc, pub qu_insert_array_u32: Arc, @@ -56,8 +57,8 @@ macro_rules! prep_qu_ins_a { ($id1:expr, $rett:expr, $scy:expr) => {{ let cql = format!( concat!( - "insert into {}{} (series, ts_msp, ts_lsp, ts_alt_1, pulse, value)", - " values (?, ?, ?, ?, ?, ?)" + "insert into {}{} (series, ts_msp, ts_lsp, pulse, value)", + " values (?, ?, ?, 0, ?)" ), $rett.table_prefix(), $id1 @@ -71,8 +72,23 @@ macro_rules! prep_qu_ins_b { ($id1:expr, $rett:expr, $scy:expr) => {{ let cql = format!( concat!( - "insert into {}{} (series, ts_msp, ts_lsp, ts_alt_1, pulse, valueblob)", - " values (?, ?, ?, ?, ?, ?)" + "insert into {}{} (series, ts_msp, ts_lsp, pulse, valueblob)", + " values (?, ?, ?, 0, ?)" + ), + $rett.table_prefix(), + $id1 + ); + let q = $scy.prepare(cql).await?; + Arc::new(q) + }}; +} + +macro_rules! prep_qu_ins_enum { + ($id1:expr, $rett:expr, $scy:expr) => {{ + let cql = format!( + concat!( + "insert into {}{} (series, ts_msp, ts_lsp, value, valuestr)", + " values (?, ?, ?, ?, ?)" ), $rett.table_prefix(), $id1 @@ -121,6 +137,7 @@ impl DataStore { let qu_insert_scalar_f64 = prep_qu_ins_a!("events_scalar_f64", rett, scy); let qu_insert_scalar_bool = prep_qu_ins_a!("events_scalar_bool", rett, scy); let qu_insert_scalar_string = prep_qu_ins_a!("events_scalar_string", rett, scy); + let qu_insert_scalar_enum = prep_qu_ins_enum!("events_scalar_enum", rett, scy); let qu_insert_array_u8 = prep_qu_ins_b!("events_array_u8", rett, scy); let qu_insert_array_u16 = prep_qu_ins_b!("events_array_u16", rett, scy); @@ -208,6 +225,7 @@ impl DataStore { qu_insert_scalar_f64, qu_insert_scalar_bool, qu_insert_scalar_string, + qu_insert_scalar_enum, qu_insert_array_u8, qu_insert_array_u16, qu_insert_array_u32, diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index c7ef687..d249fb5 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -127,6 +127,9 @@ impl RtWriter { iqdqs: &mut InsertDeques, ) -> Result<((bool, bool, bool),), Error> { let sid = self.sid; + if sid.id() == 6050300124140774549 { + info!("write {:?}", val); + } let (did_write_st,) = Self::write_inner( "ST", self.min_quiets.st,