From 87e6dfdcaad8cc4fc5b4cda0a137ec0bd9fc9044 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 10 Oct 2024 12:27:34 +0200 Subject: [PATCH] Write bins, support config reload --- daqingest/src/daemon.rs | 57 ++++- netfetch/src/ca/conn.rs | 150 +++++++++++- netfetch/src/ca/connset.rs | 417 +++++++++++++++++++++++++-------- netfetch/src/ca/proto.rs | 29 +++ netfetch/src/ca/statemap.rs | 60 ++++- netfetch/src/conf.rs | 148 +++++++----- netfetch/src/daemon_common.rs | 10 +- netfetch/src/metrics.rs | 31 +++ netfetch/src/metrics/status.rs | 5 + netfetch/src/throttletrace.rs | 11 + scywr/src/insertworker.rs | 49 ++++ scywr/src/iteminsertqueue.rs | 56 +++++ scywr/src/schema.rs | 22 ++ scywr/src/store.rs | 10 + serieswriter/src/binwriter.rs | 107 ++++++--- stats/src/stats.rs | 2 + 16 files changed, 938 insertions(+), 226 deletions(-) diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 35eae37..b49c772 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -11,7 +11,7 @@ use netfetch::ca::connset::CaConnSetItem; use netfetch::conf::CaIngestOpts; use netfetch::conf::ChannelConfig; use netfetch::conf::ChannelsConfig; -use netfetch::daemon_common::Channel; +use netfetch::daemon_common::ChannelName; use netfetch::daemon_common::DaemonEvent; use netfetch::metrics::RoutesResources; use netfetch::metrics::StatsSet; @@ -411,7 +411,7 @@ impl Daemon { Ok(()) } - async fn handle_channel_remove(&mut self, ch: Channel) -> Result<(), Error> { + async fn handle_channel_remove(&mut self, ch: ChannelName) -> Result<(), Error> { self.connset_ctrl.remove_channel(ch.name().into()).await?; Ok(()) } @@ -501,6 +501,54 @@ impl Daemon { Ok(()) } + async fn handle_config_reload_inner(&mut self) -> Result<(), Error> { + let channels_dir = self.ingest_opts.channels(); + let channels = match netfetch::conf::parse_channels(channels_dir).await { + Ok(x) => x, + Err(e) => { + return Err(Error::with_msg_no_trace(format!( + "could not reload channel config {e}" + ))); + } + }; + if let Some(channels) = channels { + debug!("channels config reloaded"); + // TODO + // Send a marker flag-clear to CaConnSet. + // Send all the channel-add commands. + let mut i = 0; + for ch_cfg in channels.channels() { + let (tx, rx) = async_channel::bounded(10); + self.connset_ctrl.add_channel(ch_cfg.clone(), tx).await?; + rx.recv().await??; + i += 1; + } + debug!("channel add send n {i}"); + // Send a marker remove-cleared to CaConnSet (must impl that on CaConnSet to remove those channels) + Ok(()) + } else { + Err(Error::with_msg_no_trace(format!("no channel config found"))) + } + } + + async fn handle_config_reload(&mut self, tx: async_channel::Sender) -> Result<(), Error> { + match self.handle_config_reload_inner().await { + Ok(x) => { + if tx.send(0).await.is_err() { + self.stats.channel_send_err().inc(); + } + Ok(()) + } + Err(e) => { + error!("{e}"); + if tx.send(127).await.is_err() { + self.stats.channel_send_err().inc(); + } + Ok(()) + } + } + } + #[cfg(target_abi = "x32")] async fn handle_shutdown(&mut self) -> Result<(), Error> { warn!("received shutdown event"); @@ -539,6 +587,7 @@ impl Daemon { ChannelRemove(ch) => self.handle_channel_remove(ch).await, CaConnSetItem(item) => self.handle_ca_conn_set_item(item).await, Shutdown => self.handle_shutdown().await, + ConfigReload(tx) => self.handle_config_reload(tx).await, }; let dt = ts1.elapsed(); if dt > Duration::from_millis(200) { @@ -635,10 +684,10 @@ impl Daemon { break; } match self.rx.recv().await { - Ok(item) => match self.handle_event(item.clone()).await { + Ok(item) => match self.handle_event(item).await { Ok(()) => {} Err(e) => { - error!("fn daemon: error from handle_event {item:?} {e}"); + error!("fn daemon: error from handle_event {e}"); break; } }, diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 37cd434..ee91dd4 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -710,6 +710,48 @@ impl ChannelState { ChannelState::Closing(st) => st.cssid.clone(), } } + + fn created_state(&self) -> Option<&CreatedState> { + match self { + ChannelState::Init(_) => None, + ChannelState::Creating(_) => None, + ChannelState::FetchEnumDetails(_) => None, + ChannelState::FetchCaStatusSeries(st2) => Some(&st2.channel), + ChannelState::MakingSeriesWriter(st2) => Some(&st2.channel), + ChannelState::Writable(st2) => Some(&st2.channel), + ChannelState::Closing(_) => None, + ChannelState::Error(_) => None, + ChannelState::Ended(_) => None, + } + } + + fn cid(&self) -> Option { + match self { + ChannelState::Init(_) => None, + ChannelState::Creating(_) => None, + ChannelState::FetchEnumDetails(_) => None, + ChannelState::FetchCaStatusSeries(st2) => Some(st2.channel.cid), + ChannelState::MakingSeriesWriter(st2) => Some(st2.channel.cid), + ChannelState::Writable(st2) => Some(st2.channel.cid), + ChannelState::Closing(_) => None, + ChannelState::Error(_) => None, + ChannelState::Ended(_) => None, + } + } + + fn sid(&self) -> Option { + match self { + ChannelState::Init(_) => None, + ChannelState::Creating(_) => None, + ChannelState::FetchEnumDetails(_) => None, + ChannelState::FetchCaStatusSeries(st2) => Some(st2.channel.sid), + ChannelState::MakingSeriesWriter(st2) => Some(st2.channel.sid), + ChannelState::Writable(st2) => Some(st2.channel.sid), + ChannelState::Closing(_) => None, + ChannelState::Error(_) => None, + ChannelState::Ended(_) => None, + } + } } #[derive(Debug)] @@ -967,6 +1009,7 @@ pub enum CaConnEventValue { ChannelStatus(ChannelStatusPartial), ChannelCreateFail(String), EndOfStream(EndOfStreamReason), + ChannelRemoved(String), } impl CaConnEventValue { @@ -978,6 +1021,7 @@ impl CaConnEventValue { CaConnEventValue::ChannelStatus(_) => "ChannelStatus", CaConnEventValue::ChannelCreateFail(_) => "ChannelCreateFail", CaConnEventValue::EndOfStream(_) => "EndOfStream", + CaConnEventValue::ChannelRemoved(_) => "ChannelRemoved", } } } @@ -1116,7 +1160,7 @@ impl CaConn { iqsp: Box::pin(InsertSenderPolling::new(iqtx)), ca_conn_event_out_queue: VecDeque::new(), ca_conn_event_out_queue_max: 2000, - thr_msg_poll: ThrottleTrace::new(Duration::from_millis(10000)), + thr_msg_poll: ThrottleTrace::new(Duration::from_millis(2000)), ca_proto_stats, rng, channel_info_query_qu: VecDeque::new(), @@ -1470,6 +1514,7 @@ impl CaConn { } pub fn channel_add(&mut self, conf: ChannelConfig, cssid: ChannelStatusSeriesId) -> Result<(), Error> { + debug!("channel_add {conf:?} {cssid:?}"); if false { if netpod::trigger.contains(&conf.name()) { self.trace_channel_poll = true; @@ -1500,10 +1545,82 @@ impl CaConn { } pub fn channel_close(&mut self, name: String) { - error!("TODO actually cause the channel to get closed and removed {}", name); + debug!("channel_close {}", name); + let tsnow = Instant::now(); + let stnow = SystemTime::now(); + + let cid = if let Some(x) = self.cid_by_name.get(&name) { + x.clone() + } else { + debug!("channel_close {} can not find channel", name); + return; + }; + self.cid_by_name.remove(&name); + + if let Some(conf) = self.channels.get_mut(&cid) { + let mut item_deque = VecDeque::new(); + let item = ChannelStatusItem { + ts: stnow, + cssid: conf.state.cssid(), + status: ChannelStatus::Closed(ChannelStatusClosedReason::ChannelRemove), + }; + let deque = &mut item_deque; + if conf.wrst.emit_channel_status_item(item, deque).is_err() { + self.stats.logic_error().inc(); + } + for x in item_deque { + self.iqdqs.st_rf3_qu.push_back(x); + } + + // TODO shutdown the internal writer structures. + if let Some(cst) = conf.state.created_state() { + if let Some(proto) = self.proto.as_mut() { + let ty = CaMsgTy::ChannelClose(ChannelClose { + sid: cst.sid.to_u32(), + cid: cid.0, + }); + let item = CaMsg::from_ty_ts(ty, tsnow); + proto.push_out(item); + } + } + + { + let mut it = self.cid_by_subid.extract_if(|_, v| *v == cid); + if let Some((subid, _cid)) = it.next() { + it.count(); + if let Some(cst) = conf.state.created_state() { + if let Some(proto) = self.proto.as_mut() { + let ty = CaMsgTy::EventCancel(EventCancel { + data_type: cst.ca_dbr_type, + data_count: cst.ca_dbr_count, + sid: cst.sid.to_u32(), + subid: subid.to_u32(), + }); + let item = CaMsg::from_ty_ts(ty, tsnow); + proto.push_out(item); + } + } + }; + } + } else { + debug!("channel_close {} no channel block", name); + }; + + { + let it = self.cid_by_sid.extract_if(|_, v| *v == cid); + it.count(); + } + + self.channels.remove(&cid); + + // TODO emit CaConn item to let CaConnSet know that we have closed the channel. + // TODO may be too full + let value = CaConnEventValue::ChannelRemoved(name); + let item = CaConnEvent::new_now(value); + self.ca_conn_event_out_queue.push_back(item); } - pub fn channel_remove(&mut self, name: String) { + fn channel_remove_by_name(&mut self, name: String) { if let Some(cid) = self.cid_by_name(&name) { self.channel_remove_by_cid(cid); } else { @@ -1652,7 +1769,10 @@ impl CaConn { let cid = if let Some(x) = self.cid_by_subid.get(&subid) { *x } else { - warn!("can not find cid for subid {subid:?}"); + if self.thr_msg_poll.is_action() { + self.stats.no_cid_for_subid().inc(); + // debug!("can not find cid for subid {subid:?}"); + } // return Err(Error::with_msg_no_trace()); return Ok(()); }; @@ -1819,7 +1939,10 @@ impl CaConn { let cid = if let Some(x) = self.cid_by_subid.get(&subid) { *x } else { - warn!("can not find cid for subid {subid:?}"); + if self.thr_msg_poll.is_action() { + self.stats.no_cid_for_subid().inc(); + // debug!("can not find cid for subid {subid:?}"); + } // return Err(Error::with_msg_no_trace()); return Ok(()); }; @@ -2101,20 +2224,20 @@ impl CaConn { stats.logic_error().inc(); } } - let ts_local = TsNano::from_system_time(stnow); + let tsev_local = TsNano::from_system_time(stnow); { let ts = value.ts().ok_or_else(|| Error::MissingTimestamp)?; - let ts_diff = ts.abs_diff(ts_local.ns()); + let ts_diff = ts.abs_diff(tsev_local.ns()); stats.ca_ts_off().ingest((ts_diff / MS) as u32); } { - let tsev = ts_local; + let tsev = tsev_local; Self::check_ev_value_data(&value.data, &writer.scalar_type())?; crst.muted_before = 0; crst.insert_item_ivl_ema.tick(tsnow); // let ts_ioc = TsNano::from_ns(ts); // let ts_local = TsNano::from_ns(ts_local); - // binwriter.ingest(ts_ioc, ts_local, &val, iqdqs)?; + binwriter.ingest(tsev_local, value.f32_for_binning(), iqdqs)?; { let wres = writer.write(CaWriterValue::new(value, crst), tsnow, tsev, iqdqs)?; crst.status_emit_count += wres.nstatus() as u64; @@ -3087,7 +3210,9 @@ impl CaConn { for (_, chconf) in &mut self.channels { let chst = &mut chconf.state; if let ChannelState::Writable(st2) = chst { - st2.writer.tick(&mut self.iqdqs)?; + let iqdqs = &mut self.iqdqs; + st2.writer.tick(iqdqs)?; + st2.binwriter.tick(iqdqs)?; } } Ok(()) @@ -3204,6 +3329,9 @@ macro_rules! flush_queue_dqs { // let sp = std::pin::pin!(obj.iqsp.$sp); // let sp = &mut obj.iqsp.$sp; // let sp = std::pin::pin!(sp); + if qu.len() < qu.capacity() * 4 / 10 { + qu.shrink_to(qu.capacity() * 7 / 10); + } let sp = obj.iqsp.as_mut().$sp(); match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats) { Ok(Ready(Some(()))) => { @@ -3603,7 +3731,7 @@ impl EmittableType for CaWriterValue { state.last_accepted_val = Some(self.clone()); let byte_size = self.byte_size(); if diff_data { - debug!("diff_data emit {:?}", state.series_data); + // debug!("diff_data emit {:?}", state.series_data); let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split_data.split(ts, self.byte_size()); let data_value = { use super::proto::CaDataValue; diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 5c0512f..43348ec 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -7,8 +7,7 @@ use crate::ca::statemap::MaybeWrongAddressState; use crate::ca::statemap::WithAddressState; use crate::conf::CaIngestOpts; use crate::conf::ChannelConfig; -use crate::daemon_common::Channel; -use crate::errconv::ErrConv; +use crate::daemon_common::ChannelName; use crate::rt::JoinHandle; use crate::throttletrace::ThrottleTrace; use async_channel::Receiver; @@ -26,7 +25,8 @@ use dbpg::seriesbychannel::BoxedSend; use dbpg::seriesbychannel::CanSendChannelInfoResult; use dbpg::seriesbychannel::ChannelInfoQuery; use dbpg::seriesbychannel::ChannelInfoResult; -use err::Error; +use err::thiserror; +use err::ThisError; use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; @@ -83,6 +83,7 @@ const MAYBE_WRONG_ADDRESS_STAY: Duration = Duration::from_millis(4000); const SEARCH_PENDING_TIMEOUT: Duration = Duration::from_millis(30000); const CHANNEL_HEALTH_TIMEOUT: Duration = Duration::from_millis(30000); const CHANNEL_UNASSIGNED_TIMEOUT: Duration = Duration::from_millis(0); +const UNASSIGN_FOR_CONFIG_CHANGE_TIMEOUT: Duration = Duration::from_millis(1000 * 10); const CHANNEL_MAX_WITHOUT_HEALTH_UPDATE: usize = 3000000; #[allow(unused)] @@ -112,6 +113,51 @@ macro_rules! trace4 { }; } +#[allow(unused)] +macro_rules! trace_health_update { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } + +#[allow(unused)] +macro_rules! trace_channel_state { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } + +#[derive(Debug, ThisError)] +#[cstm(name = "CaConnSet")] +pub enum Error { + ChannelSend, + TaskJoin(#[from] tokio::task::JoinError), + SeriesLookup(#[from] dbpg::seriesbychannel::Error), + Beacons(#[from] crate::ca::beacons::Error), + SeriesWriter(#[from] serieswriter::writer::Error), + ExpectIpv4, + UnknownCssid, + Regex(#[from] regex::Error), + MissingChannelInfoChannelTx, + UnexpectedChannelDummyState, + CaConnEndWithoutReason, + PushCmdsNoSendInProgress(SocketAddr), + SenderPollingSend, + NoProgressNoPending, + IocFinder(::err::Error), + ChannelAssignedWithoutConnRess, +} + +impl From> for Error { + fn from(_value: async_channel::SendError) -> Self { + Self::ChannelSend + } +} + +impl From> for Error { + fn from(_value: scywr::senderpolling::Error) -> Self { + Self::SenderPollingSend + } +} + +impl From for ::err::Error { + fn from(value: Error) -> Self { + Self::from_string(value) + } +} + #[derive(Debug, PartialEq, Eq)] pub struct CmdId(SocketAddrV4, usize); @@ -210,7 +256,7 @@ impl CaConnSetEvent { // pub fn new_cmd_channel_statuses() -> (Self, Receiver) {} } -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum CaConnSetItem { Error(Error), Healthy, @@ -258,7 +304,7 @@ impl CaConnSetCtrl { } pub async fn join(self) -> Result<(), Error> { - self.jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??; + self.jh.await??; Ok(()) } @@ -324,24 +370,43 @@ struct SeriesLookupSender { impl CanSendChannelInfoResult for SeriesLookupSender { fn make_send(&self, item: Result) -> BoxedSend { let tx = self.tx.clone(); - let fut = async move { - tx.send(item.map_err(|e| Error::with_msg_no_trace(e.to_string()))) - .await - .map_err(|_| ()) - }; + let fut = async move { tx.send(item.map_err(Into::into)).await.map_err(|_| ()) }; Box::pin(fut) } } +struct StateTransRes<'a> { + backend: &'a str, + stats: &'a CaConnSetStats, + ca_conn_ress: &'a mut HashMap, + channel_info_query_qu: &'a mut VecDeque, + channel_info_res_tx: Pin<&'a mut Sender>>, + chst: &'a mut ChannelState, +} + +impl<'a> StateTransRes<'a> { + fn init(value: &'a mut CaConnSet, chname: &ChannelName) -> Self { + let chst = value.channel_states.get_mut_or_dummy_init(&chname); + Self { + backend: &value.backend, + stats: &value.stats, + ca_conn_ress: &mut value.ca_conn_ress, + channel_info_query_qu: &mut value.channel_info_query_qu, + channel_info_res_tx: value.channel_info_res_tx.as_mut(), + chst, + } + } +} + pub struct CaConnSet { ticker: Pin>, backend: String, local_epics_hostname: String, ca_conn_ress: HashMap, channel_states: ChannelStateMap, - channel_by_cssid: HashMap, + channel_by_cssid: HashMap, connset_inp_rx: Pin>>, - channel_info_query_queue: VecDeque, + channel_info_query_qu: VecDeque, channel_info_query_sender: Pin>>, channel_info_query_tx: Option>, channel_info_res_tx: Pin>>>, @@ -359,10 +424,10 @@ pub struct CaConnSet { connset_out_tx: Pin>>, shutdown_stopping: bool, shutdown_done: bool, - chan_check_next: Option, + chan_check_next: Option, stats: Arc, ca_conn_stats: Arc, - ioc_finder_jh: JoinHandle>, + ioc_finder_jh: JoinHandle>, await_ca_conn_jhs: VecDeque<(SocketAddr, JoinHandle>)>, thr_msg_poll_1: ThrottleTrace, thr_msg_storage_len: ThrottleTrace, @@ -408,7 +473,7 @@ impl CaConnSet { channel_states: ChannelStateMap::new(), channel_by_cssid: HashMap::new(), connset_inp_rx: Box::pin(connset_inp_rx), - channel_info_query_queue: VecDeque::new(), + channel_info_query_qu: VecDeque::new(), channel_info_query_sender: Box::pin(SenderPolling::new(channel_info_query_tx.clone())), channel_info_query_tx: Some(channel_info_query_tx), channel_info_res_tx: Box::pin(channel_info_res_tx), @@ -486,14 +551,12 @@ impl CaConnSet { trace!("CaConnSet EndOfStream"); beacons_cancel_guard_tx.send(1).await.ok(); trace!("CaConnSet beacon cancelled"); - beacons_jh.await?.map_err(|e| Error::from_string(e))?; + beacons_jh.await??; trace!("CaConnSet beacon joined"); trace!("join ioc_finder_jh A {:?}", this.find_ioc_query_sender.len()); this.find_ioc_query_sender.as_mut().drop(); trace!("join ioc_finder_jh B {:?}", this.find_ioc_query_sender.len()); - this.ioc_finder_jh - .await - .map_err(|e| Error::with_msg_no_trace(e.to_string()))??; + this.ioc_finder_jh.await?.map_err(|e| Error::IocFinder(e))?; trace!("joined ioc_finder_jh"); this.connset_out_tx.close(); this.connset_inp_rx.close(); @@ -503,55 +566,133 @@ impl CaConnSet { } fn handle_event(&mut self, ev: CaConnSetEvent) -> Result<(), Error> { - // trace!("handle_event {ev:?}"); match ev { CaConnSetEvent::ConnSetCmd(cmd) => match cmd { ConnSetCmd::ChannelAdd(x) => self.handle_add_channel(x), - // ConnSetCmd::ChannelAddWithStatusId(x) => self.handle_add_channel_with_status_id(x), - // ConnSetCmd::ChannelAddWithAddr(x) => self.handle_add_channel_with_addr(x), ConnSetCmd::ChannelRemove(x) => self.handle_remove_channel(x), - // ConnSetCmd::IocAddrQueryResult(x) => self.handle_ioc_query_result(x).await, - // ConnSetCmd::SeriesLookupResult(x) => self.handle_series_lookup_result(x).await, ConnSetCmd::Shutdown => self.handle_shutdown(), ConnSetCmd::ChannelStatuses(x) => self.handle_channel_statuses_req(x), }, } } + fn handle_add_channel_new(cmd: ChannelAdd, ress: StateTransRes) -> Result<(), Error> { + { + let item = ChannelState { + value: ChannelStateValue::Active(ActiveChannelState::WaitForStatusSeriesId { + since: SystemTime::now(), + }), + config: cmd.ch_cfg.clone(), + touched: 1, + }; + *ress.chst = item; + } + { + let channel_name = cmd.name().into(); + let tx = ress.channel_info_res_tx.as_ref().get_ref().clone(); + let item = ChannelInfoQuery { + backend: ress.backend.into(), + channel: channel_name, + kind: SeriesKind::ChannelStatus, + scalar_type: ScalarType::U64, + shape: Shape::Scalar, + tx: Box::pin(SeriesLookupSender { tx }), + }; + ress.channel_info_query_qu.push_back(item); + } + if let Err(_) = cmd.restx.try_send(Ok(())) { + ress.stats.command_reply_fail().inc(); + } + Ok(()) + } + + fn handle_add_channel_existing(cmd: ChannelAdd, ress: StateTransRes) -> Result<(), Error> { + let tsnow = Instant::now(); + if cmd.ch_cfg == ress.chst.config { + debug!("handle_add_channel_existing config same {}", cmd.name()); + if let Err(_) = cmd.restx.try_send(Ok(())) { + ress.stats.command_reply_fail().inc(); + } + Ok(()) + } else { + debug!("handle_add_channel_existing config changed {}", cmd.name()); + // TODO + match &mut ress.chst.value { + ChannelStateValue::Active(st2) => match st2 { + ActiveChannelState::Init { .. } => { + ress.chst.config = cmd.ch_cfg; + } + ActiveChannelState::WaitForStatusSeriesId { .. } => { + ress.chst.config = cmd.ch_cfg; + } + ActiveChannelState::WithStatusSeriesId(st3) => match &mut st3.inner { + WithStatusSeriesIdStateInner::AddrSearchPending { .. } => { + ress.chst.config = cmd.ch_cfg; + } + WithStatusSeriesIdStateInner::WithAddress { addr, state: st4 } => match &st4 { + WithAddressState::Unassigned { .. } => { + ress.chst.config = cmd.ch_cfg; + } + WithAddressState::Assigned(_) => { + debug!("unassign for config change {cmd:?} {addr}"); + let conn_ress = ress + .ca_conn_ress + .get_mut(&SocketAddr::V4(addr.clone())) + .ok_or_else(|| Error::ChannelAssignedWithoutConnRess)?; + let item = ConnCommand::channel_close(cmd.name().into()); + conn_ress.cmd_queue.push_back(item); + st3.inner = WithStatusSeriesIdStateInner::UnassigningForConfigChange( + statemap::UnassigningForConfigChangeState { + config_new: cmd.ch_cfg, + addr: SocketAddr::V4(addr.clone()), + since: tsnow, + }, + ); + } + }, + WithStatusSeriesIdStateInner::UnknownAddress { .. } => { + ress.chst.config = cmd.ch_cfg; + } + WithStatusSeriesIdStateInner::NoAddress { .. } => { + ress.chst.config = cmd.ch_cfg; + } + WithStatusSeriesIdStateInner::MaybeWrongAddress(..) => { + ress.chst.config = cmd.ch_cfg; + } + WithStatusSeriesIdStateInner::UnassigningForConfigChange(st4) => { + st4.config_new = cmd.ch_cfg; + } + }, + }, + ChannelStateValue::ToRemove { .. } => { + ress.chst.config = cmd.ch_cfg; + } + ChannelStateValue::InitDummy => { + return Err(Error::UnexpectedChannelDummyState); + } + } + if let Err(_) = cmd.restx.try_send(Ok(())) { + ress.stats.command_reply_fail().inc(); + } + Ok(()) + } + } + fn handle_add_channel(&mut self, cmd: ChannelAdd) -> Result<(), Error> { if self.shutdown_stopping { trace3!("handle_add_channel but shutdown_stopping"); return Ok(()); } - trace3!("handle_add_channel {:?}", cmd); + trace_channel_state!("handle_add_channel {:?}", cmd); self.stats.channel_add().inc(); // TODO should I add the transition through ActiveChannelState::Init as well? - let ch = Channel::new(cmd.name().into()); - let _st = if let Some(e) = self.channel_states.get_mut(&ch) { - e + let chname = ChannelName::new(cmd.name().into()); + let ress = StateTransRes::init(self, &chname); + if ress.chst.is_dummy() { + // Directly overwrites this dummy state: + Self::handle_add_channel_new(cmd, ress)?; } else { - let item = ChannelState { - value: ChannelStateValue::Active(ActiveChannelState::WaitForStatusSeriesId { - since: SystemTime::now(), - }), - config: cmd.ch_cfg.clone(), - }; - self.channel_states.insert(ch.clone(), item); - self.channel_states.get_mut(&ch).unwrap() - }; - let channel_name = cmd.name().into(); - let tx = self.channel_info_res_tx.as_ref().get_ref().clone(); - let item = ChannelInfoQuery { - backend: self.backend.clone(), - channel: channel_name, - kind: SeriesKind::ChannelStatus, - scalar_type: ScalarType::U64, - shape: Shape::Scalar, - tx: Box::pin(SeriesLookupSender { tx }), - }; - self.channel_info_query_queue.push_back(item); - if let Err(_) = cmd.restx.try_send(Ok(())) { - self.stats.command_reply_fail().inc(); + Self::handle_add_channel_existing(cmd, ress)?; } Ok(()) } @@ -564,6 +705,7 @@ impl CaConnSet { CaConnEventValue::ChannelCreateFail(x) => self.handle_channel_create_fail(addr, x), CaConnEventValue::ChannelStatus(st) => self.apply_ca_conn_health_update(addr, st), CaConnEventValue::EndOfStream(reason) => self.handle_ca_conn_eos(addr, reason), + CaConnEventValue::ChannelRemoved(name) => self.handle_ca_conn_channel_removed(addr, name), } } @@ -574,12 +716,12 @@ impl CaConnSet { } else { match res { Ok(res) => { - let channel = Channel::new(res.channel.clone()); + let channel = ChannelName::new(res.channel.clone()); // TODO must not depend on purely informative `self.channel_state` if let Some(st) = self.channel_states.get_mut(&channel) { let cssid = ChannelStatusSeriesId::new(res.series.to_series().id()); self.channel_by_cssid - .insert(cssid.clone(), Channel::new(res.channel.clone())); + .insert(cssid.clone(), ChannelName::new(res.channel.clone())); let add = ChannelAddWithStatusId { ch_cfg: st.config.clone(), cssid, @@ -611,7 +753,7 @@ impl CaConnSet { if trigger.contains(&name) { info!("handle_add_channel_with_status_id {cmd:?}"); } - let ch = Channel::new(name.into()); + let ch = ChannelName::new(name.into()); if let Some(chst) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(chst2) = &mut chst.value { if let ActiveChannelState::WaitForStatusSeriesId { since } = chst2 { @@ -620,8 +762,7 @@ impl CaConnSet { self.cssid_latency_max = dt + Duration::from_millis(2000); debug!("slow cssid fetch dt {:.0} ms {:?}", 1e3 * dt.as_secs_f32(), cmd); } - let mut writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id())) - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let mut writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id()))?; let mut writer_status_state = serieswriter::fixgridwriter::ChannelStatusWriteState::new( SeriesId::new(cmd.cssid.id()), serieswriter::fixgridwriter::CHANNEL_STATUS_GRID, @@ -634,9 +775,7 @@ impl CaConnSet { let state = &mut writer_status_state; let ts_net = Instant::now(); let deque = &mut self.storage_insert_queue_l1; - writer_status - .write(item, state, ts_net, ts, deque) - .map_err(Error::from_string)?; + writer_status.write(item, state, ts_net, ts, deque)?; } *chst2 = ActiveChannelState::WithStatusSeriesId(WithStatusSeriesIdState { cssid: cmd.cssid, @@ -674,20 +813,21 @@ impl CaConnSet { let addr_v4 = if let SocketAddr::V4(x) = cmd.addr { x } else { - return Err(Error::with_msg_no_trace("ipv4 for epics")); + return Err(Error::ExpectIpv4); }; if trigger.contains(&name) { info!("handle_add_channel_with_addr {cmd:?}"); } - let ch = Channel::new(name.into()); + let ch = ChannelName::new(name.into()); if let Some(chst) = self.channel_states.get_mut(&ch) { + // TODO should not have some already stored config. + chst.config = cmd.ch_cfg.clone(); if let ChannelStateValue::Active(ast) = &mut chst.value { if let ActiveChannelState::WithStatusSeriesId(st3) = ast { trace!("handle_add_channel_with_addr INNER {cmd:?}"); self.stats.handle_add_channel_with_addr().inc(); let tsnow = SystemTime::now(); - let mut writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id())) - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let mut writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id()))?; let mut writer_status_state = serieswriter::fixgridwriter::ChannelStatusWriteState::new( SeriesId::new(cmd.cssid.id()), serieswriter::fixgridwriter::CHANNEL_STATUS_GRID, @@ -700,9 +840,7 @@ impl CaConnSet { let state = &mut writer_status_state; let ts_net = Instant::now(); let deque = &mut self.storage_insert_queue_l1; - writer_status - .write(item, state, ts_net, ts, deque) - .map_err(Error::from_string)?; + writer_status.write(item, state, ts_net, ts, deque)?; } *st3 = WithStatusSeriesIdState { cssid: cmd.cssid.clone(), @@ -739,7 +877,7 @@ impl CaConnSet { if self.shutdown_stopping { return Ok(()); } - let ch = Channel::new(cmd.name); + let ch = ChannelName::new(cmd.name); if let Some(k) = self.channel_states.get_mut(&ch) { match &k.value { ChannelStateValue::Active(j) => match j { @@ -767,9 +905,13 @@ impl CaConnSet { WithStatusSeriesIdStateInner::MaybeWrongAddress { .. } => { k.value = ChannelStateValue::ToRemove { addr: None }; } + WithStatusSeriesIdStateInner::UnassigningForConfigChange(..) => { + k.value = ChannelStateValue::ToRemove { addr: None }; + } }, }, ChannelStateValue::ToRemove { .. } => {} + ChannelStateValue::InitDummy { .. } => {} } } Ok(()) @@ -781,7 +923,7 @@ impl CaConnSet { return Ok(()); } for res in results { - let ch = Channel::new(res.channel.clone()); + let ch = ChannelName::new(res.channel.clone()); if trigger.contains(&ch.name()) { info!("handle_ioc_query_result {res:?}"); } @@ -884,14 +1026,16 @@ impl CaConnSet { } fn apply_ca_conn_health_update(&mut self, addr: SocketAddr, res: ChannelStatusPartial) -> Result<(), Error> { - trace2!("apply_ca_conn_health_update {addr}"); + trace_health_update!("apply_ca_conn_health_update {addr}"); let tsnow = SystemTime::now(); self.rogue_channel_count = 0; for (k, v) in res.channel_statuses { + trace_health_update!("self.rogue_channel_count {}", self.rogue_channel_count); + trace_health_update!("apply_ca_conn_health_update {k:?} {v:?}"); let ch = if let Some(x) = self.channel_by_cssid.get(&k) { x } else { - return Err(Error::with_msg_no_trace(format!("unknown cssid {:?}", v.cssid))); + return Err(Error::UnknownCssid); }; if let Some(st1) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(st2) = &mut st1.value { @@ -924,6 +1068,7 @@ impl CaConnSet { self.rogue_channel_count += 1; } } + trace_health_update!("self.rogue_channel_count {}", self.rogue_channel_count); self.stats.channel_rogue.set(self.rogue_channel_count); Ok(()) } @@ -931,7 +1076,7 @@ impl CaConnSet { fn handle_channel_create_fail(&mut self, addr: SocketAddr, name: String) -> Result<(), Error> { trace!("handle_channel_create_fail {addr} {name}"); let tsnow = SystemTime::now(); - let ch = Channel::new(name); + let ch = ChannelName::new(name); if let Some(st1) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(st2) = &mut st1.value { if let ActiveChannelState::WithStatusSeriesId(st3) = st2 { @@ -982,6 +1127,45 @@ impl CaConnSet { Ok(()) } + fn handle_ca_conn_channel_removed(&mut self, addr: SocketAddr, name: String) -> Result<(), Error> { + debug!("handle_ca_conn_channel_removed {addr} {name}"); + let stnow = SystemTime::now(); + let name = ChannelName::new(name); + if let Some(st1) = self.channel_states.get_mut(&name) { + match &mut st1.value { + ChannelStateValue::Active(st2) => match st2 { + ActiveChannelState::Init { .. } => Ok(()), + ActiveChannelState::WaitForStatusSeriesId { .. } => Ok(()), + ActiveChannelState::WithStatusSeriesId(st3) => match &st3.inner { + WithStatusSeriesIdStateInner::AddrSearchPending { .. } => Ok(()), + WithStatusSeriesIdStateInner::WithAddress { .. } => Ok(()), + WithStatusSeriesIdStateInner::UnknownAddress { .. } => Ok(()), + WithStatusSeriesIdStateInner::NoAddress { .. } => Ok(()), + WithStatusSeriesIdStateInner::MaybeWrongAddress(..) => Ok(()), + WithStatusSeriesIdStateInner::UnassigningForConfigChange(st4) => { + st1.config = st4.config_new.clone(); + let cmd = ChannelAddWithAddr { + ch_cfg: st4.config_new.clone(), + cssid: st3.cssid, + addr: st4.addr, + }; + self.handle_add_channel_with_addr(cmd)?; + Ok(()) + } + }, + }, + ChannelStateValue::ToRemove { .. } => { + self.channel_states.remove(&name); + Ok(()) + } + ChannelStateValue::InitDummy => Err(Error::UnexpectedChannelDummyState), + } + } else { + debug!("can not find channel for removed channel {:?}", name); + Ok(()) + } + } + fn handle_connect_fail(&mut self, addr: SocketAddr) -> Result<(), Error> { self.transition_channels_to_maybe_wrong_address(addr)?; Ok(()) @@ -1029,10 +1213,14 @@ impl CaConnSet { UnknownAddress { since: _ } => {} NoAddress { since: _ } => {} MaybeWrongAddress(_) => {} + UnassigningForConfigChange(_) => {} } } }, ChannelStateValue::ToRemove { addr: _ } => {} + ChannelStateValue::InitDummy => { + // TODO must never occur + } } } Ok(()) @@ -1057,7 +1245,7 @@ impl CaConnSet { let addr_v4 = if let SocketAddr::V4(x) = add.addr { x } else { - return Err(Error::with_msg_no_trace("only ipv4 for epics")); + return Err(Error::ExpectIpv4); }; self.stats.create_ca_conn().inc(); let conn = CaConn::new( @@ -1068,7 +1256,7 @@ impl CaConnSet { self.iqtx.clone2(), self.channel_info_query_tx .clone() - .ok_or_else(|| Error::with_msg_no_trace("no more channel_info_query_tx available"))?, + .ok_or_else(|| Error::MissingChannelInfoChannelTx)?, self.ca_conn_stats.clone(), self.ca_proto_stats.clone(), ); @@ -1135,19 +1323,26 @@ impl CaConnSet { | CaConnEventValue::ChannelCreateFail(..) | CaConnEventValue::ChannelStatus(..) => { if let Err(e) = tx1.send((addr, item)).await { - error!("can not deliver error {e}"); - return Err(Error::with_msg_no_trace("can not deliver error")); + error!("channel send {:?}", e); + return Err(e.into()); } } CaConnEventValue::EndOfStream(reason) => { eos_reason = Some(reason); } + CaConnEventValue::ChannelRemoved(_) => { + debug!("ca_conn_item_merge_inner {:?}", item); + if let Err(e) = tx1.send((addr, item)).await { + error!("channel send {:?}", e); + return Err(e.into()); + } + } } } if let Some(x) = eos_reason { Ok(x) } else { - let e = Error::with_msg_no_trace(format!("CaConn gave no reason {addr}")); + let e = Error::CaConnEndWithoutReason; Err(e) } } @@ -1327,17 +1522,13 @@ impl CaConnSet { let item = ChannelStatusItem::new_closed_conn_timeout(stnow, st3.cssid.clone()); let (tsev, val) = item.to_ts_val(); let deque = &mut item_deque; - st3.writer_status - .as_mut() - .unwrap() - .write( - serieswriter::fixgridwriter::ChannelStatusWriteValue::new(tsev, val), - st3.writer_status_state.as_mut().unwrap(), - tsnow, - tsev, - deque, - ) - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + st3.writer_status.as_mut().unwrap().write( + serieswriter::fixgridwriter::ChannelStatusWriteValue::new(tsev, val), + st3.writer_status_state.as_mut().unwrap(), + tsnow, + tsev, + deque, + )?; } } } @@ -1362,11 +1553,19 @@ impl CaConnSet { } } } + WithStatusSeriesIdStateInner::UnassigningForConfigChange(st4) => { + if tsnow.saturating_duration_since(st4.since) >= UNASSIGN_FOR_CONFIG_CHANGE_TIMEOUT { + debug!("timeout unassign for config change"); + } + } }, }, ChannelStateValue::ToRemove { .. } => { // TODO if assigned to some address, } + ChannelStateValue::InitDummy => { + // TODO must never occur + } } if i >= CHECK_CHANS_PER_TICK { self.chan_check_next = Some(ch.clone()); @@ -1394,7 +1593,6 @@ impl CaConnSet { let mut search_pending = 0; let mut no_address = 0; let mut unassigned = 0; - let mut backoff = 0; let mut assigned = 0; let mut connected = 0; let mut maybe_wrong_address = 0; @@ -1439,18 +1637,22 @@ impl CaConnSet { WithStatusSeriesIdStateInner::MaybeWrongAddress { .. } => { maybe_wrong_address += 1; } + WithStatusSeriesIdStateInner::UnassigningForConfigChange(_) => { + assigned += 1; + } }, }, ChannelStateValue::ToRemove { .. } => { unassigned += 1; } + ChannelStateValue::InitDummy => {} } } self.stats.channel_unknown_address.set(unknown_address); self.stats.channel_search_pending.set(search_pending); self.stats.channel_no_address.set(no_address); self.stats.channel_unassigned.set(unassigned); - self.stats.channel_backoff.set(backoff); + // self.stats.channel_backoff.set(backoff); self.stats.channel_assigned.set(assigned); self.stats.channel_connected.set(connected); self.stats.channel_maybe_wrong_address.set(maybe_wrong_address); @@ -1460,8 +1662,10 @@ impl CaConnSet { (search_pending, assigned_without_health_update) } - fn try_push_ca_conn_cmds(&mut self, cx: &mut Context) -> Result<(), Error> { + fn try_push_ca_conn_cmds(&mut self, cx: &mut Context) -> Option>> { use Poll::*; + let mut have_pending = false; + let mut have_progress = false; for (addr, v) in self.ca_conn_ress.iter_mut() { let tx = &mut v.sender; loop { @@ -1469,15 +1673,14 @@ impl CaConnSet { match tx.poll_unpin(cx) { Ready(Ok(())) => { self.stats.try_push_ca_conn_cmds_sent.inc(); + have_progress = true; continue; } Ready(Err(e)) => match e { scywr::senderpolling::Error::NoSendInProgress => { - let e = Error::with_msg_no_trace(format!( - "try_push_ca_conn_cmds E-A {addr} NoSendInProgress" - )); + let e = Error::PushCmdsNoSendInProgress(*addr); error!("{e}"); - return Err(e); + return Some(Ready(Err(e))); } scywr::senderpolling::Error::Closed(_) => { // TODO @@ -1487,7 +1690,9 @@ impl CaConnSet { self.stats.try_push_ca_conn_cmds_closed().inc(); } }, - Pending => {} + Pending => { + have_pending = true; + } } } else if let Some(item) = v.cmd_queue.pop_front() { tx.as_mut().send_pin(item); @@ -1497,7 +1702,13 @@ impl CaConnSet { }; } } - Ok(()) + if have_progress { + Some(Ready(Ok(()))) + } else if have_pending { + Some(Pending) + } else { + None + } } fn handle_own_ticker_tick(mut self: Pin<&mut Self>, cx: &mut Context) -> Result<(), Error> { @@ -1596,10 +1807,9 @@ where on_send_ok(); Some(Ready(Ok(()))) } - Ready(Err(_)) => { - let e = Error::with_msg_no_trace("can not send into channel"); - error!("{e}"); - Some(Ready(Err(e))) + Ready(Err(e)) => { + error!("sender_polling_send {e}"); + Some(Ready(Err(e.into()))) } Pending => Some(Pending), } @@ -1627,7 +1837,7 @@ impl Stream for CaConnSet { .set(self.storage_insert_queue.len() as _); self.stats .channel_info_query_queue_len - .set(self.channel_info_query_queue.len() as _); + .set(self.channel_info_query_qu.len() as _); self.stats .channel_info_query_sender_len .set(self.channel_info_query_sender.len().unwrap_or(0) as _); @@ -1641,7 +1851,8 @@ impl Stream for CaConnSet { let mut penpro = PendingProgress::new(); - if let Err(e) = self.try_push_ca_conn_cmds(cx) { + let res = self.try_push_ca_conn_cmds(cx); + if let Err(e) = merge_pending_progress(res, &mut penpro) { break Ready(Some(CaConnSetItem::Error(e))); } @@ -1715,7 +1926,7 @@ impl Stream for CaConnSet { } { let this = self.as_mut().get_mut(); - let qu = &mut this.channel_info_query_queue; + let qu = &mut this.channel_info_query_qu; let tx = this.channel_info_query_sender.as_mut(); let x = sender_polling_send(qu, tx, cx, || ()); if let Err(e) = merge_pending_progress(x, &mut penpro) { @@ -1797,7 +2008,7 @@ impl Stream for CaConnSet { Pending } else { self.stats.poll_no_progress_no_pending().inc(); - let e = Error::with_msg_no_trace("no progress no pending"); + let e = Error::NoProgressNoPending; Ready(Some(CaConnSetItem::Error(e))) } } diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index c6ac720..13ad68e 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -330,6 +330,35 @@ impl CaEventValue { CaMetaValue::CaMetaVariants(_) => None, } } + + pub fn f32_for_binning(&self) -> f32 { + match &self.data { + CaDataValue::Scalar(val) => { + use super::proto::CaDataScalarValue::*; + match val { + I8(x) => *x as f32, + I16(x) => *x as f32, + I32(x) => *x as f32, + F32(x) => *x as f32, + F64(x) => *x as f32, + Enum(x) => *x as f32, + String(x) => x.len() as f32, + Bool(x) => f32::from(*x), + } + } + CaDataValue::Array(val) => { + use super::proto::CaDataArrayValue::*; + match val { + I8(x) => x.iter().fold(0., |a, x| a + *x as f32), + I16(x) => x.iter().fold(0., |a, x| a + *x as f32), + I32(x) => x.iter().fold(0., |a, x| a + *x as f32), + F32(x) => x.iter().fold(0., |a, x| a + *x as f32), + F64(x) => x.iter().fold(0., |a, x| a + *x as f32), + Bool(x) => x.iter().fold(0., |a, x| a + f32::from(*x)), + } + } + } + } } #[derive(Debug, Clone, PartialEq)] diff --git a/netfetch/src/ca/statemap.rs b/netfetch/src/ca/statemap.rs index 877086d..651b11a 100644 --- a/netfetch/src/ca/statemap.rs +++ b/netfetch/src/ca/statemap.rs @@ -1,6 +1,6 @@ use crate::ca::conn::ChannelStateInfo; use crate::conf::ChannelConfig; -use crate::daemon_common::Channel; +use crate::daemon_common::ChannelName; use dashmap::DashMap; use serde::Serialize; use series::ChannelStatusSeriesId; @@ -9,6 +9,7 @@ use serieswriter::fixgridwriter::ChannelStatusWriteState; use std::collections::btree_map::RangeMut; use std::collections::BTreeMap; use std::collections::HashMap; +use std::net::SocketAddr; use std::net::SocketAddrV4; use std::ops::RangeBounds; use std::time::Duration; @@ -68,6 +69,14 @@ pub struct UnassignedState { unused_since_ts: Instant, } +#[derive(Debug, Clone, Serialize)] +pub struct UnassigningForConfigChangeState { + pub config_new: ChannelConfig, + pub addr: SocketAddr, + #[serde(with = "serde_helper::serde_Instant")] + pub since: Instant, +} + #[derive(Debug, Clone, Serialize)] pub enum WithStatusSeriesIdStateInner { AddrSearchPending { @@ -87,6 +96,7 @@ pub enum WithStatusSeriesIdStateInner { since: SystemTime, }, MaybeWrongAddress(MaybeWrongAddressState), + UnassigningForConfigChange(UnassigningForConfigChangeState), } #[derive(Debug, Clone, Serialize)] @@ -156,22 +166,35 @@ pub enum ActiveChannelState { pub enum ChannelStateValue { Active(ActiveChannelState), ToRemove { addr: Option }, + InitDummy, } #[derive(Debug, Clone, Serialize)] pub struct ChannelState { pub value: ChannelStateValue, pub config: ChannelConfig, + pub touched: u8, +} + +impl ChannelState { + // TODO remove when no longer needed + pub fn is_dummy(&self) -> bool { + if let ChannelStateValue::InitDummy = self.value { + true + } else { + false + } + } } #[derive(Debug, Serialize)] pub struct ChannelStateMap { - map: BTreeMap, + map: BTreeMap, #[serde(skip)] - map2: HashMap, + map2: HashMap, // TODO implement same interface via dashmap and compare #[serde(skip)] - map3: DashMap, + map3: DashMap, } impl ChannelStateMap { @@ -183,20 +206,31 @@ impl ChannelStateMap { } } - pub fn insert(&mut self, k: Channel, v: ChannelState) -> Option { + pub fn insert(&mut self, k: ChannelName, v: ChannelState) -> Option { self.map.insert(k, v) } - pub fn get_mut(&mut self, k: &Channel) -> Option<&mut ChannelState> { - self.map.iter_mut(); + pub fn get_mut(&mut self, k: &ChannelName) -> Option<&mut ChannelState> { self.map.get_mut(k) } - pub fn iter(&self) -> impl Iterator { + pub fn get_mut_or_dummy_init(&mut self, k: &ChannelName) -> &mut ChannelState { + if !self.map.contains_key(k) { + let dummy = ChannelState { + value: ChannelStateValue::InitDummy, + config: ChannelConfig::dummy(), + touched: 0, + }; + self.map.insert(k.clone(), dummy); + } + self.map.get_mut(k).unwrap() + } + + pub fn iter(&self) -> impl Iterator { self.map.iter() } - pub fn iter_mut(&mut self) -> impl Iterator { + pub fn iter_mut(&mut self) -> impl Iterator { self.map.iter_mut() } @@ -204,12 +238,16 @@ impl ChannelStateMap { todo!() } - pub fn range_mut(&mut self, range: R) -> RangeMut + pub fn range_mut(&mut self, range: R) -> RangeMut where - R: RangeBounds, + R: RangeBounds, { self.map.range_mut(range) } + + pub fn remove(&mut self, k: &ChannelName) -> Option { + self.map.remove(k) + } } pub struct ChannelStateIter<'a> { diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 1ed4e7a..7ac2a91 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -54,6 +54,10 @@ impl CaIngestOpts { self.api_bind.clone() } + pub fn channels(&self) -> Option { + self.channels.clone() + } + pub fn udp_broadcast_bind(&self) -> Option<&str> { self.udp_broadcast_bind.as_ref().map(String::as_str) } @@ -178,61 +182,9 @@ fn test_duration_parse() { assert_eq!(a.dur, Duration::from_millis(3170)); } -pub async fn parse_config(config: PathBuf) -> Result<(CaIngestOpts, Option), Error> { - let mut file = OpenOptions::new().read(true).open(config).await?; - let mut buf = Vec::new(); - file.read_to_end(&mut buf).await?; - let conf: CaIngestOpts = serde_yaml::from_slice(&buf).map_err(Error::from_string)?; - drop(file); - let re_p = regex::Regex::new(&conf.whitelist.clone().unwrap_or("--nothing-ur9nc23ur98c--".into()))?; - let re_n = regex::Regex::new(&conf.blacklist.clone().unwrap_or("--nothing-ksm2u98rcm28--".into()))?; - let channels = if let Some(fname) = conf.channels.as_ref() { - let meta = tokio::fs::metadata(fname).await?; - if meta.is_file() { - if fname.ends_with(".txt") { - Some(parse_channel_config_txt(fname, re_p, re_n).await?) - } else { - let e = Error::with_msg_no_trace(format!("unsupported channel config file {:?}", fname)); - return Err(e); - } - } else if meta.is_dir() { - Some(parse_config_dir(&fname).await?) - } else { - let e = Error::with_msg_no_trace(format!("unsupported channel config input {:?}", fname)); - return Err(e); - } - } else { - None - }; - Ok((conf, channels)) -} - -async fn parse_config_dir(dir: &Path) -> Result { - let mut ret = ChannelsConfig::new(); - let mut rd = tokio::fs::read_dir(dir).await?; - loop { - let e = rd.next_entry().await?; - let e = if let Some(x) = e { - x - } else { - break; - }; - let fnp = e.path(); - let fns = fnp.to_str().unwrap(); - if fns.ends_with(".yml") || fns.ends_with(".yaml") { - let buf = tokio::fs::read(e.path()).await?; - let conf: BTreeMap = - serde_yaml::from_slice(&buf).map_err(Error::from_string)?; - info!("parsed {} channels from {}", conf.len(), fns); - ret.push_from_parsed(&conf); - } else { - debug!("ignore channel config file {:?}", e.path()); - } - } - Ok(ret) -} - -async fn parse_channel_config_txt(fname: &Path, re_p: Regex, re_n: Regex) -> Result { +async fn parse_channel_config_txt(fname: &Path) -> Result { + let re_p = Regex::new("--------------------------").unwrap(); + let re_n = Regex::new("--------------------------").unwrap(); let mut file = OpenOptions::new().read(true).open(fname).await?; let mut buf = Vec::new(); file.read_to_end(&mut buf).await?; @@ -269,12 +221,70 @@ async fn parse_channel_config_txt(fname: &Path, re_p: Regex, re_n: Regex) -> Res Ok(conf) } +pub async fn parse_channels(channels_dir: Option) -> Result, Error> { + if let Some(fname) = channels_dir.as_ref() { + let meta = tokio::fs::metadata(fname).await?; + if meta.is_file() { + if fname.ends_with(".txt") { + Ok(Some(parse_channel_config_txt(fname).await?)) + } else { + let e = Error::with_msg_no_trace(format!("unsupported channel config file {:?}", fname)); + return Err(e); + } + } else if meta.is_dir() { + Ok(Some(parse_config_dir(&fname).await?)) + } else { + let e = Error::with_msg_no_trace(format!("unsupported channel config input {:?}", fname)); + return Err(e); + } + } else { + Ok(None) + } +} + +pub async fn parse_config(config: PathBuf) -> Result<(CaIngestOpts, Option), Error> { + let mut file = OpenOptions::new().read(true).open(config).await?; + let mut buf = Vec::new(); + file.read_to_end(&mut buf).await?; + let conf: CaIngestOpts = serde_yaml::from_slice(&buf).map_err(Error::from_string)?; + drop(file); + // let re_p = regex::Regex::new(&conf.whitelist.clone().unwrap_or("--nothing-ur9nc23ur98c--".into()))?; + // let re_n = regex::Regex::new(&conf.blacklist.clone().unwrap_or("--nothing-ksm2u98rcm28--".into()))?; + let channels = parse_channels(conf.channels.clone()).await?; + Ok((conf, channels)) +} + +async fn parse_config_dir(dir: &Path) -> Result { + let mut ret = ChannelsConfig::new(); + let mut rd = tokio::fs::read_dir(dir).await?; + loop { + let e = rd.next_entry().await?; + let e = if let Some(x) = e { + x + } else { + break; + }; + let fnp = e.path(); + let fns = fnp.to_str().unwrap(); + if fns.ends_with(".yml") || fns.ends_with(".yaml") { + let buf = tokio::fs::read(e.path()).await?; + let conf: BTreeMap = + serde_yaml::from_slice(&buf).map_err(Error::from_string)?; + info!("parsed {} channels from {}", conf.len(), fns); + ret.push_from_parsed(&conf); + } else { + debug!("ignore channel config file {:?}", e.path()); + } + } + Ok(ret) +} + #[derive(Debug, Deserialize)] pub struct ChannelConfigParse { archiving_configuration: IngestConfigArchiving, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum ChannelTimestamp { Archiver, IOC, @@ -286,7 +296,7 @@ impl ChannelTimestamp { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct IngestConfigArchiving { #[serde(default = "bool_true")] #[serde(with = "serde_replication_bool")] @@ -306,6 +316,20 @@ pub struct IngestConfigArchiving { timestamp: ChannelTimestamp, } +impl IngestConfigArchiving { + // TODO remove when no longer needed + pub fn dummy() -> Self { + Self { + replication: false, + short_term: None, + medium_term: None, + long_term: None, + is_polled: false, + timestamp: ChannelTimestamp::Archiver, + } + } +} + fn bool_is_false(x: &bool) -> bool { *x == false } @@ -545,7 +569,7 @@ impl From> for ChannelsConfig { } } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, PartialEq, Serialize)] pub struct ChannelConfig { name: String, arch: IngestConfigArchiving, @@ -640,4 +664,12 @@ impl ChannelConfig { }, } } + + // TODO remove when no longer needed. + pub fn dummy() -> Self { + Self { + name: String::from("dummy"), + arch: IngestConfigArchiving::dummy(), + } + } } diff --git a/netfetch/src/daemon_common.rs b/netfetch/src/daemon_common.rs index 781d2c2..b793257 100644 --- a/netfetch/src/daemon_common.rs +++ b/netfetch/src/daemon_common.rs @@ -4,11 +4,11 @@ use async_channel::Sender; use serde::Serialize; #[derive(Clone, Debug, Serialize, PartialEq, PartialOrd, Eq, Ord, Hash)] -pub struct Channel { +pub struct ChannelName { name: String, } -impl Channel { +impl ChannelName { pub fn new(name: String) -> Self { Self { name } } @@ -18,13 +18,14 @@ impl Channel { } } -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum DaemonEvent { TimerTick(u32, Sender), ChannelAdd(ChannelConfig, crate::ca::conn::CmdResTx), - ChannelRemove(Channel), + ChannelRemove(ChannelName), CaConnSetItem(CaConnSetItem), Shutdown, + ConfigReload(async_channel::Sender), } impl DaemonEvent { @@ -36,6 +37,7 @@ impl DaemonEvent { ChannelRemove(x) => format!("ChannelRemove {x:?}"), CaConnSetItem(_) => format!("CaConnSetItem"), Shutdown => format!("Shutdown"), + ConfigReload(..) => format!("ConfigReload"), } } } diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 3d96be5..4cd489b 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -182,6 +182,24 @@ async fn always_error(params: HashMap) -> Result) -> Result, Response> { + let (tx, rx) = async_channel::bounded(10); + let item = DaemonEvent::ConfigReload(tx); + dcom.tx.send(item).await; + match rx.recv().await { + Ok(x) => { + let res = serde_json::json!({"result":{"ok":x}}); + let ret = serde_json::to_value(&res).unwrap(); + Ok(axum::Json(ret)) + } + Err(e) => { + let res = serde_json::json!({"result":{"err":"recverr"}}); + let ret = serde_json::to_value(&res).unwrap(); + Ok(axum::Json(ret)) + } + } +} + async fn find_channel( params: HashMap, dcom: Arc, @@ -378,6 +396,19 @@ fn make_routes( }), ), ) + .nest( + "/config", + Router::new() + .route("/", get(|| async { axum::Json(json!({"__tmp":"slashed"})) })) + .route("//", get(|| async { axum::Json(json!({"__tmp":"doubleslashed"})) })) + .route( + "/reload", + get({ + let dcom = dcom.clone(); + || config_reload(dcom) + }), + ), + ) .nest( "/channel", Router::new() diff --git a/netfetch/src/metrics/status.rs b/netfetch/src/metrics/status.rs index 0bd1370..de6e389 100644 --- a/netfetch/src/metrics/status.rs +++ b/netfetch/src/metrics/status.rs @@ -255,11 +255,16 @@ async fn channel_states_try( ChannelState::connecting_addr(st1.config, None, ConnectionState::Unreachable); states.channels.insert(k, chst); } + WithStatusSeriesIdStateInner::UnassigningForConfigChange(_) => { + let chst = ChannelState::connecting_addr(st1.config, None, ConnectionState::Connecting); + states.channels.insert(k, chst); + } } } } } ChannelStateValue::ToRemove { .. } => {} + ChannelStateValue::InitDummy { .. } => {} } } Ok(axum::Json(states)) diff --git a/netfetch/src/throttletrace.rs b/netfetch/src/throttletrace.rs index 2c6a0c4..401c8e1 100644 --- a/netfetch/src/throttletrace.rs +++ b/netfetch/src/throttletrace.rs @@ -37,4 +37,15 @@ impl ThrottleTrace { } } } + + pub fn is_action(&mut self) -> bool { + self.count += 1; + let tsnow = Instant::now(); + if self.next <= tsnow { + self.next = tsnow + self.ivl; + true + } else { + false + } + } } diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index cd7f7da..d324824 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -8,6 +8,7 @@ use crate::iteminsertqueue::InsertItem; use crate::iteminsertqueue::MspItem; use crate::iteminsertqueue::QueryItem; use crate::iteminsertqueue::TimeBinSimpleF32; +use crate::iteminsertqueue::TimeBinSimpleF32V01; use crate::store::DataStore; use async_channel::Receiver; use atomic::AtomicU64; @@ -274,6 +275,9 @@ where QueryItem::Insert(item) => prepare_query_insert_futs(item, &data_store, &stats, tsnow), QueryItem::Msp(item) => prepare_msp_insert_futs(item, &data_store, &stats, tsnow), QueryItem::TimeBinSimpleF32(item) => prepare_timebin_insert_futs(item, &data_store, &stats, tsnow), + QueryItem::TimeBinSimpleF32V01(item) => { + prepare_timebin_v01_insert_futs(item, &data_store, &stats, tsnow) + } QueryItem::Accounting(item) => prepare_accounting_insert_futs(item, &data_store, &stats, tsnow), QueryItem::AccountingRecv(item) => { prepare_accounting_recv_insert_futs(item, &data_store, &stats, tsnow) @@ -305,6 +309,9 @@ fn inspect_items( QueryItem::TimeBinSimpleF32(_) => { trace_item_execute!("execute {worker_name} TimeBinSimpleF32"); } + QueryItem::TimeBinSimpleF32V01(_) => { + trace_item_execute!("execute {worker_name} TimeBinSimpleF32V01"); + } QueryItem::Accounting(_) => { trace_item_execute!("execute {worker_name} Accounting {item:?}"); } @@ -400,6 +407,48 @@ fn prepare_timebin_insert_futs( futs } +fn prepare_timebin_v01_insert_futs( + item: TimeBinSimpleF32V01, + data_store: &Arc, + stats: &Arc, + tsnow: Instant, +) -> SmallVec<[InsertFut; 4]> { + trace!("have time bin patch to insert: {item:?}"); + let params = ( + item.series.id() as i64, + item.bin_len_ms, + item.ts_msp.to_i64(), + item.off, + item.count, + item.min, + item.max, + item.avg, + ); + // TODO would be better to count inserts only on completed insert + stats.inserted_binned().inc(); + let fut = InsertFut::new( + data_store.scy.clone(), + data_store.qu_insert_binned_scalar_f32_v01.clone(), + params, + tsnow, + stats.clone(), + ); + let futs = smallvec![fut]; + + // TODO match on the query result: + // match qres { + // Ok(_) => { + // backoff = backoff_0; + // } + // Err(e) => { + // stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e)); + // back_off_sleep(&mut backoff).await; + // } + // } + + futs +} + fn prepare_accounting_insert_futs( item: Accounting, data_store: &Arc, diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 576a071..e4fb115 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -98,6 +98,25 @@ impl ScalarValue { ScalarValue::Bool(x) => x.to_string(), } } + + pub fn f32_for_binning(&self) -> f32 { + use ScalarValue::*; + match self { + U8(x) => *x as f32, + U16(x) => *x as f32, + U32(x) => *x as f32, + U64(x) => *x as f32, + I8(x) => *x as f32, + I16(x) => *x as f32, + I32(x) => *x as f32, + I64(x) => *x as f32, + F32(x) => *x as f32, + F64(x) => *x as f32, + Enum(x, _) => *x as f32, + String(x) => x.len() as f32, + Bool(x) => f32::from(*x), + } + } } #[derive(Clone, Debug, PartialEq)] @@ -294,6 +313,23 @@ impl ArrayValue { Bool(x) => format!("{}", x.get(0).map_or(false, |x| *x)), } } + + pub fn f32_for_binning(&self) -> f32 { + use ArrayValue::*; + match self { + U8(x) => x.iter().fold(0., |a, x| a + *x as f32), + U16(x) => x.iter().fold(0., |a, x| a + *x as f32), + U32(x) => x.iter().fold(0., |a, x| a + *x as f32), + U64(x) => x.iter().fold(0., |a, x| a + *x as f32), + I8(x) => x.iter().fold(0., |a, x| a + *x as f32), + I16(x) => x.iter().fold(0., |a, x| a + *x as f32), + I32(x) => x.iter().fold(0., |a, x| a + *x as f32), + I64(x) => x.iter().fold(0., |a, x| a + *x as f32), + F32(x) => x.iter().fold(0., |a, x| a + *x as f32), + F64(x) => x.iter().fold(0., |a, x| a + *x as f32), + Bool(x) => x.iter().fold(0., |a, x| a + f32::from(*x)), + } + } } #[derive(Clone, Debug, PartialEq)] @@ -323,6 +359,13 @@ impl DataValue { DataValue::Array(x) => x.string_short(), } } + + pub fn f32_for_binning(&self) -> f32 { + match self { + DataValue::Scalar(x) => x.f32_for_binning(), + DataValue::Array(x) => x.f32_for_binning(), + } + } } pub trait GetValHelp { @@ -506,12 +549,25 @@ pub struct TimeBinSimpleF32 { pub avg: f32, } +#[derive(Debug, Clone)] +pub struct TimeBinSimpleF32V01 { + pub series: SeriesId, + pub bin_len_ms: i32, + pub ts_msp: TsMs, + pub off: i32, + pub count: i64, + pub min: f32, + pub max: f32, + pub avg: f32, +} + // Needs to be Clone to send it to multiple retention times if required. #[derive(Debug, Clone)] pub enum QueryItem { Insert(InsertItem), Msp(MspItem), TimeBinSimpleF32(TimeBinSimpleF32), + TimeBinSimpleF32V01(TimeBinSimpleF32V01), Accounting(Accounting), AccountingRecv(AccountingRecv), } diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index ed6144c..7e30f3f 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -674,6 +674,28 @@ pub async fn migrate_scylla_data_schema( ); tab.setup(do_change, scy).await?; } + { + let tab = GenTwcsTab::new( + ks, + rett.table_prefix(), + "binned_scalar_f32_v01", + &[ + ("series", "bigint"), + ("bin_len_ms", "int"), + ("ts_msp", "bigint"), + ("off", "int"), + ("count", "bigint"), + ("min", "float"), + ("max", "float"), + ("avg", "float"), + ], + ["series", "bin_len_ms", "ts_msp"], + ["off"], + rett.ttl_binned(), + ); + let do_change = true; + tab.setup(do_change, scy).await?; + } { let tab = GenTwcsTab::new( ks, diff --git a/scywr/src/store.rs b/scywr/src/store.rs index c4f761f..a5b5acb 100644 --- a/scywr/src/store.rs +++ b/scywr/src/store.rs @@ -45,6 +45,7 @@ pub struct DataStore { pub qu_insert_array_f32: Arc, pub qu_insert_array_f64: Arc, pub qu_insert_array_bool: Arc, + pub qu_insert_binned_scalar_f32_v01: Arc, pub qu_insert_binned_scalar_f32_v02: Arc, pub qu_account_00: Arc, pub qu_account_recv_00: Arc, @@ -149,6 +150,14 @@ impl DataStore { let qu_insert_array_f64 = prep_qu_ins_b!("events_array_f64", rett, scy); let qu_insert_array_bool = prep_qu_ins_b!("events_array_bool", rett, scy); + let qu_insert_binned_scalar_f32_v01 = prep_qu_ins_c!( + "binned_scalar_f32_v01", + "series, bin_len_ms, ts_msp, off, count, min, max, avg", + "?, ?, ?, ?, ?, ?, ?, ?", + rett, + scy + ); + let qu_insert_binned_scalar_f32_v02 = prep_qu_ins_c!( "binned_scalar_f32", "series, bin_len_ms, ts_msp, off, count, min, max, avg", @@ -210,6 +219,7 @@ impl DataStore { qu_insert_array_f32, qu_insert_array_f64, qu_insert_array_bool, + qu_insert_binned_scalar_f32_v01, qu_insert_binned_scalar_f32_v02, qu_account_00, qu_account_recv_00, diff --git a/serieswriter/src/binwriter.rs b/serieswriter/src/binwriter.rs index e1b5776..ad4ee08 100644 --- a/serieswriter/src/binwriter.rs +++ b/serieswriter/src/binwriter.rs @@ -1,31 +1,31 @@ -use crate::timebin::ConnTimeBin; -use crate::writer::SeriesWriter; -use async_channel::Sender; use err::thiserror; use err::ThisError; +use items_2::binning::container_events::ContainerEvents; +use items_2::binning::timeweight::timeweight_events::BinnedEventsTimeweight; use netpod::log::*; +use netpod::range::evrange::NanoRange; use netpod::ttl::RetentionTime; -use netpod::DtNano; +use netpod::BinnedRange; +use netpod::DtMs; use netpod::ScalarType; use netpod::Shape; +use netpod::TsMs; use netpod::TsNano; use scywr::insertqueues::InsertDeques; -use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::QueryItem; +use scywr::iteminsertqueue::TimeBinSimpleF32V01; use series::ChannelStatusSeriesId; use series::SeriesId; -use std::collections::VecDeque; -use std::time::Duration; -use std::time::SystemTime; +use std::mem; #[allow(unused)] -macro_rules! trace_binning { - ($($arg:tt)*) => { - if false { - trace!($($arg)*); - } - }; -} +macro_rules! trace_ingest { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } + +#[allow(unused)] +macro_rules! trace_tick { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } + +#[allow(unused)] +macro_rules! trace_tick_verbose { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } #[derive(Debug, ThisError)] #[cstm(name = "SerieswriterBinwriter")] @@ -33,15 +33,19 @@ pub enum Error { SeriesLookupError, SeriesWriter(#[from] crate::writer::Error), Timebin(#[from] crate::timebin::Error), + Binning(#[from] items_2::binning::timeweight::timeweight_events::Error), + UnsupportedBinGrid(DtMs), } #[derive(Debug)] pub struct BinWriter { rt: RetentionTime, + cssid: ChannelStatusSeriesId, sid: SeriesId, scalar_type: ScalarType, shape: Shape, - binner: ConnTimeBin, + evbuf: ContainerEvents, + binner: BinnedEventsTimeweight, } impl BinWriter { @@ -57,20 +61,17 @@ impl BinWriter { // TODO select the desired bin width based on channel configuration: // that's user knowledge, it really depends on what users want. // For the moment, assume a fixed value. - let bin_len = DtNano::from_ms(1000 * 10); - let binner = ConnTimeBin::new( - rt.clone(), - sid.clone(), - beg, - bin_len, - scalar_type.clone(), - shape.clone(), - )?; + let margin = 1000 * 1000 * 1000 * 60 * 60 * 24 * 40; + let end = u64::MAX - margin; + let range = BinnedRange::from_nano_range(NanoRange::from_ns_u64(beg.ns(), end), DtMs::from_ms_u64(1000 * 10)); + let binner = BinnedEventsTimeweight::new(range); let ret = Self { rt, + cssid, sid, scalar_type, shape, + evbuf: ContainerEvents::new(), binner, }; Ok(ret) @@ -88,20 +89,56 @@ impl BinWriter { self.shape.clone() } - pub fn ingest( - &mut self, - ts_ioc: TsNano, - ts_local: TsNano, - val: &DataValue, - iqdqs: &mut InsertDeques, - ) -> Result<(), Error> { - let ts_main = ts_local; - self.binner.push(ts_main.clone(), val)?; + pub fn ingest(&mut self, ts_local: TsNano, val: f32, iqdqs: &mut InsertDeques) -> Result<(), Error> { + let _ = iqdqs; + trace_ingest!("ingest {ts_local} {val}"); + self.evbuf.push_back(ts_local, val); Ok(()) } pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { - self.binner.tick(iqdqs)?; + if self.evbuf.len() != 0 { + trace_tick!("tick evbuf len {}", self.evbuf.len()); + let buf = mem::replace(&mut self.evbuf, ContainerEvents::new()); + self.binner.ingest(buf)?; + } else { + trace_tick_verbose!("tick NOTHING TO INGEST"); + } + let out = self.binner.output(); + if out.len() != 0 { + trace_tick!("bins ready len {}", out.len()); + for e in out.iter_debug() { + trace_tick_verbose!("{e:?}"); + } + for ((((((&ts1, &ts2), &cnt), &min), &max), &avg), &fnl) in out.zip_iter() { + if fnl == false { + debug!("non final bin"); + } else { + let bin_len = DtMs::from_ms_u64(ts2.delta(ts1).ms_u64()); + let div = if bin_len == DtMs::from_ms_u64(1000 * 10) { + DtMs::from_ms_u64(1000 * 60 * 60 * 2) + } else { + // TODO + return Err(Error::UnsupportedBinGrid(bin_len)); + }; + let ts_msp = TsMs::from_ms_u64(ts1.ms() / div.ms() * div.ms()); + let off = (ts1.ms() - ts_msp.ms()) / bin_len.ms(); + let item = QueryItem::TimeBinSimpleF32V01(TimeBinSimpleF32V01 { + series: self.sid.clone(), + bin_len_ms: bin_len.ms() as i32, + ts_msp, + off: off as i32, + count: cnt as i64, + min, + max, + avg, + }); + iqdqs.lt_rf3_qu.push_back(item); + } + } + } else { + trace_tick_verbose!("tick NO BINS YET"); + } Ok(()) } } diff --git a/stats/src/stats.rs b/stats/src/stats.rs index c1ef8ff..4211d74 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -308,6 +308,7 @@ stats_proc::stats_struct!(( insert_worker_join_ok_err, insert_worker_join_err, caconnset_health_response, + channel_send_err, ), values( channel_unknown_address, @@ -402,6 +403,7 @@ stats_proc::stats_struct!(( monitor_stale_read_timeout, ca_proto_no_version_as_first, ca_proto_version_later, + no_cid_for_subid, ), values(inter_ivl_ema, read_ioids_len, proto_out_len,), histolog2s(