diff --git a/disk/src/agg.rs b/disk/src/agg.rs index f35a0e7..31c7d20 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -1,12 +1,10 @@ -use crate::merge::MergeDim1F32Stream; use crate::EventFull; use err::Error; use futures_core::Stream; -use futures_util::{future::ready, pin_mut, StreamExt}; +use futures_util::StreamExt; use netpod::BinSpecDimT; -use netpod::{timeunits::*, Channel, ChannelConfig, Node, ScalarType, Shape}; +use netpod::{Node, ScalarType}; use std::pin::Pin; -use std::sync::Arc; use std::task::{Context, Poll}; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; @@ -42,7 +40,7 @@ impl AggregatableXdim1Bin for () { impl AggregatableTdim for () { type Output = (); type Aggregator = (); - fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator { + fn aggregator_new(&self, _ts1: u64, _ts2: u64) -> Self::Aggregator { todo!() } } @@ -50,19 +48,19 @@ impl AggregatorTdim for () { type InputValue = (); type OutputValue = (); - fn ends_before(&self, inp: &Self::InputValue) -> bool { + fn ends_before(&self, _inp: &Self::InputValue) -> bool { todo!() } - fn ends_after(&self, inp: &Self::InputValue) -> bool { + fn ends_after(&self, _inp: &Self::InputValue) -> bool { todo!() } - fn starts_after(&self, inp: &Self::InputValue) -> bool { + fn starts_after(&self, _inp: &Self::InputValue) -> bool { todo!() } - fn ingest(&mut self, v: &Self::InputValue) { + fn ingest(&mut self, _v: &Self::InputValue) { todo!() } fn result(self) -> Self::OutputValue { @@ -305,6 +303,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { } } +#[allow(dead_code)] pub struct MinMaxAvgScalarBinBatch { ts1s: Vec, ts2s: Vec, @@ -330,7 +329,7 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatch { impl AggregatableTdim for MinMaxAvgScalarBinBatch { type Output = MinMaxAvgScalarBinSingle; type Aggregator = MinMaxAvgScalarBinBatchAggregator; - fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator { + fn aggregator_new(&self, _ts1: u64, _ts2: u64) -> Self::Aggregator { todo!() } } @@ -341,19 +340,19 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator { type InputValue = MinMaxAvgScalarBinBatch; type OutputValue = MinMaxAvgScalarBinSingle; - fn ends_before(&self, inp: &Self::InputValue) -> bool { + fn ends_before(&self, _inp: &Self::InputValue) -> bool { todo!() } - fn ends_after(&self, inp: &Self::InputValue) -> bool { + fn ends_after(&self, _inp: &Self::InputValue) -> bool { todo!() } - fn starts_after(&self, inp: &Self::InputValue) -> bool { + fn starts_after(&self, _inp: &Self::InputValue) -> bool { todo!() } - fn ingest(&mut self, v: &Self::InputValue) { + fn ingest(&mut self, _v: &Self::InputValue) { todo!() } @@ -384,7 +383,7 @@ impl std::fmt::Debug for MinMaxAvgScalarBinSingle { impl AggregatableTdim for MinMaxAvgScalarBinSingle { type Output = MinMaxAvgScalarBinSingle; type Aggregator = MinMaxAvgScalarBinSingleAggregator; - fn aggregator_new(&self, ts1: u64, ts2: u64) -> Self::Aggregator { + fn aggregator_new(&self, _ts1: u64, _ts2: u64) -> Self::Aggregator { todo!() } } @@ -402,19 +401,19 @@ impl AggregatorTdim for MinMaxAvgScalarBinSingleAggregator { type InputValue = MinMaxAvgScalarBinSingle; type OutputValue = MinMaxAvgScalarBinSingle; - fn ends_before(&self, inp: &Self::InputValue) -> bool { + fn ends_before(&self, _inp: &Self::InputValue) -> bool { todo!() } - fn ends_after(&self, inp: &Self::InputValue) -> bool { + fn ends_after(&self, _inp: &Self::InputValue) -> bool { todo!() } - fn starts_after(&self, inp: &Self::InputValue) -> bool { + fn starts_after(&self, _inp: &Self::InputValue) -> bool { todo!() } - fn ingest(&mut self, v: &Self::InputValue) { + fn ingest(&mut self, _v: &Self::InputValue) { todo!() } @@ -455,7 +454,9 @@ where // do the conversion // TODO only a scalar! - todo!(); + if true { + todo!(); + } let n1 = decomp.len(); assert!(n1 % ty.bytes() as usize == 0); @@ -479,10 +480,10 @@ where ret.tss.push(k.tss[i1]); ret.values.push(j); } - _ => todo!(), + _ => err::todoval(), } } - Ready(Some(Ok(todo!()))) + Ready(Some(Ok(err::todoval()))) } Ready(Some(Err(e))) => Ready(Some(Err(e))), Ready(None) => Ready(None), @@ -744,193 +745,9 @@ pub fn make_test_node(id: u32) -> Node { id, host: "localhost".into(), port: 8800 + id as u16, + port_raw: 8800 + id as u16 + 100, data_base_path: format!("../tmpdata/node{:02}", id).into(), split: id, ksprefix: "ks".into(), } } - -#[test] -fn agg_x_dim_0() { - taskrun::run(async { - agg_x_dim_0_inner().await; - Ok(()) - }) - .unwrap(); -} - -async fn agg_x_dim_0_inner() { - let node = make_test_node(0); - let node = Arc::new(node); - let query = netpod::AggQuerySingleChannel { - channel_config: ChannelConfig { - channel: Channel { - backend: "sf-databuffer".into(), - name: "S10BC01-DBAM070:EOM1_T1".into(), - }, - keyspace: 2, - time_bin_size: DAY, - array: false, - shape: Shape::Scalar, - scalar_type: ScalarType::F64, - big_endian: true, - compression: true, - }, - timebin: 18723, - tb_file_count: 1, - buffer_size: 1024 * 4, - }; - let bin_count = 20; - let ts1 = query.timebin as u64 * query.channel_config.time_bin_size; - let ts2 = ts1 + HOUR * 24; - let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node) - .into_dim_1_f32_stream() - //.take(1000) - .map(|q| { - if let Ok(ref k) = q { - //info!("vals: {:?}", k); - } - q - }) - .into_binned_x_bins_1() - .map(|k| { - //info!("after X binning {:?}", k.as_ref().unwrap()); - k - }) - .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2)) - .map(|k| { - info!("after T binning {:?}", k.as_ref().unwrap()); - k - }) - .for_each(|k| ready(())); - fut1.await; -} - -#[test] -fn agg_x_dim_1() { - taskrun::run(async { - agg_x_dim_1_inner().await; - Ok(()) - }) - .unwrap(); -} - -async fn agg_x_dim_1_inner() { - // sf-databuffer - // /data/sf-databuffer/daq_swissfel/daq_swissfel_3/byTime/S10BC01-DBAM070\:BAM_CH1_NORM/* - // S10BC01-DBAM070:BAM_CH1_NORM - let node = make_test_node(0); - let node = Arc::new(node); - let query = netpod::AggQuerySingleChannel { - channel_config: ChannelConfig { - channel: Channel { - backend: "ks".into(), - name: "wave1".into(), - }, - keyspace: 3, - time_bin_size: DAY, - array: true, - shape: Shape::Wave(1024), - scalar_type: ScalarType::F64, - big_endian: true, - compression: true, - }, - timebin: 0, - tb_file_count: 1, - buffer_size: 17, - }; - let bin_count = 10; - let ts1 = query.timebin as u64 * query.channel_config.time_bin_size; - let ts2 = ts1 + HOUR * 24; - let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node) - .into_dim_1_f32_stream() - //.take(1000) - .map(|q| { - if let Ok(ref k) = q { - //info!("vals: {:?}", k); - } - q - }) - .into_binned_x_bins_1() - .map(|k| { - //info!("after X binning {:?}", k.as_ref().unwrap()); - k - }) - .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2)) - .map(|k| { - info!("after T binning {:?}", k.as_ref().unwrap()); - k - }) - .for_each(|k| ready(())); - fut1.await; -} - -#[test] -fn merge_0() { - taskrun::run(async { - merge_0_inner().await; - Ok(()) - }) - .unwrap(); -} - -async fn merge_0_inner() { - let query = netpod::AggQuerySingleChannel { - channel_config: ChannelConfig { - channel: Channel { - backend: "ks".into(), - name: "wave1".into(), - }, - keyspace: 3, - time_bin_size: DAY, - array: true, - shape: Shape::Wave(17), - scalar_type: ScalarType::F64, - big_endian: true, - compression: true, - }, - timebin: 0, - tb_file_count: 1, - buffer_size: 1024 * 8, - }; - let streams = (0..13) - .into_iter() - .map(|k| make_test_node(k)) - .map(|node| { - let node = Arc::new(node); - crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node).into_dim_1_f32_stream() - }) - .collect(); - MergeDim1F32Stream::new(streams) - .map(|k| { - //info!("NEXT MERGED ITEM ts {:?}", k.as_ref().unwrap().tss); - }) - .fold(0, |k, q| ready(0)) - .await; -} - -pub fn tmp_some_older_things() { - let vals = ValuesDim1 { - tss: vec![0, 1, 2, 3], - values: vec![vec![0., 0., 0.], vec![1., 1., 1.], vec![2., 2., 2.], vec![3., 3., 3.]], - }; - // I want to distinguish already in the outer part between dim-0 and dim-1 and generate - // separate code for these cases... - // That means that also the reading chain itself needs to be typed on that. - // Need to supply some event-payload converter type which has that type as Output type. - let vals2 = vals.into_agg(); - // Now the T-binning: - - /* - T-aggregator must be able to produce empty-values of correct type even if we never get - a single value of input data. - Therefore, it needs the bin range definition. - How do I want to drive the system? - If I write the T-binner as a Stream, then I also need to pass it the input! - Meaning, I need to pass the Stream which produces the actual numbers from disk. - - readchannel() -> Stream of timestamped byte blobs - .to_f32() -> Stream ? indirection to branch on the underlying shape - .agg_x_bins_1() -> Stream ? can I keep it at the single indirection on the top level? - */ -} diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs new file mode 100644 index 0000000..164e72e --- /dev/null +++ b/disk/src/aggtest.rs @@ -0,0 +1,203 @@ +use super::agg::{AggregatableXdim1Bin, IntoBinnedT, IntoBinnedXBins1, IntoDim1F32Stream, ValuesDim1}; +use super::merge::MergeDim1F32Stream; +use crate::agg::make_test_node; +use futures_util::StreamExt; +use netpod::timeunits::*; +use netpod::{BinSpecDimT, Channel, ChannelConfig, ScalarType, Shape}; +use std::future::ready; +use std::sync::Arc; +#[allow(unused_imports)] +use tracing::{debug, error, info, trace, warn}; + +#[test] +fn agg_x_dim_0() { + taskrun::run(async { + agg_x_dim_0_inner().await; + Ok(()) + }) + .unwrap(); +} + +async fn agg_x_dim_0_inner() { + let node = make_test_node(0); + let node = Arc::new(node); + let query = netpod::AggQuerySingleChannel { + channel_config: ChannelConfig { + channel: Channel { + backend: "sf-databuffer".into(), + name: "S10BC01-DBAM070:EOM1_T1".into(), + }, + keyspace: 2, + time_bin_size: DAY, + array: false, + shape: Shape::Scalar, + scalar_type: ScalarType::F64, + big_endian: true, + compression: true, + }, + timebin: 18723, + tb_file_count: 1, + buffer_size: 1024 * 4, + }; + let bin_count = 20; + let ts1 = query.timebin as u64 * query.channel_config.time_bin_size; + let ts2 = ts1 + HOUR * 24; + let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node) + .into_dim_1_f32_stream() + //.take(1000) + .map(|q| { + if false { + if let Ok(ref k) = q { + trace!("vals: {:?}", k); + } + } + q + }) + .into_binned_x_bins_1() + .map(|k| { + if false { + trace!("after X binning {:?}", k.as_ref().unwrap()); + } + k + }) + .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2)) + .map(|k| { + if false { + trace!("after T binning {:?}", k.as_ref().unwrap()); + } + k + }) + .for_each(|_k| ready(())); + fut1.await; +} + +#[test] +fn agg_x_dim_1() { + taskrun::run(async { + agg_x_dim_1_inner().await; + Ok(()) + }) + .unwrap(); +} + +async fn agg_x_dim_1_inner() { + // sf-databuffer + // /data/sf-databuffer/daq_swissfel/daq_swissfel_3/byTime/S10BC01-DBAM070\:BAM_CH1_NORM/* + // S10BC01-DBAM070:BAM_CH1_NORM + let node = make_test_node(0); + let node = Arc::new(node); + let query = netpod::AggQuerySingleChannel { + channel_config: ChannelConfig { + channel: Channel { + backend: "ks".into(), + name: "wave1".into(), + }, + keyspace: 3, + time_bin_size: DAY, + array: true, + shape: Shape::Wave(1024), + scalar_type: ScalarType::F64, + big_endian: true, + compression: true, + }, + timebin: 0, + tb_file_count: 1, + buffer_size: 17, + }; + let bin_count = 10; + let ts1 = query.timebin as u64 * query.channel_config.time_bin_size; + let ts2 = ts1 + HOUR * 24; + let fut1 = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node) + .into_dim_1_f32_stream() + //.take(1000) + .map(|q| { + if false { + if let Ok(ref k) = q { + info!("vals: {:?}", k); + } + } + q + }) + .into_binned_x_bins_1() + .map(|k| { + //info!("after X binning {:?}", k.as_ref().unwrap()); + k + }) + .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2)) + .map(|k| { + info!("after T binning {:?}", k.as_ref().unwrap()); + k + }) + .for_each(|_k| ready(())); + fut1.await; +} + +#[test] +fn merge_0() { + taskrun::run(async { + merge_0_inner().await; + Ok(()) + }) + .unwrap(); +} + +async fn merge_0_inner() { + let query = netpod::AggQuerySingleChannel { + channel_config: ChannelConfig { + channel: Channel { + backend: "ks".into(), + name: "wave1".into(), + }, + keyspace: 3, + time_bin_size: DAY, + array: true, + shape: Shape::Wave(17), + scalar_type: ScalarType::F64, + big_endian: true, + compression: true, + }, + timebin: 0, + tb_file_count: 1, + buffer_size: 1024 * 8, + }; + let streams = (0..13) + .into_iter() + .map(|k| make_test_node(k)) + .map(|node| { + let node = Arc::new(node); + crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node).into_dim_1_f32_stream() + }) + .collect(); + MergeDim1F32Stream::new(streams) + .map(|_k| { + //info!("NEXT MERGED ITEM ts {:?}", k.as_ref().unwrap().tss); + }) + .fold(0, |_k, _q| ready(0)) + .await; +} + +pub fn tmp_some_older_things() { + let vals = ValuesDim1 { + tss: vec![0, 1, 2, 3], + values: vec![vec![0., 0., 0.], vec![1., 1., 1.], vec![2., 2., 2.], vec![3., 3., 3.]], + }; + // I want to distinguish already in the outer part between dim-0 and dim-1 and generate + // separate code for these cases... + // That means that also the reading chain itself needs to be typed on that. + // Need to supply some event-payload converter type which has that type as Output type. + let _vals2 = vals.into_agg(); + // Now the T-binning: + + /* + T-aggregator must be able to produce empty-values of correct type even if we never get + a single value of input data. + Therefore, it needs the bin range definition. + How do I want to drive the system? + If I write the T-binner as a Stream, then I also need to pass it the input! + Meaning, I need to pass the Stream which produces the actual numbers from disk. + + readchannel() -> Stream of timestamped byte blobs + .to_f32() -> Stream ? indirection to branch on the underlying shape + .agg_x_bins_1() -> Stream ? can I keep it at the single indirection on the top level? + */ +} diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 60b9655..9715280 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -3,19 +3,17 @@ use bytes::{BufMut, Bytes, BytesMut}; use chrono::{DateTime, Utc}; use err::Error; use futures_core::Stream; -use futures_util::{pin_mut, FutureExt, StreamExt, TryFutureExt}; -use http::uri::Scheme; +use futures_util::{pin_mut, FutureExt, StreamExt}; use netpod::{ - AggKind, Channel, Cluster, NanoRange, Node, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchGridSpec, - PreBinnedPatchIterator, PreBinnedPatchRange, ToNanos, + AggKind, Channel, Cluster, NanoRange, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange, + ToNanos, }; use serde::{Deserialize, Serialize}; -use std::future::{ready, Future}; +use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use tiny_keccak::Hasher; -use tokio::fs::OpenOptions; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; @@ -97,7 +95,7 @@ impl Stream for BinnedBytesForHttpStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; match self.inp.poll_next_unpin(cx) { - Ready(Some(Ok(k))) => { + Ready(Some(Ok(_k))) => { let mut buf = BytesMut::with_capacity(250); buf.put(&b"TODO serialize to bytes\n"[..]); Ready(Some(Ok(buf.freeze()))) @@ -168,7 +166,7 @@ impl Stream for PreBinnedValueByteStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; match self.inp.poll_next_unpin(cx) { - Ready(Some(Ok(k))) => { + Ready(Some(Ok(_k))) => { error!("TODO convert item to Bytes"); let buf = Bytes::new(); Ready(Some(Ok(buf))) @@ -233,7 +231,7 @@ impl PreBinnedValueStream { let channel = self.channel.clone(); let agg_kind = self.agg_kind.clone(); let node_config = self.node_config.clone(); - let mut patch_it = PreBinnedPatchIterator::from_range(range); + let patch_it = PreBinnedPatchIterator::from_range(range); let s = futures_util::stream::iter(patch_it) .map(move |coord| { PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), node_config.clone()) @@ -264,13 +262,10 @@ impl Stream for PreBinnedValueStream { fut.poll_next_unpin(cx) } else if let Some(fut) = self.open_check_local_file.as_mut() { match fut.poll_unpin(cx) { - Ready(Ok(file)) => { - todo!("IMPLEMENT READ FROM LOCAL CACHE"); - Pending - } + Ready(Ok(_file)) => err::todoval(), Ready(Err(e)) => match e.kind() { std::io::ErrorKind::NotFound => { - warn!("TODO LOCAL CACHE FILE NOT FOUND"); + error!("TODO LOCAL CACHE FILE NOT FOUND"); self.try_setup_fetch_prebinned_higher_res(); continue 'outer; } @@ -282,6 +277,7 @@ impl Stream for PreBinnedValueStream { Pending => Pending, } } else { + #[allow(unused_imports)] use std::os::unix::fs::OpenOptionsExt; let mut opts = std::fs::OpenOptions::new(); opts.read(true); @@ -295,7 +291,6 @@ impl Stream for PreBinnedValueStream { pub struct PreBinnedValueFetchedStream { uri: http::Uri, - patch_coord: PreBinnedPatchCoord, resfut: Option, res: Option>, } @@ -323,7 +318,6 @@ impl PreBinnedValueFetchedStream { .unwrap(); Self { uri, - patch_coord, resfut: None, res: None, } @@ -345,14 +339,14 @@ impl Stream for PreBinnedValueFetchedStream { pin_mut!(res); use hyper::body::HttpBody; match res.poll_data(cx) { - Ready(Some(Ok(k))) => Pending, + Ready(Some(Ok(_k))) => todo!(), Ready(Some(Err(e))) => Ready(Some(Err(e.into()))), Ready(None) => Ready(None), Pending => Pending, } } None => match self.resfut.as_mut() { - Some(mut resfut) => match resfut.poll_unpin(cx) { + Some(resfut) => match resfut.poll_unpin(cx) { Ready(res) => match res { Ok(res) => { info!("GOT result from SUB REQUEST: {:?}", res); @@ -394,7 +388,6 @@ impl BinnedStream { node_config: Arc, ) -> Self { warn!("BinnedStream will open a PreBinnedValueStream"); - let mut patch_it = patch_it; let inp = futures_util::stream::iter(patch_it) .map(move |coord| { PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), node_config.clone()) @@ -426,7 +419,7 @@ impl Stream for BinnedStream { pub struct SomeReturnThing {} impl From for Bytes { - fn from(k: SomeReturnThing) -> Self { + fn from(_k: SomeReturnThing) -> Self { error!("TODO convert result to octets"); todo!("TODO convert result to octets") } @@ -441,7 +434,7 @@ pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, c hash.update(&patch_coord.bin_t_len().to_le_bytes()); let mut out = [0; 32]; hash.finalize(&mut out); - let mut a = [out[0], out[1], out[2], out[3]]; + let a = [out[0], out[1], out[2], out[3]]; let ix = u32::from_le_bytes(a) % cluster.nodes.len() as u32; info!("node_ix_for_patch {}", ix); ix diff --git a/disk/src/channelconfig.rs b/disk/src/channelconfig.rs index 06d829d..e34e8a5 100644 --- a/disk/src/channelconfig.rs +++ b/disk/src/channelconfig.rs @@ -1,5 +1,4 @@ use err::Error; -use nom::error::{ErrorKind, VerboseError}; use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8}; use nom::Needed; #[allow(unused_imports)] @@ -7,7 +6,6 @@ use nom::{ bytes::complete::{tag, take, take_while_m_n}, combinator::map_res, sequence::tuple, - IResult, }; use num_derive::{FromPrimitive, ToPrimitive}; use num_traits::ToPrimitive; @@ -15,6 +13,16 @@ use serde::{Deserialize, Serialize}; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; +type NRes<'a, O> = nom::IResult<&'a [u8], O, err::Error>; + +fn mkerr<'a, S, O>(msg: S) -> NRes<'a, O> +where + S: Into, +{ + let e = Error::with_msg(msg); + Err(nom::Err::Error(e)) +} + #[derive(Debug, FromPrimitive, ToPrimitive, Serialize, Deserialize)] pub enum DType { Bool = 0, @@ -56,7 +64,7 @@ pub struct ConfigEntry { pub pulse: i64, pub ks: i32, pub bs: i64, - pub splitCount: i32, + pub split_count: i32, pub status: i32, pub bb: i8, pub modulo: i32, @@ -69,71 +77,62 @@ pub struct ConfigEntry { */ pub precision: i16, pub dtype: DType, - pub isCompressed: bool, - pub isShaped: bool, - pub isArray: bool, - pub isBigEndian: bool, - pub compressionMethod: Option, + pub is_compressed: bool, + pub is_shaped: bool, + pub is_array: bool, + pub is_big_endian: bool, + pub compression_method: Option, pub shape: Option>, - pub sourceName: Option, + pub source_name: Option, unit: Option, description: Option, - optionalFields: Option, - valueConverter: Option, + optional_fields: Option, + value_converter: Option, } #[derive(Debug, Serialize, Deserialize)] pub struct Config { - pub formatVersion: i16, - pub channelName: String, + pub format_version: i16, + pub channel_name: String, pub entries: Vec, } -fn parse_short_string(inp: &[u8]) -> IResult<&[u8], Option> { +fn parse_short_string(inp: &[u8]) -> NRes> { let (inp, len1) = be_i32(inp)?; if len1 == -1 { return Ok((inp, None)); } if len1 < 4 { - error!("bad string len {}", len1); - let err = nom::error::make_error(inp, ErrorKind::Verify); - return Err(nom::Err::Error(err)); + return mkerr(format!("bad string len {}", len1)); } if len1 > 500 { - error!("large string len {}", len1); - let err = nom::error::make_error(inp, ErrorKind::Verify); - return Err(nom::Err::Error(err)); + return mkerr(format!("large string len {}", len1)); } let (inp, snb) = take((len1 - 4) as usize)(inp)?; match String::from_utf8(snb.to_vec()) { Ok(s1) => Ok((inp, Some(s1))), - Err(e) => { - let err = nom::error::make_error(inp, ErrorKind::Verify); - Err(nom::Err::Error(err)) - } + Err(e) => mkerr(format!("{:?}", e)), } } -pub fn parse_entry(inp: &[u8]) -> IResult<&[u8], Option> { - let t = be_i32(inp); - let (inp, len1) = t?; +//pub fn parse_entry(inp: &[u8]) -> IResult<&[u8], Option> { +pub fn parse_entry(inp: &[u8]) -> NRes> { + let (inp, len1) = be_i32(inp)?; if len1 < 0 || len1 > 4000 { - return Err(nom::Err::Error(nom::error::Error::new(inp, ErrorKind::Verify))); - //return Err(format!("ConfigEntry bad len1 {}", len1).into()); + return mkerr(format!("ConfigEntry bad len1 {}", len1)); } if inp.len() == 0 { return Ok((inp, None)); } if inp.len() < len1 as usize - 4 { return Err(nom::Err::Incomplete(Needed::new(len1 as usize - 4))); - //return Err(format!("incomplete input").into()); } - let inpE = &inp[(len1 - 8) as usize..]; + let inp_e = &inp[(len1 - 8) as usize..]; let (inp, ts) = be_i64(inp)?; let (inp, pulse) = be_i64(inp)?; let (inp, ks) = be_i32(inp)?; let (inp, bs) = be_i64(inp)?; - let (inp, splitCount) = be_i32(inp)?; + let (inp, split_count) = be_i32(inp)?; let (inp, status) = be_i32(inp)?; let (inp, bb) = be_i8(inp)?; let (inp, modulo) = be_i32(inp)?; @@ -141,47 +140,41 @@ pub fn parse_entry(inp: &[u8]) -> IResult<&[u8], Option> { let (inp, precision) = be_i16(inp)?; let (inp, dtlen) = be_i32(inp)?; if dtlen > 100 { - error!("unexpected data type len {}", dtlen); - return Err(nom::Err::Error(nom::error::make_error(inp, ErrorKind::Verify))); - //return Err(format!("unexpected data type len {}", dtlen).into()); + return mkerr(format!("unexpected data type len {}", dtlen)); } let (inp, dtmask) = be_u8(inp)?; - let isCompressed = dtmask & 0x80 != 0; - let isArray = dtmask & 0x40 != 0; - let isBigEndian = dtmask & 0x20 != 0; - let isShaped = dtmask & 0x10 != 0; + let is_compressed = dtmask & 0x80 != 0; + let is_array = dtmask & 0x40 != 0; + let is_big_endian = dtmask & 0x20 != 0; + let is_shaped = dtmask & 0x10 != 0; let (inp, dtype) = be_i8(inp)?; if dtype > 13 { - error!("unexpected data type {}", dtype); - return Err(nom::Err::Error(nom::error::make_error(inp, ErrorKind::Verify))); + return mkerr(format!("unexpected data type {}", dtype)); } let dtype = match num_traits::FromPrimitive::from_i8(dtype) { Some(k) => k, None => { - error!("Can not convert {} to DType", dtype); - return Err(nom::Err::Error(nom::error::make_error(inp, ErrorKind::Verify))); + return mkerr(format!("Can not convert {} to DType", dtype)); } }; - let (inp, compressionMethod) = match isCompressed { + let (inp, compression_method) = match is_compressed { false => (inp, None), true => { let (inp, cm) = be_u8(inp)?; match num_traits::FromPrimitive::from_u8(cm) { Some(k) => (inp, Some(k)), None => { - error!("unknown compression"); - return Err(nom::Err::Error(nom::error::make_error(inp, ErrorKind::Verify))); + return mkerr(format!("unknown compression")); } } } }; - let (inp, shape) = match isShaped { + let (inp, shape) = match is_shaped { false => (inp, None), true => { let (mut inp, dim) = be_u8(inp)?; if dim > 4 { - error!("unexpected number of dimensions: {}", dim); - return Err(nom::Err::Error(nom::error::make_error(inp, ErrorKind::Verify))); + return mkerr(format!("unexpected number of dimensions: {}", dim)); } let mut shape = vec![]; for _ in 0..dim { @@ -192,42 +185,41 @@ pub fn parse_entry(inp: &[u8]) -> IResult<&[u8], Option> { (inp, Some(shape)) } }; - let (inp, sourceName) = parse_short_string(inp)?; + let (inp, source_name) = parse_short_string(inp)?; let (inp, unit) = parse_short_string(inp)?; let (inp, description) = parse_short_string(inp)?; - let (inp, optionalFields) = parse_short_string(inp)?; - let (inp, valueConverter) = parse_short_string(inp)?; - assert_eq!(inp.len(), inpE.len()); - let (inpE, len2) = be_i32(inpE)?; + let (inp, optional_fields) = parse_short_string(inp)?; + let (inp, value_converter) = parse_short_string(inp)?; + assert_eq!(inp.len(), inp_e.len()); + let (inp_e, len2) = be_i32(inp_e)?; if len1 != len2 { - error!("mismatch len1 {} len2 {}", len1, len2); - return Err(nom::Err::Error(nom::error::make_error(inp, ErrorKind::Verify))); + return mkerr(format!("mismatch len1 {} len2 {}", len1, len2)); } Ok(( - inpE, + inp_e, Some(ConfigEntry { ts, pulse, ks, bs, - splitCount, + split_count: split_count, status, bb, modulo, offset, precision, dtype, - isCompressed, - isArray, - isShaped, - isBigEndian, - compressionMethod, + is_compressed: is_compressed, + is_array: is_array, + is_shaped: is_shaped, + is_big_endian: is_big_endian, + compression_method: compression_method, shape, - sourceName, + source_name: source_name, unit, description, - optionalFields, - valueConverter, + optional_fields: optional_fields, + value_converter: value_converter, }), )) } @@ -235,39 +227,36 @@ pub fn parse_entry(inp: &[u8]) -> IResult<&[u8], Option> { /* Parse the full configuration file. */ -pub fn parseConfig(inp: &[u8]) -> IResult<&[u8], Config> { +pub fn parse_config(inp: &[u8]) -> NRes { let (inp, ver) = be_i16(inp)?; let (inp, len1) = be_i32(inp)?; if len1 <= 8 || len1 > 500 { - error!("no channel name. len1 {}", len1); - return Err(nom::Err::Error(nom::error::make_error(inp, ErrorKind::Verify))); + return mkerr(format!("no channel name. len1 {}", len1)); } let (inp, chn) = take((len1 - 8) as usize)(inp)?; let (inp, len2) = be_i32(inp)?; if len1 != len2 { - error!("Mismatch len1 {} len2 {}", len1, len2); - return Err(nom::Err::Error(nom::error::make_error(inp, ErrorKind::Verify))); + return mkerr(format!("Mismatch len1 {} len2 {}", len1, len2)); } let mut entries = vec![]; - let mut inpA = inp; - while inpA.len() > 0 { - let inp = inpA; + let mut inp_a = inp; + while inp_a.len() > 0 { + let inp = inp_a; let (inp, e) = parse_entry(inp)?; if let Some(e) = e { entries.push(e); } - inpA = inp; + inp_a = inp; } - let channelName = match String::from_utf8(chn.to_vec()) { + let channel_name = match String::from_utf8(chn.to_vec()) { Ok(k) => k, Err(e) => { - error!("channelName utf8 error"); - return Err(nom::Err::Error(nom::error::make_error(inp, ErrorKind::Verify))); + return mkerr(format!("channelName utf8 error {:?}", e)); } }; let ret = Config { - formatVersion: ver, - channelName, + format_version: ver, + channel_name: channel_name, entries: entries, }; Ok((inp, ret)) @@ -285,15 +274,15 @@ fn read_data() -> Vec { #[test] fn parse_dummy() { - let config = parseConfig(&[0, 0, 0, 0, 0, 11, 0x61, 0x62, 0x63, 0, 0, 0, 11, 0, 0, 0, 1]).unwrap(); - assert_eq!(0, config.1.formatVersion); - assert_eq!("abc", config.1.channelName); + let config = parse_config(&[0, 0, 0, 0, 0, 11, 0x61, 0x62, 0x63, 0, 0, 0, 11, 0, 0, 0, 1]).unwrap(); + assert_eq!(0, config.1.format_version); + assert_eq!("abc", config.1.channel_name); } #[test] fn open_file() { - let config = parseConfig(&read_data()).unwrap().1; - assert_eq!(0, config.formatVersion); + let config = parse_config(&read_data()).unwrap().1; + assert_eq!(0, config.format_version); assert_eq!(9, config.entries.len()); for e in &config.entries { assert!(e.ts >= 631152000000000000); diff --git a/disk/src/gen.rs b/disk/src/gen.rs index d66f101..d0e66a6 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -1,19 +1,12 @@ use crate::ChannelConfigExt; use bitshuffle::bitshuffle_compress; -use bytes::{Buf, BufMut, Bytes, BytesMut}; +use bytes::{BufMut, BytesMut}; use err::Error; -use futures_core::Stream; -use futures_util::future::FusedFuture; -use futures_util::{pin_mut, StreamExt}; use netpod::ScalarType; use netpod::{timeunits::*, Channel, ChannelConfig, Node, Shape}; -use std::future::Future; use std::path::{Path, PathBuf}; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncRead, AsyncWriteExt}; +use tokio::io::AsyncWriteExt; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; @@ -58,6 +51,7 @@ pub async fn gen_test_data() -> Result<(), Error> { id: i1, host: "localhost".into(), port: 7780 + i1 as u16, + port_raw: 7780 + i1 as u16 + 100, split: i1, data_base_path: data_base_path.join(format!("node{:02}", i1)), ksprefix: ksprefix.clone(), @@ -108,7 +102,12 @@ async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) -> Ok(()) } -async fn gen_config(config_path: &Path, config: &ChannelConfig, node: &Node, ensemble: &Ensemble) -> Result<(), Error> { +async fn gen_config( + config_path: &Path, + config: &ChannelConfig, + _node: &Node, + _ensemble: &Ensemble, +) -> Result<(), Error> { let path = config_path.join("latest"); tokio::fs::create_dir_all(&path).await?; let path = path.join("00000_Config"); @@ -173,7 +172,7 @@ async fn gen_config(config_path: &Path, config: &ChannelConfig, node: &Node, ens let len = p2 - p1 + 4; buf.put_i32(len as i32); buf.as_mut()[p1..].as_mut().put_i32(len as i32); - file.write(&buf); + file.write(&buf).await?; Ok(()) } diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 7e0083d..a24d503 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -18,6 +18,8 @@ use tokio::io::AsyncRead; use tracing::{debug, error, info, trace, warn}; pub mod agg; +#[cfg(test)] +pub mod aggtest; pub mod cache; pub mod channelconfig; pub mod gen; @@ -136,7 +138,7 @@ pub fn raw_concat_channel_read_stream_try_open_in_background( query: &netpod::AggQuerySingleChannel, node: Arc, ) -> impl Stream> + Send { - let mut query = query.clone(); + let query = query.clone(); let node = node.clone(); async_stream::stream! { use tokio::io::AsyncReadExt; @@ -349,7 +351,7 @@ pub fn parsed1( match fileres { Ok(file) => { let inp = Box::pin(file_content_stream(file, query.buffer_size as usize)); - let mut chunker = EventChunker::new(inp, todo!()); + let mut chunker = EventChunker::new(inp, err::todoval()); while let Some(evres) = chunker.next().await { match evres { Ok(evres) => { @@ -414,7 +416,7 @@ impl Stream for EventBlobsComplete { Ready(Some(k)) => match k { Ok(file) => { let inp = Box::pin(file_content_stream(file, self.buffer_size as usize)); - let mut chunker = EventChunker::new(inp, self.channel_config.clone()); + let chunker = EventChunker::new(inp, self.channel_config.clone()); self.evs.replace(chunker); continue 'outer; } @@ -441,7 +443,7 @@ pub fn event_blobs_complete( match fileres { Ok(file) => { let inp = Box::pin(file_content_stream(file, query.buffer_size as usize)); - let mut chunker = EventChunker::new(inp, todo!()); + let mut chunker = EventChunker::new(inp, err::todoval()); while let Some(evres) = chunker.next().await { match evres { Ok(evres) => { @@ -463,10 +465,8 @@ pub fn event_blobs_complete( pub struct EventChunker { inp: NeedMinBuffer, - had_channel: bool, polled: u32, state: DataFileState, - tmpbuf: Vec, channel_config: ChannelConfig, } @@ -484,10 +484,8 @@ impl EventChunker { inp.set_need_min(6); Self { inp: inp, - had_channel: false, polled: 0, state: DataFileState::FileHeader, - tmpbuf: vec![0; 1024 * 1024 * 4], channel_config, } } @@ -545,7 +543,7 @@ impl EventChunker { let mut sl = std::io::Cursor::new(buf.as_ref()); sl.read_i32::().unwrap(); sl.read_i64::().unwrap(); - let ts = sl.read_i64::().unwrap(); + let _ts = sl.read_i64::().unwrap(); //info!("parse_buf not enough C len {} have {} ts {}", len, buf.len(), ts); need_min = len as u32; break; @@ -569,7 +567,7 @@ impl EventChunker { use dtflags::*; let is_compressed = type_flags & COMPRESSION != 0; let is_array = type_flags & ARRAY != 0; - let is_big_endian = type_flags & BIG_ENDIAN != 0; + let _is_big_endian = type_flags & BIG_ENDIAN != 0; let is_shaped = type_flags & SHAPE != 0; if let Shape::Wave(_) = self.channel_config.shape { assert!(is_array); diff --git a/disk/src/merge.rs b/disk/src/merge.rs index addd4fa..f0dcdea 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -2,7 +2,7 @@ use crate::agg::{Dim1F32Stream, ValuesDim1}; use crate::EventFull; use err::Error; use futures_core::Stream; -use futures_util::{future::ready, pin_mut, StreamExt}; +use futures_util::StreamExt; use std::pin::Pin; use std::task::{Context, Poll}; #[allow(unused_imports)] @@ -61,10 +61,9 @@ where self.current[i1] = CurVal::Val(k); } Ready(Some(Err(e))) => { + // TODO emit this error, consider this stream as done, anything more to do here? //self.current[i1] = CurVal::Err(e); return Ready(Some(Err(e))); - // TODO emit this error, consider this stream as done. - todo!() } Ready(None) => { self.current[i1] = CurVal::Finish; @@ -72,7 +71,6 @@ where Pending => { // TODO is this behaviour correct? return Pending; - todo!() } } } @@ -128,6 +126,7 @@ where enum CurVal { None, Finish, + #[allow(dead_code)] Err(Error), Val(ValuesDim1), } diff --git a/disk/src/raw.rs b/disk/src/raw.rs index 265457c..c710e63 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -2,6 +2,7 @@ Provide ser/de of value data to a good net exchange format. */ +#[allow(dead_code)] async fn local_unpacked_test() { // TODO what kind of query format? What information do I need here? // Don't need exact details of channel because I need to parse the databuffer config anyway. @@ -24,8 +25,8 @@ async fn local_unpacked_test() { buffer_size: 1024 * 8, };*/ - let query = todo!(); - let node = todo!(); + let query = err::todoval(); + let node = err::todoval(); // TODO generate channel configs for my test data. @@ -34,5 +35,5 @@ async fn local_unpacked_test() { // TODO find the matching config entry. (bonus: fuse consecutive compatible entries) use crate::agg::IntoDim1F32Stream; - let stream = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node).into_dim_1_f32_stream(); + let _stream = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node).into_dim_1_f32_stream(); } diff --git a/err/Cargo.toml b/err/Cargo.toml index 1aac78c..f274563 100644 --- a/err/Cargo.toml +++ b/err/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" [dependencies] hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp"] } http = "0.2" +tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } serde_json = "1.0" async-channel = "1.6" chrono = { version = "0.4.19", features = ["serde"] } diff --git a/err/src/lib.rs b/err/src/lib.rs index c68b759..8c78d3e 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -2,6 +2,7 @@ use nom::error::ErrorKind; use std::fmt::Debug; use std::num::ParseIntError; use std::string::FromUtf8Error; +use tokio::task::JoinError; #[derive(Debug)] pub struct Error { @@ -82,28 +83,31 @@ impl From for Error { } } -impl From> for Error { +impl From> for Error { fn from(k: nom::Err) -> Self { - Self { - msg: format!("nom::Err"), - } - } -} - -impl From> for Error { - fn from(k: nom::error::VerboseError) -> Self { - Self { - msg: format!("nom::error::VerboseError"), - } + Self::with_msg(format!("nom::Err {:?}", k)) } } impl nom::error::ParseError for Error { - fn from_error_kind(input: I, kind: ErrorKind) -> Self { - todo!() + fn from_error_kind(_input: I, kind: ErrorKind) -> Self { + Self::with_msg(format!("ParseError {:?}", kind)) } - fn append(input: I, kind: ErrorKind, other: Self) -> Self { - todo!() + fn append(_input: I, kind: ErrorKind, other: Self) -> Self { + Self::with_msg(format!("ParseError kind {:?} other {:?}", kind, other)) } } + +impl From for Error { + fn from(k: JoinError) -> Self { + Self::with_msg(format!("JoinError {:?}", k)) + } +} + +pub fn todoval() -> T { + if true { + todo!("TODO todoval"); + } + todo!() +} diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 423758b..5184625 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -8,7 +8,7 @@ use http::{HeaderMap, Method, StatusCode}; use hyper::service::{make_service_fn, service_fn}; use hyper::{server::Server, Body, Request, Response}; use net::SocketAddr; -use netpod::{AggKind, Cluster, Node, NodeConfig}; +use netpod::NodeConfig; use panic::{AssertUnwindSafe, UnwindSafe}; use pin::Pin; use std::{future, net, panic, pin, sync, task}; @@ -37,7 +37,7 @@ pub async fn host(node_config: Arc) -> Result<(), Error> { } }); Server::bind(&addr).serve(make_service).await?; - rawjh.await; + rawjh.await??; Ok(()) } @@ -86,7 +86,7 @@ async fn data_api_proxy_try(req: Request, node_config: Arc) -> let path = uri.path(); if path == "/api/1/parsed_raw" { if req.method() == Method::POST { - Ok(parsed_raw(req).await?) + Ok(parsed_raw(req, node_config.clone()).await?) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } @@ -118,8 +118,8 @@ where .header("access-control-allow-headers", "*") } -async fn parsed_raw(req: Request) -> Result, Error> { - let node = todo!("get node from config"); +async fn parsed_raw(req: Request, node_config: Arc) -> Result, Error> { + let node = node_config.node.clone(); use netpod::AggQuerySingleChannel; let reqbody = req.into_body(); let bodyslice = hyper::body::to_bytes(reqbody).await?; @@ -215,7 +215,7 @@ where async fn binned(req: Request, node_config: Arc) -> Result, Error> { info!("-------------------------------------------------------- BINNED"); - let (head, body) = req.into_parts(); + let (head, _body) = req.into_parts(); //let params = netpod::query_params(head.uri.query()); // TODO @@ -237,7 +237,7 @@ async fn binned(req: Request, node_config: Arc) -> Result, node_config: Arc) -> Result, Error> { info!("-------------------------------------------------------- PRE-BINNED"); - let (head, body) = req.into_parts(); + let (head, _body) = req.into_parts(); let q = PreBinnedQuery::from_request(&head)?; let ret = match disk::cache::pre_binned_bytes_for_http(node_config, &q) { Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s))?, @@ -250,7 +250,8 @@ async fn prebinned(req: Request, node_config: Arc) -> Result) -> Result<(), Error> { - let lis = tokio::net::TcpListener::bind("0.0.0.0:5555").await?; + let addr = format!("0.0.0.0:{}", node_config.node.port_raw); + let lis = tokio::net::TcpListener::bind(addr).await?; loop { match lis.accept().await { Ok((stream, addr)) => { @@ -259,7 +260,6 @@ async fn raw_service(node_config: Arc) -> Result<(), Error> { Err(e) => Err(e)?, } } - Ok(()) } async fn raw_conn_handler(mut stream: TcpStream, addr: SocketAddr) -> Result<(), Error> { diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index fcb0472..a51627c 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -95,6 +95,7 @@ pub struct Node { pub id: u32, pub host: String, pub port: u16, + pub port_raw: u16, pub split: u32, pub data_base_path: PathBuf, pub ksprefix: String, @@ -192,41 +193,6 @@ impl BinSpecDimT { let dt = ts2 - ts1; assert!(dt <= DAY * 14); let bs = dt / count; - let BIN_THRESHOLDS = [ - 2, - 10, - 100, - 1000, - 10_000, - 100_000, - MU, - MU * 10, - MU * 100, - MS, - MS * 10, - MS * 100, - SEC, - SEC * 5, - SEC * 10, - SEC * 20, - MIN, - MIN * 5, - MIN * 10, - MIN * 20, - HOUR, - HOUR * 2, - HOUR * 4, - HOUR * 12, - DAY, - DAY * 2, - DAY * 4, - DAY * 8, - DAY * 16, - WEEK, - WEEK * 2, - WEEK * 10, - WEEK * 60, - ]; let mut i1 = 0; let bs = loop { if i1 >= BIN_THRESHOLDS.len() { @@ -314,6 +280,42 @@ const BIN_T_LEN_OPTIONS: [u64; 6] = [SEC * 10, MIN * 10, HOUR, HOUR * 4, DAY, DA const PATCH_T_LEN_OPTIONS: [u64; 6] = [MIN * 10, HOUR, HOUR * 4, DAY, DAY * 4, DAY * 12]; +const BIN_THRESHOLDS: [u64; 33] = [ + 2, + 10, + 100, + 1000, + 10_000, + 100_000, + MU, + MU * 10, + MU * 100, + MS, + MS * 10, + MS * 100, + SEC, + SEC * 5, + SEC * 10, + SEC * 20, + MIN, + MIN * 5, + MIN * 10, + MIN * 20, + HOUR, + HOUR * 2, + HOUR * 4, + HOUR * 12, + DAY, + DAY * 2, + DAY * 4, + DAY * 8, + DAY * 16, + WEEK, + WEEK * 2, + WEEK * 10, + WEEK * 60, +]; + #[derive(Clone, Debug)] pub struct PreBinnedPatchRange { pub grid_spec: PreBinnedPatchGridSpec, @@ -340,7 +342,7 @@ impl PreBinnedPatchRange { if t <= bs { let bs = t; let ts1 = range.beg / bs * bs; - let ts2 = (range.end + bs - 1) / bs * bs; + let _ts2 = (range.end + bs - 1) / bs * bs; let count = range.delta() / bs; let offset = ts1 / bs; break Some(Self { @@ -410,6 +412,7 @@ impl PreBinnedPatchCoord { pub struct PreBinnedPatchIterator { range: PreBinnedPatchRange, + #[allow(dead_code)] agg_kind: AggKind, ix: u64, } diff --git a/retrieval/src/bin/retrieval.rs b/retrieval/src/bin/retrieval.rs index 3fbea16..3a1a66d 100644 --- a/retrieval/src/bin/retrieval.rs +++ b/retrieval/src/bin/retrieval.rs @@ -37,7 +37,8 @@ fn simple_fetch() { id: 0, host: "localhost".into(), port: 8360, - data_base_path: todo!(), + port_raw: 8360 + 100, + data_base_path: err::todoval(), ksprefix: "daq_swissfel".into(), split: 0, }; @@ -52,7 +53,7 @@ fn simple_fetch() { time_bin_size: DAY, array: true, scalar_type: ScalarType::F64, - shape: Shape::Wave(todo!()), + shape: Shape::Wave(err::todoval()), big_endian: true, compression: true, }, @@ -63,12 +64,12 @@ fn simple_fetch() { let cluster = Cluster { nodes: vec![node] }; let cluster = Arc::new(cluster); let node_config = NodeConfig { - cluster: cluster, node: cluster.nodes[0].clone(), + cluster: cluster, }; let node_config = Arc::new(node_config); let query_string = serde_json::to_string(&query).unwrap(); - let _host = tokio::spawn(httpret::host(node_config)); + let host = tokio::spawn(httpret::host(node_config)); let req = hyper::Request::builder() .method(http::Method::POST) .uri("http://localhost:8360/api/1/parsed_raw") @@ -102,6 +103,7 @@ fn simple_fetch() { ntot / 1024 / 1024, throughput ); + drop(host); //Err::<(), _>(format!("test error").into()) Ok(()) }) diff --git a/retrieval/src/lib.rs b/retrieval/src/lib.rs index c1e5102..9490ebd 100644 --- a/retrieval/src/lib.rs +++ b/retrieval/src/lib.rs @@ -1,83 +1,15 @@ use err::Error; -use hyper::Body; -use netpod::{Cluster, Node, NodeConfig}; +use netpod::{Cluster, NodeConfig}; use std::sync::Arc; use tokio::task::JoinHandle; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; pub mod cli; - -#[test] -fn get_cached_0() { - taskrun::run(get_cached_0_inner()).unwrap(); -} - #[cfg(test)] -async fn get_cached_0_inner() -> Result<(), Error> { - let t1 = chrono::Utc::now(); - let cluster = Arc::new(test_cluster()); - let node0 = &cluster.nodes[0]; - let hosts = spawn_test_hosts(cluster.clone()); - let req = hyper::Request::builder() - .method(http::Method::GET) - .uri(format!( - "http://{}:{}/api/1/binned?channel_backend=testbackend&channel_keyspace=3&channel_name=wave1&bin_count=4&beg_date=1970-01-01T00:00:10.000Z&end_date=1970-01-01T00:00:51.000Z", - node0.host, node0.port - )) - .body(Body::empty())?; - let client = hyper::Client::new(); - let res = client.request(req).await?; - info!("client response {:?}", res); - let mut res_body = res.into_body(); - use hyper::body::HttpBody; - let mut ntot = 0 as u64; - loop { - match res_body.data().await { - Some(Ok(k)) => { - //info!("packet.. len {}", k.len()); - ntot += k.len() as u64; - } - Some(Err(e)) => { - error!("{:?}", e); - } - None => { - info!("response stream exhausted"); - break; - } - } - } - let t2 = chrono::Utc::now(); - let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; - let throughput = ntot / 1024 * 1000 / ms; - info!( - "get_cached_0 DONE total download {} MB throughput {:5} kB/s", - ntot / 1024 / 1024, - throughput - ); - //Err::<(), _>(format!("test error").into()) - Ok(()) -} +pub mod test; -fn test_cluster() -> Cluster { - let nodes = (0..1) - .into_iter() - .map(|id| { - let node = Node { - id, - host: "localhost".into(), - port: 8360 + id as u16, - data_base_path: format!("../tmpdata/node{:02}", id).into(), - ksprefix: "ks".into(), - split: 0, - }; - Arc::new(node) - }) - .collect(); - Cluster { nodes: nodes } -} - -fn spawn_test_hosts(cluster: Arc) -> Vec>> { +pub fn spawn_test_hosts(cluster: Arc) -> Vec>> { let mut ret = vec![]; for node in &cluster.nodes { let node_config = NodeConfig { diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs new file mode 100644 index 0000000..37ed6ff --- /dev/null +++ b/retrieval/src/test.rs @@ -0,0 +1,77 @@ +use crate::spawn_test_hosts; +use err::Error; +use hyper::Body; +use netpod::{Cluster, Node}; +use std::sync::Arc; +#[allow(unused_imports)] +use tracing::{debug, error, info, trace, warn}; + +fn test_cluster() -> Cluster { + let nodes = (0..1) + .into_iter() + .map(|id| { + let node = Node { + id, + host: "localhost".into(), + port: 8360 + id as u16, + port_raw: 8360 + id as u16 + 100, + data_base_path: format!("../tmpdata/node{:02}", id).into(), + ksprefix: "ks".into(), + split: 0, + }; + Arc::new(node) + }) + .collect(); + Cluster { nodes: nodes } +} + +#[test] +fn get_cached_0() { + taskrun::run(get_cached_0_inner()).unwrap(); +} + +async fn get_cached_0_inner() -> Result<(), Error> { + let t1 = chrono::Utc::now(); + let cluster = Arc::new(test_cluster()); + let node0 = &cluster.nodes[0]; + let hosts = spawn_test_hosts(cluster.clone()); + let req = hyper::Request::builder() + .method(http::Method::GET) + .uri(format!( + "http://{}:{}/api/1/binned?channel_backend=testbackend&channel_keyspace=3&channel_name=wave1&bin_count=4&beg_date=1970-01-01T00:00:10.000Z&end_date=1970-01-01T00:00:51.000Z", + node0.host, node0.port + )) + .body(Body::empty())?; + let client = hyper::Client::new(); + let res = client.request(req).await?; + info!("client response {:?}", res); + let mut res_body = res.into_body(); + use hyper::body::HttpBody; + let mut ntot = 0 as u64; + loop { + match res_body.data().await { + Some(Ok(k)) => { + //info!("packet.. len {}", k.len()); + ntot += k.len() as u64; + } + Some(Err(e)) => { + error!("{:?}", e); + } + None => { + info!("response stream exhausted"); + break; + } + } + } + let t2 = chrono::Utc::now(); + let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; + let throughput = ntot / 1024 * 1000 / ms; + info!( + "get_cached_0 DONE total download {} MB throughput {:5} kB/s", + ntot / 1024 / 1024, + throughput + ); + drop(hosts); + //Err::<(), _>(format!("test error").into()) + Ok(()) +} diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs index 672f1b4..232c1ac 100644 --- a/taskrun/src/lib.rs +++ b/taskrun/src/lib.rs @@ -12,7 +12,7 @@ pub fn run>>(f: F) -> Result .max_blocking_threads(256) .enable_all() .on_thread_start(|| { - let old = panic::take_hook(); + let _old = panic::take_hook(); panic::set_hook(Box::new(move |info| { error!( "✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ panicking\n{:?}\nLOCATION: {:?}\nPAYLOAD: {:?}",