diff --git a/daqbuffer/src/bin/daqbuffer.rs b/daqbuffer/src/bin/daqbuffer.rs
index 392a95a..e44be98 100644
--- a/daqbuffer/src/bin/daqbuffer.rs
+++ b/daqbuffer/src/bin/daqbuffer.rs
@@ -52,7 +52,7 @@ async fn go() -> Result<(), Error> {
         SubCmd::Retrieval(subcmd) => {
             info!("daqbuffer {}", clap::crate_version!());
             let mut config_file = File::open(subcmd.config).await?;
-            let mut buf = vec![];
+            let mut buf = Vec::new();
             config_file.read_to_end(&mut buf).await?;
             let node_config: NodeConfig = serde_json::from_slice(&buf)?;
             let node_config: Result<NodeConfigCached, Error> = node_config.into();
@@ -62,7 +62,7 @@ async fn go() -> Result<(), Error> {
         SubCmd::Proxy(subcmd) => {
             info!("daqbuffer proxy {}", clap::crate_version!());
             let mut config_file = File::open(subcmd.config).await?;
-            let mut buf = vec![];
+            let mut buf = Vec::new();
             config_file.read_to_end(&mut buf).await?;
             let proxy_config: ProxyConfig = serde_json::from_slice(&buf)?;
             daqbufp2::run_proxy(proxy_config.clone()).await?;
diff --git a/daqbufp2/src/daqbufp2.rs b/daqbufp2/src/daqbufp2.rs
index cb5cdf9..49b1d15 100644
--- a/daqbufp2/src/daqbufp2.rs
+++ b/daqbufp2/src/daqbufp2.rs
@@ -10,7 +10,7 @@ use netpod::{Cluster, NodeConfig, NodeConfigCached, ProxyConfig};
 use tokio::task::JoinHandle;
 
 pub fn spawn_test_hosts(cluster: Cluster) -> Vec<JoinHandle<Result<(), Error>>> {
-    let mut ret = vec![];
+    let mut ret = Vec::new();
     for node in &cluster.nodes {
         let node_config = NodeConfig {
             cluster: cluster.clone(),
diff --git a/dbconn/src/dbconn.rs b/dbconn/src/dbconn.rs
index 02cfde4..11ab765 100644
--- a/dbconn/src/dbconn.rs
+++ b/dbconn/src/dbconn.rs
@@ -120,7 +120,7 @@ pub async fn table_sizes(node_config: &NodeConfigCached) -> Result {
 
         *pself.channel_inp_done = true;
         // Work through the collected items
-        let l = std::mem::replace(pself.clist, vec![]);
+        let l = std::mem::replace(pself.clist, Vec::new());
         let fut = update_db_with_channel_name_list(
             l,
             pself.ident.as_ref().unwrap().facility,
diff --git a/dbconn/src/search.rs b/dbconn/src/search.rs
index f8beb78..79057c8 100644
--- a/dbconn/src/search.rs
+++ b/dbconn/src/search.rs
@@ -26,7 +26,7 @@ pub async fn search_channel_databuffer(
         true
     };
     if empty {
-        let ret = ChannelSearchResult { channels: vec![] };
+        let ret = ChannelSearchResult { channels: Vec::new() };
         return Ok(ret);
     }
     let sql = format!(concat!(
@@ -42,14 +42,14 @@ pub async fn search_channel_databuffer(
     )
     .await
     .err_conv()?;
-    let mut res = vec![];
+    let mut res = Vec::new();
     for row in rows {
         let shapedb: Option<serde_json::Value> = row.get(4);
         let shape = match &shapedb {
             Some(top) => match top {
-                serde_json::Value::Null => vec![],
+                serde_json::Value::Null => Vec::new(),
                 serde_json::Value::Array(items) => {
-                    let mut a = vec![];
+                    let mut a = Vec::new();
                     for item in items {
                         match item {
                             serde_json::Value::Number(n) => match n.as_i64() {
@@ -65,7 +65,7 @@ pub async fn search_channel_databuffer(
                 }
                 _ => return Err(Error::with_msg(format!("can not understand shape {:?}", shapedb))),
             },
-            None => vec![],
+            None => Vec::new(),
         };
         let ty: String = row.get(3);
         let k = ChannelSearchSingleResult {
@@ -149,7 +149,7 @@ pub async fn search_channel_archeng(
         false
     };
     if empty {
-        let ret = ChannelSearchResult { channels: vec![] };
+        let ret = ChannelSearchResult { channels: Vec::new() };
         return Ok(ret);
     }
     let sql = format!(concat!(
@@ -161,7 +161,7 @@ pub async fn search_channel_archeng(
     ));
     let cl = create_connection(database).await?;
     let rows = cl.query(sql.as_str(), &[&query.name_regex]).await.err_conv()?;
-    let mut res = vec![];
+    let mut res = Vec::new();
     for row in rows {
         let name: String = row.get(0);
         let config: JsVal = row.get(1);
@@ -189,7 +189,7 @@ pub async fn search_channel_archeng(
             Some(k) => match k {
                 JsVal::String(k) => {
                     if k == "Scalar" {
-                        vec![]
+                        Vec::new()
                     } else {
                         return Err(Error::with_msg_no_trace(format!(
                             "search_channel_archeng can not understand {:?}",
@@ -223,7 +223,7 @@ pub async fn search_channel_archeng(
                     )));
                 }
             },
-            None => vec![],
+            None => Vec::new(),
         };
         let k = ChannelSearchSingleResult {
             backend: backend.clone(),
diff --git a/disk/src/agg.rs b/disk/src/agg.rs
index 223ee3c..f4b0d43 100644
--- a/disk/src/agg.rs
+++ b/disk/src/agg.rs
@@ -1,127 +1 @@
-//! Aggregation and binning support.
-
-use bytes::BytesMut;
-use err::Error;
-use netpod::ScalarType;
-use serde::{Deserialize, Serialize};
-use std::time::Duration;
-
-pub mod binnedt;
 pub mod enp;
-pub mod scalarbinbatch;
-pub mod streams;
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct ValuesExtractStats {
-    pub dur: Duration,
-}
-
-impl ValuesExtractStats {
-    pub fn new() -> Self {
-        Self {
-            dur: Duration::default(),
-        }
-    }
-    pub fn trans(self: &mut Self, k: &mut Self) {
-        self.dur += k.dur;
-        k.dur = Duration::default();
-    }
-}
-
-/// Batch of events with a numeric one-dimensional (i.e. array) value.
-pub struct ValuesDim1 {
-    pub tss: Vec<u64>,
-    pub values: Vec<Vec<f32>>,
-}
-
-impl ValuesDim1 {
-    pub fn empty() -> Self {
-        Self {
-            tss: vec![],
-            values: vec![],
-        }
-    }
-}
-
-impl std::fmt::Debug for ValuesDim1 {
-    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
-        write!(
-            fmt,
-            "count {} tsA {:?} tsB {:?}",
-            self.tss.len(),
-            self.tss.first(),
-            self.tss.last()
-        )
-    }
-}
-
-trait NumEx {
-    const BY: usize;
-}
-
-struct NumF32;
-impl NumEx for NumF32 {
-    const BY: usize = 4;
-}
-
-macro_rules! make_get_values {
-    ($n:ident, $TY:ident, $FROM_BYTES:ident, $BY:expr) => {
-        #[allow(unused)]
-        fn $n(decomp: &BytesMut, ty: &ScalarType) -> Result<Vec<f32>, Error> {
-            let n1 = decomp.len();
-            if ty.bytes() as usize != $BY {
-                Err(Error::with_msg(format!(
-                    "ty.bytes() != BY {} vs {}",
-                    ty.bytes(),
-                    $BY
-                )))?;
-            }
-            if n1 % ty.bytes() as usize != 0 {
-                Err(Error::with_msg(format!(
-                    "n1 % ty.bytes() as usize != 0 {} vs {}",
-                    n1,
-                    ty.bytes()
-                )))?;
-            }
-            let ele_count = n1 / ty.bytes() as usize;
-            let mut j = Vec::with_capacity(ele_count);
-            let mut p2 = j.as_mut_ptr();
-            let mut p1 = 0;
-            for _ in 0..ele_count {
-                unsafe {
-                    let mut r = [0u8; $BY];
-                    std::ptr::copy_nonoverlapping(&decomp[p1], r.as_mut_ptr(), $BY);
-                    *p2 = $TY::$FROM_BYTES(r) as f32;
-                    p1 += $BY;
-                    p2 = p2.add(1);
-                };
-            }
-            unsafe {
-                j.set_len(ele_count);
-            }
-            Ok(j)
-        }
-    };
-}
-
-make_get_values!(get_values_u8_le, u8, from_le_bytes, 1);
-make_get_values!(get_values_u16_le, u16, from_le_bytes, 2);
-make_get_values!(get_values_u32_le, u32, from_le_bytes, 4);
-make_get_values!(get_values_u64_le, u64, from_le_bytes, 8);
-make_get_values!(get_values_i8_le, i8, from_le_bytes, 1);
-make_get_values!(get_values_i16_le, i16, from_le_bytes, 2);
-make_get_values!(get_values_i32_le, i32, from_le_bytes, 4);
-make_get_values!(get_values_i64_le, i64, from_le_bytes, 8);
-make_get_values!(get_values_f32_le, f32, from_le_bytes, 4);
-make_get_values!(get_values_f64_le, f64, from_le_bytes, 8);
-
-make_get_values!(get_values_u8_be, u8, from_be_bytes, 1);
-make_get_values!(get_values_u16_be, u16, from_be_bytes, 2);
-make_get_values!(get_values_u32_be, u32, from_be_bytes, 4);
-make_get_values!(get_values_u64_be, u64, from_be_bytes, 8);
-make_get_values!(get_values_i8_be, i8, from_be_bytes, 1);
-make_get_values!(get_values_i16_be, i16, from_be_bytes, 2);
-make_get_values!(get_values_i32_be, i32, from_be_bytes, 4);
-make_get_values!(get_values_i64_be, i64, from_be_bytes, 8);
-make_get_values!(get_values_f32_be, f32, from_be_bytes, 4);
-make_get_values!(get_values_f64_be, f64, from_be_bytes, 8);
diff --git a/items/src/frame.rs b/items/src/frame.rs
index b485cfc..d80dffd 100644
--- a/items/src/frame.rs
+++ b/items/src/frame.rs
@@ -109,7 +109,6 @@ pub fn make_frame_2<T>(item: &T, fty: u32) -> Result
 where
     T: erased_serde::Serialize,
 {
-    info!("make_frame_2 T = {} fty {:x}", std::any::type_name::<T>(), fty);
     let mut out = Vec::new();
     //let mut ser = rmp_serde::Serializer::new(&mut out).with_struct_map();
     //let writer = ciborium::ser::into_writer(&item, &mut out).unwrap();
@@ -272,7 +271,6 @@ pub fn decode_frame<T>(frame: &InMemoryFrame) -> Result
 where
     T: FrameDecodable,
 {
-    info!("decode_frame T = {}", std::any::type_name::<T>());
     if frame.encid() != INMEM_FRAME_ENCID {
         return Err(Error::with_msg(format!("unknown encoder id {:?}", frame)));
     }
@@ -334,10 +332,7 @@ where
         )))
     } else {
         match decode_from_slice(frame.buf()) {
-            Ok(item) => {
-                info!("decode_from_slice {} success", std::any::type_name::<T>());
-                Ok(item)
-            }
+            Ok(item) => Ok(item),
             Err(e) => {
                 error!("decode_frame T = {}", std::any::type_name::<T>());
                 error!("ERROR deserialize len {} tyid {:x}", frame.buf().len(), frame.tyid());
diff --git a/scyllaconn/src/events.rs b/scyllaconn/src/events.rs
index fe4b280..955bd98 100644
--- a/scyllaconn/src/events.rs
+++ b/scyllaconn/src/events.rs
@@ -125,7 +125,7 @@ macro_rules! read_next_scalar_values {
                     ret.push_front(ts, pulse, value);
                 }
             }
-            info!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp);
+            trace!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp);
             Ok(ret)
         }
     };
@@ -166,7 +166,7 @@ macro_rules! read_next_array_values {
                 ret.push(ts, pulse, value);
             }
             */
-            info!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp);
+            trace!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp);
             Ok(ret)
         }
     };
@@ -193,7 +193,7 @@ macro_rules! read_values {
             let fut = fut.map(|x| match x {
                 Ok(k) => {
                     let self_name = std::any::type_name::();
-                    info!("{self_name} read values len {}", k.len());
+                    trace!("{self_name} read values len {}", k.len());
                     let b = Box::new(k) as Box;
                     Ok(b)
                 }