WIP refactor frame type id, it type checks

This commit is contained in:
Dominik Werder
2022-06-23 13:33:07 +02:00
parent c046303c7f
commit 66215f583f
29 changed files with 453 additions and 255 deletions

View File

@@ -1,8 +1,8 @@
use crate::events_scylla::ScyllaFramableStream;
use crate::events_scylla::EventsStreamScylla;
use crate::ErrConv;
use err::Error;
use futures_util::{Future, Stream, StreamExt};
use items::TimeBinned;
use items::{TimeBinnableDyn, TimeBinned};
use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::{
@@ -142,13 +142,15 @@ pub async fn fetch_uncached_binned_events(
// TODO ask Scylla directly, do not go through HTTP.
// Refactor the event fetch stream code such that I can use that easily here.
let evq = RawEventsQuery::new(chn.channel.clone(), range.clone(), AggKind::Plain);
let _res = Box::pin(ScyllaFramableStream::new(
let _res = Box::pin(EventsStreamScylla::new(
&evq,
chn.scalar_type.clone(),
chn.shape.clone(),
scy,
false,
));
// TODO add the time binner.
// TODO return the result of the binning procedure.
// TODO ScyllaFramableStream must return a new events trait object designed for trait object use.
err::todoval()
}

View File

@@ -1,14 +1,12 @@
use crate::ErrConv;
use err::Error;
use futures_core::{Future, Stream};
use futures_util::FutureExt;
use futures_util::{Future, FutureExt, Stream};
use items::scalarevents::ScalarEvents;
use items::waveevents::WaveEvents;
use items::{Framable, RangeCompletableItem, StreamItem};
use items::{EventsDyn, RangeCompletableItem, Sitemty, StreamItem};
use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::{Database, NanoRange, ScalarType, ScyllaConfig, Shape};
use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError;
use scylla::transport::errors::{NewSessionError as ScyNewSessionError, QueryError as ScyQueryError};
use scylla::Session as ScySession;
use std::collections::VecDeque;
use std::pin::Pin;
@@ -16,65 +14,20 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use tokio_postgres::Client as PgClient;
trait ErrConv<T> {
fn err_conv(self) -> Result<T, Error>;
}
impl<T> ErrConv<T> for Result<T, ScyQueryError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, ScyNewSessionError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, ScyFromRowError> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
impl<T> ErrConv<T> for Result<T, tokio_postgres::Error> {
fn err_conv(self) -> Result<T, Error> {
match self {
Ok(k) => Ok(k),
Err(e) => Err(Error::with_msg_no_trace(format!("{e:?}"))),
}
}
}
macro_rules! impl_read_values_fut {
($fname:ident, $self:expr, $ts_msp:expr) => {{
let fut = $fname($self.series, $ts_msp, $self.range.clone(), $self.scy.clone());
let fut = fut.map(|x| {
let x2 = match x {
match x {
Ok(k) => {
//
Ok(StreamItem::DataItem(RangeCompletableItem::Data(k)))
// TODO why static needed?
let b = Box::new(k) as Box<dyn EventsDyn + 'static>;
Ok(b)
}
Err(e) => {
//
Err(e)
}
};
//Box::new(Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))) as Box<dyn Framable + Send>});
let ret = Box::new(x2) as Box<dyn Framable + 'static>;
ret
Err(e) => Err(e),
}
});
let fut = Box::pin(fut) as Pin<Box<dyn Future<Output = Box<dyn Framable>> + Send>>;
let fut = Box::pin(fut) as Pin<Box<dyn Future<Output = Result<Box<dyn EventsDyn>, Error>> + Send>>;
fut
}};
}
@@ -85,7 +38,7 @@ struct ReadValues {
shape: Shape,
range: NanoRange,
ts_msp: VecDeque<u64>,
fut: Pin<Box<dyn Future<Output = Box<dyn Framable>> + Send>>,
fut: Pin<Box<dyn Future<Output = Result<Box<dyn EventsDyn>, Error>> + Send>>,
scy: Arc<ScySession>,
}
@@ -122,7 +75,7 @@ impl ReadValues {
&mut self,
ts_msp: u64,
_has_more_msp: bool,
) -> Pin<Box<dyn Future<Output = Box<dyn Framable>> + Send>> {
) -> Pin<Box<dyn Future<Output = Result<Box<dyn EventsDyn>, Error>> + Send>> {
// TODO this also needs to differentiate on Shape.
let fut = match &self.shape {
Shape::Scalar => match &self.scalar_type {
@@ -156,7 +109,7 @@ enum FrState {
Done,
}
pub struct ScyllaFramableStream {
pub struct EventsStreamScylla {
state: FrState,
#[allow(unused)]
evq: RawEventsQuery,
@@ -168,7 +121,7 @@ pub struct ScyllaFramableStream {
do_test_stream_error: bool,
}
impl ScyllaFramableStream {
impl EventsStreamScylla {
pub fn new(
evq: &RawEventsQuery,
scalar_type: ScalarType,
@@ -189,17 +142,15 @@ impl ScyllaFramableStream {
}
}
impl Stream for ScyllaFramableStream {
type Item = Box<dyn Framable>;
impl Stream for EventsStreamScylla {
type Item = Sitemty<Box<dyn EventsDyn>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
if self.do_test_stream_error {
let e = Error::with_msg(format!("Test PRIVATE STREAM error."))
.add_public_msg(format!("Test PUBLIC STREAM error."));
return Ready(Some(
Box::new(Err::<StreamItem<RangeCompletableItem<ScalarEvents<f32>>>, _>(e)) as _,
));
return Ready(Some(Err(e)));
}
loop {
break match self.state {
@@ -231,21 +182,20 @@ impl Stream for ScyllaFramableStream {
}
Ready(Err(e)) => {
self.state = FrState::Done;
Ready(Some(Box::new(
Err(e) as Result<StreamItem<RangeCompletableItem<ScalarEvents<f32>>>, _>
)))
Ready(Some(Err(e)))
}
Pending => Pending,
},
FrState::ReadValues(ref mut st) => match st.fut.poll_unpin(cx) {
Ready(item) => {
Ready(Ok(item)) => {
if st.next() {
} else {
info!("ReadValues exhausted");
self.state = FrState::Done;
}
Ready(Some(item))
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))
}
Ready(Err(e)) => Ready(Some(Err(e))),
Pending => Pending,
},
FrState::Done => Ready(None),
@@ -408,7 +358,7 @@ pub async fn make_scylla_stream(
scyco: &ScyllaConfig,
dbconf: Database,
do_test_stream_error: bool,
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<Box<dyn EventsDyn>>> + Send>>, Error> {
info!("make_scylla_stream open scylla connection");
// TODO should RawEventsQuery already contain ScalarType and Shape?
let (scalar_type, shape) = {
@@ -430,7 +380,7 @@ pub async fn make_scylla_stream(
.await
.err_conv()?;
let scy = Arc::new(scy);
let res = Box::pin(ScyllaFramableStream::new(
let res = Box::pin(EventsStreamScylla::new(
evq,
scalar_type,
shape,
@@ -439,62 +389,3 @@ pub async fn make_scylla_stream(
)) as _;
Ok(res)
}
#[allow(unused)]
async fn _make_scylla_stream_2(
evq: &RawEventsQuery,
scyco: &ScyllaConfig,
) -> Result<Pin<Box<dyn Stream<Item = Box<dyn Framable>> + Send>>, Error> {
// Find the "series" id.
info!("make_scylla_stream finding series id");
let scy = scylla::SessionBuilder::new()
.known_nodes(&scyco.hosts)
.use_keyspace(&scyco.keyspace, true)
.build()
.await
.err_conv()?;
let res = {
let cql =
"select series, scalar_type, shape_dims from series_by_channel where facility = ? and channel_name = ?";
scy.query(cql, (&evq.channel.backend, evq.channel.name()))
.await
.err_conv()?
};
let rows: Vec<_> = res.rows_typed_or_empty::<(i64, i32, Vec<i32>)>().collect();
if rows.len() > 1 {
error!("Multiple series found for channel, can not return data for ambiguous series");
return Err(Error::with_public_msg_no_trace(
"Multiple series found for channel, can not return data for ambiguous series",
));
}
if rows.len() < 1 {
return Err(Error::with_public_msg_no_trace(
"Multiple series found for channel, can not return data for ambiguous series",
));
}
let row = rows
.into_iter()
.next()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("can not find series for channel")))?
.err_conv()?;
info!("make_scylla_stream row {row:?}");
let series = row.0;
info!("make_scylla_stream series {series}");
let _expand = evq.agg_kind.need_expand();
let range = &evq.range;
{
let cql = "select ts_msp from ts_msp where series = ? and ts_msp >= ? and ts_msp < ?";
let res = scy
.query(cql, (series, range.beg as i64, range.end as i64))
.await
.err_conv()?;
let mut rc = 0;
for _row in res.rows_or_empty() {
rc += 1;
}
info!("found in total {} rows", rc);
}
error!("TODO scylla fetch continue here");
let res = Box::pin(futures_util::stream::empty());
Ok(res)
}