WIP typechecks

This commit is contained in:
Dominik Werder
2024-11-25 10:19:39 +01:00
parent 38f8b19473
commit f168280d18
4 changed files with 80 additions and 61 deletions

View File

@@ -9,6 +9,7 @@ use futures_util::TryStreamExt;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_2::binning::container_events::ContainerEvents;
use items_2::channelevents::ChannelEvents;
use netpod::log::*;
use netpod::ChConf;
@@ -66,25 +67,21 @@ pub async fn scylla_channel_event_stream(
Ok(k) => match k {
ChannelEvents::Events(mut k) => {
if let SeriesKind::ChannelStatus = chconf.kind() {
use items_0::Empty;
type C1 = items_2::eventsdim0::EventsDim0<u64>;
type C2 = items_2::eventsdim0::EventsDim0<String>;
type C1 = ContainerEvents<u64>;
type C2 = ContainerEvents<String>;
if let Some(j) = k.as_any_mut().downcast_mut::<C1>() {
let mut g = C2::empty();
let tss = j.tss();
let vals = j.private_values_ref();
for (&ts, &val) in tss.iter().zip(vals.iter()) {
let mut g = C2::new();
for (&ts, val) in j.iter_zip() {
use netpod::channelstatus as cs2;
let val = match cs2::ChannelStatus::from_kind(val as _) {
Ok(x) => x.to_user_variant_string(),
Err(_) => format!("{}", val),
};
if val.len() != 0 {
g.push_back(ts, 0, val);
g.push_back(ts, val);
}
}
Ok(ChannelEvents::Events(Box::new(g)))
// Ok(ChannelEvents::Events(k))
} else {
Ok(ChannelEvents::Events(k))
}

View File

@@ -56,7 +56,7 @@ impl From<crate::worker::Error> for Error {
pub(super) trait ValTy: Sized + 'static {
type ScaTy: ScalarOps + std::default::Default;
type ScyTy: scylla::cql_to_rust::FromCqlVal<scylla::frame::response::result::CqlValue>;
type Container: BinningggContainerEventsDyn + Appendable<Self>;
type Container: BinningggContainerEventsDyn + Empty + Appendable<Self>;
fn from_scyty(inp: Self::ScyTy) -> Self;
fn from_valueblob(inp: Vec<u8>) -> Self;
fn table_name() -> &'static str;
@@ -538,7 +538,7 @@ where
let row = x?;
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::from_valueblob(row.1);
ret.push(ts.ns(), 0, value);
ret.push(ts, value);
}
ret
} else {
@@ -548,7 +548,7 @@ where
let row = x?;
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::from_scyty(row.1);
ret.push(ts.ns(), 0, value);
ret.push(ts, value);
i += 1;
if i % 2000 == 0 {
jobtrace.add_event_now(ReadEventKind::ScyllaReadRow(i));
@@ -565,7 +565,7 @@ where
let row = x?;
let ts = TsNano::from_ns(ts_msp.ns_u64() + row.0 as u64);
let value = <ST as ValTy>::default();
ret.push(ts.ns(), 0, value);
ret.push(ts, value);
}
ret
}
@@ -640,7 +640,7 @@ fn convert_rows_0<ST: ValTy>(
// TODO count as logic error
error!("ts >= range.beg");
} else if ts < range.beg() {
ret.push(ts.ns(), 0, value);
ret.push(ts, value);
} else {
*last_before = Some((ts, value));
}
@@ -649,7 +649,7 @@ fn convert_rows_0<ST: ValTy>(
// TODO count as logic error
error!("ts >= range.end");
} else if ts >= range.beg() {
ret.push(ts.ns(), 0, value);
ret.push(ts, value);
} else {
if last_before.is_none() {
warn!("encounter event before range in forward read {ts}");

View File

@@ -12,6 +12,7 @@ use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::merge::DrainIntoNewDynResult;
use items_0::merge::MergeableDyn;
use items_0::timebin::BinningggContainerEventsDyn;
use items_2::channelevents::ChannelEvents;
@@ -23,6 +24,7 @@ use netpod::ScalarType;
use netpod::Shape;
use netpod::TsMs;
use netpod::TsMsVecFmt;
use netpod::TsNano;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Context;
@@ -78,6 +80,7 @@ pub enum Error {
Logic,
TruncateLogic,
AlreadyTaken,
DrainFailure,
}
struct FetchMsp {
@@ -259,7 +262,7 @@ pub struct EventsStreamRt {
msp_buf_bck: VecDeque<TsMs>,
out: VecDeque<Box<dyn BinningggContainerEventsDyn>>,
out_cnt: u64,
ts_seen_max: u64,
ts_seen_max: TsNano,
qucap: usize,
}
@@ -288,7 +291,7 @@ impl EventsStreamRt {
msp_buf_bck: VecDeque::new(),
out: VecDeque::new(),
out_cnt: 0,
ts_seen_max: 0,
ts_seen_max: TsNano::from_ns(0),
}
}
@@ -476,7 +479,7 @@ impl Stream for EventsStreamRt {
break Ready(Some(Err(Error::BadBatch)));
}
if let Some(item_min) = item.ts_min() {
if !self.readopts.one_before && item_min < self.range.beg().ns() {
if !self.readopts.one_before && item_min < self.range.beg() {
warn_item!(
"{}out of range error A {} {:?}",
"\n\n--------------------------\n",
@@ -493,15 +496,18 @@ impl Stream for EventsStreamRt {
item_min,
self.ts_seen_max
);
let mut r = MergeableDyn::new_empty(&item);
match MergeableDyn::find_highest_index_lt(&item, self.ts_seen_max) {
Some(ix) => match MergeableDyn::drain_into(&mut item, &mut r, (0, 1 + ix)) {
Ok(()) => {
// TODO count for metrics
match MergeableDyn::find_highest_index_lt(item.as_ref(), self.ts_seen_max) {
Some(ix) => match MergeableDyn::drain_into_new(item.as_mut(), 0..1 + ix) {
DrainIntoNewDynResult::Done(_) => {
// TODO count drained elements for metrics
}
Err(e) => {
DrainIntoNewDynResult::Partial(_) => {
self.state = State::Done;
break Ready(Some(Err(e.into())));
break Ready(Some(Err(Error::DrainFailure)));
}
DrainIntoNewDynResult::NotCompatible => {
self.state = State::Done;
break Ready(Some(Err(Error::DrainFailure)));
}
},
None => {
@@ -512,7 +518,7 @@ impl Stream for EventsStreamRt {
}
}
if let Some(item_max) = item.ts_max() {
if item_max >= self.range.end().ns() {
if item_max >= self.range.end() {
warn_item!(
"{}out of range error B {} {:?}",
"\n\n--------------------------\n",
@@ -535,7 +541,7 @@ impl Stream for EventsStreamRt {
self.ts_seen_max = item_max;
}
}
trace_emit!("deliver item {}", item.output_info());
trace_emit!("deliver item {:?}", item);
self.out_cnt += item.len() as u64;
break Ready(Some(Ok(ChannelEvents::Events(item))));
}
@@ -588,22 +594,25 @@ impl Stream for EventsStreamRt {
Ok((mut evs, jobtrace)) => {
trace_fetch!("ReadingBck {jobtrace}");
trace_fetch!("ReadingBck FetchEvents got len {}", evs.len());
for ts in MergeableDyn::tss_for_testing(&evs) {
for ts in MergeableDyn::tss_for_testing(evs.as_ref()) {
trace_every_event!("ReadingBck FetchEvents ts {}", ts.fmt());
}
if let Some(ix) = MergeableDyn::find_highest_index_lt(&evs, self.range.beg().ns()) {
if let Some(ix) = MergeableDyn::find_highest_index_lt(evs.as_ref(), self.range.beg()) {
trace_fetch!("ReadingBck FetchEvents find_highest_index_lt {:?}", ix);
let mut y = MergeableDyn::new_empty(&evs);
match MergeableDyn::drain_into(&mut evs, &mut y, (ix, 1 + ix)) {
Ok(()) => {
match MergeableDyn::drain_into_new(evs.as_mut(), ix..1 + ix) {
DrainIntoNewDynResult::Done(y) => {
trace_fetch!("ReadingBck FetchEvents drained y len {:?}", y.len());
self.out.push_back(y);
self.transition_to_fwd_read();
continue;
}
Err(e) => {
DrainIntoNewDynResult::Partial(_) => {
self.state = State::Done;
Ready(Some(Err(e.into())))
Ready(Some(Err(Error::DrainFailure)))
}
DrainIntoNewDynResult::NotCompatible => {
self.state = State::Done;
Ready(Some(Err(Error::DrainFailure)))
}
}
} else {
@@ -662,7 +671,7 @@ impl Stream for EventsStreamRt {
jobtrace
.add_event_now(crate::events::ReadEventKind::EventsStreamRtSees(evs.len() as u32));
trace_fetch!("ReadingFwd {jobtrace}");
for ts in MergeableDyn::tss_for_testing(&evs) {
for ts in MergeableDyn::tss_for_testing(evs.as_ref()) {
trace_every_event!("ReadingFwd FetchEvents ts {}", ts.fmt());
}
self.out.push_back(evs);

View File

@@ -11,6 +11,7 @@ use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::merge::DrainIntoNewResult;
use items_0::merge::MergeableTy;
use items_0::WithLen;
use items_2::channelevents::ChannelEvents;
@@ -270,26 +271,38 @@ impl MergeRtsChained {
}
}
fn handle_first_st(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) {
fn handle_first_st(&mut self, mut before: Option<ChannelEvents>, bulk: Option<ChannelEvents>) {
trace_fetch!("handle_first_st");
Self::move_latest_to_before_buf(&mut before, &mut self.buf_before);
self.buf_st.push_back(bulk);
if let Some(before) = before.as_mut() {
Self::move_latest_to_before_buf(before, &mut self.buf_before);
}
if let Some(bulk) = bulk {
self.buf_st.push_back(bulk);
}
self.setup_first_mt();
self.state = State::FetchFirstMt(self.setup_read_mt());
}
fn handle_first_mt(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) {
fn handle_first_mt(&mut self, mut before: Option<ChannelEvents>, bulk: Option<ChannelEvents>) {
trace_fetch!("handle_first_mt");
Self::move_latest_to_before_buf(&mut before, &mut self.buf_before);
self.buf_mt.push_back(bulk);
if let Some(before) = before.as_mut() {
Self::move_latest_to_before_buf(before, &mut self.buf_before);
}
if let Some(bulk) = bulk {
self.buf_mt.push_back(bulk);
}
self.setup_first_lt();
self.state = State::FetchFirstLt(self.setup_read_lt());
}
fn handle_first_lt(&mut self, mut before: ChannelEvents, bulk: ChannelEvents) {
fn handle_first_lt(&mut self, mut before: Option<ChannelEvents>, bulk: Option<ChannelEvents>) {
trace_fetch!("handle_first_lt");
Self::move_latest_to_before_buf(&mut before, &mut self.buf_before);
self.buf_lt.push_back(bulk);
if let Some(before) = before.as_mut() {
Self::move_latest_to_before_buf(before, &mut self.buf_before);
}
if let Some(bulk) = bulk {
self.buf_lt.push_back(bulk);
}
}
fn handle_all_firsts_done(&mut self) {
@@ -306,17 +319,20 @@ impl MergeRtsChained {
}
fn move_latest_to_before_buf(before: &mut ChannelEvents, buf: &mut Option<ChannelEvents>) {
let buf = buf.get_or_insert_with(|| {
trace_fetch!("move_latest_to_before_buf init before buf");
before.new_empty()
});
if let Some(tsn) = before.ts_max() {
let tsn = tsn;
if buf.ts_max().map_or(true, |x| tsn.ns() > x) {
if buf
.as_ref()
.map_or(true, |buf2| buf2.ts_max().map_or(true, |x| tsn > x))
{
trace_fetch!("move_latest_to_before_buf move possible before item {tsn}");
let n = before.len();
buf.clear();
before.drain_into(buf, (n - 1, n)).unwrap();
match before.drain_into_new(n - 1..n) {
DrainIntoNewResult::Done(x) => {
*buf = Some(x);
}
DrainIntoNewResult::Partial(_) => panic!(),
DrainIntoNewResult::NotCompatible => panic!(),
}
}
}
}
@@ -388,12 +404,11 @@ impl Stream for MergeRtsChained {
Ready(Some(Ok(x))) => match x {
onebeforeandbulk::Output::Before(before) => {
trace_fetch!("have first from ST");
let empty = before.new_empty();
self.handle_first_st(before, empty);
self.handle_first_st(Some(before), None);
continue;
}
onebeforeandbulk::Output::Bulk(item) => {
self.handle_first_st(item.new_empty(), item);
self.handle_first_st(None, Some(item));
continue;
}
},
@@ -414,12 +429,11 @@ impl Stream for MergeRtsChained {
Ready(Some(Ok(x))) => match x {
onebeforeandbulk::Output::Before(before) => {
trace_fetch!("have first from MT");
let empty = before.new_empty();
self.handle_first_mt(before, empty);
self.handle_first_mt(Some(before), None);
continue;
}
onebeforeandbulk::Output::Bulk(item) => {
self.handle_first_mt(item.new_empty(), item);
self.handle_first_mt(None, Some(item));
continue;
}
},
@@ -440,13 +454,12 @@ impl Stream for MergeRtsChained {
Ready(Some(Ok(x))) => match x {
onebeforeandbulk::Output::Before(before) => {
trace_fetch!("have first from LT");
let empty = before.new_empty();
self.handle_first_lt(before, empty);
self.handle_first_lt(Some(before), None);
self.handle_all_firsts_done();
continue;
}
onebeforeandbulk::Output::Bulk(item) => {
self.handle_first_lt(item.new_empty(), item);
self.handle_first_lt(None, Some(item));
self.handle_all_firsts_done();
continue;
}