Investigate config vs event mismatch: waveform 1181 vs 162 elements

Dominik Werder
2023-09-02 06:25:40 +02:00
parent 790edee192
commit 76c61f564c
17 changed files with 188 additions and 115 deletions

View File

@@ -8,23 +8,11 @@ pub mod binnedjson;
mod timeweightedjson;
use bytes::BytesMut;
use err::Error;
use std::future::Future;
fn run_test<F>(f: F) -> Result<(), Error>
where
F: Future<Output = Result<(), Error>> + Send,
{
let runtime = taskrun::get_runtime();
let _g = runtime.enter();
runtime.block_on(f)
//let jh = tokio::spawn(f);
//jh.await;
}
#[test]
fn bufs() {
use bytes::{Buf, BufMut};
use bytes::Buf;
use bytes::BufMut;
let mut buf = BytesMut::with_capacity(1024);
assert!(buf.as_mut().len() == 0);
buf.put_u32_le(123);
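
The removed `run_test` helper blocked each async test body on a shared Tokio runtime. A minimal sketch of that pattern, assuming a lazily initialized global runtime (`RUNTIME` and `get_runtime` here are illustrative stand-ins for the crate's `taskrun::get_runtime()`):

use std::future::Future;
use std::sync::OnceLock;
use tokio::runtime::Runtime;

// Illustrative stand-in for taskrun::get_runtime(): one runtime shared by all tests.
static RUNTIME: OnceLock<Runtime> = OnceLock::new();

fn get_runtime() -> &'static Runtime {
    RUNTIME.get_or_init(|| Runtime::new().expect("tokio runtime"))
}

fn run_test<F>(f: F) -> F::Output
where
    F: Future + Send,
{
    let runtime = get_runtime();
    // enter() makes this runtime current so code inside `f` can spawn tasks.
    let _guard = runtime.enter();
    runtime.block_on(f)
}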

View File

@@ -1,5 +1,4 @@
use super::paths;
use crate::SfDbChConf;
use bytes::BytesMut;
use err::ErrStr;
use err::Error;
@@ -827,9 +826,9 @@ mod test {
beg: DAY + HOUR * 5,
end: DAY + HOUR * 8,
};
let chn = netpod::SfDbChannel::from_name(BACKEND, "scalar-i32-be");
let _chn = netpod::SfDbChannel::from_name(BACKEND, "scalar-i32-be");
// TODO read config from disk? Or expose the config from data generator?
let fetch_info = todo!();
let fetch_info = err::todoval();
// let fetch_info = SfChFetchInfo {
// channel: chn,
// keyspace: 2,

View File

@@ -1,4 +1,3 @@
use crate::eventchunkermultifile::EventChunkerMultifile;
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
@@ -300,8 +299,7 @@ fn make_scalar_conv(
pub struct EventsDynStream {
scalar_type: ScalarType,
shape: Shape,
agg_kind: AggKind,
events_full: EventChunkerMultifile,
events_full: Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>,
events_out: Box<dyn Events>,
scalar_conv: Box<dyn ValueFromBytes>,
emit_threshold: usize,
@@ -318,15 +316,14 @@ impl EventsDynStream {
scalar_type: ScalarType,
shape: Shape,
agg_kind: AggKind,
events_full: EventChunkerMultifile,
events_full: Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>,
) -> Result<Self, Error> {
let st = &scalar_type;
let sh = &shape;
let ag = &agg_kind;
warn!("TODO EventsDynStream::new feed through transform");
// TODO do we need/want the empty item from here?
let events_out = items_2::empty::empty_events_dyn_ev(st, sh)?;
let scalar_conv = make_scalar_conv(st, sh, ag)?;
let scalar_conv = make_scalar_conv(st, sh, &agg_kind)?;
let emit_threshold = match &shape {
Shape::Scalar => 2048,
Shape::Wave(_) => 64,
@@ -335,7 +332,6 @@ impl EventsDynStream {
let ret = Self {
scalar_type,
shape,
agg_kind,
events_full,
events_out,
scalar_conv,
@@ -360,16 +356,13 @@ impl EventsDynStream {
if item.len() >= self.emit_threshold {
info!("handle_event_full item len {}", item.len());
}
for (((buf, &be), &ts), &pulse) in item
.blobs
.iter()
.zip(item.be.iter())
.zip(item.tss.iter())
.zip(item.pulses.iter())
{
for (i, ((&be, &ts), &pulse)) in item.be.iter().zip(item.tss.iter()).zip(item.pulses.iter()).enumerate() {
let buf = item
.data_decompressed(i)
.map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
let endian = if be { Endian::Big } else { Endian::Little };
self.scalar_conv
.convert(ts, pulse, buf, endian, self.events_out.as_mut())?;
.convert(ts, pulse, &buf, endian, self.events_out.as_mut())?;
}
Ok(())
}
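
The rewritten loop drops `blobs` from the zip and instead indexes events via `enumerate`, so each payload can be decompressed on demand through `data_decompressed(i)`. A standalone sketch of the idiom over parallel per-event columns (the `Batch` struct and its `data_decompressed` are illustrative, not the crate's types):

// Illustrative columnar event batch: one entry per event in each column.
struct Batch {
    be: Vec<bool>,
    tss: Vec<u64>,
    pulses: Vec<u64>,
    blobs: Vec<Vec<u8>>,
}

impl Batch {
    // Stand-in for EventFull::data_decompressed: inflate blob i on demand.
    fn data_decompressed(&self, i: usize) -> Vec<u8> {
        self.blobs[i].clone()
    }
}

fn handle(batch: &Batch) {
    for (i, ((&be, &ts), &pulse)) in batch
        .be
        .iter()
        .zip(batch.tss.iter())
        .zip(batch.pulses.iter())
        .enumerate()
    {
        let buf = batch.data_decompressed(i);
        let endian = if be { "big" } else { "little" };
        println!("event {i}: ts {ts} pulse {pulse} {endian}-endian, {} bytes", buf.len());
    }
}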

View File

@@ -7,6 +7,7 @@ pub mod dataopen;
pub mod decode;
pub mod eventchunker;
pub mod eventchunkermultifile;
pub mod eventfilter;
pub mod frame;
pub mod gen;
pub mod index;
@@ -836,7 +837,7 @@ impl Stream for BlockingTaskIntoChannel {
}
fn blocking_task_into_channel(
path: PathBuf,
_path: PathBuf,
file: File,
disk_io_tune: DiskIoTune,
reqid: String,

View File

@@ -72,26 +72,38 @@ pub struct EventChunker {
dbg_path: PathBuf,
last_ts: u64,
expand: bool,
decomp_dt_histo: HistoLog2,
item_len_emit_histo: HistoLog2,
seen_before_range_count: usize,
seen_after_range_count: usize,
seen_events: usize,
unordered_count: usize,
repeated_ts_count: usize,
config_mismatch_discard: usize,
discard_count: usize,
discard_count_range: usize,
discard_count_scalar_type: usize,
discard_count_shape: usize,
discard_count_shape_derived: usize,
discard_count_shape_derived_err: usize,
log_items: VecDeque<LogItem>,
}
impl Drop for EventChunker {
fn drop(&mut self) {
// TODO collect somewhere
if self.config_mismatch_discard != 0 {
warn!("config_mismatch_discard {}", self.config_mismatch_discard);
}
debug!(
"EventChunker-stats {{ decomp_dt_histo: {:?}, item_len_emit_histo: {:?} }}",
self.decomp_dt_histo, self.item_len_emit_histo
concat!(
"EventChunker-stats {{ node_ix: {}, seen_events: {}, discard_count_range: {},",
" discard_count_scalar_type: {}, discard_count_shape: {},",
" discard_count_shape_derived: {}, discard_count_shape_derived_err: {},",
" item_len_emit_histo: {:?} }}",
),
self.node_ix,
self.seen_events,
self.discard_count_range,
self.discard_count_scalar_type,
self.discard_count_shape,
self.discard_count_shape_derived,
self.discard_count_shape_derived_err,
self.item_len_emit_histo
);
}
}
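
Replacing the two histograms with plain counters and logging them once from `Drop` keeps the per-event path to simple increments. A minimal sketch of the pattern with an illustrative subset of the counters, including the `concat!` trick for keeping the long format string a single literal:

// Illustrative subset of the chunker's counters.
struct Stats {
    node_ix: usize,
    seen_events: usize,
    discard_count_range: usize,
}

impl Drop for Stats {
    fn drop(&mut self) {
        // The hot path only increments plain counters; the summary is
        // emitted once, at teardown.
        tracing::debug!(
            concat!(
                "Stats {{ node_ix: {}, seen_events: {},",
                " discard_count_range: {} }}",
            ),
            self.node_ix,
            self.seen_events,
            self.discard_count_range,
        );
    }
}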
@@ -117,33 +129,6 @@ impl EventChunkerConf {
}
}
fn is_config_match(is_array: &bool, ele_count: &u64, fetch_info: &SfChFetchInfo) -> bool {
match fetch_info.shape() {
Shape::Scalar => {
if *is_array {
false
} else {
true
}
}
Shape::Wave(dim1count) => {
if (*dim1count as u64) != *ele_count {
false
} else {
true
}
}
Shape::Image(n1, n2) => {
let nt = (*n1 as u64) * (*n2 as u64);
if nt != *ele_count {
false
} else {
true
}
}
}
}
impl EventChunker {
pub fn self_name() -> &'static str {
std::any::type_name::<Self>()
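
For reference, the removed `is_config_match` reduces to direct boolean expressions once the `if cond { false } else { true }` detours are folded away (same `Shape` and `SfChFetchInfo` types as in the diff, with the first two arguments taken by value):

fn is_config_match(is_array: bool, ele_count: u64, fetch_info: &SfChFetchInfo) -> bool {
    match fetch_info.shape() {
        // A scalar config matches only non-array events.
        Shape::Scalar => !is_array,
        // A waveform config must agree on the element count.
        Shape::Wave(dim1count) => *dim1count as u64 == ele_count,
        // An image config must agree on the total pixel count.
        Shape::Image(n1, n2) => (*n1 as u64) * (*n2 as u64) == ele_count,
    }
}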
@@ -186,14 +171,17 @@ impl EventChunker {
node_ix,
last_ts: 0,
expand,
decomp_dt_histo: HistoLog2::new(8),
item_len_emit_histo: HistoLog2::new(0),
seen_before_range_count: 0,
seen_after_range_count: 0,
seen_events: 0,
unordered_count: 0,
repeated_ts_count: 0,
config_mismatch_discard: 0,
discard_count: 0,
discard_count_range: 0,
discard_count_scalar_type: 0,
discard_count_shape: 0,
discard_count_shape_derived: 0,
discard_count_shape_derived_err: 0,
log_items: VecDeque::new(),
}
}
@@ -283,6 +271,7 @@ impl EventChunker {
self.need_min = len as u32;
break;
} else {
self.seen_events += 1;
let mut discard = false;
let _ttl = sl.read_i64::<BE>().unwrap();
let ts = sl.read_i64::<BE>().unwrap() as u64;
@@ -305,6 +294,7 @@ impl EventChunker {
}
if ts < self.last_ts {
discard = true;
self.discard_count_range += 1;
self.unordered_count += 1;
if self.unordered_count < 20 {
let msg = format!(
@@ -323,6 +313,7 @@ impl EventChunker {
self.last_ts = ts;
if ts >= self.range.end {
discard = true;
self.discard_count_range += 1;
self.seen_after_range_count += 1;
if !self.expand || self.seen_after_range_count >= 2 {
self.seen_beyond_range = true;
@@ -332,6 +323,7 @@ impl EventChunker {
}
if ts < self.range.beg {
discard = true;
self.discard_count_range += 1;
self.seen_before_range_count += 1;
if self.seen_before_range_count < 20 {
let msg = format!(
@@ -414,10 +406,12 @@ impl EventChunker {
Shape::Image(shape_lens[0], shape_lens[1])
} else if shape_dim == 0 {
discard = true;
self.discard_count_shape += 1;
// return Err(DataParseError::ShapedWithoutDims);
Shape::Scalar
} else {
discard = true;
self.discard_count_shape += 1;
// return Err(DataParseError::TooManyDims);
Shape::Scalar
}
@@ -436,11 +430,11 @@ impl EventChunker {
};
if self.fetch_info.scalar_type().ne(&scalar_type) {
discard = true;
self.discard_count_scalar_type += 1;
let msg = format!(
"scalar_type mismatch {:?} {:?} {:?}",
"scalar_type mismatch {:?} {:?}",
scalar_type,
self.fetch_info.scalar_type(),
self.dbg_path,
);
let item = LogItem::from_node(self.node_ix, Level::WARN, msg);
log_items.push(item);
@@ -450,12 +444,8 @@ impl EventChunker {
// especially for waveforms it will wrongly indicate scalar. So this is unusable.
if self.fetch_info.shape().ne(&shape_this) {
discard = true;
let msg = format!(
"shape mismatch {:?} {:?} {:?}",
shape_this,
self.fetch_info.shape(),
self.dbg_path,
);
self.discard_count_shape += 1;
let msg = format!("shape mismatch {:?} {:?}", shape_this, self.fetch_info.shape(),);
let item = LogItem::from_node(self.node_ix, Level::WARN, msg);
log_items.push(item);
}
@@ -465,7 +455,6 @@ impl EventChunker {
let n2 = len as u64 - n1 - 4;
let databuf = buf[p1 as usize..(p1 as usize + n2 as usize)].as_ref();
if discard {
self.discard_count += 1;
} else {
ret.push(
ts,
@@ -479,27 +468,25 @@ impl EventChunker {
match ret.shape_derived(ret.len() - 1, self.fetch_info.shape()) {
Ok(sh) => {
if sh.ne(self.fetch_info.shape()) {
self.discard_count += 1;
self.discard_count_shape_derived += 1;
ret.pop_back();
let msg = format!(
"shape_derived mismatch {:?} {:?} {:?} {:?}",
"shape_derived mismatch {:?} {:?} {:?}",
self.fetch_info.scalar_type(),
self.fetch_info.shape(),
sh,
self.dbg_path,
);
let item = LogItem::from_node(self.node_ix, Level::WARN, msg);
log_items.push(item);
}
}
Err(_) => {
self.discard_count += 1;
self.discard_count_shape_derived_err += 1;
ret.pop_back();
let msg = format!(
"shape_derived error {:?} {:?} {:?}",
"shape_derived error {:?} {:?}",
self.fetch_info.scalar_type(),
self.fetch_info.shape(),
self.dbg_path,
);
let item = LogItem::from_node(self.node_ix, Level::WARN, msg);
log_items.push(item);
@@ -526,7 +513,7 @@ impl Stream for EventChunker {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
'outer: loop {
loop {
break if self.completed {
panic!("EventChunker poll_next on completed");
} else if let Some(item) = self.log_items.pop_front() {
@@ -552,7 +539,7 @@ impl Stream for EventChunker {
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
} else {
trace!("sent_beyond_range non-complete");
continue 'outer;
continue;
}
} else if self.data_emit_complete {
let item = EventDataReadStats {
@@ -616,7 +603,7 @@ impl Stream for EventChunker {
}
Ready(None) => {
self.data_emit_complete = true;
continue 'outer;
continue;
}
Pending => Pending,
}

View File

@@ -0,0 +1,77 @@
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::LogItem;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::WithLen;
use items_2::eventfull::EventFull;
use netpod::Shape;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tracing::Level;
pub struct EventFullShapeFilter<INP> {
inp: INP,
shape_exp: Shape,
node_ix: usize,
log_items: VecDeque<LogItem>,
}
impl<INP> EventFullShapeFilter<INP> {
fn filter_item(&mut self, item: &mut EventFull) {
let node_ix = self.node_ix;
let p: Vec<_> = (0..item.len())
.map(|i| {
let sh = item.shape_derived(i, &self.shape_exp);
match sh {
Ok(sh) => {
if sh.ne(&self.shape_exp) {
let msg = format!("shape_derived mismatch {:?} {:?}", sh, self.shape_exp);
let item = LogItem::from_node(node_ix, Level::WARN, msg);
self.log_items.push_back(item);
false
} else {
true
}
}
Err(_) => {
let msg = format!("shape_derived mismatch {:?} {:?}", sh, self.shape_exp);
let item = LogItem::from_node(self.node_ix, Level::WARN, msg);
self.log_items.push_back(item);
false
}
}
})
.collect();
item.keep_ixs(&p);
}
}
impl<INP> Stream for EventFullShapeFilter<INP>
where
INP: Stream<Item = Sitemty<EventFull>> + Unpin,
{
type Item = Sitemty<EventFull>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
if let Some(item) = self.log_items.pop_front() {
Ready(Some(Ok(StreamItem::Log(item))))
} else {
match self.inp.poll_next_unpin(cx) {
Ready(Some(item)) => match item {
Ok(StreamItem::DataItem(RangeCompletableItem::Data(mut item))) => {
self.filter_item(&mut item);
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))
}
x => Ready(Some(x)),
},
Ready(None) => Ready(None),
Pending => Pending,
}
}
}
}
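
The new `EventFullShapeFilter` drains its queued `LogItem`s before polling the inner stream, so diagnostics are delivered in order alongside the data. A self-contained sketch of that buffered-log polling skeleton (names and the plain-`String` log type are illustrative; the crate queues `LogItem` values inside a `Sitemty` wrapper):

use futures_util::{Stream, StreamExt};
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};

// What a consumer sees: buffered log lines first, then the wrapped data.
enum Item<T> {
    Log(String),
    Data(T),
}

// Illustrative wrapper; forwards items from `inp`, but emits any queued
// log messages first so they interleave with the data in order.
struct LoggedStream<S> {
    inp: S,
    logs: VecDeque<String>,
}

impl<S> Stream for LoggedStream<S>
where
    S: Stream + Unpin,
{
    type Item = Item<S::Item>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // Drain queued diagnostics before asking the inner stream for data.
        if let Some(msg) = self.logs.pop_front() {
            return Poll::Ready(Some(Item::Log(msg)));
        }
        match self.inp.poll_next_unpin(cx) {
            Poll::Ready(Some(x)) => Poll::Ready(Some(Item::Data(x))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}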

View File

@@ -31,6 +31,7 @@ fn make_num_pipeline_stream_evs(
) -> Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>> {
let scalar_type = fetch_info.scalar_type().clone();
let shape = fetch_info.shape().clone();
let event_blobs = Box::pin(event_blobs);
let event_stream = match crate::decode::EventsDynStream::new(scalar_type, shape, agg_kind, event_blobs) {
Ok(k) => k,
Err(e) => {
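
Boxing the concrete `EventChunkerMultifile` with `Box::pin` is what satisfies the new `Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>` parameter of `EventsDynStream::new`. A sketch of the general erasure idiom (the `erase` helper is illustrative):

use futures_util::stream::{self, Stream, StreamExt};
use std::pin::Pin;

type BoxedStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;

// Box::pin unsizes any concrete Send stream into a uniform trait object,
// so consumers no longer name the producer's type.
fn erase<S>(s: S) -> BoxedStream<S::Item>
where
    S: Stream + Send + 'static,
{
    Box::pin(s)
}

#[tokio::main]
async fn main() {
    let mut s = erase(stream::iter([1u32, 2, 3]));
    while let Some(x) = s.next().await {
        println!("{x}");
    }
}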

View File

@@ -1,6 +1,6 @@
[package]
name = "err"
version = "0.0.4"
version = "0.0.5"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
@@ -18,5 +18,8 @@ chrono = { version = "0.4.26", features = ["serde"] }
url = "2.4.0"
regex = "1.9.1"
http = "0.2.9"
thiserror = { path = "../../../thiserror" }
thiserror = "=0.0.1"
anyhow = "1.0"
[patch.crates-io]
thiserror = { git = "https://github.com/dominikwerder/thiserror.git" }

View File

@@ -5,9 +5,7 @@ use crate::response;
use crate::ReqCtx;
use bytes::BufMut;
use bytes::BytesMut;
use disk::eventchunker::EventChunkerConf;
use disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes;
use disk::raw::conn::make_event_blobs_stream;
use futures_util::Stream;
use futures_util::StreamExt;
use http::Method;
@@ -27,7 +25,6 @@ use netpod::query::api1::Api1Query;
use netpod::range::evrange::NanoRange;
use netpod::timeunits::SEC;
use netpod::Api1WarningStats;
use netpod::ByteSize;
use netpod::ChannelSearchQuery;
use netpod::ChannelSearchResult;
use netpod::ChannelTypeConfigGen;
@@ -523,6 +520,7 @@ pub struct DataApiPython3DataStream {
current_fetch_info: Option<SfChFetchInfo>,
node_config: NodeConfigCached,
chan_stream: Option<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>>,
#[allow(unused)]
disk_io_tune: DiskIoTune,
do_decompress: bool,
event_count: usize,

View File

@@ -1388,7 +1388,10 @@ impl Api4MapPulseHttpFunction {
let ret = match Self::find_timestamp(q, ncc).await {
Ok(Some(val)) => Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&val)?))?),
Ok(None) => Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?),
Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?),
Err(e) => {
error!("find_timestamp {e}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?)
}
};
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1);
@@ -1446,7 +1449,10 @@ impl Api4MapPulse2HttpFunction {
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&res)?))?)
}
Ok(None) => Ok(response(StatusCode::NO_CONTENT).body(Body::empty())?),
Err(e) => Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?),
Err(e) => {
error!("find_timestamp {e}");
Ok(response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?)
}
};
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1);

View File

@@ -18,7 +18,6 @@ pub use futures_util;
use collect_s::Collectable;
use container::ByteEstimate;
use netpod::range::evrange::SeriesRange;
use std::any::Any;
use std::collections::VecDeque;
use std::fmt;

View File

@@ -136,6 +136,20 @@ impl EventFull {
self.shapes.pop_back();
self.comps.pop_back();
}
pub fn keep_ixs(&mut self, ixs: &[bool]) {
fn inner<T>(v: &mut VecDeque<T>, ixs: &[bool]) {
let mut it = ixs.iter();
v.retain_mut(move |_| it.next().map(Clone::clone).unwrap_or(false));
}
inner(&mut self.tss, ixs);
inner(&mut self.pulses, ixs);
inner(&mut self.blobs, ixs);
inner(&mut self.scalar_types, ixs);
inner(&mut self.be, ixs);
inner(&mut self.shapes, ixs);
inner(&mut self.comps, ixs);
}
}
impl FrameTypeInnerStatic for EventFull {
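
`keep_ixs` replays one boolean mask through `retain_mut` on every parallel column, relying on `retain_mut` visiting elements front to back in mask order. The idiom in isolation:

use std::collections::VecDeque;

// Keep exactly the elements whose mask entry is true. Correct because
// VecDeque::retain_mut visits elements in order, mirroring the mask.
fn retain_by_mask<T>(v: &mut VecDeque<T>, mask: &[bool]) {
    let mut it = mask.iter();
    v.retain_mut(move |_| it.next().copied().unwrap_or(false));
}

fn main() {
    let mut v: VecDeque<i32> = [10, 20, 30, 40].into_iter().collect();
    retain_by_mask(&mut v, &[true, false, true, false]);
    assert_eq!(v, VecDeque::from([10, 30]));
}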

View File

@@ -1437,6 +1437,7 @@ impl Dim0Index for TsNano {
}
fn prebin_patch_len_for(i: usize) -> Self {
let _ = i;
todo!()
}
@@ -1507,6 +1508,7 @@ impl Dim0Index for PulseId {
}
fn prebin_patch_len_for(i: usize) -> Self {
let _ = i;
todo!()
}
@@ -1543,6 +1545,7 @@ const PREBIN_TIME_BIN_LEN_VAR0: [u64; 3] = [MIN * 1, HOUR * 1, DAY];
const PREBIN_PULSE_BIN_LEN_VAR0: [u64; 4] = [100, 10000, 1000000, 100000000];
#[allow(unused)]
const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 3] = [
//
//MIN * 60,
@@ -1551,6 +1554,7 @@ const PATCH_T_LEN_OPTIONS_SCALAR: [u64; 3] = [
DAY * 64,
];
#[allow(unused)]
const PATCH_T_LEN_OPTIONS_WAVE: [u64; 3] = [
//
//MIN * 10,
@@ -1606,10 +1610,12 @@ const PULSE_BIN_THRESHOLDS: [u64; 25] = [
800000, 1000000, 2000000, 4000000, 8000000, 10000000,
];
#[allow(unused)]
const fn time_bin_threshold_at(i: usize) -> TsNano {
TsNano(TIME_BIN_THRESHOLDS[i])
}
#[allow(unused)]
const fn pulse_bin_threshold_at(i: usize) -> PulseId {
PulseId(PULSE_BIN_THRESHOLDS[i])
}
@@ -1726,17 +1732,17 @@ impl PreBinnedPatchCoordEnum {
}
impl FromUrl for PreBinnedPatchCoordEnum {
fn from_url(url: &Url) -> Result<Self, Error> {
fn from_url(_url: &Url) -> Result<Self, Error> {
todo!()
}
fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Error> {
fn from_pairs(_pairs: &BTreeMap<String, String>) -> Result<Self, Error> {
todo!()
}
}
impl AppendToUrl for PreBinnedPatchCoordEnum {
fn append_to_url(&self, url: &mut Url) {
fn append_to_url(&self, _url: &mut Url) {
todo!()
}
}
@@ -1755,8 +1761,8 @@ where
T: Dim0Index,
{
pub fn edges(&self) -> Vec<u64> {
let mut ret = Vec::new();
err::todo();
let ret = Vec::new();
ret
}
@@ -1855,6 +1861,7 @@ where
}
pub fn get_range(&self, ix: u32) -> NanoRange {
let _ = ix;
/*NanoRange {
beg: (self.offset + ix as u64) * self.grid_spec.bin_t_len,
end: (self.offset + ix as u64 + 1) * self.grid_spec.bin_t_len,

View File

@@ -3,17 +3,14 @@ pub mod datetime;
pub mod prebinned;
use crate::get_url_query_pairs;
use crate::is_false;
use crate::log::*;
use crate::AggKind;
use crate::AppendToUrl;
use crate::ByteSize;
use crate::FromUrl;
use crate::HasBackend;
use crate::HasTimeout;
use crate::NanoRange;
use crate::PulseRange;
use crate::SeriesRange;
use crate::SfDbChannel;
use crate::ToNanos;
use crate::DATETIME_FMT_6MS;

View File

@@ -5,14 +5,6 @@ use serde::{Deserialize, Serialize};
use std::fmt;
use std::time::Duration;
fn bool_true() -> bool {
true
}
fn bool_is_true(x: &bool) -> bool {
*x
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct Api1Range {
#[serde(rename = "type", default, skip_serializing_if = "String::is_empty")]

View File

@@ -105,7 +105,17 @@ impl Collect {
}
},
StreamItem::Log(item) => {
trace!("collect log {:?}", item);
if item.level == Level::ERROR {
error!("node {} msg {}", item.node_ix, item.msg);
} else if item.level == Level::WARN {
warn!("node {} msg {}", item.node_ix, item.msg);
} else if item.level == Level::INFO {
info!("node {} msg {}", item.node_ix, item.msg);
} else if item.level == Level::DEBUG {
debug!("node {} msg {}", item.node_ix, item.msg);
} else if item.level == Level::TRACE {
trace!("node {} msg {}", item.node_ix, item.msg);
}
Ok(())
}
StreamItem::Stats(item) => {
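
The expanded `StreamItem::Log` arm forwards each collected `LogItem` at its original severity. The if/else ladder is needed because tracing's event macros take the level as a compile-time constant, so a runtime `Level` value has to be dispatched explicitly; the same dispatch extracted into a standalone helper (illustrative):

use tracing::{debug, error, info, trace, warn, Level};

// tracing's macros fix the level at compile time, so a runtime Level
// must be branched on explicitly before choosing a macro.
fn log_at(level: Level, node_ix: usize, msg: &str) {
    if level == Level::ERROR {
        error!("node {} msg {}", node_ix, msg);
    } else if level == Level::WARN {
        warn!("node {} msg {}", node_ix, msg);
    } else if level == Level::INFO {
        info!("node {} msg {}", node_ix, msg);
    } else if level == Level::DEBUG {
        debug!("node {} msg {}", node_ix, msg);
    } else {
        trace!("node {} msg {}", node_ix, msg);
    }
}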

View File

@@ -225,7 +225,7 @@ where
ITY: Mergeable,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct(Self::type_name()).finish()
f.debug_struct("RangeFilter2").field("stats", &self.stats).finish()
}
}
@@ -235,6 +235,7 @@ where
ITY: Mergeable,
{
fn drop(&mut self) {
debug!("drop {} {:?}", Self::type_name(), self);
// Self::type_name()
debug!("drop {:?}", self);
}
}