Prepare for merge of retention times

This commit is contained in:
Dominik Werder
2024-06-15 09:49:07 +02:00
parent 902b9a9cb7
commit 3a77d116f6
13 changed files with 896 additions and 47 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqbuffer"
version = "0.5.1-aa.1"
version = "0.5.1-aa.2"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -639,7 +639,7 @@ impl ScyllaSeriesTsMsp {
let mut st_ts_msp_ms = Vec::new();
let mut msp_stream =
scyllaconn::events2::msp::MspStream::new(RetentionTime::Short, sid, (&q.range).into(), scyqueue.clone());
scyllaconn::events2::msp::MspStreamRt::new(RetentionTime::Short, sid, (&q.range).into(), scyqueue.clone());
use chrono::TimeZone;
while let Some(x) = msp_stream.next().await {
let v = x.unwrap().ms();
@@ -650,7 +650,7 @@ impl ScyllaSeriesTsMsp {
let mut mt_ts_msp_ms = Vec::new();
let mut msp_stream =
scyllaconn::events2::msp::MspStream::new(RetentionTime::Medium, sid, (&q.range).into(), scyqueue.clone());
scyllaconn::events2::msp::MspStreamRt::new(RetentionTime::Medium, sid, (&q.range).into(), scyqueue.clone());
while let Some(x) = msp_stream.next().await {
let v = x.unwrap().ms();
let st = chrono::Utc.timestamp_millis_opt(v as _).earliest().unwrap();
@@ -660,7 +660,7 @@ impl ScyllaSeriesTsMsp {
let mut lt_ts_msp_ms = Vec::new();
let mut msp_stream =
scyllaconn::events2::msp::MspStream::new(RetentionTime::Long, sid, (&q.range).into(), scyqueue.clone());
scyllaconn::events2::msp::MspStreamRt::new(RetentionTime::Long, sid, (&q.range).into(), scyqueue.clone());
while let Some(x) = msp_stream.next().await {
let v = x.unwrap().ms();
let st = chrono::Utc.timestamp_millis_opt(v as _).earliest().unwrap();

View File

@@ -77,6 +77,15 @@ impl NanoRange {
}
}
impl From<(u64, u64)> for NanoRange {
fn from(value: (u64, u64)) -> Self {
Self {
beg: value.0,
end: value.1,
}
}
}
impl TryFrom<&SeriesRange> for NanoRange {
type Error = Error;

View File

@@ -1,6 +1,7 @@
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
@@ -10,6 +11,7 @@ use netpod::ttl::RetentionTime;
use netpod::ChConf;
use query::api4::events::EventsSubQuery;
use scyllaconn::worker::ScyllaQueue;
use scyllaconn::SeriesId;
use std::pin::Pin;
use taskrun::tokio;
@@ -28,7 +30,30 @@ pub async fn scylla_channel_event_stream(
let do_test_stream_error = false;
let with_values = evq.need_value_data();
debug!("\n\nmake EventsStreamScylla {series:?} {scalar_type:?} {shape:?}\n");
let stream = scyllaconn::events::EventsStreamScylla::new(
let stream: Pin<Box<dyn Stream<Item = _> + Send>> = if evq.use_all_rt() {
let x = scyllaconn::events2::mergert::MergeRts::new(
SeriesId::new(chconf.series()),
scalar_type.clone(),
shape.clone(),
evq.range().into(),
with_values,
scyqueue.clone(),
);
Box::pin(x)
} else {
let x = scyllaconn::events2::events::EventsStreamRt::new(
RetentionTime::Short,
SeriesId::new(chconf.series()),
scalar_type.clone(),
shape.clone(),
evq.range().into(),
with_values,
scyqueue.clone(),
)
.map_err(|e| scyllaconn::events2::mergert::Error::from(e));
Box::pin(x)
};
/*let stream = scyllaconn::events::EventsStreamScylla::new(
RetentionTime::Short,
series,
evq.range().into(),
@@ -38,7 +63,7 @@ pub async fn scylla_channel_event_stream(
with_values,
scyqueue.clone(),
do_test_stream_error,
);
);*/
let stream = stream
.map(move |item| match &item {
Ok(k) => match k {

View File

@@ -56,6 +56,8 @@ pub struct PlainEventsQuery {
create_errors: Vec<String>,
#[serde(default)]
log_level: String,
#[serde(default)]
use_all_rt: bool,
}
impl PlainEventsQuery {
@@ -81,6 +83,7 @@ impl PlainEventsQuery {
merger_out_len_max: None,
create_errors: Vec::new(),
log_level: String::new(),
use_all_rt: false,
}
}
@@ -206,6 +209,10 @@ impl PlainEventsQuery {
pub fn log_level(&self) -> &str {
&self.log_level
}
pub fn use_all_rt(&self) -> bool {
self.use_all_rt
}
}
impl HasBackend for PlainEventsQuery {
@@ -283,6 +290,11 @@ impl FromUrl for PlainEventsQuery {
.map(|x| x.split(",").map(|x| x.to_string()).collect())
.unwrap_or(Vec::new()),
log_level: pairs.get("log_level").map_or(String::new(), String::from),
use_all_rt: pairs
.get("useAllRt")
.map_or("false", |k| k)
.parse()
.map_err(|e| Error::with_public_msg_no_trace(format!("can not parse useAllRt: {}", e)))?,
};
Ok(ret)
}
@@ -342,6 +354,9 @@ impl AppendToUrl for PlainEventsQuery {
if self.log_level.len() != 0 {
g.append_pair("log_level", &self.log_level);
}
if self.use_all_rt {
g.append_pair("useAllRt", "true");
}
}
}
@@ -385,6 +400,7 @@ pub struct EventsSubQuerySettings {
buf_len_disk_io: Option<usize>,
queue_len_disk_io: Option<usize>,
create_errors: Vec<String>,
use_all_rt: bool,
}
impl Default for EventsSubQuerySettings {
@@ -398,6 +414,7 @@ impl Default for EventsSubQuerySettings {
buf_len_disk_io: None,
queue_len_disk_io: None,
create_errors: Vec::new(),
use_all_rt: true,
}
}
}
@@ -414,6 +431,7 @@ impl From<&PlainEventsQuery> for EventsSubQuerySettings {
// TODO add to query
queue_len_disk_io: None,
create_errors: value.create_errors.clone(),
use_all_rt: value.use_all_rt(),
}
}
}
@@ -431,6 +449,7 @@ impl From<&BinnedQuery> for EventsSubQuerySettings {
// TODO add to query
queue_len_disk_io: None,
create_errors: Vec::new(),
use_all_rt: true,
}
}
}
@@ -448,6 +467,7 @@ impl From<&Api1Query> for EventsSubQuerySettings {
buf_len_disk_io: Some(disk_io_tune.read_buffer_len),
queue_len_disk_io: Some(disk_io_tune.read_queue_len),
create_errors: Vec::new(),
use_all_rt: false,
}
}
}
@@ -551,6 +571,10 @@ impl EventsSubQuery {
pub fn log_level(&self) -> &str {
&self.log_level
}
pub fn use_all_rt(&self) -> bool {
self.settings.use_all_rt
}
}
#[derive(Debug, Serialize, Deserialize)]

View File

@@ -26,6 +26,7 @@ use scylla::frame::response::result::Row;
use scylla::prepared_statement::PreparedStatement;
use scylla::Session;
use scylla::Session as ScySession;
use series::SeriesId;
use std::collections::VecDeque;
use std::mem;
use std::pin::Pin;
@@ -71,6 +72,17 @@ impl StmtsLspShape {
fn st(&self, stname: &str) -> Result<&PreparedStatement, Error> {
let ret = match stname {
"u8" => &self.u8,
"u16" => &self.u16,
"u32" => &self.u32,
"u64" => &self.u64,
"i8" => &self.i8,
"i16" => &self.i16,
"i32" => &self.i32,
"i64" => &self.i64,
"f32" => &self.f32,
"f64" => &self.f64,
"bool" => &self.bool,
"string" => &self.string,
_ => return Err(Error::MissingQuery(format!("no query for stname {stname}"))),
};
Ok(ret)
@@ -449,7 +461,7 @@ impl_scaty_array!(Vec<f32>, f32, Vec<f32>, "f32", "f32");
impl_scaty_array!(Vec<f64>, f64, Vec<f64>, "f64", "f64");
impl_scaty_array!(Vec<bool>, bool, Vec<bool>, "bool", "bool");
struct ReadNextValuesOpts {
pub(super) struct ReadNextValuesOpts {
rt: RetentionTime,
series: u64,
ts_msp: TsMs,
@@ -459,7 +471,29 @@ struct ReadNextValuesOpts {
scyqueue: ScyllaQueue,
}
async fn read_next_values<ST>(opts: ReadNextValuesOpts) -> Result<Box<dyn Events>, Error>
impl ReadNextValuesOpts {
pub(super) fn new(
rt: RetentionTime,
series: SeriesId,
ts_msp: TsMs,
range: ScyllaSeriesRange,
fwd: bool,
with_values: bool,
scyqueue: ScyllaQueue,
) -> Self {
Self {
rt,
series: series.id(),
ts_msp,
range,
fwd,
with_values,
scyqueue,
}
}
}
pub(super) async fn read_next_values<ST>(opts: ReadNextValuesOpts) -> Result<Box<dyn Events>, Error>
where
ST: ValTy,
{
@@ -648,7 +682,7 @@ fn convert_rows<ST: ValTy>(
Ok(ret)
}
struct ReadValues {
pub(super) struct ReadValues {
rt: RetentionTime,
series: u64,
scalar_type: ScalarType,
@@ -663,7 +697,7 @@ struct ReadValues {
}
impl ReadValues {
fn new(
pub(super) fn new(
rt: RetentionTime,
series: u64,
scalar_type: ScalarType,

View File

@@ -1 +1,5 @@
pub mod events;
pub mod firstbefore;
pub mod mergert;
pub mod msp;
pub mod nonempty;

View File

@@ -0,0 +1,276 @@
use super::msp::MspStreamRt;
use crate::events::read_next_values;
use crate::events::ReadNextValuesOpts;
use crate::range::ScyllaSeriesRange;
use crate::worker::ScyllaQueue;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::Events;
use items_2::channelevents::ChannelEvents;
use netpod::log::*;
use netpod::ttl::RetentionTime;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsMs;
use series::SeriesId;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
#[derive(Debug, ThisError)]
pub enum Error {
Worker(#[from] crate::worker::Error),
Events(#[from] crate::events::Error),
Msp(#[from] crate::events2::msp::Error),
Logic,
}
struct FetchMsp {
fut: Pin<Box<dyn Future<Output = Option<Result<TsMs, crate::events2::msp::Error>>> + Send>>,
}
struct FetchEvents {
fut: Pin<Box<dyn Future<Output = Result<Box<dyn Events>, crate::events2::events::Error>> + Send>>,
}
enum ReadingState {
FetchMsp(FetchMsp),
FetchEvents(FetchEvents),
}
struct Reading {
scyqueue: ScyllaQueue,
reading_state: ReadingState,
}
enum State {
Begin,
Reading(Reading),
InputDone,
}
pub struct EventsStreamRt {
rt: RetentionTime,
series: SeriesId,
scalar_type: ScalarType,
shape: Shape,
range: ScyllaSeriesRange,
with_values: bool,
state: State,
scyqueue: ScyllaQueue,
msp_inp: MspStreamRt,
out: VecDeque<Box<dyn Events>>,
ts_seen_max: u64,
}
impl EventsStreamRt {
pub fn new(
rt: RetentionTime,
series: SeriesId,
scalar_type: ScalarType,
shape: Shape,
range: ScyllaSeriesRange,
with_values: bool,
scyqueue: ScyllaQueue,
) -> Self {
debug!("EventsStreamRt::new {series:?} {range:?} {rt:?}");
let msp_inp =
crate::events2::msp::MspStreamRt::new(rt.clone(), series.clone(), range.clone(), scyqueue.clone());
Self {
rt,
series,
scalar_type,
shape,
range,
with_values,
state: State::Begin,
scyqueue,
msp_inp,
out: VecDeque::new(),
ts_seen_max: 0,
}
}
fn __handle_reading(self: Pin<&mut Self>, st: &mut Reading, cx: &mut Context) -> Result<(), Error> {
let _ = st;
let _ = cx;
todo!()
}
fn make_read_events_fut(
&mut self,
ts_msp: TsMs,
scyqueue: ScyllaQueue,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>> {
let fwd = true;
let opts = ReadNextValuesOpts::new(
self.rt.clone(),
self.series.clone(),
ts_msp,
self.range.clone(),
fwd,
self.with_values,
scyqueue,
);
let scalar_type = self.scalar_type.clone();
let shape = self.shape.clone();
let fut = async move {
let ret = match &shape {
Shape::Scalar => match &scalar_type {
ScalarType::U8 => read_next_values::<u8>(opts).await,
ScalarType::U16 => read_next_values::<u16>(opts).await,
ScalarType::U32 => read_next_values::<u32>(opts).await,
ScalarType::U64 => read_next_values::<u64>(opts).await,
ScalarType::I8 => read_next_values::<i8>(opts).await,
ScalarType::I16 => read_next_values::<i16>(opts).await,
ScalarType::I32 => read_next_values::<i32>(opts).await,
ScalarType::I64 => read_next_values::<i64>(opts).await,
ScalarType::F32 => read_next_values::<f32>(opts).await,
ScalarType::F64 => read_next_values::<f64>(opts).await,
ScalarType::BOOL => read_next_values::<bool>(opts).await,
ScalarType::STRING => read_next_values::<String>(opts).await,
ScalarType::Enum => read_next_values::<String>(opts).await,
ScalarType::ChannelStatus => {
warn!("read scalar channel status not yet supported");
err::todoval()
}
},
Shape::Wave(_) => match &scalar_type {
ScalarType::U8 => read_next_values::<Vec<u8>>(opts).await,
ScalarType::U16 => read_next_values::<Vec<u16>>(opts).await,
ScalarType::U32 => read_next_values::<Vec<u32>>(opts).await,
ScalarType::U64 => read_next_values::<Vec<u64>>(opts).await,
ScalarType::I8 => read_next_values::<Vec<i8>>(opts).await,
ScalarType::I16 => read_next_values::<Vec<i16>>(opts).await,
ScalarType::I32 => read_next_values::<Vec<i32>>(opts).await,
ScalarType::I64 => read_next_values::<Vec<i64>>(opts).await,
ScalarType::F32 => read_next_values::<Vec<f32>>(opts).await,
ScalarType::F64 => read_next_values::<Vec<f64>>(opts).await,
ScalarType::BOOL => read_next_values::<Vec<bool>>(opts).await,
ScalarType::STRING => {
warn!("read array string not yet supported");
err::todoval()
}
ScalarType::Enum => read_next_values::<Vec<String>>(opts).await,
ScalarType::ChannelStatus => {
warn!("read array channel status not yet supported");
err::todoval()
}
},
_ => {
error!("TODO ReadValues add more types");
err::todoval()
}
};
ret.map_err(Error::from)
};
Box::pin(fut)
}
}
impl Stream for EventsStreamRt {
type Item = Result<ChannelEvents, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
if let Some(item) = self.out.pop_front() {
item.verify();
if let Some(item_min) = item.ts_min() {
if item_min < self.ts_seen_max {
debug!("ordering error A {} {}", item_min, self.ts_seen_max);
}
}
if let Some(item_max) = item.ts_max() {
if item_max < self.ts_seen_max {
debug!("ordering error B {} {}", item_max, self.ts_seen_max);
} else {
self.ts_seen_max = item_max;
}
}
debug!("deliver item {}", item.output_info());
break Ready(Some(Ok(ChannelEvents::Events(item))));
}
break match &mut self.state {
State::Begin => {
let msp_inp = unsafe {
let ptr = (&mut self.msp_inp) as *mut MspStreamRt;
&mut *ptr
};
let fut = Box::pin(msp_inp.next());
self.state = State::Reading(Reading {
scyqueue: self.scyqueue.clone(),
reading_state: ReadingState::FetchMsp(FetchMsp { fut }),
});
continue;
}
State::Reading(st) => match &mut st.reading_state {
ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) {
Ready(Some(Ok(ts))) => {
let scyqueue = st.scyqueue.clone();
let fut = self.make_read_events_fut(ts, scyqueue);
if let State::Reading(st) = &mut self.state {
st.reading_state = ReadingState::FetchEvents(FetchEvents { fut });
continue;
} else {
Ready(Some(Err(Error::Logic)))
}
}
Ready(Some(Err(e))) => Ready(Some(Err(e.into()))),
Ready(None) => {
self.state = State::InputDone;
continue;
}
Pending => Pending,
},
ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) {
Ready(Ok(x)) => {
self.out.push_back(x);
let msp_inp = unsafe {
let ptr = (&mut self.msp_inp) as *mut MspStreamRt;
&mut *ptr
};
let fut = Box::pin(msp_inp.next());
if let State::Reading(st) = &mut self.state {
st.reading_state = ReadingState::FetchMsp(FetchMsp { fut });
continue;
} else {
Ready(Some(Err(Error::Logic)))
}
}
Ready(Err(e)) => Ready(Some(Err(e.into()))),
Pending => Pending,
},
},
State::InputDone => {
if self.out.len() == 0 {
Ready(None)
} else {
continue;
}
}
};
}
}
}
fn trait_assert<T>(_: T)
where
T: Stream + Unpin + Send,
{
}
#[allow(unused)]
fn trait_assert_try() {
let x: EventsStreamRt = phantomval();
trait_assert(x);
}
fn phantomval<T>() -> T {
panic!()
}

View File

@@ -0,0 +1,22 @@
use futures_util::Stream;
use items_0::Events;
use items_0::WithLen;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
pub struct FirstBefore<S> {
inp: S,
}
impl<S, T, E> Stream for FirstBefore<S>
where
S: Stream<Item = Result<T, E>> + Unpin,
T: Events,
{
type Item = <S as Stream>::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
todo!()
}
}

View File

@@ -0,0 +1,423 @@
use super::events::EventsStreamRt;
use super::nonempty::NonEmpty;
use crate::range::ScyllaSeriesRange;
use crate::worker::ScyllaQueue;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items_2::channelevents::ChannelEvents;
use items_2::merger::Mergeable;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::range::evrange::SeriesRange;
use netpod::ttl::RetentionTime;
use netpod::ScalarType;
use netpod::Shape;
use series::SeriesId;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
#[derive(Debug, ThisError)]
pub enum Error {
Events(#[from] crate::events2::events::Error),
Logic,
}
enum Resolvable<F>
where
F: Future,
{
Future(F),
Output(<F as Future>::Output),
Taken,
}
impl<F> Resolvable<F>
where
F: Future,
{
fn unresolved(&self) -> bool {
match self {
Resolvable::Future(_) => true,
Resolvable::Output(_) => false,
Resolvable::Taken => false,
}
}
fn take(&mut self) -> Option<<F as Future>::Output> {
let x = std::mem::replace(self, Resolvable::Taken);
match x {
Resolvable::Future(_) => None,
Resolvable::Output(x) => Some(x),
Resolvable::Taken => None,
}
}
}
impl<F> Future for Resolvable<F>
where
F: Future + Unpin,
{
type Output = <F as Future>::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<<F as Future>::Output> {
match unsafe { self.get_unchecked_mut() } {
Resolvable::Future(fut) => fut.poll_unpin(cx),
Resolvable::Output(_) => panic!(),
Resolvable::Taken => panic!(),
}
}
}
struct ReadEvents {
fut: Pin<Box<dyn Future<Output = Option<Result<ChannelEvents, crate::events2::events::Error>>> + Send>>,
}
enum State {
Begin,
FetchFirstSt(ReadEvents),
FetchFirstMt(ReadEvents),
FetchFirstLt(ReadEvents),
ReadingLt(
Option<ReadEvents>,
VecDeque<ChannelEvents>,
Option<Box<NonEmpty<EventsStreamRt>>>,
),
ReadingMt(
Option<ReadEvents>,
VecDeque<ChannelEvents>,
Option<Box<NonEmpty<EventsStreamRt>>>,
),
ReadingSt(
Option<ReadEvents>,
VecDeque<ChannelEvents>,
Option<Box<NonEmpty<EventsStreamRt>>>,
),
Error,
}
pub struct MergeRts {
series: SeriesId,
scalar_type: ScalarType,
shape: Shape,
range: ScyllaSeriesRange,
with_values: bool,
scyqueue: ScyllaQueue,
inp_st: Option<Box<NonEmpty<EventsStreamRt>>>,
inp_mt: Option<Box<NonEmpty<EventsStreamRt>>>,
inp_lt: Option<Box<NonEmpty<EventsStreamRt>>>,
state: State,
buf_st: VecDeque<ChannelEvents>,
buf_mt: VecDeque<ChannelEvents>,
buf_lt: VecDeque<ChannelEvents>,
out: VecDeque<ChannelEvents>,
}
impl MergeRts {
pub fn new(
series: SeriesId,
scalar_type: ScalarType,
shape: Shape,
range: ScyllaSeriesRange,
with_values: bool,
scyqueue: ScyllaQueue,
) -> Self {
Self {
series,
scalar_type,
shape,
range,
with_values,
scyqueue,
inp_st: None,
inp_mt: None,
inp_lt: None,
state: State::Begin,
buf_st: VecDeque::new(),
buf_mt: VecDeque::new(),
buf_lt: VecDeque::new(),
out: VecDeque::new(),
}
}
fn setup_first_st(&mut self) {
let inp = EventsStreamRt::new(
RetentionTime::Short,
self.series.clone(),
self.scalar_type.clone(),
self.shape.clone(),
self.range.clone(),
self.with_values,
self.scyqueue.clone(),
);
let inp = NonEmpty::new(inp);
self.inp_st = Some(Box::new(inp));
}
fn setup_first_mt(&mut self) {
let inp = EventsStreamRt::new(
RetentionTime::Medium,
self.series.clone(),
self.scalar_type.clone(),
self.shape.clone(),
Self::constrained_range(&self.range, &self.buf_st),
self.with_values,
self.scyqueue.clone(),
);
let inp = NonEmpty::new(inp);
self.inp_mt = Some(Box::new(inp));
}
fn setup_first_lt(&mut self) {
let inp = EventsStreamRt::new(
RetentionTime::Long,
self.series.clone(),
self.scalar_type.clone(),
self.shape.clone(),
Self::constrained_range(&self.range, &self.buf_mt),
self.with_values,
self.scyqueue.clone(),
);
let inp = NonEmpty::new(inp);
self.inp_lt = Some(Box::new(inp));
}
fn setup_read_st(&mut self) -> ReadEvents {
let stream = unsafe { &mut *(self.inp_st.as_mut().unwrap().as_mut() as *mut NonEmpty<EventsStreamRt>) };
let fut = Box::pin(stream.next());
ReadEvents { fut }
}
fn setup_read_mt(&mut self) -> ReadEvents {
let stream = unsafe { &mut *(self.inp_mt.as_mut().unwrap().as_mut() as *mut NonEmpty<EventsStreamRt>) };
let fut = Box::pin(stream.next());
ReadEvents { fut }
}
fn setup_read_lt(&mut self) -> ReadEvents {
let stream = unsafe { &mut *(self.inp_lt.as_mut().unwrap().as_mut() as *mut NonEmpty<EventsStreamRt>) };
let fut = Box::pin(stream.next());
ReadEvents { fut }
}
fn setup_read_any(inp: &mut Option<Box<NonEmpty<EventsStreamRt>>>) -> ReadEvents {
let stream = unsafe { &mut *(inp.as_mut().unwrap().as_mut() as *mut NonEmpty<EventsStreamRt>) };
let fut = Box::pin(stream.next());
ReadEvents { fut }
}
fn constrained_range(full: &ScyllaSeriesRange, buf: &VecDeque<ChannelEvents>) -> ScyllaSeriesRange {
if let Some(e) = buf.front() {
if let Some(ts) = e.ts_min() {
let nrange = NanoRange::from((ts, 0));
ScyllaSeriesRange::from(&SeriesRange::from(nrange))
} else {
debug!("no ts even though should not have empty buffers");
full.clone()
}
} else {
full.clone()
}
}
fn dummy(&mut self) -> bool {
if self.inp_lt.is_some() {
// *fut = Some(self.setup_read_lt());
true
} else {
false
}
}
}
impl Stream for MergeRts {
type Item = Result<ChannelEvents, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let mut out2 = VecDeque::new();
loop {
while let Some(x) = out2.pop_front() {
self.out.push_back(x);
}
if let Some(item) = self.out.pop_front() {
break Ready(Some(Ok(item)));
}
break match &mut self.state {
State::Begin => {
self.setup_first_st();
self.state = State::FetchFirstSt(self.setup_read_st());
continue;
}
State::FetchFirstSt(st2) => match st2.fut.poll_unpin(cx) {
Ready(Some(Ok(x))) => {
debug!("have first from ST");
self.buf_st.push_back(x);
self.setup_first_mt();
self.state = State::FetchFirstMt(self.setup_read_mt());
continue;
}
Ready(Some(Err(e))) => {
self.state = State::Error;
Ready(Some(Err(e.into())))
}
Ready(None) => {
debug!("no first from ST");
self.inp_st = None;
self.setup_first_mt();
self.state = State::FetchFirstMt(self.setup_read_mt());
continue;
}
Pending => Pending,
},
State::FetchFirstMt(st2) => match st2.fut.poll_unpin(cx) {
Ready(Some(Ok(x))) => {
debug!("have first from MT");
self.buf_mt.push_back(x);
self.setup_first_lt();
self.state = State::FetchFirstLt(self.setup_read_lt());
continue;
}
Ready(Some(Err(e))) => {
self.state = State::Error;
Ready(Some(Err(e.into())))
}
Ready(None) => {
debug!("no first from MT");
self.inp_mt = None;
self.setup_first_lt();
self.state = State::FetchFirstLt(self.setup_read_lt());
continue;
}
Pending => Pending,
},
State::FetchFirstLt(st2) => match st2.fut.poll_unpin(cx) {
Ready(Some(Ok(x))) => {
debug!("have first from LT");
self.buf_lt.push_back(x);
let buf = core::mem::replace(&mut self.buf_lt, VecDeque::new());
self.state = State::ReadingLt(None, buf, self.inp_lt.take());
continue;
}
Ready(Some(Err(e))) => {
self.state = State::Error;
Ready(Some(Err(e.into())))
}
Ready(None) => {
debug!("no first from LT");
self.inp_lt = None;
let buf = core::mem::replace(&mut self.buf_lt, VecDeque::new());
self.state = State::ReadingLt(None, buf, self.inp_lt.take());
continue;
}
Pending => Pending,
},
State::ReadingLt(fut, buf, inp) => {
if let Some(x) = buf.pop_front() {
out2.push_back(x);
continue;
} else if let Some(fut2) = fut {
match fut2.fut.poll_unpin(cx) {
Ready(Some(Ok(x))) => {
*fut = None;
buf.push_back(x);
continue;
}
Ready(Some(Err(e))) => {
*fut = None;
self.state = State::Error;
Ready(Some(Err(e.into())))
}
Ready(None) => {
*fut = None;
self.inp_lt = None;
continue;
}
Pending => Pending,
}
} else if inp.is_some() {
let buf = core::mem::replace(buf, VecDeque::new());
self.state = State::ReadingLt(Some(Self::setup_read_any(inp)), buf, inp.take());
// *fut = Some(self.setup_read_lt());
continue;
} else {
let buf = core::mem::replace(&mut self.buf_mt, VecDeque::new());
self.state = State::ReadingMt(None, buf, self.inp_mt.take());
continue;
}
}
State::ReadingMt(fut, buf, inp) => {
if let Some(x) = buf.pop_front() {
out2.push_back(x);
continue;
} else if let Some(fut2) = fut {
match fut2.fut.poll_unpin(cx) {
Ready(Some(Ok(x))) => {
*fut = None;
buf.push_back(x);
continue;
}
Ready(Some(Err(e))) => {
*fut = None;
self.state = State::Error;
Ready(Some(Err(e.into())))
}
Ready(None) => {
*fut = None;
self.inp_mt = None;
continue;
}
Pending => Pending,
}
} else if inp.is_some() {
let buf = core::mem::replace(buf, VecDeque::new());
self.state = State::ReadingMt(Some(Self::setup_read_any(inp)), buf, inp.take());
continue;
} else {
let buf = core::mem::replace(&mut self.buf_st, VecDeque::new());
self.state = State::ReadingSt(None, buf, self.inp_st.take());
continue;
}
}
State::ReadingSt(fut, buf, inp) => {
if let Some(x) = buf.pop_front() {
out2.push_back(x);
continue;
} else if let Some(fut2) = fut {
match fut2.fut.poll_unpin(cx) {
Ready(Some(Ok(x))) => {
*fut = None;
buf.push_back(x);
continue;
}
Ready(Some(Err(e))) => {
*fut = None;
self.state = State::Error;
Ready(Some(Err(e.into())))
}
Ready(None) => {
*fut = None;
self.inp_st = None;
continue;
}
Pending => Pending,
}
} else if inp.is_some() {
let buf = core::mem::replace(buf, VecDeque::new());
self.state = State::ReadingSt(Some(Self::setup_read_any(inp)), buf, inp.take());
continue;
} else {
debug!("fully done");
Ready(None)
}
}
State::Error => Ready(None),
};
}
}
}

View File

@@ -54,7 +54,7 @@ enum State {
}
#[pin_project::pin_project]
pub struct MspStream {
pub struct MspStreamRt {
rt: RetentionTime,
series: SeriesId,
range: ScyllaSeriesRange,
@@ -63,7 +63,7 @@ pub struct MspStream {
out: VecDeque<TsMs>,
}
impl MspStream {
impl MspStreamRt {
pub fn new(rt: RetentionTime, series: SeriesId, range: ScyllaSeriesRange, scyqueue: ScyllaQueue) -> Self {
let fut_bck = {
let scyqueue = scyqueue.clone();
@@ -93,7 +93,7 @@ impl MspStream {
}
}
impl Stream for MspStream {
impl Stream for MspStreamRt {
type Item = Result<TsMs, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
@@ -112,8 +112,7 @@ impl Stream for MspStream {
have_pending = true;
}
},
Resolvable::Output(_) => {}
Resolvable::Taken => {}
_ => {}
}
let rsv = &mut st.fut_fwd;
match rsv {
@@ -125,37 +124,28 @@ impl Stream for MspStream {
have_pending = true;
}
},
Resolvable::Output(_) => {}
Resolvable::Taken => {}
_ => {}
}
if have_pending {
Pending
} else {
let taken_bck = st.fut_bck.take();
let taken_fwd = st.fut_fwd.take();
if let Some(x) = taken_bck {
match x {
Ok(v) => {
for e in v {
self.out.push_back(e)
}
if let Some(x) = taken_fwd {
match x {
Ok(v) => {
for e in v {
self.out.push_back(e)
}
self.state = State::InputDone;
continue;
}
Err(e) => Ready(Some(Err(e.into()))),
self.state = State::InputDone;
if let (Some(taken_bck), Some(taken_fwd)) = (taken_bck, taken_fwd) {
match taken_bck {
Ok(v1) => match taken_fwd {
Ok(v2) => {
for e in v1 {
self.out.push_back(e)
}
} else {
Ready(Some(Err(Error::Logic)))
for e in v2 {
self.out.push_back(e)
}
continue;
}
}
Err(e) => Ready(Some(Err(e.into()))),
},
Err(e) => Ready(Some(Err(e.into()))),
}
} else {
@@ -183,10 +173,10 @@ where
#[allow(unused)]
fn trait_assert_try() {
let x: MspStream = todoval();
let x: MspStreamRt = phantomval();
trait_assert(x);
}
fn todoval<T>() -> T {
todo!()
fn phantomval<T>() -> T {
panic!()
}

View File

@@ -0,0 +1,42 @@
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::WithLen;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
pub struct NonEmpty<S> {
inp: S,
}
impl<S> NonEmpty<S> {
pub fn new(inp: S) -> Self {
Self { inp }
}
}
impl<S, T, E> Stream for NonEmpty<S>
where
S: Stream<Item = Result<T, E>> + Unpin,
T: WithLen,
{
type Item = <S as Stream>::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
break match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(x))) => {
if x.len() != 0 {
Ready(Some(Ok(x)))
} else {
continue;
}
}
Ready(Some(Err(e))) => Ready(Some(Err(e))),
Ready(None) => Ready(None),
Pending => Pending,
};
}
}
}

View File

@@ -159,7 +159,7 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
console_subscriber::init();
} else {
// Logging setup
let filter = tracing_subscriber::EnvFilter::builder()
let filter_1 = tracing_subscriber::EnvFilter::builder()
.with_default_directive(tracing::metadata::LevelFilter::INFO.into())
.from_env()
.map_err(|e| Error::with_msg_no_trace(format!("can not build tracing env filter {e}")))?;
@@ -168,9 +168,9 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
.with_default_directive(tracing::metadata::LevelFilter::INFO.into())
.from_env()
.map_err(|e| Error::with_msg_no_trace(format!("can not build tracing env filter {e}")))?;
let filter_3 = tracing_subscriber::filter::dynamic_filter_fn(|meta, ctx| {
/*let filter_3 = tracing_subscriber::filter::dynamic_filter_fn(|meta, ctx| {
if true {
return true;
return false;
}
if *meta.level() <= tracing::Level::TRACE {
if ["httpret", "scyllaconn"].contains(&meta.target()) {
@@ -207,7 +207,7 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
} else {
true
}
});
});*/
let fmt_layer = tracing_subscriber::fmt::Layer::new()
.with_writer(io::stderr)
.with_timer(timer)
@@ -215,9 +215,9 @@ fn tracing_init_inner(mode: TracingMode) -> Result<(), Error> {
.with_ansi(false)
.with_thread_names(true)
.event_format(formatter::FormatTxt)
.with_filter(filter_3)
// .with_filter(filter_3)
.with_filter(filter_2)
.with_filter(filter)
.with_filter(filter_1)
// .and_then(LogFilterLayer::new("lay1".into()))
// .and_then(LogFilterLayer::new("lay2".into()))
;