Update scylla driver

Dominik Werder
2024-09-30 09:50:31 +02:00
parent ca8ff71238
commit 8c426dd22e
8 changed files with 80 additions and 112 deletions

View File

@@ -94,18 +94,12 @@ async fn remove_older_series(
let row = e?;
let ts_msp = row.0;
debug!("remove ts_msp {}", ts_msp);
let res = scy.execute(&qu_delete, (series as i64, ts_msp)).await?;
{
// informative
if let Some(rows) = res.rows {
debug!("rows returned {}", rows.len());
for row in rows {
debug!("{:?}", row.columns);
}
} else {
// debug!("delete no rows returned");
}
let mut it = scy.execute_iter(qu_delete.clone(), (series as i64, ts_msp)).await?;
let mut j = 0;
while let Some(_) = it.next().await {
j += 1;
}
debug!("rows returned {}", j);
}
Ok(())
}
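The hunk above replaces `execute` plus `QueryResult::rows` with `execute_iter`, which returns a row stream driven via `futures_util::StreamExt`. A minimal sketch of that pattern under scylla 0.14, assuming a prepared delete statement; this is illustrative, not code from the repository:

```rust
use futures_util::StreamExt; // for `.next()` on the row iterator
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::QueryError;
use scylla::Session;

// Drain the row stream returned by `execute_iter` and count the rows,
// mirroring the replacement of `execute` + `res.rows` above.
// `execute_iter` takes the statement by value, hence the clone.
async fn count_rows(
    scy: &Session,
    qu_delete: &PreparedStatement,
    series: i64,
    ts_msp: i64,
) -> Result<usize, QueryError> {
    let mut it = scy.execute_iter(qu_delete.clone(), (series, ts_msp)).await?;
    let mut n = 0;
    while let Some(row) = it.next().await {
        row?; // each item is a Result<Row, QueryError>
        n += 1;
    }
    Ok(n)
}
```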
@@ -279,7 +273,9 @@ async fn remove_older_all_series_msps(
stream::iter(msps.clone())
.map(|msp| async move {
let stmt = stmt.clone();
scy.execute(&stmt, (series.to_i64(), msp as i64)).await
// scy.execute_iter(&stmt, (series.to_i64(), msp as i64)).await
todo!();
Ok::<_, Error>(0i32)
})
.buffer_unordered(32)
.take_while(|x| {
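This hunk comments out the old `execute` call and leaves a `todo!()` placeholder inside the concurrent `buffer_unordered` pipeline. One way the iterator-based call could slot into that pipeline; names, signature, and error type are illustrative assumptions, not taken from the commit:

```rust
use std::sync::Arc;

use futures_util::stream;
use futures_util::StreamExt;
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::QueryError;
use scylla::Session;

// Sketch: run the per-msp delete through `execute_iter`, drain each
// iterator, and keep the `buffer_unordered(32)` concurrency of the
// surrounding code.
async fn delete_all_msps(
    scy: Arc<Session>,
    stmt: PreparedStatement,
    series: i64,
    msps: Vec<u64>,
) -> Result<(), QueryError> {
    let mut results = stream::iter(msps)
        .map(|msp| {
            let scy = scy.clone();
            let stmt = stmt.clone();
            async move {
                let mut it = scy.execute_iter(stmt, (series, msp as i64)).await?;
                while let Some(row) = it.next().await {
                    row?; // surface per-row errors
                }
                Ok::<_, QueryError>(())
            }
        })
        .buffer_unordered(32);
    while let Some(res) = results.next().await {
        res?;
    }
    Ok(())
}
```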

View File

@@ -10,6 +10,7 @@ use chrono::Utc;
use core::fmt;
use err::thiserror;
use err::ThisError;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::ttl::RetentionTime;
use netpod::ScalarType;
@@ -58,6 +59,7 @@ pub enum Error {
ScyllaQuery(#[from] scylla::transport::errors::QueryError),
ScyllaRowType(#[from] scylla::transport::query_result::RowsExpectedError),
ScyllaRowError(#[from] scylla::cql_to_rust::FromRowError),
ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),
InvalidTimestamp,
}
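The new `ScyllaNextRow` variant exists because the typed row stream from `execute_iter(..).into_typed::<T>()` yields `Result<T, NextRowError>`, so the `x?` in the loops below needs a `From<NextRowError>` conversion. A minimal sketch of the same shape, using the `thiserror` crate directly rather than the project's `err` re-exports:

```rust
// The typed iterator from `execute_iter(..).into_typed::<T>()` yields
// Result<T, NextRowError>, so the error enum needs a From impl for it.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error("scylla query error: {0}")]
    ScyllaQuery(#[from] scylla::transport::errors::QueryError),
    #[error("scylla next row error: {0}")]
    ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),
    #[error("invalid timestamp")]
    InvalidTimestamp,
}
```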
@@ -150,30 +152,19 @@ async fn delete_try(
);
scy.prepare(scylla::query::Query::new(cql).with_page_size(100)).await?
};
let mut pst = None;
let mut i = 0;
loop {
// debug_cql!("query iteration {i}");
let z = scy.execute_paged(&qu, (series.to_i64(),), pst).await?;
pst = z.paging_state.clone();
for x in z.rows_typed::<(i64,)>()? {
let (msp,) = x?;
let msp = TsMs::from_ms_u64(msp as _);
let msp_ns = msp.ns_u64();
delete_val(series.clone(), msp, beg, end, &qu_delete_val, &scy).await?;
}
if pst.is_none() {
debug_cql!("last page");
break;
}
i += 1;
if false {
if i > 20 {
debug_cql!("loop limit");
break;
}
}
// debug_cql!("query iteration {i}");
let mut it = scy
.execute_iter(qu.clone(), (series.to_i64(),))
.await?
.into_typed::<(i64,)>();
while let Some(x) = it.next().await {
let (msp,) = x?;
let msp = TsMs::from_ms_u64(msp as _);
let msp_ns = msp.ns_u64();
delete_val(series.clone(), msp, beg, end, &qu_delete_val, &scy).await?;
}
i += 1;
Ok(Json(serde_json::Value::Null))
}
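With `execute_iter`, the paging-state bookkeeping removed above is handled by the driver: the page size set on the prepared statement still applies, and the iterator fetches further pages on demand. A sketch of preparing and draining such a paged query; the table and column names are placeholders, not this repository's schema:

```rust
use futures_util::StreamExt;
use scylla::query::Query;
use scylla::Session;

// Prepare a query with an explicit page size and let `execute_iter` fetch
// the pages on demand, replacing the manual `execute_paged`/`paging_state` loop.
async fn msps_for_series(scy: &Session, series: i64) -> Result<Vec<i64>, Box<dyn std::error::Error>> {
    let qu = scy
        .prepare(Query::new("select ts_msp from events_msp where series = ?").with_page_size(100))
        .await?;
    let mut it = scy.execute_iter(qu, (series,)).await?.into_typed::<(i64,)>();
    let mut out = Vec::new();
    while let Some(row) = it.next().await {
        let (msp,) = row?; // NextRowError covers both transport and conversion errors
        out.push(msp);
    }
    Ok(out)
}
```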
@@ -196,32 +187,13 @@ async fn delete_val(
let o1 = DateTime::from_timestamp_millis((msp.ms() + r1 / 1000000) as i64).unwrap();
let o2 = DateTime::from_timestamp_millis((msp.ms() + r2 / 1000000) as i64).unwrap();
debug_cql!(" sub query {o0:?} {o1:?} {o2:?}");
let mut pst = None;
let mut i = 0;
loop {
// debug_cql!(" sub query iteration {i}");
let params = (series.to_i64(), msp.ms() as i64, r1 as i64, r2 as i64);
let z = scy.execute_paged(&qu_delete_val, params, pst).await?;
pst = z.paging_state.clone();
if z.rows_num().is_ok() {
for (i, x) in z.rows_typed::<(i64,)>()?.enumerate() {
let (lsp,) = x?;
if false && i < 4 {
debug_cql!(" lsp {lsp}");
}
}
}
if pst.is_none() {
// debug_cql!(" last page");
break;
}
i += 1;
if false {
if i > 20 {
debug_cql!(" loop limit");
break;
}
}
let params = (series.to_i64(), msp.ms() as i64, r1 as i64, r2 as i64);
let mut it = scy
.execute_iter(qu_delete_val.clone(), params)
.await?
.into_typed::<(i64,)>();
while let Some(x) = it.next().await {
let (lsp,) = x?;
}
Ok(())
}

View File

@@ -11,7 +11,7 @@ scylla = "0.14.0"
smallvec = "1.11.0"
pin-project = "1.1.5"
stackfuture = "0.3.0"
bytes = "1.6.0"
bytes = "1.7.1"
serde = { version = "1", features = ["derive"] }
log = { path = "../log" }
stats = { path = "../stats" }

View File

@@ -40,6 +40,7 @@ impl IntoSimplerError for QueryError {
QueryError::UnableToAllocStreamId => Error::DbError(e.to_string()),
QueryError::RequestTimeout(e) => Error::DbError(e.to_string()),
QueryError::TranslationError(e) => Error::DbError(e.to_string()),
QueryError::CqlResponseParseError(e) => Error::DbError(e.to_string()),
}
}
}
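`QueryError` tends to gain variants across driver releases (the new `CqlResponseParseError` arm above is one such case). A catch-all arm is an alternative way to keep this kind of mapping compiling across upgrades; a hedged sketch with a simplified local error type, not the crate's actual `IntoSimplerError` impl:

```rust
use scylla::transport::errors::QueryError;

#[derive(Debug)]
pub enum Error {
    DbError(String),
}

// Keep the structured DbError case and collapse everything else to a string,
// so QueryError variants added by future scylla releases do not break the build.
fn into_simpler(e: QueryError) -> Error {
    match e {
        QueryError::DbError(db, msg) => Error::DbError(format!("{:?}: {}", db, msg)),
        other => Error::DbError(other.to_string()),
    }
}
```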

View File

@@ -27,7 +27,7 @@ impl<'a> ScyInsertFut<'a> {
where
V: ValueList + SerializeRow + Send + 'static,
{
let fut = scy.execute(query, values);
let fut = scy.execute_unpaged(query, values);
let fut = Box::pin(fut) as _;
let tsnow = Instant::now();
Self {

View File

@@ -17,7 +17,7 @@ use scylla::frame::value::Value;
use scylla::frame::value::ValueList;
use scylla::prepared_statement::PreparedStatement;
use scylla::serialize::row::SerializeRow;
use scylla::serialize::value::SerializeCql;
use scylla::serialize::value::SerializeValue;
use scylla::transport::errors::DbError;
use scylla::transport::errors::QueryError;
use scylla::QueryResult;
@@ -546,7 +546,7 @@ struct InsParCom {
fn insert_scalar_gen_fut<ST>(par: InsParCom, val: ST, qu: Arc<PreparedStatement>, scy: Arc<ScySession>) -> InsertFut
where
ST: Value + SerializeCql + Send + 'static,
ST: Value + SerializeValue + Send + 'static,
{
let params = (par.series.to_i64(), par.ts_msp.to_i64(), par.ts_lsp.to_i64(), val);
InsertFut::new(scy, qu, params, par.ts_net, par.stats)
@@ -560,8 +560,8 @@ fn insert_scalar_enum_gen_fut<ST1, ST2>(
scy: Arc<ScySession>,
) -> InsertFut
where
ST1: Value + SerializeCql + Send + 'static,
ST2: Value + SerializeCql + Send + 'static,
ST1: Value + SerializeValue + Send + 'static,
ST2: Value + SerializeValue + Send + 'static,
{
let params = (
par.series.to_i64(),
@@ -573,7 +573,7 @@ where
InsertFut::new(scy, qu, params, par.ts_net, par.stats)
}
// val: Vec<ST> where ST: Value + SerializeCql + Send + 'static,
// val: Vec<ST> where ST: Value + SerializeValue + Send + 'static,
fn insert_array_gen_fut(par: InsParCom, val: Vec<u8>, qu: Arc<PreparedStatement>, scy: Arc<ScySession>) -> InsertFut {
let params = (par.series.to_i64(), par.ts_msp.to_i64(), par.ts_lsp.to_i64(), val);
InsertFut::new(scy, qu, params, par.ts_net, par.stats)
@@ -601,7 +601,7 @@ impl InsertFut {
) -> Self {
let scy_ref = unsafe { NonNull::from(scy.as_ref()).as_ref() };
let qu_ref = unsafe { NonNull::from(qu.as_ref()).as_ref() };
let fut = scy_ref.execute_paged(qu_ref, params, None);
let fut = scy_ref.execute_unpaged(qu_ref, params);
let fut = fut.map(move |x| {
let dt = tsnet.elapsed();
let dt_ms = 1000 * dt.as_secs() as u32 + dt.subsec_millis();
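Single-shot writes move from `execute`/`execute_paged(.., None)` to `execute_unpaged`, and the value bound is the renamed `SerializeValue` trait (formerly `SerializeCql`). A self-contained sketch of the generic scalar insert, omitting the legacy `Value` bound and the repository's statement/stats plumbing:

```rust
use scylla::prepared_statement::PreparedStatement;
use scylla::serialize::value::SerializeValue;
use scylla::transport::errors::QueryError;
use scylla::Session;

// Generic scalar insert: the value type only needs the renamed
// `SerializeValue` bound, and the single write goes through `execute_unpaged`.
async fn insert_scalar<ST>(
    scy: &Session,
    qu: &PreparedStatement,
    series: i64,
    ts_msp: i64,
    ts_lsp: i64,
    val: ST,
) -> Result<(), QueryError>
where
    ST: SerializeValue + Send + 'static,
{
    scy.execute_unpaged(qu, (series, ts_msp, ts_lsp, val)).await?;
    Ok(())
}
```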

View File

@@ -65,7 +65,7 @@ pub async fn has_table(name: &str, scy: &ScySession) -> Result<bool, Error> {
}
pub async fn check_table_readable(name: &str, scy: &ScySession) -> Result<bool, Error> {
match scy.query(format!("select * from {} limit 1", name), ()).await {
match scy.query_unpaged(format!("select * from {} limit 1", name), ()).await {
Ok(_) => Ok(true),
Err(e) => match &e {
QueryError::DbError(e2, msg) => match e2 {
@@ -198,7 +198,7 @@ impl GenTwcsTab {
if !has_table(self.name(), scy).await? {
let cql = self.cql();
info!("scylla create table {} {}", self.name(), cql);
scy.query(cql, ()).await?;
scy.query_unpaged(cql, ()).await?;
}
Ok(())
}
@@ -327,7 +327,7 @@ impl GenTwcsTab {
if set_opts.len() != 0 {
let cql = format!(concat!("alter table {} with {}"), self.name(), set_opts.join(" and "));
info!("{cql}");
scy.query(cql, ()).await?;
scy.query_unpaged(cql, ()).await?;
}
}
} else {
@@ -390,7 +390,7 @@ impl GenTwcsTab {
async fn add_column(&self, name: &str, ty: &str, scy: &ScySession) -> Result<(), Error> {
let cql = format!(concat!("alter table {} add {} {}"), self.name(), name, ty);
debug!("NOTE add_column CQL {}", cql);
scy.query(cql, ()).await?;
scy.query_unpaged(cql, ()).await?;
Ok(())
}
}
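One-shot schema statements and probes now go through `query_unpaged`. A sketch of the table-readability probe above; the `DbError::Invalid` arm is an assumption, since the diff context does not show which `DbError` variant the repository actually handles:

```rust
use scylla::transport::errors::{DbError, QueryError};
use scylla::Session;

// One-shot probe through `query_unpaged`; an "invalid" response (e.g. the
// table does not exist) maps to Ok(false), anything else propagates.
async fn check_table_readable(name: &str, scy: &Session) -> Result<bool, QueryError> {
    match scy.query_unpaged(format!("select * from {} limit 1", name), ()).await {
        Ok(_) => Ok(true),
        Err(QueryError::DbError(DbError::Invalid, _)) => Ok(false),
        Err(e) => Err(e),
    }
}
```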

View File

@@ -1,5 +1,6 @@
use crate::config::ScyllaIngestConfig;
use crate::session::create_session;
use futures_util::StreamExt;
use log::*;
use scylla::transport::errors::NewSessionError;
use scylla::transport::errors::QueryError;
@@ -38,17 +39,16 @@ pub async fn list_pkey(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> {
let t2 = if t1 < i64::MAX - td { t1 + td } else { i64::MAX };
let pct = (t1 - i64::MIN) as u64 / (u64::MAX / 100000);
info!("Token range {:.2}%", pct as f32 * 1e-3);
let qr = scy.execute(&query, (t1, t2)).await?;
if let Some(rows) = qr.rows {
for r in rows {
if r.columns.len() < 2 {
warn!("see {} columns", r.columns.len());
} else {
let pulse_a_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap();
let pulse_a = r.columns[1].as_ref().unwrap().as_bigint().unwrap();
info!("pulse_a_token {pulse_a_token} pulse_a {pulse_a}");
pulse_a_max = pulse_a_max.max(pulse_a);
}
let mut it = scy.execute_iter(query.clone(), (t1, t2)).await?;
while let Some(x) = it.next().await {
let r = x?;
if r.columns.len() < 2 {
warn!("see {} columns", r.columns.len());
} else {
let pulse_a_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap();
let pulse_a = r.columns[1].as_ref().unwrap().as_bigint().unwrap();
info!("pulse_a_token {pulse_a_token} pulse_a {pulse_a}");
pulse_a_max = pulse_a_max.max(pulse_a);
}
}
if t2 == i64::MAX {
@@ -75,18 +75,17 @@ pub async fn list_pulses(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error>
let t2 = if t1 < i64::MAX - td { t1 + td } else { i64::MAX };
let pct = (t1 - i64::MIN) as u64 / (u64::MAX / 100000);
info!("Token range {:.2}%", pct as f32 * 1e-3);
let qr = scy.execute(&query, (t1, t2)).await?;
if let Some(rows) = qr.rows {
for r in rows {
if r.columns.len() < 2 {
warn!("see {} columns", r.columns.len());
} else {
let tsa_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap();
let tsa = r.columns[1].as_ref().unwrap().as_int().unwrap() as u32;
let tsb = r.columns[2].as_ref().unwrap().as_int().unwrap() as u32;
let pulse = r.columns[3].as_ref().unwrap().as_bigint().unwrap() as u64;
info!("tsa_token {tsa_token:21} tsa {tsa:12} tsb {tsb:12} pulse {pulse:21}");
}
let mut it = scy.execute_iter(query.clone(), (t1, t2)).await?;
while let Some(x) = it.next().await {
let r = x?;
if r.columns.len() < 2 {
warn!("see {} columns", r.columns.len());
} else {
let tsa_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap();
let tsa = r.columns[1].as_ref().unwrap().as_int().unwrap() as u32;
let tsb = r.columns[2].as_ref().unwrap().as_int().unwrap() as u32;
let pulse = r.columns[3].as_ref().unwrap().as_bigint().unwrap() as u64;
info!("tsa_token {tsa_token:21} tsa {tsa:12} tsb {tsb:12} pulse {pulse:21}");
}
}
if t2 == i64::MAX {
@@ -110,25 +109,25 @@ pub async fn fetch_events(backend: &str, channel: &str, scylla_conf: &ScyllaInge
"select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?",
)
.await?;
let qres = scy.execute(&qu_series, (backend, channel)).await?;
if let Some(rows) = qres.rows {
info!("Found {} matching series", rows.len());
for r in &rows {
info!("Got row: {r:?}");
if false {
if r.columns.len() < 2 {
warn!("see {} columns", r.columns.len());
} else {
let tsa_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap();
let tsa = r.columns[1].as_ref().unwrap().as_int().unwrap() as u32;
let tsb = r.columns[2].as_ref().unwrap().as_int().unwrap() as u32;
let pulse = r.columns[3].as_ref().unwrap().as_bigint().unwrap() as u64;
info!("tsa_token {tsa_token:21} tsa {tsa:12} tsb {tsb:12} pulse {pulse:21}");
}
let mut rowcnt = 0;
let mut it = scy.execute_iter(qu_series.clone(), (backend, channel)).await?;
while let Some(x) = it.next().await {
let r = x?;
info!("Got row: {r:?}");
if false {
if r.columns.len() < 2 {
warn!("see {} columns", r.columns.len());
} else {
let tsa_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap();
let tsa = r.columns[1].as_ref().unwrap().as_int().unwrap() as u32;
let tsb = r.columns[2].as_ref().unwrap().as_int().unwrap() as u32;
let pulse = r.columns[3].as_ref().unwrap().as_bigint().unwrap() as u64;
info!("tsa_token {tsa_token:21} tsa {tsa:12} tsb {tsb:12} pulse {pulse:21}");
}
}
let _row = rows.into_iter().next().unwrap();
} else {
rowcnt += 1;
}
if rowcnt == 0 {
warn!("No result from series lookup");
}
Ok(())
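
The listing code above keeps the untyped `row.columns[i] ... as_bigint()` access on the streamed rows. A typed alternative over the same stream, with the column types assumed from those reads (bigint token, int tsa/tsb, bigint pulse); statement and names are placeholders:

```rust
use futures_util::StreamExt;
use scylla::prepared_statement::PreparedStatement;
use scylla::Session;

// Same token-range scan, but letting the driver convert each row into a
// typed tuple instead of indexing `row.columns` and calling
// `as_bigint()`/`as_int()` by hand.
async fn scan_pulses(
    scy: &Session,
    query: &PreparedStatement,
    t1: i64,
    t2: i64,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut it = scy
        .execute_iter(query.clone(), (t1, t2))
        .await?
        .into_typed::<(i64, i32, i32, i64)>();
    while let Some(row) = it.next().await {
        let (tsa_token, tsa, tsb, pulse) = row?;
        println!("tsa_token {tsa_token} tsa {tsa} tsb {tsb} pulse {pulse}");
    }
    Ok(())
}
```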