diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index 0084aac..acc52bd 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -59,8 +59,16 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { }; let scyconf = ScyllaIngestConfig::new([k.scylla_host], k.scylla_keyspace); match k.sub { - DbSub::RemoveOlder(j) => { - info!("RemoveOlder {:?} {:?}", pgconf, scyconf); + DbSub::Data(u) => { + use daqingest::opts::DbDataSub; + match u.sub { + DbDataSub::RemoveOlder(params) => { + info!("RemoveOlder {:?} {:?}", pgconf, scyconf); + daqingest::tools::remove_older(u.backend, params, &pgconf, &scyconf) + .await + .map_err(Error::from_string)?; + } + } } } } diff --git a/daqingest/src/opts.rs b/daqingest/src/opts.rs index 9646cf2..3966f9c 100644 --- a/daqingest/src/opts.rs +++ b/daqingest/src/opts.rs @@ -102,8 +102,7 @@ pub struct Db { pub scylla_keyspace: String, #[arg(long)] pub pg_host: String, - #[arg(long)] - #[clap(default_value = "5432")] + #[arg(long, default_value = "5432")] pub pg_port: u16, #[arg(long)] pub pg_user: String, @@ -117,11 +116,24 @@ pub struct Db { #[derive(Debug, clap::Parser)] pub enum DbSub { + Data(DbData), +} + +#[derive(Debug, clap::Parser)] +pub struct DbData { + #[arg(long)] + pub backend: String, + #[command(subcommand)] + pub sub: DbDataSub, +} + +#[derive(Debug, clap::Parser)] +pub enum DbDataSub { RemoveOlder(RemoveOlder), } #[derive(Debug, clap::Parser)] pub struct RemoveOlder { #[arg(long)] - date: String, + pub date: String, } diff --git a/daqingest/src/tools.rs b/daqingest/src/tools.rs index e69de29..cb1b3dd 100644 --- a/daqingest/src/tools.rs +++ b/daqingest/src/tools.rs @@ -0,0 +1,73 @@ +use crate::opts::RemoveOlder; +use chrono::DateTime; +use chrono::Utc; +use dbpg::conn::PgClient; +use err::thiserror; +use err::ThisError; +use futures_util::StreamExt; +use log::*; +use netpod::Database; +use scywr::config::ScyllaIngestConfig; +use scywr::scylla::transport::errors::QueryError; +use scywr::scylla::transport::iterator::NextRowError; +use scywr::session::ScySession; + +#[derive(Debug, ThisError)] +pub enum Error { + PgConn(#[from] dbpg::err::Error), + Postgres(#[from] dbpg::postgres::Error), + ScyllaSession(#[from] scywr::session::Error), + ScyllaQuery(#[from] QueryError), + ScyllaNextRowError(#[from] NextRowError), + ParseError(String), +} + +pub async fn remove_older( + backend: String, + params: RemoveOlder, + pgconf: &Database, + scyconf: &ScyllaIngestConfig, +) -> Result<(), Error> { + let channel_regex = "^TEST:SLOW:SCALAR:.+:00000[0-9]$"; + let date_cut: DateTime = params + .date + .parse() + .map_err(|_| Error::ParseError(format!("can not parse {:?}", params.date)))?; + let epoch: DateTime = "1970-01-01T00:00:00Z".parse().unwrap(); + let dt_epoch = date_cut.signed_duration_since(epoch); + let ts_cut = dt_epoch.num_seconds() as u64 * 1000000000; + debug!("chosen date is {:?} {}", date_cut, ts_cut); + let (pg, _) = dbpg::conn::make_pg_client(pgconf).await?; + let scy = scywr::session::create_session(scyconf).await?; + let sql = concat!( + "select series, channel from series_by_channel", + " where facility = $1 and kind = 2 and channel ~ $2" + ); + let rows = pg.query(sql, &[&backend, &channel_regex]).await?; + for row in rows { + let series: i64 = row.get(0); + let channel: String = row.get(1); + let series = series as u64; + debug!("care about {} {}", channel, series); + remove_older_series(series, ts_cut, &pg, &scy).await?; + } + Ok(()) +} + +async fn remove_older_series(series: u64, ts_cut: u64, _pg: &PgClient, scy: &ScySession) -> Result<(), Error> { + let it = scy + .query_iter( + "select ts_msp from ts_msp where series = ? and ts_msp < ?", + &(series as i64, ts_cut as i64), + ) + .await?; + type RowType = (i64,); + let mut it = it.into_typed::().take(10); + while let Some(e) = it.next().await { + let row = e?; + let ts_msp = row.0 as u64; + debug!("remove ts_msp {}", ts_msp); + // TODO must know scalar type and shape (at least scalar or wave) to select the correct scylla table + } + Ok(()) +} diff --git a/scywr/src/lib.rs b/scywr/src/lib.rs index 38b04f6..14cf810 100644 --- a/scywr/src/lib.rs +++ b/scywr/src/lib.rs @@ -13,3 +13,5 @@ pub mod schema; pub mod session; pub mod store; pub mod tools; + +pub use scylla; diff --git a/scywr/src/tools.rs b/scywr/src/tools.rs index 9a4eaeb..84a61ef 100644 --- a/scywr/src/tools.rs +++ b/scywr/src/tools.rs @@ -133,3 +133,13 @@ pub async fn fetch_events(backend: &str, channel: &str, scylla_conf: &ScyllaInge } Ok(()) } + +/* +select * from ts_msp where token(series) >= 1500000000000000000 and token(series) < 1600000000000000000 and ts_msp < 1709112680000000000 allow filtering; + +Can also simply: +scan the channel postgres database. +iterate over the series ids. +fetch all msp lower than. +delete them individually. +*/