From fc22d1ebaf39fa980d8d73b775e618c48e6385ca Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 8 Nov 2022 16:33:10 +0100 Subject: [PATCH] WIP on make tests run again --- daqbuffer/src/bin/daqbuffer.rs | 4 ++- disk/src/frame/inmem.rs | 2 +- disk/src/gen.rs | 14 ++------- nodenet/src/conn.rs | 45 +++++++++++++++------------- nodenet/src/conn/test.rs | 54 ++++++++++++++++++++++++++++++++++ 5 files changed, 85 insertions(+), 34 deletions(-) create mode 100644 nodenet/src/conn/test.rs diff --git a/daqbuffer/src/bin/daqbuffer.rs b/daqbuffer/src/bin/daqbuffer.rs index d1fed44..07bd7ce 100644 --- a/daqbuffer/src/bin/daqbuffer.rs +++ b/daqbuffer/src/bin/daqbuffer.rs @@ -103,7 +103,9 @@ async fn go() -> Result<(), Error> { Ok(()) } -#[test] +// TODO test data needs to be generated +//#[test] +#[allow(unused)] fn simple_fetch() { use daqbuffer::err::ErrConv; use netpod::Nanos; diff --git a/disk/src/frame/inmem.rs b/disk/src/frame/inmem.rs index 0db0dd6..0de1b1d 100644 --- a/disk/src/frame/inmem.rs +++ b/disk/src/frame/inmem.rs @@ -110,7 +110,7 @@ where let tyid = u32::from_le_bytes(*arrayref::array_ref![buf, 8, 4]); let len = u32::from_le_bytes(*arrayref::array_ref![buf, 12, 4]); if magic != INMEM_FRAME_MAGIC { - let z = nb.min(32); + let z = nb.min(256); let u = String::from_utf8_lossy(&buf[0..z]); let e = Error::with_msg("INCORRECT MAGIC"); error!("incorrect magic buf as utf8: {:?} error: {:?}", u, e); diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 586fde2..74c81ed 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -2,20 +2,12 @@ use crate::ChannelConfigExt; use bitshuffle::bitshuffle_compress; use bytes::{BufMut, BytesMut}; use err::Error; -use netpod::{timeunits::*, ByteOrder, Channel, ChannelConfig, GenVar, Node, SfDatabuffer, Shape}; -use netpod::{Nanos, ScalarType}; +use netpod::log::*; +use netpod::timeunits::*; +use netpod::{ByteOrder, Channel, ChannelConfig, GenVar, Nanos, Node, ScalarType, SfDatabuffer, Shape}; use std::path::{Path, PathBuf}; use tokio::fs::{File, OpenOptions}; use tokio::io::AsyncWriteExt; -#[allow(unused_imports)] -use tracing::{debug, error, info, trace, warn}; - -#[test] -pub fn gen_test_data_test() { - if false { - taskrun::run(gen_test_data()).unwrap(); - } -} pub async fn gen_test_data() -> Result<(), Error> { let homedir = std::env::var("HOME").unwrap(); diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index 8244cad..7df60a3 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -1,3 +1,6 @@ +#[cfg(test)] +mod test; + use dbconn::events_scylla::make_scylla_stream; use disk::frame::inmem::InMemoryFrameAsyncReadStream; use err::Error; @@ -44,27 +47,6 @@ async fn events_conn_handler(stream: TcpStream, addr: SocketAddr, node_config: N } } -async fn events_conn_handler_inner( - stream: TcpStream, - addr: SocketAddr, - node_config: &NodeConfigCached, -) -> Result<(), Error> { - match events_conn_handler_inner_try(stream, addr, node_config).await { - Ok(_) => (), - Err(ce) => { - // Try to pass the error over the network. - // If that fails, give error to the caller. - let mut out = ce.netout; - let e = ce.err; - let buf = items::frame::make_error_frame(&e)?; - //type T = StreamItem>>; - //let buf = Err::(e).make_frame()?; - out.write_all(&buf).await?; - } - } - Ok(()) -} - struct ConnErr { err: Error, #[allow(dead_code)] @@ -202,3 +184,24 @@ async fn events_conn_handler_inner_try( debug!("events_conn_handler_inner_try buf_len_histo: {:?}", buf_len_histo); Ok(()) } + +async fn events_conn_handler_inner( + stream: TcpStream, + addr: SocketAddr, + node_config: &NodeConfigCached, +) -> Result<(), Error> { + match events_conn_handler_inner_try(stream, addr, node_config).await { + Ok(_) => (), + Err(ce) => { + // Try to pass the error over the network. + // If that fails, give error to the caller. + let mut out = ce.netout; + let e = ce.err; + let buf = items::frame::make_error_frame(&e)?; + //type T = StreamItem>>; + //let buf = Err::(e).make_frame()?; + out.write_all(&buf).await?; + } + } + Ok(()) +} diff --git a/nodenet/src/conn/test.rs b/nodenet/src/conn/test.rs new file mode 100644 index 0000000..465c730 --- /dev/null +++ b/nodenet/src/conn/test.rs @@ -0,0 +1,54 @@ +use netpod::{Cluster, Database, FileIoBufferSize, Node, NodeConfig, SfDatabuffer}; +use tokio::net::TcpListener; + +use super::*; + +#[test] +fn raw_data_00() { + //taskrun::run(disk::gen::gen_test_data()).unwrap(); + let fut = async { + let lis = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let con = TcpStream::connect(lis.local_addr().unwrap()).await.unwrap(); + let (client, addr) = lis.accept().await.unwrap(); + let cfg = NodeConfigCached { + node_config: NodeConfig { + name: "node_name_dummy".into(), + cluster: Cluster { + backend: "backend_dummy".into(), + nodes: vec![], + database: Database { + name: "".into(), + host: "".into(), + port: 5432, + user: "".into(), + pass: "".into(), + }, + run_map_pulse_task: false, + is_central_storage: false, + file_io_buffer_size: FileIoBufferSize(1024 * 8), + scylla: None, + cache_scylla: None, + }, + }, + node: Node { + host: "empty".into(), + listen: "listen_dummy".into(), + port: 9090, + port_raw: 9090, + cache_base_path: "".into(), + sf_databuffer: Some(SfDatabuffer { + data_base_path: "/home/dominik/daqbuffer-testdata/databuffer/node00".into(), + ksprefix: "ks".into(), + splits: None, + }), + archiver_appliance: None, + channel_archiver: None, + prometheus_api_bind: None, + }, + ix: 0, + }; + events_conn_handler(client, addr, cfg).await.unwrap(); + Ok(()) + }; + taskrun::run(fut).unwrap(); +}