diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 4da4396..900214f 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -421,7 +421,6 @@ where pub fn make_test_node(id: u32) -> Node { Node { - id: format!("{:02}", id), host: "localhost".into(), listen: "0.0.0.0".into(), port: 8800 + id as u16, diff --git a/disk/src/channelconfig.rs b/disk/src/channelconfig.rs index 2aba793..27d695f 100644 --- a/disk/src/channelconfig.rs +++ b/disk/src/channelconfig.rs @@ -1,6 +1,6 @@ use err::Error; use netpod::timeunits::MS; -use netpod::{Channel, NanoRange, Nanos, Node}; +use netpod::{Channel, NanoRange, Nanos, Node, ScalarType}; use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8}; use nom::Needed; #[allow(unused_imports)] @@ -25,30 +25,6 @@ where Err(nom::Err::Error(e)) } -#[derive(Debug, FromPrimitive, ToPrimitive, Serialize, Deserialize)] -pub enum DType { - Bool = 0, - Bool8 = 1, - Int8 = 2, - Uint8 = 3, - Int16 = 4, - Uint16 = 5, - Character = 6, - Int32 = 7, - Uint32 = 8, - Int64 = 9, - Uint64 = 10, - Float32 = 11, - Float64 = 12, - String = 13, -} - -impl DType { - pub fn to_i16(&self) -> i16 { - ToPrimitive::to_i16(self).unwrap() - } -} - #[derive(Debug, FromPrimitive, ToPrimitive, Serialize, Deserialize)] pub enum CompressionMethod { BitshuffleLZ4 = 0, @@ -78,7 +54,7 @@ pub struct ConfigEntry { -16 f64 */ pub precision: i16, - pub dtype: DType, + pub scalar_type: ScalarType, pub is_compressed: bool, pub is_shaped: bool, pub is_array: bool, @@ -117,7 +93,6 @@ fn parse_short_string(inp: &[u8]) -> NRes> { } } -//pub fn parse_entry(inp: &[u8]) -> IResult<&[u8], Option> { pub fn parse_entry(inp: &[u8]) -> NRes> { let (inp, len1) = be_i32(inp)?; if len1 < 0 || len1 > 4000 { @@ -150,14 +125,14 @@ pub fn parse_entry(inp: &[u8]) -> NRes> { let is_array = dtmask & 0x40 != 0; let is_big_endian = dtmask & 0x20 != 0; let is_shaped = dtmask & 0x10 != 0; - let (inp, dtype) = be_i8(inp)?; + let (inp, dtype) = be_u8(inp)?; if dtype > 13 { return mkerr(format!("unexpected data type {}", dtype)); } - let dtype = match num_traits::FromPrimitive::from_i8(dtype) { - Some(k) => k, - None => { - return mkerr(format!("Can not convert {} to DType", dtype)); + let scalar_type = match ScalarType::from_dtype_index(dtype) { + Ok(k) => k, + Err(e) => { + return mkerr(format!("Can not convert {} to DType {:?}", dtype, e)); } }; let (inp, compression_method) = match is_compressed { @@ -211,7 +186,7 @@ pub fn parse_entry(inp: &[u8]) -> NRes> { modulo, offset, precision, - dtype, + scalar_type, is_compressed: is_compressed, is_array: is_array, is_shaped: is_shaped, diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index 2031aa2..8c44aae 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -224,7 +224,7 @@ impl EventChunker { ts, pulse, Some(decomp), - ScalarType::from_dtype_index(type_index), + ScalarType::from_dtype_index(type_index)?, ); } } diff --git a/disk/src/frame/inmem.rs b/disk/src/frame/inmem.rs index 0b5e145..d8025d5 100644 --- a/disk/src/frame/inmem.rs +++ b/disk/src/frame/inmem.rs @@ -103,11 +103,6 @@ where buf: BytesMut, wp: usize, ) -> (Option>>, BytesMut, usize) { - trace!( - "+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ tryparse with buf.len() {} wp {}", - buf.len(), - wp - ); let mut buf = buf; let nb = wp; if nb >= INMEM_FRAME_HEAD { @@ -126,9 +121,10 @@ where wp, ); } - trace!("tryparse len {}", len); if len == 0 { - info!("stop-frame with nb {}", nb); + if nb != INMEM_FRAME_HEAD + INMEM_FRAME_FOOT { + warn!("stop-frame with nb {}", nb); + } (Some(None), buf, wp) } else { if len > 1024 * 32 { diff --git a/disk/src/gen.rs b/disk/src/gen.rs index c72b25b..3f22a4a 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -55,7 +55,6 @@ pub async fn gen_test_data() -> Result<(), Error> { } for i1 in 0..3 { let node = Node { - id: format!("{:02}", i1), host: "localhost".into(), listen: "0.0.0.0".into(), port: 7780 + i1 as u16, diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index ee64d17..4dc5838 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -11,7 +11,7 @@ use futures_util::StreamExt; #[allow(unused_imports)] use netpod::log::*; use netpod::timeunits::SEC; -use netpod::{Node, NodeConfig, ScalarType, Shape}; +use netpod::{Node, NodeConfig, Shape}; use std::net::SocketAddr; use tokio::io::AsyncWriteExt; use tokio::net::tcp::OwnedWriteHalf; @@ -168,7 +168,7 @@ async fn raw_conn_handler_inner_try( keyspace: entry.ks as u8, time_bin_size: entry.bs, shape: shape, - scalar_type: ScalarType::from_dtype_index(entry.dtype.to_i16() as u8), + scalar_type: entry.scalar_type.clone(), big_endian: entry.is_big_endian, array: entry.is_array, compression: entry.is_compressed, diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 78dc929..7f6e6dc 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -10,6 +10,7 @@ use timeunits::*; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; +pub mod status; pub mod streamext; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -40,11 +41,11 @@ pub enum ScalarType { } impl ScalarType { - pub fn from_dtype_index(ix: u8) -> Self { + pub fn from_dtype_index(ix: u8) -> Result { use ScalarType::*; - match ix { - 0 => panic!("BOOL not supported"), - 1 => panic!("BOOL8 not supported"), + let g = match ix { + 0 => return Err(Error::with_msg(format!("BOOL not supported"))), + 1 => return Err(Error::with_msg(format!("BOOL8 not supported"))), 3 => U8, 5 => U16, 8 => U32, @@ -55,10 +56,11 @@ impl ScalarType { 9 => I64, 11 => F32, 12 => F64, - 6 => panic!("CHARACTER not supported"), - 13 => panic!("STRING not supported"), - _ => panic!("unknown"), - } + 6 => return Err(Error::with_msg(format!("CHARACTER not supported"))), + 13 => return Err(Error::with_msg(format!("STRING not supported"))), + _ => return Err(Error::with_msg(format!("unknown dtype code: {}", ix))), + }; + Ok(g) } pub fn bytes(&self) -> u8 { @@ -96,7 +98,6 @@ impl ScalarType { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Node { - pub id: String, pub host: String, pub listen: String, pub port: u16, @@ -107,7 +108,7 @@ pub struct Node { } impl Node { - pub fn name(&self) -> String { + pub fn _name(&self) -> String { format!("{}-{}", self.host, self.port) } } @@ -128,15 +129,23 @@ pub struct Cluster { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NodeConfig { - pub nodeid: String, + pub name: String, pub cluster: Cluster, } impl NodeConfig { pub fn get_node(&self) -> Option<&Node> { - for n in &self.cluster.nodes { - if n.id == self.nodeid { - return Some(n); + if self.name.contains(":") { + for n in &self.cluster.nodes { + if self.name == format!("{}:{}", n.host, n.port) { + return Some(n); + } + } + } else { + for n in &self.cluster.nodes { + if self.name == format!("{}", n.host) { + return Some(n); + } } } None diff --git a/netpod/src/status.rs b/netpod/src/status.rs new file mode 100644 index 0000000..7443131 --- /dev/null +++ b/netpod/src/status.rs @@ -0,0 +1,4 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct SystemStats {} diff --git a/retrieval/src/bin/retrieval.rs b/retrieval/src/bin/retrieval.rs index b565e54..465e82b 100644 --- a/retrieval/src/bin/retrieval.rs +++ b/retrieval/src/bin/retrieval.rs @@ -30,7 +30,7 @@ async fn go() -> Result<(), Error> { let node_config: NodeConfig = serde_json::from_slice(&buf)?; let node = node_config .get_node() - .ok_or(Error::with_msg(format!("nodeid config error")))?; + .ok_or(Error::with_msg(format!("nodeid config error {:?}", node_config)))?; retrieval::run_node(node_config.clone(), node.clone()).await?; } SubCmd::Client(client) => match client.client_type { @@ -54,7 +54,6 @@ fn simple_fetch() { taskrun::run(async { let t1 = chrono::Utc::now(); let node = Node { - id: format!("{:02}", 0), host: "localhost".into(), listen: "0.0.0.0".into(), port: 8360, @@ -91,7 +90,7 @@ fn simple_fetch() { }, }; let node_config = NodeConfig { - nodeid: cluster.nodes[0].id.clone(), + name: format!("{}:{}", cluster.nodes[0].host, cluster.nodes[0].port), cluster, }; let node = node_config.get_node().unwrap(); diff --git a/retrieval/src/lib.rs b/retrieval/src/lib.rs index 4e184ce..2adf6ed 100644 --- a/retrieval/src/lib.rs +++ b/retrieval/src/lib.rs @@ -14,7 +14,7 @@ pub fn spawn_test_hosts(cluster: Cluster) -> Vec>> for node in &cluster.nodes { let node_config = NodeConfig { cluster: cluster.clone(), - nodeid: node.id.clone(), + name: format!("{}:{}", node.host, node.port), }; let h = tokio::spawn(httpret::host(node_config, node.clone())); ret.push(h); diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index 0a4f50d..f28bd6c 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -15,7 +15,6 @@ fn test_cluster() -> Cluster { let nodes = (0..3) .into_iter() .map(|id| Node { - id: format!("{:02}", id), host: "localhost".into(), listen: "0.0.0.0".into(), port: 8360 + id as u16,