From 399dfdb4bfe87808f1ca43a9b13669127e107ac7 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 31 Aug 2022 17:03:23 +0200 Subject: [PATCH] Change stop behavior --- netfetch/src/ca.rs | 190 ++++++++++--- netfetch/src/ca/conn.rs | 579 +++++++++++++++++++++++++-------------- netfetch/src/ca/proto.rs | 32 ++- netfetch/src/series.rs | 10 + netfetch/src/store.rs | 72 ++++- stats/src/stats.rs | 12 +- 6 files changed, 641 insertions(+), 254 deletions(-) diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index d5c5714..db8d1be 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -11,16 +11,20 @@ use crate::store::{CommonInsertItemQueue, QueryItem}; use async_channel::Sender; use conn::CaConn; use err::Error; -use futures_util::StreamExt; +use futures_util::future::Fuse; +use futures_util::stream::FuturesUnordered; +use futures_util::{FutureExt, StreamExt}; use log::*; use netpod::{Database, ScyllaConfig}; use scylla::batch::Consistency; use serde::{Deserialize, Serialize}; use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff}; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, VecDeque}; +use std::ffi::CStr; +use std::mem::MaybeUninit; use std::net::{Ipv4Addr, SocketAddrV4}; use std::path::PathBuf; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::{Arc, Mutex, Once}; use std::time::Duration; use tokio::fs::OpenOptions; @@ -140,8 +144,9 @@ async fn spawn_scylla_insert_workers( insert_frac: Arc, pg_client: Arc, store_stats: Arc, -) -> Result<(), Error> { - let mut data_stores = vec![]; +) -> Result>, Error> { + let mut jhs = Vec::new(); + let mut data_stores = Vec::new(); for _ in 0..insert_scylla_sessions { let scy = scylla::SessionBuilder::new() .known_nodes(&scyconf.hosts) @@ -181,8 +186,23 @@ async fn spawn_scylla_insert_workers( } } } - QueryItem::ChannelStatus(_item) => { - // TODO + QueryItem::ChannelStatus(item) => { + match crate::store::insert_channel_status(item, &data_store, &stats).await { + Ok(_) => { + stats.store_worker_item_insert_inc(); + } + Err(e) => { + stats.store_worker_item_error_inc(); + // TODO introduce more structured error variants. + if e.msg().contains("WriteTimeout") { + tokio::time::sleep(Duration::from_millis(100)).await; + } else { + // TODO back off but continue. + error!("insert worker sees error: {e:?}"); + break; + } + } + } } QueryItem::Insert(item) => { stats.store_worker_item_recv_inc(); @@ -211,8 +231,8 @@ async fn spawn_scylla_insert_workers( } QueryItem::Mute(item) => { let values = ( - (item.series & 0xff) as i32, - item.series as i64, + (item.series.id() & 0xff) as i32, + item.series.id() as i64, item.ts as i64, item.ema, item.emd, @@ -233,8 +253,8 @@ async fn spawn_scylla_insert_workers( } QueryItem::Ivl(item) => { let values = ( - (item.series & 0xff) as i32, - item.series as i64, + (item.series.id() & 0xff) as i32, + item.series.id() as i64, item.ts as i64, item.ema, item.emd, @@ -255,10 +275,12 @@ async fn spawn_scylla_insert_workers( } } } + info!("insert worker has no more messages"); }; - tokio::spawn(fut); + let jh = tokio::spawn(fut); + jhs.push(jh); } - Ok(()) + Ok(jhs) } pub struct CommandQueueSet { @@ -394,7 +416,56 @@ pub async fn create_ca_conn( Ok(jh) } +pub static SIGINT: AtomicU32 = AtomicU32::new(0); + +fn handler_sigaction(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc::c_void) { + SIGINT.store(1, Ordering::Release); + let _ = unset_signal_handler(); +} + +fn set_signal_handler() -> Result<(), Error> { + let mask: libc::sigset_t = unsafe { MaybeUninit::zeroed().assume_init() }; + let handler: libc::sighandler_t = handler_sigaction as *const libc::c_void as _; + let act = libc::sigaction { + sa_sigaction: handler, + sa_mask: mask, + sa_flags: 0, + sa_restorer: None, + }; + unsafe { + let ec = libc::sigaction(libc::SIGINT, &act, std::ptr::null_mut()); + if ec != 0 { + let errno = *libc::__errno_location(); + let msg = CStr::from_ptr(libc::strerror(errno)); + error!("error: {:?}", msg); + return Err(Error::with_msg_no_trace(format!("can not set signal handler"))); + } + } + Ok(()) +} + +fn unset_signal_handler() -> Result<(), Error> { + let mask: libc::sigset_t = unsafe { MaybeUninit::zeroed().assume_init() }; + let act = libc::sigaction { + sa_sigaction: libc::SIG_DFL, + sa_mask: mask, + sa_flags: 0, + sa_restorer: None, + }; + unsafe { + let ec = libc::sigaction(libc::SIGINT, &act, std::ptr::null_mut()); + if ec != 0 { + let errno = *libc::__errno_location(); + let msg = CStr::from_ptr(libc::strerror(errno)); + error!("error: {:?}", msg); + return Err(Error::with_msg_no_trace(format!("can not set signal handler"))); + } + } + Ok(()) +} + pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { + set_signal_handler()?; let insert_frac = Arc::new(AtomicU64::new(1000)); let insert_ivl_min = Arc::new(AtomicU64::new(8800)); let opts = parse_config(opts.config).await?; @@ -457,7 +528,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let insert_item_queue = Arc::new(insert_item_queue); // TODO use a new stats struct let store_stats = Arc::new(CaConnStats::new()); - spawn_scylla_insert_workers( + let jh_insert_workers = spawn_scylla_insert_workers( opts.scyconf.clone(), opts.insert_scylla_sessions, opts.insert_worker_count, @@ -483,12 +554,14 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { }; let ingest_commons = Arc::new(ingest_commons); - tokio::spawn(crate::metrics::start_metrics_service( - opts.api_bind.clone(), - insert_frac.clone(), - insert_ivl_min.clone(), - ingest_commons.clone(), - )); + if true { + tokio::spawn(crate::metrics::start_metrics_service( + opts.api_bind.clone(), + insert_frac.clone(), + insert_ivl_min.clone(), + ingest_commons.clone(), + )); + } let metrics_agg_fut = { let conn_stats = conn_stats.clone(); @@ -521,7 +594,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { let mut chns_todo = &opts.channels[..]; let mut chstmp = ["__NONE__"; 8]; let mut ix = 0; - while chns_todo.len() > 0 { + while chns_todo.len() > 0 && SIGINT.load(Ordering::Acquire) == 0 { if false { for (s1, s2) in chns_todo.iter().zip(chstmp.iter_mut()) { *s2 = s1; @@ -551,10 +624,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { // TODO the address was searched before but could not be found. } else { let addr: SocketAddrV4 = match addr.parse() { - Ok(k) => { - local_stats.ioc_lookup_inc(); - k - } + Ok(k) => k, Err(e) => { error!("can not parse {addr:?} for channel {ch:?} {e:?}"); continue; @@ -642,19 +712,77 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { } info!("channels_by_host len {}", channels_by_host.len()); - for jh in conn_jhs { - match jh.await { - Ok(k) => match k { - Ok(_) => {} + let mut conn_jhs: VecDeque>>> = + conn_jhs.into_iter().map(|jh| jh.fuse()).collect(); + let mut sent_stop_commands = false; + loop { + if SIGINT.load(Ordering::Acquire) != 0 { + let receiver = insert_item_queue.receiver(); + info!( + "item queue AGAIN senders {} receivers {}", + receiver.sender_count(), + receiver.receiver_count() + ); + info!("Stopping"); + if !sent_stop_commands { + sent_stop_commands = true; + info!("sending stop command"); + let queues = command_queue_set.queues_locked().await; + for q in queues.iter() { + let (cmd, _rx) = ConnCommand::shutdown(); + let _ = q.1.send(cmd).await; + } + } + } + let mut jh = if let Some(x) = conn_jhs.pop_front() { + x + } else { + break; + }; + futures_util::select! { + a = jh => match a { + Ok(k) => match k { + Ok(_) => { + info!("joined"); + } + Err(e) => { + error!("{e:?}"); + } + }, Err(e) => { error!("{e:?}"); } }, + _b = tokio::time::sleep(Duration::from_millis(1000)).fuse() => { + conn_jhs.push_back(jh); + } + }; + } + info!("all connections done."); + + drop(ingest_commons); + metrics_agg_jh.abort(); + drop(metrics_agg_jh); + + let receiver = insert_item_queue.receiver(); + drop(insert_item_queue); + info!( + "item queue AGAIN senders {} receivers {}", + receiver.sender_count(), + receiver.receiver_count() + ); + + let mut futs = FuturesUnordered::from_iter(jh_insert_workers); + while let Some(x) = futs.next().await { + match x { + Ok(_) => { + info!("insert worker done"); + } Err(e) => { - error!("{e:?}"); + error!("error on shutdown: {e:?}"); } } } - metrics_agg_jh.await.unwrap(); + info!("all insert workers done."); Ok(()) } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index ce625e5..16b32bf 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -5,7 +5,8 @@ use crate::ca::proto::{CreateChan, EventAdd}; use crate::ca::store::ChannelRegistry; use crate::series::{Existence, SeriesId}; use crate::store::{ - CommonInsertItemQueueSender, ConnectionStatus, ConnectionStatusItem, InsertItem, IvlItem, MuteItem, QueryItem, + ChannelStatus, ChannelStatusItem, CommonInsertItemQueueSender, ConnectionStatus, ConnectionStatusItem, InsertItem, + IvlItem, MuteItem, QueryItem, }; use async_channel::Sender; use err::Error; @@ -18,6 +19,7 @@ use serde::Serialize; use stats::{CaConnStats, IntervalEma}; use std::collections::{BTreeMap, VecDeque}; use std::net::{Ipv4Addr, SocketAddrV4}; +use std::ops::ControlFlow; use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; @@ -77,6 +79,9 @@ fn ser_instant(val: &Option, ser: S) -> Result, } #[allow(unused)] #[derive(Clone, Debug)] enum ChannelState { Init, - Creating { cid: u32, ts_beg: Instant }, + Creating { cid: Cid, ts_beg: Instant }, Created(CreatedState), Error(ChannelError), } @@ -185,6 +184,11 @@ impl ChannelState { } } +#[allow(unused)] +struct ChannelsStates { + channels: BTreeMap, +} + enum CaConnState { Unconnected, Connecting( @@ -195,6 +199,7 @@ enum CaConnState { Listen, PeerReady, Wait(Pin + Send>>), + Shutdown, } fn wait_fut(dt: u64) -> Pin + Send>> { @@ -202,11 +207,27 @@ fn wait_fut(dt: u64) -> Pin + Send>> { Box::pin(fut) } -struct IdStore { +struct CidStore { next: u32, } -impl IdStore { +impl CidStore { + fn new() -> Self { + Self { next: 0 } + } + + fn next(&mut self) -> Cid { + self.next += 1; + let ret = self.next; + Cid(ret) + } +} + +struct SubidStore { + next: u32, +} + +impl SubidStore { fn new() -> Self { Self { next: 0 } } @@ -225,6 +246,7 @@ pub enum ConnCommandKind { ChannelStatesAll((), Sender<(SocketAddrV4, Vec)>), ChannelAdd(String, Sender), ChannelRemove(String, Sender), + Shutdown(Sender), } #[derive(Debug)] @@ -280,34 +302,40 @@ impl ConnCommand { }; (cmd, rx) } + + pub fn shutdown() -> (ConnCommand, async_channel::Receiver) { + let (tx, rx) = async_channel::bounded(1); + let cmd = Self { + kind: ConnCommandKind::Shutdown(tx), + }; + (cmd, rx) + } } -#[allow(unused)] pub struct CaConn { state: CaConnState, + shutdown: bool, proto: Option, - cid_store: IdStore, - ioid_store: IdStore, - subid_store: IdStore, - // TODO use a Cid or so instead of u32. - channels: BTreeMap, + cid_store: CidStore, + subid_store: SubidStore, + channels: BTreeMap, init_state_count: u64, - cid_by_name: BTreeMap, - cid_by_subid: BTreeMap, - name_by_cid: BTreeMap, - poll_count: usize, + cid_by_name: BTreeMap, + cid_by_subid: BTreeMap, + name_by_cid: BTreeMap, data_store: Arc, insert_item_queue: VecDeque, insert_item_sender: CommonInsertItemQueueSender, insert_item_send_fut: Option>, fut_get_series: - FuturesOrdered), Error>> + Send>>>, + FuturesOrdered), Error>> + Send>>>, remote_addr_dbg: SocketAddrV4, local_epics_hostname: String, array_truncate: usize, stats: Arc, insert_queue_max: usize, insert_ivl_min: Arc, + ts_channel_alive_check_last: Instant, conn_command_tx: async_channel::Sender, conn_command_rx: async_channel::Receiver, conn_backoff: f32, @@ -327,16 +355,15 @@ impl CaConn { let (cq_tx, cq_rx) = async_channel::bounded(32); Self { state: CaConnState::Unconnected, + shutdown: false, proto: None, - cid_store: IdStore::new(), - ioid_store: IdStore::new(), - subid_store: IdStore::new(), + cid_store: CidStore::new(), + subid_store: SubidStore::new(), channels: BTreeMap::new(), init_state_count: 0, cid_by_name: BTreeMap::new(), cid_by_subid: BTreeMap::new(), name_by_cid: BTreeMap::new(), - poll_count: 0, data_store, insert_item_queue: VecDeque::new(), insert_item_sender, @@ -348,6 +375,7 @@ impl CaConn { stats: Arc::new(CaConnStats::new()), insert_queue_max, insert_ivl_min, + ts_channel_alive_check_last: Instant::now(), conn_command_tx: cq_tx, conn_command_rx: cq_rx, conn_backoff: 0.02, @@ -363,6 +391,7 @@ impl CaConn { // TODO if this loops for too long time, yield and make sure we get wake up again. use Poll::*; loop { + self.stats.caconn_loop3_count_inc(); match self.conn_command_rx.poll_next_unpin(cx) { Ready(Some(a)) => match a.kind { ConnCommandKind::FindChannel(pattern, tx) => { @@ -431,7 +460,6 @@ impl CaConn { } } ConnCommandKind::ChannelRemove(name, tx) => { - info!("remove {}", name); self.channel_remove(name); match tx.try_send(true) { Ok(_) => {} @@ -440,6 +468,18 @@ impl CaConn { } } } + ConnCommandKind::Shutdown(tx) => { + self.shutdown = true; + let _ = self.before_reset_of_channel_state(); + self.state = CaConnState::Shutdown; + self.proto = None; + match tx.try_send(true) { + Ok(_) => {} + Err(_) => { + //error!("response channel full or closed"); + } + } + } }, Ready(None) => { error!("Command queue closed"); @@ -472,7 +512,7 @@ impl CaConn { } } - fn cid_by_name(&mut self, name: &str) -> u32 { + fn cid_by_name(&mut self, name: &str) -> Cid { if let Some(cid) = self.cid_by_name.get(name) { *cid } else { @@ -483,7 +523,7 @@ impl CaConn { } } - fn name_by_cid(&self, cid: u32) -> Option<&str> { + fn name_by_cid(&self, cid: Cid) -> Option<&str> { self.name_by_cid.get(&cid).map(|x| x.as_str()) } @@ -497,9 +537,39 @@ impl CaConn { self.conn_backoff = self.conn_backoff_beg; } + fn before_reset_of_channel_state(&mut self) -> Result<(), Error> { + warn!("before_reset_of_channel_state channels {}", self.channels.len()); + let mut created = 0; + for (_cid, chst) in &self.channels { + match chst { + ChannelState::Created(st) => { + if let Some(series) = &st.series { + let item = QueryItem::ChannelStatus(ChannelStatusItem { + ts: SystemTime::now(), + series: series.clone(), + status: ChannelStatus::Closed, + }); + if created < 20 { + //info!("store {:?}", item); + } + self.insert_item_queue.push_back(item); + } else { + if created < 20 { + //info!("no series for cid {:?}", st.cid); + } + } + created += 1; + } + _ => (), + } + } + Ok(()) + } + fn handle_insert_futs(&mut self, cx: &mut Context) -> Poll> { use Poll::*; loop { + self.stats.caconn_loop4_count_inc(); match self.insert_item_send_fut.as_mut() { Some(fut) => match fut.poll_unpin(cx) { Ready(Ok(())) => { @@ -534,15 +604,50 @@ impl CaConn { } } + fn check_channels_alive(&mut self) -> Result<(), Error> { + let tsnow = Instant::now(); + let mut alive_count = 0; + let mut not_alive_count = 0; + for (_, st) in &self.channels { + match st { + ChannelState::Creating { cid, ts_beg } => { + if tsnow.duration_since(*ts_beg) >= Duration::from_millis(10000) { + let name = self.name_by_cid.get(cid); + warn!("channel Creating timed out {} {:?}", cid.0, name); + } + } + ChannelState::Created(st) => { + if tsnow.duration_since(st.ts_alive_last) >= Duration::from_millis(10000) { + not_alive_count += 1; + } else { + alive_count += 1; + } + } + _ => {} + } + } + self.stats + .channel_all_count + .store(self.channels.len() as _, Ordering::Release); + self.stats + .channel_alive_count + .store(alive_count as _, Ordering::Release); + self.stats + .channel_not_alive_count + .store(not_alive_count as _, Ordering::Release); + Ok(()) + } + fn channel_to_evented( &mut self, - cid: u32, + cid: Cid, sid: u32, data_type: u16, data_count: u16, series: Existence, cx: &mut Context, ) -> Result<(), Error> { + let tsnow = Instant::now(); self.stats.get_series_id_ok_inc(); let series = match series { Existence::Created(k) => k, @@ -577,16 +682,18 @@ impl CaConn { // TODO handle error better! Transition channel to Error state? scalar_type: ScalarType::from_ca_id(data_type)?, shape: Shape::from_ca_count(data_count)?, - ts_created: Instant::now(), - state: MonitoringState::AddingEvent(series), + ts_created: tsnow, + ts_alive_last: tsnow, + state: MonitoringState::AddingEvent(series.clone()), ts_msp_last: 0, ts_msp_grid_last: 0, inserted_in_ts_msp: u64::MAX, insert_item_ivl_ema: IntervalEma::new(), item_recv_ivl_ema: IntervalEma::new(), - insert_recv_ivl_last: Instant::now(), - insert_next_earliest: Instant::now(), + insert_recv_ivl_last: tsnow, + insert_next_earliest: tsnow, muted_before: 0, + series: Some(series), }); let scalar_type = ScalarType::from_ca_id(data_type)?; let shape = Shape::from_ca_count(data_count)?; @@ -608,6 +715,15 @@ impl CaConn { while self.fut_get_series.len() > 0 { match self.fut_get_series.poll_next_unpin(cx) { Ready(Some(Ok((cid, sid, data_type, data_count, series)))) => { + { + let series = series.clone().into_inner(); + let item = QueryItem::ChannelStatus(ChannelStatusItem { + ts: SystemTime::now(), + series: series, + status: ChannelStatus::Opened, + }); + self.insert_item_queue.push_back(item); + } match self.channel_to_evented(cid, sid, data_type, data_count, series, cx) { Ok(_) => {} Err(e) => { @@ -653,7 +769,7 @@ impl CaConn { }; let ts_lsp = ts - ts_msp; let item = InsertItem { - series: series.id(), + series, ts_msp, ts_lsp, msp_bump: ts_msp_changed, @@ -719,13 +835,13 @@ impl CaConn { fn handle_event_add_res(&mut self, ev: proto::EventAddRes, tsnow: Instant) -> Result<(), Error> { // TODO handle subid-not-found which can also be peer error: let cid = *self.cid_by_subid.get(&ev.subid).unwrap(); - let _name = self.name_by_cid(cid).unwrap().to_string(); // TODO get rid of the string clone when I don't want the log output any longer: // TODO handle not-found error: let mut series_2 = None; let ch_s = self.channels.get_mut(&cid).unwrap(); match ch_s { ChannelState::Created(st) => { + st.ts_alive_last = tsnow; st.item_recv_ivl_ema.tick(tsnow); let scalar_type = st.scalar_type.clone(); let shape = st.shape.clone(); @@ -790,7 +906,7 @@ impl CaConn { st.insert_recv_ivl_last = tsnow; let ema = st.insert_item_ivl_ema.ema(); let item = IvlItem { - series: series.id(), + series: series.clone(), ts, ema: ema.ema(), emd: ema.emv().sqrt(), @@ -800,7 +916,7 @@ impl CaConn { if false && st.muted_before == 0 { let ema = st.insert_item_ivl_ema.ema(); let item = MuteItem { - series: series.id(), + series, ts, ema: ema.ema(), emd: ema.emv().sqrt(), @@ -811,6 +927,7 @@ impl CaConn { } } _ => { + // TODO count instead of print error!("unexpected state: EventAddRes while having {ch_s:?}"); } } @@ -870,7 +987,7 @@ impl CaConn { if self.init_state_count == 0 { return Ok(()); } - let keys: Vec = self.channels.keys().map(|x| *x).collect(); + let keys: Vec = self.channels.keys().map(|x| *x).collect(); for cid in keys { match self.channels.get_mut(&cid).unwrap() { ChannelState::Init => { @@ -881,10 +998,9 @@ impl CaConn { Ok(k) => k, Err(e) => return Err(e), }; - debug!("Sending CreateChan for {}", name); let msg = CaMsg { ty: CaMsgTy::CreateChan(CreateChan { - cid, + cid: cid.0, channel: name.into(), }), }; @@ -918,7 +1034,6 @@ impl CaConn { ts1 = ts2; let mut do_wake_again = false; if msgs_tmp.len() > 0 { - //info!("msgs_tmp.len() {}", msgs_tmp.len()); do_wake_again = true; } { @@ -937,11 +1052,12 @@ impl CaConn { CaMsgTy::SearchRes(k) => { let a = k.addr.to_be_bytes(); let addr = format!("{}.{}.{}.{}:{}", a[0], a[1], a[2], a[3], k.tcp_port); - info!("Search result indicates server address: {addr}"); + trace!("Search result indicates server address: {addr}"); + // TODO count this unexpected case. } CaMsgTy::CreateChanRes(k) => { // TODO handle cid-not-found which can also indicate peer error. - let cid = k.cid; + let cid = Cid(k.cid); let sid = k.sid; // TODO handle error: let name = self.name_by_cid(cid).unwrap().to_string(); @@ -962,6 +1078,7 @@ impl CaConn { scalar_type: scalar_type.clone(), shape: shape.clone(), ts_created: tsnow, + ts_alive_last: tsnow, state: MonitoringState::FetchSeriesId, ts_msp_last: 0, ts_msp_grid_last: 0, @@ -971,6 +1088,7 @@ impl CaConn { insert_recv_ivl_last: tsnow, insert_next_earliest: tsnow, muted_before: 0, + series: None, }); // TODO handle error in different way. Should most likely not abort. let cd = ChannelDescDecoded { @@ -989,7 +1107,7 @@ impl CaConn { .get_series_id(cd) .map_ok(move |series| (cid, k.sid, k.data_type, k.data_count, series)); // TODO throttle execution rate: - self.fut_get_series.push(Box::pin(fut) as _); + self.fut_get_series.push_back(Box::pin(fut) as _); do_wake_again = true; } CaMsgTy::EventAddRes(k) => { @@ -1002,7 +1120,13 @@ impl CaConn { let _ = ts1; res? } - _ => {} + CaMsgTy::Error(e) => { + warn!("channel access error message {e:?}"); + } + CaMsgTy::AccessRightsRes(_) => {} + k => { + warn!("unexpected ca cmd {k:?}"); + } } } _ => {} @@ -1010,14 +1134,27 @@ impl CaConn { Ready(Some(Ok(()))) } Ready(Some(Err(e))) => { - error!("CaProto yields error: {e:?}"); + error!("CaProto yields error: {e:?} remote {:?}", self.remote_addr_dbg); + // TODO unify this handling with the block below + let reset_res = self.before_reset_of_channel_state(); + self.state = CaConnState::Wait(wait_fut(self.backoff_next())); + self.proto = None; + if let Err(e) = reset_res { + error!("can not destruct channel state before reset {e:?}"); + } Ready(Some(Err(e))) } Ready(None) => { warn!("handle_peer_ready CaProto is done {:?}", self.remote_addr_dbg); + let reset_res = self.before_reset_of_channel_state(); self.state = CaConnState::Wait(wait_fut(self.backoff_next())); self.proto = None; - Ready(None) + if let Err(e) = reset_res { + error!("can not destruct channel state before reset {e:?}"); + Ready(Some(Err(e))) + } else { + Ready(None) + } } Pending => Pending, }; @@ -1029,7 +1166,152 @@ impl CaConn { res } - //fn loop_inner(&mut self, cx: &mut Context) + // `?` works not in here. + fn _test_control_flow(&mut self, _cx: &mut Context) -> ControlFlow>> { + use ControlFlow::*; + use Poll::*; + let e = Error::with_msg_no_trace(format!("test")); + //Err(e)?; + let _ = e; + Break(Pending) + } + + // `?` works not in here. + fn handle_conn_state(&mut self, cx: &mut Context) -> Option>> { + use Poll::*; + match &mut self.state { + CaConnState::Unconnected => { + let addr = self.remote_addr_dbg.clone(); + trace!("create tcp connection to {:?}", (addr.ip(), addr.port())); + let fut = tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr)); + self.state = CaConnState::Connecting(addr, Box::pin(fut)); + None + } + CaConnState::Connecting(ref addr, ref mut fut) => { + match fut.poll_unpin(cx) { + Ready(connect_result) => { + match connect_result { + Ok(Ok(tcp)) => { + let addr = addr.clone(); + self.insert_item_queue + .push_back(QueryItem::ConnectionStatus(ConnectionStatusItem { + ts: SystemTime::now(), + addr, + status: ConnectionStatus::Established, + })); + self.backoff_reset(); + let proto = CaProto::new(tcp, self.remote_addr_dbg.clone(), self.array_truncate); + self.state = CaConnState::Init; + self.proto = Some(proto); + None + } + Ok(Err(e)) => { + // TODO log with exponential backoff + // 172.26.24.118:2072 + const ADDR2: Ipv4Addr = Ipv4Addr::new(172, 26, 24, 118); + if addr.ip() == &ADDR2 && addr.port() == 2072 { + warn!("error during connect to {addr:?} {e:?}"); + } + let addr = addr.clone(); + self.insert_item_queue + .push_back(QueryItem::ConnectionStatus(ConnectionStatusItem { + ts: SystemTime::now(), + addr, + status: ConnectionStatus::ConnectError, + })); + let dt = self.backoff_next(); + self.state = CaConnState::Wait(wait_fut(dt)); + self.proto = None; + None + } + Err(e) => { + // TODO log with exponential backoff + trace!("timeout during connect to {addr:?} {e:?}"); + let addr = addr.clone(); + self.insert_item_queue + .push_back(QueryItem::ConnectionStatus(ConnectionStatusItem { + ts: SystemTime::now(), + addr, + status: ConnectionStatus::ConnectTimeout, + })); + let dt = self.backoff_next(); + self.state = CaConnState::Wait(wait_fut(dt)); + self.proto = None; + None + } + } + } + Pending => Some(Pending), + } + } + CaConnState::Init => { + let hostname = self.local_epics_hostname.clone(); + let proto = self.proto.as_mut().unwrap(); + let msg = CaMsg { ty: CaMsgTy::Version }; + proto.push_out(msg); + let msg = CaMsg { + ty: CaMsgTy::ClientName, + }; + proto.push_out(msg); + let msg = CaMsg { + ty: CaMsgTy::HostName(hostname), + }; + proto.push_out(msg); + self.state = CaConnState::Listen; + None + } + CaConnState::Listen => match { + let res = self.handle_conn_listen(cx); + res + } { + Ready(Some(Ok(()))) => Some(Ready(Ok(()))), + Ready(Some(Err(e))) => Some(Ready(Err(e))), + Ready(None) => None, + Pending => Some(Pending), + }, + CaConnState::PeerReady => { + { + // TODO can I move this block somewhere else? + match self.handle_get_series_futs(cx) { + Ready(Ok(_)) => (), + Ready(Err(e)) => return Some(Ready(Err(e))), + Pending => (), + } + } + let res = self.handle_peer_ready(cx); + match res { + Ready(Some(Ok(()))) => None, + Ready(Some(Err(e))) => Some(Ready(Err(e))), + Ready(None) => None, + Pending => Some(Pending), + } + } + CaConnState::Wait(inst) => match inst.poll_unpin(cx) { + Ready(_) => { + self.state = CaConnState::Unconnected; + self.proto = None; + None + } + Pending => Some(Pending), + }, + CaConnState::Shutdown => None, + } + } + + fn loop_inner(&mut self, cx: &mut Context) -> Option>> { + loop { + self.stats.caconn_loop2_count_inc(); + if let Some(v) = self.handle_conn_state(cx) { + break Some(v); + } + if self.insert_item_queue.len() >= self.insert_queue_max { + break None; + } + if self.shutdown { + break None; + } + } + } } impl Stream for CaConn { @@ -1037,174 +1319,57 @@ impl Stream for CaConn { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - let ts_outer_1 = Instant::now(); - let mut ts1 = ts_outer_1; - self.poll_count += 1; - // TODO factor out the inner loop: - let ret = 'outer: loop { - self.handle_conn_command(cx); + self.stats.caconn_poll_count_inc(); + let tsnow = Instant::now(); + if tsnow.duration_since(self.ts_channel_alive_check_last) >= Duration::from_millis(4000) { + self.ts_channel_alive_check_last = tsnow; + if let Err(e) = self.check_channels_alive() { + error!("check_dead_channels {e:?}"); + } + } + if self.shutdown { + info!("CaConn poll"); + } + let ret = loop { + if self.shutdown { + info!("CaConn loop 1"); + } + self.stats.caconn_loop1_count_inc(); + if !self.shutdown { + self.handle_conn_command(cx); + } let q = self.handle_insert_futs(cx); - let ts2 = Instant::now(); - self.stats - .poll_time_handle_insert_futs - .fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::AcqRel); - ts1 = ts2; match q { Ready(_) => {} Pending => break Pending, } - + if self.shutdown { + if self.insert_item_queue.len() == 0 { + info!("no more items to flush"); + break Ready(Ok(())); + } else { + info!("more items {}", self.insert_item_queue.len()); + } + } if self.insert_item_queue.len() >= self.insert_queue_max { break Pending; } - - break loop { - break match &mut self.state { - CaConnState::Unconnected => { - let addr = self.remote_addr_dbg.clone(); - trace!("create tcp connection to {:?}", (addr.ip(), addr.port())); - let fut = async move { - tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr)).await - }; - self.state = CaConnState::Connecting(addr, Box::pin(fut)); - continue 'outer; - } - CaConnState::Connecting(ref addr, ref mut fut) => { - match fut.poll_unpin(cx) { - Ready(connect_result) => { - match connect_result { - Ok(Ok(tcp)) => { - let addr = addr.clone(); - self.insert_item_queue.push_back(QueryItem::ConnectionStatus( - ConnectionStatusItem { - ts: SystemTime::now(), - addr, - status: ConnectionStatus::Established, - }, - )); - self.backoff_reset(); - let proto = - CaProto::new(tcp, self.remote_addr_dbg.clone(), self.array_truncate); - self.state = CaConnState::Init; - self.proto = Some(proto); - continue 'outer; - } - Ok(Err(e)) => { - // TODO log with exponential backoff - // 172.26.24.118:2072 - const ADDR2: Ipv4Addr = Ipv4Addr::new(172, 26, 24, 118); - if addr.ip() == &ADDR2 && addr.port() == 2072 { - warn!("error during connect to {addr:?} {e:?}"); - } - let addr = addr.clone(); - self.insert_item_queue.push_back(QueryItem::ConnectionStatus( - ConnectionStatusItem { - ts: SystemTime::now(), - addr, - status: ConnectionStatus::ConnectError, - }, - )); - let dt = self.backoff_next(); - self.state = CaConnState::Wait(wait_fut(dt)); - self.proto = None; - continue 'outer; - } - Err(e) => { - // TODO log with exponential backoff - trace!("timeout during connect to {addr:?} {e:?}"); - let addr = addr.clone(); - self.insert_item_queue.push_back(QueryItem::ConnectionStatus( - ConnectionStatusItem { - ts: SystemTime::now(), - addr, - status: ConnectionStatus::ConnectTimeout, - }, - )); - let dt = self.backoff_next(); - self.state = CaConnState::Wait(wait_fut(dt)); - self.proto = None; - continue 'outer; - } - } - } - Pending => Pending, - } - } - CaConnState::Init => { - let hostname = self.local_epics_hostname.clone(); - let proto = self.proto.as_mut().unwrap(); - let msg = CaMsg { ty: CaMsgTy::Version }; - proto.push_out(msg); - let msg = CaMsg { - ty: CaMsgTy::ClientName, - }; - proto.push_out(msg); - let msg = CaMsg { - ty: CaMsgTy::HostName(hostname), - }; - proto.push_out(msg); - self.state = CaConnState::Listen; - continue 'outer; - } - CaConnState::Listen => match { - let res = self.handle_conn_listen(cx); - let ts2 = Instant::now(); - self.stats - .time_handle_conn_listen - .fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::AcqRel); - ts1 = ts2; - res - } { - Ready(Some(Ok(()))) => Ready(Some(Ok(()))), - Ready(Some(Err(e))) => Ready(Some(Err(e))), - Ready(None) => continue 'outer, - Pending => Pending, - }, - CaConnState::PeerReady => { - { - // TODO can I move this block somewhere else? - let _ = self.handle_get_series_futs(cx)?; - let ts2 = Instant::now(); - self.stats - .poll_time_get_series_futs - .fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::AcqRel); - ts1 = ts2; - } - let res = self.handle_peer_ready(cx); - let ts2 = Instant::now(); - self.stats.time_handle_peer_ready_dur(ts2.duration_since(ts1)); - ts1 = ts2; - match res { - Ready(Some(Ok(()))) => { - if self.insert_item_queue.len() >= self.insert_queue_max { - continue 'outer; - } else { - continue; - } - } - Ready(Some(Err(e))) => Ready(Some(Err(e))), - Ready(None) => continue 'outer, - Pending => Pending, - } - } - CaConnState::Wait(inst) => match inst.poll_unpin(cx) { - Ready(_) => { - self.state = CaConnState::Unconnected; - self.proto = None; - continue 'outer; - } - Pending => Pending, - }, - }; - }; + if !self.shutdown { + if let Some(v) = self.loop_inner(cx) { + break v; + } + } }; - let ts_outer_2 = Instant::now(); - self.stats.poll_time_all_dur(ts_outer_2.duration_since(ts_outer_1)); - // TODO currently, this will never stop by itself match &ret { Ready(_) => self.stats.conn_stream_ready_inc(), Pending => self.stats.conn_stream_pending_inc(), } - ret + if self.shutdown && self.insert_item_queue.len() == 0 { + return Ready(None); + } + match ret { + Ready(x) => Ready(Some(x)), + Pending => Pending, + } } } diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 596add7..06158e3 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -28,6 +28,13 @@ pub struct SearchRes { pub proto_version: u16, } +#[derive(Debug)] +pub struct ErrorCmd { + pub cid: u32, + pub eid: u32, + pub msg: String, +} + #[derive(Debug)] pub struct ClientNameRes { pub name: String, @@ -183,6 +190,7 @@ pub struct CaEventValue { pub enum CaMsgTy { Version, VersionRes(u16), + Error(ErrorCmd), ClientName, ClientNameRes(ClientNameRes), HostName(String), @@ -204,6 +212,7 @@ impl CaMsgTy { match self { Version => 0, VersionRes(_) => 0, + Error(_) => 0x0b, ClientName => 0x14, ClientNameRes(_) => 0x14, HostName(_) => 0x15, @@ -229,6 +238,7 @@ impl CaMsgTy { match self { Version => 0, VersionRes(_) => 0, + Error(x) => (16 + x.msg.len() + 1 + 7) / 8 * 8, ClientName => 0x10, ClientNameRes(x) => (x.name.len() + 1 + 7) / 8 * 8, HostName(_) => 0x18, @@ -256,6 +266,7 @@ impl CaMsgTy { match self { Version => 0, VersionRes(n) => *n, + Error(_) => 0, ClientName => 0, ClientNameRes(_) => 0, HostName(_) => 0, @@ -280,6 +291,7 @@ impl CaMsgTy { match self { Version => CA_PROTO_VERSION, VersionRes(_) => 0, + Error(_) => 0, ClientName => 0, ClientNameRes(_) => 0, HostName(_) => 0, @@ -301,6 +313,7 @@ impl CaMsgTy { match self { Version => 0, VersionRes(_) => 0, + Error(_) => 0, ClientName => 0, ClientNameRes(_) => 0, HostName(_) => 0, @@ -322,6 +335,7 @@ impl CaMsgTy { match self { Version => 0, VersionRes(_) => 0, + Error(_) => 0, ClientName => 0, ClientNameRes(_) => 0, HostName(_) => 0, @@ -343,6 +357,8 @@ impl CaMsgTy { match self { Version => {} VersionRes(_) => {} + // Specs: error cmd only from server to client. + Error(_) => todo!(), ClientName => { // TODO allow variable client name. let s = "daqingest".as_bytes(); @@ -519,9 +535,23 @@ impl CaMsg { pub fn from_proto_infos(hi: &HeadInfo, payload: &[u8], array_truncate: usize) -> Result { let msg = match hi.cmdid { - 0 => CaMsg { + 0x00 => CaMsg { ty: CaMsgTy::VersionRes(hi.data_count), }, + 0x0b => { + let mut s = String::new(); + s.extend(format!("{:?}", &payload[..payload.len().min(16)]).chars()); + if payload.len() >= 17 { + s.extend(" msg: ".chars()); + s.extend(String::from_utf8_lossy(&payload[17..payload.len() - 1]).chars()); + } + let e = ErrorCmd { + cid: hi.param1, + eid: hi.param2, + msg: s, + }; + CaMsg { ty: CaMsgTy::Error(e) } + } 20 => { let name = std::ffi::CString::new(payload) .map(|s| s.into_string().unwrap_or_else(|e| format!("{e:?}"))) diff --git a/netfetch/src/series.rs b/netfetch/src/series.rs index 793be1d..8869aa1 100644 --- a/netfetch/src/series.rs +++ b/netfetch/src/series.rs @@ -12,6 +12,16 @@ pub enum Existence { Existing(T), } +impl Existence { + pub fn into_inner(self) -> T { + use Existence::*; + match self { + Created(x) => x, + Existing(x) => x, + } + } +} + #[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize)] pub struct SeriesId(u64); diff --git a/netfetch/src/store.rs b/netfetch/src/store.rs index cb6a649..045e106 100644 --- a/netfetch/src/store.rs +++ b/netfetch/src/store.rs @@ -1,6 +1,7 @@ use crate::ca::proto::{CaDataArrayValue, CaDataScalarValue, CaDataValue}; use crate::ca::store::DataStore; use crate::errconv::ErrConv; +use crate::series::SeriesId; use err::Error; use futures_util::{Future, FutureExt}; use log::*; @@ -102,21 +103,32 @@ pub struct ConnectionStatusItem { #[derive(Debug)] pub enum ChannelStatus { - Opened = 1, - Closed = 2, - ClosedUnexpected = 3, + Opened, + Closed, + ClosedUnexpected, +} + +impl ChannelStatus { + pub fn kind(&self) -> u32 { + use ChannelStatus::*; + match self { + Opened => 1, + Closed => 2, + ClosedUnexpected => 3, + } + } } #[derive(Debug)] pub struct ChannelStatusItem { pub ts: SystemTime, - pub series: u64, + pub series: SeriesId, pub status: ChannelStatus, } #[derive(Debug)] pub struct InsertItem { - pub series: u64, + pub series: SeriesId, pub ts_msp: u64, pub ts_lsp: u64, pub msp_bump: bool, @@ -129,7 +141,7 @@ pub struct InsertItem { #[derive(Debug)] pub struct MuteItem { - pub series: u64, + pub series: SeriesId, pub ts: u64, pub ema: f32, pub emd: f32, @@ -137,7 +149,7 @@ pub struct MuteItem { #[derive(Debug)] pub struct IvlItem { - pub series: u64, + pub series: SeriesId, pub ts: u64, pub ema: f32, pub emd: f32, @@ -191,6 +203,18 @@ impl CommonInsertItemQueue { pub fn receiver(&self) -> async_channel::Receiver { self.recv.clone() } + + pub fn sender_count(&self) -> usize { + self.sender.sender_count() + } + + pub fn sender_count2(&self) -> usize { + self.recv.sender_count() + } + + pub fn receiver_count(&self) -> usize { + self.recv.receiver_count() + } } struct InsParCom { @@ -242,7 +266,7 @@ where pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaConnStats) -> Result<(), Error> { if item.msp_bump { - let params = (item.series as i64, item.ts_msp as i64); + let params = (item.series.id() as i64, item.ts_msp as i64); data_store .scy .execute(&data_store.qu_insert_ts_msp, params) @@ -252,11 +276,11 @@ pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaCon } if let Some(ts_msp_grid) = item.ts_msp_grid { let params = ( - (item.series as i32) & 0xff, + (item.series.id() as i32) & 0xff, ts_msp_grid as i32, if item.shape.to_scylla_vec().is_empty() { 0 } else { 1 } as i32, item.scalar_type.to_scylla_i32(), - item.series as i64, + item.series.id() as i64, ); data_store .scy @@ -266,7 +290,7 @@ pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaCon stats.inserts_msp_grid_inc() } let par = InsParCom { - series: item.series, + series: item.series.id(), ts_msp: item.ts_msp, ts_lsp: item.ts_lsp, pulse: item.pulse, @@ -325,3 +349,29 @@ pub async fn insert_connection_status( .err_conv()?; Ok(()) } + +pub async fn insert_channel_status( + item: ChannelStatusItem, + data_store: &DataStore, + _stats: &CaConnStats, +) -> Result<(), Error> { + let tsunix = item.ts.duration_since(std::time::UNIX_EPOCH).unwrap(); + let secs = tsunix.as_secs() * netpod::timeunits::SEC; + let nanos = tsunix.subsec_nanos() as u64; + let ts = secs + nanos; + let div = netpod::timeunits::DAY; + let ts_msp = ts / div * div; + let ts_lsp = ts - ts_msp; + let kind = item.status.kind(); + let series = item.series.id(); + let params = (series as i64, ts_msp as i64, ts_lsp as i64, kind as i32); + data_store + .scy + .query( + "insert into channel_status (series, ts_msp, ts_lsp, kind) values (?, ?, ?, ?)", + params, + ) + .await + .err_conv()?; + Ok(()) +} diff --git a/stats/src/stats.rs b/stats/src/stats.rs index dba0fef..13f7351 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -156,19 +156,23 @@ stats_proc::stats_struct!(( store_worker_item_insert, store_worker_item_drop, store_worker_item_error, - poll_time_all, - poll_time_handle_insert_futs, - poll_time_get_series_futs, + caconn_poll_count, + caconn_loop1_count, + caconn_loop2_count, + caconn_loop3_count, + caconn_loop4_count, time_handle_conn_listen, time_handle_peer_ready, time_check_channels_state_init, time_handle_event_add_res, - ioc_lookup, tcp_connected, get_series_id_ok, conn_item_count, conn_stream_ready, conn_stream_pending, + channel_all_count, + channel_alive_count, + channel_not_alive_count, ca_ts_off_1, ca_ts_off_2, ca_ts_off_3,