Read back array data from value blob

This commit is contained in:
Dominik Werder
2024-05-17 22:26:29 +02:00
parent 51bafd4681
commit 437e6d0d76
9 changed files with 375 additions and 150 deletions

View File

@@ -64,7 +64,7 @@ async fn go() -> Result<(), Error> {
};
match opts.subcmd {
SubCmd::Retrieval(subcmd) => {
info!("daqbuffer version {} +0002", clap::crate_version!());
info!("daqbuffer version {} +0003", clap::crate_version!());
info!(" service_version {}", service_version);
if false {
#[allow(non_snake_case)]

View File

@@ -15,7 +15,6 @@ use httpclient::Requ;
use httpclient::StreamResponse;
use netpod::log::*;
use netpod::NodeConfigCached;
use netpod::ServiceVersion;
use std::sync::Arc;
#[derive(Debug, ThisError)]
@@ -85,7 +84,7 @@ impl EventDataHandler {
.await
.map_err(|_| EventDataError::InternalError)?;
let (evsubq,) = nodenet::conn::events_parse_input_query(frames).map_err(|_| EventDataError::QueryParse)?;
let stream = nodenet::conn::create_response_bytes_stream(evsubq, ncc)
let stream = nodenet::conn::create_response_bytes_stream(evsubq, shared_res.scyqueue.as_ref(), ncc)
.await
.map_err(|e| EventDataError::Error(Box::new(e)))?;
let ret = response(StatusCode::OK)

View File

@@ -47,6 +47,8 @@ use netpod::APP_JSON;
use panic::AssertUnwindSafe;
use panic::UnwindSafe;
use pin::Pin;
use scyllaconn::worker::ScyllaQueue;
use scyllaconn::worker::ScyllaWorker;
use serde::Deserialize;
use serde::Serialize;
use std::net;
@@ -102,11 +104,12 @@ impl ::err::ToErr for RetrievalError {
pub struct ServiceSharedResources {
pgqueue: PgQueue,
scyqueue: Option<ScyllaQueue>,
}
impl ServiceSharedResources {
pub fn new(pgqueue: PgQueue) -> Self {
Self { pgqueue }
pub fn new(pgqueue: PgQueue, scyqueue: Option<ScyllaQueue>) -> Self {
Self { pgqueue, scyqueue }
}
}
@@ -119,7 +122,21 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res
// let rawjh = taskrun::spawn(nodenet::conn::events_service(node_config.clone()));
let (pgqueue, pgworker) = PgWorker::new(&ncc.node_config.cluster.database).await?;
let pgworker_jh = taskrun::spawn(pgworker.work());
let shared_res = ServiceSharedResources::new(pgqueue);
let scyqueue = if let (Some(st), Some(mt), Some(lt)) = (
ncc.node_config.cluster.scylla_st(),
ncc.node_config.cluster.scylla_mt(),
ncc.node_config.cluster.scylla_lt(),
) {
let (scyqueue, scylla_worker) = ScyllaWorker::new(st, mt, lt).await.map_err(|e| {
error!("{e}");
RetrievalError::TextError(e.to_string())
})?;
let scylla_worker_jh = taskrun::spawn(scylla_worker.work());
Some(scyqueue)
} else {
None
};
let shared_res = ServiceSharedResources::new(pgqueue, scyqueue);
let shared_res = Arc::new(shared_res);
use std::str::FromStr;
let bind_addr = SocketAddr::from_str(&format!("{}:{}", ncc.node.listen(), ncc.node.port))?;
@@ -136,7 +153,6 @@ pub async fn host(ncc: NodeConfigCached, service_version: ServiceVersion) -> Res
let service_version = service_version.clone();
let io = TokioIo::new(stream);
let shared_res = shared_res.clone();
// let shared_res = &shared_res;
tokio::task::spawn(async move {
let res = hyper::server::conn::http1::Builder::new()
.serve_connection(

View File

@@ -17,7 +17,6 @@ use items_2::empty::empty_events_dyn_ev;
use items_2::framable::EventQueryJsonStringFrame;
use items_2::framable::Framable;
use items_2::frame::decode_frame;
use items_2::frame::make_error_frame;
use items_2::frame::make_term_frame;
use items_2::inmem::InMemoryFrame;
use netpod::histo::HistoLog2;
@@ -26,6 +25,7 @@ use netpod::NodeConfigCached;
use netpod::ReqCtxArc;
use query::api4::events::EventsSubQuery;
use query::api4::events::Frame1Parts;
use scyllaconn::worker::ScyllaQueue;
use std::net::SocketAddr;
use std::pin::Pin;
use streams::frames::inmem::BoxedBytesStream;
@@ -45,13 +45,14 @@ mod test;
#[derive(Debug, ThisError)]
pub enum NodeNetError {}
pub async fn events_service(node_config: NodeConfigCached) -> Result<(), Error> {
let addr = format!("{}:{}", node_config.node.listen(), node_config.node.port_raw);
pub async fn events_service(ncc: NodeConfigCached) -> Result<(), Error> {
let scyqueue = err::todoval();
let addr = format!("{}:{}", ncc.node.listen(), ncc.node.port_raw);
let lis = tokio::net::TcpListener::bind(addr).await?;
loop {
match lis.accept().await {
Ok((stream, addr)) => {
taskrun::spawn(events_conn_handler(stream, addr, node_config.clone()));
taskrun::spawn(events_conn_handler(stream, addr, scyqueue, ncc.clone()));
}
Err(e) => Err(e)?,
}
@@ -76,15 +77,16 @@ impl<E: Into<Error>> From<(E, OwnedWriteHalf)> for ConnErr {
async fn make_channel_events_stream_data(
subq: EventsSubQuery,
reqctx: ReqCtxArc,
scyqueue: Option<&ScyllaQueue>,
ncc: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
if subq.backend() == TEST_BACKEND {
let node_count = ncc.node_config.cluster.nodes.len() as u64;
let node_ix = ncc.ix as u64;
streams::generators::make_test_channel_events_stream_data(subq, node_count, node_ix)
} else if let Some(scyconf) = &ncc.node_config.cluster.scylla_st() {
} else if let Some(scyqueue) = scyqueue {
let cfg = subq.ch_conf().to_scylla()?;
scylla_channel_event_stream(subq, cfg, scyconf, ncc).await
scylla_channel_event_stream(subq, cfg, scyqueue).await
} else if let Some(_) = &ncc.node.channel_archiver {
let e = Error::with_msg_no_trace("archapp not built");
Err(e)
@@ -100,11 +102,12 @@ async fn make_channel_events_stream_data(
async fn make_channel_events_stream(
subq: EventsSubQuery,
reqctx: ReqCtxArc,
scyqueue: Option<&ScyllaQueue>,
ncc: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
let empty = empty_events_dyn_ev(subq.ch_conf().scalar_type(), subq.ch_conf().shape())?;
let empty = sitem_data(ChannelEvents::Events(empty));
let stream = make_channel_events_stream_data(subq, reqctx, ncc).await?;
let stream = make_channel_events_stream_data(subq, reqctx, scyqueue, ncc).await?;
let ret = futures_util::stream::iter([empty]).chain(stream);
let ret = Box::pin(ret);
Ok(ret)
@@ -112,6 +115,7 @@ async fn make_channel_events_stream(
pub async fn create_response_bytes_stream(
evq: EventsSubQuery,
scyqueue: Option<&ScyllaQueue>,
ncc: &NodeConfigCached,
) -> Result<BoxedBytesStream, Error> {
debug!(
@@ -136,7 +140,7 @@ pub async fn create_response_bytes_stream(
Ok(ret)
} else {
let mut tr = build_event_transform(evq.transform())?;
let stream = make_channel_events_stream(evq, reqctx, ncc).await?;
let stream = make_channel_events_stream(evq, reqctx, scyqueue, ncc).await?;
let stream = stream.map(move |x| {
on_sitemty_data!(x, |x: ChannelEvents| {
match x {
@@ -162,9 +166,10 @@ pub async fn create_response_bytes_stream(
async fn events_conn_handler_with_reqid(
mut netout: OwnedWriteHalf,
evq: EventsSubQuery,
scyqueue: Option<&ScyllaQueue>,
ncc: &NodeConfigCached,
) -> Result<(), ConnErr> {
let mut stream = match create_response_bytes_stream(evq, ncc).await {
let mut stream = match create_response_bytes_stream(evq, scyqueue, ncc).await {
Ok(x) => x,
Err(e) => return Err((e, netout))?,
};
@@ -283,6 +288,7 @@ async fn events_conn_handler_inner_try<INP>(
netin: INP,
netout: OwnedWriteHalf,
addr: SocketAddr,
scyqueue: Option<&ScyllaQueue>,
ncc: &NodeConfigCached,
) -> Result<(), ConnErr>
where
@@ -300,19 +306,22 @@ where
debug!("events_conn_handler sees: {evq:?}");
let reqid = evq.reqid();
let span = tracing::info_span!("subreq", reqid = reqid);
events_conn_handler_with_reqid(netout, evq, ncc).instrument(span).await
events_conn_handler_with_reqid(netout, evq, scyqueue, ncc)
.instrument(span)
.await
}
async fn events_conn_handler_inner<INP>(
netin: INP,
netout: OwnedWriteHalf,
addr: SocketAddr,
node_config: &NodeConfigCached,
scyqueue: Option<&ScyllaQueue>,
ncc: &NodeConfigCached,
) -> Result<(), Error>
where
INP: Stream<Item = Result<Bytes, Error>> + Unpin,
{
match events_conn_handler_inner_try(netin, netout, addr, node_config).await {
match events_conn_handler_inner_try(netin, netout, addr, scyqueue, ncc).await {
Ok(_) => (),
Err(ce) => {
let mut out = ce.netout;
@@ -324,11 +333,16 @@ where
Ok(())
}
async fn events_conn_handler(stream: TcpStream, addr: SocketAddr, node_config: NodeConfigCached) -> Result<(), Error> {
async fn events_conn_handler(
stream: TcpStream,
addr: SocketAddr,
scyqueue: Option<&ScyllaQueue>,
ncc: NodeConfigCached,
) -> Result<(), Error> {
let (netin, netout) = stream.into_split();
let inp = Box::new(TcpReadAsBytes::new(netin));
let span1 = span!(Level::INFO, "events_conn_handler");
let r = events_conn_handler_inner(inp, netout, addr, &node_config)
let r = events_conn_handler_inner(inp, netout, addr, scyqueue, &ncc)
.instrument(span1)
.await;
match r {

View File

@@ -88,7 +88,8 @@ fn raw_data_00() {
let frame1 = Frame1Parts::new(qu.clone());
let query = EventQueryJsonStringFrame(serde_json::to_string(&frame1).unwrap());
let frame = sitem_data(query).make_frame()?;
let jh = taskrun::spawn(events_conn_handler(client, addr, cfg));
let scyqueue = err::todoval();
let jh = taskrun::spawn(events_conn_handler(client, addr, scyqueue, cfg));
con.write_all(&frame).await.unwrap();
eprintln!("written");
con.shutdown().await.unwrap();

View File

@@ -7,23 +7,20 @@ use items_0::streamitem::StreamItem;
use items_2::channelevents::ChannelEvents;
use netpod::log::*;
use netpod::ChConf;
use netpod::NodeConfigCached;
use netpod::ScyllaConfig;
use query::api4::events::EventsSubQuery;
use scyllaconn::worker::ScyllaQueue;
use std::pin::Pin;
use taskrun::tokio;
pub async fn scylla_channel_event_stream(
evq: EventsSubQuery,
chconf: ChConf,
scyco: &ScyllaConfig,
_ncc: &NodeConfigCached,
scyqueue: &ScyllaQueue,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
// TODO depends in general on the query
// TODO why both in PlainEventsQuery and as separate parameter? Check other usages.
// let do_one_before_range = evq.need_one_before_range();
let do_one_before_range = false;
// TODO use better builder pattern with shortcuts for production and dev defaults
let scy = scyllaconn::conn::create_scy_session(scyco).await?;
let series = chconf.series();
let scalar_type = chconf.scalar_type();
let shape = chconf.shape();
@@ -37,7 +34,7 @@ pub async fn scylla_channel_event_stream(
scalar_type.clone(),
shape.clone(),
with_values,
scy,
scyqueue.clone(),
do_test_stream_error,
);
let stream = stream

View File

@@ -1,5 +1,4 @@
use crate::errconv::ErrConv;
use crate::events::EventsStreamScylla;
use err::Error;
use futures_util::Future;
use futures_util::Stream;

View File

@@ -1,5 +1,6 @@
use crate::errconv::ErrConv;
use crate::range::ScyllaSeriesRange;
use crate::worker::ScyllaQueue;
use err::Error;
use futures_util::Future;
use futures_util::FutureExt;
@@ -20,7 +21,10 @@ use netpod::Shape;
use netpod::TsMs;
use netpod::TsNano;
use scylla::frame::response::result::Row;
use scylla::prepared_statement::PreparedStatement;
use scylla::Session;
use scylla::Session as ScySession;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::mem;
use std::pin::Pin;
@@ -28,61 +32,131 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
async fn find_ts_msp(
#[derive(Debug)]
pub struct StmtsEventsRt {
ts_msp_bck: PreparedStatement,
ts_msp_fwd: PreparedStatement,
read_value_queries: BTreeMap<String, PreparedStatement>,
}
impl StmtsEventsRt {
pub(super) async fn new(rtpre: &str, scy: &Session) -> Result<Self, Error> {
let cql = format!(
"select ts_msp from {}{} where series = ? and ts_msp < ? order by ts_msp desc limit 2",
rtpre, "ts_msp"
);
let ts_msp_bck = scy.prepare(cql).await.err_conv()?;
let cql = format!(
"select ts_msp from {}{} where series = ? and ts_msp >= ? and ts_msp < ?",
rtpre, "ts_msp"
);
let ts_msp_fwd = scy.prepare(cql).await.err_conv()?;
let mut read_value_queries = BTreeMap::new();
for sct in [
"u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64", "bool", "string",
] {
let combinations = [
("timestamps", "scalar", "ts_lsp, pulse"),
("timestamps", "array", "ts_lsp, pulse"),
("values", "scalar", "ts_lsp, pulse, value"),
("valueblobs", "array", "ts_lsp, pulse, valueblob"),
];
for com in combinations {
let query_name = format!("{}_{}_{}_fwd", com.1, sct, com.0);
let cql = format!(
concat!(
"select {} from {}events_{}_{}",
" where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?"
),
com.2, rtpre, com.1, sct,
);
let qu = scy.prepare(cql).await.err_conv()?;
read_value_queries.insert(query_name, qu);
let query_name = format!("{}_{}_{}_bck", com.1, sct, com.0);
let cql = format!(
concat!(
"select {} from {}events_{}_{}",
" where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1"
),
com.2, rtpre, com.1, sct,
);
let qu = scy.prepare(cql).await.err_conv()?;
read_value_queries.insert(query_name, qu);
}
}
let ret = Self {
ts_msp_bck,
ts_msp_fwd,
read_value_queries,
};
Ok(ret)
}
}
pub(super) async fn find_ts_msp_worker(
series: u64,
range: ScyllaSeriesRange,
scy: Arc<ScySession>,
stmts: &StmtsEventsRt,
scy: &ScySession,
) -> Result<(VecDeque<TsMs>, VecDeque<TsMs>), Error> {
trace!("find_ts_msp series {:?} {:?}", series, range);
let mut ret1 = VecDeque::new();
let mut ret2 = VecDeque::new();
// TODO use prepared statements
let cql = "select ts_msp from st_ts_msp where series = ? and ts_msp < ? order by ts_msp desc limit 2";
let params = (series as i64, range.beg().ms() as i64);
trace!("find_ts_msp query 1 params {:?}", params);
let res = scy.query(cql, params).await.err_conv()?;
for row in res.rows_typed_or_empty::<(i64,)>() {
let row = row.err_conv()?;
let mut res = scy
.execute_iter(stmts.ts_msp_bck.clone(), params)
.await
.err_conv()?
.into_typed::<(i64,)>();
while let Some(x) = res.next().await {
let row = x.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let ts = TsMs::from_ms_u64(row.0 as u64);
trace!("query 1 ts_msp {}", ts);
ret1.push_front(ts);
}
let cql = "select ts_msp from st_ts_msp where series = ? and ts_msp >= ? and ts_msp < ?";
let params = (series as i64, range.beg().ms() as i64, 1 + range.end().ms() as i64);
trace!("find_ts_msp query 2 params {:?}", params);
let res = scy.query(cql, params).await.err_conv()?;
for row in res.rows_typed_or_empty::<(i64,)>() {
let row = row.err_conv()?;
let mut res = scy
.execute_iter(stmts.ts_msp_fwd.clone(), params)
.await
.err_conv()?
.into_typed::<(i64,)>();
while let Some(x) = res.next().await {
let row = x.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let ts = TsMs::from_ms_u64(row.0 as u64);
trace!("query 2 ts_msp {}", ts);
ret2.push_back(ts);
}
let cql = "select ts_msp from st_ts_msp where series = ? and ts_msp >= ? limit 1";
let params = (series as i64, range.end().ms() as i64);
trace!("find_ts_msp query 3 params {:?}", params);
let res = scy.query(cql, params).await.err_conv()?;
for row in res.rows_typed_or_empty::<(i64,)>() {
let row = row.err_conv()?;
let ts = TsMs::from_ms_u64(row.0 as u64);
trace!("query 3 ts_msp {}", ts);
ret2.push_back(ts);
ret2.push_front(ts);
}
// let cql = "select ts_msp from st_ts_msp where series = ? and ts_msp >= ? limit 1";
// let params = (series as i64, range.end().ms() as i64);
// trace!("find_ts_msp query 3 params {:?}", params);
// let res = scy.query(cql, params).await.err_conv()?;
// for row in res.rows_typed_or_empty::<(i64,)>() {
// let row = row.err_conv()?;
// let ts = TsMs::from_ms_u64(row.0 as u64);
// trace!("query 3 ts_msp {}", ts);
// ret2.push_back(ts);
// }
trace!("find_ts_msp n1 {:?} n2 {:?}", ret1.len(), ret2.len());
Ok((ret1, ret2))
}
trait ValTy: Sized {
trait ValTy: Sized + 'static {
type ScaTy: ScalarOps + std::default::Default;
type ScyTy: scylla::cql_to_rust::FromCqlVal<scylla::frame::response::result::CqlValue>;
type Container: Events + Appendable<Self>;
fn from_scyty(inp: Self::ScyTy) -> Self;
fn from_valueblob(inp: Vec<u8>) -> Self;
fn table_name() -> &'static str;
fn default() -> Self;
fn is_valueblob() -> bool;
fn st_name() -> &'static str;
}
macro_rules! impl_scaty_scalar {
($st:ty, $st_scy:ty, $table_name:expr) => {
($st:ty, $st_scy:ty, $st_name:expr, $table_name:expr) => {
impl ValTy for $st {
type ScaTy = $st;
type ScyTy = $st_scy;
@@ -90,6 +164,9 @@ macro_rules! impl_scaty_scalar {
fn from_scyty(inp: Self::ScyTy) -> Self {
inp as Self
}
fn from_valueblob(_inp: Vec<u8>) -> Self {
<Self as ValTy>::default()
}
fn table_name() -> &'static str {
$table_name
}
@@ -99,12 +176,15 @@ macro_rules! impl_scaty_scalar {
fn is_valueblob() -> bool {
false
}
fn st_name() -> &'static str {
$st_name
}
}
};
}
macro_rules! impl_scaty_array {
($vt:ty, $st:ty, $st_scy:ty, $table_name:expr) => {
($vt:ty, $st:ty, $st_scy:ty, $st_name:expr, $table_name:expr) => {
impl ValTy for $vt {
type ScaTy = $st;
type ScyTy = $st_scy;
@@ -112,6 +192,22 @@ macro_rules! impl_scaty_array {
fn from_scyty(inp: Self::ScyTy) -> Self {
inp.into_iter().map(|x| x as Self::ScaTy).collect()
}
fn from_valueblob(inp: Vec<u8>) -> Self {
if inp.len() < 32 {
<Self as ValTy>::default()
} else {
let en = std::mem::size_of::<Self::ScaTy>();
let n = (inp.len().max(32) - 32) / en;
let mut c = Vec::with_capacity(n);
for i in 0..n {
let r1 = &inp[32 + en * (0 + i)..32 + en * (1 + i)];
let p1 = r1 as *const _ as *const $st;
let v1 = unsafe { p1.read_unaligned() };
c.push(v1);
}
c
}
}
fn table_name() -> &'static str {
$table_name
}
@@ -121,35 +217,38 @@ macro_rules! impl_scaty_array {
fn is_valueblob() -> bool {
true
}
fn st_name() -> &'static str {
$st_name
}
}
};
}
impl_scaty_scalar!(u8, i8, "st_events_scalar_u8");
impl_scaty_scalar!(u16, i16, "st_events_scalar_u16");
impl_scaty_scalar!(u32, i32, "st_events_scalar_u32");
impl_scaty_scalar!(u64, i64, "st_events_scalar_u64");
impl_scaty_scalar!(i8, i8, "st_events_scalar_i8");
impl_scaty_scalar!(i16, i16, "st_events_scalar_i16");
impl_scaty_scalar!(i32, i32, "st_events_scalar_i32");
impl_scaty_scalar!(i64, i64, "st_events_scalar_i64");
impl_scaty_scalar!(f32, f32, "st_events_scalar_f32");
impl_scaty_scalar!(f64, f64, "st_events_scalar_f64");
impl_scaty_scalar!(bool, bool, "st_events_scalar_bool");
impl_scaty_scalar!(String, String, "st_events_scalar_string");
impl_scaty_scalar!(u8, i8, "u8", "st_events_scalar_u8");
impl_scaty_scalar!(u16, i16, "u16", "st_events_scalar_u16");
impl_scaty_scalar!(u32, i32, "u32", "st_events_scalar_u32");
impl_scaty_scalar!(u64, i64, "u64", "st_events_scalar_u64");
impl_scaty_scalar!(i8, i8, "i8", "st_events_scalar_i8");
impl_scaty_scalar!(i16, i16, "i16", "st_events_scalar_i16");
impl_scaty_scalar!(i32, i32, "i32", "st_events_scalar_i32");
impl_scaty_scalar!(i64, i64, "i64", "st_events_scalar_i64");
impl_scaty_scalar!(f32, f32, "f32", "st_events_scalar_f32");
impl_scaty_scalar!(f64, f64, "f64", "st_events_scalar_f64");
impl_scaty_scalar!(bool, bool, "bool", "st_events_scalar_bool");
impl_scaty_scalar!(String, String, "string", "st_events_scalar_string");
impl_scaty_array!(Vec<u8>, u8, Vec<i8>, "st_events_array_u8");
impl_scaty_array!(Vec<u16>, u16, Vec<i16>, "st_events_array_u16");
impl_scaty_array!(Vec<u32>, u32, Vec<i32>, "st_events_array_u32");
impl_scaty_array!(Vec<u64>, u64, Vec<i64>, "st_events_array_u64");
impl_scaty_array!(Vec<i8>, i8, Vec<i8>, "st_events_array_i8");
impl_scaty_array!(Vec<i16>, i16, Vec<i16>, "st_events_array_i16");
impl_scaty_array!(Vec<i32>, i32, Vec<i32>, "st_events_array_i32");
impl_scaty_array!(Vec<i64>, i64, Vec<i64>, "st_events_array_i64");
impl_scaty_array!(Vec<f32>, f32, Vec<f32>, "st_events_array_f32");
impl_scaty_array!(Vec<f64>, f64, Vec<f64>, "st_events_array_f64");
impl_scaty_array!(Vec<bool>, bool, Vec<bool>, "st_events_array_bool");
impl_scaty_array!(Vec<String>, String, Vec<String>, "st_events_array_string");
impl_scaty_array!(Vec<u8>, u8, Vec<i8>, "u8", "st_events_array_u8");
impl_scaty_array!(Vec<u16>, u16, Vec<i16>, "u16", "st_events_array_u16");
impl_scaty_array!(Vec<u32>, u32, Vec<i32>, "u32", "st_events_array_u32");
impl_scaty_array!(Vec<u64>, u64, Vec<i64>, "u64", "st_events_array_u64");
impl_scaty_array!(Vec<i8>, i8, Vec<i8>, "i8", "st_events_array_i8");
impl_scaty_array!(Vec<i16>, i16, Vec<i16>, "i16", "st_events_array_i16");
impl_scaty_array!(Vec<i32>, i32, Vec<i32>, "i32", "st_events_array_i32");
impl_scaty_array!(Vec<i64>, i64, Vec<i64>, "i64", "st_events_array_i64");
impl_scaty_array!(Vec<f32>, f32, Vec<f32>, "f32", "st_events_array_f32");
impl_scaty_array!(Vec<f64>, f64, Vec<f64>, "f64", "st_events_array_f64");
impl_scaty_array!(Vec<bool>, bool, Vec<bool>, "bool", "st_events_array_bool");
// impl_scaty_array!(Vec<String>, String, Vec<String>, "string", "st_events_array_string");
struct ReadNextValuesOpts {
series: u64,
@@ -157,33 +256,40 @@ struct ReadNextValuesOpts {
range: ScyllaSeriesRange,
fwd: bool,
with_values: bool,
scy: Arc<ScySession>,
scyqueue: ScyllaQueue,
}
async fn read_next_values<ST>(opts: ReadNextValuesOpts) -> Result<Box<dyn Events>, Error>
where
ST: ValTy,
{
debug!("read_next_values {} {}", opts.series, opts.ts_msp);
// TODO could take scyqeue out of opts struct.
let scyqueue = opts.scyqueue.clone();
let futgen = Box::new(|scy: Arc<ScySession>, stmts: Arc<StmtsEventsRt>| {
let fut = read_next_values_worker::<ST>(opts, scy, stmts);
Box::pin(fut) as Pin<Box<dyn Future<Output = Result<Box<dyn Events>, err::Error>> + Send>>
});
let res = scyqueue.read_next_values(futgen).await?;
Ok(res)
}
async fn read_next_values_worker<ST>(
opts: ReadNextValuesOpts,
scy: Arc<ScySession>,
stmts: Arc<StmtsEventsRt>,
) -> Result<Box<dyn Events>, Error>
where
ST: ValTy,
{
trace!("read_next_values_worker {} {}", opts.series, opts.ts_msp);
let series = opts.series;
let ts_msp = opts.ts_msp;
let range = opts.range;
let fwd = opts.fwd;
let scy = opts.scy;
let table_name = ST::table_name();
if range.end() > TsNano::from_ns(i64::MAX as u64) {
return Err(Error::with_msg_no_trace(format!("range.end overflows i64")));
}
let cql_fields = if opts.with_values {
if ST::is_valueblob() {
"ts_lsp, pulse, valueblob"
} else {
"ts_lsp, pulse, value"
}
} else {
"ts_lsp, pulse"
};
let ret = if fwd {
let ret = if opts.fwd {
let ts_lsp_min = if range.beg() > ts_msp.ns() {
range.beg().delta(ts_msp.ns())
} else {
@@ -201,14 +307,20 @@ where
ts_lsp_max,
table_name,
);
// TODO use prepared!
let cql = format!(
concat!(
"select {} from {}",
" where series = ? and ts_msp = ? and ts_lsp >= ? and ts_lsp < ?"
),
cql_fields, table_name,
);
let qu_name = if opts.with_values {
if ST::is_valueblob() {
format!("array_{}_valueblobs_fwd", ST::st_name())
} else {
format!("array_{}_values_fwd", ST::st_name())
}
} else {
format!("array_{}_timestamps_fwd", ST::st_name())
};
let qu = stmts.read_value_queries.get(&qu_name).ok_or_else(|| {
let e = Error::with_msg_no_trace(format!("can not find query name {}", qu_name));
error!("{e}");
e
})?;
let params = (
series as i64,
ts_msp.ms() as i64,
@@ -216,13 +328,13 @@ where
ts_lsp_max.ns() as i64,
);
trace!("FWD event search params {:?}", params);
let mut res = scy.query_iter(cql, params).await.err_conv()?;
let mut res = scy.execute_iter(qu.clone(), params).await.err_conv()?;
let mut rows = Vec::new();
while let Some(x) = res.next().await {
rows.push(x.err_conv()?);
}
let mut last_before = None;
let ret = convert_rows::<ST>(rows, range, ts_msp, opts.with_values, !fwd, &mut last_before)?;
let ret = convert_rows::<ST>(rows, range, ts_msp, opts.with_values, !opts.fwd, &mut last_before)?;
ret
} else {
let ts_lsp_max = if ts_msp.ns() < range.beg() {
@@ -231,23 +343,29 @@ where
DtNano::from_ns(0)
};
trace!("BCK ts_msp {} ts_lsp_max {} {}", ts_msp, ts_lsp_max, table_name,);
// TODO use prepared!
let cql = format!(
concat!(
"select {} from {}",
" where series = ? and ts_msp = ? and ts_lsp < ? order by ts_lsp desc limit 1"
),
cql_fields, table_name,
);
let qu_name = if opts.with_values {
if ST::is_valueblob() {
format!("array_{}_valueblobs_bck", ST::st_name())
} else {
format!("array_{}_values_bck", ST::st_name())
}
} else {
format!("array_{}_timestamps_bck", ST::st_name())
};
let qu = stmts.read_value_queries.get(&qu_name).ok_or_else(|| {
let e = Error::with_msg_no_trace(format!("can not find query name {}", qu_name));
error!("{e}");
e
})?;
let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64);
trace!("BCK event search params {:?}", params);
let mut res = scy.query_iter(cql, params).await.err_conv()?;
let mut res = scy.execute_iter(qu.clone(), params).await.err_conv()?;
let mut rows = Vec::new();
while let Some(x) = res.next().await {
rows.push(x.err_conv()?);
}
let mut _last_before = None;
let ret = convert_rows::<ST>(rows, range, ts_msp, opts.with_values, !fwd, &mut _last_before)?;
let ret = convert_rows::<ST>(rows, range, ts_msp, opts.with_values, !opts.fwd, &mut _last_before)?;
if ret.len() > 1 {
error!("multiple events in backwards search {}", ret.len());
}
@@ -274,7 +392,7 @@ fn convert_rows<ST: ValTy>(
trace!("read a value blob len {}", row.2.len());
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let pulse = row.1 as u64;
let value = ValTy::default();
let value = ValTy::from_valueblob(row.2);
(ts, pulse, value)
} else {
let row: (i64, i64, ST::ScyTy) = row.into_typed().err_conv()?;
@@ -326,7 +444,7 @@ struct ReadValues {
with_values: bool,
fut: Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>>,
fut_done: bool,
scy: Arc<ScySession>,
scyqueue: ScyllaQueue,
}
impl ReadValues {
@@ -338,7 +456,7 @@ impl ReadValues {
ts_msps: VecDeque<TsMs>,
fwd: bool,
with_values: bool,
scy: Arc<ScySession>,
scyqueue: ScyllaQueue,
) -> Self {
let mut ret = Self {
series,
@@ -352,7 +470,7 @@ impl ReadValues {
"future not initialized",
)))),
fut_done: false,
scy,
scyqueue,
};
ret.next();
ret
@@ -375,7 +493,7 @@ impl ReadValues {
range: self.range.clone(),
fwd: self.fwd,
with_values: self.with_values,
scy: self.scy.clone(),
scyqueue: self.scyqueue.clone(),
};
let scalar_type = self.scalar_type.clone();
let shape = self.shape.clone();
@@ -394,8 +512,8 @@ impl ReadValues {
ScalarType::F64 => read_next_values::<f64>(opts).await,
ScalarType::BOOL => read_next_values::<bool>(opts).await,
ScalarType::STRING => read_next_values::<String>(opts).await,
_ => {
error!("TODO ReadValues add more types");
ScalarType::ChannelStatus => {
warn!("read scalar channel status not yet supported");
err::todoval()
}
},
@@ -411,8 +529,12 @@ impl ReadValues {
ScalarType::F32 => read_next_values::<Vec<f32>>(opts).await,
ScalarType::F64 => read_next_values::<Vec<f64>>(opts).await,
ScalarType::BOOL => read_next_values::<Vec<bool>>(opts).await,
_ => {
error!("TODO ReadValues add more types");
ScalarType::STRING => {
warn!("read array string not yet supported");
err::todoval()
}
ScalarType::ChannelStatus => {
warn!("read array channel status not yet supported");
err::todoval()
}
},
@@ -428,7 +550,7 @@ impl ReadValues {
enum FrState {
New,
FindMsp(Pin<Box<dyn Future<Output = Result<(VecDeque<TsMs>, VecDeque<TsMs>), Error>> + Send>>),
FindMsp(Pin<Box<dyn Future<Output = Result<(VecDeque<TsMs>, VecDeque<TsMs>), crate::worker::Error>> + Send>>),
ReadBack1(ReadValues),
ReadBack2(ReadValues),
ReadValues(ReadValues),
@@ -445,7 +567,7 @@ pub struct EventsStreamScylla {
do_one_before_range: bool,
ts_msp_bck: VecDeque<TsMs>,
ts_msp_fwd: VecDeque<TsMs>,
scy: Arc<ScySession>,
scyqueue: ScyllaQueue,
do_test_stream_error: bool,
found_one_after: bool,
with_values: bool,
@@ -460,7 +582,7 @@ impl EventsStreamScylla {
scalar_type: ScalarType,
shape: Shape,
with_values: bool,
scy: Arc<ScySession>,
scyqueue: ScyllaQueue,
do_test_stream_error: bool,
) -> Self {
debug!("EventsStreamScylla::new");
@@ -473,7 +595,7 @@ impl EventsStreamScylla {
do_one_before_range,
ts_msp_bck: VecDeque::new(),
ts_msp_fwd: VecDeque::new(),
scy,
scyqueue,
do_test_stream_error,
found_one_after: false,
with_values,
@@ -505,7 +627,7 @@ impl EventsStreamScylla {
[msp].into(),
false,
self.with_values,
self.scy.clone(),
self.scyqueue.clone(),
);
self.state = FrState::ReadBack1(st);
} else if self.ts_msp_fwd.len() > 0 {
@@ -518,7 +640,7 @@ impl EventsStreamScylla {
mem::replace(&mut self.ts_msp_fwd, VecDeque::new()),
true,
self.with_values,
self.scy.clone(),
self.scyqueue.clone(),
);
self.state = FrState::ReadValues(st);
} else {
@@ -539,7 +661,7 @@ impl EventsStreamScylla {
mem::replace(&mut self.ts_msp_fwd, VecDeque::new()),
true,
self.with_values,
self.scy.clone(),
self.scyqueue.clone(),
);
self.state = FrState::ReadValues(st);
} else {
@@ -556,7 +678,7 @@ impl EventsStreamScylla {
[msp].into(),
false,
self.with_values,
self.scy.clone(),
self.scyqueue.clone(),
);
self.state = FrState::ReadBack2(st);
} else if self.ts_msp_fwd.len() > 0 {
@@ -569,7 +691,7 @@ impl EventsStreamScylla {
mem::replace(&mut self.ts_msp_fwd, VecDeque::new()),
true,
self.with_values,
self.scy.clone(),
self.scyqueue.clone(),
);
self.state = FrState::ReadValues(st);
} else {
@@ -593,7 +715,7 @@ impl EventsStreamScylla {
mem::replace(&mut self.ts_msp_fwd, VecDeque::new()),
true,
self.with_values,
self.scy.clone(),
self.scyqueue.clone(),
);
self.state = FrState::ReadValues(st);
} else {
@@ -602,6 +724,14 @@ impl EventsStreamScylla {
}
}
async fn find_ts_msp_via_queue(
series: u64,
range: ScyllaSeriesRange,
scyqueue: ScyllaQueue,
) -> Result<(VecDeque<TsMs>, VecDeque<TsMs>), crate::worker::Error> {
scyqueue.find_ts_msp(series, range).await
}
impl Stream for EventsStreamScylla {
type Item = Result<ChannelEvents, Error>;
@@ -620,7 +750,9 @@ impl Stream for EventsStreamScylla {
}
break match self.state {
FrState::New => {
let fut = find_ts_msp(self.series, self.range.clone(), self.scy.clone());
let series = self.series.clone();
let range = self.range.clone();
let fut = find_ts_msp_via_queue(series, range, self.scyqueue.clone());
let fut = Box::pin(fut);
self.state = FrState::FindMsp(fut);
continue;
@@ -633,7 +765,7 @@ impl Stream for EventsStreamScylla {
Ready(Err(e)) => {
error!("EventsStreamScylla FindMsp {e}");
self.state = FrState::DataDone;
Ready(Some(Err(e)))
Ready(Some(Err(e.into())))
}
Pending => Pending,
},

View File

@@ -1,12 +1,20 @@
use crate::conn::create_scy_session_no_ks;
use crate::events::StmtsEventsRt;
use crate::range::ScyllaSeriesRange;
use async_channel::Receiver;
use async_channel::Sender;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
use items_0::Events;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::ScyllaConfig;
use netpod::TsMs;
use scylla::Session;
use std::collections::VecDeque;
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
#[derive(Debug, ThisError)]
pub enum Error {
@@ -24,7 +32,31 @@ impl err::ToErr for Error {
#[derive(Debug)]
enum Job {
JobA(String, Sender<Result<String, Error>>),
FindTsMsp(
// series-id
u64,
ScyllaSeriesRange,
Sender<Result<(VecDeque<TsMs>, VecDeque<TsMs>), Error>>,
),
ReadNextValues(ReadNextValues),
}
struct ReadNextValues {
futgen: Box<
dyn FnOnce(
Arc<Session>,
Arc<StmtsEventsRt>,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, err::Error>> + Send>>
+ Send,
>,
// fut: Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>>,
tx: Sender<Result<Box<dyn Events>, Error>>,
}
impl fmt::Debug for ReadNextValues {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "ReadNextValues {{ .. }}")
}
}
#[derive(Debug, Clone)]
@@ -33,31 +65,59 @@ pub struct ScyllaQueue {
}
impl ScyllaQueue {
pub async fn job_a(&self, backend: &str) -> Result<Receiver<Result<String, Error>>, Error> {
pub async fn find_ts_msp(
&self,
series: u64,
range: ScyllaSeriesRange,
) -> Result<(VecDeque<TsMs>, VecDeque<TsMs>), Error> {
let (tx, rx) = async_channel::bounded(1);
let job = Job::JobA(backend.into(), tx);
let job = Job::FindTsMsp(series, range, tx);
self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
Ok(rx)
let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??;
Ok(res)
}
pub async fn read_next_values<F>(&self, futgen: F) -> Result<Box<dyn Events>, Error>
where
F: FnOnce(
Arc<Session>,
Arc<StmtsEventsRt>,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, err::Error>> + Send>>
+ Send
+ 'static,
{
let (tx, rx) = async_channel::bounded(1);
let job = Job::ReadNextValues(ReadNextValues {
futgen: Box::new(futgen),
tx,
});
self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??;
Ok(res)
}
}
#[derive(Debug)]
pub struct ScyllaWorker {
rx: Receiver<Job>,
scy: Session,
// pgjh: Option<JoinHandle<Result<(), err::Error>>>,
scy: Arc<Session>,
stmts_st: Arc<StmtsEventsRt>,
}
impl ScyllaWorker {
pub async fn new(scyconf: &ScyllaConfig) -> Result<(ScyllaQueue, Self), Error> {
pub async fn new(
scyconf_st: &ScyllaConfig,
scyconf_mt: &ScyllaConfig,
scyconf_lt: &ScyllaConfig,
) -> Result<(ScyllaQueue, Self), Error> {
let (tx, rx) = async_channel::bounded(64);
let scy = create_scy_session_no_ks(scyconf).await?;
let scy = create_scy_session_no_ks(scyconf_st).await?;
let scy = Arc::new(scy);
let rtpre = format!("{}.st_", scyconf_st.keyspace);
let stmts_st = StmtsEventsRt::new(&rtpre, &scy).await?;
let stmts_st = Arc::new(stmts_st);
let queue = ScyllaQueue { tx };
let worker = Self {
rx,
scy,
// pgjh: Some(pgjh),
};
let worker = Self { rx, scy, stmts_st };
Ok((queue, worker))
}
@@ -72,12 +132,19 @@ impl ScyllaWorker {
}
};
match job {
Job::JobA(backend, tx) => {
let res = Ok::<_, Error>(backend);
Job::FindTsMsp(series, range, tx) => {
let res = crate::events::find_ts_msp_worker(series, range, &self.stmts_st, &self.scy).await;
if tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats
}
}
Job::ReadNextValues(job) => {
let fut = (job.futgen)(self.scy.clone(), self.stmts_st.clone());
let res = fut.await;
if job.tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats
}
}
}
}
}