From 0204c370170fb47cb2225433b6facdb8fb54d6d3 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 28 Apr 2021 11:35:06 +0200 Subject: [PATCH] Open data via index file --- disk/src/agg.rs | 632 +-------------------------------- disk/src/agg/binnedt.rs | 154 ++++++++ disk/src/agg/binnedx.rs | 50 +++ disk/src/agg/eventbatch.rs | 247 +++++++++++++ disk/src/agg/scalarbinbatch.rs | 226 ++++++++++++ disk/src/aggtest.rs | 4 +- disk/src/binnedstream.rs | 12 +- disk/src/cache.rs | 2 +- disk/src/cache/pbv.rs | 3 +- disk/src/cache/pbvfs.rs | 2 +- disk/src/dataopen.rs | 169 +++++++++ disk/src/eventblobs.rs | 8 +- disk/src/eventchunker.rs | 28 +- disk/src/gen.rs | 49 ++- disk/src/lib.rs | 53 +-- disk/src/merge.rs | 3 +- disk/src/raw.rs | 2 +- disk/src/raw/bffr.rs | 2 +- disk/src/raw/conn.rs | 11 +- err/src/lib.rs | 10 +- netpod/src/lib.rs | 2 +- retrieval/src/test.rs | 4 +- taskrun/src/lib.rs | 2 +- 23 files changed, 966 insertions(+), 709 deletions(-) create mode 100644 disk/src/agg/binnedt.rs create mode 100644 disk/src/agg/binnedx.rs create mode 100644 disk/src/agg/eventbatch.rs create mode 100644 disk/src/agg/scalarbinbatch.rs create mode 100644 disk/src/dataopen.rs diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 92d4df3..8742c02 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -3,20 +3,22 @@ Aggregation and binning support. */ use super::eventchunker::EventFull; -use bytes::{BufMut, Bytes, BytesMut}; +use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use err::Error; use futures_core::Stream; use futures_util::StreamExt; -use netpod::timeunits::SEC; -use netpod::{BinSpecDimT, NanoRange}; +use netpod::NanoRange; use netpod::{Node, ScalarType}; -use serde::{Deserialize, Serialize}; -use std::mem::size_of; use std::pin::Pin; use std::task::{Context, Poll}; #[allow(unused_imports)] use tracing::{debug, error, info, span, trace, warn, Level}; +pub mod binnedt; +pub mod binnedx; +pub mod eventbatch; +pub mod scalarbinbatch; + pub trait AggregatorTdim { type InputValue; type OutputValue: AggregatableXdim1Bin + AggregatableTdim; @@ -35,7 +37,7 @@ pub trait AggregatableXdim1Bin { pub trait AggregatableTdim { type Output: AggregatableXdim1Bin + AggregatableTdim; type Aggregator: AggregatorTdim; - fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator; + fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator; } /// DO NOT USE. This is just a dummy for some testing. @@ -49,7 +51,7 @@ impl AggregatableXdim1Bin for () { impl AggregatableTdim for () { type Output = (); type Aggregator = (); - fn aggregator_new(&self, _ts1: u64, _ts2: u64) -> Self::Aggregator { + fn aggregator_new_static(_ts1: u64, _ts2: u64) -> Self::Aggregator { todo!() } } @@ -201,329 +203,6 @@ impl AggregatableXdim1Bin for ValuesDim0 { } } -#[derive(Serialize, Deserialize)] -pub struct MinMaxAvgScalarEventBatch { - pub tss: Vec, - pub mins: Vec, - pub maxs: Vec, - pub avgs: Vec, -} - -impl MinMaxAvgScalarEventBatch { - pub fn empty() -> Self { - Self { - tss: vec![], - mins: vec![], - maxs: vec![], - avgs: vec![], - } - } - pub fn from_full_frame(buf: &Bytes) -> Self { - info!("construct MinMaxAvgScalarEventBatch from full frame len {}", buf.len()); - assert!(buf.len() >= 4); - let mut g = MinMaxAvgScalarEventBatch::empty(); - let n1; - unsafe { - let ptr = (&buf[0] as *const u8) as *const [u8; 4]; - n1 = u32::from_le_bytes(*ptr); - trace!("--- +++ --- +++ --- +++ n1: {}", n1); - } - if n1 == 0 { - g - } else { - let n2 = n1 as usize; - g.tss.reserve(n2); - g.mins.reserve(n2); - g.maxs.reserve(n2); - g.avgs.reserve(n2); - unsafe { - // TODO Can I unsafely create ptrs and just assign them? - // TODO What are cases where I really need transmute? - g.tss.set_len(n2); - g.mins.set_len(n2); - g.maxs.set_len(n2); - g.avgs.set_len(n2); - let ptr0 = &buf[4] as *const u8; - { - let ptr1 = ptr0 as *const u64; - for i1 in 0..n2 { - g.tss[i1] = *ptr1.add(i1); - } - } - { - let ptr1 = ptr0.add((8) * n2) as *const f32; - for i1 in 0..n2 { - g.mins[i1] = *ptr1.add(i1); - } - } - { - let ptr1 = ptr0.add((8 + 4) * n2) as *const f32; - for i1 in 0..n2 { - g.maxs[i1] = *ptr1; - } - } - { - let ptr1 = ptr0.add((8 + 4 + 4) * n2) as *const f32; - for i1 in 0..n2 { - g.avgs[i1] = *ptr1; - } - } - } - info!("CONTENT {:?}", g); - g - } - } -} - -impl std::fmt::Debug for MinMaxAvgScalarEventBatch { - fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - write!( - fmt, - "MinMaxAvgScalarEventBatch count {} tss {:?} mins {:?} maxs {:?} avgs {:?}", - self.tss.len(), - self.tss, - self.mins, - self.maxs, - self.avgs, - ) - } -} - -impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatch { - type Output = MinMaxAvgScalarEventBatch; - fn into_agg(self) -> Self::Output { - self - } -} - -impl AggregatableTdim for MinMaxAvgScalarEventBatch { - type Output = MinMaxAvgScalarBinBatch; - type Aggregator = MinMaxAvgScalarEventBatchAggregator; - - fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator { - MinMaxAvgScalarEventBatchAggregator::new(ts1, ts2) - } -} - -impl MinMaxAvgScalarEventBatch { - #[allow(dead_code)] - fn old_serialized(&self) -> Bytes { - let n1 = self.tss.len(); - let mut g = BytesMut::with_capacity(4 + n1 * (8 + 3 * 4)); - g.put_u32_le(n1 as u32); - if n1 > 0 { - let ptr = &self.tss[0] as *const u64 as *const u8; - let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; - g.put(a); - let ptr = &self.mins[0] as *const f32 as *const u8; - let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; - g.put(a); - let ptr = &self.maxs[0] as *const f32 as *const u8; - let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; - g.put(a); - let ptr = &self.avgs[0] as *const f32 as *const u8; - let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; - g.put(a); - } - info!("impl Frameable for MinMaxAvgScalarEventBatch g.len() {}", g.len()); - g.freeze() - } -} - -pub struct MinMaxAvgScalarEventBatchAggregator { - ts1: u64, - ts2: u64, - count: u64, - min: f32, - max: f32, - sum: f32, -} - -impl MinMaxAvgScalarEventBatchAggregator { - pub fn new(ts1: u64, ts2: u64) -> Self { - Self { - ts1, - ts2, - min: f32::MAX, - max: f32::MIN, - sum: 0f32, - count: 0, - } - } -} - -impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { - type InputValue = MinMaxAvgScalarEventBatch; - type OutputValue = MinMaxAvgScalarBinSingle; - - fn ends_before(&self, inp: &Self::InputValue) -> bool { - match inp.tss.last() { - Some(ts) => *ts < self.ts1, - None => true, - } - } - - fn ends_after(&self, inp: &Self::InputValue) -> bool { - match inp.tss.last() { - Some(ts) => *ts >= self.ts2, - _ => panic!(), - } - } - - fn starts_after(&self, inp: &Self::InputValue) -> bool { - match inp.tss.first() { - Some(ts) => *ts >= self.ts2, - _ => panic!(), - } - } - - fn ingest(&mut self, v: &Self::InputValue) { - for i1 in 0..v.tss.len() { - let ts = v.tss[i1]; - if ts < self.ts1 { - //info!("EventBatchAgg {} {} {} {} IS BEFORE", v.tss[i1], v.mins[i1], v.maxs[i1], v.avgs[i1]); - continue; - } else if ts >= self.ts2 { - //info!("EventBatchAgg {} {} {} {} IS AFTER", v.tss[i1], v.mins[i1], v.maxs[i1], v.avgs[i1]); - continue; - } else { - //info!("EventBatchAgg {} {} {} {}", v.tss[i1], v.mins[i1], v.maxs[i1], v.avgs[i1]); - self.min = self.min.min(v.mins[i1]); - self.max = self.max.max(v.maxs[i1]); - self.sum += v.avgs[i1]; - self.count += 1; - } - } - } - - fn result(self) -> Self::OutputValue { - let min = if self.min == f32::MAX { f32::NAN } else { self.min }; - let max = if self.max == f32::MIN { f32::NAN } else { self.max }; - let avg = if self.count == 0 { - f32::NAN - } else { - self.sum / self.count as f32 - }; - MinMaxAvgScalarBinSingle { - ts1: self.ts1, - ts2: self.ts2, - count: self.count, - min, - max, - avg, - } - } -} - -#[allow(dead_code)] -#[derive(Serialize, Deserialize)] -pub struct MinMaxAvgScalarBinBatch { - ts1s: Vec, - ts2s: Vec, - counts: Vec, - mins: Vec, - maxs: Vec, - avgs: Vec, -} - -impl MinMaxAvgScalarBinBatch { - pub fn empty() -> Self { - Self { - ts1s: vec![], - ts2s: vec![], - counts: vec![], - mins: vec![], - maxs: vec![], - avgs: vec![], - } - } - pub fn len(&self) -> usize { - self.ts1s.len() - } - pub fn push_single(&mut self, g: &MinMaxAvgScalarBinSingle) { - self.ts1s.push(g.ts1); - self.ts2s.push(g.ts2); - self.counts.push(g.count); - self.mins.push(g.min); - self.maxs.push(g.max); - self.avgs.push(g.avg); - } - pub fn from_full_frame(buf: &Bytes) -> Self { - info!("MinMaxAvgScalarBinBatch construct from full frame len {}", buf.len()); - assert!(buf.len() >= 4); - let mut g = MinMaxAvgScalarBinBatch::empty(); - let n1; - unsafe { - let ptr = (&buf[0] as *const u8) as *const [u8; 4]; - n1 = u32::from_le_bytes(*ptr); - trace!( - "MinMaxAvgScalarBinBatch construct --- +++ --- +++ --- +++ n1: {}", - n1 - ); - } - if n1 == 0 { - g - } else { - let n2 = n1 as usize; - g.ts1s.reserve(n2); - g.ts2s.reserve(n2); - g.counts.reserve(n2); - g.mins.reserve(n2); - g.maxs.reserve(n2); - g.avgs.reserve(n2); - unsafe { - // TODO Can I unsafely create ptrs and just assign them? - // TODO What are cases where I really need transmute? - g.ts1s.set_len(n2); - g.ts2s.set_len(n2); - g.counts.set_len(n2); - g.mins.set_len(n2); - g.maxs.set_len(n2); - g.avgs.set_len(n2); - let ptr0 = &buf[4] as *const u8; - { - let ptr1 = ptr0.add(0) as *const u64; - for i1 in 0..n2 { - g.ts1s[i1] = *ptr1.add(i1); - } - } - { - let ptr1 = ptr0.add((8) * n2) as *const u64; - for i1 in 0..n2 { - g.ts2s[i1] = *ptr1.add(i1); - } - } - { - let ptr1 = ptr0.add((8 + 8) * n2) as *const u64; - for i1 in 0..n2 { - g.counts[i1] = *ptr1.add(i1); - } - } - { - let ptr1 = ptr0.add((8 + 8 + 8) * n2) as *const f32; - for i1 in 0..n2 { - g.mins[i1] = *ptr1.add(i1); - } - } - { - let ptr1 = ptr0.add((8 + 8 + 8 + 4) * n2) as *const f32; - for i1 in 0..n2 { - g.maxs[i1] = *ptr1; - } - } - { - let ptr1 = ptr0.add((8 + 8 + 8 + 4 + 4) * n2) as *const f32; - for i1 in 0..n2 { - g.avgs[i1] = *ptr1; - } - } - } - info!("CONTENT {:?}", g); - g - } - } -} - pub enum Fits { Empty, Lower, @@ -538,116 +217,6 @@ pub trait FitsInside { fn fits_inside(&self, range: NanoRange) -> Fits; } -impl FitsInside for MinMaxAvgScalarBinBatch { - fn fits_inside(&self, range: NanoRange) -> Fits { - if self.ts1s.is_empty() { - Fits::Empty - } else { - let t1 = *self.ts1s.first().unwrap(); - let t2 = *self.ts2s.last().unwrap(); - if t2 <= range.beg { - Fits::Lower - } else if t1 >= range.end { - Fits::Greater - } else if t1 < range.beg && t2 > range.end { - Fits::PartlyLowerAndGreater - } else if t1 < range.beg { - Fits::PartlyLower - } else if t2 > range.end { - Fits::PartlyGreater - } else { - Fits::Inside - } - } - } -} - -impl MinMaxAvgScalarBinBatch { - #[allow(dead_code)] - fn old_serialized(&self) -> Bytes { - let n1 = self.ts1s.len(); - let mut g = BytesMut::with_capacity(4 + n1 * (3 * 8 + 3 * 4)); - g.put_u32_le(n1 as u32); - if n1 > 0 { - let ptr = &self.ts1s[0] as *const u64 as *const u8; - let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; - g.put(a); - let ptr = &self.ts2s[0] as *const u64 as *const u8; - let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; - g.put(a); - let ptr = &self.counts[0] as *const u64 as *const u8; - let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; - g.put(a); - let ptr = &self.mins[0] as *const f32 as *const u8; - let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; - g.put(a); - let ptr = &self.maxs[0] as *const f32 as *const u8; - let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; - g.put(a); - let ptr = &self.avgs[0] as *const f32 as *const u8; - let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; - g.put(a); - } - g.freeze() - } -} - -impl std::fmt::Debug for MinMaxAvgScalarBinBatch { - fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - write!( - fmt, - "MinMaxAvgScalarBinBatch count {} ts1s {:?} ts2s {:?} counts {:?} avgs {:?}", - self.ts1s.len(), - self.ts1s.iter().map(|k| k / SEC).collect::>(), - self.ts2s.iter().map(|k| k / SEC).collect::>(), - self.counts, - self.avgs - ) - } -} - -impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatch { - type Output = MinMaxAvgScalarBinBatch; - fn into_agg(self) -> Self::Output { - todo!() - } -} - -impl AggregatableTdim for MinMaxAvgScalarBinBatch { - type Output = MinMaxAvgScalarBinSingle; - type Aggregator = MinMaxAvgScalarBinBatchAggregator; - fn aggregator_new(&self, _ts1: u64, _ts2: u64) -> Self::Aggregator { - todo!() - } -} - -pub struct MinMaxAvgScalarBinBatchAggregator {} - -impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator { - type InputValue = MinMaxAvgScalarBinBatch; - type OutputValue = MinMaxAvgScalarBinSingle; - - fn ends_before(&self, _inp: &Self::InputValue) -> bool { - todo!() - } - - fn ends_after(&self, _inp: &Self::InputValue) -> bool { - todo!() - } - - fn starts_after(&self, _inp: &Self::InputValue) -> bool { - todo!() - } - - fn ingest(&mut self, _v: &Self::InputValue) { - todo!() - } - - fn result(self) -> Self::OutputValue { - todo!() - } -} - pub struct MinMaxAvgScalarBinSingle { ts1: u64, ts2: u64, @@ -670,7 +239,7 @@ impl std::fmt::Debug for MinMaxAvgScalarBinSingle { impl AggregatableTdim for MinMaxAvgScalarBinSingle { type Output = MinMaxAvgScalarBinSingle; type Aggregator = MinMaxAvgScalarBinSingleAggregator; - fn aggregator_new(&self, _ts1: u64, _ts2: u64) -> Self::Aggregator { + fn aggregator_new_static(_ts1: u64, _ts2: u64) -> Self::Aggregator { todo!() } } @@ -871,187 +440,6 @@ where } } -pub trait IntoBinnedXBins1 { - type StreamOut; - fn into_binned_x_bins_1(self) -> Self::StreamOut - where - Self: Stream>; -} - -impl IntoBinnedXBins1 for T -where - T: Stream> + Unpin, -{ - type StreamOut = IntoBinnedXBins1DefaultStream; - - fn into_binned_x_bins_1(self) -> Self::StreamOut { - IntoBinnedXBins1DefaultStream { inp: self } - } -} - -pub struct IntoBinnedXBins1DefaultStream -where - S: Stream> + Unpin, - I: AggregatableXdim1Bin, -{ - inp: S, -} - -impl Stream for IntoBinnedXBins1DefaultStream -where - S: Stream> + Unpin, - I: AggregatableXdim1Bin, -{ - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - match self.inp.poll_next_unpin(cx) { - Ready(Some(Ok(k))) => Ready(Some(Ok(k.into_agg()))), - Ready(Some(Err(e))) => Ready(Some(Err(e))), - Ready(None) => Ready(None), - Pending => Pending, - } - } -} - -pub trait IntoBinnedT { - type StreamOut: Stream; - fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut; -} - -impl IntoBinnedT for T -where - I: AggregatableTdim + Unpin, - T: Stream> + Unpin, - I::Aggregator: Unpin, -{ - type StreamOut = IntoBinnedTDefaultStream; - - fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut { - IntoBinnedTDefaultStream::new(self, spec) - } -} - -pub struct IntoBinnedTDefaultStream -where - I: AggregatableTdim, - S: Stream>, -{ - inp: S, - aggtor: Option, - spec: BinSpecDimT, - curbin: u32, - left: Option>>>, - errored: bool, - completed: bool, - inp_completed: bool, -} - -impl IntoBinnedTDefaultStream -where - I: AggregatableTdim, - S: Stream>, -{ - pub fn new(inp: S, spec: BinSpecDimT) -> Self { - //info!("spec ts {} {}", spec.ts1, spec.ts2); - Self { - inp, - aggtor: None, - spec, - curbin: 0, - left: None, - errored: false, - completed: false, - inp_completed: false, - } - } -} - -impl Stream for IntoBinnedTDefaultStream -where - I: AggregatableTdim + Unpin, - T: Stream> + Unpin, - I::Aggregator: Unpin, -{ - type Item = Result<::OutputValue, Error>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - if self.completed { - panic!("MergedFromRemotes ✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ poll_next on completed"); - } - if self.errored { - self.completed = true; - return Ready(None); - } - 'outer: loop { - let cur = if self.curbin as u64 >= self.spec.count { - trace!("IntoBinnedTDefaultStream curbin out of spec, END"); - Ready(None) - } else if let Some(k) = self.left.take() { - trace!("IntoBinnedTDefaultStream USE LEFTOVER"); - k - } else if self.inp_completed { - Ready(None) - } else { - let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll"); - inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx)) - }; - break match cur { - Ready(Some(Ok(k))) => { - if self.aggtor.is_none() { - let range = self.spec.get_range(self.curbin); - //info!("range: {} {}", range.ts1, range.ts2); - self.aggtor = Some(k.aggregator_new(range.beg, range.end)); - } - let ag = self.aggtor.as_mut().unwrap(); - if ag.ends_before(&k) { - //info!("ENDS BEFORE"); - continue 'outer; - } else if ag.starts_after(&k) { - //info!("STARTS AFTER"); - self.left = Some(Ready(Some(Ok(k)))); - self.curbin += 1; - Ready(Some(Ok(self.aggtor.take().unwrap().result()))) - } else { - //info!("INGEST"); - ag.ingest(&k); - // if this input contains also data after the current bin, then I need to keep - // it for the next round. - if ag.ends_after(&k) { - //info!("ENDS AFTER"); - self.left = Some(Ready(Some(Ok(k)))); - self.curbin += 1; - Ready(Some(Ok(self.aggtor.take().unwrap().result()))) - } else { - //info!("ENDS WITHIN"); - continue 'outer; - } - } - } - Ready(Some(Err(e))) => { - error!("IntoBinnedTDefaultStream err from input"); - self.errored = true; - Ready(Some(Err(e))) - } - Ready(None) => { - self.inp_completed = true; - match self.aggtor.take() { - Some(ag) => Ready(Some(Ok(ag.result()))), - None => { - warn!("TODO add the trailing empty bins until requested range is complete"); - self.completed = true; - Ready(None) - } - } - } - Pending => Pending, - }; - } - } -} - pub fn make_test_node(id: u32) -> Node { Node { id, diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs new file mode 100644 index 0000000..1e59616 --- /dev/null +++ b/disk/src/agg/binnedt.rs @@ -0,0 +1,154 @@ +use crate::agg::{AggregatableTdim, AggregatorTdim}; +use err::Error; +use futures_core::Stream; +use futures_util::StreamExt; +use netpod::log::*; +use netpod::BinSpecDimT; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pub trait IntoBinnedT { + type StreamOut: Stream; + fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut; +} + +impl IntoBinnedT for T +where + I: AggregatableTdim + Unpin, + T: Stream> + Unpin, + I::Aggregator: Unpin, +{ + type StreamOut = IntoBinnedTDefaultStream; + + fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut { + IntoBinnedTDefaultStream::new(self, spec) + } +} + +pub struct IntoBinnedTDefaultStream +where + I: AggregatableTdim, + S: Stream>, +{ + inp: S, + aggtor: Option, + spec: BinSpecDimT, + curbin: u32, + left: Option>>>, + errored: bool, + completed: bool, + inp_completed: bool, +} + +impl IntoBinnedTDefaultStream +where + I: AggregatableTdim, + S: Stream>, +{ + pub fn new(inp: S, spec: BinSpecDimT) -> Self { + let range = spec.get_range(0); + Self { + inp, + aggtor: Some(I::aggregator_new_static(range.beg, range.end)), + spec, + curbin: 0, + left: None, + errored: false, + completed: false, + inp_completed: false, + } + } +} + +impl Stream for IntoBinnedTDefaultStream +where + I: AggregatableTdim + Unpin, + T: Stream> + Unpin, + I::Aggregator: Unpin, +{ + type Item = Result<::OutputValue, Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + if self.completed { + panic!("MergedFromRemotes ✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ poll_next on completed"); + } + if self.errored { + self.completed = true; + return Ready(None); + } + 'outer: loop { + let cur = if let Some(k) = self.left.take() { + trace!("IntoBinnedTDefaultStream USE LEFTOVER"); + k + } else if self.inp_completed { + Ready(None) + } else { + let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll"); + inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx)) + }; + break match cur { + Ready(Some(Ok(k))) => { + let ag = self.aggtor.as_mut().unwrap(); + if ag.ends_before(&k) { + //info!("ENDS BEFORE"); + continue 'outer; + } else if ag.starts_after(&k) { + //info!("STARTS AFTER"); + self.left = Some(Ready(Some(Ok(k)))); + self.curbin += 1; + let range = self.spec.get_range(self.curbin); + let ret = self + .aggtor + .replace(I::aggregator_new_static(range.beg, range.end)) + .unwrap() + .result(); + Ready(Some(Ok(ret))) + } else { + //info!("INGEST"); + ag.ingest(&k); + // if this input contains also data after the current bin, then I need to keep + // it for the next round. + if ag.ends_after(&k) { + //info!("ENDS AFTER"); + self.left = Some(Ready(Some(Ok(k)))); + self.curbin += 1; + let range = self.spec.get_range(self.curbin); + let ret = self + .aggtor + .replace(I::aggregator_new_static(range.beg, range.end)) + .unwrap() + .result(); + Ready(Some(Ok(ret))) + } else { + //info!("ENDS WITHIN"); + continue 'outer; + } + } + } + Ready(Some(Err(e))) => { + self.errored = true; + Ready(Some(Err(e))) + } + Ready(None) => { + self.inp_completed = true; + if self.curbin as u64 >= self.spec.count { + warn!("IntoBinnedTDefaultStream curbin out of spec, END"); + self.completed = true; + Ready(None) + } else { + self.curbin += 1; + let range = self.spec.get_range(self.curbin); + match self.aggtor.replace(I::aggregator_new_static(range.beg, range.end)) { + Some(ag) => Ready(Some(Ok(ag.result()))), + None => { + panic!(); + } + } + } + } + Pending => Pending, + }; + } + } +} diff --git a/disk/src/agg/binnedx.rs b/disk/src/agg/binnedx.rs new file mode 100644 index 0000000..f2d1eba --- /dev/null +++ b/disk/src/agg/binnedx.rs @@ -0,0 +1,50 @@ +use crate::agg::AggregatableXdim1Bin; +use err::Error; +use futures_core::Stream; +use futures_util::StreamExt; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pub trait IntoBinnedXBins1 { + type StreamOut; + fn into_binned_x_bins_1(self) -> Self::StreamOut + where + Self: Stream>; +} + +impl IntoBinnedXBins1 for T +where + T: Stream> + Unpin, +{ + type StreamOut = IntoBinnedXBins1DefaultStream; + + fn into_binned_x_bins_1(self) -> Self::StreamOut { + IntoBinnedXBins1DefaultStream { inp: self } + } +} + +pub struct IntoBinnedXBins1DefaultStream +where + S: Stream> + Unpin, + I: AggregatableXdim1Bin, +{ + inp: S, +} + +impl Stream for IntoBinnedXBins1DefaultStream +where + S: Stream> + Unpin, + I: AggregatableXdim1Bin, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + match self.inp.poll_next_unpin(cx) { + Ready(Some(Ok(k))) => Ready(Some(Ok(k.into_agg()))), + Ready(Some(Err(e))) => Ready(Some(Err(e))), + Ready(None) => Ready(None), + Pending => Pending, + } + } +} diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs new file mode 100644 index 0000000..19b76e4 --- /dev/null +++ b/disk/src/agg/eventbatch.rs @@ -0,0 +1,247 @@ +use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; +use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim, MinMaxAvgScalarBinSingle}; +use bytes::{BufMut, Bytes, BytesMut}; +use netpod::log::*; +use netpod::timeunits::SEC; +use serde::{Deserialize, Serialize}; +use std::mem::size_of; + +#[derive(Serialize, Deserialize)] +pub struct MinMaxAvgScalarEventBatch { + pub tss: Vec, + pub mins: Vec, + pub maxs: Vec, + pub avgs: Vec, +} + +impl MinMaxAvgScalarEventBatch { + pub fn empty() -> Self { + Self { + tss: vec![], + mins: vec![], + maxs: vec![], + avgs: vec![], + } + } + #[allow(dead_code)] + pub fn old_from_full_frame(buf: &Bytes) -> Self { + info!("construct MinMaxAvgScalarEventBatch from full frame len {}", buf.len()); + assert!(buf.len() >= 4); + let mut g = MinMaxAvgScalarEventBatch::empty(); + let n1; + unsafe { + let ptr = (&buf[0] as *const u8) as *const [u8; 4]; + n1 = u32::from_le_bytes(*ptr); + trace!("--- +++ --- +++ --- +++ n1: {}", n1); + } + if n1 == 0 { + g + } else { + let n2 = n1 as usize; + g.tss.reserve(n2); + g.mins.reserve(n2); + g.maxs.reserve(n2); + g.avgs.reserve(n2); + unsafe { + // TODO Can I unsafely create ptrs and just assign them? + // TODO What are cases where I really need transmute? + g.tss.set_len(n2); + g.mins.set_len(n2); + g.maxs.set_len(n2); + g.avgs.set_len(n2); + let ptr0 = &buf[4] as *const u8; + { + let ptr1 = ptr0 as *const u64; + for i1 in 0..n2 { + g.tss[i1] = *ptr1.add(i1); + } + } + { + let ptr1 = ptr0.add((8) * n2) as *const f32; + for i1 in 0..n2 { + g.mins[i1] = *ptr1.add(i1); + } + } + { + let ptr1 = ptr0.add((8 + 4) * n2) as *const f32; + for i1 in 0..n2 { + g.maxs[i1] = *ptr1; + } + } + { + let ptr1 = ptr0.add((8 + 4 + 4) * n2) as *const f32; + for i1 in 0..n2 { + g.avgs[i1] = *ptr1; + } + } + } + info!("CONTENT {:?}", g); + g + } + } +} + +impl std::fmt::Debug for MinMaxAvgScalarEventBatch { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + fmt, + "MinMaxAvgScalarEventBatch count {} tss {:?} mins {:?} maxs {:?} avgs {:?}", + self.tss.len(), + self.tss, + self.mins, + self.maxs, + self.avgs, + ) + } +} + +impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatch { + type Output = MinMaxAvgScalarEventBatch; + fn into_agg(self) -> Self::Output { + self + } +} + +impl AggregatableTdim for MinMaxAvgScalarEventBatch { + type Output = MinMaxAvgScalarBinBatch; + type Aggregator = MinMaxAvgScalarEventBatchAggregator; + fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator { + MinMaxAvgScalarEventBatchAggregator::new(ts1, ts2) + } +} + +impl MinMaxAvgScalarEventBatch { + #[allow(dead_code)] + fn old_serialized(&self) -> Bytes { + let n1 = self.tss.len(); + let mut g = BytesMut::with_capacity(4 + n1 * (8 + 3 * 4)); + g.put_u32_le(n1 as u32); + if n1 > 0 { + let ptr = &self.tss[0] as *const u64 as *const u8; + let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; + g.put(a); + let ptr = &self.mins[0] as *const f32 as *const u8; + let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; + g.put(a); + let ptr = &self.maxs[0] as *const f32 as *const u8; + let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; + g.put(a); + let ptr = &self.avgs[0] as *const f32 as *const u8; + let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; + g.put(a); + } + info!("impl Frameable for MinMaxAvgScalarEventBatch g.len() {}", g.len()); + g.freeze() + } +} + +pub struct MinMaxAvgScalarEventBatchAggregator { + ts1: u64, + ts2: u64, + count: u64, + min: f32, + max: f32, + sum: f32, +} + +impl MinMaxAvgScalarEventBatchAggregator { + pub fn new(ts1: u64, ts2: u64) -> Self { + Self { + ts1, + ts2, + min: f32::MAX, + max: f32::MIN, + sum: 0f32, + count: 0, + } + } +} + +impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { + type InputValue = MinMaxAvgScalarEventBatch; + type OutputValue = MinMaxAvgScalarBinSingle; + + fn ends_before(&self, inp: &Self::InputValue) -> bool { + match inp.tss.last() { + Some(ts) => *ts < self.ts1, + None => true, + } + } + + fn ends_after(&self, inp: &Self::InputValue) -> bool { + match inp.tss.last() { + Some(ts) => *ts >= self.ts2, + _ => panic!(), + } + } + + fn starts_after(&self, inp: &Self::InputValue) -> bool { + match inp.tss.first() { + Some(ts) => *ts >= self.ts2, + _ => panic!(), + } + } + + fn ingest(&mut self, v: &Self::InputValue) { + trace!( + "ingest {} {} {} {:?} {:?}", + self.ends_before(v), + self.ends_after(v), + self.starts_after(v), + v.tss.first().map(|k| k / SEC), + v.tss.last().map(|k| k / SEC), + ); + for i1 in 0..v.tss.len() { + let ts = v.tss[i1]; + if ts < self.ts1 { + trace!( + "EventBatchAgg {} {} {} {} IS BEFORE", + v.tss[i1], + v.mins[i1], + v.maxs[i1], + v.avgs[i1] + ); + continue; + } else if ts >= self.ts2 { + trace!( + "EventBatchAgg {} {} {} {} IS AFTER", + v.tss[i1], + v.mins[i1], + v.maxs[i1], + v.avgs[i1] + ); + continue; + } else { + trace!( + "EventBatchAgg {} {} {} {}", + v.tss[i1], + v.mins[i1], + v.maxs[i1], + v.avgs[i1] + ); + self.min = self.min.min(v.mins[i1]); + self.max = self.max.max(v.maxs[i1]); + self.sum += v.avgs[i1]; + self.count += 1; + } + } + } + + fn result(self) -> Self::OutputValue { + let min = if self.min == f32::MAX { f32::NAN } else { self.min }; + let max = if self.max == f32::MIN { f32::NAN } else { self.max }; + let avg = if self.count == 0 { + f32::NAN + } else { + self.sum / self.count as f32 + }; + MinMaxAvgScalarBinSingle { + ts1: self.ts1, + ts2: self.ts2, + count: self.count, + min, + max, + avg, + } + } +} diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs new file mode 100644 index 0000000..18fb703 --- /dev/null +++ b/disk/src/agg/scalarbinbatch.rs @@ -0,0 +1,226 @@ +use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim, Fits, FitsInside, MinMaxAvgScalarBinSingle}; +use bytes::{BufMut, Bytes, BytesMut}; +use netpod::log::*; +use netpod::timeunits::SEC; +use netpod::NanoRange; +use serde::{Deserialize, Serialize}; +use std::mem::size_of; + +#[allow(dead_code)] +#[derive(Serialize, Deserialize)] +pub struct MinMaxAvgScalarBinBatch { + ts1s: Vec, + ts2s: Vec, + counts: Vec, + mins: Vec, + maxs: Vec, + avgs: Vec, +} + +impl MinMaxAvgScalarBinBatch { + pub fn empty() -> Self { + Self { + ts1s: vec![], + ts2s: vec![], + counts: vec![], + mins: vec![], + maxs: vec![], + avgs: vec![], + } + } + pub fn len(&self) -> usize { + self.ts1s.len() + } + pub fn push_single(&mut self, g: &MinMaxAvgScalarBinSingle) { + self.ts1s.push(g.ts1); + self.ts2s.push(g.ts2); + self.counts.push(g.count); + self.mins.push(g.min); + self.maxs.push(g.max); + self.avgs.push(g.avg); + } + pub fn from_full_frame(buf: &Bytes) -> Self { + info!("MinMaxAvgScalarBinBatch construct from full frame len {}", buf.len()); + assert!(buf.len() >= 4); + let mut g = MinMaxAvgScalarBinBatch::empty(); + let n1; + unsafe { + let ptr = (&buf[0] as *const u8) as *const [u8; 4]; + n1 = u32::from_le_bytes(*ptr); + trace!( + "MinMaxAvgScalarBinBatch construct --- +++ --- +++ --- +++ n1: {}", + n1 + ); + } + if n1 == 0 { + g + } else { + let n2 = n1 as usize; + g.ts1s.reserve(n2); + g.ts2s.reserve(n2); + g.counts.reserve(n2); + g.mins.reserve(n2); + g.maxs.reserve(n2); + g.avgs.reserve(n2); + unsafe { + // TODO Can I unsafely create ptrs and just assign them? + // TODO What are cases where I really need transmute? + g.ts1s.set_len(n2); + g.ts2s.set_len(n2); + g.counts.set_len(n2); + g.mins.set_len(n2); + g.maxs.set_len(n2); + g.avgs.set_len(n2); + let ptr0 = &buf[4] as *const u8; + { + let ptr1 = ptr0.add(0) as *const u64; + for i1 in 0..n2 { + g.ts1s[i1] = *ptr1.add(i1); + } + } + { + let ptr1 = ptr0.add((8) * n2) as *const u64; + for i1 in 0..n2 { + g.ts2s[i1] = *ptr1.add(i1); + } + } + { + let ptr1 = ptr0.add((8 + 8) * n2) as *const u64; + for i1 in 0..n2 { + g.counts[i1] = *ptr1.add(i1); + } + } + { + let ptr1 = ptr0.add((8 + 8 + 8) * n2) as *const f32; + for i1 in 0..n2 { + g.mins[i1] = *ptr1.add(i1); + } + } + { + let ptr1 = ptr0.add((8 + 8 + 8 + 4) * n2) as *const f32; + for i1 in 0..n2 { + g.maxs[i1] = *ptr1; + } + } + { + let ptr1 = ptr0.add((8 + 8 + 8 + 4 + 4) * n2) as *const f32; + for i1 in 0..n2 { + g.avgs[i1] = *ptr1; + } + } + } + info!("CONTENT {:?}", g); + g + } + } +} + +impl FitsInside for MinMaxAvgScalarBinBatch { + fn fits_inside(&self, range: NanoRange) -> Fits { + if self.ts1s.is_empty() { + Fits::Empty + } else { + let t1 = *self.ts1s.first().unwrap(); + let t2 = *self.ts2s.last().unwrap(); + if t2 <= range.beg { + Fits::Lower + } else if t1 >= range.end { + Fits::Greater + } else if t1 < range.beg && t2 > range.end { + Fits::PartlyLowerAndGreater + } else if t1 < range.beg { + Fits::PartlyLower + } else if t2 > range.end { + Fits::PartlyGreater + } else { + Fits::Inside + } + } + } +} + +impl MinMaxAvgScalarBinBatch { + #[allow(dead_code)] + fn old_serialized(&self) -> Bytes { + let n1 = self.ts1s.len(); + let mut g = BytesMut::with_capacity(4 + n1 * (3 * 8 + 3 * 4)); + g.put_u32_le(n1 as u32); + if n1 > 0 { + let ptr = &self.ts1s[0] as *const u64 as *const u8; + let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; + g.put(a); + let ptr = &self.ts2s[0] as *const u64 as *const u8; + let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; + g.put(a); + let ptr = &self.counts[0] as *const u64 as *const u8; + let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; + g.put(a); + let ptr = &self.mins[0] as *const f32 as *const u8; + let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; + g.put(a); + let ptr = &self.maxs[0] as *const f32 as *const u8; + let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; + g.put(a); + let ptr = &self.avgs[0] as *const f32 as *const u8; + let a = unsafe { std::slice::from_raw_parts(ptr, size_of::() * n1) }; + g.put(a); + } + g.freeze() + } +} + +impl std::fmt::Debug for MinMaxAvgScalarBinBatch { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + fmt, + "MinMaxAvgScalarBinBatch count {} ts1s {:?} ts2s {:?} counts {:?} avgs {:?}", + self.ts1s.len(), + self.ts1s.iter().map(|k| k / SEC).collect::>(), + self.ts2s.iter().map(|k| k / SEC).collect::>(), + self.counts, + self.avgs + ) + } +} + +impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatch { + type Output = MinMaxAvgScalarBinBatch; + fn into_agg(self) -> Self::Output { + todo!() + } +} + +impl AggregatableTdim for MinMaxAvgScalarBinBatch { + type Output = MinMaxAvgScalarBinSingle; + type Aggregator = MinMaxAvgScalarBinBatchAggregator; + fn aggregator_new_static(_ts1: u64, _ts2: u64) -> Self::Aggregator { + todo!() + } +} + +pub struct MinMaxAvgScalarBinBatchAggregator {} + +impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator { + type InputValue = MinMaxAvgScalarBinBatch; + type OutputValue = MinMaxAvgScalarBinSingle; + + fn ends_before(&self, _inp: &Self::InputValue) -> bool { + todo!() + } + + fn ends_after(&self, _inp: &Self::InputValue) -> bool { + todo!() + } + + fn starts_after(&self, _inp: &Self::InputValue) -> bool { + todo!() + } + + fn ingest(&mut self, _v: &Self::InputValue) { + todo!() + } + + fn result(self) -> Self::OutputValue { + todo!() + } +} diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index f7b54a5..93ee932 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -1,5 +1,7 @@ -use super::agg::{AggregatableXdim1Bin, IntoBinnedT, IntoBinnedXBins1, IntoDim1F32Stream, ValuesDim1}; +use super::agg::{AggregatableXdim1Bin, IntoDim1F32Stream, ValuesDim1}; use super::merge::MergeDim1F32Stream; +use crate::agg::binnedt::IntoBinnedT; +use crate::agg::binnedx::IntoBinnedXBins1; use crate::agg::make_test_node; use futures_util::StreamExt; use netpod::timeunits::*; diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs index b445d94..39d0124 100644 --- a/disk/src/binnedstream.rs +++ b/disk/src/binnedstream.rs @@ -1,4 +1,4 @@ -use crate::agg::MinMaxAvgScalarBinBatch; +use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::cache::pbvfs::PreBinnedValueFetchedStream; use err::Error; use futures_core::Stream; @@ -41,16 +41,16 @@ impl BinnedStream { let g = match k { Ok(k) => { use super::agg::{Fits, FitsInside}; - //info!("BinnedStream got good item {:?}", k); match k.fits_inside(range.clone()) { - Fits::Inside => Some(Ok(k)), + Fits::Inside + | Fits::PartlyGreater + | Fits::PartlyLower + | Fits::PartlyLowerAndGreater => Some(Ok(k)), _ => None, } } Err(e) => { - error!( - "\n\n----------------------------------------------------- BinnedStream got error" - ); + error!("observe error in stream {:?}", e); Some(Err(e)) } }; diff --git a/disk/src/cache.rs b/disk/src/cache.rs index a3256ea..15bf5a0 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -1,4 +1,4 @@ -use crate::agg::MinMaxAvgScalarEventBatch; +use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::binnedstream::BinnedStream; use crate::cache::pbv::PreBinnedValueByteStream; use crate::channelconfig::{extract_matching_config_entry, read_local_config}; diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index 22335c8..f8bcd71 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -1,4 +1,5 @@ -use crate::agg::{IntoBinnedT, MinMaxAvgScalarBinBatch}; +use crate::agg::binnedt::IntoBinnedT; +use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::cache::pbvfs::{PreBinnedHttpFrame, PreBinnedValueFetchedStream}; use crate::cache::{node_ix_for_patch, MergedFromRemotes}; use crate::frame::makeframe::make_frame; diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs index 65279b7..f2fcc4e 100644 --- a/disk/src/cache/pbvfs.rs +++ b/disk/src/cache/pbvfs.rs @@ -1,4 +1,4 @@ -use crate::agg::MinMaxAvgScalarBinBatch; +use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead}; use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::makeframe::FrameType; diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs new file mode 100644 index 0000000..79831a5 --- /dev/null +++ b/disk/src/dataopen.rs @@ -0,0 +1,169 @@ +use super::paths; +use bytes::BytesMut; +use err::Error; +use futures_util::StreamExt; +use netpod::log::*; +use netpod::{ChannelConfig, NanoRange, Nanos, Node}; +use std::mem::size_of; +use std::sync::Arc; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncReadExt, AsyncSeekExt, ErrorKind, SeekFrom}; + +pub fn open_files( + range: &NanoRange, + channel_config: &ChannelConfig, + node: Arc, +) -> async_channel::Receiver> { + let (chtx, chrx) = async_channel::bounded(2); + let range = range.clone(); + let channel_config = channel_config.clone(); + tokio::spawn(async move { + match open_files_inner(&chtx, &range, &channel_config, node).await { + Ok(_) => {} + Err(e) => match chtx.send(Err(e.into())).await { + Ok(_) => {} + Err(e) => { + error!("open_files channel send error {:?}", e); + } + }, + } + }); + chrx +} + +async fn open_files_inner( + chtx: &async_channel::Sender>, + range: &NanoRange, + channel_config: &ChannelConfig, + node: Arc, +) -> Result<(), Error> { + let channel_config = channel_config.clone(); + // TODO reduce usage of `query` and see what we actually need. + let mut timebins = vec![]; + { + let rd = tokio::fs::read_dir(paths::channel_timebins_dir_path(&channel_config, &node)?).await?; + let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd); + while let Some(e) = rd.next().await { + let e = e?; + let dn = e + .file_name() + .into_string() + .map_err(|e| Error::with_msg(format!("Bad OS path {:?}", e)))?; + let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a }); + if vv == 19 { + timebins.push(dn.parse::()?); + } + } + } + timebins.sort_unstable(); + info!("TIMEBINS FOUND: {:?}", timebins); + for &tb in &timebins { + let ts_bin = Nanos { + ns: tb * channel_config.time_bin_size, + }; + if ts_bin.ns >= range.end { + continue; + } + if ts_bin.ns + channel_config.time_bin_size <= range.beg { + continue; + } + + let path = paths::datapath(tb, &channel_config, &node); + let mut file = OpenOptions::new().read(true).open(&path).await?; + info!("opened file {:?} {:?}", &path, &file); + + { + let index_path = paths::index_path(ts_bin, &channel_config, &node)?; + match OpenOptions::new().read(true).open(&index_path).await { + Ok(mut index_file) => { + let meta = index_file.metadata().await?; + if meta.len() > 1024 * 1024 * 10 { + return Err(Error::with_msg(format!( + "too large index file {} bytes for {}", + meta.len(), + channel_config.channel.name + ))); + } + if meta.len() % 16 != 0 { + return Err(Error::with_msg(format!( + "bad meta len {} for {}", + meta.len(), + channel_config.channel.name + ))); + } + let mut buf = BytesMut::with_capacity(meta.len() as usize); + buf.resize(buf.capacity(), 0); + info!("read exact index file {} {}", buf.len(), buf.len() % 16); + index_file.read_exact(&mut buf).await?; + match find_ge(range.beg, &buf)? { + Some(o) => { + info!("FOUND ts IN INDEX: {:?}", o); + file.seek(SeekFrom::Start(o.1)).await?; + } + None => { + info!("NOT FOUND IN INDEX"); + file.seek(SeekFrom::End(0)).await?; + } + } + } + Err(e) => match e.kind() { + ErrorKind::NotFound => { + // TODO Read first 1k, assume that channel header fits. + // TODO Seek via binary search. Can not read whole file into memory! + todo!("Seek directly in scalar file"); + } + _ => Err(e)?, + }, + } + } + + // TODO Since I want to seek into the data file, the consumer of this channel must not expect a file channel name header. + + chtx.send(Ok(file)).await?; + } + Ok(()) +} + +fn find_ge(h: u64, buf: &[u8]) -> Result, Error> { + trace!("find_ge {}", h); + const N: usize = 2 * size_of::(); + let n1 = buf.len(); + if n1 % N != 0 { + return Err(Error::with_msg(format!("find_ge bad len {}", n1))); + } + if n1 == 0 { + warn!("Empty index data"); + return Ok(None); + } + let n1 = n1 / N; + let a = unsafe { + let ptr = &buf[0] as *const u8 as *const ([u8; 8], [u8; 8]); + std::slice::from_raw_parts(ptr, n1) + }; + let mut j = 0; + let mut k = n1 - 1; + let x = u64::from_be_bytes(a[j].0); + let y = u64::from_be_bytes(a[k].0); + trace!("first/last ts: {} {}", x, y); + if x >= h { + return Ok(Some((u64::from_be_bytes(a[j].0), u64::from_be_bytes(a[j].1)))); + } + if y < h { + return Ok(None); + } + loop { + if k - j < 2 { + let ret = (u64::from_be_bytes(a[k].0), u64::from_be_bytes(a[k].1)); + trace!("FOUND {:?}", ret); + return Ok(Some(ret)); + } + let m = (k + j) / 2; + let x = u64::from_be_bytes(a[m].0); + trace!("CHECK NEW M: {}", x); + if x < h { + j = m; + } else { + k = m; + } + } +} diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index c144a98..2f92a03 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -1,5 +1,6 @@ +use crate::dataopen::open_files; use crate::eventchunker::{EventChunker, EventFull}; -use crate::{file_content_stream, open_files}; +use crate::file_content_stream; use err::Error; use futures_core::Stream; use futures_util::StreamExt; @@ -48,7 +49,8 @@ impl Stream for EventBlobsComplete { Ready(Some(k)) => match k { Ok(file) => { let inp = Box::pin(file_content_stream(file, self.buffer_size as usize)); - let chunker = EventChunker::new(inp, self.channel_config.clone(), self.range.clone()); + let chunker = + EventChunker::from_event_boundary(inp, self.channel_config.clone(), self.range.clone()); self.evs = Some(chunker); continue 'outer; } @@ -75,7 +77,7 @@ pub fn event_blobs_complete( match fileres { Ok(file) => { let inp = Box::pin(file_content_stream(file, query.buffer_size as usize)); - let mut chunker = EventChunker::new(inp, err::todoval(), err::todoval()); + let mut chunker = EventChunker::from_event_boundary(inp, err::todoval(), err::todoval()); while let Some(evres) = chunker.next().await { match evres { Ok(evres) => { diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index 1700297..061f176 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -29,7 +29,7 @@ enum DataFileState { } impl EventChunker { - pub fn new( + pub fn from_start( inp: Pin> + Send>>, channel_config: ChannelConfig, range: NanoRange, @@ -37,7 +37,7 @@ impl EventChunker { let mut inp = NeedMinBuffer::new(inp); inp.set_need_min(6); Self { - inp: inp, + inp, polled: 0, state: DataFileState::FileHeader, need_min: 6, @@ -49,6 +49,26 @@ impl EventChunker { } } + pub fn from_event_boundary( + inp: Pin> + Send>>, + channel_config: ChannelConfig, + range: NanoRange, + ) -> Self { + let mut inp = NeedMinBuffer::new(inp); + inp.set_need_min(4); + Self { + inp, + polled: 0, + state: DataFileState::Event, + need_min: 4, + channel_config, + errored: false, + completed: false, + range, + seen_beyond_range: false, + } + } + fn parse_buf(&mut self, buf: &mut BytesMut) -> Result { span!(Level::INFO, "EventChunker::parse_buf").in_scope(|| self.parse_buf_inner(buf)) } @@ -194,7 +214,9 @@ impl EventChunker { assert!(c1 as u32 == k1); trace!("decompress result c1 {} k1 {}", c1, k1); if ts < self.range.beg { - warn!("UNNECESSARY EVENT DECOMPRESS {}", ts / SEC); + error!("EVENT BEFORE RANGE {}", ts / SEC); + } else if ts >= self.range.end { + error!("EVENT BEFORE RANGE {}", ts / SEC); } else { ret.add_event( ts, diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 56335d8..781de5a 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -179,6 +179,37 @@ async fn gen_config( Ok(()) } +struct CountedFile { + file: File, + bytes: u64, +} + +impl CountedFile { + pub fn new(file: File) -> Self { + Self { file, bytes: 0 } + } + pub async fn write_all(&mut self, buf: &[u8]) -> Result { + let l = buf.len(); + let mut i = 0; + loop { + match self.file.write(&buf[i..]).await { + Ok(n) => { + i += n; + self.bytes += n as u64; + if i >= l { + break; + } + } + Err(e) => Err(e)?, + } + } + Ok(i as u64) + } + pub fn written_len(&self) -> u64 { + self.bytes + } +} + struct GenTimebinRes { evix: u64, ts: u64, @@ -201,12 +232,13 @@ async fn gen_timebin( let data_path = path.join(format!("{:019}_{:05}_Data", config.time_bin_size / MS, 0)); let index_path = path.join(format!("{:019}_{:05}_Data_Index", config.time_bin_size / MS, 0)); info!("open data file {:?}", data_path); - let mut file = OpenOptions::new() + let file = OpenOptions::new() .write(true) .create(true) .truncate(true) .open(data_path) .await?; + let mut file = CountedFile::new(file); let mut index_file = if let Shape::Wave(_) = config.shape { info!("open index file {:?}", index_path); let f = OpenOptions::new() @@ -215,7 +247,7 @@ async fn gen_timebin( .truncate(true) .open(index_path) .await?; - Some(f) + Some(CountedFile::new(f)) } else { None }; @@ -234,7 +266,7 @@ async fn gen_timebin( Ok(ret) } -async fn gen_datafile_header(file: &mut File, config: &ChannelConfig) -> Result<(), Error> { +async fn gen_datafile_header(file: &mut CountedFile, config: &ChannelConfig) -> Result<(), Error> { let mut buf = BytesMut::with_capacity(1024); let cnenc = config.channel.name.as_bytes(); let len1 = cnenc.len() + 8; @@ -247,8 +279,8 @@ async fn gen_datafile_header(file: &mut File, config: &ChannelConfig) -> Result< } async fn gen_event( - file: &mut File, - _index_file: Option<&mut File>, + file: &mut CountedFile, + index_file: Option<&mut CountedFile>, evix: u64, ts: u64, config: &ChannelConfig, @@ -305,6 +337,13 @@ async fn gen_event( buf.put_u32(len); buf.as_mut().put_u32(len); } + let z = file.written_len(); file.write_all(buf.as_ref()).await?; + if let Some(f) = index_file { + let mut buf = BytesMut::with_capacity(16); + buf.put_u64(ts); + buf.put_u64(z); + f.write_all(&buf).await?; + } Ok(()) } diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 15d5d3f..9b482eb 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -1,3 +1,4 @@ +use crate::dataopen::open_files; use crate::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE}; use bytes::{Bytes, BytesMut}; use err::Error; @@ -21,6 +22,7 @@ pub mod aggtest; pub mod binnedstream; pub mod cache; pub mod channelconfig; +pub mod dataopen; pub mod eventblobs; pub mod eventchunker; pub mod frame; @@ -297,55 +299,6 @@ pub fn raw_concat_channel_read_stream_file_pipe( } } -fn open_files( - range: &NanoRange, - channel_config: &ChannelConfig, - node: Arc, -) -> async_channel::Receiver> { - let channel_config = channel_config.clone(); - let (chtx, chrx) = async_channel::bounded(2); - tokio::spawn(async move { - // TODO reduce usage of `query` and see what we actually need. - // TODO scan the timebins on the filesystem for the potential files first instead of trying to open hundreds in worst case. - - let mut timebins = vec![]; - { - let rd = tokio::fs::read_dir(paths::channel_timebins_dir_path(&channel_config, &node)?).await?; - let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd); - while let Some(e) = rd.next().await { - let e = e?; - let dn = e - .file_name() - .into_string() - .map_err(|e| Error::with_msg(format!("Bad OS path {:?}", e)))?; - let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a }); - if vv == 19 { - timebins.push(dn.parse::()?); - } - } - } - timebins.sort_unstable(); - info!("TIMEBINS FOUND: {:?}", timebins); - for &tb in &timebins { - let path = paths::datapath(tb, &channel_config, &node); - let fileres = OpenOptions::new().read(true).open(&path).await; - info!("opened file {:?} {:?}", &path, &fileres); - match fileres { - Ok(k) => match chtx.send(Ok(k)).await { - Ok(_) => (), - Err(_) => break, - }, - Err(e) => match chtx.send(Err(e.into())).await { - Ok(_) => (), - Err(_) => break, - }, - } - } - Ok::<_, Error>(()) - }); - chrx -} - pub fn file_content_stream( mut file: tokio::fs::File, buffer_size: usize, @@ -379,7 +332,7 @@ pub fn parsed1( Ok(file) => { let inp = Box::pin(file_content_stream(file, query.buffer_size as usize)); let range = err::todoval(); - let mut chunker = eventchunker::EventChunker::new(inp, err::todoval(), range); + let mut chunker = eventchunker::EventChunker::from_event_boundary(inp, err::todoval(), range); while let Some(evres) = chunker.next().await { match evres { Ok(evres) => { diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 9198993..726ace6 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -1,4 +1,5 @@ -use crate::agg::{Dim1F32Stream, MinMaxAvgScalarEventBatch, ValuesDim1}; +use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; +use crate::agg::{Dim1F32Stream, ValuesDim1}; use crate::eventchunker::EventFull; use err::Error; use futures_core::Stream; diff --git a/disk/src/raw.rs b/disk/src/raw.rs index d5a80d2..163dbfb 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -5,7 +5,7 @@ Delivers event data (not yet time-binned) from local storage and provides client to request such data from nodes. */ -use crate::agg::MinMaxAvgScalarEventBatch; +use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::makeframe::{make_frame, make_term_frame}; use crate::raw::bffr::MinMaxAvgScalarEventBatchStreamFromFrames; diff --git a/disk/src/raw/bffr.rs b/disk/src/raw/bffr.rs index 5c13ecc..9e4e4c8 100644 --- a/disk/src/raw/bffr.rs +++ b/disk/src/raw/bffr.rs @@ -1,4 +1,4 @@ -use crate::agg::MinMaxAvgScalarEventBatch; +use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::makeframe::FrameType; use crate::raw::conn::RawConnOut; diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index f2c4e28..a636a17 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -1,4 +1,6 @@ -use crate::agg::{IntoBinnedXBins1, IntoDim1F32Stream, MinMaxAvgScalarEventBatch}; +use crate::agg::binnedx::IntoBinnedXBins1; +use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; +use crate::agg::IntoDim1F32Stream; use crate::channelconfig::{extract_matching_config_entry, read_local_config}; use crate::eventblobs::EventBlobsComplete; use crate::frame::inmem::InMemoryFrameAsyncReadStream; @@ -128,11 +130,7 @@ async fn raw_conn_handler_inner_try( Ok(_) => (), Err(e) => return Err((e, netout))?, } - debug!("REQUEST FOR RANGE {:?}", evq.range); - error!( - "TODO decide on response content based on the parsed json query\n{:?}", - evq - ); + debug!("REQUEST {:?}", evq); let range = &evq.range; let channel_config = match read_local_config(&evq.channel, node_config.clone()).await { Ok(k) => k, @@ -182,7 +180,6 @@ async fn raw_conn_handler_inner_try( buffer_size, ) .into_dim_1_f32_stream() - .take(10) .into_binned_x_bins_1(); let mut e = 0; while let Some(item) = s1.next().await { diff --git a/err/src/lib.rs b/err/src/lib.rs index 727ea0a..0f7f1a9 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -90,6 +90,8 @@ impl std::fmt::Display for Error { } } +impl std::error::Error for Error {} + impl From for Error { fn from(k: String) -> Self { Self::with_msg(k) @@ -102,8 +104,6 @@ impl From<&str> for Error { } } -impl std::error::Error for Error {} - impl From for Error { fn from(k: std::io::Error) -> Self { Self::with_msg(k.to_string()) @@ -192,6 +192,12 @@ impl From for Error { } } +impl From> for Error { + fn from(k: async_channel::SendError) -> Self { + Self::with_msg(k.to_string()) + } +} + pub fn todoval() -> T { todo!("TODO todoval") } diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 7d72583..a96e083 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -151,7 +151,7 @@ pub enum TimeRange { Nano { beg: u64, end: u64 }, } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] pub struct Nanos { pub ns: u64, } diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index 4dc6ffb..b5dda16 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -48,8 +48,8 @@ async fn get_cached_0_inner() -> Result<(), Error> { let cluster = Arc::new(test_cluster()); let node0 = &cluster.nodes[0]; let hosts = spawn_test_hosts(cluster.clone()); - let beg_date: chrono::DateTime = "1970-01-01T00:00:10.000Z".parse()?; - let end_date: chrono::DateTime = "1970-01-01T00:00:51.000Z".parse()?; + let beg_date: chrono::DateTime = "1970-01-01T00:20:10.000Z".parse()?; + let end_date: chrono::DateTime = "1970-01-01T00:20:51.000Z".parse()?; let channel_backend = "back"; let channel_name = "wave1"; let bin_count = 4; diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs index f07ce57..394b842 100644 --- a/taskrun/src/lib.rs +++ b/taskrun/src/lib.rs @@ -56,7 +56,7 @@ pub fn tracing_init() { .with_thread_names(true) //.with_max_level(tracing::Level::INFO) .with_env_filter(tracing_subscriber::EnvFilter::new( - "info,retrieval=trace,retrieval::test=trace,disk::raw::conn=trace,tokio_postgres=info", + "info,retrieval=trace,retrieval::test=trace,disk::raw::conn=info", )) .init(); }