Improve schema change view, but more todo

This commit is contained in:
Dominik Werder
2024-11-15 16:21:16 +01:00
parent 505ea3c6ac
commit 63770d4903
3 changed files with 116 additions and 84 deletions
+114 -82
View File
@@ -23,7 +23,6 @@ pub enum Error {
ScyllaNextRow(#[from] NextRowError),
MissingData,
AddColumnImpossible,
Msg(String),
BadSchema,
}
@@ -35,6 +34,57 @@ impl From<crate::session::Error> for Error {
}
}
struct Changeset {
do_change: bool,
would_do: Vec<String>,
done: Vec<String>,
}
impl Changeset {
fn new() -> Self {
Self {
do_change: false,
would_do: Vec::new(),
done: Vec::new(),
}
}
fn with_do_change(self, do_change: bool) -> Self {
let mut x = self;
x.do_change = do_change;
x
}
fn do_change(&self) -> bool {
self.do_change
}
fn add_would_do(&mut self, cql: String) {
self.would_do.push(cql);
}
fn add_done(&mut self, cql: String) {
self.done.push(cql);
}
fn differs(&self) -> bool {
if self.would_do.len() != 0 {
true
} else {
false
}
}
fn log_statements(&self) {
for q in &self.done {
info!("DONE {q}");
}
for q in &self.would_do {
info!("WOULD DO {q}");
}
}
}
pub async fn has_keyspace(name: &str, scy: &ScySession) -> Result<bool, Error> {
let cql = "select keyspace_name from system_schema.keyspaces where keyspace_name = ?";
let mut res = scy.query_iter(cql, (name,)).await?;
@@ -186,19 +236,24 @@ impl GenTwcsTab {
&self.name
}
async fn setup(&self, do_change: bool, scy: &ScySession) -> Result<(), Error> {
self.create_if_missing(scy).await?;
self.check_table_options(do_change, scy).await?;
self.check_columns(scy).await?;
async fn setup(&self, chs: &mut Changeset, scy: &ScySession) -> Result<(), Error> {
self.create_if_missing(chs, scy).await?;
self.check_table_options(chs, scy).await?;
self.check_columns(chs, scy).await?;
Ok(())
}
async fn create_if_missing(&self, scy: &ScySession) -> Result<(), Error> {
async fn create_if_missing(&self, chs: &mut Changeset, scy: &ScySession) -> Result<(), Error> {
// TODO check for more details (all columns, correct types, correct kinds, etc)
if !has_table(self.name(), scy).await? {
let cql = self.cql();
info!("scylla create table {} {}", self.name(), cql);
scy.query_unpaged(cql, ()).await?;
if chs.do_change() {
info!("scylla create table {} {}", self.name(), cql);
scy.query_unpaged(cql.clone(), ()).await?;
chs.add_done(cql);
} else {
chs.add_would_do(cql);
}
}
Ok(())
}
@@ -257,8 +312,7 @@ impl GenTwcsTab {
map
}
async fn check_table_options(&self, do_change: bool, scy: &ScySession) -> Result<(), Error> {
let mut differ = false;
async fn check_table_options(&self, chs: &mut Changeset, scy: &ScySession) -> Result<(), Error> {
let cql = concat!(
"select default_time_to_live, gc_grace_seconds, compaction",
" from system_schema.tables where keyspace_name = ? and table_name = ?"
@@ -273,35 +327,13 @@ impl GenTwcsTab {
if let Some(row) = rows.get(0) {
let mut set_opts = Vec::new();
if row.0 != self.default_time_to_live.as_secs() {
if do_change {
set_opts.push(format!(
"default_time_to_live = {}",
self.default_time_to_live.as_secs()
));
} else {
error!(
"mismatch default_time_to_live {:10} exp {:10} {} {}",
row.0,
self.default_time_to_live.as_secs(),
self.keyspace,
self.name,
);
differ = true;
}
set_opts.push(format!(
"default_time_to_live = {}",
self.default_time_to_live.as_secs()
));
}
if row.1 != self.gc_grace.as_secs() {
if do_change {
set_opts.push(format!("gc_grace_seconds = {}", self.gc_grace.as_secs()));
} else {
error!(
"mismatch gc_grace_seconds {:10} exp {:10} {} {}",
row.1,
self.gc_grace.as_secs(),
self.keyspace,
self.name,
);
differ = true;
}
set_opts.push(format!("gc_grace_seconds = {}", self.gc_grace.as_secs()));
}
if row.2 != self.compaction_options() {
let params: Vec<_> = self
@@ -310,37 +342,25 @@ impl GenTwcsTab {
.map(|(k, v)| format!("'{k}': '{v}'"))
.collect();
let params = params.join(", ");
if do_change {
set_opts.push(format!("compaction = {{ {} }}", params));
} else {
error!(
"mismatch compaction {:?} exp {:?} {} {}",
row.2,
self.compaction_options(),
self.keyspace,
self.name,
);
differ = true;
}
set_opts.push(format!("compaction = {{ {} }}", params));
}
if do_change {
if set_opts.len() != 0 {
let cql = format!(concat!("alter table {} with {}"), self.name(), set_opts.join(" and "));
info!("{cql}");
scy.query_unpaged(cql, ()).await?;
if set_opts.len() != 0 {
let cql = format!(concat!("alter table {} with {}"), self.name(), set_opts.join(" and "));
if chs.do_change() {
info!("EXECUTE {cql}");
scy.query_unpaged(cql.clone(), ()).await?;
chs.add_done(cql);
} else {
chs.add_would_do(cql);
}
}
} else {
return Err(Error::MissingData);
}
if differ {
Err(Error::BadSchema)
} else {
Ok(())
}
Ok(())
}
async fn check_columns(&self, scy: &ScySession) -> Result<(), Error> {
async fn check_columns(&self, chs: &mut Changeset, scy: &ScySession) -> Result<(), Error> {
let cql = concat!(
"select column_name, type from system_schema.columns",
" where keyspace_name = ?",
@@ -381,16 +401,21 @@ impl GenTwcsTab {
error!("ck {} {}", cn, ct);
return Err(Error::AddColumnImpossible);
}
self.add_column(cn, ct, scy).await?;
self.add_column(cn, ct, chs, scy).await?;
}
}
Ok(())
}
async fn add_column(&self, name: &str, ty: &str, scy: &ScySession) -> Result<(), Error> {
async fn add_column(&self, name: &str, ty: &str, chs: &mut Changeset, scy: &ScySession) -> Result<(), Error> {
let cql = format!(concat!("alter table {} add {} {}"), self.name(), name, ty);
debug!("NOTE add_column CQL {}", cql);
scy.query_unpaged(cql, ()).await?;
if chs.do_change() {
info!("EXECUTE add_column CQL {}", cql);
scy.query_unpaged(cql.clone(), ()).await?;
chs.add_done(cql);
} else {
chs.add_would_do(cql);
}
Ok(())
}
}
@@ -422,7 +447,7 @@ async fn get_columns(keyspace: &str, table: &str, scy: &ScySession) -> Result<Ve
async fn check_event_tables(
keyspace: &str,
rett: RetentionTime,
do_change: bool,
chs: &mut Changeset,
scy: &ScySession,
) -> Result<(), Error> {
let stys = [
@@ -450,7 +475,7 @@ async fn check_event_tables(
["ts_lsp"],
rett.ttl_events_d0(),
);
tab.setup(do_change, scy).await?;
tab.setup(chs, scy).await?;
}
{
let tab = GenTwcsTab::new(
@@ -470,7 +495,7 @@ async fn check_event_tables(
["ts_lsp"],
rett.ttl_events_d1(),
);
tab.setup(do_change, scy).await?;
tab.setup(chs, scy).await?;
}
}
{
@@ -489,7 +514,7 @@ async fn check_event_tables(
["ts_lsp"],
rett.ttl_events_d1(),
);
tab.setup(do_change, scy).await?;
tab.setup(chs, scy).await?;
}
{
let tab = GenTwcsTab::new(
@@ -506,7 +531,7 @@ async fn check_event_tables(
["ts_lsp"],
rett.ttl_events_d1(),
);
tab.setup(do_change, scy).await?;
tab.setup(chs, scy).await?;
}
{
let tab = GenTwcsTab::new(
@@ -523,7 +548,7 @@ async fn check_event_tables(
["ts_lsp"],
rett.ttl_events_d1(),
);
tab.setup(do_change, scy).await?;
tab.setup(chs, scy).await?;
}
Ok(())
}
@@ -533,12 +558,14 @@ pub async fn migrate_scylla_data_schema(
rett: RetentionTime,
do_change: bool,
) -> Result<(), Error> {
let mut chsv = Changeset::new().with_do_change(do_change);
let chs = &mut chsv;
let scy2 = create_session_no_ks(scyconf).await?;
let scy = &scy2;
let durable = true;
if !has_keyspace(scyconf.keyspace(), scy).await? {
if do_change {
if chs.do_change() {
// TODO
let replication = 3;
let cql = format!(
@@ -562,7 +589,7 @@ pub async fn migrate_scylla_data_schema(
if let Some(ks) = scyconf.keyspace_rf1() {
if !has_keyspace(ks, scy).await? {
if do_change {
if chs.do_change() {
let replication = 1;
let cql = format!(
concat!(
@@ -588,7 +615,7 @@ pub async fn migrate_scylla_data_schema(
scy.use_keyspace(ks, true).await?;
check_event_tables(ks, rett.clone(), do_change, scy).await?;
check_event_tables(ks, rett.clone(), chs, scy).await?;
{
let tab = GenTwcsTab::new(
@@ -600,7 +627,7 @@ pub async fn migrate_scylla_data_schema(
["ts_msp"],
rett.ttl_ts_msp(),
);
tab.setup(do_change, scy).await?;
tab.setup(chs, scy).await?;
}
{
let tab = GenTwcsTab::new(
@@ -617,7 +644,7 @@ pub async fn migrate_scylla_data_schema(
["ts_lsp"],
rett.ttl_channel_status(),
);
tab.setup(do_change, scy).await?;
tab.setup(chs, scy).await?;
}
{
let tab = GenTwcsTab::new(
@@ -634,7 +661,7 @@ pub async fn migrate_scylla_data_schema(
["ts_lsp"],
rett.ttl_channel_status(),
);
tab.setup(do_change, scy).await?;
tab.setup(chs, scy).await?;
}
{
let tab = GenTwcsTab::new(
@@ -651,7 +678,7 @@ pub async fn migrate_scylla_data_schema(
["ts_lsp"],
rett.ttl_channel_status(),
);
tab.setup(do_change, scy).await?;
tab.setup(chs, scy).await?;
}
{
let tab = GenTwcsTab::new(
@@ -674,8 +701,7 @@ pub async fn migrate_scylla_data_schema(
["off"],
rett.ttl_binned(),
);
// let do_change = true;
tab.setup(do_change, scy).await?;
tab.setup(chs, scy).await?;
}
{
let tab = GenTwcsTab::new(
@@ -693,7 +719,7 @@ pub async fn migrate_scylla_data_schema(
["series"],
rett.ttl_channel_status(),
);
tab.setup(do_change, scy).await?;
tab.setup(chs, scy).await?;
}
{
let tab = GenTwcsTab::new(
@@ -711,7 +737,13 @@ pub async fn migrate_scylla_data_schema(
["series"],
rett.ttl_channel_status(),
);
tab.setup(do_change, scy).await?;
tab.setup(chs, scy).await?;
}
if chs.differs() {
chs.log_statements();
Err(Error::BadSchema)
} else {
Ok(())
}
Ok(())
}
+1 -1
View File
@@ -106,7 +106,7 @@ impl BinWriter {
let buf = mem::replace(&mut self.evbuf, ContainerEvents::new());
// TODO bin the more fine grid from the coarse grid, do not clone events
for writer in self.writers.iter_mut() {
writer.ingest(buf.clone(), iqdqs)?;
writer.ingest(&buf, iqdqs)?;
}
} else {
trace_tick_verbose!("tick NOTHING TO INGEST");
+1 -1
View File
@@ -79,7 +79,7 @@ impl BinWriterGrid {
self.shape.clone()
}
pub fn ingest(&mut self, evs: ContainerEvents<f32>, iqdqs: &mut InsertDeques) -> Result<(), Error> {
pub fn ingest(&mut self, evs: &ContainerEvents<f32>, iqdqs: &mut InsertDeques) -> Result<(), Error> {
let _ = iqdqs;
trace_ingest!("{:?} {:?}", self, evs);
self.binner.ingest(evs)?;