From 816f7f130fee49f109099493fb9ddb45d4e374f0 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 1 Jul 2024 17:33:07 +0200 Subject: [PATCH] WIP factor initial get --- daqingest/Cargo.toml | 2 +- netfetch/Cargo.toml | 2 +- netfetch/src/ca/conn.rs | 307 ++++++++++++++++++++---------- netfetch/src/ca/conn/enumfetch.rs | 83 ++++++++ netfetch/src/ca/proto.rs | 64 +++++-- scywr/Cargo.toml | 2 +- serieswriter/src/rtwriter.rs | 2 +- 7 files changed, 342 insertions(+), 120 deletions(-) create mode 100644 netfetch/src/ca/conn/enumfetch.rs diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index b4e1a8f..a5ab372 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.2-aa.2" +version = "0.2.2-aa.3" authors = ["Dominik Werder "] edition = "2021" diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index ae4f339..1301382 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -34,7 +34,7 @@ pin-project = "1" lazy_static = "1" libc = "0.2" slidebuf = "0.0.1" -dashmap = "5.5.3" +dashmap = "6.0.1" hashbrown = "0.14.3" log = { path = "../log" } series = { path = "../series" } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 80f8b3c..f07d74f 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1,3 +1,5 @@ +mod enumfetch; + use super::proto; use super::proto::CaEventValue; use super::proto::ReadNotify; @@ -10,6 +12,7 @@ use async_channel::Receiver; use async_channel::Sender; use core::fmt; use dbpg::seriesbychannel::ChannelInfoQuery; +use enumfetch::ConnFuture; use err::thiserror; use err::ThisError; use futures_util::Future; @@ -152,6 +155,7 @@ pub enum Error { DurationOutOfBounds, NoFreeCid, InsertQueues(#[from] scywr::insertqueues::Error), + FutLogic, } impl err::ToErr for Error { @@ -285,6 +289,12 @@ struct MakingSeriesWriterState { channel: CreatedState, } +#[derive(Debug, Clone)] +struct FetchEnumDetails { + tsbeg: Instant, + cssid: ChannelStatusSeriesId, +} + #[derive(Debug, Clone)] struct EnableMonitoringState { tsbeg: Instant, @@ -419,6 +429,8 @@ struct CreatedState { dw_lt_last: SystemTime, scalar_type: ScalarType, shape: Shape, + log_more: bool, + name: String, } impl CreatedState { @@ -458,14 +470,21 @@ impl CreatedState { dw_lt_last: SystemTime::UNIX_EPOCH, scalar_type: ScalarType::I8, shape: Shape::Scalar, + log_more: false, + name: String::new(), } } + + pub fn name(&self) -> &str { + &self.name + } } #[derive(Debug)] enum ChannelState { Init(ChannelStatusSeriesId), Creating(CreatingState), + FetchEnumDetails(FetchEnumDetails), MakingSeriesWriter(MakingSeriesWriterState), Writable(WritableState), Closing(ClosingState), @@ -502,6 +521,7 @@ impl ChannelState { let channel_connected_info = match self { ChannelState::Init(..) => ChannelConnectedInfo::Disconnected, ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting, + ChannelState::FetchEnumDetails(_) => ChannelConnectedInfo::Connecting, ChannelState::MakingSeriesWriter(_) => ChannelConnectedInfo::Connecting, ChannelState::Writable(_) => ChannelConnectedInfo::Connected, ChannelState::Error(_) => ChannelConnectedInfo::Error, @@ -587,6 +607,7 @@ impl ChannelState { match self { ChannelState::Init(cssid) => cssid.clone(), ChannelState::Creating(st) => st.cssid.clone(), + ChannelState::FetchEnumDetails(st) => st.cssid.clone(), ChannelState::MakingSeriesWriter(st) => st.channel.cssid.clone(), ChannelState::Writable(st) => st.channel.cssid.clone(), ChannelState::Error(e) => match e { @@ -898,6 +919,7 @@ pub struct CaConn { poll_tsnow: Instant, ioid: u32, read_ioids: HashMap, + handler_by_ioid: HashMap>>>, } impl Drop for CaConn { @@ -964,6 +986,7 @@ impl CaConn { poll_tsnow: tsnow, ioid: 100, read_ioids: HashMap::new(), + handler_by_ioid: HashMap::new(), } } @@ -979,6 +1002,15 @@ impl CaConn { Box::pin(tokio::time::sleep(Duration::from_millis(500))) } + fn proto(&mut self) -> Option<&mut CaProto> { + self.proto.as_mut() + } + + fn ioid_next(&mut self) -> Ioid { + self.ioid = self.ioid.wrapping_add(1); + Ioid(self.ioid) + } + pub fn conn_command_tx(&self) -> Sender { self.conn_command_tx.as_ref().get_ref().clone() } @@ -1298,6 +1330,9 @@ impl CaConn { ChannelState::Creating(st2) => { *chst = ChannelState::Ended(st2.cssid.clone()); } + ChannelState::FetchEnumDetails(st) => { + *chst = ChannelState::Ended(st.cssid.clone()); + } ChannelState::MakingSeriesWriter(st) => { *chst = ChannelState::Ended(st.channel.cssid.clone()); } @@ -1331,6 +1366,9 @@ impl CaConn { ChannelState::Init(..) => { // TODO need last-save-ts for this state. } + ChannelState::FetchEnumDetails(..) => { + // TODO need last-save-ts for this state. + } ChannelState::Creating(..) => { // TODO need last-save-ts for this state. } @@ -1535,7 +1573,10 @@ impl CaConn { } } } - ChannelState::Creating(_) | ChannelState::Init(_) | ChannelState::MakingSeriesWriter(_) => { + ChannelState::Creating(_) + | ChannelState::Init(_) + | ChannelState::FetchEnumDetails(_) + | ChannelState::MakingSeriesWriter(_) => { self.stats.recv_read_notify_but_not_init_yet.inc(); } ChannelState::Closing(_) | ChannelState::Ended(_) | ChannelState::Error(_) => { @@ -1607,94 +1648,112 @@ impl CaConn { Ok(()) } - fn handle_read_notify_res(&mut self, ev: proto::ReadNotifyRes, tsnow: Instant) -> Result<(), Error> { + fn handle_read_notify_res( + &mut self, + ev: proto::ReadNotifyRes, + camsg_ts: Instant, + tsnow: Instant, + ) -> Result<(), Error> { // trace!("handle_read_notify_res {ev:?}"); // TODO can not rely on the SID in the response. let sid_ev = Sid(ev.sid); let ioid = Ioid(ev.ioid); - if let Some(cid) = self.read_ioids.get(&ioid) { - let ch_s = if let Some(x) = self.channels.get_mut(cid) { - &mut x.state + if let Some(pp) = self.handler_by_ioid.get_mut(&ioid) { + if let Some(mut fut) = pp.take() { + let camsg = CaMsg { + ty: CaMsgTy::ReadNotifyRes(ev), + ts: camsg_ts, + }; + fut.as_mut().camsg(camsg, self); + Ok(()) } else { - warn!("handle_read_notify_res can not find channel for {cid:?} {ioid:?}"); - return Ok(()); - }; - match ch_s { - ChannelState::Writable(st) => { - if st.channel.sid != sid_ev { - // TODO count for metrics - // warn!("mismatch in ReadNotifyRes {:?} {:?}", st.channel.sid, sid_ev); - } - let stnow = self.tmp_ts_poll; - let crst = &mut st.channel; - let stwin_ts = stnow.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() / 1; - if crst.stwin_ts != stwin_ts { - crst.stwin_ts = stwin_ts; - crst.stwin_count = 0; - } - { - crst.stwin_count += 1; - crst.stwin_bytes += ev.payload_len; - } - match &mut st.reading { - ReadingState::Polling(st2) => match &mut st2.tick { - PollTickState::Idle(_st3) => { - self.stats.recv_read_notify_while_polling_idle.inc(); - } - PollTickState::Wait(st3, ioid) => { - let dt = tsnow.saturating_duration_since(*st3); - self.stats.caget_lat().ingest((1e3 * dt.as_secs_f32()) as u32); - // TODO maintain histogram of read-notify latencies - self.read_ioids.remove(ioid); - st2.tick = PollTickState::Idle(tsnow); - let iqdqs = &mut self.iqdqs; - let stats = self.stats.as_ref(); - Self::read_notify_res_for_write(ev, st, iqdqs, stnow, tsnow, stats)?; - } - }, - ReadingState::EnableMonitoring(_) => { - self.stats.recv_read_notify_while_enabling_monitoring.inc(); - } - ReadingState::Monitoring(st2) => match &mut st2.mon2state { - Monitoring2State::Passive(st3) => { - if self.read_ioids.remove(&ioid).is_some() { - self.stats.recv_read_notify_state_passive_found_ioid.inc(); - } else { - self.stats.recv_read_notify_state_passive.inc(); - } - st3.tsbeg = tsnow; - } - Monitoring2State::ReadPending(ioid2, _since) => { - // We don't check again for `since` here. That's done in timeout checking. - // So we could be here a little beyond timeout but we don't care about that. - if ioid != *ioid2 { - // warn!("IOID mismatch ReadNotifyRes on Monitor Read Pending {ioid:?} {ioid2:?}"); - self.stats.recv_read_notify_state_read_pending_bad_ioid.inc(); - } else { - self.stats.recv_read_notify_state_read_pending.inc(); - } - self.read_ioids.remove(&ioid); - st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow }); - let iqdqs = &mut self.iqdqs; - let stats = self.stats.as_ref(); - Self::read_notify_res_for_write(ev, st, iqdqs, stnow, tsnow, stats)?; - } - }, - ReadingState::StopMonitoringForPolling(..) => { - error!("TODO handle_read_notify_res handle StopMonitoringForPolling"); - } - } - } - _ => { - // TODO count instead of print - error!("unexpected state: ReadNotifyRes while having {ch_s:?}"); - } + Err(Error::FutLogic) } } else { - // warn!("unknown {ioid:?}"); - self.stats.unknown_ioid().inc(); + if let Some(cid) = self.read_ioids.get(&ioid) { + let ch_s = if let Some(x) = self.channels.get_mut(cid) { + &mut x.state + } else { + warn!("handle_read_notify_res can not find channel for {cid:?} {ioid:?}"); + return Ok(()); + }; + match ch_s { + ChannelState::Writable(st) => { + if st.channel.sid != sid_ev { + // TODO count for metrics + // warn!("mismatch in ReadNotifyRes {:?} {:?}", st.channel.sid, sid_ev); + } + let stnow = self.tmp_ts_poll; + let crst = &mut st.channel; + let stwin_ts = stnow.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() / 1; + if crst.stwin_ts != stwin_ts { + crst.stwin_ts = stwin_ts; + crst.stwin_count = 0; + } + { + crst.stwin_count += 1; + crst.stwin_bytes += ev.payload_len; + } + match &mut st.reading { + ReadingState::Polling(st2) => match &mut st2.tick { + PollTickState::Idle(_st3) => { + self.stats.recv_read_notify_while_polling_idle.inc(); + } + PollTickState::Wait(st3, ioid) => { + let dt = tsnow.saturating_duration_since(*st3); + self.stats.caget_lat().ingest((1e3 * dt.as_secs_f32()) as u32); + // TODO maintain histogram of read-notify latencies + self.read_ioids.remove(ioid); + st2.tick = PollTickState::Idle(tsnow); + let iqdqs = &mut self.iqdqs; + let stats = self.stats.as_ref(); + Self::read_notify_res_for_write(ev, st, iqdqs, stnow, tsnow, stats)?; + } + }, + ReadingState::EnableMonitoring(_) => { + self.stats.recv_read_notify_while_enabling_monitoring.inc(); + } + ReadingState::Monitoring(st2) => match &mut st2.mon2state { + Monitoring2State::Passive(st3) => { + if self.read_ioids.remove(&ioid).is_some() { + self.stats.recv_read_notify_state_passive_found_ioid.inc(); + } else { + self.stats.recv_read_notify_state_passive.inc(); + } + st3.tsbeg = tsnow; + } + Monitoring2State::ReadPending(ioid2, _since) => { + // We don't check again for `since` here. That's done in timeout checking. + // So we could be here a little beyond timeout but we don't care about that. + if ioid != *ioid2 { + // warn!("IOID mismatch ReadNotifyRes on Monitor Read Pending {ioid:?} {ioid2:?}"); + self.stats.recv_read_notify_state_read_pending_bad_ioid.inc(); + } else { + self.stats.recv_read_notify_state_read_pending.inc(); + } + self.read_ioids.remove(&ioid); + st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow }); + let iqdqs = &mut self.iqdqs; + let stats = self.stats.as_ref(); + Self::read_notify_res_for_write(ev, st, iqdqs, stnow, tsnow, stats)?; + } + }, + ReadingState::StopMonitoringForPolling(..) => { + error!("TODO handle_read_notify_res handle StopMonitoringForPolling"); + } + } + } + _ => { + // TODO count instead of print + error!("unexpected state: ReadNotifyRes while having {ch_s:?}"); + } + } + } else { + // warn!("unknown {ioid:?}"); + self.stats.unknown_ioid().inc(); + } + Ok(()) } - Ok(()) } fn read_notify_res_for_write( @@ -1733,13 +1792,20 @@ impl CaConn { stnow: SystemTime, stats: &CaConnStats, ) -> Result<(), Error> { - trace_event_incoming!( - "event_add_ingest payload_len {} value {:?} {} {}", - payload_len, - value, - value.status, - value.severity - ); + if crst.log_more { + info!( + "event_add_ingest payload_len {} value {:?} {} {}", + payload_len, value, value.status, value.severity + ); + } else { + trace_event_incoming!( + "event_add_ingest payload_len {} value {:?} {} {}", + payload_len, + value, + value.status, + value.severity + ); + } crst.ts_alive_last = tsnow; crst.ts_activity_last = tsnow; crst.st_activity_last = stnow; @@ -1934,6 +2000,7 @@ impl CaConn { match chst { ChannelState::Init(_) => {} ChannelState::Creating(_) => {} + ChannelState::FetchEnumDetails(_) => {} ChannelState::MakingSeriesWriter(_) => {} ChannelState::Writable(st2) => match &mut st2.reading { ReadingState::EnableMonitoring(_) => {} @@ -2158,7 +2225,7 @@ impl CaConn { trace4!("got EventAddResEmpty {:?}", camsg.ts); Self::handle_event_add_res_empty(self, ev, tsnow)? } - CaMsgTy::ReadNotifyRes(ev) => Self::handle_read_notify_res(self, ev, tsnow)?, + CaMsgTy::ReadNotifyRes(ev) => Self::handle_read_notify_res(self, ev, camsg.ts, tsnow)?, CaMsgTy::Echo => { // let addr = &self.remote_addr_dbg; if let Some(started) = self.ioc_ping_start { @@ -2276,8 +2343,22 @@ impl CaConn { let ca_dbr_type = k.data_type + 14; let scalar_type = ScalarType::from_ca_id(k.data_type)?; let shape = Shape::from_ca_count(k.data_count)?; + + let log_more = match &scalar_type { + ScalarType::Enum => { + if cssid.id() % 20 == 14 { + let name = conf.conf.name(); + info!("ENUM {}", name); + true + } else { + false + } + } + _ => false, + }; + let (acc_msp, _) = TsMs::from_system_time(stnow).to_grid_02(EMIT_ACCOUNTING_SNAP); - let channel = CreatedState { + let created_state = CreatedState { cssid, cid, sid, @@ -2309,20 +2390,40 @@ impl CaConn { dw_lt_last: SystemTime::UNIX_EPOCH, scalar_type: scalar_type.clone(), shape: shape.clone(), + log_more, + name: conf.conf.name().into(), }; - *chst = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { tsbeg: tsnow, channel }); - let job = EstablishWorkerJob::new( - JobId(cid.0 as _), - self.backend.clone(), - conf.conf.name().into(), - cssid, - scalar_type, - shape, - conf.conf.min_quiets(), - self.writer_tx.clone(), - self.tmp_ts_poll, - ); - self.writer_establish_qu.push_back(job); + match &scalar_type { + ScalarType::Enum => { + if created_state.log_more { + let min_quiets = conf.conf.min_quiets(); + let fut = enumfetch::EnumFetch::new(created_state, self, min_quiets); + // TODO should always check if the slot is free. + let ioid = fut.ioid(); + let x = Box::pin(fut); + self.handler_by_ioid.insert(ioid, Some(x)); + } else { + } + } + _ => { + *chst = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { + tsbeg: tsnow, + channel: created_state, + }); + let job = EstablishWorkerJob::new( + JobId(cid.0 as _), + self.backend.clone(), + conf.conf.name().into(), + cssid, + scalar_type, + shape, + conf.conf.min_quiets(), + self.writer_tx.clone(), + self.tmp_ts_poll, + ); + self.writer_establish_qu.push_back(job); + } + } Ok(()) } diff --git a/netfetch/src/ca/conn/enumfetch.rs b/netfetch/src/ca/conn/enumfetch.rs new file mode 100644 index 0000000..f99994c --- /dev/null +++ b/netfetch/src/ca/conn/enumfetch.rs @@ -0,0 +1,83 @@ +use super::CaConn; +use super::CreatedState; +use super::Ioid; +use crate::ca::proto::CaMsg; +use crate::ca::proto::ReadNotify; +use err::thiserror; +use err::ThisError; +use log::*; +use serieswriter::establish_worker::EstablishWorkerJob; +use std::pin::Pin; +use std::task::Poll; +use std::time::Instant; + +#[derive(Debug, ThisError)] +pub enum Error {} + +pub trait ConnFuture: Send { + fn camsg(self: Pin<&mut Self>, camsg: CaMsg, conn: &mut CaConn) -> Poll<()>; +} + +pub struct EnumFetch { + created_state: CreatedState, + ioid: Ioid, + min_quiets: serieswriter::rtwriter::MinQuiets, +} + +impl EnumFetch { + pub fn new(created_state: CreatedState, conn: &mut CaConn, min_quiets: serieswriter::rtwriter::MinQuiets) -> Self { + let name = created_state.name(); + info!("EnumFetch::new name {name}"); + let dbr_ctrl_enum = 31; + let ioid = conn.ioid_next(); + let ty = crate::ca::proto::CaMsgTy::ReadNotify(ReadNotify { + data_type: dbr_ctrl_enum, + data_count: 0, + sid: created_state.sid.to_u32(), + ioid: ioid.0, + }); + let ts = Instant::now(); + let item = CaMsg::from_ty_ts(ty, ts); + conn.proto().unwrap().push_out(item); + Self { + created_state, + ioid, + min_quiets, + } + } + + pub fn ioid(&self) -> Ioid { + self.ioid + } +} + +impl ConnFuture for EnumFetch { + fn camsg(self: Pin<&mut Self>, camsg: CaMsg, conn: &mut CaConn) -> Poll<()> { + use Poll::*; + let tsnow = Instant::now(); + let crst = &self.created_state; + + let name = self.created_state.name(); + info!("EnumFetch::poll {name}"); + + //*chst = + super::ChannelState::MakingSeriesWriter(super::MakingSeriesWriterState { + tsbeg: tsnow, + channel: crst.clone(), + }); + let job = EstablishWorkerJob::new( + serieswriter::establish_worker::JobId(crst.cid.0 as _), + conn.backend.clone(), + crst.name().into(), + crst.cssid.clone(), + crst.scalar_type.clone(), + crst.shape.clone(), + self.min_quiets.clone(), + conn.writer_tx.clone(), + conn.tmp_ts_poll, + ); + conn.writer_establish_qu.push_back(job); + + Pending + } +} diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index e69c757..ac37314 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -202,6 +202,7 @@ enum CaDbrMetaType { Plain, Status, Time, + Ctrl, } #[derive(Debug)] @@ -212,6 +213,13 @@ pub struct CaDbrType { impl CaDbrType { pub fn from_ca_u16(k: u16) -> Result { + if k == 31 { + let ret = CaDbrType { + meta: CaDbrMetaType::Ctrl, + scalar_type: CaScalarType::Enum, + }; + return Ok(ret); + } if k > 20 { return Err(Error::BadCaDbrTypeId(k)); } @@ -908,18 +916,39 @@ impl CaMsg { fn extract_ca_data_value(hi: &HeadInfo, payload: &[u8], array_truncate: usize) -> Result { use netpod::Shape; let ca_dbr_ty = CaDbrType::from_ca_u16(hi.data_type)?; - if let CaDbrMetaType::Time = ca_dbr_ty.meta { - } else { - return Err(Error::MismatchDbrTimeType); - } - let ca_status = u16::from_be_bytes(payload[0..2].try_into().map_err(|_| Error::BadSlice)?); - let ca_severity = u16::from_be_bytes(payload[2..4].try_into().map_err(|_| Error::BadSlice)?); - let ca_secs = u32::from_be_bytes(payload[4..8].try_into().map_err(|_| Error::BadSlice)?); - let ca_nanos = u32::from_be_bytes(payload[8..12].try_into().map_err(|_| Error::BadSlice)?); - let ca_sh = Shape::from_ca_count(hi.data_count() as _).map_err(|_| { - error!("BadCaCount {hi:?}"); - Error::BadCaCount - })?; + let ca_status; + let ca_severity; + let ca_secs; + let ca_nanos; + let ca_sh; + let data_offset = match &ca_dbr_ty.meta { + CaDbrMetaType::Plain => return Err(Error::MismatchDbrTimeType), + CaDbrMetaType::Status => return Err(Error::MismatchDbrTimeType), + CaDbrMetaType::Time => { + ca_status = u16::from_be_bytes(payload[0..2].try_into().map_err(|_| Error::BadSlice)?); + ca_severity = u16::from_be_bytes(payload[2..4].try_into().map_err(|_| Error::BadSlice)?); + ca_secs = u32::from_be_bytes(payload[4..8].try_into().map_err(|_| Error::BadSlice)?); + ca_nanos = u32::from_be_bytes(payload[8..12].try_into().map_err(|_| Error::BadSlice)?); + ca_sh = Shape::from_ca_count(hi.data_count() as _).map_err(|_| { + error!("BadCaCount {hi:?}"); + Error::BadCaCount + })?; + 12 + } + CaDbrMetaType::Ctrl => { + ca_status = u16::from_be_bytes(payload[0..2].try_into().map_err(|_| Error::BadSlice)?); + ca_severity = u16::from_be_bytes(payload[2..4].try_into().map_err(|_| Error::BadSlice)?); + let st = std::time::SystemTime::now(); + let dt = st.duration_since(std::time::SystemTime::UNIX_EPOCH).unwrap(); + ca_secs = (dt.as_secs() - EPICS_EPOCH_OFFSET) as u32; + ca_nanos = dt.subsec_nanos(); + ca_sh = Shape::Scalar; + let varcnt = u16::from_be_bytes(payload[4..6].try_into().map_err(|_| Error::BadSlice)?); + let s = String::from_utf8_lossy(&payload[6..6 + 26 * 16]); + info!("enum variants debug {varcnt} {s}"); + 2 + 2 + 2 + 26 * 16 + } + }; let meta_padding = match ca_dbr_ty.meta { CaDbrMetaType::Plain => 0, CaDbrMetaType::Status => match ca_dbr_ty.scalar_type { @@ -940,8 +969,17 @@ impl CaMsg { CaScalarType::Enum => 2, CaScalarType::String => 0, }, + CaDbrMetaType::Ctrl => match ca_dbr_ty.scalar_type { + CaScalarType::I8 => 1, + CaScalarType::I16 => 0, + CaScalarType::I32 => 0, + CaScalarType::F32 => 0, + CaScalarType::F64 => 0, + CaScalarType::Enum => 0, + CaScalarType::String => 0, + }, }; - let valbuf = &payload[12 + meta_padding..]; + let valbuf = &payload[data_offset + meta_padding..]; let value = match ca_sh { Shape::Scalar => Self::ca_scalar_value(&ca_dbr_ty.scalar_type, valbuf)?, Shape::Wave(n) => Self::ca_wave_value(&ca_dbr_ty.scalar_type, (n as usize).min(array_truncate), valbuf)?, diff --git a/scywr/Cargo.toml b/scywr/Cargo.toml index d46430c..20a78dd 100644 --- a/scywr/Cargo.toml +++ b/scywr/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] futures-util = "0.3.28" async-channel = "2.3.1" -scylla = "0.11.0" +scylla = "0.13.0" smallvec = "1.11.0" pin-project = "1.1.5" stackfuture = "0.3.0" diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index 59d46e8..c7ef687 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -33,7 +33,7 @@ pub enum Error { SeriesWriter(#[from] crate::writer::Error), } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MinQuiets { pub st: Duration, pub mt: Duration,