Log usage

This commit is contained in:
Dominik Werder
2025-05-26 16:31:10 +02:00
parent 195d550781
commit c17032db23
12 changed files with 224 additions and 91 deletions

View File

@@ -100,12 +100,12 @@ dependencies = [
[[package]]
name = "anstyle-wincon"
version = "3.0.7"
version = "3.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e"
checksum = "6680de5231bd6ee4c6191b8a1325daa282b415391ec9d3a37bd34f2060dc73fa"
dependencies = [
"anstyle",
"once_cell",
"once_cell_polyfill",
"windows-sys 0.59.0",
]
@@ -314,9 +314,9 @@ checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
[[package]]
name = "cc"
version = "1.2.22"
version = "1.2.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32db95edf998450acc7881c932f94cd9b05c87b4b2599e8bab064753da4acfd1"
checksum = "16595d3be041c03b09d08d0858631facccee9221e579704070e6e9e4915d3bc7"
dependencies = [
"shlex",
]
@@ -533,10 +533,11 @@ dependencies = [
[[package]]
name = "daqbuf-err"
version = "0.0.6"
version = "0.0.7"
dependencies = [
"anyhow",
"async-channel",
"autoerr",
"backtrace",
"chrono",
"http",
@@ -545,7 +546,6 @@ dependencies = [
"serde",
"serde_cbor",
"serde_json",
"thiserror 0.0.1",
"url",
]
@@ -718,7 +718,6 @@ dependencies = [
"serde",
"serde_cbor",
"serde_json",
"thiserror 0.0.1",
"typetag",
]
@@ -1245,9 +1244,9 @@ dependencies = [
[[package]]
name = "hyper-util"
version = "0.1.11"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "497bbc33a26fdd4af9ed9c70d63f61cf56a938375fbb32df34db9b1cd6d643f2"
checksum = "cf9f1e950e0d9d1d3c47184416723cf29c0d1f93bd8cccf37e4beb6b44f31710"
dependencies = [
"bytes",
"futures-util",
@@ -1332,9 +1331,9 @@ checksum = "00210d6893afc98edb752b664b8890f0ef174c8adbb8d0be9710fa66fbbf72d3"
[[package]]
name = "icu_properties"
version = "2.0.0"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2549ca8c7241c82f59c80ba2a6f415d931c5b58d24fb8412caa1a1f02c49139a"
checksum = "016c619c1eeb94efb86809b015c58f479963de65bdb6253345c1a1276f22e32b"
dependencies = [
"displaydoc",
"icu_collections",
@@ -1348,9 +1347,9 @@ dependencies = [
[[package]]
name = "icu_properties_data"
version = "2.0.0"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8197e866e47b68f8f7d95249e172903bec06004b18b2937f1095d40a0c57de04"
checksum = "298459143998310acd25ffe6810ed544932242d3f07083eee1084d83a71bd632"
[[package]]
name = "icu_provider"
@@ -1581,13 +1580,13 @@ dependencies = [
[[package]]
name = "mio"
version = "1.0.3"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c"
dependencies = [
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]
@@ -1702,6 +1701,12 @@ version = "1.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
[[package]]
name = "once_cell_polyfill"
version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad"
[[package]]
name = "overload"
version = "0.1.1"
@@ -2074,9 +2079,9 @@ dependencies = [
[[package]]
name = "rustversion"
version = "1.0.20"
version = "1.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eded382c5f5f786b989652c49544c4877d9f015cc22e145a5ea8ea66c2921cd2"
checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d"
[[package]]
name = "ryu"
@@ -2115,7 +2120,7 @@ dependencies = [
"smallvec",
"snap",
"socket2",
"thiserror 2.0.12",
"thiserror",
"tokio",
"tracing",
"uuid",
@@ -2144,7 +2149,7 @@ dependencies = [
"smallvec",
"snap",
"socket2",
"thiserror 2.0.12",
"thiserror",
"tokio",
"tracing",
"uuid",
@@ -2163,7 +2168,7 @@ dependencies = [
"scylla-macros 0.7.1",
"snap",
"stable_deref_trait",
"thiserror 2.0.12",
"thiserror",
"tokio",
"uuid",
"yoke 0.7.5",
@@ -2184,7 +2189,7 @@ dependencies = [
"scylla-macros 1.1.0",
"snap",
"stable_deref_trait",
"thiserror 2.0.12",
"thiserror",
"tokio",
"uuid",
"yoke 0.7.5",
@@ -2535,31 +2540,13 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "thiserror"
version = "0.0.1"
source = "git+https://github.com/dominikwerder/thiserror.git?branch=cstm#8d3fc303d3741068c05ce2b533c058fa44bf9a1d"
dependencies = [
"thiserror-impl 1.0.61",
]
[[package]]
name = "thiserror"
version = "2.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708"
dependencies = [
"thiserror-impl 2.0.12",
]
[[package]]
name = "thiserror-impl"
version = "1.0.61"
source = "git+https://github.com/dominikwerder/thiserror.git?branch=cstm#8d3fc303d3741068c05ce2b533c058fa44bf9a1d"
dependencies = [
"proc-macro2",
"quote",
"syn",
"thiserror-impl",
]
[[package]]
@@ -2641,9 +2628,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.45.0"
version = "1.45.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2513ca694ef9ede0fb23fe71a4ee4107cb102b9dc1930f6d0fd77aae068ae165"
checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779"
dependencies = [
"backtrace",
"bytes",
@@ -2910,11 +2897,13 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "1.16.0"
version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9"
checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d"
dependencies = [
"getrandom 0.3.3",
"js-sys",
"wasm-bindgen",
]
[[package]]
@@ -3053,9 +3042,9 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-core"
version = "0.61.0"
version = "0.61.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4763c1de310c86d75a878046489e2e5ba02c649d185f21c67d4cf8a56d098980"
checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3"
dependencies = [
"windows-implement",
"windows-interface",
@@ -3094,18 +3083,18 @@ checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38"
[[package]]
name = "windows-result"
version = "0.3.2"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c64fd11a4fd95df68efcfee5f44a294fe71b8bc6a91993e2791938abcc712252"
checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6"
dependencies = [
"windows-link",
]
[[package]]
name = "windows-strings"
version = "0.4.0"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a2ba9642430ee452d5a7aa78d72907ebe8cfda358e8cb7918a2050581322f97"
checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57"
dependencies = [
"windows-link",
]

View File

@@ -932,7 +932,7 @@ fn handler_sigterm(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc:
}
pub async fn run(opts: CaIngestOpts, channels_config: Option<ChannelsConfig>) -> Result<(), Error> {
info!("start up {opts:?}");
info!("start up {:?}", opts);
ingest_linux::signal::set_signal_handler(libc::SIGINT, handler_sigint).map_err(Error::from_string)?;
ingest_linux::signal::set_signal_handler(libc::SIGTERM, handler_sigterm).map_err(Error::from_string)?;
{

View File

@@ -10,7 +10,7 @@ pub type PgClient = Client;
pub async fn make_pg_client(dbconf: &Database) -> Result<(PgClient, JoinHandle<Result<(), Error>>), Error> {
let d = dbconf;
let url = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name);
info!("connect to {url}");
info!("connect to {}", url);
let (client, pg_conn) = tokio_postgres::connect(&url, tokio_postgres::tls::NoTls).await?;
// TODO allow clean shutdown on ctrl-c and join the pg_conn in the end:
let jh = tokio::spawn(async move {

View File

@@ -1,6 +1,61 @@
#![allow(unused_imports)]
pub use tracing::debug;
pub use tracing::error;
pub use tracing::info;
// pub use tracing::info;
pub use tracing::trace;
pub use tracing::warn;
pub use direct_info as info;
pub mod log_macros_direct {
#[allow(unused)]
#[macro_export]
macro_rules! direct_trace {
($fmt:expr) => {
eprintln!(concat!("TRACE ", $fmt));
};
($fmt:expr, $($arg:expr),*) => {
eprintln!(concat!("TRACE ", $fmt), $($arg)*);
};
}
#[allow(unused)]
#[macro_export]
macro_rules! direct_debug {
($fmt:expr) => {
eprintln!(concat!("DEBUG ", $fmt));
};
($fmt:expr, $($arg:expr),*) => {
eprintln!(concat!("DEBUG ", $fmt), $($arg)*);
};
}
#[allow(unused)]
#[macro_export]
macro_rules! direct_info {
($fmt:expr) => {
eprintln!(concat!("INFO ", $fmt));
};
($fmt:expr, $($arg:tt)*) => {
eprintln!(concat!("INFO ", $fmt), $($arg)*);
};
}
#[allow(unused)]
#[macro_export]
macro_rules! direct_warn {
($fmt:expr) => {
eprintln!(concat!("WARN ", $fmt));
};
($fmt:expr, $($arg:expr),*) => {
eprintln!(concat!("WARN ", $fmt), $($arg)*);
};
}
#[allow(unused)]
#[macro_export]
macro_rules! direct_error {
($fmt:expr) => {
eprintln!(concat!("ERROR ", $fmt));
};
($fmt:expr, $($arg:expr),*) => {
eprintln!(concat!("ERROR ", $fmt), $($arg)*);
};
}
}

View File

@@ -1551,7 +1551,7 @@ impl CaConn {
};
{
if dbg_chn_cid {
info!("send out EventAdd for {cid:?}");
info!("send out EventAdd for {:?}", cid);
}
let data_count = st2.channel.ca_dbr_count;
let _data_count = 0;
@@ -2881,7 +2881,7 @@ impl CaConn {
if let Some(conf) = self.channels.get(&cid) {
let name = conf.conf.name();
if series::dbg::dbg_chn(&name) {
info!("queue event to notice channel create fail {name}");
info!("queue event to notice channel create fail {}", name);
}
let name2 = name.to_string();
let failinfo = format!("name {} cid {}", name, cid);

View File

@@ -743,7 +743,7 @@ impl CaConnSet {
}
self.mett.channel_status_series_found().inc();
if series::dbg::dbg_chn(&name) {
info!("handle_add_channel_with_status_id {cmd:?}");
info!("handle_add_channel_with_status_id {:?}", cmd);
}
let ch = ChannelName::new(name.into());
if let Some(chst) = self.channel_states.get_mut(&ch) {
@@ -808,7 +808,7 @@ impl CaConnSet {
return Err(Error::ExpectIpv4);
};
if series::dbg::dbg_chn(&name) {
info!("handle_add_channel_with_addr {cmd:?}");
info!("handle_add_channel_with_addr {:?}", cmd);
}
let ch = ChannelName::new(name.into());
if let Some(chst) = self.channel_states.get_mut(&ch) {
@@ -816,7 +816,7 @@ impl CaConnSet {
chst.config = cmd.ch_cfg.clone();
if let ChannelStateValue::Active(ast) = &mut chst.value {
if let ActiveChannelState::WithStatusSeriesId(st3) = ast {
trace!("handle_add_channel_with_addr INNER {cmd:?}");
trace!("handle_add_channel_with_addr INNER {:?}", cmd);
self.mett.handle_add_channel_with_addr().inc();
let tsnow = SystemTime::now();
let mut writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id()))?;
@@ -850,9 +850,9 @@ impl CaConnSet {
};
let addr = cmd.addr;
if self.ca_conn_ress.contains_key(&addr) {
trace!("ca_conn_ress has already {addr:?}");
trace!("ca_conn_ress has already {:?}", addr_v4);
} else {
trace!("ca_conn_ress NEW {addr:?}");
trace!("ca_conn_ress NEW {:?}", addr);
let c = self.create_ca_conn(cmd.clone())?;
self.ca_conn_ress.insert(addr, c);
}
@@ -927,14 +927,14 @@ impl CaConnSet {
for res in results {
let ch = ChannelName::new(res.channel.clone());
if series::dbg::dbg_chn(&ch.name()) {
info!("handle_ioc_query_result {res:?}");
info!("handle_ioc_query_result {:?}", res);
}
if let Some(chst) = self.channel_states.get_mut(&ch) {
if let ChannelStateValue::Active(ast) = &mut chst.value {
if let ActiveChannelState::WithStatusSeriesId(st2) = ast {
if let Some(addr) = res.addr {
self.mett.ioc_addr_found().inc();
trace!("ioc found {res:?}");
trace!("ioc found {:?}", res);
let cmd = ChannelAddWithAddr {
ch_cfg: chst.config.clone(),
addr: SocketAddr::V4(addr),
@@ -943,7 +943,7 @@ impl CaConnSet {
self.handle_add_channel_with_addr(cmd)?;
} else {
self.mett.ioc_addr_not_found().inc();
trace!("ioc not found {res:?}");
trace!("ioc not found {:?}", res);
let since = SystemTime::now();
st2.inner = WithStatusSeriesIdStateInner::UnknownAddress { since };
}
@@ -1058,12 +1058,12 @@ impl CaConnSet {
}
fn apply_ca_conn_health_update(&mut self, addr: SocketAddr, res: ChannelStatusPartial) -> Result<(), Error> {
trace_health_update!("apply_ca_conn_health_update {addr}");
trace_health_update!("apply_ca_conn_health_update {}", addr);
let tsnow = SystemTime::now();
let mut rogue_channel_count = 0;
for (k, v) in res.channel_statuses {
trace_health_update!("self.rogue_channel_count {}", rogue_channel_count);
trace_health_update!("apply_ca_conn_health_update {k:?} {v:?}");
trace_health_update!("apply_ca_conn_health_update {:?} {:?}", k, v);
let ch = if let Some(x) = self.channel_by_cssid.get(&k) {
x
} else {
@@ -1146,7 +1146,7 @@ impl CaConnSet {
self.await_ca_conn_jhs.push_back((addr, e.jh));
} else {
self.mett.ca_conn_eos_unexpected().inc();
warn!("end-of-stream received for non-existent CaConn {addr}");
warn!("end-of-stream received for non-existent CaConn {}", addr);
}
{
use EndOfStreamReason::*;
@@ -1156,7 +1156,7 @@ impl CaConnSet {
self.handle_connect_fail(addr)?
}
Error(e) => {
warn!("received error {addr} {e}");
warn!("received error {} {}", addr, e);
self.handle_connect_fail(addr)?
}
ConnectRefused => self.handle_connect_fail(addr)?,
@@ -1175,7 +1175,7 @@ impl CaConnSet {
}
fn handle_ca_conn_channel_removed(&mut self, addr: SocketAddr, name: String) -> Result<(), Error> {
debug!("handle_ca_conn_channel_removed {addr} {name}");
debug!("handle_ca_conn_channel_removed {} {}", addr, name);
let name = ChannelName::new(name);
if let Some(st1) = self.channel_states.get_mut(&name) {
match &mut st1.value {
@@ -1332,7 +1332,7 @@ impl CaConnSet {
.await?;
}
Err(e) => {
error!("ca_conn_item_merge received from inner: {e}");
error!("ca_conn_item_merge received from inner: {}", e);
}
}
Ok(())
@@ -1351,7 +1351,7 @@ impl CaConnSet {
// let e = Error::with_msg_no_trace(format!("CaConn delivered already eos {addr} {x:?}"));
// error!("{e}");
// return Err(e);
warn!("CaConn {addr} EOS reason [{x:?}] after [{eos_reason:?}]");
warn!("CaConn {} EOS reason [{:?}] after [{:?}]", addr, x, eos_reason);
}
match item.value {
CaConnEventValue::None
@@ -1425,7 +1425,7 @@ impl CaConnSet {
WithStatusSeriesIdStateInner::AddrSearchPending { since } => {
let dt = stnow.duration_since(*since).unwrap_or(Duration::ZERO);
if dt > SEARCH_PENDING_TIMEOUT {
info!("should receive some error indication instead of timeout for {ch:?}");
info!("should receive some error indication instead of timeout for {:?}", ch);
st3.inner = WithStatusSeriesIdStateInner::NoAddress { since: stnow };
search_pending_count -= 1;
}
@@ -1455,9 +1455,9 @@ impl CaConnSet {
}
if st4.updated + CHANNEL_HEALTH_TIMEOUT < stnow {
channel_health_timeout_reached += 1;
trace!("health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~");
trace!("health timeout channel {:?} ~~~~~~~~~~~~~~~~~~~", ch);
// TODO
error!("TODO health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~");
error!("TODO health timeout channel {:?} ~~~~~~~~~~~~~~~~~~~", ch);
if true {
std::process::exit(1);
}

View File

@@ -628,7 +628,7 @@ pub async fn metrics_service(
shutdown_signal: Receiver<u32>,
rres: Arc<RoutesResources>,
) -> Result<(), Error> {
info!("metrics service start {bind_to}");
info!("metrics service start {}", bind_to);
let addr: SocketAddr = bind_to.parse().map_err(Error::from_string)?;
let router = make_routes(rres, dcom, connset_cmd_tx, stats_set).into_make_service();
let listener = TcpListener::bind(addr).await?;

View File

@@ -55,7 +55,7 @@ impl Changeset {
fn log_statements(&self) {
for q in &self.todo {
info!("would execute:\n{q}\n");
info!("would execute:\n{}\n", q);
}
}
}
@@ -540,10 +540,10 @@ async fn migrate_scylla_data_schema(
),
ks, rf, durable
);
info!("scylla create keyspace {cql}");
info!("scylla create keyspace {}", cql);
chs.add_todo(cql);
} else {
info!("scylla has keyspace {ks}");
info!("scylla has keyspace {}", ks);
}
check_event_tables(ks, rett.clone(), chs, scy).await?;

View File

@@ -32,7 +32,7 @@ pub async fn list_pkey(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> {
.await?
.rows_stream::<(i64, i64)>()?;
while let Some((pulse_a_token, pulse_a)) = it.try_next().await? {
info!("pulse_a_token {pulse_a_token} pulse_a {pulse_a}");
info!("pulse_a_token {} pulse_a {}", pulse_a_token, pulse_a);
pulse_a_max = pulse_a_max.max(pulse_a);
}
if t2 == i64::MAX {
@@ -42,7 +42,7 @@ pub async fn list_pkey(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> {
t1 = t2 + 1;
}
}
info!("pulse_a_max {pulse_a_max}");
info!("pulse_a_max {}", pulse_a_max);
Ok(())
}
@@ -62,7 +62,10 @@ pub async fn list_pulses(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error>
.await?
.rows_stream::<(i64, i32, i32, i64)>()?;
while let Some((tsa_token, tsa, tsb, pulse)) = it.try_next().await? {
info!("tsa_token {tsa_token:21} tsa {tsa:12} tsb {tsb:12} pulse {pulse:21}");
info!(
"tsa_token {:21} tsa {:12} tsb {:12} pulse {:21}",
tsa_token, tsa, tsb, pulse
);
}
if t2 == i64::MAX {
info!("end of token range");
@@ -92,7 +95,10 @@ pub async fn fetch_events(backend: &str, channel: &str, scylla_conf: &ScyllaInge
.rows_stream::<(i64, i32, i32, i64)>()?;
while let Some((tsa_token, tsa, tsb, pulse)) = it.try_next().await? {
if false {
info!("tsa_token {tsa_token:21} tsa {tsa:12} tsb {tsb:12} pulse {pulse:21}");
info!(
"tsa_token {:21} tsa {:12} tsb {:12} pulse {:21}",
tsa_token, tsa, tsb, pulse
);
}
rowcnt += 1;
}

View File

@@ -1,3 +1,6 @@
#[cfg(test)]
mod test;
use crate::log;
use crate::rtwriter::MinQuiets;
use items_0::timebin::BinnedBinsTimeweightTrait;
@@ -158,6 +161,7 @@ impl BinWriter {
const DUR_MAX: DtMs = DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 123);
let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long];
let quiets = [min_quiets.st.clone(), min_quiets.mt.clone(), min_quiets.lt.clone()];
let cnt_zero_disable = WriteCntZero::Disable;
let mut binner_1st = None;
let mut binner_others = Vec::new();
let mut has_monitor = None;
@@ -170,7 +174,7 @@ impl BinWriter {
}
})
.filter(|x| x.1 > DUR_ZERO && x.1 < DUR_MAX)
.map(|x| (x.0, bin_len_clamp(x.1), WriteCntZero::Disable))
.map(|x| (x.0, bin_len_clamp(x.1), cnt_zero_disable.clone()))
.collect();
let has_monitor = has_monitor;
debug_init!(trd, "has_monitor {:?} is_polled {:?}", has_monitor, is_polled);
@@ -186,27 +190,39 @@ impl BinWriter {
combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable));
}
_ => {
combs.push((RetentionTime::Long, PrebinnedPartitioning::Hour1, WriteCntZero::Disable));
combs.push((
RetentionTime::Long,
PrebinnedPartitioning::Hour1,
cnt_zero_disable.clone(),
));
combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable));
}
}
} else {
match &has_monitor {
Some(RetentionTime::Short) => {
combs.push((RetentionTime::Short, PrebinnedPartitioning::Min1, WriteCntZero::Disable));
combs.push((
RetentionTime::Short,
PrebinnedPartitioning::Min1,
cnt_zero_disable.clone(),
));
combs.push((
RetentionTime::Medium,
PrebinnedPartitioning::Hour1,
WriteCntZero::Disable,
cnt_zero_disable.clone(),
));
combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable));
}
Some(RetentionTime::Medium) => {
combs.push((RetentionTime::Short, PrebinnedPartitioning::Min1, WriteCntZero::Disable));
combs.push((
RetentionTime::Short,
PrebinnedPartitioning::Min1,
cnt_zero_disable.clone(),
));
combs.push((
RetentionTime::Medium,
PrebinnedPartitioning::Hour1,
WriteCntZero::Disable,
cnt_zero_disable.clone(),
));
combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable));
}
@@ -214,9 +230,13 @@ impl BinWriter {
combs.push((
RetentionTime::Medium,
PrebinnedPartitioning::Min1,
WriteCntZero::Disable,
cnt_zero_disable.clone(),
));
combs.push((
RetentionTime::Long,
PrebinnedPartitioning::Hour1,
cnt_zero_disable.clone(),
));
combs.push((RetentionTime::Long, PrebinnedPartitioning::Hour1, WriteCntZero::Disable));
combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable));
}
None => {
@@ -458,6 +478,8 @@ impl BinWriter {
}
let bins_len = bins.len();
for (ts1, ts2, cnt, min, max, avg, lst, fnl) in bins.zip_iter_2() {
eprintln!("cnt {}", cnt);
info!("cnt {}", cnt);
let bin_len = DtMs::from_ms_u64(ts2.delta(ts1).ms_u64());
if fnl == false {
info!("non final bin {:?}", series);

View File

@@ -0,0 +1,43 @@
use super::BinWriter;
use crate::rtwriter::MinQuiets;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use series::ChannelStatusSeriesId;
use series::SeriesId;
fn sec(sec: f32) -> TsNano {
TsNano::from_ms((1e3 * sec) as u64)
}
#[test]
fn binwriter_nest01_00() {
let beg = TsNano::from_ms(1000 * 40);
let min_quiets = MinQuiets::test_1_10_60();
let is_polled = false;
let cssid = ChannelStatusSeriesId::new(50);
let sid = SeriesId::new(51);
let scalar_type = ScalarType::F32;
let shape = Shape::Scalar;
let chname2 = String::from("DUMMY");
let mut iqdqs = InsertDeques::new();
let mut binwriter = BinWriter::new(beg, min_quiets, is_polled, cssid, sid, scalar_type, shape, chname2).unwrap();
binwriter.ingest(sec(39.9), 2., &mut iqdqs).unwrap();
binwriter.ingest(sec(40.0), 2., &mut iqdqs).unwrap();
binwriter.ingest(sec(40.1), 2., &mut iqdqs).unwrap();
binwriter.ingest(sec(50.0), 2., &mut iqdqs).unwrap();
binwriter.ingest(sec(60.0), 2., &mut iqdqs).unwrap();
// binwriter.ingest(sec(70.0), 2., &mut iqdqs).unwrap();
binwriter.tick(&mut iqdqs).unwrap();
eprintln!("iqdqs summary {}", iqdqs.summary());
for x in iqdqs.st_rf3_qu {
eprintln!("ST {:?}", x);
}
for x in iqdqs.mt_rf3_qu {
eprintln!("MT {:?}", x);
}
for x in iqdqs.lt_rf3_qu {
eprintln!("LT {:?}", x);
}
}

View File

@@ -32,6 +32,24 @@ pub struct MinQuiets {
pub lt: Duration,
}
impl MinQuiets {
pub fn test_mon_1_10() -> Self {
Self {
st: Duration::from_millis(0),
mt: Duration::from_millis(1000 * 1),
lt: Duration::from_millis(1000 * 10),
}
}
pub fn test_1_10_60() -> Self {
Self {
st: Duration::from_millis(1000 * 1),
mt: Duration::from_millis(1000 * 10),
lt: Duration::from_millis(1000 * 60),
}
}
}
#[derive(Debug, Serialize)]
struct State<ET>
where