From 3b8a307c902f5b90a2e6ff3bc64eb6225f322230 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Thu, 29 Apr 2021 15:35:59 +0200
Subject: [PATCH] Add u16 channel and more test variations

---
 disk/src/eventchunker.rs       |  10 ++-
 disk/src/gen.rs                |  97 +++++++++++++++++++-----
 retrieval/src/bin/retrieval.rs |   3 +
 retrieval/src/cli.rs           |   1 +
 retrieval/src/test.rs          | 133 +++++++++++++++++++--------------
 5 files changed, 167 insertions(+), 77 deletions(-)

diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs
index 061f176..2031aa2 100644
--- a/disk/src/eventchunker.rs
+++ b/disk/src/eventchunker.rs
@@ -134,14 +134,17 @@ impl EventChunker {
             let mut sl = std::io::Cursor::new(buf.as_ref());
             let len1b = sl.read_i32::<BE>().unwrap();
             assert!(len == len1b as u32);
-            sl.read_i64::<BE>().unwrap();
+            let _ttl = sl.read_i64::<BE>().unwrap();
             let ts = sl.read_i64::<BE>().unwrap() as u64;
             let pulse = sl.read_i64::<BE>().unwrap() as u64;
             if ts >= self.range.end {
                 self.seen_beyond_range = true;
                 break;
             }
+            if ts < self.range.beg {
+                error!("seen before range: {}", ts / SEC);
+            }
+            let _ioc_ts = sl.read_i64::<BE>().unwrap();
             let status = sl.read_i8().unwrap();
             let severity = sl.read_i8().unwrap();
             let optional = sl.read_i32::<BE>().unwrap();
@@ -214,9 +217,8 @@ impl EventChunker {
                 assert!(c1 as u32 == k1);
                 trace!("decompress result c1 {} k1 {}", c1, k1);
                 if ts < self.range.beg {
-                    error!("EVENT BEFORE RANGE {}", ts / SEC);
                 } else if ts >= self.range.end {
-                    error!("EVENT BEFORE RANGE {}", ts / SEC);
+                    error!("EVENT AFTER RANGE {}", ts / SEC);
                 } else {
                     ret.add_event(
                         ts,
diff --git a/disk/src/gen.rs b/disk/src/gen.rs
index 73200f7..195fd6d 100644
--- a/disk/src/gen.rs
+++ b/disk/src/gen.rs
@@ -11,7 +11,7 @@ use tokio::io::AsyncWriteExt;
 use tracing::{debug, error, info, trace, warn};
 
 pub async fn gen_test_data() -> Result<(), Error> {
-    let data_base_path = PathBuf::from("../tmpdata");
+    let data_base_path = PathBuf::from("tmpdata");
     let ksprefix = String::from("ks");
     let mut ensemble = Ensemble {
         nodes: vec![],
@@ -21,8 +21,8 @@ pub async fn gen_test_data() -> Result<(), Error> {
         let chn = ChannelGenProps {
             config: ChannelConfig {
                 channel: Channel {
-                    backend: "test".into(),
-                    name: "wave1".into(),
+                    backend: "testbackend".into(),
+                    name: "wave-f64-be-n21".into(),
                 },
                 keyspace: 3,
                 time_bin_size: Nanos { ns: DAY },
@@ -35,6 +35,23 @@ pub async fn gen_test_data() -> Result<(), Error> {
             time_spacing: MS * 1000,
         };
         ensemble.channels.push(chn);
+        let chn = ChannelGenProps {
+            config: ChannelConfig {
+                channel: Channel {
+                    backend: "testbackend".into(),
+                    name: "wave-u16-le-n77".into(),
+                },
+                keyspace: 3,
+                time_bin_size: Nanos { ns: DAY },
+                array: true,
+                scalar_type: ScalarType::U16,
+                shape: Shape::Wave(77),
+                big_endian: false,
+                compression: true,
+            },
+            time_spacing: MS * 100,
+        };
+        ensemble.channels.push(chn);
     }
     for i1 in 0..3 {
         let node = Node {
@@ -85,10 +102,22 @@ async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) -> Result<(), Error> {
         .map_err(|k| Error::with_msg(format!("can not generate config {:?}", k)))?;
     let mut evix = 0;
     let mut ts = Nanos { ns: 0 };
+    let mut pulse = 0;
     while ts.ns < DAY * 2 {
-        let res = gen_timebin(evix, ts, chn.time_spacing, &channel_path, &chn.config, node, ensemble).await?;
+        let res = gen_timebin(
+            evix,
+            ts,
+            pulse,
+            chn.time_spacing,
+            &channel_path,
+            &chn.config,
+            node,
+            ensemble,
+        )
+        .await?;
         evix = res.evix;
         ts.ns = res.ts.ns;
+        pulse = res.pulse;
     }
     Ok(())
 }
@@ -131,7 +160,7 @@ async fn gen_config(
     buf.put_i64(ts);
     buf.put_i64(pulse);
     buf.put_u32(config.keyspace as u32);
-    buf.put_u64(config.time_bin_size.ns);
+    buf.put_u64(config.time_bin_size.ns / MS);
     buf.put_i32(sc);
     buf.put_i32(status);
     buf.put_i8(bb);
@@ -213,11 +242,13 @@ impl CountedFile {
 struct GenTimebinRes {
     evix: u64,
     ts: Nanos,
+    pulse: u64,
 }
 
 async fn gen_timebin(
     evix: u64,
     ts: Nanos,
+    pulse: u64,
     ts_spacing: u64,
     channel_path: &Path,
     config: &ChannelConfig,
@@ -256,17 +287,19 @@ async fn gen_timebin(
     gen_datafile_header(&mut file, config).await?;
     let mut evix = evix;
     let mut ts = ts;
+    let mut pulse = pulse;
     let tsmax = Nanos {
         ns: (tb + 1) * config.time_bin_size.ns,
     };
     while ts.ns < tsmax.ns {
         if evix % ensemble.nodes.len() as u64 == node.split as u64 {
-            gen_event(&mut file, index_file.as_mut(), evix, ts, config).await?;
+            gen_event(&mut file, index_file.as_mut(), evix, ts, pulse, config).await?;
         }
         evix += 1;
         ts.ns += ts_spacing;
+        pulse += 1;
     }
-    let ret = GenTimebinRes { evix, ts };
+    let ret = GenTimebinRes { evix, ts, pulse };
     Ok(ret)
 }
 
@@ -287,14 +320,17 @@ async fn gen_event(
     index_file: Option<&mut CountedFile>,
     evix: u64,
     ts: Nanos,
+    pulse: u64,
     config: &ChannelConfig,
 ) -> Result<(), Error> {
+    let ttl = 0xcafecafe;
+    let ioc_ts = 0xcafecafe;
     let mut buf = BytesMut::with_capacity(1024 * 16);
     buf.put_i32(0xcafecafe as u32 as i32);
-    buf.put_u64(0xcafecafe);
+    buf.put_u64(ttl);
     buf.put_u64(ts.ns);
-    buf.put_u64(2323);
-    buf.put_u64(0xcafecafe);
+    buf.put_u64(pulse);
+    buf.put_u64(ioc_ts);
     buf.put_u8(0);
     buf.put_u8(0);
     buf.put_i32(-1);
@@ -314,11 +350,15 @@ async fn gen_event(
                         let mut vals = vec![0; (ele_size * ele_count) as usize];
                         for i1 in 0..ele_count {
                             let v = evix as f64;
-                            let a = v.to_be_bytes();
-                            let mut c1 = std::io::Cursor::new(&mut vals);
-                            use std::io::{Seek, SeekFrom};
+                            let a = if config.big_endian {
+                                v.to_be_bytes()
+                            } else {
+                                v.to_le_bytes()
+                            };
+                            use std::io::{Cursor, Seek, SeekFrom, Write};
+                            let mut c1 = Cursor::new(&mut vals);
                             c1.seek(SeekFrom::Start(i1 as u64 * ele_size as u64))?;
-                            std::io::Write::write_all(&mut c1, &a)?;
+                            Write::write_all(&mut c1, &a)?;
                         }
                         let mut comp = vec![0u8; (ele_size * ele_count + 64) as usize];
                         let n1 =
                             bitshuffle_compress(&vals, &mut comp, ele_count as usize, ele_size as usize, 0).unwrap();
                         buf.put_u64(vals.len() as u64);
                         let comp_block_size = 0;
                         buf.put_u32(comp_block_size);
                         buf.put(&comp[..n1]);
                     }
-                    _ => todo!(),
+                    ScalarType::U16 => {
+                        let ele_size = 2;
+                        let mut vals = vec![0; (ele_size * ele_count) as usize];
+                        for i1 in 0..ele_count {
+                            let v = evix as u16;
+                            let a = if config.big_endian {
+                                v.to_be_bytes()
+                            } else {
+                                v.to_le_bytes()
+                            };
+                            use std::io::{Cursor, Seek, SeekFrom, Write};
+                            let mut c1 = Cursor::new(&mut vals);
+                            c1.seek(SeekFrom::Start(i1 as u64 * ele_size as u64))?;
+                            Write::write_all(&mut c1, &a)?;
+                        }
+                        let mut comp = vec![0u8; (ele_size * ele_count + 64) as usize];
+                        let n1 =
+                            bitshuffle_compress(&vals, &mut comp, ele_count as usize, ele_size as usize, 0).unwrap();
+                        buf.put_u64(vals.len() as u64);
+                        let comp_block_size = 0;
+                        buf.put_u32(comp_block_size);
+                        buf.put(&comp[..n1]);
+                    }
+                    _ => todo!("Datatype not yet supported: {:?}", config.scalar_type),
                 }
             }
-            _ => todo!(),
+            _ => todo!("Shape not yet supported: {:?}", config.shape),
         }
     } else {
-        todo!()
+        todo!("Uncompressed not yet supported");
     }
     {
         let len = buf.len() as u32 + 4;
diff --git a/retrieval/src/bin/retrieval.rs b/retrieval/src/bin/retrieval.rs
index 729701e..b565e54 100644
--- a/retrieval/src/bin/retrieval.rs
+++ b/retrieval/src/bin/retrieval.rs
@@ -40,6 +40,9 @@ async fn go() -> Result<(), Error> {
                 retrieval::client::get_binned(opts.host, opts.port, opts.channel, beg, end, opts.bins).await?;
             }
         },
+        SubCmd::GenerateTestData => {
+            disk::gen::gen_test_data().await?;
+        }
     }
     Ok(())
 }
diff --git a/retrieval/src/cli.rs b/retrieval/src/cli.rs
index a1f581a..e091424 100644
--- a/retrieval/src/cli.rs
+++ b/retrieval/src/cli.rs
@@ -13,6 +13,7 @@ pub struct Opts {
 pub enum SubCmd {
     Retrieval(Retrieval),
     Client(Client),
+    GenerateTestData,
 }
 
 #[derive(Debug, Clap)]
diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs
index 9995c6d..17d4a55 100644
--- a/retrieval/src/test.rs
+++ b/retrieval/src/test.rs
@@ -3,11 +3,13 @@
 use bytes::BytesMut;
 use chrono::{DateTime, Utc};
 use disk::frame::inmem::InMemoryFrameAsyncReadStream;
 use err::Error;
+use futures_util::StreamExt;
 use futures_util::TryStreamExt;
 use hyper::Body;
+use netpod::log::*;
 use netpod::{Cluster, Database, Node};
-#[allow(unused_imports)]
-use tracing::{debug, error, info, trace, warn};
+use std::future::ready;
+use tokio::io::AsyncRead;
 
 fn test_cluster() -> Cluster {
     let nodes = (0..3)
@@ -35,20 +37,46 @@ fn test_cluster() -> Cluster {
 }
 
 #[test]
-fn get_cached_0() {
-    taskrun::run(get_cached_0_inner()).unwrap();
+fn get_binned() {
+    taskrun::run(get_binned_0_inner()).unwrap();
 }
 
-async fn get_cached_0_inner() -> Result<(), Error> {
+async fn get_binned_0_inner() -> Result<(), Error> {
+    get_binned_channel(
+        "wave-f64-be-n21",
+        "1970-01-01T00:20:10.000Z",
+        "1970-01-01T00:20:51.000Z",
+        4,
+    )
+    .await?;
+    get_binned_channel(
+        "wave-u16-le-n77",
+        "1970-01-01T01:11:00.000Z",
+        "1970-01-01T02:12:00.000Z",
+        4,
+    )
+    .await?;
+    get_binned_channel(
+        "wave-u16-le-n77",
+        "1970-01-01T01:42:00.000Z",
+        "1970-01-01T03:55:00.000Z",
+        2,
+    )
+    .await?;
+    Ok(())
+}
+
+async fn get_binned_channel<S>(channel_name: &str, beg_date: S, end_date: S, bin_count: u32) -> Result<(), Error>
+where
+    S: AsRef<str>,
+{
     let t1 = Utc::now();
     let cluster = test_cluster();
     let node0 = &cluster.nodes[0];
-    let hosts = spawn_test_hosts(cluster.clone());
-    let beg_date: DateTime<Utc> = "1970-01-01T00:20:10.000Z".parse()?;
-    let end_date: DateTime<Utc> = "1970-01-01T00:20:51.000Z".parse()?;
+    let _hosts = spawn_test_hosts(cluster.clone());
+    let beg_date: DateTime<Utc> = beg_date.as_ref().parse()?;
+    let end_date: DateTime<Utc> = end_date.as_ref().parse()?;
     let channel_backend = "back";
-    let channel_name = "wave1";
-    let bin_count = 4;
     let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
     let uri = format!(
         "http://{}:{}/api/1/binned?channel_backend={}&channel_name={}&bin_count={}&beg_date={}&end_date={}",
@@ -72,26 +100,30 @@
     //let (res_head, mut res_body) = res.into_parts();
     let s1 = disk::cache::HttpBodyAsAsyncRead::new(res);
     let s2 = InMemoryFrameAsyncReadStream::new(s1);
-    /*use hyper::body::HttpBody;
-    loop {
-        match res_body.data().await {
-            Some(Ok(k)) => {
len {}", k.len()); - ntot += k.len() as u64; - } - Some(Err(e)) => { - error!("{:?}", e); - } - None => { - info!("response stream exhausted"); - break; - } - } - }*/ - use futures_util::StreamExt; - use std::future::ready; - let mut bin_count = 0; - let s3 = s2 + let res = consume_binned_response(s2).await?; + let t2 = chrono::Utc::now(); + let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; + //let throughput = ntot / 1024 * 1000 / ms; + info!("get_cached_0 DONE bin_count {} time {} ms", res.bin_count, ms); + Ok(()) +} + +#[derive(Debug)] +pub struct BinnedResponse { + bin_count: usize, +} + +impl BinnedResponse { + pub fn new() -> Self { + Self { bin_count: 0 } + } +} + +async fn consume_binned_response(inp: InMemoryFrameAsyncReadStream) -> Result +where + T: AsyncRead + Unpin, +{ + let s1 = inp .map_err(|e| error!("TEST GOT ERROR {:?}", e)) .filter_map(|item| { let g = match item { @@ -102,7 +134,6 @@ async fn get_cached_0_inner() -> Result<(), Error> { Ok(item) => match item { Ok(item) => { info!("TEST GOT ITEM {:?}", item); - bin_count += 1; Some(Ok(item)) } Err(e) => { @@ -120,31 +151,21 @@ async fn get_cached_0_inner() -> Result<(), Error> { }; ready(g) }) - .for_each(|_| ready(())); - s3.await; - let t2 = chrono::Utc::now(); - let ntot = 0; - let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; - let throughput = ntot / 1024 * 1000 / ms; - info!( - "get_cached_0 DONE total download {} MB throughput {:5} kB/s bin_count {}", - ntot / 1024 / 1024, - throughput, - bin_count, - ); - drop(hosts); - //Err::<(), _>(format!("test error").into()) - Ok(()) -} - -#[test] -fn test_gen_test_data() { - let res = taskrun::run(async { - disk::gen::gen_test_data().await?; - Ok(()) - }); - info!("{:?}", res); - res.unwrap(); + .fold(Ok(BinnedResponse::new()), |a, k| { + let g = match a { + Ok(mut a) => match k { + Ok(k) => { + a.bin_count += k.ts1s.len(); + Ok(a) + } + Err(e) => Err(e), + }, + Err(e) => Err(e), + }; + ready(g) + }); + let ret = s1.await; + ret } #[test]