Interleave query

This commit is contained in:
Dominik Werder
2024-10-29 12:40:38 +01:00
parent 013c888265
commit eb49ee9296
8 changed files with 419 additions and 139 deletions

View File

@@ -73,7 +73,7 @@ where
fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> EVT::AggTimeWeightOutputAvg {
let sum = self.sum.clone();
trace!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction);
trace_result!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction);
self.sum = 0.;
sum / filled_width_fraction as f64
}

View File

@@ -472,7 +472,6 @@ where
avgs,
};
let ret = serde_json::to_value(&val).map_err(err::Error::from_string);
info!("VALUE: {:?}", ret);
ret
}
}
@@ -540,10 +539,7 @@ where
range: Option<netpod::range::evrange::SeriesRange>,
binrange: Option<netpod::BinnedRangeEnum>,
) -> Result<Box<dyn items_0::collect_s::CollectedDyn>, err::Error> {
info!(
"----------- ContainerBinsCollector result called len {}",
self.len()
);
// TODO do we need to set timeout, continueAt or anything?
let bins = mem::replace(&mut self.bins, ContainerBins::new());
let ret = ContainerBinsCollectorOutput { bins };
Ok(Box::new(ret))

View File

@@ -18,28 +18,28 @@ use std::mem;
macro_rules! trace_ { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_cycle { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_cycle { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_event_next { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_ingest_init_lst { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_ingest_init_lst { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_ingest_minmax { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_ingest_minmax { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_ingest_firsts { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_ingest_firsts { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_ingest_finish_bin { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_ingest_finish_bin { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_ingest_container { ($($arg:tt)*) => ( if false { trace_!($($arg)*); }) }

View File

@@ -5,11 +5,9 @@ use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use items_0::on_sitemty_data;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::try_map_sitemty_data;
use items_2::channelevents::ChannelEvents;
use netpod::log::*;
use netpod::ChConf;

View File

@@ -2,6 +2,7 @@ use crate::events2::events::EventReadOpts;
use crate::events2::prepare::StmtsEvents;
use crate::range::ScyllaSeriesRange;
use crate::worker::ScyllaQueue;
use core::fmt;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
@@ -24,6 +25,7 @@ use scylla::Session;
use series::SeriesId;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use tracing::Instrument;
#[allow(unused)]
@@ -44,10 +46,12 @@ pub enum Error {
ScyllaTypeConv(#[from] scylla::cql_to_rust::FromRowError),
ScyllaWorker(Box<crate::worker::Error>),
MissingQuery(String),
NotTokenAware,
RangeEndOverflow,
InvalidFuture,
TestError(String),
Logic,
TodoUnsupported,
}
impl From<crate::worker::Error> for Error {
@@ -66,11 +70,12 @@ pub(super) trait ValTy: Sized + 'static {
fn default() -> Self;
fn is_valueblob() -> bool;
fn st_name() -> &'static str;
fn read_next_values(
fn read_next_values_trait(
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>>;
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), Error>> + Send>>;
fn convert_rows(
rows: Vec<Row>,
range: ScyllaSeriesRange,
@@ -112,12 +117,13 @@ macro_rules! impl_scaty_scalar {
$st_name
}
fn read_next_values(
fn read_next_values_trait(
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>> {
Box::pin(read_next_values_2::<Self>(opts, scy, stmts))
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), Error>> + Send>> {
Box::pin(read_next_values_2::<Self>(opts, jobtrace, scy, stmts))
}
fn convert_rows(
@@ -178,12 +184,13 @@ macro_rules! impl_scaty_array {
$st_name
}
fn read_next_values(
fn read_next_values_trait(
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>> {
Box::pin(read_next_values_2::<Self>(opts, scy, stmts))
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), Error>> + Send>> {
Box::pin(read_next_values_2::<Self>(opts, jobtrace, scy, stmts))
}
fn convert_rows(
@@ -231,13 +238,13 @@ impl ValTy for EnumVariant {
"enum"
}
fn read_next_values(
fn read_next_values_trait(
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>> {
let fut = read_next_values_2::<Self>(opts, scy, stmts);
Box::pin(fut)
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), Error>> + Send>> {
Box::pin(read_next_values_2::<Self>(opts, jobtrace, scy, stmts))
}
fn convert_rows(
@@ -283,12 +290,13 @@ impl ValTy for Vec<String> {
"string"
}
fn read_next_values(
fn read_next_values_trait(
opts: ReadNextValuesOpts,
jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>> {
let fut = read_next_values_2::<Self>(opts, scy, stmts);
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), Error>> + Send>> {
let fut = read_next_values_2::<Self>(opts, jobtrace, scy, stmts);
Box::pin(fut)
}
@@ -329,6 +337,51 @@ impl_scaty_array!(Vec<f32>, f32, Vec<f32>, "f32", "f32");
impl_scaty_array!(Vec<f64>, f64, Vec<f64>, "f64", "f64");
impl_scaty_array!(Vec<bool>, bool, Vec<bool>, "bool", "bool");
#[derive(Debug)]
pub enum ReadEventKind {
Create,
FutgenCallingReadNextValues,
FutgenFutureCreated,
CallExecuteIter,
ScyllaReadRow(u32),
ScyllaReadRowDone(u32),
ReadNextValuesFutureDone,
EventsStreamRtSees(u32),
}
#[derive(Debug)]
pub struct ReadJobTrace {
jobid: u64,
ts0: Instant,
events: Vec<(Instant, ReadEventKind)>,
}
impl ReadJobTrace {
pub fn new() -> Self {
static JOBID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
Self {
jobid: JOBID.fetch_add(1, std::sync::atomic::Ordering::AcqRel),
ts0: Instant::now(),
events: Vec::with_capacity(128),
}
}
pub fn add_event_now(&mut self, kind: ReadEventKind) {
self.events.push((Instant::now(), kind))
}
}
impl fmt::Display for ReadJobTrace {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "ReadJobTrace jobid {jid}", jid = self.jobid)?;
for (ts, kind) in &self.events {
let dt = 1e3 * ts.saturating_duration_since(self.ts0).as_secs_f32();
write!(fmt, "\njobid {jid:4} {dt:7.2} {kind:?}", jid = self.jobid)?;
}
Ok(())
}
}
#[derive(Debug)]
pub(super) struct ReadNextValuesOpts {
rt: RetentionTime,
@@ -362,15 +415,25 @@ impl ReadNextValuesOpts {
}
}
pub(super) async fn read_next_values<ST>(opts: ReadNextValuesOpts) -> Result<Box<dyn Events>, Error>
pub(super) struct ReadNextValuesParams {
pub opts: ReadNextValuesOpts,
pub jobtrace: ReadJobTrace,
}
pub(super) async fn read_next_values<ST>(params: ReadNextValuesParams) -> Result<(Box<dyn Events>, ReadJobTrace), Error>
where
ST: ValTy,
{
let opts = params.opts;
let jobtrace = params.jobtrace;
// TODO could take scyqeue out of opts struct.
let scyqueue = opts.scyqueue.clone();
let level = taskrun::query_log_level();
let futgen = Box::new(move |scy: Arc<Session>, stmts: Arc<StmtsEvents>| {
let futgen = move |scy: Arc<Session>, stmts: Arc<StmtsEvents>, mut jobtrace: ReadJobTrace| {
// TODO avoid this
// opts.jobtrace = jobtrace;
let fut = async move {
// let jobtrace = &mut opts.jobtrace;
let logspan = if level == Level::DEBUG {
tracing::span!(Level::INFO, "log_span_debug")
} else if level == Level::TRACE {
@@ -378,25 +441,34 @@ where
} else {
tracing::Span::none()
};
ST::read_next_values(opts, scy, stmts)
.instrument(logspan)
.await
.map_err(crate::worker::Error::from)
jobtrace.add_event_now(ReadEventKind::FutgenCallingReadNextValues);
let fut = ST::read_next_values_trait(opts, jobtrace, scy, stmts).instrument(logspan);
match fut.await.map_err(crate::worker::Error::from) {
Ok((ret, mut jobtrace)) => {
jobtrace.add_event_now(ReadEventKind::ReadNextValuesFutureDone);
Ok((ret, jobtrace))
}
Err(e) => Err(e),
}
};
Box::pin(fut) as Pin<Box<dyn Future<Output = Result<Box<dyn Events>, crate::worker::Error>> + Send>>
});
let res = scyqueue.read_next_values(futgen).await?;
Ok(res)
Box::pin(fut)
as Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), crate::worker::Error>> + Send>>
};
let (res, jobtrace) = scyqueue.read_next_values(futgen, jobtrace).await?;
Ok((res, jobtrace))
}
async fn read_next_values_2<ST>(
opts: ReadNextValuesOpts,
mut jobtrace: ReadJobTrace,
scy: Arc<Session>,
stmts: Arc<StmtsEvents>,
) -> Result<Box<dyn Events>, Error>
) -> Result<(Box<dyn Events>, ReadJobTrace), Error>
where
ST: ValTy,
{
let use_method_2 = true;
trace!("read_next_values_2 {:?} st_name {}", opts, ST::st_name());
let series = opts.series;
let ts_msp = opts.ts_msp;
@@ -429,6 +501,15 @@ where
.lsp(!opts.fwd, with_values)
.shape(ST::is_valueblob())
.st(ST::st_name())?;
let qu = {
let mut qu = qu.clone();
if qu.is_token_aware() == false {
return Err(Error::NotTokenAware);
}
qu.set_page_size(10000);
// qu.disable_paging();
qu
};
let params = (
series as i64,
ts_msp.ms() as i64,
@@ -436,14 +517,58 @@ where
ts_lsp_max.ns() as i64,
);
trace!("FWD event search params {:?}", params);
jobtrace.add_event_now(ReadEventKind::CallExecuteIter);
let mut res = scy.execute_iter(qu.clone(), params).await?;
let mut rows = Vec::new();
while let Some(x) = res.next().await {
rows.push(x?);
if use_method_2 == false {
let mut rows = Vec::new();
while let Some(x) = res.next().await {
rows.push(x?);
}
let mut last_before = None;
let ret = <ST as ValTy>::convert_rows(rows, range, ts_msp, with_values, !opts.fwd, &mut last_before)?;
ret
} else {
let mut ret = <ST as ValTy>::Container::empty();
// TODO must branch already here depending on what input columns we expect
if with_values {
if <ST as ValTy>::is_valueblob() {
let mut it = res.into_typed::<(i64, Vec<u8>)>();
while let Some(x) = it.next().await {
let row = x?;
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::from_valueblob(row.1);
ret.push(ts.ns(), 0, value);
}
ret
} else {
let mut i = 0;
let mut it = res.into_typed::<(i64, ST::ScyTy)>();
while let Some(x) = it.next().await {
let row = x?;
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::from_scyty(row.1);
ret.push(ts.ns(), 0, value);
i += 1;
if i % 2000 == 0 {
jobtrace.add_event_now(ReadEventKind::ScyllaReadRow(i));
}
}
{
jobtrace.add_event_now(ReadEventKind::ScyllaReadRowDone(i));
}
ret
}
} else {
let mut it = res.into_typed::<(i64,)>();
while let Some(x) = it.next().await {
let row = x?;
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::default();
ret.push(ts.ns(), 0, value);
}
ret
}
}
let mut last_before = None;
let ret = ST::convert_rows(rows, range, ts_msp, with_values, !opts.fwd, &mut last_before)?;
ret
} else {
let ts_lsp_max = if ts_msp.ns() < range.beg() {
range.beg().delta(ts_msp.ns())
@@ -477,7 +602,7 @@ where
};
trace!("read ts_msp {} len {}", ts_msp.fmt(), ret.len());
let ret = Box::new(ret);
Ok(ret)
Ok((ret, jobtrace))
}
fn convert_rows_0<ST: ValTy>(

View File

@@ -1,5 +1,6 @@
use super::msp::MspStreamRt;
use crate::events::read_next_values;
use crate::events::ReadJobTrace;
use crate::events::ReadNextValuesOpts;
use crate::range::ScyllaSeriesRange;
use crate::worker::ScyllaQueue;
@@ -27,12 +28,7 @@ use std::task::Poll;
#[allow(unused)]
macro_rules! trace_fetch {
($($arg:tt)*) => {
if true {
trace!($($arg)*);
}
};
}
($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
#[allow(unused)]
macro_rules! trace_emit {
@@ -90,14 +86,75 @@ pub enum Error {
Logic,
Merge(#[from] items_0::MergeError),
TruncateLogic,
AlreadyTaken,
}
struct FetchMsp {
fut: Pin<Box<dyn Future<Output = Option<Result<TsMs, crate::events2::msp::Error>>> + Send>>,
}
type ReadEventsFutOut = Result<(Box<dyn Events>, ReadJobTrace), crate::events2::events::Error>;
type FetchEventsFut = Pin<Box<dyn Future<Output = ReadEventsFutOut> + Send>>;
enum Fst<F>
where
F: Future + Unpin,
<F as Future>::Output: Unpin,
{
Ongoing(F),
Ready(<F as Future>::Output),
Taken,
}
impl<F> Fst<F>
where
F: Future + Unpin,
<F as Future>::Output: Unpin,
{
fn take_if_ready(&mut self) -> Poll<Option<<F as Future>::Output>> {
use Poll::*;
match self {
Fst::Ongoing(_) => Pending,
Fst::Ready(_) => match core::mem::replace(self, Fst::Taken) {
Fst::Ready(x) => Ready(Some(x)),
_ => panic!(),
},
Fst::Taken => Ready(None),
}
}
}
impl<F> Future for Fst<F>
where
F: Future + Unpin,
<F as Future>::Output: Unpin,
{
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
match self.as_mut().get_mut() {
Fst::Ongoing(fut) => match fut.poll_unpin(cx) {
Ready(x) => {
*self = Fst::Ready(x);
Ready(())
}
Pending => Pending,
},
Fst::Ready(_) => Ready(()),
Fst::Taken => Ready(()),
}
}
}
struct FetchEvents2 {
fut: Fst<FetchEventsFut>,
}
struct FetchEvents {
fut: Pin<Box<dyn Future<Output = Result<Box<dyn Events>, crate::events2::events::Error>> + Send>>,
a: FetchEvents2,
b: Option<FetchEvents2>,
}
enum ReadingState {
@@ -182,7 +239,9 @@ impl EventsStreamRt {
ts_msp: TsMs,
bck: bool,
scyqueue: ScyllaQueue,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>> {
jobtrace: ReadJobTrace,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), Error>> + Send>> {
trace!("make_read_events_fut --- {} ---", ts_msp);
let opts = ReadNextValuesOpts::new(
self.rt.clone(),
self.series.clone(),
@@ -203,34 +262,35 @@ impl EventsStreamRt {
scalar_type
);
let fut = async move {
let params = crate::events::ReadNextValuesParams { opts, jobtrace };
let ret = match &shape {
Shape::Scalar => match &scalar_type {
ScalarType::U8 => read_next_values::<u8>(opts).await,
ScalarType::U16 => read_next_values::<u16>(opts).await,
ScalarType::U32 => read_next_values::<u32>(opts).await,
ScalarType::U64 => read_next_values::<u64>(opts).await,
ScalarType::I8 => read_next_values::<i8>(opts).await,
ScalarType::I16 => read_next_values::<i16>(opts).await,
ScalarType::I32 => read_next_values::<i32>(opts).await,
ScalarType::I64 => read_next_values::<i64>(opts).await,
ScalarType::F32 => read_next_values::<f32>(opts).await,
ScalarType::F64 => read_next_values::<f64>(opts).await,
ScalarType::BOOL => read_next_values::<bool>(opts).await,
ScalarType::STRING => read_next_values::<String>(opts).await,
ScalarType::Enum => read_next_values::<EnumVariant>(opts).await,
ScalarType::U8 => read_next_values::<u8>(params).await,
ScalarType::U16 => read_next_values::<u16>(params).await,
ScalarType::U32 => read_next_values::<u32>(params).await,
ScalarType::U64 => read_next_values::<u64>(params).await,
ScalarType::I8 => read_next_values::<i8>(params).await,
ScalarType::I16 => read_next_values::<i16>(params).await,
ScalarType::I32 => read_next_values::<i32>(params).await,
ScalarType::I64 => read_next_values::<i64>(params).await,
ScalarType::F32 => read_next_values::<f32>(params).await,
ScalarType::F64 => read_next_values::<f64>(params).await,
ScalarType::BOOL => read_next_values::<bool>(params).await,
ScalarType::STRING => read_next_values::<String>(params).await,
ScalarType::Enum => read_next_values::<EnumVariant>(params).await,
},
Shape::Wave(_) => match &scalar_type {
ScalarType::U8 => read_next_values::<Vec<u8>>(opts).await,
ScalarType::U16 => read_next_values::<Vec<u16>>(opts).await,
ScalarType::U32 => read_next_values::<Vec<u32>>(opts).await,
ScalarType::U64 => read_next_values::<Vec<u64>>(opts).await,
ScalarType::I8 => read_next_values::<Vec<i8>>(opts).await,
ScalarType::I16 => read_next_values::<Vec<i16>>(opts).await,
ScalarType::I32 => read_next_values::<Vec<i32>>(opts).await,
ScalarType::I64 => read_next_values::<Vec<i64>>(opts).await,
ScalarType::F32 => read_next_values::<Vec<f32>>(opts).await,
ScalarType::F64 => read_next_values::<Vec<f64>>(opts).await,
ScalarType::BOOL => read_next_values::<Vec<bool>>(opts).await,
ScalarType::U8 => read_next_values::<Vec<u8>>(params).await,
ScalarType::U16 => read_next_values::<Vec<u16>>(params).await,
ScalarType::U32 => read_next_values::<Vec<u32>>(params).await,
ScalarType::U64 => read_next_values::<Vec<u64>>(params).await,
ScalarType::I8 => read_next_values::<Vec<i8>>(params).await,
ScalarType::I16 => read_next_values::<Vec<i16>>(params).await,
ScalarType::I32 => read_next_values::<Vec<i32>>(params).await,
ScalarType::I64 => read_next_values::<Vec<i64>>(params).await,
ScalarType::F32 => read_next_values::<Vec<f32>>(params).await,
ScalarType::F64 => read_next_values::<Vec<f64>>(params).await,
ScalarType::BOOL => read_next_values::<Vec<bool>>(params).await,
ScalarType::STRING => {
warn!("read not yet supported {:?} {:?}", shape, scalar_type);
err::todoval()
@@ -273,10 +333,14 @@ impl EventsStreamRt {
fn setup_bck_read(&mut self) {
if let Some(ts) = self.msp_buf_bck.pop_back() {
trace_fetch!("setup_bck_read {}", ts.fmt());
let jobtrace = ReadJobTrace::new();
let scyqueue = self.scyqueue.clone();
let fut = self.make_read_events_fut(ts, true, scyqueue);
let fut = self.make_read_events_fut(ts, true, scyqueue, jobtrace);
self.state = State::ReadingBck(ReadingBck {
reading_state: ReadingState::FetchEvents(FetchEvents { fut }),
reading_state: ReadingState::FetchEvents(FetchEvents {
a: FetchEvents2 { fut: Fst::Ongoing(fut) },
b: None,
}),
});
} else {
trace_fetch!("setup_bck_read no msp");
@@ -292,13 +356,66 @@ impl EventsStreamRt {
}
fn setup_fwd_read(&mut self) {
// TODO always try to setup all available slots.
if let Some(ts) = self.msp_buf.pop_front() {
trace_fetch!("setup_fwd_read {}", ts.fmt());
let jobtrace = ReadJobTrace::new();
let scyqueue = self.scyqueue.clone();
let fut = self.make_read_events_fut(ts, false, scyqueue);
self.state = State::ReadingFwd(ReadingFwd {
reading_state: ReadingState::FetchEvents(FetchEvents { fut }),
});
let fut = self.make_read_events_fut(ts, false, scyqueue, jobtrace);
// Assert that this fn is only called when there is at least one slot available.
// At the moment with 2 slots, this means that the 2nd is always empty.
// TODO careful in general, MUST NOT overwrite the secondary slot with None, there could be something running.
if let State::ReadingFwd(st2) = &self.state {
if let ReadingState::FetchEvents(st3) = &st2.reading_state {
if st3.b.is_some() {
panic!()
} else {
}
} else {
self.state = State::ReadingFwd(ReadingFwd {
reading_state: ReadingState::FetchEvents(FetchEvents {
a: FetchEvents2 { fut: Fst::Ongoing(fut) },
b: None,
}),
});
}
} else {
self.state = State::ReadingFwd(ReadingFwd {
reading_state: ReadingState::FetchEvents(FetchEvents {
a: FetchEvents2 { fut: Fst::Ongoing(fut) },
b: None,
}),
});
}
if let State::ReadingFwd(st2) = &self.state {
if let ReadingState::FetchEvents(st3) = &st2.reading_state {
if st3.b.is_some() {
panic!()
} else {
// Try the same with the 2nd slot
if let Some(ts) = self.msp_buf.pop_front() {
trace_fetch!("setup_fwd_read {} SECONDARY SLOT", ts.fmt());
let jobtrace = ReadJobTrace::new();
let scyqueue = self.scyqueue.clone();
let fut = self.make_read_events_fut(ts, false, scyqueue, jobtrace);
if let State::ReadingFwd(st2) = &mut self.state {
if let ReadingState::FetchEvents(st3) = &mut st2.reading_state {
if st3.b.is_some() {
panic!()
} else {
st3.b = Some(FetchEvents2 { fut: Fst::Ongoing(fut) });
}
}
}
} else {
// nothing to do
}
}
}
}
} else {
trace_fetch!("setup_fwd_read no msp");
let fut = Self::make_msp_read_fut(&mut self.msp_inp);
@@ -424,38 +541,51 @@ impl Stream for EventsStreamRt {
}
Pending => Pending,
},
ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) {
Ready(Ok(mut x)) => {
use items_2::merger::Mergeable;
trace_fetch!("ReadingBck FetchEvents got len {}", x.len());
for ts in Mergeable::tss(&x) {
trace_every_event!("ReadingBck FetchEvents ts {}", ts.fmt());
}
if let Some(ix) = Mergeable::find_highest_index_lt(&x, self.range.beg().ns()) {
trace_fetch!("ReadingBck FetchEvents find_highest_index_lt {:?}", ix);
let mut y = Mergeable::new_empty(&x);
match Mergeable::drain_into(&mut x, &mut y, (ix, 1 + ix)) {
Ok(()) => {
trace_fetch!("ReadingBck FetchEvents drained y len {:?}", y.len());
self.out.push_back(y);
self.transition_to_fwd_read();
ReadingState::FetchEvents(st2) => match st2.a.fut.poll_unpin(cx) {
Ready(()) => match st2.a.fut.take_if_ready() {
Ready(Some(x)) => match x {
Ok((mut evs, jobtrace)) => {
use items_2::merger::Mergeable;
trace!("ReadingBck {jobtrace}");
trace_fetch!("ReadingBck FetchEvents got len {}", evs.len());
for ts in Mergeable::tss(&evs) {
trace_every_event!("ReadingBck FetchEvents ts {}", ts.fmt());
}
if let Some(ix) = Mergeable::find_highest_index_lt(&evs, self.range.beg().ns()) {
trace_fetch!("ReadingBck FetchEvents find_highest_index_lt {:?}", ix);
let mut y = Mergeable::new_empty(&evs);
match Mergeable::drain_into(&mut evs, &mut y, (ix, 1 + ix)) {
Ok(()) => {
trace_fetch!("ReadingBck FetchEvents drained y len {:?}", y.len());
self.out.push_back(y);
self.transition_to_fwd_read();
continue;
}
Err(e) => {
self.state = State::Done;
Ready(Some(Err(e.into())))
}
}
} else {
trace_fetch!("ReadingBck FetchEvents find_highest_index_lt None");
self.setup_bck_read();
continue;
}
Err(e) => {
self.state = State::Done;
Ready(Some(Err(e.into())))
}
}
} else {
trace_fetch!("ReadingBck FetchEvents find_highest_index_lt None");
self.setup_bck_read();
continue;
Err(e) => {
self.state = State::Done;
Ready(Some(Err(e)))
}
},
Ready(None) => {
self.state = State::Done;
Ready(Some(Err(Error::AlreadyTaken)))
}
}
Ready(Err(e)) => {
self.state = State::Done;
Ready(Some(Err(e.into())))
}
Pending => {
self.state = State::Done;
Ready(Some(Err(Error::Logic)))
}
},
Pending => Pending,
},
},
@@ -474,23 +604,43 @@ impl Stream for EventsStreamRt {
}
Pending => Pending,
},
ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) {
Ready(Ok(x)) => {
use items_2::merger::Mergeable;
trace_fetch!("ReadingFwd FetchEvents got len {:?}", x.len());
for ts in Mergeable::tss(&x) {
trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt());
ReadingState::FetchEvents(st2) => {
let _ = st2.a.fut.poll_unpin(cx);
if let Some(st3) = st2.b.as_mut() {
let _ = st3.fut.poll_unpin(cx);
}
match st2.a.fut.take_if_ready() {
Ready(Some(x)) => {
if let Some(b) = st2.b.take() {
st2.a = b;
}
match x {
Ok((evs, mut jobtrace)) => {
jobtrace.add_event_now(crate::events::ReadEventKind::EventsStreamRtSees(
evs.len() as u32,
));
use items_2::merger::Mergeable;
trace!("ReadingFwd {jobtrace}");
for ts in Mergeable::tss(&evs) {
trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt());
}
self.out.push_back(evs);
self.setup_fwd_read();
continue;
}
Err(e) => {
self.state = State::Done;
Ready(Some(Err(e.into())))
}
}
}
self.out.push_back(x);
self.setup_fwd_read();
continue;
Ready(None) => {
self.state = State::Done;
Ready(Some(Err(Error::Logic)))
}
Pending => Pending,
}
Ready(Err(e)) => {
self.state = State::Done;
Ready(Some(Err(e.into())))
}
Pending => Pending,
},
}
},
State::InputDone => {
if self.out.len() == 0 {

View File

@@ -1,4 +1,5 @@
use crate::conn::create_scy_session_no_ks;
use crate::events::ReadJobTrace;
use crate::events2::prepare::StmtsCache;
use crate::events2::prepare::StmtsEvents;
use crate::range::ScyllaSeriesRange;
@@ -74,11 +75,12 @@ struct ReadNextValues {
dyn FnOnce(
Arc<Session>,
Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>>
ReadJobTrace,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), Error>> + Send>>
+ Send,
>,
// fut: Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>>,
tx: Sender<Result<Box<dyn Events>, Error>>,
tx: Sender<Result<(Box<dyn Events>, ReadJobTrace), Error>>,
jobtrace: ReadJobTrace,
}
impl fmt::Debug for ReadNextValues {
@@ -107,12 +109,17 @@ impl ScyllaQueue {
Ok(res)
}
pub async fn read_next_values<F>(&self, futgen: F) -> Result<Box<dyn Events>, Error>
pub async fn read_next_values<F>(
&self,
futgen: F,
jobtrace: ReadJobTrace,
) -> Result<(Box<dyn Events>, ReadJobTrace), Error>
where
F: FnOnce(
Arc<Session>,
Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>>
ReadJobTrace,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), Error>> + Send>>
+ Send
+ 'static,
{
@@ -120,6 +127,7 @@ impl ScyllaQueue {
let job = Job::ReadNextValues(ReadNextValues {
futgen: Box::new(futgen),
tx,
jobtrace,
});
self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??;
@@ -243,7 +251,7 @@ impl ScyllaWorker {
}
}
Job::ReadNextValues(job) => {
let fut = (job.futgen)(scy.clone(), stmts.clone());
let fut = (job.futgen)(scy.clone(), stmts.clone(), job.jobtrace);
let res = fut.await;
if job.tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats

View File

@@ -23,7 +23,10 @@ macro_rules! trace_emit { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
#[derive(Debug, ThisError)]
#[cstm(name = "ReadingBinnedFromEvents")]
pub enum Error {}
pub enum Error {
ExpectTimerange,
ExpectTimeweighted,
}
pub struct BinnedFromEvents {
stream: Pin<Box<dyn Stream<Item = Sitemty<BinsBoxed>> + Send>>,
@@ -37,7 +40,7 @@ impl BinnedFromEvents {
read_provider: Arc<dyn EventsReadProvider>,
) -> Result<Self, Error> {
if !evq.range().is_time() {
panic!();
return Err(Error::ExpectTimerange);
}
let stream = read_provider.read(evq);
let stream = ConvertForBinning::new(Box::pin(stream));
@@ -45,17 +48,17 @@ impl BinnedFromEvents {
let stream = Box::pin(stream);
BinnedEventsTimeweightStream::new(range, stream)
} else {
panic!("non-weighted TODO")
return Err(Error::ExpectTimeweighted);
};
let stream = stream.map(|item| match item {
Ok(x) => match x {
StreamItem::DataItem(x) => match x {
RangeCompletableItem::Data(x) => {
debug!("see item {:?}", x);
trace_emit!("see item {:?}", x);
Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))
}
RangeCompletableItem::RangeComplete => {
info!("BinnedFromEvents sees range final");
debug!("BinnedFromEvents sees range final");
Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))
}
},