Remove legacy
This commit is contained in:
@@ -52,7 +52,7 @@ async fn go() -> Result<(), Error> {
|
||||
SubCmd::Retrieval(subcmd) => {
|
||||
info!("daqbuffer {}", clap::crate_version!());
|
||||
let mut config_file = File::open(subcmd.config).await?;
|
||||
let mut buf = vec![];
|
||||
let mut buf = Vec::new();
|
||||
config_file.read_to_end(&mut buf).await?;
|
||||
let node_config: NodeConfig = serde_json::from_slice(&buf)?;
|
||||
let node_config: Result<NodeConfigCached, Error> = node_config.into();
|
||||
@@ -62,7 +62,7 @@ async fn go() -> Result<(), Error> {
|
||||
SubCmd::Proxy(subcmd) => {
|
||||
info!("daqbuffer proxy {}", clap::crate_version!());
|
||||
let mut config_file = File::open(subcmd.config).await?;
|
||||
let mut buf = vec![];
|
||||
let mut buf = Vec::new();
|
||||
config_file.read_to_end(&mut buf).await?;
|
||||
let proxy_config: ProxyConfig = serde_json::from_slice(&buf)?;
|
||||
daqbufp2::run_proxy(proxy_config.clone()).await?;
|
||||
|
||||
@@ -10,7 +10,7 @@ use netpod::{Cluster, NodeConfig, NodeConfigCached, ProxyConfig};
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
pub fn spawn_test_hosts(cluster: Cluster) -> Vec<JoinHandle<Result<(), Error>>> {
|
||||
let mut ret = vec![];
|
||||
let mut ret = Vec::new();
|
||||
for node in &cluster.nodes {
|
||||
let node_config = NodeConfig {
|
||||
cluster: cluster.clone(),
|
||||
|
||||
@@ -120,7 +120,7 @@ pub async fn table_sizes(node_config: &NodeConfigCached) -> Result<TableSizes, E
|
||||
let sql = sql.as_str();
|
||||
let cl = create_connection(&node_config.node_config.cluster.database).await?;
|
||||
let rows = cl.query(sql, &[]).await.err_conv()?;
|
||||
let mut sizes = TableSizes { sizes: vec![] };
|
||||
let mut sizes = TableSizes { sizes: Vec::new() };
|
||||
sizes.sizes.push((format!("table"), format!("size")));
|
||||
for row in rows {
|
||||
sizes.sizes.push((row.get(0), row.get(1)));
|
||||
|
||||
@@ -208,7 +208,7 @@ impl UpdatedDbWithChannelNamesStream {
|
||||
find: None,
|
||||
update_batch: None,
|
||||
channel_inp_done: false,
|
||||
clist: vec![],
|
||||
clist: Vec::new(),
|
||||
};
|
||||
ret.client_fut = Some(Box::pin(create_connection(
|
||||
&ret.node_config_ref.node_config.cluster.database,
|
||||
@@ -243,7 +243,7 @@ impl Stream for UpdatedDbWithChannelNamesStream {
|
||||
Ready(None) => {
|
||||
*pself.channel_inp_done = true;
|
||||
// Work through the collected items
|
||||
let l = std::mem::replace(pself.clist, vec![]);
|
||||
let l = std::mem::replace(pself.clist, Vec::new());
|
||||
let fut = update_db_with_channel_name_list(
|
||||
l,
|
||||
pself.ident.as_ref().unwrap().facility,
|
||||
|
||||
@@ -26,7 +26,7 @@ pub async fn search_channel_databuffer(
|
||||
true
|
||||
};
|
||||
if empty {
|
||||
let ret = ChannelSearchResult { channels: vec![] };
|
||||
let ret = ChannelSearchResult { channels: Vec::new() };
|
||||
return Ok(ret);
|
||||
}
|
||||
let sql = format!(concat!(
|
||||
@@ -42,14 +42,14 @@ pub async fn search_channel_databuffer(
|
||||
)
|
||||
.await
|
||||
.err_conv()?;
|
||||
let mut res = vec![];
|
||||
let mut res = Vec::new();
|
||||
for row in rows {
|
||||
let shapedb: Option<serde_json::Value> = row.get(4);
|
||||
let shape = match &shapedb {
|
||||
Some(top) => match top {
|
||||
serde_json::Value::Null => vec![],
|
||||
serde_json::Value::Null => Vec::new(),
|
||||
serde_json::Value::Array(items) => {
|
||||
let mut a = vec![];
|
||||
let mut a = Vec::new();
|
||||
for item in items {
|
||||
match item {
|
||||
serde_json::Value::Number(n) => match n.as_i64() {
|
||||
@@ -65,7 +65,7 @@ pub async fn search_channel_databuffer(
|
||||
}
|
||||
_ => return Err(Error::with_msg(format!("can not understand shape {:?}", shapedb))),
|
||||
},
|
||||
None => vec![],
|
||||
None => Vec::new(),
|
||||
};
|
||||
let ty: String = row.get(3);
|
||||
let k = ChannelSearchSingleResult {
|
||||
@@ -149,7 +149,7 @@ pub async fn search_channel_archeng(
|
||||
false
|
||||
};
|
||||
if empty {
|
||||
let ret = ChannelSearchResult { channels: vec![] };
|
||||
let ret = ChannelSearchResult { channels: Vec::new() };
|
||||
return Ok(ret);
|
||||
}
|
||||
let sql = format!(concat!(
|
||||
@@ -161,7 +161,7 @@ pub async fn search_channel_archeng(
|
||||
));
|
||||
let cl = create_connection(database).await?;
|
||||
let rows = cl.query(sql.as_str(), &[&query.name_regex]).await.err_conv()?;
|
||||
let mut res = vec![];
|
||||
let mut res = Vec::new();
|
||||
for row in rows {
|
||||
let name: String = row.get(0);
|
||||
let config: JsVal = row.get(1);
|
||||
@@ -189,7 +189,7 @@ pub async fn search_channel_archeng(
|
||||
Some(k) => match k {
|
||||
JsVal::String(k) => {
|
||||
if k == "Scalar" {
|
||||
vec![]
|
||||
Vec::new()
|
||||
} else {
|
||||
return Err(Error::with_msg_no_trace(format!(
|
||||
"search_channel_archeng can not understand {:?}",
|
||||
@@ -223,7 +223,7 @@ pub async fn search_channel_archeng(
|
||||
)));
|
||||
}
|
||||
},
|
||||
None => vec![],
|
||||
None => Vec::new(),
|
||||
};
|
||||
let k = ChannelSearchSingleResult {
|
||||
backend: backend.clone(),
|
||||
|
||||
126
disk/src/agg.rs
126
disk/src/agg.rs
@@ -1,127 +1 @@
|
||||
//! Aggregation and binning support.
|
||||
|
||||
use bytes::BytesMut;
|
||||
use err::Error;
|
||||
use netpod::ScalarType;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::time::Duration;
|
||||
|
||||
pub mod binnedt;
|
||||
pub mod enp;
|
||||
pub mod scalarbinbatch;
|
||||
pub mod streams;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct ValuesExtractStats {
|
||||
pub dur: Duration,
|
||||
}
|
||||
|
||||
impl ValuesExtractStats {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
dur: Duration::default(),
|
||||
}
|
||||
}
|
||||
pub fn trans(self: &mut Self, k: &mut Self) {
|
||||
self.dur += k.dur;
|
||||
k.dur = Duration::default();
|
||||
}
|
||||
}
|
||||
|
||||
/// Batch of events with a numeric one-dimensional (i.e. array) value.
|
||||
pub struct ValuesDim1 {
|
||||
pub tss: Vec<u64>,
|
||||
pub values: Vec<Vec<f32>>,
|
||||
}
|
||||
|
||||
impl ValuesDim1 {
|
||||
pub fn empty() -> Self {
|
||||
Self {
|
||||
tss: vec![],
|
||||
values: vec![],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ValuesDim1 {
|
||||
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(
|
||||
fmt,
|
||||
"count {} tsA {:?} tsB {:?}",
|
||||
self.tss.len(),
|
||||
self.tss.first(),
|
||||
self.tss.last()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
trait NumEx {
|
||||
const BY: usize;
|
||||
}
|
||||
|
||||
struct NumF32;
|
||||
impl NumEx for NumF32 {
|
||||
const BY: usize = 4;
|
||||
}
|
||||
|
||||
macro_rules! make_get_values {
|
||||
($n:ident, $TY:ident, $FROM_BYTES:ident, $BY:expr) => {
|
||||
#[allow(unused)]
|
||||
fn $n(decomp: &BytesMut, ty: &ScalarType) -> Result<Vec<f32>, Error> {
|
||||
let n1 = decomp.len();
|
||||
if ty.bytes() as usize != $BY {
|
||||
Err(Error::with_msg(format!(
|
||||
"ty.bytes() != BY {} vs {}",
|
||||
ty.bytes(),
|
||||
$BY
|
||||
)))?;
|
||||
}
|
||||
if n1 % ty.bytes() as usize != 0 {
|
||||
Err(Error::with_msg(format!(
|
||||
"n1 % ty.bytes() as usize != 0 {} vs {}",
|
||||
n1,
|
||||
ty.bytes()
|
||||
)))?;
|
||||
}
|
||||
let ele_count = n1 / ty.bytes() as usize;
|
||||
let mut j = Vec::with_capacity(ele_count);
|
||||
let mut p2 = j.as_mut_ptr();
|
||||
let mut p1 = 0;
|
||||
for _ in 0..ele_count {
|
||||
unsafe {
|
||||
let mut r = [0u8; $BY];
|
||||
std::ptr::copy_nonoverlapping(&decomp[p1], r.as_mut_ptr(), $BY);
|
||||
*p2 = $TY::$FROM_BYTES(r) as f32;
|
||||
p1 += $BY;
|
||||
p2 = p2.add(1);
|
||||
};
|
||||
}
|
||||
unsafe {
|
||||
j.set_len(ele_count);
|
||||
}
|
||||
Ok(j)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
make_get_values!(get_values_u8_le, u8, from_le_bytes, 1);
|
||||
make_get_values!(get_values_u16_le, u16, from_le_bytes, 2);
|
||||
make_get_values!(get_values_u32_le, u32, from_le_bytes, 4);
|
||||
make_get_values!(get_values_u64_le, u64, from_le_bytes, 8);
|
||||
make_get_values!(get_values_i8_le, i8, from_le_bytes, 1);
|
||||
make_get_values!(get_values_i16_le, i16, from_le_bytes, 2);
|
||||
make_get_values!(get_values_i32_le, i32, from_le_bytes, 4);
|
||||
make_get_values!(get_values_i64_le, i64, from_le_bytes, 8);
|
||||
make_get_values!(get_values_f32_le, f32, from_le_bytes, 4);
|
||||
make_get_values!(get_values_f64_le, f64, from_le_bytes, 8);
|
||||
|
||||
make_get_values!(get_values_u8_be, u8, from_be_bytes, 1);
|
||||
make_get_values!(get_values_u16_be, u16, from_be_bytes, 2);
|
||||
make_get_values!(get_values_u32_be, u32, from_be_bytes, 4);
|
||||
make_get_values!(get_values_u64_be, u64, from_be_bytes, 8);
|
||||
make_get_values!(get_values_i8_be, i8, from_be_bytes, 1);
|
||||
make_get_values!(get_values_i16_be, i16, from_be_bytes, 2);
|
||||
make_get_values!(get_values_i32_be, i32, from_be_bytes, 4);
|
||||
make_get_values!(get_values_i64_be, i64, from_be_bytes, 8);
|
||||
make_get_values!(get_values_f32_be, f32, from_be_bytes, 4);
|
||||
make_get_values!(get_values_f64_be, f64, from_be_bytes, 8);
|
||||
|
||||
@@ -109,7 +109,6 @@ pub fn make_frame_2<T>(item: &T, fty: u32) -> Result<BytesMut, Error>
|
||||
where
|
||||
T: erased_serde::Serialize,
|
||||
{
|
||||
info!("make_frame_2 T = {} fty {:x}", std::any::type_name::<T>(), fty);
|
||||
let mut out = Vec::new();
|
||||
//let mut ser = rmp_serde::Serializer::new(&mut out).with_struct_map();
|
||||
//let writer = ciborium::ser::into_writer(&item, &mut out).unwrap();
|
||||
@@ -272,7 +271,6 @@ pub fn decode_frame<T>(frame: &InMemoryFrame) -> Result<T, Error>
|
||||
where
|
||||
T: FrameDecodable,
|
||||
{
|
||||
info!("decode_frame T = {}", std::any::type_name::<T>());
|
||||
if frame.encid() != INMEM_FRAME_ENCID {
|
||||
return Err(Error::with_msg(format!("unknown encoder id {:?}", frame)));
|
||||
}
|
||||
@@ -334,10 +332,7 @@ where
|
||||
)))
|
||||
} else {
|
||||
match decode_from_slice(frame.buf()) {
|
||||
Ok(item) => {
|
||||
info!("decode_from_slice {} success", std::any::type_name::<T>());
|
||||
Ok(item)
|
||||
}
|
||||
Ok(item) => Ok(item),
|
||||
Err(e) => {
|
||||
error!("decode_frame T = {}", std::any::type_name::<T>());
|
||||
error!("ERROR deserialize len {} tyid {:x}", frame.buf().len(), frame.tyid());
|
||||
|
||||
@@ -125,7 +125,7 @@ macro_rules! read_next_scalar_values {
|
||||
ret.push_front(ts, pulse, value);
|
||||
}
|
||||
}
|
||||
info!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp);
|
||||
trace!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp);
|
||||
Ok(ret)
|
||||
}
|
||||
};
|
||||
@@ -166,7 +166,7 @@ macro_rules! read_next_array_values {
|
||||
ret.push(ts, pulse, value);
|
||||
}
|
||||
*/
|
||||
info!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp);
|
||||
trace!("found in total {} events ts_msp {}", ret.tss.len(), ts_msp);
|
||||
Ok(ret)
|
||||
}
|
||||
};
|
||||
@@ -193,7 +193,7 @@ macro_rules! read_values {
|
||||
let fut = fut.map(|x| match x {
|
||||
Ok(k) => {
|
||||
let self_name = std::any::type_name::<Self>();
|
||||
info!("{self_name} read values len {}", k.len());
|
||||
trace!("{self_name} read values len {}", k.len());
|
||||
let b = Box::new(k) as Box<dyn Events>;
|
||||
Ok(b)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user