diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index dc43840..4c7c2b0 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -23,7 +23,6 @@ pub enum Error { ScyllaNextRow(#[from] NextRowError), MissingData, AddColumnImpossible, - Msg(String), BadSchema, } @@ -35,6 +34,57 @@ impl From for Error { } } +struct Changeset { + do_change: bool, + would_do: Vec, + done: Vec, +} + +impl Changeset { + fn new() -> Self { + Self { + do_change: false, + would_do: Vec::new(), + done: Vec::new(), + } + } + + fn with_do_change(self, do_change: bool) -> Self { + let mut x = self; + x.do_change = do_change; + x + } + + fn do_change(&self) -> bool { + self.do_change + } + + fn add_would_do(&mut self, cql: String) { + self.would_do.push(cql); + } + + fn add_done(&mut self, cql: String) { + self.done.push(cql); + } + + fn differs(&self) -> bool { + if self.would_do.len() != 0 { + true + } else { + false + } + } + + fn log_statements(&self) { + for q in &self.done { + info!("DONE {q}"); + } + for q in &self.would_do { + info!("WOULD DO {q}"); + } + } +} + pub async fn has_keyspace(name: &str, scy: &ScySession) -> Result { let cql = "select keyspace_name from system_schema.keyspaces where keyspace_name = ?"; let mut res = scy.query_iter(cql, (name,)).await?; @@ -186,19 +236,24 @@ impl GenTwcsTab { &self.name } - async fn setup(&self, do_change: bool, scy: &ScySession) -> Result<(), Error> { - self.create_if_missing(scy).await?; - self.check_table_options(do_change, scy).await?; - self.check_columns(scy).await?; + async fn setup(&self, chs: &mut Changeset, scy: &ScySession) -> Result<(), Error> { + self.create_if_missing(chs, scy).await?; + self.check_table_options(chs, scy).await?; + self.check_columns(chs, scy).await?; Ok(()) } - async fn create_if_missing(&self, scy: &ScySession) -> Result<(), Error> { + async fn create_if_missing(&self, chs: &mut Changeset, scy: &ScySession) -> Result<(), Error> { // TODO check for more details (all columns, correct types, correct kinds, etc) if !has_table(self.name(), scy).await? { let cql = self.cql(); - info!("scylla create table {} {}", self.name(), cql); - scy.query_unpaged(cql, ()).await?; + if chs.do_change() { + info!("scylla create table {} {}", self.name(), cql); + scy.query_unpaged(cql.clone(), ()).await?; + chs.add_done(cql); + } else { + chs.add_would_do(cql); + } } Ok(()) } @@ -257,8 +312,7 @@ impl GenTwcsTab { map } - async fn check_table_options(&self, do_change: bool, scy: &ScySession) -> Result<(), Error> { - let mut differ = false; + async fn check_table_options(&self, chs: &mut Changeset, scy: &ScySession) -> Result<(), Error> { let cql = concat!( "select default_time_to_live, gc_grace_seconds, compaction", " from system_schema.tables where keyspace_name = ? and table_name = ?" @@ -273,35 +327,13 @@ impl GenTwcsTab { if let Some(row) = rows.get(0) { let mut set_opts = Vec::new(); if row.0 != self.default_time_to_live.as_secs() { - if do_change { - set_opts.push(format!( - "default_time_to_live = {}", - self.default_time_to_live.as_secs() - )); - } else { - error!( - "mismatch default_time_to_live {:10} exp {:10} {} {}", - row.0, - self.default_time_to_live.as_secs(), - self.keyspace, - self.name, - ); - differ = true; - } + set_opts.push(format!( + "default_time_to_live = {}", + self.default_time_to_live.as_secs() + )); } if row.1 != self.gc_grace.as_secs() { - if do_change { - set_opts.push(format!("gc_grace_seconds = {}", self.gc_grace.as_secs())); - } else { - error!( - "mismatch gc_grace_seconds {:10} exp {:10} {} {}", - row.1, - self.gc_grace.as_secs(), - self.keyspace, - self.name, - ); - differ = true; - } + set_opts.push(format!("gc_grace_seconds = {}", self.gc_grace.as_secs())); } if row.2 != self.compaction_options() { let params: Vec<_> = self @@ -310,37 +342,25 @@ impl GenTwcsTab { .map(|(k, v)| format!("'{k}': '{v}'")) .collect(); let params = params.join(", "); - if do_change { - set_opts.push(format!("compaction = {{ {} }}", params)); - } else { - error!( - "mismatch compaction {:?} exp {:?} {} {}", - row.2, - self.compaction_options(), - self.keyspace, - self.name, - ); - differ = true; - } + set_opts.push(format!("compaction = {{ {} }}", params)); } - if do_change { - if set_opts.len() != 0 { - let cql = format!(concat!("alter table {} with {}"), self.name(), set_opts.join(" and ")); - info!("{cql}"); - scy.query_unpaged(cql, ()).await?; + if set_opts.len() != 0 { + let cql = format!(concat!("alter table {} with {}"), self.name(), set_opts.join(" and ")); + if chs.do_change() { + info!("EXECUTE {cql}"); + scy.query_unpaged(cql.clone(), ()).await?; + chs.add_done(cql); + } else { + chs.add_would_do(cql); } } } else { return Err(Error::MissingData); } - if differ { - Err(Error::BadSchema) - } else { - Ok(()) - } + Ok(()) } - async fn check_columns(&self, scy: &ScySession) -> Result<(), Error> { + async fn check_columns(&self, chs: &mut Changeset, scy: &ScySession) -> Result<(), Error> { let cql = concat!( "select column_name, type from system_schema.columns", " where keyspace_name = ?", @@ -381,16 +401,21 @@ impl GenTwcsTab { error!("ck {} {}", cn, ct); return Err(Error::AddColumnImpossible); } - self.add_column(cn, ct, scy).await?; + self.add_column(cn, ct, chs, scy).await?; } } Ok(()) } - async fn add_column(&self, name: &str, ty: &str, scy: &ScySession) -> Result<(), Error> { + async fn add_column(&self, name: &str, ty: &str, chs: &mut Changeset, scy: &ScySession) -> Result<(), Error> { let cql = format!(concat!("alter table {} add {} {}"), self.name(), name, ty); - debug!("NOTE add_column CQL {}", cql); - scy.query_unpaged(cql, ()).await?; + if chs.do_change() { + info!("EXECUTE add_column CQL {}", cql); + scy.query_unpaged(cql.clone(), ()).await?; + chs.add_done(cql); + } else { + chs.add_would_do(cql); + } Ok(()) } } @@ -422,7 +447,7 @@ async fn get_columns(keyspace: &str, table: &str, scy: &ScySession) -> Result Result<(), Error> { let stys = [ @@ -450,7 +475,7 @@ async fn check_event_tables( ["ts_lsp"], rett.ttl_events_d0(), ); - tab.setup(do_change, scy).await?; + tab.setup(chs, scy).await?; } { let tab = GenTwcsTab::new( @@ -470,7 +495,7 @@ async fn check_event_tables( ["ts_lsp"], rett.ttl_events_d1(), ); - tab.setup(do_change, scy).await?; + tab.setup(chs, scy).await?; } } { @@ -489,7 +514,7 @@ async fn check_event_tables( ["ts_lsp"], rett.ttl_events_d1(), ); - tab.setup(do_change, scy).await?; + tab.setup(chs, scy).await?; } { let tab = GenTwcsTab::new( @@ -506,7 +531,7 @@ async fn check_event_tables( ["ts_lsp"], rett.ttl_events_d1(), ); - tab.setup(do_change, scy).await?; + tab.setup(chs, scy).await?; } { let tab = GenTwcsTab::new( @@ -523,7 +548,7 @@ async fn check_event_tables( ["ts_lsp"], rett.ttl_events_d1(), ); - tab.setup(do_change, scy).await?; + tab.setup(chs, scy).await?; } Ok(()) } @@ -533,12 +558,14 @@ pub async fn migrate_scylla_data_schema( rett: RetentionTime, do_change: bool, ) -> Result<(), Error> { + let mut chsv = Changeset::new().with_do_change(do_change); + let chs = &mut chsv; let scy2 = create_session_no_ks(scyconf).await?; let scy = &scy2; let durable = true; if !has_keyspace(scyconf.keyspace(), scy).await? { - if do_change { + if chs.do_change() { // TODO let replication = 3; let cql = format!( @@ -562,7 +589,7 @@ pub async fn migrate_scylla_data_schema( if let Some(ks) = scyconf.keyspace_rf1() { if !has_keyspace(ks, scy).await? { - if do_change { + if chs.do_change() { let replication = 1; let cql = format!( concat!( @@ -588,7 +615,7 @@ pub async fn migrate_scylla_data_schema( scy.use_keyspace(ks, true).await?; - check_event_tables(ks, rett.clone(), do_change, scy).await?; + check_event_tables(ks, rett.clone(), chs, scy).await?; { let tab = GenTwcsTab::new( @@ -600,7 +627,7 @@ pub async fn migrate_scylla_data_schema( ["ts_msp"], rett.ttl_ts_msp(), ); - tab.setup(do_change, scy).await?; + tab.setup(chs, scy).await?; } { let tab = GenTwcsTab::new( @@ -617,7 +644,7 @@ pub async fn migrate_scylla_data_schema( ["ts_lsp"], rett.ttl_channel_status(), ); - tab.setup(do_change, scy).await?; + tab.setup(chs, scy).await?; } { let tab = GenTwcsTab::new( @@ -634,7 +661,7 @@ pub async fn migrate_scylla_data_schema( ["ts_lsp"], rett.ttl_channel_status(), ); - tab.setup(do_change, scy).await?; + tab.setup(chs, scy).await?; } { let tab = GenTwcsTab::new( @@ -651,7 +678,7 @@ pub async fn migrate_scylla_data_schema( ["ts_lsp"], rett.ttl_channel_status(), ); - tab.setup(do_change, scy).await?; + tab.setup(chs, scy).await?; } { let tab = GenTwcsTab::new( @@ -674,8 +701,7 @@ pub async fn migrate_scylla_data_schema( ["off"], rett.ttl_binned(), ); - // let do_change = true; - tab.setup(do_change, scy).await?; + tab.setup(chs, scy).await?; } { let tab = GenTwcsTab::new( @@ -693,7 +719,7 @@ pub async fn migrate_scylla_data_schema( ["series"], rett.ttl_channel_status(), ); - tab.setup(do_change, scy).await?; + tab.setup(chs, scy).await?; } { let tab = GenTwcsTab::new( @@ -711,7 +737,13 @@ pub async fn migrate_scylla_data_schema( ["series"], rett.ttl_channel_status(), ); - tab.setup(do_change, scy).await?; + tab.setup(chs, scy).await?; + } + + if chs.differs() { + chs.log_statements(); + Err(Error::BadSchema) + } else { + Ok(()) } - Ok(()) } diff --git a/serieswriter/src/binwriter.rs b/serieswriter/src/binwriter.rs index 7bab828..efe69b6 100644 --- a/serieswriter/src/binwriter.rs +++ b/serieswriter/src/binwriter.rs @@ -106,7 +106,7 @@ impl BinWriter { let buf = mem::replace(&mut self.evbuf, ContainerEvents::new()); // TODO bin the more fine grid from the coarse grid, do not clone events for writer in self.writers.iter_mut() { - writer.ingest(buf.clone(), iqdqs)?; + writer.ingest(&buf, iqdqs)?; } } else { trace_tick_verbose!("tick NOTHING TO INGEST"); diff --git a/serieswriter/src/binwritergrid.rs b/serieswriter/src/binwritergrid.rs index 0db3625..ede6a99 100644 --- a/serieswriter/src/binwritergrid.rs +++ b/serieswriter/src/binwritergrid.rs @@ -79,7 +79,7 @@ impl BinWriterGrid { self.shape.clone() } - pub fn ingest(&mut self, evs: ContainerEvents, iqdqs: &mut InsertDeques) -> Result<(), Error> { + pub fn ingest(&mut self, evs: &ContainerEvents, iqdqs: &mut InsertDeques) -> Result<(), Error> { let _ = iqdqs; trace_ingest!("{:?} {:?}", self, evs); self.binner.ingest(evs)?;