diff --git a/crates/httpclient/src/httpclient.rs b/crates/httpclient/src/httpclient.rs index 6294cfb..efc9344 100644 --- a/crates/httpclient/src/httpclient.rs +++ b/crates/httpclient/src/httpclient.rs @@ -157,13 +157,20 @@ impl IntoBody for ToJsonBody { pub fn body_stream(stream: S) -> StreamBody where S: Stream> + Send + 'static, - I: Into, - E: fmt::Display, + I: Into + Send, + E: fmt::Display + Send, { - let stream = stream.map(|x| match x { - Ok(x) => Ok(Frame::data(x.into())), - Err(_e) => Err(BodyError::Bad), - }); + let stream = stream + .inspect(|x| { + if let Err(e) = x { + error!("observe error in body stream: {e}"); + } + }) + .take_while(|x| futures_util::future::ready(x.is_ok())) + .map(|x| match x { + Ok(x) => Ok(Frame::data(x.into())), + Err(_e) => Err(BodyError::Bad), + }); StreamBody::new(Box::pin(stream)) } diff --git a/crates/items_2/src/binnedcollected.rs b/crates/items_2/src/binnedcollected.rs deleted file mode 100644 index 8b13789..0000000 --- a/crates/items_2/src/binnedcollected.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/crates/items_2/src/items_2.rs b/crates/items_2/src/items_2.rs index d16574f..9f709f0 100644 --- a/crates/items_2/src/items_2.rs +++ b/crates/items_2/src/items_2.rs @@ -1,5 +1,4 @@ pub mod accounting; -pub mod binnedcollected; pub mod binning; pub mod binsdim0; pub mod binsxbindim0; diff --git a/crates/nodenet/src/scylla.rs b/crates/nodenet/src/scylla.rs index cfd5e12..e2988fb 100644 --- a/crates/nodenet/src/scylla.rs +++ b/crates/nodenet/src/scylla.rs @@ -42,6 +42,7 @@ pub async fn scylla_channel_event_stream( evq.need_one_before_range(), evq.need_value_data(), evq.transform().enum_as_string().unwrap_or(false), + evq.settings().scylla_read_queue_len().unwrap_or(1), ); let stream: Pin + Send>> = if let Some(rt) = evq.use_rt() { let x = scyllaconn::events2::events::EventsStreamRt::new( diff --git a/crates/query/src/api4/binned.rs b/crates/query/src/api4/binned.rs index f50c1c6..376d1d8 100644 --- a/crates/query/src/api4/binned.rs +++ b/crates/query/src/api4/binned.rs @@ -87,6 +87,8 @@ pub struct BinnedQuery { #[serde(default, skip_serializing_if = "Option::is_none")] pub merger_out_len_max: Option, #[serde(default, skip_serializing_if = "Option::is_none")] + scylla_read_queue_len: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] test_do_wasm: Option, #[serde(default)] log_level: String, @@ -108,6 +110,7 @@ impl BinnedQuery { disk_stats_every: None, timeout_content: None, merger_out_len_max: None, + scylla_read_queue_len: None, test_do_wasm: None, log_level: String::new(), use_rt: None, @@ -164,6 +167,10 @@ impl BinnedQuery { self.merger_out_len_max } + pub fn scylla_read_queue_len(&self) -> Option { + self.scylla_read_queue_len + } + pub fn set_series_id(&mut self, series: u64) { self.channel.set_series(series); } @@ -285,6 +292,9 @@ impl FromUrl for BinnedQuery { merger_out_len_max: pairs .get("mergerOutLenMax") .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + scylla_read_queue_len: pairs + .get("scyllaReadQueueLen") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, test_do_wasm: pairs.get("testDoWasm").map(|x| String::from(x)), log_level: pairs.get("log_level").map_or(String::new(), String::from), use_rt: pairs.get("useRt").map_or(Ok(None), |k| { @@ -351,6 +361,9 @@ impl AppendToUrl for BinnedQuery { if let Some(x) = self.merger_out_len_max.as_ref() { g.append_pair("mergerOutLenMax", &format!("{}", x)); } + if let Some(x) = self.scylla_read_queue_len.as_ref() { + g.append_pair("scyllaReadQueueLen", &x.to_string()); + } if let Some(x) = &self.test_do_wasm { g.append_pair("testDoWasm", &x); } diff --git a/crates/query/src/api4/events.rs b/crates/query/src/api4/events.rs index bb9d878..f317df3 100644 --- a/crates/query/src/api4/events.rs +++ b/crates/query/src/api4/events.rs @@ -60,6 +60,8 @@ pub struct PlainEventsQuery { test_do_wasm: Option, #[serde(default, skip_serializing_if = "Option::is_none")] merger_out_len_max: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + scylla_read_queue_len: Option, #[serde(default, skip_serializing_if = "Vec::is_empty")] create_errors: Vec, #[serde(default)] @@ -91,6 +93,7 @@ impl PlainEventsQuery { do_test_stream_error: false, test_do_wasm: None, merger_out_len_max: None, + scylla_read_queue_len: None, create_errors: Vec::new(), log_level: String::new(), use_rt: None, @@ -160,6 +163,10 @@ impl PlainEventsQuery { self.merger_out_len_max } + pub fn scylla_read_queue_len(&self) -> Option { + self.scylla_read_queue_len + } + pub fn do_test_main_error(&self) -> bool { self.do_test_main_error } @@ -292,6 +299,9 @@ impl FromUrl for PlainEventsQuery { merger_out_len_max: pairs .get("mergerOutLenMax") .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + scylla_read_queue_len: pairs + .get("scyllaReadQueueLen") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, create_errors: pairs .get("create_errors") .map(|x| x.split(",").map(|x| x.to_string()).collect()) @@ -357,6 +367,9 @@ impl AppendToUrl for PlainEventsQuery { if let Some(x) = self.merger_out_len_max.as_ref() { g.append_pair("mergerOutLenMax", &x.to_string()); } + if let Some(x) = self.scylla_read_queue_len.as_ref() { + g.append_pair("scyllaReadQueueLen", &x.to_string()); + } if self.create_errors.len() != 0 { g.append_pair("create_errors", &self.create_errors.join(",")); } @@ -418,12 +431,17 @@ pub struct EventsSubQuerySettings { create_errors: Vec, use_rt: Option, merger_out_len_max: Option, + scylla_read_queue_len: Option, } impl EventsSubQuerySettings { pub fn merger_out_len_max(&self) -> Option { self.merger_out_len_max } + + pub fn scylla_read_queue_len(&self) -> Option { + self.scylla_read_queue_len + } } impl Default for EventsSubQuerySettings { @@ -439,6 +457,7 @@ impl Default for EventsSubQuerySettings { create_errors: Vec::new(), use_rt: None, merger_out_len_max: None, + scylla_read_queue_len: None, } } } @@ -457,6 +476,7 @@ impl From<&PlainEventsQuery> for EventsSubQuerySettings { create_errors: value.create_errors.clone(), use_rt: value.use_rt(), merger_out_len_max: value.merger_out_len_max(), + scylla_read_queue_len: value.scylla_read_queue_len(), } } } @@ -476,6 +496,7 @@ impl From<&BinnedQuery> for EventsSubQuerySettings { create_errors: Vec::new(), use_rt: value.use_rt(), merger_out_len_max: value.merger_out_len_max(), + scylla_read_queue_len: value.scylla_read_queue_len(), } } } @@ -495,6 +516,7 @@ impl From<&Api1Query> for EventsSubQuerySettings { create_errors: Vec::new(), use_rt: None, merger_out_len_max: None, + scylla_read_queue_len: None, } } } diff --git a/crates/scyllaconn/src/events.rs b/crates/scyllaconn/src/events.rs index c15e640..07c3b7d 100644 --- a/crates/scyllaconn/src/events.rs +++ b/crates/scyllaconn/src/events.rs @@ -474,7 +474,7 @@ where let ts_msp = opts.ts_msp; let range = opts.range; let table_name = ST::table_name(); - let with_values = opts.readopts.with_values; + let with_values = opts.readopts.with_values(); if range.end() > TsNano::from_ns(i64::MAX as u64) { return Err(Error::RangeEndOverflow); } diff --git a/crates/scyllaconn/src/events2/events.rs b/crates/scyllaconn/src/events2/events.rs index 86de4d6..e36ab91 100644 --- a/crates/scyllaconn/src/events2/events.rs +++ b/crates/scyllaconn/src/events2/events.rs @@ -25,28 +25,19 @@ use std::collections::VecDeque; use std::pin::Pin; use std::task::Context; use std::task::Poll; +use std::time::Duration; #[allow(unused)] -macro_rules! trace_fetch { - ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } +macro_rules! trace_fetch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } #[allow(unused)] -macro_rules! trace_emit { - ($($arg:tt)*) => { - if true { - trace!($($arg)*); - } - }; -} +macro_rules! trace_msp_fetch { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } #[allow(unused)] -macro_rules! warn_item { - ($($arg:tt)*) => { - if true { - debug!($($arg)*); - } - }; -} +macro_rules! trace_emit { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } + +#[allow(unused)] +macro_rules! warn_item { ($($arg:tt)*) => ( if true { debug!($($arg)*); } ) } #[allow(unused)] macro_rules! trace_every_event { @@ -59,19 +50,25 @@ macro_rules! trace_every_event { #[derive(Debug, Clone)] pub struct EventReadOpts { - pub with_values: bool, - pub enum_as_strings: bool, - pub one_before: bool, + with_values: bool, + enum_as_strings: bool, + one_before: bool, + qucap: u32, } impl EventReadOpts { - pub fn new(one_before: bool, with_values: bool, enum_as_strings: bool) -> Self { + pub fn new(one_before: bool, with_values: bool, enum_as_strings: bool, qucap: u32) -> Self { Self { one_before, with_values, enum_as_strings, + qucap, } } + + pub fn with_values(&self) -> bool { + self.with_values + } } #[derive(Debug, ThisError)] @@ -83,6 +80,8 @@ pub enum Error { Unordered, OutOfRange, BadBatch, + ReadQueueEmptyBck, + ReadQueueEmptyFwd, Logic, Merge(#[from] items_0::MergeError), TruncateLogic, @@ -90,7 +89,7 @@ pub enum Error { } struct FetchMsp { - fut: Pin>> + Send>>, + fut: Pin, crate::events2::msp::Error>> + Send>>, } type ReadEventsFutOut = Result<(Box, ReadJobTrace), crate::events2::events::Error>; @@ -148,13 +147,71 @@ where } } +struct ReadQueue { + cap: usize, + futs: VecDeque>, +} + +impl ReadQueue { + fn new(cap: usize) -> Self { + Self { + cap, + futs: VecDeque::new(), + } + } + + fn cap(&self) -> usize { + self.cap + } + + fn len(&self) -> usize { + self.futs.len() + } + + fn has_space(&self) -> bool { + self.len() < self.cap + } + + fn push(&mut self, fut: FetchEventsFut) { + self.futs.push_back(Fst::Ongoing(fut)); + } +} + +impl Stream for ReadQueue { + type Item = ::Output; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + if self.futs.len() == 0 { + Ready(None) + } else { + for fut in self.futs.iter_mut() { + let _ = fut.poll_unpin(cx); + } + if let Some(Fst::Taken) = self.futs.front() { + error!("fut in queue Taken"); + panic!() + } else { + if let Some(Fst::Ready(_)) = self.futs.front() { + if let Some(Fst::Ready(k)) = self.futs.pop_front() { + Ready(Some(k)) + } else { + panic!() + } + } else { + Pending + } + } + } + } +} + struct FetchEvents2 { fut: Fst, } struct FetchEvents { - a: FetchEvents2, - b: Option, + qu: ReadQueue, } enum ReadingState { @@ -192,6 +249,7 @@ pub struct EventsStreamRt { out: VecDeque>, out_cnt: u64, ts_seen_max: u64, + qucap: usize, } impl EventsStreamRt { @@ -206,6 +264,7 @@ impl EventsStreamRt { let series = SeriesId::new(ch_conf.series()); let msp_inp = crate::events2::msp::MspStreamRt::new(rt.clone(), series, range.clone(), scyqueue.clone()); Self { + qucap: readopts.qucap as usize, rt, ch_conf, series, @@ -224,35 +283,56 @@ impl EventsStreamRt { fn make_msp_read_fut( msp_inp: &mut MspStreamRt, - ) -> Pin>> + Send>> { + ) -> Pin, crate::events2::msp::Error>> + Send>> { trace_fetch!("make_msp_read_fut"); let msp_inp = unsafe { let ptr = msp_inp as *mut MspStreamRt; &mut *ptr }; - let fut = Box::pin(msp_inp.next()); + let fut = async { + let cap = 128; + let mut a = Vec::with_capacity(cap); + while let Some(x) = msp_inp.next().await { + match x { + Ok(x) => { + a.push(x); + } + Err(e) => { + return Err(e); + } + } + if a.len() >= cap { + break; + } + } + Ok(a) + }; + let fut = Box::pin(fut); fut } fn make_read_events_fut( - &mut self, ts_msp: TsMs, bck: bool, + rt: RetentionTime, + series: SeriesId, + range: ScyllaSeriesRange, + readopts: EventReadOpts, + ch_conf: ChConf, scyqueue: ScyllaQueue, jobtrace: ReadJobTrace, ) -> Pin, ReadJobTrace), Error>> + Send>> { - trace!("make_read_events_fut --- {} ---", ts_msp); let opts = ReadNextValuesOpts::new( - self.rt.clone(), - self.series.clone(), + rt.clone(), + series.clone(), ts_msp, - self.range.clone(), + range.clone(), !bck, - self.readopts.clone(), + readopts.clone(), scyqueue, ); - let scalar_type = self.ch_conf.scalar_type().clone(); - let shape = self.ch_conf.shape().clone(); + let scalar_type = ch_conf.scalar_type().clone(); + let shape = ch_conf.shape().clone(); trace_fetch!( "make_read_events_fut bck {} msp {:?} {} {:?} {:?}", bck, @@ -262,6 +342,9 @@ impl EventsStreamRt { scalar_type ); let fut = async move { + if false { + taskrun::tokio::time::sleep(Duration::from_millis(10)).await; + } let params = crate::events::ReadNextValuesParams { opts, jobtrace }; let ret = match &shape { Shape::Scalar => match &scalar_type { @@ -335,12 +418,16 @@ impl EventsStreamRt { trace_fetch!("setup_bck_read {}", ts.fmt()); let jobtrace = ReadJobTrace::new(); let scyqueue = self.scyqueue.clone(); - let fut = self.make_read_events_fut(ts, true, scyqueue, jobtrace); + let rt = self.rt.clone(); + let series = self.series.clone(); + let range = self.range.clone(); + let readopts = self.readopts.clone(); + let ch_conf = self.ch_conf.clone(); + let fut = Self::make_read_events_fut(ts, true, rt, series, range, readopts, ch_conf, scyqueue, jobtrace); + let mut qu = ReadQueue::new(self.qucap); + qu.push(fut); self.state = State::ReadingBck(ReadingBck { - reading_state: ReadingState::FetchEvents(FetchEvents { - a: FetchEvents2 { fut: Fst::Ongoing(fut) }, - b: None, - }), + reading_state: ReadingState::FetchEvents(FetchEvents { qu }), }); } else { trace_fetch!("setup_bck_read no msp"); @@ -356,68 +443,57 @@ impl EventsStreamRt { } fn setup_fwd_read(&mut self) { - // TODO always try to setup all available slots. - if let Some(ts) = self.msp_buf.pop_front() { - trace_fetch!("setup_fwd_read {}", ts.fmt()); - let jobtrace = ReadJobTrace::new(); - let scyqueue = self.scyqueue.clone(); - let fut = self.make_read_events_fut(ts, false, scyqueue, jobtrace); - - // Assert that this fn is only called when there is at least one slot available. - // At the moment with 2 slots, this means that the 2nd is always empty. - // TODO careful in general, MUST NOT overwrite the secondary slot with None, there could be something running. - - if let State::ReadingFwd(st2) = &self.state { - if let ReadingState::FetchEvents(st3) = &st2.reading_state { - if st3.b.is_some() { - panic!() - } else { - } + if let State::ReadingFwd(st2) = &self.state { + if let ReadingState::FetchEvents(st3) = &st2.reading_state { + if st3.qu.has_space() { } else { - self.state = State::ReadingFwd(ReadingFwd { - reading_state: ReadingState::FetchEvents(FetchEvents { - a: FetchEvents2 { fut: Fst::Ongoing(fut) }, - b: None, - }), - }); + panic!() } } else { self.state = State::ReadingFwd(ReadingFwd { reading_state: ReadingState::FetchEvents(FetchEvents { - a: FetchEvents2 { fut: Fst::Ongoing(fut) }, - b: None, + qu: ReadQueue::new(self.qucap), }), }); } + } else { + self.state = State::ReadingFwd(ReadingFwd { + reading_state: ReadingState::FetchEvents(FetchEvents { + qu: ReadQueue::new(self.qucap), + }), + }); + } - if let State::ReadingFwd(st2) = &self.state { - if let ReadingState::FetchEvents(st3) = &st2.reading_state { - if st3.b.is_some() { - panic!() - } else { - // Try the same with the 2nd slot - if let Some(ts) = self.msp_buf.pop_front() { - trace_fetch!("setup_fwd_read {} SECONDARY SLOT", ts.fmt()); - let jobtrace = ReadJobTrace::new(); - let scyqueue = self.scyqueue.clone(); - let fut = self.make_read_events_fut(ts, false, scyqueue, jobtrace); - if let State::ReadingFwd(st2) = &mut self.state { - if let ReadingState::FetchEvents(st3) = &mut st2.reading_state { - if st3.b.is_some() { - panic!() - } else { - st3.b = Some(FetchEvents2 { fut: Fst::Ongoing(fut) }); - } - } - } - } else { - // nothing to do - } - } - } + let qu = if let State::ReadingFwd(st2) = &mut self.state { + if let ReadingState::FetchEvents(st3) = &mut st2.reading_state { + &mut st3.qu + } else { + panic!() } } else { - trace_fetch!("setup_fwd_read no msp"); + panic!() + }; + trace_fetch!("setup_fwd_read {} {} BEFORE", self.msp_buf.len(), qu.len()); + while qu.has_space() { + if let Some(ts) = self.msp_buf.pop_front() { + trace_fetch!("setup_fwd_read {} FILL A SLOT", ts.fmt()); + let jobtrace = ReadJobTrace::new(); + let scyqueue = self.scyqueue.clone(); + let rt = self.rt.clone(); + let series = self.series.clone(); + let range = self.range.clone(); + let readopts = self.readopts.clone(); + let ch_conf = self.ch_conf.clone(); + let fut = + Self::make_read_events_fut(ts, false, rt, series, range, readopts, ch_conf, scyqueue, jobtrace); + qu.push(fut); + } else { + break; + } + } + trace_fetch!("setup_fwd_read {} {} AFTER", self.msp_buf.len(), qu.len()); + if self.msp_buf.len() == 0 && qu.len() == 0 { + trace_msp_fetch!("setup_fwd_read no msp"); let fut = Self::make_msp_read_fut(&mut self.msp_inp); self.state = State::ReadingFwd(ReadingFwd { reading_state: ReadingState::FetchMsp(FetchMsp { fut }), @@ -431,6 +507,7 @@ impl Stream for EventsStreamRt { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + // return Ready(Some(Err(Error::Logic))); loop { if let Some(mut item) = self.out.pop_front() { if !item.verify() { @@ -521,126 +598,118 @@ impl Stream for EventsStreamRt { } State::ReadingBck(st) => match &mut st.reading_state { ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) { - Ready(Some(Ok(ts))) => { - trace_fetch!("ReadingBck FetchMsp {}", ts.fmt()); - self.msp_buf.push_back(ts); - if ts.ns() >= self.range.beg() { + Ready(Ok(a)) => { + // trace_fetch!("ReadingBck FetchMsp {}", a.fmt()); + if a.len() == 0 { self.transition_to_bck_read(); + continue; } else { - let fut = Self::make_msp_read_fut(&mut self.msp_inp); - self.state = State::ReadingBck(ReadingBck { - reading_state: ReadingState::FetchMsp(FetchMsp { fut }), - }); + for x in a { + self.msp_buf.push_back(x); + } + if let Some(ts) = self.msp_buf.back() { + if ts.ns() >= self.range.beg() { + self.transition_to_bck_read(); + } else { + let fut = Self::make_msp_read_fut(&mut self.msp_inp); + self.state = State::ReadingBck(ReadingBck { + reading_state: ReadingState::FetchMsp(FetchMsp { fut }), + }); + } + } else { + panic!("absolutely nothing to read"); + } + continue; } - continue; - } - Ready(Some(Err(e))) => Ready(Some(Err(e.into()))), - Ready(None) => { - self.transition_to_bck_read(); - continue; } + Ready(Err(e)) => Ready(Some(Err(e.into()))), Pending => Pending, }, - ReadingState::FetchEvents(st2) => match st2.a.fut.poll_unpin(cx) { - Ready(()) => match st2.a.fut.take_if_ready() { - Ready(Some(x)) => match x { - Ok((mut evs, jobtrace)) => { - use items_2::merger::Mergeable; - trace!("ReadingBck {jobtrace}"); - trace_fetch!("ReadingBck FetchEvents got len {}", evs.len()); - for ts in Mergeable::tss(&evs) { - trace_every_event!("ReadingBck FetchEvents ts {}", ts.fmt()); - } - if let Some(ix) = Mergeable::find_highest_index_lt(&evs, self.range.beg().ns()) { - trace_fetch!("ReadingBck FetchEvents find_highest_index_lt {:?}", ix); - let mut y = Mergeable::new_empty(&evs); - match Mergeable::drain_into(&mut evs, &mut y, (ix, 1 + ix)) { - Ok(()) => { - trace_fetch!("ReadingBck FetchEvents drained y len {:?}", y.len()); - self.out.push_back(y); - self.transition_to_fwd_read(); - continue; - } - Err(e) => { - self.state = State::Done; - Ready(Some(Err(e.into()))) - } + ReadingState::FetchEvents(st2) => match st2.qu.poll_next_unpin(cx) { + Ready(Some(x)) => match x { + Ok((mut evs, jobtrace)) => { + use items_2::merger::Mergeable; + trace_fetch!("ReadingBck {jobtrace}"); + trace_fetch!("ReadingBck FetchEvents got len {}", evs.len()); + for ts in Mergeable::tss(&evs) { + trace_every_event!("ReadingBck FetchEvents ts {}", ts.fmt()); + } + if let Some(ix) = Mergeable::find_highest_index_lt(&evs, self.range.beg().ns()) { + trace_fetch!("ReadingBck FetchEvents find_highest_index_lt {:?}", ix); + let mut y = Mergeable::new_empty(&evs); + match Mergeable::drain_into(&mut evs, &mut y, (ix, 1 + ix)) { + Ok(()) => { + trace_fetch!("ReadingBck FetchEvents drained y len {:?}", y.len()); + self.out.push_back(y); + self.transition_to_fwd_read(); + continue; + } + Err(e) => { + self.state = State::Done; + Ready(Some(Err(e.into()))) } - } else { - trace_fetch!("ReadingBck FetchEvents find_highest_index_lt None"); - self.setup_bck_read(); - continue; } + } else { + trace_fetch!("ReadingBck FetchEvents find_highest_index_lt None"); + self.setup_bck_read(); + continue; } - Err(e) => { - self.state = State::Done; - Ready(Some(Err(e))) - } - }, - Ready(None) => { - self.state = State::Done; - Ready(Some(Err(Error::AlreadyTaken))) } - Pending => { + Err(e) => { self.state = State::Done; - Ready(Some(Err(Error::Logic))) + Ready(Some(Err(e))) } }, + Ready(None) => { + self.state = State::Done; + Ready(Some(Err(Error::ReadQueueEmptyBck))) + } Pending => Pending, }, }, State::ReadingFwd(st) => match &mut st.reading_state { ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) { - Ready(Some(Ok(ts))) => { - trace_fetch!("ReadingFwd FetchMsp {}", ts.fmt()); - self.msp_buf.push_back(ts); - self.setup_fwd_read(); - continue; + Ready(Ok(a)) => { + // trace_fetch!("ReadingFwd FetchMsp {}", ts.fmt()); + for x in a { + self.msp_buf.push_back(x); + } + if self.msp_buf.len() == 0 { + self.state = State::InputDone; + continue; + } else { + self.setup_fwd_read(); + continue; + } } - Ready(Some(Err(e))) => Ready(Some(Err(e.into()))), + Ready(Err(e)) => Ready(Some(Err(e.into()))), + Pending => Pending, + }, + ReadingState::FetchEvents(st2) => match st2.qu.poll_next_unpin(cx) { + Ready(Some(x)) => match x { + Ok((evs, mut jobtrace)) => { + jobtrace + .add_event_now(crate::events::ReadEventKind::EventsStreamRtSees(evs.len() as u32)); + use items_2::merger::Mergeable; + trace_fetch!("ReadingFwd {jobtrace}"); + for ts in Mergeable::tss(&evs) { + trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt()); + } + self.out.push_back(evs); + self.setup_fwd_read(); + continue; + } + Err(e) => { + self.state = State::Done; + Ready(Some(Err(e.into()))) + } + }, Ready(None) => { - self.state = State::InputDone; - continue; + self.state = State::Done; + Ready(Some(Err(Error::ReadQueueEmptyFwd))) } Pending => Pending, }, - ReadingState::FetchEvents(st2) => { - let _ = st2.a.fut.poll_unpin(cx); - if let Some(st3) = st2.b.as_mut() { - let _ = st3.fut.poll_unpin(cx); - } - match st2.a.fut.take_if_ready() { - Ready(Some(x)) => { - if let Some(b) = st2.b.take() { - st2.a = b; - } - match x { - Ok((evs, mut jobtrace)) => { - jobtrace.add_event_now(crate::events::ReadEventKind::EventsStreamRtSees( - evs.len() as u32, - )); - use items_2::merger::Mergeable; - trace!("ReadingFwd {jobtrace}"); - for ts in Mergeable::tss(&evs) { - trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt()); - } - self.out.push_back(evs); - self.setup_fwd_read(); - continue; - } - Err(e) => { - self.state = State::Done; - Ready(Some(Err(e.into()))) - } - } - } - Ready(None) => { - self.state = State::Done; - Ready(Some(Err(Error::Logic))) - } - Pending => Pending, - } - } }, State::InputDone => { if self.out.len() == 0 { diff --git a/crates/scyllaconn/src/worker.rs b/crates/scyllaconn/src/worker.rs index bf518fd..dad55f8 100644 --- a/crates/scyllaconn/src/worker.rs +++ b/crates/scyllaconn/src/worker.rs @@ -8,6 +8,7 @@ use async_channel::Sender; use err::thiserror; use err::ThisError; use futures_util::Future; +use futures_util::StreamExt; use items_0::Events; use items_2::binning::container_bins::ContainerBins; use netpod::log::*; @@ -205,7 +206,7 @@ impl ScyllaWorker { scyconf_mt: ScyllaConfig, scyconf_lt: ScyllaConfig, ) -> Result<(ScyllaQueue, Self), Error> { - let (tx, rx) = async_channel::bounded(64); + let (tx, rx) = async_channel::bounded(200); let queue = ScyllaQueue { tx }; let worker = Self { rx, @@ -229,61 +230,64 @@ impl ScyllaWorker { self.scyconf_mt.keyspace.as_str(), self.scyconf_lt.keyspace.as_str(), ]; - info!("scylla worker PREPARE START"); + debug!("scylla worker prepare start"); let stmts = StmtsEvents::new(kss.try_into().map_err(|_| Error::MissingKeyspaceConfig)?, &scy).await?; let stmts = Arc::new(stmts); let stmts_cache = StmtsCache::new(kss[0], &scy).await?; let stmts_cache = Arc::new(stmts_cache); - info!("scylla worker PREPARE DONE"); - loop { - let x = self.rx.recv().await; - let job = match x { - Ok(x) => x, - Err(_) => { - break; - } - }; - match job { - Job::FindTsMsp(rt, series, range, bck, tx) => { - let res = crate::events2::msp::find_ts_msp(&rt, series, range, bck, &stmts, &scy).await; - if tx.send(res.map_err(Into::into)).await.is_err() { - // TODO count for stats + debug!("scylla worker prepare done"); + self.rx + .map(|job| async { + match job { + Job::FindTsMsp(rt, series, range, bck, tx) => { + let res = crate::events2::msp::find_ts_msp(&rt, series, range, bck, &stmts, &scy).await; + if tx.send(res.map_err(Into::into)).await.is_err() { + // TODO count for stats + } + } + Job::ReadNextValues(job) => { + let fut = (job.futgen)(scy.clone(), stmts.clone(), job.jobtrace); + let res = fut.await; + if job.tx.send(res.map_err(Into::into)).await.is_err() { + // TODO count for stats + } + } + Job::AccountingReadTs(rt, ts, tx) => { + let ks = match &rt { + RetentionTime::Short => &self.scyconf_st.keyspace, + RetentionTime::Medium => &self.scyconf_mt.keyspace, + RetentionTime::Long => &self.scyconf_lt.keyspace, + }; + let res = crate::accounting::toplist::read_ts(&ks, rt, ts, &scy).await; + if tx.send(res.map_err(Into::into)).await.is_err() { + // TODO count for stats + } + } + Job::WriteCacheF32(series, bins, tx) => { + let res = super::bincache::worker_write(series, bins, &stmts_cache, &scy).await; + if tx.send(res).await.is_err() { + // TODO count for stats + } + } + Job::ReadCacheF32(job) => { + let res = super::bincache::worker_read( + job.series, + job.bin_len, + job.msp, + job.offs, + &stmts_cache, + &scy, + ) + .await; + if job.tx.send(res).await.is_err() { + // TODO count for stats + } } } - Job::ReadNextValues(job) => { - let fut = (job.futgen)(scy.clone(), stmts.clone(), job.jobtrace); - let res = fut.await; - if job.tx.send(res.map_err(Into::into)).await.is_err() { - // TODO count for stats - } - } - Job::AccountingReadTs(rt, ts, tx) => { - let ks = match &rt { - RetentionTime::Short => &self.scyconf_st.keyspace, - RetentionTime::Medium => &self.scyconf_mt.keyspace, - RetentionTime::Long => &self.scyconf_lt.keyspace, - }; - let res = crate::accounting::toplist::read_ts(&ks, rt, ts, &scy).await; - if tx.send(res.map_err(Into::into)).await.is_err() { - // TODO count for stats - } - } - Job::WriteCacheF32(series, bins, tx) => { - let res = super::bincache::worker_write(series, bins, &stmts_cache, &scy).await; - if tx.send(res).await.is_err() { - // TODO count for stats - } - } - Job::ReadCacheF32(job) => { - let res = - super::bincache::worker_read(job.series, job.bin_len, job.msp, job.offs, &stmts_cache, &scy) - .await; - if job.tx.send(res).await.is_err() { - // TODO count for stats - } - } - } - } + }) + .buffer_unordered(80) + .for_each(|_| futures_util::future::ready(())) + .await; info!("scylla worker finished"); Ok(()) }