From 4caa133ad782e231b6a89b0dd26c28017182465e Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 3 May 2021 22:23:40 +0200 Subject: [PATCH] WIP --- dbconn/src/lib.rs | 4 ++-- disk/src/agg.rs | 18 +++++++++++++-- disk/src/agg/eventbatch.rs | 9 ++++++++ disk/src/agg/scalarbinbatch.rs | 41 +++++++++++++++++++++------------- disk/src/cache/pbv.rs | 1 + disk/src/dataopen.rs | 15 +++++++------ disk/src/eventblobs.rs | 21 +++++++++++++++-- disk/src/eventchunker.rs | 35 ++++++----------------------- disk/src/frame/inmem.rs | 5 ++--- disk/src/merge.rs | 34 +++++++++++++++++++--------- disk/src/raw/conn.rs | 10 +++------ 11 files changed, 116 insertions(+), 77 deletions(-) diff --git a/dbconn/src/lib.rs b/dbconn/src/lib.rs index 4be3bf9..4c74199 100644 --- a/dbconn/src/lib.rs +++ b/dbconn/src/lib.rs @@ -17,9 +17,9 @@ pub async fn channel_exists(channel: &Channel, node_config: &NodeConfigCached) - let rows = cl .query("select rowid from channels where name = $1::text", &[&channel.name]) .await?; - info!("channel_exists {} rows", rows.len()); + debug!("channel_exists {} rows", rows.len()); for row in rows { - info!( + debug!( " db on channel search: {:?} {:?} {:?}", row, row.columns(), diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 75e259f..e4804df 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -46,6 +46,7 @@ pub trait AggregatableTdim { pub struct ValuesDim0 { tss: Vec, values: Vec>, + // TODO add the stats and flags } impl std::fmt::Debug for ValuesDim0 { @@ -71,9 +72,13 @@ impl AggregatableXdim1Bin for ValuesDim1 { avgs: Vec::with_capacity(self.tss.len()), event_data_read_stats: EventDataReadStats::new(), values_extract_stats: ValuesExtractStats::new(), + range_complete_observed: false, }; ret.event_data_read_stats.trans(&mut self.event_data_read_stats); ret.values_extract_stats.trans(&mut self.values_extract_stats); + if self.range_complete_observed { + ret.range_complete_observed = true; + } for i1 in 0..self.tss.len() { let ts = self.tss[i1]; let mut min = f32::MAX; @@ -126,6 +131,7 @@ pub struct ValuesDim1 { pub values: Vec>, pub event_data_read_stats: EventDataReadStats, pub values_extract_stats: ValuesExtractStats, + pub range_complete_observed: bool, } impl ValuesDim1 { @@ -135,6 +141,7 @@ impl ValuesDim1 { values: vec![], event_data_read_stats: EventDataReadStats::new(), values_extract_stats: ValuesExtractStats::new(), + range_complete_observed: false, } } } @@ -162,9 +169,13 @@ impl AggregatableXdim1Bin for ValuesDim0 { avgs: Vec::with_capacity(self.tss.len()), event_data_read_stats: EventDataReadStats::new(), values_extract_stats: ValuesExtractStats::new(), + range_complete_observed: false, }; // TODO stats are not yet in ValuesDim0 err::todoval::(); + //if self.range_complete_observed { + // ret.range_complete_observed = true; + //} for i1 in 0..self.tss.len() { let ts = self.tss[i1]; let mut min = f32::MAX; @@ -323,7 +334,7 @@ where pub struct Dim1F32Stream where - S: Stream>, + S: Stream, { inp: S, errored: bool, @@ -332,7 +343,7 @@ where impl Dim1F32Stream where - S: Stream>, + S: Stream, { pub fn new(inp: S) -> Self { Self { @@ -363,6 +374,9 @@ where let inst1 = Instant::now(); let mut ret = ValuesDim1::empty(); use ScalarType::*; + if k.end_of_range_observed { + ret.range_complete_observed = true; + } for i1 in 0..k.tss.len() { // TODO iterate sibling arrays after single bounds check let ty = &k.scalar_types[i1]; diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs index 40f7f4e..91ceebc 100644 --- a/disk/src/agg/eventbatch.rs +++ b/disk/src/agg/eventbatch.rs @@ -15,6 +15,7 @@ pub struct MinMaxAvgScalarEventBatch { pub avgs: Vec, pub event_data_read_stats: EventDataReadStats, pub values_extract_stats: ValuesExtractStats, + pub range_complete_observed: bool, } impl MinMaxAvgScalarEventBatch { @@ -26,8 +27,10 @@ impl MinMaxAvgScalarEventBatch { avgs: vec![], event_data_read_stats: EventDataReadStats::new(), values_extract_stats: ValuesExtractStats::new(), + range_complete_observed: false, } } + #[allow(dead_code)] pub fn old_from_full_frame(buf: &Bytes) -> Self { info!("construct MinMaxAvgScalarEventBatch from full frame len {}", buf.len()); @@ -151,6 +154,7 @@ pub struct MinMaxAvgScalarEventBatchAggregator { sum: f32, event_data_read_stats: EventDataReadStats, values_extract_stats: ValuesExtractStats, + range_complete_observed: bool, } impl MinMaxAvgScalarEventBatchAggregator { @@ -164,6 +168,7 @@ impl MinMaxAvgScalarEventBatchAggregator { count: 0, event_data_read_stats: EventDataReadStats::new(), values_extract_stats: ValuesExtractStats::new(), + range_complete_observed: false, } } } @@ -206,6 +211,9 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { } self.event_data_read_stats.trans(&mut v.event_data_read_stats); self.values_extract_stats.trans(&mut v.values_extract_stats); + if v.range_complete_observed { + self.range_complete_observed = true; + } for i1 in 0..v.tss.len() { let ts = v.tss[i1]; if ts < self.ts1 { @@ -259,6 +267,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { avgs: vec![avg], event_data_read_stats: std::mem::replace(&mut self.event_data_read_stats, EventDataReadStats::new()), values_extract_stats: std::mem::replace(&mut self.values_extract_stats, ValuesExtractStats::new()), + range_complete_observed: self.range_complete_observed, } } } diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs index 93c2dc2..4e832d9 100644 --- a/disk/src/agg/scalarbinbatch.rs +++ b/disk/src/agg/scalarbinbatch.rs @@ -17,6 +17,7 @@ pub struct MinMaxAvgScalarBinBatch { pub avgs: Vec, pub event_data_read_stats: EventDataReadStats, pub values_extract_stats: ValuesExtractStats, + pub range_complete_observed: bool, } impl MinMaxAvgScalarBinBatch { @@ -30,6 +31,7 @@ impl MinMaxAvgScalarBinBatch { avgs: vec![], event_data_read_stats: EventDataReadStats::new(), values_extract_stats: ValuesExtractStats::new(), + range_complete_observed: false, } } @@ -114,6 +116,23 @@ impl MinMaxAvgScalarBinBatch { } } +impl std::fmt::Debug for MinMaxAvgScalarBinBatch { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + fmt, + "MinMaxAvgScalarBinBatch count {} ts1s {:?} ts2s {:?} counts {:?} avgs {:?} EDS {:?} VXS {:?} COMP {}", + self.ts1s.len(), + self.ts1s.iter().map(|k| k / SEC).collect::>(), + self.ts2s.iter().map(|k| k / SEC).collect::>(), + self.counts, + self.avgs, + self.event_data_read_stats, + self.values_extract_stats, + self.range_complete_observed, + ) + } +} + impl FitsInside for MinMaxAvgScalarBinBatch { fn fits_inside(&self, range: NanoRange) -> Fits { if self.ts1s.is_empty() { @@ -168,22 +187,6 @@ impl MinMaxAvgScalarBinBatch { } } -impl std::fmt::Debug for MinMaxAvgScalarBinBatch { - fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - write!( - fmt, - "MinMaxAvgScalarBinBatch count {} ts1s {:?} ts2s {:?} counts {:?} avgs {:?} EDS {:?} VXS {:?}", - self.ts1s.len(), - self.ts1s.iter().map(|k| k / SEC).collect::>(), - self.ts2s.iter().map(|k| k / SEC).collect::>(), - self.counts, - self.avgs, - self.event_data_read_stats, - self.values_extract_stats, - ) - } -} - impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatch { type Output = MinMaxAvgScalarBinBatch; fn into_agg(self) -> Self::Output { @@ -209,6 +212,7 @@ pub struct MinMaxAvgScalarBinBatchAggregator { sumc: u64, event_data_read_stats: EventDataReadStats, values_extract_stats: ValuesExtractStats, + range_complete_observed: bool, } impl MinMaxAvgScalarBinBatchAggregator { @@ -223,6 +227,7 @@ impl MinMaxAvgScalarBinBatchAggregator { sumc: 0, event_data_read_stats: EventDataReadStats::new(), values_extract_stats: ValuesExtractStats::new(), + range_complete_observed: false, } } } @@ -255,6 +260,9 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator { fn ingest(&mut self, v: &mut Self::InputValue) { self.event_data_read_stats.trans(&mut v.event_data_read_stats); self.values_extract_stats.trans(&mut v.values_extract_stats); + if v.range_complete_observed { + self.range_complete_observed = true; + } for i1 in 0..v.ts1s.len() { let ts1 = v.ts1s[i1]; let ts2 = v.ts2s[i1]; @@ -289,6 +297,7 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator { avgs: vec![avg], event_data_read_stats: std::mem::replace(&mut self.event_data_read_stats, EventDataReadStats::new()), values_extract_stats: std::mem::replace(&mut self.values_extract_stats, ValuesExtractStats::new()), + range_complete_observed: self.range_complete_observed, } } } diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index 553dabb..20355ad 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -113,6 +113,7 @@ impl PreBinnedValueStream { .filter_map(|k| match k { Ok(k) => ready(Some(k)), Err(e) => { + // TODO Reconsider error handling here: error!("{:?}", e); ready(None) } diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index ed2b963..d89f19b 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -69,11 +69,11 @@ async fn open_files_inner( if ts_bin.ns + channel_config.time_bin_size.ns <= range.beg { continue; } - info!("opening tb {:?}", &tb); + debug!("opening tb {:?}", &tb); let path = paths::datapath(tb, &channel_config, &node); - info!("opening path {:?}", &path); + debug!("opening path {:?}", &path); let mut file = OpenOptions::new().read(true).open(&path).await?; - info!("opened file {:?} {:?}", &path, &file); + debug!("opened file {:?} {:?}", &path, &file); { let index_path = paths::index_path(ts_bin, &channel_config, &node)?; @@ -103,15 +103,15 @@ async fn open_files_inner( } let mut buf = BytesMut::with_capacity(meta.len() as usize); buf.resize(buf.capacity(), 0); - info!("read exact index file {} {}", buf.len(), buf.len() % 16); + debug!("read exact index file {} {}", buf.len(), buf.len() % 16); index_file.read_exact(&mut buf).await?; match find_ge(range.beg, &buf[2..])? { Some(o) => { - info!("FOUND ts IN INDEX: {:?}", o); + debug!("FOUND ts IN INDEX: {:?}", o); file.seek(SeekFrom::Start(o.1)).await?; } None => { - info!("NOT FOUND IN INDEX"); + debug!("NOT FOUND IN INDEX"); file.seek(SeekFrom::End(0)).await?; } } @@ -132,7 +132,8 @@ async fn open_files_inner( chtx.send(Ok(file)).await?; } - warn!("OPEN FILES LOOP DONE"); + // TODO keep track of number of running + debug!("open_files_inner done"); Ok(()) } diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 19ddb52..3685523 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -15,6 +15,8 @@ pub struct EventBlobsComplete { evs: Option, buffer_size: usize, range: NanoRange, + errored: bool, + completed: bool, } impl EventBlobsComplete { @@ -25,6 +27,8 @@ impl EventBlobsComplete { buffer_size, channel_config, range, + errored: false, + completed: false, } } } @@ -34,6 +38,13 @@ impl Stream for EventBlobsComplete { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { use Poll::*; + if self.completed { + panic!("EventBlobsComplete poll_next on completed"); + } + if self.errored { + self.completed = true; + return Ready(None); + } 'outer: loop { let z = match &mut self.evs { Some(evs) => match evs.poll_next_unpin(cx) { @@ -53,9 +64,15 @@ impl Stream for EventBlobsComplete { self.evs = Some(chunker); continue 'outer; } - Err(e) => Ready(Some(Err(e))), + Err(e) => { + self.errored = true; + Ready(Some(Err(e))) + } }, - Ready(None) => Ready(None), + Ready(None) => { + self.completed = true; + Ready(None) + } Pending => Pending, }, }; diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index f009712..eec76ef 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -108,12 +108,7 @@ impl EventChunker { let len = sl.read_i32::().unwrap(); assert!(len >= 20 && len < 1024 * 1024 * 10); let len = len as u32; - if (buf.len() as u32) < 20 { - // TODO gather stats about how often we find not enough input - //info!("parse_buf not enough B"); - self.need_min = len as u32; - break; - } else if (buf.len() as u32) < len { + if (buf.len() as u32) < len { self.need_min = len as u32; break; } else { @@ -125,6 +120,8 @@ impl EventChunker { let pulse = sl.read_i64::().unwrap() as u64; if ts >= self.range.end { self.seen_beyond_range = true; + ret.end_of_range_observed = true; + info!("END OF RANGE OBSERVED"); break; } if ts < self.range.beg { @@ -140,6 +137,7 @@ impl EventChunker { let type_flags = sl.read_u8().unwrap(); let type_index = sl.read_u8().unwrap(); assert!(type_index <= 13); + let scalar_type = ScalarType::from_dtype_index(type_index)?; use super::dtflags::*; let is_compressed = type_flags & COMPRESSION != 0; let is_array = type_flags & ARRAY != 0; @@ -166,7 +164,7 @@ impl EventChunker { assert!(value_bytes < 1024 * 256); assert!(block_size < 1024 * 32); //let value_bytes = value_bytes; - let type_size = type_size(type_index); + let type_size = scalar_type.bytes() as u32; let ele_count = value_bytes / type_size as u64; let ele_size = type_size; match self.channel_config.shape { @@ -236,26 +234,6 @@ impl EventChunker { } } -fn type_size(ix: u8) -> u32 { - match ix { - 0 => 1, - 1 => 1, - 2 => 1, - 3 => 1, - 4 => 2, - 5 => 2, - 6 => 2, - 7 => 4, - 8 => 4, - 9 => 8, - 10 => 8, - 11 => 4, - 12 => 8, - 13 => 1, - _ => panic!("logic"), - } -} - struct ParseResult { events: EventFull, } @@ -278,7 +256,6 @@ impl Stream for EventChunker { } match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(mut fcr))) => { - trace!("EventChunker got buffer len {}", fcr.buf.len()); let r = self.parse_buf(&mut fcr.buf); match r { Ok(res) => { @@ -323,6 +300,7 @@ pub struct EventFull { pub decomps: Vec>, pub scalar_types: Vec, pub event_data_read_stats: EventDataReadStats, + pub end_of_range_observed: bool, } impl EventFull { @@ -333,6 +311,7 @@ impl EventFull { decomps: vec![], scalar_types: vec![], event_data_read_stats: EventDataReadStats::new(), + end_of_range_observed: false, } } diff --git a/disk/src/frame/inmem.rs b/disk/src/frame/inmem.rs index d8025d5..64badc4 100644 --- a/disk/src/frame/inmem.rs +++ b/disk/src/frame/inmem.rs @@ -144,7 +144,7 @@ where if self.bufcap < nl { // TODO count cases in production let n = 2 * nl; - warn!("Adjust bufcap old {} new {}", self.bufcap, n); + debug!("Adjust bufcap old {} new {}", self.bufcap, n); self.bufcap = n; } if nb < nl { @@ -244,7 +244,6 @@ where type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - trace!("InMemoryFrameAsyncReadStream poll_next"); use Poll::*; assert!(!self.completed); if self.errored { @@ -287,7 +286,7 @@ where let n2 = self.buf.len(); if n2 != 0 { warn!( - "InMemoryFrameAsyncReadStream n2 != 0 n2 {} consumed {} ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~", + "InMemoryFrameAsyncReadStream n2 != 0 n2 {} consumed {}", n2, self.inp_bytes_consumed ); } diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 2338c35..26ef0dc 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -133,12 +133,6 @@ enum CurVal { Val(ValuesDim1), } -/* - -============== MergedMinMaxAvgScalarStream - -*/ - pub struct MergedMinMaxAvgScalarStream where S: Stream>, @@ -150,6 +144,8 @@ where completed: bool, batch: MinMaxAvgScalarEventBatch, ts_last_emit: u64, + range_complete_observed: Vec, + range_complete_observed_all: bool, } impl MergedMinMaxAvgScalarStream @@ -170,6 +166,8 @@ where completed: false, batch: MinMaxAvgScalarEventBatch::empty(), ts_last_emit: 0, + range_complete_observed: vec![false; n], + range_complete_observed_all: false, } } } @@ -198,6 +196,16 @@ where Ready(Some(Ok(mut k))) => { self.batch.event_data_read_stats.trans(&mut k.event_data_read_stats); self.batch.values_extract_stats.trans(&mut k.values_extract_stats); + if k.range_complete_observed { + self.range_complete_observed[i1] = true; + let d = self.range_complete_observed.iter().filter(|&&k| k).count(); + if d == self.range_complete_observed.len() { + self.range_complete_observed_all = true; + info!("\n\n:::::: range_complete d {} COMPLETE", d); + } else { + info!("\n\n:::::: range_complete d {}", d); + } + } self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Val(k); } Ready(Some(Err(e))) => { @@ -242,11 +250,14 @@ where } if lowest_ix == usize::MAX { if self.batch.tss.len() != 0 { - let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); - info!("```````````````` MergedMinMaxAvgScalarStream emit Ready(Some( current batch ))"); + let mut k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); + if self.range_complete_observed_all { + k.range_complete_observed = true; + } + info!("MergedMinMaxAvgScalarStream no more lowest emit Ready(Some( current batch ))"); break Ready(Some(Ok(k))); } else { - info!("```````````````` MergedMinMaxAvgScalarStream emit Ready(None)"); + info!("MergedMinMaxAvgScalarStream no more lowest emit Ready(None)"); self.completed = true; break Ready(None); } @@ -266,7 +277,10 @@ where self.ixs[lowest_ix] += 1; } if self.batch.tss.len() >= 64 { - let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); + let mut k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); + if self.range_complete_observed_all { + k.range_complete_observed = true; + } break Ready(Some(Ok(k))); } } diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 88b6d58..e88b91f 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -89,7 +89,7 @@ async fn raw_conn_handler_inner_try( addr: SocketAddr, node_config: &NodeConfigCached, ) -> Result<(), ConnErr> { - info!("raw_conn_handler SPAWNED for {:?}", addr); + debug!("raw_conn_handler SPAWNED for {:?}", addr); let (netin, mut netout) = stream.into_split(); let mut h = InMemoryFrameAsyncReadStream::new(netin); let mut frames = vec![]; @@ -100,7 +100,6 @@ async fn raw_conn_handler_inner_try( { match k { Ok(k) => { - info!(". . . . . . . . . . . . . . . . . . . . . . . . . . raw_conn_handler FRAME RECV"); frames.push(k); } Err(e) => { @@ -116,7 +115,6 @@ async fn raw_conn_handler_inner_try( Ok(k) => k, Err(e) => return Err((e, netout).into()), }; - trace!("json: {}", qitem.0); let res: Result = serde_json::from_str(&qitem.0); let evq = match res { Ok(k) => k, @@ -129,7 +127,6 @@ async fn raw_conn_handler_inner_try( Ok(_) => (), Err(e) => return Err((e, netout))?, } - debug!("REQUEST {:?}", evq); let range = &evq.range; let channel_config = match read_local_config(&evq.channel, &node_config.node).await { Ok(k) => k, @@ -139,7 +136,7 @@ async fn raw_conn_handler_inner_try( Ok(k) => k, Err(e) => return Err((e, netout))?, }; - info!("found config entry {:?}", entry); + debug!("found config entry {:?}", entry); let shape = match &entry.shape { Some(lens) => { @@ -171,12 +168,11 @@ async fn raw_conn_handler_inner_try( // TODO use the requested buffer size buffer_size: 1024 * 4, }; - let buffer_size = 1024 * 4; let mut s1 = EventBlobsComplete::new( range.clone(), query.channel_config.clone(), node_config.node.clone(), - buffer_size, + query.buffer_size as usize, ) .into_dim_1_f32_stream() .into_binned_x_bins_1();