After rustfmt

This commit is contained in:
Dominik Werder
2021-04-16 14:43:21 +02:00
parent 1150bb3c55
commit 3afcddb1c7
16 changed files with 951 additions and 832 deletions
+32 -10
View File
@@ -1,29 +1,51 @@
use libc::{size_t}; use libc::size_t;
extern { extern "C" {
pub fn bshuf_compress_lz4(inp: *const u8, out: *const u8, size: size_t, elem_size: size_t, block_size: size_t) -> i64; pub fn bshuf_compress_lz4(
pub fn bshuf_decompress_lz4(inp: *const u8, out: *const u8, size: size_t, elem_size: size_t, block_size: size_t) -> i64; inp: *const u8,
out: *const u8,
size: size_t,
elem_size: size_t,
block_size: size_t,
) -> i64;
pub fn bshuf_decompress_lz4(
inp: *const u8,
out: *const u8,
size: size_t,
elem_size: size_t,
block_size: size_t,
) -> i64;
} }
pub fn bitshuffle_compress(inp: &[u8], out: &mut [u8], size: usize, elem_size: usize, block_size: usize) -> Result<usize, isize> { pub fn bitshuffle_compress(
inp: &[u8],
out: &mut [u8],
size: usize,
elem_size: usize,
block_size: usize,
) -> Result<usize, isize> {
unsafe { unsafe {
let n = bshuf_compress_lz4(inp.as_ptr(), out.as_mut_ptr(), size, elem_size, block_size); let n = bshuf_compress_lz4(inp.as_ptr(), out.as_mut_ptr(), size, elem_size, block_size);
if n >= 0 { if n >= 0 {
Ok(n as usize) Ok(n as usize)
} } else {
else {
Err(n as isize) Err(n as isize)
} }
} }
} }
pub fn bitshuffle_decompress(inp: &[u8], out: &mut [u8], size: usize, elem_size: usize, block_size: usize) -> Result<usize, isize> { pub fn bitshuffle_decompress(
inp: &[u8],
out: &mut [u8],
size: usize,
elem_size: usize,
block_size: usize,
) -> Result<usize, isize> {
unsafe { unsafe {
let n = bshuf_decompress_lz4(inp.as_ptr(), out.as_mut_ptr(), size, elem_size, block_size); let n = bshuf_decompress_lz4(inp.as_ptr(), out.as_mut_ptr(), size, elem_size, block_size);
if n >= 0 { if n >= 0 {
Ok(n as usize) Ok(n as usize)
} } else {
else {
Err(n as isize) Err(n as isize)
} }
} }
+226 -184
View File
@@ -1,15 +1,15 @@
#[allow(unused_imports)]
use tracing::{error, warn, info, debug, trace};
use err::Error;
use std::task::{Context, Poll};
use std::pin::Pin;
use crate::EventFull;
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt, future::ready};
use netpod::{Channel, ChannelConfig, ScalarType, Shape, Node, timeunits::*};
use crate::merge::MergeDim1F32Stream; use crate::merge::MergeDim1F32Stream;
use crate::EventFull;
use err::Error;
use futures_core::Stream;
use futures_util::{future::ready, pin_mut, StreamExt};
use netpod::BinSpecDimT; use netpod::BinSpecDimT;
use netpod::{timeunits::*, Channel, ChannelConfig, Node, ScalarType, Shape};
use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll};
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
pub trait AggregatorTdim { pub trait AggregatorTdim {
type InputValue; type InputValue;
@@ -32,11 +32,12 @@ pub trait AggregatableTdim {
fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator; fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator;
} }
// dummy // dummy
impl AggregatableXdim1Bin for () { impl AggregatableXdim1Bin for () {
type Output = (); type Output = ();
fn into_agg(self) -> Self::Output { todo!() } fn into_agg(self) -> Self::Output {
todo!()
}
} }
impl AggregatableTdim for () { impl AggregatableTdim for () {
type Output = (); type Output = ();
@@ -61,11 +62,14 @@ impl AggregatorTdim for () {
todo!() todo!()
} }
fn ingest(&mut self, v: &Self::InputValue) { todo!() } fn ingest(&mut self, v: &Self::InputValue) {
fn result(self) -> Self::OutputValue { todo!() } todo!()
}
fn result(self) -> Self::OutputValue {
todo!()
}
} }
pub struct ValuesDim0 { pub struct ValuesDim0 {
tss: Vec<u64>, tss: Vec<u64>,
values: Vec<Vec<f32>>, values: Vec<Vec<f32>>,
@@ -73,7 +77,13 @@ pub struct ValuesDim0 {
impl std::fmt::Debug for ValuesDim0 { impl std::fmt::Debug for ValuesDim0 {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(fmt, "count {} tsA {:?} tsB {:?}", self.tss.len(), self.tss.first(), self.tss.last()) write!(
fmt,
"count {} tsA {:?} tsB {:?}",
self.tss.len(),
self.tss.first(),
self.tss.last()
)
} }
} }
@@ -101,8 +111,12 @@ impl AggregatableXdim1Bin for ValuesDim1 {
max = max.max(v); max = max.max(v);
sum += v; sum += v;
} }
if min == f32::MAX { min = f32::NAN; } if min == f32::MAX {
if max == f32::MIN { max = f32::NAN; } min = f32::NAN;
}
if max == f32::MIN {
max = f32::NAN;
}
ret.tss.push(ts); ret.tss.push(ts);
ret.mins.push(min); ret.mins.push(min);
ret.maxs.push(max); ret.maxs.push(max);
@@ -110,29 +124,31 @@ impl AggregatableXdim1Bin for ValuesDim1 {
} }
ret ret
} }
} }
pub struct ValuesDim1 { pub struct ValuesDim1 {
pub tss: Vec<u64>, pub tss: Vec<u64>,
pub values: Vec<Vec<f32>>, pub values: Vec<Vec<f32>>,
} }
impl ValuesDim1 { impl ValuesDim1 {
pub fn empty() -> Self { pub fn empty() -> Self {
Self { Self {
tss: vec![], tss: vec![],
values: vec![], values: vec![],
} }
} }
} }
impl std::fmt::Debug for ValuesDim1 { impl std::fmt::Debug for ValuesDim1 {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(fmt, "count {} tsA {:?} tsB {:?}", self.tss.len(), self.tss.first(), self.tss.last()) write!(
fmt,
"count {} tsA {:?} tsB {:?}",
self.tss.len(),
self.tss.first(),
self.tss.last()
)
} }
} }
@@ -160,8 +176,12 @@ impl AggregatableXdim1Bin for ValuesDim0 {
max = max.max(v); max = max.max(v);
sum += v; sum += v;
} }
if min == f32::MAX { min = f32::NAN; } if min == f32::MAX {
if max == f32::MIN { max = f32::NAN; } min = f32::NAN;
}
if max == f32::MIN {
max = f32::NAN;
}
ret.tss.push(ts); ret.tss.push(ts);
ret.mins.push(min); ret.mins.push(min);
ret.maxs.push(max); ret.maxs.push(max);
@@ -169,10 +189,8 @@ impl AggregatableXdim1Bin for ValuesDim0 {
} }
ret ret
} }
} }
pub struct MinMaxAvgScalarEventBatch { pub struct MinMaxAvgScalarEventBatch {
tss: Vec<u64>, tss: Vec<u64>,
mins: Vec<f32>, mins: Vec<f32>,
@@ -200,7 +218,6 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatch {
fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator { fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator {
MinMaxAvgScalarEventBatchAggregator::new(ts1, ts2) MinMaxAvgScalarEventBatchAggregator::new(ts1, ts2)
} }
} }
pub struct MinMaxAvgScalarEventBatchAggregator { pub struct MinMaxAvgScalarEventBatchAggregator {
@@ -213,7 +230,6 @@ pub struct MinMaxAvgScalarEventBatchAggregator {
} }
impl MinMaxAvgScalarEventBatchAggregator { impl MinMaxAvgScalarEventBatchAggregator {
pub fn new(ts1: u64, ts2: u64) -> Self { pub fn new(ts1: u64, ts2: u64) -> Self {
Self { Self {
ts1, ts1,
@@ -224,7 +240,6 @@ impl MinMaxAvgScalarEventBatchAggregator {
count: 0, count: 0,
} }
} }
} }
impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
@@ -233,28 +248,22 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
fn ends_before(&self, inp: &Self::InputValue) -> bool { fn ends_before(&self, inp: &Self::InputValue) -> bool {
match inp.tss.last() { match inp.tss.last() {
Some(ts) => { Some(ts) => *ts < self.ts1,
*ts < self.ts1
}
None => true, None => true,
} }
} }
fn ends_after(&self, inp: &Self::InputValue) -> bool { fn ends_after(&self, inp: &Self::InputValue) -> bool {
match inp.tss.last() { match inp.tss.last() {
Some(ts) => { Some(ts) => *ts >= self.ts2,
*ts >= self.ts2 _ => panic!(),
}
_ => panic!()
} }
} }
fn starts_after(&self, inp: &Self::InputValue) -> bool { fn starts_after(&self, inp: &Self::InputValue) -> bool {
match inp.tss.first() { match inp.tss.first() {
Some(ts) => { Some(ts) => *ts >= self.ts2,
*ts >= self.ts2 _ => panic!(),
}
_ => panic!()
} }
} }
@@ -264,12 +273,10 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
if ts < self.ts1 { if ts < self.ts1 {
//info!("EventBatchAgg {} {} {} {} IS BEFORE", v.tss[i1], v.mins[i1], v.maxs[i1], v.avgs[i1]); //info!("EventBatchAgg {} {} {} {} IS BEFORE", v.tss[i1], v.mins[i1], v.maxs[i1], v.avgs[i1]);
continue; continue;
} } else if ts >= self.ts2 {
else if ts >= self.ts2 {
//info!("EventBatchAgg {} {} {} {} IS AFTER", v.tss[i1], v.mins[i1], v.maxs[i1], v.avgs[i1]); //info!("EventBatchAgg {} {} {} {} IS AFTER", v.tss[i1], v.mins[i1], v.maxs[i1], v.avgs[i1]);
continue; continue;
} } else {
else {
//info!("EventBatchAgg {} {} {} {}", v.tss[i1], v.mins[i1], v.maxs[i1], v.avgs[i1]); //info!("EventBatchAgg {} {} {} {}", v.tss[i1], v.mins[i1], v.maxs[i1], v.avgs[i1]);
self.min = self.min.min(v.mins[i1]); self.min = self.min.min(v.mins[i1]);
self.max = self.max.max(v.maxs[i1]); self.max = self.max.max(v.maxs[i1]);
@@ -280,9 +287,21 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
} }
fn result(self) -> Self::OutputValue { fn result(self) -> Self::OutputValue {
let min = if self.min == f32::MAX { f32::NAN } else { self.min }; let min = if self.min == f32::MAX {
let max = if self.max == f32::MIN { f32::NAN } else { self.max }; f32::NAN
let avg = if self.count == 0 { f32::NAN } else { self.sum / self.count as f32 }; } else {
self.min
};
let max = if self.max == f32::MIN {
f32::NAN
} else {
self.max
};
let avg = if self.count == 0 {
f32::NAN
} else {
self.sum / self.count as f32
};
MinMaxAvgScalarBinSingle { MinMaxAvgScalarBinSingle {
ts1: self.ts1, ts1: self.ts1,
ts2: self.ts2, ts2: self.ts2,
@@ -292,10 +311,8 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
avg, avg,
} }
} }
} }
pub struct MinMaxAvgScalarBinBatch { pub struct MinMaxAvgScalarBinBatch {
ts1s: Vec<u64>, ts1s: Vec<u64>,
ts2s: Vec<u64>, ts2s: Vec<u64>,
@@ -348,11 +365,11 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
todo!() todo!()
} }
fn result(self) -> Self::OutputValue { todo!() } fn result(self) -> Self::OutputValue {
todo!()
}
} }
pub struct MinMaxAvgScalarBinSingle { pub struct MinMaxAvgScalarBinSingle {
ts1: u64, ts1: u64,
ts2: u64, ts2: u64,
@@ -408,17 +425,19 @@ impl AggregatorTdim for MinMaxAvgScalarBinSingleAggregator {
fn result(self) -> Self::OutputValue { fn result(self) -> Self::OutputValue {
todo!() todo!()
} }
} }
pub struct Dim0F32Stream<S>
where
S: Stream<Item = Result<EventFull, Error>>,
pub struct Dim0F32Stream<S> where S: Stream<Item=Result<EventFull, Error>> { {
inp: S, inp: S,
} }
impl<S> Stream for Dim0F32Stream<S> where S: Stream<Item=Result<EventFull, Error>> + Unpin { impl<S> Stream for Dim0F32Stream<S>
where
S: Stream<Item = Result<EventFull, Error>> + Unpin,
{
type Item = Result<ValuesDim0, Error>; type Item = Result<ValuesDim0, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
@@ -447,7 +466,9 @@ impl<S> Stream for Dim0F32Stream<S> where S: Stream<Item=Result<EventFull, Error
let ele_count = n1 / ty.bytes() as usize; let ele_count = n1 / ty.bytes() as usize;
let mut j = Vec::with_capacity(ele_count); let mut j = Vec::with_capacity(ele_count);
// this is safe for ints and floats // this is safe for ints and floats
unsafe { j.set_len(ele_count); } unsafe {
j.set_len(ele_count);
}
let mut p1 = 0; let mut p1 = 0;
for i1 in 0..ele_count { for i1 in 0..ele_count {
let u = unsafe { let u = unsafe {
@@ -461,8 +482,8 @@ impl<S> Stream for Dim0F32Stream<S> where S: Stream<Item=Result<EventFull, Error
} }
ret.tss.push(k.tss[i1]); ret.tss.push(k.tss[i1]);
ret.values.push(j); ret.values.push(j);
}, }
_ => todo!() _ => todo!(),
} }
} }
Ready(Some(Ok(todo!()))) Ready(Some(Ok(todo!())))
@@ -472,31 +493,34 @@ impl<S> Stream for Dim0F32Stream<S> where S: Stream<Item=Result<EventFull, Error
Pending => Pending, Pending => Pending,
} }
} }
} }
pub trait IntoDim0F32Stream { pub trait IntoDim0F32Stream {
fn into_dim_0_f32_stream(self) -> Dim0F32Stream<Self> where Self: Stream<Item=Result<EventFull, Error>> + Sized; fn into_dim_0_f32_stream(self) -> Dim0F32Stream<Self>
where
Self: Stream<Item = Result<EventFull, Error>> + Sized;
} }
impl<T> IntoDim0F32Stream for T where T: Stream<Item=Result<EventFull, Error>> { impl<T> IntoDim0F32Stream for T
where
T: Stream<Item = Result<EventFull, Error>>,
{
fn into_dim_0_f32_stream(self) -> Dim0F32Stream<T> { fn into_dim_0_f32_stream(self) -> Dim0F32Stream<T> {
Dim0F32Stream { Dim0F32Stream { inp: self }
inp: self,
}
} }
} }
pub struct Dim1F32Stream<S>
where
S: Stream<Item = Result<EventFull, Error>>,
pub struct Dim1F32Stream<S> where S: Stream<Item=Result<EventFull, Error>> { {
inp: S, inp: S,
} }
impl<S> Stream for Dim1F32Stream<S> where S: Stream<Item=Result<EventFull, Error>> + Unpin { impl<S> Stream for Dim1F32Stream<S>
where
S: Stream<Item = Result<EventFull, Error>> + Unpin,
{
type Item = Result<ValuesDim1, Error>; type Item = Result<ValuesDim1, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
@@ -521,7 +545,9 @@ impl<S> Stream for Dim1F32Stream<S> where S: Stream<Item=Result<EventFull, Error
let ele_count = n1 / ty.bytes() as usize; let ele_count = n1 / ty.bytes() as usize;
let mut j = Vec::with_capacity(ele_count); let mut j = Vec::with_capacity(ele_count);
// this is safe for ints and floats // this is safe for ints and floats
unsafe { j.set_len(ele_count); } unsafe {
j.set_len(ele_count);
}
let mut p1 = 0; let mut p1 = 0;
for i1 in 0..ele_count { for i1 in 0..ele_count {
let u = unsafe { let u = unsafe {
@@ -535,8 +561,8 @@ impl<S> Stream for Dim1F32Stream<S> where S: Stream<Item=Result<EventFull, Error
} }
ret.tss.push(k.tss[i1]); ret.tss.push(k.tss[i1]);
ret.values.push(j); ret.values.push(j);
}, }
_ => todo!() _ => todo!(),
} }
} }
Ready(Some(Ok(ret))) Ready(Some(Ok(ret)))
@@ -546,45 +572,54 @@ impl<S> Stream for Dim1F32Stream<S> where S: Stream<Item=Result<EventFull, Error
Pending => Pending, Pending => Pending,
} }
} }
} }
pub trait IntoDim1F32Stream { pub trait IntoDim1F32Stream {
fn into_dim_1_f32_stream(self) -> Dim1F32Stream<Self> where Self: Stream<Item=Result<EventFull, Error>> + Sized; fn into_dim_1_f32_stream(self) -> Dim1F32Stream<Self>
where
Self: Stream<Item = Result<EventFull, Error>> + Sized;
} }
impl<T> IntoDim1F32Stream for T where T: Stream<Item=Result<EventFull, Error>> { impl<T> IntoDim1F32Stream for T
where
T: Stream<Item = Result<EventFull, Error>>,
{
fn into_dim_1_f32_stream(self) -> Dim1F32Stream<T> { fn into_dim_1_f32_stream(self) -> Dim1F32Stream<T> {
Dim1F32Stream { Dim1F32Stream { inp: self }
inp: self,
}
} }
} }
pub trait IntoBinnedXBins1<I: AggregatableXdim1Bin> { pub trait IntoBinnedXBins1<I: AggregatableXdim1Bin> {
type StreamOut; type StreamOut;
fn into_binned_x_bins_1(self) -> Self::StreamOut where Self: Stream<Item=Result<I, Error>>; fn into_binned_x_bins_1(self) -> Self::StreamOut
where
Self: Stream<Item = Result<I, Error>>;
} }
impl<T, I: AggregatableXdim1Bin> IntoBinnedXBins1<I> for T where T: Stream<Item=Result<I, Error>> + Unpin { impl<T, I: AggregatableXdim1Bin> IntoBinnedXBins1<I> for T
where
T: Stream<Item = Result<I, Error>> + Unpin,
{
type StreamOut = IntoBinnedXBins1DefaultStream<T, I>; type StreamOut = IntoBinnedXBins1DefaultStream<T, I>;
fn into_binned_x_bins_1(self) -> Self::StreamOut { fn into_binned_x_bins_1(self) -> Self::StreamOut {
IntoBinnedXBins1DefaultStream { IntoBinnedXBins1DefaultStream { inp: self }
inp: self,
}
} }
} }
pub struct IntoBinnedXBins1DefaultStream<S, I> where S: Stream<Item=Result<I, Error>> + Unpin, I: AggregatableXdim1Bin { pub struct IntoBinnedXBins1DefaultStream<S, I>
where
S: Stream<Item = Result<I, Error>> + Unpin,
I: AggregatableXdim1Bin,
{
inp: S, inp: S,
} }
impl<S, I> Stream for IntoBinnedXBins1DefaultStream<S, I> where S: Stream<Item=Result<I, Error>> + Unpin, I: AggregatableXdim1Bin { impl<S, I> Stream for IntoBinnedXBins1DefaultStream<S, I>
where
S: Stream<Item = Result<I, Error>> + Unpin,
I: AggregatableXdim1Bin,
{
type Item = Result<I::Output, Error>; type Item = Result<I::Output, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
@@ -596,26 +631,31 @@ impl<S, I> Stream for IntoBinnedXBins1DefaultStream<S, I> where S: Stream<Item=R
Pending => Pending, Pending => Pending,
} }
} }
} }
pub trait IntoBinnedT { pub trait IntoBinnedT {
type StreamOut: Stream; type StreamOut: Stream;
fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut; fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut;
} }
impl<T, I> IntoBinnedT for T where I: AggregatableTdim + Unpin, T: Stream<Item=Result<I, Error>> + Unpin, I::Aggregator: Unpin { impl<T, I> IntoBinnedT for T
where
I: AggregatableTdim + Unpin,
T: Stream<Item = Result<I, Error>> + Unpin,
I::Aggregator: Unpin,
{
type StreamOut = IntoBinnedTDefaultStream<T, I>; type StreamOut = IntoBinnedTDefaultStream<T, I>;
fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut { fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut {
IntoBinnedTDefaultStream::new(self, spec) IntoBinnedTDefaultStream::new(self, spec)
} }
} }
pub struct IntoBinnedTDefaultStream<S, I> where I: AggregatableTdim, S: Stream<Item=Result<I, Error>> { pub struct IntoBinnedTDefaultStream<S, I>
where
I: AggregatableTdim,
S: Stream<Item = Result<I, Error>>,
{
inp: S, inp: S,
aggtor: Option<I::Aggregator>, aggtor: Option<I::Aggregator>,
spec: BinSpecDimT, spec: BinSpecDimT,
@@ -623,8 +663,11 @@ pub struct IntoBinnedTDefaultStream<S, I> where I: AggregatableTdim, S: Stream<I
left: Option<Poll<Option<Result<I, Error>>>>, left: Option<Poll<Option<Result<I, Error>>>>,
} }
impl<S, I> IntoBinnedTDefaultStream<S, I> where I: AggregatableTdim, S: Stream<Item=Result<I, Error>> { impl<S, I> IntoBinnedTDefaultStream<S, I>
where
I: AggregatableTdim,
S: Stream<Item = Result<I, Error>>,
{
pub fn new(inp: S, spec: BinSpecDimT) -> Self { pub fn new(inp: S, spec: BinSpecDimT) -> Self {
//info!("spec ts {} {}", spec.ts1, spec.ts2); //info!("spec ts {} {}", spec.ts1, spec.ts2);
Self { Self {
@@ -635,11 +678,13 @@ impl<S, I> IntoBinnedTDefaultStream<S, I> where I: AggregatableTdim, S: Stream<I
left: None, left: None,
} }
} }
} }
impl<T, I> Stream for IntoBinnedTDefaultStream<T, I> impl<T, I> Stream for IntoBinnedTDefaultStream<T, I>
where I: AggregatableTdim + Unpin, T: Stream<Item=Result<I, Error>> + Unpin, I::Aggregator: Unpin where
I: AggregatableTdim + Unpin,
T: Stream<Item = Result<I, Error>> + Unpin,
I::Aggregator: Unpin,
{ {
type Item = Result<<I::Aggregator as AggregatorTdim>::OutputValue, Error>; type Item = Result<<I::Aggregator as AggregatorTdim>::OutputValue, Error>;
@@ -648,11 +693,9 @@ where I: AggregatableTdim + Unpin, T: Stream<Item=Result<I, Error>> + Unpin, I::
'outer: loop { 'outer: loop {
let cur = if self.curbin as u64 >= self.spec.count { let cur = if self.curbin as u64 >= self.spec.count {
Ready(None) Ready(None)
} } else if let Some(k) = self.left.take() {
else if let Some(k) = self.left.take() {
k k
} } else {
else {
self.inp.poll_next_unpin(cx) self.inp.poll_next_unpin(cx)
}; };
break match cur { break match cur {
@@ -666,14 +709,12 @@ where I: AggregatableTdim + Unpin, T: Stream<Item=Result<I, Error>> + Unpin, I::
if ag.ends_before(&k) { if ag.ends_before(&k) {
//info!("ENDS BEFORE"); //info!("ENDS BEFORE");
continue 'outer; continue 'outer;
} } else if ag.starts_after(&k) {
else if ag.starts_after(&k) {
//info!("STARTS AFTER"); //info!("STARTS AFTER");
self.left = Some(Ready(Some(Ok(k)))); self.left = Some(Ready(Some(Ok(k))));
self.curbin += 1; self.curbin += 1;
Ready(Some(Ok(self.aggtor.take().unwrap().result()))) Ready(Some(Ok(self.aggtor.take().unwrap().result())))
} } else {
else {
//info!("INGEST"); //info!("INGEST");
ag.ingest(&k); ag.ingest(&k);
// if this input contains also data after the current bin, then I need to keep // if this input contains also data after the current bin, then I need to keep
@@ -683,30 +724,24 @@ where I: AggregatableTdim + Unpin, T: Stream<Item=Result<I, Error>> + Unpin, I::
self.left = Some(Ready(Some(Ok(k)))); self.left = Some(Ready(Some(Ok(k))));
self.curbin += 1; self.curbin += 1;
Ready(Some(Ok(self.aggtor.take().unwrap().result()))) Ready(Some(Ok(self.aggtor.take().unwrap().result())))
} } else {
else {
//info!("ENDS WITHIN"); //info!("ENDS WITHIN");
continue 'outer; continue 'outer;
} }
} }
} }
Ready(Some(Err(e))) => Ready(Some(Err(e))), Ready(Some(Err(e))) => Ready(Some(Err(e))),
Ready(None) => { Ready(None) => match self.aggtor.take() {
match self.aggtor.take() { Some(ag) => Ready(Some(Ok(ag.result()))),
Some(ag) => { None => {
Ready(Some(Ok(ag.result()))) warn!("TODO add trailing bins");
} Ready(None)
None => {
warn!("TODO add trailing bins");
Ready(None)
}
} }
}, },
Pending => Pending, Pending => Pending,
}; };
} }
} }
} }
pub fn make_test_node(id: u32) -> Node { pub fn make_test_node(id: u32) -> Node {
Node { Node {
@@ -719,10 +754,13 @@ pub fn make_test_node(id: u32) -> Node {
} }
} }
#[test] #[test]
fn agg_x_dim_0() { fn agg_x_dim_0() {
taskrun::run(async { agg_x_dim_0_inner().await; Ok(()) }).unwrap(); taskrun::run(async {
agg_x_dim_0_inner().await;
Ok(())
})
.unwrap();
} }
async fn agg_x_dim_0_inner() { async fn agg_x_dim_0_inner() {
@@ -750,32 +788,35 @@ async fn agg_x_dim_0_inner() {
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size; let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
let ts2 = ts1 + HOUR * 24; let ts2 = ts1 + HOUR * 24;
let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node) let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node)
.into_dim_1_f32_stream() .into_dim_1_f32_stream()
//.take(1000) //.take(1000)
.map(|q| { .map(|q| {
if let Ok(ref k) = q { if let Ok(ref k) = q {
//info!("vals: {:?}", k); //info!("vals: {:?}", k);
} }
q q
}) })
.into_binned_x_bins_1() .into_binned_x_bins_1()
.map(|k| { .map(|k| {
//info!("after X binning {:?}", k.as_ref().unwrap()); //info!("after X binning {:?}", k.as_ref().unwrap());
k k
}) })
.into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2)) .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
.map(|k| { .map(|k| {
info!("after T binning {:?}", k.as_ref().unwrap()); info!("after T binning {:?}", k.as_ref().unwrap());
k k
}) })
.for_each(|k| ready(())); .for_each(|k| ready(()));
fut1.await; fut1.await;
} }
#[test] #[test]
fn agg_x_dim_1() { fn agg_x_dim_1() {
taskrun::run(async { agg_x_dim_1_inner().await; Ok(()) }).unwrap(); taskrun::run(async {
agg_x_dim_1_inner().await;
Ok(())
})
.unwrap();
} }
async fn agg_x_dim_1_inner() { async fn agg_x_dim_1_inner() {
@@ -806,31 +847,35 @@ async fn agg_x_dim_1_inner() {
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size; let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
let ts2 = ts1 + HOUR * 24; let ts2 = ts1 + HOUR * 24;
let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node) let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node)
.into_dim_1_f32_stream() .into_dim_1_f32_stream()
//.take(1000) //.take(1000)
.map(|q| { .map(|q| {
if let Ok(ref k) = q { if let Ok(ref k) = q {
//info!("vals: {:?}", k); //info!("vals: {:?}", k);
} }
q q
}) })
.into_binned_x_bins_1() .into_binned_x_bins_1()
.map(|k| { .map(|k| {
//info!("after X binning {:?}", k.as_ref().unwrap()); //info!("after X binning {:?}", k.as_ref().unwrap());
k k
}) })
.into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2)) .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
.map(|k| { .map(|k| {
info!("after T binning {:?}", k.as_ref().unwrap()); info!("after T binning {:?}", k.as_ref().unwrap());
k k
}) })
.for_each(|k| ready(())); .for_each(|k| ready(()));
fut1.await; fut1.await;
} }
#[test] #[test]
fn merge_0() { fn merge_0() {
taskrun::run(async { merge_0_inner().await; Ok(()) }).unwrap(); taskrun::run(async {
merge_0_inner().await;
Ok(())
})
.unwrap();
} }
async fn merge_0_inner() { async fn merge_0_inner() {
@@ -852,26 +897,23 @@ async fn merge_0_inner() {
tb_file_count: 1, tb_file_count: 1,
buffer_size: 1024 * 8, buffer_size: 1024 * 8,
}; };
let streams = (0..13).into_iter() let streams = (0..13)
.map(|k| { .into_iter()
make_test_node(k) .map(|k| make_test_node(k))
}) .map(|node| {
.map(|node| { let node = Arc::new(node);
let node = Arc::new(node); crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node)
crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node) .into_dim_1_f32_stream()
.into_dim_1_f32_stream() })
}) .collect();
.collect();
MergeDim1F32Stream::new(streams) MergeDim1F32Stream::new(streams)
.map(|k| { .map(|k| {
//info!("NEXT MERGED ITEM ts {:?}", k.as_ref().unwrap().tss); //info!("NEXT MERGED ITEM ts {:?}", k.as_ref().unwrap().tss);
}) })
.fold(0, |k, q| ready(0)) .fold(0, |k, q| ready(0))
.await; .await;
} }
pub fn tmp_some_older_things() { pub fn tmp_some_older_things() {
let vals = ValuesDim1 { let vals = ValuesDim1 {
tss: vec![0, 1, 2, 3], tss: vec![0, 1, 2, 3],
+166 -150
View File
@@ -1,20 +1,23 @@
#[allow(unused_imports)] use crate::agg::MinMaxAvgScalarBinBatch;
use tracing::{error, warn, info, debug, trace}; use bytes::{BufMut, Bytes, BytesMut};
use chrono::{DateTime, Utc};
use err::Error; use err::Error;
use futures_core::Stream;
use futures_util::{pin_mut, FutureExt, StreamExt, TryFutureExt};
use http::uri::Scheme;
use netpod::{
AggKind, Channel, Cluster, NanoRange, Node, NodeConfig, PreBinnedPatchCoord,
PreBinnedPatchGridSpec, PreBinnedPatchIterator, PreBinnedPatchRange, ToNanos,
};
use serde::{Deserialize, Serialize};
use std::future::{ready, Future}; use std::future::{ready, Future};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::Stream;
use futures_util::{StreamExt, FutureExt, pin_mut, TryFutureExt};
use bytes::{Bytes, BytesMut, BufMut};
use chrono::{DateTime, Utc};
use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos, PreBinnedPatchRange, PreBinnedPatchIterator, PreBinnedPatchCoord, Channel, NodeConfig, PreBinnedPatchGridSpec};
use crate::agg::MinMaxAvgScalarBinBatch;
use http::uri::Scheme;
use tiny_keccak::Hasher;
use serde::{Serialize, Deserialize};
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll};
use tiny_keccak::Hasher;
use tokio::fs::OpenOptions; use tokio::fs::OpenOptions;
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Query { pub struct Query {
@@ -25,17 +28,24 @@ pub struct Query {
} }
impl Query { impl Query {
pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> { pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
let params = netpod::query_params(req.uri.query()); let params = netpod::query_params(req.uri.query());
let beg_date = params.get("beg_date").ok_or(Error::with_msg("missing beg_date"))?; let beg_date = params
let end_date = params.get("end_date").ok_or(Error::with_msg("missing end_date"))?; .get("beg_date")
.ok_or(Error::with_msg("missing beg_date"))?;
let end_date = params
.get("end_date")
.ok_or(Error::with_msg("missing end_date"))?;
let ret = Query { let ret = Query {
range: NanoRange { range: NanoRange {
beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(), beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
end: end_date.parse::<DateTime<Utc>>()?.to_nanos(), end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
}, },
count: params.get("bin_count").ok_or(Error::with_msg("missing beg_date"))?.parse().unwrap(), count: params
.get("bin_count")
.ok_or(Error::with_msg("missing beg_date"))?
.parse()
.unwrap(),
agg_kind: AggKind::DimXBins1, agg_kind: AggKind::DimXBins1,
channel: Channel { channel: Channel {
backend: params.get("channel_backend").unwrap().into(), backend: params.get("channel_backend").unwrap().into(),
@@ -45,11 +55,12 @@ impl Query {
info!("Query::from_request {:?}", ret); info!("Query::from_request {:?}", ret);
Ok(ret) Ok(ret)
} }
} }
pub fn binned_bytes_for_http(
pub fn binned_bytes_for_http(node_config: Arc<NodeConfig>, query: &Query) -> Result<BinnedBytesForHttpStream, Error> { node_config: Arc<NodeConfig>,
query: &Query,
) -> Result<BinnedBytesForHttpStream, Error> {
let agg_kind = AggKind::DimXBins1; let agg_kind = AggKind::DimXBins1;
// TODO // TODO
@@ -59,15 +70,18 @@ pub fn binned_bytes_for_http(node_config: Arc<NodeConfig>, query: &Query) -> Res
Some(spec) => { Some(spec) => {
info!("GOT PreBinnedPatchGridSpec: {:?}", spec); info!("GOT PreBinnedPatchGridSpec: {:?}", spec);
warn!("Pass here to BinnedStream what kind of Agg, range, ..."); warn!("Pass here to BinnedStream what kind of Agg, range, ...");
let s1 = BinnedStream::new(PreBinnedPatchIterator::from_range(spec), query.channel.clone(), agg_kind, node_config.clone()); let s1 = BinnedStream::new(
PreBinnedPatchIterator::from_range(spec),
query.channel.clone(),
agg_kind,
node_config.clone(),
);
// Iterate over the patches. // Iterate over the patches.
// Request the patch from each node. // Request the patch from each node.
// Merge. // Merge.
// Agg+Bin. // Agg+Bin.
// Deliver. // Deliver.
let ret = BinnedBytesForHttpStream { let ret = BinnedBytesForHttpStream { inp: s1 };
inp: s1,
};
Ok(ret) Ok(ret)
} }
None => { None => {
@@ -78,13 +92,11 @@ pub fn binned_bytes_for_http(node_config: Arc<NodeConfig>, query: &Query) -> Res
} }
} }
pub struct BinnedBytesForHttpStream { pub struct BinnedBytesForHttpStream {
inp: BinnedStream, inp: BinnedStream,
} }
impl BinnedBytesForHttpStream { impl BinnedBytesForHttpStream {}
}
impl Stream for BinnedBytesForHttpStream { impl Stream for BinnedBytesForHttpStream {
type Item = Result<Bytes, Error>; type Item = Result<Bytes, Error>;
@@ -102,13 +114,8 @@ impl Stream for BinnedBytesForHttpStream {
Pending => Pending, Pending => Pending,
} }
} }
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct PreBinnedQuery { pub struct PreBinnedQuery {
patch: PreBinnedPatchCoord, patch: PreBinnedPatchCoord,
@@ -117,7 +124,6 @@ pub struct PreBinnedQuery {
} }
impl PreBinnedQuery { impl PreBinnedQuery {
pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> { pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
let params = netpod::query_params(req.uri.query()); let params = netpod::query_params(req.uri.query());
let ret = PreBinnedQuery { let ret = PreBinnedQuery {
@@ -131,34 +137,44 @@ impl PreBinnedQuery {
info!("PreBinnedQuery::from_request {:?}", ret); info!("PreBinnedQuery::from_request {:?}", ret);
Ok(ret) Ok(ret)
} }
} }
// NOTE This answers a request for a single valid pre-binned patch. // NOTE This answers a request for a single valid pre-binned patch.
// A user must first make sure that the grid spec is valid, and that this node is responsible for it. // A user must first make sure that the grid spec is valid, and that this node is responsible for it.
// Otherwise it is an error. // Otherwise it is an error.
pub fn pre_binned_bytes_for_http(node_config: Arc<NodeConfig>, query: &PreBinnedQuery) -> Result<PreBinnedValueByteStream, Error> { pub fn pre_binned_bytes_for_http(
info!("pre_binned_bytes_for_http {:?} {:?}", query, node_config.node); node_config: Arc<NodeConfig>,
let ret = PreBinnedValueByteStream::new(query.patch.clone(), query.channel.clone(), query.agg_kind.clone(), node_config); query: &PreBinnedQuery,
) -> Result<PreBinnedValueByteStream, Error> {
info!(
"pre_binned_bytes_for_http {:?} {:?}",
query, node_config.node
);
let ret = PreBinnedValueByteStream::new(
query.patch.clone(),
query.channel.clone(),
query.agg_kind.clone(),
node_config,
);
Ok(ret) Ok(ret)
} }
pub struct PreBinnedValueByteStream { pub struct PreBinnedValueByteStream {
inp: PreBinnedValueStream, inp: PreBinnedValueStream,
} }
impl PreBinnedValueByteStream { impl PreBinnedValueByteStream {
pub fn new(
pub fn new(patch: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, node_config: Arc<NodeConfig>) -> Self { patch: PreBinnedPatchCoord,
channel: Channel,
agg_kind: AggKind,
node_config: Arc<NodeConfig>,
) -> Self {
warn!("PreBinnedValueByteStream"); warn!("PreBinnedValueByteStream");
Self { Self {
inp: PreBinnedValueStream::new(patch, channel, agg_kind, node_config), inp: PreBinnedValueStream::new(patch, channel, agg_kind, node_config),
} }
} }
} }
impl Stream for PreBinnedValueByteStream { impl Stream for PreBinnedValueByteStream {
@@ -177,25 +193,25 @@ impl Stream for PreBinnedValueByteStream {
Pending => Pending, Pending => Pending,
} }
} }
} }
pub struct PreBinnedValueStream { pub struct PreBinnedValueStream {
patch_coord: PreBinnedPatchCoord, patch_coord: PreBinnedPatchCoord,
channel: Channel, channel: Channel,
agg_kind: AggKind, agg_kind: AggKind,
node_config: Arc<NodeConfig>, node_config: Arc<NodeConfig>,
open_check_local_file: Option<Pin<Box<dyn Future<Output=Result<tokio::fs::File, std::io::Error>> + Send>>>, open_check_local_file:
fut2: Option<Pin<Box<dyn Stream<Item=Result<MinMaxAvgScalarBinBatch, Error>> + Send>>>, Option<Pin<Box<dyn Future<Output = Result<tokio::fs::File, std::io::Error>> + Send>>>,
fut2: Option<Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatch, Error>> + Send>>>,
} }
impl PreBinnedValueStream { impl PreBinnedValueStream {
pub fn new(
pub fn new(patch_coord: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, node_config: Arc<NodeConfig>) -> Self { patch_coord: PreBinnedPatchCoord,
channel: Channel,
agg_kind: AggKind,
node_config: Arc<NodeConfig>,
) -> Self {
let node_ix = node_ix_for_patch(&patch_coord, &channel, &node_config.cluster); let node_ix = node_ix_for_patch(&patch_coord, &channel, &node_config.cluster);
assert!(node_ix == node_config.node.id); assert!(node_ix == node_config.node.id);
Self { Self {
@@ -209,7 +225,10 @@ impl PreBinnedValueStream {
} }
fn try_setup_fetch_prebinned_higher_res(&mut self) { fn try_setup_fetch_prebinned_higher_res(&mut self) {
info!("try to find a next better granularity for {:?}", self.patch_coord); info!(
"try to find a next better granularity for {:?}",
self.patch_coord
);
let g = self.patch_coord.bin_t_len(); let g = self.patch_coord.bin_t_len();
let range = NanoRange { let range = NanoRange {
beg: self.patch_coord.patch_beg(), beg: self.patch_coord.patch_beg(),
@@ -218,7 +237,14 @@ impl PreBinnedValueStream {
match PreBinnedPatchRange::covering_range(range, self.patch_coord.bin_count() + 1) { match PreBinnedPatchRange::covering_range(range, self.patch_coord.bin_count() + 1) {
Some(range) => { Some(range) => {
let h = range.grid_spec.bin_t_len(); let h = range.grid_spec.bin_t_len();
info!("FOUND NEXT GRAN g {} h {} ratio {} mod {} {:?}", g, h, g/h, g%h, range); info!(
"FOUND NEXT GRAN g {} h {} ratio {} mod {} {:?}",
g,
h,
g / h,
g % h,
range
);
assert!(g / h > 1); assert!(g / h > 1);
assert!(g / h < 20); assert!(g / h < 20);
assert!(g % h == 0); assert!(g % h == 0);
@@ -228,14 +254,19 @@ impl PreBinnedValueStream {
let node_config = self.node_config.clone(); let node_config = self.node_config.clone();
let mut patch_it = PreBinnedPatchIterator::from_range(range); let mut patch_it = PreBinnedPatchIterator::from_range(range);
let s = futures_util::stream::iter(patch_it) let s = futures_util::stream::iter(patch_it)
.map(move |coord| { .map(move |coord| {
PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), node_config.clone()) PreBinnedValueFetchedStream::new(
}) coord,
.flatten() channel.clone(),
.map(move |k| { agg_kind.clone(),
info!("ITEM from sub res bin_size {} {:?}", bin_size, k); node_config.clone(),
k )
}); })
.flatten()
.map(move |k| {
info!("ITEM from sub res bin_size {} {:?}", bin_size, k);
k
});
self.fut2 = Some(Box::pin(s)); self.fut2 = Some(Box::pin(s));
} }
None => { None => {
@@ -250,7 +281,6 @@ impl PreBinnedValueStream {
} }
} }
} }
} }
impl Stream for PreBinnedValueStream { impl Stream for PreBinnedValueStream {
@@ -262,48 +292,41 @@ impl Stream for PreBinnedValueStream {
'outer: loop { 'outer: loop {
break if let Some(fut) = self.fut2.as_mut() { break if let Some(fut) = self.fut2.as_mut() {
fut.poll_next_unpin(cx) fut.poll_next_unpin(cx)
} } else if let Some(fut) = self.open_check_local_file.as_mut() {
else if let Some(fut) = self.open_check_local_file.as_mut() {
match fut.poll_unpin(cx) { match fut.poll_unpin(cx) {
Ready(Ok(file)) => { Ready(Ok(file)) => {
todo!("IMPLEMENT READ FROM LOCAL CACHE"); todo!("IMPLEMENT READ FROM LOCAL CACHE");
Pending Pending
} }
Ready(Err(e)) => { Ready(Err(e)) => match e.kind() {
match e.kind() { std::io::ErrorKind::NotFound => {
std::io::ErrorKind::NotFound => { warn!("TODO LOCAL CACHE FILE NOT FOUND");
warn!("TODO LOCAL CACHE FILE NOT FOUND"); self.try_setup_fetch_prebinned_higher_res();
self.try_setup_fetch_prebinned_higher_res(); continue 'outer;
continue 'outer;
}
_ => {
error!("File I/O error: {:?}", e);
Ready(Some(Err(e.into())))
}
} }
} _ => {
error!("File I/O error: {:?}", e);
Ready(Some(Err(e.into())))
}
},
Pending => Pending, Pending => Pending,
} }
} } else {
else {
use std::os::unix::fs::OpenOptionsExt; use std::os::unix::fs::OpenOptionsExt;
let mut opts = std::fs::OpenOptions::new(); let mut opts = std::fs::OpenOptions::new();
opts.read(true); opts.read(true);
let fut = async { let fut = async {
tokio::fs::OpenOptions::from(opts) tokio::fs::OpenOptions::from(opts)
.open("/DOESNOTEXIST").await .open("/DOESNOTEXIST")
.await
}; };
self.open_check_local_file = Some(Box::pin(fut)); self.open_check_local_file = Some(Box::pin(fut));
continue 'outer; continue 'outer;
} };
} }
} }
} }
pub struct PreBinnedValueFetchedStream { pub struct PreBinnedValueFetchedStream {
uri: http::Uri, uri: http::Uri,
patch_coord: PreBinnedPatchCoord, patch_coord: PreBinnedPatchCoord,
@@ -312,8 +335,12 @@ pub struct PreBinnedValueFetchedStream {
} }
impl PreBinnedValueFetchedStream { impl PreBinnedValueFetchedStream {
pub fn new(
pub fn new(patch_coord: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, node_config: Arc<NodeConfig>) -> Self { patch_coord: PreBinnedPatchCoord,
channel: Channel,
agg_kind: AggKind,
node_config: Arc<NodeConfig>,
) -> Self {
let nodeix = node_ix_for_patch(&patch_coord, &channel, &node_config.cluster); let nodeix = node_ix_for_patch(&patch_coord, &channel, &node_config.cluster);
let node = &node_config.cluster.nodes[nodeix as usize]; let node = &node_config.cluster.nodes[nodeix as usize];
warn!("TODO defining property of a PreBinnedPatchCoord? patchlen + ix? binsize + patchix? binsize + patchsize + patchix?"); warn!("TODO defining property of a PreBinnedPatchCoord? patchlen + ix? binsize + patchix? binsize + patchsize + patchix?");
@@ -325,7 +352,9 @@ impl PreBinnedValueFetchedStream {
channel.backend, channel.backend,
channel.name, channel.name,
agg_kind, agg_kind,
).parse().unwrap(); )
.parse()
.unwrap();
Self { Self {
uri, uri,
patch_coord, patch_coord,
@@ -333,7 +362,6 @@ impl PreBinnedValueFetchedStream {
res: None, res: None,
} }
} }
} }
impl Stream for PreBinnedValueFetchedStream { impl Stream for PreBinnedValueFetchedStream {
@@ -351,79 +379,72 @@ impl Stream for PreBinnedValueFetchedStream {
pin_mut!(res); pin_mut!(res);
use hyper::body::HttpBody; use hyper::body::HttpBody;
match res.poll_data(cx) { match res.poll_data(cx) {
Ready(Some(Ok(k))) => { Ready(Some(Ok(k))) => Pending,
Pending
}
Ready(Some(Err(e))) => Ready(Some(Err(e.into()))), Ready(Some(Err(e))) => Ready(Some(Err(e.into()))),
Ready(None) => Ready(None), Ready(None) => Ready(None),
Pending => Pending, Pending => Pending,
} }
} }
None => { None => match self.resfut.as_mut() {
match self.resfut.as_mut() { Some(mut resfut) => match resfut.poll_unpin(cx) {
Some(mut resfut) => { Ready(res) => match res {
match resfut.poll_unpin(cx) { Ok(res) => {
Ready(res) => { info!("GOT result from SUB REQUEST: {:?}", res);
match res { self.res = Some(res);
Ok(res) => { continue 'outer;
info!("GOT result from SUB REQUEST: {:?}", res);
self.res = Some(res);
continue 'outer;
}
Err(e) => {
error!("PreBinnedValueStream error in stream {:?}", e);
Ready(Some(Err(e.into())))
}
}
}
Pending => {
Pending
}
} }
} Err(e) => {
None => { error!("PreBinnedValueStream error in stream {:?}", e);
let req = hyper::Request::builder() Ready(Some(Err(e.into())))
}
},
Pending => Pending,
},
None => {
let req = hyper::Request::builder()
.method(http::Method::GET) .method(http::Method::GET)
.uri(&self.uri) .uri(&self.uri)
.body(hyper::Body::empty())?; .body(hyper::Body::empty())?;
let client = hyper::Client::new(); let client = hyper::Client::new();
info!("START REQUEST FOR {:?}", req); info!("START REQUEST FOR {:?}", req);
self.resfut = Some(client.request(req)); self.resfut = Some(client.request(req));
continue 'outer; continue 'outer;
}
} }
} },
} };
} }
} }
} }
pub struct BinnedStream { pub struct BinnedStream {
inp: Pin<Box<dyn Stream<Item=Result<MinMaxAvgScalarBinBatch, Error>> + Send>>, inp: Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatch, Error>> + Send>>,
} }
impl BinnedStream { impl BinnedStream {
pub fn new(
pub fn new(patch_it: PreBinnedPatchIterator, channel: Channel, agg_kind: AggKind, node_config: Arc<NodeConfig>) -> Self { patch_it: PreBinnedPatchIterator,
channel: Channel,
agg_kind: AggKind,
node_config: Arc<NodeConfig>,
) -> Self {
warn!("BinnedStream will open a PreBinnedValueStream"); warn!("BinnedStream will open a PreBinnedValueStream");
let mut patch_it = patch_it; let mut patch_it = patch_it;
let inp = futures_util::stream::iter(patch_it) let inp = futures_util::stream::iter(patch_it)
.map(move |coord| { .map(move |coord| {
PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), node_config.clone()) PreBinnedValueFetchedStream::new(
}) coord,
.flatten() channel.clone(),
.map(|k| { agg_kind.clone(),
info!("ITEM {:?}", k); node_config.clone(),
k )
}); })
Self { .flatten()
inp: Box::pin(inp), .map(|k| {
} info!("ITEM {:?}", k);
k
});
Self { inp: Box::pin(inp) }
} }
} }
impl Stream for BinnedStream { impl Stream for BinnedStream {
@@ -433,33 +454,28 @@ impl Stream for BinnedStream {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*; use Poll::*;
match self.inp.poll_next_unpin(cx) { match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => { Ready(Some(Ok(k))) => Ready(Some(Ok(k))),
Ready(Some(Ok(k)))
}
Ready(Some(Err(e))) => Ready(Some(Err(e))), Ready(Some(Err(e))) => Ready(Some(Err(e))),
Ready(None) => Ready(None), Ready(None) => Ready(None),
Pending => Pending, Pending => Pending,
} }
} }
} }
pub struct SomeReturnThing {} pub struct SomeReturnThing {}
impl From<SomeReturnThing> for Bytes { impl From<SomeReturnThing> for Bytes {
fn from(k: SomeReturnThing) -> Self { fn from(k: SomeReturnThing) -> Self {
error!("TODO convert result to octets"); error!("TODO convert result to octets");
todo!("TODO convert result to octets") todo!("TODO convert result to octets")
} }
} }
pub fn node_ix_for_patch(
patch_coord: &PreBinnedPatchCoord,
pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, cluster: &Cluster) -> u32 { channel: &Channel,
cluster: &Cluster,
) -> u32 {
let mut hash = tiny_keccak::Sha3::v256(); let mut hash = tiny_keccak::Sha3::v256();
hash.update(channel.backend.as_bytes()); hash.update(channel.backend.as_bytes());
hash.update(channel.name.as_bytes()); hash.update(channel.name.as_bytes());
+61 -23
View File
@@ -1,10 +1,15 @@
use crate::{BadError, Error};
use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8};
#[allow(unused_imports)] #[allow(unused_imports)]
use nom::{IResult, bytes::complete::{tag, take, take_while_m_n}, combinator::map_res, sequence::tuple}; use nom::{
use nom::number::complete::{be_i8, be_u8, be_i16, be_i32, be_i64}; bytes::complete::{tag, take, take_while_m_n},
use crate::{Error, BadError}; combinator::map_res,
sequence::tuple,
IResult,
};
use num_derive::{FromPrimitive, ToPrimitive}; use num_derive::{FromPrimitive, ToPrimitive};
use num_traits::{ToPrimitive}; use num_traits::ToPrimitive;
use serde_derive::{Serialize, Deserialize}; use serde_derive::{Deserialize, Serialize};
#[derive(Debug, FromPrimitive, ToPrimitive, Serialize, Deserialize)] #[derive(Debug, FromPrimitive, ToPrimitive, Serialize, Deserialize)]
pub enum DType { pub enum DType {
@@ -25,7 +30,9 @@ pub enum DType {
} }
impl DType { impl DType {
pub fn to_i16(&self) -> i16 { ToPrimitive::to_i16(self).unwrap() } pub fn to_i16(&self) -> i16 {
ToPrimitive::to_i16(self).unwrap()
}
} }
#[derive(Debug, FromPrimitive, ToPrimitive, Serialize, Deserialize)] #[derive(Debug, FromPrimitive, ToPrimitive, Serialize, Deserialize)]
@@ -34,7 +41,9 @@ pub enum CompressionMethod {
} }
impl CompressionMethod { impl CompressionMethod {
pub fn to_i16(&self) -> i16 { ToPrimitive::to_i16(self).unwrap() } pub fn to_i16(&self) -> i16 {
ToPrimitive::to_i16(self).unwrap()
}
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
@@ -106,7 +115,7 @@ pub fn parseEntry(inp: &[u8]) -> Result<(&[u8], Option<ConfigEntry>), Error> {
if inp.len() < len1 as usize - 4 { if inp.len() < len1 as usize - 4 {
return BadError(format!("incomplete input")); return BadError(format!("incomplete input"));
} }
let inpE = &inp[(len1-8) as usize ..]; let inpE = &inp[(len1 - 8) as usize..];
let (inp, ts) = be_i64(inp)?; let (inp, ts) = be_i64(inp)?;
let (inp, pulse) = be_i64(inp)?; let (inp, pulse) = be_i64(inp)?;
let (inp, ks) = be_i32(inp)?; let (inp, ks) = be_i32(inp)?;
@@ -132,7 +141,7 @@ pub fn parseEntry(inp: &[u8]) -> Result<(&[u8], Option<ConfigEntry>), Error> {
} }
let dtype = match num_traits::FromPrimitive::from_i8(dtype) { let dtype = match num_traits::FromPrimitive::from_i8(dtype) {
Some(k) => k, Some(k) => k,
None => return BadError(format!("Can not convert {} to DType", dtype)) None => return BadError(format!("Can not convert {} to DType", dtype)),
}; };
let (inp, compressionMethod) = match isCompressed { let (inp, compressionMethod) = match isCompressed {
false => (inp, None), false => (inp, None),
@@ -148,7 +157,9 @@ pub fn parseEntry(inp: &[u8]) -> Result<(&[u8], Option<ConfigEntry>), Error> {
false => (inp, None), false => (inp, None),
true => { true => {
let (mut inp, dim) = be_u8(inp)?; let (mut inp, dim) = be_u8(inp)?;
if dim > 4 { return BadError(format!("unexpected number of dimensions: {}", dim)); } if dim > 4 {
return BadError(format!("unexpected number of dimensions: {}", dim));
}
let mut shape = vec![]; let mut shape = vec![];
for _ in 0..dim { for _ in 0..dim {
let t1 = be_i32(inp)?; let t1 = be_i32(inp)?;
@@ -168,11 +179,33 @@ pub fn parseEntry(inp: &[u8]) -> Result<(&[u8], Option<ConfigEntry>), Error> {
if len1 != len2 { if len1 != len2 {
return BadError(format!("mismatch len1 {} len2 {}", len1, len2)); return BadError(format!("mismatch len1 {} len2 {}", len1, len2));
} }
Ok((inpE, Some(ConfigEntry { Ok((
ts, pulse, ks, bs, splitCount, status, bb, modulo, offset, precision, dtype, inpE,
isCompressed, isArray, isShaped, isBigEndian, compressionMethod, shape, Some(ConfigEntry {
sourceName, unit, description, optionalFields, valueConverter, ts,
}))) pulse,
ks,
bs,
splitCount,
status,
bb,
modulo,
offset,
precision,
dtype,
isCompressed,
isArray,
isShaped,
isBigEndian,
compressionMethod,
shape,
sourceName,
unit,
description,
optionalFields,
valueConverter,
}),
))
} }
/* /*
@@ -194,10 +227,12 @@ pub fn parseConfig(inp: &[u8]) -> Result<Config, Error> {
while inpA.len() > 0 { while inpA.len() > 0 {
let inp = inpA; let inp = inpA;
let (inp, e) = parseEntry(inp)?; let (inp, e) = parseEntry(inp)?;
if let Some(e) = e { entries.push(e); } if let Some(e) = e {
entries.push(e);
}
inpA = inp; inpA = inp;
} }
Ok(Config{ Ok(Config {
formatVersion: ver, formatVersion: ver,
channelName: String::from_utf8(chn.to_vec())?, channelName: String::from_utf8(chn.to_vec())?,
entries: entries, entries: entries,
@@ -207,21 +242,24 @@ pub fn parseConfig(inp: &[u8]) -> Result<Config, Error> {
#[cfg(test)] #[cfg(test)]
fn read_data() -> Vec<u8> { fn read_data() -> Vec<u8> {
use std::io::Read; use std::io::Read;
let mut f1 = std::fs::File::open("ks/config/S10CB01-RLOD100-PUP10:SIG-AMPLT/latest/00000_Config").unwrap(); let mut f1 =
std::fs::File::open("ks/config/S10CB01-RLOD100-PUP10:SIG-AMPLT/latest/00000_Config")
.unwrap();
let mut buf = vec![]; let mut buf = vec![];
f1.read_to_end(&mut buf).unwrap(); f1.read_to_end(&mut buf).unwrap();
buf buf
} }
#[test] fn parse_dummy() { #[test]
let config = parseConfig(&[0, 0, 0, 0, 0, 11, 0x61, 0x62, 0x63, 0, 0, 0, 11, fn parse_dummy() {
0, 0, 0, 1, let config =
]).unwrap(); parseConfig(&[0, 0, 0, 0, 0, 11, 0x61, 0x62, 0x63, 0, 0, 0, 11, 0, 0, 0, 1]).unwrap();
assert_eq!(0, config.formatVersion); assert_eq!(0, config.formatVersion);
assert_eq!("abc", config.channelName); assert_eq!("abc", config.channelName);
} }
#[test] fn open_file() { #[test]
fn open_file() {
let config = parseConfig(&readData()).unwrap(); let config = parseConfig(&readData()).unwrap();
assert_eq!(0, config.formatVersion); assert_eq!(0, config.formatVersion);
assert_eq!(9, config.entries.len()); assert_eq!(9, config.entries.len());
+85 -37
View File
@@ -1,21 +1,21 @@
#[allow(unused_imports)] use crate::ChannelConfigExt;
use tracing::{error, warn, info, debug, trace}; use bitshuffle::bitshuffle_compress;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use err::Error; use err::Error;
use std::task::{Context, Poll};
use std::future::Future;
use futures_core::Stream; use futures_core::Stream;
use futures_util::future::FusedFuture; use futures_util::future::FusedFuture;
use futures_util::{pin_mut, StreamExt}; use futures_util::{pin_mut, StreamExt};
use std::pin::Pin;
use tokio::io::{AsyncRead, AsyncWriteExt};
use tokio::fs::{OpenOptions, File};
use bytes::{Bytes, BytesMut, BufMut, Buf};
use std::path::{Path, PathBuf};
use bitshuffle::bitshuffle_compress;
use netpod::ScalarType; use netpod::ScalarType;
use netpod::{timeunits::*, Channel, ChannelConfig, Node, Shape};
use std::future::Future;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use netpod::{Node, Channel, ChannelConfig, Shape, timeunits::*}; use std::task::{Context, Poll};
use crate::ChannelConfigExt; use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncWriteExt};
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
#[test] #[test]
fn test_gen_test_data() { fn test_gen_test_data() {
@@ -88,31 +88,54 @@ async fn gen_node(node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
} }
async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) -> Result<(), Error> { async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
let config_path = node.data_base_path let config_path = node
.join("config") .data_base_path
.join(&chn.config.channel.name); .join("config")
let channel_path = node.data_base_path .join(&chn.config.channel.name);
.join(format!("{}_{}", node.ksprefix, chn.config.keyspace)) let channel_path = node
.join("byTime") .data_base_path
.join(&chn.config.channel.name); .join(format!("{}_{}", node.ksprefix, chn.config.keyspace))
.join("byTime")
.join(&chn.config.channel.name);
tokio::fs::create_dir_all(&channel_path).await?; tokio::fs::create_dir_all(&channel_path).await?;
gen_config(&config_path, &chn.config, node, ensemble).await.map_err(|k| Error::with_msg(format!("can not generate config {:?}", k)))?; gen_config(&config_path, &chn.config, node, ensemble)
.await
.map_err(|k| Error::with_msg(format!("can not generate config {:?}", k)))?;
let mut evix = 0; let mut evix = 0;
let mut ts = 0; let mut ts = 0;
while ts < DAY { while ts < DAY {
let res = gen_timebin(evix, ts, chn.time_spacing, &channel_path, &chn.config, node, ensemble).await?; let res = gen_timebin(
evix,
ts,
chn.time_spacing,
&channel_path,
&chn.config,
node,
ensemble,
)
.await?;
evix = res.evix; evix = res.evix;
ts = res.ts; ts = res.ts;
} }
Ok(()) Ok(())
} }
async fn gen_config(config_path: &Path, config: &ChannelConfig, node: &Node, ensemble: &Ensemble) -> Result<(), Error> { async fn gen_config(
config_path: &Path,
config: &ChannelConfig,
node: &Node,
ensemble: &Ensemble,
) -> Result<(), Error> {
let path = config_path.join("latest"); let path = config_path.join("latest");
tokio::fs::create_dir_all(&path).await?; tokio::fs::create_dir_all(&path).await?;
let path = path.join("00000_Config"); let path = path.join("00000_Config");
info!("try to open {:?}", path); info!("try to open {:?}", path);
let mut file = OpenOptions::new().write(true).create(true).truncate(true).open(path).await?; let mut file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.await?;
let mut buf = BytesMut::with_capacity(1024 * 1); let mut buf = BytesMut::with_capacity(1024 * 1);
let ver = 0; let ver = 0;
buf.put_i16(ver); buf.put_i16(ver);
@@ -155,7 +178,9 @@ async fn gen_config(config_path: &Path, config: &ChannelConfig, node: &Node, ens
} }
match config.shape { match config.shape {
Shape::Scalar => {} Shape::Scalar => {}
Shape::Wave(k) => { buf.put_i32(k as i32); } Shape::Wave(k) => {
buf.put_i32(k as i32);
}
} }
let len = buf.len() - p3; let len = buf.len() - p3;
buf.as_mut()[p3..].as_mut().put_i32(len as i32); buf.as_mut()[p3..].as_mut().put_i32(len as i32);
@@ -174,13 +199,28 @@ struct GenTimebinRes {
ts: u64, ts: u64,
} }
async fn gen_timebin(evix: u64, ts: u64, ts_spacing: u64, channel_path: &Path, config: &ChannelConfig, node: &Node, ensemble: &Ensemble) -> Result<GenTimebinRes, Error> { async fn gen_timebin(
evix: u64,
ts: u64,
ts_spacing: u64,
channel_path: &Path,
config: &ChannelConfig,
node: &Node,
ensemble: &Ensemble,
) -> Result<GenTimebinRes, Error> {
let tb = ts / config.time_bin_size; let tb = ts / config.time_bin_size;
let path = channel_path.join(format!("{:019}", tb)).join(format!("{:010}", node.split)); let path = channel_path
.join(format!("{:019}", tb))
.join(format!("{:010}", node.split));
tokio::fs::create_dir_all(&path).await?; tokio::fs::create_dir_all(&path).await?;
let path = path.join(format!("{:019}_{:05}_Data", config.time_bin_size / MS, 0)); let path = path.join(format!("{:019}_{:05}_Data", config.time_bin_size / MS, 0));
info!("open file {:?}", path); info!("open file {:?}", path);
let mut file = OpenOptions::new().write(true).create(true).truncate(true).open(path).await?; let mut file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.await?;
gen_datafile_header(&mut file, config).await?; gen_datafile_header(&mut file, config).await?;
let mut evix = evix; let mut evix = evix;
let mut ts = ts; let mut ts = ts;
@@ -192,10 +232,7 @@ async fn gen_timebin(evix: u64, ts: u64, ts_spacing: u64, channel_path: &Path, c
evix += 1; evix += 1;
ts += ts_spacing; ts += ts_spacing;
} }
let ret = GenTimebinRes { let ret = GenTimebinRes { evix, ts };
evix,
ts,
};
Ok(ret) Ok(ret)
} }
@@ -211,7 +248,12 @@ async fn gen_datafile_header(file: &mut File, config: &ChannelConfig) -> Result<
Ok(()) Ok(())
} }
async fn gen_event(file: &mut File, evix: u64, ts: u64, config: &ChannelConfig) -> Result<(), Error> { async fn gen_event(
file: &mut File,
evix: u64,
ts: u64,
config: &ChannelConfig,
) -> Result<(), Error> {
let mut buf = BytesMut::with_capacity(1024 * 16); let mut buf = BytesMut::with_capacity(1024 * 16);
buf.put_i32(0xcafecafe as u32 as i32); buf.put_i32(0xcafecafe as u32 as i32);
buf.put_u64(0xcafecafe); buf.put_u64(0xcafecafe);
@@ -244,19 +286,25 @@ async fn gen_event(file: &mut File, evix: u64, ts: u64, config: &ChannelConfig)
std::io::Write::write_all(&mut c1, &a)?; std::io::Write::write_all(&mut c1, &a)?;
} }
let mut comp = vec![0u8; (ele_size * ele_count + 64) as usize]; let mut comp = vec![0u8; (ele_size * ele_count + 64) as usize];
let n1 = bitshuffle_compress(&vals, &mut comp, ele_count as usize, ele_size as usize, 0).unwrap(); let n1 = bitshuffle_compress(
&vals,
&mut comp,
ele_count as usize,
ele_size as usize,
0,
)
.unwrap();
buf.put_u64(vals.len() as u64); buf.put_u64(vals.len() as u64);
let comp_block_size = 0; let comp_block_size = 0;
buf.put_u32(comp_block_size); buf.put_u32(comp_block_size);
buf.put(&comp[..n1]); buf.put(&comp[..n1]);
} }
_ => todo!() _ => todo!(),
} }
} }
_ => todo!() _ => todo!(),
} }
} } else {
else {
todo!() todo!()
} }
{ {
+156 -166
View File
@@ -1,36 +1,35 @@
#[allow(unused_imports)] use crate::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE};
use tracing::{error, warn, info, debug, trace}; use bitshuffle::bitshuffle_decompress;
use bytes::{Buf, Bytes, BytesMut};
use err::Error; use err::Error;
use std::task::{Context, Poll};
use std::future::Future;
use futures_core::Stream; use futures_core::Stream;
use futures_util::future::FusedFuture; use futures_util::future::FusedFuture;
use futures_util::{FutureExt, StreamExt, pin_mut, select}; use futures_util::{pin_mut, select, FutureExt, StreamExt};
use std::pin::Pin; use netpod::{ChannelConfig, Node, ScalarType, Shape};
use tokio::io::AsyncRead; use std::future::Future;
use tokio::fs::{OpenOptions, File};
use bytes::{Bytes, BytesMut, Buf};
use std::path::PathBuf; use std::path::PathBuf;
use bitshuffle::bitshuffle_decompress; use std::pin::Pin;
use netpod::{ScalarType, Shape, Node, ChannelConfig};
use std::sync::Arc; use std::sync::Arc;
use crate::dtflags::{COMPRESSION, BIG_ENDIAN, ARRAY, SHAPE}; use std::task::{Context, Poll};
use tokio::fs::{File, OpenOptions};
use tokio::io::AsyncRead;
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
pub mod agg; pub mod agg;
pub mod cache;
pub mod channelconfig;
pub mod gen; pub mod gen;
pub mod merge; pub mod merge;
pub mod cache;
pub mod raw; pub mod raw;
pub mod channelconfig;
pub async fn read_test_1(
pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> Result<netpod::BodyStream, Error> { query: &netpod::AggQuerySingleChannel,
node: Arc<Node>,
) -> Result<netpod::BodyStream, Error> {
let path = datapath(query.timebin as u64, &query.channel_config, &node); let path = datapath(query.timebin as u64, &query.channel_config, &node);
debug!("try path: {:?}", path); debug!("try path: {:?}", path);
let fin = OpenOptions::new() let fin = OpenOptions::new().read(true).open(path).await?;
.read(true)
.open(path)
.await?;
let meta = fin.metadata().await; let meta = fin.metadata().await;
debug!("file meta {:?}", meta); debug!("file meta {:?}", meta);
let stream = netpod::BodyStream { let stream = netpod::BodyStream {
@@ -68,8 +67,7 @@ impl Stream for FileReader {
let rlen = buf.filled().len(); let rlen = buf.filled().len();
if rlen == 0 { if rlen == 0 {
Poll::Ready(None) Poll::Ready(None)
} } else {
else {
if rlen != blen { if rlen != blen {
info!("short read {} of {}", buf.filled().len(), blen); info!("short read {} of {}", buf.filled().len(), blen);
} }
@@ -77,25 +75,20 @@ impl Stream for FileReader {
Poll::Ready(Some(Ok(buf2.freeze()))) Poll::Ready(Some(Ok(buf2.freeze())))
} }
} }
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => Poll::Ready(Some(Err(Error::from(e)))),
Poll::Ready(Some(Err(Error::from(e)))) Poll::Pending => Poll::Pending,
}
Poll::Pending => Poll::Pending
} }
} }
} }
#[allow(dead_code)] #[allow(dead_code)]
struct Fopen1 { struct Fopen1 {
opts: OpenOptions, opts: OpenOptions,
fut: Pin<Box<dyn Future<Output=Result<File, std::io::Error>>>>, fut: Pin<Box<dyn Future<Output = Result<File, std::io::Error>>>>,
term: bool, term: bool,
} }
impl Fopen1 { impl Fopen1 {
pub fn new(path: PathBuf) -> Self { pub fn new(path: PathBuf) -> Self {
let fut = Box::pin(async { let fut = Box::pin(async {
let mut o1 = OpenOptions::new(); let mut o1 = OpenOptions::new();
@@ -104,17 +97,14 @@ impl Fopen1 {
//() == res; //() == res;
//todo!() //todo!()
res.await res.await
}) as Pin<Box<dyn Future<Output=Result<File, std::io::Error>>>>; }) as Pin<Box<dyn Future<Output = Result<File, std::io::Error>>>>;
let _fut2: Box<dyn Future<Output=u32>> = Box::new(async { let _fut2: Box<dyn Future<Output = u32>> = Box::new(async { 123 });
123
});
Self { Self {
opts: OpenOptions::new(), opts: OpenOptions::new(),
fut, fut,
term: false, term: false,
} }
} }
} }
impl Future for Fopen1 { impl Future for Fopen1 {
@@ -126,18 +116,16 @@ impl Future for Fopen1 {
Poll::Ready(Ok(k)) => { Poll::Ready(Ok(k)) => {
self.term = true; self.term = true;
Poll::Ready(Ok(k)) Poll::Ready(Ok(k))
}, }
Poll::Ready(Err(k)) => { Poll::Ready(Err(k)) => {
self.term = true; self.term = true;
Poll::Ready(Err(k.into())) Poll::Ready(Err(k.into()))
}, }
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
} }
} }
} }
impl FusedFuture for Fopen1 { impl FusedFuture for Fopen1 {
fn is_terminated(&self) -> bool { fn is_terminated(&self) -> bool {
self.term self.term
@@ -146,8 +134,10 @@ impl FusedFuture for Fopen1 {
unsafe impl Send for Fopen1 {} unsafe impl Send for Fopen1 {}
pub fn raw_concat_channel_read_stream_try_open_in_background(
pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> impl Stream<Item=Result<Bytes, Error>> + Send { query: &netpod::AggQuerySingleChannel,
node: Arc<Node>,
) -> impl Stream<Item = Result<Bytes, Error>> + Send {
let mut query = query.clone(); let mut query = query.clone();
let node = node.clone(); let node = node.clone();
async_stream::stream! { async_stream::stream! {
@@ -271,8 +261,10 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
} }
} }
pub fn raw_concat_channel_read_stream_file_pipe(
pub fn raw_concat_channel_read_stream_file_pipe(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> impl Stream<Item=Result<BytesMut, Error>> + Send { query: &netpod::AggQuerySingleChannel,
node: Arc<Node>,
) -> impl Stream<Item = Result<BytesMut, Error>> + Send {
let query = query.clone(); let query = query.clone();
let node = node.clone(); let node = node.clone();
async_stream::stream! { async_stream::stream! {
@@ -298,7 +290,10 @@ pub fn raw_concat_channel_read_stream_file_pipe(query: &netpod::AggQuerySingleCh
} }
} }
fn open_files(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> async_channel::Receiver<Result<tokio::fs::File, Error>> { fn open_files(
query: &netpod::AggQuerySingleChannel,
node: Arc<Node>,
) -> async_channel::Receiver<Result<tokio::fs::File, Error>> {
let (chtx, chrx) = async_channel::bounded(2); let (chtx, chrx) = async_channel::bounded(2);
let mut query = query.clone(); let mut query = query.clone();
let node = node.clone(); let node = node.clone();
@@ -307,32 +302,27 @@ fn open_files(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> async_c
for i1 in 0..query.tb_file_count { for i1 in 0..query.tb_file_count {
query.timebin = tb0 + i1; query.timebin = tb0 + i1;
let path = datapath(query.timebin as u64, &query.channel_config, &node); let path = datapath(query.timebin as u64, &query.channel_config, &node);
let fileres = OpenOptions::new() let fileres = OpenOptions::new().read(true).open(&path).await;
.read(true)
.open(&path)
.await;
info!("opened file {:?} {:?}", &path, &fileres); info!("opened file {:?} {:?}", &path, &fileres);
match fileres { match fileres {
Ok(k) => { Ok(k) => match chtx.send(Ok(k)).await {
match chtx.send(Ok(k)).await { Ok(_) => (),
Ok(_) => (), Err(_) => break,
Err(_) => break },
} Err(e) => match chtx.send(Err(e.into())).await {
} Ok(_) => (),
Err(e) => { Err(_) => break,
match chtx.send(Err(e.into())).await { },
Ok(_) => (),
Err(_) => break
}
}
} }
} }
}); });
chrx chrx
} }
pub fn file_content_stream(
pub fn file_content_stream(mut file: tokio::fs::File, buffer_size: usize) -> impl Stream<Item=Result<BytesMut, Error>> + Send { mut file: tokio::fs::File,
buffer_size: usize,
) -> impl Stream<Item = Result<BytesMut, Error>> + Send {
async_stream::stream! { async_stream::stream! {
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
loop { loop {
@@ -349,8 +339,10 @@ pub fn file_content_stream(mut file: tokio::fs::File, buffer_size: usize) -> imp
} }
} }
pub fn parsed1(
pub fn parsed1(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> impl Stream<Item=Result<Bytes, Error>> + Send { query: &netpod::AggQuerySingleChannel,
node: Arc<Node>,
) -> impl Stream<Item = Result<Bytes, Error>> + Send {
let query = query.clone(); let query = query.clone();
let node = node.clone(); let node = node.clone();
async_stream::stream! { async_stream::stream! {
@@ -387,7 +379,6 @@ pub fn parsed1(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> impl S
} }
} }
pub struct EventBlobsComplete { pub struct EventBlobsComplete {
channel_config: ChannelConfig, channel_config: ChannelConfig,
file_chan: async_channel::Receiver<Result<File, Error>>, file_chan: async_channel::Receiver<Result<File, Error>>,
@@ -396,8 +387,11 @@ pub struct EventBlobsComplete {
} }
impl EventBlobsComplete { impl EventBlobsComplete {
pub fn new(
pub fn new(query: &netpod::AggQuerySingleChannel, channel_config: ChannelConfig, node: Arc<Node>) -> Self { query: &netpod::AggQuerySingleChannel,
channel_config: ChannelConfig,
node: Arc<Node>,
) -> Self {
Self { Self {
file_chan: open_files(query, node), file_chan: open_files(query, node),
evs: None, evs: None,
@@ -405,7 +399,6 @@ impl EventBlobsComplete {
channel_config, channel_config,
} }
} }
} }
impl Stream for EventBlobsComplete { impl Stream for EventBlobsComplete {
@@ -415,44 +408,38 @@ impl Stream for EventBlobsComplete {
use Poll::*; use Poll::*;
'outer: loop { 'outer: loop {
let z = match &mut self.evs { let z = match &mut self.evs {
Some(evs) => { Some(evs) => match evs.poll_next_unpin(cx) {
match evs.poll_next_unpin(cx) { Ready(Some(k)) => Ready(Some(k)),
Ready(Some(k)) => { Ready(None) => {
Ready(Some(k)) self.evs = None;
} continue 'outer;
Ready(None) => { }
self.evs = None; Pending => Pending,
},
None => match self.file_chan.poll_next_unpin(cx) {
Ready(Some(k)) => match k {
Ok(file) => {
let inp =
Box::pin(file_content_stream(file, self.buffer_size as usize));
let mut chunker = EventChunker::new(inp, self.channel_config.clone());
self.evs.replace(chunker);
continue 'outer; continue 'outer;
} }
Pending => Pending, Err(e) => Ready(Some(Err(e))),
} },
} Ready(None) => Ready(None),
None => { Pending => Pending,
match self.file_chan.poll_next_unpin(cx) { },
Ready(Some(k)) => {
match k {
Ok(file) => {
let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
let mut chunker = EventChunker::new(inp, self.channel_config.clone());
self.evs.replace(chunker);
continue 'outer;
}
Err(e) => Ready(Some(Err(e)))
}
}
Ready(None) => Ready(None),
Pending => Pending,
}
}
}; };
break z; break z;
} }
} }
} }
pub fn event_blobs_complete(
pub fn event_blobs_complete(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> impl Stream<Item=Result<EventFull, Error>> + Send { query: &netpod::AggQuerySingleChannel,
node: Arc<Node>,
) -> impl Stream<Item = Result<EventFull, Error>> + Send {
let query = query.clone(); let query = query.clone();
let node = node.clone(); let node = node.clone();
async_stream::stream! { async_stream::stream! {
@@ -481,7 +468,6 @@ pub fn event_blobs_complete(query: &netpod::AggQuerySingleChannel, node: Arc<Nod
} }
} }
pub struct EventChunker { pub struct EventChunker {
inp: NeedMinBuffer, inp: NeedMinBuffer,
had_channel: bool, had_channel: bool,
@@ -497,8 +483,10 @@ enum DataFileState {
} }
impl EventChunker { impl EventChunker {
pub fn new(
pub fn new(inp: Pin<Box<dyn Stream<Item=Result<BytesMut, Error>> + Send>>, channel_config: ChannelConfig) -> Self { inp: Pin<Box<dyn Stream<Item = Result<BytesMut, Error>> + Send>>,
channel_config: ChannelConfig,
) -> Self {
let mut inp = NeedMinBuffer::new(inp); let mut inp = NeedMinBuffer::new(inp);
inp.set_need_min(6); inp.set_need_min(6);
Self { Self {
@@ -518,7 +506,7 @@ impl EventChunker {
// how many bytes I need min to make progress // how many bytes I need min to make progress
let mut ret = EventFull::empty(); let mut ret = EventFull::empty();
let mut need_min = 0 as u32; let mut need_min = 0 as u32;
use byteorder::{BE, ReadBytesExt}; use byteorder::{ReadBytesExt, BE};
//info!("parse_buf rb {}", buf.len()); //info!("parse_buf rb {}", buf.len());
//let mut i1 = 0; //let mut i1 = 0;
loop { loop {
@@ -539,12 +527,13 @@ impl EventChunker {
info!("parse_buf not enough A totlen {}", totlen); info!("parse_buf not enough A totlen {}", totlen);
need_min = totlen as u32; need_min = totlen as u32;
break; break;
} } else {
else {
sl.advance(len as usize - 8); sl.advance(len as usize - 8);
let len2 = sl.read_i32::<BE>().unwrap(); let len2 = sl.read_i32::<BE>().unwrap();
assert!(len == len2, "len mismatch"); assert!(len == len2, "len mismatch");
let s1 = String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec()).unwrap(); let s1 =
String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec())
.unwrap();
info!("channel name {} len {} len2 {}", s1, len, len2); info!("channel name {} len {} len2 {}", s1, len, len2);
self.state = DataFileState::Event; self.state = DataFileState::Event;
need_min = 4; need_min = 4;
@@ -560,8 +549,7 @@ impl EventChunker {
//info!("parse_buf not enough B"); //info!("parse_buf not enough B");
need_min = len as u32; need_min = len as u32;
break; break;
} } else if (buf.len() as u32) < len as u32 {
else if (buf.len() as u32) < len as u32 {
// TODO this is just for testing // TODO this is just for testing
let mut sl = std::io::Cursor::new(buf.as_ref()); let mut sl = std::io::Cursor::new(buf.as_ref());
sl.read_i32::<BE>().unwrap(); sl.read_i32::<BE>().unwrap();
@@ -570,8 +558,7 @@ impl EventChunker {
//info!("parse_buf not enough C len {} have {} ts {}", len, buf.len(), ts); //info!("parse_buf not enough C len {} have {} ts {}", len, buf.len(), ts);
need_min = len as u32; need_min = len as u32;
break; break;
} } else {
else {
let mut sl = std::io::Cursor::new(buf.as_ref()); let mut sl = std::io::Cursor::new(buf.as_ref());
let len1b = sl.read_i32::<BE>().unwrap(); let len1b = sl.read_i32::<BE>().unwrap();
assert!(len == len1b); assert!(len == len1b);
@@ -598,16 +585,10 @@ impl EventChunker {
} }
let compression_method = if is_compressed { let compression_method = if is_compressed {
sl.read_u8().unwrap() sl.read_u8().unwrap()
} } else {
else {
0
};
let shape_dim = if is_shaped {
sl.read_u8().unwrap()
}
else {
0 0
}; };
let shape_dim = if is_shaped { sl.read_u8().unwrap() } else { 0 };
assert!(compression_method <= 0); assert!(compression_method <= 0);
assert!(!is_shaped || (shape_dim >= 1 && shape_dim <= 2)); assert!(!is_shaped || (shape_dim >= 1 && shape_dim <= 2));
let mut shape_lens = [0, 0, 0, 0]; let mut shape_lens = [0, 0, 0, 0];
@@ -639,12 +620,23 @@ impl EventChunker {
decomp.set_len(decomp_bytes); decomp.set_len(decomp_bytes);
} }
//debug!("try decompress value_bytes {} ele_size {} ele_count {} type_index {}", value_bytes, ele_size, ele_count, type_index); //debug!("try decompress value_bytes {} ele_size {} ele_count {} type_index {}", value_bytes, ele_size, ele_count, type_index);
let c1 = bitshuffle_decompress(&buf.as_ref()[p1 as usize..], &mut decomp, ele_count as usize, ele_size as usize, 0).unwrap(); let c1 = bitshuffle_decompress(
&buf.as_ref()[p1 as usize..],
&mut decomp,
ele_count as usize,
ele_size as usize,
0,
)
.unwrap();
//debug!("decompress result c1 {} k1 {}", c1, k1); //debug!("decompress result c1 {} k1 {}", c1, k1);
assert!(c1 as u32 == k1); assert!(c1 as u32 == k1);
ret.add_event(ts, pulse, Some(decomp), ScalarType::from_dtype_index(type_index)); ret.add_event(
} ts,
else { pulse,
Some(decomp),
ScalarType::from_dtype_index(type_index),
);
} else {
todo!() todo!()
} }
buf.advance(len as usize); buf.advance(len as usize);
@@ -659,7 +651,6 @@ impl EventChunker {
need_min: need_min, need_min: need_min,
}) })
} }
} }
fn type_size(ix: u8) -> u32 { fn type_size(ix: u8) -> u32 {
@@ -678,7 +669,7 @@ fn type_size(ix: u8) -> u32 {
11 => 4, 11 => 4,
12 => 8, 12 => 8,
13 => 1, 13 => 1,
_ => panic!("logic") _ => panic!("logic"),
} }
} }
@@ -704,11 +695,9 @@ impl Stream for EventChunker {
match self.parse_buf(&mut buf) { match self.parse_buf(&mut buf) {
Ok(res) => { Ok(res) => {
if buf.len() > 0 { if buf.len() > 0 {
// TODO gather stats about this: // TODO gather stats about this:
//info!("parse_buf returned {} leftover bytes to me", buf.len()); //info!("parse_buf returned {} leftover bytes to me", buf.len());
self.inp.put_back(buf); self.inp.put_back(buf);
} }
if res.need_min > 8000 { if res.need_min > 8000 {
warn!("spurious EventChunker asks for need_min {}", res.need_min); warn!("spurious EventChunker asks for need_min {}", res.need_min);
@@ -717,7 +706,7 @@ impl Stream for EventChunker {
self.inp.set_need_min(res.need_min); self.inp.set_need_min(res.need_min);
Poll::Ready(Some(Ok(res.events))) Poll::Ready(Some(Ok(res.events)))
} }
Err(e) => Poll::Ready(Some(Err(e.into()))) Err(e) => Poll::Ready(Some(Err(e.into()))),
} }
} }
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
@@ -725,7 +714,6 @@ impl Stream for EventChunker {
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
} }
} }
} }
pub struct EventFull { pub struct EventFull {
@@ -736,7 +724,6 @@ pub struct EventFull {
} }
impl EventFull { impl EventFull {
pub fn empty() -> Self { pub fn empty() -> Self {
Self { Self {
tss: vec![], tss: vec![],
@@ -746,27 +733,28 @@ impl EventFull {
} }
} }
fn add_event(&mut self, ts: u64, pulse: u64, decomp: Option<BytesMut>, scalar_type: ScalarType) { fn add_event(
&mut self,
ts: u64,
pulse: u64,
decomp: Option<BytesMut>,
scalar_type: ScalarType,
) {
self.tss.push(ts); self.tss.push(ts);
self.pulses.push(pulse); self.pulses.push(pulse);
self.decomps.push(decomp); self.decomps.push(decomp);
self.scalar_types.push(scalar_type); self.scalar_types.push(scalar_type);
} }
} }
pub struct NeedMinBuffer { pub struct NeedMinBuffer {
inp: Pin<Box<dyn Stream<Item=Result<BytesMut, Error>> + Send>>, inp: Pin<Box<dyn Stream<Item = Result<BytesMut, Error>> + Send>>,
need_min: u32, need_min: u32,
left: Option<BytesMut>, left: Option<BytesMut>,
} }
impl NeedMinBuffer { impl NeedMinBuffer {
pub fn new(inp: Pin<Box<dyn Stream<Item = Result<BytesMut, Error>> + Send>>) -> Self {
pub fn new(inp: Pin<Box<dyn Stream<Item=Result<BytesMut, Error>> + Send>>) -> Self {
Self { Self {
inp: inp, inp: inp,
need_min: 1, need_min: 1,
@@ -782,7 +770,6 @@ impl NeedMinBuffer {
pub fn set_need_min(&mut self, need_min: u32) { pub fn set_need_min(&mut self, need_min: u32) {
self.need_min = need_min; self.need_min = need_min;
} }
} }
impl Stream for NeedMinBuffer { impl Stream for NeedMinBuffer {
@@ -803,8 +790,7 @@ impl Stream for NeedMinBuffer {
if buf.len() as u32 >= self.need_min { if buf.len() as u32 >= self.need_min {
//info!("with left ready len {} need_min {}", buf.len(), self.need_min); //info!("with left ready len {} need_min {}", buf.len(), self.need_min);
Poll::Ready(Some(Ok(buf))) Poll::Ready(Some(Ok(buf)))
} } else {
else {
//info!("with left not enough len {} need_min {}", buf.len(), self.need_min); //info!("with left not enough len {} need_min {}", buf.len(), self.need_min);
self.left.replace(buf); self.left.replace(buf);
again = true; again = true;
@@ -815,8 +801,7 @@ impl Stream for NeedMinBuffer {
if buf.len() as u32 >= self.need_min { if buf.len() as u32 >= self.need_min {
//info!("simply ready len {} need_min {}", buf.len(), self.need_min); //info!("simply ready len {} need_min {}", buf.len(), self.need_min);
Poll::Ready(Some(Ok(buf))) Poll::Ready(Some(Ok(buf)))
} } else {
else {
//info!("no previous leftover, need more len {} need_min {}", buf.len(), self.need_min); //info!("no previous leftover, need more len {} need_min {}", buf.len(), self.need_min);
self.left.replace(buf); self.left.replace(buf);
again = true; again = true;
@@ -834,12 +819,12 @@ impl Stream for NeedMinBuffer {
} }
} }
} }
} }
pub fn raw_concat_channel_read_stream(
query: &netpod::AggQuerySingleChannel,
pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> impl Stream<Item=Result<Bytes, Error>> + Send { node: Arc<Node>,
) -> impl Stream<Item = Result<Bytes, Error>> + Send {
let mut query = query.clone(); let mut query = query.clone();
let node = node.clone(); let node = node.clone();
async_stream::stream! { async_stream::stream! {
@@ -860,8 +845,10 @@ pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel, nod
} }
} }
pub fn raw_concat_channel_read_stream_timebin(
pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> impl Stream<Item=Result<Bytes, Error>> { query: &netpod::AggQuerySingleChannel,
node: Arc<Node>,
) -> impl Stream<Item = Result<Bytes, Error>> {
let query = query.clone(); let query = query.clone();
let node = node.clone(); let node = node.clone();
async_stream::stream! { async_stream::stream! {
@@ -890,19 +877,20 @@ pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChan
} }
} }
fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &Node) -> PathBuf { fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &Node) -> PathBuf {
//let pre = "/data/sf-databuffer/daq_swissfel"; //let pre = "/data/sf-databuffer/daq_swissfel";
node.data_base_path node.data_base_path
.join(format!("{}_{}", node.ksprefix, config.keyspace)) .join(format!("{}_{}", node.ksprefix, config.keyspace))
.join("byTime") .join("byTime")
.join(config.channel.name.clone()) .join(config.channel.name.clone())
.join(format!("{:019}", timebin)) .join(format!("{:019}", timebin))
.join(format!("{:010}", node.split)) .join(format!("{:010}", node.split))
.join(format!("{:019}_00000_Data", config.time_bin_size / netpod::timeunits::MS)) .join(format!(
"{:019}_00000_Data",
config.time_bin_size / netpod::timeunits::MS
))
} }
/** /**
Read all events from all timebins for the given channel and split. Read all events from all timebins for the given channel and split.
*/ */
@@ -923,18 +911,16 @@ pub struct RawConcatChannelReader {
// • How can I transition between Stream and async world? // • How can I transition between Stream and async world?
// • I guess I must not poll a completed Future which comes from some async fn again after it completed. // • I guess I must not poll a completed Future which comes from some async fn again after it completed.
// • relevant crates: async-stream, tokio-stream // • relevant crates: async-stream, tokio-stream
fopen: Option<Box<dyn Future<Output=Option<Result<Bytes, Error>>> + Send>>, fopen: Option<Box<dyn Future<Output = Option<Result<Bytes, Error>>> + Send>>,
} }
impl RawConcatChannelReader { impl RawConcatChannelReader {
pub fn read(self) -> Result<netpod::BodyStream, Error> { pub fn read(self) -> Result<netpod::BodyStream, Error> {
let res = netpod::BodyStream { let res = netpod::BodyStream {
inner: Box::new(self), inner: Box::new(self),
}; };
Ok(res) Ok(res)
} }
} }
impl futures_core::Stream for RawConcatChannelReader { impl futures_core::Stream for RawConcatChannelReader {
@@ -943,7 +929,6 @@ impl futures_core::Stream for RawConcatChannelReader {
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
todo!() todo!()
} }
} }
pub mod dtflags { pub mod dtflags {
@@ -953,23 +938,28 @@ pub mod dtflags {
pub const SHAPE: u8 = 0x10; pub const SHAPE: u8 = 0x10;
} }
trait ChannelConfigExt { trait ChannelConfigExt {
fn dtflags(&self) -> u8; fn dtflags(&self) -> u8;
} }
impl ChannelConfigExt for ChannelConfig { impl ChannelConfigExt for ChannelConfig {
fn dtflags(&self) -> u8 { fn dtflags(&self) -> u8 {
let mut ret = 0; let mut ret = 0;
if self.compression { ret |= COMPRESSION; } if self.compression {
ret |= COMPRESSION;
}
match self.shape { match self.shape {
Shape::Scalar => {} Shape::Scalar => {}
Shape::Wave(_) => { ret |= SHAPE; } Shape::Wave(_) => {
ret |= SHAPE;
}
}
if self.big_endian {
ret |= BIG_ENDIAN;
}
if self.array {
ret |= ARRAY;
} }
if self.big_endian { ret |= BIG_ENDIAN; }
if self.array { ret |= ARRAY; }
ret ret
} }
} }
+23 -19
View File
@@ -1,14 +1,17 @@
#[allow(unused_imports)]
use tracing::{error, warn, info, debug, trace};
use futures_core::Stream;
use err::Error;
use std::task::{Poll, Context};
use std::pin::Pin;
use crate::agg::{Dim1F32Stream, ValuesDim1}; use crate::agg::{Dim1F32Stream, ValuesDim1};
use crate::EventFull; use crate::EventFull;
use futures_util::{pin_mut, StreamExt, future::ready}; use err::Error;
use futures_core::Stream;
use futures_util::{future::ready, pin_mut, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
pub struct MergeDim1F32Stream<S> where S: Stream<Item=Result<EventFull, Error>> { pub struct MergeDim1F32Stream<S>
where
S: Stream<Item = Result<EventFull, Error>>,
{
inps: Vec<Dim1F32Stream<S>>, inps: Vec<Dim1F32Stream<S>>,
current: Vec<CurVal>, current: Vec<CurVal>,
ixs: Vec<usize>, ixs: Vec<usize>,
@@ -16,8 +19,10 @@ pub struct MergeDim1F32Stream<S> where S: Stream<Item=Result<EventFull, Error>>
batch: ValuesDim1, batch: ValuesDim1,
} }
impl<S> MergeDim1F32Stream<S> where S: Stream<Item=Result<EventFull, Error>> { impl<S> MergeDim1F32Stream<S>
where
S: Stream<Item = Result<EventFull, Error>>,
{
pub fn new(inps: Vec<Dim1F32Stream<S>>) -> Self { pub fn new(inps: Vec<Dim1F32Stream<S>>) -> Self {
let n = inps.len(); let n = inps.len();
let mut current = vec![]; let mut current = vec![];
@@ -32,10 +37,12 @@ impl<S> MergeDim1F32Stream<S> where S: Stream<Item=Result<EventFull, Error>> {
batch: ValuesDim1::empty(), batch: ValuesDim1::empty(),
} }
} }
} }
impl<S> Stream for MergeDim1F32Stream<S> where S: Stream<Item=Result<EventFull, Error>> + Unpin { impl<S> Stream for MergeDim1F32Stream<S>
where
S: Stream<Item = Result<EventFull, Error>> + Unpin,
{
//type Item = <Dim1F32Stream as Stream>::Item; //type Item = <Dim1F32Stream as Stream>::Item;
type Item = Result<ValuesDim1, Error>; type Item = Result<ValuesDim1, Error>;
@@ -83,8 +90,7 @@ impl<S> Stream for MergeDim1F32Stream<S> where S: Stream<Item=Result<EventFull,
self.ixs[i1] = 0; self.ixs[i1] = 0;
self.current[i1] = CurVal::None; self.current[i1] = CurVal::None;
continue 'outer; continue 'outer;
} } else {
else {
let ts = val.tss[u]; let ts = val.tss[u];
if ts < lowest_ts { if ts < lowest_ts {
lowest_ix = i1; lowest_ix = i1;
@@ -92,14 +98,13 @@ impl<S> Stream for MergeDim1F32Stream<S> where S: Stream<Item=Result<EventFull,
} }
} }
} }
_ => panic!() _ => panic!(),
} }
} }
if lowest_ix == usize::MAX { if lowest_ix == usize::MAX {
// TODO all inputs in finished state // TODO all inputs in finished state
break Ready(None); break Ready(None);
} } else {
else {
//trace!("decided on next lowest ts {} ix {}", lowest_ts, lowest_ix); //trace!("decided on next lowest ts {} ix {}", lowest_ts, lowest_ix);
self.batch.tss.push(lowest_ts); self.batch.tss.push(lowest_ts);
let rix = self.ixs[lowest_ix]; let rix = self.ixs[lowest_ix];
@@ -108,7 +113,7 @@ impl<S> Stream for MergeDim1F32Stream<S> where S: Stream<Item=Result<EventFull,
let k = std::mem::replace(&mut k.values[rix], vec![]); let k = std::mem::replace(&mut k.values[rix], vec![]);
self.batch.values.push(k); self.batch.values.push(k);
} }
_ => panic!() _ => panic!(),
} }
self.ixs[lowest_ix] += 1; self.ixs[lowest_ix] += 1;
} }
@@ -118,7 +123,6 @@ impl<S> Stream for MergeDim1F32Stream<S> where S: Stream<Item=Result<EventFull,
} }
} }
} }
} }
enum CurVal { enum CurVal {
+2 -9
View File
@@ -1,14 +1,8 @@
/* /*
Provide ser/de of value data to a good net exchange format. Provide ser/de of value data to a good net exchange format.
*/ */
async fn local_unpacked_test() { async fn local_unpacked_test() {
// TODO what kind of query format? What information do I need here? // TODO what kind of query format? What information do I need here?
// Don't need exact details of channel because I need to parse the databuffer config anyway. // Don't need exact details of channel because I need to parse the databuffer config anyway.
@@ -39,8 +33,7 @@ async fn local_unpacked_test() {
// TODO find the matching config entry. (bonus: fuse consecutive compatible entries) // TODO find the matching config entry. (bonus: fuse consecutive compatible entries)
use crate::agg::IntoDim1F32Stream;
use crate::agg::{IntoDim1F32Stream};
let stream = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node) let stream = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node)
.into_dim_1_f32_stream(); .into_dim_1_f32_stream();
} }
+17 -36
View File
@@ -7,17 +7,13 @@ pub struct Error {
impl Error { impl Error {
pub fn with_msg<S: Into<String>>(s: S) -> Self { pub fn with_msg<S: Into<String>>(s: S) -> Self {
Self { Self { msg: s.into() }
msg: s.into(),
}
} }
} }
impl From<String> for Error { impl From<String> for Error {
fn from(k: String) -> Self { fn from(k: String) -> Self {
Self { Self { msg: k }
msg: k,
}
} }
} }
@@ -27,61 +23,46 @@ impl std::fmt::Display for Error {
} }
} }
impl std::error::Error for Error { impl std::error::Error for Error {}
}
impl From<std::io::Error> for Error { impl From<std::io::Error> for Error {
fn from (k: std::io::Error) -> Self { fn from(k: std::io::Error) -> Self {
Self { Self { msg: k.to_string() }
msg: k.to_string(),
}
} }
} }
impl From<http::Error> for Error { impl From<http::Error> for Error {
fn from (k: http::Error) -> Self { fn from(k: http::Error) -> Self {
Self { Self { msg: k.to_string() }
msg: k.to_string(),
}
} }
} }
impl From<hyper::Error> for Error { impl From<hyper::Error> for Error {
fn from (k: hyper::Error) -> Self { fn from(k: hyper::Error) -> Self {
Self { Self { msg: k.to_string() }
msg: k.to_string(),
}
} }
} }
impl From<serde_json::Error> for Error { impl From<serde_json::Error> for Error {
fn from (k: serde_json::Error) -> Self { fn from(k: serde_json::Error) -> Self {
Self { Self { msg: k.to_string() }
msg: k.to_string(),
}
} }
} }
impl From<async_channel::RecvError> for Error { impl From<async_channel::RecvError> for Error {
fn from (k: async_channel::RecvError) -> Self { fn from(k: async_channel::RecvError) -> Self {
Self { Self { msg: k.to_string() }
msg: k.to_string(),
}
} }
} }
impl From<chrono::format::ParseError> for Error { impl From<chrono::format::ParseError> for Error {
fn from (k: chrono::format::ParseError) -> Self { fn from(k: chrono::format::ParseError) -> Self {
Self { Self { msg: k.to_string() }
msg: k.to_string(),
}
} }
} }
impl From<ParseIntError> for Error { impl From<ParseIntError> for Error {
fn from (k: ParseIntError) -> Self { fn from(k: ParseIntError) -> Self {
Self { Self { msg: k.to_string() }
msg: k.to_string(),
}
} }
} }
+68 -72
View File
@@ -1,23 +1,23 @@
#[allow(unused_imports)] use bytes::Bytes;
use tracing::{error, warn, info, debug, trace}; use disk::cache::PreBinnedQuery;
use err::Error; use err::Error;
use std::{task, future, pin, net, panic, sync};
use net::SocketAddr;
use http::{Method, StatusCode, HeaderMap};
use hyper::{Body, Request, Response, server::Server};
use hyper::service::{make_service_fn, service_fn};
use task::{Context, Poll};
use future::Future; use future::Future;
use pin::Pin;
use futures_core::Stream; use futures_core::Stream;
use futures_util::{FutureExt, StreamExt}; use futures_util::{FutureExt, StreamExt};
use netpod::{Node, Cluster, AggKind, NodeConfig}; use http::{HeaderMap, Method, StatusCode};
use hyper::service::{make_service_fn, service_fn};
use hyper::{server::Server, Body, Request, Response};
use net::SocketAddr;
use netpod::{AggKind, Cluster, Node, NodeConfig};
use panic::{AssertUnwindSafe, UnwindSafe};
use pin::Pin;
use std::{future, net, panic, pin, sync, task};
use sync::Arc; use sync::Arc;
use disk::cache::PreBinnedQuery; use task::{Context, Poll};
use panic::{UnwindSafe, AssertUnwindSafe};
use bytes::Bytes;
use tokio::net::TcpStream;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
pub async fn host(node_config: Arc<NodeConfig>) -> Result<(), Error> { pub async fn host(node_config: Arc<NodeConfig>) -> Result<(), Error> {
let rawjh = taskrun::spawn(raw_service(node_config.clone())); let rawjh = taskrun::spawn(raw_service(node_config.clone()));
@@ -41,9 +41,12 @@ pub async fn host(node_config: Arc<NodeConfig>) -> Result<(), Error> {
Ok(()) Ok(())
} }
async fn data_api_proxy(req: Request<Body>, node_config: Arc<NodeConfig>) -> Result<Response<Body>, Error> { async fn data_api_proxy(
req: Request<Body>,
node_config: Arc<NodeConfig>,
) -> Result<Response<Body>, Error> {
match data_api_proxy_try(req, node_config).await { match data_api_proxy_try(req, node_config).await {
Ok(k) => { Ok(k) } Ok(k) => Ok(k),
Err(e) => { Err(e) => {
error!("{:?}", e); error!("{:?}", e);
Err(e) Err(e)
@@ -55,13 +58,14 @@ struct Cont<F> {
f: Pin<Box<F>>, f: Pin<Box<F>>,
} }
impl<F, I> Future for Cont<F> where F: Future<Output=Result<I, Error>> { impl<F, I> Future for Cont<F>
where
F: Future<Output = Result<I, Error>>,
{
type Output = <F as Future>::Output; type Output = <F as Future>::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let h = std::panic::catch_unwind(AssertUnwindSafe(|| { let h = std::panic::catch_unwind(AssertUnwindSafe(|| self.f.poll_unpin(cx)));
self.f.poll_unpin(cx)
}));
match h { match h {
Ok(k) => k, Ok(k) => k,
Err(e) => { Err(e) => {
@@ -70,62 +74,56 @@ impl<F, I> Future for Cont<F> where F: Future<Output=Result<I, Error>> {
Some(e) => { Some(e) => {
error!("Cont<F> catch_unwind is Error: {:?}", e); error!("Cont<F> catch_unwind is Error: {:?}", e);
} }
None => { None => {}
}
} }
Poll::Ready(Err(Error::from(format!("{:?}", e)))) Poll::Ready(Err(Error::from(format!("{:?}", e))))
} }
} }
} }
} }
impl<F> UnwindSafe for Cont<F> {} impl<F> UnwindSafe for Cont<F> {}
async fn data_api_proxy_try(
async fn data_api_proxy_try(req: Request<Body>, node_config: Arc<NodeConfig>) -> Result<Response<Body>, Error> { req: Request<Body>,
node_config: Arc<NodeConfig>,
) -> Result<Response<Body>, Error> {
let uri = req.uri().clone(); let uri = req.uri().clone();
let path = uri.path(); let path = uri.path();
if path == "/api/1/parsed_raw" { if path == "/api/1/parsed_raw" {
if req.method() == Method::POST { if req.method() == Method::POST {
Ok(parsed_raw(req).await?) Ok(parsed_raw(req).await?)
} } else {
else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
} }
} } else if path == "/api/1/binned" {
else if path == "/api/1/binned" {
if req.method() == Method::GET { if req.method() == Method::GET {
Ok(binned(req, node_config.clone()).await?) Ok(binned(req, node_config.clone()).await?)
} } else {
else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
} }
} } else if path == "/api/1/prebinned" {
else if path == "/api/1/prebinned" {
if req.method() == Method::GET { if req.method() == Method::GET {
Ok(prebinned(req, node_config.clone()).await?) Ok(prebinned(req, node_config.clone()).await?)
} } else {
else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
} }
} } else {
else {
Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?) Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?)
} }
} }
fn response<T>(status: T) -> http::response::Builder fn response<T>(status: T) -> http::response::Builder
where where
http::StatusCode: std::convert::TryFrom<T>, http::StatusCode: std::convert::TryFrom<T>,
<http::StatusCode as std::convert::TryFrom<T>>::Error: Into<http::Error>, <http::StatusCode as std::convert::TryFrom<T>>::Error: Into<http::Error>,
{ {
Response::builder().status(status) Response::builder()
.status(status)
.header("access-control-allow-origin", "*") .header("access-control-allow-origin", "*")
.header("access-control-allow-headers", "*") .header("access-control-allow-headers", "*")
} }
async fn parsed_raw(req: Request<Body>) -> Result<Response<Body>, Error> { async fn parsed_raw(req: Request<Body>) -> Result<Response<Body>, Error> {
let node = todo!("get node from config"); let node = todo!("get node from config");
use netpod::AggQuerySingleChannel; use netpod::AggQuerySingleChannel;
@@ -135,8 +133,7 @@ async fn parsed_raw(req: Request<Body>) -> Result<Response<Body>, Error> {
//let q = disk::read_test_1(&query).await?; //let q = disk::read_test_1(&query).await?;
//let s = q.inner; //let s = q.inner;
let s = disk::parsed1(&query, node); let s = disk::parsed1(&query, node);
let res = response(StatusCode::OK) let res = response(StatusCode::OK).body(Body::wrap_stream(s))?;
.body(Body::wrap_stream(s))?;
/* /*
let res = match q { let res = match q {
Ok(k) => { Ok(k) => {
@@ -152,14 +149,16 @@ async fn parsed_raw(req: Request<Body>) -> Result<Response<Body>, Error> {
Ok(res) Ok(res)
} }
struct BodyStreamWrap(netpod::BodyStream); struct BodyStreamWrap(netpod::BodyStream);
impl hyper::body::HttpBody for BodyStreamWrap { impl hyper::body::HttpBody for BodyStreamWrap {
type Data = bytes::Bytes; type Data = bytes::Bytes;
type Error = Error; type Error = Error;
fn poll_data(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Result<Self::Data, Self::Error>>> { fn poll_data(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
/* /*
use futures_core::stream::Stream; use futures_core::stream::Stream;
let z: &mut async_channel::Receiver<Result<Self::Data, Self::Error>> = &mut self.0.receiver; let z: &mut async_channel::Receiver<Result<Self::Data, Self::Error>> = &mut self.0.receiver;
@@ -171,39 +170,42 @@ impl hyper::body::HttpBody for BodyStreamWrap {
todo!() todo!()
} }
fn poll_trailers(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<Option<HeaderMap>, Self::Error>> { fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None)) Poll::Ready(Ok(None))
} }
} }
struct BodyStream<S> { struct BodyStream<S> {
inp: S, inp: S,
} }
impl<S, I> BodyStream<S> where S: Stream<Item=Result<I, Error>> + Unpin + Send + 'static, I: Into<Bytes> + Sized + 'static { impl<S, I> BodyStream<S>
where
S: Stream<Item = Result<I, Error>> + Unpin + Send + 'static,
I: Into<Bytes> + Sized + 'static,
{
pub fn new(inp: S) -> Self { pub fn new(inp: S) -> Self {
Self { Self { inp }
inp,
}
} }
pub fn wrapped(inp: S) -> Body { pub fn wrapped(inp: S) -> Body {
Body::wrap_stream(Self::new(inp)) Body::wrap_stream(Self::new(inp))
} }
} }
impl<S, I> Stream for BodyStream<S> where S: Stream<Item=Result<I, Error>> + Unpin, I: Into<Bytes> + Sized { impl<S, I> Stream for BodyStream<S>
where
S: Stream<Item = Result<I, Error>> + Unpin,
I: Into<Bytes> + Sized,
{
type Item = Result<I, Error>; type Item = Result<I, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*; use Poll::*;
let t = std::panic::catch_unwind(AssertUnwindSafe(|| { let t = std::panic::catch_unwind(AssertUnwindSafe(|| self.inp.poll_next_unpin(cx)));
self.inp.poll_next_unpin(cx)
}));
let r = match t { let r = match t {
Ok(k) => k, Ok(k) => k,
Err(e) => { Err(e) => {
@@ -221,10 +223,8 @@ impl<S, I> Stream for BodyStream<S> where S: Stream<Item=Result<I, Error>> + Unp
Pending => Pending, Pending => Pending,
} }
} }
} }
async fn binned(req: Request<Body>, node_config: Arc<NodeConfig>) -> Result<Response<Body>, Error> { async fn binned(req: Request<Body>, node_config: Arc<NodeConfig>) -> Result<Response<Body>, Error> {
info!("-------------------------------------------------------- BINNED"); info!("-------------------------------------------------------- BINNED");
let (head, body) = req.into_parts(); let (head, body) = req.into_parts();
@@ -238,10 +238,7 @@ async fn binned(req: Request<Body>, node_config: Arc<NodeConfig>) -> Result<Resp
let query = disk::cache::Query::from_request(&head)?; let query = disk::cache::Query::from_request(&head)?;
let ret = match disk::cache::binned_bytes_for_http(node_config, &query) { let ret = match disk::cache::binned_bytes_for_http(node_config, &query) {
Ok(s) => { Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s))?,
response(StatusCode::OK)
.body(BodyStream::wrapped(s))?
}
Err(e) => { Err(e) => {
error!("{:?}", e); error!("{:?}", e);
response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())? response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?
@@ -250,16 +247,15 @@ async fn binned(req: Request<Body>, node_config: Arc<NodeConfig>) -> Result<Resp
Ok(ret) Ok(ret)
} }
async fn prebinned(
async fn prebinned(req: Request<Body>, node_config: Arc<NodeConfig>) -> Result<Response<Body>, Error> { req: Request<Body>,
node_config: Arc<NodeConfig>,
) -> Result<Response<Body>, Error> {
info!("-------------------------------------------------------- PRE-BINNED"); info!("-------------------------------------------------------- PRE-BINNED");
let (head, body) = req.into_parts(); let (head, body) = req.into_parts();
let q = PreBinnedQuery::from_request(&head)?; let q = PreBinnedQuery::from_request(&head)?;
let ret = match disk::cache::pre_binned_bytes_for_http(node_config, &q) { let ret = match disk::cache::pre_binned_bytes_for_http(node_config, &q) {
Ok(s) => { Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s))?,
response(StatusCode::OK)
.body(BodyStream::wrapped(s))?
}
Err(e) => { Err(e) => {
error!("{:?}", e); error!("{:?}", e);
response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())? response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?
@@ -275,7 +271,7 @@ async fn raw_service(node_config: Arc<NodeConfig>) -> Result<(), Error> {
Ok((stream, addr)) => { Ok((stream, addr)) => {
taskrun::spawn(raw_conn_handler(stream, addr)); taskrun::spawn(raw_conn_handler(stream, addr));
} }
Err(e) => Err(e)? Err(e) => Err(e)?,
} }
} }
Ok(()) Ok(())
+58 -79
View File
@@ -1,13 +1,12 @@
#[allow(unused_imports)] use chrono::{DateTime, TimeZone, Utc};
use tracing::{error, warn, info, debug, trace};
use serde::{Serialize, Deserialize};
use err::Error; use err::Error;
use std::path::PathBuf; use serde::{Deserialize, Serialize};
use chrono::{DateTime, Utc, TimeZone};
use std::sync::Arc;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::path::PathBuf;
use std::sync::Arc;
use timeunits::*; use timeunits::*;
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AggQuerySingleChannel { pub struct AggQuerySingleChannel {
@@ -19,7 +18,7 @@ pub struct AggQuerySingleChannel {
pub struct BodyStream { pub struct BodyStream {
//pub receiver: async_channel::Receiver<Result<bytes::Bytes, Error>>, //pub receiver: async_channel::Receiver<Result<bytes::Bytes, Error>>,
pub inner: Box<dyn futures_core::Stream<Item=Result<bytes::Bytes, Error>> + Send + Unpin>, pub inner: Box<dyn futures_core::Stream<Item = Result<bytes::Bytes, Error>> + Send + Unpin>,
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -37,7 +36,6 @@ pub enum ScalarType {
} }
impl ScalarType { impl ScalarType {
pub fn from_dtype_index(ix: u8) -> Self { pub fn from_dtype_index(ix: u8) -> Self {
use ScalarType::*; use ScalarType::*;
match ix { match ix {
@@ -90,7 +88,6 @@ impl ScalarType {
F64 => 12, F64 => 12,
} }
} }
} }
#[derive(Debug)] #[derive(Debug)]
@@ -109,20 +106,17 @@ impl Node {
} }
} }
#[derive(Debug)] #[derive(Debug)]
pub struct Cluster { pub struct Cluster {
pub nodes: Vec<Arc<Node>>, pub nodes: Vec<Arc<Node>>,
} }
#[derive(Debug)] #[derive(Debug)]
pub struct NodeConfig { pub struct NodeConfig {
pub node: Arc<Node>, pub node: Arc<Node>,
pub cluster: Arc<Cluster>, pub cluster: Arc<Cluster>,
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Channel { pub struct Channel {
pub backend: String, pub backend: String,
@@ -135,7 +129,6 @@ impl Channel {
} }
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub enum TimeRange { pub enum TimeRange {
Time { Time {
@@ -152,7 +145,6 @@ pub enum TimeRange {
}, },
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NanoRange { pub struct NanoRange {
pub beg: u64, pub beg: u64,
@@ -160,14 +152,11 @@ pub struct NanoRange {
} }
impl NanoRange { impl NanoRange {
pub fn delta(&self) -> u64 { pub fn delta(&self) -> u64 {
self.end - self.beg self.end - self.beg
} }
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChannelConfig { pub struct ChannelConfig {
pub channel: Channel, pub channel: Channel,
@@ -180,7 +169,6 @@ pub struct ChannelConfig {
pub big_endian: bool, pub big_endian: bool,
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Shape { pub enum Shape {
Scalar, Scalar,
@@ -197,8 +185,6 @@ pub mod timeunits {
pub const WEEK: u64 = DAY * 7; pub const WEEK: u64 = DAY * 7;
} }
pub struct BinSpecDimT { pub struct BinSpecDimT {
pub count: u64, pub count: u64,
pub ts1: u64, pub ts1: u64,
@@ -207,7 +193,6 @@ pub struct BinSpecDimT {
} }
impl BinSpecDimT { impl BinSpecDimT {
pub fn over_range(count: u64, ts1: u64, ts2: u64) -> Self { pub fn over_range(count: u64, ts1: u64, ts2: u64) -> Self {
use timeunits::*; use timeunits::*;
assert!(count >= 1); assert!(count >= 1);
@@ -217,15 +202,39 @@ impl BinSpecDimT {
assert!(dt <= DAY * 14); assert!(dt <= DAY * 14);
let bs = dt / count; let bs = dt / count;
let BIN_THRESHOLDS = [ let BIN_THRESHOLDS = [
2, 10, 100, 2,
1000, 10_000, 100_000, 10,
MU, MU * 10, MU * 100, 100,
MS, MS * 10, MS * 100, 1000,
SEC, SEC * 5, SEC * 10, SEC * 20, 10_000,
MIN, MIN * 5, MIN * 10, MIN * 20, 100_000,
HOUR, HOUR * 2, HOUR * 4, HOUR * 12, MU,
DAY, DAY * 2, DAY * 4, DAY * 8, DAY * 16, MU * 10,
WEEK, WEEK * 2, WEEK * 10, WEEK * 60, MU * 100,
MS,
MS * 10,
MS * 100,
SEC,
SEC * 5,
SEC * 10,
SEC * 20,
MIN,
MIN * 5,
MIN * 10,
MIN * 20,
HOUR,
HOUR * 2,
HOUR * 4,
HOUR * 12,
DAY,
DAY * 2,
DAY * 4,
DAY * 8,
DAY * 16,
WEEK,
WEEK * 2,
WEEK * 10,
WEEK * 60,
]; ];
let mut i1 = 0; let mut i1 = 0;
let bs = loop { let bs = loop {
@@ -257,17 +266,14 @@ impl BinSpecDimT {
end: self.ts1 + (ix as u64 + 1) * self.bs, end: self.ts1 + (ix as u64 + 1) * self.bs,
} }
} }
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct PreBinnedPatchGridSpec { pub struct PreBinnedPatchGridSpec {
bin_t_len: u64, bin_t_len: u64,
} }
impl PreBinnedPatchGridSpec { impl PreBinnedPatchGridSpec {
pub fn new(bin_t_len: u64) -> Self { pub fn new(bin_t_len: u64) -> Self {
let mut ok = false; let mut ok = false;
for &j in PATCH_T_LEN_OPTIONS.iter() { for &j in PATCH_T_LEN_OPTIONS.iter() {
@@ -277,11 +283,12 @@ impl PreBinnedPatchGridSpec {
} }
} }
if !ok { if !ok {
panic!("invalid bin_t_len for PreBinnedPatchGridSpec {}", bin_t_len); panic!(
} "invalid bin_t_len for PreBinnedPatchGridSpec {}",
Self { bin_t_len
bin_t_len, );
} }
Self { bin_t_len }
} }
pub fn from_query_params(params: &BTreeMap<String, String>) -> Self { pub fn from_query_params(params: &BTreeMap<String, String>) -> Self {
@@ -315,27 +322,11 @@ impl PreBinnedPatchGridSpec {
} }
panic!() panic!()
} }
} }
const BIN_T_LEN_OPTIONS: [u64; 6] = [ const BIN_T_LEN_OPTIONS: [u64; 6] = [SEC * 10, MIN * 10, HOUR, HOUR * 4, DAY, DAY * 4];
SEC * 10,
MIN * 10,
HOUR,
HOUR * 4,
DAY,
DAY * 4,
];
const PATCH_T_LEN_OPTIONS: [u64; 6] = [
MIN * 10,
HOUR,
HOUR * 4,
DAY,
DAY * 4,
DAY * 12,
];
const PATCH_T_LEN_OPTIONS: [u64; 6] = [MIN * 10, HOUR, HOUR * 4, DAY, DAY * 4, DAY * 12];
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct PreBinnedPatchRange { pub struct PreBinnedPatchRange {
@@ -345,7 +336,6 @@ pub struct PreBinnedPatchRange {
} }
impl PreBinnedPatchRange { impl PreBinnedPatchRange {
pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Option<Self> { pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Option<Self> {
assert!(min_bin_count >= 1); assert!(min_bin_count >= 1);
assert!(min_bin_count <= 2000); assert!(min_bin_count <= 2000);
@@ -357,8 +347,7 @@ impl PreBinnedPatchRange {
loop { loop {
if i1 <= 0 { if i1 <= 0 {
break None; break None;
} } else {
else {
i1 -= 1; i1 -= 1;
let t = BIN_T_LEN_OPTIONS[i1]; let t = BIN_T_LEN_OPTIONS[i1];
//info!("look at threshold {} bs {}", t, bs); //info!("look at threshold {} bs {}", t, bs);
@@ -369,22 +358,17 @@ impl PreBinnedPatchRange {
let count = range.delta() / bs; let count = range.delta() / bs;
let offset = ts1 / bs; let offset = ts1 / bs;
break Some(Self { break Some(Self {
grid_spec: PreBinnedPatchGridSpec { grid_spec: PreBinnedPatchGridSpec { bin_t_len: bs },
bin_t_len: bs,
},
count, count,
offset, offset,
}); });
} } else {
else {
} }
} }
} }
} }
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct PreBinnedPatchCoord { pub struct PreBinnedPatchCoord {
spec: PreBinnedPatchGridSpec, spec: PreBinnedPatchGridSpec,
@@ -392,7 +376,6 @@ pub struct PreBinnedPatchCoord {
} }
impl PreBinnedPatchCoord { impl PreBinnedPatchCoord {
pub fn bin_t_len(&self) -> u64 { pub fn bin_t_len(&self) -> u64 {
self.spec.bin_t_len self.spec.bin_t_len
} }
@@ -422,7 +405,12 @@ impl PreBinnedPatchCoord {
} }
pub fn to_url_params_strings(&self) -> String { pub fn to_url_params_strings(&self) -> String {
format!("patch_t_len={}&bin_t_len={}&patch_ix={}", self.spec.patch_t_len(), self.spec.bin_t_len(), self.ix()) format!(
"patch_t_len={}&bin_t_len={}&patch_ix={}",
self.spec.patch_t_len(),
self.spec.bin_t_len(),
self.ix()
)
} }
pub fn from_query_params(params: &BTreeMap<String, String>) -> Self { pub fn from_query_params(params: &BTreeMap<String, String>) -> Self {
@@ -432,7 +420,6 @@ impl PreBinnedPatchCoord {
ix: patch_ix, ix: patch_ix,
} }
} }
} }
pub struct PreBinnedPatchIterator { pub struct PreBinnedPatchIterator {
@@ -442,7 +429,6 @@ pub struct PreBinnedPatchIterator {
} }
impl PreBinnedPatchIterator { impl PreBinnedPatchIterator {
pub fn from_range(range: PreBinnedPatchRange) -> Self { pub fn from_range(range: PreBinnedPatchRange) -> Self {
Self { Self {
range, range,
@@ -450,7 +436,6 @@ impl PreBinnedPatchIterator {
ix: 0, ix: 0,
} }
} }
} }
impl Iterator for PreBinnedPatchIterator { impl Iterator for PreBinnedPatchIterator {
@@ -459,8 +444,7 @@ impl Iterator for PreBinnedPatchIterator {
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
if self.ix >= self.range.count { if self.ix >= self.range.count {
None None
} } else {
else {
let ret = Self::Item { let ret = Self::Item {
spec: self.range.grid_spec.clone(), spec: self.range.grid_spec.clone(),
ix: self.range.offset + self.ix, ix: self.range.offset + self.ix,
@@ -469,16 +453,13 @@ impl Iterator for PreBinnedPatchIterator {
Some(ret) Some(ret)
} }
} }
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub enum AggKind { pub enum AggKind {
DimXBins1, DimXBins1,
} }
pub fn query_params(q: Option<&str>) -> std::collections::BTreeMap<String, String> { pub fn query_params(q: Option<&str>) -> std::collections::BTreeMap<String, String> {
let mut map = std::collections::BTreeMap::new(); let mut map = std::collections::BTreeMap::new();
match q { match q {
@@ -492,13 +473,11 @@ pub fn query_params(q: Option<&str>) -> std::collections::BTreeMap<String, Strin
} }
} }
} }
None => { None => {}
}
} }
map map
} }
pub trait ToNanos { pub trait ToNanos {
fn to_nanos(&self) -> u64; fn to_nanos(&self) -> u64;
} }
+11 -8
View File
@@ -1,8 +1,8 @@
#[allow(unused_imports)]
use tracing::{error, warn, info, debug, trace};
use err::Error; use err::Error;
use netpod::{ChannelConfig, Channel, timeunits::*, ScalarType, Shape, Node, Cluster, NodeConfig}; use netpod::{timeunits::*, Channel, ChannelConfig, Cluster, Node, NodeConfig, ScalarType, Shape};
use std::sync::Arc; use std::sync::Arc;
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
pub fn main() { pub fn main() {
match taskrun::run(go()) { match taskrun::run(go()) {
@@ -60,9 +60,7 @@ fn simple_fetch() {
tb_file_count: 1, tb_file_count: 1,
buffer_size: 1024 * 8, buffer_size: 1024 * 8,
}; };
let cluster = Cluster { let cluster = Cluster { nodes: vec![node] };
nodes: vec![node],
};
let cluster = Arc::new(cluster); let cluster = Arc::new(cluster);
let node_config = NodeConfig { let node_config = NodeConfig {
cluster: cluster, cluster: cluster,
@@ -99,8 +97,13 @@ fn simple_fetch() {
let t2 = chrono::Utc::now(); let t2 = chrono::Utc::now();
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
let throughput = ntot / 1024 * 1000 / ms; let throughput = ntot / 1024 * 1000 / ms;
info!("total download {} MB throughput {:5} kB/s", ntot / 1024 / 1024, throughput); info!(
"total download {} MB throughput {:5} kB/s",
ntot / 1024 / 1024,
throughput
);
//Err::<(), _>(format!("test error").into()) //Err::<(), _>(format!("test error").into())
Ok(()) Ok(())
}).unwrap(); })
.unwrap();
} }
+2 -3
View File
@@ -1,4 +1,4 @@
use clap::{Clap, crate_version}; use clap::{crate_version, Clap};
#[derive(Debug, Clap)] #[derive(Debug, Clap)]
#[clap(name="retrieval", author="Dominik Werder <dominik.werder@gmail.com>", version=crate_version!())] #[clap(name="retrieval", author="Dominik Werder <dominik.werder@gmail.com>", version=crate_version!())]
@@ -15,5 +15,4 @@ pub enum SubCmd {
} }
#[derive(Debug, Clap)] #[derive(Debug, Clap)]
pub struct Retrieval { pub struct Retrieval {}
}
+24 -21
View File
@@ -1,10 +1,10 @@
#[allow(unused_imports)]
use tracing::{error, warn, info, debug, trace};
use err::Error; use err::Error;
use tokio::task::JoinHandle;
use netpod::{Node, Cluster, NodeConfig};
use hyper::Body; use hyper::Body;
use netpod::{Cluster, Node, NodeConfig};
use std::sync::Arc; use std::sync::Arc;
use tokio::task::JoinHandle;
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
pub mod cli; pub mod cli;
@@ -47,28 +47,31 @@ async fn get_cached_0_inner() -> Result<(), Error> {
let t2 = chrono::Utc::now(); let t2 = chrono::Utc::now();
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
let throughput = ntot / 1024 * 1000 / ms; let throughput = ntot / 1024 * 1000 / ms;
info!("get_cached_0 DONE total download {} MB throughput {:5} kB/s", ntot / 1024 / 1024, throughput); info!(
"get_cached_0 DONE total download {} MB throughput {:5} kB/s",
ntot / 1024 / 1024,
throughput
);
//Err::<(), _>(format!("test error").into()) //Err::<(), _>(format!("test error").into())
Ok(()) Ok(())
} }
fn test_cluster() -> Cluster { fn test_cluster() -> Cluster {
let nodes = (0..1).into_iter().map(|id| { let nodes = (0..1)
let node = Node { .into_iter()
id, .map(|id| {
host: "localhost".into(), let node = Node {
port: 8360 + id as u16, id,
data_base_path: format!("../tmpdata/node{:02}", id).into(), host: "localhost".into(),
ksprefix: "ks".into(), port: 8360 + id as u16,
split: 0, data_base_path: format!("../tmpdata/node{:02}", id).into(),
}; ksprefix: "ks".into(),
Arc::new(node) split: 0,
}) };
.collect(); Arc::new(node)
Cluster { })
nodes: nodes, .collect();
} Cluster { nodes: nodes }
} }
fn spawn_test_hosts(cluster: Arc<Cluster>) -> Vec<JoinHandle<Result<(), Error>>> { fn spawn_test_hosts(cluster: Arc<Cluster>) -> Vec<JoinHandle<Result<(), Error>>> {
+3 -3
View File
@@ -1,3 +1,3 @@
unstable_features = true #unstable_features = true
empty_item_single_line = false #empty_item_single_line = false
control_brace_style = "ClosingNextLine" #control_brace_style = "ClosingNextLine"
+17 -12
View File
@@ -1,11 +1,11 @@
#[allow(unused_imports)]
use tracing::{error, warn, info, debug, trace};
use err::Error; use err::Error;
use std::future::Future;
use std::panic; use std::panic;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use std::future::Future; #[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
pub fn run<T, F: std::future::Future<Output=Result<T, Error>>>(f: F) -> Result<T, Error> { pub fn run<T, F: std::future::Future<Output = Result<T, Error>>>(f: F) -> Result<T, Error> {
tracing_init(); tracing_init();
tokio::runtime::Builder::new_multi_thread() tokio::runtime::Builder::new_multi_thread()
.worker_threads(12) .worker_threads(12)
@@ -27,15 +27,20 @@ pub fn run<T, F: std::future::Future<Output=Result<T, Error>>>(f: F) -> Result<T
pub fn tracing_init() { pub fn tracing_init() {
tracing_subscriber::fmt() tracing_subscriber::fmt()
//.with_timer(tracing_subscriber::fmt::time::uptime()) //.with_timer(tracing_subscriber::fmt::time::uptime())
.with_target(true) .with_target(true)
.with_thread_names(true) .with_thread_names(true)
//.with_max_level(tracing::Level::INFO) //.with_max_level(tracing::Level::INFO)
.with_env_filter(tracing_subscriber::EnvFilter::new("info,retrieval=trace,disk=trace,tokio_postgres=info")) .with_env_filter(tracing_subscriber::EnvFilter::new(
.init(); "info,retrieval=trace,disk=trace,tokio_postgres=info",
))
.init();
} }
pub fn spawn<T>(task: T) -> JoinHandle<T::Output>
pub fn spawn<T>(task: T) -> JoinHandle<T::Output> where T: Future + Send + 'static, T::Output: Send + 'static { where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
tokio::spawn(task) tokio::spawn(task)
} }