Reference self Node by host and optionally port, get rid of DType

This commit is contained in:
Dominik Werder
2021-05-01 11:31:10 +02:00
parent c4fa5dcc28
commit 0f9408e9f8
11 changed files with 44 additions and 64 deletions

View File

@@ -421,7 +421,6 @@ where
pub fn make_test_node(id: u32) -> Node {
Node {
id: format!("{:02}", id),
host: "localhost".into(),
listen: "0.0.0.0".into(),
port: 8800 + id as u16,

View File

@@ -1,6 +1,6 @@
use err::Error;
use netpod::timeunits::MS;
use netpod::{Channel, NanoRange, Nanos, Node};
use netpod::{Channel, NanoRange, Nanos, Node, ScalarType};
use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8};
use nom::Needed;
#[allow(unused_imports)]
@@ -25,30 +25,6 @@ where
Err(nom::Err::Error(e))
}
#[derive(Debug, FromPrimitive, ToPrimitive, Serialize, Deserialize)]
pub enum DType {
Bool = 0,
Bool8 = 1,
Int8 = 2,
Uint8 = 3,
Int16 = 4,
Uint16 = 5,
Character = 6,
Int32 = 7,
Uint32 = 8,
Int64 = 9,
Uint64 = 10,
Float32 = 11,
Float64 = 12,
String = 13,
}
impl DType {
pub fn to_i16(&self) -> i16 {
ToPrimitive::to_i16(self).unwrap()
}
}
#[derive(Debug, FromPrimitive, ToPrimitive, Serialize, Deserialize)]
pub enum CompressionMethod {
BitshuffleLZ4 = 0,
@@ -78,7 +54,7 @@ pub struct ConfigEntry {
-16 f64
*/
pub precision: i16,
pub dtype: DType,
pub scalar_type: ScalarType,
pub is_compressed: bool,
pub is_shaped: bool,
pub is_array: bool,
@@ -117,7 +93,6 @@ fn parse_short_string(inp: &[u8]) -> NRes<Option<String>> {
}
}
//pub fn parse_entry(inp: &[u8]) -> IResult<&[u8], Option<ConfigEntry>> {
pub fn parse_entry(inp: &[u8]) -> NRes<Option<ConfigEntry>> {
let (inp, len1) = be_i32(inp)?;
if len1 < 0 || len1 > 4000 {
@@ -150,14 +125,14 @@ pub fn parse_entry(inp: &[u8]) -> NRes<Option<ConfigEntry>> {
let is_array = dtmask & 0x40 != 0;
let is_big_endian = dtmask & 0x20 != 0;
let is_shaped = dtmask & 0x10 != 0;
let (inp, dtype) = be_i8(inp)?;
let (inp, dtype) = be_u8(inp)?;
if dtype > 13 {
return mkerr(format!("unexpected data type {}", dtype));
}
let dtype = match num_traits::FromPrimitive::from_i8(dtype) {
Some(k) => k,
None => {
return mkerr(format!("Can not convert {} to DType", dtype));
let scalar_type = match ScalarType::from_dtype_index(dtype) {
Ok(k) => k,
Err(e) => {
return mkerr(format!("Can not convert {} to DType {:?}", dtype, e));
}
};
let (inp, compression_method) = match is_compressed {
@@ -211,7 +186,7 @@ pub fn parse_entry(inp: &[u8]) -> NRes<Option<ConfigEntry>> {
modulo,
offset,
precision,
dtype,
scalar_type,
is_compressed: is_compressed,
is_array: is_array,
is_shaped: is_shaped,

View File

@@ -224,7 +224,7 @@ impl EventChunker {
ts,
pulse,
Some(decomp),
ScalarType::from_dtype_index(type_index),
ScalarType::from_dtype_index(type_index)?,
);
}
}

View File

@@ -103,11 +103,6 @@ where
buf: BytesMut,
wp: usize,
) -> (Option<Option<Result<InMemoryFrame, Error>>>, BytesMut, usize) {
trace!(
"+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ tryparse with buf.len() {} wp {}",
buf.len(),
wp
);
let mut buf = buf;
let nb = wp;
if nb >= INMEM_FRAME_HEAD {
@@ -126,9 +121,10 @@ where
wp,
);
}
trace!("tryparse len {}", len);
if len == 0 {
info!("stop-frame with nb {}", nb);
if nb != INMEM_FRAME_HEAD + INMEM_FRAME_FOOT {
warn!("stop-frame with nb {}", nb);
}
(Some(None), buf, wp)
} else {
if len > 1024 * 32 {

View File

@@ -55,7 +55,6 @@ pub async fn gen_test_data() -> Result<(), Error> {
}
for i1 in 0..3 {
let node = Node {
id: format!("{:02}", i1),
host: "localhost".into(),
listen: "0.0.0.0".into(),
port: 7780 + i1 as u16,

View File

@@ -11,7 +11,7 @@ use futures_util::StreamExt;
#[allow(unused_imports)]
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{Node, NodeConfig, ScalarType, Shape};
use netpod::{Node, NodeConfig, Shape};
use std::net::SocketAddr;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedWriteHalf;
@@ -168,7 +168,7 @@ async fn raw_conn_handler_inner_try(
keyspace: entry.ks as u8,
time_bin_size: entry.bs,
shape: shape,
scalar_type: ScalarType::from_dtype_index(entry.dtype.to_i16() as u8),
scalar_type: entry.scalar_type.clone(),
big_endian: entry.is_big_endian,
array: entry.is_array,
compression: entry.is_compressed,

View File

@@ -10,6 +10,7 @@ use timeunits::*;
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};
pub mod status;
pub mod streamext;
#[derive(Clone, Debug, Serialize, Deserialize)]
@@ -40,11 +41,11 @@ pub enum ScalarType {
}
impl ScalarType {
pub fn from_dtype_index(ix: u8) -> Self {
pub fn from_dtype_index(ix: u8) -> Result<Self, Error> {
use ScalarType::*;
match ix {
0 => panic!("BOOL not supported"),
1 => panic!("BOOL8 not supported"),
let g = match ix {
0 => return Err(Error::with_msg(format!("BOOL not supported"))),
1 => return Err(Error::with_msg(format!("BOOL8 not supported"))),
3 => U8,
5 => U16,
8 => U32,
@@ -55,10 +56,11 @@ impl ScalarType {
9 => I64,
11 => F32,
12 => F64,
6 => panic!("CHARACTER not supported"),
13 => panic!("STRING not supported"),
_ => panic!("unknown"),
}
6 => return Err(Error::with_msg(format!("CHARACTER not supported"))),
13 => return Err(Error::with_msg(format!("STRING not supported"))),
_ => return Err(Error::with_msg(format!("unknown dtype code: {}", ix))),
};
Ok(g)
}
pub fn bytes(&self) -> u8 {
@@ -96,7 +98,6 @@ impl ScalarType {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Node {
pub id: String,
pub host: String,
pub listen: String,
pub port: u16,
@@ -107,7 +108,7 @@ pub struct Node {
}
impl Node {
pub fn name(&self) -> String {
pub fn _name(&self) -> String {
format!("{}-{}", self.host, self.port)
}
}
@@ -128,15 +129,23 @@ pub struct Cluster {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodeConfig {
pub nodeid: String,
pub name: String,
pub cluster: Cluster,
}
impl NodeConfig {
pub fn get_node(&self) -> Option<&Node> {
for n in &self.cluster.nodes {
if n.id == self.nodeid {
return Some(n);
if self.name.contains(":") {
for n in &self.cluster.nodes {
if self.name == format!("{}:{}", n.host, n.port) {
return Some(n);
}
}
} else {
for n in &self.cluster.nodes {
if self.name == format!("{}", n.host) {
return Some(n);
}
}
}
None

4
netpod/src/status.rs Normal file
View File

@@ -0,0 +1,4 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub struct SystemStats {}

View File

@@ -30,7 +30,7 @@ async fn go() -> Result<(), Error> {
let node_config: NodeConfig = serde_json::from_slice(&buf)?;
let node = node_config
.get_node()
.ok_or(Error::with_msg(format!("nodeid config error")))?;
.ok_or(Error::with_msg(format!("nodeid config error {:?}", node_config)))?;
retrieval::run_node(node_config.clone(), node.clone()).await?;
}
SubCmd::Client(client) => match client.client_type {
@@ -54,7 +54,6 @@ fn simple_fetch() {
taskrun::run(async {
let t1 = chrono::Utc::now();
let node = Node {
id: format!("{:02}", 0),
host: "localhost".into(),
listen: "0.0.0.0".into(),
port: 8360,
@@ -91,7 +90,7 @@ fn simple_fetch() {
},
};
let node_config = NodeConfig {
nodeid: cluster.nodes[0].id.clone(),
name: format!("{}:{}", cluster.nodes[0].host, cluster.nodes[0].port),
cluster,
};
let node = node_config.get_node().unwrap();

View File

@@ -14,7 +14,7 @@ pub fn spawn_test_hosts(cluster: Cluster) -> Vec<JoinHandle<Result<(), Error>>>
for node in &cluster.nodes {
let node_config = NodeConfig {
cluster: cluster.clone(),
nodeid: node.id.clone(),
name: format!("{}:{}", node.host, node.port),
};
let h = tokio::spawn(httpret::host(node_config, node.clone()));
ret.push(h);

View File

@@ -15,7 +15,6 @@ fn test_cluster() -> Cluster {
let nodes = (0..3)
.into_iter()
.map(|id| Node {
id: format!("{:02}", id),
host: "localhost".into(),
listen: "0.0.0.0".into(),
port: 8360 + id as u16,