Reduce db connections, improve merge mt/lt
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
use crate::errconv::ErrConv;
|
||||
use err::Error;
|
||||
use netpod::log::*;
|
||||
use netpod::ScyllaConfig;
|
||||
use scylla::execution_profile::ExecutionProfileBuilder;
|
||||
use scylla::statement::Consistency;
|
||||
@@ -16,6 +17,7 @@ pub async fn create_scy_session(scyconf: &ScyllaConfig) -> Result<Arc<ScySession
|
||||
}
|
||||
|
||||
pub async fn create_scy_session_no_ks(scyconf: &ScyllaConfig) -> Result<ScySession, Error> {
|
||||
warn!("create_connection\n\n CREATING SCYLLA CONNECTION\n\n");
|
||||
let scy = scylla::SessionBuilder::new()
|
||||
.known_nodes(&scyconf.hosts)
|
||||
.default_execution_profile_handle(
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use crate::errconv::ErrConv;
|
||||
use crate::range::ScyllaSeriesRange;
|
||||
use crate::worker::ScyllaQueue;
|
||||
use err::thiserror;
|
||||
@@ -25,7 +24,6 @@ use netpod::TsNano;
|
||||
use scylla::frame::response::result::Row;
|
||||
use scylla::prepared_statement::PreparedStatement;
|
||||
use scylla::Session;
|
||||
use scylla::Session as ScySession;
|
||||
use series::SeriesId;
|
||||
use std::collections::VecDeque;
|
||||
use std::mem;
|
||||
@@ -271,7 +269,7 @@ pub(super) async fn find_ts_msp(
|
||||
range: ScyllaSeriesRange,
|
||||
bck: bool,
|
||||
stmts: &StmtsEvents,
|
||||
scy: &ScySession,
|
||||
scy: &Session,
|
||||
) -> Result<VecDeque<TsMs>, Error> {
|
||||
trace!("find_ts_msp series {:?} {:?} {:?} bck {}", rt, series, range, bck);
|
||||
if bck {
|
||||
@@ -286,7 +284,7 @@ async fn find_ts_msp_fwd(
|
||||
series: u64,
|
||||
range: ScyllaSeriesRange,
|
||||
stmts: &StmtsEvents,
|
||||
scy: &ScySession,
|
||||
scy: &Session,
|
||||
) -> Result<VecDeque<TsMs>, Error> {
|
||||
let mut ret = VecDeque::new();
|
||||
// TODO time range truncation can be handled better
|
||||
@@ -308,7 +306,7 @@ async fn find_ts_msp_bck(
|
||||
series: u64,
|
||||
range: ScyllaSeriesRange,
|
||||
stmts: &StmtsEvents,
|
||||
scy: &ScySession,
|
||||
scy: &Session,
|
||||
) -> Result<VecDeque<TsMs>, Error> {
|
||||
let mut ret = VecDeque::new();
|
||||
let params = (series as i64, range.beg().ms() as i64);
|
||||
@@ -324,7 +322,7 @@ async fn find_ts_msp_bck(
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
trait ValTy: Sized + 'static {
|
||||
pub(super) trait ValTy: Sized + 'static {
|
||||
type ScaTy: ScalarOps + std::default::Default;
|
||||
type ScyTy: scylla::cql_to_rust::FromCqlVal<scylla::frame::response::result::CqlValue>;
|
||||
type Container: Events + Appendable<Self>;
|
||||
@@ -499,7 +497,7 @@ where
|
||||
{
|
||||
// TODO could take scyqeue out of opts struct.
|
||||
let scyqueue = opts.scyqueue.clone();
|
||||
let futgen = Box::new(|scy: Arc<ScySession>, stmts: Arc<StmtsEvents>| {
|
||||
let futgen = Box::new(|scy: Arc<Session>, stmts: Arc<StmtsEvents>| {
|
||||
let fut = async {
|
||||
read_next_values_2::<ST>(opts, scy, stmts)
|
||||
.await
|
||||
@@ -513,7 +511,7 @@ where
|
||||
|
||||
async fn read_next_values_2<ST>(
|
||||
opts: ReadNextValuesOpts,
|
||||
scy: Arc<ScySession>,
|
||||
scy: Arc<Session>,
|
||||
stmts: Arc<StmtsEvents>,
|
||||
) -> Result<Box<dyn Events>, Error>
|
||||
where
|
||||
@@ -545,20 +543,6 @@ where
|
||||
ts_lsp_max,
|
||||
table_name,
|
||||
);
|
||||
let dir = "fwd";
|
||||
let qu_name = if opts.with_values {
|
||||
if ST::is_valueblob() {
|
||||
format!("array_{}_valueblobs_{}", ST::st_name(), dir)
|
||||
} else {
|
||||
format!("scalar_{}_values_{}", ST::st_name(), dir)
|
||||
}
|
||||
} else {
|
||||
if ST::is_valueblob() {
|
||||
format!("array_{}_timestamps_{}", ST::st_name(), dir)
|
||||
} else {
|
||||
format!("scalar_{}_timestamps_{}", ST::st_name(), dir)
|
||||
}
|
||||
};
|
||||
let qu = stmts
|
||||
.rt(&opts.rt)
|
||||
.lsp(!opts.fwd, opts.with_values)
|
||||
@@ -586,20 +570,6 @@ where
|
||||
DtNano::from_ns(0)
|
||||
};
|
||||
trace!("BCK ts_msp {} ts_lsp_max {} {}", ts_msp, ts_lsp_max, table_name,);
|
||||
let dir = "bck";
|
||||
let qu_name = if opts.with_values {
|
||||
if ST::is_valueblob() {
|
||||
format!("array_{}_valueblobs_{}", ST::st_name(), dir)
|
||||
} else {
|
||||
format!("scalar_{}_values_{}", ST::st_name(), dir)
|
||||
}
|
||||
} else {
|
||||
if ST::is_valueblob() {
|
||||
format!("array_{}_timestamps_{}", ST::st_name(), dir)
|
||||
} else {
|
||||
format!("scalar_{}_timestamps_{}", ST::st_name(), dir)
|
||||
}
|
||||
};
|
||||
let qu = stmts
|
||||
.rt(&opts.rt)
|
||||
.lsp(!opts.fwd, opts.with_values)
|
||||
@@ -829,7 +799,7 @@ pub struct EventsStreamScylla {
|
||||
}
|
||||
|
||||
impl EventsStreamScylla {
|
||||
pub fn new(
|
||||
pub fn _new(
|
||||
rt: RetentionTime,
|
||||
series: u64,
|
||||
range: ScyllaSeriesRange,
|
||||
@@ -990,16 +960,6 @@ impl EventsStreamScylla {
|
||||
}
|
||||
}
|
||||
|
||||
async fn find_ts_msp_via_queue(
|
||||
rt: RetentionTime,
|
||||
series: u64,
|
||||
range: ScyllaSeriesRange,
|
||||
bck: bool,
|
||||
scyqueue: ScyllaQueue,
|
||||
) -> Result<VecDeque<TsMs>, crate::worker::Error> {
|
||||
scyqueue.find_ts_msp(rt, series, range, bck).await
|
||||
}
|
||||
|
||||
impl Stream for EventsStreamScylla {
|
||||
type Item = Result<ChannelEvents, Error>;
|
||||
|
||||
@@ -1032,8 +992,9 @@ impl Stream for EventsStreamScylla {
|
||||
let series = self.series.clone();
|
||||
let range = self.range.clone();
|
||||
// TODO this no longer works, we miss the backwards part here
|
||||
let fut = find_ts_msp_via_queue(self.rt.clone(), series, range, false, self.scyqueue.clone());
|
||||
let fut = Box::pin(fut);
|
||||
// let fut = find_ts_msp_via_queue(self.rt.clone(), series, range, false, self.scyqueue.clone());
|
||||
// let fut = Box::pin(fut);
|
||||
let fut = todo!();
|
||||
self.state = FrState::FindMsp(fut);
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -52,6 +52,7 @@ enum State {
|
||||
Begin,
|
||||
Reading(Reading),
|
||||
InputDone,
|
||||
Done,
|
||||
}
|
||||
|
||||
pub struct EventsStreamRt {
|
||||
@@ -180,15 +181,45 @@ impl Stream for EventsStreamRt {
|
||||
use Poll::*;
|
||||
loop {
|
||||
if let Some(item) = self.out.pop_front() {
|
||||
item.verify();
|
||||
if !item.verify() {
|
||||
debug!("{}bad item {:?}", "\n\n--------------------------\n", item);
|
||||
self.state = State::Done;
|
||||
break Ready(Some(Err(Error::Logic)));
|
||||
}
|
||||
if let Some(item_min) = item.ts_min() {
|
||||
if item_min < self.range.beg().ns() {
|
||||
debug!(
|
||||
"{}out of range error A {} {:?}",
|
||||
"\n\n--------------------------\n", item_min, self.range
|
||||
);
|
||||
self.state = State::Done;
|
||||
break Ready(Some(Err(Error::Logic)));
|
||||
}
|
||||
if item_min < self.ts_seen_max {
|
||||
debug!("ordering error A {} {}", item_min, self.ts_seen_max);
|
||||
debug!(
|
||||
"{}ordering error A {} {}",
|
||||
"\n\n--------------------------\n", item_min, self.ts_seen_max
|
||||
);
|
||||
self.state = State::Done;
|
||||
break Ready(Some(Err(Error::Logic)));
|
||||
}
|
||||
}
|
||||
if let Some(item_max) = item.ts_max() {
|
||||
if item_max >= self.range.end().ns() {
|
||||
debug!(
|
||||
"{}out of range error B {} {:?}",
|
||||
"\n\n--------------------------\n", item_max, self.range
|
||||
);
|
||||
self.state = State::Done;
|
||||
break Ready(Some(Err(Error::Logic)));
|
||||
}
|
||||
if item_max < self.ts_seen_max {
|
||||
debug!("ordering error B {} {}", item_max, self.ts_seen_max);
|
||||
debug!(
|
||||
"{}ordering error B {} {}",
|
||||
"\n\n--------------------------\n", item_max, self.ts_seen_max
|
||||
);
|
||||
self.state = State::Done;
|
||||
break Ready(Some(Err(Error::Logic)));
|
||||
} else {
|
||||
self.ts_seen_max = item_max;
|
||||
}
|
||||
@@ -218,6 +249,7 @@ impl Stream for EventsStreamRt {
|
||||
st.reading_state = ReadingState::FetchEvents(FetchEvents { fut });
|
||||
continue;
|
||||
} else {
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(Error::Logic)))
|
||||
}
|
||||
}
|
||||
@@ -240,10 +272,14 @@ impl Stream for EventsStreamRt {
|
||||
st.reading_state = ReadingState::FetchMsp(FetchMsp { fut });
|
||||
continue;
|
||||
} else {
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(Error::Logic)))
|
||||
}
|
||||
}
|
||||
Ready(Err(e)) => Ready(Some(Err(e.into()))),
|
||||
Ready(Err(e)) => {
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(e.into())))
|
||||
}
|
||||
Pending => Pending,
|
||||
},
|
||||
},
|
||||
@@ -254,6 +290,7 @@ impl Stream for EventsStreamRt {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
State::Done => Ready(None),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,22 +1,179 @@
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::Events;
|
||||
use items_0::WithLen;
|
||||
use items_2::merger::Mergeable;
|
||||
use netpod::log::*;
|
||||
use netpod::TsNano;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
pub struct FirstBefore<S> {
|
||||
inp: S,
|
||||
#[derive(Debug, ThisError)]
|
||||
pub enum Error {
|
||||
Unordered,
|
||||
Logic,
|
||||
Input(Box<dyn std::error::Error + Send>),
|
||||
}
|
||||
|
||||
impl<S, T, E> Stream for FirstBefore<S>
|
||||
where
|
||||
S: Stream<Item = Result<T, E>> + Unpin,
|
||||
T: Events,
|
||||
{
|
||||
type Item = <S as Stream>::Item;
|
||||
pub enum Output<T> {
|
||||
First(T, T),
|
||||
Bulk(T),
|
||||
}
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
todo!()
|
||||
enum State {
|
||||
Begin,
|
||||
Bulk,
|
||||
Done,
|
||||
}
|
||||
|
||||
pub struct FirstBeforeAndInside<S, T>
|
||||
where
|
||||
S: Stream + Unpin,
|
||||
T: Events + Mergeable + Unpin,
|
||||
{
|
||||
ts0: TsNano,
|
||||
inp: S,
|
||||
state: State,
|
||||
buf: Option<T>,
|
||||
}
|
||||
|
||||
impl<S, T> FirstBeforeAndInside<S, T>
|
||||
where
|
||||
S: Stream + Unpin,
|
||||
T: Events + Mergeable + Unpin,
|
||||
{
|
||||
pub fn new(inp: S, ts0: TsNano) -> Self {
|
||||
Self {
|
||||
ts0,
|
||||
inp,
|
||||
state: State::Begin,
|
||||
buf: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, T, E> Stream for FirstBeforeAndInside<S, T>
|
||||
where
|
||||
S: Stream<Item = Result<T, E>> + Unpin,
|
||||
T: Events + Mergeable + Unpin,
|
||||
E: std::error::Error + Send + 'static,
|
||||
{
|
||||
type Item = Result<Output<T>, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
loop {
|
||||
break match &self.state {
|
||||
State::Begin => match self.inp.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok(mut item))) => {
|
||||
// It is an invariant that we process ordered streams, but for robustness
|
||||
// verify the batch item again:
|
||||
if item.verify() != true {
|
||||
self.state = State::Done;
|
||||
let e = Error::Unordered;
|
||||
Ready(Some(Err(e)))
|
||||
} else {
|
||||
// Separate events into before and bulk
|
||||
let tss = item.tss();
|
||||
let pp = tss.partition_point(|&x| x < self.ts0.ns());
|
||||
if pp >= tss.len() {
|
||||
// all entries are before
|
||||
if self.buf.is_none() {
|
||||
self.buf = Some(item.new_empty());
|
||||
}
|
||||
match item.drain_into_evs(self.buf.as_mut().unwrap(), (0, item.len())) {
|
||||
Ok(()) => {
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(Error::Input(Box::new(e)))))
|
||||
}
|
||||
}
|
||||
} else if pp == 0 {
|
||||
// all entries are bulk
|
||||
debug!("transition immediately to bulk");
|
||||
self.state = State::Bulk;
|
||||
let o1 = core::mem::replace(&mut self.buf, Some(item.new_empty()))
|
||||
.unwrap_or_else(|| item.new_empty());
|
||||
Ready(Some(Ok(Output::First(o1, item))))
|
||||
} else {
|
||||
// mixed
|
||||
match item.drain_into_evs(self.buf.as_mut().unwrap(), (0, pp)) {
|
||||
Ok(()) => {
|
||||
debug!("transition with mixed to bulk");
|
||||
self.state = State::Bulk;
|
||||
let o1 = core::mem::replace(&mut self.buf, Some(item.new_empty()))
|
||||
.unwrap_or_else(|| item.new_empty());
|
||||
Ready(Some(Ok(Output::First(o1, item))))
|
||||
}
|
||||
Err(e) => {
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(Error::Input(Box::new(e)))))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ready(Some(Err(e))) => {
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(Error::Input(Box::new(e)))))
|
||||
}
|
||||
Ready(None) => {
|
||||
self.state = State::Done;
|
||||
Ready(None)
|
||||
}
|
||||
Pending => Pending,
|
||||
},
|
||||
State::Bulk => {
|
||||
if self.buf.as_ref().map_or(0, |x| x.len()) != 0 {
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(Error::Logic)))
|
||||
} else {
|
||||
match self.inp.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok(item))) => {
|
||||
if item.verify() != true {
|
||||
self.state = State::Done;
|
||||
let e = Error::Unordered;
|
||||
Ready(Some(Err(e)))
|
||||
} else {
|
||||
debug!("output bulk item len {}", item.len());
|
||||
Ready(Some(Ok(Output::Bulk(item))))
|
||||
}
|
||||
}
|
||||
Ready(Some(Err(e))) => {
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(Error::Input(Box::new(e)))))
|
||||
}
|
||||
Ready(None) => {
|
||||
debug!("in bulk, input done");
|
||||
self.state = State::Done;
|
||||
Ready(None)
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
State::Done => Ready(None),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn trait_assert<T>(_: T)
|
||||
where
|
||||
T: Stream + Unpin + Send,
|
||||
{
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
fn trait_assert_try() {
|
||||
let x: FirstBeforeAndInside<super::events::EventsStreamRt, items_2::channelevents::ChannelEvents> = phantomval();
|
||||
trait_assert(x);
|
||||
}
|
||||
|
||||
fn phantomval<T>() -> T {
|
||||
panic!()
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use super::events::EventsStreamRt;
|
||||
use super::nonempty::NonEmpty;
|
||||
use super::firstbefore::FirstBeforeAndInside;
|
||||
use crate::events2::firstbefore;
|
||||
use crate::range::ScyllaSeriesRange;
|
||||
use crate::worker::ScyllaQueue;
|
||||
use err::thiserror;
|
||||
@@ -8,6 +9,7 @@ use futures_util::Future;
|
||||
use futures_util::FutureExt;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::WithLen;
|
||||
use items_2::channelevents::ChannelEvents;
|
||||
use items_2::merger::Mergeable;
|
||||
use netpod::log::*;
|
||||
@@ -24,6 +26,7 @@ use std::task::Poll;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
pub enum Error {
|
||||
Input(#[from] crate::events2::firstbefore::Error),
|
||||
Events(#[from] crate::events2::events::Error),
|
||||
Logic,
|
||||
}
|
||||
@@ -74,8 +77,11 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
type TI = FirstBeforeAndInside<EventsStreamRt, ChannelEvents>;
|
||||
type INPI = Result<crate::events2::firstbefore::Output<ChannelEvents>, crate::events2::firstbefore::Error>;
|
||||
|
||||
struct ReadEvents {
|
||||
fut: Pin<Box<dyn Future<Output = Option<Result<ChannelEvents, crate::events2::events::Error>>> + Send>>,
|
||||
fut: Pin<Box<dyn Future<Output = Option<INPI>> + Send>>,
|
||||
}
|
||||
|
||||
enum State {
|
||||
@@ -83,22 +89,10 @@ enum State {
|
||||
FetchFirstSt(ReadEvents),
|
||||
FetchFirstMt(ReadEvents),
|
||||
FetchFirstLt(ReadEvents),
|
||||
ReadingLt(
|
||||
Option<ReadEvents>,
|
||||
VecDeque<ChannelEvents>,
|
||||
Option<Box<NonEmpty<EventsStreamRt>>>,
|
||||
),
|
||||
ReadingMt(
|
||||
Option<ReadEvents>,
|
||||
VecDeque<ChannelEvents>,
|
||||
Option<Box<NonEmpty<EventsStreamRt>>>,
|
||||
),
|
||||
ReadingSt(
|
||||
Option<ReadEvents>,
|
||||
VecDeque<ChannelEvents>,
|
||||
Option<Box<NonEmpty<EventsStreamRt>>>,
|
||||
),
|
||||
Error,
|
||||
ReadingLt(Option<ReadEvents>, VecDeque<ChannelEvents>, Option<Box<TI>>),
|
||||
ReadingMt(Option<ReadEvents>, VecDeque<ChannelEvents>, Option<Box<TI>>),
|
||||
ReadingSt(Option<ReadEvents>, VecDeque<ChannelEvents>, Option<Box<TI>>),
|
||||
Done,
|
||||
}
|
||||
|
||||
pub struct MergeRts {
|
||||
@@ -106,16 +100,20 @@ pub struct MergeRts {
|
||||
scalar_type: ScalarType,
|
||||
shape: Shape,
|
||||
range: ScyllaSeriesRange,
|
||||
range_mt: ScyllaSeriesRange,
|
||||
range_lt: ScyllaSeriesRange,
|
||||
with_values: bool,
|
||||
scyqueue: ScyllaQueue,
|
||||
inp_st: Option<Box<NonEmpty<EventsStreamRt>>>,
|
||||
inp_mt: Option<Box<NonEmpty<EventsStreamRt>>>,
|
||||
inp_lt: Option<Box<NonEmpty<EventsStreamRt>>>,
|
||||
inp_st: Option<Box<TI>>,
|
||||
inp_mt: Option<Box<TI>>,
|
||||
inp_lt: Option<Box<TI>>,
|
||||
state: State,
|
||||
buf_st: VecDeque<ChannelEvents>,
|
||||
buf_mt: VecDeque<ChannelEvents>,
|
||||
buf_lt: VecDeque<ChannelEvents>,
|
||||
out: VecDeque<ChannelEvents>,
|
||||
buf_before: Option<ChannelEvents>,
|
||||
ts_seen_max: u64,
|
||||
}
|
||||
|
||||
impl MergeRts {
|
||||
@@ -131,6 +129,8 @@ impl MergeRts {
|
||||
series,
|
||||
scalar_type,
|
||||
shape,
|
||||
range_mt: range.clone(),
|
||||
range_lt: range.clone(),
|
||||
range,
|
||||
with_values,
|
||||
scyqueue,
|
||||
@@ -142,79 +142,101 @@ impl MergeRts {
|
||||
buf_mt: VecDeque::new(),
|
||||
buf_lt: VecDeque::new(),
|
||||
out: VecDeque::new(),
|
||||
buf_before: None,
|
||||
ts_seen_max: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn setup_first_st(&mut self) {
|
||||
let rt = RetentionTime::Short;
|
||||
let limbuf = &VecDeque::new();
|
||||
let inpdst = &mut self.inp_st;
|
||||
let range = Self::constrained_range(&self.range, limbuf);
|
||||
debug!("setup_first_st constrained beg {}", range.beg().ns());
|
||||
let tsbeg = range.beg();
|
||||
let inp = EventsStreamRt::new(
|
||||
RetentionTime::Short,
|
||||
rt,
|
||||
self.series.clone(),
|
||||
self.scalar_type.clone(),
|
||||
self.shape.clone(),
|
||||
self.range.clone(),
|
||||
range,
|
||||
self.with_values,
|
||||
self.scyqueue.clone(),
|
||||
);
|
||||
let inp = NonEmpty::new(inp);
|
||||
self.inp_st = Some(Box::new(inp));
|
||||
let inp = TI::new(inp, tsbeg);
|
||||
*inpdst = Some(Box::new(inp));
|
||||
}
|
||||
|
||||
fn setup_first_mt(&mut self) {
|
||||
let rt = RetentionTime::Medium;
|
||||
let limbuf = &self.buf_st;
|
||||
let inpdst = &mut self.inp_mt;
|
||||
let range = Self::constrained_range(&self.range_mt, limbuf);
|
||||
self.range_lt = range.clone();
|
||||
debug!("setup_first_mt constrained beg {}", range.beg().ns());
|
||||
let tsbeg = range.beg();
|
||||
let inp = EventsStreamRt::new(
|
||||
RetentionTime::Medium,
|
||||
rt,
|
||||
self.series.clone(),
|
||||
self.scalar_type.clone(),
|
||||
self.shape.clone(),
|
||||
Self::constrained_range(&self.range, &self.buf_st),
|
||||
range,
|
||||
self.with_values,
|
||||
self.scyqueue.clone(),
|
||||
);
|
||||
let inp = NonEmpty::new(inp);
|
||||
self.inp_mt = Some(Box::new(inp));
|
||||
let inp = TI::new(inp, tsbeg);
|
||||
*inpdst = Some(Box::new(inp));
|
||||
}
|
||||
|
||||
fn setup_first_lt(&mut self) {
|
||||
let rt = RetentionTime::Long;
|
||||
let limbuf = &self.buf_mt;
|
||||
let inpdst = &mut self.inp_lt;
|
||||
let range = Self::constrained_range(&self.range_lt, limbuf);
|
||||
debug!("setup_first_lt constrained beg {}", range.beg().ns());
|
||||
let tsbeg = range.beg();
|
||||
let inp = EventsStreamRt::new(
|
||||
RetentionTime::Long,
|
||||
rt,
|
||||
self.series.clone(),
|
||||
self.scalar_type.clone(),
|
||||
self.shape.clone(),
|
||||
Self::constrained_range(&self.range, &self.buf_mt),
|
||||
range,
|
||||
self.with_values,
|
||||
self.scyqueue.clone(),
|
||||
);
|
||||
let inp = NonEmpty::new(inp);
|
||||
self.inp_lt = Some(Box::new(inp));
|
||||
let inp = TI::new(inp, tsbeg);
|
||||
*inpdst = Some(Box::new(inp));
|
||||
}
|
||||
|
||||
fn setup_read_st(&mut self) -> ReadEvents {
|
||||
let stream = unsafe { &mut *(self.inp_st.as_mut().unwrap().as_mut() as *mut NonEmpty<EventsStreamRt>) };
|
||||
let stream = unsafe { &mut *(self.inp_st.as_mut().unwrap().as_mut() as *mut TI) };
|
||||
let fut = Box::pin(stream.next());
|
||||
ReadEvents { fut }
|
||||
}
|
||||
|
||||
fn setup_read_mt(&mut self) -> ReadEvents {
|
||||
let stream = unsafe { &mut *(self.inp_mt.as_mut().unwrap().as_mut() as *mut NonEmpty<EventsStreamRt>) };
|
||||
let stream = unsafe { &mut *(self.inp_mt.as_mut().unwrap().as_mut() as *mut TI) };
|
||||
let fut = Box::pin(stream.next());
|
||||
ReadEvents { fut }
|
||||
}
|
||||
|
||||
fn setup_read_lt(&mut self) -> ReadEvents {
|
||||
let stream = unsafe { &mut *(self.inp_lt.as_mut().unwrap().as_mut() as *mut NonEmpty<EventsStreamRt>) };
|
||||
let stream = unsafe { &mut *(self.inp_lt.as_mut().unwrap().as_mut() as *mut TI) };
|
||||
let fut = Box::pin(stream.next());
|
||||
ReadEvents { fut }
|
||||
}
|
||||
|
||||
fn setup_read_any(inp: &mut Option<Box<NonEmpty<EventsStreamRt>>>) -> ReadEvents {
|
||||
let stream = unsafe { &mut *(inp.as_mut().unwrap().as_mut() as *mut NonEmpty<EventsStreamRt>) };
|
||||
fn setup_read_any(inp: &mut Option<Box<TI>>) -> ReadEvents {
|
||||
let stream = unsafe { &mut *(inp.as_mut().unwrap().as_mut() as *mut TI) };
|
||||
let fut = Box::pin(stream.next());
|
||||
ReadEvents { fut }
|
||||
}
|
||||
|
||||
fn constrained_range(full: &ScyllaSeriesRange, buf: &VecDeque<ChannelEvents>) -> ScyllaSeriesRange {
|
||||
debug!("constrained_range {:?} {:?}", full, buf.front());
|
||||
if let Some(e) = buf.front() {
|
||||
if let Some(ts) = e.ts_min() {
|
||||
let nrange = NanoRange::from((ts, 0));
|
||||
let nrange = NanoRange::from((full.beg().ns(), ts));
|
||||
ScyllaSeriesRange::from(&SeriesRange::from(nrange))
|
||||
} else {
|
||||
debug!("no ts even though should not have empty buffers");
|
||||
@@ -225,12 +247,40 @@ impl MergeRts {
|
||||
}
|
||||
}
|
||||
|
||||
fn dummy(&mut self) -> bool {
|
||||
if self.inp_lt.is_some() {
|
||||
// *fut = Some(self.setup_read_lt());
|
||||
true
|
||||
} else {
|
||||
false
|
||||
fn handle_first_st(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) {
|
||||
Self::move_latest_to_before_buf(&mut before, &mut self.buf_before);
|
||||
self.buf_st.push_back(bulk);
|
||||
self.setup_first_mt();
|
||||
self.state = State::FetchFirstMt(self.setup_read_mt());
|
||||
}
|
||||
|
||||
fn handle_first_mt(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) {
|
||||
Self::move_latest_to_before_buf(&mut before, &mut self.buf_before);
|
||||
self.buf_mt.push_back(bulk);
|
||||
self.setup_first_lt();
|
||||
self.state = State::FetchFirstLt(self.setup_read_lt());
|
||||
}
|
||||
|
||||
fn handle_first_lt(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) {
|
||||
Self::move_latest_to_before_buf(&mut before, &mut self.buf_before);
|
||||
self.buf_lt.push_back(bulk);
|
||||
let buf = core::mem::replace(&mut self.buf_lt, VecDeque::new());
|
||||
self.state = State::ReadingLt(None, buf, self.inp_lt.take());
|
||||
}
|
||||
|
||||
fn move_latest_to_before_buf(before: &mut ChannelEvents, buf: &mut Option<ChannelEvents>) {
|
||||
if buf.is_none() {
|
||||
*buf = Some(before.new_empty());
|
||||
}
|
||||
let buf = buf.as_mut().unwrap();
|
||||
if let Some(tsn) = before.ts_max() {
|
||||
if let Some(tse) = buf.ts_max() {
|
||||
if tsn > tse {
|
||||
let n = before.len();
|
||||
buf.clear();
|
||||
before.drain_into(buf, (n - 1, n)).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -246,6 +296,33 @@ impl Stream for MergeRts {
|
||||
self.out.push_back(x);
|
||||
}
|
||||
if let Some(item) = self.out.pop_front() {
|
||||
debug!("emit item {} {:?}", items_0::Events::verify(&item), item);
|
||||
if items_0::Events::verify(&item) != true {
|
||||
debug!("{}bad item {:?}", "\n\n--------------------------\n", item);
|
||||
self.state = State::Done;
|
||||
}
|
||||
if let Some(item_min) = item.ts_min() {
|
||||
if item_min < self.ts_seen_max {
|
||||
debug!(
|
||||
"{}ordering error A {} {}",
|
||||
"\n\n--------------------------\n", item_min, self.ts_seen_max
|
||||
);
|
||||
self.state = State::Done;
|
||||
break Ready(Some(Err(Error::Logic)));
|
||||
}
|
||||
}
|
||||
if let Some(item_max) = item.ts_max() {
|
||||
if item_max < self.ts_seen_max {
|
||||
debug!(
|
||||
"{}ordering error B {} {}",
|
||||
"\n\n--------------------------\n", item_max, self.ts_seen_max
|
||||
);
|
||||
self.state = State::Done;
|
||||
break Ready(Some(Err(Error::Logic)));
|
||||
} else {
|
||||
self.ts_seen_max = item_max;
|
||||
}
|
||||
}
|
||||
break Ready(Some(Ok(item)));
|
||||
}
|
||||
break match &mut self.state {
|
||||
@@ -255,15 +332,20 @@ impl Stream for MergeRts {
|
||||
continue;
|
||||
}
|
||||
State::FetchFirstSt(st2) => match st2.fut.poll_unpin(cx) {
|
||||
Ready(Some(Ok(x))) => {
|
||||
debug!("have first from ST");
|
||||
self.buf_st.push_back(x);
|
||||
self.setup_first_mt();
|
||||
self.state = State::FetchFirstMt(self.setup_read_mt());
|
||||
continue;
|
||||
}
|
||||
Ready(Some(Ok(x))) => match x {
|
||||
firstbefore::Output::First(before, bulk) => {
|
||||
debug!("have first from ST");
|
||||
self.handle_first_st(before, bulk);
|
||||
continue;
|
||||
}
|
||||
firstbefore::Output::Bulk(_) => {
|
||||
self.state = State::Done;
|
||||
let e = Error::Logic;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
},
|
||||
Ready(Some(Err(e))) => {
|
||||
self.state = State::Error;
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(e.into())))
|
||||
}
|
||||
Ready(None) => {
|
||||
@@ -276,15 +358,20 @@ impl Stream for MergeRts {
|
||||
Pending => Pending,
|
||||
},
|
||||
State::FetchFirstMt(st2) => match st2.fut.poll_unpin(cx) {
|
||||
Ready(Some(Ok(x))) => {
|
||||
debug!("have first from MT");
|
||||
self.buf_mt.push_back(x);
|
||||
self.setup_first_lt();
|
||||
self.state = State::FetchFirstLt(self.setup_read_lt());
|
||||
continue;
|
||||
}
|
||||
Ready(Some(Ok(x))) => match x {
|
||||
firstbefore::Output::First(before, bulk) => {
|
||||
debug!("have first from MT");
|
||||
self.handle_first_mt(before, bulk);
|
||||
continue;
|
||||
}
|
||||
firstbefore::Output::Bulk(_) => {
|
||||
self.state = State::Done;
|
||||
let e = Error::Logic;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
},
|
||||
Ready(Some(Err(e))) => {
|
||||
self.state = State::Error;
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(e.into())))
|
||||
}
|
||||
Ready(None) => {
|
||||
@@ -297,15 +384,20 @@ impl Stream for MergeRts {
|
||||
Pending => Pending,
|
||||
},
|
||||
State::FetchFirstLt(st2) => match st2.fut.poll_unpin(cx) {
|
||||
Ready(Some(Ok(x))) => {
|
||||
debug!("have first from LT");
|
||||
self.buf_lt.push_back(x);
|
||||
let buf = core::mem::replace(&mut self.buf_lt, VecDeque::new());
|
||||
self.state = State::ReadingLt(None, buf, self.inp_lt.take());
|
||||
continue;
|
||||
}
|
||||
Ready(Some(Ok(x))) => match x {
|
||||
firstbefore::Output::First(before, bulk) => {
|
||||
debug!("have first from LT");
|
||||
self.handle_first_lt(before, bulk);
|
||||
continue;
|
||||
}
|
||||
firstbefore::Output::Bulk(_) => {
|
||||
self.state = State::Done;
|
||||
let e = Error::Logic;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
},
|
||||
Ready(Some(Err(e))) => {
|
||||
self.state = State::Error;
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(e.into())))
|
||||
}
|
||||
Ready(None) => {
|
||||
@@ -325,17 +417,26 @@ impl Stream for MergeRts {
|
||||
match fut2.fut.poll_unpin(cx) {
|
||||
Ready(Some(Ok(x))) => {
|
||||
*fut = None;
|
||||
buf.push_back(x);
|
||||
continue;
|
||||
match x {
|
||||
firstbefore::Output::Bulk(x) => {
|
||||
buf.push_back(x);
|
||||
continue;
|
||||
}
|
||||
firstbefore::Output::First(_, _) => {
|
||||
self.state = State::Done;
|
||||
let e = Error::Logic;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
}
|
||||
}
|
||||
Ready(Some(Err(e))) => {
|
||||
*fut = None;
|
||||
self.state = State::Error;
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(e.into())))
|
||||
}
|
||||
Ready(None) => {
|
||||
*fut = None;
|
||||
self.inp_lt = None;
|
||||
*inp = None;
|
||||
continue;
|
||||
}
|
||||
Pending => Pending,
|
||||
@@ -343,9 +444,9 @@ impl Stream for MergeRts {
|
||||
} else if inp.is_some() {
|
||||
let buf = core::mem::replace(buf, VecDeque::new());
|
||||
self.state = State::ReadingLt(Some(Self::setup_read_any(inp)), buf, inp.take());
|
||||
// *fut = Some(self.setup_read_lt());
|
||||
continue;
|
||||
} else {
|
||||
debug!("transition ReadingLt to ReadingMt");
|
||||
let buf = core::mem::replace(&mut self.buf_mt, VecDeque::new());
|
||||
self.state = State::ReadingMt(None, buf, self.inp_mt.take());
|
||||
continue;
|
||||
@@ -359,17 +460,26 @@ impl Stream for MergeRts {
|
||||
match fut2.fut.poll_unpin(cx) {
|
||||
Ready(Some(Ok(x))) => {
|
||||
*fut = None;
|
||||
buf.push_back(x);
|
||||
continue;
|
||||
match x {
|
||||
firstbefore::Output::Bulk(x) => {
|
||||
buf.push_back(x);
|
||||
continue;
|
||||
}
|
||||
firstbefore::Output::First(_, _) => {
|
||||
self.state = State::Done;
|
||||
let e = Error::Logic;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
}
|
||||
}
|
||||
Ready(Some(Err(e))) => {
|
||||
*fut = None;
|
||||
self.state = State::Error;
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(e.into())))
|
||||
}
|
||||
Ready(None) => {
|
||||
*fut = None;
|
||||
self.inp_mt = None;
|
||||
*inp = None;
|
||||
continue;
|
||||
}
|
||||
Pending => Pending,
|
||||
@@ -379,6 +489,7 @@ impl Stream for MergeRts {
|
||||
self.state = State::ReadingMt(Some(Self::setup_read_any(inp)), buf, inp.take());
|
||||
continue;
|
||||
} else {
|
||||
debug!("transition ReadingMt to ReadingSt");
|
||||
let buf = core::mem::replace(&mut self.buf_st, VecDeque::new());
|
||||
self.state = State::ReadingSt(None, buf, self.inp_st.take());
|
||||
continue;
|
||||
@@ -392,17 +503,26 @@ impl Stream for MergeRts {
|
||||
match fut2.fut.poll_unpin(cx) {
|
||||
Ready(Some(Ok(x))) => {
|
||||
*fut = None;
|
||||
buf.push_back(x);
|
||||
continue;
|
||||
match x {
|
||||
firstbefore::Output::Bulk(x) => {
|
||||
buf.push_back(x);
|
||||
continue;
|
||||
}
|
||||
firstbefore::Output::First(_, _) => {
|
||||
self.state = State::Done;
|
||||
let e = Error::Logic;
|
||||
Ready(Some(Err(e)))
|
||||
}
|
||||
}
|
||||
}
|
||||
Ready(Some(Err(e))) => {
|
||||
*fut = None;
|
||||
self.state = State::Error;
|
||||
self.state = State::Done;
|
||||
Ready(Some(Err(e.into())))
|
||||
}
|
||||
Ready(None) => {
|
||||
*fut = None;
|
||||
self.inp_st = None;
|
||||
*inp = None;
|
||||
continue;
|
||||
}
|
||||
Pending => Pending,
|
||||
@@ -416,8 +536,24 @@ impl Stream for MergeRts {
|
||||
Ready(None)
|
||||
}
|
||||
}
|
||||
State::Error => Ready(None),
|
||||
State::Done => Ready(None),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn trait_assert<T>(_: T)
|
||||
where
|
||||
T: Stream + Unpin + Send,
|
||||
{
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
fn trait_assert_try() {
|
||||
let x: MergeRts = phantomval();
|
||||
trait_assert(x);
|
||||
}
|
||||
|
||||
fn phantomval<T>() -> T {
|
||||
panic!()
|
||||
}
|
||||
|
||||
@@ -10,3 +10,12 @@ pub mod worker;
|
||||
|
||||
pub use scylla;
|
||||
pub use series::SeriesId;
|
||||
|
||||
pub async fn test_log() {
|
||||
use netpod::log::*;
|
||||
error!("------");
|
||||
warn!("------");
|
||||
info!("------");
|
||||
debug!("------");
|
||||
trace!("------");
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user