diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index acc52bd..ba60fcf 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -68,6 +68,12 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { .await .map_err(Error::from_string)?; } + DbDataSub::FindOlder(params) => { + info!("FindOlder {:?} {:?}", pgconf, scyconf); + daqingest::tools::find_older_msp(u.backend, params, &pgconf, &scyconf) + .await + .map_err(Error::from_string)?; + } } } } diff --git a/daqingest/src/opts.rs b/daqingest/src/opts.rs index 3966f9c..7bf9f9d 100644 --- a/daqingest/src/opts.rs +++ b/daqingest/src/opts.rs @@ -130,10 +130,23 @@ pub struct DbData { #[derive(Debug, clap::Parser)] pub enum DbDataSub { RemoveOlder(RemoveOlder), + FindOlder(FindOlder), } #[derive(Debug, clap::Parser)] pub struct RemoveOlder { #[arg(long)] pub date: String, + #[arg(long)] + pub channel_regex: String, +} + +#[derive(Debug, clap::Parser)] +pub struct FindOlder { + #[arg(long)] + pub date: String, + #[arg(long)] + pub table_name: String, + #[arg(long)] + pub slices: u32, } diff --git a/daqingest/src/tools.rs b/daqingest/src/tools.rs index cb1b3dd..e8f1d91 100644 --- a/daqingest/src/tools.rs +++ b/daqingest/src/tools.rs @@ -1,3 +1,4 @@ +use crate::opts::FindOlder; use crate::opts::RemoveOlder; use chrono::DateTime; use chrono::Utc; @@ -7,7 +8,10 @@ use err::ThisError; use futures_util::StreamExt; use log::*; use netpod::Database; +use netpod::ScalarType; +use netpod::Shape; use scywr::config::ScyllaIngestConfig; +use scywr::scylla::prepared_statement::PreparedStatement; use scywr::scylla::transport::errors::QueryError; use scywr::scylla::transport::iterator::NextRowError; use scywr::session::ScySession; @@ -19,7 +23,9 @@ pub enum Error { ScyllaSession(#[from] scywr::session::Error), ScyllaQuery(#[from] QueryError), ScyllaNextRowError(#[from] NextRowError), + ScyllaSchema(#[from] scywr::schema::Error), ParseError(String), + InvalidValue, } pub 
async fn remove_older( @@ -28,46 +34,164 @@ pub async fn remove_older( pgconf: &Database, scyconf: &ScyllaIngestConfig, ) -> Result<(), Error> { - let channel_regex = "^TEST:SLOW:SCALAR:.+:00000[0-9]$"; - let date_cut: DateTime<Utc> = params - .date - .parse() - .map_err(|_| Error::ParseError(format!("can not parse {:?}", params.date)))?; - let epoch: DateTime<Utc> = "1970-01-01T00:00:00Z".parse().unwrap(); - let dt_epoch = date_cut.signed_duration_since(epoch); - let ts_cut = dt_epoch.num_seconds() as u64 * 1000000000; + scywr::schema::migrate_scylla_data_schema(scyconf, netpod::ttl::RetentionTime::Short).await?; + let date_cut = parse_date_str(&params.date)?; + let ts_cut = date_to_ts_ns(date_cut); debug!("chosen date is {:?} {}", date_cut, ts_cut); let (pg, _) = dbpg::conn::make_pg_client(pgconf).await?; let scy = scywr::session::create_session(scyconf).await?; let sql = concat!( - "select series, channel from series_by_channel", + "select series, channel, scalar_type, shape_dims", + " from series_by_channel", " where facility = $1 and kind = 2 and channel ~ $2" ); - let rows = pg.query(sql, &[&backend, &channel_regex]).await?; + let rows = pg.query(sql, &[&backend, &params.channel_regex]).await?; for row in rows { let series: i64 = row.get(0); let channel: String = row.get(1); + let scalar_type: i32 = row.get(2); + let shape_dims: Vec<i32> = row.get(3); let series = series as u64; + let scalar_type = ScalarType::from_scylla_i32(scalar_type).map_err(|_| Error::InvalidValue)?; + let shape = Shape::from_scylla_shape_dims(&shape_dims).map_err(|_| Error::InvalidValue)?; debug!("care about {} {}", channel, series); - remove_older_series(series, ts_cut, &pg, &scy).await?; + remove_older_series(series, &scalar_type, &shape, ts_cut, &pg, &scy).await?; } Ok(()) } -async fn remove_older_series(series: u64, ts_cut: u64, _pg: &PgClient, scy: &ScySession) -> Result<(), Error> { +async fn remove_older_series( + series: u64, + scalar_type: &ScalarType, + shape: &Shape, + ts_cut: u64, + _pg: 
&PgClient, + scy: &ScySession, +) -> Result<(), Error> { + let table_name = table_name_from_type(scalar_type, shape); + let cql = format!(concat!("delete from {} where series = ? and ts_msp = ?",), table_name); + let qu_delete: PreparedStatement = scy.prepare(cql).await?; let it = scy .query_iter( "select ts_msp from ts_msp where series = ? and ts_msp < ?", - &(series as i64, ts_cut as i64), + (series as i64, ts_cut as i64), ) .await?; type RowType = (i64,); - let mut it = it.into_typed::<RowType>().take(10); + let mut it = it.into_typed::<RowType>(); while let Some(e) = it.next().await { let row = e?; let ts_msp = row.0 as u64; debug!("remove ts_msp {}", ts_msp); - // TODO must know scalar type and shape (at least scalar or wave) to select the correct scylla table + let res = scy.execute(&qu_delete, (series as i64, ts_msp as i64)).await?; + { + // informative + if let Some(rows) = res.rows { + debug!("rows returned {}", rows.len()); + for row in rows { + debug!("{:?}", row.columns); + } + } else { + // debug!("delete no rows returned"); + } + } } Ok(()) } + +pub async fn find_older_msp( + backend: String, + params: FindOlder, + pgconf: &Database, + scyconf: &ScyllaIngestConfig, +) -> Result<(), Error> { + let date_cut = parse_date_str(&params.date)?; + let ts_cut = date_to_ts_ns(date_cut); + debug!("chosen date is {:?} {}", date_cut, ts_cut); + let (pg, _) = dbpg::conn::make_pg_client(pgconf).await?; + let scy = scywr::session::create_session(scyconf).await?; + let table_name = &params.table_name; + let cql = format!( + concat!( + "select distinct series, ts_msp from {}", + " where token(series, ts_msp) >= ? and token(series, ts_msp) <= ?", + ), + table_name + ); + let min = i64::MIN; + let max = i64::MAX; + let mut trbeg = min; + let d = (u64::MAX / params.slices as u64) as i64; + let qu = scy.prepare(cql).await?; + loop { + let trend = if trbeg < max - d { trbeg + d } else { max }; + let mut it = scy + .execute_iter(qu.clone(), (trbeg, trend)) + .await? 
+ .into_typed::<(i64, i64)>(); + let mut c = 0; + while let Some(u) = it.next().await { + let row = u?; + let series = row.0 as u64; + let ts_msp = row.1 as u64; + if series == 9033627543553833740 { + debug!("found series {} ts_msp {}", series, ts_msp); + } + c += 1; + } + let pct = trend - min; + debug!("query {:6} {:016x} {:016x} had {:5} rows", pct / d, trbeg, trend, c); + if trend == max { + break; + } + trbeg += d; + } + Ok(()) +} + +fn table_name_from_type(scalar_type: &ScalarType, shape: &Shape) -> &'static str { + match shape { + Shape::Scalar => match scalar_type { + ScalarType::U8 => todo!(), + ScalarType::U16 => todo!(), + ScalarType::U32 => todo!(), + ScalarType::U64 => todo!(), + ScalarType::I8 => todo!(), + ScalarType::I16 => todo!(), + ScalarType::I32 => todo!(), + ScalarType::I64 => todo!(), + ScalarType::F32 => "events_scalar_f32", + ScalarType::F64 => todo!(), + ScalarType::BOOL => todo!(), + ScalarType::STRING => todo!(), + ScalarType::ChannelStatus => todo!(), + }, + Shape::Wave(_) => match scalar_type { + ScalarType::U8 => todo!(), + ScalarType::U16 => todo!(), + ScalarType::U32 => todo!(), + ScalarType::U64 => todo!(), + ScalarType::I8 => todo!(), + ScalarType::I16 => todo!(), + ScalarType::I32 => todo!(), + ScalarType::I64 => todo!(), + ScalarType::F32 => "events_array_f32", + ScalarType::F64 => todo!(), + ScalarType::BOOL => todo!(), + ScalarType::STRING => todo!(), + ScalarType::ChannelStatus => todo!(), + }, + Shape::Image(_, _) => todo!(), + } +} + +fn parse_date_str(inp: &str) -> Result<DateTime<Utc>, Error> { + inp.parse() + .map_err(|_| Error::ParseError(format!("can not parse {:?}", inp))) +} + +fn date_to_ts_ns(date: DateTime<Utc>) -> u64 { + let epoch: DateTime<Utc> = "1970-01-01T00:00:00Z".parse().unwrap(); + let dt_epoch = date.signed_duration_since(epoch); + dt_epoch.num_seconds() as u64 * 1000000000 +} diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index 85868b8..75f8d0c 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -412,6 
+412,18 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete } check_event_tables(rett.clone(), scy).await?; + { + let tab = GenTwcsTab::new( + rett.table_prefix(), + "ts_msp_ms", + &[("series", "bigint"), ("ts_msp_ms", "bigint")], + ["series"], + ["ts_msp_ms"], + rett.ttl_ts_msp() / 40, + rett.ttl_ts_msp(), + ); + tab.create_if_missing(scy).await?; + } { let tab = GenTwcsTab::new( rett.table_prefix(),