Refactor datafile path handling

Author: Dominik Werder
Date: 2021-04-27 17:33:42 +02:00
Parent: 0b40702b6c
Commit: 18c11b30fd

10 changed files with 240 additions and 120 deletions
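The core of the refactor: EventBlobsComplete::new no longer takes the whole AggQuerySingleChannel, but only the pieces it actually needs. A before/after sketch of a call site (names exactly as they appear in the hunks below):

    // before: the reader dug range and buffer size out of the query object
    let blobs = EventBlobsComplete::new(&query, query.channel_config.clone(), range, node);

    // after: explicit range, config, node and buffer size (now usize)
    let blobs = EventBlobsComplete::new(
        range.clone(),
        query.channel_config.clone(),
        node.clone(),
        query.buffer_size as usize,
    );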


@@ -10,6 +10,7 @@ serde_json = "1.0"
 http = "0.2"
 chrono = { version = "0.4.19", features = ["serde"] }
 tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
+tokio-stream = {version = "0.1.5", features = ["fs"]}
 hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] }
 async-channel = "1.6"
 bytes = "1.0.1"
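tokio-stream with the "fs" feature is pulled in for ReadDirStream, which open_files below uses to iterate a directory as a Stream. A minimal self-contained sketch of that pattern (directory path and function name invented for illustration):

    use futures_util::StreamExt;
    use tokio_stream::wrappers::ReadDirStream;

    async fn list_names(dir: &std::path::Path) -> std::io::Result<Vec<String>> {
        let rd = tokio::fs::read_dir(dir).await?;
        let mut rd = ReadDirStream::new(rd);
        let mut names = Vec::new();
        while let Some(entry) = rd.next().await {
            // each item is an io::Result<DirEntry>, just like std's read_dir
            names.push(entry?.file_name().into_string().unwrap_or_default());
        }
        Ok(names)
    }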


@@ -43,32 +43,37 @@ async fn agg_x_dim_0_inner() {
     let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
     let ts2 = ts1 + HOUR * 24;
     let range = NanoRange { beg: ts1, end: ts2 };
-    let fut1 = super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), range, node)
-        .into_dim_1_f32_stream()
-        //.take(1000)
-        .map(|q| {
-            if false {
-                if let Ok(ref k) = q {
-                    trace!("vals: {:?}", k);
-                }
-            }
-            q
-        })
-        .into_binned_x_bins_1()
-        .map(|k| {
-            if false {
-                trace!("after X binning {:?}", k.as_ref().unwrap());
-            }
-            k
-        })
-        .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
-        .map(|k| {
-            if false {
-                trace!("after T binning {:?}", k.as_ref().unwrap());
-            }
-            k
-        })
-        .for_each(|_k| ready(()));
+    let fut1 = super::eventblobs::EventBlobsComplete::new(
+        range.clone(),
+        query.channel_config.clone(),
+        node.clone(),
+        query.buffer_size as usize,
+    )
+    .into_dim_1_f32_stream()
+    //.take(1000)
+    .map(|q| {
+        if false {
+            if let Ok(ref k) = q {
+                trace!("vals: {:?}", k);
+            }
+        }
+        q
+    })
+    .into_binned_x_bins_1()
+    .map(|k| {
+        if false {
+            trace!("after X binning {:?}", k.as_ref().unwrap());
+        }
+        k
+    })
+    .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
+    .map(|k| {
+        if false {
+            trace!("after T binning {:?}", k.as_ref().unwrap());
+        }
+        k
+    })
+    .for_each(|_k| ready(()));
     fut1.await;
 }
@@ -109,28 +114,33 @@ async fn agg_x_dim_1_inner() {
     let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
     let ts2 = ts1 + HOUR * 24;
     let range = NanoRange { beg: ts1, end: ts2 };
-    let fut1 = super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), range, node)
-        .into_dim_1_f32_stream()
-        //.take(1000)
-        .map(|q| {
-            if false {
-                if let Ok(ref k) = q {
-                    info!("vals: {:?}", k);
-                }
-            }
-            q
-        })
-        .into_binned_x_bins_1()
-        .map(|k| {
-            //info!("after X binning {:?}", k.as_ref().unwrap());
-            k
-        })
-        .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
-        .map(|k| {
-            info!("after T binning {:?}", k.as_ref().unwrap());
-            k
-        })
-        .for_each(|_k| ready(()));
+    let fut1 = super::eventblobs::EventBlobsComplete::new(
+        range.clone(),
+        query.channel_config.clone(),
+        node.clone(),
+        query.buffer_size as usize,
+    )
+    .into_dim_1_f32_stream()
+    //.take(1000)
+    .map(|q| {
+        if false {
+            if let Ok(ref k) = q {
+                info!("vals: {:?}", k);
+            }
+        }
+        q
+    })
+    .into_binned_x_bins_1()
+    .map(|k| {
+        //info!("after X binning {:?}", k.as_ref().unwrap());
+        k
+    })
+    .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
+    .map(|k| {
+        info!("after T binning {:?}", k.as_ref().unwrap());
+        k
+    })
+    .for_each(|_k| ready(()));
     fut1.await;
 }
@@ -168,8 +178,13 @@ async fn merge_0_inner() {
         .map(|k| make_test_node(k))
         .map(|node| {
             let node = Arc::new(node);
-            super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), range.clone(), node)
-                .into_dim_1_f32_stream()
+            super::eventblobs::EventBlobsComplete::new(
+                range.clone(),
+                query.channel_config.clone(),
+                node.clone(),
+                query.buffer_size as usize,
+            )
+            .into_dim_1_f32_stream()
         })
         .collect();
     MergeDim1F32Stream::new(streams)


@@ -13,21 +13,16 @@ pub struct EventBlobsComplete {
     channel_config: ChannelConfig,
     file_chan: async_channel::Receiver<Result<File, Error>>,
     evs: Option<EventChunker>,
-    buffer_size: u32,
+    buffer_size: usize,
     range: NanoRange,
 }
 
 impl EventBlobsComplete {
-    pub fn new(
-        query: &netpod::AggQuerySingleChannel,
-        channel_config: ChannelConfig,
-        range: NanoRange,
-        node: Arc<Node>,
-    ) -> Self {
+    pub fn new(range: NanoRange, channel_config: ChannelConfig, node: Arc<Node>, buffer_size: usize) -> Self {
         Self {
-            file_chan: open_files(query, node),
+            file_chan: open_files(&range, &channel_config, node.clone()),
             evs: None,
-            buffer_size: query.buffer_size,
+            buffer_size,
             channel_config,
             range,
         }
@@ -75,7 +70,7 @@ pub fn event_blobs_complete(
     let query = query.clone();
     let node = node.clone();
     async_stream::stream! {
-        let filerx = open_files(&query, node.clone());
+        let filerx = open_files(err::todoval(), err::todoval(), node.clone());
         while let Ok(fileres) = filerx.recv().await {
             match fileres {
                 Ok(file) => {


@@ -76,7 +76,9 @@ where
         );
         let gg = self.buf.len() - self.wp;
         let mut buf2 = ReadBuf::new(&mut self.buf[self.wp..]);
-        assert!(gg > 0);
+        if gg < 1 || gg > 1024 * 1024 * 20 {
+            panic!("have gg {}", gg);
+        }
         assert!(buf2.remaining() == gg);
         assert!(buf2.capacity() == gg);
         assert!(buf2.filled().len() == 0);


@@ -28,7 +28,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
             time_bin_size: DAY,
             array: true,
             scalar_type: ScalarType::F64,
-            shape: Shape::Wave(17),
+            shape: Shape::Wave(21),
             big_endian: true,
             compression: true,
         },
@@ -85,7 +85,7 @@ async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) ->
         .map_err(|k| Error::with_msg(format!("can not generate config {:?}", k)))?;
     let mut evix = 0;
     let mut ts = 0;
-    while ts < DAY {
+    while ts < DAY * 2 {
         let res = gen_timebin(evix, ts, chn.time_spacing, &channel_path, &chn.config, node, ensemble).await?;
         evix = res.evix;
         ts = res.ts;
@@ -198,21 +198,34 @@ async fn gen_timebin(
.join(format!("{:019}", tb))
.join(format!("{:010}", node.split));
tokio::fs::create_dir_all(&path).await?;
let path = path.join(format!("{:019}_{:05}_Data", config.time_bin_size / MS, 0));
info!("open file {:?}", path);
let data_path = path.join(format!("{:019}_{:05}_Data", config.time_bin_size / MS, 0));
let index_path = path.join(format!("{:019}_{:05}_Data_Index", config.time_bin_size / MS, 0));
info!("open data file {:?}", data_path);
let mut file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.open(data_path)
.await?;
let mut index_file = if let Shape::Wave(_) = config.shape {
info!("open index file {:?}", index_path);
let f = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(index_path)
.await?;
Some(f)
} else {
None
};
gen_datafile_header(&mut file, config).await?;
let mut evix = evix;
let mut ts = ts;
let tsmax = (tb + 1) * config.time_bin_size;
while ts < tsmax {
if evix % ensemble.nodes.len() as u64 == node.split as u64 {
gen_event(&mut file, evix, ts, config).await?;
gen_event(&mut file, index_file.as_mut(), evix, ts, config).await?;
}
evix += 1;
ts += ts_spacing;
@@ -233,7 +246,13 @@ async fn gen_datafile_header(file: &mut File, config: &ChannelConfig) -> Result<
     Ok(())
 }
 
-async fn gen_event(file: &mut File, evix: u64, ts: u64, config: &ChannelConfig) -> Result<(), Error> {
+async fn gen_event(
+    file: &mut File,
+    _index_file: Option<&mut File>,
+    evix: u64,
+    ts: u64,
+    config: &ChannelConfig,
+) -> Result<(), Error> {
     let mut buf = BytesMut::with_capacity(1024 * 16);
     buf.put_i32(0xcafecafe as u32 as i32);
     buf.put_u64(0xcafecafe);
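A note on the unchanged magic-number lines: 0xcafecafe does not fit in i32, so a bare i32 literal would be rejected by the compiler; the double cast routes the same bit pattern through u32. Written out:

    // same 32 bits as 0xcafecafe; as an i32 value this is -889271554
    let magic: i32 = 0xcafecafeu32 as i32;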

disk/src/index.rs (new file, +26 lines)

@@ -0,0 +1,26 @@
use err::Error;
use netpod::log::*;
use netpod::{ChannelConfig, Nanos, Node};
use tokio::fs::OpenOptions;
use tokio::io::ErrorKind;

pub async fn find_start_pos_for_config(
    ts: Nanos,
    channel_config: &ChannelConfig,
    node: &Node,
) -> Result<Option<u64>, Error> {
    let index_path = super::paths::index_path(ts, channel_config, node)?;
    let ret = match OpenOptions::new().open(&index_path).await {
        Ok(_file) => {
            info!("opened index file");
            error!("??????????????? TODO search index for start");
            err::todoval::<u32>();
            None
        }
        Err(e) => match e.kind() {
            ErrorKind::NotFound => None,
            _ => Err(e)?,
        },
    };
    Ok(ret)
}
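find_start_pos_for_config is a stub so far: Some(offset) once index search is implemented, None when no index file exists. A hypothetical caller (not part of this commit) would use it roughly like:

    // hypothetical caller sketch: seek the data file before streaming events
    if let Some(pos) = find_start_pos_for_config(Nanos { ns: range.beg }, &channel_config, &node).await? {
        use tokio::io::AsyncSeekExt;
        file.seek(std::io::SeekFrom::Start(pos)).await?;
    }
    // None means no index file yet: read the time bin from the start.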


@@ -4,7 +4,7 @@ use err::Error;
 use futures_core::Stream;
 use futures_util::future::FusedFuture;
 use futures_util::{pin_mut, select, FutureExt, StreamExt};
-use netpod::{ChannelConfig, Node, Shape};
+use netpod::{ChannelConfig, NanoRange, Node, Shape};
 use std::future::Future;
 use std::path::PathBuf;
 use std::pin::Pin;
@@ -25,11 +25,13 @@ pub mod eventblobs;
 pub mod eventchunker;
 pub mod frame;
 pub mod gen;
+pub mod index;
 pub mod merge;
+pub mod paths;
 pub mod raw;
 
 pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> Result<netpod::BodyStream, Error> {
-    let path = datapath(query.timebin as u64, &query.channel_config, &node);
+    let path = paths::datapath(query.timebin as u64, &query.channel_config, &node);
     debug!("try path: {:?}", path);
     let fin = OpenOptions::new().read(true).open(path).await?;
     let meta = fin.metadata().await;
@@ -156,7 +158,7 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(
     {
         if !fopen_avail && file_prep.is_none() && i1 < 16 {
             info!("Prepare open task for next file {}", query.timebin + i1);
-            fopen.replace(Fopen1::new(datapath(query.timebin as u64 + i1 as u64, &query.channel_config, &node)));
+            fopen.replace(Fopen1::new(paths::datapath(query.timebin as u64 + i1 as u64, &query.channel_config, &node)));
             fopen_avail = true;
             i1 += 1;
         }
@@ -264,20 +266,23 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(
 }
 
 pub fn raw_concat_channel_read_stream_file_pipe(
-    query: &netpod::AggQuerySingleChannel,
+    range: &NanoRange,
+    channel_config: &ChannelConfig,
     node: Arc<Node>,
+    buffer_size: usize,
 ) -> impl Stream<Item = Result<BytesMut, Error>> + Send {
-    let query = query.clone();
+    let range = range.clone();
+    let channel_config = channel_config.clone();
     let node = node.clone();
     async_stream::stream! {
-        let chrx = open_files(&query, node.clone());
+        let chrx = open_files(&range, &channel_config, node.clone());
         while let Ok(file) = chrx.recv().await {
             let mut file = match file {
                 Ok(k) => k,
                 Err(_) => break
             };
             loop {
-                let mut buf = BytesMut::with_capacity(query.buffer_size as usize);
+                let mut buf = BytesMut::with_capacity(buffer_size);
                 use tokio::io::AsyncReadExt;
                 let n1 = file.read_buf(&mut buf).await?;
                 if n1 == 0 {
@@ -293,17 +298,36 @@ pub fn raw_concat_channel_read_stream_file_pipe(
 }
 
 fn open_files(
-    query: &netpod::AggQuerySingleChannel,
+    range: &NanoRange,
+    channel_config: &ChannelConfig,
     node: Arc<Node>,
 ) -> async_channel::Receiver<Result<tokio::fs::File, Error>> {
+    let channel_config = channel_config.clone();
     let (chtx, chrx) = async_channel::bounded(2);
-    let mut query = query.clone();
     let node = node.clone();
     tokio::spawn(async move {
-        let tb0 = query.timebin;
-        for i1 in 0..query.tb_file_count {
-            query.timebin = tb0 + i1;
-            let path = datapath(query.timebin as u64, &query.channel_config, &node);
+        // TODO reduce usage of `query` and see what we actually need.
+        // TODO scan the timebins on the filesystem for the potential files first instead of trying to open hundreds in worst case.
+        let mut timebins = vec![];
+        {
+            let rd = tokio::fs::read_dir(paths::channel_timebins_dir_path(&channel_config, &node)?).await?;
+            let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd);
+            while let Some(e) = rd.next().await {
+                let e = e?;
+                let dn = e
+                    .file_name()
+                    .into_string()
+                    .map_err(|e| Error::with_msg(format!("Bad OS path {:?}", e)))?;
+                let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a });
+                if vv == 19 {
+                    timebins.push(dn.parse::<u64>()?);
+                }
+            }
+        }
+        timebins.sort_unstable();
+        info!("TIMEBINS FOUND: {:?}", timebins);
+        for &tb in &timebins {
+            let path = paths::datapath(tb, &channel_config, &node);
             let fileres = OpenOptions::new().read(true).open(&path).await;
             info!("opened file {:?} {:?}", &path, &fileres);
             match fileres {
@@ -317,6 +341,7 @@ fn open_files(
             },
         }
     }
+        Ok::<_, Error>(())
     });
     chrx
 }
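The digit-count fold accepts exactly the zero-padded 19-character time-bin directory names that datapath produces (for example 0000000000000018744, an invented value) and skips everything else before parsing. A slightly stricter standalone version of the same test:

    fn is_timebin_dir(name: &str) -> bool {
        // datapath writes time bins as {:019}, i.e. exactly 19 decimal digits
        name.len() == 19 && name.chars().all(|c| c.is_ascii_digit())
    }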
@@ -348,7 +373,7 @@ pub fn parsed1(
     let query = query.clone();
     let node = node.clone();
     async_stream::stream! {
-        let filerx = open_files(&query, node.clone());
+        let filerx = open_files(err::todoval(), err::todoval(), node.clone());
         while let Ok(fileres) = filerx.recv().await {
             match fileres {
                 Ok(file) => {
@@ -486,7 +511,7 @@ pub fn raw_concat_channel_read_stream_timebin(
     let query = query.clone();
     let node = node.clone();
     async_stream::stream! {
-        let path = datapath(query.timebin as u64, &query.channel_config, &node);
+        let path = paths::datapath(query.timebin as u64, &query.channel_config, &node);
         debug!("try path: {:?}", path);
         let mut fin = OpenOptions::new().read(true).open(path).await?;
         let meta = fin.metadata().await?;
@@ -511,20 +536,6 @@ pub fn raw_concat_channel_read_stream_timebin(
     }
 }
 
-fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &Node) -> PathBuf {
-    //let pre = "/data/sf-databuffer/daq_swissfel";
-    node.data_base_path
-        .join(format!("{}_{}", node.ksprefix, config.keyspace))
-        .join("byTime")
-        .join(config.channel.name.clone())
-        .join(format!("{:019}", timebin))
-        .join(format!("{:010}", node.split))
-        .join(format!(
-            "{:019}_00000_Data",
-            config.time_bin_size / netpod::timeunits::MS
-        ))
-}
-
 /**
 Read all events from all timebins for the given channel and split.
 */

disk/src/paths.rs (new file, +46 lines)

@@ -0,0 +1,46 @@
use err::Error;
use netpod::timeunits::MS;
use netpod::{ChannelConfig, Nanos, Node};
use std::path::PathBuf;

pub fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &Node) -> PathBuf {
    //let pre = "/data/sf-databuffer/daq_swissfel";
    node.data_base_path
        .join(format!("{}_{}", node.ksprefix, config.keyspace))
        .join("byTime")
        .join(config.channel.name.clone())
        .join(format!("{:019}", timebin))
        .join(format!("{:010}", node.split))
        .join(format!(
            "{:019}_00000_Data",
            config.time_bin_size / netpod::timeunits::MS
        ))
}

pub fn channel_timebins_dir_path(channel_config: &ChannelConfig, node: &Node) -> Result<PathBuf, Error> {
    let ret = node
        .data_base_path
        .join(format!("{}_{}", node.ksprefix, channel_config.keyspace))
        .join("byTime")
        .join(&channel_config.channel.name);
    Ok(ret)
}

pub fn data_dir_path(ts: Nanos, channel_config: &ChannelConfig, node: &Node) -> Result<PathBuf, Error> {
    let ret = channel_timebins_dir_path(channel_config, node)?
        .join(format!("{:019}", ts.ns / channel_config.time_bin_size))
        .join(format!("{:010}", node.split));
    Ok(ret)
}

pub fn data_path(ts: Nanos, channel_config: &ChannelConfig, node: &Node) -> Result<PathBuf, Error> {
    let fname = format!("{:019}_{:05}_Data", channel_config.time_bin_size / MS, 0);
    let ret = data_dir_path(ts, channel_config, node)?.join(fname);
    Ok(ret)
}

pub fn index_path(ts: Nanos, channel_config: &ChannelConfig, node: &Node) -> Result<PathBuf, Error> {
    let fname = format!("{:019}_{:05}_Data_Index", channel_config.time_bin_size / MS, 0);
    let ret = data_dir_path(ts, channel_config, node)?.join(fname);
    Ok(ret)
}
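Taken together these helpers pin down the on-disk layout. With invented example values (ksprefix "daq", keyspace 2, channel "CHAN1", time_bin_size = DAY, split 0, and a ts falling in time bin 18744), data_path and index_path would resolve to roughly:

    {data_base_path}/daq_2/byTime/CHAN1/0000000000000018744/0000000000/0000000000086400000_00000_Data
    {data_base_path}/daq_2/byTime/CHAN1/0000000000000018744/0000000000/0000000000086400000_00000_Data_Index

since the time-bin directory is ts.ns / time_bin_size zero-padded to 19 digits, and the file name encodes time_bin_size / MS (86400000 for one day).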


@@ -8,7 +8,7 @@ use err::Error;
 use futures_util::StreamExt;
 #[allow(unused_imports)]
 use netpod::log::*;
-use netpod::timeunits::{DAY, SEC};
+use netpod::timeunits::SEC;
 use netpod::{NodeConfig, ScalarType, Shape};
 use std::net::SocketAddr;
 use std::sync::Arc;
@@ -128,11 +128,7 @@ async fn raw_conn_handler_inner_try(
         Ok(_) => (),
         Err(e) => return Err((e, netout))?,
     }
-    debug!(
-        "\n\nREQUEST FOR RANGE {} {}\n\n",
-        evq.range.beg / SEC,
-        evq.range.end / SEC
-    );
+    debug!("REQUEST FOR RANGE {:?}", evq.range);
     error!(
         "TODO decide on response content based on the parsed json query\n{:?}",
         evq
@@ -142,7 +138,10 @@ async fn raw_conn_handler_inner_try(
         Ok(k) => k,
         Err(e) => return Err((e, netout))?,
     };
-    let entry = extract_matching_config_entry(range, &channel_config)?;
+    let entry = match extract_matching_config_entry(range, &channel_config) {
+        Ok(k) => k,
+        Err(e) => return Err((e, netout))?,
+    };
     info!("found config entry {:?}", entry);
     let shape = match &entry.shape {
@@ -150,25 +149,14 @@ async fn raw_conn_handler_inner_try(
             if lens.len() == 1 {
                 Shape::Wave(lens[0])
             } else {
-                return Err(Error::with_msg(format!("Channel config unsupported shape {:?}", entry)))?;
+                return Err((
+                    Error::with_msg(format!("Channel config unsupported shape {:?}", entry)),
+                    netout,
+                ))?;
             }
         }
         None => Shape::Scalar,
     };
-    /*
-    TODO
-    This endpoint should deliver events over some time range, across files.
-    Therefore, based on the query and the found channel config, list the available files in the
-    candidate directories, and iterate over events from those files.
-    !!! use index if available
-    • Generate index file for my test data.
-    • Use index file if available.
-    • If not, must use binary search if possible in that type.
-    Create a new type in place of AggQuerySingleChannel?
-    */
-    err::todoval();
     let query = netpod::AggQuerySingleChannel {
         channel_config: netpod::ChannelConfig {
             channel: evq.channel.clone(),
@@ -186,11 +174,12 @@ async fn raw_conn_handler_inner_try(
         // TODO use the requested buffer size
         buffer_size: 1024 * 4,
     };
+    let buffer_size = 1024 * 4;
     let mut s1 = EventBlobsComplete::new(
-        &query,
+        range.clone(),
         query.channel_config.clone(),
-        evq.range.clone(),
         node_config.node.clone(),
+        buffer_size,
     )
     .into_dim_1_f32_stream()
     .take(10)


@@ -152,11 +152,27 @@ pub enum TimeRange {
 }
 
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct Nanos {
+    pub ns: u64,
+}
+
 #[derive(Clone, Serialize, Deserialize)]
 pub struct NanoRange {
     pub beg: u64,
     pub end: u64,
 }
 
+impl std::fmt::Debug for NanoRange {
+    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(
+            fmt,
+            "NanoRange {{ beg: {} s, end: {} s }}",
+            self.beg / SEC,
+            self.end / SEC
+        )
+    }
+}
+
 impl NanoRange {
     pub fn delta(&self) -> u64 {
         self.end - self.beg
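With the hand-written Debug impl, ranges log in whole seconds instead of raw nanoseconds; an illustrative check (values invented):

    let r = NanoRange { beg: 3 * SEC, end: 90 * SEC };
    assert_eq!(format!("{:?}", r), "NanoRange { beg: 3 s, end: 90 s }");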