Delete of old event data

This commit is contained in:
Dominik Werder
2024-07-09 14:30:43 +02:00
parent 54008718eb
commit 2c856f5c1f
33 changed files with 278 additions and 13 deletions

View File

@@ -12,4 +12,4 @@ codegen-units = 64
incremental = true
[patch.crates-io]
thiserror = { git = "https://github.com/dominikwerder/thiserror.git" }
thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" }

View File

@@ -72,6 +72,12 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> {
.await
.map_err(Error::from_string)?;
}
DbDataSub::RemoveOlderAll(params) => {
info!("RemoveOlderAll {:?} {:?}", params, scyconf);
daqingest::tools::remove_older_all(params, &scyconf)
.await
.map_err(Error::from_string)?;
}
DbDataSub::FindOlder(params) => {
info!("FindOlder {:?} {:?}", pgconf, scyconf);
daqingest::tools::find_older_msp(u.backend, params, &pgconf, &scyconf)

View File

@@ -131,6 +131,7 @@ pub struct DbData {
// Subcommands that operate on stored event data. Each variant carries the
// parsed command-line arguments of its subcommand.
// Plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments into help text, which would change the program's output.
#[derive(Debug, clap::Parser)]
pub enum DbDataSub {
// Arguments are described by the `RemoveOlder` struct.
RemoveOlder(RemoveOlder),
// Arguments are described by the `RemoveOlderAll` struct (a single `--date`).
RemoveOlderAll(RemoveOlderAll),
// Arguments are described by the `FindOlder` struct.
FindOlder(FindOlder),
}
@@ -142,6 +143,12 @@ pub struct RemoveOlder {
pub channel_regex: String,
}
// Command-line arguments of the `RemoveOlderAll` subcommand.
// Plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments into help text, which would change the program's output.
#[derive(Debug, clap::Parser)]
pub struct RemoveOlderAll {
// Cut-off date, given as `--date <STRING>`.
// NOTE(review): the accepted date format is decided by the parser that
// consumes this string and is not visible here; confirm before documenting
// it for users.
#[arg(long)]
pub date: String,
}
#[derive(Debug, clap::Parser)]
pub struct FindOlder {
#[arg(long)]

View File

@@ -1,12 +1,16 @@
use crate::opts::FindOlder;
use crate::opts::RemoveOlder;
use crate::opts::RemoveOlderAll;
use chrono::DateTime;
use chrono::Utc;
use dbpg::conn::PgClient;
use err::thiserror;
use err::ThisError;
use futures_util::future;
use futures_util::stream;
use futures_util::StreamExt;
use log::*;
use netpod::ttl::RetentionTime;
use netpod::Database;
use netpod::ScalarType;
use netpod::Shape;
@@ -16,8 +20,14 @@ use scywr::scylla::prepared_statement::PreparedStatement;
use scywr::scylla::transport::errors::QueryError;
use scywr::scylla::transport::iterator::NextRowError;
use scywr::session::ScySession;
use series::SeriesId;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
#[derive(Debug, ThisError)]
#[cstm(name = "DaqingestTools")]
pub enum Error {
PgConn(#[from] dbpg::err::Error),
Postgres(#[from] dbpg::postgres::Error),
@@ -99,6 +109,190 @@ async fn remove_older_series(
Ok(())
}
/// Prepared statements used when removing old event data for one retention time.
struct Stmts {
    /// `select distinct series` over the `ts_msp` table.
    qu_select_series: Arc<PreparedStatement>,
    /// Selects all `ts_msp` values of one series.
    qu_select_msp: PreparedStatement,
    /// One delete per events table, each bound as `(series, ts_msp)`.
    qu_delete: Vec<PreparedStatement>,
}

impl Stmts {
    /// Prepares every statement against keyspace `ks`, using the table prefix of `rt`.
    ///
    /// The delete statements are stored in this order: all scalar tables, then all
    /// array tables (each in the order of the type-name list below), and finally
    /// the scalar enum table.
    ///
    /// # Errors
    ///
    /// Returns the first error reported while preparing a statement.
    async fn new(ks: &str, rt: RetentionTime, scy: &ScySession) -> Result<Self, Error> {
        let prefix = rt.table_prefix();

        let mut select_series = scy
            .prepare(format!("select distinct series from {}.{}{}", ks, prefix, "ts_msp"))
            .await?;
        select_series.set_page_size(10000);

        let mut select_msp = scy
            .prepare(format!(
                "select ts_msp from {}.{}{} where series = ?",
                ks, prefix, "ts_msp"
            ))
            .await?;
        select_msp.set_page_size(10000);

        let type_names = [
            "u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64", "bool", "string",
        ];
        let shape_names = ["scalar", "array"];
        // Every (shape, type) combination, followed by the enum table which
        // exists only in scalar form.
        let tables = shape_names
            .iter()
            .flat_map(|shape| type_names.iter().map(move |ty| (*shape, *ty)))
            .chain(std::iter::once(("scalar", "enum")));

        let mut deletes = Vec::new();
        for (shape, ty) in tables {
            let cql = format!(
                "delete from {}.{}events_{}_{} where series = ? and ts_msp = ?",
                ks, prefix, shape, ty
            );
            deletes.push(scy.prepare(cql).await?);
        }

        Ok(Self {
            qu_select_series: Arc::new(select_series),
            qu_select_msp: select_msp,
            qu_delete: deletes,
        })
    }
}
/// Entry point of the `RemoveOlderAll` tool.
///
/// Parses `params.date` into a cut-off timestamp, opens a session from
/// `scyconf` and runs the removal in the configured keyspace. Only the tables
/// of [`RetentionTime::Short`] are processed.
///
/// # Errors
///
/// Fails if the date cannot be parsed, the session cannot be created, or the
/// removal itself reports an error.
pub async fn remove_older_all(params: RemoveOlderAll, scyconf: &ScyllaIngestConfig) -> Result<(), Error> {
    let cutoff_date = parse_date_str(&params.date)?;
    let cutoff_ns = date_to_ts_ns(cutoff_date);
    let ts_cut = TsMs::from_ns_u64(cutoff_ns);
    debug!("chosen date is {:?} {:?}", cutoff_date, ts_cut);

    let session = scywr::session::create_session(scyconf).await?;
    let keyspace = scyconf.keyspace();

    let retention_times = [RetentionTime::Short];
    for rt in retention_times {
        remove_older_all_rt(ts_cut, keyspace, rt, &session).await?;
    }
    Ok(())
}
/// Removes old event data of every series found for retention time `rt`.
///
/// Works in two phases: first all distinct series ids are read from the
/// `ts_msp` table into memory, then each series is processed one after the
/// other. Progress is logged at most once every two seconds in both phases.
///
/// # Errors
///
/// Stops at, and returns, the first error from preparing statements, reading
/// the series list, or processing a series.
pub async fn remove_older_all_rt(ts_cut: TsMs, ks: &str, rt: RetentionTime, scy: &ScySession) -> Result<(), Error> {
    type SeriesRow = (i64,);
    let report_interval = Duration::from_millis(2000);

    let stmts = Stmts::new(ks, rt, scy).await?;

    // Phase 1: collect all series ids.
    let select_series = (*stmts.qu_select_series).clone();
    let mut rows = scy.execute_iter(select_series, ()).await?.into_typed::<SeriesRow>();
    let mut series_ids = Vec::with_capacity(1000000);
    let mut next_report = Instant::now() + report_interval;
    while let Some(item) = rows.next().await {
        let (raw_id,) = item?;
        series_ids.push(SeriesId::new(raw_id as u64));
        let now = Instant::now();
        if now >= next_report {
            next_report = now + report_interval;
            info!("found so far {}", series_ids.len());
        }
        // Upper bound on the number of collected ids.
        if series_ids.len() > 50000000000 {
            break;
        }
    }
    let total = series_ids.len();
    info!("found {} series", total);

    // Phase 2: process the series sequentially.
    let mut next_report = Instant::now() + report_interval;
    for (idx, series) in series_ids.iter().enumerate() {
        remove_older_all_series(ts_cut, series.clone(), &stmts, scy).await?;
        let now = Instant::now();
        if now >= next_report {
            next_report = now + report_interval;
            let frac = idx as f32 / total as f32;
            info!("removed so far {:8} of {:8} {:.4}", idx, total, frac);
        }
    }
    Ok(())
}
/// Determines which `ts_msp` values of `series` are old enough to delete and
/// hands them to [`remove_older_all_series_msps`].
///
/// The `ts_msp` values are read in the order the database returns them. A value
/// is scheduled for removal when the value that follows it is `<= ts_cut.0`;
/// consequently the last value of a series is never removed.
///
/// The previous value is tracked as an `Option` rather than with `0` as a
/// "nothing seen yet" marker, so a stored `ts_msp` of `0` is treated like any
/// other value.
///
/// NOTE(review): `ts_msp` is compared directly with `ts_cut.0`; this assumes
/// both are expressed in the same time unit — confirm against the table schema.
///
/// # Errors
///
/// Returns an error if the select fails or a row cannot be decoded, or if the
/// removal step returns an error.
///
/// # Panics
///
/// Panics if the `ts_msp` values are not returned in non-decreasing order.
async fn remove_older_all_series(ts_cut: TsMs, series: SeriesId, stmts: &Stmts, scy: &ScySession) -> Result<(), Error> {
    type RowType = (i64,);
    let ts1 = Instant::now();
    let mut it = scy
        .execute_iter(stmts.qu_select_msp.clone(), (series.to_i64(),))
        .await?
        .into_typed::<RowType>();
    let mut msp_prev: Option<u64> = None;
    let mut to_remove = Vec::new();
    let mut n_keep = 0;
    let mut n_remove = 0;
    let ts2 = Instant::now();
    while let Some(e) = it.next().await {
        let row = e?;
        let msp = row.0 as u64;
        match msp_prev {
            Some(prev) if msp < prev => {
                panic!("msp ordering error {:?}", series);
            }
            // The following value is already at or before the cut, so the
            // previous one can go.
            Some(prev) if msp <= ts_cut.0 => {
                n_remove += 1;
                to_remove.push(prev);
            }
            // First row of the series, or the following value lies after the cut.
            _ => {
                n_keep += 1;
            }
        }
        msp_prev = Some(msp);
    }
    let ts3 = Instant::now();
    if n_remove != 0 {
        let frac = n_remove as f32 / (n_keep + n_remove) as f32;
        remove_older_all_series_msps(series, to_remove, stmts, scy).await?;
        let ts4 = Instant::now();
        // Timings in milliseconds: issue select, scan rows, delete.
        let dt2 = ts2.saturating_duration_since(ts1);
        let dt3 = ts3.saturating_duration_since(ts2);
        let dt4 = ts4.saturating_duration_since(ts3);
        info!(
            "{:4.0} {:4.0} {:4.0} n_keep {:7} n_remove {:7} {:.4} {:?}",
            1e3 * dt2.as_secs_f32(),
            1e3 * dt3.as_secs_f32(),
            1e3 * dt4.as_secs_f32(),
            n_keep,
            n_remove,
            frac,
            series,
        );
    }
    Ok(())
}
/// Deletes the given `ts_msp` partitions of `series` from every events table.
///
/// For each prepared delete statement in `stmts.qu_delete`, one delete per
/// entry of `msps` is issued, with up to 32 requests in flight at a time.
///
/// Failures are handled best-effort: the first failed delete for a table is
/// logged and the remaining deletes for that table are abandoned, then the
/// next table is processed. The function therefore returns `Ok(())` even if
/// some deletes failed.
/// NOTE(review): confirm that callers are fine with not being told about
/// failed deletes; otherwise the error should be returned instead.
async fn remove_older_all_series_msps(
    series: SeriesId,
    msps: Vec<u64>,
    stmts: &Stmts,
    scy: &ScySession,
) -> Result<(), Error> {
    let series_id = series.to_i64();
    for stmt in &stmts.qu_delete {
        // Borrow both the statement and the msp list: `execute` only needs a
        // reference, so no clone is required per row or per table.
        stream::iter(msps.iter().copied())
            .map(|msp| async move { scy.execute(stmt, (series_id, msp as i64)).await })
            .buffer_unordered(32)
            .take_while(|res| {
                if let Err(e) = res {
                    error!("{e}");
                }
                future::ready(res.is_ok())
            })
            // Drive the stream to completion; the results are not needed.
            .for_each(|_| future::ready(()))
            .await;
    }
    Ok(())
}
pub async fn find_older_msp(
_backend: String,
params: FindOlder,

View File

@@ -2,6 +2,7 @@ use err::thiserror;
use err::ThisError;
// Error type with a single variant wrapping `tokio_postgres::Error`.
// NOTE(review): `#[cstm(name = ...)]` is an attribute of the patched
// `thiserror` derive; presumably it sets the name under which this error is
// reported — confirm in the fork.
#[derive(Debug, ThisError)]
#[cstm(name = "Postgres")]
pub enum Error {
// `#[from]` marks `tokio_postgres::Error` as the source this variant is converted from.
Postgres(#[from] tokio_postgres::Error),
}

View File

@@ -5,6 +5,7 @@ use log::*;
use std::net::SocketAddrV4;
#[derive(Debug, ThisError)]
#[cstm(name = "PgFindAddr")]
pub enum Error {
Postgres(#[from] tokio_postgres::Error),
IocAddrNotFound,

View File

@@ -9,6 +9,7 @@ use log::*;
use netpod::Database;
#[derive(Debug, ThisError)]
#[cstm(name = "PgPool")]
pub enum Error {
Postgres(#[from] tokio_postgres::Error),
EndOfPool,

View File

@@ -4,6 +4,7 @@ use err::ThisError;
use log::*;
#[derive(Debug, ThisError)]
#[cstm(name = "PgSchema")]
pub enum Error {
Postgres(#[from] tokio_postgres::Error),
LogicError(String),

View File

@@ -43,6 +43,7 @@ macro_rules! trace3 {
}
#[derive(Debug, ThisError)]
#[cstm(name = "PgSeries")]
pub enum Error {
Postgres(#[from] tokio_postgres::Error),
CreateSeriesFail,

View File

@@ -3,6 +3,7 @@ use err::ThisError;
// TODO still needed?
#[derive(Debug, ThisError)]
#[cstm(name = "PgSeriesId")]
pub enum Error {
Postgres(#[from] tokio_postgres::Error),
IocAddrNotFound,

View File

@@ -16,6 +16,7 @@ impl fmt::Display for TestError {
impl std::error::Error for TestError {}
#[derive(Debug, ThisError)]
#[cstm(name = "PgTestErr")]
enum Error {
Postgres(#[from] tokio_postgres::Error),
Dummy(#[from] TestError),

View File

@@ -9,3 +9,6 @@ libc = "0.2"
thiserror = "=0.0.1"
log = { path = "../log" }
taskrun = { path = "../../daqbuffer/crates/taskrun" }
[patch.crates-io]
thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" }

View File

@@ -3,6 +3,7 @@ use std::mem::MaybeUninit;
use thiserror::Error;
#[derive(Debug, Error)]
#[cstm(name = "LinuxSignal")]
pub enum Error {
SignalHandlerSet,
SignalHandlerUnset,

View File

@@ -17,6 +17,7 @@ use std::time::SystemTime;
use taskrun::tokio::net::UdpSocket;
#[derive(Debug, ThisError)]
#[cstm(name = "NetfetchBeacons")]
pub enum Error {
Io(#[from] std::io::Error),
SeriesWriter(#[from] serieswriter::writer::Error),

View File

@@ -143,6 +143,7 @@ fn dbg_chn_cid(cid: Cid, conn: &CaConn) -> bool {
}
#[derive(Debug, ThisError)]
#[cstm(name = "NetfetchConn")]
pub enum Error {
NoProtocol,
ProtocolError,
@@ -264,6 +265,12 @@ mod ser_instant {
/// Newtype around a `u32` identifier.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Cid(pub u32);

impl fmt::Display for Cid {
    /// Renders the id as `Cid(<value>)`.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let Cid(id) = *self;
        write!(fmt, "Cid({})", id)
    }
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Subid(pub u32);
@@ -1819,16 +1826,17 @@ impl CaConn {
CaDataScalarValue::I32(x) => ScalarValue::I32(x),
CaDataScalarValue::F32(x) => ScalarValue::F32(x),
CaDataScalarValue::F64(x) => ScalarValue::F64(x),
CaDataScalarValue::Enum(x) => ScalarValue::Enum(
x,
crst.enum_str_table.as_ref().map_or_else(
CaDataScalarValue::Enum(x) => ScalarValue::Enum(x, {
let conv = crst.enum_str_table.as_ref().map_or_else(
|| String::from("missingstrings"),
|map| {
map.get(x as usize)
.map_or_else(|| String::from("undefined"), String::from)
},
),
),
);
info!("convert_event_data {} {:?}", crst.name(), conv);
conv
}),
CaDataScalarValue::String(x) => ScalarValue::String(x),
CaDataScalarValue::Bool(x) => ScalarValue::Bool(x),
}

View File

@@ -11,6 +11,7 @@ use std::pin::Pin;
use std::time::Instant;
// Error type with a single, payload-free variant.
// NOTE(review): `#[cstm(name = ...)]` is an attribute of the patched
// `thiserror` derive; presumably it sets the name under which this error is
// reported — confirm in the fork.
#[derive(Debug, ThisError)]
#[cstm(name = "NetfetchEnumfetch")]
pub enum Error {
MissingState,
}

View File

@@ -20,6 +20,7 @@ use tokio::io::AsyncWrite;
use tokio::io::ReadBuf;
#[derive(Debug, ThisError)]
#[cstm(name = "NetfetchCaProto")]
pub enum Error {
NetBuf(#[from] netbuf::Error),
SlideBuf(#[from] slidebuf::Error),

View File

@@ -46,6 +46,7 @@ macro_rules! debug_cql {
}
#[derive(Debug, ThisError)]
#[cstm(name = "HttpDelete")]
pub enum Error {
Logic,
MissingRetentionTime,

View File

@@ -64,6 +64,7 @@ macro_rules! trace_queues {
}
#[derive(Debug, ThisError)]
#[cstm(name = "MetricsIngest")]
pub enum Error {
UnsupportedContentType,
Logic,

View File

@@ -19,6 +19,7 @@ use std::time::Instant;
use std::time::SystemTime;
#[derive(Debug, ThisError)]
#[cstm(name = "HttpPostingest")]
pub enum Error {
Msg,
SeriesWriter(#[from] serieswriter::writer::Error),

View File

@@ -4,6 +4,7 @@ use scylla::transport::errors::DbError;
use scylla::transport::errors::QueryError;
#[derive(Debug, ThisError)]
#[cstm(name = "ScyllaAccess")]
pub enum Error {
DbError(#[from] DbError),
QueryError(#[from] QueryError),

View File

@@ -14,6 +14,7 @@ use std::collections::VecDeque;
use std::pin::Pin;
#[derive(Debug, ThisError)]
#[cstm(name = "ScyllaInsertQueue")]
pub enum Error {
QueuePush,
#[error("ChannelSend({0}, {1})")]

View File

@@ -36,6 +36,7 @@ use std::task::Poll;
use std::time::SystemTime;
#[derive(Debug, ThisError)]
#[cstm(name = "ScyllaItemInsertQueue")]
pub enum Error {
DbTimeout,
DbOverload,

View File

@@ -14,6 +14,7 @@ use std::fmt;
use std::time::Duration;
#[derive(Debug, ThisError)]
#[cstm(name = "ScyllaSchema")]
pub enum Error {
NoKeyspaceChosen,
Fmt(#[from] fmt::Error),
@@ -268,14 +269,29 @@ impl GenTwcsTab {
}
if let Some(row) = rows.get(0) {
let mut set_opts = Vec::new();
info!(
"{:20} vs {:20} {:20} {:20}",
row.0,
self.default_time_to_live.as_secs(),
self.keyspace,
self.name,
);
if row.0 != self.default_time_to_live.as_secs() {
set_opts.push(format!(
"default_time_to_live = {}",
self.default_time_to_live.as_secs()
));
if false {
set_opts.push(format!(
"default_time_to_live = {}",
self.default_time_to_live.as_secs()
));
} else {
info!("mismatch default_time_to_live");
}
}
if row.1 != self.gc_grace.as_secs() {
set_opts.push(format!("gc_grace_seconds = {}", self.gc_grace.as_secs()));
if false {
set_opts.push(format!("gc_grace_seconds = {}", self.gc_grace.as_secs()));
} else {
info!("mismatch gc_grace_seconds");
}
}
if row.2 != self.compaction_options() {
let params: Vec<_> = self
@@ -284,11 +300,15 @@ impl GenTwcsTab {
.map(|(k, v)| format!("'{k}': '{v}'"))
.collect();
let params = params.join(", ");
set_opts.push(format!("compaction = {{ {} }}", params));
if false {
set_opts.push(format!("compaction = {{ {} }}", params));
} else {
info!("mismatch compaction");
}
}
if set_opts.len() != 0 {
let cql = format!(concat!("alter table {} with {}"), self.name(), set_opts.join(" and "));
debug!("{cql}");
info!("{cql}");
scy.query(cql, ()).await?;
}
} else {

View File

@@ -12,8 +12,10 @@ use std::task::Poll;
use thiserror::Error;
// Error type generic over a payload `T`.
// NOTE(review): `#[cstm(name = ...)]` is an attribute of the patched
// `thiserror` derive; presumably it sets the name under which this error is
// reported — confirm in the fork.
#[derive(Debug, Error)]
#[cstm(name = "SenderPolling")]
pub enum Error<T> {
NoSendInProgress,
// Carries a value of type `T`. The explicit message keeps the payload out of
// the displayed text, so `T` is not formatted when the error is displayed.
#[error("Closed")]
Closed(T),
}

View File

@@ -10,6 +10,7 @@ use scylla::transport::errors::NewSessionError;
use std::sync::Arc;
// Error type with a single variant carrying a `String`.
// NOTE(review): `#[cstm(name = ...)]` is an attribute of the patched
// `thiserror` derive; presumably it sets the name under which this error is
// reported — confirm in the fork.
#[derive(Debug, ThisError)]
#[cstm(name = "ScyllaSession")]
pub enum Error {
NewSession(String),
}

View File

@@ -10,6 +10,7 @@ use scylla::Session as ScySession;
use std::sync::Arc;
#[derive(Debug, ThisError)]
#[cstm(name = "ScyllaStore")]
pub enum Error {
NewSessionError(#[from] NewSessionError),
QueryError(#[from] QueryError),

View File

@@ -28,6 +28,7 @@ macro_rules! trace_binning {
}
#[derive(Debug, ThisError)]
#[cstm(name = "SerieswriterBinwriter")]
pub enum Error {
SeriesLookupError,
SeriesWriter(#[from] crate::writer::Error),

View File

@@ -26,6 +26,7 @@ use std::time::Duration;
use std::time::SystemTime;
#[derive(Debug, ThisError)]
#[cstm(name = "SerieswriterEstablishWorker")]
pub enum Error {
Postgres(#[from] dbpg::err::Error),
PostgresSchema(#[from] dbpg::schema::Error),

View File

@@ -28,6 +28,7 @@ macro_rules! trace_rt_decision {
}
#[derive(Debug, ThisError)]
#[cstm(name = "SerieswriterRtwriter")]
pub enum Error {
SeriesLookupError,
SeriesWriter(#[from] crate::writer::Error),

View File

@@ -78,6 +78,7 @@ macro_rules! trace_push {
}
#[derive(Debug, ThisError)]
#[cstm(name = "SerieswriterTimebin")]
pub enum Error {
UnexpectedContainer,
PatchWithoutBins,

View File

@@ -18,6 +18,7 @@ use std::collections::VecDeque;
use std::time::SystemTime;
#[derive(Debug, ThisError)]
#[cstm(name = "SerieswriterWriter")]
pub enum Error {
DbPgSid(#[from] dbpg::seriesid::Error),
ChannelSendError,

View File

@@ -84,6 +84,7 @@ pub trait DropMark {
fn field(&self) -> &Value;
}
#[allow(unused)]
pub struct DropGuard<'a> {
mark: &'a Value,
}