diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 6cf0a08..334798c 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -20,7 +20,6 @@ use conn::ChannelStateInfo; use conn::ChannelStatusPartial; use conn::ConnCommand; use conn::ConnCommandResult; -use core::fmt; use dbpg::seriesbychannel::BoxedSend; use dbpg::seriesbychannel::CanSendChannelInfoResult; use dbpg::seriesbychannel::ChannelInfoQuery; @@ -55,6 +54,7 @@ use stats::CaProtoStats; use stats::IocFinderStats; use std::collections::BTreeMap; use std::collections::VecDeque; +use std::fmt; use std::net::SocketAddr; use std::net::SocketAddrV4; use std::pin::Pin; @@ -584,13 +584,7 @@ impl CaConnSet { fn handle_add_channel_new(cmd: ChannelAdd, ress: StateTransRes) -> Result<(), Error> { { - let item = ChannelState { - value: ChannelStateValue::Active(ActiveChannelState::WaitForStatusSeriesId { - since: SystemTime::now(), - }), - config: cmd.ch_cfg.clone(), - touched: 1, - }; + let item = ChannelState::new_wait_for_cssid(&cmd.ch_cfg); *ress.chst = item; } { diff --git a/netfetch/src/ca/statemap.rs b/netfetch/src/ca/statemap.rs index 95bdfef..40ff236 100644 --- a/netfetch/src/ca/statemap.rs +++ b/netfetch/src/ca/statemap.rs @@ -188,6 +188,7 @@ pub struct ChannelState { pub value: ChannelStateValue, pub config: ChannelConfig, pub touched: u8, + config_file_basename: String, } impl ChannelState { @@ -199,6 +200,26 @@ impl ChannelState { false } } + + pub fn new_dummy() -> Self { + Self { + value: ChannelStateValue::InitDummy, + config: ChannelConfig::dummy(), + touched: 0, + config_file_basename: String::new(), + } + } + + pub fn new_wait_for_cssid(ch_cfg: &crate::conf::ChannelConfig) -> Self { + Self { + value: ChannelStateValue::Active(ActiveChannelState::WaitForStatusSeriesId { + since: SystemTime::now(), + }), + config: ch_cfg.clone(), + touched: 1, + config_file_basename: ch_cfg.config_file_basename().into(), + } + } } #[derive(Debug, Serialize)] @@ -230,11 +251,7 @@ impl ChannelStateMap { pub fn get_mut_or_dummy_init(&mut self, k: &ChannelName) -> &mut ChannelState { if !self.map.contains_key(k) { - let dummy = ChannelState { - value: ChannelStateValue::InitDummy, - config: ChannelConfig::dummy(), - touched: 0, - }; + let dummy = ChannelState::new_dummy(); self.map.insert(k.clone(), dummy); } self.map.get_mut(k).unwrap() diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 869c4a8..f09ac5e 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -43,6 +43,7 @@ pub struct CaIngestOpts { pub test_bsread_addr: Option, #[serde(default)] scylla_disable: bool, + #[serde(default)] scylla_ignore_writes: bool, } @@ -188,6 +189,7 @@ fn test_duration_parse() { } async fn parse_channel_config_txt(fname: &Path) -> Result { + let basename = fname.file_stem().unwrap().to_str().unwrap(); let re_p = Regex::new("--------------------------").unwrap(); let re_n = Regex::new("--------------------------").unwrap(); let mut file = OpenOptions::new().read(true).open(fname).await?; @@ -218,6 +220,7 @@ async fn parse_channel_config_txt(fname: &Path) -> Result is_polled: false, timestamp: ChannelTimestamp::Archiver, }, + config_file_basename: basename.to_string(), }; conf.channels.push(item); } @@ -272,11 +275,12 @@ async fn parse_config_dir(dir: &Path) -> Result { let fnp = e.path(); let fns = fnp.to_str().unwrap(); if fns.ends_with(".yml") || fns.ends_with(".yaml") { + let basename = fnp.file_stem().unwrap().to_str().unwrap(); let buf = tokio::fs::read(e.path()).await?; let conf: BTreeMap = serde_yaml::from_slice(&buf).map_err(Error::from_string)?; info!("parsed {} channels from {}", conf.len(), fns); - ret.push_from_parsed(&conf); + ret.push_from_parsed(&conf, basename); } else { debug!("ignore channel config file {:?}", e.path()); } @@ -299,23 +303,31 @@ impl ChannelTimestamp { fn default_config() -> Self { Self::Archiver } + + fn is_default(&self) -> bool { + if let ChannelTimestamp::Archiver = self { + true + } else { + false + } + } } -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Deserialize)] pub struct IngestConfigArchiving { #[serde(default = "bool_true")] #[serde(with = "serde_replication_bool")] replication: bool, - #[serde(default, skip_serializing_if = "Option::is_none")] + #[serde(default)] #[serde(with = "serde_option_channel_read_config")] short_term: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] + #[serde(default)] #[serde(with = "serde_option_channel_read_config")] medium_term: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] + #[serde(default)] #[serde(with = "serde_option_channel_read_config")] long_term: Option, - #[serde(default, skip_serializing_if = "bool_is_false")] + #[serde(default)] is_polled: bool, #[serde(default = "ChannelTimestamp::default_config")] timestamp: ChannelTimestamp, @@ -343,6 +355,71 @@ fn bool_true() -> bool { true } +mod serde_ingest_config_archiving { + use super::ChannelReadConfigApiFormat; + use super::IngestConfigArchiving; + use serde::de; + use serde::ser; + use serde::ser::SerializeMap; + use serde::Deserializer; + use serde::Serializer; + use std::fmt; + + impl ser::Serialize for IngestConfigArchiving { + fn serialize(&self, ser: S) -> Result + where + S: Serializer, + { + let mut map = ser.serialize_map(None)?; + // ser.is_human_readable() + if !self.replication { + map.serialize_entry("replication", &self.replication)?; + } + if let Some(v) = self.short_term.as_ref() { + map.serialize_entry("short_term", &ChannelReadConfigApiFormat(&v))?; + } + if let Some(v) = self.medium_term.as_ref() { + map.serialize_entry("medium_term", &ChannelReadConfigApiFormat(&v))?; + } + if let Some(v) = self.long_term.as_ref() { + map.serialize_entry("long_term", &ChannelReadConfigApiFormat(&v))?; + } + let anymon = [&self.short_term, &self.medium_term, &self.long_term] + .into_iter() + .map(|c| c.as_ref().map_or(false, |x| x.is_monitor())) + .fold(false, |a, x| a || x); + if anymon && self.is_polled || !anymon && !self.is_polled { + map.serialize_entry("is_polled", &self.is_polled)?; + } + if !self.timestamp.is_default() { + map.serialize_entry("timestamp", &self.timestamp)?; + } + map.end() + } + } +} + +struct ChannelReadConfigApiFormat<'a>(&'a ChannelReadConfig); + +#[allow(non_snake_case)] +mod serde_ChannelReadConfigApiFormat { + use super::ChannelReadConfig; + use super::ChannelReadConfigApiFormat; + use serde::ser; + + impl<'a> ser::Serialize for ChannelReadConfigApiFormat<'a> { + fn serialize(&self, ser: S) -> Result + where + S: ser::Serializer, + { + match &self.0 { + ChannelReadConfig::Monitor => ser.serialize_str("Monitor"), + ChannelReadConfig::Poll(n) => ser.serialize_u32(n.as_secs() as u32), + } + } + } +} + mod serde_replication_bool { use serde::de; use serde::Deserializer; @@ -489,6 +566,16 @@ pub enum ChannelReadConfig { Poll(Duration), } +impl ChannelReadConfig { + pub fn is_monitor(&self) -> bool { + if let Self::Monitor = self { + true + } else { + false + } + } +} + #[test] fn test_channel_config_00() { let inp = r###" @@ -546,38 +633,27 @@ impl ChannelsConfig { &self.channels } - fn push_from_parsed(&mut self, rhs: &BTreeMap) { + fn push_from_parsed(&mut self, rhs: &BTreeMap, config_file_basename: &str) { for (k, v) in rhs.iter() { let item = ChannelConfig { name: k.into(), arch: v.archiving_configuration.clone(), + config_file_basename: config_file_basename.into(), }; self.channels.push(item); } } } -impl From> for ChannelsConfig { - fn from(value: BTreeMap) -> Self { - let channels = value - .into_iter() - .map(|(k, v)| ChannelConfig { - name: k, - arch: v.archiving_configuration, - }) - .collect(); - ChannelsConfig { channels } - } -} - #[derive(Debug, Clone, PartialEq, Serialize)] pub struct ChannelConfig { name: String, arch: IngestConfigArchiving, + config_file_basename: String, } impl ChannelConfig { - pub fn st_monitor>(name: S) -> Self { + pub fn st_monitor>(name: S, config_file_basename: &str) -> Self { Self { name: name.into(), arch: IngestConfigArchiving { @@ -588,6 +664,7 @@ impl ChannelConfig { is_polled: false, timestamp: ChannelTimestamp::Archiver, }, + config_file_basename: config_file_basename.into(), } } @@ -595,6 +672,10 @@ impl ChannelConfig { &self.name } + pub fn config_file_basename(&self) -> &str { + &self.config_file_basename + } + pub fn is_polled(&self) -> bool { self.arch.is_polled } @@ -678,6 +759,18 @@ impl ChannelConfig { Self { name: String::from("dummy"), arch: IngestConfigArchiving::dummy(), + config_file_basename: String::new(), } } } + +#[derive(Debug, Clone, PartialEq, Serialize)] +pub struct ChannelConfigForStatesApi { + arch: IngestConfigArchiving, +} + +impl From for ChannelConfigForStatesApi { + fn from(value: ChannelConfig) -> Self { + Self { arch: value.arch } + } +} diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 92303d7..0d7552c 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -215,7 +215,7 @@ async fn find_channel( async fn channel_add_inner(params: HashMap, dcom: Arc) -> Result<(), Error> { if let Some(name) = params.get("name") { // let ch = crate::daemon_common::Channel::new(name.into()); - let ch_cfg = ChannelConfig::st_monitor(name); + let ch_cfg = ChannelConfig::st_monitor(name, "api"); let (tx, rx) = async_channel::bounded(1); let ev = DaemonEvent::ChannelAdd(ch_cfg, tx); dcom.tx.send(ev).await?; diff --git a/netfetch/src/metrics/status.rs b/netfetch/src/metrics/status.rs index 7960959..78ac216 100644 --- a/netfetch/src/metrics/status.rs +++ b/netfetch/src/metrics/status.rs @@ -2,6 +2,7 @@ use crate::ca::connset::CaConnSetEvent; use crate::ca::connset::ChannelStatusesRequest; use crate::ca::connset::ConnSetCmd; use crate::conf::ChannelConfig; +use crate::conf::ChannelConfigForStatesApi; use async_channel::Sender; use chrono::DateTime; use chrono::Utc; @@ -62,7 +63,7 @@ impl StorageUsage { struct ChannelState { ioc_address: Option, connection: ConnectionState, - archiving_configuration: ChannelConfig, + archiving_configuration: ChannelConfigForStatesApi, recv_count: u64, recv_bytes: u64, #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")] @@ -87,9 +88,10 @@ impl ChannelState { fn connecting_addr(config: ChannelConfig, ioc_address: Option, connst: ConnectionState) -> Self { Self { + private: StatePrivate::default(config.config_file_basename()), ioc_address, connection: connst, - archiving_configuration: config, + archiving_configuration: config.into(), recv_count: 0, recv_bytes: 0, recv_last: SystemTime::UNIX_EPOCH, @@ -98,13 +100,13 @@ impl ChannelState { write_lt_last: SystemTime::UNIX_EPOCH, updated: SystemTime::UNIX_EPOCH, pong_last: None, - private: StatePrivate::default(), } } fn with_chst(config: ChannelConfig, chst: crate::ca::conn::ChannelStateInfo) -> Self { let private = StatePrivate { status_emit_count: chst.status_emit_count, + config_file_basename: config.config_file_basename().into(), }; let connst = { use crate::ca::conn::ChannelConnectedInfo::*; @@ -120,7 +122,7 @@ impl ChannelState { connection: connst, // TODO config is stored in two places // conf: chst.conf, - archiving_configuration: config, + archiving_configuration: config.into(), recv_count: chst.recv_count.unwrap_or(0), recv_bytes: chst.recv_bytes.unwrap_or(0), recv_last: chst.recv_last, @@ -137,11 +139,15 @@ impl ChannelState { #[derive(Debug, Serialize)] struct StatePrivate { status_emit_count: u64, + config_file_basename: String, } -impl Default for StatePrivate { - fn default() -> Self { - Self { status_emit_count: 0 } +impl StatePrivate { + fn default(config_file_basename: &str) -> Self { + Self { + status_emit_count: 0, + config_file_basename: config_file_basename.into(), + } } } diff --git a/serieswriter/src/binwriter.rs b/serieswriter/src/binwriter.rs index a3f9630..8827e3b 100644 --- a/serieswriter/src/binwriter.rs +++ b/serieswriter/src/binwriter.rs @@ -22,12 +22,12 @@ use series::ChannelStatusSeriesId; use series::SeriesId; use std::time::Duration; -macro_rules! trace_ingest { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } -macro_rules! trace_tick { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } -macro_rules! trace_tick_verbose { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! trace_ingest { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) } +macro_rules! trace_tick { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) } +macro_rules! trace_tick_verbose { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) } -macro_rules! debug_bin2 { ($t:expr, $($arg:tt)*) => ( if true { if $t { debug!($($arg)*); } } ) } -macro_rules! trace_bin2 { ($t:expr, $($arg:tt)*) => ( if false { if $t { trace!($($arg)*); } } ) } +macro_rules! debug_bin2 { ($t:expr, $($arg:expr),*) => ( if true { if $t { debug!($($arg),*); } } ) } +macro_rules! trace_bin2 { ($t:expr, $($arg:expr),*) => ( if false { if $t { trace!($($arg),*); } } ) } autoerr::create_error_v1!( name(Error, "SerieswriterBinwriter"), @@ -39,6 +39,7 @@ autoerr::create_error_v1!( BinBinning(#[from] items_0::timebin::BinningggError), UnexpectedContainerType, PartitionMsp(#[from] series::msp::Error), + UnsupportedGridDiv(DtMs, DtMs), }, ); @@ -65,6 +66,21 @@ fn get_div(bin_len: DtMs) -> Result { Ok(ret) } +#[derive(Debug, Clone)] +enum WriteCntZero { + Enable, + Disable, +} + +impl WriteCntZero { + fn enabled(&self) -> bool { + match self { + WriteCntZero::Enable => true, + WriteCntZero::Disable => false, + } + } +} + #[derive(Debug)] pub struct BinWriter { chname: String, @@ -73,8 +89,8 @@ pub struct BinWriter { scalar_type: ScalarType, shape: Shape, evbuf: ContainerEvents, - binner_1st: Option<(RetentionTime, BinnedEventsTimeweight)>, - binner_others: Vec<(RetentionTime, BinnedBinsTimeweight)>, + binner_1st: Option<(RetentionTime, BinnedEventsTimeweight, WriteCntZero)>, + binner_others: Vec<(RetentionTime, BinnedBinsTimeweight, WriteCntZero)>, trd: bool, } @@ -98,16 +114,35 @@ impl BinWriter { let quiets = [min_quiets.st.clone(), min_quiets.mt.clone(), min_quiets.lt.clone()]; let mut binner_1st = None; let mut binner_others = Vec::new(); - let mut combs: Vec<_> = rts.into_iter().zip(quiets.into_iter().map(bin_len_clamp)).collect(); + let mut combs: Vec<_> = rts + .into_iter() + .zip(quiets.into_iter().map(bin_len_clamp)) + .map(|x| (x.0, x.1, WriteCntZero::Disable)) + .collect(); if let Some(last) = combs.last_mut() { if last.1 >= DtMs::from_ms_u64(1000 * 60 * 60 * 24) { last.0 = RetentionTime::Long; + last.1 = DtMs::from_ms_u64(1000 * 60 * 60 * 24); + last.2 = WriteCntZero::Enable; } else if last.1 >= DtMs::from_ms_u64(1000 * 60 * 60 * 1) { last.0 = RetentionTime::Long; - combs.push((RetentionTime::Long, DtMs::from_ms_u64(1000 * 60 * 60 * 24))); + last.1 = DtMs::from_ms_u64(1000 * 60 * 60 * 1); + combs.push(( + RetentionTime::Long, + DtMs::from_ms_u64(1000 * 60 * 60 * 24), + WriteCntZero::Enable, + )); } else { - combs.push((RetentionTime::Long, DtMs::from_ms_u64(1000 * 60 * 60 * 1))); - combs.push((RetentionTime::Long, DtMs::from_ms_u64(1000 * 60 * 60 * 24))); + combs.push(( + RetentionTime::Long, + DtMs::from_ms_u64(1000 * 60 * 60 * 1), + WriteCntZero::Disable, + )); + combs.push(( + RetentionTime::Long, + DtMs::from_ms_u64(1000 * 60 * 60 * 24), + WriteCntZero::Enable, + )); } } // check @@ -120,16 +155,23 @@ impl BinWriter { } let combs = combs; debug_bin2!(trd, "{:?} binning combs {:?}", chname, combs); - for (rt, bin_len) in combs { + for (rt, bin_len, write_zero) in combs { if bin_len > DUR_ZERO && bin_len <= DUR_MAX { if binner_1st.is_none() { let range = BinnedRange::from_beg_to_inf(beg, bin_len); - let binner = BinnedEventsTimeweight::new(range); - binner_1st = Some((rt, binner)); + let mut binner = BinnedEventsTimeweight::new(range); + if let WriteCntZero::Enable = write_zero { + binner.cnt_zero_enable(); + } + binner_1st = Some((rt, binner, write_zero)); } else { let range = BinnedRange::from_beg_to_inf(beg, bin_len); let binner = BinnedBinsTimeweight::new(range); - binner_others.push((rt, binner)); + if let WriteCntZero::Enable = write_zero { + // TODO + // binner.cnt_zero_enable(); + } + binner_others.push((rt, binner, write_zero)); } } } @@ -172,21 +214,21 @@ impl BinWriter { trace_tick!("tick evbuf len {}", self.evbuf.len()); let buf = &self.evbuf; if true { - if let Some(binner) = self.binner_1st.as_mut() { - let rt = binner.0.clone(); + if let Some(ee) = self.binner_1st.as_mut() { + let rt = ee.0.clone(); + let write_zero = ee.2.clone(); + let binner = &mut ee.1; // TODO avoid boxing - binner.1.ingest(&Box::new(buf))?; - let bins = binner.1.output(); + binner.ingest(&Box::new(buf))?; + let bins = binner.output(); if bins.len() > 0 { trace_bin2!(self.trd, "binner_1st out len {}", bins.len()); - Self::handle_output_ready(self.trd, self.sid, rt, &bins, iqdqs)?; - // - // TODO write these bins to scylla - // + Self::handle_output_ready(self.trd, self.sid, rt, &bins, write_zero, iqdqs)?; // TODO avoid boxing let mut bins2: BinsBoxed = Box::new(bins); for i in 0..self.binner_others.len() { - let (rt, binner) = &mut self.binner_others[i]; + let (rt, binner, write_zero) = &mut self.binner_others[i]; + let write_zero = write_zero.clone(); binner.ingest(&bins2)?; let bb: Option = binner.output()?; match bb { @@ -194,13 +236,17 @@ impl BinWriter { if bb.len() > 0 { trace_bin2!(self.trd, "binner_others {} out len {}", i, bb.len()); if let Some(bb2) = bb.as_any_ref().downcast_ref::>() { - Self::handle_output_ready(self.trd, self.sid, rt.clone(), &bb2, iqdqs)?; + Self::handle_output_ready( + self.trd, + self.sid, + rt.clone(), + &bb2, + write_zero, + iqdqs, + )?; } else { return Err(Error::UnexpectedContainerType); } - // - // TODO write these bins to scylla - // bins2 = bb; } else { break; @@ -220,7 +266,7 @@ impl BinWriter { } self.evbuf.clear(); } else { - trace_tick_verbose!("tick NOTHING TO INGEST"); + trace_tick_verbose!("tick nothing to ingest"); } Ok(()) } @@ -230,6 +276,7 @@ impl BinWriter { series: SeriesId, rt: RetentionTime, bins: &ContainerBins, + write_zero: WriteCntZero, iqdqs: &mut InsertDeques, ) -> Result<(), Error> { let selfname = "handle_output_ready"; @@ -239,15 +286,16 @@ impl BinWriter { } let bins_len = bins.len(); for (ts1, ts2, cnt, min, max, avg, lst, fnl) in bins.zip_iter_2() { + let bin_len = DtMs::from_ms_u64(ts2.delta(ts1).ms_u64()); if fnl == false { - info!("non final bin"); - } else if cnt == 0 { - info!("zero count bin"); + info!("non final bin {:?}", series); + } else if cnt == 0 && !write_zero.enabled() { + info!("zero count bin {:?}", series); } else { - let bin_len = DtMs::from_ms_u64(ts2.delta(ts1).ms_u64()); let div = get_div(bin_len)?; if div.ns() % bin_len.ns() != 0 { - panic!("divisor not a multiple {:?} {:?}", bin_len, div); + let e = Error::UnsupportedGridDiv(bin_len, div); + return Err(e); } let msp = ts1.ms() / div.ms(); let off = (ts1.ms() - div.ms() * msp) / bin_len.ms();