From fb8184957ccf1a78885eb5510910b817629f58ab Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 4 May 2022 16:38:17 +0200 Subject: [PATCH] WIP factor into subfunctions --- netfetch/src/ca/conn.rs | 223 +++++++++++++++++++--------------------- 1 file changed, 106 insertions(+), 117 deletions(-) diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 6e2e78d..708384a 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1,4 +1,4 @@ -use super::proto::{CaItem, CaMsg, CaMsgTy, CaProto}; +use super::proto::{self, CaItem, CaMsg, CaMsgTy, CaProto, EventAddRes}; use super::store::DataStore; use crate::bsread::ChannelDescDecoded; use crate::ca::proto::{CreateChan, EventAdd, HeadInfo, ReadNotify}; @@ -20,6 +20,8 @@ use std::time::{Duration, Instant, SystemTime}; use tokio::io::unix::AsyncFd; use tokio::net::TcpStream; +const INSERT_FUTS_MAX: usize = 10; + #[derive(Debug)] enum ChannelError { NoSuccess, @@ -141,6 +143,107 @@ impl CaConn { fn name_by_cid(&self, cid: u32) -> Option<&str> { self.name_by_cid.get(&cid).map(|x| x.as_str()) } + + fn handle_event_add_res(&mut self, ev: proto::EventAddRes) { + // TODO handle subid-not-found which can also be peer error: + let cid = *self.cid_by_subid.get(&ev.subid).unwrap(); + // TODO get rid of the string clone when I don't want the log output any longer: + let name: String = self.name_by_cid(cid).unwrap().into(); + // TODO handle not-found error: + let mut series_2 = None; + let ch_s = self.channels.get_mut(&cid).unwrap(); + match ch_s { + ChannelState::Created(st) => { + match st.state { + MonitoringState::AddingEvent(ref series) => { + let series = series.clone(); + series_2 = Some(series.clone()); + info!("Confirmation {name} is subscribed."); + // TODO get ts from faster common source: + st.state = MonitoringState::Evented( + series, + EventedState { + ts_last: Instant::now(), + }, + ); + } + MonitoringState::Evented(ref series, ref mut st) => { + series_2 = Some(series.clone()); + // TODO get ts from faster common source: + st.ts_last = Instant::now(); + } + _ => { + error!("unexpected state: EventAddRes while having {ch_s:?}"); + } + } + } + _ => { + error!("unexpected state: EventAddRes while having {ch_s:?}"); + } + } + { + let series = series_2.unwrap(); + // TODO where to actually get the time from? + let ts = SystemTime::now(); + let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); + // TODO decide on better msp/lsp: random offset! + // As long as one writer is active, the msp is arbitrary. + let ts = epoch.as_secs() * 1000000000 + epoch.subsec_nanos() as u64; + let ts_msp = ts / (30 * SEC) * (30 * SEC); + let ts_lsp = ts - ts_msp; + // TODO make sure that I only accept types I expect. + use crate::ca::proto::CaDataScalarValue::*; + use crate::ca::proto::CaDataValue::*; + match ev.value { + Scalar(v) => match v { + F64(val) => match ch_s { + ChannelState::Created(st) => match st.shape { + Shape::Scalar => match st.scalar_type { + ScalarType::F64 => self.insert_scalar_f64(series, ts_msp, ts_lsp, val), + _ => { + error!("unexpected value type"); + } + }, + _ => { + error!("unexpected value shape"); + } + }, + _ => { + error!("got value but channel not created"); + } + }, + _ => {} + }, + Array => {} + } + } + } + + fn insert_scalar_f64(&mut self, series: SeriesId, ts_msp: u64, ts_lsp: u64, val: f64) { + let pulse = 0 as u64; + let y = unsafe { &*(self as *const CaConn) }; + let fut1 = y + .data_store + .scy + .execute(&y.data_store.qu_insert_ts_msp, (series.id() as i64, ts_msp as i64)) + .map(|_| Ok::<_, Error>(())) + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}"))); + let fut2 = y + .data_store + .scy + .execute( + &y.data_store.qu_insert_scalar_f64, + (series.id() as i64, ts_msp as i64, ts_lsp as i64, pulse as i64, val), + ) + .map(|_| Ok::<_, Error>(())) + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}"))); + let fut = fut1.and_then(move |_a| fut2); + if self.value_insert_futs.len() > INSERT_FUTS_MAX { + warn!("can not keep up"); + } else { + self.value_insert_futs.push(Box::pin(fut) as _); + } + } } impl Stream for CaConn { @@ -343,9 +446,7 @@ impl Stream for CaConn { byte_order: netpod::ByteOrder::LE, compression: None, }; - let y = self.as_ref(); - let y = unsafe { Pin::into_inner_unchecked(y) }; - let y = unsafe { &*(y as *const CaConn) }; + let y = unsafe { &*(&self as &Self as *const CaConn) }; let fut = y.data_store.chan_reg.get_series_id(cd).map_ok(move |series| { (cid, k.sid, k.data_type, k.data_count, series) @@ -354,119 +455,7 @@ impl Stream for CaConn { self.fut_get_series.push(Box::pin(fut) as _); do_wake_again = true; } - CaMsgTy::EventAddRes(k) => { - // TODO handle subid-not-found which can also be peer error: - let cid = *self.cid_by_subid.get(&k.subid).unwrap(); - // TODO get rid of the string clone when I don't want the log output any longer: - let name: String = self.name_by_cid(cid).unwrap().into(); - // TODO handle not-found error: - let mut series_2 = None; - let ch_s = self.channels.get_mut(&cid).unwrap(); - match ch_s { - ChannelState::Created(st) => { - match st.state { - MonitoringState::AddingEvent(ref series) => { - let series = series.clone(); - series_2 = Some(series.clone()); - info!("Confirmation {name} is subscribed."); - // TODO get ts from faster common source: - st.state = MonitoringState::Evented( - series, - EventedState { - ts_last: Instant::now(), - }, - ); - } - MonitoringState::Evented(ref series, ref mut st) => { - series_2 = Some(series.clone()); - // TODO get ts from faster common source: - st.ts_last = Instant::now(); - } - _ => { - error!( - "unexpected state: EventAddRes while having {ch_s:?}" - ); - } - } - } - _ => { - error!("unexpected state: EventAddRes while having {ch_s:?}"); - } - } - { - let series = series_2.unwrap(); - // TODO where to actually get the time from? - let ts = SystemTime::now(); - let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); - let ts = epoch.as_secs() * 1000000000 + epoch.subsec_nanos() as u64; - let ts_msp = ts / (30 * SEC) * (30 * SEC); - let ts_lsp = ts - ts_msp; - // TODO make sure that I only accept types I expect. - use crate::ca::proto::CaDataScalarValue::*; - use crate::ca::proto::CaDataValue::*; - match k.value { - Scalar(v) => { - match v { - F64(val) => { - match ch_s { - ChannelState::Created(st) => { - match st.shape { - Shape::Scalar => { - match st.scalar_type { - ScalarType::F64 => { - // TODO get msp/lsp: cap at some count. only one writer!! - let y = self.as_ref(); - let y = unsafe { - Pin::into_inner_unchecked(y) - }; - let y = unsafe { - &*(y as *const CaConn) - }; - let fut1 = y.data_store.scy.execute( - &y.data_store.qu_insert_ts_msp, - (ts_msp as i64, ts_lsp as i64), - ).map(|_| Ok::<_, Error>(())) - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}"))); - let fut2 = y.data_store.scy.execute( - &y.data_store.qu_insert_scalar_f64, - (series.id() as i64, ts_msp as i64, ts_lsp as i64, 0i64, val), - ).map(|_| Ok::<_, Error>(())) - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}"))); - let fut = fut1 - .and_then(move |a| fut2); - if self.value_insert_futs.len() - > 10 - { - warn!("can not keep up"); - } else { - self.value_insert_futs - .push( - Box::pin(fut) as _ - ); - } - } - _ => { - error!("unexpected value type"); - } - } - } - _ => { - error!("unexpected value shape"); - } - } - } - _ => { - error!("got value but channel not created"); - } - } - } - _ => {} - } - } - Array => {} - } - } - } + CaMsgTy::EventAddRes(k) => Self::handle_event_add_res(&mut self, k), _ => {} } }