WIP to deliver also wave events

This commit is contained in:
Dominik Werder
2022-04-24 18:28:46 +02:00
parent e7016eda36
commit 165ddf316d

View File

@@ -6,7 +6,7 @@ use items::waveevents::WaveEvents;
use items::{Framable, RangeCompletableItem, StreamItem};
use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::{NanoRange, ScalarType, ScyllaConfig};
use netpod::{NanoRange, ScalarType, ScyllaConfig, Shape};
use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError;
use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError};
use scylla::Session as ScySession;
@@ -72,6 +72,7 @@ macro_rules! impl_read_values_fut {
struct ReadValues {
series: i64,
scalar_type: ScalarType,
shape: Shape,
range: NanoRange,
ts_msp: VecDeque<u64>,
fut: Pin<Box<dyn Future<Output = Box<dyn Framable>> + Send>>,
@@ -82,6 +83,7 @@ impl ReadValues {
fn new(
series: i64,
scalar_type: ScalarType,
shape: Shape,
range: NanoRange,
ts_msp: VecDeque<u64>,
scy: Arc<ScySession>,
@@ -89,6 +91,7 @@ impl ReadValues {
Self {
series,
scalar_type,
shape,
range,
ts_msp,
fut: Box::pin(futures_util::future::lazy(|_| panic!())),
@@ -107,13 +110,25 @@ impl ReadValues {
fn make_fut(&mut self, ts_msp: u64) -> Pin<Box<dyn Future<Output = Box<dyn Framable>> + Send>> {
// TODO this also needs to differentiate on Shape.
let fut = match &self.scalar_type {
ScalarType::F32 => {
impl_read_values_fut!(read_next_values_scalar_f32, self, ts_msp)
}
ScalarType::F64 => {
impl_read_values_fut!(read_next_values_scalar_f64, self, ts_msp)
}
let fut = match &self.shape {
Shape::Scalar => match &self.scalar_type {
ScalarType::I32 => {
impl_read_values_fut!(read_next_values_scalar_i32, self, ts_msp)
}
ScalarType::F32 => {
impl_read_values_fut!(read_next_values_scalar_f32, self, ts_msp)
}
ScalarType::F64 => {
impl_read_values_fut!(read_next_values_scalar_f64, self, ts_msp)
}
_ => err::todoval(),
},
Shape::Wave(_) => match &self.scalar_type {
ScalarType::U16 => {
impl_read_values_fut!(read_next_values_array_u16, self, ts_msp)
}
_ => err::todoval(),
},
_ => err::todoval(),
};
fut
@@ -122,7 +137,7 @@ impl ReadValues {
enum FrState {
New,
FindSeries(Pin<Box<dyn Future<Output = Result<(i64, ScalarType), Error>> + Send>>),
FindSeries(Pin<Box<dyn Future<Output = Result<(i64, ScalarType, Shape), Error>> + Send>>),
FindMsp(Pin<Box<dyn Future<Output = Result<Vec<u64>, Error>> + Send>>),
ReadValues(ReadValues),
Done,
@@ -134,6 +149,7 @@ pub struct ScyllaFramableStream {
channel_name: String,
range: NanoRange,
scalar_type: Option<ScalarType>,
shape: Option<Shape>,
series: i64,
scy: Arc<ScySession>,
}
@@ -146,6 +162,7 @@ impl ScyllaFramableStream {
channel_name: evq.channel.name().into(),
range: evq.range.clone(),
scalar_type: None,
shape: None,
series: 0,
scy,
}
@@ -166,10 +183,11 @@ impl Stream for ScyllaFramableStream {
continue;
}
FrState::FindSeries(ref mut fut) => match fut.poll_unpin(cx) {
Ready(Ok((series, scalar_type))) => {
Ready(Ok((series, scalar_type, shape))) => {
info!("ScyllaFramableStream found series {}", series);
self.series = series;
self.scalar_type = Some(scalar_type);
self.shape = Some(shape);
let fut = find_ts_msp(series, self.range.clone(), self.scy.clone());
let fut = Box::pin(fut);
self.state = FrState::FindMsp(fut);
@@ -190,6 +208,7 @@ impl Stream for ScyllaFramableStream {
let mut st = ReadValues::new(
self.series,
self.scalar_type.as_ref().unwrap().clone(),
self.shape.as_ref().unwrap().clone(),
self.range.clone(),
ts_msp.into(),
self.scy.clone(),
@@ -226,7 +245,11 @@ impl Stream for ScyllaFramableStream {
}
}
async fn find_series(facility: String, channel_name: String, scy: Arc<ScySession>) -> Result<(i64, ScalarType), Error> {
async fn find_series(
facility: String,
channel_name: String,
scy: Arc<ScySession>,
) -> Result<(i64, ScalarType, Shape), Error> {
info!("find_series");
let res = {
let cql =
@@ -253,8 +276,9 @@ async fn find_series(facility: String, channel_name: String, scy: Arc<ScySession
info!("make_scylla_stream row {row:?}");
let series = row.0;
let scalar_type = ScalarType::from_scylla_i32(row.1)?;
info!("make_scylla_stream series {series}");
Ok((series, scalar_type))
let shape = Shape::from_scylla_shape_dims(&row.2)?;
info!("make_scylla_stream series {series} scalar_type {scalar_type:?} shape {shape:?}");
Ok((series, scalar_type, shape))
}
async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc<ScySession>) -> Result<Vec<u64>, Error> {
@@ -316,7 +340,7 @@ macro_rules! read_next_scalar_values {
};
}
macro_rules! read_next_1d_values {
macro_rules! read_next_array_values {
($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => {
async fn $fname(
series: i64,
@@ -346,12 +370,12 @@ macro_rules! read_next_1d_values {
.query(cql, (series, ts_msp as i64, ts_lsp_max as i64))
.await
.err_conv()?;
let mut ret = ScalarEvents::<ST>::empty();
for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() {
let mut ret = WaveEvents::<ST>::empty();
for row in res.rows_typed_or_empty::<(i64, i64, Vec<SCYTY>)>() {
let row = row.err_conv()?;
let ts = ts_msp + row.0 as u64;
let pulse = row.1 as u64;
let value = row.2 as ST;
let value = row.2.into_iter().map(|x| x as ST).collect();
ret.push(ts, pulse, value);
}
info!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp);
@@ -360,10 +384,11 @@ macro_rules! read_next_1d_values {
};
}
read_next_scalar_values!(read_next_values_scalar_i32, i32, i32, "events_scalar_i32");
read_next_scalar_values!(read_next_values_scalar_f32, f32, f32, "events_scalar_f32");
read_next_scalar_values!(read_next_values_scalar_f64, f64, f64, "events_scalar_f64");
read_next_1d_values!(read_next_values_1d_u16, u16, u16, "events_wave_u16");
read_next_array_values!(read_next_values_array_u16, u16, i16, "events_wave_u16");
pub async fn make_scylla_stream(
evq: &RawEventsQuery,