More pipelined reads

This commit is contained in:
Dominik Werder
2024-10-30 16:12:53 +01:00
parent eb49ee9296
commit b0d9d5f0a8
9 changed files with 359 additions and 245 deletions

View File

@@ -157,13 +157,20 @@ impl IntoBody for ToJsonBody {
pub fn body_stream<S, I, E>(stream: S) -> StreamBody
where
S: Stream<Item = Result<I, E>> + Send + 'static,
I: Into<Bytes>,
E: fmt::Display,
I: Into<Bytes> + Send,
E: fmt::Display + Send,
{
let stream = stream.map(|x| match x {
Ok(x) => Ok(Frame::data(x.into())),
Err(_e) => Err(BodyError::Bad),
});
let stream = stream
.inspect(|x| {
if let Err(e) = x {
error!("observe error in body stream: {e}");
}
})
.take_while(|x| futures_util::future::ready(x.is_ok()))
.map(|x| match x {
Ok(x) => Ok(Frame::data(x.into())),
Err(_e) => Err(BodyError::Bad),
});
StreamBody::new(Box::pin(stream))
}

View File

@@ -1 +0,0 @@

View File

@@ -1,5 +1,4 @@
pub mod accounting;
pub mod binnedcollected;
pub mod binning;
pub mod binsdim0;
pub mod binsxbindim0;

View File

@@ -42,6 +42,7 @@ pub async fn scylla_channel_event_stream(
evq.need_one_before_range(),
evq.need_value_data(),
evq.transform().enum_as_string().unwrap_or(false),
evq.settings().scylla_read_queue_len().unwrap_or(1),
);
let stream: Pin<Box<dyn Stream<Item = _> + Send>> = if let Some(rt) = evq.use_rt() {
let x = scyllaconn::events2::events::EventsStreamRt::new(

View File

@@ -87,6 +87,8 @@ pub struct BinnedQuery {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub merger_out_len_max: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
scylla_read_queue_len: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
test_do_wasm: Option<String>,
#[serde(default)]
log_level: String,
@@ -108,6 +110,7 @@ impl BinnedQuery {
disk_stats_every: None,
timeout_content: None,
merger_out_len_max: None,
scylla_read_queue_len: None,
test_do_wasm: None,
log_level: String::new(),
use_rt: None,
@@ -164,6 +167,10 @@ impl BinnedQuery {
self.merger_out_len_max
}
pub fn scylla_read_queue_len(&self) -> Option<u32> {
self.scylla_read_queue_len
}
pub fn set_series_id(&mut self, series: u64) {
self.channel.set_series(series);
}
@@ -285,6 +292,9 @@ impl FromUrl for BinnedQuery {
merger_out_len_max: pairs
.get("mergerOutLenMax")
.map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?,
scylla_read_queue_len: pairs
.get("scyllaReadQueueLen")
.map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?,
test_do_wasm: pairs.get("testDoWasm").map(|x| String::from(x)),
log_level: pairs.get("log_level").map_or(String::new(), String::from),
use_rt: pairs.get("useRt").map_or(Ok(None), |k| {
@@ -351,6 +361,9 @@ impl AppendToUrl for BinnedQuery {
if let Some(x) = self.merger_out_len_max.as_ref() {
g.append_pair("mergerOutLenMax", &format!("{}", x));
}
if let Some(x) = self.scylla_read_queue_len.as_ref() {
g.append_pair("scyllaReadQueueLen", &x.to_string());
}
if let Some(x) = &self.test_do_wasm {
g.append_pair("testDoWasm", &x);
}

View File

@@ -60,6 +60,8 @@ pub struct PlainEventsQuery {
test_do_wasm: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
merger_out_len_max: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
scylla_read_queue_len: Option<u32>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
create_errors: Vec<String>,
#[serde(default)]
@@ -91,6 +93,7 @@ impl PlainEventsQuery {
do_test_stream_error: false,
test_do_wasm: None,
merger_out_len_max: None,
scylla_read_queue_len: None,
create_errors: Vec::new(),
log_level: String::new(),
use_rt: None,
@@ -160,6 +163,10 @@ impl PlainEventsQuery {
self.merger_out_len_max
}
pub fn scylla_read_queue_len(&self) -> Option<u32> {
self.scylla_read_queue_len
}
pub fn do_test_main_error(&self) -> bool {
self.do_test_main_error
}
@@ -292,6 +299,9 @@ impl FromUrl for PlainEventsQuery {
merger_out_len_max: pairs
.get("mergerOutLenMax")
.map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?,
scylla_read_queue_len: pairs
.get("scyllaReadQueueLen")
.map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?,
create_errors: pairs
.get("create_errors")
.map(|x| x.split(",").map(|x| x.to_string()).collect())
@@ -357,6 +367,9 @@ impl AppendToUrl for PlainEventsQuery {
if let Some(x) = self.merger_out_len_max.as_ref() {
g.append_pair("mergerOutLenMax", &x.to_string());
}
if let Some(x) = self.scylla_read_queue_len.as_ref() {
g.append_pair("scyllaReadQueueLen", &x.to_string());
}
if self.create_errors.len() != 0 {
g.append_pair("create_errors", &self.create_errors.join(","));
}
@@ -418,12 +431,17 @@ pub struct EventsSubQuerySettings {
create_errors: Vec<String>,
use_rt: Option<RetentionTime>,
merger_out_len_max: Option<u32>,
scylla_read_queue_len: Option<u32>,
}
impl EventsSubQuerySettings {
pub fn merger_out_len_max(&self) -> Option<u32> {
self.merger_out_len_max
}
pub fn scylla_read_queue_len(&self) -> Option<u32> {
self.scylla_read_queue_len
}
}
impl Default for EventsSubQuerySettings {
@@ -439,6 +457,7 @@ impl Default for EventsSubQuerySettings {
create_errors: Vec::new(),
use_rt: None,
merger_out_len_max: None,
scylla_read_queue_len: None,
}
}
}
@@ -457,6 +476,7 @@ impl From<&PlainEventsQuery> for EventsSubQuerySettings {
create_errors: value.create_errors.clone(),
use_rt: value.use_rt(),
merger_out_len_max: value.merger_out_len_max(),
scylla_read_queue_len: value.scylla_read_queue_len(),
}
}
}
@@ -476,6 +496,7 @@ impl From<&BinnedQuery> for EventsSubQuerySettings {
create_errors: Vec::new(),
use_rt: value.use_rt(),
merger_out_len_max: value.merger_out_len_max(),
scylla_read_queue_len: value.scylla_read_queue_len(),
}
}
}
@@ -495,6 +516,7 @@ impl From<&Api1Query> for EventsSubQuerySettings {
create_errors: Vec::new(),
use_rt: None,
merger_out_len_max: None,
scylla_read_queue_len: None,
}
}
}

View File

@@ -474,7 +474,7 @@ where
let ts_msp = opts.ts_msp;
let range = opts.range;
let table_name = ST::table_name();
let with_values = opts.readopts.with_values;
let with_values = opts.readopts.with_values();
if range.end() > TsNano::from_ns(i64::MAX as u64) {
return Err(Error::RangeEndOverflow);
}

View File

@@ -25,28 +25,19 @@ use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
#[allow(unused)]
macro_rules! trace_fetch {
($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
macro_rules! trace_fetch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
#[allow(unused)]
macro_rules! trace_emit {
($($arg:tt)*) => {
if true {
trace!($($arg)*);
}
};
}
macro_rules! trace_msp_fetch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
#[allow(unused)]
macro_rules! warn_item {
($($arg:tt)*) => {
if true {
debug!($($arg)*);
}
};
}
macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
#[allow(unused)]
macro_rules! warn_item { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) }
#[allow(unused)]
macro_rules! trace_every_event {
@@ -59,19 +50,25 @@ macro_rules! trace_every_event {
#[derive(Debug, Clone)]
pub struct EventReadOpts {
pub with_values: bool,
pub enum_as_strings: bool,
pub one_before: bool,
with_values: bool,
enum_as_strings: bool,
one_before: bool,
qucap: u32,
}
impl EventReadOpts {
pub fn new(one_before: bool, with_values: bool, enum_as_strings: bool) -> Self {
pub fn new(one_before: bool, with_values: bool, enum_as_strings: bool, qucap: u32) -> Self {
Self {
one_before,
with_values,
enum_as_strings,
qucap,
}
}
pub fn with_values(&self) -> bool {
self.with_values
}
}
#[derive(Debug, ThisError)]
@@ -83,6 +80,8 @@ pub enum Error {
Unordered,
OutOfRange,
BadBatch,
ReadQueueEmptyBck,
ReadQueueEmptyFwd,
Logic,
Merge(#[from] items_0::MergeError),
TruncateLogic,
@@ -90,7 +89,7 @@ pub enum Error {
}
struct FetchMsp {
fut: Pin<Box<dyn Future<Output = Option<Result<TsMs, crate::events2::msp::Error>>> + Send>>,
fut: Pin<Box<dyn Future<Output = Result<Vec<TsMs>, crate::events2::msp::Error>> + Send>>,
}
type ReadEventsFutOut = Result<(Box<dyn Events>, ReadJobTrace), crate::events2::events::Error>;
@@ -148,13 +147,71 @@ where
}
}
struct ReadQueue {
cap: usize,
futs: VecDeque<Fst<FetchEventsFut>>,
}
impl ReadQueue {
fn new(cap: usize) -> Self {
Self {
cap,
futs: VecDeque::new(),
}
}
fn cap(&self) -> usize {
self.cap
}
fn len(&self) -> usize {
self.futs.len()
}
fn has_space(&self) -> bool {
self.len() < self.cap
}
fn push(&mut self, fut: FetchEventsFut) {
self.futs.push_back(Fst::Ongoing(fut));
}
}
impl Stream for ReadQueue {
type Item = <FetchEventsFut as Future>::Output;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
if self.futs.len() == 0 {
Ready(None)
} else {
for fut in self.futs.iter_mut() {
let _ = fut.poll_unpin(cx);
}
if let Some(Fst::Taken) = self.futs.front() {
error!("fut in queue Taken");
panic!()
} else {
if let Some(Fst::Ready(_)) = self.futs.front() {
if let Some(Fst::Ready(k)) = self.futs.pop_front() {
Ready(Some(k))
} else {
panic!()
}
} else {
Pending
}
}
}
}
}
struct FetchEvents2 {
fut: Fst<FetchEventsFut>,
}
struct FetchEvents {
a: FetchEvents2,
b: Option<FetchEvents2>,
qu: ReadQueue,
}
enum ReadingState {
@@ -192,6 +249,7 @@ pub struct EventsStreamRt {
out: VecDeque<Box<dyn Events>>,
out_cnt: u64,
ts_seen_max: u64,
qucap: usize,
}
impl EventsStreamRt {
@@ -206,6 +264,7 @@ impl EventsStreamRt {
let series = SeriesId::new(ch_conf.series());
let msp_inp = crate::events2::msp::MspStreamRt::new(rt.clone(), series, range.clone(), scyqueue.clone());
Self {
qucap: readopts.qucap as usize,
rt,
ch_conf,
series,
@@ -224,35 +283,56 @@ impl EventsStreamRt {
fn make_msp_read_fut(
msp_inp: &mut MspStreamRt,
) -> Pin<Box<dyn Future<Output = Option<Result<TsMs, crate::events2::msp::Error>>> + Send>> {
) -> Pin<Box<dyn Future<Output = Result<Vec<TsMs>, crate::events2::msp::Error>> + Send>> {
trace_fetch!("make_msp_read_fut");
let msp_inp = unsafe {
let ptr = msp_inp as *mut MspStreamRt;
&mut *ptr
};
let fut = Box::pin(msp_inp.next());
let fut = async {
let cap = 128;
let mut a = Vec::with_capacity(cap);
while let Some(x) = msp_inp.next().await {
match x {
Ok(x) => {
a.push(x);
}
Err(e) => {
return Err(e);
}
}
if a.len() >= cap {
break;
}
}
Ok(a)
};
let fut = Box::pin(fut);
fut
}
fn make_read_events_fut(
&mut self,
ts_msp: TsMs,
bck: bool,
rt: RetentionTime,
series: SeriesId,
range: ScyllaSeriesRange,
readopts: EventReadOpts,
ch_conf: ChConf,
scyqueue: ScyllaQueue,
jobtrace: ReadJobTrace,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), Error>> + Send>> {
trace!("make_read_events_fut --- {} ---", ts_msp);
let opts = ReadNextValuesOpts::new(
self.rt.clone(),
self.series.clone(),
rt.clone(),
series.clone(),
ts_msp,
self.range.clone(),
range.clone(),
!bck,
self.readopts.clone(),
readopts.clone(),
scyqueue,
);
let scalar_type = self.ch_conf.scalar_type().clone();
let shape = self.ch_conf.shape().clone();
let scalar_type = ch_conf.scalar_type().clone();
let shape = ch_conf.shape().clone();
trace_fetch!(
"make_read_events_fut bck {} msp {:?} {} {:?} {:?}",
bck,
@@ -262,6 +342,9 @@ impl EventsStreamRt {
scalar_type
);
let fut = async move {
if false {
taskrun::tokio::time::sleep(Duration::from_millis(10)).await;
}
let params = crate::events::ReadNextValuesParams { opts, jobtrace };
let ret = match &shape {
Shape::Scalar => match &scalar_type {
@@ -335,12 +418,16 @@ impl EventsStreamRt {
trace_fetch!("setup_bck_read {}", ts.fmt());
let jobtrace = ReadJobTrace::new();
let scyqueue = self.scyqueue.clone();
let fut = self.make_read_events_fut(ts, true, scyqueue, jobtrace);
let rt = self.rt.clone();
let series = self.series.clone();
let range = self.range.clone();
let readopts = self.readopts.clone();
let ch_conf = self.ch_conf.clone();
let fut = Self::make_read_events_fut(ts, true, rt, series, range, readopts, ch_conf, scyqueue, jobtrace);
let mut qu = ReadQueue::new(self.qucap);
qu.push(fut);
self.state = State::ReadingBck(ReadingBck {
reading_state: ReadingState::FetchEvents(FetchEvents {
a: FetchEvents2 { fut: Fst::Ongoing(fut) },
b: None,
}),
reading_state: ReadingState::FetchEvents(FetchEvents { qu }),
});
} else {
trace_fetch!("setup_bck_read no msp");
@@ -356,68 +443,57 @@ impl EventsStreamRt {
}
fn setup_fwd_read(&mut self) {
// TODO always try to setup all available slots.
if let Some(ts) = self.msp_buf.pop_front() {
trace_fetch!("setup_fwd_read {}", ts.fmt());
let jobtrace = ReadJobTrace::new();
let scyqueue = self.scyqueue.clone();
let fut = self.make_read_events_fut(ts, false, scyqueue, jobtrace);
// Assert that this fn is only called when there is at least one slot available.
// At the moment with 2 slots, this means that the 2nd is always empty.
// TODO careful in general, MUST NOT overwrite the secondary slot with None, there could be something running.
if let State::ReadingFwd(st2) = &self.state {
if let ReadingState::FetchEvents(st3) = &st2.reading_state {
if st3.b.is_some() {
panic!()
} else {
}
if let State::ReadingFwd(st2) = &self.state {
if let ReadingState::FetchEvents(st3) = &st2.reading_state {
if st3.qu.has_space() {
} else {
self.state = State::ReadingFwd(ReadingFwd {
reading_state: ReadingState::FetchEvents(FetchEvents {
a: FetchEvents2 { fut: Fst::Ongoing(fut) },
b: None,
}),
});
panic!()
}
} else {
self.state = State::ReadingFwd(ReadingFwd {
reading_state: ReadingState::FetchEvents(FetchEvents {
a: FetchEvents2 { fut: Fst::Ongoing(fut) },
b: None,
qu: ReadQueue::new(self.qucap),
}),
});
}
} else {
self.state = State::ReadingFwd(ReadingFwd {
reading_state: ReadingState::FetchEvents(FetchEvents {
qu: ReadQueue::new(self.qucap),
}),
});
}
if let State::ReadingFwd(st2) = &self.state {
if let ReadingState::FetchEvents(st3) = &st2.reading_state {
if st3.b.is_some() {
panic!()
} else {
// Try the same with the 2nd slot
if let Some(ts) = self.msp_buf.pop_front() {
trace_fetch!("setup_fwd_read {} SECONDARY SLOT", ts.fmt());
let jobtrace = ReadJobTrace::new();
let scyqueue = self.scyqueue.clone();
let fut = self.make_read_events_fut(ts, false, scyqueue, jobtrace);
if let State::ReadingFwd(st2) = &mut self.state {
if let ReadingState::FetchEvents(st3) = &mut st2.reading_state {
if st3.b.is_some() {
panic!()
} else {
st3.b = Some(FetchEvents2 { fut: Fst::Ongoing(fut) });
}
}
}
} else {
// nothing to do
}
}
}
let qu = if let State::ReadingFwd(st2) = &mut self.state {
if let ReadingState::FetchEvents(st3) = &mut st2.reading_state {
&mut st3.qu
} else {
panic!()
}
} else {
trace_fetch!("setup_fwd_read no msp");
panic!()
};
trace_fetch!("setup_fwd_read {} {} BEFORE", self.msp_buf.len(), qu.len());
while qu.has_space() {
if let Some(ts) = self.msp_buf.pop_front() {
trace_fetch!("setup_fwd_read {} FILL A SLOT", ts.fmt());
let jobtrace = ReadJobTrace::new();
let scyqueue = self.scyqueue.clone();
let rt = self.rt.clone();
let series = self.series.clone();
let range = self.range.clone();
let readopts = self.readopts.clone();
let ch_conf = self.ch_conf.clone();
let fut =
Self::make_read_events_fut(ts, false, rt, series, range, readopts, ch_conf, scyqueue, jobtrace);
qu.push(fut);
} else {
break;
}
}
trace_fetch!("setup_fwd_read {} {} AFTER", self.msp_buf.len(), qu.len());
if self.msp_buf.len() == 0 && qu.len() == 0 {
trace_msp_fetch!("setup_fwd_read no msp");
let fut = Self::make_msp_read_fut(&mut self.msp_inp);
self.state = State::ReadingFwd(ReadingFwd {
reading_state: ReadingState::FetchMsp(FetchMsp { fut }),
@@ -431,6 +507,7 @@ impl Stream for EventsStreamRt {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
// return Ready(Some(Err(Error::Logic)));
loop {
if let Some(mut item) = self.out.pop_front() {
if !item.verify() {
@@ -521,126 +598,118 @@ impl Stream for EventsStreamRt {
}
State::ReadingBck(st) => match &mut st.reading_state {
ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) {
Ready(Some(Ok(ts))) => {
trace_fetch!("ReadingBck FetchMsp {}", ts.fmt());
self.msp_buf.push_back(ts);
if ts.ns() >= self.range.beg() {
Ready(Ok(a)) => {
// trace_fetch!("ReadingBck FetchMsp {}", a.fmt());
if a.len() == 0 {
self.transition_to_bck_read();
continue;
} else {
let fut = Self::make_msp_read_fut(&mut self.msp_inp);
self.state = State::ReadingBck(ReadingBck {
reading_state: ReadingState::FetchMsp(FetchMsp { fut }),
});
for x in a {
self.msp_buf.push_back(x);
}
if let Some(ts) = self.msp_buf.back() {
if ts.ns() >= self.range.beg() {
self.transition_to_bck_read();
} else {
let fut = Self::make_msp_read_fut(&mut self.msp_inp);
self.state = State::ReadingBck(ReadingBck {
reading_state: ReadingState::FetchMsp(FetchMsp { fut }),
});
}
} else {
panic!("absolutely nothing to read");
}
continue;
}
continue;
}
Ready(Some(Err(e))) => Ready(Some(Err(e.into()))),
Ready(None) => {
self.transition_to_bck_read();
continue;
}
Ready(Err(e)) => Ready(Some(Err(e.into()))),
Pending => Pending,
},
ReadingState::FetchEvents(st2) => match st2.a.fut.poll_unpin(cx) {
Ready(()) => match st2.a.fut.take_if_ready() {
Ready(Some(x)) => match x {
Ok((mut evs, jobtrace)) => {
use items_2::merger::Mergeable;
trace!("ReadingBck {jobtrace}");
trace_fetch!("ReadingBck FetchEvents got len {}", evs.len());
for ts in Mergeable::tss(&evs) {
trace_every_event!("ReadingBck FetchEvents ts {}", ts.fmt());
}
if let Some(ix) = Mergeable::find_highest_index_lt(&evs, self.range.beg().ns()) {
trace_fetch!("ReadingBck FetchEvents find_highest_index_lt {:?}", ix);
let mut y = Mergeable::new_empty(&evs);
match Mergeable::drain_into(&mut evs, &mut y, (ix, 1 + ix)) {
Ok(()) => {
trace_fetch!("ReadingBck FetchEvents drained y len {:?}", y.len());
self.out.push_back(y);
self.transition_to_fwd_read();
continue;
}
Err(e) => {
self.state = State::Done;
Ready(Some(Err(e.into())))
}
ReadingState::FetchEvents(st2) => match st2.qu.poll_next_unpin(cx) {
Ready(Some(x)) => match x {
Ok((mut evs, jobtrace)) => {
use items_2::merger::Mergeable;
trace_fetch!("ReadingBck {jobtrace}");
trace_fetch!("ReadingBck FetchEvents got len {}", evs.len());
for ts in Mergeable::tss(&evs) {
trace_every_event!("ReadingBck FetchEvents ts {}", ts.fmt());
}
if let Some(ix) = Mergeable::find_highest_index_lt(&evs, self.range.beg().ns()) {
trace_fetch!("ReadingBck FetchEvents find_highest_index_lt {:?}", ix);
let mut y = Mergeable::new_empty(&evs);
match Mergeable::drain_into(&mut evs, &mut y, (ix, 1 + ix)) {
Ok(()) => {
trace_fetch!("ReadingBck FetchEvents drained y len {:?}", y.len());
self.out.push_back(y);
self.transition_to_fwd_read();
continue;
}
Err(e) => {
self.state = State::Done;
Ready(Some(Err(e.into())))
}
} else {
trace_fetch!("ReadingBck FetchEvents find_highest_index_lt None");
self.setup_bck_read();
continue;
}
} else {
trace_fetch!("ReadingBck FetchEvents find_highest_index_lt None");
self.setup_bck_read();
continue;
}
Err(e) => {
self.state = State::Done;
Ready(Some(Err(e)))
}
},
Ready(None) => {
self.state = State::Done;
Ready(Some(Err(Error::AlreadyTaken)))
}
Pending => {
Err(e) => {
self.state = State::Done;
Ready(Some(Err(Error::Logic)))
Ready(Some(Err(e)))
}
},
Ready(None) => {
self.state = State::Done;
Ready(Some(Err(Error::ReadQueueEmptyBck)))
}
Pending => Pending,
},
},
State::ReadingFwd(st) => match &mut st.reading_state {
ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) {
Ready(Some(Ok(ts))) => {
trace_fetch!("ReadingFwd FetchMsp {}", ts.fmt());
self.msp_buf.push_back(ts);
self.setup_fwd_read();
continue;
Ready(Ok(a)) => {
// trace_fetch!("ReadingFwd FetchMsp {}", ts.fmt());
for x in a {
self.msp_buf.push_back(x);
}
if self.msp_buf.len() == 0 {
self.state = State::InputDone;
continue;
} else {
self.setup_fwd_read();
continue;
}
}
Ready(Some(Err(e))) => Ready(Some(Err(e.into()))),
Ready(Err(e)) => Ready(Some(Err(e.into()))),
Pending => Pending,
},
ReadingState::FetchEvents(st2) => match st2.qu.poll_next_unpin(cx) {
Ready(Some(x)) => match x {
Ok((evs, mut jobtrace)) => {
jobtrace
.add_event_now(crate::events::ReadEventKind::EventsStreamRtSees(evs.len() as u32));
use items_2::merger::Mergeable;
trace_fetch!("ReadingFwd {jobtrace}");
for ts in Mergeable::tss(&evs) {
trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt());
}
self.out.push_back(evs);
self.setup_fwd_read();
continue;
}
Err(e) => {
self.state = State::Done;
Ready(Some(Err(e.into())))
}
},
Ready(None) => {
self.state = State::InputDone;
continue;
self.state = State::Done;
Ready(Some(Err(Error::ReadQueueEmptyFwd)))
}
Pending => Pending,
},
ReadingState::FetchEvents(st2) => {
let _ = st2.a.fut.poll_unpin(cx);
if let Some(st3) = st2.b.as_mut() {
let _ = st3.fut.poll_unpin(cx);
}
match st2.a.fut.take_if_ready() {
Ready(Some(x)) => {
if let Some(b) = st2.b.take() {
st2.a = b;
}
match x {
Ok((evs, mut jobtrace)) => {
jobtrace.add_event_now(crate::events::ReadEventKind::EventsStreamRtSees(
evs.len() as u32,
));
use items_2::merger::Mergeable;
trace!("ReadingFwd {jobtrace}");
for ts in Mergeable::tss(&evs) {
trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt());
}
self.out.push_back(evs);
self.setup_fwd_read();
continue;
}
Err(e) => {
self.state = State::Done;
Ready(Some(Err(e.into())))
}
}
}
Ready(None) => {
self.state = State::Done;
Ready(Some(Err(Error::Logic)))
}
Pending => Pending,
}
}
},
State::InputDone => {
if self.out.len() == 0 {

View File

@@ -8,6 +8,7 @@ use async_channel::Sender;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::StreamExt;
use items_0::Events;
use items_2::binning::container_bins::ContainerBins;
use netpod::log::*;
@@ -205,7 +206,7 @@ impl ScyllaWorker {
scyconf_mt: ScyllaConfig,
scyconf_lt: ScyllaConfig,
) -> Result<(ScyllaQueue, Self), Error> {
let (tx, rx) = async_channel::bounded(64);
let (tx, rx) = async_channel::bounded(200);
let queue = ScyllaQueue { tx };
let worker = Self {
rx,
@@ -229,61 +230,64 @@ impl ScyllaWorker {
self.scyconf_mt.keyspace.as_str(),
self.scyconf_lt.keyspace.as_str(),
];
info!("scylla worker PREPARE START");
debug!("scylla worker prepare start");
let stmts = StmtsEvents::new(kss.try_into().map_err(|_| Error::MissingKeyspaceConfig)?, &scy).await?;
let stmts = Arc::new(stmts);
let stmts_cache = StmtsCache::new(kss[0], &scy).await?;
let stmts_cache = Arc::new(stmts_cache);
info!("scylla worker PREPARE DONE");
loop {
let x = self.rx.recv().await;
let job = match x {
Ok(x) => x,
Err(_) => {
break;
}
};
match job {
Job::FindTsMsp(rt, series, range, bck, tx) => {
let res = crate::events2::msp::find_ts_msp(&rt, series, range, bck, &stmts, &scy).await;
if tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats
debug!("scylla worker prepare done");
self.rx
.map(|job| async {
match job {
Job::FindTsMsp(rt, series, range, bck, tx) => {
let res = crate::events2::msp::find_ts_msp(&rt, series, range, bck, &stmts, &scy).await;
if tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats
}
}
Job::ReadNextValues(job) => {
let fut = (job.futgen)(scy.clone(), stmts.clone(), job.jobtrace);
let res = fut.await;
if job.tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats
}
}
Job::AccountingReadTs(rt, ts, tx) => {
let ks = match &rt {
RetentionTime::Short => &self.scyconf_st.keyspace,
RetentionTime::Medium => &self.scyconf_mt.keyspace,
RetentionTime::Long => &self.scyconf_lt.keyspace,
};
let res = crate::accounting::toplist::read_ts(&ks, rt, ts, &scy).await;
if tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats
}
}
Job::WriteCacheF32(series, bins, tx) => {
let res = super::bincache::worker_write(series, bins, &stmts_cache, &scy).await;
if tx.send(res).await.is_err() {
// TODO count for stats
}
}
Job::ReadCacheF32(job) => {
let res = super::bincache::worker_read(
job.series,
job.bin_len,
job.msp,
job.offs,
&stmts_cache,
&scy,
)
.await;
if job.tx.send(res).await.is_err() {
// TODO count for stats
}
}
}
Job::ReadNextValues(job) => {
let fut = (job.futgen)(scy.clone(), stmts.clone(), job.jobtrace);
let res = fut.await;
if job.tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats
}
}
Job::AccountingReadTs(rt, ts, tx) => {
let ks = match &rt {
RetentionTime::Short => &self.scyconf_st.keyspace,
RetentionTime::Medium => &self.scyconf_mt.keyspace,
RetentionTime::Long => &self.scyconf_lt.keyspace,
};
let res = crate::accounting::toplist::read_ts(&ks, rt, ts, &scy).await;
if tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats
}
}
Job::WriteCacheF32(series, bins, tx) => {
let res = super::bincache::worker_write(series, bins, &stmts_cache, &scy).await;
if tx.send(res).await.is_err() {
// TODO count for stats
}
}
Job::ReadCacheF32(job) => {
let res =
super::bincache::worker_read(job.series, job.bin_len, job.msp, job.offs, &stmts_cache, &scy)
.await;
if job.tx.send(res).await.is_err() {
// TODO count for stats
}
}
}
}
})
.buffer_unordered(80)
.for_each(|_| futures_util::future::ready(()))
.await;
info!("scylla worker finished");
Ok(())
}