From c43a558c0e383447eafef3619a8fd67345d7cd28 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 9 May 2023 16:35:35 +0200 Subject: [PATCH] WIP --- Cargo.toml | 3 --- netfetch/Cargo.toml | 3 +++ netfetch/src/ca/conn.rs | 60 ++++++++++++++++++++++++++++++++++++++--- 3 files changed, 60 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 54cc388..e821c7a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,3 @@ debug-assertions = false lto = "thin" codegen-units = 64 incremental = true - -[patch.crates-io] -#tokio = { git = "https://github.com/dominikwerder/tokio", rev = "995221d8" } diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 16c19c4..8d8fb11 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -40,5 +40,8 @@ log = { path = "../log" } stats = { path = "../stats" } err = { path = "../../daqbuffer/err" } netpod = { path = "../../daqbuffer/netpod" } +items_0 = { path = "../../daqbuffer/items_0" } +items_2 = { path = "../../daqbuffer/items_2" } +streams = { path = "../../daqbuffer/streams" } taskrun = { path = "../../daqbuffer/taskrun" } bitshuffle = { path = "../../daqbuffer/bitshuffle" } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 59aa710..8058a66 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -30,6 +30,9 @@ use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; +use items_0::Appendable; +use items_0::Empty; +use items_2::eventsdim0::EventsDim0; use log::*; use netpod::timeunits::*; use netpod::ScalarType; @@ -39,6 +42,7 @@ use netpod::TS_MSP_GRID_UNIT; use serde::Serialize; use stats::CaConnStats; use stats::IntervalEma; +use std::any::Any; use std::collections::BTreeMap; use std::collections::VecDeque; use std::net::SocketAddrV4; @@ -435,6 +439,7 @@ pub struct CaConn { series_lookup_futs: FuturesUnordered< Pin), Error>> + Send>>, >, + events_acc: Box, } impl CaConn { @@ -487,6 +492,7 @@ impl CaConn { channel_info_query_tx, series_lookup_schedule: BTreeMap::new(), series_lookup_futs: FuturesUnordered::new(), + events_acc: Box::new(()), } } @@ -909,6 +915,10 @@ impl CaConn { if data_type > 6 { error!("data type of series unexpected: {}", data_type); } + // TODO handle error better! Transition channel to Error state? + let scalar_type = ScalarType::from_ca_id(data_type)?; + let shape = Shape::from_ca_count(data_count)?; + self.setup_event_acc(&scalar_type, &shape)?; let subid = self.subid_store.next(); self.cid_by_subid.insert(subid, cid); let name = self.name_by_cid(cid).unwrap().to_string(); @@ -938,9 +948,8 @@ impl CaConn { cssid, cid, sid, - // TODO handle error better! Transition channel to Error state? - scalar_type: ScalarType::from_ca_id(data_type)?, - shape: Shape::from_ca_count(data_count)?, + scalar_type, + shape, ts_created: tsnow, ts_alive_last: tsnow, state: MonitoringState::AddingEvent(series.clone()), @@ -1020,6 +1029,51 @@ impl CaConn { } } + // TODO write generic. + // TODO store a pointer to the type-resolved function for faster usage. + fn event_acc_push(&mut self, ts: u64, ev: proto::EventAddRes) -> Result<(), Error> { + match &ev.value.data { + proto::CaDataValue::Scalar(k) => { + use proto::CaDataScalarValue::*; + match k { + F64(v) => { + if let Some(c) = self.events_acc.downcast_mut::>() { + c.push(ts, 0, v.clone()); + // TODO check for a max length. + // Also check at every check-tick. + } else { + } + } + _ => { + // TODO + } + } + } + proto::CaDataValue::Array(_) => { + // TODO + } + } + Ok(()) + } + + fn setup_event_acc(&mut self, scalar_type: &ScalarType, shape: &Shape) -> Result<(), Error> { + use ScalarType::*; + match shape { + Shape::Scalar => match scalar_type { + F64 => { + self.events_acc = Box::new(EventsDim0::::empty()); + } + _ => { + // TODO + } + }, + _ => { + // TODO + } + } + Ok(()) + } + fn event_add_insert( st: &mut CreatedState, series: SeriesId,