Some cleanup
This commit is contained in:
@@ -39,6 +39,7 @@ async fn consumer(rx: Receiver<Item>) {
|
||||
while let Ok(_x) = rx.recv().await {}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
struct Item {
|
||||
x: u64,
|
||||
y: u64,
|
||||
|
||||
@@ -21,7 +21,6 @@ use scywr::scylla::transport::errors::QueryError;
|
||||
use scywr::scylla::transport::iterator::NextRowError;
|
||||
use scywr::session::ScySession;
|
||||
use series::SeriesId;
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
@@ -5,6 +5,7 @@ use err::thiserror;
|
||||
use err::ThisError;
|
||||
use log::*;
|
||||
use netpod::ScalarType;
|
||||
use netpod::SeriesKind;
|
||||
use netpod::Shape;
|
||||
use netpod::TsNano;
|
||||
use scywr::iteminsertqueue::DataValue;
|
||||
@@ -21,6 +22,21 @@ use taskrun::tokio::net::UdpSocket;
|
||||
pub enum Error {
|
||||
Io(#[from] std::io::Error),
|
||||
SeriesWriter(#[from] serieswriter::writer::Error),
|
||||
ChannelSend,
|
||||
ChannelRecv,
|
||||
ChannelLookup(#[from] dbpg::seriesbychannel::Error),
|
||||
}
|
||||
|
||||
impl<T> From<async_channel::SendError<T>> for Error {
|
||||
fn from(_value: async_channel::SendError<T>) -> Self {
|
||||
Self::ChannelSend
|
||||
}
|
||||
}
|
||||
|
||||
impl From<async_channel::RecvError> for Error {
|
||||
fn from(_value: async_channel::RecvError) -> Self {
|
||||
Self::ChannelRecv
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn listen_beacons(
|
||||
@@ -32,7 +48,19 @@ pub async fn listen_beacons(
|
||||
let channel = "epics-ca-beacons".to_string();
|
||||
let scalar_type = ScalarType::U64;
|
||||
let shape = Shape::Scalar;
|
||||
// let mut writer = SeriesWriter::establish(worker_tx, backend, channel, scalar_type, shape, stnow).await?;
|
||||
let (tx, rx) = async_channel::bounded(1);
|
||||
let qu = ChannelInfoQuery {
|
||||
backend,
|
||||
channel,
|
||||
kind: SeriesKind::ChannelData,
|
||||
scalar_type,
|
||||
shape,
|
||||
tx: Box::pin(tx),
|
||||
};
|
||||
worker_tx.send(qu).await?;
|
||||
let chinfo = rx.recv().await??;
|
||||
// TODO
|
||||
// let mut writer = SeriesWriter::new(chinfo.series.to_series());
|
||||
// let mut deque = VecDeque::new();
|
||||
let sock = UdpSocket::bind("0.0.0.0:5065").await?;
|
||||
sock.set_broadcast(true).unwrap();
|
||||
|
||||
@@ -29,8 +29,7 @@ pub struct EnumFetch {
|
||||
impl EnumFetch {
|
||||
pub fn new(created_state: CreatedState, conn: &mut CaConn) -> Self {
|
||||
if created_state.cssid.id() == 4705698279895902114 {}
|
||||
let name = created_state.name();
|
||||
// info!("EnumFetch::new name {name}");
|
||||
// info!("EnumFetch::new name {}", created_state.name());
|
||||
let dbr_ctrl_enum = 31;
|
||||
let ioid = conn.ioid_next();
|
||||
let ty = crate::ca::proto::CaMsgTy::ReadNotify(ReadNotify {
|
||||
@@ -54,10 +53,7 @@ impl ConnFuture for EnumFetch {
|
||||
fn camsg(mut self: Pin<&mut Self>, camsg: CaMsg, conn: &mut CaConn) -> Result<(), Error> {
|
||||
let tsnow = Instant::now();
|
||||
let crst = &mut self.created_state;
|
||||
|
||||
let name = crst.name();
|
||||
// info!("EnumFetch::poll {name}");
|
||||
|
||||
// info!("EnumFetch::poll name {}", crst.name());
|
||||
match camsg.ty {
|
||||
crate::ca::proto::CaMsgTy::ReadNotifyRes(msg2) => match msg2.value.meta {
|
||||
super::proto::CaMetaValue::CaMetaVariants(meta) => {
|
||||
|
||||
@@ -4,7 +4,6 @@ use err::ThisError;
|
||||
use futures_util::Stream;
|
||||
use log::*;
|
||||
use netpod::timeunits::*;
|
||||
use netpod::TsNano;
|
||||
use slidebuf::SlideBuf;
|
||||
use stats::CaProtoStats;
|
||||
use std::collections::VecDeque;
|
||||
@@ -986,7 +985,7 @@ impl CaMsg {
|
||||
if varcnt > 16 {
|
||||
return Err(Error::BadCaCount);
|
||||
}
|
||||
let s = String::from_utf8_lossy(&payload[6..6 + 26 * 16]);
|
||||
// let s = String::from_utf8_lossy(&payload[6..6 + 26 * 16]);
|
||||
let mut variants = Vec::new();
|
||||
for i in 0..varcnt {
|
||||
let p = (6 + 26 * i) as usize;
|
||||
|
||||
@@ -10,7 +10,6 @@ use futures_util::FutureExt;
|
||||
#[allow(unused)]
|
||||
use netpod::log::*;
|
||||
use netpod::DtNano;
|
||||
use netpod::ScalarType;
|
||||
use netpod::Shape;
|
||||
use netpod::TsMs;
|
||||
use netpod::TsNano;
|
||||
@@ -316,40 +315,6 @@ impl DataValue {
|
||||
}
|
||||
}
|
||||
|
||||
fn unused_scalar_type(&self) -> ScalarType {
|
||||
match self {
|
||||
DataValue::Scalar(x) => match x {
|
||||
ScalarValue::U8(_) => ScalarType::U8,
|
||||
ScalarValue::U16(_) => ScalarType::U16,
|
||||
ScalarValue::U32(_) => ScalarType::U32,
|
||||
ScalarValue::U64(_) => ScalarType::U64,
|
||||
ScalarValue::I8(_) => ScalarType::I8,
|
||||
ScalarValue::I16(_) => ScalarType::I16,
|
||||
ScalarValue::I32(_) => ScalarType::I32,
|
||||
ScalarValue::I64(_) => ScalarType::I64,
|
||||
ScalarValue::F32(_) => ScalarType::F32,
|
||||
ScalarValue::F64(_) => ScalarType::F64,
|
||||
ScalarValue::Enum(..) => ScalarType::Enum,
|
||||
ScalarValue::String(_) => ScalarType::STRING,
|
||||
ScalarValue::Bool(_) => ScalarType::BOOL,
|
||||
ScalarValue::CaStatus(_) => ScalarType::I16,
|
||||
},
|
||||
DataValue::Array(x) => match x {
|
||||
ArrayValue::U8(_) => ScalarType::U8,
|
||||
ArrayValue::U16(_) => ScalarType::U16,
|
||||
ArrayValue::U32(_) => ScalarType::U32,
|
||||
ArrayValue::U64(_) => ScalarType::U64,
|
||||
ArrayValue::I8(_) => ScalarType::I8,
|
||||
ArrayValue::I16(_) => ScalarType::I16,
|
||||
ArrayValue::I32(_) => ScalarType::I32,
|
||||
ArrayValue::I64(_) => ScalarType::I64,
|
||||
ArrayValue::F32(_) => ScalarType::F32,
|
||||
ArrayValue::F64(_) => ScalarType::F64,
|
||||
ArrayValue::Bool(_) => ScalarType::BOOL,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn shape(&self) -> Shape {
|
||||
match self {
|
||||
DataValue::Scalar(_) => Shape::Scalar,
|
||||
@@ -651,6 +616,7 @@ struct InsParCom {
|
||||
ts_msp: TsMs,
|
||||
ts_lsp: DtNano,
|
||||
ts_net: Instant,
|
||||
#[allow(unused)]
|
||||
do_insert: bool,
|
||||
stats: Arc<InsertWorkerStats>,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user