Bundle small config file for unit test

This commit is contained in:
Dominik Werder
2021-10-01 15:37:01 +02:00
parent c6b9ba7154
commit 3f151b6e7f
13 changed files with 150 additions and 165 deletions

View File

@@ -186,7 +186,7 @@ impl Stream for EventChunkerMultifile {
chunkers.push(chunker);
}
}
let merged = MergedStream::new(chunkers, self.range.clone(), self.expand);
let merged = MergedStream::new(chunkers);
self.evs = Some(Box::pin(merged));
Ready(Some(Ok(StreamItem::Log(item))))
}
@@ -221,7 +221,6 @@ impl Stream for EventChunkerMultifile {
#[cfg(test)]
mod test {
use crate::merge::MergedStream;
use crate::rangefilter::RangeFilter;
use crate::{eventblobs::EventChunkerMultifile, eventchunker::EventChunkerConf};
use err::Error;

View File

@@ -2,12 +2,12 @@ use crate::HasSeenBeforeRangeCount;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use items::ByteEstimate;
use items::{Appendable, LogItem, PushableIndex, RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithTimestamps};
use items::{ByteEstimate, Clearable};
use netpod::histo::HistoLog2;
use netpod::log::*;
use netpod::ByteSize;
use netpod::EventDataReadStats;
use netpod::{log::*, NanoRange};
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -56,7 +56,7 @@ where
S: Stream<Item = Sitemty<ITY>> + Unpin,
ITY: Appendable + Unpin,
{
pub fn new(inps: Vec<S>, range: NanoRange, expand: bool) -> Self {
pub fn new(inps: Vec<S>) -> Self {
let n = inps.len();
let current = (0..n).into_iter().map(|_| MergedCurVal::None).collect();
Self {
@@ -288,6 +288,7 @@ mod test {
use crate::dataopen::position_file_for_test;
use crate::eventchunker::{EventChunker, EventChunkerConf};
use crate::file_content_stream;
use crate::merge::MergedStream;
use err::Error;
use futures_util::StreamExt;
use items::{RangeCompletableItem, StreamItem};
@@ -300,7 +301,7 @@ mod test {
const SCALAR_FILE: &str =
"../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data";
const WAVE_FILE: &str =
const _WAVE_FILE: &str =
"../tmpdata/node00/ks_3/byTime/wave-f64-be-n21/0000000000000000001/0000000000/0000000000086400000_00000_Data";
#[derive(Debug)]
@@ -321,48 +322,55 @@ mod test {
.ok_or_else(|| Error::with_msg(format!("can not open file {:?}", path)))?,
);
}
//Merge
let file_io_buffer_size = FileIoBufferSize(1024 * 4);
let inp = file_content_stream(err::todoval(), file_io_buffer_size);
let inp = Box::pin(inp);
let channel_config = ChannelConfig {
channel: Channel {
backend: "testbackend".into(),
name: "scalar-i32-be".into(),
},
keyspace: 2,
time_bin_size: Nanos { ns: DAY },
scalar_type: ScalarType::I32,
byte_order: ByteOrder::BE,
array: false,
compression: false,
shape: Shape::Scalar,
};
let stats_conf = EventChunkerConf {
disk_stats_every: ByteSize::kb(1024),
};
let max_ts = Arc::new(AtomicU64::new(0));
let expand = false;
let do_decompress = false;
let dbg_path = err::todoval();
let inps = files
.into_iter()
.map(|file| {
let file_io_buffer_size = FileIoBufferSize(1024 * 4);
let inp = file_content_stream(file, file_io_buffer_size);
inp
})
.map(|inp| {
let channel_config = ChannelConfig {
channel: Channel {
backend: "testbackend".into(),
name: "scalar-i32-be".into(),
},
keyspace: 2,
time_bin_size: Nanos { ns: DAY },
scalar_type: ScalarType::I32,
byte_order: ByteOrder::BE,
array: false,
compression: false,
shape: Shape::Scalar,
};
let stats_conf = EventChunkerConf {
disk_stats_every: ByteSize::kb(1024),
};
let max_ts = Arc::new(AtomicU64::new(0));
let expand = false;
let do_decompress = false;
let dbg_path = PathBuf::from("/dbg/dummy");
// TODO `expand` flag usage
let mut chunker = EventChunker::from_event_boundary(
inp,
channel_config,
range,
stats_conf,
dbg_path,
max_ts,
expand,
do_decompress,
);
// TODO `expand` flag usage
// Does Chunker need to know about `expand` and why?
let chunker = EventChunker::from_event_boundary(
Box::pin(inp),
channel_config,
range.clone(),
stats_conf,
dbg_path,
max_ts,
expand,
do_decompress,
);
chunker
})
.collect();
let mut merged = MergedStream::new(inps);
let mut cevs = CollectedEvents { tss: vec![] };
let mut i1 = 0;
while let Some(item) = chunker.next().await {
while let Some(item) = merged.next().await {
if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) = item {
info!("item: {:?}", item);
for ts in item.tss {
@@ -376,7 +384,7 @@ mod test {
}
info!("read {} data items", i1);
info!("cevs: {:?}", cevs);
err::todoval()
Ok(cevs)
}
#[test]
@@ -388,6 +396,11 @@ mod test {
};
let path = PathBuf::from(SCALAR_FILE);
collect_merged_events(vec![path], range).await?;
// TODO
// assert things
// remove zmtp test from default test suite, move to cli instead
Ok(())
};
taskrun::run(fut)

View File

@@ -5,8 +5,8 @@ use err::Error;
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
use items::Sitemty;
use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::{log::*, NanoRange};
use netpod::{Cluster, PerfOpts};
use std::future::Future;
use std::pin::Pin;
@@ -21,8 +21,6 @@ pub struct MergedBlobsFromRemotes {
merged: Option<T001<EventFull>>,
completed: bool,
errored: bool,
range: NanoRange,
expand: bool,
}
impl MergedBlobsFromRemotes {
@@ -41,8 +39,6 @@ impl MergedBlobsFromRemotes {
merged: None,
completed: false,
errored: false,
range: evq.range.clone(),
expand: evq.agg_kind.need_expand(),
}
}
}
@@ -99,7 +95,7 @@ impl Stream for MergedBlobsFromRemotes {
} else {
if c1 == self.tcp_establish_futs.len() {
let inps: Vec<_> = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect();
let s1 = MergedStream::new(inps, self.range.clone(), self.expand);
let s1 = MergedStream::new(inps);
self.merged = Some(Box::pin(s1));
}
continue 'outer;

View File

@@ -4,8 +4,8 @@ use err::Error;
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
use items::{Appendable, Clearable, EventsNodeProcessor, FrameType, PushableIndex, Sitemty};
use netpod::log::*;
use netpod::query::RawEventsQuery;
use netpod::{log::*, NanoRange};
use netpod::{Cluster, PerfOpts};
use std::future::Future;
use std::pin::Pin;
@@ -23,8 +23,6 @@ where
merged: Option<T001<<ENP as EventsNodeProcessor>::Output>>,
completed: bool,
errored: bool,
range: NanoRange,
expand: bool,
}
impl<ENP> MergedFromRemotes<ENP>
@@ -48,8 +46,6 @@ where
merged: None,
completed: false,
errored: false,
range: evq.range.clone(),
expand: evq.agg_kind.need_expand(),
}
}
}
@@ -110,11 +106,7 @@ where
} else {
if c1 == self.tcp_establish_futs.len() {
let inps: Vec<_> = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect();
let s1 = MergedStream::<_, <ENP as EventsNodeProcessor>::Output>::new(
inps,
self.range.clone(),
self.expand,
);
let s1 = MergedStream::<_, <ENP as EventsNodeProcessor>::Output>::new(inps);
self.merged = Some(Box::pin(s1));
}
continue 'outer;

View File

@@ -12,7 +12,6 @@ pub struct RangeFilter<S, ITY> {
expand: bool,
prerange: ITY,
have_pre: bool,
emitted_pre: bool,
emitted_post: bool,
done: bool,
complete: bool,
@@ -29,7 +28,6 @@ where
expand,
prerange: ITY::empty(),
have_pre: false,
emitted_pre: false,
emitted_post: false,
done: false,
complete: false,

View File

@@ -681,7 +681,7 @@ pub async fn channel_config(req: Request<Body>, node_config: &NodeConfigCached)
pub async fn ca_connect_1(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let pairs = get_url_query_pairs(&url);
let res = netfetch::ca_connect_1(pairs, node_config).await?;
let res = netfetch::ca::ca_connect_1(pairs, node_config).await?;
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON_LINES)
.body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) {

81
netfetch/src/ca.rs Normal file
View File

@@ -0,0 +1,81 @@
use async_channel::{bounded, Receiver};
use bytes::{BufMut, BytesMut};
use err::Error;
use futures_util::FutureExt;
use netpod::NodeConfigCached;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[derive(Debug, Serialize, Deserialize)]
pub struct Message {
cmd: u16,
payload_len: u16,
type_type: u16,
data_len: u16,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum FetchItem {
Log(String),
Message(Message),
}
pub async fn ca_connect_1(
_pairs: BTreeMap<String, String>,
_node_config: &NodeConfigCached,
) -> Result<Receiver<Result<FetchItem, Error>>, Error> {
let (tx, rx) = bounded(16);
let tx2 = tx.clone();
tokio::task::spawn(
async move {
let mut conn = tokio::net::TcpStream::connect("S30CB06-CVME-LLRF2.psi.ch:5064").await?;
let (mut inp, mut out) = conn.split();
tx.send(Ok(FetchItem::Log(format!("connected")))).await?;
let mut buf = [0; 64];
let mut b2 = BytesMut::with_capacity(128);
b2.put_u16(0x00);
b2.put_u16(0);
b2.put_u16(0);
b2.put_u16(0xb);
b2.put_u32(0);
b2.put_u32(0);
out.write_all(&b2).await?;
tx.send(Ok(FetchItem::Log(format!("written")))).await?;
let n1 = inp.read(&mut buf).await?;
tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf))))
.await?;
// Search to get cid:
let chn = b"SATCB01-DBPM220:Y2";
b2.clear();
b2.put_u16(0x06);
b2.put_u16((16 + chn.len()) as u16);
b2.put_u16(0x00);
b2.put_u16(0x0b);
b2.put_u32(0x71803472);
b2.put_u32(0x71803472);
b2.put_slice(chn);
out.write_all(&b2).await?;
tx.send(Ok(FetchItem::Log(format!("written")))).await?;
let n1 = inp.read(&mut buf).await?;
tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf))))
.await?;
Ok::<_, Error>(())
}
.then({
move |item| async move {
match item {
Ok(_) => {}
Err(e) => {
tx2.send(Ok(FetchItem::Log(format!("Seeing error: {:?}", e)))).await?;
}
}
Ok::<_, Error>(())
}
}),
);
Ok(rx)
}

View File

@@ -1,86 +1,4 @@
use async_channel::{bounded, Receiver};
use bytes::{BufMut, BytesMut};
use futures_util::FutureExt;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use err::Error;
use netpod::NodeConfigCached;
pub mod ca;
#[cfg(test)]
pub mod test;
pub mod zmtp;
#[derive(Debug, Serialize, Deserialize)]
pub struct Message {
cmd: u16,
payload_len: u16,
type_type: u16,
data_len: u16,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum FetchItem {
Log(String),
Message(Message),
}
pub async fn ca_connect_1(
_pairs: BTreeMap<String, String>,
_node_config: &NodeConfigCached,
) -> Result<Receiver<Result<FetchItem, Error>>, Error> {
let (tx, rx) = bounded(16);
let tx2 = tx.clone();
tokio::task::spawn(
async move {
let mut conn = tokio::net::TcpStream::connect("S30CB06-CVME-LLRF2.psi.ch:5064").await?;
let (mut inp, mut out) = conn.split();
tx.send(Ok(FetchItem::Log(format!("connected")))).await?;
let mut buf = [0; 64];
let mut b2 = BytesMut::with_capacity(128);
b2.put_u16(0x00);
b2.put_u16(0);
b2.put_u16(0);
b2.put_u16(0xb);
b2.put_u32(0);
b2.put_u32(0);
out.write_all(&b2).await?;
tx.send(Ok(FetchItem::Log(format!("written")))).await?;
let n1 = inp.read(&mut buf).await?;
tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf))))
.await?;
// Search to get cid:
let chn = b"SATCB01-DBPM220:Y2";
b2.clear();
b2.put_u16(0x06);
b2.put_u16((16 + chn.len()) as u16);
b2.put_u16(0x00);
b2.put_u16(0x0b);
b2.put_u32(0x71803472);
b2.put_u32(0x71803472);
b2.put_slice(chn);
out.write_all(&b2).await?;
tx.send(Ok(FetchItem::Log(format!("written")))).await?;
let n1 = inp.read(&mut buf).await?;
tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf))))
.await?;
Ok::<_, Error>(())
}
.then({
move |item| async move {
match item {
Ok(_) => {}
Err(e) => {
tx2.send(Ok(FetchItem::Log(format!("Seeing error: {:?}", e)))).await?;
}
}
Ok::<_, Error>(())
}
}),
);
Ok(rx)
}

View File

@@ -38,7 +38,7 @@ fn ca_connect_1() {
},
ix: 0,
};
let mut rx = super::ca_connect_1(pairs, &node_config).await?;
let mut rx = super::ca::ca_connect_1(pairs, &node_config).await?;
while let Some(item) = rx.next().await {
info!("got next: {:?}", item);
}
@@ -46,14 +46,3 @@ fn ca_connect_1() {
})
.unwrap();
}
#[test]
fn zmtp_00() {
taskrun::run(async {
let it = vec![(String::new(), String::new())].into_iter();
let _pairs = BTreeMap::from_iter(it);
crate::zmtp::zmtp_00().await?;
Ok(())
})
.unwrap();
}

View File

@@ -11,11 +11,7 @@ use tokio::net::TcpStream;
pub async fn zmtp_00() -> Result<(), Error> {
let addr = "S10-CPPM-MOT0991:9999";
let conn = tokio::net::TcpStream::connect(addr).await?;
let mut zmtp = Zmtp::new(conn);
while let Some(ev) = zmtp.next().await {
info!("got zmtp event: {:?}", ev);
}
zmtp_client(addr).await?;
Ok(())
}

View File

@@ -335,7 +335,10 @@ mod test {
fn read_data() -> Vec<u8> {
use std::io::Read;
let path = "ks/config/S10CB01-RLOD100-PUP10:SIG-AMPLT/latest/00000_Config";
//let path = "ks/config/S10CB01-RLOD100-PUP10:SIG-AMPLT/latest/00000_Config";
let cwd = std::env::current_dir();
netpod::log::info!("CWD: {:?}", cwd);
let path = "../resources/sf-daqbuf-33-S10CB01-RLOD100-PUP10:SIG-AMPLT-latest-00000_Config";
let mut f1 = std::fs::File::open(path).unwrap();
let mut buf = vec![];
f1.read_to_end(&mut buf).unwrap();