From 403f0b37a3d4500e4588b042ae83d0e03c8aeb7e Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 19 Jan 2024 16:23:56 +0100 Subject: [PATCH] Fall back to read-notify --- batchtools/src/batcher.rs | 2 +- daqingest/src/daemon.rs | 7 +- ingest-linux/src/net.rs | 1 - netfetch/src/ca/conn.rs | 715 ++++++++++++++++++++++++---------- netfetch/src/ca/connset.rs | 12 +- netfetch/src/ca/findioc.rs | 3 +- netfetch/src/ca/proto.rs | 235 ++++++----- netfetch/src/senderpolling.rs | 7 + serieswriter/src/timebin.rs | 12 +- serieswriter/src/writer.rs | 41 +- stats/src/stats.rs | 18 +- 11 files changed, 713 insertions(+), 340 deletions(-) diff --git a/batchtools/src/batcher.rs b/batchtools/src/batcher.rs index 8b7d5a6..13ad720 100644 --- a/batchtools/src/batcher.rs +++ b/batchtools/src/batcher.rs @@ -51,7 +51,7 @@ async fn run_batcher(rx: Receiver, batch_tx: Sender>, batch_limit: }, Err(e) => { let _: Elapsed = e; - trace!("-------------------------- batcher timeout rx len {}", rx.len()); + // trace!("-------------------------- batcher timeout rx len {}", rx.len()); if all.len() > 0 { do_emit = true; } diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index a856174..a492898 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -23,6 +23,7 @@ use scywriiq::QueryItem; use stats::DaemonStats; use stats::InsertWorkerStats; use stats::SeriesByChannelStats; +use stats::SeriesWriterEstablishStats; use std::collections::VecDeque; use std::sync::atomic; use std::sync::atomic::AtomicU64; @@ -97,8 +98,10 @@ impl Daemon { // Insert queue hook // let query_item_rx = inserthook::active_channel_insert_hook(query_item_rx); - let (writer_establis_tx,) = serieswriter::writer::start_writer_establish_worker(channel_info_query_tx.clone()) - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let wrest_stats = Arc::new(SeriesWriterEstablishStats::new()); + let (writer_establis_tx,) = + serieswriter::writer::start_writer_establish_worker(channel_info_query_tx.clone(), wrest_stats.clone()) + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; let local_epics_hostname = ingest_linux::net::local_hostname(); let conn_set_ctrl = CaConnSet::start( diff --git a/ingest-linux/src/net.rs b/ingest-linux/src/net.rs index d4c698b..a752a15 100644 --- a/ingest-linux/src/net.rs +++ b/ingest-linux/src/net.rs @@ -20,7 +20,6 @@ pub fn local_hostname() -> String { let hostname = CStr::from_ptr(&buf[0] as *const _ as _); hostname.to_str().unwrap() }; - log::info!("---------------------- found hostname {hostname:?}"); hostname.into() } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 2ae1553..d280a9c 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1,5 +1,8 @@ use super::proto; +use super::proto::CaEventValue; +use super::proto::ReadNotify; use super::ExtraInsertsConf; +use crate::ca::proto::EventCancel; use crate::senderpolling::SenderPolling; use crate::throttletrace::ThrottleTrace; use async_channel::Receiver; @@ -16,8 +19,6 @@ use netpod::timeunits::*; use netpod::ScalarType; use netpod::Shape; use netpod::TsNano; -use netpod::TS_MSP_GRID_SPACING; -use netpod::TS_MSP_GRID_UNIT; use proto::CaItem; use proto::CaMsg; use proto::CaMsgTy; @@ -32,7 +33,6 @@ use scywriiq::ChannelStatusClosedReason; use scywriiq::ChannelStatusItem; use scywriiq::ConnectionStatus; use scywriiq::ConnectionStatusItem; -use scywriiq::InsertItem; use scywriiq::IvlItem; use scywriiq::MuteItem; use scywriiq::QueryItem; @@ -51,6 +51,7 @@ use stats::CaProtoStats; use stats::IntervalEma; use std::collections::BTreeMap; use std::collections::HashMap; +use std::collections::HashSet; use std::collections::VecDeque; use std::net::SocketAddrV4; use std::ops::ControlFlow; @@ -191,6 +192,9 @@ impl Sid { } } +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +struct Ioid(pub u32); + #[derive(Clone, Debug)] enum ChannelError { CreateChanFail(ChannelStatusSeriesId), @@ -212,11 +216,13 @@ struct MakingSeriesWriterState { #[derive(Debug, Clone)] struct EnableMonitoringState { tsbeg: Instant, + subid: Subid, } #[derive(Debug, Clone)] struct MonitoringState { tsbeg: Instant, + subid: Subid, } #[derive(Debug, Clone)] @@ -228,10 +234,17 @@ struct StopMonitoringForPollingState { struct PollingState { tsbeg: Instant, poll_ivl: Duration, + tick: PollTickState, +} + +#[derive(Debug, Clone)] +enum PollTickState { + Idle(Instant), + Wait(Instant, Ioid), } #[derive(Debug)] -struct CreatedState22 { +struct WritableState { tsbeg: Instant, channel: CreatedState, writer: SeriesWriter, @@ -251,6 +264,8 @@ struct CreatedState { cssid: ChannelStatusSeriesId, cid: Cid, sid: Sid, + ca_dbr_type: u16, + ca_dbr_count: u16, ts_created: Instant, ts_alive_last: Instant, ts_msp_last: u64, @@ -265,6 +280,9 @@ struct CreatedState { info_store_msp_last: u32, recv_count: u64, recv_bytes: u64, + stwin_ts: u64, + stwin_count: u32, + stwin_bytes: u32, } impl CreatedState { @@ -274,6 +292,8 @@ impl CreatedState { cssid: ChannelStatusSeriesId::new(0), cid: Cid(0), sid: Sid(0), + ca_dbr_type: 0, + ca_dbr_count: 0, ts_created: tsnow, ts_alive_last: tsnow, ts_msp_last: 0, @@ -288,6 +308,9 @@ impl CreatedState { info_store_msp_last: 0, recv_count: 0, recv_bytes: 0, + stwin_ts: 0, + stwin_count: 0, + stwin_bytes: 0, } } } @@ -297,7 +320,7 @@ enum ChannelState { Init(ChannelStatusSeriesId), Creating(CreatingState), MakingSeriesWriter(MakingSeriesWriterState), - Created(CreatedState22), + Writable(WritableState), Error(ChannelError), Ended(ChannelStatusSeriesId), } @@ -308,36 +331,36 @@ impl ChannelState { ChannelState::Init(..) => ChannelConnectedInfo::Disconnected, ChannelState::Creating { .. } => ChannelConnectedInfo::Connecting, ChannelState::MakingSeriesWriter(_) => ChannelConnectedInfo::Connecting, - ChannelState::Created(_) => ChannelConnectedInfo::Connected, + ChannelState::Writable(_) => ChannelConnectedInfo::Connected, ChannelState::Error(_) => ChannelConnectedInfo::Error, ChannelState::Ended(_) => ChannelConnectedInfo::Disconnected, }; let scalar_type = match self { - ChannelState::Created(s) => Some(s.writer.scalar_type().clone()), + ChannelState::Writable(s) => Some(s.writer.scalar_type().clone()), _ => None, }; let shape = match self { - ChannelState::Created(s) => Some(s.writer.shape().clone()), + ChannelState::Writable(s) => Some(s.writer.shape().clone()), _ => None, }; let ts_created = match self { - ChannelState::Created(s) => Some(s.channel.ts_created.clone()), + ChannelState::Writable(s) => Some(s.channel.ts_created.clone()), _ => None, }; let ts_event_last = match self { - ChannelState::Created(s) => Some(s.channel.ts_alive_last), + ChannelState::Writable(s) => Some(s.channel.ts_alive_last), _ => None, }; let recv_count = match self { - ChannelState::Created(s) => Some(s.channel.recv_count), + ChannelState::Writable(s) => Some(s.channel.recv_count), _ => None, }; let recv_bytes = match self { - ChannelState::Created(s) => Some(s.channel.recv_bytes), + ChannelState::Writable(s) => Some(s.channel.recv_bytes), _ => None, }; let item_recv_ivl_ema = match self { - ChannelState::Created(s) => { + ChannelState::Writable(s) => { let ema = s.channel.item_recv_ivl_ema.ema(); if ema.update_count() == 0 { None @@ -348,7 +371,7 @@ impl ChannelState { _ => None, }; let series = match self { - ChannelState::Created(s) => Some(s.writer.sid()), + ChannelState::Writable(s) => Some(s.writer.sid()), _ => None, }; let interest_score = 1. / item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10); @@ -373,7 +396,7 @@ impl ChannelState { ChannelState::Init(cssid) => cssid.clone(), ChannelState::Creating(st) => st.cssid.clone(), ChannelState::MakingSeriesWriter(st) => st.channel.cssid.clone(), - ChannelState::Created(st) => st.channel.cssid.clone(), + ChannelState::Writable(st) => st.channel.cssid.clone(), ChannelState::Error(e) => match e { ChannelError::CreateChanFail(cssid) => cssid.clone(), }, @@ -390,7 +413,7 @@ enum CaConnState { Pin, tokio::time::error::Elapsed>> + Send>>, ), Init, - Listen, + Handshake, PeerReady, Wait(Pin + Send>>), Shutdown, @@ -403,7 +426,7 @@ impl fmt::Debug for CaConnState { Self::Unconnected(since) => fmt.debug_tuple("Unconnected").field(since).finish(), Self::Connecting(since, addr, _) => fmt.debug_tuple("Connecting").field(since).field(addr).finish(), Self::Init => write!(fmt, "Init"), - Self::Listen => write!(fmt, "Listen"), + Self::Handshake => write!(fmt, "Handshake"), Self::PeerReady => write!(fmt, "PeerReady"), Self::Wait(_) => fmt.debug_tuple("Wait").finish(), Self::Shutdown => write!(fmt, "Shutdown"), @@ -420,6 +443,7 @@ fn wait_fut(dt: u64) -> Pin + Send>> { struct CidStore { cnt: u32, rng: Xoshiro128PlusPlus, + reg: HashSet, } impl CidStore { @@ -427,6 +451,7 @@ impl CidStore { Self { cnt: 0, rng: Xoshiro128PlusPlus::seed_from_u64(seed as _), + reg: HashSet::new(), } } @@ -440,13 +465,15 @@ impl CidStore { } fn next(&mut self) -> Cid { + if true { + let cnt = self.cnt; + self.cnt += 1; + return Cid(cnt); + } let c = self.cnt << 8; self.cnt += 1; let r = self.rng.next_u32(); - let r = r ^ (r >> 8); - let r = r ^ (r >> 8); - let r = r ^ (r >> 8); - Cid(c | r) + Cid(c | ((r ^ (r >> 8) ^ (r >> 16) ^ (r >> 24)) & 0xff)) } } @@ -473,6 +500,11 @@ impl SubidStore { } fn next(&mut self) -> Subid { + if true { + let cnt = self.cnt; + self.cnt += 1; + return Subid(cnt); + } let c = self.cnt << 8; self.cnt += 1; let r = self.rng.next_u32(); @@ -493,7 +525,7 @@ pub type CmdResTx = Sender>; #[derive(Debug)] pub enum ConnCommandKind { ChannelAdd(String, ChannelStatusSeriesId), - ChannelRemove(String), + ChannelClose(String), Shutdown, } @@ -511,10 +543,10 @@ impl ConnCommand { } } - pub fn channel_remove(name: String) -> Self { + pub fn channel_close(name: String) -> Self { Self { id: Self::make_id(), - kind: ConnCommandKind::ChannelRemove(name), + kind: ConnCommandKind::ChannelClose(name), } } @@ -613,6 +645,7 @@ pub struct CaConn { // btree because require order: cid_by_name: BTreeMap, cid_by_subid: HashMap, + cid_by_sid: HashMap, name_by_cid: HashMap, channel_status_emit_last: Instant, tick_last_writer: Instant, @@ -643,6 +676,8 @@ pub struct CaConn { writer_rx: Pin)>>>, tmp_ts_poll: SystemTime, poll_tsnow: Instant, + ioid: u32, + read_ioids: HashMap, } impl Drop for CaConn { @@ -680,6 +715,7 @@ impl CaConn { channels: HashMap::new(), cid_by_name: BTreeMap::new(), cid_by_subid: HashMap::new(), + cid_by_sid: HashMap::new(), name_by_cid: HashMap::new(), channel_status_emit_last: tsnow, tick_last_writer: tsnow, @@ -709,6 +745,8 @@ impl CaConn { writer_rx: Box::pin(writer_rx), tmp_ts_poll: SystemTime::now(), poll_tsnow: tsnow, + ioid: 100, + read_ioids: HashMap::new(), } } @@ -835,8 +873,8 @@ impl CaConn { } fn cmd_channel_state(&self, name: String) { - let res = match self.cid_by_name.get(&name) { - Some(cid) => match self.channels.get(cid) { + let res = match self.cid_by_name(&name) { + Some(cid) => match self.channels.get(&cid) { Some(state) => Some(state.to_info(state.cssid(), self.remote_addr_dbg.clone())), None => None, }, @@ -869,8 +907,8 @@ impl CaConn { self.channel_add(name, cssid); } - fn cmd_channel_remove(&mut self, name: String) { - self.channel_remove(name); + fn cmd_channel_close(&mut self, name: String) { + self.channel_close(name); // TODO return the result //self.stats.caconn_command_can_not_reply.inc(); } @@ -887,7 +925,6 @@ impl CaConn { fn cmd_save_conn_info(&mut self) { let res = self.emit_channel_info_insert_items(); - let res = res.is_ok(); // TODO return the result } @@ -906,8 +943,8 @@ impl CaConn { self.cmd_channel_add(name, cssid); Ok(Ready(Some(()))) } - ConnCommandKind::ChannelRemove(name) => { - self.cmd_channel_remove(name); + ConnCommandKind::ChannelClose(name) => { + self.cmd_channel_close(name); Ok(Ready(Some(()))) } ConnCommandKind::Shutdown => { @@ -933,7 +970,7 @@ impl CaConn { let rx = self.writer_rx.as_mut(); match rx.poll_next(cx) { Ready(Some(res)) => { - debug!("handle_writer_establish_result recv {}", self.remote_addr_dbg); + trace!("handle_writer_establish_result recv {}", self.remote_addr_dbg); let jobid = res.0; // by convention: let cid = Cid(jobid.0 as _); @@ -955,7 +992,7 @@ impl CaConn { } fn handle_writer_establish_inner(&mut self, cid: Cid, writer: SeriesWriter) -> Result<(), Error> { - debug!("handle_writer_establish_inner {cid:?}"); + trace!("handle_writer_establish_inner {cid:?}"); // At this point we have created the channel and created a writer for that type and sid. // We do not yet monitor. // TODO main objectives now: @@ -973,37 +1010,40 @@ impl CaConn { }); self.insert_item_queue.push_back(item); } - { - let data_type = writer.scalar_type().to_ca_id()?; - if data_type > 6 { - error!( - "data type of series unexpected {} {:?}", - data_type, - writer.scalar_type() - ); - } + let subid = { let subid = self.subid_store.next(); self.cid_by_subid.insert(subid, cid); - // TODO convert first to CaDbrType, set to `Time`, then convert to ix: - let data_type_asked = data_type + 14; - debug!("send out EventAdd for {cid:?}"); + trace!( + "new {:?} for {:?} chst {:?} {:?}", + subid, + cid, + st2.channel.cid, + st2.channel.sid + ); + subid + }; + { + trace!("send out EventAdd for {cid:?}"); let ty = CaMsgTy::EventAdd(EventAdd { sid: st2.channel.sid.to_u32(), - data_type: data_type_asked, - data_count: writer.shape().to_ca_count()? as u16, + data_type: st2.channel.ca_dbr_type, + data_count: st2.channel.ca_dbr_count, subid: subid.to_u32(), }); let msg = CaMsg::from_ty_ts(ty, self.poll_tsnow); let proto = self.proto.as_mut().unwrap(); proto.push_out(msg); } - let created_state = CreatedState22 { + let created_state = WritableState { tsbeg: self.poll_tsnow, channel: std::mem::replace(&mut st2.channel, CreatedState::dummy()), writer, - reading: ReadingState::EnableMonitoring(EnableMonitoringState { tsbeg: self.poll_tsnow }), + reading: ReadingState::EnableMonitoring(EnableMonitoringState { + tsbeg: self.poll_tsnow, + subid, + }), }; - *chst = ChannelState::Created(created_state); + *chst = ChannelState::Writable(created_state); Ok(()) } else { warn!("TODO handle_series_lookup_result channel in bad state, reset"); @@ -1020,17 +1060,13 @@ impl CaConn { } pub fn channel_add(&mut self, channel: String, cssid: ChannelStatusSeriesId) { - if self.cid_by_name.contains_key(&channel) { + if self.cid_by_name(&channel).is_some() { + // TODO count for metrics return; } - let cid = Self::cid_by_name_expl( - &channel, - &mut self.cid_by_name, - &mut self.name_by_cid, - &mut self.cid_store, - ); + let cid = self.cid_by_name_or_insert(&channel); if self.channels.contains_key(&cid) { - error!("logic error"); + error!("logic error channel already exists {channel}"); } else { self.channels.insert(cid, ChannelState::Init(cssid)); // TODO do not count, use separate queue for those channels. @@ -1038,59 +1074,35 @@ impl CaConn { } } - pub fn channel_remove(&mut self, channel: String) { - Self::channel_remove_expl( - channel, - &mut self.channels, - &mut self.cid_by_name, - &mut self.name_by_cid, - &mut self.cid_store, - ) + pub fn channel_close(&mut self, name: String) { + error!("TODO actually cause the channel to get closed and removed {}", name); + } + + pub fn channel_remove(&mut self, name: String) { + if let Some(cid) = self.cid_by_name(&name) { + self.channel_remove_by_cid(cid); + } else { + warn!("channel_remove does not exist {name}"); + } } fn channel_remove_by_cid(&mut self, cid: Cid) { - self.channels.remove(&cid); + self.cid_by_name.retain(|_, v| *v != cid); self.name_by_cid.remove(&cid); - self.cid_by_name.retain(|_, v| v == &cid); + self.channels.remove(&cid); } - fn channel_remove_expl( - name: String, - channels: &mut HashMap, - cid_by_name: &mut BTreeMap, - name_by_cid: &mut HashMap, - cid_store: &mut CidStore, - ) { - let cid = Self::cid_by_name_expl(&name, cid_by_name, name_by_cid, cid_store); - if channels.contains_key(&cid) { - warn!("TODO actually cause the channel to get closed and removed {}", name); - } - { - let a: Vec<_> = cid_by_name - .iter() - .filter(|x| x.1 == &cid) - .map(|x| x.0.clone()) - .collect(); - for x in a { - cid_by_name.remove(&x); - } - } - channels.remove(&cid); - name_by_cid.remove(&cid); + fn cid_by_name(&self, name: &str) -> Option { + self.cid_by_name.get(name).map(Clone::clone) } - fn cid_by_name_expl( - name: &str, - cid_by_name: &mut BTreeMap, - name_by_cid: &mut HashMap, - cid_store: &mut CidStore, - ) -> Cid { - if let Some(cid) = cid_by_name.get(name) { + fn cid_by_name_or_insert(&mut self, name: &str) -> Cid { + if let Some(cid) = self.cid_by_name.get(name) { *cid } else { - let cid = cid_store.next(); - cid_by_name.insert(name.into(), cid); - name_by_cid.insert(cid, name.into()); + let cid = self.cid_store.next(); + self.cid_by_name.insert(name.into(), cid); + self.name_by_cid.insert(cid, name.into()); cid } } @@ -1123,7 +1135,7 @@ impl CaConn { ChannelState::MakingSeriesWriter(st) => { *chst = ChannelState::Ended(st.channel.cssid.clone()); } - ChannelState::Created(st2) => { + ChannelState::Writable(st2) => { let cssid = st2.channel.cssid.clone(); // TODO should call the proper channel-close handler which in turn emits the status item. // Make sure I record the reason for the "Close": user command, IOC error, etc.. @@ -1178,7 +1190,7 @@ impl CaConn { let mut not_alive_count = 0; for (_, st) in &self.channels { match st { - ChannelState::Created(st2) => { + ChannelState::Writable(st2) => { if tsnow.duration_since(st2.channel.ts_alive_last) >= Duration::from_millis(10000) { warn!("TODO assume channel not alive because nothing received, but should do CAGET"); not_alive_count += 1; @@ -1208,7 +1220,7 @@ impl CaConn { ChannelState::MakingSeriesWriter(..) => { // TODO ? } - ChannelState::Created(st) => { + ChannelState::Writable(st) => { let crst = &mut st.channel; // TODO if we don't wave a series id yet, dont' save? write-ampl. let msp = info_store_msp_from_time(timenow.clone()); @@ -1233,6 +1245,49 @@ impl CaConn { Ok(()) } + fn transition_to_polling(&mut self, subid: Subid, tsnow: Instant) -> Result<(), Error> { + let cid = if let Some(x) = self.cid_by_subid.get(&subid) { + *x + } else { + let e = Error::with_msg_no_trace("unknown {subid:?}"); + error!("{e}"); + return Err(e); + }; + let ch_s = if let Some(x) = self.channels.get_mut(&cid) { + x + } else { + // TODO return better as error and let caller decide (with more structured errors) + // TODO + // When removing a channel, keep it in "closed" btree for some time because messages can + // still arrive from all buffers. + // If we don't have it in the "closed" btree, then close connection to the IOC and count + // as logic error. + // Close connection to the IOC. Cout as logic error. + let e = Error::with_msg_no_trace(format!( + "TODO handle_event_add_res can not find channel for {cid:?} {subid:?}" + )); + error!("{e}"); + return Err(e); + }; + if let ChannelState::Writable(st2) = ch_s { + // TODO emit + // TODO for cancel, must supply again the same DBR type and count as in the EventAdd. + let ty = CaMsgTy::EventCancel(EventCancel { + data_type: st2.channel.ca_dbr_type, + data_count: st2.channel.ca_dbr_count, + sid: st2.channel.sid.to_u32(), + subid: subid.to_u32(), + }); + let msg = CaMsg::from_ty_ts(ty, self.poll_tsnow); + let proto = self.proto.as_mut().unwrap(); + proto.push_out(msg); + st2.reading = ReadingState::StopMonitoringForPolling(StopMonitoringForPollingState { tsbeg: tsnow }); + } else { + warn!("can not transition to polling, channel not established"); + } + Ok(()) + } + fn handle_event_add_res(&mut self, ev: proto::EventAddRes, tsnow: Instant) -> Result<(), Error> { let subid = Subid(ev.subid); // TODO handle subid-not-found which can also be peer error: @@ -1259,33 +1314,120 @@ impl CaConn { }; // debug!("handle_event_add_res {ev:?}"); match ch_s { - ChannelState::Created(st) => match &mut st.reading { - ReadingState::EnableMonitoring(st2) => { - let dt = st2.tsbeg.elapsed().as_secs_f32(); - debug!("change to Monitoring after dt {dt:.0} ms"); - st.reading = ReadingState::Monitoring(MonitoringState { tsbeg: tsnow }); - Self::event_add_ingest( - ev, - &mut st.channel, - &mut st.writer, - &mut self.insert_item_queue, - tsnow, - self.tmp_ts_poll, - self.stats.as_ref(), - )?; + ChannelState::Writable(st) => { + let stnow = self.tmp_ts_poll; + let crst = &mut st.channel; + let stwin_ts = stnow.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() / 10; + if crst.stwin_ts != stwin_ts { + crst.stwin_ts = stwin_ts; + crst.stwin_count = 0; } - ReadingState::Monitoring(_st2) => { - let crst = &mut st.channel; - let writer = &mut st.writer; - let iiq = &mut self.insert_item_queue; - let stats = self.stats.as_ref(); - Self::event_add_ingest(ev, crst, writer, iiq, tsnow, self.tmp_ts_poll, stats)?; + { + crst.stwin_count += 1; + crst.stwin_bytes += ev.payload_len; + if crst.stwin_count > 5 || crst.stwin_bytes > 1024 * 1024 * 1 { + let subid = match &mut st.reading { + ReadingState::EnableMonitoring(x) => Some(x.subid.clone()), + ReadingState::Monitoring(x) => Some(x.subid.clone()), + ReadingState::StopMonitoringForPolling(_) => None, + ReadingState::Polling(_) => None, + }; + if let Some(subid) = subid { + self.transition_to_polling(subid, tsnow)?; + } + return Ok(()); + } } + match &mut st.reading { + ReadingState::EnableMonitoring(st2) => { + let dt = st2.tsbeg.elapsed().as_secs_f32(); + trace!("change to Monitoring after dt {dt:.0} ms"); + st.reading = ReadingState::Monitoring(MonitoringState { + tsbeg: tsnow, + subid: st2.subid, + }); + let crst = &mut st.channel; + let writer = &mut st.writer; + let iiq = &mut self.insert_item_queue; + let stats = self.stats.as_ref(); + Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iiq, tsnow, stnow, stats)?; + } + ReadingState::Monitoring(_st2) => { + let crst = &mut st.channel; + let writer = &mut st.writer; + let iiq = &mut self.insert_item_queue; + let stats = self.stats.as_ref(); + Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iiq, tsnow, stnow, stats)?; + } + ReadingState::StopMonitoringForPolling(st2) => { + // TODO count for metrics + if st2.tsbeg + Duration::from_millis(2000) < tsnow { + error!("TODO handle_event_add_res handle StopMonitoringForPolling"); + std::process::exit(1); + } + } + ReadingState::Polling(st2) => { + // TODO count for metrics + if st2.tsbeg + Duration::from_millis(2000) < tsnow { + error!("TODO handle_event_add_res handle Polling"); + std::process::exit(1); + } + } + } + } + _ => { + // TODO count instead of print + error!("unexpected state: EventAddRes while having {ch_s:?}"); + } + } + Ok(()) + } + + fn handle_event_add_res_empty(&mut self, ev: proto::EventAddResEmpty, tsnow: Instant) -> Result<(), Error> { + let subid = Subid(ev.subid); + // TODO handle subid-not-found which can also be peer error: + let cid = if let Some(x) = self.cid_by_subid.get(&subid) { + *x + } else { + warn!("can not find cid for subid {subid:?}"); + // return Err(Error::with_msg_no_trace()); + return Ok(()); + }; + let ch_s = if let Some(x) = self.channels.get_mut(&cid) { + x + } else { + // TODO return better as error and let caller decide (with more structured errors) + warn!("TODO handle_event_add_res can not find channel for {cid:?} {subid:?}"); + // TODO + // When removing a channel, keep it in "closed" btree for some time because messages can + // still arrive from all buffers. + // If we don't have it in the "closed" btree, then close connection to the IOC and count + // as logic error. + // Close connection to the IOC. Cout as logic error. + // return Err(Error::with_msg_no_trace()); + return Ok(()); + }; + // debug!("handle_event_add_res {ev:?}"); + match ch_s { + ChannelState::Writable(st) => match &mut st.reading { ReadingState::StopMonitoringForPolling(..) => { - error!("TODO handle_event_add_res handle StopMonitoringForPolling"); + st.reading = ReadingState::Polling(PollingState { + tsbeg: tsnow, + poll_ivl: Duration::from_millis(2000), + tick: PollTickState::Idle(tsnow), + }); + } + ReadingState::EnableMonitoring(..) => { + let name = self.name_by_cid(cid); + warn!("received event-cancel but channel {name:?} in wrong state"); + } + ReadingState::Monitoring(..) => { + let name = self.name_by_cid(cid); + warn!("received event-cancel but channel {name:?} in wrong state"); } ReadingState::Polling(..) => { - error!("TODO handle_event_add_res handle Polling"); + let name = self.name_by_cid(cid); + warn!("received event-cancel but channel {name:?} in wrong state"); } }, _ => { @@ -1296,8 +1438,87 @@ impl CaConn { Ok(()) } + fn handle_read_notify_res(&mut self, ev: proto::ReadNotifyRes, tsnow: Instant) -> Result<(), Error> { + // trace!("handle_read_notify_res {ev:?}"); + // TODO can not rely on the SID in the response. + let sid_ev = Sid(ev.sid); + let ioid = Ioid(ev.ioid); + if let Some(cid) = self.read_ioids.get(&ioid) { + let ch_s = if let Some(x) = self.channels.get_mut(&cid) { + x + } else { + warn!("handle_read_notify_res can not find channel for {cid:?} {ioid:?}"); + return Ok(()); + }; + match ch_s { + ChannelState::Writable(st) => { + if st.channel.sid != sid_ev { + // TODO count for metrics + // warn!("mismatch in ReadNotifyRes {:?} {:?}", st.channel.sid, sid_ev); + } + let stnow = self.tmp_ts_poll; + let crst = &mut st.channel; + let stwin_ts = stnow.duration_since(std::time::UNIX_EPOCH).unwrap().as_secs() / 1; + if crst.stwin_ts != stwin_ts { + crst.stwin_ts = stwin_ts; + crst.stwin_count = 0; + } + { + crst.stwin_count += 1; + crst.stwin_bytes += ev.payload_len; + } + match &mut st.reading { + ReadingState::Polling(st2) => match &mut st2.tick { + PollTickState::Idle(_st3) => { + warn!("received ReadNotifyRes while in Wait state"); + } + PollTickState::Wait(st3, ioid) => { + let dt = tsnow.saturating_duration_since(*st3); + self.stats.caget_lat().ingest((1e3 * dt.as_secs_f32()) as u32); + self.read_ioids.remove(ioid); + // TODO maintain histogram of read-notify latencies + st2.tick = PollTickState::Idle(tsnow); + let crst = &mut st.channel; + let writer = &mut st.writer; + let iiq = &mut self.insert_item_queue; + let stats = self.stats.as_ref(); + Self::event_add_ingest( + ev.payload_len, + ev.value, + crst, + writer, + iiq, + tsnow, + stnow, + stats, + )?; + } + }, + ReadingState::EnableMonitoring(..) => { + error!("TODO handle_read_notify_res handle EnableMonitoring"); + } + ReadingState::Monitoring(..) => { + error!("TODO handle_read_notify_res handle Monitoring"); + } + ReadingState::StopMonitoringForPolling(..) => { + error!("TODO handle_read_notify_res handle StopMonitoringForPolling"); + } + } + } + _ => { + // TODO count instead of print + error!("unexpected state: ReadNotifyRes while having {ch_s:?}"); + } + } + } else { + warn!("unknown {ioid:?}"); + } + Ok(()) + } + fn event_add_ingest( - ev: proto::EventAddRes, + payload_len: u32, + value: CaEventValue, crst: &mut CreatedState, writer: &mut SeriesWriter, iiq: &mut VecDeque, @@ -1308,14 +1529,14 @@ impl CaConn { crst.ts_alive_last = tsnow; crst.item_recv_ivl_ema.tick(tsnow); crst.recv_count += 1; - crst.recv_bytes += ev.payload_len as u64; + crst.recv_bytes += payload_len as u64; let series = writer.sid(); // TODO should attach these counters already to Writable state. let ts_local = { let epoch = stnow.duration_since(std::time::UNIX_EPOCH).unwrap(); epoch.as_secs() * SEC + epoch.subsec_nanos() as u64 }; - let ts = ev.value.ts; + let ts = value.ts; let ts_diff = ts.abs_diff(ts_local); stats.ca_ts_off().ingest((ts_diff / MS) as u32); if tsnow >= crst.insert_next_earliest { @@ -1328,9 +1549,9 @@ impl CaConn { let dt = (ivl_min - ema).max(0.) / em.k(); crst.insert_next_earliest = tsnow + Duration::from_micros((dt * 1e6) as u64); } - Self::check_ev_value_data(&ev.value.data, writer.scalar_type())?; + Self::check_ev_value_data(&value.data, writer.scalar_type())?; { - let val: DataValue = ev.value.data.into(); + let val: DataValue = value.data.into(); writer .write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iiq) .map_err(|e| Error::from_string(e))?; @@ -1405,7 +1626,7 @@ impl CaConn { Pending Ready(no-more-work, something-was-done, error) */ - fn handle_conn_listen(&mut self, cx: &mut Context) -> Poll>> { + fn handle_handshake(&mut self, cx: &mut Context) -> Poll>> { use Poll::*; match self.proto.as_mut().unwrap().poll_next_unpin(cx) { Ready(Some(k)) => match k { @@ -1449,7 +1670,8 @@ impl CaConn { } } - fn check_channels_state_init(&mut self, tsnow: Instant, do_wake_again: &mut bool) -> Result<(), Error> { + fn check_channels_state_init(&mut self, tsnow: Instant, cx: &mut Context) -> Result<(), Error> { + let mut do_wake_again = false; // TODO profile, efficient enough? if self.init_state_count == 0 { return Ok(()); @@ -1473,7 +1695,7 @@ impl CaConn { }), tsnow, ); - *do_wake_again = true; + do_wake_again = true; self.proto.as_mut().unwrap().push_out(msg); // TODO handle not-found error: let ch_s = self.channels.get_mut(&cid).unwrap(); @@ -1487,6 +1709,61 @@ impl CaConn { _ => {} } } + if do_wake_again { + cx.waker().wake_by_ref(); + } + Ok(()) + } + + fn check_channels_state_poll(&mut self, tsnow: Instant, cx: &mut Context) -> Result<(), Error> { + let mut do_wake_again = false; + let channels = &mut self.channels; + for (_k, v) in channels { + match v { + ChannelState::Init(_) => {} + ChannelState::Creating(_) => {} + ChannelState::MakingSeriesWriter(_) => {} + ChannelState::Writable(st2) => match &mut st2.reading { + ReadingState::EnableMonitoring(_) => {} + ReadingState::Monitoring(_) => {} + ReadingState::StopMonitoringForPolling(_) => {} + ReadingState::Polling(st3) => match &mut st3.tick { + PollTickState::Idle(x) => { + if *x + st3.poll_ivl <= tsnow { + let ioid = Ioid(self.ioid); + self.ioid = self.ioid.wrapping_add(1); + self.read_ioids.insert(ioid, st2.channel.cid.clone()); + let msg = CaMsg::from_ty_ts( + CaMsgTy::ReadNotify(ReadNotify { + data_type: st2.channel.ca_dbr_type, + data_count: st2.channel.ca_dbr_count, + sid: st2.channel.sid.to_u32(), + ioid: ioid.0, + }), + tsnow, + ); + do_wake_again = true; + self.proto.as_mut().unwrap().push_out(msg); + st3.tick = PollTickState::Wait(tsnow, ioid); + } + } + PollTickState::Wait(x, ioid) => { + if *x + Duration::from_millis(10000) <= tsnow { + self.read_ioids.remove(ioid); + self.stats.caget_timeout().inc(); + // warn!("channel caget timeout"); + // std::process::exit(1); + } + } + }, + }, + ChannelState::Error(_) => {} + ChannelState::Ended(_) => {} + } + } + if do_wake_again { + cx.waker().wake_by_ref(); + } Ok(()) } @@ -1496,8 +1773,6 @@ impl CaConn { use Poll::*; let mut ts1 = Instant::now(); // TODO unify with Listen state where protocol gets polled as well. - let mut do_wake_again = false; - self.check_channels_state_init(ts1, &mut do_wake_again)?; let ts2 = Instant::now(); self.stats .time_check_channels_state_init @@ -1523,10 +1798,10 @@ impl CaConn { } CaMsgTy::CreateChanRes(k) => { self.handle_create_chan_res(k, tsnow)?; - do_wake_again = true; + cx.waker().wake_by_ref(); } CaMsgTy::EventAddRes(ev) => { - trace!("got EventAddRes: {ev:?}"); + trace!("got EventAddRes {:?} cnt {}", camsg.ts, ev.data_count); self.stats.event_add_res_recv.inc(); let res = Self::handle_event_add_res(self, ev, tsnow); let ts2 = Instant::now(); @@ -1535,6 +1810,16 @@ impl CaConn { .add((ts2.duration_since(tsnow) * MS as u32).as_secs()); res?; } + CaMsgTy::EventAddResEmpty(ev) => { + trace!("got EventAddResEmpty {:?}", camsg.ts); + let res = Self::handle_event_add_res_empty(self, ev, tsnow); + let ts2 = Instant::now(); + self.stats + .time_handle_event_add_res + .add((ts2.duration_since(tsnow) * MS as u32).as_secs()); + res?; + } + CaMsgTy::ReadNotifyRes(ev) => Self::handle_read_notify_res(self, ev, tsnow)?, CaMsgTy::Echo => { // let addr = &self.remote_addr_dbg; if let Some(started) = self.ioc_ping_start { @@ -1612,10 +1897,6 @@ impl CaConn { } Pending => Pending, }; - if do_wake_again { - // TODO remove the need for this: - cx.waker().wake_by_ref(); - } res.map_err(|e| Error::from(e.to_string())) } @@ -1624,6 +1905,13 @@ impl CaConn { let sid = Sid(k.sid); let channels = &mut self.channels; let name_by_cid = &self.name_by_cid; + // TODO handle cid-not-found which can also indicate peer error. + let name = if let Some(x) = name_by_cid.get(&cid) { + x.to_string() + } else { + return Err(Error::with_msg_no_trace(format!("no name for {cid:?}"))); + }; + trace!("handle_create_chan_res {k:?} {name:?}"); // TODO handle not-found error: let ch_s = channels.get_mut(&cid).unwrap(); let cssid = match ch_s { @@ -1635,22 +1923,21 @@ impl CaConn { return Err(e); } }; - // TODO handle cid-not-found which can also indicate peer error. - let name = if let Some(x) = name_by_cid.get(&cid) { - x.to_string() - } else { - return Err(Error::with_msg_no_trace(format!("no name for {cid:?}"))); - }; - trace3!("CreateChanRes {name:?}"); + self.cid_by_sid.insert(sid, cid); if k.data_type > 6 { error!("CreateChanRes with unexpected data_type {}", k.data_type); } + // Ask for DBR_TIME_... + let ca_dbr_type = k.data_type + 14; let scalar_type = ScalarType::from_ca_id(k.data_type)?; let shape = Shape::from_ca_count(k.data_count)?; let channel = CreatedState { cssid, cid, sid, + ca_dbr_type, + // TODO for extended epics messages, can be u32! + ca_dbr_count: k.data_count as u16, ts_created: tsnow, ts_alive_last: tsnow, ts_msp_last: 0, @@ -1665,12 +1952,11 @@ impl CaConn { info_store_msp_last: info_store_msp_from_time(self.tmp_ts_poll), recv_count: 0, recv_bytes: 0, + stwin_ts: 0, + stwin_count: 0, + stwin_bytes: 0, }; *ch_s = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { tsbeg: tsnow, channel }); - let name = self - .name_by_cid(cid) - .ok_or_else(|| Error::from_string(format!("no name for cid {cid:?}")))?; - // info!("MonitoringState::AddingEvent cssid {cssid:?} {name:?} {cid:?}"); let job = EstablishWorkerJob::new( JobId(cid.0 as _), self.backend.clone(), @@ -1786,13 +2072,12 @@ impl CaConn { proto.push_out(msg); let msg = CaMsg::from_ty_ts(CaMsgTy::HostName(hostname), tsnow); proto.push_out(msg); - self.state = CaConnState::Listen; + self.state = CaConnState::Handshake; Ok(Ready(Some(()))) } - CaConnState::Listen => { - trace4!("Listen"); + CaConnState::Handshake => { match { - let res = self.handle_conn_listen(cx); + let res = self.handle_handshake(cx); res } { Ready(Some(Ok(()))) => Ok(Ready(Some(()))), @@ -1870,30 +2155,28 @@ impl CaConn { fn poll_own_ticker(mut self: Pin<&mut Self>, cx: &mut Context) -> Result, Error> { use Poll::*; match self.ticker.poll_unpin(cx) { - Ready(()) => { - match self.as_mut().handle_own_ticker(cx) { - Ok(_) => { - if !self.is_shutdown() { - self.ticker = Self::new_self_ticker(); - let _ = self.ticker.poll_unpin(cx); - // cx.waker().wake_by_ref(); - } - Ok(Pending) - } - Err(e) => { - error!("handle_own_ticker {e}"); - self.trigger_shutdown(ChannelStatusClosedReason::InternalError); - Err(e) - } + Ready(()) => match self.as_mut().handle_own_ticker(cx) { + Ok(_) => Ok(Pending), + Err(e) => { + error!("handle_own_ticker {e}"); + self.trigger_shutdown(ChannelStatusClosedReason::InternalError); + Err(e) } - } + }, Pending => Ok(Pending), } } - fn handle_own_ticker(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> { + fn handle_own_ticker(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> { // debug!("tick CaConn {}", self.remote_addr_dbg); let tsnow = Instant::now(); + if !self.is_shutdown() { + self.ticker = Self::new_self_ticker(); + let _ = self.ticker.poll_unpin(cx); + // cx.waker().wake_by_ref(); + } + self.check_channels_state_init(tsnow, cx)?; + self.check_channels_state_poll(tsnow, cx)?; // TODO add some random variation if self.channel_status_emit_last + Duration::from_millis(3000) <= tsnow { self.channel_status_emit_last = tsnow; @@ -1907,11 +2190,11 @@ impl CaConn { CaConnState::Unconnected(_) => {} CaConnState::Connecting(since, _, _) => { if *since + CONNECTING_TIMEOUT < tsnow { - debug!("CONNECTION TIMEOUT"); + debug!("CONNECTING_TIMEOUT"); } } CaConnState::Init => {} - CaConnState::Listen => {} + CaConnState::Handshake => {} CaConnState::PeerReady => {} CaConnState::Wait(_) => {} CaConnState::Shutdown => {} @@ -1927,6 +2210,7 @@ impl CaConn { let chinfo = ch.to_info(ch.cssid(), self.remote_addr_dbg); channel_statuses.insert(ch.cssid(), chinfo); } + trace!("emit_channel_status {}", channel_statuses.len()); let val = ChannelStatusPartial { channel_statuses }; let item = CaConnEvent { ts: Instant::now(), @@ -1946,7 +2230,7 @@ impl CaConn { fn tick_writers(&mut self) -> Result<(), Error> { for (k, st) in &mut self.channels { - if let ChannelState::Created(st2) = st { + if let ChannelState::Writable(st2) = st { st2.writer .tick(&mut self.insert_item_queue) .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; @@ -1969,17 +2253,19 @@ impl CaConn { self.insert_item_queue.is_empty() && self.storage_insert_sender.is_idle() } - fn attempt_flush_queue( + fn attempt_flush_queue( qu: &mut VecDeque, sp: &mut Pin>>, qu_to_si: FB, loop_max: u32, cx: &mut Context, id: &str, + stats: FS, ) -> Result>, Error> where Q: Unpin, FB: Fn(&mut VecDeque) -> Option, + FS: Fn(&Q), { use Poll::*; let mut have_progress = false; @@ -1994,6 +2280,7 @@ impl CaConn { } if sp.is_idle() { if let Some(item) = qu_to_si(qu) { + stats(&item); sp.as_mut().send_pin(item); } else { break; @@ -2027,11 +2314,11 @@ impl CaConn { // $have is tuple (have_progress, have_pending)) macro_rules! flush_queue { - ($self:expr, $qu:ident, $sp:ident, $batcher:expr, $loop_max:expr, $have:expr, $id:expr, $cx:expr) => { + ($self:expr, $qu:ident, $sp:ident, $batcher:expr, $loop_max:expr, $have:expr, $id:expr, $cx:expr, $stats:expr) => { let obj = $self.as_mut().get_mut(); let qu = &mut obj.$qu; let sp = &mut obj.$sp; - match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id) { + match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats) { Ok(Ready(Some(()))) => { *$have.0 |= true; } @@ -2090,6 +2377,8 @@ impl Stream for CaConn { break Ready(Some(Ok(item))); } + let lts2 = Instant::now(); + match self.as_mut().poll_own_ticker(cx) { Ok(Ready(())) => { have_progress = true; @@ -2114,21 +2403,25 @@ impl Stream for CaConn { } } - let lts2; let lts3; if !self.is_shutdown() { + let stats2 = self.stats.clone(); + let stats_fn = move |item: &VecDeque| { + stats2.iiq_batch_len().ingest(item.len() as u32); + }; flush_queue!( self, insert_item_queue, storage_insert_sender, - send_batched::<48, _>, + send_batched::<256, _>, 32, (&mut have_progress, &mut have_pending), "strg", - cx + cx, + stats_fn ); - lts2 = Instant::now(); + lts3 = Instant::now(); flush_queue!( self, @@ -2138,14 +2431,15 @@ impl Stream for CaConn { 32, (&mut have_progress, &mut have_pending), "wrest", - cx + cx, + |_| {} ); - lts3 = Instant::now(); } else { - lts2 = Instant::now(); lts3 = Instant::now(); } + let lts4 = Instant::now(); + match self.as_mut().handle_writer_establish_result(cx) { Ok(Ready(Some(()))) => { have_progress = true; @@ -2157,6 +2451,8 @@ impl Stream for CaConn { Err(e) => break Ready(Some(Err(e))), } + let lts5 = Instant::now(); + match self.as_mut().handle_conn_command(cx) { Ok(Ready(Some(()))) => { have_progress = true; @@ -2168,7 +2464,7 @@ impl Stream for CaConn { Err(e) => break Ready(Some(Err(e))), } - let lts4 = Instant::now(); + let lts6 = Instant::now(); match self.loop_inner(cx) { Ok(Ready(Some(()))) => { @@ -2185,24 +2481,33 @@ impl Stream for CaConn { } } - let lts5 = Instant::now(); + let lts7 = Instant::now(); - let max = Duration::from_millis(14); + let max = Duration::from_millis(200); let dt = lts2.saturating_duration_since(lts1); if dt > max { - debug!("LONG OPERATION 2 {dt:?}"); + debug!("LONG OPERATION 2 {:.0} ms", 1e3 * dt.as_secs_f32()); } let dt = lts3.saturating_duration_since(lts2); + self.stats.poll_op3_dt().ingest((1e3 * dt.as_secs_f32()) as u32); if dt > max { - debug!("LONG OPERATION 3 {dt:?}"); + debug!("LONG OPERATION 3 {:.0} ms", 1e3 * dt.as_secs_f32()); } let dt = lts4.saturating_duration_since(lts3); if dt > max { - debug!("LONG OPERATION 4 {dt:?}"); + debug!("LONG OPERATION 4 {:.0} ms", 1e3 * dt.as_secs_f32()); } let dt = lts5.saturating_duration_since(lts4); if dt > max { - debug!("LONG OPERATION 5 {dt:?}"); + debug!("LONG OPERATION 5 {:.0} ms", 1e3 * dt.as_secs_f32()); + } + let dt = lts6.saturating_duration_since(lts5); + if dt > max { + debug!("LONG OPERATION 6 {:.0} ms", 1e3 * dt.as_secs_f32()); + } + let dt = lts7.saturating_duration_since(lts6); + if dt > max { + debug!("LONG OPERATION 7 {:.0} ms", 1e3 * dt.as_secs_f32()); } break if self.is_shutdown() { @@ -2251,22 +2556,16 @@ impl Stream for CaConn { } }; }; - if reloops >= 512 { - self.stats.poll_reloops_512().inc(); - } else if reloops >= 64 { - self.stats.poll_reloops_64().inc(); - } else if reloops >= 8 { - self.stats.poll_reloops_8().inc(); - } let poll_ts2 = Instant::now(); let dt = poll_ts2.saturating_duration_since(poll_ts1); - if dt > Duration::from_millis(80) { - warn!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3) - } else if dt > Duration::from_millis(40) { - info!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3) - } else if dt > Duration::from_millis(14) { - debug!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3) - } + self.stats.poll_all_dt().ingest((1e3 * dt.as_secs_f32()) as u32); + self.stats.read_ioids_len().set(self.read_ioids.len() as u64); + let n = match &self.proto { + Some(x) => x.proto_out_len() as u64, + None => 0, + }; + self.stats.proto_out_len().set(n); + self.stats.poll_reloops().ingest(reloops); ret } } diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 09abafd..d6e4ca6 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -85,7 +85,7 @@ const MAYBE_WRONG_ADDRESS_STAY: Duration = Duration::from_millis(4000); const SEARCH_PENDING_TIMEOUT: Duration = Duration::from_millis(30000); const CHANNEL_HEALTH_TIMEOUT: Duration = Duration::from_millis(30000); const CHANNEL_UNASSIGNED_TIMEOUT: Duration = Duration::from_millis(0); -const CHANNEL_MAX_WITHOUT_HEALTH_UPDATE: usize = 10000; +const CHANNEL_MAX_WITHOUT_HEALTH_UPDATE: usize = 3000000; #[allow(unused)] macro_rules! trace2 { @@ -1398,7 +1398,7 @@ impl CaConnSet { } for (addr, ch) in cmd_remove_channel { if let Some(g) = self.ca_conn_ress.get_mut(&addr) { - let cmd = ConnCommand::channel_remove(ch.id().into()); + let cmd = ConnCommand::channel_close(ch.id().into()); g.cmd_queue.push_back(cmd); } let cmd = ChannelRemove { name: ch.id().into() }; @@ -1756,13 +1756,7 @@ impl Stream for CaConnSet { trace4!("CaConnSet poll done"); let poll_ts2 = Instant::now(); let dt = poll_ts2.saturating_duration_since(poll_ts1); - if dt > Duration::from_millis(80) { - warn!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3) - } else if dt > Duration::from_millis(40) { - info!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3) - } else if dt > Duration::from_millis(5) { - debug!("long poll duration {:.0} ms", dt.as_secs_f32() * 1e3) - } + self.stats.poll_all_dt().ingest((1e3 * dt.as_secs_f32()) as u32); ret } } diff --git a/netfetch/src/ca/findioc.rs b/netfetch/src/ca/findioc.rs index a5f5e91..d9fec45 100644 --- a/netfetch/src/ca/findioc.rs +++ b/netfetch/src/ca/findioc.rs @@ -369,6 +369,7 @@ impl FindIocStream { } else { stats.ca_udp_first_msg_not_version().inc(); } + // trace2!("recv {:?} {:?}", src_addr, msgs); let mut res = Vec::new(); if good { for msg in &msgs[1..] { @@ -616,7 +617,7 @@ impl Stream for FindIocStream { match batch.tgts.pop_front() { Some(tgtix) => { Self::serialize_batch(buf1, batch); - debug!("serialized for search {:?}", batch.channels); + trace!("serialized for search {:?}", batch.channels); match self.tgts.get(tgtix) { Some(tgt) => { let tgt = tgt.clone(); diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index accba2e..1f0f773 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -111,6 +111,21 @@ pub struct EventAdd { pub subid: u32, } +#[derive(Debug)] +pub struct EventCancel { + pub data_type: u16, + pub data_count: u16, + pub sid: u32, + pub subid: u32, +} + +#[derive(Debug)] +pub struct EventCancelRes { + pub data_type: u16, + pub sid: u32, + pub subid: u32, +} + // TODO Clone is only used for testing purposes and should get removed later. #[derive(Debug, Clone)] pub struct EventAddRes { @@ -118,8 +133,15 @@ pub struct EventAddRes { pub data_count: u32, pub status: u32, pub subid: u32, - pub value: CaEventValue, pub payload_len: u32, + pub value: CaEventValue, +} + +#[derive(Debug, Clone)] +pub struct EventAddResEmpty { + pub data_type: u16, + pub sid: u32, + pub subid: u32, } #[derive(Debug)] @@ -136,6 +158,8 @@ pub struct ReadNotifyRes { pub data_count: u32, pub sid: u32, pub ioid: u32, + pub payload_len: u32, + pub value: CaEventValue, } #[derive(Debug)] @@ -283,6 +307,9 @@ pub enum CaMsgTy { AccessRightsRes(AccessRightsRes), EventAdd(EventAdd), EventAddRes(EventAddRes), + EventAddResEmpty(EventAddResEmpty), + EventCancel(EventCancel), + EventCancelRes(EventCancelRes), ReadNotify(ReadNotify), ReadNotifyRes(ReadNotifyRes), Echo, @@ -306,6 +333,11 @@ impl CaMsgTy { AccessRightsRes(_) => 0x16, EventAdd(_) => 0x01, EventAddRes(_) => 0x01, + // sic: the response to event-cancel is an event-add: + EventAddResEmpty(_) => 0x01, + EventCancel(_) => 0x02, + // sic: the response to event-cancel is an event-add: + EventCancelRes(_) => 0x01, ReadNotify(_) => 0x0f, ReadNotifyRes(_) => 0x0f, Echo => 0x17, @@ -318,7 +350,6 @@ impl CaMsgTy { fn payload_len(&self) -> usize { use CaMsgTy::*; - trace!("payload_len for {self:?}"); match self { Version => 0, VersionRes(_) => 0, @@ -337,6 +368,9 @@ impl CaMsgTy { error!("should not attempt to serialize the response again"); panic!(); } + EventAddResEmpty(_) => 0, + EventCancel(_) => 0, + EventCancelRes(_) => 0, ReadNotify(_) => 0, ReadNotifyRes(_) => { error!("should not attempt to serialize the response again"); @@ -366,6 +400,9 @@ impl CaMsgTy { AccessRightsRes(_) => 0, EventAdd(x) => x.data_type, EventAddRes(x) => x.data_type, + EventAddResEmpty(x) => x.data_type, + EventCancel(x) => x.data_type, + EventCancelRes(x) => x.data_type, ReadNotify(x) => x.data_type, ReadNotifyRes(x) => x.data_type, Echo => 0, @@ -395,6 +432,9 @@ impl CaMsgTy { panic!(); x.data_count as _ } + EventAddResEmpty(_) => 0, + EventCancel(x) => x.data_count, + EventCancelRes(x) => 0, ReadNotify(x) => x.data_count, ReadNotifyRes(x) => { panic!(); @@ -421,6 +461,9 @@ impl CaMsgTy { AccessRightsRes(x) => x.cid, EventAdd(x) => x.sid, EventAddRes(x) => x.status, + EventAddResEmpty(x) => x.sid, + EventCancel(x) => x.sid, + EventCancelRes(x) => x.sid, ReadNotify(x) => x.sid, ReadNotifyRes(x) => x.sid, Echo => 0, @@ -444,6 +487,9 @@ impl CaMsgTy { AccessRightsRes(x) => x.rights, EventAdd(x) => x.subid, EventAddRes(x) => x.subid, + EventAddResEmpty(x) => x.subid, + EventCancel(x) => x.subid, + EventCancelRes(x) => x.subid, ReadNotify(x) => x.ioid, ReadNotifyRes(x) => x.ioid, Echo => 0, @@ -508,6 +554,9 @@ impl CaMsgTy { buf.copy_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x0e, 0, 0]); } EventAddRes(_) => {} + EventAddResEmpty(_) => {} + EventCancel(_) => {} + EventCancelRes(_) => {} ReadNotify(_) => {} ReadNotifyRes(_) => {} Echo => {} @@ -704,127 +753,115 @@ impl CaMsg { let ty = CaMsgTy::CreateChanFail(CreateChanFail { cid: hi.param1 }); CaMsg::from_ty_ts(ty, tsnow) } - 1 => { - use netpod::Shape; - let ca_dbr_ty = CaDbrType::from_ca_u16(hi.data_type)?; - if let CaDbrMetaType::Time = ca_dbr_ty.meta { - } else { - return Err(Error::MismatchDbrTimeType); - } + 0x01 => { if payload.len() < 12 { - return Err(Error::NotEnoughPayloadTimeMetadata(payload.len())); + if payload.len() == 0 { + if hi.data_count() != 0 { + // TODO according to protocol, this should not happen. Count for metrics. + } + let ty = CaMsgTy::EventAddResEmpty(EventAddResEmpty { + data_type: hi.data_type, + sid: hi.param1, + subid: hi.param2, + }); + return Ok(CaMsg::from_ty_ts(ty, tsnow)); + } else { + error!("EventAddRes but bad header {hi:?}"); + return Err(Error::NotEnoughPayloadTimeMetadata(payload.len())); + } } - let ca_status = u16::from_be_bytes(payload[0..2].try_into().map_err(|_| Error::BadSlice)?); - let ca_severity = u16::from_be_bytes(payload[2..4].try_into().map_err(|_| Error::BadSlice)?); - let ca_secs = u32::from_be_bytes(payload[4..8].try_into().map_err(|_| Error::BadSlice)?); - let ca_nanos = u32::from_be_bytes(payload[8..12].try_into().map_err(|_| Error::BadSlice)?); - let ca_sh = Shape::from_ca_count(hi.data_count() as _).map_err(|_| { - error!("BadCaCount {hi:?}"); - Error::BadCaCount - })?; - let meta_padding = match ca_dbr_ty.meta { - CaDbrMetaType::Plain => 0, - CaDbrMetaType::Status => match ca_dbr_ty.scalar_type { - CaScalarType::I8 => 1, - CaScalarType::I16 => 0, - CaScalarType::I32 => 0, - CaScalarType::F32 => 0, - CaScalarType::F64 => 4, - CaScalarType::Enum => 0, - CaScalarType::String => 0, - }, - CaDbrMetaType::Time => match ca_dbr_ty.scalar_type { - CaScalarType::I8 => 3, - CaScalarType::I16 => 2, - CaScalarType::I32 => 0, - CaScalarType::F32 => 0, - CaScalarType::F64 => 4, - CaScalarType::Enum => 2, - CaScalarType::String => 0, - }, - }; - let valbuf = &payload[12 + meta_padding..]; - let value = match ca_sh { - Shape::Scalar => Self::ca_scalar_value(&ca_dbr_ty.scalar_type, valbuf)?, - Shape::Wave(n) => { - Self::ca_wave_value(&ca_dbr_ty.scalar_type, (n as usize).min(array_truncate), valbuf)? - } - Shape::Image(_, _) => { - error!("Can not handle image from channel access"); - err::todoval() - } - }; - let ts = SEC * (ca_secs as u64 + EPICS_EPOCH_OFFSET) + ca_nanos as u64; - let value = CaEventValue { - ts, - status: ca_status, - severity: ca_severity, - data: value, - }; + let value = Self::extract_ca_data_value(hi, payload, array_truncate)?; let d = EventAddRes { data_type: hi.data_type, data_count: hi.data_count() as _, status: hi.param1, subid: hi.param2, - value, payload_len: hi.payload_len() as u32, + value, }; - // TODO quick test only - if false { - let nn = 4; - let mut blob = vec![0; nn]; - for (i, x) in blob.iter_mut().enumerate() { - *x = i as _; - } - let d = EventAddRes { - // i32 with time and status - data_type: 19, - data_count: nn as u32, - status: hi.param1, - subid: hi.param2, - value: CaEventValue { - ts, - status: ca_status, - severity: ca_severity, - data: CaDataValue::Array(CaDataArrayValue::I32(blob)), - }, - payload_len: hi.payload_len() as u32, - }; - let ty = CaMsgTy::EventAddRes(d); - return Ok(CaMsg::from_ty_ts(ty, tsnow)); - } let ty = CaMsgTy::EventAddRes(d); CaMsg::from_ty_ts(ty, tsnow) } - 15 => { + 0x0f => { if payload.len() == 8 { let v = u64::from_be_bytes(payload.try_into().map_err(|_| Error::BadSlice)?); - info!("Payload as u64: {v}"); + debug!("Payload as u64: {v}"); let v = i64::from_be_bytes(payload.try_into().map_err(|_| Error::BadSlice)?); - info!("Payload as i64: {v}"); + debug!("Payload as i64: {v}"); let v = f64::from_be_bytes(payload.try_into().map_err(|_| Error::BadSlice)?); - info!("Payload as f64: {v}"); - } else { - info!( - "payload string {:?} payload {:?}", - String::from_utf8_lossy(&payload[..payload.len().min(12)]), - &payload[..payload.len().min(12)], - ); + debug!("Payload as f64: {v}"); } - // TODO use different structs for request and response: + let value = Self::extract_ca_data_value(hi, payload, array_truncate)?; let ty = CaMsgTy::ReadNotifyRes(ReadNotifyRes { data_type: hi.data_type, data_count: hi.data_count() as _, sid: hi.param1, ioid: hi.param2, + payload_len: hi.payload_len() as u32, + value, }); CaMsg::from_ty_ts(ty, tsnow) } - 0x17 => CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow), + 0x11 => CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow), x => return Err(Error::CaCommandNotSupported(x)), }; Ok(msg) } + + fn extract_ca_data_value(hi: &HeadInfo, payload: &[u8], array_truncate: usize) -> Result { + use netpod::Shape; + let ca_dbr_ty = CaDbrType::from_ca_u16(hi.data_type)?; + if let CaDbrMetaType::Time = ca_dbr_ty.meta { + } else { + return Err(Error::MismatchDbrTimeType); + } + let ca_status = u16::from_be_bytes(payload[0..2].try_into().map_err(|_| Error::BadSlice)?); + let ca_severity = u16::from_be_bytes(payload[2..4].try_into().map_err(|_| Error::BadSlice)?); + let ca_secs = u32::from_be_bytes(payload[4..8].try_into().map_err(|_| Error::BadSlice)?); + let ca_nanos = u32::from_be_bytes(payload[8..12].try_into().map_err(|_| Error::BadSlice)?); + let ca_sh = Shape::from_ca_count(hi.data_count() as _).map_err(|_| { + error!("BadCaCount {hi:?}"); + Error::BadCaCount + })?; + let meta_padding = match ca_dbr_ty.meta { + CaDbrMetaType::Plain => 0, + CaDbrMetaType::Status => match ca_dbr_ty.scalar_type { + CaScalarType::I8 => 1, + CaScalarType::I16 => 0, + CaScalarType::I32 => 0, + CaScalarType::F32 => 0, + CaScalarType::F64 => 4, + CaScalarType::Enum => 0, + CaScalarType::String => 0, + }, + CaDbrMetaType::Time => match ca_dbr_ty.scalar_type { + CaScalarType::I8 => 3, + CaScalarType::I16 => 2, + CaScalarType::I32 => 0, + CaScalarType::F32 => 0, + CaScalarType::F64 => 4, + CaScalarType::Enum => 2, + CaScalarType::String => 0, + }, + }; + let valbuf = &payload[12 + meta_padding..]; + let value = match ca_sh { + Shape::Scalar => Self::ca_scalar_value(&ca_dbr_ty.scalar_type, valbuf)?, + Shape::Wave(n) => Self::ca_wave_value(&ca_dbr_ty.scalar_type, (n as usize).min(array_truncate), valbuf)?, + Shape::Image(_, _) => { + error!("Can not handle image from channel access"); + err::todoval() + } + }; + let ts = SEC * (ca_secs as u64 + EPICS_EPOCH_OFFSET) + ca_nanos as u64; + let value = CaEventValue { + ts, + status: ca_status, + severity: ca_severity, + data: value, + }; + Ok(value) + } } #[derive(Debug)] @@ -833,12 +870,6 @@ pub enum CaItem { Msg(CaMsg), } -impl CaItem { - fn empty() -> Self { - CaItem::Empty - } -} - #[derive(Clone, Debug)] pub struct HeadInfo { cmdid: u16, @@ -953,6 +984,10 @@ impl CaProto { } } + pub fn proto_out_len(&self) -> usize { + self.out.len() + } + pub fn push_out(&mut self, item: CaMsg) { self.out.push_back(item); } diff --git a/netfetch/src/senderpolling.rs b/netfetch/src/senderpolling.rs index 9186cff..ec82728 100644 --- a/netfetch/src/senderpolling.rs +++ b/netfetch/src/senderpolling.rs @@ -29,6 +29,13 @@ where _pin: PhantomPinned, } +fn _require_unpin(_: &T) {} + +fn _check_unpin() { + let _r: &SenderPolling = err::todoval(); + // _require_unpin(_r); +} + unsafe impl core::marker::Send for SenderPolling where T: core::marker::Send {} impl SenderPolling { diff --git a/serieswriter/src/timebin.rs b/serieswriter/src/timebin.rs index 3285418..0f079f0 100644 --- a/serieswriter/src/timebin.rs +++ b/serieswriter/src/timebin.rs @@ -299,7 +299,7 @@ where c.reset(); let nbins = tb.bins_ready_count(); if nbins >= 1 { - info!("store bins len {} {:?}", nbins, params.series); + trace!("store bins len {} {:?}", nbins, params.series); store_bins(params.series.clone(), tb, iiq, next)?; // if let Some(mut bins) = tb.bins_ready() { // //info!("store bins {bins:?}"); @@ -363,7 +363,7 @@ fn store_bins( } // TODO this must depend on the data type: waveforms need smaller batches - let bins_per_msp = 10000; + let bins_per_msp = 82000; let ts1ms = ts1 / MS; let ts2ms = ts2 / MS; @@ -382,15 +382,15 @@ fn store_bins( avg, }; let item = QueryItem::TimeBinSimpleF32(item); - debug!("push item B ts1ms {ts1ms} bin_len_ms {bin_len_ms} ts_msp {ts_msp} off {off}"); + trace!("push item B ts1ms {ts1ms} bin_len_ms {bin_len_ms} ts_msp {ts_msp} off {off}"); iiq.push_back(item); } } + Ok(()) } else { error!("unexpected container!"); - return Err(Error::PatchUnexpectedContainer); + Err(Error::PatchUnexpectedContainer) } - // TODO feed also the next patch collector for the next coarse resolution. // pc.ingest(bins.as_mut())?; // let noutq = pc.outq_len(); @@ -402,8 +402,6 @@ fn store_bins( // warn!("pc outq len zero"); // Ok(()) // } - - Ok(()) } else { error!("have bins but none returned"); Err(Error::HaveBinsButNoneReturned) diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 93de3af..12dff63 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -21,8 +21,12 @@ use scywr::iteminsertqueue::QueryItem; use series::series::CHANNEL_STATUS_DUMMY_SCALAR_TYPE; use series::ChannelStatusSeriesId; use series::SeriesId; -use stats::SeriesByChannelStats; +use stats::SeriesWriterEstablishStats; use std::collections::VecDeque; +use std::sync::atomic; +use std::sync::atomic::AtomicU64; +use std::sync::Arc; +use std::time::Duration; use std::time::SystemTime; #[derive(Debug, ThisError)] @@ -213,20 +217,42 @@ pub struct JobId(pub u64); pub struct EstablishWriterWorker { worker_tx: Sender, jobrx: Receiver, + stats: Arc, } impl EstablishWriterWorker { - fn new(worker_tx: Sender, jobrx: Receiver) -> Self { - Self { worker_tx, jobrx } + fn new( + worker_tx: Sender, + jobrx: Receiver, + stats: Arc, + ) -> Self { + Self { + worker_tx, + jobrx, + stats, + } } async fn work(self) { + let cnt = Arc::new(AtomicU64::new(0)); + taskrun::spawn({ + let cnt = cnt.clone(); + async move { + if true { + return Ok::<_, Error>(()); + } + loop { + taskrun::tokio::time::sleep(Duration::from_millis(10000)).await; + debug!("EstablishWriterWorker cnt {}", cnt.load(atomic::Ordering::SeqCst)); + } + Ok::<_, Error>(()) + } + }); self.jobrx .map(move |item| { let wtx = self.worker_tx.clone(); + let cnt = cnt.clone(); async move { - // TODO - debug!("got job"); let res = SeriesWriter::establish( wtx.clone(), item.backend, @@ -236,6 +262,7 @@ impl EstablishWriterWorker { item.tsnow, ) .await; + cnt.fetch_add(1, atomic::Ordering::SeqCst); if item.restx.send((item.job_id, res)).await.is_err() { warn!("can not send writer establish result"); } @@ -281,9 +308,10 @@ impl EstablishWorkerJob { pub fn start_writer_establish_worker( worker_tx: Sender, + stats: Arc, ) -> Result<(Sender,), Error> { let (tx, rx) = async_channel::bounded(256); - let worker = EstablishWriterWorker::new(worker_tx, rx); + let worker = EstablishWriterWorker::new(worker_tx, rx, stats); taskrun::spawn(worker.work()); Ok((tx,)) } @@ -292,6 +320,7 @@ pub fn start_writer_establish_worker( fn write_00() { use netpod::Database; use scywr::session::ScyllaConfig; + use stats::SeriesByChannelStats; use std::sync::Arc; let fut = async { let dbconf = &Database { diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 0828beb..a52ac2b 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -306,6 +306,7 @@ stats_proc::stats_struct!(( channel_assigned_without_health_update, channel_rogue, ), + histolog2s(poll_all_dt,), ), // agg(name(CaConnSetStatsAgg), parent(CaConnSetStats)), // diff(name(CaConnSetStatsDiff), input(CaConnSetStats)), @@ -366,6 +367,7 @@ stats_proc::stats_struct!(( ), values(db_lookup_workers,) ), + stats_struct(name(SeriesWriterEstablishStats), prefix(wrest), counters(job_recv,),), )); stats_proc::stats_struct!(( @@ -428,9 +430,6 @@ stats_proc::stats_struct!(( poll_reloop, poll_pending, poll_no_progress_no_pending, - poll_reloops_8, - poll_reloops_64, - poll_reloops_512, poll_wake_break, storage_queue_send, storage_queue_pending, @@ -438,9 +437,18 @@ stats_proc::stats_struct!(( storage_queue_above_32, storage_queue_above_128, event_add_res_recv, + caget_timeout, + ), + values(inter_ivl_ema, read_ioids_len, proto_out_len,), + histolog2s( + poll_all_dt, + poll_op3_dt, + poll_reloops, + pong_recv_lat, + ca_ts_off, + iiq_batch_len, + caget_lat, ), - values(inter_ivl_ema), - histolog2s(pong_recv_lat, ca_ts_off,), ), agg(name(CaConnStatsAgg), parent(CaConnStats)), diff(name(CaConnStatsAggDiff), input(CaConnStatsAgg)),