This commit is contained in:
Dominik Werder
2024-10-30 16:58:19 +01:00
parent b0d9d5f0a8
commit 2d26c5946b

View File

@@ -103,7 +103,6 @@ where
{
Ongoing(F),
Ready(<F as Future>::Output),
Taken,
}
impl<F> Fst<F>
@@ -111,17 +110,6 @@ where
F: Future + Unpin,
<F as Future>::Output: Unpin,
{
fn take_if_ready(&mut self) -> Poll<Option<<F as Future>::Output>> {
use Poll::*;
match self {
Fst::Ongoing(_) => Pending,
Fst::Ready(_) => match core::mem::replace(self, Fst::Taken) {
Fst::Ready(x) => Ready(Some(x)),
_ => panic!(),
},
Fst::Taken => Ready(None),
}
}
}
impl<F> Future for Fst<F>
@@ -142,7 +130,6 @@ where
Pending => Pending,
},
Fst::Ready(_) => Ready(()),
Fst::Taken => Ready(()),
}
}
}
@@ -188,19 +175,14 @@ impl Stream for ReadQueue {
for fut in self.futs.iter_mut() {
let _ = fut.poll_unpin(cx);
}
if let Some(Fst::Taken) = self.futs.front() {
error!("fut in queue Taken");
panic!()
} else {
if let Some(Fst::Ready(_)) = self.futs.front() {
if let Some(Fst::Ready(k)) = self.futs.pop_front() {
Ready(Some(k))
} else {
panic!()
}
if let Some(Fst::Ready(_)) = self.futs.front() {
if let Some(Fst::Ready(k)) = self.futs.pop_front() {
Ready(Some(k))
} else {
Pending
panic!()
}
} else {
Pending
}
}
}
@@ -224,7 +206,19 @@ struct ReadingBck {
}
struct ReadingFwd {
reading_state: ReadingState,
msp_done: bool,
msp_fut: Option<FetchMsp>,
qu: ReadQueue,
}
impl ReadingFwd {
fn new(qucap: usize) -> Self {
Self {
msp_done: false,
msp_fut: None,
qu: ReadQueue::new(qucap),
}
}
}
enum State {
@@ -443,62 +437,29 @@ impl EventsStreamRt {
}
fn setup_fwd_read(&mut self) {
if let State::ReadingFwd(st2) = &self.state {
if let ReadingState::FetchEvents(st3) = &st2.reading_state {
if st3.qu.has_space() {
} else {
panic!()
}
} else {
self.state = State::ReadingFwd(ReadingFwd {
reading_state: ReadingState::FetchEvents(FetchEvents {
qu: ReadQueue::new(self.qucap),
}),
});
}
} else {
self.state = State::ReadingFwd(ReadingFwd {
reading_state: ReadingState::FetchEvents(FetchEvents {
qu: ReadQueue::new(self.qucap),
}),
});
}
self.state = State::ReadingFwd(ReadingFwd::new(self.qucap));
}
let qu = if let State::ReadingFwd(st2) = &mut self.state {
if let ReadingState::FetchEvents(st3) = &mut st2.reading_state {
&mut st3.qu
} else {
panic!()
}
} else {
panic!()
};
trace_fetch!("setup_fwd_read {} {} BEFORE", self.msp_buf.len(), qu.len());
while qu.has_space() {
if let Some(ts) = self.msp_buf.pop_front() {
trace_fetch!("setup_fwd_read {} FILL A SLOT", ts.fmt());
let jobtrace = ReadJobTrace::new();
let scyqueue = self.scyqueue.clone();
let rt = self.rt.clone();
let series = self.series.clone();
let range = self.range.clone();
let readopts = self.readopts.clone();
let ch_conf = self.ch_conf.clone();
let fut =
Self::make_read_events_fut(ts, false, rt, series, range, readopts, ch_conf, scyqueue, jobtrace);
qu.push(fut);
} else {
break;
}
}
trace_fetch!("setup_fwd_read {} {} AFTER", self.msp_buf.len(), qu.len());
if self.msp_buf.len() == 0 && qu.len() == 0 {
trace_msp_fetch!("setup_fwd_read no msp");
let fut = Self::make_msp_read_fut(&mut self.msp_inp);
self.state = State::ReadingFwd(ReadingFwd {
reading_state: ReadingState::FetchMsp(FetchMsp { fut }),
});
}
fn redo_fwd_read(&mut self) {
// trace_fetch!("setup_fwd_read {} {} BEFORE", self.msp_buf.len(), qu.len());
// while qu.has_space() {
// if let Some(ts) = self.msp_buf.pop_front() {
// trace_fetch!("setup_fwd_read {} FILL A SLOT", ts.fmt());
// let jobtrace = ReadJobTrace::new();
// let scyqueue = self.scyqueue.clone();
// let rt = self.rt.clone();
// let series = self.series.clone();
// let range = self.range.clone();
// let readopts = self.readopts.clone();
// let ch_conf = self.ch_conf.clone();
// let fut =
// Self::make_read_events_fut(ts, false, rt, series, range, readopts, ch_conf, scyqueue, jobtrace);
// qu.push(fut);
// } else {
// break;
// }
// }
// trace_fetch!("setup_fwd_read {} {} AFTER", self.msp_buf.len(), qu.len());
}
}
@@ -579,7 +540,9 @@ impl Stream for EventsStreamRt {
self.out_cnt += item.len() as u64;
break Ready(Some(Ok(ChannelEvents::Events(item))));
}
break match &mut self.state {
let self2 = self.as_mut().get_mut();
let (state, msp_buf) = (&mut self2.state, &mut self2.msp_buf);
break match state {
State::Begin => {
if self.readopts.one_before {
trace_fetch!("State::Begin Bck");
@@ -589,10 +552,8 @@ impl Stream for EventsStreamRt {
});
} else {
trace_fetch!("State::Begin Fwd");
let fut = Self::make_msp_read_fut(&mut self.msp_inp);
self.state = State::ReadingFwd(ReadingFwd {
reading_state: ReadingState::FetchMsp(FetchMsp { fut }),
});
// let fut = Self::make_msp_read_fut(&mut self.msp_inp);
self.setup_fwd_read();
}
continue;
}
@@ -667,50 +628,69 @@ impl Stream for EventsStreamRt {
Pending => Pending,
},
},
State::ReadingFwd(st) => match &mut st.reading_state {
ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) {
Ready(Ok(a)) => {
// trace_fetch!("ReadingFwd FetchMsp {}", ts.fmt());
for x in a {
self.msp_buf.push_back(x);
State::ReadingFwd(st) => {
let mut have_pending = false;
if let Some(fut) = st.msp_fut.as_mut() {
match fut.fut.poll_unpin(cx) {
Ready(_) => {
st.msp_fut = None;
}
if self.msp_buf.len() == 0 {
self.state = State::InputDone;
continue;
} else {
self.setup_fwd_read();
continue;
Pending => {
have_pending = true;
}
}
Ready(Err(e)) => Ready(Some(Err(e.into()))),
Pending => Pending,
},
ReadingState::FetchEvents(st2) => match st2.qu.poll_next_unpin(cx) {
Ready(Some(x)) => match x {
Ok((evs, mut jobtrace)) => {
jobtrace
.add_event_now(crate::events::ReadEventKind::EventsStreamRtSees(evs.len() as u32));
use items_2::merger::Mergeable;
trace_fetch!("ReadingFwd {jobtrace}");
for ts in Mergeable::tss(&evs) {
trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt());
}
self.out.push_back(evs);
self.setup_fwd_read();
continue;
}
Err(e) => {
self.state = State::Done;
Ready(Some(Err(e.into())))
}
},
Ready(None) => {
self.state = State::Done;
Ready(Some(Err(Error::ReadQueueEmptyFwd)))
}
Pending => Pending,
},
},
} else if st.msp_done == false && msp_buf.len() < 100 {
trace_msp_fetch!("create msp read fut");
let fut = Self::make_msp_read_fut(&mut self2.msp_inp);
// let fut = err::todoval();
st.msp_fut = Some(FetchMsp { fut: fut });
};
panic!()
}
// State::ReadingFwd(st) => match &mut st.reading_state {
// ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) {
// Ready(Ok(a)) => {
// // trace_fetch!("ReadingFwd FetchMsp {}", ts.fmt());
// for x in a {
// self.msp_buf.push_back(x);
// }
// if self.msp_buf.len() == 0 {
// self.state = State::InputDone;
// continue;
// } else {
// self.setup_fwd_read();
// continue;
// }
// }
// Ready(Err(e)) => Ready(Some(Err(e.into()))),
// Pending => Pending,
// },
// ReadingState::FetchEvents(st2) => match st2.qu.poll_next_unpin(cx) {
// Ready(Some(x)) => match x {
// Ok((evs, mut jobtrace)) => {
// jobtrace
// .add_event_now(crate::events::ReadEventKind::EventsStreamRtSees(evs.len() as u32));
// use items_2::merger::Mergeable;
// trace_fetch!("ReadingFwd {jobtrace}");
// for ts in Mergeable::tss(&evs) {
// trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt());
// }
// self.out.push_back(evs);
// self.setup_fwd_read();
// continue;
// }
// Err(e) => {
// self.state = State::Done;
// Ready(Some(Err(e.into())))
// }
// },
// Ready(None) => {
// self.state = State::Done;
// Ready(Some(Err(Error::ReadQueueEmptyFwd)))
// }
// Pending => Pending,
// },
// },
State::InputDone => {
if self.out.len() == 0 {
self.state = State::Done;