From 18c11b30fd8b4c730668cefab1cfacf6d0755246 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Tue, 27 Apr 2021 17:33:42 +0200
Subject: [PATCH] Refactor datafile path handling

---
 disk/Cargo.toml         |   1 +
 disk/src/aggtest.rs     | 111 +++++++++++++++++++++++-----------------
 disk/src/eventblobs.rs  |  15 ++----
 disk/src/frame/inmem.rs |   4 +-
 disk/src/gen.rs         |  33 +++++++++---
 disk/src/index.rs       |  26 ++++++++++
 disk/src/lib.rs         |  71 ++++++++++++++-----------
 disk/src/paths.rs       |  46 +++++++++++++++++
 disk/src/raw/conn.rs    |  37 +++++---------
 netpod/src/lib.rs       |  16 ++++++
 10 files changed, 240 insertions(+), 120 deletions(-)
 create mode 100644 disk/src/index.rs
 create mode 100644 disk/src/paths.rs

diff --git a/disk/Cargo.toml b/disk/Cargo.toml
index 14967aa..ceea577 100644
--- a/disk/Cargo.toml
+++ b/disk/Cargo.toml
@@ -10,6 +10,7 @@ serde_json = "1.0"
 http = "0.2"
 chrono = { version = "0.4.19", features = ["serde"] }
 tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
+tokio-stream = {version = "0.1.5", features = ["fs"]}
 hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] }
 async-channel = "1.6"
 bytes = "1.0.1"
diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs
index 5d5cc65..f7b54a5 100644
--- a/disk/src/aggtest.rs
+++ b/disk/src/aggtest.rs
@@ -43,32 +43,37 @@ async fn agg_x_dim_0_inner() {
     let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
     let ts2 = ts1 + HOUR * 24;
     let range = NanoRange { beg: ts1, end: ts2 };
-    let fut1 = super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), range, node)
-        .into_dim_1_f32_stream()
-        //.take(1000)
-        .map(|q| {
-            if false {
-                if let Ok(ref k) = q {
-                    trace!("vals: {:?}", k);
-                }
+    let fut1 = super::eventblobs::EventBlobsComplete::new(
+        range.clone(),
+        query.channel_config.clone(),
+        node.clone(),
+        query.buffer_size as usize,
+    )
+    .into_dim_1_f32_stream()
+    //.take(1000)
+    .map(|q| {
+        if false {
+            if let Ok(ref k) = q {
+                trace!("vals: {:?}", k);
             }
-            q
-        })
-        .into_binned_x_bins_1()
-        .map(|k| {
-            if false {
-                trace!("after X binning {:?}", k.as_ref().unwrap());
-            }
-            k
-        })
-        .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
-        .map(|k| {
-            if false {
-                trace!("after T binning {:?}", k.as_ref().unwrap());
-            }
-            k
-        })
-        .for_each(|_k| ready(()));
+        }
+        q
+    })
+    .into_binned_x_bins_1()
+    .map(|k| {
+        if false {
+            trace!("after X binning {:?}", k.as_ref().unwrap());
+        }
+        k
+    })
+    .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
+    .map(|k| {
+        if false {
+            trace!("after T binning {:?}", k.as_ref().unwrap());
+        }
+        k
+    })
+    .for_each(|_k| ready(()));
     fut1.await;
 }
@@ -109,28 +114,33 @@ async fn agg_x_dim_1_inner() {
     let ts1 = query.timebin as u64 * query.channel_config.time_bin_size;
     let ts2 = ts1 + HOUR * 24;
     let range = NanoRange { beg: ts1, end: ts2 };
-    let fut1 = super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), range, node)
-        .into_dim_1_f32_stream()
-        //.take(1000)
-        .map(|q| {
-            if false {
-                if let Ok(ref k) = q {
-                    info!("vals: {:?}", k);
-                }
+    let fut1 = super::eventblobs::EventBlobsComplete::new(
+        range.clone(),
+        query.channel_config.clone(),
+        node.clone(),
+        query.buffer_size as usize,
+    )
+    .into_dim_1_f32_stream()
+    //.take(1000)
+    .map(|q| {
+        if false {
+            if let Ok(ref k) = q {
+                info!("vals: {:?}", k);
             }
-            q
-        })
-        .into_binned_x_bins_1()
-        .map(|k| {
-            //info!("after X binning {:?}", k.as_ref().unwrap());
-            k
-        })
-        .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
-        .map(|k| {
-            info!("after T binning {:?}", k.as_ref().unwrap());
-            k
-        })
-        .for_each(|_k| ready(()));
+        }
+        q
+    })
+    .into_binned_x_bins_1()
+    .map(|k| {
+        //info!("after X binning {:?}", k.as_ref().unwrap());
+        k
+    })
+    .into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
+    .map(|k| {
+        info!("after T binning {:?}", k.as_ref().unwrap());
+        k
+    })
+    .for_each(|_k| ready(()));
     fut1.await;
 }
@@ -168,8 +178,13 @@ async fn merge_0_inner() {
         .map(|k| make_test_node(k))
         .map(|node| {
             let node = Arc::new(node);
-            super::eventblobs::EventBlobsComplete::new(&query, query.channel_config.clone(), range.clone(), node)
-                .into_dim_1_f32_stream()
+            super::eventblobs::EventBlobsComplete::new(
+                range.clone(),
+                query.channel_config.clone(),
+                node.clone(),
+                query.buffer_size as usize,
+            )
+            .into_dim_1_f32_stream()
         })
         .collect();
     MergeDim1F32Stream::new(streams)
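Note on the test code above: both aggregation tests derive the queried nanosecond range from a time-bin index as beg = timebin * time_bin_size and end = beg + 24 h. A minimal self-contained sketch of that arithmetic; the constant values mirror netpod::timeunits, while the function name and the bin index are illustrative only:

    // Nanosecond time units, mirroring what netpod::timeunits provides.
    const MS: u64 = 1_000_000;
    const SEC: u64 = 1000 * MS;
    const HOUR: u64 = 3600 * SEC;
    const DAY: u64 = 24 * HOUR;

    // Map a time-bin index to the absolute nanosecond range queried by the tests.
    fn bin_to_range(timebin: u64, time_bin_size: u64) -> (u64, u64) {
        let beg = timebin * time_bin_size;
        let end = beg + HOUR * 24;
        (beg, end)
    }

    fn main() {
        // With daily bins, bin 18744 covers exactly one day.
        let (beg, end) = bin_to_range(18744, DAY);
        assert_eq!(end - beg, DAY);
        println!("query range {} .. {} ns", beg, end);
    }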
diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs
index 8300269..c144a98 100644
--- a/disk/src/eventblobs.rs
+++ b/disk/src/eventblobs.rs
@@ -13,21 +13,16 @@ pub struct EventBlobsComplete {
     channel_config: ChannelConfig,
     file_chan: async_channel::Receiver<Result<File, Error>>,
     evs: Option<EventChunker>,
-    buffer_size: u32,
+    buffer_size: usize,
     range: NanoRange,
 }
 
 impl EventBlobsComplete {
-    pub fn new(
-        query: &netpod::AggQuerySingleChannel,
-        channel_config: ChannelConfig,
-        range: NanoRange,
-        node: Arc<Node>,
-    ) -> Self {
+    pub fn new(range: NanoRange, channel_config: ChannelConfig, node: Arc<Node>, buffer_size: usize) -> Self {
         Self {
-            file_chan: open_files(query, node),
+            file_chan: open_files(&range, &channel_config, node.clone()),
             evs: None,
-            buffer_size: query.buffer_size,
+            buffer_size,
             channel_config,
             range,
         }
@@ -75,7 +70,7 @@ pub fn event_blobs_complete(
     let query = query.clone();
     let node = node.clone();
     async_stream::stream! {
-        let filerx = open_files(&query, node.clone());
+        let filerx = open_files(err::todoval(), err::todoval(), node.clone());
         while let Ok(fileres) = filerx.recv().await {
             match fileres {
                 Ok(file) => {
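The constructor change above keeps EventBlobsComplete pulling files from a bounded async channel, so file opening runs ahead of event parsing by at most the channel capacity. The producer/consumer shape in isolation — the function name and use of std::io::Error are stand-ins, not this crate's API:

    use std::path::PathBuf;
    use tokio::fs::{File, OpenOptions};

    // Open files in a background task; capacity 2 bounds the read-ahead,
    // matching the bounded(2) channel in open_files.
    fn open_files_sketch(paths: Vec<PathBuf>) -> async_channel::Receiver<Result<File, std::io::Error>> {
        let (tx, rx) = async_channel::bounded(2);
        tokio::spawn(async move {
            for path in paths {
                let res = OpenOptions::new().read(true).open(&path).await;
                // A send error means the receiver was dropped: stop opening files.
                if tx.send(res).await.is_err() {
                    break;
                }
            }
        });
        rx
    }

The bounded capacity means at most two opened-but-unconsumed files are held at any time, capping file-descriptor usage while still hiding open latency.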
diff --git a/disk/src/frame/inmem.rs b/disk/src/frame/inmem.rs
index 7d3c598..f70f941 100644
--- a/disk/src/frame/inmem.rs
+++ b/disk/src/frame/inmem.rs
@@ -76,7 +76,9 @@ where
         );
         let gg = self.buf.len() - self.wp;
         let mut buf2 = ReadBuf::new(&mut self.buf[self.wp..]);
-        assert!(gg > 0);
+        if gg < 1 || gg > 1024 * 1024 * 20 {
+            panic!("have gg {}", gg);
+        }
         assert!(buf2.remaining() == gg);
         assert!(buf2.capacity() == gg);
         assert!(buf2.filled().len() == 0);
diff --git a/disk/src/gen.rs b/disk/src/gen.rs
index 3af9bd3..56335d8 100644
--- a/disk/src/gen.rs
+++ b/disk/src/gen.rs
@@ -28,7 +28,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
                 time_bin_size: DAY,
                 array: true,
                 scalar_type: ScalarType::F64,
-                shape: Shape::Wave(17),
+                shape: Shape::Wave(21),
                 big_endian: true,
                 compression: true,
             },
@@ -85,7 +85,7 @@ async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) ->
         .map_err(|k| Error::with_msg(format!("can not generate config {:?}", k)))?;
     let mut evix = 0;
     let mut ts = 0;
-    while ts < DAY {
+    while ts < DAY * 2 {
         let res = gen_timebin(evix, ts, chn.time_spacing, &channel_path, &chn.config, node, ensemble).await?;
         evix = res.evix;
         ts = res.ts;
@@ -198,21 +198,34 @@ async fn gen_timebin(
         .join(format!("{:019}", tb))
         .join(format!("{:010}", node.split));
     tokio::fs::create_dir_all(&path).await?;
-    let path = path.join(format!("{:019}_{:05}_Data", config.time_bin_size / MS, 0));
-    info!("open file {:?}", path);
+    let data_path = path.join(format!("{:019}_{:05}_Data", config.time_bin_size / MS, 0));
+    let index_path = path.join(format!("{:019}_{:05}_Data_Index", config.time_bin_size / MS, 0));
+    info!("open data file {:?}", data_path);
     let mut file = OpenOptions::new()
         .write(true)
         .create(true)
         .truncate(true)
-        .open(path)
+        .open(data_path)
         .await?;
+    let mut index_file = if let Shape::Wave(_) = config.shape {
+        info!("open index file {:?}", index_path);
+        let f = OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(index_path)
+            .await?;
+        Some(f)
+    } else {
+        None
+    };
     gen_datafile_header(&mut file, config).await?;
     let mut evix = evix;
     let mut ts = ts;
     let tsmax = (tb + 1) * config.time_bin_size;
     while ts < tsmax {
         if evix % ensemble.nodes.len() as u64 == node.split as u64 {
-            gen_event(&mut file, evix, ts, config).await?;
+            gen_event(&mut file, index_file.as_mut(), evix, ts, config).await?;
         }
         evix += 1;
         ts += ts_spacing;
@@ -233,7 +246,13 @@ async fn gen_datafile_header(file: &mut File, config: &ChannelConfig) -> Result<
     Ok(())
 }
 
-async fn gen_event(file: &mut File, evix: u64, ts: u64, config: &ChannelConfig) -> Result<(), Error> {
+async fn gen_event(
+    file: &mut File,
+    _index_file: Option<&mut File>,
+    evix: u64,
+    ts: u64,
+    config: &ChannelConfig,
+) -> Result<(), Error> {
     let mut buf = BytesMut::with_capacity(1024 * 16);
     buf.put_i32(0xcafecafe as u32 as i32);
     buf.put_u64(0xcafecafe);
diff --git a/disk/src/index.rs b/disk/src/index.rs
new file mode 100644
index 0000000..e7e3076
--- /dev/null
+++ b/disk/src/index.rs
@@ -0,0 +1,26 @@
+use err::Error;
+use netpod::log::*;
+use netpod::{ChannelConfig, Nanos, Node};
+use tokio::fs::OpenOptions;
+use tokio::io::ErrorKind;
+
+pub async fn find_start_pos_for_config(
+    ts: Nanos,
+    channel_config: &ChannelConfig,
+    node: &Node,
+) -> Result<Option<u64>, Error> {
+    let index_path = super::paths::index_path(ts, channel_config, node)?;
+    let ret = match OpenOptions::new().open(&index_path).await {
+        Ok(_file) => {
+            info!("opened index file");
+            error!("??????????????? TODO search index for start");
+            err::todoval::<u32>();
+            None
+        }
+        Err(e) => match e.kind() {
+            ErrorKind::NotFound => None,
+            _ => Err(e)?,
+        },
+    };
+    Ok(ret)
+}
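find_start_pos_for_config above treats a missing index file as "no index" rather than a failure, and only propagates other I/O errors. The same pattern in isolation, using std::io::Error in place of the crate's Error type (function name illustrative):

    use std::io::ErrorKind;
    use std::path::Path;
    use tokio::fs::{File, OpenOptions};

    // Open a file that may legitimately be absent: absence maps to Ok(None),
    // every other I/O failure is propagated as an error.
    async fn open_optional(path: &Path) -> Result<Option<File>, std::io::Error> {
        match OpenOptions::new().read(true).open(path).await {
            Ok(file) => Ok(Some(file)),
            Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
            Err(e) => Err(e),
        }
    }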
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index e1b3395..15d5d3f 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -4,7 +4,7 @@ use err::Error;
 use futures_core::Stream;
 use futures_util::future::FusedFuture;
 use futures_util::{pin_mut, select, FutureExt, StreamExt};
-use netpod::{ChannelConfig, Node, Shape};
+use netpod::{ChannelConfig, NanoRange, Node, Shape};
 use std::future::Future;
 use std::path::PathBuf;
 use std::pin::Pin;
@@ -25,11 +25,13 @@ pub mod eventblobs;
 pub mod eventchunker;
 pub mod frame;
 pub mod gen;
+pub mod index;
 pub mod merge;
+pub mod paths;
 pub mod raw;
 
 pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: Arc<Node>) -> Result<netpod::BodyStream, Error> {
-    let path = datapath(query.timebin as u64, &query.channel_config, &node);
+    let path = paths::datapath(query.timebin as u64, &query.channel_config, &node);
     debug!("try path: {:?}", path);
     let fin = OpenOptions::new().read(true).open(path).await?;
     let meta = fin.metadata().await;
@@ -156,7 +158,7 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(
             {
                 if !fopen_avail && file_prep.is_none() && i1 < 16 {
                     info!("Prepare open task for next file {}", query.timebin + i1);
-                    fopen.replace(Fopen1::new(datapath(query.timebin as u64 + i1 as u64, &query.channel_config, &node)));
+                    fopen.replace(Fopen1::new(paths::datapath(query.timebin as u64 + i1 as u64, &query.channel_config, &node)));
                     fopen_avail = true;
                     i1 += 1;
                 }
@@ -264,20 +266,23 @@ pub fn raw_concat_channel_read_stream_try_open_in_background(
 }
 
 pub fn raw_concat_channel_read_stream_file_pipe(
-    query: &netpod::AggQuerySingleChannel,
+    range: &NanoRange,
+    channel_config: &ChannelConfig,
     node: Arc<Node>,
+    buffer_size: usize,
 ) -> impl Stream<Item = Result<Bytes, Error>> + Send {
-    let query = query.clone();
+    let range = range.clone();
+    let channel_config = channel_config.clone();
     let node = node.clone();
     async_stream::stream! {
-        let chrx = open_files(&query, node.clone());
+        let chrx = open_files(&range, &channel_config, node.clone());
         while let Ok(file) = chrx.recv().await {
             let mut file = match file { Ok(k) => k, Err(_) => break };
             loop {
-                let mut buf = BytesMut::with_capacity(query.buffer_size as usize);
+                let mut buf = BytesMut::with_capacity(buffer_size);
                 use tokio::io::AsyncReadExt;
                 let n1 = file.read_buf(&mut buf).await?;
                 if n1 == 0 {
@@ -293,17 +298,36 @@ pub fn raw_concat_channel_read_stream_file_pipe(
 }
 
 fn open_files(
-    query: &netpod::AggQuerySingleChannel,
+    range: &NanoRange,
+    channel_config: &ChannelConfig,
     node: Arc<Node>,
 ) -> async_channel::Receiver<Result<File, Error>> {
+    let channel_config = channel_config.clone();
     let (chtx, chrx) = async_channel::bounded(2);
-    let mut query = query.clone();
-    let node = node.clone();
     tokio::spawn(async move {
-        let tb0 = query.timebin;
-        for i1 in 0..query.tb_file_count {
-            query.timebin = tb0 + i1;
-            let path = datapath(query.timebin as u64, &query.channel_config, &node);
+        // TODO reduce usage of `query` and see what we actually need.
+        // TODO scan the timebins on the filesystem for the potential files first instead of trying to open hundreds in worst case.
+
+        let mut timebins = vec![];
+        {
+            let rd = tokio::fs::read_dir(paths::channel_timebins_dir_path(&channel_config, &node)?).await?;
+            let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd);
+            while let Some(e) = rd.next().await {
+                let e = e?;
+                let dn = e
+                    .file_name()
+                    .into_string()
+                    .map_err(|e| Error::with_msg(format!("Bad OS path {:?}", e)))?;
+                let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a });
+                if vv == 19 {
+                    timebins.push(dn.parse::<u64>()?);
+                }
+            }
+        }
+        timebins.sort_unstable();
+        info!("TIMEBINS FOUND: {:?}", timebins);
+        for &tb in &timebins {
+            let path = paths::datapath(tb, &channel_config, &node);
             let fileres = OpenOptions::new().read(true).open(&path).await;
             info!("opened file {:?} {:?}", &path, &fileres);
             match fileres {
@@ -317,6 +341,7 @@ fn open_files(
             },
         }
     }
+        Ok::<_, Error>(())
     });
     chrx
 }
@@ -348,7 +373,7 @@ pub fn parsed1(
     let query = query.clone();
     let node = node.clone();
     async_stream::stream! {
-        let filerx = open_files(&query, node.clone());
+        let filerx = open_files(err::todoval(), err::todoval(), node.clone());
         while let Ok(fileres) = filerx.recv().await {
             match fileres {
                 Ok(file) => {
@@ -486,7 +511,7 @@ pub fn raw_concat_channel_read_stream_timebin(
     let query = query.clone();
     let node = node.clone();
     async_stream::stream! {
-        let path = datapath(query.timebin as u64, &query.channel_config, &node);
+        let path = paths::datapath(query.timebin as u64, &query.channel_config, &node);
         debug!("try path: {:?}", path);
         let mut fin = OpenOptions::new().read(true).open(path).await?;
         let meta = fin.metadata().await?;
@@ -511,20 +536,6 @@ pub fn raw_concat_channel_read_stream_timebin(
     }
 }
 
-fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &Node) -> PathBuf {
-    //let pre = "/data/sf-databuffer/daq_swissfel";
-    node.data_base_path
-        .join(format!("{}_{}", node.ksprefix, config.keyspace))
-        .join("byTime")
-        .join(config.channel.name.clone())
-        .join(format!("{:019}", timebin))
-        .join(format!("{:010}", node.split))
-        .join(format!(
-            "{:019}_00000_Data",
-            config.time_bin_size / netpod::timeunits::MS
-        ))
-}
-
 /**
 Read all events from all timebins for the given channel and split.
 */
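The rewritten open_files scans the channel directory once and only opens files for time bins that actually exist, instead of probing bin numbers blindly. Directory names are zero-padded to 19 digits, which is what the digit filter checks. A standalone version of the scan — names and the stricter all-digits check are mine, the mechanics match the patch:

    use futures_util::StreamExt;
    use std::path::Path;
    use tokio_stream::wrappers::ReadDirStream;

    // Collect the numeric time-bin directory names and return them sorted ascending.
    async fn scan_timebins(dir: &Path) -> Result<Vec<u64>, std::io::Error> {
        let rd = tokio::fs::read_dir(dir).await?;
        let mut rd = ReadDirStream::new(rd);
        let mut timebins = vec![];
        while let Some(entry) = rd.next().await {
            let name = entry?.file_name();
            if let Some(name) = name.to_str() {
                // e.g. "0000000000000018744" -> 18744
                if name.len() == 19 && name.chars().all(|c| c.is_ascii_digit()) {
                    if let Ok(tb) = name.parse::<u64>() {
                        timebins.push(tb);
                    }
                }
            }
        }
        timebins.sort_unstable();
        Ok(timebins)
    }

Sorting matters because the consumer concatenates the files in channel order, so the event stream must be delivered in ascending time-bin order.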
diff --git a/disk/src/paths.rs b/disk/src/paths.rs
new file mode 100644
index 0000000..3dc4761
--- /dev/null
+++ b/disk/src/paths.rs
@@ -0,0 +1,46 @@
+use err::Error;
+use netpod::timeunits::MS;
+use netpod::{ChannelConfig, Nanos, Node};
+use std::path::PathBuf;
+
+pub fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &Node) -> PathBuf {
+    //let pre = "/data/sf-databuffer/daq_swissfel";
+    node.data_base_path
+        .join(format!("{}_{}", node.ksprefix, config.keyspace))
+        .join("byTime")
+        .join(config.channel.name.clone())
+        .join(format!("{:019}", timebin))
+        .join(format!("{:010}", node.split))
+        .join(format!(
+            "{:019}_00000_Data",
+            config.time_bin_size / netpod::timeunits::MS
+        ))
+}
+
+pub fn channel_timebins_dir_path(channel_config: &ChannelConfig, node: &Node) -> Result<PathBuf, Error> {
+    let ret = node
+        .data_base_path
+        .join(format!("{}_{}", node.ksprefix, channel_config.keyspace))
+        .join("byTime")
+        .join(&channel_config.channel.name);
+    Ok(ret)
+}
+
+pub fn data_dir_path(ts: Nanos, channel_config: &ChannelConfig, node: &Node) -> Result<PathBuf, Error> {
+    let ret = channel_timebins_dir_path(channel_config, node)?
+        .join(format!("{:019}", ts.ns / channel_config.time_bin_size))
+        .join(format!("{:010}", node.split));
+    Ok(ret)
+}
+
+pub fn data_path(ts: Nanos, channel_config: &ChannelConfig, node: &Node) -> Result<PathBuf, Error> {
+    let fname = format!("{:019}_{:05}_Data", channel_config.time_bin_size / MS, 0);
+    let ret = data_dir_path(ts, channel_config, node)?.join(fname);
+    Ok(ret)
+}
+
+pub fn index_path(ts: Nanos, channel_config: &ChannelConfig, node: &Node) -> Result<PathBuf, Error> {
+    let fname = format!("{:019}_{:05}_Data_Index", channel_config.time_bin_size / MS, 0);
+    let ret = data_dir_path(ts, channel_config, node)?.join(fname);
+    Ok(ret)
+}
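Taken together, paths.rs encodes the on-disk layout base/{ksprefix}_{keyspace}/byTime/{channel}/{timebin}/{split}/{binsize}_{chunk}_Data with fixed zero-padded field widths. A runnable illustration of the formatting only; every concrete value below is made up:

    use std::path::PathBuf;

    fn main() {
        let base = PathBuf::from("/data/daq"); // hypothetical base path
        let ksprefix = "daq";                  // hypothetical keyspace prefix
        let keyspace = 3;
        let channel = "CHAN-1";
        let timebin: u64 = 18744;
        let split: u32 = 0;
        let time_bin_size_ms: u64 = 86_400_000; // one day, in milliseconds
        let p = base
            .join(format!("{}_{}", ksprefix, keyspace))
            .join("byTime")
            .join(channel)
            .join(format!("{:019}", timebin))
            .join(format!("{:010}", split))
            .join(format!("{:019}_{:05}_Data", time_bin_size_ms, 0));
        // /data/daq/daq_3/byTime/CHAN-1/0000000000000018744/0000000000/0000000000086400000_00000_Data
        println!("{}", p.display());
    }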
+ .join(format!("{:019}", ts.ns / channel_config.time_bin_size)) + .join(format!("{:010}", node.split)); + Ok(ret) +} + +pub fn data_path(ts: Nanos, channel_config: &ChannelConfig, node: &Node) -> Result { + let fname = format!("{:019}_{:05}_Data", channel_config.time_bin_size / MS, 0); + let ret = data_dir_path(ts, channel_config, node)?.join(fname); + Ok(ret) +} + +pub fn index_path(ts: Nanos, channel_config: &ChannelConfig, node: &Node) -> Result { + let fname = format!("{:019}_{:05}_Data_Index", channel_config.time_bin_size / MS, 0); + let ret = data_dir_path(ts, channel_config, node)?.join(fname); + Ok(ret) +} diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 8c00536..f2c4e28 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -8,7 +8,7 @@ use err::Error; use futures_util::StreamExt; #[allow(unused_imports)] use netpod::log::*; -use netpod::timeunits::{DAY, SEC}; +use netpod::timeunits::SEC; use netpod::{NodeConfig, ScalarType, Shape}; use std::net::SocketAddr; use std::sync::Arc; @@ -128,11 +128,7 @@ async fn raw_conn_handler_inner_try( Ok(_) => (), Err(e) => return Err((e, netout))?, } - debug!( - "\n\nREQUEST FOR RANGE {} {}\n\n", - evq.range.beg / SEC, - evq.range.end / SEC - ); + debug!("REQUEST FOR RANGE {:?}", evq.range); error!( "TODO decide on response content based on the parsed json query\n{:?}", evq @@ -142,7 +138,10 @@ async fn raw_conn_handler_inner_try( Ok(k) => k, Err(e) => return Err((e, netout))?, }; - let entry = extract_matching_config_entry(range, &channel_config)?; + let entry = match extract_matching_config_entry(range, &channel_config) { + Ok(k) => k, + Err(e) => return Err((e, netout))?, + }; info!("found config entry {:?}", entry); let shape = match &entry.shape { @@ -150,25 +149,14 @@ async fn raw_conn_handler_inner_try( if lens.len() == 1 { Shape::Wave(lens[0]) } else { - return Err(Error::with_msg(format!("Channel config unsupported shape {:?}", entry)))?; + return Err(( + Error::with_msg(format!("Channel config unsupported shape {:?}", entry)), + netout, + ))?; } } None => Shape::Scalar, }; - - /* - TODO - This endpoint should deliver events over some time range, across files. - Therefore, based on the query and the found channel config, list the available files in the - candidate directories, and iterate over events from those files. - !!! use index if available - • Generate index file for my test data. - • Use index file if available. - • If not, must use binary search if possible in that type. - Create a new type in place of AggQuerySingleChannel? 
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index d6898f6..7d72583 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -152,11 +152,27 @@ pub enum TimeRange {
 }
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct Nanos {
+    pub ns: u64,
+}
+
+#[derive(Clone, Serialize, Deserialize)]
 pub struct NanoRange {
     pub beg: u64,
     pub end: u64,
 }
 
+impl std::fmt::Debug for NanoRange {
+    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(
+            fmt,
+            "NanoRange {{ beg: {} s, end: {} s }}",
+            self.beg / SEC,
+            self.end / SEC
+        )
+    }
+}
+
 impl NanoRange {
     pub fn delta(&self) -> u64 {
         self.end - self.beg