WIP Before merging Merger

This commit is contained in:
Dominik Werder
2021-09-22 17:57:15 +02:00
parent 5e385fd88c
commit 10e0fd887d
5 changed files with 698 additions and 213 deletions

View File

@@ -10,6 +10,158 @@ use std::time::Instant;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncSeekExt, ErrorKind, SeekFrom};
pub struct Positioned {
pub file: OpenedFile,
pub found: bool,
}
pub async fn position_file_for_test(
path: &PathBuf,
range: &NanoRange,
expand_left: bool,
expand_right: bool,
) -> Result<Positioned, Error> {
position_file(path, range, expand_left, expand_right).await
}
async fn position_file(
path: &PathBuf,
range: &NanoRange,
expand_left: bool,
expand_right: bool,
) -> Result<Positioned, Error> {
assert_eq!(expand_left && expand_right, false);
match OpenOptions::new().read(true).open(&path).await {
Ok(file) => {
let index_path = PathBuf::from(format!("{}_Index", path.to_str().unwrap()));
match OpenOptions::new().read(true).open(&index_path).await {
Ok(mut index_file) => {
let meta = index_file.metadata().await?;
if meta.len() > 1024 * 1024 * 120 {
let msg = format!("too large index file {} bytes for {:?}", meta.len(), index_path);
error!("{}", msg);
return Err(Error::with_msg(msg));
} else if meta.len() > 1024 * 1024 * 80 {
let msg = format!("very large index file {} bytes for {:?}", meta.len(), index_path);
warn!("{}", msg);
} else if meta.len() > 1024 * 1024 * 20 {
let msg = format!("large index file {} bytes for {:?}", meta.len(), index_path);
info!("{}", msg);
}
if meta.len() < 2 {
return Err(Error::with_msg(format!(
"bad meta len {} for {:?}",
meta.len(),
index_path
)));
}
if meta.len() % 16 != 2 {
return Err(Error::with_msg(format!(
"bad meta len {} for {:?}",
meta.len(),
index_path
)));
}
let mut buf = BytesMut::with_capacity(meta.len() as usize);
buf.resize(buf.capacity(), 0);
index_file.read_exact(&mut buf).await?;
let gg = if expand_left {
super::index::find_largest_smaller_than(range.clone(), expand_right, &buf[2..])?
} else {
super::index::find_ge(range.clone(), expand_right, &buf[2..])?
};
match gg {
Some(o) => {
let mut file = file;
file.seek(SeekFrom::Start(o.1)).await?;
//info!("position_file case A {:?}", path);
let g = OpenedFile {
file: Some(file),
path: path.clone(),
positioned: true,
index: true,
nreads: 0,
pos: o.1,
};
return Ok(Positioned { file: g, found: true });
}
None => {
//info!("position_file case B {:?}", path);
let g = OpenedFile {
file: Some(file),
path: path.clone(),
positioned: false,
index: true,
nreads: 0,
pos: 0,
};
return Ok(Positioned { file: g, found: false });
}
}
}
Err(e) => match e.kind() {
ErrorKind::NotFound => {
let ts1 = Instant::now();
let res = if expand_left {
super::index::position_static_len_datafile_at_largest_smaller_than(
file,
range.clone(),
expand_right,
)
.await?
} else {
super::index::position_static_len_datafile(file, range.clone(), expand_right).await?
};
let ts2 = Instant::now();
if false {
// TODO collect for stats:
let dur = ts2.duration_since(ts1);
info!("position_static_len_datafile took ms {}", dur.as_millis());
}
let file = res.0;
if res.1 {
//info!("position_file case C {:?}", path);
let g = OpenedFile {
file: Some(file),
path: path.clone(),
positioned: true,
index: false,
nreads: res.2,
pos: res.3,
};
return Ok(Positioned { file: g, found: true });
} else {
//info!("position_file case D {:?}", path);
let g = OpenedFile {
file: Some(file),
path: path.clone(),
positioned: false,
index: false,
nreads: res.2,
pos: 0,
};
return Ok(Positioned { file: g, found: false });
}
}
_ => Err(e)?,
},
}
}
Err(e) => {
warn!("can not open {:?} error {:?}", path, e);
let g = OpenedFile {
file: None,
path: path.clone(),
positioned: false,
index: true,
nreads: 0,
pos: 0,
};
return Ok(Positioned { file: g, found: false });
}
}
}
pub struct OpenedFile {
pub path: PathBuf,
pub file: Option<File>,
@@ -82,7 +234,7 @@ async fn open_files_inner(
}
let mut a = vec![];
for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? {
let w = position_file(&path, range, false).await?;
let w = position_file(&path, range, false, false).await?;
if w.found {
a.push(w.file);
}
@@ -97,141 +249,9 @@ async fn open_files_inner(
Ok(())
}
struct Positioned {
file: OpenedFile,
found: bool,
}
async fn position_file(path: &PathBuf, range: &NanoRange, expand: bool) -> Result<Positioned, Error> {
match OpenOptions::new().read(true).open(&path).await {
Ok(file) => {
let index_path = PathBuf::from(format!("{}_Index", path.to_str().unwrap()));
match OpenOptions::new().read(true).open(&index_path).await {
Ok(mut index_file) => {
let meta = index_file.metadata().await?;
if meta.len() > 1024 * 1024 * 120 {
let msg = format!("too large index file {} bytes for {:?}", meta.len(), index_path);
error!("{}", msg);
return Err(Error::with_msg(msg));
} else if meta.len() > 1024 * 1024 * 80 {
let msg = format!("very large index file {} bytes for {:?}", meta.len(), index_path);
warn!("{}", msg);
} else if meta.len() > 1024 * 1024 * 20 {
let msg = format!("large index file {} bytes for {:?}", meta.len(), index_path);
info!("{}", msg);
}
if meta.len() < 2 {
return Err(Error::with_msg(format!(
"bad meta len {} for {:?}",
meta.len(),
index_path
)));
}
if meta.len() % 16 != 2 {
return Err(Error::with_msg(format!(
"bad meta len {} for {:?}",
meta.len(),
index_path
)));
}
let mut buf = BytesMut::with_capacity(meta.len() as usize);
buf.resize(buf.capacity(), 0);
index_file.read_exact(&mut buf).await?;
let gg = if expand {
super::index::find_largest_smaller_than(range.beg, &buf[2..])?
} else {
super::index::find_ge(range.beg, &buf[2..])?
};
match gg {
Some(o) => {
let mut file = file;
file.seek(SeekFrom::Start(o.1)).await?;
info!("position_file case A {:?}", path);
let g = OpenedFile {
file: Some(file),
path: path.clone(),
positioned: true,
index: true,
nreads: 0,
pos: o.1,
};
return Ok(Positioned { file: g, found: true });
}
None => {
info!("position_file case B {:?}", path);
let g = OpenedFile {
file: None,
path: path.clone(),
positioned: false,
index: true,
nreads: 0,
pos: 0,
};
return Ok(Positioned { file: g, found: false });
}
}
}
Err(e) => match e.kind() {
ErrorKind::NotFound => {
let ts1 = Instant::now();
let res = if expand {
super::index::position_static_len_datafile_at_largest_smaller_than(file, range.clone())
.await?
} else {
super::index::position_static_len_datafile(file, range.clone()).await?
};
let ts2 = Instant::now();
if false {
// TODO collect for stats:
let dur = ts2.duration_since(ts1);
info!("position_static_len_datafile took ms {}", dur.as_millis());
}
let file = res.0;
if res.1 {
info!("position_file case C {:?}", path);
let g = OpenedFile {
file: Some(file),
path: path.clone(),
positioned: true,
index: false,
nreads: res.2,
pos: res.3,
};
return Ok(Positioned { file: g, found: true });
} else {
info!("position_file case D {:?}", path);
let g = OpenedFile {
file: None,
path: path.clone(),
positioned: false,
index: false,
nreads: res.2,
pos: 0,
};
return Ok(Positioned { file: g, found: false });
}
}
_ => Err(e)?,
},
}
}
Err(e) => {
warn!("can not open {:?} error {:?}", path, e);
let g = OpenedFile {
file: None,
path: path.clone(),
positioned: false,
index: true,
nreads: 0,
pos: 0,
};
return Ok(Positioned { file: g, found: false });
}
}
}
/*
/**
Provide the stream of positioned data files which are relevant for the given parameters.
Expanded to one event before and after the requested range, if exists.
*/
pub fn open_expanded_files(
@@ -289,7 +309,7 @@ async fn get_timebins(channel_config: &ChannelConfig, node: Node) -> Result<Vec<
}
}
pub async fn open_expanded_files_inner(
async fn open_expanded_files_inner(
chtx: &async_channel::Sender<Result<OpenedFileSet, Error>>,
range: &NanoRange,
channel_config: &ChannelConfig,
@@ -317,16 +337,16 @@ pub async fn open_expanded_files_inner(
p1, range, channel_config
)));
}
let mut found_first = false;
let mut found_pre = false;
loop {
let tb = timebins[p1];
let mut a = vec![];
for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? {
let w = position_file(&path, range, true).await?;
let w = position_file(&path, range, true, false).await?;
if w.found {
info!("----- open_expanded_files_inner w.found for {:?}", path);
a.push(w.file);
found_first = true;
found_pre = true;
}
}
let h = OpenedFileSet { timebin: tb, files: a };
@@ -335,7 +355,7 @@ pub async fn open_expanded_files_inner(
h.files.len()
);
chtx.send(Ok(h)).await?;
if found_first {
if found_pre {
p1 += 1;
break;
} else if p1 == 0 {
@@ -344,13 +364,13 @@ pub async fn open_expanded_files_inner(
p1 -= 1;
}
}
if found_first {
if found_pre {
// Append all following positioned files.
while p1 < timebins.len() {
let tb = timebins[p1];
let mut a = vec![];
for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? {
let w = position_file(&path, range, false).await?;
let w = position_file(&path, range, false, true).await?;
if w.found {
a.push(w.file);
}
@@ -419,19 +439,23 @@ mod test {
use err::Error;
use netpod::timeunits::{DAY, HOUR, MS};
use netpod::NanoRange;
use tokio::fs::OpenOptions;
const WAVE_FILE: &str =
"../tmpdata/node00/ks_3/byTime/wave-f64-be-n21/0000000000000000001/0000000000/0000000000086400000_00000_Data";
const SCALAR_FILE: &str =
"../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data";
#[test]
fn position_basic_file_at_begin() -> Result<(), Error> {
let fut = async {
let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into();
let path = SCALAR_FILE.into();
let range = NanoRange {
beg: DAY,
end: DAY + MS * 20000,
};
let expand = false;
let res = position_file(&path, &range, expand).await?;
let res = position_file(&path, &range, false, false).await?;
assert_eq!(res.found, true);
assert_eq!(res.file.file.is_some(), true);
assert_eq!(res.file.index, false);
assert_eq!(res.file.positioned, true);
assert_eq!(res.file.pos, 23);
@@ -444,18 +468,15 @@ mod test {
#[test]
fn position_basic_file_for_empty_range() -> Result<(), Error> {
let fut = async {
let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into();
let path = SCALAR_FILE.into();
let range = NanoRange {
beg: DAY + MS * 80000,
end: DAY + MS * 80000,
};
let expand = false;
let res = position_file(&path, &range, expand).await?;
let res = position_file(&path, &range, false, false).await?;
assert_eq!(res.found, false);
assert_eq!(res.file.file.is_some(), false);
assert_eq!(res.file.index, false);
assert_eq!(res.file.positioned, false);
//assert_eq!(res.file.pos, 23);
Ok(())
};
taskrun::run(fut)?;
@@ -463,17 +484,15 @@ mod test {
}
#[test]
fn position_basic_file_at_begin_for_small_range() -> Result<(), Error> {
fn position_basic_file_at_begin_for_range() -> Result<(), Error> {
let fut = async {
let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into();
let path = SCALAR_FILE.into();
let range = NanoRange {
beg: DAY,
end: DAY + MS * 300000,
};
let expand = false;
let res = position_file(&path, &range, expand).await?;
let res = position_file(&path, &range, false, false).await?;
assert_eq!(res.found, true);
assert_eq!(res.file.file.is_some(), true);
assert_eq!(res.file.index, false);
assert_eq!(res.file.positioned, true);
assert_eq!(res.file.pos, 23);
@@ -486,15 +505,13 @@ mod test {
#[test]
fn position_basic_file_at_inner() -> Result<(), Error> {
let fut = async {
let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into();
let path = SCALAR_FILE.into();
let range = NanoRange {
beg: DAY + MS * 4000,
end: DAY + MS * 7000,
};
let expand = false;
let res = position_file(&path, &range, expand).await?;
let res = position_file(&path, &range, false, false).await?;
assert_eq!(res.found, true);
assert_eq!(res.file.file.is_some(), true);
assert_eq!(res.file.index, false);
assert_eq!(res.file.positioned, true);
assert_eq!(res.file.pos, 179);
@@ -504,20 +521,129 @@ mod test {
Ok(())
}
// TODO add same test for WAVE
#[test]
fn position_basic_file_at_inner_for_too_small_range() -> Result<(), Error> {
let fut = async {
let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into();
let path = SCALAR_FILE.into();
let range = NanoRange {
beg: DAY + MS * 1501,
end: DAY + MS * 1502,
};
let expand = false;
let res = position_file(&path, &range, expand).await?;
let res = position_file(&path, &range, false, false).await?;
assert_eq!(res.found, false);
assert_eq!(res.file.file.is_some(), false);
assert_eq!(res.file.index, false);
assert_eq!(res.file.positioned, false);
Ok(())
};
taskrun::run(fut)?;
Ok(())
}
// TODO add same test for WAVE
#[test]
fn position_basic_file_starts_after_range() -> Result<(), Error> {
let fut = async {
let path = SCALAR_FILE.into();
let range = NanoRange {
beg: HOUR * 22,
end: HOUR * 23,
};
let res = position_file(&path, &range, false, false).await?;
assert_eq!(res.found, false);
assert_eq!(res.file.index, false);
assert_eq!(res.file.positioned, false);
Ok(())
};
taskrun::run(fut)?;
Ok(())
}
#[test]
fn position_basic_file_ends_before_range() -> Result<(), Error> {
let fut = async {
let path = SCALAR_FILE.into();
let range = NanoRange {
beg: DAY * 2,
end: DAY * 2 + HOUR,
};
let res = position_file(&path, &range, false, false).await?;
assert_eq!(res.found, false);
assert_eq!(res.file.index, false);
assert_eq!(res.file.positioned, false);
Ok(())
};
taskrun::run(fut)?;
Ok(())
}
#[test]
fn position_basic_index() -> Result<(), Error> {
let fut = async {
let path = WAVE_FILE.into();
let range = NanoRange {
beg: DAY + MS * 4000,
end: DAY + MS * 90000,
};
let res = position_file(&path, &range, false, false).await?;
assert_eq!(res.found, true);
assert_eq!(res.file.index, true);
assert_eq!(res.file.positioned, true);
assert_eq!(res.file.pos, 184);
Ok(())
};
taskrun::run(fut)?;
Ok(())
}
#[test]
fn position_basic_index_too_small_range() -> Result<(), Error> {
let fut = async {
let path = WAVE_FILE.into();
let range = NanoRange {
beg: DAY + MS * 3100,
end: DAY + MS * 3200,
};
let res = position_file(&path, &range, false, false).await?;
assert_eq!(res.found, false);
assert_eq!(res.file.index, true);
assert_eq!(res.file.positioned, false);
Ok(())
};
taskrun::run(fut)?;
Ok(())
}
#[test]
fn position_basic_index_starts_after_range() -> Result<(), Error> {
let fut = async {
let path = WAVE_FILE.into();
let range = NanoRange {
beg: HOUR * 10,
end: HOUR * 12,
};
let res = position_file(&path, &range, false, false).await?;
assert_eq!(res.found, false);
assert_eq!(res.file.index, true);
assert_eq!(res.file.positioned, false);
Ok(())
};
taskrun::run(fut)?;
Ok(())
}
#[test]
fn position_basic_index_ends_before_range() -> Result<(), Error> {
let fut = async {
let path = WAVE_FILE.into();
let range = NanoRange {
beg: DAY * 2,
end: DAY * 2 + MS * 40000,
};
let res = position_file(&path, &range, false, false).await?;
assert_eq!(res.found, false);
assert_eq!(res.file.index, true);
assert_eq!(res.file.positioned, false);
assert_eq!(res.file.pos, 0);
Ok(())
};
@@ -525,18 +651,40 @@ mod test {
Ok(())
}
//
// -------------- Expanded -----------------------------------
//
#[test]
fn position_basic_file_starts_after_range() -> Result<(), Error> {
fn position_expand_file_at_begin_no_fallback() -> Result<(), Error> {
let fut = async {
let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into();
let path = SCALAR_FILE;
let range = NanoRange {
beg: HOUR * 22,
end: HOUR * 23,
beg: DAY + MS * 3000,
end: DAY + MS * 40000,
};
let expand = false;
let res = position_file(&path, &range, expand).await?;
let file = OpenOptions::new().read(true).open(path).await?;
let res =
super::super::index::position_static_len_datafile_at_largest_smaller_than(file, range.clone(), true)
.await?;
assert_eq!(res.1, true);
assert_eq!(res.3, 75);
Ok(())
};
taskrun::run(fut)?;
Ok(())
}
#[test]
fn position_expand_left_file_at_evts_file_begin() -> Result<(), Error> {
let fut = async {
let path = SCALAR_FILE.into();
let range = NanoRange {
beg: DAY,
end: DAY + MS * 40000,
};
let res = position_file(&path, &range, true, false).await?;
assert_eq!(res.found, false);
assert_eq!(res.file.file.is_some(), false);
assert_eq!(res.file.index, false);
assert_eq!(res.file.positioned, false);
Ok(())
@@ -544,4 +692,141 @@ mod test {
taskrun::run(fut)?;
Ok(())
}
#[test]
fn position_expand_right_file_at_evts_file_begin() -> Result<(), Error> {
let fut = async {
let path = SCALAR_FILE.into();
let range = NanoRange {
beg: DAY,
end: DAY + MS * 40000,
};
let res = position_file(&path, &range, false, true).await?;
assert_eq!(res.found, true);
assert_eq!(res.file.index, false);
assert_eq!(res.file.positioned, true);
assert_eq!(res.file.pos, 23);
Ok(())
};
taskrun::run(fut)?;
Ok(())
}
#[test]
fn position_expand_left_file_at_evts_file_within() -> Result<(), Error> {
let fut = async {
let path = SCALAR_FILE.into();
let range = NanoRange {
beg: DAY + MS * 3000,
end: DAY + MS * 40000,
};
let res = position_file(&path, &range, true, false).await?;
assert_eq!(res.found, true);
assert_eq!(res.file.index, false);
assert_eq!(res.file.positioned, true);
assert_eq!(res.file.pos, 75);
Ok(())
};
taskrun::run(fut)?;
Ok(())
}
// ------- TODO do the same with Wave (index)
#[test]
fn position_expand_left_file_ends_before_range() -> Result<(), Error> {
let fut = async {
let path = SCALAR_FILE.into();
let range = NanoRange {
beg: DAY * 2,
end: DAY * 2 + MS * 40000,
};
let res = position_file(&path, &range, true, false).await?;
assert_eq!(res.found, true);
assert_eq!(res.file.index, false);
assert_eq!(res.file.positioned, true);
assert_eq!(res.file.pos, 2995171);
Ok(())
};
taskrun::run(fut)?;
Ok(())
}
// ------- TODO do the same with Wave (index)
#[test]
fn position_expand_left_file_begins_exactly_after_range() -> Result<(), Error> {
let fut = async {
let path = SCALAR_FILE.into();
let range = NanoRange {
beg: HOUR * 23,
end: DAY,
};
let res = position_file(&path, &range, true, false).await?;
assert_eq!(res.found, false);
assert_eq!(res.file.index, false);
assert_eq!(res.file.positioned, false);
Ok(())
};
taskrun::run(fut)?;
Ok(())
}
// ------- TODO do the same with Wave (index)
#[test]
fn position_expand_right_file_begins_exactly_after_range() -> Result<(), Error> {
let fut = async {
let path = SCALAR_FILE.into();
let range = NanoRange {
beg: HOUR * 23,
end: DAY,
};
let res = position_file(&path, &range, false, true).await?;
assert_eq!(res.found, true);
assert_eq!(res.file.index, false);
assert_eq!(res.file.positioned, true);
assert_eq!(res.file.pos, 23);
Ok(())
};
taskrun::run(fut)?;
Ok(())
}
// TODO add same test for indexed
#[test]
fn position_expand_left_basic_file_at_inner_for_too_small_range() -> Result<(), Error> {
let fut = async {
let path = SCALAR_FILE.into();
let range = NanoRange {
beg: DAY + MS * 1501,
end: DAY + MS * 1502,
};
let res = position_file(&path, &range, true, false).await?;
assert_eq!(res.found, true);
assert_eq!(res.file.index, false);
assert_eq!(res.file.positioned, true);
assert_eq!(res.file.pos, 75);
Ok(())
};
taskrun::run(fut)?;
Ok(())
}
// TODO add same test for indexed
#[test]
fn position_expand_right_basic_file_at_inner_for_too_small_range() -> Result<(), Error> {
let fut = async {
let path = SCALAR_FILE.into();
let range = NanoRange {
beg: DAY + MS * 1501,
end: DAY + MS * 1502,
};
let res = position_file(&path, &range, false, true).await?;
assert_eq!(res.found, true);
assert_eq!(res.file.index, false);
assert_eq!(res.file.positioned, true);
assert_eq!(res.file.pos, 127);
Ok(())
};
taskrun::run(fut)?;
Ok(())
}
}