Improve event read

This commit is contained in:
Dominik Werder
2024-10-30 17:57:32 +01:00
parent 2d26c5946b
commit cde2e4c1a2
2 changed files with 123 additions and 64 deletions

View File

@@ -42,7 +42,7 @@ pub async fn scylla_channel_event_stream(
evq.need_one_before_range(),
evq.need_value_data(),
evq.transform().enum_as_string().unwrap_or(false),
evq.settings().scylla_read_queue_len().unwrap_or(1),
evq.settings().scylla_read_queue_len(),
);
let stream: Pin<Box<dyn Stream<Item = _> + Send>> = if let Some(rt) = evq.use_rt() {
let x = scyllaconn::events2::events::EventsStreamRt::new(

View File

@@ -33,20 +33,17 @@ macro_rules! trace_fetch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
#[allow(unused)]
macro_rules! trace_msp_fetch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
#[allow(unused)]
macro_rules! trace_redo_fwd_read { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
#[allow(unused)]
macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
#[allow(unused)]
macro_rules! warn_item { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) }
macro_rules! trace_every_event { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
#[allow(unused)]
macro_rules! trace_every_event {
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
macro_rules! warn_item { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) }
#[derive(Debug, Clone)]
pub struct EventReadOpts {
@@ -57,12 +54,12 @@ pub struct EventReadOpts {
}
impl EventReadOpts {
pub fn new(one_before: bool, with_values: bool, enum_as_strings: bool, qucap: u32) -> Self {
pub fn new(one_before: bool, with_values: bool, enum_as_strings: bool, qucap: Option<u32>) -> Self {
Self {
one_before,
with_values,
enum_as_strings,
qucap,
qucap: qucap.unwrap_or(2),
}
}
@@ -209,14 +206,39 @@ struct ReadingFwd {
msp_done: bool,
msp_fut: Option<FetchMsp>,
qu: ReadQueue,
make_fut_info: MakeFutInfo,
}
impl ReadingFwd {
fn new(qucap: usize) -> Self {
fn new(k: &EventsStreamRt) -> Self {
Self {
msp_done: false,
msp_fut: None,
qu: ReadQueue::new(qucap),
qu: ReadQueue::new(k.qucap),
make_fut_info: MakeFutInfo::new(k),
}
}
}
#[derive(Clone)]
struct MakeFutInfo {
scyqueue: ScyllaQueue,
rt: RetentionTime,
series: SeriesId,
range: ScyllaSeriesRange,
readopts: EventReadOpts,
ch_conf: ChConf,
}
impl MakeFutInfo {
fn new(k: &EventsStreamRt) -> Self {
Self {
scyqueue: k.scyqueue.clone(),
rt: k.rt.clone(),
series: k.series.clone(),
range: k.range.clone(),
readopts: k.readopts.clone(),
ch_conf: k.ch_conf.clone(),
}
}
}
@@ -308,25 +330,12 @@ impl EventsStreamRt {
fn make_read_events_fut(
ts_msp: TsMs,
bck: bool,
rt: RetentionTime,
series: SeriesId,
range: ScyllaSeriesRange,
readopts: EventReadOpts,
ch_conf: ChConf,
scyqueue: ScyllaQueue,
mfi: MakeFutInfo,
jobtrace: ReadJobTrace,
) -> Pin<Box<dyn Future<Output = Result<(Box<dyn Events>, ReadJobTrace), Error>> + Send>> {
let opts = ReadNextValuesOpts::new(
rt.clone(),
series.clone(),
ts_msp,
range.clone(),
!bck,
readopts.clone(),
scyqueue,
);
let scalar_type = ch_conf.scalar_type().clone();
let shape = ch_conf.shape().clone();
let opts = ReadNextValuesOpts::new(mfi.rt, mfi.series, ts_msp, mfi.range, !bck, mfi.readopts, mfi.scyqueue);
let scalar_type = mfi.ch_conf.scalar_type().clone();
let shape = mfi.ch_conf.shape().clone();
trace_fetch!(
"make_read_events_fut bck {} msp {:?} {} {:?} {:?}",
bck,
@@ -411,13 +420,8 @@ impl EventsStreamRt {
if let Some(ts) = self.msp_buf_bck.pop_back() {
trace_fetch!("setup_bck_read {}", ts.fmt());
let jobtrace = ReadJobTrace::new();
let scyqueue = self.scyqueue.clone();
let rt = self.rt.clone();
let series = self.series.clone();
let range = self.range.clone();
let readopts = self.readopts.clone();
let ch_conf = self.ch_conf.clone();
let fut = Self::make_read_events_fut(ts, true, rt, series, range, readopts, ch_conf, scyqueue, jobtrace);
let mfi = MakeFutInfo::new(self);
let fut = Self::make_read_events_fut(ts, true, mfi, jobtrace);
let mut qu = ReadQueue::new(self.qucap);
qu.push(fut);
self.state = State::ReadingBck(ReadingBck {
@@ -437,29 +441,27 @@ impl EventsStreamRt {
}
fn setup_fwd_read(&mut self) {
self.state = State::ReadingFwd(ReadingFwd::new(self.qucap));
let selfname = "setup_fwd_read";
trace_fetch!("{selfname}");
self.state = State::ReadingFwd(ReadingFwd::new(self));
}
fn redo_fwd_read(&mut self) {
// trace_fetch!("setup_fwd_read {} {} BEFORE", self.msp_buf.len(), qu.len());
// while qu.has_space() {
// if let Some(ts) = self.msp_buf.pop_front() {
// trace_fetch!("setup_fwd_read {} FILL A SLOT", ts.fmt());
// let jobtrace = ReadJobTrace::new();
// let scyqueue = self.scyqueue.clone();
// let rt = self.rt.clone();
// let series = self.series.clone();
// let range = self.range.clone();
// let readopts = self.readopts.clone();
// let ch_conf = self.ch_conf.clone();
// let fut =
// Self::make_read_events_fut(ts, false, rt, series, range, readopts, ch_conf, scyqueue, jobtrace);
// qu.push(fut);
// } else {
// break;
// }
// }
// trace_fetch!("setup_fwd_read {} {} AFTER", self.msp_buf.len(), qu.len());
fn redo_fwd_read(st: &mut ReadingFwd, msp_buf: &mut VecDeque<TsMs>) {
let selfname = "redo_fwd_read";
let qu = &mut st.qu;
trace_redo_fwd_read!("{selfname} {} {} BEFORE", msp_buf.len(), qu.len());
while qu.has_space() {
if let Some(ts) = msp_buf.pop_front() {
trace_fetch!("{selfname} {} FILL A SLOT", ts.fmt());
let jobtrace = ReadJobTrace::new();
let mfi = st.make_fut_info.clone();
let fut = Self::make_read_events_fut(ts, false, mfi, jobtrace);
qu.push(fut);
} else {
break;
}
}
trace_redo_fwd_read!("{selfname} {} {} AFTER", msp_buf.len(), qu.len());
}
}
@@ -468,8 +470,12 @@ impl Stream for EventsStreamRt {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
// return Ready(Some(Err(Error::Logic)));
let mut i = 0;
loop {
i += 1;
if i > 5000 {
panic!("too many iterations")
}
if let Some(mut item) = self.out.pop_front() {
if !item.verify() {
warn_item!("{}bad item {:?}", "\n\n--------------------------\n", item);
@@ -632,8 +638,23 @@ impl Stream for EventsStreamRt {
let mut have_pending = false;
if let Some(fut) = st.msp_fut.as_mut() {
match fut.fut.poll_unpin(cx) {
Ready(_) => {
Ready(a) => {
st.msp_fut = None;
match a {
Ok(a) => {
if a.len() == 0 {
trace_fetch!("MSP INPUT DONE --------------------");
st.msp_done = true;
}
for x in a {
msp_buf.push_back(x);
}
}
Err(e) => {
self.state = State::Done;
return Ready(Some(Err(e.into())));
}
}
}
Pending => {
have_pending = true;
@@ -642,10 +663,48 @@ impl Stream for EventsStreamRt {
} else if st.msp_done == false && msp_buf.len() < 100 {
trace_msp_fetch!("create msp read fut");
let fut = Self::make_msp_read_fut(&mut self2.msp_inp);
// let fut = err::todoval();
st.msp_fut = Some(FetchMsp { fut: fut });
};
panic!()
st.msp_fut = Some(FetchMsp { fut });
} else {
// trace_fetch!("nothing to do for msp fetch");
}
if st.qu.has_space() {
Self::redo_fwd_read(st, msp_buf);
}
match st.qu.poll_next_unpin(cx) {
Ready(Some(x)) => match x {
Ok((evs, mut jobtrace)) => {
jobtrace
.add_event_now(crate::events::ReadEventKind::EventsStreamRtSees(evs.len() as u32));
use items_2::merger::Mergeable;
trace_fetch!("ReadingFwd {jobtrace}");
for ts in Mergeable::tss(&evs) {
trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt());
}
self.out.push_back(evs);
continue;
}
Err(e) => {
self.state = State::Done;
return Ready(Some(Err(e.into())));
}
},
Ready(None) => {}
Pending => {
have_pending = true;
}
}
if have_pending {
Pending
} else {
if msp_buf.len() == 0 && st.msp_done && st.qu.len() == 0 {
self.state = State::InputDone;
continue;
} else if self.out.len() != 0 {
continue;
} else {
panic!("not pending, nothing to output")
}
}
}
// State::ReadingFwd(st) => match &mut st.reading_state {
// ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) {