diff --git a/crates/daqbuf-redis/Cargo.toml b/crates/daqbuf-redis/Cargo.toml index ca8bf38..ed7ca41 100644 --- a/crates/daqbuf-redis/Cargo.toml +++ b/crates/daqbuf-redis/Cargo.toml @@ -9,4 +9,4 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" daqbuf-err = { path = "../../../daqbuf-err" } taskrun = { path = "../taskrun" } -redis = { version = "0.26.1", features = [] } +redis = { version = "0.27.6", features = [] } diff --git a/crates/httpret/Cargo.toml b/crates/httpret/Cargo.toml index d0eaf05..cc78958 100644 --- a/crates/httpret/Cargo.toml +++ b/crates/httpret/Cargo.toml @@ -20,14 +20,14 @@ futures-util = "0.3.14" tracing = "0.1" tracing-futures = "0.2" async-channel = "1.9.0" -itertools = "0.11.0" +itertools = "0.13.0" chrono = "0.4.23" md-5 = "0.10.6" regex = "1.10.2" rand = "0.8.5" ciborium = "0.2.1" flate2 = "1" -brotli = "3.4.0" +brotli = "7.0.0" daqbuf-err = { path = "../../../daqbuf-err" } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } query = { path = "../../../daqbuf-query", package = "daqbuf-query" } diff --git a/crates/httpret/src/channelconfig.rs b/crates/httpret/src/channelconfig.rs index ae64e05..2c4ed09 100644 --- a/crates/httpret/src/channelconfig.rs +++ b/crates/httpret/src/channelconfig.rs @@ -1,9 +1,10 @@ use crate::response; use crate::ServiceSharedResources; -use core::fmt; +use daqbuf_err::thiserror; use dbconn::create_connection; use dbconn::worker::PgQueue; use futures_util::StreamExt; +use futures_util::TryStreamExt; use http::Method; use http::StatusCode; use httpclient::body_empty; @@ -40,164 +41,47 @@ use serde::Serialize; use std::collections::BTreeMap; use url::Url; -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] +#[cstm(name = "ChannelConfigError")] pub enum Error { NotFound(SfDbChannel), - ConfigQuorum(nodenet::configquorum::Error), - ConfigNode(nodenet::channelconfig::Error), - Http(crate::Error), - HttpCrate(http::Error), + ConfigQuorum(#[from] nodenet::configquorum::Error), + ConfigNode(#[from] nodenet::channelconfig::Error), + Http(#[from] crate::Error), + HttpCrate(#[from] http::Error), // TODO create dedicated error type for query parsing - BadQuery(daqbuf_err::Error), + BadQuery(#[from] daqbuf_err::Error), MissingBackend, MissingScalarType, MissingShape, MissingShapeKind, MissingEdge, MissingTimerange, - Uri(netpod::UriError), + Uri(#[from] netpod::UriError), ChannelConfigQuery(daqbuf_err::Error), ExpectScyllaBackend, - Pg(dbconn::pg::Error), + Pg(#[from] dbconn::pg::Error), Scylla(String), Join, OtherErr(daqbuf_err::Error), - PgWorker(dbconn::worker::Error), - Async(netpod::AsyncChannelError), - ChannelConfig(dbconn::channelconfig::Error), - Netpod(netpod::NetpodError), -} - -impl fmt::Display for Error { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - let name = "HttpChannelConfigError"; - write!(fmt, "{name}(")?; - match self { - Error::NotFound(chn) => write!(fmt, "NotFound({chn}")?, - Error::ConfigQuorum(e) => write!(fmt, "ConfigQuorum({e})")?, - Error::ConfigNode(e) => write!(fmt, "ConfigNode({e})")?, - Error::Http(e) => write!(fmt, "Http({e})")?, - Error::HttpCrate(e) => write!(fmt, "HttpCrate({e})")?, - Error::BadQuery(e) => write!(fmt, "BadQuery({e})")?, - Error::MissingBackend => write!(fmt, "MissingBackend")?, - Error::MissingScalarType => write!(fmt, "MissingScalarType")?, - Error::MissingShape => write!(fmt, "MissingShape")?, - Error::MissingShapeKind => write!(fmt, "MissingShapeKind")?, - Error::MissingEdge => write!(fmt, "MissingEdge")?, - Error::MissingTimerange => write!(fmt, "MissingTimerange")?, - Error::Uri(x) => write!(fmt, "Uri({x})")?, - Error::ChannelConfigQuery(e) => write!(fmt, "ChannelConfigQuery({e})")?, - Error::ExpectScyllaBackend => write!(fmt, "ExpectScyllaBackend")?, - Error::Pg(e) => write!(fmt, "Pg({e})")?, - Error::Scylla(e) => write!(fmt, "Scylla({e})")?, - Error::Join => write!(fmt, "Join")?, - Error::OtherErr(e) => write!(fmt, "OtherErr({e})")?, - Error::PgWorker(e) => write!(fmt, "PgWorker({e})")?, - Error::Async(e) => write!(fmt, "Async({e})")?, - Error::ChannelConfig(e) => write!(fmt, "ChannelConfig({e})")?, - Error::Netpod(e) => write!(fmt, "Netpod({e})")?, - } - write!(fmt, ")")?; - Ok(()) - } + PgWorker(#[from] dbconn::worker::Error), + Async(#[from] netpod::AsyncChannelError), + ChannelConfig(#[from] dbconn::channelconfig::Error), + Netpod(#[from] netpod::NetpodError), + ScyllaQuery(#[from] scyllaconn::scylla::transport::errors::QueryError), + ScyllaTypeCheck(#[from] scyllaconn::scylla::deserialize::TypeCheckError), } fn other_err_error(e: daqbuf_err::Error) -> Error { Error::OtherErr(e) } -impl std::error::Error for Error {} - -impl From for Error { - fn from(e: crate::Error) -> Self { - Self::Http(e) - } -} -impl From for Error { - fn from(e: http::Error) -> Self { - Self::HttpCrate(e) - } -} - -impl From for Error { - fn from(e: nodenet::configquorum::Error) -> Self { - use nodenet::configquorum::Error::*; - match e { - NotFound(a) => Self::NotFound(a), - _ => Self::ConfigQuorum(e), - } - } -} - -impl From for Error { - fn from(e: nodenet::channelconfig::Error) -> Self { - match e { - nodenet::channelconfig::Error::NotFoundChannel(a) => Self::NotFound(a), - _ => Self::ConfigNode(e), - } - } -} - -impl From for Error { - fn from(e: netpod::UriError) -> Self { - Self::Uri(e) - } -} - -impl From for Error { - fn from(e: dbconn::pg::Error) -> Self { - Self::Pg(e) - } -} - -impl From for Error { - fn from(e: dbconn::worker::Error) -> Self { - Self::PgWorker(e) - } -} - -impl From for Error { - fn from(e: scyllaconn::scylla::cql_to_rust::FromRowError) -> Self { - Self::Scylla(e.to_string()) - } -} - -impl From for Error { - fn from(e: scyllaconn::scylla::transport::errors::QueryError) -> Self { - Self::Scylla(e.to_string()) - } -} - -impl From for Error { - fn from(e: scyllaconn::scylla::transport::iterator::NextRowError) -> Self { - Self::Scylla(e.to_string()) - } -} - impl From for Error { fn from(_e: taskrun::tokio::task::JoinError) -> Self { Self::Join } } -impl From for Error { - fn from(e: netpod::AsyncChannelError) -> Self { - Self::Async(e) - } -} - -impl From for Error { - fn from(e: dbconn::channelconfig::Error) -> Self { - Self::ChannelConfig(e) - } -} - -impl From for Error { - fn from(e: netpod::NetpodError) -> Self { - Self::Netpod(e) - } -} - impl From for crate::err::Error { fn from(e: Error) -> Self { Self::with_msg_no_trace(format!("{e} TODO add public message")) @@ -623,11 +507,9 @@ impl ScyllaChannelsActive { "select series from series_by_ts_msp where part = ? and ts_msp = ? and shape_kind = ? and scalar_type = ?", (part as i32, tsedge as i32, q.shape_kind as i32, q.scalar_type.to_scylla_i32()), ) - .await.map_err(|e| Error::Scylla(e.to_string()))?; - while let Some(row) = res.next().await { - let row = row?; - let (series,): (i64,) = row.into_typed()?; - ret.push(series as u64); + .await?.rows_stream::<(i64,)>()?; + while let Some(row) = res.try_next().await? { + ret.push(row.0 as u64); } } Ok(ret) @@ -1005,17 +887,16 @@ impl GenerateScyllaTestData { let series: u64 = 42001; // TODO query `ts_msp` for all MSP values und use that to delete from event table first. // Only later delete also from the `ts_msp` table. - let it = scy + let mut it = scy .query_iter("select ts_msp from ts_msp where series = ?", (series as i64,)) - .await?; - let mut it = it.into_typed::<(i64,)>(); - while let Some(row) = it.next().await { - let row = row?; + .await? + .rows_stream::<(i64,)>()?; + while let Some(row) = it.try_next().await? { let values = (series as i64, row.0); - scy.query("delete from events_scalar_f64 where series = ? and ts_msp = ?", values) + scy.query_unpaged("delete from events_scalar_f64 where series = ? and ts_msp = ?", values) .await?; } - scy.query("delete from ts_msp where series = ?", (series as i64,)) + scy.query_unpaged("delete from ts_msp where series = ?", (series as i64,)) .await?; // Generate @@ -1023,7 +904,7 @@ impl GenerateScyllaTestData { let mut last = 0; for msp in msps.0.iter().map(|x| *x) { if msp != last { - scy.query( + scy.query_unpaged( "insert into ts_msp (series, ts_msp) values (?, ?)", (series as i64, msp as i64), ) @@ -1032,7 +913,7 @@ impl GenerateScyllaTestData { last = msp; } for (((msp, lsp), pulse), val) in msps.0.into_iter().zip(lsps.0).zip(pulses.0).zip(vals.0) { - scy.query( + scy.query_unpaged( "insert into events_scalar_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)", (series as i64, msp as i64, lsp as i64, pulse as i64, val), ) diff --git a/crates/httpret/src/pulsemap.rs b/crates/httpret/src/pulsemap.rs index 1d0f9a6..4b1fc39 100644 --- a/crates/httpret/src/pulsemap.rs +++ b/crates/httpret/src/pulsemap.rs @@ -12,6 +12,7 @@ use chrono::Utc; use futures_util::stream::FuturesOrdered; use futures_util::stream::FuturesUnordered; use futures_util::FutureExt; +use futures_util::TryStreamExt; use http::header; use http::Method; use http::StatusCode; @@ -904,7 +905,7 @@ impl ErrConv for Result { } } -impl ErrConv for Result { +impl ErrConv for Result { fn err_conv(self) -> Result { self.map_err(|e| daqbuf_err::Error::with_msg_no_trace(format!("{e:?}"))) } @@ -940,23 +941,21 @@ impl MapPulseScyllaHandler { let scy = scyllaconn::conn::create_scy_session(&scyconf).await?; let pulse_a = (pulse >> 14) as i64; let pulse_b = (pulse & 0x3fff) as i32; - let res = scy - .query( + let mut it = scy + .query_iter( "select ts_a, ts_b from pulse where pulse_a = ? and pulse_b = ?", (pulse_a, pulse_b), ) .await + .err_conv()? + .rows_stream::<(i64, i32)>() .err_conv()?; - let rows = res.rows().err_conv()?; let ch = "pulsemaptable"; let mut tss = Vec::new(); let mut channels = Vec::new(); - use scylla::frame::response::result::CqlValue; - let ts_a_def = CqlValue::BigInt(0); - let ts_b_def = CqlValue::Int(0); - for row in rows { - let ts_a = row.columns[0].as_ref().unwrap_or(&ts_a_def).as_bigint().unwrap_or(0) as u64; - let ts_b = row.columns[1].as_ref().unwrap_or(&ts_b_def).as_int().unwrap_or(0) as u32 as u64; + while let Some(row) = it.try_next().await.err_conv()? { + let ts_a = row.0 as u64; + let ts_b = row.1 as u64; tss.push(ts_a * netpod::timeunits::SEC + ts_b); channels.push(ch.into()); } diff --git a/crates/scyllaconn/Cargo.toml b/crates/scyllaconn/Cargo.toml index 61e7921..a442fdf 100644 --- a/crates/scyllaconn/Cargo.toml +++ b/crates/scyllaconn/Cargo.toml @@ -11,7 +11,7 @@ path = "src/scyllaconn.rs" futures-util = "0.3.24" pin-project = "1" async-channel = "2.3.1" -scylla = "0.13.0" +scylla = "0.15.0" daqbuf-err = { path = "../../../daqbuf-err" } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } query = { path = "../../../daqbuf-query", package = "daqbuf-query" } diff --git a/crates/scyllaconn/src/accounting/toplist.rs b/crates/scyllaconn/src/accounting/toplist.rs index b6369b7..a98a875 100644 --- a/crates/scyllaconn/src/accounting/toplist.rs +++ b/crates/scyllaconn/src/accounting/toplist.rs @@ -2,6 +2,7 @@ use daqbuf_err as err; use err::thiserror; use err::ThisError; use futures_util::StreamExt; +use futures_util::TryStreamExt; use netpod::log::*; use netpod::ttl::RetentionTime; use netpod::TsMs; @@ -14,6 +15,7 @@ use scylla::Session as ScySession; pub enum Error { ScyllaQuery(#[from] scylla::transport::errors::QueryError), ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError), + ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError), UsageDataMalformed, } @@ -118,9 +120,8 @@ async fn read_ts_inner(ks: &str, rt: RetentionTime, ts: TsMs, scy: &ScySession) let mut res = scy .execute_iter(qu.clone(), (part as i32, ts_sec)) .await? - .into_typed::(); - while let Some(row) = res.next().await { - let row = row?; + .rows_stream::()?; + while let Some(row) = res.try_next().await? { let series = row.0 as u64; let count = row.1 as u64; let bytes = row.2 as u64; diff --git a/crates/scyllaconn/src/accounting/totals.rs b/crates/scyllaconn/src/accounting/totals.rs index 099ad22..9443435 100644 --- a/crates/scyllaconn/src/accounting/totals.rs +++ b/crates/scyllaconn/src/accounting/totals.rs @@ -4,7 +4,7 @@ use err::Error; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; -use futures_util::StreamExt; +use futures_util::TryStreamExt; use items_0::Empty; use items_0::Extendable; use items_0::WithLen; @@ -36,12 +36,12 @@ async fn read_next( scy.execute_iter(qu.clone(), (part as i32, ts_msp as i64)) .await .err_conv()? - .into_typed::() + .rows_stream::() + .err_conv()? } else { return Err(Error::with_msg_no_trace("no backward support")); }; - while let Some(row) = res.next().await { - let row = row.map_err(Error::from_string)?; + while let Some(row) = res.try_next().await.err_conv()? { let _ts = ts_msp; let _series = row.0 as u64; let count = row.1 as u64; diff --git a/crates/scyllaconn/src/bincache.rs b/crates/scyllaconn/src/bincache.rs index 2af4f31..f840ef3 100644 --- a/crates/scyllaconn/src/bincache.rs +++ b/crates/scyllaconn/src/bincache.rs @@ -1,6 +1,7 @@ use crate::events2::prepare::StmtsCache; use crate::worker::ScyllaQueue; use futures_util::StreamExt; +use futures_util::TryStreamExt; use items_0::timebin::BinsBoxed; use items_2::binning::container_bins::ContainerBins; use netpod::DtMs; @@ -63,7 +64,7 @@ pub async fn worker_write( lst, ); // trace!("cache write {:?}", params); - scy.execute(stmts_cache.st_write_f32(), params) + scy.execute_unpaged(stmts_cache.st_write_f32(), params) .await .map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?; } @@ -90,10 +91,15 @@ pub async fn worker_read( .execute_iter(stmts_cache.st_read_f32().clone(), params) .await .map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?; - let mut it = res.into_typed::<(i32, i64, f32, f32, f32, f32)>(); + let mut it = res + .rows_stream::<(i32, i64, f32, f32, f32, f32)>() + .map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?; let mut bins = ContainerBins::new(); - while let Some(x) = it.next().await { - let row = x.map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?; + while let Some(row) = it + .try_next() + .await + .map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))? + { let off = row.0 as u64; let cnt = row.1 as u64; let min = row.2; diff --git a/crates/scyllaconn/src/errconv.rs b/crates/scyllaconn/src/errconv.rs index 0e47bbf..6967d75 100644 --- a/crates/scyllaconn/src/errconv.rs +++ b/crates/scyllaconn/src/errconv.rs @@ -3,7 +3,6 @@ use err::Error; use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError; use scylla::transport::errors::NewSessionError as ScyNewSessionError; use scylla::transport::errors::QueryError as ScyQueryError; -use scylla::transport::query_result::RowsExpectedError; pub trait ErrConv { fn err_conv(self) -> Result; @@ -44,7 +43,7 @@ impl ErrConv for Result { } } -impl ErrConv for Result { +impl ErrConv for Result { fn err_conv(self) -> Result { match self { Ok(k) => Ok(k), diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs index 2289165..75e9419 100644 --- a/crates/scyllaconn/src/events.rs +++ b/crates/scyllaconn/src/events.rs @@ -9,6 +9,7 @@ use err::thiserror; use err::ThisError; use futures_util::Future; use futures_util::StreamExt; +use futures_util::TryStreamExt; use items_0::scalar_ops::ScalarOps; use items_0::timebin::BinningggContainerEventsDyn; use items_0::Appendable; @@ -38,6 +39,7 @@ pub enum Error { ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError), ScyllaTypeConv(#[from] scylla::cql_to_rust::FromRowError), ScyllaWorker(Box), + ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError), MissingQuery(String), NotTokenAware, RangeEndOverflow, @@ -55,7 +57,7 @@ impl From for Error { pub(super) trait ValTy: Sized + 'static { type ScaTy: ScalarOps + std::default::Default; - type ScyTy: scylla::cql_to_rust::FromCqlVal; + type ScyTy: for<'a, 'b> scylla::deserialize::DeserializeValue<'a, 'b>; type Container: BinningggContainerEventsDyn + Empty + Appendable; fn from_scyty(inp: Self::ScyTy) -> Self; fn from_valueblob(inp: Vec) -> Self; @@ -521,21 +523,21 @@ where jobtrace.add_event_now(ReadEventKind::CallExecuteIter); let mut res = scy.execute_iter(qu.clone(), params).await?; if use_method_2 == false { - let mut rows = Vec::new(); - while let Some(x) = res.next().await { - rows.push(x?); - } - let mut last_before = None; - let ret = ::convert_rows(rows, range, ts_msp, with_values, !opts.fwd, &mut last_before)?; - ret + // let mut rows = Vec::new(); + // while let Some(x) = res.next().await { + // rows.push(x?); + // } + // let mut last_before = None; + // let ret = ::convert_rows(rows, range, ts_msp, with_values, !opts.fwd, &mut last_before)?; + // ret + todo!() } else { let mut ret = ::Container::empty(); // TODO must branch already here depending on what input columns we expect if with_values { if ::is_valueblob() { - let mut it = res.into_typed::<(i64, Vec)>(); - while let Some(x) = it.next().await { - let row = x?; + let mut it = res.rows_stream::<(i64, Vec)>()?; + while let Some(row) = it.try_next().await? { let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); let value = ::from_valueblob(row.1); ret.push(ts, value); @@ -543,9 +545,8 @@ where ret } else { let mut i = 0; - let mut it = res.into_typed::<(i64, ST::ScyTy)>(); - while let Some(x) = it.next().await { - let row = x?; + let mut it = res.rows_stream::<(i64, ST::ScyTy)>()?; + while let Some(row) = it.try_next().await? { let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); let value = ::from_scyty(row.1); ret.push(ts, value); @@ -560,9 +561,8 @@ where ret } } else { - let mut it = res.into_typed::<(i64,)>(); - while let Some(x) = it.next().await { - let row = x?; + let mut it = res.rows_stream::<(i64,)>()?; + while let Some(row) = it.try_next().await? { let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); let value = ::default(); ret.push(ts, value); @@ -590,16 +590,51 @@ where let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64); trace!("BCK event search params {:?}", params); let mut res = scy.execute_iter(qu.clone(), params).await?; - let mut rows = Vec::new(); - while let Some(x) = res.next().await { - rows.push(x?); + { + let mut ret = ::Container::empty(); + // TODO must branch already here depending on what input columns we expect + if with_values { + if ::is_valueblob() { + let mut it = res.rows_stream::<(i64, Vec)>()?; + while let Some(row) = it.try_next().await? { + let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + let value = ::from_valueblob(row.1); + ret.push(ts, value); + } + ret + } else { + let mut i = 0; + let mut it = res.rows_stream::<(i64, ST::ScyTy)>()?; + while let Some(row) = it.try_next().await? { + let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + let value = ::from_scyty(row.1); + ret.push(ts, value); + i += 1; + if i % 2000 == 0 { + jobtrace.add_event_now(ReadEventKind::ScyllaReadRow(i)); + } + } + { + jobtrace.add_event_now(ReadEventKind::ScyllaReadRowDone(i)); + } + ret + } + } else { + let mut it = res.rows_stream::<(i64,)>()?; + while let Some(row) = it.try_next().await? { + let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); + let value = ::default(); + ret.push(ts, value); + } + ret + } } - let mut _last_before = None; - let ret = ST::convert_rows(rows, range, ts_msp, with_values, !opts.fwd, &mut _last_before)?; - if ret.len() > 1 { - error!("multiple events in backwards search {}", ret.len()); - } - ret + // let mut _last_before = None; + // let ret = ST::convert_rows(rows, range, ts_msp, with_values, !opts.fwd, &mut _last_before)?; + // if ret.len() > 1 { + // error!("multiple events in backwards search {}", ret.len()); + // } + // ret }; trace!("read ts_msp {} len {}", ts_msp.fmt(), ret.len()); let ret = Box::new(ret); @@ -614,53 +649,64 @@ fn convert_rows_0( bck: bool, last_before: &mut Option<(TsNano, ST)>, ) -> Result<::Container, Error> { - let mut ret = ::Container::empty(); - for row in rows { - let (ts, value) = if with_values { - if ST::is_valueblob() { - let row: (i64, Vec) = row.into_typed()?; - // trace!("read a value blob len {}", row.1.len()); - let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - let value = ValTy::from_valueblob(row.1); - (ts, value) - } else { - let row: (i64, ST::ScyTy) = row.into_typed()?; - let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - let value = ValTy::from_scyty(row.1); - (ts, value) - } - } else { - let row: (i64,) = row.into_typed()?; - let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); - let value = ValTy::default(); - (ts, value) - }; - if bck { - if ts >= range.beg() { - // TODO count as logic error - error!("ts >= range.beg"); - } else if ts < range.beg() { - ret.push(ts, value); - } else { - *last_before = Some((ts, value)); - } - } else { - if ts >= range.end() { - // TODO count as logic error - error!("ts >= range.end"); - } else if ts >= range.beg() { - ret.push(ts, value); - } else { - if last_before.is_none() { - warn!("encounter event before range in forward read {ts}"); - } - *last_before = Some((ts, value)); - } - } - } - Ok(ret) + todo!() } +// fn convert_rows_0( +// rows: Vec, +// range: ScyllaSeriesRange, +// ts_msp: TsMs, +// with_values: bool, +// bck: bool, +// last_before: &mut Option<(TsNano, ST)>, +// ) -> Result<::Container, Error> { +// let mut ret = ::Container::empty(); +// for row in rows { +// let (ts, value) = if with_values { +// if ST::is_valueblob() { +// let row: (i64, Vec) = row.into_typed()?; +// // trace!("read a value blob len {}", row.1.len()); +// let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); +// let value = ValTy::from_valueblob(row.1); +// (ts, value) +// } else { +// let row: (i64, ST::ScyTy) = row.into_typed()?; +// let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); +// let value = ValTy::from_scyty(row.1); +// (ts, value) +// } +// } else { +// let row: (i64,) = row.into_typed()?; +// let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64); +// let value = ValTy::default(); +// (ts, value) +// }; +// if bck { +// if ts >= range.beg() { +// // TODO count as logic error +// error!("ts >= range.beg"); +// } else if ts < range.beg() { +// ret.push(ts, value); +// } else { +// *last_before = Some((ts, value)); +// } +// } else { +// if ts >= range.end() { +// // TODO count as logic error +// error!("ts >= range.end"); +// } else if ts >= range.beg() { +// ret.push(ts, value); +// } else { +// if last_before.is_none() { +// warn!("encounter event before range in forward read {ts}"); +// } +// *last_before = Some((ts, value)); +// } +// } +// } +// Ok(ret) +// } + fn convert_rows_enum( rows: Vec, range: ScyllaSeriesRange, diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index 5e1da21..92fd4a3 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -466,12 +466,7 @@ impl Stream for EventsStreamRt { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - let mut i = 0usize; loop { - i += 1; - if i > 500000000000 { - panic!("too many iterations") - } if let Some(mut item) = self.out.pop_front() { if item.is_consistent() == false { warn_item!("{}bad item {:?}", "\n\n--------------------------\n", item); diff --git a/crates/scyllaconn/src/events2/msp.rs b/crates/scyllaconn/src/events2/msp.rs index f33853b..5521845 100644 --- a/crates/scyllaconn/src/events2/msp.rs +++ b/crates/scyllaconn/src/events2/msp.rs @@ -8,7 +8,7 @@ use err::ThisError; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; -use futures_util::StreamExt; +use futures_util::TryStreamExt; use netpod::log::*; use netpod::ttl::RetentionTime; use netpod::TsMs; @@ -28,6 +28,7 @@ pub enum Error { Worker(Box), ScyllaQuery(#[from] scylla::transport::errors::QueryError), ScyllaRow(#[from] scylla::transport::iterator::NextRowError), + ScyllaTypeCheck(#[from] scylla::deserialize::TypeCheckError), } impl From for Error { @@ -331,9 +332,8 @@ async fn find_ts_msp_fwd( let mut res = scy .execute_iter(stmts.rt(rt).ts_msp_fwd().clone(), params) .await? - .into_typed::<(i64,)>(); - while let Some(x) = res.next().await { - let row = x?; + .rows_stream::<(i64,)>()?; + while let Some(row) = res.try_next().await? { let ts = TsMs::from_ms_u64(row.0 as u64); ret.push_back(ts); } @@ -352,9 +352,8 @@ async fn find_ts_msp_bck( let mut res = scy .execute_iter(stmts.rt(rt).ts_msp_bck().clone(), params) .await? - .into_typed::<(i64,)>(); - while let Some(x) = res.next().await { - let row = x?; + .rows_stream::<(i64,)>()?; + while let Some(row) = res.try_next().await? { let ts = TsMs::from_ms_u64(row.0 as u64); ret.push_front(ts); } diff --git a/crates/scyllaconn/src/schema.rs b/crates/scyllaconn/src/schema.rs index 109971d..4dd6329 100644 --- a/crates/scyllaconn/src/schema.rs +++ b/crates/scyllaconn/src/schema.rs @@ -19,6 +19,6 @@ pub async fn schema(rt: RetentionTime, scyco: &ScyllaConfig, scy: &ScySession) - rt.table_prefix(), table ); - let _ = scy.query(cql, ()).await; + let _ = scy.query_unpaged(cql, ()).await; Ok(()) } diff --git a/crates/scyllaconn/src/status.rs b/crates/scyllaconn/src/status.rs index be52c22..b90b014 100644 --- a/crates/scyllaconn/src/status.rs +++ b/crates/scyllaconn/src/status.rs @@ -4,6 +4,7 @@ use err::Error; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; +use futures_util::TryStreamExt; use items_0::isodate::IsoDateTime; use items_0::Empty; use items_0::Extendable; @@ -53,7 +54,7 @@ async fn read_next_status_events( let cql = concat!( "select ts_lsp, kind from channel_status where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?" ); - scy.query( + scy.query_iter( cql, (series as i64, ts_msp as i64, ts_lsp_min as i64, ts_lsp_max as i64), ) @@ -73,14 +74,14 @@ async fn read_next_status_events( let cql = concat!( "select ts_lsp, kind from channel_status where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1" ); - scy.query(cql, (series as i64, ts_msp as i64, ts_lsp_max as i64)) + scy.query_iter(cql, (series as i64, ts_msp as i64, ts_lsp_max as i64)) .await .err_conv()? }; let mut last_before = None; let mut ret = ChannelStatusEvents::empty(); - for row in res.rows_typed_or_empty::<(i64, i32)>() { - let row = row.err_conv()?; + let mut it = res.rows_stream::<(i64, i32)>().err_conv()?; + while let Some(row) = it.try_next().await.err_conv()? { let ts = ts_msp + row.0 as u64; let kind = row.1 as u32; let datetime = IsoDateTime::from_unix_millis(ts / MS);