diff --git a/nodenet/src/scylla.rs b/nodenet/src/scylla.rs index 3d2c00b..99710da 100644 --- a/nodenet/src/scylla.rs +++ b/nodenet/src/scylla.rs @@ -6,7 +6,7 @@ use items::waveevents::WaveEvents; use items::{Framable, RangeCompletableItem, StreamItem}; use netpod::log::*; use netpod::query::RawEventsQuery; -use netpod::{NanoRange, ScalarType, ScyllaConfig}; +use netpod::{NanoRange, ScalarType, ScyllaConfig, Shape}; use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError; use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError}; use scylla::Session as ScySession; @@ -72,6 +72,7 @@ macro_rules! impl_read_values_fut { struct ReadValues { series: i64, scalar_type: ScalarType, + shape: Shape, range: NanoRange, ts_msp: VecDeque, fut: Pin> + Send>>, @@ -82,6 +83,7 @@ impl ReadValues { fn new( series: i64, scalar_type: ScalarType, + shape: Shape, range: NanoRange, ts_msp: VecDeque, scy: Arc, @@ -89,6 +91,7 @@ impl ReadValues { Self { series, scalar_type, + shape, range, ts_msp, fut: Box::pin(futures_util::future::lazy(|_| panic!())), @@ -107,13 +110,25 @@ impl ReadValues { fn make_fut(&mut self, ts_msp: u64) -> Pin> + Send>> { // TODO this also needs to differentiate on Shape. - let fut = match &self.scalar_type { - ScalarType::F32 => { - impl_read_values_fut!(read_next_values_scalar_f32, self, ts_msp) - } - ScalarType::F64 => { - impl_read_values_fut!(read_next_values_scalar_f64, self, ts_msp) - } + let fut = match &self.shape { + Shape::Scalar => match &self.scalar_type { + ScalarType::I32 => { + impl_read_values_fut!(read_next_values_scalar_i32, self, ts_msp) + } + ScalarType::F32 => { + impl_read_values_fut!(read_next_values_scalar_f32, self, ts_msp) + } + ScalarType::F64 => { + impl_read_values_fut!(read_next_values_scalar_f64, self, ts_msp) + } + _ => err::todoval(), + }, + Shape::Wave(_) => match &self.scalar_type { + ScalarType::U16 => { + impl_read_values_fut!(read_next_values_array_u16, self, ts_msp) + } + _ => err::todoval(), + }, _ => err::todoval(), }; fut @@ -122,7 +137,7 @@ impl ReadValues { enum FrState { New, - FindSeries(Pin> + Send>>), + FindSeries(Pin> + Send>>), FindMsp(Pin, Error>> + Send>>), ReadValues(ReadValues), Done, @@ -134,6 +149,7 @@ pub struct ScyllaFramableStream { channel_name: String, range: NanoRange, scalar_type: Option, + shape: Option, series: i64, scy: Arc, } @@ -146,6 +162,7 @@ impl ScyllaFramableStream { channel_name: evq.channel.name().into(), range: evq.range.clone(), scalar_type: None, + shape: None, series: 0, scy, } @@ -166,10 +183,11 @@ impl Stream for ScyllaFramableStream { continue; } FrState::FindSeries(ref mut fut) => match fut.poll_unpin(cx) { - Ready(Ok((series, scalar_type))) => { + Ready(Ok((series, scalar_type, shape))) => { info!("ScyllaFramableStream found series {}", series); self.series = series; self.scalar_type = Some(scalar_type); + self.shape = Some(shape); let fut = find_ts_msp(series, self.range.clone(), self.scy.clone()); let fut = Box::pin(fut); self.state = FrState::FindMsp(fut); @@ -190,6 +208,7 @@ impl Stream for ScyllaFramableStream { let mut st = ReadValues::new( self.series, self.scalar_type.as_ref().unwrap().clone(), + self.shape.as_ref().unwrap().clone(), self.range.clone(), ts_msp.into(), self.scy.clone(), @@ -226,7 +245,11 @@ impl Stream for ScyllaFramableStream { } } -async fn find_series(facility: String, channel_name: String, scy: Arc) -> Result<(i64, ScalarType), Error> { +async fn find_series( + facility: String, + channel_name: String, + scy: Arc, +) -> Result<(i64, ScalarType, Shape), Error> { info!("find_series"); let res = { let cql = @@ -253,8 +276,9 @@ async fn find_series(facility: String, channel_name: String, scy: Arc) -> Result, Error> { @@ -316,7 +340,7 @@ macro_rules! read_next_scalar_values { }; } -macro_rules! read_next_1d_values { +macro_rules! read_next_array_values { ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => { async fn $fname( series: i64, @@ -346,12 +370,12 @@ macro_rules! read_next_1d_values { .query(cql, (series, ts_msp as i64, ts_lsp_max as i64)) .await .err_conv()?; - let mut ret = ScalarEvents::::empty(); - for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() { + let mut ret = WaveEvents::::empty(); + for row in res.rows_typed_or_empty::<(i64, i64, Vec)>() { let row = row.err_conv()?; let ts = ts_msp + row.0 as u64; let pulse = row.1 as u64; - let value = row.2 as ST; + let value = row.2.into_iter().map(|x| x as ST).collect(); ret.push(ts, pulse, value); } info!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp); @@ -360,10 +384,11 @@ macro_rules! read_next_1d_values { }; } +read_next_scalar_values!(read_next_values_scalar_i32, i32, i32, "events_scalar_i32"); read_next_scalar_values!(read_next_values_scalar_f32, f32, f32, "events_scalar_f32"); read_next_scalar_values!(read_next_values_scalar_f64, f64, f64, "events_scalar_f64"); -read_next_1d_values!(read_next_values_1d_u16, u16, u16, "events_wave_u16"); +read_next_array_values!(read_next_values_array_u16, u16, i16, "events_wave_u16"); pub async fn make_scylla_stream( evq: &RawEventsQuery,