WIP factor into subfunctions

This commit is contained in:
Dominik Werder
2022-05-04 16:38:17 +02:00
parent b206bd0eb0
commit fb8184957c

View File

@@ -1,4 +1,4 @@
use super::proto::{CaItem, CaMsg, CaMsgTy, CaProto};
use super::proto::{self, CaItem, CaMsg, CaMsgTy, CaProto, EventAddRes};
use super::store::DataStore;
use crate::bsread::ChannelDescDecoded;
use crate::ca::proto::{CreateChan, EventAdd, HeadInfo, ReadNotify};
@@ -20,6 +20,8 @@ use std::time::{Duration, Instant, SystemTime};
use tokio::io::unix::AsyncFd;
use tokio::net::TcpStream;
const INSERT_FUTS_MAX: usize = 10;
#[derive(Debug)]
enum ChannelError {
NoSuccess,
@@ -141,6 +143,107 @@ impl CaConn {
fn name_by_cid(&self, cid: u32) -> Option<&str> {
self.name_by_cid.get(&cid).map(|x| x.as_str())
}
fn handle_event_add_res(&mut self, ev: proto::EventAddRes) {
// TODO handle subid-not-found which can also be peer error:
let cid = *self.cid_by_subid.get(&ev.subid).unwrap();
// TODO get rid of the string clone when I don't want the log output any longer:
let name: String = self.name_by_cid(cid).unwrap().into();
// TODO handle not-found error:
let mut series_2 = None;
let ch_s = self.channels.get_mut(&cid).unwrap();
match ch_s {
ChannelState::Created(st) => {
match st.state {
MonitoringState::AddingEvent(ref series) => {
let series = series.clone();
series_2 = Some(series.clone());
info!("Confirmation {name} is subscribed.");
// TODO get ts from faster common source:
st.state = MonitoringState::Evented(
series,
EventedState {
ts_last: Instant::now(),
},
);
}
MonitoringState::Evented(ref series, ref mut st) => {
series_2 = Some(series.clone());
// TODO get ts from faster common source:
st.ts_last = Instant::now();
}
_ => {
error!("unexpected state: EventAddRes while having {ch_s:?}");
}
}
}
_ => {
error!("unexpected state: EventAddRes while having {ch_s:?}");
}
}
{
let series = series_2.unwrap();
// TODO where to actually get the time from?
let ts = SystemTime::now();
let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
// TODO decide on better msp/lsp: random offset!
// As long as one writer is active, the msp is arbitrary.
let ts = epoch.as_secs() * 1000000000 + epoch.subsec_nanos() as u64;
let ts_msp = ts / (30 * SEC) * (30 * SEC);
let ts_lsp = ts - ts_msp;
// TODO make sure that I only accept types I expect.
use crate::ca::proto::CaDataScalarValue::*;
use crate::ca::proto::CaDataValue::*;
match ev.value {
Scalar(v) => match v {
F64(val) => match ch_s {
ChannelState::Created(st) => match st.shape {
Shape::Scalar => match st.scalar_type {
ScalarType::F64 => self.insert_scalar_f64(series, ts_msp, ts_lsp, val),
_ => {
error!("unexpected value type");
}
},
_ => {
error!("unexpected value shape");
}
},
_ => {
error!("got value but channel not created");
}
},
_ => {}
},
Array => {}
}
}
}
fn insert_scalar_f64(&mut self, series: SeriesId, ts_msp: u64, ts_lsp: u64, val: f64) {
let pulse = 0 as u64;
let y = unsafe { &*(self as *const CaConn) };
let fut1 = y
.data_store
.scy
.execute(&y.data_store.qu_insert_ts_msp, (series.id() as i64, ts_msp as i64))
.map(|_| Ok::<_, Error>(()))
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")));
let fut2 = y
.data_store
.scy
.execute(
&y.data_store.qu_insert_scalar_f64,
(series.id() as i64, ts_msp as i64, ts_lsp as i64, pulse as i64, val),
)
.map(|_| Ok::<_, Error>(()))
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")));
let fut = fut1.and_then(move |_a| fut2);
if self.value_insert_futs.len() > INSERT_FUTS_MAX {
warn!("can not keep up");
} else {
self.value_insert_futs.push(Box::pin(fut) as _);
}
}
}
impl Stream for CaConn {
@@ -343,9 +446,7 @@ impl Stream for CaConn {
byte_order: netpod::ByteOrder::LE,
compression: None,
};
let y = self.as_ref();
let y = unsafe { Pin::into_inner_unchecked(y) };
let y = unsafe { &*(y as *const CaConn) };
let y = unsafe { &*(&self as &Self as *const CaConn) };
let fut =
y.data_store.chan_reg.get_series_id(cd).map_ok(move |series| {
(cid, k.sid, k.data_type, k.data_count, series)
@@ -354,119 +455,7 @@ impl Stream for CaConn {
self.fut_get_series.push(Box::pin(fut) as _);
do_wake_again = true;
}
CaMsgTy::EventAddRes(k) => {
// TODO handle subid-not-found which can also be peer error:
let cid = *self.cid_by_subid.get(&k.subid).unwrap();
// TODO get rid of the string clone when I don't want the log output any longer:
let name: String = self.name_by_cid(cid).unwrap().into();
// TODO handle not-found error:
let mut series_2 = None;
let ch_s = self.channels.get_mut(&cid).unwrap();
match ch_s {
ChannelState::Created(st) => {
match st.state {
MonitoringState::AddingEvent(ref series) => {
let series = series.clone();
series_2 = Some(series.clone());
info!("Confirmation {name} is subscribed.");
// TODO get ts from faster common source:
st.state = MonitoringState::Evented(
series,
EventedState {
ts_last: Instant::now(),
},
);
}
MonitoringState::Evented(ref series, ref mut st) => {
series_2 = Some(series.clone());
// TODO get ts from faster common source:
st.ts_last = Instant::now();
}
_ => {
error!(
"unexpected state: EventAddRes while having {ch_s:?}"
);
}
}
}
_ => {
error!("unexpected state: EventAddRes while having {ch_s:?}");
}
}
{
let series = series_2.unwrap();
// TODO where to actually get the time from?
let ts = SystemTime::now();
let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
let ts = epoch.as_secs() * 1000000000 + epoch.subsec_nanos() as u64;
let ts_msp = ts / (30 * SEC) * (30 * SEC);
let ts_lsp = ts - ts_msp;
// TODO make sure that I only accept types I expect.
use crate::ca::proto::CaDataScalarValue::*;
use crate::ca::proto::CaDataValue::*;
match k.value {
Scalar(v) => {
match v {
F64(val) => {
match ch_s {
ChannelState::Created(st) => {
match st.shape {
Shape::Scalar => {
match st.scalar_type {
ScalarType::F64 => {
// TODO get msp/lsp: cap at some count. only one writer!!
let y = self.as_ref();
let y = unsafe {
Pin::into_inner_unchecked(y)
};
let y = unsafe {
&*(y as *const CaConn)
};
let fut1 = y.data_store.scy.execute(
&y.data_store.qu_insert_ts_msp,
(ts_msp as i64, ts_lsp as i64),
).map(|_| Ok::<_, Error>(()))
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")));
let fut2 = y.data_store.scy.execute(
&y.data_store.qu_insert_scalar_f64,
(series.id() as i64, ts_msp as i64, ts_lsp as i64, 0i64, val),
).map(|_| Ok::<_, Error>(()))
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")));
let fut = fut1
.and_then(move |a| fut2);
if self.value_insert_futs.len()
> 10
{
warn!("can not keep up");
} else {
self.value_insert_futs
.push(
Box::pin(fut) as _
);
}
}
_ => {
error!("unexpected value type");
}
}
}
_ => {
error!("unexpected value shape");
}
}
}
_ => {
error!("got value but channel not created");
}
}
}
_ => {}
}
}
Array => {}
}
}
}
CaMsgTy::EventAddRes(k) => Self::handle_event_add_res(&mut self, k),
_ => {}
}
}