From 5e385fd88cc3e51a06cef282f2cd972d57e773f6 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Tue, 21 Sep 2021 16:13:29 +0200
Subject: [PATCH] Test for file positioning

---
 .gitignore                             |   2 +
 Cargo.toml                             |   7 +-
 archapp/src/test.rs                    |  11 +-
 dbconn/Cargo.toml                      |   2 +-
 disk/Cargo.toml                        |   2 +-
 disk/src/dataopen.rs                   | 185 ++++++++++++++++++++-----
 disk/src/eventblobs.rs                 |  24 ++--
 disk/src/eventchunker.rs               |  34 +++++
 disk/src/index.rs                      |  49 ++++---
 httpclient/Cargo.toml                  |   8 +-
 httpret/static/documentation/api4.html |   2 +-
 taskrun/src/lib.rs                     |   2 +-
 12 files changed, 256 insertions(+), 72 deletions(-)

diff --git a/.gitignore b/.gitignore
index 9245b17..9018c9f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,7 @@
 /Cargo.lock
 /target
+/archapp/src/generated
 /.idea
+/.vscode
 /tmpdata
 /docs
diff --git a/Cargo.toml b/Cargo.toml
index 023196d..81c534e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,12 +2,13 @@
 members = ["daqbuffer", "h5out", "archapp", "archapp_xc", "archapp_wrap", "items", "nodenet", "httpclient"]
 
 [profile.release]
-debug = 1
 opt-level = 1
+debug = 1
 #overflow-checks = true
 #debug-assertions = true
 lto = "off"
-codegen-units = 40
+codegen-units = 160
+incremental = true
 
 [patch.crates-io]
-tokio = { git = "https://github.com/dominikwerder/tokio", branch = "maxbuf" }
+tokio = { git = "https://github.com/dominikwerder/tokio", rev = "637a0c8a" }
diff --git a/archapp/src/test.rs b/archapp/src/test.rs
index 023712c..5e35cd9 100644
--- a/archapp/src/test.rs
+++ b/archapp/src/test.rs
@@ -10,8 +10,17 @@ pub fn read_pb_dummy() -> Result<(), Error> {
 #[cfg(feature = "devread")]
 #[test]
 fn read_pb_00() -> Result<(), Error> {
+    use std::path::PathBuf;
+
     let block1 = async move {
-        let path = "../../../../archappdata/tmp/lts/ArchiverStore/SARUN16/MQUA080/X:2021_01.pb";
+        let homedir = std::env::var("HOME").unwrap();
+        let path = PathBuf::from(homedir)
+            .join("archappdata")
+            .join("lts")
+            .join("ArchiverStore")
+            .join("SARUN16")
+            .join("MQUA080")
+            .join("X:2021_01.pb");
         let f1 = tokio::fs::read(path).await?;
         let mut j1 = 0;
         loop {
diff --git a/dbconn/Cargo.toml b/dbconn/Cargo.toml
index 7ee2a95..b8a45cc 100644
--- a/dbconn/Cargo.toml
+++ b/dbconn/Cargo.toml
@@ -19,7 +19,7 @@ bincode = "1.3.3"
 pin-project = "1.0.7"
 #async-channel = "1"
 #dashmap = "3"
-tokio-postgres = { version = "0.7", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] }
+tokio-postgres = { version = "0.7.2", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] }
 async-channel = "1.6"
 chrono = "0.4"
 regex = "1.5.4"
diff --git a/disk/Cargo.toml b/disk/Cargo.toml
index 74c33f5..7f477ee 100644
--- a/disk/Cargo.toml
+++ b/disk/Cargo.toml
@@ -22,7 +22,7 @@ byteorder = "1.4.3"
 futures-core = "0.3.14"
 futures-util = "0.3.14"
 async-stream = "0.3.0"
-tracing = { version = "0.1.25", features = [] }
+tracing = "0.1.25"
 tracing-futures = { version = "0.2.5", features = ["futures-01", "futures-03", "std-future"] }
 fs2 = "0.4.3"
 libc = "0.2.93"
diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs
index a12d483..733a272 100644
--- a/disk/src/dataopen.rs
+++ b/disk/src/dataopen.rs
@@ -16,6 +16,7 @@ pub struct OpenedFile {
     pub positioned: bool,
     pub index: bool,
     pub nreads: u32,
+    pub pos: u64,
 }
 
 #[derive(Debug)]
@@ -81,7 +82,7 @@ async fn open_files_inner(
     }
     let mut a = vec![];
     for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? {
-        let w = position_file(&path, range, &channel_config, false).await?;
+        let w = position_file(&path, range, false).await?;
         if w.found {
             a.push(w.file);
         }
@@ -101,12 +102,7 @@ struct Positioned {
     found: bool,
 }
 
-async fn position_file(
-    path: &PathBuf,
-    range: &NanoRange,
-    channel_config: &ChannelConfig,
-    expand: bool,
-) -> Result<Positioned, Error> {
+async fn position_file(path: &PathBuf, range: &NanoRange, expand: bool) -> Result<Positioned, Error> {
     match OpenOptions::new().read(true).open(&path).await {
         Ok(file) => {
            let index_path = PathBuf::from(format!("{}_Index", path.to_str().unwrap()));
@@ -114,40 +110,28 @@ async fn position_file(
                 Ok(mut index_file) => {
                     let meta = index_file.metadata().await?;
                     if meta.len() > 1024 * 1024 * 120 {
-                        let msg = format!(
-                            "too large index file {} bytes for {}",
-                            meta.len(),
-                            channel_config.channel.name
-                        );
+                        let msg = format!("too large index file {} bytes for {:?}", meta.len(), index_path);
                         error!("{}", msg);
                         return Err(Error::with_msg(msg));
                     } else if meta.len() > 1024 * 1024 * 80 {
-                        let msg = format!(
-                            "very large index file {} bytes for {}",
-                            meta.len(),
-                            channel_config.channel.name
-                        );
+                        let msg = format!("very large index file {} bytes for {:?}", meta.len(), index_path);
                         warn!("{}", msg);
                     } else if meta.len() > 1024 * 1024 * 20 {
-                        let msg = format!(
-                            "large index file {} bytes for {}",
-                            meta.len(),
-                            channel_config.channel.name
-                        );
+                        let msg = format!("large index file {} bytes for {:?}", meta.len(), index_path);
                         info!("{}", msg);
                     }
                     if meta.len() < 2 {
                         return Err(Error::with_msg(format!(
-                            "bad meta len {} for {}",
+                            "bad meta len {} for {:?}",
                             meta.len(),
-                            channel_config.channel.name
+                            index_path
                         )));
                     }
                     if meta.len() % 16 != 2 {
                         return Err(Error::with_msg(format!(
-                            "bad meta len {} for {}",
+                            "bad meta len {} for {:?}",
                             meta.len(),
-                            channel_config.channel.name
+                            index_path
                         )));
                     }
                     let mut buf = BytesMut::with_capacity(meta.len() as usize);
@@ -169,6 +153,7 @@ async fn position_file(
                                 positioned: true,
                                 index: true,
                                 nreads: 0,
+                                pos: o.1,
                             };
                             return Ok(Positioned { file: g, found: true });
                         }
@@ -180,6 +165,7 @@ async fn position_file(
                         positioned: false,
                         index: true,
                         nreads: 0,
+                        pos: 0,
                     };
                    return Ok(Positioned { file: g, found: false });
                 }
@@ -189,9 +175,10 @@
                 ErrorKind::NotFound => {
                     let ts1 = Instant::now();
                     let res = if expand {
-                        super::index::position_static_len_datafile_at_largest_smaller_than(file, range.beg).await?
+                        super::index::position_static_len_datafile_at_largest_smaller_than(file, range.clone())
+                            .await?
                     } else {
-                        super::index::position_static_len_datafile(file, range.beg).await?
+                        super::index::position_static_len_datafile(file, range.clone()).await?
                     };
                     let ts2 = Instant::now();
                     if false {
@@ -208,6 +195,7 @@
                             positioned: true,
                             index: false,
                             nreads: res.2,
+                            pos: res.3,
                         };
                         return Ok(Positioned { file: g, found: true });
                     } else {
@@ -218,6 +206,7 @@
                             positioned: false,
                             index: false,
                             nreads: res.2,
+                            pos: 0,
                         };
                         return Ok(Positioned { file: g, found: false });
                     }
@@ -234,6 +223,7 @@
                 positioned: false,
                 index: true,
                 nreads: 0,
+                pos: 0,
             };
             return Ok(Positioned { file: g, found: false });
         }
@@ -299,7 +289,7 @@ async fn get_timebins(channel_config: &ChannelConfig, node: Node) -> Result<Vec<u64>, Error> {
     chan: &Sender<Result<OpenedFileSet, Error>>,
     range: &NanoRange,
     channel_config: &ChannelConfig,
@@ -332,7 +322,7 @@ async fn open_expanded_files_inner(
     let tb = timebins[p1];
     let mut a = vec![];
     for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? {
-        let w = position_file(&path, range, &channel_config, true).await?;
+        let w = position_file(&path, range, true).await?;
         if w.found {
             info!("----- open_expanded_files_inner w.found for {:?}", path);
             a.push(w.file);
@@ -360,7 +350,7 @@ async fn open_expanded_files_inner(
     let tb = timebins[p1];
     let mut a = vec![];
     for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? {
-        let w = position_file(&path, range, &channel_config, false).await?;
+        let w = position_file(&path, range, false).await?;
         if w.found {
             a.push(w.file);
         }
@@ -422,3 +412,136 @@ fn expanded_file_list() {
     };
     taskrun::run(task).unwrap();
 }
+
+#[cfg(test)]
+mod test {
+    use crate::dataopen::position_file;
+    use err::Error;
+    use netpod::timeunits::{DAY, HOUR, MS};
+    use netpod::NanoRange;
+
+    #[test]
+    fn position_basic_file_at_begin() -> Result<(), Error> {
+        let fut = async {
+            let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into();
+            let range = NanoRange {
+                beg: DAY,
+                end: DAY + MS * 20000,
+            };
+            let expand = false;
+            let res = position_file(&path, &range, expand).await?;
+            assert_eq!(res.found, true);
+            assert_eq!(res.file.file.is_some(), true);
+            assert_eq!(res.file.index, false);
+            assert_eq!(res.file.positioned, true);
+            assert_eq!(res.file.pos, 23);
+            Ok(())
+        };
+        taskrun::run(fut)?;
+        Ok(())
+    }
+
+    #[test]
+    fn position_basic_file_for_empty_range() -> Result<(), Error> {
+        let fut = async {
+            let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into();
+            let range = NanoRange {
+                beg: DAY + MS * 80000,
+                end: DAY + MS * 80000,
+            };
+            let expand = false;
+            let res = position_file(&path, &range, expand).await?;
+            assert_eq!(res.found, false);
+            assert_eq!(res.file.file.is_some(), false);
+            assert_eq!(res.file.index, false);
+            assert_eq!(res.file.positioned, false);
+            //assert_eq!(res.file.pos, 23);
+            Ok(())
+        };
+        taskrun::run(fut)?;
+        Ok(())
+    }
+
+    #[test]
+    fn position_basic_file_at_begin_for_small_range() -> Result<(), Error> {
+        let fut = async {
+            let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into();
+            let range = NanoRange {
+                beg: DAY,
+                end: DAY + MS * 300000,
+            };
+            let expand = false;
+            let res = position_file(&path, &range, expand).await?;
+            assert_eq!(res.found, true);
+            assert_eq!(res.file.file.is_some(), true);
+            assert_eq!(res.file.index, false);
+            assert_eq!(res.file.positioned, true);
+            assert_eq!(res.file.pos, 23);
+            Ok(())
+        };
+        taskrun::run(fut)?;
+        Ok(())
+    }
+
+    #[test]
+    fn position_basic_file_at_inner() -> Result<(), Error> {
+        let fut = async {
+            let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into();
+            let range = NanoRange {
+                beg: DAY + MS * 4000,
+                end: DAY + MS * 7000,
+            };
+            let expand = false;
+            let res = position_file(&path, &range, expand).await?;
+            assert_eq!(res.found, true);
+            assert_eq!(res.file.file.is_some(), true);
+            assert_eq!(res.file.index, false);
+            assert_eq!(res.file.positioned, true);
+            assert_eq!(res.file.pos, 179);
+            Ok(())
+        };
+        taskrun::run(fut)?;
+        Ok(())
+    }
+
+    #[test]
+    fn position_basic_file_at_inner_for_too_small_range() -> Result<(), Error> {
+        let fut = async {
+            let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into();
+            let range = NanoRange {
+                beg: DAY + MS * 1501,
+                end: DAY + MS * 1502,
+            };
+            let expand = false;
+            let res = position_file(&path, &range, expand).await?;
+            assert_eq!(res.found, false);
+            assert_eq!(res.file.file.is_some(), false);
+            assert_eq!(res.file.index, false);
+            assert_eq!(res.file.positioned, false);
+            assert_eq!(res.file.pos, 0);
+            Ok(())
+        };
+        taskrun::run(fut)?;
+        Ok(())
+    }
+
+    #[test]
+    fn position_basic_file_starts_after_range() -> Result<(), Error> {
+        let fut = async {
+            let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into();
+            let range = NanoRange {
+                beg: HOUR * 22,
+                end: HOUR * 23,
+            };
+            let expand = false;
+            let res = position_file(&path, &range, expand).await?;
+            assert_eq!(res.found, false);
+            assert_eq!(res.file.file.is_some(), false);
+            assert_eq!(res.file.index, false);
+            assert_eq!(res.file.positioned, false);
+            Ok(())
+        };
+        taskrun::run(fut)?;
+        Ok(())
+    }
+}
diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs
index f00bcec..5092ca6 100644
--- a/disk/src/eventblobs.rs
+++ b/disk/src/eventblobs.rs
@@ -219,7 +219,7 @@ impl Stream for EventChunkerMultifile {
 }
 
 #[cfg(test)]
-fn read_expanded_for_range(range: netpod::NanoRange) -> Result<(usize, usize), Error> {
+fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, usize), Error> {
     use netpod::timeunits::*;
     use netpod::{ByteSize, Nanos};
     let chn = netpod::Channel {
@@ -238,8 +238,7 @@ fn read_expanded_for_range(range: netpod::NanoRange) -> Result<(usize, usize), E
         compression: false,
     };
     let cluster = taskrun::test_cluster();
-    let node_ix = 0;
-    let node = cluster.nodes[node_ix].clone();
+    let node = cluster.nodes[nodeix].clone();
     let buffer_size = 512;
     let event_chunker_conf = EventChunkerConf {
         disk_stats_every: ByteSize::kb(1024),
@@ -250,7 +249,7 @@ fn read_expanded_for_range(range: netpod::NanoRange) -> Result<(usize, usize), E
         range,
         channel_config,
         node,
-        node_ix,
+        nodeix,
         FileIoBufferSize::new(buffer_size),
         event_chunker_conf,
         true,
@@ -261,6 +260,7 @@ fn read_expanded_for_range(range: netpod::NanoRange) -> Result<(usize, usize), E
             Ok(item) => match item {
                 StreamItem::DataItem(item) => match item {
                     RangeCompletableItem::Data(item) => {
+                        info!("item: {:?}", item.tss.iter().map(|x| x / 1000000).collect::<Vec<_>>());
                         event_count += item.tss.len();
                     }
                     _ => {}
@@ -287,9 +287,9 @@ fn read_expanded_0() -> Result<(), Error> {
     use netpod::timeunits::*;
     let range = netpod::NanoRange {
         beg: DAY + MS * 0,
-        end: DAY + MS * 0 + MS * 1500,
+        end: DAY + MS * 1500,
     };
-    let res = read_expanded_for_range(range)?;
+    let res = read_expanded_for_range(range, 0)?;
     if res.0 != 2 {
         Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
     }
@@ -301,9 +301,9 @@ fn read_expanded_1() -> Result<(), Error> {
     use netpod::timeunits::*;
     let range = netpod::NanoRange {
         beg: DAY + MS * 0,
-        end: DAY + MS * 0 + MS * 1501,
+        end: DAY + MS * 1501,
     };
-    let res = read_expanded_for_range(range)?;
+    let res = read_expanded_for_range(range, 0)?;
     if res.0 != 3 {
         Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
     }
@@ -315,9 +315,9 @@ fn read_expanded_2() -> Result<(), Error> {
     use netpod::timeunits::*;
     let range = netpod::NanoRange {
         beg: DAY - MS * 100,
-        end: DAY + MS * 0 + MS * 1501,
+        end: DAY + MS * 1501,
     };
-    let res = read_expanded_for_range(range)?;
+    let res = read_expanded_for_range(range, 0)?;
     if res.0 != 3 {
         Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
     }
@@ -329,9 +329,9 @@ fn read_expanded_3() -> Result<(), Error> {
     use netpod::timeunits::*;
     let range = netpod::NanoRange {
         beg: DAY - MS * 1500,
-        end: DAY + MS * 0 + MS * 1501,
+        end: DAY + MS * 1501,
     };
-    let res = read_expanded_for_range(range)?;
+    let res = read_expanded_for_range(range, 0)?;
     if res.0 != 4 {
         Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
     }
diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs
index 96ed20e..308ef8d 100644
--- a/disk/src/eventchunker.rs
+++ b/disk/src/eventchunker.rs
@@ -691,3 +691,37 @@ impl HasSeenBeforeRangeCount for EventChunker {
         self.seen_before_range_count()
     }
 }
+
+#[cfg(test)]
+mod test {
+    use err::Error;
+    use netpod::timeunits::*;
+    use netpod::{ByteSize, Nanos};
+
+    /*
+    #[test]
+    fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, usize), Error> {
+        let chn = netpod::Channel {
+            backend: "testbackend".into(),
+            name: "scalar-i32-be".into(),
+        };
+        // TODO read config from disk.
+        let channel_config = ChannelConfig {
+            channel: chn,
+            keyspace: 2,
+            time_bin_size: Nanos { ns: DAY },
+            scalar_type: netpod::ScalarType::I32,
+            byte_order: netpod::ByteOrder::big_endian(),
+            shape: netpod::Shape::Scalar,
+            array: false,
+            compression: false,
+        };
+        let cluster = taskrun::test_cluster();
+        let node = cluster.nodes[nodeix].clone();
+        let buffer_size = 512;
+        let event_chunker_conf = EventChunkerConf {
+            disk_stats_every: ByteSize::kb(1024),
+        };
+    }
+    */
+}
diff --git a/disk/src/index.rs b/disk/src/index.rs
index 2665ca1..d736cc0 100644
--- a/disk/src/index.rs
+++ b/disk/src/index.rs
@@ -1,6 +1,7 @@
 use arrayref::array_ref;
 use err::Error;
 use netpod::log::*;
+use netpod::NanoRange;
 use netpod::Nanos;
 use std::mem::size_of;
 use tokio::fs::File;
@@ -168,7 +169,7 @@ pub async fn read_event_at(pos: u64, file: &mut File) -> Result<(u32, Nanos), Er
     Ok(ev)
 }
 
-pub async fn position_static_len_datafile(mut file: File, beg: u64) -> Result<(File, bool, u32), Error> {
+pub async fn position_static_len_datafile(mut file: File, range: NanoRange) -> Result<(File, bool, u32, u64), Error> {
     let flen = file.seek(SeekFrom::End(0)).await?;
     file.seek(SeekFrom::Start(0)).await?;
     let mut buf = vec![0; 1024];
@@ -189,18 +190,30 @@ pub async fn position_static_len_datafile(mut file: File, beg: u64) -> Result<(F
     }
     let y = t.1.ns;
     let mut nreads = 2;
-    if x >= beg {
+    if x >= range.end {
         file.seek(SeekFrom::Start(j)).await?;
-        return Ok((file, true, nreads));
+        return Ok((file, false, nreads, j));
     }
-    if y < beg {
+    if y < range.beg {
         file.seek(SeekFrom::Start(j)).await?;
-        return Ok((file, false, nreads));
+        return Ok((file, false, nreads, j));
     }
+    if x >= range.beg && x < range.end {
+        file.seek(SeekFrom::Start(j)).await?;
+        return Ok((file, true, nreads, j));
+    }
+    let mut x = x;
+    let mut y = y;
     loop {
+        assert_eq!((k - j) % evlen, 0);
         if k - j < 2 * evlen {
-            file.seek(SeekFrom::Start(k)).await?;
-            return Ok((file, true, nreads));
+            if y < range.end {
+                file.seek(SeekFrom::Start(k)).await?;
+                return Ok((file, true, nreads, k));
+            } else {
+                file.seek(SeekFrom::Start(k)).await?;
+                return Ok((file, false, nreads, k));
+            }
         }
         let m = j + (k - j) / 2 / evlen * evlen;
         let t = read_event_at(m, &mut file).await?;
@@ -211,10 +224,12 @@ pub async fn position_static_len_datafile(mut file: File, beg: u64) -> Result<(F
             )))?;
         }
         nreads += 1;
-        let x = t.1.ns;
-        if x < beg {
+        let e = t.1.ns;
+        if e < range.beg {
+            x = e;
             j = m;
         } else {
+            y = e;
             k = m;
         }
     }
@@ -222,8 +237,8 @@
 
 pub async fn position_static_len_datafile_at_largest_smaller_than(
     mut file: File,
-    beg: u64,
-) -> Result<(File, bool, u32), Error> {
+    range: NanoRange,
+) -> Result<(File, bool, u32, u64), Error> {
     let flen = file.seek(SeekFrom::End(0)).await?;
     file.seek(SeekFrom::Start(0)).await?;
     let mut buf = vec![0; 1024];
@@ -244,18 +259,18 @@ pub async fn position_static_len_datafile_at_largest_smaller_than(
     }
     let y = t.1.ns;
     let mut nreads = 2;
-    if x >= beg {
+    if x >= range.beg {
         file.seek(SeekFrom::Start(j)).await?;
-        return Ok((file, false, nreads));
+        return Ok((file, false, nreads, j));
     }
-    if y < beg {
+    if y < range.beg {
         file.seek(SeekFrom::Start(k)).await?;
-        return Ok((file, true, nreads));
+        return Ok((file, true, nreads, k));
     }
     loop {
         if k - j < 2 * evlen {
             file.seek(SeekFrom::Start(j)).await?;
-            return Ok((file, true, nreads));
+            return Ok((file, true, nreads, j));
         }
         let m = j + (k - j) / 2 / evlen * evlen;
         let t = read_event_at(m, &mut file).await?;
@@ -267,7 +282,7 @@ pub async fn position_static_len_datafile_at_largest_smaller_than(
         }
         nreads += 1;
         let x = t.1.ns;
-        if x < beg {
+        if x < range.beg {
             j = m;
         } else {
             k = m;
diff --git a/httpclient/Cargo.toml b/httpclient/Cargo.toml
index 6800ace..e95d6b8 100644
--- a/httpclient/Cargo.toml
+++ b/httpclient/Cargo.toml
@@ -5,13 +5,13 @@ authors = ["Dominik Werder"]
 edition = "2018"
 
 [dependencies]
-serde_json = "1.0"
 serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
 http = "0.2"
 url = "2.2"
-tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
-hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] }
-hyper-tls = { version="0.5.0" }
+tokio = { version = "1.11.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
+hyper = { version = "0.14.3", features = ["http1", "http2", "client", "server", "tcp", "stream"] }
+hyper-tls = { version = "0.5.0" }
 bytes = "1.0.1"
 futures-core = "0.3.14"
 futures-util = "0.3.14"
diff --git a/httpret/static/documentation/api4.html b/httpret/static/documentation/api4.html
index 1788085..b431b4a 100644
--- a/httpret/static/documentation/api4.html
+++ b/httpret/static/documentation/api4.html
@@ -24,7 +24,7 @@ so please feel free to create some Jira ticket!

The result encodes timestamps in the form:

{
   "tsAnchor": 1623909860,                    // Time-anchor of this result in UNIX epoch seconds.
-  "tsOffMs": [573, 15671, 37932, ...],       // Millisecond-offset to tsAnchor for each event/bin-edge.
+  "tsOffMs": [173, 472, 857, ...],            // Millisecond-offset to tsAnchor for each event/bin-edge.
   "tsOffNs": [422901, 422902, 422903, ...],  // Nanosecond-offset to tsAnchor in addition to tsOffMs for each event/bin-edge.
}
 }

which results in these nanosecond-timestamps:

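For reference, the three arrays above combine into absolute nanosecond timestamps as tsAnchor * 1e9 + tsOffMs[i] * 1e6 + tsOffNs[i]. A minimal sketch of that reconstruction in Rust, assuming tsOffNs carries only the sub-millisecond remainder as the field comments describe (the helper name and signature are illustrative, not part of the documented API):

    // Rebuild absolute nanosecond timestamps from the response fields above.
    // ts_anchor mirrors tsAnchor (seconds); ts_off_ms / ts_off_ns mirror tsOffMs / tsOffNs.
    fn timestamps_ns(ts_anchor: u64, ts_off_ms: &[u64], ts_off_ns: &[u64]) -> Vec<u64> {
        ts_off_ms
            .iter()
            .zip(ts_off_ns)
            .map(|(ms, ns)| ts_anchor * 1_000_000_000 + ms * 1_000_000 + ns)
            .collect()
    }

With the sample values above, the first event lands at 1623909860 * 1_000_000_000 + 173 * 1_000_000 + 422901 = 1623909860173422901 ns.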
diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs
index 8fd9009..4ca33ee 100644
--- a/taskrun/src/lib.rs
+++ b/taskrun/src/lib.rs
@@ -122,7 +122,7 @@ pub fn test_cluster() -> netpod::Cluster {
         })
         .collect();
     netpod::Cluster {
-        nodes: nodes,
+        nodes,
         database: netpod::Database {
             name: "daqbuffer".into(),
             host: "localhost".into(),
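What the new tests in disk/src/dataopen.rs ultimately exercise is the bisection in disk/src/index.rs: over fixed-length records sorted by timestamp, seek to the first event with ts >= range.beg, then report whether that event still lies before range.end. A standalone sketch of the same search, simplified to an in-memory slice of timestamps where the crate bisects a File through read_event_at (names and signature here are ours, not the crate's):

    // Find the first event with ts >= beg; `found` says whether that event
    // also lies before `end`, i.e. whether this file contributes any events
    // to the half-open range [beg, end).
    fn position(tss: &[u64], beg: u64, end: u64) -> (bool, usize) {
        let mut j = 0; // invariant: tss[..j] are all < beg
        let mut k = tss.len(); // invariant: tss[k..] are all >= beg
        while j < k {
            let m = j + (k - j) / 2;
            if tss[m] < beg {
                j = m + 1;
            } else {
                k = m;
            }
        }
        let found = j < tss.len() && tss[j] < end;
        (found, j)
    }

Assuming the tmpdata generator writes one event every 1500 ms starting at the bin edge (which the expected pos values 23 and 179 suggest: a 23-byte file header plus three 52-byte records skipped), a search over [DAY + 4000 ms, DAY + 7000 ms) lands on the event at DAY + 4500 ms, and [DAY + 1501 ms, DAY + 1502 ms) finds nothing, matching position_basic_file_at_inner and position_basic_file_at_inner_for_too_small_range.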