Update dependencies

This commit is contained in:
Dominik Werder
2024-12-04 17:57:13 +01:00
parent 63770d4903
commit b38e87833d
12 changed files with 788 additions and 608 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -11,6 +11,7 @@ use err::ThisError;
use futures_util::future;
use futures_util::stream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use log::*;
use netpod::ttl::RetentionTime;
use netpod::Database;
@@ -36,6 +37,7 @@ pub enum Error {
ScyllaQuery(#[from] QueryError),
ScyllaNextRowError(#[from] NextRowError),
ScyllaSchema(#[from] scywr::schema::Error),
ScyllaTypeCheck(#[from] scywr::scylla::deserialize::TypeCheckError),
ParseError(String),
InvalidValue,
}
@@ -89,14 +91,15 @@ async fn remove_older_series(
)
.await?;
type RowType = (i64,);
let mut it = it.into_typed::<RowType>();
while let Some(e) = it.next().await {
let row = e?;
let ts_msp = row.0;
let mut it = it.rows_stream::<RowType>()?;
while let Some((ts_msp,)) = it.try_next().await? {
debug!("remove ts_msp {}", ts_msp);
let mut it = scy.execute_iter(qu_delete.clone(), (series as i64, ts_msp)).await?;
let mut it = scy
.execute_iter(qu_delete.clone(), (series as i64, ts_msp))
.await?
.rows_stream::<()>()?;
let mut j = 0;
while let Some(_) = it.next().await {
while let Some(_) = it.try_next().await? {
j += 1;
}
debug!("rows returned {}", j);
@@ -182,12 +185,11 @@ pub async fn remove_older_all_rt(ts_cut: TsMs, ks: &str, rt: RetentionTime, scy:
let stmts = Stmts::new(ks, rt.clone(), &scy).await?;
type RowType = (i64,);
let it = scy.execute_iter(stmts.qu_select_series.as_ref().clone(), ()).await?;
let mut it = it.into_typed::<RowType>();
let mut it = it.rows_stream::<RowType>()?;
let mut series_ids = Vec::with_capacity(1000000);
let print_dt = Duration::from_millis(2000);
let mut print_next = Instant::now() + print_dt;
while let Some(e) = it.next().await {
let row = e?;
while let Some(row) = it.try_next().await? {
let series = SeriesId::new(row.0 as u64);
series_ids.push(series);
let tsnow = Instant::now();
@@ -219,14 +221,13 @@ async fn remove_older_all_series(ts_cut: TsMs, series: SeriesId, stmts: &Stmts,
let mut it = scy
.execute_iter(stmts.qu_select_msp.clone(), (series.to_i64(),))
.await?
.into_typed::<RowType>();
.rows_stream::<RowType>()?;
let mut msp_last = 0;
let mut to_remove = Vec::new();
let mut n_keep = 0;
let mut n_remove = 0;
let ts2 = Instant::now();
while let Some(e) = it.next().await {
let row = e?;
while let Some(row) = it.try_next().await? {
let msp = row.0 as u64;
if msp < msp_last {
panic!("msp ordering error {:?}", series);
@@ -319,10 +320,9 @@ pub async fn find_older_msp(
let mut it = scy
.execute_iter(qu.clone(), (trbeg, trend))
.await?
.into_typed::<(i64, i64)>();
.rows_stream::<(i64, i64)>()?;
let mut c = 0;
while let Some(u) = it.next().await {
let row = u?;
while let Some(row) = it.try_next().await? {
let series = row.0 as u64;
let ts_msp = row.1 as u64;
if series == 9033627543553833740 {

View File

@@ -5,5 +5,5 @@ edition = "2021"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
[dependencies]
hashbrown = "0.14"
hashbrown = "0.15.2"
log = { path = "../log" }

View File

@@ -34,7 +34,7 @@ lazy_static = "1"
libc = "0.2"
slidebuf = "0.0.1"
dashmap = "6.0.1"
hashbrown = "0.14.3"
hashbrown = "0.15.2"
smallvec = "1.13.2"
thiserror = "=0.0.1"
log = { path = "../log" }

View File

@@ -11,6 +11,7 @@ use core::fmt;
use err::thiserror;
use err::ThisError;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use netpod::log::*;
use netpod::ttl::RetentionTime;
use netpod::ScalarType;
@@ -57,9 +58,9 @@ pub enum Error {
MissingEndDate,
ScyllaTransport(#[from] scylla::transport::errors::NewSessionError),
ScyllaQuery(#[from] scylla::transport::errors::QueryError),
ScyllaRowType(#[from] scylla::transport::query_result::RowsExpectedError),
ScyllaRowError(#[from] scylla::cql_to_rust::FromRowError),
ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),
ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError),
InvalidTimestamp,
}
@@ -157,9 +158,8 @@ async fn delete_try(
let mut it = scy
.execute_iter(qu.clone(), (series.to_i64(),))
.await?
.into_typed::<(i64,)>();
while let Some(x) = it.next().await {
let (msp,) = x?;
.rows_stream::<(i64,)>()?;
while let Some((msp,)) = it.try_next().await? {
let msp = TsMs::from_ms_u64(msp as _);
let msp_ns = msp.ns_u64();
delete_val(series.clone(), msp, beg, end, &qu_delete_val, &scy).await?;
@@ -191,10 +191,8 @@ async fn delete_val(
let mut it = scy
.execute_iter(qu_delete_val.clone(), params)
.await?
.into_typed::<(i64,)>();
while let Some(x) = it.next().await {
let (lsp,) = x?;
}
.rows_stream::<(i64,)>()?;
while let Some((lsp,)) = it.try_next().await? {}
Ok(())
}

View File

@@ -10,11 +10,8 @@ use err::thiserror;
use err::ThisError;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim0::EventsDim0NoPulse;
use items_2::eventsdim0enum::EventsDim0Enum;
use items_2::eventsdim1::EventsDim1;
use items_2::eventsdim1::EventsDim1NoPulse;
use items_2::binning::container_events::ContainerEvents;
use items_2::binning::container_events::EventValueType;
use netpod::log::*;
use netpod::ttl::RetentionTime;
use netpod::EnumVariant;
@@ -283,7 +280,7 @@ async fn post_v01_try(
}
ScalarType::STRING => {
evpush_dim0::<String, _>(&frame, deque, &mut writer, |x| {
DataValue::Scalar(ScalarValue::String(x))
DataValue::Scalar(ScalarValue::String(x.into()))
})?;
}
ScalarType::Enum => {
@@ -351,22 +348,20 @@ fn evpush_dim0<T, F1>(
f1: F1,
) -> Result<(), Error>
where
T: for<'a> Deserialize<'a> + fmt::Debug + Clone,
F1: Fn(T) -> DataValue,
T: EventValueType,
F1: Fn(<T as EventValueType>::IterTy1<'_>) -> DataValue,
{
let evs: EventsDim0NoPulse<T> = ciborium::de::from_reader(Cursor::new(frame))
let evs: ContainerEvents<T> = ciborium::de::from_reader(Cursor::new(frame))
.map_err(|e| {
error!("cbor decode error {e}");
})
.map_err(|_| Error::Decode)?;
let evs: EventsDim0<T> = evs.into();
// trace_input!("see events {:?}", evs);
let stnow = SystemTime::now();
let tsev = TsNano::from_system_time(stnow);
let tsnow = Instant::now();
let mut emit_state = WritableTypeState::new(writer.sid());
for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() {
let ts = TsNano::from_ns(ts);
for (i, (ts, val)) in evs.iter_zip().enumerate() {
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = f1(val);
@@ -381,7 +376,7 @@ fn evpush_dim0_enum(
deque: &mut VecDeque<QueryItem>,
writer: &mut ValueSeriesWriter,
) -> Result<(), Error> {
let evs: EventsDim0Enum = ciborium::de::from_reader(Cursor::new(frame))
let evs: ContainerEvents<EnumVariant> = ciborium::de::from_reader(Cursor::new(frame))
.map_err(|e| {
error!("cbor decode error {e}");
})
@@ -391,17 +386,10 @@ fn evpush_dim0_enum(
let tsev = TsNano::from_system_time(stnow);
let tsnow = Instant::now();
let mut emit_state = WritableTypeState::new(writer.sid());
for (i, ((&ts, val), vals)) in evs
.tss
.iter()
.zip(evs.values.iter())
.zip(evs.valuestrs.iter())
.enumerate()
{
let ts = TsNano::from_ns(ts);
for (i, (ts, val)) in evs.iter_zip().enumerate() {
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = DataValue::Scalar(ScalarValue::Enum(val as i16, vals.clone()));
let val = DataValue::Scalar(ScalarValue::Enum(val.ix as i16, val.name.into()));
writer.write(WritableType(ts, val), &mut emit_state, tsnow, tsev, deque)?;
}
Ok(())
@@ -414,23 +402,21 @@ fn evpush_dim1<T, F1>(
f1: F1,
) -> Result<(), Error>
where
T: for<'a> Deserialize<'a> + fmt::Debug + Clone,
F1: Fn(Vec<T>) -> DataValue,
Vec<T>: EventValueType,
F1: Fn(<Vec<T> as EventValueType>::IterTy1<'_>) -> DataValue,
{
let evs: EventsDim1NoPulse<T> = ciborium::de::from_reader(Cursor::new(frame))
let evs: ContainerEvents<Vec<T>> = ciborium::de::from_reader(Cursor::new(frame))
.map_err(|e| {
error!("cbor decode error {e}");
})
.map_err(|_| Error::Decode)?;
let evs: EventsDim1<T> = evs.into();
trace_input!("see events {:?}", evs);
warn!("TODO require timestamp in input format");
let stnow = SystemTime::now();
let tsev = TsNano::from_system_time(stnow);
let tsnow = Instant::now();
let mut emit_state = WritableTypeState::new(writer.sid());
for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() {
let ts = TsNano::from_ns(ts);
for (i, (ts, val)) in evs.iter_zip().enumerate() {
let val = val.clone();
trace_input!("ev {:6} {:20} {:20?}", i, ts, val);
let val = f1(val);

View File

@@ -7,7 +7,7 @@ edition = "2021"
[dependencies]
futures-util = "0.3.28"
async-channel = "2.3.1"
scylla = "0.14.0"
scylla = "0.15.0"
smallvec = "1.11.0"
pin-project = "1.1.5"
stackfuture = "0.3.0"

View File

@@ -21,7 +21,7 @@ pub trait IntoSimplerError {
impl IntoSimplerError for QueryError {
fn into_simpler(self) -> Error {
let e = self;
let e = &self;
match e {
QueryError::DbError(e, msg) => match e {
DbError::Unavailable { .. } => Error::DbUnavailable,
@@ -31,16 +31,8 @@ impl IntoSimplerError for QueryError {
DbError::WriteTimeout { .. } => Error::DbTimeout,
_ => Error::DbError(format!("{e} {msg}")),
},
QueryError::BadQuery(e) => Error::DbError(e.to_string()),
QueryError::IoError(e) => Error::DbError(e.to_string()),
QueryError::ProtocolError(e) => Error::DbError(e.to_string()),
QueryError::InvalidMessage(e) => Error::DbError(e.to_string()),
QueryError::TimeoutError => Error::DbTimeout,
QueryError::TooManyOrphanedStreamIds(e) => Error::DbError(e.to_string()),
QueryError::UnableToAllocStreamId => Error::DbError(e.to_string()),
QueryError::RequestTimeout(e) => Error::DbError(e.to_string()),
QueryError::TranslationError(e) => Error::DbError(e.to_string()),
QueryError::CqlResponseParseError(e) => Error::DbError(e.to_string()),
_ => Error::DbError(e.to_string()),
}
}
}

View File

@@ -663,7 +663,7 @@ impl InsertFut {
Self {
scy,
qu,
fut: Box::pin(async { Err(QueryError::InvalidMessage("no longer used".into())) }),
fut: Box::pin(async { Err(QueryError::TimeoutError) }),
}
}
}

View File

@@ -4,11 +4,10 @@ use crate::session::ScySession;
use err::thiserror;
use err::ThisError;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use log::*;
use netpod::ttl::RetentionTime;
use scylla::transport::errors::DbError;
use scylla::transport::errors::QueryError;
use scylla::transport::iterator::NextRowError;
use std::collections::BTreeMap;
use std::fmt;
use std::time::Duration;
@@ -18,9 +17,10 @@ use std::time::Duration;
pub enum Error {
NoKeyspaceChosen,
Fmt(#[from] fmt::Error),
Query(#[from] QueryError),
Query(#[from] scylla::transport::errors::QueryError),
NewSession(String),
ScyllaNextRow(#[from] NextRowError),
ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),
ScyllaTypecheck(#[from] scylla::deserialize::TypeCheckError),
MissingData,
AddColumnImpossible,
BadSchema,
@@ -87,13 +87,10 @@ impl Changeset {
pub async fn has_keyspace(name: &str, scy: &ScySession) -> Result<bool, Error> {
let cql = "select keyspace_name from system_schema.keyspaces where keyspace_name = ?";
let mut res = scy.query_iter(cql, (name,)).await?;
while let Some(k) = res.next().await {
let row = k?;
if let Some(table_name) = row.columns[0].as_ref().unwrap().as_text() {
if table_name == name {
return Ok(true);
}
let mut res = scy.query_iter(cql, (name,)).await?.rows_stream::<(String,)>()?;
while let Some((table_name,)) = res.try_next().await? {
if table_name == name {
return Ok(true);
}
}
Ok(false)
@@ -102,19 +99,17 @@ pub async fn has_keyspace(name: &str, scy: &ScySession) -> Result<bool, Error> {
pub async fn has_table(name: &str, scy: &ScySession) -> Result<bool, Error> {
let cql = "select table_name from system_schema.tables where keyspace_name = ?";
let ks = scy.get_keyspace().ok_or_else(|| Error::NoKeyspaceChosen)?;
let mut res = scy.query_iter(cql, (ks.as_ref(),)).await?;
while let Some(k) = res.next().await {
let row = k?;
if let Some(table_name) = row.columns[0].as_ref().unwrap().as_text() {
if table_name == name {
return Ok(true);
}
let mut res = scy.query_iter(cql, (ks.as_ref(),)).await?.rows_stream::<(String,)>()?;
while let Some((table_name,)) = res.try_next().await? {
if table_name == name {
return Ok(true);
}
}
Ok(false)
}
pub async fn check_table_readable(name: &str, scy: &ScySession) -> Result<bool, Error> {
use crate::scylla::transport::errors::QueryError;
match scy.query_unpaged(format!("select * from {} limit 1", name), ()).await {
Ok(_) => Ok(true),
Err(e) => match &e {
@@ -318,7 +313,8 @@ impl GenTwcsTab {
" from system_schema.tables where keyspace_name = ? and table_name = ?"
);
let x = scy.query_iter(cql, (self.keyspace(), self.name())).await?;
let mut it = x.into_typed::<(i32, i32, BTreeMap<String, String>)>();
let mut it = x.rows_stream::<(i32, i32, BTreeMap<String, String>)>()?;
// let mut it = x.into_typed::<(i32, i32, BTreeMap<String, String>)>();
let mut rows = Vec::new();
while let Some(u) = it.next().await {
let row = u?;
@@ -369,13 +365,12 @@ impl GenTwcsTab {
let mut it = scy
.query_iter(cql, (self.keyspace(), self.name()))
.await?
.into_typed::<(String, String)>();
.rows_stream::<(String, String)>()?;
let mut names_exist = Vec::new();
let mut types_exist = Vec::new();
while let Some(x) = it.next().await {
let row = x?;
names_exist.push(row.0);
types_exist.push(row.1);
while let Some((name, ty)) = it.try_next().await? {
names_exist.push(name);
types_exist.push(ty);
}
debug!("names_exist {:?} types_exist {:?}", names_exist, types_exist);
for (cn, ct) in self.col_names.iter().zip(self.col_types.iter()) {
@@ -433,13 +428,12 @@ async fn get_columns(keyspace: &str, table: &str, scy: &ScySession) -> Result<Ve
let mut it = scy
.query_iter(cql, (keyspace, table))
.await?
.into_typed::<(String, String, String, i32, String)>();
while let Some(x) = it.next().await {
let row = x?;
.rows_stream::<(String, String, String, i32, String)>()?;
while let Some((name, ..)) = it.try_next().await? {
// columns:
// column_name (text)
// type (text): text, blob, int, ...
ret.push(row.0);
ret.push(name);
}
Ok(ret)
}

View File

@@ -1,6 +1,6 @@
use crate::config::ScyllaIngestConfig;
use crate::session::create_session;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use log::*;
use scylla::transport::errors::NewSessionError;
use scylla::transport::errors::QueryError;
@@ -25,6 +25,12 @@ impl From<QueryError> for Error {
}
}
impl From<scylla::deserialize::TypeCheckError> for Error {
fn from(e: scylla::deserialize::TypeCheckError) -> Self {
Self(err::Error::with_msg_no_trace(format!("{e:?}")))
}
}
pub async fn list_pkey(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> {
let scy = create_session(scylla_conf)
.await
@@ -39,17 +45,13 @@ pub async fn list_pkey(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error> {
let t2 = if t1 < i64::MAX - td { t1 + td } else { i64::MAX };
let pct = (t1 - i64::MIN) as u64 / (u64::MAX / 100000);
info!("Token range {:.2}%", pct as f32 * 1e-3);
let mut it = scy.execute_iter(query.clone(), (t1, t2)).await?;
while let Some(x) = it.next().await {
let r = x?;
if r.columns.len() < 2 {
warn!("see {} columns", r.columns.len());
} else {
let pulse_a_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap();
let pulse_a = r.columns[1].as_ref().unwrap().as_bigint().unwrap();
info!("pulse_a_token {pulse_a_token} pulse_a {pulse_a}");
pulse_a_max = pulse_a_max.max(pulse_a);
}
let mut it = scy
.execute_iter(query.clone(), (t1, t2))
.await?
.rows_stream::<(i64, i64)>()?;
while let Some((pulse_a_token, pulse_a)) = it.try_next().await? {
info!("pulse_a_token {pulse_a_token} pulse_a {pulse_a}");
pulse_a_max = pulse_a_max.max(pulse_a);
}
if t2 == i64::MAX {
info!("end of token range");
@@ -75,18 +77,12 @@ pub async fn list_pulses(scylla_conf: &ScyllaIngestConfig) -> Result<(), Error>
let t2 = if t1 < i64::MAX - td { t1 + td } else { i64::MAX };
let pct = (t1 - i64::MIN) as u64 / (u64::MAX / 100000);
info!("Token range {:.2}%", pct as f32 * 1e-3);
let mut it = scy.execute_iter(query.clone(), (t1, t2)).await?;
while let Some(x) = it.next().await {
let r = x?;
if r.columns.len() < 2 {
warn!("see {} columns", r.columns.len());
} else {
let tsa_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap();
let tsa = r.columns[1].as_ref().unwrap().as_int().unwrap() as u32;
let tsb = r.columns[2].as_ref().unwrap().as_int().unwrap() as u32;
let pulse = r.columns[3].as_ref().unwrap().as_bigint().unwrap() as u64;
info!("tsa_token {tsa_token:21} tsa {tsa:12} tsb {tsb:12} pulse {pulse:21}");
}
let mut it = scy
.execute_iter(query.clone(), (t1, t2))
.await?
.rows_stream::<(i64, i32, i32, i64)>()?;
while let Some((tsa_token, tsa, tsb, pulse)) = it.try_next().await? {
info!("tsa_token {tsa_token:21} tsa {tsa:12} tsb {tsb:12} pulse {pulse:21}");
}
if t2 == i64::MAX {
info!("end of token range");
@@ -110,20 +106,13 @@ pub async fn fetch_events(backend: &str, channel: &str, scylla_conf: &ScyllaInge
)
.await?;
let mut rowcnt = 0;
let mut it = scy.execute_iter(qu_series.clone(), (backend, channel)).await?;
while let Some(x) = it.next().await {
let r = x?;
info!("Got row: {r:?}");
let mut it = scy
.execute_iter(qu_series.clone(), (backend, channel))
.await?
.rows_stream::<(i64, i32, i32, i64)>()?;
while let Some((tsa_token, tsa, tsb, pulse)) = it.try_next().await? {
if false {
if r.columns.len() < 2 {
warn!("see {} columns", r.columns.len());
} else {
let tsa_token = r.columns[0].as_ref().unwrap().as_bigint().unwrap();
let tsa = r.columns[1].as_ref().unwrap().as_int().unwrap() as u32;
let tsb = r.columns[2].as_ref().unwrap().as_int().unwrap() as u32;
let pulse = r.columns[3].as_ref().unwrap().as_bigint().unwrap() as u64;
info!("tsa_token {tsa_token:21} tsa {tsa:12} tsb {tsb:12} pulse {pulse:21}");
}
info!("tsa_token {tsa_token:21} tsa {tsa:12} tsb {tsb:12} pulse {pulse:21}");
}
rowcnt += 1;
}

View File

@@ -86,13 +86,13 @@ impl BinWriterGrid {
Ok(())
}
fn handle_output_ready(&mut self, out: ContainerBins<f32>, iqdqs: &mut InsertDeques) -> Result<(), Error> {
fn handle_output_ready(&mut self, out: ContainerBins<f32, f32>, iqdqs: &mut InsertDeques) -> Result<(), Error> {
let selfname = "handle_output_ready";
trace_tick!("{selfname} bins ready len {}", out.len());
for e in out.iter_debug() {
trace_tick_verbose!("{e:?}");
}
for (((((((&ts1, &ts2), &cnt), &min), &max), &avg), &lst), &fnl) in out.zip_iter() {
for (((((((&ts1, &ts2), &cnt), min), max), avg), lst), &fnl) in out.zip_iter() {
if fnl == false {
info!("non final bin");
} else if cnt == 0 {