From 8351dabb729ffaee55e737dc003ce9d56a66ad91 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 9 Nov 2023 22:35:12 +0100 Subject: [PATCH] Refactor --- netfetch/src/ca/conn.rs | 234 ++++++++++++++++++++++--------------- netfetch/src/ca/connset.rs | 3 + netfetch/src/ca/proto.rs | 2 +- netfetch/src/conf.rs | 6 +- scywr/src/insertworker.rs | 4 +- 5 files changed, 148 insertions(+), 101 deletions(-) diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index e2cee03..9548944 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1,4 +1,5 @@ use super::proto; +use super::proto::CreateChanRes; use super::ExtraInsertsConf; use crate::senderpolling::SenderPolling; use crate::throttletrace::ThrottleTrace; @@ -151,6 +152,9 @@ fn ser_instant(val: &Option, ser: S) -> Result Pin + Send>> { } struct CidStore { - next: u32, + rng: XorShift32, } impl CidStore { - fn new() -> Self { - Self { next: 0 } + fn new(seed: u32) -> Self { + Self { + rng: XorShift32::new(seed), + } + } + + fn new_from_time() -> Self { + Self::new( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .subsec_nanos(), + ) } fn next(&mut self) -> Cid { - self.next += 1; - let ret = self.next; - Cid(ret) + Cid(self.rng.next()) } } struct SubidStore { - next: u32, + rng: XorShift32, } impl SubidStore { - fn new() -> Self { - Self { next: 0 } + fn new(seed: u32) -> Self { + Self { + rng: XorShift32::new(seed), + } } - fn next(&mut self) -> u32 { - self.next += 1; - let ret = self.next; - ret + fn new_from_time() -> Self { + Self::new( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .subsec_nanos(), + ) + } + + fn next(&mut self) -> Subid { + Subid(self.rng.next()) } } @@ -500,10 +522,11 @@ pub struct CaConn { cid_store: CidStore, subid_store: SubidStore, channels: BTreeMap, - init_state_count: u64, cid_by_name: BTreeMap, - cid_by_subid: BTreeMap, + cid_by_subid: BTreeMap, name_by_cid: BTreeMap, + time_binners: BTreeMap, + init_state_count: u64, insert_item_queue: VecDeque, remote_addr_dbg: SocketAddrV4, local_epics_hostname: String, @@ -522,7 +545,6 @@ pub struct CaConn { ca_conn_event_out_queue: VecDeque, channel_info_query_queue: VecDeque, channel_info_query_sending: Pin>>, - time_binners: BTreeMap, thr_msg_poll: ThrottleTrace, ca_proto_stats: Arc, weird_count: usize, @@ -555,13 +577,14 @@ impl CaConn { state: CaConnState::Unconnected(Instant::now()), ticker: Self::new_self_ticker(), proto: None, - cid_store: CidStore::new(), - subid_store: SubidStore::new(), - channels: BTreeMap::new(), + cid_store: CidStore::new_from_time(), + subid_store: SubidStore::new_from_time(), init_state_count: 0, + channels: BTreeMap::new(), cid_by_name: BTreeMap::new(), cid_by_subid: BTreeMap::new(), name_by_cid: BTreeMap::new(), + time_binners: BTreeMap::new(), insert_item_queue: VecDeque::new(), remote_addr_dbg, local_epics_hostname, @@ -580,7 +603,6 @@ impl CaConn { ca_conn_event_out_queue: VecDeque::new(), channel_info_query_queue: VecDeque::new(), channel_info_query_sending: Box::pin(SenderPolling::new(channel_info_query_tx)), - time_binners: BTreeMap::new(), thr_msg_poll: ThrottleTrace::new(Duration::from_millis(10000)), ca_proto_stats, weird_count: 0, @@ -978,10 +1000,10 @@ impl CaConn { let tsnow = Instant::now(); trace2!("check_channels_alive {addr:?}", addr = &self.remote_addr_dbg); if let Some(started) = self.ioc_ping_start { - if started.elapsed() >= Duration::from_millis(4000) { + if started + Duration::from_millis(4000) < tsnow { self.stats.pong_timeout().inc(); - self.ioc_ping_start = None; warn!("pong timeout {addr:?}", addr = self.remote_addr_dbg); + self.ioc_ping_start = None; let item = CaConnEvent { ts: tsnow, value: CaConnEventValue::EchoTimeout, @@ -993,6 +1015,7 @@ impl CaConn { if self.ioc_ping_next < tsnow { if let Some(proto) = &mut self.proto { self.stats.ping_start().inc(); + info!("start ping"); self.ioc_ping_start = Some(Instant::now()); let msg = CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow); proto.push_out(msg); @@ -1097,7 +1120,7 @@ impl CaConn { sid, data_type: data_type_asked, data_count: data_count as _, - subid, + subid: subid.0, }); let msg = CaMsg::from_ty_ts(ty, tsnow); let proto = self.proto.as_mut().unwrap(); @@ -1252,15 +1275,36 @@ impl CaConn { } fn handle_event_add_res(&mut self, ev: proto::EventAddRes, tsnow: Instant) -> Result<(), Error> { + let subid = Subid(ev.subid); // TODO handle subid-not-found which can also be peer error: - let cid = *self.cid_by_subid.get(&ev.subid).unwrap(); + let cid = if let Some(x) = self.cid_by_subid.get(&subid) { + *x + } else { + warn!("can not find cid for subid {subid:?}"); + // return Err(Error::with_msg_no_trace()); + return Ok(()); + }; if false { let name = self.name_by_cid(cid); info!("event {name:?} {ev:?}"); } // TODO handle not-found error: let mut series_2 = None; - let ch_s = self.channels.get_mut(&cid).unwrap(); + let ch_s = if let Some(x) = self.channels.get_mut(&cid) { + x + } else { + // TODO return better as error and let caller decide (with more structured errors) + warn!("can not find channel for {cid:?} {subid:?}"); + // TODO + // When removing a channel, keep it in "closed" btree for some time because messages can + // still arrive from all buffers. + // If we don't have it in the "closed" btree, then close connection to the IOC and count + // as logic error. + // Close connection to the IOC. Cout as logic error. + // return Err(Error::with_msg_no_trace()); + std::process::exit(1); + return Ok(()); + }; match ch_s { ChannelState::Created(_series, st) => { st.ts_alive_last = tsnow; @@ -1448,7 +1492,7 @@ impl CaConn { } } - fn check_channels_state_init(&mut self, msgs_tmp: &mut Vec) -> Result<(), Error> { + fn check_channels_state_init(&mut self, do_wake_again: &mut bool) -> Result<(), Error> { // TODO profile, efficient enough? if self.init_state_count == 0 { return Ok(()); @@ -1472,7 +1516,8 @@ impl CaConn { }), Instant::now(), ); - msgs_tmp.push(msg); + *do_wake_again = true; + self.proto.as_mut().unwrap().push_out(msg); // TODO handle not-found error: let ch_s = self.channels.get_mut(&cid).unwrap(); *ch_s = ChannelState::Creating { @@ -1494,24 +1539,13 @@ impl CaConn { use Poll::*; let mut ts1 = Instant::now(); // TODO unify with Listen state where protocol gets polled as well. - let mut msgs_tmp = Vec::new(); - self.check_channels_state_init(&mut msgs_tmp)?; + let mut do_wake_again = false; + self.check_channels_state_init(&mut do_wake_again)?; let ts2 = Instant::now(); self.stats .time_check_channels_state_init .add((ts2.duration_since(ts1) * MS as u32).as_secs()); ts1 = ts2; - let mut do_wake_again = false; - if msgs_tmp.len() > 0 { - do_wake_again = true; - } - { - let proto = self.proto.as_mut().unwrap(); - // TODO be careful to not overload outgoing message queue. - for msg in msgs_tmp { - proto.push_out(msg); - } - } let tsnow = Instant::now(); let res = match self.proto.as_mut().unwrap().poll_next_unpin(cx) { Ready(Some(Ok(k))) => { @@ -1525,62 +1559,7 @@ impl CaConn { // TODO count this unexpected case. } CaMsgTy::CreateChanRes(k) => { - // TODO handle cid-not-found which can also indicate peer error. - let cid = Cid(k.cid); - let sid = k.sid; - // TODO handle error: - let name = self.name_by_cid(cid).unwrap().to_string(); - trace3!("CreateChanRes {name:?}"); - if k.data_type > 6 { - error!("CreateChanRes with unexpected data_type {}", k.data_type); - } - let scalar_type = ScalarType::from_ca_id(k.data_type)?; - let shape = Shape::from_ca_count(k.data_count)?; - // TODO handle not-found error: - let ch_s = self.channels.get_mut(&cid).unwrap(); - let cssid = match ch_s { - ChannelState::Creating { cssid, .. } => cssid.clone(), - _ => { - // TODO handle in better way: - // Remove channel and emit notice that channel is removed with reason. - let e = Error::with_msg_no_trace("handle_peer_ready bad state"); - return Ready(Some(Err(e))); - } - }; - let created_state = CreatedState { - cssid, - cid, - sid, - data_type: k.data_type, - data_count: k.data_count, - scalar_type: scalar_type.clone(), - shape: shape.clone(), - ts_created: tsnow, - ts_alive_last: tsnow, - state: MonitoringState::FetchSeriesId, - ts_msp_last: 0, - ts_msp_grid_last: 0, - inserted_in_ts_msp: u64::MAX, - insert_item_ivl_ema: IntervalEma::new(), - item_recv_ivl_ema: IntervalEma::new(), - insert_recv_ivl_last: tsnow, - insert_next_earliest: tsnow, - muted_before: 0, - info_store_msp_last: info_store_msp_from_time(SystemTime::now()), - }; - *ch_s = ChannelState::FetchingSeriesId(created_state); - // TODO handle error in different way. Should most likely not abort. - let tx = SendSeriesLookup { - tx: self.conn_command_tx(), - }; - let query = ChannelInfoQuery { - backend: self.backend.clone(), - channel: name.clone(), - scalar_type: scalar_type.to_scylla_i32(), - shape_dims: shape.to_scylla_vec(), - tx: Box::pin(tx), - }; - self.channel_info_query_queue.push_back(query); + self.handle_create_chan_res(k, tsnow)?; do_wake_again = true; } CaMsgTy::EventAddRes(k) => { @@ -1679,6 +1658,69 @@ impl CaConn { res.map_err(|e| Error::from(e.to_string())) } + fn handle_create_chan_res(&mut self, k: CreateChanRes, tsnow: Instant) -> Result<(), Error> { + // TODO handle cid-not-found which can also indicate peer error. + let cid = Cid(k.cid); + let sid = k.sid; + let name = if let Some(x) = self.name_by_cid(cid) { + x.to_string() + } else { + return Err(Error::with_msg_no_trace(format!("no name for {cid:?}"))); + }; + trace3!("CreateChanRes {name:?}"); + if k.data_type > 6 { + error!("CreateChanRes with unexpected data_type {}", k.data_type); + } + let scalar_type = ScalarType::from_ca_id(k.data_type)?; + let shape = Shape::from_ca_count(k.data_count)?; + // TODO handle not-found error: + let ch_s = self.channels.get_mut(&cid).unwrap(); + let cssid = match ch_s { + ChannelState::Creating { cssid, .. } => cssid.clone(), + _ => { + // TODO handle in better way: + // Remove channel and emit notice that channel is removed with reason. + let e = Error::with_msg_no_trace("handle_peer_ready bad state"); + return Err(e); + } + }; + let created_state = CreatedState { + cssid, + cid, + sid, + data_type: k.data_type, + data_count: k.data_count, + scalar_type: scalar_type.clone(), + shape: shape.clone(), + ts_created: tsnow, + ts_alive_last: tsnow, + state: MonitoringState::FetchSeriesId, + ts_msp_last: 0, + ts_msp_grid_last: 0, + inserted_in_ts_msp: u64::MAX, + insert_item_ivl_ema: IntervalEma::new(), + item_recv_ivl_ema: IntervalEma::new(), + insert_recv_ivl_last: tsnow, + insert_next_earliest: tsnow, + muted_before: 0, + info_store_msp_last: info_store_msp_from_time(SystemTime::now()), + }; + *ch_s = ChannelState::FetchingSeriesId(created_state); + // TODO handle error in different way. Should most likely not abort. + let tx = SendSeriesLookup { + tx: self.conn_command_tx(), + }; + let query = ChannelInfoQuery { + backend: self.backend.clone(), + channel: name.clone(), + scalar_type: scalar_type.to_scylla_i32(), + shape_dims: shape.to_scylla_vec(), + tx: Box::pin(tx), + }; + self.channel_info_query_queue.push_back(query); + Ok(()) + } + // `?` works not in here. fn _test_control_flow(&mut self, _cx: &mut Context) -> ControlFlow>> { use ControlFlow::*; diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 52a5a6d..6ed66b5 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -1284,6 +1284,9 @@ impl CaConnSet { if st4.updated + CHANNEL_HEALTH_TIMEOUT < tsnow { self.stats.channel_health_timeout().inc(); trace!("health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~"); + // TODO + error!("health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~"); + std::process::exit(1); let addr = SocketAddr::V4(*addr_v4); cmd_remove_channel.push((addr, ch.clone())); if st.health_timeout_count < 3 { diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 2a4de75..ca7afaa 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -1026,7 +1026,7 @@ impl CaProto { tcp, remote_addr_dbg, state: CaState::StdHead, - buf: SlideBuf::new(1024 * 512), + buf: SlideBuf::new(1024 * 1024 * 4), outbuf: SlideBuf::new(1024 * 128), out: VecDeque::new(), array_truncate, diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 4cfe5d3..acb932f 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -15,7 +15,7 @@ use tokio::io::AsyncReadExt; pub struct CaIngestOpts { backend: String, channels: PathBuf, - api_bind: Option, + api_bind: String, search: Vec, #[serde(default)] search_blacklist: Vec, @@ -52,7 +52,7 @@ impl CaIngestOpts { } pub fn api_bind(&self) -> String { - self.api_bind.clone().unwrap_or_else(|| "0.0.0.0:3011".into()) + self.api_bind.clone() } pub fn postgresql_config(&self) -> &Database { @@ -159,7 +159,7 @@ scylla: let res: Result = serde_yaml::from_slice(conf.as_bytes()); let conf = res.unwrap(); assert_eq!(conf.channels, PathBuf::from("/some/path/file.txt")); - assert_eq!(conf.api_bind, Some("0.0.0.0:3011".to_string())); + assert_eq!(&conf.api_bind, "0.0.0.0:3011"); assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string())); assert_eq!(conf.scylla.hosts.get(1), Some(&"sf-nube-12:19042".to_string())); assert_eq!(conf.ttl_d1, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45))); diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 08d6590..95f39fe 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -421,7 +421,9 @@ fn prepare_query_insert_futs( data_store.qu_insert_ts_msp.clone(), stats.clone(), ); - futs.push(fut); + if item_ts_local % 100000 == 7461 { + futs.push(fut); + } } #[cfg(DISABLED)] if let Some(ts_msp_grid) = item.ts_msp_grid {