Find files for expanded query
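
A rough sketch of how the new `open_expanded_files` entry point might be consumed (a hypothetical call site mirroring the test below; the `range`, `channel_config` and `node` values are assumed to already exist):

    let rx = open_expanded_files(&range, &channel_config, node.clone());
    while let Ok(item) = rx.recv().await {
        match item {
            Ok(opened) => info!("expanded query opened file: {:?}", opened.path),
            Err(e) => {
                error!("error while opening expanded files: {:?}", e);
                break;
            }
        }
    }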
@@ -3,8 +3,8 @@ use bytes::BytesMut;
use err::Error;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::timeunits::DAY;
use netpod::{ChannelConfig, NanoRange, Nanos, Node};
use std::fmt;
use std::path::PathBuf;
use std::time::Instant;
use tokio::fs::{File, OpenOptions};
@@ -18,6 +18,18 @@ pub struct OpenedFile {
    pub nreads: u32,
}

impl fmt::Debug for OpenedFile {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("OpenedFile")
            .field("path", &self.path)
            .field("file", &self.file)
            .field("positioned", &self.positioned)
            .field("index", &self.index)
            .field("nreads", &self.nreads)
            .finish()
    }
}

pub fn open_files(
    range: &NanoRange,
    channel_config: &ChannelConfig,
@@ -64,6 +76,7 @@ async fn open_files_inner(
        }
    }
    timebins.sort_unstable();
    let timebins = timebins;
    for &tb in &timebins {
        let ts_bin = Nanos {
            ns: tb * channel_config.time_bin_size.ns,
@@ -150,7 +163,7 @@ async fn open_files_inner(
                        path,
                        positioned: false,
                        index: false,
                        nreads: 0,
                        nreads: res.2,
                    }
                }
            }
@@ -164,58 +177,321 @@ async fn open_files_inner(
    Ok(())
}

/**
Try to find and position the file with the youngest event before the requested range.
/*
Provide the stream of positioned data files which are relevant for the given parameters.
Expanded by one event before and after the requested range, if such events exist.
*/
async fn single_file_before_range(
    chtx: async_channel::Sender<Result<OpenedFile, Error>>,
    range: NanoRange,
    channel_config: ChannelConfig,
pub fn open_expanded_files(
    range: &NanoRange,
    channel_config: &ChannelConfig,
    node: Node,
) -> async_channel::Receiver<Result<OpenedFile, Error>> {
    let (chtx, chrx) = async_channel::bounded(2);
    let range = range.clone();
    let channel_config = channel_config.clone();
    tokio::spawn(async move {
        match open_expanded_files_inner(&chtx, &range, &channel_config, node).await {
            Ok(_) => {}
            Err(e) => match chtx.send(Err(e.into())).await {
                Ok(_) => {}
                Err(e) => {
                    error!("open_expanded_files channel send error {:?}", e);
                }
            },
        }
    });
    chrx
}

async fn open_expanded_files_inner(
    chtx: &async_channel::Sender<Result<OpenedFile, Error>>,
    range: &NanoRange,
    channel_config: &ChannelConfig,
    node: Node,
) -> Result<(), Error> {
    let channel_config = channel_config.clone();
    let mut timebins = vec![];
    {
        let rd = tokio::fs::read_dir(paths::channel_timebins_dir_path(&channel_config, &node)?).await?;
        let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd);
        while let Some(e) = rd.next().await {
            let e = e?;
            let dn = e
                .file_name()
                .into_string()
                .map_err(|e| Error::with_msg(format!("Bad OS path {:?}", e)))?;
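            // Time bin directories are expected to be named with exactly 19 decimal digits.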
            let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a });
            if vv == 19 {
                timebins.push(dn.parse::<u64>()?);
            }
        }
    }
    timebins.sort_unstable();
    let timebins = timebins;
    if timebins.len() == 0 {
        return Ok(());
    }
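    // Walk backwards from the last time bin that starts at or before the requested
    // range until a file containing an event before range.beg is found.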
    let mut p1 = None;
    for (i1, tb) in timebins.iter().enumerate().rev() {
        let ts_bin = Nanos {
            ns: tb * channel_config.time_bin_size.ns,
        };
        if ts_bin.ns <= range.beg {
            p1 = Some(i1);
            break;
        }
    }
    let mut p1 = if let Some(i1) = p1 { i1 } else { 0 };
    let mut found_first = false;
    loop {
        let tb = timebins[p1];
        let ts_bin = Nanos {
            ns: tb * channel_config.time_bin_size.ns,
        };
        let path = paths::datapath(tb, &channel_config, &node);
        let mut file = OpenOptions::new().read(true).open(&path).await?;
        let ret = {
            let index_path = paths::index_path(ts_bin, &channel_config, &node)?;
            match OpenOptions::new().read(true).open(&index_path).await {
                Ok(mut index_file) => {
                    let meta = index_file.metadata().await?;
                    if meta.len() > 1024 * 1024 * 20 {
                        return Err(Error::with_msg(format!(
                            "too large index file {} bytes for {}",
                            meta.len(),
                            channel_config.channel.name
                        )));
                    }
                    if meta.len() < 2 {
                        return Err(Error::with_msg(format!(
                            "bad meta len {} for {}",
                            meta.len(),
                            channel_config.channel.name
                        )));
                    }
                    if meta.len() % 16 != 2 {
                        return Err(Error::with_msg(format!(
                            "bad meta len {} for {}",
                            meta.len(),
                            channel_config.channel.name
                        )));
                    }
                    let mut buf = BytesMut::with_capacity(meta.len() as usize);
                    buf.resize(buf.capacity(), 0);
                    index_file.read_exact(&mut buf).await?;
                    match super::index::find_largest_smaller_than(range.beg, &buf[2..])? {
                        Some(o) => {
                            found_first = true;
                            file.seek(SeekFrom::Start(o.1)).await?;
                            OpenedFile {
                                file: Some(file),
                                path,
                                positioned: true,
                                index: true,
                                nreads: 0,
                            }
                        }
                        None => OpenedFile {
                            file: None,
                            path,
                            positioned: false,
                            index: true,
                            nreads: 0,
                        },
                    }
                }
                Err(e) => match e.kind() {
                    ErrorKind::NotFound => {
                        let ts1 = Instant::now();
                        let res =
                            super::index::position_static_len_datafile_at_largest_smaller_than(file, range.beg).await?;
                        let ts2 = Instant::now();
                        if false {
                            // TODO collect for stats:
                            let dur = ts2.duration_since(ts1);
                            info!("position_static_len_datafile took ms {}", dur.as_millis());
                        }
                        file = res.0;
                        if res.1 {
                            found_first = true;
                            OpenedFile {
                                file: Some(file),
                                path,
                                positioned: true,
                                index: false,
                                nreads: res.2,
                            }
                        } else {
                            OpenedFile {
                                file: None,
                                path,
                                positioned: false,
                                index: false,
                                nreads: res.2,
                            }
                        }
                    }
                    _ => Err(e)?,
                },
            }
        };
        if found_first {
            p1 += 1;
            chtx.send(Ok(ret)).await?;
            break;
        } else {
            if p1 == 0 {
                break;
            } else {
                p1 -= 1;
            }
        }
    }
    if found_first {
        // Append all following positioned files.
        loop {
            let tb = timebins[p1];
            let ts_bin = Nanos {
                ns: tb * channel_config.time_bin_size.ns,
            };
            let path = paths::datapath(tb, &channel_config, &node);
            let mut file = OpenOptions::new().read(true).open(&path).await?;
            let ret = {
                let index_path = paths::index_path(ts_bin, &channel_config, &node)?;
                match OpenOptions::new().read(true).open(&index_path).await {
                    Ok(mut index_file) => {
                        let meta = index_file.metadata().await?;
                        if meta.len() > 1024 * 1024 * 20 {
                            return Err(Error::with_msg(format!(
                                "too large index file {} bytes for {}",
                                meta.len(),
                                channel_config.channel.name
                            )));
                        }
                        if meta.len() < 2 {
                            return Err(Error::with_msg(format!(
                                "bad meta len {} for {}",
                                meta.len(),
                                channel_config.channel.name
                            )));
                        }
                        if meta.len() % 16 != 2 {
                            return Err(Error::with_msg(format!(
                                "bad meta len {} for {}",
                                meta.len(),
                                channel_config.channel.name
                            )));
                        }
                        let mut buf = BytesMut::with_capacity(meta.len() as usize);
                        buf.resize(buf.capacity(), 0);
                        index_file.read_exact(&mut buf).await?;
                        match super::index::find_ge(range.beg, &buf[2..])? {
                            Some(o) => {
                                file.seek(SeekFrom::Start(o.1)).await?;
                                OpenedFile {
                                    file: Some(file),
                                    path,
                                    positioned: true,
                                    index: true,
                                    nreads: 0,
                                }
                            }
                            None => OpenedFile {
                                file: None,
                                path,
                                positioned: false,
                                index: true,
                                nreads: 0,
                            },
                        }
                    }
                    Err(e) => match e.kind() {
                        ErrorKind::NotFound => {
                            let ts1 = Instant::now();
                            let res = super::index::position_static_len_datafile(file, range.beg).await?;
                            let ts2 = Instant::now();
                            if false {
                                // TODO collect for stats:
                                let dur = ts2.duration_since(ts1);
                                info!("position_static_len_datafile took ms {}", dur.as_millis());
                            }
                            file = res.0;
                            if res.1 {
                                OpenedFile {
                                    file: Some(file),
                                    path,
                                    positioned: true,
                                    index: false,
                                    nreads: res.2,
                                }
                            } else {
                                OpenedFile {
                                    file: None,
                                    path,
                                    positioned: false,
                                    index: false,
                                    nreads: res.2,
                                }
                            }
                        }
                        _ => Err(e)?,
                    },
                }
            };
            chtx.send(Ok(ret)).await?;
            p1 += 1;
            if p1 >= timebins.len() {
                break;
            }
        }
    } else {
        // Fall back to the non-expanded algorithm for locating files.
        open_files_inner(chtx, range, &channel_config, node).await?;
    }
    Ok(())
}

#[test]
fn single_file() {
    let range = netpod::NanoRange { beg: 0, end: 0 };
fn expanded_file_list() {
    use netpod::timeunits::*;
    let range = netpod::NanoRange {
        beg: DAY + HOUR * 5,
        end: DAY + HOUR * 8,
    };
    let chn = netpod::Channel {
        backend: "testbackend".into(),
        name: "scalar-i32-be".into(),
    };
    let cfg = ChannelConfig {
    // TODO read config from disk.
    let channel_config = ChannelConfig {
        channel: chn,
        keyspace: 2,
        time_bin_size: Nanos { ns: DAY },
        scalar_type: netpod::ScalarType::I32,
        compression: false,
        byte_order: netpod::ByteOrder::big_endian(),
        shape: netpod::Shape::Scalar,
        array: false,
        byte_order: netpod::ByteOrder::big_endian(),
        compression: false,
    };
    let cluster = taskrun::test_cluster();
    let task = async move {
        let (tx, rx) = async_channel::bounded(2);
        let jh = taskrun::spawn(single_file_before_range(tx, range, cfg, cluster.nodes[0].clone()));
        loop {
            match rx.recv().await {
                Ok(k) => match k {
                    Ok(k) => {
                        info!("opened file: {:?}", k.path);
                    }
                    Err(e) => {
                        error!("error while trying to open {:?}", e);
                        break;
                    }
                },
        let mut paths = vec![];
        let mut files = open_expanded_files(&range, &channel_config, cluster.nodes[0].clone());
        while let Some(file) = files.next().await {
            match file {
                Ok(k) => {
                    info!("opened file: {:?}", k);
                    paths.push(k.path.clone());
                }
                Err(e) => {
                    // channel closed.
                    info!("channel closed");
                    error!("error while trying to open {:?}", e);
                    break;
                }
            }
        }
        jh.await??;
        if paths.len() != 2 {
            return Err(Error::with_msg_no_trace("expected 2 files"));
        }
        Ok(())
    };
    taskrun::run(task).unwrap();

@@ -10,6 +10,12 @@ use tokio::io::AsyncWriteExt;
#[allow(unused_imports)]
use tracing::{debug, error, info, trace, warn};

//#[test]
pub fn gen_test_data_test() {
    std::env::set_current_dir("..").unwrap();
    taskrun::run(gen_test_data()).unwrap();
}

pub async fn gen_test_data() -> Result<(), Error> {
    let data_base_path = PathBuf::from("tmpdata");
    let ksprefix = String::from("ks");
@@ -26,13 +32,13 @@ pub async fn gen_test_data() -> Result<(), Error> {
            },
            keyspace: 2,
            time_bin_size: Nanos { ns: DAY },
            array: false,
            scalar_type: ScalarType::I32,
            shape: Shape::Scalar,
            byte_order: ByteOrder::big_endian(),
            shape: Shape::Scalar,
            array: false,
            compression: false,
        },
        time_spacing: MS * 100,
        time_spacing: MS * 500,
    };
    ensemble.channels.push(chn);
    let chn = ChannelGenProps {
@@ -49,7 +55,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
            byte_order: ByteOrder::big_endian(),
            compression: true,
        },
        time_spacing: MS * 1000,
        time_spacing: MS * 4000,
    };
    ensemble.channels.push(chn);
    let chn = ChannelGenProps {
@@ -66,7 +72,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
            byte_order: ByteOrder::little_endian(),
            compression: true,
        },
        time_spacing: MS * 100,
        time_spacing: MS * 500,
    };
    ensemble.channels.push(chn);
}
@@ -122,7 +128,7 @@ async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) ->
    let mut evix = 0;
    let mut ts = Nanos { ns: 0 };
    let mut pulse = 0;
    while ts.ns < DAY * 2 {
    while ts.ns < DAY * 3 {
        let res = gen_timebin(
            evix,
            ts,

@@ -25,6 +25,9 @@ pub fn find_ge(h: u64, buf: &[u8]) -> Result<Option<(u64, u64)>, Error> {
    let mut k = n1 - 1;
    let x = u64::from_be_bytes(a[j].0);
    let y = u64::from_be_bytes(a[k].0);
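    // Sanity check: the first index entry must be strictly smaller than the last,
    // otherwise the index data is unordered.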
    if x >= y {
        return Err(Error::with_msg(format!("search in unordered data")));
    }
    if x >= h {
        return Ok(Some((u64::from_be_bytes(a[j].0), u64::from_be_bytes(a[j].1))));
    }
@@ -46,6 +49,52 @@ pub fn find_ge(h: u64, buf: &[u8]) -> Result<Option<(u64, u64)>, Error> {
    }
}

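/// Return the index entry (timestamp, file offset) with the largest timestamp
/// that is still strictly smaller than `h`, or `None` if no such entry exists.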
pub fn find_largest_smaller_than(h: u64, buf: &[u8]) -> Result<Option<(u64, u64)>, Error> {
    type NUM = u64;
    const ELESIZE: usize = size_of::<NUM>();
    const N: usize = 2 * ELESIZE;
    let n1 = buf.len();
    if n1 % N != 0 {
        return Err(Error::with_msg(format!("find_largest_smaller_than bad len {}", n1)));
    }
    if n1 == 0 {
        warn!("Empty index data");
        return Ok(None);
    }
    let n1 = n1 / N;
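    // View the raw index bytes as packed pairs of big-endian u64 values:
    // (event timestamp, byte offset into the data file).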
    let a = unsafe {
        let ptr = &buf[0] as *const u8 as *const ([u8; ELESIZE], [u8; ELESIZE]);
        std::slice::from_raw_parts(ptr, n1)
    };
    let mut j = 0;
    let mut k = n1 - 1;
    let x = NUM::from_be_bytes(a[j].0);
    let y = NUM::from_be_bytes(a[k].0);
    if x >= y {
        return Err(Error::with_msg(format!("search in unordered data")));
    }
    if x >= h {
        return Ok(None);
    }
    if y < h {
        let ret = (NUM::from_be_bytes(a[k].0), NUM::from_be_bytes(a[k].1));
        return Ok(Some(ret));
    }
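    // Binary search, keeping the invariant: timestamp at j < h <= timestamp at k.
    // When j and k become adjacent, entry j is the largest one smaller than h.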
    loop {
        if k - j < 2 {
            let ret = (NUM::from_be_bytes(a[j].0), NUM::from_be_bytes(a[j].1));
            return Ok(Some(ret));
        }
        let m = (k + j) / 2;
        let x = NUM::from_be_bytes(a[m].0);
        if x < h {
            j = m;
        } else {
            k = m;
        }
    }
}

async fn read(buf: &mut [u8], file: &mut File) -> Result<usize, Error> {
    let mut wp = 0;
    loop {
@@ -170,3 +219,58 @@ pub async fn position_static_len_datafile(mut file: File, beg: u64) -> Result<(F
        }
    }
}

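/// Seek `file` to the last event whose timestamp is strictly smaller than `beg`.
/// Returns the repositioned file, whether such an event was found, and the number
/// of read operations performed during the search.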
pub async fn position_static_len_datafile_at_largest_smaller_than(
    mut file: File,
    beg: u64,
) -> Result<(File, bool, u32), Error> {
    let flen = file.seek(SeekFrom::End(0)).await?;
    file.seek(SeekFrom::Start(0)).await?;
    let mut buf = vec![0; 1024];
    let _n1 = read(&mut buf, &mut file).await?;
    let hres = parse_channel_header(&buf)?;
    let headoff = 2 + hres.0 as u64;
    let ev = parse_event(&buf[headoff as usize..])?;
    let evlen = ev.0 as u64;
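    // j and k are byte offsets of the first and last event records. Events have a
    // fixed length, so the file can be binary-searched by timestamp.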
    let mut j = headoff;
    let mut k = ((flen - headoff) / evlen - 1) * evlen + headoff;
    let x = ev.1.ns;
    let t = read_event_at(k, &mut file).await?;
    if t.0 != evlen as u32 {
        Err(Error::with_msg(format!(
            "inconsistent event lengths: {} vs {}",
            t.0, evlen
        )))?;
    }
    let y = t.1.ns;
    let mut nreads = 2;
    if x >= beg {
        file.seek(SeekFrom::Start(j)).await?;
        return Ok((file, false, nreads));
    }
    if y < beg {
        file.seek(SeekFrom::Start(k)).await?;
        return Ok((file, true, nreads));
    }
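    // Invariant: the event at offset j has ts < beg and the event at offset k has
    // ts >= beg; narrow until they are adjacent, then j points at the target event.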
    loop {
        if k - j < 2 * evlen {
            file.seek(SeekFrom::Start(j)).await?;
            return Ok((file, true, nreads));
        }
        let m = j + (k - j) / 2 / evlen * evlen;
        let t = read_event_at(m, &mut file).await?;
        if t.0 != evlen as u32 {
            Err(Error::with_msg(format!(
                "inconsistent event lengths: {} vs {}",
                t.0, evlen
            )))?;
        }
        nreads += 1;
        let x = t.1.ns;
        if x < beg {
            j = m;
        } else {
            k = m;
        }
    }
}