From 90fe799ba6cdef57e208a0c14aba4c076ba79c93 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 24 Jul 2024 22:13:48 +0200 Subject: [PATCH] Some cleanup --- batchtools/src/channeltest.rs | 1 + daqingest/src/tools.rs | 1 - netfetch/src/ca/beacons.rs | 30 +++++++++++++++++++++++++- netfetch/src/ca/conn/enumfetch.rs | 8 ++----- netfetch/src/ca/proto.rs | 3 +-- scywr/src/iteminsertqueue.rs | 36 +------------------------------ 6 files changed, 34 insertions(+), 45 deletions(-) diff --git a/batchtools/src/channeltest.rs b/batchtools/src/channeltest.rs index b13ea93..db147cc 100644 --- a/batchtools/src/channeltest.rs +++ b/batchtools/src/channeltest.rs @@ -39,6 +39,7 @@ async fn consumer(rx: Receiver) { while let Ok(_x) = rx.recv().await {} } +#[allow(unused)] struct Item { x: u64, y: u64, diff --git a/daqingest/src/tools.rs b/daqingest/src/tools.rs index fa82351..cdd2cb8 100644 --- a/daqingest/src/tools.rs +++ b/daqingest/src/tools.rs @@ -21,7 +21,6 @@ use scywr::scylla::transport::errors::QueryError; use scywr::scylla::transport::iterator::NextRowError; use scywr::session::ScySession; use series::SeriesId; -use std::collections::VecDeque; use std::sync::Arc; use std::time::Duration; use std::time::Instant; diff --git a/netfetch/src/ca/beacons.rs b/netfetch/src/ca/beacons.rs index 7f5aed6..858ede8 100644 --- a/netfetch/src/ca/beacons.rs +++ b/netfetch/src/ca/beacons.rs @@ -5,6 +5,7 @@ use err::thiserror; use err::ThisError; use log::*; use netpod::ScalarType; +use netpod::SeriesKind; use netpod::Shape; use netpod::TsNano; use scywr::iteminsertqueue::DataValue; @@ -21,6 +22,21 @@ use taskrun::tokio::net::UdpSocket; pub enum Error { Io(#[from] std::io::Error), SeriesWriter(#[from] serieswriter::writer::Error), + ChannelSend, + ChannelRecv, + ChannelLookup(#[from] dbpg::seriesbychannel::Error), +} + +impl From> for Error { + fn from(_value: async_channel::SendError) -> Self { + Self::ChannelSend + } +} + +impl From for Error { + fn from(_value: async_channel::RecvError) -> Self { + Self::ChannelRecv + } } pub async fn listen_beacons( @@ -32,7 +48,19 @@ pub async fn listen_beacons( let channel = "epics-ca-beacons".to_string(); let scalar_type = ScalarType::U64; let shape = Shape::Scalar; - // let mut writer = SeriesWriter::establish(worker_tx, backend, channel, scalar_type, shape, stnow).await?; + let (tx, rx) = async_channel::bounded(1); + let qu = ChannelInfoQuery { + backend, + channel, + kind: SeriesKind::ChannelData, + scalar_type, + shape, + tx: Box::pin(tx), + }; + worker_tx.send(qu).await?; + let chinfo = rx.recv().await??; + // TODO + // let mut writer = SeriesWriter::new(chinfo.series.to_series()); // let mut deque = VecDeque::new(); let sock = UdpSocket::bind("0.0.0.0:5065").await?; sock.set_broadcast(true).unwrap(); diff --git a/netfetch/src/ca/conn/enumfetch.rs b/netfetch/src/ca/conn/enumfetch.rs index 33e57df..d9ac921 100644 --- a/netfetch/src/ca/conn/enumfetch.rs +++ b/netfetch/src/ca/conn/enumfetch.rs @@ -29,8 +29,7 @@ pub struct EnumFetch { impl EnumFetch { pub fn new(created_state: CreatedState, conn: &mut CaConn) -> Self { if created_state.cssid.id() == 4705698279895902114 {} - let name = created_state.name(); - // info!("EnumFetch::new name {name}"); + // info!("EnumFetch::new name {}", created_state.name()); let dbr_ctrl_enum = 31; let ioid = conn.ioid_next(); let ty = crate::ca::proto::CaMsgTy::ReadNotify(ReadNotify { @@ -54,10 +53,7 @@ impl ConnFuture for EnumFetch { fn camsg(mut self: Pin<&mut Self>, camsg: CaMsg, conn: &mut CaConn) -> Result<(), Error> { let tsnow = Instant::now(); let crst = &mut self.created_state; - - let name = crst.name(); - // info!("EnumFetch::poll {name}"); - + // info!("EnumFetch::poll name {}", crst.name()); match camsg.ty { crate::ca::proto::CaMsgTy::ReadNotifyRes(msg2) => match msg2.value.meta { super::proto::CaMetaValue::CaMetaVariants(meta) => { diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 1949826..62be960 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -4,7 +4,6 @@ use err::ThisError; use futures_util::Stream; use log::*; use netpod::timeunits::*; -use netpod::TsNano; use slidebuf::SlideBuf; use stats::CaProtoStats; use std::collections::VecDeque; @@ -986,7 +985,7 @@ impl CaMsg { if varcnt > 16 { return Err(Error::BadCaCount); } - let s = String::from_utf8_lossy(&payload[6..6 + 26 * 16]); + // let s = String::from_utf8_lossy(&payload[6..6 + 26 * 16]); let mut variants = Vec::new(); for i in 0..varcnt { let p = (6 + 26 * i) as usize; diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 3ca88f4..f37e361 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -10,7 +10,6 @@ use futures_util::FutureExt; #[allow(unused)] use netpod::log::*; use netpod::DtNano; -use netpod::ScalarType; use netpod::Shape; use netpod::TsMs; use netpod::TsNano; @@ -316,40 +315,6 @@ impl DataValue { } } - fn unused_scalar_type(&self) -> ScalarType { - match self { - DataValue::Scalar(x) => match x { - ScalarValue::U8(_) => ScalarType::U8, - ScalarValue::U16(_) => ScalarType::U16, - ScalarValue::U32(_) => ScalarType::U32, - ScalarValue::U64(_) => ScalarType::U64, - ScalarValue::I8(_) => ScalarType::I8, - ScalarValue::I16(_) => ScalarType::I16, - ScalarValue::I32(_) => ScalarType::I32, - ScalarValue::I64(_) => ScalarType::I64, - ScalarValue::F32(_) => ScalarType::F32, - ScalarValue::F64(_) => ScalarType::F64, - ScalarValue::Enum(..) => ScalarType::Enum, - ScalarValue::String(_) => ScalarType::STRING, - ScalarValue::Bool(_) => ScalarType::BOOL, - ScalarValue::CaStatus(_) => ScalarType::I16, - }, - DataValue::Array(x) => match x { - ArrayValue::U8(_) => ScalarType::U8, - ArrayValue::U16(_) => ScalarType::U16, - ArrayValue::U32(_) => ScalarType::U32, - ArrayValue::U64(_) => ScalarType::U64, - ArrayValue::I8(_) => ScalarType::I8, - ArrayValue::I16(_) => ScalarType::I16, - ArrayValue::I32(_) => ScalarType::I32, - ArrayValue::I64(_) => ScalarType::I64, - ArrayValue::F32(_) => ScalarType::F32, - ArrayValue::F64(_) => ScalarType::F64, - ArrayValue::Bool(_) => ScalarType::BOOL, - }, - } - } - pub fn shape(&self) -> Shape { match self { DataValue::Scalar(_) => Shape::Scalar, @@ -651,6 +616,7 @@ struct InsParCom { ts_msp: TsMs, ts_lsp: DtNano, ts_net: Instant, + #[allow(unused)] do_insert: bool, stats: Arc, }