Fixes and timeout handler

This commit is contained in:
Dominik Werder
2022-09-08 18:19:53 +02:00
parent eff80450ee
commit 62a1be0c4b
10 changed files with 275 additions and 192 deletions

View File

@@ -612,7 +612,7 @@ pub async fn channel_state_events(
ret.push((ts, kind));
}
}
ts_msp += DAY;
ts_msp += div;
if ts_msp >= evq.range().end {
break;
}

View File

@@ -7,11 +7,65 @@ use netpod::query::ChannelStateEventsQuery;
use netpod::{FromUrl, NodeConfigCached, ACCEPT_ALL, APP_JSON};
use url::Url;
pub struct ChannelStatusConnectionEvents {}
pub struct ConnectionStatusEvents {}
impl ChannelStatusConnectionEvents {
impl ConnectionStatusEvents {
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path() == "/api/4/channel/status/connection/events" {
if req.uri().path() == "/api/4/scylla/connection/status/events" {
Some(Self {})
} else {
None
}
}
pub async fn handle(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
.headers()
.get(http::header::ACCEPT)
.map_or(accept_def, |k| k.to_str().unwrap_or(accept_def));
if accept == APP_JSON || accept == ACCEPT_ALL {
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let q = ChannelStateEventsQuery::from_url(&url)?;
match self.fetch_data(&q, node_config).await {
Ok(k) => {
let body = Body::from(serde_json::to_vec(&k)?);
Ok(response(StatusCode::OK).body(body)?)
}
Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("{:?}", e.public_msg())))?),
}
} else {
Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?)
}
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
}
}
async fn fetch_data(
&self,
q: &ChannelStateEventsQuery,
node_config: &NodeConfigCached,
) -> Result<Vec<(u64, u32)>, Error> {
let dbconf = &node_config.node_config.cluster.database;
let scyco = node_config
.node_config
.cluster
.scylla
.as_ref()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?;
let ret = channel_state_events(q, scyco, dbconf.clone()).await?;
Ok(ret)
}
}
pub struct ChannelConnectionStatusEvents {}
impl ChannelConnectionStatusEvents {
pub fn handler(req: &Request<Body>) -> Option<Self> {
if req.uri().path() == "/api/4/scylla/channel/connection/status/events" {
Some(Self {})
} else {
None

View File

@@ -4,7 +4,7 @@ use crate::{response, response_err, BodyStream, ToPublicResponse};
use futures_util::{Stream, StreamExt, TryStreamExt};
use http::{Method, Request, Response, StatusCode};
use hyper::Body;
use items_2::{binned_collected, ChannelEvents, ChannelEventsMerger};
use items_2::{binned_collected, empty_events_dyn, empty_events_dyn_2, ChannelEvents, ChannelEventsMerger};
use netpod::log::*;
use netpod::query::{BinnedQuery, ChannelStateEventsQuery, PlainEventsQuery};
use netpod::{AggKind, BinnedRange, FromUrl, NodeConfigCached};
@@ -14,6 +14,7 @@ use scyllaconn::errconv::ErrConv;
use scyllaconn::events::{channel_state_events, find_series, make_scylla_stream};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use url::Url;
pub struct EventsHandler {}
@@ -164,11 +165,13 @@ impl EventsHandlerScylla {
}
async fn gather(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
let self_name = std::any::type_name::<Self>();
let (head, _body) = req.into_parts();
warn!("TODO PlainEventsQuery needs to take AggKind");
warn!("TODO PlainEventsQuery needs to take AggKind to do x-binning");
let s1 = format!("dummy:{}", head.uri);
let url = Url::parse(&s1)?;
let evq = PlainEventsQuery::from_url(&url)?;
let deadline = Instant::now() + evq.timeout();
let pgclient = {
// TODO use common connection/pool:
info!("--------------- open postgres connection");
@@ -182,16 +185,50 @@ impl EventsHandlerScylla {
let pgclient = Arc::new(pgclient);
pgclient
};
let mut stream = if let Some(scyco) = &node_config.node_config.cluster.scylla {
let scy = create_scy_session(scyco).await?;
let (series, scalar_type, shape) = { find_series(evq.channel(), pgclient.clone()).await? };
let stream = make_scylla_stream(&evq, series, scalar_type.clone(), shape.clone(), scy, false).await?;
stream
let scyco = if let Some(scyco) = &node_config.node_config.cluster.scylla {
scyco
} else {
return Err(Error::with_public_msg(format!("no scylla configured")));
};
let scy = create_scy_session(scyco).await?;
let do_one_before_range = evq.do_one_before_range();
let (series, scalar_type, shape) = find_series(evq.channel(), pgclient.clone()).await?;
let empty_item = empty_events_dyn_2(&scalar_type, &shape, &AggKind::TimeWeightedScalar);
let empty_stream =
futures_util::stream::once(futures_util::future::ready(Ok(ChannelEvents::Events(empty_item))));
let stream2 = make_scylla_stream(
&evq,
do_one_before_range,
series,
scalar_type.clone(),
shape.clone(),
scy,
false,
)
.await?;
let mut stream = empty_stream.chain(stream2);
let mut coll = None;
while let Some(item) = stream.next().await {
let mut fut = None;
loop {
// Alternative way, test what works better:
if fut.is_none() {
fut = Some(stream.next());
}
let item = match tokio::time::timeout_at(deadline.into(), fut.as_mut().unwrap()).await {
Ok(Some(item)) => {
fut.take();
item
}
Ok(None) => {
fut.take();
break;
}
Err(_) => {
warn!("{self_name} timeout");
fut.take();
break;
}
};
match item {
Ok(k) => match k {
ChannelEvents::Events(mut item) => {
@@ -218,10 +255,13 @@ impl EventsHandlerScylla {
Ok(ret)
}
None => {
let ret = response(StatusCode::OK).body(BodyStream::wrapped(
futures_util::stream::iter([Ok(Vec::new())]),
format!("EventsHandlerScylla::gather"),
))?;
error!("should never happen with changed logic, remove case");
err::todo();
let item = empty_events_dyn(&scalar_type, &shape, &AggKind::TimeWeightedScalar);
let res = item.to_box_to_json_result();
let res = res.to_json_result()?;
let res = res.to_json_bytes()?;
let ret = response(StatusCode::OK).body(Body::from(res))?;
Ok(ret)
}
}
@@ -269,10 +309,11 @@ impl BinnedHandlerScylla {
async fn gather(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
let (head, _body) = req.into_parts();
warn!("TODO BinnedQuery needs to take AggKind");
warn!("TODO BinnedQuery needs to take AggKind to do x-binngin");
let s1 = format!("dummy:{}", head.uri);
let url = Url::parse(&s1)?;
let evq = BinnedQuery::from_url(&url)?;
let do_one_before_range = evq.agg_kind().need_expand();
let pgclient = {
// TODO use common connection/pool:
info!("--------------- open postgres connection");
@@ -288,13 +329,23 @@ impl BinnedHandlerScylla {
};
if let Some(scyco) = &node_config.node_config.cluster.scylla {
let scy = create_scy_session(scyco).await?;
let mut query2 = PlainEventsQuery::new(evq.channel().clone(), evq.range().clone(), 0, None, false);
let covering = BinnedRange::covering_range(evq.range().clone(), evq.bin_count())?;
let range = covering.full_range();
let mut query2 = PlainEventsQuery::new(evq.channel().clone(), range.clone(), 0, None, false);
query2.set_timeout(evq.timeout());
let query2 = query2;
let (series, scalar_type, shape) = { find_series(evq.channel(), pgclient.clone()).await? };
let stream =
make_scylla_stream(&query2, series, scalar_type.clone(), shape.clone(), scy.clone(), false).await?;
let query3 = ChannelStateEventsQuery::new(evq.channel().clone(), evq.range().clone());
let (series, scalar_type, shape) = find_series(evq.channel(), pgclient.clone()).await?;
let stream = make_scylla_stream(
&query2,
do_one_before_range,
series,
scalar_type.clone(),
shape.clone(),
scy.clone(),
false,
)
.await?;
let query3 = ChannelStateEventsQuery::new(evq.channel().clone(), range.clone());
let state_stream = channel_state_events(&query3, scy.clone())
.await?
.map(|x| {
@@ -313,9 +364,6 @@ impl BinnedHandlerScylla {
let state_stream = Box::pin(state_stream) as _;
let merged_stream = ChannelEventsMerger::new(vec![data_stream, state_stream]);
let merged_stream = Box::pin(merged_stream) as Pin<Box<dyn Stream<Item = _> + Send>>;
let covering = BinnedRange::covering_range(evq.range().clone(), evq.bin_count())?;
//eprintln!("edges {:?}", covering.edges());
// TODO return partial result if timed out.
let binned_collected = binned_collected(
scalar_type.clone(),
shape.clone(),

View File

@@ -18,7 +18,6 @@ use crate::bodystream::response;
use crate::err::Error;
use crate::gather::gather_get_json;
use crate::pulsemap::UpdateTask;
use channel_status::ChannelStatusConnectionEvents;
use channelconfig::{chconf_from_binned, ChConf};
use disk::binned::query::PreBinnedQuery;
use future::Future;
@@ -249,7 +248,9 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
h.handle(req, &node_config).await
} else if let Some(h) = events::BinnedHandlerScylla::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = ChannelStatusConnectionEvents::handler(&req) {
} else if let Some(h) = channel_status::ConnectionStatusEvents::handler(&req) {
h.handle(req, &node_config).await
} else if let Some(h) = channel_status::ChannelConnectionStatusEvents::handler(&req) {
h.handle(req, &node_config).await
} else if path == "/api/4/binned" {
if req.method() == Method::GET {

View File

@@ -239,12 +239,11 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/events?channel
<li>channelName (e.g. "SLAAR-LSCP4-LAS6891:CH7:1")</li>
<li>begDate (e.g. "2021-05-26T07:10:00.000Z")</li>
<li>endDate (e.g. "2021-05-26T07:16:00.000Z")</li>
<li>binCount (number of requested bins in time-dimension, e.g. "6". The actual number of returned bins depends
on
bin-cache-grid-resolution. The server tries to find the best match.)</li>
<li>binCount (number of requested bins in time-dimension, e.g. "6". The actual number of returned bins depends on
bin-cache-grid-resolution. The server tries to find a reasonable match.)</li>
<li>binningScheme (optional)</li>
<ul>
<li>if not specified: default is "binningScheme=unweightedScalar".</li>
<li>if not specified: default is "binningScheme=timeWeightedScalar".</li>
<li>"binningScheme=unweightedScalar": non-weighted binning, waveform gets first averaged to a scalar.</li>
<li>"binningScheme=timeWeightedScalar": time-weighted binning, waveform gets first averaged to a scalar.
</li>
@@ -272,159 +271,38 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/binned?channel
<h4>Example response (without usage of binningScheme):</h4>
<pre>{
"avgs": [
16204.087890625,
16204.3798828125,
16203.9296875,
16204.232421875,
16202.974609375,
16203.208984375,
16203.4345703125
],
"counts": [
1000,
999,
1000,
999,
1000,
999,
1000
],
"finalisedRange": true,
"maxs": [
48096,
48100,
48094,
48096,
48096,
48095,
48096
],
"mins": [
0,
0,
0,
0,
0,
0,
0
],
"tsAnchor": 1623769850,
"tsMs": [
0,
10000,
20000,
30000,
40000,
50000,
60000,
70000
20000
],
"tsNs": [
0,
0,
0,
0,
0,
0,
0,
0
]
}
</pre>
<h4>Example response (waveform channel and usage of binningScheme):</h4>
<pre>{
"tsAnchor": 1623769950,
"tsMs": [
0,
10000,
20000,
30000,
40000,
50000,
60000,
70000
],
"tsNs": [
0,
0,
0,
0,
0,
0,
0,
0
],
"finalisedRange": true,
"counts": [
1000,
1000,
...
999,
1000
],
"avgs": [
[
0.013631398789584637,
34936.76953125,
45045.5078125,
31676.30859375,
880.7999877929688,
576.4010620117188,
295.1236877441406
],
[
0.01851877197623253,
34935.734375,
45044.2734375,
31675.359375,
880.7310791015625,
576.3038330078125,
295.06134033203125
],
...
16204.087890625,
16204.3798828125,
16203.9296875
],
"maxs": [
[
111,
48093,
45804,
47122,
1446,
783,
431
],
[
120,
48092,
45803,
47124,
1452,
782,
431
],
...
48096,
48100,
48094
],
"mins": [
[
0,
0,
44329,
267,
519,
394,
0
],
[
0,
0,
44327,
265,
514,
395,
0
],
...
]
0,
0,
0
],
"finalisedRange": true
}
</pre>

View File

@@ -186,6 +186,8 @@ pub struct BinsDim0CollectedResult<NTY> {
missing_bins: u32,
#[serde(skip_serializing_if = "Option::is_none", rename = "continueAt")]
continue_at: Option<IsoDateTime>,
#[serde(skip_serializing_if = "Option::is_none", rename = "finishedAt")]
finished_at: Option<IsoDateTime>,
}
impl<NTY> BinsDim0CollectedResult<NTY> {
@@ -273,16 +275,19 @@ impl<NTY: ScalarOps> CollectorType for BinsDim0Collector<NTY> {
fn result(&mut self) -> Result<Self::Output, Error> {
let bin_count = self.vals.ts1s.len() as u32;
let (missing_bins, continue_at) = if bin_count < self.bin_count_exp {
let (missing_bins, continue_at, finished_at) = if bin_count < self.bin_count_exp {
match self.vals.ts2s.back() {
Some(&k) => {
let iso = IsoDateTime(Utc.timestamp_nanos(k as i64));
(self.bin_count_exp - bin_count, Some(iso))
let missing_bins = self.bin_count_exp - bin_count;
let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64));
let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64;
let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64));
(missing_bins, Some(continue_at), Some(finished_at))
}
None => Err(Error::with_msg("partial_content but no bin in result"))?,
}
} else {
(0, None)
(0, None, None)
};
if self.vals.ts1s.as_slices().1.len() != 0 {
panic!();
@@ -309,6 +314,7 @@ impl<NTY: ScalarOps> CollectorType for BinsDim0Collector<NTY> {
finalised_range: self.range_complete,
missing_bins,
continue_at,
finished_at,
};
Ok(ret)
}

View File

@@ -292,6 +292,46 @@ pub trait TimeBinnableTypeAggregator: Send {
fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output;
}
pub fn empty_events_dyn_2(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggKind) -> Box<dyn Events> {
    // Build an empty, concretely-typed events container (`EventsDim0<T>`)
    // matching the channel's scalar type, returned as a boxed `dyn Events`.
    // Only `Shape::Scalar` combined with `AggKind::TimeWeightedScalar` is
    // implemented so far; every other shape/aggregation combination logs a
    // TODO and aborts via `err::todoval`.
    use ScalarType::*;
    type K<T> = eventsdim0::EventsDim0<T>;
    if let Shape::Scalar = shape {
        if let AggKind::TimeWeightedScalar = agg_kind {
            match scalar_type {
                U8 => return Box::new(K::<u8>::empty()),
                U16 => return Box::new(K::<u16>::empty()),
                U32 => return Box::new(K::<u32>::empty()),
                U64 => return Box::new(K::<u64>::empty()),
                I8 => return Box::new(K::<i8>::empty()),
                I16 => return Box::new(K::<i16>::empty()),
                I32 => return Box::new(K::<i32>::empty()),
                I64 => return Box::new(K::<i64>::empty()),
                F32 => return Box::new(K::<f32>::empty()),
                F64 => return Box::new(K::<f64>::empty()),
                // Remaining scalar types (e.g. string/bool) fall through to
                // the common TODO path below.
                _ => {}
            }
        }
    }
    // Unsupported combination: same message and todoval behavior on every
    // unhandled path, exactly as in the original per-arm error handling.
    error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}");
    err::todoval()
}
// TODO needed any longer?
pub fn empty_events_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggKind) -> Box<dyn TimeBinnable> {
match shape {
Shape::Scalar => match agg_kind {
@@ -310,22 +350,22 @@ pub fn empty_events_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggK
F32 => Box::new(K::<f32>::empty()),
F64 => Box::new(K::<f64>::empty()),
_ => {
error!("TODO empty_events_dyn");
error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}");
err::todoval()
}
}
}
_ => {
error!("TODO empty_events_dyn");
error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}");
err::todoval()
}
},
Shape::Wave(..) => {
error!("TODO empty_events_dyn");
error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}");
err::todoval()
}
Shape::Image(..) => {
error!("TODO empty_events_dyn");
error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}");
err::todoval()
}
}
@@ -718,15 +758,17 @@ pub async fn binned_collected(
timeout: Duration,
inp: Pin<Box<dyn Stream<Item = Result<ChannelEvents, Error>> + Send>>,
) -> Result<Box<dyn ToJsonResult>, Error> {
let deadline = Instant::now() + timeout;
let bin_count_exp = edges.len().max(2) as u32 - 1;
let do_time_weight = agg_kind.do_time_weighted();
let mut coll = None;
let mut binner = None;
let mut inp = inp;
let deadline = Instant::now() + timeout;
let empty_item = empty_events_dyn_2(&scalar_type, &shape, &AggKind::TimeWeightedScalar);
let empty_stream = futures_util::stream::once(futures_util::future::ready(Ok(ChannelEvents::Events(empty_item))));
let mut stream = empty_stream.chain(inp);
loop {
let item = futures_util::select! {
k = inp.next().fuse() => {
k = stream.next().fuse() => {
if let Some(k) = k {
k?
}else {
@@ -798,6 +840,8 @@ pub async fn binned_collected(
Ok(res)
}
None => {
error!("TODO should never happen with changed logic, remove");
err::todo();
let item = empty_binned_dyn(&scalar_type, &shape, &AggKind::DimXBins1);
let ret = item.to_box_to_json_result();
Ok(ret)

View File

@@ -94,11 +94,11 @@ impl RawEventsQuery {
}
}
// TODO move this query type out of this `binned` mod
#[derive(Clone, Debug)]
pub struct PlainEventsQuery {
channel: Channel,
range: NanoRange,
do_one_before_range: bool,
disk_io_buffer_size: usize,
report_error: bool,
timeout: Duration,
@@ -119,6 +119,7 @@ impl PlainEventsQuery {
Self {
channel,
range,
do_one_before_range: false,
disk_io_buffer_size,
report_error: false,
timeout: Duration::from_millis(10000),
@@ -165,6 +166,10 @@ impl PlainEventsQuery {
self.do_test_stream_error
}
pub fn do_one_before_range(&self) -> bool {
self.do_one_before_range
}
pub fn set_series_id(&mut self, series: u64) {
self.channel.series = Some(series);
}
@@ -243,6 +248,11 @@ impl FromUrl for PlainEventsQuery {
.map_or("false", |k| k)
.parse()
.map_err(|e| Error::with_public_msg(format!("can not parse doTestStreamError {:?}", e)))?,
do_one_before_range: pairs
.get("getOneBeforeRange")
.map_or("false", |k| k)
.parse()
.map_err(|e| Error::with_public_msg(format!("can not parse getOneBeforeRange {:?}", e)))?,
};
Ok(ret)
}
@@ -267,6 +277,7 @@ impl AppendToUrl for PlainEventsQuery {
g.append_pair("eventsMax", &format!("{}", x));
}
g.append_pair("doLog", &format!("{}", self.do_log));
g.append_pair("getOneBeforeRange", &format!("{}", self.do_one_before_range));
}
}

View File

@@ -358,9 +358,18 @@ pub async fn fetch_uncached_binned_events(
let deadline = deadline
.checked_add(Duration::from_millis(6000))
.ok_or_else(|| Error::with_msg_no_trace(format!("deadline overflow")))?;
let do_one_before_range = agg_kind.need_expand();
let _evq = RawEventsQuery::new(chn.channel.clone(), coord.patch_range(), agg_kind);
let evq = PlainEventsQuery::new(chn.channel.clone(), coord.patch_range(), 4096, None, true);
let mut events_dyn = EventsStreamScylla::new(series, &evq, chn.scalar_type.clone(), chn.shape.clone(), scy, false);
let mut events_dyn = EventsStreamScylla::new(
series,
evq.range().clone(),
do_one_before_range,
chn.scalar_type.clone(),
chn.shape.clone(),
scy,
false,
);
let mut complete = false;
loop {
let item = tokio::time::timeout_at(deadline.into(), events_dyn.next()).await;

View File

@@ -16,7 +16,14 @@ use tokio_postgres::Client as PgClient;
macro_rules! read_values {
($fname:ident, $self:expr, $ts_msp:expr) => {{
let fut = $fname($self.series, $ts_msp, $self.range.clone(), $self.fwd, $self.scy.clone());
let fut = $fname(
$self.series,
$ts_msp,
$self.range.clone(),
$self.fwd,
$self.do_one_before_range,
$self.scy.clone(),
);
let fut = fut.map(|x| match x {
Ok(k) => {
let self_name = std::any::type_name::<Self>();
@@ -38,6 +45,7 @@ struct ReadValues {
range: NanoRange,
ts_msps: VecDeque<u64>,
fwd: bool,
do_one_before_range: bool,
fut: Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>>,
scy: Arc<ScySession>,
}
@@ -50,6 +58,7 @@ impl ReadValues {
range: NanoRange,
ts_msps: VecDeque<u64>,
fwd: bool,
do_one_before_range: bool,
scy: Arc<ScySession>,
) -> Self {
let mut ret = Self {
@@ -59,6 +68,7 @@ impl ReadValues {
range,
ts_msps,
fwd,
do_one_before_range,
fut: Box::pin(futures_util::future::ready(Err(Error::with_msg_no_trace(
"future not initialized",
)))),
@@ -137,6 +147,7 @@ pub struct EventsStreamScylla {
scalar_type: ScalarType,
shape: Shape,
range: NanoRange,
do_one_before_range: bool,
ts_msps: VecDeque<u64>,
scy: Arc<ScySession>,
do_test_stream_error: bool,
@@ -145,26 +156,30 @@ pub struct EventsStreamScylla {
impl EventsStreamScylla {
pub fn new(
series: u64,
evq: &PlainEventsQuery,
range: NanoRange,
do_one_before_range: bool,
scalar_type: ScalarType,
shape: Shape,
scy: Arc<ScySession>,
do_test_stream_error: bool,
) -> Self {
let self_name = std::any::type_name::<Self>();
info!("{self_name} do_one_before_range {do_one_before_range}");
Self {
state: FrState::New,
series,
scalar_type,
shape,
range: evq.range().clone(),
range,
do_one_before_range,
ts_msps: VecDeque::new(),
scy,
do_test_stream_error,
}
}
fn ts_msps_found(&mut self, ts_msps: VecDeque<u64>) {
info!("found ts_msps {ts_msps:?}");
fn ts_msps_found_one_before(&mut self, ts_msps: VecDeque<u64>) {
info!("ts_msps_found_one_before ts_msps {ts_msps:?}");
self.ts_msps = ts_msps;
// Find the largest MSP which can potentially contain some event before the range.
let befores: Vec<_> = self
@@ -181,6 +196,7 @@ impl EventsStreamScylla {
self.range.clone(),
[befores[befores.len() - 1]].into(),
false,
self.do_one_before_range,
self.scy.clone(),
);
self.state = FrState::ReadBack1(st);
@@ -192,6 +208,7 @@ impl EventsStreamScylla {
self.range.clone(),
self.ts_msps.clone(),
true,
self.do_one_before_range,
self.scy.clone(),
);
self.state = FrState::ReadValues(st);
@@ -200,6 +217,10 @@ impl EventsStreamScylla {
}
}
fn ts_msps_found(&mut self, ts_msps: VecDeque<u64>) {
self.ts_msps_found_one_before(ts_msps);
}
fn back_1_done(&mut self, item: Box<dyn Events>) -> Option<Box<dyn Events>> {
info!("back_1_done len {}", item.len());
if item.len() == 0 {
@@ -218,6 +239,7 @@ impl EventsStreamScylla {
self.range.clone(),
[befores[befores.len() - 2]].into(),
false,
self.do_one_before_range,
self.scy.clone(),
);
self.state = FrState::ReadBack2(st);
@@ -230,6 +252,7 @@ impl EventsStreamScylla {
self.range.clone(),
self.ts_msps.clone(),
true,
self.do_one_before_range,
self.scy.clone(),
);
self.state = FrState::ReadValues(st);
@@ -247,6 +270,7 @@ impl EventsStreamScylla {
self.range.clone(),
self.ts_msps.clone(),
true,
self.do_one_before_range,
self.scy.clone(),
);
self.state = FrState::ReadValues(st);
@@ -268,6 +292,7 @@ impl EventsStreamScylla {
self.range.clone(),
self.ts_msps.clone(),
true,
self.do_one_before_range,
self.scy.clone(),
);
self.state = FrState::ReadValues(st);
@@ -433,6 +458,7 @@ macro_rules! read_next_scalar_values {
ts_msp: u64,
range: NanoRange,
fwd: bool,
do_one_before: bool,
scy: Arc<ScySession>,
) -> Result<EventsDim0<$st>, Error> {
type ST = $st;
@@ -486,9 +512,12 @@ macro_rules! read_next_scalar_values {
for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() {
let row = row.err_conv()?;
let ts = ts_msp + row.0 as u64;
let pulse = row.1 as u64;
let value = row.2 as ST;
ret.push(ts, pulse, value);
// TODO this should probably better be done at cql level.
if do_one_before || ts >= range.beg {
let pulse = row.1 as u64;
let value = row.2 as ST;
ret.push(ts, pulse, value);
}
}
trace!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp);
Ok(ret)
@@ -503,6 +532,7 @@ macro_rules! read_next_array_values {
ts_msp: u64,
_range: NanoRange,
_fwd: bool,
_do_one_before: bool,
scy: Arc<ScySession>,
) -> Result<EventsDim0<$st>, Error> {
// TODO change return type: so far EventsDim1 does not exist.
@@ -546,20 +576,22 @@ read_next_array_values!(read_next_values_array_u16, u16, i16, "events_wave_u16")
pub async fn make_scylla_stream(
evq: &PlainEventsQuery,
do_one_before_range: bool,
series: u64,
scalar_type: ScalarType,
shape: Shape,
scy: Arc<ScySession>,
do_test_stream_error: bool,
) -> Result<Pin<Box<dyn Stream<Item = Result<ChannelEvents, Error>> + Send>>, Error> {
let res = Box::pin(EventsStreamScylla::new(
) -> Result<EventsStreamScylla, Error> {
let res = EventsStreamScylla::new(
series,
evq,
evq.range().clone(),
do_one_before_range,
scalar_type,
shape,
scy,
do_test_stream_error,
)) as _;
);
Ok(res)
}
@@ -607,7 +639,7 @@ pub async fn channel_state_events(
.map_err(|e| format!("{e}"))?;
}
}
ts_msp += DAY;
ts_msp += div;
if ts_msp >= evq.range().end {
break;
}