Author: Dominik Werder
Date: 2021-05-03 22:23:40 +02:00
Parent: c5b5986a28
Commit: 4caa133ad7
11 changed files with 116 additions and 77 deletions

View File

@@ -17,9 +17,9 @@ pub async fn channel_exists(channel: &Channel, node_config: &NodeConfigCached) -
let rows = cl
.query("select rowid from channels where name = $1::text", &[&channel.name])
.await?;
- info!("channel_exists {} rows", rows.len());
+ debug!("channel_exists {} rows", rows.len());
for row in rows {
- info!(
+ debug!(
" db on channel search: {:?} {:?} {:?}",
row,
row.columns(),

View File

@@ -46,6 +46,7 @@ pub trait AggregatableTdim {
pub struct ValuesDim0 {
tss: Vec<u64>,
values: Vec<Vec<f32>>,
+ // TODO add the stats and flags
}
impl std::fmt::Debug for ValuesDim0 {
@@ -71,9 +72,13 @@ impl AggregatableXdim1Bin for ValuesDim1 {
avgs: Vec::with_capacity(self.tss.len()),
event_data_read_stats: EventDataReadStats::new(),
values_extract_stats: ValuesExtractStats::new(),
+ range_complete_observed: false,
};
ret.event_data_read_stats.trans(&mut self.event_data_read_stats);
ret.values_extract_stats.trans(&mut self.values_extract_stats);
+ if self.range_complete_observed {
+     ret.range_complete_observed = true;
+ }
for i1 in 0..self.tss.len() {
let ts = self.tss[i1];
let mut min = f32::MAX;
@@ -126,6 +131,7 @@ pub struct ValuesDim1 {
pub values: Vec<Vec<f32>>,
pub event_data_read_stats: EventDataReadStats,
pub values_extract_stats: ValuesExtractStats,
+ pub range_complete_observed: bool,
}
impl ValuesDim1 {
@@ -135,6 +141,7 @@ impl ValuesDim1 {
values: vec![],
event_data_read_stats: EventDataReadStats::new(),
values_extract_stats: ValuesExtractStats::new(),
+ range_complete_observed: false,
}
}
}
@@ -162,9 +169,13 @@ impl AggregatableXdim1Bin for ValuesDim0 {
avgs: Vec::with_capacity(self.tss.len()),
event_data_read_stats: EventDataReadStats::new(),
values_extract_stats: ValuesExtractStats::new(),
+ range_complete_observed: false,
};
// TODO stats are not yet in ValuesDim0
err::todoval::<u32>();
+ //if self.range_complete_observed {
+ //    ret.range_complete_observed = true;
+ //}
for i1 in 0..self.tss.len() {
let ts = self.tss[i1];
let mut min = f32::MAX;
@@ -323,7 +334,7 @@ where
pub struct Dim1F32Stream<S>
where
- S: Stream<Item = Result<EventFull, Error>>,
+ S: Stream,
{
inp: S,
errored: bool,
@@ -332,7 +343,7 @@ where
impl<S> Dim1F32Stream<S>
where
- S: Stream<Item = Result<EventFull, Error>>,
+ S: Stream,
{
pub fn new(inp: S) -> Self {
Self {
@@ -363,6 +374,9 @@ where
let inst1 = Instant::now();
let mut ret = ValuesDim1::empty();
use ScalarType::*;
+ if k.end_of_range_observed {
+     ret.range_complete_observed = true;
+ }
for i1 in 0..k.tss.len() {
// TODO iterate sibling arrays after single bounds check
let ty = &k.scalar_types[i1];
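Note: the hunks above thread a range-completeness signal from the raw event stream (EventFull::end_of_range_observed) into ValuesDim1::range_complete_observed and onward into the aggregated output. The flag is ORed in rather than assigned, so once any input has seen the end of the requested range the fact is never lost. A minimal sketch of that propagation pattern, using hypothetical stand-in types:

// Sketch only: Chunk and Binned stand in for EventFull and ValuesDim1.
#[derive(Default)]
struct Chunk {
    tss: Vec<u64>,
    end_of_range_observed: bool,
}

#[derive(Default)]
struct Binned {
    tss: Vec<u64>,
    range_complete_observed: bool,
}

fn bin_into(chunk: Chunk, acc: &mut Binned) {
    // OR the flag in instead of assigning, so a later chunk without the
    // flag cannot clear what an earlier chunk established.
    if chunk.end_of_range_observed {
        acc.range_complete_observed = true;
    }
    acc.tss.extend(chunk.tss);
}

fn main() {
    let mut acc = Binned::default();
    bin_into(Chunk { tss: vec![1, 2], end_of_range_observed: false }, &mut acc);
    bin_into(Chunk { tss: vec![3], end_of_range_observed: true }, &mut acc);
    bin_into(Chunk { tss: vec![4], end_of_range_observed: false }, &mut acc);
    assert!(acc.range_complete_observed);
}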

View File

@@ -15,6 +15,7 @@ pub struct MinMaxAvgScalarEventBatch {
pub avgs: Vec<f32>,
pub event_data_read_stats: EventDataReadStats,
pub values_extract_stats: ValuesExtractStats,
+ pub range_complete_observed: bool,
}
impl MinMaxAvgScalarEventBatch {
@@ -26,8 +27,10 @@ impl MinMaxAvgScalarEventBatch {
avgs: vec![],
event_data_read_stats: EventDataReadStats::new(),
values_extract_stats: ValuesExtractStats::new(),
+ range_complete_observed: false,
}
}
+ #[allow(dead_code)]
pub fn old_from_full_frame(buf: &Bytes) -> Self {
info!("construct MinMaxAvgScalarEventBatch from full frame len {}", buf.len());
@@ -151,6 +154,7 @@ pub struct MinMaxAvgScalarEventBatchAggregator {
sum: f32,
event_data_read_stats: EventDataReadStats,
values_extract_stats: ValuesExtractStats,
+ range_complete_observed: bool,
}
impl MinMaxAvgScalarEventBatchAggregator {
@@ -164,6 +168,7 @@ impl MinMaxAvgScalarEventBatchAggregator {
count: 0,
event_data_read_stats: EventDataReadStats::new(),
values_extract_stats: ValuesExtractStats::new(),
+ range_complete_observed: false,
}
}
}
@@ -206,6 +211,9 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
}
self.event_data_read_stats.trans(&mut v.event_data_read_stats);
self.values_extract_stats.trans(&mut v.values_extract_stats);
+ if v.range_complete_observed {
+     self.range_complete_observed = true;
+ }
for i1 in 0..v.tss.len() {
let ts = v.tss[i1];
if ts < self.ts1 {
@@ -259,6 +267,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
avgs: vec![avg],
event_data_read_stats: std::mem::replace(&mut self.event_data_read_stats, EventDataReadStats::new()),
values_extract_stats: std::mem::replace(&mut self.values_extract_stats, ValuesExtractStats::new()),
+ range_complete_observed: self.range_complete_observed,
}
}
}
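Note: in the result-building hunk above, the accumulated stats are moved out with std::mem::replace, leaving fresh counters behind for the next bin, while the completion flag is simply copied and stays set. A small illustration of that replace-and-reset idiom; Stats is a hypothetical stand-in for EventDataReadStats and ValuesExtractStats:

// `Stats` stands in for the stats structs moved out of the aggregator.
#[derive(Debug, Default)]
struct Stats {
    bytes: u64,
}

struct Agg {
    stats: Stats,
    range_complete_observed: bool,
}

impl Agg {
    fn result(&mut self) -> (Stats, bool) {
        // Move the accumulated stats out and leave a fresh default behind;
        // the bool is Copy, so it is simply read (and stays set).
        let stats = std::mem::replace(&mut self.stats, Stats::default());
        (stats, self.range_complete_observed)
    }
}

fn main() {
    let mut agg = Agg { stats: Stats { bytes: 42 }, range_complete_observed: true };
    let (taken, complete) = agg.result();
    assert_eq!(taken.bytes, 42);
    assert_eq!(agg.stats.bytes, 0); // reset, ready for the next window
    assert!(complete);
}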

View File

@@ -17,6 +17,7 @@ pub struct MinMaxAvgScalarBinBatch {
pub avgs: Vec<f32>,
pub event_data_read_stats: EventDataReadStats,
pub values_extract_stats: ValuesExtractStats,
+ pub range_complete_observed: bool,
}
impl MinMaxAvgScalarBinBatch {
@@ -30,6 +31,7 @@ impl MinMaxAvgScalarBinBatch {
avgs: vec![],
event_data_read_stats: EventDataReadStats::new(),
values_extract_stats: ValuesExtractStats::new(),
+ range_complete_observed: false,
}
}
@@ -114,6 +116,23 @@ impl MinMaxAvgScalarBinBatch {
}
}
+ impl std::fmt::Debug for MinMaxAvgScalarBinBatch {
+     fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
+         write!(
+             fmt,
+             "MinMaxAvgScalarBinBatch count {} ts1s {:?} ts2s {:?} counts {:?} avgs {:?} EDS {:?} VXS {:?} COMP {}",
+             self.ts1s.len(),
+             self.ts1s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
+             self.ts2s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
+             self.counts,
+             self.avgs,
+             self.event_data_read_stats,
+             self.values_extract_stats,
+             self.range_complete_observed,
+         )
+     }
+ }
impl FitsInside for MinMaxAvgScalarBinBatch {
fn fits_inside(&self, range: NanoRange) -> Fits {
if self.ts1s.is_empty() {
@@ -168,22 +187,6 @@ impl MinMaxAvgScalarBinBatch {
}
}
- impl std::fmt::Debug for MinMaxAvgScalarBinBatch {
-     fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
-         write!(
-             fmt,
-             "MinMaxAvgScalarBinBatch count {} ts1s {:?} ts2s {:?} counts {:?} avgs {:?} EDS {:?} VXS {:?}",
-             self.ts1s.len(),
-             self.ts1s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
-             self.ts2s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
-             self.counts,
-             self.avgs,
-             self.event_data_read_stats,
-             self.values_extract_stats,
-         )
-     }
- }
impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatch {
type Output = MinMaxAvgScalarBinBatch;
fn into_agg(self) -> Self::Output {
@@ -209,6 +212,7 @@ pub struct MinMaxAvgScalarBinBatchAggregator {
sumc: u64,
event_data_read_stats: EventDataReadStats,
values_extract_stats: ValuesExtractStats,
+ range_complete_observed: bool,
}
impl MinMaxAvgScalarBinBatchAggregator {
@@ -223,6 +227,7 @@ impl MinMaxAvgScalarBinBatchAggregator {
sumc: 0,
event_data_read_stats: EventDataReadStats::new(),
values_extract_stats: ValuesExtractStats::new(),
+ range_complete_observed: false,
}
}
}
@@ -255,6 +260,9 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
fn ingest(&mut self, v: &mut Self::InputValue) {
self.event_data_read_stats.trans(&mut v.event_data_read_stats);
self.values_extract_stats.trans(&mut v.values_extract_stats);
+ if v.range_complete_observed {
+     self.range_complete_observed = true;
+ }
for i1 in 0..v.ts1s.len() {
let ts1 = v.ts1s[i1];
let ts2 = v.ts2s[i1];
@@ -289,6 +297,7 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
avgs: vec![avg],
event_data_read_stats: std::mem::replace(&mut self.event_data_read_stats, EventDataReadStats::new()),
values_extract_stats: std::mem::replace(&mut self.values_extract_stats, ValuesExtractStats::new()),
+ range_complete_observed: self.range_complete_observed,
}
}
}

View File

@@ -113,6 +113,7 @@ impl PreBinnedValueStream {
.filter_map(|k| match k {
Ok(k) => ready(Some(k)),
Err(e) => {
+ // TODO Reconsider error handling here:
error!("{:?}", e);
ready(None)
}
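Note: the TODO added here flags a real hazard: filter_map maps Err items to None, so after logging, the error disappears and downstream consumers merely see the stream end early. A self-contained sketch of exactly that behavior (illustrative names, not the project's API):

use futures::executor::block_on;
use futures::future::ready;
use futures::stream::{self, StreamExt};

fn main() {
    let inp = stream::iter(vec![Ok(1u32), Err("boom"), Ok(2)]);
    let filtered = inp.filter_map(|k| match k {
        Ok(k) => ready(Some(k)),
        Err(e) => {
            eprintln!("{:?}", e); // the error is logged, then dropped
            ready(None)
        }
    });
    let got: Vec<u32> = block_on(filtered.collect::<Vec<_>>());
    assert_eq!(got, vec![1, 2]); // downstream cannot tell an error occurred
}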

View File

@@ -69,11 +69,11 @@ async fn open_files_inner(
if ts_bin.ns + channel_config.time_bin_size.ns <= range.beg {
continue;
}
- info!("opening tb {:?}", &tb);
+ debug!("opening tb {:?}", &tb);
let path = paths::datapath(tb, &channel_config, &node);
- info!("opening path {:?}", &path);
+ debug!("opening path {:?}", &path);
let mut file = OpenOptions::new().read(true).open(&path).await?;
- info!("opened file {:?} {:?}", &path, &file);
+ debug!("opened file {:?} {:?}", &path, &file);
{
let index_path = paths::index_path(ts_bin, &channel_config, &node)?;
@@ -103,15 +103,15 @@ async fn open_files_inner(
}
let mut buf = BytesMut::with_capacity(meta.len() as usize);
buf.resize(buf.capacity(), 0);
- info!("read exact index file {} {}", buf.len(), buf.len() % 16);
+ debug!("read exact index file {} {}", buf.len(), buf.len() % 16);
index_file.read_exact(&mut buf).await?;
match find_ge(range.beg, &buf[2..])? {
Some(o) => {
- info!("FOUND ts IN INDEX: {:?}", o);
+ debug!("FOUND ts IN INDEX: {:?}", o);
file.seek(SeekFrom::Start(o.1)).await?;
}
None => {
- info!("NOT FOUND IN INDEX");
+ debug!("NOT FOUND IN INDEX");
file.seek(SeekFrom::End(0)).await?;
}
}
@@ -132,7 +132,8 @@ async fn open_files_inner(
chtx.send(Ok(file)).await?;
}
- warn!("OPEN FILES LOOP DONE");
+ // TODO keep track of number of running
+ debug!("open_files_inner done");
Ok(())
}
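Note: the surrounding code reads the whole index file into memory and calls find_ge(range.beg, &buf[2..]) to seek the data file to the first event at or after the range start. The record layout is not shown in this diff; the buf.len() % 16 log line suggests 16-byte entries. A hedged sketch under that assumption (a sorted array of big-endian u64 timestamp plus u64 file offset per entry), not the project's actual implementation:

use std::convert::TryInto;

// Assumed layout: sorted 16-byte records, (u64 BE timestamp, u64 BE offset).
fn find_ge(beg: u64, buf: &[u8]) -> Option<(u64, u64)> {
    let n = buf.len() / 16;
    let rec = |i: usize| {
        let p = i * 16;
        let ts = u64::from_be_bytes(buf[p..p + 8].try_into().unwrap());
        let pos = u64::from_be_bytes(buf[p + 8..p + 16].try_into().unwrap());
        (ts, pos)
    };
    // Binary search for the first record with ts >= beg.
    let (mut lo, mut hi) = (0usize, n);
    while lo < hi {
        let mid = (lo + hi) / 2;
        if rec(mid).0 < beg { lo = mid + 1 } else { hi = mid }
    }
    if lo < n { Some(rec(lo)) } else { None }
}

fn main() {
    let mut buf = Vec::new();
    for (ts, pos) in [(10u64, 0u64), (20, 100), (30, 200)] {
        buf.extend_from_slice(&ts.to_be_bytes());
        buf.extend_from_slice(&pos.to_be_bytes());
    }
    assert_eq!(find_ge(15, &buf), Some((20, 100))); // seek target found
    assert_eq!(find_ge(31, &buf), None); // past the last entry: seek to end
}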

View File

@@ -15,6 +15,8 @@ pub struct EventBlobsComplete {
evs: Option<EventChunker>,
buffer_size: usize,
range: NanoRange,
+ errored: bool,
+ completed: bool,
}
impl EventBlobsComplete {
@@ -25,6 +27,8 @@ impl EventBlobsComplete {
buffer_size,
channel_config,
range,
+ errored: false,
+ completed: false,
}
}
}
@@ -34,6 +38,13 @@ impl Stream for EventBlobsComplete {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use Poll::*;
+ if self.completed {
+     panic!("EventBlobsComplete poll_next on completed");
+ }
+ if self.errored {
+     self.completed = true;
+     return Ready(None);
+ }
'outer: loop {
let z = match &mut self.evs {
Some(evs) => match evs.poll_next_unpin(cx) {
@@ -53,9 +64,15 @@ impl Stream for EventBlobsComplete {
self.evs = Some(chunker);
continue 'outer;
}
- Err(e) => Ready(Some(Err(e))),
+ Err(e) => {
+     self.errored = true;
+     Ready(Some(Err(e)))
+ }
},
- Ready(None) => Ready(None),
+ Ready(None) => {
+     self.completed = true;
+     Ready(None)
+ }
Pending => Pending,
},
};
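Note: the errored/completed pair added to EventBlobsComplete is the usual hand-rolled fuse discipline for a Stream: after an Err is yielded, the next poll terminates the stream cleanly, and polling again after completion is treated as a caller bug. Reduced to its essentials in a toy stream (not the project's type):

use futures::executor::block_on;
use futures::stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};

// Minimal stream demonstrating the errored/completed guard from the diff.
struct Guarded {
    emitted: bool,
    errored: bool,
    completed: bool,
}

impl Stream for Guarded {
    type Item = Result<u32, String>;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.completed {
            panic!("poll_next on completed");
        }
        if self.errored {
            // One Err was already yielded; now terminate cleanly.
            self.completed = true;
            return Poll::Ready(None);
        }
        if !self.emitted {
            self.emitted = true;
            self.errored = true; // remember the failure for the next poll
            return Poll::Ready(Some(Err(String::from("boom"))));
        }
        unreachable!()
    }
}

fn main() {
    let mut s = Guarded { emitted: false, errored: false, completed: false };
    assert!(block_on(s.next()).unwrap().is_err()); // the error itself
    assert!(block_on(s.next()).is_none()); // then a clean end-of-stream
}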

View File

@@ -108,12 +108,7 @@ impl EventChunker {
let len = sl.read_i32::<BE>().unwrap();
assert!(len >= 20 && len < 1024 * 1024 * 10);
let len = len as u32;
- if (buf.len() as u32) < 20 {
-     // TODO gather stats about how often we find not enough input
-     //info!("parse_buf not enough B");
-     self.need_min = len as u32;
-     break;
- } else if (buf.len() as u32) < len {
+ if (buf.len() as u32) < len {
self.need_min = len as u32;
break;
} else {
@@ -125,6 +120,8 @@ impl EventChunker {
let pulse = sl.read_i64::<BE>().unwrap() as u64;
if ts >= self.range.end {
self.seen_beyond_range = true;
+ ret.end_of_range_observed = true;
+ info!("END OF RANGE OBSERVED");
break;
}
if ts < self.range.beg {
@@ -140,6 +137,7 @@ impl EventChunker {
let type_flags = sl.read_u8().unwrap();
let type_index = sl.read_u8().unwrap();
assert!(type_index <= 13);
+ let scalar_type = ScalarType::from_dtype_index(type_index)?;
use super::dtflags::*;
let is_compressed = type_flags & COMPRESSION != 0;
let is_array = type_flags & ARRAY != 0;
@@ -166,7 +164,7 @@ impl EventChunker {
assert!(value_bytes < 1024 * 256);
assert!(block_size < 1024 * 32);
//let value_bytes = value_bytes;
- let type_size = type_size(type_index);
+ let type_size = scalar_type.bytes() as u32;
let ele_count = value_bytes / type_size as u64;
let ele_size = type_size;
match self.channel_config.shape {
@@ -236,26 +234,6 @@ impl EventChunker {
}
}
- fn type_size(ix: u8) -> u32 {
-     match ix {
-         0 => 1,
-         1 => 1,
-         2 => 1,
-         3 => 1,
-         4 => 2,
-         5 => 2,
-         6 => 2,
-         7 => 4,
-         8 => 4,
-         9 => 8,
-         10 => 8,
-         11 => 4,
-         12 => 8,
-         13 => 1,
-         _ => panic!("logic"),
-     }
- }
struct ParseResult {
events: EventFull,
}
@@ -278,7 +256,6 @@ impl Stream for EventChunker {
}
match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(mut fcr))) => {
- trace!("EventChunker got buffer len {}", fcr.buf.len());
let r = self.parse_buf(&mut fcr.buf);
match r {
Ok(res) => {
@@ -323,6 +300,7 @@ pub struct EventFull {
pub decomps: Vec<Option<BytesMut>>,
pub scalar_types: Vec<ScalarType>,
pub event_data_read_stats: EventDataReadStats,
+ pub end_of_range_observed: bool,
}
impl EventFull {
@@ -333,6 +311,7 @@ impl EventFull {
decomps: vec![],
scalar_types: vec![],
event_data_read_stats: EventDataReadStats::new(),
+ end_of_range_observed: false,
}
}
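Note: replacing the free type_size(u8) table with ScalarType::from_dtype_index(type_index)? plus scalar_type.bytes() moves both the validation and the size lookup onto one enum, so an unknown wire index surfaces as a recoverable Err at parse time instead of a panic deep in the size table. A condensed sketch of the shape of that API; the variant names are guesses, only a few variants are shown, and the sizes come from the removed table:

// Condensed stand-in for the ScalarType enum used above.
#[derive(Debug)]
enum ScalarType {
    U8,
    I16,
    F32,
    F64,
}

impl ScalarType {
    // Fallible constructor: an unknown wire index is an error, not a panic.
    fn from_dtype_index(ix: u8) -> Result<Self, String> {
        match ix {
            2 => Ok(ScalarType::U8),
            4 => Ok(ScalarType::I16),
            11 => Ok(ScalarType::F32),
            12 => Ok(ScalarType::F64),
            _ => Err(format!("unknown dtype index {}", ix)),
        }
    }

    // The size table lives next to the type it describes.
    fn bytes(&self) -> u8 {
        match self {
            ScalarType::U8 => 1,
            ScalarType::I16 => 2,
            ScalarType::F32 => 4,
            ScalarType::F64 => 8,
        }
    }
}

fn main() {
    let st = ScalarType::from_dtype_index(11).unwrap();
    assert_eq!(st.bytes(), 4);
    assert!(ScalarType::from_dtype_index(200).is_err());
}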

View File

@@ -144,7 +144,7 @@ where
if self.bufcap < nl {
// TODO count cases in production
let n = 2 * nl;
- warn!("Adjust bufcap old {} new {}", self.bufcap, n);
+ debug!("Adjust bufcap old {} new {}", self.bufcap, n);
self.bufcap = n;
}
if nb < nl {
@@ -244,7 +244,6 @@ where
type Item = Result<InMemoryFrame, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
- trace!("InMemoryFrameAsyncReadStream poll_next");
use Poll::*;
assert!(!self.completed);
if self.errored {
@@ -287,7 +286,7 @@ where
let n2 = self.buf.len();
if n2 != 0 {
warn!(
- "InMemoryFrameAsyncReadStream n2 != 0 n2 {} consumed {} ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~",
+ "InMemoryFrameAsyncReadStream n2 != 0 n2 {} consumed {}",
n2, self.inp_bytes_consumed
);
}
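Note: the demoted warn! sits in a grow-on-demand path: when an incoming frame is longer than the current buffer capacity, the target capacity jumps to twice the required length, so repeated growth stays amortized. The policy in isolation:

// Toy version of the bufcap adjustment: grow to twice the needed length
// whenever a frame would not fit.
fn adjust_bufcap(bufcap: &mut usize, needed: usize) {
    if *bufcap < needed {
        let n = 2 * needed;
        // In the diff this event is now logged at debug level.
        *bufcap = n;
    }
}

fn main() {
    let mut bufcap = 1024;
    adjust_bufcap(&mut bufcap, 4096);
    assert_eq!(bufcap, 8192);
    adjust_bufcap(&mut bufcap, 100); // already large enough: unchanged
    assert_eq!(bufcap, 8192);
}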

View File

@@ -133,12 +133,6 @@ enum CurVal {
Val(ValuesDim1),
}
- /*
- ============== MergedMinMaxAvgScalarStream
- */
pub struct MergedMinMaxAvgScalarStream<S>
where
S: Stream<Item = Result<MinMaxAvgScalarEventBatch, Error>>,
@@ -150,6 +144,8 @@ where
completed: bool,
batch: MinMaxAvgScalarEventBatch,
ts_last_emit: u64,
+ range_complete_observed: Vec<bool>,
+ range_complete_observed_all: bool,
}
impl<S> MergedMinMaxAvgScalarStream<S>
@@ -170,6 +166,8 @@ where
completed: false,
batch: MinMaxAvgScalarEventBatch::empty(),
ts_last_emit: 0,
+ range_complete_observed: vec![false; n],
+ range_complete_observed_all: false,
}
}
}
@@ -198,6 +196,16 @@ where
Ready(Some(Ok(mut k))) => {
self.batch.event_data_read_stats.trans(&mut k.event_data_read_stats);
self.batch.values_extract_stats.trans(&mut k.values_extract_stats);
+ if k.range_complete_observed {
+     self.range_complete_observed[i1] = true;
+     let d = self.range_complete_observed.iter().filter(|&&k| k).count();
+     if d == self.range_complete_observed.len() {
+         self.range_complete_observed_all = true;
+         info!("\n\n:::::: range_complete d {} COMPLETE", d);
+     } else {
+         info!("\n\n:::::: range_complete d {}", d);
+     }
+ }
self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Val(k);
}
Ready(Some(Err(e))) => {
@@ -242,11 +250,14 @@ where
}
if lowest_ix == usize::MAX {
if self.batch.tss.len() != 0 {
- let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
- info!("```````````````` MergedMinMaxAvgScalarStream emit Ready(Some( current batch ))");
+ let mut k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
+ if self.range_complete_observed_all {
+     k.range_complete_observed = true;
+ }
+ info!("MergedMinMaxAvgScalarStream no more lowest emit Ready(Some( current batch ))");
break Ready(Some(Ok(k)));
} else {
- info!("```````````````` MergedMinMaxAvgScalarStream emit Ready(None)");
+ info!("MergedMinMaxAvgScalarStream no more lowest emit Ready(None)");
self.completed = true;
break Ready(None);
}
@@ -266,7 +277,10 @@ where
self.ixs[lowest_ix] += 1;
}
if self.batch.tss.len() >= 64 {
- let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
+ let mut k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
+ if self.range_complete_observed_all {
+     k.range_complete_observed = true;
+ }
break Ready(Some(Ok(k)));
}
}
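Note: the merged stream keeps one flag per upstream input and declares the merged range complete only once every input has reported it; the filter/count over the Vec<bool> after each update keeps that check simple. The bookkeeping in isolation:

// Sketch of the per-input completion tracking used by the merged stream:
// one flag per upstream, combined (via a count) into an overall flag.
struct Completion {
    per_input: Vec<bool>,
    all: bool,
}

impl Completion {
    fn new(n: usize) -> Self {
        Completion { per_input: vec![false; n], all: false }
    }

    // Called when input `i` reports that it observed the end of the range.
    fn observe(&mut self, i: usize) {
        self.per_input[i] = true;
        let d = self.per_input.iter().filter(|&&k| k).count();
        if d == self.per_input.len() {
            self.all = true;
        }
    }
}

fn main() {
    let mut c = Completion::new(3);
    c.observe(0);
    c.observe(2);
    assert!(!c.all); // input 1 has not completed yet
    c.observe(1);
    assert!(c.all); // only now is the merged range complete
}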

View File

@@ -89,7 +89,7 @@ async fn raw_conn_handler_inner_try(
addr: SocketAddr,
node_config: &NodeConfigCached,
) -> Result<(), ConnErr> {
- info!("raw_conn_handler SPAWNED for {:?}", addr);
+ debug!("raw_conn_handler SPAWNED for {:?}", addr);
let (netin, mut netout) = stream.into_split();
let mut h = InMemoryFrameAsyncReadStream::new(netin);
let mut frames = vec![];
@@ -100,7 +100,6 @@ async fn raw_conn_handler_inner_try(
{
match k {
Ok(k) => {
- info!(". . . . . . . . . . . . . . . . . . . . . . . . . . raw_conn_handler FRAME RECV");
frames.push(k);
}
Err(e) => {
@@ -116,7 +115,6 @@ async fn raw_conn_handler_inner_try(
Ok(k) => k,
Err(e) => return Err((e, netout).into()),
};
- trace!("json: {}", qitem.0);
let res: Result<EventsQuery, _> = serde_json::from_str(&qitem.0);
let evq = match res {
Ok(k) => k,
@@ -129,7 +127,6 @@ async fn raw_conn_handler_inner_try(
Ok(_) => (),
Err(e) => return Err((e, netout))?,
}
- debug!("REQUEST {:?}", evq);
let range = &evq.range;
let channel_config = match read_local_config(&evq.channel, &node_config.node).await {
Ok(k) => k,
@@ -139,7 +136,7 @@ async fn raw_conn_handler_inner_try(
Ok(k) => k,
Err(e) => return Err((e, netout))?,
};
- info!("found config entry {:?}", entry);
+ debug!("found config entry {:?}", entry);
let shape = match &entry.shape {
Some(lens) => {
@@ -171,12 +168,11 @@ async fn raw_conn_handler_inner_try(
// TODO use the requested buffer size
buffer_size: 1024 * 4,
};
- let buffer_size = 1024 * 4;
let mut s1 = EventBlobsComplete::new(
range.clone(),
query.channel_config.clone(),
node_config.node.clone(),
- buffer_size,
+ query.buffer_size as usize,
)
.into_dim_1_f32_stream()
.into_binned_x_bins_1();