Update scylla dependency

This commit is contained in:
Dominik Werder
2024-12-05 17:45:15 +01:00
parent 8aa922a6a1
commit e85728c1e3
14 changed files with 189 additions and 262 deletions

View File

@@ -9,4 +9,4 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
daqbuf-err = { path = "../../../daqbuf-err" }
taskrun = { path = "../taskrun" }
redis = { version = "0.26.1", features = [] }
redis = { version = "0.27.6", features = [] }

View File

@@ -20,14 +20,14 @@ futures-util = "0.3.14"
tracing = "0.1"
tracing-futures = "0.2"
async-channel = "1.9.0"
itertools = "0.11.0"
itertools = "0.13.0"
chrono = "0.4.23"
md-5 = "0.10.6"
regex = "1.10.2"
rand = "0.8.5"
ciborium = "0.2.1"
flate2 = "1"
brotli = "3.4.0"
brotli = "7.0.0"
daqbuf-err = { path = "../../../daqbuf-err" }
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
query = { path = "../../../daqbuf-query", package = "daqbuf-query" }

View File

@@ -1,9 +1,10 @@
use crate::response;
use crate::ServiceSharedResources;
use core::fmt;
use daqbuf_err::thiserror;
use dbconn::create_connection;
use dbconn::worker::PgQueue;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use http::Method;
use http::StatusCode;
use httpclient::body_empty;
@@ -40,164 +41,47 @@ use serde::Serialize;
use std::collections::BTreeMap;
use url::Url;
#[derive(Debug)]
#[derive(Debug, thiserror::Error)]
#[cstm(name = "ChannelConfigError")]
pub enum Error {
NotFound(SfDbChannel),
ConfigQuorum(nodenet::configquorum::Error),
ConfigNode(nodenet::channelconfig::Error),
Http(crate::Error),
HttpCrate(http::Error),
ConfigQuorum(#[from] nodenet::configquorum::Error),
ConfigNode(#[from] nodenet::channelconfig::Error),
Http(#[from] crate::Error),
HttpCrate(#[from] http::Error),
// TODO create dedicated error type for query parsing
BadQuery(daqbuf_err::Error),
BadQuery(#[from] daqbuf_err::Error),
MissingBackend,
MissingScalarType,
MissingShape,
MissingShapeKind,
MissingEdge,
MissingTimerange,
Uri(netpod::UriError),
Uri(#[from] netpod::UriError),
ChannelConfigQuery(daqbuf_err::Error),
ExpectScyllaBackend,
Pg(dbconn::pg::Error),
Pg(#[from] dbconn::pg::Error),
Scylla(String),
Join,
OtherErr(daqbuf_err::Error),
PgWorker(dbconn::worker::Error),
Async(netpod::AsyncChannelError),
ChannelConfig(dbconn::channelconfig::Error),
Netpod(netpod::NetpodError),
}
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let name = "HttpChannelConfigError";
write!(fmt, "{name}(")?;
match self {
Error::NotFound(chn) => write!(fmt, "NotFound({chn}")?,
Error::ConfigQuorum(e) => write!(fmt, "ConfigQuorum({e})")?,
Error::ConfigNode(e) => write!(fmt, "ConfigNode({e})")?,
Error::Http(e) => write!(fmt, "Http({e})")?,
Error::HttpCrate(e) => write!(fmt, "HttpCrate({e})")?,
Error::BadQuery(e) => write!(fmt, "BadQuery({e})")?,
Error::MissingBackend => write!(fmt, "MissingBackend")?,
Error::MissingScalarType => write!(fmt, "MissingScalarType")?,
Error::MissingShape => write!(fmt, "MissingShape")?,
Error::MissingShapeKind => write!(fmt, "MissingShapeKind")?,
Error::MissingEdge => write!(fmt, "MissingEdge")?,
Error::MissingTimerange => write!(fmt, "MissingTimerange")?,
Error::Uri(x) => write!(fmt, "Uri({x})")?,
Error::ChannelConfigQuery(e) => write!(fmt, "ChannelConfigQuery({e})")?,
Error::ExpectScyllaBackend => write!(fmt, "ExpectScyllaBackend")?,
Error::Pg(e) => write!(fmt, "Pg({e})")?,
Error::Scylla(e) => write!(fmt, "Scylla({e})")?,
Error::Join => write!(fmt, "Join")?,
Error::OtherErr(e) => write!(fmt, "OtherErr({e})")?,
Error::PgWorker(e) => write!(fmt, "PgWorker({e})")?,
Error::Async(e) => write!(fmt, "Async({e})")?,
Error::ChannelConfig(e) => write!(fmt, "ChannelConfig({e})")?,
Error::Netpod(e) => write!(fmt, "Netpod({e})")?,
}
write!(fmt, ")")?;
Ok(())
}
PgWorker(#[from] dbconn::worker::Error),
Async(#[from] netpod::AsyncChannelError),
ChannelConfig(#[from] dbconn::channelconfig::Error),
Netpod(#[from] netpod::NetpodError),
ScyllaQuery(#[from] scyllaconn::scylla::transport::errors::QueryError),
ScyllaTypeCheck(#[from] scyllaconn::scylla::deserialize::TypeCheckError),
}
fn other_err_error(e: daqbuf_err::Error) -> Error {
Error::OtherErr(e)
}
impl std::error::Error for Error {}
impl From<crate::Error> for Error {
fn from(e: crate::Error) -> Self {
Self::Http(e)
}
}
impl From<http::Error> for Error {
fn from(e: http::Error) -> Self {
Self::HttpCrate(e)
}
}
impl From<nodenet::configquorum::Error> for Error {
fn from(e: nodenet::configquorum::Error) -> Self {
use nodenet::configquorum::Error::*;
match e {
NotFound(a) => Self::NotFound(a),
_ => Self::ConfigQuorum(e),
}
}
}
impl From<nodenet::channelconfig::Error> for Error {
fn from(e: nodenet::channelconfig::Error) -> Self {
match e {
nodenet::channelconfig::Error::NotFoundChannel(a) => Self::NotFound(a),
_ => Self::ConfigNode(e),
}
}
}
impl From<netpod::UriError> for Error {
fn from(e: netpod::UriError) -> Self {
Self::Uri(e)
}
}
impl From<dbconn::pg::Error> for Error {
fn from(e: dbconn::pg::Error) -> Self {
Self::Pg(e)
}
}
impl From<dbconn::worker::Error> for Error {
fn from(e: dbconn::worker::Error) -> Self {
Self::PgWorker(e)
}
}
impl From<scyllaconn::scylla::cql_to_rust::FromRowError> for Error {
fn from(e: scyllaconn::scylla::cql_to_rust::FromRowError) -> Self {
Self::Scylla(e.to_string())
}
}
impl From<scyllaconn::scylla::transport::errors::QueryError> for Error {
fn from(e: scyllaconn::scylla::transport::errors::QueryError) -> Self {
Self::Scylla(e.to_string())
}
}
impl From<scyllaconn::scylla::transport::iterator::NextRowError> for Error {
fn from(e: scyllaconn::scylla::transport::iterator::NextRowError) -> Self {
Self::Scylla(e.to_string())
}
}
impl From<taskrun::tokio::task::JoinError> for Error {
fn from(_e: taskrun::tokio::task::JoinError) -> Self {
Self::Join
}
}
impl From<netpod::AsyncChannelError> for Error {
fn from(e: netpod::AsyncChannelError) -> Self {
Self::Async(e)
}
}
impl From<dbconn::channelconfig::Error> for Error {
fn from(e: dbconn::channelconfig::Error) -> Self {
Self::ChannelConfig(e)
}
}
impl From<netpod::NetpodError> for Error {
fn from(e: netpod::NetpodError) -> Self {
Self::Netpod(e)
}
}
impl From<Error> for crate::err::Error {
fn from(e: Error) -> Self {
Self::with_msg_no_trace(format!("{e} TODO add public message"))
@@ -623,11 +507,9 @@ impl ScyllaChannelsActive {
"select series from series_by_ts_msp where part = ? and ts_msp = ? and shape_kind = ? and scalar_type = ?",
(part as i32, tsedge as i32, q.shape_kind as i32, q.scalar_type.to_scylla_i32()),
)
.await.map_err(|e| Error::Scylla(e.to_string()))?;
while let Some(row) = res.next().await {
let row = row?;
let (series,): (i64,) = row.into_typed()?;
ret.push(series as u64);
.await?.rows_stream::<(i64,)>()?;
while let Some(row) = res.try_next().await? {
ret.push(row.0 as u64);
}
}
Ok(ret)
@@ -1005,17 +887,16 @@ impl GenerateScyllaTestData {
let series: u64 = 42001;
// TODO query `ts_msp` for all MSP values und use that to delete from event table first.
// Only later delete also from the `ts_msp` table.
let it = scy
let mut it = scy
.query_iter("select ts_msp from ts_msp where series = ?", (series as i64,))
.await?;
let mut it = it.into_typed::<(i64,)>();
while let Some(row) = it.next().await {
let row = row?;
.await?
.rows_stream::<(i64,)>()?;
while let Some(row) = it.try_next().await? {
let values = (series as i64, row.0);
scy.query("delete from events_scalar_f64 where series = ? and ts_msp = ?", values)
scy.query_unpaged("delete from events_scalar_f64 where series = ? and ts_msp = ?", values)
.await?;
}
scy.query("delete from ts_msp where series = ?", (series as i64,))
scy.query_unpaged("delete from ts_msp where series = ?", (series as i64,))
.await?;
// Generate
@@ -1023,7 +904,7 @@ impl GenerateScyllaTestData {
let mut last = 0;
for msp in msps.0.iter().map(|x| *x) {
if msp != last {
scy.query(
scy.query_unpaged(
"insert into ts_msp (series, ts_msp) values (?, ?)",
(series as i64, msp as i64),
)
@@ -1032,7 +913,7 @@ impl GenerateScyllaTestData {
last = msp;
}
for (((msp, lsp), pulse), val) in msps.0.into_iter().zip(lsps.0).zip(pulses.0).zip(vals.0) {
scy.query(
scy.query_unpaged(
"insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)",
(series as i64, msp as i64, lsp as i64, pulse as i64, val),
)

View File

@@ -12,6 +12,7 @@ use chrono::Utc;
use futures_util::stream::FuturesOrdered;
use futures_util::stream::FuturesUnordered;
use futures_util::FutureExt;
use futures_util::TryStreamExt;
use http::header;
use http::Method;
use http::StatusCode;
@@ -904,7 +905,7 @@ impl<T> ErrConv<T> for Result<T, scylla::transport::errors::QueryError> {
}
}
impl<T> ErrConv<T> for Result<T, scylla::transport::query_result::RowsExpectedError> {
impl<T> ErrConv<T> for Result<T, scylla::deserialize::TypeCheckError> {
fn err_conv(self) -> Result<T, daqbuf_err::Error> {
self.map_err(|e| daqbuf_err::Error::with_msg_no_trace(format!("{e:?}")))
}
@@ -940,23 +941,21 @@ impl MapPulseScyllaHandler {
let scy = scyllaconn::conn::create_scy_session(&scyconf).await?;
let pulse_a = (pulse >> 14) as i64;
let pulse_b = (pulse & 0x3fff) as i32;
let res = scy
.query(
let mut it = scy
.query_iter(
"select ts_a, ts_b from pulse where pulse_a = ? and pulse_b = ?",
(pulse_a, pulse_b),
)
.await
.err_conv()?
.rows_stream::<(i64, i32)>()
.err_conv()?;
let rows = res.rows().err_conv()?;
let ch = "pulsemaptable";
let mut tss = Vec::new();
let mut channels = Vec::new();
use scylla::frame::response::result::CqlValue;
let ts_a_def = CqlValue::BigInt(0);
let ts_b_def = CqlValue::Int(0);
for row in rows {
let ts_a = row.columns[0].as_ref().unwrap_or(&ts_a_def).as_bigint().unwrap_or(0) as u64;
let ts_b = row.columns[1].as_ref().unwrap_or(&ts_b_def).as_int().unwrap_or(0) as u32 as u64;
while let Some(row) = it.try_next().await.err_conv()? {
let ts_a = row.0 as u64;
let ts_b = row.1 as u64;
tss.push(ts_a * netpod::timeunits::SEC + ts_b);
channels.push(ch.into());
}

View File

@@ -11,7 +11,7 @@ path = "src/scyllaconn.rs"
futures-util = "0.3.24"
pin-project = "1"
async-channel = "2.3.1"
scylla = "0.13.0"
scylla = "0.15.0"
daqbuf-err = { path = "../../../daqbuf-err" }
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
query = { path = "../../../daqbuf-query", package = "daqbuf-query" }

View File

@@ -2,6 +2,7 @@ use daqbuf_err as err;
use err::thiserror;
use err::ThisError;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use netpod::log::*;
use netpod::ttl::RetentionTime;
use netpod::TsMs;
@@ -14,6 +15,7 @@ use scylla::Session as ScySession;
pub enum Error {
ScyllaQuery(#[from] scylla::transport::errors::QueryError),
ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),
ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError),
UsageDataMalformed,
}
@@ -118,9 +120,8 @@ async fn read_ts_inner(ks: &str, rt: RetentionTime, ts: TsMs, scy: &ScySession)
let mut res = scy
.execute_iter(qu.clone(), (part as i32, ts_sec))
.await?
.into_typed::<RowType>();
while let Some(row) = res.next().await {
let row = row?;
.rows_stream::<RowType>()?;
while let Some(row) = res.try_next().await? {
let series = row.0 as u64;
let count = row.1 as u64;
let bytes = row.2 as u64;

View File

@@ -4,7 +4,7 @@ use err::Error;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use items_0::Empty;
use items_0::Extendable;
use items_0::WithLen;
@@ -36,12 +36,12 @@ async fn read_next(
scy.execute_iter(qu.clone(), (part as i32, ts_msp as i64))
.await
.err_conv()?
.into_typed::<RowType>()
.rows_stream::<RowType>()
.err_conv()?
} else {
return Err(Error::with_msg_no_trace("no backward support"));
};
while let Some(row) = res.next().await {
let row = row.map_err(Error::from_string)?;
while let Some(row) = res.try_next().await.err_conv()? {
let _ts = ts_msp;
let _series = row.0 as u64;
let count = row.1 as u64;

View File

@@ -1,6 +1,7 @@
use crate::events2::prepare::StmtsCache;
use crate::worker::ScyllaQueue;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use items_0::timebin::BinsBoxed;
use items_2::binning::container_bins::ContainerBins;
use netpod::DtMs;
@@ -63,7 +64,7 @@ pub async fn worker_write(
lst,
);
// trace!("cache write {:?}", params);
scy.execute(stmts_cache.st_write_f32(), params)
scy.execute_unpaged(stmts_cache.st_write_f32(), params)
.await
.map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?;
}
@@ -90,10 +91,15 @@ pub async fn worker_read(
.execute_iter(stmts_cache.st_read_f32().clone(), params)
.await
.map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?;
let mut it = res.into_typed::<(i32, i64, f32, f32, f32, f32)>();
let mut it = res
.rows_stream::<(i32, i64, f32, f32, f32, f32)>()
.map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?;
let mut bins = ContainerBins::new();
while let Some(x) = it.next().await {
let row = x.map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?;
while let Some(row) = it
.try_next()
.await
.map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?
{
let off = row.0 as u64;
let cnt = row.1 as u64;
let min = row.2;

View File

@@ -3,7 +3,6 @@ use err::Error;
use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError;
use scylla::transport::errors::NewSessionError as ScyNewSessionError;
use scylla::transport::errors::QueryError as ScyQueryError;
use scylla::transport::query_result::RowsExpectedError;
pub trait ErrConv<T> {
fn err_conv(self) -> Result<T, Error>;
@@ -44,7 +43,7 @@ impl<T> ErrConv<T> for Result<T, ScyFromRowError> {
}
}
impl<T> ErrConv<T> for Result<T, RowsExpectedError> {
impl<T> ErrConv<T> for Result<T, scylla::deserialize::TypeCheckError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),

View File

@@ -9,6 +9,7 @@ use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use items_0::scalar_ops::ScalarOps;
use items_0::timebin::BinningggContainerEventsDyn;
use items_0::Appendable;
@@ -38,6 +39,7 @@ pub enum Error {
ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),
ScyllaTypeConv(#[from] scylla::cql_to_rust::FromRowError),
ScyllaWorker(Box<crate::worker::Error>),
ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError),
MissingQuery(String),
NotTokenAware,
RangeEndOverflow,
@@ -55,7 +57,7 @@ impl From<crate::worker::Error> for Error {
pub(super) trait ValTy: Sized + 'static {
type ScaTy: ScalarOps + std::default::Default;
type ScyTy: scylla::cql_to_rust::FromCqlVal<scylla::frame::response::result::CqlValue>;
type ScyTy: for<'a, 'b> scylla::deserialize::DeserializeValue<'a, 'b>;
type Container: BinningggContainerEventsDyn + Empty + Appendable<Self>;
fn from_scyty(inp: Self::ScyTy) -> Self;
fn from_valueblob(inp: Vec<u8>) -> Self;
@@ -521,21 +523,21 @@ where
jobtrace.add_event_now(ReadEventKind::CallExecuteIter);
let mut res = scy.execute_iter(qu.clone(), params).await?;
if use_method_2 == false {
let mut rows = Vec::new();
while let Some(x) = res.next().await {
rows.push(x?);
}
let mut last_before = None;
let ret = <ST as ValTy>::convert_rows(rows, range, ts_msp, with_values, !opts.fwd, &mut last_before)?;
ret
// let mut rows = Vec::new();
// while let Some(x) = res.next().await {
// rows.push(x?);
// }
// let mut last_before = None;
// let ret = <ST as ValTy>::convert_rows(rows, range, ts_msp, with_values, !opts.fwd, &mut last_before)?;
// ret
todo!()
} else {
let mut ret = <ST as ValTy>::Container::empty();
// TODO must branch already here depending on what input columns we expect
if with_values {
if <ST as ValTy>::is_valueblob() {
let mut it = res.into_typed::<(i64, Vec<u8>)>();
while let Some(x) = it.next().await {
let row = x?;
let mut it = res.rows_stream::<(i64, Vec<u8>)>()?;
while let Some(row) = it.try_next().await? {
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::from_valueblob(row.1);
ret.push(ts, value);
@@ -543,9 +545,8 @@ where
ret
} else {
let mut i = 0;
let mut it = res.into_typed::<(i64, ST::ScyTy)>();
while let Some(x) = it.next().await {
let row = x?;
let mut it = res.rows_stream::<(i64, ST::ScyTy)>()?;
while let Some(row) = it.try_next().await? {
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::from_scyty(row.1);
ret.push(ts, value);
@@ -560,9 +561,8 @@ where
ret
}
} else {
let mut it = res.into_typed::<(i64,)>();
while let Some(x) = it.next().await {
let row = x?;
let mut it = res.rows_stream::<(i64,)>()?;
while let Some(row) = it.try_next().await? {
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::default();
ret.push(ts, value);
@@ -590,16 +590,51 @@ where
let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64);
trace!("BCK event search params {:?}", params);
let mut res = scy.execute_iter(qu.clone(), params).await?;
let mut rows = Vec::new();
while let Some(x) = res.next().await {
rows.push(x?);
{
let mut ret = <ST as ValTy>::Container::empty();
// TODO must branch already here depending on what input columns we expect
if with_values {
if <ST as ValTy>::is_valueblob() {
let mut it = res.rows_stream::<(i64, Vec<u8>)>()?;
while let Some(row) = it.try_next().await? {
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::from_valueblob(row.1);
ret.push(ts, value);
}
ret
} else {
let mut i = 0;
let mut it = res.rows_stream::<(i64, ST::ScyTy)>()?;
while let Some(row) = it.try_next().await? {
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::from_scyty(row.1);
ret.push(ts, value);
i += 1;
if i % 2000 == 0 {
jobtrace.add_event_now(ReadEventKind::ScyllaReadRow(i));
}
}
{
jobtrace.add_event_now(ReadEventKind::ScyllaReadRowDone(i));
}
ret
}
} else {
let mut it = res.rows_stream::<(i64,)>()?;
while let Some(row) = it.try_next().await? {
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::default();
ret.push(ts, value);
}
ret
}
}
let mut _last_before = None;
let ret = ST::convert_rows(rows, range, ts_msp, with_values, !opts.fwd, &mut _last_before)?;
if ret.len() > 1 {
error!("multiple events in backwards search {}", ret.len());
}
ret
// let mut _last_before = None;
// let ret = ST::convert_rows(rows, range, ts_msp, with_values, !opts.fwd, &mut _last_before)?;
// if ret.len() > 1 {
// error!("multiple events in backwards search {}", ret.len());
// }
// ret
};
trace!("read ts_msp {} len {}", ts_msp.fmt(), ret.len());
let ret = Box::new(ret);
@@ -614,53 +649,64 @@ fn convert_rows_0<ST: ValTy>(
bck: bool,
last_before: &mut Option<(TsNano, ST)>,
) -> Result<<ST as ValTy>::Container, Error> {
let mut ret = <ST as ValTy>::Container::empty();
for row in rows {
let (ts, value) = if with_values {
if ST::is_valueblob() {
let row: (i64, Vec<u8>) = row.into_typed()?;
// trace!("read a value blob len {}", row.1.len());
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = ValTy::from_valueblob(row.1);
(ts, value)
} else {
let row: (i64, ST::ScyTy) = row.into_typed()?;
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = ValTy::from_scyty(row.1);
(ts, value)
}
} else {
let row: (i64,) = row.into_typed()?;
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = ValTy::default();
(ts, value)
};
if bck {
if ts >= range.beg() {
// TODO count as logic error
error!("ts >= range.beg");
} else if ts < range.beg() {
ret.push(ts, value);
} else {
*last_before = Some((ts, value));
}
} else {
if ts >= range.end() {
// TODO count as logic error
error!("ts >= range.end");
} else if ts >= range.beg() {
ret.push(ts, value);
} else {
if last_before.is_none() {
warn!("encounter event before range in forward read {ts}");
}
*last_before = Some((ts, value));
}
}
}
Ok(ret)
todo!()
}
// fn convert_rows_0<ST: ValTy>(
// rows: Vec<Row>,
// range: ScyllaSeriesRange,
// ts_msp: TsMs,
// with_values: bool,
// bck: bool,
// last_before: &mut Option<(TsNano, ST)>,
// ) -> Result<<ST as ValTy>::Container, Error> {
// let mut ret = <ST as ValTy>::Container::empty();
// for row in rows {
// let (ts, value) = if with_values {
// if ST::is_valueblob() {
// let row: (i64, Vec<u8>) = row.into_typed()?;
// // trace!("read a value blob len {}", row.1.len());
// let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
// let value = ValTy::from_valueblob(row.1);
// (ts, value)
// } else {
// let row: (i64, ST::ScyTy) = row.into_typed()?;
// let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
// let value = ValTy::from_scyty(row.1);
// (ts, value)
// }
// } else {
// let row: (i64,) = row.into_typed()?;
// let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
// let value = ValTy::default();
// (ts, value)
// };
// if bck {
// if ts >= range.beg() {
// // TODO count as logic error
// error!("ts >= range.beg");
// } else if ts < range.beg() {
// ret.push(ts, value);
// } else {
// *last_before = Some((ts, value));
// }
// } else {
// if ts >= range.end() {
// // TODO count as logic error
// error!("ts >= range.end");
// } else if ts >= range.beg() {
// ret.push(ts, value);
// } else {
// if last_before.is_none() {
// warn!("encounter event before range in forward read {ts}");
// }
// *last_before = Some((ts, value));
// }
// }
// }
// Ok(ret)
// }
fn convert_rows_enum(
rows: Vec<Row>,
range: ScyllaSeriesRange,

View File

@@ -466,12 +466,7 @@ impl Stream for EventsStreamRt {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let mut i = 0usize;
loop {
i += 1;
if i > 500000000000 {
panic!("too many iterations")
}
if let Some(mut item) = self.out.pop_front() {
if item.is_consistent() == false {
warn_item!("{}bad item {:?}", "\n\n--------------------------\n", item);

View File

@@ -8,7 +8,7 @@ use err::ThisError;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use netpod::log::*;
use netpod::ttl::RetentionTime;
use netpod::TsMs;
@@ -28,6 +28,7 @@ pub enum Error {
Worker(Box<crate::worker::Error>),
ScyllaQuery(#[from] scylla::transport::errors::QueryError),
ScyllaRow(#[from] scylla::transport::iterator::NextRowError),
ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError),
}
impl From<crate::worker::Error> for Error {
@@ -331,9 +332,8 @@ async fn find_ts_msp_fwd(
let mut res = scy
.execute_iter(stmts.rt(rt).ts_msp_fwd().clone(), params)
.await?
.into_typed::<(i64,)>();
while let Some(x) = res.next().await {
let row = x?;
.rows_stream::<(i64,)>()?;
while let Some(row) = res.try_next().await? {
let ts = TsMs::from_ms_u64(row.0 as u64);
ret.push_back(ts);
}
@@ -352,9 +352,8 @@ async fn find_ts_msp_bck(
let mut res = scy
.execute_iter(stmts.rt(rt).ts_msp_bck().clone(), params)
.await?
.into_typed::<(i64,)>();
while let Some(x) = res.next().await {
let row = x?;
.rows_stream::<(i64,)>()?;
while let Some(row) = res.try_next().await? {
let ts = TsMs::from_ms_u64(row.0 as u64);
ret.push_front(ts);
}

View File

@@ -19,6 +19,6 @@ pub async fn schema(rt: RetentionTime, scyco: &ScyllaConfig, scy: &ScySession) -
rt.table_prefix(),
table
);
let _ = scy.query(cql, ()).await;
let _ = scy.query_unpaged(cql, ()).await;
Ok(())
}

View File

@@ -4,6 +4,7 @@ use err::Error;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::TryStreamExt;
use items_0::isodate::IsoDateTime;
use items_0::Empty;
use items_0::Extendable;
@@ -53,7 +54,7 @@ async fn read_next_status_events(
let cql = concat!(
"select ts_lsp, kind from channel_status where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?"
);
scy.query(
scy.query_iter(
cql,
(series as i64, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64),
)
@@ -73,14 +74,14 @@ async fn read_next_status_events(
let cql = concat!(
"select ts_lsp, kind from channel_status where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1"
);
scy.query(cql, (series as i64, ts_msp as i64, ts_lsp_max as i64))
scy.query_iter(cql, (series as i64, ts_msp as i64, ts_lsp_max as i64))
.await
.err_conv()?
};
let mut last_before = None;
let mut ret = ChannelStatusEvents::empty();
for row in res.rows_typed_or_empty::<(i64, i32)>() {
let row = row.err_conv()?;
let mut it = res.rows_stream::<(i64, i32)>().err_conv()?;
while let Some(row) = it.try_next().await.err_conv()? {
let ts = ts_msp + row.0 as u64;
let kind = row.1 as u32;
let datetime = IsoDateTime::from_unix_millis(ts / MS);