Test for file positioning

Dominik Werder
2021-09-21 16:13:29 +02:00
parent b54b31fe98
commit 5e385fd88c
12 changed files with 256 additions and 72 deletions

.gitignore vendored
View File

@@ -1,5 +1,7 @@
/Cargo.lock
/target
/archapp/src/generated
/.idea
/.vscode
+/tmpdata
+/docs

View File

@@ -2,12 +2,13 @@
members = ["daqbuffer", "h5out", "archapp", "archapp_xc", "archapp_wrap", "items", "nodenet", "httpclient"]
[profile.release]
-debug = 1
+opt-level = 1
+debug = 1
#overflow-checks = true
#debug-assertions = true
lto = "off"
-codegen-units = 40
+codegen-units = 160
incremental = true
[patch.crates-io]
-tokio = { git = "https://github.com/dominikwerder/tokio", branch = "maxbuf" }
+tokio = { git = "https://github.com/dominikwerder/tokio", rev = "637a0c8a" }

View File

@@ -10,8 +10,17 @@ pub fn read_pb_dummy() -> Result<(), Error> {
#[cfg(feature = "devread")]
#[test]
fn read_pb_00() -> Result<(), Error> {
+use std::path::PathBuf;
let block1 = async move {
let path = "../../../../archappdata/tmp/lts/ArchiverStore/SARUN16/MQUA080/X:2021_01.pb";
let homedir = std::env::var("HOME").unwrap();
let path = PathBuf::from(homedir)
.join("archappdata")
.join("lts")
.join("ArchiverStore")
.join("SARUN16")
.join("MQUA080")
.join("X:2021_01.pb");
let f1 = tokio::fs::read(path).await?;
let mut j1 = 0;
loop {

View File

@@ -19,7 +19,7 @@ bincode = "1.3.3"
pin-project = "1.0.7"
#async-channel = "1"
#dashmap = "3"
-tokio-postgres = { version = "0.7", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] }
+tokio-postgres = { version = "0.7.2", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] }
async-channel = "1.6"
chrono = "0.4"
regex = "1.5.4"

View File

@@ -22,7 +22,7 @@ byteorder = "1.4.3"
futures-core = "0.3.14"
futures-util = "0.3.14"
async-stream = "0.3.0"
-tracing = { version = "0.1.25", features = [] }
+tracing = "0.1.25"
tracing-futures = { version = "0.2.5", features = ["futures-01", "futures-03", "std-future"] }
fs2 = "0.4.3"
libc = "0.2.93"

View File

@@ -16,6 +16,7 @@ pub struct OpenedFile {
pub positioned: bool,
pub index: bool,
pub nreads: u32,
+pub pos: u64,
}
#[derive(Debug)]
@@ -81,7 +82,7 @@ async fn open_files_inner(
}
let mut a = vec![];
for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? {
-let w = position_file(&path, range, &channel_config, false).await?;
+let w = position_file(&path, range, false).await?;
if w.found {
a.push(w.file);
}
@@ -101,12 +102,7 @@ struct Positioned {
found: bool,
}
-async fn position_file(
-path: &PathBuf,
-range: &NanoRange,
-channel_config: &ChannelConfig,
-expand: bool,
-) -> Result<Positioned, Error> {
+async fn position_file(path: &PathBuf, range: &NanoRange, expand: bool) -> Result<Positioned, Error> {
match OpenOptions::new().read(true).open(&path).await {
Ok(file) => {
let index_path = PathBuf::from(format!("{}_Index", path.to_str().unwrap()));
@@ -114,40 +110,28 @@ async fn position_file(
Ok(mut index_file) => {
let meta = index_file.metadata().await?;
if meta.len() > 1024 * 1024 * 120 {
-let msg = format!(
-"too large index file {} bytes for {}",
-meta.len(),
-channel_config.channel.name
-);
+let msg = format!("too large index file {} bytes for {:?}", meta.len(), index_path);
error!("{}", msg);
return Err(Error::with_msg(msg));
} else if meta.len() > 1024 * 1024 * 80 {
-let msg = format!(
-"very large index file {} bytes for {}",
-meta.len(),
-channel_config.channel.name
-);
+let msg = format!("very large index file {} bytes for {:?}", meta.len(), index_path);
warn!("{}", msg);
} else if meta.len() > 1024 * 1024 * 20 {
-let msg = format!(
-"large index file {} bytes for {}",
-meta.len(),
-channel_config.channel.name
-);
+let msg = format!("large index file {} bytes for {:?}", meta.len(), index_path);
info!("{}", msg);
}
if meta.len() < 2 {
return Err(Error::with_msg(format!(
"bad meta len {} for {}",
"bad meta len {} for {:?}",
meta.len(),
-channel_config.channel.name
+index_path
)));
}
if meta.len() % 16 != 2 {
return Err(Error::with_msg(format!(
"bad meta len {} for {}",
"bad meta len {} for {:?}",
meta.len(),
-channel_config.channel.name
+index_path
)));
}
let mut buf = BytesMut::with_capacity(meta.len() as usize);
@@ -169,6 +153,7 @@ async fn position_file(
positioned: true,
index: true,
nreads: 0,
+pos: o.1,
};
return Ok(Positioned { file: g, found: true });
}
@@ -180,6 +165,7 @@ async fn position_file(
positioned: false,
index: true,
nreads: 0,
+pos: 0,
};
return Ok(Positioned { file: g, found: false });
}
@@ -189,9 +175,10 @@ async fn position_file(
ErrorKind::NotFound => {
let ts1 = Instant::now();
let res = if expand {
-super::index::position_static_len_datafile_at_largest_smaller_than(file, range.beg).await?
+super::index::position_static_len_datafile_at_largest_smaller_than(file, range.clone())
+.await?
} else {
-super::index::position_static_len_datafile(file, range.beg).await?
+super::index::position_static_len_datafile(file, range.clone()).await?
};
let ts2 = Instant::now();
if false {
@@ -208,6 +195,7 @@ async fn position_file(
positioned: true,
index: false,
nreads: res.2,
+pos: res.3,
};
return Ok(Positioned { file: g, found: true });
} else {
@@ -218,6 +206,7 @@ async fn position_file(
positioned: false,
index: false,
nreads: res.2,
+pos: 0,
};
return Ok(Positioned { file: g, found: false });
}
@@ -234,6 +223,7 @@ async fn position_file(
positioned: false,
index: true,
nreads: 0,
+pos: 0,
};
return Ok(Positioned { file: g, found: false });
}
@@ -299,7 +289,7 @@ async fn get_timebins(channel_config: &ChannelConfig, node: Node) -> Result<Vec<
}
}
-async fn open_expanded_files_inner(
+pub async fn open_expanded_files_inner(
chtx: &async_channel::Sender<Result<OpenedFileSet, Error>>,
range: &NanoRange,
channel_config: &ChannelConfig,
@@ -332,7 +322,7 @@ async fn open_expanded_files_inner(
let tb = timebins[p1];
let mut a = vec![];
for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? {
-let w = position_file(&path, range, &channel_config, true).await?;
+let w = position_file(&path, range, true).await?;
if w.found {
info!("----- open_expanded_files_inner w.found for {:?}", path);
a.push(w.file);
@@ -360,7 +350,7 @@ async fn open_expanded_files_inner(
let tb = timebins[p1];
let mut a = vec![];
for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? {
-let w = position_file(&path, range, &channel_config, false).await?;
+let w = position_file(&path, range, false).await?;
if w.found {
a.push(w.file);
}
@@ -422,3 +412,136 @@ fn expanded_file_list() {
};
taskrun::run(task).unwrap();
}
#[cfg(test)]
mod test {
use crate::dataopen::position_file;
use err::Error;
use netpod::timeunits::{DAY, HOUR, MS};
use netpod::NanoRange;
#[test]
fn position_basic_file_at_begin() -> Result<(), Error> {
let fut = async {
let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into();
let range = NanoRange {
beg: DAY,
end: DAY + MS * 20000,
};
let expand = false;
let res = position_file(&path, &range, expand).await?;
assert_eq!(res.found, true);
assert_eq!(res.file.file.is_some(), true);
assert_eq!(res.file.index, false);
assert_eq!(res.file.positioned, true);
assert_eq!(res.file.pos, 23);
Ok(())
};
taskrun::run(fut)?;
Ok(())
}
#[test]
fn position_basic_file_for_empty_range() -> Result<(), Error> {
let fut = async {
let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into();
let range = NanoRange {
beg: DAY + MS * 80000,
end: DAY + MS * 80000,
};
let expand = false;
let res = position_file(&path, &range, expand).await?;
assert_eq!(res.found, false);
assert_eq!(res.file.file.is_some(), false);
assert_eq!(res.file.index, false);
assert_eq!(res.file.positioned, false);
//assert_eq!(res.file.pos, 23);
Ok(())
};
taskrun::run(fut)?;
Ok(())
}
#[test]
fn position_basic_file_at_begin_for_small_range() -> Result<(), Error> {
let fut = async {
let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into();
let range = NanoRange {
beg: DAY,
end: DAY + MS * 300000,
};
let expand = false;
let res = position_file(&path, &range, expand).await?;
assert_eq!(res.found, true);
assert_eq!(res.file.file.is_some(), true);
assert_eq!(res.file.index, false);
assert_eq!(res.file.positioned, true);
assert_eq!(res.file.pos, 23);
Ok(())
};
taskrun::run(fut)?;
Ok(())
}
#[test]
fn position_basic_file_at_inner() -> Result<(), Error> {
let fut = async {
let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into();
let range = NanoRange {
beg: DAY + MS * 4000,
end: DAY + MS * 7000,
};
let expand = false;
let res = position_file(&path, &range, expand).await?;
assert_eq!(res.found, true);
assert_eq!(res.file.file.is_some(), true);
assert_eq!(res.file.index, false);
assert_eq!(res.file.positioned, true);
assert_eq!(res.file.pos, 179);
Ok(())
};
taskrun::run(fut)?;
Ok(())
}
#[test]
fn position_basic_file_at_inner_for_too_small_range() -> Result<(), Error> {
let fut = async {
let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into();
let range = NanoRange {
beg: DAY + MS * 1501,
end: DAY + MS * 1502,
};
let expand = false;
let res = position_file(&path, &range, expand).await?;
assert_eq!(res.found, false);
assert_eq!(res.file.file.is_some(), false);
assert_eq!(res.file.index, false);
assert_eq!(res.file.positioned, false);
assert_eq!(res.file.pos, 0);
Ok(())
};
taskrun::run(fut)?;
Ok(())
}
#[test]
fn position_basic_file_starts_after_range() -> Result<(), Error> {
let fut = async {
let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into();
let range = NanoRange {
beg: HOUR * 22,
end: HOUR * 23,
};
let expand = false;
let res = position_file(&path, &range, expand).await?;
assert_eq!(res.found, false);
assert_eq!(res.file.file.is_some(), false);
assert_eq!(res.file.index, false);
assert_eq!(res.file.positioned, false);
Ok(())
};
taskrun::run(fut)?;
Ok(())
}
}

View File

@@ -219,7 +219,7 @@ impl Stream for EventChunkerMultifile {
}
#[cfg(test)]
-fn read_expanded_for_range(range: netpod::NanoRange) -> Result<(usize, usize), Error> {
+fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, usize), Error> {
use netpod::timeunits::*;
use netpod::{ByteSize, Nanos};
let chn = netpod::Channel {
@@ -238,8 +238,7 @@ fn read_expanded_for_range(range: netpod::NanoRange) -> Result<(usize, usize), E
compression: false,
};
let cluster = taskrun::test_cluster();
-let node_ix = 0;
-let node = cluster.nodes[node_ix].clone();
+let node = cluster.nodes[nodeix].clone();
let buffer_size = 512;
let event_chunker_conf = EventChunkerConf {
disk_stats_every: ByteSize::kb(1024),
@@ -250,7 +249,7 @@ fn read_expanded_for_range(range: netpod::NanoRange) -> Result<(usize, usize), E
range,
channel_config,
node,
-node_ix,
+nodeix,
FileIoBufferSize::new(buffer_size),
event_chunker_conf,
true,
@@ -261,6 +260,7 @@ fn read_expanded_for_range(range: netpod::NanoRange) -> Result<(usize, usize), E
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::Data(item) => {
info!("item: {:?}", item.tss.iter().map(|x| x / 1000000).collect::<Vec<_>>());
event_count += item.tss.len();
}
_ => {}
@@ -287,9 +287,9 @@ fn read_expanded_0() -> Result<(), Error> {
use netpod::timeunits::*;
let range = netpod::NanoRange {
beg: DAY + MS * 0,
-end: DAY + MS * 0 + MS * 1500,
+end: DAY + MS * 1500,
};
-let res = read_expanded_for_range(range)?;
+let res = read_expanded_for_range(range, 0)?;
if res.0 != 2 {
Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
}
@@ -301,9 +301,9 @@ fn read_expanded_1() -> Result<(), Error> {
use netpod::timeunits::*;
let range = netpod::NanoRange {
beg: DAY + MS * 0,
-end: DAY + MS * 0 + MS * 1501,
+end: DAY + MS * 1501,
};
-let res = read_expanded_for_range(range)?;
+let res = read_expanded_for_range(range, 0)?;
if res.0 != 3 {
Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
}
@@ -315,9 +315,9 @@ fn read_expanded_2() -> Result<(), Error> {
use netpod::timeunits::*;
let range = netpod::NanoRange {
beg: DAY - MS * 100,
-end: DAY + MS * 0 + MS * 1501,
+end: DAY + MS * 1501,
};
-let res = read_expanded_for_range(range)?;
+let res = read_expanded_for_range(range, 0)?;
if res.0 != 3 {
Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
}
@@ -329,9 +329,9 @@ fn read_expanded_3() -> Result<(), Error> {
use netpod::timeunits::*;
let range = netpod::NanoRange {
beg: DAY - MS * 1500,
-end: DAY + MS * 0 + MS * 1501,
+end: DAY + MS * 1501,
};
-let res = read_expanded_for_range(range)?;
+let res = read_expanded_for_range(range, 0)?;
if res.0 != 4 {
Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?;
}

View File

@@ -691,3 +691,37 @@ impl HasSeenBeforeRangeCount for EventChunker {
self.seen_before_range_count()
}
}
#[cfg(test)]
mod test {
use err::Error;
use netpod::timeunits::*;
use netpod::{ByteSize, Nanos};
/*
#[test]
fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, usize), Error> {
let chn = netpod::Channel {
backend: "testbackend".into(),
name: "scalar-i32-be".into(),
};
// TODO read config from disk.
let channel_config = ChannelConfig {
channel: chn,
keyspace: 2,
time_bin_size: Nanos { ns: DAY },
scalar_type: netpod::ScalarType::I32,
byte_order: netpod::ByteOrder::big_endian(),
shape: netpod::Shape::Scalar,
array: false,
compression: false,
};
let cluster = taskrun::test_cluster();
let node = cluster.nodes[nodeix].clone();
let buffer_size = 512;
let event_chunker_conf = EventChunkerConf {
disk_stats_every: ByteSize::kb(1024),
};
}
*/
}

View File

@@ -1,6 +1,7 @@
use arrayref::array_ref;
use err::Error;
use netpod::log::*;
+use netpod::NanoRange;
use netpod::Nanos;
use std::mem::size_of;
use tokio::fs::File;
@@ -168,7 +169,7 @@ pub async fn read_event_at(pos: u64, file: &mut File) -> Result<(u32, Nanos), Er
Ok(ev)
}
-pub async fn position_static_len_datafile(mut file: File, beg: u64) -> Result<(File, bool, u32), Error> {
+pub async fn position_static_len_datafile(mut file: File, range: NanoRange) -> Result<(File, bool, u32, u64), Error> {
let flen = file.seek(SeekFrom::End(0)).await?;
file.seek(SeekFrom::Start(0)).await?;
let mut buf = vec![0; 1024];
@@ -189,18 +190,30 @@ pub async fn position_static_len_datafile(mut file: File, beg: u64) -> Result<(F
}
let y = t.1.ns;
let mut nreads = 2;
-if x >= beg {
+if x >= range.end {
file.seek(SeekFrom::Start(j)).await?;
-return Ok((file, true, nreads));
+return Ok((file, false, nreads, j));
}
-if y < beg {
+if y < range.beg {
file.seek(SeekFrom::Start(j)).await?;
-return Ok((file, false, nreads));
+return Ok((file, false, nreads, j));
}
+if x >= range.beg && x < range.end {
+file.seek(SeekFrom::Start(j)).await?;
+return Ok((file, true, nreads, j));
+}
+let mut x = x;
+let mut y = y;
loop {
+assert_eq!((k - j) % evlen, 0);
if k - j < 2 * evlen {
-file.seek(SeekFrom::Start(k)).await?;
-return Ok((file, true, nreads));
+if y < range.end {
+file.seek(SeekFrom::Start(k)).await?;
+return Ok((file, true, nreads, k));
+} else {
+file.seek(SeekFrom::Start(k)).await?;
+return Ok((file, false, nreads, k));
+}
}
let m = j + (k - j) / 2 / evlen * evlen;
let t = read_event_at(m, &mut file).await?;
@@ -211,10 +224,12 @@ pub async fn position_static_len_datafile(mut file: File, beg: u64) -> Result<(F
)))?;
}
nreads += 1;
-let x = t.1.ns;
-if x < beg {
+let e = t.1.ns;
+if e < range.beg {
+x = e;
j = m;
} else {
+y = e;
k = m;
}
}
@@ -222,8 +237,8 @@ pub async fn position_static_len_datafile(mut file: File, beg: u64) -> Result<(F
pub async fn position_static_len_datafile_at_largest_smaller_than(
mut file: File,
-beg: u64,
-) -> Result<(File, bool, u32), Error> {
+range: NanoRange,
+) -> Result<(File, bool, u32, u64), Error> {
let flen = file.seek(SeekFrom::End(0)).await?;
file.seek(SeekFrom::Start(0)).await?;
let mut buf = vec![0; 1024];
@@ -244,18 +259,18 @@ pub async fn position_static_len_datafile_at_largest_smaller_than(
}
let y = t.1.ns;
let mut nreads = 2;
-if x >= beg {
+if x >= range.beg {
file.seek(SeekFrom::Start(j)).await?;
-return Ok((file, false, nreads));
+return Ok((file, false, nreads, j));
}
-if y < beg {
+if y < range.beg {
file.seek(SeekFrom::Start(k)).await?;
-return Ok((file, true, nreads));
+return Ok((file, true, nreads, k));
}
loop {
if k - j < 2 * evlen {
file.seek(SeekFrom::Start(j)).await?;
-return Ok((file, true, nreads));
+return Ok((file, true, nreads, j));
}
let m = j + (k - j) / 2 / evlen * evlen;
let t = read_event_at(m, &mut file).await?;
@@ -267,7 +282,7 @@ pub async fn position_static_len_datafile_at_largest_smaller_than(
}
nreads += 1;
let x = t.1.ns;
-if x < beg {
+if x < range.beg {
j = m;
} else {
k = m;
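
Note on the hunks above: both `position_static_len_datafile` variants now bisect over fixed-length records using the full `NanoRange` instead of a bare `beg` timestamp, and additionally return the byte offset at which the file was positioned. A minimal standalone sketch of that search, not the crate's API: the `ts_at` closure is a hypothetical stand-in for `read_event_at`, and the `(File, bool, u32, u64)` result is reduced to an `Option<u64>` offset.

// Sketch only. `ts_at(off)` returns the timestamp of the fixed-length
// record (`evlen` bytes) starting at byte offset `off`; records are
// sorted by timestamp. Returns the offset of the first record with
// ts >= beg, if that record also lies before `end`.
fn position(ts_at: impl Fn(u64) -> u64, flen: u64, evlen: u64, beg: u64, end: u64) -> Option<u64> {
    assert!(flen >= evlen && flen % evlen == 0);
    let mut j = 0; // after the early-outs below: ts_at(j) < beg
    let mut k = flen - evlen; // after the early-outs below: ts_at(k) >= beg
    if ts_at(j) >= end || ts_at(k) < beg {
        return None; // whole file lies after, or entirely before, the range
    }
    if ts_at(j) >= beg {
        return Some(j); // first record is already inside the range
    }
    let mut y = ts_at(k);
    loop {
        if k - j < 2 * evlen {
            // j and k are adjacent records; k is the first with ts >= beg
            return if y < end { Some(k) } else { None };
        }
        let m = j + (k - j) / 2 / evlen * evlen; // midpoint on a record boundary
        if ts_at(m) < beg {
            j = m;
        } else {
            y = ts_at(m);
            k = m;
        }
    }
}

The `_at_largest_smaller_than` variant used for expanded reads differs mainly in the adjacent-records case: it returns `j`, the last record with a timestamp strictly before `range.beg`, instead of `k`.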

View File

@@ -5,13 +5,13 @@ authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2018"
[dependencies]
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
http = "0.2"
url = "2.2"
tokio = { version = "1.7.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] }
hyper-tls = { version="0.5.0" }
tokio = { version = "1.11.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
hyper = { version = "0.14.3", features = ["http1", "http2", "client", "server", "tcp", "stream"] }
hyper-tls = { version = "0.5.0" }
bytes = "1.0.1"
futures-core = "0.3.14"
futures-util = "0.3.14"

View File

@@ -24,7 +24,7 @@ so please feel free to create some Jira ticket!</p>
<p>The result encodes timestamps in the form:</p>
<pre>{
"tsAnchor": 1623909860, // Time-anchor of this result in UNIX epoch seconds.
"tsOffMs": [573, 15671, 37932, ...], // Millisecond-offset to tsAnchor for each event/bin-edge.
"tsOffMs": [173, 472, 857, ...], // Millisecond-offset to tsAnchor for each event/bin-edge.
"tsOffNs": [422901, 422902, 422903, ...], // Nanosecond-offset to tsAnchor in addition to tsOffMs for each event/bin-edge.
}</pre>
<p>which results in these nanosecond-timestamps:</p>

View File

@@ -122,7 +122,7 @@ pub fn test_cluster() -> netpod::Cluster {
})
.collect();
netpod::Cluster {
-nodes: nodes,
+nodes,
database: netpod::Database {
name: "daqbuffer".into(),
host: "localhost".into(),