From 2c856f5c1f490876342d4165546c17322f319449 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 9 Jul 2024 14:30:43 +0200 Subject: [PATCH] Delete of old event data --- Cargo.toml | 2 +- daqingest/src/bin/daqingest.rs | 6 + daqingest/src/opts.rs | 7 + daqingest/src/tools.rs | 194 +++++++++++++++++++++++++++ dbpg/src/err.rs | 1 + dbpg/src/findaddr.rs | 1 + dbpg/src/pool.rs | 1 + dbpg/src/schema.rs | 1 + dbpg/src/seriesbychannel.rs | 1 + dbpg/src/seriesid.rs | 1 + dbpg/src/testerr.rs | 1 + ingest-linux/Cargo.toml | 3 + ingest-linux/src/signal.rs | 1 + netfetch/src/ca/beacons.rs | 1 + netfetch/src/ca/conn.rs | 18 ++- netfetch/src/ca/conn/enumfetch.rs | 1 + netfetch/src/ca/proto.rs | 1 + netfetch/src/metrics/delete.rs | 1 + netfetch/src/metrics/ingest.rs | 1 + netfetch/src/metrics/postingest.rs | 1 + scywr/src/access.rs | 1 + scywr/src/insertqueues.rs | 1 + scywr/src/iteminsertqueue.rs | 1 + scywr/src/schema.rs | 34 ++++- scywr/src/senderpolling.rs | 2 + scywr/src/session.rs | 1 + scywr/src/store.rs | 1 + serieswriter/src/binwriter.rs | 1 + serieswriter/src/establish_worker.rs | 1 + serieswriter/src/rtwriter.rs | 1 + serieswriter/src/timebin.rs | 1 + serieswriter/src/writer.rs | 1 + stats_types/src/stats_types.rs | 1 + 33 files changed, 278 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 16f7b42..db4bf9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,4 +12,4 @@ codegen-units = 64 incremental = true [patch.crates-io] -thiserror = { git = "https://github.com/dominikwerder/thiserror.git" } +thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" } diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 06d82a0..a22f5e3 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -72,6 +72,12 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { .await .map_err(Error::from_string)?; } + DbDataSub::RemoveOlderAll(params) => { + info!("RemoveOlderAll {:?} {:?}", params, scyconf); + daqingest::tools::remove_older_all(params, &scyconf) + .await + .map_err(Error::from_string)?; + } DbDataSub::FindOlder(params) => { info!("FindOlder {:?} {:?}", pgconf, scyconf); daqingest::tools::find_older_msp(u.backend, params, &pgconf, &scyconf) diff --git a/daqingest/src/opts.rs b/daqingest/src/opts.rs index 9a53288..d349a7f 100644 --- a/daqingest/src/opts.rs +++ b/daqingest/src/opts.rs @@ -131,6 +131,7 @@ pub struct DbData { #[derive(Debug, clap::Parser)] pub enum DbDataSub { RemoveOlder(RemoveOlder), + RemoveOlderAll(RemoveOlderAll), FindOlder(FindOlder), } @@ -142,6 +143,12 @@ pub struct RemoveOlder { pub channel_regex: String, } +#[derive(Debug, clap::Parser)] +pub struct RemoveOlderAll { + #[arg(long)] + pub date: String, +} + #[derive(Debug, clap::Parser)] pub struct FindOlder { #[arg(long)] diff --git a/daqingest/src/tools.rs b/daqingest/src/tools.rs index 8b1ce83..879f435 100644 --- a/daqingest/src/tools.rs +++ b/daqingest/src/tools.rs @@ -1,12 +1,16 @@ use crate::opts::FindOlder; use crate::opts::RemoveOlder; +use crate::opts::RemoveOlderAll; use chrono::DateTime; use chrono::Utc; use dbpg::conn::PgClient; use err::thiserror; use err::ThisError; +use futures_util::future; +use futures_util::stream; use futures_util::StreamExt; use log::*; +use netpod::ttl::RetentionTime; use netpod::Database; use netpod::ScalarType; use netpod::Shape; @@ -16,8 +20,14 @@ use scywr::scylla::prepared_statement::PreparedStatement; use scywr::scylla::transport::errors::QueryError; use scywr::scylla::transport::iterator::NextRowError; use scywr::session::ScySession; +use series::SeriesId; +use std::collections::VecDeque; +use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; #[derive(Debug, ThisError)] +#[cstm(name = "DaqingestTools")] pub enum Error { PgConn(#[from] dbpg::err::Error), Postgres(#[from] dbpg::postgres::Error), @@ -99,6 +109,190 @@ async fn remove_older_series( Ok(()) } +struct Stmts { + qu_select_series: Arc, + qu_select_msp: PreparedStatement, + qu_delete: Vec, +} + +impl Stmts { + async fn new(ks: &str, rt: RetentionTime, scy: &ScySession) -> Result { + let cql = format!("select distinct series from {}.{}{}", ks, rt.table_prefix(), "ts_msp"); + let mut qu_select_series = scy.prepare(cql).await?; + qu_select_series.set_page_size(10000); + let qu_select_series = Arc::new(qu_select_series); + let cql = format!( + concat!("select ts_msp from {}.{}{} where series = ?"), + ks, + rt.table_prefix(), + "ts_msp" + ); + let mut qu_select_msp = scy.prepare(cql).await?; + qu_select_msp.set_page_size(10000); + let mut qu_delete = Vec::new(); + let tynames = [ + "u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64", "bool", "string", + ]; + let shapenames = ["scalar", "array"]; + for shn in &shapenames { + for tyn in &tynames { + let qu = scy + .prepare(format!( + "delete from {}.{}events_{}_{} where series = ? and ts_msp = ?", + ks, + rt.table_prefix(), + shn, + tyn + )) + .await?; + qu_delete.push(qu); + } + } + for shn in &["scalar"] { + for tyn in &["enum"] { + let qu = scy + .prepare(format!( + "delete from {}.{}events_{}_{} where series = ? and ts_msp = ?", + ks, + rt.table_prefix(), + shn, + tyn + )) + .await?; + qu_delete.push(qu); + } + } + let ret = Self { + qu_select_series, + qu_select_msp, + qu_delete, + }; + Ok(ret) + } +} + +pub async fn remove_older_all(params: RemoveOlderAll, scyconf: &ScyllaIngestConfig) -> Result<(), Error> { + let date_cut = parse_date_str(¶ms.date)?; + let ts_cut = TsMs::from_ns_u64(date_to_ts_ns(date_cut)); + debug!("chosen date is {:?} {:?}", date_cut, ts_cut); + let scy = scywr::session::create_session(scyconf).await?; + let ks = scyconf.keyspace(); + for rt in [RetentionTime::Short] { + remove_older_all_rt(ts_cut, ks, rt, &scy).await?; + } + Ok(()) +} + +pub async fn remove_older_all_rt(ts_cut: TsMs, ks: &str, rt: RetentionTime, scy: &ScySession) -> Result<(), Error> { + let stmts = Stmts::new(ks, rt.clone(), &scy).await?; + type RowType = (i64,); + let it = scy.execute_iter(stmts.qu_select_series.as_ref().clone(), ()).await?; + let mut it = it.into_typed::(); + let mut series_ids = Vec::with_capacity(1000000); + let print_dt = Duration::from_millis(2000); + let mut print_next = Instant::now() + print_dt; + while let Some(e) = it.next().await { + let row = e?; + let series = SeriesId::new(row.0 as u64); + series_ids.push(series); + let tsnow = Instant::now(); + if print_next <= tsnow { + print_next = tsnow + print_dt; + info!("found so far {}", series_ids.len()); + } + if series_ids.len() > 50000000000 { + break; + } + } + info!("found {} series", series_ids.len()); + let mut print_next = Instant::now() + print_dt; + for (i, series) in series_ids.iter().enumerate() { + remove_older_all_series(ts_cut, series.clone(), &stmts, &scy).await?; + let tsnow = Instant::now(); + if print_next <= tsnow { + print_next = tsnow + print_dt; + let frac = i as f32 / series_ids.len() as f32; + info!("removed so far {:8} of {:8} {:.4}", i, series_ids.len(), frac); + } + } + Ok(()) +} + +async fn remove_older_all_series(ts_cut: TsMs, series: SeriesId, stmts: &Stmts, scy: &ScySession) -> Result<(), Error> { + type RowType = (i64,); + let ts1 = Instant::now(); + let mut it = scy + .execute_iter(stmts.qu_select_msp.clone(), (series.to_i64(),)) + .await? + .into_typed::(); + let mut msp_last = 0; + let mut to_remove = Vec::new(); + let mut n_keep = 0; + let mut n_remove = 0; + let ts2 = Instant::now(); + while let Some(e) = it.next().await { + let row = e?; + let msp = row.0 as u64; + if msp < msp_last { + panic!("msp ordering error {:?}", series); + } + if msp <= ts_cut.0 && msp_last != 0 { + // info!("remove {:?} {:?}", series, msp_last); + n_remove += 1; + to_remove.push(msp_last); + } else { + // info!("keep {:?} {:?}", series, msp_last); + n_keep += 1; + } + msp_last = msp; + } + let ts3 = Instant::now(); + if n_remove != 0 { + let frac = n_remove as f32 / (n_keep + n_remove) as f32; + remove_older_all_series_msps(series, to_remove, stmts, scy).await?; + let ts4 = Instant::now(); + let dt2 = ts2.saturating_duration_since(ts1); + let dt3 = ts3.saturating_duration_since(ts2); + let dt4 = ts4.saturating_duration_since(ts3); + info!( + "{:4.0} {:4.0} {:4.0} n_keep {:7} n_remove {:7} {:.4} {:?}", + 1e3 * dt2.as_secs_f32(), + 1e3 * dt3.as_secs_f32(), + 1e3 * dt4.as_secs_f32(), + n_keep, + n_remove, + frac, + series, + ); + } + Ok(()) +} + +async fn remove_older_all_series_msps( + series: SeriesId, + msps: Vec, + stmts: &Stmts, + scy: &ScySession, +) -> Result<(), Error> { + for stmt in &stmts.qu_delete { + stream::iter(msps.clone()) + .map(|msp| async move { + let stmt = stmt.clone(); + scy.execute(&stmt, (series.to_i64(), msp as i64)).await + }) + .buffer_unordered(32) + .take_while(|x| { + if let Err(e) = &x { + error!("{e}"); + } + future::ready(x.is_ok()) + }) + .fold(0, |_, _| future::ready(0i32)) + .await; + } + Ok(()) +} + pub async fn find_older_msp( _backend: String, params: FindOlder, diff --git a/dbpg/src/err.rs b/dbpg/src/err.rs index 23f8f83..a922d4e 100644 --- a/dbpg/src/err.rs +++ b/dbpg/src/err.rs @@ -2,6 +2,7 @@ use err::thiserror; use err::ThisError; #[derive(Debug, ThisError)] +#[cstm(name = "Postgres")] pub enum Error { Postgres(#[from] tokio_postgres::Error), } diff --git a/dbpg/src/findaddr.rs b/dbpg/src/findaddr.rs index 7447bb5..a46e387 100644 --- a/dbpg/src/findaddr.rs +++ b/dbpg/src/findaddr.rs @@ -5,6 +5,7 @@ use log::*; use std::net::SocketAddrV4; #[derive(Debug, ThisError)] +#[cstm(name = "PgFindAddr")] pub enum Error { Postgres(#[from] tokio_postgres::Error), IocAddrNotFound, diff --git a/dbpg/src/pool.rs b/dbpg/src/pool.rs index 2d84f16..9e5f26f 100644 --- a/dbpg/src/pool.rs +++ b/dbpg/src/pool.rs @@ -9,6 +9,7 @@ use log::*; use netpod::Database; #[derive(Debug, ThisError)] +#[cstm(name = "PgPool")] pub enum Error { Postgres(#[from] tokio_postgres::Error), EndOfPool, diff --git a/dbpg/src/schema.rs b/dbpg/src/schema.rs index c155e09..0f4b0f5 100644 --- a/dbpg/src/schema.rs +++ b/dbpg/src/schema.rs @@ -4,6 +4,7 @@ use err::ThisError; use log::*; #[derive(Debug, ThisError)] +#[cstm(name = "PgSchema")] pub enum Error { Postgres(#[from] tokio_postgres::Error), LogicError(String), diff --git a/dbpg/src/seriesbychannel.rs b/dbpg/src/seriesbychannel.rs index c13b711..f1b8b9c 100644 --- a/dbpg/src/seriesbychannel.rs +++ b/dbpg/src/seriesbychannel.rs @@ -43,6 +43,7 @@ macro_rules! trace3 { } #[derive(Debug, ThisError)] +#[cstm(name = "PgSeries")] pub enum Error { Postgres(#[from] tokio_postgres::Error), CreateSeriesFail, diff --git a/dbpg/src/seriesid.rs b/dbpg/src/seriesid.rs index bbc9f9b..4d911f9 100644 --- a/dbpg/src/seriesid.rs +++ b/dbpg/src/seriesid.rs @@ -3,6 +3,7 @@ use err::ThisError; // TODO still needed? #[derive(Debug, ThisError)] +#[cstm(name = "PgSeriesId")] pub enum Error { Postgres(#[from] tokio_postgres::Error), IocAddrNotFound, diff --git a/dbpg/src/testerr.rs b/dbpg/src/testerr.rs index 12e9409..9eca904 100644 --- a/dbpg/src/testerr.rs +++ b/dbpg/src/testerr.rs @@ -16,6 +16,7 @@ impl fmt::Display for TestError { impl std::error::Error for TestError {} #[derive(Debug, ThisError)] +#[cstm(name = "PgTestErr")] enum Error { Postgres(#[from] tokio_postgres::Error), Dummy(#[from] TestError), diff --git a/ingest-linux/Cargo.toml b/ingest-linux/Cargo.toml index c7f5ab0..f8ea59d 100644 --- a/ingest-linux/Cargo.toml +++ b/ingest-linux/Cargo.toml @@ -9,3 +9,6 @@ libc = "0.2" thiserror = "=0.0.1" log = { path = "../log" } taskrun = { path = "../../daqbuffer/crates/taskrun" } + +[patch.crates-io] +thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" } diff --git a/ingest-linux/src/signal.rs b/ingest-linux/src/signal.rs index 95fb22f..70bacb0 100644 --- a/ingest-linux/src/signal.rs +++ b/ingest-linux/src/signal.rs @@ -3,6 +3,7 @@ use std::mem::MaybeUninit; use thiserror::Error; #[derive(Debug, Error)] +#[cstm(name = "LinuxSignal")] pub enum Error { SignalHandlerSet, SignalHandlerUnset, diff --git a/netfetch/src/ca/beacons.rs b/netfetch/src/ca/beacons.rs index 10c5864..0e5774f 100644 --- a/netfetch/src/ca/beacons.rs +++ b/netfetch/src/ca/beacons.rs @@ -17,6 +17,7 @@ use std::time::SystemTime; use taskrun::tokio::net::UdpSocket; #[derive(Debug, ThisError)] +#[cstm(name = "NetfetchBeacons")] pub enum Error { Io(#[from] std::io::Error), SeriesWriter(#[from] serieswriter::writer::Error), diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 5a8c42d..d7d2937 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -143,6 +143,7 @@ fn dbg_chn_cid(cid: Cid, conn: &CaConn) -> bool { } #[derive(Debug, ThisError)] +#[cstm(name = "NetfetchConn")] pub enum Error { NoProtocol, ProtocolError, @@ -264,6 +265,12 @@ mod ser_instant { #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] struct Cid(pub u32); +impl fmt::Display for Cid { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "Cid({})", self.0) + } +} + #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] struct Subid(pub u32); @@ -1819,16 +1826,17 @@ impl CaConn { CaDataScalarValue::I32(x) => ScalarValue::I32(x), CaDataScalarValue::F32(x) => ScalarValue::F32(x), CaDataScalarValue::F64(x) => ScalarValue::F64(x), - CaDataScalarValue::Enum(x) => ScalarValue::Enum( - x, - crst.enum_str_table.as_ref().map_or_else( + CaDataScalarValue::Enum(x) => ScalarValue::Enum(x, { + let conv = crst.enum_str_table.as_ref().map_or_else( || String::from("missingstrings"), |map| { map.get(x as usize) .map_or_else(|| String::from("undefined"), String::from) }, - ), - ), + ); + info!("convert_event_data {} {:?}", crst.name(), conv); + conv + }), CaDataScalarValue::String(x) => ScalarValue::String(x), CaDataScalarValue::Bool(x) => ScalarValue::Bool(x), } diff --git a/netfetch/src/ca/conn/enumfetch.rs b/netfetch/src/ca/conn/enumfetch.rs index c5c9734..5e09de1 100644 --- a/netfetch/src/ca/conn/enumfetch.rs +++ b/netfetch/src/ca/conn/enumfetch.rs @@ -11,6 +11,7 @@ use std::pin::Pin; use std::time::Instant; #[derive(Debug, ThisError)] +#[cstm(name = "NetfetchEnumfetch")] pub enum Error { MissingState, } diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 0455b2c..437f90c 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -20,6 +20,7 @@ use tokio::io::AsyncWrite; use tokio::io::ReadBuf; #[derive(Debug, ThisError)] +#[cstm(name = "NetfetchCaProto")] pub enum Error { NetBuf(#[from] netbuf::Error), SlideBuf(#[from] slidebuf::Error), diff --git a/netfetch/src/metrics/delete.rs b/netfetch/src/metrics/delete.rs index ff49dc2..6f80164 100644 --- a/netfetch/src/metrics/delete.rs +++ b/netfetch/src/metrics/delete.rs @@ -46,6 +46,7 @@ macro_rules! debug_cql { } #[derive(Debug, ThisError)] +#[cstm(name = "HttpDelete")] pub enum Error { Logic, MissingRetentionTime, diff --git a/netfetch/src/metrics/ingest.rs b/netfetch/src/metrics/ingest.rs index fcbd0fe..b81e335 100644 --- a/netfetch/src/metrics/ingest.rs +++ b/netfetch/src/metrics/ingest.rs @@ -64,6 +64,7 @@ macro_rules! trace_queues { } #[derive(Debug, ThisError)] +#[cstm(name = "MetricsIngest")] pub enum Error { UnsupportedContentType, Logic, diff --git a/netfetch/src/metrics/postingest.rs b/netfetch/src/metrics/postingest.rs index 94c3927..4cb6d59 100644 --- a/netfetch/src/metrics/postingest.rs +++ b/netfetch/src/metrics/postingest.rs @@ -19,6 +19,7 @@ use std::time::Instant; use std::time::SystemTime; #[derive(Debug, ThisError)] +#[cstm(name = "HttpPostingest")] pub enum Error { Msg, SeriesWriter(#[from] serieswriter::writer::Error), diff --git a/scywr/src/access.rs b/scywr/src/access.rs index 4eda2cc..e97de18 100644 --- a/scywr/src/access.rs +++ b/scywr/src/access.rs @@ -4,6 +4,7 @@ use scylla::transport::errors::DbError; use scylla::transport::errors::QueryError; #[derive(Debug, ThisError)] +#[cstm(name = "ScyllaAccess")] pub enum Error { DbError(#[from] DbError), QueryError(#[from] QueryError), diff --git a/scywr/src/insertqueues.rs b/scywr/src/insertqueues.rs index 92d964e..a5f20cd 100644 --- a/scywr/src/insertqueues.rs +++ b/scywr/src/insertqueues.rs @@ -14,6 +14,7 @@ use std::collections::VecDeque; use std::pin::Pin; #[derive(Debug, ThisError)] +#[cstm(name = "ScyllaInsertQueue")] pub enum Error { QueuePush, #[error("ChannelSend({0}, {1})")] diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 037da08..0f6318b 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -36,6 +36,7 @@ use std::task::Poll; use std::time::SystemTime; #[derive(Debug, ThisError)] +#[cstm(name = "ScyllaItemInsertQueue")] pub enum Error { DbTimeout, DbOverload, diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index cbd9592..4b5e099 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -14,6 +14,7 @@ use std::fmt; use std::time::Duration; #[derive(Debug, ThisError)] +#[cstm(name = "ScyllaSchema")] pub enum Error { NoKeyspaceChosen, Fmt(#[from] fmt::Error), @@ -268,14 +269,29 @@ impl GenTwcsTab { } if let Some(row) = rows.get(0) { let mut set_opts = Vec::new(); + info!( + "{:20} vs {:20} {:20} {:20}", + row.0, + self.default_time_to_live.as_secs(), + self.keyspace, + self.name, + ); if row.0 != self.default_time_to_live.as_secs() { - set_opts.push(format!( - "default_time_to_live = {}", - self.default_time_to_live.as_secs() - )); + if false { + set_opts.push(format!( + "default_time_to_live = {}", + self.default_time_to_live.as_secs() + )); + } else { + info!("mismatch default_time_to_live"); + } } if row.1 != self.gc_grace.as_secs() { - set_opts.push(format!("gc_grace_seconds = {}", self.gc_grace.as_secs())); + if false { + set_opts.push(format!("gc_grace_seconds = {}", self.gc_grace.as_secs())); + } else { + info!("mismatch gc_grace_seconds"); + } } if row.2 != self.compaction_options() { let params: Vec<_> = self @@ -284,11 +300,15 @@ impl GenTwcsTab { .map(|(k, v)| format!("'{k}': '{v}'")) .collect(); let params = params.join(", "); - set_opts.push(format!("compaction = {{ {} }}", params)); + if false { + set_opts.push(format!("compaction = {{ {} }}", params)); + } else { + info!("mismatch compaction"); + } } if set_opts.len() != 0 { let cql = format!(concat!("alter table {} with {}"), self.name(), set_opts.join(" and ")); - debug!("{cql}"); + info!("{cql}"); scy.query(cql, ()).await?; } } else { diff --git a/scywr/src/senderpolling.rs b/scywr/src/senderpolling.rs index ec82728..9312d82 100644 --- a/scywr/src/senderpolling.rs +++ b/scywr/src/senderpolling.rs @@ -12,8 +12,10 @@ use std::task::Poll; use thiserror::Error; #[derive(Debug, Error)] +#[cstm(name = "SenderPolling")] pub enum Error { NoSendInProgress, + #[error("Closed")] Closed(T), } diff --git a/scywr/src/session.rs b/scywr/src/session.rs index b66491d..d6b6695 100644 --- a/scywr/src/session.rs +++ b/scywr/src/session.rs @@ -10,6 +10,7 @@ use scylla::transport::errors::NewSessionError; use std::sync::Arc; #[derive(Debug, ThisError)] +#[cstm(name = "ScyllaSession")] pub enum Error { NewSession(String), } diff --git a/scywr/src/store.rs b/scywr/src/store.rs index 9953d27..4e015d5 100644 --- a/scywr/src/store.rs +++ b/scywr/src/store.rs @@ -10,6 +10,7 @@ use scylla::Session as ScySession; use std::sync::Arc; #[derive(Debug, ThisError)] +#[cstm(name = "ScyllaStore")] pub enum Error { NewSessionError(#[from] NewSessionError), QueryError(#[from] QueryError), diff --git a/serieswriter/src/binwriter.rs b/serieswriter/src/binwriter.rs index 2b0ab59..e1b5776 100644 --- a/serieswriter/src/binwriter.rs +++ b/serieswriter/src/binwriter.rs @@ -28,6 +28,7 @@ macro_rules! trace_binning { } #[derive(Debug, ThisError)] +#[cstm(name = "SerieswriterBinwriter")] pub enum Error { SeriesLookupError, SeriesWriter(#[from] crate::writer::Error), diff --git a/serieswriter/src/establish_worker.rs b/serieswriter/src/establish_worker.rs index a2f8489..6e6f0da 100644 --- a/serieswriter/src/establish_worker.rs +++ b/serieswriter/src/establish_worker.rs @@ -26,6 +26,7 @@ use std::time::Duration; use std::time::SystemTime; #[derive(Debug, ThisError)] +#[cstm(name = "SerieswriterEstablishWorker")] pub enum Error { Postgres(#[from] dbpg::err::Error), PostgresSchema(#[from] dbpg::schema::Error), diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index d249fb5..eebd198 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -28,6 +28,7 @@ macro_rules! trace_rt_decision { } #[derive(Debug, ThisError)] +#[cstm(name = "SerieswriterRtwriter")] pub enum Error { SeriesLookupError, SeriesWriter(#[from] crate::writer::Error), diff --git a/serieswriter/src/timebin.rs b/serieswriter/src/timebin.rs index 96599f2..87388b7 100644 --- a/serieswriter/src/timebin.rs +++ b/serieswriter/src/timebin.rs @@ -78,6 +78,7 @@ macro_rules! trace_push { } #[derive(Debug, ThisError)] +#[cstm(name = "SerieswriterTimebin")] pub enum Error { UnexpectedContainer, PatchWithoutBins, diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index c123fb8..85dbb93 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -18,6 +18,7 @@ use std::collections::VecDeque; use std::time::SystemTime; #[derive(Debug, ThisError)] +#[cstm(name = "SerieswriterWriter")] pub enum Error { DbPgSid(#[from] dbpg::seriesid::Error), ChannelSendError, diff --git a/stats_types/src/stats_types.rs b/stats_types/src/stats_types.rs index 4eb8487..ed444f3 100644 --- a/stats_types/src/stats_types.rs +++ b/stats_types/src/stats_types.rs @@ -84,6 +84,7 @@ pub trait DropMark { fn field(&self) -> &Value; } +#[allow(unused)] pub struct DropGuard<'a> { mark: &'a Value, }