Endpoint to fetch the msp timestamps

This commit is contained in:
Dominik Werder
2024-06-14 00:54:31 +02:00
parent e3669e4335
commit 902b9a9cb7
11 changed files with 398 additions and 110 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqbuffer"
version = "0.5.1-aa.0"
version = "0.5.1-aa.1"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
@@ -13,7 +13,7 @@ serde_json = "1.0"
serde_yaml = "0.9.27"
chrono = "0.4.31"
url = "2.5.0"
clap = { version = "4.4.11", features = ["derive", "cargo"] }
clap = { version = "4.5.7", features = ["derive", "cargo"] }
err = { path = "../err" }
taskrun = { path = "../taskrun" }
netpod = { path = "../netpod" }

View File

@@ -1,5 +1,6 @@
use crate::err::Error;
use crate::response;
use crate::ServiceSharedResources;
use crate::ToPublicResponse;
use dbconn::create_connection;
use dbconn::worker::PgQueue;
@@ -15,8 +16,12 @@ use httpclient::ToJsonBody;
use netpod::get_url_query_pairs;
use netpod::log::*;
use netpod::query::prebinned::PreBinnedQuery;
use netpod::query::PulseRangeQuery;
use netpod::query::TimeRangeQuery;
use netpod::range::evrange::SeriesRange;
use netpod::req_uri_to_url;
use netpod::timeunits::*;
use netpod::ttl::RetentionTime;
use netpod::ChannelConfigQuery;
use netpod::ChannelConfigResponse;
use netpod::ChannelTypeConfigGen;
@@ -31,6 +36,7 @@ use nodenet::configquorum::find_config_basics_quorum;
use query::api4::binned::BinnedQuery;
use query::api4::events::PlainEventsQuery;
use scyllaconn::errconv::ErrConv;
use scyllaconn::range::ScyllaSeriesRange;
use serde::Deserialize;
use serde::Serialize;
use std::collections::BTreeMap;
@@ -368,7 +374,7 @@ pub struct ScyllaChannelsActive {}
impl ScyllaChannelsActive {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/4/channels/active" {
if req.uri().path() == "/api/4/private/channels/active" {
Some(Self {})
} else {
None
@@ -470,7 +476,7 @@ pub struct IocForChannel {}
impl IocForChannel {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/4/channel/ioc" {
if req.uri().path() == "/api/4/private/channel/ioc" {
Some(Self {})
} else {
None
@@ -530,8 +536,8 @@ impl IocForChannel {
#[derive(Clone, Debug, Deserialize)]
pub struct ScyllaSeriesTsMspQuery {
#[serde(rename = "seriesId")]
series: u64,
name: String,
range: SeriesRange,
}
impl FromUrl for ScyllaSeriesTsMspQuery {
@@ -541,32 +547,45 @@ impl FromUrl for ScyllaSeriesTsMspQuery {
}
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, err::Error> {
let s = pairs
.get("seriesId")
.ok_or_else(|| Error::with_public_msg_no_trace("missing seriesId"))?;
let series: u64 = s.parse()?;
Ok(Self { series })
let name = pairs
.get("channelName")
.ok_or_else(|| Error::with_public_msg_no_trace("missing channelName"))?
.into();
let range = if let Ok(x) = TimeRangeQuery::from_pairs(pairs) {
SeriesRange::TimeRange(x.into())
} else if let Ok(x) = PulseRangeQuery::from_pairs(pairs) {
SeriesRange::PulseRange(x.into())
} else {
return Err(err::Error::with_public_msg_no_trace("no time range in url"));
};
Ok(Self { name, range })
}
}
#[derive(Clone, Debug, Serialize)]
pub struct ScyllaSeriesTsMspResponse {
#[serde(rename = "tsMsps")]
ts_msps: Vec<u64>,
st_ts_msp_ms: Vec<String>,
mt_ts_msp_ms: Vec<String>,
lt_ts_msp_ms: Vec<String>,
}
pub struct ScyllaSeriesTsMsp {}
impl ScyllaSeriesTsMsp {
pub fn handler(req: &Requ) -> Option<Self> {
if req.uri().path() == "/api/4/scylla/series/tsMsps" {
if req.uri().path() == "/api/4/private/scylla/series/tsMsp" {
Some(Self {})
} else {
None
}
}
pub async fn handle(&self, req: Requ, node_config: &NodeConfigCached) -> Result<StreamResponse, Error> {
pub async fn handle(
&self,
req: Requ,
shared_res: &ServiceSharedResources,
ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
if req.method() == Method::GET {
let accept_def = APP_JSON;
let accept = req
@@ -576,7 +595,7 @@ impl ScyllaSeriesTsMsp {
if accept == APP_JSON || accept == ACCEPT_ALL {
let url = req_uri_to_url(req.uri())?;
let q = ScyllaSeriesTsMspQuery::from_url(&url)?;
match self.get_ts_msps(&q, node_config).await {
match self.get_ts_msps(&q, shared_res, ncc).await {
Ok(k) => {
let body = ToJsonBody::from(&k).into_body();
Ok(response(StatusCode::OK).body(body)?)
@@ -595,25 +614,65 @@ impl ScyllaSeriesTsMsp {
async fn get_ts_msps(
&self,
q: &ScyllaSeriesTsMspQuery,
node_config: &NodeConfigCached,
shared_res: &ServiceSharedResources,
ncc: &NodeConfigCached,
) -> Result<ScyllaSeriesTsMspResponse, Error> {
let scyco = node_config
.node_config
.cluster
.scylla_st()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("No Scylla configured")))?;
let scy = scyllaconn::conn::create_scy_session(scyco).await?;
let mut ts_msps = Vec::new();
let mut res = scy
.query_iter("select ts_msp from ts_msp where series = ?", (q.series as i64,))
let backend = &ncc.node_config.cluster.backend;
let name = &q.name;
let nano_range = if let SeriesRange::TimeRange(x) = q.range.clone() {
x
} else {
todo!()
};
let chconf = shared_res
.pgqueue
.chconf_best_matching_name_range_job(backend, name, nano_range)
.await
.err_conv()?;
while let Some(row) = res.next().await {
let row = row.err_conv()?;
let (ts_msp,): (i64,) = row.into_typed().err_conv()?;
ts_msps.push(ts_msp as u64);
.map_err(|e| Error::with_msg_no_trace(format!("error from pg worker: {e}")))?
.recv()
.await
.unwrap()
.unwrap();
use scyllaconn::SeriesId;
let sid = SeriesId::new(chconf.series());
let scyqueue = shared_res.scyqueue.clone().unwrap();
let mut st_ts_msp_ms = Vec::new();
let mut msp_stream =
scyllaconn::events2::msp::MspStream::new(RetentionTime::Short, sid, (&q.range).into(), scyqueue.clone());
use chrono::TimeZone;
while let Some(x) = msp_stream.next().await {
let v = x.unwrap().ms();
let st = chrono::Utc.timestamp_millis_opt(v as _).earliest().unwrap();
let s = st.format(netpod::DATETIME_FMT_0MS).to_string();
st_ts_msp_ms.push(s);
}
let ret = ScyllaSeriesTsMspResponse { ts_msps };
let mut mt_ts_msp_ms = Vec::new();
let mut msp_stream =
scyllaconn::events2::msp::MspStream::new(RetentionTime::Medium, sid, (&q.range).into(), scyqueue.clone());
while let Some(x) = msp_stream.next().await {
let v = x.unwrap().ms();
let st = chrono::Utc.timestamp_millis_opt(v as _).earliest().unwrap();
let s = st.format(netpod::DATETIME_FMT_0MS).to_string();
mt_ts_msp_ms.push(s);
}
let mut lt_ts_msp_ms = Vec::new();
let mut msp_stream =
scyllaconn::events2::msp::MspStream::new(RetentionTime::Long, sid, (&q.range).into(), scyqueue.clone());
while let Some(x) = msp_stream.next().await {
let v = x.unwrap().ms();
let st = chrono::Utc.timestamp_millis_opt(v as _).earliest().unwrap();
let s = st.format(netpod::DATETIME_FMT_0MS).to_string();
lt_ts_msp_ms.push(s);
}
let ret = ScyllaSeriesTsMspResponse {
st_ts_msp_ms,
mt_ts_msp_ms,
lt_ts_msp_ms,
};
Ok(ret)
}
}

View File

@@ -373,7 +373,7 @@ async fn http_service_inner(
} else if let Some(h) = channelconfig::ScyllaChannelsActive::handler(&req) {
Ok(h.handle(req, &node_config).await?)
} else if let Some(h) = channelconfig::ScyllaSeriesTsMsp::handler(&req) {
Ok(h.handle(req, &node_config).await?)
Ok(h.handle(req, &shared_res, &node_config).await?)
} else if let Some(h) = channelconfig::AmbigiousChannelNames::handler(&req) {
Ok(h.handle(req, &node_config).await?)
} else if let Some(h) = api4::accounting::AccountingToplistCounts::handler(&req) {

View File

@@ -72,7 +72,7 @@ pub async fn scylla_channel_event_stream(
item
}
},
Err(e) => Err(e),
Err(e) => Err(Error::with_msg_no_trace(format!("scyllaconn eevents error {e}"))),
};
item
});

View File

@@ -9,6 +9,7 @@ path = "src/scyllaconn.rs"
[dependencies]
futures-util = "0.3.24"
pin-project = "1"
async-channel = "2.3.1"
scylla = "0.13.0"
err = { path = "../err" }
@@ -16,3 +17,4 @@ netpod = { path = "../netpod" }
query = { path = "../query" }
items_0 = { path = "../items_0" }
items_2 = { path = "../items_2" }
series = { path = "../../../daqingest/series" }

View File

@@ -1,7 +1,8 @@
use crate::errconv::ErrConv;
use crate::range::ScyllaSeriesRange;
use crate::worker::ScyllaQueue;
use err::Error;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
@@ -32,6 +33,24 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
#[derive(Debug, ThisError)]
pub enum Error {
ScyllaQuery(#[from] scylla::transport::errors::QueryError),
ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),
ScyllaTypeConv(#[from] scylla::cql_to_rust::FromRowError),
ScyllaWorker(Box<crate::worker::Error>),
MissingQuery(String),
RangeEndOverflow,
InvalidFuture,
TestError(String),
}
impl From<crate::worker::Error> for Error {
fn from(value: crate::worker::Error) -> Self {
Self::ScyllaWorker(Box::new(value))
}
}
#[derive(Debug)]
pub struct StmtsLspShape {
u8: PreparedStatement,
@@ -52,7 +71,7 @@ impl StmtsLspShape {
fn st(&self, stname: &str) -> Result<&PreparedStatement, Error> {
let ret = match stname {
"u8" => &self.u8,
_ => return Err(Error::with_msg_no_trace(format!("no query for stname {stname}"))),
_ => return Err(Error::MissingQuery(format!("no query for stname {stname}"))),
};
Ok(ret)
}
@@ -123,7 +142,7 @@ async fn make_msp_dir(ks: &str, rt: &RetentionTime, bck: bool, scy: &Session) ->
table_name,
select_cond
);
let qu = scy.prepare(cql).await.err_conv()?;
let qu = scy.prepare(cql).await?;
Ok(qu)
}
@@ -153,7 +172,7 @@ async fn make_lsp(
stname,
select_cond
);
let qu = scy.prepare(cql).await.err_conv()?;
let qu = scy.prepare(cql).await?;
Ok(qu)
}
@@ -234,54 +253,63 @@ impl StmtsEvents {
}
}
pub(super) async fn find_ts_msp_worker(
pub(super) async fn find_ts_msp(
rt: &RetentionTime,
series: u64,
range: ScyllaSeriesRange,
bck: bool,
stmts: &StmtsEvents,
scy: &ScySession,
) -> Result<VecDeque<TsMs>, Error> {
trace!("find_ts_msp series {:?} {:?} {:?} bck {}", rt, series, range, bck);
if bck {
find_ts_msp_bck(rt, series, range, stmts, scy).await
} else {
find_ts_msp_fwd(rt, series, range, stmts, scy).await
}
}
async fn find_ts_msp_fwd(
rt: &RetentionTime,
series: u64,
range: ScyllaSeriesRange,
stmts: &StmtsEvents,
scy: &ScySession,
) -> Result<(VecDeque<TsMs>, VecDeque<TsMs>), Error> {
trace!("find_ts_msp series {:?} {:?}", series, range);
let mut ret1 = VecDeque::new();
let mut ret2 = VecDeque::new();
let params = (series as i64, range.beg().ms() as i64);
trace!("find_ts_msp query 1 params {:?}", params);
let mut res = scy
.execute_iter(stmts.rt(rt).ts_msp_bck.clone(), params)
.await
.err_conv()?
.into_typed::<(i64,)>();
while let Some(x) = res.next().await {
let row = x.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let ts = TsMs::from_ms_u64(row.0 as u64);
trace!("query 1 ts_msp {}", ts);
ret1.push_front(ts);
}
) -> Result<VecDeque<TsMs>, Error> {
let mut ret = VecDeque::new();
// TODO time range truncation can be handled better
let params = (series as i64, range.beg().ms() as i64, 1 + range.end().ms() as i64);
trace!("find_ts_msp query 2 params {:?}", params);
let mut res = scy
.execute_iter(stmts.rt(rt).ts_msp_fwd.clone(), params)
.await
.err_conv()?
.await?
.into_typed::<(i64,)>();
while let Some(x) = res.next().await {
let row = x.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let row = x?;
let ts = TsMs::from_ms_u64(row.0 as u64);
trace!("query 2 ts_msp {}", ts);
ret2.push_front(ts);
ret.push_back(ts);
}
// let cql = "select ts_msp from st_ts_msp where series = ? and ts_msp >= ? limit 1";
// let params = (series as i64, range.end().ms() as i64);
// trace!("find_ts_msp query 3 params {:?}", params);
// let res = scy.query(cql, params).await.err_conv()?;
// for row in res.rows_typed_or_empty::<(i64,)>() {
// let row = row.err_conv()?;
// let ts = TsMs::from_ms_u64(row.0 as u64);
// trace!("query 3 ts_msp {}", ts);
// ret2.push_back(ts);
// }
trace!("find_ts_msp n1 {:?} n2 {:?}", ret1.len(), ret2.len());
Ok((ret1, ret2))
Ok(ret)
}
async fn find_ts_msp_bck(
rt: &RetentionTime,
series: u64,
range: ScyllaSeriesRange,
stmts: &StmtsEvents,
scy: &ScySession,
) -> Result<VecDeque<TsMs>, Error> {
let mut ret = VecDeque::new();
let params = (series as i64, range.beg().ms() as i64);
let mut res = scy
.execute_iter(stmts.rt(rt).ts_msp_bck.clone(), params)
.await?
.into_typed::<(i64,)>();
while let Some(x) = res.next().await {
let row = x?;
let ts = TsMs::from_ms_u64(row.0 as u64);
ret.push_front(ts);
}
Ok(ret)
}
trait ValTy: Sized + 'static {
@@ -438,8 +466,12 @@ where
// TODO could take scyqeue out of opts struct.
let scyqueue = opts.scyqueue.clone();
let futgen = Box::new(|scy: Arc<ScySession>, stmts: Arc<StmtsEvents>| {
let fut = read_next_values_2::<ST>(opts, scy, stmts);
Box::pin(fut) as Pin<Box<dyn Future<Output = Result<Box<dyn Events>, err::Error>> + Send>>
let fut = async {
read_next_values_2::<ST>(opts, scy, stmts)
.await
.map_err(crate::worker::Error::from)
};
Box::pin(fut) as Pin<Box<dyn Future<Output = Result<Box<dyn Events>, crate::worker::Error>> + Send>>
});
let res = scyqueue.read_next_values(futgen).await?;
Ok(res)
@@ -459,7 +491,7 @@ where
let range = opts.range;
let table_name = ST::table_name();
if range.end() > TsNano::from_ns(i64::MAX as u64) {
return Err(Error::with_msg_no_trace(format!("range.end overflows i64")));
return Err(Error::RangeEndOverflow);
}
let ret = if opts.fwd {
let ts_lsp_min = if range.beg() > ts_msp.ns() {
@@ -505,10 +537,10 @@ where
ts_lsp_max.ns() as i64,
);
trace!("FWD event search params {:?}", params);
let mut res = scy.execute_iter(qu.clone(), params).await.err_conv()?;
let mut res = scy.execute_iter(qu.clone(), params).await?;
let mut rows = Vec::new();
while let Some(x) = res.next().await {
rows.push(x.err_conv()?);
rows.push(x?);
}
let mut last_before = None;
let ret = convert_rows::<ST>(rows, range, ts_msp, opts.with_values, !opts.fwd, &mut last_before)?;
@@ -541,10 +573,10 @@ where
.st(ST::st_name())?;
let params = (series as i64, ts_msp.ms() as i64, ts_lsp_max.ns() as i64);
trace!("BCK event search params {:?}", params);
let mut res = scy.execute_iter(qu.clone(), params).await.err_conv()?;
let mut res = scy.execute_iter(qu.clone(), params).await?;
let mut rows = Vec::new();
while let Some(x) = res.next().await {
rows.push(x.err_conv()?);
rows.push(x?);
}
let mut _last_before = None;
let ret = convert_rows::<ST>(rows, range, ts_msp, opts.with_values, !opts.fwd, &mut _last_before)?;
@@ -570,21 +602,21 @@ fn convert_rows<ST: ValTy>(
for row in rows {
let (ts, pulse, value) = if with_values {
if ST::is_valueblob() {
let row: (i64, i64, Vec<u8>) = row.into_typed().err_conv()?;
let row: (i64, i64, Vec<u8>) = row.into_typed()?;
trace!("read a value blob len {}", row.2.len());
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let pulse = row.1 as u64;
let value = ValTy::from_valueblob(row.2);
(ts, pulse, value)
} else {
let row: (i64, i64, ST::ScyTy) = row.into_typed().err_conv()?;
let row: (i64, i64, ST::ScyTy) = row.into_typed()?;
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let pulse = row.1 as u64;
let value = ValTy::from_scyty(row.2);
(ts, pulse, value)
}
} else {
let row: (i64, i64) = row.into_typed().err_conv()?;
let row: (i64, i64) = row.into_typed()?;
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let pulse = row.1 as u64;
let value = ValTy::default();
@@ -651,9 +683,7 @@ impl ReadValues {
ts_msps,
fwd,
with_values,
fut: Box::pin(futures_util::future::ready(Err(Error::with_msg_no_trace(
"future not initialized",
)))),
fut: Box::pin(futures_util::future::ready(Err(Error::InvalidFuture))),
fut_done: false,
scyqueue,
};
@@ -738,7 +768,7 @@ impl ReadValues {
enum FrState {
New,
FindMsp(Pin<Box<dyn Future<Output = Result<(VecDeque<TsMs>, VecDeque<TsMs>), crate::worker::Error>> + Send>>),
FindMsp(Pin<Box<dyn Future<Output = Result<VecDeque<TsMs>, crate::worker::Error>> + Send>>),
ReadBack1(ReadValues),
ReadBack2(ReadValues),
ReadValues(ReadValues),
@@ -930,9 +960,10 @@ async fn find_ts_msp_via_queue(
rt: RetentionTime,
series: u64,
range: ScyllaSeriesRange,
bck: bool,
scyqueue: ScyllaQueue,
) -> Result<(VecDeque<TsMs>, VecDeque<TsMs>), crate::worker::Error> {
scyqueue.find_ts_msp(rt, series, range).await
) -> Result<VecDeque<TsMs>, crate::worker::Error> {
scyqueue.find_ts_msp(rt, series, range, bck).await
}
impl Stream for EventsStreamScylla {
@@ -941,8 +972,7 @@ impl Stream for EventsStreamScylla {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
if self.do_test_stream_error {
let e = Error::with_msg(format!("Test PRIVATE STREAM error."))
.add_public_msg(format!("Test PUBLIC STREAM error."));
let e = Error::TestError("test-message".into());
return Ready(Some(Err(e)));
}
loop {
@@ -967,14 +997,15 @@ impl Stream for EventsStreamScylla {
FrState::New => {
let series = self.series.clone();
let range = self.range.clone();
let fut = find_ts_msp_via_queue(self.rt.clone(), series, range, self.scyqueue.clone());
// TODO this no longer works, we miss the backwards part here
let fut = find_ts_msp_via_queue(self.rt.clone(), series, range, false, self.scyqueue.clone());
let fut = Box::pin(fut);
self.state = FrState::FindMsp(fut);
continue;
}
FrState::FindMsp(ref mut fut) => match fut.poll_unpin(cx) {
Ready(Ok((msps1, msps2))) => {
self.ts_msps_found(msps1, msps2);
Ready(Ok(msps)) => {
self.ts_msps_found(VecDeque::new(), msps);
continue;
}
Ready(Err(e)) => {

View File

@@ -0,0 +1 @@
pub mod msp;

View File

@@ -0,0 +1,192 @@
use crate::range::ScyllaSeriesRange;
use crate::worker::ScyllaQueue;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use netpod::ttl::RetentionTime;
use netpod::TsMs;
use series::SeriesId;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
#[derive(Debug, ThisError)]
pub enum Error {
Worker(#[from] crate::worker::Error),
Logic,
}
enum Resolvable<F>
where
F: Future,
{
Future(F),
Output(<F as Future>::Output),
Taken,
}
impl<F> Resolvable<F>
where
F: Future,
{
fn take(&mut self) -> Option<<F as Future>::Output> {
let x = std::mem::replace(self, Resolvable::Taken);
match x {
Resolvable::Future(_) => None,
Resolvable::Output(x) => Some(x),
Resolvable::Taken => None,
}
}
}
struct BckAndFirstFwd {
scyqueue: ScyllaQueue,
fut_bck: Resolvable<Pin<Box<dyn Future<Output = Result<VecDeque<TsMs>, crate::worker::Error>> + Send>>>,
fut_fwd: Resolvable<Pin<Box<dyn Future<Output = Result<VecDeque<TsMs>, crate::worker::Error>> + Send>>>,
}
enum State {
BckAndFirstFwd(BckAndFirstFwd),
InputDone,
}
#[pin_project::pin_project]
pub struct MspStream {
rt: RetentionTime,
series: SeriesId,
range: ScyllaSeriesRange,
#[pin]
state: State,
out: VecDeque<TsMs>,
}
impl MspStream {
pub fn new(rt: RetentionTime, series: SeriesId, range: ScyllaSeriesRange, scyqueue: ScyllaQueue) -> Self {
let fut_bck = {
let scyqueue = scyqueue.clone();
let rt = rt.clone();
let series = series.clone();
let range = range.clone();
async move { scyqueue.find_ts_msp(rt, series.id(), range, true).await }
};
let fut_fwd = {
let scyqueue = scyqueue.clone();
let rt = rt.clone();
let series = series.clone();
let range = range.clone();
async move { scyqueue.find_ts_msp(rt, series.id(), range, false).await }
};
Self {
rt,
series,
range,
state: State::BckAndFirstFwd(BckAndFirstFwd {
scyqueue,
fut_bck: Resolvable::Future(Box::pin(fut_bck)),
fut_fwd: Resolvable::Future(Box::pin(fut_fwd)),
}),
out: VecDeque::new(),
}
}
}
impl Stream for MspStream {
type Item = Result<TsMs, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
break match &mut self.state {
State::BckAndFirstFwd(st) => {
let mut have_pending = false;
let rsv = &mut st.fut_bck;
match rsv {
Resolvable::Future(fut) => match fut.poll_unpin(cx) {
Ready(x) => {
*rsv = Resolvable::Output(x);
}
Pending => {
have_pending = true;
}
},
Resolvable::Output(_) => {}
Resolvable::Taken => {}
}
let rsv = &mut st.fut_fwd;
match rsv {
Resolvable::Future(fut) => match fut.poll_unpin(cx) {
Ready(x) => {
*rsv = Resolvable::Output(x);
}
Pending => {
have_pending = true;
}
},
Resolvable::Output(_) => {}
Resolvable::Taken => {}
}
if have_pending {
Pending
} else {
let taken_bck = st.fut_bck.take();
let taken_fwd = st.fut_fwd.take();
if let Some(x) = taken_bck {
match x {
Ok(v) => {
for e in v {
self.out.push_back(e)
}
if let Some(x) = taken_fwd {
match x {
Ok(v) => {
for e in v {
self.out.push_back(e)
}
self.state = State::InputDone;
continue;
}
Err(e) => Ready(Some(Err(e.into()))),
}
} else {
Ready(Some(Err(Error::Logic)))
}
}
Err(e) => Ready(Some(Err(e.into()))),
}
} else {
Ready(Some(Err(Error::Logic)))
}
}
}
State::InputDone => {
if let Some(x) = self.out.pop_front() {
Ready(Some(Ok(x)))
} else {
Ready(None)
}
}
};
}
}
}
fn trait_assert<T>(_: T)
where
T: Stream + Unpin + Send,
{
}
#[allow(unused)]
fn trait_assert_try() {
let x: MspStream = todoval();
trait_assert(x);
}
fn todoval<T>() -> T {
todo!()
}

View File

@@ -9,3 +9,4 @@ pub mod status;
pub mod worker;
pub use scylla;
pub use series::SeriesId;

View File

@@ -19,18 +19,13 @@ use std::sync::Arc;
#[derive(Debug, ThisError)]
pub enum Error {
Error(#[from] err::Error),
ScyllaConnection(err::Error),
EventsQuery(#[from] crate::events::Error),
ChannelSend,
ChannelRecv,
Join,
}
impl err::ToErr for Error {
fn to_err(self) -> err::Error {
err::Error::from_string(self)
}
}
#[derive(Debug)]
enum Job {
FindTsMsp(
@@ -38,7 +33,8 @@ enum Job {
// series-id
u64,
ScyllaSeriesRange,
Sender<Result<(VecDeque<TsMs>, VecDeque<TsMs>), Error>>,
bool,
Sender<Result<VecDeque<TsMs>, Error>>,
),
ReadNextValues(ReadNextValues),
}
@@ -48,7 +44,7 @@ struct ReadNextValues {
dyn FnOnce(
Arc<Session>,
Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, err::Error>> + Send>>
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>>
+ Send,
>,
// fut: Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>>,
@@ -72,9 +68,10 @@ impl ScyllaQueue {
rt: RetentionTime,
series: u64,
range: ScyllaSeriesRange,
) -> Result<(VecDeque<TsMs>, VecDeque<TsMs>), Error> {
bck: bool,
) -> Result<VecDeque<TsMs>, Error> {
let (tx, rx) = async_channel::bounded(1);
let job = Job::FindTsMsp(rt, series, range, tx);
let job = Job::FindTsMsp(rt, series, range, bck, tx);
self.tx.send(job).await.map_err(|_| Error::ChannelSend)?;
let res = rx.recv().await.map_err(|_| Error::ChannelRecv)??;
Ok(res)
@@ -85,7 +82,7 @@ impl ScyllaQueue {
F: FnOnce(
Arc<Session>,
Arc<StmtsEvents>,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, err::Error>> + Send>>
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>>
+ Send
+ 'static,
{
@@ -126,7 +123,9 @@ impl ScyllaWorker {
}
pub async fn work(self) -> Result<(), Error> {
let scy = create_scy_session_no_ks(&self.scyconf_st).await?;
let scy = create_scy_session_no_ks(&self.scyconf_st)
.await
.map_err(Error::ScyllaConnection)?;
let scy = Arc::new(scy);
let kss = [
self.scyconf_st.keyspace.as_str(),
@@ -145,8 +144,8 @@ impl ScyllaWorker {
}
};
match job {
Job::FindTsMsp(rt, series, range, tx) => {
let res = crate::events::find_ts_msp_worker(&rt, series, range, &stmts, &scy).await;
Job::FindTsMsp(rt, series, range, bck, tx) => {
let res = crate::events::find_ts_msp(&rt, series, range, bck, &stmts, &scy).await;
if tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats
}

View File

@@ -169,6 +169,9 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
.from_env()
.map_err(|e| Error::with_msg_no_trace(format!("can not build tracing env filter {e}")))?;
let filter_3 = tracing_subscriber::filter::dynamic_filter_fn(|meta, ctx| {
if true {
return true;
}
if *meta.level() <= tracing::Level::TRACE {
if ["httpret", "scyllaconn"].contains(&meta.target()) {
let mut sr = ctx.lookup_current();