This commit is contained in:
Dominik Werder
2023-05-09 16:35:35 +02:00
parent 2d9263c74e
commit c43a558c0e
3 changed files with 60 additions and 6 deletions

View File

@@ -9,6 +9,3 @@ debug-assertions = false
lto = "thin"
codegen-units = 64
incremental = true
[patch.crates-io]
#tokio = { git = "https://github.com/dominikwerder/tokio", rev = "995221d8" }

View File

@@ -40,5 +40,8 @@ log = { path = "../log" }
stats = { path = "../stats" }
err = { path = "../../daqbuffer/err" }
netpod = { path = "../../daqbuffer/netpod" }
items_0 = { path = "../../daqbuffer/items_0" }
items_2 = { path = "../../daqbuffer/items_2" }
streams = { path = "../../daqbuffer/streams" }
taskrun = { path = "../../daqbuffer/taskrun" }
bitshuffle = { path = "../../daqbuffer/bitshuffle" }

View File

@@ -30,6 +30,9 @@ use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::Appendable;
use items_0::Empty;
use items_2::eventsdim0::EventsDim0;
use log::*;
use netpod::timeunits::*;
use netpod::ScalarType;
@@ -39,6 +42,7 @@ use netpod::TS_MSP_GRID_UNIT;
use serde::Serialize;
use stats::CaConnStats;
use stats::IntervalEma;
use std::any::Any;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::net::SocketAddrV4;
@@ -435,6 +439,7 @@ pub struct CaConn {
series_lookup_futs: FuturesUnordered<
Pin<Box<dyn Future<Output = Result<(Cid, u32, u16, u16, Existence<SeriesId>), Error>> + Send>>,
>,
events_acc: Box<dyn Any + Send>,
}
impl CaConn {
@@ -487,6 +492,7 @@ impl CaConn {
channel_info_query_tx,
series_lookup_schedule: BTreeMap::new(),
series_lookup_futs: FuturesUnordered::new(),
events_acc: Box::new(()),
}
}
@@ -909,6 +915,10 @@ impl CaConn {
if data_type > 6 {
error!("data type of series unexpected: {}", data_type);
}
// TODO handle error better! Transition channel to Error state?
let scalar_type = ScalarType::from_ca_id(data_type)?;
let shape = Shape::from_ca_count(data_count)?;
self.setup_event_acc(&scalar_type, &shape)?;
let subid = self.subid_store.next();
self.cid_by_subid.insert(subid, cid);
let name = self.name_by_cid(cid).unwrap().to_string();
@@ -938,9 +948,8 @@ impl CaConn {
cssid,
cid,
sid,
// TODO handle error better! Transition channel to Error state?
scalar_type: ScalarType::from_ca_id(data_type)?,
shape: Shape::from_ca_count(data_count)?,
scalar_type,
shape,
ts_created: tsnow,
ts_alive_last: tsnow,
state: MonitoringState::AddingEvent(series.clone()),
@@ -1020,6 +1029,51 @@ impl CaConn {
}
}
// TODO write generic.
// TODO store a pointer to the type-resolved function for faster usage.
fn event_acc_push(&mut self, ts: u64, ev: proto::EventAddRes) -> Result<(), Error> {
match &ev.value.data {
proto::CaDataValue::Scalar(k) => {
use proto::CaDataScalarValue::*;
match k {
F64(v) => {
if let Some(c) = self.events_acc.downcast_mut::<EventsDim0<f64>>() {
c.push(ts, 0, v.clone());
// TODO check for a max length.
// Also check at every check-tick.
} else {
}
}
_ => {
// TODO
}
}
}
proto::CaDataValue::Array(_) => {
// TODO
}
}
Ok(())
}
fn setup_event_acc(&mut self, scalar_type: &ScalarType, shape: &Shape) -> Result<(), Error> {
use ScalarType::*;
match shape {
Shape::Scalar => match scalar_type {
F64 => {
self.events_acc = Box::new(EventsDim0::<f64>::empty());
}
_ => {
// TODO
}
},
_ => {
// TODO
}
}
Ok(())
}
fn event_add_insert(
st: &mut CreatedState,
series: SeriesId,