From c046303c7fc7e6ab18f8319cb38429ddf72ef8c4 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Wed, 22 Jun 2022 10:17:00 +0200
Subject: [PATCH] Move scylla event fetch logic to dbconn crate

---
 dbconn/src/bincache.rs      | 105 ++++++--
 dbconn/src/dbconn.rs        |   3 +-
 dbconn/src/events_scylla.rs | 500 +++++++++++++++++++++++++++++++++++
 items/src/items.rs          |   4 +-
 nodenet/Cargo.toml          |   1 +
 nodenet/src/conn.rs         |   2 +-
 nodenet/src/scylla.rs       | 511 ------------------------------------
 7 files changed, 589 insertions(+), 537 deletions(-)
 create mode 100644 dbconn/src/events_scylla.rs

diff --git a/dbconn/src/bincache.rs b/dbconn/src/bincache.rs
index d15f944..049e45d 100644
--- a/dbconn/src/bincache.rs
+++ b/dbconn/src/bincache.rs
@@ -1,11 +1,17 @@
+use crate::events_scylla::ScyllaFramableStream;
 use crate::ErrConv;
 use err::Error;
 use futures_util::{Future, Stream, StreamExt};
 use items::TimeBinned;
 use netpod::log::*;
-use netpod::{ChannelTyped, NanoRange, PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange, ScyllaConfig};
+use netpod::query::RawEventsQuery;
+use netpod::{
+    AggKind, ChannelTyped, NanoRange, PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange, ScyllaConfig,
+};
 use scylla::Session as ScySession;
 use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
 
 pub async fn read_cached_scylla(
     chn: &ChannelTyped,
@@ -20,57 +26,96 @@ pub async fn read_cached_scylla(
     err::todoval()
 }
 
-pub async fn write_cached_scylla(
+#[allow(unused)]
+struct WriteFut<'a> {
+    chn: &'a ChannelTyped,
+    coord: &'a PreBinnedPatchCoord,
+    data: &'a dyn TimeBinned,
+    scy: &'a ScySession,
+}
+
+impl<'a> WriteFut<'a> {
+    fn new(
+        chn: &'a ChannelTyped,
+        coord: &'a PreBinnedPatchCoord,
+        data: &'a dyn TimeBinned,
+        scy: &'a ScySession,
+    ) -> Self {
+        Self { chn, coord, data, scy }
+    }
+}
+
+impl<'a> Future for WriteFut<'a> {
+    type Output = Result<(), Error>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+        let _ = cx;
+        todo!()
+    }
+}
+
+pub fn write_cached_scylla<'a>(
     chn: &ChannelTyped,
     coord: &PreBinnedPatchCoord,
-    data: &dyn TimeBinned,
+    data: &'a dyn TimeBinned,
     scy: &ScySession,
-) -> Result<(), Error> {
-    let _ = coord;
-    let _ = data;
-    let series = chn.series_id()?;
-    let res = scy.query_iter("", (series as i64,)).await.err_conv()?;
-    let _ = res;
-    // TODO write the data.
-    err::todoval()
+) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>> {
+    let chn = unsafe { &*(chn as *const ChannelTyped) };
+    let data = unsafe { &*(data as *const dyn TimeBinned) };
+    let scy = unsafe { &*(scy as *const ScySession) };
+    let fut = async move {
+        let _ = coord;
+        let series = chn.series_id()?;
+        let res = scy
+            .query_iter("", (series as i64, data.dummy_test_i32()))
+            .await
+            .err_conv()?;
+        let _ = res;
+        // TODO write the data.
+        //err::todoval();
+        Ok(())
+    };
+    Box::pin(fut)
 }
 
 // TODO must indicate to the caller whether it is safe to cache this (complete).
 pub async fn fetch_uncached_data(
     chn: ChannelTyped,
     coord: PreBinnedPatchCoord,
-    scy: &ScySession,
+    scy: Arc<ScySession>,
 ) -> Result<Option<Box<dyn TimeBinned>>, Error> {
     info!("fetch_uncached_data");
     let range = coord.patch_range();
     // TODO why the extra plus one?
     let bin = match PreBinnedPatchRange::covering_range(range, coord.bin_count() + 1) {
-        Ok(Some(range)) => fetch_uncached_higher_res_prebinned(&chn, &range, scy).await,
-        Ok(None) => fetch_uncached_binned_events(&chn, &coord.patch_range(), scy).await,
+        Ok(Some(range)) => fetch_uncached_higher_res_prebinned(&chn, &range, scy.clone()).await,
+        Ok(None) => fetch_uncached_binned_events(&chn, &coord.patch_range(), scy.clone()).await,
         Err(e) => Err(e),
     }?;
-    err::todoval()
+    //let data = bin.workaround_clone();
+    WriteFut::new(&chn, &coord, bin.as_ref(), &scy).await?;
+    write_cached_scylla(&chn, &coord, bin.as_ref(), &scy).await?;
+    Ok(Some(bin))
 }
 
 pub fn fetch_uncached_data_box(
     chn: &ChannelTyped,
     coord: &PreBinnedPatchCoord,
-    scy: &ScySession,
+    scy: Arc<ScySession>,
 ) -> Pin<Box<dyn Future<Output = Result<Option<Box<dyn TimeBinned>>, Error>> + Send>> {
-    let scy = unsafe { &*(scy as *const _) };
     Box::pin(fetch_uncached_data(chn.clone(), coord.clone(), scy))
 }
 
 pub async fn fetch_uncached_higher_res_prebinned(
     chn: &ChannelTyped,
     range: &PreBinnedPatchRange,
-    scy: &ScySession,
+    scy: Arc<ScySession>,
 ) -> Result<Box<dyn TimeBinned>, Error> {
     let mut aggt = None;
     let patch_it = PreBinnedPatchIterator::from_range(range.clone());
     for patch in patch_it {
         let coord = PreBinnedPatchCoord::new(patch.bin_t_len(), patch.patch_t_len(), patch.ix());
-        let mut stream = pre_binned_value_stream_with_scy(chn, &coord, scy).await?;
+        let mut stream = pre_binned_value_stream_with_scy(chn, &coord, scy.clone()).await?;
         while let Some(item) = stream.next().await {
             let item = item?;
             // TODO here I will need some new API to aggregate (time-bin) trait objects.
@@ -92,25 +137,38 @@ pub async fn fetch_uncached_higher_res_prebinned(
 pub async fn fetch_uncached_binned_events(
     chn: &ChannelTyped,
     range: &NanoRange,
-    scy: &ScySession,
+    scy: Arc<ScySession>,
 ) -> Result<Box<dyn TimeBinned>, Error> {
     // TODO ask Scylla directly, do not go through HTTP.
     // Refactor the event fetch stream code such that I can use that easily here.
+    let evq = RawEventsQuery::new(chn.channel.clone(), range.clone(), AggKind::Plain);
+    let _res = Box::pin(ScyllaFramableStream::new(
+        &evq,
+        chn.scalar_type.clone(),
+        chn.shape.clone(),
+        scy,
+        false,
+    ));
+    // TODO ScyllaFramableStream must return a new events trait object designed for trait object use.
     err::todoval()
 }
 
 pub async fn pre_binned_value_stream_with_scy(
     chn: &ChannelTyped,
     coord: &PreBinnedPatchCoord,
-    scy: &ScySession,
+    scy: Arc<ScySession>,
 ) -> Result<Pin<Box<dyn Stream<Item = Result<Box<dyn TimeBinned>, Error>> + Send>>, Error> {
     info!("pre_binned_value_stream_with_scy {chn:?} {coord:?}");
+    // TODO determine the range:
     let range = err::todoval();
     if let Some(item) = read_cached_scylla(chn, &range, &scy).await? {
         Ok(Box::pin(futures_util::stream::iter([Ok(item)])))
     } else {
         let bin = fetch_uncached_data_box(chn, coord, scy).await?;
-        Ok(Box::pin(futures_util::stream::empty()))
+        // TODO when can it ever be that we get back a None?
+        // TODO also, probably the caller wants to know whether the bin is Complete.
+        let bin = bin.unwrap();
+        Ok(Box::pin(futures_util::stream::iter([Ok(bin)])))
     }
 }
 
@@ -126,5 +184,6 @@ pub async fn pre_binned_value_stream(
         .build()
         .await
         .err_conv()?;
-    pre_binned_value_stream_with_scy(chn, coord, &scy).await
+    let scy = Arc::new(scy);
+    pre_binned_value_stream_with_scy(chn, coord, scy).await
 }
diff --git a/dbconn/src/dbconn.rs b/dbconn/src/dbconn.rs
index 21ca370..8f28b47 100644
--- a/dbconn/src/dbconn.rs
+++ b/dbconn/src/dbconn.rs
@@ -1,5 +1,6 @@
-pub mod scan;
 pub mod bincache;
+pub mod events_scylla;
+pub mod scan;
 pub mod search;
 pub mod pg {
     pub use tokio_postgres::{Client, Error};
diff --git a/dbconn/src/events_scylla.rs b/dbconn/src/events_scylla.rs
new file mode 100644
index 0000000..5044606
--- /dev/null
+++ b/dbconn/src/events_scylla.rs
@@ -0,0 +1,500 @@
+use err::Error;
+use futures_core::{Future, Stream};
+use futures_util::FutureExt;
+use items::scalarevents::ScalarEvents;
+use items::waveevents::WaveEvents;
+use items::{Framable, RangeCompletableItem, StreamItem};
+use netpod::log::*;
+use netpod::query::RawEventsQuery;
+use netpod::{Database, NanoRange, ScalarType, ScyllaConfig, Shape};
+use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError;
+use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError};
+use scylla::Session as ScySession;
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use tokio_postgres::Client as PgClient;
+
+trait ErrConv<T> {
+    fn err_conv(self) -> Result<T, Error>;
+}
+
+impl<T> ErrConv<T> for Result<T, ScyQueryError> {
+    fn err_conv(self) -> Result<T, Error> {
+        match self {
+            Ok(k) => Ok(k),
+            Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
+        }
+    }
+}
+
+impl<T> ErrConv<T> for Result<T, ScyNewSessionError> {
+    fn err_conv(self) -> Result<T, Error> {
+        match self {
+            Ok(k) => Ok(k),
+            Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
+        }
+    }
+}
+
+impl<T> ErrConv<T> for Result<T, ScyFromRowError> {
+    fn err_conv(self) -> Result<T, Error> {
+        match self {
+            Ok(k) => Ok(k),
+            Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
+        }
+    }
+}
+
+impl<T> ErrConv<T> for Result<T, tokio_postgres::Error> {
+    fn err_conv(self) -> Result<T, Error> {
+        match self {
+            Ok(k) => Ok(k),
+            Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
+        }
+    }
+}
+
+macro_rules! impl_read_values_fut {
+    ($fname:ident, $self:expr, $ts_msp:expr) => {{
+        let fut = $fname($self.series, $ts_msp, $self.range.clone(), $self.scy.clone());
+        let fut = fut.map(|x| {
+            let x2 = match x {
+                Ok(k) => {
+                    //
+                    Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
+                }
+                Err(e) => {
+                    //
+                    Err(e)
+                }
+            };
+            //Box::new(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))) as Box<dyn Framable>});
+            let ret = Box::new(x2) as Box<dyn Framable>;
+            ret
+        });
+        let fut = Box::pin(fut) as Pin<Box<dyn Future<Output = Box<dyn Framable>> + Send>>;
+        fut
+    }};
+}
+
+struct ReadValues {
+    series: i64,
+    scalar_type: ScalarType,
+    shape: Shape,
+    range: NanoRange,
+    ts_msp: VecDeque<u64>,
+    fut: Pin<Box<dyn Future<Output = Box<dyn Framable>> + Send>>,
+    scy: Arc<ScySession>,
+}
+
+impl ReadValues {
+    fn new(
+        series: i64,
+        scalar_type: ScalarType,
+        shape: Shape,
+        range: NanoRange,
+        ts_msp: VecDeque<u64>,
+        scy: Arc<ScySession>,
+    ) -> Self {
+        Self {
+            series,
+            scalar_type,
+            shape,
+            range,
+            ts_msp,
+            fut: Box::pin(futures_util::future::lazy(|_| panic!())),
+            scy,
+        }
+    }
+
+    fn next(&mut self) -> bool {
+        if let Some(ts_msp) = self.ts_msp.pop_front() {
+            self.fut = self.make_fut(ts_msp, self.ts_msp.len() > 1);
+            true
+        } else {
+            false
+        }
+    }
+
+    fn make_fut(
+        &mut self,
+        ts_msp: u64,
+        _has_more_msp: bool,
+    ) -> Pin<Box<dyn Future<Output = Box<dyn Framable>> + Send>> {
+        // TODO this also needs to differentiate on Shape.
+        let fut = match &self.shape {
+            Shape::Scalar => match &self.scalar_type {
+                ScalarType::I32 => {
+                    impl_read_values_fut!(read_next_values_scalar_i32, self, ts_msp)
+                }
+                ScalarType::F32 => {
+                    impl_read_values_fut!(read_next_values_scalar_f32, self, ts_msp)
+                }
+                ScalarType::F64 => {
+                    impl_read_values_fut!(read_next_values_scalar_f64, self, ts_msp)
+                }
+                _ => err::todoval(),
+            },
+            Shape::Wave(_) => match &self.scalar_type {
+                ScalarType::U16 => {
+                    impl_read_values_fut!(read_next_values_array_u16, self, ts_msp)
+                }
+                _ => err::todoval(),
+            },
+            _ => err::todoval(),
+        };
+        fut
+    }
+}
+
+enum FrState {
+    New,
+    FindMsp(Pin<Box<dyn Future<Output = Result<Vec<u64>, Error>> + Send>>),
+    ReadValues(ReadValues),
+    Done,
+}
+
+pub struct ScyllaFramableStream {
+    state: FrState,
+    #[allow(unused)]
+    evq: RawEventsQuery,
+    scalar_type: ScalarType,
+    shape: Shape,
+    series: u64,
+    range: NanoRange,
+    scy: Arc<ScySession>,
+    do_test_stream_error: bool,
+}
+
+impl ScyllaFramableStream {
+    pub fn new(
+        evq: &RawEventsQuery,
+        scalar_type: ScalarType,
+        shape: Shape,
+        scy: Arc<ScySession>,
+        do_test_stream_error: bool,
+    ) -> Self {
+        Self {
+            state: FrState::New,
+            series: evq.channel.series.unwrap(),
+            evq: evq.clone(),
+            scalar_type,
+            shape,
+            range: evq.range.clone(),
+            scy,
+            do_test_stream_error,
+        }
+    }
+}
+
+impl Stream for ScyllaFramableStream {
+    type Item = Box<dyn Framable>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        use Poll::*;
+        if self.do_test_stream_error {
+            let e = Error::with_msg(format!("Test PRIVATE STREAM error."))
+                .add_public_msg(format!("Test PUBLIC STREAM error."));
+            return Ready(Some(
+                Box::new(Err::<StreamItem<RangeCompletableItem<ScalarEvents<i32>>>, _>(e)) as _,
+            ));
+        }
+        loop {
+            break match self.state {
+                FrState::New => {
+                    let fut = find_ts_msp(self.series as i64, self.range.clone(), self.scy.clone());
+                    let fut = Box::pin(fut);
+                    self.state = FrState::FindMsp(fut);
+                    continue;
+                }
+                FrState::FindMsp(ref mut fut) => match fut.poll_unpin(cx) {
+                    Ready(Ok(ts_msp)) => {
+                        info!("found ts_msp {ts_msp:?}");
+                        // TODO get rid of into() for VecDeque
+                        let mut st = ReadValues::new(
+                            self.series as i64,
+                            self.scalar_type.clone(),
+                            self.shape.clone(),
+                            self.range.clone(),
+                            // TODO get rid of the conversion:
+                            ts_msp.into(),
+                            self.scy.clone(),
+                        );
+                        if st.next() {
+                            self.state = FrState::ReadValues(st);
+                        } else {
+                            self.state = FrState::Done;
+                        }
+                        continue;
+                    }
+                    Ready(Err(e)) => {
+                        self.state = FrState::Done;
+                        Ready(Some(Box::new(
+                            Err(e) as Result<StreamItem<RangeCompletableItem<ScalarEvents<i32>>>, _>
+                        )))
+                    }
+                    Pending => Pending,
+                },
+                FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) {
+                    Ready(item) => {
+                        if st.next() {
+                        } else {
+                            info!("ReadValues exhausted");
+                            self.state = FrState::Done;
+                        }
+                        Ready(Some(item))
+                    }
+                    Pending => Pending,
+                },
+                FrState::Done => Ready(None),
+            };
+        }
+    }
+}
+
+async fn find_series(series: u64, pgclient: Arc<PgClient>) -> Result<(ScalarType, Shape), Error> {
+    info!("find_series series {}", series);
+    let rows = {
+        let q = "select facility, channel, scalar_type, shape_dims from series_by_channel where series = $1";
+        pgclient.query(q, &[&(series as i64)]).await.err_conv()?
+ }; + if rows.len() < 1 { + return Err(Error::with_public_msg_no_trace( + "Multiple series found for channel, can not return data for ambiguous series", + )); + } + if rows.len() > 1 { + error!("Multiple series found for channel, can not return data for ambiguous series"); + return Err(Error::with_public_msg_no_trace( + "Multiple series found for channel, can not return data for ambiguous series", + )); + } + let row = rows + .into_iter() + .next() + .ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))?; + info!("row {row:?}"); + let _facility: String = row.get(0); + let _channel: String = row.get(1); + let a: i32 = row.get(2); + let scalar_type = ScalarType::from_scylla_i32(a)?; + let a: Vec = row.get(3); + let shape = Shape::from_scylla_shape_dims(&a)?; + info!("make_scylla_stream series {series} scalar_type {scalar_type:?} shape {shape:?}"); + Ok((scalar_type, shape)) +} + +async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc) -> Result, Error> { + trace!("find_ts_msp series {} {:?}", series, range); + // TODO use prepared statements + let cql = "select ts_msp from ts_msp where series = ? and ts_msp < ? order by ts_msp desc limit 1"; + let res = scy.query(cql, (series, range.beg as i64)).await.err_conv()?; + let mut before = vec![]; + for row in res.rows_typed_or_empty::<(i64,)>() { + let row = row.err_conv()?; + before.push(row.0 as u64); + } + trace!("FOUND BEFORE THE REQUESTED TIME: {} {:?}", before.len(), before); + let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? and ts_msp < ?"; + let res = scy + .query(cql, (series, range.beg as i64, range.end as i64)) + .await + .err_conv()?; + let mut ret = vec![]; + for x in before { + ret.push(x); + } + for row in res.rows_typed_or_empty::<(i64,)>() { + let row = row.err_conv()?; + ret.push(row.0 as u64); + } + trace!("found in total {} rows", ret.len()); + Ok(ret) +} + +macro_rules! read_next_scalar_values { + ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => { + async fn $fname( + series: i64, + ts_msp: u64, + range: NanoRange, + scy: Arc, + ) -> Result, Error> { + type ST = $st; + type SCYTY = $scyty; + trace!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp); + let _ts_lsp_max = if range.end <= ts_msp { + // TODO we should not be here... + } else { + }; + if range.end > i64::MAX as u64 { + return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); + } + let ts_lsp_max = range.end; + let cql = concat!( + "select ts_lsp, pulse, value from ", + $table_name, + " where series = ? and ts_msp = ? and ts_lsp < ?" + ); + let res = scy + .query(cql, (series, ts_msp as i64, ts_lsp_max as i64)) + .await + .err_conv()?; + let mut ret = ScalarEvents::::empty(); + let mut discarded = 0; + for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() { + let row = row.err_conv()?; + let ts = ts_msp + row.0 as u64; + let pulse = row.1 as u64; + let value = row.2 as ST; + if ts < range.beg || ts >= range.end { + discarded += 1; + } else { + ret.push(ts, pulse, value); + } + } + trace!( + "found in total {} events ts_msp {} discarded {}", + ret.tss.len(), + ts_msp, + discarded + ); + Ok(ret) + } + }; +} + +macro_rules! 
+macro_rules! read_next_array_values {
+    ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => {
+        async fn $fname(
+            series: i64,
+            ts_msp: u64,
+            _range: NanoRange,
+            scy: Arc<ScySession>,
+        ) -> Result<WaveEvents<$st>, Error> {
+            type ST = $st;
+            type SCYTY = $scyty;
+            info!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp);
+            let cql = concat!(
+                "select ts_lsp, pulse, value from ",
+                $table_name,
+                " where series = ? and ts_msp = ?"
+            );
+            let res = scy.query(cql, (series, ts_msp as i64)).await.err_conv()?;
+            let mut ret = WaveEvents::<ST>::empty();
+            for row in res.rows_typed_or_empty::<(i64, i64, Vec<SCYTY>)>() {
+                let row = row.err_conv()?;
+                let ts = ts_msp + row.0 as u64;
+                let pulse = row.1 as u64;
+                let value = row.2.into_iter().map(|x| x as ST).collect();
+                ret.push(ts, pulse, value);
+            }
+            info!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp);
+            Ok(ret)
+        }
+    };
+}
+
+read_next_scalar_values!(read_next_values_scalar_i32, i32, i32, "events_scalar_i32");
+read_next_scalar_values!(read_next_values_scalar_f32, f32, f32, "events_scalar_f32");
+read_next_scalar_values!(read_next_values_scalar_f64, f64, f64, "events_scalar_f64");
+
+read_next_array_values!(read_next_values_array_u16, u16, i16, "events_wave_u16");
+
+pub async fn make_scylla_stream(
+    evq: &RawEventsQuery,
+    scyco: &ScyllaConfig,
+    dbconf: Database,
+    do_test_stream_error: bool,
+) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
+    info!("make_scylla_stream open scylla connection");
+    // TODO should RawEventsQuery already contain ScalarType and Shape?
+    let (scalar_type, shape) = {
+        let u = {
+            let d = &dbconf;
+            format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name)
+        };
+        let (pgclient, pgconn) = tokio_postgres::connect(&u, tokio_postgres::NoTls).await.err_conv()?;
+        // TODO use common connection/pool:
+        tokio::spawn(pgconn);
+        let pgclient = Arc::new(pgclient);
+        find_series(evq.channel.series.unwrap(), pgclient.clone()).await?
+    };
+    // TODO reuse existing connection:
+    let scy = scylla::SessionBuilder::new()
+        .known_nodes(&scyco.hosts)
+        .use_keyspace(&scyco.keyspace, true)
+        .build()
+        .await
+        .err_conv()?;
+    let scy = Arc::new(scy);
+    let res = Box::pin(ScyllaFramableStream::new(
+        evq,
+        scalar_type,
+        shape,
+        scy,
+        do_test_stream_error,
+    )) as _;
+    Ok(res)
+}
+
+#[allow(unused)]
+async fn _make_scylla_stream_2(
+    evq: &RawEventsQuery,
+    scyco: &ScyllaConfig,
+) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
+    // Find the "series" id.
+    info!("make_scylla_stream finding series id");
+    let scy = scylla::SessionBuilder::new()
+        .known_nodes(&scyco.hosts)
+        .use_keyspace(&scyco.keyspace, true)
+        .build()
+        .await
+        .err_conv()?;
+    let res = {
+        let cql =
+            "select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?";
+        scy.query(cql, (&evq.channel.backend, evq.channel.name()))
+            .await
+            .err_conv()?
+    };
+    let rows: Vec<_> = res.rows_typed_or_empty::<(i64, i32, Vec<i32>)>().collect();
+    if rows.len() > 1 {
+        error!("Multiple series found for channel, can not return data for ambiguous series");
+        return Err(Error::with_public_msg_no_trace(
+            "Multiple series found for channel, can not return data for ambiguous series",
+        ));
+    }
+    if rows.len() < 1 {
+        return Err(Error::with_public_msg_no_trace(
+            "No series found for channel, can not return data",
+        ));
+    }
+    let row = rows
+        .into_iter()
+        .next()
+        .ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))?
+ .err_conv()?; + info!("make_scylla_stream row {row:?}"); + let series = row.0; + info!("make_scylla_stream series {series}"); + let _expand = evq.agg_kind.need_expand(); + let range = &evq.range; + { + let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? and ts_msp < ?"; + let res = scy + .query(cql, (series, range.beg as i64, range.end as i64)) + .await + .err_conv()?; + let mut rc = 0; + for _row in res.rows_or_empty() { + rc += 1; + } + info!("found in total {} rows", rc); + } + error!("TODO scylla fetch continue here"); + let res = Box::pin(futures_util::stream::empty()); + Ok(res) +} diff --git a/items/src/items.rs b/items/src/items.rs index 3354382..e910c12 100644 --- a/items/src/items.rs +++ b/items/src/items.rs @@ -421,9 +421,11 @@ pub trait TimeBinnableDynAggregator: Send { fn result(&mut self) -> Box; } -pub trait TimeBinned: Framable + Send + TimeBinnableDyn { +pub trait TimeBinned: Framable + Sync + Send + TimeBinnableDyn { fn aggregator_new(&self) -> Box; fn as_time_binnable_dyn(&self) -> &dyn TimeBinnableDyn; + fn workaround_clone(&self) -> Box; + fn dummy_test_i32(&self) -> i32; } // TODO should get I/O and tokio dependence out of this crate diff --git a/nodenet/Cargo.toml b/nodenet/Cargo.toml index 657b92b..e582011 100644 --- a/nodenet/Cargo.toml +++ b/nodenet/Cargo.toml @@ -31,4 +31,5 @@ disk = { path = "../disk" } archapp_wrap = { path = "../archapp_wrap" } #parse = { path = "../parse" } items = { path = "../items" } +dbconn = { path = "../dbconn" } taskrun = { path = "../taskrun" } diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index b4463f5..d25aab0 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -1,4 +1,4 @@ -use crate::scylla::make_scylla_stream; +use dbconn::events_scylla::make_scylla_stream; use disk::frame::inmem::InMemoryFrameAsyncReadStream; use err::Error; use futures_core::Stream; diff --git a/nodenet/src/scylla.rs b/nodenet/src/scylla.rs index 83fecc3..8b13789 100644 --- a/nodenet/src/scylla.rs +++ b/nodenet/src/scylla.rs @@ -1,512 +1 @@ -use err::Error; -use futures_core::{Future, Stream}; -use futures_util::FutureExt; -use items::scalarevents::ScalarEvents; -use items::waveevents::WaveEvents; -use items::{Framable, RangeCompletableItem, StreamItem}; -use netpod::log::*; -use netpod::query::RawEventsQuery; -use netpod::{Channel, Database, NanoRange, ScalarType, ScyllaConfig, Shape}; -use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError; -use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError}; -use scylla::Session as ScySession; -use std::collections::VecDeque; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; -use tokio_postgres::Client as PgClient; -trait ErrConv { - fn err_conv(self) -> Result; -} - -impl ErrConv for Result { - fn err_conv(self) -> Result { - match self { - Ok(k) => Ok(k), - Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), - } - } -} - -impl ErrConv for Result { - fn err_conv(self) -> Result { - match self { - Ok(k) => Ok(k), - Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), - } - } -} - -impl ErrConv for Result { - fn err_conv(self) -> Result { - match self { - Ok(k) => Ok(k), - Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), - } - } -} - -impl ErrConv for Result { - fn err_conv(self) -> Result { - match self { - Ok(k) => Ok(k), - Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))), - } - } -} - -macro_rules! 
impl_read_values_fut { - ($fname:ident, $self:expr, $ts_msp:expr) => {{ - let fut = $fname($self.series, $ts_msp, $self.range.clone(), $self.scy.clone()); - let fut = fut.map(|x| { - let x2 = match x { - Ok(k) => { - // - Ok(StreamItem::DataItem(RangeCompletableItem::Data(k))) - } - Err(e) => { - // - Err(e) - } - }; - //Box::new(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))) as Box}); - let ret = Box::new(x2) as Box; - ret - }); - let fut = Box::pin(fut) as Pin> + Send>>; - fut - }}; -} - -struct ReadValues { - series: i64, - scalar_type: ScalarType, - shape: Shape, - range: NanoRange, - ts_msp: VecDeque, - fut: Pin> + Send>>, - scy: Arc, -} - -impl ReadValues { - fn new( - series: i64, - scalar_type: ScalarType, - shape: Shape, - range: NanoRange, - ts_msp: VecDeque, - scy: Arc, - ) -> Self { - Self { - series, - scalar_type, - shape, - range, - ts_msp, - fut: Box::pin(futures_util::future::lazy(|_| panic!())), - scy, - } - } - - fn next(&mut self) -> bool { - if let Some(ts_msp) = self.ts_msp.pop_front() { - self.fut = self.make_fut(ts_msp, self.ts_msp.len() > 1); - true - } else { - false - } - } - - fn make_fut( - &mut self, - ts_msp: u64, - _has_more_msp: bool, - ) -> Pin> + Send>> { - // TODO this also needs to differentiate on Shape. - let fut = match &self.shape { - Shape::Scalar => match &self.scalar_type { - ScalarType::I32 => { - impl_read_values_fut!(read_next_values_scalar_i32, self, ts_msp) - } - ScalarType::F32 => { - impl_read_values_fut!(read_next_values_scalar_f32, self, ts_msp) - } - ScalarType::F64 => { - impl_read_values_fut!(read_next_values_scalar_f64, self, ts_msp) - } - _ => err::todoval(), - }, - Shape::Wave(_) => match &self.scalar_type { - ScalarType::U16 => { - impl_read_values_fut!(read_next_values_array_u16, self, ts_msp) - } - _ => err::todoval(), - }, - _ => err::todoval(), - }; - fut - } -} - -enum FrState { - New, - FindSeries(Pin> + Send>>), - FindMsp(Pin, Error>> + Send>>), - ReadValues(ReadValues), - Done, -} - -pub struct ScyllaFramableStream { - state: FrState, - #[allow(unused)] - evq: RawEventsQuery, - #[allow(unused)] - channel: Channel, - series: u64, - range: NanoRange, - scalar_type: Option, - shape: Option, - scy: Arc, - pgclient: Arc, - do_test_stream_error: bool, -} - -impl ScyllaFramableStream { - pub fn new( - evq: &RawEventsQuery, - scy: Arc, - pgclient: Arc, - do_test_stream_error: bool, - ) -> Self { - Self { - state: FrState::New, - series: evq.channel.series.unwrap(), - evq: evq.clone(), - channel: evq.channel.clone(), - range: evq.range.clone(), - scalar_type: None, - shape: None, - scy, - pgclient, - do_test_stream_error, - } - } -} - -impl Stream for ScyllaFramableStream { - type Item = Box; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - if self.do_test_stream_error { - let e = Error::with_msg(format!("Test PRIVATE STREAM error.")) - .add_public_msg(format!("Test PUBLIC STREAM error.")); - return Ready(Some( - Box::new(Err::>>, _>(e)) as _, - )); - } - loop { - break match self.state { - FrState::New => { - let fut = find_series(self.series, self.pgclient.clone()); - let fut = Box::pin(fut); - self.state = FrState::FindSeries(fut); - continue; - } - FrState::FindSeries(ref mut fut) => match fut.poll_unpin(cx) { - Ready(Ok((scalar_type, shape))) => { - info!("ScyllaFramableStream found series {:?} {:?}", scalar_type, shape); - self.scalar_type = Some(scalar_type); - self.shape = Some(shape); - let fut = find_ts_msp(self.series as i64, self.range.clone(), self.scy.clone()); - let 
fut = Box::pin(fut); - self.state = FrState::FindMsp(fut); - continue; - } - Ready(Err(e)) => { - self.state = FrState::Done; - Ready(Some(Box::new( - Err(e) as Result>>, _> - ))) - } - Pending => Pending, - }, - FrState::FindMsp(ref mut fut) => match fut.poll_unpin(cx) { - Ready(Ok(ts_msp)) => { - info!("found ts_msp {ts_msp:?}"); - // TODO get rid of into() for VecDeque - let mut st = ReadValues::new( - self.series as i64, - self.scalar_type.as_ref().unwrap().clone(), - self.shape.as_ref().unwrap().clone(), - self.range.clone(), - // TODO get rid of the conversion: - ts_msp.into(), - self.scy.clone(), - ); - if st.next() { - self.state = FrState::ReadValues(st); - } else { - self.state = FrState::Done; - } - continue; - } - Ready(Err(e)) => { - self.state = FrState::Done; - Ready(Some(Box::new( - Err(e) as Result>>, _> - ))) - } - Pending => Pending, - }, - FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) { - Ready(item) => { - if st.next() { - } else { - info!("ReadValues exhausted"); - self.state = FrState::Done; - } - Ready(Some(item)) - } - Pending => Pending, - }, - FrState::Done => Ready(None), - }; - } - } -} - -async fn find_series(series: u64, pgclient: Arc) -> Result<(ScalarType, Shape), Error> { - info!("find_series series {}", series); - let rows = { - let q = "select facility, channel, scalar_type, shape_dims from series_by_channel where series = $1"; - pgclient.query(q, &[&(series as i64)]).await.err_conv()? - }; - if rows.len() < 1 { - return Err(Error::with_public_msg_no_trace( - "Multiple series found for channel, can not return data for ambiguous series", - )); - } - if rows.len() > 1 { - error!("Multiple series found for channel, can not return data for ambiguous series"); - return Err(Error::with_public_msg_no_trace( - "Multiple series found for channel, can not return data for ambiguous series", - )); - } - let row = rows - .into_iter() - .next() - .ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))?; - info!("row {row:?}"); - let _facility: String = row.get(0); - let _channel: String = row.get(1); - let a: i32 = row.get(2); - let scalar_type = ScalarType::from_scylla_i32(a)?; - let a: Vec = row.get(3); - let shape = Shape::from_scylla_shape_dims(&a)?; - info!("make_scylla_stream series {series} scalar_type {scalar_type:?} shape {shape:?}"); - Ok((scalar_type, shape)) -} - -async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc) -> Result, Error> { - trace!("find_ts_msp series {} {:?}", series, range); - // TODO use prepared statements - let cql = "select ts_msp from ts_msp where series = ? and ts_msp < ? order by ts_msp desc limit 1"; - let res = scy.query(cql, (series, range.beg as i64)).await.err_conv()?; - let mut before = vec![]; - for row in res.rows_typed_or_empty::<(i64,)>() { - let row = row.err_conv()?; - before.push(row.0 as u64); - } - trace!("FOUND BEFORE THE REQUESTED TIME: {} {:?}", before.len(), before); - let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? and ts_msp < ?"; - let res = scy - .query(cql, (series, range.beg as i64, range.end as i64)) - .await - .err_conv()?; - let mut ret = vec![]; - for x in before { - ret.push(x); - } - for row in res.rows_typed_or_empty::<(i64,)>() { - let row = row.err_conv()?; - ret.push(row.0 as u64); - } - trace!("found in total {} rows", ret.len()); - Ok(ret) -} - -macro_rules! 
read_next_scalar_values { - ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => { - async fn $fname( - series: i64, - ts_msp: u64, - range: NanoRange, - scy: Arc, - ) -> Result, Error> { - type ST = $st; - type SCYTY = $scyty; - trace!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp); - let _ts_lsp_max = if range.end <= ts_msp { - // TODO we should not be here... - } else { - }; - if range.end > i64::MAX as u64 { - return Err(Error::with_msg_no_trace(format!("range.end overflows i64"))); - } - let ts_lsp_max = range.end; - let cql = concat!( - "select ts_lsp, pulse, value from ", - $table_name, - " where series = ? and ts_msp = ? and ts_lsp < ?" - ); - let res = scy - .query(cql, (series, ts_msp as i64, ts_lsp_max as i64)) - .await - .err_conv()?; - let mut ret = ScalarEvents::::empty(); - let mut discarded = 0; - for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() { - let row = row.err_conv()?; - let ts = ts_msp + row.0 as u64; - let pulse = row.1 as u64; - let value = row.2 as ST; - if ts < range.beg || ts >= range.end { - discarded += 1; - } else { - ret.push(ts, pulse, value); - } - } - trace!( - "found in total {} events ts_msp {} discarded {}", - ret.tss.len(), - ts_msp, - discarded - ); - Ok(ret) - } - }; -} - -macro_rules! read_next_array_values { - ($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => { - async fn $fname( - series: i64, - ts_msp: u64, - _range: NanoRange, - scy: Arc, - ) -> Result, Error> { - type ST = $st; - type SCYTY = $scyty; - info!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp); - let cql = concat!( - "select ts_lsp, pulse, value from ", - $table_name, - " where series = ? and ts_msp = ?" - ); - let res = scy.query(cql, (series, ts_msp as i64)).await.err_conv()?; - let mut ret = WaveEvents::::empty(); - for row in res.rows_typed_or_empty::<(i64, i64, Vec)>() { - let row = row.err_conv()?; - let ts = ts_msp + row.0 as u64; - let pulse = row.1 as u64; - let value = row.2.into_iter().map(|x| x as ST).collect(); - ret.push(ts, pulse, value); - } - info!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp); - Ok(ret) - } - }; -} - -read_next_scalar_values!(read_next_values_scalar_i32, i32, i32, "events_scalar_i32"); -read_next_scalar_values!(read_next_values_scalar_f32, f32, f32, "events_scalar_f32"); -read_next_scalar_values!(read_next_values_scalar_f64, f64, f64, "events_scalar_f64"); - -read_next_array_values!(read_next_values_array_u16, u16, i16, "events_wave_u16"); - -pub async fn make_scylla_stream( - evq: &RawEventsQuery, - scyco: &ScyllaConfig, - dbconf: Database, - do_test_stream_error: bool, -) -> Result> + Send>>, Error> { - info!("make_scylla_stream open scylla connection"); - // TODO reuse existing connection: - let scy = scylla::SessionBuilder::new() - .known_nodes(&scyco.hosts) - .use_keyspace(&scyco.keyspace, true) - .build() - .await - .err_conv()?; - let u = { - let d = &dbconf; - format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name) - }; - let (pgclient, pgconn) = tokio_postgres::connect(&u, tokio_postgres::NoTls).await.err_conv()?; - // TODO use common connection/pool: - tokio::spawn(pgconn); - let pgclient = Arc::new(pgclient); - let scy = Arc::new(scy); - let res = Box::pin(ScyllaFramableStream::new(evq, scy, pgclient, do_test_stream_error)) as _; - Ok(res) -} - -pub async fn make_scylla_stream_2( - evq: &RawEventsQuery, - scyco: &ScyllaConfig, -) -> Result> + Send>>, Error> { - // Find the "series" id. 
- info!("make_scylla_stream finding series id"); - let scy = scylla::SessionBuilder::new() - .known_nodes(&scyco.hosts) - .use_keyspace(&scyco.keyspace, true) - .build() - .await - .err_conv()?; - let res = { - let cql = - "select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?"; - scy.query(cql, (&evq.channel.backend, evq.channel.name())) - .await - .err_conv()? - }; - let rows: Vec<_> = res.rows_typed_or_empty::<(i64, i32, Vec)>().collect(); - if rows.len() > 1 { - error!("Multiple series found for channel, can not return data for ambiguous series"); - return Err(Error::with_public_msg_no_trace( - "Multiple series found for channel, can not return data for ambiguous series", - )); - } - if rows.len() < 1 { - return Err(Error::with_public_msg_no_trace( - "Multiple series found for channel, can not return data for ambiguous series", - )); - } - let row = rows - .into_iter() - .next() - .ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))? - .err_conv()?; - info!("make_scylla_stream row {row:?}"); - let series = row.0; - info!("make_scylla_stream series {series}"); - let _expand = evq.agg_kind.need_expand(); - let range = &evq.range; - { - let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? and ts_msp < ?"; - let res = scy - .query(cql, (series, range.beg as i64, range.end as i64)) - .await - .err_conv()?; - let mut rc = 0; - for _row in res.rows_or_empty() { - rc += 1; - } - info!("found in total {} rows", rc); - } - error!("TODO scylla fetch continue here"); - let res = Box::pin(futures_util::stream::empty()); - Ok(res) -}