Open data via index file

This commit is contained in:
Dominik Werder
2021-04-28 11:35:06 +02:00
parent 09ea4175dc
commit 0204c37017
23 changed files with 966 additions and 709 deletions

View File

@@ -3,20 +3,22 @@ Aggregation and binning support.
*/
use super::eventchunker::EventFull;
use bytes::{BufMut, Bytes, BytesMut};
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use netpod::timeunits::SEC;
use netpod::{BinSpecDimT, NanoRange};
use netpod::NanoRange;
use netpod::{Node, ScalarType};
use serde::{Deserialize, Serialize};
use std::mem::size_of;
use std::pin::Pin;
use std::task::{Context, Poll};
#[allow(unused_imports)]
use tracing::{debug, error, info, span, trace, warn, Level};
pub mod binnedt;
pub mod binnedx;
pub mod eventbatch;
pub mod scalarbinbatch;
pub trait AggregatorTdim {
type InputValue;
type OutputValue: AggregatableXdim1Bin + AggregatableTdim;
@@ -35,7 +37,7 @@ pub trait AggregatableXdim1Bin {
pub trait AggregatableTdim {
type Output: AggregatableXdim1Bin + AggregatableTdim;
type Aggregator: AggregatorTdim<InputValue = Self>;
fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator;
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator;
}
/// DO NOT USE. This is just a dummy for some testing.
@@ -49,7 +51,7 @@ impl AggregatableXdim1Bin for () {
impl AggregatableTdim for () {
type Output = ();
type Aggregator = ();
fn aggregator_new(&self, _ts1: u64, _ts2: u64) -> Self::Aggregator {
fn aggregator_new_static(_ts1: u64, _ts2: u64) -> Self::Aggregator {
todo!()
}
}
@@ -201,329 +203,6 @@ impl AggregatableXdim1Bin for ValuesDim0 {
}
}
#[derive(Serialize, Deserialize)]
pub struct MinMaxAvgScalarEventBatch {
pub tss: Vec<u64>,
pub mins: Vec<f32>,
pub maxs: Vec<f32>,
pub avgs: Vec<f32>,
}
impl MinMaxAvgScalarEventBatch {
pub fn empty() -> Self {
Self {
tss: vec![],
mins: vec![],
maxs: vec![],
avgs: vec![],
}
}
pub fn from_full_frame(buf: &Bytes) -> Self {
info!("construct MinMaxAvgScalarEventBatch from full frame len {}", buf.len());
assert!(buf.len() >= 4);
let mut g = MinMaxAvgScalarEventBatch::empty();
let n1;
unsafe {
let ptr = (&buf[0] as *const u8) as *const [u8; 4];
n1 = u32::from_le_bytes(*ptr);
trace!("--- +++ --- +++ --- +++ n1: {}", n1);
}
if n1 == 0 {
g
} else {
let n2 = n1 as usize;
g.tss.reserve(n2);
g.mins.reserve(n2);
g.maxs.reserve(n2);
g.avgs.reserve(n2);
unsafe {
// TODO Can I unsafely create ptrs and just assign them?
// TODO What are cases where I really need transmute?
g.tss.set_len(n2);
g.mins.set_len(n2);
g.maxs.set_len(n2);
g.avgs.set_len(n2);
let ptr0 = &buf[4] as *const u8;
{
let ptr1 = ptr0 as *const u64;
for i1 in 0..n2 {
g.tss[i1] = *ptr1.add(i1);
}
}
{
let ptr1 = ptr0.add((8) * n2) as *const f32;
for i1 in 0..n2 {
g.mins[i1] = *ptr1.add(i1);
}
}
{
let ptr1 = ptr0.add((8 + 4) * n2) as *const f32;
for i1 in 0..n2 {
g.maxs[i1] = *ptr1;
}
}
{
let ptr1 = ptr0.add((8 + 4 + 4) * n2) as *const f32;
for i1 in 0..n2 {
g.avgs[i1] = *ptr1;
}
}
}
info!("CONTENT {:?}", g);
g
}
}
}
impl std::fmt::Debug for MinMaxAvgScalarEventBatch {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
fmt,
"MinMaxAvgScalarEventBatch count {} tss {:?} mins {:?} maxs {:?} avgs {:?}",
self.tss.len(),
self.tss,
self.mins,
self.maxs,
self.avgs,
)
}
}
impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatch {
type Output = MinMaxAvgScalarEventBatch;
fn into_agg(self) -> Self::Output {
self
}
}
impl AggregatableTdim for MinMaxAvgScalarEventBatch {
type Output = MinMaxAvgScalarBinBatch;
type Aggregator = MinMaxAvgScalarEventBatchAggregator;
fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator {
MinMaxAvgScalarEventBatchAggregator::new(ts1, ts2)
}
}
impl MinMaxAvgScalarEventBatch {
#[allow(dead_code)]
fn old_serialized(&self) -> Bytes {
let n1 = self.tss.len();
let mut g = BytesMut::with_capacity(4 + n1 * (8 + 3 * 4));
g.put_u32_le(n1 as u32);
if n1 > 0 {
let ptr = &self.tss[0] as *const u64 as *const u8;
let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<u64>() * n1) };
g.put(a);
let ptr = &self.mins[0] as *const f32 as *const u8;
let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<f32>() * n1) };
g.put(a);
let ptr = &self.maxs[0] as *const f32 as *const u8;
let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<f32>() * n1) };
g.put(a);
let ptr = &self.avgs[0] as *const f32 as *const u8;
let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<f32>() * n1) };
g.put(a);
}
info!("impl Frameable for MinMaxAvgScalarEventBatch g.len() {}", g.len());
g.freeze()
}
}
pub struct MinMaxAvgScalarEventBatchAggregator {
ts1: u64,
ts2: u64,
count: u64,
min: f32,
max: f32,
sum: f32,
}
impl MinMaxAvgScalarEventBatchAggregator {
pub fn new(ts1: u64, ts2: u64) -> Self {
Self {
ts1,
ts2,
min: f32::MAX,
max: f32::MIN,
sum: 0f32,
count: 0,
}
}
}
impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
type InputValue = MinMaxAvgScalarEventBatch;
type OutputValue = MinMaxAvgScalarBinSingle;
fn ends_before(&self, inp: &Self::InputValue) -> bool {
match inp.tss.last() {
Some(ts) => *ts < self.ts1,
None => true,
}
}
fn ends_after(&self, inp: &Self::InputValue) -> bool {
match inp.tss.last() {
Some(ts) => *ts >= self.ts2,
_ => panic!(),
}
}
fn starts_after(&self, inp: &Self::InputValue) -> bool {
match inp.tss.first() {
Some(ts) => *ts >= self.ts2,
_ => panic!(),
}
}
fn ingest(&mut self, v: &Self::InputValue) {
for i1 in 0..v.tss.len() {
let ts = v.tss[i1];
if ts < self.ts1 {
//info!("EventBatchAgg {} {} {} {} IS BEFORE", v.tss[i1], v.mins[i1], v.maxs[i1], v.avgs[i1]);
continue;
} else if ts >= self.ts2 {
//info!("EventBatchAgg {} {} {} {} IS AFTER", v.tss[i1], v.mins[i1], v.maxs[i1], v.avgs[i1]);
continue;
} else {
//info!("EventBatchAgg {} {} {} {}", v.tss[i1], v.mins[i1], v.maxs[i1], v.avgs[i1]);
self.min = self.min.min(v.mins[i1]);
self.max = self.max.max(v.maxs[i1]);
self.sum += v.avgs[i1];
self.count += 1;
}
}
}
fn result(self) -> Self::OutputValue {
let min = if self.min == f32::MAX { f32::NAN } else { self.min };
let max = if self.max == f32::MIN { f32::NAN } else { self.max };
let avg = if self.count == 0 {
f32::NAN
} else {
self.sum / self.count as f32
};
MinMaxAvgScalarBinSingle {
ts1: self.ts1,
ts2: self.ts2,
count: self.count,
min,
max,
avg,
}
}
}
#[allow(dead_code)]
#[derive(Serialize, Deserialize)]
pub struct MinMaxAvgScalarBinBatch {
ts1s: Vec<u64>,
ts2s: Vec<u64>,
counts: Vec<u64>,
mins: Vec<f32>,
maxs: Vec<f32>,
avgs: Vec<f32>,
}
impl MinMaxAvgScalarBinBatch {
pub fn empty() -> Self {
Self {
ts1s: vec![],
ts2s: vec![],
counts: vec![],
mins: vec![],
maxs: vec![],
avgs: vec![],
}
}
pub fn len(&self) -> usize {
self.ts1s.len()
}
pub fn push_single(&mut self, g: &MinMaxAvgScalarBinSingle) {
self.ts1s.push(g.ts1);
self.ts2s.push(g.ts2);
self.counts.push(g.count);
self.mins.push(g.min);
self.maxs.push(g.max);
self.avgs.push(g.avg);
}
pub fn from_full_frame(buf: &Bytes) -> Self {
info!("MinMaxAvgScalarBinBatch construct from full frame len {}", buf.len());
assert!(buf.len() >= 4);
let mut g = MinMaxAvgScalarBinBatch::empty();
let n1;
unsafe {
let ptr = (&buf[0] as *const u8) as *const [u8; 4];
n1 = u32::from_le_bytes(*ptr);
trace!(
"MinMaxAvgScalarBinBatch construct --- +++ --- +++ --- +++ n1: {}",
n1
);
}
if n1 == 0 {
g
} else {
let n2 = n1 as usize;
g.ts1s.reserve(n2);
g.ts2s.reserve(n2);
g.counts.reserve(n2);
g.mins.reserve(n2);
g.maxs.reserve(n2);
g.avgs.reserve(n2);
unsafe {
// TODO Can I unsafely create ptrs and just assign them?
// TODO What are cases where I really need transmute?
g.ts1s.set_len(n2);
g.ts2s.set_len(n2);
g.counts.set_len(n2);
g.mins.set_len(n2);
g.maxs.set_len(n2);
g.avgs.set_len(n2);
let ptr0 = &buf[4] as *const u8;
{
let ptr1 = ptr0.add(0) as *const u64;
for i1 in 0..n2 {
g.ts1s[i1] = *ptr1.add(i1);
}
}
{
let ptr1 = ptr0.add((8) * n2) as *const u64;
for i1 in 0..n2 {
g.ts2s[i1] = *ptr1.add(i1);
}
}
{
let ptr1 = ptr0.add((8 + 8) * n2) as *const u64;
for i1 in 0..n2 {
g.counts[i1] = *ptr1.add(i1);
}
}
{
let ptr1 = ptr0.add((8 + 8 + 8) * n2) as *const f32;
for i1 in 0..n2 {
g.mins[i1] = *ptr1.add(i1);
}
}
{
let ptr1 = ptr0.add((8 + 8 + 8 + 4) * n2) as *const f32;
for i1 in 0..n2 {
g.maxs[i1] = *ptr1;
}
}
{
let ptr1 = ptr0.add((8 + 8 + 8 + 4 + 4) * n2) as *const f32;
for i1 in 0..n2 {
g.avgs[i1] = *ptr1;
}
}
}
info!("CONTENT {:?}", g);
g
}
}
}
pub enum Fits {
Empty,
Lower,
@@ -538,116 +217,6 @@ pub trait FitsInside {
fn fits_inside(&self, range: NanoRange) -> Fits;
}
impl FitsInside for MinMaxAvgScalarBinBatch {
fn fits_inside(&self, range: NanoRange) -> Fits {
if self.ts1s.is_empty() {
Fits::Empty
} else {
let t1 = *self.ts1s.first().unwrap();
let t2 = *self.ts2s.last().unwrap();
if t2 <= range.beg {
Fits::Lower
} else if t1 >= range.end {
Fits::Greater
} else if t1 < range.beg && t2 > range.end {
Fits::PartlyLowerAndGreater
} else if t1 < range.beg {
Fits::PartlyLower
} else if t2 > range.end {
Fits::PartlyGreater
} else {
Fits::Inside
}
}
}
}
impl MinMaxAvgScalarBinBatch {
#[allow(dead_code)]
fn old_serialized(&self) -> Bytes {
let n1 = self.ts1s.len();
let mut g = BytesMut::with_capacity(4 + n1 * (3 * 8 + 3 * 4));
g.put_u32_le(n1 as u32);
if n1 > 0 {
let ptr = &self.ts1s[0] as *const u64 as *const u8;
let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<u64>() * n1) };
g.put(a);
let ptr = &self.ts2s[0] as *const u64 as *const u8;
let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<u64>() * n1) };
g.put(a);
let ptr = &self.counts[0] as *const u64 as *const u8;
let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<u64>() * n1) };
g.put(a);
let ptr = &self.mins[0] as *const f32 as *const u8;
let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<f32>() * n1) };
g.put(a);
let ptr = &self.maxs[0] as *const f32 as *const u8;
let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<f32>() * n1) };
g.put(a);
let ptr = &self.avgs[0] as *const f32 as *const u8;
let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<f32>() * n1) };
g.put(a);
}
g.freeze()
}
}
impl std::fmt::Debug for MinMaxAvgScalarBinBatch {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
fmt,
"MinMaxAvgScalarBinBatch count {} ts1s {:?} ts2s {:?} counts {:?} avgs {:?}",
self.ts1s.len(),
self.ts1s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
self.ts2s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
self.counts,
self.avgs
)
}
}
impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatch {
type Output = MinMaxAvgScalarBinBatch;
fn into_agg(self) -> Self::Output {
todo!()
}
}
impl AggregatableTdim for MinMaxAvgScalarBinBatch {
type Output = MinMaxAvgScalarBinSingle;
type Aggregator = MinMaxAvgScalarBinBatchAggregator;
fn aggregator_new(&self, _ts1: u64, _ts2: u64) -> Self::Aggregator {
todo!()
}
}
pub struct MinMaxAvgScalarBinBatchAggregator {}
impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
type InputValue = MinMaxAvgScalarBinBatch;
type OutputValue = MinMaxAvgScalarBinSingle;
fn ends_before(&self, _inp: &Self::InputValue) -> bool {
todo!()
}
fn ends_after(&self, _inp: &Self::InputValue) -> bool {
todo!()
}
fn starts_after(&self, _inp: &Self::InputValue) -> bool {
todo!()
}
fn ingest(&mut self, _v: &Self::InputValue) {
todo!()
}
fn result(self) -> Self::OutputValue {
todo!()
}
}
pub struct MinMaxAvgScalarBinSingle {
ts1: u64,
ts2: u64,
@@ -670,7 +239,7 @@ impl std::fmt::Debug for MinMaxAvgScalarBinSingle {
impl AggregatableTdim for MinMaxAvgScalarBinSingle {
type Output = MinMaxAvgScalarBinSingle;
type Aggregator = MinMaxAvgScalarBinSingleAggregator;
fn aggregator_new(&self, _ts1: u64, _ts2: u64) -> Self::Aggregator {
fn aggregator_new_static(_ts1: u64, _ts2: u64) -> Self::Aggregator {
todo!()
}
}
@@ -871,187 +440,6 @@ where
}
}
pub trait IntoBinnedXBins1<I: AggregatableXdim1Bin> {
type StreamOut;
fn into_binned_x_bins_1(self) -> Self::StreamOut
where
Self: Stream<Item = Result<I, Error>>;
}
impl<T, I: AggregatableXdim1Bin> IntoBinnedXBins1<I> for T
where
T: Stream<Item = Result<I, Error>> + Unpin,
{
type StreamOut = IntoBinnedXBins1DefaultStream<T, I>;
fn into_binned_x_bins_1(self) -> Self::StreamOut {
IntoBinnedXBins1DefaultStream { inp: self }
}
}
pub struct IntoBinnedXBins1DefaultStream<S, I>
where
S: Stream<Item = Result<I, Error>> + Unpin,
I: AggregatableXdim1Bin,
{
inp: S,
}
impl<S, I> Stream for IntoBinnedXBins1DefaultStream<S, I>
where
S: Stream<Item = Result<I, Error>> + Unpin,
I: AggregatableXdim1Bin,
{
type Item = Result<I::Output, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => Ready(Some(Ok(k.into_agg()))),
Ready(Some(Err(e))) => Ready(Some(Err(e))),
Ready(None) => Ready(None),
Pending => Pending,
}
}
}
pub trait IntoBinnedT {
type StreamOut: Stream;
fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut;
}
impl<T, I> IntoBinnedT for T
where
I: AggregatableTdim + Unpin,
T: Stream<Item = Result<I, Error>> + Unpin,
I::Aggregator: Unpin,
{
type StreamOut = IntoBinnedTDefaultStream<T, I>;
fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut {
IntoBinnedTDefaultStream::new(self, spec)
}
}
pub struct IntoBinnedTDefaultStream<S, I>
where
I: AggregatableTdim,
S: Stream<Item = Result<I, Error>>,
{
inp: S,
aggtor: Option<I::Aggregator>,
spec: BinSpecDimT,
curbin: u32,
left: Option<Poll<Option<Result<I, Error>>>>,
errored: bool,
completed: bool,
inp_completed: bool,
}
impl<S, I> IntoBinnedTDefaultStream<S, I>
where
I: AggregatableTdim,
S: Stream<Item = Result<I, Error>>,
{
pub fn new(inp: S, spec: BinSpecDimT) -> Self {
//info!("spec ts {} {}", spec.ts1, spec.ts2);
Self {
inp,
aggtor: None,
spec,
curbin: 0,
left: None,
errored: false,
completed: false,
inp_completed: false,
}
}
}
impl<T, I> Stream for IntoBinnedTDefaultStream<T, I>
where
I: AggregatableTdim + Unpin,
T: Stream<Item = Result<I, Error>> + Unpin,
I::Aggregator: Unpin,
{
type Item = Result<<I::Aggregator as AggregatorTdim>::OutputValue, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
if self.completed {
panic!("MergedFromRemotes ✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ poll_next on completed");
}
if self.errored {
self.completed = true;
return Ready(None);
}
'outer: loop {
let cur = if self.curbin as u64 >= self.spec.count {
trace!("IntoBinnedTDefaultStream curbin out of spec, END");
Ready(None)
} else if let Some(k) = self.left.take() {
trace!("IntoBinnedTDefaultStream USE LEFTOVER");
k
} else if self.inp_completed {
Ready(None)
} else {
let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll");
inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx))
};
break match cur {
Ready(Some(Ok(k))) => {
if self.aggtor.is_none() {
let range = self.spec.get_range(self.curbin);
//info!("range: {} {}", range.ts1, range.ts2);
self.aggtor = Some(k.aggregator_new(range.beg, range.end));
}
let ag = self.aggtor.as_mut().unwrap();
if ag.ends_before(&k) {
//info!("ENDS BEFORE");
continue 'outer;
} else if ag.starts_after(&k) {
//info!("STARTS AFTER");
self.left = Some(Ready(Some(Ok(k))));
self.curbin += 1;
Ready(Some(Ok(self.aggtor.take().unwrap().result())))
} else {
//info!("INGEST");
ag.ingest(&k);
// if this input contains also data after the current bin, then I need to keep
// it for the next round.
if ag.ends_after(&k) {
//info!("ENDS AFTER");
self.left = Some(Ready(Some(Ok(k))));
self.curbin += 1;
Ready(Some(Ok(self.aggtor.take().unwrap().result())))
} else {
//info!("ENDS WITHIN");
continue 'outer;
}
}
}
Ready(Some(Err(e))) => {
error!("IntoBinnedTDefaultStream err from input");
self.errored = true;
Ready(Some(Err(e)))
}
Ready(None) => {
self.inp_completed = true;
match self.aggtor.take() {
Some(ag) => Ready(Some(Ok(ag.result()))),
None => {
warn!("TODO add the trailing empty bins until requested range is complete");
self.completed = true;
Ready(None)
}
}
}
Pending => Pending,
};
}
}
}
pub fn make_test_node(id: u32) -> Node {
Node {
id,

154
disk/src/agg/binnedt.rs Normal file
View File

@@ -0,0 +1,154 @@
use crate::agg::{AggregatableTdim, AggregatorTdim};
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::BinSpecDimT;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Extension trait: turn a stream of time-ordered aggregatable items into a
/// stream of time-binned values according to a [`BinSpecDimT`].
pub trait IntoBinnedT {
    type StreamOut: Stream;
    fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut;
}
/// Blanket impl for any unpinned stream of `Result<I, Error>` whose item type
/// supports T-dimension aggregation.
impl<T, I> IntoBinnedT for T
where
    I: AggregatableTdim + Unpin,
    T: Stream<Item = Result<I, Error>> + Unpin,
    I::Aggregator: Unpin,
{
    type StreamOut = IntoBinnedTDefaultStream<T, I>;
    fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut {
        IntoBinnedTDefaultStream::new(self, spec)
    }
}
/// Stream adapter that aggregates input items into consecutive time bins.
pub struct IntoBinnedTDefaultStream<S, I>
where
    I: AggregatableTdim,
    S: Stream<Item = Result<I, Error>>,
{
    // Upstream source of items to bin.
    inp: S,
    // Aggregator for the bin currently being filled; kept `Some` by `new`
    // and refilled on every bin rollover in `poll_next`.
    aggtor: Option<I::Aggregator>,
    // Bin layout (count and per-bin ranges).
    spec: BinSpecDimT,
    // Index of the bin currently being filled.
    curbin: u32,
    // Item held over from the previous poll because it crossed a bin boundary.
    left: Option<Poll<Option<Result<I, Error>>>>,
    errored: bool,
    completed: bool,
    inp_completed: bool,
}
impl<S, I> IntoBinnedTDefaultStream<S, I>
where
    I: AggregatableTdim,
    S: Stream<Item = Result<I, Error>>,
{
    /// Creates the binning stream and pre-seeds the aggregator for bin 0,
    /// so `poll_next` can assume `aggtor` is always `Some`.
    pub fn new(inp: S, spec: BinSpecDimT) -> Self {
        let range = spec.get_range(0);
        Self {
            inp,
            aggtor: Some(I::aggregator_new_static(range.beg, range.end)),
            spec,
            curbin: 0,
            left: None,
            errored: false,
            completed: false,
            inp_completed: false,
        }
    }
}
impl<T, I> Stream for IntoBinnedTDefaultStream<T, I>
where
    I: AggregatableTdim + Unpin,
    T: Stream<Item = Result<I, Error>> + Unpin,
    I::Aggregator: Unpin,
{
    type Item = Result<<I::Aggregator as AggregatorTdim>::OutputValue, Error>;
    // Drives the time binning: pulls input items, feeds them to the current
    // bin's aggregator, and emits one finished bin each time an input crosses
    // the current bin boundary. Panics if polled after completion.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        if self.completed {
            panic!("MergedFromRemotes ✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ poll_next on completed");
        }
        if self.errored {
            self.completed = true;
            return Ready(None);
        }
        'outer: loop {
            // Source of the next item: a leftover from the previous poll takes
            // priority over polling upstream again.
            let cur = if let Some(k) = self.left.take() {
                trace!("IntoBinnedTDefaultStream USE LEFTOVER");
                k
            } else if self.inp_completed {
                Ready(None)
            } else {
                let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll");
                inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx))
            };
            break match cur {
                Ready(Some(Ok(k))) => {
                    // `aggtor` is kept `Some` by `new` and by every rollover
                    // below, so this unwrap cannot fail.
                    let ag = self.aggtor.as_mut().unwrap();
                    if ag.ends_before(&k) {
                        // Item lies entirely before the current bin: drop it.
                        continue 'outer;
                    } else if ag.starts_after(&k) {
                        // Item starts past the current bin: emit the finished
                        // bin, start the next one, and keep the item for it.
                        self.left = Some(Ready(Some(Ok(k))));
                        self.curbin += 1;
                        let range = self.spec.get_range(self.curbin);
                        let ret = self
                            .aggtor
                            .replace(I::aggregator_new_static(range.beg, range.end))
                            .unwrap()
                            .result();
                        Ready(Some(Ok(ret)))
                    } else {
                        // Item overlaps the current bin: ingest it.
                        ag.ingest(&k);
                        // If the item also extends past the current bin, keep
                        // it as leftover for the next bin and emit this one.
                        if ag.ends_after(&k) {
                            self.left = Some(Ready(Some(Ok(k))));
                            self.curbin += 1;
                            let range = self.spec.get_range(self.curbin);
                            let ret = self
                                .aggtor
                                .replace(I::aggregator_new_static(range.beg, range.end))
                                .unwrap()
                                .result();
                            Ready(Some(Ok(ret)))
                        } else {
                            // Fully consumed within this bin; pull more input.
                            continue 'outer;
                        }
                    }
                }
                Ready(Some(Err(e))) => {
                    self.errored = true;
                    Ready(Some(Err(e)))
                }
                Ready(None) => {
                    // Upstream exhausted: flush remaining bins one per poll
                    // until the spec's bin count is reached.
                    self.inp_completed = true;
                    if self.curbin as u64 >= self.spec.count {
                        warn!("IntoBinnedTDefaultStream curbin out of spec, END");
                        self.completed = true;
                        Ready(None)
                    } else {
                        self.curbin += 1;
                        let range = self.spec.get_range(self.curbin);
                        match self.aggtor.replace(I::aggregator_new_static(range.beg, range.end)) {
                            Some(ag) => Ready(Some(Ok(ag.result()))),
                            None => {
                                // Invariant violated: `aggtor` must always be Some here.
                                panic!();
                            }
                        }
                    }
                }
                Pending => Pending,
            };
        }
    }
}

50
disk/src/agg/binnedx.rs Normal file
View File

@@ -0,0 +1,50 @@
use crate::agg::AggregatableXdim1Bin;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Extension trait: collapse each stream item over the X dimension into a
/// single bin via [`AggregatableXdim1Bin::into_agg`].
pub trait IntoBinnedXBins1<I: AggregatableXdim1Bin> {
    type StreamOut;
    fn into_binned_x_bins_1(self) -> Self::StreamOut
    where
        Self: Stream<Item = Result<I, Error>>;
}
/// Blanket impl for any unpinned stream of `Result<I, Error>`.
impl<T, I: AggregatableXdim1Bin> IntoBinnedXBins1<I> for T
where
    T: Stream<Item = Result<I, Error>> + Unpin,
{
    type StreamOut = IntoBinnedXBins1DefaultStream<T, I>;
    fn into_binned_x_bins_1(self) -> Self::StreamOut {
        IntoBinnedXBins1DefaultStream { inp: self }
    }
}
/// Stream adapter that maps each item through its X-dimension aggregation.
pub struct IntoBinnedXBins1DefaultStream<S, I>
where
    S: Stream<Item = Result<I, Error>> + Unpin,
    I: AggregatableXdim1Bin,
{
    // Upstream source of items.
    inp: S,
}
impl<S, I> Stream for IntoBinnedXBins1DefaultStream<S, I>
where
S: Stream<Item = Result<I, Error>> + Unpin,
I: AggregatableXdim1Bin,
{
type Item = Result<I::Output, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => Ready(Some(Ok(k.into_agg()))),
Ready(Some(Err(e))) => Ready(Some(Err(e))),
Ready(None) => Ready(None),
Pending => Pending,
}
}
}

247
disk/src/agg/eventbatch.rs Normal file
View File

@@ -0,0 +1,247 @@
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim, MinMaxAvgScalarBinSingle};
use bytes::{BufMut, Bytes, BytesMut};
use netpod::log::*;
use netpod::timeunits::SEC;
use serde::{Deserialize, Serialize};
use std::mem::size_of;
/// Per-event scalar statistics stored as parallel column vectors
/// (one entry per event; all four vectors have equal length).
#[derive(Serialize, Deserialize)]
pub struct MinMaxAvgScalarEventBatch {
    // Event timestamps (the Debug impl for the bin batch divides such values
    // by SEC for display — presumably nanoseconds; confirm against producer).
    pub tss: Vec<u64>,
    pub mins: Vec<f32>,
    pub maxs: Vec<f32>,
    pub avgs: Vec<f32>,
}
impl MinMaxAvgScalarEventBatch {
    /// Returns a batch with all columns empty.
    pub fn empty() -> Self {
        Self {
            tss: vec![],
            mins: vec![],
            maxs: vec![],
            avgs: vec![],
        }
    }
    /// Decodes a batch from a full frame buffer.
    ///
    /// Wire layout: a little-endian `u32` event count `n`, followed by
    /// `n` `u64` timestamps, then `n` `f32` mins, maxs and avgs.
    ///
    /// # Panics
    /// Panics when `buf` is shorter than the decoded layout requires.
    #[allow(dead_code)]
    pub fn old_from_full_frame(buf: &Bytes) -> Self {
        info!("construct MinMaxAvgScalarEventBatch from full frame len {}", buf.len());
        assert!(buf.len() >= 4);
        let mut a4 = [0u8; 4];
        a4.copy_from_slice(&buf[0..4]);
        let n1 = u32::from_le_bytes(a4);
        trace!("--- +++ --- +++ --- +++ n1: {}", n1);
        let mut g = MinMaxAvgScalarEventBatch::empty();
        let n2 = n1 as usize;
        if n2 != 0 {
            // Make the length invariant explicit instead of relying on
            // out-of-bounds reads to fail.
            assert!(buf.len() >= 4 + n2 * (8 + 3 * 4));
            let body = &buf[4..];
            g.tss = body[..8 * n2]
                .chunks_exact(8)
                .map(|c| {
                    let mut a = [0u8; 8];
                    a.copy_from_slice(c);
                    u64::from_le_bytes(a)
                })
                .collect();
            // Decodes one contiguous f32 column.
            fn f32_col(sl: &[u8]) -> Vec<f32> {
                sl.chunks_exact(4)
                    .map(|c| {
                        let mut a = [0u8; 4];
                        a.copy_from_slice(c);
                        f32::from_le_bytes(a)
                    })
                    .collect()
            }
            // BUGFIX: the previous pointer-based decoder filled `maxs` and
            // `avgs` with only the first element of each column (the loops
            // read `*ptr1` instead of `*ptr1.add(i1)`); decode every element.
            // (The old code also read payload in native byte order while the
            // count prefix was little-endian; this decoder is uniformly
            // little-endian — equivalent on LE targets.)
            g.mins = f32_col(&body[8 * n2..(8 + 4) * n2]);
            g.maxs = f32_col(&body[(8 + 4) * n2..(8 + 8) * n2]);
            g.avgs = f32_col(&body[(8 + 8) * n2..(8 + 12) * n2]);
            info!("CONTENT {:?}", g);
        }
        g
    }
}
impl std::fmt::Debug for MinMaxAvgScalarEventBatch {
    // Single-line dump of the event count and all four raw columns.
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            fmt,
            "MinMaxAvgScalarEventBatch count {} tss {:?} mins {:?} maxs {:?} avgs {:?}",
            self.tss.len(),
            self.tss,
            self.mins,
            self.maxs,
            self.avgs,
        )
    }
}
impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatch {
    type Output = MinMaxAvgScalarEventBatch;
    // Already a scalar per event: aggregating over X is the identity.
    fn into_agg(self) -> Self::Output {
        self
    }
}
impl AggregatableTdim for MinMaxAvgScalarEventBatch {
    type Output = MinMaxAvgScalarBinBatch;
    type Aggregator = MinMaxAvgScalarEventBatchAggregator;
    // Aggregator over the time bin [ts1, ts2).
    fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
        MinMaxAvgScalarEventBatchAggregator::new(ts1, ts2)
    }
}
impl MinMaxAvgScalarEventBatch {
    /// Serializes the batch into the legacy frame format: a little-endian
    /// `u32` count followed by the raw bytes of the `tss`, `mins`, `maxs`
    /// and `avgs` columns, in that order.
    ///
    /// NOTE(review): the count prefix is explicitly little-endian but the
    /// column payloads are copied in native byte order — equivalent only on
    /// little-endian targets; confirm the intended wire format.
    #[allow(dead_code)]
    fn old_serialized(&self) -> Bytes {
        // 4-byte count + per event: one u64 + three f32.
        let n1 = self.tss.len();
        let mut g = BytesMut::with_capacity(4 + n1 * (8 + 3 * 4));
        g.put_u32_le(n1 as u32);
        if n1 > 0 {
            let ptr = &self.tss[0] as *const u64 as *const u8;
            // SAFETY: the slice spans exactly the Vec's n1 initialized elements.
            let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<u64>() * n1) };
            g.put(a);
            let ptr = &self.mins[0] as *const f32 as *const u8;
            // SAFETY: as above, mins has n1 initialized elements.
            let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<f32>() * n1) };
            g.put(a);
            let ptr = &self.maxs[0] as *const f32 as *const u8;
            // SAFETY: as above, maxs has n1 initialized elements.
            let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<f32>() * n1) };
            g.put(a);
            let ptr = &self.avgs[0] as *const f32 as *const u8;
            // SAFETY: as above, avgs has n1 initialized elements.
            let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<f32>() * n1) };
            g.put(a);
        }
        info!("impl Frameable for MinMaxAvgScalarEventBatch g.len() {}", g.len());
        g.freeze()
    }
}
/// Accumulates min/max/sum/count of events whose timestamps fall in [ts1, ts2).
pub struct MinMaxAvgScalarEventBatchAggregator {
    // Inclusive lower bin edge.
    ts1: u64,
    // Exclusive upper bin edge.
    ts2: u64,
    count: u64,
    min: f32,
    max: f32,
    sum: f32,
}
impl MinMaxAvgScalarEventBatchAggregator {
    /// New aggregator for the bin [ts1, ts2). `min`/`max` start at the
    /// identity values `f32::MAX`/`f32::MIN` so the first ingested event
    /// always replaces them; `result` turns untouched sentinels into NAN.
    pub fn new(ts1: u64, ts2: u64) -> Self {
        Self {
            ts1,
            ts2,
            min: f32::MAX,
            max: f32::MIN,
            sum: 0f32,
            count: 0,
        }
    }
}
impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
    type InputValue = MinMaxAvgScalarEventBatch;
    type OutputValue = MinMaxAvgScalarBinSingle;
    // True if every event in the batch lies before this bin. An empty batch
    // counts as "ends before".
    fn ends_before(&self, inp: &Self::InputValue) -> bool {
        match inp.tss.last() {
            Some(ts) => *ts < self.ts1,
            None => true,
        }
    }
    // True if the batch's last event lies at or past the bin's upper edge.
    // NOTE(review): panics on an empty batch, unlike `ends_before`.
    fn ends_after(&self, inp: &Self::InputValue) -> bool {
        match inp.tss.last() {
            Some(ts) => *ts >= self.ts2,
            _ => panic!(),
        }
    }
    // True if the batch's first event lies at or past the bin's upper edge.
    // NOTE(review): panics on an empty batch, unlike `ends_before`.
    fn starts_after(&self, inp: &Self::InputValue) -> bool {
        match inp.tss.first() {
            Some(ts) => *ts >= self.ts2,
            _ => panic!(),
        }
    }
    // Folds every event with ts in [ts1, ts2) into min/max/sum/count;
    // events outside the bin are skipped.
    fn ingest(&mut self, v: &Self::InputValue) {
        trace!(
            "ingest {} {} {} {:?} {:?}",
            self.ends_before(v),
            self.ends_after(v),
            self.starts_after(v),
            v.tss.first().map(|k| k / SEC),
            v.tss.last().map(|k| k / SEC),
        );
        for i1 in 0..v.tss.len() {
            let ts = v.tss[i1];
            if ts < self.ts1 {
                trace!(
                    "EventBatchAgg {} {} {} {} IS BEFORE",
                    v.tss[i1],
                    v.mins[i1],
                    v.maxs[i1],
                    v.avgs[i1]
                );
                continue;
            } else if ts >= self.ts2 {
                trace!(
                    "EventBatchAgg {} {} {} {} IS AFTER",
                    v.tss[i1],
                    v.mins[i1],
                    v.maxs[i1],
                    v.avgs[i1]
                );
                continue;
            } else {
                trace!(
                    "EventBatchAgg {} {} {} {}",
                    v.tss[i1],
                    v.mins[i1],
                    v.maxs[i1],
                    v.avgs[i1]
                );
                self.min = self.min.min(v.mins[i1]);
                self.max = self.max.max(v.maxs[i1]);
                self.sum += v.avgs[i1];
                self.count += 1;
            }
        }
    }
    // Finalizes the bin. Untouched sentinel min/max and an empty bin's
    // average are reported as NAN; avg is the mean of the ingested avgs.
    fn result(self) -> Self::OutputValue {
        let min = if self.min == f32::MAX { f32::NAN } else { self.min };
        let max = if self.max == f32::MIN { f32::NAN } else { self.max };
        let avg = if self.count == 0 {
            f32::NAN
        } else {
            self.sum / self.count as f32
        };
        MinMaxAvgScalarBinSingle {
            ts1: self.ts1,
            ts2: self.ts2,
            count: self.count,
            min,
            max,
            avg,
        }
    }
}

View File

@@ -0,0 +1,226 @@
use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim, Fits, FitsInside, MinMaxAvgScalarBinSingle};
use bytes::{BufMut, Bytes, BytesMut};
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::NanoRange;
use serde::{Deserialize, Serialize};
use std::mem::size_of;
/// Per-bin scalar statistics stored as parallel column vectors
/// (one entry per bin; all six vectors have equal length).
#[allow(dead_code)]
#[derive(Serialize, Deserialize)]
pub struct MinMaxAvgScalarBinBatch {
    // Bin lower edges (Debug divides by SEC for display).
    ts1s: Vec<u64>,
    // Bin upper edges.
    ts2s: Vec<u64>,
    // Number of events aggregated into each bin.
    counts: Vec<u64>,
    mins: Vec<f32>,
    maxs: Vec<f32>,
    avgs: Vec<f32>,
}
impl MinMaxAvgScalarBinBatch {
    /// Returns a batch with all columns empty.
    pub fn empty() -> Self {
        Self {
            ts1s: vec![],
            ts2s: vec![],
            counts: vec![],
            mins: vec![],
            maxs: vec![],
            avgs: vec![],
        }
    }
    /// Number of bins in the batch.
    pub fn len(&self) -> usize {
        self.ts1s.len()
    }
    /// Appends one finished bin to the columns.
    pub fn push_single(&mut self, g: &MinMaxAvgScalarBinSingle) {
        self.ts1s.push(g.ts1);
        self.ts2s.push(g.ts2);
        self.counts.push(g.count);
        self.mins.push(g.min);
        self.maxs.push(g.max);
        self.avgs.push(g.avg);
    }
    /// Decodes a batch from a full frame buffer.
    ///
    /// Wire layout: a little-endian `u32` bin count `n`, followed by
    /// `n` `u64` ts1s, `n` `u64` ts2s, `n` `u64` counts, then
    /// `n` `f32` mins, maxs and avgs.
    ///
    /// # Panics
    /// Panics when `buf` is shorter than the decoded layout requires.
    pub fn from_full_frame(buf: &Bytes) -> Self {
        info!("MinMaxAvgScalarBinBatch construct from full frame len {}", buf.len());
        assert!(buf.len() >= 4);
        let mut a4 = [0u8; 4];
        a4.copy_from_slice(&buf[0..4]);
        let n1 = u32::from_le_bytes(a4);
        trace!(
            "MinMaxAvgScalarBinBatch construct --- +++ --- +++ --- +++ n1: {}",
            n1
        );
        let mut g = MinMaxAvgScalarBinBatch::empty();
        let n2 = n1 as usize;
        if n2 != 0 {
            // Make the length invariant explicit instead of relying on
            // out-of-bounds reads to fail.
            assert!(buf.len() >= 4 + n2 * (3 * 8 + 3 * 4));
            let body = &buf[4..];
            // Decodes one contiguous u64 column.
            fn u64_col(sl: &[u8]) -> Vec<u64> {
                sl.chunks_exact(8)
                    .map(|c| {
                        let mut a = [0u8; 8];
                        a.copy_from_slice(c);
                        u64::from_le_bytes(a)
                    })
                    .collect()
            }
            // Decodes one contiguous f32 column.
            fn f32_col(sl: &[u8]) -> Vec<f32> {
                sl.chunks_exact(4)
                    .map(|c| {
                        let mut a = [0u8; 4];
                        a.copy_from_slice(c);
                        f32::from_le_bytes(a)
                    })
                    .collect()
            }
            g.ts1s = u64_col(&body[..8 * n2]);
            g.ts2s = u64_col(&body[8 * n2..16 * n2]);
            g.counts = u64_col(&body[16 * n2..24 * n2]);
            // BUGFIX: the previous pointer-based decoder filled `maxs` and
            // `avgs` with only the first element of each column (the loops
            // read `*ptr1` instead of `*ptr1.add(i1)`); decode every element.
            // (The old code also read payload in native byte order while the
            // count prefix was little-endian; this decoder is uniformly
            // little-endian — equivalent on LE targets.)
            g.mins = f32_col(&body[24 * n2..28 * n2]);
            g.maxs = f32_col(&body[28 * n2..32 * n2]);
            g.avgs = f32_col(&body[32 * n2..36 * n2]);
            info!("CONTENT {:?}", g);
        }
        g
    }
}
impl FitsInside for MinMaxAvgScalarBinBatch {
    // Classifies the batch's full time span [first ts1, last ts2] against
    // `range`. The branch order matters: fully-outside cases are decided
    // first, then overlap on both sides, then single-sided overlap, and
    // finally full containment.
    fn fits_inside(&self, range: NanoRange) -> Fits {
        if self.ts1s.is_empty() {
            Fits::Empty
        } else {
            let t1 = *self.ts1s.first().unwrap();
            let t2 = *self.ts2s.last().unwrap();
            if t2 <= range.beg {
                Fits::Lower
            } else if t1 >= range.end {
                Fits::Greater
            } else if t1 < range.beg && t2 > range.end {
                Fits::PartlyLowerAndGreater
            } else if t1 < range.beg {
                Fits::PartlyLower
            } else if t2 > range.end {
                Fits::PartlyGreater
            } else {
                Fits::Inside
            }
        }
    }
}
impl MinMaxAvgScalarBinBatch {
    /// Serializes the batch into the legacy frame format: a little-endian
    /// `u32` bin count followed by the raw bytes of the `ts1s`, `ts2s`,
    /// `counts`, `mins`, `maxs` and `avgs` columns, in that order.
    ///
    /// NOTE(review): the count prefix is explicitly little-endian but the
    /// column payloads are copied in native byte order — equivalent only on
    /// little-endian targets; confirm the intended wire format.
    #[allow(dead_code)]
    fn old_serialized(&self) -> Bytes {
        // 4-byte count + per bin: three u64 + three f32.
        let n1 = self.ts1s.len();
        let mut g = BytesMut::with_capacity(4 + n1 * (3 * 8 + 3 * 4));
        g.put_u32_le(n1 as u32);
        if n1 > 0 {
            let ptr = &self.ts1s[0] as *const u64 as *const u8;
            // SAFETY: the slice spans exactly the Vec's n1 initialized elements.
            let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<u64>() * n1) };
            g.put(a);
            let ptr = &self.ts2s[0] as *const u64 as *const u8;
            // SAFETY: as above for ts2s.
            let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<u64>() * n1) };
            g.put(a);
            let ptr = &self.counts[0] as *const u64 as *const u8;
            // SAFETY: as above for counts.
            let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<u64>() * n1) };
            g.put(a);
            let ptr = &self.mins[0] as *const f32 as *const u8;
            // SAFETY: as above for mins.
            let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<f32>() * n1) };
            g.put(a);
            let ptr = &self.maxs[0] as *const f32 as *const u8;
            // SAFETY: as above for maxs.
            let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<f32>() * n1) };
            g.put(a);
            let ptr = &self.avgs[0] as *const f32 as *const u8;
            // SAFETY: as above for avgs.
            let a = unsafe { std::slice::from_raw_parts(ptr, size_of::<f32>() * n1) };
            g.put(a);
        }
        g.freeze()
    }
}
impl std::fmt::Debug for MinMaxAvgScalarBinBatch {
    // Single-line dump: bin count, edges scaled to seconds (divided by SEC),
    // event counts and averages. Mins/maxs are omitted for brevity.
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            fmt,
            "MinMaxAvgScalarBinBatch count {} ts1s {:?} ts2s {:?} counts {:?} avgs {:?}",
            self.ts1s.len(),
            self.ts1s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
            self.ts2s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
            self.counts,
            self.avgs
        )
    }
}
impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatch {
    type Output = MinMaxAvgScalarBinBatch;
    // Not yet implemented for bin batches.
    fn into_agg(self) -> Self::Output {
        todo!()
    }
}
impl AggregatableTdim for MinMaxAvgScalarBinBatch {
    type Output = MinMaxAvgScalarBinSingle;
    type Aggregator = MinMaxAvgScalarBinBatchAggregator;
    // Not yet implemented; see the stub aggregator below.
    fn aggregator_new_static(_ts1: u64, _ts2: u64) -> Self::Aggregator {
        todo!()
    }
}
/// Stub T-dimension aggregator for bin batches; every method is unimplemented.
pub struct MinMaxAvgScalarBinBatchAggregator {}
impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
    type InputValue = MinMaxAvgScalarBinBatch;
    type OutputValue = MinMaxAvgScalarBinSingle;
    fn ends_before(&self, _inp: &Self::InputValue) -> bool {
        todo!()
    }
    fn ends_after(&self, _inp: &Self::InputValue) -> bool {
        todo!()
    }
    fn starts_after(&self, _inp: &Self::InputValue) -> bool {
        todo!()
    }
    fn ingest(&mut self, _v: &Self::InputValue) {
        todo!()
    }
    fn result(self) -> Self::OutputValue {
        todo!()
    }
}

View File

@@ -1,5 +1,7 @@
use super::agg::{AggregatableXdim1Bin, IntoBinnedT, IntoBinnedXBins1, IntoDim1F32Stream, ValuesDim1};
use super::agg::{AggregatableXdim1Bin, IntoDim1F32Stream, ValuesDim1};
use super::merge::MergeDim1F32Stream;
use crate::agg::binnedt::IntoBinnedT;
use crate::agg::binnedx::IntoBinnedXBins1;
use crate::agg::make_test_node;
use futures_util::StreamExt;
use netpod::timeunits::*;

View File

@@ -1,4 +1,4 @@
use crate::agg::MinMaxAvgScalarBinBatch;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::cache::pbvfs::PreBinnedValueFetchedStream;
use err::Error;
use futures_core::Stream;
@@ -41,16 +41,16 @@ impl BinnedStream {
let g = match k {
Ok(k) => {
use super::agg::{Fits, FitsInside};
//info!("BinnedStream got good item {:?}", k);
match k.fits_inside(range.clone()) {
Fits::Inside => Some(Ok(k)),
Fits::Inside
| Fits::PartlyGreater
| Fits::PartlyLower
| Fits::PartlyLowerAndGreater => Some(Ok(k)),
_ => None,
}
}
Err(e) => {
error!(
"\n\n----------------------------------------------------- BinnedStream got error"
);
error!("observe error in stream {:?}", e);
Some(Err(e))
}
};

View File

@@ -1,4 +1,4 @@
use crate::agg::MinMaxAvgScalarEventBatch;
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::binnedstream::BinnedStream;
use crate::cache::pbv::PreBinnedValueByteStream;
use crate::channelconfig::{extract_matching_config_entry, read_local_config};

View File

@@ -1,4 +1,5 @@
use crate::agg::{IntoBinnedT, MinMaxAvgScalarBinBatch};
use crate::agg::binnedt::IntoBinnedT;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::cache::pbvfs::{PreBinnedHttpFrame, PreBinnedValueFetchedStream};
use crate::cache::{node_ix_for_patch, MergedFromRemotes};
use crate::frame::makeframe::make_frame;

View File

@@ -1,4 +1,4 @@
use crate::agg::MinMaxAvgScalarBinBatch;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead};
use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use crate::frame::makeframe::FrameType;

169
disk/src/dataopen.rs Normal file
View File

@@ -0,0 +1,169 @@
use super::paths;
use bytes::BytesMut;
use err::Error;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::{ChannelConfig, NanoRange, Nanos, Node};
use std::mem::size_of;
use std::sync::Arc;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncSeekExt, ErrorKind, SeekFrom};
/// Open, in timestamp order, the data files on `node` that may contain events
/// of `channel_config` within `range`, delivering them through a bounded
/// channel.
///
/// The directory scan and file opening run on a spawned task; any error is
/// forwarded into the channel so the consumer observes it in-stream.
pub fn open_files(
    range: &NanoRange,
    channel_config: &ChannelConfig,
    node: Arc<Node>,
) -> async_channel::Receiver<Result<File, Error>> {
    // Bounded to 2: at most a couple of files are opened ahead of the consumer.
    let (chtx, chrx) = async_channel::bounded(2);
    let range = range.clone();
    let channel_config = channel_config.clone();
    tokio::spawn(async move {
        match open_files_inner(&chtx, &range, &channel_config, node).await {
            Ok(_) => {}
            Err(e) => match chtx.send(Err(e.into())).await {
                Ok(_) => {}
                Err(e) => {
                    // Receiver side already dropped; only log.
                    error!("open_files channel send error {:?}", e);
                }
            },
        }
    });
    chrx
}
/// Scan the channel's timebin directories, open each data file overlapping
/// `range`, seek it to the first relevant event via the companion index file,
/// and send the positioned file handles down `chtx`.
async fn open_files_inner(
    chtx: &async_channel::Sender<Result<File, Error>>,
    range: &NanoRange,
    channel_config: &ChannelConfig,
    node: Arc<Node>,
) -> Result<(), Error> {
    let channel_config = channel_config.clone();
    // TODO reduce usage of `query` and see what we actually need.
    let mut timebins = vec![];
    {
        let rd = tokio::fs::read_dir(paths::channel_timebins_dir_path(&channel_config, &node)?).await?;
        let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd);
        while let Some(e) = rd.next().await {
            let e = e?;
            let dn = e
                .file_name()
                .into_string()
                .map_err(|e| Error::with_msg(format!("Bad OS path {:?}", e)))?;
            // Timebin directories are named as 19-digit zero-padded numbers;
            // count digits to filter out anything else.
            let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a });
            if vv == 19 {
                timebins.push(dn.parse::<u64>()?);
            }
        }
    }
    timebins.sort_unstable();
    info!("TIMEBINS FOUND: {:?}", timebins);
    for &tb in &timebins {
        let ts_bin = Nanos {
            ns: tb * channel_config.time_bin_size,
        };
        // Skip bins entirely after the requested range...
        if ts_bin.ns >= range.end {
            continue;
        }
        // ...and bins entirely before it.
        if ts_bin.ns + channel_config.time_bin_size <= range.beg {
            continue;
        }
        let path = paths::datapath(tb, &channel_config, &node);
        let mut file = OpenOptions::new().read(true).open(&path).await?;
        info!("opened file {:?} {:?}", &path, &file);
        {
            let index_path = paths::index_path(ts_bin, &channel_config, &node)?;
            match OpenOptions::new().read(true).open(&index_path).await {
                Ok(mut index_file) => {
                    let meta = index_file.metadata().await?;
                    // Guard: the whole index is read into memory below, so
                    // refuse anything over 10 MiB.
                    if meta.len() > 1024 * 1024 * 10 {
                        return Err(Error::with_msg(format!(
                            "too large index file {} bytes for {}",
                            meta.len(),
                            channel_config.channel.name
                        )));
                    }
                    // Index entries are 16 bytes (u64 ts + u64 offset); any
                    // other length means a corrupt index.
                    if meta.len() % 16 != 0 {
                        return Err(Error::with_msg(format!(
                            "bad meta len {} for {}",
                            meta.len(),
                            channel_config.channel.name
                        )));
                    }
                    let mut buf = BytesMut::with_capacity(meta.len() as usize);
                    buf.resize(buf.capacity(), 0);
                    info!("read exact index file {} {}", buf.len(), buf.len() % 16);
                    index_file.read_exact(&mut buf).await?;
                    match find_ge(range.beg, &buf)? {
                        Some(o) => {
                            info!("FOUND ts IN INDEX: {:?}", o);
                            // Position the data file at the first event >= range.beg.
                            file.seek(SeekFrom::Start(o.1)).await?;
                        }
                        None => {
                            // No event in range: seek to EOF so the consumer
                            // reads nothing from this file.
                            info!("NOT FOUND IN INDEX");
                            file.seek(SeekFrom::End(0)).await?;
                        }
                    }
                }
                Err(e) => match e.kind() {
                    ErrorKind::NotFound => {
                        // TODO Read first 1k, assume that channel header fits.
                        // TODO Seek via binary search. Can not read whole file into memory!
                        todo!("Seek directly in scalar file");
                    }
                    _ => Err(e)?,
                },
            }
        }
        // TODO Since I want to seek into the data file, the consumer of this channel must not expect a file channel name header.
        chtx.send(Ok(file)).await?;
    }
    Ok(())
}
fn find_ge(h: u64, buf: &[u8]) -> Result<Option<(u64, u64)>, Error> {
trace!("find_ge {}", h);
const N: usize = 2 * size_of::<u64>();
let n1 = buf.len();
if n1 % N != 0 {
return Err(Error::with_msg(format!("find_ge bad len {}", n1)));
}
if n1 == 0 {
warn!("Empty index data");
return Ok(None);
}
let n1 = n1 / N;
let a = unsafe {
let ptr = &buf[0] as *const u8 as *const ([u8; 8], [u8; 8]);
std::slice::from_raw_parts(ptr, n1)
};
let mut j = 0;
let mut k = n1 - 1;
let x = u64::from_be_bytes(a[j].0);
let y = u64::from_be_bytes(a[k].0);
trace!("first/last ts: {} {}", x, y);
if x >= h {
return Ok(Some((u64::from_be_bytes(a[j].0), u64::from_be_bytes(a[j].1))));
}
if y < h {
return Ok(None);
}
loop {
if k - j < 2 {
let ret = (u64::from_be_bytes(a[k].0), u64::from_be_bytes(a[k].1));
trace!("FOUND {:?}", ret);
return Ok(Some(ret));
}
let m = (k + j) / 2;
let x = u64::from_be_bytes(a[m].0);
trace!("CHECK NEW M: {}", x);
if x < h {
j = m;
} else {
k = m;
}
}
}

View File

@@ -1,5 +1,6 @@
use crate::dataopen::open_files;
use crate::eventchunker::{EventChunker, EventFull};
use crate::{file_content_stream, open_files};
use crate::file_content_stream;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
@@ -48,7 +49,8 @@ impl Stream for EventBlobsComplete {
Ready(Some(k)) => match k {
Ok(file) => {
let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
let chunker = EventChunker::new(inp, self.channel_config.clone(), self.range.clone());
let chunker =
EventChunker::from_event_boundary(inp, self.channel_config.clone(), self.range.clone());
self.evs = Some(chunker);
continue 'outer;
}
@@ -75,7 +77,7 @@ pub fn event_blobs_complete(
match fileres {
Ok(file) => {
let inp = Box::pin(file_content_stream(file, query.buffer_size as usize));
let mut chunker = EventChunker::new(inp, err::todoval(), err::todoval());
let mut chunker = EventChunker::from_event_boundary(inp, err::todoval(), err::todoval());
while let Some(evres) = chunker.next().await {
match evres {
Ok(evres) => {

View File

@@ -29,7 +29,7 @@ enum DataFileState {
}
impl EventChunker {
pub fn new(
pub fn from_start(
inp: Pin<Box<dyn Stream<Item = Result<BytesMut, Error>> + Send>>,
channel_config: ChannelConfig,
range: NanoRange,
@@ -37,7 +37,7 @@ impl EventChunker {
let mut inp = NeedMinBuffer::new(inp);
inp.set_need_min(6);
Self {
inp: inp,
inp,
polled: 0,
state: DataFileState::FileHeader,
need_min: 6,
@@ -49,6 +49,26 @@ impl EventChunker {
}
}
/// Create a chunker whose parser starts directly in the `Event` state,
/// skipping the file-header state used by `from_start`. Intended for input
/// streams that have already been positioned at an event boundary (e.g.
/// after seeking via the index file).
pub fn from_event_boundary(
    inp: Pin<Box<dyn Stream<Item = Result<BytesMut, Error>> + Send>>,
    channel_config: ChannelConfig,
    range: NanoRange,
) -> Self {
    let mut inp = NeedMinBuffer::new(inp);
    // 4 bytes is the event length field, the minimum needed to start parsing.
    inp.set_need_min(4);
    Self {
        inp,
        polled: 0,
        state: DataFileState::Event,
        need_min: 4,
        channel_config,
        errored: false,
        completed: false,
        range,
        seen_beyond_range: false,
    }
}
fn parse_buf(&mut self, buf: &mut BytesMut) -> Result<ParseResult, Error> {
span!(Level::INFO, "EventChunker::parse_buf").in_scope(|| self.parse_buf_inner(buf))
}
@@ -194,7 +214,9 @@ impl EventChunker {
assert!(c1 as u32 == k1);
trace!("decompress result c1 {} k1 {}", c1, k1);
if ts < self.range.beg {
warn!("UNNECESSARY EVENT DECOMPRESS {}", ts / SEC);
error!("EVENT BEFORE RANGE {}", ts / SEC);
} else if ts >= self.range.end {
error!("EVENT BEFORE RANGE {}", ts / SEC);
} else {
ret.add_event(
ts,

View File

@@ -179,6 +179,37 @@ async fn gen_config(
Ok(())
}
/// Wrapper around a tokio `File` that tracks the number of bytes written,
/// so event offsets can be recorded in the companion index file.
struct CountedFile {
    // Underlying file handle.
    file: File,
    // Total bytes written through `write_all` so far.
    bytes: u64,
}
impl CountedFile {
pub fn new(file: File) -> Self {
Self { file, bytes: 0 }
}
pub async fn write_all(&mut self, buf: &[u8]) -> Result<u64, Error> {
let l = buf.len();
let mut i = 0;
loop {
match self.file.write(&buf[i..]).await {
Ok(n) => {
i += n;
self.bytes += n as u64;
if i >= l {
break;
}
}
Err(e) => Err(e)?,
}
}
Ok(i as u64)
}
pub fn written_len(&self) -> u64 {
self.bytes
}
}
struct GenTimebinRes {
evix: u64,
ts: u64,
@@ -201,12 +232,13 @@ async fn gen_timebin(
let data_path = path.join(format!("{:019}_{:05}_Data", config.time_bin_size / MS, 0));
let index_path = path.join(format!("{:019}_{:05}_Data_Index", config.time_bin_size / MS, 0));
info!("open data file {:?}", data_path);
let mut file = OpenOptions::new()
let file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(data_path)
.await?;
let mut file = CountedFile::new(file);
let mut index_file = if let Shape::Wave(_) = config.shape {
info!("open index file {:?}", index_path);
let f = OpenOptions::new()
@@ -215,7 +247,7 @@ async fn gen_timebin(
.truncate(true)
.open(index_path)
.await?;
Some(f)
Some(CountedFile::new(f))
} else {
None
};
@@ -234,7 +266,7 @@ async fn gen_timebin(
Ok(ret)
}
async fn gen_datafile_header(file: &mut File, config: &ChannelConfig) -> Result<(), Error> {
async fn gen_datafile_header(file: &mut CountedFile, config: &ChannelConfig) -> Result<(), Error> {
let mut buf = BytesMut::with_capacity(1024);
let cnenc = config.channel.name.as_bytes();
let len1 = cnenc.len() + 8;
@@ -247,8 +279,8 @@ async fn gen_datafile_header(file: &mut File, config: &ChannelConfig) -> Result<
}
async fn gen_event(
file: &mut File,
_index_file: Option<&mut File>,
file: &mut CountedFile,
index_file: Option<&mut CountedFile>,
evix: u64,
ts: u64,
config: &ChannelConfig,
@@ -305,6 +337,13 @@ async fn gen_event(
buf.put_u32(len);
buf.as_mut().put_u32(len);
}
let z = file.written_len();
file.write_all(buf.as_ref()).await?;
if let Some(f) = index_file {
let mut buf = BytesMut::with_capacity(16);
buf.put_u64(ts);
buf.put_u64(z);
f.write_all(&buf).await?;
}
Ok(())
}

View File

@@ -1,3 +1,4 @@
use crate::dataopen::open_files;
use crate::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE};
use bytes::{Bytes, BytesMut};
use err::Error;
@@ -21,6 +22,7 @@ pub mod aggtest;
pub mod binnedstream;
pub mod cache;
pub mod channelconfig;
pub mod dataopen;
pub mod eventblobs;
pub mod eventchunker;
pub mod frame;
@@ -297,55 +299,6 @@ pub fn raw_concat_channel_read_stream_file_pipe(
}
}
fn open_files(
range: &NanoRange,
channel_config: &ChannelConfig,
node: Arc<Node>,
) -> async_channel::Receiver<Result<tokio::fs::File, Error>> {
let channel_config = channel_config.clone();
let (chtx, chrx) = async_channel::bounded(2);
tokio::spawn(async move {
// TODO reduce usage of `query` and see what we actually need.
// TODO scan the timebins on the filesystem for the potential files first instead of trying to open hundreds in worst case.
let mut timebins = vec![];
{
let rd = tokio::fs::read_dir(paths::channel_timebins_dir_path(&channel_config, &node)?).await?;
let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd);
while let Some(e) = rd.next().await {
let e = e?;
let dn = e
.file_name()
.into_string()
.map_err(|e| Error::with_msg(format!("Bad OS path {:?}", e)))?;
let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a });
if vv == 19 {
timebins.push(dn.parse::<u64>()?);
}
}
}
timebins.sort_unstable();
info!("TIMEBINS FOUND: {:?}", timebins);
for &tb in &timebins {
let path = paths::datapath(tb, &channel_config, &node);
let fileres = OpenOptions::new().read(true).open(&path).await;
info!("opened file {:?} {:?}", &path, &fileres);
match fileres {
Ok(k) => match chtx.send(Ok(k)).await {
Ok(_) => (),
Err(_) => break,
},
Err(e) => match chtx.send(Err(e.into())).await {
Ok(_) => (),
Err(_) => break,
},
}
}
Ok::<_, Error>(())
});
chrx
}
pub fn file_content_stream(
mut file: tokio::fs::File,
buffer_size: usize,
@@ -379,7 +332,7 @@ pub fn parsed1(
Ok(file) => {
let inp = Box::pin(file_content_stream(file, query.buffer_size as usize));
let range = err::todoval();
let mut chunker = eventchunker::EventChunker::new(inp, err::todoval(), range);
let mut chunker = eventchunker::EventChunker::from_event_boundary(inp, err::todoval(), range);
while let Some(evres) = chunker.next().await {
match evres {
Ok(evres) => {

View File

@@ -1,4 +1,5 @@
use crate::agg::{Dim1F32Stream, MinMaxAvgScalarEventBatch, ValuesDim1};
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::{Dim1F32Stream, ValuesDim1};
use crate::eventchunker::EventFull;
use err::Error;
use futures_core::Stream;

View File

@@ -5,7 +5,7 @@ Delivers event data (not yet time-binned) from local storage and provides client
to request such data from nodes.
*/
use crate::agg::MinMaxAvgScalarEventBatch;
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use crate::frame::makeframe::{make_frame, make_term_frame};
use crate::raw::bffr::MinMaxAvgScalarEventBatchStreamFromFrames;

View File

@@ -1,4 +1,4 @@
use crate::agg::MinMaxAvgScalarEventBatch;
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::frame::inmem::InMemoryFrameAsyncReadStream;
use crate::frame::makeframe::FrameType;
use crate::raw::conn::RawConnOut;

View File

@@ -1,4 +1,6 @@
use crate::agg::{IntoBinnedXBins1, IntoDim1F32Stream, MinMaxAvgScalarEventBatch};
use crate::agg::binnedx::IntoBinnedXBins1;
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::IntoDim1F32Stream;
use crate::channelconfig::{extract_matching_config_entry, read_local_config};
use crate::eventblobs::EventBlobsComplete;
use crate::frame::inmem::InMemoryFrameAsyncReadStream;
@@ -128,11 +130,7 @@ async fn raw_conn_handler_inner_try(
Ok(_) => (),
Err(e) => return Err((e, netout))?,
}
debug!("REQUEST FOR RANGE {:?}", evq.range);
error!(
"TODO decide on response content based on the parsed json query\n{:?}",
evq
);
debug!("REQUEST {:?}", evq);
let range = &evq.range;
let channel_config = match read_local_config(&evq.channel, node_config.clone()).await {
Ok(k) => k,
@@ -182,7 +180,6 @@ async fn raw_conn_handler_inner_try(
buffer_size,
)
.into_dim_1_f32_stream()
.take(10)
.into_binned_x_bins_1();
let mut e = 0;
while let Some(item) = s1.next().await {

View File

@@ -90,6 +90,8 @@ impl std::fmt::Display for Error {
}
}
impl std::error::Error for Error {}
impl From<String> for Error {
fn from(k: String) -> Self {
Self::with_msg(k)
@@ -102,8 +104,6 @@ impl From<&str> for Error {
}
}
impl std::error::Error for Error {}
impl From<std::io::Error> for Error {
fn from(k: std::io::Error) -> Self {
Self::with_msg(k.to_string())
@@ -192,6 +192,12 @@ impl From<tokio_postgres::Error> for Error {
}
}
impl<T> From<async_channel::SendError<T>> for Error {
fn from(k: async_channel::SendError<T>) -> Self {
Self::with_msg(k.to_string())
}
}
pub fn todoval<T>() -> T {
todo!("TODO todoval")
}

View File

@@ -151,7 +151,7 @@ pub enum TimeRange {
Nano { beg: u64, end: u64 },
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct Nanos {
pub ns: u64,
}

View File

@@ -48,8 +48,8 @@ async fn get_cached_0_inner() -> Result<(), Error> {
let cluster = Arc::new(test_cluster());
let node0 = &cluster.nodes[0];
let hosts = spawn_test_hosts(cluster.clone());
let beg_date: chrono::DateTime<Utc> = "1970-01-01T00:00:10.000Z".parse()?;
let end_date: chrono::DateTime<Utc> = "1970-01-01T00:00:51.000Z".parse()?;
let beg_date: chrono::DateTime<Utc> = "1970-01-01T00:20:10.000Z".parse()?;
let end_date: chrono::DateTime<Utc> = "1970-01-01T00:20:51.000Z".parse()?;
let channel_backend = "back";
let channel_name = "wave1";
let bin_count = 4;

View File

@@ -56,7 +56,7 @@ pub fn tracing_init() {
.with_thread_names(true)
//.with_max_level(tracing::Level::INFO)
.with_env_filter(tracing_subscriber::EnvFilter::new(
"info,retrieval=trace,retrieval::test=trace,disk::raw::conn=trace,tokio_postgres=info",
"info,retrieval=trace,retrieval::test=trace,disk::raw::conn=info",
))
.init();
}