diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index e36ab91..a15367c 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -103,7 +103,6 @@ where { Ongoing(F), Ready(::Output), - Taken, } impl Fst @@ -111,17 +110,6 @@ where F: Future + Unpin, ::Output: Unpin, { - fn take_if_ready(&mut self) -> Poll::Output>> { - use Poll::*; - match self { - Fst::Ongoing(_) => Pending, - Fst::Ready(_) => match core::mem::replace(self, Fst::Taken) { - Fst::Ready(x) => Ready(Some(x)), - _ => panic!(), - }, - Fst::Taken => Ready(None), - } - } } impl Future for Fst @@ -142,7 +130,6 @@ where Pending => Pending, }, Fst::Ready(_) => Ready(()), - Fst::Taken => Ready(()), } } } @@ -188,19 +175,14 @@ impl Stream for ReadQueue { for fut in self.futs.iter_mut() { let _ = fut.poll_unpin(cx); } - if let Some(Fst::Taken) = self.futs.front() { - error!("fut in queue Taken"); - panic!() - } else { - if let Some(Fst::Ready(_)) = self.futs.front() { - if let Some(Fst::Ready(k)) = self.futs.pop_front() { - Ready(Some(k)) - } else { - panic!() - } + if let Some(Fst::Ready(_)) = self.futs.front() { + if let Some(Fst::Ready(k)) = self.futs.pop_front() { + Ready(Some(k)) } else { - Pending + panic!() } + } else { + Pending } } } @@ -224,7 +206,19 @@ struct ReadingBck { } struct ReadingFwd { - reading_state: ReadingState, + msp_done: bool, + msp_fut: Option, + qu: ReadQueue, +} + +impl ReadingFwd { + fn new(qucap: usize) -> Self { + Self { + msp_done: false, + msp_fut: None, + qu: ReadQueue::new(qucap), + } + } } enum State { @@ -443,62 +437,29 @@ impl EventsStreamRt { } fn setup_fwd_read(&mut self) { - if let State::ReadingFwd(st2) = &self.state { - if let ReadingState::FetchEvents(st3) = &st2.reading_state { - if st3.qu.has_space() { - } else { - panic!() - } - } else { - self.state = State::ReadingFwd(ReadingFwd { - reading_state: ReadingState::FetchEvents(FetchEvents { - qu: ReadQueue::new(self.qucap), - }), - }); - } - } else { - self.state = State::ReadingFwd(ReadingFwd { - reading_state: ReadingState::FetchEvents(FetchEvents { - qu: ReadQueue::new(self.qucap), - }), - }); - } + self.state = State::ReadingFwd(ReadingFwd::new(self.qucap)); + } - let qu = if let State::ReadingFwd(st2) = &mut self.state { - if let ReadingState::FetchEvents(st3) = &mut st2.reading_state { - &mut st3.qu - } else { - panic!() - } - } else { - panic!() - }; - trace_fetch!("setup_fwd_read {} {} BEFORE", self.msp_buf.len(), qu.len()); - while qu.has_space() { - if let Some(ts) = self.msp_buf.pop_front() { - trace_fetch!("setup_fwd_read {} FILL A SLOT", ts.fmt()); - let jobtrace = ReadJobTrace::new(); - let scyqueue = self.scyqueue.clone(); - let rt = self.rt.clone(); - let series = self.series.clone(); - let range = self.range.clone(); - let readopts = self.readopts.clone(); - let ch_conf = self.ch_conf.clone(); - let fut = - Self::make_read_events_fut(ts, false, rt, series, range, readopts, ch_conf, scyqueue, jobtrace); - qu.push(fut); - } else { - break; - } - } - trace_fetch!("setup_fwd_read {} {} AFTER", self.msp_buf.len(), qu.len()); - if self.msp_buf.len() == 0 && qu.len() == 0 { - trace_msp_fetch!("setup_fwd_read no msp"); - let fut = Self::make_msp_read_fut(&mut self.msp_inp); - self.state = State::ReadingFwd(ReadingFwd { - reading_state: ReadingState::FetchMsp(FetchMsp { fut }), - }); - } + fn redo_fwd_read(&mut self) { + // trace_fetch!("setup_fwd_read {} {} BEFORE", self.msp_buf.len(), qu.len()); + // while qu.has_space() { + // if let Some(ts) = self.msp_buf.pop_front() { + // trace_fetch!("setup_fwd_read {} FILL A SLOT", ts.fmt()); + // let jobtrace = ReadJobTrace::new(); + // let scyqueue = self.scyqueue.clone(); + // let rt = self.rt.clone(); + // let series = self.series.clone(); + // let range = self.range.clone(); + // let readopts = self.readopts.clone(); + // let ch_conf = self.ch_conf.clone(); + // let fut = + // Self::make_read_events_fut(ts, false, rt, series, range, readopts, ch_conf, scyqueue, jobtrace); + // qu.push(fut); + // } else { + // break; + // } + // } + // trace_fetch!("setup_fwd_read {} {} AFTER", self.msp_buf.len(), qu.len()); } } @@ -579,7 +540,9 @@ impl Stream for EventsStreamRt { self.out_cnt += item.len() as u64; break Ready(Some(Ok(ChannelEvents::Events(item)))); } - break match &mut self.state { + let self2 = self.as_mut().get_mut(); + let (state, msp_buf) = (&mut self2.state, &mut self2.msp_buf); + break match state { State::Begin => { if self.readopts.one_before { trace_fetch!("State::Begin Bck"); @@ -589,10 +552,8 @@ impl Stream for EventsStreamRt { }); } else { trace_fetch!("State::Begin Fwd"); - let fut = Self::make_msp_read_fut(&mut self.msp_inp); - self.state = State::ReadingFwd(ReadingFwd { - reading_state: ReadingState::FetchMsp(FetchMsp { fut }), - }); + // let fut = Self::make_msp_read_fut(&mut self.msp_inp); + self.setup_fwd_read(); } continue; } @@ -667,50 +628,69 @@ impl Stream for EventsStreamRt { Pending => Pending, }, }, - State::ReadingFwd(st) => match &mut st.reading_state { - ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) { - Ready(Ok(a)) => { - // trace_fetch!("ReadingFwd FetchMsp {}", ts.fmt()); - for x in a { - self.msp_buf.push_back(x); + State::ReadingFwd(st) => { + let mut have_pending = false; + if let Some(fut) = st.msp_fut.as_mut() { + match fut.fut.poll_unpin(cx) { + Ready(_) => { + st.msp_fut = None; } - if self.msp_buf.len() == 0 { - self.state = State::InputDone; - continue; - } else { - self.setup_fwd_read(); - continue; + Pending => { + have_pending = true; } } - Ready(Err(e)) => Ready(Some(Err(e.into()))), - Pending => Pending, - }, - ReadingState::FetchEvents(st2) => match st2.qu.poll_next_unpin(cx) { - Ready(Some(x)) => match x { - Ok((evs, mut jobtrace)) => { - jobtrace - .add_event_now(crate::events::ReadEventKind::EventsStreamRtSees(evs.len() as u32)); - use items_2::merger::Mergeable; - trace_fetch!("ReadingFwd {jobtrace}"); - for ts in Mergeable::tss(&evs) { - trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt()); - } - self.out.push_back(evs); - self.setup_fwd_read(); - continue; - } - Err(e) => { - self.state = State::Done; - Ready(Some(Err(e.into()))) - } - }, - Ready(None) => { - self.state = State::Done; - Ready(Some(Err(Error::ReadQueueEmptyFwd))) - } - Pending => Pending, - }, - }, + } else if st.msp_done == false && msp_buf.len() < 100 { + trace_msp_fetch!("create msp read fut"); + let fut = Self::make_msp_read_fut(&mut self2.msp_inp); + // let fut = err::todoval(); + st.msp_fut = Some(FetchMsp { fut: fut }); + }; + panic!() + } + // State::ReadingFwd(st) => match &mut st.reading_state { + // ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) { + // Ready(Ok(a)) => { + // // trace_fetch!("ReadingFwd FetchMsp {}", ts.fmt()); + // for x in a { + // self.msp_buf.push_back(x); + // } + // if self.msp_buf.len() == 0 { + // self.state = State::InputDone; + // continue; + // } else { + // self.setup_fwd_read(); + // continue; + // } + // } + // Ready(Err(e)) => Ready(Some(Err(e.into()))), + // Pending => Pending, + // }, + // ReadingState::FetchEvents(st2) => match st2.qu.poll_next_unpin(cx) { + // Ready(Some(x)) => match x { + // Ok((evs, mut jobtrace)) => { + // jobtrace + // .add_event_now(crate::events::ReadEventKind::EventsStreamRtSees(evs.len() as u32)); + // use items_2::merger::Mergeable; + // trace_fetch!("ReadingFwd {jobtrace}"); + // for ts in Mergeable::tss(&evs) { + // trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt()); + // } + // self.out.push_back(evs); + // self.setup_fwd_read(); + // continue; + // } + // Err(e) => { + // self.state = State::Done; + // Ready(Some(Err(e.into()))) + // } + // }, + // Ready(None) => { + // self.state = State::Done; + // Ready(Some(Err(Error::ReadQueueEmptyFwd))) + // } + // Pending => Pending, + // }, + // }, State::InputDone => { if self.out.len() == 0 { self.state = State::Done;