Simplify filter

This commit is contained in:
Dominik Werder
2024-08-18 14:07:01 +02:00
parent c16013ec81
commit bf3a4058ae
14 changed files with 643 additions and 51 deletions

View File

@@ -809,6 +809,13 @@ impl Mergeable for ChannelEvents {
}
}
}
fn tss(&self) -> Vec<netpod::TsMs> {
Events::tss(self)
.iter()
.map(|x| netpod::TsMs::from_ns_u64(*x))
.collect()
}
}
impl RangeOverlapInfo for ChannelEvents {

View File

@@ -256,6 +256,10 @@ impl Mergeable for EventFull {
}
None
}
fn tss(&self) -> Vec<netpod::TsMs> {
self.tss.iter().map(|x| netpod::TsMs::from_ns_u64(*x)).collect()
}
}
#[derive(Debug, ThisError, Serialize, Deserialize)]

View File

@@ -112,6 +112,12 @@ impl<STY> EventsDim0<STY> {
std::any::type_name::<Self>()
}
pub fn push_back(&mut self, ts: u64, pulse: u64, value: STY) {
self.tss.push_back(ts);
self.pulses.push_back(pulse);
self.values.push_back(value);
}
pub fn push_front(&mut self, ts: u64, pulse: u64, value: STY) {
self.tss.push_front(ts);
self.pulses.push_front(pulse);
@@ -858,11 +864,21 @@ impl<STY: ScalarOps> Events for EventsDim0<STY> {
fn output_info(&self) -> String {
let n2 = self.tss.len().max(1) - 1;
let min = if let Some(ts) = self.tss.get(0) {
TsNano::from_ns(*ts).fmt().to_string()
} else {
String::from("None")
};
let max = if let Some(ts) = self.tss.get(n2) {
TsNano::from_ns(*ts).fmt().to_string()
} else {
String::from("None")
};
format!(
"EventsDim0OutputInfo {{ len {}, ts_min {}, ts_max {} }}",
self.tss.len(),
self.tss.get(0).map_or(-1i64, |&x| x as i64),
self.tss.get(n2).map_or(-1i64, |&x| x as i64),
min,
max,
)
}

View File

@@ -189,6 +189,13 @@ impl Mergeable for Box<dyn Events> {
fn find_highest_index_lt(&self, ts: u64) -> Option<usize> {
self.as_ref().find_highest_index_lt_evs(ts)
}
fn tss(&self) -> Vec<netpod::TsMs> {
Events::tss(self)
.iter()
.map(|x| netpod::TsMs::from_ns_u64(*x))
.collect()
}
}
// TODO rename to `Typed`

View File

@@ -15,6 +15,7 @@ use items_0::Events;
use items_0::MergeError;
use items_0::WithLen;
use netpod::log::*;
use netpod::TsMs;
use std::collections::VecDeque;
use std::fmt;
use std::ops::ControlFlow;
@@ -53,6 +54,8 @@ pub trait Mergeable<Rhs = Self>: fmt::Debug + WithLen + ByteEstimate + Unpin {
fn find_lowest_index_gt(&self, ts: u64) -> Option<usize>;
fn find_lowest_index_ge(&self, ts: u64) -> Option<usize>;
fn find_highest_index_lt(&self, ts: u64) -> Option<usize>;
// TODO only for testing:
fn tss(&self) -> Vec<TsMs>;
}
type MergeInp<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;

View File

@@ -118,9 +118,15 @@ pub const DATETIME_FMT_9MS: &str = "%Y-%m-%dT%H:%M:%S.%9fZ";
const TEST_BACKEND: &str = "testbackend-00";
#[allow(non_upper_case_globals)]
pub const trigger: [&'static str; 1] = [
pub const trigger: [&'static str; 2] = [
//
"S30CB05-VMCP-A010:PRESSURE",
"ATSRF-CAV:TUN-DETUNING-REL-ACT",
];
pub const TRACE_SERIES_ID: [u64; 1] = [
//
4985969403507503043,
];
pub struct OnDrop<F>
@@ -1784,8 +1790,8 @@ impl TsNano {
Self(ns)
}
pub const fn from_ms(ns: u64) -> Self {
Self(1000000 * ns)
pub const fn from_ms(ms: u64) -> Self {
Self(1000000 * ms)
}
pub const fn ns(&self) -> u64 {
@@ -1829,6 +1835,10 @@ impl TsNano {
let x = tsunix.as_secs() * 1000000000 + tsunix.subsec_nanos() as u64;
Self::from_ns(x)
}
pub fn fmt(&self) -> TsNanoFmt {
TsNanoFmt { ts: self.clone() }
}
}
impl fmt::Debug for TsNano {
@@ -1853,6 +1863,25 @@ impl fmt::Display for TsNano {
}
}
pub struct TsNanoFmt {
ts: TsNano,
}
impl fmt::Display for TsNanoFmt {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
chrono::DateTime::from_timestamp_millis(self.ts.ms() as i64)
.unwrap()
.format(DATETIME_FMT_3MS)
.fmt(fmt)
}
}
impl fmt::Debug for TsNanoFmt {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(self, fmt)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd)]
pub struct PulseId(u64);
@@ -2677,6 +2706,20 @@ impl TsMs {
let lsp = DtMs(self.0 - msp.0);
(msp, lsp)
}
pub fn bump_epsilon(&self) -> TsMs {
Self(self.0 + 1)
}
pub fn fmt(&self) -> TsMsFmt {
TsMsFmt { ts: self.clone() }
}
}
impl AsRef<TsMs> for TsMs {
fn as_ref(&self) -> &TsMs {
&self
}
}
impl fmt::Display for TsMs {
@@ -2693,6 +2736,45 @@ impl core::ops::Sub for TsMs {
}
}
pub struct TsMsFmt {
ts: TsMs,
}
impl fmt::Debug for TsMsFmt {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
chrono::DateTime::from_timestamp_millis(self.ts.ms() as i64)
.unwrap()
.format(DATETIME_FMT_3MS)
.fmt(fmt)
}
}
impl fmt::Display for TsMsFmt {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
chrono::DateTime::from_timestamp_millis(self.ts.ms() as i64)
.unwrap()
.format(DATETIME_FMT_3MS)
.fmt(fmt)
}
}
pub struct TsMsVecFmt<I>(pub I);
impl<I, T> fmt::Display for TsMsVecFmt<I>
where
I: Clone + IntoIterator<Item = T>,
T: AsRef<TsMs>,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "[")?;
for ts in self.0.clone().into_iter() {
write!(fmt, " {}", ts.as_ref().fmt())?;
}
write!(fmt, " ]")?;
Ok(())
}
}
pub trait RetStreamExt: Stream {
fn only_first_error(self) -> OnlyFirstError<Self>
where

View File

@@ -29,11 +29,15 @@ pub struct NanoRange {
}
impl fmt::Debug for NanoRange {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
if true {
let beg = TsNano(self.beg);
let end = TsNano(self.end);
f.debug_struct("NanoRange")
write!(fmt, "NanoRange {{ beg: {}, end: {} }}", beg.fmt(), end.fmt())
} else if false {
let beg = TsNano(self.beg);
let end = TsNano(self.end);
fmt.debug_struct("NanoRange")
.field("beg", &beg)
.field("end", &end)
.finish()
@@ -45,9 +49,9 @@ impl fmt::Debug for NanoRange {
.timestamp_opt((self.end / SEC) as i64, (self.end % SEC) as u32)
.earliest();
if let (Some(a), Some(b)) = (beg, end) {
f.debug_struct("NanoRange").field("beg", &a).field("end", &b).finish()
fmt.debug_struct("NanoRange").field("beg", &a).field("end", &b).finish()
} else {
f.debug_struct("NanoRange")
fmt.debug_struct("NanoRange")
.field("beg", &beg)
.field("end", &end)
.finish()

View File

@@ -410,7 +410,7 @@ where
};
trace!(
"FWD ts_msp {} ts_lsp_min {} ts_lsp_max {} {}",
ts_msp,
ts_msp.fmt(),
ts_lsp_min,
ts_lsp_max,
table_name,
@@ -441,7 +441,12 @@ where
} else {
DtNano::from_ns(0)
};
trace!("BCK ts_msp {} ts_lsp_max {} {}", ts_msp, ts_lsp_max, table_name,);
trace!(
"BCK ts_msp {} ts_lsp_max {} {}",
ts_msp.fmt(),
ts_lsp_max,
table_name,
);
let qu = stmts
.rt(&opts.rt)
.lsp(!opts.fwd, with_values)
@@ -461,7 +466,7 @@ where
}
ret
};
trace!("read ts_msp {:?} len {}", ts_msp, ret.len());
trace!("read ts_msp {} len {}", ts_msp.fmt(), ret.len());
let ret = Box::new(ret);
Ok(ret)
}

View File

@@ -18,6 +18,8 @@ use netpod::EnumVariant;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsMs;
use netpod::TsMsVecFmt;
use netpod::TsNano;
use series::SeriesId;
use std::collections::VecDeque;
use std::pin::Pin;
@@ -184,7 +186,14 @@ impl EventsStreamRt {
);
let scalar_type = self.ch_conf.scalar_type().clone();
let shape = self.ch_conf.shape().clone();
trace_fetch!("make_read_events_fut bck {} {:?} {:?}", bck, shape, scalar_type);
trace_fetch!(
"make_read_events_fut bck {} msp {:?} {} {:?} {:?}",
bck,
ts_msp,
ts_msp.fmt(),
shape,
scalar_type
);
let fut = async move {
let ret = match &shape {
Shape::Scalar => match &scalar_type {
@@ -241,10 +250,10 @@ impl EventsStreamRt {
}
fn transition_to_bck_read(&mut self) {
trace_fetch!("transition_to_bck_read");
trace_fetch!("transition_to_bck_read A {}", TsMsVecFmt(&self.msp_buf));
for ts in self.msp_buf.iter() {
if ts.ns() < self.range.beg() {
self.msp_buf_bck.push_front(ts.clone());
self.msp_buf_bck.push_back(ts.clone());
}
}
let c = self.msp_buf.iter().take_while(|x| x.ns() < self.range.beg()).count();
@@ -252,12 +261,17 @@ impl EventsStreamRt {
for _ in 0..g {
self.msp_buf.pop_front();
}
trace_fetch!(
"transition_to_bck_read B {} {}",
TsMsVecFmt(&self.msp_buf_bck),
TsMsVecFmt(&self.msp_buf)
);
self.setup_bck_read();
}
fn setup_bck_read(&mut self) {
trace_fetch!("setup_bck_read");
if let Some(ts) = self.msp_buf_bck.pop_front() {
if let Some(ts) = self.msp_buf_bck.pop_back() {
trace_fetch!("setup_bck_read {}", ts.fmt());
let scyqueue = self.scyqueue.clone();
let fut = self.make_read_events_fut(ts, true, scyqueue);
self.state = State::ReadingBck(ReadingBck {
@@ -265,6 +279,7 @@ impl EventsStreamRt {
reading_state: ReadingState::FetchEvents(FetchEvents { fut }),
});
} else {
trace_fetch!("setup_bck_read no msp");
self.transition_to_fwd_read();
}
}
@@ -272,12 +287,13 @@ impl EventsStreamRt {
fn transition_to_fwd_read(&mut self) {
trace_fetch!("transition_to_fwd_read");
self.msp_buf_bck = VecDeque::new();
trace_fetch!("transition_to_fwd_read {}", TsMsVecFmt(&self.msp_buf));
self.setup_fwd_read();
}
fn setup_fwd_read(&mut self) {
if let Some(ts) = self.msp_buf.pop_front() {
trace_fetch!("setup_fwd_read {ts}");
trace_fetch!("setup_fwd_read {}", ts.fmt());
let scyqueue = self.scyqueue.clone();
let fut = self.make_read_events_fut(ts, false, scyqueue);
self.state = State::ReadingFwd(ReadingFwd {
@@ -392,7 +408,7 @@ impl Stream for EventsStreamRt {
State::ReadingBck(st) => match &mut st.reading_state {
ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) {
Ready(Some(Ok(ts))) => {
trace_fetch!("ReadingBck FetchMsp {:?}", ts);
trace_fetch!("ReadingBck FetchMsp {}", ts.fmt());
self.msp_buf.push_back(ts);
if ts.ns() >= self.range.beg() {
self.transition_to_bck_read();
@@ -415,7 +431,10 @@ impl Stream for EventsStreamRt {
ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) {
Ready(Ok(mut x)) => {
use items_2::merger::Mergeable;
trace_fetch!("ReadingBck FetchEvents got len {:?}", x.len());
trace_fetch!("ReadingBck FetchEvents got len {}", x.len());
for ts in Mergeable::tss(&x) {
trace_fetch!("ReadingBck FetchEvents ts {}", ts.fmt());
}
if let Some(ix) = Mergeable::find_highest_index_lt(&x, self.range.beg().ns()) {
trace_fetch!("ReadingBck FetchEvents find_highest_index_lt {:?}", ix);
let mut y = Mergeable::new_empty(&x);
@@ -447,6 +466,7 @@ impl Stream for EventsStreamRt {
State::ReadingFwd(st) => match &mut st.reading_state {
ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) {
Ready(Some(Ok(ts))) => {
trace_fetch!("ReadingFwd FetchMsp {}", ts.fmt());
self.msp_buf.push_back(ts);
self.setup_fwd_read();
continue;
@@ -460,6 +480,11 @@ impl Stream for EventsStreamRt {
},
ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) {
Ready(Ok(x)) => {
trace_fetch!("ReadingFwd FetchEvents got len {:?}", x.len());
for ts_ns in x.tss() {
let ts = TsNano::from_ns(*ts_ns).to_ts_ms();
trace_fetch!("ReadingFwd FetchEvents ts {}", ts.fmt());
}
self.out.push_back(x);
self.setup_fwd_read();
continue;

View File

@@ -119,7 +119,7 @@ where
Ready(Some(Err(e)))
} else {
// Separate events into before and bulk
let tss = item.tss();
let tss = Events::tss(&item);
let pp = tss.partition_point(|&x| x < self.ts0.ns());
trace_transition!("partition_point {pp:?} {n:?}", n = tss.len());
if pp > item.len() {

View File

@@ -1,6 +1,7 @@
use super::prepare::StmtsEvents;
use crate::range::ScyllaSeriesRange;
use crate::worker::ScyllaQueue;
use core::fmt;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
@@ -10,6 +11,7 @@ use futures_util::StreamExt;
use netpod::log::*;
use netpod::ttl::RetentionTime;
use netpod::TsMs;
use netpod::TsMsVecFmt;
use scylla::Session;
use series::SeriesId;
use std::collections::VecDeque;
@@ -17,6 +19,15 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
#[allow(unused)]
macro_rules! trace_emit {
($det:expr, $($arg:tt)*) => {
if $det {
trace!($($arg)*);
}
};
}
#[derive(Debug, ThisError)]
#[cstm(name = "EventsMsp")]
pub enum Error {
@@ -53,6 +64,14 @@ where
Resolvable::Taken => None,
}
}
fn is_taken(&self) -> bool {
if let Self::Taken = self {
true
} else {
false
}
}
}
struct BckAndFirstFwd {
@@ -60,9 +79,13 @@ struct BckAndFirstFwd {
fut_fwd: Resolvable<Pin<Box<dyn Future<Output = Result<VecDeque<TsMs>, crate::worker::Error>> + Send>>>,
}
struct Fwd {
fut_fwd: Resolvable<Pin<Box<dyn Future<Output = Result<VecDeque<TsMs>, crate::worker::Error>> + Send>>>,
}
enum State {
BckAndFirstFwd(BckAndFirstFwd),
InputDone,
Fwd(Fwd),
}
#[pin_project::pin_project]
@@ -70,9 +93,12 @@ pub struct MspStreamRt {
rt: RetentionTime,
series: SeriesId,
range: ScyllaSeriesRange,
no_more: bool,
#[pin]
state: State,
out: VecDeque<TsMs>,
scyqueue: ScyllaQueue,
do_trace_detail: bool,
}
impl MspStreamRt {
@@ -91,17 +117,45 @@ impl MspStreamRt {
let range = range.clone();
async move { scyqueue.find_ts_msp(rt, series.id(), range, false).await }
};
let do_trace_detail = netpod::TRACE_SERIES_ID.contains(&series.id());
info!("------------------------------------- TEST INFO");
trace_emit!(do_trace_detail, "------------------------------------- TEST TRACE");
Self {
rt,
series,
range,
no_more: false,
state: State::BckAndFirstFwd(BckAndFirstFwd {
fut_bck: Resolvable::Future(Box::pin(fut_bck)),
fut_fwd: Resolvable::Future(Box::pin(fut_fwd)),
}),
out: VecDeque::new(),
scyqueue,
do_trace_detail,
}
}
fn next_fwd_fut(
&mut self,
) -> Resolvable<Pin<Box<dyn Future<Output = Result<VecDeque<TsMs>, crate::worker::Error>> + Send>>> {
let range = if let Some(msp) = self.out.back() {
let x = ScyllaSeriesRange::new(msp.bump_epsilon().ns(), self.range.end());
trace_emit!(self.do_trace_detail, "next_fwd_fut {}", x.fmt());
x
} else {
// should not get here
let x = ScyllaSeriesRange::new(self.range.end(), self.range.end());
trace_emit!(self.do_trace_detail, "next_fwd_fut NOTHING IN BUFFER {}", x.fmt());
x
};
let fut_fwd = {
let scyqueue = self.scyqueue.clone();
let rt = self.rt.clone();
let series = self.series.clone();
async move { scyqueue.find_ts_msp(rt, series.id(), range, false).await }
};
Resolvable::Future(Box::pin(fut_fwd))
}
}
impl Stream for MspStreamRt {
@@ -109,6 +163,7 @@ impl Stream for MspStreamRt {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
let trdet = self.do_trace_detail;
loop {
break match &mut self.state {
State::BckAndFirstFwd(st) => {
@@ -117,6 +172,7 @@ impl Stream for MspStreamRt {
match rsv {
Resolvable::Future(fut) => match fut.poll_unpin(cx) {
Ready(x) => {
trace_emit!(trdet, "bck resolved {x:?}");
*rsv = Resolvable::Output(x);
}
Pending => {
@@ -129,6 +185,7 @@ impl Stream for MspStreamRt {
match rsv {
Resolvable::Future(fut) => match fut.poll_unpin(cx) {
Ready(x) => {
trace_emit!(trdet, "fwd resolved {x:?}");
*rsv = Resolvable::Output(x);
}
Pending => {
@@ -142,7 +199,6 @@ impl Stream for MspStreamRt {
} else {
let taken_bck = st.fut_bck.take();
let taken_fwd = st.fut_fwd.take();
self.state = State::InputDone;
if let (Some(taken_bck), Some(taken_fwd)) = (taken_bck, taken_fwd) {
match taken_bck {
Ok(v1) => match taken_fwd {
@@ -153,6 +209,10 @@ impl Stream for MspStreamRt {
for e in v2 {
self.out.push_back(e)
}
trace_emit!(trdet, "ready out {}", TsMsVecFmt(self.out.iter()));
self.state = State::Fwd(Fwd {
fut_fwd: self.next_fwd_fut(),
});
continue;
}
Err(e) => Ready(Some(Err(e.into()))),
@@ -164,10 +224,62 @@ impl Stream for MspStreamRt {
}
}
}
State::InputDone => {
if let Some(x) = self.out.pop_front() {
Ready(Some(Ok(x)))
State::Fwd(st) => {
// TODO check if more input is coming
let mut have_pending = false;
let mut have_progress = false;
let rsv = &mut st.fut_fwd;
match rsv {
Resolvable::Future(fut) => match fut.poll_unpin(cx) {
Ready(Ok(x)) => {
*rsv = Resolvable::Taken;
trace_emit!(trdet, "bulk fwd resolved {x:?}");
if x.len() == 0 {
self.no_more = true;
}
for e in x {
self.out.push_back(e)
}
}
Ready(Err(e)) => {
trace_emit!(trdet, "bulk fwd error {e}");
*rsv = Resolvable::Taken;
error!("{e}");
}
Pending => {
trace_emit!(trdet, "bulk fwd Pending");
have_pending = true;
}
},
Resolvable::Output(..) => {
trace_emit!(trdet, "bulk fwd Output");
}
Resolvable::Taken => {
trace_emit!(trdet, "bulk fwd Taken");
}
}
let is_taken = if let State::Fwd(st2) = &self.state {
st2.fut_fwd.is_taken()
} else {
panic!("logic");
};
if self.out.len() < 3 && self.no_more == false && is_taken {
trace_emit!(trdet, "bulk fwd low in out make next fut ++++++++++++++++++++");
self.state = State::Fwd(Fwd {
fut_fwd: self.next_fwd_fut(),
});
have_progress = true;
}
if let Some(x) = self.out.pop_front() {
trace_emit!(trdet, "State::Fwd => Some emit {}", x.fmt());
Ready(Some(Ok(x)))
} else if have_progress {
continue;
} else if have_pending {
trace_emit!(trdet, "State::Fwd => Pending !!!!!!!!!!!!!!!!!!!!!!!");
Pending
} else {
trace_emit!(trdet, "State::Fwd => None !!!!!!!!!!!!!!!!!!!!!!");
Ready(None)
}
}
@@ -200,7 +312,13 @@ pub async fn find_ts_msp(
stmts: &StmtsEvents,
scy: &Session,
) -> Result<VecDeque<TsMs>, Error> {
trace!("find_ts_msp series {:?} {:?} {:?} bck {}", rt, series, range, bck);
trace!(
"find_ts_msp series {:?} {:?} {} bck {}",
rt,
series,
range.fmt(),
bck
);
if bck {
find_ts_msp_bck(rt, series, range, stmts, scy).await
} else {

View File

@@ -113,7 +113,7 @@ async fn make_msp_dir(ks: &str, rt: &RetentionTime, bck: bool, scy: &Session) ->
let select_cond = if bck {
"ts_msp < ? order by ts_msp desc limit 2"
} else {
"ts_msp >= ? and ts_msp < ?"
"ts_msp >= ? and ts_msp < ? limit 20"
};
let cql = format!(
"select ts_msp from {}.{}{} where series = ? and {}",

View File

@@ -1,3 +1,4 @@
use core::fmt;
use netpod::range::evrange::SeriesRange;
use netpod::TsNano;
@@ -8,6 +9,13 @@ pub struct ScyllaSeriesRange {
}
impl ScyllaSeriesRange {
pub fn new(beg: TsNano, end: TsNano) -> Self {
Self {
beg: beg.ns(),
end: end.ns(),
}
}
pub fn beg(&self) -> TsNano {
TsNano::from_ns(self.beg)
}
@@ -15,6 +23,10 @@ impl ScyllaSeriesRange {
pub fn end(&self) -> TsNano {
TsNano::from_ns(self.end)
}
pub fn fmt(&self) -> ScyllaSeriesRangeFmt {
ScyllaSeriesRangeFmt { val: self.clone() }
}
}
impl From<&SeriesRange> for ScyllaSeriesRange {
@@ -25,3 +37,18 @@ impl From<&SeriesRange> for ScyllaSeriesRange {
}
}
}
pub struct ScyllaSeriesRangeFmt {
val: ScyllaSeriesRange,
}
impl fmt::Display for ScyllaSeriesRangeFmt {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(
fmt,
"ScyllaSeriesRange {{ beg: {}, end: {} }}",
self.val.beg().fmt(),
self.val.end().fmt()
)
}
}

View File

@@ -10,11 +10,27 @@ use items_2::merger::Mergeable;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::RangeFilterStats;
use netpod::TsMsVecFmt;
use netpod::TsNano;
use std::fmt;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
#[cfg(test)]
use items_0::Events;
#[cfg(test)]
use std::collections::VecDeque;
#[allow(unused)]
macro_rules! trace_emit {
($det:expr, $($arg:tt)*) => {
if $det {
eprintln!($($arg)*);
}
};
}
pub struct RangeFilter2<S, ITY>
where
S: Stream<Item = Sitemty<ITY>> + Unpin,
@@ -26,12 +42,12 @@ where
one_before_range: bool,
stats: RangeFilterStats,
slot1: Option<ITY>,
slot2: Option<ITY>,
have_range_complete: bool,
data_done: bool,
inp_done: bool,
raco_done: bool,
done: bool,
complete: bool,
trdet: bool,
}
impl<S, ITY> RangeFilter2<S, ITY>
@@ -44,7 +60,9 @@ where
}
pub fn new(inp: S, range: NanoRange, one_before_range: bool) -> Self {
trace!(
let trdet = false;
trace_emit!(
trdet,
"{}::new range: {:?} one_before_range {:?}",
Self::type_name(),
range,
@@ -57,12 +75,12 @@ where
one_before_range,
stats: RangeFilterStats::new(),
slot1: None,
slot2: None,
have_range_complete: false,
data_done: false,
inp_done: false,
raco_done: false,
done: false,
complete: false,
trdet,
}
}
@@ -98,21 +116,32 @@ where
}
fn handle_item(&mut self, item: ITY) -> Result<ITY, Error> {
let min = item.ts_min().map(|x| TsNano::from_ns(x).fmt());
let max = item.ts_max().map(|x| TsNano::from_ns(x).fmt());
trace_emit!(
self.trdet,
"see event len {} min {:?} max {:?}",
item.len(),
min,
max
);
let mut item = self.prune_high(item, self.range.end)?;
let ret = if self.one_before_range {
match item.find_lowest_index_ge(self.range.beg) {
Some(ilge) => {
if ilge == 0 {
let lige = item.find_lowest_index_ge(self.range.beg);
trace_emit!(self.trdet, "YES one_before_range ilge {:?}", lige);
match lige {
Some(lige) => {
if lige == 0 {
if let Some(sl1) = self.slot1.take() {
self.slot2 = Some(item);
self.slot1 = Some(item);
sl1
} else {
item
}
} else {
trace!("discarding events len {:?}", ilge - 1);
trace_emit!(self.trdet, "discarding events len {:?}", lige - 1);
let mut dummy = item.new_empty();
item.drain_into(&mut dummy, (0, ilge - 1))
item.drain_into(&mut dummy, (0, lige - 1))
.map_err(|e| format!("{e} unexpected MergeError while remove of items"))?;
self.slot1 = None;
item
@@ -120,7 +149,7 @@ where
}
None => {
// TODO keep stats about this case
debug!("drain into to keep one before");
trace_emit!(self.trdet, "drain into to keep one before");
let n = item.len();
let mut keep = item.new_empty();
item.drain_into(&mut keep, (n.max(1) - 1, n))
@@ -130,10 +159,12 @@ where
}
}
} else {
match item.find_lowest_index_ge(self.range.beg) {
Some(ilge) => {
let lige = item.find_lowest_index_ge(self.range.beg);
trace_emit!(self.trdet, "NOT one_before_range ilge {:?}", lige);
match lige {
Some(lige) => {
let mut dummy = item.new_empty();
item.drain_into(&mut dummy, (0, ilge))?;
item.drain_into(&mut dummy, (0, lige))?;
item
}
None => {
@@ -168,23 +199,25 @@ where
let k = std::mem::replace(&mut self.stats, RangeFilterStats::new());
let k = StatsItem::RangeFilterStats(k);
Ready(Some(Ok(StreamItem::Stats(k))))
} else if self.data_done {
} else if self.inp_done {
self.raco_done = true;
if self.have_range_complete {
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
} else {
continue;
}
} else if let Some(sl2) = self.slot2.take() {
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(sl2)))))
} else {
match self.inp.poll_next_unpin(cx) {
Ready(Some(item)) => match item {
Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => match self.handle_item(item) {
Ok(item) => Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))),
Ok(item) => {
trace_emit!(self.trdet, "emit {}", TsMsVecFmt(Mergeable::tss(&item).iter()));
let item = Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)));
Ready(Some(item))
}
Err(e) => {
error!("sees: {e}");
self.data_done = true;
self.inp_done = true;
Ready(Some(Err(e)))
}
},
@@ -195,8 +228,12 @@ where
k => Ready(Some(k)),
},
Ready(None) => {
self.data_done = true;
continue;
self.inp_done = true;
if let Some(sl1) = self.slot1.take() {
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(sl1)))))
} else {
continue;
}
}
Pending => Pending,
}
@@ -216,7 +253,7 @@ where
let span1 = span!(Level::INFO, "RangeFilter2", range = tracing::field::Empty);
span1.record("range", &self.range_str.as_str());
let _spg = span1.enter();
Self::poll_next(self, cx)
RangeFilter2::poll_next(self, cx)
}
}
@@ -240,3 +277,260 @@ where
debug!("drop {:?}", self);
}
}
#[test]
fn test_00() {
use items_0::Empty;
use items_2::eventsdim0::EventsDim0;
let ms = 1_000_000;
let beg = TsNano::from_ms(1000 * 10);
let end = TsNano::from_ms(1000 * 20);
let mut item1 = EventsDim0::<f32>::empty();
item1.push_back(beg.ns() + 0 * ms, 0, 3.);
item1.push_back(beg.ns() + 1 * ms, 0, 3.1);
item1.push_back(beg.ns() + 2 * ms, 0, 3.2);
item1.push_back(beg.ns() + 3 * ms, 0, 3.3);
item1.push_back(beg.ns() + 4 * ms, 0, 3.4);
item1.push_back(end.ns() - 1, 0, 4.0);
item1.push_back(end.ns() + 0, 0, 4.1);
item1.push_back(end.ns() + 1, 0, 4.1);
let w1: Box<dyn Events> = Box::new(item1.clone());
let e1 = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w1)));
let inp = futures_util::stream::iter([e1]);
let one_before_range = false;
let range = NanoRange::from((beg.ns(), end.ns()));
let stream = RangeFilter2::new(inp, range, one_before_range);
let fut = async move {
let tss_items = fetch_into_tss_items(stream).await;
let exp: &[&[u64]] = &[&[
beg.ns() + 0 * ms,
beg.ns() + 1 * ms,
beg.ns() + 2 * ms,
beg.ns() + 3 * ms,
beg.ns() + 4 * ms,
end.ns() - 1,
]];
assert_eq!(&tss_items, &exp);
Ok::<_, Error>(())
};
taskrun::run(fut).unwrap();
}
#[test]
fn test_cut_before_00() {
use items_0::Empty;
use items_2::eventsdim0::EventsDim0;
let ms = 1_000_000;
let beg = TsNano::from_ms(1000 * 10);
let end = TsNano::from_ms(1000 * 20);
let mut items = Vec::new();
{
let mut item = EventsDim0::<f32>::empty();
item.push_back(beg.ns() - 1, 0, 2.9);
let w: Box<dyn Events> = Box::new(item.clone());
let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w)));
items.push(e);
}
{
let mut item = EventsDim0::<f32>::empty();
item.push_back(beg.ns() + 0 * ms, 0, 3.);
item.push_back(beg.ns() + 1 * ms, 0, 3.1);
item.push_back(beg.ns() + 2 * ms, 0, 3.2);
item.push_back(beg.ns() + 3 * ms, 0, 3.3);
item.push_back(beg.ns() + 4 * ms, 0, 3.4);
item.push_back(end.ns() - 1, 0, 4.0);
item.push_back(end.ns() + 0, 0, 4.1);
item.push_back(end.ns() + 1, 0, 4.1);
let w: Box<dyn Events> = Box::new(item.clone());
let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w)));
items.push(e);
}
let inp = futures_util::stream::iter(items);
let one_before_range = false;
let range = NanoRange::from((beg.ns(), end.ns()));
let stream = RangeFilter2::new(inp, range, one_before_range);
let fut = async move {
let tss_items = fetch_into_tss_items(stream).await;
let exp: &[&[u64]] = &[
// TODO in the future this empty may be discarded
&[],
&[
beg.ns() + 0 * ms,
beg.ns() + 1 * ms,
beg.ns() + 2 * ms,
beg.ns() + 3 * ms,
beg.ns() + 4 * ms,
end.ns() - 1,
],
];
assert_eq!(&tss_items, &exp);
Ok::<_, Error>(())
};
taskrun::run(fut).unwrap();
}
#[test]
fn test_one_before_00() {
use items_0::Empty;
use items_2::eventsdim0::EventsDim0;
let ms = 1_000_000;
let beg = TsNano::from_ms(1000 * 10);
let end = TsNano::from_ms(1000 * 20);
let mut items = Vec::new();
{
let mut item = EventsDim0::<f32>::empty();
item.push_back(beg.ns() - 1, 0, 2.9);
let w: Box<dyn Events> = Box::new(item.clone());
let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w)));
items.push(e);
}
{
let mut item = EventsDim0::<f32>::empty();
item.push_back(beg.ns() + 0 * ms, 0, 3.);
item.push_back(beg.ns() + 1 * ms, 0, 3.1);
item.push_back(beg.ns() + 2 * ms, 0, 3.2);
item.push_back(beg.ns() + 3 * ms, 0, 3.3);
item.push_back(beg.ns() + 4 * ms, 0, 3.4);
item.push_back(end.ns() - 1, 0, 4.0);
item.push_back(end.ns() + 0, 0, 4.1);
item.push_back(end.ns() + 1, 0, 4.1);
let w: Box<dyn Events> = Box::new(item.clone());
let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w)));
items.push(e);
}
let inp = futures_util::stream::iter(items);
let one_before_range = true;
let range = NanoRange::from((beg.ns(), end.ns()));
let stream = RangeFilter2::new(inp, range, one_before_range);
let fut = async move {
let tss_items = fetch_into_tss_items(stream).await;
let exp: &[&[u64]] = &[
// TODO in the future this empty may be discarded
&[],
&[
//
beg.ns() - 1,
],
&[
beg.ns() + 0 * ms,
beg.ns() + 1 * ms,
beg.ns() + 2 * ms,
beg.ns() + 3 * ms,
beg.ns() + 4 * ms,
end.ns() - 1,
],
];
assert_eq!(&tss_items, &exp);
Ok::<_, Error>(())
};
taskrun::run(fut).unwrap();
}
#[test]
fn test_one_before_01() {
use items_0::Empty;
use items_2::eventsdim0::EventsDim0;
let ms = 1_000_000;
let beg = TsNano::from_ms(1000 * 10);
let end = TsNano::from_ms(1000 * 20);
let mut items = Vec::new();
{
let mut item = EventsDim0::<f32>::empty();
item.push_back(beg.ns() - 1, 0, 2.9);
item.push_back(beg.ns() + 0 * ms, 0, 3.);
let w: Box<dyn Events> = Box::new(item.clone());
let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w)));
items.push(e);
}
{
let mut item = EventsDim0::<f32>::empty();
item.push_back(beg.ns() + 1 * ms, 0, 3.1);
item.push_back(beg.ns() + 2 * ms, 0, 3.2);
item.push_back(beg.ns() + 3 * ms, 0, 3.3);
item.push_back(beg.ns() + 4 * ms, 0, 3.4);
item.push_back(end.ns() - 1, 0, 4.0);
item.push_back(end.ns() + 0, 0, 4.1);
item.push_back(end.ns() + 1, 0, 4.1);
let w: Box<dyn Events> = Box::new(item.clone());
let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w)));
items.push(e);
}
let inp = futures_util::stream::iter(items);
let one_before_range = true;
let range = NanoRange::from((beg.ns(), end.ns()));
let stream = RangeFilter2::new(inp, range, one_before_range);
let fut = async move {
let tss_items = fetch_into_tss_items(stream).await;
let exp: &[&[u64]] = &[
// TODO in the future this empty may be discarded
// &[],
&[
//
beg.ns() - 1,
beg.ns() + 0 * ms,
],
&[
beg.ns() + 1 * ms,
beg.ns() + 2 * ms,
beg.ns() + 3 * ms,
beg.ns() + 4 * ms,
end.ns() - 1,
],
];
assert_eq!(&tss_items, &exp);
Ok::<_, Error>(())
};
taskrun::run(fut).unwrap();
}
#[test]
fn test_one_before_only() {
use items_0::Empty;
use items_2::eventsdim0::EventsDim0;
let _ms = 1_000_000;
let beg = TsNano::from_ms(1000 * 10);
let end = TsNano::from_ms(1000 * 20);
let mut items = Vec::new();
{
let mut item = EventsDim0::<f32>::empty();
item.push_back(beg.ns() - 1, 0, 2.9);
let w: Box<dyn Events> = Box::new(item.clone());
let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w)));
items.push(e);
}
let inp = futures_util::stream::iter(items);
let one_before_range = true;
let range = NanoRange::from((beg.ns(), end.ns()));
let stream = RangeFilter2::new(inp, range, one_before_range);
let fut = async move {
let tss_items = fetch_into_tss_items(stream).await;
let exp: &[&[u64]] = &[
// TODO in the future this empty may be discarded
&[],
&[
//
beg.ns() - 1,
],
];
assert_eq!(&tss_items, &exp);
Ok::<_, Error>(())
};
taskrun::run(fut).unwrap();
}
#[cfg(test)]
async fn fetch_into_tss_items<INP>(mut inp: INP) -> VecDeque<VecDeque<u64>>
where
INP: Stream<Item = Sitemty<Box<dyn Events>>> + Unpin,
{
let mut tss_items = VecDeque::new();
while let Some(e) = inp.next().await {
if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(evs))) = e {
eprintln!("{:?}", evs);
tss_items.push_back(Events::tss(&evs).clone());
} else {
eprintln!("other item ----------: {:?}", e);
}
}
tss_items
}