From 8c426dd22ee01ef76bf62fa3be3a77912d08f0e5 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 30 Sep 2024 09:50:31 +0200 Subject: [PATCH] Update scylla driver --- daqingest/src/tools.rs | 20 ++++----- netfetch/src/metrics/delete.rs | 68 +++++++++-------------------- scywr/Cargo.toml | 2 +- scywr/src/err.rs | 1 + scywr/src/futinsert.rs | 2 +- scywr/src/iteminsertqueue.rs | 12 +++--- scywr/src/schema.rs | 8 ++-- scywr/src/tools.rs | 79 +++++++++++++++++----------------- 8 files changed, 80 insertions(+), 112 deletions(-) diff --git a/daqingest/src/tools.rs b/daqingest/src/tools.rs index 44d80f7..848d979 100644 --- a/daqingest/src/tools.rs +++ b/daqingest/src/tools.rs @@ -94,18 +94,12 @@ async fn remove_older_series( let row = e?; let ts_msp = row.0; debug!("remove ts_msp {}", ts_msp); - let res = scy.execute(&qu_delete, (series as i64, ts_msp)).await?; - { - // informative - if let Some(rows) = res.rows { - debug!("rows returned {}", rows.len()); - for row in rows { - debug!("{:?}", row.columns); - } - } else { - // debug!("delete no rows returned"); - } + let mut it = scy.execute_iter(qu_delete.clone(), (series as i64, ts_msp)).await?; + let mut j = 0; + while let Some(_) = it.next().await { + j += 1; } + debug!("rows returned {}", j); } Ok(()) } @@ -279,7 +273,9 @@ async fn remove_older_all_series_msps( stream::iter(msps.clone()) .map(|msp| async move { let stmt = stmt.clone(); - scy.execute(&stmt, (series.to_i64(), msp as i64)).await + // scy.execute_iter(&stmt, (series.to_i64(), msp as i64)).await + todo!(); + Ok::<_, Error>(0i32) }) .buffer_unordered(32) .take_while(|x| { diff --git a/netfetch/src/metrics/delete.rs b/netfetch/src/metrics/delete.rs index 6f80164..0286e37 100644 --- a/netfetch/src/metrics/delete.rs +++ b/netfetch/src/metrics/delete.rs @@ -10,6 +10,7 @@ use chrono::Utc; use core::fmt; use err::thiserror; use err::ThisError; +use futures_util::StreamExt; use netpod::log::*; use netpod::ttl::RetentionTime; use netpod::ScalarType; @@ -58,6 +59,7 @@ pub enum Error { ScyllaQuery(#[from] scylla::transport::errors::QueryError), ScyllaRowType(#[from] scylla::transport::query_result::RowsExpectedError), ScyllaRowError(#[from] scylla::cql_to_rust::FromRowError), + ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError), InvalidTimestamp, } @@ -150,30 +152,19 @@ async fn delete_try( ); scy.prepare(scylla::query::Query::new(cql).with_page_size(100)).await? }; - let mut pst = None; let mut i = 0; - loop { - // debug_cql!("query iteration {i}"); - let z = scy.execute_paged(&qu, (series.to_i64(),), pst).await?; - pst = z.paging_state.clone(); - for x in z.rows_typed::<(i64,)>()? { - let (msp,) = x?; - let msp = TsMs::from_ms_u64(msp as _); - let msp_ns = msp.ns_u64(); - delete_val(series.clone(), msp, beg, end, &qu_delete_val, &scy).await?; - } - if pst.is_none() { - debug_cql!("last page"); - break; - } - i += 1; - if false { - if i > 20 { - debug_cql!("loop limit"); - break; - } - } + // debug_cql!("query iteration {i}"); + let mut it = scy + .execute_iter(qu.clone(), (series.to_i64(),)) + .await? + .into_typed::<(i64,)>(); + while let Some(x) = it.next().await { + let (msp,) = x?; + let msp = TsMs::from_ms_u64(msp as _); + let msp_ns = msp.ns_u64(); + delete_val(series.clone(), msp, beg, end, &qu_delete_val, &scy).await?; } + i += 1; Ok(Json(serde_json::Value::Null)) } @@ -196,32 +187,13 @@ async fn delete_val( let o1 = DateTime::from_timestamp_millis((msp.ms() + r1 / 1000000) as i64).unwrap(); let o2 = DateTime::from_timestamp_millis((msp.ms() + r2 / 1000000) as i64).unwrap(); debug_cql!(" sub query {o0:?} {o1:?} {o2:?}"); - let mut pst = None; - let mut i = 0; - loop { - // debug_cql!(" sub query iteration {i}"); - let params = (series.to_i64(), msp.ms() as i64, r1 as i64, r2 as i64); - let z = scy.execute_paged(&qu_delete_val, params, pst).await?; - pst = z.paging_state.clone(); - if z.rows_num().is_ok() { - for (i, x) in z.rows_typed::<(i64,)>()?.enumerate() { - let (lsp,) = x?; - if false && i < 4 { - debug_cql!(" lsp {lsp}"); - } - } - } - if pst.is_none() { - // debug_cql!(" last page"); - break; - } - i += 1; - if false { - if i > 20 { - debug_cql!(" loop limit"); - break; - } - } + let params = (series.to_i64(), msp.ms() as i64, r1 as i64, r2 as i64); + let mut it = scy + .execute_iter(qu_delete_val.clone(), params) + .await? + .into_typed::<(i64,)>(); + while let Some(x) = it.next().await { + let (lsp,) = x?; } Ok(()) } diff --git a/scywr/Cargo.toml b/scywr/Cargo.toml index f27dc70..272b071 100644 --- a/scywr/Cargo.toml +++ b/scywr/Cargo.toml @@ -11,7 +11,7 @@ scylla = "0.14.0" smallvec = "1.11.0" pin-project = "1.1.5" stackfuture = "0.3.0" -bytes = "1.6.0" +bytes = "1.7.1" serde = { version = "1", features = ["derive"] } log = { path = "../log" } stats = { path = "../stats" } diff --git a/scywr/src/err.rs b/scywr/src/err.rs index 9de1e5f..d37d45b 100644 --- a/scywr/src/err.rs +++ b/scywr/src/err.rs @@ -40,6 +40,7 @@ impl IntoSimplerError for QueryError { QueryError::UnableToAllocStreamId => Error::DbError(e.to_string()), QueryError::RequestTimeout(e) => Error::DbError(e.to_string()), QueryError::TranslationError(e) => Error::DbError(e.to_string()), + QueryError::CqlResponseParseError(e) => Error::DbError(e.to_string()), } } } diff --git a/scywr/src/futinsert.rs b/scywr/src/futinsert.rs index 87cf87b..3328fbc 100644 --- a/scywr/src/futinsert.rs +++ b/scywr/src/futinsert.rs @@ -27,7 +27,7 @@ impl<'a> ScyInsertFut<'a> { where V: ValueList + SerializeRow + Send + 'static, { - let fut = scy.execute(query, values); + let fut = scy.execute_unpaged(query, values); let fut = Box::pin(fut) as _; let tsnow = Instant::now(); Self { diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index a43ae0a..576a071 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -17,7 +17,7 @@ use scylla::frame::value::Value; use scylla::frame::value::ValueList; use scylla::prepared_statement::PreparedStatement; use scylla::serialize::row::SerializeRow; -use scylla::serialize::value::SerializeCql; +use scylla::serialize::value::SerializeValue; use scylla::transport::errors::DbError; use scylla::transport::errors::QueryError; use scylla::QueryResult; @@ -546,7 +546,7 @@ struct InsParCom { fn insert_scalar_gen_fut(par: InsParCom, val: ST, qu: Arc, scy: Arc) -> InsertFut where - ST: Value + SerializeCql + Send + 'static, + ST: Value + SerializeValue + Send + 'static, { let params = (par.series.to_i64(), par.ts_msp.to_i64(), par.ts_lsp.to_i64(), val); InsertFut::new(scy, qu, params, par.ts_net, par.stats) @@ -560,8 +560,8 @@ fn insert_scalar_enum_gen_fut( scy: Arc, ) -> InsertFut where - ST1: Value + SerializeCql + Send + 'static, - ST2: Value + SerializeCql + Send + 'static, + ST1: Value + SerializeValue + Send + 'static, + ST2: Value + SerializeValue + Send + 'static, { let params = ( par.series.to_i64(), @@ -573,7 +573,7 @@ where InsertFut::new(scy, qu, params, par.ts_net, par.stats) } -// val: Vec where ST: Value + SerializeCql + Send + 'static, +// val: Vec where ST: Value + SerializeValue + Send + 'static, fn insert_array_gen_fut(par: InsParCom, val: Vec, qu: Arc, scy: Arc) -> InsertFut { let params = (par.series.to_i64(), par.ts_msp.to_i64(), par.ts_lsp.to_i64(), val); InsertFut::new(scy, qu, params, par.ts_net, par.stats) @@ -601,7 +601,7 @@ impl InsertFut { ) -> Self { let scy_ref = unsafe { NonNull::from(scy.as_ref()).as_ref() }; let qu_ref = unsafe { NonNull::from(qu.as_ref()).as_ref() }; - let fut = scy_ref.execute_paged(qu_ref, params, None); + let fut = scy_ref.execute_unpaged(qu_ref, params); let fut = fut.map(move |x| { let dt = tsnet.elapsed(); let dt_ms = 1000 * dt.as_secs() as u32 + dt.subsec_millis(); diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index f3978a1..ed6144c 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -65,7 +65,7 @@ pub async fn has_table(name: &str, scy: &ScySession) -> Result { } pub async fn check_table_readable(name: &str, scy: &ScySession) -> Result { - match scy.query(format!("select * from {} limit 1", name), ()).await { + match scy.query_unpaged(format!("select * from {} limit 1", name), ()).await { Ok(_) => Ok(true), Err(e) => match &e { QueryError::DbError(e2, msg) => match e2 { @@ -198,7 +198,7 @@ impl GenTwcsTab { if !has_table(self.name(), scy).await? { let cql = self.cql(); info!("scylla create table {} {}", self.name(), cql); - scy.query(cql, ()).await?; + scy.query_unpaged(cql, ()).await?; } Ok(()) } @@ -327,7 +327,7 @@ impl GenTwcsTab { if set_opts.len() != 0 { let cql = format!(concat!("alter table {} with {}"), self.name(), set_opts.join(" and ")); info!("{cql}"); - scy.query(cql, ()).await?; + scy.query_unpaged(cql, ()).await?; } } } else { @@ -390,7 +390,7 @@ impl GenTwcsTab { async fn add_column(&self, name: &str, ty: &str, scy: &ScySession) -> Result<(), Error> { let cql = format!(concat!("alter table {} add {} {}"), self.name(), name, ty); debug!("NOTE add_column CQL {}", cql); - scy.query(cql, ()).await?; + scy.query_unpaged(cql, ()).await?; Ok(()) } } diff --git a/scywr/src/tools.rs b/scywr/src/tools.rs index 84a61ef..46b950d 100644 --- a/scywr/src/tools.rs +++ b/scywr/src/tools.rs @@ -1,5 +1,6 @@ use crate::config::ScyllaIngestConfig; use crate::session::create_session; +use futures_util::StreamExt; use log::*; use scylla::transport::errors::NewSessionError; use scylla::transport::errors::QueryError; @@ -38,17 +39,16 @@ pub async fn list_pkey(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> { let t2 = if t1 < i64::MAX - td { t1 + td } else { i64::MAX }; let pct = (t1 - i64::MIN) as u64 / (u64::MAX / 100000); info!("Token range {:.2}%", pct as f32 * 1e-3); - let qr = scy.execute(&query, (t1, t2)).await?; - if let Some(rows) = qr.rows { - for r in rows { - if r.columns.len() < 2 { - warn!("see {} columns", r.columns.len()); - } else { - let pulse_a_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap(); - let pulse_a = r.columns[1].as_ref().unwrap().as_bigint().unwrap(); - info!("pulse_a_token {pulse_a_token} pulse_a {pulse_a}"); - pulse_a_max = pulse_a_max.max(pulse_a); - } + let mut it = scy.execute_iter(query.clone(), (t1, t2)).await?; + while let Some(x) = it.next().await { + let r = x?; + if r.columns.len() < 2 { + warn!("see {} columns", r.columns.len()); + } else { + let pulse_a_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap(); + let pulse_a = r.columns[1].as_ref().unwrap().as_bigint().unwrap(); + info!("pulse_a_token {pulse_a_token} pulse_a {pulse_a}"); + pulse_a_max = pulse_a_max.max(pulse_a); } } if t2 == i64::MAX { @@ -75,18 +75,17 @@ pub async fn list_pulses(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> let t2 = if t1 < i64::MAX - td { t1 + td } else { i64::MAX }; let pct = (t1 - i64::MIN) as u64 / (u64::MAX / 100000); info!("Token range {:.2}%", pct as f32 * 1e-3); - let qr = scy.execute(&query, (t1, t2)).await?; - if let Some(rows) = qr.rows { - for r in rows { - if r.columns.len() < 2 { - warn!("see {} columns", r.columns.len()); - } else { - let tsa_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap(); - let tsa = r.columns[1].as_ref().unwrap().as_int().unwrap() as u32; - let tsb = r.columns[2].as_ref().unwrap().as_int().unwrap() as u32; - let pulse = r.columns[3].as_ref().unwrap().as_bigint().unwrap() as u64; - info!("tsa_token {tsa_token:21} tsa {tsa:12} tsb {tsb:12} pulse {pulse:21}"); - } + let mut it = scy.execute_iter(query.clone(), (t1, t2)).await?; + while let Some(x) = it.next().await { + let r = x?; + if r.columns.len() < 2 { + warn!("see {} columns", r.columns.len()); + } else { + let tsa_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap(); + let tsa = r.columns[1].as_ref().unwrap().as_int().unwrap() as u32; + let tsb = r.columns[2].as_ref().unwrap().as_int().unwrap() as u32; + let pulse = r.columns[3].as_ref().unwrap().as_bigint().unwrap() as u64; + info!("tsa_token {tsa_token:21} tsa {tsa:12} tsb {tsb:12} pulse {pulse:21}"); } } if t2 == i64::MAX { @@ -110,25 +109,25 @@ pub async fn fetch_events(backend: &str, channel: &str, scylla_conf: &ScyllaInge "select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?", ) .await?; - let qres = scy.execute(&qu_series, (backend, channel)).await?; - if let Some(rows) = qres.rows { - info!("Found {} matching series", rows.len()); - for r in &rows { - info!("Got row: {r:?}"); - if false { - if r.columns.len() < 2 { - warn!("see {} columns", r.columns.len()); - } else { - let tsa_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap(); - let tsa = r.columns[1].as_ref().unwrap().as_int().unwrap() as u32; - let tsb = r.columns[2].as_ref().unwrap().as_int().unwrap() as u32; - let pulse = r.columns[3].as_ref().unwrap().as_bigint().unwrap() as u64; - info!("tsa_token {tsa_token:21} tsa {tsa:12} tsb {tsb:12} pulse {pulse:21}"); - } + let mut rowcnt = 0; + let mut it = scy.execute_iter(qu_series.clone(), (backend, channel)).await?; + while let Some(x) = it.next().await { + let r = x?; + info!("Got row: {r:?}"); + if false { + if r.columns.len() < 2 { + warn!("see {} columns", r.columns.len()); + } else { + let tsa_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap(); + let tsa = r.columns[1].as_ref().unwrap().as_int().unwrap() as u32; + let tsb = r.columns[2].as_ref().unwrap().as_int().unwrap() as u32; + let pulse = r.columns[3].as_ref().unwrap().as_bigint().unwrap() as u64; + info!("tsa_token {tsa_token:21} tsa {tsa:12} tsb {tsb:12} pulse {pulse:21}"); } } - let _row = rows.into_iter().next().unwrap(); - } else { + rowcnt += 1; + } + if rowcnt == 0 { warn!("No result from series lookup"); } Ok(())