Improve channel status return

This commit is contained in:
Dominik Werder
2025-02-18 15:59:25 +01:00
parent 9a81e25625
commit cc251dc433
6 changed files with 234 additions and 76 deletions

View File

@@ -20,7 +20,6 @@ use conn::ChannelStateInfo;
use conn::ChannelStatusPartial;
use conn::ConnCommand;
use conn::ConnCommandResult;
use core::fmt;
use dbpg::seriesbychannel::BoxedSend;
use dbpg::seriesbychannel::CanSendChannelInfoResult;
use dbpg::seriesbychannel::ChannelInfoQuery;
@@ -55,6 +54,7 @@ use stats::CaProtoStats;
use stats::IocFinderStats;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::fmt;
use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::pin::Pin;
@@ -584,13 +584,7 @@ impl CaConnSet {
fn handle_add_channel_new(cmd: ChannelAdd, ress: StateTransRes) -> Result<(), Error> {
{
let item = ChannelState {
value: ChannelStateValue::Active(ActiveChannelState::WaitForStatusSeriesId {
since: SystemTime::now(),
}),
config: cmd.ch_cfg.clone(),
touched: 1,
};
let item = ChannelState::new_wait_for_cssid(&cmd.ch_cfg);
*ress.chst = item;
}
{

View File

@@ -188,6 +188,7 @@ pub struct ChannelState {
pub value: ChannelStateValue,
pub config: ChannelConfig,
pub touched: u8,
config_file_basename: String,
}
impl ChannelState {
@@ -199,6 +200,26 @@ impl ChannelState {
false
}
}
pub fn new_dummy() -> Self {
Self {
value: ChannelStateValue::InitDummy,
config: ChannelConfig::dummy(),
touched: 0,
config_file_basename: String::new(),
}
}
pub fn new_wait_for_cssid(ch_cfg: &crate::conf::ChannelConfig) -> Self {
Self {
value: ChannelStateValue::Active(ActiveChannelState::WaitForStatusSeriesId {
since: SystemTime::now(),
}),
config: ch_cfg.clone(),
touched: 1,
config_file_basename: ch_cfg.config_file_basename().into(),
}
}
}
#[derive(Debug, Serialize)]
@@ -230,11 +251,7 @@ impl ChannelStateMap {
pub fn get_mut_or_dummy_init(&mut self, k: &ChannelName) -> &mut ChannelState {
if !self.map.contains_key(k) {
let dummy = ChannelState {
value: ChannelStateValue::InitDummy,
config: ChannelConfig::dummy(),
touched: 0,
};
let dummy = ChannelState::new_dummy();
self.map.insert(k.clone(), dummy);
}
self.map.get_mut(k).unwrap()

View File

@@ -43,6 +43,7 @@ pub struct CaIngestOpts {
pub test_bsread_addr: Option<String>,
#[serde(default)]
scylla_disable: bool,
#[serde(default)]
scylla_ignore_writes: bool,
}
@@ -188,6 +189,7 @@ fn test_duration_parse() {
}
async fn parse_channel_config_txt(fname: &Path) -> Result<ChannelsConfig, Error> {
let basename = fname.file_stem().unwrap().to_str().unwrap();
let re_p = Regex::new("--------------------------").unwrap();
let re_n = Regex::new("--------------------------").unwrap();
let mut file = OpenOptions::new().read(true).open(fname).await?;
@@ -218,6 +220,7 @@ async fn parse_channel_config_txt(fname: &Path) -> Result<ChannelsConfig, Error>
is_polled: false,
timestamp: ChannelTimestamp::Archiver,
},
config_file_basename: basename.to_string(),
};
conf.channels.push(item);
}
@@ -272,11 +275,12 @@ async fn parse_config_dir(dir: &Path) -> Result<ChannelsConfig, Error> {
let fnp = e.path();
let fns = fnp.to_str().unwrap();
if fns.ends_with(".yml") || fns.ends_with(".yaml") {
let basename = fnp.file_stem().unwrap().to_str().unwrap();
let buf = tokio::fs::read(e.path()).await?;
let conf: BTreeMap<String, ChannelConfigParse> =
serde_yaml::from_slice(&buf).map_err(Error::from_string)?;
info!("parsed {} channels from {}", conf.len(), fns);
ret.push_from_parsed(&conf);
ret.push_from_parsed(&conf, basename);
} else {
debug!("ignore channel config file {:?}", e.path());
}
@@ -299,23 +303,31 @@ impl ChannelTimestamp {
fn default_config() -> Self {
Self::Archiver
}
fn is_default(&self) -> bool {
if let ChannelTimestamp::Archiver = self {
true
} else {
false
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Deserialize)]
pub struct IngestConfigArchiving {
#[serde(default = "bool_true")]
#[serde(with = "serde_replication_bool")]
replication: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[serde(default)]
#[serde(with = "serde_option_channel_read_config")]
short_term: Option<ChannelReadConfig>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[serde(default)]
#[serde(with = "serde_option_channel_read_config")]
medium_term: Option<ChannelReadConfig>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[serde(default)]
#[serde(with = "serde_option_channel_read_config")]
long_term: Option<ChannelReadConfig>,
#[serde(default, skip_serializing_if = "bool_is_false")]
#[serde(default)]
is_polled: bool,
#[serde(default = "ChannelTimestamp::default_config")]
timestamp: ChannelTimestamp,
@@ -343,6 +355,71 @@ fn bool_true() -> bool {
true
}
mod serde_ingest_config_archiving {
use super::ChannelReadConfigApiFormat;
use super::IngestConfigArchiving;
use serde::de;
use serde::ser;
use serde::ser::SerializeMap;
use serde::Deserializer;
use serde::Serializer;
use std::fmt;
impl ser::Serialize for IngestConfigArchiving {
fn serialize<S>(&self, ser: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut map = ser.serialize_map(None)?;
// ser.is_human_readable()
if !self.replication {
map.serialize_entry("replication", &self.replication)?;
}
if let Some(v) = self.short_term.as_ref() {
map.serialize_entry("short_term", &ChannelReadConfigApiFormat(&v))?;
}
if let Some(v) = self.medium_term.as_ref() {
map.serialize_entry("medium_term", &ChannelReadConfigApiFormat(&v))?;
}
if let Some(v) = self.long_term.as_ref() {
map.serialize_entry("long_term", &ChannelReadConfigApiFormat(&v))?;
}
let anymon = [&self.short_term, &self.medium_term, &self.long_term]
.into_iter()
.map(|c| c.as_ref().map_or(false, |x| x.is_monitor()))
.fold(false, |a, x| a || x);
if anymon && self.is_polled || !anymon && !self.is_polled {
map.serialize_entry("is_polled", &self.is_polled)?;
}
if !self.timestamp.is_default() {
map.serialize_entry("timestamp", &self.timestamp)?;
}
map.end()
}
}
}
struct ChannelReadConfigApiFormat<'a>(&'a ChannelReadConfig);
#[allow(non_snake_case)]
mod serde_ChannelReadConfigApiFormat {
use super::ChannelReadConfig;
use super::ChannelReadConfigApiFormat;
use serde::ser;
impl<'a> ser::Serialize for ChannelReadConfigApiFormat<'a> {
fn serialize<S>(&self, ser: S) -> Result<S::Ok, S::Error>
where
S: ser::Serializer,
{
match &self.0 {
ChannelReadConfig::Monitor => ser.serialize_str("Monitor"),
ChannelReadConfig::Poll(n) => ser.serialize_u32(n.as_secs() as u32),
}
}
}
}
mod serde_replication_bool {
use serde::de;
use serde::Deserializer;
@@ -489,6 +566,16 @@ pub enum ChannelReadConfig {
Poll(Duration),
}
impl ChannelReadConfig {
pub fn is_monitor(&self) -> bool {
if let Self::Monitor = self {
true
} else {
false
}
}
}
#[test]
fn test_channel_config_00() {
let inp = r###"
@@ -546,38 +633,27 @@ impl ChannelsConfig {
&self.channels
}
fn push_from_parsed(&mut self, rhs: &BTreeMap<String, ChannelConfigParse>) {
fn push_from_parsed(&mut self, rhs: &BTreeMap<String, ChannelConfigParse>, config_file_basename: &str) {
for (k, v) in rhs.iter() {
let item = ChannelConfig {
name: k.into(),
arch: v.archiving_configuration.clone(),
config_file_basename: config_file_basename.into(),
};
self.channels.push(item);
}
}
}
impl From<BTreeMap<String, ChannelConfigParse>> for ChannelsConfig {
fn from(value: BTreeMap<String, ChannelConfigParse>) -> Self {
let channels = value
.into_iter()
.map(|(k, v)| ChannelConfig {
name: k,
arch: v.archiving_configuration,
})
.collect();
ChannelsConfig { channels }
}
}
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct ChannelConfig {
name: String,
arch: IngestConfigArchiving,
config_file_basename: String,
}
impl ChannelConfig {
pub fn st_monitor<S: Into<String>>(name: S) -> Self {
pub fn st_monitor<S: Into<String>>(name: S, config_file_basename: &str) -> Self {
Self {
name: name.into(),
arch: IngestConfigArchiving {
@@ -588,6 +664,7 @@ impl ChannelConfig {
is_polled: false,
timestamp: ChannelTimestamp::Archiver,
},
config_file_basename: config_file_basename.into(),
}
}
@@ -595,6 +672,10 @@ impl ChannelConfig {
&self.name
}
pub fn config_file_basename(&self) -> &str {
&self.config_file_basename
}
pub fn is_polled(&self) -> bool {
self.arch.is_polled
}
@@ -678,6 +759,18 @@ impl ChannelConfig {
Self {
name: String::from("dummy"),
arch: IngestConfigArchiving::dummy(),
config_file_basename: String::new(),
}
}
}
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct ChannelConfigForStatesApi {
arch: IngestConfigArchiving,
}
impl From<ChannelConfig> for ChannelConfigForStatesApi {
fn from(value: ChannelConfig) -> Self {
Self { arch: value.arch }
}
}

View File

@@ -215,7 +215,7 @@ async fn find_channel(
async fn channel_add_inner(params: HashMap<String, String>, dcom: Arc<DaemonComm>) -> Result<(), Error> {
if let Some(name) = params.get("name") {
// let ch = crate::daemon_common::Channel::new(name.into());
let ch_cfg = ChannelConfig::st_monitor(name);
let ch_cfg = ChannelConfig::st_monitor(name, "api");
let (tx, rx) = async_channel::bounded(1);
let ev = DaemonEvent::ChannelAdd(ch_cfg, tx);
dcom.tx.send(ev).await?;

View File

@@ -2,6 +2,7 @@ use crate::ca::connset::CaConnSetEvent;
use crate::ca::connset::ChannelStatusesRequest;
use crate::ca::connset::ConnSetCmd;
use crate::conf::ChannelConfig;
use crate::conf::ChannelConfigForStatesApi;
use async_channel::Sender;
use chrono::DateTime;
use chrono::Utc;
@@ -62,7 +63,7 @@ impl StorageUsage {
struct ChannelState {
ioc_address: Option<SocketAddr>,
connection: ConnectionState,
archiving_configuration: ChannelConfig,
archiving_configuration: ChannelConfigForStatesApi,
recv_count: u64,
recv_bytes: u64,
#[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
@@ -87,9 +88,10 @@ impl ChannelState {
fn connecting_addr(config: ChannelConfig, ioc_address: Option<SocketAddr>, connst: ConnectionState) -> Self {
Self {
private: StatePrivate::default(config.config_file_basename()),
ioc_address,
connection: connst,
archiving_configuration: config,
archiving_configuration: config.into(),
recv_count: 0,
recv_bytes: 0,
recv_last: SystemTime::UNIX_EPOCH,
@@ -98,13 +100,13 @@ impl ChannelState {
write_lt_last: SystemTime::UNIX_EPOCH,
updated: SystemTime::UNIX_EPOCH,
pong_last: None,
private: StatePrivate::default(),
}
}
fn with_chst(config: ChannelConfig, chst: crate::ca::conn::ChannelStateInfo) -> Self {
let private = StatePrivate {
status_emit_count: chst.status_emit_count,
config_file_basename: config.config_file_basename().into(),
};
let connst = {
use crate::ca::conn::ChannelConnectedInfo::*;
@@ -120,7 +122,7 @@ impl ChannelState {
connection: connst,
// TODO config is stored in two places
// conf: chst.conf,
archiving_configuration: config,
archiving_configuration: config.into(),
recv_count: chst.recv_count.unwrap_or(0),
recv_bytes: chst.recv_bytes.unwrap_or(0),
recv_last: chst.recv_last,
@@ -137,11 +139,15 @@ impl ChannelState {
#[derive(Debug, Serialize)]
struct StatePrivate {
status_emit_count: u64,
config_file_basename: String,
}
impl Default for StatePrivate {
fn default() -> Self {
Self { status_emit_count: 0 }
impl StatePrivate {
fn default(config_file_basename: &str) -> Self {
Self {
status_emit_count: 0,
config_file_basename: config_file_basename.into(),
}
}
}

View File

@@ -22,12 +22,12 @@ use series::ChannelStatusSeriesId;
use series::SeriesId;
use std::time::Duration;
macro_rules! trace_ingest { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_tick { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_tick_verbose { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_ingest { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) }
macro_rules! trace_tick { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) }
macro_rules! trace_tick_verbose { ($($arg:expr),*) => ( if false { trace!($($arg),*); } ) }
macro_rules! debug_bin2 { ($t:expr, $($arg:tt)*) => ( if true { if $t { debug!($($arg)*); } } ) }
macro_rules! trace_bin2 { ($t:expr, $($arg:tt)*) => ( if false { if $t { trace!($($arg)*); } } ) }
macro_rules! debug_bin2 { ($t:expr, $($arg:expr),*) => ( if true { if $t { debug!($($arg),*); } } ) }
macro_rules! trace_bin2 { ($t:expr, $($arg:expr),*) => ( if false { if $t { trace!($($arg),*); } } ) }
autoerr::create_error_v1!(
name(Error, "SerieswriterBinwriter"),
@@ -39,6 +39,7 @@ autoerr::create_error_v1!(
BinBinning(#[from] items_0::timebin::BinningggError),
UnexpectedContainerType,
PartitionMsp(#[from] series::msp::Error),
UnsupportedGridDiv(DtMs, DtMs),
},
);
@@ -65,6 +66,21 @@ fn get_div(bin_len: DtMs) -> Result<DtMs, Error> {
Ok(ret)
}
#[derive(Debug, Clone)]
enum WriteCntZero {
Enable,
Disable,
}
impl WriteCntZero {
fn enabled(&self) -> bool {
match self {
WriteCntZero::Enable => true,
WriteCntZero::Disable => false,
}
}
}
#[derive(Debug)]
pub struct BinWriter {
chname: String,
@@ -73,8 +89,8 @@ pub struct BinWriter {
scalar_type: ScalarType,
shape: Shape,
evbuf: ContainerEvents<f32>,
binner_1st: Option<(RetentionTime, BinnedEventsTimeweight<f32>)>,
binner_others: Vec<(RetentionTime, BinnedBinsTimeweight<f32, f32>)>,
binner_1st: Option<(RetentionTime, BinnedEventsTimeweight<f32>, WriteCntZero)>,
binner_others: Vec<(RetentionTime, BinnedBinsTimeweight<f32, f32>, WriteCntZero)>,
trd: bool,
}
@@ -98,16 +114,35 @@ impl BinWriter {
let quiets = [min_quiets.st.clone(), min_quiets.mt.clone(), min_quiets.lt.clone()];
let mut binner_1st = None;
let mut binner_others = Vec::new();
let mut combs: Vec<_> = rts.into_iter().zip(quiets.into_iter().map(bin_len_clamp)).collect();
let mut combs: Vec<_> = rts
.into_iter()
.zip(quiets.into_iter().map(bin_len_clamp))
.map(|x| (x.0, x.1, WriteCntZero::Disable))
.collect();
if let Some(last) = combs.last_mut() {
if last.1 >= DtMs::from_ms_u64(1000 * 60 * 60 * 24) {
last.0 = RetentionTime::Long;
last.1 = DtMs::from_ms_u64(1000 * 60 * 60 * 24);
last.2 = WriteCntZero::Enable;
} else if last.1 >= DtMs::from_ms_u64(1000 * 60 * 60 * 1) {
last.0 = RetentionTime::Long;
combs.push((RetentionTime::Long, DtMs::from_ms_u64(1000 * 60 * 60 * 24)));
last.1 = DtMs::from_ms_u64(1000 * 60 * 60 * 1);
combs.push((
RetentionTime::Long,
DtMs::from_ms_u64(1000 * 60 * 60 * 24),
WriteCntZero::Enable,
));
} else {
combs.push((RetentionTime::Long, DtMs::from_ms_u64(1000 * 60 * 60 * 1)));
combs.push((RetentionTime::Long, DtMs::from_ms_u64(1000 * 60 * 60 * 24)));
combs.push((
RetentionTime::Long,
DtMs::from_ms_u64(1000 * 60 * 60 * 1),
WriteCntZero::Disable,
));
combs.push((
RetentionTime::Long,
DtMs::from_ms_u64(1000 * 60 * 60 * 24),
WriteCntZero::Enable,
));
}
}
// check
@@ -120,16 +155,23 @@ impl BinWriter {
}
let combs = combs;
debug_bin2!(trd, "{:?} binning combs {:?}", chname, combs);
for (rt, bin_len) in combs {
for (rt, bin_len, write_zero) in combs {
if bin_len > DUR_ZERO && bin_len <= DUR_MAX {
if binner_1st.is_none() {
let range = BinnedRange::from_beg_to_inf(beg, bin_len);
let binner = BinnedEventsTimeweight::new(range);
binner_1st = Some((rt, binner));
let mut binner = BinnedEventsTimeweight::new(range);
if let WriteCntZero::Enable = write_zero {
binner.cnt_zero_enable();
}
binner_1st = Some((rt, binner, write_zero));
} else {
let range = BinnedRange::from_beg_to_inf(beg, bin_len);
let binner = BinnedBinsTimeweight::new(range);
binner_others.push((rt, binner));
if let WriteCntZero::Enable = write_zero {
// TODO
// binner.cnt_zero_enable();
}
binner_others.push((rt, binner, write_zero));
}
}
}
@@ -172,21 +214,21 @@ impl BinWriter {
trace_tick!("tick evbuf len {}", self.evbuf.len());
let buf = &self.evbuf;
if true {
if let Some(binner) = self.binner_1st.as_mut() {
let rt = binner.0.clone();
if let Some(ee) = self.binner_1st.as_mut() {
let rt = ee.0.clone();
let write_zero = ee.2.clone();
let binner = &mut ee.1;
// TODO avoid boxing
binner.1.ingest(&Box::new(buf))?;
let bins = binner.1.output();
binner.ingest(&Box::new(buf))?;
let bins = binner.output();
if bins.len() > 0 {
trace_bin2!(self.trd, "binner_1st out len {}", bins.len());
Self::handle_output_ready(self.trd, self.sid, rt, &bins, iqdqs)?;
//
// TODO write these bins to scylla
//
Self::handle_output_ready(self.trd, self.sid, rt, &bins, write_zero, iqdqs)?;
// TODO avoid boxing
let mut bins2: BinsBoxed = Box::new(bins);
for i in 0..self.binner_others.len() {
let (rt, binner) = &mut self.binner_others[i];
let (rt, binner, write_zero) = &mut self.binner_others[i];
let write_zero = write_zero.clone();
binner.ingest(&bins2)?;
let bb: Option<BinsBoxed> = binner.output()?;
match bb {
@@ -194,13 +236,17 @@ impl BinWriter {
if bb.len() > 0 {
trace_bin2!(self.trd, "binner_others {} out len {}", i, bb.len());
if let Some(bb2) = bb.as_any_ref().downcast_ref::<ContainerBins<f32, f32>>() {
Self::handle_output_ready(self.trd, self.sid, rt.clone(), &bb2, iqdqs)?;
Self::handle_output_ready(
self.trd,
self.sid,
rt.clone(),
&bb2,
write_zero,
iqdqs,
)?;
} else {
return Err(Error::UnexpectedContainerType);
}
//
// TODO write these bins to scylla
//
bins2 = bb;
} else {
break;
@@ -220,7 +266,7 @@ impl BinWriter {
}
self.evbuf.clear();
} else {
trace_tick_verbose!("tick NOTHING TO INGEST");
trace_tick_verbose!("tick nothing to ingest");
}
Ok(())
}
@@ -230,6 +276,7 @@ impl BinWriter {
series: SeriesId,
rt: RetentionTime,
bins: &ContainerBins<f32, f32>,
write_zero: WriteCntZero,
iqdqs: &mut InsertDeques,
) -> Result<(), Error> {
let selfname = "handle_output_ready";
@@ -239,15 +286,16 @@ impl BinWriter {
}
let bins_len = bins.len();
for (ts1, ts2, cnt, min, max, avg, lst, fnl) in bins.zip_iter_2() {
let bin_len = DtMs::from_ms_u64(ts2.delta(ts1).ms_u64());
if fnl == false {
info!("non final bin");
} else if cnt == 0 {
info!("zero count bin");
info!("non final bin {:?}", series);
} else if cnt == 0 && !write_zero.enabled() {
info!("zero count bin {:?}", series);
} else {
let bin_len = DtMs::from_ms_u64(ts2.delta(ts1).ms_u64());
let div = get_div(bin_len)?;
if div.ns() % bin_len.ns() != 0 {
panic!("divisor not a multiple {:?} {:?}", bin_len, div);
let e = Error::UnsupportedGridDiv(bin_len, div);
return Err(e);
}
let msp = ts1.ms() / div.ms();
let off = (ts1.ms() - div.ms() * msp) / bin_len.ms();