diff --git a/bitshuffle/src/lib.rs b/bitshuffle/src/lib.rs
index 776df72..05edd6d 100644
--- a/bitshuffle/src/lib.rs
+++ b/bitshuffle/src/lib.rs
@@ -1,29 +1,51 @@
-use libc::{size_t};
+use libc::size_t;
 
-extern {
-    pub fn bshuf_compress_lz4(inp: *const u8, out: *const u8, size: size_t, elem_size: size_t, block_size: size_t) -> i64;
-    pub fn bshuf_decompress_lz4(inp: *const u8, out: *const u8, size: size_t, elem_size: size_t, block_size: size_t) -> i64;
+extern "C" {
+    pub fn bshuf_compress_lz4(
+        inp: *const u8,
+        out: *const u8,
+        size: size_t,
+        elem_size: size_t,
+        block_size: size_t,
+    ) -> i64;
+    pub fn bshuf_decompress_lz4(
+        inp: *const u8,
+        out: *const u8,
+        size: size_t,
+        elem_size: size_t,
+        block_size: size_t,
+    ) -> i64;
 }
 
-pub fn bitshuffle_compress(inp: &[u8], out: &mut [u8], size: usize, elem_size: usize, block_size: usize) -> Result<usize, isize> {
+pub fn bitshuffle_compress(
+    inp: &[u8],
+    out: &mut [u8],
+    size: usize,
+    elem_size: usize,
+    block_size: usize,
+) -> Result<usize, isize> {
     unsafe {
         let n = bshuf_compress_lz4(inp.as_ptr(), out.as_mut_ptr(), size, elem_size, block_size);
         if n >= 0 {
             Ok(n as usize)
-        }
-        else {
+        } else {
             Err(n as isize)
         }
     }
 }
 
-pub fn bitshuffle_decompress(inp: &[u8], out: &mut [u8], size: usize, elem_size: usize, block_size: usize) -> Result<usize, isize> {
+pub fn bitshuffle_decompress(
+    inp: &[u8],
+    out: &mut [u8],
+    size: usize,
+    elem_size: usize,
+    block_size: usize,
+) -> Result<usize, isize> {
     unsafe {
         let n = bshuf_decompress_lz4(inp.as_ptr(), out.as_mut_ptr(), size, elem_size, block_size);
         if n >= 0 {
             Ok(n as usize)
-        }
-        else {
+        } else {
             Err(n as isize)
         }
    }
}
diff --git a/disk/src/agg.rs b/disk/src/agg.rs
index ad54494..8a41c9f 100644
--- a/disk/src/agg.rs
+++ b/disk/src/agg.rs
@@ -1,15 +1,15 @@
-#[allow(unused_imports)]
-use tracing::{error, warn, info, debug, trace};
-use err::Error;
-use std::task::{Context, Poll};
-use std::pin::Pin;
-use crate::EventFull;
-use futures_core::Stream;
-use futures_util::{pin_mut, StreamExt, future::ready};
-use netpod::{Channel, ChannelConfig, ScalarType, Shape, Node, timeunits::*};
 use crate::merge::MergeDim1F32Stream;
+use crate::EventFull;
+use err::Error;
+use futures_core::Stream;
+use futures_util::{future::ready, pin_mut, StreamExt};
 use netpod::BinSpecDimT;
+use netpod::{timeunits::*, Channel, ChannelConfig, Node, ScalarType, Shape};
+use std::pin::Pin;
 use std::sync::Arc;
+use std::task::{Context, Poll};
+#[allow(unused_imports)]
+use tracing::{debug, error, info, trace, warn};
 
 pub trait AggregatorTdim {
     type InputValue;
@@ -32,11 +32,12 @@ pub trait AggregatableTdim {
     fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator;
 }
 
-
 // dummy
 impl AggregatableXdim1Bin for () {
     type Output = ();
-    fn into_agg(self) -> Self::Output { todo!() }
+    fn into_agg(self) -> Self::Output {
+        todo!()
+    }
 }
 impl AggregatableTdim for () {
     type Output = ();
@@ -61,11 +62,14 @@ impl AggregatorTdim for () {
         todo!()
     }
 
-    fn ingest(&mut self, v: &Self::InputValue) { todo!() }
-    fn result(self) -> Self::OutputValue { todo!() }
+    fn ingest(&mut self, v: &Self::InputValue) {
+        todo!()
+    }
+    fn result(self) -> Self::OutputValue {
+        todo!()
+    }
 }
 
-
 pub struct ValuesDim0 {
     tss: Vec<u64>,
     values: Vec<Vec<f32>>,
 }
 
 impl std::fmt::Debug for ValuesDim0 {
     fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
-        write!(fmt, "count {}  tsA {:?}  tsB {:?}", self.tss.len(), self.tss.first(), self.tss.last())
+        write!(
+            fmt,
+            "count {}  tsA {:?}  tsB {:?}",
+            self.tss.len(),
+            self.tss.first(),
+            self.tss.last()
+        )
     }
 }
 
@@ -101,8 +111,12 @@ impl AggregatableXdim1Bin for ValuesDim1 {
             max = max.max(v);
             sum += v;
         }
-        if min == f32::MAX { min = f32::NAN; }
-        if max == f32::MIN { max = f32::NAN; }
+        if min == f32::MAX {
+            min = f32::NAN;
+        }
+        if max == f32::MIN {
+            max = f32::NAN;
+        }
         ret.tss.push(ts);
         ret.mins.push(min);
         ret.maxs.push(max);
@@ -110,29 +124,31 @@ impl AggregatableXdim1Bin for ValuesDim1 {
         }
         ret
     }
-
 }
-
 pub struct ValuesDim1 {
     pub tss: Vec<u64>,
     pub values: Vec<Vec<f32>>,
 }
 
 impl ValuesDim1 {
-
     pub fn empty() -> Self {
         Self {
             tss: vec![],
             values: vec![],
         }
     }
-
 }
 
 impl std::fmt::Debug for ValuesDim1 {
     fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
-        write!(fmt, "count {}  tsA {:?}  tsB {:?}", self.tss.len(), self.tss.first(), self.tss.last())
+        write!(
+            fmt,
+            "count {}  tsA {:?}  tsB {:?}",
+            self.tss.len(),
+            self.tss.first(),
+            self.tss.last()
+        )
     }
 }
 
@@ -160,8 +176,12 @@ impl AggregatableXdim1Bin for ValuesDim0 {
             max = max.max(v);
             sum += v;
         }
-        if min == f32::MAX { min = f32::NAN; }
-        if max == f32::MIN { max = f32::NAN; }
+        if min == f32::MAX {
+            min = f32::NAN;
+        }
+        if max == f32::MIN {
+            max = f32::NAN;
+        }
         ret.tss.push(ts);
         ret.mins.push(min);
         ret.maxs.push(max);
@@ -169,10 +189,8 @@ impl AggregatableXdim1Bin for ValuesDim0 {
         }
         ret
     }
-
 }
-
 pub struct MinMaxAvgScalarEventBatch {
     tss: Vec<u64>,
     mins: Vec<f32>,
@@ -200,7 +218,6 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatch {
     fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator {
         MinMaxAvgScalarEventBatchAggregator::new(ts1, ts2)
     }
-
 }
 
 pub struct MinMaxAvgScalarEventBatchAggregator {
@@ -213,7 +230,6 @@ pub struct MinMaxAvgScalarEventBatchAggregator {
 }
 
 impl MinMaxAvgScalarEventBatchAggregator {
-
     pub fn new(ts1: u64, ts2: u64) -> Self {
         Self {
             ts1,
@@ -224,7 +240,6 @@ impl MinMaxAvgScalarEventBatchAggregator {
             count: 0,
         }
     }
-
 }
 
 impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
@@ -233,28 +248,22 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
 
     fn ends_before(&self, inp: &Self::InputValue) -> bool {
         match inp.tss.last() {
-            Some(ts) => {
-                *ts < self.ts1
-            }
+            Some(ts) => *ts < self.ts1,
             None => true,
         }
     }
 
     fn ends_after(&self, inp: &Self::InputValue) -> bool {
         match inp.tss.last() {
-            Some(ts) => {
-                *ts >= self.ts2
-            }
-            _ => panic!()
+            Some(ts) => *ts >= self.ts2,
+            _ => panic!(),
         }
     }
 
     fn starts_after(&self, inp: &Self::InputValue) -> bool {
         match inp.tss.first() {
-            Some(ts) => {
-                *ts >= self.ts2
-            }
-            _ => panic!()
+            Some(ts) => *ts >= self.ts2,
+            _ => panic!(),
         }
     }
 
@@ -264,12 +273,10 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
             if ts < self.ts1 {
                 //info!("EventBatchAgg  {} {} {} {}  IS BEFORE", v.tss[i1], v.mins[i1], v.maxs[i1], v.avgs[i1]);
                 continue;
-            }
-            else if ts >= self.ts2 {
+            } else if ts >= self.ts2 {
                 //info!("EventBatchAgg  {} {} {} {}  IS AFTER", v.tss[i1], v.mins[i1], v.maxs[i1], v.avgs[i1]);
                 continue;
-            }
-            else {
+            } else {
                 //info!("EventBatchAgg  {} {} {} {}", v.tss[i1], v.mins[i1], v.maxs[i1], v.avgs[i1]);
                 self.min = self.min.min(v.mins[i1]);
                 self.max = self.max.max(v.maxs[i1]);
@@ -280,9 +287,21 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
     }
 
     fn result(self) -> Self::OutputValue {
-        let min = if self.min == f32::MAX { f32::NAN } else { self.min };
-        let max = if self.max == f32::MIN { f32::NAN } else { self.max };
-        let avg = if self.count == 0 { f32::NAN } else { self.sum / self.count as f32 };
+        let min = if self.min == f32::MAX {
+            f32::NAN
+        } else {
+            self.min
+        };
+        let max = if self.max == f32::MIN {
+            f32::NAN
+        } else {
+            self.max
+        };
+        let avg = if self.count == 0 {
+            f32::NAN
+        } else {
+            self.sum / self.count as f32
+        };
         MinMaxAvgScalarBinSingle {
             ts1: self.ts1,
             ts2: self.ts2,
@@ -292,10 +311,8 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
             avg,
         }
     }
-
 }
-
 pub struct MinMaxAvgScalarBinBatch {
     ts1s: Vec<u64>,
     ts2s: Vec<u64>,
@@ -348,11 +365,11 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
         todo!()
     }
 
-    fn result(self) -> Self::OutputValue { todo!() }
-
+    fn result(self) -> Self::OutputValue {
+        todo!()
+    }
 }
-
 pub struct MinMaxAvgScalarBinSingle {
     ts1: u64,
     ts2: u64,
@@ -408,17 +425,19 @@ impl AggregatorTdim for MinMaxAvgScalarBinSingleAggregator {
     fn result(self) -> Self::OutputValue {
         todo!()
     }
-
 }
-
-
-
-pub struct Dim0F32Stream<S> where S: Stream<Item = Result<EventFull, Error>> {
+pub struct Dim0F32Stream<S>
+where
+    S: Stream<Item = Result<EventFull, Error>>,
+{
     inp: S,
 }
 
-impl<S> Stream for Dim0F32Stream<S> where S: Stream<Item = Result<EventFull, Error>> + Unpin {
+impl<S> Stream for Dim0F32Stream<S>
+where
+    S: Stream<Item = Result<EventFull, Error>> + Unpin,
+{
     type Item = Result<ValuesDim0, Error>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
@@ -447,7 +466,9 @@ impl<S> Stream for Dim0F32Stream<S> where S: Stream<Item = R
                                 todo!()
+                            }
+                            _ => todo!(),
                         }
                     }
                     Ready(Some(Ok(todo!())))
@@ -472,31 +493,34 @@ impl<S> Stream for Dim0F32Stream<S> where S: Stream<Item = R
             Pending => Pending,
         }
     }
-
 }
 
 pub trait IntoDim0F32Stream {
-    fn into_dim_0_f32_stream(self) -> Dim0F32Stream<Self> where Self: Stream<Item = Result<EventFull, Error>> + Sized;
+    fn into_dim_0_f32_stream(self) -> Dim0F32Stream<Self>
+    where
+        Self: Stream<Item = Result<EventFull, Error>> + Sized;
 }
 
-impl<T> IntoDim0F32Stream for T where T: Stream<Item = Result<EventFull, Error>> {
-
+impl<T> IntoDim0F32Stream for T
+where
+    T: Stream<Item = Result<EventFull, Error>>,
+{
     fn into_dim_0_f32_stream(self) -> Dim0F32Stream<T> {
-        Dim0F32Stream {
-            inp: self,
-        }
+        Dim0F32Stream { inp: self }
     }
-
 }
-
-
-
-pub struct Dim1F32Stream<S> where S: Stream<Item = Result<EventFull, Error>> {
+pub struct Dim1F32Stream<S>
+where
+    S: Stream<Item = Result<EventFull, Error>>,
+{
     inp: S,
 }
 
-impl<S> Stream for Dim1F32Stream<S> where S: Stream<Item = Result<EventFull, Error>> + Unpin {
+impl<S> Stream for Dim1F32Stream<S>
+where
+    S: Stream<Item = Result<EventFull, Error>> + Unpin,
+{
     type Item = Result<ValuesDim1, Error>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
@@ -521,7 +545,9 @@ impl<S> Stream for Dim1F32Stream<S> where S: Stream<Item = R
                                 todo!()
+                            }
+                            _ => todo!(),
                         }
                     }
                     Ready(Some(Ok(ret)))
@@ -546,45 +572,54 @@ impl<S> Stream for Dim1F32Stream<S> where S: Stream<Item = R
             Pending => Pending,
         }
     }
-
 }
 
 pub trait IntoDim1F32Stream {
-    fn into_dim_1_f32_stream(self) -> Dim1F32Stream<Self> where Self: Stream<Item = Result<EventFull, Error>> + Sized;
+    fn into_dim_1_f32_stream(self) -> Dim1F32Stream<Self>
+    where
+        Self: Stream<Item = Result<EventFull, Error>> + Sized;
 }
 
-impl<T> IntoDim1F32Stream for T where T: Stream<Item = Result<EventFull, Error>> {
-
+impl<T> IntoDim1F32Stream for T
+where
+    T: Stream<Item = Result<EventFull, Error>>,
+{
     fn into_dim_1_f32_stream(self) -> Dim1F32Stream<T> {
-        Dim1F32Stream {
-            inp: self,
-        }
+        Dim1F32Stream { inp: self }
     }
-
 }
-
 pub trait IntoBinnedXBins1<I: AggregatableXdim1Bin> {
     type StreamOut;
-    fn into_binned_x_bins_1(self) -> Self::StreamOut where Self: Stream<Item = Result<I, Error>>;
+    fn into_binned_x_bins_1(self) -> Self::StreamOut
+    where
+        Self: Stream<Item = Result<I, Error>>;
 }
 
-impl<T, I: AggregatableXdim1Bin> IntoBinnedXBins1<I> for T where T: Stream<Item = Result<I, Error>> + Unpin {
+impl<T, I: AggregatableXdim1Bin> IntoBinnedXBins1<I> for T
+where
+    T: Stream<Item = Result<I, Error>> + Unpin,
+{
     type StreamOut = IntoBinnedXBins1DefaultStream<T, I>;
 
     fn into_binned_x_bins_1(self) -> Self::StreamOut {
-        IntoBinnedXBins1DefaultStream {
-            inp: self,
-        }
+        IntoBinnedXBins1DefaultStream { inp: self }
     }
-
 }
 
-pub struct IntoBinnedXBins1DefaultStream<S, I> where S: Stream<Item = Result<I, Error>> + Unpin, I: AggregatableXdim1Bin {
+pub struct IntoBinnedXBins1DefaultStream<S, I>
+where
+    S: Stream<Item = Result<I, Error>> + Unpin,
+    I: AggregatableXdim1Bin,
+{
     inp: S,
 }
 
-impl<S, I> Stream for IntoBinnedXBins1DefaultStream<S, I> where S: Stream<Item = Result<I, Error>> + Unpin, I: AggregatableXdim1Bin {
+impl<S, I> Stream for IntoBinnedXBins1DefaultStream<S, I>
+where
+    S: Stream<Item = Result<I, Error>> + Unpin,
+    I: AggregatableXdim1Bin,
+{
     type Item = Result<I::Output, Error>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
@@ -596,26 +631,31 @@ impl<S, I> Stream for IntoBinnedXBins1DefaultStream<S, I> where S: Stream<Item =
             Pending => Pending,
         }
     }
-
 }
-
-
 pub trait IntoBinnedT<I: AggregatableTdim> {
     type StreamOut: Stream;
     fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut;
 }
 
-impl<T, I> IntoBinnedT<I> for T where I: AggregatableTdim + Unpin, T: Stream<Item = Result<I, Error>> + Unpin, I::Aggregator: Unpin {
+impl<T, I> IntoBinnedT<I> for T
+where
+    I: AggregatableTdim + Unpin,
+    T: Stream<Item = Result<I, Error>> + Unpin,
+    I::Aggregator: Unpin,
+{
     type StreamOut = IntoBinnedTDefaultStream<T, I>;
 
     fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut {
         IntoBinnedTDefaultStream::new(self, spec)
     }
-
 }
 
-pub struct IntoBinnedTDefaultStream<S, I> where I: AggregatableTdim, S: Stream<Item = Result<I, Error>> {
+pub struct IntoBinnedTDefaultStream<S, I>
+where
+    I: AggregatableTdim,
+    S: Stream<Item = Result<I, Error>>,
+{
     inp: S,
     aggtor: Option<I::Aggregator>,
     spec: BinSpecDimT,
@@ -623,8 +663,11 @@ pub struct IntoBinnedTDefaultStream<S, I> where I: AggregatableTdim, S: Stream<I
     left: Option<Poll<Option<Result<I, Error>>>>,
 }
 
-impl<S, I> IntoBinnedTDefaultStream<S, I> where I: AggregatableTdim, S: Stream<Item = Result<I, Error>> {
-
+impl<S, I> IntoBinnedTDefaultStream<S, I>
+where
+    I: AggregatableTdim,
+    S: Stream<Item = Result<I, Error>>,
+{
     pub fn new(inp: S, spec: BinSpecDimT) -> Self {
         //info!("spec ts  {}  {}", spec.ts1, spec.ts2);
         Self {
@@ -635,11 +678,13 @@ impl<S, I> IntoBinnedTDefaultStream<S, I> where I: AggregatableTdim, S: Stream<I
         }
     }
-
 }
 
 impl<T, I> Stream for IntoBinnedTDefaultStream<T, I>
-where I: AggregatableTdim + Unpin, T: Stream<Item = Result<I, Error>> + Unpin, I::Aggregator: Unpin
+where
+    I: AggregatableTdim + Unpin,
+    T: Stream<Item = Result<I, Error>> + Unpin,
+    I::Aggregator: Unpin,
 {
     type Item = Result<<I::Aggregator as AggregatorTdim>::OutputValue, Error>;
 
@@ -648,11 +693,9 @@ where I: AggregatableTdim + Unpin, T: Stream<Item = Result<I, Error>> + Unpin, I::
         'outer: loop {
             let cur = if self.curbin as u64 >= self.spec.count {
                 Ready(None)
-            }
-            else if let Some(k) = self.left.take() {
+            } else if let Some(k) = self.left.take() {
                 k
-            }
-            else {
+            } else {
                 self.inp.poll_next_unpin(cx)
             };
             break match cur {
@@ -666,14 +709,12 @@ where I: AggregatableTdim + Unpin, T: Stream<Item = Result<I, Error>> + Unpin, I::
                     if ag.ends_before(&k) {
                         //info!("ENDS BEFORE");
                         continue 'outer;
-                    }
-                    else if ag.starts_after(&k) {
+                    } else if ag.starts_after(&k) {
                         //info!("STARTS AFTER");
                         self.left = Some(Ready(Some(Ok(k))));
                         self.curbin += 1;
                         Ready(Some(Ok(self.aggtor.take().unwrap().result())))
-                    }
-                    else {
+                    } else {
                         //info!("INGEST");
                         ag.ingest(&k);
                         // if this input contains also data after the current bin, then I need to keep
@@ -683,30 +724,24 @@ where I: AggregatableTdim + Unpin, T: Stream<Item = Result<I, Error>> + Unpin, I::
                             self.left = Some(Ready(Some(Ok(k))));
                             self.curbin += 1;
                             Ready(Some(Ok(self.aggtor.take().unwrap().result())))
-                        }
-                        else {
+                        } else {
                             //info!("ENDS WITHIN");
                             continue 'outer;
                         }
                     }
                 }
                 Ready(Some(Err(e))) => Ready(Some(Err(e))),
-                Ready(None) => {
-                    match self.aggtor.take() {
-                        Some(ag) => {
-                            Ready(Some(Ok(ag.result())))
-                        }
-                        None => {
-                            warn!("TODO add trailing bins");
-                            Ready(None)
-                        }
+                Ready(None) => match self.aggtor.take() {
+                    Some(ag) => Ready(Some(Ok(ag.result()))),
+                    None => {
+                        warn!("TODO add trailing bins");
+                        Ready(None)
                     }
                 },
                 Pending => Pending,
             };
         }
     }
-
 }
 
 pub fn make_test_node(id: u32) -> Node {
     Node {
@@ -719,10 +754,13 @@ pub fn make_test_node(id: u32) -> Node {
     }
 }
 
-
 #[test]
 fn agg_x_dim_0() {
-    taskrun::run(async { agg_x_dim_0_inner().await; Ok(()) }).unwrap();
+    taskrun::run(async {
+        agg_x_dim_0_inner().await;
+        Ok(())
+    })
+    .unwrap();
 }
 
 async fn agg_x_dim_0_inner() {
@@ -750,32 +788,35 @@ async fn agg_x_dim_0_inner() {
     let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
     let ts2 = ts1 + HOUR * 24;
     let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node)
-    .into_dim_1_f32_stream()
-    //.take(1000)
-    .map(|q| {
-        if let Ok(ref k) = q {
-            //info!("vals: {:?}", k);
-        }
-        q
-    })
-    .into_binned_x_bins_1()
-    .map(|k| {
-        //info!("after X binning {:?}", k.as_ref().unwrap());
-        k
-    })
-    .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
-    .map(|k| {
-        info!("after T binning {:?}", k.as_ref().unwrap());
-        k
-    })
-    .for_each(|k| ready(()));
+        .into_dim_1_f32_stream()
+        //.take(1000)
+        .map(|q| {
+            if let Ok(ref k) = q {
+                //info!("vals: {:?}", k);
+            }
+            q
+        })
+        .into_binned_x_bins_1()
+        .map(|k| {
+            //info!("after X binning {:?}", k.as_ref().unwrap());
+            k
+        })
+        .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
+        .map(|k| {
+            info!("after T binning {:?}", k.as_ref().unwrap());
+            k
+        })
+        .for_each(|k| ready(()));
     fut1.await;
 }
 
-
 #[test]
 fn agg_x_dim_1() {
-    taskrun::run(async { agg_x_dim_1_inner().await; Ok(()) }).unwrap();
+    taskrun::run(async {
+        agg_x_dim_1_inner().await;
+        Ok(())
+    })
+    .unwrap();
 }
 
 async fn agg_x_dim_1_inner() {
@@ -806,31 +847,35 @@ async fn agg_x_dim_1_inner() {
     let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
     let ts2 = ts1 + HOUR * 24;
     let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node)
-    .into_dim_1_f32_stream()
-    //.take(1000)
-    .map(|q| {
-        if let Ok(ref k) = q {
-            //info!("vals: {:?}", k);
-        }
-        q
-    })
-    .into_binned_x_bins_1()
-    .map(|k| {
-        //info!("after X binning {:?}", k.as_ref().unwrap());
-        k
-    })
-    .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
-    .map(|k| {
-        info!("after T binning {:?}", k.as_ref().unwrap());
-        k
-    })
-    .for_each(|k| ready(()));
+        .into_dim_1_f32_stream()
+        //.take(1000)
+        .map(|q| {
+            if let Ok(ref k) = q {
+                //info!("vals: {:?}", k);
+            }
+            q
+        })
+        .into_binned_x_bins_1()
+        .map(|k| {
+            //info!("after X binning {:?}", k.as_ref().unwrap());
+            k
+        })
+        .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
+        .map(|k| {
+            info!("after T binning {:?}", k.as_ref().unwrap());
+            k
+        })
+        .for_each(|k| ready(()));
     fut1.await;
 }
 
 #[test]
 fn merge_0() {
-    taskrun::run(async { merge_0_inner().await; Ok(()) }).unwrap();
+    taskrun::run(async {
+        merge_0_inner().await;
+        Ok(())
+    })
+    .unwrap();
 }
 
 async fn merge_0_inner() {
@@ -852,26 +897,23 @@ async fn merge_0_inner() {
         tb_file_count: 1,
         buffer_size: 1024 * 8,
     };
-    let streams = (0..13).into_iter()
-    .map(|k| {
-        make_test_node(k)
-    })
-    .map(|node| {
-        let node = Arc::new(node);
-        crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node)
-        .into_dim_1_f32_stream()
-    })
-    .collect();
+    let streams = (0..13)
+        .into_iter()
+        .map(|k| make_test_node(k))
+        .map(|node| {
+            let node = Arc::new(node);
+            crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node)
+                .into_dim_1_f32_stream()
+        })
+        .collect();
     MergeDim1F32Stream::new(streams)
-    .map(|k| {
-        //info!("NEXT MERGED ITEM  ts {:?}", k.as_ref().unwrap().tss);
-    })
-    .fold(0, |k, q| ready(0))
-    .await;
+        .map(|k| {
+            //info!("NEXT MERGED ITEM  ts {:?}", k.as_ref().unwrap().tss);
+        })
+        .fold(0, |k, q| ready(0))
+        .await;
 }
 
-
 pub fn tmp_some_older_things() {
     let vals = ValuesDim1 {
         tss: vec![0, 1, 2, 3],
diff --git a/disk/src/cache.rs b/disk/src/cache.rs
index 1014960..d52d1c3 100644
--- a/disk/src/cache.rs
+++ b/disk/src/cache.rs
@@ -1,20 +1,23 @@
-#[allow(unused_imports)]
-use tracing::{error, warn, info, debug, trace};
+use crate::agg::MinMaxAvgScalarBinBatch;
+use bytes::{BufMut, Bytes, BytesMut};
+use chrono::{DateTime, Utc};
 use err::Error;
+use futures_core::Stream;
+use futures_util::{pin_mut, FutureExt, StreamExt, TryFutureExt};
+use http::uri::Scheme;
+use netpod::{
+    AggKind, Channel, Cluster, NanoRange, Node, NodeConfig, PreBinnedPatchCoord,
+    PreBinnedPatchGridSpec, PreBinnedPatchIterator, PreBinnedPatchRange, ToNanos,
+};
+use serde::{Deserialize, Serialize};
 use std::future::{ready, Future};
 use std::pin::Pin;
-use std::task::{Context, Poll};
-use futures_core::Stream;
-use futures_util::{StreamExt, FutureExt, pin_mut, TryFutureExt};
-use bytes::{Bytes, BytesMut, BufMut};
-use chrono::{DateTime, Utc};
-use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos, PreBinnedPatchRange, PreBinnedPatchIterator, PreBinnedPatchCoord, Channel, NodeConfig, PreBinnedPatchGridSpec};
-use crate::agg::MinMaxAvgScalarBinBatch;
-use http::uri::Scheme;
-use tiny_keccak::Hasher;
-use serde::{Serialize, Deserialize};
 use std::sync::Arc;
+use std::task::{Context, Poll};
+use tiny_keccak::Hasher;
 use tokio::fs::OpenOptions;
+#[allow(unused_imports)]
+use tracing::{debug, error, info, trace, warn};
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct Query {
@@ -25,17 +28,24 @@ pub struct Query {
 }
 
 impl Query {
-
     pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
         let params = netpod::query_params(req.uri.query());
-        let beg_date = params.get("beg_date").ok_or(Error::with_msg("missing beg_date"))?;
-        let end_date = params.get("end_date").ok_or(Error::with_msg("missing end_date"))?;
+        let beg_date = params
+            .get("beg_date")
+            .ok_or(Error::with_msg("missing beg_date"))?;
+        let end_date = params
+            .get("end_date")
+            .ok_or(Error::with_msg("missing end_date"))?;
         let ret = Query {
             range: NanoRange {
                 beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
                 end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
             },
-            count: params.get("bin_count").ok_or(Error::with_msg("missing beg_date"))?.parse().unwrap(),
+            count: params
+                .get("bin_count")
+                .ok_or(Error::with_msg("missing beg_date"))?
+                .parse()
+                .unwrap(),
             agg_kind: AggKind::DimXBins1,
             channel: Channel {
                 backend: params.get("channel_backend").unwrap().into(),
@@ -45,11 +55,12 @@ impl Query {
         info!("Query::from_request  {:?}", ret);
         Ok(ret)
     }
-
 }
-
-
-pub fn binned_bytes_for_http(node_config: Arc<NodeConfig>, query: &Query) -> Result<BinnedBytesForHttpStream, Error> {
+pub fn binned_bytes_for_http(
+    node_config: Arc<NodeConfig>,
+    query: &Query,
+) -> Result<BinnedBytesForHttpStream, Error> {
     let agg_kind = AggKind::DimXBins1;
 
     // TODO
@@ -59,15 +70,18 @@ pub fn binned_bytes_for_http(node_config: Arc<NodeConfig>, query: &Query) -> Res
         Some(spec) => {
             info!("GOT  PreBinnedPatchGridSpec:  {:?}", spec);
             warn!("Pass here to BinnedStream what kind of Agg, range, ...");
-            let s1 = BinnedStream::new(PreBinnedPatchIterator::from_range(spec), query.channel.clone(), agg_kind, node_config.clone());
+            let s1 = BinnedStream::new(
+                PreBinnedPatchIterator::from_range(spec),
+                query.channel.clone(),
+                agg_kind,
+                node_config.clone(),
+            );
             // Iterate over the patches.
             // Request the patch from each node.
             // Merge.
             // Agg+Bin.
             // Deliver.
-            let ret = BinnedBytesForHttpStream {
-                inp: s1,
-            };
+            let ret = BinnedBytesForHttpStream { inp: s1 };
             Ok(ret)
         }
         None => {
@@ -78,13 +92,11 @@ pub fn binned_bytes_for_http(node_config: Arc<NodeConfig>, query: &Query) -> Res
     }
 }
 
-
 pub struct BinnedBytesForHttpStream {
     inp: BinnedStream,
 }
 
-impl BinnedBytesForHttpStream {
-}
+impl BinnedBytesForHttpStream {}
 
 impl Stream for BinnedBytesForHttpStream {
     type Item = Result<Bytes, Error>;
@@ -102,13 +114,8 @@ impl Stream for BinnedBytesForHttpStream {
             Pending => Pending,
         }
     }
-
 }
-
-
-
-
 #[derive(Clone, Debug)]
 pub struct PreBinnedQuery {
     patch: PreBinnedPatchCoord,
@@ -117,7 +124,6 @@ pub struct PreBinnedQuery {
 }
 
 impl PreBinnedQuery {
-
     pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
         let params = netpod::query_params(req.uri.query());
         let ret = PreBinnedQuery {
@@ -131,34 +137,44 @@ impl PreBinnedQuery {
         info!("PreBinnedQuery::from_request  {:?}", ret);
         Ok(ret)
     }
-
 }
-
-
 // NOTE  This answers a request for a single valid pre-binned patch.
 // A user must first make sure that the grid spec is valid, and that this node is responsible for it.
 // Otherwise it is an error.
-pub fn pre_binned_bytes_for_http(node_config: Arc<NodeConfig>, query: &PreBinnedQuery) -> Result<PreBinnedValueByteStream, Error> {
-    info!("pre_binned_bytes_for_http  {:?}  {:?}", query, node_config.node);
-    let ret = PreBinnedValueByteStream::new(query.patch.clone(), query.channel.clone(), query.agg_kind.clone(), node_config);
+pub fn pre_binned_bytes_for_http(
+    node_config: Arc<NodeConfig>,
+    query: &PreBinnedQuery,
+) -> Result<PreBinnedValueByteStream, Error> {
+    info!(
+        "pre_binned_bytes_for_http  {:?}  {:?}",
+        query, node_config.node
+    );
+    let ret = PreBinnedValueByteStream::new(
+        query.patch.clone(),
+        query.channel.clone(),
+        query.agg_kind.clone(),
+        node_config,
+    );
     Ok(ret)
 }
 
-
 pub struct PreBinnedValueByteStream {
     inp: PreBinnedValueStream,
 }
 
 impl PreBinnedValueByteStream {
-
-    pub fn new(patch: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, node_config: Arc<NodeConfig>) -> Self {
+    pub fn new(
+        patch: PreBinnedPatchCoord,
+        channel: Channel,
+        agg_kind: AggKind,
+        node_config: Arc<NodeConfig>,
+    ) -> Self {
         warn!("PreBinnedValueByteStream");
         Self {
             inp: PreBinnedValueStream::new(patch, channel, agg_kind, node_config),
         }
     }
-
 }
 
 impl Stream for PreBinnedValueByteStream {
@@ -177,25 +193,25 @@ impl Stream for PreBinnedValueByteStream {
             Pending => Pending,
         }
     }
-
 }
-
-
-
-
 pub struct PreBinnedValueStream {
     patch_coord: PreBinnedPatchCoord,
     channel: Channel,
     agg_kind: AggKind,
     node_config: Arc<NodeConfig>,
-    open_check_local_file: Option<Pin<Box<dyn Future<Output = Result<tokio::fs::File, std::io::Error>> + Send>>>,
-    fut2: Option<Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatch, Error>> + Send>>>,
+    open_check_local_file:
+        Option<Pin<Box<dyn Future<Output = Result<tokio::fs::File, std::io::Error>> + Send>>>,
+    fut2: Option<Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatch, Error>> + Send>>>,
 }
 
 impl PreBinnedValueStream {
-
-    pub fn new(patch_coord: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, node_config: Arc<NodeConfig>) -> Self {
+    pub fn new(
+        patch_coord: PreBinnedPatchCoord,
+        channel: Channel,
+        agg_kind: AggKind,
+        node_config: Arc<NodeConfig>,
+    ) -> Self {
         let node_ix = node_ix_for_patch(&patch_coord, &channel, &node_config.cluster);
         assert!(node_ix == node_config.node.id);
         Self {
@@ -209,7 +225,10 @@ impl PreBinnedValueStream {
     }
 
     fn try_setup_fetch_prebinned_higher_res(&mut self) {
-        info!("try to find a next better granularity for {:?}", self.patch_coord);
+        info!(
+            "try to find a next better granularity for {:?}",
+            self.patch_coord
+        );
         let g = self.patch_coord.bin_t_len();
         let range = NanoRange {
             beg: self.patch_coord.patch_beg(),
@@ -218,7 +237,14 @@ impl PreBinnedValueStream {
         match PreBinnedPatchRange::covering_range(range, self.patch_coord.bin_count() + 1) {
             Some(range) => {
                 let h = range.grid_spec.bin_t_len();
-                info!("FOUND NEXT GRAN  g {}  h {}  ratio {}  mod {}  {:?}", g, h, g/h, g%h, range);
+                info!(
+                    "FOUND NEXT GRAN  g {}  h {}  ratio {}  mod {}  {:?}",
+                    g,
+                    h,
+                    g / h,
+                    g % h,
+                    range
+                );
                 assert!(g / h > 1);
                 assert!(g / h < 20);
                 assert!(g % h == 0);
@@ -228,14 +254,19 @@ impl PreBinnedValueStream {
                 let node_config = self.node_config.clone();
                 let mut patch_it = PreBinnedPatchIterator::from_range(range);
                 let s = futures_util::stream::iter(patch_it)
-                .map(move |coord| {
-                    PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), node_config.clone())
-                })
-                .flatten()
-                .map(move |k| {
-                    info!("ITEM from sub res  bin_size {}  {:?}", bin_size, k);
-                    k
-                });
+                    .map(move |coord| {
+                        PreBinnedValueFetchedStream::new(
+                            coord,
+                            channel.clone(),
+                            agg_kind.clone(),
+                            node_config.clone(),
+                        )
+                    })
+                    .flatten()
+                    .map(move |k| {
+                        info!("ITEM from sub res  bin_size {}  {:?}", bin_size, k);
+                        k
+                    });
                 self.fut2 = Some(Box::pin(s));
             }
             None => {
@@ -250,7 +281,6 @@ impl PreBinnedValueStream {
             }
         }
     }
-
 }
 
 impl Stream for PreBinnedValueStream {
@@ -262,48 +292,41 @@ impl Stream for PreBinnedValueStream {
         'outer: loop {
             break if let Some(fut) = self.fut2.as_mut() {
                 fut.poll_next_unpin(cx)
-            }
-            else if let Some(fut) = self.open_check_local_file.as_mut() {
+            } else if let Some(fut) = self.open_check_local_file.as_mut() {
                 match fut.poll_unpin(cx) {
                     Ready(Ok(file)) => {
                         todo!("IMPLEMENT READ FROM LOCAL CACHE");
                         Pending
                     }
-                    Ready(Err(e)) => {
-                        match e.kind() {
-                            std::io::ErrorKind::NotFound => {
-                                warn!("TODO LOCAL CACHE FILE NOT FOUND");
-                                self.try_setup_fetch_prebinned_higher_res();
-                                continue 'outer;
-                            }
-                            _ => {
-                                error!("File I/O error:  {:?}", e);
-                                Ready(Some(Err(e.into())))
-                            }
+                    Ready(Err(e)) => match e.kind() {
+                        std::io::ErrorKind::NotFound => {
+                            warn!("TODO LOCAL CACHE FILE NOT FOUND");
+                            self.try_setup_fetch_prebinned_higher_res();
+                            continue 'outer;
                         }
-                    }
+                        _ => {
+                            error!("File I/O error:  {:?}", e);
+                            Ready(Some(Err(e.into())))
+                        }
+                    },
                     Pending => Pending,
                 }
-            }
-            else {
+            } else {
                 use std::os::unix::fs::OpenOptionsExt;
                 let mut opts = std::fs::OpenOptions::new();
                 opts.read(true);
                 let fut = async {
                     tokio::fs::OpenOptions::from(opts)
-                    .open("/DOESNOTEXIST").await
+                        .open("/DOESNOTEXIST")
+                        .await
                 };
                 self.open_check_local_file = Some(Box::pin(fut));
                 continue 'outer;
-            }
+            };
         }
     }
-
 }
-
-
-
 pub struct PreBinnedValueFetchedStream {
     uri: http::Uri,
     patch_coord: PreBinnedPatchCoord,
@@ -312,8 +335,12 @@ pub struct PreBinnedValueFetchedStream {
 }
 
 impl PreBinnedValueFetchedStream {
-
-    pub fn new(patch_coord: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, node_config: Arc<NodeConfig>) -> Self {
+    pub fn new(
+        patch_coord: PreBinnedPatchCoord,
+        channel: Channel,
+        agg_kind: AggKind,
+        node_config: Arc<NodeConfig>,
+    ) -> Self {
         let nodeix = node_ix_for_patch(&patch_coord, &channel, &node_config.cluster);
         let node = &node_config.cluster.nodes[nodeix as usize];
         warn!("TODO  defining property of a PreBinnedPatchCoord?  patchlen + ix?  binsize + patchix?  binsize + patchsize + patchix?");
@@ -325,7 +352,9 @@ impl PreBinnedValueFetchedStream {
             channel.backend,
             channel.name,
             agg_kind,
-        ).parse().unwrap();
+        )
+        .parse()
+        .unwrap();
         Self {
             uri,
             patch_coord,
@@ -333,7 +362,6 @@ impl PreBinnedValueFetchedStream {
             res: None,
         }
     }
-
 }
 
 impl Stream for PreBinnedValueFetchedStream {
@@ -351,79 +379,72 @@ impl Stream for PreBinnedValueFetchedStream {
                     pin_mut!(res);
                     use hyper::body::HttpBody;
                     match res.poll_data(cx) {
-                        Ready(Some(Ok(k))) => {
-                            Pending
-                        }
+                        Ready(Some(Ok(k))) => Pending,
                         Ready(Some(Err(e))) => Ready(Some(Err(e.into()))),
                         Ready(None) => Ready(None),
                         Pending => Pending,
                     }
                 }
-                None => {
-                    match self.resfut.as_mut() {
-                        Some(mut resfut) => {
-                            match resfut.poll_unpin(cx) {
-                                Ready(res) => {
-                                    match res {
-                                        Ok(res) => {
-                                            info!("GOT result from SUB REQUEST: {:?}", res);
-                                            self.res = Some(res);
-                                            continue 'outer;
-                                        }
-                                        Err(e) => {
-                                            error!("PreBinnedValueStream  error in stream {:?}", e);
-                                            Ready(Some(Err(e.into())))
-                                        }
-                                    }
-                                }
-                                Pending => {
-                                    Pending
-                                }
+                None => match self.resfut.as_mut() {
+                    Some(mut resfut) => match resfut.poll_unpin(cx) {
+                        Ready(res) => match res {
+                            Ok(res) => {
+                                info!("GOT result from SUB REQUEST: {:?}", res);
+                                self.res = Some(res);
+                                continue 'outer;
                             }
-                        }
-                        None => {
-                            let req = hyper::Request::builder()
+                            Err(e) => {
+                                error!("PreBinnedValueStream  error in stream {:?}", e);
+                                Ready(Some(Err(e.into())))
+                            }
+                        },
+                        Pending => Pending,
+                    },
+                    None => {
+                        let req = hyper::Request::builder()
                             .method(http::Method::GET)
                             .uri(&self.uri)
                             .body(hyper::Body::empty())?;
-                            let client = hyper::Client::new();
-                            info!("START REQUEST FOR {:?}", req);
-                            self.resfut = Some(client.request(req));
-                            continue 'outer;
-                        }
+                        let client = hyper::Client::new();
+                        info!("START REQUEST FOR {:?}", req);
+                        self.resfut = Some(client.request(req));
+                        continue 'outer;
                     }
-                }
-            }
+                },
+            };
         }
     }
-
 }
-
-
 pub struct BinnedStream {
-    inp: Pin<Box<dyn Stream<Item=Result<MinMaxAvgScalarBinBatch, Error>> + Send>>,
+    inp: Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatch, Error>> + Send>>,
 }
 
 impl BinnedStream {
-
-    pub fn new(patch_it: PreBinnedPatchIterator, channel: Channel, agg_kind: AggKind, node_config: Arc<NodeConfig>) -> Self {
+    pub fn new(
+        patch_it: PreBinnedPatchIterator,
+        channel: Channel,
+        agg_kind: AggKind,
+        node_config: Arc<NodeConfig>,
+    ) -> Self {
         warn!("BinnedStream will open a PreBinnedValueStream");
         let mut patch_it = patch_it;
         let inp = futures_util::stream::iter(patch_it)
-        .map(move |coord| {
-            PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), node_config.clone())
-        })
-        .flatten()
-        .map(|k| {
-            info!("ITEM {:?}", k);
-            k
-        });
-        Self {
-            inp: Box::pin(inp),
-        }
+            .map(move |coord| {
+                PreBinnedValueFetchedStream::new(
+                    coord,
+                    channel.clone(),
+                    agg_kind.clone(),
+                    node_config.clone(),
+                )
+            })
+            .flatten()
+            .map(|k| {
+                info!("ITEM {:?}", k);
+                k
+            });
+        Self { inp: Box::pin(inp) }
     }
-
 }
 
 impl Stream for BinnedStream {
@@ -433,33 +454,28 @@ impl Stream for BinnedStream {
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         use Poll::*;
         match self.inp.poll_next_unpin(cx) {
-            Ready(Some(Ok(k))) => {
-                Ready(Some(Ok(k)))
-            }
+            Ready(Some(Ok(k))) => Ready(Some(Ok(k))),
             Ready(Some(Err(e))) => Ready(Some(Err(e))),
             Ready(None) => Ready(None),
             Pending => Pending,
         }
     }
-
 }
-
-
 pub struct SomeReturnThing {}
 
 impl From<SomeReturnThing> for Bytes {
-
     fn from(k: SomeReturnThing) -> Self {
         error!("TODO convert result to octets");
         todo!("TODO convert result to octets")
     }
-
 }
-
-
-pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, cluster: &Cluster) -> u32 {
+pub fn node_ix_for_patch(
+    patch_coord: &PreBinnedPatchCoord,
+    channel: &Channel,
+    cluster: &Cluster,
+) -> u32 {
     let mut hash = tiny_keccak::Sha3::v256();
     hash.update(channel.backend.as_bytes());
     hash.update(channel.name.as_bytes());
diff --git a/disk/src/channelconfig.rs b/disk/src/channelconfig.rs
index 4cd369d..197c74b 100644
--- a/disk/src/channelconfig.rs
+++ b/disk/src/channelconfig.rs
@@ -1,10 +1,15 @@
+use crate::{BadError, Error};
+use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8};
 #[allow(unused_imports)]
-use nom::{IResult, bytes::complete::{tag, take, take_while_m_n}, combinator::map_res, sequence::tuple};
-use nom::number::complete::{be_i8, be_u8, be_i16, be_i32, be_i64};
-use crate::{Error, BadError};
+use nom::{
+    bytes::complete::{tag, take, take_while_m_n},
+    combinator::map_res,
+    sequence::tuple,
+    IResult,
+};
 use num_derive::{FromPrimitive, ToPrimitive};
-use num_traits::{ToPrimitive};
-use serde_derive::{Serialize, Deserialize};
+use num_traits::ToPrimitive;
+use serde_derive::{Deserialize, Serialize};
 
 #[derive(Debug, FromPrimitive, ToPrimitive, Serialize, Deserialize)]
 pub enum DType {
@@ -25,7 +30,9 @@ pub enum DType {
 }
 
 impl DType {
-    pub fn to_i16(&self) -> i16 { ToPrimitive::to_i16(self).unwrap() }
+    pub fn to_i16(&self) -> i16 {
+        ToPrimitive::to_i16(self).unwrap()
+    }
 }
 
 #[derive(Debug, FromPrimitive, ToPrimitive, Serialize, Deserialize)]
@@ -34,7 +41,9 @@ pub enum CompressionMethod {
 }
 
 impl CompressionMethod {
-    pub fn to_i16(&self) -> i16 { ToPrimitive::to_i16(self).unwrap() }
+    pub fn to_i16(&self) -> i16 {
+        ToPrimitive::to_i16(self).unwrap()
+    }
 }
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -106,7 +115,7 @@ pub fn parseEntry(inp: &[u8]) -> Result<(&[u8], Option<ConfigEntry>), Error> {
     if inp.len() < len1 as usize - 4 {
         return BadError(format!("incomplete input"));
     }
-    let inpE = &inp[(len1-8) as usize ..];
+    let inpE = &inp[(len1 - 8) as usize..];
     let (inp, ts) = be_i64(inp)?;
     let (inp, pulse) = be_i64(inp)?;
     let (inp, ks) = be_i32(inp)?;
@@ -132,7 +141,7 @@ pub fn parseEntry(inp: &[u8]) -> Result<(&[u8], Option<ConfigEntry>), Error> {
     }
     let dtype = match num_traits::FromPrimitive::from_i8(dtype) {
         Some(k) => k,
-        None => return BadError(format!("Can not convert {} to DType", dtype))
+        None => return BadError(format!("Can not convert {} to DType", dtype)),
     };
     let (inp, compressionMethod) = match isCompressed {
         false => (inp, None),
@@ -148,7 +157,9 @@ pub fn parseEntry(inp: &[u8]) -> Result<(&[u8], Option<ConfigEntry>), Error> {
         false => (inp, None),
         true => {
             let (mut inp, dim) = be_u8(inp)?;
-            if dim > 4 { return BadError(format!("unexpected number of dimensions: {}", dim)); }
+            if dim > 4 {
+                return BadError(format!("unexpected number of dimensions: {}", dim));
+            }
             let mut shape = vec![];
             for _ in 0..dim {
                 let t1 = be_i32(inp)?;
@@ -168,11 +179,33 @@ pub fn parseEntry(inp: &[u8]) -> Result<(&[u8], Option<ConfigEntry>), Error> {
     if len1 != len2 {
         return BadError(format!("mismatch  len1 {}  len2 {}", len1, len2));
     }
-    Ok((inpE, Some(ConfigEntry {
-        ts, pulse, ks, bs, splitCount, status, bb, modulo, offset, precision, dtype,
-        isCompressed, isArray, isShaped, isBigEndian, compressionMethod, shape,
-        sourceName, unit, description, optionalFields, valueConverter,
-    })))
+    Ok((
+        inpE,
+        Some(ConfigEntry {
+            ts,
+            pulse,
+            ks,
+            bs,
+            splitCount,
+            status,
+            bb,
+            modulo,
+            offset,
+            precision,
+            dtype,
+            isCompressed,
+            isArray,
+            isShaped,
+            isBigEndian,
+            compressionMethod,
+            shape,
+            sourceName,
+            unit,
+            description,
+            optionalFields,
+            valueConverter,
+        }),
+    ))
 }
 
 /*
@@ -194,10 +227,12 @@ pub fn parseConfig(inp: &[u8]) -> Result<Config, Error> {
     while inpA.len() > 0 {
         let inp = inpA;
         let (inp, e) = parseEntry(inp)?;
-        if let Some(e) = e { entries.push(e); }
+        if let Some(e) = e {
+            entries.push(e);
+        }
         inpA = inp;
     }
-    Ok(Config{
+    Ok(Config {
         formatVersion: ver,
         channelName: String::from_utf8(chn.to_vec())?,
         entries: entries,
@@ -207,21 +242,24 @@ pub fn parseConfig(inp: &[u8]) -> Result<Config, Error> {
 #[cfg(test)]
 fn read_data() -> Vec<u8> {
     use std::io::Read;
-    let mut f1 = std::fs::File::open("ks/config/S10CB01-RLOD100-PUP10:SIG-AMPLT/latest/00000_Config").unwrap();
+    let mut f1 =
+        std::fs::File::open("ks/config/S10CB01-RLOD100-PUP10:SIG-AMPLT/latest/00000_Config")
+            .unwrap();
     let mut buf = vec![];
     f1.read_to_end(&mut buf).unwrap();
     buf
 }
 
-#[test] fn parse_dummy() {
-    let config = parseConfig(&[0, 0, 0, 0, 0, 11, 0x61, 0x62, 0x63, 0, 0, 0, 11,
-        0, 0, 0, 1,
-    ]).unwrap();
+#[test]
+fn parse_dummy() {
+    let config =
+        parseConfig(&[0, 0, 0, 0, 0, 11, 0x61, 0x62, 0x63, 0, 0, 0, 11, 0, 0, 0, 1]).unwrap();
     assert_eq!(0, config.formatVersion);
     assert_eq!("abc", config.channelName);
 }
 
-#[test] fn open_file() {
+#[test]
+fn open_file() {
     let config = parseConfig(&readData()).unwrap();
     assert_eq!(0, config.formatVersion);
     assert_eq!(9, config.entries.len());
diff --git a/disk/src/gen.rs b/disk/src/gen.rs
index a3c57ce..9b8bbcd 100644
--- a/disk/src/gen.rs
+++ b/disk/src/gen.rs
@@ -1,21 +1,21 @@
-#[allow(unused_imports)]
-use tracing::{error, warn, info, debug, trace};
+use crate::ChannelConfigExt;
+use bitshuffle::bitshuffle_compress;
+use bytes::{Buf, BufMut, Bytes, BytesMut};
 use err::Error;
-use std::task::{Context, Poll};
-use std::future::Future;
 use futures_core::Stream;
 use futures_util::future::FusedFuture;
 use futures_util::{pin_mut, StreamExt};
-use std::pin::Pin;
-use tokio::io::{AsyncRead, AsyncWriteExt};
-use tokio::fs::{OpenOptions, File};
-use bytes::{Bytes, BytesMut, BufMut, Buf};
-use std::path::{Path, PathBuf};
-use bitshuffle::bitshuffle_compress;
 use netpod::ScalarType;
+use netpod::{timeunits::*, Channel, ChannelConfig, Node, Shape};
+use std::future::Future;
+use std::path::{Path, PathBuf};
+use std::pin::Pin;
 use std::sync::Arc;
-use netpod::{Node, Channel, ChannelConfig, Shape, timeunits::*};
-use crate::ChannelConfigExt;
+use std::task::{Context, Poll};
+use tokio::fs::{File, OpenOptions};
+use tokio::io::{AsyncRead, AsyncWriteExt};
+#[allow(unused_imports)]
+use tracing::{debug, error, info, trace, warn};
 
 #[test]
 fn test_gen_test_data() {
@@ -88,31 +88,54 @@ async fn gen_node(node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
 }
 
 async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
-    let config_path = node.data_base_path
-    .join("config")
-    .join(&chn.config.channel.name);
-    let channel_path = node.data_base_path
-    .join(format!("{}_{}", node.ksprefix, chn.config.keyspace))
-    .join("byTime")
-    .join(&chn.config.channel.name);
+    let config_path = node
+        .data_base_path
+        .join("config")
+        .join(&chn.config.channel.name);
+    let channel_path = node
+        .data_base_path
+        .join(format!("{}_{}", node.ksprefix, chn.config.keyspace))
+        .join("byTime")
+        .join(&chn.config.channel.name);
     tokio::fs::create_dir_all(&channel_path).await?;
-    gen_config(&config_path, &chn.config, node, ensemble).await.map_err(|k| Error::with_msg(format!("can not generate config {:?}", k)))?;
+    gen_config(&config_path, &chn.config, node, ensemble)
+        .await
+        .map_err(|k| Error::with_msg(format!("can not generate config {:?}", k)))?;
     let mut evix = 0;
     let mut ts = 0;
     while ts < DAY {
-        let res = gen_timebin(evix, ts, chn.time_spacing, &channel_path, &chn.config, node, ensemble).await?;
+        let res = gen_timebin(
+            evix,
+            ts,
+            chn.time_spacing,
+            &channel_path,
+            &chn.config,
+            node,
+            ensemble,
+        )
+        .await?;
         evix = res.evix;
         ts = res.ts;
     }
     Ok(())
 }
 
-async fn gen_config(config_path: &Path, config: &ChannelConfig, node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
+async fn gen_config(
+    config_path: &Path,
+    config: &ChannelConfig,
+    node: &Node,
+    ensemble: &Ensemble,
+) -> Result<(), Error> {
     let path = config_path.join("latest");
     tokio::fs::create_dir_all(&path).await?;
     let path = path.join("00000_Config");
     info!("try to open {:?}", path);
-    let mut file = OpenOptions::new().write(true).create(true).truncate(true).open(path).await?;
+    let mut file = OpenOptions::new()
+        .write(true)
+        .create(true)
+        .truncate(true)
+        .open(path)
+        .await?;
     let mut buf = BytesMut::with_capacity(1024 * 1);
     let ver = 0;
     buf.put_i16(ver);
@@ -155,7 +178,9 @@ async fn gen_config(config_path: &Path, config: &ChannelConfig, node: &Node, ens
     }
     match config.shape {
         Shape::Scalar => {}
-        Shape::Wave(k) => { buf.put_i32(k as i32); }
+        Shape::Wave(k) => {
+            buf.put_i32(k as i32);
+        }
     }
     let len = buf.len() - p3;
     buf.as_mut()[p3..].as_mut().put_i32(len as i32);
@@ -174,13 +199,28 @@ struct GenTimebinRes {
     ts: u64,
 }
 
-async fn gen_timebin(evix: u64, ts: u64, ts_spacing: u64, channel_path: &Path, config: &ChannelConfig, node: &Node, ensemble: &Ensemble) -> Result<GenTimebinRes, Error> {
+async fn gen_timebin(
+    evix: u64,
+    ts: u64,
+    ts_spacing: u64,
+    channel_path: &Path,
+    config: &ChannelConfig,
+    node: &Node,
+    ensemble: &Ensemble,
+) -> Result<GenTimebinRes, Error> {
     let tb = ts / config.time_bin_size;
-    let path = channel_path.join(format!("{:019}", tb)).join(format!("{:010}", node.split));
+    let path = channel_path
+        .join(format!("{:019}", tb))
+        .join(format!("{:010}", node.split));
     tokio::fs::create_dir_all(&path).await?;
     let path = path.join(format!("{:019}_{:05}_Data", config.time_bin_size / MS, 0));
     info!("open file {:?}", path);
-    let mut file = OpenOptions::new().write(true).create(true).truncate(true).open(path).await?;
+    let mut file = OpenOptions::new()
+        .write(true)
+        .create(true)
+        .truncate(true)
+        .open(path)
+        .await?;
     gen_datafile_header(&mut file, config).await?;
     let mut evix = evix;
     let mut ts = ts;
@@ -192,10 +232,7 @@ async fn gen_timebin(evix: u64, ts: u64, ts_spacing: u64, channel_path: &Path, c
         evix += 1;
         ts += ts_spacing;
     }
-    let ret = GenTimebinRes {
-        evix,
-        ts,
-    };
+    let ret = GenTimebinRes { evix, ts };
     Ok(ret)
 }
 
@@ -211,7 +248,12 @@ async fn gen_datafile_header(file: &mut File, config: &ChannelConfig) -> Result<
     Ok(())
 }
 
-async fn gen_event(file: &mut File, evix: u64, ts: u64, config: &ChannelConfig) -> Result<(), Error> {
+async fn gen_event(
+    file: &mut File,
+    evix: u64,
+    ts: u64,
+    config: &ChannelConfig,
+) -> Result<(), Error> {
     let mut buf = BytesMut::with_capacity(1024 * 16);
     buf.put_i32(0xcafecafe as u32 as i32);
     buf.put_u64(0xcafecafe);
@@ -244,19 +286,25 @@ async fn gen_event(file: &mut File, evix: u64, ts: u64, config: &ChannelConfig)
                     std::io::Write::write_all(&mut c1, &a)?;
                 }
                 let mut comp = vec![0u8; (ele_size * ele_count + 64) as usize];
-                let n1 = bitshuffle_compress(&vals, &mut comp, ele_count as usize, ele_size as usize, 0).unwrap();
+                let n1 = bitshuffle_compress(
+                    &vals,
+                    &mut comp,
+                    ele_count as usize,
+                    ele_size as usize,
+                    0,
+                )
+                .unwrap();
                 buf.put_u64(vals.len() as u64);
                 let comp_block_size = 0;
                 buf.put_u32(comp_block_size);
                 buf.put(&comp[..n1]);
             }
-            _ => todo!()
+            _ => todo!(),
         }
     }
-        _ => todo!()
+        _ => todo!(),
     }
-    }
-    else {
+    } else {
         todo!()
     }
     {
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index 0c7afac..16dad7a 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -1,36 +1,35 @@
-#[allow(unused_imports)]
-use tracing::{error, warn, info, debug, trace};
+use crate::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE};
+use bitshuffle::bitshuffle_decompress;
+use bytes::{Buf, Bytes, BytesMut};
 use err::Error;
-use std::task::{Context, Poll};
-use std::future::Future;
 use futures_core::Stream;
 use futures_util::future::FusedFuture;
-use futures_util::{FutureExt, StreamExt, pin_mut, select};
-use std::pin::Pin;
-use tokio::io::AsyncRead;
-use tokio::fs::{OpenOptions, File};
-use bytes::{Bytes, BytesMut, Buf};
+use futures_util::{pin_mut, select, FutureExt, StreamExt};
+use netpod::{ChannelConfig, Node, ScalarType, Shape};
+use std::future::Future;
 use std::path::PathBuf;
-use bitshuffle::bitshuffle_decompress;
-use netpod::{ScalarType, Shape, Node, ChannelConfig};
+use std::pin::Pin;
 use std::sync::Arc;
-use crate::dtflags::{COMPRESSION, BIG_ENDIAN, ARRAY, SHAPE};
+use std::task::{Context, Poll};
+use tokio::fs::{File, OpenOptions};
+use tokio::io::AsyncRead;
+#[allow(unused_imports)]
+use tracing::{debug, error, info, trace, warn};
 
 pub mod agg;
+pub mod cache;
+pub mod channelconfig;
 pub mod gen;
 pub mod merge;
-pub mod cache;
 pub mod raw;
-pub mod channelconfig;
-
-
-pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> Result<netpod::BodyStream, Error> {
+pub async fn read_test_1(
+    query: &netpod::AggQuerySingleChannel,
+    node: Arc<Node>,
+) -> Result<netpod::BodyStream, Error> {
     let path = datapath(query.timebin as u64, &query.channel_config, &node);
     debug!("try path: {:?}", path);
-    let fin = OpenOptions::new()
-    .read(true)
-    .open(path)
-    .await?;
+    let fin = OpenOptions::new().read(true).open(path).await?;
     let meta = fin.metadata().await;
     debug!("file meta {:?}", meta);
     let stream = netpod::BodyStream {
@@ -68,8 +67,7 @@ impl Stream for FileReader {
                     let rlen = buf.filled().len();
                     if rlen == 0 {
                         Poll::Ready(None)
-                    }
-                    else {
+                    } else {
                         if rlen != blen {
                             info!("short read  {} of {}", buf.filled().len(), blen);
                         }
@@ -77,25 +75,20 @@ impl Stream for FileReader {
                         Poll::Ready(Some(Ok(buf2.freeze())))
                     }
                 }
-                Poll::Ready(Err(e)) => {
-                    Poll::Ready(Some(Err(Error::from(e))))
-                }
-                Poll::Pending => Poll::Pending
+                Poll::Ready(Err(e)) => Poll::Ready(Some(Err(Error::from(e)))),
+                Poll::Pending => Poll::Pending,
             }
         }
-
 }
-
 #[allow(dead_code)]
 struct Fopen1 {
     opts: OpenOptions,
-    fut: Pin<Box<dyn Future<Output=Result<File, std::io::Error>>>>,
+    fut: Pin<Box<dyn Future<Output = Result<File, std::io::Error>>>>,
     term: bool,
 }
 
 impl Fopen1 {
-
     pub fn new(path: PathBuf) -> Self {
         let fut = Box::pin(async {
             let mut o1 = OpenOptions::new();
@@ -104,17 +97,14 @@ impl Fopen1 {
             //() == res;
             //todo!()
             res.await
-        }) as Pin<Box<dyn Future<Output=Result<File, std::io::Error>>>>;
-        let _fut2: Box<dyn Future<Output=u32>> = Box::new(async {
-            123
-        });
+        }) as Pin<Box<dyn Future<Output = Result<File, std::io::Error>>>>;
+        let _fut2: Box<dyn Future<Output = u32>> = Box::new(async { 123 });
         Self {
             opts: OpenOptions::new(),
             fut,
             term: false,
         }
     }
-
 }
 
 impl Future for Fopen1 {
@@ -126,18 +116,16 @@ impl Future for Fopen1 {
             Poll::Ready(Ok(k)) => {
                 self.term = true;
                 Poll::Ready(Ok(k))
-            },
+            }
             Poll::Ready(Err(k)) => {
                 self.term = true;
                 Poll::Ready(Err(k.into()))
-            },
+            }
             Poll::Pending => Poll::Pending,
         }
     }
-
 }
-
 impl FusedFuture for Fopen1 {
     fn is_terminated(&self) -> bool {
         self.term
@@ -146,8 +134,10 @@ impl FusedFuture for Fopen1 {
 
 unsafe impl Send for Fopen1 {}
 
-
-pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> impl Stream<Item = Result<Bytes, Error>> + Send {
+pub fn raw_concat_channel_read_stream_try_open_in_background(
+    query: &netpod::AggQuerySingleChannel,
+    node: Arc<Node>,
+) -> impl Stream<Item = Result<Bytes, Error>> + Send {
     let mut query = query.clone();
     let node = node.clone();
     async_stream::stream! {
@@ -271,8 +261,10 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(query: &netpod::Agg
     }
 }
 
-
-pub fn raw_concat_channel_read_stream_file_pipe(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> impl Stream<Item = Result<Bytes, Error>> + Send {
+pub fn raw_concat_channel_read_stream_file_pipe(
+    query: &netpod::AggQuerySingleChannel,
+    node: Arc<Node>,
+) -> impl Stream<Item = Result<Bytes, Error>> + Send {
     let query = query.clone();
     let node = node.clone();
     async_stream::stream! {
@@ -298,7 +290,10 @@ pub fn raw_concat_channel_read_stream_file_pipe(query: &netpod::AggQuerySingleCh
     }
 }
 
-fn open_files(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> async_channel::Receiver<Result<tokio::fs::File, Error>> {
+fn open_files(
+    query: &netpod::AggQuerySingleChannel,
+    node: Arc<Node>,
+) -> async_channel::Receiver<Result<tokio::fs::File, Error>> {
     let (chtx, chrx) = async_channel::bounded(2);
     let mut query = query.clone();
     let node = node.clone();
@@ -307,32 +302,27 @@ fn open_files(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> async_c
         for i1 in 0..query.tb_file_count {
             query.timebin = tb0 + i1;
             let path = datapath(query.timebin as u64, &query.channel_config, &node);
-            let fileres = OpenOptions::new()
-            .read(true)
-            .open(&path)
-            .await;
+            let fileres = OpenOptions::new().read(true).open(&path).await;
             info!("opened file {:?}  {:?}", &path, &fileres);
             match fileres {
-                Ok(k) => {
-                    match chtx.send(Ok(k)).await {
-                        Ok(_) => (),
-                        Err(_) => break
-                    }
-                }
-                Err(e) => {
-                    match chtx.send(Err(e.into())).await {
-                        Ok(_) => (),
-                        Err(_) => break
-                    }
-                }
+                Ok(k) => match chtx.send(Ok(k)).await {
+                    Ok(_) => (),
+                    Err(_) => break,
+                },
+                Err(e) => match chtx.send(Err(e.into())).await {
+                    Ok(_) => (),
+                    Err(_) => break,
+                },
             }
         }
     });
     chrx
 }
 
-
-pub fn file_content_stream(mut file: tokio::fs::File, buffer_size: usize) -> impl Stream<Item = Result<BytesMut, Error>> + Send {
+pub fn file_content_stream(
+    mut file: tokio::fs::File,
+    buffer_size: usize,
+) -> impl Stream<Item = Result<BytesMut, Error>> + Send {
     async_stream::stream! {
         use tokio::io::AsyncReadExt;
         loop {
@@ -349,8 +339,10 @@ pub fn file_content_stream(mut file: tokio::fs::File, buffer_size: usize) -> imp
     }
 }
 
-
-pub fn parsed1(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> impl Stream<Item = Result<Bytes, Error>> + Send {
+pub fn parsed1(
+    query: &netpod::AggQuerySingleChannel,
+    node: Arc<Node>,
+) -> impl Stream<Item = Result<Bytes, Error>> + Send {
     let query = query.clone();
     let node = node.clone();
    async_stream::stream! {
@@ -387,7 +379,6 @@ pub fn parsed1(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> impl S
     }
 }
 
-
 pub struct EventBlobsComplete {
     channel_config: ChannelConfig,
     file_chan: async_channel::Receiver<Result<tokio::fs::File, Error>>,
@@ -396,8 +387,11 @@ pub struct EventBlobsComplete {
 }
 
 impl EventBlobsComplete {
-
-    pub fn new(query: &netpod::AggQuerySingleChannel, channel_config: ChannelConfig, node: Arc<Node>) -> Self {
+    pub fn new(
+        query: &netpod::AggQuerySingleChannel,
+        channel_config: ChannelConfig,
+        node: Arc<Node>,
+    ) -> Self {
         Self {
             file_chan: open_files(query, node),
             evs: None,
@@ -405,7 +399,6 @@ impl EventBlobsComplete {
             channel_config,
         }
     }
-
 }
 
 impl Stream for EventBlobsComplete {
@@ -415,44 +408,38 @@ impl Stream for EventBlobsComplete {
         use Poll::*;
         'outer: loop {
             let z = match &mut self.evs {
-                Some(evs) => {
-                    match evs.poll_next_unpin(cx) {
-                        Ready(Some(k)) => {
-                            Ready(Some(k))
-                        }
-                        Ready(None) => {
-                            self.evs = None;
-                            continue 'outer;
-                        }
-                        Pending => Pending,
-                    }
-                }
-                None => {
-                    match self.file_chan.poll_next_unpin(cx) {
-                        Ready(Some(k)) => {
-                            match k {
-                                Ok(file) => {
-                                    let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
-                                    let mut chunker = EventChunker::new(inp, self.channel_config.clone());
-                                    self.evs.replace(chunker);
-                                    continue 'outer;
-                                }
-                                Err(e) => Ready(Some(Err(e)))
-                            }
-                        }
-                        Ready(None) => Ready(None),
-                        Pending => Pending,
-                    }
-                }
+                Some(evs) => match evs.poll_next_unpin(cx) {
+                    Ready(Some(k)) => Ready(Some(k)),
+                    Ready(None) => {
+                        self.evs = None;
+                        continue 'outer;
+                    }
+                    Pending => Pending,
+                },
+                None => match self.file_chan.poll_next_unpin(cx) {
+                    Ready(Some(k)) => match k {
+                        Ok(file) => {
+                            let inp =
+                                Box::pin(file_content_stream(file, self.buffer_size as usize));
+                            let mut chunker = EventChunker::new(inp, self.channel_config.clone());
+                            self.evs.replace(chunker);
+                            continue 'outer;
+                        }
+                        Err(e) => Ready(Some(Err(e))),
+                    },
+                    Ready(None) => Ready(None),
+                    Pending => Pending,
+                },
             };
             break z;
         }
     }
-
 }
-
-
-pub fn event_blobs_complete(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> impl Stream<Item = Result<EventFull, Error>> + Send {
+pub fn event_blobs_complete(
+    query: &netpod::AggQuerySingleChannel,
+    node: Arc<Node>,
+) -> impl Stream<Item = Result<EventFull, Error>> + Send {
     let query = query.clone();
     let node = node.clone();
    async_stream::stream! {
@@ -481,7 +468,6 @@ pub fn event_blobs_complete(query: &netpod::AggQuerySingleChannel, node: Arc<Nod
     }
 }
 
-
 impl EventChunker {
-    pub fn new(inp: Pin<Box<dyn Stream<Item = Result<BytesMut, Error>> + Send>>, channel_config: ChannelConfig) -> Self {
+    pub fn new(
+        inp: Pin<Box<dyn Stream<Item = Result<BytesMut, Error>> + Send>>,
+        channel_config: ChannelConfig,
+    ) -> Self {
         let mut inp = NeedMinBuffer::new(inp);
         inp.set_need_min(6);
         Self {
@@ -518,7 +506,7 @@ impl EventChunker {
         // how many bytes I need min to make progress
         let mut ret = EventFull::empty();
         let mut need_min = 0 as u32;
-        use byteorder::{BE, ReadBytesExt};
+        use byteorder::{ReadBytesExt, BE};
         //info!("parse_buf  rb {}", buf.len());
         //let mut i1 = 0;
         loop {
@@ -539,12 +527,13 @@ impl EventChunker {
                         info!("parse_buf  not enough A  totlen {}", totlen);
                         need_min = totlen as u32;
                         break;
-                    }
-                    else {
+                    } else {
                         sl.advance(len as usize - 8);
                         let len2 = sl.read_i32::<BE>().unwrap();
                         assert!(len == len2, "len mismatch");
-                        let s1 = String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec()).unwrap();
+                        let s1 =
+                            String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec())
+                                .unwrap();
                         info!("channel name {}  len {}  len2 {}", s1, len, len2);
                         self.state = DataFileState::Event;
                         need_min = 4;
@@ -560,8 +549,7 @@ impl EventChunker {
                         //info!("parse_buf  not enough B");
                         need_min = len as u32;
                         break;
-                    }
-                    else if (buf.len() as u32) < len as u32 {
+                    } else if (buf.len() as u32) < len as u32 {
                         // TODO this is just for testing
                         let mut sl = std::io::Cursor::new(buf.as_ref());
                         sl.read_i32::<BE>().unwrap();
@@ -570,8 +558,7 @@ impl EventChunker {
                         //info!("parse_buf  not enough C  len {}  have {}  ts {}", len, buf.len(), ts);
                         need_min = len as u32;
                         break;
-                    }
-                    else {
+                    } else {
                         let mut sl = std::io::Cursor::new(buf.as_ref());
                         let len1b = sl.read_i32::<BE>().unwrap();
                         assert!(len == len1b);
@@ -598,16 +585,10 @@ impl EventChunker {
                         }
                         let compression_method = if is_compressed {
                             sl.read_u8().unwrap()
-                        }
-                        else {
-                            0
-                        };
-                        let shape_dim = if is_shaped {
-                            sl.read_u8().unwrap()
-                        }
-                        else {
+                        } else {
                             0
                         };
+                        let shape_dim = if is_shaped { sl.read_u8().unwrap() } else { 0 };
                         assert!(compression_method <= 0);
                         assert!(!is_shaped || (shape_dim >= 1 && shape_dim <= 2));
                         let mut shape_lens = [0, 0, 0, 0];
@@ -639,12 +620,23 @@ impl EventChunker {
                             decomp.set_len(decomp_bytes);
                         }
                         //debug!("try decompress  value_bytes {}  ele_size {}  ele_count {}  type_index {}", value_bytes, ele_size, ele_count, type_index);
-                        let c1 = bitshuffle_decompress(&buf.as_ref()[p1 as usize..], &mut decomp, ele_count as usize, ele_size as usize, 0).unwrap();
+                        let c1 = bitshuffle_decompress(
+                            &buf.as_ref()[p1 as usize..],
+                            &mut decomp,
+                            ele_count as usize,
+                            ele_size as usize,
+                            0,
+                        )
+                        .unwrap();
                         //debug!("decompress result  c1 {}  k1 {}", c1, k1);
                         assert!(c1 as u32 == k1);
-                        ret.add_event(ts, pulse, Some(decomp), ScalarType::from_dtype_index(type_index));
-                    }
-                    else {
+                        ret.add_event(
+                            ts,
+                            pulse,
+                            Some(decomp),
+                            ScalarType::from_dtype_index(type_index),
+                        );
+                    } else {
                         todo!()
                     }
                     buf.advance(len as usize);
@@ -659,7 +651,6 @@ impl EventChunker {
             need_min: need_min,
         })
     }
-
 }
 
 fn type_size(ix: u8) -> u32 {
@@ -678,7 +669,7 @@ fn type_size(ix: u8) -> u32 {
         11 => 4,
         12 => 8,
         13 => 1,
-        _ => panic!("logic")
+        _ => panic!("logic"),
     }
 }
 
@@ -704,11 +695,9 @@ impl Stream for EventChunker {
                     match self.parse_buf(&mut buf) {
                         Ok(res) => {
                             if buf.len() > 0 {
-
                                 // TODO gather stats about this:
                                 //info!("parse_buf returned {} leftover bytes to me", buf.len());
                                 self.inp.put_back(buf);
-
                             }
                             if res.need_min > 8000 {
                                 warn!("spurious EventChunker asks for need_min {}", res.need_min);
                             }
@@ -717,7 +706,7 @@ impl Stream for EventChunker {
                             self.inp.set_need_min(res.need_min);
                             Poll::Ready(Some(Ok(res.events)))
                         }
-                        Err(e) => Poll::Ready(Some(Err(e.into())))
+                        Err(e) => Poll::Ready(Some(Err(e.into()))),
                    }
                 }
                 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
@@ -725,7 +714,6 @@ impl Stream for EventChunker {
             Poll::Pending => Poll::Pending,
         }
     }
-
 }
 
 pub struct EventFull {
@@ -736,7 +724,6 @@ pub struct EventFull {
 }
 
 impl EventFull {
-
     pub fn empty() -> Self {
         Self {
             tss: vec![],
@@ -746,27 +733,28 @@ impl EventFull {
         }
     }
 
-    fn add_event(&mut self, ts: u64, pulse: u64, decomp: Option<BytesMut>, scalar_type: ScalarType) {
+    fn add_event(
+        &mut self,
+        ts: u64,
+        pulse: u64,
+        decomp: Option<BytesMut>,
+        scalar_type: ScalarType,
+    ) {
         self.tss.push(ts);
         self.pulses.push(pulse);
         self.decomps.push(decomp);
         self.scalar_types.push(scalar_type);
     }
-
 }
-
-
-
 pub struct NeedMinBuffer {
-    inp: Pin<Box<dyn Stream<Item=Result<BytesMut, Error>> + Send>>,
+    inp: Pin<Box<dyn Stream<Item = Result<BytesMut, Error>> + Send>>,
     need_min: u32,
     left: Option<BytesMut>,
 }
 
 impl NeedMinBuffer {
-
-    pub fn new(inp: Pin<Box<dyn Stream<Item=Result<BytesMut, Error>> + Send>>) -> Self {
+    pub fn new(inp: Pin<Box<dyn Stream<Item = Result<BytesMut, Error>> + Send>>) -> Self {
         Self {
             inp: inp,
             need_min: 1,
@@ -782,7 +770,6 @@ impl NeedMinBuffer {
     pub fn set_need_min(&mut self, need_min: u32) {
         self.need_min = need_min;
     }
-
 }
 
 impl Stream for NeedMinBuffer {
@@ -803,8 +790,7 @@ impl Stream for NeedMinBuffer {
                         if buf.len() as u32 >= self.need_min {
                             //info!("with left ready  len {}  need_min {}", buf.len(), self.need_min);
                             Poll::Ready(Some(Ok(buf)))
-                        }
-                        else {
+                        } else {
                             //info!("with left not enough  len {}  need_min {}", buf.len(), self.need_min);
                             self.left.replace(buf);
                             again = true;
@@ -815,8 +801,7 @@ impl Stream for NeedMinBuffer {
                         if buf.len() as u32 >= self.need_min {
                             //info!("simply ready  len {}  need_min {}", buf.len(), self.need_min);
                             Poll::Ready(Some(Ok(buf)))
-                        }
-                        else {
+                        } else {
                             //info!("no previous leftover, need more  len {}  need_min {}", buf.len(), self.need_min);
                             self.left.replace(buf);
                             again = true;
@@ -834,12 +819,12 @@ impl Stream for NeedMinBuffer {
             }
         }
     }
-
 }
-
-
-pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> impl Stream<Item = Result<Bytes, Error>> + Send {
+pub fn raw_concat_channel_read_stream(
+    query: &netpod::AggQuerySingleChannel,
+    node: Arc<Node>,
+) -> impl Stream<Item = Result<Bytes, Error>> + Send {
     let mut query = query.clone();
     let node = node.clone();
     async_stream::stream! {
@@ -860,8 +845,10 @@ pub fn raw_concat_channel_read_stream(query: &netpod::AggQuerySingleChannel, nod
     }
 }
 
-
-pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> impl Stream<Item = Result<Bytes, Error>> {
+pub fn raw_concat_channel_read_stream_timebin(
+    query: &netpod::AggQuerySingleChannel,
+    node: Arc<Node>,
+) -> impl Stream<Item = Result<Bytes, Error>> {
     let query = query.clone();
     let node = node.clone();
    async_stream::stream! {
@@ -890,19 +877,20 @@ pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChan
     }
 }
 
-
 fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &Node) -> PathBuf {
     //let pre = "/data/sf-databuffer/daq_swissfel";
     node.data_base_path
-    .join(format!("{}_{}", node.ksprefix, config.keyspace))
-    .join("byTime")
-    .join(config.channel.name.clone())
-    .join(format!("{:019}", timebin))
-    .join(format!("{:010}", node.split))
-    .join(format!("{:019}_00000_Data", config.time_bin_size / netpod::timeunits::MS))
+        .join(format!("{}_{}", node.ksprefix, config.keyspace))
+        .join("byTime")
+        .join(config.channel.name.clone())
+        .join(format!("{:019}", timebin))
+        .join(format!("{:010}", node.split))
+        .join(format!(
+            "{:019}_00000_Data",
+            config.time_bin_size / netpod::timeunits::MS
+        ))
 }
 
-
 /**
 Read all events from all timebins for the given channel and split.
 */
@@ -923,18 +911,16 @@ pub struct RawConcatChannelReader {
     // • How can I transition between Stream and async world?
     // • I guess I must not poll a completed Future which comes from some async fn again after it completed.
     // • relevant crates:  async-stream, tokio-stream
-    fopen: Option<Pin<Box<dyn FusedFuture<Output=Result<File, Error>> + Send>>>,
+    fopen: Option<Pin<Box<dyn FusedFuture<Output = Result<File, Error>> + Send>>>,
 }
 
 impl RawConcatChannelReader {
-
     pub fn read(self) -> Result<netpod::BodyStream, Error> {
         let res = netpod::BodyStream {
             inner: Box::new(self),
         };
         Ok(res)
     }
-
 }
 
 impl futures_core::Stream for RawConcatChannelReader {
@@ -943,7 +929,6 @@ impl futures_core::Stream for RawConcatChannelReader {
     fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         todo!()
     }
-
 }
 
 pub mod dtflags {
@@ -953,23 +938,28 @@ pub mod dtflags {
     pub const SHAPE: u8 = 0x10;
 }
 
-
 trait ChannelConfigExt {
     fn dtflags(&self) -> u8;
 }
 
 impl ChannelConfigExt for ChannelConfig {
-
     fn dtflags(&self) -> u8 {
         let mut ret = 0;
-        if self.compression { ret |= COMPRESSION; }
+        if self.compression {
+            ret |= COMPRESSION;
+        }
         match self.shape {
             Shape::Scalar => {}
-            Shape::Wave(_) => { ret |= SHAPE; }
+            Shape::Wave(_) => {
+                ret |= SHAPE;
+            }
+        }
+        if self.big_endian {
+            ret |= BIG_ENDIAN;
+        }
+        if self.array {
+            ret |= ARRAY;
         }
-        if self.big_endian { ret |= BIG_ENDIAN; }
-        if self.array { ret |= ARRAY; }
         ret
     }
-
 }
diff --git a/disk/src/merge.rs b/disk/src/merge.rs
index 6ab7eab..addd4fa 100644
--- a/disk/src/merge.rs
+++ b/disk/src/merge.rs
@@ -1,14 +1,17 @@
-#[allow(unused_imports)]
-use tracing::{error, warn, info, debug, trace};
-use futures_core::Stream;
-use err::Error;
-use std::task::{Poll, Context};
-use std::pin::Pin;
 use crate::agg::{Dim1F32Stream, ValuesDim1};
 use crate::EventFull;
-use futures_util::{pin_mut, StreamExt, future::ready};
+use err::Error;
+use futures_core::Stream;
+use futures_util::{future::ready, pin_mut, StreamExt};
+use std::pin::Pin;
+use std::task::{Context, Poll};
+#[allow(unused_imports)]
+use tracing::{debug, error, info, trace, warn};
 
-pub struct MergeDim1F32Stream<S> where S: Stream<Item = Result<EventFull, Error>> {
+pub struct MergeDim1F32Stream<S>
+where
+    S: Stream<Item = Result<EventFull, Error>>,
+{
     inps: Vec<Dim1F32Stream<S>>,
     current: Vec<CurVal>,
     ixs: Vec<usize>,
@@ -16,8 +19,10 @@ pub struct MergeDim1F32Stream<S> where S: Stream<Item = Result<EventFull, Error>
     batch: ValuesDim1,
 }
 
-impl<S> MergeDim1F32Stream<S> where S: Stream<Item = Result<EventFull, Error>> {
-
+impl<S> MergeDim1F32Stream<S>
+where
+    S: Stream<Item = Result<EventFull, Error>>,
+{
     pub fn new(inps: Vec<Dim1F32Stream<S>>) -> Self {
         let n = inps.len();
         let mut current = vec![];
@@ -32,10 +37,12 @@ impl<S> MergeDim1F32Stream<S> where S: Stream<Item = Result<EventFull, Error>> {
             batch: ValuesDim1::empty(),
         }
     }
-
 }
 
-impl<S> Stream for MergeDim1F32Stream<S> where S: Stream<Item = Result<EventFull, Error>> + Unpin {
+impl<S> Stream for MergeDim1F32Stream<S>
+where
+    S: Stream<Item = Result<EventFull, Error>> + Unpin,
+{
     //type Item = <Dim1F32Stream<S> as Stream>::Item;
     type Item = Result<ValuesDim1, Error>;
 
@@ -83,8 +90,7 @@ impl Stream for MergeDim1F32Stream where S: Stream
-                        _ => panic!()
+                        _ => panic!(),
                     }
                 }
                 if lowest_ix == usize::MAX {
                     // TODO all inputs in finished state
                     break Ready(None);
-                }
-                else {
+                } else {
                     //trace!("decided on next lowest ts {} ix {}", lowest_ts, lowest_ix);
                     self.batch.tss.push(lowest_ts);
                     let rix = self.ixs[lowest_ix];
@@ -108,7 +113,7 @@ impl Stream for MergeDim1F32Stream where S: Stream
-                        _ => panic!()
+                        _ => panic!(),
                     }
                     self.ixs[lowest_ix] += 1;
                 }
@@ -118,7 +123,6 @@ impl Stream for MergeDim1F32Stream where S: Stream>(s: S) -> Self {
-        Self {
-            msg: s.into(),
-        }
+        Self { msg: s.into() }
     }
 }
 
 impl From for Error {
     fn from(k: String) -> Self {
-        Self {
-            msg: k,
-        }
+        Self { msg: k }
     }
 }
 
@@ -27,61 +23,46 @@ impl std::fmt::Display for Error {
     }
 }
 
-impl std::error::Error for Error {
-}
+impl std::error::Error for Error {}
 
 impl From for Error {
-    fn from (k: std::io::Error) -> Self {
-        Self {
-            msg: k.to_string(),
-        }
+    fn from(k: std::io::Error) -> Self {
+        Self { msg: k.to_string() }
     }
 }
 
 impl From for Error {
-    fn from (k: http::Error) -> Self {
-        Self {
-            msg: k.to_string(),
-        }
+    fn from(k: http::Error) -> Self {
+        Self { msg: k.to_string() }
     }
 }
 
 impl From for Error {
-    fn from (k: hyper::Error) -> Self {
-        Self {
-            msg: k.to_string(),
-        }
+    fn from(k: hyper::Error) -> Self {
+        Self { msg: k.to_string() }
     }
 }
 
 impl From for Error {
-    fn from (k: serde_json::Error) -> Self {
-        Self {
-            msg: k.to_string(),
-        }
+    fn from(k: serde_json::Error) -> Self {
+        Self { msg: k.to_string() }
     }
 }
 
 impl From for Error {
-    fn from (k: async_channel::RecvError) -> Self {
-        Self {
-            msg: k.to_string(),
-        }
+    fn from(k: async_channel::RecvError) -> Self {
+        Self { msg: k.to_string() }
     }
 }
 
 impl From for Error {
-    fn from (k: chrono::format::ParseError) -> Self {
-        Self {
-            msg: k.to_string(),
-        }
+    fn from(k: chrono::format::ParseError) -> Self {
+        Self { msg: k.to_string() }
     }
 }
 
 impl From for Error {
-    fn from (k: ParseIntError) -> Self {
-        Self {
-            msg: k.to_string(),
-        }
+    fn from(k: ParseIntError) -> Self {
+        Self { msg: k.to_string() }
     }
 }
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index 21d58e1..6666c62 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -1,23 +1,23 @@
-#[allow(unused_imports)]
-use tracing::{error, warn, info, debug, trace};
+use bytes::Bytes;
+use disk::cache::PreBinnedQuery;
 use err::Error;
-use std::{task, future, pin, net, panic, sync};
-use net::SocketAddr;
-use http::{Method, StatusCode, HeaderMap};
-use hyper::{Body, Request, Response, server::Server};
-use hyper::service::{make_service_fn, service_fn};
-use task::{Context, Poll};
 use future::Future;
-use pin::Pin;
 use futures_core::Stream;
 use futures_util::{FutureExt, StreamExt};
-use netpod::{Node, Cluster, AggKind, NodeConfig};
+use http::{HeaderMap, Method, StatusCode};
+use hyper::service::{make_service_fn, service_fn};
+use hyper::{server::Server, Body, Request, Response};
+use net::SocketAddr;
+use netpod::{AggKind, Cluster, Node, NodeConfig};
+use panic::{AssertUnwindSafe, UnwindSafe};
+use pin::Pin;
+use std::{future, net, panic, pin, sync, task};
 use sync::Arc;
-use disk::cache::PreBinnedQuery;
-use panic::{UnwindSafe, AssertUnwindSafe};
-use bytes::Bytes;
-use tokio::net::TcpStream;
+use task::{Context, Poll};
 use tokio::io::AsyncWriteExt;
+use tokio::net::TcpStream;
+#[allow(unused_imports)]
+use tracing::{debug, error, info, trace, warn};
 
 pub async fn host(node_config: Arc) -> Result<(), Error> {
     let rawjh = taskrun::spawn(raw_service(node_config.clone()));
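Review note: the seven string-converting `From` impls in err/src/lib.rs above are structurally identical; if more error sources accumulate, a small macro could generate them. A sketch only, not part of this change (`from_via_to_string` is a hypothetical name; it assumes `Error { msg: String }` as shown in the diff):

macro_rules! from_via_to_string {
    ($($t:ty),* $(,)?) => {
        $(impl From<$t> for Error {
            // Each generated impl stringifies the source error, same as the
            // hand-written ones above.
            fn from(k: $t) -> Self {
                Self { msg: k.to_string() }
            }
        })*
    };
}

from_via_to_string!(
    std::io::Error,
    http::Error,
    hyper::Error,
    serde_json::Error,
    async_channel::RecvError,
    chrono::format::ParseError,
    std::num::ParseIntError,
);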
@@ -41,9 +41,12 @@ pub async fn host(node_config: Arc) -> Result<(), Error> {
     Ok(())
 }
 
-async fn data_api_proxy(req: Request, node_config: Arc) -> Result, Error> {
+async fn data_api_proxy(
+    req: Request,
+    node_config: Arc,
+) -> Result, Error> {
     match data_api_proxy_try(req, node_config).await {
-        Ok(k) => { Ok(k) }
+        Ok(k) => Ok(k),
         Err(e) => {
             error!("{:?}", e);
             Err(e)
@@ -55,13 +58,14 @@ struct Cont {
     f: Pin>,
 }
 
-impl Future for Cont where F: Future> {
+impl Future for Cont
+where
+    F: Future>,
+{
     type Output = ::Output;
 
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
-        let h = std::panic::catch_unwind(AssertUnwindSafe(|| {
-            self.f.poll_unpin(cx)
-        }));
+        let h = std::panic::catch_unwind(AssertUnwindSafe(|| self.f.poll_unpin(cx)));
         match h {
             Ok(k) => k,
             Err(e) => {
@@ -70,62 +74,56 @@ impl Future for Cont where F: Future> {
                     Some(e) => {
                         error!("Cont catch_unwind is Error: {:?}", e);
                     }
-                    None => {
-                    }
+                    None => {}
                 }
                 Poll::Ready(Err(Error::from(format!("{:?}", e))))
             }
         }
     }
-
 }
 
 impl UnwindSafe for Cont {}
 
-
-async fn data_api_proxy_try(req: Request, node_config: Arc) -> Result, Error> {
+async fn data_api_proxy_try(
+    req: Request,
+    node_config: Arc,
+) -> Result, Error> {
     let uri = req.uri().clone();
     let path = uri.path();
     if path == "/api/1/parsed_raw" {
         if req.method() == Method::POST {
             Ok(parsed_raw(req).await?)
-        }
-        else {
+        } else {
             Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
         }
-    }
-    else if path == "/api/1/binned" {
+    } else if path == "/api/1/binned" {
         if req.method() == Method::GET {
             Ok(binned(req, node_config.clone()).await?)
-        }
-        else {
+        } else {
             Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
         }
-    }
-    else if path == "/api/1/prebinned" {
+    } else if path == "/api/1/prebinned" {
         if req.method() == Method::GET {
             Ok(prebinned(req, node_config.clone()).await?)
-        }
-        else {
+        } else {
             Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
         }
-    }
-    else {
+    } else {
         Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?)
     }
 }
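Review note: `Cont` above converts a panic during `poll` into a normal `Err`, so one bad request cannot take the whole service down. The same pattern reduced to a free function; `poll_caught` is a hypothetical helper, and `poll_unpin` comes from the `futures_util::FutureExt` import already in this file:

use std::panic::AssertUnwindSafe;

fn poll_caught<F, T>(f: &mut F, cx: &mut Context<'_>) -> Poll<Result<T, Error>>
where
    F: Future<Output = Result<T, Error>> + Unpin,
{
    match std::panic::catch_unwind(AssertUnwindSafe(|| f.poll_unpin(cx))) {
        Ok(poll) => poll,
        // The panic payload is an opaque Box<dyn Any>; Debug-format it into an Error,
        // as the Cont impl above does.
        Err(e) => Poll::Ready(Err(Error::from(format!("{:?}", e)))),
    }
}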
 fn response(status: T) -> http::response::Builder
-    where
-        http::StatusCode: std::convert::TryFrom,
-        >::Error: Into,
+where
+    http::StatusCode: std::convert::TryFrom,
+    >::Error: Into,
 {
-    Response::builder().status(status)
+    Response::builder()
+        .status(status)
         .header("access-control-allow-origin", "*")
         .header("access-control-allow-headers", "*")
 }
 
-
 async fn parsed_raw(req: Request) -> Result, Error> {
     let node = todo!("get node from config");
     use netpod::AggQuerySingleChannel;
@@ -135,8 +133,7 @@ async fn parsed_raw(req: Request) -> Result, Error> {
     //let q = disk::read_test_1(&query).await?;
     //let s = q.inner;
     let s = disk::parsed1(&query, node);
-    let res = response(StatusCode::OK)
-        .body(Body::wrap_stream(s))?;
+    let res = response(StatusCode::OK).body(Body::wrap_stream(s))?;
     /*
     let res = match q {
         Ok(k) => {
@@ -152,14 +149,16 @@ async fn parsed_raw(req: Request) -> Result, Error> {
     Ok(res)
 }
 
-
 struct BodyStreamWrap(netpod::BodyStream);
 
 impl hyper::body::HttpBody for BodyStreamWrap {
     type Data = bytes::Bytes;
     type Error = Error;
 
-    fn poll_data(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll>> {
+    fn poll_data(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll>> {
         /*
         use futures_core::stream::Stream;
         let z: &mut async_channel::Receiver> = &mut self.0.receiver;
@@ -171,39 +170,42 @@ impl hyper::body::HttpBody for BodyStreamWrap {
         todo!()
     }
 
-    fn poll_trailers(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll, Self::Error>> {
+    fn poll_trailers(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll, Self::Error>> {
         Poll::Ready(Ok(None))
     }
-
 }
 
-
 struct BodyStream {
     inp: S,
 }
 
-impl BodyStream where S: Stream> + Unpin + Send + 'static, I: Into + Sized + 'static {
-
+impl BodyStream
+where
+    S: Stream> + Unpin + Send + 'static,
+    I: Into + Sized + 'static,
+{
     pub fn new(inp: S) -> Self {
-        Self {
-            inp,
-        }
+        Self { inp }
     }
 
     pub fn wrapped(inp: S) -> Body {
        Body::wrap_stream(Self::new(inp))
    }
-
 }
 
-impl Stream for BodyStream where S: Stream> + Unpin, I: Into + Sized {
+impl Stream for BodyStream
+where
+    S: Stream> + Unpin,
+    I: Into + Sized,
+{
     type Item = Result;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> {
         use Poll::*;
-        let t = std::panic::catch_unwind(AssertUnwindSafe(|| {
-            self.inp.poll_next_unpin(cx)
-        }));
+        let t = std::panic::catch_unwind(AssertUnwindSafe(|| self.inp.poll_next_unpin(cx)));
         let r = match t {
             Ok(k) => k,
             Err(e) => {
@@ -221,10 +223,8 @@ impl Stream for BodyStream where S: Stream> + Unp
             Pending => Pending,
         }
     }
-
 }
 
-
 async fn binned(req: Request, node_config: Arc) -> Result, Error> {
     info!("-------------------------------------------------------- BINNED");
     let (head, body) = req.into_parts();
@@ -238,10 +238,7 @@ async fn binned(req: Request, node_config: Arc) -> Result {
-            response(StatusCode::OK)
-                .body(BodyStream::wrapped(s))?
-        }
+        Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s))?,
         Err(e) => {
             error!("{:?}", e);
             response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?
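Review note: usage sketch for the `BodyStream` wrapper above — any `Unpin + Send` stream of byte-like `Result` items can back a hyper response this way. A hypothetical handler, not part of this diff:

async fn demo_stream_response() -> Result<Response<Body>, Error> {
    // Two static chunks stand in for real data; Err items would flow through
    // the stream unchanged and terminate the body.
    let chunks = futures_util::stream::iter(vec![
        Ok::<_, Error>(Bytes::from_static(b"chunk 1\n")),
        Ok(Bytes::from_static(b"chunk 2\n")),
    ]);
    Ok(response(StatusCode::OK).body(BodyStream::wrapped(chunks))?)
}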
@@ -250,16 +247,15 @@ async fn binned(req: Request, node_config: Arc) -> Result
-async fn prebinned(req: Request, node_config: Arc) -> Result, Error> {
+async fn prebinned(
+    req: Request,
+    node_config: Arc,
+) -> Result, Error> {
     info!("-------------------------------------------------------- PRE-BINNED");
     let (head, body) = req.into_parts();
     let q = PreBinnedQuery::from_request(&head)?;
     let ret = match disk::cache::pre_binned_bytes_for_http(node_config, &q) {
-        Ok(s) => {
-            response(StatusCode::OK)
-                .body(BodyStream::wrapped(s))?
-        }
+        Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s))?,
         Err(e) => {
             error!("{:?}", e);
             response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?
@@ -275,7 +271,7 @@ async fn raw_service(node_config: Arc) -> Result<(), Error> {
             Ok((stream, addr)) => {
                 taskrun::spawn(raw_conn_handler(stream, addr));
             }
-            Err(e) => Err(e)?
+            Err(e) => Err(e)?,
         }
     }
     Ok(())
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index e80cd7d..7801775 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -1,13 +1,12 @@
-#[allow(unused_imports)]
-use tracing::{error, warn, info, debug, trace};
-use serde::{Serialize, Deserialize};
+use chrono::{DateTime, TimeZone, Utc};
 use err::Error;
-use std::path::PathBuf;
-use chrono::{DateTime, Utc, TimeZone};
-use std::sync::Arc;
+use serde::{Deserialize, Serialize};
 use std::collections::BTreeMap;
+use std::path::PathBuf;
+use std::sync::Arc;
 use timeunits::*;
-
+#[allow(unused_imports)]
+use tracing::{debug, error, info, trace, warn};
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct AggQuerySingleChannel {
@@ -19,7 +18,7 @@ pub struct AggQuerySingleChannel {
 
 pub struct BodyStream {
     //pub receiver: async_channel::Receiver>,
-    pub inner: Box> + Send + Unpin>,
+    pub inner: Box> + Send + Unpin>,
 }
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -37,7 +36,6 @@ pub enum ScalarType {
 }
 
 impl ScalarType {
-
     pub fn from_dtype_index(ix: u8) -> Self {
         use ScalarType::*;
         match ix {
@@ -90,7 +88,6 @@ impl ScalarType {
             F64 => 12,
         }
     }
-
 }
 
 #[derive(Debug)]
@@ -109,20 +106,17 @@ impl Node {
     }
 }
 
-
 #[derive(Debug)]
 pub struct Cluster {
     pub nodes: Vec>,
 }
 
-
 #[derive(Debug)]
 pub struct NodeConfig {
     pub node: Arc,
     pub cluster: Arc,
 }
 
-
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct Channel {
     pub backend: String,
@@ -135,7 +129,6 @@ impl Channel {
     }
 }
 
-
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub enum TimeRange {
     Time {
@@ -152,7 +145,6 @@ pub enum TimeRange {
     },
 }
 
-
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct NanoRange {
     pub beg: u64,
@@ -160,14 +152,11 @@ pub struct NanoRange {
 }
 
 impl NanoRange {
-
     pub fn delta(&self) -> u64 {
         self.end - self.beg
     }
-
 }
 
-
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct ChannelConfig {
     pub channel: Channel,
@@ -180,7 +169,6 @@ pub struct ChannelConfig {
     pub big_endian: bool,
 }
 
-
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub enum Shape {
     Scalar,
@@ -197,8 +185,6 @@ pub mod timeunits {
     pub const WEEK: u64 = DAY * 7;
 }
 
-
-
 pub struct BinSpecDimT {
     pub count: u64,
     pub ts1: u64,
@@ -207,7 +193,6 @@ pub struct BinSpecDimT {
 }
 
 impl BinSpecDimT {
-
     pub fn over_range(count: u64, ts1: u64, ts2: u64) -> Self {
         use timeunits::*;
         assert!(count >= 1);
@@ -217,15 +202,39 @@ impl BinSpecDimT {
         assert!(dt <= DAY * 14);
         let bs = dt / count;
         let BIN_THRESHOLDS = [
-            2, 10, 100,
-            1000, 10_000, 100_000,
-            MU, MU * 10, MU * 100,
-            MS, MS * 10, MS * 100,
-            SEC, SEC * 5, SEC * 10, SEC * 20,
-            MIN, MIN * 5, MIN * 10, MIN * 20,
-            HOUR, HOUR * 2, HOUR * 4, HOUR * 12,
-            DAY, DAY * 2, DAY * 4, DAY * 8, DAY * 16,
-            WEEK, WEEK * 2, WEEK * 10, WEEK * 60,
+            2,
+            10,
+            100,
+            1000,
+            10_000,
+            100_000,
+            MU,
+            MU * 10,
+            MU * 100,
+            MS,
+            MS * 10,
+            MS * 100,
+            SEC,
+            SEC * 5,
+            SEC * 10,
+            SEC * 20,
+            MIN,
+            MIN * 5,
+            MIN * 10,
+            MIN * 20,
+            HOUR,
+            HOUR * 2,
+            HOUR * 4,
+            HOUR * 12,
+            DAY,
+            DAY * 2,
+            DAY * 4,
+            DAY * 8,
+            DAY * 16,
+            WEEK,
+            WEEK * 2,
+            WEEK * 10,
+            WEEK * 60,
         ];
         let mut i1 = 0;
         let bs = loop {
@@ -257,17 +266,14 @@ impl BinSpecDimT {
             end: self.ts1 + (ix as u64 + 1) * self.bs,
         }
     }
-
 }
 
-
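Review note: a worked example for `BinSpecDimT::over_range` above. The raw bin size is `dt / count` in nanoseconds, and the `BIN_THRESHOLDS` table then snaps it to a round value (the exact selection loop is elided in this hunk). The arithmetic below uses only the visible definitions:

use netpod::timeunits::*;

// Request ~25 bins over a 2 h range (timestamps in nanoseconds):
let (ts1, ts2) = (0u64, HOUR * 2);
let count = 25u64;
let raw = (ts2 - ts1) / count; // 7200 s / 25 = 288 s per bin before snapping
// 288 s falls between the SEC * 20 and MIN * 5 entries of the table:
assert!(raw > SEC * 20 && raw < MIN * 5);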
 #[derive(Clone, Debug)]
 pub struct PreBinnedPatchGridSpec {
     bin_t_len: u64,
 }
 
 impl PreBinnedPatchGridSpec {
-
     pub fn new(bin_t_len: u64) -> Self {
         let mut ok = false;
         for &j in PATCH_T_LEN_OPTIONS.iter() {
@@ -277,11 +283,12 @@ impl PreBinnedPatchGridSpec {
             }
         }
         if !ok {
-            panic!("invalid bin_t_len for PreBinnedPatchGridSpec {}", bin_t_len);
-        }
-        Self {
-            bin_t_len,
+            panic!(
+                "invalid bin_t_len for PreBinnedPatchGridSpec {}",
+                bin_t_len
+            );
         }
+        Self { bin_t_len }
     }
 
     pub fn from_query_params(params: &BTreeMap) -> Self {
@@ -315,27 +322,11 @@ impl PreBinnedPatchGridSpec {
         }
         panic!()
     }
-
 }
 
-const BIN_T_LEN_OPTIONS: [u64; 6] = [
-    SEC * 10,
-    MIN * 10,
-    HOUR,
-    HOUR * 4,
-    DAY,
-    DAY * 4,
-];
-
-const PATCH_T_LEN_OPTIONS: [u64; 6] = [
-    MIN * 10,
-    HOUR,
-    HOUR * 4,
-    DAY,
-    DAY * 4,
-    DAY * 12,
-];
+const BIN_T_LEN_OPTIONS: [u64; 6] = [SEC * 10, MIN * 10, HOUR, HOUR * 4, DAY, DAY * 4];
+const PATCH_T_LEN_OPTIONS: [u64; 6] = [MIN * 10, HOUR, HOUR * 4, DAY, DAY * 4, DAY * 12];
 
 #[derive(Clone, Debug)]
 pub struct PreBinnedPatchRange {
@@ -345,7 +336,6 @@ pub struct PreBinnedPatchRange {
 }
 
 impl PreBinnedPatchRange {
-
     pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Option {
         assert!(min_bin_count >= 1);
         assert!(min_bin_count <= 2000);
@@ -357,8 +347,7 @@ impl PreBinnedPatchRange {
         loop {
             if i1 <= 0 {
                 break None;
-            }
-            else {
+            } else {
                 i1 -= 1;
                 let t = BIN_T_LEN_OPTIONS[i1];
                 //info!("look at threshold {} bs {}", t, bs);
@@ -369,22 +358,17 @@ impl PreBinnedPatchRange {
                     let count = range.delta() / bs;
                     let offset = ts1 / bs;
                     break Some(Self {
-                        grid_spec: PreBinnedPatchGridSpec {
-                            bin_t_len: bs,
-                        },
+                        grid_spec: PreBinnedPatchGridSpec { bin_t_len: bs },
                         count,
                         offset,
                     });
-                }
-                else {
+                } else {
                 }
             }
         }
     }
-
 }
 
-
 #[derive(Clone, Debug)]
 pub struct PreBinnedPatchCoord {
     spec: PreBinnedPatchGridSpec,
@@ -392,7 +376,6 @@ pub struct PreBinnedPatchCoord {
 }
 
 impl PreBinnedPatchCoord {
-
     pub fn bin_t_len(&self) -> u64 {
         self.spec.bin_t_len
     }
@@ -422,7 +405,12 @@ impl PreBinnedPatchCoord {
     }
 
     pub fn to_url_params_strings(&self) -> String {
-        format!("patch_t_len={}&bin_t_len={}&patch_ix={}", self.spec.patch_t_len(), self.spec.bin_t_len(), self.ix())
+        format!(
+            "patch_t_len={}&bin_t_len={}&patch_ix={}",
+            self.spec.patch_t_len(),
+            self.spec.bin_t_len(),
+            self.ix()
+        )
     }
 
     pub fn from_query_params(params: &BTreeMap) -> Self {
@@ -432,7 +420,6 @@ impl PreBinnedPatchCoord {
             ix: patch_ix,
         }
     }
-
 }
 
 pub struct PreBinnedPatchIterator {
@@ -442,7 +429,6 @@ pub struct PreBinnedPatchIterator {
 }
 
 impl PreBinnedPatchIterator {
-
     pub fn from_range(range: PreBinnedPatchRange) -> Self {
         Self {
             range,
@@ -450,7 +436,6 @@ impl PreBinnedPatchIterator {
             ix: 0,
        }
     }
-
 }
 
 impl Iterator for PreBinnedPatchIterator {
@@ -459,8 +444,7 @@ impl Iterator for PreBinnedPatchIterator {
     fn next(&mut self) -> Option {
         if self.ix >= self.range.count {
             None
-        }
-        else {
+        } else {
             let ret = Self::Item {
                 spec: self.range.grid_spec.clone(),
                 ix: self.range.offset + self.ix,
@@ -469,16 +453,13 @@ impl Iterator for PreBinnedPatchIterator {
             Some(ret)
         }
     }
-
 }
 
-
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub enum AggKind {
     DimXBins1,
 }
 
-
 pub fn query_params(q: Option<&str>) -> std::collections::BTreeMap {
     let mut map = std::collections::BTreeMap::new();
     match q {
@@ -492,13 +473,11 @@ pub fn query_params(q: Option<&str>) -> std::collections::BTreeMap
-        None => {
-        }
+        None => {}
     }
     map
 }
 
-
 pub trait ToNanos {
     fn to_nanos(&self) -> u64;
 }
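Review note: how the pre-binned types above compose — pick a covering grid for a time window, then enumerate patch coordinates and their cache-key URL parameters. A sketch only; the window and bin-count values are arbitrary:

use netpod::{timeunits::*, NanoRange, PreBinnedPatchIterator, PreBinnedPatchRange};

let win = NanoRange { beg: 0, end: HOUR * 2 };
// covering_range returns None when no grid option fits the request.
if let Some(range) = PreBinnedPatchRange::covering_range(win, 100) {
    for coord in PreBinnedPatchIterator::from_range(range) {
        // Each patch is addressed by patch_t_len, bin_t_len and patch_ix.
        println!("{}", coord.to_url_params_strings());
    }
}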
diff --git a/retrieval/src/bin/retrieval.rs b/retrieval/src/bin/retrieval.rs
index cc166de..3fbea16 100644
--- a/retrieval/src/bin/retrieval.rs
+++ b/retrieval/src/bin/retrieval.rs
@@ -1,8 +1,8 @@
-#[allow(unused_imports)]
-use tracing::{error, warn, info, debug, trace};
 use err::Error;
-use netpod::{ChannelConfig, Channel, timeunits::*, ScalarType, Shape, Node, Cluster, NodeConfig};
+use netpod::{timeunits::*, Channel, ChannelConfig, Cluster, Node, NodeConfig, ScalarType, Shape};
 use std::sync::Arc;
+#[allow(unused_imports)]
+use tracing::{debug, error, info, trace, warn};
 
 pub fn main() {
     match taskrun::run(go()) {
@@ -60,9 +60,7 @@ fn simple_fetch() {
         tb_file_count: 1,
         buffer_size: 1024 * 8,
     };
-    let cluster = Cluster {
-        nodes: vec![node],
-    };
+    let cluster = Cluster { nodes: vec![node] };
     let cluster = Arc::new(cluster);
     let node_config = NodeConfig {
         cluster: cluster,
@@ -99,8 +97,13 @@ fn simple_fetch() {
         let t2 = chrono::Utc::now();
         let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
         let throughput = ntot / 1024 * 1000 / ms;
-        info!("total download {} MB throughput {:5} kB/s", ntot / 1024 / 1024, throughput);
+        info!(
+            "total download {} MB throughput {:5} kB/s",
+            ntot / 1024 / 1024,
+            throughput
+        );
         //Err::<(), _>(format!("test error").into())
         Ok(())
-    }).unwrap();
+    })
+    .unwrap();
 }
diff --git a/retrieval/src/cli.rs b/retrieval/src/cli.rs
index beb0066..1a4004e 100644
--- a/retrieval/src/cli.rs
+++ b/retrieval/src/cli.rs
@@ -1,4 +1,4 @@
-use clap::{Clap, crate_version};
+use clap::{crate_version, Clap};
 
 #[derive(Debug, Clap)]
 #[clap(name="retrieval", author="Dominik Werder ", version=crate_version!())]
@@ -15,5 +15,4 @@ pub enum SubCmd {
 }
 
 #[derive(Debug, Clap)]
-pub struct Retrieval {
-}
+pub struct Retrieval {}
diff --git a/retrieval/src/lib.rs b/retrieval/src/lib.rs
index 90e21d3..432d4ae 100644
--- a/retrieval/src/lib.rs
+++ b/retrieval/src/lib.rs
@@ -1,10 +1,10 @@
-#[allow(unused_imports)]
-use tracing::{error, warn, info, debug, trace};
 use err::Error;
-use tokio::task::JoinHandle;
-use netpod::{Node, Cluster, NodeConfig};
 use hyper::Body;
+use netpod::{Cluster, Node, NodeConfig};
 use std::sync::Arc;
+use tokio::task::JoinHandle;
+#[allow(unused_imports)]
+use tracing::{debug, error, info, trace, warn};
 
 pub mod cli;
 
@@ -47,28 +47,31 @@ async fn get_cached_0_inner() -> Result<(), Error> {
     let t2 = chrono::Utc::now();
     let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
     let throughput = ntot / 1024 * 1000 / ms;
-    info!("get_cached_0 DONE total download {} MB throughput {:5} kB/s", ntot / 1024 / 1024, throughput);
+    info!(
+        "get_cached_0 DONE total download {} MB throughput {:5} kB/s",
+        ntot / 1024 / 1024,
+        throughput
+    );
     //Err::<(), _>(format!("test error").into())
     Ok(())
 }
 
-
 fn test_cluster() -> Cluster {
-    let nodes = (0..1).into_iter().map(|id| {
-        let node = Node {
-            id,
-            host: "localhost".into(),
-            port: 8360 + id as u16,
-            data_base_path: format!("../tmpdata/node{:02}", id).into(),
-            ksprefix: "ks".into(),
-            split: 0,
-        };
-        Arc::new(node)
-    })
-    .collect();
-    Cluster {
-        nodes: nodes,
-    }
+    let nodes = (0..1)
+        .into_iter()
+        .map(|id| {
+            let node = Node {
+                id,
+                host: "localhost".into(),
+                port: 8360 + id as u16,
+                data_base_path: format!("../tmpdata/node{:02}", id).into(),
+                ksprefix: "ks".into(),
+                split: 0,
+            };
+            Arc::new(node)
+        })
+        .collect();
+    Cluster { nodes: nodes }
 }
 
 fn spawn_test_hosts(cluster: Arc) -> Vec>> {
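Review note: the throughput expressions in the retrieval code above are u64 integer math, so evaluation order matters; dividing by 1024 before multiplying by 1000 also keeps intermediate values small. Worked numbers:

// 50 MiB downloaded in 2000 ms:
let ntot: u64 = 50 * 1024 * 1024;
let ms: u64 = 2000;
let throughput = ntot / 1024 * 1000 / ms; // bytes -> KiB, then per second
assert_eq!(throughput, 25_600); // 25 MiB/s, reported as 25600 kB/s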
"ClosingNextLine" +#unstable_features = true +#empty_item_single_line = false +#control_brace_style = "ClosingNextLine" diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs index 1706602..f4137a6 100644 --- a/taskrun/src/lib.rs +++ b/taskrun/src/lib.rs @@ -1,11 +1,11 @@ -#[allow(unused_imports)] -use tracing::{error, warn, info, debug, trace}; use err::Error; +use std::future::Future; use std::panic; use tokio::task::JoinHandle; -use std::future::Future; +#[allow(unused_imports)] +use tracing::{debug, error, info, trace, warn}; -pub fn run>>(f: F) -> Result { +pub fn run>>(f: F) -> Result { tracing_init(); tokio::runtime::Builder::new_multi_thread() .worker_threads(12) @@ -27,15 +27,20 @@ pub fn run>>(f: F) -> Result(task: T) -> JoinHandle where T: Future + Send + 'static, T::Output: Send + 'static { +pub fn spawn(task: T) -> JoinHandle +where + T: Future + Send + 'static, + T::Output: Send + 'static, +{ tokio::spawn(task) }