From 6407af9574c850dfd9ffd29d2f8cdb8c4ea61a3a Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 11 Sep 2023 17:25:44 +0200 Subject: [PATCH] WIP split channel --- daqingest/src/daemon.rs | 84 ++++--- dbpg/src/seriesbychannel.rs | 56 +++-- netfetch/src/ca.rs | 1 + netfetch/src/ca/conn.rs | 328 +++++++++++++------------ netfetch/src/ca/connset.rs | 70 ++++-- netfetch/src/ca/connset_input_merge.rs | 66 +++++ netfetch/src/ca/finder.rs | 12 +- netfetch/src/lib.rs | 1 + netfetch/src/polltimer.rs | 38 +++ netfetch/src/timebin.rs | 19 +- 10 files changed, 439 insertions(+), 236 deletions(-) create mode 100644 netfetch/src/ca/connset_input_merge.rs create mode 100644 netfetch/src/polltimer.rs diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 49d6cea..bc6e917 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -4,6 +4,7 @@ pub mod inserthook; use async_channel::Receiver; use async_channel::Sender; use async_channel::WeakReceiver; +use async_channel::WeakSender; use err::Error; use log::*; use netfetch::ca::connset::CaConnSet; @@ -45,6 +46,12 @@ const CHANNEL_CHECK_INTERVAL: Duration = Duration::from_millis(5000); const PRINT_ACTIVE_INTERVAL: Duration = Duration::from_millis(60000); const PRINT_STATUS_INTERVAL: Duration = Duration::from_millis(20000); +#[derive(Debug)] +enum CheckPeriodic { + Waiting(Instant), + Ongoing(Instant), +} + pub struct DaemonOpts { backend: String, local_epics_hostname: String, @@ -78,30 +85,28 @@ pub struct Daemon { count_assigned: usize, last_status_print: SystemTime, insert_workers_jh: Vec>>, - caconn_last_channel_check: Instant, stats: Arc, shutting_down: bool, insert_rx_weak: WeakReceiver, connset_ctrl: CaConnSetCtrl, - connset_status_last: Instant, + connset_status_last: CheckPeriodic, // TODO should be a stats object? insert_workers_running: AtomicU64, + query_item_tx_weak: WeakSender, } impl Daemon { pub async fn new(opts: DaemonOpts) -> Result { - let datastore = DataStore::new(&opts.scyconf) - .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; - let datastore = Arc::new(datastore); let (daemon_ev_tx, daemon_ev_rx) = async_channel::bounded(32); // TODO keep join handles and await later - let (channel_info_query_tx, ..) = dbpg::seriesbychannel::start_lookup_workers(4, &opts.pgconf) + let (channel_info_query_tx, jhs, jh) = dbpg::seriesbychannel::start_lookup_workers(4, &opts.pgconf) .await .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; let (query_item_tx, query_item_rx) = async_channel::bounded(opts.insert_item_queue_cap); + let query_item_tx_weak = query_item_tx.downgrade(); + let insert_queue_counter = Arc::new(AtomicUsize::new(0)); // Insert queue hook @@ -207,13 +212,13 @@ impl Daemon { count_assigned: 0, last_status_print: SystemTime::now(), insert_workers_jh, - caconn_last_channel_check: Instant::now(), stats: Arc::new(DaemonStats::new()), shutting_down: false, insert_rx_weak: query_item_rx.downgrade(), connset_ctrl: conn_set_ctrl, - connset_status_last: Instant::now(), + connset_status_last: CheckPeriodic::Waiting(Instant::now()), insert_workers_running: AtomicU64::new(0), + query_item_tx_weak, }; Ok(ret) } @@ -222,10 +227,24 @@ impl Daemon { &self.stats } - async fn check_caconn_chans(&mut self) -> Result<(), Error> { - if self.caconn_last_channel_check.elapsed() > CHANNEL_CHECK_INTERVAL { - self.connset_ctrl.check_health().await?; - self.caconn_last_channel_check = Instant::now(); + async fn check_caconn_chans(&mut self, ts1: Instant) -> Result<(), Error> { + match &self.connset_status_last { + CheckPeriodic::Waiting(since) => { + if *since + Duration::from_millis(2000) < ts1 { + debug!("======================================== issue health check CaConn"); + self.connset_ctrl.check_health().await?; + self.connset_status_last = CheckPeriodic::Ongoing(ts1); + if let Some(tx) = self.query_item_tx_weak.upgrade() { + info!("query_item_tx len {}", tx.len()); + } + } + } + CheckPeriodic::Ongoing(since) => { + let dt = ts1.saturating_duration_since(*since); + if dt > Duration::from_millis(4000) { + error!("======================================== CaConnSet has not reported health status since {:.0}", dt.as_secs_f32() * 1e3); + } + } } Ok(()) } @@ -244,7 +263,6 @@ impl Daemon { } } self.stats.handle_timer_tick_count.inc(); - let ts1 = Instant::now(); let tsnow = SystemTime::now(); if SIGINT.load(atomic::Ordering::Acquire) == 1 { warn!("Received SIGINT"); @@ -254,18 +272,8 @@ impl Daemon { warn!("Received SIGTERM"); SIGTERM.store(2, atomic::Ordering::Release); } - if self.connset_status_last + Duration::from_millis(2000) < ts1 { - self.connset_ctrl.check_health().await?; - } - if self.connset_status_last + Duration::from_millis(10000) < ts1 { - error!("CaConnSet has not reported health status"); - } - let dt = ts1.elapsed(); - if dt > Duration::from_millis(500) { - info!("slow check_chans {}ms", dt.as_secs_f32() * 1e3); - } let ts1 = Instant::now(); - self.check_caconn_chans().await?; + self.check_caconn_chans(ts1).await?; let dt = ts1.elapsed(); if dt > Duration::from_millis(500) { info!("slow check_chans {}ms", dt.as_secs_f32() * 1e3); @@ -287,6 +295,7 @@ impl Daemon { } async fn handle_channel_add(&mut self, ch: Channel) -> Result<(), Error> { + debug!("handle_channel_add {ch:?}"); self.connset_ctrl .add_channel( self.opts.backend.clone(), @@ -356,8 +365,20 @@ impl Daemon { async fn handle_ca_conn_set_item(&mut self, item: CaConnSetItem) -> Result<(), Error> { use CaConnSetItem::*; match item { - Healthy => { - self.connset_status_last = Instant::now(); + Healthy(ts1, ts2) => { + let ts3 = Instant::now(); + let dt1 = ts2.duration_since(ts1).as_secs_f32() * 1e3; + let dt2 = ts3.duration_since(ts2).as_secs_f32() * 1e3; + match &self.connset_status_last { + CheckPeriodic::Waiting(_since) => { + error!("======================================== received CaConnSet health report without having asked {dt1:.0} ms {dt2:.0} ms"); + } + CheckPeriodic::Ongoing(since) => { + let dtsince = ts3.duration_since(*since).as_secs_f32() * 1e3; + debug!("======================================== received CaConnSet healthy dtsince {dtsince:.0} ms {dt1:.0} ms {dt2:.0} ms"); + self.connset_status_last = CheckPeriodic::Waiting(ts3); + } + } } } Ok(()) @@ -404,7 +425,7 @@ impl Daemon { let ts1 = Instant::now(); let ret = self.handle_timer_tick().await; match tx.send(i.wrapping_add(1)).await { - Ok(_) => {} + Ok(()) => {} Err(_) => { self.stats.ticker_token_release_error.inc(); error!("can not send ticker token"); @@ -575,9 +596,16 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> }; let daemon_jh = taskrun::spawn(daemon.daemon()); + + debug!("will configure {} channels", channels.len()); + let mut i = 0; for s in &channels { let ch = Channel::new(s.into()); tx.send(DaemonEvent::ChannelAdd(ch)).await?; + i += 1; + if i % 1000 == 0 { + debug!("sent {} ChannelAdd", i); + } } debug!("{} configured channels applied", channels.len()); daemon_jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??; diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index 70bd128..e5aa31e 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -1,6 +1,6 @@ use async_channel::Receiver; -use async_channel::SendError; use async_channel::Sender; +use core::fmt; use err::thiserror; use err::ThisError; use futures_util::Future; @@ -21,7 +21,7 @@ use tokio_postgres::Statement as PgStatement; #[allow(unused)] macro_rules! trace2 { ($($arg:tt)*) => { - if true { + if false { trace!($($arg)*); } }; @@ -30,7 +30,7 @@ macro_rules! trace2 { #[allow(unused)] macro_rules! trace3 { ($($arg:tt)*) => { - if true { + if false { trace!($($arg)*); } }; @@ -67,6 +67,17 @@ pub struct ChannelInfoQuery { pub tx: Pin>, } +impl fmt::Debug for ChannelInfoQuery { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("ChannelInfoQuery") + .field("backend", &self.backend) + .field("channel", &self.channel) + .field("scalar_type", &self.scalar_type) + .field("shape_dims", &self.shape_dims) + .finish() + } +} + struct ChannelInfoResult2 { backend: String, channel: String, @@ -160,24 +171,30 @@ impl Worker { let mut it1 = rows.into_iter(); let mut e1 = it1.next(); for (qrid, tx) in tx { - if let Some(row) = &e1 { + let i = qrid as usize; + let found = if let Some(row) = &e1 { let rid: i32 = row.get(1); - let channel: String = row.get(2); if rid as u32 == qrid { let series: i64 = row.get(0); + let ch2: String = row.get(2); let series = SeriesId::new(series as _); let res = ChannelInfoResult2 { // TODO take from database query. Needs test. backend: backend[0].clone(), - channel, + channel: ch2, series: Existence::Existing(series), tx, }; result.push(res); + e1 = it1.next(); + None + } else { + Some(tx) } - e1 = it1.next(); } else { - let i = qrid as usize; + Some(tx) + }; + if let Some(tx) = found { let k = ChannelInfoQuery { backend: backend[i].clone(), channel: channel[i].clone(), @@ -265,7 +282,7 @@ impl Worker { async fn work(&mut self) -> Result<(), Error> { while let Some(batch) = self.batch_rx.next().await { - trace2!("worker recv batch len {}", batch.len()); + trace!("worker recv batch len {}", batch.len()); for x in &batch { trace3!( "search for {} {} {:?} {:?}", @@ -277,19 +294,26 @@ impl Worker { } let (res1, missing) = self.select(batch).await?; let res3 = if missing.len() > 0 { + trace2!("missing {}", missing.len()); + for x in &missing { + trace2!("insert missing {x:?}"); + } + let missing_count = missing.len(); self.insert_missing(&missing).await?; let (res2, missing2) = self.select(missing).await?; if missing2.len() > 0 { - warn!("series ids still missing after insert"); + for x in &missing2 { + warn!("series ids still missing after insert {}", x.channel); + } Err(Error::SeriesMissing) } else { - Ok(res2) + trace2!("select missing after insert {} of {}", missing_count, res2.len()); + Ok((res1, res2)) } } else { - Ok(res1) - }; - let res4 = res3?; - for r in res4 { + Ok((res1, Vec::new())) + }?; + for r in res3.0.into_iter().chain(res3.1.into_iter()) { let item = ChannelInfoResult { backend: r.backend, channel: r.channel, @@ -306,7 +330,7 @@ impl Worker { } } } - info!("Worker done"); + debug!("Worker done"); Ok(()) } } diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index fcb0f5f..ba0fcba 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -1,5 +1,6 @@ pub mod conn; pub mod connset; +pub mod connset_input_merge; pub mod finder; pub mod findioc; pub mod proto; diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 481d057..e585a0e 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1,11 +1,5 @@ use super::proto; -use super::proto::CaItem; -use super::proto::CaMsg; -use super::proto::CaMsgTy; -use super::proto::CaProto; use super::ExtraInsertsConf; -use crate::ca::proto::CreateChan; -use crate::ca::proto::EventAdd; use crate::senderpolling::SenderPolling; use crate::timebin::ConnTimeBin; use async_channel::Sender; @@ -23,6 +17,12 @@ use netpod::ScalarType; use netpod::Shape; use netpod::TS_MSP_GRID_SPACING; use netpod::TS_MSP_GRID_UNIT; +use proto::CaItem; +use proto::CaMsg; +use proto::CaMsgTy; +use proto::CaProto; +use proto::CreateChan; +use proto::EventAdd; use scywr::iteminsertqueue as scywriiq; use scywriiq::ChannelInfoItem; use scywriiq::ChannelStatus; @@ -46,9 +46,7 @@ use std::ops::ControlFlow; use std::pin::Pin; use std::sync::atomic; use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; use std::sync::Arc; -use std::sync::Mutex as StdMutex; use std::task::Context; use std::task::Poll; use std::time::Duration; @@ -57,10 +55,19 @@ use std::time::SystemTime; use taskrun::tokio; use tokio::net::TcpStream; +#[allow(unused)] +macro_rules! trace2 { + ($($arg:tt)*) => { + if false { + trace!($($arg)*); + } + }; +} + #[allow(unused)] macro_rules! trace3 { ($($arg:tt)*) => { - if true { + if false { trace!($($arg)*); } }; @@ -69,7 +76,7 @@ macro_rules! trace3 { #[allow(unused)] macro_rules! trace4 { ($($arg:tt)*) => { - if true { + if false { trace!($($arg)*); } }; @@ -394,12 +401,6 @@ pub struct CaConnEvent { pub value: CaConnEventValue, } -#[derive(Debug)] -enum ChannelSetOp { - Add(ChannelStatusSeriesId), - Remove, -} - struct SendSeriesLookup { tx: Sender, } @@ -626,6 +627,7 @@ impl CaConn { &mut self, res: Result, ) -> Result<(), Error> { + trace2!("handle_series_lookup_result {res:?}"); match res { Ok(res) => { let series = res.series.into_inner(); @@ -643,7 +645,7 @@ impl CaConn { let data_type = st2.data_type; let data_count = st2.data_count; match self.channel_to_evented(cid, sid, data_type, data_count, series) { - Ok(_) => {} + Ok(()) => {} Err(e) => { error!("handle_series_lookup_result {e}"); } @@ -668,38 +670,40 @@ impl CaConn { fn handle_conn_command(&mut self, cx: &mut Context) -> Poll>> { // TODO if this loops for too long time, yield and make sure we get wake up again. use Poll::*; - self.stats.caconn_loop3_count.inc(); - match self.conn_command_rx.poll_next_unpin(cx) { - Ready(Some(a)) => { - trace!("handle_conn_command received a command {}", self.remote_addr_dbg); - match a.kind { - ConnCommandKind::ChannelAdd(name, cssid) => { - self.cmd_channel_add(name, cssid); - Ready(Some(Ok(()))) + loop { + self.stats.caconn_loop3_count.inc(); + break match self.conn_command_rx.poll_next_unpin(cx) { + Ready(Some(a)) => { + trace3!("handle_conn_command received a command {}", self.remote_addr_dbg); + match a.kind { + ConnCommandKind::ChannelAdd(name, cssid) => { + self.cmd_channel_add(name, cssid); + Ready(Some(Ok(()))) + } + ConnCommandKind::ChannelRemove(name) => { + self.cmd_channel_remove(name); + Ready(Some(Ok(()))) + } + ConnCommandKind::CheckHealth => { + self.cmd_check_health(); + Ready(Some(Ok(()))) + } + ConnCommandKind::Shutdown => { + self.cmd_shutdown(); + Ready(Some(Ok(()))) + } + ConnCommandKind::SeriesLookupResult(x) => match self.handle_series_lookup_result(x) { + Ok(()) => Ready(Some(Ok(()))), + Err(e) => Ready(Some(Err(e))), + }, } - ConnCommandKind::ChannelRemove(name) => { - self.cmd_channel_remove(name); - Ready(Some(Ok(()))) - } - ConnCommandKind::CheckHealth => { - self.cmd_check_health(); - Ready(Some(Ok(()))) - } - ConnCommandKind::Shutdown => { - self.cmd_shutdown(); - Ready(Some(Ok(()))) - } - ConnCommandKind::SeriesLookupResult(x) => match self.handle_series_lookup_result(x) { - Ok(()) => Ready(Some(Ok(()))), - Err(e) => Ready(Some(Err(e))), - }, } - } - Ready(None) => { - error!("Command queue closed"); - Ready(None) - } - Pending => Pending, + Ready(None) => { + error!("Command queue closed"); + Ready(None) + } + Pending => Pending, + }; } } @@ -937,22 +941,23 @@ impl CaConn { series: SeriesId, ) -> Result<(), Error> { let tsnow = Instant::now(); + let name = self.name_by_cid(cid).unwrap().to_string(); + // TODO handle error better! Transition channel to Error state? + let scalar_type = ScalarType::from_ca_id(data_type)?; + let shape = Shape::from_ca_count(data_count)?; + trace2!("channel_to_evented {name:?} {scalar_type:?} {shape:?}"); self.stats.get_series_id_ok.inc(); if series.id() == 0 { - warn!("Weird series id: {series:?}"); + warn!("unexpected {series:?}"); } if data_type > 6 { error!("data type of series unexpected: {}", data_type); } - // TODO handle error better! Transition channel to Error state? - let scalar_type = ScalarType::from_ca_id(data_type)?; - let shape = Shape::from_ca_count(data_count)?; let mut tb = ConnTimeBin::empty(); tb.setup_for(series.clone(), &scalar_type, &shape)?; self.time_binners.insert(cid, tb); let subid = self.subid_store.next(); self.cid_by_subid.insert(subid, cid); - let name = self.name_by_cid(cid).unwrap().to_string(); // TODO convert first to CaDbrType, set to `Time`, then convert to ix: let data_type_asked = data_type + 14; let msg = CaMsg { @@ -1170,10 +1175,12 @@ impl CaConn { let item_queue = &mut self.insert_item_queue; let inserts_counter = &mut self.inserts_counter; let extra_inserts_conf = &self.extra_inserts_conf; - if let Some(tb) = self.time_binners.get_mut(&cid) { - tb.push(ts, &ev.value)?; - } else { - // TODO count or report error + if false { + if let Some(tb) = self.time_binners.get_mut(&cid) { + tb.push(ts, &ev.value)?; + } else { + // TODO count or report error + } } #[cfg(DISABLED)] match &ev.value.data { @@ -1383,10 +1390,7 @@ impl CaConn { let sid = k.sid; // TODO handle error: let name = self.name_by_cid(cid).unwrap().to_string(); - debug!("CreateChanRes {name:?}"); - if false && name.contains(".STAT") { - info!("Channel created for {}", name); - } + trace3!("CreateChanRes {name:?}"); if k.data_type > 6 { error!("CreateChanRes with unexpected data_type {}", k.data_type); } @@ -1438,7 +1442,7 @@ impl CaConn { do_wake_again = true; } CaMsgTy::EventAddRes(k) => { - trace!("got EventAddRes: {k:?}"); + trace4!("got EventAddRes: {k:?}"); self.stats.caconn_recv_data.inc(); let res = Self::handle_event_add_res(self, k, tsnow); let ts2 = Instant::now(); @@ -1509,17 +1513,19 @@ impl CaConn { Break(Pending) } - fn handle_conn_state(&mut self, cx: &mut Context) -> Result>, Error> { + fn handle_conn_state(&mut self, cx: &mut Context) -> Result>, Error> { use Poll::*; match &mut self.state { CaConnState::Unconnected => { + trace4!("Unconnected"); let addr = self.remote_addr_dbg.clone(); trace!("create tcp connection to {:?}", (addr.ip(), addr.port())); let fut = tokio::time::timeout(Duration::from_millis(1000), TcpStream::connect(addr)); self.state = CaConnState::Connecting(addr, Box::pin(fut)); - Ok(None) + Ok(Ready(Some(()))) } CaConnState::Connecting(ref addr, ref mut fut) => { + trace4!("Connecting"); match fut.poll_unpin(cx) { Ready(connect_result) => { match connect_result { @@ -1535,7 +1541,7 @@ impl CaConn { let proto = CaProto::new(tcp, self.remote_addr_dbg.clone(), self.opts.array_truncate); self.state = CaConnState::Init; self.proto = Some(proto); - Ok(None) + Ok(Ready(Some(()))) } Ok(Err(_e)) => { // TODO log with exponential backoff @@ -1549,7 +1555,7 @@ impl CaConn { let dt = self.backoff_next(); self.state = CaConnState::Wait(wait_fut(dt)); self.proto = None; - Ok(None) + Ok(Ready(Some(()))) } Err(e) => { // TODO log with exponential backoff @@ -1564,14 +1570,15 @@ impl CaConn { let dt = self.backoff_next(); self.state = CaConnState::Wait(wait_fut(dt)); self.proto = None; - Ok(None) + Ok(Ready(Some(()))) } } } - Pending => Ok(Some(Pending)), + Pending => Ok(Pending), } } CaConnState::Init => { + trace4!("Init"); let hostname = self.local_epics_hostname.clone(); let proto = self.proto.as_mut().unwrap(); let msg = CaMsg { ty: CaMsgTy::Version }; @@ -1585,54 +1592,74 @@ impl CaConn { }; proto.push_out(msg); self.state = CaConnState::Listen; - Ok(None) + Ok(Ready(Some(()))) + } + CaConnState::Listen => { + trace4!("Listen"); + match { + let res = self.handle_conn_listen(cx); + res + } { + Ready(Some(Ok(()))) => Ok(Ready(Some(()))), + Ready(Some(Err(e))) => Err(e), + Ready(None) => Ok(Ready(Some(()))), + Pending => Ok(Pending), + } } - CaConnState::Listen => match { - let res = self.handle_conn_listen(cx); - res - } { - Ready(Some(Ok(()))) => Ok(Some(Ready(()))), - Ready(Some(Err(e))) => Err(e), - Ready(None) => Ok(None), - Pending => Ok(Some(Pending)), - }, CaConnState::PeerReady => { + trace4!("PeerReady"); let res = self.handle_peer_ready(cx); match res { - Ready(Some(Ok(()))) => Ok(None), + Ready(Some(Ok(()))) => Ok(Ready(Some(()))), Ready(Some(Err(e))) => Err(e), - Ready(None) => Ok(None), - Pending => Ok(Some(Pending)), + Ready(None) => Ok(Ready(Some(()))), + Pending => Ok(Pending), } } - CaConnState::Wait(inst) => match inst.poll_unpin(cx) { - Ready(_) => { - self.state = CaConnState::Unconnected; - self.proto = None; - Ok(None) + CaConnState::Wait(inst) => { + trace4!("Wait"); + match inst.poll_unpin(cx) { + Ready(_) => { + self.state = CaConnState::Unconnected; + self.proto = None; + Ok(Ready(Some(()))) + } + Pending => Ok(Pending), } - Pending => Ok(Some(Pending)), - }, - CaConnState::Shutdown => Ok(None), - CaConnState::EndOfStream => Ok(None), + } + CaConnState::Shutdown => { + trace4!("Shutdown"); + Ok(Ready(None)) + } + CaConnState::EndOfStream => { + trace4!("EndOfStream"); + Ok(Ready(None)) + } } } - fn loop_inner(&mut self, cx: &mut Context) -> Result>, Error> { + fn loop_inner(&mut self, cx: &mut Context) -> Result>, Error> { use Poll::*; loop { self.stats.caconn_loop2_count.inc(); - if self.is_shutdown() { - break Ok(None); - } - if self.insert_item_queue.len() >= self.opts.insert_queue_max { - break Ok(None); - } - match self.handle_conn_state(cx)? { - Some(Ready(_)) => continue, - Some(Pending) => break Ok(Some(Pending)), - None => break Ok(None), - } + break if self.is_shutdown() { + Ok(Ready(None)) + } else if self.insert_item_queue.len() >= self.opts.insert_queue_max { + warn!("======================================================= queue stall"); + Ok(Ready(None)) + } else { + match self.handle_conn_state(cx) { + Ok(x) => match x { + Ready(Some(())) => continue, + Ready(None) => { + error!("handle_conn_state yields {x:?}"); + Err(Error::with_msg_no_trace("logic error")) + } + Pending => Ok(Pending), + }, + Err(e) => Err(e), + } + }; } } @@ -1662,9 +1689,11 @@ impl CaConn { fn handle_own_ticker_tick(self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> { let this = self.get_mut(); - for (_, tb) in this.time_binners.iter_mut() { - let iiq = &mut this.insert_item_queue; - tb.tick(iiq)?; + if false { + for (_, tb) in this.time_binners.iter_mut() { + let iiq = &mut this.insert_item_queue; + tb.tick(iiq)?; + } } Ok(()) } @@ -1680,10 +1709,11 @@ impl CaConn { break if sd.is_sending() { match sd.poll_unpin(cx) { Ready(Ok(())) => continue, - Ready(Err(e)) => Err(Error::with_msg_no_trace("can not send into channel")), + Ready(Err(_)) => Err(Error::with_msg_no_trace("can not send into channel")), Pending => Ok(()), } } else if let Some(item) = self.channel_info_query_queue.pop_front() { + trace3!("send series query {item:?}"); let sd = &mut self.channel_info_query_sending; sd.send(item); continue; @@ -1700,8 +1730,8 @@ impl Stream for CaConn { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; self.stats.caconn_poll_count.inc(); - loop { - let mut have_pending = false; + let poll_ts1 = Instant::now(); + let ret = loop { break if let CaConnState::EndOfStream = self.state { Ready(None) } else if let Err(e) = self.as_mut().handle_own_ticker(cx) { @@ -1724,64 +1754,42 @@ impl Stream for CaConn { Ready(Some(Err(e))) } else if let Ready(Some(Err(e))) = self.as_mut().handle_conn_command(cx) { Ready(Some(Err(e))) - } else if let Some(item) = { - match self.loop_inner(cx) { - // TODO what does this mean: should we re-loop or yield something? - Ok(Some(Ready(()))) => None, - // This is the last step, so we yield Pending. - // But in general, this does not compose well when we would add another step. - Ok(Some(Pending)) => { - have_pending = true; - None - } - Ok(None) => None, - Err(e) => Some(Err(e)), - } - } { - Ready(Some(item)) } else { - // Ready(_) => self.stats.conn_stream_ready.inc(), - // Pending => self.stats.conn_stream_pending.inc(), - let _item = CaConnEvent { - ts: Instant::now(), - value: CaConnEventValue::None, - }; - if self.is_shutdown() && self.queues_async_out_flushed() { - self.state = CaConnState::EndOfStream; - Ready(None) - } else if have_pending { - Pending - } else { - continue; + match self.loop_inner(cx) { + Ok(Ready(Some(()))) => continue, + Ok(Ready(None)) => { + // Ready(_) => self.stats.conn_stream_ready.inc(), + // Pending => self.stats.conn_stream_pending.inc(), + let _item = CaConnEvent { + ts: Instant::now(), + value: CaConnEventValue::None, + }; + if self.is_shutdown() && self.queues_async_out_flushed() { + debug!("end of stream {}", self.remote_addr_dbg); + self.state = CaConnState::EndOfStream; + Ready(None) + } else { + continue; + } + } + Ok(Pending) => Pending, + Err(e) => { + error!("{e}"); + self.state = CaConnState::EndOfStream; + Ready(Some(Err(e))) + } } }; - } - } -} - -pub struct PollTimer { - inp: INP, -} - -impl PollTimer { - pub fn new(inp: INP) -> Self { - Self { inp } - } -} - -impl Stream for PollTimer -where - INP: Stream + Unpin, -{ - type Item = ::Item; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let poll_ts1 = Instant::now(); - let inp = &mut self.inp; - let ret = inp.poll_next_unpin(cx); + }; let poll_ts2 = Instant::now(); let dt = poll_ts2.saturating_duration_since(poll_ts1); - if dt > Duration::from_millis(40) {} + if dt > Duration::from_millis(80) { + warn!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3) + } else if dt > Duration::from_millis(40) { + info!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3) + } else if dt > Duration::from_millis(5) { + debug!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3) + } ret } } diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 734f6b5..36a48a4 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -1,3 +1,4 @@ +use super::connset_input_merge::InputMerge; use super::findioc::FindIocRes; use super::statemap; use super::statemap::ChannelState; @@ -68,6 +69,33 @@ static SEARCH_REQ_RECV_COUNT: AtomicUsize = AtomicUsize::new(0); static SEARCH_REQ_BATCH_SEND_COUNT: AtomicUsize = AtomicUsize::new(0); static SEARCH_ANS_COUNT: AtomicUsize = AtomicUsize::new(0); +#[allow(unused)] +macro_rules! trace2 { + ($($arg:tt)*) => { + if false { + trace!($($arg)*); + } + }; +} + +#[allow(unused)] +macro_rules! trace3 { + ($($arg:tt)*) => { + if false { + trace!($($arg)*); + } + }; +} + +#[allow(unused)] +macro_rules! trace4 { + ($($arg:tt)*) => { + if false { + trace!($($arg)*); + } + }; +} + #[derive(Debug, PartialEq, Eq)] pub struct CmdId(SocketAddrV4, usize); @@ -122,7 +150,7 @@ pub enum ConnSetCmd { ChannelAddWithAddr(ChannelAddWithAddr), ChannelRemove(ChannelRemove), IocAddrQueryResult(VecDeque), - CheckHealth, + CheckHealth(Instant), Shutdown, } @@ -134,7 +162,7 @@ pub enum CaConnSetEvent { #[derive(Debug, Clone)] pub enum CaConnSetItem { - Healthy, + Healthy(Instant, Instant), } pub struct CaConnSetCtrl { @@ -173,7 +201,7 @@ impl CaConnSetCtrl { } pub async fn check_health(&self) -> Result<(), Error> { - let cmd = ConnSetCmd::CheckHealth; + let cmd = ConnSetCmd::CheckHealth(Instant::now()); self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?; Ok(()) } @@ -212,7 +240,8 @@ pub struct CaConnSet { ca_conn_ress: BTreeMap, channel_states: ChannelStateMap, connset_tx: Sender, - connset_rx: Receiver, + // connset_rx: Receiver, + connset_rx: crate::ca::connset_input_merge::InputMerge, channel_info_query_tx: Sender, storage_insert_tx: Sender, shutdown_stopping: bool, @@ -231,17 +260,20 @@ impl CaConnSet { channel_info_query_tx: Sender, pgconf: Database, ) -> CaConnSetCtrl { + let (connset_inp_tx, connset_inp_rx) = async_channel::bounded(256); let (connset_out_tx, connset_out_rx) = async_channel::bounded(256); - let (connset_tx, connset_rx) = async_channel::bounded(10000); - let (search_tx, ioc_finder_jh) = super::finder::start_finder(connset_tx.clone(), backend.clone(), pgconf); + let (find_ioc_res_tx, find_ioc_res_rx) = async_channel::bounded(10000); + let (search_tx, ioc_finder_jh) = super::finder::start_finder(find_ioc_res_tx.clone(), backend.clone(), pgconf); + let input_merge = InputMerge::new(todo!(), find_ioc_res_rx); let connset = Self { backend, local_epics_hostname, search_tx, ca_conn_ress: BTreeMap::new(), channel_states: ChannelStateMap::new(), - connset_tx: connset_tx.clone(), - connset_rx, + connset_tx: connset_inp_tx, + // connset_rx: find_ioc_res_rx, + connset_rx: todo!(), channel_info_query_tx, storage_insert_tx, shutdown_stopping: false, @@ -254,7 +286,7 @@ impl CaConnSet { // TODO await on jh let jh = tokio::spawn(CaConnSet::run(connset)); CaConnSetCtrl { - tx: connset_tx, + tx: connset_inp_tx, rx: connset_out_rx, jh, } @@ -262,10 +294,10 @@ impl CaConnSet { async fn run(mut this: CaConnSet) -> Result<(), Error> { loop { - let x = this.connset_rx.recv().await; + let x = this.connset_rx.next().await; match x { - Ok(ev) => this.handle_event(ev).await?, - Err(_) => { + Some(ev) => this.handle_event(ev).await?, + None => { if this.shutdown_stopping { // all fine break; @@ -302,7 +334,7 @@ impl CaConnSet { ConnSetCmd::ChannelRemove(x) => self.handle_remove_channel(x).await, ConnSetCmd::IocAddrQueryResult(x) => self.handle_ioc_query_result(x).await, ConnSetCmd::SeriesLookupResult(x) => self.handle_series_lookup_result(x).await, - ConnSetCmd::CheckHealth => self.handle_check_health().await, + ConnSetCmd::CheckHealth(ts1) => self.handle_check_health(ts1).await, ConnSetCmd::Shutdown => self.handle_shutdown().await, }, CaConnSetEvent::CaConnEvent((addr, ev)) => match ev.value { @@ -322,7 +354,7 @@ impl CaConnSet { &mut self, res: Result, ) -> Result<(), Error> { - debug!("handle_series_lookup_result {res:?}"); + trace3!("handle_series_lookup_result {res:?}"); match res { Ok(res) => { let add = ChannelAddWithStatusId { @@ -372,7 +404,7 @@ impl CaConnSet { debug!("handle_add_channel but shutdown_stopping"); return Ok(()); } - debug!("handle_add_channel_with_status_id {add:?}"); + trace3!("handle_add_channel_with_status_id {add:?}"); let ch = Channel::new(add.name.clone()); if let Some(chst) = self.channel_states.inner().get_mut(&ch) { if let ChannelStateValue::Active(chst2) = &mut chst.value { @@ -462,7 +494,7 @@ impl CaConnSet { } = ast { if let Some(addr) = e.addr { - debug!("ioc found {e:?}"); + trace3!("ioc found {e:?}"); let add = ChannelAddWithAddr { backend: self.backend.clone(), name: e.channel, @@ -496,9 +528,10 @@ impl CaConnSet { Ok(()) } - async fn handle_check_health(&mut self) -> Result<(), Error> { + async fn handle_check_health(&mut self, ts1: Instant) -> Result<(), Error> { debug!("TODO handle_check_health"); - let item = CaConnSetItem::Healthy; + let ts2 = Instant::now(); + let item = CaConnSetItem::Healthy(ts1, ts2); self.connset_out_tx.send(item).await?; Ok(()) } @@ -907,7 +940,6 @@ impl CaConnSet { } } } - use atomic::Ordering::Release; self.stats.channel_unknown_address.__set(unknown_address); self.stats.channel_search_pending.__set(search_pending); self.stats.channel_no_address.__set(no_address); diff --git a/netfetch/src/ca/connset_input_merge.rs b/netfetch/src/ca/connset_input_merge.rs new file mode 100644 index 0000000..86bb370 --- /dev/null +++ b/netfetch/src/ca/connset_input_merge.rs @@ -0,0 +1,66 @@ +use super::connset::CaConnSetEvent; +use super::findioc::FindIocRes; +use crate::ca::connset::ConnSetCmd; +use async_channel::Receiver; +use futures_util::StreamExt; +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +pub struct InputMerge { + inp1: Option>, + inp2: Option>>, +} + +impl InputMerge { + pub fn new(inp1: Receiver, inp2: Receiver>) -> Self { + Self { + inp1: Some(inp1), + inp2: Some(inp2), + } + } + + pub fn close(&mut self) { + if let Some(x) = self.inp1.as_ref() { + x.close(); + } + } +} + +impl futures_util::Stream for InputMerge { + type Item = CaConnSetEvent; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + let mut poll_next = false; + let ret = if let Some(inp) = &mut self.inp2 { + match inp.poll_next_unpin(cx) { + Ready(Some(x)) => Some(CaConnSetEvent::ConnSetCmd(ConnSetCmd::IocAddrQueryResult(x))), + Ready(None) => { + self.inp2 = None; + None + } + Pending => None, + } + } else { + None + }; + if let Some(x) = ret { + Ready(Some(x)) + } else { + if let Some(inp) = &mut self.inp1 { + match inp.poll_next_unpin(cx) { + Ready(Some(x)) => Ready(Some(x)), + Ready(None) => { + self.inp1 = None; + Ready(None) + } + Pending => Pending, + } + } else { + Ready(None) + } + } + } +} diff --git a/netfetch/src/ca/finder.rs b/netfetch/src/ca/finder.rs index 7ca0783..db21a4a 100644 --- a/netfetch/src/ca/finder.rs +++ b/netfetch/src/ca/finder.rs @@ -85,7 +85,7 @@ fn transform_pgres(rows: Vec) -> VecDeque { async fn finder_worker_single( inp: Receiver>, - tx: Sender, + tx: Sender>, backend: String, db: Database, ) -> Result<(), Error> { @@ -164,11 +164,7 @@ async fn finder_worker_single( error!("STILL NOT MATCHING LEN"); } SEARCH_RES_3_COUNT.fetch_add(items.len(), atomic::Ordering::AcqRel); - let x = tx - .send(CaConnSetEvent::ConnSetCmd( - crate::ca::connset::ConnSetCmd::IocAddrQueryResult(items), - )) - .await; + let x = tx.send(items).await; match x { Ok(_) => {} Err(e) => { @@ -191,7 +187,7 @@ async fn finder_worker_single( async fn finder_worker( qrx: Receiver, - tx: Sender, + tx: Sender>, backend: String, db: Database, ) -> Result<(), Error> { @@ -215,7 +211,7 @@ async fn finder_worker( } pub fn start_finder( - tx: Sender, + tx: Sender>, backend: String, db: Database, ) -> (Sender, JoinHandle>) { diff --git a/netfetch/src/lib.rs b/netfetch/src/lib.rs index 5786bc7..000e888 100644 --- a/netfetch/src/lib.rs +++ b/netfetch/src/lib.rs @@ -7,6 +7,7 @@ pub mod linuxhelper; pub mod metrics; pub mod netbuf; pub mod patchcollect; +pub mod polltimer; pub mod rt; pub mod senderpolling; #[cfg(test)] diff --git a/netfetch/src/polltimer.rs b/netfetch/src/polltimer.rs new file mode 100644 index 0000000..bdfc1ab --- /dev/null +++ b/netfetch/src/polltimer.rs @@ -0,0 +1,38 @@ +use futures_util::Stream; +use futures_util::StreamExt; +use log::*; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use std::time::Duration; +use std::time::Instant; + +pub struct PollTimer { + inp: INP, + timeout_warn: Duration, +} + +impl PollTimer { + pub fn new(inp: INP, timeout_warn: Duration) -> Self { + Self { inp, timeout_warn } + } +} + +impl Stream for PollTimer +where + INP: Stream + Unpin, +{ + type Item = ::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let poll_ts1 = Instant::now(); + let inp = &mut self.inp; + let ret = inp.poll_next_unpin(cx); + let poll_ts2 = Instant::now(); + let dt = poll_ts2.saturating_duration_since(poll_ts1); + if dt > self.timeout_warn { + warn!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3) + } + ret + } +} diff --git a/netfetch/src/timebin.rs b/netfetch/src/timebin.rs index ae3580f..1a23de3 100644 --- a/netfetch/src/timebin.rs +++ b/netfetch/src/timebin.rs @@ -26,6 +26,15 @@ use std::any::Any; use std::collections::VecDeque; use std::time::SystemTime; +#[allow(unused)] +macro_rules! trace2 { + ($($arg:tt)*) => { + if false { + trace!($($arg)*); + } + }; +} + struct TickParams<'a> { series: SeriesId, acc: &'a mut Box, @@ -77,7 +86,7 @@ impl ConnTimeBin { match scalar_type { I8 => { type ST = i8; - info!("SCALAR {}", any::type_name::()); + trace2!("SCALAR {}", any::type_name::()); let cont = Cont::::empty(); self.events_binner = Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight)); @@ -88,7 +97,7 @@ impl ConnTimeBin { } I16 => { type ST = i16; - info!("SCALAR {}", std::any::type_name::()); + trace2!("SCALAR {}", std::any::type_name::()); let cont = Cont::::empty(); self.events_binner = Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight)); @@ -99,7 +108,7 @@ impl ConnTimeBin { } I32 => { type ST = i32; - info!("SCALAR {}", std::any::type_name::()); + trace2!("SCALAR {}", std::any::type_name::()); let cont = Cont::::empty(); self.events_binner = Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight)); @@ -110,7 +119,7 @@ impl ConnTimeBin { } F32 => { type ST = f32; - info!("SCALAR {}", std::any::type_name::()); + trace2!("SCALAR {}", std::any::type_name::()); let cont = Cont::::empty(); self.events_binner = Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight)); @@ -121,7 +130,7 @@ impl ConnTimeBin { } F64 => { type ST = f64; - info!("SCALAR {}", std::any::type_name::()); + trace2!("SCALAR {}", std::any::type_name::()); let cont = Cont::::empty(); self.events_binner = Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));