From c17032db231e6de51291992bf939ddcaf5f65887 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 26 May 2025 16:31:10 +0200 Subject: [PATCH] Log usage --- .cargo/cargo-lock | 93 +++++++++++++----------------- daqingest/src/daemon.rs | 2 +- dbpg/src/conn.rs | 2 +- log/src/log.rs | 57 +++++++++++++++++- netfetch/src/ca/conn.rs | 4 +- netfetch/src/ca/connset.rs | 36 ++++++------ netfetch/src/metrics.rs | 2 +- scywr/src/schema.rs | 6 +- scywr/src/tools.rs | 14 +++-- serieswriter/src/binwriter.rs | 38 +++++++++--- serieswriter/src/binwriter/test.rs | 43 ++++++++++++++ serieswriter/src/rtwriter.rs | 18 ++++++ 12 files changed, 224 insertions(+), 91 deletions(-) create mode 100644 serieswriter/src/binwriter/test.rs diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index df78ee1..c9e49d6 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -100,12 +100,12 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "3.0.7" +version = "3.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e" +checksum = "6680de5231bd6ee4c6191b8a1325daa282b415391ec9d3a37bd34f2060dc73fa" dependencies = [ "anstyle", - "once_cell", + "once_cell_polyfill", "windows-sys 0.59.0", ] @@ -314,9 +314,9 @@ checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" [[package]] name = "cc" -version = "1.2.22" +version = "1.2.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32db95edf998450acc7881c932f94cd9b05c87b4b2599e8bab064753da4acfd1" +checksum = "16595d3be041c03b09d08d0858631facccee9221e579704070e6e9e4915d3bc7" dependencies = [ "shlex", ] @@ -533,10 +533,11 @@ dependencies = [ [[package]] name = "daqbuf-err" -version = "0.0.6" +version = "0.0.7" dependencies = [ "anyhow", "async-channel", + "autoerr", "backtrace", "chrono", "http", @@ -545,7 +546,6 @@ dependencies = [ "serde", "serde_cbor", "serde_json", - "thiserror 0.0.1", "url", ] @@ -718,7 +718,6 @@ dependencies = [ "serde", "serde_cbor", "serde_json", - "thiserror 0.0.1", "typetag", ] @@ -1245,9 +1244,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.11" +version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497bbc33a26fdd4af9ed9c70d63f61cf56a938375fbb32df34db9b1cd6d643f2" +checksum = "cf9f1e950e0d9d1d3c47184416723cf29c0d1f93bd8cccf37e4beb6b44f31710" dependencies = [ "bytes", "futures-util", @@ -1332,9 +1331,9 @@ checksum = "00210d6893afc98edb752b664b8890f0ef174c8adbb8d0be9710fa66fbbf72d3" [[package]] name = "icu_properties" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2549ca8c7241c82f59c80ba2a6f415d931c5b58d24fb8412caa1a1f02c49139a" +checksum = "016c619c1eeb94efb86809b015c58f479963de65bdb6253345c1a1276f22e32b" dependencies = [ "displaydoc", "icu_collections", @@ -1348,9 +1347,9 @@ dependencies = [ [[package]] name = "icu_properties_data" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8197e866e47b68f8f7d95249e172903bec06004b18b2937f1095d40a0c57de04" +checksum = "298459143998310acd25ffe6810ed544932242d3f07083eee1084d83a71bd632" [[package]] name = "icu_provider" @@ -1581,13 +1580,13 @@ dependencies = [ [[package]] name = "mio" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" +checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c" dependencies = [ "libc", "wasi 0.11.0+wasi-snapshot-preview1", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1702,6 +1701,12 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "once_cell_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" + [[package]] name = "overload" version = "0.1.1" @@ -2074,9 +2079,9 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.20" +version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eded382c5f5f786b989652c49544c4877d9f015cc22e145a5ea8ea66c2921cd2" +checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d" [[package]] name = "ryu" @@ -2115,7 +2120,7 @@ dependencies = [ "smallvec", "snap", "socket2", - "thiserror 2.0.12", + "thiserror", "tokio", "tracing", "uuid", @@ -2144,7 +2149,7 @@ dependencies = [ "smallvec", "snap", "socket2", - "thiserror 2.0.12", + "thiserror", "tokio", "tracing", "uuid", @@ -2163,7 +2168,7 @@ dependencies = [ "scylla-macros 0.7.1", "snap", "stable_deref_trait", - "thiserror 2.0.12", + "thiserror", "tokio", "uuid", "yoke 0.7.5", @@ -2184,7 +2189,7 @@ dependencies = [ "scylla-macros 1.1.0", "snap", "stable_deref_trait", - "thiserror 2.0.12", + "thiserror", "tokio", "uuid", "yoke 0.7.5", @@ -2535,31 +2540,13 @@ dependencies = [ "tracing-subscriber", ] -[[package]] -name = "thiserror" -version = "0.0.1" -source = "git+https://github.com/dominikwerder/thiserror.git?branch=cstm#8d3fc303d3741068c05ce2b533c058fa44bf9a1d" -dependencies = [ - "thiserror-impl 1.0.61", -] - [[package]] name = "thiserror" version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" dependencies = [ - "thiserror-impl 2.0.12", -] - -[[package]] -name = "thiserror-impl" -version = "1.0.61" -source = "git+https://github.com/dominikwerder/thiserror.git?branch=cstm#8d3fc303d3741068c05ce2b533c058fa44bf9a1d" -dependencies = [ - "proc-macro2", - "quote", - "syn", + "thiserror-impl", ] [[package]] @@ -2641,9 +2628,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.45.0" +version = "1.45.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2513ca694ef9ede0fb23fe71a4ee4107cb102b9dc1930f6d0fd77aae068ae165" +checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779" dependencies = [ "backtrace", "bytes", @@ -2910,11 +2897,13 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.16.0" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9" +checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d" dependencies = [ "getrandom 0.3.3", + "js-sys", + "wasm-bindgen", ] [[package]] @@ -3053,9 +3042,9 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "windows-core" -version = "0.61.0" +version = "0.61.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4763c1de310c86d75a878046489e2e5ba02c649d185f21c67d4cf8a56d098980" +checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" dependencies = [ "windows-implement", "windows-interface", @@ -3094,18 +3083,18 @@ checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38" [[package]] name = "windows-result" -version = "0.3.2" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c64fd11a4fd95df68efcfee5f44a294fe71b8bc6a91993e2791938abcc712252" +checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" dependencies = [ "windows-link", ] [[package]] name = "windows-strings" -version = "0.4.0" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a2ba9642430ee452d5a7aa78d72907ebe8cfda358e8cb7918a2050581322f97" +checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" dependencies = [ "windows-link", ] diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 8644f7e..f5c7c8e 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -932,7 +932,7 @@ fn handler_sigterm(_a: libc::c_int, _b: *const libc::siginfo_t, _c: *const libc: } pub async fn run(opts: CaIngestOpts, channels_config: Option) -> Result<(), Error> { - info!("start up {opts:?}"); + info!("start up {:?}", opts); ingest_linux::signal::set_signal_handler(libc::SIGINT, handler_sigint).map_err(Error::from_string)?; ingest_linux::signal::set_signal_handler(libc::SIGTERM, handler_sigterm).map_err(Error::from_string)?; { diff --git a/dbpg/src/conn.rs b/dbpg/src/conn.rs index e5afbc2..86ca9a2 100644 --- a/dbpg/src/conn.rs +++ b/dbpg/src/conn.rs @@ -10,7 +10,7 @@ pub type PgClient = Client; pub async fn make_pg_client(dbconf: &Database) -> Result<(PgClient, JoinHandle>), Error> { let d = dbconf; let url = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name); - info!("connect to {url}"); + info!("connect to {}", url); let (client, pg_conn) = tokio_postgres::connect(&url, tokio_postgres::tls::NoTls).await?; // TODO allow clean shutdown on ctrl-c and join the pg_conn in the end: let jh = tokio::spawn(async move { diff --git a/log/src/log.rs b/log/src/log.rs index db88b50..5bb6c52 100644 --- a/log/src/log.rs +++ b/log/src/log.rs @@ -1,6 +1,61 @@ #![allow(unused_imports)] pub use tracing::debug; pub use tracing::error; -pub use tracing::info; +// pub use tracing::info; pub use tracing::trace; pub use tracing::warn; + +pub use direct_info as info; + +pub mod log_macros_direct { + #[allow(unused)] + #[macro_export] + macro_rules! direct_trace { + ($fmt:expr) => { + eprintln!(concat!("TRACE ", $fmt)); + }; + ($fmt:expr, $($arg:expr),*) => { + eprintln!(concat!("TRACE ", $fmt), $($arg)*); + }; + } + #[allow(unused)] + #[macro_export] + macro_rules! direct_debug { + ($fmt:expr) => { + eprintln!(concat!("DEBUG ", $fmt)); + }; + ($fmt:expr, $($arg:expr),*) => { + eprintln!(concat!("DEBUG ", $fmt), $($arg)*); + }; + } + #[allow(unused)] + #[macro_export] + macro_rules! direct_info { + ($fmt:expr) => { + eprintln!(concat!("INFO ", $fmt)); + }; + ($fmt:expr, $($arg:tt)*) => { + eprintln!(concat!("INFO ", $fmt), $($arg)*); + }; + } + #[allow(unused)] + #[macro_export] + macro_rules! direct_warn { + ($fmt:expr) => { + eprintln!(concat!("WARN ", $fmt)); + }; + ($fmt:expr, $($arg:expr),*) => { + eprintln!(concat!("WARN ", $fmt), $($arg)*); + }; + } + #[allow(unused)] + #[macro_export] + macro_rules! direct_error { + ($fmt:expr) => { + eprintln!(concat!("ERROR ", $fmt)); + }; + ($fmt:expr, $($arg:expr),*) => { + eprintln!(concat!("ERROR ", $fmt), $($arg)*); + }; + } +} diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 55e82de..91025fc 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1551,7 +1551,7 @@ impl CaConn { }; { if dbg_chn_cid { - info!("send out EventAdd for {cid:?}"); + info!("send out EventAdd for {:?}", cid); } let data_count = st2.channel.ca_dbr_count; let _data_count = 0; @@ -2881,7 +2881,7 @@ impl CaConn { if let Some(conf) = self.channels.get(&cid) { let name = conf.conf.name(); if series::dbg::dbg_chn(&name) { - info!("queue event to notice channel create fail {name}"); + info!("queue event to notice channel create fail {}", name); } let name2 = name.to_string(); let failinfo = format!("name {} cid {}", name, cid); diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index c82ad71..5987326 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -743,7 +743,7 @@ impl CaConnSet { } self.mett.channel_status_series_found().inc(); if series::dbg::dbg_chn(&name) { - info!("handle_add_channel_with_status_id {cmd:?}"); + info!("handle_add_channel_with_status_id {:?}", cmd); } let ch = ChannelName::new(name.into()); if let Some(chst) = self.channel_states.get_mut(&ch) { @@ -808,7 +808,7 @@ impl CaConnSet { return Err(Error::ExpectIpv4); }; if series::dbg::dbg_chn(&name) { - info!("handle_add_channel_with_addr {cmd:?}"); + info!("handle_add_channel_with_addr {:?}", cmd); } let ch = ChannelName::new(name.into()); if let Some(chst) = self.channel_states.get_mut(&ch) { @@ -816,7 +816,7 @@ impl CaConnSet { chst.config = cmd.ch_cfg.clone(); if let ChannelStateValue::Active(ast) = &mut chst.value { if let ActiveChannelState::WithStatusSeriesId(st3) = ast { - trace!("handle_add_channel_with_addr INNER {cmd:?}"); + trace!("handle_add_channel_with_addr INNER {:?}", cmd); self.mett.handle_add_channel_with_addr().inc(); let tsnow = SystemTime::now(); let mut writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id()))?; @@ -850,9 +850,9 @@ impl CaConnSet { }; let addr = cmd.addr; if self.ca_conn_ress.contains_key(&addr) { - trace!("ca_conn_ress has already {addr:?}"); + trace!("ca_conn_ress has already {:?}", addr_v4); } else { - trace!("ca_conn_ress NEW {addr:?}"); + trace!("ca_conn_ress NEW {:?}", addr); let c = self.create_ca_conn(cmd.clone())?; self.ca_conn_ress.insert(addr, c); } @@ -927,14 +927,14 @@ impl CaConnSet { for res in results { let ch = ChannelName::new(res.channel.clone()); if series::dbg::dbg_chn(&ch.name()) { - info!("handle_ioc_query_result {res:?}"); + info!("handle_ioc_query_result {:?}", res); } if let Some(chst) = self.channel_states.get_mut(&ch) { if let ChannelStateValue::Active(ast) = &mut chst.value { if let ActiveChannelState::WithStatusSeriesId(st2) = ast { if let Some(addr) = res.addr { self.mett.ioc_addr_found().inc(); - trace!("ioc found {res:?}"); + trace!("ioc found {:?}", res); let cmd = ChannelAddWithAddr { ch_cfg: chst.config.clone(), addr: SocketAddr::V4(addr), @@ -943,7 +943,7 @@ impl CaConnSet { self.handle_add_channel_with_addr(cmd)?; } else { self.mett.ioc_addr_not_found().inc(); - trace!("ioc not found {res:?}"); + trace!("ioc not found {:?}", res); let since = SystemTime::now(); st2.inner = WithStatusSeriesIdStateInner::UnknownAddress { since }; } @@ -1058,12 +1058,12 @@ impl CaConnSet { } fn apply_ca_conn_health_update(&mut self, addr: SocketAddr, res: ChannelStatusPartial) -> Result<(), Error> { - trace_health_update!("apply_ca_conn_health_update {addr}"); + trace_health_update!("apply_ca_conn_health_update {}", addr); let tsnow = SystemTime::now(); let mut rogue_channel_count = 0; for (k, v) in res.channel_statuses { trace_health_update!("self.rogue_channel_count {}", rogue_channel_count); - trace_health_update!("apply_ca_conn_health_update {k:?} {v:?}"); + trace_health_update!("apply_ca_conn_health_update {:?} {:?}", k, v); let ch = if let Some(x) = self.channel_by_cssid.get(&k) { x } else { @@ -1146,7 +1146,7 @@ impl CaConnSet { self.await_ca_conn_jhs.push_back((addr, e.jh)); } else { self.mett.ca_conn_eos_unexpected().inc(); - warn!("end-of-stream received for non-existent CaConn {addr}"); + warn!("end-of-stream received for non-existent CaConn {}", addr); } { use EndOfStreamReason::*; @@ -1156,7 +1156,7 @@ impl CaConnSet { self.handle_connect_fail(addr)? } Error(e) => { - warn!("received error {addr} {e}"); + warn!("received error {} {}", addr, e); self.handle_connect_fail(addr)? } ConnectRefused => self.handle_connect_fail(addr)?, @@ -1175,7 +1175,7 @@ impl CaConnSet { } fn handle_ca_conn_channel_removed(&mut self, addr: SocketAddr, name: String) -> Result<(), Error> { - debug!("handle_ca_conn_channel_removed {addr} {name}"); + debug!("handle_ca_conn_channel_removed {} {}", addr, name); let name = ChannelName::new(name); if let Some(st1) = self.channel_states.get_mut(&name) { match &mut st1.value { @@ -1332,7 +1332,7 @@ impl CaConnSet { .await?; } Err(e) => { - error!("ca_conn_item_merge received from inner: {e}"); + error!("ca_conn_item_merge received from inner: {}", e); } } Ok(()) @@ -1351,7 +1351,7 @@ impl CaConnSet { // let e = Error::with_msg_no_trace(format!("CaConn delivered already eos {addr} {x:?}")); // error!("{e}"); // return Err(e); - warn!("CaConn {addr} EOS reason [{x:?}] after [{eos_reason:?}]"); + warn!("CaConn {} EOS reason [{:?}] after [{:?}]", addr, x, eos_reason); } match item.value { CaConnEventValue::None @@ -1425,7 +1425,7 @@ impl CaConnSet { WithStatusSeriesIdStateInner::AddrSearchPending { since } => { let dt = stnow.duration_since(*since).unwrap_or(Duration::ZERO); if dt > SEARCH_PENDING_TIMEOUT { - info!("should receive some error indication instead of timeout for {ch:?}"); + info!("should receive some error indication instead of timeout for {:?}", ch); st3.inner = WithStatusSeriesIdStateInner::NoAddress { since: stnow }; search_pending_count -= 1; } @@ -1455,9 +1455,9 @@ impl CaConnSet { } if st4.updated + CHANNEL_HEALTH_TIMEOUT < stnow { channel_health_timeout_reached += 1; - trace!("health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~"); + trace!("health timeout channel {:?} ~~~~~~~~~~~~~~~~~~~", ch); // TODO - error!("TODO health timeout channel {ch:?} ~~~~~~~~~~~~~~~~~~~"); + error!("TODO health timeout channel {:?} ~~~~~~~~~~~~~~~~~~~", ch); if true { std::process::exit(1); } diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index f245ba5..7d547aa 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -628,7 +628,7 @@ pub async fn metrics_service( shutdown_signal: Receiver, rres: Arc, ) -> Result<(), Error> { - info!("metrics service start {bind_to}"); + info!("metrics service start {}", bind_to); let addr: SocketAddr = bind_to.parse().map_err(Error::from_string)?; let router = make_routes(rres, dcom, connset_cmd_tx, stats_set).into_make_service(); let listener = TcpListener::bind(addr).await?; diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index 66f8053..a5a831d 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -55,7 +55,7 @@ impl Changeset { fn log_statements(&self) { for q in &self.todo { - info!("would execute:\n{q}\n"); + info!("would execute:\n{}\n", q); } } } @@ -540,10 +540,10 @@ async fn migrate_scylla_data_schema( ), ks, rf, durable ); - info!("scylla create keyspace {cql}"); + info!("scylla create keyspace {}", cql); chs.add_todo(cql); } else { - info!("scylla has keyspace {ks}"); + info!("scylla has keyspace {}", ks); } check_event_tables(ks, rett.clone(), chs, scy).await?; diff --git a/scywr/src/tools.rs b/scywr/src/tools.rs index 7d96031..69cf210 100644 --- a/scywr/src/tools.rs +++ b/scywr/src/tools.rs @@ -32,7 +32,7 @@ pub async fn list_pkey(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> { .await? .rows_stream::<(i64, i64)>()?; while let Some((pulse_a_token, pulse_a)) = it.try_next().await? { - info!("pulse_a_token {pulse_a_token} pulse_a {pulse_a}"); + info!("pulse_a_token {} pulse_a {}", pulse_a_token, pulse_a); pulse_a_max = pulse_a_max.max(pulse_a); } if t2 == i64::MAX { @@ -42,7 +42,7 @@ pub async fn list_pkey(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> { t1 = t2 + 1; } } - info!("pulse_a_max {pulse_a_max}"); + info!("pulse_a_max {}", pulse_a_max); Ok(()) } @@ -62,7 +62,10 @@ pub async fn list_pulses(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> .await? .rows_stream::<(i64, i32, i32, i64)>()?; while let Some((tsa_token, tsa, tsb, pulse)) = it.try_next().await? { - info!("tsa_token {tsa_token:21} tsa {tsa:12} tsb {tsb:12} pulse {pulse:21}"); + info!( + "tsa_token {:21} tsa {:12} tsb {:12} pulse {:21}", + tsa_token, tsa, tsb, pulse + ); } if t2 == i64::MAX { info!("end of token range"); @@ -92,7 +95,10 @@ pub async fn fetch_events(backend: &str, channel: &str, scylla_conf: &ScyllaInge .rows_stream::<(i64, i32, i32, i64)>()?; while let Some((tsa_token, tsa, tsb, pulse)) = it.try_next().await? { if false { - info!("tsa_token {tsa_token:21} tsa {tsa:12} tsb {tsb:12} pulse {pulse:21}"); + info!( + "tsa_token {:21} tsa {:12} tsb {:12} pulse {:21}", + tsa_token, tsa, tsb, pulse + ); } rowcnt += 1; } diff --git a/serieswriter/src/binwriter.rs b/serieswriter/src/binwriter.rs index b564fd0..eb81ba1 100644 --- a/serieswriter/src/binwriter.rs +++ b/serieswriter/src/binwriter.rs @@ -1,3 +1,6 @@ +#[cfg(test)] +mod test; + use crate::log; use crate::rtwriter::MinQuiets; use items_0::timebin::BinnedBinsTimeweightTrait; @@ -158,6 +161,7 @@ impl BinWriter { const DUR_MAX: DtMs = DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 123); let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long]; let quiets = [min_quiets.st.clone(), min_quiets.mt.clone(), min_quiets.lt.clone()]; + let cnt_zero_disable = WriteCntZero::Disable; let mut binner_1st = None; let mut binner_others = Vec::new(); let mut has_monitor = None; @@ -170,7 +174,7 @@ impl BinWriter { } }) .filter(|x| x.1 > DUR_ZERO && x.1 < DUR_MAX) - .map(|x| (x.0, bin_len_clamp(x.1), WriteCntZero::Disable)) + .map(|x| (x.0, bin_len_clamp(x.1), cnt_zero_disable.clone())) .collect(); let has_monitor = has_monitor; debug_init!(trd, "has_monitor {:?} is_polled {:?}", has_monitor, is_polled); @@ -186,27 +190,39 @@ impl BinWriter { combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable)); } _ => { - combs.push((RetentionTime::Long, PrebinnedPartitioning::Hour1, WriteCntZero::Disable)); + combs.push(( + RetentionTime::Long, + PrebinnedPartitioning::Hour1, + cnt_zero_disable.clone(), + )); combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable)); } } } else { match &has_monitor { Some(RetentionTime::Short) => { - combs.push((RetentionTime::Short, PrebinnedPartitioning::Min1, WriteCntZero::Disable)); + combs.push(( + RetentionTime::Short, + PrebinnedPartitioning::Min1, + cnt_zero_disable.clone(), + )); combs.push(( RetentionTime::Medium, PrebinnedPartitioning::Hour1, - WriteCntZero::Disable, + cnt_zero_disable.clone(), )); combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable)); } Some(RetentionTime::Medium) => { - combs.push((RetentionTime::Short, PrebinnedPartitioning::Min1, WriteCntZero::Disable)); + combs.push(( + RetentionTime::Short, + PrebinnedPartitioning::Min1, + cnt_zero_disable.clone(), + )); combs.push(( RetentionTime::Medium, PrebinnedPartitioning::Hour1, - WriteCntZero::Disable, + cnt_zero_disable.clone(), )); combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable)); } @@ -214,9 +230,13 @@ impl BinWriter { combs.push(( RetentionTime::Medium, PrebinnedPartitioning::Min1, - WriteCntZero::Disable, + cnt_zero_disable.clone(), + )); + combs.push(( + RetentionTime::Long, + PrebinnedPartitioning::Hour1, + cnt_zero_disable.clone(), )); - combs.push((RetentionTime::Long, PrebinnedPartitioning::Hour1, WriteCntZero::Disable)); combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable)); } None => { @@ -458,6 +478,8 @@ impl BinWriter { } let bins_len = bins.len(); for (ts1, ts2, cnt, min, max, avg, lst, fnl) in bins.zip_iter_2() { + eprintln!("cnt {}", cnt); + info!("cnt {}", cnt); let bin_len = DtMs::from_ms_u64(ts2.delta(ts1).ms_u64()); if fnl == false { info!("non final bin {:?}", series); diff --git a/serieswriter/src/binwriter/test.rs b/serieswriter/src/binwriter/test.rs new file mode 100644 index 0000000..e26d928 --- /dev/null +++ b/serieswriter/src/binwriter/test.rs @@ -0,0 +1,43 @@ +use super::BinWriter; +use crate::rtwriter::MinQuiets; +use netpod::ScalarType; +use netpod::Shape; +use netpod::TsNano; +use scywr::insertqueues::InsertDeques; +use series::ChannelStatusSeriesId; +use series::SeriesId; + +fn sec(sec: f32) -> TsNano { + TsNano::from_ms((1e3 * sec) as u64) +} + +#[test] +fn binwriter_nest01_00() { + let beg = TsNano::from_ms(1000 * 40); + let min_quiets = MinQuiets::test_1_10_60(); + let is_polled = false; + let cssid = ChannelStatusSeriesId::new(50); + let sid = SeriesId::new(51); + let scalar_type = ScalarType::F32; + let shape = Shape::Scalar; + let chname2 = String::from("DUMMY"); + let mut iqdqs = InsertDeques::new(); + let mut binwriter = BinWriter::new(beg, min_quiets, is_polled, cssid, sid, scalar_type, shape, chname2).unwrap(); + binwriter.ingest(sec(39.9), 2., &mut iqdqs).unwrap(); + binwriter.ingest(sec(40.0), 2., &mut iqdqs).unwrap(); + binwriter.ingest(sec(40.1), 2., &mut iqdqs).unwrap(); + binwriter.ingest(sec(50.0), 2., &mut iqdqs).unwrap(); + binwriter.ingest(sec(60.0), 2., &mut iqdqs).unwrap(); + // binwriter.ingest(sec(70.0), 2., &mut iqdqs).unwrap(); + binwriter.tick(&mut iqdqs).unwrap(); + eprintln!("iqdqs summary {}", iqdqs.summary()); + for x in iqdqs.st_rf3_qu { + eprintln!("ST {:?}", x); + } + for x in iqdqs.mt_rf3_qu { + eprintln!("MT {:?}", x); + } + for x in iqdqs.lt_rf3_qu { + eprintln!("LT {:?}", x); + } +} diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index c83cadc..b8ad59b 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -32,6 +32,24 @@ pub struct MinQuiets { pub lt: Duration, } +impl MinQuiets { + pub fn test_mon_1_10() -> Self { + Self { + st: Duration::from_millis(0), + mt: Duration::from_millis(1000 * 1), + lt: Duration::from_millis(1000 * 10), + } + } + + pub fn test_1_10_60() -> Self { + Self { + st: Duration::from_millis(1000 * 1), + mt: Duration::from_millis(1000 * 10), + lt: Duration::from_millis(1000 * 60), + } + } +} + #[derive(Debug, Serialize)] struct State where