From 10e0fd887d65594431d2f0667605ad4db3d43ae9 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 22 Sep 2021 17:57:15 +0200 Subject: [PATCH] WIP Before merging Merger --- disk/src/dataopen.rs | 627 ++++++++++++++++++++++++++++----------- disk/src/eventchunker.rs | 16 +- disk/src/index.rs | 102 +++++-- disk/src/mergeblobs.rs | 158 ++++++++++ disk/src/paths.rs | 8 +- 5 files changed, 698 insertions(+), 213 deletions(-) diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index 733a272..78b03ee 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -10,6 +10,158 @@ use std::time::Instant; use tokio::fs::{File, OpenOptions}; use tokio::io::{AsyncReadExt, AsyncSeekExt, ErrorKind, SeekFrom}; +pub struct Positioned { + pub file: OpenedFile, + pub found: bool, +} + +pub async fn position_file_for_test( + path: &PathBuf, + range: &NanoRange, + expand_left: bool, + expand_right: bool, +) -> Result { + position_file(path, range, expand_left, expand_right).await +} + +async fn position_file( + path: &PathBuf, + range: &NanoRange, + expand_left: bool, + expand_right: bool, +) -> Result { + assert_eq!(expand_left && expand_right, false); + match OpenOptions::new().read(true).open(&path).await { + Ok(file) => { + let index_path = PathBuf::from(format!("{}_Index", path.to_str().unwrap())); + match OpenOptions::new().read(true).open(&index_path).await { + Ok(mut index_file) => { + let meta = index_file.metadata().await?; + if meta.len() > 1024 * 1024 * 120 { + let msg = format!("too large index file {} bytes for {:?}", meta.len(), index_path); + error!("{}", msg); + return Err(Error::with_msg(msg)); + } else if meta.len() > 1024 * 1024 * 80 { + let msg = format!("very large index file {} bytes for {:?}", meta.len(), index_path); + warn!("{}", msg); + } else if meta.len() > 1024 * 1024 * 20 { + let msg = format!("large index file {} bytes for {:?}", meta.len(), index_path); + info!("{}", msg); + } + if meta.len() < 2 { + return Err(Error::with_msg(format!( + "bad meta len {} for {:?}", + meta.len(), + index_path + ))); + } + if meta.len() % 16 != 2 { + return Err(Error::with_msg(format!( + "bad meta len {} for {:?}", + meta.len(), + index_path + ))); + } + let mut buf = BytesMut::with_capacity(meta.len() as usize); + buf.resize(buf.capacity(), 0); + index_file.read_exact(&mut buf).await?; + let gg = if expand_left { + super::index::find_largest_smaller_than(range.clone(), expand_right, &buf[2..])? + } else { + super::index::find_ge(range.clone(), expand_right, &buf[2..])? + }; + match gg { + Some(o) => { + let mut file = file; + file.seek(SeekFrom::Start(o.1)).await?; + //info!("position_file case A {:?}", path); + let g = OpenedFile { + file: Some(file), + path: path.clone(), + positioned: true, + index: true, + nreads: 0, + pos: o.1, + }; + return Ok(Positioned { file: g, found: true }); + } + None => { + //info!("position_file case B {:?}", path); + let g = OpenedFile { + file: Some(file), + path: path.clone(), + positioned: false, + index: true, + nreads: 0, + pos: 0, + }; + return Ok(Positioned { file: g, found: false }); + } + } + } + Err(e) => match e.kind() { + ErrorKind::NotFound => { + let ts1 = Instant::now(); + let res = if expand_left { + super::index::position_static_len_datafile_at_largest_smaller_than( + file, + range.clone(), + expand_right, + ) + .await? + } else { + super::index::position_static_len_datafile(file, range.clone(), expand_right).await? + }; + let ts2 = Instant::now(); + if false { + // TODO collect for stats: + let dur = ts2.duration_since(ts1); + info!("position_static_len_datafile took ms {}", dur.as_millis()); + } + let file = res.0; + if res.1 { + //info!("position_file case C {:?}", path); + let g = OpenedFile { + file: Some(file), + path: path.clone(), + positioned: true, + index: false, + nreads: res.2, + pos: res.3, + }; + return Ok(Positioned { file: g, found: true }); + } else { + //info!("position_file case D {:?}", path); + let g = OpenedFile { + file: Some(file), + path: path.clone(), + positioned: false, + index: false, + nreads: res.2, + pos: 0, + }; + return Ok(Positioned { file: g, found: false }); + } + } + _ => Err(e)?, + }, + } + } + Err(e) => { + warn!("can not open {:?} error {:?}", path, e); + let g = OpenedFile { + file: None, + path: path.clone(), + positioned: false, + index: true, + nreads: 0, + pos: 0, + }; + return Ok(Positioned { file: g, found: false }); + } + } +} + pub struct OpenedFile { pub path: PathBuf, pub file: Option, @@ -82,7 +234,7 @@ async fn open_files_inner( } let mut a = vec![]; for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? { - let w = position_file(&path, range, false).await?; + let w = position_file(&path, range, false, false).await?; if w.found { a.push(w.file); } @@ -97,141 +249,9 @@ async fn open_files_inner( Ok(()) } -struct Positioned { - file: OpenedFile, - found: bool, -} - -async fn position_file(path: &PathBuf, range: &NanoRange, expand: bool) -> Result { - match OpenOptions::new().read(true).open(&path).await { - Ok(file) => { - let index_path = PathBuf::from(format!("{}_Index", path.to_str().unwrap())); - match OpenOptions::new().read(true).open(&index_path).await { - Ok(mut index_file) => { - let meta = index_file.metadata().await?; - if meta.len() > 1024 * 1024 * 120 { - let msg = format!("too large index file {} bytes for {:?}", meta.len(), index_path); - error!("{}", msg); - return Err(Error::with_msg(msg)); - } else if meta.len() > 1024 * 1024 * 80 { - let msg = format!("very large index file {} bytes for {:?}", meta.len(), index_path); - warn!("{}", msg); - } else if meta.len() > 1024 * 1024 * 20 { - let msg = format!("large index file {} bytes for {:?}", meta.len(), index_path); - info!("{}", msg); - } - if meta.len() < 2 { - return Err(Error::with_msg(format!( - "bad meta len {} for {:?}", - meta.len(), - index_path - ))); - } - if meta.len() % 16 != 2 { - return Err(Error::with_msg(format!( - "bad meta len {} for {:?}", - meta.len(), - index_path - ))); - } - let mut buf = BytesMut::with_capacity(meta.len() as usize); - buf.resize(buf.capacity(), 0); - index_file.read_exact(&mut buf).await?; - let gg = if expand { - super::index::find_largest_smaller_than(range.beg, &buf[2..])? - } else { - super::index::find_ge(range.beg, &buf[2..])? - }; - match gg { - Some(o) => { - let mut file = file; - file.seek(SeekFrom::Start(o.1)).await?; - info!("position_file case A {:?}", path); - let g = OpenedFile { - file: Some(file), - path: path.clone(), - positioned: true, - index: true, - nreads: 0, - pos: o.1, - }; - return Ok(Positioned { file: g, found: true }); - } - None => { - info!("position_file case B {:?}", path); - let g = OpenedFile { - file: None, - path: path.clone(), - positioned: false, - index: true, - nreads: 0, - pos: 0, - }; - return Ok(Positioned { file: g, found: false }); - } - } - } - Err(e) => match e.kind() { - ErrorKind::NotFound => { - let ts1 = Instant::now(); - let res = if expand { - super::index::position_static_len_datafile_at_largest_smaller_than(file, range.clone()) - .await? - } else { - super::index::position_static_len_datafile(file, range.clone()).await? - }; - let ts2 = Instant::now(); - if false { - // TODO collect for stats: - let dur = ts2.duration_since(ts1); - info!("position_static_len_datafile took ms {}", dur.as_millis()); - } - let file = res.0; - if res.1 { - info!("position_file case C {:?}", path); - let g = OpenedFile { - file: Some(file), - path: path.clone(), - positioned: true, - index: false, - nreads: res.2, - pos: res.3, - }; - return Ok(Positioned { file: g, found: true }); - } else { - info!("position_file case D {:?}", path); - let g = OpenedFile { - file: None, - path: path.clone(), - positioned: false, - index: false, - nreads: res.2, - pos: 0, - }; - return Ok(Positioned { file: g, found: false }); - } - } - _ => Err(e)?, - }, - } - } - Err(e) => { - warn!("can not open {:?} error {:?}", path, e); - let g = OpenedFile { - file: None, - path: path.clone(), - positioned: false, - index: true, - nreads: 0, - pos: 0, - }; - return Ok(Positioned { file: g, found: false }); - } - } -} - -/* +/** Provide the stream of positioned data files which are relevant for the given parameters. + Expanded to one event before and after the requested range, if exists. */ pub fn open_expanded_files( @@ -289,7 +309,7 @@ async fn get_timebins(channel_config: &ChannelConfig, node: Node) -> Result>, range: &NanoRange, channel_config: &ChannelConfig, @@ -317,16 +337,16 @@ pub async fn open_expanded_files_inner( p1, range, channel_config ))); } - let mut found_first = false; + let mut found_pre = false; loop { let tb = timebins[p1]; let mut a = vec![]; for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? { - let w = position_file(&path, range, true).await?; + let w = position_file(&path, range, true, false).await?; if w.found { info!("----- open_expanded_files_inner w.found for {:?}", path); a.push(w.file); - found_first = true; + found_pre = true; } } let h = OpenedFileSet { timebin: tb, files: a }; @@ -335,7 +355,7 @@ pub async fn open_expanded_files_inner( h.files.len() ); chtx.send(Ok(h)).await?; - if found_first { + if found_pre { p1 += 1; break; } else if p1 == 0 { @@ -344,13 +364,13 @@ pub async fn open_expanded_files_inner( p1 -= 1; } } - if found_first { + if found_pre { // Append all following positioned files. while p1 < timebins.len() { let tb = timebins[p1]; let mut a = vec![]; for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? { - let w = position_file(&path, range, false).await?; + let w = position_file(&path, range, false, true).await?; if w.found { a.push(w.file); } @@ -419,19 +439,23 @@ mod test { use err::Error; use netpod::timeunits::{DAY, HOUR, MS}; use netpod::NanoRange; + use tokio::fs::OpenOptions; + + const WAVE_FILE: &str = + "../tmpdata/node00/ks_3/byTime/wave-f64-be-n21/0000000000000000001/0000000000/0000000000086400000_00000_Data"; + const SCALAR_FILE: &str = + "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data"; #[test] fn position_basic_file_at_begin() -> Result<(), Error> { let fut = async { - let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into(); + let path = SCALAR_FILE.into(); let range = NanoRange { beg: DAY, end: DAY + MS * 20000, }; - let expand = false; - let res = position_file(&path, &range, expand).await?; + let res = position_file(&path, &range, false, false).await?; assert_eq!(res.found, true); - assert_eq!(res.file.file.is_some(), true); assert_eq!(res.file.index, false); assert_eq!(res.file.positioned, true); assert_eq!(res.file.pos, 23); @@ -444,18 +468,15 @@ mod test { #[test] fn position_basic_file_for_empty_range() -> Result<(), Error> { let fut = async { - let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into(); + let path = SCALAR_FILE.into(); let range = NanoRange { beg: DAY + MS * 80000, end: DAY + MS * 80000, }; - let expand = false; - let res = position_file(&path, &range, expand).await?; + let res = position_file(&path, &range, false, false).await?; assert_eq!(res.found, false); - assert_eq!(res.file.file.is_some(), false); assert_eq!(res.file.index, false); assert_eq!(res.file.positioned, false); - //assert_eq!(res.file.pos, 23); Ok(()) }; taskrun::run(fut)?; @@ -463,17 +484,15 @@ mod test { } #[test] - fn position_basic_file_at_begin_for_small_range() -> Result<(), Error> { + fn position_basic_file_at_begin_for_range() -> Result<(), Error> { let fut = async { - let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into(); + let path = SCALAR_FILE.into(); let range = NanoRange { beg: DAY, end: DAY + MS * 300000, }; - let expand = false; - let res = position_file(&path, &range, expand).await?; + let res = position_file(&path, &range, false, false).await?; assert_eq!(res.found, true); - assert_eq!(res.file.file.is_some(), true); assert_eq!(res.file.index, false); assert_eq!(res.file.positioned, true); assert_eq!(res.file.pos, 23); @@ -486,15 +505,13 @@ mod test { #[test] fn position_basic_file_at_inner() -> Result<(), Error> { let fut = async { - let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into(); + let path = SCALAR_FILE.into(); let range = NanoRange { beg: DAY + MS * 4000, end: DAY + MS * 7000, }; - let expand = false; - let res = position_file(&path, &range, expand).await?; + let res = position_file(&path, &range, false, false).await?; assert_eq!(res.found, true); - assert_eq!(res.file.file.is_some(), true); assert_eq!(res.file.index, false); assert_eq!(res.file.positioned, true); assert_eq!(res.file.pos, 179); @@ -504,20 +521,129 @@ mod test { Ok(()) } + // TODO add same test for WAVE #[test] fn position_basic_file_at_inner_for_too_small_range() -> Result<(), Error> { let fut = async { - let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into(); + let path = SCALAR_FILE.into(); let range = NanoRange { beg: DAY + MS * 1501, end: DAY + MS * 1502, }; - let expand = false; - let res = position_file(&path, &range, expand).await?; + let res = position_file(&path, &range, false, false).await?; assert_eq!(res.found, false); - assert_eq!(res.file.file.is_some(), false); assert_eq!(res.file.index, false); assert_eq!(res.file.positioned, false); + Ok(()) + }; + taskrun::run(fut)?; + Ok(()) + } + + // TODO add same test for WAVE + #[test] + fn position_basic_file_starts_after_range() -> Result<(), Error> { + let fut = async { + let path = SCALAR_FILE.into(); + let range = NanoRange { + beg: HOUR * 22, + end: HOUR * 23, + }; + let res = position_file(&path, &range, false, false).await?; + assert_eq!(res.found, false); + assert_eq!(res.file.index, false); + assert_eq!(res.file.positioned, false); + Ok(()) + }; + taskrun::run(fut)?; + Ok(()) + } + + #[test] + fn position_basic_file_ends_before_range() -> Result<(), Error> { + let fut = async { + let path = SCALAR_FILE.into(); + let range = NanoRange { + beg: DAY * 2, + end: DAY * 2 + HOUR, + }; + let res = position_file(&path, &range, false, false).await?; + assert_eq!(res.found, false); + assert_eq!(res.file.index, false); + assert_eq!(res.file.positioned, false); + Ok(()) + }; + taskrun::run(fut)?; + Ok(()) + } + + #[test] + fn position_basic_index() -> Result<(), Error> { + let fut = async { + let path = WAVE_FILE.into(); + let range = NanoRange { + beg: DAY + MS * 4000, + end: DAY + MS * 90000, + }; + let res = position_file(&path, &range, false, false).await?; + assert_eq!(res.found, true); + assert_eq!(res.file.index, true); + assert_eq!(res.file.positioned, true); + assert_eq!(res.file.pos, 184); + Ok(()) + }; + taskrun::run(fut)?; + Ok(()) + } + + #[test] + fn position_basic_index_too_small_range() -> Result<(), Error> { + let fut = async { + let path = WAVE_FILE.into(); + let range = NanoRange { + beg: DAY + MS * 3100, + end: DAY + MS * 3200, + }; + let res = position_file(&path, &range, false, false).await?; + assert_eq!(res.found, false); + assert_eq!(res.file.index, true); + assert_eq!(res.file.positioned, false); + Ok(()) + }; + taskrun::run(fut)?; + Ok(()) + } + + #[test] + fn position_basic_index_starts_after_range() -> Result<(), Error> { + let fut = async { + let path = WAVE_FILE.into(); + let range = NanoRange { + beg: HOUR * 10, + end: HOUR * 12, + }; + let res = position_file(&path, &range, false, false).await?; + assert_eq!(res.found, false); + assert_eq!(res.file.index, true); + assert_eq!(res.file.positioned, false); + Ok(()) + }; + taskrun::run(fut)?; + Ok(()) + } + + #[test] + fn position_basic_index_ends_before_range() -> Result<(), Error> { + let fut = async { + let path = WAVE_FILE.into(); + let range = NanoRange { + beg: DAY * 2, + end: DAY * 2 + MS * 40000, + }; + let res = position_file(&path, &range, false, false).await?; + assert_eq!(res.found, false); + assert_eq!(res.file.index, true); + assert_eq!(res.file.positioned, false); assert_eq!(res.file.pos, 0); Ok(()) }; @@ -525,18 +651,40 @@ mod test { Ok(()) } + // + // -------------- Expanded ----------------------------------- + // + #[test] - fn position_basic_file_starts_after_range() -> Result<(), Error> { + fn position_expand_file_at_begin_no_fallback() -> Result<(), Error> { let fut = async { - let path = "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data".into(); + let path = SCALAR_FILE; let range = NanoRange { - beg: HOUR * 22, - end: HOUR * 23, + beg: DAY + MS * 3000, + end: DAY + MS * 40000, }; - let expand = false; - let res = position_file(&path, &range, expand).await?; + let file = OpenOptions::new().read(true).open(path).await?; + let res = + super::super::index::position_static_len_datafile_at_largest_smaller_than(file, range.clone(), true) + .await?; + assert_eq!(res.1, true); + assert_eq!(res.3, 75); + Ok(()) + }; + taskrun::run(fut)?; + Ok(()) + } + + #[test] + fn position_expand_left_file_at_evts_file_begin() -> Result<(), Error> { + let fut = async { + let path = SCALAR_FILE.into(); + let range = NanoRange { + beg: DAY, + end: DAY + MS * 40000, + }; + let res = position_file(&path, &range, true, false).await?; assert_eq!(res.found, false); - assert_eq!(res.file.file.is_some(), false); assert_eq!(res.file.index, false); assert_eq!(res.file.positioned, false); Ok(()) @@ -544,4 +692,141 @@ mod test { taskrun::run(fut)?; Ok(()) } + + #[test] + fn position_expand_right_file_at_evts_file_begin() -> Result<(), Error> { + let fut = async { + let path = SCALAR_FILE.into(); + let range = NanoRange { + beg: DAY, + end: DAY + MS * 40000, + }; + let res = position_file(&path, &range, false, true).await?; + assert_eq!(res.found, true); + assert_eq!(res.file.index, false); + assert_eq!(res.file.positioned, true); + assert_eq!(res.file.pos, 23); + Ok(()) + }; + taskrun::run(fut)?; + Ok(()) + } + + #[test] + fn position_expand_left_file_at_evts_file_within() -> Result<(), Error> { + let fut = async { + let path = SCALAR_FILE.into(); + let range = NanoRange { + beg: DAY + MS * 3000, + end: DAY + MS * 40000, + }; + let res = position_file(&path, &range, true, false).await?; + assert_eq!(res.found, true); + assert_eq!(res.file.index, false); + assert_eq!(res.file.positioned, true); + assert_eq!(res.file.pos, 75); + Ok(()) + }; + taskrun::run(fut)?; + Ok(()) + } + + // ------- TODO do the same with Wave (index) + #[test] + fn position_expand_left_file_ends_before_range() -> Result<(), Error> { + let fut = async { + let path = SCALAR_FILE.into(); + let range = NanoRange { + beg: DAY * 2, + end: DAY * 2 + MS * 40000, + }; + let res = position_file(&path, &range, true, false).await?; + assert_eq!(res.found, true); + assert_eq!(res.file.index, false); + assert_eq!(res.file.positioned, true); + assert_eq!(res.file.pos, 2995171); + Ok(()) + }; + taskrun::run(fut)?; + Ok(()) + } + + // ------- TODO do the same with Wave (index) + #[test] + fn position_expand_left_file_begins_exactly_after_range() -> Result<(), Error> { + let fut = async { + let path = SCALAR_FILE.into(); + let range = NanoRange { + beg: HOUR * 23, + end: DAY, + }; + let res = position_file(&path, &range, true, false).await?; + assert_eq!(res.found, false); + assert_eq!(res.file.index, false); + assert_eq!(res.file.positioned, false); + Ok(()) + }; + taskrun::run(fut)?; + Ok(()) + } + + // ------- TODO do the same with Wave (index) + #[test] + fn position_expand_right_file_begins_exactly_after_range() -> Result<(), Error> { + let fut = async { + let path = SCALAR_FILE.into(); + let range = NanoRange { + beg: HOUR * 23, + end: DAY, + }; + let res = position_file(&path, &range, false, true).await?; + assert_eq!(res.found, true); + assert_eq!(res.file.index, false); + assert_eq!(res.file.positioned, true); + assert_eq!(res.file.pos, 23); + Ok(()) + }; + taskrun::run(fut)?; + Ok(()) + } + + // TODO add same test for indexed + #[test] + fn position_expand_left_basic_file_at_inner_for_too_small_range() -> Result<(), Error> { + let fut = async { + let path = SCALAR_FILE.into(); + let range = NanoRange { + beg: DAY + MS * 1501, + end: DAY + MS * 1502, + }; + let res = position_file(&path, &range, true, false).await?; + assert_eq!(res.found, true); + assert_eq!(res.file.index, false); + assert_eq!(res.file.positioned, true); + assert_eq!(res.file.pos, 75); + Ok(()) + }; + taskrun::run(fut)?; + Ok(()) + } + + // TODO add same test for indexed + #[test] + fn position_expand_right_basic_file_at_inner_for_too_small_range() -> Result<(), Error> { + let fut = async { + let path = SCALAR_FILE.into(); + let range = NanoRange { + beg: DAY + MS * 1501, + end: DAY + MS * 1502, + }; + let res = position_file(&path, &range, false, true).await?; + assert_eq!(res.found, true); + assert_eq!(res.file.index, false); + assert_eq!(res.file.positioned, true); + assert_eq!(res.file.pos, 127); + Ok(()) + }; + taskrun::run(fut)?; + Ok(()) + } } diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index 308ef8d..2f39f3b 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -35,7 +35,7 @@ pub struct EventChunker { data_emit_complete: bool, final_stats_sent: bool, parsed_bytes: u64, - path: PathBuf, + dbg_path: PathBuf, max_ts: Arc, expand: bool, do_decompress: bool, @@ -78,12 +78,13 @@ impl EventChunkerConf { } impl EventChunker { + // TODO `expand` flag usage pub fn from_start( inp: Pin> + Send>>, channel_config: ChannelConfig, range: NanoRange, stats_conf: EventChunkerConf, - path: PathBuf, + dbg_path: PathBuf, max_ts: Arc, expand: bool, do_decompress: bool, @@ -104,7 +105,7 @@ impl EventChunker { data_emit_complete: false, final_stats_sent: false, parsed_bytes: 0, - path, + dbg_path, max_ts, expand, do_decompress, @@ -116,12 +117,13 @@ impl EventChunker { } } + // TODO `expand` flag usage pub fn from_event_boundary( inp: Pin> + Send>>, channel_config: ChannelConfig, range: NanoRange, stats_conf: EventChunkerConf, - path: PathBuf, + dbg_path: PathBuf, max_ts: Arc, expand: bool, do_decompress: bool, @@ -131,7 +133,7 @@ impl EventChunker { channel_config, range, stats_conf, - path, + dbg_path, max_ts, expand, do_decompress, @@ -214,7 +216,7 @@ impl EventChunker { max_ts / SEC, max_ts % SEC, self.channel_config.shape, - self.path + self.dbg_path ); warn!("{}", msg); self.unordered_warn_count += 1; @@ -242,7 +244,7 @@ impl EventChunker { self.range.end % SEC, pulse, self.channel_config.shape, - self.path + self.dbg_path )); Err(e)?; } diff --git a/disk/src/index.rs b/disk/src/index.rs index d736cc0..a5021ed 100644 --- a/disk/src/index.rs +++ b/disk/src/index.rs @@ -7,8 +7,10 @@ use std::mem::size_of; use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncSeekExt, SeekFrom}; -pub fn find_ge(h: u64, buf: &[u8]) -> Result, Error> { - const N: usize = 2 * size_of::(); +pub fn find_ge(range: NanoRange, expand_right: bool, buf: &[u8]) -> Result, Error> { + type VT = u64; + const NT: usize = size_of::(); + const N: usize = 2 * NT; let n1 = buf.len(); if n1 % N != 0 { return Err(Error::with_msg(format!("find_ge bad len {}", n1))); @@ -19,38 +21,57 @@ pub fn find_ge(h: u64, buf: &[u8]) -> Result, Error> { } let n1 = n1 / N; let a = unsafe { - let ptr = &buf[0] as *const u8 as *const ([u8; 8], [u8; 8]); + let ptr = &buf[0] as *const u8 as *const ([u8; NT], [u8; NT]); std::slice::from_raw_parts(ptr, n1) }; let mut j = 0; let mut k = n1 - 1; - let x = u64::from_be_bytes(a[j].0); - let y = u64::from_be_bytes(a[k].0); + let x = VT::from_be_bytes(a[j].0); + let y = VT::from_be_bytes(a[k].0); if x >= y { return Err(Error::with_msg(format!("search in unordered data"))); } - if x >= h { - return Ok(Some((u64::from_be_bytes(a[j].0), u64::from_be_bytes(a[j].1)))); - } - if y < h { + if y < range.beg { return Ok(None); } + if x >= range.beg { + if x < range.end || expand_right { + return Ok(Some((x, VT::from_be_bytes(a[j].1)))); + } else { + return Ok(None); + } + } + let mut x = x; + let mut y = y; loop { + if x >= y { + return Err(Error::with_msg(format!("search in unordered data"))); + } if k - j < 2 { - let ret = (u64::from_be_bytes(a[k].0), u64::from_be_bytes(a[k].1)); - return Ok(Some(ret)); + if y < range.end || expand_right { + let ret = (y, VT::from_be_bytes(a[k].1)); + return Ok(Some(ret)); + } else { + return Ok(None); + } } let m = (k + j) / 2; - let x = u64::from_be_bytes(a[m].0); - if x < h { + let e = VT::from_be_bytes(a[m].0); + if e < range.beg { j = m; + x = e; } else { k = m; + y = e; } } } -pub fn find_largest_smaller_than(h: u64, buf: &[u8]) -> Result, Error> { +pub fn find_largest_smaller_than( + range: NanoRange, + _expand_right: bool, + buf: &[u8], +) -> Result, Error> { type NUM = u64; const ELESIZE: usize = size_of::(); const N: usize = 2 * ELESIZE; @@ -74,24 +95,31 @@ pub fn find_largest_smaller_than(h: u64, buf: &[u8]) -> Result= y { return Err(Error::with_msg(format!("search in unordered data"))); } - if x >= h { + if x >= range.beg { return Ok(None); } - if y < h { - let ret = (NUM::from_be_bytes(a[k].0), NUM::from_be_bytes(a[k].1)); + if y < range.beg { + let ret = (y, NUM::from_be_bytes(a[k].1)); return Ok(Some(ret)); } + let mut x = x; + let mut y = y; loop { + if x >= y { + return Err(Error::with_msg(format!("search in unordered data"))); + } if k - j < 2 { - let ret = (NUM::from_be_bytes(a[j].0), NUM::from_be_bytes(a[j].1)); + let ret = (x, NUM::from_be_bytes(a[j].1)); return Ok(Some(ret)); } let m = (k + j) / 2; - let x = NUM::from_be_bytes(a[m].0); - if x < h { + let e = NUM::from_be_bytes(a[m].0); + if e < range.beg { j = m; + x = e; } else { k = m; + y = e; } } } @@ -169,7 +197,11 @@ pub async fn read_event_at(pos: u64, file: &mut File) -> Result<(u32, Nanos), Er Ok(ev) } -pub async fn position_static_len_datafile(mut file: File, range: NanoRange) -> Result<(File, bool, u32, u64), Error> { +pub async fn position_static_len_datafile( + mut file: File, + range: NanoRange, + expand_right: bool, +) -> Result<(File, bool, u32, u64), Error> { let flen = file.seek(SeekFrom::End(0)).await?; file.seek(SeekFrom::Start(0)).await?; let mut buf = vec![0; 1024]; @@ -191,28 +223,39 @@ pub async fn position_static_len_datafile(mut file: File, range: NanoRange) -> R let y = t.1.ns; let mut nreads = 2; if x >= range.end { - file.seek(SeekFrom::Start(j)).await?; - return Ok((file, false, nreads, j)); + if expand_right { + file.seek(SeekFrom::Start(j)).await?; + return Ok((file, true, nreads, j)); + } else { + file.seek(SeekFrom::Start(0)).await?; + return Ok((file, false, nreads, 0)); + } } if y < range.beg { file.seek(SeekFrom::Start(j)).await?; return Ok((file, false, nreads, j)); } - if x >= range.beg && x < range.end { - file.seek(SeekFrom::Start(j)).await?; - return Ok((file, true, nreads, j)); + if x >= range.beg { + if x < range.end || expand_right { + file.seek(SeekFrom::Start(j)).await?; + return Ok((file, true, nreads, j)); + } else { + file.seek(SeekFrom::Start(0)).await?; + return Ok((file, false, nreads, 0)); + } } let mut x = x; let mut y = y; loop { + assert!(x < y); assert_eq!((k - j) % evlen, 0); if k - j < 2 * evlen { - if y < range.end { + if y < range.end || expand_right { file.seek(SeekFrom::Start(k)).await?; return Ok((file, true, nreads, k)); } else { - file.seek(SeekFrom::Start(k)).await?; - return Ok((file, false, nreads, k)); + file.seek(SeekFrom::Start(0)).await?; + return Ok((file, false, nreads, 0)); } } let m = j + (k - j) / 2 / evlen * evlen; @@ -238,6 +281,7 @@ pub async fn position_static_len_datafile(mut file: File, range: NanoRange) -> R pub async fn position_static_len_datafile_at_largest_smaller_than( mut file: File, range: NanoRange, + _expand_right: bool, ) -> Result<(File, bool, u32, u64), Error> { let flen = file.seek(SeekFrom::End(0)).await?; file.seek(SeekFrom::Start(0)).await?; diff --git a/disk/src/mergeblobs.rs b/disk/src/mergeblobs.rs index ed9984f..7b3f8f0 100644 --- a/disk/src/mergeblobs.rs +++ b/disk/src/mergeblobs.rs @@ -284,3 +284,161 @@ where 0 } } + +#[cfg(test)] +mod test { + use crate::dataopen::position_file_for_test; + use crate::eventchunker::{EventChunker, EventChunkerConf}; + use crate::file_content_stream; + use err::Error; + use futures_util::StreamExt; + use items::{RangeCompletableItem, StreamItem}; + use netpod::log::*; + use netpod::timeunits::{DAY, MS}; + use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, FileIoBufferSize, NanoRange, Nanos, ScalarType, Shape}; + use std::path::PathBuf; + use std::sync::atomic::AtomicU64; + use std::sync::Arc; + + const SCALAR_FILE: &str = + "../tmpdata/node00/ks_2/byTime/scalar-i32-be/0000000000000000001/0000000000/0000000000086400000_00000_Data"; + const WAVE_FILE: &str = + "../tmpdata/node00/ks_3/byTime/wave-f64-be-n21/0000000000000000001/0000000000/0000000000086400000_00000_Data"; + + struct CollectedEvents { + tss: Vec, + } + // TODO generify the Mergers into one. + + async fn collect_merged_events(paths: Vec, range: NanoRange) -> Result { + let mut files = vec![]; + for path in paths { + let p = position_file_for_test(&path, &range, false, false).await?; + if !p.found { + return Err(Error::with_msg_no_trace("can not position file??")); + } + files.push( + p.file + .file + .ok_or_else(|| Error::with_msg(format!("can not open file {:?}", path)))?, + ); + } + //Merge + let file_io_buffer_size = FileIoBufferSize(1024 * 4); + let inp = file_content_stream(err::todoval(), file_io_buffer_size); + let inp = Box::pin(inp); + let channel_config = ChannelConfig { + channel: Channel { + backend: "testbackend".into(), + name: "scalar-i32-be".into(), + }, + keyspace: 2, + time_bin_size: Nanos { ns: DAY }, + scalar_type: ScalarType::I32, + byte_order: ByteOrder::BE, + array: false, + compression: false, + shape: Shape::Scalar, + }; + let stats_conf = EventChunkerConf { + disk_stats_every: ByteSize::kb(1024), + }; + let max_ts = Arc::new(AtomicU64::new(0)); + let expand = false; + let do_decompress = false; + let dbg_path = err::todoval(); + + // TODO `expand` flag usage + + let mut chunker = EventChunker::from_event_boundary( + inp, + channel_config, + range, + stats_conf, + dbg_path, + max_ts, + expand, + do_decompress, + ); + + let mut i1 = 0; + while let Some(item) = chunker.next().await { + if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) = item { + info!("item: {:?}", item); + i1 += 1; + } + if i1 >= 10 { + break; + } + } + info!("read {} data items", i1); + err::todoval() + } + + #[test] + fn single_file_through_merger() -> Result<(), Error> { + let fut = async { + // TODO open a single file, model after the real opening procedure. + //let file = OpenOptions::new().read(true).open(SCALAR_FILE).await?; + let range = NanoRange { + beg: DAY + MS * 1501, + end: DAY + MS * 4000, + }; + let path = PathBuf::from(SCALAR_FILE); + let p = position_file_for_test(&path, &range, false, false).await?; + if !p.found { + return Err(Error::with_msg_no_trace("can not position file??")); + } + let file_io_buffer_size = FileIoBufferSize(1024 * 4); + let inp = file_content_stream(p.file.file.unwrap(), file_io_buffer_size); + let inp = Box::pin(inp); + let channel_config = ChannelConfig { + channel: Channel { + backend: "testbackend".into(), + name: "scalar-i32-be".into(), + }, + keyspace: 2, + time_bin_size: Nanos { ns: DAY }, + scalar_type: ScalarType::I32, + byte_order: ByteOrder::BE, + array: false, + compression: false, + shape: Shape::Scalar, + }; + let stats_conf = EventChunkerConf { + disk_stats_every: ByteSize::kb(1024), + }; + let max_ts = Arc::new(AtomicU64::new(0)); + let expand = false; + let do_decompress = false; + + // TODO `expand` flag usage + + let mut chunker = EventChunker::from_event_boundary( + inp, + channel_config, + range, + stats_conf, + path, + max_ts, + expand, + do_decompress, + ); + + let mut i1 = 0; + while let Some(item) = chunker.next().await { + if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) = item { + info!("item: {:?}", item); + i1 += 1; + } + if i1 >= 10 { + break; + } + } + info!("read {} data items", i1); + Ok(()) + }; + // TODO in general, emit the error message in taskrun::run? + taskrun::run(fut) + } +} diff --git a/disk/src/paths.rs b/disk/src/paths.rs index c7cce60..7b4d7e2 100644 --- a/disk/src/paths.rs +++ b/disk/src/paths.rs @@ -1,6 +1,5 @@ use err::Error; use futures_util::StreamExt; -use netpod::log::*; use netpod::timeunits::MS; use netpod::{ChannelConfig, Nanos, Node}; use std::path::PathBuf; @@ -41,9 +40,9 @@ pub async fn datapaths_for_timebin( let dn = e .file_name() .into_string() - .map_err(|s| Error::with_msg(format!("Bad OS path {:?}", s)))?; + .map_err(|s| Error::with_msg(format!("Bad OS path {:?} path: {:?}", s, e.path())))?; if dn.len() != 10 { - return Err(Error::with_msg(format!("bad split dirname {:?}", e))); + return Err(Error::with_msg(format!("bad split dirname path: {:?}", e.path()))); } let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a }); if vv == 10 { @@ -62,9 +61,6 @@ pub async fn datapaths_for_timebin( .join(format!("{:019}_00000_Data", config.time_bin_size.ns / MS)); ret.push(path); } - if false { - info!("datapaths_for_timebin returns: {:?}", ret) - } Ok(ret) }