From 3f151b6e7f551157b8e362d232cce010fc5246c9 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 1 Oct 2021 15:37:01 +0200 Subject: [PATCH] Bundle small config file for unit test --- disk/src/eventblobs.rs | 3 +- disk/src/merge.rs | 99 ++++++++++-------- disk/src/merge/mergedblobsfromremotes.rs | 8 +- disk/src/merge/mergedfromremotes.rs | 12 +-- disk/src/rangefilter.rs | 2 - httpret/src/lib.rs | 2 +- netfetch/src/ca.rs | 81 ++++++++++++++ netfetch/src/lib.rs | 84 +-------------- netfetch/src/test.rs | 13 +-- netfetch/src/zmtp.rs | 6 +- parse/src/channelconfig.rs | 5 +- ...LOD100-PUP10:SIG-AMPLT-latest-00000_Config | Bin 0 -> 3869 bytes ...LOD100-PUP10:SIG-AMPLT-latest-00000_Config | Bin 0 -> 2129 bytes 13 files changed, 150 insertions(+), 165 deletions(-) create mode 100644 netfetch/src/ca.rs create mode 100644 resources/sf-daqbuf-21-S10CB01-RLOD100-PUP10:SIG-AMPLT-latest-00000_Config create mode 100644 resources/sf-daqbuf-33-S10CB01-RLOD100-PUP10:SIG-AMPLT-latest-00000_Config diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index b87bf6a..5cdfe5c 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -186,7 +186,7 @@ impl Stream for EventChunkerMultifile { chunkers.push(chunker); } } - let merged = MergedStream::new(chunkers, self.range.clone(), self.expand); + let merged = MergedStream::new(chunkers); self.evs = Some(Box::pin(merged)); Ready(Some(Ok(StreamItem::Log(item)))) } @@ -221,7 +221,6 @@ impl Stream for EventChunkerMultifile { #[cfg(test)] mod test { - use crate::merge::MergedStream; use crate::rangefilter::RangeFilter; use crate::{eventblobs::EventChunkerMultifile, eventchunker::EventChunkerConf}; use err::Error; diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 3511ecf..fe99487 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -2,12 +2,12 @@ use crate::HasSeenBeforeRangeCount; use err::Error; use futures_core::Stream; use futures_util::StreamExt; +use items::ByteEstimate; use items::{Appendable, LogItem, PushableIndex, RangeCompletableItem, Sitemty, StatsItem, StreamItem, WithTimestamps}; -use items::{ByteEstimate, Clearable}; use netpod::histo::HistoLog2; +use netpod::log::*; use netpod::ByteSize; use netpod::EventDataReadStats; -use netpod::{log::*, NanoRange}; use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; @@ -56,7 +56,7 @@ where S: Stream> + Unpin, ITY: Appendable + Unpin, { - pub fn new(inps: Vec, range: NanoRange, expand: bool) -> Self { + pub fn new(inps: Vec) -> Self { let n = inps.len(); let current = (0..n).into_iter().map(|_| MergedCurVal::None).collect(); Self { @@ -288,6 +288,7 @@ mod test { use crate::dataopen::position_file_for_test; use crate::eventchunker::{EventChunker, EventChunkerConf}; use crate::file_content_stream; + use crate::merge::MergedStream; use err::Error; use futures_util::StreamExt; use items::{RangeCompletableItem, StreamItem}; @@ -300,7 +301,7 @@ mod test { const SCALAR_FILE: &str = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data"; - const WAVE_FILE: &str = + const _WAVE_FILE: &str = "../tmpdata/node00/ks_3/byTime/wave-f64-be-n21/0000000000000000001/0000000000/0000000000086400000_00000_Data"; #[derive(Debug)] @@ -321,48 +322,55 @@ mod test { .ok_or_else(|| Error::with_msg(format!("can not open file {:?}", path)))?, ); } - //Merge - let file_io_buffer_size = FileIoBufferSize(1024 * 4); - let inp = file_content_stream(err::todoval(), file_io_buffer_size); - let inp = Box::pin(inp); - let channel_config = ChannelConfig { - channel: Channel { - backend: "testbackend".into(), - name: "scalar-i32-be".into(), - }, - keyspace: 2, - time_bin_size: Nanos { ns: DAY }, - scalar_type: ScalarType::I32, - byte_order: ByteOrder::BE, - array: false, - compression: false, - shape: Shape::Scalar, - }; - let stats_conf = EventChunkerConf { - disk_stats_every: ByteSize::kb(1024), - }; - let max_ts = Arc::new(AtomicU64::new(0)); - let expand = false; - let do_decompress = false; - let dbg_path = err::todoval(); + let inps = files + .into_iter() + .map(|file| { + let file_io_buffer_size = FileIoBufferSize(1024 * 4); + let inp = file_content_stream(file, file_io_buffer_size); + inp + }) + .map(|inp| { + let channel_config = ChannelConfig { + channel: Channel { + backend: "testbackend".into(), + name: "scalar-i32-be".into(), + }, + keyspace: 2, + time_bin_size: Nanos { ns: DAY }, + scalar_type: ScalarType::I32, + byte_order: ByteOrder::BE, + array: false, + compression: false, + shape: Shape::Scalar, + }; + let stats_conf = EventChunkerConf { + disk_stats_every: ByteSize::kb(1024), + }; + let max_ts = Arc::new(AtomicU64::new(0)); + let expand = false; + let do_decompress = false; + let dbg_path = PathBuf::from("/dbg/dummy"); - // TODO `expand` flag usage - - let mut chunker = EventChunker::from_event_boundary( - inp, - channel_config, - range, - stats_conf, - dbg_path, - max_ts, - expand, - do_decompress, - ); + // TODO `expand` flag usage + // Does Chunker need to know about `expand` and why? + let chunker = EventChunker::from_event_boundary( + Box::pin(inp), + channel_config, + range.clone(), + stats_conf, + dbg_path, + max_ts, + expand, + do_decompress, + ); + chunker + }) + .collect(); + let mut merged = MergedStream::new(inps); let mut cevs = CollectedEvents { tss: vec![] }; - let mut i1 = 0; - while let Some(item) = chunker.next().await { + while let Some(item) = merged.next().await { if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) = item { info!("item: {:?}", item); for ts in item.tss { @@ -376,7 +384,7 @@ mod test { } info!("read {} data items", i1); info!("cevs: {:?}", cevs); - err::todoval() + Ok(cevs) } #[test] @@ -388,6 +396,11 @@ mod test { }; let path = PathBuf::from(SCALAR_FILE); collect_merged_events(vec![path], range).await?; + + // TODO + // assert things + // remove zmtp test from default test suite, move to cli instead + Ok(()) }; taskrun::run(fut) diff --git a/disk/src/merge/mergedblobsfromremotes.rs b/disk/src/merge/mergedblobsfromremotes.rs index 9d6ed9e..2b46c0a 100644 --- a/disk/src/merge/mergedblobsfromremotes.rs +++ b/disk/src/merge/mergedblobsfromremotes.rs @@ -5,8 +5,8 @@ use err::Error; use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; use items::Sitemty; +use netpod::log::*; use netpod::query::RawEventsQuery; -use netpod::{log::*, NanoRange}; use netpod::{Cluster, PerfOpts}; use std::future::Future; use std::pin::Pin; @@ -21,8 +21,6 @@ pub struct MergedBlobsFromRemotes { merged: Option>, completed: bool, errored: bool, - range: NanoRange, - expand: bool, } impl MergedBlobsFromRemotes { @@ -41,8 +39,6 @@ impl MergedBlobsFromRemotes { merged: None, completed: false, errored: false, - range: evq.range.clone(), - expand: evq.agg_kind.need_expand(), } } } @@ -99,7 +95,7 @@ impl Stream for MergedBlobsFromRemotes { } else { if c1 == self.tcp_establish_futs.len() { let inps: Vec<_> = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect(); - let s1 = MergedStream::new(inps, self.range.clone(), self.expand); + let s1 = MergedStream::new(inps); self.merged = Some(Box::pin(s1)); } continue 'outer; diff --git a/disk/src/merge/mergedfromremotes.rs b/disk/src/merge/mergedfromremotes.rs index 65a030c..a2eca59 100644 --- a/disk/src/merge/mergedfromremotes.rs +++ b/disk/src/merge/mergedfromremotes.rs @@ -4,8 +4,8 @@ use err::Error; use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; use items::{Appendable, Clearable, EventsNodeProcessor, FrameType, PushableIndex, Sitemty}; +use netpod::log::*; use netpod::query::RawEventsQuery; -use netpod::{log::*, NanoRange}; use netpod::{Cluster, PerfOpts}; use std::future::Future; use std::pin::Pin; @@ -23,8 +23,6 @@ where merged: Option::Output>>, completed: bool, errored: bool, - range: NanoRange, - expand: bool, } impl MergedFromRemotes @@ -48,8 +46,6 @@ where merged: None, completed: false, errored: false, - range: evq.range.clone(), - expand: evq.agg_kind.need_expand(), } } } @@ -110,11 +106,7 @@ where } else { if c1 == self.tcp_establish_futs.len() { let inps: Vec<_> = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect(); - let s1 = MergedStream::<_, ::Output>::new( - inps, - self.range.clone(), - self.expand, - ); + let s1 = MergedStream::<_, ::Output>::new(inps); self.merged = Some(Box::pin(s1)); } continue 'outer; diff --git a/disk/src/rangefilter.rs b/disk/src/rangefilter.rs index bbfef7a..eebf829 100644 --- a/disk/src/rangefilter.rs +++ b/disk/src/rangefilter.rs @@ -12,7 +12,6 @@ pub struct RangeFilter { expand: bool, prerange: ITY, have_pre: bool, - emitted_pre: bool, emitted_post: bool, done: bool, complete: bool, @@ -29,7 +28,6 @@ where expand, prerange: ITY::empty(), have_pre: false, - emitted_pre: false, emitted_post: false, done: false, complete: false, diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 0ddd204..4a312c6 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -681,7 +681,7 @@ pub async fn channel_config(req: Request, node_config: &NodeConfigCached) pub async fn ca_connect_1(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let url = Url::parse(&format!("dummy:{}", req.uri()))?; let pairs = get_url_query_pairs(&url); - let res = netfetch::ca_connect_1(pairs, node_config).await?; + let res = netfetch::ca::ca_connect_1(pairs, node_config).await?; let ret = response(StatusCode::OK) .header(http::header::CONTENT_TYPE, APP_JSON_LINES) .body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) { diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs new file mode 100644 index 0000000..66c5bc4 --- /dev/null +++ b/netfetch/src/ca.rs @@ -0,0 +1,81 @@ +use async_channel::{bounded, Receiver}; +use bytes::{BufMut, BytesMut}; +use err::Error; +use futures_util::FutureExt; +use netpod::NodeConfigCached; +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct Message { + cmd: u16, + payload_len: u16, + type_type: u16, + data_len: u16, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum FetchItem { + Log(String), + Message(Message), +} + +pub async fn ca_connect_1( + _pairs: BTreeMap, + _node_config: &NodeConfigCached, +) -> Result>, Error> { + let (tx, rx) = bounded(16); + let tx2 = tx.clone(); + tokio::task::spawn( + async move { + let mut conn = tokio::net::TcpStream::connect("S30CB06-CVME-LLRF2.psi.ch:5064").await?; + let (mut inp, mut out) = conn.split(); + tx.send(Ok(FetchItem::Log(format!("connected")))).await?; + let mut buf = [0; 64]; + + let mut b2 = BytesMut::with_capacity(128); + b2.put_u16(0x00); + b2.put_u16(0); + b2.put_u16(0); + b2.put_u16(0xb); + b2.put_u32(0); + b2.put_u32(0); + out.write_all(&b2).await?; + tx.send(Ok(FetchItem::Log(format!("written")))).await?; + let n1 = inp.read(&mut buf).await?; + tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf)))) + .await?; + + // Search to get cid: + let chn = b"SATCB01-DBPM220:Y2"; + b2.clear(); + b2.put_u16(0x06); + b2.put_u16((16 + chn.len()) as u16); + b2.put_u16(0x00); + b2.put_u16(0x0b); + b2.put_u32(0x71803472); + b2.put_u32(0x71803472); + b2.put_slice(chn); + out.write_all(&b2).await?; + tx.send(Ok(FetchItem::Log(format!("written")))).await?; + let n1 = inp.read(&mut buf).await?; + tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf)))) + .await?; + + Ok::<_, Error>(()) + } + .then({ + move |item| async move { + match item { + Ok(_) => {} + Err(e) => { + tx2.send(Ok(FetchItem::Log(format!("Seeing error: {:?}", e)))).await?; + } + } + Ok::<_, Error>(()) + } + }), + ); + Ok(rx) +} diff --git a/netfetch/src/lib.rs b/netfetch/src/lib.rs index 5e3e952..d37227b 100644 --- a/netfetch/src/lib.rs +++ b/netfetch/src/lib.rs @@ -1,86 +1,4 @@ -use async_channel::{bounded, Receiver}; -use bytes::{BufMut, BytesMut}; -use futures_util::FutureExt; -use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; - -use err::Error; -use netpod::NodeConfigCached; - +pub mod ca; #[cfg(test)] pub mod test; pub mod zmtp; - -#[derive(Debug, Serialize, Deserialize)] -pub struct Message { - cmd: u16, - payload_len: u16, - type_type: u16, - data_len: u16, -} - -#[derive(Debug, Serialize, Deserialize)] -pub enum FetchItem { - Log(String), - Message(Message), -} - -pub async fn ca_connect_1( - _pairs: BTreeMap, - _node_config: &NodeConfigCached, -) -> Result>, Error> { - let (tx, rx) = bounded(16); - let tx2 = tx.clone(); - tokio::task::spawn( - async move { - let mut conn = tokio::net::TcpStream::connect("S30CB06-CVME-LLRF2.psi.ch:5064").await?; - let (mut inp, mut out) = conn.split(); - tx.send(Ok(FetchItem::Log(format!("connected")))).await?; - let mut buf = [0; 64]; - - let mut b2 = BytesMut::with_capacity(128); - b2.put_u16(0x00); - b2.put_u16(0); - b2.put_u16(0); - b2.put_u16(0xb); - b2.put_u32(0); - b2.put_u32(0); - out.write_all(&b2).await?; - tx.send(Ok(FetchItem::Log(format!("written")))).await?; - let n1 = inp.read(&mut buf).await?; - tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf)))) - .await?; - - // Search to get cid: - let chn = b"SATCB01-DBPM220:Y2"; - b2.clear(); - b2.put_u16(0x06); - b2.put_u16((16 + chn.len()) as u16); - b2.put_u16(0x00); - b2.put_u16(0x0b); - b2.put_u32(0x71803472); - b2.put_u32(0x71803472); - b2.put_slice(chn); - out.write_all(&b2).await?; - tx.send(Ok(FetchItem::Log(format!("written")))).await?; - let n1 = inp.read(&mut buf).await?; - tx.send(Ok(FetchItem::Log(format!("received: {} {:?}", n1, buf)))) - .await?; - - Ok::<_, Error>(()) - } - .then({ - move |item| async move { - match item { - Ok(_) => {} - Err(e) => { - tx2.send(Ok(FetchItem::Log(format!("Seeing error: {:?}", e)))).await?; - } - } - Ok::<_, Error>(()) - } - }), - ); - Ok(rx) -} diff --git a/netfetch/src/test.rs b/netfetch/src/test.rs index 61d7a65..bae7811 100644 --- a/netfetch/src/test.rs +++ b/netfetch/src/test.rs @@ -38,7 +38,7 @@ fn ca_connect_1() { }, ix: 0, }; - let mut rx = super::ca_connect_1(pairs, &node_config).await?; + let mut rx = super::ca::ca_connect_1(pairs, &node_config).await?; while let Some(item) = rx.next().await { info!("got next: {:?}", item); } @@ -46,14 +46,3 @@ fn ca_connect_1() { }) .unwrap(); } - -#[test] -fn zmtp_00() { - taskrun::run(async { - let it = vec![(String::new(), String::new())].into_iter(); - let _pairs = BTreeMap::from_iter(it); - crate::zmtp::zmtp_00().await?; - Ok(()) - }) - .unwrap(); -} diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index b625a56..548d2c8 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -11,11 +11,7 @@ use tokio::net::TcpStream; pub async fn zmtp_00() -> Result<(), Error> { let addr = "S10-CPPM-MOT0991:9999"; - let conn = tokio::net::TcpStream::connect(addr).await?; - let mut zmtp = Zmtp::new(conn); - while let Some(ev) = zmtp.next().await { - info!("got zmtp event: {:?}", ev); - } + zmtp_client(addr).await?; Ok(()) } diff --git a/parse/src/channelconfig.rs b/parse/src/channelconfig.rs index a5719c8..23dd4da 100644 --- a/parse/src/channelconfig.rs +++ b/parse/src/channelconfig.rs @@ -335,7 +335,10 @@ mod test { fn read_data() -> Vec { use std::io::Read; - let path = "ks/config/S10CB01-RLOD100-PUP10:SIG-AMPLT/latest/00000_Config"; + //let path = "ks/config/S10CB01-RLOD100-PUP10:SIG-AMPLT/latest/00000_Config"; + let cwd = std::env::current_dir(); + netpod::log::info!("CWD: {:?}", cwd); + let path = "../resources/sf-daqbuf-33-S10CB01-RLOD100-PUP10:SIG-AMPLT-latest-00000_Config"; let mut f1 = std::fs::File::open(path).unwrap(); let mut buf = vec![]; f1.read_to_end(&mut buf).unwrap(); diff --git a/resources/sf-daqbuf-21-S10CB01-RLOD100-PUP10:SIG-AMPLT-latest-00000_Config b/resources/sf-daqbuf-21-S10CB01-RLOD100-PUP10:SIG-AMPLT-latest-00000_Config new file mode 100644 index 0000000000000000000000000000000000000000..88c0b0b2012d1d576fde0ac17b7d8a7d8c711445 GIT binary patch literal 3869 zcmZQz00Q-3Ljz|g14G>)AAc7^0|VWF&;UaNt6)!eT}R&lpAe7|ATHrJ&J(Ew0K z2?&ccteI;)Telo!gwE`)?LZ3IP97Ma3#0%HsA1>!^Vx^jJ^&e~vo1IRNMW^;y8hh3 z*cN;zA7bZn5RKJN>iYA@Ry(Qd&m&J7=5GbX8-vafH+cL((-53b zU4I@IV7<##2DbC~%Wrh?XXkQVg-9Jx8fLIw#b<)cPOAE|%l;1Q^xzF3J8f2(8sM^% zs`1%>ooVLx-Ug7Jt_N%n04c0#h^qeVzs@|hewiA?PWv=mc2d=!t70NMFy z&7ulicBX(7fB_ZC_)oq$d71OTradvUX#g^49-pFuE3cnAzUmp!E=KVqNgx`loz#p^ z(cpR6*VK#mfb49&*z*U?PMXx&qKBn6@8S!(3$%;Lu3`s>roBH8X9l`VxZ@1A^Uq8W zjcg}v{P}XTLOS0>Q2u19w@hckWhY1i0w_mgX}z>TXP%DPI#S7&2%d!vfn&8aT=&i!gRcd6TNLhllT;iELeVW!-GDMoo%c8 z_|W_bx09;z8QC~5`%ep~eZbUqf~5qPom355pq)E2*Mr(aAUo09Lp1VdCtts$lNQ*{ zD|N5v)AAc7^0|VWF&;UaNt6)!eT}R&lpAe7|ATHrJ&J(Ew0K z2?&cW2=rO{(MJo&W@7WHUInC)?F3mx13N=1-Iq&bGxazPRk9YJ5gE&dX$+ydG@l36^|Zc2YC1hym@~mbdRC z)J_nMHEgNt&rZJX*4-*#JFnC|!DT0P{n^R);NV$fh@JJtxa_2^KhH$|{{6@RWM{i; z`UYHfY5*l5fC^;nKI4>n=| literal 0 HcmV?d00001