Move scylla event fetch logic to dbconn crate

This commit is contained in:
Dominik Werder
2022-06-22 10:17:00 +02:00
parent 9161b7cc1d
commit c046303c7f
7 changed files with 589 additions and 537 deletions

View File

@@ -1,11 +1,17 @@
use crate::events_scylla::ScyllaFramableStream;
use crate::ErrConv;
use err::Error;
use futures_util::{Future, Stream, StreamExt};
use items::TimeBinned;
use netpod::log::*;
use netpod::{ChannelTyped, NanoRange, PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange, ScyllaConfig};
use netpod::query::RawEventsQuery;
use netpod::{
AggKind, ChannelTyped, NanoRange, PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange, ScyllaConfig,
};
use scylla::Session as ScySession;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
pub async fn read_cached_scylla(
chn: &ChannelTyped,
@@ -20,57 +26,96 @@ pub async fn read_cached_scylla(
err::todoval()
}
pub async fn write_cached_scylla(
#[allow(unused)]
struct WriteFut<'a> {
chn: &'a ChannelTyped,
coord: &'a PreBinnedPatchCoord,
data: &'a dyn TimeBinned,
scy: &'a ScySession,
}
impl<'a> WriteFut<'a> {
fn new(
chn: &'a ChannelTyped,
coord: &'a PreBinnedPatchCoord,
data: &'a dyn TimeBinned,
scy: &'a ScySession,
) -> Self {
Self { chn, coord, data, scy }
}
}
impl<'a> Future for WriteFut<'a> {
type Output = Result<(), Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let _ = cx;
todo!()
}
}
pub fn write_cached_scylla<'a>(
chn: &ChannelTyped,
coord: &PreBinnedPatchCoord,
data: &dyn TimeBinned,
data: &'a dyn TimeBinned,
scy: &ScySession,
) -> Result<(), Error> {
let _ = coord;
let _ = data;
let series = chn.series_id()?;
let res = scy.query_iter("", (series as i64,)).await.err_conv()?;
let _ = res;
// TODO write the data.
err::todoval()
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>> {
let chn = unsafe { &*(chn as *const ChannelTyped) };
let data = unsafe { &*(data as *const dyn TimeBinned) };
let scy = unsafe { &*(scy as *const ScySession) };
let fut = async move {
let _ = coord;
let series = chn.series_id()?;
let res = scy
.query_iter("", (series as i64, data.dummy_test_i32()))
.await
.err_conv()?;
let _ = res;
// TODO write the data.
//err::todoval();
Ok(())
};
Box::pin(fut)
}
// TODO must indicate to the caller whether it is safe to cache this (complete).
pub async fn fetch_uncached_data(
chn: ChannelTyped,
coord: PreBinnedPatchCoord,
scy: &ScySession,
scy: Arc<ScySession>,
) -> Result<Option<Box<dyn TimeBinned>>, Error> {
info!("fetch_uncached_data");
let range = coord.patch_range();
// TODO why the extra plus one?
let bin = match PreBinnedPatchRange::covering_range(range, coord.bin_count() + 1) {
Ok(Some(range)) => fetch_uncached_higher_res_prebinned(&chn, &range, scy).await,
Ok(None) => fetch_uncached_binned_events(&chn, &coord.patch_range(), scy).await,
Ok(Some(range)) => fetch_uncached_higher_res_prebinned(&chn, &range, scy.clone()).await,
Ok(None) => fetch_uncached_binned_events(&chn, &coord.patch_range(), scy.clone()).await,
Err(e) => Err(e),
}?;
err::todoval()
//let data = bin.workaround_clone();
WriteFut::new(&chn, &coord, bin.as_ref(), &scy).await?;
write_cached_scylla(&chn, &coord, bin.as_ref(), &scy).await?;
Ok(Some(bin))
}
pub fn fetch_uncached_data_box(
chn: &ChannelTyped,
coord: &PreBinnedPatchCoord,
scy: &ScySession,
scy: Arc<ScySession>,
) -> Pin<Box<dyn Future<Output = Result<Option<Box<dyn TimeBinned>>, Error>> + Send>> {
let scy = unsafe { &*(scy as *const _) };
Box::pin(fetch_uncached_data(chn.clone(), coord.clone(), scy))
}
pub async fn fetch_uncached_higher_res_prebinned(
chn: &ChannelTyped,
range: &PreBinnedPatchRange,
scy: &ScySession,
scy: Arc<ScySession>,
) -> Result<Box<dyn TimeBinned>, Error> {
let mut aggt = None;
let patch_it = PreBinnedPatchIterator::from_range(range.clone());
for patch in patch_it {
let coord = PreBinnedPatchCoord::new(patch.bin_t_len(), patch.patch_t_len(), patch.ix());
let mut stream = pre_binned_value_stream_with_scy(chn, &coord, scy).await?;
let mut stream = pre_binned_value_stream_with_scy(chn, &coord, scy.clone()).await?;
while let Some(item) = stream.next().await {
let item = item?;
// TODO here I will need some new API to aggregate (time-bin) trait objects.
@@ -92,25 +137,38 @@ pub async fn fetch_uncached_higher_res_prebinned(
pub async fn fetch_uncached_binned_events(
chn: &ChannelTyped,
range: &NanoRange,
scy: &ScySession,
scy: Arc<ScySession>,
) -> Result<Box<dyn TimeBinned>, Error> {
// TODO ask Scylla directly, do not go through HTTP.
// Refactor the event fetch stream code such that I can use that easily here.
let evq = RawEventsQuery::new(chn.channel.clone(), range.clone(), AggKind::Plain);
let _res = Box::pin(ScyllaFramableStream::new(
&evq,
chn.scalar_type.clone(),
chn.shape.clone(),
scy,
false,
));
// TODO ScyllaFramableStream must return a new events trait object designed for trait object use.
err::todoval()
}
pub async fn pre_binned_value_stream_with_scy(
chn: &ChannelTyped,
coord: &PreBinnedPatchCoord,
scy: &ScySession,
scy: Arc<ScySession>,
) -> Result<Pin<Box<dyn Stream<Item = Result<Box<dyn TimeBinned>, Error>> + Send>>, Error> {
info!("pre_binned_value_stream_with_scy {chn:?} {coord:?}");
// TODO determine the range:
let range = err::todoval();
if let Some(item) = read_cached_scylla(chn, &range, &scy).await? {
Ok(Box::pin(futures_util::stream::iter([Ok(item)])))
} else {
let bin = fetch_uncached_data_box(chn, coord, scy).await?;
Ok(Box::pin(futures_util::stream::empty()))
// TODO when can it ever be that we get back a None?
// TODO also, probably the caller wants to know whether the bin is Complete.
let bin = bin.unwrap();
Ok(Box::pin(futures_util::stream::iter([Ok(bin)])))
}
}
@@ -126,5 +184,6 @@ pub async fn pre_binned_value_stream(
.build()
.await
.err_conv()?;
pre_binned_value_stream_with_scy(chn, coord, &scy).await
let scy = Arc::new(scy);
pre_binned_value_stream_with_scy(chn, coord, scy).await
}

View File

@@ -1,5 +1,6 @@
pub mod scan;
pub mod bincache;
pub mod events_scylla;
pub mod scan;
pub mod search;
pub mod pg {
pub use tokio_postgres::{Client, Error};

500
dbconn/src/events_scylla.rs Normal file
View File

@@ -0,0 +1,500 @@
use err::Error;
use futures_core::{Future, Stream};
use futures_util::FutureExt;
use items::scalarevents::ScalarEvents;
use items::waveevents::WaveEvents;
use items::{Framable, RangeCompletableItem, StreamItem};
use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::{Database, NanoRange, ScalarType, ScyllaConfig, Shape};
use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError;
use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError};
use scylla::Session as ScySession;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio_postgres::Client as PgClient;
trait ErrConv<T> {
fn err_conv(self) -> Result<T, Error>;
}
impl<T> ErrConv<T> for Result<T, ScyQueryError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, ScyNewSessionError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, ScyFromRowError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, tokio_postgres::Error> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
macro_rules! impl_read_values_fut {
($fname:ident, $self:expr, $ts_msp:expr) => {{
let fut = $fname($self.series, $ts_msp, $self.range.clone(), $self.scy.clone());
let fut = fut.map(|x| {
let x2 = match x {
Ok(k) => {
//
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
}
Err(e) => {
//
Err(e)
}
};
//Box::new(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))) as Box<dyn Framable + Send>});
let ret = Box::new(x2) as Box<dyn Framable + 'static>;
ret
});
let fut = Box::pin(fut) as Pin<Box<dyn Future<Output = Box<dyn Framable>> + Send>>;
fut
}};
}
struct ReadValues {
series: i64,
scalar_type: ScalarType,
shape: Shape,
range: NanoRange,
ts_msp: VecDeque<u64>,
fut: Pin<Box<dyn Future<Output = Box<dyn Framable>> + Send>>,
scy: Arc<ScySession>,
}
impl ReadValues {
fn new(
series: i64,
scalar_type: ScalarType,
shape: Shape,
range: NanoRange,
ts_msp: VecDeque<u64>,
scy: Arc<ScySession>,
) -> Self {
Self {
series,
scalar_type,
shape,
range,
ts_msp,
fut: Box::pin(futures_util::future::lazy(|_| panic!())),
scy,
}
}
fn next(&mut self) -> bool {
if let Some(ts_msp) = self.ts_msp.pop_front() {
self.fut = self.make_fut(ts_msp, self.ts_msp.len() > 1);
true
} else {
false
}
}
fn make_fut(
&mut self,
ts_msp: u64,
_has_more_msp: bool,
) -> Pin<Box<dyn Future<Output = Box<dyn Framable>> + Send>> {
// TODO this also needs to differentiate on Shape.
let fut = match &self.shape {
Shape::Scalar => match &self.scalar_type {
ScalarType::I32 => {
impl_read_values_fut!(read_next_values_scalar_i32, self, ts_msp)
}
ScalarType::F32 => {
impl_read_values_fut!(read_next_values_scalar_f32, self, ts_msp)
}
ScalarType::F64 => {
impl_read_values_fut!(read_next_values_scalar_f64, self, ts_msp)
}
_ => err::todoval(),
},
Shape::Wave(_) => match &self.scalar_type {
ScalarType::U16 => {
impl_read_values_fut!(read_next_values_array_u16, self, ts_msp)
}
_ => err::todoval(),
},
_ => err::todoval(),
};
fut
}
}
enum FrState {
New,
FindMsp(Pin<Box<dyn Future<Output = Result<Vec<u64>, Error>> + Send>>),
ReadValues(ReadValues),
Done,
}
pub struct ScyllaFramableStream {
state: FrState,
#[allow(unused)]
evq: RawEventsQuery,
scalar_type: ScalarType,
shape: Shape,
series: u64,
range: NanoRange,
scy: Arc<ScySession>,
do_test_stream_error: bool,
}
impl ScyllaFramableStream {
pub fn new(
evq: &RawEventsQuery,
scalar_type: ScalarType,
shape: Shape,
scy: Arc<ScySession>,
do_test_stream_error: bool,
) -> Self {
Self {
state: FrState::New,
series: evq.channel.series.unwrap(),
evq: evq.clone(),
scalar_type,
shape,
range: evq.range.clone(),
scy,
do_test_stream_error,
}
}
}
impl Stream for ScyllaFramableStream {
type Item = Box<dyn Framable>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
if self.do_test_stream_error {
let e = Error::with_msg(format!("Test PRIVATE STREAM error."))
.add_public_msg(format!("Test PUBLIC STREAM error."));
return Ready(Some(
Box::new(Err::<StreamItem<RangeCompletableItem<ScalarEvents<f32>>>, _>(e)) as _,
));
}
loop {
break match self.state {
FrState::New => {
let fut = find_ts_msp(self.series as i64, self.range.clone(), self.scy.clone());
let fut = Box::pin(fut);
self.state = FrState::FindMsp(fut);
continue;
}
FrState::FindMsp(ref mut fut) => match fut.poll_unpin(cx) {
Ready(Ok(ts_msp)) => {
info!("found ts_msp {ts_msp:?}");
// TODO get rid of into() for VecDeque
let mut st = ReadValues::new(
self.series as i64,
self.scalar_type.clone(),
self.shape.clone(),
self.range.clone(),
// TODO get rid of the conversion:
ts_msp.into(),
self.scy.clone(),
);
if st.next() {
self.state = FrState::ReadValues(st);
} else {
self.state = FrState::Done;
}
continue;
}
Ready(Err(e)) => {
self.state = FrState::Done;
Ready(Some(Box::new(
Err(e) as Result<StreamItem<RangeCompletableItem<ScalarEvents<f32>>>, _>
)))
}
Pending => Pending,
},
FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) {
Ready(item) => {
if st.next() {
} else {
info!("ReadValues exhausted");
self.state = FrState::Done;
}
Ready(Some(item))
}
Pending => Pending,
},
FrState::Done => Ready(None),
};
}
}
}
async fn find_series(series: u64, pgclient: Arc<PgClient>) -> Result<(ScalarType, Shape), Error> {
info!("find_series series {}", series);
let rows = {
let q = "select facility, channel, scalar_type, shape_dims from series_by_channel where series = $1";
pgclient.query(q, &[&(series as i64)]).await.err_conv()?
};
if rows.len() < 1 {
return Err(Error::with_public_msg_no_trace(
"Multiple series found for channel, can not return data for ambiguous series",
));
}
if rows.len() > 1 {
error!("Multiple series found for channel, can not return data for ambiguous series");
return Err(Error::with_public_msg_no_trace(
"Multiple series found for channel, can not return data for ambiguous series",
));
}
let row = rows
.into_iter()
.next()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))?;
info!("row {row:?}");
let _facility: String = row.get(0);
let _channel: String = row.get(1);
let a: i32 = row.get(2);
let scalar_type = ScalarType::from_scylla_i32(a)?;
let a: Vec<i32> = row.get(3);
let shape = Shape::from_scylla_shape_dims(&a)?;
info!("make_scylla_stream series {series} scalar_type {scalar_type:?} shape {shape:?}");
Ok((scalar_type, shape))
}
async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc<ScySession>) -> Result<Vec<u64>, Error> {
trace!("find_ts_msp series {} {:?}", series, range);
// TODO use prepared statements
let cql = "select ts_msp from ts_msp where series = ? and ts_msp < ? order by ts_msp desc limit 1";
let res = scy.query(cql, (series, range.beg as i64)).await.err_conv()?;
let mut before = vec![];
for row in res.rows_typed_or_empty::<(i64,)>() {
let row = row.err_conv()?;
before.push(row.0 as u64);
}
trace!("FOUND BEFORE THE REQUESTED TIME: {} {:?}", before.len(), before);
let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? and ts_msp < ?";
let res = scy
.query(cql, (series, range.beg as i64, range.end as i64))
.await
.err_conv()?;
let mut ret = vec![];
for x in before {
ret.push(x);
}
for row in res.rows_typed_or_empty::<(i64,)>() {
let row = row.err_conv()?;
ret.push(row.0 as u64);
}
trace!("found in total {} rows", ret.len());
Ok(ret)
}
macro_rules! read_next_scalar_values {
($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => {
async fn $fname(
series: i64,
ts_msp: u64,
range: NanoRange,
scy: Arc<ScySession>,
) -> Result<ScalarEvents<$st>, Error> {
type ST = $st;
type SCYTY = $scyty;
trace!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp);
let _ts_lsp_max = if range.end <= ts_msp {
// TODO we should not be here...
} else {
};
if range.end > i64::MAX as u64 {
return Err(Error::with_msg_no_trace(format!("range.end overflows i64")));
}
let ts_lsp_max = range.end;
let cql = concat!(
"select ts_lsp, pulse, value from ",
$table_name,
" where series = ? and ts_msp = ? and ts_lsp < ?"
);
let res = scy
.query(cql, (series, ts_msp as i64, ts_lsp_max as i64))
.await
.err_conv()?;
let mut ret = ScalarEvents::<ST>::empty();
let mut discarded = 0;
for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() {
let row = row.err_conv()?;
let ts = ts_msp + row.0 as u64;
let pulse = row.1 as u64;
let value = row.2 as ST;
if ts < range.beg || ts >= range.end {
discarded += 1;
} else {
ret.push(ts, pulse, value);
}
}
trace!(
"found in total {} events ts_msp {} discarded {}",
ret.tss.len(),
ts_msp,
discarded
);
Ok(ret)
}
};
}
macro_rules! read_next_array_values {
($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => {
async fn $fname(
series: i64,
ts_msp: u64,
_range: NanoRange,
scy: Arc<ScySession>,
) -> Result<WaveEvents<$st>, Error> {
type ST = $st;
type SCYTY = $scyty;
info!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp);
let cql = concat!(
"select ts_lsp, pulse, value from ",
$table_name,
" where series = ? and ts_msp = ?"
);
let res = scy.query(cql, (series, ts_msp as i64)).await.err_conv()?;
let mut ret = WaveEvents::<ST>::empty();
for row in res.rows_typed_or_empty::<(i64, i64, Vec<SCYTY>)>() {
let row = row.err_conv()?;
let ts = ts_msp + row.0 as u64;
let pulse = row.1 as u64;
let value = row.2.into_iter().map(|x| x as ST).collect();
ret.push(ts, pulse, value);
}
info!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp);
Ok(ret)
}
};
}
read_next_scalar_values!(read_next_values_scalar_i32, i32, i32, "events_scalar_i32");
read_next_scalar_values!(read_next_values_scalar_f32, f32, f32, "events_scalar_f32");
read_next_scalar_values!(read_next_values_scalar_f64, f64, f64, "events_scalar_f64");
read_next_array_values!(read_next_values_array_u16, u16, i16, "events_wave_u16");
pub async fn make_scylla_stream(
evq: &RawEventsQuery,
scyco: &ScyllaConfig,
dbconf: Database,
do_test_stream_error: bool,
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
info!("make_scylla_stream open scylla connection");
// TODO should RawEventsQuery already contain ScalarType and Shape?
let (scalar_type, shape) = {
let u = {
let d = &dbconf;
format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name)
};
let (pgclient, pgconn) = tokio_postgres::connect(&u, tokio_postgres::NoTls).await.err_conv()?;
// TODO use common connection/pool:
tokio::spawn(pgconn);
let pgclient = Arc::new(pgclient);
find_series(evq.channel.series.unwrap(), pgclient.clone()).await?
};
// TODO reuse existing connection:
let scy = scylla::SessionBuilder::new()
.known_nodes(&scyco.hosts)
.use_keyspace(&scyco.keyspace, true)
.build()
.await
.err_conv()?;
let scy = Arc::new(scy);
let res = Box::pin(ScyllaFramableStream::new(
evq,
scalar_type,
shape,
scy,
do_test_stream_error,
)) as _;
Ok(res)
}
#[allow(unused)]
async fn _make_scylla_stream_2(
evq: &RawEventsQuery,
scyco: &ScyllaConfig,
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
// Find the "series" id.
info!("make_scylla_stream finding series id");
let scy = scylla::SessionBuilder::new()
.known_nodes(&scyco.hosts)
.use_keyspace(&scyco.keyspace, true)
.build()
.await
.err_conv()?;
let res = {
let cql =
"select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?";
scy.query(cql, (&evq.channel.backend, evq.channel.name()))
.await
.err_conv()?
};
let rows: Vec<_> = res.rows_typed_or_empty::<(i64, i32, Vec<i32>)>().collect();
if rows.len() > 1 {
error!("Multiple series found for channel, can not return data for ambiguous series");
return Err(Error::with_public_msg_no_trace(
"Multiple series found for channel, can not return data for ambiguous series",
));
}
if rows.len() < 1 {
return Err(Error::with_public_msg_no_trace(
"Multiple series found for channel, can not return data for ambiguous series",
));
}
let row = rows
.into_iter()
.next()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))?
.err_conv()?;
info!("make_scylla_stream row {row:?}");
let series = row.0;
info!("make_scylla_stream series {series}");
let _expand = evq.agg_kind.need_expand();
let range = &evq.range;
{
let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? and ts_msp < ?";
let res = scy
.query(cql, (series, range.beg as i64, range.end as i64))
.await
.err_conv()?;
let mut rc = 0;
for _row in res.rows_or_empty() {
rc += 1;
}
info!("found in total {} rows", rc);
}
error!("TODO scylla fetch continue here");
let res = Box::pin(futures_util::stream::empty());
Ok(res)
}

View File

@@ -421,9 +421,11 @@ pub trait TimeBinnableDynAggregator: Send {
fn result(&mut self) -> Box<dyn TimeBinned>;
}
pub trait TimeBinned: Framable + Send + TimeBinnableDyn {
pub trait TimeBinned: Framable + Sync + Send + TimeBinnableDyn {
fn aggregator_new(&self) -> Box<dyn TimeBinnableDynAggregator>;
fn as_time_binnable_dyn(&self) -> &dyn TimeBinnableDyn;
fn workaround_clone(&self) -> Box<dyn TimeBinned>;
fn dummy_test_i32(&self) -> i32;
}
// TODO should get I/O and tokio dependence out of this crate

View File

@@ -31,4 +31,5 @@ disk = { path = "../disk" }
archapp_wrap = { path = "../archapp_wrap" }
#parse = { path = "../parse" }
items = { path = "../items" }
dbconn = { path = "../dbconn" }
taskrun = { path = "../taskrun" }

View File

@@ -1,4 +1,4 @@
use crate::scylla::make_scylla_stream;
use dbconn::events_scylla::make_scylla_stream;
use disk::frame::inmem::InMemoryFrameAsyncReadStream;
use err::Error;
use futures_core::Stream;

View File

@@ -1,512 +1 @@
use err::Error;
use futures_core::{Future, Stream};
use futures_util::FutureExt;
use items::scalarevents::ScalarEvents;
use items::waveevents::WaveEvents;
use items::{Framable, RangeCompletableItem, StreamItem};
use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::{Channel, Database, NanoRange, ScalarType, ScyllaConfig, Shape};
use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError;
use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError};
use scylla::Session as ScySession;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio_postgres::Client as PgClient;
trait ErrConv<T> {
fn err_conv(self) -> Result<T, Error>;
}
impl<T> ErrConv<T> for Result<T, ScyQueryError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, ScyNewSessionError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, ScyFromRowError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, tokio_postgres::Error> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
macro_rules! impl_read_values_fut {
($fname:ident, $self:expr, $ts_msp:expr) => {{
let fut = $fname($self.series, $ts_msp, $self.range.clone(), $self.scy.clone());
let fut = fut.map(|x| {
let x2 = match x {
Ok(k) => {
//
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
}
Err(e) => {
//
Err(e)
}
};
//Box::new(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))) as Box<dyn Framable + Send>});
let ret = Box::new(x2) as Box<dyn Framable + 'static>;
ret
});
let fut = Box::pin(fut) as Pin<Box<dyn Future<Output = Box<dyn Framable>> + Send>>;
fut
}};
}
struct ReadValues {
series: i64,
scalar_type: ScalarType,
shape: Shape,
range: NanoRange,
ts_msp: VecDeque<u64>,
fut: Pin<Box<dyn Future<Output = Box<dyn Framable>> + Send>>,
scy: Arc<ScySession>,
}
impl ReadValues {
fn new(
series: i64,
scalar_type: ScalarType,
shape: Shape,
range: NanoRange,
ts_msp: VecDeque<u64>,
scy: Arc<ScySession>,
) -> Self {
Self {
series,
scalar_type,
shape,
range,
ts_msp,
fut: Box::pin(futures_util::future::lazy(|_| panic!())),
scy,
}
}
fn next(&mut self) -> bool {
if let Some(ts_msp) = self.ts_msp.pop_front() {
self.fut = self.make_fut(ts_msp, self.ts_msp.len() > 1);
true
} else {
false
}
}
fn make_fut(
&mut self,
ts_msp: u64,
_has_more_msp: bool,
) -> Pin<Box<dyn Future<Output = Box<dyn Framable>> + Send>> {
// TODO this also needs to differentiate on Shape.
let fut = match &self.shape {
Shape::Scalar => match &self.scalar_type {
ScalarType::I32 => {
impl_read_values_fut!(read_next_values_scalar_i32, self, ts_msp)
}
ScalarType::F32 => {
impl_read_values_fut!(read_next_values_scalar_f32, self, ts_msp)
}
ScalarType::F64 => {
impl_read_values_fut!(read_next_values_scalar_f64, self, ts_msp)
}
_ => err::todoval(),
},
Shape::Wave(_) => match &self.scalar_type {
ScalarType::U16 => {
impl_read_values_fut!(read_next_values_array_u16, self, ts_msp)
}
_ => err::todoval(),
},
_ => err::todoval(),
};
fut
}
}
enum FrState {
New,
FindSeries(Pin<Box<dyn Future<Output = Result<(ScalarType, Shape), Error>> + Send>>),
FindMsp(Pin<Box<dyn Future<Output = Result<Vec<u64>, Error>> + Send>>),
ReadValues(ReadValues),
Done,
}
pub struct ScyllaFramableStream {
state: FrState,
#[allow(unused)]
evq: RawEventsQuery,
#[allow(unused)]
channel: Channel,
series: u64,
range: NanoRange,
scalar_type: Option<ScalarType>,
shape: Option<Shape>,
scy: Arc<ScySession>,
pgclient: Arc<PgClient>,
do_test_stream_error: bool,
}
impl ScyllaFramableStream {
pub fn new(
evq: &RawEventsQuery,
scy: Arc<ScySession>,
pgclient: Arc<PgClient>,
do_test_stream_error: bool,
) -> Self {
Self {
state: FrState::New,
series: evq.channel.series.unwrap(),
evq: evq.clone(),
channel: evq.channel.clone(),
range: evq.range.clone(),
scalar_type: None,
shape: None,
scy,
pgclient,
do_test_stream_error,
}
}
}
impl Stream for ScyllaFramableStream {
type Item = Box<dyn Framable>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
if self.do_test_stream_error {
let e = Error::with_msg(format!("Test PRIVATE STREAM error."))
.add_public_msg(format!("Test PUBLIC STREAM error."));
return Ready(Some(
Box::new(Err::<StreamItem<RangeCompletableItem<ScalarEvents<f32>>>, _>(e)) as _,
));
}
loop {
break match self.state {
FrState::New => {
let fut = find_series(self.series, self.pgclient.clone());
let fut = Box::pin(fut);
self.state = FrState::FindSeries(fut);
continue;
}
FrState::FindSeries(ref mut fut) => match fut.poll_unpin(cx) {
Ready(Ok((scalar_type, shape))) => {
info!("ScyllaFramableStream found series {:?} {:?}", scalar_type, shape);
self.scalar_type = Some(scalar_type);
self.shape = Some(shape);
let fut = find_ts_msp(self.series as i64, self.range.clone(), self.scy.clone());
let fut = Box::pin(fut);
self.state = FrState::FindMsp(fut);
continue;
}
Ready(Err(e)) => {
self.state = FrState::Done;
Ready(Some(Box::new(
Err(e) as Result<StreamItem<RangeCompletableItem<ScalarEvents<f32>>>, _>
)))
}
Pending => Pending,
},
FrState::FindMsp(ref mut fut) => match fut.poll_unpin(cx) {
Ready(Ok(ts_msp)) => {
info!("found ts_msp {ts_msp:?}");
// TODO get rid of into() for VecDeque
let mut st = ReadValues::new(
self.series as i64,
self.scalar_type.as_ref().unwrap().clone(),
self.shape.as_ref().unwrap().clone(),
self.range.clone(),
// TODO get rid of the conversion:
ts_msp.into(),
self.scy.clone(),
);
if st.next() {
self.state = FrState::ReadValues(st);
} else {
self.state = FrState::Done;
}
continue;
}
Ready(Err(e)) => {
self.state = FrState::Done;
Ready(Some(Box::new(
Err(e) as Result<StreamItem<RangeCompletableItem<ScalarEvents<f32>>>, _>
)))
}
Pending => Pending,
},
FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) {
Ready(item) => {
if st.next() {
} else {
info!("ReadValues exhausted");
self.state = FrState::Done;
}
Ready(Some(item))
}
Pending => Pending,
},
FrState::Done => Ready(None),
};
}
}
}
async fn find_series(series: u64, pgclient: Arc<PgClient>) -> Result<(ScalarType, Shape), Error> {
info!("find_series series {}", series);
let rows = {
let q = "select facility, channel, scalar_type, shape_dims from series_by_channel where series = $1";
pgclient.query(q, &[&(series as i64)]).await.err_conv()?
};
if rows.len() < 1 {
return Err(Error::with_public_msg_no_trace(
"Multiple series found for channel, can not return data for ambiguous series",
));
}
if rows.len() > 1 {
error!("Multiple series found for channel, can not return data for ambiguous series");
return Err(Error::with_public_msg_no_trace(
"Multiple series found for channel, can not return data for ambiguous series",
));
}
let row = rows
.into_iter()
.next()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))?;
info!("row {row:?}");
let _facility: String = row.get(0);
let _channel: String = row.get(1);
let a: i32 = row.get(2);
let scalar_type = ScalarType::from_scylla_i32(a)?;
let a: Vec<i32> = row.get(3);
let shape = Shape::from_scylla_shape_dims(&a)?;
info!("make_scylla_stream series {series} scalar_type {scalar_type:?} shape {shape:?}");
Ok((scalar_type, shape))
}
async fn find_ts_msp(series: i64, range: NanoRange, scy: Arc<ScySession>) -> Result<Vec<u64>, Error> {
trace!("find_ts_msp series {} {:?}", series, range);
// TODO use prepared statements
let cql = "select ts_msp from ts_msp where series = ? and ts_msp < ? order by ts_msp desc limit 1";
let res = scy.query(cql, (series, range.beg as i64)).await.err_conv()?;
let mut before = vec![];
for row in res.rows_typed_or_empty::<(i64,)>() {
let row = row.err_conv()?;
before.push(row.0 as u64);
}
trace!("FOUND BEFORE THE REQUESTED TIME: {} {:?}", before.len(), before);
let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? and ts_msp < ?";
let res = scy
.query(cql, (series, range.beg as i64, range.end as i64))
.await
.err_conv()?;
let mut ret = vec![];
for x in before {
ret.push(x);
}
for row in res.rows_typed_or_empty::<(i64,)>() {
let row = row.err_conv()?;
ret.push(row.0 as u64);
}
trace!("found in total {} rows", ret.len());
Ok(ret)
}
macro_rules! read_next_scalar_values {
($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => {
async fn $fname(
series: i64,
ts_msp: u64,
range: NanoRange,
scy: Arc<ScySession>,
) -> Result<ScalarEvents<$st>, Error> {
type ST = $st;
type SCYTY = $scyty;
trace!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp);
let _ts_lsp_max = if range.end <= ts_msp {
// TODO we should not be here...
} else {
};
if range.end > i64::MAX as u64 {
return Err(Error::with_msg_no_trace(format!("range.end overflows i64")));
}
let ts_lsp_max = range.end;
let cql = concat!(
"select ts_lsp, pulse, value from ",
$table_name,
" where series = ? and ts_msp = ? and ts_lsp < ?"
);
let res = scy
.query(cql, (series, ts_msp as i64, ts_lsp_max as i64))
.await
.err_conv()?;
let mut ret = ScalarEvents::<ST>::empty();
let mut discarded = 0;
for row in res.rows_typed_or_empty::<(i64, i64, SCYTY)>() {
let row = row.err_conv()?;
let ts = ts_msp + row.0 as u64;
let pulse = row.1 as u64;
let value = row.2 as ST;
if ts < range.beg || ts >= range.end {
discarded += 1;
} else {
ret.push(ts, pulse, value);
}
}
trace!(
"found in total {} events ts_msp {} discarded {}",
ret.tss.len(),
ts_msp,
discarded
);
Ok(ret)
}
};
}
macro_rules! read_next_array_values {
($fname:ident, $st:ty, $scyty:ty, $table_name:expr) => {
async fn $fname(
series: i64,
ts_msp: u64,
_range: NanoRange,
scy: Arc<ScySession>,
) -> Result<WaveEvents<$st>, Error> {
type ST = $st;
type SCYTY = $scyty;
info!("{} series {} ts_msp {}", stringify!($fname), series, ts_msp);
let cql = concat!(
"select ts_lsp, pulse, value from ",
$table_name,
" where series = ? and ts_msp = ?"
);
let res = scy.query(cql, (series, ts_msp as i64)).await.err_conv()?;
let mut ret = WaveEvents::<ST>::empty();
for row in res.rows_typed_or_empty::<(i64, i64, Vec<SCYTY>)>() {
let row = row.err_conv()?;
let ts = ts_msp + row.0 as u64;
let pulse = row.1 as u64;
let value = row.2.into_iter().map(|x| x as ST).collect();
ret.push(ts, pulse, value);
}
info!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp);
Ok(ret)
}
};
}
read_next_scalar_values!(read_next_values_scalar_i32, i32, i32, "events_scalar_i32");
read_next_scalar_values!(read_next_values_scalar_f32, f32, f32, "events_scalar_f32");
read_next_scalar_values!(read_next_values_scalar_f64, f64, f64, "events_scalar_f64");
read_next_array_values!(read_next_values_array_u16, u16, i16, "events_wave_u16");
pub async fn make_scylla_stream(
evq: &RawEventsQuery,
scyco: &ScyllaConfig,
dbconf: Database,
do_test_stream_error: bool,
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
info!("make_scylla_stream open scylla connection");
// TODO reuse existing connection:
let scy = scylla::SessionBuilder::new()
.known_nodes(&scyco.hosts)
.use_keyspace(&scyco.keyspace, true)
.build()
.await
.err_conv()?;
let u = {
let d = &dbconf;
format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name)
};
let (pgclient, pgconn) = tokio_postgres::connect(&u, tokio_postgres::NoTls).await.err_conv()?;
// TODO use common connection/pool:
tokio::spawn(pgconn);
let pgclient = Arc::new(pgclient);
let scy = Arc::new(scy);
let res = Box::pin(ScyllaFramableStream::new(evq, scy, pgclient, do_test_stream_error)) as _;
Ok(res)
}
pub async fn make_scylla_stream_2(
evq: &RawEventsQuery,
scyco: &ScyllaConfig,
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
// Find the "series" id.
info!("make_scylla_stream finding series id");
let scy = scylla::SessionBuilder::new()
.known_nodes(&scyco.hosts)
.use_keyspace(&scyco.keyspace, true)
.build()
.await
.err_conv()?;
let res = {
let cql =
"select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?";
scy.query(cql, (&evq.channel.backend, evq.channel.name()))
.await
.err_conv()?
};
let rows: Vec<_> = res.rows_typed_or_empty::<(i64, i32, Vec<i32>)>().collect();
if rows.len() > 1 {
error!("Multiple series found for channel, can not return data for ambiguous series");
return Err(Error::with_public_msg_no_trace(
"Multiple series found for channel, can not return data for ambiguous series",
));
}
if rows.len() < 1 {
return Err(Error::with_public_msg_no_trace(
"Multiple series found for channel, can not return data for ambiguous series",
));
}
let row = rows
.into_iter()
.next()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))?
.err_conv()?;
info!("make_scylla_stream row {row:?}");
let series = row.0;
info!("make_scylla_stream series {series}");
let _expand = evq.agg_kind.need_expand();
let range = &evq.range;
{
let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? and ts_msp < ?";
let res = scy
.query(cql, (series, range.beg as i64, range.end as i64))
.await
.err_conv()?;
let mut rc = 0;
for _row in res.rows_or_empty() {
rc += 1;
}
info!("found in total {} rows", rc);
}
error!("TODO scylla fetch continue here");
let res = Box::pin(futures_util::stream::empty());
Ok(res)
}