diff --git a/crates/err/src/lib.rs b/crates/err/src/lib.rs index fa36b5b..5e401be 100644 --- a/crates/err/src/lib.rs +++ b/crates/err/src/lib.rs @@ -153,16 +153,6 @@ impl Error { pub fn reason(&self) -> Option { self.reason.clone() } - - pub fn to_public_error(&self) -> PublicError { - PublicError { - reason: self.reason(), - msg: self - .public_msg() - .map(|k| k.join("; ")) - .unwrap_or("No error message".into()), - } - } } #[allow(unused)] @@ -284,7 +274,7 @@ impl From for Error { Self { msg: String::new(), trace_str: None, - public_msg: Some(vec![k.msg().into()]), + public_msg: Some(k.msg.clone()), reason: k.reason(), parent: None, } @@ -462,7 +452,7 @@ impl From for Error { #[derive(Debug, Serialize, Deserialize)] pub struct PublicError { reason: Option, - msg: String, + msg: Vec, } impl PublicError { @@ -470,17 +460,25 @@ impl PublicError { self.reason.clone() } - pub fn msg(&self) -> &str { + pub fn msg(&self) -> &Vec { &self.msg } } -// TODO make this more useful +impl From for PublicError { + fn from(value: String) -> Self { + Self { + reason: None, + msg: vec![value], + } + } +} + impl From for PublicError { fn from(k: Error) -> Self { Self { reason: k.reason(), - msg: k.msg().into(), + msg: k.public_msg().map(Clone::clone).unwrap_or(Vec::new()), } } } @@ -489,21 +487,20 @@ impl From<&Error> for PublicError { fn from(k: &Error) -> Self { Self { reason: k.reason(), - msg: k.msg().into(), + msg: vec![k.msg().into()], } } } impl fmt::Display for PublicError { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "{}", self.msg) + write!(fmt, "{:?}", self.msg) } } impl ToPublicError for Error { - fn to_public_error(&self) -> String { - let e = PublicError::from(self); - e.msg().into() + fn to_public_error(&self) -> PublicError { + PublicError::from(self) } } @@ -520,7 +517,7 @@ pub fn todoval() -> T { } pub trait ToPublicError: std::error::Error + Send { - fn to_public_error(&self) -> String; + fn to_public_error(&self) -> PublicError; } #[cfg(test)] diff --git a/crates/httpret/src/api4/accounting.rs b/crates/httpret/src/api4/accounting.rs index a7b6b42..795593e 100644 --- a/crates/httpret/src/api4/accounting.rs +++ b/crates/httpret/src/api4/accounting.rs @@ -1,7 +1,10 @@ use crate::bodystream::response; +use crate::bodystream::ToPublicResponse; use crate::err::Error; use crate::requests::accepts_json_or_all; use crate::ReqCtx; +use err::PublicError; +use err::ToPublicError; use futures_util::StreamExt; use http::Method; use http::StatusCode; @@ -14,43 +17,33 @@ use httpclient::ToJsonBody; use items_0::Empty; use items_0::Extendable; use items_2::accounting::AccountingEvents; -use items_2::channelevents::ChannelStatusEvents; use netpod::log::*; -use netpod::query::ChannelStateEventsQuery; use netpod::req_uri_to_url; use netpod::FromUrl; use netpod::NodeConfigCached; +use query::api4::AccountingIngestedBytesQuery; pub struct AccountingIngestedBytes {} impl AccountingIngestedBytes { pub fn handler(req: &Requ) -> Option { - if req.uri().path().starts_with("/api/4/status/accounting/ingested/bytes/") { + if req.uri().path().starts_with("/api/4/accounting/ingested/bytes") { Some(Self {}) } else { None } } - pub async fn handle( - &self, - req: Requ, - _ctx: &ReqCtx, - node_config: &NodeConfigCached, - ) -> Result { + pub async fn handle(&self, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result { if req.method() == Method::GET { if accepts_json_or_all(req.headers()) { - let url = req_uri_to_url(req.uri())?; - let q = ChannelStateEventsQuery::from_url(&url)?; - match self.fetch_data(&q, node_config).await { - Ok(k) => { - let body = ToJsonBody::from(&k).into_body(); - Ok(response(StatusCode::OK).body(body)?) - } + match self.handle_get(req, ctx, ncc).await { + Ok(x) => Ok(x), Err(e) => { error!("{e}"); - Ok(response(StatusCode::INTERNAL_SERVER_ERROR) - .body(body_string(format!("{:?}", e.public_msg())))?) + let e2 = e.to_public_error(); + let s = serde_json::to_string(&e2)?; + Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(body_string(s))?) } } } else { @@ -61,12 +54,21 @@ impl AccountingIngestedBytes { } } + async fn handle_get(&self, req: Requ, ctx: &ReqCtx, ncc: &NodeConfigCached) -> Result { + let url = req_uri_to_url(req.uri())?; + let q = AccountingIngestedBytesQuery::from_url(&url)?; + let res = self.fetch_data(q, ctx, ncc).await?; + let body = ToJsonBody::from(&res).into_body(); + Ok(response(StatusCode::OK).body(body)?) + } + async fn fetch_data( &self, - q: &ChannelStateEventsQuery, - node_config: &NodeConfigCached, + q: AccountingIngestedBytesQuery, + _ctx: &ReqCtx, + ncc: &NodeConfigCached, ) -> Result { - let scyco = node_config + let scyco = ncc .node_config .cluster .scylla @@ -75,7 +77,7 @@ impl AccountingIngestedBytes { let scy = scyllaconn::conn::create_scy_session(scyco).await?; // TODO so far, we sum over everything let series_id = 0; - let mut stream = scyllaconn::accounting::AccountingStreamScylla::new(series_id, q.range().clone(), scy); + let mut stream = scyllaconn::accounting::AccountingStreamScylla::new(q.range().try_into()?, scy); let mut ret = AccountingEvents::empty(); while let Some(item) = stream.next().await { let mut item = item?; diff --git a/crates/httpret/src/api4/databuffer_tools.rs b/crates/httpret/src/api4/databuffer_tools.rs index 0c6430d..f1a83d9 100644 --- a/crates/httpret/src/api4/databuffer_tools.rs +++ b/crates/httpret/src/api4/databuffer_tools.rs @@ -4,6 +4,7 @@ use async_channel::Receiver; use async_channel::Sender; use bytes::Bytes; use err::thiserror; +use err::PublicError; use err::ThisError; use err::ToPublicError; use futures_util::Stream; @@ -41,14 +42,14 @@ pub enum FindActiveError { } impl ToPublicError for FindActiveError { - fn to_public_error(&self) -> String { + fn to_public_error(&self) -> PublicError { match self { - FindActiveError::HttpBadAccept => format!("{self}"), - FindActiveError::HttpBadUrl => format!("{self}"), + FindActiveError::HttpBadAccept => format!("{self}").into(), + FindActiveError::HttpBadUrl => format!("{self}").into(), FindActiveError::Error(e) => e.to_public_error(), - FindActiveError::UrlError(_) => format!("{self}"), - FindActiveError::InternalError => format!("{self}"), - FindActiveError::IO(_) => format!("{self}"), + FindActiveError::UrlError(_) => format!("{self}").into(), + FindActiveError::InternalError => format!("{self}").into(), + FindActiveError::IO(_) => format!("{self}").into(), } } } diff --git a/crates/httpret/src/api4/eventdata.rs b/crates/httpret/src/api4/eventdata.rs index ab007eb..36ff716 100644 --- a/crates/httpret/src/api4/eventdata.rs +++ b/crates/httpret/src/api4/eventdata.rs @@ -2,6 +2,7 @@ use crate::bodystream::response_err_msg; use crate::response; use crate::ReqCtx; use err::thiserror; +use err::PublicError; use err::ThisError; use err::ToPublicError; use http::Method; @@ -24,11 +25,11 @@ pub enum EventDataError { } impl ToPublicError for EventDataError { - fn to_public_error(&self) -> String { + fn to_public_error(&self) -> PublicError { match self { - EventDataError::QueryParse => format!("{self}"), + EventDataError::QueryParse => format!("{self}").into(), EventDataError::Error(e) => e.to_public_error(), - EventDataError::InternalError => format!("{self}"), + EventDataError::InternalError => format!("{self}").into(), } } } diff --git a/crates/httpret/src/bodystream.rs b/crates/httpret/src/bodystream.rs index 84ce003..0d49de1 100644 --- a/crates/httpret/src/bodystream.rs +++ b/crates/httpret/src/bodystream.rs @@ -1,5 +1,6 @@ use crate::err::Error; use crate::RetrievalError; +use err::ToPublicError; use http::Response; use http::StatusCode; use httpclient::body_empty; diff --git a/crates/httpret/src/err.rs b/crates/httpret/src/err.rs index 55637f7..0606233 100644 --- a/crates/httpret/src/err.rs +++ b/crates/httpret/src/err.rs @@ -1,5 +1,7 @@ +use err::ToPublicError; use serde::Deserialize; use serde::Serialize; +use serde_json::Value as JsVal; use std::fmt; use taskrun::tokio; @@ -56,6 +58,12 @@ impl fmt::Display for Error { } } +impl ToPublicError for Error { + fn to_public_error(&self) -> err::PublicError { + err::PublicError::from(&self.0) + } +} + impl std::error::Error for Error {} impl From for Error { diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs index e20c321..26fdf9d 100644 --- a/crates/httpret/src/httpret.rs +++ b/crates/httpret/src/httpret.rs @@ -341,6 +341,8 @@ async fn http_service_inner( Ok(h.handle(req, &node_config).await?) } else if let Some(h) = channelconfig::AmbigiousChannelNames::handler(&req) { Ok(h.handle(req, &node_config).await?) + } else if let Some(h) = api4::accounting::AccountingIngestedBytes::handler(&req) { + Ok(h.handle(req, ctx, &node_config).await?) } else if path == "/api/4/prebinned" { if req.method() == Method::GET { Ok(prebinned(req, ctx, &node_config).await?) diff --git a/crates/items_2/src/accounting.rs b/crates/items_2/src/accounting.rs index f8ca0a1..b3009f6 100644 --- a/crates/items_2/src/accounting.rs +++ b/crates/items_2/src/accounting.rs @@ -8,6 +8,7 @@ use std::collections::VecDeque; #[derive(Debug, Serialize, Deserialize)] pub struct AccountingEvents { pub tss: VecDeque, + pub count: VecDeque, pub bytes: VecDeque, } @@ -15,6 +16,7 @@ impl Empty for AccountingEvents { fn empty() -> Self { Self { tss: VecDeque::new(), + count: VecDeque::new(), bytes: VecDeque::new(), } } @@ -31,6 +33,8 @@ impl Extendable for AccountingEvents { use core::mem::replace; let v = replace(&mut src.tss, VecDeque::new()); self.tss.extend(v.into_iter()); + let v = replace(&mut src.count, VecDeque::new()); + self.count.extend(v.into_iter()); let v = replace(&mut src.bytes, VecDeque::new()); self.bytes.extend(v.into_iter()); } diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 811f135..48a288c 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -3397,7 +3397,11 @@ impl From<&StatusBoardEntry> for StatusBoardEntryUser { error_count: e.error_count, warn_count: e.warn_count, channel_not_found: e.channel_not_found, - errors: e.errors.iter().map(|e| e.to_public_error()).collect(), + errors: e + .errors + .iter() + .map(|e| err::ToPublicError::to_public_error(e)) + .collect(), } } } diff --git a/crates/netpod/src/range/evrange.rs b/crates/netpod/src/range/evrange.rs index 73a9625..a6e6b8c 100644 --- a/crates/netpod/src/range/evrange.rs +++ b/crates/netpod/src/range/evrange.rs @@ -1,5 +1,9 @@ +use crate::query::PulseRangeQuery; +use crate::query::TimeRangeQuery; use crate::timeunits::SEC; +use crate::AppendToUrl; use crate::Dim0Kind; +use crate::FromUrl; use crate::TsNano; use chrono::DateTime; use chrono::TimeZone; @@ -7,7 +11,9 @@ use chrono::Utc; use err::Error; use serde::Deserialize; use serde::Serialize; +use std::collections::BTreeMap; use std::fmt; +use url::Url; #[derive(Clone, Debug, Serialize, Deserialize)] pub enum TimeRange { @@ -77,7 +83,7 @@ impl TryFrom<&SeriesRange> for NanoRange { fn try_from(val: &SeriesRange) -> Result { match val { SeriesRange::TimeRange(x) => Ok(x.clone()), - SeriesRange::PulseRange(_) => Err(Error::with_msg_no_trace("not a Time range")), + SeriesRange::PulseRange(_) => Err(Error::with_public_msg_no_trace("given SeriesRange is not a time range")), } } } @@ -149,3 +155,30 @@ impl From for SeriesRange { Self::PulseRange(k) } } + +impl FromUrl for SeriesRange { + fn from_url(url: &url::Url) -> Result { + let pairs = crate::get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &BTreeMap) -> Result { + let ret = if let Ok(x) = TimeRangeQuery::from_pairs(pairs) { + SeriesRange::TimeRange(x.into()) + } else if let Ok(x) = PulseRangeQuery::from_pairs(pairs) { + SeriesRange::PulseRange(x.into()) + } else { + return Err(Error::with_public_msg_no_trace("no time range in url")); + }; + Ok(ret) + } +} + +impl AppendToUrl for SeriesRange { + fn append_to_url(&self, url: &mut Url) { + match self { + SeriesRange::TimeRange(k) => TimeRangeQuery::from(k).append_to_url(url), + SeriesRange::PulseRange(k) => PulseRangeQuery::from(k).append_to_url(url), + } + } +} diff --git a/crates/query/src/api4.rs b/crates/query/src/api4.rs index f6a084b..ee22543 100644 --- a/crates/query/src/api4.rs +++ b/crates/query/src/api4.rs @@ -3,17 +3,27 @@ pub mod events; use err::Error; use netpod::get_url_query_pairs; +use netpod::range::evrange::SeriesRange; use netpod::AppendToUrl; use netpod::FromUrl; use netpod::HasBackend; use netpod::HasTimeout; use serde::Deserialize; use serde::Serialize; +use std::collections::BTreeMap; use std::time::Duration; +use url::Url; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AccountingIngestedBytesQuery { backend: String, + range: SeriesRange, +} + +impl AccountingIngestedBytesQuery { + pub fn range(&self) -> &SeriesRange { + &self.range + } } impl HasBackend for AccountingIngestedBytesQuery { @@ -29,25 +39,29 @@ impl HasTimeout for AccountingIngestedBytesQuery { } impl FromUrl for AccountingIngestedBytesQuery { - fn from_url(url: &url::Url) -> Result { + fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); Self::from_pairs(&pairs) } - fn from_pairs(pairs: &std::collections::BTreeMap) -> Result { + fn from_pairs(pairs: &BTreeMap) -> Result { let ret = Self { backend: pairs .get("backend") .ok_or_else(|| Error::with_msg_no_trace("missing backend"))? .to_string(), + range: SeriesRange::from_pairs(pairs)?, }; Ok(ret) } } impl AppendToUrl for AccountingIngestedBytesQuery { - fn append_to_url(&self, url: &mut url::Url) { - let mut g = url.query_pairs_mut(); - g.append_pair("backend", &self.backend); + fn append_to_url(&self, url: &mut Url) { + { + let mut g = url.query_pairs_mut(); + g.append_pair("backend", &self.backend); + } + self.range.append_to_url(url); } } diff --git a/crates/query/src/api4/binned.rs b/crates/query/src/api4/binned.rs index 55609d4..9224e9a 100644 --- a/crates/query/src/api4/binned.rs +++ b/crates/query/src/api4/binned.rs @@ -171,16 +171,9 @@ impl FromUrl for BinnedQuery { } fn from_pairs(pairs: &BTreeMap) -> Result { - let range = if let Ok(x) = TimeRangeQuery::from_pairs(pairs) { - SeriesRange::TimeRange(x.into()) - } else if let Ok(x) = PulseRangeQuery::from_pairs(pairs) { - SeriesRange::PulseRange(x.into()) - } else { - return Err(Error::with_msg_no_trace("no series range in url")); - }; let ret = Self { channel: SfDbChannel::from_pairs(&pairs)?, - range, + range: SeriesRange::from_pairs(pairs)?, bin_count: pairs .get("binCount") .ok_or_else(|| Error::with_msg_no_trace("missing binCount"))? @@ -218,11 +211,8 @@ impl FromUrl for BinnedQuery { impl AppendToUrl for BinnedQuery { fn append_to_url(&self, url: &mut Url) { - match &self.range { - SeriesRange::TimeRange(k) => TimeRangeQuery::from(k).append_to_url(url), - SeriesRange::PulseRange(k) => PulseRangeQuery::from(k).append_to_url(url), - } self.channel.append_to_url(url); + self.range.append_to_url(url); { let mut g = url.query_pairs_mut(); g.append_pair("binCount", &format!("{}", self.bin_count)); diff --git a/crates/query/src/api4/events.rs b/crates/query/src/api4/events.rs index 697e8f5..719d296 100644 --- a/crates/query/src/api4/events.rs +++ b/crates/query/src/api4/events.rs @@ -198,7 +198,7 @@ impl FromUrl for PlainEventsQuery { } else if let Ok(x) = PulseRangeQuery::from_pairs(pairs) { SeriesRange::PulseRange(x.into()) } else { - return Err(Error::with_msg_no_trace("no series range in url")); + return Err(Error::with_public_msg_no_trace("no time range in url")); }; let ret = Self { channel: SfDbChannel::from_pairs(pairs)?, diff --git a/crates/scyllaconn/src/accounting.rs b/crates/scyllaconn/src/accounting.rs index 5be013e..d95ca53 100644 --- a/crates/scyllaconn/src/accounting.rs +++ b/crates/scyllaconn/src/accounting.rs @@ -3,6 +3,7 @@ use err::Error; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; +use futures_util::StreamExt; use items_0::Empty; use items_0::Extendable; use items_0::WithLen; @@ -11,6 +12,7 @@ use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::timeunits; use netpod::EMIT_ACCOUNTING_SNAP; +use scylla::prepared_statement::PreparedStatement; use scylla::Session as ScySession; use std::collections::VecDeque; use std::pin::Pin; @@ -18,73 +20,63 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; -async fn read_next(ts_msp: u64, range: NanoRange, fwd: bool, scy: Arc) -> Result { - if ts_msp >= range.end { - warn!( - "given ts_msp {} >= range.end {} not necessary to read this", - ts_msp, range.end - ); - } - if range.end > i64::MAX as u64 { - return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); - } +async fn read_next( + ts_msp: u64, + fwd: bool, + qu: PreparedStatement, + scy: Arc, +) -> Result { + type RowType = (i64, i64, i64); let mut ret = AccountingEvents::empty(); + let mut tot_count = 0; let mut tot_bytes = 0; for part in 0..255_u32 { - let res = if fwd { - let ts_lsp_min = if ts_msp < range.beg { range.beg - ts_msp } else { 0 }; - let ts_lsp_max = if ts_msp < range.end { range.end - ts_msp } else { 0 }; - trace!( - "FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} beg {} end {}", - ts_msp, - ts_lsp_min, - ts_lsp_max, - range.beg, - range.end - ); - // TODO use prepared! - let cql = concat!("select series, count, bytes from account_00 where part = ? and ts = ?"); - scy.query(cql, (part as i32, ts_msp as i64)).await.err_conv()? + let mut res = if fwd { + scy.execute_iter(qu.clone(), (part as i32, ts_msp as i64)) + .await + .err_conv()? + .into_typed::() } else { return Err(Error::with_msg_no_trace("no backward support")); }; - type RowType = (i64, i64, i64); - for row in res.rows_typed_or_empty::() { - let row = row.err_conv()?; - let ts = ts_msp; - let series = row.0 as u64; + while let Some(row) = res.next().await { + let row = row.map_err(Error::from_string)?; + let _ts = ts_msp; + let _series = row.0 as u64; let count = row.1 as u64; - let bytes = row.1 as u64; + let bytes = row.2 as u64; + tot_count += count; tot_bytes += bytes; } } ret.tss.push_back(ts_msp); + ret.count.push_back(tot_count); ret.bytes.push_back(tot_bytes); - trace!("found in total {} events ts_msp {}", ret.len(), ts_msp); Ok(ret) } struct ReadValues { - series: u64, + #[allow(unused)] range: NanoRange, ts_msps: VecDeque, fwd: bool, + #[allow(unused)] do_one_before_range: bool, fut: Pin> + Send>>, scy: Arc, + qu: PreparedStatement, } impl ReadValues { fn new( - series: u64, range: NanoRange, ts_msps: VecDeque, fwd: bool, do_one_before_range: bool, scy: Arc, + qu: PreparedStatement, ) -> Self { let mut ret = Self { - series, range, ts_msps, fwd, @@ -93,6 +85,7 @@ impl ReadValues { "future not initialized", )))), scy, + qu, }; ret.next(); ret @@ -103,49 +96,65 @@ impl ReadValues { self.fut = self.make_fut(ts_msp); true } else { - debug!("no more msp"); false } } fn make_fut(&mut self, ts_msp: u64) -> Pin> + Send>> { - debug!("make fut for {ts_msp}"); - let fut = read_next(ts_msp, self.range.clone(), self.fwd, self.scy.clone()); + let fut = read_next(ts_msp, self.fwd, self.qu.clone(), self.scy.clone()); Box::pin(fut) } } enum FrState { New, + Prepare(PrepFut), + Start, ReadValues(ReadValues), Done, } +type PrepFut = Pin> + Send>>; + pub struct AccountingStreamScylla { state: FrState, - series: u64, range: NanoRange, scy: Arc, + qu_select: Option, outbuf: AccountingEvents, + poll_count: u32, } impl AccountingStreamScylla { - pub fn new(series: u64, range: NanoRange, scy: Arc) -> Self { + pub fn new(range: NanoRange, scy: Arc) -> Self { Self { state: FrState::New, - series, range, scy, + qu_select: None, outbuf: AccountingEvents::empty(), + poll_count: 0, } } } +async fn prep(cql: &str, scy: Arc) -> Result { + scy.prepare(cql) + .await + .map_err(|e| Error::with_msg_no_trace(format!("cql error {e}"))) +} + impl Stream for AccountingStreamScylla { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + // debug!("poll {}", self.poll_count); + self.poll_count += 1; + if self.poll_count > 200000 { + debug!("abort high poll count"); + return Ready(None); + } let span = tracing::span!(tracing::Level::TRACE, "poll_next"); let _spg = span.enter(); loop { @@ -153,32 +162,59 @@ impl Stream for AccountingStreamScylla { let item = std::mem::replace(&mut self.outbuf, AccountingEvents::empty()); break Ready(Some(Ok(item))); } - break match self.state { + break match &mut self.state { FrState::New => { + let cql = concat!("select series, count, bytes from account_00 where part = ? and ts = ?"); + let fut = prep(cql, self.scy.clone()); + let fut: PrepFut = Box::pin(fut); + self.state = FrState::Prepare(fut); + continue; + } + FrState::Prepare(fut) => match fut.poll_unpin(cx) { + Ready(Ok(x)) => { + self.qu_select = Some(x); + self.state = FrState::Start; + continue; + } + Ready(Err(e)) => { + error!("{e}"); + Ready(Some(Err(Error::with_msg_no_trace("cql error")))) + } + Pending => Pending, + }, + FrState::Start => { let mut ts_msps = VecDeque::new(); let mut ts = self.range.beg / timeunits::SEC / EMIT_ACCOUNTING_SNAP * EMIT_ACCOUNTING_SNAP; - while ts < self.range.end { - debug!("use ts {ts}"); + let ts_e = self.range.end / timeunits::SEC / EMIT_ACCOUNTING_SNAP * EMIT_ACCOUNTING_SNAP; + while ts < ts_e { + if ts_msps.len() >= 100 { + debug!("too large time range requested"); + break; + } ts_msps.push_back(ts); ts += EMIT_ACCOUNTING_SNAP; } - let fwd = true; - let do_one_before_range = false; - let st = ReadValues::new( - self.series, - self.range.clone(), - ts_msps, - fwd, - do_one_before_range, - self.scy.clone(), - ); - self.state = FrState::ReadValues(st); - continue; + if ts_msps.len() == 0 { + self.state = FrState::Done; + continue; + } else { + let fwd = true; + let do_one_before_range = false; + let st = ReadValues::new( + self.range.clone(), + ts_msps, + fwd, + do_one_before_range, + self.scy.clone(), + self.qu_select.as_ref().unwrap().clone(), + ); + self.state = FrState::ReadValues(st); + continue; + } } FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) { Ready(Ok(mut item)) => { if !st.next() { - debug!("ReadValues exhausted"); self.state = FrState::Done; } self.outbuf.extend_from(&mut item);