diff --git a/archapp/src/archeng.rs b/archapp/src/archeng.rs index e170953..3ffa995 100644 --- a/archapp/src/archeng.rs +++ b/archapp/src/archeng.rs @@ -1,10 +1,11 @@ +use async_channel::{Receiver, Sender}; use err::Error; use netpod::timeunits::SEC; -use netpod::{log::*, Nanos}; +use netpod::{log::*, FilePos, Nanos}; use std::convert::TryInto; use std::io::{self, SeekFrom}; use std::path::PathBuf; -use std::time::Instant; +use std::time::{Duration, Instant}; use tokio::fs::{File, OpenOptions}; use tokio::io::{AsyncReadExt, AsyncSeekExt}; @@ -33,7 +34,7 @@ impl TimedIo for File { type Offset = u64; const OFFSET_SIZE: usize = std::mem::size_of::(); -async fn open_read(path: PathBuf) -> io::Result { +pub async fn open_read(path: PathBuf) -> io::Result { let ts1 = Instant::now(); let res = OpenOptions::new().read(true).open(path).await; let ts2 = Instant::now(); @@ -109,10 +110,6 @@ impl IndexFileBasics { panic!() } } - - pub async fn read_named_hash_channel_entry(&self, file: &mut File) -> Result<(), Error> { - Ok(()) - } } pub fn name_hash(s: &str, ht_len: u32) -> u32 { @@ -290,35 +287,49 @@ pub async fn read_file_basics(f1: &mut File) -> Result { pub struct RTreeNodeRecord { ts1: u64, ts2: u64, + // TODO should probably be better name `child or offset` and be made enum. child_or_id: Offset, } #[derive(Debug)] pub struct RTreeNode { + pos: FilePos, records: Vec, + is_leaf: bool, + rtree_m: usize, +} + +#[derive(Debug)] +pub struct RTreeNodeAtRecord { + node: RTreeNode, + rix: usize, } // TODO refactor as struct, rtree_m is a property of the tree. -pub async fn read_rtree_node(file: &mut File, pos: u64, rtree_m: usize) -> Result { +pub async fn read_rtree_node(file: &mut File, pos: FilePos, rtree_m: usize) -> Result { const OFF1: usize = 9; const RLEN: usize = 24; const NANO_MAX: u32 = 999999999; - seek(file, SeekFrom::Start(pos)).await?; + seek(file, SeekFrom::Start(pos.into())).await?; let mut rb = RingBuf::new(); // TODO must know how much data I need at least... rb.fill_min(file, OFF1 + rtree_m * RLEN).await?; - let s = format_hex_block(rb.data(), 128); - info!("RTREE NODE:\n{}", s); + if false { + let s = format_hex_block(rb.data(), 128); + info!("RTREE NODE:\n{}", s); + } if rb.len() < 1 + OFFSET_SIZE { return Err(Error::with_msg_no_trace("could not read enough")); } let b = rb.data(); let is_leaf = b[0] != 0; let parent = readu64(b, 1); - info!("is_leaf: {} parent: {}", is_leaf, parent); + if false { + info!("is_leaf: {} parent: {}", is_leaf, parent); + } let recs = (0..rtree_m) .into_iter() - .map(|i| { + .filter_map(|i| { let off2 = OFF1 + i * RLEN; let ts1a = readu32(b, off2 + 0); let ts1b = readu32(b, off2 + 4); @@ -330,15 +341,24 @@ pub async fn read_rtree_node(file: &mut File, pos: u64, rtree_m: usize) -> Resul let ts2 = ts2a as u64 * SEC + ts2b as u64; let child_or_id = readu64(b, off2 + 16); //info!("NODE {} {} {} {} {}", ts1a, ts1b, ts2a, ts2b, child_or_id); - let rec = RTreeNodeRecord { ts1, ts2, child_or_id }; - rec + if child_or_id != 0 { + let rec = RTreeNodeRecord { ts1, ts2, child_or_id }; + Some(rec) + } else { + None + } }) .collect(); - let node = RTreeNode { records: recs }; + let node = RTreeNode { + pos, + records: recs, + is_leaf, + rtree_m, + }; Ok(node) } -pub async fn read_rtree_entrypoint(file: &mut File, pos: u64, _basics: &IndexFileBasics) -> Result<(), Error> { +pub async fn read_rtree_entrypoint(file: &mut File, pos: u64, _basics: &IndexFileBasics) -> Result { seek(file, SeekFrom::Start(pos)).await?; let mut rb = RingBuf::new(); // TODO should be able to indicate: @@ -350,24 +370,71 @@ pub async fn read_rtree_entrypoint(file: &mut File, pos: u64, _basics: &IndexFil let b = rb.data(); let node_offset = readu64(b, 0); let rtree_m = readu32(b, OFFSET_SIZE); - info!("node_offset: {} rtree_m: {}", node_offset, rtree_m); - read_rtree_node(file, node_offset, rtree_m as usize).await?; - Ok(()) + //info!("node_offset: {} rtree_m: {}", node_offset, rtree_m); + let pos = FilePos { pos: node_offset }; + let node = read_rtree_node(file, pos, rtree_m as usize).await?; + //info!("read_rtree_entrypoint READ ROOT NODE: {:?}", node); + Ok(node) } -// TODO -// Implement search as tail recursion or loop. -// What is a good new type to identify a node? the location in the file? Use a new type. -pub async fn search(file: &mut File, rtree_m: usize, start_node_pos: u64, beg: Nanos) -> Result, Error> { - let node = read_rtree_node(file, start_node_pos, rtree_m).await?; - if beg.ns < node.records[0].ts1 { - None - } else { - }; - Ok(()) +#[derive(Debug)] +pub struct TreeSearchStats { + duration: Duration, + node_reads: usize, } -pub async fn read_channel(index_file: &mut File, channel_name: &str) -> Result<(), Error> { +impl TreeSearchStats { + fn new(ts1: Instant, node_reads: usize) -> Self { + Self { + duration: Instant::now().duration_since(ts1), + node_reads, + } + } +} + +pub async fn search_record( + file: &mut File, + rtree_m: usize, + start_node_pos: FilePos, + beg: Nanos, +) -> Result<(Option, TreeSearchStats), Error> { + let ts1 = Instant::now(); + let mut node = read_rtree_node(file, start_node_pos, rtree_m).await?; + let mut node_reads = 1; + 'outer: loop { + let nr = node.records.len(); + for (i, rec) in node.records.iter().enumerate() { + //info!("looking at record i {}", i); + if rec.ts2 > beg.ns { + if node.is_leaf { + info!("found leaf match at {} / {}", i, nr); + let ret = RTreeNodeAtRecord { node, rix: i }; + let stats = TreeSearchStats::new(ts1, node_reads); + return Ok((Some(ret), stats)); + } else { + info!("found non-leaf match at {} / {}", i, nr); + let pos = FilePos { pos: rec.child_or_id }; + node = read_rtree_node(file, pos, rtree_m).await?; + node_reads += 1; + continue 'outer; + } + } + } + { + let stats = TreeSearchStats::new(ts1, node_reads); + return Ok((None, stats)); + } + } +} + +#[derive(Debug)] +pub struct ChannelInfoBasics { + channel_name: String, + rtree_m: usize, + rtree_start_pos: FilePos, +} + +pub async fn read_channel(index_file: &mut File, channel_name: &str) -> Result, Error> { // TODO // How do I locate the correct index file? // Given a channel name, how do I find the master index? @@ -399,12 +466,12 @@ pub async fn read_channel(index_file: &mut File, channel_name: &str) -> Result<( let n0 = 20; let n1 = name_len as usize; let n2 = id_txt_len as usize; - let channel_name = String::from_utf8(buf[n0..n0 + n1].to_vec())?; + let channel_name_found = String::from_utf8(buf[n0..n0 + n1].to_vec())?; let id_txt = String::from_utf8(buf[n0 + n1..n0 + n1 + n2].to_vec())?; let e = NamedHashChannelEntry { next, id_rtree_pos: id, - channel_name, + channel_name: channel_name_found, id_txt, }; entries.push(e); @@ -418,13 +485,35 @@ pub async fn read_channel(index_file: &mut File, channel_name: &str) -> Result<( rb.adv(next as usize); } } - info!("parsed {} items", entries.len()); - - if entries.len() > 0 { - let e = &entries[0]; - read_rtree_entrypoint(f1, e.id_rtree_pos, &basics).await?; + for e in &entries { + if e.channel_name == channel_name { + let ep = read_rtree_entrypoint(f1, e.id_rtree_pos, &basics).await?; + let ret = ChannelInfoBasics { + channel_name: channel_name.into(), + rtree_m: ep.rtree_m, + rtree_start_pos: ep.pos, + }; + return Ok(Some(ret)); + } } - Ok(()) + Ok(None) +} + +async fn datarange_stream_fill(channel_name: &str, tx: Sender) { + // Search the first relevant leaf node. + // Pipe all ranges from there, and continue with nodes. + // Issue: can not stop because I don't look into the files. +} + +// TODO +// Should contain enough information to allow one to open and position a relevant datafile. +pub struct Datarange {} + +pub fn datarange_stream(channel_name: &str) -> Result, Error> { + let (tx, rx) = async_channel::bounded(4); + let task = async {}; + taskrun::spawn(task); + Ok(rx) } #[cfg(test)] @@ -444,11 +533,16 @@ mod test { use netpod::{ByteSize, ChannelConfig, FileIoBufferSize, Nanos}; use std::path::PathBuf; + use super::search_record; + fn open_index_inner(path: impl Into) -> Result<(), Error> { let task = async move { Ok(()) }; Ok(taskrun::run(task).unwrap()) } + /* + Root node: most left record ts1 965081099942616289, most right record ts2 1002441959876114632 + */ const CHN_0_MASTER_INDEX: &str = "/data/daqbuffer-testdata/sls/gfa03/bl_arch/archive_X05DA_SH/index"; #[test] @@ -475,4 +569,88 @@ mod test { }; Ok(taskrun::run(fut).unwrap()) } + + #[test] + fn search_record_middle() -> Result<(), Error> { + /* + RTreeNodeRecord { ts1: 969729779686636130, ts2: 970351442331056677, child_or_id: 130731 }, + RTreeNodeRecord { ts1: 970351499684884156, ts2: 970417919634086480, child_or_id: 185074 }, + RTreeNodeRecord { ts1: 970417979635219603, ts2: 970429859806669835, child_or_id: 185015 }, + */ + let fut = async { + let mut index_file = open_read(CHN_0_MASTER_INDEX.into()).await?; + let channel_name = "X05DA-FE-WI1:TC1"; + const T0: u64 = 970351442331056677 + 1; + let beg = Nanos { ns: T0 }; + let res = read_channel(&mut index_file, channel_name).await?; + let cib = res.unwrap(); + let (res, stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg).await?; + assert_eq!(res.is_some(), true); + let res = res.unwrap(); + assert_eq!(res.node.pos.pos, 8216); + assert_eq!(res.rix, 17); + let rec = &res.node.records[res.rix]; + assert_eq!(rec.ts1, 970351499684884156); + assert_eq!(rec.ts2, 970417919634086480); + Ok(()) + }; + Ok(taskrun::run(fut).unwrap()) + } + + // Note: this tests only the index tree, but it does not look for any actual event in some file. + #[test] + fn search_record_at_beg() -> Result<(), Error> { + let fut = async { + let mut index_file = open_read(CHN_0_MASTER_INDEX.into()).await?; + let channel_name = "X05DA-FE-WI1:TC1"; + const T0: u64 = 965081099942616289; + let beg = Nanos { ns: T0 }; + let res = read_channel(&mut index_file, channel_name).await?; + let cib = res.unwrap(); + let (res, stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg).await?; + assert_eq!(res.is_some(), true); + let res = res.unwrap(); + assert_eq!(res.node.pos.pos, 8216); + assert_eq!(res.rix, 0); + Ok(()) + }; + Ok(taskrun::run(fut).unwrap()) + } + + // Note: this tests only the index tree, but it does not look for any actual event in some file. + #[test] + fn search_record_at_end() -> Result<(), Error> { + let fut = async { + let mut index_file = open_read(CHN_0_MASTER_INDEX.into()).await?; + let channel_name = "X05DA-FE-WI1:TC1"; + const T0: u64 = 1002441959876114632 - 1; + let beg = Nanos { ns: T0 }; + let res = read_channel(&mut index_file, channel_name).await?; + let cib = res.unwrap(); + let (res, stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg).await?; + assert_eq!(res.is_some(), true); + let res = res.unwrap(); + assert_eq!(res.node.pos.pos, 1861178); + assert_eq!(res.rix, 46); + Ok(()) + }; + Ok(taskrun::run(fut).unwrap()) + } + + // Note: this tests only the index tree, but it does not look for any actual event in some file. + #[test] + fn search_record_beyond_end() -> Result<(), Error> { + let fut = async { + let mut index_file = open_read(CHN_0_MASTER_INDEX.into()).await?; + let channel_name = "X05DA-FE-WI1:TC1"; + const T0: u64 = 1002441959876114632 - 0; + let beg = Nanos { ns: T0 }; + let res = read_channel(&mut index_file, channel_name).await?; + let cib = res.unwrap(); + let (res, stats) = search_record(&mut index_file, cib.rtree_m, cib.rtree_start_pos, beg).await?; + assert_eq!(res.is_none(), true); + Ok(()) + }; + Ok(taskrun::run(fut).unwrap()) + } } diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 85bdc98..c2508e5 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -259,6 +259,17 @@ impl Channel { } } +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub struct FilePos { + pub pos: u64, +} + +impl From for u64 { + fn from(k: FilePos) -> Self { + k.pos + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub enum TimeRange { Time { beg: DateTime, end: DateTime },