diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 9d2fc3b..f062027 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.3.1-aa.1" +version = "0.3.1-aa.2" authors = ["Dominik Werder "] edition = "2024" @@ -15,7 +15,7 @@ clap = { version = "4.5.28", features = ["derive", "cargo"] } tracing = "0.1" serde = { version = "1.0", features = ["derive"] } tokio-postgres = "0.7.13" -async-channel = "2.3.1" +async-channel = "2.5.0" futures-util = "0.3" chrono = "0.4.38" bytes = "1.10.0" diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 3512fed..2527c62 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -13,9 +13,10 @@ serde_json = "1.0" serde_cbor = "0.11" serde_yaml = "0.9.16" ciborium = "0.2.2" +hcl-rs = "0.19.2" tokio-stream = { version = "0.1", features = ["fs"] } tracing = "0.1.37" -async-channel = "2.3.1" +async-channel = "2.5.0" bytes = "1.10" arrayref = "0.3" byteorder = "1.5" diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 807faf0..0f438e8 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -195,6 +195,9 @@ pub struct ChannelStateInfo { pub write_st_last: SystemTime, pub write_mt_last: SystemTime, pub write_lt_last: SystemTime, + pub val_lst_st: serde_json::Value, + pub val_lst_mt: serde_json::Value, + pub val_lst_lt: serde_json::Value, pub status_emit_count: u64, pub last_comparisons: Option>, } @@ -502,6 +505,9 @@ struct CreatedState { dw_st_last: SystemTime, dw_mt_last: SystemTime, dw_lt_last: SystemTime, + val_lst_st: serde_json::Value, + val_lst_mt: serde_json::Value, + val_lst_lt: serde_json::Value, scalar_type: ScalarType, shape: Shape, name: String, @@ -541,6 +547,9 @@ impl CreatedState { dw_st_last: SystemTime::UNIX_EPOCH, dw_mt_last: SystemTime::UNIX_EPOCH, dw_lt_last: SystemTime::UNIX_EPOCH, + val_lst_st: serde_json::Value::Null, + val_lst_mt: serde_json::Value::Null, + val_lst_lt: serde_json::Value::Null, scalar_type: ScalarType::I8, shape: Shape::Scalar, name: String::new(), @@ -660,6 +669,18 @@ impl ChannelState { (a, a, a, a) } }; + let (val_lst_st, val_lst_mt, val_lst_lt) = match self { + ChannelState::Writable(s) => { + let a = s.channel.val_lst_st.clone(); + let b = s.channel.val_lst_mt.clone(); + let c = s.channel.val_lst_lt.clone(); + (a, b, c) + } + _ => { + let a = serde_json::Value::Null; + (a.clone(), a.clone(), a.clone()) + } + }; let item_recv_ivl_ema = match self { ChannelState::Writable(s) => { let ema = s.channel.item_recv_ivl_ema.ema(); @@ -705,6 +726,9 @@ impl ChannelState { write_st_last, write_mt_last, write_lt_last, + val_lst_st, + val_lst_mt, + val_lst_lt, status_emit_count, last_comparisons, } @@ -1094,17 +1118,22 @@ impl<'a> EventAddIngestRefobj<'a> { crst.muted_before = 0; crst.insert_item_ivl_ema.tick(tsnow); let val_for_agg = value.f32_for_binning(); - let wres = rtwriter.write(CaWriterValue::new(value, crst), tscaproto, tsev, self.iqdqs)?; + // TODO refactor + let value_cloned = value.clone(); + let wres = rtwriter.write(CaWriterValue::new(value_cloned, crst), tscaproto, tsev, self.iqdqs)?; if wres.st.accept { crst.dw_st_last = stnow; + crst.val_lst_st = value.to_json_value(); crst.acc_st.push_written(payload_len); } if wres.mt.accept { crst.dw_mt_last = stnow; + crst.val_lst_mt = value.to_json_value(); crst.acc_mt.push_written(payload_len); } if wres.lt.accept { crst.dw_lt_last = stnow; + crst.val_lst_lt = value.to_json_value(); crst.acc_lt.push_written(payload_len); } if let Some(binwriter) = self.binwriter.as_mut() { @@ -3234,6 +3263,9 @@ impl CaConn { dw_st_last: SystemTime::UNIX_EPOCH, dw_mt_last: SystemTime::UNIX_EPOCH, dw_lt_last: SystemTime::UNIX_EPOCH, + val_lst_st: serde_json::Value::Null, + val_lst_mt: serde_json::Value::Null, + val_lst_lt: serde_json::Value::Null, scalar_type: scalar_type.clone(), shape: shape.clone(), name: conf.conf.name().into(), diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 2ac02eb..cfcef49 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -1012,6 +1012,7 @@ impl CaConnSet { let channels_ca_conn_set = self .channel_states .iter() + .take(req.limit as _) .filter(|(k, _)| reg1.is_match(k.name())) .map(|(k, v)| (k.name().to_string(), v.clone())) .collect(); diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 848ad95..e53c6bc 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -407,6 +407,61 @@ scylla_2nd: assert_eq!(conf.timeout, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45))); } +#[test] +fn test_parse_config_hcl_00() { + let conf = r###" +backend = test_backend +api_bind = "0.0.0.0:3011" +search = [ + "172.26.0.255", + "172.26.2.255", +] +postgresql { + host = HOST + port = 2345 + user = USER + pass = PASS + name = NAME +} +scylla_st { keyspace = KS_ST } +scylla_mt { keyspace = "KS_MT" } +scylla_lt { keyspace = KS_LT } +scylla_lt { + keyspace = "KS_LT" + hosts = [ + "1.2:3", + "1.2:4", + ] +} +scylla_st_rf1 { keyspace = "KS_ST_RF1" } +"###; + let mut body = hcl::parse(conf).unwrap(); + for att in body.attributes_mut() { + let e = &mut att.expr; + if let hcl::Expression::Variable(id) = e { + *e = hcl::Expression::String(id[..].into()); + } + } + for block in body.blocks_mut() { + for att in block.body.attributes_mut() { + let e = &mut att.expr; + if let hcl::Expression::Variable(id) = e { + *e = hcl::Expression::String(id[..].into()); + } + } + } + let conf: CaIngestOpts = hcl::from_body(body).unwrap(); + assert_eq!(conf.backend, "test_backend"); + assert_eq!(conf.search.len(), 2); + assert_eq!(conf.search.get(0), Some(&"172.26.0.255".into())); + assert_eq!(conf.search.get(1), Some(&"172.26.2.255".into())); + assert_eq!(conf.postgresql.port, 2345); + assert_eq!(conf.postgresql.user, "USER"); + assert_eq!(conf.postgresql.pass, "PASS"); + assert_eq!(conf.scylla_st.keyspace, "KS_ST"); + assert_eq!(conf.scylla_lt.keyspace, "KS_LT"); +} + #[test] fn test_duration_parse() { #[derive(Serialize, Deserialize)] diff --git a/netfetch/src/metrics/status.rs b/netfetch/src/metrics/status.rs index c6439b1..807848f 100644 --- a/netfetch/src/metrics/status.rs +++ b/netfetch/src/metrics/status.rs @@ -16,6 +16,8 @@ autoerr::create_error_v1!( name(Error, "StatusError"), enum variants { Internal, + ChannelSend, + ChannelRecv, }, ); @@ -55,6 +57,63 @@ impl StorageUsage { } } +#[derive(Debug, Serialize)] +struct ChannelWrite { + #[serde(with = "humantime_serde")] + ts: SystemTime, + val: serde_json::Value, +} + +impl ChannelWrite { + fn unset() -> Self { + Self { + ts: SystemTime::UNIX_EPOCH, + val: serde_json::Value::Null, + } + } + + fn is_unset(&self) -> bool { + self.ts == SystemTime::UNIX_EPOCH + } +} + +#[derive(Debug, Serialize)] +struct ChannelWrites { + #[serde(skip_serializing_if = "ChannelWrite::is_unset")] + st: ChannelWrite, + #[serde(skip_serializing_if = "ChannelWrite::is_unset")] + mt: ChannelWrite, + #[serde(skip_serializing_if = "ChannelWrite::is_unset")] + lt: ChannelWrite, +} + +impl ChannelWrites { + fn from_conn_channel_state_info(chst: &crate::ca::conn::ChannelStateInfo) -> Self { + Self { + st: ChannelWrite { + ts: chst.write_st_last, + val: chst.val_lst_st.clone(), + }, + mt: ChannelWrite { + ts: chst.write_mt_last, + val: chst.val_lst_mt.clone(), + }, + lt: ChannelWrite { + ts: chst.write_lt_last, + val: chst.val_lst_lt.clone(), + }, + } + } + + fn unset() -> Self { + Self { + st: ChannelWrite::unset(), + mt: ChannelWrite::unset(), + lt: ChannelWrite::unset(), + } + } +} + #[derive(Debug, Serialize)] struct ChannelState { ioc_address: Option, @@ -62,15 +121,16 @@ struct ChannelState { archiving_configuration: ChannelConfigForStatesApi, recv_count: u64, recv_bytes: u64, - #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")] + #[serde(with = "humantime_serde", skip_serializing_if = "is_system_time_unix_epoch")] recv_last: SystemTime, - #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")] + #[serde(with = "humantime_serde", skip_serializing_if = "is_system_time_unix_epoch")] write_st_last: SystemTime, - #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")] + channel_writes: ChannelWrites, + #[serde(with = "humantime_serde", skip_serializing_if = "is_system_time_unix_epoch")] write_mt_last: SystemTime, - #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")] + #[serde(with = "humantime_serde", skip_serializing_if = "is_system_time_unix_epoch")] write_lt_last: SystemTime, - #[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")] + #[serde(with = "humantime_serde", skip_serializing_if = "is_system_time_unix_epoch")] updated: SystemTime, #[serde(with = "humantime_serde")] pong_last: Option, @@ -94,6 +154,7 @@ impl ChannelState { write_st_last: SystemTime::UNIX_EPOCH, write_mt_last: SystemTime::UNIX_EPOCH, write_lt_last: SystemTime::UNIX_EPOCH, + channel_writes: ChannelWrites::unset(), updated: SystemTime::UNIX_EPOCH, pong_last: None, } @@ -125,6 +186,7 @@ impl ChannelState { write_st_last: chst.write_st_last, write_mt_last: chst.write_mt_last, write_lt_last: chst.write_lt_last, + channel_writes: ChannelWrites::from_conn_channel_state_info(&chst), updated: chst.stnow, pong_last: chst.pong_last, private, @@ -147,7 +209,7 @@ impl StatePrivate { } } -fn system_time_epoch(x: &SystemTime) -> bool { +fn is_system_time_unix_epoch(x: &SystemTime) -> bool { *x == SystemTime::UNIX_EPOCH } @@ -187,13 +249,12 @@ async fn channel_states_try( tx: Sender, ) -> Result, Error> { let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string(); - let limit = params.get("limit").and_then(|x| x.parse().ok()).unwrap_or(40); + let limit = params.get("limit").and_then(|x| x.parse().ok()).unwrap_or(1000 * 1000); let (tx2, rx2) = async_channel::bounded(1); let req = ChannelStatusesRequest { name, limit, tx: tx2 }; let item = CaConnSetEvent::ConnSetCmd(ConnSetCmd::ChannelStatuses(req)); - // TODO handle error - tx.send(item).await.unwrap(); - let res = rx2.recv().await.unwrap(); + tx.send(item).await.map_err(|e| Error::ChannelSend)?; + let res = rx2.recv().await.map_err(|e| Error::ChannelRecv)?; let mut states = ChannelStates { running_since: Utc::now(), channels: BTreeMap::new(),