Provide last value for status json

This commit is contained in:
Dominik Werder
2025-07-31 15:04:06 +02:00
parent a545c1a5b7
commit 249f3ba910
6 changed files with 164 additions and 14 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.3.1-aa.1"
version = "0.3.1-aa.2"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2024"
@@ -15,7 +15,7 @@ clap = { version = "4.5.28", features = ["derive", "cargo"] }
tracing = "0.1"
serde = { version = "1.0", features = ["derive"] }
tokio-postgres = "0.7.13"
async-channel = "2.3.1"
async-channel = "2.5.0"
futures-util = "0.3"
chrono = "0.4.38"
bytes = "1.10.0"

View File

@@ -13,9 +13,10 @@ serde_json = "1.0"
serde_cbor = "0.11"
serde_yaml = "0.9.16"
ciborium = "0.2.2"
hcl-rs = "0.19.2"
tokio-stream = { version = "0.1", features = ["fs"] }
tracing = "0.1.37"
async-channel = "2.3.1"
async-channel = "2.5.0"
bytes = "1.10"
arrayref = "0.3"
byteorder = "1.5"

View File

@@ -195,6 +195,9 @@ pub struct ChannelStateInfo {
pub write_st_last: SystemTime,
pub write_mt_last: SystemTime,
pub write_lt_last: SystemTime,
pub val_lst_st: serde_json::Value,
pub val_lst_mt: serde_json::Value,
pub val_lst_lt: serde_json::Value,
pub status_emit_count: u64,
pub last_comparisons: Option<VecDeque<(UtcDateTime, MonitorReadCmp)>>,
}
@@ -502,6 +505,9 @@ struct CreatedState {
dw_st_last: SystemTime,
dw_mt_last: SystemTime,
dw_lt_last: SystemTime,
val_lst_st: serde_json::Value,
val_lst_mt: serde_json::Value,
val_lst_lt: serde_json::Value,
scalar_type: ScalarType,
shape: Shape,
name: String,
@@ -541,6 +547,9 @@ impl CreatedState {
dw_st_last: SystemTime::UNIX_EPOCH,
dw_mt_last: SystemTime::UNIX_EPOCH,
dw_lt_last: SystemTime::UNIX_EPOCH,
val_lst_st: serde_json::Value::Null,
val_lst_mt: serde_json::Value::Null,
val_lst_lt: serde_json::Value::Null,
scalar_type: ScalarType::I8,
shape: Shape::Scalar,
name: String::new(),
@@ -660,6 +669,18 @@ impl ChannelState {
(a, a, a, a)
}
};
let (val_lst_st, val_lst_mt, val_lst_lt) = match self {
ChannelState::Writable(s) => {
let a = s.channel.val_lst_st.clone();
let b = s.channel.val_lst_mt.clone();
let c = s.channel.val_lst_lt.clone();
(a, b, c)
}
_ => {
let a = serde_json::Value::Null;
(a.clone(), a.clone(), a.clone())
}
};
let item_recv_ivl_ema = match self {
ChannelState::Writable(s) => {
let ema = s.channel.item_recv_ivl_ema.ema();
@@ -705,6 +726,9 @@ impl ChannelState {
write_st_last,
write_mt_last,
write_lt_last,
val_lst_st,
val_lst_mt,
val_lst_lt,
status_emit_count,
last_comparisons,
}
@@ -1094,17 +1118,22 @@ impl<'a> EventAddIngestRefobj<'a> {
crst.muted_before = 0;
crst.insert_item_ivl_ema.tick(tsnow);
let val_for_agg = value.f32_for_binning();
let wres = rtwriter.write(CaWriterValue::new(value, crst), tscaproto, tsev, self.iqdqs)?;
// TODO refactor
let value_cloned = value.clone();
let wres = rtwriter.write(CaWriterValue::new(value_cloned, crst), tscaproto, tsev, self.iqdqs)?;
if wres.st.accept {
crst.dw_st_last = stnow;
crst.val_lst_st = value.to_json_value();
crst.acc_st.push_written(payload_len);
}
if wres.mt.accept {
crst.dw_mt_last = stnow;
crst.val_lst_mt = value.to_json_value();
crst.acc_mt.push_written(payload_len);
}
if wres.lt.accept {
crst.dw_lt_last = stnow;
crst.val_lst_lt = value.to_json_value();
crst.acc_lt.push_written(payload_len);
}
if let Some(binwriter) = self.binwriter.as_mut() {
@@ -3234,6 +3263,9 @@ impl CaConn {
dw_st_last: SystemTime::UNIX_EPOCH,
dw_mt_last: SystemTime::UNIX_EPOCH,
dw_lt_last: SystemTime::UNIX_EPOCH,
val_lst_st: serde_json::Value::Null,
val_lst_mt: serde_json::Value::Null,
val_lst_lt: serde_json::Value::Null,
scalar_type: scalar_type.clone(),
shape: shape.clone(),
name: conf.conf.name().into(),

View File

@@ -1012,6 +1012,7 @@ impl CaConnSet {
let channels_ca_conn_set = self
.channel_states
.iter()
.take(req.limit as _)
.filter(|(k, _)| reg1.is_match(k.name()))
.map(|(k, v)| (k.name().to_string(), v.clone()))
.collect();

View File

@@ -407,6 +407,61 @@ scylla_2nd:
assert_eq!(conf.timeout, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45)));
}
#[test]
fn test_parse_config_hcl_00() {
let conf = r###"
backend = test_backend
api_bind = "0.0.0.0:3011"
search = [
"172.26.0.255",
"172.26.2.255",
]
postgresql {
host = HOST
port = 2345
user = USER
pass = PASS
name = NAME
}
scylla_st { keyspace = KS_ST }
scylla_mt { keyspace = "KS_MT" }
scylla_lt { keyspace = KS_LT }
scylla_lt {
keyspace = "KS_LT"
hosts = [
"1.2:3",
"1.2:4",
]
}
scylla_st_rf1 { keyspace = "KS_ST_RF1" }
"###;
let mut body = hcl::parse(conf).unwrap();
for att in body.attributes_mut() {
let e = &mut att.expr;
if let hcl::Expression::Variable(id) = e {
*e = hcl::Expression::String(id[..].into());
}
}
for block in body.blocks_mut() {
for att in block.body.attributes_mut() {
let e = &mut att.expr;
if let hcl::Expression::Variable(id) = e {
*e = hcl::Expression::String(id[..].into());
}
}
}
let conf: CaIngestOpts = hcl::from_body(body).unwrap();
assert_eq!(conf.backend, "test_backend");
assert_eq!(conf.search.len(), 2);
assert_eq!(conf.search.get(0), Some(&"172.26.0.255".into()));
assert_eq!(conf.search.get(1), Some(&"172.26.2.255".into()));
assert_eq!(conf.postgresql.port, 2345);
assert_eq!(conf.postgresql.user, "USER");
assert_eq!(conf.postgresql.pass, "PASS");
assert_eq!(conf.scylla_st.keyspace, "KS_ST");
assert_eq!(conf.scylla_lt.keyspace, "KS_LT");
}
#[test]
fn test_duration_parse() {
#[derive(Serialize, Deserialize)]

View File

@@ -16,6 +16,8 @@ autoerr::create_error_v1!(
name(Error, "StatusError"),
enum variants {
Internal,
ChannelSend,
ChannelRecv,
},
);
@@ -55,6 +57,63 @@ impl StorageUsage {
}
}
#[derive(Debug, Serialize)]
struct ChannelWrite {
#[serde(with = "humantime_serde")]
ts: SystemTime,
val: serde_json::Value,
}
impl ChannelWrite {
fn unset() -> Self {
Self {
ts: SystemTime::UNIX_EPOCH,
val: serde_json::Value::Null,
}
}
fn is_unset(&self) -> bool {
self.ts == SystemTime::UNIX_EPOCH
}
}
#[derive(Debug, Serialize)]
struct ChannelWrites {
#[serde(skip_serializing_if = "ChannelWrite::is_unset")]
st: ChannelWrite,
#[serde(skip_serializing_if = "ChannelWrite::is_unset")]
mt: ChannelWrite,
#[serde(skip_serializing_if = "ChannelWrite::is_unset")]
lt: ChannelWrite,
}
impl ChannelWrites {
fn from_conn_channel_state_info(chst: &crate::ca::conn::ChannelStateInfo) -> Self {
Self {
st: ChannelWrite {
ts: chst.write_st_last,
val: chst.val_lst_st.clone(),
},
mt: ChannelWrite {
ts: chst.write_mt_last,
val: chst.val_lst_mt.clone(),
},
lt: ChannelWrite {
ts: chst.write_lt_last,
val: chst.val_lst_lt.clone(),
},
}
}
fn unset() -> Self {
Self {
st: ChannelWrite::unset(),
mt: ChannelWrite::unset(),
lt: ChannelWrite::unset(),
}
}
}
#[derive(Debug, Serialize)]
struct ChannelState {
ioc_address: Option<SocketAddr>,
@@ -62,15 +121,16 @@ struct ChannelState {
archiving_configuration: ChannelConfigForStatesApi,
recv_count: u64,
recv_bytes: u64,
#[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
#[serde(with = "humantime_serde", skip_serializing_if = "is_system_time_unix_epoch")]
recv_last: SystemTime,
#[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
#[serde(with = "humantime_serde", skip_serializing_if = "is_system_time_unix_epoch")]
write_st_last: SystemTime,
#[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
channel_writes: ChannelWrites,
#[serde(with = "humantime_serde", skip_serializing_if = "is_system_time_unix_epoch")]
write_mt_last: SystemTime,
#[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
#[serde(with = "humantime_serde", skip_serializing_if = "is_system_time_unix_epoch")]
write_lt_last: SystemTime,
#[serde(with = "humantime_serde", skip_serializing_if = "system_time_epoch")]
#[serde(with = "humantime_serde", skip_serializing_if = "is_system_time_unix_epoch")]
updated: SystemTime,
#[serde(with = "humantime_serde")]
pong_last: Option<SystemTime>,
@@ -94,6 +154,7 @@ impl ChannelState {
write_st_last: SystemTime::UNIX_EPOCH,
write_mt_last: SystemTime::UNIX_EPOCH,
write_lt_last: SystemTime::UNIX_EPOCH,
channel_writes: ChannelWrites::unset(),
updated: SystemTime::UNIX_EPOCH,
pong_last: None,
}
@@ -125,6 +186,7 @@ impl ChannelState {
write_st_last: chst.write_st_last,
write_mt_last: chst.write_mt_last,
write_lt_last: chst.write_lt_last,
channel_writes: ChannelWrites::from_conn_channel_state_info(&chst),
updated: chst.stnow,
pong_last: chst.pong_last,
private,
@@ -147,7 +209,7 @@ impl StatePrivate {
}
}
fn system_time_epoch(x: &SystemTime) -> bool {
fn is_system_time_unix_epoch(x: &SystemTime) -> bool {
*x == SystemTime::UNIX_EPOCH
}
@@ -187,13 +249,12 @@ async fn channel_states_try(
tx: Sender<CaConnSetEvent>,
) -> Result<axum::Json<ChannelStates>, Error> {
let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string();
let limit = params.get("limit").and_then(|x| x.parse().ok()).unwrap_or(40);
let limit = params.get("limit").and_then(|x| x.parse().ok()).unwrap_or(1000 * 1000);
let (tx2, rx2) = async_channel::bounded(1);
let req = ChannelStatusesRequest { name, limit, tx: tx2 };
let item = CaConnSetEvent::ConnSetCmd(ConnSetCmd::ChannelStatuses(req));
// TODO handle error
tx.send(item).await.unwrap();
let res = rx2.recv().await.unwrap();
tx.send(item).await.map_err(|e| Error::ChannelSend)?;
let res = rx2.recv().await.map_err(|e| Error::ChannelRecv)?;
let mut states = ChannelStates {
running_since: Utc::now(),
channels: BTreeMap::new(),