diff --git a/archapp/src/archeng.rs b/archapp/src/archeng.rs new file mode 100644 index 0000000..cddf4d7 --- /dev/null +++ b/archapp/src/archeng.rs @@ -0,0 +1,83 @@ +use err::Error; +use std::convert::TryInto; +use std::path::PathBuf; +use tokio::fs::OpenOptions; +use tokio::io::AsyncReadExt; + +// TODO +// Wrap each I/O operation into a stopwatch to detect bad NFS performance! + +#[derive(Debug)] +struct IndexFileBasics { + version: u8, + name_hash_anchor_beg: u64, + name_hash_anchor_len: u64, + fa_used_list_beg: u64, + fa_used_list_end: u64, + fa_used_list_len: u64, + fa_free_list_beg: u64, + fa_free_list_end: u64, + fa_free_list_len: u64, + fa_header_prev: u64, + fa_header_next: u64, + fa_header_len: u64, +} + +pub async fn read_file_basics(path: PathBuf) -> Result { + let mut f1 = OpenOptions::new().read(true).open(path).await?; + let mut buf = vec![0; 0x34]; + f1.read_exact(&mut buf).await?; + let version = String::from_utf8(buf[3..4].to_vec())?.parse()?; + let ret = IndexFileBasics { + version, + name_hash_anchor_beg: u32::from_be_bytes(buf[0x04..0x08].try_into().unwrap()) as u64, + name_hash_anchor_len: u32::from_be_bytes(buf[0x08..0x0c].try_into().unwrap()) as u64, + // TODO: + fa_used_list_beg: u32::from_le_bytes(buf[0x08..0x0c].try_into().unwrap()) as u64, + fa_used_list_end: u32::from_le_bytes(buf[0x08..0x0c].try_into().unwrap()) as u64, + fa_used_list_len: u32::from_le_bytes(buf[0x08..0x0c].try_into().unwrap()) as u64, + fa_free_list_beg: u32::from_le_bytes(buf[0x08..0x0c].try_into().unwrap()) as u64, + fa_free_list_end: u32::from_le_bytes(buf[0x08..0x0c].try_into().unwrap()) as u64, + fa_free_list_len: u32::from_le_bytes(buf[0x08..0x0c].try_into().unwrap()) as u64, + fa_header_prev: u32::from_le_bytes(buf[0x08..0x0c].try_into().unwrap()) as u64, + fa_header_next: u32::from_le_bytes(buf[0x08..0x0c].try_into().unwrap()) as u64, + fa_header_len: u32::from_le_bytes(buf[0x08..0x0c].try_into().unwrap()) as u64, + }; + Ok(ret) +} + +#[cfg(test)] +mod test { + // TODO move RangeFilter to a different crate (items?) + // because the `disk` crate should become the specific sf-databuffer reader engine. + + //use disk::rangefilter::RangeFilter; + //use disk::{eventblobs::EventChunkerMultifile, eventchunker::EventChunkerConf}; + + use crate::archeng::read_file_basics; + use err::Error; + use futures_util::StreamExt; + use items::{RangeCompletableItem, StreamItem}; + use netpod::log::*; + use netpod::timeunits::{DAY, MS}; + use netpod::{ByteSize, ChannelConfig, FileIoBufferSize, Nanos}; + use std::path::PathBuf; + + fn open_index_inner(path: impl Into) -> Result<(), Error> { + let task = async move { Ok(()) }; + Ok(taskrun::run(task).unwrap()) + } + + const CHN_0_MASTER_INDEX: &str = "/data/daqbuffer-testdata/sls/gfa03/bl_arch/archive_X05DA_SH/index"; + + #[test] + fn read_magic() -> Result<(), Error> { + let fut = async { + let res = read_file_basics(CHN_0_MASTER_INDEX.into()).await?; + info!("got {:?}", res); + assert!(res.version == 2 || res.version == 3); + Ok(()) + }; + Ok(taskrun::run(fut).unwrap()) + } +} diff --git a/archapp/src/lib.rs b/archapp/src/lib.rs index 2bd4a08..156fb65 100644 --- a/archapp/src/lib.rs +++ b/archapp/src/lib.rs @@ -1,13 +1,14 @@ -use err::Error; - #[cfg(feature = "devread")] pub mod generated; #[cfg(not(feature = "devread"))] pub mod generated {} +pub mod archeng; #[cfg(feature = "devread")] pub mod parse; #[cfg(not(feature = "devread"))] pub mod parsestub; + +use err::Error; use items::eventvalues::EventValues; use items::numops::NumOps; use items::waveevents::{WaveEvents, WaveXBinner}; diff --git a/daqbufp2/src/test/binnedbinary.rs b/daqbufp2/src/test/binnedbinary.rs index 18b94cb..78c9690 100644 --- a/daqbufp2/src/test/binnedbinary.rs +++ b/daqbufp2/src/test/binnedbinary.rs @@ -37,6 +37,9 @@ async fn get_binned_binary_inner() -> Result<(), Error> { ) .await?; } + if true { + return Ok(()); + }; if true { get_binned_channel::( "wave-f64-be-n21", diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index ea2026d..8205326 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -15,6 +15,7 @@ pub fn make_test_node(id: u32) -> Node { cache_base_path: format!("../tmpdata/node{:02}", id).into(), ksprefix: "ks".into(), backend: "testbackend".into(), + splits: None, archiver_appliance: None, } } diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index eec9a12..e6c5e03 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -2,6 +2,7 @@ use crate::dataopen::{open_expanded_files, open_files, OpenedFileSet}; use crate::eventchunker::{EventChunker, EventChunkerConf, EventFull}; use crate::file_content_stream; use crate::merge::MergedStream; +use crate::rangefilter::RangeFilter; use err::Error; use futures_core::Stream; use futures_util::StreamExt; @@ -108,7 +109,7 @@ impl Stream for EventChunkerMultifile { Ready(Some(k)) => match k { Ok(ofs) => { self.files_count += ofs.files.len() as u32; - if false && ofs.files.len() == 1 { + if ofs.files.len() == 1 { let mut ofs = ofs; let file = ofs.files.pop().unwrap(); let path = file.path; @@ -131,7 +132,9 @@ impl Stream for EventChunkerMultifile { self.expand, self.do_decompress, ); - self.evs = Some(Box::pin(chunker)); + let filtered = + RangeFilter::new(chunker, self.range.clone(), self.expand); + self.evs = Some(Box::pin(filtered)); } None => {} } @@ -166,7 +169,8 @@ impl Stream for EventChunkerMultifile { } } let merged = MergedStream::new(chunkers); - self.evs = Some(Box::pin(merged)); + let filtered = RangeFilter::new(merged, self.range.clone(), self.expand); + self.evs = Some(Box::pin(filtered)); Ready(Some(Ok(StreamItem::Log(item)))) } } diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 2a2b50c..f99ebbc 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -124,6 +124,7 @@ pub async fn gen_test_data() -> Result<(), Error> { cache_base_path: data_base_path.join(format!("node{:02}", i1)), ksprefix: ksprefix.clone(), backend: "testbackend".into(), + splits: None, archiver_appliance: None, }; ensemble.nodes.push(node); diff --git a/disk/src/paths.rs b/disk/src/paths.rs index 7b4d7e2..1a59053 100644 --- a/disk/src/paths.rs +++ b/disk/src/paths.rs @@ -46,7 +46,17 @@ pub async fn datapaths_for_timebin( } let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a }); if vv == 10 { - splits.push(dn.parse::()?); + let split: u64 = dn.parse()?; + match &node.splits { + Some(sps) => { + if sps.contains(&split) { + splits.push(split); + } + } + None => { + splits.push(split); + } + } } } let mut ret = vec![]; diff --git a/disk/src/rangefilter.rs b/disk/src/rangefilter.rs index eebf829..2a97582 100644 --- a/disk/src/rangefilter.rs +++ b/disk/src/rangefilter.rs @@ -1,10 +1,10 @@ -use std::pin::Pin; -use std::task::{Context, Poll}; - use futures_core::Stream; use futures_util::StreamExt; use items::{Appendable, Clearable, PushableIndex, RangeCompletableItem, Sitemty, StreamItem, WithTimestamps}; +use netpod::log::*; use netpod::NanoRange; +use std::pin::Pin; +use std::task::{Context, Poll}; pub struct RangeFilter { inp: S, @@ -95,6 +95,10 @@ where } Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) } + Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)) => { + warn!("\n\nRangeFilter got RangeComplete\n"); + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) + } k => Ready(Some(k)), }, Ready(None) => { diff --git a/netfetch/src/test.rs b/netfetch/src/test.rs index bae7811..6795e21 100644 --- a/netfetch/src/test.rs +++ b/netfetch/src/test.rs @@ -19,6 +19,7 @@ fn ca_connect_1() { cache_base_path: "".into(), listen: "".into(), ksprefix: "".into(), + splits: None, archiver_appliance: None, }, node_config: NodeConfig { diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 44f8353..85bdc98 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -133,6 +133,7 @@ pub struct Node { pub cache_base_path: PathBuf, pub ksprefix: String, pub backend: String, + pub splits: Option>, pub archiver_appliance: Option, } @@ -147,6 +148,7 @@ impl Node { cache_base_path: PathBuf::new(), ksprefix: "daqlocal".into(), backend: "dummybackend".into(), + splits: None, archiver_appliance: None, } } diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs index 4ca33ee..cc27c48 100644 --- a/taskrun/src/lib.rs +++ b/taskrun/src/lib.rs @@ -118,6 +118,7 @@ pub fn test_cluster() -> netpod::Cluster { cache_base_path: format!("../tmpdata/node{:02}", id).into(), ksprefix: "ks".into(), backend: "testbackend".into(), + splits: None, archiver_appliance: None, }) .collect();