Add u16 channel and more test variations

This commit is contained in:
Dominik Werder
2021-04-29 15:35:59 +02:00
parent 0ebd0b80f5
commit 3b8a307c90
5 changed files with 167 additions and 77 deletions
+6 -4
View File
@@ -134,14 +134,17 @@ impl EventChunker {
let mut sl = std::io::Cursor::new(buf.as_ref()); let mut sl = std::io::Cursor::new(buf.as_ref());
let len1b = sl.read_i32::<BE>().unwrap(); let len1b = sl.read_i32::<BE>().unwrap();
assert!(len == len1b as u32); assert!(len == len1b as u32);
sl.read_i64::<BE>().unwrap(); let _ttl = sl.read_i64::<BE>().unwrap();
let ts = sl.read_i64::<BE>().unwrap() as u64; let ts = sl.read_i64::<BE>().unwrap() as u64;
let pulse = sl.read_i64::<BE>().unwrap() as u64; let pulse = sl.read_i64::<BE>().unwrap() as u64;
if ts >= self.range.end { if ts >= self.range.end {
self.seen_beyond_range = true; self.seen_beyond_range = true;
break; break;
} }
sl.read_i64::<BE>().unwrap(); if ts < self.range.beg {
error!("seen before range: {}", ts / SEC);
}
let _ioc_ts = sl.read_i64::<BE>().unwrap();
let status = sl.read_i8().unwrap(); let status = sl.read_i8().unwrap();
let severity = sl.read_i8().unwrap(); let severity = sl.read_i8().unwrap();
let optional = sl.read_i32::<BE>().unwrap(); let optional = sl.read_i32::<BE>().unwrap();
@@ -214,9 +217,8 @@ impl EventChunker {
assert!(c1 as u32 == k1); assert!(c1 as u32 == k1);
trace!("decompress result c1 {} k1 {}", c1, k1); trace!("decompress result c1 {} k1 {}", c1, k1);
if ts < self.range.beg { if ts < self.range.beg {
error!("EVENT BEFORE RANGE {}", ts / SEC);
} else if ts >= self.range.end { } else if ts >= self.range.end {
error!("EVENT BEFORE RANGE {}", ts / SEC); error!("EVENT AFTER RANGE {}", ts / SEC);
} else { } else {
ret.add_event( ret.add_event(
ts, ts,
+80 -17
View File
@@ -11,7 +11,7 @@ use tokio::io::AsyncWriteExt;
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, trace, warn};
pub async fn gen_test_data() -> Result<(), Error> { pub async fn gen_test_data() -> Result<(), Error> {
let data_base_path = PathBuf::from("../tmpdata"); let data_base_path = PathBuf::from("tmpdata");
let ksprefix = String::from("ks"); let ksprefix = String::from("ks");
let mut ensemble = Ensemble { let mut ensemble = Ensemble {
nodes: vec![], nodes: vec![],
@@ -21,8 +21,8 @@ pub async fn gen_test_data() -> Result<(), Error> {
let chn = ChannelGenProps { let chn = ChannelGenProps {
config: ChannelConfig { config: ChannelConfig {
channel: Channel { channel: Channel {
backend: "test".into(), backend: "testbackend".into(),
name: "wave1".into(), name: "wave-f64-be-n21".into(),
}, },
keyspace: 3, keyspace: 3,
time_bin_size: Nanos { ns: DAY }, time_bin_size: Nanos { ns: DAY },
@@ -35,6 +35,23 @@ pub async fn gen_test_data() -> Result<(), Error> {
time_spacing: MS * 1000, time_spacing: MS * 1000,
}; };
ensemble.channels.push(chn); ensemble.channels.push(chn);
let chn = ChannelGenProps {
config: ChannelConfig {
channel: Channel {
backend: "testbackend".into(),
name: "wave-u16-le-n77".into(),
},
keyspace: 3,
time_bin_size: Nanos { ns: DAY },
array: true,
scalar_type: ScalarType::U16,
shape: Shape::Wave(77),
big_endian: false,
compression: true,
},
time_spacing: MS * 100,
};
ensemble.channels.push(chn);
} }
for i1 in 0..3 { for i1 in 0..3 {
let node = Node { let node = Node {
@@ -85,10 +102,22 @@ async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) ->
.map_err(|k| Error::with_msg(format!("can not generate config {:?}", k)))?; .map_err(|k| Error::with_msg(format!("can not generate config {:?}", k)))?;
let mut evix = 0; let mut evix = 0;
let mut ts = Nanos { ns: 0 }; let mut ts = Nanos { ns: 0 };
let mut pulse = 0;
while ts.ns < DAY * 2 { while ts.ns < DAY * 2 {
let res = gen_timebin(evix, ts, chn.time_spacing, &channel_path, &chn.config, node, ensemble).await?; let res = gen_timebin(
evix,
ts,
pulse,
chn.time_spacing,
&channel_path,
&chn.config,
node,
ensemble,
)
.await?;
evix = res.evix; evix = res.evix;
ts.ns = res.ts.ns; ts.ns = res.ts.ns;
pulse = res.pulse;
} }
Ok(()) Ok(())
} }
@@ -131,7 +160,7 @@ async fn gen_config(
buf.put_i64(ts); buf.put_i64(ts);
buf.put_i64(pulse); buf.put_i64(pulse);
buf.put_u32(config.keyspace as u32); buf.put_u32(config.keyspace as u32);
buf.put_u64(config.time_bin_size.ns); buf.put_u64(config.time_bin_size.ns / MS);
buf.put_i32(sc); buf.put_i32(sc);
buf.put_i32(status); buf.put_i32(status);
buf.put_i8(bb); buf.put_i8(bb);
@@ -213,11 +242,13 @@ impl CountedFile {
struct GenTimebinRes { struct GenTimebinRes {
evix: u64, evix: u64,
ts: Nanos, ts: Nanos,
pulse: u64,
} }
async fn gen_timebin( async fn gen_timebin(
evix: u64, evix: u64,
ts: Nanos, ts: Nanos,
pulse: u64,
ts_spacing: u64, ts_spacing: u64,
channel_path: &Path, channel_path: &Path,
config: &ChannelConfig, config: &ChannelConfig,
@@ -256,17 +287,19 @@ async fn gen_timebin(
gen_datafile_header(&mut file, config).await?; gen_datafile_header(&mut file, config).await?;
let mut evix = evix; let mut evix = evix;
let mut ts = ts; let mut ts = ts;
let mut pulse = pulse;
let tsmax = Nanos { let tsmax = Nanos {
ns: (tb + 1) * config.time_bin_size.ns, ns: (tb + 1) * config.time_bin_size.ns,
}; };
while ts.ns < tsmax.ns { while ts.ns < tsmax.ns {
if evix % ensemble.nodes.len() as u64 == node.split as u64 { if evix % ensemble.nodes.len() as u64 == node.split as u64 {
gen_event(&mut file, index_file.as_mut(), evix, ts, config).await?; gen_event(&mut file, index_file.as_mut(), evix, ts, pulse, config).await?;
} }
evix += 1; evix += 1;
ts.ns += ts_spacing; ts.ns += ts_spacing;
pulse += 1;
} }
let ret = GenTimebinRes { evix, ts }; let ret = GenTimebinRes { evix, ts, pulse };
Ok(ret) Ok(ret)
} }
@@ -287,14 +320,17 @@ async fn gen_event(
index_file: Option<&mut CountedFile>, index_file: Option<&mut CountedFile>,
evix: u64, evix: u64,
ts: Nanos, ts: Nanos,
pulse: u64,
config: &ChannelConfig, config: &ChannelConfig,
) -> Result<(), Error> { ) -> Result<(), Error> {
let ttl = 0xcafecafe;
let ioc_ts = 0xcafecafe;
let mut buf = BytesMut::with_capacity(1024 * 16); let mut buf = BytesMut::with_capacity(1024 * 16);
buf.put_i32(0xcafecafe as u32 as i32); buf.put_i32(0xcafecafe as u32 as i32);
buf.put_u64(0xcafecafe); buf.put_u64(ttl);
buf.put_u64(ts.ns); buf.put_u64(ts.ns);
buf.put_u64(2323); buf.put_u64(pulse);
buf.put_u64(0xcafecafe); buf.put_u64(ioc_ts);
buf.put_u8(0); buf.put_u8(0);
buf.put_u8(0); buf.put_u8(0);
buf.put_i32(-1); buf.put_i32(-1);
@@ -314,11 +350,15 @@ async fn gen_event(
let mut vals = vec![0; (ele_size * ele_count) as usize]; let mut vals = vec![0; (ele_size * ele_count) as usize];
for i1 in 0..ele_count { for i1 in 0..ele_count {
let v = evix as f64; let v = evix as f64;
let a = v.to_be_bytes(); let a = if config.big_endian {
let mut c1 = std::io::Cursor::new(&mut vals); v.to_be_bytes()
use std::io::{Seek, SeekFrom}; } else {
v.to_le_bytes()
};
use std::io::{Cursor, Seek, SeekFrom, Write};
let mut c1 = Cursor::new(&mut vals);
c1.seek(SeekFrom::Start(i1 as u64 * ele_size as u64))?; c1.seek(SeekFrom::Start(i1 as u64 * ele_size as u64))?;
std::io::Write::write_all(&mut c1, &a)?; Write::write_all(&mut c1, &a)?;
} }
let mut comp = vec![0u8; (ele_size * ele_count + 64) as usize]; let mut comp = vec![0u8; (ele_size * ele_count + 64) as usize];
let n1 = let n1 =
@@ -328,13 +368,36 @@ async fn gen_event(
buf.put_u32(comp_block_size); buf.put_u32(comp_block_size);
buf.put(&comp[..n1]); buf.put(&comp[..n1]);
} }
_ => todo!(), ScalarType::U16 => {
let ele_size = 2;
let mut vals = vec![0; (ele_size * ele_count) as usize];
for i1 in 0..ele_count {
let v = evix as u16;
let a = if config.big_endian {
v.to_be_bytes()
} else {
v.to_le_bytes()
};
use std::io::{Cursor, Seek, SeekFrom, Write};
let mut c1 = Cursor::new(&mut vals);
c1.seek(SeekFrom::Start(i1 as u64 * ele_size as u64))?;
Write::write_all(&mut c1, &a)?;
}
let mut comp = vec![0u8; (ele_size * ele_count + 64) as usize];
let n1 =
bitshuffle_compress(&vals, &mut comp, ele_count as usize, ele_size as usize, 0).unwrap();
buf.put_u64(vals.len() as u64);
let comp_block_size = 0;
buf.put_u32(comp_block_size);
buf.put(&comp[..n1]);
}
_ => todo!("Datatype not yet supported: {:?}", config.scalar_type),
} }
} }
_ => todo!(), _ => todo!("Shape not yet supported: {:?}", config.shape),
} }
} else { } else {
todo!() todo!("Uncompressed not yet supported");
} }
{ {
let len = buf.len() as u32 + 4; let len = buf.len() as u32 + 4;
+3
View File
@@ -40,6 +40,9 @@ async fn go() -> Result<(), Error> {
retrieval::client::get_binned(opts.host, opts.port, opts.channel, beg, end, opts.bins).await?; retrieval::client::get_binned(opts.host, opts.port, opts.channel, beg, end, opts.bins).await?;
} }
}, },
SubCmd::GenerateTestData => {
disk::gen::gen_test_data().await?;
}
} }
Ok(()) Ok(())
} }
+1
View File
@@ -13,6 +13,7 @@ pub struct Opts {
pub enum SubCmd { pub enum SubCmd {
Retrieval(Retrieval), Retrieval(Retrieval),
Client(Client), Client(Client),
GenerateTestData,
} }
#[derive(Debug, Clap)] #[derive(Debug, Clap)]
+77 -56
View File
@@ -3,11 +3,13 @@ use bytes::BytesMut;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use disk::frame::inmem::InMemoryFrameAsyncReadStream; use disk::frame::inmem::InMemoryFrameAsyncReadStream;
use err::Error; use err::Error;
use futures_util::StreamExt;
use futures_util::TryStreamExt; use futures_util::TryStreamExt;
use hyper::Body; use hyper::Body;
use netpod::log::*;
use netpod::{Cluster, Database, Node}; use netpod::{Cluster, Database, Node};
#[allow(unused_imports)] use std::future::ready;
use tracing::{debug, error, info, trace, warn}; use tokio::io::AsyncRead;
fn test_cluster() -> Cluster { fn test_cluster() -> Cluster {
let nodes = (0..3) let nodes = (0..3)
@@ -35,20 +37,46 @@ fn test_cluster() -> Cluster {
} }
#[test] #[test]
fn get_cached_0() { fn get_binned() {
taskrun::run(get_cached_0_inner()).unwrap(); taskrun::run(get_binned_0_inner()).unwrap();
} }
async fn get_cached_0_inner() -> Result<(), Error> { async fn get_binned_0_inner() -> Result<(), Error> {
get_binned_channel(
"wave-f64-be-n21",
"1970-01-01T00:20:10.000Z",
"1970-01-01T00:20:51.000Z",
4,
)
.await?;
get_binned_channel(
"wave-u16-le-n77",
"1970-01-01T01:11:00.000Z",
"1970-01-01T02:12:00.000Z",
4,
)
.await?;
get_binned_channel(
"wave-u16-le-n77",
"1970-01-01T01:42:00.000Z",
"1970-01-01T03:55:00.000Z",
2,
)
.await?;
Ok(())
}
async fn get_binned_channel<S>(channel_name: &str, beg_date: S, end_date: S, bin_count: u32) -> Result<(), Error>
where
S: AsRef<str>,
{
let t1 = Utc::now(); let t1 = Utc::now();
let cluster = test_cluster(); let cluster = test_cluster();
let node0 = &cluster.nodes[0]; let node0 = &cluster.nodes[0];
let hosts = spawn_test_hosts(cluster.clone()); let _hosts = spawn_test_hosts(cluster.clone());
let beg_date: DateTime<Utc> = "1970-01-01T00:20:10.000Z".parse()?; let beg_date: DateTime<Utc> = beg_date.as_ref().parse()?;
let end_date: DateTime<Utc> = "1970-01-01T00:20:51.000Z".parse()?; let end_date: DateTime<Utc> = end_date.as_ref().parse()?;
let channel_backend = "back"; let channel_backend = "back";
let channel_name = "wave1";
let bin_count = 4;
let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
let uri = format!( let uri = format!(
"http://{}:{}/api/1/binned?channel_backend={}&channel_name={}&bin_count={}&beg_date={}&end_date={}", "http://{}:{}/api/1/binned?channel_backend={}&channel_name={}&bin_count={}&beg_date={}&end_date={}",
@@ -72,26 +100,30 @@ async fn get_cached_0_inner() -> Result<(), Error> {
//let (res_head, mut res_body) = res.into_parts(); //let (res_head, mut res_body) = res.into_parts();
let s1 = disk::cache::HttpBodyAsAsyncRead::new(res); let s1 = disk::cache::HttpBodyAsAsyncRead::new(res);
let s2 = InMemoryFrameAsyncReadStream::new(s1); let s2 = InMemoryFrameAsyncReadStream::new(s1);
/*use hyper::body::HttpBody; let res = consume_binned_response(s2).await?;
loop { let t2 = chrono::Utc::now();
match res_body.data().await { let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
Some(Ok(k)) => { //let throughput = ntot / 1024 * 1000 / ms;
info!("packet.. len {}", k.len()); info!("get_cached_0 DONE bin_count {} time {} ms", res.bin_count, ms);
ntot += k.len() as u64; Ok(())
} }
Some(Err(e)) => {
error!("{:?}", e); #[derive(Debug)]
} pub struct BinnedResponse {
None => { bin_count: usize,
info!("response stream exhausted"); }
break;
} impl BinnedResponse {
} pub fn new() -> Self {
}*/ Self { bin_count: 0 }
use futures_util::StreamExt; }
use std::future::ready; }
let mut bin_count = 0;
let s3 = s2 async fn consume_binned_response<T>(inp: InMemoryFrameAsyncReadStream<T>) -> Result<BinnedResponse, Error>
where
T: AsyncRead + Unpin,
{
let s1 = inp
.map_err(|e| error!("TEST GOT ERROR {:?}", e)) .map_err(|e| error!("TEST GOT ERROR {:?}", e))
.filter_map(|item| { .filter_map(|item| {
let g = match item { let g = match item {
@@ -102,7 +134,6 @@ async fn get_cached_0_inner() -> Result<(), Error> {
Ok(item) => match item { Ok(item) => match item {
Ok(item) => { Ok(item) => {
info!("TEST GOT ITEM {:?}", item); info!("TEST GOT ITEM {:?}", item);
bin_count += 1;
Some(Ok(item)) Some(Ok(item))
} }
Err(e) => { Err(e) => {
@@ -120,31 +151,21 @@ async fn get_cached_0_inner() -> Result<(), Error> {
}; };
ready(g) ready(g)
}) })
.for_each(|_| ready(())); .fold(Ok(BinnedResponse::new()), |a, k| {
s3.await; let g = match a {
let t2 = chrono::Utc::now(); Ok(mut a) => match k {
let ntot = 0; Ok(k) => {
let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; a.bin_count += k.ts1s.len();
let throughput = ntot / 1024 * 1000 / ms; Ok(a)
info!( }
"get_cached_0 DONE total download {} MB throughput {:5} kB/s bin_count {}", Err(e) => Err(e),
ntot / 1024 / 1024, },
throughput, Err(e) => Err(e),
bin_count, };
); ready(g)
drop(hosts); });
//Err::<(), _>(format!("test error").into()) let ret = s1.await;
Ok(()) ret
}
#[test]
fn test_gen_test_data() {
let res = taskrun::run(async {
disk::gen::gen_test_data().await?;
Ok(())
});
info!("{:?}", res);
res.unwrap();
} }
#[test] #[test]