Merge branch 'rtmerge' into dev
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "daqbuffer"
|
||||
version = "0.5.1-aa.1"
|
||||
version = "0.5.1-aa.2"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2021"
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ use netpod::Shape;
|
||||
use taskrun::tokio;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tracing::Instrument;
|
||||
|
||||
pub fn main() {
|
||||
match taskrun::run(go()) {
|
||||
@@ -55,6 +56,7 @@ fn parse_ts(s: &str) -> Result<DateTime<Utc>, Error> {
|
||||
}
|
||||
|
||||
async fn go() -> Result<(), Error> {
|
||||
let buildmark = "+0008";
|
||||
let opts = Opts::parse();
|
||||
let service_version = ServiceVersion {
|
||||
major: std::env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap_or(0),
|
||||
@@ -64,8 +66,8 @@ async fn go() -> Result<(), Error> {
|
||||
};
|
||||
match opts.subcmd {
|
||||
SubCmd::Retrieval(subcmd) => {
|
||||
info!("daqbuffer version {} +0005", clap::crate_version!());
|
||||
info!(" service_version {}", service_version);
|
||||
info!("daqbuffer version {} {} retrieval", clap::crate_version!(), buildmark);
|
||||
info!(" service_version {} {} retrieval", service_version, buildmark);
|
||||
if false {
|
||||
#[allow(non_snake_case)]
|
||||
let TARGET = std::env!("DAQBUF_TARGET");
|
||||
@@ -95,7 +97,8 @@ async fn go() -> Result<(), Error> {
|
||||
}
|
||||
}
|
||||
SubCmd::Proxy(subcmd) => {
|
||||
info!("daqbuffer proxy {}", clap::crate_version!());
|
||||
info!("daqbuffer version {} {} proxy", clap::crate_version!(), buildmark);
|
||||
info!(" service_version {} {} proxy", service_version, buildmark);
|
||||
let mut config_file = File::open(&subcmd.config).await?;
|
||||
let mut buf = Vec::new();
|
||||
config_file.read_to_end(&mut buf).await?;
|
||||
@@ -143,10 +146,25 @@ async fn go() -> Result<(), Error> {
|
||||
SubCmd::Version => {
|
||||
println!("{}", clap::crate_version!());
|
||||
}
|
||||
SubCmd::TestLog => {
|
||||
test_log().await;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn test_log() {
|
||||
daqbufp2::test_log().await;
|
||||
let logspan = tracing::span!(tracing::Level::INFO, "log_span_debug", spanlevel = "info");
|
||||
daqbufp2::test_log().instrument(logspan).await;
|
||||
let logspan = tracing::span!(tracing::Level::INFO, "log_span_debug", spanlevel = "trace");
|
||||
daqbufp2::test_log().instrument(logspan).await;
|
||||
let logspan = tracing::span!(tracing::Level::TRACE, "log_span_trace", spanlevel = "info");
|
||||
daqbufp2::test_log().instrument(logspan).await;
|
||||
let logspan = tracing::span!(tracing::Level::TRACE, "log_span_trace", spanlevel = "trace");
|
||||
daqbufp2::test_log().instrument(logspan).await;
|
||||
}
|
||||
|
||||
// TODO test data needs to be generated.
|
||||
// TODO use httpclient for the request: need to add binary POST.
|
||||
//#[test]
|
||||
|
||||
@@ -18,6 +18,7 @@ pub enum SubCmd {
|
||||
GenerateTestData,
|
||||
Test,
|
||||
Version,
|
||||
TestLog,
|
||||
}
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
|
||||
@@ -46,3 +46,13 @@ pub async fn run_proxy(proxy_config: ProxyConfig, service_version: ServiceVersio
|
||||
httpret::proxy::proxy(proxy_config, service_version).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn test_log() {
|
||||
use netpod::log::*;
|
||||
error!("------");
|
||||
warn!("------");
|
||||
info!("------");
|
||||
debug!("------");
|
||||
trace!("------");
|
||||
httpret::test_log().await;
|
||||
}
|
||||
|
||||
@@ -66,7 +66,7 @@ pub async fn delay_io_medium() {
|
||||
}
|
||||
|
||||
pub async fn create_connection(db_config: &Database) -> Result<(PgClient, JoinHandle<Result<(), Error>>), Error> {
|
||||
warn!("create_connection\n\n CREATING CONNECTION\n\n");
|
||||
warn!("create_connection\n\n CREATING POSTGRES CONNECTION\n\n");
|
||||
// TODO use a common already running worker pool for these queries:
|
||||
let d = db_config;
|
||||
let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name);
|
||||
|
||||
@@ -13,6 +13,7 @@ use serde_json::Value as JsVal;
|
||||
|
||||
pub async fn search_channel_databuffer(
|
||||
query: ChannelSearchQuery,
|
||||
backend: &str,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<ChannelSearchResult, Error> {
|
||||
let empty = if !query.name_regex.is_empty() {
|
||||
@@ -33,12 +34,19 @@ pub async fn search_channel_databuffer(
|
||||
" channel_id, channel_name, source_name,",
|
||||
" dtype, shape, unit, description, channel_backend",
|
||||
" from searchext($1, $2, $3, $4)",
|
||||
" where channel_backend = $5"
|
||||
);
|
||||
let (pg, _pgjh) = create_connection(&node_config.node_config.cluster.database).await?;
|
||||
let rows = pg
|
||||
.query(
|
||||
sql,
|
||||
&[&query.name_regex, &query.source_regex, &query.description_regex, &"asc"],
|
||||
&[
|
||||
&query.name_regex,
|
||||
&query.source_regex,
|
||||
&query.description_regex,
|
||||
&"asc",
|
||||
&backend,
|
||||
],
|
||||
)
|
||||
.await
|
||||
.err_conv()?;
|
||||
@@ -90,14 +98,19 @@ pub async fn search_channel_databuffer(
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub async fn search_channel_scylla(query: ChannelSearchQuery, pgconf: &Database) -> Result<ChannelSearchResult, Error> {
|
||||
pub async fn search_channel_scylla(
|
||||
query: ChannelSearchQuery,
|
||||
backend: &str,
|
||||
pgconf: &Database,
|
||||
) -> Result<ChannelSearchResult, Error> {
|
||||
let empty = if !query.name_regex.is_empty() { false } else { true };
|
||||
if empty {
|
||||
let ret = ChannelSearchResult { channels: Vec::new() };
|
||||
return Ok(ret);
|
||||
}
|
||||
let ch_kind: i16 = if query.channel_status { 1 } else { 2 };
|
||||
let (cb1, cb2) = if let Some(x) = &query.backend {
|
||||
let tmp_backend = Some(backend.to_string());
|
||||
let (cb1, cb2) = if let Some(x) = &tmp_backend {
|
||||
(false, x.as_str())
|
||||
} else {
|
||||
(true, "")
|
||||
@@ -266,19 +279,17 @@ async fn search_channel_archeng(
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub async fn search_channel(
|
||||
query: ChannelSearchQuery,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<ChannelSearchResult, Error> {
|
||||
let pgconf = &node_config.node_config.cluster.database;
|
||||
if let Some(_scyconf) = node_config.node_config.cluster.scylla_st() {
|
||||
search_channel_scylla(query, pgconf).await
|
||||
} else if let Some(conf) = node_config.node.channel_archiver.as_ref() {
|
||||
search_channel_archeng(query, node_config.node_config.cluster.backend.clone(), conf, pgconf).await
|
||||
} else if let Some(_conf) = node_config.node.archiver_appliance.as_ref() {
|
||||
pub async fn search_channel(query: ChannelSearchQuery, ncc: &NodeConfigCached) -> Result<ChannelSearchResult, Error> {
|
||||
let backend = &ncc.node_config.cluster.backend;
|
||||
let pgconf = &ncc.node_config.cluster.database;
|
||||
if let Some(_scyconf) = ncc.node_config.cluster.scylla_st() {
|
||||
search_channel_scylla(query, backend, pgconf).await
|
||||
} else if let Some(conf) = ncc.node.channel_archiver.as_ref() {
|
||||
search_channel_archeng(query, backend.clone(), conf, pgconf).await
|
||||
} else if let Some(_conf) = ncc.node.archiver_appliance.as_ref() {
|
||||
// TODO
|
||||
err::todoval()
|
||||
} else {
|
||||
search_channel_databuffer(query, node_config).await
|
||||
search_channel_databuffer(query, backend, ncc).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ use err::thiserror;
|
||||
use err::PublicError;
|
||||
use err::ThisError;
|
||||
use err::ToPublicError;
|
||||
use futures_util::Stream;
|
||||
use http::Method;
|
||||
use http::StatusCode;
|
||||
use httpclient::body_empty;
|
||||
@@ -16,7 +17,7 @@ use httpclient::StreamResponse;
|
||||
use netpod::log::*;
|
||||
use netpod::NodeConfigCached;
|
||||
use std::sync::Arc;
|
||||
use tracing::Instrument;
|
||||
use streams::instrument::InstrumentStream;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
pub enum EventDataError {
|
||||
@@ -85,9 +86,8 @@ impl EventDataHandler {
|
||||
.await
|
||||
.map_err(|_| EventDataError::InternalError)?;
|
||||
let (evsubq,) = nodenet::conn::events_parse_input_query(frames).map_err(|_| EventDataError::QueryParse)?;
|
||||
let logspan = if false {
|
||||
tracing::Span::none()
|
||||
} else if evsubq.log_level() == "trace" {
|
||||
info!("{:?}", evsubq);
|
||||
let logspan = if evsubq.log_level() == "trace" {
|
||||
trace!("enable trace for handler");
|
||||
tracing::span!(tracing::Level::INFO, "log_span_trace")
|
||||
} else if evsubq.log_level() == "debug" {
|
||||
@@ -96,10 +96,12 @@ impl EventDataHandler {
|
||||
} else {
|
||||
tracing::Span::none()
|
||||
};
|
||||
use tracing::Instrument;
|
||||
let stream = nodenet::conn::create_response_bytes_stream(evsubq, shared_res.scyqueue.as_ref(), ncc)
|
||||
.instrument(logspan)
|
||||
.instrument(logspan.clone())
|
||||
.await
|
||||
.map_err(|e| EventDataError::Error(Box::new(e)))?;
|
||||
let stream = InstrumentStream::new(stream, logspan);
|
||||
let ret = response(StatusCode::OK)
|
||||
.body(body_stream(stream))
|
||||
.map_err(|_| EventDataError::InternalError)?;
|
||||
|
||||
@@ -29,6 +29,7 @@ use netpod::NodeConfigCached;
|
||||
use netpod::ReqCtx;
|
||||
use nodenet::client::OpenBoxedBytesViaHttp;
|
||||
use query::api4::events::PlainEventsQuery;
|
||||
use streams::instrument::InstrumentStream;
|
||||
use tracing::Instrument;
|
||||
|
||||
pub struct EventsHandler {}
|
||||
@@ -57,9 +58,7 @@ impl EventsHandler {
|
||||
let evq =
|
||||
PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?;
|
||||
debug!("{self_name} evq {evq:?}");
|
||||
let logspan = if false {
|
||||
tracing::Span::none()
|
||||
} else if evq.log_level() == "trace" {
|
||||
let logspan = if evq.log_level() == "trace" {
|
||||
trace!("enable trace for handler");
|
||||
tracing::span!(tracing::Level::INFO, "log_span_trace")
|
||||
} else if evq.log_level() == "debug" {
|
||||
@@ -134,6 +133,16 @@ async fn plain_events_cbor_framed(
|
||||
}
|
||||
})
|
||||
.filter(|x| if let Ok(x) = x { ready(x.len() > 0) } else { ready(true) });
|
||||
let logspan = if evq.log_level() == "trace" {
|
||||
trace!("enable trace for handler");
|
||||
tracing::span!(tracing::Level::INFO, "log_span_trace")
|
||||
} else if evq.log_level() == "debug" {
|
||||
debug!("enable debug for handler");
|
||||
tracing::span!(tracing::Level::INFO, "log_span_debug")
|
||||
} else {
|
||||
tracing::Span::none()
|
||||
};
|
||||
let stream = InstrumentStream::new(stream, logspan);
|
||||
let ret = response(StatusCode::OK).body(body_stream(stream))?;
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
@@ -639,7 +639,7 @@ impl ScyllaSeriesTsMsp {
|
||||
|
||||
let mut st_ts_msp_ms = Vec::new();
|
||||
let mut msp_stream =
|
||||
scyllaconn::events2::msp::MspStream::new(RetentionTime::Short, sid, (&q.range).into(), scyqueue.clone());
|
||||
scyllaconn::events2::msp::MspStreamRt::new(RetentionTime::Short, sid, (&q.range).into(), scyqueue.clone());
|
||||
use chrono::TimeZone;
|
||||
while let Some(x) = msp_stream.next().await {
|
||||
let v = x.unwrap().ms();
|
||||
@@ -650,7 +650,7 @@ impl ScyllaSeriesTsMsp {
|
||||
|
||||
let mut mt_ts_msp_ms = Vec::new();
|
||||
let mut msp_stream =
|
||||
scyllaconn::events2::msp::MspStream::new(RetentionTime::Medium, sid, (&q.range).into(), scyqueue.clone());
|
||||
scyllaconn::events2::msp::MspStreamRt::new(RetentionTime::Medium, sid, (&q.range).into(), scyqueue.clone());
|
||||
while let Some(x) = msp_stream.next().await {
|
||||
let v = x.unwrap().ms();
|
||||
let st = chrono::Utc.timestamp_millis_opt(v as _).earliest().unwrap();
|
||||
@@ -660,7 +660,7 @@ impl ScyllaSeriesTsMsp {
|
||||
|
||||
let mut lt_ts_msp_ms = Vec::new();
|
||||
let mut msp_stream =
|
||||
scyllaconn::events2::msp::MspStream::new(RetentionTime::Long, sid, (&q.range).into(), scyqueue.clone());
|
||||
scyllaconn::events2::msp::MspStreamRt::new(RetentionTime::Long, sid, (&q.range).into(), scyqueue.clone());
|
||||
while let Some(x) = msp_stream.next().await {
|
||||
let v = x.unwrap().ms();
|
||||
let st = chrono::Utc.timestamp_millis_opt(v as _).earliest().unwrap();
|
||||
|
||||
@@ -522,3 +522,12 @@ async fn clear_cache_all(
|
||||
.body(body_string(serde_json::to_string(&res)?))?;
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub async fn test_log() {
|
||||
error!("------");
|
||||
warn!("------");
|
||||
info!("------");
|
||||
debug!("------");
|
||||
trace!("------");
|
||||
scyllaconn::test_log().await;
|
||||
}
|
||||
|
||||
@@ -74,11 +74,12 @@ pub async fn channel_search(req: Requ, ctx: &ReqCtx, proxy_config: &ProxyConfig)
|
||||
let res: ChannelSearchResult = match serde_json::from_slice(&body) {
|
||||
Ok(k) => k,
|
||||
Err(_) => {
|
||||
let msg = format!("can not parse result: {}", String::from_utf8_lossy(&body));
|
||||
let msg = format!("can not parse result tag {} {}", tag, String::from_utf8_lossy(&body));
|
||||
error!("{}", msg);
|
||||
return Err(Error::with_msg_no_trace(msg));
|
||||
}
|
||||
};
|
||||
info!("from {} len {}", tag, res.channels.len());
|
||||
let ret = SubRes {
|
||||
tag,
|
||||
status: StatusCode::OK,
|
||||
|
||||
@@ -110,6 +110,8 @@ impl fmt::Display for MergeError {
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for MergeError {}
|
||||
|
||||
// TODO can I remove the Any bound?
|
||||
|
||||
/// Container of some form of events, for use as trait object.
|
||||
@@ -137,7 +139,7 @@ pub trait Events:
|
||||
// TODO is this used?
|
||||
fn take_new_events_until_ts(&mut self, ts_end: u64) -> Box<dyn Events>;
|
||||
fn new_empty_evs(&self) -> Box<dyn Events>;
|
||||
fn drain_into_evs(&mut self, dst: &mut Box<dyn Events>, range: (usize, usize)) -> Result<(), MergeError>;
|
||||
fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError>;
|
||||
fn find_lowest_index_gt_evs(&self, ts: u64) -> Option<usize>;
|
||||
fn find_lowest_index_ge_evs(&self, ts: u64) -> Option<usize>;
|
||||
fn find_highest_index_lt_evs(&self, ts: u64) -> Option<usize>;
|
||||
@@ -151,6 +153,7 @@ pub trait Events:
|
||||
fn to_min_max_avg(&mut self) -> Box<dyn Events>;
|
||||
fn to_json_vec_u8(&self) -> Vec<u8>;
|
||||
fn to_cbor_vec_u8(&self) -> Vec<u8>;
|
||||
fn clear(&mut self);
|
||||
}
|
||||
|
||||
impl WithLen for Box<dyn Events> {
|
||||
@@ -222,7 +225,7 @@ impl Events for Box<dyn Events> {
|
||||
Events::new_empty_evs(self.as_ref())
|
||||
}
|
||||
|
||||
fn drain_into_evs(&mut self, dst: &mut Box<dyn Events>, range: (usize, usize)) -> Result<(), MergeError> {
|
||||
fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> {
|
||||
Events::drain_into_evs(self.as_mut(), dst, range)
|
||||
}
|
||||
|
||||
@@ -277,4 +280,8 @@ impl Events for Box<dyn Events> {
|
||||
fn to_cbor_vec_u8(&self) -> Vec<u8> {
|
||||
Events::to_cbor_vec_u8(self.as_ref())
|
||||
}
|
||||
|
||||
fn clear(&mut self) {
|
||||
Events::clear(self.as_mut())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -154,6 +154,15 @@ pub enum ChannelEvents {
|
||||
Status(Option<ConnStatusEvent>),
|
||||
}
|
||||
|
||||
impl ChannelEvents {
|
||||
pub fn is_events(&self) -> bool {
|
||||
match self {
|
||||
ChannelEvents::Events(_) => true,
|
||||
ChannelEvents::Status(_) => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TypeName for ChannelEvents {
|
||||
fn type_name(&self) -> String {
|
||||
any::type_name::<Self>().into()
|
||||
@@ -702,6 +711,17 @@ impl Mergeable for ChannelEvents {
|
||||
}
|
||||
}
|
||||
|
||||
fn clear(&mut self) {
|
||||
match self {
|
||||
ChannelEvents::Events(x) => {
|
||||
Mergeable::clear(x);
|
||||
}
|
||||
ChannelEvents::Status(x) => {
|
||||
*x = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> {
|
||||
match self {
|
||||
ChannelEvents::Events(k) => match dst {
|
||||
@@ -829,7 +849,10 @@ impl Events for ChannelEvents {
|
||||
}
|
||||
|
||||
fn verify(&self) -> bool {
|
||||
todo!()
|
||||
match self {
|
||||
ChannelEvents::Events(x) => Events::verify(x),
|
||||
ChannelEvents::Status(_) => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
fn output_info(&self) -> String {
|
||||
@@ -861,11 +884,26 @@ impl Events for ChannelEvents {
|
||||
}
|
||||
|
||||
fn new_empty_evs(&self) -> Box<dyn Events> {
|
||||
todo!()
|
||||
match self {
|
||||
ChannelEvents::Events(x) => Events::new_empty_evs(x),
|
||||
ChannelEvents::Status(_) => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
fn drain_into_evs(&mut self, dst: &mut Box<dyn Events>, range: (usize, usize)) -> Result<(), MergeError> {
|
||||
todo!()
|
||||
fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> {
|
||||
let dst2 = if let Some(x) = dst.as_any_mut().downcast_mut::<Self>() {
|
||||
// debug!("unwrapped dst ChannelEvents as well");
|
||||
x
|
||||
} else {
|
||||
panic!("dst is not ChannelEvents");
|
||||
};
|
||||
match self {
|
||||
ChannelEvents::Events(k) => match dst2 {
|
||||
ChannelEvents::Events(j) => Events::drain_into_evs(k, j, range),
|
||||
ChannelEvents::Status(_) => panic!("dst is not events"),
|
||||
},
|
||||
ChannelEvents::Status(_) => panic!("self is not events"),
|
||||
}
|
||||
}
|
||||
|
||||
fn find_lowest_index_gt_evs(&self, ts: u64) -> Option<usize> {
|
||||
@@ -896,11 +934,14 @@ impl Events for ChannelEvents {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn tss(&self) -> &std::collections::VecDeque<u64> {
|
||||
todo!()
|
||||
fn tss(&self) -> &VecDeque<u64> {
|
||||
match self {
|
||||
ChannelEvents::Events(x) => Events::tss(x),
|
||||
ChannelEvents::Status(_) => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
fn pulses(&self) -> &std::collections::VecDeque<u64> {
|
||||
fn pulses(&self) -> &VecDeque<u64> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
@@ -934,6 +975,15 @@ impl Events for ChannelEvents {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn clear(&mut self) {
|
||||
match self {
|
||||
ChannelEvents::Events(x) => Events::clear(x.as_mut()),
|
||||
ChannelEvents::Status(x) => {
|
||||
*x = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Collectable for ChannelEvents {
|
||||
|
||||
@@ -201,6 +201,17 @@ impl Mergeable for EventFull {
|
||||
Empty::empty()
|
||||
}
|
||||
|
||||
fn clear(&mut self) {
|
||||
self.tss.clear();
|
||||
self.pulses.clear();
|
||||
self.blobs.clear();
|
||||
self.scalar_types.clear();
|
||||
self.be.clear();
|
||||
self.shapes.clear();
|
||||
self.comps.clear();
|
||||
self.entry_payload_max = 0;
|
||||
}
|
||||
|
||||
fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> {
|
||||
// TODO make it harder to forget new members when the struct may get modified in the future
|
||||
let r = range.0..range.1;
|
||||
|
||||
@@ -74,6 +74,23 @@ macro_rules! trace2 {
|
||||
($($arg:tt)*) => { trace!($($arg)*); };
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct EventsDim0NoPulse<STY> {
|
||||
pub tss: VecDeque<u64>,
|
||||
pub values: VecDeque<STY>,
|
||||
}
|
||||
|
||||
impl<STY> From<EventsDim0NoPulse<STY>> for EventsDim0<STY> {
|
||||
fn from(value: EventsDim0NoPulse<STY>) -> Self {
|
||||
let pulses = vec![0; value.tss.len()].into();
|
||||
Self {
|
||||
tss: value.tss,
|
||||
pulses,
|
||||
values: value.values,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct EventsDim0<STY> {
|
||||
pub tss: VecDeque<u64>,
|
||||
@@ -859,9 +876,9 @@ impl<STY: ScalarOps> Events for EventsDim0<STY> {
|
||||
Box::new(Self::empty())
|
||||
}
|
||||
|
||||
fn drain_into_evs(&mut self, dst: &mut Box<dyn Events>, range: (usize, usize)) -> Result<(), MergeError> {
|
||||
fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> {
|
||||
// TODO as_any and as_any_mut are declared on unrelated traits. Simplify.
|
||||
if let Some(dst) = dst.as_mut().as_any_mut().downcast_mut::<Self>() {
|
||||
if let Some(dst) = dst.as_any_mut().downcast_mut::<Self>() {
|
||||
// TODO make it harder to forget new members when the struct may get modified in the future
|
||||
let r = range.0..range.1;
|
||||
dst.tss.extend(self.tss.drain(r.clone()));
|
||||
@@ -869,7 +886,12 @@ impl<STY: ScalarOps> Events for EventsDim0<STY> {
|
||||
dst.values.extend(self.values.drain(r.clone()));
|
||||
Ok(())
|
||||
} else {
|
||||
error!("downcast to EventsDim0 FAILED");
|
||||
error!(
|
||||
"downcast to EventsDim0 FAILED\n\n{}\n\n{}\n\n",
|
||||
self.type_name(),
|
||||
dst.type_name()
|
||||
);
|
||||
panic!();
|
||||
Err(MergeError::NotCompatible)
|
||||
}
|
||||
}
|
||||
@@ -989,6 +1011,12 @@ impl<STY: ScalarOps> Events for EventsDim0<STY> {
|
||||
ciborium::into_writer(&ret, &mut buf).unwrap();
|
||||
buf
|
||||
}
|
||||
|
||||
fn clear(&mut self) {
|
||||
self.tss.clear();
|
||||
self.pulses.clear();
|
||||
self.values.clear();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -47,6 +47,23 @@ macro_rules! trace2 {
|
||||
($($arg:tt)*) => (trace!($($arg)*));
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct EventsDim1NoPulse<STY> {
|
||||
pub tss: VecDeque<u64>,
|
||||
pub values: VecDeque<Vec<STY>>,
|
||||
}
|
||||
|
||||
impl<STY> From<EventsDim1NoPulse<STY>> for EventsDim1<STY> {
|
||||
fn from(value: EventsDim1NoPulse<STY>) -> Self {
|
||||
let pulses = vec![0; value.tss.len()].into();
|
||||
Self {
|
||||
tss: value.tss,
|
||||
pulses,
|
||||
values: value.values,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct EventsDim1<STY> {
|
||||
pub tss: VecDeque<u64>,
|
||||
@@ -813,9 +830,9 @@ impl<STY: ScalarOps> Events for EventsDim1<STY> {
|
||||
Box::new(Self::empty())
|
||||
}
|
||||
|
||||
fn drain_into_evs(&mut self, dst: &mut Box<dyn Events>, range: (usize, usize)) -> Result<(), MergeError> {
|
||||
fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> {
|
||||
// TODO as_any and as_any_mut are declared on unrelated traits. Simplify.
|
||||
if let Some(dst) = dst.as_mut().as_any_mut().downcast_mut::<Self>() {
|
||||
if let Some(dst) = dst.as_any_mut().downcast_mut::<Self>() {
|
||||
// TODO make it harder to forget new members when the struct may get modified in the future
|
||||
let r = range.0..range.1;
|
||||
dst.tss.extend(self.tss.drain(r.clone()));
|
||||
@@ -949,6 +966,12 @@ impl<STY: ScalarOps> Events for EventsDim1<STY> {
|
||||
ciborium::into_writer(&ret, &mut buf).unwrap();
|
||||
buf
|
||||
}
|
||||
|
||||
fn clear(&mut self) {
|
||||
self.tss.clear();
|
||||
self.pulses.clear();
|
||||
self.values.clear();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -261,9 +261,9 @@ impl<STY: ScalarOps> Events for EventsXbinDim0<STY> {
|
||||
Box::new(Self::empty())
|
||||
}
|
||||
|
||||
fn drain_into_evs(&mut self, dst: &mut Box<dyn Events>, range: (usize, usize)) -> Result<(), MergeError> {
|
||||
fn drain_into_evs(&mut self, dst: &mut dyn Events, range: (usize, usize)) -> Result<(), MergeError> {
|
||||
// TODO as_any and as_any_mut are declared on unrelated traits. Simplify.
|
||||
if let Some(dst) = dst.as_mut().as_any_mut().downcast_mut::<Self>() {
|
||||
if let Some(dst) = dst.as_any_mut().downcast_mut::<Self>() {
|
||||
// TODO make it harder to forget new members when the struct may get modified in the future
|
||||
let r = range.0..range.1;
|
||||
dst.tss.extend(self.tss.drain(r.clone()));
|
||||
@@ -365,6 +365,14 @@ impl<STY: ScalarOps> Events for EventsXbinDim0<STY> {
|
||||
fn to_cbor_vec_u8(&self) -> Vec<u8> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn clear(&mut self) {
|
||||
self.tss.clear();
|
||||
self.pulses.clear();
|
||||
self.mins.clear();
|
||||
self.maxs.clear();
|
||||
self.avgs.clear();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -170,6 +170,10 @@ impl Mergeable for Box<dyn Events> {
|
||||
self.as_ref().new_empty_evs()
|
||||
}
|
||||
|
||||
fn clear(&mut self) {
|
||||
Events::clear(self.as_mut())
|
||||
}
|
||||
|
||||
fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError> {
|
||||
self.as_mut().drain_into_evs(dst, range)
|
||||
}
|
||||
|
||||
@@ -47,6 +47,7 @@ pub trait Mergeable<Rhs = Self>: fmt::Debug + WithLen + ByteEstimate + Unpin {
|
||||
fn ts_min(&self) -> Option<u64>;
|
||||
fn ts_max(&self) -> Option<u64>;
|
||||
fn new_empty(&self) -> Self;
|
||||
fn clear(&mut self);
|
||||
// TODO when MergeError::Full gets returned, any guarantees about what has been modified or kept unchanged?
|
||||
fn drain_into(&mut self, dst: &mut Self, range: (usize, usize)) -> Result<(), MergeError>;
|
||||
fn find_lowest_index_gt(&self, ts: u64) -> Option<usize>;
|
||||
|
||||
@@ -11,6 +11,7 @@ path = "src/netpod.rs"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
http = "1.0.0"
|
||||
humantime = "2.1.0"
|
||||
humantime-serde = "1.1.1"
|
||||
async-channel = "1.8.0"
|
||||
bytes = "1.4.0"
|
||||
|
||||
@@ -226,7 +226,7 @@ impl<'de> serde::de::Visitor<'de> for ScalarTypeVis {
|
||||
"bool" => ScalarType::BOOL,
|
||||
"string" => ScalarType::STRING,
|
||||
"enum" => ScalarType::Enum,
|
||||
"channelstatus" => ScalarType::ChannelStatus,
|
||||
"ChannelStatus" => ScalarType::ChannelStatus,
|
||||
k => return Err(E::custom(format!("can not understand variant {k:?}"))),
|
||||
};
|
||||
Ok(ret)
|
||||
|
||||
@@ -85,6 +85,20 @@ pub struct TimeRangeQuery {
|
||||
range: NanoRange,
|
||||
}
|
||||
|
||||
fn parse_time(v: &str) -> Result<DateTime<Utc>, Error> {
|
||||
if let Ok(x) = v.parse() {
|
||||
Ok(x)
|
||||
} else {
|
||||
if v.ends_with("ago") {
|
||||
let d = humantime::parse_duration(&v[..v.len() - 3])
|
||||
.map_err(|_| Error::with_public_msg_no_trace(format!("can not parse {v}")))?;
|
||||
Ok(Utc::now() - d)
|
||||
} else {
|
||||
Err(Error::with_public_msg_no_trace(format!("can not parse {v}")))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FromUrl for TimeRangeQuery {
|
||||
fn from_url(url: &Url) -> Result<Self, Error> {
|
||||
let pairs = get_url_query_pairs(url);
|
||||
@@ -95,8 +109,8 @@ impl FromUrl for TimeRangeQuery {
|
||||
if let (Some(beg), Some(end)) = (pairs.get("begDate"), pairs.get("endDate")) {
|
||||
let ret = Self {
|
||||
range: NanoRange {
|
||||
beg: beg.parse::<DateTime<Utc>>()?.to_nanos(),
|
||||
end: end.parse::<DateTime<Utc>>()?.to_nanos(),
|
||||
beg: parse_time(beg)?.to_nanos(),
|
||||
end: parse_time(end)?.to_nanos(),
|
||||
},
|
||||
};
|
||||
Ok(ret)
|
||||
|
||||
@@ -77,6 +77,15 @@ impl NanoRange {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<(u64, u64)> for NanoRange {
|
||||
fn from(value: (u64, u64)) -> Self {
|
||||
Self {
|
||||
beg: value.0,
|
||||
end: value.1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&SeriesRange> for NanoRange {
|
||||
type Error = Error;
|
||||
|
||||
|
||||
@@ -1,6 +1,12 @@
|
||||
use core::fmt;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum RetentionTime {
|
||||
Short,
|
||||
Medium,
|
||||
@@ -59,3 +65,43 @@ impl RetentionTime {
|
||||
self.ttl_events_d0()
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for RetentionTime {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
let s = match self {
|
||||
RetentionTime::Short => "short",
|
||||
RetentionTime::Medium => "medium",
|
||||
RetentionTime::Long => "long",
|
||||
};
|
||||
fmt.write_str(s)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
pub enum Error {
|
||||
Parse,
|
||||
}
|
||||
|
||||
impl FromStr for RetentionTime {
|
||||
type Err = Error;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
let ret = match s {
|
||||
"short" => Self::Short,
|
||||
"medium" => Self::Medium,
|
||||
"long" => Self::Long,
|
||||
_ => return Err(Error::Parse),
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
// impl ToString for RetentionTime {
|
||||
// fn to_string(&self) -> String {
|
||||
// match self {
|
||||
// RetentionTime::Short => "short".into(),
|
||||
// RetentionTime::Medium => "medium".into(),
|
||||
// RetentionTime::Long => "long".into(),
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
@@ -118,7 +118,7 @@ pub async fn create_response_bytes_stream(
|
||||
scyqueue: Option<&ScyllaQueue>,
|
||||
ncc: &NodeConfigCached,
|
||||
) -> Result<BoxedBytesStream, Error> {
|
||||
debug!(
|
||||
info!(
|
||||
"create_response_bytes_stream {:?} {:?}",
|
||||
evq.ch_conf().scalar_type(),
|
||||
evq.ch_conf().shape(),
|
||||
@@ -275,6 +275,7 @@ pub fn events_parse_input_query(frames: Vec<InMemoryFrame>) -> Result<(EventsSub
|
||||
},
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
info!("parsing json {:?}", qitem.str());
|
||||
let frame1: Frame1Parts = serde_json::from_str(&qitem.str()).map_err(|e| {
|
||||
let e = Error::with_msg_no_trace(format!("json parse error: {} inp {}", e, qitem.str()));
|
||||
error!("{e}");
|
||||
|
||||
@@ -1,15 +1,16 @@
|
||||
use err::Error;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use futures_util::TryStreamExt;
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
use items_0::streamitem::Sitemty;
|
||||
use items_0::streamitem::StreamItem;
|
||||
use items_2::channelevents::ChannelEvents;
|
||||
use netpod::log::*;
|
||||
use netpod::ttl::RetentionTime;
|
||||
use netpod::ChConf;
|
||||
use query::api4::events::EventsSubQuery;
|
||||
use scyllaconn::worker::ScyllaQueue;
|
||||
use scyllaconn::SeriesId;
|
||||
use std::pin::Pin;
|
||||
use taskrun::tokio;
|
||||
|
||||
@@ -22,23 +23,34 @@ pub async fn scylla_channel_event_stream(
|
||||
// TODO why both in PlainEventsQuery and as separate parameter? Check other usages.
|
||||
// let do_one_before_range = evq.need_one_before_range();
|
||||
let do_one_before_range = false;
|
||||
let series = chconf.series();
|
||||
let series = SeriesId::new(chconf.series());
|
||||
let scalar_type = chconf.scalar_type();
|
||||
let shape = chconf.shape();
|
||||
let do_test_stream_error = false;
|
||||
let with_values = evq.need_value_data();
|
||||
debug!("\n\nmake EventsStreamScylla {series:?} {scalar_type:?} {shape:?}\n");
|
||||
let stream = scyllaconn::events::EventsStreamScylla::new(
|
||||
RetentionTime::Short,
|
||||
series,
|
||||
evq.range().into(),
|
||||
do_one_before_range,
|
||||
scalar_type.clone(),
|
||||
shape.clone(),
|
||||
with_values,
|
||||
scyqueue.clone(),
|
||||
do_test_stream_error,
|
||||
);
|
||||
let stream: Pin<Box<dyn Stream<Item = _> + Send>> = if let Some(rt) = evq.use_rt() {
|
||||
let x = scyllaconn::events2::events::EventsStreamRt::new(
|
||||
rt,
|
||||
series,
|
||||
scalar_type.clone(),
|
||||
shape.clone(),
|
||||
evq.range().into(),
|
||||
with_values,
|
||||
scyqueue.clone(),
|
||||
)
|
||||
.map_err(|e| scyllaconn::events2::mergert::Error::from(e));
|
||||
Box::pin(x)
|
||||
} else {
|
||||
let x = scyllaconn::events2::mergert::MergeRts::new(
|
||||
series,
|
||||
scalar_type.clone(),
|
||||
shape.clone(),
|
||||
evq.range().into(),
|
||||
with_values,
|
||||
scyqueue.clone(),
|
||||
);
|
||||
Box::pin(x)
|
||||
};
|
||||
let stream = stream
|
||||
.map(move |item| match &item {
|
||||
Ok(k) => match k {
|
||||
@@ -72,7 +84,7 @@ pub async fn scylla_channel_event_stream(
|
||||
item
|
||||
}
|
||||
},
|
||||
Err(e) => Err(Error::with_msg_no_trace(format!("scyllaconn eevents error {e}"))),
|
||||
Err(e) => Err(Error::with_msg_no_trace(format!("scyllaconn events error {e}"))),
|
||||
};
|
||||
item
|
||||
});
|
||||
|
||||
@@ -10,6 +10,7 @@ serde_json = "1.0"
|
||||
tracing = "0.1"
|
||||
chrono = { version = "0.4.19", features = ["serde"] }
|
||||
url = "2.2"
|
||||
humantime = "2.1.0"
|
||||
humantime-serde = "1.1.1"
|
||||
err = { path = "../err" }
|
||||
netpod = { path = "../netpod" }
|
||||
|
||||
@@ -6,6 +6,7 @@ use chrono::TimeZone;
|
||||
use chrono::Utc;
|
||||
use err::Error;
|
||||
use netpod::get_url_query_pairs;
|
||||
use netpod::log::*;
|
||||
use netpod::range::evrange::SeriesRange;
|
||||
use netpod::AppendToUrl;
|
||||
use netpod::FromUrl;
|
||||
@@ -124,7 +125,13 @@ impl FromUrl for AccountingToplistQuery {
|
||||
let v = pairs
|
||||
.get("tsDate")
|
||||
.ok_or(Error::with_public_msg_no_trace("missing tsDate"))?;
|
||||
let w = v.parse::<DateTime<Utc>>()?;
|
||||
let mut w = v.parse::<DateTime<Utc>>();
|
||||
if w.is_err() && v.ends_with("ago") {
|
||||
let d = humantime::parse_duration(&v[..v.len() - 3])
|
||||
.map_err(|_| Error::with_public_msg_no_trace(format!("can not parse {v}")))?;
|
||||
w = Ok(Utc::now() - d);
|
||||
}
|
||||
let w = w?;
|
||||
Ok::<_, Error>(TsNano::from_ns(w.to_nanos()))
|
||||
};
|
||||
let ret = Self {
|
||||
|
||||
@@ -7,6 +7,7 @@ use netpod::query::api1::Api1Query;
|
||||
use netpod::query::PulseRangeQuery;
|
||||
use netpod::query::TimeRangeQuery;
|
||||
use netpod::range::evrange::SeriesRange;
|
||||
use netpod::ttl::RetentionTime;
|
||||
use netpod::AppendToUrl;
|
||||
use netpod::ByteSize;
|
||||
use netpod::ChannelTypeConfigGen;
|
||||
@@ -56,6 +57,8 @@ pub struct PlainEventsQuery {
|
||||
create_errors: Vec<String>,
|
||||
#[serde(default)]
|
||||
log_level: String,
|
||||
#[serde(default)]
|
||||
use_rt: Option<RetentionTime>,
|
||||
}
|
||||
|
||||
impl PlainEventsQuery {
|
||||
@@ -81,6 +84,7 @@ impl PlainEventsQuery {
|
||||
merger_out_len_max: None,
|
||||
create_errors: Vec::new(),
|
||||
log_level: String::new(),
|
||||
use_rt: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -206,6 +210,10 @@ impl PlainEventsQuery {
|
||||
pub fn log_level(&self) -> &str {
|
||||
&self.log_level
|
||||
}
|
||||
|
||||
pub fn use_rt(&self) -> Option<RetentionTime> {
|
||||
self.use_rt.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl HasBackend for PlainEventsQuery {
|
||||
@@ -283,6 +291,11 @@ impl FromUrl for PlainEventsQuery {
|
||||
.map(|x| x.split(",").map(|x| x.to_string()).collect())
|
||||
.unwrap_or(Vec::new()),
|
||||
log_level: pairs.get("log_level").map_or(String::new(), String::from),
|
||||
use_rt: pairs.get("useRt").map_or(Ok(None), |k| {
|
||||
k.parse()
|
||||
.map(Some)
|
||||
.map_err(|_| Error::with_public_msg_no_trace(format!("can not parse useRt: {}", k)))
|
||||
})?,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
@@ -342,6 +355,9 @@ impl AppendToUrl for PlainEventsQuery {
|
||||
if self.log_level.len() != 0 {
|
||||
g.append_pair("log_level", &self.log_level);
|
||||
}
|
||||
if let Some(x) = self.use_rt.as_ref() {
|
||||
g.append_pair("useRt", &x.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -385,6 +401,7 @@ pub struct EventsSubQuerySettings {
|
||||
buf_len_disk_io: Option<usize>,
|
||||
queue_len_disk_io: Option<usize>,
|
||||
create_errors: Vec<String>,
|
||||
use_rt: Option<RetentionTime>,
|
||||
}
|
||||
|
||||
impl Default for EventsSubQuerySettings {
|
||||
@@ -398,6 +415,7 @@ impl Default for EventsSubQuerySettings {
|
||||
buf_len_disk_io: None,
|
||||
queue_len_disk_io: None,
|
||||
create_errors: Vec::new(),
|
||||
use_rt: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -414,6 +432,7 @@ impl From<&PlainEventsQuery> for EventsSubQuerySettings {
|
||||
// TODO add to query
|
||||
queue_len_disk_io: None,
|
||||
create_errors: value.create_errors.clone(),
|
||||
use_rt: value.use_rt(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -431,6 +450,7 @@ impl From<&BinnedQuery> for EventsSubQuerySettings {
|
||||
// TODO add to query
|
||||
queue_len_disk_io: None,
|
||||
create_errors: Vec::new(),
|
||||
use_rt: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -448,6 +468,7 @@ impl From<&Api1Query> for EventsSubQuerySettings {
|
||||
buf_len_disk_io: Some(disk_io_tune.read_buffer_len),
|
||||
queue_len_disk_io: Some(disk_io_tune.read_queue_len),
|
||||
create_errors: Vec::new(),
|
||||
use_rt: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -551,6 +572,10 @@ impl EventsSubQuery {
|
||||
pub fn log_level(&self) -> &str {
|
||||
&self.log_level
|
||||
}
|
||||
|
||||
pub fn use_rt(&self) -> Option<RetentionTime> {
|
||||
self.settings.use_rt.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use crate::errconv::ErrConv;
|
||||
use err::Error;
|
||||
use netpod::log::*;
|
||||
use netpod::ScyllaConfig;
|
||||
use scylla::execution_profile::ExecutionProfileBuilder;
|
||||
use scylla::statement::Consistency;
|
||||
@@ -16,6 +17,7 @@ pub async fn create_scy_session(scyconf: &ScyllaConfig) -> Result<Arc<ScySession
|
||||
}
|
||||
|
||||
pub async fn create_scy_session_no_ks(scyconf: &ScyllaConfig) -> Result<ScySession, Error> {
|
||||
warn!("create_connection\n\n CREATING SCYLLA CONNECTION\n\n");
|
||||
let scy = scylla::SessionBuilder::new()
|
||||
.known_nodes(&scyconf.hosts)
|
||||
.default_execution_profile_handle(
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::errconv::ErrConv;
|
||||
use crate::events2::prepare::StmtsEvents;
|
||||
use crate::range::ScyllaSeriesRange;
|
||||
use crate::worker::ScyllaQueue;
|
||||
use err::thiserror;
|
||||
@@ -23,9 +23,8 @@ use netpod::Shape;
|
||||
use netpod::TsMs;
|
||||
use netpod::TsNano;
|
||||
use scylla::frame::response::result::Row;
|
||||
use scylla::prepared_statement::PreparedStatement;
|
||||
use scylla::Session;
|
||||
use scylla::Session as ScySession;
|
||||
use series::SeriesId;
|
||||
use std::collections::VecDeque;
|
||||
use std::mem;
|
||||
use std::pin::Pin;
|
||||
@@ -35,6 +34,7 @@ use std::task::Poll;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
pub enum Error {
|
||||
Prepare(#[from] crate::events2::prepare::Error),
|
||||
ScyllaQuery(#[from] scylla::transport::errors::QueryError),
|
||||
ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),
|
||||
ScyllaTypeConv(#[from] scylla::cql_to_rust::FromRowError),
|
||||
@@ -51,268 +51,7 @@ impl From<crate::worker::Error> for Error {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct StmtsLspShape {
|
||||
u8: PreparedStatement,
|
||||
u16: PreparedStatement,
|
||||
u32: PreparedStatement,
|
||||
u64: PreparedStatement,
|
||||
i8: PreparedStatement,
|
||||
i16: PreparedStatement,
|
||||
i32: PreparedStatement,
|
||||
i64: PreparedStatement,
|
||||
f32: PreparedStatement,
|
||||
f64: PreparedStatement,
|
||||
bool: PreparedStatement,
|
||||
string: PreparedStatement,
|
||||
}
|
||||
|
||||
impl StmtsLspShape {
|
||||
fn st(&self, stname: &str) -> Result<&PreparedStatement, Error> {
|
||||
let ret = match stname {
|
||||
"u8" => &self.u8,
|
||||
_ => return Err(Error::MissingQuery(format!("no query for stname {stname}"))),
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct StmtsLspDir {
|
||||
scalar: StmtsLspShape,
|
||||
array: StmtsLspShape,
|
||||
}
|
||||
|
||||
impl StmtsLspDir {
|
||||
fn shape(&self, array: bool) -> &StmtsLspShape {
|
||||
if array {
|
||||
&self.array
|
||||
} else {
|
||||
&self.scalar
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct StmtsEventsRt {
|
||||
ts_msp_fwd: PreparedStatement,
|
||||
ts_msp_bck: PreparedStatement,
|
||||
lsp_fwd_val: StmtsLspDir,
|
||||
lsp_bck_val: StmtsLspDir,
|
||||
lsp_fwd_ts: StmtsLspDir,
|
||||
lsp_bck_ts: StmtsLspDir,
|
||||
}
|
||||
|
||||
impl StmtsEventsRt {
|
||||
fn lsp(&self, bck: bool, val: bool) -> &StmtsLspDir {
|
||||
if bck {
|
||||
if val {
|
||||
&self.lsp_bck_val
|
||||
} else {
|
||||
&self.lsp_bck_ts
|
||||
}
|
||||
} else {
|
||||
if val {
|
||||
&self.lsp_fwd_val
|
||||
} else {
|
||||
&self.lsp_fwd_ts
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct StmtsEvents {
|
||||
st: StmtsEventsRt,
|
||||
mt: StmtsEventsRt,
|
||||
lt: StmtsEventsRt,
|
||||
}
|
||||
|
||||
async fn make_msp_dir(ks: &str, rt: &RetentionTime, bck: bool, scy: &Session) -> Result<PreparedStatement, Error> {
|
||||
let table_name = "ts_msp";
|
||||
let select_cond = if bck {
|
||||
"ts_msp < ? order by ts_msp desc limit 2"
|
||||
} else {
|
||||
"ts_msp >= ? and ts_msp < ?"
|
||||
};
|
||||
let cql = format!(
|
||||
"select ts_msp from {}.{}{} where series = ? and {}",
|
||||
ks,
|
||||
rt.table_prefix(),
|
||||
table_name,
|
||||
select_cond
|
||||
);
|
||||
let qu = scy.prepare(cql).await?;
|
||||
Ok(qu)
|
||||
}
|
||||
|
||||
async fn make_lsp(
|
||||
ks: &str,
|
||||
rt: &RetentionTime,
|
||||
shapepre: &str,
|
||||
stname: &str,
|
||||
values: &str,
|
||||
bck: bool,
|
||||
scy: &Session,
|
||||
) -> Result<PreparedStatement, Error> {
|
||||
let select_cond = if bck {
|
||||
"ts_lsp < ? order by ts_lsp desc limit 1"
|
||||
} else {
|
||||
"ts_lsp >= ? and ts_lsp < ?"
|
||||
};
|
||||
let cql = format!(
|
||||
concat!(
|
||||
"select {} from {}.{}events_{}_{}",
|
||||
" where series = ? and ts_msp = ? and {}"
|
||||
),
|
||||
values,
|
||||
ks,
|
||||
rt.table_prefix(),
|
||||
shapepre,
|
||||
stname,
|
||||
select_cond
|
||||
);
|
||||
let qu = scy.prepare(cql).await?;
|
||||
Ok(qu)
|
||||
}
|
||||
|
||||
async fn make_lsp_shape(
|
||||
ks: &str,
|
||||
rt: &RetentionTime,
|
||||
shapepre: &str,
|
||||
values: &str,
|
||||
bck: bool,
|
||||
scy: &Session,
|
||||
) -> Result<StmtsLspShape, Error> {
|
||||
let values = if shapepre.contains("array") {
|
||||
values.replace("value", "valueblob")
|
||||
} else {
|
||||
values.into()
|
||||
};
|
||||
let values = &values;
|
||||
let maker = |stname| make_lsp(ks, rt, shapepre, stname, values, bck, scy);
|
||||
let ret = StmtsLspShape {
|
||||
u8: maker("u8").await?,
|
||||
u16: maker("u16").await?,
|
||||
u32: maker("u32").await?,
|
||||
u64: maker("u64").await?,
|
||||
i8: maker("i8").await?,
|
||||
i16: maker("i16").await?,
|
||||
i32: maker("i32").await?,
|
||||
i64: maker("i64").await?,
|
||||
f32: maker("f32").await?,
|
||||
f64: maker("f64").await?,
|
||||
bool: maker("bool").await?,
|
||||
string: maker("string").await?,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
async fn make_lsp_dir(
|
||||
ks: &str,
|
||||
rt: &RetentionTime,
|
||||
values: &str,
|
||||
bck: bool,
|
||||
scy: &Session,
|
||||
) -> Result<StmtsLspDir, Error> {
|
||||
let ret = StmtsLspDir {
|
||||
scalar: make_lsp_shape(ks, rt, "scalar", values, bck, scy).await?,
|
||||
array: make_lsp_shape(ks, rt, "array", values, bck, scy).await?,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
async fn make_rt(ks: &str, rt: &RetentionTime, scy: &Session) -> Result<StmtsEventsRt, Error> {
|
||||
let ret = StmtsEventsRt {
|
||||
ts_msp_fwd: make_msp_dir(ks, rt, false, scy).await?,
|
||||
ts_msp_bck: make_msp_dir(ks, rt, true, scy).await?,
|
||||
lsp_fwd_val: make_lsp_dir(ks, rt, "ts_lsp, pulse, value", false, scy).await?,
|
||||
lsp_bck_val: make_lsp_dir(ks, rt, "ts_lsp, pulse, value", true, scy).await?,
|
||||
lsp_fwd_ts: make_lsp_dir(ks, rt, "ts_lsp, pulse", false, scy).await?,
|
||||
lsp_bck_ts: make_lsp_dir(ks, rt, "ts_lsp, pulse", true, scy).await?,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
impl StmtsEvents {
|
||||
pub(super) async fn new(ks: [&str; 3], scy: &Session) -> Result<Self, Error> {
|
||||
let ret = StmtsEvents {
|
||||
st: make_rt(ks[0], &RetentionTime::Short, scy).await?,
|
||||
mt: make_rt(ks[1], &RetentionTime::Medium, scy).await?,
|
||||
lt: make_rt(ks[2], &RetentionTime::Long, scy).await?,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
fn rt(&self, rt: &RetentionTime) -> &StmtsEventsRt {
|
||||
match rt {
|
||||
RetentionTime::Short => &self.st,
|
||||
RetentionTime::Medium => &self.mt,
|
||||
RetentionTime::Long => &&self.lt,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn find_ts_msp(
|
||||
rt: &RetentionTime,
|
||||
series: u64,
|
||||
range: ScyllaSeriesRange,
|
||||
bck: bool,
|
||||
stmts: &StmtsEvents,
|
||||
scy: &ScySession,
|
||||
) -> Result<VecDeque<TsMs>, Error> {
|
||||
trace!("find_ts_msp series {:?} {:?} {:?} bck {}", rt, series, range, bck);
|
||||
if bck {
|
||||
find_ts_msp_bck(rt, series, range, stmts, scy).await
|
||||
} else {
|
||||
find_ts_msp_fwd(rt, series, range, stmts, scy).await
|
||||
}
|
||||
}
|
||||
|
||||
async fn find_ts_msp_fwd(
|
||||
rt: &RetentionTime,
|
||||
series: u64,
|
||||
range: ScyllaSeriesRange,
|
||||
stmts: &StmtsEvents,
|
||||
scy: &ScySession,
|
||||
) -> Result<VecDeque<TsMs>, Error> {
|
||||
let mut ret = VecDeque::new();
|
||||
// TODO time range truncation can be handled better
|
||||
let params = (series as i64, range.beg().ms() as i64, 1 + range.end().ms() as i64);
|
||||
let mut res = scy
|
||||
.execute_iter(stmts.rt(rt).ts_msp_fwd.clone(), params)
|
||||
.await?
|
||||
.into_typed::<(i64,)>();
|
||||
while let Some(x) = res.next().await {
|
||||
let row = x?;
|
||||
let ts = TsMs::from_ms_u64(row.0 as u64);
|
||||
ret.push_back(ts);
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
async fn find_ts_msp_bck(
|
||||
rt: &RetentionTime,
|
||||
series: u64,
|
||||
range: ScyllaSeriesRange,
|
||||
stmts: &StmtsEvents,
|
||||
scy: &ScySession,
|
||||
) -> Result<VecDeque<TsMs>, Error> {
|
||||
let mut ret = VecDeque::new();
|
||||
let params = (series as i64, range.beg().ms() as i64);
|
||||
let mut res = scy
|
||||
.execute_iter(stmts.rt(rt).ts_msp_bck.clone(), params)
|
||||
.await?
|
||||
.into_typed::<(i64,)>();
|
||||
while let Some(x) = res.next().await {
|
||||
let row = x?;
|
||||
let ts = TsMs::from_ms_u64(row.0 as u64);
|
||||
ret.push_front(ts);
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
trait ValTy: Sized + 'static {
|
||||
pub(super) trait ValTy: Sized + 'static {
|
||||
type ScaTy: ScalarOps + std::default::Default;
|
||||
type ScyTy: scylla::cql_to_rust::FromCqlVal<scylla::frame::response::result::CqlValue>;
|
||||
type Container: Events + Appendable<Self>;
|
||||
@@ -449,7 +188,7 @@ impl_scaty_array!(Vec<f32>, f32, Vec<f32>, "f32", "f32");
|
||||
impl_scaty_array!(Vec<f64>, f64, Vec<f64>, "f64", "f64");
|
||||
impl_scaty_array!(Vec<bool>, bool, Vec<bool>, "bool", "bool");
|
||||
|
||||
struct ReadNextValuesOpts {
|
||||
pub(super) struct ReadNextValuesOpts {
|
||||
rt: RetentionTime,
|
||||
series: u64,
|
||||
ts_msp: TsMs,
|
||||
@@ -459,13 +198,35 @@ struct ReadNextValuesOpts {
|
||||
scyqueue: ScyllaQueue,
|
||||
}
|
||||
|
||||
async fn read_next_values<ST>(opts: ReadNextValuesOpts) -> Result<Box<dyn Events>, Error>
|
||||
impl ReadNextValuesOpts {
|
||||
pub(super) fn new(
|
||||
rt: RetentionTime,
|
||||
series: SeriesId,
|
||||
ts_msp: TsMs,
|
||||
range: ScyllaSeriesRange,
|
||||
fwd: bool,
|
||||
with_values: bool,
|
||||
scyqueue: ScyllaQueue,
|
||||
) -> Self {
|
||||
Self {
|
||||
rt,
|
||||
series: series.id(),
|
||||
ts_msp,
|
||||
range,
|
||||
fwd,
|
||||
with_values,
|
||||
scyqueue,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn read_next_values<ST>(opts: ReadNextValuesOpts) -> Result<Box<dyn Events>, Error>
|
||||
where
|
||||
ST: ValTy,
|
||||
{
|
||||
// TODO could take scyqeue out of opts struct.
|
||||
let scyqueue = opts.scyqueue.clone();
|
||||
let futgen = Box::new(|scy: Arc<ScySession>, stmts: Arc<StmtsEvents>| {
|
||||
let futgen = Box::new(|scy: Arc<Session>, stmts: Arc<StmtsEvents>| {
|
||||
let fut = async {
|
||||
read_next_values_2::<ST>(opts, scy, stmts)
|
||||
.await
|
||||
@@ -479,7 +240,7 @@ where
|
||||
|
||||
async fn read_next_values_2<ST>(
|
||||
opts: ReadNextValuesOpts,
|
||||
scy: Arc<ScySession>,
|
||||
scy: Arc<Session>,
|
||||
stmts: Arc<StmtsEvents>,
|
||||
) -> Result<Box<dyn Events>, Error>
|
||||
where
|
||||
@@ -511,20 +272,6 @@ where
|
||||
ts_lsp_max,
|
||||
table_name,
|
||||
);
|
||||
let dir = "fwd";
|
||||
let qu_name = if opts.with_values {
|
||||
if ST::is_valueblob() {
|
||||
format!("array_{}_valueblobs_{}", ST::st_name(), dir)
|
||||
} else {
|
||||
format!("scalar_{}_values_{}", ST::st_name(), dir)
|
||||
}
|
||||
} else {
|
||||
if ST::is_valueblob() {
|
||||
format!("array_{}_timestamps_{}", ST::st_name(), dir)
|
||||
} else {
|
||||
format!("scalar_{}_timestamps_{}", ST::st_name(), dir)
|
||||
}
|
||||
};
|
||||
let qu = stmts
|
||||
.rt(&opts.rt)
|
||||
.lsp(!opts.fwd, opts.with_values)
|
||||
@@ -552,20 +299,6 @@ where
|
||||
DtNano::from_ns(0)
|
||||
};
|
||||
trace!("BCK ts_msp {} ts_lsp_max {} {}", ts_msp, ts_lsp_max, table_name,);
|
||||
let dir = "bck";
|
||||
let qu_name = if opts.with_values {
|
||||
if ST::is_valueblob() {
|
||||
format!("array_{}_valueblobs_{}", ST::st_name(), dir)
|
||||
} else {
|
||||
format!("scalar_{}_values_{}", ST::st_name(), dir)
|
||||
}
|
||||
} else {
|
||||
if ST::is_valueblob() {
|
||||
format!("array_{}_timestamps_{}", ST::st_name(), dir)
|
||||
} else {
|
||||
format!("scalar_{}_timestamps_{}", ST::st_name(), dir)
|
||||
}
|
||||
};
|
||||
let qu = stmts
|
||||
.rt(&opts.rt)
|
||||
.lsp(!opts.fwd, opts.with_values)
|
||||
@@ -648,7 +381,7 @@ fn convert_rows<ST: ValTy>(
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
struct ReadValues {
|
||||
pub(super) struct ReadValues {
|
||||
rt: RetentionTime,
|
||||
series: u64,
|
||||
scalar_type: ScalarType,
|
||||
@@ -663,7 +396,7 @@ struct ReadValues {
|
||||
}
|
||||
|
||||
impl ReadValues {
|
||||
fn new(
|
||||
pub(super) fn new(
|
||||
rt: RetentionTime,
|
||||
series: u64,
|
||||
scalar_type: ScalarType,
|
||||
@@ -795,7 +528,7 @@ pub struct EventsStreamScylla {
|
||||
}
|
||||
|
||||
impl EventsStreamScylla {
|
||||
pub fn new(
|
||||
pub fn _new(
|
||||
rt: RetentionTime,
|
||||
series: u64,
|
||||
range: ScyllaSeriesRange,
|
||||
@@ -956,16 +689,6 @@ impl EventsStreamScylla {
|
||||
}
|
||||
}
|
||||
|
||||
async fn find_ts_msp_via_queue(
|
||||
rt: RetentionTime,
|
||||
series: u64,
|
||||
range: ScyllaSeriesRange,
|
||||
bck: bool,
|
||||
scyqueue: ScyllaQueue,
|
||||
) -> Result<VecDeque<TsMs>, crate::worker::Error> {
|
||||
scyqueue.find_ts_msp(rt, series, range, bck).await
|
||||
}
|
||||
|
||||
impl Stream for EventsStreamScylla {
|
||||
type Item = Result<ChannelEvents, Error>;
|
||||
|
||||
@@ -998,8 +721,9 @@ impl Stream for EventsStreamScylla {
|
||||
let series = self.series.clone();
|
||||
let range = self.range.clone();
|
||||
// TODO this no longer works, we miss the backwards part here
|
||||
let fut = find_ts_msp_via_queue(self.rt.clone(), series, range, false, self.scyqueue.clone());
|
||||
let fut = Box::pin(fut);
|
||||
// let fut = find_ts_msp_via_queue(self.rt.clone(), series, range, false, self.scyqueue.clone());
|
||||
// let fut = Box::pin(fut);
|
||||
let fut = todo!();
|
||||
self.state = FrState::FindMsp(fut);
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -1 +1,6 @@
|
||||
pub mod events;
|
||||
pub mod firstbefore;
|
||||
pub mod mergert;
|
||||
pub mod msp;
|
||||
pub mod nonempty;
|
||||
pub mod prepare;
|
||||
|
||||
360
crates/scyllaconn/src/events2/events.rs
Normal file
360
crates/scyllaconn/src/events2/events.rs
Normal file
@@ -0,0 +1,360 @@
|
||||
use super::msp::MspStreamRt;
|
||||
use crate::events::read_next_values;
|
||||
use crate::events::ReadNextValuesOpts;
|
||||
use crate::range::ScyllaSeriesRange;
|
||||
use crate::worker::ScyllaQueue;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::Future;
|
||||
use futures_util::FutureExt;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::Events;
|
||||
use items_2::channelevents::ChannelEvents;
|
||||
use netpod::log::*;
|
||||
use netpod::ttl::RetentionTime;
|
||||
use netpod::ScalarType;
|
||||
use netpod::Shape;
|
||||
use netpod::TsMs;
|
||||
use series::SeriesId;
|
||||
use std::collections::VecDeque;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_emit {
|
||||
($($arg:tt)*) => {
|
||||
if true {
|
||||
trace!($($arg)*);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! warn_item {
|
||||
($($arg:tt)*) => {
|
||||
if true {
|
||||
debug!($($arg)*);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
pub enum Error {
|
||||
Worker(#[from] crate::worker::Error),
|
||||
Events(#[from] crate::events::Error),
|
||||
Msp(#[from] crate::events2::msp::Error),
|
||||
Unordered,
|
||||
OutOfRange,
|
||||
BadBatch,
|
||||
Logic,
|
||||
Merge(#[from] items_0::MergeError),
|
||||
TruncateLogic,
|
||||
}
|
||||
|
||||
struct FetchMsp {
|
||||
fut: Pin<Box<dyn Future<Output = Option<Result<TsMs, crate::events2::msp::Error>>> + Send>>,
|
||||
}
|
||||
|
||||
struct FetchEvents {
|
||||
fut: Pin<Box<dyn Future<Output = Result<Box<dyn Events>, crate::events2::events::Error>> + Send>>,
|
||||
}
|
||||
|
||||
enum ReadingState {
|
||||
FetchMsp(FetchMsp),
|
||||
FetchEvents(FetchEvents),
|
||||
}
|
||||
|
||||
struct Reading {
|
||||
scyqueue: ScyllaQueue,
|
||||
reading_state: ReadingState,
|
||||
}
|
||||
|
||||
enum State {
|
||||
Begin,
|
||||
Reading(Reading),
|
||||
InputDone,
|
||||
Done,
|
||||
}
|
||||
|
||||
pub struct EventsStreamRt {
|
||||
rt: RetentionTime,
|
||||
series: SeriesId,
|
||||
scalar_type: ScalarType,
|
||||
shape: Shape,
|
||||
range: ScyllaSeriesRange,
|
||||
with_values: bool,
|
||||
state: State,
|
||||
scyqueue: ScyllaQueue,
|
||||
msp_inp: MspStreamRt,
|
||||
out: VecDeque<Box<dyn Events>>,
|
||||
ts_seen_max: u64,
|
||||
}
|
||||
|
||||
impl EventsStreamRt {
|
||||
pub fn new(
|
||||
rt: RetentionTime,
|
||||
series: SeriesId,
|
||||
scalar_type: ScalarType,
|
||||
shape: Shape,
|
||||
range: ScyllaSeriesRange,
|
||||
with_values: bool,
|
||||
scyqueue: ScyllaQueue,
|
||||
) -> Self {
|
||||
debug!("EventsStreamRt::new {series:?} {range:?} {rt:?}");
|
||||
let msp_inp =
|
||||
crate::events2::msp::MspStreamRt::new(rt.clone(), series.clone(), range.clone(), scyqueue.clone());
|
||||
Self {
|
||||
rt,
|
||||
series,
|
||||
scalar_type,
|
||||
shape,
|
||||
range,
|
||||
with_values,
|
||||
state: State::Begin,
|
||||
scyqueue,
|
||||
msp_inp,
|
||||
out: VecDeque::new(),
|
||||
ts_seen_max: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn __handle_reading(self: Pin<&mut Self>, st: &mut Reading, cx: &mut Context) -> Result<(), Error> {
|
||||
let _ = st;
|
||||
let _ = cx;
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn make_read_events_fut(
|
||||
&mut self,
|
||||
ts_msp: TsMs,
|
||||
scyqueue: ScyllaQueue,
|
||||
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>> {
|
||||
let fwd = true;
|
||||
let opts = ReadNextValuesOpts::new(
|
||||
self.rt.clone(),
|
||||
self.series.clone(),
|
||||
ts_msp,
|
||||
self.range.clone(),
|
||||
fwd,
|
||||
self.with_values,
|
||||
scyqueue,
|
||||
);
|
||||
let scalar_type = self.scalar_type.clone();
|
||||
let shape = self.shape.clone();
|
||||
let fut = async move {
|
||||
let ret = match &shape {
|
||||
Shape::Scalar => match &scalar_type {
|
||||
ScalarType::U8 => read_next_values::<u8>(opts).await,
|
||||
ScalarType::U16 => read_next_values::<u16>(opts).await,
|
||||
ScalarType::U32 => read_next_values::<u32>(opts).await,
|
||||
ScalarType::U64 => read_next_values::<u64>(opts).await,
|
||||
ScalarType::I8 => read_next_values::<i8>(opts).await,
|
||||
ScalarType::I16 => read_next_values::<i16>(opts).await,
|
||||
ScalarType::I32 => read_next_values::<i32>(opts).await,
|
||||
ScalarType::I64 => read_next_values::<i64>(opts).await,
|
||||
ScalarType::F32 => read_next_values::<f32>(opts).await,
|
||||
ScalarType::F64 => read_next_values::<f64>(opts).await,
|
||||
ScalarType::BOOL => read_next_values::<bool>(opts).await,
|
||||
ScalarType::STRING => read_next_values::<String>(opts).await,
|
||||
ScalarType::Enum => read_next_values::<String>(opts).await,
|
||||
ScalarType::ChannelStatus => {
|
||||
warn!("read scalar channel status not yet supported");
|
||||
err::todoval()
|
||||
}
|
||||
},
|
||||
Shape::Wave(_) => match &scalar_type {
|
||||
ScalarType::U8 => read_next_values::<Vec<u8>>(opts).await,
|
||||
ScalarType::U16 => read_next_values::<Vec<u16>>(opts).await,
|
||||
ScalarType::U32 => read_next_values::<Vec<u32>>(opts).await,
|
||||
ScalarType::U64 => read_next_values::<Vec<u64>>(opts).await,
|
||||
ScalarType::I8 => read_next_values::<Vec<i8>>(opts).await,
|
||||
ScalarType::I16 => read_next_values::<Vec<i16>>(opts).await,
|
||||
ScalarType::I32 => read_next_values::<Vec<i32>>(opts).await,
|
||||
ScalarType::I64 => read_next_values::<Vec<i64>>(opts).await,
|
||||
ScalarType::F32 => read_next_values::<Vec<f32>>(opts).await,
|
||||
ScalarType::F64 => read_next_values::<Vec<f64>>(opts).await,
|
||||
ScalarType::BOOL => read_next_values::<Vec<bool>>(opts).await,
|
||||
ScalarType::STRING => {
|
||||
warn!("read array string not yet supported");
|
||||
err::todoval()
|
||||
}
|
||||
ScalarType::Enum => read_next_values::<Vec<String>>(opts).await,
|
||||
ScalarType::ChannelStatus => {
|
||||
warn!("read array channel status not yet supported");
|
||||
err::todoval()
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
error!("TODO ReadValues add more types");
|
||||
err::todoval()
|
||||
}
|
||||
};
|
||||
ret.map_err(Error::from)
|
||||
};
|
||||
Box::pin(fut)
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for EventsStreamRt {
|
||||
type Item = Result<ChannelEvents, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
loop {
|
||||
if let Some(mut item) = self.out.pop_front() {
|
||||
if !item.verify() {
|
||||
warn_item!("{}bad item {:?}", "\n\n--------------------------\n", item);
|
||||
self.state = State::Done;
|
||||
break Ready(Some(Err(Error::BadBatch)));
|
||||
}
|
||||
if let Some(item_min) = item.ts_min() {
|
||||
if item_min < self.range.beg().ns() {
|
||||
warn_item!(
|
||||
"{}out of range error A {} {:?}",
|
||||
"\n\n--------------------------\n",
|
||||
item_min,
|
||||
self.range
|
||||
);
|
||||
self.state = State::Done;
|
||||
break Ready(Some(Err(Error::OutOfRange)));
|
||||
}
|
||||
if item_min < self.ts_seen_max {
|
||||
warn_item!(
|
||||
"{}ordering error A {} {}",
|
||||
"\n\n--------------------------\n",
|
||||
item_min,
|
||||
self.ts_seen_max
|
||||
);
|
||||
let mut r = items_2::merger::Mergeable::new_empty(&item);
|
||||
match items_2::merger::Mergeable::find_highest_index_lt(&item, self.ts_seen_max) {
|
||||
Some(ix) => {
|
||||
match items_2::merger::Mergeable::drain_into(&mut item, &mut r, (0, ix)) {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
self.state = State::Done;
|
||||
break Ready(Some(Err(e.into())));
|
||||
}
|
||||
}
|
||||
// self.state = State::Done;
|
||||
// break Ready(Some(Err(Error::Unordered)));
|
||||
}
|
||||
None => {
|
||||
self.state = State::Done;
|
||||
break Ready(Some(Err(Error::TruncateLogic)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(item_max) = item.ts_max() {
|
||||
if item_max >= self.range.end().ns() {
|
||||
warn_item!(
|
||||
"{}out of range error B {} {:?}",
|
||||
"\n\n--------------------------\n",
|
||||
item_max,
|
||||
self.range
|
||||
);
|
||||
self.state = State::Done;
|
||||
break Ready(Some(Err(Error::OutOfRange)));
|
||||
}
|
||||
if item_max < self.ts_seen_max {
|
||||
warn_item!(
|
||||
"{}ordering error B {} {}",
|
||||
"\n\n--------------------------\n",
|
||||
item_max,
|
||||
self.ts_seen_max
|
||||
);
|
||||
self.state = State::Done;
|
||||
break Ready(Some(Err(Error::Unordered)));
|
||||
} else {
|
||||
self.ts_seen_max = item_max;
|
||||
}
|
||||
}
|
||||
trace_emit!("deliver item {}", item.output_info());
|
||||
break Ready(Some(Ok(ChannelEvents::Events(item))));
|
||||
}
|
||||
break match &mut self.state {
|
||||
State::Begin => {
|
||||
let msp_inp = unsafe {
|
||||
let ptr = (&mut self.msp_inp) as *mut MspStreamRt;
|
||||
&mut *ptr
|
||||
};
|
||||
let fut = Box::pin(msp_inp.next());
|
||||
self.state = State::Reading(Reading {
|
||||
scyqueue: self.scyqueue.clone(),
|
||||
reading_state: ReadingState::FetchMsp(FetchMsp { fut }),
|
||||
});
|
||||
continue;
|
||||
}
|
||||
State::Reading(st) => match &mut st.reading_state {
|
||||
ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) {
|
||||
Ready(Some(Ok(ts))) => {
|
||||
let scyqueue = st.scyqueue.clone();
|
||||
let fut = self.make_read_events_fut(ts, scyqueue);
|
||||
if let State::Reading(st) = &mut self.state {
|
||||
st.reading_state = ReadingState::FetchEvents(FetchEvents { fut });
|
||||
continue;
|
||||
} else {
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(Error::Logic)))
|
||||
}
|
||||
}
|
||||
Ready(Some(Err(e))) => Ready(Some(Err(e.into()))),
|
||||
Ready(None) => {
|
||||
self.state = State::InputDone;
|
||||
continue;
|
||||
}
|
||||
Pending => Pending,
|
||||
},
|
||||
ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) {
|
||||
Ready(Ok(x)) => {
|
||||
self.out.push_back(x);
|
||||
let msp_inp = unsafe {
|
||||
let ptr = (&mut self.msp_inp) as *mut MspStreamRt;
|
||||
&mut *ptr
|
||||
};
|
||||
let fut = Box::pin(msp_inp.next());
|
||||
if let State::Reading(st) = &mut self.state {
|
||||
st.reading_state = ReadingState::FetchMsp(FetchMsp { fut });
|
||||
continue;
|
||||
} else {
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(Error::Logic)))
|
||||
}
|
||||
}
|
||||
Ready(Err(e)) => {
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(e.into())))
|
||||
}
|
||||
Pending => Pending,
|
||||
},
|
||||
},
|
||||
State::InputDone => {
|
||||
if self.out.len() == 0 {
|
||||
Ready(None)
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
State::Done => Ready(None),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn trait_assert<T>(_: T)
|
||||
where
|
||||
T: Stream + Unpin + Send,
|
||||
{
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
fn trait_assert_try() {
|
||||
let x: EventsStreamRt = phantomval();
|
||||
trait_assert(x);
|
||||
}
|
||||
|
||||
fn phantomval<T>() -> T {
|
||||
panic!()
|
||||
}
|
||||
179
crates/scyllaconn/src/events2/firstbefore.rs
Normal file
179
crates/scyllaconn/src/events2/firstbefore.rs
Normal file
@@ -0,0 +1,179 @@
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::Events;
|
||||
use items_2::merger::Mergeable;
|
||||
use netpod::log::*;
|
||||
use netpod::TsNano;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
pub enum Error {
|
||||
Unordered,
|
||||
Logic,
|
||||
Input(Box<dyn std::error::Error + Send>),
|
||||
}
|
||||
|
||||
pub enum Output<T> {
|
||||
First(T, T),
|
||||
Bulk(T),
|
||||
}
|
||||
|
||||
enum State {
|
||||
Begin,
|
||||
Bulk,
|
||||
Done,
|
||||
}
|
||||
|
||||
pub struct FirstBeforeAndInside<S, T>
|
||||
where
|
||||
S: Stream + Unpin,
|
||||
T: Events + Mergeable + Unpin,
|
||||
{
|
||||
ts0: TsNano,
|
||||
inp: S,
|
||||
state: State,
|
||||
buf: Option<T>,
|
||||
}
|
||||
|
||||
impl<S, T> FirstBeforeAndInside<S, T>
|
||||
where
|
||||
S: Stream + Unpin,
|
||||
T: Events + Mergeable + Unpin,
|
||||
{
|
||||
pub fn new(inp: S, ts0: TsNano) -> Self {
|
||||
Self {
|
||||
ts0,
|
||||
inp,
|
||||
state: State::Begin,
|
||||
buf: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, T, E> Stream for FirstBeforeAndInside<S, T>
|
||||
where
|
||||
S: Stream<Item = Result<T, E>> + Unpin,
|
||||
T: Events + Mergeable + Unpin,
|
||||
E: std::error::Error + Send + 'static,
|
||||
{
|
||||
type Item = Result<Output<T>, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
loop {
|
||||
break match &self.state {
|
||||
State::Begin => match self.inp.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok(mut item))) => {
|
||||
// It is an invariant that we process ordered streams, but for robustness
|
||||
// verify the batch item again:
|
||||
if item.verify() != true {
|
||||
self.state = State::Done;
|
||||
let e = Error::Unordered;
|
||||
Ready(Some(Err(e)))
|
||||
} else {
|
||||
// Separate events into before and bulk
|
||||
let tss = item.tss();
|
||||
let pp = tss.partition_point(|&x| x < self.ts0.ns());
|
||||
if pp >= tss.len() {
|
||||
// all entries are before
|
||||
if self.buf.is_none() {
|
||||
self.buf = Some(item.new_empty());
|
||||
}
|
||||
match item.drain_into_evs(self.buf.as_mut().unwrap(), (0, item.len())) {
|
||||
Ok(()) => {
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(Error::Input(Box::new(e)))))
|
||||
}
|
||||
}
|
||||
} else if pp == 0 {
|
||||
// all entries are bulk
|
||||
debug!("transition immediately to bulk");
|
||||
self.state = State::Bulk;
|
||||
let o1 = core::mem::replace(&mut self.buf, Some(item.new_empty()))
|
||||
.unwrap_or_else(|| item.new_empty());
|
||||
Ready(Some(Ok(Output::First(o1, item))))
|
||||
} else {
|
||||
// mixed
|
||||
match item.drain_into_evs(self.buf.as_mut().unwrap(), (0, pp)) {
|
||||
Ok(()) => {
|
||||
debug!("transition with mixed to bulk");
|
||||
self.state = State::Bulk;
|
||||
let o1 = core::mem::replace(&mut self.buf, Some(item.new_empty()))
|
||||
.unwrap_or_else(|| item.new_empty());
|
||||
Ready(Some(Ok(Output::First(o1, item))))
|
||||
}
|
||||
Err(e) => {
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(Error::Input(Box::new(e)))))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ready(Some(Err(e))) => {
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(Error::Input(Box::new(e)))))
|
||||
}
|
||||
Ready(None) => {
|
||||
self.state = State::Done;
|
||||
Ready(None)
|
||||
}
|
||||
Pending => Pending,
|
||||
},
|
||||
State::Bulk => {
|
||||
if self.buf.as_ref().map_or(0, |x| x.len()) != 0 {
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(Error::Logic)))
|
||||
} else {
|
||||
match self.inp.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok(item))) => {
|
||||
if item.verify() != true {
|
||||
self.state = State::Done;
|
||||
let e = Error::Unordered;
|
||||
Ready(Some(Err(e)))
|
||||
} else {
|
||||
debug!("output bulk item len {}", item.len());
|
||||
Ready(Some(Ok(Output::Bulk(item))))
|
||||
}
|
||||
}
|
||||
Ready(Some(Err(e))) => {
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(Error::Input(Box::new(e)))))
|
||||
}
|
||||
Ready(None) => {
|
||||
debug!("in bulk, input done");
|
||||
self.state = State::Done;
|
||||
Ready(None)
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
State::Done => Ready(None),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn trait_assert<T>(_: T)
|
||||
where
|
||||
T: Stream + Unpin + Send,
|
||||
{
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
fn trait_assert_try() {
|
||||
let x: FirstBeforeAndInside<super::events::EventsStreamRt, items_2::channelevents::ChannelEvents> = phantomval();
|
||||
trait_assert(x);
|
||||
}
|
||||
|
||||
fn phantomval<T>() -> T {
|
||||
panic!()
|
||||
}
|
||||
559
crates/scyllaconn/src/events2/mergert.rs
Normal file
559
crates/scyllaconn/src/events2/mergert.rs
Normal file
@@ -0,0 +1,559 @@
|
||||
use super::events::EventsStreamRt;
|
||||
use super::firstbefore::FirstBeforeAndInside;
|
||||
use crate::events2::firstbefore;
|
||||
use crate::range::ScyllaSeriesRange;
|
||||
use crate::worker::ScyllaQueue;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::Future;
|
||||
use futures_util::FutureExt;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::WithLen;
|
||||
use items_2::channelevents::ChannelEvents;
|
||||
use items_2::merger::Mergeable;
|
||||
use netpod::log::*;
|
||||
use netpod::range::evrange::NanoRange;
|
||||
use netpod::range::evrange::SeriesRange;
|
||||
use netpod::ttl::RetentionTime;
|
||||
use netpod::ScalarType;
|
||||
use netpod::Shape;
|
||||
use series::SeriesId;
|
||||
use std::collections::VecDeque;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
pub enum Error {
|
||||
Input(#[from] crate::events2::firstbefore::Error),
|
||||
Events(#[from] crate::events2::events::Error),
|
||||
Logic,
|
||||
}
|
||||
|
||||
enum Resolvable<F>
|
||||
where
|
||||
F: Future,
|
||||
{
|
||||
Future(F),
|
||||
Output(<F as Future>::Output),
|
||||
Taken,
|
||||
}
|
||||
|
||||
impl<F> Resolvable<F>
|
||||
where
|
||||
F: Future,
|
||||
{
|
||||
fn unresolved(&self) -> bool {
|
||||
match self {
|
||||
Resolvable::Future(_) => true,
|
||||
Resolvable::Output(_) => false,
|
||||
Resolvable::Taken => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn take(&mut self) -> Option<<F as Future>::Output> {
|
||||
let x = std::mem::replace(self, Resolvable::Taken);
|
||||
match x {
|
||||
Resolvable::Future(_) => None,
|
||||
Resolvable::Output(x) => Some(x),
|
||||
Resolvable::Taken => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> Future for Resolvable<F>
|
||||
where
|
||||
F: Future + Unpin,
|
||||
{
|
||||
type Output = <F as Future>::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<<F as Future>::Output> {
|
||||
match unsafe { self.get_unchecked_mut() } {
|
||||
Resolvable::Future(fut) => fut.poll_unpin(cx),
|
||||
Resolvable::Output(_) => panic!(),
|
||||
Resolvable::Taken => panic!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type TI = FirstBeforeAndInside<EventsStreamRt, ChannelEvents>;
|
||||
type INPI = Result<crate::events2::firstbefore::Output<ChannelEvents>, crate::events2::firstbefore::Error>;
|
||||
|
||||
struct ReadEvents {
|
||||
fut: Pin<Box<dyn Future<Output = Option<INPI>> + Send>>,
|
||||
}
|
||||
|
||||
enum State {
|
||||
Begin,
|
||||
FetchFirstSt(ReadEvents),
|
||||
FetchFirstMt(ReadEvents),
|
||||
FetchFirstLt(ReadEvents),
|
||||
ReadingLt(Option<ReadEvents>, VecDeque<ChannelEvents>, Option<Box<TI>>),
|
||||
ReadingMt(Option<ReadEvents>, VecDeque<ChannelEvents>, Option<Box<TI>>),
|
||||
ReadingSt(Option<ReadEvents>, VecDeque<ChannelEvents>, Option<Box<TI>>),
|
||||
Done,
|
||||
}
|
||||
|
||||
pub struct MergeRts {
|
||||
series: SeriesId,
|
||||
scalar_type: ScalarType,
|
||||
shape: Shape,
|
||||
range: ScyllaSeriesRange,
|
||||
range_mt: ScyllaSeriesRange,
|
||||
range_lt: ScyllaSeriesRange,
|
||||
with_values: bool,
|
||||
scyqueue: ScyllaQueue,
|
||||
inp_st: Option<Box<TI>>,
|
||||
inp_mt: Option<Box<TI>>,
|
||||
inp_lt: Option<Box<TI>>,
|
||||
state: State,
|
||||
buf_st: VecDeque<ChannelEvents>,
|
||||
buf_mt: VecDeque<ChannelEvents>,
|
||||
buf_lt: VecDeque<ChannelEvents>,
|
||||
out: VecDeque<ChannelEvents>,
|
||||
buf_before: Option<ChannelEvents>,
|
||||
ts_seen_max: u64,
|
||||
}
|
||||
|
||||
impl MergeRts {
|
||||
pub fn new(
|
||||
series: SeriesId,
|
||||
scalar_type: ScalarType,
|
||||
shape: Shape,
|
||||
range: ScyllaSeriesRange,
|
||||
with_values: bool,
|
||||
scyqueue: ScyllaQueue,
|
||||
) -> Self {
|
||||
Self {
|
||||
series,
|
||||
scalar_type,
|
||||
shape,
|
||||
range_mt: range.clone(),
|
||||
range_lt: range.clone(),
|
||||
range,
|
||||
with_values,
|
||||
scyqueue,
|
||||
inp_st: None,
|
||||
inp_mt: None,
|
||||
inp_lt: None,
|
||||
state: State::Begin,
|
||||
buf_st: VecDeque::new(),
|
||||
buf_mt: VecDeque::new(),
|
||||
buf_lt: VecDeque::new(),
|
||||
out: VecDeque::new(),
|
||||
buf_before: None,
|
||||
ts_seen_max: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn setup_first_st(&mut self) {
|
||||
let rt = RetentionTime::Short;
|
||||
let limbuf = &VecDeque::new();
|
||||
let inpdst = &mut self.inp_st;
|
||||
let range = Self::constrained_range(&self.range, limbuf);
|
||||
debug!("setup_first_st constrained beg {}", range.beg().ns());
|
||||
let tsbeg = range.beg();
|
||||
let inp = EventsStreamRt::new(
|
||||
rt,
|
||||
self.series.clone(),
|
||||
self.scalar_type.clone(),
|
||||
self.shape.clone(),
|
||||
range,
|
||||
self.with_values,
|
||||
self.scyqueue.clone(),
|
||||
);
|
||||
let inp = TI::new(inp, tsbeg);
|
||||
*inpdst = Some(Box::new(inp));
|
||||
}
|
||||
|
||||
fn setup_first_mt(&mut self) {
|
||||
let rt = RetentionTime::Medium;
|
||||
let limbuf = &self.buf_st;
|
||||
let inpdst = &mut self.inp_mt;
|
||||
let range = Self::constrained_range(&self.range_mt, limbuf);
|
||||
self.range_lt = range.clone();
|
||||
debug!("setup_first_mt constrained beg {}", range.beg().ns());
|
||||
let tsbeg = range.beg();
|
||||
let inp = EventsStreamRt::new(
|
||||
rt,
|
||||
self.series.clone(),
|
||||
self.scalar_type.clone(),
|
||||
self.shape.clone(),
|
||||
range,
|
||||
self.with_values,
|
||||
self.scyqueue.clone(),
|
||||
);
|
||||
let inp = TI::new(inp, tsbeg);
|
||||
*inpdst = Some(Box::new(inp));
|
||||
}
|
||||
|
||||
fn setup_first_lt(&mut self) {
|
||||
let rt = RetentionTime::Long;
|
||||
let limbuf = &self.buf_mt;
|
||||
let inpdst = &mut self.inp_lt;
|
||||
let range = Self::constrained_range(&self.range_lt, limbuf);
|
||||
debug!("setup_first_lt constrained beg {}", range.beg().ns());
|
||||
let tsbeg = range.beg();
|
||||
let inp = EventsStreamRt::new(
|
||||
rt,
|
||||
self.series.clone(),
|
||||
self.scalar_type.clone(),
|
||||
self.shape.clone(),
|
||||
range,
|
||||
self.with_values,
|
||||
self.scyqueue.clone(),
|
||||
);
|
||||
let inp = TI::new(inp, tsbeg);
|
||||
*inpdst = Some(Box::new(inp));
|
||||
}
|
||||
|
||||
fn setup_read_st(&mut self) -> ReadEvents {
|
||||
let stream = unsafe { &mut *(self.inp_st.as_mut().unwrap().as_mut() as *mut TI) };
|
||||
let fut = Box::pin(stream.next());
|
||||
ReadEvents { fut }
|
||||
}
|
||||
|
||||
fn setup_read_mt(&mut self) -> ReadEvents {
|
||||
let stream = unsafe { &mut *(self.inp_mt.as_mut().unwrap().as_mut() as *mut TI) };
|
||||
let fut = Box::pin(stream.next());
|
||||
ReadEvents { fut }
|
||||
}
|
||||
|
||||
fn setup_read_lt(&mut self) -> ReadEvents {
|
||||
let stream = unsafe { &mut *(self.inp_lt.as_mut().unwrap().as_mut() as *mut TI) };
|
||||
let fut = Box::pin(stream.next());
|
||||
ReadEvents { fut }
|
||||
}
|
||||
|
||||
fn setup_read_any(inp: &mut Option<Box<TI>>) -> ReadEvents {
|
||||
let stream = unsafe { &mut *(inp.as_mut().unwrap().as_mut() as *mut TI) };
|
||||
let fut = Box::pin(stream.next());
|
||||
ReadEvents { fut }
|
||||
}
|
||||
|
||||
fn constrained_range(full: &ScyllaSeriesRange, buf: &VecDeque<ChannelEvents>) -> ScyllaSeriesRange {
|
||||
debug!("constrained_range {:?} {:?}", full, buf.front());
|
||||
if let Some(e) = buf.front() {
|
||||
if let Some(ts) = e.ts_min() {
|
||||
let nrange = NanoRange::from((full.beg().ns(), ts));
|
||||
ScyllaSeriesRange::from(&SeriesRange::from(nrange))
|
||||
} else {
|
||||
debug!("no ts even though should not have empty buffers");
|
||||
full.clone()
|
||||
}
|
||||
} else {
|
||||
full.clone()
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_first_st(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) {
|
||||
Self::move_latest_to_before_buf(&mut before, &mut self.buf_before);
|
||||
self.buf_st.push_back(bulk);
|
||||
self.setup_first_mt();
|
||||
self.state = State::FetchFirstMt(self.setup_read_mt());
|
||||
}
|
||||
|
||||
fn handle_first_mt(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) {
|
||||
Self::move_latest_to_before_buf(&mut before, &mut self.buf_before);
|
||||
self.buf_mt.push_back(bulk);
|
||||
self.setup_first_lt();
|
||||
self.state = State::FetchFirstLt(self.setup_read_lt());
|
||||
}
|
||||
|
||||
fn handle_first_lt(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) {
|
||||
Self::move_latest_to_before_buf(&mut before, &mut self.buf_before);
|
||||
self.buf_lt.push_back(bulk);
|
||||
let buf = core::mem::replace(&mut self.buf_lt, VecDeque::new());
|
||||
self.state = State::ReadingLt(None, buf, self.inp_lt.take());
|
||||
}
|
||||
|
||||
fn move_latest_to_before_buf(before: &mut ChannelEvents, buf: &mut Option<ChannelEvents>) {
|
||||
if buf.is_none() {
|
||||
*buf = Some(before.new_empty());
|
||||
}
|
||||
let buf = buf.as_mut().unwrap();
|
||||
if let Some(tsn) = before.ts_max() {
|
||||
if let Some(tse) = buf.ts_max() {
|
||||
if tsn > tse {
|
||||
let n = before.len();
|
||||
buf.clear();
|
||||
before.drain_into(buf, (n - 1, n)).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for MergeRts {
|
||||
type Item = Result<ChannelEvents, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
let mut out2 = VecDeque::new();
|
||||
loop {
|
||||
while let Some(x) = out2.pop_front() {
|
||||
self.out.push_back(x);
|
||||
}
|
||||
if let Some(item) = self.out.pop_front() {
|
||||
debug!("emit item {} {:?}", items_0::Events::verify(&item), item);
|
||||
if items_0::Events::verify(&item) != true {
|
||||
debug!("{}bad item {:?}", "\n\n--------------------------\n", item);
|
||||
self.state = State::Done;
|
||||
}
|
||||
if let Some(item_min) = item.ts_min() {
|
||||
if item_min < self.ts_seen_max {
|
||||
debug!(
|
||||
"{}ordering error A {} {}",
|
||||
"\n\n--------------------------\n", item_min, self.ts_seen_max
|
||||
);
|
||||
self.state = State::Done;
|
||||
break Ready(Some(Err(Error::Logic)));
|
||||
}
|
||||
}
|
||||
if let Some(item_max) = item.ts_max() {
|
||||
if item_max < self.ts_seen_max {
|
||||
debug!(
|
||||
"{}ordering error B {} {}",
|
||||
"\n\n--------------------------\n", item_max, self.ts_seen_max
|
||||
);
|
||||
self.state = State::Done;
|
||||
break Ready(Some(Err(Error::Logic)));
|
||||
} else {
|
||||
self.ts_seen_max = item_max;
|
||||
}
|
||||
}
|
||||
break Ready(Some(Ok(item)));
|
||||
}
|
||||
break match &mut self.state {
|
||||
State::Begin => {
|
||||
self.setup_first_st();
|
||||
self.state = State::FetchFirstSt(self.setup_read_st());
|
||||
continue;
|
||||
}
|
||||
State::FetchFirstSt(st2) => match st2.fut.poll_unpin(cx) {
|
||||
Ready(Some(Ok(x))) => match x {
|
||||
firstbefore::Output::First(before, bulk) => {
|
||||
debug!("have first from ST");
|
||||
self.handle_first_st(before, bulk);
|
||||
continue;
|
||||
}
|
||||
firstbefore::Output::Bulk(_) => {
|
||||
self.state = State::Done;
|
||||
let e = Error::Logic;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
},
|
||||
Ready(Some(Err(e))) => {
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(e.into())))
|
||||
}
|
||||
Ready(None) => {
|
||||
debug!("no first from ST");
|
||||
self.inp_st = None;
|
||||
self.setup_first_mt();
|
||||
self.state = State::FetchFirstMt(self.setup_read_mt());
|
||||
continue;
|
||||
}
|
||||
Pending => Pending,
|
||||
},
|
||||
State::FetchFirstMt(st2) => match st2.fut.poll_unpin(cx) {
|
||||
Ready(Some(Ok(x))) => match x {
|
||||
firstbefore::Output::First(before, bulk) => {
|
||||
debug!("have first from MT");
|
||||
self.handle_first_mt(before, bulk);
|
||||
continue;
|
||||
}
|
||||
firstbefore::Output::Bulk(_) => {
|
||||
self.state = State::Done;
|
||||
let e = Error::Logic;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
},
|
||||
Ready(Some(Err(e))) => {
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(e.into())))
|
||||
}
|
||||
Ready(None) => {
|
||||
debug!("no first from MT");
|
||||
self.inp_mt = None;
|
||||
self.setup_first_lt();
|
||||
self.state = State::FetchFirstLt(self.setup_read_lt());
|
||||
continue;
|
||||
}
|
||||
Pending => Pending,
|
||||
},
|
||||
State::FetchFirstLt(st2) => match st2.fut.poll_unpin(cx) {
|
||||
Ready(Some(Ok(x))) => match x {
|
||||
firstbefore::Output::First(before, bulk) => {
|
||||
debug!("have first from LT");
|
||||
self.handle_first_lt(before, bulk);
|
||||
continue;
|
||||
}
|
||||
firstbefore::Output::Bulk(_) => {
|
||||
self.state = State::Done;
|
||||
let e = Error::Logic;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
},
|
||||
Ready(Some(Err(e))) => {
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(e.into())))
|
||||
}
|
||||
Ready(None) => {
|
||||
debug!("no first from LT");
|
||||
self.inp_lt = None;
|
||||
let buf = core::mem::replace(&mut self.buf_lt, VecDeque::new());
|
||||
self.state = State::ReadingLt(None, buf, self.inp_lt.take());
|
||||
continue;
|
||||
}
|
||||
Pending => Pending,
|
||||
},
|
||||
State::ReadingLt(fut, buf, inp) => {
|
||||
if let Some(x) = buf.pop_front() {
|
||||
out2.push_back(x);
|
||||
continue;
|
||||
} else if let Some(fut2) = fut {
|
||||
match fut2.fut.poll_unpin(cx) {
|
||||
Ready(Some(Ok(x))) => {
|
||||
*fut = None;
|
||||
match x {
|
||||
firstbefore::Output::Bulk(x) => {
|
||||
buf.push_back(x);
|
||||
continue;
|
||||
}
|
||||
firstbefore::Output::First(_, _) => {
|
||||
self.state = State::Done;
|
||||
let e = Error::Logic;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
}
|
||||
}
|
||||
Ready(Some(Err(e))) => {
|
||||
*fut = None;
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(e.into())))
|
||||
}
|
||||
Ready(None) => {
|
||||
*fut = None;
|
||||
*inp = None;
|
||||
continue;
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
} else if inp.is_some() {
|
||||
let buf = core::mem::replace(buf, VecDeque::new());
|
||||
self.state = State::ReadingLt(Some(Self::setup_read_any(inp)), buf, inp.take());
|
||||
continue;
|
||||
} else {
|
||||
debug!("transition ReadingLt to ReadingMt");
|
||||
let buf = core::mem::replace(&mut self.buf_mt, VecDeque::new());
|
||||
self.state = State::ReadingMt(None, buf, self.inp_mt.take());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
State::ReadingMt(fut, buf, inp) => {
|
||||
if let Some(x) = buf.pop_front() {
|
||||
out2.push_back(x);
|
||||
continue;
|
||||
} else if let Some(fut2) = fut {
|
||||
match fut2.fut.poll_unpin(cx) {
|
||||
Ready(Some(Ok(x))) => {
|
||||
*fut = None;
|
||||
match x {
|
||||
firstbefore::Output::Bulk(x) => {
|
||||
buf.push_back(x);
|
||||
continue;
|
||||
}
|
||||
firstbefore::Output::First(_, _) => {
|
||||
self.state = State::Done;
|
||||
let e = Error::Logic;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
}
|
||||
}
|
||||
Ready(Some(Err(e))) => {
|
||||
*fut = None;
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(e.into())))
|
||||
}
|
||||
Ready(None) => {
|
||||
*fut = None;
|
||||
*inp = None;
|
||||
continue;
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
} else if inp.is_some() {
|
||||
let buf = core::mem::replace(buf, VecDeque::new());
|
||||
self.state = State::ReadingMt(Some(Self::setup_read_any(inp)), buf, inp.take());
|
||||
continue;
|
||||
} else {
|
||||
debug!("transition ReadingMt to ReadingSt");
|
||||
let buf = core::mem::replace(&mut self.buf_st, VecDeque::new());
|
||||
self.state = State::ReadingSt(None, buf, self.inp_st.take());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
State::ReadingSt(fut, buf, inp) => {
|
||||
if let Some(x) = buf.pop_front() {
|
||||
out2.push_back(x);
|
||||
continue;
|
||||
} else if let Some(fut2) = fut {
|
||||
match fut2.fut.poll_unpin(cx) {
|
||||
Ready(Some(Ok(x))) => {
|
||||
*fut = None;
|
||||
match x {
|
||||
firstbefore::Output::Bulk(x) => {
|
||||
buf.push_back(x);
|
||||
continue;
|
||||
}
|
||||
firstbefore::Output::First(_, _) => {
|
||||
self.state = State::Done;
|
||||
let e = Error::Logic;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
}
|
||||
}
|
||||
Ready(Some(Err(e))) => {
|
||||
*fut = None;
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(e.into())))
|
||||
}
|
||||
Ready(None) => {
|
||||
*fut = None;
|
||||
*inp = None;
|
||||
continue;
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
} else if inp.is_some() {
|
||||
let buf = core::mem::replace(buf, VecDeque::new());
|
||||
self.state = State::ReadingSt(Some(Self::setup_read_any(inp)), buf, inp.take());
|
||||
continue;
|
||||
} else {
|
||||
debug!("fully done");
|
||||
Ready(None)
|
||||
}
|
||||
}
|
||||
State::Done => Ready(None),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn trait_assert<T>(_: T)
|
||||
where
|
||||
T: Stream + Unpin + Send,
|
||||
{
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
fn trait_assert_try() {
|
||||
let x: MergeRts = phantomval();
|
||||
trait_assert(x);
|
||||
}
|
||||
|
||||
fn phantomval<T>() -> T {
|
||||
panic!()
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
use super::prepare::StmtsEvents;
|
||||
use crate::range::ScyllaSeriesRange;
|
||||
use crate::worker::ScyllaQueue;
|
||||
use err::thiserror;
|
||||
@@ -5,8 +6,11 @@ use err::ThisError;
|
||||
use futures_util::Future;
|
||||
use futures_util::FutureExt;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use netpod::log::*;
|
||||
use netpod::ttl::RetentionTime;
|
||||
use netpod::TsMs;
|
||||
use scylla::Session;
|
||||
use series::SeriesId;
|
||||
use std::collections::VecDeque;
|
||||
use std::pin::Pin;
|
||||
@@ -15,8 +19,17 @@ use std::task::Poll;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
pub enum Error {
|
||||
Worker(#[from] crate::worker::Error),
|
||||
Logic,
|
||||
#[error("Worker({0})")]
|
||||
Worker(Box<crate::worker::Error>),
|
||||
ScyllaQuery(#[from] scylla::transport::errors::QueryError),
|
||||
ScyllaRow(#[from] scylla::transport::iterator::NextRowError),
|
||||
}
|
||||
|
||||
impl From<crate::worker::Error> for Error {
|
||||
fn from(value: crate::worker::Error) -> Self {
|
||||
Self::Worker(Box::new(value))
|
||||
}
|
||||
}
|
||||
|
||||
enum Resolvable<F>
|
||||
@@ -54,7 +67,7 @@ enum State {
|
||||
}
|
||||
|
||||
#[pin_project::pin_project]
|
||||
pub struct MspStream {
|
||||
pub struct MspStreamRt {
|
||||
rt: RetentionTime,
|
||||
series: SeriesId,
|
||||
range: ScyllaSeriesRange,
|
||||
@@ -63,7 +76,7 @@ pub struct MspStream {
|
||||
out: VecDeque<TsMs>,
|
||||
}
|
||||
|
||||
impl MspStream {
|
||||
impl MspStreamRt {
|
||||
pub fn new(rt: RetentionTime, series: SeriesId, range: ScyllaSeriesRange, scyqueue: ScyllaQueue) -> Self {
|
||||
let fut_bck = {
|
||||
let scyqueue = scyqueue.clone();
|
||||
@@ -93,7 +106,7 @@ impl MspStream {
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for MspStream {
|
||||
impl Stream for MspStreamRt {
|
||||
type Item = Result<TsMs, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
@@ -112,8 +125,7 @@ impl Stream for MspStream {
|
||||
have_pending = true;
|
||||
}
|
||||
},
|
||||
Resolvable::Output(_) => {}
|
||||
Resolvable::Taken => {}
|
||||
_ => {}
|
||||
}
|
||||
let rsv = &mut st.fut_fwd;
|
||||
match rsv {
|
||||
@@ -125,37 +137,28 @@ impl Stream for MspStream {
|
||||
have_pending = true;
|
||||
}
|
||||
},
|
||||
Resolvable::Output(_) => {}
|
||||
Resolvable::Taken => {}
|
||||
_ => {}
|
||||
}
|
||||
if have_pending {
|
||||
Pending
|
||||
} else {
|
||||
let taken_bck = st.fut_bck.take();
|
||||
let taken_fwd = st.fut_fwd.take();
|
||||
if let Some(x) = taken_bck {
|
||||
match x {
|
||||
Ok(v) => {
|
||||
for e in v {
|
||||
self.out.push_back(e)
|
||||
}
|
||||
|
||||
if let Some(x) = taken_fwd {
|
||||
match x {
|
||||
Ok(v) => {
|
||||
for e in v {
|
||||
self.out.push_back(e)
|
||||
}
|
||||
|
||||
self.state = State::InputDone;
|
||||
continue;
|
||||
}
|
||||
Err(e) => Ready(Some(Err(e.into()))),
|
||||
self.state = State::InputDone;
|
||||
if let (Some(taken_bck), Some(taken_fwd)) = (taken_bck, taken_fwd) {
|
||||
match taken_bck {
|
||||
Ok(v1) => match taken_fwd {
|
||||
Ok(v2) => {
|
||||
for e in v1 {
|
||||
self.out.push_back(e)
|
||||
}
|
||||
} else {
|
||||
Ready(Some(Err(Error::Logic)))
|
||||
for e in v2 {
|
||||
self.out.push_back(e)
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Err(e) => Ready(Some(Err(e.into()))),
|
||||
},
|
||||
Err(e) => Ready(Some(Err(e.into()))),
|
||||
}
|
||||
} else {
|
||||
@@ -183,10 +186,69 @@ where
|
||||
|
||||
#[allow(unused)]
|
||||
fn trait_assert_try() {
|
||||
let x: MspStream = todoval();
|
||||
let x: MspStreamRt = phantomval();
|
||||
trait_assert(x);
|
||||
}
|
||||
|
||||
fn todoval<T>() -> T {
|
||||
todo!()
|
||||
fn phantomval<T>() -> T {
|
||||
panic!()
|
||||
}
|
||||
|
||||
pub async fn find_ts_msp(
|
||||
rt: &RetentionTime,
|
||||
series: u64,
|
||||
range: ScyllaSeriesRange,
|
||||
bck: bool,
|
||||
stmts: &StmtsEvents,
|
||||
scy: &Session,
|
||||
) -> Result<VecDeque<TsMs>, Error> {
|
||||
trace!("find_ts_msp series {:?} {:?} {:?} bck {}", rt, series, range, bck);
|
||||
if bck {
|
||||
find_ts_msp_bck(rt, series, range, stmts, scy).await
|
||||
} else {
|
||||
find_ts_msp_fwd(rt, series, range, stmts, scy).await
|
||||
}
|
||||
}
|
||||
|
||||
async fn find_ts_msp_fwd(
|
||||
rt: &RetentionTime,
|
||||
series: u64,
|
||||
range: ScyllaSeriesRange,
|
||||
stmts: &StmtsEvents,
|
||||
scy: &Session,
|
||||
) -> Result<VecDeque<TsMs>, Error> {
|
||||
let mut ret = VecDeque::new();
|
||||
// TODO time range truncation can be handled better
|
||||
let params = (series as i64, range.beg().ms() as i64, 1 + range.end().ms() as i64);
|
||||
let mut res = scy
|
||||
.execute_iter(stmts.rt(rt).ts_msp_fwd().clone(), params)
|
||||
.await?
|
||||
.into_typed::<(i64,)>();
|
||||
while let Some(x) = res.next().await {
|
||||
let row = x?;
|
||||
let ts = TsMs::from_ms_u64(row.0 as u64);
|
||||
ret.push_back(ts);
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
async fn find_ts_msp_bck(
|
||||
rt: &RetentionTime,
|
||||
series: u64,
|
||||
range: ScyllaSeriesRange,
|
||||
stmts: &StmtsEvents,
|
||||
scy: &Session,
|
||||
) -> Result<VecDeque<TsMs>, Error> {
|
||||
let mut ret = VecDeque::new();
|
||||
let params = (series as i64, range.beg().ms() as i64);
|
||||
let mut res = scy
|
||||
.execute_iter(stmts.rt(rt).ts_msp_bck().clone(), params)
|
||||
.await?
|
||||
.into_typed::<(i64,)>();
|
||||
while let Some(x) = res.next().await {
|
||||
let row = x?;
|
||||
let ts = TsMs::from_ms_u64(row.0 as u64);
|
||||
ret.push_front(ts);
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
42
crates/scyllaconn/src/events2/nonempty.rs
Normal file
42
crates/scyllaconn/src/events2/nonempty.rs
Normal file
@@ -0,0 +1,42 @@
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::WithLen;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
pub struct NonEmpty<S> {
|
||||
inp: S,
|
||||
}
|
||||
|
||||
impl<S> NonEmpty<S> {
|
||||
pub fn new(inp: S) -> Self {
|
||||
Self { inp }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, T, E> Stream for NonEmpty<S>
|
||||
where
|
||||
S: Stream<Item = Result<T, E>> + Unpin,
|
||||
T: WithLen,
|
||||
{
|
||||
type Item = <S as Stream>::Item;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
loop {
|
||||
break match self.inp.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok(x))) => {
|
||||
if x.len() != 0 {
|
||||
Ready(Some(Ok(x)))
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Ready(Some(Err(e))) => Ready(Some(Err(e))),
|
||||
Ready(None) => Ready(None),
|
||||
Pending => Pending,
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
238
crates/scyllaconn/src/events2/prepare.rs
Normal file
238
crates/scyllaconn/src/events2/prepare.rs
Normal file
@@ -0,0 +1,238 @@
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use netpod::ttl::RetentionTime;
|
||||
use scylla::prepared_statement::PreparedStatement;
|
||||
use scylla::Session;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
pub enum Error {
|
||||
ScyllaQuery(#[from] scylla::transport::errors::QueryError),
|
||||
ScyllaNextRow(#[from] scylla::transport::iterator::NextRowError),
|
||||
ScyllaTypeConv(#[from] scylla::cql_to_rust::FromRowError),
|
||||
ScyllaWorker(Box<crate::worker::Error>),
|
||||
MissingQuery(String),
|
||||
RangeEndOverflow,
|
||||
InvalidFuture,
|
||||
TestError(String),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct StmtsLspShape {
|
||||
u8: PreparedStatement,
|
||||
u16: PreparedStatement,
|
||||
u32: PreparedStatement,
|
||||
u64: PreparedStatement,
|
||||
i8: PreparedStatement,
|
||||
i16: PreparedStatement,
|
||||
i32: PreparedStatement,
|
||||
i64: PreparedStatement,
|
||||
f32: PreparedStatement,
|
||||
f64: PreparedStatement,
|
||||
bool: PreparedStatement,
|
||||
string: PreparedStatement,
|
||||
}
|
||||
|
||||
impl StmtsLspShape {
|
||||
pub fn st(&self, stname: &str) -> Result<&PreparedStatement, Error> {
|
||||
let ret = match stname {
|
||||
"u8" => &self.u8,
|
||||
"u16" => &self.u16,
|
||||
"u32" => &self.u32,
|
||||
"u64" => &self.u64,
|
||||
"i8" => &self.i8,
|
||||
"i16" => &self.i16,
|
||||
"i32" => &self.i32,
|
||||
"i64" => &self.i64,
|
||||
"f32" => &self.f32,
|
||||
"f64" => &self.f64,
|
||||
"bool" => &self.bool,
|
||||
"string" => &self.string,
|
||||
_ => return Err(Error::MissingQuery(format!("no query for stname {stname}"))),
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct StmtsLspDir {
|
||||
scalar: StmtsLspShape,
|
||||
array: StmtsLspShape,
|
||||
}
|
||||
|
||||
impl StmtsLspDir {
|
||||
pub fn shape(&self, array: bool) -> &StmtsLspShape {
|
||||
if array {
|
||||
&self.array
|
||||
} else {
|
||||
&self.scalar
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct StmtsEventsRt {
|
||||
ts_msp_fwd: PreparedStatement,
|
||||
ts_msp_bck: PreparedStatement,
|
||||
lsp_fwd_val: StmtsLspDir,
|
||||
lsp_bck_val: StmtsLspDir,
|
||||
lsp_fwd_ts: StmtsLspDir,
|
||||
lsp_bck_ts: StmtsLspDir,
|
||||
}
|
||||
|
||||
impl StmtsEventsRt {
|
||||
pub fn ts_msp_fwd(&self) -> &PreparedStatement {
|
||||
&self.ts_msp_fwd
|
||||
}
|
||||
|
||||
pub fn ts_msp_bck(&self) -> &PreparedStatement {
|
||||
&self.ts_msp_bck
|
||||
}
|
||||
|
||||
pub fn lsp(&self, bck: bool, val: bool) -> &StmtsLspDir {
|
||||
if bck {
|
||||
if val {
|
||||
&self.lsp_bck_val
|
||||
} else {
|
||||
&self.lsp_bck_ts
|
||||
}
|
||||
} else {
|
||||
if val {
|
||||
&self.lsp_fwd_val
|
||||
} else {
|
||||
&self.lsp_fwd_ts
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn make_msp_dir(ks: &str, rt: &RetentionTime, bck: bool, scy: &Session) -> Result<PreparedStatement, Error> {
|
||||
let table_name = "ts_msp";
|
||||
let select_cond = if bck {
|
||||
"ts_msp < ? order by ts_msp desc limit 2"
|
||||
} else {
|
||||
"ts_msp >= ? and ts_msp < ?"
|
||||
};
|
||||
let cql = format!(
|
||||
"select ts_msp from {}.{}{} where series = ? and {}",
|
||||
ks,
|
||||
rt.table_prefix(),
|
||||
table_name,
|
||||
select_cond
|
||||
);
|
||||
let qu = scy.prepare(cql).await?;
|
||||
Ok(qu)
|
||||
}
|
||||
|
||||
async fn make_lsp(
|
||||
ks: &str,
|
||||
rt: &RetentionTime,
|
||||
shapepre: &str,
|
||||
stname: &str,
|
||||
values: &str,
|
||||
bck: bool,
|
||||
scy: &Session,
|
||||
) -> Result<PreparedStatement, Error> {
|
||||
let select_cond = if bck {
|
||||
"ts_lsp < ? order by ts_lsp desc limit 1"
|
||||
} else {
|
||||
"ts_lsp >= ? and ts_lsp < ?"
|
||||
};
|
||||
let cql = format!(
|
||||
concat!(
|
||||
"select {} from {}.{}events_{}_{}",
|
||||
" where series = ? and ts_msp = ? and {}"
|
||||
),
|
||||
values,
|
||||
ks,
|
||||
rt.table_prefix(),
|
||||
shapepre,
|
||||
stname,
|
||||
select_cond
|
||||
);
|
||||
let qu = scy.prepare(cql).await?;
|
||||
Ok(qu)
|
||||
}
|
||||
|
||||
async fn make_lsp_shape(
|
||||
ks: &str,
|
||||
rt: &RetentionTime,
|
||||
shapepre: &str,
|
||||
values: &str,
|
||||
bck: bool,
|
||||
scy: &Session,
|
||||
) -> Result<StmtsLspShape, Error> {
|
||||
let values = if shapepre.contains("array") {
|
||||
values.replace("value", "valueblob")
|
||||
} else {
|
||||
values.into()
|
||||
};
|
||||
let values = &values;
|
||||
let maker = |stname| make_lsp(ks, rt, shapepre, stname, values, bck, scy);
|
||||
let ret = StmtsLspShape {
|
||||
u8: maker("u8").await?,
|
||||
u16: maker("u16").await?,
|
||||
u32: maker("u32").await?,
|
||||
u64: maker("u64").await?,
|
||||
i8: maker("i8").await?,
|
||||
i16: maker("i16").await?,
|
||||
i32: maker("i32").await?,
|
||||
i64: maker("i64").await?,
|
||||
f32: maker("f32").await?,
|
||||
f64: maker("f64").await?,
|
||||
bool: maker("bool").await?,
|
||||
string: maker("string").await?,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
async fn make_lsp_dir(
|
||||
ks: &str,
|
||||
rt: &RetentionTime,
|
||||
values: &str,
|
||||
bck: bool,
|
||||
scy: &Session,
|
||||
) -> Result<StmtsLspDir, Error> {
|
||||
let ret = StmtsLspDir {
|
||||
scalar: make_lsp_shape(ks, rt, "scalar", values, bck, scy).await?,
|
||||
array: make_lsp_shape(ks, rt, "array", values, bck, scy).await?,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
async fn make_rt(ks: &str, rt: &RetentionTime, scy: &Session) -> Result<StmtsEventsRt, Error> {
|
||||
let ret = StmtsEventsRt {
|
||||
ts_msp_fwd: make_msp_dir(ks, rt, false, scy).await?,
|
||||
ts_msp_bck: make_msp_dir(ks, rt, true, scy).await?,
|
||||
lsp_fwd_val: make_lsp_dir(ks, rt, "ts_lsp, pulse, value", false, scy).await?,
|
||||
lsp_bck_val: make_lsp_dir(ks, rt, "ts_lsp, pulse, value", true, scy).await?,
|
||||
lsp_fwd_ts: make_lsp_dir(ks, rt, "ts_lsp, pulse", false, scy).await?,
|
||||
lsp_bck_ts: make_lsp_dir(ks, rt, "ts_lsp, pulse", true, scy).await?,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct StmtsEvents {
|
||||
st: StmtsEventsRt,
|
||||
mt: StmtsEventsRt,
|
||||
lt: StmtsEventsRt,
|
||||
}
|
||||
|
||||
impl StmtsEvents {
|
||||
pub async fn new(ks: [&str; 3], scy: &Session) -> Result<Self, Error> {
|
||||
let ret = StmtsEvents {
|
||||
st: make_rt(ks[0], &RetentionTime::Short, scy).await?,
|
||||
mt: make_rt(ks[1], &RetentionTime::Medium, scy).await?,
|
||||
lt: make_rt(ks[2], &RetentionTime::Long, scy).await?,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub fn rt(&self, rt: &RetentionTime) -> &StmtsEventsRt {
|
||||
match rt {
|
||||
RetentionTime::Short => &self.st,
|
||||
RetentionTime::Medium => &self.mt,
|
||||
RetentionTime::Long => &&self.lt,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -10,3 +10,12 @@ pub mod worker;
|
||||
|
||||
pub use scylla;
|
||||
pub use series::SeriesId;
|
||||
|
||||
pub async fn test_log() {
|
||||
use netpod::log::*;
|
||||
error!("------");
|
||||
warn!("------");
|
||||
info!("------");
|
||||
debug!("------");
|
||||
trace!("------");
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::conn::create_scy_session_no_ks;
|
||||
use crate::events::StmtsEvents;
|
||||
use crate::events2::prepare::StmtsEvents;
|
||||
use crate::range::ScyllaSeriesRange;
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
@@ -19,8 +19,11 @@ use std::sync::Arc;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
pub enum Error {
|
||||
#[error("ScyllaConnection({0})")]
|
||||
ScyllaConnection(err::Error),
|
||||
Prepare(#[from] crate::events2::prepare::Error),
|
||||
EventsQuery(#[from] crate::events::Error),
|
||||
Msp(#[from] crate::events2::msp::Error),
|
||||
ChannelSend,
|
||||
ChannelRecv,
|
||||
Join,
|
||||
@@ -145,7 +148,7 @@ impl ScyllaWorker {
|
||||
};
|
||||
match job {
|
||||
Job::FindTsMsp(rt, series, range, bck, tx) => {
|
||||
let res = crate::events::find_ts_msp(&rt, series, range, bck, &stmts, &scy).await;
|
||||
let res = crate::events2::msp::find_ts_msp(&rt, series, range, bck, &stmts, &scy).await;
|
||||
if tx.send(res.map_err(Into::into)).await.is_err() {
|
||||
// TODO count for stats
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ serde_json = "1.0"
|
||||
serde_cbor = "0.11.1"
|
||||
typetag = "0.2.14"
|
||||
ciborium = "0.2.1"
|
||||
bytes = "1.3"
|
||||
bytes = "1.6"
|
||||
arrayref = "0.3.6"
|
||||
crc32fast = "1.3.2"
|
||||
byteorder = "1.4.3"
|
||||
|
||||
136
crates/streams/src/framed_bytes.rs
Normal file
136
crates/streams/src/framed_bytes.rs
Normal file
@@ -0,0 +1,136 @@
|
||||
use bytes::Buf;
|
||||
use bytes::BufMut;
|
||||
use bytes::Bytes;
|
||||
use bytes::BytesMut;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use netpod::log::*;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
const FRAME_HEAD_LEN: usize = 16;
|
||||
const FRAME_PAYLOAD_MAX: u32 = 1024 * 1024 * 8;
|
||||
const BUF_MAX: usize = (FRAME_HEAD_LEN + FRAME_PAYLOAD_MAX as usize) * 2;
|
||||
|
||||
#[allow(unused)]
|
||||
macro_rules! trace_parse {
|
||||
($($arg:tt)*) => {
|
||||
if false {
|
||||
trace!($($arg)*);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
pub enum Error {
|
||||
FrameTooLarge,
|
||||
Logic,
|
||||
}
|
||||
|
||||
pub type BoxedFramedBytesStream = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
|
||||
|
||||
// TODO move this type decl because it is not specific to cbor
|
||||
pub type SitemtyFramedBytesStream = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
|
||||
|
||||
pub enum State {
|
||||
Reading,
|
||||
Done,
|
||||
}
|
||||
|
||||
pub struct FramedBytesStream<S> {
|
||||
inp: S,
|
||||
buf: BytesMut,
|
||||
state: State,
|
||||
}
|
||||
|
||||
impl<S, E> FramedBytesStream<S>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, E>> + Unpin,
|
||||
E: Into<Error>,
|
||||
{
|
||||
pub fn new(inp: S) -> Self {
|
||||
Self {
|
||||
inp,
|
||||
buf: BytesMut::with_capacity(1024 * 256),
|
||||
state: State::Reading,
|
||||
}
|
||||
}
|
||||
|
||||
fn try_parse(&mut self) -> Result<Option<Bytes>, Error> {
|
||||
trace_parse!("try_parse self.buf.len() {}", self.buf.len());
|
||||
if self.buf.len() < FRAME_HEAD_LEN {
|
||||
return Ok(None);
|
||||
}
|
||||
let n = u32::from_le_bytes(self.buf[..4].try_into().map_err(|_| Error::Logic)?);
|
||||
trace_parse!("try_parse n {}", n);
|
||||
if n > FRAME_PAYLOAD_MAX {
|
||||
let e = Error::FrameTooLarge;
|
||||
return Err(e);
|
||||
}
|
||||
let frame_len = FRAME_HEAD_LEN + n as usize;
|
||||
trace_parse!("try_parse frame_len {}", frame_len);
|
||||
assert!(self.buf.len() <= self.buf.capacity());
|
||||
if self.buf.capacity() < frame_len {
|
||||
let add_max = BUF_MAX - self.buf.capacity().min(BUF_MAX);
|
||||
let nadd = ((frame_len.min(FRAME_PAYLOAD_MAX as usize) - self.buf.len()) * 2).min(add_max);
|
||||
self.buf.reserve(nadd);
|
||||
}
|
||||
let adv = (frame_len + 7) / 8 * 8;
|
||||
trace_parse!("try_parse adv {}", adv);
|
||||
if self.buf.len() < adv {
|
||||
Ok(None)
|
||||
} else {
|
||||
self.buf.advance(FRAME_HEAD_LEN);
|
||||
let buf = self.buf.split_to(n as usize);
|
||||
self.buf.advance(adv - frame_len);
|
||||
Ok(Some(buf.freeze()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, E> Stream for FramedBytesStream<S>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, E>> + Unpin,
|
||||
E: Into<Error>,
|
||||
{
|
||||
type Item = Result<Bytes, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
loop {
|
||||
break match &self.state {
|
||||
State::Reading => match self.try_parse() {
|
||||
Ok(Some(x)) => Ready(Some(Ok(x))),
|
||||
Ok(None) => match self.inp.poll_next_unpin(cx) {
|
||||
Ready(Some(x)) => match x {
|
||||
Ok(x) => {
|
||||
self.buf.put_slice(&x);
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(e.into())))
|
||||
}
|
||||
},
|
||||
Ready(None) => {
|
||||
if self.buf.len() > 0 {
|
||||
warn!("remaining bytes in input buffer, input closed len {}", self.buf.len());
|
||||
}
|
||||
self.state = State::Done;
|
||||
Ready(None)
|
||||
}
|
||||
Pending => Pending,
|
||||
},
|
||||
Err(e) => {
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
},
|
||||
State::Done => Ready(None),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
33
crates/streams/src/instrument.rs
Normal file
33
crates/streams/src/instrument.rs
Normal file
@@ -0,0 +1,33 @@
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use netpod::log::tracing;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
#[pin_project::pin_project]
|
||||
pub struct InstrumentStream<S> {
|
||||
#[pin]
|
||||
inp: S,
|
||||
#[pin]
|
||||
span: tracing::Span,
|
||||
}
|
||||
|
||||
impl<S> InstrumentStream<S> {
|
||||
pub fn new(inp: S, span: tracing::Span) -> Self {
|
||||
Self { inp, span }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Stream for InstrumentStream<S>
|
||||
where
|
||||
S: Stream,
|
||||
{
|
||||
type Item = <S as Stream>::Item;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
let mut this = self.project();
|
||||
let _spg = this.span.enter();
|
||||
this.inp.poll_next_unpin(cx)
|
||||
}
|
||||
}
|
||||
@@ -4,8 +4,10 @@ pub mod collect;
|
||||
pub mod dtflags;
|
||||
pub mod filechunkread;
|
||||
pub mod firsterr;
|
||||
pub mod framed_bytes;
|
||||
pub mod frames;
|
||||
pub mod generators;
|
||||
pub mod instrument;
|
||||
pub mod itemclone;
|
||||
pub mod json_stream;
|
||||
pub mod lenframed;
|
||||
|
||||
@@ -159,7 +159,7 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
|
||||
console_subscriber::init();
|
||||
} else {
|
||||
// Logging setup
|
||||
let filter = tracing_subscriber::EnvFilter::builder()
|
||||
let filter_1 = tracing_subscriber::EnvFilter::builder()
|
||||
.with_default_directive(tracing::metadata::LevelFilter::INFO.into())
|
||||
.from_env()
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("can not build tracing env filter {e}")))?;
|
||||
@@ -168,9 +168,9 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
|
||||
.with_default_directive(tracing::metadata::LevelFilter::INFO.into())
|
||||
.from_env()
|
||||
.map_err(|e| Error::with_msg_no_trace(format!("can not build tracing env filter {e}")))?;
|
||||
let filter_3 = tracing_subscriber::filter::dynamic_filter_fn(|meta, ctx| {
|
||||
/*let filter_3 = tracing_subscriber::filter::dynamic_filter_fn(|meta, ctx| {
|
||||
if true {
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
if *meta.level() <= tracing::Level::TRACE {
|
||||
if ["httpret", "scyllaconn"].contains(&meta.target()) {
|
||||
@@ -207,7 +207,7 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
});*/
|
||||
let fmt_layer = tracing_subscriber::fmt::Layer::new()
|
||||
.with_writer(io::stderr)
|
||||
.with_timer(timer)
|
||||
@@ -215,13 +215,22 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
|
||||
.with_ansi(false)
|
||||
.with_thread_names(true)
|
||||
.event_format(formatter::FormatTxt)
|
||||
.with_filter(filter_3)
|
||||
// .with_filter(filter_3)
|
||||
.with_filter(filter_2)
|
||||
.with_filter(filter)
|
||||
// .and_then(LogFilterLayer::new("lay1".into()))
|
||||
// .and_then(LogFilterLayer::new("lay2".into()))
|
||||
;
|
||||
.with_filter(filter_1)
|
||||
// let fmt_layer = fmt_layer.with_filter(filter_3);
|
||||
// let fmt_layer: Box<dyn Layer<tracing_subscriber::Registry>> = if std::env::var("RUST_LOG_USE_2").is_ok() {
|
||||
// let a = fmt_layer.with_filter(filter_2);
|
||||
// Box::new(a)
|
||||
// } else {
|
||||
// let a = fmt_layer;
|
||||
// Box::new(a)
|
||||
// };
|
||||
// let fmt_layer = fmt_layer.with_filter(filter_1);
|
||||
// .and_then(LogFilterLayer::new("lay1".into()))
|
||||
// .and_then(LogFilterLayer::new("lay2".into()))
|
||||
// let layer_2 = LogFilterLayer::new("lay1".into(), fmt_layer);
|
||||
;
|
||||
|
||||
let reg = tracing_subscriber::registry();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user