From 6e7a279a111cca9047054ddbf18fe79d110e5514 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 1 Sep 2021 14:10:35 +0200 Subject: [PATCH] Find files for expanded query --- disk/src/dataopen.rs | 334 +++++++++++++++++++++++++++++++++++++++---- disk/src/gen.rs | 18 ++- disk/src/index.rs | 104 ++++++++++++++ 3 files changed, 421 insertions(+), 35 deletions(-) diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index d93d0d3..5256a81 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -3,8 +3,8 @@ use bytes::BytesMut; use err::Error; use futures_util::StreamExt; use netpod::log::*; -use netpod::timeunits::DAY; use netpod::{ChannelConfig, NanoRange, Nanos, Node}; +use std::fmt; use std::path::PathBuf; use std::time::Instant; use tokio::fs::{File, OpenOptions}; @@ -18,6 +18,18 @@ pub struct OpenedFile { pub nreads: u32, } +impl fmt::Debug for OpenedFile { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("OpenedFile") + .field("path", &self.path) + .field("file", &self.file) + .field("positioned", &self.positioned) + .field("index", &self.index) + .field("nreads", &self.nreads) + .finish() + } +} + pub fn open_files( range: &NanoRange, channel_config: &ChannelConfig, @@ -64,6 +76,7 @@ async fn open_files_inner( } } timebins.sort_unstable(); + let timebins = timebins; for &tb in &timebins { let ts_bin = Nanos { ns: tb * channel_config.time_bin_size.ns, @@ -150,7 +163,7 @@ async fn open_files_inner( path, positioned: false, index: false, - nreads: 0, + nreads: res.2, } } } @@ -164,58 +177,321 @@ async fn open_files_inner( Ok(()) } -/** -Try to find and position the file with the youngest event before the requested range. +/* +Provide the stream of positioned data files which are relevant for the given parameters. +Expanded to one event before and after the requested range, if exists. */ -async fn single_file_before_range( - chtx: async_channel::Sender>, - range: NanoRange, - channel_config: ChannelConfig, +pub fn open_expanded_files( + range: &NanoRange, + channel_config: &ChannelConfig, + node: Node, +) -> async_channel::Receiver> { + let (chtx, chrx) = async_channel::bounded(2); + let range = range.clone(); + let channel_config = channel_config.clone(); + tokio::spawn(async move { + match open_expanded_files_inner(&chtx, &range, &channel_config, node).await { + Ok(_) => {} + Err(e) => match chtx.send(Err(e.into())).await { + Ok(_) => {} + Err(e) => { + error!("open_files channel send error {:?}", e); + } + }, + } + }); + chrx +} + +async fn open_expanded_files_inner( + chtx: &async_channel::Sender>, + range: &NanoRange, + channel_config: &ChannelConfig, node: Node, ) -> Result<(), Error> { + let channel_config = channel_config.clone(); + let mut timebins = vec![]; + { + let rd = tokio::fs::read_dir(paths::channel_timebins_dir_path(&channel_config, &node)?).await?; + let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd); + while let Some(e) = rd.next().await { + let e = e?; + let dn = e + .file_name() + .into_string() + .map_err(|e| Error::with_msg(format!("Bad OS path {:?}", e)))?; + let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a }); + if vv == 19 { + timebins.push(dn.parse::()?); + } + } + } + timebins.sort_unstable(); + let timebins = timebins; + if timebins.len() == 0 { + return Ok(()); + } + let mut p1 = None; + for (i1, tb) in timebins.iter().enumerate().rev() { + let ts_bin = Nanos { + ns: tb * channel_config.time_bin_size.ns, + }; + if ts_bin.ns <= range.beg { + p1 = Some(i1); + break; + } + } + let mut p1 = if let Some(i1) = p1 { i1 } else { 0 }; + let mut found_first = false; + loop { + let tb = timebins[p1]; + let ts_bin = Nanos { + ns: tb * channel_config.time_bin_size.ns, + }; + let path = paths::datapath(tb, &channel_config, &node); + let mut file = OpenOptions::new().read(true).open(&path).await?; + let ret = { + let index_path = paths::index_path(ts_bin, &channel_config, &node)?; + match OpenOptions::new().read(true).open(&index_path).await { + Ok(mut index_file) => { + let meta = index_file.metadata().await?; + if meta.len() > 1024 * 1024 * 20 { + return Err(Error::with_msg(format!( + "too large index file {} bytes for {}", + meta.len(), + channel_config.channel.name + ))); + } + if meta.len() < 2 { + return Err(Error::with_msg(format!( + "bad meta len {} for {}", + meta.len(), + channel_config.channel.name + ))); + } + if meta.len() % 16 != 2 { + return Err(Error::with_msg(format!( + "bad meta len {} for {}", + meta.len(), + channel_config.channel.name + ))); + } + let mut buf = BytesMut::with_capacity(meta.len() as usize); + buf.resize(buf.capacity(), 0); + index_file.read_exact(&mut buf).await?; + match super::index::find_largest_smaller_than(range.beg, &buf[2..])? { + Some(o) => { + found_first = true; + file.seek(SeekFrom::Start(o.1)).await?; + OpenedFile { + file: Some(file), + path, + positioned: true, + index: true, + nreads: 0, + } + } + None => OpenedFile { + file: None, + path, + positioned: false, + index: true, + nreads: 0, + }, + } + } + Err(e) => match e.kind() { + ErrorKind::NotFound => { + let ts1 = Instant::now(); + let res = + super::index::position_static_len_datafile_at_largest_smaller_than(file, range.beg).await?; + let ts2 = Instant::now(); + if false { + // TODO collect for stats: + let dur = ts2.duration_since(ts1); + info!("position_static_len_datafile took ms {}", dur.as_millis()); + } + file = res.0; + if res.1 { + found_first = true; + OpenedFile { + file: Some(file), + path, + positioned: true, + index: false, + nreads: res.2, + } + } else { + OpenedFile { + file: None, + path, + positioned: false, + index: false, + nreads: res.2, + } + } + } + _ => Err(e)?, + }, + } + }; + if found_first { + p1 += 1; + chtx.send(Ok(ret)).await?; + break; + } else { + if p1 == 0 { + break; + } else { + p1 -= 1; + } + } + } + if found_first { + // Append all following positioned files. + loop { + let tb = timebins[p1]; + let ts_bin = Nanos { + ns: tb * channel_config.time_bin_size.ns, + }; + let path = paths::datapath(tb, &channel_config, &node); + let mut file = OpenOptions::new().read(true).open(&path).await?; + let ret = { + let index_path = paths::index_path(ts_bin, &channel_config, &node)?; + match OpenOptions::new().read(true).open(&index_path).await { + Ok(mut index_file) => { + let meta = index_file.metadata().await?; + if meta.len() > 1024 * 1024 * 20 { + return Err(Error::with_msg(format!( + "too large index file {} bytes for {}", + meta.len(), + channel_config.channel.name + ))); + } + if meta.len() < 2 { + return Err(Error::with_msg(format!( + "bad meta len {} for {}", + meta.len(), + channel_config.channel.name + ))); + } + if meta.len() % 16 != 2 { + return Err(Error::with_msg(format!( + "bad meta len {} for {}", + meta.len(), + channel_config.channel.name + ))); + } + let mut buf = BytesMut::with_capacity(meta.len() as usize); + buf.resize(buf.capacity(), 0); + index_file.read_exact(&mut buf).await?; + match super::index::find_ge(range.beg, &buf[2..])? { + Some(o) => { + file.seek(SeekFrom::Start(o.1)).await?; + OpenedFile { + file: Some(file), + path, + positioned: true, + index: true, + nreads: 0, + } + } + None => OpenedFile { + file: None, + path, + positioned: false, + index: true, + nreads: 0, + }, + } + } + Err(e) => match e.kind() { + ErrorKind::NotFound => { + let ts1 = Instant::now(); + let res = super::index::position_static_len_datafile(file, range.beg).await?; + let ts2 = Instant::now(); + if false { + // TODO collect for stats: + let dur = ts2.duration_since(ts1); + info!("position_static_len_datafile took ms {}", dur.as_millis()); + } + file = res.0; + if res.1 { + OpenedFile { + file: Some(file), + path, + positioned: true, + index: false, + nreads: res.2, + } + } else { + OpenedFile { + file: None, + path, + positioned: false, + index: false, + nreads: res.2, + } + } + } + _ => Err(e)?, + }, + } + }; + chtx.send(Ok(ret)).await?; + p1 += 1; + if p1 >= timebins.len() { + break; + } + } + } else { + // Try to locate files according to non-expand-algorithm. + open_files_inner(chtx, range, &channel_config, node).await?; + } Ok(()) } #[test] -fn single_file() { - let range = netpod::NanoRange { beg: 0, end: 0 }; +fn expanded_file_list() { + use netpod::timeunits::*; + let range = netpod::NanoRange { + beg: DAY + HOUR * 5, + end: DAY + HOUR * 8, + }; let chn = netpod::Channel { backend: "testbackend".into(), name: "scalar-i32-be".into(), }; - let cfg = ChannelConfig { + // TODO read config from disk. + let channel_config = ChannelConfig { channel: chn, keyspace: 2, time_bin_size: Nanos { ns: DAY }, scalar_type: netpod::ScalarType::I32, - compression: false, + byte_order: netpod::ByteOrder::big_endian(), shape: netpod::Shape::Scalar, array: false, - byte_order: netpod::ByteOrder::big_endian(), + compression: false, }; let cluster = taskrun::test_cluster(); let task = async move { - let (tx, rx) = async_channel::bounded(2); - let jh = taskrun::spawn(single_file_before_range(tx, range, cfg, cluster.nodes[0].clone())); - loop { - match rx.recv().await { - Ok(k) => match k { - Ok(k) => { - info!("opened file: {:?}", k.path); - } - Err(e) => { - error!("error while trying to open {:?}", e); - break; - } - }, + let mut paths = vec![]; + let mut files = open_expanded_files(&range, &channel_config, cluster.nodes[0].clone()); + while let Some(file) = files.next().await { + match file { + Ok(k) => { + info!("opened file: {:?}", k); + paths.push(k.path.clone()); + } Err(e) => { - // channel closed. - info!("channel closed"); + error!("error while trying to open {:?}", e); break; } } } - jh.await??; + if paths.len() != 2 { + return Err(Error::with_msg_no_trace("expected 2 files")); + } Ok(()) }; taskrun::run(task).unwrap(); diff --git a/disk/src/gen.rs b/disk/src/gen.rs index e4ab172..adeca43 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -10,6 +10,12 @@ use tokio::io::AsyncWriteExt; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; +//#[test] +pub fn gen_test_data_test() { + std::env::set_current_dir("..").unwrap(); + taskrun::run(gen_test_data()).unwrap(); +} + pub async fn gen_test_data() -> Result<(), Error> { let data_base_path = PathBuf::from("tmpdata"); let ksprefix = String::from("ks"); @@ -26,13 +32,13 @@ pub async fn gen_test_data() -> Result<(), Error> { }, keyspace: 2, time_bin_size: Nanos { ns: DAY }, - array: false, scalar_type: ScalarType::I32, - shape: Shape::Scalar, byte_order: ByteOrder::big_endian(), + shape: Shape::Scalar, + array: false, compression: false, }, - time_spacing: MS * 100, + time_spacing: MS * 500, }; ensemble.channels.push(chn); let chn = ChannelGenProps { @@ -49,7 +55,7 @@ pub async fn gen_test_data() -> Result<(), Error> { byte_order: ByteOrder::big_endian(), compression: true, }, - time_spacing: MS * 1000, + time_spacing: MS * 4000, }; ensemble.channels.push(chn); let chn = ChannelGenProps { @@ -66,7 +72,7 @@ pub async fn gen_test_data() -> Result<(), Error> { byte_order: ByteOrder::little_endian(), compression: true, }, - time_spacing: MS * 100, + time_spacing: MS * 500, }; ensemble.channels.push(chn); } @@ -122,7 +128,7 @@ async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) -> let mut evix = 0; let mut ts = Nanos { ns: 0 }; let mut pulse = 0; - while ts.ns < DAY * 2 { + while ts.ns < DAY * 3 { let res = gen_timebin( evix, ts, diff --git a/disk/src/index.rs b/disk/src/index.rs index ddfec6d..2665ca1 100644 --- a/disk/src/index.rs +++ b/disk/src/index.rs @@ -25,6 +25,9 @@ pub fn find_ge(h: u64, buf: &[u8]) -> Result, Error> { let mut k = n1 - 1; let x = u64::from_be_bytes(a[j].0); let y = u64::from_be_bytes(a[k].0); + if x >= y { + return Err(Error::with_msg(format!("search in unordered data"))); + } if x >= h { return Ok(Some((u64::from_be_bytes(a[j].0), u64::from_be_bytes(a[j].1)))); } @@ -46,6 +49,52 @@ pub fn find_ge(h: u64, buf: &[u8]) -> Result, Error> { } } +pub fn find_largest_smaller_than(h: u64, buf: &[u8]) -> Result, Error> { + type NUM = u64; + const ELESIZE: usize = size_of::(); + const N: usize = 2 * ELESIZE; + let n1 = buf.len(); + if n1 % N != 0 { + return Err(Error::with_msg(format!("find_ge bad len {}", n1))); + } + if n1 == 0 { + warn!("Empty index data"); + return Ok(None); + } + let n1 = n1 / N; + let a = unsafe { + let ptr = &buf[0] as *const u8 as *const ([u8; ELESIZE], [u8; ELESIZE]); + std::slice::from_raw_parts(ptr, n1) + }; + let mut j = 0; + let mut k = n1 - 1; + let x = NUM::from_be_bytes(a[j].0); + let y = NUM::from_be_bytes(a[k].0); + if x >= y { + return Err(Error::with_msg(format!("search in unordered data"))); + } + if x >= h { + return Ok(None); + } + if y < h { + let ret = (NUM::from_be_bytes(a[k].0), NUM::from_be_bytes(a[k].1)); + return Ok(Some(ret)); + } + loop { + if k - j < 2 { + let ret = (NUM::from_be_bytes(a[j].0), NUM::from_be_bytes(a[j].1)); + return Ok(Some(ret)); + } + let m = (k + j) / 2; + let x = NUM::from_be_bytes(a[m].0); + if x < h { + j = m; + } else { + k = m; + } + } +} + async fn read(buf: &mut [u8], file: &mut File) -> Result { let mut wp = 0; loop { @@ -170,3 +219,58 @@ pub async fn position_static_len_datafile(mut file: File, beg: u64) -> Result<(F } } } + +pub async fn position_static_len_datafile_at_largest_smaller_than( + mut file: File, + beg: u64, +) -> Result<(File, bool, u32), Error> { + let flen = file.seek(SeekFrom::End(0)).await?; + file.seek(SeekFrom::Start(0)).await?; + let mut buf = vec![0; 1024]; + let _n1 = read(&mut buf, &mut file).await?; + let hres = parse_channel_header(&buf)?; + let headoff = 2 + hres.0 as u64; + let ev = parse_event(&buf[headoff as usize..])?; + let evlen = ev.0 as u64; + let mut j = headoff; + let mut k = ((flen - headoff) / evlen - 1) * evlen + headoff; + let x = ev.1.ns; + let t = read_event_at(k, &mut file).await?; + if t.0 != evlen as u32 { + Err(Error::with_msg(format!( + "inconsistent event lengths: {} vs {}", + t.0, evlen + )))?; + } + let y = t.1.ns; + let mut nreads = 2; + if x >= beg { + file.seek(SeekFrom::Start(j)).await?; + return Ok((file, false, nreads)); + } + if y < beg { + file.seek(SeekFrom::Start(k)).await?; + return Ok((file, true, nreads)); + } + loop { + if k - j < 2 * evlen { + file.seek(SeekFrom::Start(j)).await?; + return Ok((file, true, nreads)); + } + let m = j + (k - j) / 2 / evlen * evlen; + let t = read_event_at(m, &mut file).await?; + if t.0 != evlen as u32 { + Err(Error::with_msg(format!( + "inconsistent event lengths: {} vs {}", + t.0, evlen + )))?; + } + nreads += 1; + let x = t.1.ns; + if x < beg { + j = m; + } else { + k = m; + } + } +}