From 55376b440583c90fd7653a6f27e31663dc1ef997 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Fri, 16 Apr 2021 14:52:24 +0200
Subject: [PATCH] Change max_width

---
 disk/src/agg.rs           | 28 ++++++-----------
 disk/src/cache.rs         | 63 ++++++++-------------
 disk/src/channelconfig.rs |  8 ++---
 disk/src/gen.rs           | 40 ++++---------------------
 disk/src/lib.rs           | 44 ++++++---------------------
 disk/src/raw.rs           |  3 +-
 httpret/src/lib.rs        | 25 ++++------------
 netpod/src/lib.rs         | 24 ++++-----------
 retrieval/src/lib.rs      |  9 ++++--
 rustfmt.toml              |  2 ++
 taskrun/src/lib.rs        | 33 ++++++++++----------
 11 files changed, 77 insertions(+), 202 deletions(-)

diff --git a/disk/src/agg.rs b/disk/src/agg.rs
index 8a41c9f..f35a0e7 100644
--- a/disk/src/agg.rs
+++ b/disk/src/agg.rs
@@ -287,16 +287,8 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
     }
 
     fn result(self) -> Self::OutputValue {
-        let min = if self.min == f32::MAX {
-            f32::NAN
-        } else {
-            self.min
-        };
-        let max = if self.max == f32::MIN {
-            f32::NAN
-        } else {
-            self.max
-        };
+        let min = if self.min == f32::MAX { f32::NAN } else { self.min };
+        let max = if self.max == f32::MIN { f32::NAN } else { self.max };
         let avg = if self.count == 0 {
             f32::NAN
         } else {
@@ -381,7 +373,11 @@ pub struct MinMaxAvgScalarBinSingle {
 
 impl std::fmt::Debug for MinMaxAvgScalarBinSingle {
     fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
-        write!(fmt, "MinMaxAvgScalarBinSingle ts1 {} ts2 {} count {} min {:7.2e} max {:7.2e} avg {:7.2e}", self.ts1, self.ts2, self.count, self.min, self.max, self.avg)
+        write!(
+            fmt,
+            "MinMaxAvgScalarBinSingle ts1 {} ts2 {} count {} min {:7.2e} max {:7.2e} avg {:7.2e}",
+            self.ts1, self.ts2, self.count, self.min, self.max, self.avg
+        )
     }
 }
@@ -902,8 +898,7 @@ async fn merge_0_inner() {
         .map(|k| make_test_node(k))
         .map(|node| {
             let node = Arc::new(node);
-            crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node)
-                .into_dim_1_f32_stream()
+            crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node).into_dim_1_f32_stream()
         })
         .collect();
     MergeDim1F32Stream::new(streams)
@@ -917,12 +912,7 @@ pub fn tmp_some_older_things() {
     let vals = ValuesDim1 {
         tss: vec![0, 1, 2, 3],
-        values: vec![
-            vec![0., 0., 0.],
-            vec![1., 1., 1.],
-            vec![2., 2., 2.],
-            vec![3., 3., 3.],
-        ],
+        values: vec![vec![0., 0., 0.], vec![1., 1., 1.], vec![2., 2., 2.], vec![3., 3., 3.]],
     };
     // I want to distinguish already in the outer part between dim-0 and dim-1 and generate
     // separate code for these cases...
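Almost every hunk in this patch is plain rustfmt output rather than a hand edit. rustfmt's default max_width is 100; raising it to 120 both splits lines that exceed the new limit (the MinMaxAvgScalarBinSingle write! above) and collapses constructs governed by rustfmt's size heuristics, because under the default use_small_heuristics the granular budgets scale with max_width (the single-line if-else budget, for example, is 50% of max_width, so it grows from 50 to 60 columns). A standalone sketch of the collapsing effect (illustrative snippet, not part of the patch):

    fn main() {
        let vmin = f32::MAX;
        // The one-line form of this if-else is ~55 columns wide: over the
        // 50-column budget at max_width = 100, so rustfmt stacks the branches.
        let min = if vmin == f32::MAX {
            f32::NAN
        } else {
            vmin
        };
        // At max_width = 120 the budget rises to 60 columns and rustfmt
        // collapses the same shape, exactly as in the result() hunk above:
        let max = if vmin == f32::MIN { f32::NAN } else { vmin };
        println!("min {} max {}", min, max);
    }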
diff --git a/disk/src/cache.rs b/disk/src/cache.rs
index d52d1c3..0ff2549 100644
--- a/disk/src/cache.rs
+++ b/disk/src/cache.rs
@@ -6,8 +6,8 @@ use futures_core::Stream;
 use futures_util::{pin_mut, FutureExt, StreamExt, TryFutureExt};
 use http::uri::Scheme;
 use netpod::{
-    AggKind, Channel, Cluster, NanoRange, Node, NodeConfig, PreBinnedPatchCoord,
-    PreBinnedPatchGridSpec, PreBinnedPatchIterator, PreBinnedPatchRange, ToNanos,
+    AggKind, Channel, Cluster, NanoRange, Node, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchGridSpec,
+    PreBinnedPatchIterator, PreBinnedPatchRange, ToNanos,
 };
 use serde::{Deserialize, Serialize};
 use std::future::{ready, Future};
@@ -30,12 +30,8 @@ pub struct Query {
 impl Query {
     pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
         let params = netpod::query_params(req.uri.query());
-        let beg_date = params
-            .get("beg_date")
-            .ok_or(Error::with_msg("missing beg_date"))?;
-        let end_date = params
-            .get("end_date")
-            .ok_or(Error::with_msg("missing end_date"))?;
+        let beg_date = params.get("beg_date").ok_or(Error::with_msg("missing beg_date"))?;
+        let end_date = params.get("end_date").ok_or(Error::with_msg("missing end_date"))?;
         let ret = Query {
             range: NanoRange {
                 beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
@@ -57,10 +53,7 @@ impl Query {
     }
 }
 
-pub fn binned_bytes_for_http(
-    node_config: Arc<NodeConfig>,
-    query: &Query,
-) -> Result {
+pub fn binned_bytes_for_http(node_config: Arc<NodeConfig>, query: &Query) -> Result {
     let agg_kind = AggKind::DimXBins1;
 
     // TODO
@@ -146,10 +139,7 @@ pub fn pre_binned_bytes_for_http(
     node_config: Arc<NodeConfig>,
     query: &PreBinnedQuery,
 ) -> Result<PreBinnedValueByteStream, Error> {
-    info!(
-        "pre_binned_bytes_for_http {:?} {:?}",
-        query, node_config.node
-    );
+    info!("pre_binned_bytes_for_http {:?} {:?}", query, node_config.node);
     let ret = PreBinnedValueByteStream::new(
         query.patch.clone(),
         query.channel.clone(),
@@ -164,12 +154,7 @@ pub struct PreBinnedValueByteStream {
 }
 
 impl PreBinnedValueByteStream {
-    pub fn new(
-        patch: PreBinnedPatchCoord,
-        channel: Channel,
-        agg_kind: AggKind,
-        node_config: Arc<NodeConfig>,
-    ) -> Self {
+    pub fn new(patch: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, node_config: Arc<NodeConfig>) -> Self {
         warn!("PreBinnedValueByteStream");
         Self {
             inp: PreBinnedValueStream::new(patch, channel, agg_kind, node_config),
@@ -200,8 +185,7 @@ pub struct PreBinnedValueStream {
     channel: Channel,
     agg_kind: AggKind,
     node_config: Arc<NodeConfig>,
-    open_check_local_file:
-        Option<Pin<Box<dyn Future<Output = Result<tokio::fs::File, std::io::Error>> + Send>>>,
+    open_check_local_file: Option<Pin<Box<dyn Future<Output = Result<tokio::fs::File, std::io::Error>> + Send>>>,
     fut2: Option> + Send>>>,
 }
 
@@ -225,10 +209,7 @@ impl PreBinnedValueStream {
     }
 
     fn try_setup_fetch_prebinned_higher_res(&mut self) {
-        info!(
-            "try to find a next better granularity for {:?}",
-            self.patch_coord
-        );
+        info!("try to find a next better granularity for {:?}", self.patch_coord);
         let g = self.patch_coord.bin_t_len();
         let range = NanoRange {
             beg: self.patch_coord.patch_beg(),
@@ -255,12 +236,7 @@ impl PreBinnedValueStream {
                 let mut patch_it = PreBinnedPatchIterator::from_range(range);
                 let s = futures_util::stream::iter(patch_it)
                     .map(move |coord| {
-                        PreBinnedValueFetchedStream::new(
-                            coord,
-                            channel.clone(),
-                            agg_kind.clone(),
-                            node_config.clone(),
-                        )
+                        PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), node_config.clone())
                     })
                     .flatten()
                     .map(move |k| {
@@ -315,11 +291,7 @@ impl Stream for PreBinnedValueStream {
                     use std::os::unix::fs::OpenOptionsExt;
                     let mut opts = std::fs::OpenOptions::new();
                     opts.read(true);
-                    let fut = async {
-                        tokio::fs::OpenOptions::from(opts)
-                            .open("/DOESNOTEXIST")
-                            .await
-                    };
+                    let fut = async { tokio::fs::OpenOptions::from(opts).open("/DOESNOTEXIST").await };
                     self.open_check_local_file = Some(Box::pin(fut));
                     continue 'outer;
                 };
@@ -431,12 +403,7 @@ impl BinnedStream {
         let mut patch_it = patch_it;
         let inp = futures_util::stream::iter(patch_it)
             .map(move |coord| {
-                PreBinnedValueFetchedStream::new(
-                    coord,
-                    channel.clone(),
-                    agg_kind.clone(),
-                    node_config.clone(),
-                )
+                PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), node_config.clone())
             })
             .flatten()
             .map(|k| {
@@ -471,11 +438,7 @@ impl From for Bytes {
     }
 }
 
-pub fn node_ix_for_patch(
-    patch_coord: &PreBinnedPatchCoord,
-    channel: &Channel,
-    cluster: &Cluster,
-) -> u32 {
+pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, cluster: &Cluster) -> u32 {
     let mut hash = tiny_keccak::Sha3::v256();
     hash.update(channel.backend.as_bytes());
     hash.update(channel.name.as_bytes());
diff --git a/disk/src/channelconfig.rs b/disk/src/channelconfig.rs
index 197c74b..6fa2c26 100644
--- a/disk/src/channelconfig.rs
+++ b/disk/src/channelconfig.rs
@@ -242,9 +242,8 @@ pub fn parseConfig(inp: &[u8]) -> Result {
 #[cfg(test)]
 fn read_data() -> Vec<u8> {
     use std::io::Read;
-    let mut f1 =
-        std::fs::File::open("ks/config/S10CB01-RLOD100-PUP10:SIG-AMPLT/latest/00000_Config")
-            .unwrap();
+    let path = "ks/config/S10CB01-RLOD100-PUP10:SIG-AMPLT/latest/00000_Config";
+    let mut f1 = std::fs::File::open(path).unwrap();
     let mut buf = vec![];
     f1.read_to_end(&mut buf).unwrap();
     buf
@@ -252,8 +251,7 @@ fn read_data() -> Vec<u8> {
 
 #[test]
 fn parse_dummy() {
-    let config =
-        parseConfig(&[0, 0, 0, 0, 0, 11, 0x61, 0x62, 0x63, 0, 0, 0, 11, 0, 0, 0, 1]).unwrap();
+    let config = parseConfig(&[0, 0, 0, 0, 0, 11, 0x61, 0x62, 0x63, 0, 0, 0, 11, 0, 0, 0, 1]).unwrap();
     assert_eq!(0, config.formatVersion);
     assert_eq!("abc", config.channelName);
 }
diff --git a/disk/src/gen.rs b/disk/src/gen.rs
index 9b8bbcd..d66f101 100644
--- a/disk/src/gen.rs
+++ b/disk/src/gen.rs
@@ -88,10 +88,7 @@ async fn gen_node(node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
 }
 
 async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
-    let config_path = node
-        .data_base_path
-        .join("config")
-        .join(&chn.config.channel.name);
+    let config_path = node.data_base_path.join("config").join(&chn.config.channel.name);
     let channel_path = node
         .data_base_path
         .join(format!("{}_{}", node.ksprefix, chn.config.keyspace))
@@ -104,28 +101,14 @@ async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) ->
     let mut evix = 0;
     let mut ts = 0;
     while ts < DAY {
-        let res = gen_timebin(
-            evix,
-            ts,
-            chn.time_spacing,
-            &channel_path,
-            &chn.config,
-            node,
-            ensemble,
-        )
-        .await?;
+        let res = gen_timebin(evix, ts, chn.time_spacing, &channel_path, &chn.config, node, ensemble).await?;
         evix = res.evix;
         ts = res.ts;
     }
     Ok(())
 }
 
-async fn gen_config(
-    config_path: &Path,
-    config: &ChannelConfig,
-    node: &Node,
-    ensemble: &Ensemble,
-) -> Result<(), Error> {
+async fn gen_config(config_path: &Path, config: &ChannelConfig, node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
     let path = config_path.join("latest");
     tokio::fs::create_dir_all(&path).await?;
     let path = path.join("00000_Config");
@@ -248,12 +231,7 @@ async fn gen_datafile_header(file: &mut File, config: &ChannelConfig) -> Result<
     Ok(())
 }
 
-async fn gen_event(
-    file: &mut File,
-    evix: u64,
-    ts: u64,
-    config: &ChannelConfig,
-) -> Result<(), Error> {
+async fn gen_event(file: &mut File, evix: u64, ts: u64, config: &ChannelConfig) -> Result<(), Error> {
     let mut buf = BytesMut::with_capacity(1024 * 16);
     buf.put_i32(0xcafecafe as u32 as i32);
     buf.put_u64(0xcafecafe);
@@ -286,14 +264,8 @@ async fn gen_event(
             std::io::Write::write_all(&mut c1, &a)?;
         }
         let mut comp = vec![0u8; (ele_size * ele_count + 64) as usize];
-        let n1 = bitshuffle_compress(
-            &vals,
-            &mut comp,
-            ele_count as usize,
-            ele_size as usize,
-            0,
-        )
-        .unwrap();
+        let n1 =
+            bitshuffle_compress(&vals, &mut comp, ele_count as usize, ele_size as usize, 0).unwrap();
         buf.put_u64(vals.len() as u64);
         let comp_block_size = 0;
         buf.put_u32(comp_block_size);
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index 16dad7a..7e0083d 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -13,6 +13,7 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 use tokio::fs::{File, OpenOptions};
 use tokio::io::AsyncRead;
+
 #[allow(unused_imports)]
 use tracing::{debug, error, info, trace, warn};
 
@@ -23,10 +24,7 @@ pub mod gen;
 pub mod merge;
 pub mod raw;
 
-pub async fn read_test_1(
-    query: &netpod::AggQuerySingleChannel,
-    node: Arc<Node>,
-) -> Result {
+pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> Result {
     let path = datapath(query.timebin as u64, &query.channel_config, &node);
     debug!("try path: {:?}", path);
     let fin = OpenOptions::new().read(true).open(path).await?;
@@ -387,11 +385,7 @@ pub struct EventBlobsComplete {
 }
 
 impl EventBlobsComplete {
-    pub fn new(
-        query: &netpod::AggQuerySingleChannel,
-        channel_config: ChannelConfig,
-        node: Arc<Node>,
-    ) -> Self {
+    pub fn new(query: &netpod::AggQuerySingleChannel, channel_config: ChannelConfig, node: Arc<Node>) -> Self {
         Self {
             file_chan: open_files(query, node),
             evs: None,
@@ -419,8 +413,7 @@ impl Stream for EventBlobsComplete {
             None => match self.file_chan.poll_next_unpin(cx) {
                 Ready(Some(k)) => match k {
                     Ok(file) => {
-                        let inp =
-                            Box::pin(file_content_stream(file, self.buffer_size as usize));
+                        let inp = Box::pin(file_content_stream(file, self.buffer_size as usize));
                         let mut chunker = EventChunker::new(inp, self.channel_config.clone());
                         self.evs.replace(chunker);
                         continue 'outer;
@@ -531,9 +524,7 @@ impl EventChunker {
                         sl.advance(len as usize - 8);
                         let len2 = sl.read_i32::().unwrap();
                         assert!(len == len2, "len mismatch");
-                        let s1 =
-                            String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec())
-                                .unwrap();
+                        let s1 = String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec()).unwrap();
                         info!("channel name {} len {} len2 {}", s1, len, len2);
                         self.state = DataFileState::Event;
                         need_min = 4;
@@ -583,11 +574,7 @@ impl EventChunker {
                         if let Shape::Wave(_) = self.channel_config.shape {
                             assert!(is_array);
                         }
-                        let compression_method = if is_compressed {
-                            sl.read_u8().unwrap()
-                        } else {
-                            0
-                        };
+                        let compression_method = if is_compressed { sl.read_u8().unwrap() } else { 0 };
                         let shape_dim = if is_shaped { sl.read_u8().unwrap() } else { 0 };
                         assert!(compression_method <= 0);
                         assert!(!is_shaped || (shape_dim >= 1 && shape_dim <= 2));
@@ -630,12 +617,7 @@ impl EventChunker {
                                 .unwrap();
                             //debug!("decompress result c1 {} k1 {}", c1, k1);
                             assert!(c1 as u32 == k1);
-                            ret.add_event(
-                                ts,
-                                pulse,
-                                Some(decomp),
-                                ScalarType::from_dtype_index(type_index),
-                            );
+                            ret.add_event(ts, pulse, Some(decomp), ScalarType::from_dtype_index(type_index));
                         } else {
                             todo!()
                         }
@@ -733,13 +715,7 @@ impl EventFull {
         }
     }
 
-    fn add_event(
-        &mut self,
-        ts: u64,
-        pulse: u64,
-        decomp: Option<BytesMut>,
-        scalar_type: ScalarType,
-    ) {
+    fn add_event(&mut self, ts: u64, pulse: u64, decomp: Option<BytesMut>, scalar_type: ScalarType) {
         self.tss.push(ts);
         self.pulses.push(pulse);
         self.decomps.push(decomp);
@@ -916,9 +892,7 @@ pub struct RawConcatChannelReader {
 
 impl RawConcatChannelReader {
     pub fn read(self) -> Result<netpod::BodyStream, Error> {
-        let res = netpod::BodyStream {
-            inner: Box::new(self),
-        };
+        let res = netpod::BodyStream { inner: Box::new(self) };
         Ok(res)
     }
 }
diff --git a/disk/src/raw.rs b/disk/src/raw.rs
index 36b2a5c..265457c 100644
--- a/disk/src/raw.rs
+++ b/disk/src/raw.rs
@@ -34,6 +34,5 @@ async fn local_unpacked_test() {
 
     // TODO find the matching config entry. (bonus: fuse consecutive compatible entries)
     use crate::agg::IntoDim1F32Stream;
-    let stream = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node)
-        .into_dim_1_f32_stream();
+    let stream = crate::EventBlobsComplete::new(&query, query.channel_config.clone(), node).into_dim_1_f32_stream();
 }
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index 6666c62..423758b 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -41,10 +41,7 @@ pub async fn host(node_config: Arc<NodeConfig>) -> Result<(), Error> {
     Ok(())
 }
 
-async fn data_api_proxy(
-    req: Request<Body>,
-    node_config: Arc<NodeConfig>,
-) -> Result<Response<Body>, Error> {
+async fn data_api_proxy(req: Request<Body>, node_config: Arc<NodeConfig>) -> Result<Response<Body>, Error> {
     match data_api_proxy_try(req, node_config).await {
         Ok(k) => Ok(k),
         Err(e) => {
@@ -84,10 +81,7 @@ where
 
 impl UnwindSafe for Cont {}
 
-async fn data_api_proxy_try(
-    req: Request<Body>,
-    node_config: Arc<NodeConfig>,
-) -> Result<Response<Body>, Error> {
+async fn data_api_proxy_try(req: Request<Body>, node_config: Arc<NodeConfig>) -> Result<Response<Body>, Error> {
     let uri = req.uri().clone();
     let path = uri.path();
     if path == "/api/1/parsed_raw" {
@@ -155,10 +149,7 @@ impl hyper::body::HttpBody for BodyStreamWrap {
     type Data = bytes::Bytes;
     type Error = Error;
 
-    fn poll_data(
-        self: Pin<&mut Self>,
-        _cx: &mut Context<'_>,
-    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+    fn poll_data(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Result<Self::Data, Self::Error>>> {
         /*
         use futures_core::stream::Stream;
         let z: &mut async_channel::Receiver> = &mut self.0.receiver;
@@ -170,10 +161,7 @@ impl hyper::body::HttpBody for BodyStreamWrap {
         todo!()
     }
 
-    fn poll_trailers(
-        self: Pin<&mut Self>,
-        _cx: &mut Context<'_>,
-    ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
+    fn poll_trailers(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
         Poll::Ready(Ok(None))
     }
 }
@@ -247,10 +235,7 @@ async fn binned(req: Request<Body>, node_config: Arc<NodeConfig>) -> Result<Resp
     Ok(ret)
 }
 
-async fn prebinned(
-    req: Request<Body>,
-    node_config: Arc<NodeConfig>,
-) -> Result<Response<Body>, Error> {
+async fn prebinned(req: Request<Body>, node_config: Arc<NodeConfig>) -> Result<Response<Body>, Error> {
     info!("-------------------------------------------------------- PRE-BINNED");
     let (head, body) = req.into_parts();
     let q = PreBinnedQuery::from_request(&head)?;
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index 7801775..fcb0472 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -131,18 +131,9 @@ impl Channel {
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub enum TimeRange {
-    Time {
-        beg: DateTime<Utc>,
-        end: DateTime<Utc>,
-    },
-    Pulse {
-        beg: u64,
-        end: u64,
-    },
-    Nano {
-        beg: u64,
-        end: u64,
-    },
+    Time { beg: DateTime<Utc>, end: DateTime<Utc> },
+    Pulse { beg: u64, end: u64 },
+    Nano { beg: u64, end: u64 },
 }
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -283,10 +274,7 @@ impl PreBinnedPatchGridSpec {
             }
         }
         if !ok {
-            panic!(
-                "invalid bin_t_len for PreBinnedPatchGridSpec {}",
-                bin_t_len
-            );
+            panic!("invalid bin_t_len for PreBinnedPatchGridSpec {}", bin_t_len);
         }
         Self { bin_t_len }
     }
@@ -296,9 +284,7 @@ impl PreBinnedPatchGridSpec {
         if !Self::is_valid_bin_t_len(bin_t_len) {
             panic!("invalid bin_t_len {}", bin_t_len);
         }
-        Self {
-            bin_t_len: bin_t_len,
-        }
+        Self { bin_t_len: bin_t_len }
     }
 
     pub fn bin_t_len(&self) -> u64 {
diff --git a/retrieval/src/lib.rs b/retrieval/src/lib.rs
index 432d4ae..c1e5102 100644
--- a/retrieval/src/lib.rs
+++ b/retrieval/src/lib.rs
@@ -20,9 +20,12 @@ async fn get_cached_0_inner() -> Result<(), Error> {
     let node0 = &cluster.nodes[0];
     let hosts = spawn_test_hosts(cluster.clone());
     let req = hyper::Request::builder()
-        .method(http::Method::GET)
-        .uri(format!("http://{}:{}/api/1/binned?channel_backend=testbackend&channel_keyspace=3&channel_name=wave1&bin_count=4&beg_date=1970-01-01T00:00:10.000Z&end_date=1970-01-01T00:00:51.000Z", node0.host, node0.port))
-        .body(Body::empty())?;
+        .method(http::Method::GET)
+        .uri(format!(
+            "http://{}:{}/api/1/binned?channel_backend=testbackend&channel_keyspace=3&channel_name=wave1&bin_count=4&beg_date=1970-01-01T00:00:10.000Z&end_date=1970-01-01T00:00:51.000Z",
+            node0.host, node0.port
+        ))
+        .body(Body::empty())?;
     let client = hyper::Client::new();
     let res = client.request(req).await?;
     info!("client response {:?}", res);
diff --git a/rustfmt.toml b/rustfmt.toml
index 0fbfc4e..20b3f0e 100644
--- a/rustfmt.toml
+++ b/rustfmt.toml
@@ -1,3 +1,5 @@
+max_width = 120
 #unstable_features = true
+#comment_width = 120
 #empty_item_single_line = false
 #control_brace_style = "ClosingNextLine"
diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs
index f4137a6..672f1b4 100644
--- a/taskrun/src/lib.rs
+++ b/taskrun/src/lib.rs
@@ -8,21 +8,24 @@ use tracing::{debug, error, info, trace, warn};
 pub fn run<T, F: std::future::Future<Output = Result<T, Error>>>(f: F) -> Result<T, Error> {
     tracing_init();
     tokio::runtime::Builder::new_multi_thread()
-        .worker_threads(12)
-        .max_blocking_threads(256)
-        .enable_all()
-        .on_thread_start(|| {
-            let old = panic::take_hook();
-            panic::set_hook(Box::new(move |info| {
-                error!("✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ panicking\n{:?}\nLOCATION: {:?}\nPAYLOAD: {:?}", backtrace::Backtrace::new(), info.location(), info.payload());
-                //old(info);
-            }));
-        })
-        .build()
-        .unwrap()
-        .block_on(async {
-            f.await
-        })
+        .worker_threads(12)
+        .max_blocking_threads(256)
+        .enable_all()
+        .on_thread_start(|| {
+            let old = panic::take_hook();
+            panic::set_hook(Box::new(move |info| {
+                error!(
+                    "✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ panicking\n{:?}\nLOCATION: {:?}\nPAYLOAD: {:?}",
+                    backtrace::Backtrace::new(),
+                    info.location(),
+                    info.payload()
+                );
+                //old(info);
+            }));
+        })
+        .build()
+        .unwrap()
+        .block_on(async { f.await })
 }
 
 pub fn tracing_init() {
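Apart from the rustfmt.toml hunk, the whole patch is reproducible formatter output: with the new setting committed, cargo fmt --all regenerates it, cargo fmt --all -- --check verifies a tree without rewriting it, and a single file can be tried ad hoc with rustfmt --config max_width=120 --check disk/src/agg.rs (commands assume a stock rustfmt install and are illustrative, not part of the patch). The call and method-chain budgets scale the same way as the if-else budget, both 60% of max_width under the default heuristics, which is why calls like gen_timebin(...) collapse to a single line at 120. A small sketch of that scaling arithmetic (the percentages are rustfmt's documented defaults, stated here as an assumption rather than taken from this patch):

    // Granular width budgets derived from max_width by rustfmt's
    // use_small_heuristics = "Default" (percentages from the rustfmt docs).
    fn main() {
        for max_width in [100u32, 120] {
            let fn_call_width = max_width * 60 / 100; // calls: 60 -> 72
            let chain_width = max_width * 60 / 100; // method chains: 60 -> 72
            let single_line_if_else = max_width * 50 / 100; // if-else: 50 -> 60
            println!(
                "max_width {}: fn_call {} chain {} if_else {}",
                max_width, fn_call_width, chain_width, single_line_if_else
            );
        }
    }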