Refactor one-before retrieval

This commit is contained in:
Dominik Werder
2024-08-16 10:53:32 +02:00
parent b52fbd9044
commit 9068b1bbad
25 changed files with 467 additions and 136 deletions

View File

@@ -2,6 +2,7 @@ use crate::create_connection;
use crate::worker::PgQueue;
use crate::ErrConv;
use err::Error;
use netpod::log::*;
use netpod::ChannelArchiver;
use netpod::ChannelSearchQuery;
use netpod::ChannelSearchResult;
@@ -128,10 +129,9 @@ pub(super) async fn search_channel_scylla(
),
regop
);
let rows = pgc
.query(sql, &[&ch_kind, &query.name_regex, &cb1, &cb2])
.await
.err_conv()?;
let params: &[&(dyn tokio_postgres::types::ToSql + Sync)] = &[&ch_kind, &query.name_regex, &cb1, &cb2];
info!("search_channel_scylla {:?}", params);
let rows = pgc.query(sql, params).await.err_conv()?;
let mut res = Vec::new();
for row in rows {
let series: i64 = row.get(0);

View File

@@ -310,8 +310,7 @@ pub async fn connect_client(uri: &http::Uri) -> Result<SendRequest<StreamBody>,
}
});
let stream = TcpStream::connect(format!("{host}:{port}")).await?;
#[cfg(DISABLED)]
{
if false {
let executor = hyper_util::rt::TokioExecutor::new();
hyper::client::conn::http2::Builder::new(executor);
}

View File

@@ -765,6 +765,7 @@ impl DataApiPython3DataStream {
let select = EventsSubQuerySelect::new(
ChannelTypeConfigGen::SfDatabuffer(fetch_info.clone()),
self.range.clone().into(),
false,
TransformQuery::for_event_blobs(),
);
let log_level = String::new();

View File

@@ -242,20 +242,21 @@ async fn proxy_http_service_inner(
} else if let Some(h) = super::api4::docs::DocsHandler::handler(&req) {
Ok(h.handle(req, ctx).await?)
} else {
use std::fmt::Write;
let mut body = String::new();
let out = &mut body;
write!(out, "<pre>\n")?;
write!(out, "METHOD {:?}<br>\n", req.method())?;
write!(out, "URI {:?}<br>\n", req.uri())?;
write!(out, "HOST {:?}<br>\n", req.uri().host())?;
write!(out, "PORT {:?}<br>\n", req.uri().port())?;
write!(out, "PATH {:?}<br>\n", req.uri().path())?;
write!(out, "QUERY {:?}<br>\n", req.uri().query())?;
for (hn, hv) in req.headers() {
write!(out, "HEADER {hn:?}: {hv:?}<br>\n")?;
}
write!(out, "</pre>\n")?;
let headers: std::collections::BTreeMap<String, String> = req
.headers()
.iter()
.map(|(k, v)| (k.to_string(), v.to_str().map_or(String::new(), |x| x.to_string())))
.collect();
let res = serde_json::json!({
"method": req.method().to_string(),
"uri": req.uri().to_string(),
"host": req.uri().host().unwrap_or(""),
"port": req.uri().port().map_or(String::new(), |x| x.to_string()),
"path": req.uri().path(),
"query": req.uri().query(),
"headers": headers,
});
let body = serde_json::to_string(&res).unwrap_or("{}".into());
Ok(response(StatusCode::NOT_FOUND).body(body_string(body))?)
}
}
@@ -513,18 +514,20 @@ where
return Ok(response_err_msg(StatusCode::BAD_REQUEST, msg)?);
}
};
debug!("proxy_backend_query {:?} {:?}", query, req.uri());
trace!("proxy_backend_query {:?} {:?}", query, req.uri());
let timeout = query.timeout();
let timeout_next = timeout.saturating_sub(Duration::from_millis(1000));
debug!("timeout {timeout:?} timeout_next {timeout_next:?}");
trace!("timeout {timeout:?} timeout_next {timeout_next:?}");
query.set_timeout(timeout_next);
let query = query;
let backend = query.backend();
let uri_path = proxy_backend_query_helper_uri_path(req.uri().path(), &url);
debug!("uri_path {uri_path}");
trace!("uri_path {uri_path}");
let query_host = get_query_host_for_backend(backend, proxy_config)?;
let mut url = Url::parse(&format!("{}{}", query_host, uri_path))?;
trace!("query_host {query_host} uri_path {uri_path} query {query:?}");
query.append_to_url(&mut url);
trace!("url {:?}", url);
proxy_backend_query_inner(req.headers(), url, timeout, ctx, proxy_config).await
}

View File

@@ -68,7 +68,7 @@ impl EventsHandler {
let url = req_uri_to_url(&head.uri)?;
let pairs = get_url_query_pairs(&url);
let evq = PlainEventsQuery::from_pairs(&pairs)?;
debug!("{evq:?}");
debug!("{:?}", evq);
let query_host = get_query_host_for_backend(evq.backend(), proxy_config)?;
let url_str = format!(
"{}{}",

View File

@@ -11,9 +11,7 @@ pub struct IsoDateTime(DateTime<Utc>);
impl IsoDateTime {
pub fn from_unix_millis(ms: u64) -> Self {
let datetime = chrono::NaiveDateTime::from_timestamp_millis(ms as i64)
.unwrap()
.and_utc();
let datetime = chrono::DateTime::from_timestamp_millis(ms as i64).unwrap();
Self(datetime)
}
}

View File

@@ -1,4 +1,3 @@
use netpod::log::*;
use netpod::range::evrange::SeriesRange;
// TODO rename, no more deque involved

View File

@@ -161,6 +161,7 @@ impl WithLen for Box<dyn TimeBinnable> {
}
}
#[allow(unused)]
impl RangeOverlapInfo for Box<dyn TimeBinnable> {
fn ends_before(&self, range: &SeriesRange) -> bool {
todo!()
@@ -190,6 +191,7 @@ impl TimeBinnable for Box<dyn TimeBinnable> {
}
}
#[allow(unused)]
impl RangeOverlapInfo for Box<dyn Events> {
fn ends_before(&self, range: &SeriesRange) -> bool {
todo!()
@@ -455,6 +457,7 @@ impl TimeBinnerTy for TimeBinnerDynStruct2 {
}
}
#[allow(unused)]
impl TimeBinner for TimeBinnerDynStruct2 {
fn ingest(&mut self, item: &mut dyn TimeBinnable) {
todo!()

View File

@@ -17,7 +17,7 @@ macro_rules! trace_ingest {
($($arg:tt)*) => { trace!($($arg)*) };
}
#[cfg(DISABLED)]
#[cfg(target_abi = "x32")]
impl<T> TimeBinner for T
where
T: TimeBinnerIngest,

View File

@@ -74,6 +74,15 @@ macro_rules! trace2 {
($($arg:tt)*) => { trace!($($arg)*); };
}
// Binning-specific trace output, currently compiled out.
// The `if false` keeps the format arguments type-checked by the compiler
// while emitting nothing at runtime; flip to `true` to re-enable the logging.
#[allow(unused)]
macro_rules! trace_binning {
    ($($arg:tt)*) => {
        if false {
            trace!($($arg)*);
        }
    };
}
#[derive(Clone, PartialEq, Serialize, Deserialize)]
pub struct EventsDim0NoPulse<STY> {
pub tss: VecDeque<u64>,
@@ -667,7 +676,7 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
fn reset_values(&mut self, range: SeriesRange) {
self.int_ts = range.beg_u64();
trace!("ON RESET SET int_ts {:10}", self.int_ts);
trace_binning!("ON RESET SET int_ts {:10}", self.int_ts);
self.range = range;
self.count = 0;
self.sum = 0.;
@@ -707,9 +716,11 @@ impl<STY: ScalarOps> EventsDim0Aggregator<STY> {
fn result_reset_time_weight(&mut self, range: SeriesRange) -> BinsDim0<STY> {
// TODO check callsite for correct expand status.
debug!(
trace_binning!(
"result_reset_time_weight calls apply_event_time_weight range {:?} items_seen {} count {}",
self.range, self.items_seen, self.count
self.range,
self.items_seen,
self.count
);
let range_beg = self.range.beg_u64();
let range_end = self.range.end_u64();
@@ -777,7 +788,7 @@ impl<STY: ScalarOps> TimeBinnableTypeAggregator for EventsDim0Aggregator<STY> {
}
fn result_reset(&mut self, range: SeriesRange) -> Self::Output {
trace!("result_reset {:?}", range);
trace_binning!("result_reset {:?}", range);
if self.do_time_weight {
self.result_reset_time_weight(range)
} else {
@@ -1066,10 +1077,10 @@ impl<STY: ScalarOps> EventsDim0TimeBinner<STY> {
fn next_bin_range(&mut self) -> Option<SeriesRange> {
self.rix += 1;
if let Some(rng) = self.binrange.range_at(self.rix) {
trace!("{} next_bin_range {:?}", Self::type_name(), rng);
trace_binning!("{} next_bin_range {:?}", Self::type_name(), rng);
Some(rng)
} else {
trace!("{} next_bin_range None", Self::type_name());
trace_binning!("{} next_bin_range None", Self::type_name());
None
}
}

View File

@@ -22,7 +22,7 @@ macro_rules! trace_ingest {
#[allow(unused)]
macro_rules! trace_ingest_item {
($($arg:tt)*) => {
if true {
if false {
info!($($arg)*);
}
};
@@ -30,8 +30,11 @@ macro_rules! trace_ingest_item {
#[allow(unused)]
macro_rules! trace2 {
($($arg:tt)*) => {};
($($arg:tt)*) => { trace!($($arg)*); };
($($arg:tt)*) => {
if false {
trace!($($arg)*);
}
};
}
pub trait TimeBinnerCommonV0Trait {

View File

@@ -3,6 +3,7 @@ pub mod histo;
pub mod query;
pub mod range;
pub mod status;
pub mod stream_impl_tracer;
pub mod streamext;
pub mod ttl;
@@ -116,6 +117,12 @@ pub const DATETIME_FMT_9MS: &str = "%Y-%m-%dT%H:%M:%S.%9fZ";
const TEST_BACKEND: &str = "testbackend-00";
#[allow(non_upper_case_globals)]
pub const trigger: [&'static str; 1] = [
//
"S30CB05-VMCP-A010:PRESSURE",
];
pub struct OnDrop<F>
where
F: FnOnce() -> (),

View File

@@ -0,0 +1,43 @@
use crate::log::*;
/// Debugging helper that counts how often a stream implementation is polled
/// and how many iterations its inner loop performs, logging each count via
/// `trace!` and signalling when a configured limit is reached.
pub struct StreamImplTracer {
    // Label prefixed to every trace line to identify the traced stream.
    name: String,
    // Number of `poll_enter` calls observed so far.
    npoll_cnt: usize,
    // Count at which `poll_enter` starts returning `true`.
    npoll_max: usize,
    // Number of `loop_enter` calls observed so far.
    loop_cnt: usize,
    // Count at which `loop_enter` starts returning `true`.
    loop_max: usize,
}
impl StreamImplTracer {
pub fn new(name: String, npoll_max: usize, loop_max: usize) -> Self {
Self {
name,
npoll_cnt: 0,
npoll_max,
loop_cnt: 0,
loop_max,
}
}
pub fn poll_enter(&mut self) -> bool {
self.npoll_cnt += 1;
if self.npoll_cnt >= self.npoll_max {
trace!("{} poll {} reached limit", self.name, self.npoll_cnt);
true
} else {
trace!("{} poll {}", self.name, self.npoll_cnt);
false
}
}
pub fn loop_enter(&mut self) -> bool {
self.loop_cnt += 1;
if self.loop_cnt >= self.loop_max {
trace!("{} loop {} reached limit", self.name, self.loop_cnt);
true
} else {
trace!("{} loop {}", self.name, self.loop_cnt);
false
}
}
}

View File

@@ -82,7 +82,8 @@ fn raw_data_00() {
ScalarType::I32,
Shape::Scalar,
);
let select = EventsSubQuerySelect::new(fetch_info.into(), range.into(), TransformQuery::default_events());
let select =
EventsSubQuerySelect::new(fetch_info.into(), range.into(), false, TransformQuery::default_events());
let settings = EventsSubQuerySettings::default();
let log_level = String::new();
let qu = EventsSubQuery::from_parts(select, settings, "dummy".into(), log_level);

View File

@@ -31,13 +31,12 @@ pub async fn scylla_channel_event_stream(
debug!("scylla_channel_event_stream {evq:?}");
// TODO depends in general on the query
// TODO why both in PlainEventsQuery and as separate parameter? Check other usages.
// let do_one_before_range = evq.need_one_before_range();
let do_one_before_range = false;
let series = SeriesId::new(chconf.series());
let scalar_type = chconf.scalar_type();
let shape = chconf.shape();
let do_test_stream_error = false;
let readopts = EventReadOpts::new(evq.need_value_data(), evq.transform().enum_as_string().unwrap_or(false));
let _series = SeriesId::new(chconf.series());
let readopts = EventReadOpts::new(
evq.need_one_before_range(),
evq.need_value_data(),
evq.transform().enum_as_string().unwrap_or(false),
);
let stream: Pin<Box<dyn Stream<Item = _> + Send>> = if let Some(rt) = evq.use_rt() {
let x = scyllaconn::events2::events::EventsStreamRt::new(
rt,

View File

@@ -59,6 +59,7 @@ pub struct PlainEventsQuery {
log_level: String,
#[serde(default)]
use_rt: Option<RetentionTime>,
querymarker: String,
}
impl PlainEventsQuery {
@@ -85,6 +86,7 @@ impl PlainEventsQuery {
create_errors: Vec::new(),
log_level: String::new(),
use_rt: None,
querymarker: String::new(),
}
}
@@ -97,7 +99,7 @@ impl PlainEventsQuery {
}
pub fn one_before_range(&self) -> bool {
self.transform.need_one_before_range()
self.one_before_range || self.transform.need_one_before_range()
}
pub fn transform(&self) -> &TransformQuery {
@@ -296,6 +298,7 @@ impl FromUrl for PlainEventsQuery {
.map(Some)
.map_err(|_| Error::with_public_msg_no_trace(format!("can not parse useRt: {}", k)))
})?,
querymarker: pairs.get("querymarker").map_or(String::new(), |x| x.to_string()),
};
Ok(ret)
}
@@ -308,12 +311,10 @@ impl AppendToUrl for PlainEventsQuery {
SeriesRange::PulseRange(_) => todo!(),
}
self.channel.append_to_url(url);
{
let mut g = url.query_pairs_mut();
if self.one_before_range() {
g.append_pair("oneBeforeRange", "true");
}
}
let mut g = url.query_pairs_mut();
g.append_pair("oneBeforeRange", &self.one_before_range().to_string());
g.append_pair("querymarker", &self.querymarker);
drop(g);
self.transform.append_to_url(url);
let mut g = url.query_pairs_mut();
if let Some(x) = &self.timeout {
@@ -365,15 +366,22 @@ impl AppendToUrl for PlainEventsQuery {
pub struct EventsSubQuerySelect {
ch_conf: ChannelTypeConfigGen,
range: SeriesRange,
one_before_range: bool,
transform: TransformQuery,
wasm1: Option<String>,
}
impl EventsSubQuerySelect {
pub fn new(ch_info: ChannelTypeConfigGen, range: SeriesRange, transform: TransformQuery) -> Self {
pub fn new(
ch_info: ChannelTypeConfigGen,
range: SeriesRange,
one_before_range: bool,
transform: TransformQuery,
) -> Self {
Self {
ch_conf: ch_info,
range,
one_before_range,
transform,
wasm1: None,
}
@@ -510,6 +518,10 @@ impl EventsSubQuery {
&self.select.range
}
pub fn need_one_before_range(&self) -> bool {
self.select.one_before_range
}
pub fn transform(&self) -> &TransformQuery {
&self.select.transform
}

View File

@@ -24,6 +24,15 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
// Fetch-path trace output, currently enabled.
// The `if true` wrapper is a compile-time toggle: change to `false` to
// silence this category without touching the call sites.
#[allow(unused)]
macro_rules! trace_fetch {
    ($($arg:tt)*) => {
        if true {
            trace!($($arg)*);
        }
    };
}
#[allow(unused)]
macro_rules! trace_emit {
($($arg:tt)*) => {
@@ -46,11 +55,13 @@ macro_rules! warn_item {
pub struct EventReadOpts {
pub with_values: bool,
pub enum_as_strings: bool,
pub one_before: bool,
}
impl EventReadOpts {
pub fn new(with_values: bool, enum_as_strings: bool) -> Self {
pub fn new(one_before: bool, with_values: bool, enum_as_strings: bool) -> Self {
Self {
one_before,
with_values,
enum_as_strings,
}
@@ -84,14 +95,20 @@ enum ReadingState {
FetchEvents(FetchEvents),
}
struct Reading {
struct ReadingBck {
scyqueue: ScyllaQueue,
reading_state: ReadingState,
}
struct ReadingFwd {
scyqueue: ScyllaQueue,
reading_state: ReadingState,
}
enum State {
Begin,
Reading(Reading),
ReadingBck(ReadingBck),
ReadingFwd(ReadingFwd),
InputDone,
Done,
}
@@ -105,6 +122,8 @@ pub struct EventsStreamRt {
state: State,
scyqueue: ScyllaQueue,
msp_inp: MspStreamRt,
msp_buf: VecDeque<TsMs>,
msp_buf_bck: VecDeque<TsMs>,
out: VecDeque<Box<dyn Events>>,
ts_seen_max: u64,
}
@@ -129,29 +148,43 @@ impl EventsStreamRt {
state: State::Begin,
scyqueue,
msp_inp,
msp_buf: VecDeque::new(),
msp_buf_bck: VecDeque::new(),
out: VecDeque::new(),
ts_seen_max: 0,
}
}
fn make_msp_read_fut(
msp_inp: &mut MspStreamRt,
) -> Pin<Box<dyn Future<Output = Option<Result<TsMs, crate::events2::msp::Error>>> + Send>> {
trace_fetch!("make_msp_read_fut");
let msp_inp = unsafe {
let ptr = msp_inp as *mut MspStreamRt;
&mut *ptr
};
let fut = Box::pin(msp_inp.next());
fut
}
fn make_read_events_fut(
&mut self,
ts_msp: TsMs,
bck: bool,
scyqueue: ScyllaQueue,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn Events>, Error>> + Send>> {
let fwd = true;
let opts = ReadNextValuesOpts::new(
self.rt.clone(),
self.series.clone(),
ts_msp,
self.range.clone(),
fwd,
!bck,
self.readopts.clone(),
scyqueue,
);
let scalar_type = self.ch_conf.scalar_type().clone();
let shape = self.ch_conf.shape().clone();
debug!("make_read_events_fut {:?} {:?}", shape, scalar_type);
trace_fetch!("make_read_events_fut bck {} {:?} {:?}", bck, shape, scalar_type);
let fut = async move {
let ret = match &shape {
Shape::Scalar => match &scalar_type {
@@ -168,9 +201,10 @@ impl EventsStreamRt {
ScalarType::BOOL => read_next_values::<bool>(opts).await,
ScalarType::STRING => read_next_values::<String>(opts).await,
ScalarType::Enum => {
debug!(
trace_fetch!(
"make_read_events_fut {:?} {:?} ------------- good",
shape, scalar_type
shape,
scalar_type
);
read_next_values::<EnumVariant>(opts).await
}
@@ -205,6 +239,60 @@ impl EventsStreamRt {
};
Box::pin(fut)
}
fn transition_to_bck_read(&mut self) {
trace_fetch!("transition_to_bck_read");
for ts in self.msp_buf.iter() {
if ts.ns() < self.range.beg() {
self.msp_buf_bck.push_front(ts.clone());
}
}
let c = self.msp_buf.iter().take_while(|x| x.ns() < self.range.beg()).count();
let g = c.max(1) - 1;
for _ in 0..g {
self.msp_buf.pop_front();
}
self.setup_bck_read();
}
fn setup_bck_read(&mut self) {
trace_fetch!("setup_bck_read");
if let Some(ts) = self.msp_buf_bck.pop_front() {
let scyqueue = self.scyqueue.clone();
let fut = self.make_read_events_fut(ts, true, scyqueue);
self.state = State::ReadingBck(ReadingBck {
scyqueue: self.scyqueue.clone(),
reading_state: ReadingState::FetchEvents(FetchEvents { fut }),
});
} else {
self.transition_to_fwd_read();
}
}
fn transition_to_fwd_read(&mut self) {
trace_fetch!("transition_to_fwd_read");
self.msp_buf_bck = VecDeque::new();
self.setup_fwd_read();
}
fn setup_fwd_read(&mut self) {
if let Some(ts) = self.msp_buf.pop_front() {
trace_fetch!("setup_fwd_read {ts}");
let scyqueue = self.scyqueue.clone();
let fut = self.make_read_events_fut(ts, false, scyqueue);
self.state = State::ReadingFwd(ReadingFwd {
scyqueue: self.scyqueue.clone(),
reading_state: ReadingState::FetchEvents(FetchEvents { fut }),
});
} else {
trace_fetch!("setup_fwd_read no msp");
let fut = Self::make_msp_read_fut(&mut self.msp_inp);
self.state = State::ReadingFwd(ReadingFwd {
scyqueue: self.scyqueue.clone(),
reading_state: ReadingState::FetchMsp(FetchMsp { fut }),
});
}
}
}
impl Stream for EventsStreamRt {
@@ -220,7 +308,7 @@ impl Stream for EventsStreamRt {
break Ready(Some(Err(Error::BadBatch)));
}
if let Some(item_min) = item.ts_min() {
if item_min < self.range.beg().ns() {
if !self.readopts.one_before && item_min < self.range.beg().ns() {
warn_item!(
"{}out of range error A {} {:?}",
"\n\n--------------------------\n",
@@ -284,29 +372,84 @@ impl Stream for EventsStreamRt {
}
break match &mut self.state {
State::Begin => {
let msp_inp = unsafe {
let ptr = (&mut self.msp_inp) as *mut MspStreamRt;
&mut *ptr
};
let fut = Box::pin(msp_inp.next());
self.state = State::Reading(Reading {
scyqueue: self.scyqueue.clone(),
reading_state: ReadingState::FetchMsp(FetchMsp { fut }),
});
if self.readopts.one_before {
trace_fetch!("State::Begin Bck");
let fut = Self::make_msp_read_fut(&mut self.msp_inp);
self.state = State::ReadingBck(ReadingBck {
scyqueue: self.scyqueue.clone(),
reading_state: ReadingState::FetchMsp(FetchMsp { fut }),
});
} else {
trace_fetch!("State::Begin Fwd");
let fut = Self::make_msp_read_fut(&mut self.msp_inp);
self.state = State::ReadingFwd(ReadingFwd {
scyqueue: self.scyqueue.clone(),
reading_state: ReadingState::FetchMsp(FetchMsp { fut }),
});
}
continue;
}
State::Reading(st) => match &mut st.reading_state {
State::ReadingBck(st) => match &mut st.reading_state {
ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) {
Ready(Some(Ok(ts))) => {
let scyqueue = st.scyqueue.clone();
let fut = self.make_read_events_fut(ts, scyqueue);
if let State::Reading(st) = &mut self.state {
st.reading_state = ReadingState::FetchEvents(FetchEvents { fut });
continue;
trace_fetch!("ReadingBck FetchMsp {:?}", ts);
self.msp_buf.push_back(ts);
if ts.ns() >= self.range.beg() {
self.transition_to_bck_read();
} else {
self.state = State::Done;
Ready(Some(Err(Error::Logic)))
let fut = Self::make_msp_read_fut(&mut self.msp_inp);
self.state = State::ReadingBck(ReadingBck {
scyqueue: self.scyqueue.clone(),
reading_state: ReadingState::FetchMsp(FetchMsp { fut }),
});
}
continue;
}
Ready(Some(Err(e))) => Ready(Some(Err(e.into()))),
Ready(None) => {
self.transition_to_bck_read();
continue;
}
Pending => Pending,
},
ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) {
Ready(Ok(mut x)) => {
use items_2::merger::Mergeable;
trace_fetch!("ReadingBck FetchEvents got len {:?}", x.len());
if let Some(ix) = Mergeable::find_highest_index_lt(&x, self.range.beg().ns()) {
trace_fetch!("ReadingBck FetchEvents find_highest_index_lt {:?}", ix);
let mut y = Mergeable::new_empty(&x);
match Mergeable::drain_into(&mut x, &mut y, (ix, 1 + ix)) {
Ok(()) => {
trace_fetch!("ReadingBck FetchEvents drained y len {:?}", y.len());
self.out.push_back(y);
self.transition_to_fwd_read();
continue;
}
Err(e) => {
self.state = State::Done;
Ready(Some(Err(e.into())))
}
}
} else {
trace_fetch!("ReadingBck FetchEvents find_highest_index_lt None");
self.setup_bck_read();
continue;
}
}
Ready(Err(e)) => {
self.state = State::Done;
Ready(Some(Err(e.into())))
}
Pending => Pending,
},
},
State::ReadingFwd(st) => match &mut st.reading_state {
ReadingState::FetchMsp(st2) => match st2.fut.poll_unpin(cx) {
Ready(Some(Ok(ts))) => {
self.msp_buf.push_back(ts);
self.setup_fwd_read();
continue;
}
Ready(Some(Err(e))) => Ready(Some(Err(e.into()))),
Ready(None) => {
@@ -318,18 +461,8 @@ impl Stream for EventsStreamRt {
ReadingState::FetchEvents(st2) => match st2.fut.poll_unpin(cx) {
Ready(Ok(x)) => {
self.out.push_back(x);
let msp_inp = unsafe {
let ptr = (&mut self.msp_inp) as *mut MspStreamRt;
&mut *ptr
};
let fut = Box::pin(msp_inp.next());
if let State::Reading(st) = &mut self.state {
st.reading_state = ReadingState::FetchMsp(FetchMsp { fut });
continue;
} else {
self.state = State::Done;
Ready(Some(Err(Error::Logic)))
}
self.setup_fwd_read();
continue;
}
Ready(Err(e)) => {
self.state = State::Done;

View File

@@ -5,17 +5,54 @@ use futures_util::StreamExt;
use items_0::Events;
use items_2::merger::Mergeable;
use netpod::log::*;
use netpod::stream_impl_tracer::StreamImplTracer;
use netpod::TsNano;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
// State-transition trace output for this stream stage, currently enabled.
// The `if true` wrapper is a compile-time toggle: change to `false` to
// silence this category without touching the call sites.
#[allow(unused)]
macro_rules! trace_transition {
    ($($arg:tt)*) => {
        if true {
            trace!($($arg)*);
        }
    };
}
// Item-emission trace output for this stream stage, currently enabled.
// The `if true` wrapper is a compile-time toggle: change to `false` to
// silence this category without touching the call sites.
#[allow(unused)]
macro_rules! trace_emit {
    ($($arg:tt)*) => {
        if true {
            trace!($($arg)*);
        }
    };
}
// Guard for the top of `poll_next`: would abort the stream with
// `Error::LimitPoll` once the tracer's poll limit is hit.
// Currently disabled — `false &&` short-circuits, so `poll_enter` is never
// invoked and the counter does not advance; change `false` to `true` to arm it.
macro_rules! tracer_poll_enter {
    ($self:expr) => {
        if false && $self.tracer.poll_enter() {
            return Ready(Some(Err(Error::LimitPoll)));
        }
    };
}
// Guard for the top of the poll loop body: would abort the stream with
// `Error::LimitLoop` once the tracer's loop limit is hit.
// Currently disabled — `false &&` short-circuits, so `loop_enter` is never
// invoked and the counter does not advance; change `false` to `true` to arm it.
macro_rules! tracer_loop_enter {
    ($self:expr) => {
        if false && $self.tracer.loop_enter() {
            return Ready(Some(Err(Error::LimitLoop)));
        }
    };
}
// Errors of the first-before stream stage.
// NOTE(review): `ThisError` and `#[cstm(...)]` appear to be project-local
// error-derive attributes — confirm their semantics in the defining crate.
#[derive(Debug, ThisError)]
#[cstm(name = "EventsFirstBefore")]
pub enum Error {
    // Events arrived out of timestamp order in the bulk phase.
    Unordered,
    // Internal invariant violated (e.g. bad partition point, non-empty buffer in Bulk).
    Logic,
    // Error propagated from the underlying input stream.
    Input(Box<dyn std::error::Error + Send>),
    // Tracer poll-count limit reached (see `tracer_poll_enter!`).
    LimitPoll,
    // Tracer loop-count limit reached (see `tracer_loop_enter!`).
    LimitLoop,
}
pub enum Output<T> {
@@ -38,6 +75,7 @@ where
inp: S,
state: State,
buf: Option<T>,
tracer: StreamImplTracer,
}
impl<S, T> FirstBeforeAndInside<S, T>
@@ -46,11 +84,13 @@ where
T: Events + Mergeable + Unpin,
{
pub fn new(inp: S, ts0: TsNano) -> Self {
trace_transition!("FirstBeforeAndInside::new");
Self {
ts0,
inp,
state: State::Begin,
buf: None,
tracer: StreamImplTracer::new("FirstBeforeAndInside".into(), 2000, 100),
}
}
}
@@ -65,7 +105,9 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
tracer_poll_enter!(self);
loop {
tracer_loop_enter!(self);
break match &self.state {
State::Begin => match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(mut item))) => {
@@ -79,8 +121,13 @@ where
// Separate events into before and bulk
let tss = item.tss();
let pp = tss.partition_point(|&x| x < self.ts0.ns());
if pp >= tss.len() {
// all entries are before
trace_transition!("partition_point {pp:?} {n:?}", n = tss.len());
if pp > item.len() {
error!("bad partition point {} {}", pp, item.len());
self.state = State::Done;
Ready(Some(Err(Error::Logic)))
} else if pp == item.len() {
// all entries are before, or empty item
if self.buf.is_none() {
self.buf = Some(item.new_empty());
}
@@ -95,16 +142,19 @@ where
}
} else if pp == 0 {
// all entries are bulk
debug!("transition immediately to bulk");
trace_transition!("transition immediately to bulk");
self.state = State::Bulk;
let o1 = core::mem::replace(&mut self.buf, Some(item.new_empty()))
.unwrap_or_else(|| item.new_empty());
Ready(Some(Ok(Output::First(o1, item))))
} else {
// mixed
if self.buf.is_none() {
self.buf = Some(item.new_empty());
}
match item.drain_into_evs(self.buf.as_mut().unwrap(), (0, pp)) {
Ok(()) => {
debug!("transition with mixed to bulk");
trace_transition!("transition with mixed to bulk");
self.state = State::Bulk;
let o1 = core::mem::replace(&mut self.buf, Some(item.new_empty()))
.unwrap_or_else(|| item.new_empty());
@@ -124,12 +174,21 @@ where
}
Ready(None) => {
self.state = State::Done;
Ready(None)
if let Some(x) = self.buf.take() {
let empty = x.new_empty();
Ready(Some(Ok(Output::First(x, empty))))
} else {
Ready(None)
}
}
Pending => Pending,
},
State::Bulk => {
if self.buf.as_ref().map_or(0, |x| x.len()) != 0 {
error!(
"State::Bulk but buf non-empty {}",
self.buf.as_ref().map_or(0, |x| x.len())
);
self.state = State::Done;
Ready(Some(Err(Error::Logic)))
} else {
@@ -140,7 +199,7 @@ where
let e = Error::Unordered;
Ready(Some(Err(e)))
} else {
debug!("output bulk item len {}", item.len());
trace_emit!("output bulk item len {}", item.len());
Ready(Some(Ok(Output::Bulk(item))))
}
}
@@ -149,7 +208,7 @@ where
Ready(Some(Err(Error::Input(Box::new(e)))))
}
Ready(None) => {
debug!("in bulk, input done");
trace_emit!("in bulk, input done");
self.state = State::Done;
Ready(None)
}

View File

@@ -16,6 +16,7 @@ use items_2::merger::Mergeable;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::range::evrange::SeriesRange;
use netpod::stream_impl_tracer::StreamImplTracer;
use netpod::ttl::RetentionTime;
use netpod::ChConf;
use std::collections::VecDeque;
@@ -23,6 +24,40 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
// Fetch-path trace output for the merge stage, currently enabled.
// The `if true` wrapper is a compile-time toggle: change to `false` to
// silence this category without touching the call sites.
#[allow(unused)]
macro_rules! trace_fetch {
    ($($arg:tt)*) => {
        if true {
            trace!($($arg)*);
        }
    };
}
// Item-emission trace output for the merge stage, currently enabled.
// The `if true` wrapper is a compile-time toggle: change to `false` to
// silence this category without touching the call sites.
#[allow(unused)]
macro_rules! trace_emit {
    ($($arg:tt)*) => {
        if true {
            trace!($($arg)*);
        }
    };
}
// Guard for the top of `poll_next`: would abort the stream with
// `Error::LimitPoll` once the tracer's poll limit is hit.
// Currently disabled — `false &&` short-circuits, so `poll_enter` is never
// invoked and the counter does not advance; change `false` to `true` to arm it.
macro_rules! tracer_poll_enter {
    ($self:expr) => {
        if false && $self.tracer.poll_enter() {
            return Ready(Some(Err(Error::LimitPoll)));
        }
    };
}
// Guard for the top of the poll loop body: would abort the stream with
// `Error::LimitLoop` once the tracer's loop limit is hit.
// Currently disabled — `false &&` short-circuits, so `loop_enter` is never
// invoked and the counter does not advance; change `false` to `true` to arm it.
macro_rules! tracer_loop_enter {
    ($self:expr) => {
        if false && $self.tracer.loop_enter() {
            return Ready(Some(Err(Error::LimitLoop)));
        }
    };
}
#[derive(Debug, ThisError)]
#[cstm(name = "EventsMergeRt")]
pub enum Error {
@@ -31,6 +66,8 @@ pub enum Error {
Logic,
OrderMin,
OrderMax,
LimitPoll,
LimitLoop,
}
#[allow(unused)]
@@ -116,10 +153,12 @@ pub struct MergeRts {
out: VecDeque<ChannelEvents>,
buf_before: Option<ChannelEvents>,
ts_seen_max: u64,
tracer: StreamImplTracer,
}
impl MergeRts {
pub fn new(ch_conf: ChConf, range: ScyllaSeriesRange, readopts: EventReadOpts, scyqueue: ScyllaQueue) -> Self {
info!("MergeRts readopts {readopts:?}");
Self {
ch_conf,
range_mt: range.clone(),
@@ -137,6 +176,7 @@ impl MergeRts {
out: VecDeque::new(),
buf_before: None,
ts_seen_max: 0,
tracer: StreamImplTracer::new("MergeRts".into(), 2000, 2000),
}
}
@@ -145,7 +185,7 @@ impl MergeRts {
let limbuf = &VecDeque::new();
let inpdst = &mut self.inp_st;
let range = Self::constrained_range(&self.range, limbuf);
debug!("setup_first_st constrained beg {}", range.beg().ns());
trace_fetch!("setup_first_st constrained beg {}", range.beg().ns());
let tsbeg = range.beg();
let inp = EventsStreamRt::new(
rt,
@@ -164,7 +204,7 @@ impl MergeRts {
let inpdst = &mut self.inp_mt;
let range = Self::constrained_range(&self.range_mt, limbuf);
self.range_lt = range.clone();
debug!("setup_first_mt constrained beg {}", range.beg().ns());
trace_fetch!("setup_first_mt constrained beg {}", range.beg().ns());
let tsbeg = range.beg();
let inp = EventsStreamRt::new(
rt,
@@ -182,7 +222,7 @@ impl MergeRts {
let limbuf = &self.buf_mt;
let inpdst = &mut self.inp_lt;
let range = Self::constrained_range(&self.range_lt, limbuf);
debug!("setup_first_lt constrained beg {}", range.beg().ns());
trace_fetch!("setup_first_lt constrained beg {}", range.beg().ns());
let tsbeg = range.beg();
let inp = EventsStreamRt::new(
rt,
@@ -196,37 +236,35 @@ impl MergeRts {
}
fn setup_read_st(&mut self) -> ReadEvents {
let stream = unsafe { &mut *(self.inp_st.as_mut().unwrap().as_mut() as *mut TI) };
let fut = Box::pin(stream.next());
ReadEvents { fut }
trace_fetch!("setup_read_st");
Self::setup_read_any(&mut self.inp_st)
}
fn setup_read_mt(&mut self) -> ReadEvents {
let stream = unsafe { &mut *(self.inp_mt.as_mut().unwrap().as_mut() as *mut TI) };
let fut = Box::pin(stream.next());
ReadEvents { fut }
trace_fetch!("setup_read_mt");
Self::setup_read_any(&mut self.inp_mt)
}
fn setup_read_lt(&mut self) -> ReadEvents {
let stream = unsafe { &mut *(self.inp_lt.as_mut().unwrap().as_mut() as *mut TI) };
let fut = Box::pin(stream.next());
ReadEvents { fut }
trace_fetch!("setup_read_lt");
Self::setup_read_any(&mut self.inp_lt)
}
fn setup_read_any(inp: &mut Option<Box<TI>>) -> ReadEvents {
trace_fetch!("setup_read_any");
let stream = unsafe { &mut *(inp.as_mut().unwrap().as_mut() as *mut TI) };
let fut = Box::pin(stream.next());
ReadEvents { fut }
}
fn constrained_range(full: &ScyllaSeriesRange, buf: &VecDeque<ChannelEvents>) -> ScyllaSeriesRange {
debug!("constrained_range {:?} {:?}", full, buf.front());
trace_fetch!("constrained_range {:?} {:?}", full, buf.front());
if let Some(e) = buf.front() {
if let Some(ts) = e.ts_min() {
let nrange = NanoRange::from((full.beg().ns(), ts));
ScyllaSeriesRange::from(&SeriesRange::from(nrange))
} else {
debug!("no ts even though should not have empty buffers");
debug!("constrained_range no ts even though should not have empty buffers");
full.clone()
}
} else {
@@ -235,6 +273,7 @@ impl MergeRts {
}
fn handle_first_st(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) {
trace_fetch!("handle_first_st");
Self::move_latest_to_before_buf(&mut before, &mut self.buf_before);
self.buf_st.push_back(bulk);
self.setup_first_mt();
@@ -242,6 +281,7 @@ impl MergeRts {
}
fn handle_first_mt(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) {
trace_fetch!("handle_first_mt");
Self::move_latest_to_before_buf(&mut before, &mut self.buf_before);
self.buf_mt.push_back(bulk);
self.setup_first_lt();
@@ -249,27 +289,39 @@ impl MergeRts {
}
fn handle_first_lt(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) {
trace_fetch!("handle_first_lt");
Self::move_latest_to_before_buf(&mut before, &mut self.buf_before);
self.buf_lt.push_back(bulk);
self.push_out_one_before();
let buf = core::mem::replace(&mut self.buf_lt, VecDeque::new());
self.state = State::ReadingLt(None, buf, self.inp_lt.take());
}
fn move_latest_to_before_buf(before: &mut ChannelEvents, buf: &mut Option<ChannelEvents>) {
trace_fetch!("move_latest_to_before_buf");
if buf.is_none() {
*buf = Some(before.new_empty());
}
let buf = buf.as_mut().unwrap();
if let Some(tsn) = before.ts_max() {
if let Some(tse) = buf.ts_max() {
if tsn > tse {
let n = before.len();
buf.clear();
before.drain_into(buf, (n - 1, n)).unwrap();
}
if buf.ts_max().map_or(true, |x| tsn > x) {
let n = before.len();
buf.clear();
before.drain_into(buf, (n - 1, n)).unwrap();
}
}
}
fn push_out_one_before(&mut self) {
if let Some(buf) = self.buf_before.take() {
trace_fetch!("push_out_one_before len {len:?}", len = buf.len());
if buf.len() != 0 {
self.out.push_back(buf);
}
} else {
trace_fetch!("push_out_one_before no buffer");
}
}
}
impl Stream for MergeRts {
@@ -277,13 +329,15 @@ impl Stream for MergeRts {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
tracer_poll_enter!(self);
let mut out2 = VecDeque::new();
loop {
tracer_loop_enter!(self);
while let Some(x) = out2.pop_front() {
self.out.push_back(x);
}
if let Some(item) = self.out.pop_front() {
debug!("emit item {} {:?}", items_0::Events::verify(&item), item);
trace_emit!("emit item {} {:?}", items_0::Events::verify(&item), item);
if items_0::Events::verify(&item) != true {
debug!("{}bad item {:?}", "\n\n--------------------------\n", item);
self.state = State::Done;
@@ -310,6 +364,9 @@ impl Stream for MergeRts {
self.ts_seen_max = item_max;
}
}
if let Some(ix) = item.find_highest_index_lt(self.range.beg().ns()) {
trace_fetch!("see item before range ix {ix}");
}
break Ready(Some(Ok(item)));
}
break match &mut self.state {
@@ -321,7 +378,7 @@ impl Stream for MergeRts {
State::FetchFirstSt(st2) => match st2.fut.poll_unpin(cx) {
Ready(Some(Ok(x))) => match x {
firstbefore::Output::First(before, bulk) => {
debug!("have first from ST");
trace_fetch!("have first from ST");
self.handle_first_st(before, bulk);
continue;
}
@@ -336,7 +393,7 @@ impl Stream for MergeRts {
Ready(Some(Err(e.into())))
}
Ready(None) => {
debug!("no first from ST");
trace_fetch!("no first from ST");
self.inp_st = None;
self.setup_first_mt();
self.state = State::FetchFirstMt(self.setup_read_mt());
@@ -347,7 +404,7 @@ impl Stream for MergeRts {
State::FetchFirstMt(st2) => match st2.fut.poll_unpin(cx) {
Ready(Some(Ok(x))) => match x {
firstbefore::Output::First(before, bulk) => {
debug!("have first from MT");
trace_fetch!("have first from MT");
self.handle_first_mt(before, bulk);
continue;
}
@@ -362,7 +419,7 @@ impl Stream for MergeRts {
Ready(Some(Err(e.into())))
}
Ready(None) => {
debug!("no first from MT");
trace_fetch!("no first from MT");
self.inp_mt = None;
self.setup_first_lt();
self.state = State::FetchFirstLt(self.setup_read_lt());
@@ -373,7 +430,7 @@ impl Stream for MergeRts {
State::FetchFirstLt(st2) => match st2.fut.poll_unpin(cx) {
Ready(Some(Ok(x))) => match x {
firstbefore::Output::First(before, bulk) => {
debug!("have first from LT");
trace_fetch!("have first from LT");
self.handle_first_lt(before, bulk);
continue;
}
@@ -388,8 +445,9 @@ impl Stream for MergeRts {
Ready(Some(Err(e.into())))
}
Ready(None) => {
debug!("no first from LT");
trace_fetch!("no first from LT");
self.inp_lt = None;
self.push_out_one_before();
let buf = core::mem::replace(&mut self.buf_lt, VecDeque::new());
self.state = State::ReadingLt(None, buf, self.inp_lt.take());
continue;
@@ -433,7 +491,7 @@ impl Stream for MergeRts {
self.state = State::ReadingLt(Some(Self::setup_read_any(inp)), buf, inp.take());
continue;
} else {
debug!("transition ReadingLt to ReadingMt");
trace_emit!("transition ReadingLt to ReadingMt");
let buf = core::mem::replace(&mut self.buf_mt, VecDeque::new());
self.state = State::ReadingMt(None, buf, self.inp_mt.take());
continue;
@@ -476,7 +534,7 @@ impl Stream for MergeRts {
self.state = State::ReadingMt(Some(Self::setup_read_any(inp)), buf, inp.take());
continue;
} else {
debug!("transition ReadingMt to ReadingSt");
trace_emit!("transition ReadingMt to ReadingSt");
let buf = core::mem::replace(&mut self.buf_st, VecDeque::new());
self.state = State::ReadingSt(None, buf, self.inp_st.take());
continue;
@@ -519,7 +577,7 @@ impl Stream for MergeRts {
self.state = State::ReadingSt(Some(Self::setup_read_any(inp)), buf, inp.take());
continue;
} else {
debug!("fully done");
trace_emit!("fully done");
Ready(None)
}
}

View File

@@ -159,7 +159,6 @@ impl ScyllaWorker {
let stmts = Arc::new(stmts);
info!("scylla worker PREPARE DONE");
loop {
info!("scylla worker WAIT FOR JOB");
let x = self.rx.recv().await;
let job = match x {
Ok(x) => x,
@@ -169,14 +168,12 @@ impl ScyllaWorker {
};
match job {
Job::FindTsMsp(rt, series, range, bck, tx) => {
info!("scylla worker Job::FindTsMsp");
let res = crate::events2::msp::find_ts_msp(&rt, series, range, bck, &stmts, &scy).await;
if tx.send(res.map_err(Into::into)).await.is_err() {
// TODO count for stats
}
}
Job::ReadNextValues(job) => {
info!("scylla worker Job::ReadNextValues");
let fut = (job.futgen)(scy.clone(), stmts.clone());
let res = fut.await;
if job.tx.send(res.map_err(Into::into)).await.is_err() {
@@ -184,7 +181,6 @@ impl ScyllaWorker {
}
}
Job::AccountingReadTs(rt, ts, tx) => {
info!("scylla worker Job::AccountingReadTs");
let ks = match &rt {
RetentionTime::Short => &self.scyconf_st.keyspace,
RetentionTime::Medium => &self.scyconf_mt.keyspace,

View File

@@ -30,6 +30,7 @@ pub async fn dyn_events_stream(
let subq = make_sub_query(
ch_conf,
evq.range().clone(),
evq.one_before_range(),
evq.transform().clone(),
evq.test_do_wasm(),
evq,

View File

@@ -110,6 +110,7 @@ where
item
}
} else {
trace!("discarding events len {:?}", ilge - 1);
let mut dummy = item.new_empty();
item.drain_into(&mut dummy, (0, ilge - 1))
.map_err(|e| format!("{e} unexpected MergeError while remove of items"))?;

View File

@@ -200,6 +200,7 @@ where
pub fn make_sub_query<SUB>(
ch_conf: ChannelTypeConfigGen,
range: SeriesRange,
one_before_range: bool,
transform: TransformQuery,
test_do_wasm: Option<&str>,
sub: SUB,
@@ -209,7 +210,7 @@ pub fn make_sub_query<SUB>(
where
SUB: Into<EventsSubQuerySettings>,
{
let mut select = EventsSubQuerySelect::new(ch_conf, range, transform);
let mut select = EventsSubQuerySelect::new(ch_conf, range, one_before_range, transform);
if let Some(wasm1) = test_do_wasm {
select.set_wasm1(wasm1.into());
}

View File

@@ -46,6 +46,7 @@ async fn timebinnable_stream(
let subq = make_sub_query(
ch_conf,
range.clone().into(),
one_before_range,
query.transform().clone(),
query.test_do_wasm(),
&query,