diff --git a/dbconn/src/events_scylla.rs b/dbconn/src/events_scylla.rs index fa60b1c..e93c26c 100644 --- a/dbconn/src/events_scylla.rs +++ b/dbconn/src/events_scylla.rs @@ -612,7 +612,7 @@ pub async fn channel_state_events( ret.push((ts, kind)); } } - ts_msp += DAY; + ts_msp += div; if ts_msp >= evq.range().end { break; } diff --git a/httpret/src/channel_status.rs b/httpret/src/channel_status.rs index fb77b0a..c7ad186 100644 --- a/httpret/src/channel_status.rs +++ b/httpret/src/channel_status.rs @@ -7,11 +7,65 @@ use netpod::query::ChannelStateEventsQuery; use netpod::{FromUrl, NodeConfigCached, ACCEPT_ALL, APP_JSON}; use url::Url; -pub struct ChannelStatusConnectionEvents {} +pub struct ConnectionStatusEvents {} -impl ChannelStatusConnectionEvents { +impl ConnectionStatusEvents { pub fn handler(req: &Request) -> Option { - if req.uri().path() == "/api/4/channel/status/connection/events" { + if req.uri().path() == "/api/4/scylla/connection/status/events" { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + if req.method() == Method::GET { + let accept_def = APP_JSON; + let accept = req + .headers() + .get(http::header::ACCEPT) + .map_or(accept_def, |k| k.to_str().unwrap_or(accept_def)); + if accept == APP_JSON || accept == ACCEPT_ALL { + let url = Url::parse(&format!("dummy:{}", req.uri()))?; + let q = ChannelStateEventsQuery::from_url(&url)?; + match self.fetch_data(&q, node_config).await { + Ok(k) => { + let body = Body::from(serde_json::to_vec(&k)?); + Ok(response(StatusCode::OK).body(body)?) + } + Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(format!("{:?}", e.public_msg())))?), + } + } else { + Ok(response(StatusCode::BAD_REQUEST).body(Body::empty())?) + } + } else { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) + } + } + + async fn fetch_data( + &self, + q: &ChannelStateEventsQuery, + node_config: &NodeConfigCached, + ) -> Result, Error> { + let dbconf = &node_config.node_config.cluster.database; + let scyco = node_config + .node_config + .cluster + .scylla + .as_ref() + .ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?; + let ret = channel_state_events(q, scyco, dbconf.clone()).await?; + Ok(ret) + } +} + +pub struct ChannelConnectionStatusEvents {} + +impl ChannelConnectionStatusEvents { + pub fn handler(req: &Request) -> Option { + if req.uri().path() == "/api/4/scylla/channel/connection/status/events" { Some(Self {}) } else { None diff --git a/httpret/src/events.rs b/httpret/src/events.rs index 46d77e4..ae13796 100644 --- a/httpret/src/events.rs +++ b/httpret/src/events.rs @@ -4,7 +4,7 @@ use crate::{response, response_err, BodyStream, ToPublicResponse}; use futures_util::{Stream, StreamExt, TryStreamExt}; use http::{Method, Request, Response, StatusCode}; use hyper::Body; -use items_2::{binned_collected, ChannelEvents, ChannelEventsMerger}; +use items_2::{binned_collected, empty_events_dyn, empty_events_dyn_2, ChannelEvents, ChannelEventsMerger}; use netpod::log::*; use netpod::query::{BinnedQuery, ChannelStateEventsQuery, PlainEventsQuery}; use netpod::{AggKind, BinnedRange, FromUrl, NodeConfigCached}; @@ -14,6 +14,7 @@ use scyllaconn::errconv::ErrConv; use scyllaconn::events::{channel_state_events, find_series, make_scylla_stream}; use std::pin::Pin; use std::sync::Arc; +use std::time::Instant; use url::Url; pub struct EventsHandler {} @@ -164,11 +165,13 @@ impl EventsHandlerScylla { } async fn gather(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + let self_name = std::any::type_name::(); let (head, _body) = req.into_parts(); - warn!("TODO PlainEventsQuery needs to take AggKind"); + warn!("TODO PlainEventsQuery needs to take AggKind to do x-binning"); let s1 = format!("dummy:{}", head.uri); let url = Url::parse(&s1)?; let evq = PlainEventsQuery::from_url(&url)?; + let deadline = Instant::now() + evq.timeout(); let pgclient = { // TODO use common connection/pool: info!("--------------- open postgres connection"); @@ -182,16 +185,50 @@ impl EventsHandlerScylla { let pgclient = Arc::new(pgclient); pgclient }; - let mut stream = if let Some(scyco) = &node_config.node_config.cluster.scylla { - let scy = create_scy_session(scyco).await?; - let (series, scalar_type, shape) = { find_series(evq.channel(), pgclient.clone()).await? }; - let stream = make_scylla_stream(&evq, series, scalar_type.clone(), shape.clone(), scy, false).await?; - stream + let scyco = if let Some(scyco) = &node_config.node_config.cluster.scylla { + scyco } else { return Err(Error::with_public_msg(format!("no scylla configured"))); }; + let scy = create_scy_session(scyco).await?; + let do_one_before_range = evq.do_one_before_range(); + let (series, scalar_type, shape) = find_series(evq.channel(), pgclient.clone()).await?; + let empty_item = empty_events_dyn_2(&scalar_type, &shape, &AggKind::TimeWeightedScalar); + let empty_stream = + futures_util::stream::once(futures_util::future::ready(Ok(ChannelEvents::Events(empty_item)))); + let stream2 = make_scylla_stream( + &evq, + do_one_before_range, + series, + scalar_type.clone(), + shape.clone(), + scy, + false, + ) + .await?; + let mut stream = empty_stream.chain(stream2); let mut coll = None; - while let Some(item) = stream.next().await { + let mut fut = None; + loop { + // Alternative way, test what works better: + if fut.is_none() { + fut = Some(stream.next()); + } + let item = match tokio::time::timeout_at(deadline.into(), fut.as_mut().unwrap()).await { + Ok(Some(item)) => { + fut.take(); + item + } + Ok(None) => { + fut.take(); + break; + } + Err(_) => { + warn!("{self_name} timeout"); + fut.take(); + break; + } + }; match item { Ok(k) => match k { ChannelEvents::Events(mut item) => { @@ -218,10 +255,13 @@ impl EventsHandlerScylla { Ok(ret) } None => { - let ret = response(StatusCode::OK).body(BodyStream::wrapped( - futures_util::stream::iter([Ok(Vec::new())]), - format!("EventsHandlerScylla::gather"), - ))?; + error!("should never happen with changed logic, remove case"); + err::todo(); + let item = empty_events_dyn(&scalar_type, &shape, &AggKind::TimeWeightedScalar); + let res = item.to_box_to_json_result(); + let res = res.to_json_result()?; + let res = res.to_json_bytes()?; + let ret = response(StatusCode::OK).body(Body::from(res))?; Ok(ret) } } @@ -269,10 +309,11 @@ impl BinnedHandlerScylla { async fn gather(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { let (head, _body) = req.into_parts(); - warn!("TODO BinnedQuery needs to take AggKind"); + warn!("TODO BinnedQuery needs to take AggKind to do x-binngin"); let s1 = format!("dummy:{}", head.uri); let url = Url::parse(&s1)?; let evq = BinnedQuery::from_url(&url)?; + let do_one_before_range = evq.agg_kind().need_expand(); let pgclient = { // TODO use common connection/pool: info!("--------------- open postgres connection"); @@ -288,13 +329,23 @@ impl BinnedHandlerScylla { }; if let Some(scyco) = &node_config.node_config.cluster.scylla { let scy = create_scy_session(scyco).await?; - let mut query2 = PlainEventsQuery::new(evq.channel().clone(), evq.range().clone(), 0, None, false); + let covering = BinnedRange::covering_range(evq.range().clone(), evq.bin_count())?; + let range = covering.full_range(); + let mut query2 = PlainEventsQuery::new(evq.channel().clone(), range.clone(), 0, None, false); query2.set_timeout(evq.timeout()); let query2 = query2; - let (series, scalar_type, shape) = { find_series(evq.channel(), pgclient.clone()).await? }; - let stream = - make_scylla_stream(&query2, series, scalar_type.clone(), shape.clone(), scy.clone(), false).await?; - let query3 = ChannelStateEventsQuery::new(evq.channel().clone(), evq.range().clone()); + let (series, scalar_type, shape) = find_series(evq.channel(), pgclient.clone()).await?; + let stream = make_scylla_stream( + &query2, + do_one_before_range, + series, + scalar_type.clone(), + shape.clone(), + scy.clone(), + false, + ) + .await?; + let query3 = ChannelStateEventsQuery::new(evq.channel().clone(), range.clone()); let state_stream = channel_state_events(&query3, scy.clone()) .await? .map(|x| { @@ -313,9 +364,6 @@ impl BinnedHandlerScylla { let state_stream = Box::pin(state_stream) as _; let merged_stream = ChannelEventsMerger::new(vec![data_stream, state_stream]); let merged_stream = Box::pin(merged_stream) as Pin + Send>>; - let covering = BinnedRange::covering_range(evq.range().clone(), evq.bin_count())?; - //eprintln!("edges {:?}", covering.edges()); - // TODO return partial result if timed out. let binned_collected = binned_collected( scalar_type.clone(), shape.clone(), diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 7dda06d..755bd1d 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -18,7 +18,6 @@ use crate::bodystream::response; use crate::err::Error; use crate::gather::gather_get_json; use crate::pulsemap::UpdateTask; -use channel_status::ChannelStatusConnectionEvents; use channelconfig::{chconf_from_binned, ChConf}; use disk::binned::query::PreBinnedQuery; use future::Future; @@ -249,7 +248,9 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> h.handle(req, &node_config).await } else if let Some(h) = events::BinnedHandlerScylla::handler(&req) { h.handle(req, &node_config).await - } else if let Some(h) = ChannelStatusConnectionEvents::handler(&req) { + } else if let Some(h) = channel_status::ConnectionStatusEvents::handler(&req) { + h.handle(req, &node_config).await + } else if let Some(h) = channel_status::ChannelConnectionStatusEvents::handler(&req) { h.handle(req, &node_config).await } else if path == "/api/4/binned" { if req.method() == Method::GET { diff --git a/httpret/static/documentation/api4.html b/httpret/static/documentation/api4.html index 460460b..192a636 100644 --- a/httpret/static/documentation/api4.html +++ b/httpret/static/documentation/api4.html @@ -239,12 +239,11 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/events?channel
  • channelName (e.g. "SLAAR-LSCP4-LAS6891:CH7:1")
  • begDate (e.g. "2021-05-26T07:10:00.000Z")
  • endDate (e.g. "2021-05-26T07:16:00.000Z")
  • -
  • binCount (number of requested bins in time-dimension, e.g. "6". The actual number of returned bins depends - on - bin-cache-grid-resolution. The server tries to find the best match.)
  • +
  • binCount (number of requested bins in time-dimension, e.g. "6". The actual number of returned bins depends on + bin-cache-grid-resolution. The server tries to find a reasonable match.)
  • binningScheme (optional)
    • -
    • if not specified: default is "binningScheme=unweightedScalar".
    • +
    • if not specified: default is "binningScheme=timeWeightedScalar".
    • "binningScheme=unweightedScalar": non-weighted binning, waveform gets first averaged to a scalar.
    • "binningScheme=timeWeightedScalar": time-weighted binning, waveform gets first averaged to a scalar.
    • @@ -272,159 +271,38 @@ curl -H 'Accept: application/json' 'https://data-api.psi.ch/api/4/binned?channel

      Example response (without usage of binningScheme):

      {
      -  "avgs": [
      -    16204.087890625,
      -    16204.3798828125,
      -    16203.9296875,
      -    16204.232421875,
      -    16202.974609375,
      -    16203.208984375,
      -    16203.4345703125
      -  ],
      -  "counts": [
      -    1000,
      -    999,
      -    1000,
      -    999,
      -    1000,
      -    999,
      -    1000
      -  ],
      -  "finalisedRange": true,
      -  "maxs": [
      -    48096,
      -    48100,
      -    48094,
      -    48096,
      -    48096,
      -    48095,
      -    48096
      -  ],
      -  "mins": [
      -    0,
      -    0,
      -    0,
      -    0,
      -    0,
      -    0,
      -    0
      -  ],
         "tsAnchor": 1623769850,
         "tsMs": [
           0,
           10000,
      -    20000,
      -    30000,
      -    40000,
      -    50000,
      -    60000,
      -    70000
      +    20000
         ],
         "tsNs": [
      -    0,
      -    0,
      -    0,
      -    0,
      -    0,
      -    0,
      -    0,
      -    0
      -  ]
      -}
      -
      - -

      Example response (waveform channel and usage of binningScheme):

      -
      {
      -  "tsAnchor": 1623769950,
      -  "tsMs": [
      -    0,
      -    10000,
      -    20000,
      -    30000,
      -    40000,
      -    50000,
      -    60000,
      -    70000
      -  ],
      -  "tsNs": [
      -    0,
      -    0,
      -    0,
      -    0,
      -    0,
           0,
           0,
           0
         ],
      -  "finalisedRange": true,
         "counts": [
           1000,
      -    1000,
      -    ...
      +    999,
      +    1000
         ],
         "avgs": [
      -    [
      -      0.013631398789584637,
      -      34936.76953125,
      -      45045.5078125,
      -      31676.30859375,
      -      880.7999877929688,
      -      576.4010620117188,
      -      295.1236877441406
      -    ],
      -    [
      -      0.01851877197623253,
      -      34935.734375,
      -      45044.2734375,
      -      31675.359375,
      -      880.7310791015625,
      -      576.3038330078125,
      -      295.06134033203125
      -    ],
      -    ...
      +    16204.087890625,
      +    16204.3798828125,
      +    16203.9296875
         ],
         "maxs": [
      -    [
      -      111,
      -      48093,
      -      45804,
      -      47122,
      -      1446,
      -      783,
      -      431
      -    ],
      -    [
      -      120,
      -      48092,
      -      45803,
      -      47124,
      -      1452,
      -      782,
      -      431
      -    ],
      -    ...
      +    48096,
      +    48100,
      +    48094
         ],
         "mins": [
      -    [
      -      0,
      -      0,
      -      44329,
      -      267,
      -      519,
      -      394,
      -      0
      -    ],
      -    [
      -      0,
      -      0,
      -      44327,
      -      265,
      -      514,
      -      395,
      -      0
      -    ],
      -    ...
      -  ]
      +    0,
      +    0,
      +    0
      +  ],
      +  "finalisedRange": true
       }
       
      diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs index 1b377b0..8158d1a 100644 --- a/items_2/src/binsdim0.rs +++ b/items_2/src/binsdim0.rs @@ -186,6 +186,8 @@ pub struct BinsDim0CollectedResult { missing_bins: u32, #[serde(skip_serializing_if = "Option::is_none", rename = "continueAt")] continue_at: Option, + #[serde(skip_serializing_if = "Option::is_none", rename = "finishedAt")] + finished_at: Option, } impl BinsDim0CollectedResult { @@ -273,16 +275,19 @@ impl CollectorType for BinsDim0Collector { fn result(&mut self) -> Result { let bin_count = self.vals.ts1s.len() as u32; - let (missing_bins, continue_at) = if bin_count < self.bin_count_exp { + let (missing_bins, continue_at, finished_at) = if bin_count < self.bin_count_exp { match self.vals.ts2s.back() { Some(&k) => { - let iso = IsoDateTime(Utc.timestamp_nanos(k as i64)); - (self.bin_count_exp - bin_count, Some(iso)) + let missing_bins = self.bin_count_exp - bin_count; + let continue_at = IsoDateTime(Utc.timestamp_nanos(k as i64)); + let u = k + (k - self.vals.ts1s.back().unwrap()) * missing_bins as u64; + let finished_at = IsoDateTime(Utc.timestamp_nanos(u as i64)); + (missing_bins, Some(continue_at), Some(finished_at)) } None => Err(Error::with_msg("partial_content but no bin in result"))?, } } else { - (0, None) + (0, None, None) }; if self.vals.ts1s.as_slices().1.len() != 0 { panic!(); @@ -309,6 +314,7 @@ impl CollectorType for BinsDim0Collector { finalised_range: self.range_complete, missing_bins, continue_at, + finished_at, }; Ok(ret) } diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 1000b46..ce899b0 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -292,6 +292,46 @@ pub trait TimeBinnableTypeAggregator: Send { fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output; } +pub fn empty_events_dyn_2(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggKind) -> Box { + match shape { + Shape::Scalar => match agg_kind { + AggKind::TimeWeightedScalar => { + use ScalarType::*; + type K = eventsdim0::EventsDim0; + match scalar_type { + U8 => Box::new(K::::empty()), + U16 => Box::new(K::::empty()), + U32 => Box::new(K::::empty()), + U64 => Box::new(K::::empty()), + I8 => Box::new(K::::empty()), + I16 => Box::new(K::::empty()), + I32 => Box::new(K::::empty()), + I64 => Box::new(K::::empty()), + F32 => Box::new(K::::empty()), + F64 => Box::new(K::::empty()), + _ => { + error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}"); + err::todoval() + } + } + } + _ => { + error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}"); + err::todoval() + } + }, + Shape::Wave(..) => { + error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}"); + err::todoval() + } + Shape::Image(..) => { + error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}"); + err::todoval() + } + } +} + +// TODO needed any longer? pub fn empty_events_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggKind) -> Box { match shape { Shape::Scalar => match agg_kind { @@ -310,22 +350,22 @@ pub fn empty_events_dyn(scalar_type: &ScalarType, shape: &Shape, agg_kind: &AggK F32 => Box::new(K::::empty()), F64 => Box::new(K::::empty()), _ => { - error!("TODO empty_events_dyn"); + error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}"); err::todoval() } } } _ => { - error!("TODO empty_events_dyn"); + error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}"); err::todoval() } }, Shape::Wave(..) => { - error!("TODO empty_events_dyn"); + error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}"); err::todoval() } Shape::Image(..) => { - error!("TODO empty_events_dyn"); + error!("TODO empty_events_dyn {scalar_type:?} {shape:?} {agg_kind:?}"); err::todoval() } } @@ -718,15 +758,17 @@ pub async fn binned_collected( timeout: Duration, inp: Pin> + Send>>, ) -> Result, Error> { + let deadline = Instant::now() + timeout; let bin_count_exp = edges.len().max(2) as u32 - 1; let do_time_weight = agg_kind.do_time_weighted(); let mut coll = None; let mut binner = None; - let mut inp = inp; - let deadline = Instant::now() + timeout; + let empty_item = empty_events_dyn_2(&scalar_type, &shape, &AggKind::TimeWeightedScalar); + let empty_stream = futures_util::stream::once(futures_util::future::ready(Ok(ChannelEvents::Events(empty_item)))); + let mut stream = empty_stream.chain(inp); loop { let item = futures_util::select! { - k = inp.next().fuse() => { + k = stream.next().fuse() => { if let Some(k) = k { k? }else { @@ -798,6 +840,8 @@ pub async fn binned_collected( Ok(res) } None => { + error!("TODO should never happen with changed logic, remove"); + err::todo(); let item = empty_binned_dyn(&scalar_type, &shape, &AggKind::DimXBins1); let ret = item.to_box_to_json_result(); Ok(ret) diff --git a/netpod/src/query.rs b/netpod/src/query.rs index cdbbb5a..905eb49 100644 --- a/netpod/src/query.rs +++ b/netpod/src/query.rs @@ -94,11 +94,11 @@ impl RawEventsQuery { } } -// TODO move this query type out of this `binned` mod #[derive(Clone, Debug)] pub struct PlainEventsQuery { channel: Channel, range: NanoRange, + do_one_before_range: bool, disk_io_buffer_size: usize, report_error: bool, timeout: Duration, @@ -119,6 +119,7 @@ impl PlainEventsQuery { Self { channel, range, + do_one_before_range: false, disk_io_buffer_size, report_error: false, timeout: Duration::from_millis(10000), @@ -165,6 +166,10 @@ impl PlainEventsQuery { self.do_test_stream_error } + pub fn do_one_before_range(&self) -> bool { + self.do_one_before_range + } + pub fn set_series_id(&mut self, series: u64) { self.channel.series = Some(series); } @@ -243,6 +248,11 @@ impl FromUrl for PlainEventsQuery { .map_or("false", |k| k) .parse() .map_err(|e| Error::with_public_msg(format!("can not parse doTestStreamError {:?}", e)))?, + do_one_before_range: pairs + .get("getOneBeforeRange") + .map_or("false", |k| k) + .parse() + .map_err(|e| Error::with_public_msg(format!("can not parse getOneBeforeRange {:?}", e)))?, }; Ok(ret) } @@ -267,6 +277,7 @@ impl AppendToUrl for PlainEventsQuery { g.append_pair("eventsMax", &format!("{}", x)); } g.append_pair("doLog", &format!("{}", self.do_log)); + g.append_pair("getOneBeforeRange", &format!("{}", self.do_one_before_range)); } } diff --git a/scyllaconn/src/bincache.rs b/scyllaconn/src/bincache.rs index 39fdc4c..0976cf0 100644 --- a/scyllaconn/src/bincache.rs +++ b/scyllaconn/src/bincache.rs @@ -358,9 +358,18 @@ pub async fn fetch_uncached_binned_events( let deadline = deadline .checked_add(Duration::from_millis(6000)) .ok_or_else(|| Error::with_msg_no_trace(format!("deadline overflow")))?; + let do_one_before_range = agg_kind.need_expand(); let _evq = RawEventsQuery::new(chn.channel.clone(), coord.patch_range(), agg_kind); let evq = PlainEventsQuery::new(chn.channel.clone(), coord.patch_range(), 4096, None, true); - let mut events_dyn = EventsStreamScylla::new(series, &evq, chn.scalar_type.clone(), chn.shape.clone(), scy, false); + let mut events_dyn = EventsStreamScylla::new( + series, + evq.range().clone(), + do_one_before_range, + chn.scalar_type.clone(), + chn.shape.clone(), + scy, + false, + ); let mut complete = false; loop { let item = tokio::time::timeout_at(deadline.into(), events_dyn.next()).await; diff --git a/scyllaconn/src/events.rs b/scyllaconn/src/events.rs index dd0f64a..8baf423 100644 --- a/scyllaconn/src/events.rs +++ b/scyllaconn/src/events.rs @@ -16,7 +16,14 @@ use tokio_postgres::Client as PgClient; macro_rules! read_values { ($fname:ident, $self:expr, $ts_msp:expr) => {{ - let fut = $fname($self.series, $ts_msp, $self.range.clone(), $self.fwd, $self.scy.clone()); + let fut = $fname( + $self.series, + $ts_msp, + $self.range.clone(), + $self.fwd, + $self.do_one_before_range, + $self.scy.clone(), + ); let fut = fut.map(|x| match x { Ok(k) => { let self_name = std::any::type_name::(); @@ -38,6 +45,7 @@ struct ReadValues { range: NanoRange, ts_msps: VecDeque, fwd: bool, + do_one_before_range: bool, fut: Pin, Error>> + Send>>, scy: Arc, } @@ -50,6 +58,7 @@ impl ReadValues { range: NanoRange, ts_msps: VecDeque, fwd: bool, + do_one_before_range: bool, scy: Arc, ) -> Self { let mut ret = Self { @@ -59,6 +68,7 @@ impl ReadValues { range, ts_msps, fwd, + do_one_before_range, fut: Box::pin(futures_util::future::ready(Err(Error::with_msg_no_trace( "future not initialized", )))), @@ -137,6 +147,7 @@ pub struct EventsStreamScylla { scalar_type: ScalarType, shape: Shape, range: NanoRange, + do_one_before_range: bool, ts_msps: VecDeque, scy: Arc, do_test_stream_error: bool, @@ -145,26 +156,30 @@ pub struct EventsStreamScylla { impl EventsStreamScylla { pub fn new( series: u64, - evq: &PlainEventsQuery, + range: NanoRange, + do_one_before_range: bool, scalar_type: ScalarType, shape: Shape, scy: Arc, do_test_stream_error: bool, ) -> Self { + let self_name = std::any::type_name::(); + info!("{self_name} do_one_before_range {do_one_before_range}"); Self { state: FrState::New, series, scalar_type, shape, - range: evq.range().clone(), + range, + do_one_before_range, ts_msps: VecDeque::new(), scy, do_test_stream_error, } } - fn ts_msps_found(&mut self, ts_msps: VecDeque) { - info!("found ts_msps {ts_msps:?}"); + fn ts_msps_found_one_before(&mut self, ts_msps: VecDeque) { + info!("ts_msps_found_one_before ts_msps {ts_msps:?}"); self.ts_msps = ts_msps; // Find the largest MSP which can potentially contain some event before the range. let befores: Vec<_> = self @@ -181,6 +196,7 @@ impl EventsStreamScylla { self.range.clone(), [befores[befores.len() - 1]].into(), false, + self.do_one_before_range, self.scy.clone(), ); self.state = FrState::ReadBack1(st); @@ -192,6 +208,7 @@ impl EventsStreamScylla { self.range.clone(), self.ts_msps.clone(), true, + self.do_one_before_range, self.scy.clone(), ); self.state = FrState::ReadValues(st); @@ -200,6 +217,10 @@ impl EventsStreamScylla { } } + fn ts_msps_found(&mut self, ts_msps: VecDeque) { + self.ts_msps_found_one_before(ts_msps); + } + fn back_1_done(&mut self, item: Box) -> Option> { info!("back_1_done len {}", item.len()); if item.len() == 0 { @@ -218,6 +239,7 @@ impl EventsStreamScylla { self.range.clone(), [befores[befores.len() - 2]].into(), false, + self.do_one_before_range, self.scy.clone(), ); self.state = FrState::ReadBack2(st); @@ -230,6 +252,7 @@ impl EventsStreamScylla { self.range.clone(), self.ts_msps.clone(), true, + self.do_one_before_range, self.scy.clone(), ); self.state = FrState::ReadValues(st); @@ -247,6 +270,7 @@ impl EventsStreamScylla { self.range.clone(), self.ts_msps.clone(), true, + self.do_one_before_range, self.scy.clone(), ); self.state = FrState::ReadValues(st); @@ -268,6 +292,7 @@ impl EventsStreamScylla { self.range.clone(), self.ts_msps.clone(), true, + self.do_one_before_range, self.scy.clone(), ); self.state = FrState::ReadValues(st); @@ -433,6 +458,7 @@ macro_rules! read_next_scalar_values { ts_msp: u64, range: NanoRange, fwd: bool, + do_one_before: bool, scy: Arc, ) -> Result, Error> { type ST = $st; @@ -486,9 +512,12 @@ macro_rules! read_next_scalar_values { for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() { let row = row.err_conv()?; let ts = ts_msp + row.0 as u64; - let pulse = row.1 as u64; - let value = row.2 as ST; - ret.push(ts, pulse, value); + // TODO this should probably better be done at cql level. + if do_one_before || ts >= range.beg { + let pulse = row.1 as u64; + let value = row.2 as ST; + ret.push(ts, pulse, value); + } } trace!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp); Ok(ret) @@ -503,6 +532,7 @@ macro_rules! read_next_array_values { ts_msp: u64, _range: NanoRange, _fwd: bool, + _do_one_before: bool, scy: Arc, ) -> Result, Error> { // TODO change return type: so far EventsDim1 does not exist. @@ -546,20 +576,22 @@ read_next_array_values!(read_next_values_array_u16, u16, i16, "events_wave_u16") pub async fn make_scylla_stream( evq: &PlainEventsQuery, + do_one_before_range: bool, series: u64, scalar_type: ScalarType, shape: Shape, scy: Arc, do_test_stream_error: bool, -) -> Result> + Send>>, Error> { - let res = Box::pin(EventsStreamScylla::new( +) -> Result { + let res = EventsStreamScylla::new( series, - evq, + evq.range().clone(), + do_one_before_range, scalar_type, shape, scy, do_test_stream_error, - )) as _; + ); Ok(res) } @@ -607,7 +639,7 @@ pub async fn channel_state_events( .map_err(|e| format!("{e}"))?; } } - ts_msp += DAY; + ts_msp += div; if ts_msp >= evq.range().end { break; }